Skip to content

Commit

Permalink
Removed panics on read (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 26, 2022
1 parent 34aac65 commit 077b02c
Show file tree
Hide file tree
Showing 28 changed files with 327 additions and 174 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-async-temp = "0.3.0"
parquet-format-async-temp = "0.3.1"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
streaming-decompression = "0.1"

Expand All @@ -34,6 +34,7 @@ xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }
criterion = "0.3"
rand = "0.8"

[features]
default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"]
Expand Down
4 changes: 2 additions & 2 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use parquet2::encoding::Encoding;
use parquet2::page::{split_buffer, DataPage};
use parquet2::schema::types::PhysicalType;

fn deserialize(page: &DataPage) {
fn deserialize(page: &DataPage) -> Result<()> {
// split the data buffer in repetition levels, definition levels and values
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page);
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page)?;

// decode and deserialize.
match (
Expand Down
18 changes: 9 additions & 9 deletions src/deserialize/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ pub struct Dictionary<'a> {
}

impl<'a> Dictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let indexes = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;

Self { indexes, dict }
Ok(Self { indexes, dict })
}

#[inline]
Expand All @@ -41,26 +41,26 @@ impl<'a> BinaryPageState<'a> {
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(Self::RequiredDictionary(Dictionary::new(page, dict)))
Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::new(page),
Dictionary::new(page, dict),
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;

let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = BinaryIter::new(values, None);

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = BinaryIter::new(values, Some(page.num_values()));

Ok(Self::Required(values))
Expand Down
6 changes: 3 additions & 3 deletions src/deserialize/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ impl<'a> BooleanPageState<'a> {

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, _, true) => {
let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;

let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
Ok(Self::Required(values, page.num_values()))
}
_ => Err(Error::General(format!(
Expand Down
18 changes: 9 additions & 9 deletions src/deserialize/fixed_len.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ pub struct Dictionary<'a> {
}

impl<'a> Dictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self {
let indexes = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;

Self { indexes, dict }
Ok(Self { indexes, dict })
}

#[inline]
Expand Down Expand Up @@ -79,26 +79,26 @@ impl<'a> FixedLenBinaryPageState<'a> {
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(Self::RequiredDictionary(Dictionary::new(page, dict)))
Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::new(page),
Dictionary::new(page, dict),
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;

let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = FixexBinaryIter::new(values, size);

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = FixexBinaryIter::new(values, size);

Ok(Self::Required(values))
Expand Down
21 changes: 9 additions & 12 deletions src/deserialize/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub type Casted<'a, T> = std::iter::Map<std::slice::ChunksExact<'a, u8>, fn(&'a

/// Views the values of the data page as [`Casted`] to [`NativeType`].
pub fn native_cast<T: NativeType>(page: &DataPage) -> Result<Casted<T>, Error> {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
if values.len() % std::mem::size_of::<T>() != 0 {
return Err(Error::OutOfSpec(
"A primitive page data's len must be a multiple of the type".to_string(),
Expand All @@ -31,20 +31,17 @@ where
T: NativeType,
{
pub indexes: hybrid_rle::HybridRleDecoder<'a>,
pub values: &'a [T],
pub dict: &'a PrimitivePageDict<T>,
}

impl<'a, T> Dictionary<'a, T>
where
T: NativeType,
{
pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict<T>) -> Self {
let indexes = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a PrimitivePageDict<T>) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;

Self {
values: dict.values(),
indexes,
}
Ok(Self { dict, indexes })
}

pub fn len(&self) -> usize {
Expand Down Expand Up @@ -84,18 +81,18 @@ impl<'a, T: NativeType> NativePageState<'a, T> {
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(Self::RequiredDictionary(Dictionary::new(page, dict)))
Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::new(page),
Dictionary::new(page, dict),
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true) => {
let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = native_cast(page)?;

Ok(Self::Optional(validity, values))
Expand Down
24 changes: 17 additions & 7 deletions src/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@ use std::collections::VecDeque;

use crate::{
encoding::hybrid_rle::{self, HybridRleDecoder},
error::{Error, Result},
indexes::Interval,
page::{split_buffer, DataPage},
read::levels::get_bit_width,
};

use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter};

pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDecoder {
let (_, _, indices_buffer) = split_buffer(page);
pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::HybridRleDecoder> {
let (_, _, indices_buffer) = split_buffer(page)?;

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
if bit_width > 32 {
return Err(Error::OutOfSpec(
"Bit width of dictionary pages cannot be larger than 32".to_string(),
));
}
let indices_buffer = &indices_buffer[1..];

hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, page.num_values())
Ok(hybrid_rle::HybridRleDecoder::new(
indices_buffer,
bit_width as u32,
page.num_values(),
))
}

/// Decoder of definition levels.
Expand All @@ -32,19 +42,19 @@ pub enum DefLevelsDecoder<'a> {
}

impl<'a> DefLevelsDecoder<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, def_levels, _) = split_buffer(page);
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, def_levels, _) = split_buffer(page)?;

let max_def_level = page.descriptor.max_def_level;
if max_def_level == 1 {
Ok(if max_def_level == 1 {
let iter = hybrid_rle::Decoder::new(def_levels, 1);
let iter = HybridRleIter::new(iter, page.num_values());
Self::Bitmap(iter)
} else {
let iter =
HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values());
Self::Levels(iter, max_def_level as u32)
}
})
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/encoding/bitpacking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ fn decode_pack(compressed: &[u8], num_bits: u8, pack: &mut [u32; BitPacker1x::BL
}

impl<'a> Decoder<'a> {
pub fn new(compressed: &'a [u8], num_bits: u8, length: usize) -> Self {
pub fn new(compressed: &'a [u8], num_bits: u8, mut length: usize) -> Self {
let compressed_block_size = BitPacker1x::BLOCK_LEN * num_bits as usize / 8;

let mut compressed_chunks = compressed.chunks(compressed_block_size);
let mut current_pack = [0; BitPacker1x::BLOCK_LEN];
decode_pack(
compressed_chunks.next().unwrap(),
num_bits,
&mut current_pack,
);
if let Some(chunk) = compressed_chunks.next() {
decode_pack(chunk, num_bits, &mut current_pack);
} else {
length = 0
};

Self {
remaining: length,
Expand Down
13 changes: 9 additions & 4 deletions src/encoding/hybrid_rle/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,25 @@ impl<'a> Iterator for Decoder<'a> {
}
let (indicator, consumed) = uleb128::decode(self.values);
self.values = &self.values[consumed..];
if self.values.is_empty() {
return None;
};
if indicator & 1 == 1 {
// is bitpacking
let bytes = (indicator as usize >> 1) * self.num_bits as usize;
let bytes = std::cmp::min(bytes, self.values.len());
let result = Some(HybridEncoded::Bitpacked(&self.values[..bytes]));
self.values = &self.values[bytes..];
let (result, remaining) = self.values.split_at(bytes);
let result = Some(HybridEncoded::Bitpacked(result));
self.values = remaining;
result
} else {
// is rle
let run_length = indicator as usize >> 1;
// repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)
let rle_bytes = ceil8(self.num_bits as usize);
let result = Some(HybridEncoded::Rle(&self.values[..rle_bytes], run_length));
self.values = &self.values[rle_bytes..];
let (result, remaining) = self.values.split_at(rle_bytes);
let result = Some(HybridEncoded::Rle(result, run_length));
self.values = remaining;
result
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ pub use crate::parquet_bridge::Encoding;
/// Returns the first 4 bytes of `values` interpreted as a little-endian `u32`
/// length, or `None` if `values` has fewer than 4 bytes.
///
/// This function does not panic: an undersized slice yields `None`.
#[inline]
pub fn get_length(values: &[u8]) -> Option<usize> {
    values
        .get(0..4)
        // `try_into` cannot fail: `get(0..4)` guarantees exactly 4 bytes.
        .map(|x| u32::from_le_bytes(x.try_into().unwrap()) as usize)
}

/// Returns the ceil of value/divisor
Expand Down
14 changes: 10 additions & 4 deletions src/encoding/plain_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/// prefixes, lengths and values
/// # Implementation
/// This struct does not allocate on the heap.
use crate::error::Error;

#[derive(Debug)]
pub struct BinaryIter<'a> {
Expand All @@ -16,7 +17,7 @@ impl<'a> BinaryIter<'a> {
}

impl<'a> Iterator for BinaryIter<'a> {
type Item = &'a [u8];
type Item = Result<&'a [u8], Error>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -28,9 +29,14 @@ impl<'a> Iterator for BinaryIter<'a> {
}
let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
self.values = &self.values[4..];
let result = &self.values[..length];
self.values = &self.values[length..];
Some(result)
if length > self.values.len() {
return Some(Err(Error::OutOfSpec(
"A string in plain encoding declares a length that is out of range".to_string(),
)));
}
let (result, remaining) = self.values.split_at(length);
self.values = remaining;
Some(Ok(result))
}

#[inline]
Expand Down
18 changes: 17 additions & 1 deletion src/metadata/column_chunk_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use parquet_format_async_temp::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::compression::Compression;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::schema::types::PhysicalType;
use crate::statistics::{deserialize_statistics, Statistics};

Expand Down Expand Up @@ -135,6 +135,22 @@ impl ColumnChunkMetaData {
column_descr: ColumnDescriptor,
column_chunk: ColumnChunk,
) -> Result<Self> {
// validate metadata
if let Some(meta) = &column_chunk.meta_data {
let _: usize = meta.total_compressed_size.try_into()?;

if let Some(offset) = meta.dictionary_page_offset {
let _: usize = offset.try_into()?;
}
let _: usize = meta.data_page_offset.try_into()?;

let _: Compression = meta.codec.try_into()?;
} else {
return Err(Error::OutOfSpec(
"Column chunk requires metadata".to_string(),
));
}

Ok(Self {
column_chunk,
column_descr,
Expand Down
Loading

0 comments on commit 077b02c

Please sign in to comment.