diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 2f0f88e97a..3d6f39059d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -415,6 +415,10 @@ private Map> checkTopicsForConfigMismatches( if (topicOptional.isPresent() && topicOptional.get().configs() != null) { for (Map.Entry desiredConfigParameter : topicOptional.get().configs().entrySet()) { ConfigEntry actualConfigParameter = topicConfig.getValue().get(desiredConfigParameter.getKey()); + if (actualConfigParameter == null) { + throw new IllegalStateException("Topic property '" + desiredConfigParameter.getKey() + + "' does not exist"); + } if (!desiredConfigParameter.getValue().equals(actualConfigParameter.value())) { configMismatchesEntries.add(actualConfigParameter); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index 1323b6272f..22219edfe8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.core; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.awaitility.Awaitility.await; import java.lang.reflect.Method; @@ -176,6 +177,15 @@ public void testAddTopicsAndAddPartitions() throws Exception { && configResourceConfigMap.get(new ConfigResource(Type.TOPIC, "noConfigAddLater")) .get("retention.ms").value().equals("1000"); }); + + assertThatIllegalStateException().isThrownBy(() -> this.admin.createOrModifyTopics(mismatchconfig, + TopicBuilder.name("noConfigAddLater") + .partitions(2) + .replicas(1) + .config("no.such.config.key", "1000") + .build())) + .withMessageContaining("no.such.config.key"); + } @Test