Skip to content

Commit

Permalink
#550 fixes, refactorings and improvements in splitter
Browse files Browse the repository at this point in the history
  • Loading branch information
zorba71 committed Feb 7, 2024
1 parent 7f30b4b commit ba3306b
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"queue-static-split": {
"description": "Simple splitter with request header",
"postfixFromStatic": [
"A",
"B",
Expand All @@ -8,16 +9,25 @@
]
},
"queue-header-[a-z]+": {
"description": "Simple splitter with request header",
"postfixDelimiter": "+",
"postfixFromHeader": "x-rp-deviceid"
"postfixFromRequest": {
"header": "x-rp-deviceid"
}
},
"queue-path-[a-z]+": {
"description": "Simple splitter with request url matching",
"postfixDelimiter": "_",
"postfixFromUrl": ".*/path1/(.*)/.*"
"postfixFromRequest": {
"url": ".*/path1/(.*)/.*"
}
},
"queue-header-and-path-[a-z]+": {
"description": "Simple splitter with request header and url matching",
"postfixDelimiter": "_",
"postfixFromHeader": "x-rp-deviceid",
"postfixFromUrl": ".*/path1/(.*)/.*"
"postfixFromRequest": {
"header": "x-rp-deviceid",
"url": ".*/path1/(.*)/.*"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,21 @@ public QueuingHandler(
) {
this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), new NoOpQueueSplitter());

Check warning on line 47 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L47

Added line #L47 was not covered by tests
}

public QueuingHandler(
Vertx vertx,
RedisProvider redisProvider,
HttpServerRequest request,
MonitoringHandler monitoringHandler,
QueueSplitter queueSplitter
) {
this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), queueSplitter);
this(

Check warning on line 57 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L57

Added line #L57 was not covered by tests
vertx,
redisProvider,
request,
new QueueClient(vertx, monitoringHandler),
queueSplitter == null ? new NoOpQueueSplitter() : queueSplitter
);
}

Check warning on line 64 in gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java#L64

Added line #L64 was not covered by tests

public QueuingHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public class QueueSplitterConfigurationParser {

private static final Logger log = LoggerFactory.getLogger(QueueSplitterConfigurationParser.class);
public static final String POSTFIX_FROM_STATIC_KEY = "postfixFromStatic";
public static final String POSTFIX_FROM_HEADER_KEY = "postfixFromHeader";
public static final String POSTFIX_FROM_URL_KEY = "postfixFromUrl";
public static final String POSTFIX_FROM_REQUEST_KEY = "postfixFromRequest";
public static final String POSTFIX_FROM_HEADER_KEY = "header";
public static final String POSTFIX_FROM_URL_KEY = "url";
public static final String POSTFIX_DELIMITER_KEY = "postfixDelimiter";
public static final String DEFAULT_POSTFIX_DELIMITER = "-";

Expand Down Expand Up @@ -60,9 +61,10 @@ static List<QueueSplitterConfiguration> parse(Buffer configurationResourceBuffer
null
));
} else {
String postfixFromHeader = queueConfig.getString(POSTFIX_FROM_HEADER_KEY);
String postfixFromUrl = queueConfig.getString(POSTFIX_FROM_URL_KEY);
if (postfixFromHeader != null || postfixFromUrl != null) {
JsonObject postfixFromRequest = queueConfig.getJsonObject(POSTFIX_FROM_REQUEST_KEY);
String postfixFromHeader = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_HEADER_KEY) : null;
String postfixFromUrl = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_URL_KEY) : null;
if (postfixFromRequest != null && (postfixFromHeader != null || postfixFromUrl != null)) {
queueSplitterConfigurations.add(new QueueSplitterConfiguration(
pattern,
queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,14 @@ public String convertToSubQueue(final String queue, HttpServerRequest request) {
public void resourceChanged(String resourceUri, Buffer resource) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was updated. Going to initialize with new configuration", resourceUri);
System.out.println("Queue splitter configuration resource changed");
initializeQueueSplitterConfiguration(resource);
}
}

@Override
public void resourceRemoved(String resourceUri) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri);
System.out.println("Queue splitter configuration resource removed");
log.info("Queue splitter configuration resource {} was removed. Going to release all executors", resourceUri);
queueSplitExecutors.clear();
initialized = false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "A list of queue splitter configurations",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/QueueSplitter"
Expand All @@ -9,24 +10,66 @@
"description": "A single queue splitter configuration",
"type": "object",
"properties": {
"description": {
"description": "description of the splitter",
"type": "string"
},
"postfixDelimiter": {
"description": "Separator between original queue and postfix added",
"type": "string"
},
"postfixFromStatic": {
"description": "List of postfixes to use to compose sub-queues",
"type": "array",
"items": {
"type": "string"
},
"minItems": 1,
"uniqueItems": true
},
"postfixFromHeader": {
"postfixFromRequest": {
"$ref": "#/definitions/PostfixFromRequest"
}
},
"additionalProperties": false,
"oneOf": [
{
"required": [
"postfixFromStatic"
]
},
{
"required": [
"postfixFromRequest"
]
}
]
},
"PostfixFromRequest": {
"description": "Postfix generated using request header and/or url",
"anyOf": [
{
"required": [
"header"
]
},
{
"required": [
"url"
]
}
],
"properties": {
"header": {
"description": "Header to use as postfix",
"type": "string"
},
"url": {
"description": "Regex to group postfixes from url",
"type": "string"
}
},
"postfixFromUrl": {
"type": "string"
}
"additionalProperties": false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
public class QueueSplitterConfigurationParserTest {

private final String CONFIGURATION_VALID = ResourcesUtils.loadResource(
"testresource_queuesplitter_configuration_valid",
"testresource_queuesplitter_configuration_valid_1",
true
);

Expand Down Expand Up @@ -147,7 +147,7 @@ public void parseWithValidAndProps(TestContext context) {
context.assertEquals(Pattern.compile("my-queue-[0-9]+").pattern(), config_1.getQueue().pattern());
context.assertEquals("_", config_1.getPostfixDelimiter());
context.assertNull(config_1.getPostfixFromStatic());
context.assertEquals("{x-rp-deviceid}", config_1.getPostfixFromHeader());
context.assertEquals("x-rp-deviceid", config_1.getPostfixFromHeader());
context.assertNull(config_1.getPostfixFromUrl());
}
@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceObserver;
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.core.util.ResourcesUtils;

import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.*;
import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS;

Expand All @@ -36,7 +29,8 @@ public class QueueSplitterImplTest {
private ConfigurationResourceManager configurationResourceManager;
private QueueSplitterImpl queueSplitter;

private final String CONFIG_RESOURCE_VALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid", true);
private final String CONFIG_RESOURCE_VALID_1 = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_1", true);
private final String CONFIG_RESOURCE_VALID_2 = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_2", true);
private final String CONFIG_RESOURCE_INVALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_invalid", true);

@Before
Expand Down Expand Up @@ -71,7 +65,7 @@ public void splitWithValidConfigResource(TestContext context) {

// Given
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1);
context.assertFalse(queueSplitter.isInitialized());

// When
Expand All @@ -98,10 +92,7 @@ public void splitWithPartiallyInvalidConfigResource(TestContext context) {
queueSplitter.initialize().onComplete(event -> {

// Then
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
context.assertFalse(queueSplitter.isInitialized());
async.complete();
});
}
Expand All @@ -112,7 +103,7 @@ public void splitWithQueueNotMatchingAnyConfiguration(TestContext context) {

// Given
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1);
context.assertFalse(queueSplitter.isInitialized());

// When
Expand All @@ -131,7 +122,7 @@ public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context)

// Given
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1);
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
Expand Down Expand Up @@ -169,7 +160,7 @@ public void configResourceChangedTriggerNewInitOfExecutors(TestContext context)

// Given
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1);
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
Expand Down Expand Up @@ -199,7 +190,7 @@ public void resourceRemoved(String resourceUri) {
}, configResourceUri);

// When
storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID);
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_2);
JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "change");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"my-queue-1" : {
"description": "Simple splitter with static list",
"postfixFromStatic": [
"A",
"B",
"C",
"D"
]
},
"my-queue-[0-9]+" : {
"description": "Simple splitter with request header",
"postfixFromRequest": {
"header": "x-rp-deviceid"
},
"postfixDelimiter": "+"
},
"my-queue-[a-zA-Z]+" : {
"description": "Simple splitter with request url matching",
"postfixFromRequest" : {
"url": ".*/path1/(.*)/path3/path4/.*"
},
"postfixDelimiter": "_"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"my-queue-1" : {
"postfixFromStatic": [
"A",
"B",
"C",
"D"
]
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"my-queue-[0-9]+" : {
"postfixDelimiter": "${queue.splitter.delimiter}",
"postfixFromHeader": "{x-rp-deviceid}"
"postfixFromRequest": {
"header": "x-rp-deviceid"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ private void handleRequest(final RoutingContext ctx) {
redisProvider,
request,
monitoringHandler,
queueSplitter != null ? queueSplitter : new NoOpQueueSplitter()
queueSplitter
)
));
} else {
Expand All @@ -626,7 +626,7 @@ private void handleRequest(final RoutingContext ctx) {
redisProvider,
request,
monitoringHandler,
queueSplitter != null ? queueSplitter : new NoOpQueueSplitter()));
queueSplitter));
} else {
if (cacheHandler != null && cacheHandler.handle(request)) {
return;
Expand Down

0 comments on commit ba3306b

Please sign in to comment.