Avoid unnecessary rows conversion in aggregation #11607
Conversation
Codecov Report

@@             Coverage Diff              @@
##             master   #11607      +/-   ##
============================================
- Coverage     63.20%   63.06%    -0.14%
- Complexity     1107     1145       +38
============================================
  Files          2323     2325        +2
  Lines        124465   124815      +350
  Branches      18989    19136      +147
============================================
+ Hits          78672    78719       +47
- Misses        40204    40487      +283
- Partials       5589     5609       +20

... and 11 files with indirect coverage changes
LGTM mostly. I have a couple of questions.
@@ -1065,4 +1066,14 @@ public enum JoinOverFlowMode {
     THROW, BREAK
   }
 }

 public static class NullValuePlaceHolder {
We already have a default null value in FieldSpecs. Do we plan to do it differently here? Should we consolidate?
We have that in ColumnDataType right now. I didn't change that, to limit the scope of this PR.
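The diff above introduces a NullValuePlaceHolder class. A minimal, hypothetical sketch of the idea, assuming one shared default value per primitive type that stands in for null so downstream primitive reads never hit an NPE (names and values here are illustrative, not the actual Pinot code):

```java
// Hypothetical sketch of a NullValuePlaceHolder-style holder: one shared
// default value per type, substituted when a cell is null. Illustrative only.
public class NullValuePlaceHolderSketch {
    public static final int INT = 0;
    public static final long LONG = 0L;
    public static final double DOUBLE = 0d;
    public static final String STRING = "";

    // Substitute the placeholder when the stored value is null.
    static Object orPlaceholder(Object value, Object placeholder) {
        return value == null ? placeholder : value;
    }

    public static void main(String[] args) {
        assert orPlaceholder(null, STRING).equals("");
        assert orPlaceholder(42, INT).equals(42);
        System.out.println("ok");
    }
}
```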
@@ -79,7 +79,7 @@ public void shouldHandleUpstreamErrorBlocks() {
     DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
     AggregateOperator operator =
         new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, inSchema, calls, group,
-            AggType.INTERMEDIATE, null, null);
+            AggType.DIRECT, Collections.singletonList(-1), null);
Any specific reason we changed the AggType here?
INTERMEDIATE does not test the whole flow. I want to test the aggregate as well as the merge.
      aggFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), true);
    }

    // Process the filter argument indices
    int[] filterArgIds = new int[numFunctions];
    int maxFilterArgId = -1;
Why did we need this extra integer? Doesn't a null/empty filterArgIds array indicate there's no filter?
I got confused initially as well, then found that it is never empty coming from the RelNode. Added the max filter arg id to help quickly identify whether there is a filter, and to create the cache.
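The convention described above can be sketched as follows. This is a hypothetical illustration, not the actual Pinot implementation: each entry of filterArgIds is -1 when that aggregation call has no filter, so a single max over the array cheaply tells whether any filter exists at all.

```java
// Hypothetical sketch: summarize per-call filter indices with one integer.
// A value of -1 per the convention above means "no filter for that call".
public class FilterArgScan {
    // Returns the largest filter argument index, or -1 when no call is filtered.
    static int maxFilterArgId(int[] filterArgIds) {
        int max = -1;
        for (int id : filterArgIds) {
            if (id > max) {
                max = id;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // All -1: the array is non-empty, yet there is no filter to process.
        assert maxFilterArgId(new int[]{-1, -1, -1}) == -1;
        // Any non-negative entry: at least one filtered aggregation call.
        assert maxFilterArgId(new int[]{-1, 3, 1}) == 3;
        System.out.println("ok");
    }
}
```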
  // value primitive type.
  static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block,
      DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int filterArgIdx) {

  private int[] getGroupKeyIds(List<RexExpression> groupSet) {
nit: why the reordering of functions? This is the original getGroupSet, right? (And if not, can't we remove the convertRexExpressionToExpressionContext func?)
It is quite different though: one retrieves the expressions, the other just the ids. I'll try to remove convertRexExpressionToExpressionContext in a separate PR, because that involves quite some change to the aggregation functions.
    Map<ExpressionContext, BlockValSet> blockValSetMap = new HashMap<>();
    for (ExpressionContext expression : expressions) {
      if (expression.getType().equals(ExpressionContext.Type.IDENTIFIER) && !"__PLACEHOLDER__".equals(
I originally intended to get rid of the __PLACEHOLDER__. Let's factor this into a util or leave a TODO so that it's easier to replace in the future (I see 4 __PLACEHOLDER__ usages in the new impl).
I want to do that in the following PR very soon
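The suggested consolidation could look roughly like the following hypothetical util, which centralizes the "__PLACEHOLDER__" literal in one place so a future replacement touches a single class. Names are illustrative, not the actual Pinot API:

```java
// Hypothetical sketch of the util suggested above: keep the placeholder
// literal in exactly one place instead of 4 scattered usages.
public class PlaceholderUtil {
    static final String PLACEHOLDER = "__PLACEHOLDER__";

    // True when the identifier is a real column reference, not the placeholder.
    static boolean isRealIdentifier(String identifier) {
        return !PLACEHOLDER.equals(identifier);
    }

    public static void main(String[] args) {
        assert isRealIdentifier("myColumn");
        assert !isRealIdentifier("__PLACEHOLDER__");
        System.out.println("ok");
    }
}
```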
  protected final DataBlock _dataBlock;
  protected final int _index;
  protected final RoaringBitmap _nullBitMap;
  private final DataType _dataType;
I knew that DataBlock is a v2 concept, but other than that, any specific reason we put these in the runtime module and the RowBasedBlockValSet in the core module? (I see DataBlock is still in pinot-common; imo it might be easier to put everything together.)
I would say there is no specific reason. Ideally we should move all BlockValSet implementations together, but currently they are spread over 3 places. Can be done in a separate PR.
Looked again at the details. Good to go; will follow up later.
Currently, multi-stage aggregation/group-by requires both materialized rows and a data block. This means we always need to do a conversion, whether getting the transferable block locally (with materialized rows) or remotely (with a data block).
This PR:
- Enhances RowBasedBlockValSet to support null
- Adds FilteredRowBasedBlockValSet to support filtered aggregation from materialized rows
- Adds DataBlockExtractUtils (originally DataBlockUtils) to support null with filter
- Updates AggregateOperator, MultistageAggregationExecutor, MultistageGroupByExecutor
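The core idea of avoiding the rows conversion can be illustrated with a minimal, hypothetical sketch: aggregation inputs are read directly from whichever representation the block carries (materialized rows locally, a columnar data block remotely), instead of always converting one form into the other first. All names below are illustrative, not the actual Pinot API:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: one reader interface, two representations, no
// rows<->columns conversion on either path. Illustrative names only.
public class BlockDispatchSketch {
    interface ValueReader {
        long sumColumn(int colId);
    }

    // Row-based reader: the block was produced locally as materialized rows.
    static ValueReader fromRows(List<long[]> rows) {
        return colId -> rows.stream().mapToLong(row -> row[colId]).sum();
    }

    // Columnar reader: the block arrived remotely as a serialized data block.
    static ValueReader fromColumns(long[][] columns) {
        return colId -> Arrays.stream(columns[colId]).sum();
    }

    public static void main(String[] args) {
        ValueReader rowReader = fromRows(List.of(new long[]{1, 10}, new long[]{2, 20}));
        ValueReader colReader = fromColumns(new long[][]{{1, 2}, {10, 20}});
        // Same aggregate result regardless of representation.
        assert rowReader.sumColumn(1) == 30L;
        assert colReader.sumColumn(1) == 30L;
        System.out.println("ok");
    }
}
```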