diff --git a/crates/rayexec_execution/src/execution/executable/pipeline.rs b/crates/rayexec_execution/src/execution/executable/pipeline.rs index 339a5f98c..7bcbed0af 100644 --- a/crates/rayexec_execution/src/execution/executable/pipeline.rs +++ b/crates/rayexec_execution/src/execution/executable/pipeline.rs @@ -349,7 +349,7 @@ impl ExecutablePartitionPipeline { // Otherwise do a normal pull. let timer = Timer::::start(); - let poll_pull = operator.physical.poll_pull( + let poll_pull = operator.physical.poll_pull_old( cx, &mut operator.partition_state, &operator.operator_state, @@ -419,7 +419,7 @@ impl ExecutablePartitionPipeline { .expect("next operator to exist"); let timer = Timer::::start(); - let poll_finalize = next_operator.physical.poll_finalize_push( + let poll_finalize = next_operator.physical.poll_finalize_push_old( cx, &mut next_operator.partition_state, &next_operator.operator_state, @@ -463,7 +463,7 @@ impl ExecutablePartitionPipeline { operator.profile_data.rows_read += batch.num_rows(); let timer = Timer::::start(); - let poll_push = operator.physical.poll_push( + let poll_push = operator.physical.poll_push_old( cx, &mut operator.partition_state, &operator.operator_state, diff --git a/crates/rayexec_execution/src/execution/executable/planner.rs b/crates/rayexec_execution/src/execution/executable/planner.rs index e26b4d20c..6031e6ebd 100644 --- a/crates/rayexec_execution/src/execution/executable/planner.rs +++ b/crates/rayexec_execution/src/execution/executable/planner.rs @@ -363,7 +363,7 @@ impl PendingQuery { } let operator = Arc::new(PhysicalOperator::ResultSink(SinkOperator::new(sink))); - let states = operator.create_states(context, vec![partitions])?; + let states = operator.create_states_old(context, vec![partitions])?; let partition_states = match states.partition_states { InputOutputStates::OneToOne { partition_states } => partition_states, _ => return Err(RayexecError::new("invalid partition states for query sink")), @@ -429,7 +429,7 @@ impl PendingQuery { } }; - let states = operator.create_states(context, vec![partitions])?; + let states = operator.create_states_old(context, vec![partitions])?; let partition_states = match states.partition_states { InputOutputStates::OneToOne { partition_states } => partition_states, _ => return Err(RayexecError::new("invalid partition states")), @@ -549,7 +549,7 @@ impl PendingQuery { } }; - let states = operator.create_states(context, vec![partitions])?; + let states = operator.create_states_old(context, vec![partitions])?; let partition_states = match states.partition_states { InputOutputStates::OneToOne { partition_states } => partition_states, _ => { @@ -610,7 +610,7 @@ impl PendingQuery { ) -> Result { let rr_operator = Arc::new(PhysicalOperator::RoundRobin(PhysicalRoundRobinRepartition)); let states = rr_operator - .create_states(context, vec![pipeline.num_partitions(), output_partitions])?; + .create_states_old(context, vec![pipeline.num_partitions(), output_partitions])?; let (push_states, pull_states) = match states.partition_states { InputOutputStates::SeparateInputOutput { @@ -696,7 +696,9 @@ impl PendingOperatorWithState { .unwrap_or(config.partitions); // TODO: How to get other input partitions. - let states = operator.operator.create_states(context, vec![partitions])?; + let states = operator + .operator + .create_states_old(context, vec![partitions])?; Ok(match states.partition_states { InputOutputStates::OneToOne { partition_states } => PendingOperatorWithState { diff --git a/crates/rayexec_execution/src/execution/operators/analyze.rs b/crates/rayexec_execution/src/execution/operators/analyze.rs index 532ecfd03..c10455727 100644 --- a/crates/rayexec_execution/src/execution/operators/analyze.rs +++ b/crates/rayexec_execution/src/execution/operators/analyze.rs @@ -20,7 +20,7 @@ use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable}; pub struct PhysicalAnalyze {} impl ExecutableOperator for PhysicalAnalyze { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, _partitions: Vec, @@ -28,7 +28,7 @@ impl ExecutableOperator for PhysicalAnalyze { unimplemented!() } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -38,7 +38,7 @@ impl ExecutableOperator for PhysicalAnalyze { unimplemented!() } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -47,7 +47,7 @@ impl ExecutableOperator for PhysicalAnalyze { unimplemented!() } - fn poll_pull( + fn poll_pull_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/batch_resizer.rs b/crates/rayexec_execution/src/execution/operators/batch_resizer.rs index da85b41f1..c39508094 100644 --- a/crates/rayexec_execution/src/execution/operators/batch_resizer.rs +++ b/crates/rayexec_execution/src/execution/operators/batch_resizer.rs @@ -37,7 +37,7 @@ pub struct BatchResizerPartitionState { pub struct PhysicalBatchResizer; impl ExecutableOperator for PhysicalBatchResizer { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -60,7 +60,7 @@ impl ExecutableOperator for PhysicalBatchResizer { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -99,7 +99,7 @@ impl ExecutableOperator for PhysicalBatchResizer { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -131,7 +131,7 @@ impl ExecutableOperator for PhysicalBatchResizer { Ok(PollFinalize::Finalized) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/create_schema.rs b/crates/rayexec_execution/src/execution/operators/create_schema.rs index 5572dbe23..a691418bd 100644 --- a/crates/rayexec_execution/src/execution/operators/create_schema.rs +++ b/crates/rayexec_execution/src/execution/operators/create_schema.rs @@ -50,7 +50,7 @@ impl PhysicalCreateSchema { } impl ExecutableOperator for PhysicalCreateSchema { - fn create_states( + fn create_states_old( &self, context: &DatabaseContext, partitions: Vec, @@ -83,7 +83,7 @@ impl ExecutableOperator for PhysicalCreateSchema { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -93,7 +93,7 @@ impl ExecutableOperator for PhysicalCreateSchema { Err(RayexecError::new("Cannot push to physical create table")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -102,7 +102,7 @@ impl ExecutableOperator for PhysicalCreateSchema { Err(RayexecError::new("Cannot push to physical create table")) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/create_view.rs b/crates/rayexec_execution/src/execution/operators/create_view.rs index 34d7fbecd..7770328d0 100644 --- a/crates/rayexec_execution/src/execution/operators/create_view.rs +++ b/crates/rayexec_execution/src/execution/operators/create_view.rs @@ -41,7 +41,7 @@ pub struct PhysicalCreateView { } impl ExecutableOperator for PhysicalCreateView { - fn create_states( + fn create_states_old( &self, context: &DatabaseContext, partitions: Vec, @@ -81,7 +81,7 @@ impl ExecutableOperator for PhysicalCreateView { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -91,7 +91,7 @@ impl ExecutableOperator for PhysicalCreateView { Err(RayexecError::new("Cannot push to physical create view")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -100,7 +100,7 @@ impl ExecutableOperator for PhysicalCreateView { Err(RayexecError::new("Cannot push to physical create view")) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/drop.rs b/crates/rayexec_execution/src/execution/operators/drop.rs index 29fe32373..f83f25aad 100644 --- a/crates/rayexec_execution/src/execution/operators/drop.rs +++ b/crates/rayexec_execution/src/execution/operators/drop.rs @@ -47,7 +47,7 @@ impl PhysicalDrop { } impl ExecutableOperator for PhysicalDrop { - fn create_states( + fn create_states_old( &self, context: &DatabaseContext, partitions: Vec, @@ -76,7 +76,7 @@ impl ExecutableOperator for PhysicalDrop { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -86,7 +86,7 @@ impl ExecutableOperator for PhysicalDrop { Err(RayexecError::new("Cannot push to physical create table")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -95,7 +95,7 @@ impl ExecutableOperator for PhysicalDrop { Err(RayexecError::new("Cannot push to physical create table")) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/empty.rs b/crates/rayexec_execution/src/execution/operators/empty.rs index b2e8b257b..16d2a843b 100644 --- a/crates/rayexec_execution/src/execution/operators/empty.rs +++ b/crates/rayexec_execution/src/execution/operators/empty.rs @@ -29,7 +29,7 @@ pub struct EmptyPartitionState { pub struct PhysicalEmpty; impl ExecutableOperator for PhysicalEmpty { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -44,7 +44,7 @@ impl ExecutableOperator for PhysicalEmpty { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -54,7 +54,7 @@ impl ExecutableOperator for PhysicalEmpty { Err(RayexecError::new("Cannot push to physical empty")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -63,7 +63,7 @@ impl ExecutableOperator for PhysicalEmpty { Err(RayexecError::new("Cannot push to physical empty")) } - fn poll_pull( + fn poll_pull_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/hash_aggregate/mod.rs b/crates/rayexec_execution/src/execution/operators/hash_aggregate/mod.rs index 0f4b04e4e..745dffc3f 100644 --- a/crates/rayexec_execution/src/execution/operators/hash_aggregate/mod.rs +++ b/crates/rayexec_execution/src/execution/operators/hash_aggregate/mod.rs @@ -223,7 +223,7 @@ impl PhysicalHashAggregate { } impl ExecutableOperator for PhysicalHashAggregate { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -293,7 +293,7 @@ impl ExecutableOperator for PhysicalHashAggregate { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -318,7 +318,7 @@ impl ExecutableOperator for PhysicalHashAggregate { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -376,7 +376,7 @@ impl ExecutableOperator for PhysicalHashAggregate { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/hash_join/mod.rs b/crates/rayexec_execution/src/execution/operators/hash_join/mod.rs index 442d04e17..58568e4cf 100644 --- a/crates/rayexec_execution/src/execution/operators/hash_join/mod.rs +++ b/crates/rayexec_execution/src/execution/operators/hash_join/mod.rs @@ -188,7 +188,7 @@ impl PhysicalHashJoin { } impl ExecutableOperator for PhysicalHashJoin { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -246,7 +246,7 @@ impl ExecutableOperator for PhysicalHashJoin { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -347,7 +347,7 @@ impl ExecutableOperator for PhysicalHashJoin { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -491,7 +491,7 @@ impl ExecutableOperator for PhysicalHashJoin { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/limit.rs b/crates/rayexec_execution/src/execution/operators/limit.rs index 16676bcda..ba83b6e06 100644 --- a/crates/rayexec_execution/src/execution/operators/limit.rs +++ b/crates/rayexec_execution/src/execution/operators/limit.rs @@ -62,7 +62,7 @@ impl PhysicalLimit { } impl ExecutableOperator for PhysicalLimit { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -88,7 +88,7 @@ impl ExecutableOperator for PhysicalLimit { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -154,7 +154,7 @@ impl ExecutableOperator for PhysicalLimit { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -173,7 +173,7 @@ impl ExecutableOperator for PhysicalLimit { Ok(PollFinalize::Finalized) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -245,7 +245,9 @@ mod tests { fn create_states(operator: &PhysicalLimit, partitions: usize) -> Vec { let context = test_database_context(); - let states = operator.create_states(&context, vec![partitions]).unwrap(); + let states = operator + .create_states_old(&context, vec![partitions]) + .unwrap(); match states.partition_states { InputOutputStates::OneToOne { partition_states } => partition_states, diff --git a/crates/rayexec_execution/src/execution/operators/mod.rs b/crates/rayexec_execution/src/execution/operators/mod.rs index 24c4aec6b..2a6464772 100644 --- a/crates/rayexec_execution/src/execution/operators/mod.rs +++ b/crates/rayexec_execution/src/execution/operators/mod.rs @@ -277,14 +277,14 @@ pub trait ExecutableOperator: Sync + Send + Debug + Explainable { /// pushing batches through this operator. /// /// Joins are assumed to have two inputs. - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, _partitions: Vec, ) -> Result; /// Try to push a batch for this partition. - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -296,7 +296,7 @@ pub trait ExecutableOperator: Sync + Send + Debug + Explainable { /// /// This indicates the operator will receive no more input for a given /// partition, allowing the operator to execution some finalization logic. - fn poll_finalize_push( + fn poll_finalize_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -304,7 +304,7 @@ pub trait ExecutableOperator: Sync + Send + Debug + Explainable { ) -> Result; /// Try to pull a batch for this partition. - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -348,46 +348,46 @@ pub enum PhysicalOperator { } impl ExecutableOperator for PhysicalOperator { - fn create_states( + fn create_states_old( &self, context: &DatabaseContext, partitions: Vec, ) -> Result { match self { - Self::HashAggregate(op) => op.create_states(context, partitions), - Self::UngroupedAggregate(op) => op.create_states(context, partitions), - Self::Window(op) => op.create_states(context, partitions), - Self::NestedLoopJoin(op) => op.create_states(context, partitions), - Self::HashJoin(op) => op.create_states(context, partitions), - Self::Values(op) => op.create_states(context, partitions), - Self::ResultSink(op) => op.create_states(context, partitions), - Self::DynSink(op) => op.create_states(context, partitions), - Self::DynSource(op) => op.create_states(context, partitions), - Self::MaterializedSink(op) => op.create_states(context, partitions), - Self::MaterializedSource(op) => op.create_states(context, partitions), - Self::RoundRobin(op) => op.create_states(context, partitions), - Self::MergeSorted(op) => op.create_states(context, partitions), - Self::LocalSort(op) => op.create_states(context, partitions), - Self::Limit(op) => op.create_states(context, partitions), - Self::Union(op) => op.create_states(context, partitions), - Self::Filter(op) => op.create_states(context, partitions), - Self::Project(op) => op.create_states(context, partitions), - Self::Unnest(op) => op.create_states(context, partitions), - Self::Scan(op) => op.create_states(context, partitions), - Self::TableFunction(op) => op.create_states(context, partitions), - Self::TableInOut(op) => op.create_states(context, partitions), - Self::Insert(op) => op.create_states(context, partitions), - Self::CopyTo(op) => op.create_states(context, partitions), - Self::CreateTable(op) => op.create_states(context, partitions), - Self::CreateSchema(op) => op.create_states(context, partitions), - Self::CreateView(op) => op.create_states(context, partitions), - Self::Drop(op) => op.create_states(context, partitions), - Self::Empty(op) => op.create_states(context, partitions), - Self::BatchResizer(op) => op.create_states(context, partitions), + Self::HashAggregate(op) => op.create_states_old(context, partitions), + Self::UngroupedAggregate(op) => op.create_states_old(context, partitions), + Self::Window(op) => op.create_states_old(context, partitions), + Self::NestedLoopJoin(op) => op.create_states_old(context, partitions), + Self::HashJoin(op) => op.create_states_old(context, partitions), + Self::Values(op) => op.create_states_old(context, partitions), + Self::ResultSink(op) => op.create_states_old(context, partitions), + Self::DynSink(op) => op.create_states_old(context, partitions), + Self::DynSource(op) => op.create_states_old(context, partitions), + Self::MaterializedSink(op) => op.create_states_old(context, partitions), + Self::MaterializedSource(op) => op.create_states_old(context, partitions), + Self::RoundRobin(op) => op.create_states_old(context, partitions), + Self::MergeSorted(op) => op.create_states_old(context, partitions), + Self::LocalSort(op) => op.create_states_old(context, partitions), + Self::Limit(op) => op.create_states_old(context, partitions), + Self::Union(op) => op.create_states_old(context, partitions), + Self::Filter(op) => op.create_states_old(context, partitions), + Self::Project(op) => op.create_states_old(context, partitions), + Self::Unnest(op) => op.create_states_old(context, partitions), + Self::Scan(op) => op.create_states_old(context, partitions), + Self::TableFunction(op) => op.create_states_old(context, partitions), + Self::TableInOut(op) => op.create_states_old(context, partitions), + Self::Insert(op) => op.create_states_old(context, partitions), + Self::CopyTo(op) => op.create_states_old(context, partitions), + Self::CreateTable(op) => op.create_states_old(context, partitions), + Self::CreateSchema(op) => op.create_states_old(context, partitions), + Self::CreateView(op) => op.create_states_old(context, partitions), + Self::Drop(op) => op.create_states_old(context, partitions), + Self::Empty(op) => op.create_states_old(context, partitions), + Self::BatchResizer(op) => op.create_states_old(context, partitions), } } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -395,126 +395,140 @@ impl ExecutableOperator for PhysicalOperator { batch: BatchOld, ) -> Result { match self { - Self::HashAggregate(op) => op.poll_push(cx, partition_state, operator_state, batch), + Self::HashAggregate(op) => op.poll_push_old(cx, partition_state, operator_state, batch), Self::UngroupedAggregate(op) => { - op.poll_push(cx, partition_state, operator_state, batch) + op.poll_push_old(cx, partition_state, operator_state, batch) + } + Self::Window(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::NestedLoopJoin(op) => { + op.poll_push_old(cx, partition_state, operator_state, batch) + } + Self::HashJoin(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Values(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::ResultSink(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::DynSink(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::DynSource(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::MaterializedSink(op) => { + op.poll_push_old(cx, partition_state, operator_state, batch) } - Self::Window(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::NestedLoopJoin(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::HashJoin(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Values(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::ResultSink(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::DynSink(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::DynSource(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::MaterializedSink(op) => op.poll_push(cx, partition_state, operator_state, batch), Self::MaterializedSource(op) => { - op.poll_push(cx, partition_state, operator_state, batch) + op.poll_push_old(cx, partition_state, operator_state, batch) } - Self::RoundRobin(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::MergeSorted(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::LocalSort(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Limit(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Union(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Filter(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Project(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Unnest(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Scan(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::TableFunction(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::TableInOut(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Insert(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::CopyTo(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::CreateTable(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::CreateSchema(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::CreateView(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Drop(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::Empty(op) => op.poll_push(cx, partition_state, operator_state, batch), - Self::BatchResizer(op) => op.poll_push(cx, partition_state, operator_state, batch), + Self::RoundRobin(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::MergeSorted(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::LocalSort(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Limit(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Union(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Filter(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Project(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Unnest(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Scan(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::TableFunction(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::TableInOut(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Insert(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::CopyTo(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::CreateTable(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::CreateSchema(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::CreateView(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Drop(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::Empty(op) => op.poll_push_old(cx, partition_state, operator_state, batch), + Self::BatchResizer(op) => op.poll_push_old(cx, partition_state, operator_state, batch), } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, operator_state: &OperatorState, ) -> Result { match self { - Self::HashAggregate(op) => op.poll_finalize_push(cx, partition_state, operator_state), + Self::HashAggregate(op) => { + op.poll_finalize_push_old(cx, partition_state, operator_state) + } Self::UngroupedAggregate(op) => { - op.poll_finalize_push(cx, partition_state, operator_state) + op.poll_finalize_push_old(cx, partition_state, operator_state) } - Self::Window(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::NestedLoopJoin(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::HashJoin(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Values(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::ResultSink(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::DynSink(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::DynSource(op) => op.poll_finalize_push(cx, partition_state, operator_state), + Self::Window(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::NestedLoopJoin(op) => { + op.poll_finalize_push_old(cx, partition_state, operator_state) + } + Self::HashJoin(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Values(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::ResultSink(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::DynSink(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::DynSource(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), Self::MaterializedSink(op) => { - op.poll_finalize_push(cx, partition_state, operator_state) + op.poll_finalize_push_old(cx, partition_state, operator_state) } Self::MaterializedSource(op) => { - op.poll_finalize_push(cx, partition_state, operator_state) + op.poll_finalize_push_old(cx, partition_state, operator_state) + } + Self::RoundRobin(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::MergeSorted(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::LocalSort(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Limit(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Union(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Filter(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Project(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Unnest(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Scan(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::TableFunction(op) => { + op.poll_finalize_push_old(cx, partition_state, operator_state) + } + Self::TableInOut(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Insert(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::CopyTo(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::CreateTable(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::CreateSchema(op) => { + op.poll_finalize_push_old(cx, partition_state, operator_state) + } + Self::CreateView(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Drop(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::Empty(op) => op.poll_finalize_push_old(cx, partition_state, operator_state), + Self::BatchResizer(op) => { + op.poll_finalize_push_old(cx, partition_state, operator_state) } - Self::RoundRobin(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::MergeSorted(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::LocalSort(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Limit(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Union(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Filter(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Project(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Unnest(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Scan(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::TableFunction(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::TableInOut(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Insert(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::CopyTo(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::CreateTable(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::CreateSchema(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::CreateView(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Drop(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::Empty(op) => op.poll_finalize_push(cx, partition_state, operator_state), - Self::BatchResizer(op) => op.poll_finalize_push(cx, partition_state, operator_state), } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, operator_state: &OperatorState, ) -> Result { match self { - Self::HashAggregate(op) => op.poll_pull(cx, partition_state, operator_state), - Self::UngroupedAggregate(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Window(op) => op.poll_pull(cx, partition_state, operator_state), - Self::NestedLoopJoin(op) => op.poll_pull(cx, partition_state, operator_state), - Self::HashJoin(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Values(op) => op.poll_pull(cx, partition_state, operator_state), - Self::ResultSink(op) => op.poll_pull(cx, partition_state, operator_state), - Self::DynSink(op) => op.poll_pull(cx, partition_state, operator_state), - Self::DynSource(op) => op.poll_pull(cx, partition_state, operator_state), - Self::MaterializedSink(op) => op.poll_pull(cx, partition_state, operator_state), - Self::MaterializedSource(op) => op.poll_pull(cx, partition_state, operator_state), - Self::RoundRobin(op) => op.poll_pull(cx, partition_state, operator_state), - Self::MergeSorted(op) => op.poll_pull(cx, partition_state, operator_state), - Self::LocalSort(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Limit(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Union(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Filter(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Project(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Unnest(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Scan(op) => op.poll_pull(cx, partition_state, operator_state), - Self::TableFunction(op) => op.poll_pull(cx, partition_state, operator_state), - Self::TableInOut(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Insert(op) => op.poll_pull(cx, partition_state, operator_state), - Self::CopyTo(op) => op.poll_pull(cx, partition_state, operator_state), - Self::CreateTable(op) => op.poll_pull(cx, partition_state, operator_state), - Self::CreateSchema(op) => op.poll_pull(cx, partition_state, operator_state), - Self::CreateView(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Drop(op) => op.poll_pull(cx, partition_state, operator_state), - Self::Empty(op) => op.poll_pull(cx, partition_state, operator_state), - Self::BatchResizer(op) => op.poll_pull(cx, partition_state, operator_state), + Self::HashAggregate(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::UngroupedAggregate(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Window(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::NestedLoopJoin(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::HashJoin(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Values(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::ResultSink(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::DynSink(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::DynSource(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::MaterializedSink(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::MaterializedSource(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::RoundRobin(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::MergeSorted(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::LocalSort(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Limit(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Union(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Filter(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Project(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Unnest(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Scan(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::TableFunction(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::TableInOut(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Insert(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::CopyTo(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::CreateTable(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::CreateSchema(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::CreateView(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Drop(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::Empty(op) => op.poll_pull_old(cx, partition_state, operator_state), + Self::BatchResizer(op) => op.poll_pull_old(cx, partition_state, operator_state), } } } diff --git a/crates/rayexec_execution/src/execution/operators/nl_join.rs b/crates/rayexec_execution/src/execution/operators/nl_join.rs index a3e176318..0d854b577 100644 --- a/crates/rayexec_execution/src/execution/operators/nl_join.rs +++ b/crates/rayexec_execution/src/execution/operators/nl_join.rs @@ -209,7 +209,7 @@ impl PhysicalNestedLoopJoin { } impl ExecutableOperator for PhysicalNestedLoopJoin { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -242,7 +242,7 @@ impl ExecutableOperator for PhysicalNestedLoopJoin { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -332,7 +332,7 @@ impl ExecutableOperator for PhysicalNestedLoopJoin { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -381,7 +381,7 @@ impl ExecutableOperator for PhysicalNestedLoopJoin { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/round_robin.rs b/crates/rayexec_execution/src/execution/operators/round_robin.rs index 229b55417..c8d434b17 100644 --- a/crates/rayexec_execution/src/execution/operators/round_robin.rs +++ b/crates/rayexec_execution/src/execution/operators/round_robin.rs @@ -58,7 +58,7 @@ pub struct RoundRobinOperatorState { pub struct PhysicalRoundRobinRepartition; impl ExecutableOperator for PhysicalRoundRobinRepartition { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -123,7 +123,7 @@ impl ExecutableOperator for PhysicalRoundRobinRepartition { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -163,7 +163,7 @@ impl ExecutableOperator for PhysicalRoundRobinRepartition { Ok(PollPush::Pushed) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -202,7 +202,7 @@ impl ExecutableOperator for PhysicalRoundRobinRepartition { Ok(PollFinalize::Finalized) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/scan.rs b/crates/rayexec_execution/src/execution/operators/scan.rs index afde56994..f411101ac 100644 --- a/crates/rayexec_execution/src/execution/operators/scan.rs +++ b/crates/rayexec_execution/src/execution/operators/scan.rs @@ -62,7 +62,7 @@ impl PhysicalScan { } impl ExecutableOperator for PhysicalScan { - fn create_states( + fn create_states_old( &self, context: &DatabaseContext, partitions: Vec, @@ -94,7 +94,7 @@ impl ExecutableOperator for PhysicalScan { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -104,7 +104,7 @@ impl ExecutableOperator for PhysicalScan { Err(RayexecError::new("Cannot push to physical scan")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -113,7 +113,7 @@ impl ExecutableOperator for PhysicalScan { Err(RayexecError::new("Cannot push to physical scan")) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/simple.rs b/crates/rayexec_execution/src/execution/operators/simple.rs index 8ec37dfec..e90600230 100644 --- a/crates/rayexec_execution/src/execution/operators/simple.rs +++ b/crates/rayexec_execution/src/execution/operators/simple.rs @@ -77,7 +77,7 @@ impl SimpleOperator { } impl ExecutableOperator for SimpleOperator { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -92,7 +92,7 @@ impl ExecutableOperator for SimpleOperator { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -124,7 +124,7 @@ impl ExecutableOperator for SimpleOperator { Ok(PollPush::Pushed) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -144,7 +144,7 @@ impl ExecutableOperator for SimpleOperator { Ok(PollFinalize::Finalized) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/sink.rs b/crates/rayexec_execution/src/execution/operators/sink.rs index 09d5cbd8b..404fc39c5 100644 --- a/crates/rayexec_execution/src/execution/operators/sink.rs +++ b/crates/rayexec_execution/src/execution/operators/sink.rs @@ -141,7 +141,7 @@ impl SinkOperator { } impl ExecutableOperator for SinkOperator { - fn create_states( + fn create_states_old( &self, context: &DatabaseContext, partitions: Vec, @@ -178,7 +178,7 @@ impl ExecutableOperator for SinkOperator { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -245,7 +245,7 @@ impl ExecutableOperator for SinkOperator { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -340,7 +340,7 @@ impl ExecutableOperator for SinkOperator { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/sort/gather_sort.rs b/crates/rayexec_execution/src/execution/operators/sort/gather_sort.rs index d62948d57..3a051ca60 100644 --- a/crates/rayexec_execution/src/execution/operators/sort/gather_sort.rs +++ b/crates/rayexec_execution/src/execution/operators/sort/gather_sort.rs @@ -192,7 +192,7 @@ impl PhysicalGatherSort { } impl ExecutableOperator for PhysicalGatherSort { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -239,7 +239,7 @@ impl ExecutableOperator for PhysicalGatherSort { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -286,7 +286,7 @@ impl ExecutableOperator for PhysicalGatherSort { Ok(PollPush::Pushed) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -317,7 +317,7 @@ impl ExecutableOperator for PhysicalGatherSort { Ok(PollFinalize::Finalized) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -690,7 +690,7 @@ mod tests { // Partition input is finished. operator - .poll_finalize_push(&mut push_cx.context(), &mut push_states[0], &operator_state) + .poll_finalize_push_old(&mut push_cx.context(), &mut push_states[0], &operator_state) .unwrap(); // Now we can pull the sorted result. @@ -808,14 +808,14 @@ mod tests { // Partition inputs is finished. operator - .poll_finalize_push( + .poll_finalize_push_old( &mut p0_push_cx.context(), &mut push_states[0], &operator_state, ) .unwrap(); operator - .poll_finalize_push( + .poll_finalize_push_old( &mut p1_push_cx.context(), &mut push_states[1], &operator_state, diff --git a/crates/rayexec_execution/src/execution/operators/sort/scatter_sort.rs b/crates/rayexec_execution/src/execution/operators/sort/scatter_sort.rs index 6c2c1f412..647ced138 100644 --- a/crates/rayexec_execution/src/execution/operators/sort/scatter_sort.rs +++ b/crates/rayexec_execution/src/execution/operators/sort/scatter_sort.rs @@ -63,7 +63,7 @@ impl PhysicalScatterSort { } impl ExecutableOperator for PhysicalScatterSort { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -91,7 +91,7 @@ impl ExecutableOperator for PhysicalScatterSort { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -115,7 +115,7 @@ impl ExecutableOperator for PhysicalScatterSort { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -162,7 +162,7 @@ impl ExecutableOperator for PhysicalScatterSort { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -273,7 +273,9 @@ mod tests { fn create_states(operator: &PhysicalScatterSort, partitions: usize) -> Vec { let context = test_database_context(); - let states = operator.create_states(&context, vec![partitions]).unwrap(); + let states = operator + .create_states_old(&context, vec![partitions]) + .unwrap(); match states.partition_states { InputOutputStates::OneToOne { partition_states } => partition_states, @@ -306,7 +308,7 @@ mod tests { assert_eq!(PollPush::NeedsMore, poll_push); } operator - .poll_finalize_push( + .poll_finalize_push_old( &mut push_cx.context(), &mut partition_states[0], &operator_state, @@ -348,7 +350,7 @@ mod tests { assert_eq!(PollPush::NeedsMore, poll_push); } operator - .poll_finalize_push( + .poll_finalize_push_old( &mut push_cx.context(), &mut partition_states[0], &operator_state, @@ -394,7 +396,7 @@ mod tests { assert_eq!(PollPush::NeedsMore, poll_push); } operator - .poll_finalize_push( + .poll_finalize_push_old( &mut push_cx.context(), &mut partition_states[0], &operator_state, @@ -459,7 +461,7 @@ mod tests { assert_eq!(PollPush::NeedsMore, poll_push); } operator - .poll_finalize_push( + .poll_finalize_push_old( &mut push_cx.context(), &mut partition_states[0], &operator_state, diff --git a/crates/rayexec_execution/src/execution/operators/sort/top_k.rs b/crates/rayexec_execution/src/execution/operators/sort/top_k.rs index 8428100d9..5b802e6b4 100644 --- a/crates/rayexec_execution/src/execution/operators/sort/top_k.rs +++ b/crates/rayexec_execution/src/execution/operators/sort/top_k.rs @@ -25,7 +25,7 @@ pub struct TopKOperatorState {} pub struct PhysicalTopK {} impl ExecutableOperator for PhysicalTopK { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, _partitions: Vec, @@ -33,7 +33,7 @@ impl ExecutableOperator for PhysicalTopK { unimplemented!() } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -43,7 +43,7 @@ impl ExecutableOperator for PhysicalTopK { unimplemented!() } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -52,7 +52,7 @@ impl ExecutableOperator for PhysicalTopK { unimplemented!() } - fn poll_pull( + fn poll_pull_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/source.rs b/crates/rayexec_execution/src/execution/operators/source.rs index 63bc7528b..07952aff6 100644 --- a/crates/rayexec_execution/src/execution/operators/source.rs +++ b/crates/rayexec_execution/src/execution/operators/source.rs @@ -85,7 +85,7 @@ impl SourceOperator { } impl ExecutableOperator for SourceOperator { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -110,7 +110,7 @@ impl ExecutableOperator for SourceOperator { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -120,7 +120,7 @@ impl ExecutableOperator for SourceOperator { Err(RayexecError::new("Cannot push to physical scan")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -129,7 +129,7 @@ impl ExecutableOperator for SourceOperator { Err(RayexecError::new("Cannot push to physical scan")) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/table_function.rs b/crates/rayexec_execution/src/execution/operators/table_function.rs index 43e36da59..578fc7471 100644 --- a/crates/rayexec_execution/src/execution/operators/table_function.rs +++ b/crates/rayexec_execution/src/execution/operators/table_function.rs @@ -53,7 +53,7 @@ impl PhysicalTableFunction { } impl ExecutableOperator for PhysicalTableFunction { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -89,7 +89,7 @@ impl ExecutableOperator for PhysicalTableFunction { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -100,7 +100,7 @@ impl ExecutableOperator for PhysicalTableFunction { Err(RayexecError::new("Cannot push to physical table function")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -109,7 +109,7 @@ impl ExecutableOperator for PhysicalTableFunction { Err(RayexecError::new("Cannot push to physical table function")) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/table_inout.rs b/crates/rayexec_execution/src/execution/operators/table_inout.rs index dd340127b..be8b7978b 100644 --- a/crates/rayexec_execution/src/execution/operators/table_inout.rs +++ b/crates/rayexec_execution/src/execution/operators/table_inout.rs @@ -39,7 +39,7 @@ pub struct PhysicalTableInOut { } impl ExecutableOperator for PhysicalTableInOut { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -74,7 +74,7 @@ impl ExecutableOperator for PhysicalTableInOut { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -128,7 +128,7 @@ impl ExecutableOperator for PhysicalTableInOut { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -142,7 +142,7 @@ impl ExecutableOperator for PhysicalTableInOut { state.function_state.poll_finalize_push(cx) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/test_util.rs b/crates/rayexec_execution/src/execution/operators/test_util.rs index 4ab665880..c8f5f84b9 100644 --- a/crates/rayexec_execution/src/execution/operators/test_util.rs +++ b/crates/rayexec_execution/src/execution/operators/test_util.rs @@ -74,7 +74,7 @@ impl TestWakerContext { operator_state: &OperatorState, batch: impl Into, ) -> Result { - operator.as_ref().poll_push( + operator.as_ref().poll_push_old( &mut self.context(), partition_state, operator_state, @@ -90,7 +90,7 @@ impl TestWakerContext { ) -> Result { operator .as_ref() - .poll_pull(&mut self.context(), partition_state, operator_state) + .poll_pull_old(&mut self.context(), partition_state, operator_state) } } diff --git a/crates/rayexec_execution/src/execution/operators/ungrouped_aggregate.rs b/crates/rayexec_execution/src/execution/operators/ungrouped_aggregate.rs index a3ac15f57..02564d973 100644 --- a/crates/rayexec_execution/src/execution/operators/ungrouped_aggregate.rs +++ b/crates/rayexec_execution/src/execution/operators/ungrouped_aggregate.rs @@ -102,7 +102,7 @@ impl PhysicalUngroupedAggregate { } impl ExecutableOperator for PhysicalUngroupedAggregate { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -135,7 +135,7 @@ impl ExecutableOperator for PhysicalUngroupedAggregate { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -177,7 +177,7 @@ impl ExecutableOperator for PhysicalUngroupedAggregate { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -253,7 +253,7 @@ impl ExecutableOperator for PhysicalUngroupedAggregate { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/union.rs b/crates/rayexec_execution/src/execution/operators/union.rs index 852acd8ba..088352568 100644 --- a/crates/rayexec_execution/src/execution/operators/union.rs +++ b/crates/rayexec_execution/src/execution/operators/union.rs @@ -68,7 +68,7 @@ impl PhysicalUnion { } impl ExecutableOperator for PhysicalUnion { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -115,7 +115,7 @@ impl ExecutableOperator for PhysicalUnion { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -163,7 +163,7 @@ impl ExecutableOperator for PhysicalUnion { } } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -198,7 +198,7 @@ impl ExecutableOperator for PhysicalUnion { } } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/unnest.rs b/crates/rayexec_execution/src/execution/operators/unnest.rs index db6cadaaf..c335c19ee 100644 --- a/crates/rayexec_execution/src/execution/operators/unnest.rs +++ b/crates/rayexec_execution/src/execution/operators/unnest.rs @@ -82,7 +82,7 @@ pub struct PhysicalUnnest { } impl ExecutableOperator for PhysicalUnnest { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -117,7 +117,7 @@ impl ExecutableOperator for PhysicalUnnest { }) } - fn poll_push( + fn poll_push_old( &self, cx: &mut Context, partition_state: &mut PartitionState, @@ -158,7 +158,7 @@ impl ExecutableOperator for PhysicalUnnest { Ok(PollPush::Pushed) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, @@ -178,7 +178,7 @@ impl ExecutableOperator for PhysicalUnnest { Ok(PollFinalize::Finalized) } - fn poll_pull( + fn poll_pull_old( &self, cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/values.rs b/crates/rayexec_execution/src/execution/operators/values.rs index 28fed7b03..7940362b1 100644 --- a/crates/rayexec_execution/src/execution/operators/values.rs +++ b/crates/rayexec_execution/src/execution/operators/values.rs @@ -39,7 +39,7 @@ impl PhysicalValues { } impl ExecutableOperator for PhysicalValues { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, partitions: Vec, @@ -64,7 +64,7 @@ impl ExecutableOperator for PhysicalValues { }) } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -74,7 +74,7 @@ impl ExecutableOperator for PhysicalValues { Err(RayexecError::new("Cannot push to Values operator")) } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -83,7 +83,7 @@ impl ExecutableOperator for PhysicalValues { Err(RayexecError::new("Cannot push to Values operator")) } - fn poll_pull( + fn poll_pull_old( &self, _cx: &mut Context, partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/execution/operators/window/mod.rs b/crates/rayexec_execution/src/execution/operators/window/mod.rs index 8a73c59c0..4c312478f 100644 --- a/crates/rayexec_execution/src/execution/operators/window/mod.rs +++ b/crates/rayexec_execution/src/execution/operators/window/mod.rs @@ -19,7 +19,7 @@ use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable}; pub struct PhysicalWindow {} impl ExecutableOperator for PhysicalWindow { - fn create_states( + fn create_states_old( &self, _context: &DatabaseContext, _partitions: Vec, @@ -27,7 +27,7 @@ impl ExecutableOperator for PhysicalWindow { unimplemented!() } - fn poll_push( + fn poll_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -37,7 +37,7 @@ impl ExecutableOperator for PhysicalWindow { unimplemented!() } - fn poll_finalize_push( + fn poll_finalize_push_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, @@ -46,7 +46,7 @@ impl ExecutableOperator for PhysicalWindow { unimplemented!() } - fn poll_pull( + fn poll_pull_old( &self, _cx: &mut Context, _partition_state: &mut PartitionState, diff --git a/crates/rayexec_execution/src/expr/physical/column_expr.rs b/crates/rayexec_execution/src/expr/physical/column_expr.rs index cb1e9fc15..e6ab09965 100644 --- a/crates/rayexec_execution/src/expr/physical/column_expr.rs +++ b/crates/rayexec_execution/src/expr/physical/column_expr.rs @@ -78,6 +78,7 @@ mod tests { use crate::arrays::buffer::physical_type::{PhysicalDictionary, PhysicalI32}; use crate::arrays::buffer::{Int32Builder, StringViewBufferBuilder}; use crate::arrays::datatype::DataType; + use crate::arrays::executor::scalar::unary::UnaryExecutor; #[test] fn eval_simple() { @@ -156,5 +157,13 @@ mod tests { // But with a selection stored in the array. let dict_slice = out.data.try_as_slice::().unwrap(); assert_eq!(&[2, 0], dict_slice); + + let mut out_buf = [None, None]; + UnaryExecutor::for_each_flat::(out.flat_view().unwrap(), 0..2, |idx, v| { + out_buf[idx] = v.cloned() + }) + .unwrap(); + + assert_eq!([Some(6), Some(4)], out_buf); } }