Skip to content

Commit

Permalink
Merge splitt-off changes back to feature from develop
Browse files Browse the repository at this point in the history
OMG! What a conflict mess.

Conflicts:
      src/main/java/org/swisspush/redisques/QueueStatsService.java
      src/main/java/org/swisspush/redisques/RedisQues.java
      src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java
      src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java
      src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java
      src/main/java/org/swisspush/redisques/util/QueueActionFactory.java
      src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java

Related: SDCISA-15833, swisspost#170, swisspost#177, swisspost/vertx-rest-storage#186, swisspost/gateleen#577
  • Loading branch information
hiddenalpha committed May 24, 2024
2 parents b00ebff + af32aab commit 0d93d87
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 156 deletions.
24 changes: 16 additions & 8 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.exception.NoStacktraceException;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
Expand Down Expand Up @@ -48,21 +49,24 @@ public class QueueStatsService {
private final String redisquesAddress;
private final QueueStatisticsCollector queueStatisticsCollector;
private final DequeueStatisticCollector dequeueStatisticCollector;
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
Semaphore incomingRequestQuota
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
this.redisquesAddress = redisquesAddress;
this.queueStatisticsCollector = queueStatisticsCollector;
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;
}

Expand Down Expand Up @@ -114,13 +118,17 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
if (ev.failed()) {
onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ev.cause()), req);
Throwable ex = exceptionFactory.newException("eventBus.request()", ev.cause());
assert ex != null;
onDone.accept(ex, null);
return;
}
JsonObject body = ev.result().body();
String status = body.getString(STATUS);
if (!OK.equals(status)) {
onDone.accept(new NoStacktraceException("error_aXACAPcbAgBLGgIABnAC: " + status), null);
Throwable ex = exceptionFactory.newException("Unexpected status " + status);
assert ex != null;
onDone.accept(ex, null);
return;
}
JsonArray queuesJsonArr = body.getJsonArray(QUEUES);
Expand Down
247 changes: 145 additions & 102 deletions src/main/java/org/swisspush/redisques/RedisQues.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.ExceptionFactory;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.GetQueuesItemsCountHandler;
import org.swisspush.redisques.util.*;

Expand All @@ -20,7 +20,7 @@
*/
public class GetQueuesItemsCountAction extends AbstractQueueAction {

private final ExceptionFactory exceptionFactory;
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore redisRequestQuota;

public GetQueuesItemsCountAction(
Expand All @@ -32,7 +32,7 @@ public GetQueuesItemsCountAction(
String consumersPrefix,
String locksKey,
List<QueueConfiguration> queueConfigurations,
ExceptionFactory exceptionFactory,
RedisQuesExceptionFactory exceptionFactory,
Semaphore redisRequestQuota,
QueueStatisticsCollector queueStatisticsCollector,
Logger log
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.swisspush.redisques.exception;

import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;


/**
* Applies dependency inversion for exception instantiation.
*
* This class did arise because we had different use cases in different
* applications. One of them has the need to perform fine-grained error
* reporting. Whereas in the other application this led to performance issues.
* So now through this abstraction, both applications can choose the behavior
* they need.
*
* If dependency-injection gets applied properly, an app can even provide its
* custom implementation to fine-tune the exact behavior even further.
*/
public interface RedisQuesExceptionFactory {

public default Exception newException(String message) { return newException(message, null); }

public default Exception newException(Throwable cause) { return newException(null, cause); }

public Exception newException(String message, Throwable cause);

public default RuntimeException newRuntimeException(String message) { return newRuntimeException(message, null); }

public default RuntimeException newRuntimeException(Throwable cause) { return newRuntimeException(null, cause); }

public RuntimeException newRuntimeException(String message, Throwable cause);

public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String msg);


/**
* See {@link ThriftyRedisQuesExceptionFactory}.
*/
public static RedisQuesExceptionFactory newThriftyExceptionFactory() {
return new ThriftyRedisQuesExceptionFactory();
}

/**
* See {@link WastefulRedisQuesExceptionFactory}.
*/
public static RedisQuesExceptionFactory newWastefulExceptionFactory() {
return new WastefulRedisQuesExceptionFactory();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.swisspush.redisques.exception;

import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;

/**
* Trades maintainability for speed. For example prefers lightweight
* exceptions without stacktrace recording. It may even decide to drop 'cause'
* and 'suppressed' exceptions. If an app needs more error details it should use
* {@link WastefulRedisQuesExceptionFactory}. If none of those fits the apps needs, it
* can provide its own implementation.
*/
class ThriftyRedisQuesExceptionFactory implements RedisQuesExceptionFactory {

ThriftyRedisQuesExceptionFactory() {
}

public Exception newException(String message, Throwable cause) {
// This impl exists for speed. So why even bother creating new instances
// if we can use already existing ones. If caller really needs another
// instance, he should use another implementation of this factory.
if (cause instanceof Exception) return (Exception) cause;
return new NoStacktraceException(message, cause);
}

@Override
public RuntimeException newRuntimeException(String message, Throwable cause) {
// This impl exists for speed. So why even bother creating new instances
// if we can use already existing ones. If caller really needs another
// instance, he should use another implementation of this factory.
if (cause instanceof RuntimeException) return (RuntimeException) cause;
return new NoStacktraceException(message, cause);
}

@Override
public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String msg) {
return new NoStackReplyException(failureType, failureCode, msg);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.swisspush.redisques.exception;

import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;

/**
* Trades speed for maintainability. For example invests more resources like
* recording stack traces (which likely provocates more logs) to get easier
* to debug error messages and better hints of what is happening. It also
* keeps details like 'causes' and 'suppressed' exceptions. If an app needs
* more error details it should use {@link WastefulRedisQuesExceptionFactory}. If none
* of those fits the apps needs, it can provide its own implementation.
*/
class WastefulRedisQuesExceptionFactory implements RedisQuesExceptionFactory {

WastefulRedisQuesExceptionFactory() {
}

public Exception newException(String message, Throwable cause) {
return new Exception(message, cause);
}

@Override
public RuntimeException newRuntimeException(String message, Throwable cause) {
return new RuntimeException(message, cause);
}

@Override
public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String msg) {
return new ReplyException(failureType, failureCode, msg);
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package org.swisspush.redisques.handler;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.json.JsonArray;
Expand All @@ -16,14 +14,9 @@
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.action.GetQueuesItemsCountAction;
import org.swisspush.redisques.exception.ExceptionFactory;
import org.swisspush.redisques.exception.NoStackReplyException;
import org.swisspush.redisques.exception.NoStacktraceException;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.util.HandlerUtil;
import org.swisspush.redisques.util.RedisProvider;
import org.swisspush.redisques.util.RedisquesAPI.QueueOperation;

import java.util.Iterator;
import java.util.List;
Expand All @@ -32,6 +25,8 @@
import java.util.function.BiConsumer;
import java.util.regex.Pattern;

import org.swisspush.redisques.exception.RedisQuesExceptionFactory;

import static java.lang.System.currentTimeMillis;
import static org.swisspush.redisques.util.RedisquesAPI.ERROR;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
Expand All @@ -40,6 +35,7 @@
import static org.swisspush.redisques.util.RedisquesAPI.QUEUES;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;


public class GetQueuesItemsCountHandler implements Handler<AsyncResult<Response>> {

private final Logger log = LoggerFactory.getLogger(GetQueuesItemsCountHandler.class);
Expand All @@ -50,7 +46,7 @@ public class GetQueuesItemsCountHandler implements Handler<AsyncResult<Response>
private final String queuesPrefix;
private final RedisProvider redisProvider;
private final UpperBoundParallel upperBoundParallel;
private final ExceptionFactory exceptionFactory;
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore redisRequestQuota;

public GetQueuesItemsCountHandler(
Expand All @@ -59,7 +55,7 @@ public GetQueuesItemsCountHandler(
Optional<Pattern> filterPattern,
String queuesPrefix,
RedisProvider redisProvider,
ExceptionFactory exceptionFactory,
RedisQuesExceptionFactory exceptionFactory,
Semaphore redisRequestQuota
) {
this.vertx = vertx;
Expand All @@ -75,11 +71,11 @@ public GetQueuesItemsCountHandler(
@Override
public void handle(AsyncResult<Response> handleQueues) {
if (!handleQueues.succeeded()) {
log.warn("Concealed error", new Exception(handleQueues.cause()));
log.warn("Concealed error", exceptionFactory.newException(handleQueues.cause()));
event.reply(new JsonObject().put(STATUS, ERROR));
return;
}
var ctx = new Object() {
var ctx = new Object(){
Redis redis;
Iterator<String> iter;
List<String> queues = HandlerUtil.filterByPattern(handleQueues.result(), filterPattern);
Expand All @@ -96,26 +92,28 @@ public void handle(AsyncResult<Response> handleQueues) {
"Too many simultaneous '" + GetQueuesItemsCountHandler.class.getSimpleName() + "' requests in progress"));
return;
}
redisProvider.connection().compose((Redis redis_) -> {
redisProvider.connection().<Void>compose((Redis redis_) -> {
ctx.redis = redis_;
ctx.queueLengths = new int[ctx.queues.size()];
ctx.iter = ctx.queues.iterator();
var p = Promise.<Void>promise();
upperBoundParallel.request(redisRequestQuota, null, new UpperBoundParallel.Mentor<Void>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void unused) {
/*TODO rename 'onDone' to 'onLlenDone' or similar*/
if (ctx.iter.hasNext()) {
String queue = ctx.iter.next();
int iNum = ctx.iNumberResult++;
ctx.redis.send(Request.cmd(Command.LLEN, queuesPrefix + queue)).onSuccess((Response rsp) -> {
ctx.queueLengths[iNum] = rsp.toInteger();
onDone.accept(null, null);
}).onFailure(ex -> {
}).onFailure((Throwable ex) -> {
onDone.accept(ex, null);
});
}
return ctx.iter.hasNext();
}
@Override public boolean onError(Throwable ex, Void ctx_) {
/*TODO use exceptionFactory*/
log.error("Unexpected queue length result", new Exception(ex));
event.reply(new JsonObject().put(STATUS, ERROR));
return false;
Expand All @@ -125,11 +123,12 @@ public void handle(AsyncResult<Response> handleQueues) {
}
});
return p.future();
}).compose((Void v) -> {
}).<JsonObject>compose((Void v) -> {
/*going to waste another threads time to produce those garbage objects*/
return vertx.<JsonObject>executeBlocking((Promise<JsonObject> workerPromise) -> {
assert !Thread.currentThread().getName().toUpperCase().contains("EVENTLOOP");
long beginEpchMs = currentTimeMillis();

JsonArray result = new JsonArray();
for (int i = 0; i < ctx.queueLengths.length; ++i) {
String queueName = ctx.queues.get(i);
Expand All @@ -146,8 +145,8 @@ public void handle(AsyncResult<Response> handleQueues) {
}).onSuccess((JsonObject json) -> {
log.trace("call event.reply(json)");
event.reply(json);
}).onFailure(ex -> {
log.warn("Redis: Failed to get queue length (error_c3gCAEFrAgChbAIAdhwC)", ex);
}).onFailure((Throwable ex) -> {
log.warn("Redis: Failed to get queue length.", exceptionFactory.newException(ex));
event.reply(new JsonObject().put(STATUS, ERROR));
});
}
Expand Down
Loading

0 comments on commit 0d93d87

Please sign in to comment.