Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arrow-select): concat kernel will merge dictionary values for list of dictionaries #6893

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
183 changes: 182 additions & 1 deletion arrow-buffer/src/buffer/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use crate::buffer::ScalarBuffer;
use crate::{ArrowNativeType, MutableBuffer, OffsetBufferBuilder};
use crate::{ArrowNativeType, Buffer, MutableBuffer, OffsetBufferBuilder};
use num::Integer;
use std::ops::Deref;

/// A non-empty buffer of monotonically increasing, positive integers.
Expand Down Expand Up @@ -162,6 +163,44 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
}
}

impl<O: ArrowNativeType + Integer + Copy> OffsetBuffer<O> {
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
/// Merge multiple [`OffsetBuffer`] into a single [`OffsetBuffer`]
///
///
/// ```
/// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
/// // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
/// // The output should be
/// // [ 0, 3, 5, 7, 7, 13, 13, 14]
/// let buffers = [
/// OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 3, 5])),
/// OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 2, 2, 8])),
/// OffsetBuffer::<i32>::new_empty(),
/// OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 0, 1])),
/// ];
///
/// let buffer = OffsetBuffer::<i32>::merge(&buffers);
/// assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]);
/// ```
///
pub fn merge<'a, Iter>(offset_buffers_iterator: Iter) -> Self
where
Iter: IntoIterator<Item = &'a OffsetBuffer<O>>,
<Iter as IntoIterator>::IntoIter: 'a + Clone,
{
let iter = MergeBuffersIter::from(offset_buffers_iterator.into_iter());
if iter.len() == 0 {
return Self::new_empty();
}

let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };

let scalar_buffer: ScalarBuffer<O> = buffer.into();

Self::new(scalar_buffer)
}
}

impl<T: ArrowNativeType> Deref for OffsetBuffer<T> {
type Target = [T];

Expand All @@ -184,6 +223,119 @@ impl<O: ArrowNativeType> From<OffsetBufferBuilder<O>> for OffsetBuffer<O> {
}
}

struct MergeBuffersIter<'a, Offset: Integer + Copy> {
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
size: usize,
iterator: Box<dyn Iterator<Item = &'a [Offset]> + 'a>,
inner_iterator: Box<dyn Iterator<Item = Offset> + 'a>,
advance_by: Offset,
next_advance_by: Offset,
}

impl<'a, Offset, Iter> From<Iter> for MergeBuffersIter<'a, Offset>
where
Offset: ArrowNativeType + Integer + Copy,
Iter: Iterator<Item = &'a OffsetBuffer<Offset>> + Clone + 'a,
{
fn from(offset_buffers: Iter) -> Self {
Self::new(offset_buffers.clone(), Self::calculate_size(offset_buffers))
}
}

impl<'a, Offset: ArrowNativeType + Integer + Copy> MergeBuffersIter<'a, Offset> {
fn new(
offset_buffers_iterator: impl Iterator<Item = &'a OffsetBuffer<Offset>> + 'a,
size: usize,
) -> Self {
let offsets_iterator: Box<dyn Iterator<Item = &'a [Offset]>> = Box::new(
offset_buffers_iterator
// Filter out empty lists or lists with only 1 offset which are invalid as they should have at least 2 offsets (start and end)
.filter(|offset_buffer| offset_buffer.len() > 1)
.map(|offset_buffer| offset_buffer.inner().as_ref()),
);

Self {
size,
iterator: Box::new(offsets_iterator),
inner_iterator: if size == 0 {
Box::new([].into_iter())
} else {
// Start initially with outputting the initial offset
Box::new([Offset::zero()].into_iter())
},
advance_by: Offset::zero(),
next_advance_by: Offset::zero(),
}
}

fn calculate_size(buffers: impl Iterator<Item = &'a OffsetBuffer<Offset>>) -> usize {
// The total length of the merged offset buffer
// We calculate this so we can use the faster `try_from_trusted_len_iter` method which requires fixed length
let merged_offset_length: usize = buffers
// 1. `saturating_sub` as the list can be empty
// 2. subtract 1 as we have the initial offset of 0 that we don't need to count for each list, and we are adding 1 at the end
.map(|x| x.len().saturating_sub(1))
.sum();

if merged_offset_length == 0 {
return 0;
}

// we need to add 1 to the total length of the merged offset buffer as we have the initial offset of 0
merged_offset_length + 1
}
}

impl<Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter<'_, Offset> {
type Item = Offset;

fn next(&mut self) -> Option<Self::Item> {
// 1. Consume the inner iterator first
let inner_value = self.inner_iterator.next();

// 2. If we have a value, advance it by the last value in the previous buffer (advance_by)
if inner_value.is_some() {
self.size -= 1;
return Some(inner_value.unwrap() + self.advance_by);
}

self.advance_by = self.next_advance_by;

// 3. If no more iterators, than we finished
let current_offset_buffer = self.iterator.next()?;

// 4. Get the last value of the current buffer so we can know how much to advance the next buffer
// Safety: We already filtered out empty lists
let last_value = *current_offset_buffer.last().unwrap();

// 5. Update the next advance_by
self.next_advance_by = self.advance_by + last_value;

self.inner_iterator = Box::new(
current_offset_buffer
.iter()
// 6. Skip the initial offset of 0
// Skipping the first item as it is the initial offset of 0,
// and we skip even for the first iterator as we handle that by starting with inner_iterator of [0]
.skip(1)
.copied(),
);

// 7. Resume the inner iterator
// We already filtered out lists that have less than 2 offsets so can guarantee that the next call will return a value
self.next()
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.size, Some(self.size))
}
}

impl<Offset: ArrowNativeType + Integer + Copy> ExactSizeIterator for MergeBuffersIter<'_, Offset> {
fn len(&self) -> usize {
self.size
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -244,4 +396,33 @@ mod tests {
fn from_lengths_usize_overflow() {
OffsetBuffer::<i32>::from_lengths([usize::MAX, 1]);
}

#[test]
fn merge_from() {
// [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
// The output should be
// [ 0, 3, 5, 7, 7, 13, 13, 14]
//
let buffers = [
OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 3, 5])),
OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 2, 2, 8])),
OffsetBuffer::<i32>::new_empty(),
OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 0, 1])),
];

let buffer = OffsetBuffer::<i32>::merge(&buffers);
assert_eq!(buffer.as_ref(), &[0, 3, 5, 7, 7, 13, 13, 14]);
}

#[test]
fn merge_from_empty() {
let buffers = [
OffsetBuffer::<i32>::new_empty(),
OffsetBuffer::<i32>::new_empty(),
OffsetBuffer::<i32>::new_empty(),
];

let buffer = OffsetBuffer::<i32>::merge(&buffers);
assert_eq!(buffer.as_ref(), OffsetBuffer::<i32>::new_empty().as_ref());
}
}
Loading
Loading