Skip to content

Commit

Permalink
CloudEventKafkaMessageConverter when converting to CloudEvent can ign…
Browse files Browse the repository at this point in the history
…ore metadata keys that are not valid extension names
  • Loading branch information
aupodogov committed Jun 4, 2024
1 parent b0ff036 commit bac1e15
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class CloudEventKafkaMessageConverter implements KafkaMessageConverter<St
private final Function<EventMessage<?>, Optional<String>> subjectSupplier;
private final Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier;
private final Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier;
private final boolean ignoreInvalidExtensionNames;

/**
* Instantiate a {@link CloudEventKafkaMessageConverter} based on the fields contained in the {@link Builder}.
Expand Down Expand Up @@ -127,6 +128,7 @@ protected CloudEventKafkaMessageConverter(Builder builder) {
this.subjectSupplier = builder.subjectSupplier;
this.dataContentTypeSupplier = builder.dataContentTypeSupplier;
this.dataSchemaSupplier = builder.dataSchemaSupplier;
this.ignoreInvalidExtensionNames = builder.ignoreInvalidExtensionNames;
}

/**
Expand Down Expand Up @@ -177,7 +179,7 @@ private CloudEvent toCloudEvent(EventMessage<?> message, SerializedObject<byte[]
builder.withSource(sourceSupplier.apply(message));
builder.withType(serializedObject.getType().getName());
builder.withTime(message.getTimestamp().atOffset(ZoneOffset.UTC));
setExtensions(builder, message, serializedObject, extensionNameResolver);
setExtensions(builder, message, serializedObject, extensionNameResolver, ignoreInvalidExtensionNames);
return builder.build();
}

Expand Down Expand Up @@ -302,7 +304,9 @@ private static Optional<EventMessage<?>> buildEventMessage(SerializedMessage<?>
* {@code sourceSupplier} is defaulted to the class of the message. The {@code subjectSupplier} is defaulted to
* getting the subject from the 'cloud-event-subject' metadata. The {@code dataContentTypeSupplier} is defaulted to
* getting the subject from the 'cloud-event-data-content-type' metadata. The {@code dataSchemaSupplier} is
* defaulted to getting the subject from the 'cloud-event-data-schema' metadata.
* defaulted to getting the subject from the 'cloud-event-data-schema' metadata. The
* {@code ignoreInvalidExtensionNames} is defaulted to not ignore extensions with invalid names thus leading
* to throwing an {@link InvalidMetaDataException}
*/
public static class Builder {

Expand All @@ -314,6 +318,7 @@ public static class Builder {
private Function<EventMessage<?>, Optional<String>> subjectSupplier = defaultSubjectSupplier();
private Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier = defaultDataContentTypeSupplier();
private Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier = defaultDataSchemaSupplier();
private boolean ignoreInvalidExtensionNames = false;

/**
* Creates a new map, to convert the two metadata properties used for tracing, which are incompatible with cloud
Expand Down Expand Up @@ -449,6 +454,17 @@ public Builder dataSchemaSupplier(Function<EventMessage<?>, Optional<URI>> dataS
return this;
}

/**
* if {@code true} than invalid extension names will not be added to {@link CloudEvent} message
*
* @param ignoreInvalidExtensionNames the ignore invalid extension names flag
* @return the current Builder instance, for fluent interfacing
*/
public Builder ignoreInvalidExtensionNames(boolean ignoreInvalidExtensionNames) {
this.ignoreInvalidExtensionNames = ignoreInvalidExtensionNames;
return this;
}

/**
* Initializes a {@link CloudEventKafkaMessageConverter} as specified through this Builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,20 @@ private ExtensionUtils() {
* Adds extension values to the {@link CloudEvent} based on the {@link EventMessage} and {@link SerializedObject}
* using the {@code extensionNameResolver} map to resolve the extension names.
*
* @param builder a {@link CloudEventBuilder} to add the values to
* @param message an {@link EventMessage} that might contain information that needs to be added as
* extension
* @param serializedObject a {@link SerializedObject} that might contain a revision
* @param extensionNameResolver a {@link Map} used to convert metadata keys to extension names
* @param builder a {@link CloudEventBuilder} to add the values to
* @param message an {@link EventMessage} that might contain information that needs to be added as
* extension
* @param serializedObject a {@link SerializedObject} that might contain a revision
* @param extensionNameResolver a {@link Map} used to convert metadata keys to extension names
* @param ignoreInvalidExtensionNames if {@code true} than invalid extension names will not be added to
* {@link CloudEvent} message
*/
public static void setExtensions(
CloudEventBuilder builder,
EventMessage<?> message,
SerializedObject<byte[]> serializedObject,
Map<String, String> extensionNameResolver
Map<String, String> extensionNameResolver,
boolean ignoreInvalidExtensionNames
) {
if (!isNull(serializedObject.getType().getRevision())) {
builder.withExtension(MESSAGE_REVISION, serializedObject.getType().getRevision());
Expand All @@ -105,7 +108,8 @@ public static void setExtensions(
.filter(reservedMetadataFilter())
.forEach(entry -> setExtension(builder,
resolveExtensionName(entry.getKey(), extensionNameResolver),
entry.getValue()));
entry.getValue(),
ignoreInvalidExtensionNames));
}

/**
Expand Down Expand Up @@ -145,18 +149,28 @@ private static boolean isNonMetadataExtension(String extensionName) {
return NON_METADATA_EXTENSIONS.contains(extensionName);
}

private static void setExtension(CloudEventBuilder builder, String extensionName, Object value) {
private static void setExtension(
CloudEventBuilder builder,
String extensionName,
Object value,
boolean ignoreInvalidExtensionNames
) {
if (isNonMetadataExtension(extensionName)) {
throw new InvalidMetaDataException(
String.format("Metadata property '%s' is already reserved to be used for Axon",
extensionName)
);
}
if (!isValidExtensionName(extensionName)) {
throw new InvalidMetaDataException(
String.format("Metadata property '%s' is not a valid extension name",
extensionName)
);
if (ignoreInvalidExtensionNames) {
logger.warn("Metadata property: '{}' is not a valid extension name and will be ignored", extensionName);
return;
} else {
throw new InvalidMetaDataException(
String.format("Metadata property '%s' is not a valid extension name",
extensionName)
);
}
}
if (value instanceof String) {
builder.withExtension(extensionName, (String) value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,22 @@ void whenMetadataContainsWrongName_thenThrowAnError() {
assertThrows(InvalidMetaDataException.class, () -> testSubject.createKafkaMessage(eventMessage, SOME_TOPIC));
}

@Test
void whenMetadataContainsWrongName_thenIgnoreIt() {
testSubject = CloudEventKafkaMessageConverter.builder()
.serializer(serializer)
.ignoreInvalidExtensionNames(true)
.build();

EventMessage<Object> eventMessage =
asEventMessage("SomePayload").withMetaData(MetaData.with("_KEY", "value").and("key", "foo"));

ProducerRecord<String, CloudEvent> senderMessage = testSubject.createKafkaMessage(eventMessage, SOME_TOPIC);

assertEquals("foo", senderMessage.value().getExtension("key"));
assertNull(senderMessage.value().getExtension("_KEY"));
}

@Test
void whenMetadataContainsUnsupportedValue_thenThrowAnError() {
EventMessage<Object> eventMessage = asEventMessage("SomePayload")
Expand Down

0 comments on commit bac1e15

Please sign in to comment.