Skip to content

Commit

Permalink
spring-projectsGH-2891: Always MANUAL with null group.id
Browse files Browse the repository at this point in the history
Resolves spring-projects#2891

When using manual partition assignment with `null` `group.id` always
coerce `AckMode` to `MANUAL`.
  • Loading branch information
garyrussell committed Nov 16, 2023
1 parent 9f1050a commit 5a1298f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static class PartitionFinder {

Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started.
You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group.
Starting with version 3.1, the container will automatically coerce the `AckMode` to `MANUAL` when manual topic assignment is used with no consumer `group.id`.
However, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see xref:kafka/receiving-messages/listener-annotation.adoc#manual-assignment[Explicit Partition Assignment] for more information.

[[ex-jdbc-sync]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ See xref:kafka/serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeser
=== Retryable Topics
Change suffix `-retry-5000` to `-retry` when `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)`.
If you want to keep suffix `-retry-5000`, use `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")`.
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.

[[x31-c]]
=== Listener Container Changes

When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`.
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -661,19 +661,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean autoCommit;

private final boolean isManualAck = this.containerProperties.getAckMode().equals(AckMode.MANUAL);
private final boolean isManualAck;

private final boolean isCountAck = this.containerProperties.getAckMode().equals(AckMode.COUNT)
|| this.containerProperties.getAckMode().equals(AckMode.COUNT_TIME);
private final boolean isCountAck;

private final boolean isTimeOnlyAck = this.containerProperties.getAckMode().equals(AckMode.TIME);
private final boolean isTimeOnlyAck;

private final boolean isManualImmediateAck =
this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE);
private final boolean isManualImmediateAck;

private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
private final boolean isAnyManualAck;

private final boolean isRecordAck = this.containerProperties.getAckMode().equals(AckMode.RECORD);
private final boolean isRecordAck;

private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();

Expand Down Expand Up @@ -768,15 +766,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Set<TopicPartition> pausedPartitions = new HashSet<>();

private final Map<TopicPartition, List<Long>> offsetsInThisBatch =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
: null;
private final Map<TopicPartition, List<Long>> offsetsInThisBatch;

private final Map<TopicPartition, List<ConsumerRecord<K, V>>> deferredOffsets =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
: null;
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> deferredOffsets;

private final Map<TopicPartition, Long> lastReceivePartition;

Expand Down Expand Up @@ -857,6 +849,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {

AckMode ackMode = determineAckMode();
this.isManualAck = ackMode.equals(AckMode.MANUAL);
this.isCountAck = ackMode.equals(AckMode.COUNT)
|| ackMode.equals(AckMode.COUNT_TIME);
this.isTimeOnlyAck = ackMode.equals(AckMode.TIME);
this.isManualImmediateAck =
ackMode.equals(AckMode.MANUAL_IMMEDIATE);
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
this.isRecordAck = ackMode.equals(AckMode.RECORD);
this.offsetsInThisBatch =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
: null;
this.deferredOffsets =
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
? new HashMap<>()
: null;

this.observationRegistry = observationRegistry;
Properties consumerProperties = propertiesFromConsumerPropertyOverrides();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
Expand Down Expand Up @@ -950,6 +960,14 @@ else if (listener instanceof MessageListener) {
this.kafkaAdmin = obtainAdmin();
}

private AckMode determineAckMode() {
AckMode ackMode = this.containerProperties.getAckMode();
if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) {
ackMode = AckMode.MANUAL;
}
return ackMode;
}

@Nullable
private Object determineBootstrapServers(Properties consumerProperties) {
Object servers = consumerProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
Expand Down

0 comments on commit 5a1298f

Please sign in to comment.