Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): IO plugins #17939

Merged
merged 10 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion crates/polars-lazy/src/frame/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ impl LazyFrame {
pub fn scan_from_python_function(schema: Schema, scan_fn: PyObject, pyarrow: bool) -> Self {
DslPlan::PythonScan {
options: PythonOptions {
// Should be a python function that returns a generator
scan_fn: Some(scan_fn.into()),
schema: Arc::new(schema),
pyarrow,
python_source: if pyarrow {
PythonScanSource::Pyarrow
} else {
PythonScanSource::IOPlugin
},
..Default::default()
},
}
Expand Down
15 changes: 0 additions & 15 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,21 +649,6 @@ fn scan_predicate_on_set_null_values() -> PolarsResult<()> {
Ok(())
}

#[test]
fn scan_anonymous_fn() -> PolarsResult<()> {
    // The scan source is a plain closure that ignores its arguments and always
    // yields the `fruits_cars` fixture frame.
    let scan = Arc::new(|_scan_opts: AnonymousScanArgs| Ok(fruits_cars()));

    // The schema is supplied up front, taken from the same fixture.
    let schema = Arc::new(fruits_cars().schema());
    let scan_args = ScanArgsAnonymous {
        schema: Some(schema),
        ..ScanArgsAnonymous::default()
    };

    let lazy = LazyFrame::anonymous_scan(scan, scan_args)?;
    let collected = lazy.collect()?;

    assert_eq!(collected.shape(), (5, 4));
    Ok(())
}

#[test]
fn scan_anonymous_fn_with_options() -> PolarsResult<()> {
struct MyScan {}
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-mem-engine/src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ mod join;
mod projection;
mod projection_simple;
mod projection_utils;
#[cfg(feature = "python")]
mod python_scan;
mod scan;
mod slice;
mod sort;
Expand Down Expand Up @@ -43,8 +41,6 @@ pub(super) use self::hconcat::*;
pub(super) use self::join::*;
pub(super) use self::projection::*;
pub(super) use self::projection_simple::*;
#[cfg(feature = "python")]
pub(super) use self::python_scan::*;
pub(super) use self::scan::*;
pub(super) use self::slice::*;
pub(super) use self::sort::*;
Expand Down
53 changes: 0 additions & 53 deletions crates/polars-mem-engine/src/executors/python_scan.rs

This file was deleted.

4 changes: 4 additions & 0 deletions crates/polars-mem-engine/src/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mod ipc;
mod ndjson;
#[cfg(feature = "parquet")]
mod parquet;
#[cfg(feature = "python")]
mod python_scan;

use std::mem;

Expand All @@ -23,6 +25,8 @@ use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::*;
use polars_plan::global::_set_n_rows_for_scan;

#[cfg(feature = "python")]
pub(crate) use self::python_scan::*;
use super::*;
use crate::prelude::*;

Expand Down
134 changes: 134 additions & 0 deletions crates/polars-mem-engine/src/executors/scan/python_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use polars_core::error::to_compute_err;
use polars_core::utils::accumulate_dataframes_vertical;
use pyo3::exceptions::PyStopIteration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::{intern, PyTypeInfo};

use super::*;

/// Executor for a scan whose data is produced by a Python callable.
pub(crate) struct PythonScanExec {
/// Options of the Python scan: the scan function, the kind of Python source,
/// and the pushed-down columns, row limit and predicate.
pub(crate) options: PythonOptions,
/// Physical predicate that, when set, is evaluated on the Rust side against
/// the frames returned from Python.
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
/// Serialized form of the predicate; when set it is handed to the Python
/// side as a `bytes` object.
pub(crate) predicate_serialized: Option<Vec<u8>>,
}

/// Converts a Python-side `polars.DataFrame` object into a Rust [`DataFrame`].
///
/// The object's `_df` attribute is asked for its raw parts through
/// `into_raw_parts`; the returned `(pointer, length, capacity)` triple is then
/// reassembled into a `Vec<Series>`.
///
/// # Errors
///
/// Returns a `ComputeError` when `df` has no `_df` attribute, when
/// `into_raw_parts` cannot be called on it, or when the call does not return a
/// triple of unsigned integers.
fn python_df_to_rust(py: Python, df: Bound<PyAny>) -> PolarsResult<DataFrame> {
    let err = |_| polars_err!(ComputeError: "expected a polars.DataFrame; got {}", df);
    let pydf = df.getattr(intern!(py, "_df")).map_err(err)?;
    let raw_parts = pydf
        .call_method0(intern!(py, "into_raw_parts"))
        .map_err(err)?;
    // The object originates from user-provided Python code, so a malformed
    // return value is reported as an error instead of panicking.
    let (ptr, len, cap) = raw_parts
        .extract::<(usize, usize, usize)>()
        .map_err(err)?;

    // SAFETY: this relies on `into_raw_parts` handing over ownership of a
    // `Vec<Series>` allocation described by exactly these parts.
    // NOTE(review): that contract lives in the Python bindings and is not
    // visible here - confirm it before changing either side.
    unsafe {
        Ok(DataFrame::new_no_checks(Vec::from_raw_parts(
            ptr as *mut Series,
            len,
            cap,
        )))
    }
}

impl Executor for PythonScanExec {
    /// Runs the Python scan and returns the resulting [`DataFrame`].
    ///
    /// The Python helper `polars._utils._execute_from_rust` is called with the
    /// scan function, the projected columns, the predicate and the row limit.
    /// For sources other than pyarrow/CUDA a batch size is passed as well.
    ///
    /// * If the helper returns an object with a `_df` attribute, it is treated
    ///   as a single `DataFrame`, and the physical predicate (if any) is
    ///   applied on the Rust side.
    /// * Otherwise the result is treated as a `(generator, can_parse_predicate)`
    ///   pair. Every batch produced by the generator is converted, filtered on
    ///   the Rust side when a predicate exists that the Python side could not
    ///   parse, and finally all batches are concatenated vertically.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` when the Python helper cannot be located, when
    /// the call into Python raises, when the returned value has an unexpected
    /// shape, or when the generator raises anything but `StopIteration`.
    ///
    /// # Panics
    ///
    /// Panics if the scan function was already taken out of the options (for
    /// example by an earlier call), or if the options carry a Polars predicate
    /// while no physical predicate was set.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run PythonScanExec")
            }
        }
        let with_columns = self.options.with_columns.take();
        let n_rows = self.options.n_rows.take();
        Python::with_gil(|py| {
            // A broken or missing Python installation of polars is reported as
            // an error rather than a panic.
            let pl = PyModule::import_bound(py, intern!(py, "polars")).map_err(to_compute_err)?;
            let utils = pl
                .getattr(intern!(py, "_utils"))
                .map_err(to_compute_err)?;
            let callable = utils
                .getattr(intern!(py, "_execute_from_rust"))
                .map_err(to_compute_err)?;

            // The scan function is moved out of the options; it is not put back.
            let python_scan_function = self.options.scan_fn.take().unwrap().0;

            let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());

            let predicate = match &self.options.predicate {
                PythonPredicate::PyArrow(s) => s.into_py(py),
                PythonPredicate::None => None::<()>.into_py(py),
                PythonPredicate::Polars(_) => {
                    assert!(self.predicate.is_some(), "should be set");

                    match &self.predicate_serialized {
                        None => None::<()>.into_py(py),
                        Some(buf) => PyBytes::new_bound(py, buf).to_object(py),
                    }
                },
            };

            let generator_init = if matches!(
                self.options.python_source,
                PythonScanSource::Pyarrow | PythonScanSource::Cuda
            ) {
                let args = (python_scan_function, with_columns, predicate, n_rows);
                callable.call1(args).map_err(to_compute_err)
            } else {
                // If there are filters, take smaller chunks to ensure we can keep memory
                // pressure low.
                let batch_size = if self.predicate.is_some() {
                    Some(100_000usize)
                } else {
                    None
                };
                let args = (
                    python_scan_function,
                    with_columns,
                    predicate,
                    n_rows,
                    batch_size,
                );
                callable.call1(args).map_err(to_compute_err)
            }?;

            // This isn't a generator, but a `DataFrame`.
            // This is the pyarrow and the CuDF path.
            if generator_init.getattr(intern!(py, "_df")).is_ok() {
                let df = python_df_to_rust(py, generator_init)?;
                return if let Some(pred) = &self.predicate {
                    let mask = pred.evaluate(&df, state)?;
                    df.filter(mask.bool()?)
                } else {
                    Ok(df)
                };
            }

            // This is the IO plugin path.
            let generator = generator_init
                .get_item(0)
                .map_err(|_| polars_err!(ComputeError: "expected tuple got {}", generator_init))?;
            // Report the offending value itself (`generator_init`), not its
            // first element, when the second item is missing.
            let can_parse_predicate = generator_init
                .get_item(1)
                .map_err(|_| polars_err!(ComputeError: "expected tuple got {}", generator_init))?;
            let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(
                |_| polars_err!(ComputeError: "expected bool got {}", can_parse_predicate),
            )?;

            let mut chunks = vec![];
            loop {
                match generator.call_method0(intern!(py, "__next__")) {
                    Ok(out) => {
                        let mut df = python_df_to_rust(py, out)?;
                        // Only filter here when the Python side could not
                        // handle the predicate.
                        if let (Some(pred), false) = (&self.predicate, can_parse_predicate) {
                            let mask = pred.evaluate(&df, state)?;
                            df = df.filter(mask.bool()?)?;
                        }
                        chunks.push(df)
                    },
                    // `StopIteration` marks the regular end of the generator.
                    Err(err) if err.matches(py, PyStopIteration::type_object_bound(py)) => break,
                    Err(err) => {
                        polars_bail!(ComputeError: "caught exception during execution of a Python source, exception: {}", err)
                    },
                }
            }
            accumulate_dataframes_vertical(chunks)
        })
    }
}
47 changes: 46 additions & 1 deletion crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,52 @@ fn create_physical_plan_impl(
let logical_plan = lp_arena.take(root);
match logical_plan {
#[cfg(feature = "python")]
PythonScan { options, .. } => Ok(Box::new(executors::PythonScanExec { options })),
PythonScan { mut options } => {
let mut predicate_serialized = None;

let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
let phys_expr = || {
let mut state = ExpressionConversionState::new(true, state.expr_depth);
create_physical_expr(
e,
Context::Default,
expr_arena,
Some(&options.schema),
&mut state,
)
};

// Convert to a pyarrow eval string.
if matches!(options.python_source, PythonScanSource::Pyarrow) {
if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
e.node(),
expr_arena,
Default::default(),
) {
options.predicate = PythonPredicate::PyArrow(eval_str);
// We don't have to use a physical expression as pyarrow deals with the filter.
None
} else {
Some(phys_expr()?)
}
}
// Convert to physical expression for the case the reader cannot consume the predicate.
else {
let dsl_expr = e.to_expr(expr_arena);
predicate_serialized =
polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

Some(phys_expr()?)
}
} else {
None
};
Ok(Box::new(executors::PythonScanExec {
options,
predicate,
predicate_serialized,
}))
},
Sink { payload, .. } => match payload {
SinkType::Memory => {
polars_bail!(InvalidOperation: "memory sink not supported in the standard engine")
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rayon = { workspace = true }
recursive = { workspace = true }
regex = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"], optional = true }
serde_json = { workspace = true, optional = true }
smartstring = { workspace = true }
strum_macros = { workspace = true }

Expand Down
25 changes: 9 additions & 16 deletions crates/polars-plan/src/plans/anonymous_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,37 @@ pub trait AnonymousScan: Send + Sync {
/// Creates a DataFrame from the supplied function & scan options.
fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame>;

/// Yields the next batch for Polars to consume. Override this method to get
/// proper streaming support.
///
/// The default implementation delegates to [`Self::scan`] and wraps the
/// resulting frame in `Some`.
fn next_batch(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<Option<DataFrame>> {
    let frame = self.scan(scan_opts)?;
    Ok(Some(frame))
}

/// Function to supply the schema.
///
/// Allows for an optional infer-schema-length argument for data sources with
/// dynamic schemas.
///
/// # Errors
///
/// The default implementation always returns a `ComputeError`; implementors
/// that do not pass a schema explicitly are expected to override it.
fn schema(&self, _infer_schema_length: Option<usize>) -> PolarsResult<SchemaRef> {
polars_bail!(ComputeError: "must supply either a schema or a schema function");
}
/// Specify if the scan provider should allow predicate pushdowns.
///
/// Defaults to `false`
fn allows_predicate_pushdown(&self) -> bool {
false
}
/// Specify if the scan provider should allow projection pushdowns.
///
/// Defaults to `false`
fn allows_projection_pushdown(&self) -> bool {
false
}
/// Specify if the scan provider should allow slice pushdowns.
///
/// Defaults to `false`
fn allows_slice_pushdown(&self) -> bool {
false
}
}

/// Blanket implementation that lets any `Send + Sync` function or closure of
/// type `Fn(AnonymousScanArgs) -> PolarsResult<DataFrame>` act as an anonymous scan.
impl<F> AnonymousScan for F
where
F: Fn(AnonymousScanArgs) -> PolarsResult<DataFrame> + Send + Sync,
{
/// Not supported for function-backed scans.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
fn as_any(&self) -> &dyn Any {
unimplemented!()
}

/// Forwards the scan arguments to the wrapped function and returns its result.
fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {
self(scan_opts)
}
}

impl Debug for dyn AnonymousScan {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "anonymous_scan")
Expand Down
5 changes: 1 addition & 4 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,7 @@ pub fn to_alp_impl(
}
},
#[cfg(feature = "python")]
DslPlan::PythonScan { options } => IR::PythonScan {
options,
predicate: None,
},
DslPlan::PythonScan { options } => IR::PythonScan { options },
DslPlan::Union { inputs, args } => {
let mut inputs = inputs
.into_iter()
Expand Down
Loading
Loading