Skip to content

Commit

Permalink
Creating given type of Event Stream (#355)
Browse files Browse the repository at this point in the history
* Creating given type of Event Stream

* with pagination
  • Loading branch information
dgafka authored Jul 27, 2024
1 parent a1f9e79 commit c8e193d
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 34 deletions.
15 changes: 15 additions & 0 deletions packages/Ecotone/src/Messaging/Support/ConcurrencyException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Support;

use Ecotone\Messaging\MessagingException;

final class ConcurrencyException extends MessagingException
{
protected static function errorCode(): int
{
return self::MESSAGE_HANDLING_EXCEPTION;
}
}
20 changes: 8 additions & 12 deletions packages/Ecotone/src/Modelling/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@
*/
class Event
{
private string $eventType;
private array|object $payload;
private array $metadata;

private function __construct(string $eventType, array|object $payload, array $metadata)
private function __construct(
private string $eventName,
private array|object $payload,
private array $metadata
)
{
Assert::notNull($payload, 'Event can not be null for ' . $eventType);

$this->eventType = $eventType;
$this->payload = $payload;
$this->metadata = $metadata;
Assert::notNull($payload, 'Event can not be null for ' . $eventName);
}

public static function create(object $event, array $metadata = [])
Expand All @@ -32,9 +28,9 @@ public static function createWithType(string $eventType, array|object $event, ar
return new self($eventType, $event, $metadata);
}

public function getEventType(): string
public function getEventName(): string
{
return $this->eventType;
return $this->eventName;
}

public function getPayload(): array|object
Expand Down
2 changes: 1 addition & 1 deletion packages/PdoEventSourcing/src/EventMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public function mapNameToEventType(string $name): string

public function mapEventToName(Event $event): string
{
$type = $event->getEventType();
$type = $event->getEventName();
if (array_key_exists($type, $this->eventToNameMapping)) {
return $this->eventToNameMapping[$type];
}
Expand Down
17 changes: 10 additions & 7 deletions packages/PdoEventSourcing/src/EventSourcingConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class EventSourcingConfiguration extends BaseEventSourcingConfiguration
private string $eventStoreReferenceName;
private string $projectManagerReferenceName;
private string $connectionReferenceName;
private string $persistenceStrategy = LazyProophEventStore::SINGLE_STREAM_PERSISTENCE;
private string $persistenceStrategy = LazyProophEventStore::PARTITION_STREAM_PERSISTENCE;
private ?PersistenceStrategy $customPersistenceStrategyInstance = null;
private bool $isInMemory = false;
private ?InMemoryEventStore $inMemoryEventStore = null;
Expand Down Expand Up @@ -62,6 +62,7 @@ public static function createInMemory(): static

/**
* This work as simple stream strategy, however put constraints on aggregate_id, aggregate_version, aggregate_type being present
* @deprecated Ecotone 2.0 will be removed in favour of partition stream persistence strategy
*/
public function withSingleStreamPersistenceStrategy(): static
{
Expand All @@ -70,6 +71,13 @@ public function withSingleStreamPersistenceStrategy(): static
return $this;
}

public function withPartitionStreamPersistenceStrategy(): static
{
$this->persistenceStrategy = LazyProophEventStore::SINGLE_STREAM_PERSISTENCE;

return $this;
}

/**
* Aggregate_id becomes a stream and each stream is separate table.
* Be careful this create a lot of database tables.
Expand All @@ -93,7 +101,7 @@ public function withSimpleStreamPersistenceStrategy(): static

public function withPersistenceStrategyFor(string $streamName, string $strategy): self
{
$allowedStrategies = [LazyProophEventStore::SIMPLE_STREAM_PERSISTENCE, LazyProophEventStore::SINGLE_STREAM_PERSISTENCE, LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE];
$allowedStrategies = [LazyProophEventStore::SIMPLE_STREAM_PERSISTENCE, LazyProophEventStore::SINGLE_STREAM_PERSISTENCE, LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE, LazyProophEventStore::PARTITION_STREAM_PERSISTENCE];

Assert::oneOf(
$strategy,
Expand Down Expand Up @@ -165,11 +173,6 @@ public function isInitializedOnStart(): bool
return $this->initializeEventStoreOnStart;
}

public function isUsingSingleStreamStrategy(): bool
{
return $this->getPersistenceStrategy() === LazyProophEventStore::SINGLE_STREAM_PERSISTENCE;
}

public function isUsingAggregateStreamStrategyFor(string $streamName): bool
{
return
Expand Down
4 changes: 4 additions & 0 deletions packages/PdoEventSourcing/src/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
interface EventStore
{
/**
* Creates new Stream with Metadata and appends events to it
*
* @param Event[]|object[]|array[] $streamEvents
*/
public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void;
/**
* Appends events to existing Stream, or creates one and then appends events if it does not exists
*
* @param Event[]|object[]|array[] $streamEvents
*/
public function appendTo(string $streamName, array $streamEvents): void;
Expand Down
51 changes: 37 additions & 14 deletions packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
use Ecotone\EventSourcing\EventSourcingConfiguration;
use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMariaDbSimpleStreamStrategy;
use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMysqlSimpleStreamStrategy;
use Ecotone\Messaging\Support\ConcurrencyException;
use Ecotone\Messaging\Support\InvalidArgumentException;
use Interop\Queue\ConnectionFactory;
use Iterator;
use PDO;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\ConcurrencyException as ProophConcurrencyException;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Pdo\MariaDbEventStore;
Expand Down Expand Up @@ -49,14 +51,23 @@ class LazyProophEventStore implements EventStore
public const EVENT_STORE_TYPE_MARIADB = 'mariadb';
public const EVENT_STORE_TYPE_IN_MEMORY = 'inMemory';

/**
* Partition persistence strategy
* @deprecated Ecotone 2.0 will be removed in favour of partition stream persistence strategy
*/
public const SINGLE_STREAM_PERSISTENCE = 'single';
/**
* Same as single stream strategy.
*/
public const PARTITION_STREAM_PERSISTENCE = 'partition';
public const AGGREGATE_STREAM_PERSISTENCE = 'aggregate';
public const SIMPLE_STREAM_PERSISTENCE = 'simple';
public const CUSTOM_STREAM_PERSISTENCE = 'custom';

public const AGGREGATE_VERSION = '_aggregate_version';
public const AGGREGATE_TYPE = '_aggregate_type';
public const AGGREGATE_ID = '_aggregate_id';
const PERSISTENCE_STRATEGY_METADATA = '_persistence';

/** @var EventStore[] */
private array $initializedEventStore = [];
Expand Down Expand Up @@ -122,7 +133,11 @@ public function updateStreamMetadata(StreamName $streamName, array $newMetadata)

public function create(Stream $stream): void
{
$this->getEventStore($stream->streamName())->create($stream);
try {
$this->getEventStore($stream->streamName(), $stream->metadata()[self::PERSISTENCE_STRATEGY_METADATA] ?? null)->create($stream);
} catch (ProophConcurrencyException $exception) {
throw new ConcurrencyException($exception->getMessage(), $exception->getCode(), $exception);
}
$this->ensuredExistingStreams[$this->getContextName()][$stream->streamName()->toString()] = true;
}

Expand All @@ -135,6 +150,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void
$this->getEventStore($streamName)->appendTo($streamName, $streamEvents);
} catch (StreamNotFound) {
$this->create(new Stream($streamName, $streamEvents, []));
} catch (ProophConcurrencyException $exception) {
throw new ConcurrencyException($exception->getMessage(), $exception->getCode(), $exception);
}
}
}
Expand Down Expand Up @@ -171,7 +188,7 @@ public function prepareEventStore(): void
$this->initializated[$connectionName] = true;
}

public function getEventStore(?StreamName $streamName = null): EventStore
public function getEventStore(?StreamName $streamName = null, string|null $streamStrategy = null): EventStore
{
$contextName = $this->getContextName($streamName);
if (isset($this->initializedEventStore[$contextName])) {
Expand All @@ -188,9 +205,9 @@ public function getEventStore(?StreamName $streamName = null): EventStore
$eventStoreType = $this->getEventStoreType();

$persistenceStrategy = match ($eventStoreType) {
self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamName),
self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamName),
self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamName),
self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamName, $streamStrategy),
self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamName, $streamStrategy),
self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamName, $streamStrategy),
default => throw InvalidArgumentException::create('Unexpected match value ' . $eventStoreType)
};

Expand Down Expand Up @@ -225,31 +242,37 @@ public function getEventStore(?StreamName $streamName = null): EventStore
return $eventStore;
}

private function getMysqlPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy
private function getMysqlPersistenceStrategyFor(?StreamName $streamName = null, ?string $forcedStrategy = null): PersistenceStrategy
{
return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) {
$persistenceStrategy = $forcedStrategy ?? $this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName);

return match ($persistenceStrategy) {
self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlAggregateStreamStrategy($this->messageConverter),
self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlSingleStreamStrategy($this->messageConverter),
self::PARTITION_STREAM_PERSISTENCE, self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlSingleStreamStrategy($this->messageConverter),
self::SIMPLE_STREAM_PERSISTENCE => new InterlopMysqlSimpleStreamStrategy($this->messageConverter),
self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(),
};
}

private function getMariaDbPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy
private function getMariaDbPersistenceStrategyFor(?StreamName $streamName = null, ?string $forcedStrategy = null): PersistenceStrategy
{
return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) {
$persistenceStrategy = $forcedStrategy ?? $this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName);

return match ($persistenceStrategy) {
self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbAggregateStreamStrategy($this->messageConverter),
self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbSingleStreamStrategy($this->messageConverter),
self::PARTITION_STREAM_PERSISTENCE, self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbSingleStreamStrategy($this->messageConverter),
self::SIMPLE_STREAM_PERSISTENCE => new InterlopMariaDbSimpleStreamStrategy($this->messageConverter),
self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(),
};
}

private function getPostgresPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy
private function getPostgresPersistenceStrategyFor(?StreamName $streamName = null, ?string $forcedStrategy = null): PersistenceStrategy
{
return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) {
$persistenceStrategy = $forcedStrategy ?? $this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName);

return match ($persistenceStrategy) {
self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresAggregateStreamStrategy($this->messageConverter),
self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSingleStreamStrategy($this->messageConverter),
self::PARTITION_STREAM_PERSISTENCE, self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSingleStreamStrategy($this->messageConverter),
self::SIMPLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSimpleStreamStrategy($this->messageConverter),
self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(),
};
Expand Down
Loading

0 comments on commit c8e193d

Please sign in to comment.