Skip to content

Commit

Permalink
Stop using exception flow control to stop consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
simPod committed Jan 25, 2021
1 parent 7186416 commit da2224f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
62 changes: 30 additions & 32 deletions src/Clients/Consumer/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use RdKafka\Message;
use RdKafka\TopicPartition;
use SimPod\Kafka\Clients\Consumer\Exception\IncompatibleStatus;
use SimPod\Kafka\Common\Exception\Wakeup;

use function array_map;
use function rd_kafka_err2str;
Expand All @@ -30,6 +29,8 @@ final class KafkaConsumer extends RdKafkaConsumer

private LoggerInterface $logger;

private bool $shouldRun = true;

public function __construct(ConsumerConfig $config, ?LoggerInterface $logger = null)
{
$this->logger = $logger ?? new NullLogger();
Expand Down Expand Up @@ -144,50 +145,47 @@ function (Message $message) use (
}

/**
* @param callable(Message) : void $onSuccess
* @param callable() : void $onPartitionEof
* @param callable() : void $onTimedOut
* @param callable(Message): void $onSuccess
* @param callable() : void $onPartitionEof
* @param callable() : void $onTimedOut
*/
private function doStart(
int $timeoutMs,
callable $onSuccess,
?callable $onPartitionEof = null,
?callable $onTimedOut = null
): void {
$this->registerSignals();
$this->registerSignals($this->shouldRun);

try {
while (true) {
pcntl_signal_dispatch();
while ($this->shouldRun) {
$message = $this->consume($timeoutMs);

$message = $this->consume($timeoutMs);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$onSuccess($message);

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$onSuccess($message);

break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
if ($onPartitionEof !== null) {
$onPartitionEof();
}
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
if ($onPartitionEof !== null) {
$onPartitionEof();
}

$this->logger->info('No more messages. Will wait for more');
$this->logger->info('No more messages. Will wait for more');

break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
$this->logger->info(sprintf('Timed out with timeout %d ms', $timeoutMs));
if ($onTimedOut !== null) {
$onTimedOut();
}
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
$this->logger->info(sprintf('Timed out with timeout %d ms', $timeoutMs));
if ($onTimedOut !== null) {
$onTimedOut();
}

break;
default:
$exception = IncompatibleStatus::fromMessage($message);
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
}
break;
default:
$exception = IncompatibleStatus::fromMessage($message);
$this->logger->error($exception->getMessage(), ['exception' => $exception]);
}
} catch (Wakeup $wakeup) {

pcntl_signal_dispatch();
}

$this->degisterSignals();
Expand Down Expand Up @@ -229,6 +227,6 @@ public function shutdown(): void
{
$this->logger->info('Shutting down');

throw new Wakeup();
$this->shouldRun = false;
}
}
8 changes: 3 additions & 5 deletions src/Clients/Consumer/WithSignalControl.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

namespace SimPod\Kafka\Clients\Consumer;

use SimPod\Kafka\Common\Exception\Wakeup;

use function pcntl_signal;
use function Safe\pcntl_sigprocmask;

Expand All @@ -24,10 +22,10 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void
$config->set('internal.termination.signal', SIGIO);
}

private function registerSignals(): void
private function registerSignals(bool &$shouldRun): void
{
$terminationCallback = static function (): void {
throw new Wakeup();
$terminationCallback = static function () use (&$shouldRun): void {
$shouldRun = false;
};

pcntl_signal(SIGTERM, $terminationCallback);
Expand Down
9 changes: 0 additions & 9 deletions src/Common/Exception/Wakeup.php

This file was deleted.

0 comments on commit da2224f

Please sign in to comment.