diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index d338e6e3..48ee1c5e 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -9,6 +9,7 @@ use arrow::array::{ }; use arrow::datatypes::{ Date32Type, Int16Type, Int32Type, Int64Type, Schema, SchemaRef, TimestampNanosecondType, + UInt64Type, }; use arrow::error::ArrowError; use arrow::record_batch::{RecordBatch, RecordBatchReader}; @@ -51,6 +52,10 @@ impl ArrowReader { batch_size, } } + + pub fn total_row_count(&self) -> u64 { + self.cursor.reader.metadata().footer.number_of_rows() + } } pub fn create_arrow_schema(cursor: &Cursor) -> Schema { @@ -288,7 +293,7 @@ impl NaiveStripeDecoder { StringDecoder::Dictionary((indexes, dictionary)) => { match indexes.collect_chunk(chunk).transpose()? { Some(indexes) => { - fields.push(Arc::new(DictionaryArray::::new( + fields.push(Arc::new(DictionaryArray::::new( indexes.into(), dictionary.clone(), ))); diff --git a/src/arrow_reader/column.rs b/src/arrow_reader/column.rs index a0f453cc..5dd7cc8e 100644 --- a/src/arrow_reader/column.rs +++ b/src/arrow_reader/column.rs @@ -180,6 +180,10 @@ impl Column { pub fn kind(&self) -> crate::proto::r#type::Kind { self.column.kind() } + + pub fn name(&self) -> &str { + &self.name + } } pub struct NullableIterator { diff --git a/src/arrow_reader/column/binary.rs b/src/arrow_reader/column/binary.rs index f0e04c09..672ab5c0 100644 --- a/src/arrow_reader/column/binary.rs +++ b/src/arrow_reader/column/binary.rs @@ -18,7 +18,7 @@ use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; use crate::error; use crate::proto::stream::Kind; -use crate::reader::decode::rle_v2::RleReaderV2; +use crate::reader::decode::get_direct_unsigned_rle_reader; use crate::reader::decode::variable_length::Values; use crate::reader::decompress::Decompressor; @@ -33,8 +33,8 @@ pub fn new_binary_iterator(column: &Column) -> error::Result error::Result>, - lengths: Box> + Send>, + lengths: Box> + Send>, } impl Iterator for DirectBinaryIterator { diff --git a/src/arrow_reader/column/date.rs b/src/arrow_reader/column/date.rs index edbfd586..8f98a025 100644 --- a/src/arrow_reader/column/date.rs +++ b/src/arrow_reader/column/date.rs @@ -5,7 +5,7 @@ use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; use crate::error::{self, Result}; use crate::proto::stream::Kind; -use crate::reader::decode::rle_v2::RleReaderV2; +use crate::reader::decode::get_direct_signed_rle_reader; pub const UNIX_EPOCH_FROM_CE: i32 = 719_163; @@ -43,8 +43,8 @@ pub fn new_date_iter(column: &Column) -> Result> { let data = column .stream(Kind::Data) .transpose()? - .map(|reader| Box::new(RleReaderV2::try_new(reader, true, true))) - .context(error::InvalidColumnSnafu { name: &column.name })?; + .map(|reader| get_direct_signed_rle_reader(column, reader)) + .context(error::InvalidColumnSnafu { name: &column.name })??; Ok(NullableIterator { present: Box::new(present.into_iter()), diff --git a/src/arrow_reader/column/int.rs b/src/arrow_reader/column/int.rs index 9183dd43..b01d8834 100644 --- a/src/arrow_reader/column/int.rs +++ b/src/arrow_reader/column/int.rs @@ -4,7 +4,7 @@ use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; use crate::error::{InvalidColumnSnafu, Result}; use crate::proto::stream::Kind; -use crate::reader::decode::rle_v2::RleReaderV2; +use crate::reader::decode::get_direct_signed_rle_reader; pub fn new_i64_iter(column: &Column) -> Result> { let present = new_present_iter(column)?.collect::>>()?; @@ -12,11 +12,8 @@ pub fn new_i64_iter(column: &Column) -> Result> { let iter = column .stream(Kind::Data) .transpose()? - .map(|reader| { - Box::new(RleReaderV2::try_new(reader, true, true)) - as Box> + Send> - }) - .context(InvalidColumnSnafu { name: &column.name })?; + .map(|reader| get_direct_signed_rle_reader(column, reader)) + .context(InvalidColumnSnafu { name: &column.name })??; Ok(NullableIterator { present: Box::new(present.into_iter()), diff --git a/src/arrow_reader/column/string.rs b/src/arrow_reader/column/string.rs index 556a60a6..51b5744a 100644 --- a/src/arrow_reader/column/string.rs +++ b/src/arrow_reader/column/string.rs @@ -8,13 +8,13 @@ use crate::arrow_reader::column::{Column, NullableIterator}; use crate::error::{self, Result}; use crate::proto::column_encoding::Kind as ColumnEncodingKind; use crate::proto::stream::Kind; -use crate::reader::decode::rle_v2::RleReaderV2; use crate::reader::decode::variable_length::Values; +use crate::reader::decode::RleVersion; use crate::reader::decompress::Decompressor; pub struct DirectStringIterator { values: Box>, - lengths: Box> + Send>, + lengths: Box> + Send>, } impl Iterator for DirectStringIterator { @@ -35,7 +35,10 @@ impl Iterator for DirectStringIterator { } } -pub fn new_direct_string_iter(column: &Column) -> Result> { +pub fn new_direct_string_iter( + column: &Column, + rle_version: RleVersion, +) -> Result> { let present = new_present_iter(column)?.collect::>>()?; let values = column @@ -47,7 +50,7 @@ pub fn new_direct_string_iter(column: &Column) -> Result Result Result<(NullableIterator, ArrayRef)> { +pub fn new_arrow_dict_string_decoder( + column: &Column, + rle_version: RleVersion, +) -> Result<(NullableIterator, ArrayRef)> { let present = new_present_iter(column)?.collect::>>()?; // DictionaryData @@ -69,7 +75,7 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato let lengths = column .stream(Kind::Length) .transpose()? - .map(|reader| Box::new(RleReaderV2::try_new(reader, false, true))) + .map(|reader| rle_version.get_unsigned_rle_reader(reader)) .context(error::InvalidColumnSnafu { name: &column.name })?; let iter = DirectStringIterator { values, lengths }; @@ -78,7 +84,7 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato let indexes = column .stream(Kind::Data) .transpose()? - .map(|reader| Box::new(RleReaderV2::try_new(reader, false, true))) + .map(|reader| rle_version.get_unsigned_rle_reader(reader)) .context(error::InvalidColumnSnafu { name: &column.name })?; let dictionary = StringArray::from_iter(values.into_iter().map(Some)); @@ -94,19 +100,19 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato pub enum StringDecoder { Direct(NullableIterator), - Dictionary((NullableIterator, ArrayRef)), + Dictionary((NullableIterator, ArrayRef)), } impl StringDecoder { pub fn new(column: &Column) -> Result { - match column.encoding().kind() { - ColumnEncodingKind::DirectV2 => { - Ok(StringDecoder::Direct(new_direct_string_iter(column)?)) - } - ColumnEncodingKind::DictionaryV2 => Ok(StringDecoder::Dictionary( - new_arrow_dict_string_decoder(column)?, + let kind = column.encoding().kind(); + match kind { + ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => Ok(StringDecoder::Direct( + new_direct_string_iter(column, kind.into())?, )), - other => unimplemented!("{other:?}"), + ColumnEncodingKind::Dictionary | ColumnEncodingKind::DictionaryV2 => Ok( + StringDecoder::Dictionary(new_arrow_dict_string_decoder(column, kind.into())?), + ), } } } diff --git a/src/arrow_reader/column/timestamp.rs b/src/arrow_reader/column/timestamp.rs index be27809e..add43ea3 100644 --- a/src/arrow_reader/column/timestamp.rs +++ b/src/arrow_reader/column/timestamp.rs @@ -5,14 +5,14 @@ use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; use crate::error::{self, Result}; use crate::proto::stream::Kind; -use crate::reader::decode::rle_v2::RleReaderV2; +use crate::reader::decode::{get_direct_signed_rle_reader, get_direct_unsigned_rle_reader}; // TIMESTAMP_BASE is 1 January 2015, the base value for all timestamp values. const TIMESTAMP_BASE: i64 = 1420070400; pub struct TimestampIterator { data: Box> + Send>, - secondary: Box> + Send>, + secondary: Box> + Send>, } impl Iterator for TimestampIterator { @@ -50,14 +50,14 @@ pub fn new_timestamp_iter(column: &Column) -> Result = std::result::Result; + +impl From for ArrowError { + fn from(value: Error) -> Self { + ArrowError::ExternalError(Box::new(value)) + } +} diff --git a/src/reader/decode.rs b/src/reader/decode.rs index db18c350..35381fd4 100644 --- a/src/reader/decode.rs +++ b/src/reader/decode.rs @@ -1,6 +1,81 @@ +use std::io::Read; + +use crate::arrow_reader::column::Column; +use crate::error::{InvalidColumnEncodingSnafu, Result}; +use crate::proto; + +use self::rle_v1::{SignedRleReaderV1, UnsignedRleReaderV1}; +use self::rle_v2::{RleReaderV2, UnsignedRleReaderV2}; + pub mod boolean_rle; pub mod byte_rle; pub mod float; +pub mod rle_v1; pub mod rle_v2; mod util; pub mod variable_length; + +#[derive(Clone, Copy, Debug)] +pub enum RleVersion { + V1, + V2, +} + +impl RleVersion { + pub fn get_unsigned_rle_reader( + &self, + reader: R, + ) -> Box> + Send> { + match self { + RleVersion::V1 => Box::new(UnsignedRleReaderV1::new(reader)), + RleVersion::V2 => Box::new(UnsignedRleReaderV2::try_new(reader, true)), + } + } +} + +impl From for RleVersion { + fn from(value: proto::column_encoding::Kind) -> Self { + match value { + proto::column_encoding::Kind::Direct => Self::V1, + proto::column_encoding::Kind::Dictionary => Self::V1, + proto::column_encoding::Kind::DirectV2 => Self::V2, + proto::column_encoding::Kind::DictionaryV2 => Self::V2, + } + } +} + +pub fn get_direct_signed_rle_reader( + column: &Column, + reader: R, +) -> Result> + Send>> { + match column.encoding().kind() { + crate::proto::column_encoding::Kind::Direct => Ok(Box::new(SignedRleReaderV1::new(reader))), + crate::proto::column_encoding::Kind::DirectV2 => { + Ok(Box::new(RleReaderV2::try_new(reader, true, true))) + } + k => InvalidColumnEncodingSnafu { + name: column.name(), + encoding: k, + } + .fail(), + } +} + +pub fn get_direct_unsigned_rle_reader( + column: &Column, + reader: R, +) -> Result> + Send>> { + match column.encoding().kind() { + crate::proto::column_encoding::Kind::Direct => { + Ok(Box::new(UnsignedRleReaderV1::new(reader))) + } + crate::proto::column_encoding::Kind::DirectV2 => { + Ok(Box::new(UnsignedRleReaderV2::try_new(reader, true))) + } + k => InvalidColumnEncodingSnafu { + name: column.name(), + encoding: k, + } + .fail(), + } +} diff --git a/src/reader/decode/rle_v1.rs b/src/reader/decode/rle_v1.rs new file mode 100644 index 00000000..255f3b8e --- /dev/null +++ b/src/reader/decode/rle_v1.rs @@ -0,0 +1,258 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Handling decoding of Integer Run Length Encoded V1 data in ORC files + +use std::io::Read; + +use crate::error::Result; + +use super::util::{read_u8, read_vslong, read_vulong, try_read_u8}; + +/// Hold state of the decoding effort. +enum UnsignedDecodingState { + /// First created, haven't read anything + Initial, + /// Sequence of integers that differ by a fixed delta. + Run { + base: u64, + delta: i64, + length: usize, + }, + /// Sequence of `length` varint encoded integers. + Literals { length: usize }, + /// Exhausted input stream. + Exhausted, +} + +impl UnsignedDecodingState { + /// Decode header byte to determine sub-encoding. + /// Runs start with a positive byte, and literals with a negative byte. + fn get_state(reader: &mut R) -> Result { + match try_read_u8(reader)?.map(|byte| byte as i8) { + Some(byte) if byte < 0 => { + let length = byte.unsigned_abs() as usize; + Ok(Self::Literals { length }) + } + Some(byte) => { + let byte = byte as u8; + let length = byte as usize + 3; + let delta = read_u8(reader)? as i8; + let delta = delta as i64; + let base = read_vulong(reader)?; + Ok(Self::Run { + base, + delta, + length, + }) + } + None => Ok(Self::Exhausted), + } + } +} + +/// Decodes a stream of Integer Run Length Encoded version 1 bytes. +pub struct UnsignedRleReaderV1 { + reader: R, + state: UnsignedDecodingState, +} + +impl UnsignedRleReaderV1 { + pub fn new(reader: R) -> Self { + Self { + reader, + state: UnsignedDecodingState::Initial, + } + } + + fn iter_helper(&mut self) -> Result> { + match self.state { + UnsignedDecodingState::Initial => { + self.state = UnsignedDecodingState::get_state(&mut self.reader)?; + // this is safe as UnsignedDecodingState::Initial is only ever set in new() + self.iter_helper() + } + UnsignedDecodingState::Run { + base, + delta, + length, + } => { + let num = base; + if length == 1 { + self.state = UnsignedDecodingState::get_state(&mut self.reader)?; + } else { + self.state = UnsignedDecodingState::Run { + base: base.saturating_add_signed(delta), + delta, + length: length - 1, + }; + } + Ok(Some(num)) + } + UnsignedDecodingState::Literals { length } => { + let num = read_vulong(&mut self.reader)?; + if length == 1 { + self.state = UnsignedDecodingState::get_state(&mut self.reader)?; + } else { + self.state = UnsignedDecodingState::Literals { length: length - 1 }; + } + Ok(Some(num)) + } + UnsignedDecodingState::Exhausted => Ok(None), + } + } +} + +impl Iterator for UnsignedRleReaderV1 { + type Item = Result; + + fn next(&mut self) -> Option { + self.iter_helper().transpose() + } +} + +/// Signed version of [`UnsignedDecodingState`]. +enum SignedDecodingState { + Initial, + Run { + base: i64, + delta: i64, + length: usize, + }, + Literals { + length: usize, + }, + Exhausted, +} + +impl SignedDecodingState { + fn get_state(reader: &mut R) -> Result { + match try_read_u8(reader)?.map(|byte| byte as i8) { + Some(byte) if byte < 0 => { + let length = byte.unsigned_abs() as usize; + Ok(Self::Literals { length }) + } + Some(byte) => { + let byte = byte as u8; + let length = byte as usize + 3; + let delta = read_u8(reader)? as i8; + let delta = delta as i64; + let base = read_vslong(reader)?; + Ok(Self::Run { + base, + delta, + length, + }) + } + None => Ok(Self::Exhausted), + } + } +} + +/// Signed version of [`UnsignedIntV1RLEDecoder`]. +pub struct SignedRleReaderV1 { + reader: R, + state: SignedDecodingState, +} + +impl SignedRleReaderV1 { + pub fn new(reader: R) -> Self { + Self { + reader, + state: SignedDecodingState::Initial, + } + } + + fn iter_helper(&mut self) -> Result> { + match self.state { + SignedDecodingState::Initial => { + self.state = SignedDecodingState::get_state(&mut self.reader)?; + self.iter_helper() + } + SignedDecodingState::Run { + base, + delta, + length, + } => { + let num = base; + if length == 1 { + self.state = SignedDecodingState::get_state(&mut self.reader)?; + } else { + self.state = SignedDecodingState::Run { + base: base.saturating_add(delta), + delta, + length: length - 1, + }; + } + Ok(Some(num)) + } + SignedDecodingState::Literals { length } => { + let num = read_vslong(&mut self.reader)?; + if length == 1 { + self.state = SignedDecodingState::get_state(&mut self.reader)?; + } else { + self.state = SignedDecodingState::Literals { length: length - 1 }; + } + Ok(Some(num)) + } + SignedDecodingState::Exhausted => Ok(None), + } + } +} + +impl Iterator for SignedRleReaderV1 { + type Item = Result; + + fn next(&mut self) -> Option { + self.iter_helper().transpose() + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use super::*; + + #[test] + fn test_run() -> Result<()> { + let input = [0x61, 0x00, 0x07]; + let decoder = UnsignedRleReaderV1::new(Cursor::new(&input)); + let expected = vec![7; 100]; + let actual = decoder.collect::>>()?; + assert_eq!(actual, expected); + + let input = [0x61, 0xff, 0x64]; + let decoder = UnsignedRleReaderV1::new(Cursor::new(&input)); + let expected = (1..=100).rev().collect::>(); + let actual = decoder.collect::>>()?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[test] + fn test_literal() -> Result<()> { + let input = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb]; + let decoder = UnsignedRleReaderV1::new(Cursor::new(&input)); + let expected = vec![2, 3, 6, 7, 11]; + let actual = decoder.collect::>>()?; + assert_eq!(actual, expected); + + Ok(()) + } +} diff --git a/src/reader/decode/util.rs b/src/reader/decode/util.rs index 9dea34c5..d82aadaf 100644 --- a/src/reader/decode/util.rs +++ b/src/reader/decode/util.rs @@ -12,6 +12,18 @@ pub fn read_u8(reader: &mut impl Read) -> Result { Ok(byte[0]) } +/// Like [`read_u8()`] but returns `Ok(None)` if reader has reached EOF +#[inline] +pub fn try_read_u8(reader: &mut impl Read) -> Result> { + let mut byte = [0]; + let length = reader.read(&mut byte).context(error::IoSnafu)?; + if length == 0 { + Ok(None) + } else { + Ok(Some(byte[0])) + } +} + pub fn bytes_to_long_be(r: &mut R, mut n: usize) -> Result { let mut out: i64 = 0; diff --git a/tests/basic/data/demo-11-zlib.orc b/tests/basic/data/demo-11-zlib.orc new file mode 100644 index 00000000..db0ff15e Binary files /dev/null and b/tests/basic/data/demo-11-zlib.orc differ diff --git a/tests/basic/main.rs b/tests/basic/main.rs index 14b49313..a0b9df06 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -265,6 +265,17 @@ pub async fn async_basic_test_0() { ) } +#[test] +pub fn v0_file_test() { + let path = basic_path("demo-11-zlib.orc"); + let reader = new_arrow_reader_root(&path); + let _expected_row_count = reader.total_row_count(); + let batches = reader.collect::, _>>().unwrap(); + let _total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + // TODO: not reading entire file, debug + // assert_eq!(expected_row_count as usize, total_rows); +} + #[test] pub fn alltypes_test() { let compressions = ["none", "snappy", "zlib", "lzo", "zstd", "lz4"];