Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Precise unit for sleep duration and wake time #2278

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private long lastAlertAt = this.lastReceive;

private long nackSleep = -1;
private long nackSleepDurationMillis = -1;

private long nackWake;
private long nackWakeTimeMillis;

private int nackIndex;

Expand Down Expand Up @@ -1622,9 +1622,9 @@ private void doPauseConsumerIfNecessary() {
}

private void resumeConsumerIfNeccessary() {
if (this.nackWake > 0) {
if (System.currentTimeMillis() > this.nackWake) {
this.nackWake = 0;
if (this.nackWakeTimeMillis > 0) {
if (System.currentTimeMillis() > this.nackWakeTimeMillis) {
this.nackWakeTimeMillis = 0;
this.consumer.resume(this.pausedForNack);
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
this.pausedForNack.clear();
Expand Down Expand Up @@ -2207,7 +2207,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA

invokeBatchOnMessageWithRecordsOrList(records, recordList);
List<ConsumerRecord<?, ?>> toSeek = null;
if (this.nackSleep >= 0) {
if (this.nackSleepDurationMillis >= 0) {
int index = 0;
toSeek = new ArrayList<>();
for (ConsumerRecord<K, V> record : records) {
Expand All @@ -2217,7 +2217,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
}
}
if (this.producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
if (this.nackSleep < 0) {
if (this.nackSleepDurationMillis < 0) {
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
this.acks.put(record);
}
Expand Down Expand Up @@ -2356,7 +2356,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
if (this.commonRecordInterceptor != null) {
this.commonRecordInterceptor.afterRecord(record, this.consumer);
}
if (this.nackSleep >= 0) {
if (this.nackSleepDurationMillis >= 0) {
handleNack(records, record);
break;
}
Expand Down Expand Up @@ -2435,7 +2435,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
if (this.commonRecordInterceptor != null) {
this.commonRecordInterceptor.afterRecord(record, this.consumer);
}
if (this.nackSleep >= 0) {
if (this.nackSleepDurationMillis >= 0) {
handleNack(records, record);
break;
}
Expand Down Expand Up @@ -2510,8 +2510,8 @@ private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec
}

private void pauseForNackSleep() {
if (this.nackSleep > 0) {
this.nackWake = System.currentTimeMillis() + this.nackSleep;
if (this.nackSleepDurationMillis > 0) {
this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis;
Set<TopicPartition> alreadyPaused = this.consumer.paused();
Collection<TopicPartition> assigned = getAssignedPartitions();
if (assigned != null) {
Expand All @@ -2531,7 +2531,7 @@ private void pauseForNackSleep() {
this.consumer.resume(nowPaused);
}
}
this.nackSleep = -1;
this.nackSleepDurationMillis = -1;
}

/**
Expand Down Expand Up @@ -2626,7 +2626,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
doInvokeOnMessage(record);
if (this.nackSleep < 0 && !this.isManualImmediateAck) {
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
ackCurrent(record);
}
}
Expand Down Expand Up @@ -3174,11 +3174,11 @@ public void acknowledge() {
}

@Override
public void nack(long sleep) {
public void nack(long sleepMillis) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
ListenerConsumer.this.nackSleep = sleep;
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
Expand Down Expand Up @@ -3221,13 +3221,13 @@ public void acknowledge() {
}

@Override
public void nack(int index, long sleep) {
public void nack(int index, long sleepMillis) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
Assert.isTrue(sleepMillis >= 0, "sleepMillis cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
ListenerConsumer.this.nackSleep = sleep;
ListenerConsumer.this.nackSleepDurationMillis = sleepMillis;
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,28 @@ public interface Acknowledgment {
/**
* Negatively acknowledge the current record - discard remaining records from the poll
* and re-seek all partitions so that this record will be redelivered after the sleep
* time. Must be called on the consumer thread.
* time (in milliseconds). Must be called on the consumer thread.
* <p>
* <b>When using group management,
* {@code sleep + time spent processing the previous messages from the poll} must be
* less than the consumer {@code max.poll.interval.ms} property, to avoid a
* rebalance.</b>
* @param sleep the time to sleep; the actual sleep time will be larger of this value
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @since 2.3
*/
default void nack(long sleep) {
default void nack(long sleepMillis) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}

/**
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
* records before the index and re-seek the partitions so that the record at the index
* and subsequent records will be redelivered after the sleep time. Must be called on
* the consumer thread.
* and subsequent records will be redelivered after the sleep time (in milliseconds).
* Must be called on the consumer thread.
* <p>
* <b>When using group management,
* {@code sleep + time spent processing the records before the index} must be less
* than the consumer {@code max.poll.interval.ms} property, to avoid a rebalance.</b>
* @param index the index of the failed record in the batch.
* @param sleep the time to sleep; the actual sleep time will be larger of this value
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @param sleepMillis the time to sleep in milliseconds; the actual sleep time will be larger
* of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.
* @since 2.3
*/
default void nack(int index, long sleep) {
default void nack(int index, long sleepMillis) {
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
}

Expand Down