From 077b02c3c0d469f6c96dab537c555ae348e11993 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 26 Jun 2022 00:38:08 -0700 Subject: [PATCH] Removed panics on read (#150) --- Cargo.toml | 3 +- examples/read_metadata.rs | 4 +- src/deserialize/binary.rs | 18 ++++---- src/deserialize/boolean.rs | 6 +-- src/deserialize/fixed_len.rs | 18 ++++---- src/deserialize/native.rs | 21 ++++----- src/deserialize/utils.rs | 24 +++++++--- src/encoding/bitpacking.rs | 12 ++--- src/encoding/hybrid_rle/decoder.rs | 13 ++++-- src/encoding/mod.rs | 6 ++- src/encoding/plain_byte_array.rs | 14 ++++-- src/metadata/column_chunk_metadata.rs | 18 +++++++- src/metadata/file_metadata.rs | 4 +- src/page/mod.rs | 64 ++++++++++++++++++++------ src/page/page_dict/binary.rs | 37 ++++++++++----- src/page/page_dict/fixed_len_binary.rs | 20 ++++---- src/page/page_dict/primitive.rs | 38 ++++++++------- src/read/page/reader.rs | 58 +++++++++++++++++------ src/read/page/stream.rs | 2 +- src/schema/io_thrift/from_thrift.rs | 7 +-- tests/it/read/binary.rs | 10 ++-- tests/it/read/boolean.rs | 4 +- tests/it/read/fixed_binary.rs | 11 +++-- tests/it/read/mod.rs | 8 ++-- tests/it/read/primitive.rs | 18 +++++--- tests/it/read/primitive_nested.rs | 4 +- tests/it/read/struct_.rs | 8 ++-- tests/it/read/utils.rs | 51 +++++++++++--------- 28 files changed, 327 insertions(+), 174 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3745e0cdb..942119009 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ name = "parquet2" bench = false [dependencies] -parquet-format-async-temp = "0.3.0" +parquet-format-async-temp = "0.3.1" bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] } streaming-decompression = "0.1" @@ -34,6 +34,7 @@ xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt"] } criterion = "0.3" +rand = "0.8" [features] default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"] diff --git a/examples/read_metadata.rs b/examples/read_metadata.rs index e20e45eca..2188915b9 100644 --- a/examples/read_metadata.rs +++ b/examples/read_metadata.rs @@ -6,9 +6,9 @@ use parquet2::encoding::Encoding; use parquet2::page::{split_buffer, DataPage}; use parquet2::schema::types::PhysicalType; -fn deserialize(page: &DataPage) { +fn deserialize(page: &DataPage) -> Result<()> { // split the data buffer in repetition levels, definition levels and values - let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page); + let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page)?; // decode and deserialize. match ( diff --git a/src/deserialize/binary.rs b/src/deserialize/binary.rs index 32c8bb59f..3e5bc3760 100644 --- a/src/deserialize/binary.rs +++ b/src/deserialize/binary.rs @@ -14,10 +14,10 @@ pub struct Dictionary<'a> { } impl<'a> Dictionary<'a> { - pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let indexes = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result { + let indexes = utils::dict_indices_decoder(page)?; - Self { indexes, dict } + Ok(Self { indexes, dict }) } #[inline] @@ -41,26 +41,26 @@ impl<'a> BinaryPageState<'a> { match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(Self::RequiredDictionary(Dictionary::new(page, dict))) + Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?)) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(Self::OptionalDictionary( - utils::DefLevelsDecoder::new(page), - Dictionary::new(page, dict), + utils::DefLevelsDecoder::try_new(page)?, + Dictionary::try_new(page, dict)?, )) } (Encoding::Plain, _, true) => { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; - let validity = utils::DefLevelsDecoder::new(page); + let validity = utils::DefLevelsDecoder::try_new(page)?; let values = BinaryIter::new(values, None); Ok(Self::Optional(validity, values)) } (Encoding::Plain, _, false) => { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; let values = BinaryIter::new(values, Some(page.num_values())); Ok(Self::Required(values)) diff --git a/src/deserialize/boolean.rs b/src/deserialize/boolean.rs index f6a40fed6..4fe53b579 100644 --- a/src/deserialize/boolean.rs +++ b/src/deserialize/boolean.rs @@ -22,15 +22,15 @@ impl<'a> BooleanPageState<'a> { match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, _, true) => { - let validity = utils::DefLevelsDecoder::new(page); + let validity = utils::DefLevelsDecoder::try_new(page)?; - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; let values = BitmapIter::new(values, 0, values.len() * 8); Ok(Self::Optional(validity, values)) } (Encoding::Plain, _, false) => { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; Ok(Self::Required(values, page.num_values())) } _ => Err(Error::General(format!( diff --git a/src/deserialize/fixed_len.rs b/src/deserialize/fixed_len.rs index 1686160f4..698d2bdd7 100644 --- a/src/deserialize/fixed_len.rs +++ b/src/deserialize/fixed_len.rs @@ -41,10 +41,10 @@ pub struct Dictionary<'a> { } impl<'a> Dictionary<'a> { - pub fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self { - let indexes = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result { + let indexes = utils::dict_indices_decoder(page)?; - Self { indexes, dict } + Ok(Self { indexes, dict }) } #[inline] @@ -79,26 +79,26 @@ impl<'a> FixedLenBinaryPageState<'a> { match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(Self::RequiredDictionary(Dictionary::new(page, dict))) + Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?)) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(Self::OptionalDictionary( - utils::DefLevelsDecoder::new(page), - Dictionary::new(page, dict), + utils::DefLevelsDecoder::try_new(page)?, + Dictionary::try_new(page, dict)?, )) } (Encoding::Plain, _, true) => { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; - let validity = utils::DefLevelsDecoder::new(page); + let validity = utils::DefLevelsDecoder::try_new(page)?; let values = FixexBinaryIter::new(values, size); Ok(Self::Optional(validity, values)) } (Encoding::Plain, _, false) => { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; let values = FixexBinaryIter::new(values, size); Ok(Self::Required(values)) diff --git a/src/deserialize/native.rs b/src/deserialize/native.rs index 8a4d11800..c055bcbf9 100644 --- a/src/deserialize/native.rs +++ b/src/deserialize/native.rs @@ -13,7 +13,7 @@ pub type Casted<'a, T> = std::iter::Map, fn(&'a /// Views the values of the data page as [`Casted`] to [`NativeType`]. pub fn native_cast(page: &DataPage) -> Result, Error> { - let (_, _, values) = split_buffer(page); + let (_, _, values) = split_buffer(page)?; if values.len() % std::mem::size_of::() != 0 { return Err(Error::OutOfSpec( "A primitive page data's len must be a multiple of the type".to_string(), @@ -31,20 +31,17 @@ where T: NativeType, { pub indexes: hybrid_rle::HybridRleDecoder<'a>, - pub values: &'a [T], + pub dict: &'a PrimitivePageDict, } impl<'a, T> Dictionary<'a, T> where T: NativeType, { - pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict) -> Self { - let indexes = utils::dict_indices_decoder(page); + pub fn try_new(page: &'a DataPage, dict: &'a PrimitivePageDict) -> Result { + let indexes = utils::dict_indices_decoder(page)?; - Self { - values: dict.values(), - indexes, - } + Ok(Self { dict, indexes }) } pub fn len(&self) -> usize { @@ -84,18 +81,18 @@ impl<'a, T: NativeType> NativePageState<'a, T> { match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(Self::RequiredDictionary(Dictionary::new(page, dict))) + Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?)) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(Self::OptionalDictionary( - utils::DefLevelsDecoder::new(page), - Dictionary::new(page, dict), + utils::DefLevelsDecoder::try_new(page)?, + Dictionary::try_new(page, dict)?, )) } (Encoding::Plain, _, true) => { - let validity = utils::DefLevelsDecoder::new(page); + let validity = utils::DefLevelsDecoder::try_new(page)?; let values = native_cast(page)?; Ok(Self::Optional(validity, values)) diff --git a/src/deserialize/utils.rs b/src/deserialize/utils.rs index b46fe1e4b..9feb53f6d 100644 --- a/src/deserialize/utils.rs +++ b/src/deserialize/utils.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use crate::{ encoding::hybrid_rle::{self, HybridRleDecoder}, + error::{Error, Result}, indexes::Interval, page::{split_buffer, DataPage}, read::levels::get_bit_width, @@ -9,15 +10,24 @@ use crate::{ use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter}; -pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDecoder { - let (_, _, indices_buffer) = split_buffer(page); +pub(super) fn dict_indices_decoder(page: &DataPage) -> Result { + let (_, _, indices_buffer) = split_buffer(page)?; // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). let bit_width = indices_buffer[0]; + if bit_width > 32 { + return Err(Error::OutOfSpec( + "Bit width of dictionary pages cannot be larger than 32".to_string(), + )); + } let indices_buffer = &indices_buffer[1..]; - hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, page.num_values()) + Ok(hybrid_rle::HybridRleDecoder::new( + indices_buffer, + bit_width as u32, + page.num_values(), + )) } /// Decoder of definition levels. @@ -32,11 +42,11 @@ pub enum DefLevelsDecoder<'a> { } impl<'a> DefLevelsDecoder<'a> { - pub fn new(page: &'a DataPage) -> Self { - let (_, def_levels, _) = split_buffer(page); + pub fn try_new(page: &'a DataPage) -> Result { + let (_, def_levels, _) = split_buffer(page)?; let max_def_level = page.descriptor.max_def_level; - if max_def_level == 1 { + Ok(if max_def_level == 1 { let iter = hybrid_rle::Decoder::new(def_levels, 1); let iter = HybridRleIter::new(iter, page.num_values()); Self::Bitmap(iter) @@ -44,7 +54,7 @@ impl<'a> DefLevelsDecoder<'a> { let iter = HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); Self::Levels(iter, max_def_level as u32) - } + }) } } diff --git a/src/encoding/bitpacking.rs b/src/encoding/bitpacking.rs index 2a06ac7b4..f92bf9a4d 100644 --- a/src/encoding/bitpacking.rs +++ b/src/encoding/bitpacking.rs @@ -69,16 +69,16 @@ fn decode_pack(compressed: &[u8], num_bits: u8, pack: &mut [u32; BitPacker1x::BL } impl<'a> Decoder<'a> { - pub fn new(compressed: &'a [u8], num_bits: u8, length: usize) -> Self { + pub fn new(compressed: &'a [u8], num_bits: u8, mut length: usize) -> Self { let compressed_block_size = BitPacker1x::BLOCK_LEN * num_bits as usize / 8; let mut compressed_chunks = compressed.chunks(compressed_block_size); let mut current_pack = [0; BitPacker1x::BLOCK_LEN]; - decode_pack( - compressed_chunks.next().unwrap(), - num_bits, - &mut current_pack, - ); + if let Some(chunk) = compressed_chunks.next() { + decode_pack(chunk, num_bits, &mut current_pack); + } else { + length = 0 + }; Self { remaining: length, diff --git a/src/encoding/hybrid_rle/decoder.rs b/src/encoding/hybrid_rle/decoder.rs index 2d13de003..bdd4b5081 100644 --- a/src/encoding/hybrid_rle/decoder.rs +++ b/src/encoding/hybrid_rle/decoder.rs @@ -30,20 +30,25 @@ impl<'a> Iterator for Decoder<'a> { } let (indicator, consumed) = uleb128::decode(self.values); self.values = &self.values[consumed..]; + if self.values.is_empty() { + return None; + }; if indicator & 1 == 1 { // is bitpacking let bytes = (indicator as usize >> 1) * self.num_bits as usize; let bytes = std::cmp::min(bytes, self.values.len()); - let result = Some(HybridEncoded::Bitpacked(&self.values[..bytes])); - self.values = &self.values[bytes..]; + let (result, remaining) = self.values.split_at(bytes); + let result = Some(HybridEncoded::Bitpacked(result)); + self.values = remaining; result } else { // is rle let run_length = indicator as usize >> 1; // repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) let rle_bytes = ceil8(self.num_bits as usize); - let result = Some(HybridEncoded::Rle(&self.values[..rle_bytes], run_length)); - self.values = &self.values[rle_bytes..]; + let (result, remaining) = self.values.split_at(rle_bytes); + let result = Some(HybridEncoded::Rle(result, run_length)); + self.values = remaining; result } } diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index efc890021..928eca936 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -14,8 +14,10 @@ pub use crate::parquet_bridge::Encoding; /// # Panics /// This function panics iff `values.len() < 4`. #[inline] -pub fn get_length(values: &[u8]) -> u32 { - u32::from_le_bytes(values[0..4].try_into().unwrap()) +pub fn get_length(values: &[u8]) -> Option { + values + .get(0..4) + .map(|x| u32::from_le_bytes(x.try_into().unwrap()) as usize) } /// Returns the ceil of value/divisor diff --git a/src/encoding/plain_byte_array.rs b/src/encoding/plain_byte_array.rs index 7c0b1bee3..23dcc926d 100644 --- a/src/encoding/plain_byte_array.rs +++ b/src/encoding/plain_byte_array.rs @@ -2,6 +2,7 @@ /// prefixes, lengths and values /// # Implementation /// This struct does not allocate on the heap. +use crate::error::Error; #[derive(Debug)] pub struct BinaryIter<'a> { @@ -16,7 +17,7 @@ impl<'a> BinaryIter<'a> { } impl<'a> Iterator for BinaryIter<'a> { - type Item = &'a [u8]; + type Item = Result<&'a [u8], Error>; #[inline] fn next(&mut self) -> Option { @@ -28,9 +29,14 @@ impl<'a> Iterator for BinaryIter<'a> { } let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize; self.values = &self.values[4..]; - let result = &self.values[..length]; - self.values = &self.values[length..]; - Some(result) + if length > self.values.len() { + return Some(Err(Error::OutOfSpec( + "A string in plain encoding declares a length that is out of range".to_string(), + ))); + } + let (result, remaining) = self.values.split_at(length); + self.values = remaining; + Some(Ok(result)) } #[inline] diff --git a/src/metadata/column_chunk_metadata.rs b/src/metadata/column_chunk_metadata.rs index 4e522822d..e2a2e6906 100644 --- a/src/metadata/column_chunk_metadata.rs +++ b/src/metadata/column_chunk_metadata.rs @@ -4,7 +4,7 @@ use parquet_format_async_temp::{ColumnChunk, ColumnMetaData, Encoding}; use super::column_descriptor::ColumnDescriptor; use crate::compression::Compression; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::schema::types::PhysicalType; use crate::statistics::{deserialize_statistics, Statistics}; @@ -135,6 +135,22 @@ impl ColumnChunkMetaData { column_descr: ColumnDescriptor, column_chunk: ColumnChunk, ) -> Result { + // validate metadata + if let Some(meta) = &column_chunk.meta_data { + let _: usize = meta.total_compressed_size.try_into()?; + + if let Some(offset) = meta.dictionary_page_offset { + let _: usize = offset.try_into()?; + } + let _: usize = meta.data_page_offset.try_into()?; + + let _: Compression = meta.codec.try_into()?; + } else { + return Err(Error::OutOfSpec( + "Column chunk requires metdata".to_string(), + )); + } + Ok(Self { column_chunk, column_descr, diff --git a/src/metadata/file_metadata.rs b/src/metadata/file_metadata.rs index 959f58c7f..a5e50ef49 100644 --- a/src/metadata/file_metadata.rs +++ b/src/metadata/file_metadata.rs @@ -40,7 +40,7 @@ pub struct FileMetaData { } impl FileMetaData { - /// Returns the ['SchemaDescriptor`] that describes schema of this file. + /// Returns the [`SchemaDescriptor`] that describes schema of this file. pub fn schema(&self) -> &SchemaDescriptor { &self.schema_descr } @@ -69,7 +69,7 @@ impl FileMetaData { .row_groups .into_iter() .map(|rg| RowGroupMetaData::try_from_thrift(&schema_descr, rg)) - .collect::, Error>>()?; + .collect::>()?; let column_orders = metadata .column_orders diff --git a/src/page/mod.rs b/src/page/mod.rs index 707f46740..ccf475899 100644 --- a/src/page/mod.rs +++ b/src/page/mod.rs @@ -12,7 +12,7 @@ pub use crate::parquet_bridge::{DataPageHeaderExt, PageType}; use crate::compression::Compression; use crate::encoding::{get_length, Encoding}; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::metadata::Descriptor; use crate::statistics::{deserialize_statistics, Statistics}; @@ -301,28 +301,62 @@ impl CompressedPage { /// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages. #[inline] -pub fn split_buffer_v1(buffer: &[u8], has_rep: bool, has_def: bool) -> (&[u8], &[u8], &[u8]) { +pub fn split_buffer_v1( + buffer: &[u8], + has_rep: bool, + has_def: bool, +) -> Result<(&[u8], &[u8], &[u8])> { let (rep, buffer) = if has_rep { - let level_buffer_length = get_length(buffer) as usize; + let level_buffer_length = get_length(buffer).ok_or_else(|| { + Error::OutOfSpec( + "The number of bytes declared in v1 rep levels is higher than the page size" + .to_string(), + ) + })?; ( - &buffer[4..4 + level_buffer_length], - &buffer[4 + level_buffer_length..], + buffer.get(4..4 + level_buffer_length).ok_or_else(|| { + Error::OutOfSpec( + "The number of bytes declared in v1 rep levels is higher than the page size" + .to_string(), + ) + })?, + buffer.get(4 + level_buffer_length..).ok_or_else(|| { + Error::OutOfSpec( + "The number of bytes declared in v1 rep levels is higher than the page size" + .to_string(), + ) + })?, ) } else { (&[] as &[u8], buffer) }; let (def, buffer) = if has_def { - let level_buffer_length = get_length(buffer) as usize; + let level_buffer_length = get_length(buffer).ok_or_else(|| { + Error::OutOfSpec( + "The number of bytes declared in v1 rep levels is higher than the page size" + .to_string(), + ) + })?; ( - &buffer[4..4 + level_buffer_length], - &buffer[4 + level_buffer_length..], + buffer.get(4..4 + level_buffer_length).ok_or_else(|| { + Error::OutOfSpec( + "The number of bytes declared in v1 def levels is higher than the page size" + .to_string(), + ) + })?, + buffer.get(4 + level_buffer_length..).ok_or_else(|| { + Error::OutOfSpec( + "The number of bytes declared in v1 def levels is higher than the page size" + .to_string(), + ) + })?, ) } else { (&[] as &[u8], buffer) }; - (rep, def, buffer) + Ok((rep, def, buffer)) } /// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v2 pages. @@ -330,16 +364,16 @@ pub fn split_buffer_v2( buffer: &[u8], rep_level_buffer_length: usize, def_level_buffer_length: usize, -) -> (&[u8], &[u8], &[u8]) { - ( +) -> Result<(&[u8], &[u8], &[u8])> { + Ok(( &buffer[..rep_level_buffer_length], &buffer[rep_level_buffer_length..rep_level_buffer_length + def_level_buffer_length], &buffer[rep_level_buffer_length + def_level_buffer_length..], - ) + )) } /// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values). -pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) { +pub fn split_buffer(page: &DataPage) -> Result<(&[u8], &[u8], &[u8])> { match page.header() { DataPageHeader::V1(_) => split_buffer_v1( page.buffer(), @@ -347,8 +381,8 @@ pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) { page.descriptor.max_def_level > 0, ), DataPageHeader::V2(header) => { - let def_level_buffer_length = header.definition_levels_byte_length as usize; - let rep_level_buffer_length = header.repetition_levels_byte_length as usize; + let def_level_buffer_length: usize = header.definition_levels_byte_length.try_into()?; + let rep_level_buffer_length: usize = header.repetition_levels_byte_length.try_into()?; split_buffer_v2( page.buffer(), rep_level_buffer_length, diff --git a/src/page/page_dict/binary.rs b/src/page/page_dict/binary.rs index 01e3db4f0..694fbe517 100644 --- a/src/page/page_dict/binary.rs +++ b/src/page/page_dict/binary.rs @@ -26,8 +26,13 @@ impl BinaryPageDict { #[inline] pub fn value(&self, index: usize) -> Result<&[u8], Error> { + let end = *self.offsets.get(index + 1).ok_or_else(|| { + Error::OutOfSpec( + "The data page has an index larger than the dictionary page values".to_string(), + ) + })?; + let end: usize = end.try_into()?; let start: usize = self.offsets[index].try_into()?; - let end: usize = self.offsets[(index + 1)].try_into()?; Ok(&self.values[start..end]) } } @@ -42,25 +47,35 @@ impl DictPage for BinaryPageDict { } } -fn read_plain(bytes: &[u8], length: usize) -> (Vec, Vec) { +fn read_plain(bytes: &[u8], length: usize) -> Result<(Vec, Vec), Error> { let mut bytes = bytes; let mut values = Vec::new(); let mut offsets = Vec::with_capacity(length as usize + 1); offsets.push(0); let mut current_length = 0; - offsets.extend((0..length).map(|_| { - let slot_length = get_length(bytes) as i32; - current_length += slot_length; - values.extend_from_slice(&bytes[4..4 + slot_length as usize]); - bytes = &bytes[4 + slot_length as usize..]; - current_length - })); + offsets.reserve(length); + for _ in 0..length { + let slot_length = get_length(bytes).unwrap(); + bytes = &bytes[4..]; + current_length += slot_length as i32; - (values, offsets) + if slot_length > bytes.len() { + return Err(Error::OutOfSpec( + "The string on a dictionary page has a length that is out of bounds".to_string(), + )); + } + let (result, remaining) = bytes.split_at(slot_length); + + values.extend_from_slice(result); + bytes = remaining; + offsets.push(current_length); + } + + Ok((values, offsets)) } pub fn read(buf: &[u8], num_values: usize) -> Result, Error> { - let (values, offsets) = read_plain(buf, num_values); + let (values, offsets) = read_plain(buf, num_values)?; Ok(Arc::new(BinaryPageDict::new(values, offsets))) } diff --git a/src/page/page_dict/fixed_len_binary.rs b/src/page/page_dict/fixed_len_binary.rs index d7058d9a7..93fd0d86e 100644 --- a/src/page/page_dict/fixed_len_binary.rs +++ b/src/page/page_dict/fixed_len_binary.rs @@ -1,6 +1,6 @@ use std::{any::Any, sync::Arc}; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::schema::types::PhysicalType; use super::DictPage; @@ -30,8 +30,14 @@ impl FixedLenByteArrayPageDict { } #[inline] - pub fn value(&self, index: usize) -> &[u8] { - &self.values[index * self.size..(index + 1) * self.size] + pub fn value(&self, index: usize) -> Result<&[u8]> { + self.values + .get(index * self.size..(index + 1) * self.size) + .ok_or_else(|| { + Error::OutOfSpec( + "The data page has an index larger than the dictionary page values".to_string(), + ) + }) } } @@ -45,12 +51,10 @@ impl DictPage for FixedLenByteArrayPageDict { } } -fn read_plain(bytes: &[u8], size: usize, length: usize) -> Vec { - bytes[..size * length].to_vec() -} - pub fn read(buf: &[u8], size: usize, num_values: usize) -> Result> { - let values = read_plain(buf, size, num_values); + let length = size.saturating_mul(num_values); + let values = buf.get(..length).ok_or_else(|| Error::OutOfSpec("Fixed sized binary declares a number of values times size larger than the page buffer".to_string()))?.to_vec(); + Ok(Arc::new(FixedLenByteArrayPageDict::new( values, PhysicalType::FixedLenByteArray(size), diff --git a/src/page/page_dict/primitive.rs b/src/page/page_dict/primitive.rs index 2ccbfb3b4..b2fa58ab1 100644 --- a/src/page/page_dict/primitive.rs +++ b/src/page/page_dict/primitive.rs @@ -21,6 +21,15 @@ impl PrimitivePageDict { pub fn values(&self) -> &[T] { &self.values } + + #[inline] + pub fn value(&self, index: usize) -> Result<&T> { + self.values.get(index).ok_or_else(|| { + Error::OutOfSpec( + "The data page has an index larger than the dictionary page values".to_string(), + ) + }) + } } impl DictPage for PrimitivePageDict { @@ -33,26 +42,23 @@ impl DictPage for PrimitivePageDict { } } -fn read_plain(values: &[u8]) -> Result> { - // read in plain - if values.len() % std::mem::size_of::() != 0 { - return Err(Error::OutOfSpec( - "A dictionary page with primitive values must contain a multiple of their format." - .to_string(), - )); - } - Ok(values - .chunks_exact(std::mem::size_of::()) - .map(decode::) - .collect()) -} - pub fn read( buf: &[u8], num_values: usize, _is_sorted: bool, ) -> Result> { - let typed_size = num_values * std::mem::size_of::(); - let values = read_plain::(&buf[..typed_size])?; + let size_of = std::mem::size_of::(); + + let typed_size = num_values.wrapping_mul(size_of); + + let values = buf.get(..typed_size).ok_or_else(|| { + Error::OutOfSpec( + "The number of values declared in the dict page does not match the length of the page" + .to_string(), + ) + })?; + + let values = values.chunks_exact(size_of).map(decode::).collect(); + Ok(Arc::new(PrimitivePageDict::new(values))) } diff --git a/src/read/page/reader.rs b/src/read/page/reader.rs index b0d8af8e4..b67d71bc3 100644 --- a/src/read/page/reader.rs +++ b/src/read/page/reader.rs @@ -4,7 +4,7 @@ use std::{io::Read, sync::Arc}; use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol; use crate::compression::Compression; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::indexes::Interval; use crate::metadata::{ColumnChunkMetaData, Descriptor}; @@ -12,6 +12,7 @@ use crate::page::{ read_dict_page, CompressedDataPage, DataPageHeader, DictPage, EncodedDictPage, PageType, ParquetPageHeader, }; +use crate::parquet_bridge::Encoding; use super::PageIterator; @@ -190,7 +191,7 @@ pub(super) fn build_page( buffer: &mut Vec, ) -> Result> { let page_header = read_page_header(&mut reader.reader)?; - reader.seen_num_values += get_page_header(&page_header) + reader.seen_num_values += get_page_header(&page_header)? .map(|x| x.num_values() as i64) .unwrap_or_default(); @@ -238,9 +239,15 @@ pub(super) fn finish_page( selected_rows: Option>, ) -> Result { let type_ = page_header.type_.try_into()?; + let uncompressed_page_size = page_header.uncompressed_page_size.try_into()?; match type_ { PageType::DictionaryPage => { - let dict_header = page_header.dictionary_page_header.as_ref().unwrap(); + let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a dictionary page but the dictionary header is empty" + .to_string(), + ) + })?; let is_sorted = dict_header.is_sorted.unwrap_or(false); // move the buffer to `dict_page` @@ -249,7 +256,7 @@ pub(super) fn finish_page( let page = read_dict_page( &dict_page, - (compression, page_header.uncompressed_page_size as usize), + (compression, uncompressed_page_size), is_sorted, descriptor.primitive_type.physical_type, )?; @@ -259,26 +266,36 @@ pub(super) fn finish_page( Ok(FinishedPage::Dict(page)) } PageType::DataPage => { - let header = page_header.data_page_header.unwrap(); + let header = page_header.data_page_header.ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 data header is empty" + .to_string(), + ) + })?; Ok(FinishedPage::Data(CompressedDataPage::new_read( DataPageHeader::V1(header), std::mem::take(data), compression, - page_header.uncompressed_page_size as usize, + uncompressed_page_size, current_dictionary.clone(), descriptor.clone(), selected_rows, ))) } PageType::DataPageV2 => { - let header = page_header.data_page_header_v2.unwrap(); + let header = page_header.data_page_header_v2.ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v2 data page but the v2 data header is empty" + .to_string(), + ) + })?; Ok(FinishedPage::Data(CompressedDataPage::new_read( DataPageHeader::V2(header), std::mem::take(data), compression, - page_header.uncompressed_page_size as usize, + uncompressed_page_size, current_dictionary.clone(), descriptor.clone(), selected_rows, @@ -287,17 +304,30 @@ pub(super) fn finish_page( } } -pub(super) fn get_page_header(header: &ParquetPageHeader) -> Option { - let type_ = header.type_.try_into().unwrap(); - match type_ { +pub(super) fn get_page_header(header: &ParquetPageHeader) -> Result> { + let type_ = header.type_.try_into()?; + Ok(match type_ { PageType::DataPage => { - let header = header.data_page_header.clone().unwrap(); + let header = header.data_page_header.clone().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 header is empty".to_string(), + ) + })?; + let _: Encoding = header.encoding.try_into()?; + let _: Encoding = header.repetition_level_encoding.try_into()?; + let _: Encoding = header.definition_level_encoding.try_into()?; + Some(DataPageHeader::V1(header)) } PageType::DataPageV2 => { - let header = header.data_page_header_v2.clone().unwrap(); + let header = header.data_page_header_v2.clone().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 header is empty".to_string(), + ) + })?; + let _: Encoding = header.encoding.try_into()?; Some(DataPageHeader::V2(header)) } _ => None, - } + }) } diff --git a/src/read/page/stream.rs b/src/read/page/stream.rs index 604dd0ec2..6c4056a8e 100644 --- a/src/read/page/stream.rs +++ b/src/read/page/stream.rs @@ -75,7 +75,7 @@ fn _get_page_stream( // the header let page_header = read_page_header(reader).await?; - let data_header = get_page_header(&page_header); + let data_header = get_page_header(&page_header)?; seen_values += data_header.as_ref().map(|x| x.num_values() as i64).unwrap_or_default(); let read_size = page_header.compressed_page_size as i64; diff --git a/src/schema/io_thrift/from_thrift.rs b/src/schema/io_thrift/from_thrift.rs index a9e95adaa..028d66f75 100644 --- a/src/schema/io_thrift/from_thrift.rs +++ b/src/schema/io_thrift/from_thrift.rs @@ -37,7 +37,9 @@ fn from_thrift_helper(elements: &[SchemaElement], index: usize) -> Result<(usize // There is only one message type node in the schema tree. let is_root_node = index == 0; - let element = &elements[index]; + let element = elements.get(index).ok_or_else(|| { + Error::OutOfSpec(format!("index {} on SchemaElement is not valid", index)) + })?; let name = element.name.clone(); let converted_type = element.converted_type; @@ -55,8 +57,7 @@ fn from_thrift_helper(elements: &[SchemaElement], index: usize) -> Result<(usize .ok_or_else(|| { general_err!("Repetition level must be defined for a primitive type") })? - .try_into() - .unwrap(); + .try_into()?; let physical_type = element.type_.ok_or_else(|| { general_err!("Physical type must be defined for a primitive type") })?; diff --git a/tests/it/read/binary.rs b/tests/it/read/binary.rs index a3401ac0c..89cae513e 100644 --- a/tests/it/read/binary.rs +++ b/tests/it/read/binary.rs @@ -9,9 +9,13 @@ pub fn page_to_vec(page: &DataPage) -> Result>>> { match state { BinaryPageState::Optional(validity, values) => { - deserialize_optional(validity, values.map(|x| x.to_vec())) + deserialize_optional(validity, values.map(|x| x.map(|x| x.to_vec()))) } - BinaryPageState::Required(values) => Ok(values.map(|x| x.to_vec()).map(Some).collect()), + BinaryPageState::Required(values) => values + .map(|x| x.map(|x| x.to_vec())) + .map(Some) + .map(|x| x.transpose()) + .collect(), BinaryPageState::RequiredDictionary(dict) => dict .indexes .map(|x| x as usize) @@ -21,7 +25,7 @@ pub fn page_to_vec(page: &DataPage) -> Result>>> { let values = dict .indexes .map(|x| x as usize) - .map(|x| dict.dict.value(x).map(|x| x.to_vec()).unwrap()); + .map(|x| dict.dict.value(x).map(|x| x.to_vec())); deserialize_optional(validity, values) } } diff --git a/tests/it/read/boolean.rs b/tests/it/read/boolean.rs index 231123580..aa697cc77 100644 --- a/tests/it/read/boolean.rs +++ b/tests/it/read/boolean.rs @@ -10,7 +10,9 @@ pub fn page_to_vec(page: &DataPage) -> Result>> { let state = BooleanPageState::try_new(page)?; match state { - BooleanPageState::Optional(validity, values) => deserialize_optional(validity, values), + BooleanPageState::Optional(validity, mut values) => { + deserialize_optional(validity, values.by_ref().map(Ok)) + } BooleanPageState::Required(bitmap, length) => Ok(BitmapIter::new(bitmap, 0, length) .into_iter() .map(Some) diff --git a/tests/it/read/fixed_binary.rs b/tests/it/read/fixed_binary.rs index 713f3d658..8988ebbf9 100644 --- a/tests/it/read/fixed_binary.rs +++ b/tests/it/read/fixed_binary.rs @@ -9,22 +9,23 @@ pub fn page_to_vec(page: &DataPage) -> Result>>> { match state { FixedLenBinaryPageState::Optional(validity, values) => { - deserialize_optional(validity, values.map(|x| x.to_vec())) + deserialize_optional(validity, values.map(|x| Ok(x.to_vec()))) } FixedLenBinaryPageState::Required(values) => { Ok(values.map(|x| x.to_vec()).map(Some).collect()) } - FixedLenBinaryPageState::RequiredDictionary(dict) => Ok(dict + FixedLenBinaryPageState::RequiredDictionary(dict) => dict .indexes .map(|x| x as usize) - .map(|x| dict.dict.value(x).to_vec()) + .map(|x| dict.dict.value(x).map(|x| x.to_vec())) .map(Some) - .collect()), + .map(|x| x.transpose()) + .collect(), FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { let values = dict .indexes .map(|x| x as usize) - .map(|x| dict.dict.value(x).to_vec()); + .map(|x| dict.dict.value(x).map(|x| x.to_vec())); deserialize_optional(validity, values) } } diff --git a/tests/it/read/mod.rs b/tests/it/read/mod.rs index 68451fa4a..c8a268a5b 100644 --- a/tests/it/read/mod.rs +++ b/tests/it/read/mod.rs @@ -80,7 +80,7 @@ where let mut iterator = BasicDecompressor::new(pages, vec![]); while let Some(page) = iterator.next()? { if !has_filled { - struct_::extend_validity(&mut validity, page); + struct_::extend_validity(&mut validity, page)?; } // todo: this is wrong: multiple pages -> array arrays.push(page_to_array(page)?) @@ -91,7 +91,9 @@ where } match field { - ParquetType::PrimitiveType { .. } => Ok(arrays.pop().unwrap()), + ParquetType::PrimitiveType { .. } => { + arrays.pop().ok_or_else(|| Error::OutOfSpec("".to_string())) + } ParquetType::GroupType { converted_type, .. } => { if let Some(converted_type) = converted_type { match converted_type { @@ -119,7 +121,7 @@ pub fn read_column( .enumerate() .filter_map(|(i, x)| if x.name() == field { Some(i) } else { None }) .next() - .unwrap(); + .ok_or_else(|| Error::OutOfSpec("column does not exist".to_string()))?; let columns = get_column_iterator(reader, &metadata, row_group, field, None, vec![]); let field = &metadata.schema().fields()[field]; diff --git a/tests/it/read/primitive.rs b/tests/it/read/primitive.rs index 56bc1431c..f6e78d21c 100644 --- a/tests/it/read/primitive.rs +++ b/tests/it/read/primitive.rs @@ -46,7 +46,7 @@ impl<'a, T: NativeType> PageState<'a, T> { match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, _, true) => { - let (_, def_levels, _) = split_buffer(page); + let (_, def_levels, _) = split_buffer(page)?; let validity = HybridRleDecoderIter::new(HybridRleIter::new( Decoder::new(def_levels, 1), @@ -87,16 +87,22 @@ pub fn page_to_vec(page: &DataPage) -> Result>, Err match state { PageState::Nominal(state) => match state { - NativePageState::Optional(validity, values) => deserialize_optional(validity, values), + NativePageState::Optional(validity, mut values) => { + deserialize_optional(validity, values.by_ref().map(Ok)) + } NativePageState::Required(values) => Ok(values.map(Some).collect()), - NativePageState::RequiredDictionary(dict) => Ok(dict + NativePageState::RequiredDictionary(dict) => dict .indexes .map(|x| x as usize) - .map(|x| dict.values[x]) + .map(|x| dict.dict.value(x).copied()) .map(Some) - .collect()), + .map(|x| x.transpose()) + .collect(), NativePageState::OptionalDictionary(validity, dict) => { - let values = dict.indexes.map(|x| x as usize).map(|x| dict.values[x]); + let values = dict + .indexes + .map(|x| x as usize) + .map(|x| dict.dict.value(x).copied()); deserialize_optional(validity, values) } }, diff --git a/tests/it/read/primitive_nested.rs b/tests/it/read/primitive_nested.rs index 657b5d81c..6a77d17cf 100644 --- a/tests/it/read/primitive_nested.rs +++ b/tests/it/read/primitive_nested.rs @@ -138,7 +138,7 @@ fn read_array( } pub fn page_to_array(page: &DataPage) -> Result { - let (rep_levels, def_levels, values) = split_buffer(page); + let (rep_levels, def_levels, values) = split_buffer(page)?; match (&page.encoding(), &page.dictionary_page()) { (Encoding::Plain, None) => Ok(read_array::( @@ -193,7 +193,7 @@ fn read_dict_array( pub fn page_dict_to_array(page: &DataPage) -> Result { assert_eq!(page.descriptor.max_rep_level, 1); - let (rep_levels, def_levels, values) = split_buffer(page); + let (rep_levels, def_levels, values) = split_buffer(page)?; match (page.encoding(), &page.dictionary_page()) { (Encoding::PlainDictionary, Some(dict)) => Ok(read_dict_array::( diff --git a/tests/it/read/struct_.rs b/tests/it/read/struct_.rs index 5f48f5cfa..7905740f8 100644 --- a/tests/it/read/struct_.rs +++ b/tests/it/read/struct_.rs @@ -1,13 +1,14 @@ use parquet2::encoding::hybrid_rle::HybridRleDecoder; +use parquet2::error::Error; use parquet2::page::{split_buffer, DataPage}; use parquet2::read::levels::get_bit_width; -pub fn extend_validity(val: &mut Vec, page: &DataPage) { - let (_, def_levels, _) = split_buffer(page); +pub fn extend_validity(val: &mut Vec, page: &DataPage) -> Result<(), Error> { + let (_, def_levels, _) = split_buffer(page)?; let length = page.num_values(); if page.descriptor.max_def_level == 0 { - return; + return Ok(()); } let def_level_encoding = ( @@ -18,4 +19,5 @@ pub fn extend_validity(val: &mut Vec, page: &DataPage) { let def_levels = HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length); val.extend(def_levels.map(|x| x != 0)); + Ok(()) } diff --git a/tests/it/read/utils.rs b/tests/it/read/utils.rs index b25fc12b0..8c1b69f50 100644 --- a/tests/it/read/utils.rs +++ b/tests/it/read/utils.rs @@ -4,7 +4,7 @@ use parquet2::{ error::Error, }; -pub fn deserialize_optional>( +pub fn deserialize_optional>>( validity: DefLevelsDecoder, values: I, ) -> Result>, Error> { @@ -16,42 +16,51 @@ pub fn deserialize_optional>( } } -fn deserialize_bitmap>( - validity: HybridDecoderBitmapIter, +fn deserialize_bitmap>>( + mut validity: HybridDecoderBitmapIter, mut values: I, ) -> Result>, Error> { let mut deserialized = Vec::with_capacity(validity.len()); - validity.for_each(|run| match run { - HybridEncoded::Bitmap(bitmap, length) => { - BitmapIter::new(bitmap, 0, length) - .into_iter() - .for_each(|x| { - if x { - deserialized.push(values.next()) - } else { - deserialized.push(None) - } - }); - } + validity.try_for_each(|run| match run { + HybridEncoded::Bitmap(bitmap, length) => BitmapIter::new(bitmap, 0, length) + .into_iter() + .try_for_each(|x| { + if x { + deserialized.push(values.next().transpose()?); + } else { + deserialized.push(None); + } + Result::<_, Error>::Ok(()) + }), HybridEncoded::Repeated(is_set, length) => { if is_set { - deserialized.extend(values.by_ref().take(length).map(Some)) + deserialized.reserve(length); + for x in values.by_ref().take(length) { + deserialized.push(Some(x?)) + } } else { deserialized.extend(std::iter::repeat(None).take(length)) } + Ok(()) } - }); + })?; Ok(deserialized) } -fn deserialize_levels>( +fn deserialize_levels>>( levels: HybridRleDecoder, max: u32, mut values: I, ) -> Result>, Error> { - Ok(levels + levels .into_iter() - .map(|x| if x == max { values.next() } else { None }) - .collect()) + .map(|x| { + if x == max { + values.next().transpose() + } else { + Ok(None) + } + }) + .collect() }