Skip to content

Commit

Permalink
feat: Implement LazyFrame.sink_ndjson (#10786)
Browse files Browse the repository at this point in the history
Co-authored-by: Abraham Alcantara Gonzalez <abrahamqg9@gmail.com>
  • Loading branch information
fernandocast and abealcantara authored Nov 20, 2023
1 parent 402e280 commit 1bcdf00
Show file tree
Hide file tree
Showing 18 changed files with 421 additions and 93 deletions.
29 changes: 29 additions & 0 deletions crates/polars-io/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_core::utils::try_get_supertype;
use polars_json::json::infer;
use polars_json::json::write::FallibleStreamingIterator;
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
Expand Down Expand Up @@ -153,6 +154,34 @@ where
}
}

/// Writes [`DataFrame`] batches as newline-delimited JSON into a wrapped
/// [`Write`] sink, one batch per call to `write_batch`.
pub struct BatchedWriter<W>
where
    W: Write,
{
    /// Destination that receives the serialized bytes.
    writer: W,
}

impl<W: Write> BatchedWriter<W> {
    /// Wrap `writer`. Nothing is written until [`Self::write_batch`] is called.
    pub fn new(writer: W) -> Self {
        Self { writer }
    }

    /// Write a batch to the json writer.
    ///
    /// Every chunk of `df` is turned into a struct array, fed through the
    /// ndjson serializer, and each block the serializer yields is written to
    /// the underlying writer.
    ///
    /// # Errors
    /// Returns an error if serialization fails or the underlying writer
    /// rejects a block.
    ///
    /// # Panics
    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
        let arrow_fields: Vec<_> = df
            .iter()
            .map(|series| series.field().to_arrow())
            .collect();

        // One struct array per chunk; the field list is cloned for each of them.
        let struct_arrays = df.iter_chunks().map(|chunk| {
            let array = chunk_to_struct(chunk, arrow_fields.clone());
            Ok(Box::new(array) as ArrayRef)
        });

        let mut serializer =
            polars_json::ndjson::write::Serializer::new(struct_arrays, vec![]);

        // `Serializer` is a fallible streaming iterator, so it cannot be driven
        // by a `for` loop; drain it manually until it reports exhaustion.
        loop {
            match serializer.next()? {
                Some(bytes) => self.writer.write_all(bytes)?,
                None => return Ok(()),
            }
        }
    }
}

/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
#[must_use]
pub struct JsonReader<'a, R>
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async = [
cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"]
cloud_write = ["cloud"]
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"]
json = ["polars-io/json", "polars-plan/json", "polars-json"]
json = ["polars-io/json", "polars-plan/json", "polars-json", "polars-pipe/json"]
csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe?/csv"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time", "dtype-duration", "polars-plan/temporal"]
# debugging purposes
Expand Down
126 changes: 74 additions & 52 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ mod err;
pub mod pivot;

use std::borrow::Cow;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
#[cfg(any(
feature = "parquet",
feature = "ipc",
feature = "csv",
feature = "json"
))]
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -27,7 +32,12 @@ use polars_core::prelude::*;
use polars_io::RowCount;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use polars_plan::logical_plan::collect_fingerprints;
use polars_plan::logical_plan::optimize;
use polars_plan::utils::expr_output_name;
Expand Down Expand Up @@ -596,13 +606,23 @@ impl LazyFrame {
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

let finger_prints = if file_caching {
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
{
let mut fps = Vec::with_capacity(8);
collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
Some(fps)
}
#[cfg(not(any(feature = "ipc", feature = "parquet", feature = "csv")))]
#[cfg(not(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
)))]
{
None
}
Expand Down Expand Up @@ -669,23 +689,14 @@ impl LazyFrame {
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "parquet")]
pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::File {
pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Parquet(options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_parquet()` instead"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
"collect().write_parquet()",
)
}

/// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit
Expand All @@ -694,50 +705,33 @@ impl LazyFrame {
/// streaming fashion.
#[cfg(all(feature = "cloud_write", feature = "parquet"))]
pub fn sink_parquet_cloud(
mut self,
self,
uri: String,
cloud_options: Option<polars_io::cloud::CloudOptions>,
parquet_options: ParquetWriteOptions,
) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::Cloud {
self.sink(
SinkType::Cloud {
uri: Arc::new(uri),
cloud_options,
file_type: FileType::Parquet(parquet_options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
"collect().write_parquet()",
)
}

/// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "ipc")]
pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::File {
pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Ipc(options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_ipc()` instead"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
"collect().write_ipc()",
)
}

/// Stream a query result into an ipc/arrow file on an ObjectStore-compatible cloud service.
Expand Down Expand Up @@ -775,20 +769,48 @@ impl LazyFrame {
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "csv")]
pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::File {
pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Csv(options),
},
"collect().write_csv()",
)
}

/// Stream a query result into a json file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "json")]
pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> {
    let payload = SinkType::File {
        path: Arc::new(path),
        file_type: FileType::Json(options),
    };
    // `sink` wraps this hint in a pair of backticks, so the inner backticks
    // split it into two separately quoted alternatives in the error message.
    self.sink(
        payload,
        "collect().write_ndjson()` or `collect().write_json()",
    )
}

#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "cloud_write",
feature = "csv",
feature = "json",
))]
fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload,
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_csv()` instead"
ComputeError: format!("cannot run the whole query in a streaming order; \
use `{msg_alternative}` instead", msg_alternative=msg_alternative)
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
Expand Down
7 changes: 6 additions & 1 deletion crates/polars-lazy/src/physical_plan/file_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::sync::Mutex;

use polars_core::prelude::*;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
use polars_plan::logical_plan::FileFingerPrint;

use crate::prelude::*;
Expand Down
7 changes: 6 additions & 1 deletion crates/polars-lazy/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ pub mod executors;
#[cfg(any(feature = "list_eval", feature = "pivot"))]
pub(crate) mod exotic;
pub mod expressions;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
mod file_cache;
mod node_timer;
pub mod planner;
Expand Down
63 changes: 54 additions & 9 deletions crates/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,20 @@ use polars_core::config::verbose;
use polars_core::frame::group_by::GroupsProxy;
use polars_core::prelude::*;
use polars_ops::prelude::ChunkJoinOptIds;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
use polars_plan::logical_plan::FileFingerPrint;

#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use super::file_cache::FileCache;
use crate::physical_plan::node_timer::NodeTimer;

Expand Down Expand Up @@ -65,7 +75,12 @@ pub struct ExecutionState {
// cached by a `.cache` call and kept in memory for the duration of the plan.
df_cache: Arc<Mutex<PlHashMap<usize, Arc<OnceCell<DataFrame>>>>>,
// cache file reads until all branches got their file, then we delete it
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
pub(crate) file_cache: FileCache,
pub(super) schema_cache: RwLock<Option<SchemaRef>>,
/// Used by Window Expression to prevent redundant grouping
Expand Down Expand Up @@ -110,7 +125,12 @@ impl ExecutionState {
pub(super) fn split(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: Default::default(),
group_tuples: Default::default(),
Expand All @@ -126,7 +146,12 @@ impl ExecutionState {
pub(super) fn clone(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: self.schema_cache.read().unwrap().clone().into(),
group_tuples: self.group_tuples.clone(),
Expand All @@ -138,16 +163,31 @@ impl ExecutionState {
}
}

#[cfg(not(any(feature = "parquet", feature = "csv", feature = "ipc")))]
#[cfg(not(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
)))]
/// Build an execution state when the file-format features named in the
/// `cfg(not(any(..)))` attribute above are all disabled.
///
/// The argument is ignored (hence the leading underscore); the result is
/// exactly what `Self::new()` returns.
pub(crate) fn with_finger_prints(_finger_prints: Option<usize>) -> Self {
Self::new()
}
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
pub(crate) fn with_finger_prints(finger_prints: Option<Vec<FileFingerPrint>>) -> Self {
Self {
df_cache: Arc::new(Mutex::new(PlHashMap::default())),
schema_cache: Default::default(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(finger_prints),
group_tuples: Arc::new(RwLock::new(PlHashMap::default())),
join_tuples: Arc::new(Mutex::new(PlHashMap::default())),
Expand All @@ -166,7 +206,12 @@ impl ExecutionState {
Self {
df_cache: Default::default(),
schema_cache: Default::default(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(None),
group_tuples: Default::default(),
join_tuples: Default::default(),
Expand Down
Loading

0 comments on commit 1bcdf00

Please sign in to comment.