Skip to content

Commit

Permalink
Autoconfigured TopicResolver bean with ability to replace it by user …
Browse files Browse the repository at this point in the history
…configuration
  • Loading branch information
aupodogov committed Sep 6, 2023
1 parent c662ef7 commit 39e10f1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.KafkaTokenStore;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
Expand Down Expand Up @@ -152,6 +153,14 @@ private boolean isNonEmptyString(String s) {
return s != null && !s.equals("");
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
@ConditionalOnProperty(name = "axon.kafka.publisher.enabled", havingValue = "true", matchIfMissing = true)
public TopicResolver topicResolver() {
return m -> Optional.of(properties.getDefaultTopic());
}

@ConditionalOnMissingBean
@Bean(destroyMethod = "shutDown")
@ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
Expand All @@ -161,13 +170,14 @@ private boolean isNonEmptyString(String s) {
@Qualifier("eventSerializer") Serializer eventSerializer,
ProducerFactory<K, V> axonKafkaProducerFactory,
KafkaMessageConverter<K, V> kafkaMessageConverter,
org.axonframework.config.Configuration configuration) {
org.axonframework.config.Configuration configuration,
TopicResolver topicResolver) {
return KafkaPublisher.<K, V>builder()
.serializer(eventSerializer)
.producerFactory(axonKafkaProducerFactory)
.messageConverter(kafkaMessageConverter)
.messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher"))
.topicResolver(m -> Optional.of(properties.getDefaultTopic()))
.topicResolver(topicResolver)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.KafkaTokenStore;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
Expand Down Expand Up @@ -92,6 +93,7 @@ void testAutoConfigurationWithoutProperties() {
// Required bean assertions
assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class));
assertNotNull(context.getBeanNamesForType(ProducerFactory.class));
assertNotNull(context.getBeanNamesForType(TopicResolver.class));
assertNotNull(context.getBeanNamesForType(KafkaPublisher.class));
assertNotNull(context.getBeanNamesForType(KafkaEventPublisher.class));
assertNotNull(context.getBeanNamesForType(ConsumerFactory.class));
Expand Down Expand Up @@ -156,6 +158,7 @@ void testAutoConfigurationWithMinimalRequiredProperties() {
// Required bean assertions
assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class));
assertNotNull(context.getBeanNamesForType(ProducerFactory.class));
assertNotNull(context.getBeanNamesForType(TopicResolver.class));
assertNotNull(context.getBeanNamesForType(KafkaPublisher.class));
assertNotNull(context.getBeanNamesForType(KafkaEventPublisher.class));
assertNotNull(context.getBeanNamesForType(ConsumerFactory.class));
Expand Down Expand Up @@ -217,6 +220,7 @@ void testAutoConfigurationWithPublishingDisabled() {
// Required bean assertions
assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class));
assertEquals(0, context.getBeanNamesForType(ProducerFactory.class).length);
assertEquals(0, context.getBeanNamesForType(TopicResolver.class).length);
assertEquals(0, context.getBeanNamesForType(KafkaPublisher.class).length);
assertEquals(0, context.getBeanNamesForType(KafkaEventPublisher.class).length);
assertNotNull(context.getBeanNamesForType(ConsumerFactory.class));
Expand Down Expand Up @@ -262,6 +266,7 @@ void testAutoConfigurationWithFetchingDisabled() {
// Required bean assertions
assertNotNull(context.getBeanNamesForType(KafkaMessageConverter.class));
assertNotNull(context.getBeanNamesForType(ProducerFactory.class));
assertNotNull(context.getBeanNamesForType(TopicResolver.class));
assertNotNull(context.getBeanNamesForType(KafkaPublisher.class));
assertNotNull(context.getBeanNamesForType(KafkaEventPublisher.class));
assertEquals(0, context.getBeanNamesForType(ConsumerFactory.class).length);
Expand Down Expand Up @@ -301,6 +306,7 @@ void testAutoConfigurationWithPublishingAndFetchingDisabled() {
// Required bean assertions
assertEquals(0, context.getBeanNamesForType(KafkaMessageConverter.class).length);
assertEquals(0, context.getBeanNamesForType(ProducerFactory.class).length);
assertEquals(0, context.getBeanNamesForType(TopicResolver.class).length);
assertEquals(0, context.getBeanNamesForType(KafkaPublisher.class).length);
assertEquals(0, context.getBeanNamesForType(KafkaEventPublisher.class).length);
assertEquals(0, context.getBeanNamesForType(ConsumerFactory.class).length);
Expand Down

0 comments on commit 39e10f1

Please sign in to comment.