
Avoid unnecessary rows conversion in aggregation #11607

Merged: 1 commit, Sep 18, 2023

Conversation

@Jackie-Jiang (Contributor) commented Sep 17, 2023

Currently, multi-stage aggregation/group-by requires both materialized rows and a data block. This means we always need to do a conversion, whether the transferable block is produced locally (with materialized rows) or received remotely (with a data block).

This PR:

  • Enhanced RowBasedBlockValSet to support null
  • Added FilteredRowBasedBlockValSet to support filtered aggregation from materialized rows
  • Enhanced DataBlockExtractUtils (originally DataBlockUtils):
    • Correctly handle null with a filter
    • Avoid per-row extraction to reduce the overhead
  • Enhanced AggregateOperator, MultistageAggregationExecutor, MultistageGroupByExecutor:
    • Use only materialized rows or the data block and avoid unnecessary row conversion
    • Avoid per-row column index lookup
    • Avoid extracting filtered rows multiple times when they can be shared
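The core idea of the last bullet group — aggregate over whichever representation the block already carries instead of always converting to rows — can be sketched as follows. This is a minimal illustrative sketch with hypothetical class and method names, not the actual Pinot `TransferableBlock`/`DataBlock` API:

```java
import java.util.List;

// Illustrative sketch only: a block that may carry either materialized rows
// or columnar data, and an aggregator that consumes whichever representation
// is already present instead of forcing a conversion.
public class BlockAggregationSketch {

  // Hypothetical stand-in for TransferableBlock.
  static class Block {
    final List<Object[]> rows;   // non-null when produced locally
    final int[][] columnarData;  // non-null when received remotely (toy stand-in for DataBlock)

    Block(List<Object[]> rows, int[][] columnarData) {
      this.rows = rows;
      this.columnarData = columnarData;
    }
  }

  // Sum a single int column, reading from whichever representation exists.
  static long sumColumn(Block block, int colId) {
    long sum = 0;
    if (block.rows != null) {
      // Local block: aggregate directly over the materialized rows.
      for (Object[] row : block.rows) {
        sum += (Integer) row[colId];
      }
    } else {
      // Remote block: aggregate directly over the columnar data,
      // never materializing rows.
      for (int value : block.columnarData[colId]) {
        sum += value;
      }
    }
    return sum;
  }
}
```

Either branch visits the data exactly once, which is the conversion the PR avoids.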

@codecov-commenter commented Sep 17, 2023

Codecov Report

Merging #11607 (5f34fa6) into master (a8411c0) will decrease coverage by 0.14%.
The diff coverage is 31.23%.

@@             Coverage Diff              @@
##             master   #11607      +/-   ##
============================================
- Coverage     63.20%   63.06%   -0.14%     
- Complexity     1107     1145      +38     
============================================
  Files          2323     2325       +2     
  Lines        124465   124815     +350     
  Branches      18989    19136     +147     
============================================
+ Hits          78672    78719      +47     
- Misses        40204    40487     +283     
- Partials       5589     5609      +20     
Flag Coverage Δ
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 63.02% <31.23%> (-0.12%) ⬇️
java-17 62.92% <31.23%> (-0.15%) ⬇️
java-20 62.92% <31.23%> (-0.15%) ⬇️
temurin 63.06% <31.23%> (-0.14%) ⬇️
unittests 63.06% <31.23%> (-0.14%) ⬇️
unittests1 67.27% <31.23%> (-0.18%) ⬇️
unittests2 14.48% <0.00%> (-0.07%) ⬇️

Flags with carried forward coverage won't be shown.

Files Changed Coverage Δ
.../apache/pinot/common/datablock/DataBlockUtils.java 87.93% <ø> (+60.73%) ⬆️
...untime/operator/block/FilteredDataBlockValSet.java 0.00% <0.00%> (-34.79%) ⬇️
...va/org/apache/pinot/spi/utils/CommonConstants.java 20.43% <0.00%> (-0.69%) ⬇️
.../runtime/operator/block/DataBlockExtractUtils.java 14.28% <14.28%> (ø)
...e/pinot/core/query/reduce/RowBasedBlockValSet.java 20.50% <17.87%> (+4.78%) ⬆️
...core/query/reduce/FilteredRowBasedBlockValSet.java 22.30% <22.30%> (ø)
.../query/runtime/operator/block/DataBlockValSet.java 40.74% <58.33%> (-9.26%) ⬇️
.../pinot/query/runtime/blocks/TransferableBlock.java 75.43% <66.66%> (-0.93%) ⬇️
...inot/query/runtime/operator/AggregateOperator.java 83.15% <78.78%> (-11.25%) ⬇️
...ry/runtime/operator/MultistageGroupByExecutor.java 91.90% <92.13%> (-4.70%) ⬇️
... and 3 more

... and 11 files with indirect coverage changes


@walterddr (Contributor) left a comment:

lgtm mostly. have a couple of questions

@@ -1065,4 +1066,14 @@ public enum JoinOverFlowMode {
THROW, BREAK
}
}

public static class NullValuePlaceHolder {
@walterddr (Contributor):
we already have default null value in FieldSpecs do we plan to do differently here? should we consolidate?

@Jackie-Jiang (Contributor, Author):
We have that in ColumnDataType right now. Didn't change that to limit the scope of this PR
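The hunk above adds a NullValuePlaceHolder class to CommonConstants. A minimal sketch of that shared-placeholder pattern is shown below; the field names follow the convention but the exact set of constants and values in Pinot's actual class may differ:

```java
// Minimal sketch of a shared null-value placeholder holder, following the
// pattern discussed above: one canonical "stand-in" value per data type so
// null handling does not allocate fresh defaults. Values are illustrative
// and may differ from Pinot's actual NullValuePlaceHolder constants.
public final class NullValuePlaceHolderSketch {
  public static final int INT = 0;
  public static final long LONG = 0L;
  public static final float FLOAT = 0f;
  public static final double DOUBLE = 0d;
  public static final String STRING = "";
  public static final byte[] BYTES = new byte[0];

  private NullValuePlaceHolderSketch() {
    // Constants holder; never instantiated.
  }
}
```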

@@ -79,7 +79,7 @@ public void shouldHandleUpstreamErrorBlocks() {
DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
AggregateOperator operator =
new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, inSchema, calls, group,
AggType.INTERMEDIATE, null, null);
AggType.DIRECT, Collections.singletonList(-1), null);
@walterddr (Contributor):
any specific reason we change the AggType here?

@Jackie-Jiang (Contributor, Author):
INTERMEDIATE is not testing the whole flow. I want to test the aggregate as well as the merge

aggFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), true);
}

// Process the filter argument indices
int[] filterArgIds = new int[numFunctions];
int maxFilterArgId = -1;
@walterddr (Contributor):
why did we need this extra integer? isn't a null/empty filterArgIds array enough to indicate there's no filter?

@Jackie-Jiang (Contributor, Author):
I got confused initially as well, then found that it is never empty coming from the RelNode. Added the max filter arg id to help quickly identify whether there is a filter, and to create the cache.
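The bookkeeping described in this exchange — one slot per aggregation function with -1 meaning "no filter", plus a running maximum that cheaply signals whether any filter exists at all — can be sketched as below. Names are illustrative; the real code fills the ids from the RelNode:

```java
// Sketch of the filter-argument bookkeeping discussed above: filterArgIds is
// never empty (one entry per aggregation function, -1 meaning "no filter"),
// so a separate max filter arg id is kept to quickly tell whether any
// function is filtered and to size the per-filter cache.
public class FilterArgSketch {

  static int maxFilterArgId(int[] filterArgIds) {
    int max = -1;
    for (int id : filterArgIds) {
      max = Math.max(max, id);
    }
    return max;
  }

  // A single comparison answers "is anything filtered?" without
  // re-scanning the array at each call site.
  static boolean hasFilter(int[] filterArgIds) {
    return maxFilterArgId(filterArgIds) >= 0;
  }
}
```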

// value primitive type.
static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block,
DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int filterArgIdx) {
private int[] getGroupKeyIds(List<RexExpression> groupSet) {
@walterddr (Contributor):
nit: why reorder the functions? this is the original getGroupSet, right? (and if not, can't we remove the convertRexExpressionToExpressionContext func?)

@Jackie-Jiang (Contributor, Author):
It is quite different though: one retrieves the expressions, the other just the ids. I'll try to remove convertRexExpressionToExpressionContext in a separate PR because that involves quite some change to the aggregation function.
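The distinction the author draws — group-by only needs the input-ref ids, not full expression objects — can be sketched as follows. InputRef here is a toy stand-in for RexExpression.InputRef, not the actual Pinot type:

```java
import java.util.List;

// Toy sketch of the id-only path discussed above: extract just the column
// ids from the group set, skipping expression-context conversion entirely.
public class GroupKeySketch {

  // Hypothetical stand-in for RexExpression.InputRef.
  static class InputRef {
    final int _index;

    InputRef(int index) {
      _index = index;
    }
  }

  static int[] getGroupKeyIds(List<InputRef> groupSet) {
    int numKeys = groupSet.size();
    int[] groupKeyIds = new int[numKeys];
    for (int i = 0; i < numKeys; i++) {
      groupKeyIds[i] = groupSet.get(i)._index;
    }
    return groupKeyIds;
  }
}
```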

Map<ExpressionContext, BlockValSet> blockValSetMap = new HashMap<>();
for (ExpressionContext expression : expressions) {
if (expression.getType().equals(ExpressionContext.Type.IDENTIFIER) && !"__PLACEHOLDER__".equals(
@walterddr (Contributor) commented Sep 18, 2023:

i originally intended to get rid of the __PLACEHOLDER__. let's factor this into a util or leave a TODO so that it's easier to replace in the future (i see 4 __PLACEHOLDER__ usages in the new impl)

@Jackie-Jiang (Contributor, Author):

I want to do that in the following PR very soon
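The factoring the reviewer suggests — one util owning the __PLACEHOLDER__ check instead of four scattered string comparisons — could look like the sketch below. This is a hypothetical helper, not code from the actual follow-up PR:

```java
// Hypothetical utility centralizing the __PLACEHOLDER__ check suggested in
// the review, so the sentinel literal lives in one place until it can be
// removed entirely.
public final class PlaceholderUtils {
  public static final String PLACEHOLDER = "__PLACEHOLDER__";

  private PlaceholderUtils() {
  }

  // True for real identifiers, false for the placeholder sentinel.
  public static boolean isRealIdentifier(String identifier) {
    return !PLACEHOLDER.equals(identifier);
  }
}
```

With this, each of the four call sites collapses to `PlaceholderUtils.isRealIdentifier(...)`, and removing the sentinel later touches a single class.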

protected final DataBlock _dataBlock;
protected final int _index;
protected final RoaringBitmap _nullBitMap;
private final DataType _dataType;
@walterddr (Contributor) commented Sep 18, 2023:

i knew that DataBlock is a v2 concept. but other than that, any specific reason we put these in the runtime module and the RowBasedBlockValSet in the core module? (i see DataBlock is still in pinot-common; imo it might be easier to put everything together)

@Jackie-Jiang (Contributor, Author):

I would say there is no specific reason. Ideally we should move all BlockValSet together, but currently they are spread over 3 places. Can be done in a separate PR

@walterddr (Contributor) left a comment:

looked again at the details. good to go and follow up later

@Jackie-Jiang merged commit fef4e64 into apache:master on Sep 18, 2023
21 checks passed
@Jackie-Jiang deleted the avoid_ser_de_in_group_by branch on September 18, 2023 at 20:47
Labels: bugfix, enhancement, multi-stage (Related to the multi-stage query engine), performance
3 participants