diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs index 4c17b7bd2982..2d56f09d0af1 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs @@ -1,4 +1,5 @@ use std::default::Default; +use std::mem::MaybeUninit; use std::sync::atomic::{AtomicBool, Ordering}; use arrow::array::specification::try_check_utf8; @@ -11,11 +12,15 @@ use super::super::utils; use super::super::utils::extend_from_decoder; use super::decoders::*; use super::utils::*; +use crate::parquet::encoding::delta_bitpacked::DeltaGatherer; use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; +use crate::parquet::encoding::{delta_byte_array, delta_length_byte_array}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{DataPage, DictPage}; -use crate::read::deserialize::utils::{Decoder, GatheredHybridRle, StateTranslation}; +use crate::read::deserialize::utils::{ + BatchableCollector, Decoder, GatheredHybridRle, StateTranslation, +}; use crate::read::PrimitiveLogicalType; impl utils::ExactSize for (Binary, MutableBitmap) { @@ -24,6 +29,138 @@ impl utils::ExactSize for (Binary, MutableBitmap) { } } +pub(crate) struct DeltaCollector<'a, 'b, O: Offset> { + pub(crate) decoder: &'b mut delta_length_byte_array::Decoder<'a>, + pub(crate) _pd: std::marker::PhantomData, +} + +pub(crate) struct DeltaBytesCollector<'a, 'b, O: Offset> { + pub(crate) decoder: &'b mut delta_byte_array::Decoder<'a>, + pub(crate) _pd: std::marker::PhantomData, +} + +impl<'a, 'b, O: Offset> DeltaBytesCollector<'a, 'b, O> { + pub fn gather_n_into(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + struct MaybeUninitCollector(usize); + + impl DeltaGatherer for MaybeUninitCollector { + type Target = [MaybeUninit; BATCH_SIZE]; + + fn target_len(&self, _target: &Self::Target) -> usize { + self.0 + } + + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + target[self.0] = MaybeUninit::new(v as usize); + self.0 += 1; + Ok(()) + } + } + + let decoder_len = self.decoder.len(); + let mut n = usize::min(n, decoder_len); + + if n == 0 { + return Ok(()); + } + + target.offsets.reserve(n); + let num_reserve_bytes = if target.offsets.len_proxy() == 0 { + self.decoder.values.len() - self.decoder.offset + } else { + // Make an estimate of how many bytes we will need + target.values.len() / target.offsets.len_proxy() * n + }; + target.values.reserve(num_reserve_bytes); + + const BATCH_SIZE: usize = 4096; + + let mut prefix_lengths = [const { MaybeUninit::::uninit() }; BATCH_SIZE]; + let mut suffix_lengths = [const { MaybeUninit::::uninit() }; BATCH_SIZE]; + + while n > 0 { + let num_elems = usize::min(n, BATCH_SIZE); + n -= num_elems; + + self.decoder.prefix_lengths.gather_n_into( + &mut prefix_lengths, + num_elems, + &mut MaybeUninitCollector(0), + )?; + self.decoder.suffix_lengths.gather_n_into( + &mut suffix_lengths, + num_elems, + &mut MaybeUninitCollector(0), + )?; + + for i in 0..num_elems { + let prefix_length = unsafe { prefix_lengths[i].assume_init() }; + let suffix_length = unsafe { suffix_lengths[i].assume_init() }; + + target + .values + .extend_from_slice(&self.decoder.last[..prefix_length]); + target.values.extend_from_slice( + &self.decoder.values[self.decoder.offset..self.decoder.offset + suffix_length], + ); + + self.decoder.last.clear(); + self.decoder.last.extend_from_slice( + &target.values[target.values.len() - prefix_length - suffix_length..], + ); + + self.decoder.offset += suffix_length; + } + } + + Ok(()) + } +} + +impl<'a, 'b, O: Offset> BatchableCollector<(), Binary> for DeltaCollector<'a, 'b, O> { + fn reserve(target: &mut Binary, n: usize) { + target.offsets.reserve(n); + } + + fn push_n(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + let start = target.offsets.last().to_usize(); + let mut gatherer = OffsetGatherer::default(); + self.decoder + .lengths + .gather_n_into(&mut target.offsets, n, &mut gatherer)?; + let end = target.offsets.last().to_usize(); + + target.values.extend_from_slice( + &self.decoder.values[self.decoder.offset..self.decoder.offset + end - start], + ); + self.decoder.offset += end - start; + + Ok(()) + } + + fn push_n_nulls(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + target.extend_constant(n); + Ok(()) + } +} + +impl<'a, 'b, O: Offset> BatchableCollector<(), Binary> for DeltaBytesCollector<'a, 'b, O> { + fn reserve(target: &mut Binary, n: usize) { + target.offsets.reserve(n); + } + + fn push_n(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + self.gather_n_into(target, n) + } + + fn push_n_nulls(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + target.extend_constant(n); + Ok(()) + } +} + impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder> for BinaryStateTranslation<'a> { type PlainDecoder = BinaryIter<'a>; @@ -73,53 +210,42 @@ impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder> for BinaryStateTransl page.dict, additional, )?, - T::Delta(page) => { + T::Delta(ref mut page) => { let (values, validity) = decoded; + let mut collector = DeltaCollector { + decoder: page, + _pd: std::marker::PhantomData, + }; + match page_validity { - None => values - .extend_lengths(page.lengths.by_ref().take(additional), &mut page.values), - Some(page_validity) => { - let Binary { - offsets, - values: values_, - } = values; - - let last_offset = *offsets.last(); - extend_from_decoder( - validity, - page_validity, - Some(additional), - offsets, - page.lengths.by_ref(), - )?; - - let length = *offsets.last() - last_offset; - - let (consumed, remaining) = page.values.split_at(length.to_usize()); - page.values = remaining; - values_.extend_from_slice(consumed); - }, + None => collector.push_n(values, additional)?, + Some(page_validity) => extend_from_decoder( + validity, + page_validity, + Some(additional), + values, + collector, + )?, } }, - T::DeltaBytes(page_values) => { + T::DeltaBytes(ref mut page_values) => { + let mut collector = DeltaBytesCollector { + decoder: page_values, + _pd: std::marker::PhantomData, + }; + let (values, validity) = decoded; match page_validity { - None => { - for x in page_values.take(additional) { - values.push(x) - } - }, - Some(page_validity) => { - extend_from_decoder( - validity, - page_validity, - Some(additional), - values, - page_values, - )?; - }, + None => collector.push_n(values, additional)?, + Some(page_validity) => extend_from_decoder( + validity, + page_validity, + Some(additional), + values, + collector, + )?, } }, } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs index 27ee2a9a251c..53c25d8050b9 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs @@ -1,122 +1,20 @@ use arrow::array::specification::try_check_utf8; use arrow::array::{BinaryArray, MutableBinaryValuesArray}; +use arrow::offset::Offsets; +use arrow::types::Offset; use polars_error::PolarsResult; use super::super::utils; use super::utils::*; -use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding}; -use crate::parquet::error::ParquetResult; +use crate::parquet::encoding::{ + delta_bitpacked, delta_byte_array, delta_length_byte_array, hybrid_rle, Encoding, +}; +use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{split_buffer, DataPage}; use crate::read::deserialize::utils::PageValidity; pub(crate) type BinaryDict = BinaryArray; -#[derive(Debug)] -pub(crate) struct Delta<'a> { - pub lengths: std::vec::IntoIter, - pub values: &'a [u8], -} - -impl<'a> Delta<'a> { - pub fn try_new(page: &'a DataPage) -> PolarsResult { - let values = split_buffer(page)?.values; - - let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?; - - #[allow(clippy::needless_collect)] // we need to consume it to get the values - let lengths = lengths_iter - .by_ref() - .map(|x| x.map(|x| x as usize)) - .collect::>>()?; - - let values = lengths_iter.into_values(); - Ok(Self { - lengths: lengths.into_iter(), - values, - }) - } - - pub fn len(&self) -> usize { - self.lengths.size_hint().0 - } -} - -impl<'a> Iterator for Delta<'a> { - type Item = &'a [u8]; - - #[inline] - fn next(&mut self) -> Option { - let length = self.lengths.next()?; - let (item, remaining) = self.values.split_at(length); - self.values = remaining; - Some(item) - } - - fn size_hint(&self) -> (usize, Option) { - self.lengths.size_hint() - } -} - -#[derive(Debug)] -pub(crate) struct DeltaBytes<'a> { - prefix: std::vec::IntoIter, - suffix: std::vec::IntoIter, - data: &'a [u8], - data_offset: usize, - last_value: Vec, -} - -impl<'a> DeltaBytes<'a> { - pub fn try_new(page: &'a DataPage) -> PolarsResult { - let values = split_buffer(page)?.values; - let mut decoder = delta_bitpacked::Decoder::try_new(values)?; - let prefix = (&mut decoder) - .take(page.num_values()) - .map(|r| r.map(|v| v as i32).unwrap()) - .collect::>(); - - let mut data_offset = decoder.consumed_bytes(); - let mut decoder = delta_bitpacked::Decoder::try_new(&values[decoder.consumed_bytes()..])?; - let suffix = (&mut decoder) - .map(|r| r.map(|v| v as i32).unwrap()) - .collect::>(); - data_offset += decoder.consumed_bytes(); - - Ok(Self { - prefix: prefix.into_iter(), - suffix: suffix.into_iter(), - data: values, - data_offset, - last_value: vec![], - }) - } -} - -impl<'a> Iterator for DeltaBytes<'a> { - type Item = &'a [u8]; - - #[inline] - fn next(&mut self) -> Option { - let prefix_len = self.prefix.next()? as usize; - let suffix_len = self.suffix.next()? as usize; - - self.last_value.truncate(prefix_len); - self.last_value - .extend_from_slice(&self.data[self.data_offset..self.data_offset + suffix_len]); - self.data_offset += suffix_len; - - // SAFETY: the consumer will only keep one value around per iteration. - // We need a different API for this to work with safe code. - let extend_lifetime = - unsafe { std::mem::transmute::<&[u8], &'a [u8]>(self.last_value.as_slice()) }; - Some(extend_lifetime) - } - - fn size_hint(&self) -> (usize, Option) { - self.prefix.size_hint() - } -} - #[derive(Debug)] pub(crate) struct ValuesDictionary<'a> { pub values: hybrid_rle::HybridRleDecoder<'a>, @@ -136,12 +34,13 @@ impl<'a> ValuesDictionary<'a> { } } +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub(crate) enum BinaryStateTranslation<'a> { Plain(BinaryIter<'a>), Dictionary(ValuesDictionary<'a>), - Delta(Delta<'a>), - DeltaBytes(DeltaBytes<'a>), + Delta(delta_length_byte_array::Decoder<'a>), + DeltaBytes(delta_byte_array::Decoder<'a>), } impl<'a> BinaryStateTranslation<'a> { @@ -167,11 +66,17 @@ impl<'a> BinaryStateTranslation<'a> { Ok(BinaryStateTranslation::Plain(values)) }, (Encoding::DeltaLengthByteArray, _) => { - Ok(BinaryStateTranslation::Delta(Delta::try_new(page)?)) + let values = split_buffer(page)?.values; + Ok(BinaryStateTranslation::Delta( + delta_length_byte_array::Decoder::try_new(values)?, + )) + }, + (Encoding::DeltaByteArray, _) => { + let values = split_buffer(page)?.values; + Ok(BinaryStateTranslation::DeltaBytes( + delta_byte_array::Decoder::try_new(values)?, + )) }, - (Encoding::DeltaByteArray, _) => Ok(BinaryStateTranslation::DeltaBytes( - DeltaBytes::try_new(page)?, - )), _ => Err(utils::not_implemented(page)), } } @@ -180,7 +85,7 @@ impl<'a> BinaryStateTranslation<'a> { Self::Plain(v) => v.len_when_not_nullable(), Self::Dictionary(v) => v.len(), Self::Delta(v) => v.len(), - Self::DeltaBytes(v) => v.size_hint().0, + Self::DeltaBytes(v) => v.len(), } } @@ -192,8 +97,8 @@ impl<'a> BinaryStateTranslation<'a> { match self { Self::Plain(t) => _ = t.by_ref().nth(n - 1), Self::Dictionary(t) => t.values.skip_in_place(n)?, - Self::Delta(t) => _ = t.by_ref().nth(n - 1), - Self::DeltaBytes(t) => _ = t.by_ref().nth(n - 1), + Self::Delta(t) => t.skip_in_place(n)?, + Self::DeltaBytes(t) => t.skip_in_place(n)?, } Ok(()) @@ -211,3 +116,35 @@ pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict dict_values.into() } + +#[derive(Default)] +pub(crate) struct OffsetGatherer { + _pd: std::marker::PhantomData, +} + +impl delta_bitpacked::DeltaGatherer for OffsetGatherer { + type Target = Offsets; + + fn target_len(&self, target: &Self::Target) -> usize { + target.len() + } + + fn target_reserve(&self, target: &mut Self::Target, n: usize) { + target.reserve(n); + } + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + target.try_push(v.try_into().unwrap()).unwrap(); + Ok(()) + } + fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> { + target + .try_extend_from_lengths(slice.iter().copied().map(|i| i.try_into().unwrap())) + .map_err(|_| ParquetError::oos("Invalid length in delta encoding")) + } + fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> { + target + .try_extend_from_lengths(chunk.iter().copied().map(|i| i.try_into().unwrap())) + .map_err(|_| ParquetError::oos("Invalid length in delta encoding")) + } +} diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs index 57e263d93587..65ea8aa54cc6 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/utils.rs @@ -40,17 +40,6 @@ impl Binary { pub fn len(&self) -> usize { self.offsets.len_proxy() } - - #[inline] - pub fn extend_lengths>(&mut self, lengths: I, values: &mut &[u8]) { - let current_offset = *self.offsets.last(); - self.offsets.try_extend_from_lengths(lengths).unwrap(); - let new_offset = *self.offsets.last(); - let length = new_offset.to_usize() - current_offset.to_usize(); - let (consumed, remaining) = values.split_at(length); - *values = remaining; - self.values.extend_from_slice(consumed); - } } impl<'a, O: Offset> Pushable<&'a [u8]> for Binary { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs index c9d2f6486017..bf6f4bf97f1d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs @@ -1,3 +1,4 @@ +use std::mem::MaybeUninit; use std::sync::atomic::{AtomicBool, Ordering}; use arrow::array::{ @@ -8,8 +9,10 @@ use arrow::bitmap::MutableBitmap; use arrow::datatypes::{ArrowDataType, PhysicalType}; use super::binary::decoders::*; -use super::utils::freeze_validity; +use super::utils::{freeze_validity, BatchableCollector}; +use crate::parquet::encoding::delta_bitpacked::DeltaGatherer; use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator}; +use crate::parquet::encoding::{delta_byte_array, delta_length_byte_array}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{DataPage, DictPage}; use crate::read::deserialize::binary::utils::BinaryIter; @@ -82,39 +85,39 @@ impl<'a> StateTranslation<'a, BinViewDecoder> for BinaryStateTranslation<'a> { // Already done in decode_plain_encoded validate_utf8 = false; }, - Self::Delta(page_values) => { + Self::Delta(ref mut page_values) => { let (values, validity) = decoded; + + let mut collector = DeltaCollector { + decoder: page_values, + }; + match page_validity { - None => { - for value in page_values.by_ref().take(additional) { - values.push_value_ignore_validity(value) - } - }, - Some(page_validity) => { - extend_from_decoder( - validity, - page_validity, - Some(additional), - values, - page_values, - )?; - }, + None => collector.push_n(values, additional)?, + Some(page_validity) => extend_from_decoder( + validity, + page_validity, + Some(additional), + values, + collector, + )?, } }, - Self::DeltaBytes(page_values) => { + Self::DeltaBytes(ref mut page_values) => { let (values, validity) = decoded; + + let mut collector = DeltaBytesCollector { + decoder: page_values, + }; + match page_validity { - None => { - for x in page_values.take(additional) { - values.push_value_ignore_validity(x) - } - }, + None => collector.push_n(values, additional)?, Some(page_validity) => extend_from_decoder( validity, page_validity, Some(additional), values, - page_values, + collector, )?, } }, @@ -143,6 +146,153 @@ impl utils::ExactSize for DecodedStateTuple { } } +pub(crate) struct DeltaCollector<'a, 'b> { + pub(crate) decoder: &'b mut delta_length_byte_array::Decoder<'a>, +} + +pub(crate) struct DeltaBytesCollector<'a, 'b> { + pub(crate) decoder: &'b mut delta_byte_array::Decoder<'a>, +} + +pub(crate) struct ViewGatherer<'a, 'b> { + values: &'a [u8], + offset: &'b mut usize, +} + +impl<'a, 'b> DeltaGatherer for ViewGatherer<'a, 'b> { + type Target = MutableBinaryViewArray<[u8]>; + + fn target_len(&self, target: &Self::Target) -> usize { + target.len() + } + + fn target_reserve(&self, target: &mut Self::Target, n: usize) { + target.views_mut().reserve(n) + } + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + let v = v as usize; + let s = &self.values[*self.offset..*self.offset + v]; + *self.offset += v; + target.push(Some(s)); + Ok(()) + } +} + +impl<'a, 'b> BatchableCollector<(), MutableBinaryViewArray<[u8]>> for DeltaCollector<'a, 'b> { + fn reserve(target: &mut MutableBinaryViewArray<[u8]>, n: usize) { + target.views_mut().reserve(n); + } + + fn push_n(&mut self, target: &mut MutableBinaryViewArray<[u8]>, n: usize) -> ParquetResult<()> { + let mut gatherer = ViewGatherer { + values: self.decoder.values, + offset: &mut self.decoder.offset, + }; + self.decoder + .lengths + .gather_n_into(target, n, &mut gatherer)?; + + Ok(()) + } + + fn push_n_nulls( + &mut self, + target: &mut MutableBinaryViewArray<[u8]>, + n: usize, + ) -> ParquetResult<()> { + target.extend_constant(n, >::None); + Ok(()) + } +} + +impl<'a, 'b> BatchableCollector<(), MutableBinaryViewArray<[u8]>> for DeltaBytesCollector<'a, 'b> { + fn reserve(target: &mut MutableBinaryViewArray<[u8]>, n: usize) { + target.views_mut().reserve(n); + } + + fn push_n(&mut self, target: &mut MutableBinaryViewArray<[u8]>, n: usize) -> ParquetResult<()> { + struct MaybeUninitCollector(usize); + + impl DeltaGatherer for MaybeUninitCollector { + type Target = [MaybeUninit; BATCH_SIZE]; + + fn target_len(&self, _target: &Self::Target) -> usize { + self.0 + } + + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + target[self.0] = MaybeUninit::new(v as usize); + self.0 += 1; + Ok(()) + } + } + + let decoder_len = self.decoder.len(); + let mut n = usize::min(n, decoder_len); + + if n == 0 { + return Ok(()); + } + + let mut buffer = Vec::new(); + target.views_mut().reserve(n); + + const BATCH_SIZE: usize = 4096; + + let mut prefix_lengths = [const { MaybeUninit::::uninit() }; BATCH_SIZE]; + let mut suffix_lengths = [const { MaybeUninit::::uninit() }; BATCH_SIZE]; + + while n > 0 { + let num_elems = usize::min(n, BATCH_SIZE); + n -= num_elems; + + self.decoder.prefix_lengths.gather_n_into( + &mut prefix_lengths, + num_elems, + &mut MaybeUninitCollector(0), + )?; + self.decoder.suffix_lengths.gather_n_into( + &mut suffix_lengths, + num_elems, + &mut MaybeUninitCollector(0), + )?; + + for i in 0..num_elems { + let prefix_length = unsafe { prefix_lengths[i].assume_init() }; + let suffix_length = unsafe { suffix_lengths[i].assume_init() }; + + buffer.clear(); + + buffer.extend_from_slice(&self.decoder.last[..prefix_length]); + buffer.extend_from_slice( + &self.decoder.values[self.decoder.offset..self.decoder.offset + suffix_length], + ); + + target.push_value(&buffer); + + self.decoder.last.clear(); + std::mem::swap(&mut self.decoder.last, &mut buffer); + + self.decoder.offset += suffix_length; + } + } + + Ok(()) + } + + fn push_n_nulls( + &mut self, + target: &mut MutableBinaryViewArray<[u8]>, + n: usize, + ) -> ParquetResult<()> { + target.extend_constant(n, >::None); + Ok(()) + } +} + impl utils::Decoder for BinViewDecoder { type Translation<'a> = BinaryStateTranslation<'a>; type Dict = BinaryDict; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs index b82abec09996..e200d3c1a8da 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs @@ -50,7 +50,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -62,7 +62,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -74,7 +74,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::unit(), + primitive::IntDecoder::::unit(), init, )? .collect_n(filter) @@ -86,7 +86,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::unit(), + primitive::IntDecoder::::unit(), init, )? .collect_n(filter) @@ -98,7 +98,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -110,7 +110,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -123,7 +123,7 @@ pub fn columns_to_iter_recursive( PhysicalType::Int32 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -132,7 +132,7 @@ pub fn columns_to_iter_recursive( PhysicalType::Int64 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -150,7 +150,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::cast_as(), + primitive::IntDecoder::::cast_as(), init, )? .collect_n(filter) @@ -244,7 +244,7 @@ pub fn columns_to_iter_recursive( PhysicalType::Int32 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type.clone(), - primitive::PrimitiveDecoder::::cast_into(), + primitive::IntDecoder::::cast_into(), init, )? .collect_n(filter) @@ -252,7 +252,7 @@ pub fn columns_to_iter_recursive( PhysicalType::Int64 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type.clone(), - primitive::PrimitiveDecoder::::cast_into(), + primitive::IntDecoder::::cast_into(), init, )? .collect_n(filter) @@ -302,7 +302,7 @@ pub fn columns_to_iter_recursive( PhysicalType::Int32 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type.clone(), - primitive::PrimitiveDecoder::closure(|x: i32| i256(I256::new(x as i128))), + primitive::IntDecoder::closure(|x: i32| i256(I256::new(x as i128))), init, )? .collect_n(filter) @@ -310,7 +310,7 @@ pub fn columns_to_iter_recursive( PhysicalType::Int64 => PageNestedDecoder::new( columns.pop().unwrap(), field.data_type.clone(), - primitive::PrimitiveDecoder::closure(|x: i64| i256(I256::new(x as i128))), + primitive::IntDecoder::closure(|x: i64| i256(I256::new(x as i128))), init, )? .collect_n(filter) @@ -481,68 +481,52 @@ fn dict_read( }; Ok(match values_data_type.to_logical_type() { - UInt8 => { - PageNestedDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - init, - )? - .collect_n(filter)? - }, + UInt8 => PageNestedDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::cast_as()), + init, + )? + .collect_n(filter)?, UInt16 => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::cast_as()), init, )? .collect_n(filter)?, UInt32 => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::cast_as()), + init, + )? + .collect_n(filter)?, + Int8 => PageNestedDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::cast_as()), init, )? .collect_n(filter)?, - Int8 => { - PageNestedDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - init, - )? - .collect_n(filter)? - }, Int16 => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::cast_as()), init, )? .collect_n(filter)?, Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::::unit()), + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::unit()), init, )? .collect_n(filter)?, Int64 | Date64 | Time64(_) | Duration(_) => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), + dictionary::DictionaryDecoder::new(primitive::IntDecoder::::cast_as()), init, )? .collect_n(filter)?, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index 10aeb5b9f640..45518947e0a1 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -2,13 +2,13 @@ use arrow::array::{DictionaryArray, DictionaryKey, PrimitiveArray}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; -use num_traits::AsPrimitive; use super::super::utils; use super::basic::{ AsDecoderFunction, ClosureDecoderFunction, DecoderFunction, IntoDecoderFunction, PlainDecoderFnCollector, PrimitiveDecoder, UnitDecoderFunction, ValuesDictionary, }; +use super::{DeltaCollector, DeltaTranslator}; use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator}; use crate::parquet::encoding::{byte_stream_split, delta_bitpacked, Encoding}; use crate::parquet::error::ParquetResult; @@ -61,9 +61,9 @@ where }, (Encoding::DeltaBinaryPacked, _) => { let values = split_buffer(page)?.values; - Ok(Self::DeltaBinaryPacked(delta_bitpacked::Decoder::try_new( - values, - )?)) + Ok(Self::DeltaBinaryPacked( + delta_bitpacked::Decoder::try_new(values)?.0, + )) }, _ => Err(utils::not_implemented(page)), } @@ -74,7 +74,7 @@ where Self::Plain(v) => v.len(), Self::Dictionary(v) => v.len(), Self::ByteStreamSplit(v) => v.len(), - Self::DeltaBinaryPacked(v) => v.size_hint().0, + Self::DeltaBinaryPacked(v) => v.len(), } } @@ -87,7 +87,7 @@ where Self::Plain(v) => _ = v.nth(n - 1), Self::Dictionary(v) => v.values.skip_in_place(n)?, Self::ByteStreamSplit(v) => _ = v.iter_converted(|_| ()).nth(n - 1), - Self::DeltaBinaryPacked(v) => _ = v.nth(n - 1), + Self::DeltaBinaryPacked(v) => v.skip_in_place(n)?, } Ok(()) @@ -140,23 +140,22 @@ where Self::DeltaBinaryPacked(page_values) => { let (values, validity) = decoded; + let mut gatherer = DeltaTranslator { + dfn: decoder.0.decoder, + _pd: std::marker::PhantomData, + }; + match page_validity { - None => { - values.extend( - page_values - .by_ref() - .map(|x| decoder.0.decoder.decode(x.unwrap().as_())) - .take(additional), - ); - }, + None => page_values.gather_n_into(values, additional, &mut gatherer)?, Some(page_validity) => utils::extend_from_decoder( validity, page_validity, Some(additional), values, - &mut page_values - .by_ref() - .map(|x| decoder.0.decoder.decode(x.unwrap().as_())), + DeltaCollector { + decoder: page_values, + gatherer, + }, )?, } }, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs index c13dfa88bc3e..45c95a7d5ee1 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs @@ -1,5 +1,112 @@ +use arrow::types::NativeType; +use num_traits::AsPrimitive; + +use crate::parquet::types::NativeType as ParquetNativeType; + mod basic; mod integer; pub(crate) use basic::PrimitiveDecoder; pub(crate) use integer::IntDecoder; + +use self::basic::DecoderFunction; +use super::utils::BatchableCollector; +use super::ParquetResult; +use crate::parquet::encoding::delta_bitpacked::{self, DeltaGatherer}; + +struct DeltaTranslator +where + T: NativeType, + P: ParquetNativeType, + i64: AsPrimitive

, + D: DecoderFunction, +{ + dfn: D, + _pd: std::marker::PhantomData<(P, T)>, +} + +struct DeltaCollector<'a, 'b, P, T, D> +where + T: NativeType, + P: ParquetNativeType, + i64: AsPrimitive

, + D: DecoderFunction, +{ + decoder: &'b mut delta_bitpacked::Decoder<'a>, + gatherer: DeltaTranslator, +} + +impl DeltaGatherer for DeltaTranslator +where + T: NativeType, + P: ParquetNativeType, + i64: AsPrimitive

, + D: DecoderFunction, +{ + type Target = Vec; + + fn target_len(&self, target: &Self::Target) -> usize { + target.len() + } + + fn target_reserve(&self, target: &mut Self::Target, n: usize) { + target.reserve(n); + } + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + target.push(self.dfn.decode(v.as_())); + Ok(()) + } + + fn gather_constant( + &mut self, + target: &mut Self::Target, + v: i64, + delta: i64, + num_repeats: usize, + ) -> ParquetResult<()> { + target.extend((0..num_repeats).map(|i| self.dfn.decode((v + (i as i64) * delta).as_()))); + Ok(()) + } + + fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> { + target.extend(slice.iter().copied().map(|v| self.dfn.decode(v.as_()))); + Ok(()) + } + + fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> { + target.extend(chunk.iter().copied().map(|v| self.dfn.decode(v.as_()))); + Ok(()) + } +} + +impl<'a, 'b, P, T, D> BatchableCollector<(), Vec> for DeltaCollector<'a, 'b, P, T, D> +where + T: NativeType, + P: ParquetNativeType, + i64: AsPrimitive

, + D: DecoderFunction, +{ + fn reserve(target: &mut Vec, n: usize) { + target.reserve(n); + } + + fn push_n(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + let start_length = target.len(); + let start_num_elems = self.decoder.len(); + + self.decoder.gather_n_into(target, n, &mut self.gatherer)?; + + let consumed_elements = usize::min(n, start_num_elems); + + debug_assert_eq!(self.decoder.len(), start_num_elems - consumed_elements); + debug_assert_eq!(target.len(), start_length + consumed_elements); + + Ok(()) + } + + fn push_n_nulls(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + target.resize(target.len() + n, T::default()); + Ok(()) + } +} diff --git a/crates/polars-parquet/src/arrow/write/binary/basic.rs b/crates/polars-parquet/src/arrow/write/binary/basic.rs index e675986d81cc..c977a4e4939c 100644 --- a/crates/polars-parquet/src/arrow/write/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/write/binary/basic.rs @@ -126,14 +126,14 @@ pub(crate) fn encode_delta( let length = offsets.len() - 1 - validity.unset_bits(); let lengths = utils::ExactSizedIter::new(lengths, length); - delta_bitpacked::encode(lengths, buffer); + delta_bitpacked::encode(lengths, buffer, 1); } else { let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64); - delta_bitpacked::encode(lengths, buffer); + delta_bitpacked::encode(lengths, buffer, 1); } } else { let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64); - delta_bitpacked::encode(lengths, buffer); + delta_bitpacked::encode(lengths, buffer, 1); } buffer.extend_from_slice( diff --git a/crates/polars-parquet/src/arrow/write/binview/basic.rs b/crates/polars-parquet/src/arrow/write/binview/basic.rs index 2516e03e667c..c7059b63c99e 100644 --- a/crates/polars-parquet/src/arrow/write/binview/basic.rs +++ b/crates/polars-parquet/src/arrow/write/binview/basic.rs @@ -23,8 +23,11 @@ pub(crate) fn encode_plain(array: &BinaryViewArray, buffer: &mut Vec) { } pub(crate) fn encode_delta(array: &BinaryViewArray, buffer: &mut Vec) { - let lengths = array.non_null_views_iter().map(|v| v.length as i64); - delta_bitpacked::encode(lengths, buffer); + let lengths = utils::ExactSizedIter::new( + array.non_null_views_iter().map(|v| v.length as i64), + array.len() - array.null_count(), + ); + delta_bitpacked::encode(lengths, buffer, 1); for slice in array.non_null_values_iter() { buffer.extend_from_slice(slice) diff --git a/crates/polars-parquet/src/arrow/write/primitive/basic.rs b/crates/polars-parquet/src/arrow/write/primitive/basic.rs index b914978ea8db..2c6c137ce220 100644 --- a/crates/polars-parquet/src/arrow/write/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/write/primitive/basic.rs @@ -89,7 +89,7 @@ where integer }); let iterator = ExactSizedIter::new(iterator, array.len() - array.null_count()); - encode(iterator, &mut buffer) + encode(iterator, &mut buffer, 1) } else { // append all values let iterator = array.values().iter().map(|x| { @@ -97,7 +97,7 @@ where let integer: i64 = parquet_native.as_(); integer }); - encode(iterator, &mut buffer) + encode(iterator, &mut buffer, 1) } buffer } diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs index bbbe177af648..7f7796b0fff2 100644 --- a/crates/polars-parquet/src/arrow/write/utils.rs +++ b/crates/polars-parquet/src/arrow/write/utils.rs @@ -134,6 +134,8 @@ impl> Iterator for ExactSizedIter { } } +impl> std::iter::ExactSizeIterator for ExactSizedIter {} + /// Returns the number of bits needed to bitpack `max` #[inline] pub fn get_bit_width(max: u64) -> u32 { diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs index ce7fa301a7b4..6e37507d137f 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs @@ -1,5 +1,5 @@ use super::{Packed, Unpackable, Unpacked}; -use crate::parquet::error::ParquetError; +use crate::parquet::error::{ParquetError, ParquetResult}; /// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes. /// # Implementation @@ -9,34 +9,18 @@ pub struct Decoder<'a, T: Unpackable> { packed: std::slice::Chunks<'a, u8>, num_bits: usize, /// number of items - length: usize, + pub(crate) length: usize, _pd: std::marker::PhantomData, } -#[derive(Debug)] -pub struct DecoderIter { - buffer: Vec, - idx: usize, -} - -impl Iterator for DecoderIter { - type Item = T; - - fn next(&mut self) -> Option { - if self.idx >= self.buffer.len() { - return None; +impl<'a, T: Unpackable> Default for Decoder<'a, T> { + fn default() -> Self { + Self { + packed: [].chunks(1), + num_bits: 0, + length: 0, + _pd: std::marker::PhantomData, } - - let value = self.buffer[self.idx]; - self.idx += 1; - - Some(value) - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.buffer.len() - self.idx; - - (len, Some(len)) } } @@ -57,18 +41,43 @@ impl<'a, T: Unpackable> Decoder<'a, T> { Self::try_new(packed, num_bits, length).unwrap() } - pub fn collect_into_iter(self) -> DecoderIter { - let mut buffer = Vec::new(); - self.collect_into(&mut buffer); - DecoderIter { buffer, idx: 0 } + /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. + /// + /// `num_bits` is allowed to be `0`. + pub fn new_allow_zero(packed: &'a [u8], num_bits: usize, length: usize) -> Self { + Self::try_new_allow_zero(packed, num_bits, length).unwrap() } - pub fn num_bits(&self) -> usize { - self.num_bits + /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. + /// + /// `num_bits` is allowed to be `0`. + pub fn try_new_allow_zero( + packed: &'a [u8], + num_bits: usize, + length: usize, + ) -> ParquetResult { + let block_size = std::mem::size_of::() * num_bits; + + if packed.len() * 8 < length * num_bits { + return Err(ParquetError::oos(format!( + "Unpacking {length} items with a number of bits {num_bits} requires at least {} bytes.", + length * num_bits / 8 + ))); + } + + debug_assert!(num_bits != 0 || packed.is_empty()); + let packed = packed.chunks(block_size.max(1)); + + Ok(Self { + length, + packed, + num_bits, + _pd: Default::default(), + }) } /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. - pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> Result { + pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> ParquetResult { let block_size = std::mem::size_of::() * num_bits; if num_bits == 0 { @@ -91,11 +100,16 @@ impl<'a, T: Unpackable> Decoder<'a, T> { _pd: Default::default(), }) } + + pub fn num_bits(&self) -> usize { + self.num_bits + } } /// A iterator over the exact chunks in a [`Decoder`]. /// /// The remainder can be accessed using `remainder` or `next_inexact`. +#[derive(Debug)] pub struct ChunkedDecoder<'a, 'b, T: Unpackable> { pub(crate) decoder: &'b mut Decoder<'a, T>, } diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs index ef6c5313ba26..94f310d28f14 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs @@ -57,7 +57,7 @@ mod encode; mod pack; mod unpack; -pub use decode::{Decoder, DecoderIter}; +pub use decode::Decoder; pub use encode::{encode, encode_pack}; /// A byte slice (e.g. `[u8; 8]`) denoting types that represent complete packs. diff --git a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs index ee21a5094718..fb8eb153cfb7 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs @@ -1,246 +1,811 @@ +//! This module implements the `DELTA_BINARY_PACKED` encoding. +//! +//! For performance reasons this is done without iterators. Instead, we have `gather_n` functions +//! and a `DeltaGatherer` trait. These allow efficient decoding and mapping of the decoded values. +//! +//! Full information on the delta encoding can be found on the Apache Parquet Format repository. +//! +//! +//! +//! Delta encoding compresses sequential integer values by encoding the first value and the +//! differences between consequentive values. This variant encodes the data into `Block`s and +//! `MiniBlock`s. +//! +//! - A `Block` contains a minimum delta, bitwidths and one or more miniblocks. +//! - A `MiniBlock` contains many deltas that are encoded in [`bitpacked`] encoding. +//! +//! The decoder keeps track of the last value and calculates a new value with the following +//! function. +//! +//! ```text +//! NextValue(Delta) = { +//! Value = Decoder.LastValue + Delta + Block.MinDelta +//! Decoder.LastValue = Value +//! return Value +//! } +//! ``` +//! +//! Note that all these additions need to be wrapping. + use super::super::{bitpacked, uleb128, zigzag_leb128}; -use crate::parquet::encoding::ceil8; +use crate::parquet::encoding::bitpacked::{Unpackable, Unpacked}; use crate::parquet::error::{ParquetError, ParquetResult}; -/// An [`Iterator`] of [`i64`] +const MAX_BITWIDTH: u8 = 64; + +/// Decoder of parquets' `DELTA_BINARY_PACKED`. +#[derive(Debug)] +pub struct Decoder<'a> { + num_miniblocks_per_block: usize, + values_per_block: usize, + + values_remaining: usize, + + last_value: i64, + + values: &'a [u8], + + block: Block<'a>, +} + #[derive(Debug)] struct Block<'a> { - // this is the minimum delta that must be added to every value. min_delta: i64, - _num_mini_blocks: usize, - /// Number of values that each mini block has. - values_per_mini_block: usize, - bitwidths: std::slice::Iter<'a, u8>, - values: &'a [u8], - remaining: usize, // number of elements - current_index: usize, // invariant: < values_per_mini_block - // None represents a relative delta of zero, in which case there is no miniblock. - current_miniblock: Option>, - // number of bytes consumed. - consumed_bytes: usize, + + /// Bytes that give the `num_bits` for the [`bitpacked::Decoder`]. + /// + /// Invariant: `bitwidth[i] <= MAX_BITWIDTH` for all `i` + bitwidths: &'a [u8], + values_remaining: usize, + miniblock: MiniBlock<'a>, +} + +#[derive(Debug)] +struct MiniBlock<'a> { + decoder: bitpacked::Decoder<'a, u64>, + buffered: ::Unpacked, + unpacked_start: usize, + unpacked_end: usize, } -impl<'a> Block<'a> { - pub fn try_new( - mut values: &'a [u8], - num_mini_blocks: usize, - values_per_mini_block: usize, - length: usize, - ) -> ParquetResult { - let length = std::cmp::min(length, num_mini_blocks * values_per_mini_block); - - let mut consumed_bytes = 0; - let (min_delta, consumed) = zigzag_leb128::decode(values); - consumed_bytes += consumed; - values = &values[consumed..]; - - if num_mini_blocks > values.len() { - return Err(ParquetError::oos( - "Block must contain at least num_mini_blocks bytes (the bitwidths)", - )); +struct SkipGatherer; +pub(crate) struct SumGatherer(pub(crate) usize); + +pub trait DeltaGatherer { + type Target: std::fmt::Debug; + + fn target_len(&self, target: &Self::Target) -> usize; + fn target_reserve(&self, target: &mut Self::Target, n: usize); + + /// Gather one element with value `v` into `target`. + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()>; + + /// Gather `num_repeats` elements into `target`. + /// + /// The first value is `v` and the `n`-th value is `v + (n-1)*delta`. + fn gather_constant( + &mut self, + target: &mut Self::Target, + v: i64, + delta: i64, + num_repeats: usize, + ) -> ParquetResult<()> { + for i in 0..num_repeats { + self.gather_one(target, v + (i as i64) * delta)?; + } + Ok(()) + } + /// Gather a `slice` of elements into `target`. + fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> { + for &v in slice { + self.gather_one(target, v)?; } - let (bitwidths, remaining) = values.split_at(num_mini_blocks); - consumed_bytes += num_mini_blocks; - values = remaining; + Ok(()) + } + /// Gather a `chunk` of elements into `target`. + fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> { + self.gather_slice(target, chunk) + } +} - let mut block = Block { - min_delta, - _num_mini_blocks: num_mini_blocks, - values_per_mini_block, - bitwidths: bitwidths.iter(), - remaining: length, - values, - current_index: 0, - current_miniblock: None, - consumed_bytes, - }; +impl DeltaGatherer for SkipGatherer { + type Target = usize; - // Set up first mini-block - block.advance_miniblock()?; + fn target_len(&self, target: &Self::Target) -> usize { + *target + } + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} - Ok(block) + fn gather_one(&mut self, target: &mut Self::Target, _v: i64) -> ParquetResult<()> { + *target += 1; + Ok(()) + } + fn gather_constant( + &mut self, + target: &mut Self::Target, + _v: i64, + _delta: i64, + num_repeats: usize, + ) -> ParquetResult<()> { + *target += num_repeats; + Ok(()) } + fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> { + *target += chunk.len(); + Ok(()) + } + fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> { + *target += slice.len(); + Ok(()) + } +} - fn advance_miniblock(&mut self) -> ParquetResult<()> { - // unwrap is ok: we sliced it by num_mini_blocks in try_new - let num_bits = self.bitwidths.next().copied().unwrap() as usize; +impl DeltaGatherer for SumGatherer { + type Target = usize; - self.current_miniblock = if num_bits > 0 { - let length = std::cmp::min(self.remaining, self.values_per_mini_block); + fn target_len(&self, _target: &Self::Target) -> usize { + self.0 + } + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} - let miniblock_length = ceil8(self.values_per_mini_block * num_bits); - if miniblock_length > self.values.len() { - return Err(ParquetError::oos( - "block must contain at least miniblock_length bytes (the mini block)", - )); - } - let (miniblock, remainder) = self.values.split_at(miniblock_length); - - self.values = remainder; - self.consumed_bytes += miniblock_length; - - Some( - bitpacked::Decoder::try_new(miniblock, num_bits, length) - .unwrap() - .collect_into_iter(), - ) - } else { - None - }; - self.current_index = 0; + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + if v < 0 { + return Err(ParquetError::oos(format!( + "Invalid delta encoding length {v}" + ))); + } + *target += v as usize; + self.0 += 1; Ok(()) } -} + fn gather_constant( + &mut self, + target: &mut Self::Target, + v: i64, + delta: i64, + num_repeats: usize, + ) -> ParquetResult<()> { + if v < 0 || (delta < 0 && num_repeats as i64 * delta + v < 0) { + return Err(ParquetError::oos("Invalid delta encoding length")); + } + + let base = v * num_repeats as i64; + let is_even = num_repeats & 1; + // SUM_i=0^n f * i = f * (n(n+1)/2) + let increment = (num_repeats >> is_even) * ((num_repeats + 1) >> (is_even ^ 1)); -impl<'a> Iterator for Block<'a> { - type Item = Result; + *target += base as usize + increment; - fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; + Ok(()) + } + fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> { + let min = slice.iter().copied().min().unwrap_or_default(); + if min < 0 { + return Err(ParquetError::oos(format!( + "Invalid delta encoding length {min}" + ))); } - let result = self.min_delta - + self - .current_miniblock - .as_mut() - .map(|x| x.next().unwrap_or_default()) - .unwrap_or(0) as i64; - self.current_index += 1; - self.remaining -= 1; - - if self.remaining > 0 && self.current_index == self.values_per_mini_block { - if let Err(e) = self.advance_miniblock() { - return Some(Err(e)); - } + + *target += slice.iter().copied().map(|v| v as usize).sum::(); + self.0 += slice.len(); + Ok(()) + } + fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> { + let min = chunk.iter().copied().min().unwrap_or_default(); + if min < 0 { + return Err(ParquetError::oos(format!( + "Invalid delta encoding length {min}" + ))); } + *target += chunk.iter().copied().map(|v| v as usize).sum::(); + self.0 += chunk.len(); + Ok(()) + } +} - Some(Ok(result)) +/// Gather the rest of the [`bitpacked::Decoder`] into `target` +fn gather_bitpacked( + target: &mut G::Target, + min_delta: i64, + last_value: &mut i64, + mut decoder: bitpacked::Decoder, + gatherer: &mut G, +) -> ParquetResult<()> { + let mut chunked = decoder.chunked(); + for mut chunk in &mut chunked { + for value in &mut chunk { + *last_value = last_value + .wrapping_add(*value as i64) + .wrapping_add(min_delta); + *value = *last_value as u64; + } + + let chunk = bytemuck::cast_ref(&chunk); + gatherer.gather_chunk(target, chunk)?; + } + + if let Some((mut chunk, length)) = chunked.next_inexact() { + let slice = &mut chunk[..length]; + + for value in slice.iter_mut() { + *last_value = last_value + .wrapping_add(*value as i64) + .wrapping_add(min_delta); + *value = *last_value as u64; + } + + let slice = bytemuck::cast_slice(slice); + gatherer.gather_slice(target, slice)?; } + + Ok(()) } -/// Decoder of parquets' `DELTA_BINARY_PACKED`. Implements `Iterator`. -/// # Implementation -/// This struct does not allocate on the heap. -#[derive(Debug)] -pub struct Decoder<'a> { - num_mini_blocks: usize, - values_per_mini_block: usize, - values_remaining: usize, - next_value: i64, - values: &'a [u8], - current_block: Option>, - // the total number of bytes consumed up to a given point, excluding the bytes on the current_block - consumed_bytes: usize, +/// Gather an entire [`MiniBlock`] into `target` +fn gather_miniblock( + target: &mut G::Target, + min_delta: i64, + bitwidth: u8, + values: &[u8], + values_per_miniblock: usize, + last_value: &mut i64, + gatherer: &mut G, +) -> ParquetResult<()> { + let bitwidth = bitwidth as usize; + + debug_assert!(bitwidth <= 64); + debug_assert_eq!((bitwidth * values_per_miniblock).div_ceil(8), values.len()); + + let start_length = gatherer.target_len(target); + gather_bitpacked( + target, + min_delta, + last_value, + bitpacked::Decoder::new(values, bitwidth, values_per_miniblock), + gatherer, + )?; + let target_length = gatherer.target_len(target); + + debug_assert_eq!(target_length - start_length, values_per_miniblock); + + Ok(()) +} + +/// Gather an entire [`Block`] into `target` +fn gather_block<'a, G: DeltaGatherer>( + target: &mut G::Target, + num_miniblocks: usize, + values_per_miniblock: usize, + mut values: &'a [u8], + last_value: &mut i64, + gatherer: &mut G, +) -> ParquetResult<&'a [u8]> { + let (min_delta, consumed) = zigzag_leb128::decode(values); + values = &values[consumed..]; + let bitwidths; + (bitwidths, values) = values + .split_at_checked(num_miniblocks) + .ok_or(ParquetError::oos( + "Not enough bitwidths available in delta encoding", + ))?; + + gatherer.target_reserve(target, num_miniblocks * values_per_miniblock); + for &bitwidth in bitwidths { + let miniblock; + (miniblock, values) = values + .split_at_checked((bitwidth as usize * values_per_miniblock).div_ceil(8)) + .ok_or(ParquetError::oos( + "Not enough bytes for miniblock in delta encoding", + ))?; + gather_miniblock( + target, + min_delta, + bitwidth, + miniblock, + values_per_miniblock, + last_value, + gatherer, + )?; + } + + Ok(values) } impl<'a> Decoder<'a> { - pub fn try_new(mut values: &'a [u8]) -> Result { - let mut consumed_bytes = 0; - let (block_size, consumed) = uleb128::decode(values); - consumed_bytes += consumed; - assert_eq!(block_size % 128, 0); - values = &values[consumed..]; - let (num_mini_blocks, consumed) = uleb128::decode(values); - let num_mini_blocks = num_mini_blocks as usize; - consumed_bytes += consumed; - values = &values[consumed..]; + pub fn try_new(mut values: &'a [u8]) -> ParquetResult<(Self, &'a [u8])> { + let header_err = || ParquetError::oos("Insufficient bytes for Delta encoding header"); + + // header: + // + + let (values_per_block, consumed) = uleb128::decode(values); + let values_per_block = values_per_block as usize; + values = values.get(consumed..).ok_or_else(header_err)?; + + assert_eq!(values_per_block % 128, 0); + + let (num_miniblocks_per_block, consumed) = uleb128::decode(values); + let num_miniblocks_per_block = num_miniblocks_per_block as usize; + values = values.get(consumed..).ok_or_else(header_err)?; + let (total_count, consumed) = uleb128::decode(values); let total_count = total_count as usize; - consumed_bytes += consumed; - values = &values[consumed..]; + values = values.get(consumed..).ok_or_else(header_err)?; + let (first_value, consumed) = zigzag_leb128::decode(values); - consumed_bytes += consumed; - values = &values[consumed..]; - - let values_per_mini_block = block_size as usize / num_mini_blocks; - assert_eq!(values_per_mini_block % 8, 0); - - // If we only have one value (first_value), there are no blocks. - let current_block = if total_count > 1 { - Some(Block::try_new( - values, - num_mini_blocks, - values_per_mini_block, - total_count - 1, - )?) - } else { - None - }; + values = values.get(consumed..).ok_or_else(header_err)?; + + assert_eq!(values_per_block % num_miniblocks_per_block, 0); + assert_eq!((values_per_block / num_miniblocks_per_block) % 32, 0); + + let values_per_miniblock = values_per_block / num_miniblocks_per_block; + assert_eq!(values_per_miniblock % 8, 0); + + // We skip over all the values to determine where the slice stops. + // + // This also has the added benefit of error checking in advance, thus we can unwrap in + // other places. + + let mut rem = values; + if total_count > 1 { + let mut num_values_left = total_count - 1; + while num_values_left > 0 { + // If the number of values is does not need all the miniblocks anymore, we need to + // ignore the later miniblocks and regard them as having bitwidth = 0. + // + // Quoted from the specification: + // + // > If, in the last block, less than miniblocks + // > are needed to store the values, the bytes storing the bit widths of the + // > unneeded miniblocks are still present, their value should be zero, but readers + // > must accept arbitrary values as well. There are no additional padding bytes for + // > the miniblock bodies though, as if their bit widths were 0 (regardless of the + // > actual byte values). The reader knows when to stop reading by keeping track of + // > the number of values read. + let num_remaining_mini_blocks = usize::min( + num_miniblocks_per_block, + num_values_left.div_ceil(values_per_miniblock), + ); + + // block: + // + + let (_, consumed) = zigzag_leb128::decode(rem); + rem = rem.get(consumed..).ok_or(ParquetError::oos( + "No min-delta value in delta encoding miniblock", + ))?; + + if rem.len() < num_miniblocks_per_block { + return Err(ParquetError::oos( + "Not enough bitwidths available in delta encoding", + )); + } + if let Some(err_bitwidth) = rem + .get(..num_remaining_mini_blocks) + .expect("num_remaining_mini_blocks <= num_miniblocks_per_block") + .iter() + .copied() + .find(|&bitwidth| bitwidth > MAX_BITWIDTH) + { + return Err(ParquetError::oos(format!( + "Delta encoding miniblock with bitwidth {err_bitwidth} higher than maximum {MAX_BITWIDTH} bits", + ))); + } + + let num_bitpacking_bytes = rem[..num_remaining_mini_blocks] + .iter() + .copied() + .map(|bitwidth| (bitwidth as usize * values_per_miniblock).div_ceil(8)) + .sum::(); + + rem = rem + .get(num_miniblocks_per_block + num_bitpacking_bytes..) + .ok_or(ParquetError::oos( + "Not enough bytes for all bitpacked values in delta encoding", + ))?; + + num_values_left = num_values_left.saturating_sub(values_per_block); + } + } + + let values = &values[..values.len() - rem.len()]; - Ok(Self { - num_mini_blocks, - values_per_mini_block, - values_remaining: total_count, - next_value: first_value, + let decoder = Self { + num_miniblocks_per_block, + values_per_block, + values_remaining: total_count.saturating_sub(1), + last_value: first_value, values, - current_block, - consumed_bytes, - }) + + block: Block { + // @NOTE: + // We add one delta=0 into the buffered block which allows us to + // prepend like the `first_value` is just any normal value. + // + // This is a bit of a hack, but makes the rest of the logic + // **A LOT** simpler. + values_remaining: usize::from(total_count > 0), + min_delta: 0, + bitwidths: &[], + miniblock: MiniBlock { + decoder: bitpacked::Decoder::try_new_allow_zero(&[], 0, 1)?, + buffered: ::Unpacked::zero(), + unpacked_start: 0, + unpacked_end: 0, + }, + }, + }; + + Ok((decoder, rem)) } - /// Returns the total number of bytes consumed up to this point by [`Decoder`]. - pub fn consumed_bytes(&self) -> usize { - self.consumed_bytes + self.current_block.as_ref().map_or(0, |b| b.consumed_bytes) + /// Consume a new [`Block`] from `self.values`. + fn consume_block(&mut self) { + // @NOTE: All the panics here should be prevented in the `Decoder::try_new`. + + debug_assert!(!self.values.is_empty()); + + let values_per_miniblock = self.values_per_miniblock(); + + let length = usize::min(self.values_remaining, self.values_per_block); + let actual_num_miniblocks = usize::min( + self.num_miniblocks_per_block, + length.div_ceil(values_per_miniblock), + ); + + debug_assert!(actual_num_miniblocks > 0); + + // + + let (min_delta, consumed) = zigzag_leb128::decode(self.values); + + self.values = &self.values[consumed..]; + let (bitwidths, remainder) = self.values.split_at(self.num_miniblocks_per_block); + + let first_bitwidth = bitwidths[0]; + let bitwidths = &bitwidths[1..actual_num_miniblocks]; + debug_assert!(first_bitwidth <= MAX_BITWIDTH); + let first_bitwidth = first_bitwidth as usize; + + let values_in_first_miniblock = usize::min(length, values_per_miniblock); + let num_allocated_bytes = (first_bitwidth * values_per_miniblock).div_ceil(8); + let num_actual_bytes = (first_bitwidth * values_in_first_miniblock).div_ceil(8); + let (bytes, remainder) = remainder.split_at(num_allocated_bytes); + let bytes = &bytes[..num_actual_bytes]; + + let decoder = + bitpacked::Decoder::new_allow_zero(bytes, first_bitwidth, values_in_first_miniblock); + + self.block = Block { + min_delta, + bitwidths, + values_remaining: length, + miniblock: MiniBlock { + decoder, + // We can leave this as it should not be read before it is updated + buffered: self.block.miniblock.buffered, + unpacked_start: 0, + unpacked_end: 0, + }, + }; + + self.values_remaining -= length; + self.values = remainder; } - fn load_delta(&mut self) -> Result { - // At this point we must have at least one block and value available - let current_block = self.current_block.as_mut().unwrap(); - if let Some(x) = current_block.next() { - x - } else { - // load next block - self.values = &self.values[current_block.consumed_bytes..]; - self.consumed_bytes += current_block.consumed_bytes; - - let next_block = Block::try_new( - self.values, - self.num_mini_blocks, - self.values_per_mini_block, - self.values_remaining, + /// Gather `n` elements from the current [`MiniBlock`] to `target` + fn gather_miniblock_n_into( + &mut self, + target: &mut G::Target, + mut n: usize, + gatherer: &mut G, + ) -> ParquetResult<()> { + debug_assert!(n > 0); + debug_assert!(self.miniblock_len() >= n); + + // If the `num_bits == 0`, the delta is constant and equal to `min_delta`. The + // `bitpacked::Decoder` basically only keeps track of the length. + if self.block.miniblock.decoder.num_bits() == 0 { + let num_repeats = usize::min(self.miniblock_len(), n); + let v = self.last_value.wrapping_add(self.block.min_delta); + gatherer.gather_constant(target, v, self.block.min_delta, num_repeats)?; + self.last_value = self + .last_value + .wrapping_add(self.block.min_delta * num_repeats as i64); + self.block.miniblock.decoder.length -= num_repeats; + return Ok(()); + } + + if self.block.miniblock.unpacked_start < self.block.miniblock.unpacked_end { + let length = usize::min( + n, + self.block.miniblock.unpacked_end - self.block.miniblock.unpacked_start, ); - match next_block { - Ok(mut next_block) => { - let delta = next_block - .next() - .ok_or_else(|| ParquetError::oos("Missing block"))?; - self.current_block = Some(next_block); - delta - }, - Err(e) => Err(e), + self.block.miniblock.buffered + [self.block.miniblock.unpacked_start..self.block.miniblock.unpacked_start + length] + .iter_mut() + .for_each(|v| { + self.last_value = self + .last_value + .wrapping_add(*v as i64) + .wrapping_add(self.block.min_delta); + *v = self.last_value as u64; + }); + gatherer.gather_slice( + target, + bytemuck::cast_slice( + &self.block.miniblock.buffered[self.block.miniblock.unpacked_start + ..self.block.miniblock.unpacked_start + length], + ), + )?; + n -= length; + self.block.miniblock.unpacked_start += length; + } + + if n == 0 { + return Ok(()); + } + + const ITEMS_PER_PACK: usize = <::Unpacked as Unpacked>::LENGTH; + for _ in 0..n / ITEMS_PER_PACK { + let mut chunk = self.block.miniblock.decoder.chunked().next().unwrap(); + chunk.iter_mut().for_each(|v| { + self.last_value = self + .last_value + .wrapping_add(*v as i64) + .wrapping_add(self.block.min_delta); + *v = self.last_value as u64; + }); + gatherer.gather_chunk(target, bytemuck::cast_ref(&chunk))?; + n -= ITEMS_PER_PACK; + } + + if n == 0 { + return Ok(()); + } + + let Some((chunk, len)) = self.block.miniblock.decoder.chunked().next_inexact() else { + debug_assert_eq!(n, 0); + self.block.miniblock.buffered = ::Unpacked::zero(); + self.block.miniblock.unpacked_start = 0; + self.block.miniblock.unpacked_end = 0; + return Ok(()); + }; + + self.block.miniblock.buffered = chunk; + self.block.miniblock.unpacked_start = 0; + self.block.miniblock.unpacked_end = len; + + if n > 0 { + let length = usize::min(n, self.block.miniblock.unpacked_end); + self.block.miniblock.buffered[..length] + .iter_mut() + .for_each(|v| { + self.last_value = self + .last_value + .wrapping_add(*v as i64) + .wrapping_add(self.block.min_delta); + *v = self.last_value as u64; + }); + gatherer.gather_slice( + target, + bytemuck::cast_slice(&self.block.miniblock.buffered[..length]), + )?; + self.block.miniblock.unpacked_start = length; + } + + Ok(()) + } + + /// Gather `n` elements from the current [`Block`] to `target` + fn gather_block_n_into( + &mut self, + target: &mut G::Target, + n: usize, + gatherer: &mut G, + ) -> ParquetResult<()> { + let values_per_miniblock = self.values_per_miniblock(); + + debug_assert!(n <= self.values_per_block); + debug_assert!(self.values_per_block >= values_per_miniblock); + debug_assert_eq!(self.values_per_block % values_per_miniblock, 0); + + let mut n = usize::min(self.block.values_remaining, n); + + if n == 0 { + return Ok(()); + } + + let miniblock_len = self.miniblock_len(); + if n < miniblock_len { + self.gather_miniblock_n_into(target, n, gatherer)?; + debug_assert_eq!(self.miniblock_len(), miniblock_len - n); + self.block.values_remaining -= n; + return Ok(()); + } + + if miniblock_len > 0 { + self.gather_miniblock_n_into(target, miniblock_len, gatherer)?; + n -= miniblock_len; + self.block.values_remaining -= miniblock_len; + } + + while n >= values_per_miniblock { + let bitwidth = self.block.bitwidths[0]; + self.block.bitwidths = &self.block.bitwidths[1..]; + + let miniblock; + (miniblock, self.values) = self + .values + .split_at((bitwidth as usize * values_per_miniblock).div_ceil(8)); + gather_miniblock( + target, + self.block.min_delta, + bitwidth, + miniblock, + values_per_miniblock, + &mut self.last_value, + gatherer, + )?; + n -= values_per_miniblock; + self.block.values_remaining -= values_per_miniblock; + } + + if n == 0 { + return Ok(()); + } + + if !self.block.bitwidths.is_empty() { + let bitwidth = self.block.bitwidths[0]; + self.block.bitwidths = &self.block.bitwidths[1..]; + + if bitwidth > MAX_BITWIDTH { + return Err(ParquetError::oos(format!( + "Delta encoding bitwidth '{bitwidth}' is larger than maximum {MAX_BITWIDTH})" + ))); + } + + let length = usize::min(values_per_miniblock, self.block.values_remaining); + + let num_allocated_bytes = (bitwidth as usize * values_per_miniblock).div_ceil(8); + let num_actual_bytes = (bitwidth as usize * length).div_ceil(8); + + let miniblock; + (miniblock, self.values) = + self.values + .split_at_checked(num_allocated_bytes) + .ok_or(ParquetError::oos( + "Not enough space for delta encoded miniblock", + ))?; + + let miniblock = &miniblock[..num_actual_bytes]; + + let decoder = + bitpacked::Decoder::try_new_allow_zero(miniblock, bitwidth as usize, length)?; + self.block.miniblock = MiniBlock { + decoder, + buffered: self.block.miniblock.buffered, + unpacked_start: 0, + unpacked_end: 0, + }; + + if n > 0 { + self.gather_miniblock_n_into(target, n, gatherer)?; + self.block.values_remaining -= n; } } + + Ok(()) } -} -impl<'a> Iterator for Decoder<'a> { - type Item = Result; + /// Gather `n` elements to `target` + pub fn gather_n_into( + &mut self, + target: &mut G::Target, + mut n: usize, + gatherer: &mut G, + ) -> ParquetResult<()> { + n = usize::min(n, self.len()); + + if n == 0 { + return Ok(()); + } + + let values_per_miniblock = self.values_per_block / self.num_miniblocks_per_block; - fn next(&mut self) -> Option { - if self.values_remaining == 0 { - return None; + let start_num_values_remaining = self.block.values_remaining; + if n <= self.block.values_remaining { + self.gather_block_n_into(target, n, gatherer)?; + debug_assert_eq!(self.block.values_remaining, start_num_values_remaining - n); + return Ok(()); } - let result = Some(Ok(self.next_value)); + n -= self.block.values_remaining; + self.gather_block_n_into(target, self.block.values_remaining, gatherer)?; + debug_assert_eq!(self.block.values_remaining, 0); - self.values_remaining -= 1; - if self.values_remaining == 0 { - // do not try to load another block - return result; + while usize::min(n, self.values_remaining) >= self.values_per_block { + self.values = gather_block( + target, + self.num_miniblocks_per_block, + values_per_miniblock, + self.values, + &mut self.last_value, + gatherer, + )?; + n -= self.values_per_block; + self.values_remaining -= self.values_per_block; } - let delta = match self.load_delta() { - Ok(delta) => delta, - Err(e) => return Some(Err(e)), - }; + if n == 0 { + return Ok(()); + } + + self.consume_block(); + self.gather_block_n_into(target, n, gatherer)?; + + Ok(()) + } + + pub fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { + let mut gatherer = SkipGatherer; + self.gather_n_into(&mut 0usize, n, &mut gatherer) + } + + #[cfg(test)] + pub(crate) fn collect_n>( + &mut self, + e: &mut E, + n: usize, + ) -> ParquetResult<()> { + struct ExtendGatherer<'a, E: std::fmt::Debug + Extend>( + std::marker::PhantomData<&'a E>, + ); + + impl<'a, E: std::fmt::Debug + Extend> DeltaGatherer for ExtendGatherer<'a, E> { + type Target = (usize, &'a mut E); + + fn target_len(&self, target: &Self::Target) -> usize { + target.0 + } + + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + target.1.extend(Some(v)); + target.0 += 1; + Ok(()) + } + } + + let mut gatherer = ExtendGatherer(std::marker::PhantomData); + let mut target = (0, e); + + self.gather_n_into(&mut target, n, &mut gatherer) + } + + #[cfg(test)] + pub(crate) fn collect + Default>( + mut self, + ) -> ParquetResult { + let mut e = E::default(); + self.collect_n(&mut e, self.len())?; + Ok(e) + } + + pub fn len(&self) -> usize { + self.values_remaining + self.block.values_remaining + } - self.next_value += delta; - result + fn values_per_miniblock(&self) -> usize { + debug_assert_eq!(self.values_per_block % self.num_miniblocks_per_block, 0); + self.values_per_block / self.num_miniblocks_per_block } - fn size_hint(&self) -> (usize, Option) { - (self.values_remaining, Some(self.values_remaining)) + fn miniblock_len(&self) -> usize { + self.block.miniblock.unpacked_end - self.block.miniblock.unpacked_start + + self.block.miniblock.decoder.len() } } @@ -259,11 +824,11 @@ mod tests { // first_value: 2 <=z> 1 let data = &[128, 1, 4, 1, 2]; - let mut decoder = Decoder::try_new(data).unwrap(); - let r = decoder.by_ref().collect::, _>>().unwrap(); + let (decoder, rem) = Decoder::try_new(data).unwrap(); + let r = decoder.collect::>().unwrap(); assert_eq!(&r[..], &[1]); - assert_eq!(decoder.consumed_bytes(), 5); + assert_eq!(data.len() - rem.len(), 5); } #[test] @@ -280,12 +845,12 @@ mod tests { // bit_width: 0 let data = &[128, 1, 4, 5, 2, 2, 0, 0, 0, 0]; - let mut decoder = Decoder::try_new(data).unwrap(); - let r = decoder.by_ref().collect::, _>>().unwrap(); + let (decoder, rem) = Decoder::try_new(data).unwrap(); + let r = decoder.collect::>().unwrap(); assert_eq!(expected, r); - assert_eq!(decoder.consumed_bytes(), 10); + assert_eq!(data.len() - rem.len(), 10); } #[test] @@ -311,11 +876,11 @@ mod tests { 1, 2, 3, ]; - let mut decoder = Decoder::try_new(data).unwrap(); - let r = decoder.by_ref().collect::, _>>().unwrap(); + let (decoder, rem) = Decoder::try_new(data).unwrap(); + let r = decoder.collect::>().unwrap(); assert_eq!(expected, r); - assert_eq!(decoder.consumed_bytes(), data.len() - 3); + assert_eq!(rem, &[1, 2, 3]); } #[test] @@ -357,10 +922,11 @@ mod tests { -2, 2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42, 46, 50, ]; - let mut decoder = Decoder::try_new(data).unwrap(); - let r = decoder.by_ref().collect::, _>>().unwrap(); + let (decoder, rem) = Decoder::try_new(data).unwrap(); + let r = decoder.collect::>().unwrap(); assert_eq!(&expected[..], &r[..]); - assert_eq!(decoder.consumed_bytes(), data.len() - 3); + assert_eq!(data.len() - rem.len(), data.len() - 3); + assert_eq!(rem.len(), 3); } } diff --git a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/encoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/encoder.rs index 9bdb861504d1..24b6ea6523b8 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/encoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/encoder.rs @@ -5,49 +5,60 @@ use crate::parquet::encoding::ceil8; /// # Implementation /// * This function does not allocate on the heap. /// * The number of mini-blocks is always 1. This may change in the future. -pub fn encode>(mut iterator: I, buffer: &mut Vec) { - let block_size = 128; - let mini_blocks = 1; +pub fn encode>( + mut iterator: I, + buffer: &mut Vec, + num_miniblocks_per_block: usize, +) { + const BLOCK_SIZE: usize = 256; + assert!([1, 2, 4].contains(&num_miniblocks_per_block)); + let values_per_miniblock = BLOCK_SIZE / num_miniblocks_per_block; let mut container = [0u8; 10]; - let encoded_len = uleb128::encode(block_size, &mut container); + let encoded_len = uleb128::encode(BLOCK_SIZE as u64, &mut container); buffer.extend_from_slice(&container[..encoded_len]); - let encoded_len = uleb128::encode(mini_blocks, &mut container); + let encoded_len = uleb128::encode(num_miniblocks_per_block as u64, &mut container); buffer.extend_from_slice(&container[..encoded_len]); - let length = iterator.size_hint().1.unwrap(); + let length = iterator.len(); let encoded_len = uleb128::encode(length as u64, &mut container); buffer.extend_from_slice(&container[..encoded_len]); - let mut values = [0i64; 128]; - let mut deltas = [0u64; 128]; + let mut values = [0i64; BLOCK_SIZE]; + let mut deltas = [0u64; BLOCK_SIZE]; + let mut num_bits = [0u8; 4]; let first_value = iterator.next().unwrap_or_default(); let (container, encoded_len) = zigzag_leb128::encode(first_value); buffer.extend_from_slice(&container[..encoded_len]); let mut prev = first_value; - let mut length = iterator.size_hint().1.unwrap(); + let mut length = iterator.len(); while length != 0 { let mut min_delta = i64::MAX; let mut max_delta = i64::MIN; - let mut num_bits = 0; - for (i, integer) in (0..128).zip(&mut iterator) { - let delta = integer - prev; + for (i, integer) in iterator.by_ref().enumerate().take(BLOCK_SIZE) { + if i % values_per_miniblock == 0 { + min_delta = i64::MAX; + max_delta = i64::MIN + } + + let delta = integer.wrapping_sub(prev); min_delta = min_delta.min(delta); max_delta = max_delta.max(delta); - num_bits = 64 - (max_delta - min_delta).leading_zeros(); + let miniblock_idx = i / values_per_miniblock; + num_bits[miniblock_idx] = (64 - max_delta.abs_diff(min_delta).leading_zeros()) as u8; values[i] = delta; prev = integer; } - let consumed = std::cmp::min(length - iterator.size_hint().1.unwrap(), 128); - length = iterator.size_hint().1.unwrap(); + let consumed = std::cmp::min(length - iterator.len(), BLOCK_SIZE); + length = iterator.len(); let values = &values[..consumed]; values.iter().zip(deltas.iter_mut()).for_each(|(v, delta)| { - *delta = (v - min_delta) as u64; + *delta = v.wrapping_sub(min_delta) as u64; }); // @@ -55,19 +66,32 @@ pub fn encode>(mut iterator: I, buffer: &mut Vec) { buffer.extend_from_slice(&container[..encoded_len]); // one miniblock => 1 byte - buffer.push(num_bits as u8); - write_miniblock(buffer, num_bits as usize, deltas); + let mut values_remaining = consumed; + buffer.extend_from_slice(&num_bits[..num_miniblocks_per_block]); + for i in 0..num_miniblocks_per_block { + if values_remaining == 0 { + break; + } + + values_remaining = values_remaining.saturating_sub(values_per_miniblock); + write_miniblock( + buffer, + num_bits[i], + &deltas[i * values_per_miniblock..(i + 1) * values_per_miniblock], + ); + } } } -fn write_miniblock(buffer: &mut Vec, num_bits: usize, deltas: [u64; 128]) { +fn write_miniblock(buffer: &mut Vec, num_bits: u8, deltas: &[u64]) { + let num_bits = num_bits as usize; if num_bits > 0 { let start = buffer.len(); // bitpack encode all (deltas.len = 128 which is a multiple of 32) let bytes_needed = start + ceil8(deltas.len() * num_bits); buffer.resize(bytes_needed, 0); - bitpacked::encode(deltas.as_ref(), num_bits, &mut buffer[start..]); + bitpacked::encode(deltas, num_bits, &mut buffer[start..]); let bytes_needed = start + ceil8(deltas.len() * num_bits); buffer.truncate(bytes_needed); @@ -80,8 +104,8 @@ mod tests { #[test] fn constant_delta() { - // header: [128, 1, 1, 5, 2]: - // block size: 128 <=u> 128, 1 + // header: [128, 2, 1, 5, 2]: + // block size: 256 <=u> 128, 2 // mini-blocks: 1 <=u> 1 // elements: 5 <=u> 5 // first_value: 2 <=z> 1 @@ -89,10 +113,10 @@ mod tests { // min_delta: 1 <=z> 2 // bitwidth: 0 let data = 1..=5; - let expected = vec![128u8, 1, 1, 5, 2, 2, 0]; + let expected = vec![128u8, 2, 1, 5, 2, 2, 0]; let mut buffer = vec![]; - encode(data, &mut buffer); + encode(data.collect::>().into_iter(), &mut buffer, 1); assert_eq!(expected, buffer); } @@ -100,8 +124,8 @@ mod tests { fn negative_min_delta() { // max - min = 1 - -4 = 5 let data = vec![1, 2, 3, 4, 5, 1]; - // header: [128, 1, 4, 6, 2] - // block size: 128 <=u> 128, 1 + // header: [128, 2, 4, 6, 2] + // block size: 256 <=u> 128, 2 // mini-blocks: 1 <=u> 1 // elements: 6 <=u> 5 // first_value: 2 <=z> 1 @@ -112,11 +136,11 @@ mod tests { // 0b01101101 // 0b00001011 // ] - let mut expected = vec![128u8, 1, 1, 6, 2, 7, 3, 0b01101101, 0b00001011]; - expected.extend(std::iter::repeat(0).take(128 * 3 / 8 - 2)); // 128 values, 3 bits, 2 already used + let mut expected = vec![128u8, 2, 1, 6, 2, 7, 3, 0b01101101, 0b00001011]; + expected.extend(std::iter::repeat(0).take(256 * 3 / 8 - 2)); // 128 values, 3 bits, 2 already used let mut buffer = vec![]; - encode(data.into_iter(), &mut buffer); + encode(data.into_iter(), &mut buffer, 1); assert_eq!(expected, buffer); } } diff --git a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/fuzz.rs b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/fuzz.rs new file mode 100644 index 000000000000..dc16bc8353fd --- /dev/null +++ b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/fuzz.rs @@ -0,0 +1,76 @@ +#[ignore = "Fuzz test. Takes too long"] +#[test] +fn fuzz_test_delta_encoding() -> Result<(), Box> { + use rand::Rng; + + use super::DeltaGatherer; + use crate::parquet::error::ParquetResult; + + struct SimpleGatherer; + + impl DeltaGatherer for SimpleGatherer { + type Target = Vec; + + fn target_len(&self, target: &Self::Target) -> usize { + target.len() + } + + fn target_reserve(&self, target: &mut Self::Target, n: usize) { + target.reserve(n); + } + + fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> { + target.push(v); + Ok(()) + } + } + + const MIN_VALUES: usize = 1; + const MAX_VALUES: usize = 515; + + const MIN: i64 = i64::MIN; + const MAX: i64 = i64::MAX; + + const NUM_ITERATIONS: usize = 1_000_000; + + let mut values = Vec::with_capacity(MAX_VALUES); + let mut rng = rand::thread_rng(); + + let mut encoded = Vec::with_capacity(MAX_VALUES); + let mut decoded = Vec::with_capacity(MAX_VALUES); + let mut gatherer = SimpleGatherer; + + for i in 0..NUM_ITERATIONS { + values.clear(); + + let num_values = rng.gen_range(MIN_VALUES..=MAX_VALUES); + values.extend(std::iter::from_fn(|| Some(rng.gen_range(MIN..=MAX))).take(num_values)); + + encoded.clear(); + decoded.clear(); + + super::encode( + values.iter().copied(), + &mut encoded, + 1 << rng.gen_range(0..=2), + ); + let (mut decoder, rem) = super::Decoder::try_new(&encoded)?; + + assert!(rem.is_empty()); + + let mut num_remaining = num_values; + while num_remaining > 0 { + let n = rng.gen_range(1usize..=num_remaining); + decoder.gather_n_into(&mut decoded, n, &mut gatherer)?; + num_remaining -= n; + } + + assert_eq!(values, decoded); + + if i % 1000 == 999 { + eprintln!("[INFO]: {} iterations done.", i + 1); + } + } + + Ok(()) +} diff --git a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/mod.rs b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/mod.rs index 4f7922821c5f..23e67ee7fb4f 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/mod.rs @@ -1,23 +1,24 @@ mod decoder; mod encoder; +mod fuzz; -pub use decoder::Decoder; -pub use encoder::encode; +pub(crate) use decoder::{Decoder, DeltaGatherer, SumGatherer}; +pub(crate) use encoder::encode; #[cfg(test)] mod tests { use super::*; - use crate::parquet::error::ParquetError; + use crate::parquet::error::{ParquetError, ParquetResult}; #[test] fn basic() -> Result<(), ParquetError> { let data = vec![1, 3, 1, 2, 3]; let mut buffer = vec![]; - encode(data.clone().into_iter(), &mut buffer); - let iter = Decoder::try_new(&buffer)?; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; - let result = iter.collect::, _>>()?; + let result = iter.collect::>()?; assert_eq!(result, data); Ok(()) } @@ -27,10 +28,10 @@ mod tests { let data = vec![1, 3, -1, 2, 3]; let mut buffer = vec![]; - encode(data.clone().into_iter(), &mut buffer); - let iter = Decoder::try_new(&buffer)?; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; - let result = iter.collect::, _>>()?; + let result = iter.collect::>()?; assert_eq!(result, data); Ok(()) } @@ -48,10 +49,10 @@ mod tests { ]; let mut buffer = vec![]; - encode(data.clone().into_iter(), &mut buffer); - let iter = Decoder::try_new(&buffer)?; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; - let result = iter.collect::, ParquetError>>()?; + let result = iter.collect::>()?; assert_eq!(result, data); Ok(()) } @@ -64,10 +65,10 @@ mod tests { } let mut buffer = vec![]; - encode(data.clone().into_iter(), &mut buffer); - let iter = Decoder::try_new(&buffer)?; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; - let result = iter.collect::, _>>()?; + let result = iter.collect::>()?; assert_eq!(result, data); Ok(()) } @@ -77,14 +78,47 @@ mod tests { let data = vec![2, 3, 1, 2, 1]; let mut buffer = vec![]; - encode(data.clone().into_iter(), &mut buffer); - let len = buffer.len(); - let mut iter = Decoder::try_new(&buffer)?; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; - let result = iter.by_ref().collect::, _>>()?; + let result = iter.collect::>()?; + assert_eq!(result, data); + + Ok(()) + } + + #[test] + fn overflow_constant() -> ParquetResult<()> { + let data = vec![i64::MIN, i64::MAX, i64::MIN, i64::MAX]; + + let mut buffer = vec![]; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; + + let result = iter.collect::>()?; + assert_eq!(result, data); + + Ok(()) + } + + #[test] + fn overflow_vary() -> ParquetResult<()> { + let data = vec![ + 0, + i64::MAX, + i64::MAX - 1, + i64::MIN + 1, + i64::MAX, + i64::MIN + 2, + ]; + + let mut buffer = vec![]; + encode(data.clone().into_iter(), &mut buffer, 1); + let (iter, _) = Decoder::try_new(&buffer)?; + + let result = iter.collect::>()?; assert_eq!(result, data); - assert_eq!(iter.consumed_bytes(), len); Ok(()) } } diff --git a/crates/polars-parquet/src/parquet/encoding/delta_byte_array/decoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_byte_array/decoder.rs index 9196eaedb7c8..03889e0aa5d3 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_byte_array/decoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_byte_array/decoder.rs @@ -1,5 +1,6 @@ -use super::super::{delta_bitpacked, delta_length_byte_array}; -use crate::parquet::error::ParquetError; +use super::super::delta_bitpacked; +use crate::parquet::encoding::delta_bitpacked::SumGatherer; +use crate::parquet::error::ParquetResult; /// Decodes according to [Delta strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-strings-delta_byte_array--7), /// prefixes, lengths and values @@ -7,32 +8,47 @@ use crate::parquet::error::ParquetError; /// This struct does not allocate on the heap. #[derive(Debug)] pub struct Decoder<'a> { - values: &'a [u8], - prefix_lengths: delta_bitpacked::Decoder<'a>, + pub(crate) prefix_lengths: delta_bitpacked::Decoder<'a>, + pub(crate) suffix_lengths: delta_bitpacked::Decoder<'a>, + pub(crate) values: &'a [u8], + + pub(crate) offset: usize, + pub(crate) last: Vec, } impl<'a> Decoder<'a> { - pub fn try_new(values: &'a [u8]) -> Result { - let prefix_lengths = delta_bitpacked::Decoder::try_new(values)?; + pub fn try_new(values: &'a [u8]) -> ParquetResult { + let (prefix_lengths, values) = delta_bitpacked::Decoder::try_new(values)?; + let (suffix_lengths, values) = delta_bitpacked::Decoder::try_new(values)?; + Ok(Self { - values, prefix_lengths, + suffix_lengths, + values, + + offset: 0, + last: Vec::with_capacity(32), }) } - pub fn into_lengths(self) -> Result, ParquetError> { - assert_eq!(self.prefix_lengths.size_hint().0, 0); - delta_length_byte_array::Decoder::try_new( - &self.values[self.prefix_lengths.consumed_bytes()..], - ) + pub fn values(&self) -> &'a [u8] { + self.values } -} -impl<'a> Iterator for Decoder<'a> { - type Item = Result; + pub fn len(&self) -> usize { + debug_assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len()); + self.prefix_lengths.len() + } - fn next(&mut self) -> Option { - self.prefix_lengths.next().map(|x| x.map(|x| x as u32)) + pub fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { + let mut prefix_sum = 0usize; + self.prefix_lengths + .gather_n_into(&mut prefix_sum, n, &mut SumGatherer(0))?; + let mut suffix_sum = 0usize; + self.suffix_lengths + .gather_n_into(&mut suffix_sum, n, &mut SumGatherer(0))?; + self.offset += prefix_sum + suffix_sum; + Ok(()) } } @@ -40,8 +56,44 @@ impl<'a> Iterator for Decoder<'a> { mod tests { use super::*; + impl<'a> Iterator for Decoder<'a> { + type Item = ParquetResult>; + + fn next(&mut self) -> Option { + if self.len() == 0 { + return None; + } + + let mut prefix_length = vec![]; + let mut suffix_length = vec![]; + if let Err(e) = self.prefix_lengths.collect_n(&mut prefix_length, 1) { + return Some(Err(e)); + } + if let Err(e) = self.suffix_lengths.collect_n(&mut suffix_length, 1) { + return Some(Err(e)); + } + let prefix_length = prefix_length[0]; + let suffix_length = suffix_length[0]; + + let prefix_length = prefix_length as usize; + let suffix_length = suffix_length as usize; + + let mut value = Vec::with_capacity(prefix_length + suffix_length); + + value.extend_from_slice(&self.last[..prefix_length]); + value.extend_from_slice(&self.values[self.offset..self.offset + suffix_length]); + + self.last.clear(); + self.last.extend_from_slice(&value); + + self.offset += suffix_length; + + Some(Ok(value)) + } + } + #[test] - fn test_bla() -> Result<(), ParquetError> { + fn test_bla() -> ParquetResult<()> { // VALIDATED from spark==3.1.1 let data = &[ 128, 1, 4, 2, 0, 0, 0, 0, 0, 0, 128, 1, 4, 2, 10, 0, 0, 0, 0, 0, 72, 101, 108, 108, @@ -50,31 +102,16 @@ mod tests { // because they are beyond the sum of all lengths. 1, 2, 3, ]; - // result of encoding - let expected = &["Hello", "World"]; - let expected_lengths = expected.iter().map(|x| x.len() as i32).collect::>(); - let expected_prefixes = vec![0, 0]; - let expected_values = expected.join(""); - let expected_values = expected_values.as_bytes(); - - let mut decoder = Decoder::try_new(data)?; - let prefixes = decoder.by_ref().collect::, _>>()?; - assert_eq!(prefixes, expected_prefixes); - - // move to the lengths - let mut decoder = decoder.into_lengths()?; - - let lengths = decoder.by_ref().collect::, _>>()?; - assert_eq!(lengths, expected_lengths); - - // move to the values - let values = decoder.values(); - assert_eq!(values, expected_values); + + let decoder = Decoder::try_new(data)?; + let values = decoder.collect::, _>>()?; + assert_eq!(values, vec![b"Hello".to_vec(), b"World".to_vec()]); + Ok(()) } #[test] - fn test_with_prefix() -> Result<(), ParquetError> { + fn test_with_prefix() -> ParquetResult<()> { // VALIDATED from spark==3.1.1 let data = &[ 128, 1, 4, 2, 0, 6, 0, 0, 0, 0, 128, 1, 4, 2, 10, 4, 0, 0, 0, 0, 72, 101, 108, 108, @@ -83,24 +120,11 @@ mod tests { // because they are beyond the sum of all lengths. 1, 2, 3, ]; - // result of encoding - let expected_lengths = vec![5, 7]; - let expected_prefixes = vec![0, 3]; - let expected_values = b"Helloicopter"; - - let mut decoder = Decoder::try_new(data)?; - let prefixes = decoder.by_ref().collect::, _>>()?; - assert_eq!(prefixes, expected_prefixes); - - // move to the lengths - let mut decoder = decoder.into_lengths()?; - let lengths = decoder.by_ref().collect::, _>>()?; - assert_eq!(lengths, expected_lengths); + let decoder = Decoder::try_new(data)?; + let prefixes = decoder.collect::, _>>()?; + assert_eq!(prefixes, vec![b"Hello".to_vec(), b"Helicopter".to_vec()]); - // move to the values - let values = decoder.values(); - assert_eq!(values, expected_values); Ok(()) } } diff --git a/crates/polars-parquet/src/parquet/encoding/delta_byte_array/encoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_byte_array/encoder.rs index 1e9e071c87be..3a36e90b9966 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_byte_array/encoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_byte_array/encoder.rs @@ -2,7 +2,10 @@ use super::super::delta_bitpacked; use crate::parquet::encoding::delta_length_byte_array; /// Encodes an iterator of according to DELTA_BYTE_ARRAY -pub fn encode<'a, I: Iterator + Clone>(iterator: I, buffer: &mut Vec) { +pub fn encode<'a, I: ExactSizeIterator + Clone>( + iterator: I, + buffer: &mut Vec, +) { let mut previous = b"".as_ref(); let mut sum_lengths = 0; @@ -22,7 +25,7 @@ pub fn encode<'a, I: Iterator + Clone>(iterator: I, buffer: &mu prefix_length as i64 }) .collect::>(); - delta_bitpacked::encode(prefixes.iter().copied(), buffer); + delta_bitpacked::encode(prefixes.iter().copied(), buffer, 1); let remaining = iterator .zip(prefixes) diff --git a/crates/polars-parquet/src/parquet/encoding/delta_byte_array/mod.rs b/crates/polars-parquet/src/parquet/encoding/delta_byte_array/mod.rs index b5927ab95b58..2bb51511d67e 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_byte_array/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_byte_array/mod.rs @@ -17,13 +17,7 @@ mod tests { let mut decoder = Decoder::try_new(&buffer)?; let prefixes = decoder.by_ref().collect::, _>>()?; - assert_eq!(prefixes, vec![0, 3]); - - // move to the lengths - let mut decoder = decoder.into_lengths()?; - - let lengths = decoder.by_ref().collect::, _>>()?; - assert_eq!(lengths, vec![5, 7]); + assert_eq!(prefixes, vec![b"Hello".to_vec(), b"Helicopter".to_vec()]); // move to the values let values = decoder.values(); diff --git a/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/decoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/decoder.rs index bd9a77a00add..b3191e0a51ff 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/decoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/decoder.rs @@ -1,80 +1,57 @@ use super::super::delta_bitpacked; -use crate::parquet::error::ParquetError; +use crate::parquet::encoding::delta_bitpacked::SumGatherer; +use crate::parquet::error::ParquetResult; /// Decodes [Delta-length byte array](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-length-byte-array-delta_length_byte_array--6) /// lengths and values. /// # Implementation /// This struct does not allocate on the heap. -/// # Example -/// ``` -/// use polars_parquet::parquet::encoding::delta_length_byte_array::Decoder; -/// -/// let expected = &["Hello", "World"]; -/// let expected_lengths = expected.iter().map(|x| x.len() as i32).collect::>(); -/// let expected_values = expected.join(""); -/// let expected_values = expected_values.as_bytes(); -/// let data = &[ -/// 128, 1, 4, 2, 10, 0, 0, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, -/// ]; -/// -/// let mut decoder = Decoder::try_new(data).unwrap(); -/// -/// // Extract the lengths -/// let lengths = decoder.by_ref().collect::, _>>().unwrap(); -/// assert_eq!(lengths, expected_lengths); -/// -/// // Extract the values. This _must_ be called after consuming all lengths by reference (see above). -/// let values = decoder.into_values(); -/// -/// assert_eq!(values, expected_values); #[derive(Debug)] -pub struct Decoder<'a> { - values: &'a [u8], - lengths: delta_bitpacked::Decoder<'a>, - total_length: u32, +pub(crate) struct Decoder<'a> { + pub(crate) lengths: delta_bitpacked::Decoder<'a>, + pub(crate) values: &'a [u8], + pub(crate) offset: usize, } impl<'a> Decoder<'a> { - pub fn try_new(values: &'a [u8]) -> Result { - let lengths = delta_bitpacked::Decoder::try_new(values)?; + pub fn try_new(values: &'a [u8]) -> ParquetResult { + let (lengths, values) = delta_bitpacked::Decoder::try_new(values)?; Ok(Self { - values, lengths, - total_length: 0, + values, + offset: 0, }) } - /// Consumes this decoder and returns the slice of concatenated values. - /// # Panics - /// This function panics if this iterator has not been fully consumed. - pub fn into_values(self) -> &'a [u8] { - assert_eq!(self.lengths.size_hint().0, 0); - let start = self.lengths.consumed_bytes(); - &self.values[start..start + self.total_length as usize] + pub(crate) fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { + let mut sum = 0usize; + self.lengths + .gather_n_into(&mut sum, n, &mut SumGatherer(0))?; + self.offset += sum; + Ok(()) } - /// Returns the slice of concatenated values. - /// # Panics - /// This function panics if this iterator has not yet been fully consumed. - pub fn values(&self) -> &'a [u8] { - assert_eq!(self.lengths.size_hint().0, 0); - let start = self.lengths.consumed_bytes(); - &self.values[start..start + self.total_length as usize] + pub fn len(&self) -> usize { + self.lengths.len() } } +#[cfg(test)] impl<'a> Iterator for Decoder<'a> { - type Item = Result; + type Item = ParquetResult<&'a [u8]>; fn next(&mut self) -> Option { - let result = self.lengths.next(); - match result { - Some(Ok(v)) => { - self.total_length += v as u32; - Some(Ok(v as i32)) - }, - Some(Err(error)) => Some(Err(error)), - None => None, + if self.lengths.len() == 0 { + return None; + } + + let mut length = vec![]; + if let Err(e) = self.lengths.collect_n(&mut length, 1) { + return Some(Err(e)); } + let length = length[0] as usize; + let value = &self.values[self.offset..self.offset + length]; + self.offset += length; + Some(Ok(value)) } } diff --git a/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/encoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/encoder.rs index fc2121cf68e8..d768b10c24f3 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/encoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/encoder.rs @@ -4,7 +4,10 @@ use crate::parquet::encoding::delta_bitpacked; /// # Implementation /// This encoding is equivalent to call [`delta_bitpacked::encode`] on the lengths of the items /// of the iterator followed by extending the buffer from each item of the iterator. -pub fn encode, I: Iterator + Clone>(iterator: I, buffer: &mut Vec) { +pub fn encode, I: ExactSizeIterator + Clone>( + iterator: I, + buffer: &mut Vec, +) { let mut total_length = 0; delta_bitpacked::encode( iterator.clone().map(|x| { @@ -13,6 +16,7 @@ pub fn encode, I: Iterator + Clone>(iterator: I, buffer len as i64 }), buffer, + 1, ); buffer.reserve(total_length); iterator.for_each(|x| buffer.extend(x.as_ref())) diff --git a/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/mod.rs b/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/mod.rs index 35b5bd9fd5fb..050ac766f545 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_length_byte_array/mod.rs @@ -1,8 +1,8 @@ mod decoder; mod encoder; -pub use decoder::Decoder; -pub use encoder::encode; +pub(crate) use decoder::Decoder; +pub(crate) use encoder::encode; #[cfg(test)] mod tests { @@ -19,9 +19,18 @@ mod tests { let mut iter = Decoder::try_new(&buffer)?; let result = iter.by_ref().collect::, _>>()?; - assert_eq!(result, vec![2, 3, 1, 2, 1]); - - let result = iter.values(); + assert_eq!( + result, + vec![ + b"aa".as_ref(), + b"bbb".as_ref(), + b"a".as_ref(), + b"aa".as_ref(), + b"b".as_ref() + ] + ); + + let result = iter.values; assert_eq!(result, b"aabbbaaab".as_ref()); Ok(()) } @@ -32,8 +41,11 @@ mod tests { for i in 0..136 { data.push(format!("a{}", i)) } - let expected_values = data.join(""); - let expected_lengths = data.iter().map(|x| x.len() as i32).collect::>(); + + let expected = data + .iter() + .map(|v| v.as_bytes().to_vec()) + .collect::>(); let mut buffer = vec![]; encode(data.into_iter(), &mut buffer); @@ -41,10 +53,8 @@ mod tests { let mut iter = Decoder::try_new(&buffer)?; let result = iter.by_ref().collect::, _>>()?; - assert_eq!(result, expected_lengths); + assert_eq!(result, expected); - let result = iter.into_values(); - assert_eq!(result, expected_values.as_bytes()); Ok(()) } } diff --git a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs index e4abd2046432..36fdb254420a 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs @@ -1,6 +1,7 @@ +use polars_parquet::parquet::encoding::bitpacked::{Unpackable, Unpacked}; use polars_parquet::parquet::encoding::hybrid_rle::HybridRleDecoder; use polars_parquet::parquet::encoding::{bitpacked, uleb128, Encoding}; -use polars_parquet::parquet::error::ParquetError; +use polars_parquet::parquet::error::{ParquetError, ParquetResult}; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; use polars_parquet::parquet::read::levels::get_bit_width; use polars_parquet::parquet::types::NativeType; @@ -171,6 +172,51 @@ pub fn page_to_array( } } +pub struct DecoderIter<'a, T: Unpackable> { + pub(crate) decoder: bitpacked::Decoder<'a, T>, + pub(crate) buffered: T::Unpacked, + pub(crate) unpacked_start: usize, + pub(crate) unpacked_end: usize, +} + +impl<'a, T: Unpackable> Iterator for DecoderIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + if self.unpacked_start >= self.unpacked_end { + let length; + (self.buffered, length) = self.decoder.chunked().next_inexact()?; + debug_assert!(length > 0); + self.unpacked_start = 1; + self.unpacked_end = length; + return Some(self.buffered[0]); + } + + let v = self.buffered[self.unpacked_start]; + self.unpacked_start += 1; + Some(v) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.decoder.len() + self.unpacked_end - self.unpacked_start; + (len, Some(len)) + } +} + +impl<'a, T: Unpackable> ExactSizeIterator for DecoderIter<'a, T> {} + +impl<'a, T: Unpackable> DecoderIter<'a, T> { + pub fn new(packed: &'a [u8], num_bits: usize, length: usize) -> ParquetResult { + assert!(num_bits > 0); + Ok(Self { + decoder: bitpacked::Decoder::try_new(packed, num_bits, length)?, + buffered: T::Unpacked::zero(), + unpacked_start: 0, + unpacked_end: 0, + }) + } +} + fn read_dict_array( rep_levels: &[u8], def_levels: &[u8], @@ -188,8 +234,7 @@ fn read_dict_array( let (_, consumed) = uleb128::decode(values); let values = &values[consumed..]; - let indices = bitpacked::Decoder::::try_new(values, bit_width as usize, length as usize)? - .collect_into_iter(); + let indices = DecoderIter::::new(values, bit_width as usize, length as usize)?; let values = indices.map(|id| dict_values[id as usize]); diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 69a2d1aa230d..451c9f693cda 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -13,6 +13,7 @@ import pyarrow.parquet as pq import pytest from hypothesis import HealthCheck, given, settings +from hypothesis import strategies as st import polars as pl from polars.exceptions import ComputeError @@ -1414,3 +1415,84 @@ def test_null_array_dict_pages_18085() -> None: test.to_parquet(f) f.seek(0) pl.read_parquet(f) + + +@given( + df=dataframes( + min_size=1, + max_size=1000, + allowed_dtypes=[ + pl.List, + pl.Int8, + pl.Int16, + pl.Int32, + pl.Int64, + pl.UInt8, + pl.UInt16, + pl.UInt32, + pl.UInt64, + ], + ), + row_group_size=st.integers(min_value=10, max_value=1000), +) +def test_delta_encoding_roundtrip(df: pl.DataFrame, row_group_size: int) -> None: + print(df.schema) + print(df) + + f = io.BytesIO() + pq.write_table( + df.to_arrow(), + f, + compression="NONE", + use_dictionary=False, + column_encoding="DELTA_BINARY_PACKED", + write_statistics=False, + row_group_size=row_group_size, + ) + + f.seek(0) + assert_frame_equal(pl.read_parquet(f), df) + + +@given( + df=dataframes(min_size=1, max_size=1000, allowed_dtypes=[pl.String, pl.Binary]), + row_group_size=st.integers(min_value=10, max_value=1000), +) +def test_delta_length_byte_array_encoding_roundtrip( + df: pl.DataFrame, row_group_size: int +) -> None: + f = io.BytesIO() + pq.write_table( + df.to_arrow(), + f, + compression="NONE", + use_dictionary=False, + column_encoding="DELTA_LENGTH_BYTE_ARRAY", + write_statistics=False, + row_group_size=row_group_size, + ) + + f.seek(0) + assert_frame_equal(pl.read_parquet(f), df) + + +@given( + df=dataframes(min_size=1, max_size=1000, allowed_dtypes=[pl.String, pl.Binary]), + row_group_size=st.integers(min_value=10, max_value=1000), +) +def test_delta_strings_encoding_roundtrip( + df: pl.DataFrame, row_group_size: int +) -> None: + f = io.BytesIO() + pq.write_table( + df.to_arrow(), + f, + compression="NONE", + use_dictionary=False, + column_encoding="DELTA_BYTE_ARRAY", + write_statistics=False, + row_group_size=row_group_size, + ) + + f.seek(0) + assert_frame_equal(pl.read_parquet(f), df)