Skip to content

Commit

Permalink
perf: Start on better Parquet delta decoding (#18049)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Aug 13, 2024
1 parent 9dc4106 commit 3f8d1a2
Show file tree
Hide file tree
Showing 25 changed files with 1,829 additions and 679 deletions.
206 changes: 166 additions & 40 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::default::Default;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, Ordering};

use arrow::array::specification::try_check_utf8;
Expand All @@ -11,11 +12,15 @@ use super::super::utils;
use super::super::utils::extend_from_decoder;
use super::decoders::*;
use super::utils::*;
use crate::parquet::encoding::delta_bitpacked::DeltaGatherer;
use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::encoding::{delta_byte_array, delta_length_byte_array};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::utils::{Decoder, GatheredHybridRle, StateTranslation};
use crate::read::deserialize::utils::{
BatchableCollector, Decoder, GatheredHybridRle, StateTranslation,
};
use crate::read::PrimitiveLogicalType;

impl<O: Offset> utils::ExactSize for (Binary<O>, MutableBitmap) {
Expand All @@ -24,6 +29,138 @@ impl<O: Offset> utils::ExactSize for (Binary<O>, MutableBitmap) {
}
}

pub(crate) struct DeltaCollector<'a, 'b, O: Offset> {
pub(crate) decoder: &'b mut delta_length_byte_array::Decoder<'a>,
pub(crate) _pd: std::marker::PhantomData<O>,
}

pub(crate) struct DeltaBytesCollector<'a, 'b, O: Offset> {
pub(crate) decoder: &'b mut delta_byte_array::Decoder<'a>,
pub(crate) _pd: std::marker::PhantomData<O>,
}

impl<'a, 'b, O: Offset> DeltaBytesCollector<'a, 'b, O> {
pub fn gather_n_into(&mut self, target: &mut Binary<O>, n: usize) -> ParquetResult<()> {
struct MaybeUninitCollector(usize);

impl DeltaGatherer for MaybeUninitCollector {
type Target = [MaybeUninit<usize>; BATCH_SIZE];

fn target_len(&self, _target: &Self::Target) -> usize {
self.0
}

fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {}

fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> {
target[self.0] = MaybeUninit::new(v as usize);
self.0 += 1;
Ok(())
}
}

let decoder_len = self.decoder.len();
let mut n = usize::min(n, decoder_len);

if n == 0 {
return Ok(());
}

target.offsets.reserve(n);
let num_reserve_bytes = if target.offsets.len_proxy() == 0 {
self.decoder.values.len() - self.decoder.offset
} else {
// Make an estimate of how many bytes we will need
target.values.len() / target.offsets.len_proxy() * n
};
target.values.reserve(num_reserve_bytes);

const BATCH_SIZE: usize = 4096;

let mut prefix_lengths = [const { MaybeUninit::<usize>::uninit() }; BATCH_SIZE];
let mut suffix_lengths = [const { MaybeUninit::<usize>::uninit() }; BATCH_SIZE];

while n > 0 {
let num_elems = usize::min(n, BATCH_SIZE);
n -= num_elems;

self.decoder.prefix_lengths.gather_n_into(
&mut prefix_lengths,
num_elems,
&mut MaybeUninitCollector(0),
)?;
self.decoder.suffix_lengths.gather_n_into(
&mut suffix_lengths,
num_elems,
&mut MaybeUninitCollector(0),
)?;

for i in 0..num_elems {
let prefix_length = unsafe { prefix_lengths[i].assume_init() };
let suffix_length = unsafe { suffix_lengths[i].assume_init() };

target
.values
.extend_from_slice(&self.decoder.last[..prefix_length]);
target.values.extend_from_slice(
&self.decoder.values[self.decoder.offset..self.decoder.offset + suffix_length],
);

self.decoder.last.clear();
self.decoder.last.extend_from_slice(
&target.values[target.values.len() - prefix_length - suffix_length..],
);

self.decoder.offset += suffix_length;
}
}

Ok(())
}
}

impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaCollector<'a, 'b, O> {
fn reserve(target: &mut Binary<O>, n: usize) {
target.offsets.reserve(n);
}

fn push_n(&mut self, target: &mut Binary<O>, n: usize) -> ParquetResult<()> {
let start = target.offsets.last().to_usize();
let mut gatherer = OffsetGatherer::default();
self.decoder
.lengths
.gather_n_into(&mut target.offsets, n, &mut gatherer)?;
let end = target.offsets.last().to_usize();

target.values.extend_from_slice(
&self.decoder.values[self.decoder.offset..self.decoder.offset + end - start],
);
self.decoder.offset += end - start;

Ok(())
}

fn push_n_nulls(&mut self, target: &mut Binary<O>, n: usize) -> ParquetResult<()> {
target.extend_constant(n);
Ok(())
}
}

impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaBytesCollector<'a, 'b, O> {
fn reserve(target: &mut Binary<O>, n: usize) {
target.offsets.reserve(n);
}

fn push_n(&mut self, target: &mut Binary<O>, n: usize) -> ParquetResult<()> {
self.gather_n_into(target, n)
}

fn push_n_nulls(&mut self, target: &mut Binary<O>, n: usize) -> ParquetResult<()> {
target.extend_constant(n);
Ok(())
}
}

impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder<O>> for BinaryStateTranslation<'a> {
type PlainDecoder = BinaryIter<'a>;

Expand Down Expand Up @@ -73,53 +210,42 @@ impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder<O>> for BinaryStateTransl
page.dict,
additional,
)?,
T::Delta(page) => {
T::Delta(ref mut page) => {
let (values, validity) = decoded;

let mut collector = DeltaCollector {
decoder: page,
_pd: std::marker::PhantomData,
};

match page_validity {
None => values
.extend_lengths(page.lengths.by_ref().take(additional), &mut page.values),
Some(page_validity) => {
let Binary {
offsets,
values: values_,
} = values;

let last_offset = *offsets.last();
extend_from_decoder(
validity,
page_validity,
Some(additional),
offsets,
page.lengths.by_ref(),
)?;

let length = *offsets.last() - last_offset;

let (consumed, remaining) = page.values.split_at(length.to_usize());
page.values = remaining;
values_.extend_from_slice(consumed);
},
None => collector.push_n(values, additional)?,
Some(page_validity) => extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
collector,
)?,
}
},
T::DeltaBytes(page_values) => {
T::DeltaBytes(ref mut page_values) => {
let mut collector = DeltaBytesCollector {
decoder: page_values,
_pd: std::marker::PhantomData,
};

let (values, validity) = decoded;

match page_validity {
None => {
for x in page_values.take(additional) {
values.push(x)
}
},
Some(page_validity) => {
extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
page_values,
)?;
},
None => collector.push_n(values, additional)?,
Some(page_validity) => extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
collector,
)?,
}
},
}
Expand Down
Loading

0 comments on commit 3f8d1a2

Please sign in to comment.