Skip to content

Commit

Permalink
Precise unit for sleep duration and wake time
Browse files Browse the repository at this point in the history
Prior to, it was required to dig a few levels into to realize they are in millis.
  • Loading branch information
szpak committed May 18, 2022
1 parent 7d19b94 commit 6aeac5c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private long lastAlertAt = this.lastReceive;

private long nackSleep = -1;
private long nackSleepDurationMillis = -1;

private long nackWake;
private long nackWakeTimeMillis;

private int nackIndex;

Expand Down Expand Up @@ -1622,9 +1622,9 @@ private void doPauseConsumerIfNecessary() {
}

private void resumeConsumerIfNeccessary() {
if (this.nackWake > 0) {
if (System.currentTimeMillis() > this.nackWake) {
this.nackWake = 0;
if (this.nackWakeTimeMillis > 0) {
if (System.currentTimeMillis() > this.nackWakeTimeMillis) {
this.nackWakeTimeMillis = 0;
this.consumer.resume(this.pausedForNack);
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
this.pausedForNack.clear();
Expand Down Expand Up @@ -2207,7 +2207,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA

invokeBatchOnMessageWithRecordsOrList(records, recordList);
List<ConsumerRecord<?, ?>> toSeek = null;
if (this.nackSleep >= 0) {
if (this.nackSleepDurationMillis >= 0) {
int index = 0;
toSeek = new ArrayList<>();
for (ConsumerRecord<K, V> record : records) {
Expand All @@ -2217,7 +2217,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
}
}
if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
if (this.nackSleep < 0) {
if (this.nackSleepDurationMillis < 0) {
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
this.acks.put(record);
}
Expand Down Expand Up @@ -2356,7 +2356,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
if (this.commonRecordInterceptor != null) {
this.commonRecordInterceptor.afterRecord(record, this.consumer);
}
if (this.nackSleep >= 0) {
if (this.nackSleepDurationMillis >= 0) {
handleNack(records, record);
break;
}
Expand Down Expand Up @@ -2435,7 +2435,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
if (this.commonRecordInterceptor != null) {
this.commonRecordInterceptor.afterRecord(record, this.consumer);
}
if (this.nackSleep >= 0) {
if (this.nackSleepDurationMillis >= 0) {
handleNack(records, record);
break;
}
Expand Down Expand Up @@ -2510,8 +2510,8 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
}

private void pauseForNackSleep() {
if (this.nackSleep > 0) {
this.nackWake = System.currentTimeMillis() + this.nackSleep;
if (this.nackSleepDurationMillis > 0) {
this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis;
Set<TopicPartition> alreadyPaused = this.consumer.paused();
Collection<TopicPartition> assigned = getAssignedPartitions();
if (assigned != null) {
Expand All @@ -2531,7 +2531,7 @@ private void pauseForNackSleep() {
this.consumer.resume(nowPaused);
}
}
this.nackSleep = -1;
this.nackSleepDurationMillis = -1;
}

/**
Expand Down Expand Up @@ -2626,7 +2626,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
doInvokeOnMessage(record);
if (this.nackSleep < 0 && !this.isManualImmediateAck) {
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
ackCurrent(record);
}
}
Expand Down Expand Up @@ -3174,11 +3174,11 @@ public void acknowledge() {
}

@Override
public void nack(long sleep) {
public void nack(long sleepMillis) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
ListenerConsumer.this.nackSleep = sleep;
Assert.isTrue(sleepMillis >= 0, "sleep cannot be negative");
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
Expand Down Expand Up @@ -3227,7 +3227,7 @@ public void nack(int index, long sleep) {
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
ListenerConsumer.this.nackSleep = sleep;
ListenerConsumer.this.nackSleepDurationMillis = sleep;
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ public interface Acknowledgment {
* {@code sleep + time spent processing the previous messages from the poll} must be
* less than the consumer {@code max.poll.interval.ms} property, to avoid a
* rebalance.</b>
* @param sleep the time to sleep; the actual sleep time will be larger of this value
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger of this value
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @since 2.3
*/
default void nack(long sleep) {
default void nack(long sleepMillis) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}

Expand Down

0 comments on commit 6aeac5c

Please sign in to comment.