Skip to content

Commit

Permalink
chop log
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Sep 9, 2024
1 parent ebbc27b commit 800f4e6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
59 changes: 51 additions & 8 deletions crates/polars-io/src/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use polars_core::series::Series;
/// We have a special num_rows arg, as df can be empty when a projection contains
/// only hive partition columns.
///
/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`
///
/// # Safety
///
/// num_rows equals the height of the df when the df height is non-zero.
Expand All @@ -16,16 +18,57 @@ pub(crate) fn materialize_hive_partitions<D>(
) {
if let Some(hive_columns) = hive_partition_columns {
// Insert these hive columns in the order they are stored in the file.
for s in hive_columns {
let i = match df.get_columns().binary_search_by_key(
&reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
|df_col| reader_schema.index_of(df_col.name()).unwrap_or(usize::MIN),
) {
Ok(i) => i,
Err(i) => i,
if hive_columns.is_empty() {
return;
}

let hive_columns_iter = hive_columns.iter().map(|s| s.new_from_index(0, num_rows));

if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
// Fast-path - all hive columns are at the end
unsafe { df.get_columns_mut() }.extend(hive_columns_iter);
return;
}

let out_width: usize = df.width() + hive_columns.len();
let df_columns = df.get_columns();
let mut out_columns = Vec::with_capacity(out_width);

// We have a slightly involved algorithm here because `reader_schema` may contain extra
// columns that were excluded from a projection pushdown.

let hive_columns = hive_columns_iter.collect::<Vec<_>>();
// Safety: These are both non-empty at the start
let mut series_arr = [df_columns, hive_columns.as_slice()];
let mut schema_idx_arr = [
reader_schema.index_of(series_arr[0][0].name()).unwrap(),
reader_schema.index_of(series_arr[1][0].name()).unwrap(),
];

loop {
let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] {
0
} else {
1
};

df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
out_columns.push(series_arr[arg_min][0].clone());
series_arr[arg_min] = &series_arr[arg_min][1..];

if series_arr[arg_min].is_empty() {
break;
}

let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else {
break;
};

schema_idx_arr[arg_min] = i;
}

out_columns.extend_from_slice(series_arr[0]);
out_columns.extend_from_slice(series_arr[1]);

*unsafe { df.get_columns_mut() } = out_columns;
}
}
5 changes: 4 additions & 1 deletion crates/polars-plan/src/plans/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl HivePartitions {
}
}

/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
///
/// # Safety
/// `hive_start_idx <= [min path length]`
pub fn hive_partitions_from_paths(
Expand Down Expand Up @@ -198,10 +200,11 @@ pub fn hive_partitions_from_paths(
}

let mut hive_partitions = Vec::with_capacity(paths.len());
let buffers = buffers
let mut buffers = buffers
.into_iter()
.map(|x| x.into_series())
.collect::<PolarsResult<Vec<_>>>()?;
buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));

#[allow(clippy::needless_range_loop)]
for i in 0..paths.len() {
Expand Down

0 comments on commit 800f4e6

Please sign in to comment.