From 18dcecf327becbc00b3463ae4dcef4fcdea1ac7c Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 7 Aug 2024 18:22:39 +1000 Subject: [PATCH] c --- crates/polars-io/src/csv/read/parser.rs | 22 ++++++++++++++----- crates/polars-io/src/utils/other.rs | 4 ++-- .../src/executors/scan/csv.rs | 13 ++++++----- .../src/executors/scan/ndjson.rs | 7 +++--- .../polars-plan/src/plans/conversion/scans.rs | 6 ++--- py-polars/tests/unit/io/test_lazy_csv.py | 15 +++++++++++++ 6 files changed, 46 insertions(+), 21 deletions(-) diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs index 047520e098da..f96f6e04b1b2 100644 --- a/crates/polars-io/src/csv/read/parser.rs +++ b/crates/polars-io/src/csv/read/parser.rs @@ -13,7 +13,7 @@ use super::options::{CommentPrefix, NullValuesCompiled}; use super::splitfields::SplitFields; use super::utils::get_file_chunks; use crate::path_utils::is_cloud_url; -use crate::utils::get_reader_bytes; +use crate::utils::maybe_decompress_bytes; /// Read the number of rows without parsing columns /// useful for count(*) queries @@ -25,7 +25,7 @@ pub fn count_rows( eol_char: u8, has_header: bool, ) -> PolarsResult<usize> { - let mut reader = if is_cloud_url(path) || config::force_async() { + let file = if is_cloud_url(path) || config::force_async() { #[cfg(feature = "cloud")] { crate::file_cache::FILE_CACHE .get_entry(path.to_str().unwrap()) // Safety: This was initialized by schema inference. .unwrap() .try_open_assume_latest()? } #[cfg(not(feature = "cloud"))] { panic!("required feature `cloud` must be enabled") } } else { polars_utils::open_file(path)?
}; - let reader_bytes = get_reader_bytes(&mut reader)?; + + let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + let owned = &mut vec![]; + let mut reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?; + + for _ in 0..reader_bytes.len() { + if reader_bytes[0] != eol_char { + break; + } + + reader_bytes = &reader_bytes[1..]; + } + const MIN_ROWS_PER_THREAD: usize = 1024; let max_threads = POOL.current_num_threads(); // Determine if parallelism is beneficial and how many threads let n_threads = get_line_stats( - &reader_bytes, + reader_bytes, MIN_ROWS_PER_THREAD, eol_char, None, @@ -61,7 +73,7 @@ pub fn count_rows( .unwrap_or(1); let file_chunks: Vec<(usize, usize)> = get_file_chunks( - &reader_bytes, + reader_bytes, n_threads, None, separator, diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index 4df6a6910997..fd885987aeb8 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -47,9 +47,9 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>( /// /// # Safety /// The `out` vec outlives `bytes` (declare `out` first). 
-pub unsafe fn maybe_decompress_bytes<'a>( +pub fn maybe_decompress_bytes<'a, 'b: 'a>( bytes: &'a [u8], - out: &'a mut Vec<u8>, + out: &'b mut Vec<u8>, ) -> PolarsResult<&'a [u8]> { assert!(out.is_empty()); use crate::prelude::is_compressed; diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 3310e1a8edb5..936d602afc5f 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -77,9 +77,9 @@ impl CsvExec { let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; options - .into_reader_with_file_handle(std::io::Cursor::new(unsafe { - maybe_decompress_bytes(mmap.as_ref(), owned) - }?)) + .into_reader_with_file_handle(std::io::Cursor::new( + maybe_decompress_bytes(mmap.as_ref(), owned)?, + )) ._with_predicate(predicate.clone()) .finish() } @@ -93,9 +93,10 @@ impl CsvExec { let owned = &mut vec![]; options - .into_reader_with_file_handle(std::io::Cursor::new(unsafe { - maybe_decompress_bytes(mmap.as_ref(), owned) - }?)) + .into_reader_with_file_handle(std::io::Cursor::new(maybe_decompress_bytes( + mmap.as_ref(), + owned, + )?)) ._with_predicate(predicate.clone()) .finish() }?; diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 9d17232c88fe..5e17a289eac7 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -98,12 +98,11 @@ impl JsonExec { let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; let owned = &mut vec![]; - let curs = std::io::Cursor::new( - match unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) } { + let curs = + std::io::Cursor::new(match maybe_decompress_bytes(mmap.as_ref(), owned) { Ok(v) => v, Err(e) => return Some(Err(e)), - }, - ); + }); let reader = JsonLineReader::new(curs); let row_index = self.file_scan_options.row_index.as_mut(); diff --git 
a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index 19ba780f5a25..959327148f6c 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -191,8 +191,7 @@ pub(super) fn csv_file_info( let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; let owned = &mut vec![]; - let mut curs = - std::io::Cursor::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?); + let mut curs = std::io::Cursor::new(maybe_decompress_bytes(mmap.as_ref(), owned)?); if curs.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty { polars_bail!(NoData: "empty CSV") @@ -324,8 +323,7 @@ pub(super) fn ndjson_file_info( let owned = &mut vec![]; let mmap = unsafe { memmap::Mmap::map(&f).unwrap() }; - let mut reader = - std::io::BufReader::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?); + let mut reader = std::io::BufReader::new(maybe_decompress_bytes(mmap.as_ref(), owned)?); let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() { if file_options.row_index.is_none() { diff --git a/py-polars/tests/unit/io/test_lazy_csv.py b/py-polars/tests/unit/io/test_lazy_csv.py index 5672c4b1b7c4..c2351ec109bc 100644 --- a/py-polars/tests/unit/io/test_lazy_csv.py +++ b/py-polars/tests/unit/io/test_lazy_csv.py @@ -438,3 +438,18 @@ def test_scan_csv_with_column_names_nonexistent_file() -> None: # Upon collection, it should fail with pytest.raises(FileNotFoundError): result.collect() + + +def test_scan_csv_compressed_row_count_18057(io_files_path: Path) -> None: + csv_file = io_files_path / "gzipped.csv.gz" + + expected = pl.DataFrame( + {"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [1.0, 2.0, 3.0]} + ) + lf = pl.scan_csv(csv_file, truncate_ragged_lines=True) + out = lf.collect() + assert_frame_equal(out, expected) + # This also tests: + # #18070 "CSV count_rows does not skip empty lines at file start" + # as the file has an empty line at the beginning. 
+ assert lf.select(pl.len()).collect().item() == 3