Skip to content

Commit

Permalink
Merge pull request #415 from swisspush/issue400_schemaLocation_kafka
Browse files Browse the repository at this point in the history
#400 Extend gateleen-validation with schemaLocation
  • Loading branch information
mcweba authored Jan 6, 2022
2 parents 19cf48d + ab08c2f commit 30cb559
Show file tree
Hide file tree
Showing 13 changed files with 900 additions and 192 deletions.
22 changes: 10 additions & 12 deletions gateleen-kafka/README_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ The following example shows a topic configuration resource with configurations f
bottom. The first topic name matching the provided topic is used.
```json
{
  "my.topic.*": {
    "bootstrap.servers": "localhost:9092",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
  },
  ".+": {
    "bootstrap.servers": "localhost:9093",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
  }
}
```
In the example above, a topic called `my.topic.abc` would use the first configuration entry. A topic called `some.other.topic` would use the second configuration entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.core.util.StringUtils;
import org.swisspush.gateleen.core.validation.ValidationResult;
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Handler class for all Kafka related requests.
*
Expand All @@ -47,25 +45,36 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaTopicExtractor topicExtractor;
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
this.repository = repository;
this.kafkaMessageSender = kafkaMessageSender;
this.streamingPath = streamingPath;
this.properties = new HashMap<>();
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

this.topicExtractor = new KafkaTopicExtractor(streamingPath);
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
this.repository = repository;
this.kafkaMessageValidator = kafkaMessageValidator;
this.kafkaMessageSender = kafkaMessageSender;
this.streamingPath = streamingPath;
this.properties = properties;
Expand Down Expand Up @@ -114,14 +123,14 @@ public boolean handle(final HttpServerRequest request) {
}

final Optional<String> optTopic = topicExtractor.extractTopic(request);
if(!optTopic.isPresent()){
if(optTopic.isEmpty()){
respondWith(StatusCode.BAD_REQUEST, "Could not extract topic from request uri", request);
return true;
}

String topic = optTopic.get();
final Optional<Pair<KafkaProducer<String, String>, Pattern>> optProducer = repository.findMatchingKafkaProducer(topic);
if(!optProducer.isPresent()){
if(optProducer.isEmpty()){
respondWith(StatusCode.NOT_FOUND, "Could not find a matching producer for topic " + topic, request);
return true;
}
Expand All @@ -130,13 +139,23 @@ public boolean handle(final HttpServerRequest request) {
try {
log.debug("incoming kafka message payload: {}", payload.toString());
final List<KafkaProducerRecord<String, String>> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload);
kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> {
if(event.succeeded()) {
RequestLoggerFactory.getLogger(KafkaHandler.class, request)
.info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
maybeValidate(request, kafkaProducerRecords).setHandler(validationEvent -> {
if(validationEvent.succeeded()) {
if(validationEvent.result().isSuccess()) {
kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> {
if(event.succeeded()) {
RequestLoggerFactory.getLogger(KafkaHandler.class, request)
.info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
} else {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
}
});
} else {
respondWith(StatusCode.BAD_REQUEST, validationEvent.result().getMessage(), request);
}
} else {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request);
}
});
} catch (ValidationException ve){
Expand Down Expand Up @@ -164,6 +183,13 @@ public void resourceRemoved(String resourceUri) {
}
}

/**
 * Delegates validation of the given records to the configured {@link KafkaMessageValidator}.
 * When no validator is configured, validation is skipped and a positive result is returned
 * so that message sending proceeds unconditionally.
 *
 * @param request the incoming http request the records were built from
 * @param kafkaProducerRecords the records to validate
 * @return a future holding the validation outcome
 */
private Future<ValidationResult> maybeValidate(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
    if (kafkaMessageValidator == null) {
        // No validator wired in: treat every message as valid.
        return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
    }
    return kafkaMessageValidator.validateMessages(request, kafkaProducerRecords);
}

private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) {
ResponseStatusCodeLogUtil.info(request, statusCode, KafkaHandler.class);
if(statusCode != StatusCode.OK) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.swisspush.gateleen.kafka;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.validation.ValidationResult;
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.SchemaLocation;
import org.swisspush.gateleen.validation.ValidationResourceManager;
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.stream.Collectors.toList;

/**
 * Validates the payload of kafka messages against a JSON schema before they are sent.
 * The schema to use is resolved through the validation resource: when no entry matches the
 * request, or the record list is empty, validation is skipped with a positive result.
 */
public class KafkaMessageValidator {

    private final ValidationResourceManager validationResourceManager;
    private final Validator validator;
    // Fix: logger was created with KafkaHandler.class, which attributed all log output
    // of this class to KafkaHandler and made filtering by logger name misleading.
    private final Logger log = LoggerFactory.getLogger(KafkaMessageValidator.class);

    public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
        this.validationResourceManager = validationResourceManager;
        this.validator = validator;
    }

    /**
     * Validates the value of every provided record against the schema location configured for
     * the given request.
     *
     * @param request the original http request, used to look up the matching validation resource entry
     * @param kafkaProducerRecords the records whose values are validated
     * @return a future holding the first non-positive {@link ValidationResult}, a positive result when
     *         all records validate (or validation does not apply), {@code COULD_NOT_VALIDATE} when a
     *         matching entry has no schema location, or a failed future when a validation call itself fails
     */
    public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
        if (kafkaProducerRecords.isEmpty()) {
            return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
        }

        // No matching entry means validation is not configured for this request at all.
        Map<String, String> entry = ValidationUtil.matchingValidationResourceEntry(validationResourceManager.getValidationResource(), request, log);
        if (entry == null) {
            return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
        }

        Optional<SchemaLocation> optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResourceManager.getValidationResource(), request, log);
        if (optionalSchemaLocation.isEmpty()) {
            // Entry matched but no schema to validate against: report that validation could not happen.
            log.warn("No schema location found for {}. Could not validate kafka message", request.uri());
            return Future.succeededFuture(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE));
        }

        SchemaLocation schemaLocation = optionalSchemaLocation.get();

        @SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
        List<Future> futures = kafkaProducerRecords.stream()
                .map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
                .collect(toList());

        // CompositeFuture.all fails fast if any validation call fails; otherwise surface the
        // first non-positive result, or overall success when every record validated.
        return CompositeFuture.all(futures).compose(compositeFuture -> {
            for (Object o : compositeFuture.list()) {
                if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
                    return Future.succeededFuture((ValidationResult) o);
                }
            }
            return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.core.util.ResourcesUtils;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.core.validation.ValidationResult;
import org.swisspush.gateleen.core.validation.ValidationStatus;

import java.util.HashMap;
import java.util.List;
Expand All @@ -48,6 +50,7 @@ public class KafkaHandlerTest {
private Vertx vertx;
private KafkaProducerRepository repository;
private KafkaMessageSender kafkaMessageSender;
private KafkaMessageValidator messageValidator;
private ConfigurationResourceManager configurationResourceManager;
private KafkaHandler handler;
private MockResourceStorage storage;
Expand All @@ -63,6 +66,7 @@ public void setUp() {
vertx = Vertx.vertx();
repository = Mockito.spy(new KafkaProducerRepository(vertx));
kafkaMessageSender = Mockito.mock(KafkaMessageSender.class);
messageValidator = Mockito.mock(KafkaMessageValidator.class);
storage = new MockResourceStorage();
configurationResourceManager = new ConfigurationResourceManager(vertx, storage);
handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender,
Expand Down Expand Up @@ -429,54 +433,101 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){
});
}

static class StreamingRequest extends DummyHttpServerRequest {
private final String uri;
private final HttpMethod method;
private final String body;
private final MultiMap headers;
private final HttpServerResponse response;

StreamingRequest(HttpMethod method, String uri) {
this(method, uri, "", new CaseInsensitiveHeaders(), new StreamingResponse());
}

StreamingRequest(HttpMethod method, String uri, String body, MultiMap headers, HttpServerResponse response) {
this.method = method;
this.uri = uri;
this.body = body;
this.headers = headers;
this.response = response;
}

@Override public HttpMethod method() {
return method;
}
@Override public String uri() {
return uri;
}
@Override public MultiMap headers() { return headers; }

@Override
public HttpServerRequest bodyHandler(Handler<Buffer> bodyHandler) {
bodyHandler.handle(Buffer.buffer(body));
return this;
}

@Override public HttpServerResponse response() { return response; }
@Test
public void handlePayloadNotPassingValidation(TestContext context){
    Async async = context.async();

    // Use a handler wired with the (mocked) message validator.
    handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender,
            configResourceUri, streamingPath);

    storage.putMockData(configResourceUri, CONFIG_RESOURCE);

    // Validator reports a negative validation result for any request/records combination.
    when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
            .thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom")));

    handler.initialize().setHandler(event -> {
        context.assertTrue(handler.isInitialized());

        String payload = "{\n" +
                "\t\"records\": [{\n" +
                "\t\t\"key\": \"record_0000001\",\n" +
                "\t\t\"value\": {\n" +
                "\t\t\t\"metadata\": {\n" +
                "\t\t\t\t\"techId\": \"071X1500492vora1560860907613\",\n" +
                "\t\t\t\t\"user\": \"foo\"\n" +
                "\t\t\t},\n" +
                "\t\t\t\"event\": {\n" +
                "\t\t\t\t\"actionTime\": \"2019-06-18T14:28:27.617+02:00\",\n" +
                "\t\t\t\t\"type\": 1,\n" +
                "\t\t\t\t\"bool\": false\n" +
                "\t\t\t}\n" +
                "\t\t},\n" +
                "\t\t\"headers\": {\n" +
                "\t\t\t\"x-header-a\": \"value-a\",\n" +
                "\t\t\t\"x-header-b\": \"value-b\",\n" +
                "\t\t\t\"x-header-c\": \"value-c\"\n" +
                "\t\t}\n" +
                "\t}]\n" +
                "}";

        HttpServerResponse responseSpy = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
        StreamingRequest streamingRequest = new StreamingRequest(HttpMethod.POST, streamingPath + "my.topic.x",
                payload, new CaseInsensitiveHeaders(), responseSpy);
        final boolean wasHandled = handler.handle(streamingRequest);

        // Failed validation must answer 400 and must never reach the message sender.
        context.assertTrue(wasHandled);
        verifyZeroInteractions(kafkaMessageSender);
        verify(responseSpy, times(1)).setStatusCode(eq(StatusCode.BAD_REQUEST.getStatusCode()));

        async.complete();
    });
}

static class StreamingResponse extends DummyHttpServerResponse {
@Test
public void handleErrorWhileValidation(TestContext context){
Async async = context.async();

private final MultiMap headers;
handler = new KafkaHandler(configurationResourceManager, messageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath);

StreamingResponse(){
this.headers = new CaseInsensitiveHeaders();
}
when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.failedFuture("Boooom"));

StreamingResponse(MultiMap headers){
this.headers = headers;
}
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().setHandler(event -> {
context.assertTrue(handler.isInitialized());

@Override public MultiMap headers() { return headers; }
String singleMessagePayload = "{\n" +
"\t\"records\": [{\n" +
"\t\t\"key\": \"record_0000001\",\n" +
"\t\t\"value\": {\n" +
"\t\t\t\"metadata\": {\n" +
"\t\t\t\t\"techId\": \"071X1500492vora1560860907613\",\n" +
"\t\t\t\t\"user\": \"foo\"\n" +
"\t\t\t},\n" +
"\t\t\t\"event\": {\n" +
"\t\t\t\t\"actionTime\": \"2019-06-18T14:28:27.617+02:00\",\n" +
"\t\t\t\t\"type\": 1,\n" +
"\t\t\t\t\"bool\": false\n" +
"\t\t\t}\n" +
"\t\t},\n" +
"\t\t\"headers\": {\n" +
"\t\t\t\"x-header-a\": \"value-a\",\n" +
"\t\t\t\"x-header-b\": \"value-b\",\n" +
"\t\t\t\"x-header-c\": \"value-c\"\n" +
"\t\t}\n" +
"\t}]\n" +
"}";

HttpServerResponse response = spy(new StreamingResponse(new CaseInsensitiveHeaders()));
StreamingRequest request = new StreamingRequest(HttpMethod.POST, streamingPath + "my.topic.x",
singleMessagePayload, new CaseInsensitiveHeaders(), response);
final boolean handled = handler.handle(request);

context.assertTrue(handled);
verifyZeroInteractions(kafkaMessageSender);
verify(response, times(1)).setStatusCode(eq(StatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));

async.complete();
});
}
}
Loading

0 comments on commit 30cb559

Please sign in to comment.