Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LRU cache for the entity manager #4

Merged
merged 4 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
log = "0.4.17"
lru = "0.11.0"
postcard = { version = "1.0.6", default-features = false }
rand = "0.8"
scopeguard = "1.1"
Expand Down
31 changes: 27 additions & 4 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,20 @@ where

async fn produce(
&mut self,
_entity_id: &EntityId,
entity_id: &EntityId,
) -> io::Result<Pin<Box<dyn Stream<Item = Record<E>> + Send + 'async_trait>>> {
Ok(Box::pin(tokio_stream::empty()))
let records = self.produce_initial().await;
if let Ok(mut records) = records {
Ok(Box::pin(stream!({
while let Some(record) = records.next().await {
if &record.entity_id == entity_id {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An optimization could be if we could read the record entity_id without deserializing the payload. Then it would be more efficient to scan for a single entity_id.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah - that's a good idea. I'll do that as a separate PR.

yield record;
}
}
})))
} else {
Ok(Box::pin(tokio_stream::empty()))
}
}

async fn process(&mut self, record: Record<E>) -> io::Result<Record<E>> {
Expand Down Expand Up @@ -437,6 +448,18 @@ mod tests {
.unwrap();
assert_eq!(record.entity_id, entity_id);

// Produce to a different entity id, so that we can test out the filtering next.

adapter
.process(Record::new(
"some-other-entity-id",
MyEvent {
value: "third-event".to_string(),
},
))
.await
.unwrap();

// Wait until the number of records reported as being written is the number
// that we have produced. We should then return those events that have been
// produced.
Expand All @@ -446,14 +469,14 @@ mod tests {
.offsets("some-topic".to_string(), 0)
.await
.map(|lo| lo.end_offset);
if last_offset == Some(2) {
if last_offset == Some(3) {
break;
}
time::sleep(Duration::from_millis(100)).await;
}

{
let mut records = adapter.produce_initial().await.unwrap();
let mut records = adapter.produce(&entity_id).await.unwrap();

let record = records.next().await.unwrap();
assert_eq!(record.entity_id, entity_id);
Expand Down
1 change: 1 addition & 0 deletions akka-persistence-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"

[dependencies]
async-trait = { workspace = true }
lru = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"macros",
Expand Down
46 changes: 23 additions & 23 deletions akka-persistence-rs/src/effect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Effects that are lazily performed as a result of performing a command
//! of an entity. Effects can be chained with other effects.

use std::{collections::HashMap, future::Future, io, marker::PhantomData};

use async_trait::async_trait;
use lru::LruCache;
use std::{future::Future, io, marker::PhantomData};
use tokio::sync::oneshot;

use crate::{entity::EventSourcedBehavior, entity_manager::RecordAdapter, EntityId, Record};
Expand All @@ -28,10 +28,10 @@ where
&mut self,
behavior: &B,
adapter: &mut (dyn RecordAdapter<B::Event> + Send),
entities: &mut HashMap<EntityId, B::State>,
entity_id: EntityId,
entities: &mut LruCache<EntityId, B::State>,
entity_id: &EntityId,
prev_result: Result,
update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap<EntityId, B::State>, Record<B::Event>)
update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache<EntityId, B::State>, Record<B::Event>)
+ Send),
) -> Result;
}
Expand All @@ -55,10 +55,10 @@ where
&mut self,
behavior: &B,
adapter: &mut (dyn RecordAdapter<B::Event> + Send),
entities: &mut HashMap<EntityId, B::State>,
entity_id: EntityId,
entities: &mut LruCache<EntityId, B::State>,
entity_id: &EntityId,
prev_result: Result,
update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap<EntityId, B::State>, Record<B::Event>)
update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache<EntityId, B::State>, Record<B::Event>)
+ Send),
) -> Result {
let r = self
Expand All @@ -67,7 +67,7 @@ where
behavior,
adapter,
entities,
entity_id.clone(),
entity_id,
prev_result,
update_entity,
)
Expand Down Expand Up @@ -139,16 +139,16 @@ where
&mut self,
_behavior: &B,
adapter: &mut (dyn RecordAdapter<B::Event> + Send),
entities: &mut HashMap<EntityId, B::State>,
entity_id: EntityId,
entities: &mut LruCache<EntityId, B::State>,
entity_id: &EntityId,
prev_result: Result,
update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap<EntityId, B::State>, Record<B::Event>)
update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache<EntityId, B::State>, Record<B::Event>)
+ Send),
) -> Result {
if prev_result.is_ok() {
if let Some(event) = self.event.take() {
let record = Record {
entity_id,
entity_id: entity_id.clone(),
event,
metadata: crate::RecordMetadata {
deletion_event: self.deletion_event,
Expand Down Expand Up @@ -222,10 +222,10 @@ where
&mut self,
_behavior: &B,
_adapter: &mut (dyn RecordAdapter<B::Event> + Send),
_entities: &mut HashMap<EntityId, B::State>,
_entity_id: EntityId,
_entities: &mut LruCache<EntityId, B::State>,
_entity_id: &EntityId,
prev_result: Result,
_update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap<EntityId, B::State>, Record<B::Event>)
_update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache<EntityId, B::State>, Record<B::Event>)
+ Send),
) -> Result {
if prev_result.is_ok() {
Expand Down Expand Up @@ -274,15 +274,15 @@ where
&mut self,
behavior: &B,
_adapter: &mut (dyn RecordAdapter<B::Event> + Send),
entities: &mut HashMap<EntityId, B::State>,
entity_id: EntityId,
entities: &mut LruCache<EntityId, B::State>,
entity_id: &EntityId,
prev_result: Result,
_update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap<EntityId, B::State>, Record<B::Event>)
_update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache<EntityId, B::State>, Record<B::Event>)
+ Send),
) -> Result {
let f = self.f.take();
if let Some(f) = f {
f(behavior, entities.get(&entity_id), prev_result).await
f(behavior, entities.get(entity_id), prev_result).await
} else {
Ok(())
}
Expand Down Expand Up @@ -332,10 +332,10 @@ where
&mut self,
_behavior: &B,
_adapter: &mut (dyn RecordAdapter<B::Event> + Send),
_entities: &mut HashMap<EntityId, B::State>,
_entity_id: EntityId,
_entities: &mut LruCache<EntityId, B::State>,
_entity_id: &EntityId,
_prev_result: Result,
_update_entity: &mut (dyn for<'a> FnMut(&'a mut HashMap<EntityId, B::State>, Record<B::Event>)
_update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache<EntityId, B::State>, Record<B::Event>)
+ Send),
) -> Result {
Ok(())
Expand Down
13 changes: 11 additions & 2 deletions akka-persistence-rs/src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@
//! entity, in contrast to commands. Event sourced entities may also process commands that do not change application state such
//! as query commands for example.

use async_trait::async_trait;

use crate::effect::Effect;
use crate::EntityId;

/// A context provides information about the environment that hosts a specific entity.
pub struct Context {
pub struct Context<'a> {
/// The entity's unique identifier.
pub entity_id: EntityId,
pub entity_id: &'a EntityId,
}

/// An entity's behavior is the basic unit of modelling aspects of an Akka-Persistence-based application and
/// encapsulates how commands can be applied to state, including the emission of events. Events can
/// also be applied to state in order to produce more state.
#[async_trait]
pub trait EventSourcedBehavior {
/// The state managed by the entity.
type State: Default;
Expand All @@ -42,4 +45,10 @@ pub trait EventSourcedBehavior {
/// the next state. No side effects are to be performed. Can be used to replay
/// events to attain a new state i.e. the major function of event sourcing.
fn on_event(context: &Context, state: &mut Self::State, event: &Self::Event);

/// The entity will always receive a "recovery completed" signal, even if there
/// are no events sourced, or if it’s a new entity with a previously unused EntityId.
/// Any required side effects should be performed once recovery has completed by
/// overriding this method.
async fn on_recovery_completed(&self, _context: &Context, _state: &Self::State) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
Loading