Add more information in RequestContext class #11708

Merged
@@ -1747,6 +1747,19 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setOfflineTotalCpuTimeNs(response.getOfflineTotalCpuTimeNs());
statistics.setRealtimeTotalCpuTimeNs(response.getRealtimeTotalCpuTimeNs());
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
statistics.setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
statistics.setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
statistics.setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
statistics.setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
statistics.setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
statistics.setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
statistics.setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
statistics.setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect(
Collectors.toList()));
}

private String getGlobalQueryId(long requestId) {
@@ -156,6 +156,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = queryPlanResult.getTableNames();
requestContext.setTableNames(List.copyOf(tableNames));

// Compilation Time. This includes the time taken for parsing, compiling, creating stage plans and assigning workers.
long compilationEndTimeNs = System.nanoTime();
@@ -235,6 +236,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
requestContext.setQueryProcessingTime(totalTimeMs);
requestContext.setTraceInfo(brokerResponse.getTraceInfo());
augmentStatistics(requestContext, brokerResponse);

// Log query and stats
@@ -144,6 +144,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap,
reduceTimeOutMs, _brokerMetrics);
final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
requestContext.setTraceInfo(brokerResponse.getTraceInfo());
requestContext.setReduceTimeNanos(reduceTimeNanos);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);

@@ -18,6 +18,10 @@
*/
package org.apache.pinot.spi.trace;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@@ -32,7 +36,7 @@ public class DefaultRequestContext implements RequestScope {

private int _errorCode = 0;
private String _query;
private String _tableName = DEFAULT_TABLE_NAME;
private List<String> _tableNames = new ArrayList<>();
private long _processingTimeMillis = -1;
private long _totalDocs;
private long _numDocsScanned;
@@ -63,6 +67,19 @@ public class DefaultRequestContext implements RequestScope {

private FanoutType _fanoutType;
private int _numUnavailableSegments;
private long _numConsumingSegmentsQueried;
Contributor: Not directly related to this PR -- I don't think we track RequestCompilationTime in BrokerResponse or RequestContext statistics; it is emitted as a metric from the code. But I guess we should. Within LinkedIn we have always had requests that can take tens to hundreds of ms to compile a query, and this will become more relevant in the context of the multi-stage engine, imo. It need not necessarily be done as part of this PR though.

Contributor (author): Yes, the plan is to add more metrics as needed. Thanks for bringing up RequestCompilationTime. I am planning to add stageStats as well for multi-stage queries, but that needs some discussion. I will take up adding RequestCompilationTime in a follow-up PR. This PR tries to align the BrokerResponse and RequestContext classes.
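
A minimal sketch of the follow-up idea from this thread -- the class, field, and setter below are illustrative stand-ins, not part of this PR or of Pinot:

import java.util.concurrent.TimeUnit;

public class CompilationTimingSketch {
  private long _compilationTimeNs; // hypothetical new statistic

  public void setCompilationTimeNs(long compilationTimeNs) {
    _compilationTimeNs = compilationTimeNs;
  }

  public long getCompilationTimeMs() {
    return TimeUnit.NANOSECONDS.toMillis(_compilationTimeNs);
  }

  public static void main(String[] args) {
    CompilationTimingSketch ctx = new CompilationTimingSketch();
    long startNs = System.nanoTime();
    // parsing, compiling and stage-plan creation would run here
    ctx.setCompilationTimeNs(System.nanoTime() - startNs);
    System.out.println("compilation took " + ctx.getCompilationTimeMs() + " ms");
  }
}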

private long _numConsumingSegmentsProcessed;
private long _numConsumingSegmentsMatched;
private long _minConsumingFreshnessTimeMs;
private long _numSegmentsPrunedByBroker;
private long _numSegmentsPrunedByServer;
private long _numSegmentsPrunedInvalid;
private long _numSegmentsPrunedByLimit;
private long _numSegmentsPrunedByValue;
private long _explainPlanNumEmptyFilterSegments;
private long _explainPlanNumMatchAllFilterSegments;
private Map<String, String> _traceInfo = new HashMap<>();
private List<String> _processingExceptions = new ArrayList<>();

public DefaultRequestContext() {
}
@@ -169,7 +186,12 @@ public void setQuery(String query) {

@Override
public void setTableName(String tableName) {
_tableName = tableName;
_tableNames.add(tableName);
}

@Override
public void setTableNames(List<String> tableNames) {
_tableNames.addAll(tableNames);
}

@Override
@@ -239,7 +261,15 @@ public String getQuery() {

@Override
public String getTableName() {
return _tableName;
if (_tableNames.size() == 0) {
return DEFAULT_TABLE_NAME;
}
return _tableNames.get(0);
}

@Override
public List<String> getTableNames() {
return _tableNames;
}

@Override
@@ -314,7 +344,7 @@ public int getNumExceptions() {

@Override
public boolean hasValidTableName() {
return !DEFAULT_TABLE_NAME.equals(_tableName);
return !_tableNames.isEmpty();
}

@Override
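
Taken together, the table-name changes in this file keep the old single-table accessors working on top of the new list: setTableName(...) appends, getTableName() falls back to DEFAULT_TABLE_NAME when nothing was set, and hasValidTableName() now checks for emptiness. A minimal usage sketch (the table names are illustrative):

import java.util.List;
import org.apache.pinot.spi.trace.DefaultRequestContext;

public class TableNamesExample {
  public static void main(String[] args) {
    DefaultRequestContext ctx = new DefaultRequestContext();
    System.out.println(ctx.hasValidTableName()); // false: nothing set yet
    ctx.setTableNames(List.of("orders_OFFLINE", "customers_REALTIME"));
    System.out.println(ctx.getTableName());      // orders_OFFLINE (first entry)
    System.out.println(ctx.getTableNames());     // [orders_OFFLINE, customers_REALTIME]
  }
}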
@@ -402,6 +432,136 @@ public void setReduceTimeMillis(long reduceTimeMillis) {
_reduceTimeMillis = reduceTimeMillis;
}

@Override
public long getNumConsumingSegmentsQueried() {
return _numConsumingSegmentsQueried;
}

@Override
public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
Contributor: (nit) I think we can use int instead of long here. It may not really matter in the grand scheme of things, but of late we have seen a lot of heap-usage improvement opportunities from production heap dumps, so just something to consider.

Contributor: This long is from BrokerResponse.

_numConsumingSegmentsQueried = numConsumingSegmentsQueried;
}
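
A minimal sketch of the nit above, assuming the field were narrowed to int while keeping the long-typed setter that BrokerResponse dictates; the saturating cast is an illustrative choice, not PR code:

public class NarrowedCounterSketch {
  private int _numConsumingSegmentsQueried;

  public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
    // Saturate at Integer.MAX_VALUE instead of overflowing on the narrowing cast.
    _numConsumingSegmentsQueried = (int) Math.min(numConsumingSegmentsQueried, Integer.MAX_VALUE);
  }

  public int getNumConsumingSegmentsQueried() {
    return _numConsumingSegmentsQueried;
  }
}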

@Override
public long getNumConsumingSegmentsProcessed() {
return _numConsumingSegmentsProcessed;
}

@Override
public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) {
_numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
}

@Override
public long getNumConsumingSegmentsMatched() {
return _numConsumingSegmentsMatched;
}

@Override
public void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched) {
_numConsumingSegmentsMatched = numConsumingSegmentsMatched;
}

@Override
public long getMinConsumingFreshnessTimeMs() {
return _minConsumingFreshnessTimeMs;
}

@Override
public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs) {
_minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
}

@Override
public long getNumSegmentsPrunedByBroker() {
return _numSegmentsPrunedByBroker;
}

@Override
public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) {
_numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
}

@Override
public long getNumSegmentsPrunedByServer() {
return _numSegmentsPrunedByServer;
}

@Override
public void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer) {
_numSegmentsPrunedByServer = numSegmentsPrunedByServer;
}

@Override
public long getNumSegmentsPrunedInvalid() {
return _numSegmentsPrunedInvalid;
}

@Override
public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
_numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
}

@Override
public long getNumSegmentsPrunedByLimit() {
return _numSegmentsPrunedByLimit;
}

@Override
public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
_numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
}

@Override
public long getNumSegmentsPrunedByValue() {
return _numSegmentsPrunedByValue;
}

@Override
public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
_numSegmentsPrunedByValue = numSegmentsPrunedByValue;
}

@Override
public long getExplainPlanNumEmptyFilterSegments() {
return _explainPlanNumEmptyFilterSegments;
}

@Override
public void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments) {
_explainPlanNumEmptyFilterSegments = explainPlanNumEmptyFilterSegments;
}

@Override
public long getExplainPlanNumMatchAllFilterSegments() {
return _explainPlanNumMatchAllFilterSegments;
}

@Override
public void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments) {
_explainPlanNumMatchAllFilterSegments = explainPlanNumMatchAllFilterSegments;
}

@Override
public Map<String, String> getTraceInfo() {
return _traceInfo;
}

@Override
public void setTraceInfo(Map<String, String> traceInfo) {
_traceInfo.putAll(traceInfo);
}

@Override
public List<String> getProcessingExceptions() {
return _processingExceptions;
}

@Override
public void setProcessingExceptions(List<String> processingExceptions) {
_processingExceptions.addAll(processingExceptions);
}

@Override
public void close() {
}
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.spi.trace;

import java.util.List;
import java.util.Map;


public interface RequestContext {
long getOfflineSystemActivitiesCpuTimeNs();

@@ -65,6 +69,8 @@ default boolean isSampledRequest() {

void setTableName(String tableName);

void setTableNames(List<String> tableNames);
tibrewalpratik17 marked this conversation as resolved.

void setQueryProcessingTime(long processingTimeMillis);

void setBrokerId(String brokerId);
@@ -93,6 +99,8 @@ default boolean isSampledRequest() {

String getTableName();

List<String> getTableNames();
tibrewalpratik17 marked this conversation as resolved.

long getProcessingTimeMillis();

long getTotalDocs();
@@ -157,6 +165,58 @@ default boolean isSampledRequest() {

void setReduceTimeMillis(long reduceTimeMillis);

long getNumConsumingSegmentsQueried();

void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried);

long getNumConsumingSegmentsProcessed();

void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed);

long getNumConsumingSegmentsMatched();

void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched);

long getMinConsumingFreshnessTimeMs();

void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs);

long getNumSegmentsPrunedByBroker();

void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker);

long getNumSegmentsPrunedByServer();

void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);

long getNumSegmentsPrunedInvalid();

void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);

long getNumSegmentsPrunedByLimit();

void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);

long getNumSegmentsPrunedByValue();

void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);

long getExplainPlanNumEmptyFilterSegments();

void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments);

long getExplainPlanNumMatchAllFilterSegments();

void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);

Map<String, String> getTraceInfo();

void setTraceInfo(Map<String, String> traceInfo);
Comment on lines +212 to +214

Contributor: I am not sure creating these by default for tracing is a good idea. Normally we don't enable tracing for production use cases, so any that were manually enabled must already be monitored, which makes it less useful to keep the event listener (e.g. there's a human tracking it already).

Contributor (author): Yes, but we would still want to keep the trace info in the event so that we can persist it in Kafka / Hive and refer to it later.
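
A hedged sketch of the persistence use case from the reply above: a consumer reads the newly exposed trace info and processing exceptions off the RequestContext and hands them to an external sink. TraceInfoExporter and publish() are illustrative stand-ins, not Pinot APIs:

import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.trace.RequestContext;

public class TraceInfoExporter {
  public void export(RequestContext requestContext) {
    Map<String, String> traceInfo = requestContext.getTraceInfo();
    List<String> exceptions = requestContext.getProcessingExceptions();
    if (!traceInfo.isEmpty() || !exceptions.isEmpty()) {
      publish(traceInfo, exceptions);
    }
  }

  private void publish(Map<String, String> traceInfo, List<String> exceptions) {
    // Stand-in for a Kafka producer or Hive writer.
    System.out.println("traceInfo=" + traceInfo + " exceptions=" + exceptions);
  }
}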


List<String> getProcessingExceptions();

void setProcessingExceptions(List<String> processingExceptions);

enum FanoutType {
OFFLINE, REALTIME, HYBRID
}