Skip to content

Commit

Permalink
Merge pull request #530 from swisspost/feature/appender_repository
Browse files Browse the repository at this point in the history
#529 EventBusAppender instance pollution in Java Heap Space
  • Loading branch information
mcweba authored Nov 22, 2023
2 parents 289da09 + 68ad87b commit b4d447b
Show file tree
Hide file tree
Showing 21 changed files with 308 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.swisspush.gateleen.core.util.*;
import org.swisspush.gateleen.hook.queueingstrategy.*;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationManager;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
Expand Down Expand Up @@ -107,6 +108,7 @@ public class HookHandler implements LoggableResource {
private final ResourceStorage hookStorage;
private final MonitoringHandler monitoringHandler;
private final LoggingResourceManager loggingResourceManager;
private final LogAppenderRepository logAppenderRepository;
private final HttpClient selfClient;
private final String userProfilePath;
private final String hookRootUri;
Expand Down Expand Up @@ -137,9 +139,9 @@ public class HookHandler implements LoggableResource {
* @param hookRootUri hookRootUri
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,

Check warning on line 144 in gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java#L144

Added line #L144 was not covered by tests
new QueueClient(vertx, monitoringHandler));
}

Expand All @@ -157,16 +159,16 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
* @param requestQueue requestQueue
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,

Check warning on line 164 in gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java#L164

Added line #L164 was not covered by tests
requestQueue, false);
}

public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,

Check warning on line 171 in gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-hook/src/main/java/org/swisspush/gateleen/hook/HookHandler.java#L171

Added line #L171 was not covered by tests
requestQueue, false, null);
}

Expand All @@ -185,18 +187,18 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
* @param reducedPropagationManager reducedPropagationManager
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, null, storage);
}

public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage storage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, Handler doneHandler, ResourceStorage hookStorage) {
this(vertx, selfClient, storage, loggingResourceManager, monitoringHandler, userProfilePath, hookRootUri,
this(vertx, selfClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler, userProfilePath, hookRootUri,
requestQueue, listableRoutes, reducedPropagationManager, doneHandler, hookStorage, Router.DEFAULT_ROUTER_MULTIPLIER);
}

Expand All @@ -220,7 +222,7 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
* the number of {@link Router} instances within a cluster
*/
public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage userProfileStorage,
LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler,
LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, MonitoringHandler monitoringHandler,
String userProfilePath, String hookRootUri, RequestQueue requestQueue, boolean listableRoutes,
ReducedPropagationManager reducedPropagationManager, Handler doneHandler, ResourceStorage hookStorage,
int routeMultiplier) {
Expand All @@ -229,6 +231,7 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage use
this.selfClient = selfClient;
this.userProfileStorage = userProfileStorage;
this.loggingResourceManager = loggingResourceManager;
this.logAppenderRepository = logAppenderRepository;
this.monitoringHandler = monitoringHandler;
this.userProfilePath = userProfilePath;
this.hookRootUri = hookRootUri;
Expand Down Expand Up @@ -1618,7 +1621,8 @@ private boolean headersFilterPatternEquals(Pattern headersFilterPatternLeft, Pat
* @return Route
*/
private Route createRoute(String urlPattern, HttpHook hook) {
return new Route(vertx, userProfileStorage, loggingResourceManager, monitoringHandler, userProfilePath, hook, urlPattern, selfClient);
return new Route(vertx, userProfileStorage, loggingResourceManager, logAppenderRepository, monitoringHandler,
userProfilePath, hook, urlPattern, selfClient);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.storage.ResourceStorage;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.routing.Forwarder;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class Route {

private Vertx vertx;
private LoggingResourceManager loggingResourceManager;
private LogAppenderRepository logAppenderRepository;
private MonitoringHandler monitoringHandler;
private String userProfilePath;
private ResourceStorage storage;
Expand Down Expand Up @@ -76,10 +78,12 @@ public class Route {
* @param httpHook httpHook
* @param urlPattern - this can be a listener or a normal urlPattern (eg. for a route)
*/
public Route(Vertx vertx, ResourceStorage storage, LoggingResourceManager loggingResourceManager, MonitoringHandler monitoringHandler, String userProfilePath, HttpHook httpHook, String urlPattern, HttpClient selfClient) {
public Route(Vertx vertx, ResourceStorage storage, LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository,
MonitoringHandler monitoringHandler, String userProfilePath, HttpHook httpHook, String urlPattern, HttpClient selfClient) {
this.vertx = vertx;
this.storage = storage;
this.loggingResourceManager = loggingResourceManager;
this.logAppenderRepository = logAppenderRepository;
this.monitoringHandler = monitoringHandler;
this.userProfilePath = userProfilePath;
this.httpHook = httpHook;
Expand All @@ -97,7 +101,8 @@ public Route(Vertx vertx, ResourceStorage storage, LoggingResourceManager loggin
* Creates the forwarder for this hook.
*/
private void createForwarder() {
forwarder = new Forwarder(vertx, client, rule, storage, loggingResourceManager, monitoringHandler, userProfilePath, null);
forwarder = new Forwarder(vertx, client, rule, storage, loggingResourceManager, logAppenderRepository,
monitoringHandler, userProfilePath, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.swisspush.gateleen.core.http.*;
import org.swisspush.gateleen.core.storage.MockResourceStorage;
import org.swisspush.gateleen.hook.reducedpropagation.ReducedPropagationManager;
import org.swisspush.gateleen.logging.LogAppenderRepository;
import org.swisspush.gateleen.logging.LoggingResourceManager;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class HookHandlerTest {
private HttpClient httpClient;
private MockResourceStorage storage;
private LoggingResourceManager loggingResourceManager;
private LogAppenderRepository logAppenderRepository;
private MonitoringHandler monitoringHandler;
private RequestQueue requestQueue;
private ReducedPropagationManager reducedPropagationManager;
Expand All @@ -66,12 +68,13 @@ public void setUp() {
Mockito.when(httpClient.request(any(HttpMethod.class), anyString())).thenReturn(Mockito.mock(Future.class));
storage = new MockResourceStorage();
loggingResourceManager = Mockito.mock(LoggingResourceManager.class);
logAppenderRepository = Mockito.mock(LogAppenderRepository.class);
monitoringHandler = Mockito.mock(MonitoringHandler.class);
requestQueue = Mockito.mock(RequestQueue.class);
reducedPropagationManager = Mockito.mock(ReducedPropagationManager.class);


hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, monitoringHandler,
hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler,
"userProfilePath", HOOK_ROOT_URI, requestQueue, false, reducedPropagationManager);
hookHandler.init();
}
Expand Down Expand Up @@ -218,7 +221,7 @@ public boolean matches(Object argument) {

@Test
public void testListenerEnqueueWithReducedPropagationQueueingStrategyButNoManager(TestContext context) throws InterruptedException {
hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, monitoringHandler,
hookHandler = new HookHandler(vertx, httpClient, storage, loggingResourceManager, logAppenderRepository, monitoringHandler,
"userProfilePath", HOOK_ROOT_URI, requestQueue, false, null);
hookHandler.init();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.swisspush.gateleen.logging;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import org.apache.logging.log4j.core.Appender;

import java.util.HashMap;
import java.util.Map;

import static org.swisspush.gateleen.logging.LoggingResourceManager.UPDATE_ADDRESS;

/**
* Default implementation of the {@link LogAppenderRepository} caching the {@link Appender} instances in a {@link Map}
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class DefaultLogAppenderRepository implements LogAppenderRepository {

private Map<String, Appender> appenderMap = new HashMap<>();

public DefaultLogAppenderRepository(Vertx vertx) {
vertx.eventBus().consumer(UPDATE_ADDRESS, (Handler<Message<Boolean>>) event -> clearRepository());
}

@Override
public boolean hasAppender(String name) {
return appenderMap.containsKey(name);
}

@Override
public void addAppender(String name, Appender appender) {
appenderMap.put(name, appender);
}

@Override
public Appender getAppender(String name) {
return appenderMap.get(name);
}

@Override
public void clearRepository() {
appenderMap.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.swisspush.gateleen.logging;

import org.apache.logging.log4j.core.Appender;

/**
* A repository holding {@link Appender} instances. The repository allows to reuse an appender
* instead of creating a new one for every log statement
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface LogAppenderRepository {

boolean hasAppender(String name);

void addAppender(String name, Appender appender);

Appender getAppender(String name);

void clearRepository();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class LoggingHandler {
private Buffer responsePayload;
private LoggingResource loggingResource;
private EventBus eventBus;
private LogAppenderRepository logAppenderRepository;

private String currentDestination;

Expand All @@ -64,11 +65,11 @@ public class LoggingHandler {
private static final String DEFAULT = "default";

private Map<String, org.apache.logging.log4j.Logger> loggers = new HashMap<>();
private Map<String, Appender> appenders = new HashMap<>();

private Logger log;

public LoggingHandler(LoggingResourceManager loggingResourceManager, HttpServerRequest request, EventBus eventBus) {
public LoggingHandler(LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository, HttpServerRequest request, EventBus eventBus) {
this.logAppenderRepository = logAppenderRepository;
this.request = request;
this.eventBus = eventBus;
this.loggingResource = loggingResourceManager.getLoggingResource();
Expand Down Expand Up @@ -187,7 +188,7 @@ private String createLoggerAndGetDestination(Map<String, String> payloadFilter)
* @return
*/
private Appender getEventBusAppender(String filterDestination, Map<String, String> destinationOptions) {
if (!appenders.containsKey(filterDestination)) {
if (!logAppenderRepository.hasAppender(filterDestination)) {

/*
* <appender name="requestLogEventBusAppender" class="EventBusAppender">
Expand All @@ -204,9 +205,9 @@ private Appender getEventBusAppender(String filterDestination, Map<String, Strin
.add(META_DATA, destinationOptions.get(META_DATA)))
.setTransmissionMode(EventBusWriter.TransmissionMode.fromString(destinationOptions.get(TRANSMISSION)))
.setLayout(PatternLayout.createDefaultLayout()).build();
appenders.put(filterDestination, appender);
logAppenderRepository.addAppender(filterDestination, appender);
}
return appenders.get(filterDestination);
return logAppenderRepository.getAppender(filterDestination);
}

/**
Expand All @@ -220,7 +221,7 @@ private Appender getEventBusAppender(String filterDestination, Map<String, Strin
* @return
*/
private Appender getFileAppender(String filterDestination, String fileName) {
if (!appenders.containsKey(filterDestination)) {
if (!logAppenderRepository.hasAppender(filterDestination)) {

/*
* <appender name="requestLogFileAppender" class="org.apache.log4j.DailyRollingFileAppender">
Expand All @@ -242,10 +243,10 @@ private Appender getFileAppender(String filterDestination, String fileName) {
builder.withAppend(true);
PatternLayout layout = PatternLayout.createDefaultLayout();
builder.setLayout(layout);
appenders.put(filterDestination, builder.build());
logAppenderRepository.addAppender(filterDestination, builder.build());

Check warning on line 246 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java#L246

Added line #L246 was not covered by tests
}

return appenders.get(filterDestination);
return logAppenderRepository.getAppender(filterDestination);

Check warning on line 249 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/LoggingHandler.java#L249

Added line #L249 was not covered by tests
}

public void setResponse(HttpClientResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public class LoggingResourceManager implements LoggableResource {

private static final String UPDATE_ADDRESS = "gateleen.logging-updated";
static final String UPDATE_ADDRESS = "gateleen.logging-updated";

private final String loggingUri;
private final ResourceStorage storage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
public class RequestLoggingConsumer {
private final Vertx vertx;
private final LoggingResourceManager loggingResourceManager;
private final LogAppenderRepository logAppenderRepository;

public RequestLoggingConsumer(Vertx vertx, LoggingResourceManager loggingResourceManager) {
public RequestLoggingConsumer(Vertx vertx, LoggingResourceManager loggingResourceManager, LogAppenderRepository logAppenderRepository) {

Check warning on line 29 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java#L29

Added line #L29 was not covered by tests
this.vertx = vertx;
this.loggingResourceManager = loggingResourceManager;
this.logAppenderRepository = logAppenderRepository;

Check warning on line 32 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java#L32

Added line #L32 was not covered by tests

vertx.eventBus().localConsumer(Address.requestLoggingConsumerAddress(), (Handler<Message<JsonObject>>) event -> {
try {
Expand Down Expand Up @@ -66,7 +68,7 @@ public RequestLoggingConsumer(Vertx vertx, LoggingResourceManager loggingResourc
* @param responseHeaders the response headers
*/
private void logRequest(final HttpServerRequest request, final int status, Buffer data, final MultiMap responseHeaders) {
final LoggingHandler loggingHandler = new LoggingHandler(loggingResourceManager, request, vertx.eventBus());
final LoggingHandler loggingHandler = new LoggingHandler(loggingResourceManager, logAppenderRepository, request, vertx.eventBus());

Check warning on line 71 in gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java

View check run for this annotation

Codecov / codecov/patch

gateleen-logging/src/main/java/org/swisspush/gateleen/logging/RequestLoggingConsumer.java#L71

Added line #L71 was not covered by tests
if (HttpMethod.PUT == request.method() || HttpMethod.POST == request.method()) {
loggingHandler.appendRequestPayload(data);
} else if (HttpMethod.GET == request.method()) {
Expand Down
Loading

0 comments on commit b4d447b

Please sign in to comment.