Skip to content

Commit

Permalink
unity stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 15, 2024
1 parent b5f9d73 commit 81a714a
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 128 deletions.
13 changes: 13 additions & 0 deletions crates/rayexec_execution/src/functions/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,16 @@ pub fn try_get_named<'a>(
))
})
}

pub fn try_get_positional<'a>(
func: &impl TableFunction,
pos: usize,
positional: &'a [OwnedScalarValue],
) -> Result<&'a OwnedScalarValue> {
positional.get(pos).ok_or_else(|| {
RayexecError::new(format!(
"Expected argument at position {pos} for function {}",
func.name()
))
})
}
119 changes: 45 additions & 74 deletions crates/rayexec_iceberg/src/read_iceberg.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::FutureExt;
use rayexec_bullet::datatype::DataTypeId;
use rayexec_bullet::field::Schema;
use rayexec_error::{not_implemented, RayexecError, Result};
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::Result;
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_location_and_access_config_from_args,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::DataTable;
use rayexec_io::location::{AccessConfig, FileLocation};

use crate::datatable::IcebergDataTable;
use crate::table::Table;
Expand Down Expand Up @@ -39,83 +47,46 @@ impl<R: Runtime> FunctionInfo for ReadIceberg<R> {
}

impl<R: Runtime> TableFunction for ReadIceberg<R> {
fn plan_and_initialize<'a>(
&self,
_context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction2>>> {
let func = self.clone();
Box::pin(async move { ReadIcebergImpl::initialize(func, args).await })
}

fn decode_state(&self, _state: &[u8]) -> Result<Box<dyn PlannedTableFunction2>> {
// TODO
not_implemented!("decode iceberg state")
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}
}

#[derive(Debug, Clone)]
struct ReadIcebergState {
_location: FileLocation,
_conf: AccessConfig,
schema: Schema,
table: Option<Arc<Table>>, // Populate on re-init if needed.
}

#[derive(Debug, Clone)]
pub struct ReadIcebergImpl<R: Runtime> {
func: ReadIceberg<R>,
state: ReadIcebergState,
impl<R: Runtime> ScanPlanner for ReadIceberg<R> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

impl<R: Runtime> ReadIcebergImpl<R> {
async fn initialize(
func: ReadIceberg<R>,
args: TableFunctionInputs,
) -> Result<Box<dyn PlannedTableFunction2>> {
let (location, conf) = args.try_location_and_access_config()?;
let provider = func.runtime.file_provider();
impl<R: Runtime> ReadIceberg<R> {
async fn plan_inner<'a>(
self: Self,
_context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
let (location, conf) =
try_location_and_access_config_from_args(&self, &positional_inputs, &named_inputs)?;
let provider = self.runtime.file_provider();

// TODO: Fetch stats, use during planning.
let table = Table::load(location.clone(), provider, conf.clone()).await?;
let schema = table.schema()?;

Ok(Box::new(ReadIcebergImpl {
func,
state: ReadIcebergState {
_location: location,
_conf: conf,
schema,
table: Some(Arc::new(table)),
},
}))
}
}

impl<R: Runtime> PlannedTableFunction2 for ReadIcebergImpl<R> {
fn reinitialize(&self) -> BoxFuture<Result<()>> {
// TODO: See delta
Box::pin(async move { not_implemented!("reinit iceberg state") })
}

fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn encode_state(&self, _state: &mut Vec<u8>) -> Result<()> {
not_implemented!("encode iceberg state")
}

fn schema(&self) -> Schema {
self.state.schema.clone()
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
let table = match self.state.table.as_ref() {
Some(table) => table.clone(),
None => return Err(RayexecError::new("Iceberg table not initialized")),
};

Ok(Box::new(IcebergDataTable { table }))
Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(IcebergDataTable {
table: Arc::new(table), // TODO: Arc Arc
})),
cardinality: StatisticsValue::Unknown,
schema,
})
}
}
122 changes: 68 additions & 54 deletions crates/rayexec_unity_catalog/src/functions.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use futures::{FutureExt, TryStreamExt};
use rayexec_bullet::array::Array;
use rayexec_bullet::batch::Batch;
use rayexec_bullet::datatype::{DataType, DataTypeId};
use rayexec_bullet::field::{Field, Schema};
use rayexec_error::{not_implemented, Result};
use rayexec_bullet::scalar::OwnedScalarValue;
use rayexec_error::Result;
use rayexec_execution::database::DatabaseContext;
use rayexec_execution::functions::table::inputs::TableFunctionInputs;
use rayexec_execution::functions::table::{PlannedTableFunction2, TableFunction};
use rayexec_execution::expr;
use rayexec_execution::functions::table::{
try_get_positional,
PlannedTableFunction,
ScanPlanner,
TableFunction,
TableFunctionImpl,
TableFunctionPlanner,
};
use rayexec_execution::functions::{FunctionInfo, Signature};
use rayexec_execution::logical::statistics::StatisticsValue;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::{
DataTable,
Expand Down Expand Up @@ -43,9 +54,10 @@ pub trait UnityObjectsOperation<R: Runtime>:

/// Create the connection state.
fn create_connection_state(
runtime: R,
info: UnityObjects<R, Self>,
context: &DatabaseContext,
args: TableFunctionInputs,
positional_args: Vec<OwnedScalarValue>,
named_args: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'_, Result<Self::ConnectionState>>;

/// Create a stream state from the connection state.
Expand Down Expand Up @@ -95,15 +107,16 @@ impl<R: Runtime> UnityObjectsOperation<R> for ListSchemasOperation {
}

fn create_connection_state(
runtime: R,
info: UnityObjects<R, Self>,
_context: &DatabaseContext,
args: TableFunctionInputs,
positional_args: Vec<OwnedScalarValue>,
_named_args: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'_, Result<Self::ConnectionState>> {
Box::pin(async move {
let endpoint = args.try_get_position(0)?.try_as_str()?;
let catalog = args.try_get_position(1)?.try_as_str()?;
let endpoint = try_get_positional(&info, 0, &positional_args)?.try_as_str()?;
let catalog = try_get_positional(&info, 1, &positional_args)?.try_as_str()?;

let conn = UnityCatalogConnection::connect(runtime, endpoint, catalog).await?;
let conn = UnityCatalogConnection::connect(info.runtime, endpoint, catalog).await?;

Ok(ListSchemasConnectionState { conn })
})
Expand Down Expand Up @@ -177,16 +190,17 @@ impl<R: Runtime> UnityObjectsOperation<R> for ListTablesOperation {
}

fn create_connection_state(
runtime: R,
info: UnityObjects<R, Self>,
_context: &DatabaseContext,
args: TableFunctionInputs,
positional_args: Vec<OwnedScalarValue>,
_named_args: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'_, Result<Self::ConnectionState>> {
Box::pin(async move {
let endpoint = args.try_get_position(0)?.try_as_str()?;
let catalog = args.try_get_position(1)?.try_as_str()?;
let schema = args.try_get_position(2)?.try_as_str()?;
let endpoint = try_get_positional(&info, 0, &positional_args)?.try_as_str()?;
let catalog = try_get_positional(&info, 1, &positional_args)?.try_as_str()?;
let schema = try_get_positional(&info, 2, &positional_args)?.try_as_str()?;

let conn = UnityCatalogConnection::connect(runtime, endpoint, catalog).await?;
let conn = UnityCatalogConnection::connect(info.runtime, endpoint, catalog).await?;

Ok(ListTablesConnectionState {
conn,
Expand Down Expand Up @@ -262,48 +276,48 @@ impl<R: Runtime, O: UnityObjectsOperation<R>> FunctionInfo for UnityObjects<R, O
}

impl<R: Runtime, O: UnityObjectsOperation<R>> TableFunction for UnityObjects<R, O> {
fn plan_and_initialize<'a>(
&self,
context: &'a DatabaseContext,
args: TableFunctionInputs,
) -> BoxFuture<'a, Result<Box<dyn PlannedTableFunction2>>> {
let func = self.clone();
let runtime = self.runtime.clone();

Box::pin(async move {
let state = O::create_connection_state(runtime, context, args).await?;
Ok(Box::new(UnityObjectsImpl::<R, O> { func, state }) as _)
})
fn planner(&self) -> TableFunctionPlanner {
TableFunctionPlanner::Scan(self)
}

fn decode_state(&self, _state: &[u8]) -> Result<Box<dyn PlannedTableFunction2>> {
not_implemented!("decode state for unity operation")
}
}

#[derive(Debug, Clone)]
pub struct UnityObjectsImpl<R: Runtime, O: UnityObjectsOperation<R>> {
func: UnityObjects<R, O>,
state: O::ConnectionState,
}

impl<R: Runtime, O: UnityObjectsOperation<R>> PlannedTableFunction2 for UnityObjectsImpl<R, O> {
fn table_function(&self) -> &dyn TableFunction {
&self.func
}

fn schema(&self) -> Schema {
O::schema()
}

fn encode_state(&self, _state: &mut Vec<u8>) -> Result<()> {
not_implemented!("decode state for unity operation")
impl<R: Runtime, O: UnityObjectsOperation<R>> ScanPlanner for UnityObjects<R, O> {
fn plan<'a>(
&self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> BoxFuture<'a, Result<PlannedTableFunction>> {
Self::plan_inner(self.clone(), context, positional_inputs, named_inputs).boxed()
}
}

fn datatable(&self) -> Result<Box<dyn DataTable>> {
Ok(Box::new(UnityObjectsDataTable::<R, O> {
state: self.state.clone(),
}))
impl<R: Runtime, O: UnityObjectsOperation<R>> UnityObjects<R, O> {
async fn plan_inner<'a>(
self: Self,
context: &'a DatabaseContext,
positional_inputs: Vec<OwnedScalarValue>,
named_inputs: HashMap<String, OwnedScalarValue>,
) -> Result<PlannedTableFunction> {
// TODO: Remove clones.
let state = O::create_connection_state(
self.clone(),
context,
positional_inputs.clone(),
named_inputs.clone(),
)
.await?;

Ok(PlannedTableFunction {
function: Box::new(self),
positional_inputs: positional_inputs.into_iter().map(expr::lit).collect(),
named_inputs,
function_impl: TableFunctionImpl::Scan(Arc::new(UnityObjectsDataTable::<R, O> {
state,
})),
cardinality: StatisticsValue::Unknown,
schema: O::schema(),
})
}
}

Expand Down

0 comments on commit 81a714a

Please sign in to comment.