Skip to content

Commit

Permalink
[Multi-stage] Support partition based leaf stage processing (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Aug 2, 2023
1 parent 044588f commit 5d312e5
Show file tree
Hide file tree
Showing 23 changed files with 454 additions and 416 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public static class AggregateOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
}

public static class TableHintOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.tools.RelBuilderFactory;
Expand Down Expand Up @@ -65,14 +63,7 @@ public void onMatch(RelOptRuleCall call) {
RelNode rightExchange;
JoinInfo joinInfo = join.analyzeCondition();

boolean isColocatedJoin =
PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
if (isColocatedJoin) {
// join exchange are colocated, we should directly pass through via join key
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.SINGLETON);
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.SINGLETON);
} else if (joinInfo.leftKeys.isEmpty()) {
if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,16 @@ public void onMatch(RelOptRuleCall call) {
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());

boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
? PinotLogicalExchange.create(right.getInput(), RelDistributions.SINGLETON,
PinotRelExchangeType.PIPELINE_BREAKER)
: PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
PinotLogicalExchange dynamicBroadcastExchange =
PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.PIPELINE_BREAKER);
Join dynamicFilterJoin =
new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList()));
// adding pass-through exchange after join b/c currently leaf-stage doesn't support chaining operator(s) after JOIN
// TODO: support pass-through for singleton again when non-colocated.
// TODO: this is b/c #10886 alters the singleton exchange and it no longer works if join is not colocated.
PinotLogicalExchange passThroughAfterJoinExchange = isColocatedJoin
? PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.SINGLETON)
: PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.hash(join.analyzeCondition().leftKeys));
PinotLogicalExchange passThroughAfterJoinExchange =
PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.hash(join.analyzeCondition().leftKeys));
call.transformTo(passThroughAfterJoinExchange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
Expand Down Expand Up @@ -134,8 +132,8 @@ private static PlanNode convertLogicalExchange(Exchange node, int currentStageId
// Compute all the tables involved under this exchange node
Set<String> tableNames = getTableNamesFromRelRoot(node);

return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType,
tableNames, node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver);
return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType, tableNames,
node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver);
}

private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
Expand Down Expand Up @@ -187,11 +185,8 @@ private static PlanNode convertLogicalJoin(LogicalJoin node, int currentStageId)
new FieldSelectionKeySelector(joinInfo.rightKeys));
List<RexExpression> joinClause =
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
boolean isColocatedJoin =
PinotHintStrategyTable.isHintOptionTrue(node.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
return new JoinNode(currentStageId, toDataSchema(node.getRowType()), toDataSchema(node.getLeft().getRowType()),
toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause, isColocatedJoin);
toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause);
}

private static DataSchema toDataSchema(RelDataType rowType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
Expand All @@ -41,7 +42,9 @@
* </ul>
*/
public class DispatchablePlanMetadata implements Serializable {
// These 2 fields are extracted from TableScanNode
private final List<String> _scannedTables;
private Map<String, String> _tableOptions;

// used for assigning server/worker nodes.
private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
Expand All @@ -64,6 +67,9 @@ public class DispatchablePlanMetadata implements Serializable {
// whether a stage requires singleton instance to execute, e.g. stage contains global reduce (sort/agg) operator.
private boolean _requiresSingletonInstance;

// whether a stage is partitioned table scan
private boolean _isPartitionedTableScan;

// Total worker count of this stage.
private int _totalWorkerCount;

Expand All @@ -72,8 +78,6 @@ public DispatchablePlanMetadata() {
_serverInstanceToWorkerIdMap = new HashMap<>();
_workerIdToSegmentsMap = new HashMap<>();
_workerIdToMailboxesMap = new HashMap<>();
_timeBoundaryInfo = null;
_requiresSingletonInstance = false;
_tableToUnavailableSegmentsMap = new HashMap<>();
}

Expand All @@ -85,6 +89,15 @@ public void addScannedTable(String tableName) {
_scannedTables.add(tableName);
}

@Nullable
public Map<String, String> getTableOptions() {
return _tableOptions;
}

public void setTableOptions(Map<String, String> tableOptions) {
_tableOptions = tableOptions;
}

// -----------------------------------------------
// attached physical plan context.
// -----------------------------------------------
Expand All @@ -93,8 +106,7 @@ public Map<Integer, Map<String, List<String>>> getWorkerIdToSegmentsMap() {
return _workerIdToSegmentsMap;
}

public void setWorkerIdToSegmentsMap(
Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap) {
public void setWorkerIdToSegmentsMap(Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap) {
_workerIdToSegmentsMap = workerIdToSegmentsMap;
}

Expand Down Expand Up @@ -135,6 +147,14 @@ public void setRequireSingleton(boolean newRequireInstance) {
_requiresSingletonInstance = _requiresSingletonInstance || newRequireInstance;
}

public boolean isPartitionedTableScan() {
return _isPartitionedTableScan;
}

public void setPartitionedTableScan(boolean isPartitionedTableScan) {
_isPartitionedTableScan = isPartitionedTableScan;
}

public int getTotalWorkerCount() {
return _totalWorkerCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.planner.physical;

import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
Expand Down Expand Up @@ -127,6 +128,7 @@ public Void visitSort(SortNode node, DispatchablePlanContext context) {
public Void visitTableScan(TableScanNode node, DispatchablePlanContext context) {
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
dispatchablePlanMetadata.addScannedTable(node.getTableName());
dispatchablePlanMetadata.setTableOptions(node.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,39 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
});
} else if (senderMetadata.isPartitionedTableScan()) {
// For partitioned table scan, send the data to the worker with the same worker id (not necessary the same
// instance)
// TODO: Support further split the single partition into multiple workers
senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
for (int workerId : senderWorkerIds) {
receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {
for (int receiverWorkerId : receiverWorkerIds) {
if (receiverWorkerId == workerId) {
String mailboxId =
MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(receiverServerInstance, workerId)),
Collections.emptyMap());
MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
Collections.singletonList(new VirtualServerAddress(senderServerInstance, workerId)),
Collections.emptyMap());
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(receiverFragmentId, serderMailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(senderFragmentId, receiverMailboxMetadata);
break;
}
}
});
}
});
} else {
// For other exchange types, send the data to all the instances in the receiver fragment
// TODO: Add support for more exchange types
// TODO:
// 1. Add support for more exchange types
// 2. Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
// the workerId. It is useful for JOIN query when only left table is partitioned.
senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
for (int senderWorkerId : senderWorkerIds) {
Map<Integer, MailboxMetadata> senderMailboxMetadataMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class JoinNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _joinClause;
@ProtoProperties
private boolean _isColocatedJoin;
@ProtoProperties
private List<String> _leftColumnNames;
@ProtoProperties
private List<String> _rightColumnNames;
Expand All @@ -47,14 +45,13 @@ public JoinNode(int planFragmentId) {
}

public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause, boolean isColocatedJoin) {
JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause) {
super(planFragmentId, dataSchema);
_leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
_rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
_joinRelType = joinRelType;
_joinKeys = joinKeys;
_joinClause = joinClause;
_isColocatedJoin = isColocatedJoin;
}

public JoinRelType getJoinRelType() {
Expand All @@ -69,10 +66,6 @@ public List<RexExpression> getJoinClauses() {
return _joinClause;
}

public boolean isColocatedJoin() {
return _isColocatedJoin;
}

public List<String> getLeftColumnNames() {
return _leftColumnNames;
}
Expand Down
Loading

0 comments on commit 5d312e5

Please sign in to comment.