Skip to content

Commit

Permalink
Merge pull request #557 from swisspost/feature/550_queue_splitter
Browse files Browse the repository at this point in the history
Feature/550 queue splitter
  • Loading branch information
gcastaldi authored Feb 9, 2024
2 parents af1f4a2 + c6597cd commit 161ce36
Show file tree
Hide file tree
Showing 27 changed files with 1,510 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.QueueCircuitBreakerImpl;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitBreakerRulePatternToCircuitMapping;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterImpl;
import org.swisspush.gateleen.routing.CustomHttpResponseHandler;
import org.swisspush.gateleen.routing.DeferCloseHttpClient;
import org.swisspush.gateleen.routing.Router;
Expand Down Expand Up @@ -154,6 +156,8 @@ public class Server extends AbstractVerticle {
private ContentTypeConstraintHandler contentTypeConstraintHandler;
private CacheHandler cacheHandler;

private QueueSplitter queueSplitter;

public static void main(String[] args) {
Vertx.vertx().deployVerticle("org.swisspush.gateleen.playground.Server", event ->
LoggerFactory.getLogger(Server.class).info("[_] Gateleen - http://localhost:7012/gateleen/")
Expand Down Expand Up @@ -324,6 +328,9 @@ public void start() {
final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", Address.redisquesAddress(),
monitoringHandler);

queueSplitter = new QueueSplitterImpl(configurationResourceManager, SERVER_ROOT + "/admin/v1/queueSplitters");
queueSplitter.initialize();

LogController logController = new LogController();
logController.registerLogConfiguratorMBean(JMX_DOMAIN);

Expand All @@ -348,6 +355,7 @@ public void start() {
.loggingResourceManager(loggingResourceManager)
.configurationResourceManager(configurationResourceManager)
.queueCircuitBreakerConfigurationResourceManager(queueCircuitBreakerConfigurationResourceManager)
.queueSplitter(queueSplitter)
.schedulerResourceManager(schedulerResourceManager)
.zipExtractHandler(zipExtractHandler)
.delegateHandler(delegateHandler)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"queue-static-split": {
"description": "Simple static splitter",
"postfixFromStatic": [
"A",
"B",
"C",
"D"
]
},
"queue-header-[a-z]+": {
"description": "Simple splitter with request header",
"postfixDelimiter": "+",
"postfixFromRequest": {
"header": "x-rp-deviceid"
}
},
"queue-path-[a-z]+": {
"description": "Simple splitter with request url matching",
"postfixDelimiter": "_",
"postfixFromRequest": {
"url": ".*/path1/(.*)/.*"
}
},
"queue-header-and-path-[a-z]+": {
"description": "Simple splitter with request header and url matching",
"postfixDelimiter": "_",
"postfixFromRequest": {
"header": "x-rp-deviceid",
"url": ".*/path1/(.*)/.*"
}
}
}
63 changes: 59 additions & 4 deletions gateleen-queue/README_queue.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# gateleen-queue
The gateleen-queue module provides queuing functionality and acts as a bridge to [vertx-redisques](https://github.com/swisspush/vertx-redisques).

### Queue retry configuration
## Queue retry configuration
Normally, failed queue items remain in the queue until successfully processed or manually deleted. With the `x-queue-retry-xxx` request header, you are able to control this behaviour.

Example values are:
Expand Down Expand Up @@ -32,9 +32,9 @@ The state diagram below describes the possible transitions between these states:

```
+--------+
+------->+ | open circuit
+------->+ | fail ratio reached
success | | CLOSED +---------------------+
+--------+ | |
+--------+ | open circuit |
+---+----+ |
^ |
| v
Expand Down Expand Up @@ -469,4 +469,59 @@ Example:
Also you have to enable the logging on the [QueueCircuitBreakerConfigurationResource](src/main/java/org/swisspush/gateleen/queue/queuing/circuitbreaker/configuration/QueueCircuitBreakerConfigurationResource.java) by calling
```java
queueCircuitBreakerConfigurationResourceManager.enableResourceLogging(true);
```
```

## Queue Splitter
In case there are queues with a large number of queue items to process there is the option to configure these queues to split into sub-queues.
The split is implemented dispatching the incoming request in one of the sub-queues (so the split is always active). Don't use the queue splitting feature when queue items from a queue have to be in cronological order (requests in sub-queues are processed in parallel).

Queue splitters are configured together (for example in admin/v1/queueSplitters), each splitter configuration is composted of three parts:
* name: regex used to match the queue name of the incoming requests
* postfix rule: rule used to generate the postfix to append to the initial queue name
* postfixDelimiter: Optional delimiter value to add between queue name and postfix. When not configured, _-_ is used

There are two types of postfix rules:
* static
* based on request

### Static postfix rule
In the rule all postfixes for splitting are listed. Here is an example of splitter configuration with static postfix rule:
```json
"queue-static": {
"description": "Simple static splitter",
"postfixFromStatic": [
"A",
"B",
"C",
"D"
]
}
```
In this case the splitter is applied only if the queue in the request is 'queue-static' and the splitting is done distributing uniformly the requests in the sub-queues 'queue-static-A', 'queue-static-B', 'queue-static-C' and 'queue-static-D'.

### Postfix rule based on request
Request header to use as postfix and/or the regex to extract parts from the url can be defined in the _postfixFromRequest_ property. Here are two examples:
```json
"queue-header-[a-z]+": {
"description": "Simple splitter with request header",
"postfixDelimiter": "+",
"postfixFromRequest": {
"header": "x-rp-deviceid"
}
},
"queue-path-[a-z]+": {
"description": "Simple splitter with request url matching",
"postfixDelimiter": "_",
"postfixFromRequest": {
"url": ".*/path1/(.*)/.*"
}
}
```
In both cases the splitter is applied to all the queues matching the name regex.

In first case the value of the header 'x-rp-deviceid' is added. A queue with name 'queue-header-test' and with a request header 'x-rp-deviceid' valued 'A1B2C3D4' the request is splitted in the sub-queue 'queue-header-test+A1B2C3D4'.
For the second case the matching parts of the request url are added. A queue with name 'queue-path-test' and with the url ending with .../path1/path2/path3 the request is splitted in the sub-queue 'queue-path-test_path2'.

### Splitter implementation
The evaluation of splitting for a queue is defined in the interface [QueueSplitter](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitter.java) with two implementations: [QueueSplitterImpl](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java) (to execute the splitters configured) and [NoOpQueueSplitter](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/NoOpQueueSplitter.java) (no splitter).
For each splitter configured is created either an instance of [QueueSplitExecutorFromStaticList](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromStaticList.java) (for the case of static postfix rule) or an instance of [QueueSplitExecutorFromRequest](../gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java) (for the case of postfix rule based on request).
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.duplicate.DuplicateCheckHandler;
import org.swisspush.gateleen.queue.queuing.splitter.NoOpQueueSplitter;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter;

import static org.swisspush.redisques.util.RedisquesAPI.buildCheckOperation;

Expand All @@ -34,16 +36,45 @@ public static boolean isQueued(HttpServerRequest request) {
private final HttpServerRequest request;
private final Vertx vertx;
private final RedisProvider redisProvider;
private final QueueSplitter queueSplitter;

public QueuingHandler(Vertx vertx, RedisProvider redisProvider, HttpServerRequest request, MonitoringHandler monitoringHandler) {
this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler));
public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
MonitoringHandler monitoringHandler
) {
this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), new NoOpQueueSplitter());

Check warning on line 47 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L47

Added line #L47 was not covered by tests
}

public QueuingHandler(Vertx vertx, RedisProvider redisProvider, HttpServerRequest request, RequestQueue requestQueue) {
public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
MonitoringHandler monitoringHandler,
QueueSplitter queueSplitter
) {
this(

Check warning on line 57 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L57

Added line #L57 was not covered by tests
vertx,
redisProvider,
request,
new QueueClient(vertx, monitoringHandler),
queueSplitter == null ? new NoOpQueueSplitter() : queueSplitter
);
}

Check warning on line 64 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L64

Added line #L64 was not covered by tests

public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
RequestQueue requestQueue,
QueueSplitter queueSplitter
) {

Check warning on line 72 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L72

Added line #L72 was not covered by tests
this.request = request;
this.vertx = vertx;
this.redisProvider = redisProvider;
this.requestQueue = requestQueue;
this.queueSplitter = queueSplitter;

Check warning on line 77 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L77

Added line #L77 was not covered by tests
}

@Override
Expand All @@ -61,12 +92,12 @@ public void handle(final Buffer buffer) {
request.response().setStatusMessage(StatusCode.ACCEPTED.getStatusMessage());
request.response().end();
} else {
requestQueue.enqueue(request, headers, buffer, queue);
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request));

Check warning on line 95 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L95

Added line #L95 was not covered by tests
}
});

} else {
requestQueue.enqueue(request, headers, buffer, queue);
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request));

Check warning on line 100 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L100

Added line #L100 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServerRequest;

/**
* {@inheritDoc}
*/
public class NoOpQueueSplitter implements QueueSplitter {

@Override
public Future<Void> initialize() {
Promise<Void> promise = Promise.promise();
promise.complete();
return promise.future();
}

@Override
/**
* {@inheritDoc}
*/
public String convertToSubQueue(String queue, HttpServerRequest request) {
return queue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;

/**
* Interface for queues configured to be split in sub-queues. The method {@link QueueSplitter#convertToSubQueue(String, HttpServerRequest)}
* evaluates the convert of the queue name in a sub-queue name.
*
* @author https://github.com/gcastaldi [Giannandrea Castaldi]
*/
public interface QueueSplitter {

public Future<Void> initialize();

/**
* Convert the queue name in a sub-queue name. If not necessary maintains the initial queue name.
*
* @param queue
* @param request
* @return sub-queue name
*/
String convertToSubQueue(String queue, HttpServerRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/**
* Container holding configuration values for {@link QueueSplitterImpl} identified
* by a queue pattern.
*
* @author https://github.com/gcastaldi [Giannandrea Castaldi]
*/
public class QueueSplitterConfiguration {

private final Pattern queue;

private final String postfixDelimiter;

@Nullable
private final List<String> postfixFromStatic;

@Nullable
private final String postfixFromHeader;

@Nullable
private final Pattern postfixFromUrl;


public QueueSplitterConfiguration(
Pattern queue,
String postfixDelimiter,
@Nullable List<String> postfixFromStatic,
@Nullable String postfixFromHeader,
@Nullable Pattern postfixFromUrl) {
this.queue = queue;
this.postfixDelimiter = postfixDelimiter;
this.postfixFromStatic = postfixFromStatic;
this.postfixFromHeader = postfixFromHeader;
this.postfixFromUrl = postfixFromUrl;
}

public Pattern getQueue() {
return queue;
}

public String getPostfixDelimiter() {
return postfixDelimiter;
}

@Nullable
public List<String> getPostfixFromStatic() {
return postfixFromStatic;
}

@Nullable
public String getPostfixFromHeader() {
return postfixFromHeader;
}

@Nullable
public Pattern getPostfixFromUrl() {
return postfixFromUrl;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueueSplitterConfiguration that = (QueueSplitterConfiguration) o;

Check warning on line 70 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L70

Added line #L70 was not covered by tests
return Objects.equals(queue, that.queue) &&
Objects.equals(postfixDelimiter, that.postfixDelimiter) &&
Objects.equals(postfixFromStatic, that.postfixFromStatic) &&
Objects.equals(postfixFromHeader, that.postfixFromHeader) &&
Objects.equals(postfixFromUrl, that.postfixFromUrl);
}

@Override
public int hashCode() {
return Objects.hash(queue, postfixDelimiter, postfixFromStatic, postfixFromHeader, postfixFromUrl);

Check warning on line 80 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L80

Added line #L80 was not covered by tests
}

@Override
public String toString() {
return "QueueSplitterConfiguration{" +

Check warning on line 85 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L85

Added line #L85 was not covered by tests
"queue=" + queue +
", postfixDelimiter='" + postfixDelimiter + '\'' +
", postfixFromStatic=" + postfixFromStatic +
", postfixFromHeader='" + postfixFromHeader + '\'' +
", postfixFromUrl='" + postfixFromUrl + '\'' +
'}';
}

public boolean isSplitStatic() {
return postfixFromStatic != null && !postfixFromStatic.isEmpty();
}

public boolean isSplitFromRequest() {
return postfixFromHeader != null || postfixFromUrl != null;
}
}
Loading

0 comments on commit 161ce36

Please sign in to comment.