Skip to content

Commit

Permalink
#550 QueueSplitter for HookHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
guentherm committed Apr 6, 2024
1 parent d944a59 commit d890d5e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.*;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
Expand All @@ -26,8 +31,17 @@
import org.swisspush.gateleen.core.logging.LoggableResource;
import org.swisspush.gateleen.core.logging.RequestLogger;
import org.swisspush.gateleen.core.storage.ResourceStorage;
import org.swisspush.gateleen.core.util.*;
import org.swisspush.gateleen.hook.queueingstrategy.*;
import org.swisspush.gateleen.core.util.CollectionContentComparator;
import org.swisspush.gateleen.core.util.HttpHeaderUtil;
import org.swisspush.gateleen.core.util.HttpRequestHeader;
import org.swisspush.gateleen.core.util.HttpServerRequestUtil;
import org.swisspush.gateleen.core.util.ResourcesUtils;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.hook.queueingstrategy.DefaultQueueingStrategy;
import org.swisspush.gateleen.hook.queueingstrategy.DiscardPayloadQueueingStrategy;
import org.swisspush.gateleen.hook.queueingstrategy.QueueingStrategy;
import org.swisspush.gateleen.hook.queueingstrategy.QueueingStrategyFactory;
import org.swisspush.gateleen.hook.queueingstrategy.ReducedPropagationQueueingStrategy;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationManager;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
Expand All @@ -36,13 +50,24 @@
import org.swisspush.gateleen.queue.queuing.QueueClient;
import org.swisspush.gateleen.queue.queuing.QueueProcessor;
import org.swisspush.gateleen.queue.queuing.RequestQueue;
import org.swisspush.gateleen.queue.queuing.splitter.NoOpQueueSplitter;
import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter;
import org.swisspush.gateleen.routing.Router;
import org.swisspush.gateleen.routing.Rule;
import org.swisspush.gateleen.routing.RuleFactory;
import org.swisspush.gateleen.validation.RegexpValidator;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.*;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -126,6 +151,8 @@ public class HookHandler implements LoggableResource {
private final JsonSchema jsonSchemaHook;
private int routeMultiplier;

private final QueueSplitter queueSplitter;


/**
* Creates a new HookHandler.
Expand Down Expand Up @@ -189,19 +216,28 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager) {
@Nullable ReducedPropagationManager reducedPropagationManager) {
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, null, storage);
}

public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage userProfileStorage,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, Handler doneHandler, ResourceStorage hookStorage) {
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
ReducedPropagationManager reducedPropagationManager, @Nullable Handler doneHandler, ResourceStorage hookStorage) {
this(vertx, selfClient, userProfileStorage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, doneHandler, hookStorage, Router.DEFAULT_ROUTER_MULTIPLIER);
}

public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage userProfileStorage,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, @Nullable Handler doneHandler, ResourceStorage hookStorage,
int routeMultiplier) {
this(vertx, selfClient, userProfileStorage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, doneHandler, hookStorage, routeMultiplier, new NoOpQueueSplitter());
}

/**
* Creates a new HookHandler.
*
Expand All @@ -224,8 +260,8 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage userProfileStorage,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, Handler doneHandler, ResourceStorage hookStorage,
int routeMultiplier) {
ReducedPropagationManager reducedPropagationManager, @Nullable Handler doneHandler, ResourceStorage hookStorage,
int routeMultiplier, @Nonnull QueueSplitter queueSplitter) {
log.debug("Creating HookHandler ...");
this.vertx = vertx;
this.selfClient = selfClient;
Expand All @@ -244,7 +280,7 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage use
this.doneHandler = doneHandler;
this.hookStorage = hookStorage;
this.routeMultiplier = routeMultiplier;

this.queueSplitter = queueSplitter;
String hookSchema = ResourcesUtils.loadResource("gateleen_hooking_schema_hook", true);
jsonSchemaHook = JsonSchemaFactory.getInstance().getSchema(hookSchema);
}
Expand Down Expand Up @@ -605,11 +641,11 @@ private boolean createListingIfRequested(final HttpServerRequest request) {

request.response().headers().remove(HOOK_ROUTES_LISTED);

// if everything is fine, we add the listed collections to the given array
if (response.statusCode() == StatusCode.OK.getStatusCode()) {
if (log.isTraceEnabled()) {
log.trace("createListingIfRequested > use existing array");
}
// if everything is fine, we add the listed collections to the given array
if (response.statusCode() == StatusCode.OK.getStatusCode()) {
if (log.isTraceEnabled()) {
log.trace("createListingIfRequested > use existing array");
}

response.handler(data -> {
JsonObject responseObject = new JsonObject(data.toString());
Expand Down Expand Up @@ -731,7 +767,7 @@ private void installBodyHandler(final RoutingContext ctx, final List<Listener> l
/**
* Calls the passed listeners and passes the given handler to the enqueued listener requests.
*
* @param ctx original request context
* @param ctx original request context
* @param buffer buffer
* @param filteredListeners all listeners which should be called
* @param handler the handler, which should handle the requests
Expand Down Expand Up @@ -783,11 +819,13 @@ private void callListener(RoutingContext ctx, final Buffer buffer, final List<Li
// if there is an x-queue header (after applying the header manipulator chain!),
// then directly enqueue to this queue - else enqueue to a queue named alike this listener hook
String queue = queueHeaders.get(X_QUEUE);

if (queue == null) {
queue = LISTENER_QUEUE_PREFIX + "-" + listener.getListenerId(); // default queue name for this listener hook
} else {
queueHeaders.remove(X_QUEUE); // remove the "x-queue" header - otherwise we take a second turn through the queue
}
queue = queueSplitter.convertToSubQueue(queue, request);

QueueingStrategy queueingStrategy = listener.getHook().getQueueingStrategy();

Expand Down Expand Up @@ -827,7 +865,7 @@ private void callListener(RoutingContext ctx, final Buffer buffer, final List<Li
* The handler calls all listener (after), so this requests happen AFTER the original
* request is performed.
*
* @param ctx original request context
* @param ctx original request context
* @param buffer buffer
* @param afterListener list of listeners which should be called after the original request
* @return the after handler
Expand All @@ -842,7 +880,7 @@ private Handler<Void> installAfterHandler(final RoutingContext ctx, final Buffer
* The request happens BEFORE the original request is
* performed.
*
* @param ctx original request context
* @param ctx original request context
* @param buffer buffer
* @param beforeListener list of listeners which should be called before the original request
* @param afterHandler the handler for listeners which have to be called after the original request
Expand Down Expand Up @@ -1586,17 +1624,17 @@ private void registerRoute(Buffer buffer) {
private boolean mustCreateNewRouteForHook(Route existingRoute, HttpHook newHook) {
HttpHook oldHook = existingRoute.getHook();
boolean same;
same = Objects.equals(oldHook.getDestination() , newHook.getDestination ());
same &= Objects.equals(oldHook.getMethods (), newHook.getMethods ());
same &= Objects.equals(oldHook.getTranslateStatus (), newHook.getTranslateStatus ());
same &= oldHook.isCollection () == newHook.isCollection () ;
same &= oldHook.isFullUrl () == newHook.isFullUrl () ;
same &= oldHook.isListable () == newHook.isListable () ;
same &= oldHook.isCollection () == newHook.isCollection () ;
same &= oldHook.isCollection () == newHook.isCollection () ;
same &= Objects.equals(oldHook.getConnectionPoolSize() , newHook.getConnectionPoolSize());
same &= Objects.equals(oldHook.getMaxWaitQueueSize() , newHook.getMaxWaitQueueSize ());
same &= Objects.equals(oldHook.getTimeout(), newHook.getTimeout ());
same = Objects.equals(oldHook.getDestination(), newHook.getDestination());
same &= Objects.equals(oldHook.getMethods(), newHook.getMethods());
same &= Objects.equals(oldHook.getTranslateStatus(), newHook.getTranslateStatus());
same &= oldHook.isCollection() == newHook.isCollection();
same &= oldHook.isFullUrl() == newHook.isFullUrl();
same &= oldHook.isListable() == newHook.isListable();
same &= oldHook.isCollection() == newHook.isCollection();
same &= oldHook.isCollection() == newHook.isCollection();
same &= Objects.equals(oldHook.getConnectionPoolSize(), newHook.getConnectionPoolSize());
same &= Objects.equals(oldHook.getMaxWaitQueueSize(), newHook.getMaxWaitQueueSize());
same &= Objects.equals(oldHook.getTimeout(), newHook.getTimeout());
same &= headersFilterPatternEquals(oldHook.getHeadersFilterPattern(), newHook.getHeadersFilterPattern());

// queueingStrategy, filter, queueExpireAfter and hookTriggerType are not relevant for Route-Hooks
Expand All @@ -1606,7 +1644,7 @@ private boolean mustCreateNewRouteForHook(Route existingRoute, HttpHook newHook)
}

private boolean headersFilterPatternEquals(Pattern headersFilterPatternLeft, Pattern headersFilterPatternRight) {
if(headersFilterPatternLeft != null && headersFilterPatternRight != null){
if (headersFilterPatternLeft != null && headersFilterPatternRight != null) {
return Objects.equals(headersFilterPatternLeft.pattern(), headersFilterPatternRight.pattern());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
import org.swisspush.gateleen.queue.queuing.RequestQueue;
import org.swisspush.gateleen.queue.queuing.splitter.NoOpQueueSplitter;
import org.swisspush.gateleen.routing.Router;

import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void start() {

hookHandler = new HookHandler(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository,
monitoringHandler,SERVER_ROOT + "/users/v1/%s/profile",
SERVER_ROOT + "/hooks/v1/", queueClient,false, reducedPropagationManager);
SERVER_ROOT + "/hooks/v1/", queueClient,false, reducedPropagationManager, null, storage, Router.DEFAULT_ROUTER_MULTIPLIER, queueSplitter);
hookHandler.enableResourceLogging(true);

authorizer = new Authorizer(vertx, storage, SERVER_ROOT + "/security/v1/", ROLE_PATTERN, ROLE_PREFIX, props);
Expand Down

0 comments on commit d890d5e

Please sign in to comment.