From 5a1298fcadbd536074624af1a4a65e860386e6c3 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 16 Nov 2023 15:00:38 -0500 Subject: [PATCH] GH-2891: Always MANUAL with null group.id Resolves https://github.com/spring-projects/spring-kafka/issues/2891 When using manual partition assignment with `null` `group.id` always coerce `AckMode` to `MANUAL`. --- .../main/antora/modules/ROOT/pages/tips.adoc | 1 + .../antora/modules/ROOT/pages/whats-new.adoc | 8 ++- .../KafkaMessageListenerContainer.java | 50 +++++++++++++------ 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc index 41d0e93507..0dbb13b4bf 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc @@ -44,6 +44,7 @@ public static class PartitionFinder { Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started. You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group. +Starting with version 3.1, the container will automatically coerce the `AckMode` to `MANUAL` when manual topic assignment is used with no consumer `group.id`. However, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see xref:kafka/receiving-messages/listener-annotation.adoc#manual-assignment[Explicit Partition Assignment] for more information. [[ex-jdbc-sync]] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 47b210d147..45a04aba4f 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -43,4 +43,10 @@ See xref:kafka/serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeser === Retryable Topics Change suffix `-retry-5000` to `-retry` when `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)`. If you want to keep suffix `-retry-5000`, use `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")`. -See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information. \ No newline at end of file +See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information. + +[[x31-c]] +=== Listener Container Changes + +When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`. +See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 969fc31f0f..4419d5d90c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -661,19 +661,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean autoCommit; - private final boolean isManualAck = this.containerProperties.getAckMode().equals(AckMode.MANUAL); + private final boolean isManualAck; - private final boolean isCountAck = this.containerProperties.getAckMode().equals(AckMode.COUNT) - || this.containerProperties.getAckMode().equals(AckMode.COUNT_TIME); + private final boolean isCountAck; - private final boolean isTimeOnlyAck = this.containerProperties.getAckMode().equals(AckMode.TIME); + private final boolean isTimeOnlyAck; - private final boolean isManualImmediateAck = - this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE); + private final boolean isManualImmediateAck; - private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck; + private final boolean isAnyManualAck; - private final boolean isRecordAck = this.containerProperties.getAckMode().equals(AckMode.RECORD); + private final boolean isRecordAck; private final BlockingQueue> acks = new LinkedBlockingQueue<>(); @@ -768,15 +766,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Set pausedPartitions = new HashSet<>(); - private final Map> offsetsInThisBatch = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() - : null; + private final Map> offsetsInThisBatch; - private final Map>> deferredOffsets = - this.isAnyManualAck && this.containerProperties.isAsyncAcks() - ? new HashMap<>() - : null; + private final Map>> deferredOffsets; private final Map lastReceivePartition; @@ -857,6 +849,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, ObservationRegistry observationRegistry) { + AckMode ackMode = determineAckMode(); + this.isManualAck = ackMode.equals(AckMode.MANUAL); + this.isCountAck = ackMode.equals(AckMode.COUNT) + || ackMode.equals(AckMode.COUNT_TIME); + this.isTimeOnlyAck = ackMode.equals(AckMode.TIME); + this.isManualImmediateAck = + ackMode.equals(AckMode.MANUAL_IMMEDIATE); + this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck; + this.isRecordAck = ackMode.equals(AckMode.RECORD); + this.offsetsInThisBatch = + this.isAnyManualAck && this.containerProperties.isAsyncAcks() + ? new HashMap<>() + : null; + this.deferredOffsets = + this.isAnyManualAck && this.containerProperties.isAsyncAcks() + ? new HashMap<>() + : null; + this.observationRegistry = observationRegistry; Properties consumerProperties = propertiesFromConsumerPropertyOverrides(); checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory); @@ -950,6 +960,14 @@ else if (listener instanceof MessageListener) { this.kafkaAdmin = obtainAdmin(); } + private AckMode determineAckMode() { + AckMode ackMode = this.containerProperties.getAckMode(); + if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) { + ackMode = AckMode.MANUAL; + } + return ackMode; + } + @Nullable private Object determineBootstrapServers(Properties consumerProperties) { Object servers = consumerProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);