-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-2128 Do Not Sleep Consumer Thread for Nack #2131
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for clarification.
Do we still need that "nack() can only be called on the consumer thread"
assertion since we don't sleep any more?
Although it is not clear why would we need it before anyway...
Thanks
Yes, it's still a limitation; we can't handle async processing in this mode because we ack all records before the failure and re-seek those after. |
+ ex.getMessage()); | ||
Set<TopicPartition> nowPaused = new HashSet<>(this.consumer.paused()); | ||
nowPaused.removeAll(alreadyPaused); | ||
this.consumer.resume(nowPaused); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we expect consumer.resume()
to succeed if consumer.pause()
threw an IllegalStateException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pause()
iterates over the collection and throws an exception if the partition is no longer assigned; therefore some partitions may have been paused; all this code is doing is resuming those that were successful. As the comment states, this code should never be executed. I added it before I removed the paused (for nack) partitions in onPartitionsRevoked()
, but decided to leave it here (with the comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks for the blazing fast reaction on the bug report and the detailed explanation above!
Resolves spring-projects#2128 Suspending polling delays rebalancing; instead pause the consumer and continue polling. Check if partitions are already paused and only pause the current active partitions and resume them after the sleep interval has passed. Re-pause as necessary after a rebalance. Also tested with reporter's reproducer. **cherry-pick to 2.8.x**
... and cherry-picked to |
Resolves #2128
Suspending polling delays rebalancing; instead pause the consumer and
continue polling. Check if partitions are already paused and only pause
the current active partitions and resume them after the sleep interval
has passed.
Re-pause as necessary after a rebalance.
Also tested with reporter's reproducer.
cherry-pick to 2.8.x