diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java index e892e8be..d84b0672 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java @@ -59,20 +59,20 @@ static List parse(Buffer configurationResourceBuffer null, null )); - continue; - } - String postfixFromHeader = queueConfig.getString(POSTFIX_FROM_HEADER_KEY); - String postfixFromUrl = queueConfig.getString(POSTFIX_FROM_URL_KEY); - if (postfixFromHeader != null || postfixFromUrl != null) { - queueSplitterConfigurations.add(new QueueSplitterConfiguration( - pattern, - queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER), - null, - postfixFromHeader, - postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null - )); } else { - log.warn("Queue splitter configuration without a postfix definition"); + String postfixFromHeader = queueConfig.getString(POSTFIX_FROM_HEADER_KEY); + String postfixFromUrl = queueConfig.getString(POSTFIX_FROM_URL_KEY); + if (postfixFromHeader != null || postfixFromUrl != null) { + queueSplitterConfigurations.add(new QueueSplitterConfiguration( + pattern, + queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER), + null, + postfixFromHeader, + postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null + )); + } else { + log.warn("Queue splitter configuration without a postfix definition"); + } } } catch (PatternSyntaxException patternException) { log.warn("Queue splitter '{}' is not a valid regex pattern. Discarding this queue splitter configuration", queuePattern); diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java index aa5faef6..085cb384 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java @@ -9,7 +9,7 @@ import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor; -import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromList; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromStaticList; import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest; import java.util.*; @@ -67,7 +67,7 @@ private void initializeQueueSplitterConfiguration(Buffer configuration) { queueSplitExecutors.clear(); queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> { if (queueSplitterConfiguration.isSplitStatic()) { - return new QueueSplitExecutorFromList(queueSplitterConfiguration); + return new QueueSplitExecutorFromStaticList(queueSplitterConfiguration); } else { return new QueueSplitExecutorFromRequest(queueSplitterConfiguration); } diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java similarity index 85% rename from gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java rename to gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java index 8124f85a..d23d8056 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java @@ -5,11 +5,11 @@ import java.util.concurrent.atomic.AtomicInteger; -public class QueueSplitExecutorFromList extends QueueSplitExecutorBase { +public class QueueSplitExecutorFromStaticList extends QueueSplitExecutorBase { AtomicInteger atomicInteger = new AtomicInteger(0); - public QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) { + public QueueSplitExecutorFromStaticList(QueueSplitterConfiguration configuration) { super(configuration); } diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java index c79f2c8a..9a44b520 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java @@ -126,7 +126,7 @@ public void testExecuteSplitWithUrlAndHeader() { public void testExecuteSplitForWrongQueue() { // Given - QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration( + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( Pattern.compile("queue-1"), "-", null, diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromListTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticListTest.java similarity index 82% rename from gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromListTest.java rename to gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticListTest.java index e815df2e..3a162b08 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromListTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticListTest.java @@ -10,13 +10,13 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.mock; -public class QueueSplitExecutorFromListTest { +public class QueueSplitExecutorFromStaticListTest { @Test public void testMatchesWithStaticQueueName() { // Given - QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration( + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( Pattern.compile("queue-1"), "-", List.of("A", "B", "C", "D"), @@ -33,7 +33,7 @@ public void testMatchesWithStaticQueueName() { public void testMatchesWithWildCharQueueName() { // Given - QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration( + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( Pattern.compile("queue-[0-9]+"), "-", List.of("A", "B", "C", "D"), @@ -52,7 +52,7 @@ public void testMatchesWithWildCharQueueName() { public void testExecuteSplit() { // Given - QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration( + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( Pattern.compile("queue-1"), "-", List.of("A", "B", "C", "D"), @@ -77,7 +77,7 @@ public void testExecuteSplit() { public void testExecuteSplitForWrongQueue() { // Given - QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration( + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( Pattern.compile("queue-1"), "-", List.of("A", "B", "C", "D"), diff --git a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java index 9bb039f7..5f632b7e 100755 --- a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java +++ b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java @@ -230,7 +230,7 @@ public static class RunConfigBuilder { private LoggingResourceManager loggingResourceManager; private ConfigurationResourceManager configurationResourceManager; private QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager; - private QueueSplitter queueSplitter = new NoOpQueueSplitter(); + private QueueSplitter queueSplitter; private EventBusHandler eventBusHandler; private KafkaHandler kafkaHandler; private CustomHttpResponseHandler customHttpResponseHandler; @@ -521,22 +521,22 @@ public static void deployModules(final Vertx vertx, Class verticleClass, Map { - if (event1.failed()) { - log.error("Could not load rest storage redis module", event1.cause()); - handler.handle(false); - return; - } - - // metrics module - vertx.deployVerticle("org.swisspush.metrics.MetricsModule", new DeploymentOptions().setConfig(RunConfig.buildMetricsConfig()), event2 -> { - if (event2.failed()) { - log.error("Could not load metrics module", event2.cause()); + // rest storage module + vertx.deployVerticle("org.swisspush.reststorage.RestStorageMod", new DeploymentOptions().setConfig(RunConfig.buildStorageConfig()).setInstances(4), event1 -> { + if (event1.failed()) { + log.error("Could not load rest storage redis module", event1.cause()); handler.handle(false); return; } - handler.handle(true); + + // metrics module + vertx.deployVerticle("org.swisspush.metrics.MetricsModule", new DeploymentOptions().setConfig(RunConfig.buildMetricsConfig()), event2 -> { + if (event2.failed()) { + log.error("Could not load metrics module", event2.cause()); + handler.handle(false); + return; + } + handler.handle(true); }); }); }); @@ -608,11 +608,25 @@ private void handleRequest(final RoutingContext ctx) { return; } if (PackingHandler.isPacked(request)) { - request.bodyHandler(new PackingHandler(request, new QueuingHandler(vertx, redisProvider, request, monitoringHandler, queueSplitter))); + request.bodyHandler(new PackingHandler( + request, + new QueuingHandler( + vertx, + redisProvider, + request, + monitoringHandler, + queueSplitter != null ? queueSplitter : new NoOpQueueSplitter() + ) + )); } else { if (QueuingHandler.isQueued(request)) { setISO8601Timestamps(request); - request.bodyHandler(new QueuingHandler(vertx, redisProvider, request, monitoringHandler, queueSplitter)); + request.bodyHandler(new QueuingHandler( + vertx, + redisProvider, + request, + monitoringHandler, + queueSplitter != null ? queueSplitter : new NoOpQueueSplitter())); } else { if (cacheHandler != null && cacheHandler.handle(request)) { return;