diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index c9862d1af0f..a6f0c1bda0b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -87,6 +88,7 @@ import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerQueryRoutingContext; import org.apache.pinot.core.util.GapfillUtils; import org.apache.pinot.query.parser.utils.ParserUtils; import org.apache.pinot.spi.auth.AuthorizationResult; @@ -616,44 +618,32 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // Calculate routing table for the query // TODO: Modify RoutingManager interface to directly take PinotQuery long routingStartTimeNs = System.nanoTime(); - Map, List>> offlineRoutingTable = null; - Map, List>> realtimeRoutingTable = null; + Map> queryRoutingTable = new HashMap<>(); List unavailableSegments = new ArrayList<>(); int numPrunedSegmentsTotal = 0; + if (offlineBrokerRequest != null) { - // NOTE: Routing table might be null if table is just removed - RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId); - if (routingTable != null) { - unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map, List>> serverInstanceToSegmentsMap = - routingTable.getServerInstanceToSegmentsMap(); - if (!serverInstanceToSegmentsMap.isEmpty()) { - offlineRoutingTable = serverInstanceToSegmentsMap; - } else { - offlineBrokerRequest = null; - } - numPrunedSegmentsTotal += routingTable.getNumPrunedSegments(); - } else { + Integer numPrunedSegments = + updateRoutingTable(requestId, offlineBrokerRequest, queryRoutingTable, unavailableSegments); + if (numPrunedSegments == null) { offlineBrokerRequest = null; + } else { + numPrunedSegmentsTotal += numPrunedSegments; } } - if (realtimeBrokerRequest != null) { - // NOTE: Routing table might be null if table is just removed - RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId); - if (routingTable != null) { - unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map, List>> serverInstanceToSegmentsMap = - routingTable.getServerInstanceToSegmentsMap(); - if (!serverInstanceToSegmentsMap.isEmpty()) { - realtimeRoutingTable = serverInstanceToSegmentsMap; - } else { - realtimeBrokerRequest = null; - } - numPrunedSegmentsTotal += routingTable.getNumPrunedSegments(); - } else { + + // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables + if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) { + // Don't send explain queries to realtime for OFFLINE or HYBRID tables + Integer numPrunedSegments = + updateRoutingTable(requestId, realtimeBrokerRequest, queryRoutingTable, unavailableSegments); + if (numPrunedSegments == null) { realtimeBrokerRequest = null; + } else { + numPrunedSegmentsTotal += 
numPrunedSegments; } } + int numUnavailableSegments = unavailableSegments.size(); requestContext.setNumUnavailableSegments(numUnavailableSegments); @@ -716,18 +706,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // Set the maximum serialized response size per server, and ask server to directly return final response when only // one server is queried - int numServers = 0; - if (offlineRoutingTable != null) { - numServers += offlineRoutingTable.size(); - } - if (realtimeRoutingTable != null) { - numServers += realtimeRoutingTable.size(); - } + int numQueriesIssued = queryRoutingTable.values().stream().mapToInt(List::size).sum(); + if (offlineBrokerRequest != null) { Map queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions(); - setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig); + setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, offlineTableConfig); // Set the query option to directly return final result for single server query unless it is explicitly disabled - if (numServers == 1) { + if (numQueriesIssued == 1) { // Set the same flag in the original server request to be used in the reduce phase for hybrid table if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null && offlineBrokerRequest != serverBrokerRequest) { @@ -738,9 +723,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (realtimeBrokerRequest != null) { Map queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions(); - setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig); + setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, realtimeTableConfig); // Set the query option to directly return final result for single server query unless it is explicitly disabled - if (numServers == 1) { + if (numQueriesIssued == 1) { // Set the same flag in the original server request to be used in the reduce phase for hybrid table if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null && realtimeBrokerRequest != serverBrokerRequest) { @@ -758,15 +743,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // - Compile time function invocation // - Literal only queries // - Any rewrites - if (pinotQuery.isExplain()) { - // Update routing tables to only send request to offline servers for OFFLINE and HYBRID tables. - // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables - if (offlineRoutingTable != null) { - // For OFFLINE and HYBRID tables, don't send EXPLAIN query to realtime servers. - realtimeBrokerRequest = null; - realtimeRoutingTable = null; - } - } + BrokerResponseNative brokerResponse; if (_queriesById != null) { // Start to track the running query for cancellation just before sending it out to servers to avoid any @@ -777,20 +754,20 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // can always list the running queries and cancel query again until it ends. Just that such race // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to // servers takes time, but will address later if needed. 
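      // A minimal sketch of the consolidated routing flow above, not authoritative: the generic
      // type parameters are inferred, since the angle brackets were dropped from the diff text.
      // The separate OFFLINE/REALTIME routing tables are assumed to be replaced by a single
      // Map<ServerInstance, List<ServerQueryRoutingContext>>, and updateRoutingTable(...) is
      // assumed to return null when a table has no usable routing so the caller can drop that
      // side of a hybrid query:
      Map<ServerInstance, List<ServerQueryRoutingContext>> queryRoutingTable = new HashMap<>();
      List<String> unavailableSegments = new ArrayList<>();
      int numPrunedSegmentsTotal = 0;
      if (offlineBrokerRequest != null) {
        Integer numPruned =
            updateRoutingTable(requestId, offlineBrokerRequest, queryRoutingTable, unavailableSegments);
        if (numPruned == null) {
          offlineBrokerRequest = null; // no routing table (or no segments) for the OFFLINE table
        } else {
          numPrunedSegmentsTotal += numPruned;
        }
      }
      // One server may now carry several ServerQueryRoutingContext entries (e.g. the OFFLINE and
      // REALTIME parts of a hybrid table), so the fan-out used to split the max response size and
      // to trigger the single-server final-result optimization counts per-server query contexts
      // rather than servers:
      int numQueriesIssued = queryRoutingTable.values().stream().mapToInt(List::size).sum();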
- _queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); + _queriesById.put(requestId, new QueryServers(query, queryRoutingTable)); LOGGER.debug("Keep track of running query: {}", requestId); try { - brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, - offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, - requestContext); + brokerResponse = + processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, queryRoutingTable, remainingTimeMs, + serverStats, requestContext); } finally { _queriesById.remove(requestId); LOGGER.debug("Remove track of running query: {}", requestId); } } else { - brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, - offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, - requestContext); + brokerResponse = + processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, queryRoutingTable, remainingTimeMs, + serverStats, requestContext); } for (ProcessingException exception : exceptions) { @@ -827,6 +804,33 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } + private Integer updateRoutingTable(long requestId, BrokerRequest brokerRequest, + Map> routingTableResult, List unavailableSegments) { + // NOTE: Routing table might be null if table is just removed + RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, requestId); + if (routingTable != null) { + unavailableSegments.addAll(routingTable.getUnavailableSegments()); + Map, List>> serverInstanceToSegmentsMap = + routingTable.getServerInstanceToSegmentsMap(); + if (!serverInstanceToSegmentsMap.isEmpty()) { + for (Map.Entry, List>> serverMap + : serverInstanceToSegmentsMap.entrySet()) { + ServerInstance serverInstance = serverMap.getKey(); + Pair, List> segmentsToQuery = serverMap.getValue(); + + routingTableResult.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add( + new ServerQueryRoutingContext(brokerRequest, segmentsToQuery, + serverInstance.toServerRoutingInstance(serverInstance.isTlsEnabled()))); + } + } else { + return null; + } + return routingTable.getNumPrunedSegments(); + } else { + return null; + } + } + @VisibleForTesting static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy, String offlineRoutingPolicy) { @@ -903,7 +907,7 @@ private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expre case "datetrunc": String granularString = function.getOperands().get(0).getLiteral().getStringValue().toUpperCase(); Expression timeExpression = function.getOperands().get(1); - if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS".equalsIgnoreCase( + if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS" .equalsIgnoreCase( function.getOperands().get(2).getLiteral().getStringValue()))) && TimestampIndexUtils.isValidGranularity( granularString) && timeExpression.getIdentifier() != null) { String timeColumn = timeExpression.getIdentifier().getName(); @@ -1646,7 +1650,7 @@ private static void fixColumnName(String rawTableName, Expression expression, Ma @VisibleForTesting static String getActualColumnName(String rawTableName, String columnName, @Nullable Map columnNameMap, boolean ignoreCase) { - if ("*".equals(columnName)) { + if ("*" .equals(columnName)) { return columnName; } String 
columnNameToCheck = trimTableName(rawTableName, columnName, ignoreCase); @@ -1750,7 +1754,7 @@ private long setQueryTimeout(String tableNameWithType, Map query * 5. BrokerConfig -> maxServerResponseSizeBytes * 6. BrokerConfig -> maxServerResponseSizeBytes */ - private void setMaxServerResponseSizeBytes(int numServers, Map queryOptions, + private void setMaxServerResponseSizeBytes(int numQueriesIssued, Map queryOptions, @Nullable TableConfig tableConfig) { // QueryOption if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) { @@ -1759,7 +1763,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map q Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions); if (maxQueryResponseSizeQueryOption != null) { queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(maxQueryResponseSizeQueryOption / numServers)); + Long.toString(maxQueryResponseSizeQueryOption / numQueriesIssued)); return; } @@ -1773,7 +1777,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map q } if (queryConfig.getMaxQueryResponseSizeBytes() != null) { queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers)); + Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numQueriesIssued)); return; } } @@ -1789,7 +1793,7 @@ private void setMaxServerResponseSizeBytes(int numServers, Map q String maxQueryResponseSizeBrokerConfig = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES); if (maxQueryResponseSizeBrokerConfig != null) { queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, - Long.toString(DataSizeUtils.toBytes(maxQueryResponseSizeBrokerConfig) / numServers)); + Long.toString(DataSizeUtils.toBytes(maxQueryResponseSizeBrokerConfig) / numQueriesIssued)); } } @@ -1857,10 +1861,8 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t * TODO: Directly take PinotQuery */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + BrokerRequest serverBrokerRequest, + Map> queryRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception; @@ -1890,14 +1892,10 @@ private static class QueryServers { final String _query; final Set _servers = new HashSet<>(); - QueryServers(String query, @Nullable Map, List>> offlineRoutingTable, - @Nullable Map, List>> realtimeRoutingTable) { + QueryServers(String query, @Nullable Map> queryRoutingTable) { _query = query; - if (offlineRoutingTable != null) { - _servers.addAll(offlineRoutingTable.keySet()); - } - if (realtimeRoutingTable != null) { - _servers.addAll(realtimeRoutingTable.keySet()); + if (queryRoutingTable != null) { + _servers.addAll(queryRoutingTable.keySet()); } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java index c97ee44a0af..1faffcaa905 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java @@ 
-19,6 +19,8 @@ package org.apache.pinot.broker.requesthandler; import java.util.concurrent.atomic.AtomicLong; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; + /** * An ID generator to produce a global unique identifier for each query, used in v1/v2 engine for tracking and @@ -43,7 +45,9 @@ public BrokerRequestIdGenerator(String brokerId) { } public long get() { - long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET; - return _mask + normalized; + long normalized = + ((_incrementingId.getAndIncrement() & Long.MAX_VALUE) % (OFFSET / BrokerRequestIdUtils.TABLE_TYPE_OFFSET)) + * BrokerRequestIdUtils.TABLE_TYPE_OFFSET; + return BrokerRequestIdUtils.getCanonicalRequestId(_mask + normalized); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 0484476a410..d1dd8a30e6d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -24,9 +24,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -39,8 +37,8 @@ import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; import org.apache.pinot.core.query.reduce.StreamingReduceService; import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerQueryRoutingContext; import org.apache.pinot.core.transport.ServerRoutingInstance; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.RequestContext; @@ -75,26 +73,15 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + BrokerRequest serverBrokerRequest, + Map> queryRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { // TODO: Support failure detection // TODO: Add servers queried/responded stats - assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + assert !queryRoutingTable.isEmpty(); Map> responseMap = new HashMap<>(); - if (offlineBrokerRequest != null) { - assert offlineRoutingTable != null; - sendRequest(requestId, TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap, - requestContext.isSampledRequest()); - } - if (realtimeBrokerRequest != null) { - assert realtimeRoutingTable != null; - sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap, - requestContext.isSampledRequest()); - } + sendRequest(requestId, queryRoutingTable, responseMap, requestContext.isSampledRequest()); long reduceStartTimeNs = System.nanoTime(); BrokerResponseNative brokerResponse = 
_streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); @@ -105,21 +92,21 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques /** * Query pinot server for data table. */ - private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, - Map, List>> routingTable, + private void sendRequest(long requestId, Map> queryRoutingTable, Map> responseMap, boolean trace) { - for (Map.Entry, List>> routingEntry : routingTable.entrySet()) { + for (Map.Entry> routingEntry : queryRoutingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); - // TODO: support optional segments for GrpcQueryServer. - List segments = routingEntry.getValue().getLeft(); - String serverHost = serverInstance.getHostname(); - int port = serverInstance.getGrpcPort(); - // TODO: enable throttling on per host bases. - Iterator streamingResponse = _streamingQueryClient.submit(serverHost, port, - new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace) - .setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build()); - responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC), - streamingResponse); + for (ServerQueryRoutingContext serverQueryRoutingContext : routingEntry.getValue()) { + // TODO: support optional segments for GrpcQueryServer. + String serverHost = serverInstance.getHostname(); + int port = serverInstance.getGrpcPort(); + // TODO: enable throttling on per host bases. + Iterator streamingResponse = _streamingQueryClient.submit(serverHost, port, + new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace) + .setEnableStreaming(true).setBrokerRequest(serverQueryRoutingContext.getBrokerRequest()) + .setSegments(serverQueryRoutingContext.getRequiredSegmentsToQuery()).build()); + responseMap.put(serverInstance.toServerRoutingInstance(ServerInstance.RoutingType.GRPC), streamingResponse); + } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 349affe6c83..6ed7d618e10 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -20,12 +20,11 @@ import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.failuredetector.FailureDetector; import org.apache.pinot.broker.failuredetector.FailureDetectorFactory; @@ -46,6 +45,7 @@ import org.apache.pinot.core.transport.QueryResponse; import org.apache.pinot.core.transport.QueryRouter; import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerQueryRoutingContext; import org.apache.pinot.core.transport.ServerResponse; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -97,13 +97,11 @@ 
public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + BrokerRequest serverBrokerRequest, + Map> queryRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { - assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + assert !queryRoutingTable.isEmpty(); if (requestContext.isSampledRequest()) { serverBrokerRequest.getPinotQuery().putToQueryOptions(CommonConstants.Broker.Request.TRACE, "true"); } @@ -111,10 +109,9 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName()); long scatterGatherStartTimeNs = System.nanoTime(); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, - realtimeBrokerRequest, realtimeRoutingTable, timeoutMs); + _queryRouter.submitQuery(requestId, rawTableName, queryRoutingTable, timeoutMs); _failureDetector.notifyQuerySubmitted(asyncQueryResponse); - Map finalResponses = asyncQueryResponse.getFinalResponses(); + Map> finalResponses = asyncQueryResponse.getFinalResponses(); if (asyncQueryResponse.getStatus() == QueryResponse.Status.TIMED_OUT) { _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1); } @@ -126,16 +123,17 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques int numServersQueried = finalResponses.size(); long totalResponseSize = 0; - Map dataTableMap = Maps.newHashMapWithExpectedSize(numServersQueried); + Map> dataTableMap = Maps.newHashMapWithExpectedSize(numServersQueried); List serversNotResponded = new ArrayList<>(); - for (Map.Entry entry : finalResponses.entrySet()) { - ServerResponse serverResponse = entry.getValue(); - DataTable dataTable = serverResponse.getDataTable(); - if (dataTable != null) { - dataTableMap.put(entry.getKey(), dataTable); - totalResponseSize += serverResponse.getResponseSize(); - } else { - serversNotResponded.add(entry.getKey()); + for (Map.Entry> serverResponses : finalResponses.entrySet()) { + for (ServerResponse response : serverResponses.getValue()) { + DataTable dataTable = response.getDataTable(); + if (dataTable != null) { + dataTableMap.computeIfAbsent(serverResponses.getKey(), k -> new ArrayList<>()).add(dataTable); + totalResponseSize += response.getResponseSize(); + } else { + serversNotResponded.add(serverResponses.getKey()); + } } } int numServersResponded = dataTableMap.size(); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java index 859b834f0ce..bd8e87fd062 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java @@ -26,7 +26,6 @@ import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.core.transport.QueryResponse; import org.apache.pinot.core.transport.ServerRoutingInstance; -import 
org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants.Broker; @@ -65,7 +64,7 @@ public void setUp() { @Test public void testConnectionFailure() { QueryResponse queryResponse = mock(QueryResponse.class); - when(queryResponse.getFailedServer()).thenReturn(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE)); + when(queryResponse.getFailedServer()).thenReturn(new ServerRoutingInstance("localhost", 1234)); // No failure detection when submitting the query _failureDetector.notifyQuerySubmitted(queryResponse); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index 0677d9dc5dc..7a4445df016 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; @@ -38,6 +37,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerQueryRoutingContext; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.env.PinotConfiguration; @@ -154,7 +154,8 @@ public void testGetActualColumnNameCaseInSensitive() { } @Test - public void testCancelQuery() { + public void testCancelQuery() + throws InterruptedException { String tableName = "myTable_OFFLINE"; // Mock pretty much everything until the query can be submitted. TableCache tableCache = mock(TableCache.class); @@ -192,10 +193,8 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + BrokerRequest serverBrokerRequest, + Map> queryRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { testRequestId[0] = requestId; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java new file mode 100644 index 00000000000..c4bea7cc2aa --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class BrokerRequestIdGeneratorTest { + + @Test + public void testGet() { + BrokerRequestIdGenerator gen = new BrokerRequestIdGenerator("foo"); + long id = gen.get(); + assertEquals(id % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0); + assertEquals(id / BrokerRequestIdUtils.TABLE_TYPE_OFFSET % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0); + + id = gen.get(); + assertEquals(id % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0); + assertEquals(id / BrokerRequestIdUtils.TABLE_TYPE_OFFSET % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 1); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java index 60d38a738d4..03a07f614ea 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java @@ -94,6 +94,7 @@ public class DataTableImplV4 implements DataTable { protected ByteBuffer _fixedSizeData; protected byte[] _variableSizeDataBytes; protected ByteBuffer _variableSizeData; + // TODO(egalpin): add query hash to metadata, alongside requestId protected Map _metadata; protected int[] _columnOffsets; protected int _rowSizeInBytes; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java index dc3b40bfd8a..37ec424baee 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; +import org.apache.pinot.spi.config.table.TableType; import static java.nio.charset.StandardCharsets.UTF_8; @@ -78,4 +80,15 @@ public static String decodeString(ByteBuffer buffer) return new String(bytes, UTF_8); } } + + public static TableType inferTableType(DataTable dataTable) { + TableType tableType; + long requestId = Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName())); + if (requestId == BrokerRequestIdUtils.getRealtimeRequestId(requestId)) { + tableType = TableType.REALTIME; + } else { + tableType = TableType.OFFLINE; + } + return tableType; + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtils.java new file mode 100644 index 00000000000..f0937469c4c --- /dev/null +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtils.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.request; + +public class BrokerRequestIdUtils { + public static final int TABLE_TYPE_OFFSET = 10; + private static final long CANONICAL_MASK = Long.MAX_VALUE / TABLE_TYPE_OFFSET * TABLE_TYPE_OFFSET; + + private BrokerRequestIdUtils() { + } + + public static long getOfflineRequestId(long requestId) { + return getCanonicalRequestId(requestId); + } + + public static long getRealtimeRequestId(long requestId) { + return getCanonicalRequestId(requestId) | 0x1; + } + + public static long getCanonicalRequestId(long requestId) { + return requestId & CANONICAL_MASK; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtilsTest.java new file mode 100644 index 00000000000..1f05e870542 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtilsTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.utils.request; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class BrokerRequestIdUtilsTest { + @Test + public void testGetOfflineRequestId() { + Assert.assertEquals(BrokerRequestIdUtils.getOfflineRequestId(123), 120); + } + + @Test + public void testGetRealtimeRequestId() { + Assert.assertEquals(BrokerRequestIdUtils.getRealtimeRequestId(123), 121); + } + + @Test + public void testGetCanonicalRequestId() { + Assert.assertEquals(BrokerRequestIdUtils.getCanonicalRequestId(123), 120); + } +} diff --git a/pinot-common/src/thrift/request.thrift b/pinot-common/src/thrift/request.thrift index 225836da549..141cb6d1e4c 100644 --- a/pinot-common/src/thrift/request.thrift +++ b/pinot-common/src/thrift/request.thrift @@ -52,4 +52,5 @@ struct InstanceRequest { 4: optional bool enableTrace; 5: optional string brokerId; 6: optional list optionalSegments; +// 7: required i64 queryHash; } diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index 753b2c37949..efe0af2ea42 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -26,7 +26,7 @@ import org.apache.pinot.common.request.BrokerRequest import org.apache.pinot.connector.spark.common.partition.PinotSplit import org.apache.pinot.connector.spark.common.{Logging, PinotDataSourceReadOptions, PinotException} import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager -import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance} +import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance, ServerQueryRoutingContext} import org.apache.pinot.spi.config.table.TableType import org.apache.pinot.spi.env.PinotConfiguration import org.apache.pinot.spi.metrics.PinotMetricUtils @@ -40,9 +40,9 @@ import scala.collection.JavaConverters._ * Eg: offline-server1: segment1, segment2, segment3 */ private[reader] class PinotServerDataFetcher( - partitionId: Int, - pinotSplit: PinotSplit, - dataSourceOptions: PinotDataSourceReadOptions) + partitionId: Int, + pinotSplit: PinotSplit, + dataSourceOptions: PinotDataSourceReadOptions) extends Logging { private val brokerId = "apache_spark" private val metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry @@ -53,33 +53,32 @@ private[reader] class PinotServerDataFetcher( // TODO add support for TLS-secured server def fetchData(): List[DataTable] = { - val routingTableForRequest = createRoutingTableForRequest() - val requestStartTime = System.nanoTime() - val pinotServerAsyncQueryResponse = pinotSplit.serverAndSegments.serverType match { + val routingTableForRequest = + pinotSplit.serverAndSegments.serverType match { case TableType.REALTIME => - val realtimeBrokerRequest = - CalciteSqlCompiler.compileToBrokerRequest(pinotSplit.query.realtimeSelectQuery) - submitRequestToPinotServer(null, null, realtimeBrokerRequest, routingTableForRequest) + val realtimeBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(pinotSplit.query.realtimeSelectQuery) + createRoutingTableForRequest(realtimeBrokerRequest) + case TableType.OFFLINE => - val 
offlineBrokerRequest = - CalciteSqlCompiler.compileToBrokerRequest(pinotSplit.query.offlineSelectQuery) - submitRequestToPinotServer(offlineBrokerRequest, routingTableForRequest, null, null) + val offlineBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(pinotSplit.query.offlineSelectQuery) + createRoutingTableForRequest(offlineBrokerRequest) } - val pinotServerResponse = pinotServerAsyncQueryResponse.getFinalResponses.values().asScala.toList + val pinotServerAsyncQueryResponse = submitRequestToPinotServer(routingTableForRequest) + val pinotServerResponse = pinotServerAsyncQueryResponse.getFinalResponses.values().asScala.flatMap(_.asScala.toList).toList logInfo(s"Pinot server total response time in millis: ${System.nanoTime() - requestStartTime}") closePinotServerConnection() pinotServerResponse.foreach { response => - logInfo( - s"Request stats; " + - s"responseSize: ${response.getResponseSize}, " + - s"responseDelayMs: ${response.getResponseDelayMs}, " + - s"deserializationTimeMs: ${response.getDeserializationTimeMs}, " + - s"submitDelayMs: ${response.getSubmitDelayMs}" - ) + logInfo( + s"Request stats; " + + s"responseSize: ${response.getResponseSize}, " + + s"responseDelayMs: ${response.getResponseDelayMs}, " + + s"deserializationTimeMs: ${response.getDeserializationTimeMs}, " + + s"submitDelayMs: ${response.getSubmitDelayMs}" + ) } val dataTables = pinotServerResponse @@ -93,31 +92,29 @@ private[reader] class PinotServerDataFetcher( dataTables.filter(_.getNumberOfRows > 0) } - private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = { + // TODO(egalpin): update to use ServerQueryRoutingContext + private def createRoutingTableForRequest(brokerRequest: BrokerRequest): JMap[ServerInstance, JList[ServerQueryRoutingContext]] = { val nullZkId: String = null val instanceConfig = new InstanceConfig(nullZkId) instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost) instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort) // TODO: support netty-sec val serverInstance = new ServerInstance(instanceConfig) + val serverQueryRoutingContext = new ServerQueryRoutingContext(brokerRequest, + Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava), + serverInstance.toServerRoutingInstance(serverInstance.isTlsEnabled)) Map( - serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) + serverInstance -> List(serverQueryRoutingContext).asJava ).asJava } private def submitRequestToPinotServer( - offlineBrokerRequest: BrokerRequest, - offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]], - realtimeBrokerRequest: BrokerRequest, - realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = { + queryRoutingTable: JMap[ServerInstance, JList[ServerQueryRoutingContext]]): AsyncQueryResponse = { logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}") queryRouter.submitQuery( partitionId, pinotSplit.query.rawTableName, - offlineBrokerRequest, - offlineRoutingTable, - realtimeBrokerRequest, - realtimeRoutingTable, + queryRoutingTable, dataSourceOptions.pinotServerTimeoutMs ) } @@ -131,9 +128,9 @@ private[reader] class PinotServerDataFetcher( object PinotServerDataFetcher { def apply( - partitionId: Int, - pinotSplit: PinotSplit, - dataSourceOptions: PinotDataSourceReadOptions): PinotServerDataFetcher = { + partitionId: Int, + pinotSplit: PinotSplit, + dataSourceOptions: PinotDataSourceReadOptions): 
PinotServerDataFetcher = { new PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions) } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java index 94cccc45ae9..444442bf859 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java @@ -57,6 +57,8 @@ public InstanceResponseBlock execute() InstanceResponseBlock instanceResponseBlock = instanceResponseOperator.nextBlock(); long endTime2 = System.currentTimeMillis(); LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 - endTime1); +// instanceResponseBlock.addMetadata(DataTable.MetadataKey.TABLE.getName(), +// _instanceResponsePlanNode._queryContext.getTableName()); return instanceResponseBlock; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 58d3befb569..aa9bf74c273 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -562,6 +562,7 @@ public static InstanceResponseBlock executeDescribeExplain(Plan queryPlan, Query String.valueOf(numEmptyFilterSegments)); instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(), String.valueOf(numMatchAllFilterSegments)); +// instanceResponse.addMetadata(MetadataKey.TABLE.getName(), queryContext.getTableName()); return instanceResponse; } finally { responseOperator.releaseAll(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java index 1c39b6971bf..b59422d6e16 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java @@ -57,18 +57,20 @@ public AggregationDataTableReducer(QueryContext queryContext) { */ @Override public void reduceAndSetResults(String tableName, DataSchema dataSchema, - Map dataTableMap, BrokerResponseNative brokerResponseNative, + Map> dataTableMap, BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, dataSchema); - if (dataTableMap.isEmpty()) { + // flatten the data tables from all servers responding to the query into a single collection of DataTables + Collection dataTables = getFlatDataTables(dataTableMap); + + if (dataTables.isEmpty()) { DataSchema resultTableSchema = new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema)).getResultDataSchema(); brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList())); return; } - Collection dataTables = dataTableMap.values(); if (_queryContext.isServerReturnFinalResult()) { if (dataTables.size() == 1) { processSingleFinalResult(dataSchema, dataTables.iterator().next(), brokerResponseNative); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index 03d801941e3..827e95bdf98 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -59,7 +60,8 @@ public BrokerReduceService(PinotConfiguration config) { } public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest, - Map dataTableMap, long reduceTimeOutMs, BrokerMetrics brokerMetrics) { + Map> dataTableMap, long reduceTimeOutMs, + BrokerMetrics brokerMetrics) { if (dataTableMap.isEmpty()) { // Empty response. return BrokerResponseNative.empty(); @@ -78,42 +80,50 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, Broke List serversWithConflictingDataSchema = new ArrayList<>(); // Process server response metadata. - Iterator> iterator = dataTableMap.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - DataTable dataTable = entry.getValue(); - - // aggregate metrics - aggregator.aggregate(entry.getKey(), dataTable); - - // After processing the metadata, remove data tables without data rows inside. - DataSchema dataSchema = dataTable.getDataSchema(); - if (dataSchema == null) { - iterator.remove(); - } else { - // Try to cache a data table with data rows inside, or cache one with data schema inside. - if (dataTable.getNumberOfRows() == 0) { - if (dataSchemaFromEmptyDataTable == null) { - dataSchemaFromEmptyDataTable = dataSchema; - } - iterator.remove(); + Iterator serverIter = dataTableMap.keySet().iterator(); + while (serverIter.hasNext()) { + ServerRoutingInstance serverRoutingInstance = serverIter.next(); + Iterator tableIter = dataTableMap.get(serverRoutingInstance).iterator(); + while (tableIter.hasNext()) { + DataTable dataTable = tableIter.next(); + + // aggregate metrics + aggregator.aggregate(serverRoutingInstance, dataTable); + + // After processing the metadata, remove data tables without data rows inside. + DataSchema dataSchema = dataTable.getDataSchema(); + if (dataSchema == null) { + tableIter.remove(); } else { - if (dataSchemaFromNonEmptyDataTable == null) { - dataSchemaFromNonEmptyDataTable = dataSchema; + // Try to cache a data table with data rows inside, or cache one with data schema inside. + if (dataTable.getNumberOfRows() == 0) { + if (dataSchemaFromEmptyDataTable == null) { + dataSchemaFromEmptyDataTable = dataSchema; + } + tableIter.remove(); } else { - // Remove data tables with conflicting data schema. - // NOTE: Only compare the column data types, since the column names (string representation of expression) - // can change across different versions. - if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) { - serversWithConflictingDataSchema.add(entry.getKey()); - iterator.remove(); + if (dataSchemaFromNonEmptyDataTable == null) { + dataSchemaFromNonEmptyDataTable = dataSchema; + } else { + // Remove data tables with conflicting data schema. + // NOTE: Only compare the column data types, since the column names (string representation of expression) + // can change across different versions. 
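      // Minimal sketch of the new reduce-side input shape, assuming the inferred generics
      // Map<ServerRoutingInstance, List<DataTable>> (the angle brackets are missing from the
      // diff text): each server may return one DataTable per ServerQueryRoutingContext it was
      // sent, so a reducer either walks the nested lists, as BrokerReduceService does in this
      // loop, or flattens them first with the DataTableReducer#getFlatDataTables default method
      // added in this patch:
      //   List<DataTable> flat = new ArrayList<>();
      //   dataTableMap.values().forEach(flat::addAll);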
+ if (!Arrays.equals(dataSchema.getColumnDataTypes(), + dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) { + serversWithConflictingDataSchema.add(serverRoutingInstance); + tableIter.remove(); + } } } } } + // Remove map entries where there are no longer any associated data tables (removed by above) + if (dataTableMap.get(serverRoutingInstance).isEmpty()) { + serverIter.remove(); + } } - String tableName = serverBrokerRequest.getQuerySource().getTableName(); + String tableName = serverBrokerRequest.getPinotQuery().getDataSource().getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableName); // Set execution statistics and Update broker metrics. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java index 496df03da85..d23ea9e964e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.core.query.reduce; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -40,6 +43,14 @@ public interface DataTableReducer { * @param reducerContext DataTableReducer context * @param brokerMetrics broker metrics */ - void reduceAndSetResults(String tableName, DataSchema dataSchema, Map dataTableMap, - BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics); + // TODO(egalpin): could dataTableMap be made into an Iterable instead? The keys appear unused in all impls + void reduceAndSetResults(String tableName, DataSchema dataSchema, + Map> dataTableMap, BrokerResponseNative brokerResponseNative, + DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics); + + default Collection getFlatDataTables(Map> dataTableMap) { + List dataTables = new ArrayList<>(); + dataTableMap.values().forEach(dataTables::addAll); + return dataTables; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java index 4553776963e..8fd9e9182d9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.reduce; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -49,23 +50,26 @@ public DistinctDataTableReducer(QueryContext queryContext) { @Override public void reduceAndSetResults(String tableName, DataSchema dataSchema, - Map dataTableMap, BrokerResponseNative brokerResponseNative, + Map> dataTableMap, BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, dataSchema); DistinctTable distinctTable = new DistinctTable(dataSchema, _queryContext.getOrderByExpressions(), _queryContext.getLimit(), _queryContext.isNullHandlingEnabled()); + + Collection dataTables = getFlatDataTables(dataTableMap); if (distinctTable.hasOrderBy()) { - 
addToOrderByDistinctTable(dataSchema, dataTableMap, distinctTable); + addToOrderByDistinctTable(dataSchema, dataTables, distinctTable); } else { - addToNonOrderByDistinctTable(dataSchema, dataTableMap, distinctTable); + addToNonOrderByDistinctTable(dataSchema, dataTables, distinctTable); } brokerResponseNative.setResultTable(reduceToResultTable(distinctTable)); } - private void addToOrderByDistinctTable(DataSchema dataSchema, Map dataTableMap, - DistinctTable distinctTable) { - for (DataTable dataTable : dataTableMap.values()) { + private void addToOrderByDistinctTable(DataSchema dataSchema, + Collection dataTables, DistinctTable distinctTable) { + + for (DataTable dataTable : dataTables) { Tracing.ThreadAccountantOps.sampleAndCheckInterruption(); int numColumns = dataSchema.size(); int numRows = dataTable.getNumberOfRows(); @@ -86,9 +90,9 @@ private void addToOrderByDistinctTable(DataSchema dataSchema, Map dataTableMap, - DistinctTable distinctTable) { - for (DataTable dataTable : dataTableMap.values()) { + private void addToNonOrderByDistinctTable(DataSchema dataSchema, + Collection dataTables, DistinctTable distinctTable) { + for (DataTable dataTable : dataTables) { Tracing.ThreadAccountantOps.sampleAndCheckInterruption(); int numColumns = dataSchema.size(); int numRows = dataTable.getNumberOfRows(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java index 322af48d7bc..757c00981e7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.LongConsumer; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableUtils; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.BrokerTimer; @@ -32,6 +33,7 @@ import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; public class ExecutionStatsAggregator { @@ -70,10 +72,15 @@ public ExecutionStatsAggregator(boolean enableTrace) { _enableTrace = enableTrace; } - public void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) { - TableType tableType = routingInstance.getTableType(); - String instanceName = routingInstance.getShortName(); + public void aggregate(ServerRoutingInstance serverRoutingInstance, DataTable dataTable) { + String instanceName = serverRoutingInstance.getShortName(); Map metadata = dataTable.getMetadata(); + TableType tableType = null; + if (metadata.get(DataTable.MetadataKey.TABLE.getName()) != null) { + tableType = TableNameBuilder.getTableTypeFromTableName(metadata.get(DataTable.MetadataKey.TABLE.getName())); + } else if (metadata.get(DataTable.MetadataKey.REQUEST_ID.getName()) != null) { + tableType = DataTableUtils.inferTableType(dataTable); + } // Reduce on trace info. 
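    // Worked example of the request-id fallback above, using the values asserted by the
    // BrokerRequestIdUtils tests in this patch: getCanonicalRequestId(123) == 120,
    // getOfflineRequestId(123) == 120, getRealtimeRequestId(123) == 121 (canonical | 0x1).
    // DataTableUtils.inferTableType() attributes a DataTable whose REQUEST_ID metadata equals
    // its own realtime form to REALTIME, anything else to OFFLINE:
    //   requestId == 121 -> 121 == getRealtimeRequestId(121)        -> TableType.REALTIME
    //   requestId == 120 -> getRealtimeRequestId(120) == 121 != 120 -> TableType.OFFLINE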
if (_enableTrace && metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java index 2a7ce4bc205..b475dd5a0d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.reduce; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -55,16 +56,17 @@ public class ExplainPlanDataTableReducer implements DataTableReducer { @Override public void reduceAndSetResults(String tableName, DataSchema dataSchema, - Map dataTableMap, BrokerResponseNative brokerResponseNative, + Map> dataTableMap, BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { List reducedRows = new ArrayList<>(); // Top node should be a BROKER_REDUCE node. addBrokerReduceOperation(reducedRows); + Collection dataTables = getFlatDataTables(dataTableMap); // Construct the combine node - Object[] combinedRow = extractCombineNode(dataTableMap); + Object[] combinedRow = extractCombineNode(dataTables); if (combinedRow != null) { reducedRows.add(combinedRow); } @@ -74,7 +76,7 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, boolean explainPlanVerbose = queryOptions != null && QueryOptionsUtils.isExplainPlanVerbose(queryOptions); // Add the rest of the rows for each unique Explain plan received from the servers - List explainPlanRowsList = extractUniqueExplainPlansAcrossServers(dataTableMap, combinedRow); + List explainPlanRowsList = extractUniqueExplainPlansAcrossServers(dataTables, combinedRow); if (!explainPlanVerbose && (explainPlanRowsList.size() > 1)) { // Pick the most appropriate plan if verbose option is disabled explainPlanRowsList = chooseBestExplainPlanToUse(explainPlanRowsList); @@ -103,14 +105,13 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, * Extract the combine node to use as the global combine step if present. If no combine node is found, return null. * A combine node may not be found if all segments were pruned across all servers. 
*/ - private Object[] extractCombineNode(Map dataTableMap) { - if (dataTableMap.isEmpty()) { + private Object[] extractCombineNode(Collection dataTables) { + if (dataTables.isEmpty()) { return null; } Object[] combineRow = null; - for (Map.Entry entry : dataTableMap.entrySet()) { - DataTable dataTable = entry.getValue(); + for (DataTable dataTable : dataTables) { int numRows = dataTable.getNumberOfRows(); if (numRows > 0) { // First row should be the combine row data, unless all segments were pruned from the Server side @@ -129,12 +130,11 @@ private Object[] extractCombineNode(Map dataTa * Extract a list of all the unique explain plans across all servers */ private List extractUniqueExplainPlansAcrossServers( - Map dataTableMap, Object[] combinedRow) { + Collection dataTables, Object[] combinedRow) { List explainPlanRowsList = new ArrayList<>(); HashSet explainPlanHashCodeSet = new HashSet<>(); - for (Map.Entry entry : dataTableMap.entrySet()) { - DataTable dataTable = entry.getValue(); + for (DataTable dataTable : dataTables) { int numRows = dataTable.getNumberOfRows(); ExplainPlanRows explainPlanRows = null; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 09b4d6a1561..1d24e2c4d3a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -101,11 +101,12 @@ public GroupByDataTableReducer(QueryContext queryContext) { */ @Override public void reduceAndSetResults(String tableName, DataSchema dataSchema, - Map dataTableMap, BrokerResponseNative brokerResponse, + Map> dataTableMap, BrokerResponseNative brokerResponse, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { dataSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, dataSchema); - if (dataTableMap.isEmpty()) { + Collection dataTables = getFlatDataTables(dataTableMap); + if (dataTables.isEmpty()) { PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema)); DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema(); @@ -114,7 +115,6 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, return; } - Collection dataTables = dataTableMap.values(); // NOTE: Use regular reduce when group keys are not partitioned even if there are only one data table because the // records are not sorted yet. 
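// Illustration (a sketch, not part of this patch): the reducers above now call
// getFlatDataTables(dataTableMap) rather than dataTableMap.values(), because a single server can
// return multiple DataTables, one per table type it was queried for. The helper itself is not
// shown in this hunk; a minimal implementation consistent with the call sites would be the
// following, using the types already imported by these reducers.
static Collection<DataTable> getFlatDataTables(Map<ServerRoutingInstance, List<DataTable>> dataTableMap) {
  List<DataTable> dataTables = new ArrayList<>();
  for (List<DataTable> serverDataTables : dataTableMap.values()) {
    // Flatten the per-server lists into a single collection for the reduce phase.
    dataTables.addAll(serverDataTables);
  }
  return dataTables;
}
// End of illustration.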
if (_queryContext.isServerReturnFinalResult() && dataTables.size() == 1) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java index a8226aee5e2..af57936f894 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.reduce; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -48,18 +49,20 @@ public SelectionDataTableReducer(QueryContext queryContext) { */ @Override public void reduceAndSetResults(String tableName, DataSchema dataSchema, - Map dataTableMap, BrokerResponseNative brokerResponseNative, + Map> dataTableMap, BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) { Pair pair = SelectionOperatorUtils.getResultTableDataSchemaAndColumnIndices(_queryContext, dataSchema); int limit = _queryContext.getLimit(); - if (dataTableMap.isEmpty() || limit == 0) { + Collection dataTables = getFlatDataTables(dataTableMap); + + if (dataTables.isEmpty() || limit == 0) { brokerResponseNative.setResultTable(new ResultTable(pair.getLeft(), Collections.emptyList())); return; } if (_queryContext.getOrderByExpressions() == null) { // Selection only - List reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit, + List reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTables, limit, _queryContext.isNullHandlingEnabled()); brokerResponseNative.setResultTable( SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, pair.getLeft(), pair.getRight())); @@ -67,7 +70,7 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema, // Selection order-by SelectionOperatorService selectionService = new SelectionOperatorService(_queryContext, pair.getLeft(), pair.getRight()); - selectionService.reduceWithOrdering(dataTableMap.values()); + selectionService.reduceWithOrdering(dataTables); brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java index 4783a1c9a53..98fcdb2d584 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java @@ -44,6 +44,7 @@ */ public class ServerQueryRequest { private final long _requestId; + private final String _tableName; private final String _brokerId; private final boolean _enableTrace; private final boolean _enableStreaming; @@ -71,9 +72,11 @@ public ServerQueryRequest(InstanceRequest instanceRequest, ServerMetrics serverM _segmentsToQuery = instanceRequest.getSearchSegments(); _optionalSegments = instanceRequest.getOptionalSegments(); _queryContext = getQueryContext(instanceRequest.getQuery().getPinotQuery()); + // Method to set table name needs to match whats in AsyncQueryResponse + _tableName = _queryContext.getTableName(); _queryId = QueryIdUtils.getQueryId(_brokerId, _requestId, - TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName())); - _timerContext = new TimerContext(_queryContext.getTableName(), 
serverMetrics, queryArrivalTimeMs); + TableNameBuilder.getTableTypeFromTableName(_tableName)); + _timerContext = new TimerContext(_tableName, serverMetrics, queryArrivalTimeMs); } public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serverMetrics) @@ -102,9 +105,10 @@ public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serv throw new UnsupportedOperationException("Unsupported payloadType: " + payloadType); } _queryContext = getQueryContext(brokerRequest.getPinotQuery()); + _tableName = _queryContext.getTableName(); _queryId = QueryIdUtils.getQueryId(_brokerId, _requestId, - TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName())); - _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); + TableNameBuilder.getTableTypeFromTableName(_tableName)); + _timerContext = new TimerContext(_tableName, serverMetrics, queryArrivalTimeMs); } /** @@ -123,10 +127,11 @@ public ServerQueryRequest(QueryContext queryContext, List segmentsToQuer _queryId = QueryIdUtils.getQueryId(_brokerId, _requestId, TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName())); + _tableName = _queryContext.getTableName(); _segmentsToQuery = segmentsToQuery; _optionalSegments = null; - _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); + _timerContext = new TimerContext(_tableName, serverMetrics, queryArrivalTimeMs); } private static QueryContext getQueryContext(PinotQuery pinotQuery) { @@ -137,6 +142,10 @@ public long getRequestId() { return _requestId; } + public String getTableName() { + return _tableName; + } + public String getBrokerId() { return _brokerId; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index f4c2a241e76..ec59b211fce 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -131,6 +131,7 @@ protected ListenableFutureTask createQueryFutureTask(ServerQueryRequest * @return serialized query response */ @Nullable + // egalpin: where dataTable is created and serialized protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, ExecutorService executorService) { //Start instrumentation context. This must not be moved further below interspersed into the code. 
@@ -153,6 +154,7 @@ protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, Execu long requestId = queryRequest.getRequestId(); Map responseMetadata = instanceResponse.getResponseMetadata(); responseMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + responseMetadata.put(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType()); byte[] responseBytes = serializeResponse(queryRequest, instanceResponse); // Log the statistics @@ -174,6 +176,7 @@ protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, Execu instanceResponse = new InstanceResponseBlock(); instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, errMsg)); instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + instanceResponse.addMetadata(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType()); responseBytes = serializeResponse(queryRequest, instanceResponse); } @@ -226,6 +229,7 @@ private byte[] serializeResponse(ServerQueryRequest queryRequest, InstanceRespon LOGGER.error(errMsg); instanceResponse = new InstanceResponseBlock(new ExceptionResultsBlock(new QueryCancelledException(errMsg, e))); instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId())); + instanceResponse.addMetadata(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType()); return serializeResponse(queryRequest, instanceResponse); } catch (Exception e) { _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1); @@ -248,6 +252,7 @@ protected ListenableFuture immediateErrorResponse(ServerQueryRequest que ProcessingException error) { InstanceResponseBlock instanceResponse = new InstanceResponseBlock(); instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId())); + instanceResponse.addMetadata(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType()); instanceResponse.addException(error); return Futures.immediateFuture(serializeResponse(queryRequest, instanceResponse)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index a9546cc054e..6a78162347c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.core.transport; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,9 +30,12 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableUtils; import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; /** @@ -39,10 +44,13 @@ @ThreadSafe public class AsyncQueryResponse implements QueryResponse { private final QueryRouter _queryRouter; - private 
final long _requestId; + + // TODO(egalpin): Remove the concept of truncated request ID in next major release after all servers are expected + // to send back tableName in DataTable metadata + private final long _canonicalRequestId; private final AtomicReference _status = new AtomicReference<>(Status.IN_PROGRESS); private final AtomicInteger _numServersResponded = new AtomicInteger(); - private final ConcurrentHashMap _responseMap; + private final ConcurrentHashMap> _responses; private final CountDownLatch _countDownLatch; private final long _maxEndTimeMs; private final long _timeoutMs; @@ -51,21 +59,29 @@ public class AsyncQueryResponse implements QueryResponse { private volatile ServerRoutingInstance _failedServer; private volatile Exception _exception; - public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set serversQueried, - long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) { + public AsyncQueryResponse(QueryRouter queryRouter, long canonicalRequestId, + Map> requestMap, long startTimeMs, long timeoutMs, + ServerRoutingStatsManager serverRoutingStatsManager) { _queryRouter = queryRouter; - _requestId = requestId; - int numServersQueried = serversQueried.size(); - _responseMap = new ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried)); + _canonicalRequestId = canonicalRequestId; + _responses = new ConcurrentHashMap<>(); _serverRoutingStatsManager = serverRoutingStatsManager; - for (ServerRoutingInstance serverRoutingInstance : serversQueried) { - // Record stats related to query submission just before sending the request. Otherwise, if the response is - // received immediately, there's a possibility of updating query response stats before updating query - // submission stats. - _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId()); - _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs)); + int numQueriesIssued = 0; + for (Map.Entry> serverRequests : requestMap.entrySet()) { + for (InstanceRequest request : serverRequests.getValue()) { + // Record stats related to query submission just before sending the request. Otherwise, if the response is + // received immediately, there's a possibility of updating query response stats before updating query + // submission stats. 
+ _serverRoutingStatsManager.recordStatsForQuerySubmission(_canonicalRequestId, + serverRequests.getKey().getInstanceId()); + + _responses.computeIfAbsent(serverRequests.getKey(), k -> new ConcurrentHashMap<>()) + .put(request.getQuery().getPinotQuery().getDataSource().getTableName(), new ServerResponse(startTimeMs)); + numQueriesIssued++; + } } - _countDownLatch = new CountDownLatch(numServersQueried); + + _countDownLatch = new CountDownLatch(numQueriesIssued); _timeoutMs = timeoutMs; _maxEndTimeMs = startTimeMs + timeoutMs; } @@ -80,40 +96,56 @@ public int getNumServersResponded() { return _numServersResponded.get(); } + private Map> getFlatResponses() { + Map> flattened = new HashMap<>(); + for (Map.Entry> serverResponses + : _responses.entrySet()) { + ServerRoutingInstance serverRoutingInstance = serverResponses.getKey(); + + for (ServerResponse serverResponse : serverResponses.getValue().values()) { + flattened.computeIfAbsent(serverRoutingInstance, k -> new ArrayList<>()).add(serverResponse); + } + } + + return flattened; + } + @Override - public Map getCurrentResponses() { - return _responseMap; + public Map> getCurrentResponses() { + return getFlatResponses(); } @Override - public Map getFinalResponses() + public Map> getFinalResponses() throws InterruptedException { + Map> flatResponses = getFlatResponses(); try { boolean finish = _countDownLatch.await(_maxEndTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); _status.compareAndSet(Status.IN_PROGRESS, finish ? Status.COMPLETED : Status.TIMED_OUT); - return _responseMap; + return flatResponses; } finally { // Update ServerRoutingStats for query completion. This is done here to ensure that the stats are updated for // servers even if the query times out or if servers have not responded. 
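// Illustration (a sketch, not part of this patch): with the bookkeeping above, responses are keyed
// first by server and then by table name with type suffix, and the latch counts issued queries
// rather than servers. For a hybrid table routed to a single server, the state would look roughly
// like the following (the instance and table names here are hypothetical):
//   _responses = {
//     Server_host1_8098 -> { "myTable_OFFLINE"  -> ServerResponse,
//                            "myTable_REALTIME" -> ServerResponse }
//   }
//   _countDownLatch initialized with count 2 (one count per InstanceRequest, not one per server)
// End of illustration.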
- for (Map.Entry entry : _responseMap.entrySet()) { - ServerResponse response = entry.getValue(); - long latency; - - // If server has not responded or if the server response has exceptions, the latency is set to timeout - if (hasServerNotResponded(response) || hasServerReturnedExceptions(response)) { - latency = _timeoutMs; - } else { - latency = response.getResponseDelayMs(); + for (Map.Entry> serverResponses : flatResponses.entrySet()) { + for (ServerResponse response : serverResponses.getValue()) { + long latency; + + // If server has not responded or if the server response has exceptions, the latency is set to timeout + if (hasServerNotResponded(response) || hasServerReturnedExceptions(response)) { + latency = _timeoutMs; + } else { + latency = response.getResponseDelayMs(); + } + _serverRoutingStatsManager.recordStatsUponResponseArrival(_canonicalRequestId, + serverResponses.getKey().getInstanceId(), latency); } - _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, entry.getKey().getInstanceId(), latency); } - - _queryRouter.markQueryDone(_requestId); + _queryRouter.markQueryDone(_canonicalRequestId); } } private boolean hasServerReturnedExceptions(ServerResponse response) { - if (response.getDataTable() != null && response.getDataTable().getExceptions().size() > 0) { + if (response.getDataTable() != null && !response.getDataTable().getExceptions().isEmpty()) { DataTable dataTable = response.getDataTable(); Map exceptions = dataTable.getExceptions(); @@ -139,15 +171,26 @@ private boolean hasServerNotResponded(ServerResponse response) { public String getServerStats() { StringBuilder stringBuilder = new StringBuilder( "(Server=SubmitDelayMs,ResponseDelayMs,ResponseSize,DeserializationTimeMs,RequestSentDelayMs)"); - for (Map.Entry entry : _responseMap.entrySet()) { - stringBuilder.append(';').append(entry.getKey().getShortName()).append('=').append(entry.getValue().toString()); + for (Map.Entry> serverResponses + : _responses.entrySet()) { + for (Map.Entry responsePair : serverResponses.getValue().entrySet()) { + ServerRoutingInstance serverRoutingInstance = serverResponses.getKey(); + stringBuilder.append(';').append(serverRoutingInstance.getShortName()).append('=') + .append(responsePair.getValue().toString()); + } } return stringBuilder.toString(); } @Override public long getServerResponseDelayMs(ServerRoutingInstance serverRoutingInstance) { - return _responseMap.get(serverRoutingInstance).getResponseDelayMs(); + int count = 0; + long sum = 0; + for (ServerResponse serverResponse : _responses.get(serverRoutingInstance).values()) { + count += 1; + sum += serverResponse.getResponseDelayMs(); + } + return sum / count; } @Nullable @@ -163,7 +206,7 @@ public Exception getException() { @Override public long getRequestId() { - return _requestId; + return _canonicalRequestId; } @Override @@ -171,21 +214,47 @@ public long getTimeoutMs() { return _timeoutMs; } - void markRequestSubmitted(ServerRoutingInstance serverRoutingInstance) { - _responseMap.get(serverRoutingInstance).markRequestSubmitted(); + void markRequestSubmitted(ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest) { + _responses.get(serverRoutingInstance).get(instanceRequest.getQuery().getPinotQuery().getDataSource().getTableName()) + .markRequestSubmitted(); + } + + void markRequestSent(ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, + int requestSentLatencyMs) { + 
_responses.get(serverRoutingInstance).get(instanceRequest.getQuery().getPinotQuery().getDataSource().getTableName()) + .markRequestSent(requestSentLatencyMs); } - void markRequestSent(ServerRoutingInstance serverRoutingInstance, int requestSentLatencyMs) { - _responseMap.get(serverRoutingInstance).markRequestSent(requestSentLatencyMs); + @Nullable + private String inferTableName(ServerRoutingInstance serverRoutingInstance, DataTable dataTable) { + TableType tableType = DataTableUtils.inferTableType(dataTable); + + for (Map.Entry serverResponseEntry : _responses.get(serverRoutingInstance).entrySet()) { + String tableName = serverResponseEntry.getKey(); + if (TableNameBuilder.getTableTypeFromTableName(tableName).equals(tableType)) { + return tableName; + } + } + return null; } void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize, int deserializationTimeMs) { - ServerResponse response = _responseMap.get(serverRoutingInstance); - response.receiveDataTable(dataTable, responseSize, deserializationTimeMs); + String tableName = dataTable.getMetadata().get(DataTable.MetadataKey.TABLE.getName()); + // tableName can be null but only in the case of version rollout. tableName will only be null when servers are not + // yet running the version where tableName is included in DataTable metadata. For the time being, it is safe to + // assume that a given server will have only been sent 1 or 2 requests (REALTIME, OFFLINE, or both). We can simply + // iterate through responses associated with a server and make use of the first response with null data table. + // TODO(egalpin): The null handling for tableName can and should be removed in a later release. + if (tableName == null) { + tableName = inferTableName(serverRoutingInstance, dataTable); + } + + _responses.get(serverRoutingInstance).get(tableName) + .receiveDataTable(dataTable, responseSize, deserializationTimeMs); - _numServersResponded.getAndIncrement(); _countDownLatch.countDown(); + _numServersResponded.getAndIncrement(); } void markQueryFailed(ServerRoutingInstance serverRoutingInstance, Exception exception) { @@ -203,9 +272,11 @@ void markQueryFailed(ServerRoutingInstance serverRoutingInstance, Exception exce * server hasn't responded yet. 
*/ void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception exception) { - ServerResponse serverResponse = _responseMap.get(serverRoutingInstance); - if (serverResponse != null && serverResponse.getDataTable() == null) { - markQueryFailed(serverRoutingInstance, exception); + Map serverResponses = _responses.get(serverRoutingInstance); + for (ServerResponse serverResponse : serverResponses.values()) { + if (serverResponse != null && serverResponse.getDataTable() == null) { + markQueryFailed(serverRoutingInstance, exception); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java index 3a117a49912..608dd9844c2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java @@ -48,11 +48,6 @@ public DataTableHandler(QueryRouter queryRouter, ServerRoutingInstance serverRou _brokerMetrics = brokerMetrics; } - @Override - public void channelActive(ChannelHandlerContext ctx) { - LOGGER.info("Channel for server: {} is now active", _serverRoutingInstance); - } - @Override public void channelInactive(ChannelHandlerContext ctx) { LOGGER.error("Channel for server: {} is now inactive, marking server down", _serverRoutingInstance); @@ -60,6 +55,11 @@ public void channelInactive(ChannelHandlerContext ctx) { new RuntimeException(String.format("Channel for server: %s is inactive", _serverRoutingInstance))); } + @Override + public void channelActive(ChannelHandlerContext ctx) { + LOGGER.info("Channel for server: {} is now active", _serverRoutingInstance); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { Tracing.ThreadAccountantOps.setThreadResourceUsageProvider(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java index 0ba00327f63..d120a1f68fd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java @@ -73,7 +73,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (DIRECT_OOM_SHUTTING_DOWN.compareAndSet(false, true)) { try { LOGGER.error("Closing ALL channels to servers, as we are running out of direct memory " - + "while receiving response from {}", _serverRoutingInstance, cause); + + "while receiving response from {}", + _serverRoutingInstance, cause); // close all channels to servers _serverToChannelMap.keySet().forEach(serverRoutingInstance -> { ServerChannels.ServerChannel removed = _serverToChannelMap.remove(serverRoutingInstance); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index 4940823b741..b25c1aabda0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -65,6 +65,9 @@ * The {@code InstanceRequestHandler} is the Netty inbound handler on Pinot Server side to handle the serialized * instance requests sent from Pinot Broker. 
*/ + +// TODO(egalpin): server-side handler for responding to requests sent from broker + @ChannelHandler.Sharable public class InstanceRequestHandler extends SimpleChannelInboundHandler { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class); @@ -183,6 +186,8 @@ void submitQuery(ServerQueryRequest queryRequest, ChannelHandlerContext ctx, Str private FutureCallback createCallback(ChannelHandlerContext ctx, String tableNameWithType, long queryArrivalTimeMs, InstanceRequest instanceRequest, ServerQueryRequest queryRequest) { + // egalpin: whatever invokes this callback has constructed the data table, where we need to add queryHash to the + // metadata return new FutureCallback() { @Override public void onSuccess(@Nullable byte[] responseBytes) { @@ -323,8 +328,8 @@ private void sendResponse(ChannelHandlerContext ctx, String tableNameWithType, l + "request: {}", queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs); } if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) { - LOGGER.warn("Large query: response size in bytes: {}, table name {}", - serializedDataTable.length, tableNameWithType); + LOGGER.warn("Large query: response size in bytes: {}, table name {}", serializedDataTable.length, + tableNameWithType); ServerMetrics.get().addMeteredTableValue(tableNameWithType, ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1); } }); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java index 23f5946cd37..caa91a3ff75 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.transport; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -49,12 +50,12 @@ enum Status { /** * Returns the current server responses without blocking. */ - Map getCurrentResponses(); + Map> getCurrentResponses(); /** * Waits until the query is done (COMPLETED, FAILED or TIMED_OUT) and returns the final server responses. 
*/ - Map getFinalResponses() + Map> getFinalResponses() throws InterruptedException; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 2b756118652..145ce57bcac 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.transport; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; @@ -88,64 +90,68 @@ public QueryRouter(String brokerId, BrokerMetrics brokerMetrics, @Nullable Netty } public AsyncQueryResponse submitQuery(long requestId, String rawTableName, - @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs) { - assert offlineBrokerRequest != null || realtimeBrokerRequest != null; + @Nullable Map> queryRoutingTable, long timeoutMs) { + assert queryRoutingTable != null && !queryRoutingTable.isEmpty(); // can prefer but not require TLS until all servers guaranteed to be on TLS boolean preferTls = _serverChannelsTls != null; // skip unavailable servers if the query option is set - boolean skipUnavailableServers = isSkipUnavailableServers(offlineBrokerRequest, realtimeBrokerRequest); + boolean skipUnavailableServers = false; // Build map from server to request based on the routing table - Map requestMap = new HashMap<>(); - if (offlineBrokerRequest != null) { - assert offlineRoutingTable != null; - for (Map.Entry, List>> entry : offlineRoutingTable.entrySet()) { - ServerRoutingInstance serverRoutingInstance = - entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls); - InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue()); - requestMap.put(serverRoutingInstance, instanceRequest); - } - } - if (realtimeBrokerRequest != null) { - assert realtimeRoutingTable != null; - for (Map.Entry, List>> entry : realtimeRoutingTable.entrySet()) { - ServerRoutingInstance serverRoutingInstance = - entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls); - InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); - requestMap.put(serverRoutingInstance, instanceRequest); + Map> requestMap = new HashMap<>(); + for (Map.Entry> entry : queryRoutingTable.entrySet()) { + for (ServerQueryRoutingContext serverQueryRoutingContext : entry.getValue()) { + ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(preferTls); + + long requestIdWithType = serverQueryRoutingContext.getTableType().equals(TableType.OFFLINE) + ? 
BrokerRequestIdUtils.getOfflineRequestId(requestId) + : BrokerRequestIdUtils.getRealtimeRequestId(requestId); + + InstanceRequest instanceRequest = + getInstanceRequest(requestIdWithType, serverQueryRoutingContext.getBrokerRequest(), + serverQueryRoutingContext.getSegmentsToQuery()); + if (!skipUnavailableServers && QueryOptionsUtils.isSkipUnavailableServers( + serverQueryRoutingContext.getBrokerRequest().getPinotQuery().getQueryOptions())) { + // Any single query having this option set will set it for all queries. This option should not be possible + // to differ between server instance requests pertaining to the same request ID. + skipUnavailableServers = true; + } + + requestMap.computeIfAbsent(serverRoutingInstance, k -> new ArrayList<>()).add(instanceRequest); } } - // Create the asynchronous query response with the request map + // Map the request using all but the last digit. This way, servers will return request IDs where the last digit + // (injected above) will indicate table type, but both will be able to be divided by the TABLE_TYPE_OFFSET in + // order to properly map back to this originating request AsyncQueryResponse asyncQueryResponse = - new AsyncQueryResponse(this, requestId, requestMap.keySet(), System.currentTimeMillis(), timeoutMs, + new AsyncQueryResponse(this, requestId, requestMap, System.currentTimeMillis(), timeoutMs, _serverRoutingStatsManager); _asyncQueryResponseMap.put(requestId, asyncQueryResponse); - for (Map.Entry entry : requestMap.entrySet()) { - ServerRoutingInstance serverRoutingInstance = entry.getKey(); - ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels; - try { - serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(), - timeoutMs); - asyncQueryResponse.markRequestSubmitted(serverRoutingInstance); - } catch (TimeoutException e) { - if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) { - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1); - } - markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); - break; - } catch (Exception e) { - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1); - if (skipUnavailableServers) { - asyncQueryResponse.skipServerResponse(); - } else { + for (Map.Entry> serverRequests : requestMap.entrySet()) { + for (InstanceRequest request : serverRequests.getValue()) { + + ServerRoutingInstance serverRoutingInstance = serverRequests.getKey(); + ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? 
_serverChannelsTls : _serverChannels; + try { + serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, request, timeoutMs); + asyncQueryResponse.markRequestSubmitted(serverRoutingInstance, request); + } catch (TimeoutException e) { + if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) { + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1); + } markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); break; + } catch (Exception e) { + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1); + if (skipUnavailableServers) { + asyncQueryResponse.skipServerResponse(); + } else { + markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); + break; + } } } } @@ -153,16 +159,6 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, return asyncQueryResponse; } - private boolean isSkipUnavailableServers(@Nullable BrokerRequest offlineBrokerRequest, - @Nullable BrokerRequest realtimeBrokerRequest) { - if (offlineBrokerRequest != null && QueryOptionsUtils.isSkipUnavailableServers( - offlineBrokerRequest.getPinotQuery().getQueryOptions())) { - return true; - } - return realtimeBrokerRequest != null && QueryOptionsUtils.isSkipUnavailableServers( - realtimeBrokerRequest.getPinotQuery().getQueryOptions()); - } - private void markQueryFailed(long requestId, ServerRoutingInstance serverRoutingInstance, AsyncQueryResponse asyncQueryResponse, Exception e) { LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId, @@ -176,9 +172,9 @@ private void markQueryFailed(long requestId, ServerRoutingInstance serverRouting public boolean connect(ServerInstance serverInstance) { try { if (_serverChannelsTls != null) { - _serverChannelsTls.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, true)); + _serverChannelsTls.connect(serverInstance.toServerRoutingInstance(true)); } else { - _serverChannels.connect(serverInstance.toServerRoutingInstance(TableType.OFFLINE, false)); + _serverChannels.connect(serverInstance.toServerRoutingInstance(false)); } return true; } catch (Exception e) { @@ -194,7 +190,10 @@ public void shutDown() { void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize, int deserializationTimeMs) { long requestId = Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName())); - AsyncQueryResponse asyncQueryResponse = _asyncQueryResponseMap.get(requestId); + + // TODO(egalpin): remove use of canonical request ID in later major release + AsyncQueryResponse asyncQueryResponse = + _asyncQueryResponseMap.get(BrokerRequestIdUtils.getCanonicalRequestId(requestId)); // Query future might be null if the query is already done (maybe due to failure) if (asyncQueryResponse != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java index c9fe068ed44..47091d408fe 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java @@ -87,15 +87,11 @@ public ServerChannels(QueryRouter queryRouter, BrokerMetrics brokerMetrics, @Nul @Nullable TlsConfig tlsConfig) { boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled(); OsCheck.OSType 
operatingSystemType = OsCheck.getOperatingSystemType(); - if (enableNativeTransports - && operatingSystemType == OsCheck.OSType.Linux - && Epoll.isAvailable()) { + if (enableNativeTransports && operatingSystemType == OsCheck.OSType.Linux && Epoll.isAvailable()) { _eventLoopGroup = new EpollEventLoopGroup(); _channelClass = EpollSocketChannel.class; LOGGER.info("Using Epoll event loop"); - } else if (enableNativeTransports - && operatingSystemType == OsCheck.OSType.MacOS - && KQueue.isAvailable()) { + } else if (enableNativeTransports && operatingSystemType == OsCheck.OSType.MacOS && KQueue.isAvailable()) { _eventLoopGroup = new KQueueEventLoopGroup(); _channelClass = KQueueSocketChannel.class; LOGGER.info("Using KQueue event loop"); @@ -103,11 +99,9 @@ public ServerChannels(QueryRouter queryRouter, BrokerMetrics brokerMetrics, @Nul _eventLoopGroup = new NioEventLoopGroup(); _channelClass = NioSocketChannel.class; StringBuilder log = new StringBuilder("Using NIO event loop"); - if (operatingSystemType == OsCheck.OSType.Linux - && enableNativeTransports) { + if (operatingSystemType == OsCheck.OSType.Linux && enableNativeTransports) { log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause()); - } else if (operatingSystemType == OsCheck.OSType.MacOS - && enableNativeTransports) { + } else if (operatingSystemType == OsCheck.OSType.MacOS && enableNativeTransports) { log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause()); } LOGGER.info(log.toString()); @@ -129,8 +123,9 @@ public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryRespon ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, long timeoutMs) throws Exception { byte[] requestBytes = _threadLocalTSerializer.get().serialize(instanceRequest); + // TODO(egalpin): create backward compat hash key? _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new) - .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes, timeoutMs); + .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, instanceRequest, requestBytes, timeoutMs); } public void connect(ServerRoutingInstance serverRoutingInstance) @@ -138,6 +133,12 @@ public void connect(ServerRoutingInstance serverRoutingInstance) _serverToChannelMap.computeIfAbsent(serverRoutingInstance, ServerChannel::new).connect(); } + public void connect(ServerQueryRoutingContext serverQueryRoutingContext) + throws InterruptedException, TimeoutException { + _serverToChannelMap.computeIfAbsent(serverQueryRoutingContext.getServerRoutingInstance(), ServerChannel::new) + .connect(); + } + public void shutDown() { // Shut down immediately _eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); @@ -165,26 +166,24 @@ class ServerChannel { _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, metric::chunkSize); _bootstrap = new Bootstrap().remoteAddress(serverRoutingInstance.getHostname(), serverRoutingInstance.getPort()) - .option(ChannelOption.ALLOCATOR, bufAllocator) - .group(_eventLoopGroup).channel(_channelClass).option(ChannelOption.SO_KEEPALIVE, true) - .handler(new ChannelInitializer() { + .option(ChannelOption.ALLOCATOR, bufAllocator).group(_eventLoopGroup).channel(_channelClass) + .option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { if (_tlsConfig != null) { // Add SSL handler first to encrypt and decrypt everything. 
- ch.pipeline().addLast( - ChannelHandlerFactory.SSL, ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch)); + ch.pipeline() + .addLast(ChannelHandlerFactory.SSL, ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch)); } ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder()); ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender()); ch.pipeline().addLast( - ChannelHandlerFactory.getDirectOOMHandler(_queryRouter, _serverRoutingInstance, _serverToChannelMap) - ); + ChannelHandlerFactory.getDirectOOMHandler(_queryRouter, _serverRoutingInstance, _serverToChannelMap)); // NOTE: data table de-serialization happens inside this handler // Revisit if this becomes a bottleneck - ch.pipeline().addLast(ChannelHandlerFactory - .getDataTableHandler(_queryRouter, _serverRoutingInstance, _brokerMetrics)); + ch.pipeline().addLast( + ChannelHandlerFactory.getDataTableHandler(_queryRouter, _serverRoutingInstance, _brokerMetrics)); } }); } @@ -205,12 +204,14 @@ void setSilentShutdown() { } void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse, - ServerRoutingInstance serverRoutingInstance, byte[] requestBytes, long timeoutMs) + ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, byte[] requestBytes, + long timeoutMs) throws InterruptedException, TimeoutException { if (_channelLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) { try { connectWithoutLocking(); - sendRequestWithoutLocking(rawTableName, asyncQueryResponse, serverRoutingInstance, requestBytes); + sendRequestWithoutLocking(rawTableName, asyncQueryResponse, serverRoutingInstance, instanceRequest, + requestBytes); } finally { _channelLock.unlock(); } @@ -230,13 +231,13 @@ void connectWithoutLocking() } void sendRequestWithoutLocking(String rawTableName, AsyncQueryResponse asyncQueryResponse, - ServerRoutingInstance serverRoutingInstance, byte[] requestBytes) { + ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, byte[] requestBytes) { long startTimeMs = System.currentTimeMillis(); _channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> { int requestSentLatencyMs = (int) (System.currentTimeMillis() - startTimeMs); _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY, requestSentLatencyMs, TimeUnit.MILLISECONDS); - asyncQueryResponse.markRequestSent(serverRoutingInstance, requestSentLatencyMs); + asyncQueryResponse.markRequestSent(serverRoutingInstance, instanceRequest, requestSentLatencyMs); }); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT, 1); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_SENT, requestBytes.length); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java index 61a0b73d8a9..d9e8c2024cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java @@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.utils.config.InstanceUtils; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; @@ -124,27 +123,31 @@ public int getNettyTlsPort() { return _nettyTlsPort; } + public boolean 
isTlsEnabled() { + return getNettyTlsPort() > 0; + } + // Does not require TLS until all servers guaranteed to be on TLS @Deprecated - public ServerRoutingInstance toServerRoutingInstance(TableType tableType, boolean preferNettyTls) { + public ServerRoutingInstance toServerRoutingInstance(boolean preferNettyTls) { if (preferNettyTls && _nettyTlsPort > 0) { - return new ServerRoutingInstance(_instanceId, _hostname, _nettyTlsPort, tableType, true); + return new ServerRoutingInstance(_instanceId, _hostname, _nettyTlsPort, true); } else { - return new ServerRoutingInstance(_instanceId, _hostname, _port, tableType); + return new ServerRoutingInstance(_instanceId, _hostname, _port); } } - public ServerRoutingInstance toServerRoutingInstance(TableType tableType, RoutingType routingType) { + public ServerRoutingInstance toServerRoutingInstance(RoutingType routingType) { switch (routingType) { case NETTY: Preconditions.checkState(_port > 0, "Netty port is not configured for server: %s", _instanceId); - return new ServerRoutingInstance(_instanceId, _hostname, _port, tableType); + return new ServerRoutingInstance(_instanceId, _hostname, _port); case GRPC: Preconditions.checkState(_grpcPort > 0, "GRPC port is not configured for server: %s", _instanceId); - return new ServerRoutingInstance(_instanceId, _hostname, _grpcPort, tableType); + return new ServerRoutingInstance(_instanceId, _hostname, _grpcPort); case NETTY_TLS: Preconditions.checkState(_nettyTlsPort > 0, "Netty TLS port is not configured for server: %s", _instanceId); - return new ServerRoutingInstance(_instanceId, _hostname, _nettyTlsPort, tableType, true); + return new ServerRoutingInstance(_instanceId, _hostname, _nettyTlsPort, true); default: throw new IllegalStateException("Unsupported routing type: " + routingType); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerQueryRoutingContext.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerQueryRoutingContext.java new file mode 100644 index 00000000000..ccddf04beb7 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerQueryRoutingContext.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.transport; + +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +public class ServerQueryRoutingContext { + private final BrokerRequest _brokerRequest; + private final Pair, List> _segmentsToQuery; + private final ServerRoutingInstance _serverRoutingInstance; + private final TableType _tableType; + + public ServerQueryRoutingContext(BrokerRequest brokerRequest, Pair, List> segmentsToQuery, + ServerRoutingInstance serverRoutingInstance) { + // TODO(egalpin): somewhere, should we create a map that uses the BrokerRequest as the key and combines all + // segmentsToQuery? this would allow servers to combine results from segments that share a common query before + // sending back to broker + _brokerRequest = brokerRequest; + _segmentsToQuery = segmentsToQuery; + _serverRoutingInstance = serverRoutingInstance; + String tableName = _brokerRequest.getPinotQuery().getDataSource().getTableName(); + _tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + } + + public BrokerRequest getBrokerRequest() { + return _brokerRequest; + } + + public Pair, List> getSegmentsToQuery() { + return _segmentsToQuery; + } + + public List getRequiredSegmentsToQuery() { + return getSegmentsToQuery().getLeft(); + } + + public List getOptionalSegmentsToQuery() { + return getSegmentsToQuery().getRight(); + } + + public TableType getTableType() { + return _tableType; + } + + public ServerRoutingInstance getServerRoutingInstance() { + return _serverRoutingInstance; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java index faa2cd91a34..5c9dd4904c9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerRoutingInstance.java @@ -23,43 +23,36 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.ThreadSafe; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants.Helix; /** * The {@code ServerRoutingInstance} class represents the routing target instance which contains the information of - * hostname, port, and table type it serves. - *
<p>
Different table types on same host and port are counted as different instances. Therefore, one single Pinot Server - * might be treated as two different routing target instances based on the types of table it serves. + * hostname and port. */ @ThreadSafe public final class ServerRoutingInstance { - private static final String SHORT_OFFLINE_SUFFIX = "_O"; - private static final String SHORT_REALTIME_SUFFIX = "_R"; private static final Map SHORT_HOSTNAME_MAP = new ConcurrentHashMap<>(); private final String _instanceId; private final String _hostname; private final int _port; - private final TableType _tableType; private final boolean _tlsEnabled; - public ServerRoutingInstance(String instanceId, String hostname, int port, TableType tableType, boolean tlsEnabled) { + public ServerRoutingInstance(String instanceId, String hostname, int port, boolean tlsEnabled) { _instanceId = instanceId; _hostname = hostname; _port = port; - _tableType = tableType; _tlsEnabled = tlsEnabled; } - public ServerRoutingInstance(String instanceId, String hostname, int port, TableType tableType) { - this(instanceId, hostname, port, tableType, false); + public ServerRoutingInstance(String instanceId, String hostname, int port) { + this(instanceId, hostname, port, false); } @VisibleForTesting - public ServerRoutingInstance(String hostname, int port, TableType tableType) { - this(Helix.PREFIX_OF_SERVER_INSTANCE + hostname + "_" + port, hostname, port, tableType); + public ServerRoutingInstance(String hostname, int port) { + this(Helix.PREFIX_OF_SERVER_INSTANCE + hostname + "_" + port, hostname, port); } public String getInstanceId() { @@ -74,20 +67,17 @@ public int getPort() { return _port; } - public TableType getTableType() { - return _tableType; + public static String toShortName(String hostname) { + try { + InternetDomainName domainName = InternetDomainName.from(hostname); + return domainName.parts().get(0); + } catch (Exception e) { + return hostname; + } } public String getShortName() { - String shortHostname = SHORT_HOSTNAME_MAP.computeIfAbsent(_hostname, hostname -> { - try { - InternetDomainName domainName = InternetDomainName.from(hostname); - return domainName.parts().get(0); - } catch (Exception e) { - return hostname; - } - }); - return shortHostname + (_tableType == TableType.OFFLINE ? 
SHORT_OFFLINE_SUFFIX : SHORT_REALTIME_SUFFIX); + return SHORT_HOSTNAME_MAP.computeIfAbsent(_hostname, ServerRoutingInstance::toShortName); } public boolean isTlsEnabled() { @@ -103,16 +93,16 @@ public boolean equals(Object o) { return false; } ServerRoutingInstance that = (ServerRoutingInstance) o; - // NOTE: Only check hostname, port and tableType for performance concern because they can identify a routing + // NOTE: Only check hostname and port for performance concern because they can identify a routing // instance within the same query - return _hostname.equals(that._hostname) && _port == that._port && _tableType == that._tableType; + return _hostname.equals(that._hostname) && _port == that._port; } @Override public int hashCode() { - // NOTE: Only check hostname, port and tableType for performance concern because they can identify a routing + // NOTE: Only check hostname and port for performance concern because they can identify a routing // instance within the same query - return 31 * 31 * _hostname.hashCode() + 31 * Integer.hashCode(_port) + _tableType.hashCode(); + return 31 * _hostname.hashCode() + Integer.hashCode(_port); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java index ed08918251a..66fe14ba315 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java @@ -19,6 +19,8 @@ package org.apache.pinot.core.query.reduce; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,9 +35,9 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.transport.ServerRoutingInstance; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants.Broker; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.testng.annotations.Test; @@ -50,8 +52,9 @@ public void testReduceTimeout() throws IOException { BrokerReduceService brokerReduceService = new BrokerReduceService(new PinotConfiguration(Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + String tableName = TableNameBuilder.OFFLINE.tableNameWithType("testTable"); BrokerRequest brokerRequest = - CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable GROUP BY col1"); + CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM " + tableName + " GROUP BY col1"); DataSchema dataSchema = new DataSchema(new String[]{"col1", "count(*)"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); @@ -63,11 +66,12 @@ public void testReduceTimeout() dataTableBuilder.finishRow(); } DataTable dataTable = dataTableBuilder.build(); - Map dataTableMap = new HashMap<>(); + dataTable.getMetadata().put(DataTable.MetadataKey.TABLE.getName(), tableName); + Map> dataTableMap = new HashMap<>(); int numInstances = 1000; for (int i = 0; i < numInstances; i++) { - ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); - 
dataTableMap.put(instance, dataTable); + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i); + dataTableMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(dataTable); } long reduceTimeoutMs = 1; BrokerResponseNative brokerResponse = diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java index a48b12563d1..306d53c9075 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java @@ -27,7 +27,6 @@ import java.util.function.Predicate; import org.apache.pinot.common.proto.Server; import org.apache.pinot.core.transport.ServerRoutingInstance; -import org.apache.pinot.spi.config.table.TableType; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.Test; @@ -48,7 +47,7 @@ public void testThreadExceptionTransfer() { RuntimeException innerException = new RuntimeException(exceptionMessage); when(mockedResponse.next()).thenThrow(innerException); ExecutorService threadPoolService = Executors.newFixedThreadPool(1); - ServerRoutingInstance routingInstance = new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE); + ServerRoutingInstance routingInstance = new ServerRoutingInstance("localhost", 9527); // supposedly we can use TestNG's annotation like @Test(expectedExceptions = { IOException.class }) to verify // here we hope to verify deeper to make sure the thrown exception is nested inside the exception assertTrue(verifyException(() -> { @@ -77,7 +76,7 @@ public Void answer(InvocationOnMock invocationOnMock) } }); final ExecutorService threadPoolService = Executors.newFixedThreadPool(1); - final ServerRoutingInstance routingInstance = new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE); + final ServerRoutingInstance routingInstance = new ServerRoutingInstance("localhost", 9527); //We cannot use TestNG's annotation like @Test(expectedExceptions = { IOException.class }) to verify // because the Exception we hope to verify is nested inside the final exception. 
assertTrue(verifyException(() -> { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java index 5f92dd5a25a..ebc6c86e1cc 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java @@ -225,7 +225,8 @@ public void testSubmitBeforeRunning() // start is not called DataTable response = DataTableFactory.getDataTable(result.get()); assertTrue(response.getExceptions().containsKey(QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode())); - assertFalse(response.getMetadata().containsKey(MetadataKey.TABLE.getName())); + // Server should always try to return the table name to which the query pertained + assertTrue(response.getMetadata().containsKey(MetadataKey.TABLE.getName())); scheduler.stop(); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 1b32149d064..b3371d8111f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -27,20 +27,24 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.datatable.DataTableUtils; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; @@ -60,14 +64,26 @@ public class QueryRoutingTest { private static final int TEST_PORT = 12345; private static final ServerInstance SERVER_INSTANCE = new ServerInstance("localhost", TEST_PORT); - private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE = - SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); - private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE = - SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY); - private static final BrokerRequest BROKER_REQUEST = - CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); - private static final Map, List>> ROUTING_TABLE = - Collections.singletonMap(SERVER_INSTANCE, Pair.of(Collections.emptyList(), 
Collections.emptyList())); + private static final ServerRoutingInstance SERVER_ROUTING_INSTANCE = + SERVER_INSTANCE.toServerRoutingInstance(ServerInstance.RoutingType.NETTY); + private static final String TABLE_NAME = "testTable"; + private static final BrokerRequest OFFLINE_BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest( + "SELECT * FROM " + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME)); + private static final BrokerRequest REALTIME_BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest( + "SELECT * FROM " + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME)); + + private static final ServerQueryRoutingContext OFFLINE_BROKER_REQ_CONTEXT = + new ServerQueryRoutingContext(OFFLINE_BROKER_REQUEST, Pair.of(Collections.emptyList(), Collections.emptyList()), + SERVER_ROUTING_INSTANCE); + private static final ServerQueryRoutingContext REALTIME_BROKER_REQ_CONTEXT = + new ServerQueryRoutingContext(REALTIME_BROKER_REQUEST, Pair.of(Collections.emptyList(), Collections.emptyList()), + SERVER_ROUTING_INSTANCE); + private static final Map<ServerInstance, List<ServerQueryRoutingContext>> OFFLINE_ROUTING_TABLE = + Collections.singletonMap(SERVER_INSTANCE, List.of(OFFLINE_BROKER_REQ_CONTEXT)); + private static final Map<ServerInstance, List<ServerQueryRoutingContext>> REALTIME_ROUTING_TABLE = + Collections.singletonMap(SERVER_INSTANCE, List.of(REALTIME_BROKER_REQ_CONTEXT)); + private static final Map<ServerInstance, List<ServerQueryRoutingContext>> HYBRID_ROUTING_TABLE = + Collections.singletonMap(SERVER_INSTANCE, List.of(OFFLINE_BROKER_REQ_CONTEXT, REALTIME_BROKER_REQ_CONTEXT)); private QueryRouter _queryRouter; private ServerRoutingStatsManager _serverRoutingStatsManager; @@ -90,17 +106,27 @@ void deregisterServerMetrics() { } private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) { - return getQueryServer(responseDelayMs, responseBytes, TEST_PORT); + ServerMetrics serverMetrics = mock(ServerMetrics.class); + InstanceRequestHandler handler = new InstanceRequestHandler("server01", new PinotConfiguration(), + mockQueryScheduler(responseDelayMs, responseBytes), serverMetrics, mock(AccessControl.class)); + ServerMetrics.register(serverMetrics); + return new QueryServer(TEST_PORT, null, handler); } - private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes, int port) { + private QueryServer getQueryServer(int responseDelayMs, DataTable offlineDataTable, DataTable realtimeDataTable, + int port) { ServerMetrics serverMetrics = mock(ServerMetrics.class); InstanceRequestHandler handler = new InstanceRequestHandler("server01", new PinotConfiguration(), - mockQueryScheduler(responseDelayMs, responseBytes), serverMetrics, mock(AccessControl.class)); + mockQueryScheduler(responseDelayMs, offlineDataTable, realtimeDataTable), serverMetrics, + mock(AccessControl.class)); ServerMetrics.register(serverMetrics); return new QueryServer(port, null, handler); } + private QueryServer getQueryServer(int responseDelayMs, DataTable offlineDataTable, DataTable realtimeDataTable) { + return getQueryServer(responseDelayMs, offlineDataTable, realtimeDataTable, TEST_PORT); + } + private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) { QueryScheduler queryScheduler = mock(QueryScheduler.class); when(queryScheduler.submit(any())).thenAnswer(invocation -> { @@ -110,60 +136,100 @@ private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBy return queryScheduler; } + private QueryScheduler mockQueryScheduler(int responseDelayMs, DataTable offlineDataTable, + DataTable realtimeDataTable) { + QueryScheduler queryScheduler = mock(QueryScheduler.class); +
when(queryScheduler.submit(any())).thenAnswer(invocation -> { + Thread.sleep(responseDelayMs); + // TODO(egalpin): verify if this arg is tablename + String tableName = String.valueOf(((ServerQueryRequest) invocation.getArguments()[0]).getTableName()); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableName.equals(realtimeDataTable.getMetadata().get(MetadataKey.TABLE.getName())) || tableType.equals( + DataTableUtils.inferTableType(realtimeDataTable))) { + return Futures.immediateFuture(realtimeDataTable.toBytes()); + } else if (tableName.equals(offlineDataTable.getMetadata().get(MetadataKey.TABLE.getName())) || tableType.equals( + DataTableUtils.inferTableType(offlineDataTable))) { + return Futures.immediateFuture(offlineDataTable.toBytes()); + } + return Futures.immediateFuture(new byte[0]); + }); + return queryScheduler; + } + @Test public void testValidResponse() throws Exception { - long requestId = 123; - DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); - dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); - byte[] responseBytes = dataTable.toBytes(); + long requestId = 1230; + DataTable offlineDataTable = DataTableBuilderFactory.getEmptyDataTable(); + offlineDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), + Long.toString(BrokerRequestIdUtils.getOfflineRequestId(requestId))); + offlineDataTable.getMetadata() + .put(MetadataKey.TABLE.getName(), OFFLINE_BROKER_REQUEST.getQuerySource().getTableName()); + byte[] offlineResponseBytes = offlineDataTable.toBytes(); + + DataTable realtimeDataTable = DataTableBuilderFactory.getEmptyDataTable(); + realtimeDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), + Long.toString(BrokerRequestIdUtils.getRealtimeRequestId(requestId))); + realtimeDataTable.getMetadata() + .put(MetadataKey.TABLE.getName(), REALTIME_BROKER_REQUEST.getQuerySource().getTableName()); + byte[] realtimeResponseBytes = realtimeDataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server - QueryServer queryServer = getQueryServer(0, responseBytes); + QueryServer queryServer = getQueryServer(0, offlineDataTable, realtimeDataTable); queryServer.start(); // OFFLINE only AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 600_000L); - Map response = asyncQueryResponse.getFinalResponses(); + _queryRouter.submitQuery(requestId, "testTable", OFFLINE_ROUTING_TABLE, 600_000L); + Map> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + ServerResponse serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); assertNotNull(serverResponse.getDataTable()); - assertEquals(serverResponse.getResponseSize(), responseBytes.length); + assertEquals(serverResponse.getResponseSize(), offlineResponseBytes.length); // 2 requests - query submit and query response. 
_requestCount += 2; waitForStatsUpdate(_requestCount); assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // REALTIME only - asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", null, null, BROKER_REQUEST, ROUTING_TABLE, 1_000L); + asyncQueryResponse = _queryRouter.submitQuery(requestId, "testTable", REALTIME_ROUTING_TABLE, 1_000L); response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE)); - serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); assertNotNull(serverResponse.getDataTable()); - assertEquals(serverResponse.getResponseSize(), responseBytes.length); + assertEquals(TableType.REALTIME, DataTableUtils.inferTableType(serverResponse.getDataTable())); + assertEquals(serverResponse.getResponseSize(), realtimeResponseBytes.length); _requestCount += 2; waitForStatsUpdate(_requestCount); assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Hybrid - asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, - 1_000L); + asyncQueryResponse = _queryRouter.submitQuery(requestId, "testTable", HYBRID_ROUTING_TABLE, 1_000L); response = asyncQueryResponse.getFinalResponses(); - assertEquals(response.size(), 2); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); - assertNotNull(serverResponse.getDataTable()); - assertEquals(serverResponse.getResponseSize(), responseBytes.length); - assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE)); - serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE); - assertNotNull(serverResponse.getDataTable()); - assertEquals(serverResponse.getResponseSize(), responseBytes.length); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 2); + + int accountedFor = 0; + for (ServerResponse serverResponse1 : response.get(SERVER_ROUTING_INSTANCE)) { + assertNotNull(serverResponse1.getDataTable()); + if (serverResponse1.getDataTable().getMetadata().get(MetadataKey.TABLE.getName()) + .equals(offlineDataTable.getMetadata().get(MetadataKey.TABLE.getName()))) { + assertEquals(serverResponse1.getResponseSize(), offlineResponseBytes.length); + accountedFor++; + } else if (serverResponse1.getDataTable().getMetadata().get(MetadataKey.TABLE.getName()) + .equals(realtimeDataTable.getMetadata().get(MetadataKey.TABLE.getName()))) { + assertEquals(serverResponse1.getResponseSize(), realtimeResponseBytes.length); + accountedFor++; + } + } + assertEquals(accountedFor, 2, "Hybrid should have created 1 realtime and 1 offline request/response"); _requestCount += 4; waitForStatsUpdate(_requestCount); assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); @@ -184,11 +250,12 @@ public void testInvalidResponse() long startTimeMs = System.currentTimeMillis(); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); - Map response = asyncQueryResponse.getFinalResponses(); + 
_queryRouter.submitQuery(requestId, "testTable", OFFLINE_ROUTING_TABLE, 1_000L); + Map<ServerRoutingInstance, List<ServerResponse>> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + ServerResponse serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); assertNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseDelayMs(), -1); assertEquals(serverResponse.getResponseSize(), 0); @@ -222,10 +289,10 @@ public void testLatencyForQueryServerException() // Send a query with ServerSide exception and check if the latency is set to timeout value. Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); - Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + _queryRouter.submitQuery(requestId, "testTable", OFFLINE_ROUTING_TABLE, 1_000L); + Map<ServerRoutingInstance, List<ServerResponse>> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); _requestCount += 2; waitForStatsUpdate(_requestCount); @@ -251,7 +318,7 @@ public void testLatencyForQueryServerException() @Test public void testLatencyForClientException() throws Exception { - long requestId = 123; + long requestId = 1230; DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); Exception exception = new UnsupportedOperationException("Caught exception."); @@ -268,11 +335,12 @@ public void testLatencyForClientException() Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); - Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); + _queryRouter.submitQuery(requestId, "testTable", OFFLINE_ROUTING_TABLE, 1_000L); + Map<ServerRoutingInstance, List<ServerResponse>> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + ServerResponse serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); _requestCount += 2; waitForStatsUpdate(_requestCount); @@ -295,7 +363,7 @@ public void testLatencyForClientException() @Test public void testLatencyForMultipleExceptions() throws Exception { - long requestId = 123; + long requestId = 1230; DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); Exception exception = new UnsupportedOperationException("Caught exception."); @@ -315,10 +383,10 @@ public void testLatencyForMultipleExceptions() //server-side exception is seen.
Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); - Map response = asyncQueryResponse.getFinalResponses(); + _queryRouter.submitQuery(requestId, "testTable", OFFLINE_ROUTING_TABLE, 1_000L); + Map> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); _requestCount += 2; waitForStatsUpdate(_requestCount); @@ -343,7 +411,7 @@ public void testLatencyForMultipleExceptions() @Test public void testLatencyForNoException() throws Exception { - long requestId = 123; + long requestId = 1230; DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); @@ -355,11 +423,12 @@ public void testLatencyForNoException() // Send a valid query and get latency Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); - Map response = asyncQueryResponse.getFinalResponses(); + _queryRouter.submitQuery(requestId, "testTable", OFFLINE_ROUTING_TABLE, 1_000L); + Map> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + ServerResponse serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); _requestCount += 2; waitForStatsUpdate(_requestCount); @@ -380,23 +449,30 @@ public void testLatencyForNoException() @Test public void testNonMatchingRequestId() throws Exception { - long requestId = 123; - DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); - dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); - byte[] responseBytes = dataTable.toBytes(); + long requestId = 1230; + DataTable offlineDataTable = DataTableBuilderFactory.getEmptyDataTable(); + offlineDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + offlineDataTable.getMetadata() + .put(MetadataKey.TABLE.getName(), OFFLINE_BROKER_REQUEST.getQuerySource().getTableName()); + + DataTable realtimeDataTable = DataTableBuilderFactory.getEmptyDataTable(); + realtimeDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + realtimeDataTable.getMetadata() + .put(MetadataKey.TABLE.getName(), REALTIME_BROKER_REQUEST.getQuerySource().getTableName()); String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server - QueryServer queryServer = getQueryServer(0, responseBytes); + QueryServer queryServer = getQueryServer(0, offlineDataTable, realtimeDataTable); queryServer.start(); long startTimeMs = System.currentTimeMillis(); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); - Map response = asyncQueryResponse.getFinalResponses(); + _queryRouter.submitQuery(requestId + 1, "testTable", OFFLINE_ROUTING_TABLE, 
1_000L); + Map> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + ServerResponse serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); assertNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseDelayMs(), -1); assertEquals(serverResponse.getResponseSize(), 0); @@ -414,30 +490,38 @@ public void testNonMatchingRequestId() @Test public void testServerDown() throws Exception { - long requestId = 123; + long requestId = 1230; // To avoid flakyness, set timeoutMs to 2000 msec. For some test runs, it can take up to // 1400 msec to mark request as failed. long timeoutMs = 2000L; - DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); - dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); - byte[] responseBytes = dataTable.toBytes(); + DataTable offlineDataTable = DataTableBuilderFactory.getEmptyDataTable(); + offlineDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + offlineDataTable.getMetadata() + .put(MetadataKey.TABLE.getName(), OFFLINE_BROKER_REQUEST.getQuerySource().getTableName()); + + DataTable realtimeDataTable = DataTableBuilderFactory.getEmptyDataTable(); + realtimeDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + realtimeDataTable.getMetadata() + .put(MetadataKey.TABLE.getName(), REALTIME_BROKER_REQUEST.getQuerySource().getTableName()); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server - QueryServer queryServer = getQueryServer(500, responseBytes); + QueryServer queryServer = getQueryServer(500, offlineDataTable, realtimeDataTable); queryServer.start(); long startTimeMs = System.currentTimeMillis(); AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, timeoutMs); + _queryRouter.submitQuery(requestId + 1, "testTable", OFFLINE_ROUTING_TABLE, timeoutMs); // Shut down the server before getting the response queryServer.shutDown(); - Map response = asyncQueryResponse.getFinalResponses(); + Map> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + ServerResponse serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); assertNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseDelayMs(), -1); assertEquals(serverResponse.getResponseSize(), 0); @@ -448,15 +532,14 @@ public void testServerDown() waitForStatsUpdate(_requestCount); assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); - // Submit query after server is down startTimeMs = System.currentTimeMillis(); - asyncQueryResponse = - _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, timeoutMs); + asyncQueryResponse = _queryRouter.submitQuery(requestId + 1, "testTable", OFFLINE_ROUTING_TABLE, timeoutMs); response = 
asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); - assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); - serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + assertTrue(response.containsKey(SERVER_ROUTING_INSTANCE)); + assertEquals(response.get(SERVER_ROUTING_INSTANCE).size(), 1); + serverResponse = response.get(SERVER_ROUTING_INSTANCE).get(0); assertNull(serverResponse.getDataTable()); assertEquals(serverResponse.getSubmitDelayMs(), -1); assertEquals(serverResponse.getResponseDelayMs(), -1); @@ -478,14 +561,27 @@ public void testSkipUnavailableServer() ServerInstance serverInstance1 = new ServerInstance("localhost", port); ServerInstance serverInstance2 = new ServerInstance("localhost", port + 1); ServerRoutingInstance serverRoutingInstance1 = - serverInstance1.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); + serverInstance1.toServerRoutingInstance(ServerInstance.RoutingType.NETTY); ServerRoutingInstance serverRoutingInstance2 = - serverInstance2.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); - Map<ServerInstance, Pair<List<String>, List<String>>> routingTable = - Map.of(serverInstance1, Pair.of(Collections.emptyList(), Collections.emptyList()), serverInstance2, - Pair.of(Collections.emptyList(), Collections.emptyList())); + serverInstance2.toServerRoutingInstance(ServerInstance.RoutingType.NETTY); - long requestId = 123; + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SET skipUnavailableServers=true; SELECT * FROM testTable_OFFLINE"); + + ServerQueryRoutingContext offlineBrokerRequestContext1 = + new ServerQueryRoutingContext(brokerRequest, Pair.of(Collections.emptyList(), Collections.emptyList()), + serverRoutingInstance1); + ServerQueryRoutingContext offlineBrokerRequestContext2 = + new ServerQueryRoutingContext(brokerRequest, Pair.of(Collections.emptyList(), Collections.emptyList()), + serverRoutingInstance2); + Map<ServerInstance, List<ServerQueryRoutingContext>> routingTable = + Map.of(serverInstance1, List.of(offlineBrokerRequestContext1), serverInstance2, + List.of(offlineBrokerRequestContext2)); + + long requestId = 1230; DataSchema dataSchema = new DataSchema(new String[]{"column1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); @@ -495,25 +591,22 @@ DataTable dataTableSuccess = builder.build(); Map<String, String> dataTableMetadata = dataTableSuccess.getMetadata(); dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); - byte[] successResponseBytes = dataTableSuccess.toBytes(); + dataTableMetadata.put(MetadataKey.TABLE.getName(), brokerRequest.getQuerySource().getTableName()); // Only start a single QueryServer, on port from serverInstance1 - QueryServer queryServer = getQueryServer(500, successResponseBytes, port); + QueryServer queryServer = getQueryServer(500, dataTableSuccess, dataTableSuccess, port); queryServer.start(); // Submit the query with skipUnavailableServers=true, the single started server should return a valid response - BrokerRequest brokerRequest = - CalciteSqlCompiler.compileToBrokerRequest("SET skipUnavailableServers=true; SELECT * FROM testTable"); long startTime = System.currentTimeMillis(); - AsyncQueryResponse asyncQueryResponse = -
_queryRouter.submitQuery(requestId, "testTable", brokerRequest, routingTable, null, null, 10_000L); - Map response = asyncQueryResponse.getFinalResponses(); + AsyncQueryResponse asyncQueryResponse = _queryRouter.submitQuery(requestId, "testTable", routingTable, 10_000L); + Map> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 2); assertTrue(response.containsKey(serverRoutingInstance1)); assertTrue(response.containsKey(serverRoutingInstance2)); - ServerResponse serverResponse1 = response.get(serverRoutingInstance1); - ServerResponse serverResponse2 = response.get(serverRoutingInstance2); + ServerResponse serverResponse1 = response.get(serverRoutingInstance1).get(0); + ServerResponse serverResponse2 = response.get(serverRoutingInstance2).get(0); assertNotNull(serverResponse1.getDataTable()); assertNull(serverResponse2.getDataTable()); assertTrue(serverResponse1.getResponseDelayMs() > 500); // > response delay set by getQueryServer @@ -526,18 +619,34 @@ public void testSkipUnavailableServer() assertEquals( _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverInstance2.getInstanceId()).intValue(), 0); + queryServer.shutDown(); + // Submit the same query without skipUnavailableServers, the servers should not return any response - brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); + brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable_OFFLINE"); + dataTableMetadata.put(MetadataKey.TABLE.getName(), brokerRequest.getQuerySource().getTableName()); + + offlineBrokerRequestContext1 = + new ServerQueryRoutingContext(brokerRequest, Pair.of(Collections.emptyList(), Collections.emptyList()), + serverRoutingInstance1); + offlineBrokerRequestContext2 = + new ServerQueryRoutingContext(brokerRequest, Pair.of(Collections.emptyList(), Collections.emptyList()), + serverRoutingInstance2); + routingTable = Map.of(serverInstance1, List.of(offlineBrokerRequestContext1), serverInstance2, + List.of(offlineBrokerRequestContext2)); + + // Start a new query server with updated data table + queryServer = getQueryServer(500, dataTableSuccess, dataTableSuccess, port); + queryServer.start(); startTime = System.currentTimeMillis(); - asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", brokerRequest, routingTable, null, null, 10_000L); + + asyncQueryResponse = _queryRouter.submitQuery(requestId, "testTable", routingTable, 10_000L); response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 2); assertTrue(response.containsKey(serverRoutingInstance1)); assertTrue(response.containsKey(serverRoutingInstance2)); - serverResponse1 = response.get(serverRoutingInstance1); - serverResponse2 = response.get(serverRoutingInstance2); + serverResponse1 = response.get(serverRoutingInstance1).get(0); + serverResponse2 = response.get(serverRoutingInstance2).get(0); assertNull(serverResponse1.getDataTable()); assertNull(serverResponse2.getDataTable()); assertTrue(serverResponse1.getResponseDelayMs() < 100); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java index abbbe5be56c..6f57af374c0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java @@ -24,7 +24,6 @@ import org.apache.pinot.common.metrics.BrokerMetrics; import 
org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; -import org.apache.pinot.spi.config.table.TableType; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -68,7 +67,7 @@ public void testConnect(boolean nativeTransportEnabled) QueryRouter queryRouter = mock(QueryRouter.class); ServerRoutingInstance serverRoutingInstance = - new ServerRoutingInstance("localhost", _dummyServer.getAddress().getPort(), TableType.REALTIME); + new ServerRoutingInstance("localhost", _dummyServer.getAddress().getPort()); ServerChannels serverChannels = new ServerChannels(queryRouter, brokerMetrics, nettyConfig, null); serverChannels.connect(serverRoutingInstance); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerRoutingInstanceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerRoutingInstanceTest.java index 991cd19181e..84c67bff74f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerRoutingInstanceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerRoutingInstanceTest.java @@ -27,6 +27,6 @@ public class ServerRoutingInstanceTest { @Test public void equalsVerifier() { EqualsVerifier.configure().forClass(ServerRoutingInstance.class) - .withOnlyTheseFields("_hostname", "_port", "_tableType").suppress(Warning.NULL_FIELDS).verify(); + .withOnlyTheseFields("_hostname", "_port").suppress(Warning.NULL_FIELDS).verify(); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index 5b1e07eb73d..d2761be3c62 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -55,12 +56,12 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.apache.pinot.spi.utils.CommonConstants.Server; import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.intellij.lang.annotations.Language; @@ -229,14 +230,19 @@ private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery, PlanMaker } // Broker side - Map dataTableMap = new HashMap<>(); + Map> dataTableMap = new HashMap<>(); try { // For multi-threaded BrokerReduceService, we cannot reuse the same data-table - byte[] serializedResponse = instanceResponse.toDataTable().toBytes(); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), - DataTableFactory.getDataTable(serializedResponse)); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), - DataTableFactory.getDataTable(serializedResponse)); + ServerRoutingInstance serverRoutingInstance = new ServerRoutingInstance("localhost", 1234); + 
dataTableMap.put(serverRoutingInstance, new ArrayList<>()); + + DataTable dataTable = instanceResponse.toDataTable(); + dataTable.getMetadata().put(DataTable.MetadataKey.TABLE.getName(), + TableNameBuilder.OFFLINE.tableNameWithType(serverQueryContext.getTableName())); + dataTableMap.get(serverRoutingInstance).add(DataTableFactory.getDataTable(dataTable.toBytes())); + dataTable.getMetadata().put(DataTable.MetadataKey.TABLE.getName(), + TableNameBuilder.REALTIME.tableNameWithType(serverQueryContext.getTableName())); + dataTableMap.get(serverRoutingInstance).add(DataTableFactory.getDataTable(dataTable.toBytes())); } catch (Exception e) { throw new RuntimeException(e); } @@ -253,7 +259,7 @@ private static List getSegmentContexts(List indexS } protected BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest, - Map dataTableMap) { + Map> dataTableMap) { BrokerReduceService brokerReduceService = new BrokerReduceService(new PinotConfiguration(Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); BrokerResponseNative brokerResponse = @@ -342,15 +348,15 @@ private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinot } // Broker side - Map dataTableMap = new HashMap<>(); + Map> dataTableMap = new HashMap<>(); try { // For multi-threaded BrokerReduceService, we cannot reuse the same data-table byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes(); byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes(); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), - DataTableFactory.getDataTable(serializedResponse1)); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), - DataTableFactory.getDataTable(serializedResponse2)); + ServerRoutingInstance serverRoutingInstance = new ServerRoutingInstance("localhost", 1234); + dataTableMap.put(serverRoutingInstance, new ArrayList<>()); + dataTableMap.get(serverRoutingInstance).add(DataTableFactory.getDataTable(serializedResponse1)); + dataTableMap.get(serverRoutingInstance).add(DataTableFactory.getDataTable(serializedResponse2)); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index dce00515d08..ac37d71f278 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -370,15 +371,17 @@ private void checkWithQueryExecutor(String query, ResultTable expected, QueryExe // Use 2 Threads for 2 data-tables // Different segments are assigned to each set of DataTables. This is necessary to simulate scenarios where // certain segments may completely be pruned on one server but not the other. 
- Map dataTableMap = new HashMap<>(); + Map> dataTableMap = new HashMap<>(); try { + ServerRoutingInstance serverRoutingInstance = new ServerRoutingInstance("localhost", 1234); + // For multi-threaded BrokerReduceService, we cannot reuse the same data-table byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes(); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), - DataTableFactory.getDataTable(serializedResponse1)); byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes(); - dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), - DataTableFactory.getDataTable(serializedResponse2)); + dataTableMap.computeIfAbsent(serverRoutingInstance, k -> new ArrayList<>()) + .add(DataTableFactory.getDataTable(serializedResponse1)); + dataTableMap.computeIfAbsent(serverRoutingInstance, k -> new ArrayList<>()) + .add(DataTableFactory.getDataTable(serializedResponse2)); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index c7a4afc51ab..fba5749c95c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -284,9 +284,8 @@ public void testQueryTracing(boolean useMultiStageQueryEngine) Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), getCountStarResult()); Assert.assertTrue(jsonNode.get("exceptions").isEmpty()); JsonNode traceInfo = jsonNode.get("traceInfo"); - Assert.assertEquals(traceInfo.size(), 2); - Assert.assertTrue(traceInfo.has("localhost_O")); - Assert.assertTrue(traceInfo.has("localhost_R")); + Assert.assertEquals(traceInfo.size(), 1); + Assert.assertTrue(traceInfo.has("localhost")); } @Test(dataProvider = "useBothQueryEngines") @@ -305,9 +304,8 @@ public void testQueryTracingWithLiteral(boolean useMultiStageQueryEngine) } Assert.assertTrue(jsonNode.get("exceptions").isEmpty()); JsonNode traceInfo = jsonNode.get("traceInfo"); - Assert.assertEquals(traceInfo.size(), 2); - Assert.assertTrue(traceInfo.has("localhost_O")); - Assert.assertTrue(traceInfo.has("localhost_R")); + Assert.assertEquals(traceInfo.size(), 1); + Assert.assertTrue(traceInfo.has("localhost")); } @Test(dataProvider = "useBothQueryEngines") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java index 6408fd8f31d..6815e71f6ad 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java @@ -20,6 +20,8 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -181,7 +183,7 @@ private DataTable collectNonStreamingRequestResult(Iterator streamingResponses, DataTable nonStreamResultDataTable) throws Exception { - Map dataTableMap = new HashMap<>(); + Map> dataTableMap = new HashMap<>(); DataSchema cachedDataSchema = null; 
while (streamingResponses.hasNext()) { Server.ServerResponse streamingResponse = streamingResponses.next(); @@ -196,7 +198,7 @@ private void collectAndCompareResult(String sql, Iterator assertNotNull(dataTable.getDataSchema()); cachedDataSchema = dataTable.getDataSchema(); // adding them to a fake dataTableMap for reduce - dataTableMap.put(mock(ServerRoutingInstance.class), dataTable); + dataTableMap.put(mock(ServerRoutingInstance.class), Collections.singleton(dataTable)); } else { // compare result dataTable against nonStreamingResultDataTable // Process server response. @@ -207,8 +209,8 @@ private void collectAndCompareResult(String sql, Iterator DATATABLE_REDUCER_CONTEXT, mock(BrokerMetrics.class)); BrokerResponseNative nonStreamBrokerResponse = new BrokerResponseNative(); reducer.reduceAndSetResults("mytable_OFFLINE", nonStreamResultDataTable.getDataSchema(), - Map.of(mock(ServerRoutingInstance.class), nonStreamResultDataTable), nonStreamBrokerResponse, - DATATABLE_REDUCER_CONTEXT, mock(BrokerMetrics.class)); + Map.of(mock(ServerRoutingInstance.class), Collections.singleton(nonStreamResultDataTable)), + nonStreamBrokerResponse, DATATABLE_REDUCER_CONTEXT, mock(BrokerMetrics.class)); assertEquals(streamingBrokerResponse.getResultTable().getRows().size(), nonStreamBrokerResponse.getResultTable().getRows().size());