Skip to content

Commit

Permalink
Introduce PrimitiveValueDecoder to enable batch decoding of values
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 22, 2024
1 parent 60b6bc4 commit ed4cf60
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 34 deletions.
6 changes: 4 additions & 2 deletions src/array_decoder/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::cmp::Ordering;

use crate::column::get_present_vec;
use crate::encoding::decimal::UnboundedVarintStreamDecoder;
use crate::encoding::get_rle_reader;
use crate::encoding::{get_rle_reader, PrimitiveValueDecoder};
use crate::proto::stream::Kind;
use crate::stripe::Stripe;
use crate::{column::Column, error::Result};
Expand Down Expand Up @@ -65,7 +65,7 @@ pub fn new_decimal_decoder(
/// varint basis, and needs to align with type specified scale
struct DecimalScaleRepairIter {
varint_iter: Box<dyn Iterator<Item = Result<i128>> + Send>,
scale_iter: Box<dyn Iterator<Item = Result<i32>> + Send>,
scale_iter: Box<dyn PrimitiveValueDecoder<i32> + Send>,
fixed_scale: u32,
}

Expand All @@ -88,6 +88,8 @@ impl Iterator for DecimalScaleRepairIter {
}
}

impl PrimitiveValueDecoder<i128> for DecimalScaleRepairIter {}

fn fix_i128_scale(i: i128, fixed_scale: u32, varying_scale: i32) -> i128 {
// TODO: Verify with C++ impl in ORC repo, which does this cast
// Not sure why scale stream can be signed if it gets casted to unsigned anyway
Expand Down
4 changes: 2 additions & 2 deletions src/array_decoder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use snafu::ResultExt;

use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::encoding::get_unsigned_rle_reader;
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::proto::stream::Kind;

use crate::error::{ArrowSnafu, Result};
Expand All @@ -35,7 +35,7 @@ use super::{array_decoder_factory, ArrayBatchDecoder};
pub struct ListArrayDecoder {
inner: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
field: FieldRef,
}

Expand Down
4 changes: 2 additions & 2 deletions src/array_decoder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use snafu::ResultExt;

use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::encoding::get_unsigned_rle_reader;
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::error::{ArrowSnafu, Result};
use crate::proto::stream::Kind;
use crate::stripe::Stripe;
Expand All @@ -35,7 +35,7 @@ pub struct MapArrayDecoder {
keys: Box<dyn ArrayBatchDecoder>,
values: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
fields: Fields,
}

Expand Down
17 changes: 8 additions & 9 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, BooleanBuilder, PrimitiveArray, PrimitiveBuilder};
use arrow::buffer::NullBuffer;
use arrow::datatypes::ArrowNativeTypeOp;
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type};
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::datatypes::{
Expand All @@ -31,7 +32,7 @@ use crate::column::{get_present_vec, Column};
use crate::encoding::boolean::BooleanIter;
use crate::encoding::byte::ByteRleReader;
use crate::encoding::float::FloatIter;
use crate::encoding::get_rle_reader;
use crate::encoding::{get_rle_reader, PrimitiveValueDecoder};
use crate::error::{
self, ArrowSnafu, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
};
Expand All @@ -56,13 +57,13 @@ mod timestamp;
mod union;

struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
iter: Box<dyn Iterator<Item = Result<T::Native>> + Send>,
iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
}

impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
pub fn new(
iter: Box<dyn Iterator<Item = Result<T::Native>> + Send>,
iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
) -> Self {
Self { iter, present }
Expand Down Expand Up @@ -95,11 +96,9 @@ impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
Ok(array)
}
None => {
let data = self
.iter
.by_ref()
.take(batch_size)
.collect::<Result<Vec<_>>>()?;
let mut data = vec![T::Native::ZERO; batch_size];
let len = self.iter.decode(data.as_mut_slice())?;
data.truncate(len);
let array = PrimitiveArray::<T>::from_iter_values(data);
Ok(array)
}
Expand Down Expand Up @@ -139,7 +138,7 @@ impl DecimalArrayDecoder {
pub fn new(
precision: u8,
scale: i8,
iter: Box<dyn Iterator<Item = Result<i128>> + Send>,
iter: Box<dyn PrimitiveValueDecoder<i128> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
) -> Self {
let inner = PrimitiveArrayDecoder::<Decimal128Type>::new(iter, present);
Expand Down
6 changes: 3 additions & 3 deletions src/array_decoder/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use snafu::ResultExt;
use crate::array_decoder::{create_null_buffer, derive_present_vec, populate_lengths_with_nulls};
use crate::column::{get_present_vec, Column};
use crate::compression::Decompressor;
use crate::encoding::get_unsigned_rle_reader;
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::error::{ArrowSnafu, IoSnafu, Result};
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
use crate::proto::stream::Kind;
Expand Down Expand Up @@ -90,15 +90,15 @@ pub type BinaryArrayDecoder = GenericByteArrayDecoder<GenericBinaryType<i32>>;

pub struct GenericByteArrayDecoder<T: ByteArrayType> {
bytes: Box<Decompressor>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
phantom: PhantomData<T>,
}

impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
fn new(
bytes: Box<Decompressor>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
) -> Self {
Self {
Expand Down
5 changes: 4 additions & 1 deletion src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
encoding::{
get_rle_reader, get_unsigned_rle_reader,
timestamp::{TimestampIterator, TimestampNanosecondAsDecimalIterator},
PrimitiveValueDecoder,
},
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
Expand Down Expand Up @@ -108,7 +109,7 @@ fn decimal128_decoder(

let iter = TimestampNanosecondAsDecimalIterator::new(seconds_since_unix_epoch, data, secondary);

let iter: Box<dyn Iterator<Item = _> + Send> = match writer_tz {
let iter: Box<dyn PrimitiveValueDecoder<i128> + Send> = match writer_tz {
Some(UTC) | None => Box::new(iter),
Some(writer_tz) => Box::new(TimestampNanosecondAsDecimalWithTzIterator(iter, writer_tz)),
};
Expand Down Expand Up @@ -314,3 +315,5 @@ impl Iterator for TimestampNanosecondAsDecimalWithTzIterator {
Some(self.next_inner(ts))
}
}

impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalWithTzIterator {}
4 changes: 3 additions & 1 deletion src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use bytes::{BufMut, BytesMut};
use crate::{error::Result, memory::EstimateMemory};
use std::io::Read;

use super::{util::read_u8, PrimitiveValueEncoder};
use super::{util::read_u8, PrimitiveValueDecoder, PrimitiveValueEncoder};

const MAX_LITERAL_LENGTH: usize = 128;
const MIN_REPEAT_LENGTH: usize = 3;
Expand Down Expand Up @@ -245,6 +245,8 @@ impl<R: Read> Iterator for ByteRleReader<R> {
}
}

impl<R: Read> PrimitiveValueDecoder<i8> for ByteRleReader<R> {}

#[cfg(test)]
mod tests {
use std::io::Cursor;
Expand Down
4 changes: 3 additions & 1 deletion src/encoding/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
memory::EstimateMemory,
};

use super::PrimitiveValueEncoder;
use super::{PrimitiveValueDecoder, PrimitiveValueEncoder};

/// Generically represent f32 and f64.
// TODO: figure out how to use num::traits::FromBytes instead of rolling our own?
Expand Down Expand Up @@ -72,6 +72,8 @@ impl<T: Float, R: std::io::Read> FloatIter<T, R> {
}
}

impl<T: Float, R: std::io::Read> PrimitiveValueDecoder<T> for FloatIter<T, R> {}

impl<T: Float, R: std::io::Read> Iterator for FloatIter<T, R> {
type Item = Result<T>;

Expand Down
28 changes: 26 additions & 2 deletions src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,34 @@ where
fn take_inner(&mut self) -> Bytes;
}

pub trait PrimitiveValueDecoder<V>: Iterator<Item = Result<V>> {
/// Decode out.len() values into out at a time, returning the amount of
/// values decoded successfully.
///
/// By default it relies on Iterator::next(), but hopefully this can be
/// refactored away when it is properly implemented for all the decoders.
// TODO: what about returning a &mut []? or taking &mut Vec<> as input?
// relying on return usize to indicate how many values in out are
// actually valid is probably not the best interface here.
fn decode(&mut self, out: &mut [V]) -> Result<usize> {
let mut len = 0;
for n in out.iter_mut() {
match self.next() {
Some(r) => {
*n = r?;
len += 1;
}
None => break,
};
}
Ok(len)
}
}

pub fn get_unsigned_rle_reader<R: Read + Send + 'static>(
column: &Column,
reader: R,
) -> Box<dyn Iterator<Item = Result<i64>> + Send> {
) -> Box<dyn PrimitiveValueDecoder<i64> + Send> {
match column.encoding().kind() {
ProtoColumnKind::Direct | ProtoColumnKind::Dictionary => {
Box::new(RleReaderV1::<i64, _, UnsignedEncoding>::new(reader))
Expand All @@ -92,7 +116,7 @@ pub fn get_unsigned_rle_reader<R: Read + Send + 'static>(
pub fn get_rle_reader<N: NInt, R: Read + Send + 'static>(
column: &Column,
reader: R,
) -> Result<Box<dyn Iterator<Item = Result<N>> + Send>> {
) -> Result<Box<dyn PrimitiveValueDecoder<N> + Send>> {
match column.encoding().kind() {
ProtoColumnKind::Direct => Ok(Box::new(RleReaderV1::<N, _, SignedEncoding>::new(reader))),
ProtoColumnKind::DirectV2 => Ok(Box::new(RleReaderV2::<N, _, SignedEncoding>::new(reader))),
Expand Down
4 changes: 3 additions & 1 deletion src/encoding/rle_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::error::{OutOfSpecSnafu, Result};

use super::{
util::{read_u8, read_varint_zigzagged, try_read_u8},
EncodingSign, NInt,
EncodingSign, NInt, PrimitiveValueDecoder,
};

const MAX_RUN_LENGTH: usize = 130;
Expand Down Expand Up @@ -119,6 +119,8 @@ impl<N: NInt, R: Read, S: EncodingSign> Iterator for RleReaderV1<N, R, S> {
}
}

impl<N: NInt, R: Read, S: EncodingSign> PrimitiveValueDecoder<N> for RleReaderV1<N, R, S> {}

#[cfg(test)]
mod tests {
use std::io::Cursor;
Expand Down
38 changes: 37 additions & 1 deletion src/encoding/rle_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use self::{

use super::{
util::{calculate_percentile_bits, try_read_u8},
EncodingSign, NInt, PrimitiveValueEncoder, VarintSerde,
EncodingSign, NInt, PrimitiveValueDecoder, PrimitiveValueEncoder, VarintSerde,
};

pub mod delta;
Expand Down Expand Up @@ -120,6 +120,42 @@ impl<N: NInt, R: Read, S: EncodingSign> Iterator for RleReaderV2<N, R, S> {
}
}

impl<N: NInt, R: Read, S: EncodingSign> PrimitiveValueDecoder<N> for RleReaderV2<N, R, S> {
fn decode(&mut self, out: &mut [N]) -> Result<usize> {
let available = &self.decoded_ints[self.current_head..];
// If we have enough in buffer to copy over
if available.len() >= out.len() {
out.copy_from_slice(&available[..out.len()]);
self.current_head += out.len();
return Ok(out.len());
}

// Otherwise progressively copy over chunks
let len_to_copy = out.len();
let mut copied = 0;
while copied < len_to_copy {
let copying = self.decoded_ints.len() - self.current_head;
// At most, we fill to exact length of output buffer (don't overflow)
let copying = copying.min(len_to_copy - copied);

let target_out_slice = &mut out[copied..copied + copying];
target_out_slice.copy_from_slice(
&self.decoded_ints[self.current_head..self.current_head + copying],
);

copied += copying;
self.current_head += copying;

if self.current_head == self.decoded_ints.len() {
self.current_head = 0;
self.decoded_ints.clear();
self.decode_batch()?;
}
}
Ok(copied)
}
}

struct DeltaEncodingCheckResult<N: NInt> {
base_value: N,
min: N,
Expand Down
25 changes: 16 additions & 9 deletions src/encoding/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,25 @@ use std::marker::PhantomData;
use arrow::datatypes::{ArrowTimestampType, TimeUnit};
use snafu::ensure;

use crate::error::{DecodeTimestampSnafu, Result};
use crate::{
encoding::PrimitiveValueDecoder,
error::{DecodeTimestampSnafu, Result},
};

const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000;

pub struct TimestampIterator<T: ArrowTimestampType> {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
_marker: PhantomData<T>,
}

impl<T: ArrowTimestampType> TimestampIterator<T> {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
) -> Self {
Self {
base_from_epoch,
Expand All @@ -61,20 +64,22 @@ impl<T: ArrowTimestampType> Iterator for TimestampIterator<T> {
}
}

impl<T: ArrowTimestampType> PrimitiveValueDecoder<T::Native> for TimestampIterator<T> {}

/// Arrow TimestampNanosecond type cannot represent the full datetime range of
/// the ORC Timestamp type, so this iterator provides the ability to decode the
/// raw nanoseconds without restricting it to the Arrow TimestampNanosecond range.
pub struct TimestampNanosecondAsDecimalIterator {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
}

impl TimestampNanosecondAsDecimalIterator {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
) -> Self {
Self {
base_from_epoch,
Expand All @@ -99,6 +104,8 @@ impl Iterator for TimestampNanosecondAsDecimalIterator {
}
}

impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalIterator {}

fn decode(
base: i64,
seconds_since_orc_base: Result<i64>,
Expand Down

0 comments on commit ed4cf60

Please sign in to comment.