From 58265f68078c19a1e07dfc9c7f81cff41b372ad3 Mon Sep 17 00:00:00 2001
From: Gijs Burghoorn
Date: Mon, 23 Sep 2024 16:21:24 +0200
Subject: [PATCH] fix: Properly merge live- and dead columns in prefiltered
 (#18862)

---
 .../polars-io/src/parquet/read/read_impl.rs | 79 +++++++++++++++----
 1 file changed, 63 insertions(+), 16 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 4e0d741d972d..1d5d53cf4c4d 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -223,6 +223,18 @@ fn rg_to_dfs(
     }
 }
 
+/// Load several Parquet row groups as DataFrames while filtering predicate items.
+///
+/// This strategy works as follows:
+///
+/// ```text
+/// For each Row Group:
+///     1. Skip this row group if statistics already filter it out
+///     2. Load all the data for the columns needed for the predicate (i.e. the live columns)
+///     3. Create a predicate mask
+///     4. Load the filtered data for the columns not in the predicate (i.e. the dead columns)
+///     5. Merge the columns into the right DataFrame
+/// ```
 #[allow(clippy::too_many_arguments)]
 fn rg_to_dfs_prefiltered(
     store: &mmap::ColumnStore,
@@ -276,14 +288,10 @@ fn rg_to_dfs_prefiltered(
     // column indexes of the schema.
     let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
     let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns);
-    let mut offset = 0;
-    for (i, field) in schema.iter_values().enumerate() {
-        if projection_sorted.get(offset).copied() != Some(i) {
-            continue;
-        }
+    for &i in projection_sorted.iter() {
+        let name = schema.get_at_index(i).unwrap().0.as_str();
 
-        offset += 1;
-        if live_variables.contains(&field.name[..]) {
+        if live_variables.contains(name) {
             live_idx_to_col_idx.push(i);
         } else {
             dead_idx_to_col_idx.push(i);
@@ -295,7 +303,7 @@ fn rg_to_dfs_prefiltered(
 
     let mask_setting = PrefilterMaskSetting::init_from_env();
 
-    let dfs: Vec<Option<DataFrame>> = POOL.install(|| {
+    let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
         // Set partitioned fields to prevent quadratic behavior.
         // Ensure all row groups are partitioned.
@@ -376,7 +384,7 @@ fn rg_to_dfs_prefiltered(
                     .then(|| calc_prefilter_cost(&filter_mask))
                     .unwrap_or_default();
 
-                let rg_columns = (0..num_dead_columns)
+                let mut dead_columns = (0..num_dead_columns)
                     .into_par_iter()
                     .map(|i| {
                         let col_idx = dead_idx_to_col_idx[i];
@@ -435,15 +443,54 @@ fn rg_to_dfs_prefiltered(
                     })
                     .collect::<PolarsResult<Vec<Column>>>()?;
 
-                let mut rearranged_schema = df.schema();
-                rearranged_schema.merge(Schema::from_arrow_schema(schema.as_ref()));
+                debug_assert!(dead_columns.iter().all(|v| v.len() == df.height()));
 
-                debug_assert!(rg_columns.iter().all(|v| v.len() == df.height()));
+                let mut live_columns = df.take_columns();
+
+                assert_eq!(
+                    live_columns.len() + dead_columns.len(),
+                    projection_sorted.len()
+                );
+
+                let mut live_idx = 0;
+                let mut dead_idx = 0;
+
+                // We need to re-sort the columns by merging the live and dead columns.
+                let columns = projection_sorted
+                    .iter()
+                    .map(|&i| {
+                        let name = schema.get_at_index(i).unwrap().0.as_str();
+
+                        if live_variables.contains(name) {
+                            debug_assert!(live_idx < live_columns.len());
+                            // SAFETY: We calculate the number of live columns in the same way.
+                            let column = unsafe { live_columns.as_ptr().add(live_idx).read() };
+                            live_idx += 1;
+                            column
+                        } else {
+                            debug_assert!(dead_idx < dead_columns.len());
+                            // SAFETY: We calculate the number of dead columns in the same way.
+                            let column = unsafe { dead_columns.as_ptr().add(dead_idx).read() };
+                            dead_idx += 1;
+                            column
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                debug_assert_eq!(live_idx, live_columns.len());
+                debug_assert_eq!(dead_idx, dead_columns.len());
+                debug_assert_eq!(columns.len(), projection_sorted.len());
+
+                // SAFETY: We have now moved all items from live_columns and dead_columns
+                // into columns, so we set their lengths to 0 to avoid a double free.
+                unsafe {
+                    live_columns.set_len(0);
+                    dead_columns.set_len(0);
+                }
 
-                // We first add the columns with the live columns at the start. Then, we do a
-                // projections that puts the columns at the right spot.
-                df._add_columns(rg_columns, &rearranged_schema)?;
-                let df = df.select(schema.iter_names_cloned())?;
+                // SAFETY: This is based entirely on the schema, so all column names are unique
+                // and the lengths are given by the parquet file, which should always match.
+                let df = unsafe { DataFrame::new_no_checks(columns) };
 
                 PolarsResult::Ok(Some(df))
             })
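The merge step above interleaves two column vectors back into projection order, relying on the invariant that both vectors were built by walking `projection_sorted` with the same `live_variables.contains(name)` test. Below is a minimal, self-contained sketch of that idea in safe Rust, using `into_iter()` moves instead of the patch's raw pointer reads; `Column`, `merge_live_dead`, and `is_live` are hypothetical stand-ins for illustration, not the actual polars-io types.

```rust
/// Hypothetical stand-in for polars' `Column`: (name, values).
type Column = (String, Vec<i64>);

/// Re-interleave `live` and `dead` columns into projection order.
/// `is_live(name)` plays the role of `live_variables.contains(name)`.
fn merge_live_dead(
    projection_sorted: &[usize],
    schema_names: &[&str],
    live: Vec<Column>,
    dead: Vec<Column>,
    is_live: impl Fn(&str) -> bool,
) -> Vec<Column> {
    let mut live = live.into_iter();
    let mut dead = dead.into_iter();

    projection_sorted
        .iter()
        .map(|&i| {
            // Both inputs were produced by scanning `projection_sorted` with
            // the same membership test, so the next element of the matching
            // iterator is exactly the column belonging at position `i`.
            if is_live(schema_names[i]) {
                live.next().expect("live/dead split out of sync")
            } else {
                dead.next().expect("live/dead split out of sync")
            }
        })
        .collect()
}

fn main() {
    let names = ["a", "b", "c"];
    let live = vec![("b".to_string(), vec![2])];
    let dead = vec![("a".to_string(), vec![1]), ("c".to_string(), vec![3])];

    let merged = merge_live_dead(&[0, 1, 2], &names, live, dead, |n| n == "b");
    let order: Vec<&str> = merged.iter().map(|c| c.0.as_str()).collect();
    assert_eq!(order, ["a", "b", "c"]);
    println!("merged order: {order:?}");
}
```

The patch itself performs the same ownership transfer manually: it `read()`s each `Column` out of `live_columns`/`dead_columns` by raw pointer and afterwards shrinks both vectors to length 0 with `set_len(0)` so the moved-out values are not dropped a second time; the iterator version shown here encodes that transfer in safe code.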