Skip to content

Commit

Permalink
#550 commited some tests to review
Browse files Browse the repository at this point in the history
  • Loading branch information
zorba71 committed Jan 30, 2024
1 parent 1446d05 commit 9ab5c6f
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public boolean isInitialized() {
}

private void initializeQueueSplitterConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<QueueSplitterConfiguration> configurations = QueueSplitterConfigurationParser.parse(configuration, properties);
queueSplitExecutors.clear();
queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> {
Expand All @@ -89,6 +88,7 @@ public String convertToSubQueue(final String queue, HttpServerRequest request) {
public void resourceChanged(String resourceUri, Buffer resource) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was updated. Going to initialize with new configuration", resourceUri);
System.out.println("Queue splitter configuration resource changed");
initializeQueueSplitterConfiguration(resource);
}
}
Expand All @@ -97,6 +97,7 @@ public void resourceChanged(String resourceUri, Buffer resource) {
public void resourceRemoved(String resourceUri) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri);
System.out.println("Queue splitter configuration resource removed");
queueSplitExecutors.clear();
initialized = false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.json.JsonObject;
Expand All @@ -12,9 +15,15 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceObserver;
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.core.util.ResourcesUtils;

import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.*;
import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS;

Expand Down Expand Up @@ -94,54 +103,170 @@ public void splitWithQueueNotMatchingAnyConfiguration(TestContext context) {
}

@Test
@Ignore("verify with timeout and await don't work, review with Marc")
public void configResourceRemovedTriggerRemoveAllExecutorsOld(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderExecuted(context);
verifySplitWithUrlExecuted(context);

configurationResourceManager.registerObserver(new ConfigurationResourceObserver() {

@Override
public void resourceChanged(String resourceUri, Buffer resource) {
}

@Override
public void resourceRemoved(String resourceUri) {
context.assertFalse(queueSplitter.isInitialized());
verifySplitStaticNotExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
async.complete();
}
}, configResourceUri);

JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "remove");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
});
}

@Test
public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderExecuted(context);
verifySplitWithUrlExecuted(context);

vertx.eventBus().consumer(CONFIG_RESOURCE_CHANGED_ADDRESS, (Handler<Message<JsonObject>>) message -> {
context.assertEquals("remove", message.body().getString("type"));
context.assertFalse(queueSplitter.isInitialized());
verifySplitStaticNotExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
async.complete();
});
JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "remove");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
try { Thread.sleep(10000);} catch (InterruptedException e) {}
// verify(storage, timeout(100).times(1)).delete(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any());
// await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0));
verifySplitStaticNotExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
async.complete();
});
}

@Test
@Ignore("verify with timeout and await don't work, review with Marc")
public void configResourceChangedTriggerNewInitOfExecutors(TestContext context) {
public void configResourceChangedTriggerNewInitOfExecutorsOld(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());

queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderExecuted(context);
verifySplitWithUrlExecuted(context);

configurationResourceManager.registerObserver(new ConfigurationResourceObserver() {

private int resourceChangedCalls = 0;

@Override
public void resourceChanged(String resourceUri, Buffer resource) {
resourceChangedCalls++;
if (resourceChangedCalls == 2) {
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
async.complete();
}
}

@Override
public void resourceRemoved(String resourceUri) {
}
}, configResourceUri);

storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID);
JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "change");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
// try { Thread.sleep(2000);} catch (InterruptedException e) {}
// verify(storage, timeout(100).times(1)).put(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any(), any());
verify(storage, times(1)).get(anyString(), any());
// await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0));
});
}

@Test
@Ignore("to review")
public void configResourceChangedTriggerNewInitOfExecutors1(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);

queueSplitter.initialize().onComplete(event -> {

context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
async.complete();
verifySplitWithHeaderExecuted(context);
verifySplitWithUrlExecuted(context);

storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID);
JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "change");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);

await().atMost(10, SECONDS).until( () -> {
HttpServerRequest request = mock(HttpServerRequest.class);
when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6"));
String value = queueSplitter.convertToSubQueue("my-queue-2", request);
System.out.println("await called");
return "my-queue-2".equals(value);
}, equalTo(Boolean.TRUE));

context.assertTrue(queueSplitter.isInitialized());
// verifySplitStaticExecuted(context);
// verifySplitWithHeaderNotExecuted(context);
// verifySplitWithUrlNotExecuted(context);
// async.complete();
});
}

@Test
@Ignore("to review")
public void configResourceChangedTriggerNewInitOfExecutors2(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);

AtomicInteger resourceChangedCalls = new AtomicInteger(0);
vertx.eventBus().consumer(CONFIG_RESOURCE_CHANGED_ADDRESS, (Handler<Message<JsonObject>>) message -> {

int counter = resourceChangedCalls.incrementAndGet();
if (counter == 1) {
context.assertEquals("change", message.body().getString("type"));
context.assertFalse(queueSplitter.isInitialized());
verifySplitStaticNotExecuted(context);
verifySplitWithHeaderNotExecuted(context);
verifySplitWithUrlNotExecuted(context);
async.complete();
} else if (counter == 2) {
context.assertEquals("change", message.body().getString("type"));
context.assertTrue(queueSplitter.isInitialized());
verifySplitStaticExecuted(context);
verifySplitWithHeaderExecuted(context);
verifySplitWithUrlExecuted(context);
async.complete();
}
});

storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID);
JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "change");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
}

private void verifySplitStaticExecuted(TestContext context) {
Expand Down

0 comments on commit 9ab5c6f

Please sign in to comment.