From 0d0aebe7e29658304f54c6c2d747e8db784c8f12 Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Tue, 29 Nov 2022 07:45:09 +0100
Subject: [PATCH] Improved API to read column chunks (#195)

---
 src/read/column/mod.rs    | 208 ++++++++++++++++++++++++++++++++++++
 src/read/column/stream.rs |  51 +++++++++
 src/read/mod.rs           | 217 ++------------------------------------
 tests/it/read/mod.rs      |  28 ++---
 4 files changed, 278 insertions(+), 226 deletions(-)
 create mode 100644 src/read/column/mod.rs
 create mode 100644 src/read/column/stream.rs

diff --git a/src/read/column/mod.rs b/src/read/column/mod.rs
new file mode 100644
index 00000000..4e4b03fa
--- /dev/null
+++ b/src/read/column/mod.rs
@@ -0,0 +1,208 @@
+use std::io::{Read, Seek};
+use std::vec::IntoIter;
+
+use crate::error::Error;
+use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData};
+use crate::page::CompressedPage;
+use crate::schema::types::ParquetType;
+
+use super::{get_field_columns, get_page_iterator, PageFilter, PageReader};
+
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+mod stream;
+
+/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.
+///
+/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator
+/// iterates over columns, one by one, and returns a [`PageReader`] per column.
+/// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.
+/// For complex fields, it yields multiple columns.
+/// `max_page_size` is the maximum number of bytes allowed.
+pub fn get_column_iterator<R: Read + Seek>(
+    reader: R,
+    row_group: &RowGroupMetaData,
+    field_name: &str,
+    page_filter: Option<PageFilter>,
+    scratch: Vec<u8>,
+    max_page_size: usize,
+) -> ColumnIterator<R> {
+    let columns = get_field_columns(row_group.columns(), field_name)
+        .cloned()
+        .collect::<Vec<_>>();
+
+    ColumnIterator::new(reader, columns, page_filter, scratch, max_page_size)
+}
+
+/// State of [`MutStreamingIterator`].
+#[derive(Debug)]
+pub enum State<T> {
+    /// Iterator still has elements
+    Some(T),
+    /// Iterator finished
+    Finished(Vec<u8>),
+}
+
+/// A special kind of fallible streaming iterator where `advance` consumes the iterator.
+pub trait MutStreamingIterator: Sized {
+    type Item;
+    type Error;
+
+    fn advance(self) -> std::result::Result<State<Self>, Self::Error>;
+    fn get(&mut self) -> Option<&mut Self::Item>;
+}
+
+/// A [`MutStreamingIterator`] that reads column chunks one by one,
+/// returning a [`PageReader`] per column.
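+///
+/// # Example
+///
+/// A minimal, illustrative sketch of driving the iterator; the file name
+/// `"data.parquet"` and the field name `"c1"` are hypothetical:
+///
+/// ```ignore
+/// use std::fs::File;
+///
+/// let mut file = File::open("data.parquet")?;
+/// let metadata = read_metadata(&mut file)?;
+///
+/// let mut iter = get_column_iterator(
+///     file,
+///     &metadata.row_groups[0],
+///     "c1",       // field name
+///     None,       // no page filter
+///     vec![],     // scratch buffer, reused across columns
+///     usize::MAX, // maximum page size
+/// );
+/// // `advance` consumes the iterator and returns it back inside `State::Some`
+/// while let State::Some(mut new_iter) = iter.advance()? {
+///     if let Some((page_reader, column)) = new_iter.get() {
+///         // consume `page_reader` (a `PageReader`) to decode this column's pages
+///     }
+///     iter = new_iter;
+/// }
+/// ```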
+pub struct ColumnIterator<R: Read + Seek> {
+    reader: Option<R>,
+    columns: Vec<ColumnChunkMetaData>,
+    page_filter: Option<PageFilter>,
+    current: Option<(PageReader<R>, ColumnChunkMetaData)>,
+    scratch: Vec<u8>,
+    max_page_size: usize,
+}
+
+impl<R: Read + Seek> ColumnIterator<R> {
+    /// Returns a new [`ColumnIterator`]
+    /// `max_page_size` is the maximum allowed page size
+    pub fn new(
+        reader: R,
+        mut columns: Vec<ColumnChunkMetaData>,
+        page_filter: Option<PageFilter>,
+        scratch: Vec<u8>,
+        max_page_size: usize,
+    ) -> Self {
+        columns.reverse();
+        Self {
+            reader: Some(reader),
+            scratch,
+            columns,
+            page_filter,
+            current: None,
+            max_page_size,
+        }
+    }
+}
+
+impl<R: Read + Seek> MutStreamingIterator for ColumnIterator<R> {
+    type Item = (PageReader<R>, ColumnChunkMetaData);
+    type Error = Error;
+
+    fn advance(mut self) -> Result<State<Self>, Error> {
+        let (reader, scratch) = if let Some((iter, _)) = self.current {
+            iter.into_inner()
+        } else {
+            (self.reader.unwrap(), self.scratch)
+        };
+        if self.columns.is_empty() {
+            return Ok(State::Finished(scratch));
+        };
+        let column = self.columns.pop().unwrap();
+
+        let iter = get_page_iterator(
+            &column,
+            reader,
+            self.page_filter.clone(),
+            scratch,
+            self.max_page_size,
+        )?;
+        let current = Some((iter, column));
+        Ok(State::Some(Self {
+            reader: None,
+            columns: self.columns,
+            page_filter: self.page_filter,
+            current,
+            scratch: vec![],
+            max_page_size: self.max_page_size,
+        }))
+    }
+
+    fn get(&mut self) -> Option<&mut Self::Item> {
+        self.current.as_mut()
+    }
+}
+
+/// A [`MutStreamingIterator`] of pre-read column chunks
+#[derive(Debug)]
+pub struct ReadColumnIterator {
+    field: ParquetType,
+    chunks: Vec<(Vec<Result<CompressedPage, Error>>, ColumnChunkMetaData)>,
+    current: Option<(IntoIter<Result<CompressedPage, Error>>, ColumnChunkMetaData)>,
+}
+
+impl ReadColumnIterator {
+    /// Returns a new [`ReadColumnIterator`]
+    pub fn new(
+        field: ParquetType,
+        chunks: Vec<(Vec<Result<CompressedPage, Error>>, ColumnChunkMetaData)>,
+    ) -> Self {
+        Self {
+            field,
+            chunks,
+            current: None,
+        }
+    }
+}
+
+impl MutStreamingIterator for ReadColumnIterator {
+    type Item = (IntoIter<Result<CompressedPage, Error>>, ColumnChunkMetaData);
+    type Error = Error;
+
+    fn advance(mut self) -> Result<State<Self>, Error> {
+        if self.chunks.is_empty() {
+            return Ok(State::Finished(vec![]));
+        }
+        self.current = self
+            .chunks
+            .pop()
+            .map(|(pages, meta)| (pages.into_iter(), meta));
+        Ok(State::Some(Self {
+            field: self.field,
+            chunks: self.chunks,
+            current: self.current,
+        }))
+    }
+
+    fn get(&mut self) -> Option<&mut Self::Item> {
+        self.current.as_mut()
+    }
+}
+
+/// Reads all columns that are part of the parquet field `field_name`
+/// # Implementation
+/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
+/// the field (one for non-nested types)
+/// It reads the columns sequentially. Use [`read_column`] to fork this operation to multiple
+/// readers.
+pub fn read_columns<'a, R: Read + Seek>(
+    reader: &mut R,
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &'a str,
+) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>, Error> {
+    get_field_columns(columns, field_name)
+        .map(|column| read_column(reader, column).map(|c| (column, c)))
+        .collect()
+}
+
+/// Reads a column chunk into memory
+/// This operation is IO-bounded and allocates the column's `compressed_size`.
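+///
+/// # Example
+///
+/// A minimal, illustrative sketch; it assumes `reader` is an open file and
+/// `metadata` its already-parsed file metadata:
+///
+/// ```ignore
+/// // raw (compressed) bytes of the first column chunk of the first row group
+/// let column = &metadata.row_groups[0].columns()[0];
+/// let chunk: Vec<u8> = read_column(&mut reader, column)?;
+/// // `chunk.len()` equals the column's compressed size
+/// ```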
+pub fn read_column<R>(reader: &mut R, column: &ColumnChunkMetaData) -> Result<Vec<u8>, Error>
+where
+    R: Read + Seek,
+{
+    let (start, length) = column.byte_range();
+    reader.seek(std::io::SeekFrom::Start(start))?;
+
+    let mut chunk = vec![];
+    chunk.try_reserve(length as usize)?;
+    reader
+        .by_ref()
+        .take(length as u64)
+        .read_to_end(&mut chunk)?;
+    Ok(chunk)
+}
+
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub use stream::{read_column_async, read_columns_async};
diff --git a/src/read/column/stream.rs b/src/read/column/stream.rs
new file mode 100644
index 00000000..d8419e76
--- /dev/null
+++ b/src/read/column/stream.rs
@@ -0,0 +1,51 @@
+use futures::future::{try_join_all, BoxFuture};
+use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use crate::error::Error;
+use crate::metadata::ColumnChunkMetaData;
+use crate::read::get_field_columns;
+
+/// Reads a single column chunk into memory asynchronously
+pub async fn read_column_async<'b, R, F>(
+    factory: F,
+    meta: &ColumnChunkMetaData,
+) -> Result<Vec<u8>, Error>
+where
+    R: AsyncRead + AsyncSeek + Send + Unpin,
+    F: Fn() -> BoxFuture<'b, std::io::Result<R>>,
+{
+    let mut reader = factory().await?;
+    let (start, length) = meta.byte_range();
+    reader.seek(std::io::SeekFrom::Start(start)).await?;
+
+    let mut chunk = vec![];
+    chunk.try_reserve(length as usize)?;
+    reader.take(length as u64).read_to_end(&mut chunk).await?;
+    Result::Ok(chunk)
+}
+
+/// Reads all columns that are part of the parquet field `field_name`
+/// # Implementation
+/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
+/// the field (one for non-nested types)
+///
+/// It does so asynchronously via a single `join_all` over all the necessary columns for
+/// `field_name`.
+pub async fn read_columns_async<
+    'a,
+    'b,
+    R: AsyncRead + AsyncSeek + Send + Unpin,
+    F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
+>(
+    factory: F,
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &'a str,
+) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>, Error> {
+    let fields = get_field_columns(columns, field_name).collect::<Vec<_>>();
+    let futures = fields
+        .iter()
+        .map(|meta| async { read_column_async(factory.clone(), meta).await });
+
+    let columns = try_join_all(futures).await?;
+    Ok(fields.into_iter().zip(columns).collect())
+}
diff --git a/src/read/mod.rs b/src/read/mod.rs
index fee2fd94..736b660c 100644
--- a/src/read/mod.rs
+++ b/src/read/mod.rs
@@ -1,3 +1,4 @@
+mod column;
 mod compression;
 mod indexes;
 pub mod levels;
@@ -8,8 +9,8 @@ mod stream;
 use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
-use std::vec::IntoIter;
 
+pub use column::*;
 pub use compression::{decompress, BasicDecompressor, Decompressor};
 pub use metadata::{deserialize_metadata, read_metadata};
 #[cfg(feature = "async")]
@@ -21,10 +22,7 @@ pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageRe
 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub use stream::read_metadata as read_metadata_async;
 
-use crate::error::Error;
 use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData};
-use crate::page::CompressedPage;
-use crate::schema::types::ParquetType;
 use crate::{error::Result, metadata::FileMetaData};
 
 pub use indexes::{read_columns_indexes, read_pages_locations};
@@ -67,213 +65,15 @@ pub fn get_page_iterator<R: Read + Seek>(
     ))
 }
 
-/// Returns an [`Iterator`] of [`ColumnChunkMetaData`] corresponding to the columns
-/// from `field` at `row_group`.
-/// For primitive fields (e.g. `i64`), the iterator has exactly one item.
+/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
+/// For non-nested types, this returns an iterator with a single column
 pub fn get_field_columns<'a>(
-    metadata: &'a FileMetaData,
-    row_group: usize,
-    field: &'a ParquetType,
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &'a str,
 ) -> impl Iterator<Item = &'a ColumnChunkMetaData> {
-    metadata
-        .schema()
-        .columns()
+    columns
         .iter()
-        .enumerate()
-        .filter(move |x| x.1.path_in_schema[0] == field.name())
-        .map(move |x| &metadata.row_groups[row_group].columns()[x.0])
-}
-
-/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.
-///
-/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator
-/// returns multiple iterators, one per physical column of the `field`.
-/// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.
-/// For complex fields, it yields multiple columns.
-/// `max_header_size` is the maximum number of bytes thrift is allowed to allocate
-/// to read a page header.
-pub fn get_column_iterator<R: Read + Seek>(
-    reader: R,
-    metadata: &FileMetaData,
-    row_group: usize,
-    field: usize,
-    page_filter: Option<PageFilter>,
-    scratch: Vec<u8>,
-    max_header_size: usize,
-) -> ColumnIterator<R> {
-    let field = metadata.schema().fields()[field].clone();
-    let columns = get_field_columns(metadata, row_group, &field)
-        .cloned()
-        .collect::<Vec<_>>();
-
-    ColumnIterator::new(
-        reader,
-        field,
-        columns,
-        page_filter,
-        scratch,
-        max_header_size,
-    )
-}
-
-/// State of [`MutStreamingIterator`].
-#[derive(Debug)]
-pub enum State<T> {
-    /// Iterator still has elements
-    Some(T),
-    /// Iterator finished
-    Finished(Vec<u8>),
-}
-
-/// A special kind of fallible streaming iterator where `advance` consumes the iterator.
-pub trait MutStreamingIterator: Sized {
-    type Item;
-    type Error;
-
-    fn advance(self) -> std::result::Result<State<Self>, Self::Error>;
-    fn get(&mut self) -> Option<&mut Self::Item>;
-}
-
-/// Trait describing a [`MutStreamingIterator`] of column chunks.
-pub trait ColumnChunkIter<I>:
-    MutStreamingIterator<Item = (I, ColumnChunkMetaData), Error = Error>
-{
-    /// The field associated to the set of column chunks this iterator iterates over.
-    fn field(&self) -> &ParquetType;
-}
-
-/// A [`MutStreamingIterator`] that reads column chunks one by one,
-/// returning a [`PageReader`] per column.
-pub struct ColumnIterator<R: Read + Seek> {
-    reader: Option<R>,
-    field: ParquetType,
-    columns: Vec<ColumnChunkMetaData>,
-    page_filter: Option<PageFilter>,
-    current: Option<(PageReader<R>, ColumnChunkMetaData)>,
-    scratch: Vec<u8>,
-    max_header_size: usize,
-}
-
-impl<R: Read + Seek> ColumnIterator<R> {
-    /// Returns a new [`ColumnIterator`]
-    /// `max_header_size` is the maximum number of bytes thrift is allowed to allocate
-    /// to read a page header.
-    pub fn new(
-        reader: R,
-        field: ParquetType,
-        mut columns: Vec<ColumnChunkMetaData>,
-        page_filter: Option<PageFilter>,
-        scratch: Vec<u8>,
-        max_header_size: usize,
-    ) -> Self {
-        columns.reverse();
-        Self {
-            reader: Some(reader),
-            field,
-            scratch,
-            columns,
-            page_filter,
-            current: None,
-            max_header_size,
-        }
-    }
-}
-
-impl<R: Read + Seek> MutStreamingIterator for ColumnIterator<R> {
-    type Item = (PageReader<R>, ColumnChunkMetaData);
-    type Error = Error;
-
-    fn advance(mut self) -> Result<State<Self>> {
-        let (reader, scratch) = if let Some((iter, _)) = self.current {
-            iter.into_inner()
-        } else {
-            (self.reader.unwrap(), self.scratch)
-        };
-        if self.columns.is_empty() {
-            return Ok(State::Finished(scratch));
-        };
-        let column = self.columns.pop().unwrap();
-
-        let iter = get_page_iterator(
-            &column,
-            reader,
-            self.page_filter.clone(),
-            scratch,
-            self.max_header_size,
-        )?;
-        let current = Some((iter, column));
-        Ok(State::Some(Self {
-            reader: None,
-            field: self.field,
-            columns: self.columns,
-            page_filter: self.page_filter,
-            current,
-            scratch: vec![],
-            max_header_size: self.max_header_size,
-        }))
-    }
-
-    fn get(&mut self) -> Option<&mut Self::Item> {
-        self.current.as_mut()
-    }
-}
-
-impl<R: Read + Seek> ColumnChunkIter<PageReader<R>> for ColumnIterator<R> {
-    fn field(&self) -> &ParquetType {
-        &self.field
-    }
-}
-
-/// A [`MutStreamingIterator`] of pre-read column chunks
-#[derive(Debug)]
-pub struct ReadColumnIterator {
-    field: ParquetType,
-    chunks: Vec<(Vec<Result<CompressedPage>>, ColumnChunkMetaData)>,
-    current: Option<(IntoIter<Result<CompressedPage>>, ColumnChunkMetaData)>,
-}
-
-impl ReadColumnIterator {
-    /// Returns a new [`ReadColumnIterator`]
-    pub fn new(
-        field: ParquetType,
-        chunks: Vec<(Vec<Result<CompressedPage>>, ColumnChunkMetaData)>,
-    ) -> Self {
-        Self {
-            field,
-            chunks,
-            current: None,
-        }
-    }
-}
-
-impl MutStreamingIterator for ReadColumnIterator {
-    type Item = (IntoIter<Result<CompressedPage>>, ColumnChunkMetaData);
-    type Error = Error;
-
-    fn advance(mut self) -> Result<State<Self>> {
-        if self.chunks.is_empty() {
-            return Ok(State::Finished(vec![]));
-        }
-        self.current = self
-            .chunks
-            .pop()
-            .map(|(pages, meta)| (pages.into_iter(), meta));
-        Ok(State::Some(Self {
-            field: self.field,
-            chunks: self.chunks,
-            current: self.current,
-        }))
-    }
-
-    fn get(&mut self) -> Option<&mut Self::Item> {
-        self.current.as_mut()
-    }
-}
-
-impl ColumnChunkIter<IntoIter<Result<CompressedPage>>> for ReadColumnIterator {
-    fn field(&self) -> &ParquetType {
-        &self.field
-    }
+        .filter(move |x| x.descriptor().path_in_schema[0] == field_name)
 }
 
 #[cfg(test)]
@@ -414,7 +214,6 @@ mod tests {
 
         let mut iter = ColumnIterator::new(
             file,
-            metadata.schema().fields()[0].clone(),
            metadata.row_groups[0].columns().to_vec(),
             None,
             vec![],
diff --git a/tests/it/read/mod.rs b/tests/it/read/mod.rs
index db4b6b7d..086d3b7c 100644
--- a/tests/it/read/mod.rs
+++ b/tests/it/read/mod.rs
@@ -215,7 +215,7 @@ where
 pub fn read_column<R: Read + Seek>(
     reader: &mut R,
     row_group: usize,
-    field: &str,
+    field_name: &str,
 ) -> Result<(Array, Option<Arc<dyn Statistics>>)> {
     let metadata = read_metadata(reader)?;
 
@@ -223,23 +223,19 @@ pub fn read_column<R: Read + Seek>(
     let field = metadata
         .schema()
         .fields()
         .iter()
-        .enumerate()
-        .filter_map(|(i, x)| if x.name() == field { Some(i) } else { None })
-        .next()
+        .find_map(|field| (field.name() == field_name).then(|| field))
         .ok_or_else(|| Error::OutOfSpec("column does not exist".to_string()))?;
 
     let columns = get_column_iterator(
         reader,
-        &metadata,
-        row_group,
-        field,
+        &metadata.row_groups[row_group],
+        field.name(),
         None,
         vec![],
         usize::MAX,
     );
 
-    let field = &metadata.schema().fields()[field];
-    let mut statistics = get_field_columns(&metadata, row_group, field)
+    let mut statistics = get_field_columns(metadata.row_groups[row_group].columns(), field.name())
         .map(|column_meta| column_meta.statistics().transpose())
         .collect::<Result<Vec<_>>>()?;
 
@@ -254,7 +250,7 @@ pub async fn read_column_async<
 >(
     reader: &mut R,
     row_group: usize,
-    field: &str,
+    field_name: &str,
 ) -> Result<(Array, Option<Arc<dyn Statistics>>)> {
     let metadata = read_metadata_async(reader).await?;
 
@@ -262,18 +258,16 @@ pub async fn read_column_async<
     let field = metadata
         .schema()
         .fields()
         .iter()
-        .enumerate()
-        .filter_map(|(i, x)| if x.name() == field { Some(i) } else { None })
+        .find_map(|field| (field.name() == field_name).then(|| field))
+        .ok_or_else(|| Error::OutOfSpec("column does not exist".to_string()))?;
+
+    let column = get_field_columns(metadata.row_groups[row_group].columns(), field.name())
         .next()
         .unwrap();
 
-    let column = &metadata.row_groups[0].columns()[0];
-
     let pages = get_page_stream(column, reader, vec![], Arc::new(|_, _| true), usize::MAX).await?;
 
-    let field = &metadata.schema().fields()[field];
-
-    let mut statistics = get_field_columns(&metadata, row_group, field)
+    let mut statistics = get_field_columns(metadata.row_groups[row_group].columns(), field.name())
         .map(|column_meta| column_meta.statistics().transpose())
         .collect::<Result<Vec<_>>>()?;
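
Usage sketch (illustrative, not part of the patch itself): with the new free functions, all
column chunks of a field can be fetched concurrently via `read_columns_async`, which needs a
cloneable factory of independent readers. The `factory` below is hypothetical and uses
`async-std` only as one possible reader implementation:

    use futures::future::BoxFuture;

    // each call opens an independent handle, so chunks can be read in parallel
    fn factory() -> BoxFuture<'static, std::io::Result<async_std::fs::File>> {
        Box::pin(async_std::fs::File::open("data.parquet"))
    }

    // one concurrent read per column chunk of field "c1" in row group 0
    let chunks = read_columns_async(factory, metadata.row_groups[0].columns(), "c1").await?;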