Skip to content

Commit

Permalink
feat: improve error handling in scan_parquet and deal with file limits (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 22, 2023
1 parent 7edac49 commit be87f99
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 80 deletions.
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl IpcExec {
self.file_options.row_count.is_some(),
None,
);
IpcReader::new(file.unwrap())
IpcReader::new(file?)
.with_n_rows(self.file_options.n_rows)
.with_row_count(std::mem::take(&mut self.file_options.row_count))
.set_rechunk(self.file_options.rechunk)
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ fn prepare_scan_args(
schema: &mut SchemaRef,
has_row_count: bool,
hive_partitions: Option<&[Series]>,
) -> (Option<std::fs::File>, Projection, Predicate) {
let file = std::fs::File::open(path).ok();
) -> (std::io::Result<std::fs::File>, Projection, Predicate) {
let file = std::fs::File::open(path);

let with_columns = mem::take(with_columns);
let schema = mem::take(schema);
Expand Down
173 changes: 96 additions & 77 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,90 +47,109 @@ impl ParquetExec {
identity => identity,
};

// First initialize the readers, predicates and metadata.
// This will be used to determine the slices. That way we can actually read all the
// files in parallel even when we add row counts or slices.
let readers_and_metadata = self
let mut result = vec![];

let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let mut base_row_count = self.file_options.row_count.take();

// Limit no. of files at a time to prevent open file limits.
for paths in self
.paths
.iter()
.map(|path| {
let mut file_info = self.file_info.clone();
file_info.update_hive_partitions(path);
.chunks(std::cmp::min(POOL.current_num_threads(), 128))
{
if remaining_rows_to_read == 0 && !result.is_empty() {
return Ok(result);
}

let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());
// First initialize the readers, predicates and metadata.
// This will be used to determine the slices. That way we can actually read all the
// files in parallel even when we add row counts or slices.
let readers_and_metadata = paths
.iter()
.map(|path| {
let mut file_info = self.file_info.clone();
file_info.update_hive_partitions(path);

let (file, projection, predicate) = prepare_scan_args(
path,
&self.predicate,
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
self.file_options.row_count.is_some(),
hive_partitions.as_deref(),
);

let mut reader = if let Some(file) = file {
PolarsResult::Ok(
ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.read_parallel(parallel)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.set_rechunk(false)
.with_hive_partition_columns(hive_partitions),
)
} else {
polars_bail!(ComputeError: "could not read {}", path.display())
}?;
let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());

reader
.num_rows()
.map(|num_rows| (reader, num_rows, predicate, projection))
})
.collect::<PolarsResult<Vec<_>>>()?;
let (file, projection, predicate) = prepare_scan_args(
path,
&self.predicate,
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
base_row_count.is_some(),
hive_partitions.as_deref(),
);

let n_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let iter = readers_and_metadata
.iter()
.map(|(_, num_rows, _, _)| *num_rows);
let file = file?;
let mut reader = ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.read_parallel(parallel)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.set_rechunk(false)
.with_hive_partition_columns(hive_partitions);

let rows_statistics = get_sequential_row_statistics(iter, n_rows_to_read);
let base_row_count = &self.file_options.row_count;
reader
.num_rows()
.map(|num_rows| (reader, num_rows, predicate, projection))
})
.collect::<PolarsResult<Vec<_>>>()?;

let iter = readers_and_metadata
.iter()
.map(|(_, num_rows, _, _)| *num_rows);

POOL.install(|| {
readers_and_metadata
.into_par_iter()
.zip(rows_statistics.par_iter())
.map(
|(
(reader, num_rows_this_file, predicate, projection),
(remaining_rows_to_read, cumulative_read),
)| {
let remaining_rows_to_read = *remaining_rows_to_read;
let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read
{
None
} else {
Some(remaining_rows_to_read)
};
let row_count = base_row_count.as_ref().map(|rc| RowCount {
name: rc.name.clone(),
offset: rc.offset + *cumulative_read as IdxSize,
});

reader
.with_n_rows(remaining_rows_to_read)
.with_row_count(row_count)
._finish_with_scan_ops(
predicate.clone(),
projection.as_ref().map(|v| v.as_ref()),
)
},
)
.collect()
})
let rows_statistics = get_sequential_row_statistics(iter, remaining_rows_to_read);

let out = POOL.install(|| {
readers_and_metadata
.into_par_iter()
.zip(rows_statistics.par_iter())
.map(
|(
(reader, num_rows_this_file, predicate, projection),
(remaining_rows_to_read, cumulative_read),
)| {
let remaining_rows_to_read = *remaining_rows_to_read;
let remaining_rows_to_read =
if num_rows_this_file < remaining_rows_to_read {
None
} else {
Some(remaining_rows_to_read)
};
let row_count = base_row_count.as_ref().map(|rc| RowCount {
name: rc.name.clone(),
offset: rc.offset + *cumulative_read as IdxSize,
});

reader
.with_n_rows(remaining_rows_to_read)
.with_row_count(row_count)
._finish_with_scan_ops(
predicate.clone(),
projection.as_ref().map(|v| v.as_ref()),
)
},
)
.collect::<PolarsResult<Vec<_>>>()
})?;

let n_read = out.iter().map(|df| df.height()).sum();
remaining_rows_to_read = remaining_rows_to_read.saturating_sub(n_read);
if let Some(rc) = &mut base_row_count {
rc.offset += n_read as IdxSize;
}
if result.is_empty() {
result = out;
} else {
result.extend_from_slice(&out)
}
}
Ok(result)
}

#[cfg(feature = "cloud")]
Expand Down

0 comments on commit be87f99

Please sign in to comment.