Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows multiple requests per server per request ID #13742

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
2b339af
Allows multiple requests per server per request ID
egalpin Jun 1, 2024
4e8656e
Spotless linter
egalpin Aug 3, 2024
73748fd
Fixes test setups broken by method signature changes
egalpin Aug 3, 2024
c8285ec
Moves placement of appending tableName metadata
egalpin Aug 6, 2024
35188ea
Removes unused import
egalpin Aug 6, 2024
2ef9047
Updates signatures to match
egalpin Aug 6, 2024
a9eaa1f
Makes QueryResponse return iterable of ServerResponses without nested…
egalpin Aug 6, 2024
09d9e20
Updates scala to match iterable server responses
egalpin Aug 6, 2024
a692f62
Updates license header to comply with linter
egalpin Aug 7, 2024
5359950
Makes query routing table non-nullable
egalpin Aug 7, 2024
f441389
Moves table name metadata setting to server-side
egalpin Aug 7, 2024
7cac310
Duplicate addition of table name?
egalpin Aug 7, 2024
c41813a
Ensures async query response map employs ConcurrentHashMap at the nes…
egalpin Aug 8, 2024
df17781
WIP - fixing tests where tablename with type is now required
egalpin Aug 8, 2024
9088b46
Removing unnecessary table name metadata set ops
egalpin Aug 8, 2024
3124c99
Updates more tests to be compatible
egalpin Aug 8, 2024
31a19c7
Ensures query_hash is set everywhere query_id is
egalpin Aug 8, 2024
343104a
Linting fixes
egalpin Aug 8, 2024
9fb4537
Merge remote-tracking branch 'upstream/master' into egalpin/refactor-…
egalpin Aug 8, 2024
6248be0
Fixes bug in broker reduce service, remove empty serverInstance entries
egalpin Aug 9, 2024
30933ac
Updates data table reducers to check size number of data tables as op…
egalpin Aug 9, 2024
ff55b30
Ensures no concurrent modification of nested map
egalpin Aug 9, 2024
8c8c006
Adds comment RE deployment
egalpin Aug 9, 2024
7eac151
Uses count of issued queries rather than distinct server count for se…
egalpin Aug 12, 2024
c3d9bf8
Ensures table name is always added to DataTable metadata
egalpin Aug 13, 2024
75db44f
Fixes use of wrong hashcode
egalpin Aug 13, 2024
2ffd65d
Removes codeblock handled elsewhere
egalpin Aug 13, 2024
efc72fb
WIP - use TableName instead of query hash
egalpin Sep 3, 2024
859311d
Borrows 1 digit from request-id
egalpin Sep 27, 2024
5ed97ac
WIP - Use tableName not query hash
egalpin Oct 8, 2024
4941c0b
Use requestId to infer tableType if null
egalpin Oct 8, 2024
64305d5
Merge remote-tracking branch 'upstream/master' into egalpin/refactor-…
egalpin Oct 8, 2024
0cd47ca
Fixes post-merge import
egalpin Oct 8, 2024
c280cb8
Adds shared constants for request ID hacking
egalpin Oct 8, 2024
c5837bd
Linting fixes
egalpin Oct 8, 2024
598553e
Adds hacky access to table type inference from request ID
egalpin Oct 9, 2024
f7fc1fe
Removes QUERY_HASH from DataTable metadata
egalpin Oct 10, 2024
7fb325a
Renames Constants to BrokerRequestIdConstants for clarity
egalpin Oct 10, 2024
996a6f9
Moves inferTableType to DataTableUtils
egalpin Oct 10, 2024
eb10a8b
Clean up references to tableName
egalpin Oct 10, 2024
a9c8d2e
Updates PrioritySchedulerTest#testSubmitBeforeRunning to expect that …
egalpin Oct 10, 2024
734189b
Linting fixes
egalpin Oct 10, 2024
4d621fa
Empty commit retrigger tests
egalpin Oct 11, 2024
41aa2e1
Makes requestId manipulations idempotent
egalpin Oct 17, 2024
edc5aa4
Adds license to BrokerRequestIdUtilsTest.java
egalpin Oct 17, 2024
86c1d4c
Fixes spotless violations
egalpin Oct 17, 2024
2ec099b
Empty commit retrigger flaky tests
egalpin Oct 17, 2024
2703dbf
Empty commit retrigger flaky tests again
egalpin Oct 17, 2024
23a980f
Uses bitmask for canonical requestId
egalpin Oct 18, 2024
b022cc6
Empty commit for tests
egalpin Oct 18, 2024
c660fcb
Empty commit for tests off-hours
egalpin Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerQueryRoutingContext;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.spi.auth.AuthorizationResult;
Expand Down Expand Up @@ -616,44 +618,32 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// Calculate routing table for the query
// TODO: Modify RoutingManager interface to directly take PinotQuery
long routingStartTimeNs = System.nanoTime();
Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = null;
Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null;
Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable = new HashMap<>();
List<String> unavailableSegments = new ArrayList<>();
int numPrunedSegmentsTotal = 0;

if (offlineBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
offlineRoutingTable = serverInstanceToSegmentsMap;
} else {
offlineBrokerRequest = null;
}
numPrunedSegmentsTotal += routingTable.getNumPrunedSegments();
} else {
Integer numPrunedSegments =
updateRoutingTable(requestId, offlineBrokerRequest, queryRoutingTable, unavailableSegments);
if (numPrunedSegments == null) {
offlineBrokerRequest = null;
} else {
numPrunedSegmentsTotal += numPrunedSegments;
}
}
if (realtimeBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
realtimeRoutingTable = serverInstanceToSegmentsMap;
} else {
realtimeBrokerRequest = null;
}
numPrunedSegmentsTotal += routingTable.getNumPrunedSegments();
} else {

// TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) {
// Don't send explain queries to realtime for OFFLINE or HYBRID tables
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackjlli I'll remove lines 759-766 as it's handled here now instead.

Integer numPrunedSegments =
updateRoutingTable(requestId, realtimeBrokerRequest, queryRoutingTable, unavailableSegments);
if (numPrunedSegments == null) {
realtimeBrokerRequest = null;
} else {
numPrunedSegmentsTotal += numPrunedSegments;
}
}

int numUnavailableSegments = unavailableSegments.size();
requestContext.setNumUnavailableSegments(numUnavailableSegments);

Expand Down Expand Up @@ -716,18 +706,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

// Set the maximum serialized response size per server, and ask server to directly return final response when only
// one server is queried
int numServers = 0;
if (offlineRoutingTable != null) {
numServers += offlineRoutingTable.size();
}
if (realtimeRoutingTable != null) {
numServers += realtimeRoutingTable.size();
}
int numQueriesIssued = queryRoutingTable.values().stream().mapToInt(List::size).sum();

if (offlineBrokerRequest != null) {
Map<String, String> queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions();
setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig);
setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, offlineTableConfig);
// Set the query option to directly return final result for single server query unless it is explicitly disabled
if (numServers == 1) {
if (numQueriesIssued == 1) {
// Set the same flag in the original server request to be used in the reduce phase for hybrid table
if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
&& offlineBrokerRequest != serverBrokerRequest) {
Expand All @@ -738,9 +723,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
if (realtimeBrokerRequest != null) {
Map<String, String> queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions();
setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig);
setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, realtimeTableConfig);
// Set the query option to directly return final result for single server query unless it is explicitly disabled
if (numServers == 1) {
if (numQueriesIssued == 1) {
// Set the same flag in the original server request to be used in the reduce phase for hybrid table
if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
&& realtimeBrokerRequest != serverBrokerRequest) {
Expand All @@ -758,15 +743,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// - Compile time function invocation
// - Literal only queries
// - Any rewrites
if (pinotQuery.isExplain()) {
// Update routing tables to only send request to offline servers for OFFLINE and HYBRID tables.
// TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
if (offlineRoutingTable != null) {
// For OFFLINE and HYBRID tables, don't send EXPLAIN query to realtime servers.
realtimeBrokerRequest = null;
realtimeRoutingTable = null;
}
}

BrokerResponseNative brokerResponse;
if (_queriesById != null) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
Expand All @@ -777,20 +754,20 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
_queriesById.put(requestId, new QueryServers(query, queryRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, queryRoutingTable, remainingTimeMs,
serverStats, requestContext);
} finally {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, queryRoutingTable, remainingTimeMs,
serverStats, requestContext);
}

for (ProcessingException exception : exceptions) {
Expand Down Expand Up @@ -827,6 +804,33 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}

private Integer updateRoutingTable(long requestId, BrokerRequest brokerRequest,
Map<ServerInstance, List<ServerQueryRoutingContext>> routingTableResult, List<String> unavailableSegments) {
// NOTE: Routing table might be null if table is just removed
RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> serverMap
: serverInstanceToSegmentsMap.entrySet()) {
ServerInstance serverInstance = serverMap.getKey();
Pair<List<String>, List<String>> segmentsToQuery = serverMap.getValue();

routingTableResult.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(
new ServerQueryRoutingContext(brokerRequest, segmentsToQuery,
serverInstance.toServerRoutingInstance(serverInstance.isTlsEnabled())));
}
} else {
return null;
}
return routingTable.getNumPrunedSegments();
} else {
return null;
}
}

@VisibleForTesting
static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy,
String offlineRoutingPolicy) {
Expand Down Expand Up @@ -903,7 +907,7 @@ private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expre
case "datetrunc":
String granularString = function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
Expression timeExpression = function.getOperands().get(1);
if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS".equalsIgnoreCase(
if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS" .equalsIgnoreCase(
function.getOperands().get(2).getLiteral().getStringValue()))) && TimestampIndexUtils.isValidGranularity(
granularString) && timeExpression.getIdentifier() != null) {
String timeColumn = timeExpression.getIdentifier().getName();
Expand Down Expand Up @@ -1646,7 +1650,7 @@ private static void fixColumnName(String rawTableName, Expression expression, Ma
@VisibleForTesting
static String getActualColumnName(String rawTableName, String columnName, @Nullable Map<String, String> columnNameMap,
boolean ignoreCase) {
if ("*".equals(columnName)) {
if ("*" .equals(columnName)) {
return columnName;
}
String columnNameToCheck = trimTableName(rawTableName, columnName, ignoreCase);
Expand Down Expand Up @@ -1750,7 +1754,7 @@ private long setQueryTimeout(String tableNameWithType, Map<String, String> query
* 5. BrokerConfig -> maxServerResponseSizeBytes
* 6. BrokerConfig -> maxServerResponseSizeBytes
*/
private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> queryOptions,
private void setMaxServerResponseSizeBytes(int numQueriesIssued, Map<String, String> queryOptions,
@Nullable TableConfig tableConfig) {
// QueryOption
if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) {
Expand All @@ -1759,7 +1763,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> q
Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
if (maxQueryResponseSizeQueryOption != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(maxQueryResponseSizeQueryOption / numServers));
Long.toString(maxQueryResponseSizeQueryOption / numQueriesIssued));
return;
}

Expand All @@ -1773,7 +1777,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> q
}
if (queryConfig.getMaxQueryResponseSizeBytes() != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers));
Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numQueriesIssued));
return;
}
}
Expand All @@ -1789,7 +1793,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> q
String maxQueryResponseSizeBrokerConfig = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES);
if (maxQueryResponseSizeBrokerConfig != null) {
queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
Long.toString(DataSizeUtils.toBytes(maxQueryResponseSizeBrokerConfig) / numServers));
Long.toString(DataSizeUtils.toBytes(maxQueryResponseSizeBrokerConfig) / numQueriesIssued));
}
}

Expand Down Expand Up @@ -1857,10 +1861,8 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t
* TODO: Directly take PinotQuery
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
BrokerRequest serverBrokerRequest,
Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable, long timeoutMs,
ServerStats serverStats, RequestContext requestContext)
throws Exception;

Expand Down Expand Up @@ -1890,14 +1892,10 @@ private static class QueryServers {
final String _query;
final Set<ServerInstance> _servers = new HashSet<>();

QueryServers(String query, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable) {
QueryServers(String query, @Nullable Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable) {
_query = query;
if (offlineRoutingTable != null) {
_servers.addAll(offlineRoutingTable.keySet());
}
if (realtimeRoutingTable != null) {
_servers.addAll(realtimeRoutingTable.keySet());
if (queryRoutingTable != null) {
_servers.addAll(queryRoutingTable.keySet());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.broker.requesthandler;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.utils.request.BrokerRequestIdUtils;


/**
* An ID generator to produce a global unique identifier for each query, used in v1/v2 engine for tracking and
Expand All @@ -43,7 +45,9 @@ public BrokerRequestIdGenerator(String brokerId) {
}

public long get() {
long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
return _mask + normalized;
long normalized =
((_incrementingId.getAndIncrement() & Long.MAX_VALUE) % (OFFSET / BrokerRequestIdUtils.TABLE_TYPE_OFFSET))
* BrokerRequestIdUtils.TABLE_TYPE_OFFSET;
return BrokerRequestIdUtils.getCanonicalRequestId(_mask + normalized);
}
}
Loading
Loading