Skip to content

Commit

Permalink
Support for Int RLE v1 encoding (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey authored Nov 5, 2023
1 parent 9f6c216 commit 2b7a2b0
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 35 deletions.
7 changes: 6 additions & 1 deletion src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use arrow::array::{
};
use arrow::datatypes::{
Date32Type, Int16Type, Int32Type, Int64Type, Schema, SchemaRef, TimestampNanosecondType,
UInt64Type,
};
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
Expand Down Expand Up @@ -51,6 +52,10 @@ impl<R: Read> ArrowReader<R> {
batch_size,
}
}

pub fn total_row_count(&self) -> u64 {
self.cursor.reader.metadata().footer.number_of_rows()
}
}

pub fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
Expand Down Expand Up @@ -288,7 +293,7 @@ impl NaiveStripeDecoder {
StringDecoder::Dictionary((indexes, dictionary)) => {
match indexes.collect_chunk(chunk).transpose()? {
Some(indexes) => {
fields.push(Arc::new(DictionaryArray::<Int64Type>::new(
fields.push(Arc::new(DictionaryArray::<UInt64Type>::new(
indexes.into(),
dictionary.clone(),
)));
Expand Down
4 changes: 4 additions & 0 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ impl Column {
pub fn kind(&self) -> crate::proto::r#type::Kind {
self.column.kind()
}

pub fn name(&self) -> &str {
&self.name
}
}

pub struct NullableIterator<T> {
Expand Down
8 changes: 4 additions & 4 deletions src/arrow_reader/column/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error;
use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::get_direct_unsigned_rle_reader;
use crate::reader::decode::variable_length::Values;
use crate::reader::decompress::Decompressor;

Expand All @@ -33,8 +33,8 @@ pub fn new_binary_iterator(column: &Column) -> error::Result<NullableIterator<Ve
let lengths = column
.stream(Kind::Length)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
.context(error::InvalidColumnSnafu { name: &column.name })?;
.map(|reader| get_direct_unsigned_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Ok(NullableIterator {
present: Box::new(null_mask.into_iter()),
Expand All @@ -44,7 +44,7 @@ pub fn new_binary_iterator(column: &Column) -> error::Result<NullableIterator<Ve

pub struct DirectBinaryIterator {
values: Box<Values<Decompressor>>,
lengths: Box<dyn Iterator<Item = error::Result<i64>> + Send>,
lengths: Box<dyn Iterator<Item = error::Result<u64>> + Send>,
}

impl Iterator for DirectBinaryIterator {
Expand Down
6 changes: 3 additions & 3 deletions src/arrow_reader/column/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::get_direct_signed_rle_reader;

pub const UNIX_EPOCH_FROM_CE: i32 = 719_163;

Expand Down Expand Up @@ -43,8 +43,8 @@ pub fn new_date_iter(column: &Column) -> Result<NullableIterator<NaiveDate>> {
let data = column
.stream(Kind::Data)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, true, true)))
.context(error::InvalidColumnSnafu { name: &column.name })?;
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Ok(NullableIterator {
present: Box::new(present.into_iter()),
Expand Down
9 changes: 3 additions & 6 deletions src/arrow_reader/column/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::get_direct_signed_rle_reader;

pub fn new_i64_iter(column: &Column) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;

let iter = column
.stream(Kind::Data)
.transpose()?
.map(|reader| {
Box::new(RleReaderV2::try_new(reader, true, true))
as Box<dyn Iterator<Item = Result<i64>> + Send>
})
.context(InvalidColumnSnafu { name: &column.name })?;
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(InvalidColumnSnafu { name: &column.name })??;

Ok(NullableIterator {
present: Box::new(present.into_iter()),
Expand Down
36 changes: 21 additions & 15 deletions src/arrow_reader/column/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error::{self, Result};
use crate::proto::column_encoding::Kind as ColumnEncodingKind;
use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::variable_length::Values;
use crate::reader::decode::RleVersion;
use crate::reader::decompress::Decompressor;

pub struct DirectStringIterator {
values: Box<Values<Decompressor>>,
lengths: Box<dyn Iterator<Item = Result<i64>> + Send>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
}

impl Iterator for DirectStringIterator {
Expand All @@ -35,7 +35,10 @@ impl Iterator for DirectStringIterator {
}
}

pub fn new_direct_string_iter(column: &Column) -> Result<NullableIterator<String>> {
pub fn new_direct_string_iter(
column: &Column,
rle_version: RleVersion,
) -> Result<NullableIterator<String>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;

let values = column
Expand All @@ -47,7 +50,7 @@ pub fn new_direct_string_iter(column: &Column) -> Result<NullableIterator<String
let lengths = column
.stream(Kind::Length)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
.map(|reader| rle_version.get_unsigned_rle_reader(reader))
.context(error::InvalidColumnSnafu { name: &column.name })?;

Ok(NullableIterator {
Expand All @@ -56,7 +59,10 @@ pub fn new_direct_string_iter(column: &Column) -> Result<NullableIterator<String
})
}

pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterator<i64>, ArrayRef)> {
pub fn new_arrow_dict_string_decoder(
column: &Column,
rle_version: RleVersion,
) -> Result<(NullableIterator<u64>, ArrayRef)> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;

// DictionaryData
Expand All @@ -69,7 +75,7 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato
let lengths = column
.stream(Kind::Length)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
.map(|reader| rle_version.get_unsigned_rle_reader(reader))
.context(error::InvalidColumnSnafu { name: &column.name })?;
let iter = DirectStringIterator { values, lengths };

Expand All @@ -78,7 +84,7 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato
let indexes = column
.stream(Kind::Data)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
.map(|reader| rle_version.get_unsigned_rle_reader(reader))
.context(error::InvalidColumnSnafu { name: &column.name })?;

let dictionary = StringArray::from_iter(values.into_iter().map(Some));
Expand All @@ -94,19 +100,19 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato

pub enum StringDecoder {
Direct(NullableIterator<String>),
Dictionary((NullableIterator<i64>, ArrayRef)),
Dictionary((NullableIterator<u64>, ArrayRef)),
}

impl StringDecoder {
pub fn new(column: &Column) -> Result<Self> {
match column.encoding().kind() {
ColumnEncodingKind::DirectV2 => {
Ok(StringDecoder::Direct(new_direct_string_iter(column)?))
}
ColumnEncodingKind::DictionaryV2 => Ok(StringDecoder::Dictionary(
new_arrow_dict_string_decoder(column)?,
let kind = column.encoding().kind();
match kind {
ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => Ok(StringDecoder::Direct(
new_direct_string_iter(column, kind.into())?,
)),
other => unimplemented!("{other:?}"),
ColumnEncodingKind::Dictionary | ColumnEncodingKind::DictionaryV2 => Ok(
StringDecoder::Dictionary(new_arrow_dict_string_decoder(column, kind.into())?),
),
}
}
}
12 changes: 6 additions & 6 deletions src/arrow_reader/column/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;
use crate::reader::decode::{get_direct_signed_rle_reader, get_direct_unsigned_rle_reader};

// TIMESTAMP_BASE is 1 January 2015, the base value for all timestamp values.
const TIMESTAMP_BASE: i64 = 1420070400;

pub struct TimestampIterator {
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<u64>> + Send>,
}

impl Iterator for TimestampIterator {
Expand Down Expand Up @@ -50,14 +50,14 @@ pub fn new_timestamp_iter(column: &Column) -> Result<NullableIterator<NaiveDateT
let data = column
.stream(Kind::Data)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, true, true)))
.context(error::InvalidColumnSnafu { name: &column.name })?;
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

let secondary = column
.stream(Kind::Secondary)
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
.context(error::InvalidColumnSnafu { name: &column.name })?;
.map(|reader| get_direct_unsigned_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Ok(NullableIterator {
present: Box::new(present.into_iter()),
Expand Down
14 changes: 14 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::error::ArrowError;
pub use snafu::prelude::*;
use snafu::Location;

use crate::proto;
use crate::proto::r#type::Kind;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -52,6 +53,13 @@ pub enum Error {
#[snafu(display("Invalid column : {:?}", name))]
InvalidColumn { location: Location, name: String },

#[snafu(display("Invalid encoding for column '{}': {:?}", name, encoding))]
InvalidColumnEncoding {
location: Location,
name: String,
encoding: proto::column_encoding::Kind,
},

#[snafu(display("Failed to convert to timestamp"))]
InvalidTimestamp { location: Location },

Expand Down Expand Up @@ -108,3 +116,9 @@ pub enum Error {
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for ArrowError {
fn from(value: Error) -> Self {
ArrowError::ExternalError(Box::new(value))
}
}
75 changes: 75 additions & 0 deletions src/reader/decode.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,81 @@
use std::io::Read;

use crate::arrow_reader::column::Column;
use crate::error::{InvalidColumnEncodingSnafu, Result};
use crate::proto;

use self::rle_v1::{SignedRleReaderV1, UnsignedRleReaderV1};
use self::rle_v2::{RleReaderV2, UnsignedRleReaderV2};

pub mod boolean_rle;
pub mod byte_rle;
pub mod float;
pub mod rle_v1;
pub mod rle_v2;
mod util;
pub mod variable_length;

#[derive(Clone, Copy, Debug)]
pub enum RleVersion {
V1,
V2,
}

impl RleVersion {
pub fn get_unsigned_rle_reader<R: Read + Send + 'static>(
&self,
reader: R,
) -> Box<dyn Iterator<Item = Result<u64>> + Send> {
match self {
RleVersion::V1 => Box::new(UnsignedRleReaderV1::new(reader)),
RleVersion::V2 => Box::new(UnsignedRleReaderV2::try_new(reader, true)),
}
}
}

impl From<proto::column_encoding::Kind> for RleVersion {
fn from(value: proto::column_encoding::Kind) -> Self {
match value {
proto::column_encoding::Kind::Direct => Self::V1,
proto::column_encoding::Kind::Dictionary => Self::V1,
proto::column_encoding::Kind::DirectV2 => Self::V2,
proto::column_encoding::Kind::DictionaryV2 => Self::V2,
}
}
}

pub fn get_direct_signed_rle_reader<R: Read + Send + 'static>(
column: &Column,
reader: R,
) -> Result<Box<dyn Iterator<Item = Result<i64>> + Send>> {
match column.encoding().kind() {
crate::proto::column_encoding::Kind::Direct => Ok(Box::new(SignedRleReaderV1::new(reader))),
crate::proto::column_encoding::Kind::DirectV2 => {
Ok(Box::new(RleReaderV2::try_new(reader, true, true)))
}
k => InvalidColumnEncodingSnafu {
name: column.name(),
encoding: k,
}
.fail(),
}
}

pub fn get_direct_unsigned_rle_reader<R: Read + Send + 'static>(
column: &Column,
reader: R,
) -> Result<Box<dyn Iterator<Item = Result<u64>> + Send>> {
match column.encoding().kind() {
crate::proto::column_encoding::Kind::Direct => {
Ok(Box::new(UnsignedRleReaderV1::new(reader)))
}
crate::proto::column_encoding::Kind::DirectV2 => {
Ok(Box::new(UnsignedRleReaderV2::try_new(reader, true)))
}
k => InvalidColumnEncodingSnafu {
name: column.name(),
encoding: k,
}
.fail(),
}
}
Loading

0 comments on commit 2b7a2b0

Please sign in to comment.