diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java index 9d7b8211231..1c66f9a6489 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java @@ -88,6 +88,8 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule { new PinotAggregateExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) { + // NOTE: Explicitly match for LogicalAggregate because after applying the rule, LogicalAggregate is replaced with + // PinotLogicalAggregate, and the rule won't be applied again. super(operand(LogicalAggregate.class, any()), factory, null); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java index e93609a38a3..327921df713 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java @@ -29,7 +29,6 @@ import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; -import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; @@ -40,8 +39,7 @@ /** - * SemiJoinRule that matches an Aggregate on top of a Join with an Aggregate - * as its right child. + * SemiJoinRule that matches an Aggregate on top of a Join with an Aggregate as its right child. * * @see CoreRules#PROJECT_TO_SEMI_JOIN */ @@ -50,18 +48,9 @@ public class PinotAggregateToSemiJoinRule extends RelOptRule { new PinotAggregateToSemiJoinRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotAggregateToSemiJoinRule(RelBuilderFactory factory) { - super(operand(LogicalAggregate.class, any()), factory, null); - } - - @Override - @SuppressWarnings("rawtypes") - public boolean matches(RelOptRuleCall call) { - final Aggregate topAgg = call.rel(0); - if (!PinotRuleUtils.isJoin(topAgg.getInput())) { - return false; - } - final Join join = (Join) PinotRuleUtils.unboxRel(topAgg.getInput()); - return PinotRuleUtils.isAggregate(join.getInput(1)); + super(operand(Aggregate.class, + some(operand(Join.class, some(operand(RelNode.class, any()), operand(Aggregate.class, any()))))), factory, + null); } @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java index 1d7f15ec5da..12aaef1d97b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java @@ -97,7 +97,7 @@ private static LogicalProject constructNewProject(LogicalProject oldProject, Log } castedNewProjects.add(newNode); } - return needCast ? LogicalProject.create(oldProject.getInput(), oldProject.getHints(), castedNewProjects, + return needCast ? oldProject.copy(oldProject.getTraitSet(), oldProject.getInput(), castedNewProjects, oldProject.getRowType()) : newProject; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java index e6ba7b9c518..5c8dfb0ba0d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java @@ -18,11 +18,12 @@ */ package org.apache.pinot.calcite.rel.rules; -import java.util.Collections; +import java.util.List; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Exchange; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; @@ -36,17 +37,14 @@ public class PinotExchangeEliminationRule extends RelOptRule { new PinotExchangeEliminationRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotExchangeEliminationRule(RelBuilderFactory factory) { - super(operand(PinotLogicalExchange.class, - some(operand(PinotLogicalExchange.class, some(operand(RelNode.class, any()))))), factory, null); + super(operand(Exchange.class, some(operand(Exchange.class, some(operand(RelNode.class, any()))))), factory, null); } @Override public void onMatch(RelOptRuleCall call) { - PinotLogicalExchange exchange0 = call.rel(0); - PinotLogicalExchange exchange1 = call.rel(1); + Exchange exchange0 = call.rel(0); RelNode input = call.rel(2); // convert the call to skip the exchange. - RelNode rel = exchange0.copy(input.getTraitSet(), Collections.singletonList(input)); - call.transformTo(rel); + call.transformTo(exchange0.copy(input.getTraitSet(), List.of(input))); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java index 1a7e00f6bbf..1f34913c192 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java @@ -21,7 +21,6 @@ import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; @@ -33,27 +32,20 @@ public class PinotFilterExpandSearchRule extends RelOptRule { new PinotFilterExpandSearchRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotFilterExpandSearchRule(RelBuilderFactory factory) { - super(operand(LogicalFilter.class, any()), factory, null); + super(operand(Filter.class, any()), factory, null); } @Override - @SuppressWarnings("rawtypes") public boolean matches(RelOptRuleCall call) { - if (call.rels.length < 1) { - return false; - } - if (call.rel(0) instanceof Filter) { - Filter filter = call.rel(0); - return containsRangeSearch(filter.getCondition()); - } - return false; + Filter filter = call.rel(0); + return containsRangeSearch(filter.getCondition()); } @Override public void onMatch(RelOptRuleCall call) { Filter filter = call.rel(0); RexNode newCondition = RexUtil.expandSearch(filter.getCluster().getRexBuilder(), null, filter.getCondition()); - call.transformTo(LogicalFilter.create(filter.getInput(), newCondition)); + call.transformTo(filter.copy(filter.getTraitSet(), filter.getInput(), newCondition)); } private boolean containsRangeSearch(RexNode condition) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java index 1b485551f87..37f12fbd081 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java @@ -18,14 +18,12 @@ */ package org.apache.pinot.calcite.rel.rules; -import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; -import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; @@ -38,19 +36,13 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule { new PinotJoinExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotJoinExchangeNodeInsertRule(RelBuilderFactory factory) { - super(operand(LogicalJoin.class, any()), factory, null); + super(operand(Join.class, any()), factory, null); } @Override public boolean matches(RelOptRuleCall call) { - if (call.rels.length < 1) { - return false; - } - if (call.rel(0) instanceof Join) { - Join join = call.rel(0); - return !PinotRuleUtils.isExchange(join.getLeft()) && !PinotRuleUtils.isExchange(join.getRight()); - } - return false; + Join join = call.rel(0); + return !PinotRuleUtils.isExchange(join.getLeft()) && !PinotRuleUtils.isExchange(join.getRight()); } @Override @@ -73,10 +65,7 @@ public void onMatch(RelOptRuleCall call) { rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys)); } - RelNode newJoinNode = - new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), leftExchange, rightExchange, - join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(), - ImmutableList.copyOf(join.getSystemFieldList())); - call.transformTo(newJoinNode); + call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), leftExchange, rightExchange, join.getJoinType(), + join.isSemiJoinDone())); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java index f8bbeb63bf0..ed86a6fcc2b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.calcite.rel.rules; -import com.google.common.collect.ImmutableList; import java.util.Collections; import java.util.List; import org.apache.calcite.plan.RelOptRule; @@ -31,7 +30,6 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable; @@ -121,27 +119,23 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule { new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) { - super(operand(LogicalJoin.class, any()), factory, null); + super(operand(Join.class, any()), factory, null); } @Override public boolean matches(RelOptRuleCall call) { - if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) { - return false; - } Join join = call.rel(0); - String joinStrategyString = PinotHintStrategyTable.getHintOption(join.getHints(), - PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY); - List joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",") - : Collections.emptyList(); - boolean explicitOtherStrategy = joinStrategies.size() > 0 - && !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY); + String joinStrategyString = + PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS, + PinotHintOptions.JoinHintOptions.JOIN_STRATEGY); + List joinStrategies = + joinStrategyString != null ? StringUtils.split(joinStrategyString, ",") : Collections.emptyList(); + boolean explicitOtherStrategy = !joinStrategies.isEmpty() && !joinStrategies.contains( + PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY); JoinInfo joinInfo = join.analyzeCondition(); - RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel() - : join.getLeft(); - RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel() - : join.getRight(); + RelNode left = ((HepRelVertex) join.getLeft()).getCurrentRel(); + RelNode right = ((HepRelVertex) join.getRight()).getCurrentRel(); return left instanceof Exchange && right instanceof Exchange // left side can be pushed as dynamic exchange && PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0)) @@ -155,16 +149,15 @@ public boolean matches(RelOptRuleCall call) { @Override public void onMatch(RelOptRuleCall call) { Join join = call.rel(0); - PinotLogicalExchange left = (PinotLogicalExchange) (join.getLeft() instanceof HepRelVertex - ? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft()); - PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex - ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight()); + Exchange left = (Exchange) ((HepRelVertex) join.getLeft()).getCurrentRel(); + Exchange right = (Exchange) ((HepRelVertex) join.getRight()).getCurrentRel(); // when colocated join hint is given, dynamic broadcast exchange can be hash-distributed b/c // 1. currently, dynamic broadcast only works against main table off leaf-stage; (e.g. receive node on leaf) // 2. when hash key are the same but hash functions are different, it can be done via normal hash shuffle. - boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(), - PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS); + boolean isColocatedJoin = + PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS, + PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS); PinotLogicalExchange dynamicBroadcastExchange; RelNode rightInput = right.getInput(); if (isColocatedJoin) { @@ -174,10 +167,8 @@ public void onMatch(RelOptRuleCall call) { RelDistribution dist = RelDistributions.BROADCAST_DISTRIBUTED; dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER); } - Join dynamicFilterJoin = - new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange, - join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(), - ImmutableList.copyOf(join.getSystemFieldList())); - call.transformTo(dynamicFilterJoin); + + call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), left.getInput(), dynamicBroadcastExchange, + join.getJoinType(), join.isSemiJoinDone())); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java index f2aa72da846..22a37a5f918 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java @@ -68,11 +68,6 @@ public PinotRelDistributionTraitRule(RelBuilderFactory factory) { super(operand(RelNode.class, any()), factory, null); } - @Override - public boolean matches(RelOptRuleCall call) { - return call.rels.length >= 1; - } - @Override public void onMatch(RelOptRuleCall call) { RelNode current = call.rel(0); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java index 425fe09335c..3db58c048c0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java @@ -20,17 +20,13 @@ import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.logical.LogicalIntersect; -import org.apache.calcite.rel.logical.LogicalMinus; -import org.apache.calcite.rel.logical.LogicalUnion; import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableIntList; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; @@ -47,41 +43,20 @@ public PinotSetOpExchangeNodeInsertRule(RelBuilderFactory factory) { @Override public boolean matches(RelOptRuleCall call) { - if (call.rels.length < 1) { - return false; - } - if (call.rel(0) instanceof SetOp) { - SetOp setOp = call.rel(0); - for (RelNode input : setOp.getInputs()) { - if (PinotRuleUtils.isExchange(input)) { - return false; - } - } - return true; - } - return false; + SetOp setOp = call.rel(0); + return !PinotRuleUtils.isExchange(setOp.getInput(0)); } @Override public void onMatch(RelOptRuleCall call) { SetOp setOp = call.rel(0); - List newInputs = new ArrayList<>(); - List hashFields = - IntStream.range(0, setOp.getRowType().getFieldCount()).boxed().collect(Collectors.toCollection(ArrayList::new)); - for (RelNode input : setOp.getInputs()) { - RelNode exchange = PinotLogicalExchange.create(input, RelDistributions.hash(hashFields)); + List inputs = setOp.getInputs(); + List newInputs = new ArrayList<>(inputs.size()); + for (RelNode input : inputs) { + RelNode exchange = PinotLogicalExchange.create(input, + RelDistributions.hash(ImmutableIntList.range(0, setOp.getRowType().getFieldCount()))); newInputs.add(exchange); } - SetOp newSetOpNode; - if (setOp instanceof LogicalUnion) { - newSetOpNode = new LogicalUnion(setOp.getCluster(), setOp.getTraitSet(), newInputs, setOp.all); - } else if (setOp instanceof LogicalIntersect) { - newSetOpNode = new LogicalIntersect(setOp.getCluster(), setOp.getTraitSet(), newInputs, setOp.all); - } else if (setOp instanceof LogicalMinus) { - newSetOpNode = new LogicalMinus(setOp.getCluster(), setOp.getTraitSet(), newInputs, setOp.all); - } else { - throw new UnsupportedOperationException("Unsupported set op node: " + setOp); - } - call.transformTo(newSetOpNode); + call.transformTo(setOp.copy(setOp.getTraitSet(), newInputs)); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java index 29ef7171369..de271cc69a3 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java @@ -18,13 +18,12 @@ */ package org.apache.pinot.calcite.rel.rules; +import java.util.List; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.hep.HepRelVertex; -import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.tools.RelBuilderFactory; @@ -37,23 +36,22 @@ public class PinotSingleValueAggregateRemoveRule extends RelOptRule { new PinotSingleValueAggregateRemoveRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotSingleValueAggregateRemoveRule(RelBuilderFactory factory) { - super(operand(LogicalAggregate.class, any()), factory, null); + super(operand(Aggregate.class, any()), factory, null); } @Override public boolean matches(RelOptRuleCall call) { - final Aggregate agg = call.rel(0); - if (agg.getAggCallList().size() != 1) { + Aggregate agg = call.rel(0); + List aggCalls = agg.getAggCallList(); + if (aggCalls.size() != 1) { return false; } - final AggregateCall aggCall = agg.getAggCallList().get(0); - return aggCall.getAggregation().getName().equals("SINGLE_VALUE"); + return aggCalls.get(0).getAggregation().getName().equals("SINGLE_VALUE"); } @Override public void onMatch(RelOptRuleCall call) { - final Aggregate agg = call.rel(0); - final RelNode input = ((HepRelVertex) agg.getInput()).getCurrentRel(); - call.transformTo(input); + Aggregate agg = call.rel(0); + call.transformTo(((HepRelVertex) agg.getInput()).getCurrentRel()); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java index 2536b0ed01a..1069934c2f1 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange; @@ -44,19 +43,13 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule { new PinotSortExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY); public PinotSortExchangeNodeInsertRule(RelBuilderFactory factory) { - super(operand(LogicalSort.class, any()), factory, null); + super(operand(Sort.class, any()), factory, null); } @Override public boolean matches(RelOptRuleCall call) { - if (call.rels.length < 1) { - return false; - } - if (call.rel(0) instanceof Sort) { - Sort sort = call.rel(0); - return !PinotRuleUtils.isExchange(sort.getInput()); - } - return false; + Sort sort = call.rel(0); + return !PinotRuleUtils.isExchange(sort.getInput()); } @Override @@ -65,12 +58,10 @@ public void onMatch(RelOptRuleCall call) { // TODO: Assess whether sorting is needed on both sender and receiver side or only receiver side. Potentially add // SqlHint support to determine this. For now setting sort only on receiver side as sender side sorting is // not yet implemented. - PinotLogicalSortExchange exchange = PinotLogicalSortExchange.create( - sort.getInput(), - RelDistributions.hash(Collections.emptyList()), - sort.getCollation(), - false, - !sort.getCollation().getKeys().isEmpty()); - call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch)); + // TODO: Revisit whether we should use hash distribution + PinotLogicalSortExchange exchange = + PinotLogicalSortExchange.create(sort.getInput(), RelDistributions.hash(Collections.emptyList()), + sort.getCollation(), false, !sort.getCollation().getKeys().isEmpty()); + call.transformTo(sort.copy(sort.getTraitSet(), exchange, sort.getCollation())); } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java index 64abb437bec..2050aa3b8a2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -32,6 +31,7 @@ import org.apache.calcite.plan.hep.HepRelVertex; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Exchange; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Window; import org.apache.calcite.rel.logical.LogicalProject; @@ -65,66 +65,59 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule { // Supported window functions // OTHER_FUNCTION supported are: BOOL_AND, BOOL_OR - private static final Set SUPPORTED_WINDOW_FUNCTION_KIND = ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0, - SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.ROW_NUMBER, SqlKind.RANK, SqlKind.DENSE_RANK, - SqlKind.LAG, SqlKind.LEAD, SqlKind.FIRST_VALUE, SqlKind.LAST_VALUE, SqlKind.OTHER_FUNCTION); + private static final Set SUPPORTED_WINDOW_FUNCTION_KIND = + Set.of(SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.ROW_NUMBER, SqlKind.RANK, + SqlKind.DENSE_RANK, SqlKind.LAG, SqlKind.LEAD, SqlKind.FIRST_VALUE, SqlKind.LAST_VALUE, + SqlKind.OTHER_FUNCTION); public PinotWindowExchangeNodeInsertRule(RelBuilderFactory factory) { - super(operand(LogicalWindow.class, any()), factory, null); + super(operand(Window.class, any()), factory, null); } @Override public boolean matches(RelOptRuleCall call) { - if (call.rels.length < 1) { - return false; - } - if (call.rel(0) instanceof Window) { - Window window = call.rel(0); - // Only run the rule if the input isn't already an exchange node - return !PinotRuleUtils.isExchange(window.getInput()); - } - return false; + Window window = call.rel(0); + return !PinotRuleUtils.isExchange(window.getInput()); } @Override public void onMatch(RelOptRuleCall call) { Window window = call.rel(0); - RelNode windowInput = window.getInput(); - // Perform all validations validateWindows(window); + RelNode input = window.getInput(); Window.Group windowGroup = updateLiteralArgumentsInWindowGroup(window); - if (windowGroup.keys.isEmpty() && windowGroup.orderKeys.getKeys().isEmpty()) { + Exchange exchange; + if (windowGroup.keys.isEmpty()) { // Empty OVER() - // Add a single Exchange for empty OVER() since no sort is required + if (windowGroup.orderKeys.getKeys().isEmpty()) { + // Add a single Exchange for empty OVER() if sort is not required - if (PinotRuleUtils.isProject(windowInput)) { - // Check for empty LogicalProject below LogicalWindow. If present modify it to be a Literal only project and add - // a project above - Project project = (Project) ((HepRelVertex) windowInput).getCurrentRel(); - if (project.getProjects().isEmpty()) { - RelNode returnedRelNode = handleEmptyProjectBelowWindow(window, project); - call.transformTo(returnedRelNode); - return; + if (PinotRuleUtils.isProject(input)) { + // Check for empty LogicalProject below LogicalWindow. If present, modify it to be a Literal only project and + // add a project above. + Project project = (Project) ((HepRelVertex) input).getCurrentRel(); + if (project.getProjects().isEmpty()) { + RelNode returnedRelNode = handleEmptyProjectBelowWindow(window, project); + call.transformTo(returnedRelNode); + return; + } } - } - PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput, - RelDistributions.hash(Collections.emptyList())); - call.transformTo( - LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), - List.of(windowGroup))); - } else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) { - // Only ORDER BY - // Add a LogicalSortExchange with collation on the order by key(s) and an empty hash partition key - // TODO: ORDER BY only type queries need to be sorted on both sender and receiver side for better performance. - // Sorted input data can use a k-way merge instead of a PriorityQueue for sorting. For now support to - // sort on the sender side is not available thus setting this up to only sort on the receiver. - PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput, - RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys, false, true); - call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(), - List.of(windowGroup))); + // TODO: Revisit whether we should use hash distribution + exchange = PinotLogicalExchange.create(input, RelDistributions.hash(List.of())); + } else { + // Only ORDER BY + // Add a LogicalSortExchange with collation on the order by key(s) and an empty hash partition key + // TODO: ORDER BY only type queries need to be sorted on both sender and receiver side for better performance. + // Sorted input data can use a k-way merge instead of a PriorityQueue for sorting. For now support to + // sort on the sender side is not available thus setting this up to only sort on the receiver. + // TODO: Revisit whether we should use hash distribution + exchange = + PinotLogicalSortExchange.create(input, RelDistributions.hash(List.of()), windowGroup.orderKeys, false, + true); + } } else { // All other variants // Assess whether this is a PARTITION BY only query or not (includes queries of the type where PARTITION BY and @@ -134,10 +127,7 @@ public void onMatch(RelOptRuleCall call) { if (isPartitionByOnly) { // Only PARTITION BY or PARTITION BY and ORDER BY on the same key(s) // Add an Exchange hashed on the partition by keys - PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput, - RelDistributions.hash(windowGroup.keys.toList())); - call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), - List.of(windowGroup))); + exchange = PinotLogicalExchange.create(input, RelDistributions.hash(windowGroup.keys.toList())); } else { // PARTITION BY and ORDER BY on different key(s) // Add a LogicalSortExchange hashed on the partition by keys and collation based on order by keys @@ -145,12 +135,13 @@ public void onMatch(RelOptRuleCall call) { // that the data is already partitioned and sorting can be done on the sender side instead. This way // sorting on the receiver side can be a no-op. Add support for this hint and pass it on. Until sender // side sorting is implemented, setting this hint will throw an error on execution. - PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput, - RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys, false, true); - call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(), - List.of(windowGroup))); + exchange = PinotLogicalSortExchange.create(input, RelDistributions.hash(windowGroup.keys.toList()), + windowGroup.orderKeys, false, true); } } + // NOTE: Need to create a new LogicalWindow to use the modified window group. + call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), + List.of(windowGroup))); } private Window.Group updateLiteralArgumentsInWindowGroup(Window window) { @@ -288,8 +279,9 @@ private RelNode handleEmptyProjectBelowWindow(Window window, Project project) { final List expsForProjectBelowWindow = Collections.singletonList( rexBuilder.makeLiteral(0, cluster.getTypeFactory().createSqlType(SqlTypeName.INTEGER))); final List expsFieldNamesBelowWindow = Collections.singletonList("winLiteral"); - Project projectBelowWindow = LogicalProject.create(project.getInput(), project.getHints(), - expsForProjectBelowWindow, expsFieldNamesBelowWindow); + Project projectBelowWindow = + LogicalProject.create(project.getInput(), project.getHints(), expsForProjectBelowWindow, + expsFieldNamesBelowWindow); // Fix up the inputs to the Window to include the literal column and add an exchange final RelDataTypeFactory.Builder outputBuilder = cluster.getTypeFactory().builder(); @@ -300,8 +292,8 @@ private RelNode handleEmptyProjectBelowWindow(Window window, Project project) { // ROW_NUMBER(). Add an Exchange with empty hash distribution list PinotLogicalExchange exchange = PinotLogicalExchange.create(projectBelowWindow, RelDistributions.hash(Collections.emptyList())); - Window newWindow = new LogicalWindow(window.getCluster(), window.getTraitSet(), exchange, - window.getConstants(), outputBuilder.build(), window.groups); + Window newWindow = new LogicalWindow(window.getCluster(), window.getTraitSet(), exchange, window.getConstants(), + outputBuilder.build(), window.groups); // Create the LogicalProject above window to remove the literal column final List expsForProjectAboveWindow = new ArrayList<>();