GH-2128 Do Not Sleep Consumer Thread for Nack #2131

Merged (1 commit) on Mar 7, 2022

garyrussell
Contributor

Resolves #2128

Suspending polling delays rebalancing; instead pause the consumer and
continue polling. Check if partitions are already paused and only pause
the current active partitions and resume them after the sleep interval
has passed.

Re-pause as necessary after a rebalance.
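
A minimal sketch of the approach described above, not the actual container code. `StubConsumer`, `NackPauseSketch`, and every method name here are hypothetical stand-ins, and partitions are plain strings so the example runs without the Kafka client. It assumes the loop calls `beforePoll()` on every iteration, so polling (and therefore rebalancing) never stops while the nack interval runs.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class NackPauseSketch {

    /** Hypothetical stand-in for a consumer's assignment and pause state. */
    static class StubConsumer {
        final Set<String> assigned = new TreeSet<>();
        final Set<String> paused = new TreeSet<>();

        void pause(Set<String> partitions) { paused.addAll(partitions); }

        void resume(Set<String> partitions) { paused.removeAll(partitions); }
    }

    final StubConsumer consumer = new StubConsumer();
    final Set<String> pausedForNack = new HashSet<>();
    long nackWakeTime = -1; // -1 means no nack interval is running

    /** On nack: pause only partitions that are not already paused, and remember them. */
    void nack(long now, long sleepMillis) {
        Set<String> toPause = new HashSet<>(consumer.assigned);
        toPause.removeAll(consumer.paused);
        consumer.pause(toPause);
        pausedForNack.addAll(toPause);
        nackWakeTime = now + sleepMillis;
    }

    /** Runs on every loop iteration; resumes once the interval has passed. */
    void beforePoll(long now) {
        if (nackWakeTime >= 0 && now >= nackWakeTime) {
            consumer.resume(pausedForNack);
            pausedForNack.clear();
            nackWakeTime = -1;
        }
    }

    /** Rebalance: forget revoked partitions, re-pause new ones if the interval is still running. */
    void rebalance(Set<String> revoked, Set<String> added) {
        consumer.assigned.removeAll(revoked);
        consumer.paused.removeAll(revoked);
        pausedForNack.removeAll(revoked);
        consumer.assigned.addAll(added);
        if (nackWakeTime >= 0) {
            consumer.pause(added);
            pausedForNack.addAll(added);
        }
    }

    public static void main(String[] args) {
        NackPauseSketch c = new NackPauseSketch();
        c.consumer.assigned.addAll(Set.of("t-0", "t-1"));
        c.consumer.pause(Set.of("t-1"));           // paused by the user, not by nack
        c.nack(0, 1000);                            // pauses t-0 only
        c.rebalance(Set.of("t-0"), Set.of("t-2"));  // t-2 is re-paused for the nack
        c.beforePoll(500);                          // too early, nothing resumes
        System.out.println(c.consumer.paused);      // prints [t-1, t-2]
        c.beforePoll(1000);                         // interval elapsed
        System.out.println(c.consumer.paused);      // prints [t-1]
    }
}
```

The partition the user paused (`t-1`) stays paused after the interval, because only the partitions recorded in `pausedForNack` are resumed.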

Also tested with reporter's reproducer.

cherry-pick to 2.8.x

Member

@artembilan left a comment

Just for clarification.
Do we still need that "nack() can only be called on the consumer thread" assertion since we don't sleep any more?

Although it is not clear why we needed it before anyway...

Thanks

@garyrussell
Contributor Author

Yes, it's still a limitation; we can't handle async processing in this mode because we ack all records before the failure and re-seek those after.
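
To illustrate the limitation, here is a rough sketch of "ack all records before the failure and re-seek those after". It is not the container's code: `NackSeekSketch`, `Rec`, `toCommit`, and `toSeek` are hypothetical names, and partitions are plain strings. It assumes the batch is processed strictly in order on one thread, which is what makes "everything before index N is done" a safe statement.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NackSeekSketch {

    /** Hypothetical minimal record: partition name plus offset. */
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Offsets to commit: one past the last processed record, per partition. */
    static Map<String, Long> toCommit(List<Rec> batch, int failedIndex) {
        Map<String, Long> commits = new LinkedHashMap<>();
        for (Rec r : batch.subList(0, failedIndex)) {
            commits.put(r.partition, r.offset + 1);
        }
        return commits;
    }

    /** Seek positions: first unprocessed offset per partition, starting at the failed record. */
    static Map<String, Long> toSeek(List<Rec> batch, int failedIndex) {
        Map<String, Long> seeks = new LinkedHashMap<>();
        for (Rec r : batch.subList(failedIndex, batch.size())) {
            seeks.putIfAbsent(r.partition, r.offset);
        }
        return seeks;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec("t-0", 5), new Rec("t-1", 7),
                new Rec("t-0", 6), new Rec("t-1", 8));
        // The record at index 2 (t-0, offset 6) is nacked.
        System.out.println(toCommit(batch, 2)); // prints {t-0=6, t-1=8}
        System.out.println(toSeek(batch, 2));   // prints {t-0=6, t-1=8}
    }
}
```

If records were handed off to other threads, an earlier record could still be in flight when a later one is nacked, and committing everything before the failed index would no longer be correct.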

```java
+ ex.getMessage());
Set<TopicPartition> nowPaused = new HashSet<>(this.consumer.paused());
nowPaused.removeAll(alreadyPaused);
this.consumer.resume(nowPaused);
```

Can we expect consumer.resume() to succeed if consumer.pause() threw an IllegalStateException?

Contributor Author

pause() iterates over the collection and throws an exception if a partition is no longer assigned, so some partitions may already have been paused by the time it throws. All this code does is resume the ones that were paused successfully. As the comment states, this code should never be executed. I added it before I changed onPartitionsRevoked() to remove the partitions paused for nack, but decided to leave it here (with the comment).
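
The roll-back described here can be sketched as follows. `StubConsumer` is a hypothetical stand-in that only mimics the behaviour stated above (pause one partition at a time, throw on the first unassigned one); it is not the Kafka client, and `pauseOrRollBack` is an illustrative name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PartialPauseSketch {

    /** Hypothetical stand-in; mimics a pause() that can fail part-way through. */
    static class StubConsumer {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        /** Pauses partitions in order and fails on the first unassigned one. */
        void pause(List<String> partitions) {
            for (String tp : partitions) {
                if (!assigned.contains(tp)) {
                    throw new IllegalStateException("Partition not assigned: " + tp);
                }
                paused.add(tp);
            }
        }

        void resume(Set<String> partitions) { paused.removeAll(partitions); }
    }

    /** Tries to pause; on failure, resumes only what this call managed to pause. */
    static Set<String> pauseOrRollBack(StubConsumer consumer, List<String> toPause) {
        Set<String> alreadyPaused = new HashSet<>(consumer.paused);
        try {
            consumer.pause(toPause);
        }
        catch (IllegalStateException ex) {
            // Whatever is paused now but was not paused before came from this call.
            Set<String> nowPaused = new HashSet<>(consumer.paused);
            nowPaused.removeAll(alreadyPaused);
            consumer.resume(nowPaused);
        }
        return consumer.paused;
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        consumer.assigned.addAll(Set.of("t-0", "t-1"));
        consumer.paused.add("t-1"); // paused earlier, unrelated to this call
        // t-0 is paused, then t-9 (not assigned) makes pause() throw.
        System.out.println(pauseOrRollBack(consumer, List.of("t-0", "t-9"))); // prints [t-1]
    }
}
```

Only `t-0` is resumed in the catch block; `t-1` was paused before the call and is left alone.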

I see, thanks for the blazing fast reaction to the bug report and the detailed explanation above!

@artembilan artembilan merged commit a1a5d48 into spring-projects:main Mar 7, 2022
@artembilan
Member

... and cherry-picked to 2.8.x

Linked issue: Calling .nack() suspends consumer group rebalance