From 3671a96c9fc66e0b38a449aa3d0ea56fe384f47f Mon Sep 17 00:00:00 2001 From: Douglas Medeiros Date: Mon, 8 Jul 2024 12:19:58 -0300 Subject: [PATCH] feat: create Mongo Stream Listener --- .docker/mongo/create-new-record.sh | 9 ++ .docker/mongo/reset.sh | 58 +-------- .gitignore | 3 +- bin/command.php | 17 --- bin/console.php | 4 +- composer.json | 11 +- docker-compose.yml | 1 + makefile | 19 ++- phpstan.neon.dist | 6 - src/Factories/CacheFactory.php | 20 +++ src/Factories/MongoFactory.php | 23 ++++ src/LocalCacheDriver.php | 121 ++++++++++++++++++ src/MongoStreamListenerCommand.php | 155 +++++++++++++++-------- tests/.gitkeep | 0 tests/Feature.php | 11 -- tests/LocalCacheDriverTest.php | 96 ++++++++++++++ tests/MongoStreamListenerCommandTest.php | 83 ++++++++++++ 17 files changed, 488 insertions(+), 149 deletions(-) create mode 100755 .docker/mongo/create-new-record.sh mode change 100644 => 100755 .docker/mongo/reset.sh delete mode 100644 bin/command.php mode change 100644 => 100755 bin/console.php delete mode 100644 phpstan.neon.dist create mode 100644 src/Factories/CacheFactory.php create mode 100644 src/Factories/MongoFactory.php create mode 100644 src/LocalCacheDriver.php delete mode 100644 tests/.gitkeep delete mode 100644 tests/Feature.php create mode 100644 tests/LocalCacheDriverTest.php create mode 100644 tests/MongoStreamListenerCommandTest.php diff --git a/.docker/mongo/create-new-record.sh b/.docker/mongo/create-new-record.sh new file mode 100755 index 0000000..c18bf56 --- /dev/null +++ b/.docker/mongo/create-new-record.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +mongosh "mongodb://mongo/default?replicaSet=rs0&readPreference=primary" < db.getCollection(c).drop()) EOF - - -echo "Creating admin user: root@root/admin" -mongosh --host mongo < Medeirosinacio\MongoTtlIndexChangeStream\MongoStreamListenerCommand::listen(), - default => throw new InvalidArgumentException('Invalid command'), -}; diff --git a/bin/console.php b/bin/console.php old mode 100644 new mode 100755 index 9dc3718..1f05e64 --- a/bin/console.php +++ b/bin/console.php @@ -1,12 +1,12 @@ #!/usr/bin/env php db.getCollection(c).drop());'" + docker-compose exec mongo bash -c "/usr/scripts/reset.sh" database-migrate: ## Migrate the database docker-compose exec mongo bash -c "find /migrations/ -name \"*.sh\" -exec {} \;" +database-seed: ## Create a new record in the database + docker-compose exec mongo bash -c "/usr/scripts/create-new-record.sh" + playground: ## Start a PHP playground dockerized environment @make check-docker @docker-compose exec php bash diff --git a/phpstan.neon.dist b/phpstan.neon.dist deleted file mode 100644 index 4f2a3b2..0000000 --- a/phpstan.neon.dist +++ /dev/null @@ -1,6 +0,0 @@ -parameters: - level: max - paths: - - src - - reportUnmatchedIgnoredErrors: true \ No newline at end of file diff --git a/src/Factories/CacheFactory.php b/src/Factories/CacheFactory.php new file mode 100644 index 0000000..db5cab5 --- /dev/null +++ b/src/Factories/CacheFactory.php @@ -0,0 +1,20 @@ + new LocalCacheDriver(), + default => throw new InvalidArgumentException('Invalid cache driver'), + }; + } +} diff --git a/src/Factories/MongoFactory.php b/src/Factories/MongoFactory.php new file mode 100644 index 0000000..c9304ba --- /dev/null +++ b/src/Factories/MongoFactory.php @@ -0,0 +1,23 @@ + 'root', + 'password' => 'root', + 'ssl' => false, + 'replicaSet' => 'rs0', + ] + ); + } +} diff --git a/src/LocalCacheDriver.php b/src/LocalCacheDriver.php new file mode 100644 index 0000000..c19d3f2 --- /dev/null +++ b/src/LocalCacheDriver.php @@ -0,0 +1,121 @@ +cacheDir = rtrim($cacheDir, DIRECTORY_SEPARATOR); + if (is_dir($this->cacheDir)) { + return; + } + if (mkdir($this->cacheDir, 0755, true)) { + return; + } + if (is_dir($this->cacheDir)) { + return; + } + throw new Exception('Failed to create cache directory.'); + } + + public function set(string $key, mixed $value, int|DateInterval|null $ttl = 3600): bool + { + $filePath = $this->getFilePath($key); + $data = [ + 'expires_at' => time() + $ttl, + 'value' => serialize($value), + ]; + + return (bool) file_put_contents($filePath, serialize($data)); + } + + public function get(string $key, mixed $default = null): mixed + { + $filePath = $this->getFilePath($key); + + if (!file_exists($filePath)) { + return null; + } + + $data = unserialize(file_get_contents($filePath)); + + if ($data['expires_at'] < time()) { + unlink($filePath); + + return null; + } + + return unserialize($data['value']); + } + + public function delete(string $key): bool + { + $filePath = $this->getFilePath($key); + + if (file_exists($filePath)) { + return unlink($filePath); + } + + return false; + } + + public function clear(): bool + { + $files = glob($this->cacheDir.'/*'); + + foreach ($files as $file) { + if (is_file($file)) { + unlink($file); + } + } + + return true; + } + + private function getFilePath(string $key): string + { + return $this->cacheDir.DIRECTORY_SEPARATOR.md5($key).'.cache'; + } + + public function getMultiple(iterable $keys, mixed $default = null): iterable + { + $items = []; + foreach ($keys as $key) { + $items[$key] = $this->get($key); + } + + return $items; + } + + public function setMultiple(iterable $values, DateInterval|int|null $ttl = null): bool + { + foreach ($values as $key => $value) { + $this->set($key, $value, is_int($ttl) ? $ttl : 3600); + } + + return true; + } + + public function deleteMultiple(iterable $keys): bool + { + foreach ($keys as $key) { + $this->delete($key); + } + + return true; + } + + public function has(string $key): bool + { + return $this->get($key) !== null; + } +} diff --git a/src/MongoStreamListenerCommand.php b/src/MongoStreamListenerCommand.php index fccfffb..cdda5ee 100644 --- a/src/MongoStreamListenerCommand.php +++ b/src/MongoStreamListenerCommand.php @@ -4,69 +4,124 @@ namespace Medeirosinacio\MongoTtlIndexChangeStream; -final class MongoStreamListenerCommand +use Exception; +use InvalidArgumentException; +use Medeirosinacio\MongoTtlIndexChangeStream\Factories\CacheFactory; +use Medeirosinacio\MongoTtlIndexChangeStream\Factories\MongoFactory; +use MongoDB\ChangeStream; +use MongoDB\Client; +use MongoDB\Collection; +use MongoDB\Model\BSONDocument; +use MongoException; +use Psr\SimpleCache\CacheInterface; + +final readonly class MongoStreamListenerCommand { - public static function listen() + public function __construct( + private Client $mongo, + private CacheInterface $cache, + ) {} + + public static function listen(): void { - $cache = make(CacheInterface::class); + (new self( + mongo: MongoFactory::make(), + cache: CacheFactory::make() + ) + )->run(); + } - $collection = (new Client( - 'mongodb://mongo:27017', - [ - 'username' => 'root', - 'password' => 'root', - 'ssl' => false, - 'replicaSet' => 'rs0', - ])) - ->selectDatabase('sentinela') - ->selectCollection('transaction_timeouts'); + public function run(): void + { + $collection = $this->getCollection('default', 'records'); + $this->printListeningMessage($collection); + + $resumeToken = $this->getResumeToken(); + + try { + $changeStream = $this->watchCollection($collection, $resumeToken); + + for ($changeStream->rewind(); true; $changeStream->next()) { + $event = $changeStream->current(); + if (!empty($event) && $event instanceof BSONDocument) { + $this->processEvent($event); + } + $resumeToken = $changeStream->getResumeToken(); + if (!is_object($resumeToken)) { + throw new Exception('Resume token was not found'); + } + $this->cacheResumeToken($resumeToken); + } + } catch (MongoException $e) { + printf("MongoDB Error: %s\n", $e->getMessage()); + } catch (InvalidArgumentException $e) { + printf("Cache Error: %s\n", $e->getMessage()); + } catch (Exception $e) { + printf("Error: %s\n", $e->getMessage()); + } + } + + private function getCollection(string $database, string $collection): Collection + { + return $this->mongo->selectDatabase($database)->selectCollection($collection); + } + + private function printListeningMessage(Collection $collection): void + { printf("Listening for changes in %s.%s\n", $collection->getDatabaseName(), $collection->getCollectionName()); + } + + private function getResumeToken(): ?object + { + $resumeToken = $this->cache->get('mongo_resume_token'); + if (is_object($resumeToken) && !empty($resumeToken->_data)) { + printf("Resume token: %s\n", $resumeToken->_data ?? '(null)'); + + return $resumeToken; + } - $resumeToken = $cache->get('mongo_resume_token'); - printf("Resume token: %s\n", $resumeToken?->_data ?? '(null)'); + return null; + } - $changeStream = $collection->watch( + private function watchCollection(Collection $collection, ?object $resumeToken = null): ChangeStream + { + return $collection->watch( [], [ 'fullDocument' => 'updateLookup', - 'fullDocumentBeforeChange' => 'required', - ... $resumeToken ? ['resumeAfter' => $resumeToken] : [] + 'fullDocumentBeforeChange' => 'whenAvailable', + ...$resumeToken ? ['resumeAfter' => $resumeToken] : [], ] ); + } - for ($changeStream->rewind(); true; $changeStream->next()) { - if (! $changeStream->valid()) { - continue; - } - $event = $changeStream->current(); - $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']); - $id = json_encode($event['documentKey']['_id']); - switch ($event['operationType']) { - case 'delete': - printf("Deleted document in %s with _id: %s\n\n", $ns, $id); - echo json_encode($event), "\n\n"; - break; - case 'insert': - printf("Inserted new document in %s\n", $ns); - echo json_encode($event['fullDocument']), "\n\n"; - break; - case 'replace': - printf("Replaced new document in %s with _id: %s\n", $ns, $id); - echo json_encode($event['fullDocument']), "\n\n"; - break; - case 'update': - printf("Updated document in %s with _id: %s\n", $ns, $id); - echo json_encode($event['updateDescription']), "\n\n"; - break; - } - - // token - $resumeToken = $changeStream->getResumeToken(); - if ($resumeToken === null) { - throw new \Exception('Resume token was not found'); - } - $cache->set('mongo_resume_token', $resumeToken); + private function processEvent(BSONDocument $event): void + { + $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']); // @phpstan-ignore-line + $id = json_encode($event['documentKey']['_id'], JSON_THROW_ON_ERROR); // @phpstan-ignore-line + switch ($event['operationType']) { + case 'delete': + printf("Deleted document in %s with _id: %s\n\n", $ns, $id); + echo json_encode($event, JSON_THROW_ON_ERROR), "\n\n"; + break; + case 'insert': + printf("Inserted new document in %s\n", $ns); + echo json_encode($event, JSON_THROW_ON_ERROR), "\n\n"; + break; + case 'replace': + printf("Replaced new document in %s with _id: %s\n", $ns, $id); + echo json_encode($event, JSON_THROW_ON_ERROR), "\n\n"; + break; + case 'update': + printf("Updated document in %s with _id: %s\n", $ns, $id); + echo json_encode($event, JSON_THROW_ON_ERROR), "\n\n"; + break; } } + + private function cacheResumeToken(object $resumeToken): void + { + $this->cache->set('mongo_resume_token', $resumeToken); + } } diff --git a/tests/.gitkeep b/tests/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/Feature.php b/tests/Feature.php deleted file mode 100644 index 550d79a..0000000 --- a/tests/Feature.php +++ /dev/null @@ -1,11 +0,0 @@ -foo(); - - expect($result)->toBe('bar'); -}); diff --git a/tests/LocalCacheDriverTest.php b/tests/LocalCacheDriverTest.php new file mode 100644 index 0000000..15677bf --- /dev/null +++ b/tests/LocalCacheDriverTest.php @@ -0,0 +1,96 @@ +cacheDir = __DIR__.'/cache'; + if (!is_dir($this->cacheDir)) { + mkdir($this->cacheDir, 0755, true); + } + $this->cacheDriver = new LocalCacheDriver($this->cacheDir); +}); + +afterEach(function () { + // Cleanup the temporary cache directory + $files = glob($this->cacheDir.'/*'); + foreach ($files as $file) { + if (is_file($file)) { + unlink($file); + } + } + rmdir($this->cacheDir); +}); + +test('it can set and get a cache item', function () { + $key = 'test_key'; + $value = 'test_value'; + + assertTrue($this->cacheDriver->set($key, $value)); + assertSame($value, $this->cacheDriver->get($key)); +}); + +test('it returns null for a non-existent cache item', function () { + assertNull($this->cacheDriver->get('non_existent_key')); +}); + +test('it can delete a cache item', function () { + $key = 'test_key'; + $value = 'test_value'; + + $this->cacheDriver->set($key, $value); + assertTrue($this->cacheDriver->delete($key)); + assertNull($this->cacheDriver->get($key)); +}); + +test('it can clear all cache items', function () { + $key1 = 'test_key1'; + $value1 = 'test_value1'; + $key2 = 'test_key2'; + $value2 = 'test_value2'; + + $this->cacheDriver->set($key1, $value1); + $this->cacheDriver->set($key2, $value2); + assertTrue($this->cacheDriver->clear()); + assertNull($this->cacheDriver->get($key1)); + assertNull($this->cacheDriver->get($key2)); +}); + +test('it can check if a cache item exists', function () { + $key = 'test_key'; + $value = 'test_value'; + + $this->cacheDriver->set($key, $value); + assertTrue($this->cacheDriver->has($key)); + $this->cacheDriver->delete($key); + assertFalse($this->cacheDriver->has($key)); +}); + +test('it can set and get multiple cache items', function () { + $items = [ + 'key1' => 'value1', + 'key2' => 'value2', + ]; + + assertTrue($this->cacheDriver->setMultiple($items)); + $retrievedItems = $this->cacheDriver->getMultiple(array_keys($items)); + assertSame($items, $retrievedItems); +}); + +test('it can delete multiple cache items', function () { + $items = [ + 'key1' => 'value1', + 'key2' => 'value2', + ]; + + $this->cacheDriver->setMultiple($items); + assertTrue($this->cacheDriver->deleteMultiple(array_keys($items))); + foreach ($items as $key => $value) { + assertNull($this->cacheDriver->get($key)); + } +}); diff --git a/tests/MongoStreamListenerCommandTest.php b/tests/MongoStreamListenerCommandTest.php new file mode 100644 index 0000000..6b2e287 --- /dev/null +++ b/tests/MongoStreamListenerCommandTest.php @@ -0,0 +1,83 @@ +mongo = Mockery::mock(Client::class); + $this->mongoDatabase = Mockery::mock(Database::class); + $this->cache = Mockery::mock(CacheInterface::class); + $this->listener = new MongoStreamListenerCommand($this->mongo, $this->cache); +}); + +afterEach(function () { + Mockery::close(); +}); + +test('it listens and processes changes', function () { + $collection = Mockery::mock(Collection::class); + $changeStream = Mockery::mock(ChangeStream::class); + $resumeToken = (object) ['_data' => 'resume_token_data']; + $event = new BSONDocument([ + 'ns' => ['db' => 'default', 'coll' => 'records'], + 'documentKey' => ['_id' => 'some_id'], + 'operationType' => 'insert', + 'fullDocument' => ['_id' => 'some_id', 'field' => 'value'], + ]); + + $this->mongo->expects('selectDatabase') + ->with('default') + ->andReturn($this->mongoDatabase); + + $this->mongoDatabase->expects('selectCollection') + ->with('records') + ->andReturn($collection); + + $collection->expects('getDatabaseName') + ->andReturn('default'); + + $collection->expects('getCollectionName') + ->andReturn('records'); + + $this->cache->expects('get') + ->with('mongo_resume_token') + ->andReturn($resumeToken); + + $collection->expects('watch') + ->andReturn($changeStream); + + $changeStream->expects('rewind'); + $changeStream->expects('next')->andReturnUsing(function () use ($changeStream) { + static $count = 0; + if ($count < 2) { + $count++; + } else { + $changeStream->valid = false; + } + }); + + $changeStream->expects('current') + ->andReturn($event, null); + + $changeStream->expects('getResumeToken') + ->andReturn($resumeToken); + + $this->cache->expects('set') + ->with('mongo_resume_token', $resumeToken) + ->andReturn(true); + + ob_start(); + $this->listener->run(); + $output = ob_get_clean(); + + assertTrue(str_contains($output, 'Inserted new document in default.records')); +});