Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colocated-without-hints #13943

Merged
merged 30 commits into from
Oct 16, 2024
Merged

colocated-without-hints #13943

merged 30 commits into from
Oct 16, 2024

Conversation

gortiz
Copy link
Contributor

@gortiz gortiz commented Sep 5, 2024

Right now in order to use colocated joins users need to provide the tableOption hint for each table in the join. This hint contains several attributes and most of them are required even if the information can be obtained from the table config. In fact we explicitly verify that the provided values are the equal to what it is stored in the table config.

This PR changes this optimization to be applied automatically. Although I've been studying other solutions, the simplest solution I've found is to create a rule that:

  • If the pinot.broker.enable.partition.metadata.manager is true
    • If there is no tableOptions hint: Add one tableOptions hint with the values inferred from the table partition info
    • If there is a tableOptions
      • For each any attribute that is not set, use the values from the table partition info.

TODO:

  • Add a pinot config option and a hint to disable this automation
  • Update older tests and create new ones

@gortiz gortiz added the multi-stage Related to the multi-stage query engine label Sep 5, 2024
@codecov-commenter
Copy link

codecov-commenter commented Sep 5, 2024

Codecov Report

Attention: Patch coverage is 28.35052% with 139 lines in your changes missing coverage. Please review.

Project coverage is 63.76%. Comparing base (59551e4) to head (8512fd2).
Report is 1190 commits behind head on master.

Files with missing lines Patch % Lines
.../calcite/rel/rules/PinotImplicitTableHintRule.java 0.00% 78 Missing ⚠️
.../org/apache/pinot/query/routing/WorkerManager.java 43.93% 31 Missing and 6 partials ⚠️
.../java/org/apache/pinot/query/QueryEnvironment.java 74.28% 6 Missing and 3 partials ⚠️
...pinot/calcite/rel/hint/PinotHintStrategyTable.java 0.00% 8 Missing ⚠️
...requesthandler/MultiStageBrokerRequestHandler.java 0.00% 7 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13943      +/-   ##
============================================
+ Coverage     61.75%   63.76%   +2.01%     
- Complexity      207     1535    +1328     
============================================
  Files          2436     2624     +188     
  Lines        133233   144580   +11347     
  Branches      20636    22122    +1486     
============================================
+ Hits          82274    92189    +9915     
- Misses        44911    45589     +678     
- Partials       6048     6802     +754     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.71% <28.35%> (+2.00%) ⬆️
java-21 63.64% <28.35%> (+2.01%) ⬆️
skip-bytebuffers-false 63.75% <28.35%> (+2.00%) ⬆️
skip-bytebuffers-true 63.60% <28.35%> (+35.87%) ⬆️
temurin 63.76% <28.35%> (+2.01%) ⬆️
unittests 63.75% <28.35%> (+2.01%) ⬆️
unittests1 55.49% <28.87%> (+8.60%) ⬆️
unittests2 34.28% <7.73%> (+6.55%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@ankitsultana
Copy link
Contributor

@gortiz are you aiming to make it so that the entire plan can be colocated when possible?

From Uber side we were planning to add this feature back again this year. For our current implementation that we are using internally for a big use-case, we are able to colocate the entire plan by inferring partition keys for each stage. Though it was doing it in the physical optimization phase and hence is no longer being used in OSS.

@gortiz
Copy link
Contributor Author

gortiz commented Sep 6, 2024

The idea is that Pinot should be able to automatically infer whether joins/group by be optimized using the partition distribution. That is not the case right now. Instead users need to specify a tableOptions hint with information that is redundant (Pinot already knows it!).

So this PR adds adds implicit hints if they are not already added (and in case it is, populates the hint with default values if not provided).

Though it was doing it in the physical optimization phase and hence is no longer being used in OSS.

What do you mean with physical optimization? I don't think the term is very well defined in Pinot. Specifically we use to use it to describe how to execute stages in different workers but also it is also used to describe how to map RelNodes into Pinot operators and/or indexes used.

Assuming you mean that you are applying this optimization after calcite rules have been applied (ie in or close to WorkerManager), that is what we are doing here as well. I would love to move the distribution calculations into Calcite (given it has that concept) but this PR is simpler. The code we have right now is applying the optimization in WorkerManager, but only when it makes sense and the hint is present. What this PR does is to add the hint even if the query doesn't include it.

@gortiz gortiz changed the title colocated-without-hints: First commit with the proposed code colocated-without-hints Sep 20, 2024
pinot-query-planner/pom.xml Outdated Show resolved Hide resolved
pom.xml Outdated Show resolved Hide resolved
@@ -343,6 +343,8 @@ public static class Broker {
public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false;
public static final String IMPLICIT_COLOCATE_JOIN = "pinot.broker.multistage.implicit.colocate";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added rule is to auto-fill the missing info in the table hint. Should we make the config more explicit about that? This rule will be applied even without JOIN, and leaf stage worker will process each partition in parallel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaf stage worker will process each partition in parallel

By default we set parallelism to 1, which AFAIU was the default. Therefore this should not change, am I wrong?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is partition parallelism, i.e. how many threads to use in the following stage after partition is joined. When partition info is set, each partition is processed as a separate query in the leaf stage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed that to do not provide a value for parallelism. Anyway, you are right about the naming. Do you have a suggestion for a new and more precise property name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like AUTO_HINT_PARTITION_INFO or AUTO_APPLY_PARTITION_HINT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to change this

}

if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
tableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) This is not robust, and can cause wrong behavior on real-time or hybrid table. We'll need to check both real-time and offline and apply hint when they match each other, or only one exists

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should apply the same check done in WorkerManager that verifies that there is at least one server that is fully replicated? I would suggest:

  • If the table is hybrid
  • And it is not fully replicated
  • If the query includes SET implicitColocateJoin=true, then we should fail.
  • If the query does not include SET implicitColocateJoin=true then we should not apply the optimization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How complicated is this check? Are we going to replicate a lot of logic within WorkerManager?
I was thinking only apply the hint when both tables have the same partitionColumn, partitionFunctionName and numPartitions. Then I realized another issue: we need to know if a table exists to figure out whether it is either unpartitioned, or doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to know if a table exists

What do you mean? If the table doesn't exits we would have failed at validation time, right? Do you mean to actually have segments?

My concern here is that WorkerManager fails if there is at least one partition that is not fully replicated (see

Preconditions.checkState(!fullyReplicatedServers.isEmpty(),
). Therefore if we just compare partitionColumn, partitionFunctionName and numPartitions and the table is not fully replicated, we will end up failing at WorkerManager.

My suggestion is to modify WorkerManager to provide a method similar to getPartitionTableInfo that we can call from the rule. This method should check whether partition info is equal and if for each partition there is at least one server with a full copy of the partition. The main differences with current getPartitionTableInfo is that:

  • It cannot return PartitionTableInfo, given it is an internal class the rule don't care about.
  • It cannot call getTablePartitionInfo, which doesn't only get the info but also compares values on the hint with what is stored in the routing manager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the code. Can you take a look to the new implementation that delegates on worker manager?

pom.xml Outdated Show resolved Hide resolved
pom.xml Outdated Show resolved Hide resolved
@@ -343,6 +343,8 @@ public static class Broker {
public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false;
public static final String IMPLICIT_COLOCATE_JOIN = "pinot.broker.multistage.implicit.colocate";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is partition parallelism, i.e. how many threads to use in the following stage after partition is joined. When partition info is set, each partition is processed as a separate query in the leaf stage

}

if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
tableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How complicated is this check? Are we going to replicate a lot of logic within WorkerManager?
I was thinking only apply the hint when both tables have the same partitionColumn, partitionFunctionName and numPartitions. Then I realized another issue: we need to know if a table exists to figure out whether it is either unpartitioned, or doesn't exist.

@gortiz
Copy link
Contributor Author

gortiz commented Oct 2, 2024

I've just pushed a couple of changes to delegate more logic on the WorkerManager. I didn't have time to actually verify it works as expected but I'm going to be traveling tomorrow and I wanted to be sure the changes are not lost.

gortiz and others added 3 commits October 10, 2024 14:30
pom.xml Outdated Show resolved Hide resolved
@@ -343,6 +343,8 @@ public static class Broker {
public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false;
public static final String IMPLICIT_COLOCATE_JOIN = "pinot.broker.multistage.implicit.colocate";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like AUTO_HINT_PARTITION_INFO or AUTO_APPLY_PARTITION_HINT?

private static TableOptions calculateTableOptions(
@Nullable RelHint relHint, TablePartitionInfo tablePartitionInfo, LogicalTableScan tableScan) {
if (relHint == null) {
return ImmutablePinotImplicitTableHintRule.TableOptions.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked out the code but cannot find this class in IDE. Is this auto generated by the annotation framework?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the class that is autogenerated by immutables.

String tableName = RelToPlanNodeConverter.getTableNameFromTableScan(tableScan);

String offlineName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
TablePartitionInfo offlineTpi = _workerManager.getTablePartitionInfo(offlineName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One possible scenario is that both tables exist, but only one table has partition info. In that case we should return null instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we just simplify this with WorkerManager.calculatePartitionTableInfo(tableName) now that we have that function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this code to delegate on WorkerManager. The rule is now simpler and all logic (but merge) is in WorkerManager now.

@gortiz
Copy link
Contributor Author

gortiz commented Oct 11, 2024

@Jackie-Jiang Question: Should we also set pinot.broker.enable.partition.metadata.manager to true by default?

@Jackie-Jiang
Copy link
Contributor

@gortiz Yes! I think we can enable it by default

@Jackie-Jiang Jackie-Jiang added enhancement release-notes Referenced by PRs that need attention when compiling the next release notes Configuration Config changes (addition/deletion/change in behavior) labels Oct 11, 2024
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly good. Nice job extracting the common logic in WorkerManager

private static RelHint getTableOptionHint(LogicalTableScan tableScan) {
return tableScan.getHints().stream()
.filter(relHint -> relHint.hintName.equals(PinotHintOptions.TABLE_HINT_OPTIONS))
.findAny()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to pick the first hint that matches the name. The reason is that user might override the same hint multiple times, and the first one should be the closest one.
Suggest adding this as a util method in PinotHintStrategyTable

// there is a hint, check fill default data and obtain the partition parallelism if supplied
Map<String, String> kvOptions = relHint.kvOptions;

ImmutableTableOptions newTableOptions = ImmutableTableOptions.copyOf(implicitTableOptions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we override here? I think we just want to validate if the explicit hint matches the implicit one, and always use the implicit one? Partition parallelism should always come from explicit hint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is here in case there is an explicit hint, in which case the explicit value provided by the writer wins. That is what is called override here. The base ImmutableTableOptions is the one calculated from the table and does not include partition information. Then for each dimension on TableOptions (key, function, etc) the implicit value is override by the explicit value provided, if any.

The code that enforces a specific value is in WorkerManager. We could move it here, but I think that can be done in another PR.

}
switch (useImplicitColocatedOptionValue.toLowerCase()) {
case "true":
Objects.requireNonNull(workerManager, "WorkerManager is required for implicit colocated join");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that we run into this query path without the worker manager being set? I still think this path should be handled the same way as instance level setting. Whether worker manager is set or not should be orthogonal to whether the setting is instance level or query level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that worker manager is enabled by default is more difficult, but if pinot.broker.enable.partition.metadata.manager is set to false and then the query includes set implicitColocateJoin=true;, then we fail this check.

It is just a matter of failing fast. The writer asked to use this feature and it cannot be used because config indicates that worker manager should not be instantiated. We can just ignore the writer request, but given this is not a hint but an option, I think it is more elegant to fail with a correct message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the worker manager is read from the environment config, so whether it is null is not decided by this config, but purely based on the code path that initialized the QueryEnvironment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. WorkerManager will be null or not depending on how QueryEnvironment is initialized.

But inferPartitionHint (the new name of useImplicitColocatedOptionValue) is decided by the query option. What I'm doing here is to fail if WorkerManager was initialized to null AND the user is explicitly asking to inferPartitionHint. I think that is better than the alternative, which is to do not apply infer partition hint when explicitly asked because the worker manager wasn't enabled at startup time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the intention correctly, we want to fail the planning if partitionMetadataManager is unavailable instead of workerManager. workerManager can be null in certain query path independent of the value of partitionMetadataManager. We need to ensure query with option never hits that code path

@@ -343,6 +343,8 @@ public static class Broker {
public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false;
public static final String IMPLICIT_COLOCATE_JOIN = "pinot.broker.multistage.implicit.colocate";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to change this

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}
switch (useImplicitColocatedOptionValue.toLowerCase()) {
case "true":
Objects.requireNonNull(workerManager, "WorkerManager is required for implicit colocated join");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the worker manager is read from the environment config, so whether it is null is not decided by this config, but purely based on the code path that initialized the QueryEnvironment

@gortiz gortiz merged commit 77627ba into apache:master Oct 16, 2024
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Configuration Config changes (addition/deletion/change in behavior) enhancement multi-stage Related to the multi-stage query engine release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants