Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add isPartialResult flag to broker response #11592

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
int numUnavailableSegments = unavailableSegments.size();
requestContext.setNumUnavailableSegments(numUnavailableSegments);

boolean isPartialResult = false;
Copy link
Contributor

@walterddr walterddr Sep 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the "partial result" definition? for example if grouplimits reached for order by queries do we consider that as partial results?

i guess the question is when do we set the flag for:

  1. result is correct (this is no flag)
  2. result is correct but we didn't look at the entire table (e.g. select * limit 1000)
  3. result is not correct but each row is correct (e.g. select ... group by key order by COUNT(*))
  4. result is not correct, some rows might be correct some rows might not due to data size limit (select ... join ... group by ... when join hits maxRow limit, grouping result might be partial)
  5. result is incorrect (we shouldn't return in this case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@walterddr my idea with this flag is to generally cover all cases where we do sent a response back (no failures), but we also know either some segments / servers were not queried (segment unavailability / server down etc) or some rows / groups were ignored.

  1. result is correct (this is no flag)

  2. result is correct but we didn't look at the entire table (e.g. select * limit 1000) -> I don't think this should be flagged as partial result since the result is correct, we don't have to look the entire table up

  3. result is not correct but each row is correct (e.g. select ... group by key order by COUNT(*)) -> I'm not sure if there is a way of knowing this in the current server -> broker response? If yes, then I do think we should flag this a partial result. I may not have full context, but is it the trimming of cross segment groups to max(limit * 5, DEFAULT_MIN_NUM_GROUPS) that you're referring to?

  4. result is not correct, some rows might be correct some rows might not due to data size limit (select ... join ... group by ... when join hits maxRow limit, grouping result might be partial) -> Yes this should classify as partialResponse imo

  5. result is incorrect (we shouldn't return in this case) -> What do you mean by incorrect here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think general definition (and this is what we did in the current engine IIRC) is that partialResults flag would be set to true in either of the following:

  • Result is incomplete (determined by server based on things like GROUP BY heuristics, threshold etc)
  • Not all servers queried responded (determine by broker)

Basically when we are able to determine that there was more data to be processed but we couldn't either because of our own algorithms (heuristics / thresholds etc) or failed to hear back from servers.

I think (1) can also be determined by broker during merge.

For SELECT with ORDER BY, I don't think we ever set partialResults on server and there is no need imo.

For SELECT without ORDER BY, servers do early termination based on LIMIT N, but again partialResults is not set and there is no need.

List<ProcessingException> exceptions = new ArrayList<>();
if (numUnavailableSegments > 0) {
String errorMessage;
Expand All @@ -619,13 +620,15 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}
exceptions.add(QueryException.getException(QueryException.BROKER_SEGMENT_UNAVAILABLE_ERROR, errorMessage));
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1);
isPartialResult = true;
}

if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
if (!exceptions.isEmpty()) {
LOGGER.info("No server found for request {}: {}", requestId, query);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1);
return new BrokerResponseNative(exceptions);
BrokerResponseNative responseNative = new BrokerResponseNative(exceptions);
responseNative.setPartialResult(isPartialResult);
} else {
// When all segments have been pruned, we can just return an empty response.
return getEmptyBrokerOnlyResponse(requestId, query, requesterIdentity, requestContext, pinotQuery, tableName);
Expand Down Expand Up @@ -658,7 +661,9 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
LOGGER.info("{} {}: {}", errorMessage, requestId, query);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS, 1);
exceptions.add(QueryException.getException(QueryException.BROKER_TIMEOUT_ERROR, errorMessage));
return new BrokerResponseNative(exceptions);
BrokerResponseNative responseNative = new BrokerResponseNative(exceptions);
responseNative.setPartialResult(isPartialResult);
return responseNative;
}

// Execute the query
Expand Down Expand Up @@ -712,9 +717,11 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

// Track number of queries with number of groups limit reached
if (brokerResponse.isNumGroupsLimitReached()) {
isPartialResult = true;
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
1);
}
brokerResponse.setPartialResult(isPartialResult);

// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
String tableName = entry.getKey();
Set<String> unavailableSegments = entry.getValue();
numUnavailableSegments += unavailableSegments.size();
brokerResponse.setPartialResult(true);
brokerResponse.addToExceptions(new QueryProcessingException(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If some servers are not available we currently used to throw an Exception instead of the results
So likely this won't be required as we have an exception stating which servers are in error

String.format("Find unavailable segments: %s for table: %s", unavailableSegments, tableName)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
}
int numServersNotResponded = serversNotResponded.size();
if (numServersNotResponded != 0) {
brokerResponse.setPartialResult(true);
brokerResponse.addToExceptions(new QueryProcessingException(QueryException.SERVER_NOT_RESPONDING_ERROR_CODE,
String.format("%d servers %s not responded", numServersNotResponded, serversNotResponded)));
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ExecutionStats {
private static final String NUM_GROUPS_LIMIT_REACHED = "numGroupsLimitReached";
private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs";
private static final String TIME_USED_MS = "timeUsedMs";
private static final String PARTIAL_RESULT = "partialResult";

private final JsonNode _brokerResponse;

Expand Down Expand Up @@ -110,6 +111,10 @@ public boolean isNumGroupsLimitReached() {
return _brokerResponse.has(NUM_GROUPS_LIMIT_REACHED) && _brokerResponse.get(NUM_GROUPS_LIMIT_REACHED).asBoolean();
}

public boolean isPartialResult() {
return _brokerResponse.has(PARTIAL_RESULT) && _brokerResponse.get(PARTIAL_RESULT).asBoolean();
}

public long getTimeUsedMs() {
return _brokerResponse.has(TIME_USED_MS) ? _brokerResponse.get(TIME_USED_MS).asLong() : -1L;
}
Expand All @@ -135,6 +140,7 @@ public String toString() {
map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached());
map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms");
map.put(TIME_USED_MS, getTimeUsedMs() + "ms");
map.put(PARTIAL_RESULT, isPartialResult());
return map.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo"})
"realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo", "partialResult"})
public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
public static final BrokerResponseNative NO_TABLE_RESULT =
Expand Down Expand Up @@ -76,6 +76,7 @@ public class BrokerResponseNative implements BrokerResponse {

private long _totalDocs = 0L;
private boolean _numGroupsLimitReached = false;
private boolean _partialResult = false;
private long _timeUsedMs = 0L;
private long _offlineThreadCpuTimeNs = 0L;
private long _realtimeThreadCpuTimeNs = 0L;
Expand Down Expand Up @@ -493,6 +494,16 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
_numGroupsLimitReached = numGroupsLimitReached;
}

@JsonProperty("partialResult")
public boolean isPartialResult() {
return _partialResult;
}

@JsonProperty("partialResult")
public void setPartialResult(boolean partialResult) {
_partialResult = partialResult;
}

@JsonProperty("timeUsedMs")
public long getTimeUsedMs() {
return _timeUsedMs;
Expand Down
Loading