Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve #1645 | stop subscription immediately even when there are pen… #1755

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private Retryer<MessageSendingResult> createRetryer(final MessageBatch batch,
.retryIfResult(result -> consuming && !result.succeeded() && shouldRetryOnClientError(retryClientErrors, result))
.withWaitStrategy(fixedWait(messageBackoff, MILLISECONDS))
.withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis
|| Thread.currentThread().isInterrupted())
|| Thread.currentThread().isInterrupted() || !consuming)
.withRetryListener(getRetryListener(result -> {
batch.incrementRetryCounter();
markSendingResult(batch, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,7 @@ public void run() {
} finally {
logger.info("Releasing consumer process thread of subscription {}", getSubscriptionName());
refreshHealthcheck();
try {
stop();
} catch (Exception exceptionWhileStopping) {
logger.error("An error occurred while stopping consumer process of subscription {}",
getSubscriptionName(), exceptionWhileStopping);
} finally {
onConsumerStopped.accept(getSubscriptionName());
Thread.currentThread().setName("consumer-released-thread");
}
stop();
}
}

Expand Down Expand Up @@ -115,7 +107,7 @@ private void process(Signal signal) {
break;
case STOP:
logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
this.running = false;
stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkasperowicz are you sure we need this ? The stop() method is actually called in a finally block in the run() method. The main try block in the run() method is stopped when running.flag is set to false.

Copy link
Author

@pkasperowicz pkasperowicz Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the stop signal is received, we need to inform the consumer to stop consuming events.
stop() invokes consumer.tearDown() and it sets BatchConsumer.consuming = false.
It informs Retryer to stop retrying messages. Without that we need to wait for all reties to be finished and it is blocking that way the main loop.

Alternative approach might be extending the API of BatchConsumer to receive stop signal and mark it as some boolean e.g. stopSignalReceived. This flag would be informative for Retryer to stop the process of retries.
What do you think?

Copy link
Contributor

@faderskd faderskd Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I see it will work, can we avoid duplicated calling stop() method ? For example we can write method like this:

    private void stopWithExceptionHandling() {
        if (!running) {
            return;
        }
        try {
            stop();
        } catch (Exception exceptionWhileStopping) {
            logger.error("An error occurred while stopping consumer process of subscription {}",
                    getSubscriptionName(), exceptionWhileStopping);
        } finally {
            onConsumerStopped.accept(getSubscriptionName());
            Thread.currentThread().setName("consumer-released-thread");
        }
    }
``` and use it in both `finally` block of run method and in `process` method:
               

...
case STOP:
      logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
      stopWithExceptionHandling();
      this.running = false;
      break;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@faderskd ok changed it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkasperowicz thank you, +1

break;
case RETRANSMIT:
retransmit(signal);
Expand Down Expand Up @@ -153,12 +145,24 @@ private void start(Signal signal) {
}

private void stop() {
long startTime = clock.millis();
logger.info("Stopping consumer for subscription {}", getSubscriptionName());
if (!running) {
return;
}
this.running = false;
try {
long startTime = clock.millis();
logger.info("Stopping consumer for subscription {}", getSubscriptionName());

consumer.tearDown();
consumer.tearDown();

logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime);
logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime);
} catch (Exception exceptionWhileStopping) {
logger.error("An error occurred while stopping consumer process of subscription {}",
getSubscriptionName(), exceptionWhileStopping);
} finally {
onConsumerStopped.accept(getSubscriptionName());
Thread.currentThread().setName("consumer-released-thread");
}
}

private void retransmit(Signal signal) {
Expand Down
Loading