diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index c00a0047d525..0ebcb7632ae7 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -55,9 +55,9 @@ impl CsvExec { let verbose = config::verbose(); let force_async = config::force_async(); - let run_async = (self.sources.is_files() && force_async) || self.sources.is_cloud_url(); + let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url(); - if self.sources.is_files() && force_async && verbose { + if self.sources.is_paths() && force_async && verbose { eprintln!("ASYNC READING FORCED"); } @@ -75,7 +75,7 @@ impl CsvExec { .finish()?; if let Some(col) = &self.file_options.include_file_paths { - let name = source.to_file_path(); + let name = source.to_include_path_name(); unsafe { df.with_column_unchecked( diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index acbcc2d28dd6..edde4765fec5 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -29,7 +29,7 @@ impl IpcExec { }; let force_async = config::force_async(); - let mut out = if is_cloud || (self.sources.is_files() && force_async) { + let mut out = if is_cloud || (self.sources.is_paths() && force_async) { feature_gated!("cloud", { if force_async && config::verbose() { eprintln!("ASYNC READING FORCED"); @@ -102,7 +102,7 @@ impl IpcExec { self.file_options .include_file_paths .as_ref() - .map(|x| (x.clone(), Arc::from(source.to_file_path()))), + .map(|x| (x.clone(), Arc::from(source.to_include_path_name()))), ) .finish() }; diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 06e1d18892c6..a662760fd54b 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -39,9 +39,9 @@ impl JsonExec { let verbose = config::verbose(); let force_async = config::force_async(); - let run_async = (self.sources.is_files() && force_async) || self.sources.is_cloud_url(); + let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url(); - if self.sources.is_files() && force_async && verbose { + if self.sources.is_paths() && force_async && verbose { eprintln!("ASYNC READING FORCED"); } @@ -108,7 +108,7 @@ impl JsonExec { } if let Some(col) = &self.file_scan_options.include_file_paths { - let name = source.to_file_path(); + let name = source.to_include_path_name(); unsafe { df.with_column_unchecked( StringChunked::full(col.clone(), name, df.height()).into_series(), diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 2f32e0b50aa3..a37fc7c42f33 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -161,7 +161,7 @@ impl ParquetExec { self.file_options .include_file_paths .as_ref() - .map(|x| (x.clone(), Arc::from(source.to_file_path()))), + .map(|x| (x.clone(), Arc::from(source.to_include_path_name()))), ); reader @@ -453,7 +453,7 @@ impl ParquetExec { let is_cloud = self.sources.is_cloud_url(); let force_async = config::force_async(); - let out = if is_cloud || (self.sources.is_files() && force_async) { + let out = if is_cloud || (self.sources.is_paths() && force_async) { feature_gated!("cloud", 
{ if force_async && config::verbose() { eprintln!("ASYNC READING FORCED"); diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index 2b20d9fe932a..25dd61aa1eb9 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -152,7 +152,7 @@ pub(super) fn csv_file_info( // * See if we can do this without downloading the entire file // prints the error message if paths is empty. - let run_async = sources.is_cloud_url() || (sources.is_files() && config::force_async()); + let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async()); let cache_entries = { if run_async { @@ -268,7 +268,7 @@ pub(super) fn ndjson_file_info( polars_bail!(ComputeError: "expected at least 1 source"); }; - let run_async = sources.is_cloud_url() || (sources.is_files() && config::force_async()); + let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async()); let cache_entries = { if run_async { diff --git a/crates/polars-plan/src/plans/functions/count.rs b/crates/polars-plan/src/plans/functions/count.rs index 0b16c8eac994..7375ff47ff31 100644 --- a/crates/polars-plan/src/plans/functions/count.rs +++ b/crates/polars-plan/src/plans/functions/count.rs @@ -226,7 +226,7 @@ pub(super) fn count_rows_ndjson( } let is_cloud_url = sources.is_cloud_url(); - let run_async = is_cloud_url || (sources.is_files() && config::force_async()); + let run_async = is_cloud_url || (sources.is_paths() && config::force_async()); let cache_entries = { if run_async { diff --git a/crates/polars-plan/src/plans/ir/mod.rs b/crates/polars-plan/src/plans/ir/mod.rs index cf0b5ee8df7d..a9eb45b6406f 100644 --- a/crates/polars-plan/src/plans/ir/mod.rs +++ b/crates/polars-plan/src/plans/ir/mod.rs @@ -1,22 +1,20 @@ mod dot; mod format; mod inputs; +mod scan_sources; mod schema; pub(crate) mod tree_format; use std::borrow::Cow; use std::fmt; -use std::fs::File; -use std::path::{Path, PathBuf}; pub use dot::{EscapeLabel, IRDotDisplay, PathsDisplay, ScanSourcesDisplay}; pub use format::{ExprIRDisplay, IRDisplay}; use hive::HivePartitions; -use polars_core::error::feature_gated; use polars_core::prelude::*; use polars_utils::idx_vec::UnitVec; -use polars_utils::mmap::MemSlice; use polars_utils::unitvec; +pub use scan_sources::{ScanSourceIter, ScanSourceRef, ScanSources}; #[cfg(feature = "ir_serde")] use serde::{Deserialize, Serialize}; @@ -36,246 +34,6 @@ pub struct IRPlanRef<'a> { pub expr_arena: &'a Arena, } -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -#[derive(Debug, Clone)] -pub enum ScanSources { - Paths(Arc<[PathBuf]>), - - #[cfg_attr(feature = "serde", serde(skip))] - Files(Arc<[File]>), - #[cfg_attr(feature = "serde", serde(skip))] - Buffers(Arc<[bytes::Bytes]>), -} - -impl std::hash::Hash for ScanSources { - fn hash(&self, state: &mut H) { - std::mem::discriminant(self).hash(state); - - // @NOTE: This is a bit crazy - match self { - Self::Paths(paths) => paths.hash(state), - Self::Files(files) => files.as_ptr().hash(state), - Self::Buffers(buffers) => buffers.as_ptr().hash(state), - } - } -} - -impl PartialEq for ScanSources { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r, - _ => false, - } - } -} - -impl Eq for ScanSources {} - -#[derive(Debug, Clone, Copy)] -pub enum ScanSourceRef<'a> { - Path(&'a Path), - File(&'a File), - Buffer(&'a bytes::Bytes), -} - -pub struct 
ScanSourceSliceInfo { - pub item_slice: std::ops::Range, - pub source_slice: std::ops::Range, -} - -impl Default for ScanSources { - fn default() -> Self { - Self::Buffers(Arc::default()) - } -} - -impl<'a> ScanSourceRef<'a> { - pub fn to_file_path(&self) -> &str { - match self { - Self::Path(path) => path.to_str().unwrap(), - Self::File(_) => "open-file", - Self::Buffer(_) => "in-mem", - } - } - - pub fn to_memslice(&self) -> PolarsResult { - self.to_memslice_possibly_async(false, None, 0) - } - - pub fn to_memslice_async_latest(&self, run_async: bool) -> PolarsResult { - match self { - ScanSourceRef::Path(path) => { - let file = if run_async { - feature_gated!("cloud", { - polars_io::file_cache::FILE_CACHE - .get_entry(path.to_str().unwrap()) - // Safety: This was initialized by schema inference. - .unwrap() - .try_open_assume_latest()? - }) - } else { - polars_utils::open_file(path)? - }; - - Ok(MemSlice::from_mmap(Arc::new(unsafe { - memmap::Mmap::map(&file)? - }))) - }, - ScanSourceRef::File(file) => Ok(MemSlice::from_mmap(Arc::new(unsafe { - memmap::Mmap::map(*file)? - }))), - ScanSourceRef::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())), - } - } - - pub fn to_memslice_possibly_async( - &self, - run_async: bool, - #[cfg(feature = "cloud")] cache_entries: Option< - &Vec>, - >, - #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>, - index: usize, - ) -> PolarsResult { - match self { - Self::Path(path) => { - let f = if run_async { - feature_gated!("cloud", { - cache_entries.unwrap()[index].try_open_check_latest()? - }) - } else { - polars_utils::open_file(path)? - }; - - let mmap = unsafe { memmap::Mmap::map(&f)? }; - Ok(MemSlice::from_mmap(Arc::new(mmap))) - }, - Self::File(file) => { - let mmap = unsafe { memmap::Mmap::map(*file)? 
}; - Ok(MemSlice::from_mmap(Arc::new(mmap))) - }, - Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())), - } - } -} - -impl ScanSources { - pub fn iter(&self) -> ScanSourceIter { - ScanSourceIter { - sources: self, - offset: 0, - } - } - - pub fn as_paths(&self) -> Option<&[PathBuf]> { - match self { - Self::Paths(paths) => Some(paths.as_ref()), - Self::Files(_) | Self::Buffers(_) => None, - } - } - - pub fn into_paths(&self) -> Option> { - match self { - Self::Paths(paths) => Some(paths.clone()), - Self::Files(_) | Self::Buffers(_) => None, - } - } - - pub fn first_path(&self) -> Option<&Path> { - match self { - Self::Paths(paths) => paths.first().map(|p| p.as_path()), - Self::Files(_) | Self::Buffers(_) => None, - } - } - - pub fn to_dsl(self, is_expanded: bool) -> DslScanSources { - DslScanSources { - sources: self, - is_expanded, - } - } - - pub fn is_files(&self) -> bool { - matches!(self, Self::Paths(_)) - } - - pub fn is_cloud_url(&self) -> bool { - match self { - Self::Paths(paths) => paths.first().map_or(false, polars_io::is_cloud_url), - Self::Files(_) | Self::Buffers(_) => false, - } - } - - pub fn len(&self) -> usize { - match self { - Self::Paths(s) => s.len(), - Self::Files(s) => s.len(), - Self::Buffers(s) => s.len(), - } - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn first(&self) -> Option { - self.get(0) - } - - pub fn id(&self) -> PlSmallStr { - if self.is_empty() { - return PlSmallStr::from_static("EMPTY"); - } - - match self { - Self::Paths(paths) => { - PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref()) - }, - Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"), - Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"), - } - } - - pub fn get(&self, idx: usize) -> Option { - match self { - Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)), - Self::Files(files) => files.get(idx).map(ScanSourceRef::File), - Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer), - } - } - - pub fn at(&self, idx: usize) -> ScanSourceRef { - self.get(idx).unwrap() - } -} - -pub struct ScanSourceIter<'a> { - sources: &'a ScanSources, - offset: usize, -} - -impl<'a> Iterator for ScanSourceIter<'a> { - type Item = ScanSourceRef<'a>; - - fn next(&mut self) -> Option { - let item = match self.sources { - ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?), - ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?), - ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?), - }; - - self.offset += 1; - Some(item) - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.sources.len() - self.offset; - (len, Some(len)) - } -} - -impl<'a> ExactSizeIterator for ScanSourceIter<'a> {} - /// [`IR`] is a representation of [`DslPlan`] with [`Node`]s which are allocated in an [`Arena`] /// In this IR the logical plan has access to the full dataset. 
#[derive(Clone, Debug, Default)] diff --git a/crates/polars-plan/src/plans/ir/scan_sources.rs b/crates/polars-plan/src/plans/ir/scan_sources.rs new file mode 100644 index 000000000000..5261d6ede706 --- /dev/null +++ b/crates/polars-plan/src/plans/ir/scan_sources.rs @@ -0,0 +1,270 @@ +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use polars_core::error::{feature_gated, PolarsResult}; +use polars_utils::mmap::MemSlice; +use polars_utils::pl_str::PlSmallStr; + +use super::DslScanSources; + +/// Set of sources to scan from +/// +/// This is can either be a list of paths to files, opened files or in-memory buffers. Mixing of +/// buffers is not currently possible. +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[derive(Debug, Clone)] +pub enum ScanSources { + Paths(Arc<[PathBuf]>), + + #[cfg_attr(feature = "serde", serde(skip))] + Files(Arc<[File]>), + #[cfg_attr(feature = "serde", serde(skip))] + Buffers(Arc<[bytes::Bytes]>), +} + +/// A reference to a single item in [`ScanSources`] +#[derive(Debug, Clone, Copy)] +pub enum ScanSourceRef<'a> { + Path(&'a Path), + File(&'a File), + Buffer(&'a bytes::Bytes), +} + +/// An iterator for [`ScanSources`] +pub struct ScanSourceIter<'a> { + sources: &'a ScanSources, + offset: usize, +} + +impl Default for ScanSources { + fn default() -> Self { + Self::Buffers(Arc::default()) + } +} + +impl std::hash::Hash for ScanSources { + fn hash(&self, state: &mut H) { + std::mem::discriminant(self).hash(state); + + // @NOTE: This is a bit crazy + // + // We don't really want to hash the file descriptors or the whole buffers so for now we + // just settle with the fact that the memory behind Arc's does not really move. Therefore, + // we can just hash the pointer. + match self { + Self::Paths(paths) => paths.hash(state), + Self::Files(files) => files.as_ptr().hash(state), + Self::Buffers(buffers) => buffers.as_ptr().hash(state), + } + } +} + +impl PartialEq for ScanSources { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r, + (ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()), + (ScanSources::Buffers(l), ScanSources::Buffers(r)) => { + std::ptr::eq(l.as_ptr(), r.as_ptr()) + }, + _ => false, + } + } +} + +impl Eq for ScanSources {} + +impl ScanSources { + pub fn iter(&self) -> ScanSourceIter { + ScanSourceIter { + sources: self, + offset: 0, + } + } + + pub fn to_dsl(self, is_expanded: bool) -> DslScanSources { + DslScanSources { + sources: self, + is_expanded, + } + } + + /// Are the sources all paths? + pub fn is_paths(&self) -> bool { + matches!(self, Self::Paths(_)) + } + + /// Try cast the scan sources to [`ScanSources::Paths`] + pub fn as_paths(&self) -> Option<&[PathBuf]> { + match self { + Self::Paths(paths) => Some(paths.as_ref()), + Self::Files(_) | Self::Buffers(_) => None, + } + } + + /// Try cast the scan sources to [`ScanSources::Paths`] with a clone + pub fn into_paths(&self) -> Option> { + match self { + Self::Paths(paths) => Some(paths.clone()), + Self::Files(_) | Self::Buffers(_) => None, + } + } + + /// Try get the first path in the scan sources + pub fn first_path(&self) -> Option<&Path> { + match self { + Self::Paths(paths) => paths.first().map(|p| p.as_path()), + Self::Files(_) | Self::Buffers(_) => None, + } + } + + /// Is the first path a cloud URL? 
+ pub fn is_cloud_url(&self) -> bool { + self.first_path().is_some_and(polars_io::is_cloud_url) + } + + pub fn len(&self) -> usize { + match self { + Self::Paths(s) => s.len(), + Self::Files(s) => s.len(), + Self::Buffers(s) => s.len(), + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn first(&self) -> Option { + self.get(0) + } + + /// Turn the [`ScanSources`] into some kind of identifier + pub fn id(&self) -> PlSmallStr { + if self.is_empty() { + return PlSmallStr::from_static("EMPTY"); + } + + match self { + Self::Paths(paths) => { + PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref()) + }, + Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"), + Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"), + } + } + + /// Get the scan source at specific address + pub fn get(&self, idx: usize) -> Option { + match self { + Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)), + Self::Files(files) => files.get(idx).map(ScanSourceRef::File), + Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer), + } + } + + /// Get the scan source at specific address + /// + /// # Panics + /// + /// If the `idx` is out of range. + #[track_caller] + pub fn at(&self, idx: usize) -> ScanSourceRef { + self.get(idx).unwrap() + } +} + +impl<'a> ScanSourceRef<'a> { + /// Get the name for `include_paths` + pub fn to_include_path_name(&self) -> &str { + match self { + Self::Path(path) => path.to_str().unwrap(), + Self::File(_) => "open-file", + Self::Buffer(_) => "in-mem", + } + } + + /// Turn the scan source into a memory slice + pub fn to_memslice(&self) -> PolarsResult { + self.to_memslice_possibly_async(false, None, 0) + } + + pub fn to_memslice_async_latest(&self, run_async: bool) -> PolarsResult { + match self { + ScanSourceRef::Path(path) => { + let file = if run_async { + feature_gated!("cloud", { + polars_io::file_cache::FILE_CACHE + .get_entry(path.to_str().unwrap()) + // Safety: This was initialized by schema inference. + .unwrap() + .try_open_assume_latest()? + }) + } else { + polars_utils::open_file(path)? + }; + + Ok(MemSlice::from_mmap(Arc::new(unsafe { + memmap::Mmap::map(&file)? + }))) + }, + ScanSourceRef::File(file) => Ok(MemSlice::from_mmap(Arc::new(unsafe { + memmap::Mmap::map(*file)? + }))), + ScanSourceRef::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())), + } + } + + pub fn to_memslice_possibly_async( + &self, + run_async: bool, + #[cfg(feature = "cloud")] cache_entries: Option< + &Vec>, + >, + #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>, + index: usize, + ) -> PolarsResult { + match self { + Self::Path(path) => { + let f = if run_async { + feature_gated!("cloud", { + cache_entries.unwrap()[index].try_open_check_latest()? + }) + } else { + polars_utils::open_file(path)? + }; + + let mmap = unsafe { memmap::Mmap::map(&f)? }; + Ok(MemSlice::from_mmap(Arc::new(mmap))) + }, + Self::File(file) => { + let mmap = unsafe { memmap::Mmap::map(*file)? 
}; + Ok(MemSlice::from_mmap(Arc::new(mmap))) + }, + Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())), + } + } +} + +impl<'a> Iterator for ScanSourceIter<'a> { + type Item = ScanSourceRef<'a>; + + fn next(&mut self) -> Option { + let item = match self.sources { + ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?), + ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?), + ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?), + }; + + self.offset += 1; + Some(item) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.sources.len() - self.offset; + (len, Some(len)) + } +} + +impl<'a> ExactSizeIterator for ScanSourceIter<'a> {} diff --git a/crates/polars-python/src/conversion/mod.rs b/crates/polars-python/src/conversion/mod.rs index 4eee205d8550..fd8e97cb7adc 100644 --- a/crates/polars-python/src/conversion/mod.rs +++ b/crates/polars-python/src/conversion/mod.rs @@ -32,7 +32,7 @@ use pyo3::pybacked::PyBackedStr; use pyo3::types::{PyDict, PyList, PySequence}; use crate::error::PyPolarsErr; -use crate::file::{get_either_file_or_path, EitherPythonFileOrPath}; +use crate::file::{get_python_scan_source_input, PythonScanSourceInput}; #[cfg(feature = "object")] use crate::object::OBJECT_NAME; use crate::prelude::*; @@ -549,29 +549,24 @@ impl<'py> FromPyObject<'py> for Wrap { let num_items = list.len(); let mut iter = list .into_iter() - .map(|val| get_either_file_or_path(val.unbind(), false)); + .map(|val| get_python_scan_source_input(val.unbind(), false)); let Some(first) = iter.next() else { return Ok(Wrap(ScanSources::default())); }; let mut sources = match first? { - EitherPythonFileOrPath::Py(f) => { - let mut sources = Vec::with_capacity(num_items); - sources.push(f.as_bytes()); - MutableSources::Buffers(sources) - }, - EitherPythonFileOrPath::Path(path) => { + PythonScanSourceInput::Path(path) => { let mut sources = Vec::with_capacity(num_items); sources.push(path); MutableSources::Paths(sources) }, - EitherPythonFileOrPath::File(file) => { + PythonScanSourceInput::File(file) => { let mut sources = Vec::with_capacity(num_items); sources.push(file); MutableSources::Files(sources) }, - EitherPythonFileOrPath::Buffer(buffer) => { + PythonScanSourceInput::Buffer(buffer) => { let mut sources = Vec::with_capacity(num_items); sources.push(buffer); MutableSources::Buffers(sources) @@ -580,10 +575,9 @@ impl<'py> FromPyObject<'py> for Wrap { for source in iter { match (&mut sources, source?) 
{ - (MutableSources::Paths(v), EitherPythonFileOrPath::Path(p)) => v.push(p), - (MutableSources::Files(v), EitherPythonFileOrPath::File(f)) => v.push(f), - (MutableSources::Buffers(v), EitherPythonFileOrPath::Py(f)) => v.push(f.as_bytes()), - (MutableSources::Buffers(v), EitherPythonFileOrPath::Buffer(f)) => v.push(f), + (MutableSources::Paths(v), PythonScanSourceInput::Path(p)) => v.push(p), + (MutableSources::Files(v), PythonScanSourceInput::File(f)) => v.push(f), + (MutableSources::Buffers(v), PythonScanSourceInput::Buffer(f)) => v.push(f), _ => { return Err(PyTypeError::new_err( "Cannot combine in-memory bytes, paths and files for scan sources", diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index 2857b37a4891..33d084c5130c 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -203,20 +203,22 @@ impl EitherRustPythonFile { } } -pub enum EitherPythonFileOrPath { - Py(PyFileLikeObject), +pub enum PythonScanSourceInput { Buffer(bytes::Bytes), Path(PathBuf), File(File), } -pub fn get_either_file_or_path(py_f: PyObject, write: bool) -> PyResult { +pub fn get_python_scan_source_input( + py_f: PyObject, + write: bool, +) -> PyResult { Python::with_gil(|py| { let py_f = py_f.into_bound(py); // If the pyobject is a `bytes` class if let Ok(bytes) = py_f.downcast::() { - return Ok(EitherPythonFileOrPath::Buffer( + return Ok(PythonScanSourceInput::Buffer( bytes::Bytes::copy_from_slice(bytes.as_bytes()), )); } @@ -224,7 +226,7 @@ pub fn get_either_file_or_path(py_f: PyObject, write: bool) -> PyResult>() { let file_path = std::path::Path::new(&*s); let file_path = resolve_homedir(file_path); - Ok(EitherPythonFileOrPath::Path(file_path)) + Ok(PythonScanSourceInput::Path(file_path)) } else { let io = py.import_bound("io").unwrap(); let is_utf8_encoding = |py_f: &Bound| -> PyResult { @@ -277,7 +279,7 @@ pub fn get_either_file_or_path(py_f: PyObject, write: bool) -> PyResult PyResult PyResult<(Option, ScanSources)> { - use crate::file::{get_either_file_or_path, EitherPythonFileOrPath}; - Ok(match get_either_file_or_path(obj, false)? { - EitherPythonFileOrPath::Path(path) => { + use crate::file::{get_python_scan_source_input, PythonScanSourceInput}; + Ok(match get_python_scan_source_input(obj, false)? { + PythonScanSourceInput::Path(path) => { (Some(path.clone()), ScanSources::Paths([path].into())) }, - EitherPythonFileOrPath::File(file) => (None, ScanSources::Files([file].into())), - EitherPythonFileOrPath::Py(f) => (None, ScanSources::Buffers([f.as_bytes()].into())), - EitherPythonFileOrPath::Buffer(buff) => (None, ScanSources::Buffers([buff].into())), + PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())), + PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())), }) }
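Taken together, the hunks above rename `ScanSources::is_files` to `is_paths` and `ScanSourceRef::to_file_path` to `to_include_path_name`, move the type into its own `scan_sources.rs` module behind a `pub use` re-export, and rename the Python-side input enum to `PythonScanSourceInput`. The sketch below is only an illustration of how the renamed helpers are called from a consumer's point of view: the `polars_plan::plans` import path is assumed from the re-export added in `plans/ir/mod.rs`, and `describe`, the literal path, and the inline CSV bytes are placeholders, not part of this change.

// Minimal usage sketch, assuming the `polars_plan::plans` re-export path and
// a dependency on the `bytes` crate; names below are illustrative only.
use std::path::PathBuf;

use polars_plan::plans::ScanSources;

fn describe(sources: &ScanSources) {
    // `is_paths` (formerly `is_files`) only checks the variant: the sources are
    // path-backed, not open file handles as the old name suggested.
    println!(
        "path-backed: {}, cloud: {}, count: {}",
        sources.is_paths(),
        sources.is_cloud_url(),
        sources.len()
    );

    for source in sources.iter() {
        // `to_include_path_name` (formerly `to_file_path`) is the value the scan
        // executors write into the `include_file_paths` column: the path for
        // `Paths`, and the "open-file" / "in-mem" sentinels for the other variants.
        println!("include_file_paths -> {}", source.to_include_path_name());
    }
}

fn main() {
    // Construction mirrors the `[x].into()` pattern used in the Python
    // conversion code in this diff.
    describe(&ScanSources::Paths([PathBuf::from("data/a.csv")].into()));
    describe(&ScanSources::Buffers(
        [bytes::Bytes::from_static(b"a,b\n1,2\n")].into(),
    ));
}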