Skip to content

Commit

Permalink
Overhaul the offset store and improve the DX (#76)
Browse files Browse the repository at this point in the history
Rationalises the offset store down to just one from two we had. Another change will follow where we move the offset store concerns out of the source provider.

* Consumer test
* and perform seq_nr validation only when we have timestamp offsets.
* Additional privacy
* Offset store tests
* Tidy up some doc
  • Loading branch information
huntc authored Oct 31, 2023
1 parent 7e8de7d commit 8cffbd1
Show file tree
Hide file tree
Showing 21 changed files with 1,608 additions and 1,086 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ members = [
"akka-projection-rs",
"akka-projection-rs-commitlog",
"akka-projection-rs-grpc",
"akka-projection-rs-storage",
]

[workspace.package]
Expand Down
21 changes: 12 additions & 9 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use akka_persistence_rs::{
entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset,
WithPersistenceId, WithSeqNr, WithTags, WithTimestampOffset,
EntityId, EntityType, Offset, PersistenceId, Source, Tag, WithOffset, WithPersistenceId,
WithSeqNr, WithSource, WithTags, WithTimestamp,
};
use async_stream::stream;
use async_trait::async_trait;
Expand Down Expand Up @@ -51,18 +51,21 @@ impl<E> WithSeqNr for EventEnvelope<E> {
}
}

impl<E> WithSource for EventEnvelope<E> {
fn source(&self) -> akka_persistence_rs::Source {
Source::Regular
}
}

impl<E> WithTags for EventEnvelope<E> {
fn tags(&self) -> &[akka_persistence_rs::Tag] {
fn tags(&self) -> &[Tag] {
&self.tags
}
}

impl<E> WithTimestampOffset for EventEnvelope<E> {
fn timestamp_offset(&self) -> TimestampOffset {
TimestampOffset {
timestamp: self.timestamp,
seen: vec![],
}
impl<E> WithTimestamp for EventEnvelope<E> {
fn timestamp(&self) -> &DateTime<Utc> {
&self.timestamp
}
}

Expand Down
81 changes: 15 additions & 66 deletions akka-persistence-rs/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@

use async_trait::async_trait;
use chrono::Utc;
use lru::LruCache;
use std::{future::Future, io, marker::PhantomData};
use tokio::sync::oneshot;

use crate::{
entity::EventSourcedBehavior,
entity_manager::{EntityStatus, EventEnvelope, Handler},
entity_manager::{EntityOps, EventEnvelope, Handler},
EntityId,
};

Expand All @@ -35,19 +34,14 @@ where
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut LruCache<EntityId, EntityStatus<B::State>>,
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
update_entity: &mut (dyn for<'a> FnMut(
&'a mut LruCache<EntityId, EntityStatus<B::State>>,
EventEnvelope<B::Event>,
) -> u64
+ Send),
) -> Result;
}

/// An effect to chain one effect with another.
/// The return type of [EffectExt::and].
pub struct And<E, L, R> {
_l: L,
_r: R,
Expand All @@ -66,15 +60,10 @@ where
&mut self,
behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut LruCache<EntityId, EntityStatus<B::State>>,
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
update_entity: &mut (dyn for<'a> FnMut(
&'a mut LruCache<EntityId, EntityStatus<B::State>>,
EventEnvelope<B::Event>,
) -> u64
+ Send),
) -> Result {
let r = self
._l
Expand All @@ -85,20 +74,11 @@ where
entity_id,
last_seq_nr,
prev_result,
update_entity,
)
.await;
if r.is_ok() {
self._r
.process(
behavior,
handler,
entities,
entity_id,
last_seq_nr,
r,
update_entity,
)
.process(behavior, handler, entities, entity_id, last_seq_nr, r)
.await
} else {
r
Expand Down Expand Up @@ -142,8 +122,7 @@ where
}
}

// EmitEvent

/// The return type of [emit_event] and [emit_deletion_event].
pub struct EmitEvent<B>
where
B: EventSourcedBehavior + Send + Sync + 'static,
Expand All @@ -163,15 +142,10 @@ where
&mut self,
_behavior: &B,
handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut LruCache<EntityId, EntityStatus<B::State>>,
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
last_seq_nr: &mut u64,
prev_result: Result,
update_entity: &mut (dyn for<'a> FnMut(
&'a mut LruCache<EntityId, EntityStatus<B::State>>,
EventEnvelope<B::Event>,
) -> u64
+ Send),
) -> Result {
if prev_result.is_ok() {
if let Some(event) = self.event.take() {
Expand All @@ -185,7 +159,7 @@ where
};
let result = handler.process(envelope).await.map_err(Error::IoError);
if let Ok(envelope) = result {
*last_seq_nr = update_entity(entities, envelope);
*last_seq_nr = entities.update(envelope);
Ok(())
} else {
result.map(|_| ())
Expand Down Expand Up @@ -234,8 +208,7 @@ where
}
}

// Reply

/// The return type of [reply].
pub struct Reply<B, T> {
replier: Option<(oneshot::Sender<T>, T)>,
phantom: PhantomData<B>,
Expand All @@ -251,15 +224,10 @@ where
&mut self,
_behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
_entities: &mut LruCache<EntityId, EntityStatus<B::State>>,
_entities: &mut (dyn EntityOps<B> + Send + Sync),
_entity_id: &EntityId,
_last_seq_nr: &mut u64,
prev_result: Result,
_update_entity: &mut (dyn for<'a> FnMut(
&'a mut LruCache<EntityId, EntityStatus<B::State>>,
EventEnvelope<B::Event>,
) -> u64
+ Send),
) -> Result {
if prev_result.is_ok() {
if let Some((reply_to, reply)) = self.replier.take() {
Expand Down Expand Up @@ -288,8 +256,7 @@ pub fn reply<B, T>(reply_to: oneshot::Sender<T>, reply: T) -> Reply<B, T> {
}
}

// Then

/// The return type of [then].
pub struct Then<B, F, R> {
f: Option<F>,
phantom: PhantomData<(B, R)>,
Expand All @@ -307,26 +274,14 @@ where
&mut self,
behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
entities: &mut LruCache<EntityId, EntityStatus<B::State>>,
entities: &mut (dyn EntityOps<B> + Send + Sync),
entity_id: &EntityId,
_last_seq_nr: &mut u64,
prev_result: Result,
_update_entity: &mut (dyn for<'a> FnMut(
&'a mut LruCache<EntityId, EntityStatus<B::State>>,
EventEnvelope<B::Event>,
) -> u64
+ Send),
) -> Result {
let f = self.f.take();
if let Some(f) = f {
f(
behavior,
entities
.get(entity_id)
.map(|entity_status| &entity_status.state),
prev_result,
)
.await
f(behavior, entities.get(entity_id), prev_result).await
} else {
Ok(())
}
Expand Down Expand Up @@ -361,8 +316,7 @@ where
}
}

// Unhandled

/// The return type of [unhandled].
pub struct Unhandled<E> {
phantom: PhantomData<E>,
}
Expand All @@ -376,15 +330,10 @@ where
&mut self,
_behavior: &B,
_handler: &mut (dyn Handler<B::Event> + Send),
_entities: &mut LruCache<EntityId, EntityStatus<B::State>>,
_entities: &mut (dyn EntityOps<B> + Send + Sync),
_entity_id: &EntityId,
_last_seq_nr: &mut u64,
_prev_result: Result,
_update_entity: &mut (dyn for<'a> FnMut(
&'a mut LruCache<EntityId, EntityStatus<B::State>>,
EventEnvelope<B::Event>,
) -> u64
+ Send),
) -> Result {
Ok(())
}
Expand Down
Loading

0 comments on commit 8cffbd1

Please sign in to comment.