From dabdcfd98739d469c6e612536930beb4d75ddd4d Mon Sep 17 00:00:00 2001 From: Beno!t POLASZEK Date: Wed, 13 Mar 2024 15:50:17 +0100 Subject: [PATCH 1/3] fix: upgrade to redis-react ^3@dev and try to fix connection outages --- composer.json | 2 +- src/DependencyInjection/services.php | 4 -- src/Hub/Transport/Redis/RedisTransport.php | 24 +++++-- .../Transport/Redis/RedisTransportFactory.php | 7 +- .../Hub/Transport/Redis/ConnectionStub.php | 65 +++++++++++++++++++ .../Hub/Transport/Redis/RedisClientStub.php | 30 +++++---- .../Redis/RedisTransportFactoryTest.php | 16 ++--- 7 files changed, 113 insertions(+), 35 deletions(-) create mode 100755 tests/Unit/Hub/Transport/Redis/ConnectionStub.php diff --git a/composer.json b/composer.json index e0ae120..f79f46f 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "ext-iconv": "*", "bentools/querystring": "^1.1", "clue/framework-x": "dev-main#cfe017426d8acc7a92b5893a501728e034a873cc", - "clue/redis-react": "^2.5", + "clue/redis-react": "^3@dev", "doctrine/annotations": "^1.0", "lcobucci/jwt": "^4.1", "nyholm/dsn": "^2.0", diff --git a/src/DependencyInjection/services.php b/src/DependencyInjection/services.php index 3287727..f7f0bab 100644 --- a/src/DependencyInjection/services.php +++ b/src/DependencyInjection/services.php @@ -4,7 +4,6 @@ namespace Freddie\DependencyInjection; -use Clue\React\Redis\Factory; use DateTimeZone; use Evenement\EventEmitter; use Evenement\EventEmitterInterface; @@ -121,9 +120,6 @@ $services ->alias(EventEmitterInterface::class, EventEmitter::class); - $services - ->set(Factory::class); - $services ->set(Configuration::class) ->factory(service(ConfigurationFactory::class)) diff --git a/src/Hub/Transport/Redis/RedisTransport.php b/src/Hub/Transport/Redis/RedisTransport.php index a9cb864..a76818b 100644 --- a/src/Hub/Transport/Redis/RedisTransport.php +++ b/src/Hub/Transport/Redis/RedisTransport.php @@ -4,7 +4,8 @@ namespace Freddie\Hub\Transport\Redis; -use Clue\React\Redis\Client; +use Clue\React\Redis\Io\StreamingClient; +use Clue\React\Redis\RedisClient; use Evenement\EventEmitter; use Evenement\EventEmitterInterface; use Freddie\Hub\Hub; @@ -13,6 +14,8 @@ use Generator; use React\EventLoop\Loop; use React\Promise\PromiseInterface; +use ReflectionMethod; +use RuntimeException; use Symfony\Component\OptionsResolver\OptionsResolver; use function Freddie\maybeTimeout; @@ -31,8 +34,8 @@ final class RedisTransport implements TransportInterface * @param array $options */ public function __construct( - public readonly Client $subscriber, - public readonly Client $redis, + public readonly RedisClient $subscriber, + public readonly RedisClient $redis, private readonly RedisSerializer $serializer = new RedisSerializer(), private readonly EventEmitterInterface $eventEmitter = new EventEmitter(), array $options = [], @@ -48,17 +51,18 @@ public function __construct( ]); $this->options = $resolver->resolve($options); if ($this->options['pingInterval']) { - Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping()); + Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping($this->subscriber)); + Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping($this->redis)); } } /** * @codeCoverageIgnore */ - private function ping(): void + private function ping(RedisClient $client): void { /** @var PromiseInterface $ping */ - $ping = $this->redis->ping(); // @phpstan-ignore-line + $ping = $client->ping(); // @phpstan-ignore-line $ping = maybeTimeout($ping, $this->options['readTimeout']); $ping->then( onRejected: Hub::die(...), @@ -67,7 +71,15 @@ private function ping(): void public function subscribe(callable $callback): void { + static $methodToInvade; + $methodToInvade ??= new ReflectionMethod($this->subscriber, 'client'); $this->init(); + /** @var PromiseInterface $promise */ + $promise = $methodToInvade->invoke($this->subscriber); + $promise->then(function (StreamingClient $redis) { + $e = new RuntimeException('Redis connection was unexpectedly closed.'); + $redis->on('close', fn () => Hub::die($e)); + }); $this->eventEmitter->on('mercureUpdate', $callback); } diff --git a/src/Hub/Transport/Redis/RedisTransportFactory.php b/src/Hub/Transport/Redis/RedisTransportFactory.php index 1b63362..a335c53 100644 --- a/src/Hub/Transport/Redis/RedisTransportFactory.php +++ b/src/Hub/Transport/Redis/RedisTransportFactory.php @@ -4,9 +4,9 @@ namespace Freddie\Hub\Transport\Redis; +use Clue\React\Redis\RedisClient; use Freddie\Hub\Transport\TransportFactoryInterface; use Freddie\Hub\Transport\TransportInterface; -use Clue\React\Redis\Factory; use Nyholm\Dsn\DsnParser; use function max; @@ -14,7 +14,6 @@ final class RedisTransportFactory implements TransportFactoryInterface { public function __construct( - private Factory $factory = new Factory(), private RedisSerializer $serializer = new RedisSerializer(), ) { } @@ -28,8 +27,8 @@ public function supports(string $dsn): bool public function create(string $dsn): TransportInterface { $parsed = DsnParser::parse($dsn); - $redis = $this->factory->createLazyClient($dsn); - $subscriber = $this->factory->createLazyClient($dsn); // Create a 2nd, blocking connection to receive updates + $redis = $subscriber = new RedisClient($dsn); + $subscriber = new RedisClient($dsn); // Create a 2nd, blocking connection to receive updates return new RedisTransport( $subscriber, diff --git a/tests/Unit/Hub/Transport/Redis/ConnectionStub.php b/tests/Unit/Hub/Transport/Redis/ConnectionStub.php new file mode 100755 index 0000000..4edd535 --- /dev/null +++ b/tests/Unit/Hub/Transport/Redis/ConnectionStub.php @@ -0,0 +1,65 @@ +received .= $data; + + return true; + } + + public function end($data = null): void + { + } +} diff --git a/tests/Unit/Hub/Transport/Redis/RedisClientStub.php b/tests/Unit/Hub/Transport/Redis/RedisClientStub.php index 1485c00..d4c4478 100644 --- a/tests/Unit/Hub/Transport/Redis/RedisClientStub.php +++ b/tests/Unit/Hub/Transport/Redis/RedisClientStub.php @@ -5,11 +5,12 @@ namespace Freddie\Tests\Unit\Hub\Transport\Redis; use ArrayObject; -use Clue\React\Redis\Client; +use Clue\React\Redis\RedisClient; use Evenement\EventEmitter; use Evenement\EventEmitterInterface; use Pest\Exceptions\ShouldNotHappen; use React\Promise\PromiseInterface; +use React\Socket\ConnectorInterface; use function abs; use function array_splice; @@ -17,14 +18,21 @@ use function React\Async\async; use function React\Promise\resolve; -final class RedisClientStub implements Client +final class RedisClientStub extends RedisClient { public array $subscribedChannels = []; public function __construct( - public readonly ArrayObject $storage = new ArrayObject(), - private EventEmitterInterface $eventEmitter = new EventEmitter(), + public ArrayObject $storage = new ArrayObject(), + private readonly EventEmitterInterface $eventEmitter = new EventEmitter(), ) { + $connectorStub = new class implements ConnectorInterface { + public function connect($uri) + { + return resolve(new ConnectionStub()); + } + }; + parent::__construct('redis://127.0.0.1', $connectorStub); } public function subscribe(string $channel): void @@ -68,7 +76,7 @@ public function ltrim(string $key, int $from, int $to) ->then(fn (array $items) => $this->storage[$key] = $items); } - public function __call($name, $args) + public function __call($name, $args): PromiseInterface { throw new ShouldNotHappen(new \LogicException(__METHOD__)); } @@ -83,32 +91,32 @@ public function close(): void throw new ShouldNotHappen(new \LogicException(__METHOD__)); } - public function on($event, callable $listener) + public function on($event, callable $listener): void { $this->eventEmitter->on(...\func_get_args()); } - public function once($event, callable $listener) + public function once($event, callable $listener): void { $this->eventEmitter->once(...\func_get_args()); } - public function removeListener($event, callable $listener) + public function removeListener($event, callable $listener): void { $this->eventEmitter->removeListener(...\func_get_args()); } - public function removeAllListeners($event = null) + public function removeAllListeners($event = null): void { $this->eventEmitter->removeAllListeners(...\func_get_args()); } - public function listeners($event = null) + public function listeners($event = null): array { return $this->eventEmitter->listeners(...\func_get_args()); } - public function emit($event, array $arguments = []) + public function emit($event, array $arguments = []): void { $this->eventEmitter->emit(...\func_get_args()); } diff --git a/tests/Unit/Hub/Transport/Redis/RedisTransportFactoryTest.php b/tests/Unit/Hub/Transport/Redis/RedisTransportFactoryTest.php index 8ca3a65..26e68a9 100644 --- a/tests/Unit/Hub/Transport/Redis/RedisTransportFactoryTest.php +++ b/tests/Unit/Hub/Transport/Redis/RedisTransportFactoryTest.php @@ -4,10 +4,9 @@ namespace Freddie\Tests\Unit\Hub\Transport\Redis; +use Clue\React\Redis\RedisClient; use Freddie\Hub\Transport\Redis\RedisTransport; use Freddie\Hub\Transport\Redis\RedisTransportFactory; -use Clue\React\Redis\Client; -use Clue\React\Redis\Factory; it('supports Redis DSNs', function (string $dsn, bool $expected) { $factory = new RedisTransportFactory(); @@ -22,14 +21,13 @@ $factory = new RedisTransportFactory(); expect($factory->create($dsn))->toEqual($expected); })->with(function () { - $redisFactory = new Factory(); yield ['redis://localhost?foo=bar', new RedisTransport( - $redisFactory->createLazyClient('redis://localhost?foo=bar'), - $redisFactory->createLazyClient('redis://localhost?foo=bar'), + new RedisClient('redis://localhost?foo=bar'), + new RedisClient('redis://localhost?foo=bar'), )]; yield ['redis://localhost?size=1000&trimInterval=2.5', new RedisTransport( - $redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5'), - $redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5'), + new RedisClient('redis://localhost?size=1000&trimInterval=2.5'), + new RedisClient('redis://localhost?size=1000&trimInterval=2.5'), options: ['size' => 1000, 'trimInterval' => 2.5], )]; }); @@ -38,7 +36,7 @@ $factory = new RedisTransportFactory(); /** @var RedisTransport $transport */ $transport = $factory->create('redis://localhost?size=1000'); - expect($transport->redis)->toBeInstanceOf(Client::class); - expect($transport->subscriber)->toBeInstanceOf(Client::class); + expect($transport->redis)->toBeInstanceOf(RedisClient::class); + expect($transport->subscriber)->toBeInstanceOf(RedisClient::class); expect($transport->redis)->not()->toBe($transport->subscriber); }); From 3c36bfb75cb8f4ecfdda6f22966fd874d1b43558 Mon Sep 17 00:00:00 2001 From: Beno!t POLASZEK Date: Wed, 13 Mar 2024 15:53:25 +0100 Subject: [PATCH 2/3] style: fix --- src/Hub/Transport/Redis/RedisTransport.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Hub/Transport/Redis/RedisTransport.php b/src/Hub/Transport/Redis/RedisTransport.php index a76818b..fbf51e7 100644 --- a/src/Hub/Transport/Redis/RedisTransport.php +++ b/src/Hub/Transport/Redis/RedisTransport.php @@ -77,8 +77,9 @@ public function subscribe(callable $callback): void /** @var PromiseInterface $promise */ $promise = $methodToInvade->invoke($this->subscriber); $promise->then(function (StreamingClient $redis) { - $e = new RuntimeException('Redis connection was unexpectedly closed.'); - $redis->on('close', fn () => Hub::die($e)); + $redis->on('close', fn () => Hub::die( + new RuntimeException('Redis connection was unexpectedly closed.') + )); }); $this->eventEmitter->on('mercureUpdate', $callback); } From e25fe650ee326c106e2e99652282eef40ea2492a Mon Sep 17 00:00:00 2001 From: Beno!t POLASZEK Date: Wed, 13 Mar 2024 16:07:35 +0100 Subject: [PATCH 3/3] fix: duplicate var --- src/Hub/Transport/Redis/RedisTransportFactory.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Hub/Transport/Redis/RedisTransportFactory.php b/src/Hub/Transport/Redis/RedisTransportFactory.php index a335c53..693235b 100644 --- a/src/Hub/Transport/Redis/RedisTransportFactory.php +++ b/src/Hub/Transport/Redis/RedisTransportFactory.php @@ -27,7 +27,7 @@ public function supports(string $dsn): bool public function create(string $dsn): TransportInterface { $parsed = DsnParser::parse($dsn); - $redis = $subscriber = new RedisClient($dsn); + $redis = new RedisClient($dsn); $subscriber = new RedisClient($dsn); // Create a 2nd, blocking connection to receive updates return new RedisTransport(