Skip to content

Commit

Permalink
#550 implemented QueueSplitterImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
zorba71 committed Jan 25, 2024
1 parent c385935 commit 12b48f1
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromList;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;

/**
* {@inheritDoc}
Expand All @@ -24,6 +26,8 @@ public class QueueSplitterImpl extends ConfigurationResourceConsumer implements

private boolean initialized = false;

private List<QueueSplitExecutor> queueSplitExecutors = new ArrayList<>();

public QueueSplitterImpl(
ConfigurationResourceManager configurationResourceManager,
String configResourceUri
Expand All @@ -40,11 +44,16 @@ public QueueSplitterImpl(
this.properties = properties;
}

public List<QueueSplitExecutor> getQueueSplitExecutors() {
return queueSplitExecutors;
}

public Future<Void> initialize() {
Promise<Void> promise = Promise.promise();
configurationResourceManager().getRegisteredResource(configResourceUri()).onComplete((event -> {
if (event.succeeded() && event.result().isPresent()) {
initializeQueueSplitterConfiguration(event.result().get()).onComplete((event1 -> promise.complete()));
initializeQueueSplitterConfiguration(event.result().get());
promise.complete();
} else {
log.warn("No queue splitter configuration resource with uri '{}' found. Unable to setup kafka configuration correctly", configResourceUri());
promise.complete();
Expand All @@ -57,21 +66,27 @@ public boolean isInitialized() {
return initialized;
}

private Future<Void> initializeQueueSplitterConfiguration(Buffer configuration) {
private void initializeQueueSplitterConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<QueueSplitterConfiguration> kafkaConfigurations = QueueSplitterConfigurationParser.parse(configuration, properties);

// TODO: release all splitters and creates new ones

return promise.future();
final List<QueueSplitterConfiguration> configurations = QueueSplitterConfigurationParser.parse(configuration, properties);
queueSplitExecutors.clear();
queueSplitExecutors = configurations.stream().map(queueSplitterConfiguration -> {
if (queueSplitterConfiguration.isSplitStatic()) {
return new QueueSplitExecutorFromList(queueSplitterConfiguration);
} else {
return new QueueSplitExecutorFromRequest(queueSplitterConfiguration);
}
}).collect(Collectors.toList());
initialized = true;
}

/**
* {@inheritDoc}
*/
@Override
public String convertToSubQueue(final String queue, HttpServerRequest request) {
return queue + "-1";
Optional<QueueSplitExecutor> executor = queueSplitExecutors.stream().filter(splitExecutor -> splitExecutor.matches(queue)).findFirst();
return executor.isPresent() ? executor.get().executeSplit(queue, request) : queue;
}

@Override
Expand All @@ -86,9 +101,7 @@ public void resourceChanged(String resourceUri, Buffer resource) {
public void resourceRemoved(String resourceUri) {
if (configResourceUri() != null && configResourceUri().equals(resourceUri)) {
log.info("Queue splitter configuration resource {} was removed. Going to close all kafka producers", resourceUri);

// TODO: release all splitters and creates new ones

queueSplitExecutors.clear();
initialized = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class QueueSplitExecutorFromList extends QueueSplitExecutorBase {

AtomicInteger atomicInteger = new AtomicInteger(0);

protected QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) {
public QueueSplitExecutorFromList(QueueSplitterConfiguration configuration) {
super(configuration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.regex.Pattern;

public class QueueSplitExecutorFromRequest extends QueueSplitExecutorBase {
protected QueueSplitExecutorFromRequest(QueueSplitterConfiguration configuration) {
public QueueSplitExecutorFromRequest(QueueSplitterConfiguration configuration) {
super(configuration);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package org.swisspush.gateleen.queue.queuing.splitter;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.core.util.ResourcesUtils;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutor;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromList;
import org.swisspush.gateleen.queue.queuing.splitter.executors.QueueSplitExecutorFromRequest;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.swisspush.gateleen.core.configuration.ConfigurationResourceManager.CONFIG_RESOURCE_CHANGED_ADDRESS;

@RunWith(VertxUnitRunner.class)
public class QueueSplitterImplTest {

private Vertx vertx;
private MockResourceStorage storage;
private final String configResourceUri = "/queueSplitters";
private ConfigurationResourceManager configurationResourceManager;
private QueueSplitterImpl queueSplitter;

private final String CONFIG_RESOURCE_VALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_valid", true);
private final String CONFIG_RESOURCE_INVALID = ResourcesUtils.loadResource("testresource_queuesplitter_configuration_invalid", true);

@Before
public void setUp() {
vertx = Vertx.vertx();
storage = new MockResourceStorage();
configurationResourceManager = new ConfigurationResourceManager(vertx, storage);
queueSplitter = new QueueSplitterImpl(configurationResourceManager, configResourceUri);
}

@Test
public void initWithMissingConfigResource(TestContext context) {
Async async = context.async();
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
context.assertFalse(queueSplitter.isInitialized());
context.assertTrue(queueSplitter.getQueueSplitExecutors().isEmpty());
async.complete();
});
}

@Test
public void initWithExistingConfigResource(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
context.assertEquals(3, queueSplitter.getQueueSplitExecutors().size());

QueueSplitExecutor executor_1 = queueSplitter.getQueueSplitExecutors().get(0);
context.assertTrue(executor_1 instanceof QueueSplitExecutorFromList);

QueueSplitExecutor executor_2 = queueSplitter.getQueueSplitExecutors().get(1);
context.assertTrue(executor_2 instanceof QueueSplitExecutorFromRequest);

QueueSplitExecutor executor_3 = queueSplitter.getQueueSplitExecutors().get(2);
context.assertTrue(executor_3 instanceof QueueSplitExecutorFromRequest);

async.complete();
});
}

@Test
public void initWithExistingInvalidConfigResource(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_INVALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
context.assertEquals(1, queueSplitter.getQueueSplitExecutors().size());

QueueSplitExecutor executor_1 = queueSplitter.getQueueSplitExecutors().get(0);
context.assertTrue(executor_1 instanceof QueueSplitExecutorFromList);

async.complete();
});
}

@Test
@Ignore("verify with timeout and await don't work, review with Marc")
public void configResourceRemovedTriggerRemoveAllExecutors(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
context.assertEquals(3, queueSplitter.getQueueSplitExecutors().size());
JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "remove");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
// try { Thread.sleep(2000);} catch (InterruptedException e) {}
// verify(storage, timeout(100).times(1)).delete(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any());
// await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0));
context.assertEquals(0, queueSplitter.getQueueSplitExecutors().size());
async.complete();
});
}

@Test
@Ignore("verify with timeout and await don't work, review with Marc")
public void configResourceChangedTriggerNewInitOfExecutors(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
context.assertTrue(queueSplitter.isInitialized());
context.assertEquals(3, queueSplitter.getQueueSplitExecutors().size());

JsonObject object = new JsonObject();
object.put("requestUri", configResourceUri);
object.put("type", "change");
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
// try { Thread.sleep(2000);} catch (InterruptedException e) {}
// verify(storage, timeout(100).times(1)).delete(eq(CONFIG_RESOURCE_CHANGED_ADDRESS), any());
// await().atMost(TWO_SECONDS).until(() -> queueSplitter.getQueueSplitExecutors(), equalTo(0));
context.assertEquals(0, queueSplitter.getQueueSplitExecutors().size());
async.complete();
});
}

@Test
public void testConvertToSubQueueWithPostfixFromStatic(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
HttpServerRequest request = mock(HttpServerRequest.class);
when(request.headers()).thenReturn(new HeadersMultiMap());
context.assertEquals("my-queue-1-A", queueSplitter.convertToSubQueue("my-queue-1", request));
context.assertEquals("my-queue-1-B", queueSplitter.convertToSubQueue("my-queue-1", request));
context.assertEquals("my-queue-1-C", queueSplitter.convertToSubQueue("my-queue-1", request));
async.complete();
});
}

@Test
public void testConvertToSubQueueWithPostfixFromHeader(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
HttpServerRequest request = mock(HttpServerRequest.class);
when(request.headers()).thenReturn(new HeadersMultiMap().add("x-rp-deviceid", "A1B2C3D4E5F6"));
context.assertEquals("my-queue-2+A1B2C3D4E5F6", queueSplitter.convertToSubQueue("my-queue-2", request));
async.complete();
});
}
@Test
public void testConvertToSubQueueWithPostfixFromUrl(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
HttpServerRequest request = mock(HttpServerRequest.class);
when(request.headers()).thenReturn(new HeadersMultiMap());
when(request.uri()).thenReturn("/path1/path2/path3/path4/");
context.assertEquals("my-queue-a_path2", queueSplitter.convertToSubQueue("my-queue-a", request));
async.complete();
});
}

@Test
public void testConvertToSubQueueWithNoPostfixConfigured(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE_VALID);
context.assertFalse(queueSplitter.isInitialized());
queueSplitter.initialize().onComplete(event -> {
HttpServerRequest request = mock(HttpServerRequest.class);
when(request.headers()).thenReturn(new HeadersMultiMap());
context.assertEquals("another-queue", queueSplitter.convertToSubQueue("another-queue", request));
async.complete();
});
}
}

0 comments on commit 12b48f1

Please sign in to comment.