Skip to content

Commit

Permalink
read_csv
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 15, 2024
1 parent e796537 commit e631142
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 271 deletions.
157 changes: 51 additions & 106 deletions crates/rayexec_csv/src/read_csv.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::field::Schema;
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::{RayexecError, Result};
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_io::{FileProvider, FileSource};
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;
use serde::{Deserialize, Serialize};

use crate::datatable::SingleFileCsvDataTable;
use crate::decoder::{CsvDecoder, DecoderState};
Expand Down Expand Up @@ -43,79 +49,33 @@ impl<R: Runtime> FunctionInfo for ReadCsv<R> {
}

impl<R: Runtime> TableFunction for ReadCsv<R> {
fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction2>>> {
Box::pin(ReadCsvImpl::initialize(self.clone(), args))
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction2>> {
let state = ReadCsvState::decode(state)?;
Ok(Box::new(ReadCsvImpl {
func: self.clone(),
state,
}))
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ReadCsvState {
location: FileLocation,
conf: AccessConfig,
csv_schema: CsvSchema,
dialect: DialectOptions,
}

impl ReadCsvState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.csv_schema.schema.to_proto()?)?;
packed.encode_next(&self.csv_schema.has_header)?;
packed.encode_next(&self.csv_schema.has_header)?;
packed.encode_next(&(self.dialect.delimiter as i32))?;
packed.encode_next(&(self.dialect.quote as i32))?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let schema = Schema::from_proto(packed.decode_next()?)?;
let has_header: bool = packed.decode_next()?;
let delimiter: i32 = packed.decode_next()?;
let quote: i32 = packed.decode_next()?;

Ok(ReadCsvState {
location,
conf,
csv_schema: CsvSchema { schema, has_header },
dialect: DialectOptions {
delimiter: delimiter as u8,
quote: quote as u8,
},
})
impl<R: Runtime> ScanPlanner for ReadCsv<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct ReadCsvImpl<R: Runtime> {
func: ReadCsv<R>,
state: ReadCsvState,
}

impl<R: Runtime> ReadCsvImpl<R> {
async fn initialize(
func: ReadCsv<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction2>> {
let (location, conf) = args.try_location_and_access_config()?;
impl<R: Runtime> ReadCsv<R> {
async fn plan_inner<'a>(
self: Self,
_context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

let mut source = func
let mut source = self
.runtime
.file_provider()
.file_source(location.clone(), &conf)?;
Expand Down Expand Up @@ -143,38 +103,23 @@ impl<R: Runtime> ReadCsvImpl<R> {
let completed = state.completed_records();
let csv_schema = CsvSchema::infer_from_records(completed)?;

Ok(Box::new(Self {
func,
state: ReadCsvState {
location,
conf,
dialect,
csv_schema,
},
}))
}
}

impl<R: Runtime> PlannedTableFunction2 for ReadCsvImpl<R> {
fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}
let schema = csv_schema.schema.clone();

fn schema(&self) -> Schema {
self.state.csv_schema.schema.clone()
}
let datatable = SingleFileCsvDataTable {
options: dialect,
csv_schema,
location,
conf,
runtime: self.runtime.clone(),
};

fn datatable(&self) -> Result<Box<dyn DataTable>> {
Ok(Box::new(SingleFileCsvDataTable {
options: self.state.dialect,
csv_schema: self.state.csv_schema.clone(),
location: self.state.location.clone(),
conf: self.state.conf.clone(),
runtime: self.func.runtime.clone(),
}))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(datatable)),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ use super::{
};
use crate::database::DatabaseContext;
use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable};
use crate::functions::table::scan::TableScanState;
use crate::functions::table::{PlannedTableFunction, TableFunctionImpl};
use crate::proto::DatabaseProtoConv;
use crate::storage::table_storage::Projections;
use crate::storage::table_storage::{DataTableScan, Projections};

pub struct TableFunctionPartitionState {
scan_state: Box<dyn TableScanState>,
scan_state: Box<dyn DataTableScan>,
/// In progress pull we're working on.
future: Option<BoxFuture<'static, Result<Option<Batch>>>>,
}
Expand Down
38 changes: 2 additions & 36 deletions crates/rayexec_execution/src/functions/table/builtin/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use rayexec_error::{RayexecError, Result};

use crate::database::memory_catalog::MemoryCatalog;
use crate::database::DatabaseContext;
use crate::functions::table::{PlannedTableFunction2, TableFunction, TableFunctionInputs};
use crate::functions::table::{TableFunction, TableFunctionInputs, TableFunctionPlanner};
use crate::functions::{FunctionInfo, Signature};
use crate::storage::catalog_storage::CatalogStorage;
use crate::storage::table_storage::DataTable;

pub trait RefreshOperation: Debug + Clone + Copy + PartialEq + Eq + Sync + Send + 'static {
const NAME: &'static str;
Expand Down Expand Up @@ -96,40 +95,7 @@ impl<O: RefreshOperation> FunctionInfo for RefreshObjects<O> {
}

impl<O: RefreshOperation> TableFunction for RefreshObjects<O> {
fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
_args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction2>>> {
unimplemented!()
}

fn decode_state(&self, _state: &[u8]) -> Result<Box<dyn PlannedTableFunction2>> {
unimplemented!()
}
}

#[derive(Debug, Clone)]
pub struct RefreshObjectsImpl<O: RefreshOperation> {
func: RefreshObjects<O>,
_state: Option<O::State>,
_op: PhantomData<O>,
}

impl<O: RefreshOperation> PlannedTableFunction2 for RefreshObjectsImpl<O> {
fn encode_state(&self, _state: &mut Vec<u8>) -> Result<()> {
Ok(())
}

fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn schema(&self) -> Schema {
O::schema()
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
fn planner(&self) -> TableFunctionPlanner {
unimplemented!()
}
}
Loading

0 comments on commit e631142

Please sign in to comment.