diff --git a/crates/polars-stream/src/nodes/in_memory_source.rs b/crates/polars-stream/src/nodes/in_memory_source.rs index 45630eb7aab0..5ab6b0f75d50 100644 --- a/crates/polars-stream/src/nodes/in_memory_source.rs +++ b/crates/polars-stream/src/nodes/in_memory_source.rs @@ -38,14 +38,16 @@ impl ComputeNode for InMemorySourceNode { assert!(recv.is_empty()); assert!(send.len() == 1); - let exhausted = self - .source - .as_ref() - .map(|s| { - self.seq.load(Ordering::Relaxed) * self.morsel_size as u64 >= s.height() as u64 - }) - .unwrap_or(true); - + // As a temporary hack for some nodes (like the FunctionIR::FastCount) + // node that rely on an empty input, always ensure we send at least one + // morsel. + // TODO: remove this hack. + let exhausted = if let Some(src) = &self.source { + let seq = self.seq.load(Ordering::Relaxed); + seq > 0 && seq * self.morsel_size as u64 >= src.height() as u64 + } else { + true + }; if send[0] == PortState::Done || exhausted { send[0] = PortState::Done; self.source = None; @@ -77,7 +79,10 @@ impl ComputeNode for InMemorySourceNode { let seq = slf.seq.fetch_add(1, Ordering::Relaxed); let offset = (seq as usize * slf.morsel_size) as i64; let df = source.slice(offset, slf.morsel_size); - if df.is_empty() { + + // TODO: remove this 'always sent at least one morsel' + // condition, see update_state. + if df.is_empty() && seq > 0 { break; } diff --git a/crates/polars-stream/src/nodes/input_independent_select.rs b/crates/polars-stream/src/nodes/input_independent_select.rs new file mode 100644 index 000000000000..f1a9113d05d4 --- /dev/null +++ b/crates/polars-stream/src/nodes/input_independent_select.rs @@ -0,0 +1,64 @@ +use polars_core::prelude::IntoColumn; + +use super::compute_node_prelude::*; +use crate::expression::StreamExpr; +use crate::morsel::SourceToken; + +pub struct InputIndependentSelectNode { + selectors: Vec, + done: bool, +} + +impl InputIndependentSelectNode { + pub fn new(selectors: Vec) -> Self { + Self { + selectors, + done: false, + } + } +} + +impl ComputeNode for InputIndependentSelectNode { + fn name(&self) -> &str { + "input_independent_select" + } + + fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> { + assert!(recv.is_empty() && send.len() == 1); + send[0] = if send[0] == PortState::Done || self.done { + PortState::Done + } else { + PortState::Ready + }; + Ok(()) + } + + fn spawn<'env, 's>( + &'env mut self, + scope: &'s TaskScope<'s, 'env>, + recv: &mut [Option>], + send: &mut [Option>], + state: &'s ExecutionState, + join_handles: &mut Vec>>, + ) { + assert!(recv.is_empty() && send.len() == 1); + let mut sender = send[0].take().unwrap().serial(); + + join_handles.push(scope.spawn_task(TaskPriority::Low, async move { + let empty_df = DataFrame::empty(); + let mut selected = Vec::new(); + for selector in self.selectors.iter() { + let s = selector.evaluate(&empty_df, state).await?; + selected.push(s.into_column()); + } + + let ret = DataFrame::new_with_broadcast(selected)?; + let seq = MorselSeq::default(); + let source_token = SourceToken::new(); + let morsel = Morsel::new(ret, seq, source_token); + sender.send(morsel).await.ok(); + self.done = true; + Ok(()) + })); + } +} diff --git a/crates/polars-stream/src/nodes/mod.rs b/crates/polars-stream/src/nodes/mod.rs index 4c71380e0ad4..66e64626d307 100644 --- a/crates/polars-stream/src/nodes/mod.rs +++ b/crates/polars-stream/src/nodes/mod.rs @@ -2,6 +2,7 @@ pub mod filter; pub mod in_memory_map; pub mod in_memory_sink; pub mod in_memory_source; +pub mod input_independent_select; pub mod map; pub mod multiplexer; pub mod ordered_union; diff --git a/crates/polars-stream/src/physical_plan/fmt.rs b/crates/polars-stream/src/physical_plan/fmt.rs index 7d15337389a8..0ec15047f0a1 100644 --- a/crates/polars-stream/src/physical_plan/fmt.rs +++ b/crates/polars-stream/src/physical_plan/fmt.rs @@ -59,6 +59,13 @@ fn visualize_plan_rec( from_ref(input), ) }, + PhysNodeKind::InputIndependentSelect { selectors } => ( + format!( + "input-independent-select\\n{}", + fmt_exprs(selectors, expr_arena) + ), + &[][..], + ), PhysNodeKind::Reduce { input, exprs } => ( format!("reduce\\n{}", fmt_exprs(exprs, expr_arena)), from_ref(input), diff --git a/crates/polars-stream/src/physical_plan/lower_expr.rs b/crates/polars-stream/src/physical_plan/lower_expr.rs index 6c4126a9e779..8e891bd408af 100644 --- a/crates/polars-stream/src/physical_plan/lower_expr.rs +++ b/crates/polars-stream/src/physical_plan/lower_expr.rs @@ -228,26 +228,12 @@ fn build_input_independent_node_with_ctx( exprs: &[ExprIR], ctx: &mut LowerExprContext, ) -> PolarsResult { - let expr_depth_limit = get_expr_depth_limit()?; - let mut state = ExpressionConversionState::new(false, expr_depth_limit); - let empty = DataFrame::empty(); - let execution_state = ExecutionState::new(); - let columns = exprs - .iter() - .map(|expr| { - let phys_expr = - create_physical_expr(expr, Context::Default, ctx.expr_arena, None, &mut state)?; - - phys_expr - .evaluate(&empty, &execution_state) - .map(Column::from) - }) - .try_collect_vec()?; - - let df = Arc::new(DataFrame::new_with_broadcast(columns)?); + let output_schema = compute_output_schema(&Schema::default(), exprs, ctx.expr_arena)?; Ok(ctx.phys_sm.insert(PhysNode::new( - Arc::new(df.schema()), - PhysNodeKind::InMemorySource { df }, + output_schema, + PhysNodeKind::InputIndependentSelect { + selectors: exprs.to_vec(), + }, ))) } @@ -313,10 +299,17 @@ fn build_fallback_node_with_ctx( ) -> PolarsResult { // Pre-select only the columns that are needed for this fallback expression. let input_schema = &ctx.phys_sm[input].output_schema; - let select_names: PlHashSet<_> = exprs + let mut select_names: PlHashSet<_> = exprs .iter() .flat_map(|expr| polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), ctx.expr_arena)) .collect(); + // To keep the length correct we have to ensure we select at least one + // column. + if select_names.is_empty() { + if let Some(name) = input_schema.iter_names().next() { + select_names.insert(name.clone()); + } + } let input_node = if input_schema .iter_names() .any(|name| !select_names.contains(name.as_str())) @@ -653,29 +646,38 @@ fn lower_exprs_with_ctx( Ok((zip_node, transformed_exprs)) } -/// Computes the schema that selecting the given expressions on the input node +/// Computes the schema that selecting the given expressions on the input schema /// would result in. -fn schema_for_select( - input: PhysNodeKey, +fn compute_output_schema( + input_schema: &Schema, exprs: &[ExprIR], - ctx: &mut LowerExprContext, + expr_arena: &Arena, ) -> PolarsResult> { - let input_schema = &ctx.phys_sm[input].output_schema; let output_schema: Schema = exprs .iter() .map(|e| { let name = e.output_name().clone(); - let dtype = ctx.expr_arena.get(e.node()).to_dtype( - input_schema, - Context::Default, - ctx.expr_arena, - )?; + let dtype = + expr_arena + .get(e.node()) + .to_dtype(input_schema, Context::Default, expr_arena)?; PolarsResult::Ok(Field::new(name, dtype)) }) .try_collect()?; Ok(Arc::new(output_schema)) } +/// Computes the schema that selecting the given expressions on the input node +/// would result in. +fn schema_for_select( + input: PhysNodeKey, + exprs: &[ExprIR], + ctx: &mut LowerExprContext, +) -> PolarsResult> { + let input_schema = &ctx.phys_sm[input].output_schema; + compute_output_schema(input_schema, exprs, ctx.expr_arena) +} + fn build_select_node_with_ctx( input: PhysNodeKey, exprs: &[ExprIR], diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index e4ba35ce767e..d60b251c99c2 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -58,6 +58,10 @@ pub enum PhysNodeKind { extend_original: bool, }, + InputIndependentSelect { + selectors: Vec, + }, + Reduce { input: PhysNodeKey, exprs: Vec, @@ -156,7 +160,9 @@ fn insert_multiplexers( if !seen_before { match &phys_sm[node].kind { - PhysNodeKind::InMemorySource { .. } | PhysNodeKind::FileScan { .. } => {}, + PhysNodeKind::InMemorySource { .. } + | PhysNodeKind::FileScan { .. } + | PhysNodeKind::InputIndependentSelect { .. } => {}, PhysNodeKind::Select { input, .. } | PhysNodeKind::Reduce { input, .. } | PhysNodeKind::StreamingSlice { input, .. } diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index e5cbf86b0351..15373db78fbe 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -130,6 +130,18 @@ fn to_graph_rec<'a>( [input_key], ) }, + + InputIndependentSelect { selectors } => { + let phys_selectors = selectors + .iter() + .map(|selector| create_stream_expr(selector, ctx)) + .collect::>()?; + ctx.graph.add_node( + nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors), + [], + ) + }, + Reduce { input, exprs } => { let input_key = to_graph_rec(*input, ctx)?; let input_schema = &ctx.phys_sm[*input].output_schema; diff --git a/docs/_build/API_REFERENCE_LINKS.yml b/docs/_build/API_REFERENCE_LINKS.yml index eb868c5bab8c..e532a4923a4d 100644 --- a/docs/_build/API_REFERENCE_LINKS.yml +++ b/docs/_build/API_REFERENCE_LINKS.yml @@ -3,6 +3,8 @@ python: LazyFrame: https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html Series: https://docs.pola.rs/api/python/stable/reference/series/index.html Categorical: https://docs.pola.rs/api/python/stable/reference/api/polars.Categorical.html + GPUEngine: https://docs.pola.rs/api/python/stable/lazyframe/api/polars.lazyframe.engine_config.GPUEngine.html + Config: https://docs.pola.rs/api/python/stable/reference/config.html select: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.select.html filter: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.filter.html with_columns: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.with_columns.html diff --git a/docs/_build/scripts/macro.py b/docs/_build/scripts/macro.py index 5dad68348415..15fbf9e1ddc2 100644 --- a/docs/_build/scripts/macro.py +++ b/docs/_build/scripts/macro.py @@ -131,6 +131,23 @@ def code_tab( def define_env(env): + @env.macro + def code_header(language: str, section: str = [], api_functions: List[str] = []) -> str: + language_info = LANGUAGES[language] + + language = language_info["code_name"] + + # Create feature flags + feature_flags_links = create_feature_flag_links(language, api_functions) + + # Create API Links if they are defined in the YAML + api_functions = [ + link for f in api_functions if (link := create_api_function_link(language, f)) + ] + language_headers = " ·".join(api_functions + feature_flags_links) + return f"""=== \":fontawesome-brands-{language_info['icon_name']}: {language_info['display_name']}\" + {language_headers}""" + @env.macro def code_block( path: str, section: str = None, api_functions: List[str] = None diff --git a/docs/index.md b/docs/index.md index f4ae1d575621..e984a05f17e9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,6 +26,7 @@ Polars is a blazingly fast DataFrame library for manipulating structured data. T - **Out of Core**: The streaming API allows you to process your results without requiring all your data to be in memory at the same time - **Parallel**: Utilises the power of your machine by dividing the workload among the available CPU cores without any additional configuration. - **Vectorized Query Engine**: Using [Apache Arrow](https://arrow.apache.org/), a columnar data format, to process your queries in a vectorized manner and SIMD to optimize CPU usage. +- **GPU Support**: Optionally run queries on NVIDIA GPUs for maximum performance for in-memory workloads. diff --git a/docs/src/python/user-guide/lazy/gpu.py b/docs/src/python/user-guide/lazy/gpu.py new file mode 100644 index 000000000000..21d246d09d76 --- /dev/null +++ b/docs/src/python/user-guide/lazy/gpu.py @@ -0,0 +1,46 @@ +# --8<-- [start:setup] +import polars as pl + +df = pl.LazyFrame({"a": [1.242, 1.535]}) + +q = df.select(pl.col("a").round(1)) +# --8<-- [end:setup] + +# Avoiding requiring the GPU engine for doc build output +# --8<-- [start:simple-result] +result = q.collect() +print(result) +# --8<-- [end:simple-result] + + +# --8<-- [start:engine-setup] +q = df.select((pl.col("a") ** 4)) + +# --8<-- [end:engine-setup] + +# --8<-- [start:engine-result] +result = q.collect() +print(result) +# --8<-- [end:engine-result] + +# --8<-- [start:fallback-setup] +df = pl.LazyFrame( + { + "key": [1, 1, 1, 2, 3, 3, 2, 2], + "value": [1, 2, 3, 4, 5, 6, 7, 8], + } +) + +q = df.select(pl.col("value").sum().over("key")) + +# --8<-- [end:fallback-setup] + +# --8<-- [start:fallback-result] +print( + "PerformanceWarning: Query execution with GPU not supported, reason: \n" + ": Grouped rolling window not implemented" +) +print("# some details elided") +print() +print(q.collect()) +# --8<- [end:fallback-result] diff --git a/docs/user-guide/gpu-support.md b/docs/user-guide/gpu-support.md new file mode 100644 index 000000000000..dec4169d0911 --- /dev/null +++ b/docs/user-guide/gpu-support.md @@ -0,0 +1,166 @@ +# GPU Support [Open Beta] + +Polars provides an in-memory, GPU-accelerated execution engine for Python users of the Lazy API on NVIDIA GPUs using [RAPIDS cuDF](https://docs.rapids.ai/api/cudf/stable/). This functionality is available in Open Beta and is undergoing rapid development. + +### System Requirements + +- NVIDIA Volta™ or higher GPU with [compute capability](https://developer.nvidia.com/cuda-gpus) 7.0+ +- CUDA 11 or CUDA 12 +- Linux or Windows Subsystem for Linux 2 (WSL2) + +See the [RAPIDS installation guide](https://docs.rapids.ai/install#system-req) for full details. + +### Installation + +You can install the GPU backend for Polars with a feature flag as part of a normal [installation](installation.md). + +=== ":fontawesome-brands-python: Python" +`bash pip install --extra-index-url=https://pypi.nvidia.com polars[gpu]` + +!!! note Installation on a CUDA 11 system + + If you have CUDA 11, the installation line is slightly more complicated: the relevant GPU package must be requested by hand. + + === ":fontawesome-brands-python: Python" + ```bash + pip install --extra-index-url=https://pypi.nvidia.com polars cudf-polars-cu11 + ``` + +### Usage + +Having built a query using the lazy API [as normal](lazy/index.md), GPU-enabled execution is requested by running `.collect(engine="gpu")` instead of `.collect()`. + +{{ code_header("python", [], []) }} + +```python +--8<-- "python/user-guide/lazy/gpu.py:setup" + +result = q.collect(engine="gpu") +print(result) +``` + +```python exec="on" result="text" session="user-guide/lazy" +--8<-- "python/user-guide/lazy/gpu.py:setup" +--8<-- "python/user-guide/lazy/gpu.py:simple-result" +``` + +For more detailed control over the execution, for example to specify which GPU to use on a multi-GPU node, we can provide a `GPUEngine` object. By default, the GPU engine will use a configuration applicable to most use cases. + +{{ code_header("python", [], []) }} + +```python +--8<-- "python/user-guide/lazy/gpu.py:engine-setup" +result = q.collect(engine=pl.GPUEngine(device=1)) +print(result) +``` + +```python exec="on" result="text" session="user-guide/lazy" +--8<-- "python/user-guide/lazy/gpu.py:engine-setup" +--8<-- "python/user-guide/lazy/gpu.py:engine-result" +``` + +### How It Works + +When you use the GPU-accelerated engine, Polars creates and optimizes a query plan and dispatches to a [RAPIDS](https://rapids.ai/) cuDF-based physical execution engine to compute the results on NVIDIA GPUs. The final result is returned as a normal CPU-backed Polars dataframe. + +### What's Supported on the GPU? + +GPU support is currently in Open Beta and the engine is undergoing rapid development. The engine currently supports many, but not all, of the core expressions and data types. + +Since expressions are composable, it's not feasible to list a full matrix of expressions supported on the GPU. Instead, we provide a list of the high-level categories of expressions and interfaces that are currently supported and not supported. + +#### Supported + +- LazyFrame API +- SQL API +- I/O from CSV, Parquet, ndjson, and in-memory CPU DataFrames. +- Operations on numeric, logical, string, and datetime types +- String processing +- Aggregations and grouped aggregations +- Joins +- Filters +- Missing data +- Concatenation + +#### Not Supported + +- Eager DataFrame API +- Streaming API +- Operations on categorical, struct, and list data types +- Rolling aggregations +- Time series resampling +- Timezones +- Folds +- User-defined functions +- JSON, Excel, and Database file formats + +#### Did my query use the GPU? + +The release of the GPU engine in Open Beta implies that we expect things to work well, but there are still some rough edges we're working on. In particular the full breadth of the Polars expression API is not yet supported. With fallback to the CPU, your query _should_ complete, but you might not observe any change in the time it takes to execute. There are two ways to get more information on whether the query ran on the GPU. + +When running in verbose mode, any queries that cannot execute on the GPU will issue a `PerformanceWarning`: + +{{ code_header("python", [], []) }} + +```python +--8<-- "python/user-guide/lazy/gpu.py:fallback-setup" + +with pl.Config() as cfg: + cfg.set_verbose(True) + result = q.collect(engine="gpu") + +print(result) +``` + +```python exec="on" result="text" session="user-guide/lazy" +--8<-- "python/user-guide/lazy/gpu.py:fallback-setup" +print( + "PerformanceWarning: Query execution with GPU not supported, reason: \n" + ": Grouped rolling window not implemented" +) +print("# some details elided") +print() +print(q.collect()) +``` + +To disable fallback, and have the GPU engine raise an exception if a query is unsupported, we can pass an appropriately configured `GPUEngine` object: + +{{ code_header("python", [], []) }} + +```python +q.collect(engine=pl.GPUEngine(raise_on_fail=True)) +``` + +```pytb +Traceback (most recent call last): + File "", line 1, in + File "/home/coder/third-party/polars/py-polars/polars/lazyframe/frame.py", line 2035, in collect + return wrap_df(ldf.collect(callback)) +polars.exceptions.ComputeError: 'cuda' conversion failed: NotImplementedError: Grouped rolling window not implemented +``` + +Currently, only the proximal cause of failure to execute on the GPU is reported, we plan to extend this functionality to report all unsupported operations for a query. + +### Testing + +The Polars and NVIDIA RAPIDS teams run comprehensive unit and integration tests to ensure that the GPU-accelerated Polars backend works smoothly. + +The **full** Polars test suite is run on every commit made to the GPU engine, ensuring consistency of results. + +The GPU engine currently passes 99.2% of the Polars unit tests with CPU fallback enabled. Without CPU fallback, the GPU engine passes 88.8% of the Polars unit tests. With fallback, there are approximately 100 failing tests: around 40 of these fail due to mismatching debug output; there are some cases where the GPU engine produces the a correct result but uses a different data type; the remainder are cases where we do not correctly determine that a query is unsupported and therefore fail at runtime, instead of falling back. + +### When Should I Use a GPU? + +Based on our benchmarking, you're most likely to observe speedups using the GPU engine when your workflow's profile is dominated by grouped aggregations and joins. In contrast I/O bound queries typically show similar performance on GPU and CPU. GPUs typically have less RAM than CPU systems, therefore very large datasets will fail due to out of memory errors. Based on our testing, raw datasets of 50-100 GiB fit (depending on the workflow) well with a GPU with 80GiB of memory. + +### CPU-GPU Interoperability + +Both the CPU and GPU engine use the Apache Arrow columnar memory specification, making it possible to quickly move data between the CPU and GPU. Additionally, files written by one engine can be read by the other engine. + +When using GPU mode, your workflow won't fail if something isn't supported. When you run `collect(engine="gpu")`, the optimized query plan is inspected to see whether it can be executed on the GPU. If it can't, it will transparently fall back to the standard Polars engine and run on the CPU. + +GPU execution is only available in the Lazy API, so materialized DataFrames will reside in CPU memory when the query execution finishes. + +### Providing feedback + +Please report issues, and missing features, on the Polars [issue tracker](../development/contributing/index.md). diff --git a/docs/user-guide/installation.md b/docs/user-guide/installation.md index 0642675dd6ab..791362403456 100644 --- a/docs/user-guide/installation.md +++ b/docs/user-guide/installation.md @@ -88,6 +88,19 @@ pip install 'polars[numpy,fsspec]' | --- | --------------------------------- | | all | Install all optional dependencies | +#### GPU + +| Tag | Description | +| --- | -------------------------- | +| gpu | Run queries on NVIDIA GPUs | + +!!! note + + To install the GPU engine, you need to pass + `--extra-index-url=https://pypi.nvidia.com` to `pip`. See [GPU + support](gpu-support.md) for more detailed instructions and + prerequisites. + #### Interop | Tag | Description | diff --git a/docs/user-guide/lazy/gpu.md b/docs/user-guide/lazy/gpu.md new file mode 100644 index 000000000000..97529eb28f55 --- /dev/null +++ b/docs/user-guide/lazy/gpu.md @@ -0,0 +1,21 @@ +# GPU Support + +Polars provides an in-memory, GPU-accelerated execution engine for the Lazy API in Python using [RAPIDS cuDF](https://docs.rapids.ai/api/cudf/stable/) on NVIDIA GPUs. This functionality is available in Open Beta and is undergoing rapid development. + +If you install Polars with the [GPU feature flag](../installation.md), you can trigger GPU-based execution by running `.collect(engine="gpu")` instead of `.collect()`. + +{{ code_header("python", [], []) }} + +```python +--8<-- "python/user-guide/lazy/gpu.py:setup" + +result = q.collect(engine="gpu") +print(result) +``` + +```python exec="on" result="text" session="user-guide/lazy" +--8<-- "python/user-guide/lazy/gpu.py:setup" +--8<-- "python/user-guide/lazy/gpu.py:simple-result" +``` + +Learn more in the [GPU Support guide](../gpu-support.md). diff --git a/docs/user-guide/lazy/index.md b/docs/user-guide/lazy/index.md index be731390f09c..bbf50fb34e11 100644 --- a/docs/user-guide/lazy/index.md +++ b/docs/user-guide/lazy/index.md @@ -8,3 +8,4 @@ The Lazy chapter is a guide for working with `LazyFrames`. It covers the functio - [Query plan](query-plan.md) - [Execution](execution.md) - [Streaming](streaming.md) +- [GPU Support](gpu.md) diff --git a/docs/user-guide/misc/multiprocessing.md b/docs/user-guide/misc/multiprocessing.md index d46a96a52bc5..6a10f8d61443 100644 --- a/docs/user-guide/misc/multiprocessing.md +++ b/docs/user-guide/misc/multiprocessing.md @@ -11,6 +11,8 @@ It does this by executing computations which can be done in parallel in separate For example, requesting two expressions in a `select` statement can be done in parallel, with the results only being combined at the end. Another example is aggregating a value within groups using `group_by().agg()`, each group can be evaluated separately. It is very unlikely that the `multiprocessing` module can improve your code performance in these cases. +If you're using the GPU Engine with Polars you should also avoid manual multiprocessing. When used simultaneously, they can compete +for system memory and processing power, leading to reduced performance. See [the optimizations section](../lazy/optimizations.md) for more optimizations. diff --git a/mkdocs.yml b/mkdocs.yml index ed93b9badcfe..a29cd1020af2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -58,6 +58,7 @@ nav: - user-guide/lazy/query-plan.md - user-guide/lazy/execution.md - user-guide/lazy/streaming.md + - user-guide/lazy/gpu.md - IO: - user-guide/io/index.md - user-guide/io/csv.md @@ -86,6 +87,7 @@ nav: - user-guide/misc/styling.md - user-guide/misc/comparison.md - user-guide/misc/arrow.md + - user-guide/gpu-support.md - API reference: api/index.md diff --git a/py-polars/docs/source/reference/lazyframe/gpu_engine.rst b/py-polars/docs/source/reference/lazyframe/gpu_engine.rst new file mode 100644 index 000000000000..3372f72566bf --- /dev/null +++ b/py-polars/docs/source/reference/lazyframe/gpu_engine.rst @@ -0,0 +1,14 @@ +========= +GPUEngine +========= + +This object provides fine-grained control over the behavior of the +GPU engine when calling `LazyFrame.collect()` with an `engine` +argument. + +.. currentmodule:: polars.lazyframe.engine_config + +.. autosummary:: + :toctree: api/ + + GPUEngine diff --git a/py-polars/docs/source/reference/lazyframe/index.rst b/py-polars/docs/source/reference/lazyframe/index.rst index fe0ea51e60ee..ab37eb776fb0 100644 --- a/py-polars/docs/source/reference/lazyframe/index.rst +++ b/py-polars/docs/source/reference/lazyframe/index.rst @@ -15,6 +15,7 @@ This page gives an overview of all public LazyFrame methods. modify_select miscellaneous in_process + gpu_engine .. _lazyframe: diff --git a/py-polars/polars/lazyframe/engine_config.py b/py-polars/polars/lazyframe/engine_config.py index ee6c2f8b7941..1dcad0ba611c 100644 --- a/py-polars/polars/lazyframe/engine_config.py +++ b/py-polars/polars/lazyframe/engine_config.py @@ -14,17 +14,35 @@ class GPUEngine: Use this if you want control over details of the execution. - Supported options + Parameters + ---------- + device : int, default None + Select the GPU used to run the query. If not provided, the + query uses the current CUDA device. + memory_resource : rmm.mr.DeviceMemoryResource, default None + Provide a memory resource for GPU memory allocations. + + .. warning:: + If passing a `memory_resource`, you must ensure that it is valid + for the selected `device`. See the `RMM documentation + `_ + for more details. + + raise_on_fail : bool, default False + If True, do not fall back to the Polars CPU engine if the GPU + engine cannot execute the query, but instead raise an error. - - `device`: Select the device to run the query on. - - `memory_resource`: Set an RMM memory resource for - device-side allocations. """ device: int | None """Device on which to run query.""" memory_resource: DeviceMemoryResource | None """Memory resource to use for device allocations.""" + raise_on_fail: bool + """ + Whether unsupported queries should raise an error, rather than falling + back to the CPU engine. + """ config: Mapping[str, Any] """Additional configuration options for the engine.""" @@ -33,8 +51,11 @@ def __init__( *, device: int | None = None, memory_resource: Any | None = None, + raise_on_fail: bool = False, **kwargs: Any, ) -> None: self.device = device self.memory_resource = memory_resource + # Avoids need for changes in cudf-polars + kwargs["raise_on_fail"] = raise_on_fail self.config = kwargs diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index f1db322e4460..dc39ab3c198c 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -1865,7 +1865,7 @@ def collect( polars CPU engine. If set to `"gpu"`, the GPU engine is used. Fine-grained control over the GPU engine, for example which device to use on a system with multiple - devices, is possible by providing a :class:`GPUEngine` object + devices, is possible by providing a :class:`~.GPUEngine` object with configuration options. .. note:: @@ -2019,9 +2019,10 @@ def collect( "cudf_polars", err_prefix="GPU engine requested, but required package", install_message=( - "Please install using the command `pip install cudf-polars-cu12` " - "(or `pip install cudf-polars-cu11` if your system has a " - "CUDA 11 driver)." + "Please install using the command " + "`pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu12` " + "(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` " + "if your system has a CUDA 11 driver)." ), ) if not is_config_obj: diff --git a/py-polars/tests/unit/streaming/test_streaming_io.py b/py-polars/tests/unit/streaming/test_streaming_io.py index 0cbf0d90e4ba..c304686f89d3 100644 --- a/py-polars/tests/unit/streaming/test_streaming_io.py +++ b/py-polars/tests/unit/streaming/test_streaming_io.py @@ -297,6 +297,7 @@ def test_streaming_empty_parquet_16523(tmp_path: Path) -> None: assert q.join(q2, on="a").collect(streaming=True).shape == (0, 1) +@pytest.mark.may_fail_auto_streaming @pytest.mark.parametrize( "method", ["parquet", "csv"],