From da2224f621039e6ef1cfd6244990c4d98094c899 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 23 Jan 2021 15:32:55 +0100 Subject: [PATCH] Stop using exception flow control to stop consumer --- src/Clients/Consumer/KafkaConsumer.php | 62 +++++++++++----------- src/Clients/Consumer/WithSignalControl.php | 8 ++- src/Common/Exception/Wakeup.php | 9 ---- 3 files changed, 33 insertions(+), 46 deletions(-) delete mode 100644 src/Common/Exception/Wakeup.php diff --git a/src/Clients/Consumer/KafkaConsumer.php b/src/Clients/Consumer/KafkaConsumer.php index cc64614..9dae147 100644 --- a/src/Clients/Consumer/KafkaConsumer.php +++ b/src/Clients/Consumer/KafkaConsumer.php @@ -11,7 +11,6 @@ use RdKafka\Message; use RdKafka\TopicPartition; use SimPod\Kafka\Clients\Consumer\Exception\IncompatibleStatus; -use SimPod\Kafka\Common\Exception\Wakeup; use function array_map; use function rd_kafka_err2str; @@ -30,6 +29,8 @@ final class KafkaConsumer extends RdKafkaConsumer private LoggerInterface $logger; + private bool $shouldRun = true; + public function __construct(ConsumerConfig $config, ?LoggerInterface $logger = null) { $this->logger = $logger ?? new NullLogger(); @@ -144,9 +145,9 @@ function (Message $message) use ( } /** - * @param callable(Message) : void $onSuccess - * @param callable() : void $onPartitionEof - * @param callable() : void $onTimedOut + * @param callable(Message): void $onSuccess + * @param callable() : void $onPartitionEof + * @param callable() : void $onTimedOut */ private function doStart( int $timeoutMs, @@ -154,40 +155,37 @@ private function doStart( ?callable $onPartitionEof = null, ?callable $onTimedOut = null ): void { - $this->registerSignals(); + $this->registerSignals($this->shouldRun); - try { - while (true) { - pcntl_signal_dispatch(); + while ($this->shouldRun) { + $message = $this->consume($timeoutMs); - $message = $this->consume($timeoutMs); + switch ($message->err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + $onSuccess($message); - switch ($message->err) { - case RD_KAFKA_RESP_ERR_NO_ERROR: - $onSuccess($message); - - break; - case RD_KAFKA_RESP_ERR__PARTITION_EOF: - if ($onPartitionEof !== null) { - $onPartitionEof(); - } + break; + case RD_KAFKA_RESP_ERR__PARTITION_EOF: + if ($onPartitionEof !== null) { + $onPartitionEof(); + } - $this->logger->info('No more messages. Will wait for more'); + $this->logger->info('No more messages. Will wait for more'); - break; - case RD_KAFKA_RESP_ERR__TIMED_OUT: - $this->logger->info(sprintf('Timed out with timeout %d ms', $timeoutMs)); - if ($onTimedOut !== null) { - $onTimedOut(); - } + break; + case RD_KAFKA_RESP_ERR__TIMED_OUT: + $this->logger->info(sprintf('Timed out with timeout %d ms', $timeoutMs)); + if ($onTimedOut !== null) { + $onTimedOut(); + } - break; - default: - $exception = IncompatibleStatus::fromMessage($message); - $this->logger->error($exception->getMessage(), ['exception' => $exception]); - } + break; + default: + $exception = IncompatibleStatus::fromMessage($message); + $this->logger->error($exception->getMessage(), ['exception' => $exception]); } - } catch (Wakeup $wakeup) { + + pcntl_signal_dispatch(); } $this->degisterSignals(); @@ -229,6 +227,6 @@ public function shutdown(): void { $this->logger->info('Shutting down'); - throw new Wakeup(); + $this->shouldRun = false; } } diff --git a/src/Clients/Consumer/WithSignalControl.php b/src/Clients/Consumer/WithSignalControl.php index 444908a..375656f 100644 --- a/src/Clients/Consumer/WithSignalControl.php +++ b/src/Clients/Consumer/WithSignalControl.php @@ -4,8 +4,6 @@ namespace SimPod\Kafka\Clients\Consumer; -use SimPod\Kafka\Common\Exception\Wakeup; - use function pcntl_signal; use function Safe\pcntl_sigprocmask; @@ -24,10 +22,10 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void $config->set('internal.termination.signal', SIGIO); } - private function registerSignals(): void + private function registerSignals(bool &$shouldRun): void { - $terminationCallback = static function (): void { - throw new Wakeup(); + $terminationCallback = static function () use (&$shouldRun): void { + $shouldRun = false; }; pcntl_signal(SIGTERM, $terminationCallback); diff --git a/src/Common/Exception/Wakeup.php b/src/Common/Exception/Wakeup.php deleted file mode 100644 index 1b7ebed..0000000 --- a/src/Common/Exception/Wakeup.php +++ /dev/null @@ -1,9 +0,0 @@ -