Add support to read async (#33)
jorgecarleitao authored Aug 9, 2021
1 parent d2a6cff commit 5ec7e09
Showing 48 changed files with 1,002 additions and 292 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -12,11 +12,11 @@ name = "parquet2"
 bench = false
 
 [dependencies]
-parquet-format = "~2.6.1"
-thrift = "0.13"
+parquet-format-async-temp = "0.1.0"
 bitpacking = { version = "0.8.2", features = ["bitpacker1x"] }
 streaming-iterator = "0.1.5"
 
+async-stream = { version = "0.3.2", optional = true }
 futures = { version = "0.3", optional = true }
 
 snap = { version = "^1.0", optional = true }
@@ -29,4 +29,4 @@ zstd = { version = "^0.9", optional = true }
 default = ["snappy", "gzip", "lz4", "zstd", "brotli", "stream"]
 snappy = ["snap"]
 gzip = ["flate2"]
-stream = ["futures"]
+stream = ["futures", "async-stream"]
10 changes: 10 additions & 0 deletions examples/s3/Cargo.toml
@@ -0,0 +1,10 @@
[package]
name = "s3"
version = "0.1.0"
edition = "2018"

[dependencies]
parquet2 = { path = "../../" }
rust-s3 = { version = "0.27.0-rc4", features = ["blocking", "futures"] }
futures = "0.3"
tokio = { version = "1.0.0", features = ["macros", "rt-multi-thread"] }
67 changes: 67 additions & 0 deletions examples/s3/src/main.rs
@@ -0,0 +1,67 @@
use std::sync::Arc;

use futures::{future::BoxFuture, pin_mut, StreamExt};
use parquet2::{
    error::Result,
    read::{get_page_stream, read_metadata_async},
    statistics::BinaryStatistics,
};
use s3::Bucket;

mod stream;
use stream::{RangedStreamer, SeekOutput};

#[tokio::main]
async fn main() -> Result<()> {
    let bucket_name = "ursa-labs-taxi-data";
    let region = "us-east-2".parse().unwrap();
    let bucket = Bucket::new_public(bucket_name, region).unwrap();
    let path = "2009/01/data.parquet".to_string();

    let (data, _) = bucket.head_object(&path).await.unwrap();
    let length = data.content_length.unwrap() as usize;

    // closure that fetches an arbitrary byte range of the object
    let range_get = std::sync::Arc::new(move |start: u64, length: usize| {
        let bucket = bucket.clone();
        let path = path.clone();
        Box::pin(async move {
            let bucket = bucket.clone();
            let path = path.clone();
            let (mut data, _) = bucket
                .get_object_range(&path, start, Some(start + length as u64))
                .await
                .map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;

            data.truncate(length);
            Ok(SeekOutput { start, data })
        }) as BoxFuture<'static, std::io::Result<SeekOutput>>
    });

    let mut reader = RangedStreamer::new(length, 1024 * 1024, range_get);

    let metadata = read_metadata_async(&mut reader).await?;

    // file-level metadata
    println!("{}", metadata.num_rows);

    // stream the pages of:
    // * the first row group
    // * the first column
    // * skipping no pages
    let pages = get_page_stream(&metadata, 0, 0, &mut reader, vec![], Arc::new(|_, _| true)).await?;

    pin_mut!(pages); // needed for iteration

    let first_page = pages.next().await.unwrap()?;
    // the page statistics: `unwrap` asserts they exist, `?` that they were readable
    let stats = first_page.statistics().unwrap()?;
    let stats = stats.as_any().downcast_ref::<BinaryStatistics>().unwrap();
    println!("{:?}", stats.min_value);
    println!("{:?}", stats.max_value);
    println!("{:?}", stats.null_count);
    Ok(())
}
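The same entry points accept any `AsyncRead + AsyncSeek` reader, not just the S3-backed one. A minimal sketch for a local file, using `futures`' `AllowStdIo` adapter (illustrative only; for local files the blocking `read_metadata` is the simpler choice):

use futures::io::AllowStdIo;
use parquet2::read::read_metadata_async;

async fn print_num_rows(path: &str) -> parquet2::error::Result<()> {
    // AllowStdIo wraps a blocking Read + Seek into AsyncRead + AsyncSeek
    let file = std::fs::File::open(path).unwrap(); // unwrap for brevity
    let mut reader = AllowStdIo::new(std::io::BufReader::new(file));
    let metadata = read_metadata_async(&mut reader).await?;
    println!("{}", metadata.num_rows);
    Ok(())
}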
113 changes: 113 additions & 0 deletions examples/s3/src/stream.rs
@@ -0,0 +1,113 @@
// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6
use std::io::{Result, SeekFrom};
use std::pin::Pin;

use futures::{
    future::BoxFuture,
    io::{AsyncRead, AsyncSeek},
    Future,
};

pub struct RangedStreamer {
    pos: u64,
    length: u64, // total size
    state: State,
    range_get: F,
    min_request_size: usize, // requests have at least this size
}

enum State {
    HasChunk(SeekOutput),
    Seeking(BoxFuture<'static, std::io::Result<SeekOutput>>),
}

pub struct SeekOutput {
    pub start: u64,
    pub data: Vec<u8>,
}

pub type F = std::sync::Arc<
    dyn Fn(u64, usize) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
>;

impl RangedStreamer {
    pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self {
        let length = length as u64;
        Self {
            pos: 0,
            length,
            state: State::HasChunk(SeekOutput {
                start: 0,
                data: vec![],
            }),
            range_get,
            min_request_size,
        }
    }
}

// whether `test_interval` is fully contained in `a`; both are (start, length) tuples.
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
    if test_interval.0 < a.0 {
        return false;
    }
    let test_end = test_interval.0 + test_interval.1;
    let a_end = a.0 + a.1;
    if test_end > a_end {
        return false;
    }
    true
}

impl AsyncRead for RangedStreamer {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<Result<usize>> {
        let requested_range = (self.pos as usize, buf.len());
        let min_request_size = self.min_request_size;
        match &mut self.state {
            State::HasChunk(output) => {
                let existing_range = (output.start as usize, output.data.len());
                if range_includes(existing_range, requested_range) {
                    // the cached chunk covers the request: serve it from memory
                    let offset = requested_range.0 - existing_range.0;
                    buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
                    self.pos += buf.len() as u64;
                    std::task::Poll::Ready(Ok(buf.len()))
                } else {
                    // miss: request a new chunk of at least `min_request_size` bytes
                    let start = requested_range.0 as u64;
                    let length = std::cmp::max(min_request_size, requested_range.1);
                    let future = (self.range_get)(start, length);
                    self.state = State::Seeking(Box::pin(future));
                    self.poll_read(cx, buf)
                }
            }
            State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
                std::task::Poll::Ready(v) => {
                    match v {
                        Ok(output) => self.state = State::HasChunk(output),
                        Err(e) => return std::task::Poll::Ready(Err(e)),
                    };
                    self.poll_read(cx, buf)
                }
                std::task::Poll::Pending => std::task::Poll::Pending,
            },
        }
    }
}

impl AsyncSeek for RangedStreamer {
    fn poll_seek(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
        pos: std::io::SeekFrom,
    ) -> std::task::Poll<Result<u64>> {
        // seeking only moves the cursor; data is fetched lazily on the next read
        match pos {
            SeekFrom::Start(pos) => self.pos = pos,
            SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
            SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
        };
        std::task::Poll::Ready(Ok(self.pos))
    }
}
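The reader coalesces small reads: every cache miss fetches at least `min_request_size` bytes (1 MiB in the example above), so the many small reads that parquet metadata and page headers require are mostly served from the cached chunk. A test one could add for the containment predicate (illustrative, not part of this commit):

#[test]
fn chunk_containment() {
    // a 200-byte read at offset 100 falls inside a 1024-byte chunk at offset 0
    assert!(range_includes((0, 1024), (100, 200)));
    // a read overrunning the chunk end forces a new range request
    assert!(!range_includes((0, 1024), (1000, 100)));
}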
6 changes: 3 additions & 3 deletions integration-tests/src/read/binary.rs
@@ -2,7 +2,7 @@ use parquet::{
     encoding::{bitpacking, plain_byte_array, uleb128, Encoding},
     error::Result,
     metadata::ColumnDescriptor,
-    page::{BinaryPageDict, DataPage, DataPageHeader},
+    page::{BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt},
     read::levels,
 };

@@ -81,7 +81,7 @@ pub fn page_dict_to_vec(
             page.num_values() as u32,
             dict.as_any().downcast_ref().unwrap(),
             (
-                &header.definition_level_encoding,
+                &header.definition_level_encoding(),
                 descriptor.max_def_level(),
             ),
         ))
@@ -140,7 +140,7 @@ pub fn page_to_vec(page: &DataPage, descriptor: &ColumnDescriptor) -> Result<Vec
             values,
             page.num_values() as u32,
             (
-                &header.definition_level_encoding,
+                &header.definition_level_encoding(),
                 descriptor.max_def_level(),
             ),
         ))
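This and the following test files change for the same reason: the headers generated by `parquet-format-async-temp` no longer expose `Encoding` fields directly, so values go through the accessor methods provided by `DataPageHeaderExt` (`encoding()`, `definition_level_encoding()`, `repetition_level_encoding()`). A sketch of the resulting pattern, with assumed paths and signatures:

use parquet2::encoding::Encoding;
use parquet2::page::{DataPage, DataPageHeader, DataPageHeaderExt};

// assumed sketch: the definition-level encoding of a data page.
// V1 headers carry it explicitly; V2 level streams are always RLE per the spec.
fn def_level_encoding(page: &DataPage) -> Encoding {
    match page.header() {
        DataPageHeader::V1(header) => header.definition_level_encoding(),
        DataPageHeader::V2(_) => Encoding::Rle,
    }
}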
12 changes: 6 additions & 6 deletions integration-tests/src/read/primitive.rs
@@ -6,7 +6,7 @@ use parquet::{
     encoding::{bitpacking, uleb128, Encoding},
     error::{ParquetError, Result},
     metadata::ColumnDescriptor,
-    page::{DataPage, DataPageHeader, PrimitivePageDict},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
     read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
     types::NativeType,
 };
@@ -122,7 +122,7 @@ pub fn page_dict_to_vec<T: NativeType>(
             page.num_values() as u32,
             dict.as_any().downcast_ref().unwrap(),
             (
-                &header.definition_level_encoding,
+                &header.definition_level_encoding(),
                 descriptor.max_def_level(),
             ),
         ))
@@ -132,7 +132,7 @@
             )),
             _ => todo!(),
         },
-        DataPageHeader::V2(header) => match (&header.encoding, &page.dictionary_page()) {
+        DataPageHeader::V2(header) => match (&header.encoding(), &page.dictionary_page()) {
             (Encoding::RleDictionary, Some(dict)) | (Encoding::PlainDictionary, Some(dict)) => {
                 let (_, def_levels, values) = split_buffer_v2(
                     page.buffer(),
@@ -158,7 +158,7 @@ pub fn page_to_vec<T: NativeType>(
 ) -> Result<Vec<Option<T>>> {
     assert_eq!(descriptor.max_rep_level(), 0);
     match page.header() {
-        DataPageHeader::V1(header) => match (&header.encoding, &page.dictionary_page()) {
+        DataPageHeader::V1(header) => match (&header.encoding(), &page.dictionary_page()) {
             (Encoding::Plain, None) => {
                 let (_, def_levels, values) =
                     split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0);
@@ -167,14 +167,14 @@
                     values,
                     page.num_values() as u32,
                     (
-                        &header.definition_level_encoding,
+                        &header.definition_level_encoding(),
                         descriptor.max_def_level(),
                     ),
                 ))
             }
             _ => todo!(),
         },
-        DataPageHeader::V2(header) => match (&header.encoding, &page.dictionary_page()) {
+        DataPageHeader::V2(header) => match (&header.encoding(), &page.dictionary_page()) {
             (Encoding::Plain, None) => {
                 let (_, def_levels, values) = split_buffer_v2(
                     page.buffer(),
12 changes: 6 additions & 6 deletions integration-tests/src/read/primitive_nested.rs
@@ -6,7 +6,7 @@ use parquet::{
     encoding::{bitpacking, uleb128, Encoding},
     error::{ParquetError, Result},
     metadata::ColumnDescriptor,
-    page::{DataPage, DataPageHeader, PrimitivePageDict},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
     read::levels::{get_bit_width, split_buffer_v1, RLEDecoder},
     types::NativeType,
 };
@@ -152,11 +152,11 @@ pub fn page_to_array<T: NativeType>(
             values,
             page.num_values() as u32,
             (
-                &header.repetition_level_encoding,
+                &header.repetition_level_encoding(),
                 descriptor.max_rep_level(),
             ),
             (
-                &header.definition_level_encoding,
+                &header.definition_level_encoding(),
                 descriptor.max_def_level(),
             ),
         ))
@@ -204,7 +204,7 @@ pub fn page_dict_to_array<T: NativeType>(
 ) -> Result<Array> {
     assert_eq!(descriptor.max_rep_level(), 1);
     match page.header() {
-        DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) {
+        DataPageHeader::V1(header) => match (page.encoding(), &page.dictionary_page()) {
             (Encoding::PlainDictionary, Some(dict)) => {
                 let (rep_levels, def_levels, values) = split_buffer_v1(page.buffer(), true, true);
                 Ok(read_dict_array::<T>(
@@ -214,11 +214,11 @@
             page.num_values() as u32,
             dict.as_any().downcast_ref().unwrap(),
             (
-                &header.repetition_level_encoding,
+                &header.repetition_level_encoding(),
                 descriptor.max_rep_level(),
             ),
             (
-                &header.definition_level_encoding,
+                &header.definition_level_encoding(),
                 descriptor.max_def_level(),
             ),
         ))
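Nested columns read both level streams, so the V1 header is queried for the repetition- and definition-level encodings, each paired with the column's maximum level. A sketch under the same assumed accessors as above:

use parquet2::encoding::Encoding;
use parquet2::page::{DataPageHeaderExt, DataPageHeaderV1};

// assumed sketch: (encoding, max level) pairs for both level streams of a V1 page
fn level_encodings(
    header: &DataPageHeaderV1,
    max_rep_level: i16,
    max_def_level: i16,
) -> ((Encoding, i16), (Encoding, i16)) {
    (
        (header.repetition_level_encoding(), max_rep_level),
        (header.definition_level_encoding(), max_def_level),
    )
}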
8 changes: 4 additions & 4 deletions integration-tests/src/write/mod.rs
@@ -29,7 +29,7 @@ mod tests {
 
     use crate::tests::{alltypes_plain, alltypes_statistics};
 
-    use parquet::compression::CompressionCodec;
+    use parquet::compression::Compression;
     use parquet::error::Result;
     use parquet::metadata::SchemaDescriptor;
     use parquet::statistics::Statistics;
@@ -47,7 +47,7 @@
 
         let options = WriteOptions {
             write_statistics: true,
-            compression: CompressionCodec::Uncompressed,
+            compression: Compression::Uncompressed,
             version: Version::V1,
         };
 
@@ -136,7 +136,7 @@ mod tests2 {
 
     use crate::write::primitive::array_to_page_v1;
     use parquet::{
-        compression::CompressionCodec,
+        compression::Compression,
         error::Result,
         metadata::SchemaDescriptor,
         read::read_metadata,
@@ -157,7 +157,7 @@
 
         let options = WriteOptions {
             write_statistics: false,
-            compression: CompressionCodec::Uncompressed,
+            compression: Compression::Uncompressed,
             version: Version::V1,
         };
 
6 changes: 3 additions & 3 deletions integration-tests/src/write/primitive.rs
@@ -76,9 +76,9 @@ pub fn array_to_page_v1(
 
     let header = DataPageHeaderV1 {
         num_values: array.len() as i32,
-        encoding: Encoding::Plain,
-        definition_level_encoding: Encoding::Rle,
-        repetition_level_encoding: Encoding::Rle,
+        encoding: Encoding::Plain.into(),
+        definition_level_encoding: Encoding::Rle.into(),
+        repetition_level_encoding: Encoding::Rle.into(),
         statistics,
     };
 