diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 3612104910..104c3a1e54 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -707,6 +707,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Map wasIdlePartition; + private final Set pausedForNack = new HashSet<>(); + private Map definedPartitions; private int count; @@ -723,6 +725,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private long nackSleep = -1; + private long nackWake; + private int nackIndex; private Iterator batchIterator; @@ -1589,6 +1593,10 @@ private void pauseConsumerIfNecessary() { } private void doPauseConsumerIfNecessary() { + if (this.pausedForNack.size() > 0) { + this.logger.debug("Still paused for nack sleep"); + return; + } if (this.offsetsInThisBatch != null && this.offsetsInThisBatch.size() > 0 && !this.pausedForAsyncAcks) { this.pausedForAsyncAcks = true; this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch); @@ -1602,7 +1610,15 @@ private void doPauseConsumerIfNecessary() { } private void resumeConsumerIfNeccessary() { - if (this.offsetsInThisBatch != null) { + if (this.nackWake > 0) { + if (System.currentTimeMillis() > this.nackWake) { + this.nackWake = 0; + this.consumer.resume(this.pausedForNack); + this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack); + this.pausedForNack.clear(); + } + } + else if (this.offsetsInThisBatch != null) { synchronized (this) { doResumeConsumerIfNeccessary(); } @@ -1646,12 +1662,10 @@ private void pausePartitionsIfNecessary() { } private void resumePartitionsIfNecessary() { - Set pausedConsumerPartitions = this.consumer.paused(); - List partitionsToResume = this - .assignedPartitions + List partitionsToResume = getAssignedPartitions() .stream() .filter(tp -> !isPartitionPauseRequested(tp) - && pausedConsumerPartitions.contains(tp)) + && this.pausedPartitions.contains(tp)) .collect(Collectors.toList()); if (partitionsToResume.size() > 0) { this.consumer.resume(partitionsToResume); @@ -2200,7 +2214,7 @@ private void invokeBatchOnMessage(final ConsumerRecords records, // NOSONA processCommits(); } SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR - nackSleepAndReset(); + pauseForNackSleep(); } } @@ -2458,17 +2472,29 @@ private void handleNack(final ConsumerRecords records, final ConsumerRecor } } SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR - nackSleepAndReset(); + pauseForNackSleep(); } - private void nackSleepAndReset() { - try { - ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this.thisOrParentContainer, this.nackSleep); - } - catch (@SuppressWarnings(UNUSED) InterruptedException e) { - Thread.currentThread().interrupt(); + private void pauseForNackSleep() { + if (this.nackSleep > 0) { + this.nackWake = System.currentTimeMillis() + this.nackSleep; + this.nackSleep = -1; + Set alreadyPaused = this.consumer.paused(); + this.pausedForNack.addAll(getAssignedPartitions()); + this.pausedForNack.removeAll(alreadyPaused); + this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack); + try { + this.consumer.pause(this.pausedForNack); + } + catch (IllegalStateException ex) { + // this should never happen; defensive, just in case... + this.logger.warn(() -> "Could not pause for nack, possible rebalance in process: " + + ex.getMessage()); + Set nowPaused = new HashSet<>(this.consumer.paused()); + nowPaused.removeAll(alreadyPaused); + this.consumer.resume(nowPaused); + } } - this.nackSleep = -1; } /** @@ -3237,6 +3263,7 @@ public void onPartitionsRevoked(Collection partitions) { if (ListenerConsumer.this.assignedPartitions != null) { ListenerConsumer.this.assignedPartitions.removeAll(partitions); } + ListenerConsumer.this.pausedForNack.removeAll(partitions); partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp)); synchronized (ListenerConsumer.this) { if (ListenerConsumer.this.offsetsInThisBatch != null) { @@ -3261,6 +3288,9 @@ public void onPartitionsAssigned(Collection partitions) { ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; " + "consumer paused again, so the initial poll() will never return any records"); } + if (ListenerConsumer.this.pausedForNack.size() > 0) { + ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack); + } ListenerConsumer.this.assignedPartitions.addAll(partitions); if (ListenerConsumer.this.commitCurrentOnAssignment && !collectAndCommitIfNecessary(partitions)) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java index 625e11ffb3..f60f027f6e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -197,7 +197,12 @@ public Consumer consumer() { new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", new RecordHeaders(), Optional.empty()))); final AtomicInteger which = new AtomicInteger(); + final AtomicBoolean paused = new AtomicBoolean(); willAnswer(i -> { + if (paused.get()) { + Thread.sleep(10); + return ConsumerRecords.empty(); + } this.pollLatch.countDown(); switch (which.getAndIncrement()) { case 0: @@ -211,9 +216,20 @@ public Consumer consumer() { catch (@SuppressWarnings("unused") InterruptedException e) { Thread.currentThread().interrupt(); } - return new ConsumerRecords(Collections.emptyMap()); + return ConsumerRecords.empty(); } }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + return Collections.emptySet(); + }).given(consumer).paused(); + willAnswer(i -> { + paused.set(true); + return null; + }).given(consumer).pause(any()); + willAnswer(i -> { + paused.set(false); + return null; + }).given(consumer).resume(any()); willAnswer(i -> { this.commitLatch.countDown(); return null; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java similarity index 90% rename from spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java rename to spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java index 5f62a416d3..358104ec77 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.Consumer; @@ -77,7 +78,7 @@ @SpringJUnitConfig @DirtiesContext @SuppressWarnings("deprecation") -public class ManualNackRecordTxTests { +public class ManualNackBatchTxTests { @SuppressWarnings("rawtypes") @Autowired @@ -102,6 +103,7 @@ public class ManualNackRecordTxTests { @Test public void discardRemainingRecordsFromPollAndSeek() throws Exception { assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.replayTime).isBetween(50L, 30_000L); assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); this.registry.stop(); @@ -128,24 +130,27 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception { @EnableKafka public static class Config { - private final List> contents = new ArrayList<>(); + final List> contents = new ArrayList<>(); - private final CountDownLatch pollLatch = new CountDownLatch(3); + final CountDownLatch pollLatch = new CountDownLatch(3); - private final CountDownLatch deliveryLatch = new CountDownLatch(2); + final CountDownLatch deliveryLatch = new CountDownLatch(2); - private final CountDownLatch closeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); - private final CountDownLatch commitLatch = new CountDownLatch(2); + final CountDownLatch commitLatch = new CountDownLatch(2); - private int count; + volatile int count; + + volatile long replayTime; @KafkaListener(topics = "foo", groupId = "grp") public void foo(List in, Acknowledgment ack) { this.contents.add(in); + this.replayTime = System.currentTimeMillis() - this.replayTime; this.deliveryLatch.countDown(); if (++this.count == 1) { // part 1, offset 1, first time - ack.nack(3, 0); + ack.nack(3, 50); } else { ack.acknowledge(); @@ -196,7 +201,12 @@ public Consumer consumer() { new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", new RecordHeaders(), Optional.empty()))); final AtomicInteger which = new AtomicInteger(); + final AtomicBoolean paused = new AtomicBoolean(); willAnswer(i -> { + if (paused.get()) { + Thread.sleep(10); + return ConsumerRecords.empty(); + } this.pollLatch.countDown(); switch (which.getAndIncrement()) { case 0: @@ -210,9 +220,20 @@ public Consumer consumer() { catch (InterruptedException e) { Thread.currentThread().interrupt(); } - return new ConsumerRecords(Collections.emptyMap()); + return ConsumerRecords.empty(); } }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + return Collections.emptySet(); + }).given(consumer).paused(); + willAnswer(i -> { + paused.set(true); + return null; + }).given(consumer).pause(any()); + willAnswer(i -> { + paused.set(false); + return null; + }).given(consumer).resume(any()); willAnswer(i -> { this.commitLatch.countDown(); return null; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java new file mode 100644 index 0000000000..30506d6900 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java @@ -0,0 +1,237 @@ +/* + * Copyright 2017-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.3 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class ManualNackPauseResumeTests { + + @SuppressWarnings("rawtypes") + @Autowired + private Consumer consumer; + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + @SuppressWarnings({ "unchecked" }) + @Test + public void dontResumeAlreadyPaused() throws Exception { + assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.replayTime).isBetween(50L, 30_000L); + assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); + this.registry.stop(); + assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.pausedForNack).hasSize(1); + assertThat(this.config.resumedForNack).hasSize(1); + assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1)); + assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1)); + } + + @Configuration + @EnableKafka + public static class Config { + + final List contents = new ArrayList<>(); + + final CountDownLatch pollLatch = new CountDownLatch(3); + + final CountDownLatch deliveryLatch = new CountDownLatch(7); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + final CountDownLatch commitLatch = new CountDownLatch(2); + + final Set pausedForNack = new HashSet<>(); + + final Set resumedForNack = new HashSet<>(); + + volatile int count; + + volatile long replayTime; + + @KafkaListener(id = "foo", topics = "foo", groupId = "grp") + public void foo(String in, Acknowledgment ack) { + this.contents.add(in); + if (in.equals("qux")) { + this.replayTime = System.currentTimeMillis() - this.replayTime; + } + this.deliveryLatch.countDown(); + if (++this.count == 4) { // part 1, offset 1, first time + ack.nack(50); + } + else { + ack.acknowledge(); + } + } + + @SuppressWarnings({ "rawtypes" }) + @Bean + public ConsumerFactory consumerFactory() { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = consumer(); + given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + return consumerFactory; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Consumer consumer() { + final Consumer consumer = mock(Consumer.class); + final TopicPartition topicPartition0 = new TopicPartition("foo", 0); + final TopicPartition topicPartition1 = new TopicPartition("foo", 1); + final TopicPartition topicPartition2 = new TopicPartition("foo", 2); + willAnswer(i -> { + ((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned( + Collections.singletonList(topicPartition1)); + return null; + }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + Map> records1 = new LinkedHashMap<>(); + records1.put(topicPartition0, Arrays.asList( + new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition1, Arrays.asList( + new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition2, Arrays.asList( + new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz", + new RecordHeaders(), Optional.empty()))); + Map> records2 = new LinkedHashMap<>(records1); + records2.remove(topicPartition0); + records2.put(topicPartition1, Arrays.asList( + new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", + new RecordHeaders(), Optional.empty()))); + final AtomicInteger which = new AtomicInteger(); + final AtomicBoolean paused = new AtomicBoolean(); + willAnswer(i -> { + if (paused.get()) { + Thread.sleep(10); + return ConsumerRecords.empty(); + } + this.pollLatch.countDown(); + switch (which.getAndIncrement()) { + case 0: + return new ConsumerRecords(records1); + case 1: + return new ConsumerRecords(records2); + default: + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return new ConsumerRecords(Collections.emptyMap()); + } + }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + return Set.of(topicPartition0, topicPartition2); + }).given(consumer).paused(); + willAnswer(i -> { + paused.set(true); + this.pausedForNack.addAll(i.getArgument(0)); + return null; + }).given(consumer).pause(any()); + willAnswer(i -> { + paused.set(false); + this.resumedForNack.addAll(i.getArgument(0)); + return null; + }).given(consumer).resume(any()); + willAnswer(i -> { + this.commitLatch.countDown(); + return null; + }).given(consumer).commitSync(anyMap(), any()); + willAnswer(i -> { + this.closeLatch.countDown(); + return null; + }).given(consumer).close(); + return consumer; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(AckMode.MANUAL); + factory.getContainerProperties().setMissingTopicsFatal(false); + return factory; + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java index 47bc754e51..688c8504da 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.Consumer; @@ -90,6 +91,7 @@ public class ManualNackRecordTests { @Test public void discardRemainingRecordsFromPollAndSeek() throws Exception { assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.replayTime).isBetween(50L, 30_000L); assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); this.registry.stop(); @@ -117,24 +119,29 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception { @EnableKafka public static class Config { - private final List contents = new ArrayList<>(); + final List contents = new ArrayList<>(); - private final CountDownLatch pollLatch = new CountDownLatch(3); + final CountDownLatch pollLatch = new CountDownLatch(3); - private final CountDownLatch deliveryLatch = new CountDownLatch(7); + final CountDownLatch deliveryLatch = new CountDownLatch(7); - private final CountDownLatch closeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); - private final CountDownLatch commitLatch = new CountDownLatch(2); + final CountDownLatch commitLatch = new CountDownLatch(2); - private int count; + volatile int count; + + volatile long replayTime; @KafkaListener(topics = "foo", groupId = "grp") public void foo(String in, Acknowledgment ack) { this.contents.add(in); + if (in.equals("qux")) { + this.replayTime = System.currentTimeMillis() - this.replayTime; + } this.deliveryLatch.countDown(); if (++this.count == 4) { // part 1, offset 1, first time - ack.nack(0); + ack.nack(50); } else { ack.acknowledge(); @@ -185,7 +192,12 @@ public Consumer consumer() { new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", new RecordHeaders(), Optional.empty()))); final AtomicInteger which = new AtomicInteger(); + final AtomicBoolean paused = new AtomicBoolean(); willAnswer(i -> { + if (paused.get()) { + Thread.sleep(10); + return ConsumerRecords.empty(); + } this.pollLatch.countDown(); switch (which.getAndIncrement()) { case 0: @@ -202,6 +214,17 @@ public Consumer consumer() { return new ConsumerRecords(Collections.emptyMap()); } }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + return Collections.emptySet(); + }).given(consumer).paused(); + willAnswer(i -> { + paused.set(true); + return null; + }).given(consumer).pause(any()); + willAnswer(i -> { + paused.set(false); + return null; + }).given(consumer).resume(any()); willAnswer(i -> { this.commitLatch.countDown(); return null; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java index 36e11ed5f1..e00243e68a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java @@ -52,6 +52,7 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.listener.ListenerUtils; import org.springframework.kafka.support.EndpointHandlerMethod; import org.springframework.test.util.ReflectionTestUtils; @@ -358,6 +359,7 @@ void shouldInstantiateIfNotInContainer() { @Test void shouldLogConsumerRecordMessage() { + ListenerUtils.setLogOnlyMetadata(false); RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); method.logMessage(consumerRecordMessage); diff --git a/spring-kafka/src/test/resources/log4j2-test.xml b/spring-kafka/src/test/resources/log4j2-test.xml index 9f33afab77..cee3db9d3f 100644 --- a/spring-kafka/src/test/resources/log4j2-test.xml +++ b/spring-kafka/src/test/resources/log4j2-test.xml @@ -6,7 +6,7 @@ - +