diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java index 7b946356..4bb04b7c 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java @@ -8,10 +8,12 @@ import org.slf4j.LoggerFactory; import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromList; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; /** * {@inheritDoc} @@ -24,6 +26,8 @@ public class QueueSplitterImpl extends ConfigurationResourceConsumer implements private boolean initialized = false; + private List queueSplitExecutors = new ArrayList<>(); + public QueueSplitterImpl( ConfigurationResourceManager configurationResourceManager, String configResourceUri @@ -40,11 +44,16 @@ public QueueSplitterImpl( this.properties = properties; } + public List getQueueSplitExecutors() { + return queueSplitExecutors; + } + public Future initialize() { Promise promise = Promise.promise(); configurationResourceManager().getRegisteredResource(configResourceUri()).onComplete((event -> { if (event.succeeded() && event.result().isPresent()) { - initializeQueueSplitterConfiguration(event.result().get()).onComplete((event1 -> promise.complete())); + initializeQueueSplitterConfiguration(event.result().get()); + promise.complete(); } else { log.warn("No queue splitter configuration resource with uri '{}' found. Unable to setup kafka configuration correctly", configResourceUri()); promise.complete(); @@ -57,13 +66,18 @@ public boolean isInitialized() { return initialized; } - private Future initializeQueueSplitterConfiguration(Buffer configuration) { + private void initializeQueueSplitterConfiguration(Buffer configuration) { Promise promise = Promise.promise(); - final List kafkaConfigurations = QueueSplitterConfigurationParser.parse(configuration, properties); - - // TODO: release all splitters and creates new ones - - return promise.future(); + final List configurations = QueueSplitterConfigurationParser.parse(configuration, properties); + queueSplitExecutors.clear(); + queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> { + if (queueSplitterConfiguration.isSplitStatic()) { + return new QueueSplitExecutorFromList(queueSplitterConfiguration); + } else { + return new QueueSplitExecutorFromRequest(queueSplitterConfiguration); + } + }).collect(Collectors.toList()); + initialized = true; } /** @@ -71,7 +85,8 @@ private Future initializeQueueSplitterConfiguration(Buffer configuration) */ @Override public String convertToSubQueue(final String queue, HttpServerRequest request) { - return queue + "-1"; + Optional executor = queueSplitExecutors.stream().filter(splitExecutor -> splitExecutor.matches(queue)).findFirst(); + return executor.isPresent() ? executor.get().executeSplit(queue, request) : queue; } @Override @@ -86,9 +101,7 @@ public void resourceChanged(String resourceUri, Buffer resource) { public void resourceRemoved(String resourceUri) { if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri); - - // TODO: release all splitters and creates new ones - + queueSplitExecutors.clear(); initialized = false; } } diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java index e5b94c58..8124f85a 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromList.java @@ -9,7 +9,7 @@ public class QueueSplitExecutorFromList extends QueueSplitExecutorBase { AtomicInteger atomicInteger = new AtomicInteger(0); - protected QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) { + public QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) { super(configuration); } diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java index 68341b9e..11730392 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/executors/QueueSplitExecutorFromRequest.java @@ -7,7 +7,7 @@ import java.util.regex.Pattern; public class QueueSplitExecutorFromRequest extends QueueSplitExecutorBase { - protected QueueSplitExecutorFromRequest(QueueSplitterConfiguration configuration) { + public QueueSplitExecutorFromRequest(QueueSplitterConfiguration configuration) { super(configuration); } diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java new file mode 100644 index 00000000..b916589e --- /dev/null +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java @@ -0,0 +1,190 @@ +package org.swisspush.gateleen.queue.queuing.splitter; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.impl.headers.HeadersMultiMap; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.storage.MockResourceStorage; +import org.swisspush.gateleen.core.util.ResourcesUtils; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromList; +import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS; + +@RunWith(VertxUnitRunner.class) +public class QueueSplitterImplTest { + + private Vertx vertx; + private MockResourceStorage storage; + private final String configResourceUri = "/queueSplitters"; + private ConfigurationResourceManager configurationResourceManager; + private QueueSplitterImpl queueSplitter; + + private final String CONFIG_RESOURCE_VALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid", true); + private final String CONFIG_RESOURCE_INVALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_invalid", true); + + @Before + public void setUp() { + vertx = Vertx.vertx(); + storage = new MockResourceStorage(); + configurationResourceManager = new ConfigurationResourceManager(vertx, storage); + queueSplitter = new QueueSplitterImpl(configurationResourceManager, configResourceUri); + } + + @Test + public void initWithMissingConfigResource(TestContext context) { + Async async = context.async(); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + context.assertFalse(queueSplitter.isInitialized()); + context.assertTrue(queueSplitter.getQueueSplitExecutors().isEmpty()); + async.complete(); + }); + } + + @Test + public void initWithExistingConfigResource(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + context.assertTrue(queueSplitter.isInitialized()); + context.assertEquals(3, queueSplitter.getQueueSplitExecutors().size()); + + QueueSplitExecutor executor_1 = queueSplitter.getQueueSplitExecutors().get(0); + context.assertTrue(executor_1 instanceof QueueSplitExecutorFromList); + + QueueSplitExecutor executor_2 = queueSplitter.getQueueSplitExecutors().get(1); + context.assertTrue(executor_2 instanceof QueueSplitExecutorFromRequest); + + QueueSplitExecutor executor_3 = queueSplitter.getQueueSplitExecutors().get(2); + context.assertTrue(executor_3 instanceof QueueSplitExecutorFromRequest); + + async.complete(); + }); + } + + @Test + public void initWithExistingInvalidConfigResource(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + context.assertTrue(queueSplitter.isInitialized()); + context.assertEquals(1, queueSplitter.getQueueSplitExecutors().size()); + + QueueSplitExecutor executor_1 = queueSplitter.getQueueSplitExecutors().get(0); + context.assertTrue(executor_1 instanceof QueueSplitExecutorFromList); + + async.complete(); + }); + } + + @Test + @Ignore("verify with timeout and await don't work, review with Marc") + public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + context.assertTrue(queueSplitter.isInitialized()); + context.assertEquals(3, queueSplitter.getQueueSplitExecutors().size()); + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "remove"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); + // try { Thread.sleep(2000);} catch (InterruptedException e) {} + // verify(storage, timeout(100).times(1)).delete(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any()); + // await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0)); + context.assertEquals(0, queueSplitter.getQueueSplitExecutors().size()); + async.complete(); + }); + } + + @Test + @Ignore("verify with timeout and await don't work, review with Marc") + public void configResourceChangedTriggerNewInitOfExecutors(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + context.assertTrue(queueSplitter.isInitialized()); + context.assertEquals(3, queueSplitter.getQueueSplitExecutors().size()); + + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "change"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); + // try { Thread.sleep(2000);} catch (InterruptedException e) {} + // verify(storage, timeout(100).times(1)).delete(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any()); + // await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0)); + context.assertEquals(0, queueSplitter.getQueueSplitExecutors().size()); + async.complete(); + }); + } + + @Test + public void testConvertToSubQueueWithPostfixFromStatic(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + context.assertEquals("my-queue-1-A", queueSplitter.convertToSubQueue("my-queue-1", request)); + context.assertEquals("my-queue-1-B", queueSplitter.convertToSubQueue("my-queue-1", request)); + context.assertEquals("my-queue-1-C", queueSplitter.convertToSubQueue("my-queue-1", request)); + async.complete(); + }); + } + + @Test + public void testConvertToSubQueueWithPostfixFromHeader(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6")); + context.assertEquals("my-queue-2+A1B2C3D4E5F6", queueSplitter.convertToSubQueue("my-queue-2", request)); + async.complete(); + }); + } + @Test + public void testConvertToSubQueueWithPostfixFromUrl(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + when(request.uri()).thenReturn("/path1/path2/path3/path4/"); + context.assertEquals("my-queue-a_path2", queueSplitter.convertToSubQueue("my-queue-a", request)); + async.complete(); + }); + } + + @Test + public void testConvertToSubQueueWithNoPostfixConfigured(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap()); + context.assertEquals("another-queue", queueSplitter.convertToSubQueue("another-queue", request)); + async.complete(); + }); + } +}