Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/barak1412/polars into fix_f…
Browse files Browse the repository at this point in the history
…ilter_struct_index
  • Loading branch information
barak1412 committed Sep 17, 2024
2 parents f2b2674 + 6561eba commit aa35c03
Show file tree
Hide file tree
Showing 22 changed files with 454 additions and 48 deletions.
23 changes: 14 additions & 9 deletions crates/polars-stream/src/nodes/in_memory_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ impl ComputeNode for InMemorySourceNode {
assert!(recv.is_empty());
assert!(send.len() == 1);

let exhausted = self
.source
.as_ref()
.map(|s| {
self.seq.load(Ordering::Relaxed) * self.morsel_size as u64 >= s.height() as u64
})
.unwrap_or(true);

// As a temporary hack for some nodes (like the FunctionIR::FastCount
// node) that rely on an empty input, always ensure we send at least one
// morsel.
// TODO: remove this hack.
let exhausted = if let Some(src) = &self.source {
let seq = self.seq.load(Ordering::Relaxed);
seq > 0 && seq * self.morsel_size as u64 >= src.height() as u64
} else {
true
};
if send[0] == PortState::Done || exhausted {
send[0] = PortState::Done;
self.source = None;
Expand Down Expand Up @@ -77,7 +79,10 @@ impl ComputeNode for InMemorySourceNode {
let seq = slf.seq.fetch_add(1, Ordering::Relaxed);
let offset = (seq as usize * slf.morsel_size) as i64;
let df = source.slice(offset, slf.morsel_size);
if df.is_empty() {

// TODO: remove this 'always send at least one morsel'
// condition, see update_state.
if df.is_empty() && seq > 0 {
break;
}

Expand Down
64 changes: 64 additions & 0 deletions crates/polars-stream/src/nodes/input_independent_select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use polars_core::prelude::IntoColumn;

use super::compute_node_prelude::*;
use crate::expression::StreamExpr;
use crate::morsel::SourceToken;

/// Compute node with no input ports that evaluates a fixed list of
/// expressions and emits the result on its single output port.
pub struct InputIndependentSelectNode {
    /// Expressions evaluated to build the output columns.
    selectors: Vec<StreamExpr>,
    /// Set once the spawned task has run to completion; `update_state` then
    /// reports the output port as `Done`.
    done: bool,
}

impl InputIndependentSelectNode {
    /// Creates a node for `selectors` that has not produced its output yet.
    pub fn new(selectors: Vec<StreamExpr>) -> Self {
        let done = false;
        Self { selectors, done }
    }
}

impl ComputeNode for InputIndependentSelectNode {
    /// Name under which this node is reported.
    fn name(&self) -> &str {
        "input_independent_select"
    }

    /// Marks the single output port `Done` when it already was `Done` or when
    /// this node has finished its task; otherwise marks it `Ready`.
    ///
    /// Panics if the node is given any input port or not exactly one output
    /// port.
    fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
        assert!(recv.is_empty() && send.len() == 1);
        let finished = send[0] == PortState::Done || self.done;
        send[0] = if finished {
            PortState::Done
        } else {
            PortState::Ready
        };
        Ok(())
    }

    /// Spawns one low-priority task that evaluates every selector against an
    /// empty `DataFrame`, combines the resulting columns with
    /// `DataFrame::new_with_broadcast` and sends them as a single morsel.
    fn spawn<'env, 's>(
        &'env mut self,
        scope: &'s TaskScope<'s, 'env>,
        recv: &mut [Option<RecvPort<'_>>],
        send: &mut [Option<SendPort<'_>>],
        state: &'s ExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        assert!(recv.is_empty() && send.len() == 1);
        let mut sender = send[0].take().unwrap().serial();

        let task = scope.spawn_task(TaskPriority::Low, async move {
            // The selectors do not read any input, so an empty frame is used
            // as the evaluation context.
            let input = DataFrame::empty();
            let mut columns = Vec::new();
            for expr in &self.selectors {
                let evaluated = expr.evaluate(&input, state).await?;
                columns.push(evaluated.into_column());
            }

            let morsel = Morsel::new(
                DataFrame::new_with_broadcast(columns)?,
                MorselSeq::default(),
                SourceToken::new(),
            );
            // The send result is discarded on purpose.
            // NOTE(review): presumably a closed receiver is not an error
            // here - confirm.
            sender.send(morsel).await.ok();
            self.done = true;
            Ok(())
        });
        join_handles.push(task);
    }
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod filter;
pub mod in_memory_map;
pub mod in_memory_sink;
pub mod in_memory_source;
pub mod input_independent_select;
pub mod map;
pub mod multiplexer;
pub mod ordered_union;
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-stream/src/physical_plan/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ fn visualize_plan_rec(
from_ref(input),
)
},
PhysNodeKind::InputIndependentSelect { selectors } => (
format!(
"input-independent-select\\n{}",
fmt_exprs(selectors, expr_arena)
),
&[][..],
),
PhysNodeKind::Reduce { input, exprs } => (
format!("reduce\\n{}", fmt_exprs(exprs, expr_arena)),
from_ref(input),
Expand Down
62 changes: 32 additions & 30 deletions crates/polars-stream/src/physical_plan/lower_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,26 +228,12 @@ fn build_input_independent_node_with_ctx(
exprs: &[ExprIR],
ctx: &mut LowerExprContext,
) -> PolarsResult<PhysNodeKey> {
let expr_depth_limit = get_expr_depth_limit()?;
let mut state = ExpressionConversionState::new(false, expr_depth_limit);
let empty = DataFrame::empty();
let execution_state = ExecutionState::new();
let columns = exprs
.iter()
.map(|expr| {
let phys_expr =
create_physical_expr(expr, Context::Default, ctx.expr_arena, None, &mut state)?;

phys_expr
.evaluate(&empty, &execution_state)
.map(Column::from)
})
.try_collect_vec()?;

let df = Arc::new(DataFrame::new_with_broadcast(columns)?);
let output_schema = compute_output_schema(&Schema::default(), exprs, ctx.expr_arena)?;
Ok(ctx.phys_sm.insert(PhysNode::new(
Arc::new(df.schema()),
PhysNodeKind::InMemorySource { df },
output_schema,
PhysNodeKind::InputIndependentSelect {
selectors: exprs.to_vec(),
},
)))
}

Expand Down Expand Up @@ -313,10 +299,17 @@ fn build_fallback_node_with_ctx(
) -> PolarsResult<PhysNodeKey> {
// Pre-select only the columns that are needed for this fallback expression.
let input_schema = &ctx.phys_sm[input].output_schema;
let select_names: PlHashSet<_> = exprs
let mut select_names: PlHashSet<_> = exprs
.iter()
.flat_map(|expr| polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), ctx.expr_arena))
.collect();
// To keep the length correct we have to ensure we select at least one
// column.
if select_names.is_empty() {
if let Some(name) = input_schema.iter_names().next() {
select_names.insert(name.clone());
}
}
let input_node = if input_schema
.iter_names()
.any(|name| !select_names.contains(name.as_str()))
Expand Down Expand Up @@ -653,29 +646,38 @@ fn lower_exprs_with_ctx(
Ok((zip_node, transformed_exprs))
}

/// Computes the schema that selecting the given expressions on the input node
/// Computes the schema that selecting the given expressions on the input schema
/// would result in.
fn schema_for_select(
input: PhysNodeKey,
fn compute_output_schema(
input_schema: &Schema,
exprs: &[ExprIR],
ctx: &mut LowerExprContext,
expr_arena: &Arena<AExpr>,
) -> PolarsResult<Arc<Schema>> {
let input_schema = &ctx.phys_sm[input].output_schema;
let output_schema: Schema = exprs
.iter()
.map(|e| {
let name = e.output_name().clone();
let dtype = ctx.expr_arena.get(e.node()).to_dtype(
input_schema,
Context::Default,
ctx.expr_arena,
)?;
let dtype =
expr_arena
.get(e.node())
.to_dtype(input_schema, Context::Default, expr_arena)?;
PolarsResult::Ok(Field::new(name, dtype))
})
.try_collect()?;
Ok(Arc::new(output_schema))
}

/// Computes the schema produced by selecting `exprs` on top of the output
/// schema of the physical node `input`.
///
/// Delegates to `compute_output_schema`, using the node's stored output
/// schema as the input schema.
fn schema_for_select(
    input: PhysNodeKey,
    exprs: &[ExprIR],
    ctx: &mut LowerExprContext,
) -> PolarsResult<Arc<Schema>> {
    compute_output_schema(&ctx.phys_sm[input].output_schema, exprs, ctx.expr_arena)
}

fn build_select_node_with_ctx(
input: PhysNodeKey,
exprs: &[ExprIR],
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ pub enum PhysNodeKind {
extend_original: bool,
},

InputIndependentSelect {
selectors: Vec<ExprIR>,
},

Reduce {
input: PhysNodeKey,
exprs: Vec<ExprIR>,
Expand Down Expand Up @@ -156,7 +160,9 @@ fn insert_multiplexers(

if !seen_before {
match &phys_sm[node].kind {
PhysNodeKind::InMemorySource { .. } | PhysNodeKind::FileScan { .. } => {},
PhysNodeKind::InMemorySource { .. }
| PhysNodeKind::FileScan { .. }
| PhysNodeKind::InputIndependentSelect { .. } => {},
PhysNodeKind::Select { input, .. }
| PhysNodeKind::Reduce { input, .. }
| PhysNodeKind::StreamingSlice { input, .. }
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ fn to_graph_rec<'a>(
[input_key],
)
},

InputIndependentSelect { selectors } => {
let phys_selectors = selectors
.iter()
.map(|selector| create_stream_expr(selector, ctx))
.collect::<PolarsResult<_>>()?;
ctx.graph.add_node(
nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors),
[],
)
},

Reduce { input, exprs } => {
let input_key = to_graph_rec(*input, ctx)?;
let input_schema = &ctx.phys_sm[*input].output_schema;
Expand Down
2 changes: 2 additions & 0 deletions docs/_build/API_REFERENCE_LINKS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ python:
LazyFrame: https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html
Series: https://docs.pola.rs/api/python/stable/reference/series/index.html
Categorical: https://docs.pola.rs/api/python/stable/reference/api/polars.Categorical.html
GPUEngine: https://docs.pola.rs/api/python/stable/reference/lazyframe/api/polars.lazyframe.engine_config.GPUEngine.html
Config: https://docs.pola.rs/api/python/stable/reference/config.html
select: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.select.html
filter: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.filter.html
with_columns: https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.with_columns.html
Expand Down
17 changes: 17 additions & 0 deletions docs/_build/scripts/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ def code_tab(


def define_env(env):
# Macro that renders the header of a language tab: the
# `=== ":fontawesome-brands-<icon>: <display name>"` line followed by a line
# holding the API links and feature-flag links joined with " ·".
# NOTE(review): `section` is never read in this body, and its `[]` default does
# not match the `str` annotation - confirm whether any caller passes it.
# NOTE(review): the `[]` default of `api_functions` is a shared list object; it
# is only rebound below, never mutated, so it is harmless as written.
@env.macro
def code_header(language: str, section: str = [], api_functions: List[str] = []) -> str:
language_info = LANGUAGES[language]

# From here on `language` holds the language's code name, not the lookup key.
language = language_info["code_name"]

# Create feature flags
feature_flags_links = create_feature_flag_links(language, api_functions)

# Create API Links if they are defined in the YAML
# (entries for which create_api_function_link returns a falsy value are dropped)
api_functions = [
link for f in api_functions if (link := create_api_function_link(language, f))
]
language_headers = " ·".join(api_functions + feature_flags_links)
return f"""=== \":fontawesome-brands-{language_info['icon_name']}: {language_info['display_name']}\"
{language_headers}"""

@env.macro
def code_block(
path: str, section: str = None, api_functions: List[str] = None
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Polars is a blazingly fast DataFrame library for manipulating structured data. T
- **Out of Core**: The streaming API allows you to process your results without requiring all your data to be in memory at the same time
- **Parallel**: Utilises the power of your machine by dividing the workload among the available CPU cores without any additional configuration.
- **Vectorized Query Engine**: Using [Apache Arrow](https://arrow.apache.org/), a columnar data format, to process your queries in a vectorized manner and SIMD to optimize CPU usage.
- **GPU Support**: Optionally run queries on NVIDIA GPUs for maximum performance for in-memory workloads.

<!-- dprint-ignore-start -->

Expand Down
46 changes: 46 additions & 0 deletions docs/src/python/user-guide/lazy/gpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# --8<-- [start:setup]
import polars as pl

df = pl.LazyFrame({"a": [1.242, 1.535]})

q = df.select(pl.col("a").round(1))
# --8<-- [end:setup]

# Collect without specifying an engine so that building the docs output does
# not require the GPU engine.
# --8<-- [start:simple-result]
result = q.collect()
print(result)
# --8<-- [end:simple-result]


# --8<-- [start:engine-setup]
q = df.select((pl.col("a") ** 4))

# --8<-- [end:engine-setup]

# --8<-- [start:engine-result]
result = q.collect()
print(result)
# --8<-- [end:engine-result]

# --8<-- [start:fallback-setup]
df = pl.LazyFrame(
{
"key": [1, 1, 1, 2, 3, 3, 2, 2],
"value": [1, 2, 3, 4, 5, 6, 7, 8],
}
)

q = df.select(pl.col("value").sum().over("key"))

# --8<-- [end:fallback-setup]

# The warning text below is printed literally rather than triggered by a GPU
# run.
# --8<-- [start:fallback-result]
print(
"PerformanceWarning: Query execution with GPU not supported, reason: \n"
"<class 'NotImplementedError'>: Grouped rolling window not implemented"
)
print("# some details elided")
print()
print(q.collect())
# --8<-- [end:fallback-result]
Loading

0 comments on commit aa35c03

Please sign in to comment.