diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 47ffeddedf9..49ae0d9d941 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -1747,6 +1747,19 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons statistics.setOfflineTotalCpuTimeNs(response.getOfflineTotalCpuTimeNs()); statistics.setRealtimeTotalCpuTimeNs(response.getRealtimeTotalCpuTimeNs()); statistics.setNumRowsResultSet(response.getNumRowsResultSet()); + statistics.setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried()); + statistics.setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed()); + statistics.setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched()); + statistics.setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs()); + statistics.setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker()); + statistics.setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer()); + statistics.setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid()); + statistics.setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit()); + statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue()); + statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments()); + statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments()); + statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect( + Collectors.toList())); } private String getGlobalQueryId(long requestId) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 7fb9b24c14b..5c4e86a7d4f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -156,6 +156,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan(); Set tableNames = queryPlanResult.getTableNames(); + requestContext.setTableNames(List.copyOf(tableNames)); // Compilation Time. This includes the time taken for parsing, compiling, create stage plans and assigning workers. long compilationEndTimeNs = System.nanoTime(); @@ -235,6 +236,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs)); brokerResponse.setTimeUsedMs(totalTimeMs); requestContext.setQueryProcessingTime(totalTimeMs); + requestContext.setTraceInfo(brokerResponse.getTraceInfo()); augmentStatistics(requestContext, brokerResponse); // Log query and stats diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 3ad86484426..0e0c35d3184 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -144,6 +144,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques _brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap, reduceTimeOutMs, _brokerMetrics); final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs; + requestContext.setTraceInfo(brokerResponse.getTraceInfo()); requestContext.setReduceTimeNanos(reduceTimeNanos); _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java index 60e1a1cca81..6ce063d2536 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java @@ -18,6 +18,10 @@ */ package org.apache.pinot.spi.trace; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; @@ -32,7 +36,7 @@ public class DefaultRequestContext implements RequestScope { private int _errorCode = 0; private String _query; - private String _tableName = DEFAULT_TABLE_NAME; + private List _tableNames = new ArrayList<>(); private long _processingTimeMillis = -1; private long _totalDocs; private long _numDocsScanned; @@ -63,6 +67,19 @@ public class DefaultRequestContext implements RequestScope { private FanoutType _fanoutType; private int _numUnavailableSegments; + private long _numConsumingSegmentsQueried; + private long _numConsumingSegmentsProcessed; + private long _numConsumingSegmentsMatched; + private long _minConsumingFreshnessTimeMs; + private long _numSegmentsPrunedByBroker; + private long _numSegmentsPrunedByServer; + private long _numSegmentsPrunedInvalid; + private long _numSegmentsPrunedByLimit; + private long _numSegmentsPrunedByValue; + private long _explainPlanNumEmptyFilterSegments; + private long _explainPlanNumMatchAllFilterSegments; + private Map _traceInfo = new HashMap<>(); + private List _processingExceptions = new ArrayList<>(); public DefaultRequestContext() { } @@ -169,7 +186,12 @@ public void setQuery(String query) { @Override public void setTableName(String tableName) { - _tableName = tableName; + _tableNames.add(tableName); + } + + @Override + public void setTableNames(List tableNames) { + _tableNames.addAll(tableNames); } @Override @@ -239,7 +261,15 @@ public String getQuery() { @Override public String getTableName() { - return _tableName; + if (_tableNames.size() == 0) { + return DEFAULT_TABLE_NAME; + } + return _tableNames.get(0); + } + + @Override + public List getTableNames() { + return _tableNames; } @Override @@ -314,7 +344,7 @@ public int getNumExceptions() { @Override public boolean hasValidTableName() { - return !DEFAULT_TABLE_NAME.equals(_tableName); + return !_tableNames.isEmpty(); } @Override @@ -402,6 +432,136 @@ public void setReduceTimeMillis(long reduceTimeMillis) { _reduceTimeMillis = reduceTimeMillis; } + @Override + public long getNumConsumingSegmentsQueried() { + return _numConsumingSegmentsQueried; + } + + @Override + public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) { + _numConsumingSegmentsQueried = numConsumingSegmentsQueried; + } + + @Override + public long getNumConsumingSegmentsProcessed() { + return _numConsumingSegmentsProcessed; + } + + @Override + public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) { + _numConsumingSegmentsProcessed = numConsumingSegmentsProcessed; + } + + @Override + public long getNumConsumingSegmentsMatched() { + return _numConsumingSegmentsMatched; + } + + @Override + public void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched) { + _numConsumingSegmentsMatched = numConsumingSegmentsMatched; + } + + @Override + public long getMinConsumingFreshnessTimeMs() { + return _minConsumingFreshnessTimeMs; + } + + @Override + public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs) { + _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs; + } + + @Override + public long getNumSegmentsPrunedByBroker() { + return _numSegmentsPrunedByBroker; + } + + @Override + public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) { + _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker; + } + + @Override + public long getNumSegmentsPrunedByServer() { + return _numSegmentsPrunedByServer; + } + + @Override + public void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer) { + _numSegmentsPrunedByServer = numSegmentsPrunedByServer; + } + + @Override + public long getNumSegmentsPrunedInvalid() { + return _numSegmentsPrunedInvalid; + } + + @Override + public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) { + _numSegmentsPrunedInvalid = numSegmentsPrunedInvalid; + } + + @Override + public long getNumSegmentsPrunedByLimit() { + return _numSegmentsPrunedByLimit; + } + + @Override + public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) { + _numSegmentsPrunedByLimit = numSegmentsPrunedByLimit; + } + + @Override + public long getNumSegmentsPrunedByValue() { + return _numSegmentsPrunedByValue; + } + + @Override + public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) { + _numSegmentsPrunedByValue = numSegmentsPrunedByValue; + } + + @Override + public long getExplainPlanNumEmptyFilterSegments() { + return _explainPlanNumEmptyFilterSegments; + } + + @Override + public void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments) { + _explainPlanNumEmptyFilterSegments = explainPlanNumEmptyFilterSegments; + } + + @Override + public long getExplainPlanNumMatchAllFilterSegments() { + return _explainPlanNumMatchAllFilterSegments; + } + + @Override + public void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments) { + _explainPlanNumMatchAllFilterSegments = explainPlanNumMatchAllFilterSegments; + } + + @Override + public Map getTraceInfo() { + return _traceInfo; + } + + @Override + public void setTraceInfo(Map traceInfo) { + _traceInfo.putAll(traceInfo); + } + + @Override + public List getProcessingExceptions() { + return _processingExceptions; + } + + @Override + public void setProcessingExceptions(List processingExceptions) { + _processingExceptions.addAll(processingExceptions); + } + @Override public void close() { } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java index 89f768608b4..f8ec35a921d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java @@ -18,6 +18,10 @@ */ package org.apache.pinot.spi.trace; +import java.util.List; +import java.util.Map; + + public interface RequestContext { long getOfflineSystemActivitiesCpuTimeNs(); @@ -65,6 +69,8 @@ default boolean isSampledRequest() { void setTableName(String tableName); + void setTableNames(List tableNames); + void setQueryProcessingTime(long processingTimeMillis); void setBrokerId(String brokerId); @@ -93,6 +99,8 @@ default boolean isSampledRequest() { String getTableName(); + List getTableNames(); + long getProcessingTimeMillis(); long getTotalDocs(); @@ -157,6 +165,58 @@ default boolean isSampledRequest() { void setReduceTimeMillis(long reduceTimeMillis); + long getNumConsumingSegmentsQueried(); + + void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried); + + long getNumConsumingSegmentsProcessed(); + + void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed); + + long getNumConsumingSegmentsMatched(); + + void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched); + + long getMinConsumingFreshnessTimeMs(); + + void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs); + + long getNumSegmentsPrunedByBroker(); + + void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker); + + long getNumSegmentsPrunedByServer(); + + void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer); + + long getNumSegmentsPrunedInvalid(); + + void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid); + + long getNumSegmentsPrunedByLimit(); + + void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit); + + long getNumSegmentsPrunedByValue(); + + void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue); + + long getExplainPlanNumEmptyFilterSegments(); + + void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments); + + long getExplainPlanNumMatchAllFilterSegments(); + + void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments); + + Map getTraceInfo(); + + void setTraceInfo(Map traceInfo); + + List getProcessingExceptions(); + + void setProcessingExceptions(List processingExceptions); + enum FanoutType { OFFLINE, REALTIME, HYBRID }