diff --git a/crates/Makefile b/crates/Makefile index ad74b606bb9a..8589e77f68ca 100644 --- a/crates/Makefile +++ b/crates/Makefile @@ -118,6 +118,9 @@ check-wasm: ## Check wasm build without supported features --exclude-features async \ --exclude-features aws \ --exclude-features azure \ + --exclude-features cloud \ + --exclude-features cloud_write \ + --exclude-features decompress \ --exclude-features decompress-fast \ --exclude-features default \ --exclude-features docs-selection \ diff --git a/crates/polars-core/Cargo.toml b/crates/polars-core/Cargo.toml index adb530002559..11458b58d9d7 100644 --- a/crates/polars-core/Cargo.toml +++ b/crates/polars-core/Cargo.toml @@ -179,7 +179,7 @@ docs-selection = [ ] # Cloud support. -"async" = ["url"] +"async" = ["url", "object_store"] "aws" = ["async", "object_store/aws"] "azure" = ["async", "object_store/azure"] "gcp" = ["async", "object_store/gcp"] diff --git a/crates/polars-error/Cargo.toml b/crates/polars-error/Cargo.toml index ce622dcccbe9..a6ed9123f199 100644 --- a/crates/polars-error/Cargo.toml +++ b/crates/polars-error/Cargo.toml @@ -10,6 +10,7 @@ description = "Error definitions for the Polars DataFrame library" [dependencies] arrow = { workspace = true } +object_store = { version = "0.7", default-features = false, optional = true } regex = { workspace = true, optional = true } thiserror = { workspace = true } diff --git a/crates/polars-error/src/lib.rs b/crates/polars-error/src/lib.rs index 517270dcc187..c1d8491813e2 100644 --- a/crates/polars-error/src/lib.rs +++ b/crates/polars-error/src/lib.rs @@ -80,6 +80,16 @@ impl From for PolarsError { } } +#[cfg(feature = "object_store")] +impl From for PolarsError { + fn from(err: object_store::Error) -> Self { + PolarsError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + format!("object store error {err:?}"), + )) + } +} + pub type PolarsResult = Result; pub use arrow::error::Error as ArrowError; diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index 003b36abef42..3a6d3908b4da 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -39,6 +39,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc", "ra simd-json = { workspace = true, optional = true } simdutf8 = { version = "0.1", optional = true } tokio = { version = "1.26", features = ["net"], optional = true } +tokio-util = { version = "0.7.8", features = ["io", "io-util"], optional = true } url = { workspace = true, optional = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] @@ -88,8 +89,8 @@ dtype-decimal = ["polars-core/dtype-decimal"] fmt = ["polars-core/fmt"] lazy = [] parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"] -async = ["async-trait", "futures", "tokio", "arrow/io_ipc_write_async", "polars-error/regex"] -cloud = ["object_store", "async", "url"] +async = ["async-trait", "futures", "tokio", "tokio-util", "arrow/io_ipc_write_async", "polars-error/regex"] +cloud = ["object_store", "async", "polars-core/async", "polars-error/object_store", "url"] aws = ["object_store/aws", "cloud", "polars-core/aws"] azure = ["object_store/azure", "cloud", "polars-core/azure"] gcp = ["object_store/gcp", "cloud", "polars-core/gcp"] diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs index 2060c876f4a8..48a912cfa995 100644 --- a/crates/polars-io/src/cloud/adaptors.rs +++ b/crates/polars-io/src/cloud/adaptors.rs @@ -10,7 +10,10 @@ use futures::future::BoxFuture; use 
futures::lock::Mutex; use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt}; use object_store::path::Path; -use object_store::ObjectStore; +use object_store::{MultipartId, ObjectStore}; +use polars_core::cloud::CloudOptions; +use polars_error::{PolarsError, PolarsResult}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; type OptionalFuture = Arc>>>>>; @@ -129,3 +132,157 @@ impl AsyncSeek for CloudReader { std::task::Poll::Ready(Ok(self.pos)) } } + +/// Adaptor which wraps the asynchronous interface of [ObjectStore::put_multipart](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.put_multipart) +/// exposing a synchronous interface which implements `std::io::Write`. +/// +/// This allows it to be used in sync code which would otherwise write to a simple File or byte stream, +/// such as with `polars::prelude::CsvWriter`. +pub struct CloudWriter { + // Hold a reference to the store + object_store: Arc, + // The path in the object_store which we want to write to + path: Path, + // ID of a partially-done upload, used to abort the upload on error + multipart_id: MultipartId, + // The Tokio runtime which the writer uses internally. + runtime: tokio::runtime::Runtime, + // Internal writer, constructed at creation + writer: Box, +} + +impl CloudWriter { + /// Construct a new CloudWriter, re-using the given `object_store` + /// + /// Creates a new (current-thread) Tokio runtime + /// which bridges the sync writing process with the async ObjectStore multipart uploading. + /// TODO: Naming? + pub fn new_with_object_store( + object_store: Arc, + path: Path, + ) -> PolarsResult { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + let build_result = + runtime.block_on(async { Self::build_writer(&object_store, &path).await }); + match build_result { + Err(error) => Err(PolarsError::from(error)), + Ok((multipart_id, writer)) => Ok(CloudWriter { + object_store, + path, + multipart_id, + runtime, + writer, + }), + } + } + + /// Constructs a new CloudWriter from a path and an optional set of CloudOptions. + /// + /// Wrapper around `CloudWriter::new_with_object_store` that is useful if you only have a single write task. + /// TODO: Naming? 
+ pub fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> { + let (cloud_location, object_store) = crate::cloud::build(uri, cloud_options)?; + let object_store = Arc::from(object_store); + Self::new_with_object_store(object_store, cloud_location.prefix.into()) + } + + async fn build_writer( + object_store: &Arc<dyn ObjectStore>, + path: &Path, + ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Send + Unpin>)> { + let (multipart_id, s3_writer) = object_store.put_multipart(path).await?; + Ok((multipart_id, s3_writer)) + } + + fn abort(&self) { + let _ = self.runtime.block_on(async { + self.object_store + .abort_multipart(&self.path, &self.multipart_id) + .await + }); + } +} + +impl std::io::Write for CloudWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + let res = self.runtime.block_on(self.writer.write(buf)); + if res.is_err() { + self.abort(); + } + res + } + + fn flush(&mut self) -> std::io::Result<()> { + let res = self.runtime.block_on(self.writer.flush()); + if res.is_err() { + self.abort(); + } + res + } +} + +impl Drop for CloudWriter { + fn drop(&mut self) { + let _ = self.runtime.block_on(self.writer.shutdown()); + } +} + +#[cfg(feature = "csv")] +#[cfg(test)] +mod tests { + use object_store::ObjectStore; + use polars_core::df; + use polars_core::prelude::{DataFrame, NamedFrom}; + + use super::*; + + fn example_dataframe() -> DataFrame { + df!( + "foo" => &[1, 2, 3], + "bar" => &[None, Some("bak"), Some("baz")], + ) + .unwrap() + } + + #[test] + fn csv_to_local_objectstore_cloudwriter() { + use crate::csv::CsvWriter; + use crate::prelude::SerWriter; + + let mut df = example_dataframe(); + + let object_store: Box<dyn ObjectStore> = Box::new( + object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir()) + .expect("Could not initialize connection"), + ); + let object_store: Arc<dyn ObjectStore> = Arc::from(object_store); + + let path: object_store::path::Path = "cloud_writer_example.csv".into(); + + let mut cloud_writer = CloudWriter::new_with_object_store(object_store, path).unwrap(); + CsvWriter::new(&mut cloud_writer) + .finish(&mut df) + .expect("Could not write dataframe as CSV to remote location"); + } + + // Skip this test on Windows since it does not have a convenient /tmp/ location.
+ #[cfg_attr(target_os = "windows", ignore)] + #[test] + fn cloudwriter_from_cloudlocation_test() { + use crate::csv::CsvWriter; + use crate::prelude::SerWriter; + + let mut df = example_dataframe(); + + let mut cloud_writer = + CloudWriter::new("file:///tmp/cloud_writer_example2.csv", None).unwrap(); + + CsvWriter::new(&mut cloud_writer) + .finish(&mut df) + .expect("Could not write dataframe as CSV to remote location"); + } +} diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs index e3f5f25b737e..fb9860e7bcc5 100644 --- a/crates/polars-io/src/lib.rs +++ b/crates/polars-io/src/lib.rs @@ -5,7 +5,7 @@ #[cfg(feature = "avro")] pub mod avro; #[cfg(feature = "cloud")] -mod cloud; +pub mod cloud; #[cfg(any(feature = "csv", feature = "json"))] pub mod csv; #[cfg(feature = "parquet")] diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index 9e8ca793d7aa..c78c49901acd 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -45,6 +45,8 @@ async = [ "polars-pipe/async", "streaming", ] +cloud = ["async", "polars-pipe/cloud"] +cloud_write = ["cloud"] ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe/ipc"] json = ["polars-io/json", "polars-plan/json", "polars-json"] csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe/csv"] diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index bfd3a708f87c..f656e3e88249 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -603,9 +603,9 @@ impl LazyFrame { None }; - // file sink should be replaced + // sink should be replaced let no_file_sink = if check_sink { - !matches!(lp_arena.get(lp_top), ALogicalPlan::FileSink { .. }) + !matches!(lp_arena.get(lp_top), ALogicalPlan::Sink { .. }) } else { true }; @@ -664,9 +664,9 @@ impl LazyFrame { #[cfg(feature = "parquet")] pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::FileSink { + self.logical_plan = LogicalPlan::Sink { input: Box::new(self.logical_plan), - payload: FileSinkOptions { + payload: SinkType::File { path: Arc::new(path), file_type: FileType::Parquet(options), }, @@ -681,15 +681,44 @@ impl LazyFrame { Ok(()) } + /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit + /// into memory, and where you do not want to write to a local file but to a location in the cloud. + /// This method will return an error if the query cannot be completely done in a + /// streaming fashion. + #[cfg(all(feature = "cloud_write", feature = "parquet"))] + pub fn sink_parquet_cloud( + mut self, + uri: String, + cloud_options: Option, + parquet_options: ParquetWriteOptions, + ) -> PolarsResult<()> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload: SinkType::Cloud { + uri: Arc::new(uri), + cloud_options, + file_type: FileType::Parquet(parquet_options), + }, + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) + } + /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit /// into memory. 
This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "ipc")] pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::FileSink { + self.logical_plan = LogicalPlan::Sink { input: Box::new(self.logical_plan), - payload: FileSinkOptions { + payload: SinkType::File { path: Arc::new(path), file_type: FileType::Ipc(options), }, @@ -710,9 +739,9 @@ impl LazyFrame { #[cfg(feature = "csv")] pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::FileSink { + self.logical_plan = LogicalPlan::Sink { input: Box::new(self.logical_plan), - payload: FileSinkOptions { + payload: SinkType::File { path: Arc::new(path), file_type: FileType::Csv(options), }, diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 2f5dfb213586..b576ed117feb 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -150,9 +150,16 @@ pub fn create_physical_plan( match logical_plan { #[cfg(feature = "python")] PythonScan { options, .. } => Ok(Box::new(executors::PythonScanExec { options })), - FileSink { .. } => panic!( - "sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'" - ), + Sink { payload, .. } => { + match payload { + SinkType::Memory => panic!("Memory Sink not supported in the standard engine."), + SinkType::File{file_type, ..} => panic!( + "sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_parquet()'" + ), + #[cfg(feature = "cloud")] + SinkType::Cloud{..} => panic!("Cloud Sink not supported in standard engine.") + } + } Union { inputs, options } => { let inputs = inputs .into_iter() diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index baf18e913ef3..240fb7aaa7ec 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -122,7 +122,7 @@ pub(super) fn construct( // the file sink is always to the top of the tree // not every branch has a final sink. For instance rhs join branches if let Some(node) = branch.get_final_sink() { - if matches!(lp_arena.get(node), ALogicalPlan::FileSink { .. }) { + if matches!(lp_arena.get(node), ALogicalPlan::Sink { .. }) { final_sink = Some(node) } } @@ -191,20 +191,16 @@ pub(super) fn construct( return Ok(None); }; let insertion_location = match lp_arena.get(final_sink) { - FileSink { + // this was inserted only during conversion and does not exist + // in the original tree, so we take the input, as that's where + // we connect into the original tree. + Sink { input, - payload: FileSinkOptions { file_type, .. }, - } => { - // this was inserted only during conversion and does not exist - // in the original tree, so we take the input, as that's where - // we connect into the original tree. - if matches!(file_type, FileType::Memory) { - *input - } else { - // default case if the tree ended with a file_sink - final_sink - } - }, + payload: SinkType::Memory, + } => *input, + // Other sinks were not inserted during conversion, + // so they are returned as-is + Sink { .. 
} => final_sink, _ => unreachable!(), }; // keep the original around for formatting purposes diff --git a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs index 20369fafd237..bca9824af290 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs @@ -37,13 +37,10 @@ fn process_non_streamable_node( fn insert_file_sink(mut root: Node, lp_arena: &mut Arena) -> Node { // The pipelines need a final sink, we insert that here. // this allows us to split at joins/unions and share a sink - if !matches!(lp_arena.get(root), ALogicalPlan::FileSink { .. }) { - root = lp_arena.add(ALogicalPlan::FileSink { + if !matches!(lp_arena.get(root), ALogicalPlan::Sink { .. }) { + root = lp_arena.add(ALogicalPlan::Sink { input: root, - payload: FileSinkOptions { - path: Default::default(), - file_type: FileType::Memory, - }, + payload: SinkType::Memory, }) } root @@ -154,7 +151,7 @@ pub(crate) fn insert_streaming_nodes( state.operators_sinks.push(PipelineNode::Sink(root)); stack.push((*input, state, current_idx)) }, - FileSink { input, .. } => { + Sink { input, .. } => { state.streamable = true; state.operators_sinks.push(PipelineNode::Sink(root)); stack.push((*input, state, current_idx)) diff --git a/crates/polars-pipe/Cargo.toml b/crates/polars-pipe/Cargo.toml index 592e713722e7..96df46c82f50 100644 --- a/crates/polars-pipe/Cargo.toml +++ b/crates/polars-pipe/Cargo.toml @@ -31,6 +31,7 @@ version_check = { workspace = true } [features] compile = ["crossbeam-channel", "crossbeam-queue", "polars-io/ipc"] csv = ["polars-plan/csv", "polars-io/csv"] +cloud = ["async", "polars-io/cloud", "polars-plan/cloud"] parquet = ["polars-plan/parquet", "polars-io/parquet"] ipc = ["polars-plan/ipc", "polars-io/ipc"] async = ["polars-plan/async", "polars-io/async"] diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 3845b03e590f..69e3e6c830b3 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -23,7 +23,7 @@ trait SinkWriter { } #[cfg(feature = "parquet")] -impl SinkWriter for polars_io::parquet::BatchedWriter { +impl SinkWriter for polars_io::parquet::BatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -35,7 +35,7 @@ impl SinkWriter for polars_io::parquet::BatchedWriter { } #[cfg(feature = "ipc")] -impl SinkWriter for polars_io::ipc::BatchedWriter { +impl SinkWriter for polars_io::ipc::BatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -78,7 +78,7 @@ impl ParquetSink { .set_parallel(false) .batched(schema)?; - let writer = Box::new(writer) as Box; + let writer = Box::new(writer) as Box; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; @@ -98,6 +98,48 @@ impl ParquetSink { } } +#[cfg(all(feature = "parquet", feature = "cloud"))] +pub struct ParquetCloudSink {} +#[cfg(all(feature = "parquet", feature = "cloud"))] +impl ParquetCloudSink { + #[allow(clippy::new_ret_no_self)] + pub fn new( + uri: &str, + cloud_options: Option<&polars_core::cloud::CloudOptions>, + parquet_options: ParquetWriteOptions, + schema: &Schema, + ) -> PolarsResult { + let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options)?; + let writer = ParquetWriter::new(cloud_writer) + 
.with_compression(parquet_options.compression) + .with_data_pagesize_limit(parquet_options.data_pagesize_limit) + .with_statistics(parquet_options.statistics) + .with_row_group_size(parquet_options.row_group_size) + // This is important! Otherwise we will deadlock + // See: #7074 + .set_parallel(false) + .batched(schema)?; + + let writer = Box::new(writer) as Box; + + let morsels_per_sink = morsels_per_sink(); + let backpressure = morsels_per_sink * 2; + let (sender, receiver) = bounded(backpressure); + + let io_thread_handle = Arc::new(Some(init_writer_thread( + receiver, + writer, + parquet_options.maintain_order, + morsels_per_sink, + ))); + + Ok(FilesSink { + sender, + io_thread_handle, + }) + } +} + #[cfg(feature = "ipc")] pub struct IpcSink {} #[cfg(feature = "ipc")] @@ -109,7 +151,7 @@ impl IpcSink { .with_compression(options.compression) .batched(schema)?; - let writer = Box::new(writer) as Box; + let writer = Box::new(writer) as Box; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; @@ -173,7 +215,7 @@ impl CsvSink { #[cfg(any(feature = "parquet", feature = "ipc"))] fn init_writer_thread( receiver: Receiver>, - mut writer: Box, + mut writer: Box, maintain_order: bool, // this is used to determine when a batch of chunks should be written to disk // all chunks per push should be collected to determine in which order they should diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index 9e3276b9e0a6..247df7658e46 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -107,9 +107,9 @@ pub fn can_convert_to_hash_agg( } /// # Returns: -/// - input_dtype: dtype that goes into the agg expression -/// - physical expr: physical expression that produces the input of the aggregation -/// - aggregation function: the aggregation function +/// - input_dtype: dtype that goes into the agg expression +/// - physical expr: physical expression that produces the input of the aggregation +/// - aggregation function: the aggregation function pub(crate) fn convert_to_hash_agg( node: Node, expr_arena: &Arena, diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 97a64a7b7c8c..e1a06d7433ff 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -13,7 +13,7 @@ use crate::executors::sinks::group_by::GenericGroupby2; use crate::executors::sinks::*; use crate::executors::{operators, sources}; use crate::expressions::PhysicalPipedExpr; -use crate::operators::{Operator, Sink, Source}; +use crate::operators::{Operator, Sink as SinkTrait, Source}; use crate::pipeline::PipeLine; fn exprs_to_physical( @@ -122,32 +122,68 @@ pub fn get_sink( lp_arena: &Arena, expr_arena: &mut Arena, to_physical: &F, -) -> PolarsResult> +) -> PolarsResult> where F: Fn(Node, &Arena, Option<&SchemaRef>) -> PolarsResult>, { use ALogicalPlan::*; let out = match lp_arena.get(node) { - FileSink { input, payload } => { - let path = payload.path.as_ref().as_path(); + Sink { input, payload } => { let input_schema = lp_arena.get(*input).schema(lp_arena); - match &payload.file_type { - #[cfg(feature = "parquet")] - FileType::Parquet(options) => { - Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?) 
- as Box - }, - #[cfg(feature = "ipc")] - FileType::Ipc(options) => { - Box::new(IpcSink::new(path, *options, input_schema.as_ref())?) as Box + match payload { + SinkType::Memory => { + Box::new(OrderedSink::new(input_schema.into_owned())) as Box }, - #[cfg(feature = "csv")] - FileType::Csv(options) => { - Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?) - as Box + SinkType::File { + path, file_type, .. + } => { + let path = path.as_ref().as_path(); + match &file_type { + #[cfg(feature = "parquet")] + FileType::Parquet(options) => { + Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?) + as Box + }, + #[cfg(feature = "ipc")] + FileType::Ipc(options) => { + Box::new(IpcSink::new(path, *options, input_schema.as_ref())?) + as Box + }, + #[cfg(feature = "csv")] + FileType::Csv(options) => { + Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?) + as Box + }, + #[allow(unreachable_patterns)] + _ => unreachable!(), + } }, - FileType::Memory => { - Box::new(OrderedSink::new(input_schema.into_owned())) as Box + #[cfg(feature = "cloud")] + SinkType::Cloud { + uri, + file_type, + cloud_options, + } => { + let uri = uri.as_ref().as_str(); + let input_schema = lp_arena.get(*input).schema(lp_arena); + let cloud_options = &cloud_options; + match &file_type { + #[cfg(feature = "parquet")] + FileType::Parquet(parquet_options) => Box::new(ParquetCloudSink::new( + uri, + cloud_options.as_ref(), + *parquet_options, + input_schema.as_ref(), + )?) + as Box, + #[cfg(feature = "ipc")] + FileType::Ipc(_ipc_options) => { + // TODO: support Ipc as well + todo!("For now, only parquet cloud files are supported"); + }, + #[allow(unreachable_patterns)] + _ => unreachable!(), + } }, } }, @@ -165,7 +201,7 @@ where match &options.args.how { #[cfg(feature = "cross_join")] JoinType::Cross => { - Box::new(CrossJoin::new(options.args.suffix().into())) as Box + Box::new(CrossJoin::new(options.args.suffix().into())) as Box }, join_type @ JoinType::Inner | join_type @ JoinType::Left => { let input_schema_left = lp_arena.get(*input_left).schema(lp_arena); @@ -197,14 +233,14 @@ where swapped, join_columns_left, join_columns_right, - )) as Box + )) as Box }, _ => unimplemented!(), } }, Slice { offset, len, .. 
} => { let slice = SliceSink::new(*offset as u64, *len as usize); - Box::new(slice) as Box + Box::new(slice) as Box }, Sort { input, @@ -220,7 +256,7 @@ where let index = input_schema.try_index_of(by_column.as_ref())?; let sort_sink = SortSink::new(index, args.clone(), input_schema); - Box::new(sort_sink) as Box + Box::new(sort_sink) as Box } else { let sort_idx = by_column .iter() @@ -231,7 +267,7 @@ where .collect::>>()?; let sort_sink = SortSinkMultiple::new(args.clone(), input_schema, sort_idx); - Box::new(sort_sink) as Box + Box::new(sort_sink) as Box } }, Distinct { input, options } => { @@ -370,7 +406,7 @@ where input_schema, output_schema.clone(), options.slice, - )) as Box + )) as Box }) }, (DataType::Utf8, 1) => Box::new(group_by::Utf8GroupbySink::new( @@ -380,7 +416,7 @@ where input_schema, output_schema.clone(), options.slice, - )) as Box, + )) as Box, _ => Box::new(GenericGroupby2::new( key_columns, aggregation_columns, @@ -529,7 +565,7 @@ pub fn create_pipeline( expr_arena: &mut Arena, to_physical: F, verbose: bool, - sink_cache: &mut PlHashMap>, + sink_cache: &mut PlHashMap>, ) -> PolarsResult where F: Fn(Node, &Arena, Option<&SchemaRef>) -> PolarsResult>, diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index 479704041b4e..6212bf68e39f 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -53,7 +53,8 @@ serde = [ default = ["compile"] streaming = [] parquet = ["polars-core/parquet", "polars-io/parquet"] -async = ["polars-io/cloud"] +async = ["polars-io/async"] +cloud = ["async", "polars-io/cloud"] ipc = ["polars-io/ipc"] json = ["polars-io/json"] csv = ["polars-io/csv"] diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs index 0ae903f3c8c9..f7151bed2ff7 100644 --- a/crates/polars-plan/src/dot.rs +++ b/crates/polars-plan/src/dot.rs @@ -391,11 +391,16 @@ impl LogicalPlan { self.write_dot(acc_str, prev_node, current_node, id_map)?; input.dot(acc_str, (branch, id + 1), current_node, id_map) }, - FileSink { input, .. } => { + Sink { input, payload, .. } => { let current_node = DotNode { branch, id, - fmt: "FILE_SINK", + fmt: match payload { + SinkType::Memory => "SINK (MEMORY)", + SinkType::File { .. } => "SINK (FILE)", + #[cfg(feature = "cloud")] + SinkType::Cloud { .. } => "SINK (CLOUD)", + }, }; self.write_dot(acc_str, prev_node, current_node, id_map)?; input.dot(acc_str, (branch, id + 1), current_node, id_map) diff --git a/crates/polars-plan/src/logical_plan/alp.rs b/crates/polars-plan/src/logical_plan/alp.rs index c379af84b90e..7289a4f79961 100644 --- a/crates/polars-plan/src/logical_plan/alp.rs +++ b/crates/polars-plan/src/logical_plan/alp.rs @@ -110,9 +110,9 @@ pub enum ALogicalPlan { contexts: Vec, schema: SchemaRef, }, - FileSink { + Sink { input: Node, - payload: FileSinkOptions, + payload: SinkType, }, } @@ -161,7 +161,12 @@ impl ALogicalPlan { MapFunction { .. } => "map_function", Union { .. } => "union", ExtContext { .. } => "ext_context", - FileSink { .. } => "file_sink", + Sink { payload, .. } => match payload { + SinkType::Memory => "sink (memory)", + SinkType::File { .. } => "sink (file)", + #[cfg(feature = "cloud")] + SinkType::Cloud { .. } => "sink (cloud)", + }, } } @@ -194,9 +199,7 @@ impl ALogicalPlan { Aggregate { schema, .. } => schema, Join { schema, .. } => schema, HStack { schema, .. } => schema, - Distinct { input, .. } | FileSink { input, .. } => { - return arena.get(*input).schema(arena) - }, + Distinct { input, .. } | Sink { input, .. 
} => return arena.get(*input).schema(arena), Slice { input, .. } => return arena.get(*input).schema(arena), MapFunction { input, function } => { let input_schema = arena.get(*input).schema(arena); @@ -373,7 +376,7 @@ impl ALogicalPlan { contexts: inputs, schema: schema.clone(), }, - FileSink { payload, .. } => FileSink { + Sink { payload, .. } => Sink { input: inputs.pop().unwrap(), payload: payload.clone(), }, @@ -416,7 +419,7 @@ impl ALogicalPlan { container.push(*node) } }, - ExtContext { .. } | FileSink { .. } => {}, + ExtContext { .. } | Sink { .. } => {}, } } @@ -460,7 +463,7 @@ impl ALogicalPlan { HStack { input, .. } => *input, Distinct { input, .. } => *input, MapFunction { input, .. } => *input, - FileSink { input, .. } => *input, + Sink { input, .. } => *input, ExtContext { input, contexts, .. } => { diff --git a/crates/polars-plan/src/logical_plan/conversion.rs b/crates/polars-plan/src/logical_plan/conversion.rs index 0a6ab0e086a6..71d40564ec02 100644 --- a/crates/polars-plan/src/logical_plan/conversion.rs +++ b/crates/polars-plan/src/logical_plan/conversion.rs @@ -364,9 +364,9 @@ pub fn to_alp( schema, } }, - LogicalPlan::FileSink { input, payload } => { + LogicalPlan::Sink { input, payload } => { let input = to_alp(*input, expr_arena, lp_arena)?; - ALogicalPlan::FileSink { input, payload } + ALogicalPlan::Sink { input, payload } }, }; Ok(lp_arena.add(v)) @@ -790,9 +790,9 @@ impl ALogicalPlan { schema, } }, - ALogicalPlan::FileSink { input, payload } => { + ALogicalPlan::Sink { input, payload } => { let input = Box::new(convert_to_lp(input, lp_arena)); - LogicalPlan::FileSink { input, payload } + LogicalPlan::Sink { input, payload } }, } } diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index d7767d39020a..b8291418743f 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -224,8 +224,14 @@ impl LogicalPlan { write!(f, "{:indent$}EXTERNAL_CONTEXT", "")?; input._format(f, sub_indent) }, - FileSink { input, .. } => { - write!(f, "{:indent$}FILE_SINK", "")?; + Sink { input, payload, .. } => { + let name = match payload { + SinkType::Memory => "SINK (memory)", + SinkType::File { .. } => "SINK (file)", + #[cfg(feature = "cloud")] + SinkType::Cloud { .. 
} => "SINK (cloud)", + }; + write!(f, "{:indent$}{}", "", name)?; input._format(f, sub_indent) }, } diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index 81137f71cb84..3865a268eda6 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -#[cfg(feature = "parquet")] +#[cfg(any(feature = "cloud", feature = "parquet"))] use polars_core::cloud::CloudOptions; use polars_core::prelude::*; @@ -250,9 +250,9 @@ pub enum LogicalPlan { contexts: Vec, schema: SchemaRef, }, - FileSink { + Sink { input: Box, - payload: FileSinkOptions, + payload: SinkType, }, } diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index ba12529521df..044a09e56412 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -439,7 +439,7 @@ impl PredicatePushDown { } // Pushed down passed these nodes - lp@ FileSink {..} => { + lp@ Sink {..} => { self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) } lp @ HStack {..} | lp @ Projection {..} | lp @ ExtContext {..} => { diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index 75fa97daf42e..a412cb5a6dc5 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -688,7 +688,7 @@ impl ProjectionPushDown { ) }, // These nodes only have inputs and exprs, so we can use same logic. - lp @ Slice { .. } | lp @ FileSink { .. } => process_generic( + lp @ Slice { .. } | lp @ Sink { .. } => process_generic( self, lp, acc_projections, diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 2ee720674715..bdd5ec0deb7d 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -301,6 +301,22 @@ pub struct AnonymousScanOptions { pub fmt_str: &'static str, } +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Clone, Debug)] +pub enum SinkType { + Memory, + File { + path: Arc, + file_type: FileType, + }, + #[cfg(feature = "cloud")] + Cloud { + uri: Arc, + file_type: FileType, + cloud_options: Option, + }, +} + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Clone, Debug)] pub struct FileSinkOptions { @@ -317,7 +333,6 @@ pub enum FileType { Ipc(IpcWriterOptions), #[cfg(feature = "csv")] Csv(CsvWriterOptions), - Memory, } #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index 75c80c8b46ca..7ae80f2756c4 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -24,7 +24,7 @@ impl LogicalPlan { Aggregate { schema, .. } => Ok(Cow::Borrowed(schema)), Join { schema, .. } => Ok(Cow::Borrowed(schema)), HStack { schema, .. } => Ok(Cow::Borrowed(schema)), - Distinct { input, .. } | FileSink { input, .. } => input.schema(), + Distinct { input, .. } | Sink { input, .. } => input.schema(), Slice { input, .. 
} => input.schema(), MapFunction { input, function, .. diff --git a/crates/polars/Cargo.toml b/crates/polars/Cargo.toml index 2ca73319d1d1..5a0ebefe093a 100644 --- a/crates/polars/Cargo.toml +++ b/crates/polars/Cargo.toml @@ -54,9 +54,11 @@ serde = ["polars-core/serde"] serde-lazy = ["polars-core/serde-lazy", "polars-lazy/serde", "polars-time/serde", "polars-io/serde", "polars-ops/serde"] parquet = ["polars-io", "polars-core/parquet", "polars-lazy/parquet", "polars-io/parquet", "polars-sql/parquet"] async = ["polars-lazy/async"] -aws = ["async", "polars-io/aws"] -azure = ["async", "polars-io/azure"] -gcp = ["async", "polars-io/gcp"] +cloud = ["polars-lazy/cloud", "polars-io/cloud"] +cloud_write = ["cloud", "polars-lazy/cloud_write"] +aws = ["async", "cloud", "polars-io/aws"] +azure = ["async", "cloud", "polars-io/azure"] +gcp = ["async", "cloud", "polars-io/gcp"] lazy = ["polars-core/lazy", "polars-lazy", "polars-lazy/compile"] # commented out until UB is fixed # parallel = ["polars-core/parallel"] diff --git a/examples/read_parquet_cloud/Cargo.toml b/examples/read_parquet_cloud/Cargo.toml index 634311904846..f6f5b56eb430 100644 --- a/examples/read_parquet_cloud/Cargo.toml +++ b/examples/read_parquet_cloud/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] -polars = { path = "../../crates/polars", features = ["lazy", "aws"] } +polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet"] } aws-creds = "0.35.0" diff --git a/examples/write_parquet_cloud/Cargo.toml b/examples/write_parquet_cloud/Cargo.toml new file mode 100644 index 000000000000..7bf6a24e46d3 --- /dev/null +++ b/examples/write_parquet_cloud/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "write_parquet_cloud" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +aws-creds = "0.35.0" +polars = { path = "../../crates/polars", features = ["lazy", "aws", "parquet", "cloud_write"] } diff --git a/examples/write_parquet_cloud/src/main.rs b/examples/write_parquet_cloud/src/main.rs new file mode 100644 index 000000000000..8ed29bc402cf --- /dev/null +++ b/examples/write_parquet_cloud/src/main.rs @@ -0,0 +1,63 @@ +use awscreds::Credentials; +use cloud::AmazonS3ConfigKey as Key; +use polars::prelude::*; + +// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket. +// Adjust the link below. +const TEST_S3_LOCATION: &str = "s3://polarstesting/polars_write_example_cloud.parquet"; + +fn main() -> PolarsResult<()> { + sink_file(); + sink_cloud_local(); + sink_aws(); + + Ok(()) +} + +fn sink_file() { + let df = example_dataframe(); + + // Writing to a local file: + let path = "/tmp/polars_write_example.parquet".into(); + df.lazy().sink_parquet(path, Default::default()).unwrap(); +} + +fn sink_cloud_local() { + let df = example_dataframe(); + + // Writing to a location that might be in the cloud: + let uri = "file:///tmp/polars_write_example_cloud.parquet".to_string(); + df.lazy() + .sink_parquet_cloud(uri, None, Default::default()) + .unwrap(); +} + +fn sink_aws() { + let cred = Credentials::default().unwrap(); + + // Propagate the credentials and other cloud options. 
+ let cloud_options = cloud::CloudOptions::default().with_aws([ + (Key::AccessKeyId, &cred.access_key.unwrap()), + (Key::SecretAccessKey, &cred.secret_key.unwrap()), + (Key::Region, &"eu-central-1".into()), + ]); + let cloud_options = Some(cloud_options); + + let df = example_dataframe(); + + df.lazy() + .sink_parquet_cloud( + TEST_S3_LOCATION.to_string(), + cloud_options, + Default::default(), + ) + .unwrap(); +} + +fn example_dataframe() -> DataFrame { + df!( + "foo" => &[1, 2, 3], + "bar" => &[None, Some("bak"), Some("baz")], + ) + .unwrap() +}
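For reference, a minimal sketch (not part of the patch) of how the new `CloudWriter` adaptor can back any existing synchronous Polars writer, not only the streaming Parquet sink. It assumes a downstream crate that depends on `polars` (with the `ipc` feature) and on `polars-io` (with the `cloud` feature); the `file:///tmp/...` URI and the function name are placeholders.

```rust
use polars::prelude::*;
use polars_io::cloud::CloudWriter;

fn write_ipc_via_cloud_writer() -> PolarsResult<()> {
    let mut df = df!(
        "foo" => &[1, 2, 3],
        "bar" => &[None, Some("bak"), Some("baz")],
    )?;

    // `CloudWriter::new` resolves the URI to an object store; a local `file://`
    // URI exercises the same code path a cloud URI would take.
    let mut cloud_writer = CloudWriter::new("file:///tmp/cloud_writer_sketch.ipc", None)?;

    // Because `CloudWriter` implements `std::io::Write`, it can also back writers
    // that have no dedicated cloud sink yet (e.g. IPC, see the TODO in
    // `pipeline/convert.rs` above).
    IpcWriter::new(&mut cloud_writer).finish(&mut df)?;
    Ok(())
}
```

The same pattern is what the `csv_to_local_objectstore_cloudwriter` test above relies on, with `CsvWriter` in place of `IpcWriter`.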