Merge pull request #416 from swisspush/develop
PR for new release
mcweba authored Jan 6, 2022
2 parents 23ad7bc + 30cb559 commit 4317dd3
Showing 38 changed files with 925 additions and 217 deletions.
2 changes: 1 addition & 1 deletion gateleen-cache/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-cache</artifactId>
2 changes: 1 addition & 1 deletion gateleen-core/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-core</artifactId>
2 changes: 1 addition & 1 deletion gateleen-delegate/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-delegate</artifactId>
2 changes: 1 addition & 1 deletion gateleen-delta/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-delta</artifactId>
2 changes: 1 addition & 1 deletion gateleen-expansion/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-expansion</artifactId>
2 changes: 1 addition & 1 deletion gateleen-hook-js/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>
<artifactId>gateleen-hook-js</artifactId>
<packaging>jar</packaging>
2 changes: 1 addition & 1 deletion gateleen-hook/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-hook</artifactId>
22 changes: 10 additions & 12 deletions gateleen-kafka/README_kafka.md
@@ -22,18 +22,16 @@ The following example shows a topic configuration resource with configurations f
bottom. The first topic name matching the provided topic is used.
```json
{
"topics": {
"my.topic.*": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
},
".+": {
"bootstrap.servers": "localhost:9093",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}
}
"my.topic.*": {
"bootstrap.servers": "localhost:9092",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
},
".+": {
"bootstrap.servers": "localhost:9093",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}
}
```
In the example above, a topic called `my.topic.abc` would use the first configuration entry. A topic called `some.other.topic` would use the second configuration entry.
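
The first-match-wins lookup described in the README is easy to picture in code. The sketch below is illustrative only (the class and method names are hypothetical, not part of gateleen): it keeps the configured topic name patterns in insertion order and resolves a topic to the first matching producer configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical illustration of the documented lookup: patterns are tried from
// top to bottom and the first one matching the requested topic wins.
class TopicConfigLookup {
    private final Map<Pattern, Map<String, String>> configs = new LinkedHashMap<>();

    void add(String topicNamePattern, Map<String, String> producerConfig) {
        configs.put(Pattern.compile(topicNamePattern), producerConfig);
    }

    Optional<Map<String, String>> resolve(String topic) {
        return configs.entrySet().stream()
                .filter(entry -> entry.getKey().matcher(topic).matches())
                .map(Map.Entry::getValue)
                .findFirst();
    }
}
```

With the two entries from the example, `resolve("my.topic.abc")` would return the `localhost:9092` configuration, while `resolve("some.other.topic")` would fall through to the catch-all `.+` entry.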
2 changes: 1 addition & 1 deletion gateleen-kafka/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.swisspush.gateleen</groupId>
<artifactId>gateleen</artifactId>
-<version>1.2.8-SNAPSHOT</version>
+<version>1.2.9-SNAPSHOT</version>
</parent>

<artifactId>gateleen-kafka</artifactId>
@@ -14,18 +14,16 @@
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.core.util.StringUtils;
import org.swisspush.gateleen.core.validation.ValidationResult;
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Handler class for all Kafka related requests.
*
@@ -47,25 +45,36 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaTopicExtractor topicExtractor;
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
+private KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
-super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
-this.repository = repository;
-this.kafkaMessageSender = kafkaMessageSender;
-this.streamingPath = streamingPath;
-this.properties = new HashMap<>();
+this(configurationResourceManager, null, repository, kafkaMessageSender,
+configResourceUri, streamingPath);
}

-this.topicExtractor = new KafkaTopicExtractor(streamingPath);
+public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
+KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
+String streamingPath) {
+this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
+configResourceUri, streamingPath, new HashMap<>());
+}

public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

+this(configurationResourceManager, null, repository, kafkaMessageSender,
+configResourceUri, streamingPath, properties);
+}

+public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
+KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
this.repository = repository;
+this.kafkaMessageValidator = kafkaMessageValidator;
this.kafkaMessageSender = kafkaMessageSender;
this.streamingPath = streamingPath;
this.properties = properties;
@@ -114,14 +123,14 @@ public boolean handle(final HttpServerRequest request) {
}

final Optional<String> optTopic = topicExtractor.extractTopic(request);
-if(!optTopic.isPresent()){
+if(optTopic.isEmpty()){
respondWith(StatusCode.BAD_REQUEST, "Could not extract topic from request uri", request);
return true;
}

String topic = optTopic.get();
final Optional<Pair<KafkaProducer<String, String>, Pattern>> optProducer = repository.findMatchingKafkaProducer(topic);
-if(!optProducer.isPresent()){
+if(optProducer.isEmpty()){
respondWith(StatusCode.NOT_FOUND, "Could not find a matching producer for topic " + topic, request);
return true;
}
@@ -130,13 +139,23 @@ public boolean handle(final HttpServerRequest request) {
try {
log.debug("incoming kafka message payload: {}", payload.toString());
final List<KafkaProducerRecord<String, String>> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload);
-kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> {
-if(event.succeeded()) {
-RequestLoggerFactory.getLogger(KafkaHandler.class, request)
-.info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
-respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
+maybeValidate(request, kafkaProducerRecords).setHandler(validationEvent -> {
+if(validationEvent.succeeded()) {
+if(validationEvent.result().isSuccess()) {
+kafkaMessageSender.sendMessages(optProducer.get().getLeft(), kafkaProducerRecords).setHandler(event -> {
+if(event.succeeded()) {
+RequestLoggerFactory.getLogger(KafkaHandler.class, request)
+.info("Successfully sent {} message(s) to kafka topic '{}'", kafkaProducerRecords.size(), topic);
+respondWith(StatusCode.OK, StatusCode.OK.getStatusMessage(), request);
+} else {
+respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
+}
+});
+} else {
+respondWith(StatusCode.BAD_REQUEST, validationEvent.result().getMessage(), request);
+}
} else {
-respondWith(StatusCode.INTERNAL_SERVER_ERROR, event.cause().getMessage(), request);
+respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request);
}
});
} catch (ValidationException ve){
@@ -164,6 +183,13 @@ public void resourceRemoved(String resourceUri) {
}
}

+private Future<ValidationResult> maybeValidate(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
+if(kafkaMessageValidator != null) {
+return kafkaMessageValidator.validateMessages(request, kafkaProducerRecords);
+}
+return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
+}

private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) {
ResponseStatusCodeLogUtil.info(request, statusCode, KafkaHandler.class);
if(statusCode != StatusCode.OK) {
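Downstream wiring for the new validator-aware constructor could look roughly like the sketch below. This is an assumption about typical setup code rather than anything taken from this diff (the factory method and the ConfigurationResourceManager package are assumed); it only shows that passing a KafkaMessageValidator switches message validation on, while passing null keeps the previous behaviour, because maybeValidate() then short-circuits to VALIDATED_POSITIV.

```java
package org.swisspush.gateleen.kafka;

// Package of ConfigurationResourceManager is assumed here; adjust if it differs.
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.validation.ValidationResourceManager;
import org.swisspush.gateleen.validation.Validator;

// Hypothetical wiring sketch: all collaborators are handed in, and only the new
// validator-aware constructor introduced by this PR is exercised.
public class KafkaHandlerWiringExample {

    static KafkaHandler createHandler(ConfigurationResourceManager configurationResourceManager,
                                      ValidationResourceManager validationResourceManager,
                                      Validator validator,
                                      KafkaProducerRepository repository,
                                      KafkaMessageSender kafkaMessageSender) {
        KafkaMessageValidator messageValidator =
                new KafkaMessageValidator(validationResourceManager, validator);
        return new KafkaHandler(
                configurationResourceManager,
                messageValidator,                       // or null to skip message validation
                repository,
                kafkaMessageSender,
                "/server/admin/v1/kafka/topicsConfig",  // config resource URI (example value)
                "/server/streaming/");                  // streaming path (example value)
    }
}
```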
@@ -0,0 +1,66 @@
package org.swisspush.gateleen.kafka;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.validation.ValidationResult;
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.SchemaLocation;
import org.swisspush.gateleen.validation.ValidationResourceManager;
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.stream.Collectors.toList;

public class KafkaMessageValidator {

private final ValidationResourceManager validationResourceManager;
private final Validator validator;
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
this.validationResourceManager = validationResourceManager;
this.validator = validator;
}

public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
if (kafkaProducerRecords.isEmpty()) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
}

Map<String, String> entry = ValidationUtil.matchingValidationResourceEntry(validationResourceManager.getValidationResource(), request, log);
if (entry == null) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
}

Optional<SchemaLocation> optionalSchemaLocation = ValidationUtil.matchingSchemaLocation(validationResourceManager.getValidationResource(), request, log);
if (optionalSchemaLocation.isEmpty()) {
log.warn("No schema location found for {}. Could not validate kafka message", request.uri());
return Future.succeededFuture(new ValidationResult(ValidationStatus.COULD_NOT_VALIDATE));
}

SchemaLocation schemaLocation = optionalSchemaLocation.get();

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = kafkaProducerRecords.stream()
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
.collect(toList());

return CompositeFuture.all(futures).compose(compositeFuture -> {
for (Object o : compositeFuture.list()) {
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
return Future.succeededFuture((ValidationResult) o);
}
}
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
});
}
}
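
For callers, the contract of validateMessages is worth spelling out: a failed future means the validation step itself errored, while a succeeded future still carries a ValidationResult whose status has to be checked. The hypothetical helper below (the surrounding class, its log field, and the imports are assumed) mirrors how KafkaHandler consumes the result before handing records to KafkaMessageSender.

```java
// Hypothetical caller sketch: distinguishes "validation could not run" (failed
// future) from "message is invalid" (succeeded future with a negative result).
private void validateThenSend(HttpServerRequest request,
                              List<KafkaProducerRecord<String, String>> records,
                              KafkaMessageValidator kafkaMessageValidator) {
    kafkaMessageValidator.validateMessages(request, records).setHandler(event -> {
        if (event.failed()) {
            log.error("validation errored: {}", event.cause().getMessage());
        } else if (event.result().getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
            log.warn("kafka message rejected: {}", event.result().getMessage());
        } else {
            // validated positively: safe to pass the records to KafkaMessageSender here
        }
    });
}
```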