Skip to content

Commit

Permalink
Merge pull request #9 from fraktalio/feature/saga
Browse files Browse the repository at this point in the history
Feature/saga
  • Loading branch information
idugalic authored Oct 14, 2023
2 parents 7b50ea8 + 54cc73f commit 090b5a0
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 8 deletions.
52 changes: 44 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@
//! ### Event-sourcing aggregate
//!
//! [aggregate::EventSourcedAggregate] is using/delegating a `Decider` to handle commands and produce new events.
//! It belongs to the
//! Application layer. In order to
//! handle the command, aggregate needs to fetch the current state (represented as a list/vector of events)
//!
//! It belongs to the Application layer.
//!
//! In order to handle the command, aggregate needs to fetch the current state (represented as a list/vector of events)
//! via `EventRepository.fetchEvents` async function, and then delegate the command to the decider which can produce new
//! events as
//! a result. Produced events are then stored via `EventRepository.save` async function.
Expand All @@ -53,10 +54,11 @@
//!
//! ### State-stored aggregate
//!
//! [aggregate::StateStoredAggregate] is using/delegating a `Decider` to handle commands and produce new state. It
//! belongs to the
//! Application layer. In order to
//! handle the command, aggregate needs to fetch the current state via `StateRepository.fetchState` async function first,
//! [aggregate::StateStoredAggregate] is using/delegating a `Decider` to handle commands and produce new state.
//!
//! It belongs to the Application layer.
//!
//! In order to handle the command, aggregate needs to fetch the current state via `StateRepository.fetchState` async function first,
//! and then
//! delegate the command to the decider which can produce new state as a result. New state is then stored
//! via `StateRepository.save` async function.
Expand Down Expand Up @@ -91,12 +93,42 @@
//! [materialized_view::MaterializedView] is using/delegating a `View` to handle events of type `E` and to maintain
//! a state of denormalized
//! projection(s) as a
//! result. Essentially, it represents the query/view side of the CQRS pattern. It belongs to the Application layer.
//! result. Essentially, it represents the query/view side of the CQRS pattern.
//!
//! It belongs to the Application layer.
//!
//! In order to handle the event, materialized view needs to fetch the current state via `ViewStateRepository.fetchState`
//! suspending function first, and then delegate the event to the view, which can produce new state as a result. New state
//! is then stored via `ViewStateRepository.save` suspending function.
//!
//!
//! ## Saga
//!
//! `Saga` is a datatype that represents the central point of control, deciding what to execute next (`A`), based on the action result (`AR`).
//! It has two generic parameters `AR`/Action Result, `A`/Action , representing the type of the values that Saga may contain or use.
//! `'a` is used as a lifetime parameter, indicating that all references contained within the struct (e.g., references within the function closures) must have a lifetime that is at least as long as 'a.
//!
//! `Saga` is a pure domain component.
//!
//! - `AR` - Action Result/Event
//! - `A` - Action/Command
//!
//! ```rust
//! pub type ReactFunction<'a, AR, A> = Box<dyn Fn(&AR) -> Vec<A> + 'a + Send + Sync>;
//! pub struct Saga<'a, AR: 'a, A: 'a> {
//! pub react: ReactFunction<'a, AR, A>,
//! }
//! ```
//!
//! ### Saga Manager
//!
//! [saga_manager::SagaManager] is using/delegating a `Saga` to react to the action result and to publish the new actions.
//!
//! It belongs to the Application layer.
//!
//! It is using a [saga::Saga] to react to the action result and to publish the new actions.
//! It is using an [saga_manager::ActionPublisher] to publish the new actions.
//!
//! ## FModel in other languages
//!
//! - [FModel Kotlin](https://github.com/fraktalio/fmodel/)
Expand All @@ -114,6 +146,8 @@
pub mod aggregate;
pub mod decider;
pub mod materialized_view;
pub mod saga;
pub mod saga_manager;
pub mod view;

/// The [DecideFunction] function is used to decide which events to produce based on the command and the current state.
Expand All @@ -122,3 +156,5 @@ pub type DecideFunction<'a, C, S, E> = Box<dyn Fn(&C, &S) -> Vec<E> + 'a + Send
pub type EvolveFunction<'a, S, E> = Box<dyn Fn(&S, &E) -> S + 'a + Send + Sync>;
/// The [InitialStateFunction] function is used to produce the initial state.
pub type InitialStateFunction<'a, S> = Box<dyn Fn() -> S + 'a + Send + Sync>;
/// The [ReactFunction] function is used to decide what actions/A to execute next based on the action result/AR.
pub type ReactFunction<'a, AR, A> = Box<dyn Fn(&AR) -> Vec<A> + 'a + Send + Sync>;
117 changes: 117 additions & 0 deletions src/saga.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use crate::ReactFunction;

/// [Saga] is a datatype that represents the central point of control, deciding what to execute next ([A]), based on the action result ([AR]).
/// It has two generic parameters `AR`/Action Result, `A`/Action , representing the type of the values that Saga may contain or use.
/// `'a` is used as a lifetime parameter, indicating that all references contained within the struct (e.g., references within the function closures) must have a lifetime that is at least as long as 'a.
///
/// It is common to consider Event as Action Result, and Command as Action, but it is not mandatory.
/// For example, Action Result can be a request response from a remote service.
///
/// ## Example
///
/// ```
/// use fmodel_rust::saga::Saga;
///
/// fn saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> {
/// Saga {
/// react: Box::new(|event| match event {
/// OrderEvent::Created(created_event) => {
/// vec![ShipmentCommand::Create(CreateShipmentCommand {
/// shipment_id: created_event.order_id,
/// order_id: created_event.order_id,
/// customer_name: created_event.customer_name.to_owned(),
/// items: created_event.items.to_owned(),
/// })]
/// }
/// OrderEvent::Updated(_updated_event) => {
/// vec![]
/// }
/// OrderEvent::Cancelled(_cancelled_event) => {
/// vec![]
/// }
/// }),
/// }
/// }
///
/// #[derive(Debug, PartialEq)]
/// #[allow(dead_code)]
/// pub enum ShipmentCommand {
/// Create(CreateShipmentCommand),
/// }
///
/// #[derive(Debug, PartialEq)]
/// pub struct CreateShipmentCommand {
/// pub shipment_id: u32,
/// pub order_id: u32,
/// pub customer_name: String,
/// pub items: Vec<String>,
/// }
///
/// #[derive(Debug)]
/// pub enum OrderEvent {
/// Created(OrderCreatedEvent),
/// Updated(OrderUpdatedEvent),
/// Cancelled(OrderCancelledEvent),
/// }
///
/// #[derive(Debug)]
/// pub struct OrderCreatedEvent {
/// pub order_id: u32,
/// pub customer_name: String,
/// pub items: Vec<String>,
/// }
///
/// #[derive(Debug)]
/// pub struct OrderUpdatedEvent {
/// pub order_id: u32,
/// pub updated_items: Vec<String>,
/// }
///
/// #[derive(Debug)]
/// pub struct OrderCancelledEvent {
/// pub order_id: u32,
/// }
///
/// let saga: Saga<OrderEvent, ShipmentCommand> = saga();
/// let order_created_event = OrderEvent::Created(OrderCreatedEvent {
/// order_id: 1,
/// customer_name: "John Doe".to_string(),
/// items: vec!["Item 1".to_string(), "Item 2".to_string()],
/// });
///
/// let commands = (saga.react)(&order_created_event);
/// ```
pub struct Saga<'a, AR: 'a, A: 'a> {
/// The `react` function is driving the next action based on the action result.
pub react: ReactFunction<'a, AR, A>,
}

impl<'a, AR, A> Saga<'a, AR, A> {
/// Maps the Saga over the A/Action type parameter.
/// Creates a new instance of [Saga]`<AR, A2>`.
pub fn map_action<A2, F>(self, f: &'a F) -> Saga<'a, AR, A2>
where
F: Fn(&A) -> A2 + Send + Sync,
{
let new_react = Box::new(move |ar: &AR| {
let a = (self.react)(ar);
a.into_iter().map(|a: A| f(&a)).collect()
});

Saga { react: new_react }
}

/// Maps the Saga over the AR/ActionResult type parameter.
/// Creates a new instance of [Saga]`<AR2, A>`.
pub fn map_action_result<AR2, F>(self, f: &'a F) -> Saga<'a, AR2, A>
where
F: Fn(&AR2) -> AR + Send + Sync,
{
let new_react = Box::new(move |ar2: &AR2| {
let ar = f(ar2);
(self.react)(&ar)
});

Saga { react: new_react }
}
}
54 changes: 54 additions & 0 deletions src/saga_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::marker::PhantomData;

use async_trait::async_trait;

use crate::saga::Saga;

/// Publishes the action/command to some external system.
///
/// Generic parameter:
///
/// - `A`. - action
/// - `Error` - error
#[async_trait]
pub trait ActionPublisher<A, Error> {
async fn publish(&self, action: &[A]) -> Result<Vec<A>, Error>;
}

/// Saga Manager.
///
/// It is using a [Saga] to react to the action result and to publish the new actions.
/// It is using an [ActionPublisher] to publish the new actions.
///
/// Generic parameters:
/// - `A` - Action / Command
/// - `AR` - Action Result / Event
/// - `Publisher` - Action Publisher
/// - `Error` - Error
pub struct SagaManager<'a, A, AR, Publisher, Error>
where
Publisher: ActionPublisher<A, Error>,
{
action_publisher: Publisher,
saga: Saga<'a, AR, A>,
_marker: PhantomData<(A, AR, Error)>,
}

impl<'a, A, AR, Publisher, Error> SagaManager<'a, A, AR, Publisher, Error>
where
Publisher: ActionPublisher<A, Error>,
{
pub fn new(action_publisher: Publisher, saga: Saga<'a, AR, A>) -> Self {
SagaManager {
action_publisher,
saga,
_marker: PhantomData,
}
}
/// Handles the action result by publishing it to the external system.
pub async fn handle(&self, action_result: &AR) -> Result<Vec<A>, Error> {
let new_actions = (self.saga.react)(action_result);
let published_actions = self.action_publisher.publish(&new_actions).await?;
Ok(published_actions)
}
}
17 changes: 17 additions & 0 deletions tests/api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Order API
#[derive(Debug)]
#[allow(dead_code)]
pub enum OrderCommand {
Expand Down Expand Up @@ -36,6 +37,7 @@ impl OrderCommand {
}

#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
pub enum OrderEvent {
Created(OrderCreatedEvent),
Updated(OrderUpdatedEvent),
Expand Down Expand Up @@ -70,3 +72,18 @@ impl OrderEvent {
}
}
}

// Shipment API
#[derive(Debug, PartialEq, Clone)]
#[allow(dead_code)]
pub enum ShipmentCommand {
Create(CreateShipmentCommand),
}

#[derive(Debug, PartialEq, Clone)]
pub struct CreateShipmentCommand {
pub shipment_id: u32,
pub order_id: u32,
pub customer_name: String,
pub items: Vec<String>,
}
82 changes: 82 additions & 0 deletions tests/saga_manager_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use async_trait::async_trait;
use derive_more::Display;
use fmodel_rust::saga::Saga;
use fmodel_rust::saga_manager::{ActionPublisher, SagaManager};
use std::error::Error;

use crate::api::{CreateShipmentCommand, OrderCreatedEvent, OrderEvent, ShipmentCommand};

mod api;

fn saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> {
Saga {
react: Box::new(|event| match event {
OrderEvent::Created(created_event) => {
vec![ShipmentCommand::Create(CreateShipmentCommand {
shipment_id: created_event.order_id,
order_id: created_event.order_id,
customer_name: created_event.customer_name.to_owned(),
items: created_event.items.to_owned(),
})]
}
OrderEvent::Updated(_updated_event) => {
vec![]
}
OrderEvent::Cancelled(_cancelled_event) => {
vec![]
}
}),
}
}

/// Error type for the saga manager
#[derive(Debug, Display)]
#[allow(dead_code)]
enum SagaManagerError {
PublishAction(String),
}

impl Error for SagaManagerError {}

/// Simple action publisher that just returns the action/command.
/// It is used for testing. In real life, it would publish the action/command to some external system. or to an aggregate that is able to handel the action/command.
struct SimpleActionPublisher;

impl SimpleActionPublisher {
fn new() -> Self {
SimpleActionPublisher {}
}
}

#[async_trait]
impl ActionPublisher<ShipmentCommand, SagaManagerError> for SimpleActionPublisher {
async fn publish(
&self,
action: &[ShipmentCommand],
) -> Result<Vec<ShipmentCommand>, SagaManagerError> {
Ok(Vec::from(action))
}
}

#[tokio::test]
async fn test() {
let saga: Saga<OrderEvent, ShipmentCommand> = saga();
let order_created_event = OrderEvent::Created(OrderCreatedEvent {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});

let saga_manager = SagaManager::new(SimpleActionPublisher::new(), saga);
let result = saga_manager.handle(&order_created_event).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
vec![ShipmentCommand::Create(CreateShipmentCommand {
shipment_id: 1,
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
})]
);
}
Loading

0 comments on commit 090b5a0

Please sign in to comment.