diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index 9ef73b023b..6d11747246 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -220,7 +220,7 @@ private Retryer createRetryer(final MessageBatch batch, .retryIfResult(result -> consuming && !result.succeeded() && shouldRetryOnClientError(retryClientErrors, result)) .withWaitStrategy(fixedWait(messageBackoff, MILLISECONDS)) .withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis - || Thread.currentThread().isInterrupted()) + || Thread.currentThread().isInterrupted() || !consuming) .withRetryListener(getRetryListener(result -> { batch.incrementRetryCounter(); markSendingResult(batch, result); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java index 576ff01b9e..5af2339de2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java @@ -68,15 +68,7 @@ public void run() { } finally { logger.info("Releasing consumer process thread of subscription {}", getSubscriptionName()); refreshHealthcheck(); - try { - stop(); - } catch (Exception exceptionWhileStopping) { - logger.error("An error occurred while stopping consumer process of subscription {}", - getSubscriptionName(), exceptionWhileStopping); - } finally { - onConsumerStopped.accept(getSubscriptionName()); - Thread.currentThread().setName("consumer-released-thread"); - } + stop(); } } @@ -115,7 +107,7 @@ private void process(Signal signal) { break; case STOP: logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType()); - this.running = false; + stop(); break; case RETRANSMIT: retransmit(signal); @@ -153,12 +145,24 @@ private void start(Signal signal) { } private void stop() { - long startTime = clock.millis(); - logger.info("Stopping consumer for subscription {}", getSubscriptionName()); + if (!running) { + return; + } + this.running = false; + try { + long startTime = clock.millis(); + logger.info("Stopping consumer for subscription {}", getSubscriptionName()); - consumer.tearDown(); + consumer.tearDown(); - logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime); + logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime); + } catch (Exception exceptionWhileStopping) { + logger.error("An error occurred while stopping consumer process of subscription {}", + getSubscriptionName(), exceptionWhileStopping); + } finally { + onConsumerStopped.accept(getSubscriptionName()); + Thread.currentThread().setName("consumer-released-thread"); + } } private void retransmit(Signal signal) {