diff --git a/python/micromegas/micromegas/flightsql/client.py b/python/micromegas/micromegas/flightsql/client.py
index 7adfcc5..0c57a53 100644
--- a/python/micromegas/micromegas/flightsql/client.py
+++ b/python/micromegas/micromegas/flightsql/client.py
@@ -235,4 +235,21 @@ def retire_partitions(self, view_set_name, view_instance_id, begin, end):
         )
         for rb in self.query_stream(sql):
             for index, row in rb.to_pandas().iterrows():
-                print(row['time'], row['msg'])
+                print(row["time"], row["msg"])
+
+    def materialize_partitions(
+        self, view_set_name, view_instance_id, begin, end, partition_delta_seconds
+    ):
+        sql = """
+            SELECT time, msg
+            FROM materialize_partitions('{view_set_name}', '{view_instance_id}', '{begin}', '{end}', {partition_delta_seconds})
+        """.format(
+            view_set_name=view_set_name,
+            view_instance_id=view_instance_id,
+            begin=begin.isoformat(),
+            end=end.isoformat(),
+            partition_delta_seconds=partition_delta_seconds,
+        )
+        for rb in self.query_stream(sql):
+            for index, row in rb.to_pandas().iterrows():
+                print(row["time"], row["msg"])
diff --git a/rust/analytics/src/dfext/expressions.rs b/rust/analytics/src/dfext/expressions.rs
index 9c6c060..b24d737 100644
--- a/rust/analytics/src/dfext/expressions.rs
+++ b/rust/analytics/src/dfext/expressions.rs
@@ -22,6 +22,15 @@ pub fn exp_to_string(expr: &Expr) -> datafusion::error::Result<String> {
     }
 }
 
+pub fn exp_to_i64(expr: &Expr) -> datafusion::error::Result<i64> {
+    match simplify_exp(expr)? {
+        Expr::Literal(ScalarValue::Int64(Some(value))) => Ok(value),
+        other => {
+            plan_err!("can't convert {other:?} to i64")
+        }
+    }
+}
+
 pub fn exp_to_timestamp(expr: &Expr) -> datafusion::error::Result<DateTime<Utc>> {
     match simplify_exp(expr)? {
         Expr::Literal(ScalarValue::Utf8(Some(string))) => {
diff --git a/rust/analytics/src/lakehouse/batch_update.rs b/rust/analytics/src/lakehouse/batch_update.rs
index 6d80958..34f60df 100644
--- a/rust/analytics/src/lakehouse/batch_update.rs
+++ b/rust/analytics/src/lakehouse/batch_update.rs
@@ -2,7 +2,7 @@ use super::{
     merge::create_merged_partition, partition_cache::PartitionCache,
     partition_source_data::hash_to_object_count, view::View,
 };
-use crate::response_writer::{Logger, ResponseWriter};
+use crate::response_writer::Logger;
 use anyhow::{Context, Result};
 use chrono::{DateTime, TimeDelta, Utc};
 use micromegas_ingestion::data_lake_connection::DataLakeConnection;
@@ -114,7 +114,7 @@ async fn materialize_partition(
     begin_insert: DateTime<Utc>,
     end_insert: DateTime<Utc>,
     view: Arc<dyn View>,
-    writer: Arc<ResponseWriter>,
+    logger: Arc<dyn Logger>,
 ) -> Result<()> {
     let view_set_name = view.get_view_set_name();
     let partition_spec = view
@@ -135,19 +135,19 @@ async fn materialize_partition(
         &view_instance_id,
         &view.get_file_schema_hash(),
         &partition_spec.get_source_data_hash(),
-        writer.clone(),
+        logger.clone(),
     )
     .await?;
 
     match strategy {
         PartitionCreationStrategy::CreateFromSource => {
             partition_spec
-                .write(lake, writer)
+                .write(lake, logger)
                 .await
                 .with_context(|| "writing partition")?;
         }
         PartitionCreationStrategy::MergeExisting => {
-            create_merged_partition(lake, view, begin_insert, end_insert, writer).await?;
+            create_merged_partition(lake, view, begin_insert, end_insert, logger).await?;
         }
         PartitionCreationStrategy::Abort => {}
     }
@@ -162,7 +162,7 @@ pub async fn materialize_partition_range(
     begin_range: DateTime<Utc>,
     end_range: DateTime<Utc>,
     partition_time_delta: TimeDelta,
-    writer: Arc<ResponseWriter>,
+    logger: Arc<dyn Logger>,
 ) -> Result<()> {
     let mut begin_part = begin_range;
     let mut end_part = begin_part + partition_time_delta;
@@ -173,7 +173,7 @@ pub async fn materialize_partition_range(
             begin_part,
             end_part,
             view.clone(),
-            writer.clone(),
+            logger.clone(),
         )
         .await
         .with_context(|| "materialize_partition")?;
diff --git a/rust/analytics/src/lakehouse/materialize_partitions_table_function.rs b/rust/analytics/src/lakehouse/materialize_partitions_table_function.rs
new file mode 100644
index 0000000..205c121
--- /dev/null
+++ b/rust/analytics/src/lakehouse/materialize_partitions_table_function.rs
@@ -0,0 +1,115 @@
+use super::batch_update::materialize_partition_range;
+use super::partition_cache::PartitionCache;
+use super::view_factory::ViewFactory;
+use crate::dfext::expressions::exp_to_i64;
+use crate::dfext::expressions::exp_to_string;
+use crate::dfext::expressions::exp_to_timestamp;
+use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
+use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
+use crate::response_writer::LogSender;
+use crate::response_writer::Logger;
+use anyhow::Context;
+use chrono::DateTime;
+use chrono::TimeDelta;
+use chrono::Utc;
+use datafusion::catalog::TableFunctionImpl;
+use datafusion::catalog::TableProvider;
+use datafusion::common::plan_err;
+use datafusion::prelude::Expr;
+use micromegas_ingestion::data_lake_connection::DataLakeConnection;
+use micromegas_tracing::error;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct MaterializePartitionsTableFunction {
+    lake: Arc<DataLakeConnection>,
+    view_factory: Arc<ViewFactory>,
+}
+
+impl MaterializePartitionsTableFunction {
+    pub fn new(lake: Arc<DataLakeConnection>, view_factory: Arc<ViewFactory>) -> Self {
+        Self { lake, view_factory }
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn materialize_partitions_impl(
+    lake: Arc<DataLakeConnection>,
+    view_factory: Arc<ViewFactory>,
+    view_set_name: &str,
+    view_instance_id: &str,
+    begin: DateTime<Utc>,
+    end: DateTime<Utc>,
+    partition_time_delta: TimeDelta,
+    logger: Arc<dyn Logger>,
+) -> anyhow::Result<()> {
+    let view = view_factory
+        .make_view(view_set_name, view_instance_id)
+        .with_context(|| "making view")?;
+
+    let existing_partitions =
+        Arc::new(PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin, end).await?);
+
+    materialize_partition_range(
+        existing_partitions,
+        lake.clone(),
+        view,
+        begin,
+        end,
+        partition_time_delta,
+        logger,
+    )
+    .await?;
+    Ok(())
+}
+
+impl TableFunctionImpl for MaterializePartitionsTableFunction {
+    fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
+        // an alternative would be to use coerce & create_physical_expr
+        let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
+            return plan_err!("Missing first argument, expected view_set_name: String");
+        };
+        let Some(view_instance_id) = args.get(1).map(exp_to_string).transpose()? else {
+            return plan_err!("Missing 2nd argument, expected view_instance_id: String");
+        };
+        let Some(begin) = args.get(2).map(exp_to_timestamp).transpose()? else {
+            return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp");
+        };
+        let Some(end) = args.get(3).map(exp_to_timestamp).transpose()? else {
+            return plan_err!("Missing 4th argument, expected a UTC nanoseconds timestamp");
+        };
+        let Some(delta) = args.get(4).map(exp_to_i64).transpose()? else {
+            return plan_err!("Missing 5th argument, expected a number of seconds(i64)");
+        };
+
+        let lake = self.lake.clone();
+        let view_factory = self.view_factory.clone();
+
+        let spawner = move || {
+            let (tx, rx) = tokio::sync::mpsc::channel(100);
+            let logger = Arc::new(LogSender::new(tx));
+            tokio::spawn(async move {
+                if let Err(e) = materialize_partitions_impl(
+                    lake,
+                    view_factory,
+                    &view_set_name,
+                    &view_instance_id,
+                    begin,
+                    end,
+                    TimeDelta::seconds(delta),
+                    logger,
+                )
+                .await
+                .with_context(|| "materialize_partitions_impl")
+                {
+                    error!("{e:?}");
+                }
+            });
+            rx
+        };
+
+        Ok(Arc::new(LogStreamTableProvider {
+            log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
+        }))
+    }
+}
diff --git a/rust/analytics/src/lakehouse/mod.rs b/rust/analytics/src/lakehouse/mod.rs
index 4c083a0..0864ee0 100644
--- a/rust/analytics/src/lakehouse/mod.rs
+++ b/rust/analytics/src/lakehouse/mod.rs
@@ -16,6 +16,8 @@ pub mod list_partitions_table_function;
 pub mod log_block_processor;
 /// Materializable view of log entries accessible through datafusion
 pub mod log_view;
+/// Exposes materialize_partitions as a table function
+pub mod materialize_partitions_table_function;
 /// Merge consecutive parquet partitions into a single file
 pub mod merge;
 /// Specification for a view partition backed by a table in the postgresql metadata database.
diff --git a/rust/analytics/src/lakehouse/query.rs b/rust/analytics/src/lakehouse/query.rs
index 14e1630..5683209 100644
--- a/rust/analytics/src/lakehouse/query.rs
+++ b/rust/analytics/src/lakehouse/query.rs
@@ -1,5 +1,6 @@
 use super::{
     answer::Answer, list_partitions_table_function::ListPartitionsTableFunction,
+    materialize_partitions_table_function::MaterializePartitionsTableFunction,
     partition_cache::QueryPartitionProvider, property_get_function::PropertyGet,
     retire_partitions_table_function::RetirePartitionsTableFunction, view::View,
     view_factory::ViewFactory,
@@ -102,6 +103,13 @@ pub async fn make_session_context(
         "retire_partitions",
         Arc::new(RetirePartitionsTableFunction::new(lake.clone())),
     );
+    ctx.register_udtf(
+        "materialize_partitions",
+        Arc::new(MaterializePartitionsTableFunction::new(
+            lake.clone(),
+            view_factory.clone(),
+        )),
+    );
     ctx.register_udf(ScalarUDF::from(PropertyGet::new()));
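
Below is a minimal usage sketch of the new client method, not part of the diff itself. It assumes an already-connected micromegas FlightSQL client named `client`, and uses the `log_entries` view set with the `global` instance as an illustrative target; any view set registered in the ViewFactory would work the same way.

    # Sketch: materialize one-day partitions of the global log view for the last week.
    # `client` is assumed to be a connected micromegas.flightsql.client.FlightSQLClient.
    import datetime

    end = datetime.datetime.now(datetime.timezone.utc)
    begin = end - datetime.timedelta(days=7)
    # 86400 is partition_delta_seconds (one partition per day); the server streams back
    # progress log entries, which the method prints as they arrive.
    client.materialize_partitions("log_entries", "global", begin, end, 86400)

The same operation can be issued directly through the new table function, e.g. SELECT time, msg FROM materialize_partitions('log_entries', 'global', '2025-01-01T00:00:00Z', '2025-01-08T00:00:00Z', 86400), with the timestamps passed as ISO-8601 strings as in the retire_partitions function.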