From a4e2be0fedb552abe9cc082b24a9f94cbe4cc047 Mon Sep 17 00:00:00 2001 From: Alexey Podogov Date: Mon, 9 Sep 2024 11:38:01 +0300 Subject: [PATCH] KafkaPublisher Ack timeout can be set via properties and autoconfiguration --- .../extensions/kafka/KafkaProperties.java | 14 ++++++++++++++ .../kafka/autoconfig/KafkaAutoConfiguration.java | 1 + 2 files changed, 15 insertions(+) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/KafkaProperties.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/KafkaProperties.java index a983962..9ab7955 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/KafkaProperties.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/KafkaProperties.java @@ -234,6 +234,12 @@ public static class Publisher { */ private String processingGroup = DEFAULT_PROCESSING_GROUP; + /** + * The publisher acknowledge timeout in milliseconds specifying how long to wait for a publisher to + * acknowledge a message has been sent. Defaults to {@code 1000} milliseconds. + */ + private long ackTimeout = 1000; + public boolean isEnabled() { return enabled; } @@ -257,6 +263,14 @@ public String getProcessingGroup() { public void setProcessingGroup(String processingGroup) { this.processingGroup = processingGroup; } + + public long getAckTimeout() { + return ackTimeout; + } + + public void setAckTimeout(long ackTimeout) { + this.ackTimeout = ackTimeout; + } } /** diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index 85b3023..6cb50d4 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -176,6 +176,7 @@ public TopicResolver topicResolver() { .messageConverter(kafkaMessageConverter) .messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")) .topicResolver(topicResolver) + .publisherAckTimeout(properties.getPublisher().getAckTimeout()) .build(); }