From b9df7029dcbccd69937eb25458b71b25cfbb1235 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Zaj=C4=85czkowski?= <148013+szpak@users.noreply.github.com> Date: Wed, 18 May 2022 17:37:51 +0200 Subject: [PATCH] Precise unit for sleep duration and wake time Prior to, it was required to dig a few levels into to realize they are in millis. --- .../KafkaMessageListenerContainer.java | 38 +++++++++---------- .../kafka/support/Acknowledgment.java | 21 ++++------ 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index e49e822863..4168bc02f8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -730,9 +730,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private long lastAlertAt = this.lastReceive; - private long nackSleep = -1; + private long nackSleepDurationMillis = -1; - private long nackWake; + private long nackWakeTimeMillis; private int nackIndex; @@ -1622,9 +1622,9 @@ private void doPauseConsumerIfNecessary() { } private void resumeConsumerIfNeccessary() { - if (this.nackWake > 0) { - if (System.currentTimeMillis() > this.nackWake) { - this.nackWake = 0; + if (this.nackWakeTimeMillis > 0) { + if (System.currentTimeMillis() > this.nackWakeTimeMillis) { + this.nackWakeTimeMillis = 0; this.consumer.resume(this.pausedForNack); this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack); this.pausedForNack.clear(); @@ -2207,7 +2207,7 @@ private void invokeBatchOnMessage(final ConsumerRecords records, // NOSONA invokeBatchOnMessageWithRecordsOrList(records, recordList); List> toSeek = null; - if (this.nackSleep >= 0) { + if (this.nackSleepDurationMillis >= 0) { int index = 0; toSeek = new ArrayList<>(); for (ConsumerRecord record : records) { @@ -2217,7 +2217,7 @@ private void invokeBatchOnMessage(final ConsumerRecords records, // NOSONA } } if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) { - if (this.nackSleep < 0) { + if (this.nackSleepDurationMillis < 0) { for (ConsumerRecord record : getHighestOffsetRecords(records)) { this.acks.put(record); } @@ -2356,7 +2356,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords records) { if (this.commonRecordInterceptor != null) { this.commonRecordInterceptor.afterRecord(record, this.consumer); } - if (this.nackSleep >= 0) { + if (this.nackSleepDurationMillis >= 0) { handleNack(records, record); break; } @@ -2435,7 +2435,7 @@ private void doInvokeWithRecords(final ConsumerRecords records) { if (this.commonRecordInterceptor != null) { this.commonRecordInterceptor.afterRecord(record, this.consumer); } - if (this.nackSleep >= 0) { + if (this.nackSleepDurationMillis >= 0) { handleNack(records, record); break; } @@ -2510,8 +2510,8 @@ private boolean recordsEqual(ConsumerRecord rec1, ConsumerRecord rec } private void pauseForNackSleep() { - if (this.nackSleep > 0) { - this.nackWake = System.currentTimeMillis() + this.nackSleep; + if (this.nackSleepDurationMillis > 0) { + this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis; Set alreadyPaused = this.consumer.paused(); Collection assigned = getAssignedPartitions(); if (assigned != null) { @@ -2531,7 +2531,7 @@ private void pauseForNackSleep() { this.consumer.resume(nowPaused); } } - this.nackSleep = -1; + this.nackSleepDurationMillis = -1; } /** @@ -2626,7 +2626,7 @@ private void invokeOnMessage(final ConsumerRecord record) { checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER); } doInvokeOnMessage(record); - if (this.nackSleep < 0 && !this.isManualImmediateAck) { + if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) { ackCurrent(record); } } @@ -3174,11 +3174,11 @@ public void acknowledge() { } @Override - public void nack(long sleep) { + public void nack(long sleepMillis) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.isTrue(sleep >= 0, "sleep cannot be negative"); - ListenerConsumer.this.nackSleep = sleep; + Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative"); + ListenerConsumer.this.nackSleepDurationMillis = sleepMillis; synchronized (ListenerConsumer.this) { if (ListenerConsumer.this.offsetsInThisBatch != null) { ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear()); @@ -3221,13 +3221,13 @@ public void acknowledge() { } @Override - public void nack(int index, long sleep) { + public void nack(int index, long sleepMillis) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.isTrue(sleep >= 0, "sleep cannot be negative"); + Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative"); Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds"); ListenerConsumer.this.nackIndex = index; - ListenerConsumer.this.nackSleep = sleep; + ListenerConsumer.this.nackSleepDurationMillis = sleepMillis; synchronized (ListenerConsumer.this) { if (ListenerConsumer.this.offsetsInThisBatch != null) { ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 3c3673288f..c504f5671a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -37,35 +37,28 @@ public interface Acknowledgment { /** * Negatively acknowledge the current record - discard remaining records from the poll * and re-seek all partitions so that this record will be redelivered after the sleep - * time. Must be called on the consumer thread. + * time (in milliseconds). Must be called on the consumer thread. *

- * When using group management, - * {@code sleep + time spent processing the previous messages from the poll} must be - * less than the consumer {@code max.poll.interval.ms} property, to avoid a - * rebalance. - * @param sleep the time to sleep; the actual sleep time will be larger of this value + * @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger of this value * and the container's {@code maxPollInterval}, which defaults to 5 seconds. * @since 2.3 */ - default void nack(long sleep) { + default void nack(long sleepMillis) { throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment"); } /** * Negatively acknowledge the record at an index in a batch - commit the offset(s) of * records before the index and re-seek the partitions so that the record at the index - * and subsequent records will be redelivered after the sleep time. Must be called on - * the consumer thread. + * and subsequent records will be redelivered after the sleep time (in milliseconds). + * Must be called on the consumer thread. *

- * When using group management, - * {@code sleep + time spent processing the records before the index} must be less - * than the consumer {@code max.poll.interval.ms} property, to avoid a rebalance. * @param index the index of the failed record in the batch. - * @param sleep the time to sleep; the actual sleep time will be larger of this value + * @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger of this value * and the container's {@code maxPollInterval}, which defaults to 5 seconds. * @since 2.3 */ - default void nack(int index, long sleep) { + default void nack(int index, long sleepMillis) { throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); }