Skip to content

Commit

Permalink
feat: Add support for IO[bytes] and bytes in scan_{...} functio…
Browse files Browse the repository at this point in the history
…ns (#18532)
  • Loading branch information
coastalwhite authored Sep 9, 2024
1 parent aa3b2c3 commit ac2456c
Show file tree
Hide file tree
Showing 69 changed files with 1,959 additions and 1,225 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod splitfields;
mod utils;

pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues};
pub use parser::count_rows;
pub use parser::{count_rows, count_rows_from_slice};
pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
pub use reader::CsvReader;
pub use schema_inference::infer_file_schema;
58 changes: 34 additions & 24 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::path::PathBuf;
use std::path::Path;

use memchr::memchr2_iter;
use num_traits::Pow;
use polars_core::prelude::*;
use polars_core::{config, POOL};
use polars_error::feature_gated;
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;
Expand All @@ -18,71 +19,80 @@ use crate::utils::maybe_decompress_bytes;
/// Read the number of rows without parsing columns
/// useful for count(*) queries
pub fn count_rows(
path: &PathBuf,
path: &Path,
separator: u8,
quote_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
eol_char: u8,
has_header: bool,
) -> PolarsResult<usize> {
let file = if is_cloud_url(path) || config::force_async() {
#[cfg(feature = "cloud")]
{
feature_gated!("cloud", {
crate::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()?
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
})
} else {
polars_utils::open_file(path)?
};

let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let owned = &mut vec![];
let mut reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;

for _ in 0..reader_bytes.len() {
if reader_bytes[0] != eol_char {
count_rows_from_slice(
reader_bytes,
separator,
quote_char,
comment_prefix,
eol_char,
has_header,
)
}

/// Read the number of rows without parsing columns
/// useful for count(*) queries
pub fn count_rows_from_slice(
mut bytes: &[u8],
separator: u8,
quote_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
eol_char: u8,
has_header: bool,
) -> PolarsResult<usize> {
for _ in 0..bytes.len() {
if bytes[0] != eol_char {
break;
}

reader_bytes = &reader_bytes[1..];
bytes = &bytes[1..];
}

const MIN_ROWS_PER_THREAD: usize = 1024;
let max_threads = POOL.current_num_threads();

// Determine if parallelism is beneficial and how many threads
let n_threads = get_line_stats(
reader_bytes,
bytes,
MIN_ROWS_PER_THREAD,
eol_char,
None,
separator,
quote_char,
)
.map(|(mean, std)| {
let n_rows = (reader_bytes.len() as f32 / (mean - 0.01 * std)) as usize;
let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
(n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
})
.unwrap_or(1);

let file_chunks: Vec<(usize, usize)> = get_file_chunks(
reader_bytes,
n_threads,
None,
separator,
quote_char,
eol_char,
);
let file_chunks: Vec<(usize, usize)> =
get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);

let iter = file_chunks.into_par_iter().map(|(start, stop)| {
let local_bytes = &reader_bytes[start..stop];
let local_bytes = &bytes[start..stop];
let row_iterator = SplitLines::new(local_bytes, quote_char.unwrap_or(b'"'), eol_char);
if comment_prefix.is_some() {
Ok(row_iterator
Expand Down
4 changes: 1 addition & 3 deletions crates/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ use crate::RowIndex;

#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions {
pub memory_map: bool,
}
pub struct IpcScanOptions;

/// Read Arrows IPC format into a DataFrame
///
Expand Down
13 changes: 3 additions & 10 deletions crates/polars-io/src/ipc/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_utils::mmap::MMapSemaphore;

use super::ipc_file::IpcReader;
use crate::mmap::{MMapSemaphore, MmapBytesReader};
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::shared::{finish_reader, ArrowReader};
use crate::utils::{apply_projection, columns_to_projection};
Expand All @@ -15,17 +16,9 @@ impl<R: MmapBytesReader> IpcReader<R> {
&mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<DataFrame> {
#[cfg(target_family = "unix")]
use std::os::unix::fs::MetadataExt;
match self.reader.to_file() {
Some(file) => {
#[cfg(target_family = "unix")]
let metadata = file.metadata()?;
let mmap = unsafe { memmap::Mmap::map(file).unwrap() };
#[cfg(target_family = "unix")]
let semaphore = MMapSemaphore::new(metadata.dev(), metadata.ino(), mmap);
#[cfg(not(target_family = "unix"))]
let semaphore = MMapSemaphore::new(mmap);
let semaphore = MMapSemaphore::new_from_file(file)?;
let metadata =
read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;

Expand Down
89 changes: 10 additions & 79 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,9 @@
#[cfg(target_family = "unix")]
use std::collections::btree_map::Entry;
#[cfg(target_family = "unix")]
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};
use std::sync::Arc;
#[cfg(target_family = "unix")]
use std::sync::Mutex;

use memmap::Mmap;
#[cfg(target_family = "unix")]
use once_cell::sync::Lazy;
use polars_core::config::verbose;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
use polars_error::PolarsResult;
use polars_utils::mmap::MemSlice;

// Keep track of memory mapped files so we don't write to them while reading
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
// Write handle in Windows is exclusive, so this is only necessary in Unix.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: Lazy<Mutex<BTreeMap<(u64, u64), u32>>> =
Lazy::new(|| Mutex::new(Default::default()));

pub(crate) struct MMapSemaphore {
#[cfg(target_family = "unix")]
key: (u64, u64),
mmap: Mmap,
}

impl MMapSemaphore {
#[cfg(target_family = "unix")]
pub(super) fn new(dev: u64, ino: u64, mmap: Mmap) -> Self {
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
let key = (dev, ino);
guard.insert(key, 1);
Self { key, mmap }
}

#[cfg(not(target_family = "unix"))]
pub(super) fn new(mmap: Mmap) -> Self {
Self { mmap }
}
}

impl AsRef<[u8]> for MMapSemaphore {
#[inline]
fn as_ref(&self) -> &[u8] {
self.mmap.as_ref()
}
}

#[cfg(target_family = "unix")]
impl Drop for MMapSemaphore {
fn drop(&mut self) {
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
if let Entry::Occupied(mut e) = guard.entry(self.key) {
let v = e.get_mut();
*v -= 1;

if *v == 0 {
e.remove_entry();
}
}
}
}

pub fn ensure_not_mapped(#[allow(unused)] file: &File) -> PolarsResult<()> {
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::MetadataExt;
let guard = MEMORY_MAPPED_FILES.lock().unwrap();
let metadata = file.metadata()?;
if guard.contains_key(&(metadata.dev(), metadata.ino())) {
polars_bail!(ComputeError: "cannot write to file: already memory mapped");
}
}
Ok(())
}
use polars_utils::mmap::{MMapSemaphore, MemSlice};

/// Trait used to get a hold to file handler or to the underlying bytes
/// without performing a Read.
Expand All @@ -104,6 +29,12 @@ impl MmapBytesReader for BufReader<File> {
}
}

impl MmapBytesReader for BufReader<&File> {
fn to_file(&self) -> Option<&File> {
Some(self.get_ref())
}
}

impl<T> MmapBytesReader for Cursor<T>
where
T: AsRef<[u8]> + Send + Sync,
Expand Down Expand Up @@ -137,7 +68,7 @@ impl<T: MmapBytesReader> MmapBytesReader for &mut T {
pub enum ReaderBytes<'a> {
Borrowed(&'a [u8]),
Owned(Vec<u8>),
Mapped(memmap::Mmap, &'a File),
Mapped(MMapSemaphore, &'a File),
}

impl std::ops::Deref for ReaderBytes<'_> {
Expand All @@ -146,7 +77,7 @@ impl std::ops::Deref for ReaderBytes<'_> {
match self {
Self::Borrowed(ref_bytes) => ref_bytes,
Self::Owned(vec) => vec,
Self::Mapped(mmap, _) => mmap,
Self::Mapped(mmap, _) => mmap.as_ref(),
}
}
}
Expand Down Expand Up @@ -174,7 +105,7 @@ impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
None => {
if let Some(f) = m.to_file() {
let f = unsafe { std::mem::transmute::<&File, &'a File>(f) };
let mmap = unsafe { memmap::Mmap::map(f).unwrap() };
let mmap = MMapSemaphore::new_from_file(f).unwrap();
ReaderBytes::Mapped(mmap, f)
} else {
if verbose() {
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/path_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub fn expand_paths(
paths: &[PathBuf],
glob: bool,
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<Arc<Vec<PathBuf>>> {
) -> PolarsResult<Arc<[PathBuf]>> {
expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
}

Expand Down Expand Up @@ -129,7 +129,7 @@ pub fn expand_paths_hive(
glob: bool,
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
check_directory_level: bool,
) -> PolarsResult<(Arc<Vec<PathBuf>>, usize)> {
) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
let Some(first_path) = paths.first() else {
return Ok((vec![].into(), 0));
};
Expand Down Expand Up @@ -361,7 +361,7 @@ pub fn expand_paths_hive(
out_paths
};

Ok((Arc::new(out_paths), hive_idx_tracker.idx))
Ok((out_paths.into(), hive_idx_tracker.idx))
}

/// Ignores errors from `std::fs::create_dir_all` if the directory exists.
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-io/src/utils/byte_source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::Range;
use std::sync::Arc;

use polars_error::{to_compute_err, PolarsResult};
use polars_error::PolarsResult;
use polars_utils::_limit_path_len_io_err;
use polars_utils::mmap::MemSlice;

Expand Down Expand Up @@ -34,9 +34,8 @@ impl MemSliceByteSource {
.into_std()
.await,
);
let mmap = Arc::new(unsafe { memmap::Mmap::map(file.as_ref()) }.map_err(to_compute_err)?);

Ok(Self(MemSlice::from_mmap(mmap)))
Ok(Self(MemSlice::from_file(file.as_ref())?))
}
}

Expand Down
6 changes: 5 additions & 1 deletion crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::prelude::*;
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_error::to_compute_err;
use polars_utils::mmap::MMapSemaphore;
use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};
Expand All @@ -21,12 +22,15 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
.ok()
.and_then(|offset| Some((reader.to_file()?, offset)))
{
let mmap = unsafe { memmap::MmapOptions::new().offset(offset).map(file)? };
let mut options = memmap::MmapOptions::new();
options.offset(offset);

// somehow bck thinks borrows alias
// this is sound as file was already bound to 'a
use std::fs::File;

let file = unsafe { std::mem::transmute::<&File, &'a File>(file) };
let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
Ok(ReaderBytes::Mapped(mmap, file))
} else {
// we can get the bytes for free
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub mod dsl;
pub mod frame;
pub mod physical_plan;
pub mod prelude;

mod scan;
#[cfg(test)]
mod tests;
Loading

0 comments on commit ac2456c

Please sign in to comment.