From 7e84239568e4c2045af8ef7d39ba3ae6d19d832a Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Jaureguizar Date: Wed, 16 Oct 2024 15:47:50 +0200 Subject: [PATCH] Multi stage explain imprv (#14212) Improve explain plan when servers have different alternatives --- ...ultiStageEngineExplainIntegrationTest.java | 137 +++++++++--------- .../explain/ExplainNodeSimplifier.java | 18 +-- .../query/planner/explain/PlanNodeMerger.java | 65 +++++++-- 3 files changed, 124 insertions(+), 96 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java index 3c8e931616e..8303a583d38 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java @@ -83,7 +83,7 @@ public void simpleQuery() { + "PinotLogicalExchange(distribution=[broadcast])\n" + " LeafStageCombineOperator(table=[mytable])\n" + " StreamingInstanceResponse\n" - + " StreamingCombineSelect(repeated=[12])\n" + + " StreamingCombineSelect\n" + " SelectStreaming(table=[mytable], totalDocs=[115545])\n" + " Transform(expressions=[['1']])\n" + " Project(columns=[[]])\n" @@ -97,77 +97,70 @@ public void simpleQueryVerbose() { explainVerbose("SELECT 1 FROM mytable", //@formatter:off "Execution Plan\n" - + "IntermediateCombine\n" - + " Alternative(servers=[1])\n" - + " PinotLogicalExchange(distribution=[broadcast])\n" - + " LeafStageCombineOperator(table=[mytable])\n" - + " StreamingInstanceResponse\n" - + " StreamingCombineSelect\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " 
FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " Alternative(servers=[1])\n" - + " PinotLogicalExchange(distribution=[broadcast])\n" - + " LeafStageCombineOperator(table=[mytable])\n" - + " StreamingInstanceResponse\n" - + " StreamingCombineSelect\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" 
- + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n" - + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" - + " Transform(expressions=[['1']])\n" - + " Project(columns=[[]])\n" - + " DocIdSet(maxDocs=[10000])\n" - + " FilterMatchEntireSegment(numDocs=[any])\n"); + + "PinotLogicalExchange(distribution=[broadcast])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " StreamingCombineSelect\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " 
Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n" + + " SelectStreaming(segment=[any], table=[mytable], totalDocs=[any])\n" + + " Transform(expressions=[['1']])\n" + + " Project(columns=[[]])\n" + + " DocIdSet(maxDocs=[10000])\n" + + " FilterMatchEntireSegment(numDocs=[any])\n"); //@formatter:on } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java index e7a1b8d48d0..17b0b02fe1e 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java @@ -21,10 +21,7 @@ import com.google.common.base.CaseFormat; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.pinot.common.proto.Plan; import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; @@ -54,10 +51,8 @@ *
  <li>Its title must contain the text {@code Combine}</li>
  • * * - * Also nodes with only one input are not simplifiable by definition. - * - * Simplified nodes will have a new attribute {@code repeated} that will contain the number of times the node was - * repeated. + * The simplification process merges the inputs of the node into a single node. + * As a corollary, nodes with only one input are already simplified by definition. */ public class ExplainNodeSimplifier { private static final Logger LOGGER = LoggerFactory.getLogger(ExplainNodeSimplifier.class); @@ -73,8 +68,6 @@ public static PlanNode simplifyNode(PlanNode root) { } private static class Visitor implements PlanNodeVisitor { - private static final String REPEAT_ATTRIBUTE_KEY = "repeated"; - private PlanNode defaultNode(PlanNode node) { List inputs = node.getInputs(); List newInputs = simplifyChildren(inputs); @@ -159,13 +152,8 @@ public PlanNode visitExplained(ExplainedNode node, Void context) { } child1 = merged; } - Map attributes = new HashMap<>(node.getAttributes()); - Plan.ExplainNode.AttributeValue repeatedValue = Plan.ExplainNode.AttributeValue.newBuilder() - .setLong(simplifiedChildren.size()) - .build(); - attributes.put(REPEAT_ATTRIBUTE_KEY, repeatedValue); return new ExplainedNode(node.getStageId(), node.getDataSchema(), node.getNodeHint(), - Collections.singletonList(child1), node.getTitle(), attributes); + Collections.singletonList(child1), node.getTitle(), node.getAttributes()); } private List simplifyChildren(List children) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java index 1541a8b6742..aa2e44173b4 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java @@ -18,14 +18,18 @@ */ package org.apache.pinot.query.planner.explain; +import 
com.google.common.base.CaseFormat; import com.google.common.collect.Streams; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.common.proto.Plan; import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; import org.apache.pinot.query.planner.plannode.ExplainedNode; @@ -93,6 +97,8 @@ public static PlanNode mergePlans(PlanNode plan1, PlanNode plan2, boolean verbos } private static class Visitor implements PlanNodeVisitor { + public static final String COMBINE + = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, ExplainPlanDataTableReducer.COMBINE); private final boolean _verbose; public Visitor(boolean verbose) { @@ -441,7 +447,12 @@ public PlanNode visitExplained(ExplainedNode node, PlanNode context) { Map selfAttributes = node.getAttributes(); Map otherAttributes = otherNode.getAttributes(); - List children = mergeChildren(node, context); + List children; + if (node.getTitle().contains(COMBINE)) { + children = mergeCombineChildren(node, otherNode); + } else { + children = mergeChildren(node, context); + } if (children == null) { return null; } @@ -486,23 +497,23 @@ public PlanNode visitExplained(ExplainedNode node, PlanNode context) { if (selfValue.hasLong() && otherValue.hasLong()) { // If both are long, add them attributeBuilder.putLong(selfEntry.getKey(), selfValue.getLong() + otherValue.getLong()); } else { // Otherwise behave as if they are idempotent - if (!Objects.equals(otherValue, selfEntry.getValue())) { + if (!Objects.equals(otherValue, selfValue)) { return null; } - attributeBuilder.putAttribute(selfEntry.getKey(), selfEntry.getValue()); + attributeBuilder.putAttribute(selfEntry.getKey(), 
selfValue); } break; } case IDEMPOTENT: { - if (!Objects.equals(otherValue, selfEntry.getValue())) { + if (!Objects.equals(otherValue, selfValue)) { return null; } - attributeBuilder.putAttribute(selfEntry.getKey(), selfEntry.getValue()); + attributeBuilder.putAttribute(selfEntry.getKey(), selfValue); break; } case IGNORABLE: { - if (Objects.equals(otherValue, selfEntry.getValue())) { - attributeBuilder.putAttribute(selfEntry.getKey(), selfEntry.getValue()); + if (Objects.equals(otherValue, selfValue)) { + attributeBuilder.putAttribute(selfEntry.getKey(), selfValue); } else if (_verbose) { // If mode is verbose, we will not merge the nodes when an ignorable attribute is different return null; @@ -518,13 +529,49 @@ public PlanNode visitExplained(ExplainedNode node, PlanNode context) { } for (Map.Entry otherEntry : otherAttributes.entrySet()) { Plan.ExplainNode.AttributeValue selfValue = selfAttributes.get(otherEntry.getKey()); - if (selfValue == null) { // otherwise it has already been merged - attributeBuilder.putAttribute(otherEntry.getKey(), otherEntry.getValue()); + if (selfValue != null) { // it has already been merged + continue; + } + switch (otherEntry.getValue().getMergeType()) { + case DEFAULT: + attributeBuilder.putAttribute(otherEntry.getKey(), otherEntry.getValue()); + break; + case IGNORABLE: + if (_verbose) { + return null; + } + break; + case IDEMPOTENT: + case UNRECOGNIZED: + default: + return null; } } return new ExplainedNode(node.getStageId(), node.getDataSchema(), node.getNodeHint(), children, node.getTitle(), attributeBuilder.build()); } } + + private List mergeCombineChildren(ExplainedNode node1, ExplainedNode node2) { + List mergedChildren = new ArrayList<>(node1.getInputs().size() + node2.getInputs().size()); + + Set pendingOn2 = new HashSet<>(node2.getInputs()); + for (PlanNode input1 : node1.getInputs()) { + PlanNode merged = null; + for (PlanNode input2 : pendingOn2) { + merged = mergePlans(input1, input2); + if (merged != null) { + 
pendingOn2.remove(input2); + break; + } + } + mergedChildren.add(merged != null ? merged : input1); + } + mergedChildren.addAll(pendingOn2); + + mergedChildren.sort(PlanNodeSorter.DefaultComparator.INSTANCE); + + return mergedChildren; + } } }