From 1de71ad99dd9abc4f4596e2a07dd1ab3dff24d63 Mon Sep 17 00:00:00 2001 From: jinliu Date: Sun, 8 Sep 2024 14:02:43 +0800 Subject: [PATCH 1/5] fix materialize_hive_partitions --- crates/polars-io/src/hive.rs | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs index b027e6d1d054..8ddbdef56362 100644 --- a/crates/polars-io/src/hive.rs +++ b/crates/polars-io/src/hive.rs @@ -15,27 +15,17 @@ pub(crate) fn materialize_hive_partitions( num_rows: usize, ) { if let Some(hive_columns) = hive_partition_columns { - let Some(first) = hive_columns.first() else { - return; - }; + // Insert these hive columns in the order they are stored in the file. + for s in hive_columns { + let i = match df.get_columns().binary_search_by_key( + &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), + |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN), + ) { + Ok(i) => i, + Err(i) => i, + }; - if reader_schema.index_of(first.name()).is_some() { - // Insert these hive columns in the order they are stored in the file. - for s in hive_columns { - let i = match df.get_columns().binary_search_by_key( - &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), - |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN), - ) { - Ok(i) => i, - Err(i) => i, - }; - - df.insert_column(i, s.new_from_index(0, num_rows)).unwrap(); - } - } else { - for s in hive_columns { - unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) }; - } + df.insert_column(i, s.new_from_index(0, num_rows)).unwrap(); } } } From 744fdf6b3b25e53bdfbe665882641206f2ac25bd Mon Sep 17 00:00:00 2001 From: jinliu Date: Sun, 8 Sep 2024 14:14:47 +0800 Subject: [PATCH 2/5] unit tests --- crates/polars-io/src/hive.rs | 2 +- py-polars/tests/unit/io/test_hive.py | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs index 8ddbdef56362..5991e680111e 100644 --- a/crates/polars-io/src/hive.rs +++ b/crates/polars-io/src/hive.rs @@ -19,7 +19,7 @@ pub(crate) fn materialize_hive_partitions( for s in hive_columns { let i = match df.get_columns().binary_search_by_key( &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), - |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN), + |df_col| reader_schema.index_of(df_col.name()).unwrap_or(usize::MIN), ) { Ok(i) => i, Err(i) => i, diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index ad285b82f3b3..16ce3cc43089 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -554,6 +554,30 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: ) assert_with_projections(lf, rhs) + # partial cols in file + df = pl.DataFrame( + {"x": 1, "b": 2, "y": 1}, + schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32}, + ) + write_func(df, path) + + lf = scan_func(path, hive_partitioning=True) # type: ignore[call-arg] + rhs = df + assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs) + assert_with_projections(lf, rhs) + + lf = scan_func( # type: ignore[call-arg] + path, + hive_schema={"a": pl.String, "b": pl.String}, + hive_partitioning=True, + ) + rhs = df.with_columns(pl.col("a", "b").cast(pl.String)) + assert_frame_equal( + lf.collect(projection_pushdown=projection_pushdown), + rhs, + ) + assert_with_projections(lf, rhs) + @pytest.mark.write_disk def test_hive_partition_dates(tmp_path: Path) -> None: From d12b89a50b8f217cd7fa6e6d505c52ddcfc20f8f Mon Sep 17 00:00:00 2001 From: jinliu Date: Sun, 8 Sep 2024 14:27:20 +0800 Subject: [PATCH 3/5] fix tests --- py-polars/tests/unit/io/test_hive.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 16ce3cc43089..f98e87053451 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -555,19 +555,20 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: assert_with_projections(lf, rhs) # partial cols in file + partial_path = tmp_path / "a=1/b=2/partial_data.bin" df = pl.DataFrame( {"x": 1, "b": 2, "y": 1}, schema={"x": pl.Int32, "b": pl.Int16, "y": pl.Int32}, ) - write_func(df, path) + write_func(df, partial_path) - lf = scan_func(path, hive_partitioning=True) # type: ignore[call-arg] + lf = scan_func(partial_path, hive_partitioning=True) # type: ignore[call-arg] rhs = df assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs) assert_with_projections(lf, rhs) lf = scan_func( # type: ignore[call-arg] - path, + partial_path, hive_schema={"a": pl.String, "b": pl.String}, hive_partitioning=True, ) From ebbc27b4a57e30bf3798020232016b64a2a40778 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 9 Sep 2024 21:32:37 +1000 Subject: [PATCH 4/5] exclude partial hive columns from projection --- .../src/plans/conversion/dsl_to_ir.rs | 41 +++++++++---------- py-polars/tests/unit/io/test_hive.py | 15 ++++++- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 5ab23be19a14..2ab5e0fd0c13 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -1068,27 +1068,26 @@ pub(crate) fn maybe_init_projection_excluding_hive( // Update `with_columns` with a projection so that hive columns aren't loaded from the // file let hive_parts = hive_parts?; - let hive_schema = hive_parts.schema(); - let (first_hive_name, _) = hive_schema.get_at_index(0)?; - - // TODO: Optimize this - let names = match reader_schema { - Either::Left(ref v) => v - .contains(first_hive_name.as_str()) - .then(|| v.iter_names_cloned().collect::>()), - Either::Right(ref v) => v - .contains(first_hive_name.as_str()) - .then(|| v.iter_names_cloned().collect()), - }; - - let names = names?; - - Some( - names - .into_iter() - .filter(|x| !hive_schema.contains(x)) - .collect::>(), - ) + match &reader_schema { + Either::Left(reader_schema) => hive_schema + .iter_names() + .any(|x| reader_schema.contains(x)) + .then(|| { + reader_schema + .iter_names_cloned() + .filter(|x| !hive_schema.contains(x)) + .collect::>() + }), + Either::Right(reader_schema) => hive_schema + .iter_names() + .any(|x| reader_schema.contains(x)) + .then(|| { + reader_schema + .iter_names_cloned() + .filter(|x| !hive_schema.contains(x)) + .collect::>() + }), + } } diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index f98e87053451..a01a2ef6e59d 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -562,8 +562,14 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: ) write_func(df, partial_path) + rhs = rhs.select( + pl.col("x").cast(pl.Int32), + pl.col("b").cast(pl.Int16), + pl.col("y").cast(pl.Int32), + pl.col("a").cast(pl.Int64), + ) + lf = scan_func(partial_path, hive_partitioning=True) # type: ignore[call-arg] - rhs = df assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs) assert_with_projections(lf, rhs) @@ -572,7 +578,12 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: hive_schema={"a": pl.String, "b": pl.String}, hive_partitioning=True, ) - rhs = df.with_columns(pl.col("a", "b").cast(pl.String)) + rhs = rhs.select( + pl.col("x").cast(pl.Int32), + pl.col("b").cast(pl.String), + pl.col("y").cast(pl.Int32), + pl.col("a").cast(pl.String), + ) assert_frame_equal( lf.collect(projection_pushdown=projection_pushdown), rhs, From 800f4e6a4f4df3bf3d74429f48579af8cfc1303b Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 9 Sep 2024 23:00:33 +1000 Subject: [PATCH 5/5] chop log --- crates/polars-io/src/hive.rs | 59 ++++++++++++++++++++++++---- crates/polars-plan/src/plans/hive.rs | 5 ++- 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs index 5991e680111e..17ace26d6be7 100644 --- a/crates/polars-io/src/hive.rs +++ b/crates/polars-io/src/hive.rs @@ -5,6 +5,8 @@ use polars_core::series::Series; /// We have a special num_rows arg, as df can be empty when a projection contains /// only hive partition columns. /// +/// The `hive_partition_columns` must be ordered by their position in the `reader_schema` +/// /// # Safety /// /// num_rows equals the height of the df when the df height is non-zero. @@ -16,16 +18,57 @@ pub(crate) fn materialize_hive_partitions( ) { if let Some(hive_columns) = hive_partition_columns { // Insert these hive columns in the order they are stored in the file. - for s in hive_columns { - let i = match df.get_columns().binary_search_by_key( - &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), - |df_col| reader_schema.index_of(df_col.name()).unwrap_or(usize::MIN), - ) { - Ok(i) => i, - Err(i) => i, + if hive_columns.is_empty() { + return; + } + + let hive_columns_iter = hive_columns.iter().map(|s| s.new_from_index(0, num_rows)); + + if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 { + // Fast-path - all hive columns are at the end + unsafe { df.get_columns_mut() }.extend(hive_columns_iter); + return; + } + + let out_width: usize = df.width() + hive_columns.len(); + let df_columns = df.get_columns(); + let mut out_columns = Vec::with_capacity(out_width); + + // We have a slightly involved algorithm here because `reader_schema` may contain extra + // columns that were excluded from a projection pushdown. + + let hive_columns = hive_columns_iter.collect::>(); + // Safety: These are both non-empty at the start + let mut series_arr = [df_columns, hive_columns.as_slice()]; + let mut schema_idx_arr = [ + reader_schema.index_of(series_arr[0][0].name()).unwrap(), + reader_schema.index_of(series_arr[1][0].name()).unwrap(), + ]; + + loop { + let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] { + 0 + } else { + 1 }; - df.insert_column(i, s.new_from_index(0, num_rows)).unwrap(); + out_columns.push(series_arr[arg_min][0].clone()); + series_arr[arg_min] = &series_arr[arg_min][1..]; + + if series_arr[arg_min].is_empty() { + break; + } + + let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else { + break; + }; + + schema_idx_arr[arg_min] = i; } + + out_columns.extend_from_slice(series_arr[0]); + out_columns.extend_from_slice(series_arr[1]); + + *unsafe { df.get_columns_mut() } = out_columns; } } diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index 3fc7531ea2b3..a711aeb11848 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -57,6 +57,8 @@ impl HivePartitions { } } +/// Note: Returned hive partitions are ordered by their position in the `reader_schema` +/// /// # Safety /// `hive_start_idx <= [min path length]` pub fn hive_partitions_from_paths( @@ -198,10 +200,11 @@ pub fn hive_partitions_from_paths( } let mut hive_partitions = Vec::with_capacity(paths.len()); - let buffers = buffers + let mut buffers = buffers .into_iter() .map(|x| x.into_series()) .collect::>>()?; + buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX)); #[allow(clippy::needless_range_loop)] for i in 0..paths.len() {