Skip to content

Commit

Permalink
feat: Filter Pushdown Rule (#140)
Browse files Browse the repository at this point in the history
This PR brings a filter pushdown heuristic rule, built on @AveryQi115's
hybrid scheme.

### Filter Pushdown Rule
- This series of rules matches on any filter, and pushes any part of the
filter down below a node if possible.
- They are registered in the HeuristicsRuleWrapper currently, but they
work as cost-based rules as well.

### Helper Functions
- `LogOpExpr::new_flattened_nested_logical` creates a new `LogOpExpr`
from an `ExprList`, and it flattens any nested `LogOpExpr`s of the same
`LogOpType`.
- `Expr::rewrite_column_refs` recursively rewrites any `ColumnExpr` in
an expression tree, using a provided `rewrite_fn`.
- `LogicalJoin::map_through_join` takes in left/right schema sizes, and
maps an index to be as it would if it were pushed down to the left or
right side of a join.
- `LogicalProjection::compute_column_mapping` creates a `ColumnMapping`
object from a `LogicalProjection`.
- The `ColumnMapping` object has a few methods, but most importantly it
has `rewrite_condition`, which given an expr, will rewrite the
expression with the projection's mapping.

### Testing Utilities
- `new_test_optimizer` creates a new heuristic optimizer, which applies
a given rule. It uses a `TpchCatalog`.
- `TpchCatalog` is a catalog implementing a couple of tables from the
TPC-H schema. It can be extended to have more as needed.
- `DummyCostModel` implements a cost model, only giving zero cost. It is
used for constructing a cascades optimizer without a real cost model,
and isn't used in this PR.
- This pull request, using these test optimizer components, pioneers a
new testing scheme, based on running a constructed query plan through an
optimizer, rather than text-based SQL planner tests, which may be flaky.
They also test rules in isolation.

---------

Signed-off-by: AveryQi115 <averyqi115@gmail.com>
Co-authored-by: AveryQi115 <averyqi115@gmail.com>
  • Loading branch information
jurplel and AveryQi115 authored Apr 9, 2024
1 parent d732a10 commit 3b0e6b7
Show file tree
Hide file tree
Showing 15 changed files with 1,464 additions and 542 deletions.
41 changes: 8 additions & 33 deletions optd-datafusion-bridge/src/into_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,6 @@ use optd_datafusion_repr::properties::schema::Schema as OPTDSchema;

use crate::OptdPlanContext;

// flatten_nested_logical is a helper function to flatten nested logical operators with same op type
// eg. (a AND (b AND c)) => ExprList([a, b, c])
// (a OR (b OR c)) => ExprList([a, b, c])
// It assume the children of the input expr_list are already flattened
// and can only be used in bottom up manner
fn flatten_nested_logical(op: LogOpType, expr_list: ExprList) -> ExprList {
// conv_into_optd_expr is building the children bottom up so there is no need to
// call flatten_nested_logical recursively
let mut new_expr_list = Vec::new();
for child in expr_list.to_vec() {
if let OptRelNodeTyp::LogOp(child_op) = child.typ() {
if child_op == op {
let child_log_op_expr = LogOpExpr::from_rel_node(child.into_rel_node()).unwrap();
new_expr_list.extend(child_log_op_expr.children().to_vec());
continue;
}
}
new_expr_list.push(child.clone());
}
ExprList::new(new_expr_list)
}

impl OptdPlanContext<'_> {
fn conv_into_optd_table_scan(&mut self, node: &logical_plan::TableScan) -> Result<PlanNode> {
let table_name = node.table_name.to_string();
Expand Down Expand Up @@ -73,14 +51,16 @@ impl OptdPlanContext<'_> {
Operator::And => {
let op = LogOpType::And;
let expr_list = ExprList::new(vec![left, right]);
let expr_list = flatten_nested_logical(op, expr_list);
return Ok(LogOpExpr::new(op, expr_list).into_expr());
return Ok(
LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr()
);
}
Operator::Or => {
let op = LogOpType::Or;
let expr_list = ExprList::new(vec![left, right]);
let expr_list = flatten_nested_logical(op, expr_list);
return Ok(LogOpExpr::new(op, expr_list).into_expr());
return Ok(
LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr()
);
}
_ => {}
}
Expand Down Expand Up @@ -331,13 +311,8 @@ impl OptdPlanContext<'_> {
} else {
let expr_list = ExprList::new(log_ops);
// the expr from filter is already flattened in conv_into_optd_expr
let expr_list = flatten_nested_logical(LogOpType::And, expr_list);
Ok(LogicalJoin::new(
left,
right,
LogOpExpr::new(LogOpType::And, expr_list).into_expr(),
join_type,
))
let log_op = LogOpExpr::new_flattened_nested_logical(LogOpType::And, expr_list);
Ok(LogicalJoin::new(left, right, log_op.into_expr(), join_type))
}
}

Expand Down
25 changes: 23 additions & 2 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use properties::{
};
use rules::{
EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule, EliminateFilterRule,
EliminateJoinRule, EliminateLimitRule, HashJoinRule, JoinAssocRule, JoinCommuteRule,
PhysicalConversionRule, ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
EliminateJoinRule, EliminateLimitRule, FilterAggTransposeRule, FilterCrossJoinTransposeRule,
FilterInnerJoinTransposeRule, FilterMergeRule, FilterProjectTransposeRule,
FilterSortTransposeRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule,
ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
};

pub use optd_core::rel_node::Value;
Expand All @@ -34,6 +36,8 @@ mod explain;
pub mod plan_nodes;
pub mod properties;
pub mod rules;
#[cfg(test)]
mod testing;

pub struct DatafusionOptimizer {
hueristic_optimizer: HeuristicsOptimizer<OptRelNodeTyp>,
Expand Down Expand Up @@ -92,6 +96,23 @@ impl DatafusionOptimizer {
for rule in rules {
rule_wrappers.push(RuleWrapper::new_cascades(rule));
}
// add all filter pushdown rules as heuristic rules
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterProjectTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(FilterMergeRule::new())));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterCrossJoinTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterInnerJoinTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterSortTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterAggTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new()))); // 17
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new()))); // 18
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new())));
Expand Down
51 changes: 51 additions & 0 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use arrow_schema::DataType;
use itertools::Itertools;
use optd_core::{
cascades::{CascadesOptimizer, GroupId},
rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp},
Expand Down Expand Up @@ -284,6 +285,56 @@ impl Expr {
pub fn child(&self, idx: usize) -> OptRelNodeRef {
self.0.child(idx)
}

/// Recursively rewrite all column references in the expression.using a provided
/// function that replaces a column index.
/// The provided function will, given a ColumnRefExpr's index,
/// return either Some(usize) or None.
/// - If it is Some, the column index can be rewritten with the value.
/// - If any of the columns is None, we will return None all the way up
/// the call stack, and no expression will be returned.
pub fn rewrite_column_refs(
&self,
rewrite_fn: &impl Fn(usize) -> Option<usize>,
) -> Option<Self> {
assert!(self.typ().is_expression());
if let OptRelNodeTyp::ColumnRef = self.typ() {
let col_ref = ColumnRefExpr::from_rel_node(self.0.clone()).unwrap();
let rewritten = rewrite_fn(col_ref.index());
return if let Some(rewritten_idx) = rewritten {
let new_col_ref = ColumnRefExpr::new(rewritten_idx);
Some(Self(new_col_ref.into_rel_node()))
} else {
None
};
}

let children = self.0.children.clone();
let children = children
.into_iter()
.map(|child| {
if child.typ == OptRelNodeTyp::List {
// TODO: What should we do with List?
return Some(child);
}
Expr::from_rel_node(child.clone())
.unwrap()
.rewrite_column_refs(rewrite_fn)
.map(|x| x.into_rel_node())
})
.collect::<Option<Vec<_>>>()?;
Some(
Expr::from_rel_node(
RelNode {
typ: self.0.typ.clone(),
children: children.into_iter().collect_vec(),
data: self.0.data.clone(),
}
.into(),
)
.unwrap(),
)
}
}

impl OptRelNode for Expr {
Expand Down
23 changes: 23 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,29 @@ impl LogOpExpr {
))
}

/// flatten_nested_logical is a helper function to flatten nested logical operators with same op type
/// eg. (a AND (b AND c)) => ExprList([a, b, c])
/// (a OR (b OR c)) => ExprList([a, b, c])
/// It assume the children of the input expr_list are already flattened
/// and can only be used in bottom up manner
pub fn new_flattened_nested_logical(op: LogOpType, expr_list: ExprList) -> Self {
// Since we assume that we are building the children bottom up,
// there is no need to call flatten_nested_logical recursively
let mut new_expr_list = Vec::new();
for child in expr_list.to_vec() {
if let OptRelNodeTyp::LogOp(child_op) = child.typ() {
if child_op == op {
let child_log_op_expr =
LogOpExpr::from_rel_node(child.into_rel_node()).unwrap();
new_expr_list.extend(child_log_op_expr.children().to_vec());
continue;
}
}
new_expr_list.push(child.clone());
}
LogOpExpr::new(op, ExprList::new(new_expr_list))
}

pub fn children(&self) -> Vec<Expr> {
self.0
.0
Expand Down
17 changes: 17 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,20 @@ define_plan_node!(
{ 3, right_keys: ExprList }
], { join_type: JoinType }
);

impl LogicalJoin {
/// Takes in left/right schema sizes, and maps a column index to be as if it
/// were pushed down to the left or right side of a join accordingly.
pub fn map_through_join(
col_idx: usize,
left_schema_size: usize,
right_schema_size: usize,
) -> usize {
assert!(col_idx < left_schema_size + right_schema_size);
if col_idx < left_schema_size {
col_idx
} else {
col_idx - left_schema_size
}
}
}
73 changes: 72 additions & 1 deletion optd-datafusion-repr/src/plan_nodes/projection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::expr::ExprList;
use super::macros::define_plan_node;

use super::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
use super::{ColumnRefExpr, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};

#[derive(Clone, Debug)]
pub struct LogicalProjection(pub PlanNode);
Expand All @@ -26,3 +26,74 @@ define_plan_node!(
{ 1, exprs: ExprList }
]
);

/// This struct holds the mapping from original columns to projected columns.
///
/// # Example
/// With the following plan:
/// | Filter (#0 < 5)
/// |
/// |-| Projection [#2, #3]
/// |- Scan [#0, #1, #2, #3]
///
/// The computed projection mapping is:
/// #2 -> #0
/// #3 -> #1
pub struct ProjectionMapping {
forward: Vec<usize>,
_backward: Vec<Option<usize>>,
}

impl ProjectionMapping {
pub fn build(mapping: Vec<usize>) -> Option<Self> {
let mut backward = vec![];
for (i, &x) in mapping.iter().enumerate() {
if x >= backward.len() {
backward.resize(x + 1, None);
}
backward[x] = Some(i);
}
Some(Self {
forward: mapping,
_backward: backward,
})
}

pub fn projection_col_refers_to(&self, col: usize) -> usize {
self.forward[col]
}

pub fn _original_col_maps_to(&self, col: usize) -> Option<usize> {
self._backward[col]
}

/// Recursively rewrites all ColumnRefs in an Expr to *undo* the projection
/// condition. You might want to do this if you are pushing something
/// through a projection, or pulling a projection up.
///
/// # Example
/// If we have a projection node, mapping column A to column B (A -> B)
/// All B's in `cond` will be rewritten as A.
pub fn rewrite_condition(&self, cond: Expr, child_schema_len: usize) -> Expr {
let proj_schema_size = self.forward.len();
cond.rewrite_column_refs(&|idx| {
Some(if idx < proj_schema_size {
self.projection_col_refers_to(idx)
} else {
idx - proj_schema_size + child_schema_len
})
})
.unwrap()
}
}

impl LogicalProjection {
pub fn compute_column_mapping(exprs: &ExprList) -> Option<ProjectionMapping> {
let mut mapping = vec![];
for expr in exprs.to_vec() {
let col_expr = ColumnRefExpr::from_rel_node(expr.into_rel_node())?;
mapping.push(col_expr.index());
}
ProjectionMapping::build(mapping)
}
}
5 changes: 5 additions & 0 deletions optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod eliminate_duplicated_expr;
mod eliminate_limit;
mod filter;
mod filter_pushdown;
mod joins;
mod macros;
mod physical;
Expand All @@ -12,6 +13,10 @@ pub use eliminate_duplicated_expr::{
};
pub use eliminate_limit::EliminateLimitRule;
pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule};
pub use filter_pushdown::{
FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule,
FilterMergeRule, FilterProjectTransposeRule, FilterSortTransposeRule,
};
pub use joins::{
EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, ProjectionPullUpJoin,
};
Expand Down
Loading

0 comments on commit 3b0e6b7

Please sign in to comment.