Skip to content

Commit

Permalink
#550 added split executors
Browse files Browse the repository at this point in the history
  • Loading branch information
zorba71 committed Jan 24, 2024
1 parent 2c28f27 commit c385935
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ public void start() {
monitoringHandler);

queueSplitter = new QueueSplitterImpl(configurationResourceManager, SERVER_ROOT + "/admin/v1/queueSplitters");
queueSplitter.initialize();

LogController logController = new LogController();
logController.registerLogConfiguratorMBean(JMX_DOMAIN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ public void handle(final Buffer buffer) {
request.response().setStatusMessage(StatusCode.ACCEPTED.getStatusMessage());
request.response().end();
} else {
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue));
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request));
}
});

} else {
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue));
requestQueue.enqueue(request, headers, buffer, queueSplitter.convertToSubQueue(queue, request));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServerRequest;

/**
* {@inheritDoc}
*/
public class NoOpQueueSplitter implements QueueSplitter {

@Override
public Future<Void> initialize() {
Promise<Void> promise = Promise.promise();
promise.complete();
return promise.future();
}

@Override
/**
* {@inheritDoc}
*/
public String convertToSubQueue(String queue) {
public String convertToSubQueue(String queue, HttpServerRequest request) {
return queue;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;

/**
* Interface for queues configured to be split in sub-queues. The method {@link QueueSplitter#convertToSubQueue(String)}
* Interface for queues configured to be split in sub-queues. The method {@link QueueSplitter#convertToSubQueue(String, HttpServerRequest)}
* evaluates the convert of the queue name in a sub-queue name.
*
* @author https://github.com/gcastaldi [Giannandrea Castaldi]
*/
public interface QueueSplitter {
/**
* Convert the queue name in a sub-queue name. If not necessary maintains the initial queue name.
*
* @param queue
* @return sub-queue name
*/
String convertToSubQueue(String queue);

public Future<Void> initialize();

/**
* Convert the queue name in a sub-queue name. If not necessary maintains the initial queue name.
*
* @param queue
* @param request
* @return sub-queue name
*/
String convertToSubQueue(String queue, HttpServerRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,12 @@ public String toString() {
", postfixFromUrl='" + postfixFromUrl + '\'' +
'}';
}

public boolean isSplitStatic() {
return postfixFromStatic != null && !postfixFromStatic.isEmpty();
}

public boolean isSplitFromRequest() {
return postfixFromHeader != null || postfixFromUrl != null;
}
}
Original file line number Diff line number Diff line change
@@ -1,44 +1,95 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* {@inheritDoc}
*/
public class QueueSplitterImpl extends ConfigurationResourceConsumer implements QueueSplitter {

private final Logger log = LoggerFactory.getLogger(QueueSplitterImpl.class);

private final Map<String, Object> properties;

private boolean initialized = false;

public QueueSplitterImpl(
ConfigurationResourceManager configurationResourceManager,
String configResourceUri
) {
this(configurationResourceManager, configResourceUri, "gateleen_queue_splitter_configuration_schema");
this(configurationResourceManager, configResourceUri, new HashMap<>());
}

public QueueSplitterImpl(
ConfigurationResourceManager configurationResourceManager,
String configResourceUri,
String schemaResourceName
Map<String, Object> properties
) {
super(configurationResourceManager, configResourceUri, schemaResourceName);
super(configurationResourceManager, configResourceUri, "gateleen_queue_splitter_configuration_schema");
this.properties = properties;
}

@Override
public void resourceChanged(String resourceUri, Buffer resource) {
public Future<Void> initialize() {
Promise<Void> promise = Promise.promise();
configurationResourceManager().getRegisteredResource(configResourceUri()).onComplete((event -> {
if (event.succeeded() && event.result().isPresent()) {
initializeQueueSplitterConfiguration(event.result().get()).onComplete((event1 -> promise.complete()));
} else {
log.warn("No queue splitter configuration resource with uri '{}' found. Unable to setup kafka configuration correctly", configResourceUri());
promise.complete();
}
}));
return promise.future();
}

public boolean isInitialized() {
return initialized;
}

@Override
public void resourceRemoved(String resourceUri) {
private Future<Void> initializeQueueSplitterConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<QueueSplitterConfiguration> kafkaConfigurations = QueueSplitterConfigurationParser.parse(configuration, properties);

// TODO: release all splitters and creates new ones

return promise.future();
}

/**
* {@inheritDoc}
*/
@Override
public String convertToSubQueue(final String queue) {
public String convertToSubQueue(final String queue, HttpServerRequest request) {
return queue + "-1";
}

@Override
public void resourceChanged(String resourceUri, Buffer resource) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was updated. Going to initialize with new configuration", resourceUri);
initializeQueueSplitterConfiguration(resource);
}
}

@Override
public void resourceRemoved(String resourceUri) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri);

// TODO: release all splitters and creates new ones

initialized = false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.swisspush.gateleen.queue.queuing.splitter.executors;

import io.vertx.core.http.HttpServerRequest;

public interface QueueSplitExecutor {

boolean matches(String queue);

String executeSplit(String queue, HttpServerRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.swisspush.gateleen.queue.queuing.splitter.executors;

import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration;

public abstract class QueueSplitExecutorBase implements QueueSplitExecutor {

protected final QueueSplitterConfiguration configuration;

protected QueueSplitExecutorBase(QueueSplitterConfiguration configuration) {
this.configuration = configuration;
}

@Override
public boolean matches(String queue) {
return configuration.getQueue().matcher(queue).matches();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.swisspush.gateleen.queue.queuing.splitter.executors;

import io.vertx.core.http.HttpServerRequest;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration;

import java.util.concurrent.atomic.AtomicInteger;

public class QueueSplitExecutorFromList extends QueueSplitExecutorBase {

AtomicInteger atomicInteger = new AtomicInteger(0);

protected QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) {
super(configuration);
}

@Override
public String executeSplit(String queue, HttpServerRequest request) {
StringBuilder stringBuilder = new StringBuilder(queue);
if (matches(queue)) {
stringBuilder.append(configuration.getPostfixDelimiter());
stringBuilder.append(configuration.getPostfixFromStatic().get(
atomicInteger.getAndAccumulate(
1,
(left, right) -> (left + right) % configuration.getPostfixFromStatic().size()
)
));
}
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.swisspush.gateleen.queue.queuing.splitter.executors;

import io.vertx.core.http.HttpServerRequest;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitterConfiguration;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QueueSplitExecutorFromRequest extends QueueSplitExecutorBase {
protected QueueSplitExecutorFromRequest(QueueSplitterConfiguration configuration) {
super(configuration);
}

@Override
public String executeSplit(String queue, HttpServerRequest request) {
StringBuilder stringBuilder = new StringBuilder(queue);
if (matches(queue)) {
if (configuration.getPostfixFromUrl() != null) {
Pattern pattern = Pattern.compile(configuration.getPostfixFromUrl());
Matcher matcher = pattern.matcher(request.uri());
if (matcher.matches()) {
for (int i = 0; i < matcher.groupCount(); i++) {
stringBuilder.append(configuration.getPostfixDelimiter());
stringBuilder.append(matcher.group(i + 1));
}
}
}
if (configuration.getPostfixFromHeader() != null) {
stringBuilder.append(configuration.getPostfixDelimiter());
stringBuilder.append(request.headers().get(configuration.getPostfixFromHeader()));
}
}
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,26 @@ public void parseWithAllValid(TestContext context) {
context.assertEquals(List.of("A", "B", "C", "D"), config_1.getPostfixFromStatic());
context.assertNull(config_1.getPostfixFromHeader());
context.assertNull(config_1.getPostfixFromUrl());
context.assertTrue(config_1.isSplitStatic());
context.assertFalse(config_1.isSplitFromRequest());

QueueSplitterConfiguration config_2 = configurations.get(1);
context.assertEquals(Pattern.compile("my-queue-[0-9]+").pattern(), config_2.getQueue().pattern());
context.assertEquals("+", config_2.getPostfixDelimiter());
context.assertNull(config_2.getPostfixFromStatic());
context.assertEquals("{x-rp-deviceid}", config_2.getPostfixFromHeader());
context.assertEquals("x-rp-deviceid", config_2.getPostfixFromHeader());
context.assertNull(config_2.getPostfixFromUrl());
context.assertFalse(config_2.isSplitStatic());
context.assertTrue(config_2.isSplitFromRequest());

QueueSplitterConfiguration config_3 = configurations.get(2);
context.assertEquals(Pattern.compile("my-queue-[a-zA-Z]+").pattern(), config_3.getQueue().pattern());
context.assertEquals("_", config_3.getPostfixDelimiter());
context.assertNull(config_3.getPostfixFromStatic());
context.assertNull(config_3.getPostfixFromHeader());
context.assertEquals(".*/path1/(.*)/path3/path4/.*", config_3.getPostfixFromUrl());
context.assertFalse(config_3.isSplitStatic());
context.assertTrue(config_3.isSplitFromRequest());
}

@Test
Expand Down
Loading

0 comments on commit c385935

Please sign in to comment.