diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java index 6decb174b..72c68e461 100755 --- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java +++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java @@ -64,6 +64,8 @@ import org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.QueueCircuitBreakerImpl; import org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage; import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitBreakerRulePatternToCircuitMapping; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterImpl; import org.swisspush.gateleen.routing.CustomHttpResponseHandler; import org.swisspush.gateleen.routing.DeferCloseHttpClient; import org.swisspush.gateleen.routing.Router; @@ -154,6 +156,8 @@ public class Server extends AbstractVerticle { private ContentTypeConstraintHandler contentTypeConstraintHandler; private CacheHandler cacheHandler; + private QueueSplitter queueSplitter; + public static void main(String[] args) { Vertx.vertx().deployVerticle("org.swisspush.gateleen.playground.Server", event -> LoggerFactory.getLogger(Server.class).info("[_] Gateleen - http://localhost:7012/gateleen/") @@ -323,6 +327,9 @@ public void start() { final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", Address.redisquesAddress(), monitoringHandler); + queueSplitter = new QueueSplitterImpl(configurationResourceManager, SERVER_ROOT + "/admin/v1/queueSplitters"); + queueSplitter.initialize(); + LogController logController = new LogController(); logController.registerLogConfiguratorMBean(JMX_DOMAIN); @@ -347,6 +354,7 @@ public void start() { .loggingResourceManager(loggingResourceManager) .configurationResourceManager(configurationResourceManager) .queueCircuitBreakerConfigurationResourceManager(queueCircuitBreakerConfigurationResourceManager) + .queueSplitter(queueSplitter) .schedulerResourceManager(schedulerResourceManager) .zipExtractHandler(zipExtractHandler) .delegateHandler(delegateHandler) diff --git a/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters b/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters new file mode 100644 index 000000000..5479df115 --- /dev/null +++ b/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters @@ -0,0 +1,33 @@ +{ + "queue-static-split": { + "description": "Simple static splitter", + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + }, + "queue-header-[a-z]+": { + "description": "Simple splitter with request header", + "postfixDelimiter": "+", + "postfixFromRequest": { + "header": "x-rp-deviceid" + } + }, + "queue-path-[a-z]+": { + "description": "Simple splitter with request url matching", + "postfixDelimiter": "_", + "postfixFromRequest": { + "url": ".*/path1/(.*)/.*" + } + }, + "queue-header-and-path-[a-z]+": { + "description": "Simple splitter with request header and url matching", + "postfixDelimiter": "_", + "postfixFromRequest": { + "header": "x-rp-deviceid", + "url": ".*/path1/(.*)/.*" + } + } +} \ No newline at end of file diff --git a/gateleen-queue/README_queue.md b/gateleen-queue/README_queue.md index b733b6924..0b2617756 100644 --- a/gateleen-queue/README_queue.md +++ b/gateleen-queue/README_queue.md @@ -1,7 +1,7 @@ # gateleen-queue The gateleen-queue module provides queuing functionality and acts as a bridge to [vertx-redisques](https://github.com/swisspush/vertx-redisques). -### Queue retry configuration +## Queue retry configuration Normally, failed queue items remain in the queue until successfully processed or manually deleted. With the `x-queue-retry-xxx` request header, you are able to control this behaviour. Example values are: @@ -32,9 +32,9 @@ The state diagram below describes the possible transitions between these states: ``` +--------+ - +------->+ | open circuit + +------->+ | fail ratio reached success | | CLOSED +---------------------+ - +--------+ | | + +--------+ | open circuit | +---+----+ | ^ | | v @@ -469,4 +469,59 @@ Example: Also you have to enable the logging on the [QueueCircuitBreakerConfigurationResource](src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/configuration/QueueCircuitBreakerConfigurationResource.java) by calling ```java queueCircuitBreakerConfigurationResourceManager.enableResourceLogging(true); -``` \ No newline at end of file +``` + +## Queue Splitter +In case there are queues with a large number of queue items to process there is the option to configure these queues to split into sub-queues. +The split is implemented dispatching the incoming request in one of the sub-queues (so the split is always active). Don't use the queue splitting feature when queue items from a queue have to be in cronological order (requests in sub-queues are processed in parallel). + +Queue splitters are configured together (for example in admin/v1/queueSplitters), each splitter configuration is composted of three parts: +* name: regex used to match the queue name of the incoming requests +* postfix rule: rule used to generate the postfix to append to the initial queue name +* postfixDelimiter: Optional delimiter value to add between queue name and postfix. When not configured, _-_ is used + +There are two types of postfix rules: +* static +* based on request + +### Static postfix rule +In the rule all postfixes for splitting are listed. Here is an example of splitter configuration with static postfix rule: +```json +"queue-static": { + "description": "Simple static splitter", + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + } +``` +In this case the splitter is applied only if the queue in the request is 'queue-static' and the splitting is done distributing uniformly the requests in the sub-queues 'queue-static-A', 'queue-static-B', 'queue-static-C' and 'queue-static-D'. + +### Postfix rule based on request +Request header to use as postfix and/or the regex to extract parts from the url can be defined in the _postfixFromRequest_ property. Here are two examples: +```json + "queue-header-[a-z]+": { + "description": "Simple splitter with request header", + "postfixDelimiter": "+", + "postfixFromRequest": { + "header": "x-rp-deviceid" + } + }, + "queue-path-[a-z]+": { + "description": "Simple splitter with request url matching", + "postfixDelimiter": "_", + "postfixFromRequest": { + "url": ".*/path1/(.*)/.*" + } + } +``` +In both cases the splitter is applied to all the queues matching the name regex. + +In first case the value of the header 'x-rp-deviceid' is added. A queue with name 'queue-header-test' and with a request header 'x-rp-deviceid' valued 'A1B2C3D4' the request is splitted in the sub-queue 'queue-header-test+A1B2C3D4'. +For the second case the matching parts of the request url are added. A queue with name 'queue-path-test' and with the url ending with .../path1/path2/path3 the request is splitted in the sub-queue 'queue-path-test_path2'. + +### Splitter implementation +The evaluation of splitting for a queue is defined in the interface [QueueSplitter](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitter.java) with two implementations: [QueueSplitterImpl](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java) (to execute the splitters configured) and [NoOpQueueSplitter](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitter.java) (no splitter). +For each splitter configured is created either an instance of [QueueSplitExecutorFromStaticList](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java) (for the case of static postfix rule) or an instance of [QueueSplitExecutorFromRequest](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java) (for the case of postfix rule based on request). diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java index a44fb7311..e95c0face 100755 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java @@ -11,6 +11,8 @@ import org.swisspush.gateleen.core.util.StatusCode; import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.queue.duplicate.DuplicateCheckHandler; +import org.swisspush.gateleen.queue.queuing.splitter.NoOpQueueSplitter; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter; import static org.swisspush.redisques.util.RedisquesAPI.buildCheckOperation; @@ -34,16 +36,45 @@ public static boolean isQueued(HttpServerRequest request) { private final HttpServerRequest request; private final Vertx vertx; private final RedisProvider redisProvider; + private final QueueSplitter queueSplitter; - public QueuingHandler(Vertx vertx, RedisProvider redisProvider, HttpServerRequest request, MonitoringHandler monitoringHandler) { - this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler)); + public QueuingHandler( + Vertx vertx, + RedisProvider redisProvider, + HttpServerRequest request, + MonitoringHandler monitoringHandler + ) { + this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), new NoOpQueueSplitter()); } - public QueuingHandler(Vertx vertx, RedisProvider redisProvider, HttpServerRequest request, RequestQueue requestQueue) { + public QueuingHandler( + Vertx vertx, + RedisProvider redisProvider, + HttpServerRequest request, + MonitoringHandler monitoringHandler, + QueueSplitter queueSplitter + ) { + this( + vertx, + redisProvider, + request, + new QueueClient(vertx, monitoringHandler), + queueSplitter == null ? new NoOpQueueSplitter() : queueSplitter + ); + } + + public QueuingHandler( + Vertx vertx, + RedisProvider redisProvider, + HttpServerRequest request, + RequestQueue requestQueue, + QueueSplitter queueSplitter + ) { this.request = request; this.vertx = vertx; this.redisProvider = redisProvider; this.requestQueue = requestQueue; + this.queueSplitter = queueSplitter; } @Override @@ -61,12 +92,12 @@ public void handle(final Buffer buffer) { request.response().setStatusMessage(StatusCode.ACCEPTED.getStatusMessage()); request.response().end(); } else { - requestQueue.enqueue(request, headers, buffer, queue); + requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request)); } }); } else { - requestQueue.enqueue(request, headers, buffer, queue); + requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request)); } } diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitter.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitter.java new file mode 100644 index 000000000..b494c7b68 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitter.java @@ -0,0 +1,26 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpServerRequest; + +/** + * {@inheritDoc} + */ +public class NoOpQueueSplitter implements QueueSplitter { + + @Override + public Future initialize() { + Promise promise = Promise.promise(); + promise.complete(); + return promise.future(); + } + + @Override + /** + * {@inheritDoc} + */ + public String convertToSubQueue(String queue, HttpServerRequest request) { + return queue; + } +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitter.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitter.java new file mode 100644 index 000000000..7d94b1b64 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitter.java @@ -0,0 +1,24 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.Future; +import io.vertx.core.http.HttpServerRequest; + +/** + * Interface for queues configured to be split in sub-queues. The method {@link QueueSplitter#convertToSubQueue(String, HttpServerRequest)} + * evaluates the convert of the queue name in a sub-queue name. + * + * @author https://github.com/gcastaldi [Giannandrea Castaldi] + */ +public interface QueueSplitter { + + public Future initialize(); + + /** + * Convert the queue name in a sub-queue name. If not necessary maintains the initial queue name. + * + * @param queue + * @param request + * @return sub-queue name + */ + String convertToSubQueue(String queue, HttpServerRequest request); +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java new file mode 100644 index 000000000..ca37d1047 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java @@ -0,0 +1,101 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + +/** + * Container holding configuration values for {@link QueueSplitterImpl} identified + * by a queue pattern. + * + * @author https://github.com/gcastaldi [Giannandrea Castaldi] + */ +public class QueueSplitterConfiguration { + + private final Pattern queue; + + private final String postfixDelimiter; + + @Nullable + private final List postfixFromStatic; + + @Nullable + private final String postfixFromHeader; + + @Nullable + private final Pattern postfixFromUrl; + + + public QueueSplitterConfiguration( + Pattern queue, + String postfixDelimiter, + @Nullable List postfixFromStatic, + @Nullable String postfixFromHeader, + @Nullable Pattern postfixFromUrl) { + this.queue = queue; + this.postfixDelimiter = postfixDelimiter; + this.postfixFromStatic = postfixFromStatic; + this.postfixFromHeader = postfixFromHeader; + this.postfixFromUrl = postfixFromUrl; + } + + public Pattern getQueue() { + return queue; + } + + public String getPostfixDelimiter() { + return postfixDelimiter; + } + + @Nullable + public List getPostfixFromStatic() { + return postfixFromStatic; + } + + @Nullable + public String getPostfixFromHeader() { + return postfixFromHeader; + } + + @Nullable + public Pattern getPostfixFromUrl() { + return postfixFromUrl; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QueueSplitterConfiguration that = (QueueSplitterConfiguration) o; + return Objects.equals(queue, that.queue) && + Objects.equals(postfixDelimiter, that.postfixDelimiter) && + Objects.equals(postfixFromStatic, that.postfixFromStatic) && + Objects.equals(postfixFromHeader, that.postfixFromHeader) && + Objects.equals(postfixFromUrl, that.postfixFromUrl); + } + + @Override + public int hashCode() { + return Objects.hash(queue, postfixDelimiter, postfixFromStatic, postfixFromHeader, postfixFromUrl); + } + + @Override + public String toString() { + return "QueueSplitterConfiguration{" + + "queue=" + queue + + ", postfixDelimiter='" + postfixDelimiter + '\'' + + ", postfixFromStatic=" + postfixFromStatic + + ", postfixFromHeader='" + postfixFromHeader + '\'' + + ", postfixFromUrl='" + postfixFromUrl + '\'' + + '}'; + } + + public boolean isSplitStatic() { + return postfixFromStatic != null && !postfixFromStatic.isEmpty(); + } + + public boolean isSplitFromRequest() { + return postfixFromHeader != null || postfixFromUrl != null; + } +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java new file mode 100644 index 000000000..ad972d8a1 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java @@ -0,0 +1,87 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.gateleen.core.util.StringUtils; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import java.util.stream.Collectors; + +/** + * Parses the splitter configuration resource in to a list of {@link QueueSplitterConfiguration} + * + * @author https://github.com/gcastaldi [Giannandrea Castaldi] + */ +public class QueueSplitterConfigurationParser { + + private static final Logger log = LoggerFactory.getLogger(QueueSplitterConfigurationParser.class); + public static final String POSTFIX_FROM_STATIC_KEY = "postfixFromStatic"; + public static final String POSTFIX_FROM_REQUEST_KEY = "postfixFromRequest"; + public static final String POSTFIX_FROM_HEADER_KEY = "header"; + public static final String POSTFIX_FROM_URL_KEY = "url"; + public static final String POSTFIX_DELIMITER_KEY = "postfixDelimiter"; + public static final String DEFAULT_POSTFIX_DELIMITER = "-"; + + static List parse(Buffer configurationResourceBuffer, Map properties) { + + JsonObject config; + List queueSplitterConfigurations = new ArrayList<>(); + try { + String resolvedConfiguration = StringUtils.replaceWildcardConfigs( + configurationResourceBuffer.toString(StandardCharsets.UTF_8), + properties + ); + config = new JsonObject(Buffer.buffer(resolvedConfiguration)); + } catch (Exception ex) { + log.warn("Could not replace wildcards with environment properties for queue splitter configurations or json invalid. Here the reason: {}", + ex.getMessage()); + return queueSplitterConfigurations; + } + + for (String queuePattern : config.fieldNames()) { + try { + Pattern pattern = Pattern.compile(queuePattern); + JsonObject queueConfig = config.getJsonObject(queuePattern); + JsonArray postfixFromStatic = queueConfig.getJsonArray(POSTFIX_FROM_STATIC_KEY); + if (postfixFromStatic != null) { + List staticPostfixes = postfixFromStatic.stream().map(Object::toString).collect(Collectors.toList()); + queueSplitterConfigurations.add(new QueueSplitterConfiguration( + pattern, + queueConfig.getString("postfixDelimiter", DEFAULT_POSTFIX_DELIMITER), + staticPostfixes, + null, + null + )); + } else { + JsonObject postfixFromRequest = queueConfig.getJsonObject(POSTFIX_FROM_REQUEST_KEY); + String postfixFromHeader = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_HEADER_KEY) : null; + String postfixFromUrl = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_URL_KEY) : null; + if (postfixFromHeader != null || postfixFromUrl != null) { + queueSplitterConfigurations.add(new QueueSplitterConfiguration( + pattern, + queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER), + null, + postfixFromHeader, + postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null + )); + } else { + log.warn("Queue splitter configuration without a postfix definition"); + } + } + } catch (PatternSyntaxException patternException) { + log.warn("Queue splitter '{}' is not a valid regex pattern. Discarding this queue splitter configuration", queuePattern); + } + } + + return queueSplitterConfigurations; + } + +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java new file mode 100644 index 000000000..a7ebf5de9 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java @@ -0,0 +1,95 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromStaticList; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * {@inheritDoc} + */ +public class QueueSplitterImpl extends ConfigurationResourceConsumer implements QueueSplitter { + + private final Logger log = LoggerFactory.getLogger(QueueSplitterImpl.class); + + private final Map properties; + + private List queueSplitExecutors = new ArrayList<>(); + + public QueueSplitterImpl( + ConfigurationResourceManager configurationResourceManager, + String configResourceUri + ) { + this(configurationResourceManager, configResourceUri, new HashMap<>()); + } + + public QueueSplitterImpl( + ConfigurationResourceManager configurationResourceManager, + String configResourceUri, + Map properties + ) { + super(configurationResourceManager, configResourceUri, "gateleen_queue_splitter_configuration_schema"); + this.properties = properties; + } + + public Future initialize() { + Promise promise = Promise.promise(); + configurationResourceManager().getRegisteredResource(configResourceUri()).onComplete((event -> { + if (event.succeeded() && event.result().isPresent()) { + initializeQueueSplitterConfiguration(event.result().get()); + promise.complete(); + } else { + log.warn("No queue splitter configuration resource with uri '{}' found. Unable to setup splitter configuration correctly", configResourceUri()); + promise.complete(); + } + })); + return promise.future(); + } + + private void initializeQueueSplitterConfiguration(Buffer configuration) { + final List configurations = QueueSplitterConfigurationParser.parse(configuration, properties); + queueSplitExecutors.clear(); + queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> { + if (queueSplitterConfiguration.isSplitStatic()) { + return new QueueSplitExecutorFromStaticList(queueSplitterConfiguration); + } else { + return new QueueSplitExecutorFromRequest(queueSplitterConfiguration); + } + }).collect(Collectors.toList()); + } + + /** + * {@inheritDoc} + */ + @Override + public String convertToSubQueue(final String queue, HttpServerRequest request) { + Optional executor = queueSplitExecutors.stream().filter(splitExecutor -> splitExecutor.matches(queue)).findFirst(); + return executor.isPresent() ? executor.get().executeSplit(queue, request) : queue; + } + + @Override + public void resourceChanged(String resourceUri, Buffer resource) { + if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { + log.info("Queue splitter configuration resource {} was updated. Going to initialize with new configuration", resourceUri); + initializeQueueSplitterConfiguration(resource); + } + } + + @Override + public void resourceRemoved(String resourceUri) { + if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { + log.info("Queue splitter configuration resource {} was removed. Going to release all executors", resourceUri); + queueSplitExecutors.clear(); + } + } +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutor.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutor.java new file mode 100644 index 000000000..7f56062d0 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutor.java @@ -0,0 +1,10 @@ +package org.swisspush.gateleen.queue.queuing.splitter.executors; + +import io.vertx.core.http.HttpServerRequest; + +public interface QueueSplitExecutor { + + boolean matches(String queue); + + String executeSplit(String queue, HttpServerRequest request); +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorBase.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorBase.java new file mode 100644 index 000000000..1c15edf3c --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorBase.java @@ -0,0 +1,17 @@ +package org.swisspush.gateleen.queue.queuing.splitter.executors; + +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration; + +public abstract class QueueSplitExecutorBase implements QueueSplitExecutor { + + protected final QueueSplitterConfiguration configuration; + + protected QueueSplitExecutorBase(QueueSplitterConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public boolean matches(String queue) { + return configuration.getQueue().matcher(queue).matches(); + } +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java new file mode 100644 index 000000000..98faa9c00 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java @@ -0,0 +1,37 @@ +package org.swisspush.gateleen.queue.queuing.splitter.executors; + +import io.vertx.core.http.HttpServerRequest; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class QueueSplitExecutorFromRequest extends QueueSplitExecutorBase { + public QueueSplitExecutorFromRequest(QueueSplitterConfiguration configuration) { + super(configuration); + } + + @Override + public String executeSplit(String queue, HttpServerRequest request) { + StringBuilder stringBuilder = new StringBuilder(queue); + if (matches(queue)) { + if (configuration.getPostfixFromUrl() != null) { + Matcher matcher = configuration.getPostfixFromUrl().matcher(request.uri()); + if (matcher.matches()) { + for (int i = 0; i < matcher.groupCount(); i++) { + stringBuilder.append(configuration.getPostfixDelimiter()); + stringBuilder.append(matcher.group(i + 1)); + } + } + } + if (configuration.getPostfixFromHeader() != null) { + String headerValue = request.headers().get(configuration.getPostfixFromHeader()); + if (headerValue != null) { + stringBuilder.append(configuration.getPostfixDelimiter()); + stringBuilder.append(headerValue); + } + } + } + return stringBuilder.toString(); + } +} diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java new file mode 100644 index 000000000..d23d80561 --- /dev/null +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java @@ -0,0 +1,30 @@ +package org.swisspush.gateleen.queue.queuing.splitter.executors; + +import io.vertx.core.http.HttpServerRequest; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration; + +import java.util.concurrent.atomic.AtomicInteger; + +public class QueueSplitExecutorFromStaticList extends QueueSplitExecutorBase { + + AtomicInteger atomicInteger = new AtomicInteger(0); + + public QueueSplitExecutorFromStaticList(QueueSplitterConfiguration configuration) { + super(configuration); + } + + @Override + public String executeSplit(String queue, HttpServerRequest request) { + StringBuilder stringBuilder = new StringBuilder(queue); + if (matches(queue)) { + stringBuilder.append(configuration.getPostfixDelimiter()); + stringBuilder.append(configuration.getPostfixFromStatic().get( + atomicInteger.getAndAccumulate( + 1, + (left, right) -> (left + right) % configuration.getPostfixFromStatic().size() + ) + )); + } + return stringBuilder.toString(); + } +} diff --git a/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema b/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema new file mode 100644 index 000000000..e799b9c34 --- /dev/null +++ b/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema @@ -0,0 +1,75 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "description": "Queue splitter configurations", + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/QueueSplitter" + }, + "definitions": { + "QueueSplitter": { + "description": "A single queue splitter configuration", + "type": "object", + "properties": { + "description": { + "description": "description of the splitter", + "type": "string" + }, + "postfixDelimiter": { + "description": "Separator between original queue and postfix added", + "type": "string" + }, + "postfixFromStatic": { + "description": "List of postfixes to use to compose sub-queues", + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1, + "uniqueItems": true + }, + "postfixFromRequest": { + "$ref": "#/definitions/PostfixFromRequest" + } + }, + "additionalProperties": false, + "oneOf": [ + { + "required": [ + "postfixFromStatic" + ] + }, + { + "required": [ + "postfixFromRequest" + ] + } + ] + }, + "PostfixFromRequest": { + "description": "Postfix generated using request header and/or url", + "anyOf": [ + { + "required": [ + "header" + ] + }, + { + "required": [ + "url" + ] + } + ], + "properties": { + "header": { + "description": "Header to use as postfix", + "type": "string" + }, + "url": { + "description": "Regex to group postfixes from url", + "type": "string" + } + }, + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitterTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitterTest.java new file mode 100644 index 000000000..db5a793a0 --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitterTest.java @@ -0,0 +1,31 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +@RunWith(VertxUnitRunner.class) +public class NoOpQueueSplitterTest { + + @Test + public void splitKeepSameQueue(TestContext context) { + + // Given + NoOpQueueSplitter splitter = new NoOpQueueSplitter(); + HttpServerRequest request = mock(HttpServerRequest.class); + + // When + splitter.initialize().onComplete(event -> { + + // Then + context.assertEquals("my-queue-01", splitter.convertToSubQueue("my-queue-01", request)); + context.assertEquals("my-queue-02", splitter.convertToSubQueue("my-queue-02", request)); + context.assertEquals("my-queue-03", splitter.convertToSubQueue("my-queue-03", request)); + }); + } +} diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueConfigurationSchemaValidationTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueConfigurationSchemaValidationTest.java new file mode 100644 index 000000000..bbf9ec28f --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueConfigurationSchemaValidationTest.java @@ -0,0 +1,70 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.gateleen.core.util.ResourcesUtils; +import org.swisspush.gateleen.core.validation.ValidationResult; +import org.swisspush.gateleen.core.validation.ValidationStatus; +import org.swisspush.gateleen.validation.Validator; + +@RunWith(VertxUnitRunner.class) +public class QueueConfigurationSchemaValidationTest { + + private final String CONFIG_SCHEMA = ResourcesUtils.loadResource("gateleen_queue_splitter_configuration_schema", true); + + private final Logger logger = LoggerFactory.getLogger(QueueConfigurationSchemaValidationTest.class); + + private final String CONFIG_RESOURCE_VALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_1", true); + + private final String CONFIG_RESOURCE_MISSING_POSTFIX = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_missing_postfix", true); + private final String CONFIG_RESOURCE_MISSING_POSTFIX_REQUEST = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_missing_postfix_request", true); + + @Test + public void testValidConfig(TestContext context) { + + // When + ValidationResult validationResult = validate(CONFIG_RESOURCE_VALID); + + // Then + context.assertNotNull(validationResult); + context.assertEquals(ValidationStatus.VALIDATED_POSITIV, validationResult.getValidationStatus()); + } + + @Test + public void testMissingPostfix(TestContext context) { + + // When + ValidationResult validationResult = validate(CONFIG_RESOURCE_MISSING_POSTFIX); + + // Then + context.assertNotNull(validationResult); + context.assertEquals(ValidationStatus.VALIDATED_NEGATIV, validationResult.getValidationStatus()); + context.assertEquals("$.my-queue-[0-9]+.postfixFromStatic: is missing but it is required", extractErrorMessage(validationResult)); + } + + @Test + public void testMissingPostfixRequest(TestContext context) { + + // When + ValidationResult validationResult = validate(CONFIG_RESOURCE_MISSING_POSTFIX_REQUEST); + + // Then + context.assertNotNull(validationResult); + context.assertEquals(ValidationStatus.VALIDATED_NEGATIV, validationResult.getValidationStatus()); + context.assertEquals("$.my-queue-[0-9]+.postfixFromRequest.header: is missing but it is required", extractErrorMessage(validationResult)); + } + + private ValidationResult validate(String loggingResource) { + return Validator.validateStatic(Buffer.buffer(loggingResource), CONFIG_SCHEMA, logger); + } + + private String extractErrorMessage(ValidationResult validationResult){ + return validationResult.getValidationDetails().getJsonObject(0).getString("message"); + } + +} diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java new file mode 100644 index 000000000..facd53c4d --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java @@ -0,0 +1,169 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.swisspush.gateleen.core.util.ResourcesUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * Tests for {@link QueueSplitterConfigurationParser} class + * + * @author https://github.com/gcastaldi [Giannandrea Castaldi] + */ +@RunWith(VertxUnitRunner.class) +public class QueueSplitterConfigurationParserTest { + + private final String CONFIGURATION_VALID = ResourcesUtils.loadResource( + "testresource_queuesplitter_configuration_valid_1", + true + ); + + private final String CONFIGURATION_INVALID = ResourcesUtils.loadResource( + "testresource_queuesplitter_configuration_missing_postfix", + true + ); + + private final String CONFIGURATION_INVALID_JSON = ResourcesUtils.loadResource( + "testresource_queuesplitter_configuration_invalid_json", + true + ); + + private final String CONFIGURATION_WITH_PROPS = ResourcesUtils.loadResource( + "testresource_queuesplitter_configuration_with_props", + true + ); + + @Test + public void parseWithAllValid(TestContext context) { + + // Given + Buffer configurationResourceBuffer = Buffer.buffer(CONFIGURATION_VALID); + HashMap properties = new HashMap<>(); + + // When + List configurations = QueueSplitterConfigurationParser.parse( + configurationResourceBuffer, + properties + ); + + // Then + context.assertEquals(3, configurations.size()); + + // Note that the order of the parsed configurations matters! + QueueSplitterConfiguration config_1 = configurations.get(0); + context.assertEquals(Pattern.compile("my-queue-1").pattern(), config_1.getQueue().pattern()); + context.assertEquals("-", config_1.getPostfixDelimiter()); + context.assertEquals(List.of("A", "B", "C", "D"), config_1.getPostfixFromStatic()); + context.assertNull(config_1.getPostfixFromHeader()); + context.assertNull(config_1.getPostfixFromUrl()); + context.assertTrue(config_1.isSplitStatic()); + context.assertFalse(config_1.isSplitFromRequest()); + + QueueSplitterConfiguration config_2 = configurations.get(1); + context.assertEquals(Pattern.compile("my-queue-[0-9]+").pattern(), config_2.getQueue().pattern()); + context.assertEquals("+", config_2.getPostfixDelimiter()); + context.assertNull(config_2.getPostfixFromStatic()); + context.assertEquals("x-rp-deviceid", config_2.getPostfixFromHeader()); + context.assertNull(config_2.getPostfixFromUrl()); + context.assertFalse(config_2.isSplitStatic()); + context.assertTrue(config_2.isSplitFromRequest()); + + QueueSplitterConfiguration config_3 = configurations.get(2); + context.assertEquals(Pattern.compile("my-queue-[a-zA-Z]+").pattern(), config_3.getQueue().pattern()); + context.assertEquals("_", config_3.getPostfixDelimiter()); + context.assertNull(config_3.getPostfixFromStatic()); + context.assertNull(config_3.getPostfixFromHeader()); + context.assertEquals( + Pattern.compile(".*/path1/(.*)/path3/path4/.*").pattern(), + config_3.getPostfixFromUrl().pattern()); + context.assertFalse(config_3.isSplitStatic()); + context.assertTrue(config_3.isSplitFromRequest()); + } + + @Test + public void parseWithOneValidOneNot(TestContext context) { + + // Given + Buffer configurationResourceBuffer = Buffer.buffer(CONFIGURATION_INVALID); + HashMap properties = new HashMap<>(); + + // When + List configurations = QueueSplitterConfigurationParser.parse( + configurationResourceBuffer, + properties + ); + + // Then + context.assertEquals(1, configurations.size()); + + QueueSplitterConfiguration config_1 = configurations.get(0); + context.assertEquals(Pattern.compile("my-queue-1").pattern(), config_1.getQueue().pattern()); + context.assertEquals("-", config_1.getPostfixDelimiter()); + context.assertEquals(List.of("A", "B", "C", "D"), config_1.getPostfixFromStatic()); + context.assertNull(config_1.getPostfixFromHeader()); + context.assertNull(config_1.getPostfixFromUrl()); + } + @Test + public void parseWithInvalidJson(TestContext context) { + + // Given + Buffer configurationResourceBuffer = Buffer.buffer(CONFIGURATION_INVALID_JSON); + HashMap properties = new HashMap<>(); + + // When + List configurations = QueueSplitterConfigurationParser.parse( + configurationResourceBuffer, + properties + ); + + // Then + context.assertEquals(0, configurations.size()); + } + + @Test + public void parseWithValidAndProps(TestContext context) { + + // Given + Buffer configurationResourceBuffer = Buffer.buffer(CONFIGURATION_WITH_PROPS); + Map properties = Map.of("queue.splitter.delimiter", "_"); + + // When + List configurations = QueueSplitterConfigurationParser.parse( + configurationResourceBuffer, + properties + ); + + // Then + context.assertEquals(1, configurations.size()); + + QueueSplitterConfiguration config_1 = configurations.get(0); + context.assertEquals(Pattern.compile("my-queue-[0-9]+").pattern(), config_1.getQueue().pattern()); + context.assertEquals("_", config_1.getPostfixDelimiter()); + context.assertNull(config_1.getPostfixFromStatic()); + context.assertEquals("x-rp-deviceid", config_1.getPostfixFromHeader()); + context.assertNull(config_1.getPostfixFromUrl()); + } + @Test + public void parseWithValidAndMissingProps(TestContext context) { + + // Given + Buffer configurationResourceBuffer = Buffer.buffer(CONFIGURATION_WITH_PROPS); + Map properties = new HashMap<>(); + + // When + List configurations = QueueSplitterConfigurationParser.parse( + configurationResourceBuffer, + properties + ); + + // Then + context.assertEquals(0, configurations.size()); + } +} diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java new file mode 100644 index 000000000..2fa996c49 --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java @@ -0,0 +1,230 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.impl.headers.HeadersMultiMap; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceObserver; +import org.swisspush.gateleen.core.storage.MockResourceStorage; +import org.swisspush.gateleen.core.util.ResourcesUtils; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS; + +@RunWith(VertxUnitRunner.class) +public class QueueSplitterImplTest { + + private Vertx vertx; + private MockResourceStorage storage; + private final String configResourceUri = "/queueSplitters"; + private ConfigurationResourceManager configurationResourceManager; + private QueueSplitterImpl queueSplitter; + + private final String CONFIG_RESOURCE_VALID_1 = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_1", true); + private final String CONFIG_RESOURCE_VALID_2 = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_2", true); + private final String CONFIG_RESOURCE_INVALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_missing_postfix", true); + + @Before + public void setUp() { + vertx = Vertx.vertx(); + storage = new MockResourceStorage(); + configurationResourceManager = new ConfigurationResourceManager(vertx, storage); + queueSplitter = new QueueSplitterImpl(configurationResourceManager, configResourceUri); + } + + @Test + public void splitWithMissingConfigResource(TestContext context) { + + // Given + Async async = context.async(); + + // When + queueSplitter.initialize().onComplete(event -> { + + // Then + verifySplitStaticNotExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + }); + } + + @Test + public void splitWithValidConfigResource(TestContext context) { + + // Given + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); + + // When + queueSplitter.initialize().onComplete(event -> { + + // Then + verifySplitStaticExecuted(context); + verifySplitWithHeaderExecuted(context); + verifySplitWithUrlExecuted(context); + async.complete(); + }); + } + + @Test + public void splitWithPartiallyInvalidConfigResource(TestContext context) { + + // Given + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID); + + // When + queueSplitter.initialize().onComplete(event -> { + + // Then + verifySplitStaticNotExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + }); + } + + + @Test + public void splitWithQueueNotMatchingAnyConfiguration(TestContext context) { + + // Given + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); + + // When + queueSplitter.initialize().onComplete(event -> { + + // Then + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + context.assertEquals("another-queue", queueSplitter.convertToSubQueue("another-queue", request)); + async.complete(); + }); + } + + @Test + public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context) { + + // Given + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); + queueSplitter.initialize().onComplete(event -> { + verifySplitStaticExecuted(context); + verifySplitWithHeaderExecuted(context); + verifySplitWithUrlExecuted(context); + + configurationResourceManager.registerObserver(new ConfigurationResourceObserver() { + + @Override + public void resourceChanged(String resourceUri, Buffer resource) { + } + + @Override + public void resourceRemoved(String resourceUri) { + + // Then + verifySplitStaticNotExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + } + }, configResourceUri); + + // When + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "remove"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); + }); + } + + @Test + public void configResourceChangedTriggerNewInitOfExecutors(TestContext context) { + + // Given + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); + queueSplitter.initialize().onComplete(event -> { + verifySplitStaticExecuted(context); + verifySplitWithHeaderExecuted(context); + verifySplitWithUrlExecuted(context); + + configurationResourceManager.registerObserver(new ConfigurationResourceObserver() { + + private int resourceChangedCalls = 0; + + @Override + public void resourceChanged(String resourceUri, Buffer resource) { + // Then + resourceChangedCalls++; + if (resourceChangedCalls == 2) { + verifySplitStaticExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + } + } + + @Override + public void resourceRemoved(String resourceUri) { + } + }, configResourceUri); + + // When + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_2); + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "change"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); + }); + } + + private void verifySplitStaticExecuted(TestContext context) { + HttpServerRequest request = mock(HttpServerRequest.class); + context.assertEquals("my-queue-1-A", queueSplitter.convertToSubQueue("my-queue-1", request)); + context.assertEquals("my-queue-1-B", queueSplitter.convertToSubQueue("my-queue-1", request)); + context.assertEquals("my-queue-1-C", queueSplitter.convertToSubQueue("my-queue-1", request)); + } + + private void verifySplitStaticNotExecuted(TestContext context) { + HttpServerRequest request = mock(HttpServerRequest.class); + context.assertEquals("my-queue-1", queueSplitter.convertToSubQueue("my-queue-1", request)); + } + + private void verifySplitWithHeaderExecuted(TestContext context) { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6")); + context.assertEquals("my-queue-2+A1B2C3D4E5F6", queueSplitter.convertToSubQueue("my-queue-2", request)); + } + + private void verifySplitWithHeaderNotExecuted(TestContext context) { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6")); + context.assertEquals("my-queue-2", queueSplitter.convertToSubQueue("my-queue-2", request)); + } + + private void verifySplitWithUrlExecuted(TestContext context) { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + when(request.uri()).thenReturn("/path1/path2/path3/path4/"); + context.assertEquals("my-queue-a_path2", queueSplitter.convertToSubQueue("my-queue-a", request)); + } + + private void verifySplitWithUrlNotExecuted(TestContext context) { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + when(request.uri()).thenReturn("/path1/path2/path3/path4/"); + context.assertEquals("my-queue-a", queueSplitter.convertToSubQueue("my-queue-a", request)); + } +} diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java new file mode 100644 index 000000000..3aaa63a90 --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequestTest.java @@ -0,0 +1,159 @@ +package org.swisspush.gateleen.queue.queuing.splitter.executors; + +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.impl.headers.HeadersMultiMap; +import org.junit.Test; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration; + +import java.util.regex.Pattern; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class QueueSplitExecutorFromRequestTest { + + @Test + public void testMatchesWithStaticQueueName() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + "x-rp-deviceid", + null + )); + + // Then + assertTrue(executor.matches("queue-1")); + assertFalse(executor.matches("queue-2")); + } + + @Test + public void testMatchesWithWildCharQueueName() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-[0-9]+"), + "-", + null, + "x-rp-deviceid", + null + )); + + // Then + assertTrue(executor.matches("queue-1")); + assertTrue(executor.matches("queue-2")); + assertTrue(executor.matches("queue-30")); + assertFalse(executor.matches("queue-a")); + } + + @Test + public void testExecuteSplitWithHeader() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + "x-rp-deviceid", + null + )); + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6")); + + // Then + assertEquals("queue-1-A1B2C3D4E5F6", executor.executeSplit("queue-1", request)); + } + + @Test + public void testExecuteSplitWithHeaderInDifferentCase() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + "x-rp-deviceid", + null + )); + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("X-RP-DEVICEID", "A1B2C3D4E5F6")); + + // Then + assertEquals("queue-1-A1B2C3D4E5F6", executor.executeSplit("queue-1", request)); + } + + @Test + public void testExecuteSplitWithHeaderButMissingInRequest() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + "x-rp-deviceid", + null + )); + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + + // Then + assertEquals("queue-1", executor.executeSplit("queue-1", request)); + } + + @Test + public void testExecuteSplitWithUrl() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + null, + Pattern.compile("/path1/(.+)/path3/(.+)") + )); + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + when(request.uri()).thenReturn("/path1/path2/path3/path4"); + + // Then + assertEquals("queue-1-path2-path4", executor.executeSplit("queue-1", request)); + } + @Test + public void testExecuteSplitWithUrlAndHeader() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + "x-rp-deviceid", + Pattern.compile("/path1/(.+)/path3/(.+)") + )); + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6")); + when(request.uri()).thenReturn("/path1/path2/path3/path4"); + + // Then + assertEquals("queue-1-path2-path4-A1B2C3D4E5F6", executor.executeSplit("queue-1", request)); + } + + @Test + public void testExecuteSplitForWrongQueue() { + + // Given + QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + null, + "x-rp-deviceid", + null + )); + HttpServerRequest request = mock(HttpServerRequest.class); + + // Then + assertEquals("queue-2", executor.executeSplit("queue-2", request)); + } +} diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticListTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticListTest.java new file mode 100644 index 000000000..3a162b08b --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticListTest.java @@ -0,0 +1,92 @@ +package org.swisspush.gateleen.queue.queuing.splitter.executors; + +import io.vertx.core.http.HttpServerRequest; +import org.junit.Test; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration; + +import java.util.List; +import java.util.regex.Pattern; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class QueueSplitExecutorFromStaticListTest { + + @Test + public void testMatchesWithStaticQueueName() { + + // Given + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + List.of("A", "B", "C", "D"), + null, + null + )); + + // Then + assertTrue(executor.matches("queue-1")); + assertFalse(executor.matches("queue-2")); + } + + @Test + public void testMatchesWithWildCharQueueName() { + + // Given + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( + Pattern.compile("queue-[0-9]+"), + "-", + List.of("A", "B", "C", "D"), + null, + null + )); + + // Then + assertTrue(executor.matches("queue-1")); + assertTrue(executor.matches("queue-2")); + assertTrue(executor.matches("queue-30")); + assertFalse(executor.matches("queue-a")); + } + + @Test + public void testExecuteSplit() { + + // Given + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + List.of("A", "B", "C", "D"), + null, + null + )); + HttpServerRequest request = mock(HttpServerRequest.class); + + // When + + // Then + assertEquals("queue-1-A", executor.executeSplit("queue-1", request)); + assertEquals("queue-1-B", executor.executeSplit("queue-1", request)); + assertEquals("queue-1-C", executor.executeSplit("queue-1", request)); + assertEquals("queue-1-D", executor.executeSplit("queue-1", request)); + assertEquals("queue-1-A", executor.executeSplit("queue-1", request)); + assertEquals("queue-1-B", executor.executeSplit("queue-1", request)); + assertEquals("queue-1-C", executor.executeSplit("queue-1", request)); + } + + @Test + public void testExecuteSplitForWrongQueue() { + + // Given + QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration( + Pattern.compile("queue-1"), + "-", + List.of("A", "B", "C", "D"), + null, + null + )); + HttpServerRequest request = mock(HttpServerRequest.class); + + // Then + assertEquals("queue-2", executor.executeSplit("queue-2", request)); + } +} diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_invalid_json b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_invalid_json new file mode 100644 index 000000000..3fdbfbc82 --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_invalid_json @@ -0,0 +1,10 @@ +{ + "my-queue-1" : { + "postfixFromStatic": [ + "A" + "B" + "C" + "D" + ] + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_missing_postfix b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_missing_postfix new file mode 100644 index 000000000..f4522fb7a --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_missing_postfix @@ -0,0 +1,13 @@ +{ + "my-queue-1" : { + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + }, + "my-queue-[0-9]+" : { + "postfixDelimiter": "+" + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_missing_postfix_request b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_missing_postfix_request new file mode 100644 index 000000000..a8a77721b --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_missing_postfix_request @@ -0,0 +1,14 @@ +{ + "my-queue-1" : { + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + }, + "my-queue-[0-9]+" : { + "postfixDelimiter": "+", + "postfixFromRequest": {} + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 new file mode 100644 index 000000000..7782e233b --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 @@ -0,0 +1,25 @@ +{ + "my-queue-1" : { + "description": "Simple splitter with static list", + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + }, + "my-queue-[0-9]+" : { + "description": "Simple splitter with request header", + "postfixFromRequest": { + "header": "x-rp-deviceid" + }, + "postfixDelimiter": "+" + }, + "my-queue-[a-zA-Z]+" : { + "description": "Simple splitter with request url matching", + "postfixFromRequest" : { + "url": ".*/path1/(.*)/path3/path4/.*" + }, + "postfixDelimiter": "_" + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 new file mode 100644 index 000000000..abf394389 --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 @@ -0,0 +1,10 @@ +{ + "my-queue-1" : { + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props new file mode 100644 index 000000000..3bde49078 --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props @@ -0,0 +1,8 @@ +{ + "my-queue-[0-9]+" : { + "postfixDelimiter": "${queue.splitter.delimiter}", + "postfixFromRequest": { + "header": "x-rp-deviceid" + } + } +} \ No newline at end of file diff --git a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java index f2cedae8a..824add3af 100755 --- a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java +++ b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java @@ -38,6 +38,8 @@ import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueuingHandler; import org.swisspush.gateleen.queue.queuing.circuitbreaker.configuration.QueueCircuitBreakerConfigurationResourceManager; +import org.swisspush.gateleen.queue.queuing.splitter.NoOpQueueSplitter; +import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter; import org.swisspush.gateleen.routing.CustomHttpResponseHandler; import org.swisspush.gateleen.routing.Router; import org.swisspush.gateleen.scheduler.SchedulerResourceManager; @@ -91,6 +93,7 @@ public class RunConfig { private final LoggingResourceManager loggingResourceManager; private final ConfigurationResourceManager configurationResourceManager; private final QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager; + private final QueueSplitter queueSplitter; private final EventBusHandler eventBusHandler; private final ValidationHandler validationHandler; private final HookHandler hookHandler; @@ -115,7 +118,7 @@ public RunConfig(Vertx vertx, RedisProvider redisProvider, Class verticleClass, ValidationResourceManager validationResourceManager, LoggingResourceManager loggingResourceManager, ConfigurationResourceManager configurationResourceManager, QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager, - EventBusHandler eventBusHandler, ValidationHandler validationHandler, HookHandler hookHandler, + QueueSplitter queueSplitter, EventBusHandler eventBusHandler, ValidationHandler validationHandler, HookHandler hookHandler, UserProfileHandler userProfileHandler, RoleProfileHandler roleProfileHandler, ExpansionHandler expansionHandler, DeltaHandler deltaHandler, Authorizer authorizer, CopyResourceHandler copyResourceHandler, QoSHandler qosHandler, PropertyHandler propertyHandler, ZipExtractHandler zipExtractHandler, @@ -134,6 +137,7 @@ public RunConfig(Vertx vertx, RedisProvider redisProvider, Class verticleClass, this.loggingResourceManager = loggingResourceManager; this.configurationResourceManager = configurationResourceManager; this.queueCircuitBreakerConfigurationResourceManager = queueCircuitBreakerConfigurationResourceManager; + this.queueSplitter = queueSplitter; this.eventBusHandler = eventBusHandler; this.validationHandler = validationHandler; this.hookHandler = hookHandler; @@ -168,6 +172,7 @@ private RunConfig(RunConfigBuilder builder) { builder.loggingResourceManager, builder.configurationResourceManager, builder.queueCircuitBreakerConfigurationResourceManager, + builder.queueSplitter, builder.eventBusHandler, builder.validationHandler, builder.hookHandler, @@ -225,6 +230,7 @@ public static class RunConfigBuilder { private LoggingResourceManager loggingResourceManager; private ConfigurationResourceManager configurationResourceManager; private QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager; + private QueueSplitter queueSplitter; private EventBusHandler eventBusHandler; private KafkaHandler kafkaHandler; private CustomHttpResponseHandler customHttpResponseHandler; @@ -277,6 +283,11 @@ public RunConfigBuilder queueCircuitBreakerConfigurationResourceManager(QueueCir return this; } + public RunConfigBuilder queueSplitter(QueueSplitter queueSplitter) { + this.queueSplitter = queueSplitter; + return this; + } + public RunConfigBuilder eventBusHandler(EventBusHandler eventBusHandler) { this.eventBusHandler = eventBusHandler; return this; @@ -510,22 +521,22 @@ public static void deployModules(final Vertx vertx, Class verticleClass, Map { - if (event1.failed()) { - log.error("Could not load rest storage redis module", event1.cause()); - handler.handle(false); - return; - } - - // metrics module - vertx.deployVerticle("org.swisspush.metrics.MetricsModule", new DeploymentOptions().setConfig(RunConfig.buildMetricsConfig()), event2 -> { - if (event2.failed()) { - log.error("Could not load metrics module", event2.cause()); + // rest storage module + vertx.deployVerticle("org.swisspush.reststorage.RestStorageMod", new DeploymentOptions().setConfig(RunConfig.buildStorageConfig()).setInstances(4), event1 -> { + if (event1.failed()) { + log.error("Could not load rest storage redis module", event1.cause()); handler.handle(false); return; } - handler.handle(true); + + // metrics module + vertx.deployVerticle("org.swisspush.metrics.MetricsModule", new DeploymentOptions().setConfig(RunConfig.buildMetricsConfig()), event2 -> { + if (event2.failed()) { + log.error("Could not load metrics module", event2.cause()); + handler.handle(false); + return; + } + handler.handle(true); }); }); }); @@ -597,11 +608,25 @@ private void handleRequest(final RoutingContext ctx) { return; } if (PackingHandler.isPacked(request)) { - request.bodyHandler(new PackingHandler(request, new QueuingHandler(vertx, redisProvider, request, monitoringHandler))); + request.bodyHandler(new PackingHandler( + request, + new QueuingHandler( + vertx, + redisProvider, + request, + monitoringHandler, + queueSplitter + ) + )); } else { if (QueuingHandler.isQueued(request)) { setISO8601Timestamps(request); - request.bodyHandler(new QueuingHandler(vertx, redisProvider, request, monitoringHandler)); + request.bodyHandler(new QueuingHandler( + vertx, + redisProvider, + request, + monitoringHandler, + queueSplitter)); } else { if (cacheHandler != null && cacheHandler.handle(request)) { return;