Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Lateral table functions + some refactoring #3362

Merged
merged 23 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 63 additions & 106 deletions crates/rayexec_csv/src/read_csv.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::StreamExt;
use rayexec_bullet::field::Schema;
use futures::{FutureExt, StreamExt};
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::{RayexecError, Result};
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_io::{FileProvider, FileSource};
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;
use serde::{Deserialize, Serialize};

use crate::datatable::SingleFileCsvDataTable;
use crate::decoder::{CsvDecoder, DecoderState};
Expand All @@ -22,7 +30,7 @@ pub struct ReadCsv<R: Runtime> {
pub(crate) runtime: R,
}

impl<R: Runtime> TableFunction for ReadCsv<R> {
impl<R: Runtime> FunctionInfo for ReadCsv<R> {
fn name(&self) -> &'static str {
"read_csv"
}
Expand All @@ -31,79 +39,43 @@ impl<R: Runtime> TableFunction for ReadCsv<R> {
&["csv_scan"]
}

fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction>>> {
Box::pin(ReadCsvImpl::initialize(self.clone(), args))
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction>> {
let state = ReadCsvState::decode(state)?;
Ok(Box::new(ReadCsvImpl {
func: self.clone(),
state,
}))
fn signatures(&self) -> &[Signature] {
&[Signature {
positional_args: &[DataTypeId::Utf8],
variadic_arg: None,
return_type: DataTypeId::Any,
}]
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ReadCsvState {
location: FileLocation,
conf: AccessConfig,
csv_schema: CsvSchema,
dialect: DialectOptions,
}

impl ReadCsvState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.csv_schema.schema.to_proto()?)?;
packed.encode_next(&self.csv_schema.has_header)?;
packed.encode_next(&self.csv_schema.has_header)?;
packed.encode_next(&(self.dialect.delimiter as i32))?;
packed.encode_next(&(self.dialect.quote as i32))?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let schema = Schema::from_proto(packed.decode_next()?)?;
let has_header: bool = packed.decode_next()?;
let delimiter: i32 = packed.decode_next()?;
let quote: i32 = packed.decode_next()?;

Ok(ReadCsvState {
location,
conf,
csv_schema: CsvSchema { schema, has_header },
dialect: DialectOptions {
delimiter: delimiter as u8,
quote: quote as u8,
},
})
impl<R: Runtime> TableFunction for ReadCsv<R> {
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct ReadCsvImpl<R: Runtime> {
func: ReadCsv<R>,
state: ReadCsvState,
impl<R: Runtime> ScanPlanner for ReadCsv<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

impl<R: Runtime> ReadCsvImpl<R> {
async fn initialize(
func: ReadCsv<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction>> {
let (location, conf) = args.try_location_and_access_config()?;

let mut source = func
impl<R: Runtime> ReadCsv<R> {
async fn plan_inner(
self,
_context: &DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

let mut source = self
.runtime
.file_provider()
.file_source(location.clone(), &conf)?;
Expand Down Expand Up @@ -131,38 +103,23 @@ impl<R: Runtime> ReadCsvImpl<R> {
let completed = state.completed_records();
let csv_schema = CsvSchema::infer_from_records(completed)?;

Ok(Box::new(Self {
func,
state: ReadCsvState {
location,
conf,
dialect,
csv_schema,
},
}))
}
}

impl<R: Runtime> PlannedTableFunction for ReadCsvImpl<R> {
fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}
let schema = csv_schema.schema.clone();

fn schema(&self) -> Schema {
self.state.csv_schema.schema.clone()
}
let datatable = SingleFileCsvDataTable {
options: dialect,
csv_schema,
location,
conf,
runtime: self.runtime.clone(),
};

fn datatable(&self) -> Result<Box<dyn DataTable>> {
Ok(Box::new(SingleFileCsvDataTable {
options: self.state.dialect,
csv_schema: self.state.csv_schema.clone(),
location: self.state.location.clone(),
conf: self.state.conf.clone(),
runtime: self.func.runtime.clone(),
}))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(datatable)),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}
155 changes: 55 additions & 100 deletions crates/rayexec_delta/src/read_delta.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use rayexec_bullet::field::Schema;
use rayexec_error::{RayexecError, Result};
use futures::FutureExt;
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::Result;
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;

use crate::datatable::DeltaDataTable;
use crate::protocol::table::Table;
Expand All @@ -20,7 +28,7 @@ pub struct ReadDelta<R: Runtime> {
pub(crate) runtime: R,
}

impl<R: Runtime> TableFunction for ReadDelta<R> {
impl<R: Runtime> FunctionInfo for ReadDelta<R> {
fn name(&self) -> &'static str {
"read_delta"
}
Expand All @@ -29,109 +37,56 @@ impl<R: Runtime> TableFunction for ReadDelta<R> {
&["delta_scan"]
}

fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction>>> {
let func = self.clone();
Box::pin(async move { ReadDeltaImpl::initialize(func, args).await })
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction>> {
Ok(Box::new(ReadDeltaImpl {
func: self.clone(),
state: ReadDeltaState::decode(state)?,
}))
fn signatures(&self) -> &[Signature] {
&[Signature {
positional_args: &[DataTypeId::Utf8],
variadic_arg: None,
return_type: DataTypeId::Any,
}]
}
}

#[derive(Debug, Clone)]
struct ReadDeltaState {
location: FileLocation,
conf: AccessConfig,
schema: Schema,
table: Option<Arc<Table>>, // Populate on re-init if needed.
}

impl ReadDeltaState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.schema.to_proto()?)?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let schema = Schema::from_proto(packed.decode_next()?)?;
Ok(ReadDeltaState {
location,
conf,
schema,
table: None,
})
impl<R: Runtime> TableFunction for ReadDelta<R> {
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone)]
pub struct ReadDeltaImpl<R: Runtime> {
func: ReadDelta<R>,
state: ReadDeltaState,
impl<R: Runtime> ScanPlanner for ReadDelta<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

impl<R: Runtime> ReadDeltaImpl<R> {
async fn initialize(
func: ReadDelta<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction>> {
let (location, conf) = args.try_location_and_access_config()?;
impl<R: Runtime> ReadDelta<R> {
async fn plan_inner(
self,
_context: &DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

let provider = func.runtime.file_provider();
let provider = self.runtime.file_provider();

let table = Table::load(location.clone(), provider, conf.clone()).await?;
let schema = table.table_schema()?;

Ok(Box::new(ReadDeltaImpl {
func,
state: ReadDeltaState {
location,
conf,
schema,
table: Some(Arc::new(table)),
},
}))
}
}

impl<R: Runtime> PlannedTableFunction for ReadDeltaImpl<R> {
fn reinitialize(&self) -> BoxFuture<Result<()>> {
// TODO: Reinit table.
// TODO: Needs mut
unimplemented!()
}

fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}

fn schema(&self) -> Schema {
self.state.schema.clone()
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
let table = match self.state.table.as_ref() {
Some(table) => table.clone(),
None => return Err(RayexecError::new("Delta table not initialized")),
};

Ok(Box::new(DeltaDataTable { table }))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(DeltaDataTable {
table: Arc::new(table), // TODO: Arc Arc
})),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}
Loading
Loading