Compute all groups for group by queries with only filtered aggregations (#14211)
yashmayya authored Oct 17, 2024
1 parent fe47073 commit 76b219b
Showing 8 changed files with 133 additions and 34 deletions.
@@ -234,6 +234,10 @@ public static boolean isServerReturnFinalResultKeyUnpartitioned(Map<String, Stri
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT_KEY_UNPARTITIONED));
}

public static boolean isFilteredAggregationsSkipEmptyGroups(Map<String, String> queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
}

@Nullable
public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
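A note on the new isFilteredAggregationsSkipEmptyGroups helper above: because Boolean.parseBoolean returns false for a missing or non-"true" value, the option is strictly opt-in, and queries that never set it keep the default of computing all groups. A minimal standalone sketch (plain Java, not Pinot code) illustrating that parsing behavior:

import java.util.HashMap;
import java.util.Map;

public class ParseBooleanDefaultDemo {
  public static void main(String[] args) {
    Map<String, String> queryOptions = new HashMap<>();
    // Option absent: Map.get() returns null and Boolean.parseBoolean(null) is false (no NPE).
    System.out.println(Boolean.parseBoolean(queryOptions.get("filteredAggregationsSkipEmptyGroups"))); // false
    // Only a case-insensitive "true" enables the option.
    queryOptions.put("filteredAggregationsSkipEmptyGroups", "TRUE");
    System.out.println(Boolean.parseBoolean(queryOptions.get("filteredAggregationsSkipEmptyGroups"))); // true
  }
}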
@@ -124,33 +124,24 @@ protected GroupByResultsBlock getNextBlock() {

// Perform aggregation group-by on all the blocks
DefaultGroupByExecutor groupByExecutor;
-    if (groupKeyGenerator == null) {
-      // The group key generator should be shared across all AggregationFunctions so that agg results can be
-      // aligned. Given that filtered aggregations are stored as an iterable of iterables so that all filtered aggs
-      // with the same filter can share transform blocks, rather than a singular flat iterable in the case where
-      // aggs are all non-filtered, sharing a GroupKeyGenerator across all aggs cannot be accomplished by allowing
-      // the GroupByExecutor to have sole ownership of the GroupKeyGenerator. Therefore, we allow constructing a
-      // GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across
-      // loop iterations i.e. across all aggs.
-      if (aggregationInfo.isUseStarTree()) {
-        groupByExecutor =
-            new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator);
-      } else {
-        groupByExecutor =
-            new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator);
-      }
-      groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
-    } else {
-      if (aggregationInfo.isUseStarTree()) {
-        groupByExecutor =
-            new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
-                groupKeyGenerator);
-      } else {
-        groupByExecutor =
-            new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
-                groupKeyGenerator);
-      }
-    }
+    if (aggregationInfo.isUseStarTree()) {
+      groupByExecutor =
+          new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
+              groupKeyGenerator);
+    } else {
+      groupByExecutor =
+          new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
+              groupKeyGenerator);
+    }
+    // The group key generator should be shared across all AggregationFunctions so that agg results can be
+    // aligned. Given that filtered aggregations are stored as an iterable of iterables so that all filtered aggs
+    // with the same filter can share transform blocks, rather than a singular flat iterable in the case where
+    // aggs are all non-filtered, sharing a GroupKeyGenerator across all aggs cannot be accomplished by allowing
+    // the GroupByExecutor to have sole ownership of the GroupKeyGenerator. Therefore, we allow constructing a
+    // GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across
+    // loop iterations i.e. across all aggs.
+    groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();

int numDocsScanned = 0;
ValueBlock valueBlock;
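The comment in the hunk above describes why one GroupKeyGenerator is shared across the per-filter group-by executors: group ids must line up so that the per-aggregation results can be merged into a single result block. A simplified, self-contained sketch of that sharing pattern, using hypothetical toy classes rather than Pinot's actual DefaultGroupByExecutor/StarTreeGroupByExecutor/GroupKeyGenerator APIs:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, not Pinot classes: they only model the idea that a single
// key generator assigns stable group ids, so results from separate executors align.
class ToyGroupKeyGenerator {
  private final Map<String, Integer> _groupIds = new HashMap<>();
  private int _nextGroupId = 0;

  int getGroupId(String groupKey) {
    Integer id = _groupIds.get(groupKey);
    if (id == null) {
      id = _nextGroupId++;
      _groupIds.put(groupKey, id);
    }
    return id;
  }

  int getNumGroups() {
    return _groupIds.size();
  }
}

class ToyGroupByExecutor {
  private final ToyGroupKeyGenerator _groupKeyGenerator;
  private final Map<Integer, Long> _countPerGroupId = new HashMap<>();

  // As in the change above, the executor accepts a pre-existing key generator so that
  // several executors (one per aggregation filter) share the same group-id space.
  ToyGroupByExecutor(ToyGroupKeyGenerator groupKeyGenerator) {
    _groupKeyGenerator = groupKeyGenerator;
  }

  void process(List<String> groupKeys) {
    for (String groupKey : groupKeys) {
      _countPerGroupId.merge(_groupKeyGenerator.getGroupId(groupKey), 1L, Long::sum);
    }
  }

  Map<Integer, Long> getCountPerGroupId() {
    return _countPerGroupId;
  }
}

public class SharedGroupKeyGeneratorDemo {
  public static void main(String[] args) {
    ToyGroupKeyGenerator sharedGenerator = new ToyGroupKeyGenerator();
    // Two "filtered aggregations" see different subsets of rows but share group ids.
    ToyGroupByExecutor filteredAgg1 = new ToyGroupByExecutor(sharedGenerator);
    ToyGroupByExecutor filteredAgg2 = new ToyGroupByExecutor(sharedGenerator);
    filteredAgg1.process(List.of("a", "b", "a"));
    filteredAgg2.process(List.of("b", "c"));
    System.out.println(filteredAgg1.getCountPerGroupId()); // e.g. {0=2, 1=1} for groups "a" and "b"
    System.out.println(filteredAgg2.getCountPerGroupId()); // e.g. {1=1, 2=1}; "b" reuses id 1, so results align
    System.out.println(sharedGenerator.getNumGroups());    // 3
  }
}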
@@ -39,6 +39,7 @@
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.operator.BaseProjectOperator;
@@ -384,7 +385,14 @@ public static List<AggregationInfo> buildFilteredAggregationInfos(SegmentContext
}
}

-    if (!nonFilteredFunctions.isEmpty()) {
+    if (!nonFilteredFunctions.isEmpty() || ((queryContext.getGroupByExpressions() != null)
+        && !QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(queryContext.getQueryOptions()))) {
+      // If there are no non-filtered aggregation functions for a group by query, we still add a new AggregationInfo
+      // with an empty AggregationFunction array and the main query filter so that the GroupByExecutor will compute all
+      // the groups (from the result of applying the main query filter) but no unnecessary additional aggregation will
+      // be done since the AggregationFunction array is empty. However, if the query option to skip empty groups is
+      // enabled, we don't do this in order to avoid unnecessary computation of empty groups (which can be very
+      // expensive if the main filter has high selectivity).
AggregationFunction[] aggregationFunctions = nonFilteredFunctions.toArray(new AggregationFunction[0]);
aggregationInfos.add(
buildAggregationInfo(segmentContext, queryContext, aggregationFunctions, mainFilter, mainFilterOperator,
@@ -41,6 +41,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -394,9 +395,9 @@ public void testGroupBy() {

@Test
public void testGroupByMultipleColumns() {
-    String filterQuery =
-        "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, STRING_COL "
-            + "ORDER BY BOOLEAN_COL, STRING_COL";
+    String filterQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+        + "=true; SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, "
+        + "STRING_COL ORDER BY BOOLEAN_COL, STRING_COL";
String nonFilterQuery =
"SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 GROUP BY BOOLEAN_COL, STRING_COL "
+ "ORDER BY BOOLEAN_COL, STRING_COL";
@@ -140,11 +140,6 @@ protected void setupTenants()
throws IOException {
}

-  // @Override
-  // protected boolean useMultiStageQueryEngine() {
-  //   return true;
-  // }

@BeforeMethod
@Override
public void resetMultiStage() {
@@ -1043,6 +1038,46 @@ public void testMVNumericCastInFilter() throws Exception {
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asInt(), 15482);
}

@Test
public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault()
throws Exception {
// Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the
// MultistageGroupByExecutor
String sqlQuery = "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
+ "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
+ "AirlineID";
JsonNode result = postQuery(sqlQuery);
assertNoError(result);
// Ensure that result set is not empty
assertTrue(result.get("numRowsResultSet").asInt() > 0);

// Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows)
JsonNode rows = result.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
assertEquals(rows.get(i).get(1).asInt(), 0);
// Ensure that the main filter was applied
assertTrue(rows.get(i).get(0).asInt() > 20000);
}
}

@Test
public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption()
throws Exception {
// Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the
// MultistageGroupByExecutor
String sqlQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+ "=true; SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
+ "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
+ "AirlineID";

JsonNode result = postQuery(sqlQuery);
assertNoError(result);

// Result set will be empty since the aggregation filter does not match any rows, and we've set the query option to
// skip empty groups
assertEquals(result.get("numRowsResultSet").asInt(), 0);
}

@Override
protected String getTableName() {
return _tableName;
@@ -3722,4 +3722,44 @@ public void testSkipIndexes(boolean useMultiStageQueryEngine)
updateTableConfig(tableConfig);
reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
}

@Test(dataProvider = "useBothQueryEngines")
public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);

String sqlQuery =
"SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
+ "AirlineID";

JsonNode result = postQuery(sqlQuery);
assertNoError(result);

// Ensure that result set is not empty since all groups should be computed by default
assertTrue(result.get("numRowsResultSet").asInt() > 0);

// Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows)
JsonNode rows = result.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
assertEquals(rows.get(i).get(1).asInt(), 0);
// Ensure that the main filter was applied
assertTrue(rows.get(i).get(0).asInt() > 20000);
}
}

@Test(dataProvider = "useBothQueryEngines")
public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String sqlQuery =
"SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + "=true; "
+ "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 "
+ "GROUP BY AirlineID";
JsonNode result = postQuery(sqlQuery);
assertNoError(result);

// Result set will be empty since the aggregation filter does not match any rows, and we've set the option to skip
// empty groups
assertEquals(result.get("numRowsResultSet").asInt(), 0);
}
}
@@ -58,6 +58,7 @@ public class MultistageGroupByExecutor {
private final AggType _aggType;
private final DataSchema _resultSchema;
private final int _numGroupsLimit;
private final boolean _filteredAggregationsSkipEmptyGroups;

// Group By Result holders for each mode
private final GroupByResultHolder[] _aggregateResultHolders;
@@ -79,6 +80,10 @@ public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFun
int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
_numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);

// By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via
// query option for improved performance.
_filteredAggregationsSkipEmptyGroups = QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(opChainMetadata);

int numFunctions = aggFunctions.length;
if (!aggType.isInputIntermediateFormat()) {
_aggregateResultHolders = new GroupByResultHolder[numFunctions];
@@ -241,6 +246,12 @@ private void processAggregate(TransferableBlock block) {
aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys, groupByResultHolder, blockValSetMap);
}
}
if (intKeys == null && !_filteredAggregationsSkipEmptyGroups) {
// _groupIdGenerator should still have all the groups even if there are only filtered aggregates for SQL
// compliant results. However, if the query option to skip empty groups is set, we avoid this step for
// improved performance.
generateGroupByKeys(block);
}
}
}

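A simplified, self-contained model of what the new flag changes in processAggregate (toy types and data, not Pinot's MultistageGroupByExecutor): by default every group from the incoming block is registered even when the aggregation filter matches none of its rows, so those groups surface with a count of 0; with filteredAggregationsSkipEmptyGroups=true the extra key-generation step is skipped and only groups with at least one matching row appear.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkipEmptyGroupsDemo {
  // Toy row: a group key plus whether the aggregation's FILTER clause matched this row.
  static final class Row {
    final String group;
    final boolean matchesAggFilter;

    Row(String group, boolean matchesAggFilter) {
      this.group = group;
      this.matchesAggFilter = matchesAggFilter;
    }
  }

  static Map<String, Long> filteredCountByGroup(List<Row> rowsMatchingMainFilter, boolean skipEmptyGroups) {
    Map<String, Long> counts = new LinkedHashMap<>();
    for (Row row : rowsMatchingMainFilter) {
      if (row.matchesAggFilter) {
        counts.merge(row.group, 1L, Long::sum);
      } else if (!skipEmptyGroups) {
        // Default: still register the group so it shows up with a 0 count, mirroring the extra
        // generateGroupByKeys(block) call guarded by !_filteredAggregationsSkipEmptyGroups above.
        counts.putIfAbsent(row.group, 0L);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(new Row("a", true), new Row("b", false), new Row("c", false));
    System.out.println(filteredCountByGroup(rows, false)); // {a=1, b=0, c=0}: all groups kept (SQL compliant)
    System.out.println(filteredCountByGroup(rows, true));  // {a=1}: empty groups skipped
  }
}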
@@ -463,6 +463,15 @@ public static class QueryOptionKey {
// executed in an Unbounded FCFS fashion. However, secondary workloads are executed in a constrained FCFS
// fashion with limited compute.
public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload";

// For group by queries with only filtered aggregations (and no non-filtered aggregations), the default behavior
// is to compute all groups over the rows matching the main query filter. This ensures SQL compliant results,
// since empty groups are also expected to be returned in such queries. However, this could be quite inefficient
// if the main query does not have a filter (since a scan would be required to compute all groups). In case
// users are okay with skipping empty groups - i.e., only the groups matching at least one aggregation filter
// will be returned - this query option can be set. This is useful for performance, since indexes can be used
// for the aggregation filters and a full scan can be avoided.
public static final String FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS = "filteredAggregationsSkipEmptyGroups";
}

public static class QueryOptionValue {
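Typical usage of the new option, following the pattern in the tests above. The table and column names below are placeholders; only the option key is the constant added in this commit:

import org.apache.pinot.spi.utils.CommonConstants;

public class SkipEmptyGroupsQueryExample {
  public static void main(String[] args) {
    // Prefix the query with a SET statement to enable the option for this query only.
    String query =
        "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + "=true; "
            + "SELECT dimCol, COUNT(*) FILTER (WHERE metricCol > 100) FROM myTable GROUP BY dimCol";
    System.out.println(query);
    // Without the SET prefix, the result has one row per group (all groups passing the main filter, or every
    // group when there is no main filter), including groups whose filtered count is 0; with the option set,
    // those empty groups are omitted.
  }
}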
