From 9f1050a849db35d3fabb821169c05a59c717fcf3 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 16 Nov 2023 14:55:17 -0500 Subject: [PATCH] GH-2891: Fix isAckAfterHandle with No group.id Resolves https://github.com/spring-projects/spring-kafka/issues/2891 When using manual partition assignment and `AckMode.MANUAL`, it is possible to have a `null` `group.id`, whereby Kafka does not maintain any committed offsets. In this case, we should not attempt to commit the offset after recovery, even if the error handler `ackAfterHandle` property is true. **cherry-pick to 3.0.x, 2.9.x** --- .../listener/KafkaMessageListenerContainer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index ee36e44465..969fc31f0f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2259,7 +2259,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor try { this.batchFailed = true; invokeBatchErrorHandler(records, recordList, e); - commitOffsetsIfNeeded(records); + commitOffsetsIfNeededAfterHandlingError(records); } catch (KafkaException ke) { ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); @@ -2280,8 +2280,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor return null; } - private void commitOffsetsIfNeeded(final ConsumerRecords records) { - if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle()) + private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords records) { + if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null) || this.producer != null) { if (this.remainingRecords != null) { ConsumerRecord firstUncommitted = this.remainingRecords.iterator().next(); @@ -2744,7 +2744,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco } try { invokeErrorHandler(cRecord, iterator, e); - commitOffsetsIfNeeded(cRecord); + commitOffsetsIfNeededAfterHandlingError(cRecord); } catch (KafkaException ke) { ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); @@ -2763,8 +2763,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco }); } - private void commitOffsetsIfNeeded(final ConsumerRecord cRecord) { - if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle()) + private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord cRecord) { + if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null) || this.producer != null) { if (this.isManualAck) { this.commitRecovered = true;