diff --git a/kobuka-client/pom.xml b/kobuka-client/pom.xml index 3b91097..f605a03 100644 --- a/kobuka-client/pom.xml +++ b/kobuka-client/pom.xml @@ -5,7 +5,7 @@ kobuka org.swisspush - 1.0.0 + 1.0.1 4.0.0 diff --git a/kobuka-gen/pom.xml b/kobuka-gen/pom.xml index f7f5a45..6d9355a 100644 --- a/kobuka-gen/pom.xml +++ b/kobuka-gen/pom.xml @@ -5,7 +5,7 @@ kobuka org.swisspush - 1.0.0 + 1.0.1 4.0.0 diff --git a/kobuka-spring/pom.xml b/kobuka-spring/pom.xml index d20a553..a783228 100644 --- a/kobuka-spring/pom.xml +++ b/kobuka-spring/pom.xml @@ -5,7 +5,7 @@ kobuka org.swisspush - 1.0.0 + 1.0.1 4.0.0 diff --git a/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaConsumerFactoryProvider.java b/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaConsumerFactoryProvider.java index 88eb46a..6eab099 100644 --- a/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaConsumerFactoryProvider.java +++ b/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaConsumerFactoryProvider.java @@ -1,8 +1,11 @@ package org.swisspush.kobuka.spring.internal; import net.karneim.pojobuilder.GeneratePojoBuilder; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.ConsumerPostProcessor; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import java.util.List; import java.util.Map; public class DefaultKafkaConsumerFactoryProvider { @@ -13,7 +16,17 @@ public class DefaultKafkaConsumerFactoryProvider { withGenerationGap = true, intoPackage = "org.swisspush.kobuka.spring" ) - public static DefaultKafkaConsumerFactory createDefaultKafkaConsumerFactory(Map configs) { - return new DefaultKafkaConsumerFactory(configs); + public static DefaultKafkaConsumerFactory + createDefaultKafkaConsumerFactory(Map configs, + List> withListeners, + List> withPostProcessors) { + DefaultKafkaConsumerFactory result = new DefaultKafkaConsumerFactory<>(configs); + if(withListeners != null) { + withListeners.forEach(result::addListener); + } + if(withPostProcessors != null) { + withPostProcessors.forEach(result::addPostProcessor); + } + return result; } } \ No newline at end of file diff --git a/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaProducerFactoryProvider.java b/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaProducerFactoryProvider.java index fd4effc..38d7518 100644 --- a/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaProducerFactoryProvider.java +++ b/kobuka-spring/src/main/java/org/swisspush/kobuka/spring/internal/DefaultKafkaProducerFactoryProvider.java @@ -2,7 +2,10 @@ import net.karneim.pojobuilder.GeneratePojoBuilder; import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.ProducerPostProcessor; +import java.util.List; import java.util.Map; public class DefaultKafkaProducerFactoryProvider { @@ -13,7 +16,17 @@ public class DefaultKafkaProducerFactoryProvider { withGenerationGap = true, intoPackage = "org.swisspush.kobuka.spring" ) - public static DefaultKafkaProducerFactory create(Map configs) { - return new DefaultKafkaProducerFactory(configs); + public static DefaultKafkaProducerFactory create(Map configs, + List> withListeners, + List> withPostProcessors) + { + DefaultKafkaProducerFactory result = new DefaultKafkaProducerFactory<>(configs); + if(withListeners != null) { + withListeners.forEach(result::addListener); + } + if(withPostProcessors != null) { + withPostProcessors.forEach(result::addPostProcessor); + } + return result; } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index a0c6077..49d1b39 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.swisspush kobuka pom - 1.0.0 + 1.0.1