Allows multiple requests per server per request ID #13742
Open
egalpin wants to merge 51 commits into apache:master from egalpin:egalpin/refactor-broker-request (base: master)
Commits (51)
All commits authored by egalpin:

2b339af Allows multiple requests per server per request ID
4e8656e Spotless linter
73748fd Fixes test setups broken by method signature changes
c8285ec Moves placement of appending tableName metadata
35188ea Removes unused import
2ef9047 Updates signatures to match
a9eaa1f Makes QueryResponse return iterable of ServerResponses without nested…
09d9e20 Updates scala to match iterable server responses
a692f62 Updates license header to comply with linter
5359950 Makes query routing table non-nullable
f441389 Moves table name metadata setting to server-side
7cac310 Duplicate addition of table name?
c41813a Ensures async query response map employs ConcurrentHashMap at the nes…
df17781 WIP - fixing tests where tablename with type is now required
9088b46 Removing unnecessary table name metadata set ops
3124c99 Updates more tests to be compatible
31a19c7 Ensures query_hash is set everywhere query_id is
343104a Linting fixes
9fb4537 Merge remote-tracking branch 'upstream/master' into egalpin/refactor-…
6248be0 Fixes bug in broker reduce service, remove empty serverInstance entries
30933ac Updates data table reducers to check size number of data tables as op…
ff55b30 Ensures no concurrent modification of nested map
8c8c006 Adds comment RE deployment
7eac151 Uses count of issued queries rather than distinct server count for se…
c3d9bf8 Ensures table name is always added to DataTable metadata
75db44f Fixes use of wrong hashcode
2ffd65d Removes codeblock handled elsewhere
efc72fb WIP - use TableName instead of query hash
859311d Borrows 1 digit from request-id
5ed97ac WIP - Use tableName not query hash
4941c0b Use requestId to infer tableType if null
64305d5 Merge remote-tracking branch 'upstream/master' into egalpin/refactor-…
0cd47ca Fixes post-merge import
c280cb8 Adds shared constants for request ID hacking
c5837bd Linting fixes
598553e Adds hacky access to table type inference from request ID
f7fc1fe Removes QUERY_HASH from DataTable metadata
7fb325a Renames Constants to BrokerRequestIdConstants for clarity
996a6f9 Moves inferTableType to DataTableUtils
eb10a8b Clean up references to tableName
a9c8d2e Updates PrioritySchedulerTest#testSubmitBeforeRunning to expect that …
734189b Linting fixes
4d621fa Empty commit retrigger tests
41aa2e1 Makes requestId manipulations idempotent
edc5aa4 Adds license to BrokerRequestIdUtilsTest.java
86c1d4c Fixes spotless violations
2ec099b Empty commit retrigger flaky tests
2703dbf Empty commit retrigger flaky tests again
23a980f Uses bitmask for canonical requestId
b022cc6 Empty commit for tests
c660fcb Empty commit for tests off-hours
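Several commits above ("Borrows 1 digit from request-id", "Uses bitmask for canonical requestId", "Makes requestId manipulations idempotent") describe encoding the table type into the broker request ID so that one logical query can fan out as separate OFFLINE and REALTIME server requests under a shared ID. A minimal sketch of that idea follows; the class name, mask width, and tag values are illustrative assumptions, not the PR's actual BrokerRequestIdUtils code.

```java
// Hypothetical sketch: reserve the low bits of a requestId as a table-type tag,
// so one broker requestId identifies both fan-out requests, and a bitmask
// recovers the canonical (untagged) id. Mask width and tag values are assumed.
public final class RequestIdSketch {
  private static final long TABLE_TYPE_MASK = 0xFL; // low 4 bits reserved (assumption)
  private static final long OFFLINE_TAG = 0x0L;
  private static final long REALTIME_TAG = 0x1L;

  private RequestIdSketch() {
  }

  // Canonical id: strip the tag bits. Stripping is idempotent by construction.
  public static long canonicalRequestId(long requestId) {
    return requestId & ~TABLE_TYPE_MASK;
  }

  public static long offlineRequestId(long requestId) {
    return canonicalRequestId(requestId) | OFFLINE_TAG;
  }

  public static long realtimeRequestId(long requestId) {
    return canonicalRequestId(requestId) | REALTIME_TAG;
  }

  public static void main(String[] args) {
    long id = 160L; // broker-assigned id with tag bits clear
    System.out.println(realtimeRequestId(id));                     // 161
    System.out.println(canonicalRequestId(realtimeRequestId(id))); // 160
    // Re-tagging an already tagged id changes nothing: idempotent.
    System.out.println(realtimeRequestId(realtimeRequestId(id)));  // 161
  }
}
```

Because tagging always normalizes through the canonical id first, applying a tag twice is a no-op, which is what "Makes requestId manipulations idempotent" appears to be about.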
@@ -25,6 +25,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -86,6 +87,7 @@
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerQueryRoutingContext;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.spi.auth.AuthorizationResult;
@@ -629,44 +631,32 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
// Calculate routing table for the query
// TODO: Modify RoutingManager interface to directly take PinotQuery
long routingStartTimeNs = System.nanoTime();
Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = null;
Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null;
Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable = new HashMap<>();
List<String> unavailableSegments = new ArrayList<>();
int numPrunedSegmentsTotal = 0;

if (offlineBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
offlineRoutingTable = serverInstanceToSegmentsMap;
} else {
offlineBrokerRequest = null;
}
numPrunedSegmentsTotal += routingTable.getNumPrunedSegments();
} else {
Integer numPrunedSegments =
updateRoutingTable(requestId, offlineBrokerRequest, queryRoutingTable, unavailableSegments);
if (numPrunedSegments == null) {
offlineBrokerRequest = null;
} else {
numPrunedSegmentsTotal += numPrunedSegments;
}
}
if (realtimeBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
realtimeRoutingTable = serverInstanceToSegmentsMap;
} else {
realtimeBrokerRequest = null;
}
numPrunedSegmentsTotal += routingTable.getNumPrunedSegments();
} else {

// TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) {
// Don't send explain queries to realtime for OFFLINE or HYBRID tables
Integer numPrunedSegments =
updateRoutingTable(requestId, realtimeBrokerRequest, queryRoutingTable, unavailableSegments);
if (numPrunedSegments == null) {
realtimeBrokerRequest = null;
} else {
numPrunedSegmentsTotal += numPrunedSegments;
}
}

int numUnavailableSegments = unavailableSegments.size();
requestContext.setNumUnavailableSegments(numUnavailableSegments);
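The hunk above replaces the two nullable per-table-type routing maps with a single Map<ServerInstance, List<ServerQueryRoutingContext>>; later hunks count issued queries as the sum of the per-server context lists rather than the number of distinct servers. A self-contained sketch of that counting, with plain strings standing in for Pinot's ServerInstance and ServerQueryRoutingContext types (an illustrative simplification, not the PR's code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueryRoutingTableSketch {
  // Each server maps to one routing context per table type it must serve, so a
  // hybrid table can contribute two entries for the same server.
  static int countQueriesIssued(Map<String, List<String>> queryRoutingTable) {
    return queryRoutingTable.values().stream().mapToInt(List::size).sum();
  }

  public static void main(String[] args) {
    Map<String, List<String>> queryRoutingTable = new HashMap<>();
    queryRoutingTable.computeIfAbsent("server1", k -> new ArrayList<>()).add("myTable_OFFLINE");
    queryRoutingTable.computeIfAbsent("server1", k -> new ArrayList<>()).add("myTable_REALTIME");
    queryRoutingTable.computeIfAbsent("server2", k -> new ArrayList<>()).add("myTable_OFFLINE");
    System.out.println(queryRoutingTable.size());              // 2 distinct servers
    System.out.println(countQueriesIssued(queryRoutingTable)); // 3 issued queries
  }
}
```

This distinction is why the PR renames numServers to numQueriesIssued: for a hybrid table, one server can receive both an OFFLINE and a REALTIME request under the same request ID.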
@@ -729,18 +719,13 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

// Set the maximum serialized response size per server, and ask server to directly return final response when only
// one server is queried
int numServers = 0;
if (offlineRoutingTable != null) {
numServers += offlineRoutingTable.size();
}
if (realtimeRoutingTable != null) {
numServers += realtimeRoutingTable.size();
}
int numQueriesIssued = queryRoutingTable.values().stream().mapToInt(List::size).sum();

if (offlineBrokerRequest != null) {
Map<String, String> queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions();
setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig);
setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, offlineTableConfig);
// Set the query option to directly return final result for single server query unless it is explicitly disabled
if (numServers == 1) {
if (numQueriesIssued == 1) {
// Set the same flag in the original server request to be used in the reduce phase for hybrid table
if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
&& offlineBrokerRequest != serverBrokerRequest) {
@@ -751,9 +736,9 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}
if (realtimeBrokerRequest != null) {
Map<String, String> queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions();
setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig);
setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, realtimeTableConfig);
// Set the query option to directly return final result for single server query unless it is explicitly disabled
if (numServers == 1) {
if (numQueriesIssued == 1) {
// Set the same flag in the original server request to be used in the reduce phase for hybrid table
if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
&& realtimeBrokerRequest != serverBrokerRequest) {

@@ -771,15 +756,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
// - Compile time function invocation
// - Literal only queries
// - Any rewrites
if (pinotQuery.isExplain()) {
// if (pinotQuery.isExplain()) {

Review comment: nit: why is it commented out?

// Update routing tables to only send request to offline servers for OFFLINE and HYBRID tables.
// TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
if (offlineRoutingTable != null) {
// For OFFLINE and HYBRID tables, don't send EXPLAIN query to realtime servers.
realtimeBrokerRequest = null;
realtimeRoutingTable = null;
}
}
// if (offlineRoutingTable != null) {
//   // For OFFLINE and HYBRID tables, don't send EXPLAIN query to realtime servers.
//   realtimeBrokerRequest = null;
//   realtimeRoutingTable = null;
// }
// }
BrokerResponseNative brokerResponse;
if (_queriesById != null) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
@@ -790,20 +774,20 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
_queriesById.put(requestId, new QueryServers(query, queryRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, queryRoutingTable, remainingTimeMs,
serverStats, requestContext);
} finally {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, queryRoutingTable, remainingTimeMs,
serverStats, requestContext);
}

for (ProcessingException exception : exceptions) {
@@ -840,6 +824,33 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}
}

private Integer updateRoutingTable(long requestId, BrokerRequest brokerRequest,
Map<ServerInstance, List<ServerQueryRoutingContext>> routingTableResult, List<String> unavailableSegments) {
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> serverMap
: serverInstanceToSegmentsMap.entrySet()) {
ServerInstance serverInstance = serverMap.getKey();
Pair<List<String>, List<String>> segmentsToQuery = serverMap.getValue();

routingTableResult.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(
new ServerQueryRoutingContext(brokerRequest, segmentsToQuery,
serverInstance.toServerRoutingInstance(serverInstance.isTlsEnabled())));
}
} else {
return null;
}
return routingTable.getNumPrunedSegments();
} else {
return null;
}
}

@VisibleForTesting
static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy,
String offlineRoutingPolicy) {
@@ -916,7 +927,7 @@ private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expre
case "datetrunc":
String granularString = function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
Expression timeExpression = function.getOperands().get(1);
if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS".equalsIgnoreCase(
if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS" .equalsIgnoreCase(
function.getOperands().get(2).getLiteral().getStringValue()))) && TimestampIndexUtils.isValidGranularity(
granularString) && timeExpression.getIdentifier() != null) {
String timeColumn = timeExpression.getIdentifier().getName();

@@ -1698,7 +1709,7 @@ private static void fixColumnName(String rawTableName, Expression expression, Ma
@VisibleForTesting
static String getActualColumnName(String rawTableName, String columnName, @Nullable Map<String, String> columnNameMap,
boolean ignoreCase) {
if ("*".equals(columnName)) {
if ("*" .equals(columnName)) {
return columnName;
}
String columnNameToCheck = trimTableName(rawTableName, columnName, ignoreCase);
@@ -1802,7 +1813,7 @@ private long setQueryTimeout(String tableNameWithType, Map<String, String> query
* 5. BrokerConfig -> maxServerResponseSizeBytes
* 6. BrokerConfig -> maxServerResponseSizeBytes
*/
private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> queryOptions,
private void setMaxServerResponseSizeBytes(int numQueriesIssued, Map<String, String> queryOptions,
@Nullable TableConfig tableConfig) {
// QueryOption
if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) {

@@ -1811,7 +1822,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> q
Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
if (maxQueryResponseSizeQueryOption != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxQueryResponseSizeQueryOption / numServers));
Long.toString(maxQueryResponseSizeQueryOption / numQueriesIssued));
return;
}

@@ -1825,7 +1836,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> q
}
if (queryConfig.getMaxQueryResponseSizeBytes() != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers));
Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numQueriesIssued));
return;
}
}

@@ -1841,7 +1852,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> q
String maxQueryResponseSizeBrokerConfig = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES);
if (maxQueryResponseSizeBrokerConfig != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(DataSizeUtils.toBytes(maxQueryResponseSizeBrokerConfig) / numServers));
Long.toString(DataSizeUtils.toBytes(maxQueryResponseSizeBrokerConfig) / numQueriesIssued));
}
}
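The hunks above divide the maximum query response size evenly across issued server queries instead of distinct servers. A tiny sketch of that budgeting arithmetic (the method and constant names here are illustrative, not Pinot's):

```java
// Sketch of the per-server response budget: the total response-size budget is
// split evenly across issued server queries using integer division, mirroring
// Long.toString(maxSize / numQueriesIssued) in the diff above.
public class ResponseSizeSketch {
  static long perServerBudget(long maxQueryResponseSizeBytes, int numQueriesIssued) {
    // Integer division: any remainder is dropped, slightly under-allocating.
    return maxQueryResponseSizeBytes / numQueriesIssued;
  }

  public static void main(String[] args) {
    System.out.println(perServerBudget(100_000_000L, 3)); // 33333333
    System.out.println(perServerBudget(100_000_000L, 1)); // 100000000
  }
}
```

Dividing by numQueriesIssued matters for hybrid tables: a server queried for both OFFLINE and REALTIME segments produces two responses, so budgeting per server would overshoot the total.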
@@ -1909,10 +1920,8 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t
* TODO: Directly take PinotQuery
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
BrokerRequest serverBrokerRequest,
Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception;

@@ -1942,14 +1951,10 @@ private static class QueryServers {
final String _query;
final Set<ServerInstance> _servers = new HashSet<>();

QueryServers(String query, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable) {
QueryServers(String query, @Nullable Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable) {
_query = query;
if (offlineRoutingTable != null) {
_servers.addAll(offlineRoutingTable.keySet());
}
if (realtimeRoutingTable != null) {
_servers.addAll(realtimeRoutingTable.keySet());
if (queryRoutingTable != null) {
_servers.addAll(queryRoutingTable.keySet());
}
}
}
Review comment: @jackjlli I'll remove lines 759-766 as it's handled here now instead.