Skip to content

Commit

Permalink
Merge pull request #546 from SDCISA-13736-ApplyFindings-20240105-1528
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha authored Jan 10, 2024
2 parents 31e8e01 + 1418a77 commit acafdf6
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
*/
public class ConfigurationResourceManager implements LoggableResource {

private Logger log = LoggerFactory.getLogger(ConfigurationResourceManager.class);
private static final Logger log = LoggerFactory.getLogger(ConfigurationResourceManager.class);

private Vertx vertx;
private ResourceStorage storage;
private final Vertx vertx;
private final ResourceStorage storage;
private Map<String, String> registeredResources;
private Map<String, List<ConfigurationResourceObserver>> observers;
private ConfigurationResourceValidator configurationResourceValidator;
private final ConfigurationResourceValidator configurationResourceValidator;
private boolean logConfigurationResourceChanges = false;

public static final String CONFIG_RESOURCE_CHANGED_ADDRESS = "gateleen.configuration-resource-changed";
Expand Down Expand Up @@ -113,16 +113,20 @@ public boolean handleConfigurationResource(final HttpServerRequest request) {
if(HttpMethod.PUT == request.method()) {
requestLog.info("Refresh resource {}", resourceUri);
request.bodyHandler(buffer -> configurationResourceValidator.validateConfigurationResource(buffer, resourceSchema, event -> {
var rsp = request.response();
if (event.failed() || (event.succeeded() && !event.result().isSuccess())) {
requestLog.error("Could not parse configuration resource for uri '" + resourceUri + "' message: " + event.result().getMessage());
request.response().setStatusCode(StatusCode.BAD_REQUEST.getStatusCode());
request.response().setStatusMessage(StatusCode.BAD_REQUEST.getStatusMessage() + " " + event.result().getMessage());
var eventResult = event.result();
requestLog.error("Could not parse configuration resource for uri '{}' message: {}",
resourceUri, eventResult == null ? "<null>" : eventResult.getMessage());
rsp.setStatusCode(StatusCode.BAD_REQUEST.getStatusCode());
rsp.setStatusMessage(StatusCode.BAD_REQUEST.getStatusMessage() + " " +
(eventResult == null ? "<null>" : eventResult.getMessage()));
ResponseStatusCodeLogUtil.info(request, StatusCode.BAD_REQUEST, ConfigurationResourceManager.class);
if (event.result().getValidationDetails() != null) {
request.response().headers().add("content-type", "application/json");
request.response().end(event.result().getValidationDetails().encode());
if (eventResult != null && eventResult.getValidationDetails() != null) {
rsp.headers().add("content-type", "application/json");
rsp.end(eventResult.getValidationDetails().encode());
} else {
request.response().end(event.result().getMessage());
rsp.end(eventResult == null ? "<null>\n" : eventResult.getMessage());
}
} else {
storage.put(resourceUri, buffer, status -> {
Expand All @@ -135,10 +139,10 @@ public boolean handleConfigurationResource(final HttpServerRequest request) {
object.put("type", ConfigurationResourceChangeType.CHANGE);
vertx.eventBus().publish(CONFIG_RESOURCE_CHANGED_ADDRESS, object);
} else {
request.response().setStatusCode(status);
rsp.setStatusCode(status);
}
ResponseStatusCodeLogUtil.info(request, StatusCode.fromCode(status), ConfigurationResourceManager.class);
request.response().end();
rsp.end();
});
}
}));
Expand Down Expand Up @@ -188,10 +192,11 @@ private Future<Optional<Buffer>> getValidatedRegisteredResource(String resourceU
if (event.result().isSuccess()) {
promise.complete(Optional.of(buffer));
} else {
promise.fail("Failure during validation of resource " + resourceUri + ". Message: " + event.result().getMessage());
promise.fail(new Exception("Failure during validation of resource "
+ resourceUri + ". Message: " + event.result().getMessage()));
}
} else {
promise.fail("Failure during validation of resource " + resourceUri + ". Message: " + event.cause());
promise.fail(new Exception("ReleaseLockRedisCommand request failed", event.cause()));
}
});
} else {
Expand All @@ -213,7 +218,7 @@ private void notifyObserversAboutRemovedResource(String requestUri) {
private void notifyObserverAboutResourceChange(String requestUri, ConfigurationResourceObserver observer) {
getValidatedRegisteredResource(requestUri).onComplete(event -> {
if(event.failed()){
log.warn(event.cause().getMessage());
log.warn("TODO error handling", new Exception(event.cause()));
} else if(event.result().isPresent()){
if(observer != null) {
observer.resourceChanged(requestUri, event.result().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected void doWrite(Buffer chunk) {
} catch (Exception e) {
if (exceptionHandler != null) {
exceptionHandler.handle(e);
}else{
} else {
log.warn("TODO error handling", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
*/
public class RedisBasedLock implements Lock {

private Logger log = LoggerFactory.getLogger(RedisBasedLock.class);
private static final Logger log = LoggerFactory.getLogger(RedisBasedLock.class);

public static final String STORAGE_PREFIX = "gateleen.core-lock:";

private LuaScriptState releaseLockLuaScriptState;
private RedisProvider redisProvider;
private final LuaScriptState releaseLockLuaScriptState;
private final RedisProvider redisProvider;

public RedisBasedLock(RedisProvider redisProvider) {
this.redisProvider = redisProvider;
Expand All @@ -45,8 +45,18 @@ private void redisSetWithOptions(String key, String value, boolean nx, long px,
if (nx) {
options.add("NX");
}
redisProvider.redis().onSuccess(redisAPI -> redisAPI.send(Command.SET, RedisUtils.toPayload(key, value, options).toArray(new String[0]))
.onComplete(handler)).onFailure(throwable -> handler.handle(new FailedAsyncResult<>(throwable)));
redisProvider.redis().onComplete( redisEv -> {
if( redisEv.failed() ){
handler.handle(new FailedAsyncResult<>(redisEv.cause()));
return;
}
var redisAPI = redisEv.result();
redisAPI.send(Command.SET, RedisUtils.toPayload(key, value, options).toArray(new String[0]))
.onComplete( ev -> {
if( ev.failed() && log.isInfoEnabled() ) log.info("stacktrace", new Exception("stacktrace", ev.cause()));
handler.handle(ev);
});
});
}

@Override
Expand All @@ -60,6 +70,7 @@ public Future<Boolean> acquireLock(String lock, String token, long lockExpiryMs)
promise.complete(false);
}
} else {
if( log.isInfoEnabled() ) log.info("stacktrace", new Exception("stacktrace", event.cause()));
promise.fail(event.cause().getMessage());
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.swisspush.gateleen.core.lock.lua;

import io.vertx.core.Promise;
import io.vertx.redis.client.RedisAPI;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.lua.LuaScriptState;
import org.swisspush.gateleen.core.lua.RedisCommand;
Expand Down Expand Up @@ -39,25 +40,33 @@ public void exec(int executionCounter) {
args.addAll(keys);
args.addAll(arguments);

redisProvider.redis().onSuccess(redisAPI -> redisAPI.evalsha(args, event -> {
if (event.succeeded()) {
Long unlocked = event.result().toLong();
promise.complete(unlocked > 0);
} else {
String message = event.cause().getMessage();
if (message != null && message.startsWith("NOSCRIPT")) {
log.warn("ReleaseLockRedisCommand script couldn't be found, reload it");
log.warn("amount the script got loaded: " + executionCounter);
if (executionCounter > 10) {
promise.fail("amount the script got loaded is higher than 10, we abort");
redisProvider.redis().onComplete( redisEv -> {
if( redisEv.failed() ){
promise.fail(new Exception("redisProvider.redis()", redisEv.cause()));
return;
}
RedisAPI redisAPI = redisEv.result();
redisAPI.evalsha(args, event -> {
if (event.succeeded()) {
Long unlocked = event.result().toLong();
promise.complete(unlocked > 0);
} else {
Throwable ex = event.cause();
String message = ex.getMessage();
if (message != null && message.startsWith("NOSCRIPT")) {
log.warn("ReleaseLockRedisCommand script couldn't be found, reload it", new Exception("stacktrace",ex));
log.warn("amount the script got loaded: {}", executionCounter);
if (executionCounter > 10) {
promise.fail(new Exception("amount the script got loaded is higher than 10, we abort"));
} else {
luaScriptState.loadLuaScript(new ReleaseLockRedisCommand(luaScriptState, keys,
arguments, redisProvider, log, promise), executionCounter);
}
} else {
luaScriptState.loadLuaScript(new ReleaseLockRedisCommand(luaScriptState, keys,
arguments, redisProvider, log, promise), executionCounter);
promise.fail(new Exception("ReleaseLockRedisCommand request failed", ex));
}
} else {
promise.fail("ReleaseLockRedisCommand request failed with message: " + message);
}
}
})).onFailure(throwable -> promise.fail("Redis: ReleaseLockRedisCommand request failed with error: " + throwable.getMessage()));
});
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.swisspush.gateleen.core.lua;

import io.vertx.redis.client.RedisAPI;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,31 +87,40 @@ private String readLuaScriptFromClasspath(LuaScript luaScriptType) {
public void loadLuaScript(final RedisCommand redisCommand, int executionCounter) {
final int executionCounterIncr = ++executionCounter;
// check first if the lua script already exists in the store
redisProvider.redis().onSuccess(redisAPI -> redisAPI.script(Arrays.asList("exists", sha), resultArray -> {
if (resultArray.failed()) {
log.error("Error checking whether lua script exists", resultArray.cause());
redisProvider.redis().onComplete( redisEv -> {
if (redisEv.failed()) {
log.error("Redis: Error checking whether lua script exists",
new Exception("stacktrace", redisEv.cause()));
return;
}
Long exists = resultArray.result().get(0).toLong();
// if script already
if (Long.valueOf(1).equals(exists)) {
log.debug("RedisStorage script already exists in redis cache: {}", luaScriptType);
redisCommand.exec(executionCounterIncr);
} else {
log.info("loading lua script for script type: {} log output: {}", luaScriptType, logoutput);
redisAPI.script(Arrays.asList("load",script), stringAsyncResult -> {
String redisSha = stringAsyncResult.result().toString();
log.info("got sha from redis for lua script: {}: {}", luaScriptType, redisSha);
if (!redisSha.equals(sha)) {
RedisAPI redisAPI = redisEv.result();
redisAPI.script(Arrays.asList("exists", sha), resultArray -> {
if (resultArray.failed()) {
log.error("Error checking whether lua script exists",
new Exception("stacktrace", resultArray.cause()));
return;
}
Long exists = resultArray.result().get(0).toLong();
// if script already
if (Long.valueOf(1).equals(exists)) {
log.debug("RedisStorage script already exists in redis cache: {}", luaScriptType);
redisCommand.exec(executionCounterIncr);
} else {
log.info("loading lua script for script type: {} log output: {}", luaScriptType, logoutput);
redisAPI.script(Arrays.asList("load", script), stringAsyncResult -> {
String redisSha = stringAsyncResult.result().toString();
log.info("got sha from redis for lua script: {}: {}", luaScriptType, redisSha);
if (!redisSha.equals(sha)) {
log.warn("the sha calculated by myself: {} doesn't match with the sha from redis: {}. " +
"We use the sha from redis", sha, redisSha);
}
sha = redisSha;
log.info("execute redis command for script type: {} with new sha: {}", luaScriptType, sha);
redisCommand.exec(executionCounterIncr);
});
}
})).onFailure(throwable -> log.error("Redis: Error checking whether lua script exists", throwable));
}
sha = redisSha;
log.info("execute redis command for script type: {} with new sha: {}", luaScriptType, sha);
redisCommand.exec(executionCounterIncr);
});
}
});
});
}

public String getScript() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void releaseLock(Lock lockImpl, String lock, String token, Logger
log.debug("Released lock '{}' with token '{}'", lock, token);
}
} else {
log.error("Could not release lock '{}'. Message: {}", lock, releaseEvent.cause().getMessage());
log.error("Could not release lock '{}'.", lock, new Exception("stacktrace", releaseEvent.cause()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,6 @@ public void testReleaseLockError(TestContext context) {
Mockito.when(lock.releaseLock(anyString(), anyString())).thenReturn(Future.failedFuture("Booom"));
LockUtil.releaseLock(lock, "someLock", "someToken", log);
Mockito.verify(log, Mockito.times(1)).debug(eq("Trying to release lock '{}' with token '{}'"), eq("someLock"), eq("someToken"));
Mockito.verify(log, Mockito.times(1)).error(eq("Could not release lock '{}'. Message: {}"), eq("someLock"), eq("Booom"));
Mockito.verify(log, Mockito.times(1)).error(eq("Could not release lock '{}'."), eq("someLock"), isA(Throwable.class));
}
}

0 comments on commit acafdf6

Please sign in to comment.