diff --git a/Cargo.toml b/Cargo.toml index 648c649..8d448db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ git-version = "0.3.5" hex = "0.4.3" humantime = "2.1.0" log = "0.4.17" +lru = "0.11.0" postcard = { version = "1.0.6", default-features = false } rand = "0.8" scopeguard = "1.1" diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 7f48544..0d393c6 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -185,9 +185,20 @@ where async fn produce( &mut self, - _entity_id: &EntityId, + entity_id: &EntityId, ) -> io::Result> + Send + 'async_trait>>> { - Ok(Box::pin(tokio_stream::empty())) + let records = self.produce_initial().await; + if let Ok(mut records) = records { + Ok(Box::pin(stream!({ + while let Some(record) = records.next().await { + if &record.entity_id == entity_id { + yield record; + } + } + }))) + } else { + Ok(Box::pin(tokio_stream::empty())) + } } async fn process(&mut self, record: Record) -> io::Result> { @@ -437,6 +448,18 @@ mod tests { .unwrap(); assert_eq!(record.entity_id, entity_id); + // Produce to a different entity id, so that we can test out the filtering next. + + adapter + .process(Record::new( + "some-other-entity-id", + MyEvent { + value: "third-event".to_string(), + }, + )) + .await + .unwrap(); + // Wait until the number of records reported as being written is the number // that we have produced. We should then return those events that have been // produced. @@ -446,14 +469,14 @@ mod tests { .offsets("some-topic".to_string(), 0) .await .map(|lo| lo.end_offset); - if last_offset == Some(2) { + if last_offset == Some(3) { break; } time::sleep(Duration::from_millis(100)).await; } { - let mut records = adapter.produce_initial().await.unwrap(); + let mut records = adapter.produce(&entity_id).await.unwrap(); let record = records.next().await.unwrap(); assert_eq!(record.entity_id, entity_id); diff --git a/akka-persistence-rs/Cargo.toml b/akka-persistence-rs/Cargo.toml index c05fbfd..46901c6 100644 --- a/akka-persistence-rs/Cargo.toml +++ b/akka-persistence-rs/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] async-trait = { workspace = true } +lru = { workspace = true } tokio = { workspace = true, features = [ "sync", "macros", diff --git a/akka-persistence-rs/src/effect.rs b/akka-persistence-rs/src/effect.rs index d166c9b..cbf8da5 100644 --- a/akka-persistence-rs/src/effect.rs +++ b/akka-persistence-rs/src/effect.rs @@ -1,9 +1,9 @@ //! Effects that are lazily performed as a result of performing a command //! of an entity. Effects can be chained with other effects. -use std::{collections::HashMap, future::Future, io, marker::PhantomData}; - use async_trait::async_trait; +use lru::LruCache; +use std::{future::Future, io, marker::PhantomData}; use tokio::sync::oneshot; use crate::{entity::EventSourcedBehavior, entity_manager::RecordAdapter, EntityId, Record}; @@ -28,10 +28,10 @@ where &mut self, behavior: &B, adapter: &mut (dyn RecordAdapter + Send), - entities: &mut HashMap, - entity_id: EntityId, + entities: &mut LruCache, + entity_id: &EntityId, prev_result: Result, - update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap, Record) + update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + Send), ) -> Result; } @@ -55,10 +55,10 @@ where &mut self, behavior: &B, adapter: &mut (dyn RecordAdapter + Send), - entities: &mut HashMap, - entity_id: EntityId, + entities: &mut LruCache, + entity_id: &EntityId, prev_result: Result, - update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap, Record) + update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + Send), ) -> Result { let r = self @@ -67,7 +67,7 @@ where behavior, adapter, entities, - entity_id.clone(), + entity_id, prev_result, update_entity, ) @@ -139,16 +139,16 @@ where &mut self, _behavior: &B, adapter: &mut (dyn RecordAdapter + Send), - entities: &mut HashMap, - entity_id: EntityId, + entities: &mut LruCache, + entity_id: &EntityId, prev_result: Result, - update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap, Record) + update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + Send), ) -> Result { if prev_result.is_ok() { if let Some(event) = self.event.take() { let record = Record { - entity_id, + entity_id: entity_id.clone(), event, metadata: crate::RecordMetadata { deletion_event: self.deletion_event, @@ -222,10 +222,10 @@ where &mut self, _behavior: &B, _adapter: &mut (dyn RecordAdapter + Send), - _entities: &mut HashMap, - _entity_id: EntityId, + _entities: &mut LruCache, + _entity_id: &EntityId, prev_result: Result, - _update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap, Record) + _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + Send), ) -> Result { if prev_result.is_ok() { @@ -274,15 +274,15 @@ where &mut self, behavior: &B, _adapter: &mut (dyn RecordAdapter + Send), - entities: &mut HashMap, - entity_id: EntityId, + entities: &mut LruCache, + entity_id: &EntityId, prev_result: Result, - _update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap, Record) + _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + Send), ) -> Result { let f = self.f.take(); if let Some(f) = f { - f(behavior, entities.get(&entity_id), prev_result).await + f(behavior, entities.get(entity_id), prev_result).await } else { Ok(()) } @@ -332,10 +332,10 @@ where &mut self, _behavior: &B, _adapter: &mut (dyn RecordAdapter + Send), - _entities: &mut HashMap, - _entity_id: EntityId, + _entities: &mut LruCache, + _entity_id: &EntityId, _prev_result: Result, - _update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap, Record) + _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + Send), ) -> Result { Ok(()) diff --git a/akka-persistence-rs/src/entity.rs b/akka-persistence-rs/src/entity.rs index 07c88f7..6e69720 100644 --- a/akka-persistence-rs/src/entity.rs +++ b/akka-persistence-rs/src/entity.rs @@ -7,18 +7,21 @@ //! entity, in contrast to commands. Event sourced entities may also process commands that do not change application state such //! as query commands for example. +use async_trait::async_trait; + use crate::effect::Effect; use crate::EntityId; /// A context provides information about the environment that hosts a specific entity. -pub struct Context { +pub struct Context<'a> { /// The entity's unique identifier. - pub entity_id: EntityId, + pub entity_id: &'a EntityId, } /// An entity's behavior is the basic unit of modelling aspects of an Akka-Persistence-based application and /// encapsulates how commands can be applied to state, including the emission of events. Events can /// also be applied to state in order to produce more state. +#[async_trait] pub trait EventSourcedBehavior { /// The state managed by the entity. type State: Default; @@ -42,4 +45,10 @@ pub trait EventSourcedBehavior { /// the next state. No side effects are to be performed. Can be used to replay /// events to attain a new state i.e. the major function of event sourcing. fn on_event(context: &Context, state: &mut Self::State, event: &Self::Event); + + /// The entity will always receive a "recovery completed" signal, even if there + /// are no events sourced, or if it’s a new entity with a previously unused EntityId. + /// Any required side effects should be performed once recovery has completed by + /// overriding this method. + async fn on_recovery_completed(&self, _context: &Context, _state: &Self::State) {} } diff --git a/akka-persistence-rs/src/entity_manager.rs b/akka-persistence-rs/src/entity_manager.rs index 885a729..fad73ce 100644 --- a/akka-persistence-rs/src/entity_manager.rs +++ b/akka-persistence-rs/src/entity_manager.rs @@ -6,9 +6,11 @@ //! The entities will recover their state from a stream of events. use async_trait::async_trait; +use lru::LruCache; use std::io; +use std::marker::PhantomData; +use std::num::NonZeroUsize; use std::pin::Pin; -use std::{collections::HashMap, marker::PhantomData}; use tokio::sync::mpsc::Receiver; use tokio::task::{JoinError, JoinHandle}; use tokio_stream::{Stream, StreamExt}; @@ -41,10 +43,6 @@ pub trait RecordAdapter { async fn process(&mut self, record: Record) -> io::Result>; } -/// The default amount of state (instances of an entity) that we -/// retain at any one time. -pub const DEFAULT_ACTIVE_STATE: usize = 1; - /// Manages the lifecycle of entities given a specific behavior. /// Entity managers are established given a source of events associated /// with an entity type. That source is consumed by subsequently telling @@ -70,21 +68,19 @@ where self.join_handle.await } - fn update_entity(entities: &mut HashMap, record: Record) + fn update_entity(entities: &mut LruCache, record: Record) where B::State: Default, { if !record.metadata.deletion_event { // Apply an event to state, creating the entity entry if necessary. let context = Context { - entity_id: record.entity_id.clone(), + entity_id: &record.entity_id, }; - let state = entities - .entry(record.entity_id) - .or_insert_with(B::State::default); + let state = entities.get_or_insert_mut(record.entity_id.clone(), B::State::default); B::on_event(&context, state, &record.event); } else { - entities.remove(&record.entity_id); + entities.pop(&record.entity_id); } } } @@ -104,7 +100,13 @@ where B::State: Send + Sync, A: RecordAdapter + Send + 'static, { - Self::with_capacity(behavior, adapter, receiver, DEFAULT_ACTIVE_STATE) + const DEFAULT_ACTIVE_STATE: usize = 10; + Self::with_capacity( + behavior, + adapter, + receiver, + NonZeroUsize::new(DEFAULT_ACTIVE_STATE).unwrap(), + ) } /// Establish a new entity manager with capacity for a number of instances @@ -119,7 +121,7 @@ where behavior: B, mut adapter: A, mut receiver: Receiver>, - capacity: usize, + capacity: NonZeroUsize, ) -> Self where B::Command: Send, @@ -129,16 +131,19 @@ where let join_handle = tokio::spawn(async move { // Source our initial events and populate our internal entities map. - let mut entities = HashMap::with_capacity(capacity); + let mut entities = LruCache::new(capacity); if let Ok(records) = adapter.produce_initial().await { tokio::pin!(records); while let Some(record) = records.next().await { Self::update_entity(&mut entities, record); } + for (entity_id, state) in entities.iter() { + let context = Context { entity_id }; + behavior.on_recovery_completed(&context, state).await; + } } else { // A problem sourcing initial events is regarded as fatal. - return; } @@ -156,6 +161,12 @@ where Self::update_entity(&mut entities, record); } state = entities.get(&message.entity_id); + let context = Context { + entity_id: &message.entity_id, + }; + behavior + .on_recovery_completed(&context, state.unwrap_or(&B::State::default())) + .await; } else { continue; } @@ -165,7 +176,7 @@ where // Effects may emit events that will update state on success. let context = Context { - entity_id: message.entity_id, + entity_id: &message.entity_id, }; let mut effect = B::for_command( &context, @@ -177,13 +188,13 @@ where &behavior, &mut adapter, &mut entities, - context.entity_id.clone(), + context.entity_id, Ok(()), &mut |entities, record| Self::update_entity(entities, record), ) .await; if result.is_err() { - entities.remove(&context.entity_id); + entities.pop(context.entity_id); } } }); @@ -236,9 +247,12 @@ mod tests { } struct TempSensorBehavior { + recovered_1: Arc, + recovered_2: Arc, updated: Arc, } + #[async_trait] impl EventSourcedBehavior for TempSensorBehavior { type State = TempState; @@ -291,6 +305,15 @@ mod tests { TempEvent::TemperatureUpdated { temp } => state.temp = *temp, } } + + async fn on_recovery_completed(&self, context: &Context, state: &Self::State) { + if context.entity_id == "id-1" { + self.recovered_1.notify_one(); + } else { + self.recovered_2.notify_one(); + }; + println!("Recovered {} with {}!", context.entity_id, state.temp); + } } // The following adapter is not normally created by a developer, but we @@ -343,9 +366,13 @@ mod tests { async fn new_manager_with_one_update_and_a_message_reply() { // Set up the behavior and entity manager. + let temp_sensor_recovered_id_1 = Arc::new(Notify::new()); + let temp_sensor_recovered_id_2 = Arc::new(Notify::new()); let temp_sensor_updated = Arc::new(Notify::new()); let temp_sensor_behavior = TempSensorBehavior { + recovered_1: temp_sensor_recovered_id_1.clone(), + recovered_2: temp_sensor_recovered_id_2.clone(), updated: temp_sensor_updated.clone(), }; @@ -360,10 +387,11 @@ mod tests { let (temp_sensor, temp_sensor_receiver) = mpsc::channel(10); - EntityManager::new( + EntityManager::with_capacity( temp_sensor_behavior, temp_sensor_record_adapter, temp_sensor_receiver, + NonZeroUsize::new(1).unwrap(), ); // Send a command to update the temperature and wait until it is done. We then wait @@ -379,6 +407,7 @@ mod tests { .await .is_ok()); + temp_sensor_recovered_id_1.notified().await; temp_sensor_updated.notified().await; let (reply_to, reply) = oneshot::channel(); @@ -402,6 +431,8 @@ mod tests { .await .is_ok()); + temp_sensor_updated.notified().await; + // Delete the entity assert!(temp_sensor @@ -409,13 +440,31 @@ mod tests { .await .is_ok()); - // Create another entity + // Create another entity. This should cause cache eviction as the cache is + // size for a capacity of 1 when we created the entity manager. assert!(temp_sensor .send(Message::new("id-2", TempCommand::Register,)) .await .is_ok()); + temp_sensor_recovered_id_2.notified().await; + + // We test eviction by querying for id-1 again. This should + // fail as we have an empty produce method in our adapter. + + let (reply_to, reply) = oneshot::channel(); + assert!(temp_sensor + .send(Message::new( + "id-1", + TempCommand::GetTemperature { reply_to } + )) + .await + .is_ok()); + assert!(reply.await.is_err()); + + temp_sensor_recovered_id_1.notified().await; + // Drop our command sender so that the entity manager stops. drop(temp_sensor);