diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 766941cbc..acd82445a 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -24,7 +24,7 @@ jobs: deactivate - uses: Swatinem/rust-cache@v1 - name: Generate code coverage - run: cargo llvm-cov --lcov --output-path lcov.info + run: cargo llvm-cov --features full --lcov --output-path lcov.info - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index aaaf5ca31..0ceb347eb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,9 +23,11 @@ jobs: python tests/write_pyarrow.py deactivate - name: Run - run: cargo test + run: | + cargo test --features full + cargo check # compiles without async - name: Run lz4-flex - run: cargo test --no-default-features --features lz4_flex,bloom_filter,snappy,brotli,zstd,gzip + run: cargo test --no-default-features --features lz4_flex,bloom_filter,snappy,brotli,zstd,gzip,async clippy: name: Clippy diff --git a/Cargo.toml b/Cargo.toml index 0c97ff424..3b59f2451 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,12 @@ name = "parquet2" bench = false [dependencies] -parquet-format-safe = "0.1" +parquet-format-safe = "0.2" bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] } streaming-decompression = "0.1" -async-stream = { version = "0.3.2" } -futures = { version = "0.3" } +async-stream = { version = "0.3.2", optional = true } +futures = { version = "0.3", optional = true } snap = { version = "^1.0", optional = true } brotli = { version = "^3.3", optional = true } @@ -38,6 +38,8 @@ rand = "0.8" [features] default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"] +full = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter", "async"] +async = [ "async-stream", "futures", "parquet-format-safe/async" ] snappy = ["snap"] gzip = ["flate2/rust_backend"] gzip_zlib_ng = ["flate2/zlib-ng"] diff --git a/src/read/mod.rs b/src/read/mod.rs index 24ce32269..95448f9a2 100644 --- a/src/read/mod.rs +++ b/src/read/mod.rs @@ -3,6 +3,7 @@ mod indexes; pub mod levels; mod metadata; mod page; +#[cfg(feature = "async")] mod stream; use std::io::{Read, Seek, SeekFrom}; @@ -11,8 +12,13 @@ use std::vec::IntoIter; pub use compression::{decompress, BasicDecompressor, Decompressor}; pub use metadata::{deserialize_metadata, read_metadata}; +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub use page::{get_page_stream, get_page_stream_from_column_start}; pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader}; + +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub use stream::read_metadata as read_metadata_async; use crate::error::Error; diff --git a/src/read/page/mod.rs b/src/read/page/mod.rs index 13df8bb65..365172ec2 100644 --- a/src/read/page/mod.rs +++ b/src/read/page/mod.rs @@ -1,5 +1,6 @@ mod indexed_reader; mod reader; +#[cfg(feature = "async")] mod stream; use crate::{error::Error, page::CompressedPage}; @@ -11,5 +12,6 @@ pub trait PageIterator: Iterator> { fn swap_buffer(&mut self, buffer: &mut Vec); } -pub use stream::get_page_stream; -pub use stream::get_page_stream_from_column_start; +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] +pub use stream::{get_page_stream, get_page_stream_from_column_start}; diff --git a/src/write/column_chunk.rs b/src/write/column_chunk.rs index a2e888091..a88ba69b5 100644 --- a/src/write/column_chunk.rs +++ b/src/write/column_chunk.rs @@ -1,12 +1,14 @@ use std::collections::HashSet; use std::io::Write; -use futures::AsyncWrite; -use parquet_format_safe::thrift::protocol::{ - TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol, -}; +use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TOutputProtocol}; use parquet_format_safe::{ColumnChunk, ColumnMetaData, Type}; +#[cfg(feature = "async")] +use futures::AsyncWrite; +#[cfg(feature = "async")] +use parquet_format_safe::thrift::protocol::{TCompactOutputStreamProtocol, TOutputStreamProtocol}; + use crate::statistics::serialize_statistics; use crate::FallibleStreamingIterator; use crate::{ @@ -17,7 +19,10 @@ use crate::{ page::{CompressedPage, PageType}, }; -use super::page::{write_page, write_page_async, PageWriteSpec}; +#[cfg(feature = "async")] +use super::page::write_page_async; + +use super::page::{write_page, PageWriteSpec}; use super::statistics::reduce; use super::DynStreamingIterator; @@ -58,6 +63,8 @@ where Ok((column_chunk, specs, bytes_written)) } +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub async fn write_column_chunk_async( writer: &mut W, mut offset: u64, diff --git a/src/write/indexes/write.rs b/src/write/indexes/write.rs index e913d322f..3bc3c6724 100644 --- a/src/write/indexes/write.rs +++ b/src/write/indexes/write.rs @@ -1,7 +1,11 @@ -use futures::AsyncWrite; use std::io::Write; -use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TCompactOutputStreamProtocol}; +#[cfg(feature = "async")] +use futures::AsyncWrite; +#[cfg(feature = "async")] +use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol; + +use parquet_format_safe::thrift::protocol::TCompactOutputProtocol; use crate::error::Result; pub use crate::metadata::KeyValue; @@ -16,6 +20,8 @@ pub fn write_column_index(writer: &mut W, pages: &[PageWriteSpec]) -> Ok(index.write_to_out_protocol(&mut protocol)? as u64) } +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub async fn write_column_index_async( writer: &mut W, pages: &[PageWriteSpec], @@ -31,6 +37,8 @@ pub fn write_offset_index(writer: &mut W, pages: &[PageWriteSpec]) -> Ok(index.write_to_out_protocol(&mut protocol)? as u64) } +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub async fn write_offset_index_async( writer: &mut W, pages: &[PageWriteSpec], diff --git a/src/write/mod.rs b/src/write/mod.rs index 035f0c109..16ba7ca47 100644 --- a/src/write/mod.rs +++ b/src/write/mod.rs @@ -6,7 +6,10 @@ pub(crate) mod page; mod row_group; pub(self) mod statistics; +#[cfg(feature = "async")] mod stream; +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub use stream::FileStreamer; mod dyn_iter; diff --git a/src/write/page.rs b/src/write/page.rs index c6c6072f9..9b0a9c021 100644 --- a/src/write/page.rs +++ b/src/write/page.rs @@ -2,8 +2,12 @@ use std::convert::TryInto; use std::io::Write; use std::sync::Arc; +#[cfg(feature = "async")] use futures::{AsyncWrite, AsyncWriteExt}; -use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TCompactOutputStreamProtocol}; +#[cfg(feature = "async")] +use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol; + +use parquet_format_safe::thrift::protocol::TCompactOutputProtocol; use parquet_format_safe::{DictionaryPageHeader, Encoding, PageType}; use crate::compression::Compression; @@ -91,6 +95,8 @@ pub fn write_page( }) } +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub async fn write_page_async( writer: &mut W, offset: u64, @@ -197,6 +203,8 @@ fn write_page_header(mut writer: &mut W, header: &ParquetPageHeader) - Ok(header.write_to_out_protocol(&mut protocol)? as u64) } +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] /// writes the page header into `writer`, returning the number of bytes used in the process. async fn write_page_header_async( mut writer: &mut W, diff --git a/src/write/row_group.rs b/src/write/row_group.rs index 890d6c27d..94c7ef84c 100644 --- a/src/write/row_group.rs +++ b/src/write/row_group.rs @@ -1,6 +1,8 @@ use std::io::Write; +#[cfg(feature = "async")] use futures::AsyncWrite; + use parquet_format_safe::{ColumnChunk, RowGroup}; use crate::{ @@ -9,8 +11,11 @@ use crate::{ page::CompressedPage, }; +#[cfg(feature = "async")] +use super::column_chunk::write_column_chunk_async; + use super::{ - column_chunk::{write_column_chunk, write_column_chunk_async}, + column_chunk::write_column_chunk, page::{is_data_page, PageWriteSpec}, DynIter, DynStreamingIterator, }; @@ -137,6 +142,8 @@ where )) } +#[cfg(feature = "async")] +#[cfg_attr(docsrs, doc(cfg(feature = "async")))] pub async fn write_row_group_async< 'a, W,