Skip to content

Commit

Permalink
Adds statistics to async writer (#144)
Browse files Browse the repository at this point in the history
Also adds option for compression to test_column_async

Co-authored-by: Ryan <ryan@ryanj.net>
  • Loading branch information
TurnOfACard and Ryan authored May 26, 2022
1 parent 3774868 commit e2b7533
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 6 deletions.
23 changes: 22 additions & 1 deletion src/write/indexes/write.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use futures::AsyncWrite;
use std::io::Write;

use parquet_format_async_temp::thrift::protocol::TCompactOutputProtocol;
use parquet_format_async_temp::thrift::protocol::{
TCompactOutputProtocol, TCompactOutputStreamProtocol,
};

use crate::error::Result;
pub use crate::metadata::KeyValue;
Expand All @@ -15,8 +18,26 @@ pub fn write_column_index<W: Write>(writer: &mut W, pages: &[PageWriteSpec]) ->
Ok(index.write_to_out_protocol(&mut protocol)? as u64)
}

pub async fn write_column_index_async<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
pages: &[PageWriteSpec],
) -> Result<u64> {
let index = serialize_column_index(pages)?;
let mut protocol = TCompactOutputStreamProtocol::new(writer);
Ok(index.write_to_out_stream_protocol(&mut protocol).await? as u64)
}

pub fn write_offset_index<W: Write>(writer: &mut W, pages: &[PageWriteSpec]) -> Result<u64> {
let index = serialize_offset_index(pages)?;
let mut protocol = TCompactOutputProtocol::new(&mut *writer);
Ok(index.write_to_out_protocol(&mut protocol)? as u64)
}

pub async fn write_offset_index_async<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
pages: &[PageWriteSpec],
) -> Result<u64> {
let index = serialize_offset_index(pages)?;
let mut protocol = TCompactOutputStreamProtocol::new(&mut *writer);
Ok(index.write_to_out_stream_protocol(&mut protocol).await? as u64)
}
3 changes: 2 additions & 1 deletion src/write/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub async fn write_row_group_async<
mut offset: u64,
descriptors: &[ColumnDescriptor],
columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
ordinal: usize,
) -> Result<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
W: AsyncWrite + Unpin + Send,
Expand Down Expand Up @@ -193,7 +194,7 @@ where
sorting_columns: None,
file_offset,
total_compressed_size: Some(total_compressed_size),
ordinal: None,
ordinal: ordinal.try_into().ok(),
},
specs,
bytes_written,
Expand Down
33 changes: 32 additions & 1 deletion src/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use parquet_format_async_temp::{
FileMetaData, RowGroup,
};

use crate::write::indexes::{write_column_index_async, write_offset_index_async};
use crate::write::page::PageWriteSpec;
use crate::write::State;
use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -53,6 +55,7 @@ pub struct FileStreamer<W: AsyncWrite + Unpin + Send> {

offset: u64,
row_groups: Vec<RowGroup>,
page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
/// Used to store the current state for writing the file
state: State,
}
Expand Down Expand Up @@ -85,6 +88,7 @@ impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
created_by,
offset: 0,
row_groups: vec![],
page_specs: vec![],
state: State::Initialised,
}
}
Expand Down Expand Up @@ -114,15 +118,19 @@ impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
if self.offset == 0 {
self.start().await?;
}
let (group, _specs, size) = write_row_group_async(

let ordinal = self.row_groups.len();
let (group, specs, size) = write_row_group_async(
&mut self.writer,
self.offset,
self.schema.columns(),
row_group,
ordinal,
)
.await?;
self.offset += size;
self.row_groups.push(group);
self.page_specs.push(specs);
Ok(())
}

Expand All @@ -139,6 +147,29 @@ impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
// compute file stats
let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

if self.options.write_statistics {
// write column indexes (require page statistics)
for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
let offset = self.offset;
column.column_index_offset = Some(offset as i64);
self.offset += write_column_index_async(&mut self.writer, pages).await?;
let length = self.offset - offset;
column.column_index_length = Some(length as i32);
}
}
};

// write offset index
for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
let offset = self.offset;
column.offset_index_offset = Some(offset as i64);
self.offset += write_offset_index_async(&mut self.writer, pages).await?;
column.offset_index_length = Some((self.offset - offset) as i32);
}
}

let metadata = FileMetaData::new(
self.options.version.into(),
self.schema.clone().into_thrift(),
Expand Down
7 changes: 4 additions & 3 deletions tests/it/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ fn basic() -> Result<()> {
Ok(())
}

async fn test_column_async(column: &str) -> Result<()> {
async fn test_column_async(column: &str, compression: CompressionOptions) -> Result<()> {
let array = alltypes_plain(column);

let options = WriteOptions {
Expand All @@ -246,6 +246,7 @@ async fn test_column_async(column: &str) -> Result<()> {
Array::Int96(_) => PhysicalType::Int96,
Array::Float32(_) => PhysicalType::Float,
Array::Float64(_) => PhysicalType::Double,
Array::Binary(_) => PhysicalType::ByteArray,
_ => todo!(),
};

Expand All @@ -262,7 +263,7 @@ async fn test_column_async(column: &str) -> Result<()> {
&options,
&a[0].descriptor,
))),
CompressionOptions::Uncompressed,
compression,
vec![],
));
let columns = std::iter::once(Ok(pages));
Expand All @@ -287,5 +288,5 @@ async fn test_column_async(column: &str) -> Result<()> {

#[tokio::test]
async fn test_async() -> Result<()> {
test_column_async("float_col").await
test_column_async("float_col", CompressionOptions::Uncompressed).await
}

0 comments on commit e2b7533

Please sign in to comment.