Skip to content

Commit

Permalink
spring-projectsGH-2891: Fix isAckAfterHandle with No group.id
Browse files Browse the repository at this point in the history
Resolves spring-projects#2891

When using manual partition assignment and `AckMode.MANUAL`, it is possible
to have a `null` `group.id`, whereby Kafka does not maintain any committed offsets.

In this case, we should not attempt to commit the offset after recovery, even if
the error handler `ackAfterHandle` property is true.

**cherry-pick to 3.0.x, 2.9.x**
  • Loading branch information
garyrussell authored Nov 16, 2023
1 parent 6270601 commit 9f1050a
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
try {
this.batchFailed = true;
invokeBatchErrorHandler(records, recordList, e);
commitOffsetsIfNeeded(records);
commitOffsetsIfNeededAfterHandlingError(records);
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
Expand All @@ -2280,8 +2280,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
return null;
}

private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords<K, V> records) {
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null)
|| this.producer != null) {
if (this.remainingRecords != null) {
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
Expand Down Expand Up @@ -2744,7 +2744,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
}
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeeded(cRecord);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
Expand All @@ -2763,8 +2763,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
});
}

private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> cRecord) {
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null)
|| this.producer != null) {
if (this.isManualAck) {
this.commitRecovered = true;
Expand Down

0 comments on commit 9f1050a

Please sign in to comment.