Skip to content

Commit

Permalink
Multi stage explain imprv (#14212)
Browse files Browse the repository at this point in the history
Improve explain plan when servers have different alternatives
  • Loading branch information
gortiz authored Oct 16, 2024
1 parent 77627ba commit 7e84239
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void simpleQuery() {
+ "PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect(repeated=[12])\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
Expand All @@ -97,77 +97,70 @@ public void simpleQueryVerbose() {
explainVerbose("SELECT 1 FROM mytable",
//@formatter:off
"Execution Plan\n"
+ "IntermediateCombine\n"
+ " Alternative(servers=[1])\n"
+ " PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " Alternative(servers=[1])\n"
+ " PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n");
+ "PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n"
+ " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n"
+ " Transform(expressions=[['1']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n");
//@formatter:on
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import com.google.common.base.CaseFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
Expand Down Expand Up @@ -54,10 +51,8 @@
* <li>Its title must contain the text {@code Combine}</li>
* </ol>
*
* Also nodes with only one input are not simplifiable by definition.
*
* Simplified nodes will have a new attribute {@code repeated} that will contain the number of times the node was
* repeated.
* The simplification process merges the inputs of the node into a single node.
* As a corollary, nodes with only one input are already simplified by definition.
*/
public class ExplainNodeSimplifier {
private static final Logger LOGGER = LoggerFactory.getLogger(ExplainNodeSimplifier.class);
Expand All @@ -73,8 +68,6 @@ public static PlanNode simplifyNode(PlanNode root) {
}

private static class Visitor implements PlanNodeVisitor<PlanNode, Void> {
private static final String REPEAT_ATTRIBUTE_KEY = "repeated";

private PlanNode defaultNode(PlanNode node) {
List<PlanNode> inputs = node.getInputs();
List<PlanNode> newInputs = simplifyChildren(inputs);
Expand Down Expand Up @@ -159,13 +152,8 @@ public PlanNode visitExplained(ExplainedNode node, Void context) {
}
child1 = merged;
}
Map<String, Plan.ExplainNode.AttributeValue> attributes = new HashMap<>(node.getAttributes());
Plan.ExplainNode.AttributeValue repeatedValue = Plan.ExplainNode.AttributeValue.newBuilder()
.setLong(simplifiedChildren.size())
.build();
attributes.put(REPEAT_ATTRIBUTE_KEY, repeatedValue);
return new ExplainedNode(node.getStageId(), node.getDataSchema(), node.getNodeHint(),
Collections.singletonList(child1), node.getTitle(), attributes);
Collections.singletonList(child1), node.getTitle(), node.getAttributes());
}

private List<PlanNode> simplifyChildren(List<PlanNode> children) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pinot.query.planner.explain;

import com.google.common.base.CaseFormat;
import com.google.common.collect.Streams;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.core.operator.ExplainAttributeBuilder;
import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.ExplainedNode;
Expand Down Expand Up @@ -93,6 +97,8 @@ public static PlanNode mergePlans(PlanNode plan1, PlanNode plan2, boolean verbos
}

private static class Visitor implements PlanNodeVisitor<PlanNode, PlanNode> {
public static final String COMBINE
= CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, ExplainPlanDataTableReducer.COMBINE);
private final boolean _verbose;

public Visitor(boolean verbose) {
Expand Down Expand Up @@ -441,7 +447,12 @@ public PlanNode visitExplained(ExplainedNode node, PlanNode context) {
Map<String, Plan.ExplainNode.AttributeValue> selfAttributes = node.getAttributes();
Map<String, Plan.ExplainNode.AttributeValue> otherAttributes = otherNode.getAttributes();

List<PlanNode> children = mergeChildren(node, context);
List<PlanNode> children;
if (node.getTitle().contains(COMBINE)) {
children = mergeCombineChildren(node, otherNode);
} else {
children = mergeChildren(node, context);
}
if (children == null) {
return null;
}
Expand Down Expand Up @@ -486,23 +497,23 @@ public PlanNode visitExplained(ExplainedNode node, PlanNode context) {
if (selfValue.hasLong() && otherValue.hasLong()) { // If both are long, add them
attributeBuilder.putLong(selfEntry.getKey(), selfValue.getLong() + otherValue.getLong());
} else { // Otherwise behave as if they are idempotent
if (!Objects.equals(otherValue, selfEntry.getValue())) {
if (!Objects.equals(otherValue, selfValue)) {
return null;
}
attributeBuilder.putAttribute(selfEntry.getKey(), selfEntry.getValue());
attributeBuilder.putAttribute(selfEntry.getKey(), selfValue);
}
break;
}
case IDEMPOTENT: {
if (!Objects.equals(otherValue, selfEntry.getValue())) {
if (!Objects.equals(otherValue, selfValue)) {
return null;
}
attributeBuilder.putAttribute(selfEntry.getKey(), selfEntry.getValue());
attributeBuilder.putAttribute(selfEntry.getKey(), selfValue);
break;
}
case IGNORABLE: {
if (Objects.equals(otherValue, selfEntry.getValue())) {
attributeBuilder.putAttribute(selfEntry.getKey(), selfEntry.getValue());
if (Objects.equals(otherValue, selfValue)) {
attributeBuilder.putAttribute(selfEntry.getKey(), selfValue);
} else if (_verbose) {
// If mode is verbose, we will not merge the nodes when an ignorable attribute is different
return null;
Expand All @@ -518,13 +529,49 @@ public PlanNode visitExplained(ExplainedNode node, PlanNode context) {
}
for (Map.Entry<String, Plan.ExplainNode.AttributeValue> otherEntry : otherAttributes.entrySet()) {
Plan.ExplainNode.AttributeValue selfValue = selfAttributes.get(otherEntry.getKey());
if (selfValue == null) { // otherwise it has already been merged
attributeBuilder.putAttribute(otherEntry.getKey(), otherEntry.getValue());
if (selfValue != null) { // it has already been merged
continue;
}
switch (otherEntry.getValue().getMergeType()) {
case DEFAULT:
attributeBuilder.putAttribute(otherEntry.getKey(), otherEntry.getValue());
break;
case IGNORABLE:
if (_verbose) {
return null;
}
break;
case IDEMPOTENT:
case UNRECOGNIZED:
default:
return null;
}
}
return new ExplainedNode(node.getStageId(), node.getDataSchema(), node.getNodeHint(), children, node.getTitle(),
attributeBuilder.build());
}
}

/**
 * Merges the inputs of two combine nodes into a single, sorted list of children.
 *
 * <p>Each input of {@code node1} is probed against the still-unmatched inputs of {@code node2} using
 * {@code mergePlans}. The first probe that yields a non-null plan wins: the merged plan is kept and the matched
 * input of {@code node2} is no longer a candidate for later inputs. An input of {@code node1} that cannot be merged
 * with anything is kept unchanged. Inputs of {@code node2} that were never matched are appended at the end, and the
 * resulting list is sorted with {@code PlanNodeSorter.DefaultComparator.INSTANCE}.
 *
 * <p>NOTE(review): the unmatched inputs of {@code node2} are held in a {@link HashSet}, so inputs that are equal
 * according to {@code equals}/{@code hashCode} are kept only once and the probing order is unspecified. Confirm
 * this is the intended behavior.
 *
 * @param node1 the combine node whose inputs drive the merge
 * @param node2 the combine node whose inputs are matched against the inputs of {@code node1}
 * @return a new list with the merged and unmatched children; this method never returns {@code null}
 */
private List<PlanNode> mergeCombineChildren(ExplainedNode node1, ExplainedNode node2) {
  int capacity = node1.getInputs().size() + node2.getInputs().size();
  List<PlanNode> result = new ArrayList<>(capacity);

  // Inputs of node2 that have not been merged with any input of node1 yet
  Set<PlanNode> unmatched = new HashSet<>(node2.getInputs());

  for (PlanNode candidate : node1.getInputs()) {
    PlanNode combined = null;
    PlanNode partner = null;
    for (PlanNode other : unmatched) {
      PlanNode attempt = mergePlans(candidate, other);
      if (attempt != null) {
        combined = attempt;
        partner = other;
        break;
      }
    }

    if (combined == null) {
      // Nothing on node2 is compatible with this input: keep it untouched
      result.add(candidate);
    } else {
      // The partner is removed only after the loop has been left, so the set is never modified while iterating
      unmatched.remove(partner);
      result.add(combined);
    }
  }

  result.addAll(unmatched);
  result.sort(PlanNodeSorter.DefaultComparator.INSTANCE);
  return result;
}
}
}

0 comments on commit 7e84239

Please sign in to comment.