diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 849f82c1ce83..1271dfa9de0a 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -42,6 +42,7 @@ pub(crate) use slice::*; pub(crate) use sort::*; pub(crate) use sortby::*; pub(crate) use ternary::*; +pub use window::window_function_format_order_by; pub(crate) use window::*; use crate::state::ExecutionState; diff --git a/crates/polars-expr/src/expressions/sort.rs b/crates/polars-expr/src/expressions/sort.rs index 1e729e0ff701..a7f2a94ae344 100644 --- a/crates/polars-expr/src/expressions/sort.rs +++ b/crates/polars-expr/src/expressions/sort.rs @@ -2,6 +2,7 @@ use polars_core::prelude::*; use polars_core::POOL; use polars_ops::chunked_array::ListNameSpaceImpl; use polars_utils::idx_vec::IdxVec; +use polars_utils::slice::GetSaferUnchecked; use rayon::prelude::*; use super::*; @@ -29,10 +30,7 @@ pub(crate) fn map_sorted_indices_to_group_idx(sorted_idx: &IdxCa, idx: &[IdxSize .cont_slice() .unwrap() .iter() - .map(|&i| { - debug_assert!(idx.get(i as usize).is_some()); - unsafe { *idx.get_unchecked(i as usize) } - }) + .map(|&i| unsafe { *idx.get_unchecked_release(i as usize) }) .collect() } diff --git a/crates/polars-expr/src/expressions/sortby.rs b/crates/polars-expr/src/expressions/sortby.rs index e51e4722dac3..d2f072e1dedf 100644 --- a/crates/polars-expr/src/expressions/sortby.rs +++ b/crates/polars-expr/src/expressions/sortby.rs @@ -53,32 +53,42 @@ fn check_groups(a: &GroupsProxy, b: &GroupsProxy) -> PolarsResult<()> { Ok(()) } +pub(super) fn update_groups_sort_by( + groups: &GroupsProxy, + sort_by_s: &Series, + options: &SortOptions, +) -> PolarsResult { + let groups = groups + .par_iter() + .map(|indicator| sort_by_groups_single_by(indicator, sort_by_s, options)) + .collect::>()?; + + Ok(GroupsProxy::Idx(groups)) +} + fn sort_by_groups_single_by( indicator: GroupsIndicator, sort_by_s: &Series, - descending: &[bool], + options: &SortOptions, ) -> PolarsResult<(IdxSize, IdxVec)> { + let options = SortOptions { + descending: options.descending, + nulls_last: options.nulls_last, + // We are already in par iter. + multithreaded: false, + ..Default::default() + }; let new_idx = match indicator { GroupsIndicator::Idx((_, idx)) => { // SAFETY: group tuples are always in bounds. let group = unsafe { sort_by_s.take_slice_unchecked(idx) }; - let sorted_idx = group.arg_sort(SortOptions { - descending: descending[0], - // We are already in par iter. - multithreaded: false, - ..Default::default() - }); + let sorted_idx = group.arg_sort(options); map_sorted_indices_to_group_idx(&sorted_idx, idx) }, GroupsIndicator::Slice([first, len]) => { let group = sort_by_s.slice(first as i64, len as usize); - let sorted_idx = group.arg_sort(SortOptions { - descending: descending[0], - // We are already in par iter. - multithreaded: false, - ..Default::default() - }); + let sorted_idx = group.arg_sort(options); map_sorted_indices_to_group_slice(&sorted_idx, first) }, }; @@ -283,17 +293,19 @@ impl PhysicalExpr for SortByExpr { let (check, groups) = POOL.join( || check_groups(groups, ac_in.groups()), || { - groups - .par_iter() - .map(|indicator| { - sort_by_groups_single_by(indicator, &sort_by_s, &descending) - }) - .collect::>() + update_groups_sort_by( + groups, + &sort_by_s, + &SortOptions { + descending: descending[0], + ..Default::default() + }, + ) }, ); check?; - GroupsProxy::Idx(groups?) + groups? } else { let groups = ac_sort_by[0].groups(); diff --git a/crates/polars-expr/src/expressions/window.rs b/crates/polars-expr/src/expressions/window.rs index 753063038cb9..5a0169752c79 100644 --- a/crates/polars-expr/src/expressions/window.rs +++ b/crates/polars-expr/src/expressions/window.rs @@ -21,6 +21,7 @@ pub struct WindowExpr { /// the root column that the Function will be applied on. /// This will be used to create a smaller DataFrame to prevent taking unneeded columns by index pub(crate) group_by: Vec>, + pub(crate) order_by: Option<(Arc, SortOptions)>, pub(crate) apply_columns: Vec>, pub(crate) out_name: Option>, /// A function Expr. i.e. Mean, Median, Max, etc. @@ -366,6 +367,11 @@ impl WindowExpr { } } +// Utility to create partitions and cache keys +pub fn window_function_format_order_by(to: &mut String, e: &Expr, k: &SortOptions) { + write!(to, "_PL_{:?}{}_{}", e, k.descending, k.nulls_last).unwrap(); +} + impl PhysicalExpr for WindowExpr { // Note: this was first implemented with expression evaluation but this performed really bad. // Therefore we choose the group_by -> apply -> self join approach @@ -439,7 +445,15 @@ impl PhysicalExpr for WindowExpr { let create_groups = || { let gb = df.group_by_with_series(group_by_columns.clone(), true, sort_groups)?; - let out: PolarsResult = Ok(gb.take_groups()); + let mut groups = gb.take_groups(); + + if let Some((order_by, options)) = &self.order_by { + let order_by = order_by.evaluate(df, state)?; + polars_ensure!(order_by.len() == df.height(), ShapeMismatch: "the order by expression evaluated to a length: {} that doesn't match the input DataFrame: {}", order_by.len(), df.height()); + groups = update_groups_sort_by(&groups, &order_by, options)? + } + + let out: PolarsResult = Ok(groups); out }; @@ -450,6 +464,15 @@ impl PhysicalExpr for WindowExpr { for s in &group_by_columns { cache_key.push_str(s.name()); } + if let Some((e, options)) = &self.order_by { + let e = match e.as_expression() { + Some(e) => e, + None => { + polars_bail!(InvalidOperation: "cannot order by this expression in window function") + }, + }; + window_function_format_order_by(&mut cache_key, e, options) + } let mut gt_map_guard = state.group_tuples.write().unwrap(); // we run sequential and partitioned diff --git a/crates/polars-expr/src/planner.rs b/crates/polars-expr/src/planner.rs index 300006432de6..9a993a9ef740 100644 --- a/crates/polars-expr/src/planner.rs +++ b/crates/polars-expr/src/planner.rs @@ -197,6 +197,7 @@ fn create_physical_expr_inner( Window { mut function, partition_by, + order_by, options, } => { state.set_window(); @@ -208,6 +209,21 @@ fn create_physical_expr_inner( state, )?; + let order_by = order_by + .map(|(node, options)| { + PolarsResult::Ok(( + create_physical_expr_inner( + node, + Context::Aggregation, + expr_arena, + schema, + state, + )?, + options, + )) + }) + .transpose()?; + let mut out_name = None; if let Alias(expr, name) = expr_arena.get(function) { function = *expr; @@ -250,6 +266,7 @@ fn create_physical_expr_inner( Ok(Arc::new(WindowExpr { group_by, + order_by, apply_columns, out_name, function: function_expr, diff --git a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs index 3b3c4c2ca5a3..321e32521e09 100644 --- a/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs +++ b/crates/polars-lazy/src/physical_plan/executors/projection_utils.rs @@ -1,4 +1,3 @@ -use polars_utils::format_smartstring; use polars_utils::iter::EnumerateIdxTrait; use smartstring::alias::String as SmartString; @@ -51,7 +50,7 @@ fn rolling_evaluate( fn window_evaluate( df: &DataFrame, state: &ExecutionState, - window: PlHashMap>, + window: PlHashMap>, ) -> PolarsResult>> { POOL.install(|| { window @@ -111,7 +110,7 @@ fn execute_projection_cached_window_fns( #[allow(clippy::type_complexity)] // String: partition_name, // u32: index, - let mut windows: PlHashMap> = PlHashMap::default(); + let mut windows: PlHashMap> = PlHashMap::default(); #[cfg(feature = "dynamic_group_by")] let mut rolling: PlHashMap<&RollingGroupOptions, Vec> = PlHashMap::default(); let mut other = Vec::with_capacity(exprs.len()); @@ -126,13 +125,21 @@ fn execute_projection_cached_window_fns( if let Expr::Window { partition_by, options, + order_by, .. } = e { let entry = match options { WindowType::Over(_) => { - let group_by = format_smartstring!("{:?}", partition_by.as_slice()); - windows.entry(group_by).or_insert_with(Vec::new) + let mut key = format!("{:?}", partition_by.as_slice()); + if let Some((e, k)) = order_by { + polars_expr::prelude::window_function_format_order_by( + &mut key, + e.as_ref(), + k, + ) + } + windows.entry(key).or_insert_with(Vec::new) }, #[cfg(feature = "dynamic_group_by")] WindowType::Rolling(options) => rolling.entry(options).or_insert_with(Vec::new), diff --git a/crates/polars-lazy/src/tests/queries.rs b/crates/polars-lazy/src/tests/queries.rs index 87e54ef25802..405eef3a97a5 100644 --- a/crates/polars-lazy/src/tests/queries.rs +++ b/crates/polars-lazy/src/tests/queries.rs @@ -1143,9 +1143,11 @@ fn test_fill_forward() -> PolarsResult<()> { let out = df .lazy() - .select([col("b") - .forward_fill(None) - .over_with_options([col("a")], WindowMapping::Join)]) + .select([col("b").forward_fill(None).over_with_options( + [col("a")], + None, + WindowMapping::Join, + )]) .collect()?; let agg = out.column("b")?.list()?; @@ -1305,7 +1307,7 @@ fn test_filter_after_shift_in_groups() -> PolarsResult<()> { col("B") .shift(lit(1)) .filter(col("B").shift(lit(1)).gt(lit(4))) - .over_with_options([col("fruits")], WindowMapping::Join) + .over_with_options([col("fruits")], None, WindowMapping::Join) .alias("filtered"), ]) .collect()?; @@ -1664,7 +1666,7 @@ fn test_single_ranked_group() -> PolarsResult<()> { }, None, ) - .over_with_options([col("group")], WindowMapping::Join)]) + .over_with_options([col("group")], None, WindowMapping::Join)]) .collect()?; let out = out.column("value")?.explode()?; diff --git a/crates/polars-plan/src/dsl/expr.rs b/crates/polars-plan/src/dsl/expr.rs index 08688fcd5aba..b9407a19bdbb 100644 --- a/crates/polars-plan/src/dsl/expr.rs +++ b/crates/polars-plan/src/dsl/expr.rs @@ -117,11 +117,12 @@ pub enum Expr { input: Arc, by: Arc, }, - /// See postgres window functions + /// Polars flavored window functions. Window { /// Also has the input. i.e. avg("foo") function: Arc, partition_by: Vec, + order_by: Option<(Arc, SortOptions)>, options: WindowType, }, Wildcard, @@ -249,10 +250,12 @@ impl Hash for Expr { Expr::Window { function, partition_by, + order_by, options, } => { function.hash(state); partition_by.hash(state); + order_by.hash(state); options.hash(state); }, Expr::Slice { diff --git a/crates/polars-plan/src/dsl/mod.rs b/crates/polars-plan/src/dsl/mod.rs index 009ad30594b3..6d6f9304e2ec 100644 --- a/crates/polars-plan/src/dsl/mod.rs +++ b/crates/polars-plan/src/dsl/mod.rs @@ -56,6 +56,7 @@ pub use list::*; pub use meta::*; pub use name::*; pub use options::*; +use polars_core::error::feature_gated; use polars_core::prelude::*; #[cfg(feature = "diff")] use polars_core::series::ops::NullBehavior; @@ -952,12 +953,13 @@ impl Expr { /// ╰────────┴────────╯ /// ``` pub fn over, IE: Into + Clone>(self, partition_by: E) -> Self { - self.over_with_options(partition_by, Default::default()) + self.over_with_options(partition_by, None, Default::default()) } pub fn over_with_options, IE: Into + Clone>( self, partition_by: E, + order_by: Option<(E, SortOptions)>, options: WindowMapping, ) -> Self { let partition_by = partition_by @@ -965,9 +967,24 @@ impl Expr { .iter() .map(|e| e.clone().into()) .collect(); + + let order_by = order_by.map(|(e, options)| { + let e = e.as_ref(); + let e = if e.len() == 1 { + Arc::new(e[0].clone().into()) + } else { + feature_gated!["dtype-struct", { + let e = e.iter().map(|e| e.clone().into()).collect::>(); + Arc::new(as_struct(e)) + }] + }; + (e, options) + }); + Expr::Window { function: Arc::new(self), partition_by, + order_by, options: options.into(), } } @@ -980,6 +997,7 @@ impl Expr { Expr::Window { function: Arc::new(self), partition_by: vec![index_col], + order_by: None, options: WindowType::Rolling(options), } } diff --git a/crates/polars-plan/src/logical_plan/aexpr/mod.rs b/crates/polars-plan/src/logical_plan/aexpr/mod.rs index 93fd3520d83a..21458ac6bb8d 100644 --- a/crates/polars-plan/src/logical_plan/aexpr/mod.rs +++ b/crates/polars-plan/src/logical_plan/aexpr/mod.rs @@ -172,7 +172,7 @@ pub enum AExpr { /// Function arguments /// Some functions rely on aliases, /// for instance assignment of struct fields. - /// Therefore we need `[ExprIr]`. + /// Therefor we need `[ExprIr]`. input: Vec, /// function to apply function: FunctionExpr, @@ -181,6 +181,7 @@ pub enum AExpr { Window { function: Node, partition_by: Vec, + order_by: Option<(Node, SortOptions)>, options: WindowType, }, #[default] @@ -303,8 +304,12 @@ impl AExpr { Window { function, partition_by, + order_by, options: _, } => { + if let Some((n, _)) = order_by { + container.push_node(*n); + } for e in partition_by.iter().rev() { container.push_node(*e); } @@ -397,11 +402,17 @@ impl AExpr { Window { function, partition_by, + order_by, .. } => { + let offset = order_by.is_some() as usize; *function = *inputs.last().unwrap(); partition_by.clear(); - partition_by.extend_from_slice(&inputs[..inputs.len() - 1]); + partition_by.extend_from_slice(&inputs[offset..inputs.len() - 1]); + + if let Some((_, options)) = order_by { + *order_by = Some((inputs[0], *options)); + } return self; }, diff --git a/crates/polars-plan/src/logical_plan/alp/format.rs b/crates/polars-plan/src/logical_plan/alp/format.rs index 1391ea193898..5de2006ed602 100644 --- a/crates/polars-plan/src/logical_plan/alp/format.rs +++ b/crates/polars-plan/src/logical_plan/alp/format.rs @@ -420,6 +420,7 @@ impl<'a> Display for ExprIRDisplay<'a> { Window { function, partition_by, + order_by, options, } => { let function = self.with_root(function); @@ -434,7 +435,12 @@ impl<'a> Display for ExprIRDisplay<'a> { ) }, _ => { - write!(f, "{function}.over({partition_by})") + if let Some((order_by, _)) = order_by { + let order_by = self.with_root(order_by); + write!(f, "{function}.over(partition_by: {partition_by}, order_by: {order_by})") + } else { + write!(f, "{function}.over({partition_by})") + } }, } }, diff --git a/crates/polars-plan/src/logical_plan/conversion/expr_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/expr_to_ir.rs index 5a43fcc58ad3..391165573ec8 100644 --- a/crates/polars-plan/src/logical_plan/conversion/expr_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/expr_to_ir.rs @@ -330,10 +330,12 @@ fn to_aexpr_impl(expr: Expr, arena: &mut Arena, state: &mut ConversionSta Expr::Window { function, partition_by, + order_by, options, } => AExpr::Window { function: to_aexpr_impl(owned(function), arena, state), partition_by: to_aexprs(partition_by, arena, state), + order_by: order_by.map(|(e, options)| (to_aexpr_impl(owned(e), arena, state), options)), options, }, Expr::Slice { diff --git a/crates/polars-plan/src/logical_plan/conversion/ir_to_dsl.rs b/crates/polars-plan/src/logical_plan/conversion/ir_to_dsl.rs index 7caa4bb8c7b1..8204264d02b1 100644 --- a/crates/polars-plan/src/logical_plan/conversion/ir_to_dsl.rs +++ b/crates/polars-plan/src/logical_plan/conversion/ir_to_dsl.rs @@ -199,13 +199,17 @@ pub fn node_to_expr(node: Node, expr_arena: &Arena) -> Expr { AExpr::Window { function, partition_by, + order_by, options, } => { let function = Arc::new(node_to_expr(function, expr_arena)); let partition_by = nodes_to_exprs(&partition_by, expr_arena); + let order_by = + order_by.map(|(n, options)| (Arc::new(node_to_expr(n, expr_arena)), options)); Expr::Window { function, partition_by, + order_by, options, } }, diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index 4c7b46b01969..1e62cf112b07 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -15,6 +15,7 @@ impl fmt::Debug for Expr { Window { function, partition_by, + order_by, options, } => match options { #[cfg(feature = "dynamic_group_by")] @@ -26,7 +27,11 @@ impl fmt::Debug for Expr { ) }, _ => { - write!(f, "{function:?}.over({partition_by:?})") + if let Some((order_by, _)) = order_by { + write!(f, "{function:?}.over(partition_by: {partition_by:?}, order_by: {order_by:?})") + } else { + write!(f, "{function:?}.over({partition_by:?})") + } }, }, Nth(i) => write!(f, "nth({i})"), diff --git a/crates/polars-plan/src/logical_plan/visitor/expr.rs b/crates/polars-plan/src/logical_plan/visitor/expr.rs index 6038fb00251e..72b41efc51ef 100644 --- a/crates/polars-plan/src/logical_plan/visitor/expr.rs +++ b/crates/polars-plan/src/logical_plan/visitor/expr.rs @@ -75,9 +75,9 @@ impl TreeWalker for Expr { Function { input, function, options } => Function { input: input.into_iter().map(f).collect::>()?, function, options }, Explode(expr) => Explode(am(expr, f)?), Filter { input, by } => Filter { input: am(input, &mut f)?, by: am(by, f)? }, - Window { function, partition_by, options } => { + Window { function, partition_by, order_by, options } => { let partition_by = partition_by.into_iter().map(&mut f).collect::>()?; - Window { function: am(function, f)?, partition_by, options } + Window { function: am(function, f)?, partition_by, order_by, options } }, Wildcard => Wildcard, Slice { input, offset, length } => Slice { input: am(input, &mut f)?, offset: am(offset, &mut f)?, length: am(length, f)? }, diff --git a/crates/polars/tests/it/lazy/expressions/window.rs b/crates/polars/tests/it/lazy/expressions/window.rs index b42a8725acdd..9865a3a54380 100644 --- a/crates/polars/tests/it/lazy/expressions/window.rs +++ b/crates/polars/tests/it/lazy/expressions/window.rs @@ -47,9 +47,11 @@ fn test_shift_and_fill_window_function() -> PolarsResult<()> { .lazy() .select([ col("fruits"), - col("B") - .shift_and_fill(lit(-1), lit(-1)) - .over_with_options([col("fruits")], WindowMapping::Join), + col("B").shift_and_fill(lit(-1), lit(-1)).over_with_options( + [col("fruits")], + None, + WindowMapping::Join, + ), ]) .collect()?; @@ -58,9 +60,11 @@ fn test_shift_and_fill_window_function() -> PolarsResult<()> { .lazy() .select([ col("fruits"), - col("B") - .shift_and_fill(lit(-1), lit(-1)) - .over_with_options([col("fruits")], WindowMapping::Join), + col("B").shift_and_fill(lit(-1), lit(-1)).over_with_options( + [col("fruits")], + None, + WindowMapping::Join, + ), ]) .collect()?; @@ -81,7 +85,7 @@ fn test_exploded_window_function() -> PolarsResult<()> { col("fruits"), col("B") .shift(lit(1)) - .over_with_options([col("fruits")], WindowMapping::Explode) + .over_with_options([col("fruits")], None, WindowMapping::Explode) .alias("shifted"), ]) .collect()?; @@ -100,7 +104,7 @@ fn test_exploded_window_function() -> PolarsResult<()> { col("fruits"), col("B") .shift_and_fill(lit(1), lit(-1.0f32)) - .over_with_options([col("fruits")], WindowMapping::Explode) + .over_with_options([col("fruits")], None, WindowMapping::Explode) .alias("shifted"), ]) .collect()?; @@ -169,7 +173,7 @@ fn test_literal_window_fn() -> PolarsResult<()> { .lazy() .select([repeat(1, len()) .cum_sum(false) - .over_with_options([col("chars")], WindowMapping::Join) + .over_with_options([col("chars")], None, WindowMapping::Join) .alias("foo")]) .collect()?; diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index 6617dbb2d4e8..56d8eec09fcc 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -3588,8 +3588,9 @@ def last(self) -> Self: def over( self, - expr: IntoExpr | Iterable[IntoExpr], + partition_by: IntoExpr | Iterable[IntoExpr], *more_exprs: IntoExpr, + order_by: IntoExpr | Iterable[IntoExpr] | None = None, mapping_strategy: WindowMappingStrategy = "group_to_rows", ) -> Self: """ @@ -3604,11 +3605,14 @@ def over( Parameters ---------- - expr + partition_by Column(s) to group by. Accepts expression input. Strings are parsed as column names. *more_exprs Additional columns to group by, specified as positional arguments. + order_by: + Order the window functions/aggregations with the partitioned groups by the + result of the expression passed to `order_by`. mapping_strategy: {'group_to_rows', 'join', 'explode'} - group_to_rows If the aggregation results in multiple values, assign them back to their @@ -3725,8 +3729,18 @@ def over( └─────┴─────┴─────┘ """ - exprs = parse_as_list_of_expressions(expr, *more_exprs) - return self._from_pyexpr(self._pyexpr.over(exprs, mapping_strategy)) + partition_by = parse_as_list_of_expressions(partition_by, *more_exprs) + if order_by is not None: + order_by = parse_as_list_of_expressions(order_by) + return self._from_pyexpr( + self._pyexpr.over( + partition_by, + order_by=order_by, + order_by_descending=False, # does not work yet + order_by_nulls_last=False, # does not work yet + mapping_strategy=mapping_strategy, + ) + ) def rolling( self, diff --git a/py-polars/polars/expr/meta.py b/py-polars/polars/expr/meta.py index 96a0e4eaee86..c4c4bebdc974 100644 --- a/py-polars/polars/expr/meta.py +++ b/py-polars/polars/expr/meta.py @@ -280,7 +280,7 @@ def serialize(self, file: IOBase | str | Path | None = None) -> str | None: >>> expr = pl.col("foo").sum().over("bar") >>> json = expr.meta.serialize() >>> json - '{"Window":{"function":{"Agg":{"Sum":{"Column":"foo"}}},"partition_by":[{"Column":"bar"}],"options":{"Over":"GroupsToRows"}}}' + '{"Window":{"function":{"Agg":{"Sum":{"Column":"foo"}}},"partition_by":[{"Column":"bar"}],"order_by":null,"options":{"Over":"GroupsToRows"}}}' The expression can later be deserialized back into an `Expr` object. diff --git a/py-polars/src/expr/general.rs b/py-polars/src/expr/general.rs index b80d687a6d5a..d29b72a3548b 100644 --- a/py-polars/src/expr/general.rs +++ b/py-polars/src/expr/general.rs @@ -662,14 +662,35 @@ impl PyExpr { self.inner.clone().is_duplicated().into() } - fn over(&self, partition_by: Vec, mapping: Wrap) -> Self { + #[pyo3(signature = (partition_by, order_by, order_by_descending, order_by_nulls_last, mapping_strategy))] + fn over( + &self, + partition_by: Vec, + order_by: Option>, + order_by_descending: bool, + order_by_nulls_last: bool, + mapping_strategy: Wrap, + ) -> Self { let partition_by = partition_by .into_iter() .map(|e| e.inner) .collect::>(); + + let order_by = order_by.map(|order_by| { + ( + order_by.into_iter().map(|e| e.inner).collect::>(), + SortOptions { + descending: order_by_descending, + nulls_last: order_by_nulls_last, + maintain_order: false, + ..Default::default() + }, + ) + }); + self.inner .clone() - .over_with_options(partition_by, mapping.0) + .over_with_options(partition_by, order_by, mapping_strategy.0) .into() } diff --git a/py-polars/src/lazyframe/visitor/expr_nodes.rs b/py-polars/src/lazyframe/visitor/expr_nodes.rs index 9e6bee91b1b4..fc807d64402c 100644 --- a/py-polars/src/lazyframe/visitor/expr_nodes.rs +++ b/py-polars/src/lazyframe/visitor/expr_nodes.rs @@ -352,6 +352,12 @@ pub struct Window { #[pyo3(get)] partition_by: Vec, #[pyo3(get)] + order_by: Option, + #[pyo3(get)] + order_by_descending: bool, + #[pyo3(get)] + order_by_nulls_last: bool, + #[pyo3(get)] options: PyObject, } @@ -1225,10 +1231,19 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { AExpr::Window { function, partition_by, + order_by, options, } => { let function = function.0; let partition_by = partition_by.iter().map(|n| n.0).collect(); + let order_by_descending = order_by + .map(|(_, options)| options.descending) + .unwrap_or(false); + let order_by_nulls_last = order_by + .map(|(_, options)| options.nulls_last) + .unwrap_or(false); + let order_by = order_by.map(|(n, _)| n.0); + let options = match options { WindowType::Over(options) => PyWindowMapping { inner: *options }.into_py(py), WindowType::Rolling(options) => PyRollingGroupOptions { @@ -1239,6 +1254,9 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { Window { function, partition_by, + order_by, + order_by_descending, + order_by_nulls_last, options, } .into_py(py) diff --git a/py-polars/tests/unit/operations/test_window.py b/py-polars/tests/unit/operations/test_window.py index dfffd6e5cca9..959def0dc45a 100644 --- a/py-polars/tests/unit/operations/test_window.py +++ b/py-polars/tests/unit/operations/test_window.py @@ -490,3 +490,24 @@ def test_windows_not_cached() -> None: # this might fail if they are cached for _ in range(1000): ldf.collect() + + +def test_window_order_by_8662() -> None: + df = pl.DataFrame( + { + "g": [1, 1, 1, 1, 2, 2, 2, 2], + "t": [1, 2, 3, 4, 4, 1, 2, 3], + "x": [10, 20, 30, 40, 10, 20, 30, 40], + } + ) + + assert df.with_columns( + x_lag0=pl.col("x").shift(1).over("g"), + x_lag1=pl.col("x").shift(1).over("g", order_by="t"), + ).to_dict(as_series=False) == { + "g": [1, 1, 1, 1, 2, 2, 2, 2], + "t": [1, 2, 3, 4, 4, 1, 2, 3], + "x": [10, 20, 30, 40, 10, 20, 30, 40], + "x_lag0": [None, 10, 20, 30, None, 10, 20, 30], + "x_lag1": [None, 10, 20, 30, 40, None, 20, 30], + }