diff --git a/gateleen-cache/pom.xml b/gateleen-cache/pom.xml
index a385fb39c..d72409530 100644
--- a/gateleen-cache/pom.xml
+++ b/gateleen-cache/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-cache
diff --git a/gateleen-core/pom.xml b/gateleen-core/pom.xml
index b59a1f8e4..aaeb29946 100644
--- a/gateleen-core/pom.xml
+++ b/gateleen-core/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-core
diff --git a/gateleen-delegate/pom.xml b/gateleen-delegate/pom.xml
index 37c1e194d..a113dbdae 100644
--- a/gateleen-delegate/pom.xml
+++ b/gateleen-delegate/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-delegate
diff --git a/gateleen-delta/pom.xml b/gateleen-delta/pom.xml
index 92918b5d7..6f2277501 100644
--- a/gateleen-delta/pom.xml
+++ b/gateleen-delta/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-delta
diff --git a/gateleen-expansion/pom.xml b/gateleen-expansion/pom.xml
index 204581740..049b866d0 100644
--- a/gateleen-expansion/pom.xml
+++ b/gateleen-expansion/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-expansion
diff --git a/gateleen-hook-js/pom.xml b/gateleen-hook-js/pom.xml
index e522ca7e6..a89915a28 100644
--- a/gateleen-hook-js/pom.xml
+++ b/gateleen-hook-js/pom.xml
@@ -4,7 +4,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-hook-js
jar
diff --git a/gateleen-hook/pom.xml b/gateleen-hook/pom.xml
index b16a8b842..e2834be6c 100644
--- a/gateleen-hook/pom.xml
+++ b/gateleen-hook/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-hook
diff --git a/gateleen-kafka/README_kafka.md b/gateleen-kafka/README_kafka.md
index e67094acf..9ce9febd7 100644
--- a/gateleen-kafka/README_kafka.md
+++ b/gateleen-kafka/README_kafka.md
@@ -22,18 +22,16 @@ The following example shows a topic configuration resource with configurations f
bottom. The first topic name matching the provided topic is used.
```json
{
- "topics": {
- "my.topic.*": {
- "bootstrap.servers": "localhost:9092",
- "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
- "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
- },
- ".+": {
- "bootstrap.servers": "localhost:9093",
- "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
- "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
- }
- }
+ "my.topic.*": {
+ "bootstrap.servers": "localhost:9092",
+ "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
+ "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
+ },
+ ".+": {
+ "bootstrap.servers": "localhost:9093",
+ "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
+ "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
+ }
}
```
In the example above, a topic called `my.topic.abc` would use the first configuration entry. A topic called `some.other.topic` would use the second configuration entry.
diff --git a/gateleen-kafka/pom.xml b/gateleen-kafka/pom.xml
index 2862128ea..de1cb4931 100644
--- a/gateleen-kafka/pom.xml
+++ b/gateleen-kafka/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-kafka
diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
index 432bd87f5..134e31d1c 100644
--- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
+++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
@@ -14,18 +14,16 @@
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
-import org.swisspush.gateleen.core.util.StringUtils;
+import org.swisspush.gateleen.core.validation.ValidationResult;
+import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
* Handler class for all Kafka related requests.
*
@@ -47,25 +45,36 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaTopicExtractor topicExtractor;
private final KafkaMessageSender kafkaMessageSender;
private final Map properties;
+ private KafkaMessageValidator kafkaMessageValidator;
private boolean initialized = false;
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
- super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
- this.repository = repository;
- this.kafkaMessageSender = kafkaMessageSender;
- this.streamingPath = streamingPath;
- this.properties = new HashMap<>();
+ this(configurationResourceManager, null, repository, kafkaMessageSender,
+ configResourceUri, streamingPath);
+ }
- this.topicExtractor = new KafkaTopicExtractor(streamingPath);
+ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
+ KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
+ String streamingPath) {
+ this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
+ configResourceUri, streamingPath, new HashMap<>());
}
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) {
+ this(configurationResourceManager, null, repository, kafkaMessageSender,
+ configResourceUri, streamingPath, properties);
+ }
+
+ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
+ KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) {
+
super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
this.repository = repository;
+ this.kafkaMessageValidator = kafkaMessageValidator;
this.kafkaMessageSender = kafkaMessageSender;
this.streamingPath = streamingPath;
this.properties = properties;
@@ -114,14 +123,14 @@ public boolean handle(final HttpServerRequest request) {
}
final Optional optTopic = topicExtractor.extractTopic(request);
- if(!optTopic.isPresent()){
+ if(optTopic.isEmpty()){
respondWith(StatusCode.BAD_REQUEST, "Could not extract topic from request uri", request);
return true;
}
String topic = optTopic.get();
final Optional, Pattern>> optProducer = repository.findMatchingKafkaProducer(topic);
- if(!optProducer.isPresent()){
+ if(optProducer.isEmpty()){
respondWith(StatusCode.NOT_FOUND, "Could not find a matching producer for topic " + topic, request);
return true;
}
@@ -130,13 +139,23 @@ public boolean handle(final HttpServerRequest request) {
try {
log.debug("incoming kafka message payload: {}", payload.toString());
final List> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload);
- kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> {
- if(event.succeeded()) {
- RequestLoggerFactory.getLogger(KafkaHandler.class, request)
- .info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
- respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
+ maybeValidate(request, kafkaProducerRecords).setHandler(validationEvent -> {
+ if(validationEvent.succeeded()) {
+ if(validationEvent.result().isSuccess()) {
+ kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> {
+ if(event.succeeded()) {
+ RequestLoggerFactory.getLogger(KafkaHandler.class, request)
+ .info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
+ respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
+ } else {
+ respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
+ }
+ });
+ } else {
+ respondWith(StatusCode.BAD_REQUEST, validationEvent.result().getMessage(), request);
+ }
} else {
- respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
+ respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request);
}
});
} catch (ValidationException ve){
@@ -164,6 +183,13 @@ public void resourceRemoved(String resourceUri) {
}
}
+ private Future maybeValidate(HttpServerRequest request, List> kafkaProducerRecords) {
+ if(kafkaMessageValidator != null) {
+ return kafkaMessageValidator.validateMessages(request, kafkaProducerRecords);
+ }
+ return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
+ }
+
private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) {
ResponseStatusCodeLogUtil.info(request, statusCode, KafkaHandler.class);
if(statusCode != StatusCode.OK) {
diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java
new file mode 100644
index 000000000..475e5c631
--- /dev/null
+++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java
@@ -0,0 +1,66 @@
+package org.swisspush.gateleen.kafka;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.kafka.client.producer.KafkaProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.swisspush.gateleen.core.validation.ValidationResult;
+import org.swisspush.gateleen.core.validation.ValidationStatus;
+import org.swisspush.gateleen.validation.SchemaLocation;
+import org.swisspush.gateleen.validation.ValidationResourceManager;
+import org.swisspush.gateleen.validation.ValidationUtil;
+import org.swisspush.gateleen.validation.Validator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.stream.Collectors.toList;
+
+public class KafkaMessageValidator {
+
+ private final ValidationResourceManager validationResourceManager;
+ private final Validator validator;
+ private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);
+
+ public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
+ this.validationResourceManager = validationResourceManager;
+ this.validator = validator;
+ }
+
+ public Future validateMessages(HttpServerRequest request, List> kafkaProducerRecords) {
+ if (kafkaProducerRecords.isEmpty()) {
+ return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
+ }
+
+ Map entry = ValidationUtil.matchingValidationResourceEntry(validationResourceManager.getValidationResource(), request, log);
+ if (entry == null) {
+ return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
+ }
+
+ Optional optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResourceManager.getValidationResource(), request, log);
+ if (optionalSchemaLocation.isEmpty()) {
+ log.warn("No schema location found for {}. Could not validate kafka message", request.uri());
+ return Future.succeededFuture(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE));
+ }
+
+ SchemaLocation schemaLocation = optionalSchemaLocation.get();
+
+ @SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
+ List futures = kafkaProducerRecords.stream()
+ .map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
+ .collect(toList());
+
+ return CompositeFuture.all(futures).compose(compositeFuture -> {
+ for (Object o : compositeFuture.list()) {
+ if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
+ return Future.succeededFuture((ValidationResult) o);
+ }
+ }
+ return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
+ });
+ }
+}
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java
index 852c196c0..41a664ada 100644
--- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java
@@ -24,6 +24,8 @@
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.core.util.ResourcesUtils;
import org.swisspush.gateleen.core.util.StatusCode;
+import org.swisspush.gateleen.core.validation.ValidationResult;
+import org.swisspush.gateleen.core.validation.ValidationStatus;
import java.util.HashMap;
import java.util.List;
@@ -48,6 +50,7 @@ public class KafkaHandlerTest {
private Vertx vertx;
private KafkaProducerRepository repository;
private KafkaMessageSender kafkaMessageSender;
+ private KafkaMessageValidator messageValidator;
private ConfigurationResourceManager configurationResourceManager;
private KafkaHandler handler;
private MockResourceStorage storage;
@@ -63,6 +66,7 @@ public void setUp() {
vertx = Vertx.vertx();
repository = Mockito.spy(new KafkaProducerRepository(vertx));
kafkaMessageSender = Mockito.mock(KafkaMessageSender.class);
+ messageValidator = Mockito.mock(KafkaMessageValidator.class);
storage = new MockResourceStorage();
configurationResourceManager = new ConfigurationResourceManager(vertx, storage);
handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender,
@@ -429,54 +433,101 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){
});
}
- static class StreamingRequest extends DummyHttpServerRequest {
- private final String uri;
- private final HttpMethod method;
- private final String body;
- private final MultiMap headers;
- private final HttpServerResponse response;
-
- StreamingRequest(HttpMethod method, String uri) {
- this(method, uri, "", new CaseInsensitiveHeaders(), new StreamingResponse());
- }
-
- StreamingRequest(HttpMethod method, String uri, String body, MultiMap headers, HttpServerResponse response) {
- this.method = method;
- this.uri = uri;
- this.body = body;
- this.headers = headers;
- this.response = response;
- }
-
- @Override public HttpMethod method() {
- return method;
- }
- @Override public String uri() {
- return uri;
- }
- @Override public MultiMap headers() { return headers; }
-
- @Override
- public HttpServerRequest bodyHandler(Handler bodyHandler) {
- bodyHandler.handle(Buffer.buffer(body));
- return this;
- }
-
- @Override public HttpServerResponse response() { return response; }
+ @Test
+ public void handlePayloadNotPassingValidation(TestContext context){
+ Async async = context.async();
+
+ handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender,
+ configResourceUri, streamingPath);
+
+ when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
+ .thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom")));
+
+ storage.putMockData(configResourceUri, CONFIG_RESOURCE);
+ handler.initialize().setHandler(event -> {
+ context.assertTrue(handler.isInitialized());
+
+ String singleMessagePayload = "{\n" +
+ "\t\"records\": [{\n" +
+ "\t\t\"key\": \"record_0000001\",\n" +
+ "\t\t\"value\": {\n" +
+ "\t\t\t\"metadata\": {\n" +
+ "\t\t\t\t\"techId\": \"071X1500492vora1560860907613\",\n" +
+ "\t\t\t\t\"user\": \"foo\"\n" +
+ "\t\t\t},\n" +
+ "\t\t\t\"event\": {\n" +
+ "\t\t\t\t\"actionTime\": \"2019-06-18T14:28:27.617+02:00\",\n" +
+ "\t\t\t\t\"type\": 1,\n" +
+ "\t\t\t\t\"bool\": false\n" +
+ "\t\t\t}\n" +
+ "\t\t},\n" +
+ "\t\t\"headers\": {\n" +
+ "\t\t\t\"x-header-a\": \"value-a\",\n" +
+ "\t\t\t\"x-header-b\": \"value-b\",\n" +
+ "\t\t\t\"x-header-c\": \"value-c\"\n" +
+ "\t\t}\n" +
+ "\t}]\n" +
+ "}";
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.POST, streamingPath + "my.topic.x",
+ singleMessagePayload, new CaseInsensitiveHeaders(), response);
+ final boolean handled = handler.handle(request);
+
+ context.assertTrue(handled);
+ verifyZeroInteractions(kafkaMessageSender);
+ verify(response, times(1)).setStatusCode(eq(StatusCode.BAD_REQUEST.getStatusCode()));
+
+ async.complete();
+ });
}
- static class StreamingResponse extends DummyHttpServerResponse {
+ @Test
+ public void handleErrorWhileValidation(TestContext context){
+ Async async = context.async();
- private final MultiMap headers;
+ handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender,
+ configResourceUri, streamingPath);
- StreamingResponse(){
- this.headers = new CaseInsensitiveHeaders();
- }
+ when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
+ .thenReturn(Future.failedFuture("Boooom"));
- StreamingResponse(MultiMap headers){
- this.headers = headers;
- }
+ storage.putMockData(configResourceUri, CONFIG_RESOURCE);
+ handler.initialize().setHandler(event -> {
+ context.assertTrue(handler.isInitialized());
- @Override public MultiMap headers() { return headers; }
+ String singleMessagePayload = "{\n" +
+ "\t\"records\": [{\n" +
+ "\t\t\"key\": \"record_0000001\",\n" +
+ "\t\t\"value\": {\n" +
+ "\t\t\t\"metadata\": {\n" +
+ "\t\t\t\t\"techId\": \"071X1500492vora1560860907613\",\n" +
+ "\t\t\t\t\"user\": \"foo\"\n" +
+ "\t\t\t},\n" +
+ "\t\t\t\"event\": {\n" +
+ "\t\t\t\t\"actionTime\": \"2019-06-18T14:28:27.617+02:00\",\n" +
+ "\t\t\t\t\"type\": 1,\n" +
+ "\t\t\t\t\"bool\": false\n" +
+ "\t\t\t}\n" +
+ "\t\t},\n" +
+ "\t\t\"headers\": {\n" +
+ "\t\t\t\"x-header-a\": \"value-a\",\n" +
+ "\t\t\t\"x-header-b\": \"value-b\",\n" +
+ "\t\t\t\"x-header-c\": \"value-c\"\n" +
+ "\t\t}\n" +
+ "\t}]\n" +
+ "}";
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.POST, streamingPath + "my.topic.x",
+ singleMessagePayload, new CaseInsensitiveHeaders(), response);
+ final boolean handled = handler.handle(request);
+
+ context.assertTrue(handled);
+ verifyZeroInteractions(kafkaMessageSender);
+ verify(response, times(1)).setStatusCode(eq(StatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+
+ async.complete();
+ });
}
}
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java
new file mode 100644
index 000000000..722f682a4
--- /dev/null
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java
@@ -0,0 +1,220 @@
+package org.swisspush.gateleen.kafka;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.CaseInsensitiveHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.kafka.client.producer.KafkaProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.swisspush.gateleen.core.validation.ValidationResult;
+import org.swisspush.gateleen.core.validation.ValidationStatus;
+import org.swisspush.gateleen.validation.ValidationResource;
+import org.swisspush.gateleen.validation.ValidationResourceManager;
+import org.swisspush.gateleen.validation.Validator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Test class for the {@link KafkaMessageValidator}
+ *
+ * @author https://github.com/mcweba [Marc-Andre Weber]
+ */
+@RunWith(VertxUnitRunner.class)
+public class KafkaMessageValidatorTest {
+
+ private Vertx vertx;
+ private KafkaMessageValidator messageValidator;
+ private Validator validator;
+ private ValidationResourceManager validationResourceManager;
+
+ @Before
+ public void setUp() {
+ vertx = Vertx.vertx();
+ validationResourceManager = Mockito.mock(ValidationResourceManager.class);
+ validator = Mockito.mock(Validator.class);
+
+ messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
+ }
+
+ @Test
+ public void testValidateMessagesWithEmptyRecordsList(TestContext context) {
+ Async async = context.async();
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.GET, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response);
+
+ messageValidator.validateMessages(request, Collections.emptyList()).setHandler(event -> {
+ context.assertTrue(event.succeeded());
+ context.assertEquals(ValidationStatus.VALIDATED_POSITIV, event.result().getValidationStatus());
+ verifyZeroInteractions(validationResourceManager);
+ verifyZeroInteractions(validator);
+ async.complete();
+ });
+
+ }
+
+ @Test
+ public void testValidateMessagesNoMatchingValidationResourceEntry(TestContext context) {
+ Async async = context.async();
+
+ when(validationResourceManager.getValidationResource()).thenReturn(new ValidationResource());
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.GET, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response);
+
+ List> kafkaProducerRecords = new ArrayList<>();
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", "{}"));
+
+ messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> {
+ context.assertTrue(event.succeeded());
+ context.assertEquals(ValidationStatus.VALIDATED_POSITIV, event.result().getValidationStatus());
+ verify(validationResourceManager, times(1)).getValidationResource();
+ verifyZeroInteractions(validator);
+ async.complete();
+ });
+
+ }
+
+ @Test
+ public void testValidateMessagesMatchingValidationResourceEntryWithoutSchemaLocation(TestContext context) {
+ Async async = context.async();
+
+ ValidationResource validationResource = new ValidationResource();
+ validationResource.addResource(Map.of(ValidationResource.METHOD_PROPERTY, "PUT", ValidationResource.URL_PROPERTY, "/path/to/myTopic"));
+
+ when(validationResourceManager.getValidationResource()).thenReturn(validationResource);
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response);
+
+ List> kafkaProducerRecords = new ArrayList<>();
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", "{}"));
+
+ messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> {
+ context.assertTrue(event.succeeded());
+ context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus());
+ verify(validationResourceManager, times(2)).getValidationResource();
+ verifyZeroInteractions(validator);
+ async.complete();
+ });
+
+ }
+
+ @Test
+ public void testValidateMessagesMatchingValidationResourceEntry(TestContext context) {
+ Async async = context.async();
+
+ ValidationResource validationResource = new ValidationResource();
+ validationResource.addResource(
+ Map.of(ValidationResource.METHOD_PROPERTY, "PUT",
+ ValidationResource.URL_PROPERTY, "/path/to/myTopic",
+ ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema"
+ ));
+
+ when(validationResourceManager.getValidationResource()).thenReturn(validationResource);
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response);
+
+ List> kafkaProducerRecords = new ArrayList<>();
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", "{}"));
+
+ when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn(
+ Future.succeededFuture(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE, "Error while getting schema.")));
+
+ messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> {
+ context.assertTrue(event.succeeded());
+ context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus());
+ verify(validationResourceManager, times(2)).getValidationResource();
+ verify(validator, times(1)).validateWithSchemaLocation(any(), any(), any());
+ async.complete();
+ });
+
+ }
+
+ @Test
+ public void testValidateMessagesWithFailInValidator(TestContext context) {
+ Async async = context.async();
+
+ ValidationResource validationResource = new ValidationResource();
+ validationResource.addResource(
+ Map.of(ValidationResource.METHOD_PROPERTY, "PUT",
+ ValidationResource.URL_PROPERTY, "/path/to/myTopic",
+ ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema"
+ ));
+
+ when(validationResourceManager.getValidationResource()).thenReturn(validationResource);
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response);
+
+ String payload_1 = new JsonObject().encode();
+ String payload_2 = new JsonObject().put("foo", "bar").encode();
+ List> kafkaProducerRecords = new ArrayList<>();
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1));
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2));
+
+ when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_1)), any())).thenReturn(Future.failedFuture("Boooom"));
+ when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_2)), any())).thenReturn(
+ Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)));
+
+ messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> {
+ context.assertTrue(event.failed());
+ verify(validationResourceManager, times(2)).getValidationResource();
+ verify(validator, times(2)).validateWithSchemaLocation(any(), any(), any());
+ async.complete();
+ });
+
+ }
+
+ @Test
+ public void testValidateMultipleMessages(TestContext context) {
+ Async async = context.async();
+
+ ValidationResource validationResource = new ValidationResource();
+ validationResource.addResource(
+ Map.of(ValidationResource.METHOD_PROPERTY, "PUT",
+ ValidationResource.URL_PROPERTY, "/path/to/myTopic",
+ ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema"
+ ));
+
+ when(validationResourceManager.getValidationResource()).thenReturn(validationResource);
+
+ HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response);
+
+ String payload_1 = new JsonObject().encode();
+ String payload_2 = new JsonObject().put("foo", "bar").encode();
+ String payload_3 = new JsonObject().put("abc", "def").encode();
+ List> kafkaProducerRecords = new ArrayList<>();
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1));
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2));
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_3));
+
+ when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn(
+ Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)));
+
+ messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> {
+ context.assertTrue(event.succeeded());
+ context.assertEquals(ValidationStatus.VALIDATED_POSITIV, event.result().getValidationStatus());
+ verify(validationResourceManager, times(2)).getValidationResource();
+ verify(validator, times(3)).validateWithSchemaLocation(any(), any(), any());
+ async.complete();
+ });
+
+ }
+}
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java
new file mode 100644
index 000000000..4e73d7a99
--- /dev/null
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java
@@ -0,0 +1,47 @@
+package org.swisspush.gateleen.kafka;
+
+import io.vertx.core.Handler;
+import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.CaseInsensitiveHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import org.swisspush.gateleen.core.http.DummyHttpServerRequest;
+
+public class StreamingRequest extends DummyHttpServerRequest {
+
+ private final String uri;
+ private final HttpMethod method;
+ private final String body;
+ private final MultiMap headers;
+ private final HttpServerResponse response;
+
+ StreamingRequest(HttpMethod method, String uri) {
+ this(method, uri, "", new CaseInsensitiveHeaders(), new StreamingResponse());
+ }
+
+ StreamingRequest(HttpMethod method, String uri, String body, MultiMap headers, HttpServerResponse response) {
+ this.method = method;
+ this.uri = uri;
+ this.body = body;
+ this.headers = headers;
+ this.response = response;
+ }
+
+ @Override public HttpMethod method() {
+ return method;
+ }
+ @Override public String uri() {
+ return uri;
+ }
+ @Override public MultiMap headers() { return headers; }
+
+ @Override
+ public HttpServerRequest bodyHandler(Handler bodyHandler) {
+ bodyHandler.handle(Buffer.buffer(body));
+ return this;
+ }
+
+ @Override public HttpServerResponse response() { return response; }
+}
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java
new file mode 100644
index 000000000..70663f442
--- /dev/null
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java
@@ -0,0 +1,20 @@
+package org.swisspush.gateleen.kafka;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.http.CaseInsensitiveHeaders;
+import org.swisspush.gateleen.core.http.DummyHttpServerResponse;
+
+public class StreamingResponse extends DummyHttpServerResponse {
+
+ private final MultiMap headers;
+
+ StreamingResponse(){
+ this.headers = new CaseInsensitiveHeaders();
+ }
+
+ StreamingResponse(MultiMap headers){
+ this.headers = headers;
+ }
+
+ @Override public MultiMap headers() { return headers; }
+}
diff --git a/gateleen-logging/pom.xml b/gateleen-logging/pom.xml
index 2c57b163a..c44055c0d 100644
--- a/gateleen-logging/pom.xml
+++ b/gateleen-logging/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-logging
diff --git a/gateleen-merge/pom.xml b/gateleen-merge/pom.xml
index 6232a705e..8a4be87c8 100644
--- a/gateleen-merge/pom.xml
+++ b/gateleen-merge/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-merge
diff --git a/gateleen-monitoring/pom.xml b/gateleen-monitoring/pom.xml
index 1001a48d6..ddd7b4d08 100644
--- a/gateleen-monitoring/pom.xml
+++ b/gateleen-monitoring/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-monitoring
diff --git a/gateleen-packing/pom.xml b/gateleen-packing/pom.xml
index 4a84c8153..e0de4567c 100644
--- a/gateleen-packing/pom.xml
+++ b/gateleen-packing/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-packing
diff --git a/gateleen-player/pom.xml b/gateleen-player/pom.xml
index 1e5d93721..1dd345aea 100644
--- a/gateleen-player/pom.xml
+++ b/gateleen-player/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-player
diff --git a/gateleen-playground/pom.xml b/gateleen-playground/pom.xml
index 91a1f78a7..2ec535786 100644
--- a/gateleen-playground/pom.xml
+++ b/gateleen-playground/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-playground
diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java
index 0adf6c744..058347642 100755
--- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java
+++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java
@@ -41,6 +41,7 @@
import org.swisspush.gateleen.hook.reducedpropagation.impl.RedisReducedPropagationStorage;
import org.swisspush.gateleen.kafka.KafkaHandler;
import org.swisspush.gateleen.kafka.KafkaMessageSender;
+import org.swisspush.gateleen.kafka.KafkaMessageValidator;
import org.swisspush.gateleen.kafka.KafkaProducerRepository;
import org.swisspush.gateleen.logging.LogController;
import org.swisspush.gateleen.logging.LoggingResourceManager;
@@ -70,10 +71,7 @@
import org.swisspush.gateleen.security.content.ContentTypeConstraintRepository;
import org.swisspush.gateleen.user.RoleProfileHandler;
import org.swisspush.gateleen.user.UserProfileHandler;
-import org.swisspush.gateleen.validation.DefaultValidationSchemaProvider;
-import org.swisspush.gateleen.validation.ValidationHandler;
-import org.swisspush.gateleen.validation.ValidationResourceManager;
-import org.swisspush.gateleen.validation.ValidationSchemaProvider;
+import org.swisspush.gateleen.validation.*;
import java.io.IOException;
import java.time.Duration;
@@ -122,6 +120,7 @@ public class Server extends AbstractVerticle {
private ConfigurationResourceManager configurationResourceManager;
private ValidationResourceManager validationResourceManager;
private ValidationSchemaProvider validationSchemaProvider;
+ private Validator validator;
private SchedulerResourceManager schedulerResourceManager;
private QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager;
private ReducedPropagationManager reducedPropagationManager;
@@ -215,12 +214,6 @@ public void start() {
loggingResourceManager = new LoggingResourceManager(vertx, storage, SERVER_ROOT + "/admin/v1/logging");
loggingResourceManager.enableResourceLogging(true);
- KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx);
- KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
- kafkaHandler = new KafkaHandler(configurationResourceManager, kafkaProducerRepository, kafkaMessageSender,
- SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/");
- kafkaHandler.initialize();
-
ContentTypeConstraintRepository repository = new ContentTypeConstraintRepository();
contentTypeConstraintHandler = new ContentTypeConstraintHandler(configurationResourceManager, repository,
SERVER_ROOT + "/admin/v1/contentTypeConstraints",
@@ -251,10 +244,16 @@ public void start() {
validationResourceManager = new ValidationResourceManager(vertx, storage, SERVER_ROOT + "/admin/v1/validation");
validationResourceManager.enableResourceLogging(true);
-
validationSchemaProvider = new DefaultValidationSchemaProvider(vertx, new ClientRequestCreator(selfClient), Duration.ofSeconds(30));
+ validator = new Validator(storage, ROOT + "/schemas/apis/", validationSchemaProvider);
+ validationHandler = new ValidationHandler(validationResourceManager, selfClient, validator);
- validationHandler = new ValidationHandler(validationResourceManager, validationSchemaProvider, storage, selfClient, ROOT + "/schemas/apis/");
+ KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx);
+ KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
+ KafkaMessageValidator messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
+ kafkaHandler = new KafkaHandler(configurationResourceManager, messageValidator, kafkaProducerRepository, kafkaMessageSender,
+ SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/");
+ kafkaHandler.initialize();
schedulerResourceManager = new SchedulerResourceManager(vertx, redisClient, storage, monitoringHandler,
SERVER_ROOT + "/admin/v1/schedulers");
diff --git a/gateleen-qos/pom.xml b/gateleen-qos/pom.xml
index 2e8600c99..20269236c 100644
--- a/gateleen-qos/pom.xml
+++ b/gateleen-qos/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-qos
diff --git a/gateleen-queue/pom.xml b/gateleen-queue/pom.xml
index 7332c00c8..dc8c393fb 100644
--- a/gateleen-queue/pom.xml
+++ b/gateleen-queue/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-queue
diff --git a/gateleen-routing/pom.xml b/gateleen-routing/pom.xml
index 27a3a9de3..01b5177da 100644
--- a/gateleen-routing/pom.xml
+++ b/gateleen-routing/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-routing
diff --git a/gateleen-runconfig/pom.xml b/gateleen-runconfig/pom.xml
index 135a556df..e9e96b536 100644
--- a/gateleen-runconfig/pom.xml
+++ b/gateleen-runconfig/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-runconfig
diff --git a/gateleen-scheduler/pom.xml b/gateleen-scheduler/pom.xml
index 681758da8..a2bafe5d7 100644
--- a/gateleen-scheduler/pom.xml
+++ b/gateleen-scheduler/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-scheduler
diff --git a/gateleen-security/pom.xml b/gateleen-security/pom.xml
index d4a777639..8ccd3f851 100644
--- a/gateleen-security/pom.xml
+++ b/gateleen-security/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-security
diff --git a/gateleen-test/pom.xml b/gateleen-test/pom.xml
index de25a022b..5a981a601 100644
--- a/gateleen-test/pom.xml
+++ b/gateleen-test/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-test
jar
diff --git a/gateleen-testhelper/pom.xml b/gateleen-testhelper/pom.xml
index c5eeb49dc..d572a28b0 100644
--- a/gateleen-testhelper/pom.xml
+++ b/gateleen-testhelper/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-testhelper
diff --git a/gateleen-user/pom.xml b/gateleen-user/pom.xml
index 7b2300753..05c4e888d 100644
--- a/gateleen-user/pom.xml
+++ b/gateleen-user/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-user
diff --git a/gateleen-validation/pom.xml b/gateleen-validation/pom.xml
index 4f2fdb691..8f4c64268 100644
--- a/gateleen-validation/pom.xml
+++ b/gateleen-validation/pom.xml
@@ -6,7 +6,7 @@
org.swisspush.gateleen
gateleen
- 1.2.8-SNAPSHOT
+ 1.2.9-SNAPSHOT
gateleen-validation
diff --git a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java
index 146645882..003b979a5 100755
--- a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java
+++ b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java
@@ -12,11 +12,10 @@
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
+
+import static org.swisspush.gateleen.validation.ValidationUtil.matchingSchemaLocation;
+import static org.swisspush.gateleen.validation.ValidationUtil.matchingValidationResourceEntry;
/**
* Validates incoming and outgoing JSON and issues warnings in logs.
@@ -48,6 +47,12 @@ public ValidationHandler(ValidationResourceManager validationResourceManager, Va
this.validator = new Validator(storage, schemaRoot, validationSchemaProvider);
}
+ public ValidationHandler(ValidationResourceManager validationResourceManager, HttpClient httpClient, Validator validator) {
+ this.validationResourceManager = validationResourceManager;
+ this.httpClient = httpClient;
+ this.validator = validator;
+ }
+
/**
* Returns true when the {@link ValidationHandler} must be applied to this request.
*
@@ -70,29 +75,7 @@ public boolean isToValidate(HttpServerRequest request) {
return false; // do not validate
}
- return matchingValidationResource(request, log) != null;
- }
-
- private Map matchingValidationResource(HttpServerRequest request, Logger log) {
- List