From 34ab0abc6b5ee5ceeea7a277ffdcd443d067b77f Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Tue, 17 Dec 2024 16:15:34 +0200
Subject: [PATCH 01/16] feat(arrow-select): make concat of list of
 dictionaries merge dictionary keys

TODO:
- [ ] Add support for nested lists
- [ ] Add more tests
- [ ] Fix failing test

---
 arrow-select/src/concat.rs | 325 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 323 insertions(+), 2 deletions(-)

diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 129b90ee0470..43a707aa1979 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -34,9 +34,11 @@ use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values}
 use arrow_array::cast::AsArray;
 use arrow_array::types::*;
 use arrow_array::*;
-use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer};
+use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
 use arrow_data::transform::{Capacities, MutableArrayData};
+use arrow_data::ArrayDataBuilder;
 use arrow_schema::{ArrowError, DataType, SchemaRef};
+use num::Saturating;
 use std::sync::Arc;
 
 fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
@@ -129,12 +131,149 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
     Ok(Arc::new(array))
 }
 
+fn concat_list_of_dictionaries<OffsetSize: OffsetSizeTrait, K: ArrowDictionaryKeyType>(
+    arrays: &[&dyn Array],
+) -> Result<ArrayRef, ArrowError> {
+    let mut output_len = 0;
+    let lists = arrays
+        .iter()
+        .map(|x| x.as_list::<OffsetSize>())
+        .collect::<Vec<_>>();
+
+    let dictionaries: Vec<_> = lists
+        .iter()
+        .map(|x| x.values().as_ref().as_dictionary::<K>())
+        // TODO?
+        .inspect(|d| output_len += d.len())
+        .collect();
+
+    if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
+        return concat_fallback(arrays, Capacities::Array(output_len));
+    }
+
+    let merged = merge_dictionary_values(&dictionaries, None)?;
+
+    let lists_nulls = lists
+        .iter()
+        .fold(None, |acc, a| NullBuffer::union(acc.as_ref(), a.nulls()));
+
+
+    // Recompute keys
+    let mut key_values = Vec::with_capacity(output_len);
+
+    let mut dictionary_has_nulls = false;
+    for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
+        dictionary_has_nulls |= d.null_count() != 0;
+        for key in d.keys().values() {
+            // Use get to safely handle nulls
+            key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
+        }
+    }
+
+    let dictionary_nulls = dictionary_has_nulls.then(|| {
+        let mut nulls = BooleanBufferBuilder::new(output_len);
+        for d in &dictionaries {
+            match d.nulls() {
+                Some(n) => nulls.append_buffer(n.inner()),
+                None => nulls.append_n(d.len(), true),
+            }
+        }
+        NullBuffer::new(nulls.finish())
+    });
+
+    let keys = PrimitiveArray::<K>::new(key_values.into(), dictionary_nulls);
+    // Sanity check
+    assert_eq!(keys.len(), output_len);
+
+    let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
+
+    // Merge value offsets from the lists
+    let all_value_offsets_iterator = lists
+        .iter()
+        .map(|x| x.offsets());
+
+    let value_offset_buffer = merge_value_offsets(all_value_offsets_iterator);
+
+    let builder = ArrayDataBuilder::new(arrays[0].data_type().clone())
+        .len(output_len)
+        .nulls(lists_nulls)
+        // `GenericListArray` must only have 1 buffer
+        .buffers(vec![value_offset_buffer])
+        // `GenericListArray` must only have 1 child_data
+        .child_data(vec![array.to_data()]);
+
+    // TODO - maybe use build_unchecked?
+    let array_data = builder.build()?;
+
+    let array = GenericListArray::<OffsetSize>::from(array_data);
+    Ok(Arc::new(array))
+}
+
+/// Merge value offsets
+///
+///
+/// If we have the following
+/// [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
+/// The output should be
+/// [ 0, 3, 5, 7, 7, 13, 13, 14]
+fn merge_value_offsets<'a, OffsetSize: OffsetSizeTrait, I: Iterator<Item = &'a OffsetBuffer<OffsetSize>>>(offset_buffers_iterator: I) -> Buffer {
+    // 1. Filter out empty lists
+    let mut offset_buffers_iterator = offset_buffers_iterator.filter(|x| !x.is_empty());
+
+    // 2. Get first non-empty list as the starting point
+    let starting_buffer = offset_buffers_iterator.next();
+
+    // 3. If we have only empty lists, return an empty buffer
+    if starting_buffer.is_none() {
+        return Buffer::from(&[])
+    }
+
+    let starting_buffer = starting_buffer.unwrap();
+
+    let mut offsets_iter: Box<dyn Iterator<Item = OffsetSize> + 'a> = Box::new(starting_buffer.iter().copied());
+
+    // 4. Get the last value in the starting buffer as the starting point for the next buffer
+    // Safety: We already filtered out empty lists
+    let mut advance_by = *starting_buffer.last().unwrap();
+
+    // 5. Iterate over the remaining buffers
+    for offset_buffer in offset_buffers_iterator {
+        // 6. Get the last value of the current buffer so we can know how much to advance the next buffer
+        // Safety: We already filtered out empty lists
+        let last_value = *offset_buffer.last().unwrap();
+
+        // 7. Advance the offset buffer by the last value in the previous buffer
+        let offset_buffer_iter = offset_buffer
+            .iter()
+            // Skip the first value as it is the initial offset of 0
+            .skip(1)
+            .map(move |&x| x + advance_by);
+
+        // 8. Concat the current buffer with the previous buffer
+        // Chaining keeps the trusted length of the iterator
+        offsets_iter = Box::new(offsets_iter.chain(offset_buffer_iter));
+
+        // 9. Update the next advance_by
+        advance_by += last_value;
+    }
+
+    unsafe {
+        Buffer::from_trusted_len_iter(offsets_iter)
+    }
+}
+
 macro_rules! dict_helper {
     ($t:ty, $arrays:expr) => {
         return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
     };
 }
 
+macro_rules! list_dict_helper {
+    ($t:ty, $o: ty, $arrays:expr) => {
+        return Ok(Arc::new(concat_list_of_dictionaries::<$o, $t>($arrays)?) as _)
+    };
+}
+
 fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
     match data_type {
         DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
@@ -169,6 +308,21 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
             _ => unreachable!("illegal dictionary key type {k}")
         };
     } else {
+        if let DataType::List(field) = d {
+            if let DataType::Dictionary(k, _) = field.data_type() {
+                return downcast_integer! {
+                    k.as_ref() => (list_dict_helper, i32, arrays),
+                    _ => unreachable!("illegal dictionary key type {k}")
+                };
+            }
+        } else if let DataType::LargeList(field) = d {
+            if let DataType::Dictionary(k, _) = field.data_type() {
+                return downcast_integer!
{ + k.as_ref() => (list_dict_helper, i64, arrays), + _ => unreachable!("illegal dictionary key type {k}") + }; + } + } let capacity = get_capacity(arrays, d); concat_fallback(arrays, capacity) } @@ -228,8 +382,9 @@ pub fn concat_batches<'a>( #[cfg(test)] mod tests { use super::*; - use arrow_array::builder::StringDictionaryBuilder; + use arrow_array::builder::{GenericListBuilder, StringDictionaryBuilder}; use arrow_schema::{Field, Schema}; + use std::fmt::Debug; #[test] fn test_concat_empty_vec() { @@ -851,4 +1006,170 @@ mod tests { assert_eq!(array.null_count(), 10); assert_eq!(array.logical_null_count(), 10); } + + #[test] + fn concat_dictionary_list_array_simple() { + let scalars = vec![ + create_single_row_list_of_dict(vec![Some("a")]), + create_single_row_list_of_dict(vec![Some("a")]), + create_single_row_list_of_dict(vec![Some("b")]), + ]; + + let arrays = scalars.iter().map(|a| a as &(dyn Array)).collect::>(); + let concat_res = concat(arrays.as_slice()).unwrap(); + + let expected_list = create_list_of_dict(vec![ + // Row 1 + Some(vec![Some("a")]), + Some(vec![Some("a")]), + Some(vec![Some("b")]), + ]); + + let list = concat_res.as_list::(); + + // Assert that the list is equal to the expected list + list.iter().zip(expected_list.iter()).for_each(|(a, b)| { + assert_eq!(a, b); + }); + + let dict = list + .values() + .as_dictionary::() + .downcast_dict::() + .unwrap(); + println!("{:?}", dict); + + assert_dictionary_has_unique_values::<_, StringArray>( + list.values().as_dictionary::(), + ); + } + + #[test] + fn concat_dictionary_list_array_with_multiple_rows() { + let scalars = vec![ + create_list_of_dict(vec![ + // Row 1 + Some(vec![Some("a"), Some("c")]), + // Row 2 + None, + // Row 3 + Some(vec![Some("f"), Some("g"), None]), + // Row 4 + Some(vec![Some("c"), Some("f")]), + ]), + create_list_of_dict(vec![ + // Row 1 + Some(vec![Some("a")]), + // Row 2 + Some(vec![]), + // Row 3 + Some(vec![None, Some("b")]), + // Row 4 + Some(vec![Some("d"), Some("e")]), + ]), + create_list_of_dict(vec![ + // Row 1 + Some(vec![Some("g")]), + // Row 2 + Some(vec![Some("h"), Some("i")]), + // Row 3 + Some(vec![Some("j"), Some("a")]), + // Row 4 + Some(vec![Some("d"), Some("e")]), + ]), + ]; + let arrays = scalars + .iter() + .map(|a| a as &(dyn Array)) + .collect::>(); + let concat_res = concat(arrays.as_slice()).unwrap(); + + let expected_list = create_list_of_dict(vec![ + // First list: + + // Row 1 + Some(vec![Some("a"), Some("c")]), + // Row 2 + None, + // Row 3 + Some(vec![Some("f"), Some("g"), None]), + // Row 4 + Some(vec![Some("c"), Some("f")]), + // Second list: + // Row 1 + Some(vec![Some("a")]), + // Row 2 + Some(vec![]), + // Row 3 + Some(vec![None, Some("b")]), + // Row 4 + Some(vec![Some("d"), Some("e")]), + // Third list: + + // Row 1 + Some(vec![Some("g")]), + // Row 2 + Some(vec![Some("h"), Some("i")]), + // Row 3 + Some(vec![Some("j"), Some("a")]), + // Row 4 + Some(vec![Some("d"), Some("e")]), + ]); + + let list = concat_res.as_list::(); + + // Assert that the list is equal to the expected list + list.iter().zip(expected_list.iter()).for_each(|(a, b)| { + assert_eq!(a, b); + }); + + // Assert that the + assert_dictionary_has_unique_values::<_, StringArray>( + list.values().as_dictionary::(), + ); + } + + fn create_single_row_list_of_dict(list_items: Vec>) -> GenericListArray { + let rows = list_items.into_iter().map(|row| Some(row)).collect(); + + create_list_of_dict(vec![rows]) + } + + fn create_list_of_dict(rows: Vec>>>) -> GenericListArray { + let mut builder = + 
GenericListBuilder::::new(StringDictionaryBuilder::::new()); + + for row in rows { + builder.append_option(row); + } + + builder.finish() + } + + // TODO - use already exists helper or make it use this one + fn assert_dictionary_has_unique_values<'a, K, V: 'static>( + array: &'a DictionaryArray, + ) where + K: ArrowDictionaryKeyType, + V: Sync + Send, + &'a V: ArrayAccessor + IntoIterator, + + <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord, + <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord, + { + let dict = array.downcast_dict::().unwrap(); + let mut values = dict.values().clone().into_iter().collect::>(); + + // remove duplicates must be sorted first so we can compare + values.sort(); + + let mut unique_values = values.clone(); + + unique_values.dedup(); + + assert_eq!( + values, unique_values, + "There are duplicates in the value list (the value list here is sorted which is only for the assertion)" + ); + } } From 66825fb70f13e642aa98c409a45e8df470217151 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:03:07 +0200 Subject: [PATCH 02/16] fix concat lists of dictionaries --- arrow-select/src/concat.rs | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 43a707aa1979..2ae060ebc938 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -135,31 +135,43 @@ fn concat_list_of_dictionaries Result { let mut output_len = 0; + let mut list_has_nulls = false; + let lists = arrays .iter() .map(|x| x.as_list::()) + .inspect(|l| { + output_len += l.len(); + list_has_nulls |= l.null_count() != 0; + }) .collect::>(); + let mut dictionary_output_len = 0; let dictionaries: Vec<_> = lists .iter() .map(|x| x.values().as_ref().as_dictionary::()) - // TODO? 
- .inspect(|d| output_len += d.len()) + .inspect(|d| dictionary_output_len += d.len()) .collect(); - if !should_merge_dictionary_values::(&dictionaries, output_len) { + if !should_merge_dictionary_values::(&dictionaries, dictionary_output_len) { return concat_fallback(arrays, Capacities::Array(output_len)); } let merged = merge_dictionary_values(&dictionaries, None)?; - let lists_nulls = lists - .iter() - .fold(None, |acc, a| NullBuffer::union(acc.as_ref(), a.nulls())); - + let lists_nulls = list_has_nulls.then(|| { + let mut nulls = BooleanBufferBuilder::new(output_len); + for l in &lists { + match l.nulls() { + Some(n) => nulls.append_buffer(n.inner()), + None => nulls.append_n(l.len(), true), + } + } + NullBuffer::new(nulls.finish()) + }); // Recompute keys - let mut key_values = Vec::with_capacity(output_len); + let mut key_values = Vec::with_capacity(dictionary_output_len); let mut dictionary_has_nulls = false; for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) { @@ -171,7 +183,7 @@ fn concat_list_of_dictionaries nulls.append_buffer(n.inner()), @@ -183,7 +195,7 @@ fn concat_list_of_dictionaries::new(key_values.into(), dictionary_nulls); // Sanity check - assert_eq!(keys.len(), output_len); + assert_eq!(keys.len(), dictionary_output_len); let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) }; @@ -1146,7 +1158,6 @@ mod tests { builder.finish() } - // TODO - use already exists helper or make it use this one fn assert_dictionary_has_unique_values<'a, K, V: 'static>( array: &'a DictionaryArray, ) where From e14d09c63eb26f1789143a4115da95a51350868b Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:12:48 +0000 Subject: [PATCH 03/16] format --- arrow-select/src/concat.rs | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 2ae060ebc938..5c3de5140859 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -200,9 +200,7 @@ fn concat_list_of_dictionaries>>(offset_buffers_iterator: I) -> Buffer { +fn merge_value_offsets< + 'a, + OffsetSize: OffsetSizeTrait, + I: Iterator>, +>( + offset_buffers_iterator: I, +) -> Buffer { // 1. Filter out empty lists let mut offset_buffers_iterator = offset_buffers_iterator.filter(|x| !x.is_empty()); @@ -237,12 +241,13 @@ fn merge_value_offsets<'a, OffsetSize: OffsetSizeTrait, I: Iterator> = Box::new(starting_buffer.iter().copied()); + let mut offsets_iter: Box> = + Box::new(starting_buffer.iter().copied()); // 4. 
Get the last value in the starting buffer as the starting point for the next buffer // Safety: We already filtered out empty lists @@ -269,9 +274,7 @@ fn merge_value_offsets<'a, OffsetSize: OffsetSizeTrait, I: Iterator>(); + let arrays = scalars + .iter() + .map(|a| a as &(dyn Array)) + .collect::>(); let concat_res = concat(arrays.as_slice()).unwrap(); let expected_list = create_list_of_dict(vec![ @@ -1049,7 +1055,6 @@ mod tests { .as_dictionary::() .downcast_dict::() .unwrap(); - println!("{:?}", dict); assert_dictionary_has_unique_values::<_, StringArray>( list.values().as_dictionary::(), @@ -1135,13 +1140,14 @@ mod tests { assert_eq!(a, b); }); - // Assert that the assert_dictionary_has_unique_values::<_, StringArray>( list.values().as_dictionary::(), ); } - fn create_single_row_list_of_dict(list_items: Vec>) -> GenericListArray { + fn create_single_row_list_of_dict( + list_items: Vec>, + ) -> GenericListArray { let rows = list_items.into_iter().map(|row| Some(row)).collect(); create_list_of_dict(vec![rows]) @@ -1158,9 +1164,8 @@ mod tests { builder.finish() } - fn assert_dictionary_has_unique_values<'a, K, V: 'static>( - array: &'a DictionaryArray, - ) where + fn assert_dictionary_has_unique_values<'a, K, V: 'static>(array: &'a DictionaryArray) + where K: ArrowDictionaryKeyType, V: Sync + Send, &'a V: ArrayAccessor + IntoIterator, From e66f9bbb86936a213407254c8d5919bcc78aea2e Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:19:24 +0000 Subject: [PATCH 04/16] remove unused import --- arrow-select/src/concat.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 5c3de5140859..79f8427dd3e2 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -38,7 +38,6 @@ use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, Buffer, NullBuffer, Of use arrow_data::transform::{Capacities, MutableArrayData}; use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, SchemaRef}; -use num::Saturating; use std::sync::Arc; fn binary_capacity(arrays: &[&dyn Array]) -> Capacities { From 3fbe1deb5f7dddf7973d3627c785db8721859454 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:32:02 +0000 Subject: [PATCH 05/16] improve test helper --- arrow-select/src/concat.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 79f8427dd3e2..66de9fb43c76 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -1145,14 +1145,16 @@ mod tests { } fn create_single_row_list_of_dict( - list_items: Vec>, + list_items: Vec>>, ) -> GenericListArray { let rows = list_items.into_iter().map(|row| Some(row)).collect(); create_list_of_dict(vec![rows]) } - fn create_list_of_dict(rows: Vec>>>) -> GenericListArray { + fn create_list_of_dict( + rows: Vec>>>>, + ) -> GenericListArray { let mut builder = GenericListBuilder::::new(StringDictionaryBuilder::::new()); From 80ee6760b16d053ffe0e7c8c7b7c86a4b6ab11f3 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 21:00:42 +0200 Subject: [PATCH 06/16] feat: add merge offset buffers into one --- arrow-buffer/src/buffer/offset.rs | 194 +++++++++++++++++++++++++++++- 1 file changed, 193 insertions(+), 1 deletion(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index 
a6be2b67af84..fc48ac053331 100644
--- a/arrow-buffer/src/buffer/offset.rs
+++ b/arrow-buffer/src/buffer/offset.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use crate::buffer::ScalarBuffer;
-use crate::{ArrowNativeType, MutableBuffer, OffsetBufferBuilder};
+use crate::{ArrowNativeType, Buffer, MutableBuffer, OffsetBufferBuilder};
+use num::Integer;
 use std::ops::Deref;
 
 /// A non-empty buffer of monotonically increasing, positive integers.
@@ -162,6 +163,47 @@ impl<T: ArrowNativeType> OffsetBuffer<T> {
     }
 }
 
+impl<T: ArrowNativeType + Integer> OffsetBuffer<T> {
+
+    /// Merge multiple [`OffsetBuffer`] into a single [`OffsetBuffer`]
+    ///
+    ///
+    /// ```
+    /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+    /// // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
+    /// // The output should be
+    /// // [ 0, 3, 5, 7, 7, 13, 13, 14]
+    /// let buffers = [
+    ///     OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 3, 5])),
+    ///     OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 2, 2, 8])),
+    ///     OffsetBuffer::<i32>::new_empty(),
+    ///     OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 0, 1])),
+    /// ];
+    ///
+    /// let buffer = OffsetBuffer::<i32>::merge(&buffers);
+    /// assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]);
+    /// ```
+    ///
+    pub fn merge<'a, Iter>(offset_buffers_iterator: Iter) -> Self
+    where
+        Iter: IntoIterator<Item = &'a Self>,
+        <Iter as IntoIterator>::IntoIter: 'a + Clone,
+    {
+        let iter = MergeBuffersIter::from(
+            offset_buffers_iterator.into_iter(),
+        );
+        if iter.len() == 0 {
+            return Self::new_empty();
+        }
+
+        let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
+
+        let scalar_buffer: ScalarBuffer<T> = buffer.into();
+
+        Self::new(scalar_buffer)
+    }
+}
+
 impl<T: ArrowNativeType> Deref for OffsetBuffer<T> {
     type Target = [T];
 
@@ -184,6 +226,127 @@ impl<T: ArrowNativeType> From<OffsetBufferBuilder<T>> for OffsetBuffer<T> {
     }
 }
 
+struct MergeBuffersIter<'a, Offset: Integer + Copy> {
+    size: usize,
+    iterator: Box<dyn Iterator<Item = &'a [Offset]> + 'a>,
+    inner_iterator: Box<dyn Iterator<Item = Offset> + 'a>,
+    advance_by: Offset,
+    next_advance_by: Offset,
+}
+
+impl<'a, Offset, Iter> From<Iter> for MergeBuffersIter<'a, Offset>
+where
+    Offset: ArrowNativeType + Integer + Copy,
+    Iter: Iterator<Item = &'a OffsetBuffer<Offset>> + Clone + 'a,
+{
+    fn from(offset_buffers: Iter) -> Self {
+        Self::new(offset_buffers.clone(), Self::calculate_size(offset_buffers))
+    }
+}
+
+impl<'a, Offset: ArrowNativeType + Integer + Copy> MergeBuffersIter<'a, Offset> {
+    fn new(
+        offset_buffers_iterator: impl Iterator<Item = &'a OffsetBuffer<Offset>> + 'a,
+        size: usize,
+    ) -> Self {
+        let offsets_iterator: Box<dyn Iterator<Item = &'a [Offset]> + 'a> = Box::new(
+            offset_buffers_iterator
+                // Filter out empty lists or lists with only 1 offset which are invalid as they should have at least 2 offsets (start and end)
+                .filter(|offset_buffer| offset_buffer.len() > 1)
+                .map(|offset_buffer| offset_buffer.inner().as_ref()),
+        );
+
+        Self {
+            size,
+            iterator: Box::new(offsets_iterator),
+            inner_iterator: if size == 0 {
+                Box::new([].into_iter())
+            } else {
+                // Start initially with outputting the initial offset
+                Box::new([Offset::zero()].into_iter())
+            },
+            advance_by: Offset::zero(),
+            next_advance_by: Offset::zero(),
+        }
+    }
+
+    fn calculate_size(buffers: impl Iterator<Item = &'a OffsetBuffer<Offset>>) -> usize {
+        // The total length of the merged offset buffer
+        // We calculate this so we can use the faster `try_from_trusted_len_iter` method which requires fixed length
+        let merged_offset_length: usize = buffers
+            // 1. `saturating_sub` as the list can be empty
+            // 2.
subtract 1 as we have the initial offset of 0 that we don't need to count for each list, and we are adding 1 at the end + .map(|x| x.len().saturating_sub(1)) + .sum(); + + if merged_offset_length == 0 { + return 0; + } + + // we need to add 1 to the total length of the merged offset buffer as we have the initial offset of 0 + merged_offset_length + 1 + } +} + +impl<'a, Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter<'a, Offset> { + type Item = Offset; + + fn next(&mut self) -> Option { + // 1. Consume the inner iterator first + let inner_value = self.inner_iterator.next(); + + // 2. If we have a value, advance it by the last value in the previous buffer (advance_by) + if inner_value.is_some() { + self.size -= 1; + return Some(inner_value.unwrap() + self.advance_by); + } + + self.advance_by = self.next_advance_by; + + let current_offset_buffer = self.iterator.next(); + + // 3. If no more iterators, than we finished + if current_offset_buffer.is_none() { + return None; + } + + let current_offset_buffer = current_offset_buffer.unwrap(); + + // 4. Get the last value of the current buffer so we can know how much to advance the next buffer + // Safety: We already filtered out empty lists + let last_value = *current_offset_buffer.last().unwrap(); + + // 5. Update the next advance_by + self.next_advance_by = self.advance_by + last_value; + + self.inner_iterator = Box::new( + current_offset_buffer + .into_iter() + // 6. Skip the initial offset of 0 + // Skipping the first item as it is the initial offset of 0, + // and we skip even for the first iterator as we handle that by starting with inner_iterator of [0] + .skip(1) + .copied(), + ); + + // 7. Resume the inner iterator + // We already filtered out lists that have less than 2 offsets so can guarantee that the next call will return a value + self.next() + } + + fn size_hint(&self) -> (usize, Option) { + (self.size, Some(self.size)) + } +} + +impl<'a, Offset: ArrowNativeType + Integer + Copy> ExactSizeIterator + for MergeBuffersIter<'a, Offset> +{ + fn len(&self) -> usize { + self.size + } +} + #[cfg(test)] mod tests { use super::*; @@ -244,4 +407,33 @@ mod tests { fn from_lengths_usize_overflow() { OffsetBuffer::::from_lengths([usize::MAX, 1]); } + + #[test] + fn merge_from() { + // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]] + // The output should be + // [ 0, 3, 5, 7, 7, 13, 13, 14] + // + let buffers = [ + OffsetBuffer::::new(ScalarBuffer::from(vec![0, 3, 5])), + OffsetBuffer::::new(ScalarBuffer::from(vec![0, 2, 2, 8])), + OffsetBuffer::::new_empty(), + OffsetBuffer::::new(ScalarBuffer::from(vec![0, 0, 1])), + ]; + + let buffer = OffsetBuffer::::merge(&buffers); + assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]); + } + + #[test] + fn merge_from_empty() { + let buffers = [ + OffsetBuffer::::new_empty(), + OffsetBuffer::::new_empty(), + OffsetBuffer::::new_empty(), + ]; + + let buffer = OffsetBuffer::::merge(&buffers); + assert_eq!(buffer.as_ref(), OffsetBuffer::::new_empty().as_ref()); + } } From 69e3315f446a54e1f1c51f96fea3fec209de3f02 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 21:01:35 +0200 Subject: [PATCH 07/16] format --- arrow-buffer/src/buffer/offset.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index fc48ac053331..d0a471597df2 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -164,7 +164,6 @@ impl 
OffsetBuffer { } impl OffsetBuffer { - /// Merge multiple [`OffsetBuffer`] into a single [`OffsetBuffer`] /// /// @@ -189,9 +188,7 @@ impl OffsetBuffer { Iter: IntoIterator>, ::IntoIter: 'a + Clone, { - let iter = MergeBuffersIter::from( - offset_buffers_iterator.into_iter(), - ); + let iter = MergeBuffersIter::from(offset_buffers_iterator.into_iter()); if iter.len() == 0 { return Self::new_empty(); } @@ -422,7 +419,7 @@ mod tests { ]; let buffer = OffsetBuffer::::merge(&buffers); - assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]); + assert_eq!(buffer.as_ref(), &[0, 3, 5, 7, 7, 13, 13, 14]); } #[test] From 338c90e77dad1d20563f1af065719007e7d987cc Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:11:27 +0000 Subject: [PATCH 08/16] add reproduction tst --- arrow-select/src/concat.rs | 43 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 66de9fb43c76..d07a9939ed7c 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -1060,6 +1060,49 @@ mod tests { ); } + #[test] + fn concat_many_dictionary_list_arrays() { + let number_of_unique_values = 8; + let scalars = (0..80000) + .into_iter() + .map(|i| { + create_single_row_list_of_dict(vec![Some( + (i % number_of_unique_values).to_string(), + )]) + }) + .collect::>(); + + let arrays = scalars + .iter() + .map(|a| a as &(dyn Array)) + .collect::>(); + let concat_res = concat(arrays.as_slice()).unwrap(); + + let expected_list = create_list_of_dict( + (0..80000) + .into_iter() + .map(|i| Some(vec![Some((i % number_of_unique_values).to_string())])) + .collect::>(), + ); + + let list = concat_res.as_list::(); + + // Assert that the list is equal to the expected list + list.iter().zip(expected_list.iter()).for_each(|(a, b)| { + assert_eq!(a, b); + }); + + let dict = list + .values() + .as_dictionary::() + .downcast_dict::() + .unwrap(); + + assert_dictionary_has_unique_values::<_, StringArray>( + list.values().as_dictionary::(), + ); + } + #[test] fn concat_dictionary_list_array_with_multiple_rows() { let scalars = vec![ From 95cbf0c18210c3fec496a0decec522a24ae17eb5 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:13:34 +0000 Subject: [PATCH 09/16] recommit --- arrow-select/src/concat.rs | 66 +++----------------------------------- 1 file changed, 4 insertions(+), 62 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index d07a9939ed7c..5a95bea36e90 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -34,7 +34,7 @@ use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values} use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::*; -use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer}; +use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer}; use arrow_data::transform::{Capacities, MutableArrayData}; use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, SchemaRef}; @@ -199,9 +199,9 @@ fn concat_list_of_dictionaries>, ->( - offset_buffers_iterator: I, -) -> Buffer { - // 1. Filter out empty lists - let mut offset_buffers_iterator = offset_buffers_iterator.filter(|x| !x.is_empty()); - - // 2. Get first non-empty list as the starting point - let starting_buffer = offset_buffers_iterator.next(); - - // 3. 
If we have only empty lists, return an empty buffer - if starting_buffer.is_none() { - return Buffer::from(&[]); - } - - let starting_buffer = starting_buffer.unwrap(); - - let mut offsets_iter: Box> = - Box::new(starting_buffer.iter().copied()); - - // 4. Get the last value in the starting buffer as the starting point for the next buffer - // Safety: We already filtered out empty lists - let mut advance_by = *starting_buffer.last().unwrap(); - - // 5. Iterate over the remaining buffers - for offset_buffer in offset_buffers_iterator { - // 6. Get the last value of the current buffer so we can know how much to advance the next buffer - // Safety: We already filtered out empty lists - let last_value = *offset_buffer.last().unwrap(); - - // 7. Advance the offset buffer by the last value in the previous buffer - let offset_buffer_iter = offset_buffer - .iter() - // Skip the first value as it is the initial offset of 0 - .skip(1) - .map(move |&x| x + advance_by); - - // 8. concat the current buffer with the previous buffer - // Chaining keeps the iterator have trusting length - offsets_iter = Box::new(offsets_iter.chain(offset_buffer_iter)); - - // 9. Update the next advance_by - advance_by += last_value; - } - - unsafe { Buffer::from_trusted_len_iter(offsets_iter) } -} - macro_rules! dict_helper { ($t:ty, $arrays:expr) => { return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _) From e5290301a69eac2d3550aa9363e0fb7af60e4f6f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:16:44 +0000 Subject: [PATCH 10/16] fix clippy --- arrow-buffer/src/buffer/offset.rs | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index d0a471597df2..ce222540c929 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -285,7 +285,7 @@ impl<'a, Offset: ArrowNativeType + Integer + Copy> MergeBuffersIter<'a, Offset> } } -impl<'a, Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter<'a, Offset> { +impl Iterator for MergeBuffersIter<'_, Offset> { type Item = Offset; fn next(&mut self) -> Option { @@ -300,14 +300,8 @@ impl<'a, Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter self.advance_by = self.next_advance_by; - let current_offset_buffer = self.iterator.next(); - // 3. If no more iterators, than we finished - if current_offset_buffer.is_none() { - return None; - } - - let current_offset_buffer = current_offset_buffer.unwrap(); + let current_offset_buffer = self.iterator.next()?; // 4. Get the last value of the current buffer so we can know how much to advance the next buffer // Safety: We already filtered out empty lists @@ -318,7 +312,7 @@ impl<'a, Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter self.inner_iterator = Box::new( current_offset_buffer - .into_iter() + .iter() // 6. 
Skip the initial offset of 0 // Skipping the first item as it is the initial offset of 0, // and we skip even for the first iterator as we handle that by starting with inner_iterator of [0] @@ -336,9 +330,7 @@ impl<'a, Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter } } -impl<'a, Offset: ArrowNativeType + Integer + Copy> ExactSizeIterator - for MergeBuffersIter<'a, Offset> -{ +impl ExactSizeIterator for MergeBuffersIter<'_, Offset> { fn len(&self) -> usize { self.size } From bb9fa2276eac66b9e352e5d128fab1ea66a1de36 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:25:38 +0000 Subject: [PATCH 11/16] fix clippy --- arrow-select/src/concat.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 5a95bea36e90..a8e353876034 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -266,14 +266,14 @@ pub fn concat(arrays: &[&dyn Array]) -> Result { } else { if let DataType::List(field) = d { if let DataType::Dictionary(k, _) = field.data_type() { - return downcast_integer! { + downcast_integer! { k.as_ref() => (list_dict_helper, i32, arrays), _ => unreachable!("illegal dictionary key type {k}") }; } } else if let DataType::LargeList(field) = d { if let DataType::Dictionary(k, _) = field.data_type() { - return downcast_integer! { + downcast_integer! { k.as_ref() => (list_dict_helper, i64, arrays), _ => unreachable!("illegal dictionary key type {k}") }; From 05cf96844e55101ec3da3092fff026ab58146d66 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:35:11 +0000 Subject: [PATCH 12/16] fix clippy --- arrow-select/src/concat.rs | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index a8e353876034..58242ba03b34 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -991,12 +991,6 @@ mod tests { assert_eq!(a, b); }); - let dict = list - .values() - .as_dictionary::() - .downcast_dict::() - .unwrap(); - assert_dictionary_has_unique_values::<_, StringArray>( list.values().as_dictionary::(), ); @@ -1006,7 +1000,6 @@ mod tests { fn concat_many_dictionary_list_arrays() { let number_of_unique_values = 8; let scalars = (0..80000) - .into_iter() .map(|i| { create_single_row_list_of_dict(vec![Some( (i % number_of_unique_values).to_string(), @@ -1022,7 +1015,6 @@ mod tests { let expected_list = create_list_of_dict( (0..80000) - .into_iter() .map(|i| Some(vec![Some((i % number_of_unique_values).to_string())])) .collect::>(), ); @@ -1034,12 +1026,6 @@ mod tests { assert_eq!(a, b); }); - let dict = list - .values() - .as_dictionary::() - .downcast_dict::() - .unwrap(); - assert_dictionary_has_unique_values::<_, StringArray>( list.values().as_dictionary::(), ); @@ -1132,7 +1118,7 @@ mod tests { fn create_single_row_list_of_dict( list_items: Vec>>, ) -> GenericListArray { - let rows = list_items.into_iter().map(|row| Some(row)).collect(); + let rows = list_items.into_iter().map(Some).collect(); create_list_of_dict(vec![rows]) } @@ -1150,17 +1136,17 @@ mod tests { builder.finish() } - fn assert_dictionary_has_unique_values<'a, K, V: 'static>(array: &'a DictionaryArray) + fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray) where K: ArrowDictionaryKeyType, - V: Sync + Send, + V: Sync + Send + 'static, &'a V: ArrayAccessor + IntoIterator, 
<&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord, <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord, { let dict = array.downcast_dict::().unwrap(); - let mut values = dict.values().clone().into_iter().collect::>(); + let mut values = dict.values().into_iter().collect::>(); // remove duplicates must be sorted first so we can compare values.sort(); From ec7e135f620e0c9ad1df73baeaa9a66c66382094 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 21 Dec 2024 17:59:25 +0200 Subject: [PATCH 13/16] improve offsets code according to code review --- arrow-buffer/src/buffer/offset.rs | 223 +++++++----------------------- arrow-select/src/concat.rs | 91 +----------- 2 files changed, 51 insertions(+), 263 deletions(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index ce222540c929..164af6f01d0e 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -16,8 +16,7 @@ // under the License. use crate::buffer::ScalarBuffer; -use crate::{ArrowNativeType, Buffer, MutableBuffer, OffsetBufferBuilder}; -use num::Integer; +use crate::{ArrowNativeType, MutableBuffer, OffsetBufferBuilder}; use std::ops::Deref; /// A non-empty buffer of monotonically increasing, positive integers. @@ -134,6 +133,38 @@ impl OffsetBuffer { Self(out.into()) } + /// Get an Iterator over the lengths of this [`OffsetBuffer`] + /// + /// ``` + /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer}; + /// let offsets = OffsetBuffer::<_>::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); + /// assert_eq!(offsets.lengths().collect::>(), vec![1, 3, 5]); + /// ``` + /// + /// Empty [`OffsetBuffer`] will return an empty iterator + /// ``` + /// # use arrow_buffer::OffsetBuffer; + /// let offsets = OffsetBuffer::::new_empty(); + /// assert_eq!(offsets.lengths().count(), 0); + /// ``` + /// + /// This can be used to merge multiple [`OffsetBuffer`]s to one + /// ``` + /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer}; + /// + /// let buffer1 = OffsetBuffer::::from_lengths([2, 6, 3, 7, 2]); + /// let buffer2 = OffsetBuffer::::from_lengths([1, 3, 5, 7, 9]); + /// + /// let merged = OffsetBuffer::::from_lengths( + /// vec![buffer1, buffer2].iter().flat_map(|x| x.lengths()) + /// ); + /// + /// assert_eq!(merged.lengths().collect::>(), &[2, 6, 3, 7, 2, 1, 3, 5, 7, 9]); + /// ``` + pub fn lengths(&self) -> impl ExactSizeIterator + '_ { + self.0.windows(2).map(|x| x[1].as_usize() - x[0].as_usize()) + } + /// Free up unused memory. 
pub fn shrink_to_fit(&mut self) { self.0.shrink_to_fit(); @@ -163,44 +194,6 @@ impl OffsetBuffer { } } -impl OffsetBuffer { - /// Merge multiple [`OffsetBuffer`] into a single [`OffsetBuffer`] - /// - /// - /// ``` - /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer}; - /// // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]] - /// // The output should be - /// // [ 0, 3, 5, 7, 7, 13, 13, 14] - /// let buffers = [ - /// OffsetBuffer::::new(ScalarBuffer::from(vec![0, 3, 5])), - /// OffsetBuffer::::new(ScalarBuffer::from(vec![0, 2, 2, 8])), - /// OffsetBuffer::::new_empty(), - /// OffsetBuffer::::new(ScalarBuffer::from(vec![0, 0, 1])), - /// ]; - /// - /// let buffer = OffsetBuffer::::merge(&buffers); - /// assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]); - /// ``` - /// - pub fn merge<'a, Iter>(offset_buffers_iterator: Iter) -> Self - where - Iter: IntoIterator>, - ::IntoIter: 'a + Clone, - { - let iter = MergeBuffersIter::from(offset_buffers_iterator.into_iter()); - if iter.len() == 0 { - return Self::new_empty(); - } - - let buffer = unsafe { Buffer::from_trusted_len_iter(iter) }; - - let scalar_buffer: ScalarBuffer = buffer.into(); - - Self::new(scalar_buffer) - } -} - impl Deref for OffsetBuffer { type Target = [T]; @@ -223,119 +216,6 @@ impl From> for OffsetBuffer { } } -struct MergeBuffersIter<'a, Offset: Integer + Copy> { - size: usize, - iterator: Box + 'a>, - inner_iterator: Box + 'a>, - advance_by: Offset, - next_advance_by: Offset, -} - -impl<'a, Offset, Iter> From for MergeBuffersIter<'a, Offset> -where - Offset: ArrowNativeType + Integer + Copy, - Iter: Iterator> + Clone + 'a, -{ - fn from(offset_buffers: Iter) -> Self { - Self::new(offset_buffers.clone(), Self::calculate_size(offset_buffers)) - } -} - -impl<'a, Offset: ArrowNativeType + Integer + Copy> MergeBuffersIter<'a, Offset> { - fn new( - offset_buffers_iterator: impl Iterator> + 'a, - size: usize, - ) -> Self { - let offsets_iterator: Box> = Box::new( - offset_buffers_iterator - // Filter out empty lists or lists with only 1 offset which are invalid as they should have at least 2 offsets (start and end) - .filter(|offset_buffer| offset_buffer.len() > 1) - .map(|offset_buffer| offset_buffer.inner().as_ref()), - ); - - Self { - size, - iterator: Box::new(offsets_iterator), - inner_iterator: if size == 0 { - Box::new([].into_iter()) - } else { - // Start initially with outputting the initial offset - Box::new([Offset::zero()].into_iter()) - }, - advance_by: Offset::zero(), - next_advance_by: Offset::zero(), - } - } - - fn calculate_size(buffers: impl Iterator>) -> usize { - // The total length of the merged offset buffer - // We calculate this so we can use the faster `try_from_trusted_len_iter` method which requires fixed length - let merged_offset_length: usize = buffers - // 1. `saturating_sub` as the list can be empty - // 2. subtract 1 as we have the initial offset of 0 that we don't need to count for each list, and we are adding 1 at the end - .map(|x| x.len().saturating_sub(1)) - .sum(); - - if merged_offset_length == 0 { - return 0; - } - - // we need to add 1 to the total length of the merged offset buffer as we have the initial offset of 0 - merged_offset_length + 1 - } -} - -impl Iterator for MergeBuffersIter<'_, Offset> { - type Item = Offset; - - fn next(&mut self) -> Option { - // 1. Consume the inner iterator first - let inner_value = self.inner_iterator.next(); - - // 2. 
If we have a value, advance it by the last value in the previous buffer (advance_by) - if inner_value.is_some() { - self.size -= 1; - return Some(inner_value.unwrap() + self.advance_by); - } - - self.advance_by = self.next_advance_by; - - // 3. If no more iterators, than we finished - let current_offset_buffer = self.iterator.next()?; - - // 4. Get the last value of the current buffer so we can know how much to advance the next buffer - // Safety: We already filtered out empty lists - let last_value = *current_offset_buffer.last().unwrap(); - - // 5. Update the next advance_by - self.next_advance_by = self.advance_by + last_value; - - self.inner_iterator = Box::new( - current_offset_buffer - .iter() - // 6. Skip the initial offset of 0 - // Skipping the first item as it is the initial offset of 0, - // and we skip even for the first iterator as we handle that by starting with inner_iterator of [0] - .skip(1) - .copied(), - ); - - // 7. Resume the inner iterator - // We already filtered out lists that have less than 2 offsets so can guarantee that the next call will return a value - self.next() - } - - fn size_hint(&self) -> (usize, Option) { - (self.size, Some(self.size)) - } -} - -impl ExactSizeIterator for MergeBuffersIter<'_, Offset> { - fn len(&self) -> usize { - self.size - } -} - #[cfg(test)] mod tests { use super::*; @@ -398,31 +278,22 @@ mod tests { } #[test] - fn merge_from() { - // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]] - // The output should be - // [ 0, 3, 5, 7, 7, 13, 13, 14] - // - let buffers = [ - OffsetBuffer::::new(ScalarBuffer::from(vec![0, 3, 5])), - OffsetBuffer::::new(ScalarBuffer::from(vec![0, 2, 2, 8])), - OffsetBuffer::::new_empty(), - OffsetBuffer::::new(ScalarBuffer::from(vec![0, 0, 1])), - ]; - - let buffer = OffsetBuffer::::merge(&buffers); - assert_eq!(buffer.as_ref(), &[0, 3, 5, 7, 7, 13, 13, 14]); + fn get_lengths() { + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); + assert_eq!(offsets.lengths().collect::>(), vec![1, 3, 5]); + } + + #[test] + fn get_lengths_should_be_with_fixed_size() { + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); + let iter = offsets.lengths(); + assert_eq!(iter.size_hint(), (3, Some(3))); + assert_eq!(iter.len(), 3); } #[test] - fn merge_from_empty() { - let buffers = [ - OffsetBuffer::::new_empty(), - OffsetBuffer::::new_empty(), - OffsetBuffer::::new_empty(), - ]; - - let buffer = OffsetBuffer::::merge(&buffers); - assert_eq!(buffer.as_ref(), OffsetBuffer::::new_empty().as_ref()); + fn get_lengths_from_empty_offset_buffer_should_be_empty_iterator() { + let offsets = OffsetBuffer::::new_empty(); + assert_eq!(offsets.lengths().collect::>(), vec![]); } } diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 58242ba03b34..20b84bdc63a9 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -199,9 +199,10 @@ fn concat_list_of_dictionaries::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths())) + .into_inner() + .into_inner(); let builder = ArrayDataBuilder::new(arrays[0].data_type().clone()) .len(output_len) @@ -1031,90 +1032,6 @@ mod tests { ); } - #[test] - fn concat_dictionary_list_array_with_multiple_rows() { - let scalars = vec![ - create_list_of_dict(vec![ - // Row 1 - Some(vec![Some("a"), Some("c")]), - // Row 2 - None, - // Row 3 - Some(vec![Some("f"), Some("g"), None]), - // Row 4 - Some(vec![Some("c"), Some("f")]), - ]), - create_list_of_dict(vec![ - // Row 1 - Some(vec![Some("a")]), - // Row 2 - Some(vec![]), - // Row 3 - 
Some(vec![None, Some("b")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - ]), - create_list_of_dict(vec![ - // Row 1 - Some(vec![Some("g")]), - // Row 2 - Some(vec![Some("h"), Some("i")]), - // Row 3 - Some(vec![Some("j"), Some("a")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - ]), - ]; - let arrays = scalars - .iter() - .map(|a| a as &(dyn Array)) - .collect::>(); - let concat_res = concat(arrays.as_slice()).unwrap(); - - let expected_list = create_list_of_dict(vec![ - // First list: - - // Row 1 - Some(vec![Some("a"), Some("c")]), - // Row 2 - None, - // Row 3 - Some(vec![Some("f"), Some("g"), None]), - // Row 4 - Some(vec![Some("c"), Some("f")]), - // Second list: - // Row 1 - Some(vec![Some("a")]), - // Row 2 - Some(vec![]), - // Row 3 - Some(vec![None, Some("b")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - // Third list: - - // Row 1 - Some(vec![Some("g")]), - // Row 2 - Some(vec![Some("h"), Some("i")]), - // Row 3 - Some(vec![Some("j"), Some("a")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - ]); - - let list = concat_res.as_list::(); - - // Assert that the list is equal to the expected list - list.iter().zip(expected_list.iter()).for_each(|(a, b)| { - assert_eq!(a, b); - }); - - assert_dictionary_has_unique_values::<_, StringArray>( - list.values().as_dictionary::(), - ); - } - fn create_single_row_list_of_dict( list_items: Vec>>, ) -> GenericListArray { From 87c2865adb69efece34b210fbdf0c37302a5c688 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 21 Dec 2024 18:12:14 +0200 Subject: [PATCH 14/16] use concat dictionaries --- arrow-select/src/concat.rs | 41 +++----------------------------------- 1 file changed, 3 insertions(+), 38 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 20b84bdc63a9..9dbae1f646c6 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -145,19 +145,11 @@ fn concat_list_of_dictionaries>(); - let mut dictionary_output_len = 0; let dictionaries: Vec<_> = lists .iter() - .map(|x| x.values().as_ref().as_dictionary::()) - .inspect(|d| dictionary_output_len += d.len()) + .map(|x| x.values().as_ref()) .collect(); - if !should_merge_dictionary_values::(&dictionaries, dictionary_output_len) { - return concat_fallback(arrays, Capacities::Array(output_len)); - } - - let merged = merge_dictionary_values(&dictionaries, None)?; - let lists_nulls = list_has_nulls.then(|| { let mut nulls = BooleanBufferBuilder::new(output_len); for l in &lists { @@ -169,34 +161,7 @@ fn concat_list_of_dictionaries nulls.append_buffer(n.inner()), - None => nulls.append_n(d.len(), true), - } - } - NullBuffer::new(nulls.finish()) - }); - - let keys = PrimitiveArray::::new(key_values.into(), dictionary_nulls); - // Sanity check - assert_eq!(keys.len(), dictionary_output_len); - - let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) }; + let concat_dictionaries = concat_dictionaries::(dictionaries.as_slice())?; // Merge value offsets from the lists let value_offset_buffer = @@ -210,7 +175,7 @@ fn concat_list_of_dictionaries Date: Sat, 21 Dec 2024 18:28:17 +0200 Subject: [PATCH 15/16] add specialize code to concat lists to be able to use the concat dictionary logic --- arrow-select/src/concat.rs | 55 ++++++++++++++------------------------ 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 9dbae1f646c6..91ce16e062a3 100644 --- a/arrow-select/src/concat.rs +++ 
b/arrow-select/src/concat.rs @@ -130,7 +130,7 @@ fn concat_dictionaries( Ok(Arc::new(array)) } -fn concat_list_of_dictionaries( +fn concat_lists( arrays: &[&dyn Array], ) -> Result { let mut output_len = 0; @@ -145,11 +145,6 @@ fn concat_list_of_dictionaries>(); - let dictionaries: Vec<_> = lists - .iter() - .map(|x| x.values().as_ref()) - .collect(); - let lists_nulls = list_has_nulls.then(|| { let mut nulls = BooleanBufferBuilder::new(output_len); for l in &lists { @@ -161,7 +156,12 @@ fn concat_list_of_dictionaries(dictionaries.as_slice())?; + let values = lists + .iter() + .map(|x| x.values().as_ref()) + .collect::>(); + + let concatenated_values = concat(values.as_slice())?; // Merge value offsets from the lists let value_offset_buffer = @@ -175,7 +175,7 @@ fn concat_list_of_dictionaries { - return Ok(Arc::new(concat_list_of_dictionaries::<$o, $t>($arrays)?) as _) - }; -} - fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities { match data_type { DataType::Utf8 => binary_capacity::(arrays), @@ -224,29 +218,20 @@ pub fn concat(arrays: &[&dyn Array]) -> Result { "It is not possible to concatenate arrays of different data types.".to_string(), )); } - if let DataType::Dictionary(k, _) = d { - downcast_integer! { - k.as_ref() => (dict_helper, arrays), - _ => unreachable!("illegal dictionary key type {k}") - }; - } else { - if let DataType::List(field) = d { - if let DataType::Dictionary(k, _) = field.data_type() { - downcast_integer! { - k.as_ref() => (list_dict_helper, i32, arrays), - _ => unreachable!("illegal dictionary key type {k}") - }; - } - } else if let DataType::LargeList(field) = d { - if let DataType::Dictionary(k, _) = field.data_type() { - downcast_integer! { - k.as_ref() => (list_dict_helper, i64, arrays), - _ => unreachable!("illegal dictionary key type {k}") - }; + + match d { + DataType::Dictionary(k, _) => { + downcast_integer! 
{
+                k.as_ref() => (dict_helper, arrays),
+                _ => unreachable!("illegal dictionary key type {k}")
+            }
+        }
+        DataType::List(_) => concat_lists::<i32>(arrays),
+        DataType::LargeList(_) => concat_lists::<i64>(arrays),
+        _ => {
+            let capacity = get_capacity(arrays, d);
+            concat_fallback(arrays, capacity)
+        }
+    }
 }
 
From ca61ce271bd1463272f2333ddc56ad9d66f30525 Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Sat, 21 Dec 2024 17:59:49 +0000
Subject: [PATCH 16/16] remove the use of ArrayData

---
 arrow-select/src/concat.rs | 35 ++++++++++++++---------------------
 1 file changed, 14 insertions(+), 21 deletions(-)

diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 91ce16e062a3..4855e0087cc6 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -36,8 +36,7 @@ use arrow_array::types::*;
 use arrow_array::*;
 use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
 use arrow_data::transform::{Capacities, MutableArrayData};
-use arrow_data::ArrayDataBuilder;
-use arrow_schema::{ArrowError, DataType, SchemaRef};
+use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
 use std::sync::Arc;
 
 fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
@@ -132,6 +131,7 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
 
 fn concat_lists<OffsetSize: OffsetSizeTrait>(
     arrays: &[&dyn Array],
+    field: &FieldRef,
 ) -> Result<ArrayRef, ArrowError> {
     let mut output_len = 0;
     let mut list_has_nulls = false;
@@ -156,7 +156,7 @@ fn concat_lists<OffsetSize: OffsetSizeTrait>(
         NullBuffer::new(nulls.finish())
     });
 
-    let values = lists
+    let values: Vec<&dyn Array> = lists
        .iter()
        .map(|x| x.values().as_ref())
        .collect::<Vec<_>>();
@@ -165,22 +165,15 @@ fn concat_lists<OffsetSize: OffsetSizeTrait>(
 
     // Merge value offsets from the lists
     let value_offset_buffer =
-        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()))
-            .into_inner()
-            .into_inner();
-
-    let builder = ArrayDataBuilder::new(arrays[0].data_type().clone())
-        .len(output_len)
-        .nulls(lists_nulls)
-        // `GenericListArray` must only have 1 buffer
-        .buffers(vec![value_offset_buffer])
-        // `GenericListArray` must only have 1 child_data
-        .child_data(vec![concatenated_values.to_data()]);
-
-    // TODO - maybe use build_unchecked?
-    let array_data = builder.build()?;
-
-    let array = GenericListArray::<OffsetSize>::from(array_data);
+        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
+
+    let array = GenericListArray::<OffsetSize>::try_new(
+        Arc::clone(field),
+        value_offset_buffer,
+        concatenated_values,
+        lists_nulls,
+    )?;
+
     Ok(Arc::new(array))
 }
 
@@ -226,8 +219,8 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
             _ => unreachable!("illegal dictionary key type {k}")
             }
         }
-        DataType::List(_) => concat_lists::<i32>(arrays),
-        DataType::LargeList(_) => concat_lists::<i64>(arrays),
+        DataType::List(field) => concat_lists::<i32>(arrays, field),
+        DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
         _ => {
             let capacity = get_capacity(arrays, d);
             concat_fallback(arrays, capacity)
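
---

For readers following along, here is a minimal usage sketch of the behavior this series implements. It is not part of the patches themselves; it simply mirrors the `create_list_of_dict` test helpers above, and assumes the standard `arrow-array`/`arrow-select` crate paths this series targets. As the series' title suggests, concatenating `List<Dictionary>` arrays now yields a single list array whose child dictionary holds the merged, de-duplicated value set rather than a naive stacking of each input's dictionary values.

```rust
use arrow_array::builder::{ListBuilder, StringDictionaryBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::Array;
use arrow_select::concat::concat;

fn main() {
    // Two single-row List<Dictionary<Int32, Utf8>> arrays that share the value "b"
    let mut a = ListBuilder::new(StringDictionaryBuilder::<Int32Type>::new());
    a.append_option(Some(vec![Some("a"), Some("b")]));
    let a = a.finish();

    let mut b = ListBuilder::new(StringDictionaryBuilder::<Int32Type>::new());
    b.append_option(Some(vec![Some("b"), Some("c")]));
    let b = b.finish();

    // Concatenate the two list arrays
    let combined = concat(&[&a, &b]).unwrap();
    let list = combined.as_list::<i32>();
    assert_eq!(list.len(), 2);

    // With the series applied, the child dictionary contains each distinct
    // value once ("a", "b", "c") instead of a copy of both input dictionaries.
    let dict = list.values().as_dictionary::<Int32Type>();
    println!("merged dictionary values: {:?}", dict.values());
}
```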