diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index a2345d9f6..20a95c8f7 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -2,6 +2,7 @@ import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -12,6 +13,7 @@ import org.slf4j.LoggerFactory; import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.core.http.RequestLoggerFactory; import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil; import org.swisspush.gateleen.core.util.StatusCode; @@ -25,6 +27,8 @@ import java.util.Optional; import java.util.regex.Pattern; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; + /** * Handler class for all Kafka related requests. * @@ -42,20 +46,26 @@ public class KafkaHandler extends ConfigurationResourceConsumer { private final Logger log = LoggerFactory.getLogger(KafkaHandler.class); private final String streamingPath; + private final GateleenExceptionFactory exceptionFactory; private final KafkaProducerRepository repository; private final KafkaTopicExtractor topicExtractor; private final KafkaMessageSender kafkaMessageSender; private final Map properties; + private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder; private KafkaMessageValidator kafkaMessageValidator; private boolean initialized = false; + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { this(configurationResourceManager, null, repository, kafkaMessageSender, configResourceUri, streamingPath); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { @@ -63,6 +73,8 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K configResourceUri, streamingPath, new HashMap<>()); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { @@ -70,10 +82,29 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K configResourceUri, streamingPath, properties); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { + this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager, + kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath, + properties); + log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!"); + } + public KafkaHandler( + Vertx vertx, + GateleenExceptionFactory exceptionFactory, + ConfigurationResourceManager configurationResourceManager, + KafkaMessageValidator kafkaMessageValidator, + KafkaProducerRepository repository, + KafkaMessageSender kafkaMessageSender, + String configResourceUri, + String streamingPath, + Map properties + ) { super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema"); + this.exceptionFactory = exceptionFactory; this.repository = repository; this.kafkaMessageValidator = kafkaMessageValidator; this.kafkaMessageSender = kafkaMessageSender; @@ -81,6 +112,11 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K this.properties = properties; this.topicExtractor = new KafkaTopicExtractor(streamingPath); + this.kafkaProducerRecordBuilder = new KafkaProducerRecordBuilder(vertx, exceptionFactory); + } + + public static KafkaHandlerBuilder builder() { + return new KafkaHandlerBuilder(); } public Future initialize() { @@ -140,9 +176,11 @@ public boolean handle(final HttpServerRequest request) { } request.bodyHandler(payload -> { - try { - log.debug("incoming kafka message payload: {}", payload); - final List> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload); + log.debug("incoming kafka message payload: {}", payload); + // TODO refactor away this callback-hell (Counts for the COMPLETE method + // surrounding this line, named 'KafkaHandler.handle()', NOT only + // those lines below). + kafkaProducerRecordBuilder.buildRecordsAsync(topic, payload).compose((List> kafkaProducerRecords) -> { maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> { if(validationEvent.succeeded()) { if(validationEvent.result().isSuccess()) { @@ -162,9 +200,15 @@ public boolean handle(final HttpServerRequest request) { respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request); } }); - } catch (ValidationException ve){ - respondWith(StatusCode.BAD_REQUEST, ve.getMessage(), request); - } + return Future.succeededFuture(); + }).onFailure((Throwable ex) -> { + if (ex instanceof ValidationException) { + respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request); + return; + } + log.error("TODO error handling", exceptionFactory.newException(ex)); + respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request); + }); }); return true; } diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java new file mode 100644 index 000000000..324e2354c --- /dev/null +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java @@ -0,0 +1,85 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.Vertx; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; + +import java.util.Map; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; + +public class KafkaHandlerBuilder { + + private static final Logger log = getLogger(KafkaHandlerBuilder.class); + private Vertx vertx; + private GateleenExceptionFactory exceptionFactory; + private ConfigurationResourceManager configurationResourceManager; + private KafkaMessageValidator kafkaMessageValidator; + private KafkaProducerRepository repository; + private KafkaMessageSender kafkaMessageSender; + private String configResourceUri; + private String streamingPath; + private Map properties; + + /** Use {@link KafkaHandler#builder()} */ + KafkaHandlerBuilder() {/**/} + + public KafkaHandler build() { + if (vertx == null) throw new NullPointerException("vertx missing"); + if (exceptionFactory == null) exceptionFactory = newGateleenThriftyExceptionFactory(); + if (repository == null) throw new NullPointerException("kafkaProducerRepository missing"); + if (kafkaMessageSender == null) throw new NullPointerException("kafkaMessageSender missing"); + if (streamingPath == null) log.warn("no 'streamingPath' given. Are you sure you want none?"); + return new KafkaHandler( + vertx, exceptionFactory, configurationResourceManager, kafkaMessageValidator, repository, + kafkaMessageSender, configResourceUri, streamingPath, properties); + } + + public KafkaHandlerBuilder withVertx(Vertx vertx) { + this.vertx = vertx; + return this; + } + + public KafkaHandlerBuilder withExceptionFactory(GateleenExceptionFactory exceptionFactory) { + this.exceptionFactory = exceptionFactory; + return this; + } + + public KafkaHandlerBuilder withConfigurationResourceManager(ConfigurationResourceManager configurationResourceManager) { + this.configurationResourceManager = configurationResourceManager; + return this; + } + + public KafkaHandlerBuilder withKafkaMessageValidator(KafkaMessageValidator kafkaMessageValidator) { + this.kafkaMessageValidator = kafkaMessageValidator; + return this; + } + + public KafkaHandlerBuilder withRepository(KafkaProducerRepository repository) { + this.repository = repository; + return this; + } + + public KafkaHandlerBuilder withKafkaMessageSender(KafkaMessageSender kafkaMessageSender) { + this.kafkaMessageSender = kafkaMessageSender; + return this; + } + + public KafkaHandlerBuilder withConfigResourceUri(String configResourceUri) { + this.configResourceUri = configResourceUri; + return this; + } + + public KafkaHandlerBuilder withStreamingPath(String streamingPath) { + this.streamingPath = streamingPath; + return this; + } + + public KafkaHandlerBuilder withProperties(Map properties) { + this.properties = properties; + return this; + } + +} diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java index 83b3bf62f..c7a850a1d 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java @@ -1,15 +1,23 @@ package org.swisspush.gateleen.kafka; +import io.vertx.core.Future; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.DecodeException; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.validation.ValidationException; import java.util.ArrayList; import java.util.List; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.currentThread; +import static org.slf4j.LoggerFactory.getLogger; + /** * Creates {@link KafkaProducerRecord}s by parsing the request payload. * @@ -17,10 +25,21 @@ */ class KafkaProducerRecordBuilder { + private static final Logger log = getLogger(KafkaProducerRecordBuilder.class); private static final String RECORDS = "records"; private static final String KEY = "key"; private static final String VALUE = "value"; private static final String HEADERS = "headers"; + private final Vertx vertx; + private final GateleenExceptionFactory exceptionFactory; + + KafkaProducerRecordBuilder( + Vertx vertx, + GateleenExceptionFactory exceptionFactory + ) { + this.vertx = vertx; + this.exceptionFactory = exceptionFactory; + } /** * Builds a list of {@link KafkaProducerRecord}s based on the provided payload. @@ -32,6 +51,39 @@ class KafkaProducerRecordBuilder { * @return A list of {@link KafkaProducerRecord}s created from the provided payload * @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.) */ + Future>> buildRecordsAsync(String topic, Buffer payload) { + return Future.succeededFuture().compose((Void v) -> { + JsonObject payloadObj; + try { + payloadObj = new JsonObject(payload); + } catch (DecodeException de) { + return Future.failedFuture(new ValidationException("Error while parsing payload", de)); + } + JsonArray recordsArray; + try { + recordsArray = payloadObj.getJsonArray(RECORDS); + } catch (ClassCastException cce) { + return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects")); + } + if (recordsArray == null) { + return Future.failedFuture(new ValidationException("Missing 'records' array")); + } + return vertx.executeBlocking(() -> { + assert !currentThread().getName().toUpperCase().contains("EVENTLOOP") : currentThread().getName(); + long beginEpchMs = currentTimeMillis(); + List> kafkaProducerRecords = new ArrayList<>(recordsArray.size()); + for (int i = 0; i < recordsArray.size(); i++) { + kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i))); + } + long durationMs = currentTimeMillis() - beginEpchMs; + log.debug("Serializing JSON did block thread for {}ms", durationMs); + return kafkaProducerRecords; + }); + }); + } + + /** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */ + @Deprecated static List> buildRecords(String topic, Buffer payload) throws ValidationException { List> kafkaProducerRecords = new ArrayList<>(); JsonObject payloadObj;