From 138a5b40525ee6227d46490f4359705660e89d85 Mon Sep 17 00:00:00 2001 From: Marc-Andre Weber Date: Wed, 5 Jan 2022 15:00:18 +0100 Subject: [PATCH 1/2] #400 Extend gateleen-validation with schemaLocation. Allow direct validation of Kafka messages --- gateleen-kafka/README_kafka.md | 22 +- .../gateleen/kafka/KafkaHandler.java | 48 +++- .../gateleen/kafka/KafkaMessageValidator.java | 66 ++++++ .../gateleen/kafka/KafkaHandlerTest.java | 137 +++++++---- .../kafka/KafkaMessageValidatorTest.java | 220 ++++++++++++++++++ .../gateleen/kafka/StreamingRequest.java | 47 ++++ .../gateleen/kafka/StreamingResponse.java | 20 ++ .../swisspush/gateleen/playground/Server.java | 23 +- .../validation/ValidationHandler.java | 97 +++----- .../gateleen/validation/ValidationUtil.java | 83 +++++++ .../gateleen/validation/Validator.java | 82 ++++++- .../validation/ValidationUtilTest.java | 97 ++++++++ .../gateleen/validation/ValidatorTest.java | 136 ++++++++--- 13 files changed, 892 insertions(+), 186 deletions(-) create mode 100644 gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java create mode 100644 gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java create mode 100644 gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java create mode 100644 gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java create mode 100644 gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationUtil.java create mode 100644 gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidationUtilTest.java diff --git a/gateleen-kafka/README_kafka.md b/gateleen-kafka/README_kafka.md index e67094acf..9ce9febd7 100644 --- a/gateleen-kafka/README_kafka.md +++ b/gateleen-kafka/README_kafka.md @@ -22,18 +22,16 @@ The following example shows a topic configuration resource with configurations f bottom. The first topic name matching the provided topic is used. ```json { - "topics": { - "my.topic.*": { - "bootstrap.servers": "localhost:9092", - "key.serializer": "org.apache.kafka.common.serialization.StringSerializer", - "value.serializer": "org.apache.kafka.common.serialization.StringSerializer" - }, - ".+": { - "bootstrap.servers": "localhost:9093", - "key.serializer": "org.apache.kafka.common.serialization.StringSerializer", - "value.serializer": "org.apache.kafka.common.serialization.StringSerializer" - } - } + "my.topic.*": { + "bootstrap.servers": "localhost:9092", + "key.serializer": "org.apache.kafka.common.serialization.StringSerializer", + "value.serializer": "org.apache.kafka.common.serialization.StringSerializer" + }, + ".+": { + "bootstrap.servers": "localhost:9093", + "key.serializer": "org.apache.kafka.common.serialization.StringSerializer", + "value.serializer": "org.apache.kafka.common.serialization.StringSerializer" + } } ``` In the example above, a topic called `my.topic.abc` would use the first configuration entry. A topic called `some.other.topic` would use the second configuration entry. diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index 432bd87f5..e4e0b8d60 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -14,18 +14,16 @@ import org.swisspush.gateleen.core.http.RequestLoggerFactory; import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil; import org.swisspush.gateleen.core.util.StatusCode; -import org.swisspush.gateleen.core.util.StringUtils; +import org.swisspush.gateleen.core.validation.ValidationResult; +import org.swisspush.gateleen.core.validation.ValidationStatus; import org.swisspush.gateleen.validation.ValidationException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * Handler class for all Kafka related requests. * @@ -47,12 +45,21 @@ public class KafkaHandler extends ConfigurationResourceConsumer { private final KafkaTopicExtractor topicExtractor; private final KafkaMessageSender kafkaMessageSender; private final Map properties; + private KafkaMessageValidator kafkaMessageValidator; private boolean initialized = false; public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { + this(configurationResourceManager, null, repository, kafkaMessageSender, + configResourceUri, streamingPath); + } + + public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, + KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, + String streamingPath) { super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema"); + this.kafkaMessageValidator = kafkaMessageValidator; this.repository = repository; this.kafkaMessageSender = kafkaMessageSender; this.streamingPath = streamingPath; @@ -114,14 +121,14 @@ public boolean handle(final HttpServerRequest request) { } final Optional optTopic = topicExtractor.extractTopic(request); - if(!optTopic.isPresent()){ + if(optTopic.isEmpty()){ respondWith(StatusCode.BAD_REQUEST, "Could not extract topic from request uri", request); return true; } String topic = optTopic.get(); final Optional, Pattern>> optProducer = repository.findMatchingKafkaProducer(topic); - if(!optProducer.isPresent()){ + if(optProducer.isEmpty()){ respondWith(StatusCode.NOT_FOUND, "Could not find a matching producer for topic " + topic, request); return true; } @@ -130,13 +137,23 @@ public boolean handle(final HttpServerRequest request) { try { log.debug("incoming kafka message payload: {}", payload.toString()); final List> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload); - kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> { - if(event.succeeded()) { - RequestLoggerFactory.getLogger(KafkaHandler.class, request) - .info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic); - respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request); + maybeValidate(request, kafkaProducerRecords).setHandler(validationEvent -> { + if(validationEvent.succeeded()) { + if(validationEvent.result().isSuccess()) { + kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> { + if(event.succeeded()) { + RequestLoggerFactory.getLogger(KafkaHandler.class, request) + .info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic); + respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request); + } else { + respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request); + } + }); + } else { + respondWith(StatusCode.BAD_REQUEST, validationEvent.result().getMessage(), request); + } } else { - respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request); + respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request); } }); } catch (ValidationException ve){ @@ -164,6 +181,13 @@ public void resourceRemoved(String resourceUri) { } } + private Future maybeValidate(HttpServerRequest request, List> kafkaProducerRecords) { + if(kafkaMessageValidator != null) { + return kafkaMessageValidator.validateMessages(request, kafkaProducerRecords); + } + return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + } + private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) { ResponseStatusCodeLogUtil.info(request, statusCode, KafkaHandler.class); if(statusCode != StatusCode.OK) { diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java new file mode 100644 index 000000000..475e5c631 --- /dev/null +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java @@ -0,0 +1,66 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.gateleen.core.validation.ValidationResult; +import org.swisspush.gateleen.core.validation.ValidationStatus; +import org.swisspush.gateleen.validation.SchemaLocation; +import org.swisspush.gateleen.validation.ValidationResourceManager; +import org.swisspush.gateleen.validation.ValidationUtil; +import org.swisspush.gateleen.validation.Validator; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.util.stream.Collectors.toList; + +public class KafkaMessageValidator { + + private final ValidationResourceManager validationResourceManager; + private final Validator validator; + private final Logger log = LoggerFactory.getLogger(KafkaHandler.class); + + public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) { + this.validationResourceManager = validationResourceManager; + this.validator = validator; + } + + public Future validateMessages(HttpServerRequest request, List> kafkaProducerRecords) { + if (kafkaProducerRecords.isEmpty()) { + return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + } + + Map entry = ValidationUtil.matchingValidationResourceEntry(validationResourceManager.getValidationResource(), request, log); + if (entry == null) { + return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + } + + Optional optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResourceManager.getValidationResource(), request, log); + if (optionalSchemaLocation.isEmpty()) { + log.warn("No schema location found for {}. Could not validate kafka message", request.uri()); + return Future.succeededFuture(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE)); + } + + SchemaLocation schemaLocation = optionalSchemaLocation.get(); + + @SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627 + List futures = kafkaProducerRecords.stream() + .map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log)) + .collect(toList()); + + return CompositeFuture.all(futures).compose(compositeFuture -> { + for (Object o : compositeFuture.list()) { + if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) { + return Future.succeededFuture((ValidationResult) o); + } + } + return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + }); + } +} diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java index 852c196c0..41a664ada 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java @@ -24,6 +24,8 @@ import org.swisspush.gateleen.core.storage.MockResourceStorage; import org.swisspush.gateleen.core.util.ResourcesUtils; import org.swisspush.gateleen.core.util.StatusCode; +import org.swisspush.gateleen.core.validation.ValidationResult; +import org.swisspush.gateleen.core.validation.ValidationStatus; import java.util.HashMap; import java.util.List; @@ -48,6 +50,7 @@ public class KafkaHandlerTest { private Vertx vertx; private KafkaProducerRepository repository; private KafkaMessageSender kafkaMessageSender; + private KafkaMessageValidator messageValidator; private ConfigurationResourceManager configurationResourceManager; private KafkaHandler handler; private MockResourceStorage storage; @@ -63,6 +66,7 @@ public void setUp() { vertx = Vertx.vertx(); repository = Mockito.spy(new KafkaProducerRepository(vertx)); kafkaMessageSender = Mockito.mock(KafkaMessageSender.class); + messageValidator = Mockito.mock(KafkaMessageValidator.class); storage = new MockResourceStorage(); configurationResourceManager = new ConfigurationResourceManager(vertx, storage); handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender, @@ -429,54 +433,101 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){ }); } - static class StreamingRequest extends DummyHttpServerRequest { - private final String uri; - private final HttpMethod method; - private final String body; - private final MultiMap headers; - private final HttpServerResponse response; - - StreamingRequest(HttpMethod method, String uri) { - this(method, uri, "", new CaseInsensitiveHeaders(), new StreamingResponse()); - } - - StreamingRequest(HttpMethod method, String uri, String body, MultiMap headers, HttpServerResponse response) { - this.method = method; - this.uri = uri; - this.body = body; - this.headers = headers; - this.response = response; - } - - @Override public HttpMethod method() { - return method; - } - @Override public String uri() { - return uri; - } - @Override public MultiMap headers() { return headers; } - - @Override - public HttpServerRequest bodyHandler(Handler bodyHandler) { - bodyHandler.handle(Buffer.buffer(body)); - return this; - } - - @Override public HttpServerResponse response() { return response; } + @Test + public void handlePayloadNotPassingValidation(TestContext context){ + Async async = context.async(); + + handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender, + configResourceUri, streamingPath); + + when(messageValidator.validateMessages(any(HttpServerRequest.class), any())) + .thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom"))); + + storage.putMockData(configResourceUri, CONFIG_RESOURCE); + handler.initialize().setHandler(event -> { + context.assertTrue(handler.isInitialized()); + + String singleMessagePayload = "{\n" + + "\t\"records\": [{\n" + + "\t\t\"key\": \"record_0000001\",\n" + + "\t\t\"value\": {\n" + + "\t\t\t\"metadata\": {\n" + + "\t\t\t\t\"techId\": \"071X1500492vora1560860907613\",\n" + + "\t\t\t\t\"user\": \"foo\"\n" + + "\t\t\t},\n" + + "\t\t\t\"event\": {\n" + + "\t\t\t\t\"actionTime\": \"2019-06-18T14:28:27.617+02:00\",\n" + + "\t\t\t\t\"type\": 1,\n" + + "\t\t\t\t\"bool\": false\n" + + "\t\t\t}\n" + + "\t\t},\n" + + "\t\t\"headers\": {\n" + + "\t\t\t\"x-header-a\": \"value-a\",\n" + + "\t\t\t\"x-header-b\": \"value-b\",\n" + + "\t\t\t\"x-header-c\": \"value-c\"\n" + + "\t\t}\n" + + "\t}]\n" + + "}"; + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.POST, streamingPath + "my.topic.x", + singleMessagePayload, new CaseInsensitiveHeaders(), response); + final boolean handled = handler.handle(request); + + context.assertTrue(handled); + verifyZeroInteractions(kafkaMessageSender); + verify(response, times(1)).setStatusCode(eq(StatusCode.BAD_REQUEST.getStatusCode())); + + async.complete(); + }); } - static class StreamingResponse extends DummyHttpServerResponse { + @Test + public void handleErrorWhileValidation(TestContext context){ + Async async = context.async(); - private final MultiMap headers; + handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender, + configResourceUri, streamingPath); - StreamingResponse(){ - this.headers = new CaseInsensitiveHeaders(); - } + when(messageValidator.validateMessages(any(HttpServerRequest.class), any())) + .thenReturn(Future.failedFuture("Boooom")); - StreamingResponse(MultiMap headers){ - this.headers = headers; - } + storage.putMockData(configResourceUri, CONFIG_RESOURCE); + handler.initialize().setHandler(event -> { + context.assertTrue(handler.isInitialized()); - @Override public MultiMap headers() { return headers; } + String singleMessagePayload = "{\n" + + "\t\"records\": [{\n" + + "\t\t\"key\": \"record_0000001\",\n" + + "\t\t\"value\": {\n" + + "\t\t\t\"metadata\": {\n" + + "\t\t\t\t\"techId\": \"071X1500492vora1560860907613\",\n" + + "\t\t\t\t\"user\": \"foo\"\n" + + "\t\t\t},\n" + + "\t\t\t\"event\": {\n" + + "\t\t\t\t\"actionTime\": \"2019-06-18T14:28:27.617+02:00\",\n" + + "\t\t\t\t\"type\": 1,\n" + + "\t\t\t\t\"bool\": false\n" + + "\t\t\t}\n" + + "\t\t},\n" + + "\t\t\"headers\": {\n" + + "\t\t\t\"x-header-a\": \"value-a\",\n" + + "\t\t\t\"x-header-b\": \"value-b\",\n" + + "\t\t\t\"x-header-c\": \"value-c\"\n" + + "\t\t}\n" + + "\t}]\n" + + "}"; + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.POST, streamingPath + "my.topic.x", + singleMessagePayload, new CaseInsensitiveHeaders(), response); + final boolean handled = handler.handle(request); + + context.assertTrue(handled); + verifyZeroInteractions(kafkaMessageSender); + verify(response, times(1)).setStatusCode(eq(StatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + + async.complete(); + }); } } diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java new file mode 100644 index 000000000..722f682a4 --- /dev/null +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java @@ -0,0 +1,220 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.CaseInsensitiveHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.swisspush.gateleen.core.validation.ValidationResult; +import org.swisspush.gateleen.core.validation.ValidationStatus; +import org.swisspush.gateleen.validation.ValidationResource; +import org.swisspush.gateleen.validation.ValidationResourceManager; +import org.swisspush.gateleen.validation.Validator; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.*; + +/** + * Test class for the {@link KafkaMessageValidator} + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +@RunWith(VertxUnitRunner.class) +public class KafkaMessageValidatorTest { + + private Vertx vertx; + private KafkaMessageValidator messageValidator; + private Validator validator; + private ValidationResourceManager validationResourceManager; + + @Before + public void setUp() { + vertx = Vertx.vertx(); + validationResourceManager = Mockito.mock(ValidationResourceManager.class); + validator = Mockito.mock(Validator.class); + + messageValidator = new KafkaMessageValidator(validationResourceManager, validator); + } + + @Test + public void testValidateMessagesWithEmptyRecordsList(TestContext context) { + Async async = context.async(); + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.GET, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response); + + messageValidator.validateMessages(request, Collections.emptyList()).setHandler(event -> { + context.assertTrue(event.succeeded()); + context.assertEquals(ValidationStatus.VALIDATED_POSITIV, event.result().getValidationStatus()); + verifyZeroInteractions(validationResourceManager); + verifyZeroInteractions(validator); + async.complete(); + }); + + } + + @Test + public void testValidateMessagesNoMatchingValidationResourceEntry(TestContext context) { + Async async = context.async(); + + when(validationResourceManager.getValidationResource()).thenReturn(new ValidationResource()); + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.GET, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response); + + List> kafkaProducerRecords = new ArrayList<>(); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", "{}")); + + messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> { + context.assertTrue(event.succeeded()); + context.assertEquals(ValidationStatus.VALIDATED_POSITIV, event.result().getValidationStatus()); + verify(validationResourceManager, times(1)).getValidationResource(); + verifyZeroInteractions(validator); + async.complete(); + }); + + } + + @Test + public void testValidateMessagesMatchingValidationResourceEntryWithoutSchemaLocation(TestContext context) { + Async async = context.async(); + + ValidationResource validationResource = new ValidationResource(); + validationResource.addResource(Map.of(ValidationResource.METHOD_PROPERTY, "PUT", ValidationResource.URL_PROPERTY, "/path/to/myTopic")); + + when(validationResourceManager.getValidationResource()).thenReturn(validationResource); + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response); + + List> kafkaProducerRecords = new ArrayList<>(); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", "{}")); + + messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> { + context.assertTrue(event.succeeded()); + context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus()); + verify(validationResourceManager, times(2)).getValidationResource(); + verifyZeroInteractions(validator); + async.complete(); + }); + + } + + @Test + public void testValidateMessagesMatchingValidationResourceEntry(TestContext context) { + Async async = context.async(); + + ValidationResource validationResource = new ValidationResource(); + validationResource.addResource( + Map.of(ValidationResource.METHOD_PROPERTY, "PUT", + ValidationResource.URL_PROPERTY, "/path/to/myTopic", + ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema" + )); + + when(validationResourceManager.getValidationResource()).thenReturn(validationResource); + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response); + + List> kafkaProducerRecords = new ArrayList<>(); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", "{}")); + + when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn( + Future.succeededFuture(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE, "Error while getting schema."))); + + messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> { + context.assertTrue(event.succeeded()); + context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus()); + verify(validationResourceManager, times(2)).getValidationResource(); + verify(validator, times(1)).validateWithSchemaLocation(any(), any(), any()); + async.complete(); + }); + + } + + @Test + public void testValidateMessagesWithFailInValidator(TestContext context) { + Async async = context.async(); + + ValidationResource validationResource = new ValidationResource(); + validationResource.addResource( + Map.of(ValidationResource.METHOD_PROPERTY, "PUT", + ValidationResource.URL_PROPERTY, "/path/to/myTopic", + ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema" + )); + + when(validationResourceManager.getValidationResource()).thenReturn(validationResource); + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response); + + String payload_1 = new JsonObject().encode(); + String payload_2 = new JsonObject().put("foo", "bar").encode(); + List> kafkaProducerRecords = new ArrayList<>(); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1)); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2)); + + when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_1)), any())).thenReturn(Future.failedFuture("Boooom")); + when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_2)), any())).thenReturn( + Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV))); + + messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> { + context.assertTrue(event.failed()); + verify(validationResourceManager, times(2)).getValidationResource(); + verify(validator, times(2)).validateWithSchemaLocation(any(), any(), any()); + async.complete(); + }); + + } + + @Test + public void testValidateMultipleMessages(TestContext context) { + Async async = context.async(); + + ValidationResource validationResource = new ValidationResource(); + validationResource.addResource( + Map.of(ValidationResource.METHOD_PROPERTY, "PUT", + ValidationResource.URL_PROPERTY, "/path/to/myTopic", + ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema" + )); + + when(validationResourceManager.getValidationResource()).thenReturn(validationResource); + + HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders())); + StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new CaseInsensitiveHeaders(), response); + + String payload_1 = new JsonObject().encode(); + String payload_2 = new JsonObject().put("foo", "bar").encode(); + String payload_3 = new JsonObject().put("abc", "def").encode(); + List> kafkaProducerRecords = new ArrayList<>(); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1)); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2)); + kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_3)); + + when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn( + Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV))); + + messageValidator.validateMessages(request, kafkaProducerRecords).setHandler(event -> { + context.assertTrue(event.succeeded()); + context.assertEquals(ValidationStatus.VALIDATED_POSITIV, event.result().getValidationStatus()); + verify(validationResourceManager, times(2)).getValidationResource(); + verify(validator, times(3)).validateWithSchemaLocation(any(), any(), any()); + async.complete(); + }); + + } +} diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java new file mode 100644 index 000000000..4e73d7a99 --- /dev/null +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingRequest.java @@ -0,0 +1,47 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.CaseInsensitiveHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import org.swisspush.gateleen.core.http.DummyHttpServerRequest; + +public class StreamingRequest extends DummyHttpServerRequest { + + private final String uri; + private final HttpMethod method; + private final String body; + private final MultiMap headers; + private final HttpServerResponse response; + + StreamingRequest(HttpMethod method, String uri) { + this(method, uri, "", new CaseInsensitiveHeaders(), new StreamingResponse()); + } + + StreamingRequest(HttpMethod method, String uri, String body, MultiMap headers, HttpServerResponse response) { + this.method = method; + this.uri = uri; + this.body = body; + this.headers = headers; + this.response = response; + } + + @Override public HttpMethod method() { + return method; + } + @Override public String uri() { + return uri; + } + @Override public MultiMap headers() { return headers; } + + @Override + public HttpServerRequest bodyHandler(Handler bodyHandler) { + bodyHandler.handle(Buffer.buffer(body)); + return this; + } + + @Override public HttpServerResponse response() { return response; } +} diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java new file mode 100644 index 000000000..70663f442 --- /dev/null +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/StreamingResponse.java @@ -0,0 +1,20 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.MultiMap; +import io.vertx.core.http.CaseInsensitiveHeaders; +import org.swisspush.gateleen.core.http.DummyHttpServerResponse; + +public class StreamingResponse extends DummyHttpServerResponse { + + private final MultiMap headers; + + StreamingResponse(){ + this.headers = new CaseInsensitiveHeaders(); + } + + StreamingResponse(MultiMap headers){ + this.headers = headers; + } + + @Override public MultiMap headers() { return headers; } +} diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java index 0adf6c744..058347642 100755 --- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java +++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java @@ -41,6 +41,7 @@ import org.swisspush.gateleen.hook.reducedpropagation.impl.RedisReducedPropagationStorage; import org.swisspush.gateleen.kafka.KafkaHandler; import org.swisspush.gateleen.kafka.KafkaMessageSender; +import org.swisspush.gateleen.kafka.KafkaMessageValidator; import org.swisspush.gateleen.kafka.KafkaProducerRepository; import org.swisspush.gateleen.logging.LogController; import org.swisspush.gateleen.logging.LoggingResourceManager; @@ -70,10 +71,7 @@ import org.swisspush.gateleen.security.content.ContentTypeConstraintRepository; import org.swisspush.gateleen.user.RoleProfileHandler; import org.swisspush.gateleen.user.UserProfileHandler; -import org.swisspush.gateleen.validation.DefaultValidationSchemaProvider; -import org.swisspush.gateleen.validation.ValidationHandler; -import org.swisspush.gateleen.validation.ValidationResourceManager; -import org.swisspush.gateleen.validation.ValidationSchemaProvider; +import org.swisspush.gateleen.validation.*; import java.io.IOException; import java.time.Duration; @@ -122,6 +120,7 @@ public class Server extends AbstractVerticle { private ConfigurationResourceManager configurationResourceManager; private ValidationResourceManager validationResourceManager; private ValidationSchemaProvider validationSchemaProvider; + private Validator validator; private SchedulerResourceManager schedulerResourceManager; private QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager; private ReducedPropagationManager reducedPropagationManager; @@ -215,12 +214,6 @@ public void start() { loggingResourceManager = new LoggingResourceManager(vertx, storage, SERVER_ROOT + "/admin/v1/logging"); loggingResourceManager.enableResourceLogging(true); - KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx); - KafkaMessageSender kafkaMessageSender = new KafkaMessageSender(); - kafkaHandler = new KafkaHandler(configurationResourceManager, kafkaProducerRepository, kafkaMessageSender, - SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/"); - kafkaHandler.initialize(); - ContentTypeConstraintRepository repository = new ContentTypeConstraintRepository(); contentTypeConstraintHandler = new ContentTypeConstraintHandler(configurationResourceManager, repository, SERVER_ROOT + "/admin/v1/contentTypeConstraints", @@ -251,10 +244,16 @@ public void start() { validationResourceManager = new ValidationResourceManager(vertx, storage, SERVER_ROOT + "/admin/v1/validation"); validationResourceManager.enableResourceLogging(true); - validationSchemaProvider = new DefaultValidationSchemaProvider(vertx, new ClientRequestCreator(selfClient), Duration.ofSeconds(30)); + validator = new Validator(storage, ROOT + "/schemas/apis/", validationSchemaProvider); + validationHandler = new ValidationHandler(validationResourceManager, selfClient, validator); - validationHandler = new ValidationHandler(validationResourceManager, validationSchemaProvider, storage, selfClient, ROOT + "/schemas/apis/"); + KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx); + KafkaMessageSender kafkaMessageSender = new KafkaMessageSender(); + KafkaMessageValidator messageValidator = new KafkaMessageValidator(validationResourceManager, validator); + kafkaHandler = new KafkaHandler(configurationResourceManager, messageValidator, kafkaProducerRepository, kafkaMessageSender, + SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/"); + kafkaHandler.initialize(); schedulerResourceManager = new SchedulerResourceManager(vertx, redisClient, storage, monitoringHandler, SERVER_ROOT + "/admin/v1/schedulers"); diff --git a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java index 146645882..003b979a5 100755 --- a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java +++ b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationHandler.java @@ -12,11 +12,10 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; + +import static org.swisspush.gateleen.validation.ValidationUtil.matchingSchemaLocation; +import static org.swisspush.gateleen.validation.ValidationUtil.matchingValidationResourceEntry; /** * Validates incoming and outgoing JSON and issues warnings in logs. @@ -48,6 +47,12 @@ public ValidationHandler(ValidationResourceManager validationResourceManager, Va this.validator = new Validator(storage, schemaRoot, validationSchemaProvider); } + public ValidationHandler(ValidationResourceManager validationResourceManager, HttpClient httpClient, Validator validator) { + this.validationResourceManager = validationResourceManager; + this.httpClient = httpClient; + this.validator = validator; + } + /** * Returns true when the {@link ValidationHandler} must be applied to this request. * @@ -70,29 +75,7 @@ public boolean isToValidate(HttpServerRequest request) { return false; // do not validate } - return matchingValidationResource(request, log) != null; - } - - private Map matchingValidationResource(HttpServerRequest request, Logger log) { - List> validationResources = validationResourceManager.getValidationResource().getResources(); - try { - for (Map validationResource : validationResources) { - if (doesRequestValueMatch(request.method().name(), validationResource.get(ValidationResource.METHOD_PROPERTY)) - && doesRequestValueMatch(request.uri(), validationResource.get(ValidationResource.URL_PROPERTY))) { - return validationResource; - } - } - } catch (PatternSyntaxException patternException) { - log.error(patternException.getMessage() + " " + patternException.getPattern()); - } - - return null; - } - - private boolean doesRequestValueMatch(String value, String valuePattern) { - Pattern pattern = Pattern.compile(valuePattern); - Matcher matcher = pattern.matcher(value); - return matcher.matches(); + return matchingValidationResourceEntry(validationResourceManager.getValidationResource(), request, log) != null; } private boolean isJsonRequest(HttpServerRequest request) { @@ -119,21 +102,23 @@ private void handleValidation(final HttpServerRequest req) { cRes.bodyHandler(data -> { if (req.response().getStatusCode() == StatusCode.OK.getStatusCode() && outMethods.contains(req.method().name()) && data.length() > 0) { - validator.validate(req, req.method() + "/out", data, schemaLocation(req, log).orElse(null), event -> { - if (event.isSuccess()) { - req.response().end(data); - } else { - if (isFailOnError()) { - req.response().headers().clear(); - req.response().setStatusCode(StatusCode.BAD_REQUEST.getStatusCode()); - req.response().setStatusMessage(StatusCode.BAD_REQUEST.getStatusMessage()); - req.response().end(); - } else { - req.response().end(data); - } - log.warn(event.getMessage()); - } - }); + validator.validate(req, req.method() + "/out", data, + matchingSchemaLocation(validationResourceManager.getValidationResource(), req, log).orElse(null), + event -> { + if (event.isSuccess()) { + req.response().end(data); + } else { + if (isFailOnError()) { + req.response().headers().clear(); + req.response().setStatusCode(StatusCode.BAD_REQUEST.getStatusCode()); + req.response().setStatusMessage(StatusCode.BAD_REQUEST.getStatusMessage()); + req.response().end(); + } else { + req.response().end(data); + } + log.warn(event.getMessage()); + } + }); } else { req.response().end(data); } @@ -146,7 +131,8 @@ private void handleValidation(final HttpServerRequest req) { req.bodyHandler(data -> { if (inMethods.contains(req.method().name())) { - validator.validate(req, req.method() + "/in", data, schemaLocation(req, log).orElse(null), + validator.validate(req, req.method() + "/in", data, + matchingSchemaLocation(validationResourceManager.getValidationResource(), req, log).orElse(null), event -> { if (event.isSuccess()) { cReq.end(data); @@ -185,29 +171,4 @@ public boolean isFailOnError() { public void setFailOnError(boolean failOnError) { this.failOnError = failOnError; } - - private Optional schemaLocation(HttpServerRequest request, Logger log) { - Map validationResource = matchingValidationResource(request, log); - if (validationResource != null) { - String location = validationResource.get(ValidationResource.SCHEMA_LOCATION_PROPERTY); - if(location == null) { - return Optional.empty(); - } - - String keepInMemoryStr = validationResource.get(ValidationResource.SCHEMA_KEEP_INMEMORY_PROPERTY); - - Integer keepInMemory = null; - if(keepInMemoryStr != null) { - try { - keepInMemory = Integer.parseInt(keepInMemoryStr); - } catch (NumberFormatException ex) { - log.warn("Property 'keepInMemory' is not a number but " + keepInMemoryStr, ex); - } - } - - return Optional.of(new SchemaLocation(location, keepInMemory)); - } - return Optional.empty(); - } - } \ No newline at end of file diff --git a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationUtil.java b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationUtil.java new file mode 100644 index 000000000..3ab43634b --- /dev/null +++ b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/ValidationUtil.java @@ -0,0 +1,83 @@ +package org.swisspush.gateleen.validation; + +import io.vertx.core.http.HttpServerRequest; +import org.slf4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Utility class providing functions for working with {@link ValidationResource} and {@link SchemaLocation} + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public final class ValidationUtil { + + private ValidationUtil() {} + + /** + * Get values of matching {@link ValidationResource} for provided {@link HttpServerRequest} + * + * @param validationResource the {@link ValidationResource} holding the configuration values + * @param request the {@link HttpServerRequest} to lookup + * @param log the {@link Logger} + * @return a {@link Map} holding the corresponding configuration values or null + */ + public static Map matchingValidationResourceEntry(ValidationResource validationResource, HttpServerRequest request, Logger log) { + List> validationResources = validationResource.getResources(); + try { + for (Map entry : validationResources) { + if (doesRequestValueMatch(request.method().name(), entry.get(ValidationResource.METHOD_PROPERTY)) + && doesRequestValueMatch(request.uri(), entry.get(ValidationResource.URL_PROPERTY))) { + return entry; + } + } + } catch (PatternSyntaxException patternException) { + log.error(patternException.getMessage() + " " + patternException.getPattern()); + } + + return null; + } + + /** + * Get a {@link SchemaLocation} (when present) of the matching {@link ValidationResource} for provided {@link HttpServerRequest} + * + * @param validationResource the {@link ValidationResource} holding the configuration values + * @param request the {@link HttpServerRequest} to lookup + * @param log the {@link Logger} + * @return the {@link SchemaLocation} if present in the configuration values. Otherwise returns {@link Optional#empty()} + */ + public static Optional matchingSchemaLocation(ValidationResource validationResource, HttpServerRequest request, Logger log) { + Map entry = matchingValidationResourceEntry(validationResource, request, log); + if (entry != null) { + String location = entry.get(ValidationResource.SCHEMA_LOCATION_PROPERTY); + if(location == null) { + return Optional.empty(); + } + + String keepInMemoryStr = entry.get(ValidationResource.SCHEMA_KEEP_INMEMORY_PROPERTY); + + Integer keepInMemory = null; + if(keepInMemoryStr != null) { + try { + keepInMemory = Integer.parseInt(keepInMemoryStr); + } catch (NumberFormatException ex) { + log.warn("Property 'keepInMemory' is not a number but " + keepInMemoryStr, ex); + } + } + + return Optional.of(new SchemaLocation(location, keepInMemory)); + } + return Optional.empty(); + } + + private static boolean doesRequestValueMatch(String value, String valuePattern) { + Pattern pattern = Pattern.compile(valuePattern); + Matcher matcher = pattern.matcher(value); + return matcher.matches(); + } +} diff --git a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/Validator.java b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/Validator.java index 81229f3d8..6ceacd49e 100755 --- a/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/Validator.java +++ b/gateleen-validation/src/main/java/org/swisspush/gateleen/validation/Validator.java @@ -2,21 +2,22 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; import com.networknt.schema.JsonSchema; import com.networknt.schema.JsonSchemaFactory; import com.networknt.schema.ValidationMessage; -import io.vertx.core.json.Json; -import org.swisspush.gateleen.core.http.RequestLoggerFactory; -import org.swisspush.gateleen.core.json.JsonUtil; -import org.swisspush.gateleen.core.storage.ResourceStorage; -import org.swisspush.gateleen.core.util.StringUtils; -import com.google.common.base.Joiner; -import org.slf4j.Logger; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.http.RequestLoggerFactory; +import org.swisspush.gateleen.core.json.JsonUtil; +import org.swisspush.gateleen.core.storage.ResourceStorage; +import org.swisspush.gateleen.core.util.StringUtils; import org.swisspush.gateleen.core.validation.ValidationResult; import org.swisspush.gateleen.core.validation.ValidationStatus; @@ -39,6 +40,29 @@ public Validator(ResourceStorage storage, String schemaRoot, ValidationSchemaPro this.schemaProvider = schemaProvider; } + public Future validateWithSchemaLocation(SchemaLocation schemaLocation, Buffer jsonBuffer, Logger log) { + Future future = Future.future(); + log.debug("Validating request"); + schemaProvider.schemaFromLocation(schemaLocation).setHandler(event -> { + if (event.failed()) { + future.complete(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE, + "Error while getting schema. Cause: " + event.cause().getMessage())); + return; + } + + Optional schemaOptional = event.result(); + if (schemaOptional.isEmpty()) { + future.complete(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE, + "No schema found in location " + schemaLocation.schemaLocation())); + return; + } + + JsonSchema jsonSchema = schemaOptional.get(); + performValidation(jsonSchema, schemaLocation, jsonBuffer, log).setHandler(validationEvent -> future.complete(validationEvent.result())); + }); + return future; + } + public void validate(HttpServerRequest req, String type, Buffer jsonBuffer, SchemaLocation schemaLocation, Handler callback) { if (schemaLocation == null) { validate(req, type, jsonBuffer, callback); @@ -162,12 +186,54 @@ public static ValidationResult validateStatic(Buffer dataToBeValidated, String s } } + private static Future performValidation(JsonSchema schema, SchemaLocation schemaLocation, + Buffer jsonBuffer, Logger log) { + Future future = Future.future(); + + try { + JsonNode jsonNode = new ObjectMapper().readTree(jsonBuffer.getBytes()); + if (jsonNode == null) { + future.complete(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, + "no valid JSON object: " + jsonBuffer.toString())); + return future; + } + final Set valMsgs = schema.validate(jsonNode); + if (valMsgs.isEmpty()) { + log.debug("Used schema: {}", schemaLocation.schemaLocation()); + future.complete(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)); + } else { + JsonArray validationDetails = extractMessagesAsJson(valMsgs, log); + String messages = StringUtils.getStringOrEmpty(extractMessages(valMsgs)); + StringBuilder msgBuilder = new StringBuilder(); + msgBuilder.append("Invalid JSON for ") + .append(schemaLocation.schemaLocation()).append(". Messages: ") + .append(messages) + .append(" | Report: ").append(getReportAsString(valMsgs)); + + if (log.isDebugEnabled()) { + msgBuilder.append(" | Validated JSON: ").append(jsonBuffer.toString()); + } + + log.warn(msgBuilder.toString()); + + future.complete(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, msgBuilder.toString(), validationDetails)); + log.warn("Used schema: {}", schemaLocation.schemaLocation()); + } + } catch (IOException e) { + String message = "Cannot read JSON of schema " + " (" + schemaLocation.schemaLocation() + ")"; + log.warn(message, e.getMessage()); + future.complete(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, message)); + } + + return future; + } + private static void performValidation(JsonSchema schema, Logger log, String base, Buffer jsonBuffer, String type, String path, Handler callback) { try { JsonNode jsonNode = new ObjectMapper().readTree(jsonBuffer.getBytes()); if (jsonNode == null) { - throw new IOException("no vaild JSON object: " + jsonBuffer.toString()); + throw new IOException("no valid JSON object: " + jsonBuffer.toString()); } final Set valMsgs = schema.validate(jsonNode); if (valMsgs.isEmpty()) { diff --git a/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidationUtilTest.java b/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidationUtilTest.java new file mode 100644 index 000000000..f1fc02bb9 --- /dev/null +++ b/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidationUtilTest.java @@ -0,0 +1,97 @@ +package org.swisspush.gateleen.validation; + +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.slf4j.Logger; + +import java.util.Map; +import java.util.Optional; + +import static org.swisspush.gateleen.validation.ValidationUtil.matchingValidationResourceEntry; + +/** + * Test class for the ValidationUtil + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +@RunWith(VertxUnitRunner.class) +public class ValidationUtilTest extends AbstractTest { + + private Logger log; + + @Before + public void setUp() { + log = Mockito.mock(Logger.class); + } + + @Test + public void testMatchingValidationResourceEntry(TestContext context) { + ValidationResource validationResource = new ValidationResource(); + validationResource.addResource(Map.of(ValidationResource.METHOD_PROPERTY, "PUT", ValidationResource.URL_PROPERTY, "/some/other/resource")); + + Map entryMap = matchingValidationResourceEntry(validationResource, + new CustomHttpServerRequest(HttpMethod.PUT, "/some/other/resource"), log); + context.assertNotNull(entryMap); + context.assertEquals(entryMap.get(ValidationResource.METHOD_PROPERTY), "PUT"); + context.assertEquals(entryMap.get(ValidationResource.URL_PROPERTY), "/some/other/resource"); + + entryMap = matchingValidationResourceEntry(validationResource, + new CustomHttpServerRequest(HttpMethod.GET, "/some/other/resource"), log); + context.assertNull(entryMap); + + entryMap = matchingValidationResourceEntry(validationResource, + new CustomHttpServerRequest(HttpMethod.PUT, "/foo/bar/resource"), log); + context.assertNull(entryMap); + } + + @Test + public void testMatchingSchemaLocation(TestContext context) { + ValidationResource validationResource = new ValidationResource(); + validationResource.addResource(Map.of( + ValidationResource.METHOD_PROPERTY, "PUT", + ValidationResource.URL_PROPERTY, "/some/other/resource", + ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema", + ValidationResource.SCHEMA_KEEP_INMEMORY_PROPERTY, "120" + )); + + Optional optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResource, + new CustomHttpServerRequest(HttpMethod.PUT, "/some/other/resource"), log); + context.assertTrue(optionalSchemaLocation.isPresent()); + context.assertEquals("/path/to/schema", optionalSchemaLocation.get().schemaLocation()); + context.assertEquals(120, optionalSchemaLocation.get().keepInMemory()); + + optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResource, + new CustomHttpServerRequest(HttpMethod.GET, "/some/other/resource"), log); + context.assertFalse(optionalSchemaLocation.isPresent()); + + optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResource, + new CustomHttpServerRequest(HttpMethod.PUT, "/foo/bar/resource"), log); + context.assertFalse(optionalSchemaLocation.isPresent()); + + validationResource.addResource(Map.of( + ValidationResource.METHOD_PROPERTY, "PUT", + ValidationResource.URL_PROPERTY, "/foo/bar/resource", + ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/other/schema" + )); + + optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResource, + new CustomHttpServerRequest(HttpMethod.PUT, "/foo/bar/resource"), log); + context.assertTrue(optionalSchemaLocation.isPresent()); + context.assertEquals("/path/to/other/schema", optionalSchemaLocation.get().schemaLocation()); + context.assertNull(optionalSchemaLocation.get().keepInMemory()); + + ValidationResource validationResourceWithoutSchemaLocation = new ValidationResource(); + validationResourceWithoutSchemaLocation.addResource(Map.of( + ValidationResource.METHOD_PROPERTY, "PUT", + ValidationResource.URL_PROPERTY, "/some/other/resource" + )); + optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResourceWithoutSchemaLocation, + new CustomHttpServerRequest(HttpMethod.PUT, "/some/other/resource"), log); + context.assertFalse(optionalSchemaLocation.isPresent()); + } +} diff --git a/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidatorTest.java b/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidatorTest.java index 7f424c7ac..c6b57ff16 100755 --- a/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidatorTest.java +++ b/gateleen-validation/src/test/java/org/swisspush/gateleen/validation/ValidatorTest.java @@ -11,6 +11,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.slf4j.Logger; import org.swisspush.gateleen.core.storage.MockResourceStorage; import org.swisspush.gateleen.core.validation.ValidationStatus; @@ -26,6 +27,7 @@ public class ValidatorTest extends AbstractTest { private ValidationSchemaProvider validationSchemaProvider; private final String SCHEMA_ROOT = "/foo/schemas/apis/"; private Validator validator; + private Logger logger; private final String SAMPLE_SCHEMA = "{\n" + "\t\"$schema\": \"http://json-schema.org/draft-04/schema#\",\n" + @@ -52,17 +54,18 @@ public class ValidatorTest extends AbstractTest { "}"; @Before - public void setUp(){ + public void setUp() { storage = new MockResourceStorage(); validationSchemaProvider = Mockito.mock(ValidationSchemaProvider.class); validator = new Validator(storage, SCHEMA_ROOT, validationSchemaProvider); + logger = Mockito.mock(Logger.class); - storage.putMockData("/foo/schemas/apis/","{\"apis\": [\"foo\"]}"); + storage.putMockData("/foo/schemas/apis/", "{\"apis\": [\"foo\"]}"); } @Test - public void testValidationWithVariables(TestContext context){ + public void testValidationWithVariables(TestContext context) { // add Data for lowdash replacement prepareSchema("{\n" + " \"$schema\": \"http://json-schema.org/draft-04/schema#\", \n" + @@ -85,7 +88,7 @@ public void testValidationWithVariables(TestContext context){ } @Test - public void testValidationWithNonValidResourceContent(TestContext context){ + public void testValidationWithNonValidResourceContent(TestContext context) { Async async = context.async(); prepareSchema(SAMPLE_SCHEMA); @@ -106,7 +109,7 @@ public void testValidationWithNonValidResourceContent(TestContext context){ } @Test - public void testValidationWithValidResourceContent(TestContext context){ + public void testValidationWithValidResourceContent(TestContext context) { Async async = context.async(); prepareSchema(SAMPLE_SCHEMA); @@ -123,15 +126,15 @@ public void testValidationWithValidResourceContent(TestContext context){ } @Test - public void testValidation(TestContext context){ + public void testValidation(TestContext context) { // add Data for lowdash replacement - storage.putMockData("/foo/schemas/apis/foo/","{\"foo\": [\"mediamessage\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/","{\"mediamessage\": [\"v1\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/","{\"v1\": [\"output\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/","{\"output\": [\"front\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/front/","{\"front\": [\"GET\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/front/GET/","{\"GET\": [\"out\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/front/GET/out","{\n" + + storage.putMockData("/foo/schemas/apis/foo/", "{\"foo\": [\"mediamessage\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/", "{\"mediamessage\": [\"v1\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/", "{\"v1\": [\"output\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/", "{\"output\": [\"front\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/front/", "{\"front\": [\"GET\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/front/GET/", "{\"GET\": [\"out\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/front/GET/out", "{\n" + " \"$schema\": \"http://json-schema.org/draft-04/schema#\", \n" + " \"type\": \"object\"\n" + "}"); @@ -152,12 +155,12 @@ public void testValidation(TestContext context){ } @Test - public void testValidationWithNoSchema(TestContext context){ + public void testValidationWithNoSchema(TestContext context) { // add Data for lowdash replacement - storage.putMockData("/foo/schemas/apis/foo/","{\"foo\": [\"mediamessage\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/","{\"mediamessage\": [\"v1\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/","{\"v1\": [\"output\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/","{\"output\": [\"front\"]}"); + storage.putMockData("/foo/schemas/apis/foo/", "{\"foo\": [\"mediamessage\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/", "{\"mediamessage\": [\"v1\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/", "{\"v1\": [\"output\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediamessage/v1/output/", "{\"output\": [\"front\"]}"); // schema line missing CustomHttpServerRequest getValidationResourceRequest = new CustomHttpServerRequest(HttpMethod.GET, "/foo/mediamessage/v1/output/front"); @@ -174,7 +177,7 @@ public void testValidationWithNoSchema(TestContext context){ } @Test - public void testValidationWithValidResourceContentAndPresentSchema(TestContext context){ + public void testValidationWithValidResourceContentAndPresentSchema(TestContext context) { Async async = context.async(); when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) .thenReturn(Future.succeededFuture(Optional.of(createSchema(SAMPLE_SCHEMA)))); @@ -192,7 +195,7 @@ public void testValidationWithValidResourceContentAndPresentSchema(TestContext c } @Test - public void testValidationWithValidResourceContentAndMissingSchema(TestContext context){ + public void testValidationWithValidResourceContentAndMissingSchema(TestContext context) { Async async = context.async(); when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) .thenReturn(Future.succeededFuture(Optional.empty())); @@ -204,13 +207,13 @@ public void testValidationWithValidResourceContentAndMissingSchema(TestContext c validator.validate(getValidationResourceRequest, type, jsonBuffer, new SchemaLocation("/path/to/the/schema", null), validationResult -> { context.assertFalse(validationResult.isSuccess(), "ValidationResult should not be a success (COULD_NOT_VALIDATE)"); context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, validationResult.getValidationStatus()); - context.assertEquals( "No schema found in location /path/to/the/schema", validationResult.getMessage()); + context.assertEquals("No schema found in location /path/to/the/schema", validationResult.getMessage()); async.complete(); }); } @Test - public void testValidationWithValidResourceContentAndSchemaProviderError(TestContext context){ + public void testValidationWithValidResourceContentAndSchemaProviderError(TestContext context) { Async async = context.async(); when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) .thenReturn(Future.failedFuture("Boooom")); @@ -222,13 +225,13 @@ public void testValidationWithValidResourceContentAndSchemaProviderError(TestCon validator.validate(getValidationResourceRequest, type, jsonBuffer, new SchemaLocation("/path/to/the/schema", null), validationResult -> { context.assertFalse(validationResult.isSuccess(), "ValidationResult should not be a success (COULD_NOT_VALIDATE)"); context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, validationResult.getValidationStatus()); - context.assertEquals( "Error while getting schema. Cause: Boooom", validationResult.getMessage()); + context.assertEquals("Error while getting schema. Cause: Boooom", validationResult.getMessage()); async.complete(); }); } @Test - public void testValidationWithNonValidResourceContentAndPresentSchema(TestContext context){ + public void testValidationWithNonValidResourceContentAndPresentSchema(TestContext context) { Async async = context.async(); when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) @@ -250,13 +253,84 @@ public void testValidationWithNonValidResourceContentAndPresentSchema(TestContex }); } - private void prepareSchema(String schemaJson){ - storage.putMockData("/foo/schemas/apis/foo/","{\"foo\": [\"mediadata\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediadata/","{\"mediadata\": [\"v1\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/","{\"v1\": [\"specials\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/","{\"specials\": [\"_\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/_/","{\"_\": [\"GET\"]}"); - storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/_/GET/","{\"GET\": [\"out\"]}"); + @Test + public void testValidateWithSchemaLocation(TestContext context) { + Async async = context.async(); + + when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) + .thenReturn(Future.succeededFuture(Optional.of(createSchema(SAMPLE_SCHEMA)))); + + Buffer jsonBuffer = Buffer.buffer(CONTENT_MATCHING_SAMPLE_SCHEMA); + validator.validateWithSchemaLocation(new SchemaLocation("/path/to/the/schema", null), jsonBuffer, logger) + .setHandler(validationResult -> { + context.assertTrue(validationResult.succeeded()); + context.assertEquals(ValidationStatus.VALIDATED_POSITIV, validationResult.result().getValidationStatus()); + async.complete(); + }); + } + + @Test + public void testValidateWithSchemaLocationAndSchemaProviderError(TestContext context) { + Async async = context.async(); + when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) + .thenReturn(Future.failedFuture("Boooom")); + + Buffer jsonBuffer = Buffer.buffer(CONTENT_MATCHING_SAMPLE_SCHEMA); + validator.validateWithSchemaLocation(new SchemaLocation("/path/to/the/schema", null), jsonBuffer, logger) + .setHandler(validationResult -> { + context.assertFalse(validationResult.result().isSuccess()); + context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, validationResult.result().getValidationStatus()); + context.assertEquals("Error while getting schema. Cause: Boooom", validationResult.result().getMessage()); + async.complete(); + }); + } + + @Test + public void testValidateWithSchemaLocationAndMissingSchema(TestContext context) { + Async async = context.async(); + when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) + .thenReturn(Future.succeededFuture(Optional.empty())); + + Buffer jsonBuffer = Buffer.buffer(CONTENT_MATCHING_SAMPLE_SCHEMA); + + validator.validateWithSchemaLocation(new SchemaLocation("/path/to/the/schema", null), jsonBuffer, logger) + .setHandler(validationResult -> { + context.assertFalse(validationResult.result().isSuccess(), "ValidationResult should not be a success (COULD_NOT_VALIDATE)"); + context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, validationResult.result().getValidationStatus()); + context.assertEquals("No schema found in location /path/to/the/schema", validationResult.result().getMessage()); + async.complete(); + }); + } + + @Test + public void testValidateWithSchemaLocationWithNonValidResourceContentAndPresentSchema(TestContext context) { + Async async = context.async(); + + when(validationSchemaProvider.schemaFromLocation(any(SchemaLocation.class))) + .thenReturn(Future.succeededFuture(Optional.of(createSchema(SAMPLE_SCHEMA)))); + + Buffer jsonBuffer = Buffer.buffer(CONTENT_NOT_MATCHING_SAMPLE_SCHEMA); + + validator.validateWithSchemaLocation(new SchemaLocation("/path/to/the/schema", null), jsonBuffer, logger) + .setHandler(validationResult -> { + context.assertFalse(validationResult.result().isSuccess()); + context.assertEquals(ValidationStatus.VALIDATED_NEGATIV, validationResult.result().getValidationStatus()); + String message = validationResult.result().getMessage(); + context.assertFalse(message.contains("Could not get path"), message); + context.assertFalse(message.contains("No schema for"), message); + context.assertTrue(message.contains("Invalid JSON for /path/to/the/schema"), message); + context.assertTrue(message.contains("\"message\" : \"$.lastName: is missing but it is required\""), message); + async.complete(); + }); + } + + private void prepareSchema(String schemaJson) { + storage.putMockData("/foo/schemas/apis/foo/", "{\"foo\": [\"mediadata\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediadata/", "{\"mediadata\": [\"v1\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/", "{\"v1\": [\"specials\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/", "{\"specials\": [\"_\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/_/", "{\"_\": [\"GET\"]}"); + storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/_/GET/", "{\"GET\": [\"out\"]}"); storage.putMockData("/foo/schemas/apis/foo/mediadata/v1/specials/_/GET/out", schemaJson); } } From ab08c2f93b6d43f74cf4539fe581dd6d1082165b Mon Sep 17 00:00:00 2001 From: Marc-Andre Weber Date: Thu, 6 Jan 2022 09:16:16 +0100 Subject: [PATCH 2/2] #400 Adopted KafkaHandler constructor --- .../swisspush/gateleen/kafka/KafkaHandler.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index e4e0b8d60..134e31d1c 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -58,21 +58,23 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { - super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema"); - this.kafkaMessageValidator = kafkaMessageValidator; - this.repository = repository; - this.kafkaMessageSender = kafkaMessageSender; - this.streamingPath = streamingPath; - this.properties = new HashMap<>(); - - this.topicExtractor = new KafkaTopicExtractor(streamingPath); + this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender, + configResourceUri, streamingPath, new HashMap<>()); } public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { + this(configurationResourceManager, null, repository, kafkaMessageSender, + configResourceUri, streamingPath, properties); + } + + public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, + KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { + super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema"); this.repository = repository; + this.kafkaMessageValidator = kafkaMessageValidator; this.kafkaMessageSender = kafkaMessageSender; this.streamingPath = streamingPath; this.properties = properties;