diff --git a/src/lib.rs b/src/lib.rs index 32e9f09..e89ae32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,9 +42,10 @@ //! ### Event-sourcing aggregate //! //! [aggregate::EventSourcedAggregate] is using/delegating a `Decider` to handle commands and produce new events. -//! It belongs to the -//! Application layer. In order to -//! handle the command, aggregate needs to fetch the current state (represented as a list/vector of events) +//! +//! It belongs to the Application layer. +//! +//! In order to handle the command, aggregate needs to fetch the current state (represented as a list/vector of events) //! via `EventRepository.fetchEvents` async function, and then delegate the command to the decider which can produce new //! events as //! a result. Produced events are then stored via `EventRepository.save` async function. @@ -53,10 +54,11 @@ //! //! ### State-stored aggregate //! -//! [aggregate::StateStoredAggregate] is using/delegating a `Decider` to handle commands and produce new state. It -//! belongs to the -//! Application layer. In order to -//! handle the command, aggregate needs to fetch the current state via `StateRepository.fetchState` async function first, +//! [aggregate::StateStoredAggregate] is using/delegating a `Decider` to handle commands and produce new state. +//! +//! It belongs to the Application layer. +//! +//! In order to handle the command, aggregate needs to fetch the current state via `StateRepository.fetchState` async function first, //! and then //! delegate the command to the decider which can produce new state as a result. New state is then stored //! via `StateRepository.save` async function. @@ -91,12 +93,42 @@ //! [materialized_view::MaterializedView] is using/delegating a `View` to handle events of type `E` and to maintain //! a state of denormalized //! projection(s) as a -//! result. Essentially, it represents the query/view side of the CQRS pattern. It belongs to the Application layer. +//! result. Essentially, it represents the query/view side of the CQRS pattern. +//! +//! It belongs to the Application layer. //! //! In order to handle the event, materialized view needs to fetch the current state via `ViewStateRepository.fetchState` //! suspending function first, and then delegate the event to the view, which can produce new state as a result. New state //! is then stored via `ViewStateRepository.save` suspending function. //! +//! +//! ## Saga +//! +//! `Saga` is a datatype that represents the central point of control, deciding what to execute next (`A`), based on the action result (`AR`). +//! It has two generic parameters `AR`/Action Result, `A`/Action , representing the type of the values that Saga may contain or use. +//! `'a` is used as a lifetime parameter, indicating that all references contained within the struct (e.g., references within the function closures) must have a lifetime that is at least as long as 'a. +//! +//! `Saga` is a pure domain component. +//! +//! - `AR` - Action Result/Event +//! - `A` - Action/Command +//! +//! ```rust +//! pub type ReactFunction<'a, AR, A> = Box Vec + 'a + Send + Sync>; +//! pub struct Saga<'a, AR: 'a, A: 'a> { +//! pub react: ReactFunction<'a, AR, A>, +//! } +//! ``` +//! +//! ### Saga Manager +//! +//! [saga_manager::SagaManager] is using/delegating a `Saga` to react to the action result and to publish the new actions. +//! +//! It belongs to the Application layer. +//! +//! It is using a [saga::Saga] to react to the action result and to publish the new actions. +//! It is using an [saga_manager::ActionPublisher] to publish the new actions. +//! //! ## FModel in other languages //! //! - [FModel Kotlin](https://github.com/fraktalio/fmodel/) @@ -114,6 +146,8 @@ pub mod aggregate; pub mod decider; pub mod materialized_view; +pub mod saga; +pub mod saga_manager; pub mod view; /// The [DecideFunction] function is used to decide which events to produce based on the command and the current state. @@ -122,3 +156,5 @@ pub type DecideFunction<'a, C, S, E> = Box Vec + 'a + Send pub type EvolveFunction<'a, S, E> = Box S + 'a + Send + Sync>; /// The [InitialStateFunction] function is used to produce the initial state. pub type InitialStateFunction<'a, S> = Box S + 'a + Send + Sync>; +/// The [ReactFunction] function is used to decide what actions/A to execute next based on the action result/AR. +pub type ReactFunction<'a, AR, A> = Box Vec + 'a + Send + Sync>; diff --git a/src/saga.rs b/src/saga.rs new file mode 100644 index 0000000..12548a3 --- /dev/null +++ b/src/saga.rs @@ -0,0 +1,117 @@ +use crate::ReactFunction; + +/// [Saga] is a datatype that represents the central point of control, deciding what to execute next ([A]), based on the action result ([AR]). +/// It has two generic parameters `AR`/Action Result, `A`/Action , representing the type of the values that Saga may contain or use. +/// `'a` is used as a lifetime parameter, indicating that all references contained within the struct (e.g., references within the function closures) must have a lifetime that is at least as long as 'a. +/// +/// It is common to consider Event as Action Result, and Command as Action, but it is not mandatory. +/// For example, Action Result can be a request response from a remote service. +/// +/// ## Example +/// +/// ``` +/// use fmodel_rust::saga::Saga; +/// +/// fn saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { +/// Saga { +/// react: Box::new(|event| match event { +/// OrderEvent::Created(created_event) => { +/// vec![ShipmentCommand::Create(CreateShipmentCommand { +/// shipment_id: created_event.order_id, +/// order_id: created_event.order_id, +/// customer_name: created_event.customer_name.to_owned(), +/// items: created_event.items.to_owned(), +/// })] +/// } +/// OrderEvent::Updated(_updated_event) => { +/// vec![] +/// } +/// OrderEvent::Cancelled(_cancelled_event) => { +/// vec![] +/// } +/// }), +/// } +/// } +/// +/// #[derive(Debug, PartialEq)] +/// #[allow(dead_code)] +/// pub enum ShipmentCommand { +/// Create(CreateShipmentCommand), +/// } +/// +/// #[derive(Debug, PartialEq)] +/// pub struct CreateShipmentCommand { +/// pub shipment_id: u32, +/// pub order_id: u32, +/// pub customer_name: String, +/// pub items: Vec, +/// } +/// +/// #[derive(Debug)] +/// pub enum OrderEvent { +/// Created(OrderCreatedEvent), +/// Updated(OrderUpdatedEvent), +/// Cancelled(OrderCancelledEvent), +/// } +/// +/// #[derive(Debug)] +/// pub struct OrderCreatedEvent { +/// pub order_id: u32, +/// pub customer_name: String, +/// pub items: Vec, +/// } +/// +/// #[derive(Debug)] +/// pub struct OrderUpdatedEvent { +/// pub order_id: u32, +/// pub updated_items: Vec, +/// } +/// +/// #[derive(Debug)] +/// pub struct OrderCancelledEvent { +/// pub order_id: u32, +/// } +/// +/// let saga: Saga = saga(); +/// let order_created_event = OrderEvent::Created(OrderCreatedEvent { +/// order_id: 1, +/// customer_name: "John Doe".to_string(), +/// items: vec!["Item 1".to_string(), "Item 2".to_string()], +/// }); +/// +/// let commands = (saga.react)(&order_created_event); +/// ``` +pub struct Saga<'a, AR: 'a, A: 'a> { + /// The `react` function is driving the next action based on the action result. + pub react: ReactFunction<'a, AR, A>, +} + +impl<'a, AR, A> Saga<'a, AR, A> { + /// Maps the Saga over the A/Action type parameter. + /// Creates a new instance of [Saga]``. + pub fn map_action(self, f: &'a F) -> Saga<'a, AR, A2> + where + F: Fn(&A) -> A2 + Send + Sync, + { + let new_react = Box::new(move |ar: &AR| { + let a = (self.react)(ar); + a.into_iter().map(|a: A| f(&a)).collect() + }); + + Saga { react: new_react } + } + + /// Maps the Saga over the AR/ActionResult type parameter. + /// Creates a new instance of [Saga]``. + pub fn map_action_result(self, f: &'a F) -> Saga<'a, AR2, A> + where + F: Fn(&AR2) -> AR + Send + Sync, + { + let new_react = Box::new(move |ar2: &AR2| { + let ar = f(ar2); + (self.react)(&ar) + }); + + Saga { react: new_react } + } +} diff --git a/src/saga_manager.rs b/src/saga_manager.rs new file mode 100644 index 0000000..4ec899f --- /dev/null +++ b/src/saga_manager.rs @@ -0,0 +1,54 @@ +use std::marker::PhantomData; + +use async_trait::async_trait; + +use crate::saga::Saga; + +/// Publishes the action/command to some external system. +/// +/// Generic parameter: +/// +/// - `A`. - action +/// - `Error` - error +#[async_trait] +pub trait ActionPublisher { + async fn publish(&self, action: &[A]) -> Result, Error>; +} + +/// Saga Manager. +/// +/// It is using a [Saga] to react to the action result and to publish the new actions. +/// It is using an [ActionPublisher] to publish the new actions. +/// +/// Generic parameters: +/// - `A` - Action / Command +/// - `AR` - Action Result / Event +/// - `Publisher` - Action Publisher +/// - `Error` - Error +pub struct SagaManager<'a, A, AR, Publisher, Error> +where + Publisher: ActionPublisher, +{ + action_publisher: Publisher, + saga: Saga<'a, AR, A>, + _marker: PhantomData<(A, AR, Error)>, +} + +impl<'a, A, AR, Publisher, Error> SagaManager<'a, A, AR, Publisher, Error> +where + Publisher: ActionPublisher, +{ + pub fn new(action_publisher: Publisher, saga: Saga<'a, AR, A>) -> Self { + SagaManager { + action_publisher, + saga, + _marker: PhantomData, + } + } + /// Handles the action result by publishing it to the external system. + pub async fn handle(&self, action_result: &AR) -> Result, Error> { + let new_actions = (self.saga.react)(action_result); + let published_actions = self.action_publisher.publish(&new_actions).await?; + Ok(published_actions) + } +} diff --git a/tests/api.rs b/tests/api.rs index 57cd035..aa04036 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -1,3 +1,4 @@ +// Order API #[derive(Debug)] #[allow(dead_code)] pub enum OrderCommand { @@ -36,6 +37,7 @@ impl OrderCommand { } #[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] pub enum OrderEvent { Created(OrderCreatedEvent), Updated(OrderUpdatedEvent), @@ -70,3 +72,18 @@ impl OrderEvent { } } } + +// Shipment API +#[derive(Debug, PartialEq, Clone)] +#[allow(dead_code)] +pub enum ShipmentCommand { + Create(CreateShipmentCommand), +} + +#[derive(Debug, PartialEq, Clone)] +pub struct CreateShipmentCommand { + pub shipment_id: u32, + pub order_id: u32, + pub customer_name: String, + pub items: Vec, +} diff --git a/tests/saga_manager_test.rs b/tests/saga_manager_test.rs new file mode 100644 index 0000000..18ee951 --- /dev/null +++ b/tests/saga_manager_test.rs @@ -0,0 +1,82 @@ +use async_trait::async_trait; +use derive_more::Display; +use fmodel_rust::saga::Saga; +use fmodel_rust::saga_manager::{ActionPublisher, SagaManager}; +use std::error::Error; + +use crate::api::{CreateShipmentCommand, OrderCreatedEvent, OrderEvent, ShipmentCommand}; + +mod api; + +fn saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { + Saga { + react: Box::new(|event| match event { + OrderEvent::Created(created_event) => { + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: created_event.order_id, + order_id: created_event.order_id, + customer_name: created_event.customer_name.to_owned(), + items: created_event.items.to_owned(), + })] + } + OrderEvent::Updated(_updated_event) => { + vec![] + } + OrderEvent::Cancelled(_cancelled_event) => { + vec![] + } + }), + } +} + +/// Error type for the saga manager +#[derive(Debug, Display)] +#[allow(dead_code)] +enum SagaManagerError { + PublishAction(String), +} + +impl Error for SagaManagerError {} + +/// Simple action publisher that just returns the action/command. +/// It is used for testing. In real life, it would publish the action/command to some external system. or to an aggregate that is able to handel the action/command. +struct SimpleActionPublisher; + +impl SimpleActionPublisher { + fn new() -> Self { + SimpleActionPublisher {} + } +} + +#[async_trait] +impl ActionPublisher for SimpleActionPublisher { + async fn publish( + &self, + action: &[ShipmentCommand], + ) -> Result, SagaManagerError> { + Ok(Vec::from(action)) + } +} + +#[tokio::test] +async fn test() { + let saga: Saga = saga(); + let order_created_event = OrderEvent::Created(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + + let saga_manager = SagaManager::new(SimpleActionPublisher::new(), saga); + let result = saga_manager.handle(&order_created_event).await; + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + })] + ); +} diff --git a/tests/saga_test.rs b/tests/saga_test.rs new file mode 100644 index 0000000..ecf6e40 --- /dev/null +++ b/tests/saga_test.rs @@ -0,0 +1,46 @@ +use fmodel_rust::saga::Saga; + +use crate::api::{CreateShipmentCommand, OrderCreatedEvent, OrderEvent, ShipmentCommand}; + +mod api; + +fn saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { + Saga { + react: Box::new(|event| match event { + OrderEvent::Created(created_event) => { + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: created_event.order_id, + order_id: created_event.order_id, + customer_name: created_event.customer_name.to_owned(), + items: created_event.items.to_owned(), + })] + } + OrderEvent::Updated(_updated_event) => { + vec![] + } + OrderEvent::Cancelled(_cancelled_event) => { + vec![] + } + }), + } +} + +#[test] +fn test() { + let saga: Saga = saga(); + let order_created_event = OrderEvent::Created(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let commands = (saga.react)(&order_created_event); + assert_eq!( + commands, + vec![ShipmentCommand::Create(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + })] + ); +}