Skip to content

Commit

Permalink
feat: limit concurrent downloads in async parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 24, 2023
1 parent eff109f commit 15f4199
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
7 changes: 5 additions & 2 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn read_single_column_async(
Ok((start as u64, chunk))
}

async fn read_columns_async2(
async fn read_columns_async(
async_reader: &ParquetObjectStore,
ranges: &[(u64, u64)],
) -> PolarsResult<Vec<(u64, Bytes)>> {
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn download_projection(
.collect::<Vec<_>>();
let async_reader = async_reader.clone();
let handle =
tokio::spawn(async move { read_columns_async2(&async_reader, &ranges).await });
tokio::spawn(async move { read_columns_async(&async_reader, &ranges).await });
handle.await.unwrap()
});

Expand Down Expand Up @@ -196,6 +196,9 @@ impl FetchRowGroupsFromObjectStore {
&mut self,
row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
if row_groups.start == row_groups.end {
return Ok(ColumnStore::Fetched(Default::default()));
}
// Fetch the required row groups.
let row_groups = self
.row_groups_metadata
Expand Down
25 changes: 20 additions & 5 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,30 @@ impl ParquetExec {
let cloud_options = self.cloud_options.as_ref();

let mut result = vec![];
const MAX_CONCURRENT: usize = 64;
let batch_size = if let Some(md) = self.metadata.as_ref() {
let n_columns = self
.file_options
.with_columns
.as_ref()
.map(|opt| opt.len())
.unwrap_or(first_schema.len());
let concurrent_per_file = md.row_groups.len() * n_columns;
if verbose {
eprintln!(
"estimated concurrent downloads per file: {}",
concurrent_per_file
);
}
MAX_CONCURRENT / concurrent_per_file + 1
} else {
std::cmp::min(POOL.current_num_threads(), 16)
};

let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let mut base_row_count = self.file_options.row_count.take();
let mut processed = 0;
for (batch_idx, paths) in self
.paths
.chunks(std::cmp::min(POOL.current_num_threads(), 128))
.enumerate()
{
for (batch_idx, paths) in self.paths.chunks(batch_size).enumerate() {
if remaining_rows_to_read == 0 && !result.is_empty() {
return Ok(result);
}
Expand Down

0 comments on commit 15f4199

Please sign in to comment.