From 5c4e7e97a66007d3052042543486c42d23cfac74 Mon Sep 17 00:00:00 2001
From: nameexhaustion
Date: Tue, 10 Sep 2024 23:54:39 +1000
Subject: [PATCH] refactor(rust): Re-use already decoded metadata for first
 path (new-parquet-source) (#18656)

---
 .../src/parquet/metadata/file_metadata.rs     |   2 +-
 .../src/nodes/parquet_source/init.rs          | 409 -----------------
 .../nodes/parquet_source/metadata_fetch.rs    | 430 ++++++++++++++++++
 .../src/nodes/parquet_source/mod.rs           |   6 +-
 .../src/physical_plan/to_graph.rs             |   3 +-
 crates/polars-utils/src/mmap.rs               |   4 +-
 6 files changed, 439 insertions(+), 415 deletions(-)
 create mode 100644 crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs

diff --git a/crates/polars-parquet/src/parquet/metadata/file_metadata.rs b/crates/polars-parquet/src/parquet/metadata/file_metadata.rs
index 492d283f64ed..47c9f160781d 100644
--- a/crates/polars-parquet/src/parquet/metadata/file_metadata.rs
+++ b/crates/polars-parquet/src/parquet/metadata/file_metadata.rs
@@ -10,7 +10,7 @@ pub use crate::parquet::thrift_format::KeyValue;
 /// Metadata for a Parquet file.
 // This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors,
 // which are crucial to deserialize pages.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FileMetadata {
     /// version of this file.
     pub version: i32,
diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs
index 2aba9642fb04..661ea4b84825 100644
--- a/crates/polars-stream/src/nodes/parquet_source/init.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -5,13 +5,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use polars_core::frame::DataFrame;
 use polars_error::PolarsResult;
-use polars_io::prelude::FileMetadata;
-use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
-use polars_io::utils::slice::SplitSlicePosition;
-use polars_utils::mmap::MemSlice;
-use polars_utils::pl_str::PlSmallStr;
 
-use super::metadata_utils::{ensure_metadata_has_projected_fields, read_parquet_metadata_bytes};
 use super::row_group_data_fetch::RowGroupDataFetcher;
 use super::row_group_decode::RowGroupDecoder;
 use super::{AsyncTaskData, ParquetSourceNode};
@@ -20,7 +14,6 @@ use crate::async_primitives::connector::connector;
 use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
 use crate::morsel::get_ideal_morsel_size;
 use crate::nodes::{MorselSeq, TaskPriority};
-use crate::utils::task_handles_ext;
 
 impl ParquetSourceNode {
     /// # Panics
@@ -269,408 +262,6 @@ impl ParquetSourceNode {
         (raw_morsel_receivers, morsel_stream_task_handle)
     }
 
-    /// Constructs the task that fetches file metadata.
-    /// Note: This must be called AFTER `self.projected_arrow_fields` has been initialized.
-    ///
-    /// TODO: During IR conversion the metadata of the first file is already downloaded - see if
-    /// we can find a way to re-use it.
-    #[allow(clippy::type_complexity)]
-    fn init_metadata_fetcher(
-        &mut self,
-    ) -> (
-        tokio::sync::oneshot::Receiver<Option<(usize, usize)>>,
-        crate::async_primitives::connector::Receiver<(
-            usize,
-            usize,
-            Arc<DynByteSource>,
-            FileMetadata,
-            usize,
-        )>,
-        task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
-    ) {
-        let verbose = self.verbose;
-        let io_runtime = polars_io::pl_async::get_runtime();
-
-        assert!(
-            !self.projected_arrow_fields.is_empty()
-                || self.file_options.with_columns.as_deref() == Some(&[])
-        );
-        let projected_arrow_fields = self.projected_arrow_fields.clone();
-        let needs_max_row_group_height_calc =
-            self.file_options.include_file_paths.is_some() || self.hive_parts.is_some();
-
-        let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
-            tokio::sync::oneshot::channel();
-        let (mut metadata_tx, metadata_rx) = connector();
-
-        let byte_source_builder = self.byte_source_builder.clone();
-
-        if self.verbose {
-            eprintln!(
-                "[ParquetSource]: Byte source builder: {:?}",
-                &byte_source_builder
-            );
-        }
-
-        let fetch_metadata_bytes_for_path_index = {
-            let scan_sources = &self.scan_sources;
-            let cloud_options = Arc::new(self.cloud_options.clone());
-
-            let scan_sources = scan_sources.clone();
-            let cloud_options = cloud_options.clone();
-            let byte_source_builder = byte_source_builder.clone();
-
-            move |path_idx: usize| {
-                let scan_sources = scan_sources.clone();
-                let cloud_options = cloud_options.clone();
-                let byte_source_builder = byte_source_builder.clone();
-
-                let handle = io_runtime.spawn(async move {
-                    let mut byte_source = Arc::new(
-                        scan_sources
-                            .get(path_idx)
-                            .unwrap()
-                            .to_dyn_byte_source(
-                                &byte_source_builder,
-                                cloud_options.as_ref().as_ref(),
-                            )
-                            .await?,
-                    );
-                    let (metadata_bytes, maybe_full_bytes) =
-                        read_parquet_metadata_bytes(byte_source.as_ref(), verbose).await?;
-
-                    if let Some(v) = maybe_full_bytes {
-                        if !matches!(byte_source.as_ref(), DynByteSource::MemSlice(_)) {
-                            if verbose {
-                                eprintln!(
-                                    "[ParquetSource]: Parquet file was fully fetched during \
-                                    metadata read ({} bytes).",
-                                    v.len(),
-                                );
-                            }
-
-                            byte_source = Arc::new(DynByteSource::from(MemSliceByteSource(v)))
-                        }
-                    }
-
-                    PolarsResult::Ok((path_idx, byte_source, metadata_bytes))
-                });
-
-                let handle = task_handles_ext::AbortOnDropHandle(handle);
-
-                std::future::ready(handle)
-            }
-        };
-
-        let process_metadata_bytes = {
-            move |handle: task_handles_ext::AbortOnDropHandle<
-                PolarsResult<(usize, Arc<DynByteSource>, MemSlice)>,
-            >| {
-                let projected_arrow_fields = projected_arrow_fields.clone();
-                // Run on CPU runtime - metadata deserialization is expensive, especially
-                // for very wide tables.
-                let handle = async_executor::spawn(TaskPriority::Low, async move {
-                    let (path_index, byte_source, metadata_bytes) = handle.await.unwrap()?;
-
-                    let metadata = polars_parquet::parquet::read::deserialize_metadata(
-                        metadata_bytes.as_ref(),
-                        metadata_bytes.len() * 2 + 1024,
-                    )?;
-
-                    ensure_metadata_has_projected_fields(
-                        projected_arrow_fields.as_ref(),
-                        &metadata,
-                    )?;
-
-                    let file_max_row_group_height = if needs_max_row_group_height_calc {
-                        metadata
-                            .row_groups
-                            .iter()
-                            .map(|x| x.num_rows())
-                            .max()
-                            .unwrap_or(0)
-                    } else {
-                        0
-                    };
-
-                    PolarsResult::Ok((path_index, byte_source, metadata, file_max_row_group_height))
-                });
-
-                async_executor::AbortOnDropHandle::new(handle)
-            }
-        };
-
-        let metadata_prefetch_size = self.config.metadata_prefetch_size;
-        let metadata_decode_ahead_size = self.config.metadata_decode_ahead_size;
-
-        let (start_tx, start_rx) = tokio::sync::oneshot::channel();
-        self.morsel_stream_starter = Some(start_tx);
-
-        let metadata_task_handle = if self
-            .file_options
-            .slice
-            .map(|(offset, _)| offset >= 0)
-            .unwrap_or(true)
-        {
-            normalized_slice_oneshot_tx
-                .send(
-                    self.file_options
-                        .slice
-                        .map(|(offset, len)| (offset as usize, len)),
-                )
-                .unwrap();
-
-            // Safety: `offset + len` does not overflow.
-            let slice_range = self
-                .file_options
-                .slice
-                .map(|(offset, len)| offset as usize..offset as usize + len);
-
-            let mut metadata_stream = futures::stream::iter(0..self.scan_sources.len())
-                .map(fetch_metadata_bytes_for_path_index)
-                .buffered(metadata_prefetch_size)
-                .map(process_metadata_bytes)
-                .buffered(metadata_decode_ahead_size);
-
-            let scan_sources = self.scan_sources.clone();
-
-            // We need to be able to both stop early as well as skip values, which is easier to do
-            // using a custom task instead of futures::stream
-            io_runtime.spawn(async move {
-                let current_row_offset_ref = &mut 0usize;
-                let current_path_index_ref = &mut 0usize;
-
-                if start_rx.await.is_err() {
-                    return Ok(());
-                }
-
-                if verbose {
-                    eprintln!("[ParquetSource]: Starting data fetch")
-                }
-
-                loop {
-                    let current_path_index = *current_path_index_ref;
-                    *current_path_index_ref += 1;
-
-                    let Some(v) = metadata_stream.next().await else {
-                        break;
-                    };
-
-                    let (path_index, byte_source, metadata, file_max_row_group_height) = v
-                        .map_err(|err| {
-                            err.wrap_msg(|msg| {
-                                format!(
-                                    "error at path (index: {}, path: {:?}): {}",
-                                    current_path_index,
-                                    scan_sources
-                                        .get(current_path_index)
-                                        .map(|x| PlSmallStr::from_str(x.to_include_path_name())),
-                                    msg
-                                )
-                            })
-                        })?;
-
-                    assert_eq!(path_index, current_path_index);
-
-                    let current_row_offset = *current_row_offset_ref;
-                    *current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);
-
-                    if let Some(slice_range) = slice_range.clone() {
-                        match SplitSlicePosition::split_slice_at_file(
-                            current_row_offset,
-                            metadata.num_rows,
-                            slice_range,
-                        ) {
-                            SplitSlicePosition::Before => {
-                                if verbose {
-                                    eprintln!(
-                                        "[ParquetSource]: Slice pushdown: \
-                                        Skipped file at index {} ({} rows)",
-                                        current_path_index, metadata.num_rows
-                                    );
-                                }
-                                continue;
-                            },
-                            SplitSlicePosition::After => unreachable!(),
-                            SplitSlicePosition::Overlapping(..) => {},
-                        };
-                    };
-
-                    if metadata_tx
-                        .send((
-                            path_index,
-                            current_row_offset,
-                            byte_source,
-                            metadata,
-                            file_max_row_group_height,
-                        ))
-                        .await
-                        .is_err()
-                    {
-                        break;
-                    }
-
-                    if let Some(slice_range) = slice_range.as_ref() {
-                        if *current_row_offset_ref >= slice_range.end {
-                            if verbose {
-                                eprintln!(
-                                    "[ParquetSource]: Slice pushdown: \
-                                    Stopped reading at file at index {} \
-                                    (remaining {} files will not be read)",
-                                    current_path_index,
-                                    scan_sources.len() - current_path_index - 1,
-                                );
-                            }
-                            break;
-                        }
-                    };
-                }
-
-                Ok(())
-            })
-        } else {
-            // Walk the files in reverse to translate the slice into a positive offset.
-            let slice = self.file_options.slice.unwrap();
-            let slice_start_as_n_from_end = -slice.0 as usize;
-
-            let mut metadata_stream = futures::stream::iter((0..self.scan_sources.len()).rev())
-                .map(fetch_metadata_bytes_for_path_index)
-                .buffered(metadata_prefetch_size)
-                .map(process_metadata_bytes)
-                .buffered(metadata_decode_ahead_size);
-
-            // Note:
-            // * We want to wait until the first morsel is requested before starting this
-            let init_negative_slice_and_metadata = async move {
-                let mut processed_metadata_rev = vec![];
-                let mut cum_rows = 0;
-
-                while let Some(v) = metadata_stream.next().await {
-                    let v = v?;
-                    let (_, _, metadata, _) = &v;
-                    cum_rows += metadata.num_rows;
-                    processed_metadata_rev.push(v);
-
-                    if cum_rows >= slice_start_as_n_from_end {
-                        break;
-                    }
-                }
-
-                let (start, len) = if slice_start_as_n_from_end > cum_rows {
-                    // We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
-                    // rows should only give the first 25 rows.
-                    let first_file_position = slice_start_as_n_from_end - cum_rows;
-                    (0, slice.1.saturating_sub(first_file_position))
-                } else {
-                    (cum_rows - slice_start_as_n_from_end, slice.1)
-                };
-
-                if len == 0 {
-                    processed_metadata_rev.clear();
-                }
-
-                normalized_slice_oneshot_tx
-                    .send(Some((start, len)))
-                    .unwrap();
-
-                let slice_range = start..(start + len);
-
-                PolarsResult::Ok((slice_range, processed_metadata_rev, cum_rows))
-            };
-
-            let path_count = self.scan_sources.len();
-
-            io_runtime.spawn(async move {
-                if start_rx.await.is_err() {
-                    return Ok(());
-                }
-
-                if verbose {
-                    eprintln!("[ParquetSource]: Starting data fetch (negative slice)")
-                }
-
-                let (slice_range, processed_metadata_rev, cum_rows) =
-                    async_executor::AbortOnDropHandle::new(async_executor::spawn(
-                        TaskPriority::Low,
-                        init_negative_slice_and_metadata,
-                    ))
-                    .await?;
-
-                if verbose {
-                    if let Some((path_index, ..)) = processed_metadata_rev.last() {
-                        eprintln!(
-                            "[ParquetSource]: Slice pushdown: Negatively-offsetted slice {:?} \
-                            begins at file index {}, translated to {:?}",
-                            slice, path_index, slice_range
-                        );
-                    } else {
-                        eprintln!(
-                            "[ParquetSource]: Slice pushdown: Negatively-offsetted slice {:?} \
-                            skipped all files ({} files containing {} rows)",
-                            slice, path_count, cum_rows
-                        )
-                    }
-                }
-
-                let metadata_iter = processed_metadata_rev.into_iter().rev();
-                let current_row_offset_ref = &mut 0usize;
-
-                for (current_path_index, byte_source, metadata, file_max_row_group_height) in
-                    metadata_iter
-                {
-                    let current_row_offset = *current_row_offset_ref;
-                    *current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);
-
-                    assert!(matches!(
-                        SplitSlicePosition::split_slice_at_file(
-                            current_row_offset,
-                            metadata.num_rows,
-                            slice_range.clone(),
-                        ),
-                        SplitSlicePosition::Overlapping(..)
-                    ));
-
-                    if metadata_tx
-                        .send((
-                            current_path_index,
-                            current_row_offset,
-                            byte_source,
-                            metadata,
-                            file_max_row_group_height,
-                        ))
-                        .await
-                        .is_err()
-                    {
-                        break;
-                    }
-
-                    if *current_row_offset_ref >= slice_range.end {
-                        if verbose {
-                            eprintln!(
-                                "[ParquetSource]: Slice pushdown: \
-                                Stopped reading at file at index {} \
-                                (remaining {} files will not be read)",
-                                current_path_index,
-                                path_count - current_path_index - 1,
-                            );
-                        }
-                        break;
-                    }
-                }
-
-                Ok(())
-            })
-        };
-
-        let metadata_task_handle = task_handles_ext::AbortOnDropHandle(metadata_task_handle);
-
-        (
-            normalized_slice_oneshot_rx,
-            metadata_rx,
-            metadata_task_handle,
-        )
-    }
-
     /// Creates a `RowGroupDecoder` that turns `RowGroupData` into DataFrames.
     /// This must be called AFTER the following have been initialized:
     /// * `self.projected_arrow_fields`
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
new file mode 100644
index 000000000000..5f3281145083
--- /dev/null
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -0,0 +1,430 @@
+use std::sync::Arc;
+
+use futures::StreamExt;
+use polars_error::PolarsResult;
+use polars_io::prelude::FileMetadata;
+use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
+use polars_io::utils::slice::SplitSlicePosition;
+use polars_utils::mmap::MemSlice;
+use polars_utils::pl_str::PlSmallStr;
+
+use super::metadata_utils::{ensure_metadata_has_projected_fields, read_parquet_metadata_bytes};
+use super::ParquetSourceNode;
+use crate::async_executor;
+use crate::async_primitives::connector::connector;
+use crate::nodes::TaskPriority;
+use crate::utils::task_handles_ext;
+
+impl ParquetSourceNode {
+    /// Constructs the task that fetches file metadata.
+    /// Note: This must be called AFTER `self.projected_arrow_fields` has been initialized.
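+    ///
+    /// Returns, in order: a oneshot receiver for the normalized slice (the
+    /// requested slice resolved to a non-negative `(offset, len)`), a receiver
+    /// yielding `(path_index, row_offset, byte_source, metadata,
+    /// file_max_row_group_height)` for every file overlapping the slice, and
+    /// the abort-on-drop handle of the spawned fetch task.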
+    #[allow(clippy::type_complexity)]
+    pub(super) fn init_metadata_fetcher(
+        &mut self,
+    ) -> (
+        tokio::sync::oneshot::Receiver<Option<(usize, usize)>>,
+        crate::async_primitives::connector::Receiver<(
+            usize,
+            usize,
+            Arc<DynByteSource>,
+            FileMetadata,
+            usize,
+        )>,
+        task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
+    ) {
+        let verbose = self.verbose;
+        let io_runtime = polars_io::pl_async::get_runtime();
+
+        assert!(
+            !self.projected_arrow_fields.is_empty()
+                || self.file_options.with_columns.as_deref() == Some(&[])
+        );
+        let projected_arrow_fields = self.projected_arrow_fields.clone();
+        let needs_max_row_group_height_calc =
+            self.file_options.include_file_paths.is_some() || self.hive_parts.is_some();
+
+        let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
+            tokio::sync::oneshot::channel();
+        let (mut metadata_tx, metadata_rx) = connector();
+
+        let byte_source_builder = self.byte_source_builder.clone();
+
+        if self.verbose {
+            eprintln!(
+                "[ParquetSource]: Byte source builder: {:?}",
+                &byte_source_builder
+            );
+        }
+
+        let fetch_metadata_bytes_for_path_index = {
+            let scan_sources = &self.scan_sources;
+            let cloud_options = Arc::new(self.cloud_options.clone());
+
+            let scan_sources = scan_sources.clone();
+            let cloud_options = cloud_options.clone();
+            let byte_source_builder = byte_source_builder.clone();
+
+            move |path_idx: usize| {
+                let scan_sources = scan_sources.clone();
+                let cloud_options = cloud_options.clone();
+                let byte_source_builder = byte_source_builder.clone();
+
+                let handle = io_runtime.spawn(async move {
+                    let mut byte_source = Arc::new(
+                        scan_sources
+                            .get(path_idx)
+                            .unwrap()
+                            .to_dyn_byte_source(
+                                &byte_source_builder,
+                                cloud_options.as_ref().as_ref(),
+                            )
+                            .await?,
+                    );
+
+                    if path_idx == 0 {
+                        let metadata_bytes = MemSlice::EMPTY;
+                        return Ok((0, byte_source, metadata_bytes));
+                    }
+
+                    let (metadata_bytes, maybe_full_bytes) =
+                        read_parquet_metadata_bytes(byte_source.as_ref(), verbose).await?;
+
+                    if let Some(v) = maybe_full_bytes {
+                        if !matches!(byte_source.as_ref(), DynByteSource::MemSlice(_)) {
+                            if verbose {
+                                eprintln!(
+                                    "[ParquetSource]: Parquet file was fully fetched during \
+                                    metadata read ({} bytes).",
+                                    v.len(),
+                                );
+                            }
+
+                            byte_source = Arc::new(DynByteSource::from(MemSliceByteSource(v)))
+                        }
+                    }
+
+                    PolarsResult::Ok((path_idx, byte_source, metadata_bytes))
+                });
+
+                let handle = task_handles_ext::AbortOnDropHandle(handle);
+
+                std::future::ready(handle)
+            }
+        };
+
+        let first_metadata = self.first_metadata.clone();
+
+        let process_metadata_bytes = {
+            move |handle: task_handles_ext::AbortOnDropHandle<
+                PolarsResult<(usize, Arc<DynByteSource>, MemSlice)>,
+            >| {
+                let projected_arrow_fields = projected_arrow_fields.clone();
+                let first_metadata = first_metadata.clone();
+                // Run on CPU runtime - metadata deserialization is expensive, especially
+                // for very wide tables.
+                let handle = async_executor::spawn(TaskPriority::Low, async move {
+                    let (path_index, byte_source, metadata_bytes) = handle.await.unwrap()?;
+
+                    let metadata = if path_index == 0 {
+                        Arc::unwrap_or_clone(first_metadata)
+                    } else {
+                        polars_parquet::parquet::read::deserialize_metadata(
+                            metadata_bytes.as_ref(),
+                            metadata_bytes.len() * 2 + 1024,
+                        )?
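+                        // Only paths after the first reach this decode: the first
+                        // path's metadata was already decoded during IR conversion
+                        // and is reused above via `Arc::unwrap_or_clone` (its fetch
+                        // step returned `MemSlice::EMPTY`).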
+                    };
+
+                    ensure_metadata_has_projected_fields(
+                        projected_arrow_fields.as_ref(),
+                        &metadata,
+                    )?;
+
+                    let file_max_row_group_height = if needs_max_row_group_height_calc {
+                        metadata
+                            .row_groups
+                            .iter()
+                            .map(|x| x.num_rows())
+                            .max()
+                            .unwrap_or(0)
+                    } else {
+                        0
+                    };
+
+                    PolarsResult::Ok((path_index, byte_source, metadata, file_max_row_group_height))
+                });
+
+                async_executor::AbortOnDropHandle::new(handle)
+            }
+        };
+
+        let metadata_prefetch_size = self.config.metadata_prefetch_size;
+        let metadata_decode_ahead_size = self.config.metadata_decode_ahead_size;
+
+        let (start_tx, start_rx) = tokio::sync::oneshot::channel();
+        self.morsel_stream_starter = Some(start_tx);
+
+        let metadata_task_handle = if self
+            .file_options
+            .slice
+            .map(|(offset, _)| offset >= 0)
+            .unwrap_or(true)
+        {
+            normalized_slice_oneshot_tx
+                .send(
+                    self.file_options
+                        .slice
+                        .map(|(offset, len)| (offset as usize, len)),
+                )
+                .unwrap();
+
+            // Safety: `offset + len` does not overflow.
+            let slice_range = self
+                .file_options
+                .slice
+                .map(|(offset, len)| offset as usize..offset as usize + len);
+
+            let mut metadata_stream = futures::stream::iter(0..self.scan_sources.len())
+                .map(fetch_metadata_bytes_for_path_index)
+                .buffered(metadata_prefetch_size)
+                .map(process_metadata_bytes)
+                .buffered(metadata_decode_ahead_size);
+
+            let scan_sources = self.scan_sources.clone();
+
+            // We need to be able to both stop early as well as skip values, which is easier to do
+            // using a custom task instead of futures::stream
+            io_runtime.spawn(async move {
+                let current_row_offset_ref = &mut 0usize;
+                let current_path_index_ref = &mut 0usize;
+
+                if start_rx.await.is_err() {
+                    return Ok(());
+                }
+
+                if verbose {
+                    eprintln!("[ParquetSource]: Starting data fetch")
+                }
+
+                loop {
+                    let current_path_index = *current_path_index_ref;
+                    *current_path_index_ref += 1;
+
+                    let Some(v) = metadata_stream.next().await else {
+                        break;
+                    };
+
+                    let (path_index, byte_source, metadata, file_max_row_group_height) = v
+                        .map_err(|err| {
+                            err.wrap_msg(|msg| {
+                                format!(
+                                    "error at path (index: {}, path: {:?}): {}",
+                                    current_path_index,
+                                    scan_sources
+                                        .get(current_path_index)
+                                        .map(|x| PlSmallStr::from_str(x.to_include_path_name())),
+                                    msg
+                                )
+                            })
+                        })?;
+
+                    assert_eq!(path_index, current_path_index);
+
+                    let current_row_offset = *current_row_offset_ref;
+                    *current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);
+
+                    if let Some(slice_range) = slice_range.clone() {
+                        match SplitSlicePosition::split_slice_at_file(
+                            current_row_offset,
+                            metadata.num_rows,
+                            slice_range,
+                        ) {
+                            SplitSlicePosition::Before => {
+                                if verbose {
+                                    eprintln!(
+                                        "[ParquetSource]: Slice pushdown: \
+                                        Skipped file at index {} ({} rows)",
+                                        current_path_index, metadata.num_rows
+                                    );
+                                }
+                                continue;
+                            },
+                            SplitSlicePosition::After => unreachable!(),
+                            SplitSlicePosition::Overlapping(..) => {},
+                        };
+                    };
+
+                    if metadata_tx
+                        .send((
+                            path_index,
+                            current_row_offset,
+                            byte_source,
+                            metadata,
+                            file_max_row_group_height,
+                        ))
+                        .await
+                        .is_err()
+                    {
+                        break;
+                    }
+
+                    if let Some(slice_range) = slice_range.as_ref() {
+                        if *current_row_offset_ref >= slice_range.end {
+                            if verbose {
+                                eprintln!(
+                                    "[ParquetSource]: Slice pushdown: \
+                                    Stopped reading at file at index {} \
+                                    (remaining {} files will not be read)",
+                                    current_path_index,
+                                    scan_sources.len() - current_path_index - 1,
+                                );
+                            }
+                            break;
+                        }
+                    };
+                }
+
+                Ok(())
+            })
+        } else {
+            // Walk the files in reverse to translate the slice into a positive offset.
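+            // E.g. a slice of (-100, 75) over three files of 50 rows each
+            // resolves to offset 0 within the second file, i.e. absolute rows
+            // 50..125 of the 150 total.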
+            let slice = self.file_options.slice.unwrap();
+            let slice_start_as_n_from_end = -slice.0 as usize;
+
+            let mut metadata_stream = futures::stream::iter((0..self.scan_sources.len()).rev())
+                .map(fetch_metadata_bytes_for_path_index)
+                .buffered(metadata_prefetch_size)
+                .map(process_metadata_bytes)
+                .buffered(metadata_decode_ahead_size);
+
+            // Note:
+            // * We want to wait until the first morsel is requested before starting this
+            let init_negative_slice_and_metadata = async move {
+                let mut processed_metadata_rev = vec![];
+                let mut cum_rows = 0;
+
+                while let Some(v) = metadata_stream.next().await {
+                    let v = v?;
+                    let (_, _, metadata, _) = &v;
+                    cum_rows += metadata.num_rows;
+                    processed_metadata_rev.push(v);
+
+                    if cum_rows >= slice_start_as_n_from_end {
+                        break;
+                    }
+                }
+
+                let (start, len) = if slice_start_as_n_from_end > cum_rows {
+                    // We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
+                    // rows should only give the first 25 rows.
+                    let first_file_position = slice_start_as_n_from_end - cum_rows;
+                    (0, slice.1.saturating_sub(first_file_position))
+                } else {
+                    (cum_rows - slice_start_as_n_from_end, slice.1)
+                };
+
+                if len == 0 {
+                    processed_metadata_rev.clear();
+                }
+
+                normalized_slice_oneshot_tx
+                    .send(Some((start, len)))
+                    .unwrap();
+
+                let slice_range = start..(start + len);
+
+                PolarsResult::Ok((slice_range, processed_metadata_rev, cum_rows))
+            };
+
+            let path_count = self.scan_sources.len();
+
+            io_runtime.spawn(async move {
+                if start_rx.await.is_err() {
+                    return Ok(());
+                }
+
+                if verbose {
+                    eprintln!("[ParquetSource]: Starting data fetch (negative slice)")
+                }
+
+                let (slice_range, processed_metadata_rev, cum_rows) =
+                    async_executor::AbortOnDropHandle::new(async_executor::spawn(
+                        TaskPriority::Low,
+                        init_negative_slice_and_metadata,
+                    ))
+                    .await?;
+
+                if verbose {
+                    if let Some((path_index, ..)) = processed_metadata_rev.last() {
+                        eprintln!(
+                            "[ParquetSource]: Slice pushdown: Negatively-offsetted slice {:?} \
+                            begins at file index {}, translated to {:?}",
+                            slice, path_index, slice_range
+                        );
+                    } else {
+                        eprintln!(
+                            "[ParquetSource]: Slice pushdown: Negatively-offsetted slice {:?} \
+                            skipped all files ({} files containing {} rows)",
+                            slice, path_count, cum_rows
+                        )
+                    }
+                }
+
+                let metadata_iter = processed_metadata_rev.into_iter().rev();
+                let current_row_offset_ref = &mut 0usize;
+
+                for (current_path_index, byte_source, metadata, file_max_row_group_height) in
+                    metadata_iter
+                {
+                    let current_row_offset = *current_row_offset_ref;
+                    *current_row_offset_ref = current_row_offset.saturating_add(metadata.num_rows);
+
+                    assert!(matches!(
+                        SplitSlicePosition::split_slice_at_file(
+                            current_row_offset,
+                            metadata.num_rows,
+                            slice_range.clone(),
+                        ),
+                        SplitSlicePosition::Overlapping(..)
+                    ));
+
+                    if metadata_tx
+                        .send((
+                            current_path_index,
+                            current_row_offset,
+                            byte_source,
+                            metadata,
+                            file_max_row_group_height,
+                        ))
+                        .await
+                        .is_err()
+                    {
+                        break;
+                    }
+
+                    if *current_row_offset_ref >= slice_range.end {
+                        if verbose {
+                            eprintln!(
+                                "[ParquetSource]: Slice pushdown: \
+                                Stopped reading at file at index {} \
+                                (remaining {} files will not be read)",
+                                current_path_index,
+                                path_count - current_path_index - 1,
+                            );
+                        }
+                        break;
+                    }
+                }
+
+                Ok(())
+            })
+        };
+
+        let metadata_task_handle = task_handles_ext::AbortOnDropHandle(metadata_task_handle);
+
+        (
+            normalized_slice_oneshot_rx,
+            metadata_rx,
+            metadata_task_handle,
+        )
+    }
+}
diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs
index a9344aa35c21..10df7ef0e3bf 100644
--- a/crates/polars-stream/src/nodes/parquet_source/mod.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -8,7 +8,7 @@ use polars_error::PolarsResult;
 use polars_expr::prelude::{phys_expr_to_io_expr, PhysicalExpr};
 use polars_io::cloud::CloudOptions;
 use polars_io::predicates::PhysicalIoExpr;
-use polars_io::prelude::ParquetOptions;
+use polars_io::prelude::{FileMetadata, ParquetOptions};
 use polars_io::utils::byte_source::DynByteSourceBuilder;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::{FileInfo, ScanSources};
@@ -23,6 +23,7 @@ use crate::morsel::SourceToken;
 
 mod init;
 mod mem_prefetch_funcs;
+mod metadata_fetch;
 mod metadata_utils;
 mod row_group_data_fetch;
 mod row_group_decode;
@@ -41,6 +42,7 @@ pub struct ParquetSourceNode {
     options: ParquetOptions,
     cloud_options: Option<CloudOptions>,
     file_options: FileScanOptions,
+    first_metadata: Arc<FileMetadata>,
     // Run-time vars
     config: Config,
     verbose: bool,
@@ -77,6 +79,7 @@ impl ParquetSourceNode {
         options: ParquetOptions,
         cloud_options: Option<CloudOptions>,
         file_options: FileScanOptions,
+        first_metadata: Arc<FileMetadata>,
     ) -> Self {
         let verbose = config::verbose();
 
@@ -95,6 +98,7 @@ impl ParquetSourceNode {
             options,
             cloud_options,
             file_options,
+            first_metadata,
 
             config: Config {
                 // Initialized later
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs
index 9166acefa2e3..e5cbf86b0351 100644
--- a/crates/polars-stream/src/physical_plan/to_graph.rs
+++ b/crates/polars-stream/src/physical_plan/to_graph.rs
@@ -293,7 +293,7 @@ fn to_graph_rec<'a>(
                 FileScan::Parquet {
                     options,
                     cloud_options,
-                    metadata: _,
+                    metadata: first_metadata,
                 } => {
                     if std::env::var("POLARS_DISABLE_PARQUET_SOURCE").as_deref() != Ok("1") {
                         ctx.graph.add_node(
@@ -305,6 +305,7 @@ fn to_graph_rec<'a>(
                                 options,
                                 cloud_options,
                                 file_options,
+                                first_metadata.unwrap(),
                             ),
                             [],
                         )
diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs
index 1a9e20191b1e..cd33ab85438a 100644
--- a/crates/polars-utils/src/mmap.rs
+++ b/crates/polars-utils/src/mmap.rs
@@ -62,9 +62,7 @@ mod private {
 }
 
 impl MemSlice {
-    pub const fn empty() -> Self {
-        Self::from_static(&[])
-    }
+    pub const EMPTY: Self = Self::from_static(&[]);
 
     /// Copy the contents into a new owned `Vec`
     #[inline(always)]
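
The first-path reuse above hinges on `Arc::unwrap_or_clone`, which is why
`FileMetadata` now derives `Clone`. A minimal standalone sketch of the pattern
follows; `Meta`, `decode`, and `metadata_for` are illustrative stand-ins for
this commit's types, not the crate's API.

    use std::sync::Arc;

    #[derive(Debug, Clone)]
    struct Meta {
        num_rows: usize,
    }

    // Stand-in for `polars_parquet::parquet::read::deserialize_metadata`.
    fn decode(bytes: &[u8]) -> Meta {
        Meta { num_rows: bytes.len() }
    }

    // Produce metadata for `path_index`, reusing the already-decoded metadata
    // of the first path instead of fetching and decoding it a second time.
    fn metadata_for(path_index: usize, first: &Arc<Meta>, bytes: &[u8]) -> Meta {
        if path_index == 0 {
            // Moves the value out if this is the last strong reference,
            // otherwise clones it - either way, no re-decode.
            Arc::unwrap_or_clone(first.clone())
        } else {
            decode(bytes)
        }
    }

    fn main() {
        let first = Arc::new(Meta { num_rows: 42 });
        assert_eq!(metadata_for(0, &first, &[]).num_rows, 42);
        assert_eq!(metadata_for(1, &first, &[0u8; 7]).num_rows, 7);
    }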