diff --git a/Cargo.lock b/Cargo.lock
index 6231e1b8d839..3dd0cbb20640 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3109,6 +3109,7 @@ dependencies = [
  "flate2",
  "fs4",
  "futures",
+ "glob",
  "home",
  "itoa",
  "memchr",
@@ -3167,7 +3168,6 @@ dependencies = [
  "ahash",
  "bitflags",
  "futures",
- "glob",
  "memchr",
  "once_cell",
  "polars-arrow",
diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml
index 7c835a861fd9..99602fc968e8 100644
--- a/crates/polars-io/Cargo.toml
+++ b/crates/polars-io/Cargo.toml
@@ -27,6 +27,7 @@ chrono-tz = { workspace = true, optional = true }
 fast-float = { workspace = true, optional = true }
 flate2 = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
+glob = { version = "0.3" }
 itoa = { workspace = true, optional = true }
 memchr = { workspace = true }
 memmap = { package = "memmap2", version = "0.7" }
diff --git a/crates/polars-io/src/utils/mod.rs b/crates/polars-io/src/utils/mod.rs
new file mode 100644
index 000000000000..5bee84ea3938
--- /dev/null
+++ b/crates/polars-io/src/utils/mod.rs
@@ -0,0 +1,5 @@
+mod other;
+mod path;
+
+pub use other::*;
+pub use path::*;
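The new `utils/mod.rs` above flattens both submodules into the existing `polars_io::utils` namespace via glob re-exports, so call sites keep their old import paths. A minimal sketch of what that preserves (import lines only; the helpers are defined in the hunks below):

```rust
// Both resolve to `polars_io::utils::*` despite living in different files:
use polars_io::utils::get_reader_bytes; // from utils/other.rs
use polars_io::utils::resolve_homedir;  // from utils/path.rs
```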
diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils/other.rs
similarity index 77%
rename from crates/polars-io/src/utils.rs
rename to crates/polars-io/src/utils/other.rs
index d70cc5d2f02f..5d210b245df7 100644
--- a/crates/polars-io/src/utils.rs
+++ b/crates/polars-io/src/utils/other.rs
@@ -1,7 +1,6 @@
 #[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
 use std::borrow::Cow;
 use std::io::Read;
-use std::path::{Path, PathBuf};
 
 use once_cell::sync::Lazy;
 use polars_core::prelude::*;
@@ -11,39 +10,6 @@ use regex::{Regex, RegexBuilder};
 
 use crate::mmap::{MmapBytesReader, ReaderBytes};
 
-pub static POLARS_TEMP_DIR_BASE_PATH: Lazy<Box<Path>> = Lazy::new(|| {
-    let path = std::env::var("POLARS_TEMP_DIR")
-        .map(PathBuf::from)
-        .unwrap_or_else(|_| {
-            PathBuf::from(std::env::temp_dir().to_string_lossy().as_ref()).join("polars/")
-        })
-        .into_boxed_path();
-
-    if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
-        if !path.is_dir() {
-            panic!(
-                "failed to create temporary directory: path = {}, err = {}",
-                path.to_str().unwrap(),
-                err
-            );
-        }
-    }
-
-    path
-});
-
-/// Ignores errors from `std::fs::create_dir_all` if the directory exists.
-#[cfg(feature = "file_cache")]
-pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
-    let result = std::fs::create_dir_all(path);
-
-    if path.is_dir() {
-        Ok(())
-    } else {
-        result
-    }
-}
-
 pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
     reader: &'a mut R,
 ) -> PolarsResult<ReaderBytes<'a>> {
@@ -70,18 +36,28 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
     }
 }
 
-// used by python polars
-pub fn resolve_homedir(path: &Path) -> PathBuf {
-    // replace "~" with home directory
-    if path.starts_with("~") {
-        // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
-        #[cfg(not(target_family = "wasm"))]
-        if let Some(homedir) = home::home_dir() {
-            return homedir.join(path.strip_prefix("~").unwrap());
-        }
-    }
+/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read
+/// concurrently/parallel
+///
+/// This takes an iterator over the number of rows per file.
+pub fn get_sequential_row_statistics<I>(
+    iter: I,
+    mut total_rows_to_read: usize,
+) -> Vec<(usize, usize)>
+where
+    I: Iterator<Item = usize>,
+{
+    let mut cumulative_read = 0;
+    iter.map(|rows_this_file| {
+        let remaining_rows_to_read = total_rows_to_read;
+        total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file);
+
+        let current_cumulative_read = cumulative_read;
+        cumulative_read += rows_this_file;
 
-    path.into()
+        (remaining_rows_to_read, current_cumulative_read)
+    })
+    .collect()
 }
 
 #[cfg(any(
@@ -186,30 +162,6 @@ pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], off
     }
 }
 
-/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read
-/// concurrently/parallel
-///
-/// This takes an iterator over the number of rows per file.
-pub fn get_sequential_row_statistics<I>(
-    iter: I,
-    mut total_rows_to_read: usize,
-) -> Vec<(usize, usize)>
-where
-    I: Iterator<Item = usize>,
-{
-    let mut cumulative_read = 0;
-    iter.map(|rows_this_file| {
-        let remaining_rows_to_read = total_rows_to_read;
-        total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file);
-
-        let current_cumulative_read = cumulative_read;
-        cumulative_read += rows_this_file;
-
-        (remaining_rows_to_read, current_cumulative_read)
-    })
-    .collect()
-}
-
 #[cfg(feature = "json")]
 pub(crate) fn overwrite_schema(
     schema: &mut Schema,
@@ -338,22 +290,9 @@ pub(crate) fn chunk_df_for_writing(
     Ok(result)
 }
 
-static CLOUD_URL: Lazy<Regex> =
-    Lazy::new(|| Regex::new(r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?)://").unwrap());
-
-/// Check if the path is a cloud url.
-pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
-    match p.as_ref().as_os_str().to_str() {
-        Some(s) => CLOUD_URL.is_match(s),
-        _ => false,
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use std::path::PathBuf;
-
-    use super::{resolve_homedir, FLOAT_RE};
+    use super::FLOAT_RE;
 
     #[test]
     fn test_float_parse() {
@@ -375,42 +314,4 @@ mod tests {
         assert!(FLOAT_RE.is_match("7e-05"));
         assert!(FLOAT_RE.is_match("+7e+05"));
     }
-
-    #[cfg(not(target_os = "windows"))]
-    #[test]
-    fn test_resolve_homedir() {
-        let paths: Vec<PathBuf> = vec![
-            "~/dir1/dir2/test.csv".into(),
-            "/abs/path/test.csv".into(),
-            "rel/path/test.csv".into(),
-            "/".into(),
-            "~".into(),
-        ];
-
-        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();
-
-        assert_eq!(resolved[0].file_name(), paths[0].file_name());
-        assert!(resolved[0].is_absolute());
-        assert_eq!(resolved[1], paths[1]);
-        assert_eq!(resolved[2], paths[2]);
-        assert_eq!(resolved[3], paths[3]);
-        assert!(resolved[4].is_absolute());
-    }
-
-    #[cfg(target_os = "windows")]
-    #[test]
-    fn test_resolve_homedir_windows() {
-        let paths: Vec<PathBuf> = vec![
-            r#"c:\Users\user1\test.csv"#.into(),
-            r#"~\user1\test.csv"#.into(),
-            "~".into(),
-        ];
-
-        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();
-
-        assert_eq!(resolved[0], paths[0]);
-        assert_eq!(resolved[1].file_name(), paths[1].file_name());
-        assert!(resolved[1].is_absolute());
-        assert!(resolved[2].is_absolute());
-    }
 }
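For orientation, here is a self-contained sketch of what the relocated `get_sequential_row_statistics` computes; the file row counts and the limit of 6 below are made-up values for illustration, not taken from this PR:

```rust
// Mirror of the function moved above: for each file it yields
// (rows this file may still read, rows already consumed by earlier files).
fn get_sequential_row_statistics<I>(
    iter: I,
    mut total_rows_to_read: usize,
) -> Vec<(usize, usize)>
where
    I: Iterator<Item = usize>,
{
    let mut cumulative_read = 0;
    iter.map(|rows_this_file| {
        let remaining_rows_to_read = total_rows_to_read;
        total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file);
        let current_cumulative_read = cumulative_read;
        cumulative_read += rows_this_file;
        (remaining_rows_to_read, current_cumulative_read)
    })
    .collect()
}

fn main() {
    // Three files holding 3, 5 and 2 rows, with a global limit of 6 rows:
    // file 0 may read all 6, file 1 the remaining 3, file 2 nothing.
    let stats = get_sequential_row_statistics([3usize, 5, 2].into_iter(), 6);
    assert_eq!(stats, vec![(6, 0), (3, 3), (0, 8)]);
}
```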
diff --git a/crates/polars-io/src/utils/path.rs b/crates/polars-io/src/utils/path.rs
new file mode 100644
index 000000000000..91bc74ed2700
--- /dev/null
+++ b/crates/polars-io/src/utils/path.rs
@@ -0,0 +1,376 @@
+use std::collections::VecDeque;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use once_cell::sync::Lazy;
+use polars_core::config;
+use polars_core::error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
+use regex::Regex;
+
+use crate::cloud::CloudOptions;
+
+pub static POLARS_TEMP_DIR_BASE_PATH: Lazy<Box<Path>> = Lazy::new(|| {
+    let path = std::env::var("POLARS_TEMP_DIR")
+        .map(PathBuf::from)
+        .unwrap_or_else(|_| {
+            PathBuf::from(std::env::temp_dir().to_string_lossy().as_ref()).join("polars/")
+        })
+        .into_boxed_path();
+
+    if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
+        if !path.is_dir() {
+            panic!(
+                "failed to create temporary directory: path = {}, err = {}",
+                path.to_str().unwrap(),
+                err
+            );
+        }
+    }
+
+    path
+});
+
+/// Replaces a "~" in the Path with the home directory.
+pub fn resolve_homedir(path: &Path) -> PathBuf {
+    if path.starts_with("~") {
+        // home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
+        #[cfg(not(target_family = "wasm"))]
+        if let Some(homedir) = home::home_dir() {
+            return homedir.join(path.strip_prefix("~").unwrap());
+        }
+    }
+
+    path.into()
+}
+
+static CLOUD_URL: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?)://").unwrap());
+
+/// Check if the path is a cloud url.
+pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
+    match p.as_ref().as_os_str().to_str() {
+        Some(s) => CLOUD_URL.is_match(s),
+        _ => false,
+    }
+}
+
+/// Get the index of the first occurrence of a glob symbol.
+pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
+    memchr::memchr3(b'*', b'?', b'[', path)
+}
+
+/// Returns `true` if `expanded_paths` were expanded from a single directory.
+pub fn expanded_from_single_directory<P: AsRef<Path>>(
+    paths: &[P],
+    expanded_paths: &[P],
+) -> bool {
+    // Single input that isn't a glob
+    paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none()
+    // And isn't a file
+    && {
+        (
+            // For local paths, we can just use `is_dir`
+            !is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir()
+        )
+        || (
+            // Otherwise we check the output path is different from the input path, so that we also
+            // handle the case of a directory containing a single file.
+            !expanded_paths.is_empty() && (paths[0].as_ref() != expanded_paths[0].as_ref())
+        )
+    }
+}
+
+/// Recursively traverses directories and expands globs if `glob` is `true`.
+/// Returns the expanded paths and the index at which to start parsing hive
+/// partitions from the path.
+pub fn expand_paths(
+    paths: &[PathBuf],
+    #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
+    glob: bool,
+    check_directory_level: bool,
+) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
+    let Some(first_path) = paths.first() else {
+        return Ok((vec![].into(), 0));
+    };
+
+    let is_cloud = is_cloud_url(first_path);
+    let mut out_paths = vec![];
+
+    let expand_start_idx = &mut usize::MAX.clone();
+    let mut update_expand_start_idx = |i, path_idx: usize| {
+        if check_directory_level
+            && ![usize::MAX, i].contains(expand_start_idx)
+            // They could still be the same directory level, just with different name length
+            && (paths[path_idx].parent() != paths[path_idx - 1].parent())
+        {
+            polars_bail!(
+                InvalidOperation:
+                "attempted to read from different directory levels with hive partitioning enabled: first path: {}, second path: {}",
+                paths[path_idx - 1].to_str().unwrap(),
+                paths[path_idx].to_str().unwrap(),
+            )
+        } else {
+            *expand_start_idx = std::cmp::min(*expand_start_idx, i);
+            Ok(())
+        }
+    };
+
+    if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
+        #[cfg(feature = "cloud")]
+        {
+            use crate::cloud::object_path_from_string;
+
+            let format_path = |scheme: &str, bucket: &str, location: &str| {
+                if is_cloud {
+                    format!("{}://{}/{}", scheme, bucket, location)
+                } else {
+                    format!("/{}", location)
+                }
+            };
+
+            let expand_path_cloud = |path: &str,
+                                     cloud_options: Option<&CloudOptions>|
+             -> PolarsResult<(usize, Vec<PathBuf>)> {
+                crate::pl_async::get_runtime().block_on_potential_spawn(async {
+                    let (cloud_location, store) =
+                        crate::cloud::build_object_store(path, cloud_options).await?;
+
+                    let prefix = object_path_from_string(cloud_location.prefix.clone())?;
+
+                    let out = if !path.ends_with("/")
+                        && cloud_location.expansion.is_none()
+                        && store.head(&prefix).await.is_ok()
+                    {
+                        (
+                            0,
+                            vec![PathBuf::from(format_path(
+                                &cloud_location.scheme,
+                                &cloud_location.bucket,
+                                &cloud_location.prefix,
+                            ))],
+                        )
+                    } else {
+                        use futures::TryStreamExt;
+
+                        if !is_cloud {
+                            // FORCE_ASYNC in the test suite wants us to raise a proper error message
+                            // for non-existent file paths. Note we can't do this for cloud paths as
+                            // there is no concept of a "directory" - a non-existent path is
+                            // indistinguishable from an empty directory.
+                            let path = PathBuf::from(path);
+                            if !path.is_dir() {
+                                path.metadata().map_err(|err| {
+                                    let msg =
+                                        Some(format!("{}: {}", err, path.to_str().unwrap()).into());
+                                    PolarsError::IO {
+                                        error: err.into(),
+                                        msg,
+                                    }
+                                })?;
+                            }
+                        }
+
+                        let cloud_location = &cloud_location;
+
+                        let mut paths = store
+                            .list(Some(&prefix))
+                            .try_filter_map(|x| async move {
+                                let out = (x.size > 0).then(|| {
+                                    PathBuf::from({
+                                        format_path(
+                                            &cloud_location.scheme,
+                                            &cloud_location.bucket,
+                                            x.location.as_ref(),
+                                        )
+                                    })
+                                });
+                                Ok(out)
+                            })
+                            .try_collect::<Vec<_>>()
+                            .await
+                            .map_err(to_compute_err)?;
+
+                        paths.sort_unstable();
+                        (
+                            format_path(
+                                &cloud_location.scheme,
+                                &cloud_location.bucket,
+                                &cloud_location.prefix,
+                            )
+                            .len(),
+                            paths,
+                        )
+                    };
+
+                    PolarsResult::Ok(out)
+                })
+            };
+
+            for (path_idx, path) in paths.iter().enumerate() {
+                let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());
+
+                let path = if glob_start_idx.is_some() {
+                    path.clone()
+                } else {
+                    let (expand_start_idx, paths) =
+                        expand_path_cloud(path.to_str().unwrap(), cloud_options)?;
+                    out_paths.extend_from_slice(&paths);
+                    update_expand_start_idx(expand_start_idx, path_idx)?;
+                    continue;
+                };
+
+                update_expand_start_idx(0, path_idx)?;
+
+                let iter = crate::pl_async::get_runtime().block_on_potential_spawn(
+                    crate::async_glob(path.to_str().unwrap(), cloud_options),
+                )?;
+
+                if is_cloud {
+                    out_paths.extend(iter.into_iter().map(PathBuf::from));
+                } else {
+                    // FORCE_ASYNC, remove leading file:// as not all readers support it.
+                    out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from))
+                }
+            }
+        }
+        #[cfg(not(feature = "cloud"))]
+        panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
+    } else {
+        let mut stack = VecDeque::new();
+
+        for path_idx in 0..paths.len() {
+            let path = &paths[path_idx];
+            stack.clear();
+
+            if path.is_dir() {
+                let i = path.to_str().unwrap().len();
+
+                update_expand_start_idx(i, path_idx)?;
+
+                stack.push_back(path.clone());
+
+                while let Some(dir) = stack.pop_front() {
+                    let mut paths = std::fs::read_dir(dir)
+                        .map_err(PolarsError::from)?
+                        .map(|x| x.map(|x| x.path()))
+                        .collect::<std::io::Result<Vec<_>>>()
+                        .map_err(PolarsError::from)?;
+                    paths.sort_unstable();
+
+                    for path in paths {
+                        if path.is_dir() {
+                            stack.push_back(path);
+                        } else if path.metadata()?.len() > 0 {
+                            out_paths.push(path);
+                        }
+                    }
+                }
+
+                continue;
+            }
+
+            let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
+
+            if glob && i.is_some() {
+                update_expand_start_idx(0, path_idx)?;
+
+                let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
+                    polars_bail!(ComputeError: "invalid glob pattern given")
+                };
+
+                for path in paths {
+                    let path = path.map_err(to_compute_err)?;
+                    if !path.is_dir() && path.metadata()?.len() > 0 {
+                        out_paths.push(path);
+                    }
+                }
+            } else {
+                update_expand_start_idx(0, path_idx)?;
+                out_paths.push(path.clone());
+            }
+        }
+    }
+
+    let out_paths = if expanded_from_single_directory(paths, out_paths.as_ref()) {
+        // Require all file extensions to be the same when expanding a single directory.
+        let ext = out_paths[0].extension();
+
+        (0..out_paths.len())
+            .map(|i| {
+                let path = out_paths[i].clone();
+
+                if path.extension() != ext {
+                    polars_bail!(
+                        InvalidOperation: r#"directory contained paths with different file extensions: \
+                        first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
"dir/**/*", "dir/**/*.parquet")"#, + out_paths[i - 1].to_str().unwrap(), path.to_str().unwrap() + ); + }; + + Ok(path) + }) + .collect::>>()? + } else { + Arc::<[_]>::from(out_paths) + }; + + Ok((out_paths, *expand_start_idx)) +} + +/// Ignores errors from `std::fs::create_dir_all` if the directory exists. +#[cfg(feature = "file_cache")] +pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> { + let result = std::fs::create_dir_all(path); + + if path.is_dir() { + Ok(()) + } else { + result + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::resolve_homedir; + + #[cfg(not(target_os = "windows"))] + #[test] + fn test_resolve_homedir() { + let paths: Vec = vec![ + "~/dir1/dir2/test.csv".into(), + "/abs/path/test.csv".into(), + "rel/path/test.csv".into(), + "/".into(), + "~".into(), + ]; + + let resolved: Vec = paths.iter().map(|x| resolve_homedir(x)).collect(); + + assert_eq!(resolved[0].file_name(), paths[0].file_name()); + assert!(resolved[0].is_absolute()); + assert_eq!(resolved[1], paths[1]); + assert_eq!(resolved[2], paths[2]); + assert_eq!(resolved[3], paths[3]); + assert!(resolved[4].is_absolute()); + } + + #[cfg(target_os = "windows")] + #[test] + fn test_resolve_homedir_windows() { + let paths: Vec = vec![ + r#"c:\Users\user1\test.csv"#.into(), + r#"~\user1\test.csv"#.into(), + "~".into(), + ]; + + let resolved: Vec = paths.iter().map(|x| resolve_homedir(x)).collect(); + + assert_eq!(resolved[0], paths[0]); + assert_eq!(resolved[1].file_name(), paths[1].file_name()); + assert!(resolved[1].is_absolute()); + assert!(resolved[2].is_absolute()); + } +} diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index 34767752c1bb..a93d4743f17c 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -25,7 +25,6 @@ polars-utils = { workspace = true } ahash = { workspace = true } bitflags = { workspace = true } -glob = { version = "0.3" } memchr = { workspace = true } once_cell = { workspace = true } pyo3 = { workspace = true, optional = true } diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs index 86a70de80b81..3ff9b17b1c13 100644 --- a/crates/polars-lazy/src/scan/file_list_reader.rs +++ b/crates/polars-lazy/src/scan/file_list_reader.rs @@ -1,281 +1,13 @@ -use std::collections::VecDeque; use std::path::PathBuf; -use polars_core::config; -use polars_core::error::to_compute_err; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; -use polars_io::utils::is_cloud_url; +use polars_io::utils::expand_paths; use polars_io::RowIndex; use polars_plan::prelude::UnionArgs; use crate::prelude::*; -pub type PathIterator = Box>>; - -pub(super) fn get_glob_start_idx(path: &[u8]) -> Option { - memchr::memchr3(b'*', b'?', b'[', path) -} - -/// Checks if `expanded_paths` were expanded from a single directory -pub(super) fn expanded_from_single_directory>( - paths: &[P], - expanded_paths: &[P], -) -> bool { - // Single input that isn't a glob - paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none() - // And isn't a file - && { - ( - // For local paths, we can just use `is_dir` - !is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir() - ) - || ( - // Otherwise we check the output path is different from the input path, so that we also - // handle the case of a directory containing a single file. 
diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml
index 34767752c1bb..a93d4743f17c 100644
--- a/crates/polars-lazy/Cargo.toml
+++ b/crates/polars-lazy/Cargo.toml
@@ -25,7 +25,6 @@ polars-utils = { workspace = true }
 
 ahash = { workspace = true }
 bitflags = { workspace = true }
-glob = { version = "0.3" }
 memchr = { workspace = true }
 once_cell = { workspace = true }
 pyo3 = { workspace = true, optional = true }
diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs
index 86a70de80b81..3ff9b17b1c13 100644
--- a/crates/polars-lazy/src/scan/file_list_reader.rs
+++ b/crates/polars-lazy/src/scan/file_list_reader.rs
@@ -1,281 +1,13 @@
-use std::collections::VecDeque;
 use std::path::PathBuf;
 
-use polars_core::config;
-use polars_core::error::to_compute_err;
 use polars_core::prelude::*;
 use polars_io::cloud::CloudOptions;
-use polars_io::utils::is_cloud_url;
+use polars_io::utils::expand_paths;
 use polars_io::RowIndex;
 use polars_plan::prelude::UnionArgs;
 
 use crate::prelude::*;
 
-pub type PathIterator = Box<dyn Iterator<Item = PolarsResult<PathBuf>>>;
-
-pub(super) fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
-    memchr::memchr3(b'*', b'?', b'[', path)
-}
-
-/// Checks if `expanded_paths` were expanded from a single directory
-pub(super) fn expanded_from_single_directory<P: AsRef<Path>>(
-    paths: &[P],
-    expanded_paths: &[P],
-) -> bool {
-    // Single input that isn't a glob
-    paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none()
-    // And isn't a file
-    && {
-        (
-            // For local paths, we can just use `is_dir`
-            !is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir()
-        )
-        || (
-            // Otherwise we check the output path is different from the input path, so that we also
-            // handle the case of a directory containing a single file.
-            !expanded_paths.is_empty() && (paths[0].as_ref() != expanded_paths[0].as_ref())
-        )
-    }
-}
-
-/// Recursively traverses directories and expands globs if `glob` is `true`.
-/// Returns the expanded paths and the index at which to start parsing hive
-/// partitions from the path.
-fn expand_paths(
-    paths: &[PathBuf],
-    #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
-    glob: bool,
-    check_directory_level: bool,
-) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
-    let Some(first_path) = paths.first() else {
-        return Ok((vec![].into(), 0));
-    };
-
-    let is_cloud = is_cloud_url(first_path);
-    let mut out_paths = vec![];
-
-    let expand_start_idx = &mut usize::MAX.clone();
-    let mut update_expand_start_idx = |i, path_idx: usize| {
-        if check_directory_level
-            && ![usize::MAX, i].contains(expand_start_idx)
-            // They could still be the same directory level, just with different name length
-            && (paths[path_idx].parent() != paths[path_idx - 1].parent())
-        {
-            polars_bail!(
-                InvalidOperation:
-                "attempted to read from different directory levels with hive partitioning enabled: first path: {}, second path: {}",
-                paths[path_idx - 1].to_str().unwrap(),
-                paths[path_idx].to_str().unwrap(),
-            )
-        } else {
-            *expand_start_idx = std::cmp::min(*expand_start_idx, i);
-            Ok(())
-        }
-    };
-
-    if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
-        #[cfg(feature = "async")]
-        {
-            use polars_io::cloud::object_path_from_string;
-
-            let format_path = |scheme: &str, bucket: &str, location: &str| {
-                if is_cloud {
-                    format!("{}://{}/{}", scheme, bucket, location)
-                } else {
-                    format!("/{}", location)
-                }
-            };
-
-            let expand_path_cloud = |path: &str,
-                                     cloud_options: Option<&CloudOptions>|
-             -> PolarsResult<(usize, Vec<PathBuf>)> {
-                polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
-                    let (cloud_location, store) =
-                        polars_io::cloud::build_object_store(path, cloud_options).await?;
-
-                    let prefix = object_path_from_string(cloud_location.prefix.clone())?;
-
-                    let out = if !path.ends_with("/")
-                        && cloud_location.expansion.is_none()
-                        && store.head(&prefix).await.is_ok()
-                    {
-                        (
-                            0,
-                            vec![PathBuf::from(format_path(
-                                &cloud_location.scheme,
-                                &cloud_location.bucket,
-                                &cloud_location.prefix,
-                            ))],
-                        )
-                    } else {
-                        use futures::TryStreamExt;
-
-                        if !is_cloud {
-                            // FORCE_ASYNC in the test suite wants us to raise a proper error message
-                            // for non-existent file paths. Note we can't do this for cloud paths as
-                            // there is no concept of a "directory" - a non-existent path is
-                            // indistinguishable from an empty directory.
-                            let path = PathBuf::from(path);
-                            if !path.is_dir() {
-                                path.metadata().map_err(|err| {
-                                    let msg =
-                                        Some(format!("{}: {}", err, path.to_str().unwrap()).into());
-                                    PolarsError::IO {
-                                        error: err.into(),
-                                        msg,
-                                    }
-                                })?;
-                            }
-                        }
-
-                        let cloud_location = &cloud_location;
-
-                        let mut paths = store
-                            .list(Some(&prefix))
-                            .try_filter_map(|x| async move {
-                                let out = (x.size > 0).then(|| {
-                                    PathBuf::from({
-                                        format_path(
-                                            &cloud_location.scheme,
-                                            &cloud_location.bucket,
-                                            x.location.as_ref(),
-                                        )
-                                    })
-                                });
-                                Ok(out)
-                            })
-                            .try_collect::<Vec<_>>()
-                            .await
-                            .map_err(to_compute_err)?;
-
-                        paths.sort_unstable();
-                        (
-                            format_path(
-                                &cloud_location.scheme,
-                                &cloud_location.bucket,
-                                &cloud_location.prefix,
-                            )
-                            .len(),
-                            paths,
-                        )
-                    };
-
-                    PolarsResult::Ok(out)
-                })
-            };
-
-            for (path_idx, path) in paths.iter().enumerate() {
-                let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());
-
-                let path = if glob_start_idx.is_some() {
-                    path.clone()
-                } else {
-                    let (expand_start_idx, paths) =
-                        expand_path_cloud(path.to_str().unwrap(), cloud_options)?;
-                    out_paths.extend_from_slice(&paths);
-                    update_expand_start_idx(expand_start_idx, path_idx)?;
-                    continue;
-                };
-
-                update_expand_start_idx(0, path_idx)?;
-
-                let iter = polars_io::pl_async::get_runtime().block_on_potential_spawn(
-                    polars_io::async_glob(path.to_str().unwrap(), cloud_options),
-                )?;
-
-                if is_cloud {
-                    out_paths.extend(iter.into_iter().map(PathBuf::from));
-                } else {
-                    // FORCE_ASYNC, remove leading file:// as not all readers support it.
-                    out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from))
-                }
-            }
-        }
-        #[cfg(not(feature = "async"))]
-        panic!("Feature `async` must be enabled to use globbing patterns with cloud urls.")
-    } else {
-        let mut stack = VecDeque::new();
-
-        for path_idx in 0..paths.len() {
-            let path = &paths[path_idx];
-            stack.clear();
-
-            if path.is_dir() {
-                let i = path.to_str().unwrap().len();
-
-                update_expand_start_idx(i, path_idx)?;
-
-                stack.push_back(path.clone());
-
-                while let Some(dir) = stack.pop_front() {
-                    let mut paths = std::fs::read_dir(dir)
-                        .map_err(PolarsError::from)?
-                        .map(|x| x.map(|x| x.path()))
-                        .collect::<std::io::Result<Vec<_>>>()
-                        .map_err(PolarsError::from)?;
-                    paths.sort_unstable();
-
-                    for path in paths {
-                        if path.is_dir() {
-                            stack.push_back(path);
-                        } else if path.metadata()?.len() > 0 {
-                            out_paths.push(path);
-                        }
-                    }
-                }
-
-                continue;
-            }
-
-            let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
-
-            if glob && i.is_some() {
-                update_expand_start_idx(0, path_idx)?;
-
-                let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
-                    polars_bail!(ComputeError: "invalid glob pattern given")
-                };
-
-                for path in paths {
-                    let path = path.map_err(to_compute_err)?;
-                    if !path.is_dir() && path.metadata()?.len() > 0 {
-                        out_paths.push(path);
-                    }
-                }
-            } else {
-                update_expand_start_idx(0, path_idx)?;
-                out_paths.push(path.clone());
-            }
-        }
-    }
-
-    let out_paths = if expanded_from_single_directory(paths, out_paths.as_ref()) {
-        // Require all file extensions to be the same when expanding a single directory.
-        let ext = out_paths[0].extension();
-
-        (0..out_paths.len())
-            .map(|i| {
-                let path = out_paths[i].clone();
-
-                if path.extension() != ext {
-                    polars_bail!(
-                        InvalidOperation: r#"directory contained paths with different file extensions: \
-                        first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
"dir/**/*", "dir/**/*.parquet")"#, - out_paths[i - 1].to_str().unwrap(), path.to_str().unwrap() - ); - }; - - Ok(path) - }) - .collect::>>()? - } else { - Arc::<[_]>::from(out_paths) - }; - - Ok((out_paths, *expand_start_idx)) -} - /// Reads [LazyFrame] from a filesystem or a cloud storage. /// Supports glob patterns. /// diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index a83a3c31b386..b079b3462e95 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -1,9 +1,9 @@ use std::path::{Path, PathBuf}; -use file_list_reader::expanded_from_single_directory; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; use polars_io::ipc::IpcScanOptions; +use polars_io::utils::expanded_from_single_directory; use polars_io::{HiveOptions, RowIndex}; use crate::prelude::*; diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index fe4e1da1f43a..c5fce2ce85e8 100644 --- a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -1,9 +1,9 @@ use std::path::{Path, PathBuf}; -use file_list_reader::expanded_from_single_directory; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; use polars_io::parquet::read::ParallelStrategy; +use polars_io::utils::expanded_from_single_directory; use polars_io::{HiveOptions, RowIndex}; use crate::prelude::*; diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index fd8a58d79e53..a4baf8a8c002 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -99,8 +99,8 @@ impl DslBuilder { paths, file_info: None, hive_parts: None, - file_options: options, predicate: None, + file_options: options, scan_type: FileScan::Parquet { options: ParquetOptions { parallel, diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 30b3f9feb4ac..0b1ad0d7cd8f 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -83,12 +83,12 @@ pub fn to_alp_impl( let v = match lp { DslPlan::Scan { + paths, file_info, hive_parts, - paths, predicate, - mut scan_type, mut file_options, + mut scan_type, } => { let mut file_info = if let Some(file_info) = file_info { file_info