diff --git a/crates/rayexec_csv/src/read_csv.rs b/crates/rayexec_csv/src/read_csv.rs index 5ceef770b..4b8d31627 100644 --- a/crates/rayexec_csv/src/read_csv.rs +++ b/crates/rayexec_csv/src/read_csv.rs @@ -1,19 +1,25 @@ +use std::collections::HashMap; +use std::sync::Arc; + use futures::future::BoxFuture; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use rayexec_bullet::datatype::DataTypeId; -use rayexec_bullet::field::Schema; +use rayexec_bullet::scalar::OwnedScalarValue; use rayexec_error::{RayexecError, Result}; use rayexec_execution::database::DatabaseContext; -use rayexec_execution::functions::table::inputs::TableFunctionInputs; -use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction}; +use rayexec_execution::expr; +use rayexec_execution::functions::table::{ + try_location_and_access_config_from_args, + PlannedTableFunction, + ScanPlanner, + TableFunction, + TableFunctionImpl, + TableFunctionPlanner, +}; use rayexec_execution::functions::{FunctionInfo, Signature}; +use rayexec_execution::logical::statistics::StatisticsValue; use rayexec_execution::runtime::Runtime; -use rayexec_execution::storage::table_storage::DataTable; -use rayexec_io::location::{AccessConfig, FileLocation}; use rayexec_io::{FileProvider, FileSource}; -use rayexec_proto::packed::{PackedDecoder, PackedEncoder}; -use rayexec_proto::ProtoConv; -use serde::{Deserialize, Serialize}; use crate::datatable::SingleFileCsvDataTable; use crate::decoder::{CsvDecoder, DecoderState}; @@ -43,79 +49,33 @@ impl FunctionInfo for ReadCsv { } impl TableFunction for ReadCsv { - fn plan_and_initialize<'a>( - &self, - _context: &'a DatabaseContext, - args: TableFunctionInputs, - ) -> BoxFuture<'a, Result>> { - Box::pin(ReadCsvImpl::initialize(self.clone(), args)) - } - - fn decode_state(&self, state: &[u8]) -> Result> { - let state = ReadCsvState::decode(state)?; - Ok(Box::new(ReadCsvImpl { - func: self.clone(), - state, - })) + fn planner(&self) -> TableFunctionPlanner { + TableFunctionPlanner::Scan(self) } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -struct ReadCsvState { - location: FileLocation, - conf: AccessConfig, - csv_schema: CsvSchema, - dialect: DialectOptions, -} - -impl ReadCsvState { - fn encode(&self, buf: &mut Vec) -> Result<()> { - let mut packed = PackedEncoder::new(buf); - packed.encode_next(&self.location.to_proto()?)?; - packed.encode_next(&self.conf.to_proto()?)?; - packed.encode_next(&self.csv_schema.schema.to_proto()?)?; - packed.encode_next(&self.csv_schema.has_header)?; - packed.encode_next(&self.csv_schema.has_header)?; - packed.encode_next(&(self.dialect.delimiter as i32))?; - packed.encode_next(&(self.dialect.quote as i32))?; - Ok(()) - } - - fn decode(buf: &[u8]) -> Result { - let mut packed = PackedDecoder::new(buf); - let location = FileLocation::from_proto(packed.decode_next()?)?; - let conf = AccessConfig::from_proto(packed.decode_next()?)?; - let schema = Schema::from_proto(packed.decode_next()?)?; - let has_header: bool = packed.decode_next()?; - let delimiter: i32 = packed.decode_next()?; - let quote: i32 = packed.decode_next()?; - - Ok(ReadCsvState { - location, - conf, - csv_schema: CsvSchema { schema, has_header }, - dialect: DialectOptions { - delimiter: delimiter as u8, - quote: quote as u8, - }, - }) +impl ScanPlanner for ReadCsv { + fn plan<'a>( + &self, + context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> BoxFuture<'a, Result> { + Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed() } } -#[derive(Debug, Clone, PartialEq, Eq)] -struct ReadCsvImpl { - func: ReadCsv, - state: ReadCsvState, -} - -impl ReadCsvImpl { - async fn initialize( - func: ReadCsv, - args: TableFunctionInputs, - ) -> Result> { - let (location, conf) = args.try_location_and_access_config()?; +impl ReadCsv { + async fn plan_inner<'a>( + self: Self, + _context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> Result { + let (location, conf) = + try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?; - let mut source = func + let mut source = self .runtime .file_provider() .file_source(location.clone(), &conf)?; @@ -143,38 +103,23 @@ impl ReadCsvImpl { let completed = state.completed_records(); let csv_schema = CsvSchema::infer_from_records(completed)?; - Ok(Box::new(Self { - func, - state: ReadCsvState { - location, - conf, - dialect, - csv_schema, - }, - })) - } -} - -impl PlannedTableFunction2 for ReadCsvImpl { - fn table_function(&self) -> &dyn TableFunction { - &self.func - } - - fn encode_state(&self, state: &mut Vec) -> Result<()> { - self.state.encode(state) - } + let schema = csv_schema.schema.clone(); - fn schema(&self) -> Schema { - self.state.csv_schema.schema.clone() - } + let datatable = SingleFileCsvDataTable { + options: dialect, + csv_schema, + location, + conf, + runtime: self.runtime.clone(), + }; - fn datatable(&self) -> Result> { - Ok(Box::new(SingleFileCsvDataTable { - options: self.state.dialect, - csv_schema: self.state.csv_schema.clone(), - location: self.state.location.clone(), - conf: self.state.conf.clone(), - runtime: self.func.runtime.clone(), - })) + Ok(PlannedTableFunction { + function: Box::new(self), + positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(), + named_inputs, + function_impl: TableFunctionImpl::Scan(Arc::new(datatable)), + cardinality: StatisticsValue::Unknown, + schema, + }) } } diff --git a/crates/rayexec_execution/src/execution/operators/table_function.rs b/crates/rayexec_execution/src/execution/operators/table_function.rs index fcd4c9444..c3e4de288 100644 --- a/crates/rayexec_execution/src/execution/operators/table_function.rs +++ b/crates/rayexec_execution/src/execution/operators/table_function.rs @@ -20,13 +20,12 @@ use super::{ }; use crate::database::DatabaseContext; use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable}; -use crate::functions::table::scan::TableScanState; use crate::functions::table::{PlannedTableFunction, TableFunctionImpl}; use crate::proto::DatabaseProtoConv; -use crate::storage::table_storage::Projections; +use crate::storage::table_storage::{DataTableScan, Projections}; pub struct TableFunctionPartitionState { - scan_state: Box, + scan_state: Box, /// In progress pull we're working on. future: Option>>>, } diff --git a/crates/rayexec_execution/src/functions/table/builtin/refresh.rs b/crates/rayexec_execution/src/functions/table/builtin/refresh.rs index 7521bf3f2..fa750739d 100644 --- a/crates/rayexec_execution/src/functions/table/builtin/refresh.rs +++ b/crates/rayexec_execution/src/functions/table/builtin/refresh.rs @@ -9,10 +9,9 @@ use rayexec_error::{RayexecError, Result}; use crate::database::memory_catalog::MemoryCatalog; use crate::database::DatabaseContext; -use crate::functions::table::{PlannedTableFunction2, TableFunction, TableFunctionInputs}; +use crate::functions::table::{TableFunction, TableFunctionInputs, TableFunctionPlanner}; use crate::functions::{FunctionInfo, Signature}; use crate::storage::catalog_storage::CatalogStorage; -use crate::storage::table_storage::DataTable; pub trait RefreshOperation: Debug + Clone + Copy + PartialEq + Eq + Sync + Send + 'static { const NAME: &'static str; @@ -96,40 +95,7 @@ impl FunctionInfo for RefreshObjects { } impl TableFunction for RefreshObjects { - fn plan_and_initialize<'a>( - &self, - _context: &'a DatabaseContext, - _args: TableFunctionInputs, - ) -> BoxFuture<'a, Result>> { - unimplemented!() - } - - fn decode_state(&self, _state: &[u8]) -> Result> { - unimplemented!() - } -} - -#[derive(Debug, Clone)] -pub struct RefreshObjectsImpl { - func: RefreshObjects, - _state: Option, - _op: PhantomData, -} - -impl PlannedTableFunction2 for RefreshObjectsImpl { - fn encode_state(&self, _state: &mut Vec) -> Result<()> { - Ok(()) - } - - fn table_function(&self) -> &dyn TableFunction { - &self.func - } - - fn schema(&self) -> Schema { - O::schema() - } - - fn datatable(&self) -> Result> { + fn planner(&self) -> TableFunctionPlanner { unimplemented!() } } diff --git a/crates/rayexec_execution/src/functions/table/builtin/system.rs b/crates/rayexec_execution/src/functions/table/builtin/system.rs index 7d869d332..b585f67f2 100644 --- a/crates/rayexec_execution/src/functions/table/builtin/system.rs +++ b/crates/rayexec_execution/src/functions/table/builtin/system.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; @@ -7,9 +7,10 @@ use futures::future::BoxFuture; use parking_lot::Mutex; use rayexec_bullet::array::Array; use rayexec_bullet::batch::Batch; -use rayexec_bullet::datatype::DataType; +use rayexec_bullet::datatype::{DataType, DataTypeId}; use rayexec_bullet::executor::builder::{ArrayDataBuffer, GermanVarlenBuffer}; use rayexec_bullet::field::{Field, Schema}; +use rayexec_bullet::scalar::OwnedScalarValue; use rayexec_bullet::storage::GermanVarlenStorage; use rayexec_error::{OptionExt, RayexecError, Result}; @@ -17,8 +18,16 @@ use crate::database::catalog::CatalogTx; use crate::database::catalog_entry::{CatalogEntryInner, CatalogEntryType}; use crate::database::memory_catalog::MemoryCatalog; use crate::database::{AttachInfo, DatabaseContext}; -use crate::functions::table::{PlannedTableFunction2, TableFunction, TableFunctionInputs}; +use crate::expr; +use crate::functions::table::{ + PlannedTableFunction, + ScanPlanner, + TableFunction, + TableFunctionImpl, + TableFunctionPlanner, +}; use crate::functions::{FunctionInfo, Signature}; +use crate::logical::statistics::StatisticsValue; use crate::storage::table_storage::{ DataTable, DataTableScan, @@ -248,18 +257,35 @@ impl FunctionInfo for SystemFunction { } fn signatures(&self) -> &[Signature] { - unimplemented!() + &[Signature { + positional_args: &[], + variadic_arg: None, + return_type: DataTypeId::Any, + }] } } impl TableFunction for SystemFunction { - fn plan_and_initialize<'a>( + fn planner(&self) -> TableFunctionPlanner { + TableFunctionPlanner::Scan(&SystemFunctionPlanner:: { _f: PhantomData }) + } +} + +#[derive(Debug, Clone)] +pub struct SystemFunctionPlanner { + _f: PhantomData, +} + +impl ScanPlanner for SystemFunctionPlanner +where + F: SystemFunctionImpl, +{ + fn plan<'a>( &self, context: &'a DatabaseContext, - _args: TableFunctionInputs, - ) -> BoxFuture<'a, Result>> { - // TODO: Method on args returning an error if not empty. - + positional_inputs: Vec, + named_inputs: HashMap, + ) -> BoxFuture<'a, Result> { let databases = context .iter_databases() .map(|(name, database)| { @@ -271,55 +297,27 @@ impl TableFunction for SystemFunction { }) .collect(); - let function = *self; - Box::pin(async move { - Ok(Box::new(PlannedSystemFunction { - databases, - function, - }) as _) - }) - } - - fn decode_state(&self, _state: &[u8]) -> Result> { - Ok(Box::new(PlannedSystemFunction { - databases: Vec::new(), - function: *self, - })) + let planned = PlannedTableFunction { + function: Box::new(SystemFunction::::new()), + positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(), + named_inputs, + function_impl: TableFunctionImpl::Scan(Arc::new(SystemDataTable:: { + databases: Arc::new(Mutex::new(Some(databases))), + _f: PhantomData, + })), + cardinality: StatisticsValue::Unknown, + schema: F::schema(), + }; + + Box::pin(async move { Ok(planned) }) } } #[derive(Debug, Clone)] -pub struct PlannedSystemFunction { - databases: Vec<(String, Arc, Option)>, - function: SystemFunction, -} - -impl PlannedTableFunction2 for PlannedSystemFunction { - fn encode_state(&self, _state: &mut Vec) -> Result<()> { - Ok(()) - } - - fn table_function(&self) -> &dyn TableFunction { - &self.function - } - - fn schema(&self) -> Schema { - F::schema() - } - - fn datatable(&self) -> Result> { - Ok(Box::new(SystemDataTable { - databases: Mutex::new(Some(self.databases.clone().into_iter().collect())), - function: self.function, - })) - } -} - -#[derive(Debug)] struct SystemDataTable { #[allow(clippy::type_complexity)] // Temp - databases: Mutex, Option)>>>, - function: SystemFunction, + databases: Arc, Option)>>>>, + _f: PhantomData, } impl DataTable for SystemDataTable { @@ -335,9 +333,9 @@ impl DataTable for SystemDataTable { .ok_or_else(|| RayexecError::new("Scan called multiple times"))?; let mut scans: Vec> = vec![Box::new(ProjectedScan::new( - SystemDataTableScan { + SystemDataTableScan:: { databases, - _function: self.function, + _f: PhantomData, }, projections, )) as _]; @@ -351,7 +349,7 @@ impl DataTable for SystemDataTable { #[derive(Debug)] struct SystemDataTableScan { databases: VecDeque<(String, Arc, Option)>, - _function: SystemFunction, + _f: PhantomData, } impl DataTableScan for SystemDataTableScan { diff --git a/crates/rayexec_execution/src/functions/table/mod.rs b/crates/rayexec_execution/src/functions/table/mod.rs index e7cd55321..d6823890b 100644 --- a/crates/rayexec_execution/src/functions/table/mod.rs +++ b/crates/rayexec_execution/src/functions/table/mod.rs @@ -1,10 +1,10 @@ pub mod builtin; pub mod inout; pub mod inputs; -pub mod scan; use std::collections::HashMap; use std::fmt::Debug; +use std::sync::Arc; use dyn_clone::DynClone; use futures::future::BoxFuture; @@ -13,8 +13,10 @@ use inout::TableInOutFunction; use inputs::TableFunctionInputs; use rayexec_bullet::field::Schema; use rayexec_bullet::scalar::OwnedScalarValue; -use rayexec_error::Result; -use scan::TableScanFunction; +use rayexec_error::{RayexecError, Result}; +use rayexec_io::location::{AccessConfig, FileLocation}; +use rayexec_io::s3::credentials::AwsCredentials; +use rayexec_io::s3::S3Location; use super::FunctionInfo; use crate::database::DatabaseContext; @@ -146,7 +148,7 @@ impl Eq for PlannedTableFunction {} #[derive(Debug, Clone)] pub enum TableFunctionImpl { /// Table function that produces a table as its output. - Scan(Box), + Scan(Arc), /// A table function that accepts dynamic arguments and produces a table /// output. InOut(Box), @@ -205,3 +207,64 @@ impl Clone for Box { dyn_clone::clone_box(&**self) } } + +/// Try to get a file location and access config from the table args. +// TODO: Secrets provider that we pass in allowing us to get creds from some +// secrets store. +pub fn try_location_and_access_config_from_args( + func: &impl TableFunction, + positional: &[OwnedScalarValue], + named: &HashMap, +) -> Result<(FileLocation, AccessConfig)> { + let loc = match positional.first() { + Some(loc) => { + let loc = loc.try_as_str()?; + FileLocation::parse(loc) + } + None => { + return Err(RayexecError::new(format!( + "Expected at least one position argument for function {}", + func.name(), + ))) + } + }; + + let conf = match &loc { + FileLocation::Url(url) => { + if S3Location::is_s3_location(url) { + let key_id = try_get_named(func, "key_id", named)? + .try_as_str()? + .to_string(); + let secret = try_get_named(func, "secret", named)? + .try_as_str()? + .to_string(); + let region = try_get_named(func, "region", named)? + .try_as_str()? + .to_string(); + + AccessConfig::S3 { + credentials: AwsCredentials { key_id, secret }, + region, + } + } else { + AccessConfig::None + } + } + FileLocation::Path(_) => AccessConfig::None, + }; + + Ok((loc, conf)) +} + +pub fn try_get_named<'a>( + func: &impl TableFunction, + name: &str, + named: &'a HashMap, +) -> Result<&'a OwnedScalarValue> { + named.get(name).ok_or_else(|| { + RayexecError::new(format!( + "Expected named argument '{name}' for function {}", + func.name() + )) + }) +} diff --git a/crates/rayexec_execution/src/functions/table/scan.rs b/crates/rayexec_execution/src/functions/table/scan.rs deleted file mode 100644 index 122cf5bf5..000000000 --- a/crates/rayexec_execution/src/functions/table/scan.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::fmt::Debug; - -use dyn_clone::DynClone; -use futures::future::BoxFuture; -use rayexec_bullet::batch::Batch; -use rayexec_error::Result; - -use crate::storage::table_storage::Projections; - -pub trait TableScanFunction: Debug + Sync + Send + DynClone { - fn scan( - &self, - projections: Projections, - num_partitions: usize, - ) -> Result>>; -} - -pub trait TableScanState: Debug + Sync + Send { - fn pull(&mut self) -> BoxFuture<'_, Result>>; -} - -impl Clone for Box { - fn clone(&self) -> Self { - dyn_clone::clone_box(&**self) - } -} - -/// Helper for wrapping an unprojected scan with a projections list to produce -/// projected batches. -/// -/// This is inefficient compared to handling the projection in the scan itself -/// since this projects a batch after it's already been read. -#[derive(Debug)] -pub struct ProjectedTableScanState { - pub projections: Projections, - pub scan_state: S, -} - -impl ProjectedTableScanState { - pub fn new(scan_state: S, projections: Projections) -> Self { - ProjectedTableScanState { - projections, - scan_state, - } - } - - async fn pull_inner(&mut self) -> Result> { - let batch = match self.scan_state.pull().await? { - Some(batch) => batch, - None => return Ok(None), - }; - - match self.projections.column_indices.as_ref() { - Some(indices) => { - let batch = batch.project(indices); - Ok(Some(batch)) - } - None => Ok(Some(batch)), - } - } -} - -impl TableScanState for ProjectedTableScanState { - fn pull(&mut self) -> BoxFuture<'_, Result>> { - Box::pin(async { self.pull_inner().await }) - } -}