diff --git a/src/ContinuousPipe/Events/Aggregate.php b/src/ContinuousPipe/Events/Aggregate.php new file mode 100644 index 0000000..9ad80fd --- /dev/null +++ b/src/ContinuousPipe/Events/Aggregate.php @@ -0,0 +1,8 @@ +apply($event); + } + + return $self; + } + + public function apply($event) + { + $className = get_class($event); + $classNameTail = substr($className, strrpos($className, '\\') + 1); + + $method = sprintf('apply%s', ucfirst($classNameTail)); + if (!method_exists($this, $method)) { + throw new \BadMethodCallException( + "There is no event named '$method' that can be applied to '".get_class($this)."'. ". + 'If you just want to emit an event without applying changes use the raise() method.' + ); + } + + $this->$method($event); + } +} diff --git a/src/ContinuousPipe/Events/Capabilities/RaiseEventCapability.php b/src/ContinuousPipe/Events/Capabilities/RaiseEventCapability.php new file mode 100644 index 0000000..5a5973b --- /dev/null +++ b/src/ContinuousPipe/Events/Capabilities/RaiseEventCapability.php @@ -0,0 +1,23 @@ +raisedEvents[] = $event; + } + + public function raisedEvents() : array + { + return $this->raisedEvents; + } + + public function eraseEvents() + { + $this->raisedEvents = []; + } +} diff --git a/src/ContinuousPipe/Events/EventStore/Doctrine/DoctrineEventStore.php b/src/ContinuousPipe/Events/EventStore/Doctrine/DoctrineEventStore.php new file mode 100644 index 0000000..23ed3c1 --- /dev/null +++ b/src/ContinuousPipe/Events/EventStore/Doctrine/DoctrineEventStore.php @@ -0,0 +1,83 @@ +entityManager = $entityManager; + $this->serializer = $serializer; + $this->timeResolver = $timeResolver; + } + + /** + * {@inheritdoc} + */ + public function store(string $stream, $event) + { + $dataTransferObject = new EventDto( + Uuid::uuid4(), + $stream, + get_class($event), + $this->serializer->serialize($event, 'json'), + $this->timeResolver->resolve() + ); + + try { + $this->entityManager->persist($dataTransferObject); + $this->entityManager->flush($dataTransferObject); + } catch (ORMException $e) { + throw new EventStoreException('Unable to store the event', $e->getCode(), $e); + } + } + + /** + * {@inheritdoc} + */ + public function read(string $stream): array + { + $dataTransferObjects = $this->entityManager->getRepository(EventDto::class)->findBy([ + 'stream' => $stream, + ], [ + 'creationDate' => 'ASC' + ]); + + return array_map(function (EventDto $dataTransferObject) { + return $this->serializer->deserialize( + $dataTransferObject->getJsonSerialized(), + $dataTransferObject->getClass(), + 'json' + ); + }, $dataTransferObjects); + } +} diff --git a/src/ContinuousPipe/Events/EventStore/Doctrine/EventDto.php b/src/ContinuousPipe/Events/EventStore/Doctrine/EventDto.php new file mode 100644 index 0000000..009db03 --- /dev/null +++ b/src/ContinuousPipe/Events/EventStore/Doctrine/EventDto.php @@ -0,0 +1,100 @@ +uuid = $uuid; + $this->stream = $stream; + $this->class = $class; + $this->jsonSerialized = $jsonSerialized; + $this->creationDate = $creationDate; + } + + /** + * @return UuidInterface + */ + public function getUuid(): UuidInterface + { + return $this->uuid; + } + + /** + * @return string + */ + public function getStream(): string + { + return $this->stream; + } + + /** + * @return string + */ + public function getClass(): string + { + return $this->class; + } + + /** + * @return string + */ + public function getJsonSerialized(): string + { + return $this->jsonSerialized; + } + + /** + * @return \DateTimeInterface + */ + public function getCreationDate(): \DateTimeInterface + { + return $this->creationDate; + } +} diff --git a/src/ContinuousPipe/Events/EventStore/EventStore.php b/src/ContinuousPipe/Events/EventStore/EventStore.php new file mode 100644 index 0000000..2febebd --- /dev/null +++ b/src/ContinuousPipe/Events/EventStore/EventStore.php @@ -0,0 +1,27 @@ +serializer = $serializer; + $this->eventStoreHost = $eventStoreHost; + } + + public function store(string $stream, $event) + { + $className = get_class($event); + $name = substr($className, strrpos($className, '\\') + 1); + + $this->client()->writeToStream($stream, WritableEvent::newInstance( + $name, + \GuzzleHttp\json_decode($this->serializer->serialize($event, 'json'), true), + [ + 'class' => get_class($event), + ] + )); + } + + public function read(string $stream) : array + { + $iterator = $this->client()->forwardStreamFeedIterator($stream); + $events = []; + + foreach ($iterator as $entryWithEvent) { + /** @var \EventStore\StreamFeed\EntryWithEvent $entryWithEvent */ + $event = $entryWithEvent->getEvent(); + + $events[] = $this->serializer->deserialize( + \GuzzleHttp\json_encode($event->getData()), + $event->getMetadata()['class'], + 'json' + ); + } + + return $events; + } + + private function client() + { + if (null === $this->client) { + $this->client = new EventStoreClient('http://'.$this->eventStoreHost.':2113'); + } + + return $this->client; + } +} diff --git a/src/ContinuousPipe/Events/EventStore/InMemoryEventStore.php b/src/ContinuousPipe/Events/EventStore/InMemoryEventStore.php new file mode 100644 index 0000000..1a16b57 --- /dev/null +++ b/src/ContinuousPipe/Events/EventStore/InMemoryEventStore.php @@ -0,0 +1,26 @@ +streams)) { + $this->streams[$stream] = []; + } + + $this->streams[$stream][] = $event; + } + + public function read(string $stream): array + { + if (!array_key_exists($stream, $this->streams)) { + return []; + } + + return $this->streams[$stream]; + } +} diff --git a/src/ContinuousPipe/Events/EventStream/AbstractEventStream.php b/src/ContinuousPipe/Events/EventStream/AbstractEventStream.php new file mode 100644 index 0000000..4b984f9 --- /dev/null +++ b/src/ContinuousPipe/Events/EventStream/AbstractEventStream.php @@ -0,0 +1,24 @@ +name = $name; + } + + public function __toString() + { + return $this->name; + } +} diff --git a/src/ContinuousPipe/Events/SimpleBus/StoreEventsMiddleware.php b/src/ContinuousPipe/Events/SimpleBus/StoreEventsMiddleware.php new file mode 100644 index 0000000..271b600 --- /dev/null +++ b/src/ContinuousPipe/Events/SimpleBus/StoreEventsMiddleware.php @@ -0,0 +1,43 @@ +eventStore = $eventStore; + $this->eventStreamResolver = $eventStreamResolver; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + if (null !== ($stream = $this->eventStreamResolver->streamByEvent($message))) { + $this->eventStore->store($stream, $message); + } + + $next($message); + } +} diff --git a/src/ContinuousPipe/Events/TimeResolver/NativeTimeResolver.php b/src/ContinuousPipe/Events/TimeResolver/NativeTimeResolver.php new file mode 100644 index 0000000..409a4cc --- /dev/null +++ b/src/ContinuousPipe/Events/TimeResolver/NativeTimeResolver.php @@ -0,0 +1,16 @@ +aggregateRepository = $aggregateRepository; + $this->eventBus = $eventBus; + } + + /** + * {@inheritdoc} + */ + public function apply(string $aggregateIdentifier, callable $transaction) : Aggregate + { + $aggregate = $this->aggregateRepository->find($aggregateIdentifier); + + if (null !== ($result = $transaction($aggregate))) { + if (!$aggregate instanceof Aggregate) { + throw new TransactionException('The transaction have to return `null` or with an `Aggregate` object'); + } + + $aggregate = $result; + } + + foreach ($aggregate->raisedEvents() as $event) { + $this->eventBus->handle($event); + } + + return $aggregate; + } +} diff --git a/src/ContinuousPipe/Events/Transaction/TransactionException.php b/src/ContinuousPipe/Events/Transaction/TransactionException.php new file mode 100644 index 0000000..96f51fd --- /dev/null +++ b/src/ContinuousPipe/Events/Transaction/TransactionException.php @@ -0,0 +1,7 @@ +