Skip to content

Commit

Permalink
inc rename
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 22, 2024
1 parent a6f0e29 commit cc20b88
Show file tree
Hide file tree
Showing 30 changed files with 276 additions and 247 deletions.
6 changes: 3 additions & 3 deletions crates/rayexec_execution/src/execution/executable/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl ExecutablePartitionPipeline {

// Otherwise do a normal pull.
let timer = Timer::<I>::start();
let poll_pull = operator.physical.poll_pull(
let poll_pull = operator.physical.poll_pull_old(
cx,
&mut operator.partition_state,
&operator.operator_state,
Expand Down Expand Up @@ -419,7 +419,7 @@ impl ExecutablePartitionPipeline {
.expect("next operator to exist");

let timer = Timer::<I>::start();
let poll_finalize = next_operator.physical.poll_finalize_push(
let poll_finalize = next_operator.physical.poll_finalize_push_old(
cx,
&mut next_operator.partition_state,
&next_operator.operator_state,
Expand Down Expand Up @@ -463,7 +463,7 @@ impl ExecutablePartitionPipeline {
operator.profile_data.rows_read += batch.num_rows();

let timer = Timer::<I>::start();
let poll_push = operator.physical.poll_push(
let poll_push = operator.physical.poll_push_old(
cx,
&mut operator.partition_state,
&operator.operator_state,
Expand Down
12 changes: 7 additions & 5 deletions crates/rayexec_execution/src/execution/executable/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl PendingQuery {
}

let operator = Arc::new(PhysicalOperator::ResultSink(SinkOperator::new(sink)));
let states = operator.create_states(context, vec![partitions])?;
let states = operator.create_states_old(context, vec![partitions])?;
let partition_states = match states.partition_states {
InputOutputStates::OneToOne { partition_states } => partition_states,
_ => return Err(RayexecError::new("invalid partition states for query sink")),
Expand Down Expand Up @@ -429,7 +429,7 @@ impl PendingQuery {
}
};

let states = operator.create_states(context, vec![partitions])?;
let states = operator.create_states_old(context, vec![partitions])?;
let partition_states = match states.partition_states {
InputOutputStates::OneToOne { partition_states } => partition_states,
_ => return Err(RayexecError::new("invalid partition states")),
Expand Down Expand Up @@ -549,7 +549,7 @@ impl PendingQuery {
}
};

let states = operator.create_states(context, vec![partitions])?;
let states = operator.create_states_old(context, vec![partitions])?;
let partition_states = match states.partition_states {
InputOutputStates::OneToOne { partition_states } => partition_states,
_ => {
Expand Down Expand Up @@ -610,7 +610,7 @@ impl PendingQuery {
) -> Result<ExecutablePipeline> {
let rr_operator = Arc::new(PhysicalOperator::RoundRobin(PhysicalRoundRobinRepartition));
let states = rr_operator
.create_states(context, vec![pipeline.num_partitions(), output_partitions])?;
.create_states_old(context, vec![pipeline.num_partitions(), output_partitions])?;

let (push_states, pull_states) = match states.partition_states {
InputOutputStates::SeparateInputOutput {
Expand Down Expand Up @@ -696,7 +696,9 @@ impl PendingOperatorWithState {
.unwrap_or(config.partitions);

// TODO: How to get other input partitions.
let states = operator.operator.create_states(context, vec![partitions])?;
let states = operator
.operator
.create_states_old(context, vec![partitions])?;

Ok(match states.partition_states {
InputOutputStates::OneToOne { partition_states } => PendingOperatorWithState {
Expand Down
8 changes: 4 additions & 4 deletions crates/rayexec_execution/src/execution/operators/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable};
pub struct PhysicalAnalyze {}

impl ExecutableOperator for PhysicalAnalyze {
fn create_states(
fn create_states_old(
&self,
_context: &DatabaseContext,
_partitions: Vec<usize>,
) -> Result<ExecutionStates> {
unimplemented!()
}

fn poll_push(
fn poll_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -38,7 +38,7 @@ impl ExecutableOperator for PhysicalAnalyze {
unimplemented!()
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -47,7 +47,7 @@ impl ExecutableOperator for PhysicalAnalyze {
unimplemented!()
}

fn poll_pull(
fn poll_pull_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct BatchResizerPartitionState {
pub struct PhysicalBatchResizer;

impl ExecutableOperator for PhysicalBatchResizer {
fn create_states(
fn create_states_old(
&self,
_context: &DatabaseContext,
partitions: Vec<usize>,
Expand All @@ -60,7 +60,7 @@ impl ExecutableOperator for PhysicalBatchResizer {
})
}

fn poll_push(
fn poll_push_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down Expand Up @@ -99,7 +99,7 @@ impl ExecutableOperator for PhysicalBatchResizer {
}
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down Expand Up @@ -131,7 +131,7 @@ impl ExecutableOperator for PhysicalBatchResizer {
Ok(PollFinalize::Finalized)
}

fn poll_pull(
fn poll_pull_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl PhysicalCreateSchema {
}

impl ExecutableOperator for PhysicalCreateSchema {
fn create_states(
fn create_states_old(
&self,
context: &DatabaseContext,
partitions: Vec<usize>,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl ExecutableOperator for PhysicalCreateSchema {
})
}

fn poll_push(
fn poll_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -93,7 +93,7 @@ impl ExecutableOperator for PhysicalCreateSchema {
Err(RayexecError::new("Cannot push to physical create table"))
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -102,7 +102,7 @@ impl ExecutableOperator for PhysicalCreateSchema {
Err(RayexecError::new("Cannot push to physical create table"))
}

fn poll_pull(
fn poll_pull_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct PhysicalCreateView {
}

impl ExecutableOperator for PhysicalCreateView {
fn create_states(
fn create_states_old(
&self,
context: &DatabaseContext,
partitions: Vec<usize>,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl ExecutableOperator for PhysicalCreateView {
})
}

fn poll_push(
fn poll_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -91,7 +91,7 @@ impl ExecutableOperator for PhysicalCreateView {
Err(RayexecError::new("Cannot push to physical create view"))
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -100,7 +100,7 @@ impl ExecutableOperator for PhysicalCreateView {
Err(RayexecError::new("Cannot push to physical create view"))
}

fn poll_pull(
fn poll_pull_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
8 changes: 4 additions & 4 deletions crates/rayexec_execution/src/execution/operators/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl PhysicalDrop {
}

impl ExecutableOperator for PhysicalDrop {
fn create_states(
fn create_states_old(
&self,
context: &DatabaseContext,
partitions: Vec<usize>,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl ExecutableOperator for PhysicalDrop {
})
}

fn poll_push(
fn poll_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -86,7 +86,7 @@ impl ExecutableOperator for PhysicalDrop {
Err(RayexecError::new("Cannot push to physical create table"))
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -95,7 +95,7 @@ impl ExecutableOperator for PhysicalDrop {
Err(RayexecError::new("Cannot push to physical create table"))
}

fn poll_pull(
fn poll_pull_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
8 changes: 4 additions & 4 deletions crates/rayexec_execution/src/execution/operators/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct EmptyPartitionState {
pub struct PhysicalEmpty;

impl ExecutableOperator for PhysicalEmpty {
fn create_states(
fn create_states_old(
&self,
_context: &DatabaseContext,
partitions: Vec<usize>,
Expand All @@ -44,7 +44,7 @@ impl ExecutableOperator for PhysicalEmpty {
})
}

fn poll_push(
fn poll_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -54,7 +54,7 @@ impl ExecutableOperator for PhysicalEmpty {
Err(RayexecError::new("Cannot push to physical empty"))
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
_cx: &mut Context,
_partition_state: &mut PartitionState,
Expand All @@ -63,7 +63,7 @@ impl ExecutableOperator for PhysicalEmpty {
Err(RayexecError::new("Cannot push to physical empty"))
}

fn poll_pull(
fn poll_pull_old(
&self,
_cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl PhysicalHashAggregate {
}

impl ExecutableOperator for PhysicalHashAggregate {
fn create_states(
fn create_states_old(
&self,
_context: &DatabaseContext,
partitions: Vec<usize>,
Expand Down Expand Up @@ -293,7 +293,7 @@ impl ExecutableOperator for PhysicalHashAggregate {
})
}

fn poll_push(
fn poll_push_old(
&self,
_cx: &mut Context,
partition_state: &mut PartitionState,
Expand All @@ -318,7 +318,7 @@ impl ExecutableOperator for PhysicalHashAggregate {
}
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
_cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down Expand Up @@ -376,7 +376,7 @@ impl ExecutableOperator for PhysicalHashAggregate {
}
}

fn poll_pull(
fn poll_pull_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl PhysicalHashJoin {
}

impl ExecutableOperator for PhysicalHashJoin {
fn create_states(
fn create_states_old(
&self,
_context: &DatabaseContext,
partitions: Vec<usize>,
Expand Down Expand Up @@ -246,7 +246,7 @@ impl ExecutableOperator for PhysicalHashJoin {
})
}

fn poll_push(
fn poll_push_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down Expand Up @@ -347,7 +347,7 @@ impl ExecutableOperator for PhysicalHashJoin {
}
}

fn poll_finalize_push(
fn poll_finalize_push_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down Expand Up @@ -491,7 +491,7 @@ impl ExecutableOperator for PhysicalHashJoin {
}
}

fn poll_pull(
fn poll_pull_old(
&self,
cx: &mut Context,
partition_state: &mut PartitionState,
Expand Down
Loading

0 comments on commit cc20b88

Please sign in to comment.