From 39e10f1b94898a2c67ebadd80dac920744792fc9 Mon Sep 17 00:00:00 2001 From: Alexey Podogov Date: Wed, 6 Sep 2023 21:54:32 +0300 Subject: [PATCH] Autoconfigured TopicResolver bean with ability to replace it by user configuration --- .../kafka/autoconfig/KafkaAutoConfiguration.java | 14 ++++++++++++-- .../KafkaAutoConfigurationIntegrationTest.java | 6 ++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index b8ecbceb..c729d9da 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -36,6 +36,7 @@ import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher; import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher; import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory; +import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver; import org.axonframework.extensions.kafka.eventhandling.tokenstore.KafkaTokenStore; import org.axonframework.serialization.Serializer; import org.axonframework.serialization.upcasting.event.EventUpcasterChain; @@ -152,6 +153,14 @@ private boolean isNonEmptyString(String s) { return s != null && !s.equals(""); } + @Bean + @ConditionalOnMissingBean + @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class}) + @ConditionalOnProperty(name = "axon.kafka.publisher.enabled", havingValue = "true", matchIfMissing = true) + public TopicResolver topicResolver() { + return m -> Optional.of(properties.getDefaultTopic()); + } + @ConditionalOnMissingBean @Bean(destroyMethod = "shutDown") @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class}) @@ -161,13 +170,14 @@ private boolean isNonEmptyString(String s) { @Qualifier("eventSerializer") Serializer eventSerializer, ProducerFactory axonKafkaProducerFactory, KafkaMessageConverter kafkaMessageConverter, - org.axonframework.config.Configuration configuration) { + org.axonframework.config.Configuration configuration, + TopicResolver topicResolver) { return KafkaPublisher.builder() .serializer(eventSerializer) .producerFactory(axonKafkaProducerFactory) .messageConverter(kafkaMessageConverter) .messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")) - .topicResolver(m -> Optional.of(properties.getDefaultTopic())) + .topicResolver(topicResolver) .build(); } diff --git a/kafka-spring-boot-autoconfigure/src/test/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfigurationIntegrationTest.java b/kafka-spring-boot-autoconfigure/src/test/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfigurationIntegrationTest.java index e0c247df..4ac76080 100644 --- a/kafka-spring-boot-autoconfigure/src/test/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfigurationIntegrationTest.java +++ b/kafka-spring-boot-autoconfigure/src/test/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfigurationIntegrationTest.java @@ -48,6 +48,7 @@ import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher; import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher; import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory; +import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver; import org.axonframework.extensions.kafka.eventhandling.tokenstore.KafkaTokenStore; import org.axonframework.monitoring.MessageMonitor; import org.axonframework.monitoring.NoOpMessageMonitor; @@ -92,6 +93,7 @@ void testAutoConfigurationWithoutProperties() { // Required bean assertions assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class)); assertNotNull(context.getBeanNamesForType(ProducerFactory.class)); + assertNotNull(context.getBeanNamesForType(TopicResolver.class)); assertNotNull(context.getBeanNamesForType(KafkaPublisher.class)); assertNotNull(context.getBeanNamesForType(KafkaEventPublisher.class)); assertNotNull(context.getBeanNamesForType(ConsumerFactory.class)); @@ -156,6 +158,7 @@ void testAutoConfigurationWithMinimalRequiredProperties() { // Required bean assertions assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class)); assertNotNull(context.getBeanNamesForType(ProducerFactory.class)); + assertNotNull(context.getBeanNamesForType(TopicResolver.class)); assertNotNull(context.getBeanNamesForType(KafkaPublisher.class)); assertNotNull(context.getBeanNamesForType(KafkaEventPublisher.class)); assertNotNull(context.getBeanNamesForType(ConsumerFactory.class)); @@ -217,6 +220,7 @@ void testAutoConfigurationWithPublishingDisabled() { // Required bean assertions assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class)); assertEquals(0, context.getBeanNamesForType(ProducerFactory.class).length); + assertEquals(0, context.getBeanNamesForType(TopicResolver.class).length); assertEquals(0, context.getBeanNamesForType(KafkaPublisher.class).length); assertEquals(0, context.getBeanNamesForType(KafkaEventPublisher.class).length); assertNotNull(context.getBeanNamesForType(ConsumerFactory.class)); @@ -262,6 +266,7 @@ void testAutoConfigurationWithFetchingDisabled() { // Required bean assertions assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class)); assertNotNull(context.getBeanNamesForType(ProducerFactory.class)); + assertNotNull(context.getBeanNamesForType(TopicResolver.class)); assertNotNull(context.getBeanNamesForType(KafkaPublisher.class)); assertNotNull(context.getBeanNamesForType(KafkaEventPublisher.class)); assertEquals(0, context.getBeanNamesForType(ConsumerFactory.class).length); @@ -301,6 +306,7 @@ void testAutoConfigurationWithPublishingAndFetchingDisabled() { // Required bean assertions assertEquals(0, context.getBeanNamesForType(KafkaMessageConverter.class).length); assertEquals(0, context.getBeanNamesForType(ProducerFactory.class).length); + assertEquals(0, context.getBeanNamesForType(TopicResolver.class).length); assertEquals(0, context.getBeanNamesForType(KafkaPublisher.class).length); assertEquals(0, context.getBeanNamesForType(KafkaEventPublisher.class).length); assertEquals(0, context.getBeanNamesForType(ConsumerFactory.class).length);