Skip to content

Commit

Permalink
#550 refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
zorba71 committed Feb 6, 2024
1 parent 00b1b03 commit 4a4eecd
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ static List<QueueSplitterConfiguration> parse(Buffer configurationResourceBuffer
null,
null
));
continue;
}
String postfixFromHeader = queueConfig.getString(POSTFIX_FROM_HEADER_KEY);
String postfixFromUrl = queueConfig.getString(POSTFIX_FROM_URL_KEY);
if (postfixFromHeader != null || postfixFromUrl != null) {
queueSplitterConfigurations.add(new QueueSplitterConfiguration(
pattern,
queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER),
null,
postfixFromHeader,
postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null
));
} else {
log.warn("Queue splitter configuration without a postfix definition");
String postfixFromHeader = queueConfig.getString(POSTFIX_FROM_HEADER_KEY);
String postfixFromUrl = queueConfig.getString(POSTFIX_FROM_URL_KEY);
if (postfixFromHeader != null || postfixFromUrl != null) {
queueSplitterConfigurations.add(new QueueSplitterConfiguration(
pattern,
queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER),
null,
postfixFromHeader,
postfixFromUrl != null ? Pattern.compile(postfixFromUrl) : null
));
} else {
log.warn("Queue splitter configuration without a postfix definition");
}
}
} catch (PatternSyntaxException patternException) {
log.warn("Queue splitter '{}' is not a valid regex pattern. Discarding this queue splitter configuration", queuePattern);

Check warning on line 78 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java#L77-L78

Added lines #L77 - L78 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromList;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromStaticList;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest;

import java.util.*;
Expand Down Expand Up @@ -67,7 +67,7 @@ private void initializeQueueSplitterConfiguration(Buffer configuration) {
queueSplitExecutors.clear();
queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> {
if (queueSplitterConfiguration.isSplitStatic()) {
return new QueueSplitExecutorFromList(queueSplitterConfiguration);
return new QueueSplitExecutorFromStaticList(queueSplitterConfiguration);
} else {
return new QueueSplitExecutorFromRequest(queueSplitterConfiguration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

import java.util.concurrent.atomic.AtomicInteger;

public class QueueSplitExecutorFromList extends QueueSplitExecutorBase {
public class QueueSplitExecutorFromStaticList extends QueueSplitExecutorBase {

AtomicInteger atomicInteger = new AtomicInteger(0);

public QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) {
public QueueSplitExecutorFromStaticList(QueueSplitterConfiguration configuration) {
super(configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testExecuteSplitWithUrlAndHeader() {
public void testExecuteSplitForWrongQueue() {

// Given
QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration(
QueueSplitExecutorFromRequest executor = new QueueSplitExecutorFromRequest(new QueueSplitterConfiguration(
Pattern.compile("queue-1"),
"-",
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

public class QueueSplitExecutorFromListTest {
public class QueueSplitExecutorFromStaticListTest {

@Test
public void testMatchesWithStaticQueueName() {

// Given
QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration(
QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration(
Pattern.compile("queue-1"),
"-",
List.of("A", "B", "C", "D"),
Expand All @@ -33,7 +33,7 @@ public void testMatchesWithStaticQueueName() {
public void testMatchesWithWildCharQueueName() {

// Given
QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration(
QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration(
Pattern.compile("queue-[0-9]+"),
"-",
List.of("A", "B", "C", "D"),
Expand All @@ -52,7 +52,7 @@ public void testMatchesWithWildCharQueueName() {
public void testExecuteSplit() {

// Given
QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration(
QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration(
Pattern.compile("queue-1"),
"-",
List.of("A", "B", "C", "D"),
Expand All @@ -77,7 +77,7 @@ public void testExecuteSplit() {
public void testExecuteSplitForWrongQueue() {

// Given
QueueSplitExecutorFromList executor = new QueueSplitExecutorFromList(new QueueSplitterConfiguration(
QueueSplitExecutorFromStaticList executor = new QueueSplitExecutorFromStaticList(new QueueSplitterConfiguration(
Pattern.compile("queue-1"),
"-",
List.of("A", "B", "C", "D"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public static class RunConfigBuilder {
private LoggingResourceManager loggingResourceManager;
private ConfigurationResourceManager configurationResourceManager;
private QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager;
private QueueSplitter queueSplitter = new NoOpQueueSplitter();
private QueueSplitter queueSplitter;
private EventBusHandler eventBusHandler;
private KafkaHandler kafkaHandler;
private CustomHttpResponseHandler customHttpResponseHandler;
Expand Down Expand Up @@ -521,22 +521,22 @@ public static void deployModules(final Vertx vertx, Class verticleClass, Map<Str
handler.handle(false);
return;
}
// rest storage module
vertx.deployVerticle("org.swisspush.reststorage.RestStorageMod", new DeploymentOptions().setConfig(RunConfig.buildStorageConfig()).setInstances(4), event1 -> {
if (event1.failed()) {
log.error("Could not load rest storage redis module", event1.cause());
handler.handle(false);
return;
}

// metrics module
vertx.deployVerticle("org.swisspush.metrics.MetricsModule", new DeploymentOptions().setConfig(RunConfig.buildMetricsConfig()), event2 -> {
if (event2.failed()) {
log.error("Could not load metrics module", event2.cause());
// rest storage module
vertx.deployVerticle("org.swisspush.reststorage.RestStorageMod", new DeploymentOptions().setConfig(RunConfig.buildStorageConfig()).setInstances(4), event1 -> {
if (event1.failed()) {
log.error("Could not load rest storage redis module", event1.cause());
handler.handle(false);
return;
}
handler.handle(true);

// metrics module
vertx.deployVerticle("org.swisspush.metrics.MetricsModule", new DeploymentOptions().setConfig(RunConfig.buildMetricsConfig()), event2 -> {
if (event2.failed()) {
log.error("Could not load metrics module", event2.cause());
handler.handle(false);
return;
}
handler.handle(true);
});
});
});
Expand Down Expand Up @@ -608,11 +608,25 @@ private void handleRequest(final RoutingContext ctx) {
return;
}
if (PackingHandler.isPacked(request)) {
request.bodyHandler(new PackingHandler(request, new QueuingHandler(vertx, redisProvider, request, monitoringHandler, queueSplitter)));
request.bodyHandler(new PackingHandler(
request,
new QueuingHandler(
vertx,
redisProvider,
request,
monitoringHandler,
queueSplitter != null ? queueSplitter : new NoOpQueueSplitter()
)
));
} else {
if (QueuingHandler.isQueued(request)) {
setISO8601Timestamps(request);
request.bodyHandler(new QueuingHandler(vertx, redisProvider, request, monitoringHandler, queueSplitter));
request.bodyHandler(new QueuingHandler(
vertx,
redisProvider,
request,
monitoringHandler,
queueSplitter != null ? queueSplitter : new NoOpQueueSplitter()));
} else {
if (cacheHandler != null && cacheHandler.handle(request)) {
return;
Expand Down

0 comments on commit 4a4eecd

Please sign in to comment.