Skip to content

Commit

Permalink
read_delta
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 15, 2024
1 parent 7a8a5aa commit b5f9d73
Showing 1 changed file with 44 additions and 101 deletions.
145 changes: 44 additions & 101 deletions crates/rayexec_delta/src/read_delta.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::FutureExt;
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::field::Schema;
use rayexec_error::{RayexecError, Result};
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::Result;
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};
use rayexec_proto::packed::{PackedDecoder, PackedEncoder};
use rayexec_proto::ProtoConv;

use crate::datatable::DeltaDataTable;
use crate::protocol::table::Table;
Expand Down Expand Up @@ -41,109 +47,46 @@ impl<R: Runtime> FunctionInfo for ReadDelta<R> {
}

impl<R: Runtime> TableFunction for ReadDelta<R> {
fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction2>>> {
let func = self.clone();
Box::pin(async move { ReadDeltaImpl::initialize(func, args).await })
}

fn decode_state(&self, state: &[u8]) -> Result<Box<dyn PlannedTableFunction2>> {
Ok(Box::new(ReadDeltaImpl {
func: self.clone(),
state: ReadDeltaState::decode(state)?,
}))
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone)]
struct ReadDeltaState {
location: FileLocation,
conf: AccessConfig,
schema: Schema,
table: Option<Arc<Table>>, // Populate on re-init if needed.
}

impl ReadDeltaState {
fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
let mut packed = PackedEncoder::new(buf);
packed.encode_next(&self.location.to_proto()?)?;
packed.encode_next(&self.conf.to_proto()?)?;
packed.encode_next(&self.schema.to_proto()?)?;
Ok(())
}

fn decode(buf: &[u8]) -> Result<Self> {
let mut packed = PackedDecoder::new(buf);
let location = FileLocation::from_proto(packed.decode_next()?)?;
let conf = AccessConfig::from_proto(packed.decode_next()?)?;
let schema = Schema::from_proto(packed.decode_next()?)?;
Ok(ReadDeltaState {
location,
conf,
schema,
table: None,
})
impl<R: Runtime> ScanPlanner for ReadDelta<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

#[derive(Debug, Clone)]
pub struct ReadDeltaImpl<R: Runtime> {
func: ReadDelta<R>,
state: ReadDeltaState,
}

impl<R: Runtime> ReadDeltaImpl<R> {
async fn initialize(
func: ReadDelta<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction2>> {
let (location, conf) = args.try_location_and_access_config()?;
impl<R: Runtime> ReadDelta<R> {
async fn plan_inner<'a>(
self: Self,
_context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;

let provider = func.runtime.file_provider();
let provider = self.runtime.file_provider();

let table = Table::load(location.clone(), provider, conf.clone()).await?;
let schema = table.table_schema()?;

Ok(Box::new(ReadDeltaImpl {
func,
state: ReadDeltaState {
location,
conf,
schema,
table: Some(Arc::new(table)),
},
}))
}
}

impl<R: Runtime> PlannedTableFunction2 for ReadDeltaImpl<R> {
fn reinitialize(&self) -> BoxFuture<Result<()>> {
// TODO: Reinit table.
// TODO: Needs mut
unimplemented!()
}

fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, state: &mut Vec<u8>) -> Result<()> {
self.state.encode(state)
}

fn schema(&self) -> Schema {
self.state.schema.clone()
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
let table = match self.state.table.as_ref() {
Some(table) => table.clone(),
None => return Err(RayexecError::new("Delta table not initialized")),
};

Ok(Box::new(DeltaDataTable { table }))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(DeltaDataTable {
table: Arc::new(table), // TODO: Arc Arc
})),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}

0 comments on commit b5f9d73

Please sign in to comment.