diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index a6f0c1bda0b..a573d75015f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -436,65 +436,6 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN); } - // Get the tables hit by the request - String offlineTableName = null; - String realtimeTableName = null; - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); - if (tableType == TableType.OFFLINE) { - // Offline table - if (_routingManager.routingExists(tableName)) { - offlineTableName = tableName; - } - } else if (tableType == TableType.REALTIME) { - // Realtime table - if (_routingManager.routingExists(tableName)) { - realtimeTableName = tableName; - } - } else { - // Hybrid table (check both OFFLINE and REALTIME) - String offlineTableNameToCheck = TableNameBuilder.OFFLINE.tableNameWithType(tableName); - if (_routingManager.routingExists(offlineTableNameToCheck)) { - offlineTableName = offlineTableNameToCheck; - } - String realtimeTableNameToCheck = TableNameBuilder.REALTIME.tableNameWithType(tableName); - if (_routingManager.routingExists(realtimeTableNameToCheck)) { - realtimeTableName = realtimeTableNameToCheck; - } - } - - TableConfig offlineTableConfig = - _tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)); - TableConfig realtimeTableConfig = - _tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)); - - if (offlineTableName == null && realtimeTableName == null) { - // No table matches the request - if (realtimeTableConfig == null && offlineTableConfig == null) { - LOGGER.info("Table not found for request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE); - return BrokerResponseNative.TABLE_DOES_NOT_EXIST; - } - LOGGER.info("No table matches for request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE); - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1); - return BrokerResponseNative.NO_TABLE_RESULT; - } - - // Handle query rewrite that can be overridden by the table configs - if (offlineTableName == null) { - offlineTableConfig = null; - } - if (realtimeTableName == null) { - realtimeTableConfig = null; - } - HandlerContext handlerContext = getHandlerContext(offlineTableConfig, realtimeTableConfig); - if (handlerContext._disableGroovy) { - rejectGroovyQuery(serverPinotQuery); - } - if (handlerContext._useApproximateFunction) { - handleApproximateFunctionOverride(serverPinotQuery); - } - // Validate QPS quota String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName); if (!_queryQuotaManager.acquireDatabase(database)) { @@ -537,84 +478,33 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } - // Prepare OFFLINE and REALTIME requests - BrokerRequest offlineBrokerRequest = null; - BrokerRequest realtimeBrokerRequest = null; - TimeBoundaryInfo timeBoundaryInfo = null; - if (offlineTableName != null && realtimeTableName != null) { - // Time boundary info might be null when there is no segment in the offline table, query real-time side only - timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableName); - if (timeBoundaryInfo == null) { - LOGGER.debug("No time boundary info found for hybrid table: {}", rawTableName); - offlineTableName = null; - } - } - if (offlineTableName != null && realtimeTableName != null) { - // Hybrid - PinotQuery offlinePinotQuery = serverPinotQuery.deepCopy(); - offlinePinotQuery.getDataSource().setTableName(offlineTableName); - attachTimeBoundary(offlinePinotQuery, timeBoundaryInfo, true); - handleExpressionOverride(offlinePinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName)); - handleTimestampIndexOverride(offlinePinotQuery, offlineTableConfig); - _queryOptimizer.optimize(offlinePinotQuery, offlineTableConfig, schema); - offlineBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(offlinePinotQuery); - - PinotQuery realtimePinotQuery = serverPinotQuery.deepCopy(); - realtimePinotQuery.getDataSource().setTableName(realtimeTableName); - attachTimeBoundary(realtimePinotQuery, timeBoundaryInfo, false); - handleExpressionOverride(realtimePinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName)); - handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig); - _queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, schema); - realtimeBrokerRequest = CalciteSqlCompiler.convertToBrokerRequest(realtimePinotQuery); - - requestContext.setFanoutType(RequestContext.FanoutType.HYBRID); - requestContext.setOfflineServerTenant(getServerTenant(offlineTableName)); - requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName)); - } else if (offlineTableName != null) { - // OFFLINE only - setTableName(serverBrokerRequest, offlineTableName); - handleExpressionOverride(serverPinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName)); - handleTimestampIndexOverride(serverPinotQuery, offlineTableConfig); - _queryOptimizer.optimize(serverPinotQuery, offlineTableConfig, schema); - offlineBrokerRequest = serverBrokerRequest; - - requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE); - requestContext.setOfflineServerTenant(getServerTenant(offlineTableName)); - } else { - // REALTIME only - setTableName(serverBrokerRequest, realtimeTableName); - handleExpressionOverride(serverPinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName)); - handleTimestampIndexOverride(serverPinotQuery, realtimeTableConfig); - _queryOptimizer.optimize(serverPinotQuery, realtimeTableConfig, schema); - realtimeBrokerRequest = serverBrokerRequest; - - requestContext.setFanoutType(RequestContext.FanoutType.REALTIME); - requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName)); + // Get the tables hit by the request + TableRoutingContext tableRoutingContext = getTablesHitByRequest(requestId, requestContext, tableName, query); + if (tableRoutingContext._brokerResponse != null) { + return tableRoutingContext._brokerResponse; } - // Check if response can be sent without server query evaluation. - if (offlineBrokerRequest != null && isFilterAlwaysFalse(offlineBrokerRequest.getPinotQuery())) { - // We don't need to evaluate offline request - offlineBrokerRequest = null; + // Handle query rewrite that can be overridden by the table configs + HandlerContext handlerContext = + getHandlerContext(tableRoutingContext._offlineTableConfig, tableRoutingContext._realtimeTableConfig); + if (handlerContext._disableGroovy) { + rejectGroovyQuery(serverPinotQuery); } - if (realtimeBrokerRequest != null && isFilterAlwaysFalse(realtimeBrokerRequest.getPinotQuery())) { - // We don't need to evaluate realtime request - realtimeBrokerRequest = null; + if (handlerContext._useApproximateFunction) { + handleApproximateFunctionOverride(serverPinotQuery); } + // Prepare OFFLINE and REALTIME requests + BrokerRequest offlineBrokerRequest = + tableRoutingContext.getOfflineBrokerRequest(serverPinotQuery, schema, _tableCache, _queryOptimizer); + BrokerRequest realtimeBrokerRequest = + tableRoutingContext.getRealtimeBrokerRequest(serverPinotQuery, schema, _tableCache, _queryOptimizer); + tableRoutingContext.setFanoutTypeAndTenants(requestContext, _tableCache); + if (offlineBrokerRequest == null && realtimeBrokerRequest == null) { return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity); } - if (offlineBrokerRequest != null && isFilterAlwaysTrue(offlineBrokerRequest.getPinotQuery())) { - // Drop offline request filter since it is always true - offlineBrokerRequest.getPinotQuery().setFilterExpression(null); - } - if (realtimeBrokerRequest != null && isFilterAlwaysTrue(realtimeBrokerRequest.getPinotQuery())) { - // Drop realtime request filter since it is always true - realtimeBrokerRequest.getPinotQuery().setFilterExpression(null); - } - // Calculate routing table for the query // TODO: Modify RoutingManager interface to directly take PinotQuery long routingStartTimeNs = System.nanoTime(); @@ -657,8 +547,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } else { errorMessage = String.format("%d segments unavailable: %s", numUnavailableSegments, unavailableSegments); } - String realtimeRoutingPolicy = realtimeBrokerRequest != null ? getRoutingPolicy(realtimeTableConfig) : null; - String offlineRoutingPolicy = offlineBrokerRequest != null ? getRoutingPolicy(offlineTableConfig) : null; + String realtimeRoutingPolicy = + realtimeBrokerRequest != null ? getRoutingPolicy(tableRoutingContext._realtimeTableConfig) : null; + String offlineRoutingPolicy = + offlineBrokerRequest != null ? getRoutingPolicy(tableRoutingContext._offlineTableConfig) : null; errorMessage = addRoutingPolicyInErrMsg(errorMessage, realtimeRoutingPolicy, offlineRoutingPolicy); exceptions.add(QueryException.getException(QueryException.BROKER_SEGMENT_UNAVAILABLE_ERROR, errorMessage)); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1); @@ -689,12 +581,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO long remainingTimeMs = 0; try { if (offlineBrokerRequest != null) { - remainingTimeMs = - setQueryTimeout(offlineTableName, offlineBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs); + remainingTimeMs = setQueryTimeout(tableRoutingContext.getOfflineTableName(), + offlineBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs); } if (realtimeBrokerRequest != null) { - remainingTimeMs = Math.max(remainingTimeMs, - setQueryTimeout(realtimeTableName, realtimeBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs)); + remainingTimeMs = Math.max(remainingTimeMs, setQueryTimeout(tableRoutingContext.getRealtimeTableName(), + realtimeBrokerRequest.getPinotQuery().getQueryOptions(), timeSpentMs)); } } catch (TimeoutException e) { String errorMessage = e.getMessage(); @@ -710,7 +602,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO if (offlineBrokerRequest != null) { Map queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions(); - setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, offlineTableConfig); + setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, tableRoutingContext._offlineTableConfig); // Set the query option to directly return final result for single server query unless it is explicitly disabled if (numQueriesIssued == 1) { // Set the same flag in the original server request to be used in the reduce phase for hybrid table @@ -723,7 +615,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (realtimeBrokerRequest != null) { Map queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions(); - setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, realtimeTableConfig); + setMaxServerResponseSizeBytes(numQueriesIssued, queryOptions, tableRoutingContext._realtimeTableConfig); // Set the query option to directly return final result for single server query unless it is explicitly disabled if (numQueriesIssued == 1) { // Set the same flag in the original server request to be used in the reduce phase for hybrid table @@ -804,6 +696,166 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } + private static class TableRoutingContext { + private TableConfig _realtimeTableConfig; + private TableConfig _offlineTableConfig; + private TimeBoundaryInfo _timeBoundaryInfo; + private BrokerResponse _brokerResponse; + + @Nullable + public String getOfflineTableName() { + return _offlineTableConfig != null ? _offlineTableConfig.getTableName() : null; + } + + @Nullable + public String getRealtimeTableName() { + return _realtimeTableConfig != null ? _realtimeTableConfig.getTableName() : null; + } + + public boolean canRouteRealtime() { + return _realtimeTableConfig != null; + } + + public boolean canRouteOffline() { + return _offlineTableConfig != null; + } + + public boolean canRouteHybrid() { + return canRouteRealtime() && canRouteOffline(); + } + + @Nullable + private BrokerRequest getBrokerRequest(@Nullable TableConfig tableConfig, PinotQuery serverPinotQuery, + Schema schema, TableCache tableCache, QueryOptimizer queryOptimizer) { + if (tableConfig == null) { + return null; + } + String tableName = tableConfig.getTableName(); + PinotQuery pinotQuery = serverPinotQuery.deepCopy(); + pinotQuery.getDataSource().setTableName(tableName); + + if (canRouteHybrid()) { + attachTimeBoundary(pinotQuery, _timeBoundaryInfo, TableNameBuilder.isOfflineTableResource(tableName)); + } + + handleExpressionOverride(pinotQuery, tableCache.getExpressionOverrideMap(tableName)); + handleTimestampIndexOverride(pinotQuery, tableCache, tableConfig); + queryOptimizer.optimize(pinotQuery, tableConfig, schema); + + if (isFilterAlwaysFalse(pinotQuery)) { + return null; + } + + if (isFilterAlwaysTrue(pinotQuery)) { + pinotQuery.setFilterExpression(null); + } + + BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); + if (!canRouteHybrid()) { + brokerRequest.getQuerySource().setTableName(tableName); + } + return brokerRequest; + } + + @Nullable + public BrokerRequest getOfflineBrokerRequest(PinotQuery serverPinotQuery, Schema schema, TableCache tableCache, + QueryOptimizer queryOptimizer) { + return getBrokerRequest(_offlineTableConfig, serverPinotQuery, schema, tableCache, queryOptimizer); + } + + @Nullable + public BrokerRequest getRealtimeBrokerRequest(PinotQuery serverPinotQuery, Schema schema, TableCache tableCache, + QueryOptimizer queryOptimizer) { + return getBrokerRequest(_realtimeTableConfig, serverPinotQuery, schema, tableCache, queryOptimizer); + } + + public void setFanoutTypeAndTenants(RequestContext requestContext, TableCache tableCache) { + if (canRouteHybrid()) { + requestContext.setFanoutType(RequestContext.FanoutType.HYBRID); + } else if (canRouteOffline()) { + requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE); + } else { + requestContext.setFanoutType(RequestContext.FanoutType.REALTIME); + } + + if (canRouteOffline()) { + requestContext.setOfflineServerTenant(getServerTenant(getOfflineTableName(), tableCache)); + } + if (canRouteRealtime()) { + requestContext.setRealtimeServerTenant(getServerTenant(getRealtimeTableName(), tableCache)); + } + } + } + + private TableRoutingContext getTablesHitByRequest(long requestId, RequestContext requestContext, String tableName, + String query) { + + boolean realtimeRoutingExists = false; + boolean offlineRoutingExists = false; + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + + if (tableType == TableType.OFFLINE) { + // Offline table + if (_routingManager.routingExists(tableName)) { + offlineRoutingExists = true; + } + } else if (tableType == TableType.REALTIME) { + // Realtime table + if (_routingManager.routingExists(tableName)) { + realtimeRoutingExists = true; + } + } else { + // Hybrid table (check both OFFLINE and REALTIME) + if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) { + offlineRoutingExists = true; + } + if (_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName))) { + realtimeRoutingExists = true; + } + } + + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + TableConfig offlineTableConfig = + _tableCache.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)); + TableConfig realtimeTableConfig = + _tableCache.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)); + + TableRoutingContext tableRoutingContext = new TableRoutingContext(); + if (!offlineRoutingExists && !realtimeRoutingExists) { + // No table matches the request + if (realtimeTableConfig == null && offlineTableConfig == null) { + LOGGER.info("Table not found for request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE); + tableRoutingContext._brokerResponse = BrokerResponseNative.TABLE_DOES_NOT_EXIST; + return tableRoutingContext; + } + LOGGER.info("No table matches for request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1); + tableRoutingContext._brokerResponse = BrokerResponseNative.NO_TABLE_RESULT; + return tableRoutingContext; + } + + if (offlineRoutingExists) { + tableRoutingContext._offlineTableConfig = offlineTableConfig; + } + + if (realtimeRoutingExists) { + tableRoutingContext._realtimeTableConfig = realtimeTableConfig; + } + + if (tableRoutingContext.canRouteHybrid()) { + TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableConfig.getTableName()); + if (timeBoundaryInfo == null) { + LOGGER.debug("No time boundary info found for hybrid table: {}", rawTableName); + } + + tableRoutingContext._timeBoundaryInfo = timeBoundaryInfo; + } + + return tableRoutingContext; + } + private Integer updateRoutingTable(long requestId, BrokerRequest brokerRequest, Map> routingTableResult, List unavailableSegments) { // NOTE: Routing table might be null if table is just removed @@ -871,12 +923,13 @@ private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, R return brokerResponse; } - private void handleTimestampIndexOverride(PinotQuery pinotQuery, @Nullable TableConfig tableConfig) { + private static void handleTimestampIndexOverride(PinotQuery pinotQuery, TableCache tableCache, + @Nullable TableConfig tableConfig) { if (tableConfig == null || tableConfig.getFieldConfigList() == null) { return; } - Set timestampIndexColumns = _tableCache.getTimestampIndexColumns(tableConfig.getTableName()); + Set timestampIndexColumns = tableCache.getTimestampIndexColumns(tableConfig.getTableName()); if (CollectionUtils.isEmpty(timestampIndexColumns)) { return; } @@ -897,7 +950,7 @@ private void handleTimestampIndexOverride(PinotQuery pinotQuery, @Nullable Table } } - private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expression, + private static void setTimestampIndexExpressionOverrideHints(@Nullable Expression expression, Set timestampIndexColumns, PinotQuery pinotQuery) { if (expression == null || expression.getFunctionCall() == null) { return; @@ -907,7 +960,7 @@ private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expre case "datetrunc": String granularString = function.getOperands().get(0).getLiteral().getStringValue().toUpperCase(); Expression timeExpression = function.getOperands().get(1); - if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS" .equalsIgnoreCase( + if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS".equalsIgnoreCase( function.getOperands().get(2).getLiteral().getStringValue()))) && TimestampIndexUtils.isValidGranularity( granularString) && timeExpression.getIdentifier() != null) { String timeColumn = timeExpression.getIdentifier().getName(); @@ -926,17 +979,17 @@ private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expre } /** Given a {@link PinotQuery}, check if the WHERE clause will always evaluate to false. */ - private boolean isFilterAlwaysFalse(PinotQuery pinotQuery) { + private static boolean isFilterAlwaysFalse(PinotQuery pinotQuery) { return FALSE.equals(pinotQuery.getFilterExpression()); } /** Given a {@link PinotQuery}, check if the WHERE clause will always evaluate to true. */ - private boolean isFilterAlwaysTrue(PinotQuery pinotQuery) { + private static boolean isFilterAlwaysTrue(PinotQuery pinotQuery) { return TRUE.equals(pinotQuery.getFilterExpression()); } - private String getServerTenant(String tableNameWithType) { - TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + private static String getServerTenant(String tableNameWithType, TableCache tableCache) { + TableConfig tableConfig = tableCache.getTableConfig(tableNameWithType); if (tableConfig == null) { LOGGER.debug("Table config is not available for table {}", tableNameWithType); return "unknownTenant"; @@ -1084,15 +1137,6 @@ private static boolean isMultiValueColumn(Schema tableSchema, String columnName) return dimensionFieldSpec != null && !dimensionFieldSpec.isSingleValueField(); } - /** - * Sets the table name in the given broker request. - * NOTE: Set table name in broker request because it is used for access control, query routing etc. - */ - private void setTableName(BrokerRequest brokerRequest, String tableName) { - brokerRequest.getQuerySource().setTableName(tableName); - brokerRequest.getPinotQuery().getDataSource().setTableName(tableName); - } - /** * Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set for the given query. */ @@ -1650,7 +1694,7 @@ private static void fixColumnName(String rawTableName, Expression expression, Ma @VisibleForTesting static String getActualColumnName(String rawTableName, String columnName, @Nullable Map columnNameMap, boolean ignoreCase) { - if ("*" .equals(columnName)) { + if ("*".equals(columnName)) { return columnName; } String columnNameToCheck = trimTableName(rawTableName, columnName, ignoreCase); @@ -1838,8 +1882,12 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) { /** * Helper method to attach the time boundary to the given PinotQuery. */ - private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo timeBoundaryInfo, + private static void attachTimeBoundary(PinotQuery pinotQuery, @Nullable TimeBoundaryInfo timeBoundaryInfo, boolean isOfflineRequest) { + if (timeBoundaryInfo == null) { + return; + } + String functionName = isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() : FilterKind.GREATER_THAN.name(); String timeColumn = timeBoundaryInfo.getTimeColumn(); String timeValue = timeBoundaryInfo.getTimeValue(); @@ -1861,9 +1909,8 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t * TODO: Directly take PinotQuery */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, - Map> queryRoutingTable, long timeoutMs, - ServerStats serverStats, RequestContext requestContext) + BrokerRequest serverBrokerRequest, Map> queryRoutingTable, + long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception; private String getGlobalQueryId(long requestId) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index f2f65c91e80..683bb5916db 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -671,7 +671,11 @@ private String getExternalViewPath(String tableNameWithType) { */ @Nullable @Override - public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) { + public TimeBoundaryInfo getTimeBoundaryInfo(@Nullable String offlineTableName) { + if (offlineTableName == null) { + return null; + } + RoutingEntry routingEntry = _routingEntryMap.get(offlineTableName); if (routingEntry == null) { return null; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index 7a4445df016..31735bec360 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -163,6 +163,7 @@ public void testCancelQuery() when(tableCache.getActualTableName(anyString())).thenReturn(tableName); TenantConfig tenant = new TenantConfig("tier_BROKER", "tier_SERVER", null); when(tableCfg.getTenantConfig()).thenReturn(tenant); + when(tableCfg.getTableName()).thenReturn(tableName); when(tableCache.getTableConfig(tableName)).thenReturn(tableCfg); BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class); when(routingManager.routingExists(tableName)).thenReturn(true);