diff --git a/packages/Ecotone/src/Messaging/Support/ConcurrencyException.php b/packages/Ecotone/src/Messaging/Support/ConcurrencyException.php new file mode 100644 index 000000000..2e952b12d --- /dev/null +++ b/packages/Ecotone/src/Messaging/Support/ConcurrencyException.php @@ -0,0 +1,15 @@ +eventType = $eventType; - $this->payload = $payload; - $this->metadata = $metadata; + Assert::notNull($payload, 'Event can not be null for ' . $eventName); } public static function create(object $event, array $metadata = []) @@ -32,9 +28,9 @@ public static function createWithType(string $eventType, array|object $event, ar return new self($eventType, $event, $metadata); } - public function getEventType(): string + public function getEventName(): string { - return $this->eventType; + return $this->eventName; } public function getPayload(): array|object diff --git a/packages/PdoEventSourcing/src/EventMapper.php b/packages/PdoEventSourcing/src/EventMapper.php index 4760f0f96..b267260d7 100644 --- a/packages/PdoEventSourcing/src/EventMapper.php +++ b/packages/PdoEventSourcing/src/EventMapper.php @@ -64,7 +64,7 @@ public function mapNameToEventType(string $name): string public function mapEventToName(Event $event): string { - $type = $event->getEventType(); + $type = $event->getEventName(); if (array_key_exists($type, $this->eventToNameMapping)) { return $this->eventToNameMapping[$type]; } diff --git a/packages/PdoEventSourcing/src/EventSourcingConfiguration.php b/packages/PdoEventSourcing/src/EventSourcingConfiguration.php index f29004ffe..e895c5441 100644 --- a/packages/PdoEventSourcing/src/EventSourcingConfiguration.php +++ b/packages/PdoEventSourcing/src/EventSourcingConfiguration.php @@ -25,7 +25,7 @@ class EventSourcingConfiguration extends BaseEventSourcingConfiguration private string $eventStoreReferenceName; private string $projectManagerReferenceName; private string $connectionReferenceName; - private string $persistenceStrategy = LazyProophEventStore::SINGLE_STREAM_PERSISTENCE; + private string $persistenceStrategy = LazyProophEventStore::PARTITION_STREAM_PERSISTENCE; private ?PersistenceStrategy $customPersistenceStrategyInstance = null; private bool $isInMemory = false; private ?InMemoryEventStore $inMemoryEventStore = null; @@ -62,6 +62,7 @@ public static function createInMemory(): static /** * This work as simple stream strategy, however put constraints on aggregate_id, aggregate_version, aggregate_type being present + * @deprecated Ecotone 2.0 will be removed in favour of partition stream persistence strategy */ public function withSingleStreamPersistenceStrategy(): static { @@ -70,6 +71,13 @@ public function withSingleStreamPersistenceStrategy(): static return $this; } + public function withPartitionStreamPersistenceStrategy(): static + { + $this->persistenceStrategy = LazyProophEventStore::SINGLE_STREAM_PERSISTENCE; + + return $this; + } + /** * Aggregate_id becomes a stream and each stream is separate table. * Be careful this create a lot of database tables. @@ -93,7 +101,7 @@ public function withSimpleStreamPersistenceStrategy(): static public function withPersistenceStrategyFor(string $streamName, string $strategy): self { - $allowedStrategies = [LazyProophEventStore::SIMPLE_STREAM_PERSISTENCE, LazyProophEventStore::SINGLE_STREAM_PERSISTENCE, LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE]; + $allowedStrategies = [LazyProophEventStore::SIMPLE_STREAM_PERSISTENCE, LazyProophEventStore::SINGLE_STREAM_PERSISTENCE, LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE, LazyProophEventStore::PARTITION_STREAM_PERSISTENCE]; Assert::oneOf( $strategy, @@ -165,11 +173,6 @@ public function isInitializedOnStart(): bool return $this->initializeEventStoreOnStart; } - public function isUsingSingleStreamStrategy(): bool - { - return $this->getPersistenceStrategy() === LazyProophEventStore::SINGLE_STREAM_PERSISTENCE; - } - public function isUsingAggregateStreamStrategyFor(string $streamName): bool { return diff --git a/packages/PdoEventSourcing/src/EventStore.php b/packages/PdoEventSourcing/src/EventStore.php index 231f32c44..e7d5e41d9 100644 --- a/packages/PdoEventSourcing/src/EventStore.php +++ b/packages/PdoEventSourcing/src/EventStore.php @@ -11,10 +11,14 @@ interface EventStore { /** + * Creates new Stream with Metadata and appends events to it + * * @param Event[]|object[]|array[] $streamEvents */ public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void; /** + * Appends events to existing Stream, or creates one and then appends events if it does not exists + * * @param Event[]|object[]|array[] $streamEvents */ public function appendTo(string $streamName, array $streamEvents): void; diff --git a/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php b/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php index 6635f1fe7..44ca9d0aa 100644 --- a/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php +++ b/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php @@ -9,12 +9,14 @@ use Ecotone\EventSourcing\EventSourcingConfiguration; use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMariaDbSimpleStreamStrategy; use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMysqlSimpleStreamStrategy; +use Ecotone\Messaging\Support\ConcurrencyException; use Ecotone\Messaging\Support\InvalidArgumentException; use Interop\Queue\ConnectionFactory; use Iterator; use PDO; use Prooph\Common\Messaging\MessageConverter; use Prooph\EventStore\EventStore; +use Prooph\EventStore\Exception\ConcurrencyException as ProophConcurrencyException; use Prooph\EventStore\Exception\StreamNotFound; use Prooph\EventStore\Metadata\MetadataMatcher; use Prooph\EventStore\Pdo\MariaDbEventStore; @@ -49,7 +51,15 @@ class LazyProophEventStore implements EventStore public const EVENT_STORE_TYPE_MARIADB = 'mariadb'; public const EVENT_STORE_TYPE_IN_MEMORY = 'inMemory'; + /** + * Partition persistence strategy + * @deprecated Ecotone 2.0 will be removed in favour of partition stream persistence strategy + */ public const SINGLE_STREAM_PERSISTENCE = 'single'; + /** + * Same as single stream strategy. + */ + public const PARTITION_STREAM_PERSISTENCE = 'partition'; public const AGGREGATE_STREAM_PERSISTENCE = 'aggregate'; public const SIMPLE_STREAM_PERSISTENCE = 'simple'; public const CUSTOM_STREAM_PERSISTENCE = 'custom'; @@ -57,6 +67,7 @@ class LazyProophEventStore implements EventStore public const AGGREGATE_VERSION = '_aggregate_version'; public const AGGREGATE_TYPE = '_aggregate_type'; public const AGGREGATE_ID = '_aggregate_id'; + const PERSISTENCE_STRATEGY_METADATA = '_persistence'; /** @var EventStore[] */ private array $initializedEventStore = []; @@ -122,7 +133,11 @@ public function updateStreamMetadata(StreamName $streamName, array $newMetadata) public function create(Stream $stream): void { - $this->getEventStore($stream->streamName())->create($stream); + try { + $this->getEventStore($stream->streamName(), $stream->metadata()[self::PERSISTENCE_STRATEGY_METADATA] ?? null)->create($stream); + } catch (ProophConcurrencyException $exception) { + throw new ConcurrencyException($exception->getMessage(), $exception->getCode(), $exception); + } $this->ensuredExistingStreams[$this->getContextName()][$stream->streamName()->toString()] = true; } @@ -135,6 +150,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void $this->getEventStore($streamName)->appendTo($streamName, $streamEvents); } catch (StreamNotFound) { $this->create(new Stream($streamName, $streamEvents, [])); + } catch (ProophConcurrencyException $exception) { + throw new ConcurrencyException($exception->getMessage(), $exception->getCode(), $exception); } } } @@ -171,7 +188,7 @@ public function prepareEventStore(): void $this->initializated[$connectionName] = true; } - public function getEventStore(?StreamName $streamName = null): EventStore + public function getEventStore(?StreamName $streamName = null, string|null $streamStrategy = null): EventStore { $contextName = $this->getContextName($streamName); if (isset($this->initializedEventStore[$contextName])) { @@ -188,9 +205,9 @@ public function getEventStore(?StreamName $streamName = null): EventStore $eventStoreType = $this->getEventStoreType(); $persistenceStrategy = match ($eventStoreType) { - self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamName), - self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamName), - self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamName), + self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamName, $streamStrategy), + self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamName, $streamStrategy), + self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamName, $streamStrategy), default => throw InvalidArgumentException::create('Unexpected match value ' . $eventStoreType) }; @@ -225,31 +242,37 @@ public function getEventStore(?StreamName $streamName = null): EventStore return $eventStore; } - private function getMysqlPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy + private function getMysqlPersistenceStrategyFor(?StreamName $streamName = null, ?string $forcedStrategy = null): PersistenceStrategy { - return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) { + $persistenceStrategy = $forcedStrategy ?? $this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName); + + return match ($persistenceStrategy) { self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlAggregateStreamStrategy($this->messageConverter), - self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlSingleStreamStrategy($this->messageConverter), + self::PARTITION_STREAM_PERSISTENCE, self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlSingleStreamStrategy($this->messageConverter), self::SIMPLE_STREAM_PERSISTENCE => new InterlopMysqlSimpleStreamStrategy($this->messageConverter), self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(), }; } - private function getMariaDbPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy + private function getMariaDbPersistenceStrategyFor(?StreamName $streamName = null, ?string $forcedStrategy = null): PersistenceStrategy { - return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) { + $persistenceStrategy = $forcedStrategy ?? $this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName); + + return match ($persistenceStrategy) { self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbAggregateStreamStrategy($this->messageConverter), - self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbSingleStreamStrategy($this->messageConverter), + self::PARTITION_STREAM_PERSISTENCE, self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbSingleStreamStrategy($this->messageConverter), self::SIMPLE_STREAM_PERSISTENCE => new InterlopMariaDbSimpleStreamStrategy($this->messageConverter), self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(), }; } - private function getPostgresPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy + private function getPostgresPersistenceStrategyFor(?StreamName $streamName = null, ?string $forcedStrategy = null): PersistenceStrategy { - return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) { + $persistenceStrategy = $forcedStrategy ?? $this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName); + + return match ($persistenceStrategy) { self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresAggregateStreamStrategy($this->messageConverter), - self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSingleStreamStrategy($this->messageConverter), + self::PARTITION_STREAM_PERSISTENCE, self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSingleStreamStrategy($this->messageConverter), self::SIMPLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSimpleStreamStrategy($this->messageConverter), self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(), }; diff --git a/packages/PdoEventSourcing/tests/Integration/EventStreamTest.php b/packages/PdoEventSourcing/tests/Integration/EventStreamTest.php new file mode 100644 index 000000000..d43af4aa3 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Integration/EventStreamTest.php @@ -0,0 +1,301 @@ +getConnection()), new TicketEventConverter(), DbalConnectionFactory::class => $this->getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true + ); + + /** @var EventStore $eventStore */ + $eventStore = $ecotone->getGateway(EventStore::class); + + $streamName = Uuid::uuid4()->toString(); + $eventStore->appendTo( + $streamName, + [ + Event::create( + $event = new TicketWasRegistered('123', 'Johnny', 'alert'), + $metadata = [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + 'executor' => 'johnny', + ] + ) + ] + ); + + $events = $eventStore->load($streamName); + + $this->assertCount(1, $events); + $this->assertEquals($event, $events[0]->getPayload()); + foreach ($metadata as $key => $value) { + $this->assertEquals($value, $events[0]->getMetadata()[$key]); + } + } + + public function test_storing_for_simple_stream() + { + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + containerOrAvailableServices: [new InProgressTicketList($this->getConnection()), new TicketEventConverter(), DbalConnectionFactory::class => $this->getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true + ); + + /** @var EventStore $eventStore */ + $eventStore = $ecotone->getGateway(EventStore::class); + + $streamName = Uuid::uuid4()->toString(); + $eventStore->create($streamName, streamMetadata: [ + LazyProophEventStore::PERSISTENCE_STRATEGY_METADATA => 'simple', + ]); + $eventStore->appendTo( + $streamName, + [ + $eventOne = new TicketWasRegistered('123', 'Johnny', 'alert'), + ] + ); + $eventStore->appendTo( + $streamName, + [ + Event::create( + $eventTwo = new TicketWasClosed('123'), + ) + ] + ); + + $events = $eventStore->load($streamName); + + $this->assertCount(2, $events); + $this->assertEquals($eventOne, $events[0]->getPayload()); + $this->assertEquals($eventTwo, $events[1]->getPayload()); + } + + public function test_storing_same_event_for_simple_stream() + { + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + containerOrAvailableServices: [new InProgressTicketList($this->getConnection()), new TicketEventConverter(), DbalConnectionFactory::class => $this->getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true + ); + + /** @var EventStore $eventStore */ + $eventStore = $ecotone->getGateway(EventStore::class); + + $streamName = Uuid::uuid4()->toString(); + $eventStore->create($streamName, streamMetadata: [ + LazyProophEventStore::PERSISTENCE_STRATEGY_METADATA => 'simple', + ]); + $eventStore->appendTo( + $streamName, + [ + Event::create( + new TicketWasRegistered('123', 'Johnny', 'alert'), + [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + ] + ) + ] + ); + + $eventStore->appendTo( + $streamName, + [ + Event::create( + new TicketWasRegistered('123', 'Johnny', 'alert'), + [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + ] + ) + ] + ); + + $events = $eventStore->load($streamName); + $this->assertCount(2, $events); + } + + public function test_storing_same_event_for_partioned_stream() + { + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + containerOrAvailableServices: [new InProgressTicketList($this->getConnection()), new TicketEventConverter(), DbalConnectionFactory::class => $this->getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true + ); + + /** @var EventStore $eventStore */ + $eventStore = $ecotone->getGateway(EventStore::class); + + $streamName = Uuid::uuid4()->toString(); + $eventStore->create($streamName, streamMetadata: [ + LazyProophEventStore::PERSISTENCE_STRATEGY_METADATA => 'partition', + ]); + $eventStore->appendTo( + $streamName, + [ + Event::create( + new TicketWasRegistered('123', 'Johnny', 'alert'), + [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + ] + ) + ] + ); + + $this->expectException(ConcurrencyException::class); + + $eventStore->appendTo( + $streamName, + [ + Event::create( + new TicketWasRegistered('123', 'Johnny', 'alert'), + [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + ] + ) + ] + ); + } + + public function test_storing_same_event_for_default_partioned_stream() + { + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + containerOrAvailableServices: [new InProgressTicketList($this->getConnection()), new TicketEventConverter(), DbalConnectionFactory::class => $this->getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true + ); + + /** @var EventStore $eventStore */ + $eventStore = $ecotone->getGateway(EventStore::class); + + $streamName = Uuid::uuid4()->toString(); + $eventStore->appendTo( + $streamName, + [ + Event::create( + new TicketWasRegistered('123', 'Johnny', 'alert'), + [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + ] + ) + ] + ); + + $this->expectException(ConcurrencyException::class); + + $eventStore->appendTo( + $streamName, + [ + Event::create( + new TicketWasRegistered('123', 'Johnny', 'alert'), + [ + '_aggregate_id' => 1, + '_aggregate_version' => 1, + '_aggregate_type' => 'ticket', + ] + ) + ] + ); + } + + public function test_fetching_with_pagination() + { + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + containerOrAvailableServices: [new InProgressTicketList($this->getConnection()), new TicketEventConverter(), DbalConnectionFactory::class => $this->getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withEnvironment('prod') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE])) + ->withNamespaces([ + 'Test\Ecotone\EventSourcing\Fixture\Ticket', + ]), + pathToRootCatalog: __DIR__ . '/../../', + runForProductionEventStore: true + ); + + /** @var EventStore $eventStore */ + $eventStore = $ecotone->getGateway(EventStore::class); + + $streamName = Uuid::uuid4()->toString(); + $eventStore->create($streamName, streamMetadata: [ + LazyProophEventStore::PERSISTENCE_STRATEGY_METADATA => 'simple', + ]); + $eventStore->appendTo( + $streamName, + [ + new TicketWasRegistered('123', 'Johnny', 'alert'), + new TicketWasClosed('123'), + ] + ); + + $events = $eventStore->load($streamName, fromNumber: 2, count: 1); + + $this->assertEquals( + new TicketWasClosed('123'), + $events[0]->getPayload() + ); + } +} \ No newline at end of file