Skip to content

Commit

Permalink
fix: Dropped/shifted rows in parquet scan with streaming=True (#18766)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Sep 17, 2024
1 parent 6ef1f1e commit 55e7fec
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,18 +604,15 @@ fn rg_to_dfs_par_over_rg(
continue;
}

row_groups.push((i, rg_md, rg_slice, row_count_start));
row_groups.push((rg_md, rg_slice, row_count_start));
}

let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
row_groups
.into_par_iter()
.enumerate()
.map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
let md = &file_metadata.row_groups[iter_idx];

.map(|(md, slice, row_count_start)| {
if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
return Ok(None);
}
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "py-polars"
version = "1.7.1"
version = "1.7.2"
edition = "2021"

[lib]
Expand Down
12 changes: 12 additions & 0 deletions py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,15 @@ def trim_to_metadata(path: str | Path) -> None:
assert_frame_equal(
pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)
)


@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None:
    """Regression test for #18739: a parquet scan over a file with many row
    groups must return every row, unshifted, in both the streaming and the
    in-memory engines."""
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "data.bin"

    # row_group_size=1 forces one row group per row, maximizing the number of
    # row groups and exercising the per-row-group slicing logic.
    expected = pl.DataFrame({"id": range(100)})
    expected.write_parquet(target, row_group_size=1)

    result = pl.scan_parquet(target).collect(streaming=streaming)
    assert_frame_equal(expected, result)

0 comments on commit 55e7fec

Please sign in to comment.