Skip to content

Commit

Permalink
refactor(rust): Fix simple projection in streaming engine (#17871)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Jul 25, 2024
1 parent 3016c07 commit 5a108d4
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 10 deletions.
12 changes: 8 additions & 4 deletions crates/polars-stream/src/nodes/simple_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ use polars_core::schema::Schema;
use super::compute_node_prelude::*;

pub struct SimpleProjectionNode {
schema: Arc<Schema>,
columns: Vec<String>,
input_schema: Arc<Schema>,
}

impl SimpleProjectionNode {
pub fn new(schema: Arc<Schema>) -> Self {
Self { schema }
pub fn new(columns: Vec<String>, input_schema: Arc<Schema>) -> Self {
Self {
columns,
input_schema,
}
}
}

Expand Down Expand Up @@ -42,7 +46,7 @@ impl ComputeNode for SimpleProjectionNode {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
// TODO: can this be unchecked?
df.select_with_schema(slf.schema.iter_names(), &slf.schema)
df.select_with_schema(&slf.columns, &slf.input_schema)
})?;

if send.send(morsel).await.is_err() {
Expand Down
14 changes: 11 additions & 3 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ pub fn lower_ir(
let ir_node = ir_arena.get(node);
match ir_node {
IR::SimpleProjection { input, columns } => {
let schema = columns.clone();
let input_ir_node = ir_arena.get(*input);
let input_schema = input_ir_node.schema(ir_arena).into_owned();
let columns = columns.iter_names().map(|s| s.to_string()).collect();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::SimpleProjection { input, schema }))
Ok(phys_sm.insert(PhysNode::SimpleProjection {
input,
columns,
input_schema,
}))
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
Expand Down Expand Up @@ -131,6 +137,7 @@ pub fn lower_ir(
df,
output_schema,
filter,
schema: input_schema,
..
} => {
if let Some(filter) = filter {
Expand All @@ -144,7 +151,8 @@ pub fn lower_ir(
if let Some(schema) = output_schema {
phys_node = phys_sm.insert(PhysNode::SimpleProjection {
input: phys_node,
schema: schema.clone(),
input_schema: input_schema.clone(),
columns: schema.iter_names().map(|s| s.to_string()).collect(),
})
}

Expand Down
5 changes: 4 additions & 1 deletion crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ pub enum PhysNode {
extend_original: bool,
output_schema: Arc<Schema>,
},

Reduce {
input: PhysNodeKey,
exprs: Vec<ExprIR>,
input_schema: Arc<Schema>,
output_schema: Arc<Schema>,
},

StreamingSlice {
input: PhysNodeKey,
offset: usize,
Expand All @@ -53,7 +55,8 @@ pub enum PhysNode {

SimpleProjection {
input: PhysNodeKey,
schema: Arc<Schema>,
input_schema: Arc<Schema>,
columns: Vec<String>,
},

InMemorySink {
Expand Down
11 changes: 9 additions & 2 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,17 @@ fn to_graph_rec<'a>(
[input_key],
)
},
SimpleProjection { schema, input } => {
SimpleProjection {
input,
columns,
input_schema,
} => {
let input_key = to_graph_rec(*input, ctx)?;
ctx.graph.add_node(
nodes::simple_projection::SimpleProjectionNode::new(schema.clone()),
nodes::simple_projection::SimpleProjectionNode::new(
columns.clone(),
input_schema.clone(),
),
[input_key],
)
},
Expand Down

0 comments on commit 5a108d4

Please sign in to comment.