diff --git a/crates/rayexec_bullet/src/executor/scalar/fill.rs b/crates/rayexec_bullet/src/executor/scalar/fill.rs index 2580ba522..5765e2c75 100644 --- a/crates/rayexec_bullet/src/executor/scalar/fill.rs +++ b/crates/rayexec_bullet/src/executor/scalar/fill.rs @@ -2,8 +2,9 @@ use std::borrow::Borrow; use rayexec_error::{RayexecError, Result}; -use crate::array::Array; +use crate::array::{Array, ArrayData}; use crate::bitmap::Bitmap; +use crate::datatype::DataType; use crate::executor::builder::{ ArrayBuilder, ArrayDataBuffer, @@ -23,6 +24,7 @@ use crate::executor::physical_type::{ PhysicalI64, PhysicalI8, PhysicalInterval, + PhysicalList, PhysicalStorage, PhysicalType, PhysicalU128, @@ -32,8 +34,15 @@ use crate::executor::physical_type::{ PhysicalU8, PhysicalUtf8, }; +use crate::executor::scalar::UnaryExecutor; use crate::selection; -use crate::storage::{AddressableStorage, UntypedNullStorage}; +use crate::storage::{ + AddressableStorage, + ListItemMetadata, + ListStorage, + PrimitiveStorage, + UntypedNullStorage, +}; /// Singular mapping of a `from` index to a `to` index. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -269,13 +278,63 @@ pub(crate) fn concat_with_exact_total_len(arrays: &[&Array], total_len: usize) - }); concat_with_fill_state::(arrays, state) } - PhysicalType::List => { - // TODO: Very doable - Err(RayexecError::new( - "concatenating list arrays not yet supported", - )) - } + PhysicalType::List => concat_lists(datatype.clone(), arrays, total_len), + } +} + +fn concat_lists(datatype: DataType, arrays: &[&Array], total_len: usize) -> Result { + let inner_arrays = arrays + .iter() + .map(|arr| match arr.array_data() { + ArrayData::List(list) => { + if list.array.has_selection() { + return Err(RayexecError::new("List child array has selection")); + } + Ok(&list.array) + } + other => Err(RayexecError::new(format!( + "Invalid inner array data for concatenating lists, got {:?}", + other.physical_type() + ))), + }) + .collect::>>()?; + + let concatenated = concat(&inner_arrays)?; + + // Update metadata objects. + let mut metadatas = Vec::with_capacity(total_len); + let mut validity = Bitmap::new_with_all_true(total_len); + + let mut acc_rows = 0; + + for (array, child_array) in arrays.iter().zip(inner_arrays) { + UnaryExecutor::for_each::(array, |_row_num, metadata| match metadata { + Some(metadata) => { + metadatas.push(ListItemMetadata { + offset: metadata.offset + acc_rows, + len: metadata.len, + }); + } + None => { + metadatas.push(ListItemMetadata::default()); + validity.set_unchecked(metadatas.len() - 1, false); + } + })?; + + acc_rows += child_array.logical_len() as i32; } + + let data = ListStorage { + metadata: PrimitiveStorage::from(metadatas), + array: concatenated, + }; + + Ok(Array { + datatype, + selection: None, + validity: Some(validity.into()), + data: data.into(), + }) } fn concat_with_fill_state<'a, S, B>( @@ -641,4 +700,25 @@ mod tests { assert_eq!(ScalarValue::from(7), got.logical_value(3).unwrap()); assert_eq!(ScalarValue::from(8), got.logical_value(4).unwrap()); } + + #[test] + fn concat_lists() { + let arr1 = ScalarValue::List(vec![1.into(), 2.into()]) + .as_array(1) + .unwrap(); + let arr2 = ScalarValue::List(vec![3.into(), 4.into(), 5.into()]) + .as_array(1) + .unwrap(); + + let got = concat(&[&arr1, &arr2]).unwrap(); + + assert_eq!( + ScalarValue::List(vec![1.into(), 2.into()]), + got.logical_value(0).unwrap() + ); + assert_eq!( + ScalarValue::List(vec![3.into(), 4.into(), 5.into()]), + got.logical_value(1).unwrap() + ); + } } diff --git a/crates/rayexec_bullet/src/executor/scalar/hash.rs b/crates/rayexec_bullet/src/executor/scalar/hash.rs index 843e0b91f..7f4a35a05 100644 --- a/crates/rayexec_bullet/src/executor/scalar/hash.rs +++ b/crates/rayexec_bullet/src/executor/scalar/hash.rs @@ -2,7 +2,7 @@ use ahash::RandomState; use half::f16; use rayexec_error::{RayexecError, Result}; -use crate::array::Array; +use crate::array::{Array, ArrayData}; use crate::executor::physical_type::{ PhysicalBinary, PhysicalBool, @@ -15,6 +15,7 @@ use crate::executor::physical_type::{ PhysicalI64, PhysicalI8, PhysicalInterval, + PhysicalList, PhysicalStorage, PhysicalType, PhysicalU16, @@ -40,28 +41,60 @@ impl HashExecutor { pub fn hash_combine(array: &Array, hashes: &mut [u64]) -> Result<()> { match array.physical_type() { PhysicalType::UntypedNull => { - Self::hash_one_combine::(array, hashes)? - } - PhysicalType::Boolean => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Int8 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Int16 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Int32 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Int64 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Int128 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::UInt8 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::UInt16 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::UInt32 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::UInt64 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::UInt128 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Float16 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Float32 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Float64 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Binary => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Utf8 => Self::hash_one_combine::(array, hashes)?, - PhysicalType::Interval => Self::hash_one_combine::(array, hashes)?, - PhysicalType::List => { - return Err(RayexecError::new("Hashing list array not yet supported")) + Self::hash_one_inner::(array, hashes)? } + PhysicalType::Boolean => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int8 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int16 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int32 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int64 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int128 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt8 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt16 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt32 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt64 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt128 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Float16 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Float32 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Float64 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Binary => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Utf8 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Interval => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::List => Self::hash_list::(array, hashes)?, } Ok(()) @@ -72,28 +105,60 @@ impl HashExecutor { pub fn hash_no_combine(array: &Array, hashes: &mut [u64]) -> Result<()> { match array.physical_type() { PhysicalType::UntypedNull => { - Self::hash_one_no_combine::(array, hashes)? - } - PhysicalType::Boolean => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Int8 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Int16 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Int32 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Int64 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Int128 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::UInt8 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::UInt16 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::UInt32 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::UInt64 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::UInt128 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Float16 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Float32 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Float64 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Binary => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Utf8 => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::Interval => Self::hash_one_no_combine::(array, hashes)?, - PhysicalType::List => { - return Err(RayexecError::new("Hashing list array not yet supported")) + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Boolean => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int8 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int16 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int32 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int64 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Int128 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt8 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt16 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt32 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt64 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::UInt128 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Float16 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Float32 => { + Self::hash_one_inner::(array, hashes)? } + PhysicalType::Float64 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Binary => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Utf8 => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::Interval => { + Self::hash_one_inner::(array, hashes)? + } + PhysicalType::List => Self::hash_list::(array, hashes)?, } Ok(()) @@ -113,10 +178,11 @@ impl HashExecutor { Ok(hashes) } - fn hash_one_no_combine<'a, 'b, S>(array: &'a Array, hashes: &'b mut [u64]) -> Result<()> + fn hash_one_inner<'a, 'b, S, H>(array: &'a Array, hashes: &'b mut [u64]) -> Result<()> where S: PhysicalStorage, S::Type<'a>: HashValue, + H: SetHash, { let selection = array.selection_vector(); @@ -129,9 +195,9 @@ impl HashExecutor { if validity.value(sel) { let val = unsafe { values.get_unchecked(sel) }; - *hash = val.hash_one(); + H::set_hash(val.hash_one(), hash); } else { - *hash = null_hash_value(); + H::set_hash(null_hash_value(), hash) } } } @@ -141,7 +207,7 @@ impl HashExecutor { for (idx, hash) in hashes.iter_mut().enumerate() { let sel = unsafe { selection::get_unchecked(selection, idx) }; let val = unsafe { values.get_unchecked(sel) }; - *hash = val.hash_one(); + H::set_hash(val.hash_one(), hash); } } } @@ -149,35 +215,65 @@ impl HashExecutor { Ok(()) } - fn hash_one_combine<'a, 'b, S>(array: &'a Array, hashes: &'b mut [u64]) -> Result<()> + fn hash_list(array: &Array, hashes: &mut [u64]) -> Result<()> where - S: PhysicalStorage, - S::Type<'a>: HashValue, + H: SetHash, { + let inner = match array.array_data() { + ArrayData::List(list) => &list.array, + other => { + return Err(RayexecError::new(format!( + "Unexpected array data for list hashing: {:?}", + other.physical_type(), + ))) + } + }; + + // TODO: Try to avoid this. + let mut list_hashes_buf = vec![0; inner.logical_len()]; + Self::hash_no_combine(inner, &mut list_hashes_buf)?; + + let metadata = PhysicalList::get_storage(&array.data)?; let selection = array.selection_vector(); match array.validity() { Some(validity) => { - let values = S::get_storage(&array.data)?; - for (idx, hash) in hashes.iter_mut().enumerate() { let sel = unsafe { selection::get_unchecked(selection, idx) }; if validity.value(sel) { - let val = unsafe { values.get_unchecked(sel) }; - *hash = combine_hashes(val.hash_one(), *hash); + let val = unsafe { metadata.get_unchecked(sel) }; + + // Set first hash. + H::set_hash(list_hashes_buf[val.offset as usize], hash); + + // Combine all the rest. + for hash_idx in 1..val.len { + CombineSetHash::set_hash( + list_hashes_buf[(val.offset + hash_idx) as usize], + hash, + ); + } } else { - *hash = combine_hashes(null_hash_value(), *hash); + H::set_hash(null_hash_value(), hash); } } } None => { - let values = S::get_storage(&array.data)?; - for (idx, hash) in hashes.iter_mut().enumerate() { let sel = unsafe { selection::get_unchecked(selection, idx) }; - let val = unsafe { values.get_unchecked(sel) }; - *hash = combine_hashes(val.hash_one(), *hash); + let val = unsafe { metadata.get_unchecked(sel) }; + + // Set first hash. + H::set_hash(list_hashes_buf[val.offset as usize], hash); + + // Combine all the rest. + for hash_idx in 1..val.len { + CombineSetHash::set_hash( + list_hashes_buf[(val.offset + hash_idx) as usize], + hash, + ); + } } } } @@ -186,6 +282,28 @@ impl HashExecutor { } } +trait SetHash { + fn set_hash(new_hash_value: u64, existing: &mut u64); +} + +#[derive(Debug, Clone, Copy)] +struct OverwriteSetHash; + +impl SetHash for OverwriteSetHash { + fn set_hash(new_hash_value: u64, existing: &mut u64) { + *existing = new_hash_value + } +} + +#[derive(Debug, Clone, Copy)] +struct CombineSetHash; + +impl SetHash for CombineSetHash { + fn set_hash(new_hash_value: u64, existing: &mut u64) { + *existing = combine_hashes(new_hash_value, *existing) + } +} + /// All nulls should hash to the same value. /// /// _What_ that value is is arbitrary, but it needs to be consistent. diff --git a/crates/rayexec_bullet/src/executor/scalar/list.rs b/crates/rayexec_bullet/src/executor/scalar/list.rs index ca31525cf..70622af65 100644 --- a/crates/rayexec_bullet/src/executor/scalar/list.rs +++ b/crates/rayexec_bullet/src/executor/scalar/list.rs @@ -1,22 +1,37 @@ use rayexec_error::{not_implemented, RayexecError, Result}; use crate::array::{Array, ArrayData}; +use crate::bitmap::Bitmap; use crate::executor::builder::{ArrayBuilder, ArrayDataBuffer}; use crate::executor::physical_type::{PhysicalList, PhysicalStorage}; -use crate::executor::scalar::{can_skip_validity_check, validate_logical_len}; +use crate::executor::scalar::{can_skip_validity_check, check_validity, validate_logical_len}; use crate::selection::{self, SelectionVector}; -use crate::storage::AddressableStorage; +use crate::storage::{AddressableStorage, ListItemMetadata}; -pub trait BinaryListReducer: Default { +pub trait BinaryListReducer { + fn new(left_len: i32, right_len: i32) -> Self; fn put_values(&mut self, v1: T, v2: T); fn finish(self) -> O; } +/// List executor that allows for different list lengths, and nulls inside of +/// lists. +pub type FlexibleListExecutor = ListExecutor; + +/// Execute reductions on lists. +/// +/// `ALLOW_DIFFERENT_LENS` controls whether or not this allows for reducing +/// lists of different lengths. +/// +/// `ALLOW_NULLS` controls if this allows nulls in lists. #[derive(Debug, Clone, Copy)] -pub struct ListExecutor; +pub struct ListExecutor; -impl ListExecutor { - pub fn execute_binary_reduce<'a, S, B, R>( +impl + ListExecutor +{ + /// Execute a reducer on two list arrays. + pub fn binary_reduce<'a, S, B, R>( array1: &'a Array, array2: &'a Array, mut builder: ArrayBuilder, @@ -40,72 +55,124 @@ impl ListExecutor { let metadata1 = PhysicalList::get_storage(array1.array_data())?; let metadata2 = PhysicalList::get_storage(array2.array_data())?; - let values1 = get_inner_array_storage::(array1)?; - let values2 = get_inner_array_storage::(array2)?; + let (values1, inner_validity1) = get_inner_array_storage::(array1)?; + let (values2, inner_validity2) = get_inner_array_storage::(array2)?; let inner_sel1 = get_inner_array_selection(array1)?; let inner_sel2 = get_inner_array_selection(array2)?; - for idx in 0..len { - let sel1 = unsafe { selection::get_unchecked(selection1, idx) }; - let sel2 = unsafe { selection::get_unchecked(selection2, idx) }; + if can_skip_validity_check([inner_validity1, inner_validity2]) { + for idx in 0..len { + let sel1 = unsafe { selection::get_unchecked(selection1, idx) }; + let sel2 = unsafe { selection::get_unchecked(selection2, idx) }; - let m1 = unsafe { metadata1.get_unchecked(sel1) }; - let m2 = unsafe { metadata2.get_unchecked(sel2) }; + let m1 = unsafe { metadata1.get_unchecked(sel1) }; + let m2 = unsafe { metadata2.get_unchecked(sel2) }; - if m1.len != m2.len { - return Err(RayexecError::new(format!( - "Cannot reduce arrays with differing lengths, got {} and {}", - m1.len, m2.len - ))); - } + let len = Self::item_iter_len(m1, m2)?; + + let mut reducer = R::new(m1.len, m2.len); + + for inner_idx in 0..len { + let idx1 = m1.offset + inner_idx; + let idx2 = m2.offset + inner_idx; + + let sel1 = unsafe { selection::get_unchecked(inner_sel1, idx1 as usize) }; + let sel2 = unsafe { selection::get_unchecked(inner_sel2, idx2 as usize) }; - let mut reducer = R::default(); + let v1 = unsafe { values1.get_unchecked(sel1) }; + let v2 = unsafe { values2.get_unchecked(sel2) }; - for inner_idx in 0..m1.len { - let idx1 = m1.offset + inner_idx; - let idx2 = m2.offset + inner_idx; + reducer.put_values(v1, v2); + } - let sel1 = unsafe { selection::get_unchecked(inner_sel1, idx1 as usize) }; - let sel2 = unsafe { selection::get_unchecked(inner_sel2, idx2 as usize) }; + let out = reducer.finish(); - let v1 = unsafe { values1.get_unchecked(sel1) }; - let v2 = unsafe { values2.get_unchecked(sel2) }; + builder.buffer.put(idx, &out); + } - reducer.put_values(v1, v2); + Ok(Array { + datatype: builder.datatype, + selection: None, + validity: None, + data: builder.buffer.into_data(), + }) + } else { + if !ALLOW_NULLS { + return Err(RayexecError::new("Cannot reduce list containing NULLs")); } - let out = reducer.finish(); + for idx in 0..len { + let sel1 = unsafe { selection::get_unchecked(selection1, idx) }; + let sel2 = unsafe { selection::get_unchecked(selection2, idx) }; - builder.buffer.put(idx, &out); - } + let m1 = unsafe { metadata1.get_unchecked(sel1) }; + let m2 = unsafe { metadata2.get_unchecked(sel2) }; + + let len = Self::item_iter_len(m1, m2)?; + + let mut reducer = R::new(m1.len, m2.len); + + for inner_idx in 0..len { + let idx1 = m1.offset + inner_idx; + let idx2 = m2.offset + inner_idx; + + let sel1 = unsafe { selection::get_unchecked(inner_sel1, idx1 as usize) }; + let sel2 = unsafe { selection::get_unchecked(inner_sel2, idx2 as usize) }; - Ok(Array { - datatype: builder.datatype, - selection: None, - validity: None, - data: builder.buffer.into_data(), - }) + if check_validity(sel1, inner_validity1) + && check_validity(sel2, inner_validity2) + { + let v1 = unsafe { values1.get_unchecked(sel1) }; + let v2 = unsafe { values2.get_unchecked(sel2) }; + + reducer.put_values(v1, v2); + } + } + + let out = reducer.finish(); + + builder.buffer.put(idx, &out); + } + + Ok(Array { + datatype: builder.datatype, + selection: None, + validity: None, + data: builder.buffer.into_data(), + }) + } } else { // let mut out_validity = None; not_implemented!("list validity execute") } } + + fn item_iter_len(m1: ListItemMetadata, m2: ListItemMetadata) -> Result { + if m1.len == m2.len { + Ok(m1.len) + } else if ALLOW_DIFFERENT_LENS { + Ok(std::cmp::min(m1.len, m2.len)) + } else { + Err(RayexecError::new(format!( + "Cannot reduce arrays with differing lengths, got {} and {}", + m1.len, m2.len + ))) + } + } } /// Gets the inner array storage. Checks to ensure the inner array does not /// contain NULLs. -fn get_inner_array_storage(array: &Array) -> Result> +fn get_inner_array_storage(array: &Array) -> Result<(S::Storage<'_>, Option<&Bitmap>)> where S: PhysicalStorage, { match array.array_data() { ArrayData::List(d) => { - if !can_skip_validity_check([d.array.validity()]) { - return Err(RayexecError::new("Cannot reduce list containing NULLs")); - } - - S::get_storage(d.array.array_data()) + let storage = S::get_storage(d.array.array_data())?; + let validity = d.array.validity(); + Ok((storage, validity)) } _ => Err(RayexecError::new("Expected list array data")), } diff --git a/crates/rayexec_error/src/lib.rs b/crates/rayexec_error/src/lib.rs index e861be50b..3d1e6a084 100644 --- a/crates/rayexec_error/src/lib.rs +++ b/crates/rayexec_error/src/lib.rs @@ -68,7 +68,7 @@ impl RayexecError { } } - pub fn with_field(mut self, (key, value): (K, V)) -> Self + pub fn with_field(mut self, key: K, value: V) -> Self where K: Into, V: ErrorFieldValue + 'static, diff --git a/crates/rayexec_execution/src/execution/operators/unnest.rs b/crates/rayexec_execution/src/execution/operators/unnest.rs index 9ed303efd..22d023580 100644 --- a/crates/rayexec_execution/src/execution/operators/unnest.rs +++ b/crates/rayexec_execution/src/execution/operators/unnest.rs @@ -304,7 +304,7 @@ impl Explainable for PhysicalUnnest { } } -fn unnest(child: &Array, longest_len: usize, meta: ListItemMetadata) -> Result { +pub(crate) fn unnest(child: &Array, longest_len: usize, meta: ListItemMetadata) -> Result { let datatype = child.datatype().clone(); match child.physical_type() { diff --git a/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs b/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs index a60a7416e..183504064 100644 --- a/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs +++ b/crates/rayexec_execution/src/functions/scalar/builtin/comparison.rs @@ -20,17 +20,19 @@ use rayexec_bullet::executor::physical_type::{ PhysicalI8, PhysicalInterval, PhysicalStorage, + PhysicalType, PhysicalU128, PhysicalU16, PhysicalU32, PhysicalU64, PhysicalU8, + PhysicalUntypedNull, PhysicalUtf8, }; -use rayexec_bullet::executor::scalar::BinaryExecutor; +use rayexec_bullet::executor::scalar::{BinaryExecutor, BinaryListReducer, FlexibleListExecutor}; use rayexec_bullet::scalar::decimal::{Decimal128Type, Decimal64Type, DecimalType}; use rayexec_bullet::storage::PrimitiveStorage; -use rayexec_error::Result; +use rayexec_error::{RayexecError, Result}; use crate::expr::Expression; use crate::functions::scalar::{PlannedScalarFunction, ScalarFunction, ScalarFunctionImpl}; @@ -143,6 +145,11 @@ const COMPARISON_SIGNATURES: &[Signature] = &[ variadic_arg: None, return_type: DataTypeId::Boolean, }, + Signature { + positional_args: &[DataTypeId::List, DataTypeId::List], + variadic_arg: None, + return_type: DataTypeId::Boolean, + }, ]; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -475,11 +482,194 @@ fn new_comparison_impl( (DataType::Binary, DataType::Binary) => { Box::new(BaseComparisonImpl::::new()) } + (DataType::List(m1), DataType::List(m2)) if m1 == m2 => { + // TODO: We'll want to figure out casting for lists. + Box::new(ListComparisonImpl::::new(m1.datatype.physical_type()?)) + } (a, b) => return Err(invalid_input_types_error(func, &[a, b])), }, ) } +#[derive(Debug)] +struct ListComparisonReducer { + left_len: i32, + right_len: i32, + all_equal: bool, + result: Option, + _typ: PhantomData, + _op: PhantomData, +} + +impl BinaryListReducer for ListComparisonReducer +where + T: PartialEq + PartialOrd, + O: ComparisonOperation, +{ + fn new(left_len: i32, right_len: i32) -> Self { + ListComparisonReducer { + all_equal: true, + result: None, + left_len, + right_len, + _op: PhantomData, + _typ: PhantomData, + } + } + + fn put_values(&mut self, v1: T, v2: T) { + if self.result.is_some() { + return; + } + if v1 != v2 { + self.all_equal = false; + self.result = Some(O::compare(v1, v2)); + } + } + + fn finish(self) -> bool { + if let Some(result) = self.result { + return result; + } + + if self.all_equal { + O::compare(self.left_len, self.right_len) + } else { + true + } + } +} + +#[derive(Debug, Clone)] +struct ListComparisonImpl { + inner_physical_type: PhysicalType, + _op: PhantomData, +} + +impl ListComparisonImpl { + fn new(inner_physical_type: PhysicalType) -> Self { + ListComparisonImpl { + _op: PhantomData, + inner_physical_type, + } + } +} + +impl ScalarFunctionImpl for ListComparisonImpl +where + O: ComparisonOperation, +{ + fn execute(&self, inputs: &[&Array]) -> Result { + let left = inputs[0]; + let right = inputs[1]; + + let builder = ArrayBuilder { + datatype: DataType::Boolean, + buffer: BooleanBuffer::with_len(left.logical_len()), + }; + + let array = match self.inner_physical_type { + PhysicalType::UntypedNull => FlexibleListExecutor::binary_reduce::< + PhysicalUntypedNull, + _, + ListComparisonReducer<_, O>, + >(left, right, builder)?, + PhysicalType::Boolean => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Int8 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Int16 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Int32 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Int64 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Int128 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::UInt8 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::UInt16 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::UInt32 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::UInt64 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::UInt128 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Float16 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Float32 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Float64 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Interval => FlexibleListExecutor::binary_reduce::< + PhysicalInterval, + _, + ListComparisonReducer<_, O>, + >(left, right, builder)?, + PhysicalType::Binary => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::Utf8 => { + FlexibleListExecutor::binary_reduce::>( + left, right, builder, + )? + } + PhysicalType::List => { + return Err(RayexecError::new( + "Comparison between nested lists not yet supported", + )) + } + }; + + Ok(array) + } +} + #[derive(Debug, Clone)] struct BaseComparisonImpl { _op: PhantomData, diff --git a/crates/rayexec_execution/src/functions/scalar/builtin/similarity/l2_distance.rs b/crates/rayexec_execution/src/functions/scalar/builtin/similarity/l2_distance.rs index cd2765f17..d368bde0a 100644 --- a/crates/rayexec_execution/src/functions/scalar/builtin/similarity/l2_distance.rs +++ b/crates/rayexec_execution/src/functions/scalar/builtin/similarity/l2_distance.rs @@ -109,7 +109,7 @@ where buffer: PrimitiveBuffer::with_len(a.logical_len()), }; - ListExecutor::execute_binary_reduce::>(a, b, builder) + ListExecutor::::binary_reduce::>(a, b, builder) } } @@ -122,6 +122,11 @@ impl BinaryListReducer for L2DistanceReducer where F: Float + AddAssign + AsPrimitive + Default, { + fn new(left_len: i32, right_len: i32) -> Self { + debug_assert_eq!(left_len, right_len); + Self::default() + } + fn put_values(&mut self, v1: F, v2: F) { let diff = v1 - v2; self.distance += diff * diff; diff --git a/crates/rayexec_execution/src/functions/table/builtin/mod.rs b/crates/rayexec_execution/src/functions/table/builtin/mod.rs index fcd1cf122..ae068de94 100644 --- a/crates/rayexec_execution/src/functions/table/builtin/mod.rs +++ b/crates/rayexec_execution/src/functions/table/builtin/mod.rs @@ -1,16 +1,19 @@ pub mod series; pub mod system; +pub mod unnest; use std::sync::LazyLock; use series::GenerateSeries; use system::{ListDatabases, ListFunctions, ListSchemas, ListTables}; +use unnest::Unnest; use super::TableFunction; pub static BUILTIN_TABLE_FUNCTIONS: LazyLock>> = LazyLock::new(|| { vec![ Box::new(GenerateSeries), + Box::new(Unnest), // Various list system object functions. Box::new(ListDatabases::new()), Box::new(ListSchemas::new()), diff --git a/crates/rayexec_execution/src/functions/table/builtin/unnest.rs b/crates/rayexec_execution/src/functions/table/builtin/unnest.rs new file mode 100644 index 000000000..2a8016d9d --- /dev/null +++ b/crates/rayexec_execution/src/functions/table/builtin/unnest.rs @@ -0,0 +1,244 @@ +use std::collections::HashMap; +use std::task::{Context, Waker}; + +use rayexec_bullet::array::{Array, ArrayData}; +use rayexec_bullet::batch::Batch; +use rayexec_bullet::datatype::{DataType, DataTypeId}; +use rayexec_bullet::executor::physical_type::{PhysicalList, PhysicalType}; +use rayexec_bullet::executor::scalar::UnaryExecutor; +use rayexec_bullet::field::{Field, Schema}; +use rayexec_bullet::scalar::OwnedScalarValue; +use rayexec_error::{RayexecError, Result}; + +use crate::execution::operators::unnest::unnest; +use crate::execution::operators::{PollFinalize, PollPush}; +use crate::expr::Expression; +use crate::functions::table::inout::{InOutPollPull, TableInOutFunction, TableInOutPartitionState}; +use crate::functions::table::{ + InOutPlanner, + PlannedTableFunction, + TableFunction, + TableFunctionImpl, + TableFunctionPlanner, +}; +use crate::functions::{invalid_input_types_error, plan_check_num_args, FunctionInfo, Signature}; +use crate::logical::binder::table_list::TableList; +use crate::logical::statistics::StatisticsValue; + +#[derive(Debug, Clone, Copy)] +pub struct Unnest; + +impl FunctionInfo for Unnest { + fn name(&self) -> &'static str { + "unnest" + } + + fn signatures(&self) -> &[Signature] { + &[ + Signature { + positional_args: &[DataTypeId::List], + variadic_arg: None, + return_type: DataTypeId::Any, + }, + Signature { + positional_args: &[DataTypeId::Null], + variadic_arg: None, + return_type: DataTypeId::Null, + }, + ] + } +} + +impl TableFunction for Unnest { + fn planner(&self) -> TableFunctionPlanner { + TableFunctionPlanner::InOut(self) + } +} + +impl InOutPlanner for Unnest { + fn plan( + &self, + table_list: &TableList, + positional_inputs: Vec, + named_inputs: HashMap, + ) -> Result { + plan_check_num_args(self, &positional_inputs, 1)?; + if !named_inputs.is_empty() { + return Err(RayexecError::new( + "UNNEST does not yet accept named arguments", + )); + } + + let datatype = positional_inputs[0].datatype(table_list)?; + + let return_type = match datatype { + DataType::List(m) => *m.datatype, + DataType::Null => DataType::Null, + other => return Err(invalid_input_types_error(self, &[other])), + }; + + let schema = Schema::new([Field::new("unnest", return_type, true)]); + + Ok(PlannedTableFunction { + function: Box::new(*self), + positional_inputs, + named_inputs, + function_impl: TableFunctionImpl::InOut(Box::new(UnnestInOutImpl)), + cardinality: StatisticsValue::Unknown, + schema, + }) + } +} + +#[derive(Debug, Clone)] +pub struct UnnestInOutImpl; + +impl TableInOutFunction for UnnestInOutImpl { + fn create_states( + &self, + num_partitions: usize, + ) -> Result>> { + let states: Vec<_> = (0..num_partitions) + .map(|_| { + Box::new(UnnestInOutPartitionState { + input: None, + input_num_rows: 0, + current_row: 0, + finished: false, + push_waker: None, + pull_waker: None, + }) as _ + }) + .collect(); + + Ok(states) + } +} + +// TODO: A lot of this is duplicated with the Unnest operator. +// +// Ideally we'd have a more generic operator for handling table funcs in the +// select list at some point. +// +// Nearer term we should look at combining the logic a bit more. +#[derive(Debug)] +pub struct UnnestInOutPartitionState { + /// The array we're unnesting. + input: Option, + /// Number of rows in the input batch. + input_num_rows: usize, + /// Current row we're processing. + current_row: usize, + /// If we're finished receiving inputs. + finished: bool, + /// Push side waker. + /// + /// Set if we still have rows to process. + push_waker: Option, + /// Pull side waker. + /// + /// Set if we've processed all rows and need more input. + pull_waker: Option, +} + +impl TableInOutPartitionState for UnnestInOutPartitionState { + fn poll_push(&mut self, cx: &mut Context, inputs: Batch) -> Result { + if self.current_row < self.input_num_rows { + // Still processing inputs, come back later. + self.push_waker = Some(cx.waker().clone()); + if let Some(waker) = self.pull_waker.take() { + waker.wake(); + } + + return Ok(PollPush::Pending(inputs)); + } + + self.input_num_rows = inputs.num_rows(); + self.current_row = 0; + + match inputs.columns().len() { + 1 => self.input = inputs.into_arrays().pop(), + other => { + return Err(RayexecError::new("Invalid number of arrays").with_field("len", other)) + } + } + + if let Some(waker) = self.pull_waker.take() { + waker.wake(); + } + + Ok(PollPush::Pushed) + } + + fn poll_finalize_push(&mut self, _cx: &mut Context) -> Result { + self.finished = true; + + if let Some(waker) = self.pull_waker.take() { + waker.wake(); + } + + Ok(PollFinalize::Finalized) + } + + fn poll_pull(&mut self, cx: &mut Context) -> Result { + if self.current_row >= self.input_num_rows { + if self.finished { + return Ok(InOutPollPull::Exhausted); + } + + // We're done with these inputs. Come back later. + self.pull_waker = Some(cx.waker().clone()); + if let Some(waker) = self.push_waker.take() { + waker.wake(); + } + + return Ok(InOutPollPull::Pending); + } + + let input = self.input.as_ref().unwrap(); + let output = match input.physical_type() { + PhysicalType::List => { + let child = match input.array_data() { + ArrayData::List(list) => list.inner_array(), + _other => return Err(RayexecError::new("Unexpected storage type")), + }; + + match UnaryExecutor::value_at::(input, self.current_row)? { + Some(meta) => { + // Row is a list, unnest. + unnest(child, meta.len as usize, meta)? + } + None => { + // Row is null, produce as single null + Array::new_typed_null_array(child.datatype().clone(), 1)? + } + } + } + PhysicalType::UntypedNull => { + // Just produce null array of length 1. + Array::new_untyped_null_array(1) + } + other => { + return Err(RayexecError::new(format!( + "Unexpected physical type in unnest: {other:?}" + ))) + } + }; + + let row_nums = vec![self.current_row; output.logical_len()]; + + // Next pull works on the next row. + self.current_row += 1; + + // If these inputs are done, go ahead and let the push side know. + if self.current_row >= self.input_num_rows { + if let Some(waker) = self.push_waker.take() { + waker.wake() + } + } + + let batch = Batch::try_new([output])?; + + Ok(InOutPollPull::Batch { batch, row_nums }) + } +} diff --git a/crates/rayexec_execution/src/optimizer/column_prune.rs b/crates/rayexec_execution/src/optimizer/column_prune.rs index ef0d0fc4b..14a854e5e 100644 --- a/crates/rayexec_execution/src/optimizer/column_prune.rs +++ b/crates/rayexec_execution/src/optimizer/column_prune.rs @@ -157,6 +157,57 @@ impl PruneState { bind_context: &mut BindContext, plan: &mut LogicalOperator, ) -> Result<()> { + // TODO: Implement this. It'd let us remove lateral joins from the plan in cases + // where only the output of the right side is projected out. + // + // E.g. `SELECT u.* FROM my_table t, unnest(t.a) u` would let us remove + // the left side as it would only be feeding into the `unnest`. + // + // // Check if this is a magic join first, as we might be able to remove it + // // entirely. + // // + // // We can remove the join if: + // // + // // - We're not referencing anything from the left side in any of the + // // parent nodes. + // // - Join type is INNER + // if let LogicalOperator::MagicJoin(join) = plan { + // if join.node.join_type == JoinType::Inner && !self.implicit_reference { + // match join.get_nth_child(0)? { + // LogicalOperator::MaterializationScan(scan) => { + // let mat_plan = bind_context.get_materialization_mut(scan.node.mat)?; + + // let plan_references_left = self + // .current_references + // .iter() + // .any(|col_expr| mat_plan.table_refs.contains(&col_expr.table_scope)); + + // if !plan_references_left { + // // We can remove the left! Update the plan the + // // just be the right child and continue pruning. + // let [_left, right] = join.take_two_children_exact()?; + // *plan = right; + + // // Decrement scan count. We should be able to + // // remove it entirely if count == 1. + // mat_plan.scan_count -= 1; + + // // And now just walk the updated plan. + // self.walk_plan(bind_context, plan)?; + // self.apply_updated_expressions(plan)?; + + // return Ok(()); + // } + // } + // other => { + // return Err(RayexecError::new(format!( + // "unexpected left child for magic join: {other:?}" + // ))) + // } + // } + // } + // } + // Extract columns reference in this plan. // // Note that this may result in references tracked that we don't care @@ -181,6 +232,8 @@ impl PruneState { // Left child is materialization, right child is normal plan // with some number of magic scans. // + // Push down on both sides: + // // 1. Extract all column references to the materialized plan on // the right. This should get us the complete set of column // exprs that are referenced. diff --git a/slt/standard/functions/scalars/list_comparisons.slt b/slt/standard/functions/scalars/list_comparisons.slt new file mode 100644 index 000000000..09f6c66e9 --- /dev/null +++ b/slt/standard/functions/scalars/list_comparisons.slt @@ -0,0 +1,89 @@ +# Comparisons between lists + +query T +SELECT [] = []; +---- +true + +query T +SELECT [] <= []; +---- +true + +query T +SELECT [] < []; +---- +false + +# TODO: Cast +# query T +# SELECT [] < [3]; +# ---- +# true + +query T +SELECT [3] = [3]; +---- +true + +query T +SELECT [3] != [3]; +---- +false + +query T +SELECT [NULL] = [NULL]; +---- +true + +# TODO: Cast +# query T +# SELECT [NULL] = [4]; +# ---- +# true + +query T +SELECT [3, 4] = [4]; +---- +false + +query T +SELECT [3, 4] > [4]; +---- +false + +query T +SELECT [3] < [3, 4]; +---- +true + +query T +SELECT [3, 4, 5] < [3, 4]; +---- +false + +query T +SELECT [5] < [3, 4]; +---- +false + +query T +SELECT [3, 4] < [4]; +---- +true + +query T +SELECT [NULL, 4] = [4]; +---- +false + +query T +SELECT [NULL, 4] > [4]; +---- +true + +query T +SELECT [NULL, 4] = [NULL, 4]; +---- +true + diff --git a/slt/standard/functions/table/unnest_list.slt b/slt/standard/functions/table/unnest_list.slt index 514bb1399..209634c54 100644 --- a/slt/standard/functions/table/unnest_list.slt +++ b/slt/standard/functions/table/unnest_list.slt @@ -1,5 +1,50 @@ # UNNEST as a table function operating on lists. -# query I -# SELECT * FROM unnest([3,4,5]); -# ---- +query TT +DESCRIBE SELECT * FROM unnest([3,4,5]) ORDER BY 1; +---- +unnest Int32 + +query I +SELECT * FROM unnest([3,4,5]) ORDER BY 1; +---- +3 +4 +5 + +query TT +DESCRIBE SELECT * FROM unnest(NULL); +---- +unnest Null + +query ? +SELECT * FROM unnest(NULL); +---- +NULL + +query ? +SELECT * FROM unnest([]); +---- + +# Lateral + +query ? +SELECT u.* FROM (VALUES ([1,2,3]), ([8,9])) v(a), unnest(v.a) u ORDER BY 1; +---- +1 +2 +3 +8 +9 + +# TODO: Allow order by list (need to implement interleave on lists) +query ?I rowsort +SELECT * FROM (VALUES ([1,2,3]), ([8,9])) v(a), unnest(v.a); +---- +[1, 2, 3] 1 +[1, 2, 3] 2 +[1, 2, 3] 3 +[8, 9] 8 +[8, 9] 9 + +