diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java index 34aaa139..74e88241 100644 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImpl.java @@ -63,7 +63,6 @@ public boolean isInitialized() { } private void initializeQueueSplitterConfiguration(Buffer configuration) { - Promise promise = Promise.promise(); final List configurations = QueueSplitterConfigurationParser.parse(configuration, properties); queueSplitExecutors.clear(); queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> { @@ -89,6 +88,7 @@ public String convertToSubQueue(final String queue, HttpServerRequest request) { public void resourceChanged(String resourceUri, Buffer resource) { if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { log.info("Queue splitter configuration resource {} was updated. Going to initialize with new configuration", resourceUri); + System.out.println("Queue splitter configuration resource changed"); initializeQueueSplitterConfiguration(resource); } } @@ -97,6 +97,7 @@ public void resourceChanged(String resourceUri, Buffer resource) { public void resourceRemoved(String resourceUri) { if (configResourceUri() != null && configResourceUri().equals(resourceUri)) { log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri); + System.out.println("Queue splitter configuration resource removed"); queueSplitExecutors.clear(); initialized = false; } diff --git a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java index 5e89bb6a..401ff948 100644 --- a/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java +++ b/gateleen-queue/src/test/java/org/swisspush/gateleen/queue/queuing/splitter/QueueSplitterImplTest.java @@ -1,6 +1,9 @@ package org.swisspush.gateleen.queue.queuing.splitter; +import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.Message; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.impl.headers.HeadersMultiMap; import io.vertx.core.json.JsonObject; @@ -12,9 +15,15 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceObserver; import org.swisspush.gateleen.core.storage.MockResourceStorage; import org.swisspush.gateleen.core.util.ResourcesUtils; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.*; import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS; @@ -94,54 +103,170 @@ public void splitWithQueueNotMatchingAnyConfiguration(TestContext context) { } @Test - @Ignore("verify with timeout and await don't work, review with Marc") + public void configResourceRemovedTriggerRemoveAllExecutorsOld(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + queueSplitter.initialize().onComplete(event -> { + context.assertTrue(queueSplitter.isInitialized()); + verifySplitStaticExecuted(context); + verifySplitWithHeaderExecuted(context); + verifySplitWithUrlExecuted(context); + + configurationResourceManager.registerObserver(new ConfigurationResourceObserver() { + + @Override + public void resourceChanged(String resourceUri, Buffer resource) { + } + + @Override + public void resourceRemoved(String resourceUri) { + context.assertFalse(queueSplitter.isInitialized()); + verifySplitStaticNotExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + } + }, configResourceUri); + + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "remove"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); + }); + } + + @Test public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context) { Async async = context.async(); storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); - context.assertFalse(queueSplitter.isInitialized()); queueSplitter.initialize().onComplete(event -> { context.assertTrue(queueSplitter.isInitialized()); verifySplitStaticExecuted(context); verifySplitWithHeaderExecuted(context); verifySplitWithUrlExecuted(context); + + vertx.eventBus().consumer(CONFIG_RESOURCE_CHANGED_ADDRESS, (Handler>) message -> { + context.assertEquals("remove", message.body().getString("type")); + context.assertFalse(queueSplitter.isInitialized()); + verifySplitStaticNotExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + }); JsonObject object = new JsonObject(); object.put("requestUri", configResourceUri); object.put("type", "remove"); vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); - try { Thread.sleep(10000);} catch (InterruptedException e) {} - // verify(storage, timeout(100).times(1)).delete(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any()); - // await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0)); - verifySplitStaticNotExecuted(context); - verifySplitWithHeaderNotExecuted(context); - verifySplitWithUrlNotExecuted(context); - async.complete(); }); } @Test - @Ignore("verify with timeout and await don't work, review with Marc") - public void configResourceChangedTriggerNewInitOfExecutors(TestContext context) { + public void configResourceChangedTriggerNewInitOfExecutorsOld(TestContext context) { Async async = context.async(); storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); - context.assertFalse(queueSplitter.isInitialized()); + queueSplitter.initialize().onComplete(event -> { context.assertTrue(queueSplitter.isInitialized()); verifySplitStaticExecuted(context); verifySplitWithHeaderExecuted(context); verifySplitWithUrlExecuted(context); + + configurationResourceManager.registerObserver(new ConfigurationResourceObserver() { + + private int resourceChangedCalls = 0; + + @Override + public void resourceChanged(String resourceUri, Buffer resource) { + resourceChangedCalls++; + if (resourceChangedCalls == 2) { + context.assertTrue(queueSplitter.isInitialized()); + verifySplitStaticExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + } + } + + @Override + public void resourceRemoved(String resourceUri) { + } + }, configResourceUri); + + storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID); JsonObject object = new JsonObject(); object.put("requestUri", configResourceUri); object.put("type", "change"); vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); - // try { Thread.sleep(2000);} catch (InterruptedException e) {} - // verify(storage, timeout(100).times(1)).put(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any(), any()); - verify(storage, times(1)).get(anyString(), any()); - // await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0)); + }); + } + + @Test + @Ignore("to review") + public void configResourceChangedTriggerNewInitOfExecutors1(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + + queueSplitter.initialize().onComplete(event -> { + + context.assertTrue(queueSplitter.isInitialized()); verifySplitStaticExecuted(context); - verifySplitWithHeaderNotExecuted(context); - verifySplitWithUrlNotExecuted(context); - async.complete(); + verifySplitWithHeaderExecuted(context); + verifySplitWithUrlExecuted(context); + + storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID); + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "change"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); + + await().atMost(10, SECONDS).until( () -> { + HttpServerRequest request = mock(HttpServerRequest.class); + when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6")); + String value = queueSplitter.convertToSubQueue("my-queue-2", request); + System.out.println("await called"); + return "my-queue-2".equals(value); + }, equalTo(Boolean.TRUE)); + + context.assertTrue(queueSplitter.isInitialized()); +// verifySplitStaticExecuted(context); +// verifySplitWithHeaderNotExecuted(context); +// verifySplitWithUrlNotExecuted(context); +// async.complete(); + }); + } + + @Test + @Ignore("to review") + public void configResourceChangedTriggerNewInitOfExecutors2(TestContext context) { + Async async = context.async(); + storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID); + + AtomicInteger resourceChangedCalls = new AtomicInteger(0); + vertx.eventBus().consumer(CONFIG_RESOURCE_CHANGED_ADDRESS, (Handler>) message -> { + + int counter = resourceChangedCalls.incrementAndGet(); + if (counter == 1) { + context.assertEquals("change", message.body().getString("type")); + context.assertFalse(queueSplitter.isInitialized()); + verifySplitStaticNotExecuted(context); + verifySplitWithHeaderNotExecuted(context); + verifySplitWithUrlNotExecuted(context); + async.complete(); + } else if (counter == 2) { + context.assertEquals("change", message.body().getString("type")); + context.assertTrue(queueSplitter.isInitialized()); + verifySplitStaticExecuted(context); + verifySplitWithHeaderExecuted(context); + verifySplitWithUrlExecuted(context); + async.complete(); + } }); + + storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID); + JsonObject object = new JsonObject(); + object.put("requestUri", configResourceUri); + object.put("type", "change"); + vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object); } private void verifySplitStaticExecuted(TestContext context) {