diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index da2ab519e92..b78bdf33551 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -278,156 +278,204 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt return true; } - @Override - protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + private static class CompiledPinotQuery { + private PinotQuery _pinotQuery; + private PinotQuery _serverPinotQuery; + private Schema _schema; + private String _tableName; + private String _rawTableName; + private BrokerResponseNative _brokerResponseNative; + } + + private CompiledPinotQuery compilePinotQuery(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, - @Nullable HttpHeaders httpHeaders, AccessControl accessControl) - throws Exception { - LOGGER.debug("SQL query for request {}: {}", requestId, query); + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) { + // Compile the request into PinotQuery + CompiledPinotQuery compiledPinotQuery = new CompiledPinotQuery(); + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + // Check if the query is a v2 supported query + String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); + if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { + compiledPinotQuery._brokerResponseNative = new BrokerResponseNative( + QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception( + "It seems that the query is only supported by the multi-stage query engine, please retry the query " + + "using " + "the multi-stage query engine " + + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"))); + return compiledPinotQuery; + } else { + compiledPinotQuery._brokerResponseNative = + new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); + return compiledPinotQuery; + } + } - //Start instrumentation context. This must not be moved further below interspersed into the code. - Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId)); + compiledPinotQuery._pinotQuery = pinotQuery; - try { - // Compile the request into PinotQuery - long compilationStartTimeNs = System.nanoTime(); - PinotQuery pinotQuery; + if (isLiteralOnlyQuery(pinotQuery)) { + LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); try { - pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); - } catch (Exception e) { - LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); - requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); - // Check if the query is a v2 supported query - String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); - if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { - return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception( - "It seems that the query is only supported by the multi-stage query engine, please retry the query using " - + "the multi-stage query engine " - + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"))); - } else { - return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); + if (pinotQuery.isExplain()) { + // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. + compiledPinotQuery._brokerResponseNative = BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + return compiledPinotQuery; } + compiledPinotQuery._brokerResponseNative = processLiteralOnlyQuery(requestId, pinotQuery, requestContext); + return compiledPinotQuery; + } catch (Exception e) { + // TODO: refine the exceptions here to early termination the queries won't requires to send to servers. + LOGGER.warn("Unable to execute literal request {}: {} at broker, fallback to server query. {}", requestId, + query, e.getMessage()); } + } - if (isLiteralOnlyQuery(pinotQuery)) { - LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); - try { - if (pinotQuery.isExplain()) { - // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. - return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; - } - return processLiteralOnlyQuery(requestId, pinotQuery, requestContext); - } catch (Exception e) { - // TODO: refine the exceptions here to early termination the queries won't requires to send to servers. - LOGGER.warn("Unable to execute literal request {}: {} at broker, fallback to server query. {}", requestId, - query, e.getMessage()); - } - } + PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); + compiledPinotQuery._serverPinotQuery = serverPinotQuery; + DataSource dataSource = serverPinotQuery.getDataSource(); + if (dataSource == null) { + LOGGER.info("Data source (FROM clause) not found in request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + compiledPinotQuery._brokerResponseNative = new BrokerResponseNative( + QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Data source (FROM clause) not found")); + return compiledPinotQuery; + } + if (dataSource.getJoin() != null) { + LOGGER.info("JOIN is not supported in request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + compiledPinotQuery._brokerResponseNative = new BrokerResponseNative( + QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "JOIN is not supported")); + return compiledPinotQuery; + } + if (dataSource.getTableName() == null) { + LOGGER.info("Table name not found in request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + compiledPinotQuery._brokerResponseNative = new BrokerResponseNative( + QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Table name not found")); + return compiledPinotQuery; + } - PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); - DataSource dataSource = serverPinotQuery.getDataSource(); - if (dataSource == null) { - LOGGER.info("Data source (FROM clause) not found in request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Data source (FROM clause) not found")); - } - if (dataSource.getJoin() != null) { - LOGGER.info("JOIN is not supported in request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "JOIN is not supported")); - } - if (dataSource.getTableName() == null) { - LOGGER.info("Table name not found in request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Table name not found")); - } + try { + handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, requestContext, httpHeaders, + accessControl); + } catch (Exception e) { + LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, + e.getMessage()); + requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); + compiledPinotQuery._brokerResponseNative = + new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + return compiledPinotQuery; + } - try { - handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, requestContext, httpHeaders, - accessControl); - } catch (Exception e) { - LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, + boolean ignoreCase = _tableCache.isIgnoreCase(); + String tableName; + try { + tableName = + getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase), + _tableCache); + } catch (DatabaseConflictException e) { + LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + compiledPinotQuery._brokerResponseNative = + new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); + return compiledPinotQuery; + } + compiledPinotQuery._tableName = tableName; + dataSource.setTableName(compiledPinotQuery._tableName); + String rawTableName = TableNameBuilder.extractRawTableName(compiledPinotQuery._tableName); + compiledPinotQuery._rawTableName = rawTableName; + requestContext.setTableName(rawTableName); + + try { + Map columnNameMap = _tableCache.getColumnNameMap(rawTableName); + if (columnNameMap != null) { + updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, columnNameMap); + } + } catch (Exception e) { + // Throw exceptions with column in-existence error. + if (e instanceof BadQueryRequestException) { + LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, query, e.getMessage()); - requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); + compiledPinotQuery._brokerResponseNative = + new BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR, e)); + return compiledPinotQuery; } + LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, query, + e.getMessage()); + } + if (_defaultHllLog2m > 0) { + handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m); + } + if (_enableQueryLimitOverride) { + handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit); + } + handleSegmentPartitionedDistinctCountOverride(serverPinotQuery, + getSegmentPartitionedColumns(_tableCache, tableName)); + if (_enableDistinctCountBitmapOverride) { + handleDistinctCountBitmapOverride(serverPinotQuery); + } - boolean ignoreCase = _tableCache.isIgnoreCase(); - String tableName; - try { - tableName = - getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase), - _tableCache); - } catch (DatabaseConflictException e) { - LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query); - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); - } - dataSource.setTableName(tableName); - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - requestContext.setTableName(rawTableName); + Schema schema = _tableCache.getSchema(rawTableName); + compiledPinotQuery._schema = schema; + if (schema != null) { + handleDistinctMultiValuedOverride(serverPinotQuery, schema); + } - try { - Map columnNameMap = _tableCache.getColumnNameMap(rawTableName); - if (columnNameMap != null) { - updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, columnNameMap); - } - } catch (Exception e) { - // Throw exceptions with column in-existence error. - if (e instanceof BadQueryRequestException) { - LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, query, - e.getMessage()); - requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); - return new BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR, e)); - } - LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, query, - e.getMessage()); - } - if (_defaultHllLog2m > 0) { - handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m); - } - if (_enableQueryLimitOverride) { - handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit); - } - handleSegmentPartitionedDistinctCountOverride(serverPinotQuery, - getSegmentPartitionedColumns(_tableCache, tableName)); - if (_enableDistinctCountBitmapOverride) { - handleDistinctCountBitmapOverride(serverPinotQuery); - } + return compiledPinotQuery; + } - Schema schema = _tableCache.getSchema(rawTableName); - if (schema != null) { - handleDistinctMultiValuedOverride(serverPinotQuery, schema); - } + @Override + protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + throws Exception { + LOGGER.debug("SQL query for request {}: {}", requestId, query); + //Start instrumentation context. This must not be moved further below interspersed into the code. + Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId)); + + try { + long compilationStartTimeNs = System.nanoTime(); + CompiledPinotQuery compiledPinotQuery = + compilePinotQuery(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, + httpHeaders, accessControl); long compilationEndTimeNs = System.nanoTime(); + + if (compiledPinotQuery._brokerResponseNative != null) { + return compiledPinotQuery._brokerResponseNative; + } + // full request compile time = compilationTimeNs + parserTimeNs - _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, + _brokerMetrics.addPhaseTiming(compiledPinotQuery._rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs()); // Second-stage table-level access control // TODO: Modify AccessControl interface to directly take PinotQuery - BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); + BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(compiledPinotQuery._pinotQuery); BrokerRequest serverBrokerRequest = - serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery); + compiledPinotQuery._serverPinotQuery == compiledPinotQuery._pinotQuery ? brokerRequest + : CalciteSqlCompiler.convertToBrokerRequest(compiledPinotQuery._serverPinotQuery); AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity, serverBrokerRequest); if (authorizationResult.hasAccess()) { - authorizationResult = accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, Actions.Table.QUERY); + authorizationResult = + accessControl.authorize(httpHeaders, TargetType.TABLE, compiledPinotQuery._tableName, Actions.Table.QUERY); } - _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION, + _brokerMetrics.addPhaseTiming(compiledPinotQuery._rawTableName, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs); if (!authorizationResult.hasAccess()) { - _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); - LOGGER.info("Access denied for request {}: {}, table: {}, reason :{}", requestId, query, tableName, - authorizationResult.getFailureMessage()); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._tableName, + BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + LOGGER.info("Access denied for request {}: {}, table: {}, reason :{}", requestId, query, + compiledPinotQuery._tableName, authorizationResult.getFailureMessage()); requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); String failureMessage = authorizationResult.getFailureMessage(); if (StringUtils.isNotBlank(failureMessage)) { @@ -437,7 +485,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } // Validate QPS quota - String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName); + String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(compiledPinotQuery._tableName); if (!_queryQuotaManager.acquireDatabase(database)) { String errorMessage = String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); @@ -445,30 +493,31 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } - if (!_queryQuotaManager.acquire(tableName)) { - String errorMessage = - String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); + if (!_queryQuotaManager.acquire(compiledPinotQuery._tableName)) { + String errorMessage = String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, + compiledPinotQuery._tableName); LOGGER.info(errorMessage); requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } // Validate the request try { - validateRequest(serverPinotQuery, _queryResponseLimit); + validateRequest(compiledPinotQuery._serverPinotQuery, _queryResponseLimit); } catch (Exception e) { LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, + 1); return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); } - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, BrokerMeter.QUERIES, 1); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1); - _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length()); + _brokerMetrics.addValueToTableGauge(compiledPinotQuery._rawTableName, BrokerGauge.REQUEST_SIZE, query.length()); - if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) { + if (!compiledPinotQuery._pinotQuery.isExplain() && _enableMultistageMigrationMetric) { // Check if the query is a v2 supported query database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); // Attempt to add the query to the compile queue; drop if queue is full @@ -479,8 +528,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } // Get the tables hit by the request - TableRoutingContext tableRoutingContext = getTablesHitByRequest(requestId, requestContext, tableName, query); + TableRoutingContext tableRoutingContext = getTablesHitByRequest(requestId, compiledPinotQuery._tableName, query); if (tableRoutingContext._exception != null) { + requestContext.setErrorCode(tableRoutingContext._exception.getErrorCode()); return new BrokerResponseNative(tableRoutingContext._exception); } @@ -488,21 +538,24 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO HandlerContext handlerContext = getHandlerContext(tableRoutingContext._offlineTableConfig, tableRoutingContext._realtimeTableConfig); if (handlerContext._disableGroovy) { - rejectGroovyQuery(serverPinotQuery); + rejectGroovyQuery(compiledPinotQuery._serverPinotQuery); } if (handlerContext._useApproximateFunction) { - handleApproximateFunctionOverride(serverPinotQuery); + handleApproximateFunctionOverride(compiledPinotQuery._serverPinotQuery); } // Prepare OFFLINE and REALTIME requests BrokerRequest offlineBrokerRequest = - tableRoutingContext.getOfflineBrokerRequest(serverPinotQuery, schema, _tableCache, _queryOptimizer); + tableRoutingContext.getOfflineBrokerRequest(compiledPinotQuery._serverPinotQuery, compiledPinotQuery._schema, + _tableCache, _queryOptimizer); BrokerRequest realtimeBrokerRequest = - tableRoutingContext.getRealtimeBrokerRequest(serverPinotQuery, schema, _tableCache, _queryOptimizer); + tableRoutingContext.getRealtimeBrokerRequest(compiledPinotQuery._serverPinotQuery, compiledPinotQuery._schema, + _tableCache, _queryOptimizer); tableRoutingContext.setFanoutTypeAndTenants(requestContext, _tableCache); if (offlineBrokerRequest == null && realtimeBrokerRequest == null) { - return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity); + return getEmptyBrokerOnlyResponse(compiledPinotQuery._pinotQuery, requestContext, compiledPinotQuery._tableName, + requesterIdentity); } // Calculate routing table for the query @@ -523,7 +576,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables - if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) { + if (realtimeBrokerRequest != null && (!compiledPinotQuery._pinotQuery.isExplain() + || offlineBrokerRequest != null)) { // Don't send explain queries to realtime for OFFLINE or HYBRID tables Integer numPrunedSegments = updateRoutingTable(requestId, realtimeBrokerRequest, queryRoutingTable, unavailableSegments); @@ -553,21 +607,24 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO offlineBrokerRequest != null ? getRoutingPolicy(tableRoutingContext._offlineTableConfig) : null; errorMessage = addRoutingPolicyInErrMsg(errorMessage, realtimeRoutingPolicy, offlineRoutingPolicy); exceptions.add(QueryException.getException(QueryException.BROKER_SEGMENT_UNAVAILABLE_ERROR, errorMessage)); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, + BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1); } if (offlineBrokerRequest == null && realtimeBrokerRequest == null) { if (!exceptions.isEmpty()) { LOGGER.info("No server found for request {}: {}", requestId, query); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, + 1); return new BrokerResponseNative(exceptions); } else { // When all segments have been pruned, we can just return an empty response. - return getEmptyBrokerOnlyResponse(pinotQuery, requestContext, tableName, requesterIdentity); + return getEmptyBrokerOnlyResponse(compiledPinotQuery._pinotQuery, requestContext, + compiledPinotQuery._tableName, requesterIdentity); } } long routingEndTimeNs = System.nanoTime(); - _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_ROUTING, + _brokerMetrics.addPhaseTiming(compiledPinotQuery._rawTableName, BrokerQueryPhase.QUERY_ROUTING, routingEndTimeNs - routingStartTimeNs); // Set timeout in the requests @@ -591,7 +648,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } catch (TimeoutException e) { String errorMessage = e.getMessage(); LOGGER.info("{} {}: {}", errorMessage, requestId, query); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS, 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, + BrokerMeter.REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS, 1); exceptions.add(QueryException.getException(QueryException.BROKER_TIMEOUT_ERROR, errorMessage)); return new BrokerResponseNative(exceptions); } @@ -667,28 +725,28 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } brokerResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal); long executionEndTimeNs = System.nanoTime(); - _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.QUERY_EXECUTION, + _brokerMetrics.addPhaseTiming(compiledPinotQuery._rawTableName, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - routingEndTimeNs); // Track number of queries with number of groups limit reached if (brokerResponse.isNumGroupsLimitReached()) { - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED, - 1); + _brokerMetrics.addMeteredTableValue(compiledPinotQuery._rawTableName, + BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED, 1); } // Set total query processing time long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); brokerResponse.setTimeUsedMs(totalTimeMs); augmentStatistics(requestContext, brokerResponse); - if (QueryOptionsUtils.shouldDropResults(pinotQuery.getQueryOptions())) { + if (QueryOptionsUtils.shouldDropResults(compiledPinotQuery._pinotQuery.getQueryOptions())) { brokerResponse.setResultTable(null); } - _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs, + _brokerMetrics.addTimedTableValue(compiledPinotQuery._rawTableName, BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS); // Log query and stats - _queryLogger.log( - new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, requesterIdentity, serverStats)); + _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, compiledPinotQuery._tableName, brokerResponse, + requesterIdentity, serverStats)); return brokerResponse; } finally { @@ -787,8 +845,7 @@ public void setFanoutTypeAndTenants(RequestContext requestContext, TableCache ta } } - private TableRoutingContext getTablesHitByRequest(long requestId, RequestContext requestContext, String tableName, - String query) { + private TableRoutingContext getTablesHitByRequest(long requestId, String tableName, String query) { boolean realtimeRoutingExists = false; boolean offlineRoutingExists = false; @@ -826,13 +883,11 @@ private TableRoutingContext getTablesHitByRequest(long requestId, RequestContext if (realtimeTableConfig == null && offlineTableConfig == null) { LOGGER.info("Table not found for request {}: {}", requestId, query); tableRoutingContext._exception = QueryException.TABLE_DOES_NOT_EXIST_ERROR; - requestContext.setErrorCode(tableRoutingContext._exception.getErrorCode()); return tableRoutingContext; } LOGGER.info("No table matches for request {}: {}", requestId, query); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1); tableRoutingContext._exception = QueryException.BROKER_RESOURCE_MISSING_ERROR; - requestContext.setErrorCode(tableRoutingContext._exception.getErrorCode()); return tableRoutingContext; } @@ -1045,8 +1100,7 @@ private void handleSubquery(Expression expression, long requestId, JsonNode json // Add null handling option from broker config only if there is no override in the query if (_enableNullHandling != null) { - sqlNodeAndOptions.getOptions() - .putIfAbsent(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, _enableNullHandling); + sqlNodeAndOptions.getOptions().putIfAbsent(QueryOptionKey.ENABLE_NULL_HANDLING, _enableNullHandling); } BrokerResponse response =