From bac1e15b0075a3f8364b18aaa826b0802d69dac6 Mon Sep 17 00:00:00 2001 From: Alexey Podogov Date: Tue, 4 Jun 2024 11:32:12 +0300 Subject: [PATCH] CloudEventKafkaMessageConverter when converting to CloudEvent can ignore metadata keys that are not valid extension names --- .../CloudEventKafkaMessageConverter.java | 20 +++++++++- .../cloudevent/ExtensionUtils.java | 38 +++++++++++++------ .../CloudEventKafkaMessageConverterTest.java | 16 ++++++++ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverter.java index 3638f9fc..3aaa57d1 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverter.java @@ -92,6 +92,7 @@ public class CloudEventKafkaMessageConverter implements KafkaMessageConverter, Optional> subjectSupplier; private final Function, Optional> dataContentTypeSupplier; private final Function, Optional> dataSchemaSupplier; + private final boolean ignoreInvalidExtensionNames; /** * Instantiate a {@link CloudEventKafkaMessageConverter} based on the fields contained in the {@link Builder}. @@ -127,6 +128,7 @@ protected CloudEventKafkaMessageConverter(Builder builder) { this.subjectSupplier = builder.subjectSupplier; this.dataContentTypeSupplier = builder.dataContentTypeSupplier; this.dataSchemaSupplier = builder.dataSchemaSupplier; + this.ignoreInvalidExtensionNames = builder.ignoreInvalidExtensionNames; } /** @@ -177,7 +179,7 @@ private CloudEvent toCloudEvent(EventMessage message, SerializedObject> buildEventMessage(SerializedMessage * {@code sourceSupplier} is defaulted to the class of the message. The {@code subjectSupplier} is defaulted to * getting the subject from the 'cloud-event-subject' metadata. The {@code dataContentTypeSupplier} is defaulted to * getting the subject from the 'cloud-event-data-content-type' metadata. The {@code dataSchemaSupplier} is - * defaulted to getting the subject from the 'cloud-event-data-schema' metadata. + * defaulted to getting the subject from the 'cloud-event-data-schema' metadata. The + * {@code ignoreInvalidExtensionNames} is defaulted to not ignore extensions with invalid names thus leading + * to throwing an {@link InvalidMetaDataException} */ public static class Builder { @@ -314,6 +318,7 @@ public static class Builder { private Function, Optional> subjectSupplier = defaultSubjectSupplier(); private Function, Optional> dataContentTypeSupplier = defaultDataContentTypeSupplier(); private Function, Optional> dataSchemaSupplier = defaultDataSchemaSupplier(); + private boolean ignoreInvalidExtensionNames = false; /** * Creates a new map, to convert the two metadata properties used for tracing, which are incompatible with cloud @@ -449,6 +454,17 @@ public Builder dataSchemaSupplier(Function, Optional> dataS return this; } + /** + * if {@code true} than invalid extension names will not be added to {@link CloudEvent} message + * + * @param ignoreInvalidExtensionNames the ignore invalid extension names flag + * @return the current Builder instance, for fluent interfacing + */ + public Builder ignoreInvalidExtensionNames(boolean ignoreInvalidExtensionNames) { + this.ignoreInvalidExtensionNames = ignoreInvalidExtensionNames; + return this; + } + /** * Initializes a {@link CloudEventKafkaMessageConverter} as specified through this Builder. * diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/ExtensionUtils.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/ExtensionUtils.java index 1527ce76..9868ebe6 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/ExtensionUtils.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/ExtensionUtils.java @@ -79,17 +79,20 @@ private ExtensionUtils() { * Adds extension values to the {@link CloudEvent} based on the {@link EventMessage} and {@link SerializedObject} * using the {@code extensionNameResolver} map to resolve the extension names. * - * @param builder a {@link CloudEventBuilder} to add the values to - * @param message an {@link EventMessage} that might contain information that needs to be added as - * extension - * @param serializedObject a {@link SerializedObject} that might contain a revision - * @param extensionNameResolver a {@link Map} used to convert metadata keys to extension names + * @param builder a {@link CloudEventBuilder} to add the values to + * @param message an {@link EventMessage} that might contain information that needs to be added as + * extension + * @param serializedObject a {@link SerializedObject} that might contain a revision + * @param extensionNameResolver a {@link Map} used to convert metadata keys to extension names + * @param ignoreInvalidExtensionNames if {@code true} than invalid extension names will not be added to + * {@link CloudEvent} message */ public static void setExtensions( CloudEventBuilder builder, EventMessage message, SerializedObject serializedObject, - Map extensionNameResolver + Map extensionNameResolver, + boolean ignoreInvalidExtensionNames ) { if (!isNull(serializedObject.getType().getRevision())) { builder.withExtension(MESSAGE_REVISION, serializedObject.getType().getRevision()); @@ -105,7 +108,8 @@ public static void setExtensions( .filter(reservedMetadataFilter()) .forEach(entry -> setExtension(builder, resolveExtensionName(entry.getKey(), extensionNameResolver), - entry.getValue())); + entry.getValue(), + ignoreInvalidExtensionNames)); } /** @@ -145,7 +149,12 @@ private static boolean isNonMetadataExtension(String extensionName) { return NON_METADATA_EXTENSIONS.contains(extensionName); } - private static void setExtension(CloudEventBuilder builder, String extensionName, Object value) { + private static void setExtension( + CloudEventBuilder builder, + String extensionName, + Object value, + boolean ignoreInvalidExtensionNames + ) { if (isNonMetadataExtension(extensionName)) { throw new InvalidMetaDataException( String.format("Metadata property '%s' is already reserved to be used for Axon", @@ -153,10 +162,15 @@ private static void setExtension(CloudEventBuilder builder, String extensionName ); } if (!isValidExtensionName(extensionName)) { - throw new InvalidMetaDataException( - String.format("Metadata property '%s' is not a valid extension name", - extensionName) - ); + if (ignoreInvalidExtensionNames) { + logger.warn("Metadata property: '{}' is not a valid extension name and will be ignored", extensionName); + return; + } else { + throw new InvalidMetaDataException( + String.format("Metadata property '%s' is not a valid extension name", + extensionName) + ); + } } if (value instanceof String) { builder.withExtension(extensionName, (String) value); diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverterTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverterTest.java index aa3aa441..cdf1ca4a 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverterTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/cloudevent/CloudEventKafkaMessageConverterTest.java @@ -289,6 +289,22 @@ void whenMetadataContainsWrongName_thenThrowAnError() { assertThrows(InvalidMetaDataException.class, () -> testSubject.createKafkaMessage(eventMessage, SOME_TOPIC)); } + @Test + void whenMetadataContainsWrongName_thenIgnoreIt() { + testSubject = CloudEventKafkaMessageConverter.builder() + .serializer(serializer) + .ignoreInvalidExtensionNames(true) + .build(); + + EventMessage eventMessage = + asEventMessage("SomePayload").withMetaData(MetaData.with("_KEY", "value").and("key", "foo")); + + ProducerRecord senderMessage = testSubject.createKafkaMessage(eventMessage, SOME_TOPIC); + + assertEquals("foo", senderMessage.value().getExtension("key")); + assertNull(senderMessage.value().getExtension("_KEY")); + } + @Test void whenMetadataContainsUnsupportedValue_thenThrowAnError() { EventMessage eventMessage = asEventMessage("SomePayload")