fix: Properly merge live- and dead columns in prefiltered (#18862)
coastalwhite authored Sep 23, 2024
1 parent 3be3b86 commit 58265f6
Showing 1 changed file with 63 additions and 16 deletions.
crates/polars-io/src/parquet/read/read_impl.rs (63 additions & 16 deletions)
@@ -223,6 +223,18 @@ fn rg_to_dfs(
    }
}

+/// Load several Parquet row groups as DataFrames while filtering predicate items.
+///
+/// This strategy works as follows:
+///
+/// ```text
+/// For each Row Group:
+///     1. Skip this row group if statistics already filter it out.
+///     2. Load all the data for the columns needed for the predicate (i.e. the live columns).
+///     3. Create a predicate mask.
+///     4. Load the filtered data for the columns not in the predicate (i.e. the dead columns).
+///     5. Merge the columns into the right DataFrame.
+/// ```
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_prefiltered(
    store: &mmap::ColumnStore,
@@ -276,14 +288,10 @@ fn rg_to_dfs_prefiltered(
    // column indexes of the schema.
    let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
    let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns);
-    let mut offset = 0;
-    for (i, field) in schema.iter_values().enumerate() {
-        if projection_sorted.get(offset).copied() != Some(i) {
-            continue;
-        }
+    for &i in projection_sorted.iter() {
+        let name = schema.get_at_index(i).unwrap().0.as_str();

-        offset += 1;
-        if live_variables.contains(&field.name[..]) {
+        if live_variables.contains(name) {
            live_idx_to_col_idx.push(i);
        } else {
            dead_idx_to_col_idx.push(i);
@@ -295,7 +303,7 @@

    let mask_setting = PrefilterMaskSetting::init_from_env();

-    let dfs: Vec<Option<DataFrame>> = POOL.install(|| {
+    let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.

@@ -376,7 +384,7 @@
                    .then(|| calc_prefilter_cost(&filter_mask))
                    .unwrap_or_default();

-                let rg_columns = (0..num_dead_columns)
+                let mut dead_columns = (0..num_dead_columns)
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = dead_idx_to_col_idx[i];
@@ -435,15 +443,54 @@
                    })
                    .collect::<PolarsResult<Vec<Column>>>()?;

-                let mut rearranged_schema = df.schema();
-                rearranged_schema.merge(Schema::from_arrow_schema(schema.as_ref()));
+                debug_assert!(dead_columns.iter().all(|v| v.len() == df.height()));

-                debug_assert!(rg_columns.iter().all(|v| v.len() == df.height()));
+                let mut live_columns = df.take_columns();
+
+                assert_eq!(
+                    live_columns.len() + dead_columns.len(),
+                    projection_sorted.len()
+                );
+
+                let mut live_idx = 0;
+                let mut dead_idx = 0;
+
+                // We need to re-sort the columns by merging the live and dead columns.
+                let columns = projection_sorted
+                    .iter()
+                    .map(|&i| {
+                        let name = schema.get_at_index(i).unwrap().0.as_str();
+
+                        if live_variables.contains(name) {
+                            debug_assert!(live_idx < live_columns.len());
+                            // SAFETY: We calculate the number of live_columns in the same way.
+                            let column = unsafe { live_columns.as_ptr().add(live_idx).read() };
+                            live_idx += 1;
+                            column
+                        } else {
+                            debug_assert!(dead_idx < dead_columns.len());
+                            // SAFETY: We calculate the number of dead_columns in the same way.
+                            let column = unsafe { dead_columns.as_ptr().add(dead_idx).read() };
+                            dead_idx += 1;
+                            column
+                        }
+                    })
+                    .collect::<Vec<Column>>();
+
+                debug_assert_eq!(live_idx, live_columns.len());
+                debug_assert_eq!(dead_idx, dead_columns.len());
+                debug_assert_eq!(columns.len(), projection_sorted.len());
+
+                // SAFETY: We have now moved all items from live_columns and dead_columns into
+                // columns, so we set their lengths to 0 to avoid a double free.
+                unsafe {
+                    live_columns.set_len(0);
+                    dead_columns.set_len(0);
+                }

-                // We first add the columns with the live columns at the start. Then, we do a
-                // projection that puts the columns at the right spot.
-                df._add_columns(rg_columns, &rearranged_schema)?;
-                let df = df.select(schema.iter_names_cloned())?;
+                // SAFETY: This is completely based on the schema, so all column names are unique
+                // and the length is given by the parquet file, which should always be the same.
+                let df = unsafe { DataFrame::new_no_checks(columns) };

                PolarsResult::Ok(Some(df))
            })
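To make the strategy in the new doc comment concrete, here is a minimal, self-contained sketch of mask-based prefiltering over plain vectors. Everything in it (`RowGroup`, `rg_to_rows_prefiltered`, the `i64` column type) is an illustrative stand-in rather than a Polars API, and the statistics-based skip (step 1) is omitted:

```rust
/// A toy row group: every column is a Vec<i64>, indexed by column id.
struct RowGroup {
    columns: Vec<Vec<i64>>,
}

fn rg_to_rows_prefiltered(
    rg: &RowGroup,
    live: &[usize],                     // column ids used by the predicate
    dead: &[usize],                     // remaining projected column ids
    predicate: impl Fn(&[i64]) -> bool, // evaluated on the live values of one row
) -> Vec<Vec<i64>> {
    // 2. Load all the data for the live columns.
    let live_cols: Vec<&Vec<i64>> = live.iter().map(|&i| &rg.columns[i]).collect();

    // 3. Create a predicate mask over every row of the row group.
    let height = rg.columns.first().map_or(0, |c| c.len());
    let mask: Vec<bool> = (0..height)
        .map(|row| {
            let live_row: Vec<i64> = live_cols.iter().map(|c| c[row]).collect();
            predicate(&live_row)
        })
        .collect();

    // Keep only the rows where the mask is true.
    let filter = |col: &Vec<i64>| -> Vec<i64> {
        col.iter()
            .zip(&mask)
            .filter_map(|(v, &keep)| keep.then_some(*v))
            .collect()
    };

    // 4. Filter the dead columns with the mask (the real reader pushes the mask
    //    down into the decoder instead of loading everything first).
    // 5. Merge live and dead columns into one result; restoring the projection
    //    order is a separate step (see the merge sketch below).
    let mut out: Vec<Vec<i64>> = live_cols.iter().map(|&c| filter(c)).collect();
    out.extend(dead.iter().map(|&i| filter(&rg.columns[i])));
    out
}

fn main() {
    let rg = RowGroup {
        columns: vec![vec![1, 2, 3], vec![10, 20, 30]],
    };
    // Column 0 is live (used by the predicate `x >= 2`); column 1 is dead.
    let filtered = rg_to_rows_prefiltered(&rg, &[0], &[1], |live| live[0] >= 2);
    assert_eq!(filtered, vec![vec![2, 3], vec![20, 30]]);
}
```

The point of the strategy is that dead columns are only materialized for rows that survive the predicate, which is where the savings over a plain read-then-filter come from.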

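The heart of the fix is the merge loop: the old code appended the dead columns after the live ones and then re-projected by name, while the new code rebuilds the projection order in a single pass. Below is a safe sketch of the same idea, relying on the commit's invariant that the live/dead split and the merge classify columns with the same membership test. The names (`merge_in_projection_order`, the `(String, Vec<i64>)` column type) are hypothetical:

```rust
use std::collections::HashSet;

// A column is just a (name, data) pair for illustration.
type Column = (String, Vec<i64>);

fn merge_in_projection_order(
    projection_sorted: &[usize],
    schema_names: &[&str],
    live_variables: &HashSet<&str>,
    live: Vec<Column>,
    dead: Vec<Column>,
) -> Vec<Column> {
    assert_eq!(live.len() + dead.len(), projection_sorted.len());

    let mut live_iter = live.into_iter();
    let mut dead_iter = dead.into_iter();

    projection_sorted
        .iter()
        .map(|&i| {
            // Because both passes use the same membership test, the two
            // iterators stay in lockstep with the projection order.
            if live_variables.contains(schema_names[i]) {
                live_iter.next().unwrap()
            } else {
                dead_iter.next().unwrap()
            }
        })
        .collect()
}

fn main() {
    let names = ["a", "b", "c"];
    let live_set: HashSet<&str> = HashSet::from(["b"]);
    let live = vec![("b".to_string(), vec![2])];
    let dead = vec![("a".to_string(), vec![1]), ("c".to_string(), vec![3])];
    let merged = merge_in_projection_order(&[0, 1, 2], &names, &live_set, live, dead);
    let order: Vec<&str> = merged.iter().map(|c| c.0.as_str()).collect();
    assert_eq!(order, ["a", "b", "c"]);
}
```

The commit reads straight out of the two `Vec`s with raw pointers instead of consuming iterators; that pattern is isolated in the next sketch.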
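Finally, the `set_len(0)` dance at the end of the merge is a general pattern for moving values out of a `Vec` without cloning: read each element exactly once with a raw-pointer `read`, then zero the length so the vector's `Drop` frees only the buffer, never the moved-out values. A minimal isolation of that pattern (the `drain_by_read` name is made up; in ordinary code `into_iter` or `Vec::drain` does this safely):

```rust
fn drain_by_read<T>(mut v: Vec<T>) -> Vec<T> {
    let mut out = Vec::with_capacity(v.len());
    for i in 0..v.len() {
        // SAFETY: each element is read exactly once; `set_len(0)` below
        // ensures the source Vec never drops the moved-out values.
        out.push(unsafe { v.as_ptr().add(i).read() });
    }
    // SAFETY: every element now lives in `out`; with length 0 the source Vec
    // frees only its heap buffer on drop, avoiding a double free.
    unsafe { v.set_len(0) };
    out
}

fn main() {
    let strings = vec![String::from("live"), String::from("dead")];
    let moved = drain_by_read(strings);
    assert_eq!(moved, ["live", "dead"]);
}
```

Presumably the commit reaches for raw reads rather than `into_iter` because it pulls from two vectors at different rates inside one `map` closure; the asserts before the unsafe block are what justify that every slot is read exactly once.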