Skip to content

Commit

Permalink
fix: Subtraction with overflow on negative slice offset in Parquet
Browse files: browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Aug 4, 2024
1 parent d84b3fd commit 69b7ffd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
28 changes: 20 additions & 8 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ impl ParquetExec {
} else {
// Walk the files in reverse until we find the first file, and then translate the
// slice into a positive-offset equivalent.
- let n_from_end = -slice.0 as usize;
+ let slice_start_as_n_from_end = -slice.0 as usize;
+ let slice_end_as_n_from_end = slice_start_as_n_from_end.saturating_sub(slice.1);
let mut cum_rows = 0;
let chunk_size = 8;
POOL.install(|| {
Expand All @@ -90,7 +91,7 @@ impl ParquetExec {
for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
cum_rows += rc;

- if cum_rows >= n_from_end {
+ if cum_rows >= slice_start_as_n_from_end {
first_file = *path_idx;
break;
}
Expand All @@ -104,8 +105,13 @@ impl ParquetExec {
PolarsResult::Ok(())
})?;

- let start = cum_rows - n_from_end;
- (start, start + slice.1)
+ let start = cum_rows.saturating_sub(slice_start_as_n_from_end);
+ let end = if slice_end_as_n_from_end >= cum_rows {
+ 0
+ } else {
+ start + slice.1
+ };
+ (start, end)
}
} else {
(0, usize::MAX)
Expand Down Expand Up @@ -256,7 +262,8 @@ impl ParquetExec {
} else {
// Walk the files in reverse until we find the first file, and then translate the
// slice into a positive-offset equivalent.
- let n_from_end = -slice.0 as usize;
+ let slice_start_as_n_from_end = -slice.0 as usize;
+ let slice_end_as_n_from_end = slice_start_as_n_from_end.saturating_sub(slice.1);
let mut cum_rows = 0;

let paths = &self.paths;
Expand Down Expand Up @@ -290,14 +297,19 @@ impl ParquetExec {

cum_rows += num_rows;

- if cum_rows >= n_from_end {
+ if cum_rows >= slice_start_as_n_from_end {
first_file_idx = path_idx;
break;
}
}

- let start = cum_rows - n_from_end;
- (start, start + slice.1)
+ let start = cum_rows.saturating_sub(slice_start_as_n_from_end);
+ let end = if slice_end_as_n_from_end >= cum_rows {
+ 0
+ } else {
+ start + slice.1
+ };
+ (start, end)
}
} else {
(0, usize::MAX)
Expand Down
7 changes: 6 additions & 1 deletion py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,9 @@ def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None:


@pytest.mark.parametrize("streaming", [True, False])
- def test_parquet_slice_pushdown_nonzero_offset(tmp_path: Path, streaming: bool) -> None:
+ def test_parquet_slice_pushdown_non_zero_offset(
+ tmp_path: Path, streaming: bool
+ ) -> None:
paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]
dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]

Expand Down Expand Up @@ -512,3 +514,6 @@ def trim_to_metadata(path: str | Path) -> None:
if not streaming:
assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df)
assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df)
+ assert_frame_equal(
+ pl.scan_parquet(paths[1:]).slice(-99, 1).collect(), df.clear()
+ )

0 comments on commit 69b7ffd

Please sign in to comment.