
[server] Race condition fix for re-subscription of real-time topic partitions. #1192

Merged: 4 commits, Sep 30, 2024

Conversation


@haoxu07 haoxu07 commented Sep 21, 2024


We recently introduced a re-subscription feature for different store versions. Leader and follower partitions from different version topics can now undergo re-subscription concurrently, triggered by store version role changes.

Problem:
Two StoreIngestionTask threads for different store versions (say, store version 1 and store version 2) may attempt re-subscription (unsubscribe and subscribe) for the same real-time topic partition concurrently. During a re-subscription triggered by one store version, the call to consumer.unSubscribe (which removes the topic partition assignment) and the call to consumerToConsumptionTask.get(consumer).removeDataReceiver(pubSubTopicPartition) execute sequentially, not atomically. In that window, the StoreIngestionTask thread for store version 2 can obtain the same ConsumptionTask while the DataReceiver inside it still belongs to store version 1. Because each ConsumptionTask allows only one DataReceiver per real-time topic partition, the DataReceiver from store version 2 cannot be added to that ConsumptionTask.

Solution:
Use a SharedKafkaConsumer-level lock to protect the KafkaConsumerService operations unSubscribe, setDataReceiver, and unSubscribeAll, guaranteeing that only one real-time topic partition from a specific store version performs a DataReceiver-related assignment change at a time.
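A minimal sketch of the locking idea (the class and method names below are simplified stand-ins, not the actual Venice implementation): guarding both the unsubscribe-plus-receiver-removal sequence and the receiver assignment with the same per-consumer lock makes each a single atomic step, so a second StoreIngestionTask can never observe the partition assignment gone while the stale DataReceiver is still attached.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for a shared consumer plus its consumption task state;
// names are illustrative only.
class SharedConsumerSketch {
    // One data receiver per topic partition, owned by the consumption task.
    private final Map<String, Object> receiverByPartition = new ConcurrentHashMap<>();

    // Re-subscription step 1: drop the assignment AND the receiver under one lock,
    // so no other thread can see the assignment removed while the stale receiver remains.
    synchronized void unSubscribe(String topicPartition) {
        // the real consumer.unSubscribe(...) would happen here as well
        receiverByPartition.remove(topicPartition);
    }

    // Re-subscription step 2: install the new version's receiver under the same lock.
    synchronized void setDataReceiver(String topicPartition, Object receiver) {
        Object prev = receiverByPartition.putIfAbsent(topicPartition, receiver);
        if (prev != null && prev != receiver) {
            // Mirrors the "only one DataReceiver per partition" rule described above.
            throw new IllegalStateException(
                "Partition already has a data receiver: " + topicPartition);
        }
    }

    synchronized void unSubscribeAll() {
        receiverByPartition.clear();
    }

    synchronized Object getReceiver(String topicPartition) {
        return receiverByPartition.get(topicPartition);
    }
}
```

Because unSubscribe, setDataReceiver, and unSubscribeAll all synchronize on the same consumer instance, the interleaving described above (store version 2 finding store version 1's stale receiver) can no longer occur.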

How was this PR tested?

Does this PR introduce any user-facing changes?


@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch from 912157a to 50f8772 Compare September 23, 2024 05:12
@haoxu07 haoxu07 changed the title Race condition fix for resubscription. [server] Race condition fix for re-subscription of real-time topic partitions. Sep 23, 2024
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch 2 times, most recently from 489c3e7 to 74763da Compare September 23, 2024 06:00
@haoxu07 haoxu07 marked this pull request as ready for review September 23, 2024 22:33
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch from 74763da to 41e81d3 Compare September 24, 2024 00:48
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch from 41e81d3 to 9aeaaf8 Compare September 25, 2024 21:53
@haoxu07 haoxu07 changed the title [server] Race condition fix for re-subscription of real-time topic partitions. [server][WIP for debugging CI] Race condition fix for re-subscription of real-time topic partitions. Sep 25, 2024
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch 7 times, most recently from 1a0fa1f to 28df58b Compare September 26, 2024 17:24
@haoxu07 haoxu07 changed the title [server][WIP for debugging CI] Race condition fix for re-subscription of real-time topic partitions. [server] Race condition fix for re-subscription of real-time topic partitions. Sep 26, 2024
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch from 28df58b to bb726fc Compare September 26, 2024 20:25
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch from b3b7174 to 0ad40ea Compare September 27, 2024 00:19
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch 2 times, most recently from 2a7466d to adade07 Compare September 30, 2024 21:41
@haoxu07 haoxu07 force-pushed the raceConditionFixForResubscription branch from adade07 to 3ce8001 Compare September 30, 2024 22:26
@haoxu07 haoxu07 requested a review from lluwm September 30, 2024 22:29

@lluwm lluwm left a comment


Thanks @haoxu07 for the fix and it looks good to me!

@haoxu07 haoxu07 merged commit 06cc1fc into linkedin:main Sep 30, 2024
46 checks passed
gaojieliu added a commit to gaojieliu/venice that referenced this pull request Oct 7, 2024
gaojieliu added a commit that referenced this pull request Oct 7, 2024
haoxu07 added a commit that referenced this pull request Oct 30, 2024
…pic partitions. (#1263)

This change re-lands the previously reverted #1192. The unit test in the prior fix spawned separate threads re-subscribing to the same real-time topic partition, resulting in frequent re-subscriptions. These caused excessive metric emissions, leading to GC pressure and failures in other unit tests in CI. To prevent metric emission, the unit test now passes a mocked AggKafkaConsumerServiceStats instead of null when instantiating KafkaConsumerService, since KafkaConsumerService otherwise creates a real AggKafkaConsumerServiceStats when null is passed.
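The idea behind that test fix can be sketched with a hand-rolled no-op stub (the actual PR mocks AggKafkaConsumerServiceStats, e.g. with a mocking framework; the interface and names below are hypothetical): the test injects an implementation that swallows metric calls instead of emitting them, so heavy re-subscription churn cannot flood the metrics system.

```java
// Illustrative only: a hypothetical stats interface and a no-op implementation
// that a test could inject instead of null, so no real metrics are emitted.
interface ConsumerServiceStatsSketch {
    void recordPollResult(int recordCount);
}

class NoOpConsumerServiceStats implements ConsumerServiceStatsSketch {
    private int calls = 0; // counts invocations so a test can verify wiring

    @Override
    public void recordPollResult(int recordCount) {
        calls++; // swallow the metric instead of emitting it
    }

    int getCallCount() {
        return calls;
    }
}
```

Passing such a stub (or a framework-generated mock) at construction time keeps the code under test on its normal path while making metric emission a harmless no-op.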

Previous #1192 commit message:

We recently introduced a re-subscription feature for different store versions. Leader and follower partitions from different version topics can now undergo re-subscription concurrently, triggered by store version role changes.

Problem:
Two StoreIngestionTask threads for different store versions (say, store version 1 and store version 2) may attempt re-subscription (unsubscribe and subscribe) for the same real-time topic partition concurrently. During a re-subscription triggered by one store version, the call to consumer.unSubscribe (which removes the topic partition assignment) and the call to consumerToConsumptionTask.get(consumer).removeDataReceiver(pubSubTopicPartition) execute sequentially, not atomically. In that window, the StoreIngestionTask thread for store version 2 can obtain the same ConsumptionTask while the DataReceiver inside it still belongs to store version 1. Because each ConsumptionTask allows only one DataReceiver per real-time topic partition, the DataReceiver from store version 2 cannot be added to that ConsumptionTask.

Solution:
Use a SharedKafkaConsumer-level lock to protect the KafkaConsumerService operations unSubscribe, setDataReceiver, and unSubscribeAll, guaranteeing that only one real-time topic partition from a specific store version performs a DataReceiver-related assignment change at a time.
Co-authored-by: Hao Xu <xhao@xhao-mn3.linkedin.biz>