diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index e49e822863..323b5c295f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -730,9 +730,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private long lastAlertAt = this.lastReceive; - private long nackSleep = -1; + private long nackSleepDurationMillis = -1; - private long nackWake; + private long nackWakeTimeMillis; private int nackIndex; @@ -1622,9 +1622,9 @@ private void doPauseConsumerIfNecessary() { } private void resumeConsumerIfNeccessary() { - if (this.nackWake > 0) { - if (System.currentTimeMillis() > this.nackWake) { - this.nackWake = 0; + if (this.nackWakeTimeMillis > 0) { + if (System.currentTimeMillis() > this.nackWakeTimeMillis) { + this.nackWakeTimeMillis = 0; this.consumer.resume(this.pausedForNack); this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack); this.pausedForNack.clear(); @@ -2207,7 +2207,7 @@ private void invokeBatchOnMessage(final ConsumerRecords records, // NOSONA invokeBatchOnMessageWithRecordsOrList(records, recordList); List> toSeek = null; - if (this.nackSleep >= 0) { + if (this.nackSleepDurationMillis >= 0) { int index = 0; toSeek = new ArrayList<>(); for (ConsumerRecord record : records) { @@ -2217,7 +2217,7 @@ private void invokeBatchOnMessage(final ConsumerRecords records, // NOSONA } } if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) { - if (this.nackSleep < 0) { + if (this.nackSleepDurationMillis < 0) { for (ConsumerRecord record : getHighestOffsetRecords(records)) { this.acks.put(record); } @@ -2356,7 +2356,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords records) { if (this.commonRecordInterceptor != null) { this.commonRecordInterceptor.afterRecord(record, this.consumer); } - if (this.nackSleep >= 0) { + if (this.nackSleepDurationMillis >= 0) { handleNack(records, record); break; } @@ -2435,7 +2435,7 @@ private void doInvokeWithRecords(final ConsumerRecords records) { if (this.commonRecordInterceptor != null) { this.commonRecordInterceptor.afterRecord(record, this.consumer); } - if (this.nackSleep >= 0) { + if (this.nackSleepDurationMillis >= 0) { handleNack(records, record); break; } @@ -2510,8 +2510,8 @@ private boolean recordsEqual(ConsumerRecord rec1, ConsumerRecord rec } private void pauseForNackSleep() { - if (this.nackSleep > 0) { - this.nackWake = System.currentTimeMillis() + this.nackSleep; + if (this.nackSleepDurationMillis > 0) { + this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis; Set alreadyPaused = this.consumer.paused(); Collection assigned = getAssignedPartitions(); if (assigned != null) { @@ -2531,7 +2531,7 @@ private void pauseForNackSleep() { this.consumer.resume(nowPaused); } } - this.nackSleep = -1; + this.nackSleepDurationMillis = -1; } /** @@ -2626,7 +2626,7 @@ private void invokeOnMessage(final ConsumerRecord record) { checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER); } doInvokeOnMessage(record); - if (this.nackSleep < 0 && !this.isManualImmediateAck) { + if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) { ackCurrent(record); } } @@ -3174,11 +3174,11 @@ public void acknowledge() { } @Override - public void nack(long sleep) { + public void nack(long sleepMillis) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread"); - Assert.isTrue(sleep >= 0, "sleep cannot be negative"); - ListenerConsumer.this.nackSleep = sleep; + Assert.isTrue(sleepMillis >= 0, "sleep cannot be negative"); + ListenerConsumer.this.nackSleepDurationMillis = sleepMillis; synchronized (ListenerConsumer.this) { if (ListenerConsumer.this.offsetsInThisBatch != null) { ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear()); @@ -3227,7 +3227,7 @@ public void nack(int index, long sleep) { Assert.isTrue(sleep >= 0, "sleep cannot be negative"); Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds"); ListenerConsumer.this.nackIndex = index; - ListenerConsumer.this.nackSleep = sleep; + ListenerConsumer.this.nackSleepDurationMillis = sleep; synchronized (ListenerConsumer.this) { if (ListenerConsumer.this.offsetsInThisBatch != null) { ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 3c3673288f..411fbd661c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -43,11 +43,11 @@ public interface Acknowledgment { * {@code sleep + time spent processing the previous messages from the poll} must be * less than the consumer {@code max.poll.interval.ms} property, to avoid a * rebalance. - * @param sleep the time to sleep; the actual sleep time will be larger of this value + * @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger of this value * and the container's {@code maxPollInterval}, which defaults to 5 seconds. * @since 2.3 */ - default void nack(long sleep) { + default void nack(long sleepMillis) { throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment"); }