From 7a8a5aa7e33744d855812cd1d5bf5605775d5280 Mon Sep 17 00:00:00 2001 From: Sean Smith Date: Sun, 15 Dec 2024 15:58:46 -0600 Subject: [PATCH] read_parquet --- .../src/functions/read_parquet.rs | 155 ++++++------------ 1 file changed, 49 insertions(+), 106 deletions(-) diff --git a/crates/rayexec_parquet/src/functions/read_parquet.rs b/crates/rayexec_parquet/src/functions/read_parquet.rs index e0a019d09..55a598f3a 100644 --- a/crates/rayexec_parquet/src/functions/read_parquet.rs +++ b/crates/rayexec_parquet/src/functions/read_parquet.rs @@ -1,21 +1,25 @@ +use std::collections::HashMap; use std::sync::Arc; -use bytes::Bytes; use futures::future::BoxFuture; +use futures::FutureExt; use rayexec_bullet::datatype::DataTypeId; -use rayexec_bullet::field::Schema; +use rayexec_bullet::scalar::OwnedScalarValue; use rayexec_error::Result; use rayexec_execution::database::DatabaseContext; -use rayexec_execution::functions::table::inputs::TableFunctionInputs; -use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction}; +use rayexec_execution::expr; +use rayexec_execution::functions::table::{ + try_location_and_access_config_from_args, + PlannedTableFunction, + ScanPlanner, + TableFunction, + TableFunctionImpl, + TableFunctionPlanner, +}; use rayexec_execution::functions::{FunctionInfo, Signature}; use rayexec_execution::logical::statistics::StatisticsValue; use rayexec_execution::runtime::Runtime; -use rayexec_execution::storage::table_storage::DataTable; -use rayexec_io::location::{AccessConfig, FileLocation}; use rayexec_io::FileProvider; -use rayexec_proto::packed::{PackedDecoder, PackedEncoder}; -use rayexec_proto::ProtoConv; use super::datatable::RowGroupPartitionedDataTable; use crate::metadata::Metadata; @@ -45,71 +49,33 @@ impl FunctionInfo for ReadParquet { } impl TableFunction for ReadParquet { - fn plan_and_initialize<'a>( - &self, - _context: &'a DatabaseContext, - args: TableFunctionInputs, - ) -> BoxFuture<'a, Result>> { - Box::pin(ReadParquetImpl::initialize(self.clone(), args)) - } - - fn decode_state(&self, state: &[u8]) -> Result> { - Ok(Box::new(ReadParquetImpl { - func: self.clone(), - state: ReadParquetState::decode(state)?, - })) + fn planner(&self) -> TableFunctionPlanner { + TableFunctionPlanner::Scan(self) } } -#[derive(Debug, Clone)] -struct ReadParquetState { - location: FileLocation, - conf: AccessConfig, - metadata: Arc, - schema: Schema, -} - -impl ReadParquetState { - fn encode(&self, buf: &mut Vec) -> Result<()> { - let mut packed = PackedEncoder::new(buf); - packed.encode_next(&self.location.to_proto()?)?; - packed.encode_next(&self.conf.to_proto()?)?; - packed.encode_next(&self.metadata.metadata_buffer)?; - packed.encode_next(&self.schema.to_proto()?)?; - Ok(()) - } - - fn decode(buf: &[u8]) -> Result { - let mut packed = PackedDecoder::new(buf); - let location = FileLocation::from_proto(packed.decode_next()?)?; - let conf = AccessConfig::from_proto(packed.decode_next()?)?; - let metadata_buffer: Bytes = packed.decode_next()?; - let schema = Schema::from_proto(packed.decode_next()?)?; - - let metadata = Arc::new(Metadata::try_from_buffer(metadata_buffer)?); - - Ok(ReadParquetState { - location, - conf, - schema, - metadata, - }) +impl ScanPlanner for ReadParquet { + fn plan<'a>( + &self, + context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> BoxFuture<'a, Result> { + Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed() } } -#[derive(Debug, Clone)] -pub struct ReadParquetImpl { - func: ReadParquet, - state: ReadParquetState, -} +impl ReadParquet { + async fn plan_inner<'a>( + self: Self, + _context: &'a DatabaseContext, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> Result { + let (location, conf) = + try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?; -impl ReadParquetImpl { - async fn initialize( - func: ReadParquet, - args: TableFunctionInputs, - ) -> Result> { - let (location, conf) = args.try_location_and_access_config()?; - let mut source = func + let mut source = self .runtime .file_provider() .file_source(location.clone(), &conf)?; @@ -119,51 +85,28 @@ impl ReadParquetImpl { let metadata = Metadata::new_from_source(source.as_mut(), size).await?; let schema = from_parquet_schema(metadata.decoded_metadata.file_metadata().schema_descr())?; - Ok(Box::new(Self { - func, - state: ReadParquetState { - location, - conf, - metadata: Arc::new(metadata), - schema, - }, - })) - } -} - -impl PlannedTableFunction2 for ReadParquetImpl { - fn table_function(&self) -> &dyn TableFunction { - &self.func - } - - fn schema(&self) -> Schema { - self.state.schema.clone() - } - - fn encode_state(&self, state: &mut Vec) -> Result<()> { - self.state.encode(state) - } - - fn cardinality(&self) -> StatisticsValue { - let num_rows = self - .state - .metadata + let num_rows = metadata .decoded_metadata .row_groups() .iter() .map(|g| g.num_rows()) .sum::() as usize; - StatisticsValue::Exact(num_rows) - } - - fn datatable(&self) -> Result> { - Ok(Box::new(RowGroupPartitionedDataTable { - metadata: self.state.metadata.clone(), - schema: self.state.schema.clone(), - location: self.state.location.clone(), - conf: self.state.conf.clone(), - runtime: self.func.runtime.clone(), - })) + let datatable = RowGroupPartitionedDataTable { + metadata: Arc::new(metadata), + schema: schema.clone(), + location, + conf, + runtime: self.runtime.clone(), + }; + + Ok(PlannedTableFunction { + function: Box::new(self), + positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(), + named_inputs, + function_impl: TableFunctionImpl::Scan(Arc::new(datatable)), + cardinality: StatisticsValue::Exact(num_rows), + schema, + }) } }