Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve #1645 | stop subscription immediately even when there are pen… #1755

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

pkasperowicz
Copy link

…ding retries

szczygiel-m
szczygiel-m previously approved these changes Oct 19, 2023
@@ -116,6 +116,7 @@ private void process(Signal signal) {
case STOP:
logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
this.running = false;
stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkasperowicz are you sure we need this ? The stop() method is actually called in a finally block in the run() method. The main try block in the run() method is stopped when running.flag is set to false.

Copy link
Author

@pkasperowicz pkasperowicz Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the stop signal is received, we need to inform the consumer to stop consuming events.
stop() invokes consumer.tearDown() and it sets BatchConsumer.consuming = false.
It informs Retryer to stop retrying messages. Without that we need to wait for all reties to be finished and it is blocking that way the main loop.

Alternative approach might be extending the API of BatchConsumer to receive stop signal and mark it as some boolean e.g. stopSignalReceived. This flag would be informative for Retryer to stop the process of retries.
What do you think?

Copy link
Contributor

@faderskd faderskd Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I see it will work, can we avoid duplicated calling stop() method ? For example we can write method like this:

    private void stopWithExceptionHandling() {
        if (!running) {
            return;
        }
        try {
            stop();
        } catch (Exception exceptionWhileStopping) {
            logger.error("An error occurred while stopping consumer process of subscription {}",
                    getSubscriptionName(), exceptionWhileStopping);
        } finally {
            onConsumerStopped.accept(getSubscriptionName());
            Thread.currentThread().setName("consumer-released-thread");
        }
    }
``` and use it in both `finally` block of run method and in `process` method:
               

...
case STOP:
      logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
      stopWithExceptionHandling();
      this.running = false;
      break;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@faderskd ok changed it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkasperowicz thank you, +1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants