Skip to content

Commit

Permalink
fix: ensure streaming parquet datasets deal with limits
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 24, 2023
1 parent eff109f commit 4f69464
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
8 changes: 8 additions & 0 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,14 @@ impl BatchedParquetReader {
})
}

pub fn limit_reached(&self) -> bool {
self.limit == 0
}

pub fn schema(&self) -> &SchemaRef {
&self.schema
}

pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
if self.limit == 0 && self.has_returned {
return Ok(None);
Expand Down
27 changes: 24 additions & 3 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;

use polars_core::error::PolarsResult;
use polars_core::error::*;
use polars_core::utils::arrow::io::parquet::read::FileMetaData;
use polars_core::POOL;
use polars_io::cloud::CloudOptions;
Expand All @@ -22,6 +22,7 @@ use crate::pipeline::determine_chunk_size;
pub struct ParquetSource {
batched_reader: Option<BatchedParquetReader>,
n_threads: usize,
processed_paths: usize,
chunk_index: IdxSize,
paths: std::slice::Iter<'static, PathBuf>,
_paths_lifetime: Arc<[PathBuf]>,
Expand Down Expand Up @@ -69,6 +70,12 @@ impl ParquetSource {
eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows")
}

let reader_schema = if self.processed_paths == 0 {
Some(self.file_info.reader_schema.clone())
} else {
None
};

let batched_reader = if is_cloud_url(path) {
#[cfg(not(feature = "async"))]
{
Expand All @@ -83,7 +90,7 @@ impl ParquetSource {
ParquetAsyncReader::from_uri(
&uri,
self.cloud_options.as_ref(),
Some(self.file_info.reader_schema.clone()),
reader_schema,
self.metadata.clone(),
)
.await?
Expand All @@ -105,7 +112,7 @@ impl ParquetSource {
let file = std::fs::File::open(path).unwrap();

ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.with_schema(reader_schema)
.with_n_rows(file_options.n_rows)
.with_row_count(file_options.row_count)
.with_projection(projection)
Expand All @@ -118,7 +125,11 @@ impl ParquetSource {
)
.batched(chunk_size)?
};
if self.processed_paths >= 1 {
polars_ensure!(batched_reader.schema().as_ref() == self.file_info.reader_schema.as_ref(), ComputeError: "schema of all files in a single scan_parquet must be equal");
}
self.batched_reader = Some(batched_reader);
self.processed_paths += 1;
Ok(())
}

Expand All @@ -145,6 +156,7 @@ impl ParquetSource {
batched_reader: None,
n_threads,
chunk_index: 0,
processed_paths: 0,
options,
file_options,
paths: iter,
Expand Down Expand Up @@ -175,14 +187,23 @@ impl Source for ParquetSource {
)?;
Ok(match batches {
None => {
if self.batched_reader.as_ref().unwrap().limit_reached() {
return Ok(SourceResult::Finished);
}
// reset the reader
self.batched_reader = None;
self.metadata = None;
return self.get_batches(_context);
},
Some(batches) => SourceResult::GotMoreData(
batches
.into_iter()
.map(|data| {
// Keep the row limit updated so the next reader will have a correct limit.
if let Some(n_rows) = &mut self.file_options.n_rows {
*n_rows = n_rows.saturating_sub(data.height())
}

let chunk_index = self.chunk_index;
self.chunk_index += 1;
DataChunk { chunk_index, data }
Expand Down

0 comments on commit 4f69464

Please sign in to comment.