diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 7ccdad2..938c92e 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -5,6 +5,7 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import org.slf4j.Logger; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.exception.NoStacktraceException; import org.swisspush.redisques.util.DequeueStatistic; import org.swisspush.redisques.util.DequeueStatisticCollector; @@ -48,21 +49,24 @@ public class QueueStatsService { private final String redisquesAddress; private final QueueStatisticsCollector queueStatisticsCollector; private final DequeueStatisticCollector dequeueStatisticCollector; + private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore incomingRequestQuota; public QueueStatsService( - Vertx vertx, - EventBus eventBus, - String redisquesAddress, - QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector, - Semaphore incomingRequestQuota + Vertx vertx, + EventBus eventBus, + String redisquesAddress, + QueueStatisticsCollector queueStatisticsCollector, + DequeueStatisticCollector dequeueStatisticCollector, + RedisQuesExceptionFactory exceptionFactory, + Semaphore incomingRequestQuota ) { this.vertx = vertx; this.eventBus = eventBus; this.redisquesAddress = redisquesAddress; this.queueStatisticsCollector = queueStatisticsCollector; this.dequeueStatisticCollector = dequeueStatisticCollector; + this.exceptionFactory = exceptionFactory; this.incomingRequestQuota = incomingRequestQuota; } @@ -114,13 +118,17 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu JsonObject operation = buildGetQueuesItemsCountOperation(filter); eventBus.request(redisquesAddress, operation, ev -> { if (ev.failed()) { - onDone.accept(new NoStacktraceException("error_QzkCACMbAgCgOwIA", ev.cause()), req); + Throwable ex = exceptionFactory.newException("eventBus.request()", ev.cause()); + assert ex != null; + onDone.accept(ex, null); return; } JsonObject body = ev.result().body(); String status = body.getString(STATUS); if (!OK.equals(status)) { - onDone.accept(new NoStacktraceException("error_aXACAPcbAgBLGgIABnAC: " + status), null); + Throwable ex = exceptionFactory.newException("Unexpected status " + status); + assert ex != null; + onDone.accept(ex, null); return; } JsonArray queuesJsonArr = body.getJsonArray(QUEUES); diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 9c77e26..7fb7968 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -14,12 +14,14 @@ import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Command; import io.vertx.redis.client.Redis; +import io.vertx.redis.client.Redis; import io.vertx.redis.client.RedisAPI; import io.vertx.redis.client.Request; import io.vertx.redis.client.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.swisspush.redisques.action.QueueAction; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.exception.ExceptionFactory; import org.swisspush.redisques.exception.NoStacktraceException; import org.swisspush.redisques.handler.RedisquesHttpRequestHandler; @@ -55,9 +57,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.Consumer; import static java.lang.System.currentTimeMillis; -import static org.swisspush.redisques.exception.ExceptionFactory.newThriftyExceptionFactory; +import static org.swisspush.redisques.exception.RedisQuesExceptionFactory.newThriftyExceptionFactory; import static org.swisspush.redisques.util.RedisquesAPI.ERROR; import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; import static org.swisspush.redisques.util.RedisquesAPI.OK; @@ -97,7 +100,7 @@ public static class RedisQuesBuilder { private MemoryUsageProvider memoryUsageProvider; private RedisquesConfigurationProvider configurationProvider; private RedisProvider redisProvider; - private ExceptionFactory exceptionFactory; + private RedisQuesExceptionFactory exceptionFactory; private Semaphore redisMonitoringReqQuota; private Semaphore checkQueueRequestsQuota; private Semaphore queueStatsRequestQuota; @@ -122,7 +125,7 @@ public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) { return this; } - public RedisQuesBuilder withExceptionFactory(ExceptionFactory exceptionFactory) { + public RedisQuesBuilder withExceptionFactory(RedisQuesExceptionFactory exceptionFactory) { this.exceptionFactory = exceptionFactory; return this; } @@ -235,16 +238,16 @@ private enum QueueState { private Map dequeueStatistic = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; + private final RedisQuesExceptionFactory exceptionFactory; private PeriodicSkipScheduler periodicSkipScheduler; - private final ExceptionFactory exceptionFactory; private final Semaphore redisMonitoringReqQuota; private final Semaphore checkQueueRequestsQuota; private final Semaphore queueStatsRequestQuota; private final Semaphore getQueuesItemsCountRedisRequestQuota; public RedisQues() { - log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); this.exceptionFactory = newThriftyExceptionFactory(); + log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE); this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE); this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE); @@ -255,7 +258,7 @@ public RedisQues( MemoryUsageProvider memoryUsageProvider, RedisquesConfigurationProvider configurationProvider, RedisProvider redisProvider, - ExceptionFactory exceptionFactory, + RedisQuesExceptionFactory exceptionFactory, Semaphore redisMonitoringReqQuota, Semaphore checkQueueRequestsQuota, Semaphore queueStatsRequestQuota, @@ -371,11 +374,12 @@ public void start(Promise promise) { private void initialize() { RedisquesConfiguration configuration = configurationProvider.configuration(); this.queueStatisticsCollector = new QueueStatisticsCollector( - redisProvider, queuesPrefix, vertx, redisMonitoringReqQuota, + redisProvider, queuesPrefix, vertx, exceptionFactory, redisMonitoringReqQuota, configuration.getQueueSpeedIntervalSec()); - RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector, - dequeueStatisticCollector, queueStatsRequestQuota); + RedisquesHttpRequestHandler.init( + vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector, + exceptionFactory, queueStatsRequestQuota); // only initialize memoryUsageProvider when not provided in the constructor if (memoryUsageProvider == null) { @@ -384,9 +388,10 @@ private void initialize() { } assert getQueuesItemsCountRedisRequestQuota != null; - queueActionFactory = new QueueActionFactory(redisProvider, vertx, log, - queuesKey, queuesPrefix, consumersPrefix, locksKey, queueStatisticsCollector, memoryUsageProvider, - configurationProvider, exceptionFactory, getQueuesItemsCountRedisRequestQuota); + queueActionFactory = new QueueActionFactory( + redisProvider, vertx, log, queuesKey, queuesPrefix, consumersPrefix, locksKey, + memoryUsageProvider, queueStatisticsCollector, exceptionFactory, + configurationProvider, getQueuesItemsCountRedisRequestQuota); queueActions.put(addQueueItem, queueActionFactory.buildQueueAction(addQueueItem)); queueActions.put(deleteQueueItem, queueActionFactory.buildQueueAction(deleteQueueItem)); @@ -513,6 +518,7 @@ void resume() { return task.execute().compose(taskResult -> { // append task result to previous results previousResults.add(taskResult); + i.incrementAndGet(); return Future.succeededFuture(previousResults); }); }), (a,b) -> Future.succeededFuture()); @@ -534,60 +540,75 @@ void resume() { private void registerActiveQueueRegistrationRefresh() { // Periodic refresh of my registrations on active queues. var periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; - periodicSkipScheduler.setPeriodic(periodMs, "registerActiveQueueRegistrationRefresh", onDone_ -> { - AtomicInteger numPending = new AtomicInteger(); - Runnable onDone = () -> { - var remaining = numPending.decrementAndGet(); - assert remaining >= 0 : "Why is remaining " + remaining; - if (remaining == 0) onDone_.run(); - }; - boolean foundAtLeastOne = false; - for (Map.Entry entry : myQueues.entrySet()) { - if (entry.getValue() == QueueState.CONSUMING) continue; - foundAtLeastOne = true; - - numPending.incrementAndGet(); - final String queue = entry.getKey(); + periodicSkipScheduler.setPeriodic(periodMs, "registerActiveQueueRegistrationRefresh", new Consumer() { + Iterator> iter; + @Override public void accept(Runnable onPeriodicDone) { + // Need a copy to prevent concurrent modification issuses. + iter = new HashMap<>(myQueues).entrySet().iterator(); + // Trigger only a limitted amount of requests in parallel. + upperBoundParallel.request(redisMonitoringReqQuota, iter, new UpperBoundParallel.Mentor<>() { + @Override public boolean runOneMore(BiConsumer onQueueDone, Iterator> iter) { + handleNextQueueOfInterest(onQueueDone); + return iter.hasNext(); + } + @Override public boolean onError(Throwable ex, Iterator> iter) { + if (log.isWarnEnabled()) log.warn("TODO error handling", exceptionFactory.newException(ex)); + return false; + } + @Override public void onDone(Iterator> iter) { + onPeriodicDone.run(); + } + }); + } + void handleNextQueueOfInterest(BiConsumer onQueueDone) { + while (iter.hasNext()) { + var entry = iter.next(); + if (entry.getValue() != QueueState.CONSUMING) continue; + checkIfImStillTheRegisteredConsumer(entry.getKey(), onQueueDone); + return; + } + // no entry found. we're done. + onQueueDone.accept(null, null); + } + void checkIfImStillTheRegisteredConsumer(String queue, BiConsumer onDone) { // Check if I am still the registered consumer String consumerKey = consumersPrefix + queue; - if (log.isTraceEnabled()) { - log.trace("RedisQues refresh queues get: {}", consumerKey); - } - redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, getConsumerEvent -> { - if (getConsumerEvent.failed()) { - log.warn("Failed to get queue consumer for queue '{}'. But we'll continue anyway :)", queue, getConsumerEvent.cause()); - // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" - } - final String consumer = Objects.toString(getConsumerEvent.result(), ""); - if (uid.equals(consumer)) { - log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); - refreshRegistration(queue, ev -> { - if (ev.failed()) - log.warn("TODO_1WoCAD1bAgBRBwIA9yQC error handling", ev.cause()); - updateTimestamp(queue, updateTimestampEv -> { - if (updateTimestampEv.failed()) log.warn( - "TODO_mWoCAARJAgDTXAIASwUC error handling", updateTimestampEv.cause()); - onDone.run(); - }); - }); - } else { - log.debug("RedisQues Removing queue {} from the list", queue); - myQueues.remove(queue); - queueStatisticsCollector.resetQueueFailureStatistics(queue, (Throwable ex, Void v) -> { - if (ex != null) log.warn("TODO_RGsCAA4CAgDdWAIAVRMC error handling", ex); - onDone.run(); + log.trace("RedisQues refresh queues get: {}", consumerKey); + redisProvider.redis().onComplete( ev1 -> { + if (ev1.failed()) { + onDone.accept(exceptionFactory.newException("redisProvider.redis() failed", ev1.cause()), null); + return; + } + var redisAPI = ev1.result(); + redisAPI.get(consumerKey, getConsumerEvent -> { + if (getConsumerEvent.failed()) { + Throwable ex = exceptionFactory.newException( + "Failed to get queue consumer for queue '" + queue + "'", getConsumerEvent.cause()); + assert ex != null; + onDone.accept(ex, null); + return; + } + final String consumer = Objects.toString(getConsumerEvent.result(), ""); + if (uid.equals(consumer)) { + log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); + refreshRegistration(queue, ev -> { + if (ev.failed()) { + onDone.accept(exceptionFactory.newException("TODO error handling", ev.cause()), null); + return; + } + updateTimestamp(queue, ev3 -> { + Throwable ex = ev3.succeeded() ? null : exceptionFactory.newException( + "updateTimestamp(" + queue + ") failed", ev3.cause()); + onDone.accept(ex, null); }); - } - })) - .onFailure(ex -> { - log.error("Redis: Failed to registerActiveQueueRegistrationRefresh", ex); - onDone.run(); - }); - - } - if (!foundAtLeastOne) { - numPending.incrementAndGet(); - onDone.run(); + }); + } else { + log.debug("RedisQues Removing queue {} from the list", queue); + myQueues.remove(queue); + queueStatisticsCollector.resetQueueFailureStatistics(queue, onDone); + } + }); + }); } }); } @@ -609,7 +630,11 @@ private Handler> operationsHandler() { // handle system operations switch (queueOperation) { case check: - checkQueues().onFailure(ex -> log.warn("TODO_e2UCAGlXAgBAZQIAlBcC error handling", ex)); + checkQueues().onFailure(ex -> { + if (log.isWarnEnabled()) { + log.warn("TODO error handling", exceptionFactory.newException(ex)); + } + }); return; case reset: resetConsumers(); @@ -650,17 +675,18 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s return configurationProvider.configuration().getRefreshPeriod(); } + private void registerQueueCheck() { vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> { - redisProvider.connection().compose((Redis conn) -> { + redisProvider.connection().compose((Redis conn) -> { int checkInterval = configurationProvider.configuration().getCheckInterval(); Request req = Request.cmd(Command.SET, queueCheckLastexecKey, currentTimeMillis(), "NX", "EX", checkInterval); return conn.send(req); }).compose((Response todoExplainWhyThisIsIgnored) -> { log.info("periodic queue check is triggered now"); return checkQueues(); - }).onFailure(ex -> { - log.warn("TODO_WmYCAIxGAgB0aAIA4yoC error handling", ex); + }).onFailure((Throwable ex) -> { + if (log.isErrorEnabled()) log.error("TODO error handling", exceptionFactory.newException(ex)); }); }); } @@ -681,10 +707,13 @@ public void stop() { private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { - if( event.failed() ) log.warn("TODO_FWsCAAE0AgCPRQIA8QgC error handling", event.cause()); + if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( + "unregister(" + event + ") failed", event.cause())); unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if( unregisterEvent.failed() ) - log.warn("TODO_82sCAAczAgCQVgIA6E0C error handling", unregisterEvent.cause()); + if( unregisterEvent.failed() ) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } stoppedHandler = doneHandler; if (myQueues.keySet().isEmpty()) { doneHandler.handle(null); @@ -704,7 +733,10 @@ private Future unregisterConsumers(boolean force) { if (force || entry.getValue() == QueueState.READY) { log.trace("RedisQues unregister consumers queue: {}", queue); refreshRegistration(queue, event -> { - if( event.failed() ) log.warn("TODO_12sCABtNAgAqKAIAWg0C error handling", event.cause()); + if (event.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "refreshRegistration(" + queue + ") failed", event.cause())); + } // Make sure that I am still the registered consumer String consumerKey = consumersPrefix + queue; log.trace("RedisQues unregister consumers get: {}", consumerKey); @@ -730,7 +762,7 @@ private Future unregisterConsumers(boolean force) { } } CompositeFuture.all(futureList).onComplete(ev -> { - if( ev.failed() ) log.warn("TODO_m2sCABpzAgCHbQIA5TYC error handling", ev.cause()); + if (ev.failed()) log.warn("TODO error handling", exceptionFactory.newException(ev.cause())); result.complete(); }); return result.future(); @@ -802,8 +834,10 @@ private Future consume(final String queueName) { } log.debug("RedisQues Starting to consume queue {}", queueName); readQueue(queueName).onComplete(readQueueEvent -> { - if( readQueueEvent.failed() ) - log.warn("TODO_aWwCACYIAgCpZwIA8RgC error handling", readQueueEvent.cause()); + if (readQueueEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "readQueue(" + queueName + ") failed", readQueueEvent.cause())); + } promise.complete(); }); } else { @@ -815,8 +849,10 @@ private Future consume(final String queueName) { log.debug("Registration for queue {} has changed to {}", queueName, consumer); myQueues.remove(queueName); notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { - if( notifyConsumerEvent.failed() ) - log.warn("TODO_Q2wCAHFYAgCgRQIA8yoC error handling", notifyConsumerEvent.cause()); + if (notifyConsumerEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "notifyConsumer(" + queueName + ") failed", notifyConsumerEvent.cause())); + } promise.complete(); }); } @@ -855,8 +891,9 @@ private Future readQueue(final String queueName) { log.trace("RedisQues read queue lindex: {}", queueKey); isQueueLocked(queueName).onComplete(lockAnswer -> { - if( lockAnswer.failed() ) - throw new NoStacktraceException("TODO_o2wCACFRAgAAKAIAMUUC error handling " + queueName, lockAnswer.cause()); + if (lockAnswer.failed()) { + throw exceptionFactory.newRuntimeException("TODO error handling " + queueName, lockAnswer.cause()); + } boolean locked = lockAnswer.result(); if (!locked) { redisProvider.redis().onSuccess(redisAPI -> redisAPI.lindex(queueKey, "0", answer -> { @@ -893,13 +930,16 @@ private Future readQueue(final String queueName) { redisAPI.llen(queueKey, answer1 -> { if (answer1.succeeded() && answer1.result() != null && answer1.result().toInteger() > 0) { notifyConsumer(queueName).onComplete(event1 -> { - if( event1.failed() ) - log.warn("TODO_d20CAD8wAgCbZQIAvSgC error handling", event1.cause()); + if (event1.failed()) + log.warn("TODO error handling", exceptionFactory.newException( + "notifyConsumer(" + queueName + ") failed", event1.cause())); promise.complete(); }); } else { - if( answer1.failed() ) - log.warn("TODO_bm0CAEFuAgBcUAIAfQ8C error handling", answer1.cause()); + if (answer1.failed() && log.isWarnEnabled()) { + log.warn("TODO error handling", exceptionFactory.newException( + "redisAPI.llen(" + queueKey + ") failed", answer1.cause())); + } promise.complete(); } }); @@ -908,8 +948,10 @@ private Future readQueue(final String queueName) { // Notify that we are stopped in case it was the last active consumer if (stoppedHandler != null) { unregisterConsumers(false).onComplete(event -> { - if( event.failed() ) - log.warn("TODO_GW0CAMUlAgDcJgIAzncC error handling", event.cause()); + if (event.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", event.cause())); + } if (myQueues.isEmpty()) { stoppedHandler.handle(null); } @@ -968,7 +1010,10 @@ private void rescheduleSendMessageAfterFailure(final String queueName, int retry log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis())); } notifyConsumer(queueName).onComplete(event -> { - if( event.failed() ) log.warn("TODO_6m0CAExTAgCYSgIAnxkC error handling", event.cause()); + if (event.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "notifyConsumer(" + queueName + ") failed", event.cause())); + } // reset the queue state to be consumed by {@link RedisQues#consume(String)} myQueues.put(queueName, QueueState.READY); }); @@ -982,7 +1027,7 @@ private void processMessageWithTimeout(final String queue, final String payload, } timer.executeDelayedMax(processorDelayMax).onComplete(delayed -> { if (delayed.failed()) { - log.error("Delayed execution has failed.", new Exception(delayed.cause())); + log.error("Delayed execution has failed.", exceptionFactory.newException(delayed.cause())); // TODO: May we should call handler with failed state now. return; } @@ -1005,7 +1050,7 @@ private void processMessageWithTimeout(final String queue, final String payload, } } else { log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}", - uid, queue, new Exception(reply.cause())); + uid, queue, exceptionFactory.newException(reply.cause())); success = Boolean.FALSE; } @@ -1095,13 +1140,12 @@ private void updateTimestamp(final String queueName, Handler checkQueues() { final var ctx = new Object() { - final Promise checkQueuesResult = Promise.promise(); long limit; RedisAPI redisAPI; AtomicInteger counter; Iterator iter; }; - Future.succeededFuture().compose((Void v) -> { + return Future.succeededFuture().compose((Void v) -> { log.debug("Checking queues timestamps"); // List all queues that look inactive (i.e. that have not been updated since 3 periods). ctx.limit = currentTimeMillis() - 3L * configurationProvider.configuration().getRefreshPeriod() * 1000; @@ -1156,8 +1200,10 @@ private Future checkQueues() { // Ensure we clean the old queues after having updated all timestamps if (ctx.counter.decrementAndGet() == 0) { removeOldQueues(ctx.limit).onComplete(removeOldQueuesEvent -> { - if (removeOldQueuesEvent.failed()) - log.warn("TODO_i2kCAChDAgDrbAIAIWcC error handling", removeOldQueuesEvent.cause()); + if (removeOldQueuesEvent.failed() && log.isWarnEnabled()) { + log.warn("TODO error handling", exceptionFactory.newException( + "removeOldQueues(" + ctx.limit + ") failed", removeOldQueuesEvent.cause())); + } refreshRegHandler.handle(null); }); } else { @@ -1177,8 +1223,10 @@ private Future checkQueues() { } if (ctx.counter.decrementAndGet() == 0) { removeOldQueues(ctx.limit).onComplete(removeOldQueuesEvent -> { - if( removeOldQueuesEvent.failed() ) - log.warn("TODO_aGoCAOtOAgAnbQIA4zQC error handling", removeOldQueuesEvent.cause()); + if (removeOldQueuesEvent.failed() && log.isWarnEnabled()) { + log.warn("TODO error handling", exceptionFactory.newException( + "removeOldQueues(" + ctx.limit + ") failed", removeOldQueuesEvent.cause())); + } queueStatisticsCollector.resetQueueFailureStatistics(queueName, onDone); }); } else { @@ -1190,7 +1238,7 @@ private Future checkQueues() { return ctx.iter.hasNext(); } @Override public boolean onError(Throwable ex, Void ctx_) { - log.warn("TODO_LWoCABp8AgAiIQIAlC4C error handling", ex); + log.warn("TODO error handling", exceptionFactory.newException(ex)); return true; // true, keep going with other queues. } @Override public void onDone(Void ctx_) { @@ -1203,14 +1251,7 @@ private Future checkQueues() { } }); return p.future(); - }).compose((Void v) -> { - ctx.checkQueuesResult.complete(); - return Future.succeededFuture(); - }).onFailure(ex -> { - log.debug("Redis: Failed to checkQueues", ex); - ctx.checkQueuesResult.fail(ex); }); - return ctx.checkQueuesResult.future(); } /** @@ -1224,7 +1265,9 @@ private Future removeOldQueues(long limit) { redisProvider.redis() .onSuccess(redisAPI -> { redisAPI.zremrangebyscore(queuesKey, "-inf", String.valueOf(limit), event -> { - if( event.failed() ) log.warn("TODO_92oCAP5SAgCcOgIAMW8C error handling", event.cause()); + if (event.failed() && log.isWarnEnabled()) log.warn("TODO error handling", + exceptionFactory.newException("redisAPI.zremrangebyscore('" + queuesKey + "', '-inf', " + limit + ") failed", + event.cause())); promise.complete(); }); }) diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java index bcd78aa..d3e7b03 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java @@ -4,7 +4,7 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import org.slf4j.Logger; -import org.swisspush.redisques.exception.ExceptionFactory; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.handler.GetQueuesItemsCountHandler; import org.swisspush.redisques.util.*; @@ -20,7 +20,7 @@ */ public class GetQueuesItemsCountAction extends AbstractQueueAction { - private final ExceptionFactory exceptionFactory; + private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore redisRequestQuota; public GetQueuesItemsCountAction( @@ -32,7 +32,7 @@ public GetQueuesItemsCountAction( String consumersPrefix, String locksKey, List queueConfigurations, - ExceptionFactory exceptionFactory, + RedisQuesExceptionFactory exceptionFactory, Semaphore redisRequestQuota, QueueStatisticsCollector queueStatisticsCollector, Logger log diff --git a/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java b/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java new file mode 100644 index 0000000..afa36a9 --- /dev/null +++ b/src/main/java/org/swisspush/redisques/exception/RedisQuesExceptionFactory.java @@ -0,0 +1,50 @@ +package org.swisspush.redisques.exception; + +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; + + +/** + * Applies dependency inversion for exception instantiation. + * + * This class did arise because we had different use cases in different + * applications. One of them has the need to perform fine-grained error + * reporting. Whereas in the other application this led to performance issues. + * So now through this abstraction, both applications can choose the behavior + * they need. + * + * If dependency-injection gets applied properly, an app can even provide its + * custom implementation to fine-tune the exact behavior even further. + */ +public interface RedisQuesExceptionFactory { + + public default Exception newException(String message) { return newException(message, null); } + + public default Exception newException(Throwable cause) { return newException(null, cause); } + + public Exception newException(String message, Throwable cause); + + public default RuntimeException newRuntimeException(String message) { return newRuntimeException(message, null); } + + public default RuntimeException newRuntimeException(Throwable cause) { return newRuntimeException(null, cause); } + + public RuntimeException newRuntimeException(String message, Throwable cause); + + public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String msg); + + + /** + * See {@link ThriftyRedisQuesExceptionFactory}. + */ + public static RedisQuesExceptionFactory newThriftyExceptionFactory() { + return new ThriftyRedisQuesExceptionFactory(); + } + + /** + * See {@link WastefulRedisQuesExceptionFactory}. + */ + public static RedisQuesExceptionFactory newWastefulExceptionFactory() { + return new WastefulRedisQuesExceptionFactory(); + } + +} diff --git a/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java b/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java new file mode 100644 index 0000000..e93fdbd --- /dev/null +++ b/src/main/java/org/swisspush/redisques/exception/ThriftyRedisQuesExceptionFactory.java @@ -0,0 +1,40 @@ +package org.swisspush.redisques.exception; + +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; + +/** + * Trades maintainability for speed. For example prefers lightweight + * exceptions without stacktrace recording. It may even decide to drop 'cause' + * and 'suppressed' exceptions. If an app needs more error details it should use + * {@link WastefulRedisQuesExceptionFactory}. If none of those fits the apps needs, it + * can provide its own implementation. + */ +class ThriftyRedisQuesExceptionFactory implements RedisQuesExceptionFactory { + + ThriftyRedisQuesExceptionFactory() { + } + + public Exception newException(String message, Throwable cause) { + // This impl exists for speed. So why even bother creating new instances + // if we can use already existing ones. If caller really needs another + // instance, he should use another implementation of this factory. + if (cause instanceof Exception) return (Exception) cause; + return new NoStacktraceException(message, cause); + } + + @Override + public RuntimeException newRuntimeException(String message, Throwable cause) { + // This impl exists for speed. So why even bother creating new instances + // if we can use already existing ones. If caller really needs another + // instance, he should use another implementation of this factory. + if (cause instanceof RuntimeException) return (RuntimeException) cause; + return new NoStacktraceException(message, cause); + } + + @Override + public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String msg) { + return new NoStackReplyException(failureType, failureCode, msg); + } + +} diff --git a/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java b/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java new file mode 100644 index 0000000..8890fd4 --- /dev/null +++ b/src/main/java/org/swisspush/redisques/exception/WastefulRedisQuesExceptionFactory.java @@ -0,0 +1,33 @@ +package org.swisspush.redisques.exception; + +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; + +/** + * Trades speed for maintainability. For example invests more resources like + * recording stack traces (which likely provocates more logs) to get easier + * to debug error messages and better hints of what is happening. It also + * keeps details like 'causes' and 'suppressed' exceptions. If an app needs + * more error details it should use {@link WastefulRedisQuesExceptionFactory}. If none + * of those fits the apps needs, it can provide its own implementation. + */ +class WastefulRedisQuesExceptionFactory implements RedisQuesExceptionFactory { + + WastefulRedisQuesExceptionFactory() { + } + + public Exception newException(String message, Throwable cause) { + return new Exception(message, cause); + } + + @Override + public RuntimeException newRuntimeException(String message, Throwable cause) { + return new RuntimeException(message, cause); + } + + @Override + public ReplyException newReplyException(ReplyFailure failureType, int failureCode, String msg) { + return new ReplyException(failureType, failureCode, msg); + } + +} diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java index 82e85f8..a35de45 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java @@ -1,11 +1,9 @@ package org.swisspush.redisques.handler; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.ReplyFailure; import io.vertx.core.json.JsonArray; @@ -16,14 +14,9 @@ import io.vertx.redis.client.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.swisspush.redisques.action.GetQueuesItemsCountAction; -import org.swisspush.redisques.exception.ExceptionFactory; -import org.swisspush.redisques.exception.NoStackReplyException; -import org.swisspush.redisques.exception.NoStacktraceException; import org.swisspush.redisques.performance.UpperBoundParallel; import org.swisspush.redisques.util.HandlerUtil; import org.swisspush.redisques.util.RedisProvider; -import org.swisspush.redisques.util.RedisquesAPI.QueueOperation; import java.util.Iterator; import java.util.List; @@ -32,6 +25,8 @@ import java.util.function.BiConsumer; import java.util.regex.Pattern; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; + import static java.lang.System.currentTimeMillis; import static org.swisspush.redisques.util.RedisquesAPI.ERROR; import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; @@ -40,6 +35,7 @@ import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; import static org.swisspush.redisques.util.RedisquesAPI.STATUS; + public class GetQueuesItemsCountHandler implements Handler> { private final Logger log = LoggerFactory.getLogger(GetQueuesItemsCountHandler.class); @@ -50,7 +46,7 @@ public class GetQueuesItemsCountHandler implements Handler private final String queuesPrefix; private final RedisProvider redisProvider; private final UpperBoundParallel upperBoundParallel; - private final ExceptionFactory exceptionFactory; + private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore redisRequestQuota; public GetQueuesItemsCountHandler( @@ -59,7 +55,7 @@ public GetQueuesItemsCountHandler( Optional filterPattern, String queuesPrefix, RedisProvider redisProvider, - ExceptionFactory exceptionFactory, + RedisQuesExceptionFactory exceptionFactory, Semaphore redisRequestQuota ) { this.vertx = vertx; @@ -75,11 +71,11 @@ public GetQueuesItemsCountHandler( @Override public void handle(AsyncResult handleQueues) { if (!handleQueues.succeeded()) { - log.warn("Concealed error", new Exception(handleQueues.cause())); + log.warn("Concealed error", exceptionFactory.newException(handleQueues.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); return; } - var ctx = new Object() { + var ctx = new Object(){ Redis redis; Iterator iter; List queues = HandlerUtil.filterByPattern(handleQueues.result(), filterPattern); @@ -96,26 +92,28 @@ public void handle(AsyncResult handleQueues) { "Too many simultaneous '" + GetQueuesItemsCountHandler.class.getSimpleName() + "' requests in progress")); return; } - redisProvider.connection().compose((Redis redis_) -> { + redisProvider.connection().compose((Redis redis_) -> { ctx.redis = redis_; ctx.queueLengths = new int[ctx.queues.size()]; ctx.iter = ctx.queues.iterator(); var p = Promise.promise(); upperBoundParallel.request(redisRequestQuota, null, new UpperBoundParallel.Mentor() { @Override public boolean runOneMore(BiConsumer onDone, Void unused) { + /*TODO rename 'onDone' to 'onLlenDone' or similar*/ if (ctx.iter.hasNext()) { String queue = ctx.iter.next(); int iNum = ctx.iNumberResult++; ctx.redis.send(Request.cmd(Command.LLEN, queuesPrefix + queue)).onSuccess((Response rsp) -> { ctx.queueLengths[iNum] = rsp.toInteger(); onDone.accept(null, null); - }).onFailure(ex -> { + }).onFailure((Throwable ex) -> { onDone.accept(ex, null); }); } return ctx.iter.hasNext(); } @Override public boolean onError(Throwable ex, Void ctx_) { + /*TODO use exceptionFactory*/ log.error("Unexpected queue length result", new Exception(ex)); event.reply(new JsonObject().put(STATUS, ERROR)); return false; @@ -125,11 +123,12 @@ public void handle(AsyncResult handleQueues) { } }); return p.future(); - }).compose((Void v) -> { + }).compose((Void v) -> { /*going to waste another threads time to produce those garbage objects*/ return vertx.executeBlocking((Promise workerPromise) -> { assert !Thread.currentThread().getName().toUpperCase().contains("EVENTLOOP"); long beginEpchMs = currentTimeMillis(); + JsonArray result = new JsonArray(); for (int i = 0; i < ctx.queueLengths.length; ++i) { String queueName = ctx.queues.get(i); @@ -146,8 +145,8 @@ public void handle(AsyncResult handleQueues) { }).onSuccess((JsonObject json) -> { log.trace("call event.reply(json)"); event.reply(json); - }).onFailure(ex -> { - log.warn("Redis: Failed to get queue length (error_c3gCAEFrAgChbAIAdhwC)", ex); + }).onFailure((Throwable ex) -> { + log.warn("Redis: Failed to get queue length.", exceptionFactory.newException(ex)); event.reply(new JsonObject().put(STATUS, ERROR)); }); } diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 31bd6f1..9f46233 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -21,6 +21,8 @@ import org.slf4j.LoggerFactory; import org.swisspush.redisques.QueueStatsService; import org.swisspush.redisques.QueueStatsService.GetQueueStatsMentor; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; +import org.swisspush.redisques.util.DequeueStatistic; import org.swisspush.redisques.util.DequeueStatisticCollector; import org.swisspush.redisques.util.QueueStatisticsCollector; import org.swisspush.redisques.util.RedisquesAPI; @@ -63,7 +65,7 @@ public class RedisquesHttpRequestHandler implements Handler { private static final String EMPTY_QUEUES_PARAM = "emptyQueues"; private static final String DELETED = "deleted"; - /** @deprecated about obsolete date formats */ + /** @deprecated about obsolete date formats */ @Deprecated private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); @@ -72,16 +74,21 @@ public class RedisquesHttpRequestHandler implements Handler { private final boolean enableQueueNameDecoding; private final int queueSpeedIntervalSec; private final QueueStatisticsCollector queueStatisticsCollector; + private final RedisQuesExceptionFactory exceptionFactory; private final QueueStatsService queueStatsService; private final GetQueueStatsMentor queueStatsMentor = new MyQueueStatsMentor(); - public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector, Semaphore queueStatsRequestLimit) { + public static void init( + Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector, + DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, + Semaphore queueStatsRequestQuota + ) { log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { - RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, - queueStatisticsCollector, dequeueStatisticCollector, queueStatsRequestLimit); + var handler = new RedisquesHttpRequestHandler( + vertx, modConfig, queueStatisticsCollector, dequeueStatisticCollector, + exceptionFactory, queueStatsRequestQuota); // in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default. HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true); vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> { @@ -110,8 +117,14 @@ private Result checkHttpAuthenticationConfiguration(RedisquesCo return Result.ok(false); } - private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector, Semaphore queueStatsRequestLimit) { + private RedisquesHttpRequestHandler( + Vertx vertx, + RedisquesConfiguration modConfig, + QueueStatisticsCollector queueStatisticsCollector, + DequeueStatisticCollector dequeueStatisticCollector, + RedisQuesExceptionFactory exceptionFactory, + Semaphore queueStatsRequestQuota + ) { this.vertx = vertx; this.router = Router.router(vertx); this.eventBus = vertx.eventBus(); @@ -120,8 +133,10 @@ private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfi this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding(); this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec(); this.queueStatisticsCollector = queueStatisticsCollector; - this.queueStatsService = new QueueStatsService(vertx, eventBus, redisquesAddress, - queueStatisticsCollector, dequeueStatisticCollector, queueStatsRequestLimit); + this.exceptionFactory = exceptionFactory; + this.queueStatsService = new QueueStatsService( + vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector, + exceptionFactory, queueStatsRequestQuota); final String prefix = modConfig.getHttpRequestHandlerPrefix(); @@ -590,11 +605,12 @@ void resumeJsonWriting() { @Override public void onError(Throwable ex, RoutingContext ctx) { if (!ctx.response().headWritten()) { - log.debug("Failed to serve queue stats", ex); + log.debug("TODO error handling {}", ctx.request().uri(), exceptionFactory.newException( + "Failed to serve queue stats", ex)); StatusCode rspCode = tryExtractStatusCode(ex, StatusCode.INTERNAL_SERVER_ERROR); respondWith(rspCode, ex.getMessage(), ctx.request()); } else { - log.warn("Response already written. MUST let it run into timeout now (error_qykCAJ8aAgCSfAIA1kMC): {}", ctx.request().uri(), ex); + log.warn("TODO error handling {}", ctx.request().uri(), exceptionFactory.newException(ex)); } } } diff --git a/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java b/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java index 1b89783..a275de9 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java +++ b/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java @@ -3,7 +3,7 @@ import io.vertx.core.Vertx; import org.slf4j.Logger; import org.swisspush.redisques.action.*; -import org.swisspush.redisques.exception.ExceptionFactory; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import java.util.List; import java.util.concurrent.Semaphore; @@ -22,17 +22,24 @@ public class QueueActionFactory { private final QueueStatisticsCollector queueStatisticsCollector; private final int memoryUsageLimitPercent; private final MemoryUsageProvider memoryUsageProvider; - private final ExceptionFactory exceptionFactory; + private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore getQueuesItemsCountRedisRequestQuota; private final RedisquesConfigurationProvider configurationProvider; - public QueueActionFactory(RedisProvider redisProvider, Vertx vertx, Logger log, - String queuesKey, String queuesPrefix, String consumersPrefix, - String locksKey, QueueStatisticsCollector queueStatisticsCollector, MemoryUsageProvider memoryUsageProvider, - RedisquesConfigurationProvider configurationProvider, - ExceptionFactory exceptionFactory, - Semaphore getQueuesItemsCountRedisRequestQuota + public QueueActionFactory( + RedisProvider redisProvider, + Vertx vertx, + Logger log, + String queuesKey, + String queuesPrefix, + String consumersPrefix, + String locksKey, + MemoryUsageProvider memoryUsageProvider, + QueueStatisticsCollector queueStatisticsCollector, + RedisQuesExceptionFactory exceptionFactory, + RedisquesConfigurationProvider configurationProvider, + Semaphore getQueuesItemsCountRedisRequestQuota ) { this.redisProvider = redisProvider; this.vertx = vertx; @@ -41,14 +48,13 @@ public QueueActionFactory(RedisProvider redisProvider, Vertx vertx, Logger log, this.queuesPrefix = queuesPrefix; this.consumersPrefix = consumersPrefix; this.locksKey = locksKey; - this.queueStatisticsCollector = queueStatisticsCollector; this.memoryUsageProvider = memoryUsageProvider; + this.queueStatisticsCollector = queueStatisticsCollector; + this.exceptionFactory = exceptionFactory; this.configurationProvider = configurationProvider; - this.address = configurationProvider.configuration().getAddress(); this.queueConfigurations = configurationProvider.configuration().getQueueConfigurations(); this.memoryUsageLimitPercent = configurationProvider.configuration().getMemoryUsageLimitPercent(); - this.exceptionFactory = exceptionFactory; this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota; } @@ -86,7 +92,8 @@ public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){ consumersPrefix, locksKey, queueConfigurations, queueStatisticsCollector, log); case getQueuesItemsCount: return new GetQueuesItemsCountAction(vertx, redisProvider, address, queuesKey, queuesPrefix, - consumersPrefix, locksKey, queueConfigurations, exceptionFactory, getQueuesItemsCountRedisRequestQuota, queueStatisticsCollector, log); + consumersPrefix, locksKey, queueConfigurations, exceptionFactory, + getQueuesItemsCountRedisRequestQuota, queueStatisticsCollector, log); case enqueue: return new EnqueueAction(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, queueStatisticsCollector, log, memoryUsageProvider, diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index 3d8ee6b..d5b7685 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -15,6 +15,7 @@ import io.vertx.redis.client.impl.types.NumberType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.exception.NoStacktraceException; import org.swisspush.redisques.performance.UpperBoundParallel; @@ -73,6 +74,7 @@ public class QueueStatisticsCollector { private final RedisProvider redisProvider; private final String queuePrefix; private final Vertx vertx; + private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore redisRequestQuota; private final UpperBoundParallel upperBoundParallel; @@ -80,12 +82,14 @@ public QueueStatisticsCollector( RedisProvider redisProvider, String queuePrefix, Vertx vertx, + RedisQuesExceptionFactory exceptionFactory, Semaphore redisRequestQuota, int speedIntervalSec ) { this.redisProvider = redisProvider; this.queuePrefix = queuePrefix; this.vertx = vertx; + this.exceptionFactory = exceptionFactory; this.redisRequestQuota = redisRequestQuota; this.upperBoundParallel = new UpperBoundParallel(vertx); speedStatisticsScheduler(speedIntervalSec); @@ -362,18 +366,18 @@ private void updateStatisticsInRedis(String queueName, BiConsumer { redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), ev -> { - onDone.accept(ev.failed() ? new NoStacktraceException("TODO_Wn0CANwoAgAZDwIA20gC error handling", ev.cause()) : null, null); + onDone.accept(ev.failed() ? exceptionFactory.newException("redisAPI.hset() failed", ev.cause()) : null, null); }); }) - .onFailure(ex -> onDone.accept(new NoStacktraceException("TODO_H30CACQ6AgAUWwIAoCYC error handling", ex), null)); + .onFailure(ex -> onDone.accept(exceptionFactory.newException("redisProvider.redis() failed", ex), null)); } else { redisProvider.redis() .onSuccess(redisAPI -> { redisAPI.hdel(List.of(STATSKEY, queueName), ev -> { - onDone.accept(ev.failed() ? new NoStacktraceException("TODO_Vn4CACQIAgDeLAIAUyEC error handling", ev.cause()) : null, null); + onDone.accept(ev.failed() ? exceptionFactory.newException("redisAPI.hdel() failed", ev.cause()) : null, null); }); }) - .onFailure(ex -> onDone.accept(new NoStacktraceException("TODO_Kn4CABJoAgBvaQIA7QQC error handling", ex), null)); + .onFailure(ex -> onDone.accept(exceptionFactory.newException("redisProvider.redis() failed", ex), null)); } } catch (RuntimeException ex) { onDone.accept(ex, null);