Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi stream ingestion support #13790

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

lnbest0707-uber
Copy link
Contributor

@lnbest0707-uber lnbest0707-uber commented Aug 9, 2024

feature
Reference: #13780 Design Doc

Please refer to design doc for details. TLDR:

  • Add support to ingest from multiple source by a single table
  • Use existing interface (TableConfig) to define multiple streams
  • Separate the partition id definition between Stream and Pinot segment
  • Compatible with existing stream partition auto expansion logics

Feature tested on multiple Kafka topics with different decoder format. Due to resource limitations, not able to test other upstream source e2e.

Some TODOs:

  • Validation and Limitation on multiple stream configs.
  • Standardize the usage of StreamConfig object. e.g. some are only used to get non-topic related static metadata, should use other interface.
  • Adding/removing stream support or sanity check.

@codecov-commenter
Copy link

codecov-commenter commented Aug 10, 2024

Codecov Report

Attention: Patch coverage is 59.79899% with 80 lines in your changes missing coverage. Please review.

Project coverage is 63.70%. Comparing base (59551e4) to head (1af31b2).
Report is 1195 commits behind head on master.

Files with missing lines Patch % Lines
...g/apache/pinot/spi/utils/IngestionConfigUtils.java 53.65% 16 Missing and 3 partials ⚠️
...inot/spi/stream/PartitionGroupMetadataFetcher.java 63.41% 14 Missing and 1 partial ⚠️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 71.42% 14 Missing ⚠️
...roller/helix/core/PinotTableIdealStateBuilder.java 33.33% 6 Missing ⚠️
...x/core/realtime/MissingConsumingSegmentFinder.java 70.00% 6 Missing ⚠️
...apache/pinot/controller/BaseControllerStarter.java 0.00% 5 Missing ⚠️
...r/validation/RealtimeSegmentValidationManager.java 0.00% 5 Missing ⚠️
...he/pinot/segment/local/utils/TableConfigUtils.java 50.00% 3 Missing and 2 partials ⚠️
.../helix/core/realtime/SegmentCompletionManager.java 0.00% 1 Missing ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 87.50% 1 Missing ⚠️
... and 3 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13790      +/-   ##
============================================
+ Coverage     61.75%   63.70%   +1.95%     
- Complexity      207     1535    +1328     
============================================
  Files          2436     2624     +188     
  Lines        133233   144733   +11500     
  Branches      20636    22155    +1519     
============================================
+ Hits          82274    92209    +9935     
- Misses        44911    45707     +796     
- Partials       6048     6817     +769     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.69% <59.79%> (+1.98%) ⬆️
java-21 55.27% <33.96%> (-6.36%) ⬇️
skip-bytebuffers-false 63.70% <59.79%> (+1.95%) ⬆️
skip-bytebuffers-true 55.23% <33.96%> (+27.51%) ⬆️
temurin 63.70% <59.79%> (+1.95%) ⬆️
unittests 63.70% <59.79%> (+1.95%) ⬆️
unittests1 55.40% <33.96%> (+8.51%) ⬆️
unittests2 34.28% <46.73%> (+6.55%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes ingestion real-time labels Aug 12, 2024
@@ -686,9 +686,8 @@ public void ingestionStreamConfigsTest() {
// only 1 stream config allowed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, update comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -173,15 +173,18 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N

// Only allow realtime tables with non-null stream.type and LLC consumer.type
if (tableConfig.getTableType() == TableType.REALTIME) {
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a validation to avoid upsert table creating with multiple topics for now. One of the most important reason is that upsert table requires same primary keys to be distributed to the same host. it will be a bit complex to validate whether all source topics are partitioned equally (partition key, partition counts, parttion algorithms).
there are other potential concerns including race condition consumption during reload, rebalance, pause ingestion etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea, updated.

for (int i = 0; i < streamConfigs.size(); i++) {
final int index = i;
try {
partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream().map(
Copy link
Contributor

@deemoliu deemoliu Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please elaborate on why we don't need to maintain orders for partitionIds? we use list for streamConfigs and use unordered set to store partitionids?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is overloading an existing same name function. The other one is also returning a Set<>. Usage of the output is only checking if partitionId exists instead of checking its order.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature ingestion real-time release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants