diff --git a/crates/rayexec_execution/src/functions/table/mod.rs b/crates/rayexec_execution/src/functions/table/mod.rs index d6823890b..b5b15e49b 100644 --- a/crates/rayexec_execution/src/functions/table/mod.rs +++ b/crates/rayexec_execution/src/functions/table/mod.rs @@ -268,3 +268,16 @@ pub fn try_get_named<'a>( )) }) } + +pub fn try_get_positional<'a>( + func: &impl TableFunction, + pos: usize, + positional: &'a [OwnedScalarValue], +) -> Result<&'a OwnedScalarValue> { + positional.get(pos).ok_or_else(|| { + RayexecError::new(format!( + "Expected argument at position {pos} for function {}", + func.name() + )) + }) +} diff --git a/crates/rayexec_iceberg/src/read_iceberg.rs b/crates/rayexec_iceberg/src/read_iceberg.rs index 44d53e95e..f7283c0ff 100644 --- a/crates/rayexec_iceberg/src/read_iceberg.rs +++ b/crates/rayexec_iceberg/src/read_iceberg.rs @@ -1,16 +1,24 @@ +use std::collections::HashMap; use std::sync::Arc; use futures::future::BoxFuture; +use futures::FutureExt; use rayexec_bullet::datatype::DataTypeId; -use rayexec_bullet::field::Schema; -use rayexec_error::{not_implemented, RayexecError, Result}; +use rayexec_bullet::scalar::OwnedScalarValue; +use rayexec_error::Result; use rayexec_execution::database::DatabaseContext; -use rayexec_execution::functions::table::inputs::TableFunctionInputs; -use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction}; +use rayexec_execution::expr; +use rayexec_execution::functions::table::{ + try_location_and_access_config_from_args, + PlannedTableFunction, + ScanPlanner, + TableFunction, + TableFunctionImpl, + TableFunctionPlanner, +}; use rayexec_execution::functions::{FunctionInfo, Signature}; +use rayexec_execution::logical::statistics::StatisticsValue; use rayexec_execution::runtime::Runtime; -use rayexec_execution::storage::table_storage::DataTable; -use rayexec_io::location::{AccessConfig, FileLocation}; use crate::datatable::IcebergDataTable; use crate::table::Table; @@ -39,83 +47,46 @@ impl FunctionInfo for ReadIceberg { } impl TableFunction for ReadIceberg { - fn plan_and_initialize<'a>( - &self, - _context: &'a DatabaseContext, - args: TableFunctionInputs, - ) -> BoxFuture<'a, Result>> { - let func = self.clone(); - Box::pin(async move { ReadIcebergImpl::initialize(func, args).await }) - } - - fn decode_state(&self, _state: &[u8]) -> Result> { - // TODO - not_implemented!("decode iceberg state") + fn planner(&self) -> TableFunctionPlanner { + TableFunctionPlanner::Scan(self) } } -#[derive(Debug, Clone)] -struct ReadIcebergState { - _location: FileLocation, - _conf: AccessConfig, - schema: Schema, - table: Option>, // Populate on re-init if needed. -} - -#[derive(Debug, Clone)] -pub struct ReadIcebergImpl { - func: ReadIceberg, - state: ReadIcebergState, +impl ScanPlanner for ReadIceberg { + fn plan<'a>( + &self, + context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> BoxFuture<'a, Result> { + Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed() + } } -impl ReadIcebergImpl { - async fn initialize( - func: ReadIceberg, - args: TableFunctionInputs, - ) -> Result> { - let (location, conf) = args.try_location_and_access_config()?; - let provider = func.runtime.file_provider(); +impl ReadIceberg { + async fn plan_inner<'a>( + self: Self, + _context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> Result { + let (location, conf) = + try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?; + let provider = self.runtime.file_provider(); // TODO: Fetch stats, use during planning. let table = Table::load(location.clone(), provider, conf.clone()).await?; let schema = table.schema()?; - Ok(Box::new(ReadIcebergImpl { - func, - state: ReadIcebergState { - _location: location, - _conf: conf, - schema, - table: Some(Arc::new(table)), - }, - })) - } -} - -impl PlannedTableFunction2 for ReadIcebergImpl { - fn reinitialize(&self) -> BoxFuture> { - // TODO: See delta - Box::pin(async move { not_implemented!("reinit iceberg state") }) - } - - fn table_function(&self) -> &dyn TableFunction { - &self.func - } - - fn encode_state(&self, _state: &mut Vec) -> Result<()> { - not_implemented!("encode iceberg state") - } - - fn schema(&self) -> Schema { - self.state.schema.clone() - } - - fn datatable(&self) -> Result> { - let table = match self.state.table.as_ref() { - Some(table) => table.clone(), - None => return Err(RayexecError::new("Iceberg table not initialized")), - }; - - Ok(Box::new(IcebergDataTable { table })) + Ok(PlannedTableFunction { + function: Box::new(self), + positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(), + named_inputs, + function_impl: TableFunctionImpl::Scan(Arc::new(IcebergDataTable { + table: Arc::new(table), // TODO: Arc Arc + })), + cardinality: StatisticsValue::Unknown, + schema, + }) } } diff --git a/crates/rayexec_unity_catalog/src/functions.rs b/crates/rayexec_unity_catalog/src/functions.rs index f3669825c..0dc25fcea 100644 --- a/crates/rayexec_unity_catalog/src/functions.rs +++ b/crates/rayexec_unity_catalog/src/functions.rs @@ -1,18 +1,29 @@ +use std::collections::HashMap; use std::fmt::{self, Debug}; use std::marker::PhantomData; +use std::sync::Arc; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use rayexec_bullet::array::Array; use rayexec_bullet::batch::Batch; use rayexec_bullet::datatype::{DataType, DataTypeId}; use rayexec_bullet::field::{Field, Schema}; -use rayexec_error::{not_implemented, Result}; +use rayexec_bullet::scalar::OwnedScalarValue; +use rayexec_error::Result; use rayexec_execution::database::DatabaseContext; -use rayexec_execution::functions::table::inputs::TableFunctionInputs; -use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction}; +use rayexec_execution::expr; +use rayexec_execution::functions::table::{ + try_get_positional, + PlannedTableFunction, + ScanPlanner, + TableFunction, + TableFunctionImpl, + TableFunctionPlanner, +}; use rayexec_execution::functions::{FunctionInfo, Signature}; +use rayexec_execution::logical::statistics::StatisticsValue; use rayexec_execution::runtime::Runtime; use rayexec_execution::storage::table_storage::{ DataTable, @@ -43,9 +54,10 @@ pub trait UnityObjectsOperation: /// Create the connection state. fn create_connection_state( - runtime: R, + info: UnityObjects, context: &DatabaseContext, - args: TableFunctionInputs, + positional_args: Vec, + named_args: HashMap, ) -> BoxFuture<'_, Result>; /// Create a stream state from the connection state. @@ -95,15 +107,16 @@ impl UnityObjectsOperation for ListSchemasOperation { } fn create_connection_state( - runtime: R, + info: UnityObjects, _context: &DatabaseContext, - args: TableFunctionInputs, + positional_args: Vec, + _named_args: HashMap, ) -> BoxFuture<'_, Result> { Box::pin(async move { - let endpoint = args.try_get_position(0)?.try_as_str()?; - let catalog = args.try_get_position(1)?.try_as_str()?; + let endpoint = try_get_positional(&info, 0, &positional_args)?.try_as_str()?; + let catalog = try_get_positional(&info, 1, &positional_args)?.try_as_str()?; - let conn = UnityCatalogConnection::connect(runtime, endpoint, catalog).await?; + let conn = UnityCatalogConnection::connect(info.runtime, endpoint, catalog).await?; Ok(ListSchemasConnectionState { conn }) }) @@ -177,16 +190,17 @@ impl UnityObjectsOperation for ListTablesOperation { } fn create_connection_state( - runtime: R, + info: UnityObjects, _context: &DatabaseContext, - args: TableFunctionInputs, + positional_args: Vec, + _named_args: HashMap, ) -> BoxFuture<'_, Result> { Box::pin(async move { - let endpoint = args.try_get_position(0)?.try_as_str()?; - let catalog = args.try_get_position(1)?.try_as_str()?; - let schema = args.try_get_position(2)?.try_as_str()?; + let endpoint = try_get_positional(&info, 0, &positional_args)?.try_as_str()?; + let catalog = try_get_positional(&info, 1, &positional_args)?.try_as_str()?; + let schema = try_get_positional(&info, 2, &positional_args)?.try_as_str()?; - let conn = UnityCatalogConnection::connect(runtime, endpoint, catalog).await?; + let conn = UnityCatalogConnection::connect(info.runtime, endpoint, catalog).await?; Ok(ListTablesConnectionState { conn, @@ -262,48 +276,48 @@ impl> FunctionInfo for UnityObjects> TableFunction for UnityObjects { - fn plan_and_initialize<'a>( - &self, - context: &'a DatabaseContext, - args: TableFunctionInputs, - ) -> BoxFuture<'a, Result>> { - let func = self.clone(); - let runtime = self.runtime.clone(); - - Box::pin(async move { - let state = O::create_connection_state(runtime, context, args).await?; - Ok(Box::new(UnityObjectsImpl:: { func, state }) as _) - }) + fn planner(&self) -> TableFunctionPlanner { + TableFunctionPlanner::Scan(self) } - - fn decode_state(&self, _state: &[u8]) -> Result> { - not_implemented!("decode state for unity operation") - } -} - -#[derive(Debug, Clone)] -pub struct UnityObjectsImpl> { - func: UnityObjects, - state: O::ConnectionState, } -impl> PlannedTableFunction2 for UnityObjectsImpl { - fn table_function(&self) -> &dyn TableFunction { - &self.func - } - - fn schema(&self) -> Schema { - O::schema() - } - - fn encode_state(&self, _state: &mut Vec) -> Result<()> { - not_implemented!("decode state for unity operation") +impl> ScanPlanner for UnityObjects { + fn plan<'a>( + &self, + context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> BoxFuture<'a, Result> { + Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed() } +} - fn datatable(&self) -> Result> { - Ok(Box::new(UnityObjectsDataTable:: { - state: self.state.clone(), - })) +impl> UnityObjects { + async fn plan_inner<'a>( + self: Self, + context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> Result { + // TODO: Remove clones. + let state = O::create_connection_state( + self.clone(), + context, + positional_inputs.clone(), + named_inputs.clone(), + ) + .await?; + + Ok(PlannedTableFunction { + function: Box::new(self), + positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(), + named_inputs, + function_impl: TableFunctionImpl::Scan(Arc::new(UnityObjectsDataTable:: { + state, + })), + cardinality: StatisticsValue::Unknown, + schema: O::schema(), + }) } }