diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 12ad735865c..0ec3374aeaa 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -234,6 +234,10 @@ public static boolean isServerReturnFinalResultKeyUnpartitioned(Map queryOptions) { + return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS)); + } + @Nullable public static String getOrderByAlgorithm(Map queryOptions) { return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index 97ce0e4f6bb..5c82827cb4d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -124,33 +124,24 @@ protected GroupByResultsBlock getNextBlock() { // Perform aggregation group-by on all the blocks DefaultGroupByExecutor groupByExecutor; - if (groupKeyGenerator == null) { - // The group key generator should be shared across all AggregationFunctions so that agg results can be - // aligned. Given that filtered aggregations are stored as an iterable of iterables so that all filtered aggs - // with the same filter can share transform blocks, rather than a singular flat iterable in the case where - // aggs are all non-filtered, sharing a GroupKeyGenerator across all aggs cannot be accomplished by allowing - // the GroupByExecutor to have sole ownership of the GroupKeyGenerator. 
Therefore, we allow constructing a - // GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across - // loop iterations i.e. across all aggs. - if (aggregationInfo.isUseStarTree()) { - groupByExecutor = - new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator); - } else { - groupByExecutor = - new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator); - } - groupKeyGenerator = groupByExecutor.getGroupKeyGenerator(); + + if (aggregationInfo.isUseStarTree()) { + groupByExecutor = + new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator, + groupKeyGenerator); } else { - if (aggregationInfo.isUseStarTree()) { - groupByExecutor = - new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator, - groupKeyGenerator); - } else { - groupByExecutor = - new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator, - groupKeyGenerator); - } + groupByExecutor = + new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator, + groupKeyGenerator); } + // The group key generator should be shared across all AggregationFunctions so that agg results can be + // aligned. Given that filtered aggregations are stored as an iterable of iterables so that all filtered aggs + // with the same filter can share transform blocks, rather than a singular flat iterable in the case where + // aggs are all non-filtered, sharing a GroupKeyGenerator across all aggs cannot be accomplished by allowing + // the GroupByExecutor to have sole ownership of the GroupKeyGenerator. Therefore, we allow constructing a + // GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across + // loop iterations i.e. across all aggs. 
+ groupKeyGenerator = groupByExecutor.getGroupKeyGenerator(); int numDocsScanned = 0; ValueBlock valueBlock; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java index 95020d0a34e..962fc742598 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java @@ -39,6 +39,7 @@ import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.predicate.Predicate; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.operator.BaseProjectOperator; @@ -384,7 +385,14 @@ public static List buildFilteredAggregationInfos(SegmentContext } } - if (!nonFilteredFunctions.isEmpty()) { + if (!nonFilteredFunctions.isEmpty() || ((queryContext.getGroupByExpressions() != null) + && !QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(queryContext.getQueryOptions()))) { + // If there are no non-filtered aggregation functions for a group by query, we still add a new AggregationInfo + // with an empty AggregationFunction array and the main query filter so that the GroupByExecutor will compute all + // the groups (from the result of applying the main query filter) but no unnecessary additional aggregation will + // be done since the AggregationFunction array is empty. However, if the query option to skip empty groups is + // enabled, we don't do this in order to avoid unnecessary computation of empty groups (which can be very + // expensive if the main filter matches a large fraction of the rows).
AggregationFunction[] aggregationFunctions = nonFilteredFunctions.toArray(new AggregationFunction[0]); aggregationInfos.add( buildAggregationInfo(segmentContext, queryContext, aggregationFunctions, mainFilter, mainFilterOperator, diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java index 02a9a05395f..e253dce452f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java @@ -41,6 +41,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -394,9 +395,9 @@ public void testGroupBy() { @Test public void testGroupByMultipleColumns() { - String filterQuery = - "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, STRING_COL " - + "ORDER BY BOOLEAN_COL, STRING_COL"; + String filterQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + + "=true; SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, " + + "STRING_COL ORDER BY BOOLEAN_COL, STRING_COL"; String nonFilterQuery = "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 GROUP BY BOOLEAN_COL, STRING_COL " + "ORDER BY BOOLEAN_COL, STRING_COL"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 48cd23ea7d9..b450612d45e 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -140,11 +140,6 @@ protected void setupTenants() throws IOException { } -// @Override -// protected boolean useMultiStageQueryEngine() { -// return true; -// } - @BeforeMethod @Override public void resetMultiStage() { @@ -1043,6 +1038,46 @@ public void testMVNumericCastInFilter() throws Exception { assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asInt(), 15482); } + @Test + public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault() + throws Exception { + // Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the + // MultistageGroupByExecutor + String sqlQuery = "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */" + + "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY " + + "AirlineID"; + JsonNode result = postQuery(sqlQuery); + assertNoError(result); + // Ensure that result set is not empty + assertTrue(result.get("numRowsResultSet").asInt() > 0); + + // Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows) + JsonNode rows = result.get("resultTable").get("rows"); + for (int i = 0; i < rows.size(); i++) { + assertEquals(rows.get(i).get(1).asInt(), 0); + // Ensure that the main filter was applied + assertTrue(rows.get(i).get(0).asInt() > 20000); + } + } + + @Test + public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption() + throws Exception { + // Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the + // MultistageGroupByExecutor + String sqlQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + + "=true; SELECT /*+ 
aggOptions(is_skip_leaf_stage_group_by='true') */" + + "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY " + + "AirlineID"; + + JsonNode result = postQuery(sqlQuery); + assertNoError(result); + + // Result set will be empty since the aggregation filter does not match any rows, and we've set the query option to + // skip empty groups + assertEquals(result.get("numRowsResultSet").asInt(), 0); + } + @Override protected String getTableName() { return _tableName; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 25a75352f72..0fa4868179c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -3722,4 +3722,44 @@ public void testSkipIndexes(boolean useMultiStageQueryEngine) updateTableConfig(tableConfig); reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs); } + + @Test(dataProvider = "useBothQueryEngines") + public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + + String sqlQuery = + "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY " + + "AirlineID"; + + JsonNode result = postQuery(sqlQuery); + assertNoError(result); + + // Ensure that result set is not empty since all groups should be computed by default + assertTrue(result.get("numRowsResultSet").asInt() > 0); + + // Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows) + JsonNode rows = result.get("resultTable").get("rows"); + for (int i = 0; i < 
rows.size(); i++) { + assertEquals(rows.get(i).get(1).asInt(), 0); + // Ensure that the main filter was applied + assertTrue(rows.get(i).get(0).asInt() > 20000); + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String sqlQuery = + "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + "=true; " + + "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 " + + "GROUP BY AirlineID"; + JsonNode result = postQuery(sqlQuery); + assertNoError(result); + + // Result set will be empty since the aggregation filter does not match any rows, and we've set the option to skip + // empty groups + assertEquals(result.get("numRowsResultSet").asInt(), 0); + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java index 1f3f3a20fc3..41501f69383 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java @@ -58,6 +58,7 @@ public class MultistageGroupByExecutor { private final AggType _aggType; private final DataSchema _resultSchema; private final int _numGroupsLimit; + private final boolean _filteredAggregationsSkipEmptyGroups; // Group By Result holders for each mode private final GroupByResultHolder[] _aggregateResultHolders; @@ -79,6 +80,10 @@ public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFun int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint); _numGroupsLimit = 
getNumGroupsLimit(opChainMetadata, nodeHint); + // By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via + // query option for improved performance. + _filteredAggregationsSkipEmptyGroups = QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(opChainMetadata); + int numFunctions = aggFunctions.length; if (!aggType.isInputIntermediateFormat()) { _aggregateResultHolders = new GroupByResultHolder[numFunctions]; @@ -241,6 +246,12 @@ private void processAggregate(TransferableBlock block) { aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys, groupByResultHolder, blockValSetMap); } } + if (intKeys == null && !_filteredAggregationsSkipEmptyGroups) { + // _groupIdGenerator should still have all the groups even if there are only filtered aggregates for SQL + // compliant results. However, if the query option to skip empty groups is set, we avoid this step for + // improved performance. + generateGroupByKeys(block); + } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 8ccc9dbad0b..ff81f6bc4ea 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -463,6 +463,15 @@ public static class QueryOptionKey { // executed in an Unbounded FCFS fashion. However, secondary workloads are executed in a constrainted FCFS // fashion with limited compute. public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload"; + + // For group by queries with only filtered aggregations (and no non-filtered aggregations), the default behavior + // is to compute all groups over the rows matching the main query filter. This ensures SQL compliant results, + // since empty groups are also expected to be returned in such queries. 
However, this could be quite inefficient + // if the main query does not have a filter (since a scan would be required to compute all groups). In case + // users are okay with skipping empty groups - i.e., only the groups matching at least one aggregation filter + // will be returned - this query option can be set. This is useful for performance, since indexes can be used + // for the aggregation filters and a full scan can be avoided. + public static final String FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS = "filteredAggregationsSkipEmptyGroups"; } public static class QueryOptionValue {