Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/550 queue splitter #557

Merged
merged 20 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.QueueCircuitBreakerImpl;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitBreakerRulePatternToCircuitMapping;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterImpl;
import org.swisspush.gateleen.routing.CustomHttpResponseHandler;
import org.swisspush.gateleen.routing.DeferCloseHttpClient;
import org.swisspush.gateleen.routing.Router;
Expand Down Expand Up @@ -154,6 +156,8 @@ public class Server extends AbstractVerticle {
private ContentTypeConstraintHandler contentTypeConstraintHandler;
private CacheHandler cacheHandler;

private QueueSplitter queueSplitter;

public static void main(String[] args) {
Vertx.vertx().deployVerticle("org.swisspush.gateleen.playground.Server", event ->
LoggerFactory.getLogger(Server.class).info("[_] Gateleen - http://localhost:7012/gateleen/")
Expand Down Expand Up @@ -323,6 +327,9 @@ public void start() {
final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", Address.redisquesAddress(),
monitoringHandler);

queueSplitter = new QueueSplitterImpl(configurationResourceManager, SERVER_ROOT + "/admin/v1/queueSplitters");
queueSplitter.initialize();

LogController logController = new LogController();
logController.registerLogConfiguratorMBean(JMX_DOMAIN);

Expand All @@ -347,6 +354,7 @@ public void start() {
.loggingResourceManager(loggingResourceManager)
.configurationResourceManager(configurationResourceManager)
.queueCircuitBreakerConfigurationResourceManager(queueCircuitBreakerConfigurationResourceManager)
.queueSplitter(queueSplitter)
.schedulerResourceManager(schedulerResourceManager)
.zipExtractHandler(zipExtractHandler)
.delegateHandler(delegateHandler)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"queue-static-split": {
"description": "Simple splitter with request header",
"postfixFromStatic": [
"A",
"B",
"C",
"D"
]
},
"queue-header-[a-z]+": {
"description": "Simple splitter with request header",
"postfixDelimiter": "+",
"postfixFromRequest": {
"header": "x-rp-deviceid"
}
},
"queue-path-[a-z]+": {
"description": "Simple splitter with request url matching",
"postfixDelimiter": "_",
"postfixFromRequest": {
"url": ".*/path1/(.*)/.*"
}
},
"queue-header-and-path-[a-z]+": {
"description": "Simple splitter with request header and url matching",
"postfixDelimiter": "_",
"postfixFromRequest": {
"header": "x-rp-deviceid",
"url": ".*/path1/(.*)/.*"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.duplicate.DuplicateCheckHandler;
import org.swisspush.gateleen.queue.queuing.splitter.NoOpQueueSplitter;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter;

import static org.swisspush.redisques.util.RedisquesAPI.buildCheckOperation;

Expand All @@ -34,16 +36,45 @@
private final HttpServerRequest request;
private final Vertx vertx;
private final RedisProvider redisProvider;
private final QueueSplitter queueSplitter;

public QueuingHandler(Vertx vertx, RedisProvider redisProvider, HttpServerRequest request, MonitoringHandler monitoringHandler) {
this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler));
public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
MonitoringHandler monitoringHandler
) {
this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), new NoOpQueueSplitter());

Check warning on line 47 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L47

Added line #L47 was not covered by tests
}

public QueuingHandler(Vertx vertx, RedisProvider redisProvider, HttpServerRequest request, RequestQueue requestQueue) {
public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
MonitoringHandler monitoringHandler,
QueueSplitter queueSplitter
) {
this(

Check warning on line 57 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L57

Added line #L57 was not covered by tests
vertx,
redisProvider,
request,
new QueueClient(vertx, monitoringHandler),
queueSplitter == null ? new NoOpQueueSplitter() : queueSplitter
);
}

Check warning on line 64 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L64

Added line #L64 was not covered by tests

public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
RequestQueue requestQueue,
QueueSplitter queueSplitter
) {

Check warning on line 72 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L72

Added line #L72 was not covered by tests
this.request = request;
this.vertx = vertx;
this.redisProvider = redisProvider;
this.requestQueue = requestQueue;
this.queueSplitter = queueSplitter;

Check warning on line 77 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L77

Added line #L77 was not covered by tests
}

@Override
Expand All @@ -61,12 +92,12 @@
request.response().setStatusMessage(StatusCode.ACCEPTED.getStatusMessage());
request.response().end();
} else {
requestQueue.enqueue(request, headers, buffer, queue);
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request));

Check warning on line 95 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L95

Added line #L95 was not covered by tests
}
});

} else {
requestQueue.enqueue(request, headers, buffer, queue);
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request));

Check warning on line 100 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L100

Added line #L100 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServerRequest;

/**
* {@inheritDoc}
*/
public class NoOpQueueSplitter implements QueueSplitter {

@Override
public Future<Void> initialize() {
Promise<Void> promise = Promise.promise();
promise.complete();
return promise.future();
}

@Override
/**
* {@inheritDoc}
*/
public String convertToSubQueue(String queue, HttpServerRequest request) {
return queue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;

/**
* Interface for queues configured to be split in sub-queues. The method {@link QueueSplitter#convertToSubQueue(String, HttpServerRequest)}
* evaluates the convert of the queue name in a sub-queue name.
*
* @author https://github.com/gcastaldi [Giannandrea Castaldi]
*/
public interface QueueSplitter {

public Future<Void> initialize();

/**
* Convert the queue name in a sub-queue name. If not necessary maintains the initial queue name.
*
* @param queue
* @param request
* @return sub-queue name
*/
String convertToSubQueue(String queue, HttpServerRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/**
* Container holding configuration values for {@link QueueSplitterImpl} identified
* by a queue pattern.
*
* @author https://github.com/gcastaldi [Giannandrea Castaldi]
*/
public class QueueSplitterConfiguration {

private final Pattern queue;

private final String postfixDelimiter;

@Nullable
private final List<String> postfixFromStatic;

@Nullable
private final String postfixFromHeader;

@Nullable
private final Pattern postfixFromUrl;


public QueueSplitterConfiguration(
Pattern queue,
String postfixDelimiter,
@Nullable List<String> postfixFromStatic,
@Nullable String postfixFromHeader,
@Nullable Pattern postfixFromUrl) {
this.queue = queue;
this.postfixDelimiter = postfixDelimiter;
this.postfixFromStatic = postfixFromStatic;
this.postfixFromHeader = postfixFromHeader;
this.postfixFromUrl = postfixFromUrl;
}

public Pattern getQueue() {
return queue;
}

public String getPostfixDelimiter() {
return postfixDelimiter;
}

@Nullable
public List<String> getPostfixFromStatic() {
return postfixFromStatic;
}

@Nullable
public String getPostfixFromHeader() {
return postfixFromHeader;
}

@Nullable
public Pattern getPostfixFromUrl() {
return postfixFromUrl;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueueSplitterConfiguration that = (QueueSplitterConfiguration) o;

Check warning on line 70 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L70

Added line #L70 was not covered by tests
return Objects.equals(queue, that.queue) &&
Objects.equals(postfixDelimiter, that.postfixDelimiter) &&
Objects.equals(postfixFromStatic, that.postfixFromStatic) &&
Objects.equals(postfixFromHeader, that.postfixFromHeader) &&
Objects.equals(postfixFromUrl, that.postfixFromUrl);
}

@Override
public int hashCode() {
return Objects.hash(queue, postfixDelimiter, postfixFromStatic, postfixFromHeader, postfixFromUrl);

Check warning on line 80 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L80

Added line #L80 was not covered by tests
}

@Override
public String toString() {
return "QueueSplitterConfiguration{" +

Check warning on line 85 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfiguration.java#L85

Added line #L85 was not covered by tests
"queue=" + queue +
", postfixDelimiter='" + postfixDelimiter + '\'' +
", postfixFromStatic=" + postfixFromStatic +
", postfixFromHeader='" + postfixFromHeader + '\'' +
", postfixFromUrl='" + postfixFromUrl + '\'' +
'}';
}

public boolean isSplitStatic() {
return postfixFromStatic != null && !postfixFromStatic.isEmpty();
}

public boolean isSplitFromRequest() {
return postfixFromHeader != null || postfixFromUrl != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.util.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

/**
* Parses the splitter configuration resource in to a list of {@link QueueSplitterConfiguration}
*
* @author https://github.com/gcastaldi [Giannandrea Castaldi]
*/
public class QueueSplitterConfigurationParser {

Check warning on line 23 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java#L23

Added line #L23 was not covered by tests

private static final Logger log = LoggerFactory.getLogger(QueueSplitterConfigurationParser.class);
public static final String POSTFIX_FROM_STATIC_KEY = "postfixFromStatic";
public static final String POSTFIX_FROM_REQUEST_KEY = "postfixFromRequest";
public static final String POSTFIX_FROM_HEADER_KEY = "header";
public static final String POSTFIX_FROM_URL_KEY = "url";
public static final String POSTFIX_DELIMITER_KEY = "postfixDelimiter";
public static final String DEFAULT_POSTFIX_DELIMITER = "-";

static List<QueueSplitterConfiguration> parse(Buffer configurationResourceBuffer, Map<String, Object> properties) {

JsonObject config;
List<QueueSplitterConfiguration> queueSplitterConfigurations = new ArrayList<>();
try {
String resolvedConfiguration = StringUtils.replaceWildcardConfigs(
configurationResourceBuffer.toString(StandardCharsets.UTF_8),
properties
);
config = new JsonObject(Buffer.buffer(resolvedConfiguration));
} catch (Exception ex) {
log.warn("Could not replace wildcards with environment properties for queue splitter configurations or json invalid. Here the reason: {}",
ex.getMessage());
return queueSplitterConfigurations;
}

for (String queuePattern : config.fieldNames()) {
try {
Pattern pattern = Pattern.compile(queuePattern);
JsonObject queueConfig = config.getJsonObject(queuePattern);
JsonArray postfixFromStatic = queueConfig.getJsonArray(POSTFIX_FROM_STATIC_KEY);
if (postfixFromStatic != null) {
List<String> staticPostfixes = postfixFromStatic.stream().map(Object::toString).collect(Collectors.toList());
queueSplitterConfigurations.add(new QueueSplitterConfiguration(
pattern,
queueConfig.getString("postfixDelimiter", DEFAULT_POSTFIX_DELIMITER),
staticPostfixes,
null,
null
));
} else {
JsonObject postfixFromRequest = queueConfig.getJsonObject(POSTFIX_FROM_REQUEST_KEY);
String postfixFromHeader = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_HEADER_KEY) : null;
String postfixFromUrl = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_URL_KEY) : null;
if (postfixFromRequest != null && (postfixFromHeader != null || postfixFromUrl != null)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postfixFromRequest != null can be omitted since you already check postfixFromHeader and postfixFromUrl

queueSplitterConfigurations.add(new QueueSplitterConfiguration(
pattern,
queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER),
null,
postfixFromHeader,
postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null
));
} else {
log.warn("Queue splitter configuration without a postfix definition");
}
}
} catch (PatternSyntaxException patternException) {
log.warn("Queue splitter '{}' is not a valid regex pattern. Discarding this queue splitter configuration", queuePattern);

Check warning on line 80 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java#L79-L80

Added lines #L79 - L80 were not covered by tests
}
}

return queueSplitterConfigurations;
}

}
Loading
Loading