
Commit 5daebe4

clean up the code a bit
coastalwhite committed Sep 8, 2024
1 parent 29fe063 commit 5daebe4
Showing 11 changed files with 310 additions and 286 deletions.
6 changes: 3 additions & 3 deletions crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -55,9 +55,9 @@ impl CsvExec {

let verbose = config::verbose();
let force_async = config::force_async();
-let run_async = (self.sources.is_files() && force_async) || self.sources.is_cloud_url();
+let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url();

-if self.sources.is_files() && force_async && verbose {
+if self.sources.is_paths() && force_async && verbose {
eprintln!("ASYNC READING FORCED");
}

@@ -75,7 +75,7 @@ impl CsvExec {
.finish()?;

if let Some(col) = &self.file_options.include_file_paths {
-let name = source.to_file_path();
+let name = source.to_include_path_name();

unsafe {
df.with_column_unchecked(
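
The csv scan now gates async reading on `is_paths` rather than the misleadingly named `is_files`, which (as the removed code in `ir/mod.rs` further down shows) matched the `Paths` variant all along. Below is a minimal, self-contained sketch of the gating logic using mock types, not the real polars-plan API; the `POLARS_FORCE_ASYNC` variable and the cloud-URL check are simplified assumptions for illustration:

```rust
use std::path::PathBuf;

// Mock of the source container; the real ScanSources holds
// Arc<[PathBuf]>, Arc<[File]> and Arc<[bytes::Bytes]>.
enum ScanSources {
    Paths(Vec<PathBuf>),
    Buffers(Vec<Vec<u8>>),
}

impl ScanSources {
    // After the rename: true only for the Paths variant.
    fn is_paths(&self) -> bool {
        matches!(self, Self::Paths(_))
    }

    // Simplified stand-in for the polars_io::is_cloud_url check
    // applied to the first path.
    fn is_cloud_url(&self) -> bool {
        match self {
            Self::Paths(paths) => paths.first().is_some_and(|p| {
                p.to_str()
                    .is_some_and(|s| s.starts_with("s3://") || s.starts_with("gs://"))
            }),
            Self::Buffers(_) => false,
        }
    }
}

fn main() {
    // Assumption: config::force_async() is backed by POLARS_FORCE_ASYNC.
    let force_async = std::env::var("POLARS_FORCE_ASYNC").map_or(false, |v| v == "1");

    let sources = ScanSources::Paths(vec![PathBuf::from("data.csv")]);
    // The gating logic from the diff: async reading runs for forced local
    // paths or for cloud URLs; open files and buffers are read directly.
    let run_async = (sources.is_paths() && force_async) || sources.is_cloud_url();
    println!("run_async = {run_async}");

    let in_mem = ScanSources::Buffers(vec![b"a,b\n1,2".to_vec()]);
    assert!(!in_mem.is_paths() && !in_mem.is_cloud_url());
}
```
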
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/ipc.rs
@@ -29,7 +29,7 @@ impl IpcExec {
};
let force_async = config::force_async();

-let mut out = if is_cloud || (self.sources.is_files() && force_async) {
+let mut out = if is_cloud || (self.sources.is_paths() && force_async) {
feature_gated!("cloud", {
if force_async && config::verbose() {
eprintln!("ASYNC READING FORCED");
@@ -102,7 +102,7 @@ impl IpcExec {
self.file_options
.include_file_paths
.as_ref()
-.map(|x| (x.clone(), Arc::from(source.to_file_path()))),
+.map(|x| (x.clone(), Arc::from(source.to_include_path_name()))),
)
.finish()
};
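
`to_file_path` is likewise renamed to `to_include_path_name`: the method produces the value written into the `include_file_paths` column, which is a real path only for the `Path` variant. A sketch with mock types, mirroring the per-variant behavior of the removed `to_file_path` body (assuming the renamed method keeps that behavior):

```rust
use std::fs::File;
use std::path::Path;

// Mock of ScanSourceRef; the real type now lives in scan_sources.rs.
#[allow(dead_code)]
enum ScanSourceRef<'a> {
    Path(&'a Path),
    File(&'a File),
    Buffer(&'a [u8]), // stand-in for &bytes::Bytes
}

impl<'a> ScanSourceRef<'a> {
    // Mirrors the removed to_file_path body: only Path yields a real path;
    // open files and buffers get fixed placeholder names.
    fn to_include_path_name(&self) -> &str {
        match self {
            Self::Path(path) => path.to_str().unwrap(),
            Self::File(_) => "open-file",
            Self::Buffer(_) => "in-mem",
        }
    }
}

fn main() {
    let path = Path::new("data/part-0.ipc");
    assert_eq!(
        ScanSourceRef::Path(path).to_include_path_name(),
        "data/part-0.ipc"
    );
    assert_eq!(
        ScanSourceRef::Buffer(b"raw ipc bytes").to_include_path_name(),
        "in-mem"
    );
}
```
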
6 changes: 3 additions & 3 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
@@ -39,9 +39,9 @@ impl JsonExec {

let verbose = config::verbose();
let force_async = config::force_async();
-let run_async = (self.sources.is_files() && force_async) || self.sources.is_cloud_url();
+let run_async = (self.sources.is_paths() && force_async) || self.sources.is_cloud_url();

-if self.sources.is_files() && force_async && verbose {
+if self.sources.is_paths() && force_async && verbose {
eprintln!("ASYNC READING FORCED");
}

@@ -108,7 +108,7 @@ impl JsonExec {
}

if let Some(col) = &self.file_scan_options.include_file_paths {
-let name = source.to_file_path();
+let name = source.to_include_path_name();
unsafe {
df.with_column_unchecked(
StringChunked::full(col.clone(), name, df.height()).into_series(),
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -161,7 +161,7 @@ impl ParquetExec {
self.file_options
.include_file_paths
.as_ref()
-.map(|x| (x.clone(), Arc::from(source.to_file_path()))),
+.map(|x| (x.clone(), Arc::from(source.to_include_path_name()))),
);

reader
@@ -453,7 +453,7 @@ impl ParquetExec {
let is_cloud = self.sources.is_cloud_url();
let force_async = config::force_async();

-let out = if is_cloud || (self.sources.is_files() && force_async) {
+let out = if is_cloud || (self.sources.is_paths() && force_async) {
feature_gated!("cloud", {
if force_async && config::verbose() {
eprintln!("ASYNC READING FORCED");
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -152,7 +152,7 @@ pub(super) fn csv_file_info(
// * See if we can do this without downloading the entire file

// prints the error message if paths is empty.
-let run_async = sources.is_cloud_url() || (sources.is_files() && config::force_async());
+let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async());

let cache_entries = {
if run_async {
@@ -268,7 +268,7 @@ pub(super) fn ndjson_file_info(
polars_bail!(ComputeError: "expected at least 1 source");
};

-let run_async = sources.is_cloud_url() || (sources.is_files() && config::force_async());
+let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async());

let cache_entries = {
if run_async {
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/functions/count.rs
@@ -226,7 +226,7 @@ pub(super) fn count_rows_ndjson(
}

let is_cloud_url = sources.is_cloud_url();
-let run_async = is_cloud_url || (sources.is_files() && config::force_async());
+let run_async = is_cloud_url || (sources.is_paths() && config::force_async());

let cache_entries = {
if run_async {
246 changes: 2 additions & 244 deletions crates/polars-plan/src/plans/ir/mod.rs
@@ -1,22 +1,20 @@
mod dot;
mod format;
mod inputs;
+mod scan_sources;
mod schema;
pub(crate) mod tree_format;

use std::borrow::Cow;
use std::fmt;
-use std::fs::File;
-use std::path::{Path, PathBuf};

pub use dot::{EscapeLabel, IRDotDisplay, PathsDisplay, ScanSourcesDisplay};
pub use format::{ExprIRDisplay, IRDisplay};
use hive::HivePartitions;
-use polars_core::error::feature_gated;
use polars_core::prelude::*;
use polars_utils::idx_vec::UnitVec;
-use polars_utils::mmap::MemSlice;
use polars_utils::unitvec;
+pub use scan_sources::{ScanSourceIter, ScanSourceRef, ScanSources};
#[cfg(feature = "ir_serde")]
use serde::{Deserialize, Serialize};

@@ -36,246 +34,6 @@ pub struct IRPlanRef<'a> {
pub expr_arena: &'a Arena<AExpr>,
}

-#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
-#[derive(Debug, Clone)]
-pub enum ScanSources {
-    Paths(Arc<[PathBuf]>),
-
-    #[cfg_attr(feature = "serde", serde(skip))]
-    Files(Arc<[File]>),
-    #[cfg_attr(feature = "serde", serde(skip))]
-    Buffers(Arc<[bytes::Bytes]>),
-}
-
-impl std::hash::Hash for ScanSources {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        std::mem::discriminant(self).hash(state);
-
-        // @NOTE: This is a bit crazy
-        match self {
-            Self::Paths(paths) => paths.hash(state),
-            Self::Files(files) => files.as_ptr().hash(state),
-            Self::Buffers(buffers) => buffers.as_ptr().hash(state),
-        }
-    }
-}
-
-impl PartialEq for ScanSources {
-    fn eq(&self, other: &Self) -> bool {
-        match (self, other) {
-            (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
-            _ => false,
-        }
-    }
-}
-
-impl Eq for ScanSources {}
-
-#[derive(Debug, Clone, Copy)]
-pub enum ScanSourceRef<'a> {
-    Path(&'a Path),
-    File(&'a File),
-    Buffer(&'a bytes::Bytes),
-}
-
-pub struct ScanSourceSliceInfo {
-    pub item_slice: std::ops::Range<usize>,
-    pub source_slice: std::ops::Range<usize>,
-}
-
-impl Default for ScanSources {
-    fn default() -> Self {
-        Self::Buffers(Arc::default())
-    }
-}
-
-impl<'a> ScanSourceRef<'a> {
-    pub fn to_file_path(&self) -> &str {
-        match self {
-            Self::Path(path) => path.to_str().unwrap(),
-            Self::File(_) => "open-file",
-            Self::Buffer(_) => "in-mem",
-        }
-    }
-
-    pub fn to_memslice(&self) -> PolarsResult<MemSlice> {
-        self.to_memslice_possibly_async(false, None, 0)
-    }
-
-    pub fn to_memslice_async_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
-        match self {
-            ScanSourceRef::Path(path) => {
-                let file = if run_async {
-                    feature_gated!("cloud", {
-                        polars_io::file_cache::FILE_CACHE
-                            .get_entry(path.to_str().unwrap())
-                            // Safety: This was initialized by schema inference.
-                            .unwrap()
-                            .try_open_assume_latest()?
-                    })
-                } else {
-                    polars_utils::open_file(path)?
-                };
-
-                Ok(MemSlice::from_mmap(Arc::new(unsafe {
-                    memmap::Mmap::map(&file)?
-                })))
-            },
-            ScanSourceRef::File(file) => Ok(MemSlice::from_mmap(Arc::new(unsafe {
-                memmap::Mmap::map(*file)?
-            }))),
-            ScanSourceRef::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
-        }
-    }
-
-    pub fn to_memslice_possibly_async(
-        &self,
-        run_async: bool,
-        #[cfg(feature = "cloud")] cache_entries: Option<
-            &Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
-        >,
-        #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
-        index: usize,
-    ) -> PolarsResult<MemSlice> {
-        match self {
-            Self::Path(path) => {
-                let f = if run_async {
-                    feature_gated!("cloud", {
-                        cache_entries.unwrap()[index].try_open_check_latest()?
-                    })
-                } else {
-                    polars_utils::open_file(path)?
-                };
-
-                let mmap = unsafe { memmap::Mmap::map(&f)? };
-                Ok(MemSlice::from_mmap(Arc::new(mmap)))
-            },
-            Self::File(file) => {
-                let mmap = unsafe { memmap::Mmap::map(*file)? };
-                Ok(MemSlice::from_mmap(Arc::new(mmap)))
-            },
-            Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
-        }
-    }
-}
-
-impl ScanSources {
-    pub fn iter(&self) -> ScanSourceIter {
-        ScanSourceIter {
-            sources: self,
-            offset: 0,
-        }
-    }
-
-    pub fn as_paths(&self) -> Option<&[PathBuf]> {
-        match self {
-            Self::Paths(paths) => Some(paths.as_ref()),
-            Self::Files(_) | Self::Buffers(_) => None,
-        }
-    }
-
-    pub fn into_paths(&self) -> Option<Arc<[PathBuf]>> {
-        match self {
-            Self::Paths(paths) => Some(paths.clone()),
-            Self::Files(_) | Self::Buffers(_) => None,
-        }
-    }
-
-    pub fn first_path(&self) -> Option<&Path> {
-        match self {
-            Self::Paths(paths) => paths.first().map(|p| p.as_path()),
-            Self::Files(_) | Self::Buffers(_) => None,
-        }
-    }
-
-    pub fn to_dsl(self, is_expanded: bool) -> DslScanSources {
-        DslScanSources {
-            sources: self,
-            is_expanded,
-        }
-    }
-
-    pub fn is_files(&self) -> bool {
-        matches!(self, Self::Paths(_))
-    }
-
-    pub fn is_cloud_url(&self) -> bool {
-        match self {
-            Self::Paths(paths) => paths.first().map_or(false, polars_io::is_cloud_url),
-            Self::Files(_) | Self::Buffers(_) => false,
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        match self {
-            Self::Paths(s) => s.len(),
-            Self::Files(s) => s.len(),
-            Self::Buffers(s) => s.len(),
-        }
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    pub fn first(&self) -> Option<ScanSourceRef> {
-        self.get(0)
-    }
-
-    pub fn id(&self) -> PlSmallStr {
-        if self.is_empty() {
-            return PlSmallStr::from_static("EMPTY");
-        }
-
-        match self {
-            Self::Paths(paths) => {
-                PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref())
-            },
-            Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
-            Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
-        }
-    }
-
-    pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
-        match self {
-            Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)),
-            Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
-            Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
-        }
-    }
-
-    pub fn at(&self, idx: usize) -> ScanSourceRef {
-        self.get(idx).unwrap()
-    }
-}
-
-pub struct ScanSourceIter<'a> {
-    sources: &'a ScanSources,
-    offset: usize,
-}
-
-impl<'a> Iterator for ScanSourceIter<'a> {
-    type Item = ScanSourceRef<'a>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let item = match self.sources {
-            ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),
-            ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
-            ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
-        };
-
-        self.offset += 1;
-        Some(item)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        let len = self.sources.len() - self.offset;
-        (len, Some(len))
-    }
-}
-
-impl<'a> ExactSizeIterator for ScanSourceIter<'a> {}

/// [`IR`] is a representation of [`DslPlan`] with [`Node`]s which are allocated in an [`Arena`]
/// In this IR the logical plan has access to the full dataset.
#[derive(Clone, Debug, Default)]
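
The bulk of the diff moves `ScanSources`, `ScanSourceRef`, and `ScanSourceIter` out of `ir/mod.rs` into the new `scan_sources` module. One design detail worth noting from the moved code: hashing (and equality) is structural only for `Paths`; `Files` and `Buffers` hash by allocation pointer, so two logically identical in-memory sources are treated as distinct. A self-contained sketch of that scheme with mock types (the real implementation now lives in `scan_sources.rs`):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::Arc;

enum ScanSources {
    Paths(Arc<[PathBuf]>),
    Buffers(Arc<[Vec<u8>]>), // stand-in for Arc<[bytes::Bytes]>
}

impl Hash for ScanSources {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Distinguish the variants first, as the moved code does.
        std::mem::discriminant(self).hash(state);
        match self {
            // Paths hash by value...
            Self::Paths(paths) => paths.hash(state),
            // ...while buffers hash by allocation pointer: cheap, and stable
            // for as long as the allocation lives.
            Self::Buffers(buffers) => buffers.as_ptr().hash(state),
        }
    }
}

fn fingerprint(s: &ScanSources) -> u64 {
    let mut h = DefaultHasher::new();
    s.hash(&mut h);
    h.finish()
}

fn main() {
    let a = ScanSources::Buffers(Arc::from(vec![b"x".to_vec()]));
    let b = ScanSources::Buffers(Arc::from(vec![b"x".to_vec()]));
    // Same bytes, different allocations -> almost certainly different hashes.
    assert_ne!(fingerprint(&a), fingerprint(&b));

    let p = ScanSources::Paths(Arc::from(vec![PathBuf::from("data.csv")]));
    let q = ScanSources::Paths(Arc::from(vec![PathBuf::from("data.csv")]));
    // Paths hash structurally, so equal path lists agree.
    assert_eq!(fingerprint(&p), fingerprint(&q));
}
```

The same asymmetry shows up in the moved `PartialEq` impl, which only ever returns true for two `Paths` values.
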
