From ba3306b9ea37a0595b192e15857e211f0587e95a Mon Sep 17 00:00:00 2001 From: Giannandrea Castaldi Date: Wed, 7 Feb 2024 13:54:35 +0100 Subject: [PATCH] #550 fixes, refactorings and improvements in splitter --- .../playground/server/admin/v1/queueSplitters | 18 +++++-- .../queue/queuing/QueuingHandler.java | 9 +++- .../QueueSplitterConfigurationParser.java | 12 +++-- .../queuing/splitter/QueueSplitterImpl.java | 4 +- ...teleen_queue_splitter_configuration_schema | 51 +++++++++++++++++-- .../QueueSplitterConfigurationParserTest.java | 4 +- .../splitter/QueueSplitterImplTest.java | 25 +++------ ...resource_queuesplitter_configuration_valid | 18 ------- ...source_queuesplitter_configuration_valid_1 | 25 +++++++++ ...source_queuesplitter_configuration_valid_2 | 10 ++++ ...rce_queuesplitter_configuration_with_props | 4 +- .../gateleen/runconfig/RunConfig.java | 4 +- 12 files changed, 127 insertions(+), 57 deletions(-) delete mode 100644 gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid create mode 100644 gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 create mode 100644 gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 diff --git a/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters b/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters index c96ee021..6722e7d3 100644 --- a/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters +++ b/gateleen-playground/src/main/resources/playground/server/admin/v1/queueSplitters @@ -1,5 +1,6 @@ { "queue-static-split": { + "description": "Simple splitter with request header", "postfixFromStatic": [ "A", "B", @@ -8,16 +9,25 @@ ] }, "queue-header-[a-z]+": { + "description": "Simple splitter with request header", "postfixDelimiter": "+", - "postfixFromHeader": "x-rp-deviceid" + "postfixFromRequest": { + "header": "x-rp-deviceid" + } }, "queue-path-[a-z]+": { + "description": "Simple splitter with request url matching", "postfixDelimiter": "_", - "postfixFromUrl": ".*/path1/(.*)/.*" + "postfixFromRequest": { + "url": ".*/path1/(.*)/.*" + } }, "queue-header-and-path-[a-z]+": { + "description": "Simple splitter with request header and url matching", "postfixDelimiter": "_", - "postfixFromHeader": "x-rp-deviceid", - "postfixFromUrl": ".*/path1/(.*)/.*" + "postfixFromRequest": { + "header": "x-rp-deviceid", + "url": ".*/path1/(.*)/.*" + } } } \ No newline at end of file diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java index b548a9c7..e95c0fac 100755 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueuingHandler.java @@ -46,6 +46,7 @@ public QueuingHandler( ) { this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), new NoOpQueueSplitter()); } + public QueuingHandler( Vertx vertx, RedisProvider redisProvider, @@ -53,7 +54,13 @@ public QueuingHandler( MonitoringHandler monitoringHandler, QueueSplitter queueSplitter ) { - this(vertx, redisProvider, request, new QueueClient(vertx, monitoringHandler), queueSplitter); + this( + vertx, + redisProvider, + request, + new QueueClient(vertx, monitoringHandler), + queueSplitter == null ? new NoOpQueueSplitter() : queueSplitter + ); } public QueuingHandler( diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java index d84b0672..0861c9df 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParser.java @@ -24,8 +24,9 @@ public class QueueSplitterConfigurationParser { private static final Logger log = LoggerFactory.getLogger(QueueSplitterConfigurationParser.class); public static final String POSTFIX_FROM_STATIC_KEY = "postfixFromStatic"; - public static final String POSTFIX_FROM_HEADER_KEY = "postfixFromHeader"; - public static final String POSTFIX_FROM_URL_KEY = "postfixFromUrl"; + public static final String POSTFIX_FROM_REQUEST_KEY = "postfixFromRequest"; + public static final String POSTFIX_FROM_HEADER_KEY = "header"; + public static final String POSTFIX_FROM_URL_KEY = "url"; public static final String POSTFIX_DELIMITER_KEY = "postfixDelimiter"; public static final String DEFAULT_POSTFIX_DELIMITER = "-"; @@ -60,9 +61,10 @@ static List parse(Buffer configurationResourceBuffer null )); } else { - String postfixFromHeader = queueConfig.getString(POSTFIX_FROM_HEADER_KEY); - String postfixFromUrl = queueConfig.getString(POSTFIX_FROM_URL_KEY); - if (postfixFromHeader != null || postfixFromUrl != null) { + JsonObject postfixFromRequest = queueConfig.getJsonObject(POSTFIX_FROM_REQUEST_KEY); + String postfixFromHeader = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_HEADER_KEY) : null; + String postfixFromUrl = postfixFromRequest != null ? postfixFromRequest.getString(POSTFIX_FROM_URL_KEY) : null; + if (postfixFromRequest != null && (postfixFromHeader != null || postfixFromUrl != null)) { queueSplitterConfigurations.add(new QueueSplitterConfiguration( pattern, queueConfig.getString(POSTFIX_DELIMITER_KEY, DEFAULT_POSTFIX_DELIMITER), diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java index 085cb384..3e2f4779 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java @@ -88,7 +88,6 @@ public String convertToSubQueue(final String queue, HttpServerRequest request) { public void resourceChanged(String resourceUri, Buffer resource) { if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { log.info("Queue splitter configuration resource {} was updated. Going to initialize with new configuration", resourceUri); - System.out.println("Queue splitter configuration resource changed"); initializeQueueSplitterConfiguration(resource); } } @@ -96,8 +95,7 @@ public void resourceChanged(String resourceUri, Buffer resource) { @Override public void resourceRemoved(String resourceUri) { if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { - log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri); - System.out.println("Queue splitter configuration resource removed"); + log.info("Queue splitter configuration resource {} was removed. Going to release all executors", resourceUri); queueSplitExecutors.clear(); initialized = false; } diff --git a/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema b/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema index ab8dbb66..65053440 100644 --- a/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema +++ b/gateleen-queue/src/main/resources/gateleen_queue_splitter_configuration_schema @@ -1,5 +1,6 @@ { "$schema": "http://json-schema.org/draft-04/schema#", + "description": "A list of queue splitter configurations", "type": "object", "additionalProperties": { "$ref": "#/definitions/QueueSplitter" @@ -9,10 +10,16 @@ "description": "A single queue splitter configuration", "type": "object", "properties": { + "description": { + "description": "description of the splitter", + "type": "string" + }, "postfixDelimiter": { + "description": "Separator between original queue and postfix added", "type": "string" }, "postfixFromStatic": { + "description": "List of postfixes to use to compose sub-queues", "type": "array", "items": { "type": "string" @@ -20,13 +27,49 @@ "minItems": 1, "uniqueItems": true }, - "postfixFromHeader": { + "postfixFromRequest": { + "$ref": "#/definitions/PostfixFromRequest" + } + }, + "additionalProperties": false, + "oneOf": [ + { + "required": [ + "postfixFromStatic" + ] + }, + { + "required": [ + "postfixFromRequest" + ] + } + ] + }, + "PostfixFromRequest": { + "description": "Postfix generated using request header and/or url", + "anyOf": [ + { + "required": [ + "header" + ] + }, + { + "required": [ + "url" + ] + } + ], + "properties": { + "header": { + "description": "Header to use as postfix", + "type": "string" + }, + "url": { + "description": "Regex to group postfixes from url", "type": "string" } }, - "postfixFromUrl": { - "type": "string" - } + "additionalProperties": false } } } \ No newline at end of file diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java index e5dc3bcc..7d635776 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterConfigurationParserTest.java @@ -21,7 +21,7 @@ public class QueueSplitterConfigurationParserTest { private final String CONFIGURATION_VALID = ResourcesUtils.loadResource( - "testresource_queuesplitter_configuration_valid", + "testresource_queuesplitter_configuration_valid_1", true ); @@ -147,7 +147,7 @@ public void parseWithValidAndProps(TestContext context) { context.assertEquals(Pattern.compile("my-queue-[0-9]+").pattern(), config_1.getQueue().pattern()); context.assertEquals("_", config_1.getPostfixDelimiter()); context.assertNull(config_1.getPostfixFromStatic()); - context.assertEquals("{x-rp-deviceid}", config_1.getPostfixFromHeader()); + context.assertEquals("x-rp-deviceid", config_1.getPostfixFromHeader()); context.assertNull(config_1.getPostfixFromUrl()); } @Test diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java index 44e9fca0..9adeb3d5 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java @@ -1,9 +1,7 @@ package org.swisspush.gateleen.queue.queuing.splitter; -import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.core.eventbus.Message; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.impl.headers.HeadersMultiMap; import io.vertx.core.json.JsonObject; @@ -11,7 +9,6 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; @@ -19,11 +16,7 @@ import org.swisspush.gateleen.core.storage.MockResourceStorage; import org.swisspush.gateleen.core.util.ResourcesUtils; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; -import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.*; import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS; @@ -36,7 +29,8 @@ public class QueueSplitterImplTest { private ConfigurationResourceManager configurationResourceManager; private QueueSplitterImpl queueSplitter; - private final String CONFIG_RESOURCE_VALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid", true); + private final String CONFIG_RESOURCE_VALID_1 = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_1", true); + private final String CONFIG_RESOURCE_VALID_2 = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid_2", true); private final String CONFIG_RESOURCE_INVALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_invalid", true); @Before @@ -71,7 +65,7 @@ public void splitWithValidConfigResource(TestContext context) { // Given Async async = context.async(); - storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); context.assertFalse(queueSplitter.isInitialized()); // When @@ -98,10 +92,7 @@ public void splitWithPartiallyInvalidConfigResource(TestContext context) { queueSplitter.initialize().onComplete(event -> { // Then - context.assertTrue(queueSplitter.isInitialized()); - verifySplitStaticExecuted(context); - verifySplitWithHeaderNotExecuted(context); - verifySplitWithUrlNotExecuted(context); + context.assertFalse(queueSplitter.isInitialized()); async.complete(); }); } @@ -112,7 +103,7 @@ public void splitWithQueueNotMatchingAnyConfiguration(TestContext context) { // Given Async async = context.async(); - storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); context.assertFalse(queueSplitter.isInitialized()); // When @@ -131,7 +122,7 @@ public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context) // Given Async async = context.async(); - storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); queueSplitter.initialize().onComplete(event -> { context.assertTrue(queueSplitter.isInitialized()); verifySplitStaticExecuted(context); @@ -169,7 +160,7 @@ public void configResourceChangedTriggerNewInitOfExecutors(TestContext context) // Given Async async = context.async(); - storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_1); queueSplitter.initialize().onComplete(event -> { context.assertTrue(queueSplitter.isInitialized()); verifySplitStaticExecuted(context); @@ -199,7 +190,7 @@ public void resourceRemoved(String resourceUri) { }, configResourceUri); // When - storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID_2); JsonObject object = new JsonObject(); object.put("requestUri", configResourceUri); object.put("type", "change"); diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid deleted file mode 100644 index bd359d38..00000000 --- a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid +++ /dev/null @@ -1,18 +0,0 @@ -{ - "my-queue-1" : { - "postfixFromStatic": [ - "A", - "B", - "C", - "D" - ] - }, - "my-queue-[0-9]+" : { - "postfixDelimiter": "+", - "postfixFromHeader": "x-rp-deviceid" - }, - "my-queue-[a-zA-Z]+" : { - "postfixDelimiter": "_", - "postfixFromUrl": ".*/path1/(.*)/path3/path4/.*" - } -} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 new file mode 100644 index 00000000..7782e233 --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_1 @@ -0,0 +1,25 @@ +{ + "my-queue-1" : { + "description": "Simple splitter with static list", + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + }, + "my-queue-[0-9]+" : { + "description": "Simple splitter with request header", + "postfixFromRequest": { + "header": "x-rp-deviceid" + }, + "postfixDelimiter": "+" + }, + "my-queue-[a-zA-Z]+" : { + "description": "Simple splitter with request url matching", + "postfixFromRequest" : { + "url": ".*/path1/(.*)/path3/path4/.*" + }, + "postfixDelimiter": "_" + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 new file mode 100644 index 00000000..abf39438 --- /dev/null +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_valid_2 @@ -0,0 +1,10 @@ +{ + "my-queue-1" : { + "postfixFromStatic": [ + "A", + "B", + "C", + "D" + ] + } +} \ No newline at end of file diff --git a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props index ae03103c..3bde4907 100644 --- a/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props +++ b/gateleen-queue/src/test/resources/testresource_queuesplitter_configuration_with_props @@ -1,6 +1,8 @@ { "my-queue-[0-9]+" : { "postfixDelimiter": "${queue.splitter.delimiter}", - "postfixFromHeader": "{x-rp-deviceid}" + "postfixFromRequest": { + "header": "x-rp-deviceid" + } } } \ No newline at end of file diff --git a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java index 5f632b7e..824add3a 100755 --- a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java +++ b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java @@ -615,7 +615,7 @@ private void handleRequest(final RoutingContext ctx) { redisProvider, request, monitoringHandler, - queueSplitter != null ? queueSplitter : new NoOpQueueSplitter() + queueSplitter ) )); } else { @@ -626,7 +626,7 @@ private void handleRequest(final RoutingContext ctx) { redisProvider, request, monitoringHandler, - queueSplitter != null ? queueSplitter : new NoOpQueueSplitter())); + queueSplitter)); } else { if (cacheHandler != null && cacheHandler.handle(request)) { return;