From fc544cd57563adebd166b3dec03df6eaff2d08c2 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 4 May 2022 11:50:44 -0400 Subject: [PATCH] GH-2128: Document Nack Sleep Time Limitations See https://github.com/spring-projects/spring-kafka/issues/2128 **cherry-pick to 2.9.x, 2.8.x** --- spring-kafka-docs/src/main/asciidoc/kafka.adoc | 9 ++++++--- .../springframework/kafka/support/Acknowledgment.java | 6 ++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index c6fbcf2570..2931cce526 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -1252,8 +1252,8 @@ NOTE: If you want to commit a partial batch, using `nack()`, When using transact IMPORTANT: `nack()` can only be called on the consumer thread that invokes your listener. -With a record listener, when `nack()` is called, any pending offsets are committed, the remaing records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`. -The consumer thread can be paused before redelivery, by setting the `sleep` argument. +With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`. +The consumer can be paused before redelivery, by setting the `sleep` argument. This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`. When using a batch listener, you can specify the index within the batch where the failure occurred. @@ -1261,7 +1261,10 @@ When `nack()` is called, offsets will be committed for records before the index See <> for more information. -IMPORTANT: When using partition assignment via group management, it is important to ensure the `sleep` argument (plus the time spent processing records from the previous poll) is less than the consumer `max.poll.interval.ms` property. +IMPORTANT: The consumer is paused during the sleep so that we continue to poll the broker to keep the consumer alive. +The actual sleep time, and its resolution depends on the container's `maxPollInterval` which defaults to 5 seconds. +The minimum sleep time is equal to the `maxPollInterval` and all sleep times will be a multiple of it. +For small sleep times, consider reducing the container's `maxPollInterval`. [[container-auto-startup]] ====== Listener Container Auto Startup diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index e9b88cfcf8..3c3673288f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -43,7 +43,8 @@ public interface Acknowledgment { * {@code sleep + time spent processing the previous messages from the poll} must be * less than the consumer {@code max.poll.interval.ms} property, to avoid a * rebalance. - * @param sleep the time to sleep. + * @param sleep the time to sleep; the actual sleep time will be larger of this value + * and the container's {@code maxPollInterval}, which defaults to 5 seconds. * @since 2.3 */ default void nack(long sleep) { @@ -60,7 +61,8 @@ default void nack(long sleep) { * {@code sleep + time spent processing the records before the index} must be less * than the consumer {@code max.poll.interval.ms} property, to avoid a rebalance. * @param index the index of the failed record in the batch. - * @param sleep the time to sleep. + * @param sleep the time to sleep; the actual sleep time will be larger of this value + * and the container's {@code maxPollInterval}, which defaults to 5 seconds. * @since 2.3 */ default void nack(int index, long sleep) {