diff --git a/tests/aggregate_combined_test.rs b/tests/aggregate_combined_test.rs index f39a3ed..28c5dc0 100644 --- a/tests/aggregate_combined_test.rs +++ b/tests/aggregate_combined_test.rs @@ -1,41 +1,30 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::{Arc, Mutex}; use std::thread; -use derive_more::Display; - use fmodel_rust::aggregate::{ EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate, StateStoredOrchestratingAggregate, }; use fmodel_rust::decider::Decider; use fmodel_rust::saga::Saga; -use fmodel_rust::Sum; use crate::api::{ CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent, - OrderCommand, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentCommand, - ShipmentCreatedEvent, ShipmentEvent, UpdateOrderCommand, + OrderCommand, OrderCreatedEvent, OrderEvent, OrderState, OrderUpdatedEvent, ShipmentCommand, + ShipmentCreatedEvent, ShipmentEvent, ShipmentState, UpdateOrderCommand, +}; +use crate::application::{ + command_from_sum, event_from_sum, sum_to_command, sum_to_event, AggregateError, Command, Event, + Id, }; mod api; - -/// Error type for the application/aggregate -#[derive(Debug, Display)] -#[allow(dead_code)] -enum AggregateError { - FetchEvents(String), - SaveEvents(String), - FetchState(String), - SaveState(String), -} - -impl Error for AggregateError {} +mod application; /// A simple in-memory event repository - infrastructure struct InMemoryEventRepository { - events: Mutex, i32)>>, + events: Mutex>, } impl InMemoryEventRepository { @@ -46,46 +35,9 @@ impl InMemoryEventRepository { } } -trait Id { - fn id(&self) -> u32; -} - -impl Id for Sum { - fn id(&self) -> u32 { - match self { - Sum::First(event) => event.id(), - Sum::Second(event) => event.id(), - } - } -} - -impl Id for Sum { - fn id(&self) -> u32 { - match self { - Sum::First(command) => command.id(), - Sum::Second(command) => command.id(), - } - } -} - -impl Id for (OrderState, ShipmentState) { - fn id(&self) -> u32 { - self.0.order_id - } -} /// Implementation of [EventRepository] for [InMemoryEventRepository] - infrastructure -impl - EventRepository< - Sum, - Sum, - i32, - AggregateError, - > for InMemoryEventRepository -{ - async fn fetch_events( - &self, - command: &Sum, - ) -> Result, i32)>, AggregateError> { +impl EventRepository for InMemoryEventRepository { + async fn fetch_events(&self, command: &Command) -> Result, AggregateError> { Ok(self .events .lock() @@ -98,9 +50,9 @@ impl async fn save( &self, - events: &[Sum], + events: &[Event], latest_version: &Option, - ) -> Result, i32)>, AggregateError> { + ) -> Result, AggregateError> { let mut latest_version = latest_version.to_owned().unwrap_or(-1); let events = events .into_iter() @@ -108,7 +60,7 @@ impl latest_version += 1; (event.clone(), latest_version) }) - .collect::, i32)>>(); + .collect::>(); self.events .lock() @@ -131,17 +83,12 @@ impl InMemoryStateRepository { } // Implementation of [StateRepository] for [InMemoryOrderStateRepository] -impl - StateRepository< - Sum, - (OrderState, ShipmentState), - i32, - AggregateError, - > for InMemoryStateRepository +impl StateRepository + for InMemoryStateRepository { async fn fetch_state( &self, - command: &Sum, + command: &Command, ) -> Result, AggregateError> { Ok(self.states.lock().unwrap().get(&command.id()).cloned()) } @@ -160,47 +107,31 @@ impl } } -#[derive(Debug, Clone, PartialEq)] -struct OrderState { - order_id: u32, - customer_name: String, - items: Vec, - is_cancelled: bool, -} - -#[derive(Debug, Clone, PartialEq)] -struct ShipmentState { - shipment_id: u32, - order_id: u32, - customer_name: String, - items: Vec, -} - /// Decider for the Order aggregate - Domain logic fn order_decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { Decider { decide: Box::new(|command, state| match command { - OrderCommand::Create(create_cmd) => { + OrderCommand::Create(cmd) => { vec![OrderEvent::Created(OrderCreatedEvent { - order_id: create_cmd.order_id, - customer_name: create_cmd.customer_name.to_owned(), - items: create_cmd.items.to_owned(), + order_id: cmd.order_id, + customer_name: cmd.customer_name.to_owned(), + items: cmd.items.to_owned(), })] } - OrderCommand::Update(update_cmd) => { - if state.order_id == update_cmd.order_id { + OrderCommand::Update(cmd) => { + if state.order_id == cmd.order_id { vec![OrderEvent::Updated(OrderUpdatedEvent { - order_id: update_cmd.order_id, - updated_items: update_cmd.new_items.to_owned(), + order_id: cmd.order_id, + updated_items: cmd.new_items.to_owned(), })] } else { vec![] } } - OrderCommand::Cancel(cancel_cmd) => { - if state.order_id == cancel_cmd.order_id { + OrderCommand::Cancel(cmd) => { + if state.order_id == cmd.order_id { vec![OrderEvent::Cancelled(OrderCancelledEvent { - order_id: cancel_cmd.order_id, + order_id: cmd.order_id, })] } else { vec![] @@ -210,13 +141,13 @@ fn order_decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - OrderEvent::Created(created_event) => { - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } - OrderEvent::Updated(updated_event) => { - new_state.items = updated_event.updated_items.to_owned(); + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); } OrderEvent::Cancelled(_) => { new_state.is_cancelled = true; @@ -237,12 +168,12 @@ fn order_decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { fn shipment_decider<'a>() -> Decider<'a, ShipmentCommand, ShipmentState, ShipmentEvent> { Decider { decide: Box::new(|command, _state| match command { - ShipmentCommand::Create(create_cmd) => { + ShipmentCommand::Create(cmd) => { vec![ShipmentEvent::Created(ShipmentCreatedEvent { - shipment_id: create_cmd.shipment_id, - order_id: create_cmd.order_id, - customer_name: create_cmd.customer_name.to_owned(), - items: create_cmd.items.to_owned(), + shipment_id: cmd.shipment_id, + order_id: cmd.order_id, + customer_name: cmd.customer_name.to_owned(), + items: cmd.items.to_owned(), })] } }), @@ -270,18 +201,18 @@ fn shipment_decider<'a>() -> Decider<'a, ShipmentCommand, ShipmentState, Shipmen fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { Saga { react: Box::new(|event| match event { - OrderEvent::Created(created_event) => { + OrderEvent::Created(evt) => { vec![ShipmentCommand::Create(CreateShipmentCommand { - shipment_id: created_event.order_id, - order_id: created_event.order_id, - customer_name: created_event.customer_name.to_owned(), - items: created_event.items.to_owned(), + shipment_id: evt.order_id, + order_id: evt.order_id, + customer_name: evt.customer_name.to_owned(), + items: evt.items.to_owned(), })] } - OrderEvent::Updated(_updated_event) => { + OrderEvent::Updated(_) => { vec![] } - OrderEvent::Cancelled(_cancelled_event) => { + OrderEvent::Cancelled(_) => { vec![] } }), @@ -291,10 +222,10 @@ fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { fn shipment_saga<'a>() -> Saga<'a, ShipmentEvent, OrderCommand> { Saga { react: Box::new(|event| match event { - ShipmentEvent::Created(created_event) => { + ShipmentEvent::Created(evt) => { vec![OrderCommand::Update(api::UpdateOrderCommand { - order_id: created_event.order_id, - new_items: created_event.items.to_owned(), + order_id: evt.order_id, + new_items: evt.items.to_owned(), })] } }), @@ -302,105 +233,108 @@ fn shipment_saga<'a>() -> Saga<'a, ShipmentEvent, OrderCommand> { } #[tokio::test] -async fn es_test() { - let combined_decider = order_decider().combine(shipment_decider()); +async fn event_sourced_aggregate_test() { + let combined_decider = order_decider() + .combine(shipment_decider()) // Decider, (OrderState, ShipmentState), Sum> + .map_command(&command_from_sum) // Decider> + .map_event(&event_from_sum, &sum_to_event); // Decider let repository = InMemoryEventRepository::new(); let aggregate = Arc::new(EventSourcedAggregate::new(repository, combined_decider)); // Makes a clone of the Arc pointer. // This creates another pointer to the same allocation, increasing the strong reference count. let aggregate2 = Arc::clone(&aggregate); - // Lets spawn two threads to simulate two concurrent requests + // Let's spawn two threads to simulate two concurrent requests let handle1 = thread::spawn(|| async move { - let command = Sum::First(OrderCommand::Create(CreateOrderCommand { + let command = Command::OrderCreate(CreateOrderCommand { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), [( - Sum::First(OrderEvent::Created(OrderCreatedEvent { + Event::OrderCreated(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })), + }), 0 )] ); - let command = Sum::First(OrderCommand::Update(UpdateOrderCommand { + let command = Command::OrderUpdate(UpdateOrderCommand { order_id: 1, new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), [( - Sum::First(OrderEvent::Updated(OrderUpdatedEvent { + Event::OrderUpdated(OrderUpdatedEvent { order_id: 1, updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })), + }), 1 )] ); - let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 1 })); + let command = Command::OrderCancel(CancelOrderCommand { order_id: 1 }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), [( - Sum::First(OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 })), + Event::OrderCancelled(OrderCancelledEvent { order_id: 1 }), 2 )] ); }); let handle2 = thread::spawn(|| async move { - let command = Sum::First(OrderCommand::Create(CreateOrderCommand { + let command = Command::OrderCreate(CreateOrderCommand { order_id: 2, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), [( - Sum::First(OrderEvent::Created(OrderCreatedEvent { + Event::OrderCreated(OrderCreatedEvent { order_id: 2, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })), + }), 0 )] ); - let command = Sum::First(OrderCommand::Update(UpdateOrderCommand { + let command = Command::OrderUpdate(UpdateOrderCommand { order_id: 2, new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), [( - Sum::First(OrderEvent::Updated(OrderUpdatedEvent { + Event::OrderUpdated(OrderUpdatedEvent { order_id: 2, updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })), + }), 1 )] ); - let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 2 })); + let command = Command::OrderCancel(CancelOrderCommand { order_id: 2 }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), [( - Sum::First(OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 })), + Event::OrderCancelled(OrderCancelledEvent { order_id: 2 }), 2 )] ); @@ -411,18 +345,22 @@ async fn es_test() { } #[tokio::test] -async fn ss_test() { - let combined_decider = order_decider().combine(shipment_decider()); +async fn state_stored_aggregate_test() { + let combined_decider = order_decider() + .combine(shipment_decider()) // Decider, (OrderState, ShipmentState), Sum> + .map_command(&command_from_sum) // Decider> + .map_event(&event_from_sum, &sum_to_event); // Decider + let repository = InMemoryStateRepository::new(); let aggregate = Arc::new(StateStoredAggregate::new(repository, combined_decider)); let aggregate2 = Arc::clone(&aggregate); let handle1 = thread::spawn(|| async move { - let command = Sum::First(OrderCommand::Create(CreateOrderCommand { + let command = Command::OrderCreate(CreateOrderCommand { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -445,10 +383,10 @@ async fn ss_test() { 0 ) ); - let command = Sum::First(OrderCommand::Update(UpdateOrderCommand { + let command = Command::OrderUpdate(UpdateOrderCommand { order_id: 1, new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -471,7 +409,7 @@ async fn ss_test() { 1 ) ); - let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 1 })); + let command = Command::OrderCancel(CancelOrderCommand { order_id: 1 }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -497,11 +435,11 @@ async fn ss_test() { }); let handle2 = thread::spawn(|| async move { - let command = Sum::First(OrderCommand::Create(CreateOrderCommand { + let command = Command::OrderCreate(CreateOrderCommand { order_id: 2, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -524,10 +462,10 @@ async fn ss_test() { 0 ) ); - let command = Sum::First(OrderCommand::Update(UpdateOrderCommand { + let command = Command::OrderUpdate(UpdateOrderCommand { order_id: 2, new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -550,7 +488,7 @@ async fn ss_test() { 1 ) ); - let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 2 })); + let command = Command::OrderCancel(CancelOrderCommand { order_id: 2 }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -580,9 +518,16 @@ async fn ss_test() { } #[tokio::test] -async fn ss_combined_test() { - let combined_decider = order_decider().combine(shipment_decider()); - let combined_saga = order_saga().combine(shipment_saga()); +async fn state_stored_combined_test() { + let combined_decider = order_decider() + .combine(shipment_decider()) // Decider, (OrderState, ShipmentState), Sum> + .map_command(&command_from_sum) // Decider> + .map_event(&event_from_sum, &sum_to_event); // Decider + + let combined_saga = order_saga() + .combine(shipment_saga()) + .map_action(&sum_to_command) + .map_action_result(&event_from_sum); let repository = InMemoryStateRepository::new(); let aggregate = Arc::new(StateStoredOrchestratingAggregate::new( @@ -593,11 +538,11 @@ async fn ss_combined_test() { let aggregate2 = Arc::clone(&aggregate); let handle1 = thread::spawn(|| async move { - let command = Sum::First(OrderCommand::Create(CreateOrderCommand { + let command = Command::OrderCreate(CreateOrderCommand { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -620,10 +565,10 @@ async fn ss_combined_test() { 0 ) ); - let command = Sum::First(OrderCommand::Update(UpdateOrderCommand { + let command = Command::OrderUpdate(UpdateOrderCommand { order_id: 1, new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -646,7 +591,7 @@ async fn ss_combined_test() { 1 ) ); - let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 1 })); + let command = Command::OrderCancel(CancelOrderCommand { order_id: 1 }); let result = aggregate.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -672,11 +617,11 @@ async fn ss_combined_test() { }); let handle2 = thread::spawn(|| async move { - let command = Sum::First(OrderCommand::Create(CreateOrderCommand { + let command = Command::OrderCreate(CreateOrderCommand { order_id: 2, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -699,10 +644,10 @@ async fn ss_combined_test() { 0 ) ); - let command = Sum::First(OrderCommand::Update(UpdateOrderCommand { + let command = Command::OrderUpdate(UpdateOrderCommand { order_id: 2, new_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( @@ -725,7 +670,7 @@ async fn ss_combined_test() { 1 ) ); - let command = Sum::First(OrderCommand::Cancel(CancelOrderCommand { order_id: 2 })); + let command = Command::OrderCancel(CancelOrderCommand { order_id: 2 }); let result = aggregate2.handle(&command).await; assert!(result.is_ok()); assert_eq!( diff --git a/tests/aggregate_test.rs b/tests/aggregate_test.rs index e446c7f..cadd485 100644 --- a/tests/aggregate_test.rs +++ b/tests/aggregate_test.rs @@ -1,10 +1,7 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::{Arc, Mutex}; use std::thread; -use derive_more::Display; - use fmodel_rust::aggregate::{ EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate, }; @@ -12,22 +9,12 @@ use fmodel_rust::decider::Decider; use crate::api::{ CancelOrderCommand, CreateOrderCommand, OrderCancelledEvent, OrderCommand, OrderCreatedEvent, - OrderEvent, OrderUpdatedEvent, UpdateOrderCommand, + OrderEvent, OrderState, OrderUpdatedEvent, UpdateOrderCommand, }; +use crate::application::AggregateError; mod api; - -/// Error type for the application/aggregate -#[derive(Debug, Display)] -#[allow(dead_code)] -enum AggregateError { - FetchEvents(String), - SaveEvents(String), - FetchState(String), - SaveState(String), -} - -impl Error for AggregateError {} +mod application; /// A simple in-memory event repository - infrastructure struct InMemoryOrderEventRepository { @@ -119,39 +106,31 @@ impl StateRepository } } -#[derive(Debug, Clone, PartialEq)] -struct OrderState { - order_id: u32, - customer_name: String, - items: Vec, - is_cancelled: bool, -} - /// Decider for the Order aggregate - Domain logic fn decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { Decider { decide: Box::new(|command, state| match command { - OrderCommand::Create(create_cmd) => { + OrderCommand::Create(cmd) => { vec![OrderEvent::Created(OrderCreatedEvent { - order_id: create_cmd.order_id, - customer_name: create_cmd.customer_name.to_owned(), - items: create_cmd.items.to_owned(), + order_id: cmd.order_id, + customer_name: cmd.customer_name.to_owned(), + items: cmd.items.to_owned(), })] } - OrderCommand::Update(update_cmd) => { - if state.order_id == update_cmd.order_id { + OrderCommand::Update(cmd) => { + if state.order_id == cmd.order_id { vec![OrderEvent::Updated(OrderUpdatedEvent { - order_id: update_cmd.order_id, - updated_items: update_cmd.new_items.to_owned(), + order_id: cmd.order_id, + updated_items: cmd.new_items.to_owned(), })] } else { vec![] } } - OrderCommand::Cancel(cancel_cmd) => { - if state.order_id == cancel_cmd.order_id { + OrderCommand::Cancel(cmd) => { + if state.order_id == cmd.order_id { vec![OrderEvent::Cancelled(OrderCancelledEvent { - order_id: cancel_cmd.order_id, + order_id: cmd.order_id, })] } else { vec![] @@ -161,13 +140,13 @@ fn decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - OrderEvent::Created(created_event) => { - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } - OrderEvent::Updated(updated_event) => { - new_state.items = updated_event.updated_items.to_owned(); + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); } OrderEvent::Cancelled(_) => { new_state.is_cancelled = true; @@ -192,7 +171,7 @@ async fn es_test() { // This creates another pointer to the same allocation, increasing the strong reference count. let aggregate2 = Arc::clone(&aggregate); - // Lets spawn two threads to simulate two concurrent requests + // Let's spawn two threads to simulate two concurrent requests let handle1 = thread::spawn(|| async move { let command = OrderCommand::Create(CreateOrderCommand { order_id: 1, diff --git a/tests/api.rs b/tests/api/mod.rs similarity index 60% rename from tests/api.rs rename to tests/api/mod.rs index c7f1fda..c7d2adb 100644 --- a/tests/api.rs +++ b/tests/api/mod.rs @@ -1,4 +1,26 @@ -// Order API +// ################################################################### +// ############################ Order API ############################ +// ################################################################### + +/// The state of the Order entity +#[derive(Debug, Clone, PartialEq)] +pub struct OrderState { + pub order_id: u32, + pub customer_name: String, + pub items: Vec, + pub is_cancelled: bool, +} + +/// The state of the ViewOrder entity / It represents the Query Model +#[derive(Debug, Clone, PartialEq)] +pub struct OrderViewState { + pub order_id: u32, + pub customer_name: String, + pub items: Vec, + pub is_cancelled: bool, +} + +/// All variants of Order commands #[derive(Debug, Clone, PartialEq)] #[allow(dead_code)] pub enum OrderCommand { @@ -25,6 +47,7 @@ pub struct CancelOrderCommand { pub order_id: u32, } +/// Provides a way to get the id of the Order commands impl OrderCommand { #[allow(dead_code)] pub fn id(&self) -> u32 { @@ -36,15 +59,7 @@ impl OrderCommand { } } -impl ShipmentCommand { - #[allow(dead_code)] - pub fn id(&self) -> u32 { - match self { - ShipmentCommand::Create(c) => c.shipment_id.to_owned(), - } - } -} - +/// All variants of Order events #[derive(Debug, Clone, PartialEq)] #[allow(dead_code)] pub enum OrderEvent { @@ -71,6 +86,7 @@ pub struct OrderCancelledEvent { pub order_id: u32, } +/// Provides a way to get the id of the Order events impl OrderEvent { #[allow(dead_code)] pub fn id(&self) -> u32 { @@ -81,16 +97,30 @@ impl OrderEvent { } } } -impl ShipmentEvent { - #[allow(dead_code)] - pub fn id(&self) -> u32 { - match self { - ShipmentEvent::Created(c) => c.shipment_id.to_owned(), - } - } + +// ###################################################################### +// ############################ Shipment API ############################ +// ###################################################################### + +/// The state of the Shipment entity +#[derive(Debug, Clone, PartialEq)] +pub struct ShipmentState { + pub shipment_id: u32, + pub order_id: u32, + pub customer_name: String, + pub items: Vec, } -// Shipment API +/// The state of the ViewShipment entity / It represents the Query Model +#[derive(Debug, Clone, PartialEq)] +pub struct ShipmentViewState { + pub shipment_id: u32, + pub order_id: u32, + pub customer_name: String, + pub items: Vec, +} + +/// All variants of Shipment commands #[derive(Debug, PartialEq, Clone)] #[allow(dead_code)] pub enum ShipmentCommand { @@ -105,6 +135,17 @@ pub struct CreateShipmentCommand { pub items: Vec, } +/// Provides a way to get the id of the Shipment commands +impl ShipmentCommand { + #[allow(dead_code)] + pub fn id(&self) -> u32 { + match self { + ShipmentCommand::Create(c) => c.shipment_id.to_owned(), + } + } +} + +/// All variants of Shipment events #[derive(Debug, PartialEq, Clone)] #[allow(dead_code)] pub enum ShipmentEvent { @@ -117,3 +158,13 @@ pub struct ShipmentCreatedEvent { pub customer_name: String, pub items: Vec, } + +/// Provides a way to get the id of the Shipment events +impl ShipmentEvent { + #[allow(dead_code)] + pub fn id(&self) -> u32 { + match self { + ShipmentEvent::Created(c) => c.shipment_id.to_owned(), + } + } +} diff --git a/tests/application/mod.rs b/tests/application/mod.rs new file mode 100644 index 0000000..68dd999 --- /dev/null +++ b/tests/application/mod.rs @@ -0,0 +1,176 @@ +use derive_more::Display; +use fmodel_rust::Sum; +use std::error::Error; + +use crate::api::{ + CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent, + OrderCommand, OrderCreatedEvent, OrderEvent, OrderState, OrderUpdatedEvent, OrderViewState, + ShipmentCommand, ShipmentCreatedEvent, ShipmentEvent, ShipmentState, ShipmentViewState, + UpdateOrderCommand, +}; + +/// The command enum for all the domain commands (shipment and order) +/// It is convenient to have a single enum for all the command variants in your system to make it easy to combine all deciders into a single decider +/// Consider exposing this API to the outside world, instead of exposing the Order or Shipment commands individually. It is on you! +#[derive(Debug, PartialEq, Clone)] +#[allow(dead_code)] +pub enum Command { + ShipmentCreate(CreateShipmentCommand), + OrderCreate(CreateOrderCommand), + OrderUpdate(UpdateOrderCommand), + OrderCancel(CancelOrderCommand), +} + +/// A mapping function to contra map the domain command to the inconvenient Sum +#[allow(dead_code)] +pub fn command_from_sum(command: &Command) -> Sum { + match command { + Command::ShipmentCreate(c) => Sum::Second(ShipmentCommand::Create(c.to_owned())), + Command::OrderCreate(c) => Sum::First(OrderCommand::Create(c.to_owned())), + Command::OrderUpdate(c) => Sum::First(OrderCommand::Update(c.to_owned())), + Command::OrderCancel(c) => Sum::First(OrderCommand::Cancel(c.to_owned())), + } +} +/// A mapping function to map the inconvenient Sum to the domain command +#[allow(dead_code)] +pub fn sum_to_command(command: &Sum) -> Command { + match command { + Sum::First(c) => match c { + OrderCommand::Create(c) => Command::OrderCreate(c.to_owned()), + OrderCommand::Update(c) => Command::OrderUpdate(c.to_owned()), + OrderCommand::Cancel(c) => Command::OrderCancel(c.to_owned()), + }, + Sum::Second(c) => match c { + ShipmentCommand::Create(c) => Command::ShipmentCreate(c.to_owned()), + }, + } +} +#[allow(dead_code)] +pub fn sum_to_command2(command: &Sum) -> Command { + match command { + Sum::First(c) => match c { + ShipmentCommand::Create(c) => Command::ShipmentCreate(c.to_owned()), + }, + Sum::Second(c) => match c { + OrderCommand::Create(c) => Command::OrderCreate(c.to_owned()), + OrderCommand::Update(c) => Command::OrderUpdate(c.to_owned()), + OrderCommand::Cancel(c) => Command::OrderCancel(c.to_owned()), + }, + } +} + +/// The event enum for all the domain events (shipment and order) +/// It is convenient to have a single enum for all the event variants in your system to make it easy to combine all deciders/sagas/views into a single decider/saga/view +/// Consider exposing this API to the outside world, instead of exposing the Order or Shipment events individually. It is on you! +#[derive(Debug, PartialEq, Clone)] +#[allow(dead_code)] +pub enum Event { + ShipmentCreated(ShipmentCreatedEvent), + OrderCreated(OrderCreatedEvent), + OrderUpdated(OrderUpdatedEvent), + OrderCancelled(OrderCancelledEvent), +} + +/// A mapping function to contra map the domain event to the inconvenient Sum +#[allow(dead_code)] +pub fn event_from_sum(event: &Event) -> Sum { + match event { + Event::ShipmentCreated(c) => Sum::Second(ShipmentEvent::Created(c.to_owned())), + Event::OrderCreated(c) => Sum::First(OrderEvent::Created(c.to_owned())), + Event::OrderUpdated(c) => Sum::First(OrderEvent::Updated(c.to_owned())), + Event::OrderCancelled(c) => Sum::First(OrderEvent::Cancelled(c.to_owned())), + } +} +#[allow(dead_code)] +pub fn event_from_sum2(event: &Event) -> Sum { + match event { + Event::ShipmentCreated(c) => Sum::First(ShipmentEvent::Created(c.to_owned())), + Event::OrderCreated(c) => Sum::Second(OrderEvent::Created(c.to_owned())), + Event::OrderUpdated(c) => Sum::Second(OrderEvent::Updated(c.to_owned())), + Event::OrderCancelled(c) => Sum::Second(OrderEvent::Cancelled(c.to_owned())), + } +} +/// A mapping function to map the inconvenient Sum to the domain event +#[allow(dead_code)] +pub fn sum_to_event(event: &Sum) -> Event { + match event { + Sum::First(e) => match e { + OrderEvent::Created(c) => Event::OrderCreated(c.to_owned()), + OrderEvent::Updated(c) => Event::OrderUpdated(c.to_owned()), + OrderEvent::Cancelled(c) => Event::OrderCancelled(c.to_owned()), + }, + Sum::Second(e) => match e { + ShipmentEvent::Created(c) => Event::ShipmentCreated(c.to_owned()), + }, + } +} + +/// A trait to provide a way to get the id of the messages/entities +pub trait Id { + fn id(&self) -> u32; +} + +impl Id for Event { + fn id(&self) -> u32 { + match self { + Event::OrderCreated(event) => event.order_id, + Event::OrderCancelled(event) => event.order_id, + Event::OrderUpdated(event) => event.order_id, + Event::ShipmentCreated(event) => event.shipment_id, + } + } +} + +impl Id for Command { + fn id(&self) -> u32 { + match self { + Command::OrderCreate(cmd) => cmd.order_id, + Command::OrderUpdate(cmd) => cmd.order_id, + Command::OrderCancel(cmd) => cmd.order_id, + Command::ShipmentCreate(cmd) => cmd.shipment_id, + } + } +} + +impl Id for (OrderState, ShipmentState) { + fn id(&self) -> u32 { + self.0.order_id + } +} + +impl Id for (OrderViewState, ShipmentViewState) { + fn id(&self) -> u32 { + self.0.order_id + } +} + +/// Error type for the application/aggregate +#[derive(Debug, Display)] +#[allow(dead_code)] +pub enum AggregateError { + FetchEvents(String), + SaveEvents(String), + FetchState(String), + SaveState(String), +} + +impl Error for AggregateError {} + +/// Error type for the application/materialized view +#[derive(Debug, Display)] +#[allow(dead_code)] +pub enum MaterializedViewError { + FetchState(String), + SaveState(String), +} + +impl Error for MaterializedViewError {} + +/// Error type for the saga manager +#[derive(Debug, Display)] +#[allow(dead_code)] +pub enum SagaManagerError { + PublishAction(String), +} + +impl Error for SagaManagerError {} diff --git a/tests/decider_test.rs b/tests/decider_test.rs index 8971d61..5f97aa4 100644 --- a/tests/decider_test.rs +++ b/tests/decider_test.rs @@ -1,54 +1,41 @@ use fmodel_rust::decider::{Decider, EventComputation, StateComputation}; -use fmodel_rust::{Sum::First as Order, Sum::Second as Shipment}; use crate::api::{ CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent, - OrderCommand, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentCommand, - ShipmentCreatedEvent, ShipmentEvent, + OrderCommand, OrderCreatedEvent, OrderEvent, OrderState, OrderUpdatedEvent, ShipmentCommand, + ShipmentCreatedEvent, ShipmentEvent, ShipmentState, }; +use crate::application::Command::{OrderCreate, ShipmentCreate}; +use crate::application::Event::{OrderCreated, ShipmentCreated}; +use crate::application::{command_from_sum, event_from_sum, sum_to_event, Command, Event}; mod api; - -#[derive(Debug, Clone, PartialEq)] -struct OrderState { - order_id: u32, - customer_name: String, - items: Vec, - is_cancelled: bool, -} - -#[derive(Debug, Clone, PartialEq)] -struct ShipmentState { - shipment_id: u32, - order_id: u32, - customer_name: String, - items: Vec, -} +mod application; fn order_decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { Decider { decide: Box::new(|command, state| match command { - OrderCommand::Create(create_cmd) => { + OrderCommand::Create(cmd) => { vec![OrderEvent::Created(OrderCreatedEvent { - order_id: create_cmd.order_id, - customer_name: create_cmd.customer_name.to_owned(), - items: create_cmd.items.to_owned(), + order_id: cmd.order_id, + customer_name: cmd.customer_name.to_owned(), + items: cmd.items.to_owned(), })] } - OrderCommand::Update(update_cmd) => { - if state.order_id == update_cmd.order_id { + OrderCommand::Update(cmd) => { + if state.order_id == cmd.order_id { vec![OrderEvent::Updated(OrderUpdatedEvent { - order_id: update_cmd.order_id, - updated_items: update_cmd.new_items.to_owned(), + order_id: cmd.order_id, + updated_items: cmd.new_items.to_owned(), })] } else { vec![] } } - OrderCommand::Cancel(cancel_cmd) => { - if state.order_id == cancel_cmd.order_id { + OrderCommand::Cancel(cmd) => { + if state.order_id == cmd.order_id { vec![OrderEvent::Cancelled(OrderCancelledEvent { - order_id: cancel_cmd.order_id, + order_id: cmd.order_id, })] } else { vec![] @@ -58,13 +45,13 @@ fn order_decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - OrderEvent::Created(created_event) => { - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } - OrderEvent::Updated(updated_event) => { - new_state.items = updated_event.updated_items.to_owned(); + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); } OrderEvent::Cancelled(_) => { new_state.is_cancelled = true; @@ -84,23 +71,23 @@ fn order_decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> { fn shipment_decider<'a>() -> Decider<'a, ShipmentCommand, ShipmentState, ShipmentEvent> { Decider { decide: Box::new(|command, _state| match command { - ShipmentCommand::Create(create_cmd) => { + ShipmentCommand::Create(cmd) => { vec![ShipmentEvent::Created(ShipmentCreatedEvent { - shipment_id: create_cmd.shipment_id, - order_id: create_cmd.order_id, - customer_name: create_cmd.customer_name.to_owned(), - items: create_cmd.items.to_owned(), + shipment_id: cmd.shipment_id, + order_id: cmd.order_id, + customer_name: cmd.customer_name.to_owned(), + items: cmd.items.to_owned(), })] } }), evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - ShipmentEvent::Created(created_event) => { - new_state.shipment_id = created_event.shipment_id; - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + ShipmentEvent::Created(evt) => { + new_state.shipment_id = evt.shipment_id; + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } } new_state @@ -117,23 +104,31 @@ fn shipment_decider<'a>() -> Decider<'a, ShipmentCommand, ShipmentState, Shipmen #[test] fn test() { let order_decider: Decider = order_decider(); - let order_decider2: Decider = crate::order_decider(); - let shpiment_decider2: Decider = + let order_decider_clone: Decider = crate::order_decider(); + let shipment_decider: Decider = shipment_decider(); - let combined_decider = order_decider2.combine(shpiment_decider2); + let combined_decider: Decider = + order_decider_clone + .combine(shipment_decider) // Decider, (OrderState, ShipmentState), Sum> + .map_command(&command_from_sum) // Decider> + .map_event(&event_from_sum, &sum_to_event); // Decider - let create_order_command = OrderCommand::Create(CreateOrderCommand { + let create_order_command = CreateOrderCommand { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let create_shipment_command = ShipmentCommand::Create(CreateShipmentCommand { + }; + + let create_shipment_command = CreateShipmentCommand { shipment_id: 1, order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); - let new_events = order_decider.compute_new_events(&[], &create_order_command); + }; + + // Test the OrderDecider + let new_events = + order_decider.compute_new_events(&[], &OrderCommand::Create(create_order_command.clone())); assert_eq!( new_events, [OrderEvent::Created(OrderCreatedEvent { @@ -142,29 +137,33 @@ fn test() { items: vec!["Item 1".to_string(), "Item 2".to_string()], })] ); + // Test the Decider that combines OrderDecider and ShipmentDecider and can handle both OrderCommand and ShipmentCommand and produce Event let new_events2 = - combined_decider.compute_new_events(&[], &Order(create_order_command.clone())); + combined_decider.compute_new_events(&[], &OrderCreate(create_order_command.clone())); assert_eq!( new_events2, - [Order(OrderEvent::Created(OrderCreatedEvent { + [OrderCreated(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - }))] + })] ); + // Test the Decider that combines OrderDecider and ShipmentDecider and can handle both OrderCommand and ShipmentCommand and produce Event let new_events3 = - combined_decider.compute_new_events(&[], &Shipment(create_shipment_command.clone())); + combined_decider.compute_new_events(&[], &ShipmentCreate(create_shipment_command.clone())); assert_eq!( new_events3, - [Shipment(ShipmentEvent::Created(ShipmentCreatedEvent { + [ShipmentCreated(ShipmentCreatedEvent { shipment_id: 1, order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - }))] + })] ); - let new_state = order_decider.compute_new_state(None, &create_order_command); + // Test the OrderDecider + let new_state = + order_decider.compute_new_state(None, &OrderCommand::Create(create_order_command.clone())); assert_eq!( new_state, OrderState { @@ -174,8 +173,9 @@ fn test() { is_cancelled: false, } ); + // Test the Decider that combines OrderDecider and ShipmentDecider and can handle both OrderCommand and ShipmentCommand and produce a tuple of (OrderState, ShipmentState) let new_state2 = - combined_decider.compute_new_state(None, &Shipment(create_shipment_command.clone())); + combined_decider.compute_new_state(None, &ShipmentCreate(create_shipment_command.clone())); assert_eq!( new_state2, ( @@ -194,12 +194,14 @@ fn test() { ) ); + // Test the OrderDecider let cancel_command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 }); let new_events = order_decider.compute_new_events(&new_events, &cancel_command); assert_eq!( new_events, [OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 })] ); + // Test the OrderDecider let new_state = order_decider.compute_new_state(Some(new_state), &cancel_command); assert_eq!( new_state, diff --git a/tests/materialized_view_combined_test.rs b/tests/materialized_view_combined_test.rs index ed098b7..8e5eff0 100644 --- a/tests/materialized_view_combined_test.rs +++ b/tests/materialized_view_combined_test.rs @@ -1,48 +1,31 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::{Arc, Mutex}; use std::thread; -use derive_more::Display; - use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository}; use fmodel_rust::view::View; -use fmodel_rust::Sum; use crate::api::{ - OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentEvent, + OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState, + ShipmentEvent, ShipmentViewState, }; +use crate::application::{event_from_sum, Event, Id, MaterializedViewError}; mod api; - -#[derive(Debug, Clone, PartialEq)] -struct OrderViewState { - order_id: u32, - customer_name: String, - items: Vec, - is_cancelled: bool, -} - -#[derive(Debug, Clone, PartialEq)] -struct ShipmentViewState { - shipment_id: u32, - order_id: u32, - customer_name: String, - items: Vec, -} +mod application; fn order_view<'a>() -> View<'a, OrderViewState, OrderEvent> { View { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - OrderEvent::Created(created_event) => { - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } - OrderEvent::Updated(updated_event) => { - new_state.items = updated_event.updated_items.to_owned(); + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); } OrderEvent::Cancelled(_) => { new_state.is_cancelled = true; @@ -64,11 +47,11 @@ fn shipment_view<'a>() -> View<'a, ShipmentViewState, ShipmentEvent> { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - ShipmentEvent::Created(created_event) => { - new_state.shipment_id = created_event.shipment_id; - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + ShipmentEvent::Created(evt) => { + new_state.shipment_id = evt.shipment_id; + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } } new_state @@ -82,16 +65,6 @@ fn shipment_view<'a>() -> View<'a, ShipmentViewState, ShipmentEvent> { } } -/// Error type for the application/materialized view -#[derive(Debug, Display)] -#[allow(dead_code)] -enum MaterializedViewError { - FetchState(String), - SaveState(String), -} - -impl Error for MaterializedViewError {} - struct InMemoryViewStateRepository { states: Mutex>, } @@ -104,36 +77,13 @@ impl InMemoryViewStateRepository { } } -trait Id { - fn id(&self) -> u32; -} - -impl Id for Sum { - fn id(&self) -> u32 { - match self { - Sum::First(event) => event.id(), - Sum::Second(event) => event.id(), - } - } -} - -impl Id for (OrderViewState, ShipmentViewState) { - fn id(&self) -> u32 { - self.0.order_id - } -} - // Implementation of [ViewStateRepository] for [InMemoryViewOrderStateRepository] -impl - ViewStateRepository< - Sum, - (OrderViewState, ShipmentViewState), - MaterializedViewError, - > for InMemoryViewStateRepository +impl ViewStateRepository + for InMemoryViewStateRepository { async fn fetch_state( &self, - event: &Sum, + event: &Event, ) -> Result, MaterializedViewError> { Ok(self.states.lock().unwrap().get(&event.id()).cloned()) } @@ -152,7 +102,9 @@ impl #[tokio::test] async fn test() { - let combined_view = order_view().combine(shipment_view()); + let combined_view = order_view() + .combine(shipment_view()) + .map_event(&event_from_sum); let repository = InMemoryViewStateRepository::new(); let materialized_view = Arc::new(MaterializedView::new(repository, combined_view)); let materialized_view1 = Arc::clone(&materialized_view); @@ -160,11 +112,11 @@ async fn test() { // Lets spawn two threads to simulate two concurrent requests let handle1 = thread::spawn(|| async move { - let event = Sum::First(OrderEvent::Created(OrderCreatedEvent { + let event = Event::OrderCreated(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = materialized_view1.handle(&event).await; assert!(result.is_ok()); assert_eq!( @@ -184,10 +136,10 @@ async fn test() { } ) ); - let event = Sum::First(OrderEvent::Updated(OrderUpdatedEvent { + let event = Event::OrderUpdated(OrderUpdatedEvent { order_id: 1, updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = materialized_view1.handle(&event).await; assert!(result.is_ok()); assert_eq!( @@ -207,7 +159,7 @@ async fn test() { } ) ); - let event = Sum::First(OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 })); + let event = Event::OrderCancelled(OrderCancelledEvent { order_id: 1 }); let result = materialized_view1.handle(&event).await; assert!(result.is_ok()); assert_eq!( @@ -230,11 +182,11 @@ async fn test() { }); let handle2 = thread::spawn(|| async move { - let event = Sum::First(OrderEvent::Created(OrderCreatedEvent { + let event = Event::OrderCreated(OrderCreatedEvent { order_id: 2, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let result = materialized_view2.handle(&event).await; assert!(result.is_ok()); assert_eq!( @@ -254,10 +206,10 @@ async fn test() { } ) ); - let event = Sum::First(OrderEvent::Updated(OrderUpdatedEvent { + let event = Event::OrderUpdated(OrderUpdatedEvent { order_id: 2, updated_items: vec!["Item 3".to_string(), "Item 4".to_string()], - })); + }); let result = materialized_view2.handle(&event).await; assert!(result.is_ok()); assert_eq!( @@ -277,7 +229,7 @@ async fn test() { } ) ); - let event = Sum::First(OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 })); + let event = Event::OrderCancelled(OrderCancelledEvent { order_id: 2 }); let result = materialized_view2.handle(&event).await; assert!(result.is_ok()); assert_eq!( diff --git a/tests/materialized_view_test.rs b/tests/materialized_view_test.rs index fccbd08..5c7325f 100644 --- a/tests/materialized_view_test.rs +++ b/tests/materialized_view_test.rs @@ -1,37 +1,30 @@ use std::collections::HashMap; -use std::error::Error; use std::sync::{Arc, Mutex}; use std::thread; -use derive_more::Display; - use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository}; use fmodel_rust::view::View; -use crate::api::{OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent}; +use crate::api::{ + OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState, +}; +use crate::application::MaterializedViewError; mod api; - -#[derive(Debug, Clone, PartialEq)] -struct OrderViewState { - order_id: u32, - customer_name: String, - items: Vec, - is_cancelled: bool, -} +mod application; fn view<'a>() -> View<'a, OrderViewState, OrderEvent> { View { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - OrderEvent::Created(created_event) => { - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } - OrderEvent::Updated(updated_event) => { - new_state.items = updated_event.updated_items.to_owned(); + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); } OrderEvent::Cancelled(_) => { new_state.is_cancelled = true; @@ -48,16 +41,6 @@ fn view<'a>() -> View<'a, OrderViewState, OrderEvent> { } } -/// Error type for the application/materialized view -#[derive(Debug, Display)] -#[allow(dead_code)] -enum MaterializedViewError { - FetchState(String), - SaveState(String), -} - -impl Error for MaterializedViewError {} - struct InMemoryViewOrderStateRepository { states: Mutex>, } @@ -97,7 +80,7 @@ async fn test() { let materialized_view1 = Arc::clone(&materialized_view); let materialized_view2 = Arc::clone(&materialized_view); - // Lets spawn two threads to simulate two concurrent requests + // Let's spawn two threads to simulate two concurrent requests let handle1 = thread::spawn(|| async move { let event = OrderEvent::Created(OrderCreatedEvent { order_id: 1, diff --git a/tests/saga_manager_combined_test.rs b/tests/saga_manager_combined_test.rs index 936b006..a1d5c14 100644 --- a/tests/saga_manager_combined_test.rs +++ b/tests/saga_manager_combined_test.rs @@ -1,31 +1,30 @@ -use derive_more::Display; use fmodel_rust::saga::Saga; use fmodel_rust::saga_manager::{ActionPublisher, SagaManager}; -use fmodel_rust::Sum; -use std::error::Error; use crate::api::{ CreateShipmentCommand, OrderCommand, OrderCreatedEvent, OrderEvent, ShipmentCommand, - ShipmentEvent, + ShipmentEvent, UpdateOrderCommand, }; +use crate::application::{event_from_sum2, sum_to_command2, Command, Event, SagaManagerError}; mod api; +mod application; fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { Saga { react: Box::new(|event| match event { - OrderEvent::Created(created_event) => { + OrderEvent::Created(evt) => { vec![ShipmentCommand::Create(CreateShipmentCommand { - shipment_id: created_event.order_id, - order_id: created_event.order_id, - customer_name: created_event.customer_name.to_owned(), - items: created_event.items.to_owned(), + shipment_id: evt.order_id, + order_id: evt.order_id, + customer_name: evt.customer_name.to_owned(), + items: evt.items.to_owned(), })] } - OrderEvent::Updated(_updated_event) => { + OrderEvent::Updated(_) => { vec![] } - OrderEvent::Cancelled(_cancelled_event) => { + OrderEvent::Cancelled(_) => { vec![] } }), @@ -35,25 +34,16 @@ fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { fn shipment_saga<'a>() -> Saga<'a, ShipmentEvent, OrderCommand> { Saga { react: Box::new(|event| match event { - ShipmentEvent::Created(created_event) => { - vec![OrderCommand::Update(api::UpdateOrderCommand { - order_id: created_event.order_id, - new_items: created_event.items.to_owned(), + ShipmentEvent::Created(evt) => { + vec![OrderCommand::Update(UpdateOrderCommand { + order_id: evt.order_id, + new_items: evt.items.to_owned(), })] } }), } } -/// Error type for the saga manager -#[derive(Debug, Display)] -#[allow(dead_code)] -enum SagaManagerError { - PublishAction(String), -} - -impl Error for SagaManagerError {} - /// Simple action publisher that just returns the action/command. /// It is used for testing. In real life, it would publish the action/command to some external system. or to an aggregate that is able to handel the action/command. struct SimpleActionPublisher; @@ -64,38 +54,36 @@ impl SimpleActionPublisher { } } -impl ActionPublisher, SagaManagerError> - for SimpleActionPublisher -{ - async fn publish( - &self, - action: &[Sum], - ) -> Result>, SagaManagerError> { +impl ActionPublisher for SimpleActionPublisher { + async fn publish(&self, action: &[Command]) -> Result, SagaManagerError> { Ok(Vec::from(action)) } } #[tokio::test] async fn test() { - let order_created_event = Sum::Second(OrderEvent::Created(OrderCreatedEvent { + let order_created_event = Event::OrderCreated(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - })); + }); let saga_manager = SagaManager::new( SimpleActionPublisher::new(), - shipment_saga().combine(order_saga()), + shipment_saga() + .combine(order_saga()) + .map_action(&sum_to_command2) + .map_action_result(&event_from_sum2), ); let result = saga_manager.handle(&order_created_event).await; assert!(result.is_ok()); assert_eq!( result.unwrap(), - vec![Sum::First(ShipmentCommand::Create(CreateShipmentCommand { + vec![Command::ShipmentCreate(CreateShipmentCommand { shipment_id: 1, order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], - }))] + })] ); } diff --git a/tests/saga_manager_test.rs b/tests/saga_manager_test.rs index f46bcce..fc524fb 100644 --- a/tests/saga_manager_test.rs +++ b/tests/saga_manager_test.rs @@ -1,42 +1,33 @@ -use derive_more::Display; use fmodel_rust::saga::Saga; use fmodel_rust::saga_manager::{ActionPublisher, SagaManager}; -use std::error::Error; use crate::api::{CreateShipmentCommand, OrderCreatedEvent, OrderEvent, ShipmentCommand}; +use crate::application::SagaManagerError; mod api; +mod application; fn saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { Saga { react: Box::new(|event| match event { - OrderEvent::Created(created_event) => { + OrderEvent::Created(evt) => { vec![ShipmentCommand::Create(CreateShipmentCommand { - shipment_id: created_event.order_id, - order_id: created_event.order_id, - customer_name: created_event.customer_name.to_owned(), - items: created_event.items.to_owned(), + shipment_id: evt.order_id, + order_id: evt.order_id, + customer_name: evt.customer_name.to_owned(), + items: evt.items.to_owned(), })] } - OrderEvent::Updated(_updated_event) => { + OrderEvent::Updated(_) => { vec![] } - OrderEvent::Cancelled(_cancelled_event) => { + OrderEvent::Cancelled(_) => { vec![] } }), } } -/// Error type for the saga manager -#[derive(Debug, Display)] -#[allow(dead_code)] -enum SagaManagerError { - PublishAction(String), -} - -impl Error for SagaManagerError {} - /// Simple action publisher that just returns the action/command. /// It is used for testing. In real life, it would publish the action/command to some external system. or to an aggregate that is able to handel the action/command. struct SimpleActionPublisher; diff --git a/tests/saga_test.rs b/tests/saga_test.rs index 64e3db4..d96e778 100644 --- a/tests/saga_test.rs +++ b/tests/saga_test.rs @@ -1,28 +1,29 @@ use fmodel_rust::saga::{ActionComputation, Saga}; -use fmodel_rust::Sum; use crate::api::{ CreateShipmentCommand, OrderCommand, OrderCreatedEvent, OrderEvent, ShipmentCommand, - ShipmentEvent, + ShipmentEvent, UpdateOrderCommand, }; +use crate::application::{event_from_sum, sum_to_command, Command, Event}; mod api; +mod application; fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { Saga { react: Box::new(|event| match event { - OrderEvent::Created(created_event) => { + OrderEvent::Created(evt) => { vec![ShipmentCommand::Create(CreateShipmentCommand { - shipment_id: created_event.order_id, - order_id: created_event.order_id, - customer_name: created_event.customer_name.to_owned(), - items: created_event.items.to_owned(), + shipment_id: evt.order_id, + order_id: evt.order_id, + customer_name: evt.customer_name.to_owned(), + items: evt.items.to_owned(), })] } - OrderEvent::Updated(_updated_event) => { + OrderEvent::Updated(_) => { vec![] } - OrderEvent::Cancelled(_cancelled_event) => { + OrderEvent::Cancelled(_) => { vec![] } }), @@ -32,10 +33,10 @@ fn order_saga<'a>() -> Saga<'a, OrderEvent, ShipmentCommand> { fn shipment_saga<'a>() -> Saga<'a, ShipmentEvent, OrderCommand> { Saga { react: Box::new(|event| match event { - ShipmentEvent::Created(created_event) => { - vec![OrderCommand::Update(api::UpdateOrderCommand { - order_id: created_event.order_id, - new_items: created_event.items.to_owned(), + ShipmentEvent::Created(evt) => { + vec![OrderCommand::Update(UpdateOrderCommand { + order_id: evt.order_id, + new_items: evt.items.to_owned(), })] } }), @@ -47,7 +48,10 @@ fn test() { let order_saga: Saga = order_saga(); let order_saga2: Saga = crate::order_saga(); let shipment_saga: Saga = shipment_saga(); - let combined_saga = order_saga2.combine(shipment_saga); + let combined_saga = order_saga2 + .combine(shipment_saga) + .map_action(&sum_to_command) + .map_action_result(&event_from_sum); let order_created_event = OrderEvent::Created(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), @@ -63,16 +67,19 @@ fn test() { items: vec!["Item 1".to_string(), "Item 2".to_string()], })] ); - let combined_commands = combined_saga.compute_new_actions(&Sum::First(order_created_event)); + let order_created_event2 = Event::OrderCreated(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let combined_commands = combined_saga.compute_new_actions(&order_created_event2); assert_eq!( combined_commands, - [Sum::Second(ShipmentCommand::Create( - CreateShipmentCommand { - shipment_id: 1, - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - } - ))] + [Command::ShipmentCreate(CreateShipmentCommand { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + })] ); } diff --git a/tests/view_test.rs b/tests/view_test.rs index ea6bbce..672ab71 100644 --- a/tests/view_test.rs +++ b/tests/view_test.rs @@ -1,41 +1,27 @@ use fmodel_rust::view::{View, ViewStateComputation}; -use fmodel_rust::{Sum::First as Order, Sum::Second as Shipment}; use crate::api::{ - OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentCreatedEvent, - ShipmentEvent, + OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState, + ShipmentCreatedEvent, ShipmentEvent, ShipmentViewState, }; -mod api; - -#[derive(Debug, Clone, PartialEq)] -struct OrderViewState { - order_id: u32, - customer_name: String, - items: Vec, - is_cancelled: bool, -} +use crate::application::{event_from_sum, Event}; -#[derive(Debug, Clone, PartialEq)] -struct ShipmentViewState { - shipment_id: u32, - order_id: u32, - customer_name: String, - items: Vec, -} +mod api; +mod application; fn order_view<'a>() -> View<'a, OrderViewState, OrderEvent> { View { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - OrderEvent::Created(created_event) => { - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + OrderEvent::Created(evt) => { + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } - OrderEvent::Updated(updated_event) => { - new_state.items = updated_event.updated_items.to_owned(); + OrderEvent::Updated(evt) => { + new_state.items = evt.updated_items.to_owned(); } OrderEvent::Cancelled(_) => { new_state.is_cancelled = true; @@ -57,11 +43,11 @@ fn shipment_view<'a>() -> View<'a, ShipmentViewState, ShipmentEvent> { evolve: Box::new(|state, event| { let mut new_state = state.clone(); match event { - ShipmentEvent::Created(created_event) => { - new_state.shipment_id = created_event.shipment_id; - new_state.order_id = created_event.order_id; - new_state.customer_name = created_event.customer_name.to_owned(); - new_state.items = created_event.items.to_owned(); + ShipmentEvent::Created(evt) => { + new_state.shipment_id = evt.shipment_id; + new_state.order_id = evt.order_id; + new_state.customer_name = evt.customer_name.to_owned(); + new_state.items = evt.items.to_owned(); } } new_state @@ -80,19 +66,15 @@ fn test() { let order_view: View = order_view(); let order_view2: View = crate::order_view(); let shipment_view: View = shipment_view(); - let combined_view = order_view2.combine(shipment_view); + let combined_view = order_view2 + .combine(shipment_view) + .map_event(&event_from_sum); let order_created_event = OrderEvent::Created(OrderCreatedEvent { order_id: 1, customer_name: "John Doe".to_string(), items: vec!["Item 1".to_string(), "Item 2".to_string()], }); - let shipment_created_event = ShipmentEvent::Created(ShipmentCreatedEvent { - shipment_id: 1, - order_id: 1, - customer_name: "John Doe".to_string(), - items: vec!["Item 1".to_string(), "Item 2".to_string()], - }); let new_state = order_view.compute_new_state(None, &[&order_created_event]); assert_eq!( @@ -104,7 +86,12 @@ fn test() { is_cancelled: false, } ); - let new_combined_state2 = combined_view.compute_new_state(None, &[&Order(order_created_event)]); + let order_created_event2 = Event::OrderCreated(OrderCreatedEvent { + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let new_combined_state2 = combined_view.compute_new_state(None, &[&order_created_event2]); assert_eq!( new_combined_state2, ( @@ -123,8 +110,13 @@ fn test() { ) ); - let new_combined_state3 = - combined_view.compute_new_state(None, &[&Shipment(shipment_created_event)]); + let shipment_created_event2 = Event::ShipmentCreated(ShipmentCreatedEvent { + shipment_id: 1, + order_id: 1, + customer_name: "John Doe".to_string(), + items: vec!["Item 1".to_string(), "Item 2".to_string()], + }); + let new_combined_state3 = combined_view.compute_new_state(None, &[&shipment_created_event2]); assert_eq!( new_combined_state3, (