From ca88243167044909528d0801c457a9ee5aa39045 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 26 Oct 2023 14:59:29 -0400 Subject: [PATCH] GH-2861: Add Validator to ErrorHandlingDeserializer Resolves https://github.com/spring-projects/spring-kafka/issues/2861 --- .../kafka/receiving-messages/validation.adoc | 2 + .../modules/ROOT/pages/kafka/serdes.adoc | 18 ++++-- .../antora/modules/ROOT/pages/whats-new.adoc | 7 +++ .../KafkaListenerEndpointRegistrar.java | 6 +- .../serializer/ErrorHandlingDeserializer.java | 52 +++++++++++++++-- .../ErrorHandlingDeserializerTests.java | 58 ++++++++++++++++++- 6 files changed, 128 insertions(+), 15 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/validation.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/validation.adoc index 22dd7dea61..5b1f70c0b5 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/validation.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/validation.adoc @@ -80,3 +80,5 @@ public KafkaListenerErrorHandler validationErrorHandler() { Starting with version 2.5.11, validation now works on payloads for `@KafkaHandler` methods in a class-level listener. See xref:kafka/receiving-messages/class-level-kafkalistener.adoc[`@KafkaListener` on a Class]. +Starting with version 3.1, you can perform validation in an `ErrorHandlingDeserializer` instead. +See xref:../serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeserializer`] for more information. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index b0d8bda7ed..50f274986c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -555,11 +555,11 @@ The following example uses a `failedDeserializationFunction`. [source, java] ---- -public class BadFoo extends Foo { +public class BadThing extends Thing { private final FailedDeserializationInfo failedDeserializationInfo; - public BadFoo(FailedDeserializationInfo failedDeserializationInfo) { + public BadThing(FailedDeserializationInfo failedDeserializationInfo) { this.failedDeserializationInfo = failedDeserializationInfo; } @@ -569,11 +569,11 @@ public class BadFoo extends Foo { } -public class FailedFooProvider implements Function { +public class FailedThingProvider implements Function { @Override - public Foo apply(FailedDeserializationInfo info) { - return new BadFoo(info); + public Thing apply(FailedDeserializationInfo info) { + return new BadThing(info); } } @@ -586,7 +586,7 @@ The preceding example uses the following configuration: ... consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); -consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class); +consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class); ... ---- @@ -662,6 +662,12 @@ void listen(List> in) { IMPORTANT: If you are also using a `DeadLetterPublishingRecoverer`, the record published for a `DeserializationException` will have a `record.value()` of type `byte[]`; this should not be serialized. Consider using a `DelegatingByTypeSerializer` configured to use a `ByteArraySerializer` for `byte[]` and the normal serializer (Json, Avro, etc) for all other types. +Starting with version 3.1, you can add a `Validator` to the `ErrorHandlingDeserializer`. +If the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring. +This allows the original raw data to be passed to the error handler. +WHen creating the deserializer yourself, simply call `setValidator`; if you configure the serializer using properties, set the consumer configuration property `ErrorHandlingDeserializer.VALIDATOR_CLASS` to the class or fully qualified class name for your `Validator`. +When using Spring Boot, this property name is `spring.deserializer.validator.class`. + [[payload-conversion-with-batch]] == Payload Conversion with Batch Listeners diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 024f30365f..1bba82b2aa 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -31,3 +31,10 @@ When not used with an `ErrorHandlingDeserializer`, the `KafkaConsumer` will cont Post processing can be applied on a listener container by specifying the bean name of a `ContainerPostProcessor` on the `@KafkaListener` annotation. This occurs after the container has been created and after any configured `ContainerCustomizer` configured on the container factory. See xref:kafka/container-factory.adoc[Container Factory] for more information. + +[[x31-ehd]] +=== ErrorHandlingDeserializer + +You can now add a `Validator` to this deserializer; if the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring. +This allows the original raw data to be passed to the error handler. +See xref:kafka/serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeserializer`] for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index 9166eb4231..4bd3d0fb1e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -189,9 +189,9 @@ public void afterPropertiesSet() { protected void registerAllEndpoints() { synchronized (this.endpointDescriptors) { for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) { - if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint + if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint mmkle && this.validator != null) { - ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator); + mmkle.setValidator(this.validator); } this.endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java index 43b6edb52e..cda8068b4c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.springframework.util.Assert; import org.springframework.util.ClassUtils; +import org.springframework.validation.Validator; /** * Delegating key/value deserializer that catches exceptions, returning them @@ -60,12 +61,19 @@ public class ErrorHandlingDeserializer implements Deserializer { */ public static final String VALUE_DESERIALIZER_CLASS = "spring.deserializer.value.delegate.class"; + /** + * Property name for the validator. + */ + public static final String VALIDATOR_CLASS = "spring.deserializer.validator.class"; + private Deserializer delegate; private boolean isForKey; private Function failedDeserializationFunction; + private Validator validator; + public ErrorHandlingDeserializer() { } @@ -108,6 +116,18 @@ public ErrorHandlingDeserializer keyDeserializer(boolean isKey) { return this; } + /** + * Set a validator to validate the object after successful deserialization. If the + * validator throws an exception, or returns an + * {@link org.springframework.validation.Errors} with validation failures, the raw + * data will be available in any configured error handler. + * @param validator the validator to set + * @since 3.1 + */ + public void setValidator(Validator validator) { + this.validator = validator; + } + @Override public void configure(Map configs, boolean isKey) { if (this.delegate == null) { @@ -119,6 +139,7 @@ public void configure(Map configs, boolean isKey) { if (this.failedDeserializationFunction == null) { setupFunction(configs, isKey ? KEY_FUNCTION : VALUE_FUNCTION); } + setupValidator(configs); } public void setupDelegate(Map configs, String configKey) { @@ -145,7 +166,7 @@ private void setupFunction(Map configs, String configKey) { if (configs.containsKey(configKey)) { try { Object value = configs.get(configKey); - Class clazz = value instanceof Class ? (Class) value : ClassUtils.forName((String) value, null); + Class clazz = value instanceof Class cls ? cls : ClassUtils.forName((String) value, null); Assert.isTrue(Function.class.isAssignableFrom(clazz), "'function' must be a 'Function ', not a " + clazz.getName()); this.failedDeserializationFunction = (Function) @@ -157,10 +178,25 @@ private void setupFunction(Map configs, String configKey) { } } + private void setupValidator(Map configs) { + if (configs.containsKey(VALIDATOR_CLASS)) { + try { + Object value = configs.get(VALIDATOR_CLASS); + Class clazz = value instanceof Class cls ? cls : ClassUtils.forName((String) value, null); + Object instance = clazz.getDeclaredConstructor().newInstance(); + Assert.isInstanceOf(Validator.class, instance, "'validator' must be a 'Validator', not a "); + this.validator = (Validator) instance; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + } + @Override public T deserialize(String topic, byte[] data) { try { - return this.delegate.deserialize(topic, data); + return validate(this.delegate.deserialize(topic, data)); } catch (Exception e) { return recoverFromSupplier(topic, null, data, e); @@ -176,7 +212,7 @@ public T deserialize(String topic, Headers headers, byte[] data) { else { headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER); } - return this.delegate.deserialize(topic, headers, data); + return validate(this.delegate.deserialize(topic, headers, data)); } catch (Exception e) { SerializationUtils.deserializationException(headers, data, e, this.isForKey); @@ -184,6 +220,14 @@ public T deserialize(String topic, Headers headers, byte[] data) { } } + private T validate(T deserialized) { + if (this.validator == null || !this.validator.supports(deserialized.getClass())) { + return deserialized; + } + this.validator.validateObject(deserialized).failOnError(IllegalStateException::new); + return deserialized; + } + private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) { if (this.failedDeserializationFunction != null) { FailedDeserializationInfo failedDeserializationInfo = diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java index 558bcac1e5..876c76c94a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -56,6 +57,8 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; /** * @author Gary Russell @@ -72,7 +75,7 @@ public class ErrorHandlingDeserializerTests { public Config config; @Test - public void testBadDeserializer() throws Exception { + void testBadDeserializer() throws Exception { this.config.template().send(TOPIC, "foo", "bar"); this.config.template().send(TOPIC, "fail", "bar"); this.config.template().send(TOPIC, "foo", "fail"); @@ -84,7 +87,7 @@ public void testBadDeserializer() throws Exception { } @Test - public void unitTests() throws Exception { + void unitTests() throws Exception { ErrorHandlingDeserializer ehd = new ErrorHandlingDeserializer<>(new StringDeserializer()); assertThat(ehd.deserialize("topic", "foo".getBytes())).isEqualTo("foo"); ehd.close(); @@ -137,6 +140,41 @@ public String deserialize(String topic, Headers headers, byte[] data) { .contains("original exception message"); } + @Test + void validate() { + ErrorHandlingDeserializer ehd = new ErrorHandlingDeserializer<>(new StringDeserializer()); + ehd.configure(Map.of(ErrorHandlingDeserializer.VALIDATOR_CLASS, Val.class.getName()), false); + + Headers headers = new RecordHeaders(); + assertThat(ehd.deserialize("foo", headers, "foo".getBytes())).isEqualTo("foo"); + ehd.deserialize("foo", headers, "bar".getBytes()); + DeserializationException ex = SerializationUtils.byteArrayToDeserializationException(null, + headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); + assertThat(ex.getCause()).isInstanceOf(IllegalStateException.class) + .extracting("message", InstanceOfAssertFactories.STRING) + .contains("validation failure"); + + ehd.setValidator(new Validator() { + + @Override + public void validate(Object target, Errors errors) { + throw new IllegalArgumentException("test validation"); + } + + @Override + public boolean supports(Class clazz) { + return clazz.equals(String.class); + } + + }); + ehd.deserialize("foo", headers, "baz".getBytes()); + ex = SerializationUtils.byteArrayToDeserializationException(null, + headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); + assertThat(ex.getCause()).isInstanceOf(IllegalArgumentException.class) + .extracting("message") + .isEqualTo("test validation"); + } + @Configuration @EnableKafka public static class Config { @@ -287,4 +325,20 @@ public static class Foo { } + public static class Val implements Validator { + + @Override + public void validate(Object target, Errors errors) { + if ("bar".equals(target)) { + errors.reject("validation failure"); + } + } + + @Override + public boolean supports(Class clazz) { + return clazz.equals(String.class); + } + + } + }