Removed AsyncSeek requirement from page stream (#149)
medwards authored Jun 15, 2022
1 parent 2cd218f commit 34aac65
Showing 6 changed files with 38 additions and 14 deletions.
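
The substance of the change sits in src/read/page/stream.rs below: when a page is filtered out, the stream no longer seeks past its body but drains the body into a sink instead, which is why the `AsyncSeek` bound can be dropped. A minimal sketch of that skip-without-seek technique, using only the `futures` crate; the `skip_bytes` helper is illustrative and not part of this commit:

```rust
use futures::io::{copy, sink, AsyncRead, AsyncReadExt};

/// Discard `n` bytes from any `AsyncRead`, with no `AsyncSeek` bound.
/// Hypothetical helper; the commit inlines the equivalent call in `_get_page_stream`.
async fn skip_bytes<R: AsyncRead + Unpin>(reader: &mut R, n: u64) -> std::io::Result<()> {
    // `take(n)` caps the read at the bytes to skip, and `copy` drains them into
    // a sink that discards every byte, advancing the reader much like a seek.
    copy(reader.take(n), &mut sink()).await?;
    Ok(())
}
```

The trade-off is that skipped pages are still read and discarded rather than jumped over, but in exchange the page stream works over non-seekable sources.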
7 changes: 6 additions & 1 deletion src/page/page_dict/mod.rs
@@ -42,7 +42,12 @@ pub struct CompressedDictPage {
 }
 
 impl CompressedDictPage {
-    pub fn new(buffer: Vec<u8>, compression: Compression, uncompressed_page_size: usize, num_values: usize) -> Self {
+    pub fn new(
+        buffer: Vec<u8>,
+        compression: Compression,
+        uncompressed_page_size: usize,
+        num_values: usize,
+    ) -> Self {
         Self {
             buffer,
             compression,
8 changes: 6 additions & 2 deletions src/read/metadata.rs
@@ -93,9 +93,13 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
         Cursor::new(buffer)
     };
 
+    deserialize_metadata(reader)
+}
+
+/// Parse loaded metadata bytes
+pub fn deserialize_metadata<R: Read>(reader: R) -> Result<FileMetaData> {
     let mut prot = TCompactInputProtocol::new(reader);
-    let metadata = TFileMetaData::read_from_in_protocol(&mut prot)
-        .map_err(|e| Error::General(format!("Could not parse metadata: {}", e)))?;
+    let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;
 
     FileMetaData::try_from_thrift(metadata)
 }
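
Splitting `deserialize_metadata` out of `read_metadata` (and re-exporting it from src/read/mod.rs below) lets callers that already hold the footer's Thrift-encoded metadata bytes, e.g. fetched via a range request, parse them without a seekable file. A rough usage sketch, assuming the crate is consumed as `parquet2` and that `metadata_bytes` contains exactly the metadata payload (footer length and magic handling omitted):

```rust
use std::io::Cursor;

use parquet2::error::Result;
use parquet2::metadata::FileMetaData;
use parquet2::read::deserialize_metadata;

/// Parse metadata bytes that were loaded elsewhere.
fn parse_metadata(metadata_bytes: Vec<u8>) -> Result<FileMetaData> {
    // `deserialize_metadata` only needs `Read`, so a `Cursor` over
    // in-memory bytes is enough; no `Seek` bound is involved.
    deserialize_metadata(Cursor::new(metadata_bytes))
}
```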
4 changes: 2 additions & 2 deletions src/read/mod.rs
@@ -10,8 +10,8 @@ use std::sync::Arc;
 use std::vec::IntoIter;
 
 pub use compression::{decompress, BasicDecompressor, Decompressor};
-pub use metadata::read_metadata;
-pub use page::get_page_stream;
+pub use metadata::{deserialize_metadata, read_metadata};
+pub use page::{get_page_stream, get_page_stream_from_column_start};
 pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};
 pub use stream::read_metadata as read_metadata_async;
 
1 change: 1 addition & 0 deletions src/read/page/mod.rs
@@ -12,3 +12,4 @@ pub trait PageIterator: Iterator<Item = Result<CompressedDataPage, Error>> {
 }
 
 pub use stream::get_page_stream;
+pub use stream::get_page_stream_from_column_start;
23 changes: 21 additions & 2 deletions src/read/page/stream.rs
@@ -1,6 +1,7 @@
 use std::io::SeekFrom;
 
 use async_stream::try_stream;
+use futures::io::{copy, sink};
 use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream};
 use parquet_format_async_temp::thrift::protocol::TCompactInputStreamProtocol;
 
@@ -22,6 +23,24 @@ pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
     get_page_stream_with_page_meta(column_metadata.into(), reader, buffer, pages_filter).await
 }
 
+/// Returns a stream of compressed data pages from a reader that begins at the start of the column
+pub async fn get_page_stream_from_column_start<'a, R: AsyncRead + Unpin + Send>(
+    column_metadata: &'a ColumnChunkMetaData,
+    reader: &'a mut R,
+    buffer: Vec<u8>,
+    pages_filter: PageFilter,
+) -> Result<impl Stream<Item = Result<CompressedDataPage>> + 'a> {
+    let page_metadata: PageMetaData = column_metadata.into();
+    Ok(_get_page_stream(
+        reader,
+        page_metadata.num_values,
+        page_metadata.compression,
+        page_metadata.descriptor,
+        buffer,
+        pages_filter,
+    ))
+}
+
 /// Returns a stream of compressed data pages with [`PageMetaData`]
 pub async fn get_page_stream_with_page_meta<RR: AsyncRead + Unpin + Send + AsyncSeek>(
     page_metadata: PageMetaData,
@@ -41,7 +60,7 @@ pub async fn get_page_stream_with_page_meta<RR: AsyncRead + Unpin + Send + AsyncSeek>(
     ))
 }
 
-fn _get_page_stream<R: AsyncRead + AsyncSeek + Unpin + Send>(
+fn _get_page_stream<R: AsyncRead + Unpin + Send>(
     reader: &mut R,
     total_num_values: i64,
     compression: Compression,
@@ -64,7 +83,7 @@ fn _get_page_stream<R: AsyncRead + AsyncSeek + Unpin + Send>(
         if let Some(data_header) = data_header {
             if !pages_filter(&descriptor, &data_header) {
                 // page to be skipped, we still need to seek
-                reader.seek(SeekFrom::Current(read_size)).await?;
+                copy(reader.take(read_size as u64), &mut sink()).await?;
                 continue
             }
         }
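
`get_page_stream_from_column_start` is where dropping `AsyncSeek` pays off: the caller hands over a reader already positioned at the column chunk's first byte (a sliced object-store stream, for instance), and the pages are then decoded from plain `AsyncRead`. A rough usage sketch, assuming the `parquet2` crate paths shown in this diff and that `PageFilter` is the crate's `Arc`'d predicate over a page's descriptor and header; decompression and error handling are elided:

```rust
use std::sync::Arc;

use futures::io::AsyncRead;
use futures::StreamExt;

use parquet2::error::Result;
use parquet2::metadata::ColumnChunkMetaData;
use parquet2::read::get_page_stream_from_column_start;

/// Count the compressed data pages of one column chunk read from a
/// non-seekable source. `reader` must already be positioned at the
/// first byte of the column chunk.
async fn count_pages<R: AsyncRead + Unpin + Send>(
    column: &ColumnChunkMetaData,
    reader: &mut R,
) -> Result<usize> {
    let pages = get_page_stream_from_column_start(
        column,
        reader,
        vec![],                // scratch buffer reused across pages
        Arc::new(|_, _| true), // page filter: keep every page
    )
    .await?;
    futures::pin_mut!(pages);

    let mut count = 0;
    while let Some(page) = pages.next().await {
        let _page = page?; // CompressedDataPage
        count += 1;
    }
    Ok(count)
}
```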
9 changes: 2 additions & 7 deletions src/read/stream.rs
@@ -1,11 +1,9 @@
 use std::io::{Cursor, Seek, SeekFrom};
 
 use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
-use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol;
-use parquet_format_async_temp::FileMetaData as TFileMetaData;
 
 use super::super::{metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
-use super::metadata::metadata_len;
+use super::metadata::{deserialize_metadata, metadata_len};
 use crate::error::{Error, Result};
 use crate::HEADER_SIZE;
 
@@ -85,8 +83,5 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
         Cursor::new(buffer)
     };
 
-    let mut prot = TCompactInputProtocol::new(reader);
-    let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;
-
-    FileMetaData::try_from_thrift(metadata)
+    deserialize_metadata(reader)
 }
