expose materialize_partitions as table function
madesroches-ubi committed Jan 15, 2025
1 parent 24986ed commit 4e9d7c9
Showing 6 changed files with 159 additions and 8 deletions.
19 changes: 18 additions & 1 deletion python/micromegas/micromegas/flightsql/client.py
@@ -235,4 +235,21 @@ def retire_partitions(self, view_set_name, view_instance_id, begin, end):
)
for rb in self.query_stream(sql):
for index, row in rb.to_pandas().iterrows():
print(row['time'], row['msg'])
print(row["time"], row["msg"])

def materialize_partitions(
self, view_set_name, view_instance_id, begin, end, partition_delta_seconds
):
sql = """
SELECT time, msg
FROM materialize_partitions('{view_set_name}', '{view_instance_id}', '{begin}', '{end}', {partition_delta_seconds})
""".format(
view_set_name=view_set_name,
view_instance_id=view_instance_id,
begin=begin.isoformat(),
end=end.isoformat(),
partition_delta_seconds=partition_delta_seconds,
)
for rb in self.query_stream(sql):
for index, row in rb.to_pandas().iterrows():
print(row["time"], row["msg"])
9 changes: 9 additions & 0 deletions rust/analytics/src/dfext/expressions.rs
@@ -22,6 +22,15 @@ pub fn exp_to_string(expr: &Expr) -> datafusion::error::Result<String> {
}
}

pub fn exp_to_i64(expr: &Expr) -> datafusion::error::Result<i64> {
match simplify_exp(expr)? {
Expr::Literal(ScalarValue::Int64(Some(value))) => Ok(value),
other => {
plan_err!("can't convert {other:?} to string")
}
}
}

pub fn exp_to_timestamp(expr: &Expr) -> datafusion::error::Result<DateTime<Utc>> {
match simplify_exp(expr)? {
Expr::Literal(ScalarValue::Utf8(Some(string))) => {
14 changes: 7 additions & 7 deletions rust/analytics/src/lakehouse/batch_update.rs
@@ -2,7 +2,7 @@ use super::{
merge::create_merged_partition, partition_cache::PartitionCache,
partition_source_data::hash_to_object_count, view::View,
};
use crate::response_writer::{Logger, ResponseWriter};
use crate::response_writer::Logger;
use anyhow::{Context, Result};
use chrono::{DateTime, TimeDelta, Utc};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
@@ -114,7 +114,7 @@ async fn materialize_partition(
begin_insert: DateTime<Utc>,
end_insert: DateTime<Utc>,
view: Arc<dyn View>,
writer: Arc<ResponseWriter>,
logger: Arc<dyn Logger>,
) -> Result<()> {
let view_set_name = view.get_view_set_name();
let partition_spec = view
@@ -135,19 +135,19 @@
&view_instance_id,
&view.get_file_schema_hash(),
&partition_spec.get_source_data_hash(),
writer.clone(),
logger.clone(),
)
.await?;

match strategy {
PartitionCreationStrategy::CreateFromSource => {
partition_spec
.write(lake, writer)
.write(lake, logger)
.await
.with_context(|| "writing partition")?;
}
PartitionCreationStrategy::MergeExisting => {
create_merged_partition(lake, view, begin_insert, end_insert, writer).await?;
create_merged_partition(lake, view, begin_insert, end_insert, logger).await?;
}
PartitionCreationStrategy::Abort => {}
}
@@ -162,7 +162,7 @@ pub async fn materialize_partition_range(
begin_range: DateTime<Utc>,
end_range: DateTime<Utc>,
partition_time_delta: TimeDelta,
writer: Arc<ResponseWriter>,
logger: Arc<dyn Logger>,
) -> Result<()> {
let mut begin_part = begin_range;
let mut end_part = begin_part + partition_time_delta;
@@ -173,7 +173,7 @@
begin_part,
end_part,
view.clone(),
writer.clone(),
logger.clone(),
)
.await
.with_context(|| "materialize_partition")?;
115 changes: 115 additions & 0 deletions rust/analytics/src/lakehouse/materialize_partitions_table_function.rs
@@ -0,0 +1,115 @@
use super::batch_update::materialize_partition_range;
use super::partition_cache::PartitionCache;
use super::view_factory::ViewFactory;
use crate::dfext::expressions::exp_to_i64;
use crate::dfext::expressions::exp_to_string;
use crate::dfext::expressions::exp_to_timestamp;
use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
use crate::response_writer::LogSender;
use crate::response_writer::Logger;
use anyhow::Context;
use chrono::DateTime;
use chrono::TimeDelta;
use chrono::Utc;
use datafusion::catalog::TableFunctionImpl;
use datafusion::catalog::TableProvider;
use datafusion::common::plan_err;
use datafusion::prelude::Expr;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::error;
use std::sync::Arc;

#[derive(Debug)]
pub struct MaterializePartitionsTableFunction {
lake: Arc<DataLakeConnection>,
view_factory: Arc<ViewFactory>,
}

impl MaterializePartitionsTableFunction {
pub fn new(lake: Arc<DataLakeConnection>, view_factory: Arc<ViewFactory>) -> Self {
Self { lake, view_factory }
}
}

#[allow(clippy::too_many_arguments)]
async fn materialize_partitions_impl(
lake: Arc<DataLakeConnection>,
view_factory: Arc<ViewFactory>,
view_set_name: &str,
view_instance_id: &str,
begin: DateTime<Utc>,
end: DateTime<Utc>,
partition_time_delta: TimeDelta,
logger: Arc<dyn Logger>,
) -> anyhow::Result<()> {
let view = view_factory
.make_view(view_set_name, view_instance_id)
.with_context(|| "making view")?;

let existing_partitions =
Arc::new(PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin, end).await?);

materialize_partition_range(
existing_partitions,
lake.clone(),
view,
begin,
end,
partition_time_delta,
logger,
)
.await?;
Ok(())
}

impl TableFunctionImpl for MaterializePartitionsTableFunction {
fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
// an alternative would be to use coerce & create_physical_expr
let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
return plan_err!("Missing first argument, expected view_set_name: String");
};
let Some(view_instance_id) = args.get(1).map(exp_to_string).transpose()? else {
return plan_err!("Missing 2nd argument, expected view_instance_id: String");
};
let Some(begin) = args.get(2).map(exp_to_timestamp).transpose()? else {
return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp");
};
let Some(end) = args.get(3).map(exp_to_timestamp).transpose()? else {
return plan_err!("Missing 4th argument, expected a UTC nanoseconds timestamp");
};
let Some(delta) = args.get(4).map(exp_to_i64).transpose()? else {
return plan_err!("Missing 5th argument, expected a number of seconds(i64)");
};

let lake = self.lake.clone();
let view_factory = self.view_factory.clone();

let spawner = move || {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let logger = Arc::new(LogSender::new(tx));
tokio::spawn(async move {
if let Err(e) = materialize_partitions_impl(
lake,
view_factory,
&view_set_name,
&view_instance_id,
begin,
end,
TimeDelta::seconds(delta),
logger,
)
.await
.with_context(|| "materialize_partitions_impl")
{
error!("{e:?}");
}
});
rx
};

Ok(Arc::new(LogStreamTableProvider {
log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
}))
}
}
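
Once registered (see query.rs below), the same operation can be driven with raw SQL through any FlightSQL connection. A sketch reusing the illustrative client and view identifiers from above, with timestamp literals in the ISO 8601 form the Python helper sends:

    # Equivalent raw SQL; identifiers and time range are illustrative.
    sql = """
    SELECT time, msg
    FROM materialize_partitions('log_entries', 'global',
        '2025-01-15T00:00:00+00:00', '2025-01-15T01:00:00+00:00', 3600)
    """
    for rb in client.query_stream(sql):
        for index, row in rb.to_pandas().iterrows():
            print(row["time"], row["msg"])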
2 changes: 2 additions & 0 deletions rust/analytics/src/lakehouse/mod.rs
@@ -16,6 +16,8 @@ pub mod list_partitions_table_function;
pub mod log_block_processor;
/// Materializable view of log entries accessible through datafusion
pub mod log_view;
/// Exposes materialize_partitions as a table function
pub mod materialize_partitions_table_function;
/// Merge consecutive parquet partitions into a single file
pub mod merge;
/// Specification for a view partition backed by a table in the postgresql metadata database.
8 changes: 8 additions & 0 deletions rust/analytics/src/lakehouse/query.rs
@@ -1,5 +1,6 @@
use super::{
answer::Answer, list_partitions_table_function::ListPartitionsTableFunction,
materialize_partitions_table_function::MaterializePartitionsTableFunction,
partition_cache::QueryPartitionProvider, property_get_function::PropertyGet,
retire_partitions_table_function::RetirePartitionsTableFunction, view::View,
view_factory::ViewFactory,
@@ -102,6 +103,13 @@ pub async fn make_session_context(
"retire_partitions",
Arc::new(RetirePartitionsTableFunction::new(lake.clone())),
);
ctx.register_udtf(
"materialize_partitions",
Arc::new(MaterializePartitionsTableFunction::new(
lake.clone(),
view_factory.clone(),
)),
);

ctx.register_udf(ScalarUDF::from(PropertyGet::new()));

