feat: Lateral table functions + some refactoring (#3362)
```
>> create temp table t (a int, b int);
┌─────────────────────┐
│ Query success       │
│ No columns returned │
└─────────────────────┘

>> insert into t values (3,6), (8,9);
┌───────────────┐
│ rows_inserted │
│ UInt64        │
├───────────────┤
│             2 │
└───────────────┘

>> select * from t, generate_series(a, b) order by 1,2,3;
┌───────┬───────┬─────────────────┐
│ a     │ b     │ generate_series │
│ Int32 │ Int32 │ Int64           │
├───────┼───────┼─────────────────┤
│     3 │     6 │               3 │
│     3 │     6 │               4 │
│     3 │     6 │               5 │
│     3 │     6 │               6 │
│     8 │     9 │               8 │
│     8 │     9 │               9 │
└───────┴───────┴─────────────────┘
```
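For comparison, the per-row invocation that the lateral join performs implicitly can also be run standalone. The output below is inferred from the rows in the example above rather than captured from a session, so the exact rendering may differ:

```
>> select * from generate_series(3, 6);
┌─────────────────┐
│ generate_series │
│ Int64           │
├─────────────────┤
│               3 │
│               4 │
│               5 │
│               6 │
└─────────────────┘
```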
scsmithr authored Dec 15, 2024
1 parent 9577aba commit 3653724
Showing 87 changed files with 2,259 additions and 1,904 deletions.
169 changes: 63 additions & 106 deletions crates/rayexec_csv/src/read_csv.rs
@@ -1,17 +1,25 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::StreamExt;
use rayexec_bullet::field::Schema;
use futures::{FutureExt, StreamExt};
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::{RayexecError, Result};
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_io::{FileProvider, FileSource};
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;
use serde::{Deserialize, Serialize};

use crate::datatable::SingleFileCsvDataTable;
use crate::decoder::{CsvDecoder, DecoderState};
@@ -22,7 +30,7 @@ pub struct ReadCsv<R: Runtime> {
pub(crate) runtime: R,
}

impl<R: Runtime> TableFunction for ReadCsv<R> {
impl<R: Runtime> FunctionInfo for ReadCsv<R> {
fn name(&self) -> &'static str {
"read_csv"
}
@@ -31,79 +39,43 @@ impl<R: Runtime> TableFunction for ReadCsv<R> {
&["csv_scan"]
}

fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction>>> {
Box::pin(ReadCsvImpl::initialize(self.clone(), args))
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction>> {
let state = ReadCsvState::decode(state)?;
Ok(Box::new(ReadCsvImpl {
func: self.clone(),
state,
}))
fn signatures(&self) -> &[Signature] {
&[Signature {
positional_args: &[DataTypeId::Utf8],
variadic_arg: None,
return_type: DataTypeId::Any,
}]
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ReadCsvState {
location: FileLocation,
conf: AccessConfig,
csv_schema: CsvSchema,
dialect: DialectOptions,
}

impl ReadCsvState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.csv_schema.schema.to_proto()?)?;
packed.encode_next(&self.csv_schema.has_header)?;
packed.encode_next(&self.csv_schema.has_header)?;
packed.encode_next(&(self.dialect.delimiter as i32))?;
packed.encode_next(&(self.dialect.quote as i32))?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let schema = Schema::from_proto(packed.decode_next()?)?;
let has_header: bool = packed.decode_next()?;
let delimiter: i32 = packed.decode_next()?;
let quote: i32 = packed.decode_next()?;

Ok(ReadCsvState {
location,
conf,
csv_schema: CsvSchema { schema, has_header },
dialect: DialectOptions {
delimiter: delimiter as u8,
quote: quote as u8,
},
})
impl<R: Runtime> TableFunction for ReadCsv<R> {
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct ReadCsvImpl<R: Runtime> {
func: ReadCsv<R>,
state: ReadCsvState,
impl<R: Runtime> ScanPlanner for ReadCsv<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

impl<R: Runtime> ReadCsvImpl<R> {
async fn initialize(
func: ReadCsv<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction>> {
let (location, conf) = args.try_location_and_access_config()?;

let mut source = func
impl<R: Runtime> ReadCsv<R> {
async fn plan_inner(
self,
_context: &DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

let mut source = self
.runtime
.file_provider()
.file_source(location.clone(), &conf)?;
@@ -131,38 +103,23 @@ impl<R: Runtime> ReadCsvImpl<R> {
let completed = state.completed_records();
let csv_schema = CsvSchema::infer_from_records(completed)?;

Ok(Box::new(Self {
func,
state: ReadCsvState {
location,
conf,
dialect,
csv_schema,
},
}))
}
}

impl<R: Runtime> PlannedTableFunction for ReadCsvImpl<R> {
fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}
let schema = csv_schema.schema.clone();

fn schema(&self) -> Schema {
self.state.csv_schema.schema.clone()
}
let datatable = SingleFileCsvDataTable {
options: dialect,
csv_schema,
location,
conf,
runtime: self.runtime.clone(),
};

fn datatable(&self) -> Result<Box<dyn DataTable>> {
Ok(Box::new(SingleFileCsvDataTable {
options: self.state.dialect,
csv_schema: self.state.csv_schema.clone(),
location: self.state.location.clone(),
conf: self.state.conf.clone(),
runtime: self.func.runtime.clone(),
}))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(datatable)),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}
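Taken together with the signature above (a single Utf8 positional argument), the SQL surface of the function appears unchanged by this refactor. A hypothetical invocation, with a made-up file path purely to illustrate the call shape (csv_scan is the alias declared above):

```
>> select * from read_csv('data.csv');
>> select * from csv_scan('data.csv');
```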
155 changes: 55 additions & 100 deletions crates/rayexec_delta/src/read_delta.rs
@@ -1,16 +1,24 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use rayexec_bullet::field::Schema;
use rayexec_error::{RayexecError, Result};
use futures::FutureExt;
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::Result;
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;

use crate::datatable::DeltaDataTable;
use crate::protocol::table::Table;
@@ -20,7 +28,7 @@ pub struct ReadDelta<R: Runtime> {
pub(crate) runtime: R,
}

impl<R: Runtime> TableFunction for ReadDelta<R> {
impl<R: Runtime> FunctionInfo for ReadDelta<R> {
fn name(&self) -> &'static str {
"read_delta"
}
@@ -29,109 +37,56 @@ impl<R: Runtime> TableFunction for ReadDelta<R> {
&["delta_scan"]
}

fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction>>> {
let func = self.clone();
Box::pin(async move { ReadDeltaImpl::initialize(func, args).await })
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction>> {
Ok(Box::new(ReadDeltaImpl {
func: self.clone(),
state: ReadDeltaState::decode(state)?,
}))
fn signatures(&self) -> &[Signature] {
&[Signature {
positional_args: &[DataTypeId::Utf8],
variadic_arg: None,
return_type: DataTypeId::Any,
}]
}
}

#[derive(Debug, Clone)]
struct ReadDeltaState {
location: FileLocation,
conf: AccessConfig,
schema: Schema,
table: Option<Arc<Table>>, // Populate on re-init if needed.
}

impl ReadDeltaState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.schema.to_proto()?)?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let schema = Schema::from_proto(packed.decode_next()?)?;
Ok(ReadDeltaState {
location,
conf,
schema,
table: None,
})
impl<R: Runtime> TableFunction for ReadDelta<R> {
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone)]
pub struct ReadDeltaImpl<R: Runtime> {
func: ReadDelta<R>,
state: ReadDeltaState,
impl<R: Runtime> ScanPlanner for ReadDelta<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

impl<R: Runtime> ReadDeltaImpl<R> {
async fn initialize(
func: ReadDelta<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction>> {
let (location, conf) = args.try_location_and_access_config()?;
impl<R: Runtime> ReadDelta<R> {
async fn plan_inner(
self,
_context: &DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

let provider = func.runtime.file_provider();
let provider = self.runtime.file_provider();

let table = Table::load(location.clone(), provider, conf.clone()).await?;
let schema = table.table_schema()?;

Ok(Box::new(ReadDeltaImpl {
func,
state: ReadDeltaState {
location,
conf,
schema,
table: Some(Arc::new(table)),
},
}))
}
}

impl<R: Runtime> PlannedTableFunction for ReadDeltaImpl<R> {
fn reinitialize(&self) -> BoxFuture<Result<()>> {
// TODO: Reinit table.
// TODO: Needs mut
unimplemented!()
}

fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}

fn schema(&self) -> Schema {
self.state.schema.clone()
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
let table = match self.state.table.as_ref() {
Some(table) => table.clone(),
None => return Err(RayexecError::new("Delta table not initialized")),
};

Ok(Box::new(DeltaDataTable { table }))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(DeltaDataTable {
table: Arc::new(table), // TODO: Arc Arc
})),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}
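read_delta follows the same pattern: FunctionInfo declares the name, the delta_scan alias, and a single Utf8 path argument, while ScanPlanner::plan loads the table and wraps it in a DeltaDataTable. A hypothetical call, with a made-up table path shown only for illustration:

```
>> select * from read_delta('./my_delta_table');
```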