Skip to content

Commit

Permalink
[Multi-stage] Support lookup join (#13966)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Oct 8, 2024
1 parent 5963be2 commit bebd2b4
Show file tree
Hide file tree
Showing 34 changed files with 3,339 additions and 155 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ cscope.*
.externalToolBuilders/
maven-eclipse.xml
target/
examples/
/examples/
/logs/
bin/
*/bin/
.idea
Expand Down
6 changes: 6 additions & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,17 @@ enum JoinType {
ANTI = 5;
}

enum JoinStrategy {
HASH = 0;
LOOKUP = 1;
}

message JoinNode {
JoinType joinType = 1;
repeated int32 leftKeys = 2;
repeated int32 rightKeys = 3;
repeated Expression nonEquiConditions = 4;
JoinStrategy joinStrategy = 5;
}

enum ExchangeType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,27 @@ public static class WindowHintOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
// "hash" is the default strategy for non-SEMI joins
public static final String HASH_JOIN_STRATEGY = "hash";
// "dynamic_broadcast" is the default strategy for SEMI joins
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";

/**
* Max rows allowed to build the right table hash collection.
*/
public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";

/**
* Mode when join overflow happens, supported values: THROW or BREAK.
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
* BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
*/
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";

/**
* Indicat that the join operator(s) within a certain selection scope are colocated
* Indicates that the join operator(s) within a certain selection scope are colocated
*/
public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;


Expand All @@ -48,24 +50,32 @@ public boolean matches(RelOptRuleCall call) {
@Override
public void onMatch(RelOptRuleCall call) {
Join join = call.rel(0);
RelNode leftInput = join.getInput(0);
RelNode rightInput = join.getInput(1);

RelNode leftExchange;
RelNode rightExchange;
RelNode left = PinotRuleUtils.unboxRel(join.getInput(0));
RelNode right = PinotRuleUtils.unboxRel(join.getInput(1));
JoinInfo joinInfo = join.analyzeCondition();

if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
RelNode newLeft;
RelNode newRight;
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) {
// Lookup join - add local exchange on the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = right;
} else {
// when join key exists, use hash distribution.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
// Regular join - add exchange on both sides
if (joinInfo.leftKeys.isEmpty()) {
// Broadcast the right side if there is no join key
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// Use hash exchange when there are join keys
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
}
}

call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), leftExchange, rightExchange, join.getJoinType(),
// TODO: Consider creating different JOIN Rel for each join strategy
call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), newLeft, newRight, join.getJoinType(),
join.isSemiJoinDone()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pinot.calcite.rel.rules;

import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.hep.HepRelVertex;
Expand All @@ -35,7 +33,6 @@
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.zookeeper.common.StringUtils;


/**
Expand Down Expand Up @@ -125,25 +122,27 @@ public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
@Override
public boolean matches(RelOptRuleCall call) {
Join join = call.rel(0);
String joinStrategyString =
PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies =
joinStrategyString != null ? StringUtils.split(joinStrategyString, ",") : Collections.emptyList();
boolean explicitOtherStrategy = !joinStrategies.isEmpty() && !joinStrategies.contains(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);

// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (joinStrategy != null && !joinStrategy.equals(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
return false;
}

// Do not apply this rule if it is not a SEMI join
JoinInfo joinInfo = join.analyzeCondition();
if (join.getJoinType() != JoinRelType.SEMI || !joinInfo.nonEquiConditions.isEmpty()
|| joinInfo.leftKeys.size() != 1) {
return false;
}

// Apply this rule if the left side can be pushed as dynamic exchange
RelNode left = ((HepRelVertex) join.getLeft()).getCurrentRel();
RelNode right = ((HepRelVertex) join.getRight()).getCurrentRel();
return left instanceof Exchange && right instanceof Exchange
// left side can be pushed as dynamic exchange
&& PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& !explicitOtherStrategy
// condition for SEMI join
&& join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty()
&& joinInfo.leftKeys.size() == 1;
return left instanceof Exchange && right instanceof Exchange && PinotRuleUtils.canPushDynamicBroadcastToLeaf(
left.getInput(0));
}

@Override
Expand All @@ -158,15 +157,10 @@ public void onMatch(RelOptRuleCall call) {
boolean isColocatedJoin =
PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
PinotLogicalExchange dynamicBroadcastExchange;
RelNode rightInput = right.getInput();
if (isColocatedJoin) {
RelDistribution dist = RelDistributions.hash(join.analyzeCondition().rightKeys);
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
} else {
RelDistribution dist = RelDistributions.BROADCAST_DISTRIBUTED;
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
}
RelDistribution relDistribution = isColocatedJoin ? RelDistributions.hash(join.analyzeCondition().rightKeys)
: RelDistributions.BROADCAST_DISTRIBUTED;
PinotLogicalExchange dynamicBroadcastExchange =
PinotLogicalExchange.create(right.getInput(), relDistribution, PinotRelExchangeType.PIPELINE_BREAKER);

call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), left.getInput(), dynamicBroadcastExchange,
join.getJoinType(), join.isSemiJoinDone()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query.planner.logical;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,9 +33,12 @@
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
Expand All @@ -45,13 +49,18 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.DataSchema;
Expand Down Expand Up @@ -264,11 +273,62 @@ private TableScanNode convertLogicalTableScan(LogicalTableScan node) {
convertInputs(node.getInputs()), tableName, columns);
}

private JoinNode convertLogicalJoin(LogicalJoin node) {
JoinInfo joinInfo = node.analyzeCondition();
return new JoinNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
convertInputs(node.getInputs()), node.getJoinType(), joinInfo.leftKeys, joinInfo.rightKeys,
RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions));
private JoinNode convertLogicalJoin(LogicalJoin join) {
JoinInfo joinInfo = join.analyzeCondition();
DataSchema dataSchema = toDataSchema(join.getRowType());
List<PlanNode> inputs = convertInputs(join.getInputs());
JoinRelType joinType = join.getJoinType();

// Run some validations for join
Preconditions.checkState(inputs.size() == 2, "Join should have exactly 2 inputs, got: %s", inputs.size());
PlanNode left = inputs.get(0);
PlanNode right = inputs.get(1);
int numLeftColumns = left.getDataSchema().size();
int numResultColumns = dataSchema.size();
if (joinType.projectsRight()) {
int numRightColumns = right.getDataSchema().size();
Preconditions.checkState(numLeftColumns + numRightColumns == numResultColumns,
"Invalid number of columns for join type: %s, left: %s, right: %s, result: %s", joinType, numLeftColumns,
numRightColumns, numResultColumns);
} else {
Preconditions.checkState(numLeftColumns == numResultColumns,
"Invalid number of columns for join type: %s, left: %s, result: %s", joinType, numLeftColumns,
numResultColumns);
}

// Check if the join hint specifies the join strategy
JoinNode.JoinStrategy joinStrategy;
ImmutableList<RelHint> relHints = join.getHints();
String joinStrategyHint = PinotHintStrategyTable.getHintOption(relHints, PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategyHint)) {
joinStrategy = JoinNode.JoinStrategy.LOOKUP;

// Run some validations for lookup join
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over
// TableScanNode.
RelNode rightInput = PinotRuleUtils.unboxRel(join.getRight());
Preconditions.checkState(rightInput instanceof Project, "Right input for lookup join must be a Project, got: %s",
rightInput.getClass().getSimpleName());
Project project = (Project) rightInput;
for (RexNode node : project.getProjects()) {
Preconditions.checkState(node instanceof RexInputRef,
"Right input for lookup join must be an identifier (RexInputRef) only Project, got: %s in project",
node.getClass().getSimpleName());
}
RelNode projectInput = PinotRuleUtils.unboxRel(project.getInput());
Preconditions.checkState(projectInput instanceof TableScan,
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
projectInput.getClass().getSimpleName());
} else {
// TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
joinStrategy = JoinNode.JoinStrategy.HASH;
}

return new JoinNode(DEFAULT_STAGE_ID, dataSchema, NodeHint.fromRelHints(relHints), inputs, joinType,
joinInfo.leftKeys, joinInfo.rightKeys, RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions),
joinStrategy);
}

private List<PlanNode> convertInputs(List<RelNode> inputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ public class JoinNode extends BasePlanNode {
private final List<Integer> _leftKeys;
private final List<Integer> _rightKeys;
private final List<RexExpression> _nonEquiConditions;
private final JoinStrategy _joinStrategy;

public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, List<PlanNode> inputs, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions) {
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions,
JoinStrategy joinStrategy) {
super(stageId, dataSchema, nodeHint, inputs);
_joinType = joinType;
_leftKeys = leftKeys;
_rightKeys = rightKeys;
_nonEquiConditions = nonEquiConditions;
_joinStrategy = joinStrategy;
}

public JoinRelType getJoinType() {
Expand All @@ -56,6 +59,10 @@ public List<RexExpression> getNonEquiConditions() {
return _nonEquiConditions;
}

public JoinStrategy getJoinStrategy() {
return _joinStrategy;
}

@Override
public String explain() {
return "JOIN";
Expand All @@ -68,7 +75,8 @@ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {

@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType, _leftKeys, _rightKeys, _nonEquiConditions);
return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType, _leftKeys, _rightKeys, _nonEquiConditions,
_joinStrategy);
}

@Override
Expand All @@ -84,11 +92,16 @@ public boolean equals(Object o) {
}
JoinNode joinNode = (JoinNode) o;
return _joinType == joinNode._joinType && Objects.equals(_leftKeys, joinNode._leftKeys) && Objects.equals(
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions);
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions)
&& _joinStrategy == joinNode._joinStrategy;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions);
return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy);
}

public enum JoinStrategy {
HASH, LOOKUP
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public static NodeHint fromRelHints(List<RelHint> relHints) {
} else {
hintOptions = Maps.newHashMapWithExpectedSize(numHints);
for (RelHint relHint : relHints) {
hintOptions.put(relHint.hintName, relHint.kvOptions);
// Put the first matching hint to match the behavior of PinotHintStrategyTable
hintOptions.putIfAbsent(relHint.hintName, relHint.kvOptions);
}
}
return new NodeHint(hintOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private static JoinNode deserializeJoinNode(Plan.PlanNode protoNode) {
Plan.JoinNode protoJoinNode = protoNode.getJoinNode();
return new JoinNode(protoNode.getStageId(), extractDataSchema(protoNode), extractNodeHint(protoNode),
extractInputs(protoNode), convertJoinType(protoJoinNode.getJoinType()), protoJoinNode.getLeftKeysList(),
protoJoinNode.getRightKeysList(), convertExpressions(protoJoinNode.getNonEquiConditionsList()));
protoJoinNode.getRightKeysList(), convertExpressions(protoJoinNode.getNonEquiConditionsList()),
convertJoinStrategy(protoJoinNode.getJoinStrategy()));
}

private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode protoNode) {
Expand Down Expand Up @@ -274,6 +275,17 @@ private static JoinRelType convertJoinType(Plan.JoinType joinType) {
}
}

private static JoinNode.JoinStrategy convertJoinStrategy(Plan.JoinStrategy joinStrategy) {
switch (joinStrategy) {
case HASH:
return JoinNode.JoinStrategy.HASH;
case LOOKUP:
return JoinNode.JoinStrategy.LOOKUP;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
}

private static PinotRelExchangeType convertExchangeType(Plan.ExchangeType exchangeType) {
switch (exchangeType) {
case STREAMING:
Expand Down
Loading

0 comments on commit bebd2b4

Please sign in to comment.