Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2128: Document Nack Sleep Time Limitations #2254

Merged
merged 1 commit into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1252,16 +1252,19 @@ NOTE: If you want to commit a partial batch, using `nack()`, When using transact

IMPORTANT: `nack()` can only be called on the consumer thread that invokes your listener.

With a record listener, when `nack()` is called, any pending offsets are committed, the remaing records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
The consumer thread can be paused before redelivery, by setting the `sleep` argument.
With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
The consumer can be paused before redelivery, by setting the `sleep` argument.
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.

When using a batch listener, you can specify the index within the batch where the failure occurred.
When `nack()` is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next `poll()`.

See <<error-handlers>> for more information.

IMPORTANT: When using partition assignment via group management, it is important to ensure the `sleep` argument (plus the time spent processing records from the previous poll) is less than the consumer `max.poll.interval.ms` property.
IMPORTANT: The consumer is paused during the sleep so that we continue to poll the broker to keep the consumer alive.
The actual sleep time, and its resolution depends on the container's `maxPollInterval` which defaults to 5 seconds.
The minimum sleep time is equal to the `maxPollInterval` and all sleep times will be a multiple of it.
For small sleep times, consider reducing the container's `maxPollInterval`.

[[container-auto-startup]]
====== Listener Container Auto Startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public interface Acknowledgment {
* {@code sleep + time spent processing the previous messages from the poll} must be
* less than the consumer {@code max.poll.interval.ms} property, to avoid a
* rebalance.</b>
* @param sleep the time to sleep.
* @param sleep the time to sleep; the actual sleep time will be larger of this value
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @since 2.3
*/
default void nack(long sleep) {
Expand All @@ -60,7 +61,8 @@ default void nack(long sleep) {
* {@code sleep + time spent processing the records before the index} must be less
* than the consumer {@code max.poll.interval.ms} property, to avoid a rebalance.</b>
* @param index the index of the failed record in the batch.
* @param sleep the time to sleep.
* @param sleep the time to sleep; the actual sleep time will be larger of this value
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @since 2.3
*/
default void nack(int index, long sleep) {
Expand Down