Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 24, 2024
1 parent 4b6508c commit 02ac58e
Show file tree
Hide file tree
Showing 15 changed files with 546 additions and 51 deletions.
30 changes: 30 additions & 0 deletions crates/rayexec_execution/src/arrays/buffer/physical_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,36 @@ impl PhysicalStorage for PhysicalUntypedNull {
}
}

#[derive(Debug, Clone, Copy)]
pub struct PhysicalBoolean;

impl PhysicalStorage for PhysicalBoolean {
const PHYSICAL_TYPE: PhysicalType = PhysicalType::Boolean;

type PrimaryBufferType = bool;
type StorageType = Self::PrimaryBufferType;

type Storage<'a> = &'a [Self::StorageType];

fn get_storage<B>(buffer: &ArrayBuffer<B>) -> Result<Self::Storage<'_>>
where
B: BufferManager,
{
buffer.try_as_slice::<Self>()
}
}

impl MutablePhysicalStorage for PhysicalBoolean {
type MutableStorage<'a> = &'a mut [Self::StorageType];

fn get_storage_mut<B>(buffer: &mut ArrayBuffer<B>) -> Result<Self::MutableStorage<'_>>
where
B: BufferManager,
{
buffer.try_as_slice_mut::<Self>()
}
}

#[derive(Debug, Clone, Copy)]
pub struct PhysicalI8;

Expand Down
4 changes: 4 additions & 0 deletions crates/rayexec_execution/src/arrays/flat_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ where
})
}
}

pub fn logical_len(&self) -> usize {
self.selection.len()
}
}

#[derive(Debug, Clone, Copy)]
Expand Down
3 changes: 3 additions & 0 deletions crates/rayexec_execution/src/arrays/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ pub mod flat_array;
pub mod scalar;
pub mod schema;
pub mod validity;

#[cfg(test)]
pub(crate) mod testutil;
170 changes: 170 additions & 0 deletions crates/rayexec_execution/src/arrays/testutil.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use std::collections::BTreeMap;
use std::fmt::Debug;

use iterutil::exact_size::IntoExactSizeIterator;

use super::array::Array;
use super::batch::Batch;
use super::buffer::{Int32BufferBuilder, StringBufferBuilder};
use super::buffer_manager::NopBufferManager;
use super::datatype::DataType;
use crate::arrays::buffer::physical_type::{PhysicalBoolean, PhysicalI32, PhysicalStorage, PhysicalType, PhysicalUtf8};
use crate::arrays::buffer::ArrayBuffer;
use crate::arrays::executor::scalar::binary::BinaryExecutor;
use crate::arrays::executor::scalar::unary::UnaryExecutor;
use crate::arrays::executor::OutBuffer;
use crate::arrays::flat_array::FlatArrayView;
use crate::arrays::validity::Validity;

pub fn new_i32_array(vals: impl IntoExactSizeIterator<Item = i32>) -> Array {
Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter(vals).unwrap())
}

pub fn new_string_array<'a>(vals: impl IntoExactSizeIterator<Item = &'a str>) -> Array {
Array::new_with_buffer(DataType::Utf8, StringBufferBuilder::from_iter(vals).unwrap())
}

pub fn new_batch_from_arrays(arrays: impl IntoIterator<Item = Array>) -> Batch {
Batch::from_arrays(arrays, true).unwrap()
}

/// Assert two arrays are logically equal.
///
/// This will assume that the array's capacity is the array's logical length.
pub fn assert_arrays_eq(array1: &Array, array2: &Array) {
assert_eq!(array1.capacity(), array2.capacity(), "array capacities differ");
assert_arrays_eq_count(array1, array2, array1.capacity())
}

/// Asserts that two arrays are logically equal for the first `count` rows.
///
/// This will check valid and invalid values. Assertion error messages will
/// print out Some/None to represent valid/invalid.
pub fn assert_arrays_eq_count(array1: &Array, array2: &Array, count: usize) {
assert_eq!(array1.datatype, array2.datatype);

let flat1 = array1.flat_view().unwrap();
let flat2 = array2.flat_view().unwrap();

fn assert_eq_inner<S>(flat1: FlatArrayView, flat2: FlatArrayView, count: usize)
where
S: PhysicalStorage,
S::StorageType: ToOwned<Owned: Debug + PartialEq>,
{
let mut out = BTreeMap::new();
let sel = 0..count;

UnaryExecutor::for_each_flat::<S, _>(flat1, sel.clone(), |idx, v| {
out.insert(idx, v.map(|v| v.to_owned()));
})
.unwrap();

UnaryExecutor::for_each_flat::<S, _>(flat2, sel, |idx, v| match out.remove(&idx) {
Some(existing) => {
let v = v.map(|v| v.to_owned());
assert_eq!(existing, v, "values differ at index {idx}");
}
None => panic!("missing value for index in array 1 {idx}"),
})
.unwrap();

if !out.is_empty() {
panic!("extra entries in array 1: {:?}", out);
}
}

match array1.datatype.physical_type() {
PhysicalType::Int32 => assert_eq_inner::<PhysicalI32>(flat1, flat2, count),
PhysicalType::Utf8 => assert_eq_inner::<PhysicalUtf8>(flat1, flat2, count),
other => unimplemented!("{other:?}"),
}
}

/// Asserts two batches are logically equal.
pub fn assert_batches_eq(batch1: &Batch, batch2: &Batch) {
let arrays1 = batch1.arrays();
let arrays2 = batch2.arrays();

assert_eq!(arrays1.len(), arrays2.len(), "batches have different number of arrays");
assert_eq!(
batch1.num_rows(),
batch2.num_rows(),
"batches have different number of rows"
);

for (array1, array2) in arrays1.iter().zip(arrays2) {
assert_arrays_eq_count(array1, array2, batch1.num_rows());
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn assert_i32_arrays_eq_simple() {
let array1 = new_i32_array([4, 5, 6]);
let array2 = new_i32_array([4, 5, 6]);

assert_arrays_eq(&array1, &array2);
}

#[test]
fn assert_i32_arrays_eq_with_dictionary() {
let array1 = new_i32_array([5, 4, 4]);
let mut array2 = new_i32_array([4, 5]);
array2.select(&NopBufferManager, [1, 0, 0]).unwrap();

assert_arrays_eq(&array1, &array2);
}

#[test]
fn assert_i32_arrays_eq_with_invalid() {
let mut array1 = new_i32_array([4, 5, 6]);
array1.validity.set_invalid(1);

let mut array2 = new_i32_array([4, 8, 6]);
array2.validity.set_invalid(1);

assert_arrays_eq(&array1, &array2);
}

#[test]
fn assert_batches_eq_simple() {
let batch1 = new_batch_from_arrays([new_i32_array([4, 5, 6]), new_string_array(["a", "b", "c"])]);
let batch2 = new_batch_from_arrays([new_i32_array([4, 5, 6]), new_string_array(["a", "b", "c"])]);

assert_batches_eq(&batch1, &batch2);
}

#[test]
fn assert_batches_eq_logical_row_count() {
let mut batch1 = new_batch_from_arrays([
new_i32_array([4, 5, 6, 7, 8]),
new_string_array(["a", "b", "c", "d", "e"]),
]);
batch1.set_num_rows(3).unwrap();

let batch2 = new_batch_from_arrays([new_i32_array([4, 5, 6]), new_string_array(["a", "b", "c"])]);

assert_batches_eq(&batch1, &batch2);
}

#[test]
#[should_panic]
fn assert_i32_arrays_eq_not_eq() {
let array1 = new_i32_array([4, 5, 6]);
let array2 = new_i32_array([4, 5, 7]);

assert_arrays_eq(&array1, &array2);
}

#[test]
#[should_panic]
fn assert_i32_arrays_different_lengths() {
let array1 = new_i32_array([4, 5, 6]);
let array2 = new_i32_array([4, 5]);

assert_arrays_eq(&array1, &array2);
}
}
24 changes: 24 additions & 0 deletions crates/rayexec_execution/src/arrays/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,28 @@ impl Validity {
ValidityInner::Mask { bitmap } => bitmap.set(idx, false),
}
}

pub fn iter(&self) -> ValidityIter {
ValidityIter { idx: 0, validity: self }
}
}

#[derive(Debug)]
pub struct ValidityIter<'a> {
idx: usize,
validity: &'a Validity,
}

impl<'a> Iterator for ValidityIter<'a> {
type Item = bool;

fn next(&mut self) -> Option<Self::Item> {
if self.idx >= self.validity.len() {
return None;
}

let val = self.validity.is_valid(self.idx);
self.idx += 1;
Some(val)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ where
self.capacity
}

pub fn set_row_count(&mut self, count: usize) -> Result<()> {
if count > self.capacity {
return Err(RayexecError::new("Row count would exceed capacity"));
}
self.row_count = count;
Ok(())
}

pub fn row_count(&self) -> usize {
self.row_count
}
Expand All @@ -67,6 +75,7 @@ where
self.row_count + additional < self.capacity
}

/// Appends a batch to this block.
pub fn append_batch_data(&mut self, batch: &Batch<B>) -> Result<()> {
let total_num_rows = self.row_count + batch.num_rows();
if total_num_rows > self.capacity {
Expand Down
28 changes: 27 additions & 1 deletion crates/rayexec_execution/src/execution/operators_exp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ pub mod batch_collection;
pub mod physical_project;
pub mod physical_sort;

#[cfg(test)]
mod testutil;

use std::fmt::Debug;
use std::task::Context;

use physical_project::ProjectPartitionState;
use physical_sort::{SortOperatorState, SortPartitionState};
use rayexec_error::Result;
use rayexec_error::{RayexecError, Result};

use crate::arrays::batch::Batch;
use crate::arrays::buffer_manager::BufferManager;
use crate::database::DatabaseContext;
use crate::explain::explainable::Explainable;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -82,6 +86,21 @@ pub enum PartitionAndOperatorStates {
},
}

impl PartitionAndOperatorStates {
pub fn branchless_into_states(self) -> Result<(OperatorState, Vec<PartitionState>)> {
match self {
Self::Branchless {
operator_state,
partition_states,
} => Ok((operator_state, partition_states)),
Self::BranchingOutput { .. } => Err(RayexecError::new("Expected branchless states, got branching output")),
Self::TerminatingInput { .. } => {
Err(RayexecError::new("Expected branchless states, got terminating input"))
}
}
}
}

#[derive(Debug)]
pub enum PartitionState {
Project(ProjectPartitionState),
Expand All @@ -108,6 +127,13 @@ pub struct ExecuteInOutState<'a> {
}

pub trait ExecutableOperator: Sync + Send + Debug + Explainable {
fn create_states(
&self,
context: &DatabaseContext,
batch_size: usize,
partitions: usize,
) -> Result<PartitionAndOperatorStates>;

fn poll_execute(
&self,
cx: &mut Context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use super::{
ExecutableOperator,
ExecuteInOutState,
OperatorState,
PartitionAndOperatorStates,
PartitionState,
PollExecute,
PollFinalize,
};
use crate::database::DatabaseContext;
use crate::explain::explainable::{ExplainConfig, ExplainEntry, Explainable};
use crate::expr::physical::evaluator::ExpressionEvaluator;
use crate::expr::physical::PhysicalScalarExpression;
Expand All @@ -25,6 +27,26 @@ pub struct ProjectPartitionState {
}

impl ExecutableOperator for PhysicalProject {
fn create_states(
&self,
_context: &DatabaseContext,
batch_size: usize,
partitions: usize,
) -> Result<PartitionAndOperatorStates> {
let partition_states = (0..partitions)
.map(|_| {
PartitionState::Project(ProjectPartitionState {
evaluator: ExpressionEvaluator::new(self.projections.clone(), batch_size),
})
})
.collect();

Ok(PartitionAndOperatorStates::Branchless {
operator_state: OperatorState::None,
partition_states,
})
}

fn poll_execute(
&self,
_cx: &mut Context,
Expand Down
Loading

0 comments on commit 02ac58e

Please sign in to comment.