Skip to content

Commit

Permalink
read_parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 15, 2024
1 parent 1402b0b commit 7a8a5aa
Showing 1 changed file with 49 additions and 106 deletions.
155 changes: 49 additions & 106 deletions crates/rayexec_parquet/src/functions/read_parquet.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::field::Schema;
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::Result;
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_io::FileProvider;
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;

use super::datatable::RowGroupPartitionedDataTable;
use crate::metadata::Metadata;
Expand Down Expand Up @@ -45,71 +49,33 @@ impl<R: Runtime> FunctionInfo for ReadParquet<R> {
}

impl<R: Runtime> TableFunction for ReadParquet<R> {
fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction2>>> {
Box::pin(ReadParquetImpl::initialize(self.clone(), args))
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction2>> {
Ok(Box::new(ReadParquetImpl {
func: self.clone(),
state: ReadParquetState::decode(state)?,
}))
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone)]
struct ReadParquetState {
location: FileLocation,
conf: AccessConfig,
metadata: Arc<Metadata>,
schema: Schema,
}

impl ReadParquetState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.metadata.metadata_buffer)?;
packed.encode_next(&self.schema.to_proto()?)?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let metadata_buffer: Bytes = packed.decode_next()?;
let schema = Schema::from_proto(packed.decode_next()?)?;

let metadata = Arc::new(Metadata::try_from_buffer(metadata_buffer)?);

Ok(ReadParquetState {
location,
conf,
schema,
metadata,
})
impl<R: Runtime> ScanPlanner for ReadParquet<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

#[derive(Debug, Clone)]
pub struct ReadParquetImpl<R: Runtime> {
func: ReadParquet<R>,
state: ReadParquetState,
}
impl<R: Runtime> ReadParquet<R> {
async fn plan_inner<'a>(
self: Self,
_context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

impl<R: Runtime> ReadParquetImpl<R> {
async fn initialize(
func: ReadParquet<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction2>> {
let (location, conf) = args.try_location_and_access_config()?;
let mut source = func
let mut source = self
.runtime
.file_provider()
.file_source(location.clone(), &conf)?;
Expand All @@ -119,51 +85,28 @@ impl<R: Runtime> ReadParquetImpl<R> {
let metadata = Metadata::new_from_source(source.as_mut(), size).await?;
let schema = from_parquet_schema(metadata.decoded_metadata.file_metadata().schema_descr())?;

Ok(Box::new(Self {
func,
state: ReadParquetState {
location,
conf,
metadata: Arc::new(metadata),
schema,
},
}))
}
}

impl<R: Runtime> PlannedTableFunction2 for ReadParquetImpl<R> {
fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn schema(&self) -> Schema {
self.state.schema.clone()
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}

fn cardinality(&self) -> StatisticsValue<usize> {
let num_rows = self
.state
.metadata
let num_rows = metadata
.decoded_metadata
.row_groups()
.iter()
.map(|g| g.num_rows())
.sum::<i64>() as usize;

StatisticsValue::Exact(num_rows)
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
Ok(Box::new(RowGroupPartitionedDataTable {
metadata: self.state.metadata.clone(),
schema: self.state.schema.clone(),
location: self.state.location.clone(),
conf: self.state.conf.clone(),
runtime: self.func.runtime.clone(),
}))
let datatable = RowGroupPartitionedDataTable {
metadata: Arc::new(metadata),
schema: schema.clone(),
location,
conf,
runtime: self.runtime.clone(),
};

Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(datatable)),
cardinality: StatisticsValue::Exact(num_rows),
schema,
})
}
}

0 comments on commit 7a8a5aa

Please sign in to comment.