Add multi stream ingestion support #13790
base: master
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

@@ Coverage Diff @@
##             master   #13790      +/-   ##
============================================
+ Coverage     61.75%   63.70%    +1.95%
- Complexity      207     1535     +1328
============================================
  Files          2436     2624      +188
  Lines        133233   144733   +11500
  Branches      20636    22155    +1519
============================================
+ Hits          82274    92209    +9935
- Misses        44911    45707     +796
- Partials       6048     6817     +769
@@ -686,9 +686,8 @@ public void ingestionStreamConfigsTest() {
  // only 1 stream config allowed
nit, update comment
done
@@ -173,15 +173,18 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N

   // Only allow realtime tables with non-null stream.type and LLC consumer.type
   if (tableConfig.getTableType() == TableType.REALTIME) {
-    Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+    List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
Let's add a validation to prevent creating upsert tables with multiple topics for now. One of the most important reasons is that an upsert table requires the same primary keys to be distributed to the same host, and it would be fairly complex to validate that all source topics are partitioned identically (partition key, partition count, partition algorithm).

There are other potential concerns as well, including race conditions in consumption during reload, rebalance, pause ingestion, etc.
This is a good idea, updated.
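A minimal sketch of the validation agreed on above, rejecting upsert tables that configure more than one stream. The class and method names here are illustrative, not Pinot's actual implementation:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the discussed check: upsert tables may only have a
// single stream config until cross-topic partitioning can be validated.
public class UpsertStreamValidation {
  static void validateStreamConfigs(boolean isUpsertEnabled,
                                    List<Map<String, String>> streamConfigMaps) {
    if (streamConfigMaps.isEmpty()) {
      throw new IllegalStateException("At least one stream config is required");
    }
    if (isUpsertEnabled && streamConfigMaps.size() > 1) {
      throw new IllegalStateException(
          "Multiple stream configs are not supported for upsert tables");
    }
  }

  public static void main(String[] args) {
    List<Map<String, String>> two =
        List.of(Map.of("stream.type", "kafka"), Map.of("stream.type", "kafka"));
    // Allowed: non-upsert table with two topics.
    validateStreamConfigs(false, two);
    boolean rejected = false;
    try {
      // Rejected: upsert table with two topics.
      validateStreamConfigs(true, two);
    } catch (IllegalStateException e) {
      rejected = true;
    }
    System.out.println(rejected);
  }
}
```

This keeps the failure at table-creation time, which is much cheaper than detecting mismatched partitioning at ingestion time.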
for (int i = 0; i < streamConfigs.size(); i++) {
  final int index = i;
  try {
    partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream().map(
Can you please elaborate on why we don't need to maintain order for partitionIds? We use a List for streamConfigs but an unordered Set to store the partition ids.
This function overloads an existing function of the same name, which also returns a Set<>. Callers of the output only check whether a partitionId exists; they never rely on its order.
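To illustrate why an unordered Set suffices here, a sketch of collecting partition ids across several stream configs. The `encode` helper and its offset are hypothetical stand-ins for however the stream index is folded into the id so partitions from different topics cannot collide:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch (not Pinot's actual code): merge partition ids from
// multiple streams into a Set. Membership, not order, is what callers need.
public class MultiStreamPartitions {
  // Hypothetical encoding: low 16 bits hold the per-stream partition id,
  // higher bits hold the stream index.
  static int encode(int streamIndex, int partitionId) {
    return streamIndex * 65536 + partitionId;
  }

  static Set<Integer> getPartitionIds(List<List<Integer>> partitionsPerStream) {
    Set<Integer> partitionIds = new HashSet<>();
    for (int i = 0; i < partitionsPerStream.size(); i++) {
      for (int partitionId : partitionsPerStream.get(i)) {
        partitionIds.add(encode(i, partitionId));
      }
    }
    return partitionIds;
  }

  public static void main(String[] args) {
    // Two topics with 2 and 1 partitions respectively; raw partition
    // numbers overlap, but the encoded ids stay distinct.
    Set<Integer> ids = getPartitionIds(List.of(List.of(0, 1), List.of(0)));
    System.out.println(ids.size()); // 3
  }
}
```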
feature
Reference: #13780 Design Doc
Please refer to the design doc for details. TL;DR:
The feature was tested on multiple Kafka topics with different decoder formats. Due to resource limitations, other upstream sources could not be tested end-to-end.
Some TODOs: