Skip to content

Commit

Permalink
Extract ExceptionFactory in its own branch
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed May 21, 2024
1 parent b00ebff commit 88c44d4
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 657 deletions.
97 changes: 34 additions & 63 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,23 @@

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.NoStacktraceException;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.*;
import java.util.function.BiConsumer;

import static java.lang.Long.compare;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;
import static org.slf4j.LoggerFactory.getLogger;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.QUEUES;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesItemsCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.*;


/**
Expand All @@ -48,79 +38,60 @@ public class QueueStatsService {
private final String redisquesAddress;
private final QueueStatisticsCollector queueStatisticsCollector;
private final DequeueStatisticCollector dequeueStatisticCollector;
private final Semaphore incomingRequestQuota;
private final RedisQuesExceptionFactory exceptionFactory;

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
Semaphore incomingRequestQuota
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory
) {
this.vertx = vertx;
this.eventBus = eventBus;
this.redisquesAddress = redisquesAddress;
this.queueStatisticsCollector = queueStatisticsCollector;
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.incomingRequestQuota = incomingRequestQuota;
this.exceptionFactory = exceptionFactory;
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
if (!incomingRequestQuota.tryAcquire()) {
var ex = new NoStacktraceException("Server too busy to handle yet-another-queue-stats-request now (error_bG8CAJJ3AgCKMwIAPBUC)");
vertx.runOnContext(v -> mentor.onError(ex, mCtx));
return;
}
AtomicBoolean isCompleted = new AtomicBoolean();
try {
var req0 = new GetQueueStatsRequest<CTX>();
BiConsumer<Throwable, List<Queue>> onDone = (Throwable ex, List<Queue> ans) -> {
if (!isCompleted.compareAndSet(false, true)) {
if (log.isInfoEnabled()) log.info("", new RuntimeException("onDone MUST be called ONCE only", ex));
return;
}
incomingRequestQuota.release();
if (ex != null) mentor.onError(ex, mCtx);
else mentor.onQueueStatistics(ans, mCtx);
};
req0.mCtx = mCtx;
req0.mentor = mentor;
fetchQueueNamesAndSize(req0, (ex1, req1) -> {
if (ex1 != null) { onDone.accept(ex1, null); return; }
// Prepare a list of queue names as it is needed to fetch retryDetails.
req1.queueNames = new ArrayList<>(req1.queues.size());
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { onDone.accept(ex2, null); return; }
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
onDone.accept(null, req3.queues);
});
var req0 = new GetQueueStatsRequest<CTX>();
req0.mCtx = mCtx;
req0.mentor = mentor;
fetchQueueNamesAndSize(req0, (ex1, req1) -> {
if (ex1 != null) { req1.mentor.onError(ex1, req1.mCtx); return; }
// Prepare a list of queue names as it is needed to fetch retryDetails.
req1.queueNames = new ArrayList<>(req1.queues.size());
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { req2.mentor.onError(ex2, req2.mCtx); return; }
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { req3.mentor.onError(ex3, req3.mCtx); return; }
req3.mentor.onQueueStatistics(req3.queues, req3.mCtx);
});
});
} catch (Exception ex) {
if (!isCompleted.compareAndSet(false, true)) {
if (log.isInfoEnabled()) log.info("onDone MUST be called ONCE only", ex);
return;
}
incomingRequestQuota.release();
vertx.runOnContext(v -> mentor.onError(ex, mCtx));
}
});
}

private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
String filter = req.mentor.filter(req.mCtx);
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
if (ev.failed()) {
onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ev.cause()), req);
Throwable ex = exceptionFactory.newException("eventBus.request()", ev.cause());
assert ex != null;
onDone.accept(ex, null);
return;
}
JsonObject body = ev.result().body();
Message<JsonObject> msg = ev.result();
JsonObject body = msg.body();
String status = body.getString(STATUS);
if (!OK.equals(status)) {
onDone.accept(new NoStacktraceException("error_aXACAPcbAgBLGgIABnAC: " + status), null);
Throwable ex = exceptionFactory.newException("Unexpected status " + status);
assert ex != null;
onDone.accept(ex, null);
return;
}
JsonArray queuesJsonArr = body.getJsonArray(QUEUES);
Expand Down
Loading

0 comments on commit 88c44d4

Please sign in to comment.