From 96c3fd2f1b7cbb6831c026e5ba69a77f522fb6c1 Mon Sep 17 00:00:00 2001 From: "pawel.kasperowicz" Date: Wed, 18 Oct 2023 15:50:57 +0200 Subject: [PATCH 1/2] resolve #1645 | stop subscription immediately even when there are pending retries --- .../allegro/tech/hermes/consumers/consumer/BatchConsumer.java | 2 +- .../hermes/consumers/supervisor/process/ConsumerProcess.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index 9ef73b023b..6d11747246 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -220,7 +220,7 @@ private Retryer createRetryer(final MessageBatch batch, .retryIfResult(result -> consuming && !result.succeeded() && shouldRetryOnClientError(retryClientErrors, result)) .withWaitStrategy(fixedWait(messageBackoff, MILLISECONDS)) .withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis - || Thread.currentThread().isInterrupted()) + || Thread.currentThread().isInterrupted() || !consuming) .withRetryListener(getRetryListener(result -> { batch.incrementRetryCounter(); markSendingResult(batch, result); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java index 576ff01b9e..d442972715 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java @@ -116,6 +116,7 @@ private void process(Signal signal) { case STOP: logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType()); this.running = false; + stop(); break; case RETRANSMIT: retransmit(signal); From f3e814b71f67c9dc3baf68e8575eca07237472c8 Mon Sep 17 00:00:00 2001 From: "pawel.kasperowicz" Date: Thu, 26 Oct 2023 16:54:07 +0200 Subject: [PATCH 2/2] resolve #1645 | refactor stop method not to be called twice when stopping the subscription --- .../supervisor/process/ConsumerProcess.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java index d442972715..5af2339de2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java @@ -68,15 +68,7 @@ public void run() { } finally { logger.info("Releasing consumer process thread of subscription {}", getSubscriptionName()); refreshHealthcheck(); - try { - stop(); - } catch (Exception exceptionWhileStopping) { - logger.error("An error occurred while stopping consumer process of subscription {}", - getSubscriptionName(), exceptionWhileStopping); - } finally { - onConsumerStopped.accept(getSubscriptionName()); - Thread.currentThread().setName("consumer-released-thread"); - } + stop(); } } @@ -115,7 +107,6 @@ private void process(Signal signal) { break; case STOP: logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType()); - this.running = false; stop(); break; case RETRANSMIT: @@ -154,12 +145,24 @@ private void start(Signal signal) { } private void stop() { - long startTime = clock.millis(); - logger.info("Stopping consumer for subscription {}", getSubscriptionName()); + if (!running) { + return; + } + this.running = false; + try { + long startTime = clock.millis(); + logger.info("Stopping consumer for subscription {}", getSubscriptionName()); - consumer.tearDown(); + consumer.tearDown(); - logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime); + logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime); + } catch (Exception exceptionWhileStopping) { + logger.error("An error occurred while stopping consumer process of subscription {}", + getSubscriptionName(), exceptionWhileStopping); + } finally { + onConsumerStopped.accept(getSubscriptionName()); + Thread.currentThread().setName("consumer-released-thread"); + } } private void retransmit(Signal signal) {