Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Upgrade to redis-react ^3@dev and try to fix connection outages #28

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"ext-iconv": "*",
"bentools/querystring": "^1.1",
"clue/framework-x": "dev-main#cfe017426d8acc7a92b5893a501728e034a873cc",
"clue/redis-react": "^2.5",
"clue/redis-react": "^3@dev",
"doctrine/annotations": "^1.0",
"lcobucci/jwt": "^4.1",
"nyholm/dsn": "^2.0",
Expand Down
4 changes: 0 additions & 4 deletions src/DependencyInjection/services.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Freddie\DependencyInjection;

use Clue\React\Redis\Factory;
use DateTimeZone;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
Expand Down Expand Up @@ -121,9 +120,6 @@
$services
->alias(EventEmitterInterface::class, EventEmitter::class);

$services
->set(Factory::class);

$services
->set(Configuration::class)
->factory(service(ConfigurationFactory::class))
Expand Down
25 changes: 19 additions & 6 deletions src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

namespace Freddie\Hub\Transport\Redis;

use Clue\React\Redis\Client;
use Clue\React\Redis\Io\StreamingClient;
use Clue\React\Redis\RedisClient;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Hub;
Expand All @@ -13,6 +14,8 @@
use Generator;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use ReflectionMethod;
use RuntimeException;
use Symfony\Component\OptionsResolver\OptionsResolver;

use function Freddie\maybeTimeout;
Expand All @@ -31,8 +34,8 @@ final class RedisTransport implements TransportInterface
* @param array<string, mixed> $options
*/
public function __construct(
public readonly Client $subscriber,
public readonly Client $redis,
public readonly RedisClient $subscriber,
public readonly RedisClient $redis,
private readonly RedisSerializer $serializer = new RedisSerializer(),
private readonly EventEmitterInterface $eventEmitter = new EventEmitter(),
array $options = [],
Expand All @@ -48,17 +51,18 @@ public function __construct(
]);
$this->options = $resolver->resolve($options);
if ($this->options['pingInterval']) {
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping());
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping($this->subscriber));
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping($this->redis));
}
}

/**
* @codeCoverageIgnore
*/
private function ping(): void
private function ping(RedisClient $client): void
{
/** @var PromiseInterface $ping */
$ping = $this->redis->ping(); // @phpstan-ignore-line
$ping = $client->ping(); // @phpstan-ignore-line
$ping = maybeTimeout($ping, $this->options['readTimeout']);
$ping->then(
onRejected: Hub::die(...),
Expand All @@ -67,7 +71,16 @@ private function ping(): void

public function subscribe(callable $callback): void
{
static $methodToInvade;
$methodToInvade ??= new ReflectionMethod($this->subscriber, 'client');
$this->init();
/** @var PromiseInterface<StreamingClient> $promise */
$promise = $methodToInvade->invoke($this->subscriber);
$promise->then(function (StreamingClient $redis) {
$redis->on('close', fn () => Hub::die(
new RuntimeException('Redis connection was unexpectedly closed.')
));
});
$this->eventEmitter->on('mercureUpdate', $callback);
}

Expand Down
7 changes: 3 additions & 4 deletions src/Hub/Transport/Redis/RedisTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@

namespace Freddie\Hub\Transport\Redis;

use Clue\React\Redis\RedisClient;
use Freddie\Hub\Transport\TransportFactoryInterface;
use Freddie\Hub\Transport\TransportInterface;
use Clue\React\Redis\Factory;
use Nyholm\Dsn\DsnParser;

use function max;

final class RedisTransportFactory implements TransportFactoryInterface
{
public function __construct(
private Factory $factory = new Factory(),
private RedisSerializer $serializer = new RedisSerializer(),
) {
}
Expand All @@ -28,8 +27,8 @@ public function supports(string $dsn): bool
public function create(string $dsn): TransportInterface
{
$parsed = DsnParser::parse($dsn);
$redis = $this->factory->createLazyClient($dsn);
$subscriber = $this->factory->createLazyClient($dsn); // Create a 2nd, blocking connection to receive updates
$redis = new RedisClient($dsn);
$subscriber = new RedisClient($dsn); // Create a 2nd, blocking connection to receive updates

return new RedisTransport(
$subscriber,
Expand Down
65 changes: 65 additions & 0 deletions tests/Unit/Hub/Transport/Redis/ConnectionStub.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

declare(strict_types=1);

namespace Freddie\Tests\Unit\Hub\Transport\Redis;

use Evenement\EventEmitterTrait;
use React\Socket\ConnectionInterface;
use React\Stream\WritableStreamInterface;

final class ConnectionStub implements ConnectionInterface
{
use EventEmitterTrait;

public string $received = '';

public function getRemoteAddress()
{
return '';
}

public function getLocalAddress()
{
return '';
}

public function isReadable()
{
return true;
}

public function pause(): void
{
}

public function resume(): void
{
}

// @phpstan-ignore-next-line
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return $dest;
}

public function close(): void
{
}

public function isWritable(): bool
{
return true;
}

public function write($data): bool
{
$this->received .= $data;

return true;
}

public function end($data = null): void
{
}
}
30 changes: 19 additions & 11 deletions tests/Unit/Hub/Transport/Redis/RedisClientStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,34 @@
namespace Freddie\Tests\Unit\Hub\Transport\Redis;

use ArrayObject;
use Clue\React\Redis\Client;
use Clue\React\Redis\RedisClient;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Pest\Exceptions\ShouldNotHappen;
use React\Promise\PromiseInterface;
use React\Socket\ConnectorInterface;

use function abs;
use function array_splice;
use function count;
use function React\Async\async;
use function React\Promise\resolve;

final class RedisClientStub implements Client
final class RedisClientStub extends RedisClient
{
public array $subscribedChannels = [];

public function __construct(
public readonly ArrayObject $storage = new ArrayObject(),
private EventEmitterInterface $eventEmitter = new EventEmitter(),
public ArrayObject $storage = new ArrayObject(),
private readonly EventEmitterInterface $eventEmitter = new EventEmitter(),
) {
$connectorStub = new class implements ConnectorInterface {
public function connect($uri)
{
return resolve(new ConnectionStub());
}
};
parent::__construct('redis://127.0.0.1', $connectorStub);
}

public function subscribe(string $channel): void
Expand Down Expand Up @@ -68,7 +76,7 @@ public function ltrim(string $key, int $from, int $to)
->then(fn (array $items) => $this->storage[$key] = $items);
}

public function __call($name, $args)
public function __call($name, $args): PromiseInterface
{
throw new ShouldNotHappen(new \LogicException(__METHOD__));
}
Expand All @@ -83,32 +91,32 @@ public function close(): void
throw new ShouldNotHappen(new \LogicException(__METHOD__));
}

public function on($event, callable $listener)
public function on($event, callable $listener): void
{
$this->eventEmitter->on(...\func_get_args());
}

public function once($event, callable $listener)
public function once($event, callable $listener): void
{
$this->eventEmitter->once(...\func_get_args());
}

public function removeListener($event, callable $listener)
public function removeListener($event, callable $listener): void
{
$this->eventEmitter->removeListener(...\func_get_args());
}

public function removeAllListeners($event = null)
public function removeAllListeners($event = null): void
{
$this->eventEmitter->removeAllListeners(...\func_get_args());
}

public function listeners($event = null)
public function listeners($event = null): array
{
return $this->eventEmitter->listeners(...\func_get_args());
}

public function emit($event, array $arguments = [])
public function emit($event, array $arguments = []): void
{
$this->eventEmitter->emit(...\func_get_args());
}
Expand Down
16 changes: 7 additions & 9 deletions tests/Unit/Hub/Transport/Redis/RedisTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

namespace Freddie\Tests\Unit\Hub\Transport\Redis;

use Clue\React\Redis\RedisClient;
use Freddie\Hub\Transport\Redis\RedisTransport;
use Freddie\Hub\Transport\Redis\RedisTransportFactory;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;

it('supports Redis DSNs', function (string $dsn, bool $expected) {
$factory = new RedisTransportFactory();
Expand All @@ -22,14 +21,13 @@
$factory = new RedisTransportFactory();
expect($factory->create($dsn))->toEqual($expected);
})->with(function () {
$redisFactory = new Factory();
yield ['redis://localhost?foo=bar', new RedisTransport(
$redisFactory->createLazyClient('redis://localhost?foo=bar'),
$redisFactory->createLazyClient('redis://localhost?foo=bar'),
new RedisClient('redis://localhost?foo=bar'),
new RedisClient('redis://localhost?foo=bar'),
)];
yield ['redis://localhost?size=1000&trimInterval=2.5', new RedisTransport(
$redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5'),
$redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5'),
new RedisClient('redis://localhost?size=1000&trimInterval=2.5'),
new RedisClient('redis://localhost?size=1000&trimInterval=2.5'),
options: ['size' => 1000, 'trimInterval' => 2.5],
)];
});
Expand All @@ -38,7 +36,7 @@
$factory = new RedisTransportFactory();
/** @var RedisTransport $transport */
$transport = $factory->create('redis://localhost?size=1000');
expect($transport->redis)->toBeInstanceOf(Client::class);
expect($transport->subscriber)->toBeInstanceOf(Client::class);
expect($transport->redis)->toBeInstanceOf(RedisClient::class);
expect($transport->subscriber)->toBeInstanceOf(RedisClient::class);
expect($transport->redis)->not()->toBe($transport->subscriber);
});
Loading