Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple GC for view array types #5885

Merged
merged 10 commits into from
Jun 14, 2024
Prev Previous commit
Next Next commit
feat: add gc to view-arrays
part2: implement actual gc algorithm

Signed-off-by: 蔡略 <cailue@bupt.edu.cn>
  • Loading branch information
ClSlaid authored and XiangpengHao committed Jun 13, 2024
commit 4c76bc551a84bd3c1e8c1f7537ddd81a6f6fc787
146 changes: 133 additions & 13 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -267,16 +267,32 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
}
}

/// Returns whether this array is a compact view
pub(self) fn is_compact_view(&self) -> bool {
todo!()
/// Returns whether the buffers are compact
pub(self) fn compact_check(&self) -> Vec<bool> {
let mut checkers: Vec<_> = self
.buffers
.iter()
.map(|b| CompactChecker::new(b.len()))
.collect();

for (i, view) in self.views.iter().enumerate() {
let view = ByteView::from(*view);
if self.is_null(i) || view.length <= 12 {
continue;
}
checkers[view.buffer_index as usize]
.accumulate(view.offset as usize, view.length as usize);
}
checkers.into_iter().map(|c| c.finish()).collect()
}

/// Returns a compact version of this array
/// Returns a buffer compact version of this array
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
///
/// The original array will *not* be modified
///
/// # Compaction
/// # Garbage Collection
///
/// before compaction:
/// Before GC:
/// ```text
/// ┌──────┐
/// │......│
@@ -292,7 +308,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// large buffer
/// ```
///
/// after compaction:
/// After GC:
///
/// ```text
/// ┌────────────────────┐ ┌─────┐ After gc, only
@@ -304,12 +320,60 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
///
/// 2 views
/// ```
/// this method will compact the data buffers to only include the data
/// that is pointed to by the views
/// This method will compact the data buffers to only include the data
/// that is pointed to by the views,
/// and return a new array with the compacted data buffers.
/// the original array will be left as is.
pub fn compact(&self) -> Self {
todo!()
/// The original array will be left as is.
pub fn gc(&self) -> Self {
let compact_check = self.compact_check();

if compact_check.iter().all(|x| *x) {
return self.clone();
}

let mut new_views = Vec::with_capacity(self.views.len());
let mut new_bufs: Vec<Vec<u8>> = vec![vec![]; self.buffers.len()];
for view in self.views.iter() {
let mut bv = ByteView::from(*view);
let idx = bv.buffer_index as usize;
if bv.length <= 12 || compact_check[idx] {
new_views.push(*view);
continue;
}
// copy data to new buffer
let data = self.buffers.get(idx).unwrap();
let offset = new_bufs[idx].len();
let len = bv.length as usize;
new_bufs[idx].extend_from_slice(
data.get(bv.offset as usize..bv.offset as usize + len)
.unwrap(),
);
// update view
bv.offset = offset as u32;

new_views.push(bv.into());
}
let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect();

let new_views = ScalarBuffer::from(new_views);

let new_buffers = self
.buffers
.iter()
.enumerate()
.map(|(idx, buf)| {
if compact_check[idx] {
buf.clone()
} else {
new_bufs[idx].clone()
}
})
.collect();

let mut compacted = self.clone();
compacted.buffers = new_buffers;
compacted.views = new_views;
compacted
}
}

@@ -805,4 +869,60 @@ mod tests {
checker.accumulate(5, 5);
assert!(checker.finish());
}

#[test]
fn test_gc() {
// test compact on compacted data
let array = {
let mut builder = StringViewBuilder::new();
builder.append_value("I look at you all");
builder.append_option(Some("see the love there that's sleeping"));
builder.finish()
};

XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
let compacted = array.gc();
// verify it is a shallow copy
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());

// test compact on non-compacted data
let mut array = {
let mut builder = StringViewBuilder::new();
builder.append_value("while my guitar gently weeps");
builder.finish()
};

// shrink the view
let mut view = ByteView::from(array.views[0]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![view.into()]);
array.views = new_views;

let compacted = array.gc();
// verify it is a deep copy
assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
// verify content
assert_eq!(array.value(0), compacted.value(0));
// verify compacted
assert!(compacted.compact_check().iter().all(|x| *x));

// test compact on array containing null
let mut array = {
let mut builder = StringViewBuilder::new();
builder.append_null();
builder.append_option(Some("I don't know why nobody told you"));
builder.finish()
};

let mut view = ByteView::from(array.views[1]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]);
array.views = new_views;

let compacted = array.gc();

assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
assert_eq!(array.value(0), compacted.value(0));
assert_eq!(array.value(1), compacted.value(1));
assert!(compacted.compact_check().iter().all(|x| *x));
}
}