Skip to content

Commit

Permalink
feat(python): IO plugins (#17939)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jul 30, 2024
1 parent fae85ff commit dea0679
Show file tree
Hide file tree
Showing 31 changed files with 541 additions and 242 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion crates/polars-lazy/src/frame/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ impl LazyFrame {
pub fn scan_from_python_function(schema: Schema, scan_fn: PyObject, pyarrow: bool) -> Self {
DslPlan::PythonScan {
options: PythonOptions {
// Should be a python function that returns a generator
scan_fn: Some(scan_fn.into()),
schema: Arc::new(schema),
pyarrow,
python_source: if pyarrow {
PythonScanSource::Pyarrow
} else {
PythonScanSource::IOPlugin
},
..Default::default()
},
}
Expand Down
15 changes: 0 additions & 15 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,21 +649,6 @@ fn scan_predicate_on_set_null_values() -> PolarsResult<()> {
Ok(())
}

#[test]
fn scan_anonymous_fn() -> PolarsResult<()> {
let function = Arc::new(|_scan_opts: AnonymousScanArgs| Ok(fruits_cars()));

let args = ScanArgsAnonymous {
schema: Some(Arc::new(fruits_cars().schema())),
..ScanArgsAnonymous::default()
};

let df = LazyFrame::anonymous_scan(function, args)?.collect()?;

assert_eq!(df.shape(), (5, 4));
Ok(())
}

#[test]
fn scan_anonymous_fn_with_options() -> PolarsResult<()> {
struct MyScan {}
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-mem-engine/src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ mod join;
mod projection;
mod projection_simple;
mod projection_utils;
#[cfg(feature = "python")]
mod python_scan;
mod scan;
mod slice;
mod sort;
Expand Down Expand Up @@ -43,8 +41,6 @@ pub(super) use self::hconcat::*;
pub(super) use self::join::*;
pub(super) use self::projection::*;
pub(super) use self::projection_simple::*;
#[cfg(feature = "python")]
pub(super) use self::python_scan::*;
pub(super) use self::scan::*;
pub(super) use self::slice::*;
pub(super) use self::sort::*;
Expand Down
53 changes: 0 additions & 53 deletions crates/polars-mem-engine/src/executors/python_scan.rs

This file was deleted.

4 changes: 4 additions & 0 deletions crates/polars-mem-engine/src/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mod ipc;
mod ndjson;
#[cfg(feature = "parquet")]
mod parquet;
#[cfg(feature = "python")]
mod python_scan;

use std::mem;

Expand All @@ -23,6 +25,8 @@ use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::*;
use polars_plan::global::_set_n_rows_for_scan;

#[cfg(feature = "python")]
pub(crate) use self::python_scan::*;
use super::*;
use crate::prelude::*;

Expand Down
134 changes: 134 additions & 0 deletions crates/polars-mem-engine/src/executors/scan/python_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use polars_core::error::to_compute_err;
use polars_core::utils::accumulate_dataframes_vertical;
use pyo3::exceptions::PyStopIteration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::{intern, PyTypeInfo};

use super::*;

pub(crate) struct PythonScanExec {
pub(crate) options: PythonOptions,
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
pub(crate) predicate_serialized: Option<Vec<u8>>,
}

fn python_df_to_rust(py: Python, df: Bound<PyAny>) -> PolarsResult<DataFrame> {
let err = |_| polars_err!(ComputeError: "expected a polars.DataFrame; got {}", df);
let pydf = df.getattr(intern!(py, "_df")).map_err(err)?;
let raw_parts = pydf
.call_method0(intern!(py, "into_raw_parts"))
.map_err(err)?;
let raw_parts = raw_parts.extract::<(usize, usize, usize)>().unwrap();

let (ptr, len, cap) = raw_parts;
unsafe {
Ok(DataFrame::new_no_checks(Vec::from_raw_parts(
ptr as *mut Series,
len,
cap,
)))
}
}

impl Executor for PythonScanExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
state.should_stop()?;
#[cfg(debug_assertions)]
{
if state.verbose() {
eprintln!("run PythonScanExec")
}
}
let with_columns = self.options.with_columns.take();
let n_rows = self.options.n_rows.take();
Python::with_gil(|py| {
let pl = PyModule::import_bound(py, intern!(py, "polars")).unwrap();
let utils = pl.getattr(intern!(py, "_utils")).unwrap();
let callable = utils.getattr(intern!(py, "_execute_from_rust")).unwrap();

let python_scan_function = self.options.scan_fn.take().unwrap().0;

let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());

let predicate = match &self.options.predicate {
PythonPredicate::PyArrow(s) => s.into_py(py),
PythonPredicate::None => None::<()>.into_py(py),
PythonPredicate::Polars(_) => {
assert!(self.predicate.is_some(), "should be set");

match &self.predicate_serialized {
None => None::<()>.into_py(py),
Some(buf) => PyBytes::new_bound(py, buf).to_object(py),
}
},
};

let generator_init = if matches!(
self.options.python_source,
PythonScanSource::Pyarrow | PythonScanSource::Cuda
) {
let args = (python_scan_function, with_columns, predicate, n_rows);
callable.call1(args).map_err(to_compute_err)
} else {
// If there are filters, take smaller chunks to ensure we can keep memory
// pressure low.
let batch_size = if self.predicate.is_some() {
Some(100_000usize)
} else {
None
};
let args = (
python_scan_function,
with_columns,
predicate,
n_rows,
batch_size,
);
callable.call1(args).map_err(to_compute_err)
}?;

// This isn't a generator, but a `DataFrame`.
// This is the pyarrow and the CuDF path.
if generator_init.getattr(intern!(py, "_df")).is_ok() {
let df = python_df_to_rust(py, generator_init)?;
return if let Some(pred) = &self.predicate {
let mask = pred.evaluate(&df, state)?;
df.filter(mask.bool()?)
} else {
Ok(df)
};
}

// This is the IO plugin path.
let generator = generator_init
.get_item(0)
.map_err(|_| polars_err!(ComputeError: "expected tuple got {}", generator_init))?;
let can_parse_predicate = generator_init
.get_item(1)
.map_err(|_| polars_err!(ComputeError: "expected tuple got {}", generator))?;
let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(
|_| polars_err!(ComputeError: "expected bool got {}", can_parse_predicate),
)?;

let mut chunks = vec![];
loop {
match generator.call_method0(intern!(py, "__next__")) {
Ok(out) => {
let mut df = python_df_to_rust(py, out)?;
if let (Some(pred), false) = (&self.predicate, can_parse_predicate) {
let mask = pred.evaluate(&df, state)?;
df = df.filter(mask.bool()?)?;
}
chunks.push(df)
},
Err(err) if err.matches(py, PyStopIteration::type_object_bound(py)) => break,
Err(err) => {
polars_bail!(ComputeError: "caught exception during execution of a Python source, exception: {}", err)
},
}
}
accumulate_dataframes_vertical(chunks)
})
}
}
47 changes: 46 additions & 1 deletion crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,52 @@ fn create_physical_plan_impl(
let logical_plan = lp_arena.take(root);
match logical_plan {
#[cfg(feature = "python")]
PythonScan { options, .. } => Ok(Box::new(executors::PythonScanExec { options })),
PythonScan { mut options } => {
let mut predicate_serialized = None;

let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
let phys_expr = || {
let mut state = ExpressionConversionState::new(true, state.expr_depth);
create_physical_expr(
e,
Context::Default,
expr_arena,
Some(&options.schema),
&mut state,
)
};

// Convert to a pyarrow eval string.
if matches!(options.python_source, PythonScanSource::Pyarrow) {
if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
e.node(),
expr_arena,
Default::default(),
) {
options.predicate = PythonPredicate::PyArrow(eval_str);
// We don't have to use a physical expression as pyarrow deals with the filter.
None
} else {
Some(phys_expr()?)
}
}
// Convert to physical expression for the case the reader cannot consume the predicate.
else {
let dsl_expr = e.to_expr(expr_arena);
predicate_serialized =
polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

Some(phys_expr()?)
}
} else {
None
};
Ok(Box::new(executors::PythonScanExec {
options,
predicate,
predicate_serialized,
}))
},
Sink { payload, .. } => match payload {
SinkType::Memory => {
polars_bail!(InvalidOperation: "memory sink not supported in the standard engine")
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rayon = { workspace = true }
recursive = { workspace = true }
regex = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"], optional = true }
serde_json = { workspace = true, optional = true }
smartstring = { workspace = true }
strum_macros = { workspace = true }

Expand Down
25 changes: 9 additions & 16 deletions crates/polars-plan/src/plans/anonymous_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,37 @@ pub trait AnonymousScan: Send + Sync {
/// Creates a DataFrame from the supplied function & scan options.
fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame>;

/// Produce the next batch Polars can consume. Implement this method to get proper
/// streaming support.
fn next_batch(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<Option<DataFrame>> {
self.scan(scan_opts).map(Some)
}

/// function to supply the schema.
/// Allows for an optional infer schema argument for data sources with dynamic schemas
fn schema(&self, _infer_schema_length: Option<usize>) -> PolarsResult<SchemaRef> {
polars_bail!(ComputeError: "must supply either a schema or a schema function");
}
/// specify if the scan provider should allow predicate pushdowns
/// Specify if the scan provider should allow predicate pushdowns.
///
/// Defaults to `false`
fn allows_predicate_pushdown(&self) -> bool {
false
}
/// specify if the scan provider should allow projection pushdowns
/// Specify if the scan provider should allow projection pushdowns.
///
/// Defaults to `false`
fn allows_projection_pushdown(&self) -> bool {
false
}
/// specify if the scan provider should allow slice pushdowns
/// Specify if the scan provider should allow slice pushdowns.
///
/// Defaults to `false`
fn allows_slice_pushdown(&self) -> bool {
false
}
}

impl<F> AnonymousScan for F
where
F: Fn(AnonymousScanArgs) -> PolarsResult<DataFrame> + Send + Sync,
{
fn as_any(&self) -> &dyn Any {
unimplemented!()
}

fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {
self(scan_opts)
}
}

impl Debug for dyn AnonymousScan {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "anonymous_scan")
Expand Down
5 changes: 1 addition & 4 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,7 @@ pub fn to_alp_impl(
}
},
#[cfg(feature = "python")]
DslPlan::PythonScan { options } => IR::PythonScan {
options,
predicate: None,
},
DslPlan::PythonScan { options } => IR::PythonScan { options },
DslPlan::Union { inputs, args } => {
let mut inputs = inputs
.into_iter()
Expand Down
Loading

0 comments on commit dea0679

Please sign in to comment.