Skip to content

Commit

Permalink
Add the ContinuousPipe\Events namespace from builder's component
Browse files Browse the repository at this point in the history
  • Loading branch information
sroze committed Feb 27, 2017
1 parent 5d89bd7 commit 3f6c6c9
Show file tree
Hide file tree
Showing 19 changed files with 579 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/ContinuousPipe/Events/Aggregate.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace ContinuousPipe\Events;

interface Aggregate
{
public function raisedEvents() : array;
}
7 changes: 7 additions & 0 deletions src/ContinuousPipe/Events/AggregateNotFound.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace ContinuousPipe\Events;

class AggregateNotFound extends \Exception
{
}
15 changes: 15 additions & 0 deletions src/ContinuousPipe/Events/AggregateRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

namespace ContinuousPipe\Events;

interface AggregateRepository
{
/**
* @param string $aggregateIdentifier
*
* @throws AggregateNotFound
*
* @return Aggregate
*/
public function find(string $aggregateIdentifier) : Aggregate;
}
33 changes: 33 additions & 0 deletions src/ContinuousPipe/Events/Capabilities/ApplyEventCapability.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace ContinuousPipe\Events\Capabilities;

trait ApplyEventCapability
{
public static function fromEvents(array $events)
{
$self = new static();

foreach ($events as $event) {
$self->apply($event);
}

return $self;
}

public function apply($event)
{
$className = get_class($event);
$classNameTail = substr($className, strrpos($className, '\\') + 1);

$method = sprintf('apply%s', ucfirst($classNameTail));
if (!method_exists($this, $method)) {
throw new \BadMethodCallException(
"There is no event named '$method' that can be applied to '".get_class($this)."'. ".
'If you just want to emit an event without applying changes use the raise() method.'
);
}

$this->$method($event);
}
}
23 changes: 23 additions & 0 deletions src/ContinuousPipe/Events/Capabilities/RaiseEventCapability.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace ContinuousPipe\Events\Capabilities;

trait RaiseEventCapability
{
private $raisedEvents = [];

protected function raise($event)
{
$this->raisedEvents[] = $event;
}

public function raisedEvents() : array
{
return $this->raisedEvents;
}

public function eraseEvents()
{
$this->raisedEvents = [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

namespace ContinuousPipe\Events\EventStore\Doctrine;

use ContinuousPipe\Events\EventStore\EventStore;
use ContinuousPipe\Events\EventStore\EventStoreException;
use ContinuousPipe\Events\TimeResolver\TimeResolver;
use Doctrine\ORM\EntityManager;
use Doctrine\ORM\ORMException;
use JMS\Serializer\SerializerInterface;
use Ramsey\Uuid\Uuid;

class DoctrineEventStore implements EventStore
{
/**
* @var EntityManager
*/
private $entityManager;
/**
* @var SerializerInterface
*/
private $serializer;
/**
* @var TimeResolver
*/
private $timeResolver;

/**
* @param EntityManager $entityManager
* @param SerializerInterface $serializer
* @param TimeResolver $timeResolver
*/
public function __construct(
EntityManager $entityManager,
SerializerInterface $serializer,
TimeResolver $timeResolver
) {
$this->entityManager = $entityManager;
$this->serializer = $serializer;
$this->timeResolver = $timeResolver;
}

/**
* {@inheritdoc}
*/
public function store(string $stream, $event)
{
$dataTransferObject = new EventDto(
Uuid::uuid4(),
$stream,
get_class($event),
$this->serializer->serialize($event, 'json'),
$this->timeResolver->resolve()
);

try {
$this->entityManager->persist($dataTransferObject);
$this->entityManager->flush($dataTransferObject);
} catch (ORMException $e) {
throw new EventStoreException('Unable to store the event', $e->getCode(), $e);
}
}

/**
* {@inheritdoc}
*/
public function read(string $stream): array
{
$dataTransferObjects = $this->entityManager->getRepository(EventDto::class)->findBy([
'stream' => $stream,
], [
'creationDate' => 'ASC'
]);

return array_map(function (EventDto $dataTransferObject) {
return $this->serializer->deserialize(
$dataTransferObject->getJsonSerialized(),
$dataTransferObject->getClass(),
'json'
);
}, $dataTransferObjects);
}
}
100 changes: 100 additions & 0 deletions src/ContinuousPipe/Events/EventStore/Doctrine/EventDto.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?php

namespace ContinuousPipe\Events\EventStore\Doctrine;

use Doctrine\ORM\Mapping as ORM;
use Ramsey\Uuid\UuidInterface;

/**
* @ORM\Entity
* @ORM\Table(indexes={
* @ORM\Index(name="idx_event_dto_by_stream", columns={"stream"}),
* })
*/
class EventDto
{
/**
* @ORM\Id
* @ORM\Column(type="uuid")
*
* @var UuidInterface
*/
private $uuid;

/**
* @ORM\Column(type="string", nullable=false)
*
* @var string
*/
private $stream;

/**
* @ORM\Column(type="string", nullable=false)
*
* @var string
*/
private $class;

/**
* @ORM\Column(type="text", nullable=false)
*
* @var string
*/
private $jsonSerialized;

/**
* @ORM\Column(type="datetime", nullable=false)
*
* @var \DateTimeInterface
*/
private $creationDate;

public function __construct(UuidInterface $uuid, string $stream, string $class, string $jsonSerialized, \DateTimeInterface $creationDate)
{
$this->uuid = $uuid;
$this->stream = $stream;
$this->class = $class;
$this->jsonSerialized = $jsonSerialized;
$this->creationDate = $creationDate;
}

/**
* @return UuidInterface
*/
public function getUuid(): UuidInterface
{
return $this->uuid;
}

/**
* @return string
*/
public function getStream(): string
{
return $this->stream;
}

/**
* @return string
*/
public function getClass(): string
{
return $this->class;
}

/**
* @return string
*/
public function getJsonSerialized(): string
{
return $this->jsonSerialized;
}

/**
* @return \DateTimeInterface
*/
public function getCreationDate(): \DateTimeInterface
{
return $this->creationDate;
}
}
27 changes: 27 additions & 0 deletions src/ContinuousPipe/Events/EventStore/EventStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace ContinuousPipe\Events\EventStore;

interface EventStore
{
/**
* Write an event to the given stream.
*
* @param string $stream
* @param mixed $event
*
* @throws EventStoreException
*/
public function store(string $stream, $event);

/**
* Read all the events from the given stream.
*
* @param string $stream
*
* @throws EventStoreException
*
* @return mixed[]
*/
public function read(string $stream) : array;
}
7 changes: 7 additions & 0 deletions src/ContinuousPipe/Events/EventStore/EventStoreException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace ContinuousPipe\Events\EventStore;

class EventStoreException extends \Exception
{
}
13 changes: 13 additions & 0 deletions src/ContinuousPipe/Events/EventStore/EventStreamResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace ContinuousPipe\Events\EventStore;

interface EventStreamResolver
{
/**
* @param mixed $event
*
* @return string|null
*/
public function streamByEvent($event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace ContinuousPipe\Events\GetEventStore\EventStore;

use ContinuousPipe\Events\EventStore\EventStore;
use EventStore\EventStore as EventStoreClient;
use EventStore\WritableEvent;
use JMS\Serializer\SerializerInterface;

class HttpEventStoreAdapter implements EventStore
{
private $client;
private $serializer;
private $eventStoreHost;

public function __construct(SerializerInterface $serializer, string $eventStoreHost)
{
$this->serializer = $serializer;
$this->eventStoreHost = $eventStoreHost;
}

public function store(string $stream, $event)
{
$className = get_class($event);
$name = substr($className, strrpos($className, '\\') + 1);

$this->client()->writeToStream($stream, WritableEvent::newInstance(
$name,
\GuzzleHttp\json_decode($this->serializer->serialize($event, 'json'), true),
[
'class' => get_class($event),
]
));
}

public function read(string $stream) : array
{
$iterator = $this->client()->forwardStreamFeedIterator($stream);
$events = [];

foreach ($iterator as $entryWithEvent) {
/** @var \EventStore\StreamFeed\EntryWithEvent $entryWithEvent */
$event = $entryWithEvent->getEvent();

$events[] = $this->serializer->deserialize(
\GuzzleHttp\json_encode($event->getData()),
$event->getMetadata()['class'],
'json'
);
}

return $events;
}

private function client()
{
if (null === $this->client) {
$this->client = new EventStoreClient('http://'.$this->eventStoreHost.':2113');
}

return $this->client;
}
}
Loading

0 comments on commit 3f6c6c9

Please sign in to comment.