
Commit

nameexhaustion committed Aug 7, 2024
1 parent 3dda47e commit 18dcecf
Showing 6 changed files with 46 additions and 21 deletions.
22 changes: 17 additions & 5 deletions crates/polars-io/src/csv/read/parser.rs
@@ -13,7 +13,7 @@ use super::options::{CommentPrefix, NullValuesCompiled};
 use super::splitfields::SplitFields;
 use super::utils::get_file_chunks;
 use crate::path_utils::is_cloud_url;
-use crate::utils::get_reader_bytes;
+use crate::utils::maybe_decompress_bytes;
 
 /// Read the number of rows without parsing columns
 /// useful for count(*) queries
@@ -25,7 +25,7 @@ pub fn count_rows(
     eol_char: u8,
     has_header: bool,
 ) -> PolarsResult<usize> {
-    let mut reader = if is_cloud_url(path) || config::force_async() {
+    let file = if is_cloud_url(path) || config::force_async() {
         #[cfg(feature = "cloud")]
         {
             crate::file_cache::FILE_CACHE
@@ -41,13 +41,25 @@
     } else {
         polars_utils::open_file(path)?
     };
-    let reader_bytes = get_reader_bytes(&mut reader)?;
+
+    let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
+    let owned = &mut vec![];
+    let mut reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
+
+    for _ in 0..reader_bytes.len() {
+        if reader_bytes[0] != eol_char {
+            break;
+        }
+
+        reader_bytes = &reader_bytes[1..];
+    }
+
     const MIN_ROWS_PER_THREAD: usize = 1024;
     let max_threads = POOL.current_num_threads();
 
     // Determine if parallelism is beneficial and how many threads
     let n_threads = get_line_stats(
-        &reader_bytes,
+        reader_bytes,
         MIN_ROWS_PER_THREAD,
         eol_char,
         None,
@@ -61,7 +73,7 @@
     .unwrap_or(1);
 
     let file_chunks: Vec<(usize, usize)> = get_file_chunks(
-        &reader_bytes,
+        reader_bytes,
         n_threads,
         None,
         separator,
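The loop added above strips any run of leading end-of-line bytes before line statistics and chunking run, so an empty first line no longer inflates the row count (#18070, noted in the test below). A minimal standalone sketch of the same idea; the `skip_leading_eol` helper name is ours, not part of the commit:

/// Trim leading end-of-line bytes so an empty leading line is not counted as a record.
fn skip_leading_eol(mut bytes: &[u8], eol_char: u8) -> &[u8] {
    while let Some(&first) = bytes.first() {
        if first != eol_char {
            break;
        }
        bytes = &bytes[1..];
    }
    bytes
}

fn main() {
    let data = b"\n\na,b,c\n1,2,3\n";
    assert_eq!(skip_leading_eol(data, b'\n'), &b"a,b,c\n1,2,3\n"[..]);
}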
4 changes: 2 additions & 2 deletions crates/polars-io/src/utils/other.rs
@@ -47,9 +47,9 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
 ///
 /// # Safety
 /// The `out` vec outlives `bytes` (declare `out` first).
-pub unsafe fn maybe_decompress_bytes<'a>(
+pub fn maybe_decompress_bytes<'a, 'b: 'a>(
     bytes: &'a [u8],
-    out: &'a mut Vec<u8>,
+    out: &'b mut Vec<u8>,
 ) -> PolarsResult<&'a [u8]> {
     assert!(out.is_empty());
     use crate::prelude::is_compressed;
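This signature change is what lets every caller drop its `unsafe` block: with `out: &'b mut Vec<u8>` and `'b: 'a`, the returned slice may borrow from either `bytes` or the scratch buffer, and the borrow checker now enforces that `out` outlives the result instead of relying on the documented convention. A simplified illustration of the lifetime pattern; the function body is a stand-in, not the polars implementation:

// The returned slice may come from `bytes` or from `out`; `'b: 'a`
// lets the compiler prove `out` lives at least as long as the result.
fn borrow_from_either<'a, 'b: 'a>(bytes: &'a [u8], out: &'b mut Vec<u8>) -> &'a [u8] {
    if bytes.first() == Some(&0x1f) {
        // Stand-in for the "decompress into `out`" branch.
        out.extend_from_slice(bytes);
        out.as_slice()
    } else {
        bytes
    }
}

fn main() {
    // Declaring `out` before the source bytes keeps the borrow checker happy,
    // matching the "declare `out` first" note in the doc comment.
    let out = &mut Vec::new();
    let raw = vec![1u8, 2, 3];
    let view = borrow_from_either(&raw, out);
    assert_eq!(view, &[1u8, 2, 3][..]);
}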
13 changes: 7 additions & 6 deletions crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -77,9 +77,9 @@ impl CsvExec {
             let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
 
             options
-                .into_reader_with_file_handle(std::io::Cursor::new(unsafe {
-                    maybe_decompress_bytes(mmap.as_ref(), owned)
-                }?))
+                .into_reader_with_file_handle(std::io::Cursor::new(
+                    maybe_decompress_bytes(mmap.as_ref(), owned)?,
+                ))
                 ._with_predicate(predicate.clone())
                 .finish()
         }
@@ -93,9 +93,10 @@ impl CsvExec {
             let owned = &mut vec![];
 
             options
-                .into_reader_with_file_handle(std::io::Cursor::new(unsafe {
-                    maybe_decompress_bytes(mmap.as_ref(), owned)
-                }?))
+                .into_reader_with_file_handle(std::io::Cursor::new(maybe_decompress_bytes(
+                    mmap.as_ref(),
+                    owned,
+                )?))
                 ._with_predicate(predicate.clone())
                 .finish()
         }?;
7 changes: 3 additions & 4 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
@@ -98,12 +98,11 @@ impl JsonExec {
 
                 let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
                 let owned = &mut vec![];
-                let curs = std::io::Cursor::new(
-                    match unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) } {
+                let curs =
+                    std::io::Cursor::new(match maybe_decompress_bytes(mmap.as_ref(), owned) {
                         Ok(v) => v,
                         Err(e) => return Some(Err(e)),
-                    },
-                );
+                    });
                 let reader = JsonLineReader::new(curs);
 
                 let row_index = self.file_scan_options.row_index.as_mut();
6 changes: 2 additions & 4 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -191,8 +191,7 @@ pub(super) fn csv_file_info(
     let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
     let owned = &mut vec![];
 
-    let mut curs =
-        std::io::Cursor::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?);
+    let mut curs = std::io::Cursor::new(maybe_decompress_bytes(mmap.as_ref(), owned)?);
 
     if curs.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
         polars_bail!(NoData: "empty CSV")
@@ -324,8 +323,7 @@ pub(super) fn ndjson_file_info(
     let owned = &mut vec![];
     let mmap = unsafe { memmap::Mmap::map(&f).unwrap() };
 
-    let mut reader =
-        std::io::BufReader::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?);
+    let mut reader = std::io::BufReader::new(maybe_decompress_bytes(mmap.as_ref(), owned)?);
 
     let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
         if file_options.row_index.is_none() {
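Each call site above now follows the same shape: declare the `owned` scratch vector before taking the memory map, pass both to `maybe_decompress_bytes`, and wrap whichever slice comes back in a `Cursor` or `BufReader`. A rough, self-contained sketch of that pattern using plain std I/O in place of memmap; the pass-through helper body and the `data.csv` path are illustrative only:

use std::fs::File;
use std::io::{BufReader, Read};

// Pass-through stand-in; the real helper decompresses recognized compressed input into `out`.
fn maybe_decompress<'a, 'b: 'a>(bytes: &'a [u8], out: &'b mut Vec<u8>) -> &'a [u8] {
    let _ = out;
    bytes
}

fn main() -> std::io::Result<()> {
    // `owned` is declared before the bytes it may stand in for,
    // so it outlives the slice returned by the helper.
    let owned = &mut Vec::new();

    let mut bytes = Vec::new();
    File::open("data.csv")?.read_to_end(&mut bytes)?;

    // Wrap the (possibly decompressed) bytes in a reader, as the scan executors do.
    let mut reader = BufReader::new(maybe_decompress(&bytes, owned));
    let mut prefix = [0u8; 4];
    let n = reader.read(&mut prefix)?;
    println!("read {n} prefix bytes");
    Ok(())
}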
15 changes: 15 additions & 0 deletions py-polars/tests/unit/io/test_lazy_csv.py
@@ -438,3 +438,18 @@ def test_scan_csv_with_column_names_nonexistent_file() -> None:
     # Upon collection, it should fail
     with pytest.raises(FileNotFoundError):
         result.collect()
+
+
+def test_scan_csv_compressed_row_count_18057(io_files_path: Path) -> None:
+    csv_file = io_files_path / "gzipped.csv.gz"
+
+    expected = pl.DataFrame(
+        {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]}
+    )
+    lf = pl.scan_csv(csv_file, truncate_ragged_lines=True)
+    out = lf.collect()
+    assert_frame_equal(out, expected)
+    # This also tests:
+    # #18070 "CSV count_rows does not skip empty lines at file start"
+    # as the file has an empty line at the beginning.
+    assert lf.select(pl.len()).collect().item() == 3
