Skip to content

Commit

Permalink
fix(redis): regularly ping connection, kill hub whenever ping fails
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek committed Jul 25, 2023
1 parent ca020f2 commit c767431
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
matrix:
transport:
- "php://localhost?size=10000"
- "redis://localhost?size=10000&trimInterval=0.5"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0"
steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
2 changes: 2 additions & 0 deletions src/Hub/Controller/PublishController.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ public function __invoke(ServerRequestInterface $request): ResponseInterface
throw new AccessDeniedHttpException('Your rights are not sufficient to publish this update.');
}

// @codeCoverageIgnoreStart
try {
await($this->hub->publish($update));
} catch (Throwable) {
throw new ServiceUnavailableHttpException();
}
// @codeCoverageIgnoreEnd

return new Response(201, body: (string) $update->message->id);
}
Expand Down
11 changes: 11 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Throwable;

use function array_key_exists;
use function sprintf;
Expand Down Expand Up @@ -99,4 +100,14 @@ public function getOption(string $name): mixed

return $this->options[$name];
}

/**
* @codeCoverageIgnore
*/
public static function die(Throwable $e): never
{
Loop::stop();

throw $e;
}
}
18 changes: 18 additions & 0 deletions src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
use Clue\React\Redis\Client;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Hub;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Generator;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Throwable;

use function React\Async\await;
use function React\Promise\resolve;
Expand Down Expand Up @@ -41,8 +43,24 @@ public function __construct(
'trimInterval' => 0.0,
'channel' => 'mercure',
'key' => 'mercureUpdates',
'pingInterval' => 2.0,
]);
$this->options = $resolver->resolve($options);
if ($this->options['pingInterval']) {
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping());
}
}

/**
* @codeCoverageIgnore
*/
private function ping(): void
{
try {
await($this->redis->ping()); // @phpstan-ignore-line
} catch (Throwable) {
Hub::die(new \RuntimeException('Redis connection closed unexpectedly.'));
}
}

public function subscribe(callable $callback): void
Expand Down
1 change: 1 addition & 0 deletions src/Hub/Transport/Redis/RedisTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public function create(string $dsn): TransportInterface
options: [
'size' => (int) max(0, $parsed->getParameter('size', 0)),
'trimInterval' => (float) max(0, $parsed->getParameter('trimInterval', 0.0)),
'pingInterval' => (float) max(0, $parsed->getParameter('pingInterval', 2.0)),
'channel' => (string) $parsed->getParameter('channel', 'mercure'),
'key' => (string) $parsed->getParameter('key', 'mercureUpdates'),
],
Expand Down

0 comments on commit c767431

Please sign in to comment.