Skip to content

Commit

Permalink
Add failing test with synchronous event sourced saga (#112)
Browse files Browse the repository at this point in the history
* Add failing test with synchronous event sourced saga

* Improve event handler routing

---------

Co-authored-by: jlabedo <jean@needelp.com>
Co-authored-by: Dariusz Gafka <dariuszgafka@gmail.com>
Co-authored-by: Dariusz Gafka <dgafka.mail@gmail.com>
  • Loading branch information
4 people authored Jun 4, 2023
1 parent 92b3118 commit f640b60
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 21 deletions.
27 changes: 8 additions & 19 deletions packages/Ecotone/src/Modelling/Config/BusRoutingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ public static function getEventBusByObjectsMapping(AnnotationFinder $annotationR
continue;
}

$classChannels = ModellingHandlerModule::getEventPayloadClasses($registration, $interfaceToCallRegistry);
$unionEventClasses = ModellingHandlerModule::getEventPayloadClasses($registration, $interfaceToCallRegistry);
$namedMessageChannelFor = ModellingHandlerModule::getNamedMessageChannelForEventHandler($registration, $interfaceToCallRegistry);

foreach ($classChannels as $classChannel) {
foreach ($unionEventClasses as $classChannel) {
$objectEventHandlers[$classChannel][] = $namedMessageChannelFor;
$objectEventHandlers[$classChannel] = array_unique($objectEventHandlers[$classChannel]);
}
Expand All @@ -264,9 +264,9 @@ public static function getEventBusByObjectsMapping(AnnotationFinder $annotationR
continue;
}

$classChannels = ModellingHandlerModule::getEventPayloadClasses($registration, $interfaceToCallRegistry);
$unionEventClasses = ModellingHandlerModule::getEventPayloadClasses($registration, $interfaceToCallRegistry);
$namedMessageChannelFor = ModellingHandlerModule::getNamedMessageChannelForEventHandler($registration, $interfaceToCallRegistry);
foreach ($classChannels as $classChannel) {
foreach ($unionEventClasses as $classChannel) {
if (! EventBusRouter::isRegexBasedRoute($namedMessageChannelFor)) {
$objectEventHandlers[$classChannel][] = $namedMessageChannelFor;
$objectEventHandlers[$classChannel] = array_unique($objectEventHandlers[$classChannel]);
Expand All @@ -290,22 +290,10 @@ public static function getEventBusByNamesMapping(AnnotationFinder $annotationReg
continue;
}

$chanelName = ModellingHandlerModule::getNamedMessageChannelForEventHandler($registration, $interfaceToCallRegistry);

if ($annotation->getListenTo()) {
$chanelName = ModellingHandlerModule::getNamedMessageChannelForEventHandler($registration, $interfaceToCallRegistry);
$namedEventHandlers[$chanelName][] = $chanelName;
$namedEventHandlers[$chanelName] = array_unique($namedEventHandlers[$chanelName]);
} else {
$type = TypeDescriptor::create($chanelName);
if ($type->isUnionType()) {
foreach ($type->getUnionTypes() as $type) {
$namedEventHandlers[$type->toString()][] = $chanelName;
$namedEventHandlers[$type->toString()] = array_unique($namedEventHandlers[$type->toString()]);
}
} else {
$namedEventHandlers[$chanelName][] = $chanelName;
$namedEventHandlers[$chanelName] = array_unique($namedEventHandlers[$chanelName]);
}
}
}
foreach ($annotationRegistrationService->findCombined(Aggregate::class, EventHandler::class) as $registration) {
Expand Down Expand Up @@ -382,6 +370,7 @@ private static function verifyUniqueness(array $uniqueChannels): void
*/
public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
$pointcut = CommandBus::class . '||' . EventBus::class . '||' . QueryBus::class . '||' . AsynchronousRunningEndpoint::class . '||' . PropagateHeaders::class . '||' . MessagingEntrypointWithHeadersPropagation::class;
$messagingConfiguration
->registerBeforeMethodInterceptor(
MethodInterceptor::create(
Expand All @@ -394,7 +383,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
]
),
Precedence::ENDPOINT_HEADERS_PRECEDENCE - 2,
CommandBus::class . '||' . EventBus::class . '||' . QueryBus::class . '||' . AsynchronousRunningEndpoint::class . '||' . PropagateHeaders::class . '||' . MessagingEntrypointWithHeadersPropagation::class
$pointcut
)
)
->registerAroundMethodInterceptor(
Expand All @@ -403,7 +392,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$this->messageHeadersPropagator,
'storeHeaders',
Precedence::ENDPOINT_HEADERS_PRECEDENCE - 1,
CommandBus::class . '||' . EventBus::class . '||' . QueryBus::class . '||' . AsynchronousRunningEndpoint::class . '||' . PropagateHeaders::class . '||' . MessagingEntrypointWithHeadersPropagation::class
$pointcut
)
)
->registerMessageHandler($this->commandBusByObject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,14 +420,14 @@ public function test_registering_service_event_handler()
ServiceEventHandlerWithClass::class,
];

$this->assertRouting($annotatedClasses, [], [], [], [], [stdClass::class => [stdClass::class]], [stdClass::class => [stdClass::class]]);
$this->assertRouting($annotatedClasses, [], [], [], [], [stdClass::class => [stdClass::class]], []);
}

public function test_union_registering_service_event_handler()
{
$annotatedClasses = [EventHandlerForUnionType::class];

$this->assertRouting($annotatedClasses, [], [], [], [], [stdClass::class => [stdClass::class . '|' . \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class], \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class => [stdClass::class . '|' . \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class]], [stdClass::class => [stdClass::class . '|' . \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class], \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class => [stdClass::class . '|' . \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class]]);
$this->assertRouting($annotatedClasses, [], [], [], [], [stdClass::class => [stdClass::class . '|' . \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class], \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class => [stdClass::class . '|' . \Test\Ecotone\Modelling\Fixture\Annotation\EventHandler\OrderWasPlaced::class]], []);
}

public function test_registering_aggregate_event_handler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@

use PHPUnit\Framework\TestCase;
use Ramsey\Uuid\Uuid;

use Test\Ecotone\EventSourcing\EventSourcingMessagingTest;
use Test\Ecotone\EventSourcing\Fixture\Basket\BasketEventConverter;
use Test\Ecotone\EventSourcing\Fixture\Basket\Command\AddProduct;
use Test\Ecotone\EventSourcing\Fixture\Basket\Command\CreateBasket;
use Test\Ecotone\EventSourcing\Fixture\BasketListProjection\BasketList;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SagaEventConverter;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SynchronousBasketList;
use Test\Ecotone\EventSourcing\Fixture\CustomEventStream\CustomEventStreamProjection;
use Test\Ecotone\EventSourcing\Fixture\ProjectionFromCategoryUsingAggregatePerStream\FromCategoryUsingAggregatePerStreamProjection;
use Test\Ecotone\EventSourcing\Fixture\ProjectionFromMultipleStreams\MultipleStreamsProjection;
Expand Down Expand Up @@ -232,6 +235,10 @@ private function prepareMessaging(array $namespaces, bool $failFast): void
$objects = array_merge($objects, [new BasketList()]);
break;
}
case "Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga": {
$objects = array_merge($objects, [new \Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SagaProjection(), new SagaEventConverter(), new SynchronousBasketList()]);
break;
}
case "Test\Ecotone\EventSourcing\Fixture\Snapshots":
{
$objects = array_merge($objects, [new TicketMediaTypeConverter(), new BasketMediaTypeConverter()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,13 @@ Feature: activating as aggregate order entity
Then I should see ticket count equal 2 and ticket closed count equal 2
When I register "alert" ticket 12345 with assignation to "Johny"
Then I should see ticket count equal 3 and ticket closed count equal 2

Scenario: I verify building synchronous event driven projection using in memory event store and synchronous saga
Given I active messaging for namespaces
| Test\Ecotone\EventSourcing\Fixture\Basket |
| Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga |
| Test\Ecotone\EventSourcing\Fixture\InMemoryEventStore |
When I create basket with id 1000
Then I should see baskets:
| id | products |
| 1000 | ["chocolate"] |
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga;

use Ecotone\Modelling\Attribute\AggregateIdentifier;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\WithAggregateVersioning;
use Test\Ecotone\EventSourcing\Fixture\Basket\Command\AddProduct;
use Test\Ecotone\EventSourcing\Fixture\Basket\Event\BasketWasCreated;

#[EventSourcingAggregate]
class Saga
{
use WithAggregateVersioning;
#[AggregateIdentifier]
private string $id;

#[EventHandler]
public static function start(BasketWasCreated $event): array
{
return [new SagaStarted($event->getId())];
}

#[EventHandler]
public function whenSagaStarted(SagaStarted $event, CommandBus $commandBus): array
{
if ($event->getId() === "1000") {
$commandBus->send(new AddProduct($event->getId(), 'chocolate'));
}

return [];
}

#[EventSourcingHandler()]
public function applySagaStarted(SagaStarted $event): void
{
$this->id = $event->getId();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga;

use Ecotone\Messaging\Attribute\Converter;

class SagaEventConverter
{
#[Converter]
public function fromSagaStarted(SagaStarted $event): array
{
return [
'id' => $event->getId(),
];
}

#[Converter]
public function toTicketWasRegistered(array $event): SagaStarted
{
return new SagaStarted($event['id']);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga;

use Ecotone\EventSourcing\Attribute\Projection;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;

#[Projection(self::PROJECTION_NAME, Saga::class)]
class SagaProjection
{
public const PROJECTION_NAME = 'saga_projection';
private bool $isInitialized = false;
private array $sagaStarted = [];

#[EventHandler]
public function when(SagaStarted $event): void
{
Assert::isTrue($this->isInitialized, 'Saga Projection is not initialized');
$this->sagaStarted[$event->getId()] = true;
}

#[ProjectionInitialization]
public function init(): void
{
$this->isInitialized = true;
}

#[QueryHandler('isSagaStarted')]
public function isSagaStarted(string $sagaId): bool
{
return $this->sagaStarted[$sagaId] ?? false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga;

class SagaStarted
{
public function __construct(private string $id)
{
}

public function getId(): string
{
return $this->id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga;

use Ecotone\EventSourcing\Attribute\Projection;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Test\Ecotone\EventSourcing\Fixture\Basket\Basket;
use Test\Ecotone\EventSourcing\Fixture\Basket\Event\BasketWasCreated;
use Test\Ecotone\EventSourcing\Fixture\Basket\Event\ProductWasAddedToBasket;

#[Projection(self::PROJECTION_NAME, Basket::BASKET_STREAM)]
class SynchronousBasketList
{
public const PROJECTION_NAME = 'basketList';
private array $basketsList = [];

#[EventHandler(BasketWasCreated::EVENT_NAME)]
public function addBasket(array $event): void
{
$this->basketsList[$event['id']] = [];
}

#[EventHandler(ProductWasAddedToBasket::EVENT_NAME)]
public function addProduct(ProductWasAddedToBasket $event): void
{
$this->basketsList[$event->getId()][] = $event->getProductName();
}

#[QueryHandler('getALlBaskets')]
public function getAllBaskets(): array
{
return $this->basketsList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Integration;

use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use PHPUnit\Framework\TestCase;
use Test\Ecotone\EventSourcing\Fixture\Basket\Basket;
use Test\Ecotone\EventSourcing\Fixture\Basket\BasketEventConverter;
use Test\Ecotone\EventSourcing\Fixture\Basket\Command\AddProduct;
use Test\Ecotone\EventSourcing\Fixture\Basket\Command\CreateBasket;
use Test\Ecotone\EventSourcing\Fixture\Basket\Event\BasketWasCreated;
use Test\Ecotone\EventSourcing\Fixture\Basket\Event\ProductWasAddedToBasket;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\Saga;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SagaEventConverter;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SagaProjection;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SagaStarted;
use Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga\SynchronousBasketList;

final class SynchronousEventDrivenSagaTest extends TestCase
{
public function test_product_is_added_by_synchronous_event_driven_saga(): void
{
$testSupport = EcotoneLite::bootstrapFlowTestingWithEventStore(
containerOrAvailableServices: [new SagaProjection(), new SynchronousBasketList(), new SagaEventConverter(), new BasketEventConverter()],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE]))
->withNamespaces([
'Test\Ecotone\EventSourcing\Fixture\Basket',
'Test\Ecotone\EventSourcing\Fixture\BasketWithSynchronousEventDrivenSaga',
]),
pathToRootCatalog: __DIR__ . "/../../"
);

$testSupport->sendCommand(new CreateBasket('1000'));

self::assertEquals([
new CreateBasket('1000'),
new AddProduct('1000', 'chocolate'),
], $testSupport->getRecordedCommands());

self::assertEquals([
new BasketWasCreated('1000'),
new SagaStarted('1000'),
new ProductWasAddedToBasket('1000', 'chocolate'),
], $testSupport->getRecordedEvents());

self::assertEquals(true, $testSupport->sendQueryWithRouting('isSagaStarted', '1000'));

self::assertEquals([
'1000' => ['chocolate'],
], $testSupport->sendQueryWithRouting('getALlBaskets'));
}
}

0 comments on commit f640b60

Please sign in to comment.