diff --git a/crates/rayexec_execution/src/execution/operators_exp/physical_hash_join/build_data.rs b/crates/rayexec_execution/src/execution/operators_exp/physical_hash_join/build_data.rs new file mode 100644 index 000000000..62ab0eab7 --- /dev/null +++ b/crates/rayexec_execution/src/execution/operators_exp/physical_hash_join/build_data.rs @@ -0,0 +1,62 @@ +use rayexec_error::Result; + +use crate::arrays::batch::Batch; +use crate::arrays::buffer_manager::BufferManager; +use crate::arrays::datatype::DataType; +use crate::execution::operators_exp::batch_collection::BatchCollectionBlock; + +#[derive(Debug)] +pub struct BuildData { + capacity_per_block: usize, + blocks: Vec>, +} + +impl BuildData +where + B: BufferManager, +{ + pub fn push_batch(&mut self, manager: &B, input_types: &[DataType], batch: &Batch) -> Result<()> { + let mut block = self.pop_or_allocate_block(manager, input_types, batch.num_rows())?; + + // TODO: Hashes + + block.block.append_batch_data(batch)?; + + self.blocks.push(block); + + Ok(()) + } + + fn pop_or_allocate_block(&mut self, manager: &B, input_types: &[DataType], count: usize) -> Result> { + debug_assert!(count <= self.capacity_per_block); + + if let Some(last) = self.blocks.last() { + if last.block.has_capacity_for_rows(count) { + return Ok(self.blocks.pop().unwrap()); + } + } + + let block = BuildBlock::new(manager, input_types, self.capacity_per_block)?; + + Ok(block) + } +} + +#[derive(Debug)] +pub struct BuildBlock { + block: BatchCollectionBlock, + /// Row hashes, allocated to capacity of the batch block. + hashes: Vec, +} + +impl BuildBlock +where + B: BufferManager, +{ + pub fn new(manager: &B, input_types: &[DataType], capacity: usize) -> Result { + let block = BatchCollectionBlock::new(manager, input_types, capacity)?; + let hashes = vec![0; capacity]; // TODO: Track + + Ok(BuildBlock { block, hashes }) + } +} diff --git a/crates/rayexec_execution/src/execution/operators_exp/physical_hash_join/mod.rs b/crates/rayexec_execution/src/execution/operators_exp/physical_hash_join/mod.rs new file mode 100644 index 000000000..7e9884380 --- /dev/null +++ b/crates/rayexec_execution/src/execution/operators_exp/physical_hash_join/mod.rs @@ -0,0 +1,143 @@ +mod build_data; + +use std::task::{Context, Waker}; + +use build_data::BuildData; +use parking_lot::Mutex; +use rayexec_error::{OptionExt, Result}; + +use super::{ + ExecutableOperator, + ExecuteInOutState, + OperatorState, + PartitionAndOperatorStates, + PartitionState, + PollExecute, + PollFinalize, +}; +use crate::arrays::buffer_manager::NopBufferManager; +use crate::arrays::datatype::DataType; +use crate::database::DatabaseContext; +use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable}; + +#[derive(Debug)] +pub enum HashJoinBuildPartitionState { + /// Partition is building. + Building(InProgressBuildState), + /// Partition finished building. + Finished, +} + +#[derive(Debug)] +pub struct InProgressBuildState { + build_data: BuildData, +} + +#[derive(Debug)] +pub enum HashJoinProbePartitionState { + /// Partition waiting for build side to complete. + Waiting(usize), + /// Partition is probing. + Probing(ProbeState), + /// Left-join drain state. + Draining(DrainState), + /// Probing finished. + Finished, +} + +#[derive(Debug)] +pub struct ProbeState {} + +#[derive(Debug)] +pub struct DrainState {} + +#[derive(Debug)] +pub struct HashJoinOperatorState { + inner: Mutex, +} + +#[derive(Debug)] +struct HashJoinOperatorStateInner { + /// Wakers from the probe side that are waiting for the build side to + /// complete. + /// + /// Keyed by probe-side partition index. + build_waiting_probers: Vec>, +} + +#[derive(Debug)] +pub struct PhysicalHashJoin { + /// Data types from the left (build) side of the join. + left_types: Vec, + /// Data types from the right (probe) side of the join. + right_types: Vec, +} + +impl ExecutableOperator for PhysicalHashJoin { + fn create_states( + &self, + context: &DatabaseContext, + batch_size: usize, + partitions: usize, + ) -> Result { + unimplemented!() + } + + fn poll_execute( + &self, + cx: &mut Context, + partition_state: &mut PartitionState, + operator_state: &OperatorState, + inout: ExecuteInOutState, + ) -> Result { + match partition_state { + PartitionState::HashJoinBuild(state) => { + let state = match state { + HashJoinBuildPartitionState::Building(state) => state, + HashJoinBuildPartitionState::Finished => return Ok(PollExecute::Exhausted), // TODO: Probably should error instead. + }; + + let batch = inout.input.required("input batch required")?; + state + .build_data + .push_batch(&NopBufferManager, &self.left_types, batch)?; + + Ok(PollExecute::NeedsMore) + } + PartitionState::HashJoinProbe(state) => { + match state { + HashJoinProbePartitionState::Waiting(probe_idx) => { + // Still waiting for build side to complete, just need + // to register a waker. + + let mut operator_state = match operator_state { + OperatorState::HashJoin(state) => state.inner.lock(), + other => panic!("invalid operator state: {other:?}"), + }; + + operator_state.build_waiting_probers[*probe_idx] = Some(cx.waker().clone()); + + Ok(PollExecute::Pending) + } + _ => unimplemented!(), + } + } + other => panic!("invalid partition state: {other:?}"), + } + } + + fn poll_finalize( + &self, + cx: &mut Context, + partition_state: &mut PartitionState, + operator_state: &OperatorState, + ) -> Result { + unimplemented!() + } +} + +impl Explainable for PhysicalHashJoin { + fn explain_entry(&self, conf: ExplainConfig) -> ExplainEntry { + unimplemented!() + } +}