
feat(rust): ParquetCloudSink to allow streaming pipelines into remote ObjectStores #10060

Merged Sep 8, 2023 · 68 commits
92aa917
Adds CloudWriter to polars-io/cloud/adaptors.rs
Qqwy Jul 22, 2023
50d04a3
Add CloudWriter implementation and related test case
Qqwy Jul 22, 2023
c7073bd
Make the cloud module public, such that CloudReader and CloudWriter c…
Qqwy Jul 22, 2023
09654e4
Ensure it is possible to use `file://`-based paths when `cloud` featu…
Qqwy Jul 23, 2023
9814ddc
Add another test case that uses `polars_io::cloud::build` ...
Qqwy Jul 23, 2023
9e2050c
Remove std::sync::Mutex wrapper from CloudWriter's ObjectStore since …
Qqwy Jul 23, 2023
4a6348a
Creating ParquetCloudSink. WIP, needs extra Sync bounds
Qqwy Jul 24, 2023
4c0cdb2
Make CloudWriter implement Sync by wrapping writer in Mutex
Qqwy Jul 24, 2023
cef96ae
Add a new sink to the ALogicalPlan
Qqwy Jul 24, 2023
1dbac7a
Add CloudSink to other pattern matches in alp
Qqwy Jul 24, 2023
2603a57
Fix pattern matches referring to new CloudSink in LogicalPlan and ALo…
Qqwy Jul 24, 2023
eb340e3
WIP cloud-sink example
Qqwy Jul 24, 2023
76be1c9
Add CloudSink support in get_sink
Qqwy Jul 24, 2023
7b95eb4
Make sure a CloudSink is added properly to a pipeline
Qqwy Jul 24, 2023
98257e7
Make sure CloudSink is used properly in pipeline
Qqwy Jul 24, 2023
b82d47d
Wrap the CloudWriter's internal writer in a mutex, to make it Sync.
Qqwy Jul 24, 2023
43aaa6c
Merge branch 'qqwy/cloud_writing' into qqwy/cloud_sink
Qqwy Jul 24, 2023
d02095f
Running cargo fmt
Qqwy Jul 24, 2023
f41e0b7
Merge branch 'qqwy/cloud_writing' into qqwy/cloud_sink
Qqwy Jul 24, 2023
23dd8c9
Running cargo fmt
Qqwy Jul 24, 2023
e915d25
Improve example program
Qqwy Jul 24, 2023
5bcc9eb
Test out the example program on a real AWS S3 bucket
Qqwy Jul 24, 2023
44f876a
Enable Tokio IO. This is necessary to write to real remote object stores
Qqwy Jul 24, 2023
7a1355f
Merge branch 'qqwy/cloud_writing' into qqwy/cloud_sink
Qqwy Jul 24, 2023
a78a79e
WIP writing example. Currently hitting a hang in 'AsyncWriteExt::shut…
Qqwy Jul 25, 2023
222d285
Skip tests depending on writing to /tmp/ on Windows
Qqwy Jul 25, 2023
4782d87
Merge branch 'qqwy/cloud_writing' into qqwy/cloud_sink
Qqwy Jul 25, 2023
db67914
Running cargo fmt on the new example program :-)
Qqwy Jul 25, 2023
1b1d24a
Replaces Tokio's SyncIoBridge as it results in a hang on shutdown.
Qqwy Jul 26, 2023
cefebe1
Merge branch 'qqwy/cloud_writing' into qqwy/cloud_sink
Qqwy Jul 26, 2023
7f3d52b
re-run cargo fmt :}
Qqwy Jul 26, 2023
036be68
Merge branch 'qqwy/cloud_writing' into qqwy/cloud_sink
Qqwy Jul 26, 2023
628fda6
Removes rogue dbg statements
Qqwy Jul 26, 2023
4bc92e9
Return an error on CloudWriter construction fail, dont use expect.
Qqwy Jul 26, 2023
326e5ae
Merge branch 'main' into qqwy/cloud_sink
Qqwy Jul 28, 2023
cfe8e5b
Remove the Mutex from CloudWriter
Qqwy Jul 28, 2023
46c1cd3
cargo fmt
Qqwy Jul 28, 2023
eda196c
Standardize conversion of object_store::Error -> PolarsError
Qqwy Jul 28, 2023
24cccbc
Alter order of deps in polars-error YAML to satisfy linter ;-)
Qqwy Jul 28, 2023
ca7af57
Only expose ParquetCloudSink when the cloud feature is enabled
Qqwy Jul 28, 2023
dd0fffe
Correctly use the cloud feature wherever CloudSink is used.
Qqwy Jul 28, 2023
a6e4842
Removes last occurrences featureless of CloudSink
Qqwy Aug 1, 2023
e44c91c
Alter definition of FileSink/CloudSink to 'Sink' in polars-plan
Qqwy Aug 1, 2023
4c97d1d
Make FileSink -> Sink changes to polars-lazy
Qqwy Aug 1, 2023
c2184e8
Fixing up polars-plan using SinkType
Qqwy Aug 1, 2023
18ecf79
Fix up polars-lazy with the changes to Sink/SinkType
Qqwy Aug 1, 2023
f646668
Replace final occurrences of FileSink/CloudSink with Sink
Qqwy Aug 1, 2023
5ba5570
cargo fmt ;-)
Qqwy Aug 1, 2023
2cf8c44
Remove commented code
Qqwy Aug 1, 2023
c378d91
Re-run cargo fmt, now with the --all flag.
Qqwy Aug 1, 2023
121fbb0
Making clippy happy (by silencing an feature-flag-conditional unreach…
Qqwy Aug 1, 2023
75c3515
Revert changes to the read_parquet_cloud example
Qqwy Aug 1, 2023
6dfe076
Merge branch 'main' into qqwy/cloud_sink
Qqwy Aug 18, 2023
1d4fef6
Running cargo fmt again ;-)
Qqwy Aug 18, 2023
fc0ceea
Fix TOML linter complaints about dependency ordering
Qqwy Aug 18, 2023
73c02e0
Finishing merge; set of features seems to have been slightly changed
Qqwy Aug 18, 2023
2861c17
Exclude the 'cloud' feature from the WASM CI check.
Qqwy Aug 18, 2023
0e9154e
Fixing spacing (use tabs) in Makefile
Qqwy Aug 22, 2023
a70e80a
Use file_type debugging info in panic message when using file sink on…
Qqwy Aug 22, 2023
a41b5b9
Split off 'cloud_write' feature based on which we toggle exposing sin…
Qqwy Aug 22, 2023
1f7cd12
Merge branch 'main' into qqwy/cloud_sink
Qqwy Aug 22, 2023
26fa1f6
Make sure the new cloud_write feature is exposed from the polars crat…
Qqwy Aug 22, 2023
ab968bd
Exclude new cloud_writer feature from WASM test build
Qqwy Aug 22, 2023
51c5dad
Merge branch 'main' into qqwy/cloud_sink
Qqwy Sep 8, 2023
b9ccae8
re-run cargo fmt
Qqwy Sep 8, 2023
091a87f
Make sure we don't use two different versions of object_store
Qqwy Sep 8, 2023
58c1966
Fix remaining compile errors
Qqwy Sep 8, 2023
7e0d694
Use 'std::env::temp_dir' in CloudWriter test case so it also works on…
Qqwy Sep 8, 2023
2 changes: 2 additions & 0 deletions crates/Makefile
@@ -118,6 +118,8 @@ check-wasm: ## Check wasm build without supported features
--exclude-features async \
--exclude-features aws \
--exclude-features azure \
--exclude-features cloud \
--exclude-features cloud_write \
--exclude-features decompress \
--exclude-features decompress-fast \
--exclude-features default \
2 changes: 1 addition & 1 deletion crates/polars-core/Cargo.toml
@@ -179,7 +179,7 @@ docs-selection = [
]

# Cloud support.
"async" = ["url"]
"async" = ["url", "object_store"]
"aws" = ["async", "object_store/aws"]
"azure" = ["async", "object_store/azure"]
"gcp" = ["async", "object_store/gcp"]
1 change: 1 addition & 0 deletions crates/polars-error/Cargo.toml
@@ -10,6 +10,7 @@ description = "Error definitions for the Polars DataFrame library"

[dependencies]
arrow = { workspace = true }
object_store = { version = "0.6.0", default-features = false, optional = true }
regex = { workspace = true, optional = true }
thiserror = { workspace = true }

10 changes: 10 additions & 0 deletions crates/polars-error/src/lib.rs
@@ -80,6 +80,16 @@ impl From<regex::Error> for PolarsError {
}
}

#[cfg(feature = "object_store")]
impl From<object_store::Error> for PolarsError {
fn from(err: object_store::Error) -> Self {
PolarsError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("object store error {err:?}"),
))
}
}

pub type PolarsResult<T> = Result<T, PolarsError>;

pub use arrow::error::Error as ArrowError;
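The `From<object_store::Error>` impl above is what lets any object-store failure propagate through `?` as a `PolarsError::Io` in the new cloud code paths. A minimal, self-contained sketch of the same pattern, using a hypothetical `StoreError` as a stand-in for `object_store::Error` and a reduced `PolarsError` (neither is the real crate type):

```rust
use std::fmt;

// Hypothetical stand-in for object_store::Error, for illustration only.
#[derive(Debug)]
struct StoreError(String);

// Reduced mirror of PolarsError with just the Io variant used by the PR.
#[derive(Debug)]
enum PolarsError {
    Io(std::io::Error),
}

impl fmt::Display for PolarsError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PolarsError::Io(e) => write!(f, "io error: {e}"),
        }
    }
}

// Same shape as the PR's impl: wrap the store error in a std::io::Error.
impl From<StoreError> for PolarsError {
    fn from(err: StoreError) -> Self {
        PolarsError::Io(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("object store error {err:?}"),
        ))
    }
}

type PolarsResult<T> = Result<T, PolarsError>;

// With the From impl in place, `?` converts the error automatically.
fn upload() -> PolarsResult<()> {
    let res: Result<(), StoreError> = Err(StoreError("upload timed out".into()));
    res?;
    Ok(())
}

fn main() {
    let err = upload().unwrap_err();
    println!("{err}");
}
```

The same automatic conversion is what keeps `CloudWriter::new_with_object_store` free of manual error mapping.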
5 changes: 3 additions & 2 deletions crates/polars-io/Cargo.toml
@@ -39,6 +39,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc", "ra
simd-json = { workspace = true, optional = true }
simdutf8 = { version = "0.1", optional = true }
tokio = { version = "1.26", features = ["net"], optional = true }
tokio-util = { version = "0.7.8", features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
@@ -87,8 +88,8 @@ dtype-decimal = ["polars-core/dtype-decimal"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
async = ["async-trait", "futures", "tokio", "arrow/io_ipc_write_async", "polars-error/regex"]
cloud = ["object_store", "async", "url"]
async = ["async-trait", "futures", "tokio", "tokio-util", "arrow/io_ipc_write_async", "polars-error/regex"]
cloud = ["object_store", "async", "polars-core/async", "polars-error/object_store", "url"]
aws = ["object_store/aws", "cloud", "polars-core/aws"]
azure = ["object_store/azure", "cloud", "polars-core/azure"]
gcp = ["object_store/gcp", "cloud", "polars-core/gcp"]
161 changes: 160 additions & 1 deletion crates/polars-io/src/cloud/adaptors.rs
@@ -10,7 +10,10 @@ use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt};
use object_store::path::Path;
use object_store::ObjectStore;
use object_store::{MultipartId, ObjectStore};
use polars_core::cloud::CloudOptions;
use polars_error::{PolarsError, PolarsResult};
use tokio::io::{AsyncWrite, AsyncWriteExt};

type OptionalFuture = Arc<Mutex<Option<BoxFuture<'static, std::io::Result<Vec<u8>>>>>>;

@@ -129,3 +132,159 @@ impl AsyncSeek for CloudReader {
std::task::Poll::Ready(Ok(self.pos))
}
}

/// Adaptor which wraps the asynchronous interface of [ObjectStore::put_multipart](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.put_multipart)
/// exposing a synchronous interface which implements `std::io::Write`.
///
/// This allows it to be used in sync code which would otherwise write to a simple File or byte stream,
/// such as with `polars::prelude::CsvWriter`.
pub struct CloudWriter {
// Hold a reference to the store
object_store: Arc<dyn ObjectStore>,
// The path in the object_store which we want to write to
path: Path,
// ID of a partially-done upload, used to abort the upload on error
multipart_id: MultipartId,
// The Tokio runtime which the writer uses internally.
runtime: tokio::runtime::Runtime,
// Internal writer, constructed at creation
writer: Box<dyn AsyncWrite + Send + Unpin>,
}

impl CloudWriter {
/// Construct a new CloudWriter, re-using the given `object_store`
///
/// Creates a new (current-thread) Tokio runtime
/// which bridges the sync writing process with the async ObjectStore multipart uploading.
/// TODO: Naming?
pub fn new_with_object_store(
object_store: Arc<dyn ObjectStore>,
path: Path,
) -> PolarsResult<Self> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
let build_result =
runtime.block_on(async { Self::build_writer(&object_store, &path).await });
match build_result {
Err(error) => Err(PolarsError::from(error)),
Ok((multipart_id, writer)) => Ok(CloudWriter {
object_store,
path,
multipart_id,
runtime,
writer,
}),
}
}

/// Constructs a new CloudWriter from a path and an optional set of CloudOptions.
///
/// Wrapper around `CloudWriter::new_with_object_store` that is useful if you only have a single write task.
/// TODO: Naming?
pub fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
let (cloud_location, object_store) = crate::cloud::build(uri, cloud_options)?;
let object_store = Arc::from(object_store);
Self::new_with_object_store(object_store, cloud_location.prefix.into())
}

async fn build_writer(
object_store: &Arc<dyn ObjectStore>,
path: &Path,
) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Send + Unpin>)> {
let (multipart_id, s3_writer) = object_store.put_multipart(path).await?;
Ok((multipart_id, s3_writer))
}

fn abort(&self) {
let _ = self.runtime.block_on(async {
self.object_store
.abort_multipart(&self.path, &self.multipart_id)
.await
});
}
}

impl std::io::Write for CloudWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let res = self.runtime.block_on(self.writer.write(buf));
if res.is_err() {
self.abort();
}
res
}

fn flush(&mut self) -> std::io::Result<()> {
let res = self.runtime.block_on(self.writer.flush());
if res.is_err() {
self.abort();
}
res
}
}

impl Drop for CloudWriter {
fn drop(&mut self) {
let _ = self.runtime.block_on(self.writer.shutdown());
}
}

#[cfg(feature = "csv")]
#[cfg(test)]
mod tests {
use object_store::ObjectStore;
use polars_core::df;
use polars_core::prelude::{DataFrame, NamedFrom};

use super::*;

fn example_dataframe() -> DataFrame {
df!(
"foo" => &[1, 2, 3],
"bar" => &[None, Some("bak"), Some("baz")],
)
.unwrap()
}

// Skip this test on Windows since it does not have a convenient /tmp/ location.
#[cfg_attr(target_os = "windows", ignore)]
#[test]
fn csv_to_local_objectstore_cloudwriter() {
use crate::csv::CsvWriter;
use crate::prelude::SerWriter;

let mut df = example_dataframe();

let object_store: Box<dyn ObjectStore> = Box::new(
object_store::local::LocalFileSystem::new_with_prefix("/tmp/")
.expect("Could not initialize connection"),
);
let object_store: Arc<dyn ObjectStore> = Arc::from(object_store);

let path: object_store::path::Path = "cloud_writer_example.csv".into();

let mut cloud_writer = CloudWriter::new_with_object_store(object_store, path).unwrap();
CsvWriter::new(&mut cloud_writer)
.finish(&mut df)
.expect("Could not write dataframe as CSV to remote location");
}

// Skip this test on Windows since it does not have a convenient /tmp/ location.
#[cfg_attr(target_os = "windows", ignore)]
#[test]
fn cloudwriter_from_cloudlocation_test() {
use crate::csv::CsvWriter;
use crate::prelude::SerWriter;

let mut df = example_dataframe();

let mut cloud_writer =
CloudWriter::new("file:///tmp/cloud_writer_example2.csv", None).unwrap();

CsvWriter::new(&mut cloud_writer)
.finish(&mut df)
.expect("Could not write dataframe as CSV to remote location");
}
}
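The `write`/`flush`/`abort` logic above follows one invariant: any failed operation aborts the multipart upload before the error is surfaced. A toy, std-only sketch of that invariant, with a hypothetical `FakeUpload` standing in for the `ObjectStore` multipart machinery (no Tokio runtime or object_store crate involved):

```rust
use std::io::Write;

// Hypothetical in-memory stand-in for a partially-done multipart upload.
struct FakeUpload {
    buffer: Vec<u8>,
    aborted: bool,
    fail_writes: bool, // simulate a network failure
}

struct ToyCloudWriter {
    upload: FakeUpload,
}

impl ToyCloudWriter {
    // Mirrors CloudWriter::abort: discard the partial upload.
    fn abort(&mut self) {
        self.upload.aborted = true;
        self.upload.buffer.clear();
    }
}

impl Write for ToyCloudWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if self.upload.fail_writes {
            // On failure, abort the partial upload before returning the
            // error, just as CloudWriter does.
            self.abort();
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "simulated upload failure",
            ));
        }
        self.upload.buffer.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn main() {
    let mut ok_writer = ToyCloudWriter {
        upload: FakeUpload { buffer: Vec::new(), aborted: false, fail_writes: false },
    };
    ok_writer.write_all(b"hello").unwrap();
    assert_eq!(ok_writer.upload.buffer, b"hello".to_vec());

    let mut bad_writer = ToyCloudWriter {
        upload: FakeUpload { buffer: Vec::new(), aborted: false, fail_writes: true },
    };
    assert!(bad_writer.write_all(b"hello").is_err());
    assert!(bad_writer.upload.aborted);
    println!("abort-on-error invariant holds");
}
```

Because `ToyCloudWriter` implements `std::io::Write`, it can be handed to any sync writer, which is exactly how `CloudWriter` plugs into `CsvWriter` in the tests above.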
2 changes: 1 addition & 1 deletion crates/polars-io/src/lib.rs
@@ -5,7 +5,7 @@
#[cfg(feature = "avro")]
pub mod avro;
#[cfg(feature = "cloud")]
mod cloud;
pub mod cloud;
#[cfg(any(feature = "csv", feature = "json"))]
pub mod csv;
#[cfg(feature = "parquet")]
2 changes: 2 additions & 0 deletions crates/polars-lazy/Cargo.toml
@@ -45,6 +45,8 @@ async = [
"polars-pipe/async",
"streaming",
]
cloud = ["async", "polars-pipe/cloud"]
cloud_write = ["cloud"]
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe/ipc"]
json = ["polars-io/json", "polars-plan/json", "polars-json"]
csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe/csv"]
41 changes: 35 additions & 6 deletions crates/polars-lazy/src/frame/mod.rs
@@ -579,9 +579,9 @@ impl LazyFrame {
None
};

// file sink should be replaced
// sink should be replaced
let no_file_sink = if check_sink {
!matches!(lp_arena.get(lp_top), ALogicalPlan::FileSink { .. })
!matches!(lp_arena.get(lp_top), ALogicalPlan::Sink { .. })
} else {
true
};
@@ -639,9 +639,9 @@ impl LazyFrame {
#[cfg(feature = "parquet")]
pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::FileSink {
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: FileSinkOptions {
payload: SinkType::File {
path: Arc::new(path),
file_type: FileType::Parquet(options),
},
@@ -656,15 +656,44 @@
Ok(())
}

/// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit
/// into memory, or when you want to write to a location in the cloud rather than to a local file.
/// This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(all(feature = "cloud_write", feature = "parquet"))]
pub fn sink_parquet_cloud(
mut self,
uri: String,
cloud_options: Option<polars_core::cloud::CloudOptions>,
Contributor:

Besides the cloud_options parameter, this looks exactly the same as the sink_parquet method. Isn't there a possibility that you could just pass in the uri and let the write/sink methods determine whether it is a URI or a local path?

That would make the code much easier to maintain, and would open up all methods to natively read and write local and cloud-based sources and targets.

Contributor Author (Qqwy):

While possible, there are some drawbacks to doing so:

  • It is a backwards-incompatible change. Polars is still pre-1.0, but breaking existing code is nonetheless not something to do lightly.
  • It makes sink_parquet more difficult to use for anyone not using the cloud feature, since writing to a normal file would now require passing an extra None every time.
  • It complicates how to deal with feature flags. Currently, the cloud_write feature flag enables/disables sink_parquet_cloud, and attempting to use it without the feature enabled results in a compile-time error. If the flag instead altered the internals of sink_parquet, the error would only be caught at runtime.

That is why I decided to keep it separate in this PR. Unifying the APIs is definitely interesting, and that refactor may be worth considering in a future version, but I did not want to rush it for these reasons.

Contributor:

True, I keep forgetting Rust doesn't have default parameters... a pity, because here they would be so helpful!

Comment:

Does this PR enable us to write LazyFrames to cloud storage using sink_parquet? If so, how do I do this? I can't figure it out.
parquet_options: ParquetWriteOptions,
) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::Cloud {
uri: Arc::new(uri),
cloud_options,
file_type: FileType::Parquet(parquet_options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
}

/// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "ipc")]
pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::FileSink {
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: FileSinkOptions {
payload: SinkType::File {
path: Arc::new(path),
file_type: FileType::Ipc(options),
},
13 changes: 10 additions & 3 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -150,9 +150,16 @@ pub fn create_physical_plan(
match logical_plan {
#[cfg(feature = "python")]
PythonScan { options, .. } => Ok(Box::new(executors::PythonScanExec { options })),
FileSink { .. } => panic!(
"sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'"
),
Sink { payload, .. } => {
match payload {
SinkType::Memory => panic!("Memory Sink not supported in the standard engine."),
SinkType::File{file_type, ..} => panic!(
"sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'"
),
#[cfg(feature = "cloud")]
SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine.")
}
}
Union { inputs, options } => {
let inputs = inputs
.into_iter()
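The planner change above replaces the single `FileSink` arm with a dispatch over the new `SinkType` variants (`Memory`, `File`, `Cloud`), each producing its own "not supported in the standard engine" message. A reduced, std-only sketch of that dispatch — the enum shape follows the PR, but these types and messages are illustrative, not the real polars-plan definitions:

```rust
#[derive(Debug)]
enum FileType {
    Parquet,
    Ipc,
}

// Illustrative reduction of the PR's SinkType: Memory for in-memory results,
// File for local paths, Cloud for object-store URIs.
enum SinkType {
    Memory,
    File { path: String, file_type: FileType },
    Cloud { uri: String, file_type: FileType },
}

// The standard (non-streaming) engine supports none of these sinks, so the
// planner reports which one was requested, mirroring the match in lp.rs.
fn describe_sink(payload: &SinkType) -> String {
    match payload {
        SinkType::Memory => "memory sink: not supported in the standard engine".to_string(),
        SinkType::File { path, file_type } => {
            format!("sink_{file_type:?} to {path}: not yet supported in the standard engine")
        }
        SinkType::Cloud { uri, .. } => {
            format!("cloud sink to {uri}: not supported in the standard engine")
        }
    }
}

fn main() {
    let sink = SinkType::Cloud {
        uri: "s3://my-bucket/out.parquet".to_string(),
        file_type: FileType::Parquet,
    };
    println!("{}", describe_sink(&sink));
}
```

Note how the `Cloud` arm can be gated behind `#[cfg(feature = "cloud")]` in the real code, which is why the PR only matches it when that feature is enabled.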
@@ -122,7 +122,7 @@ pub(super) fn construct(
// the file sink is always to the top of the tree
// not every branch has a final sink. For instance rhs join branches
if let Some(node) = branch.get_final_sink() {
if matches!(lp_arena.get(node), ALogicalPlan::FileSink { .. }) {
if matches!(lp_arena.get(node), ALogicalPlan::Sink { .. }) {
final_sink = Some(node)
}
}
@@ -191,20 +191,16 @@
return Ok(None);
};
let insertion_location = match lp_arena.get(final_sink) {
FileSink {
// this was inserted only during conversion and does not exist
// in the original tree, so we take the input, as that's where
// we connect into the original tree.
Sink {
input,
payload: FileSinkOptions { file_type, .. },
} => {
// this was inserted only during conversion and does not exist
// in the original tree, so we take the input, as that's where
// we connect into the original tree.
if matches!(file_type, FileType::Memory) {
*input
} else {
// default case if the tree ended with a file_sink
final_sink
}
},
payload: SinkType::Memory,
} => *input,
// Other sinks were not inserted during conversion,
// so they are returned as-is
Sink { .. } => final_sink,
_ => unreachable!(),
};
// keep the original around for formatting purposes