From c91c93a04d034de6712156a2d0faa45c3a4b56c0 Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 11 Nov 2023 10:41:03 +1100 Subject: [PATCH] Refactor "emit" to "persist" Refactors "emit" as "persist" and attempts to convey the intention of associated operations that persistence is required. As a consequence, the commit log marshaller is refactored to remove the optionality of some return types to support that all events must be persisted. Along the way, I also refactored some options into a result with increased error reporting and logging. --- akka-persistence-rs-commitlog/Cargo.toml | 1 + akka-persistence-rs-commitlog/src/lib.rs | 191 ++++++++++++------ akka-persistence-rs/benches/benches.rs | 4 +- akka-persistence-rs/src/effect.rs | 57 +++--- akka-persistence-rs/src/entity.rs | 4 +- akka-persistence-rs/src/entity_manager.rs | 17 +- akka-projection-rs-commitlog/src/lib.rs | 24 ++- .../src/offset_store.rs | 39 ++-- 8 files changed, 210 insertions(+), 127 deletions(-) diff --git a/akka-persistence-rs-commitlog/Cargo.toml b/akka-persistence-rs-commitlog/Cargo.toml index 80df8ad..44271f9 100644 --- a/akka-persistence-rs-commitlog/Cargo.toml +++ b/akka-persistence-rs-commitlog/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } chrono = { workspace = true } ciborium = { workspace = true, optional = true } itoa = { workspace = true } +log = { workspace = true } rand = { workspace = true } serde = { workspace = true } smol_str = { workspace = true } diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index ac89956..164aeba 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -8,6 +8,7 @@ use akka_persistence_rs::{ use async_stream::stream; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use log::warn; use serde::{de::DeserializeOwned, Serialize}; use std::{io, marker::PhantomData, pin::Pin, sync::Arc}; use streambed::{ @@ -69,6 +70,44 @@ impl WithTimestamp for EventEnvelope { } } +/// This describes an error when there has been some run-time issue in attempting to consume records. +#[derive(Debug)] +pub struct CannotConsume { + _entity_id: EntityId, + _cause: String, +} + +impl CannotConsume { + pub fn new(entity_id: EntityId, cause: S) -> Self + where + S: ToString, + { + Self { + _entity_id: entity_id, + _cause: cause.to_string(), + } + } +} + +/// This describes an error when there has been some run-time issue in attempting to produce records. +#[derive(Debug)] +pub struct CannotProduce { + _entity_id: EntityId, + _cause: String, +} + +impl CannotProduce { + pub fn new(entity_id: EntityId, cause: S) -> Self + where + S: ToString, + { + Self { + _entity_id: entity_id, + _cause: cause.to_string(), + } + } +} + /// Provides the ability to transform the the memory representation of Akka Persistence events from /// and to the records that a CommitLog expects. #[async_trait] @@ -82,17 +121,20 @@ where /// Provide a key we can use for the purposes of log compaction. /// A key would generally comprise and event type value held in /// the high bits, and the entity id in the lower bits. - fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option; + fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key; /// Extract an entity id from a consumer envelope. fn to_entity_id(&self, record: &ConsumerRecord) -> Option; /// Produce an event envelope from a consumer record. + /// Note that this may not always be possible due to record formats having + /// changed, in which case we want the consumer to continue and skip it. + /// Changes in a record's layout should not prevent the system from working. async fn envelope( &self, entity_id: EntityId, record: ConsumerRecord, - ) -> Option>; + ) -> Result, CannotConsume>; /// Produce a producer record from an event and its entity info. async fn producer_record( @@ -102,7 +144,7 @@ where seq_nr: u64, timestamp: DateTime, event: &E, - ) -> Option; + ) -> Result; } /// Provides the ability to transform the the memory representation of Akka Persistence events from @@ -127,18 +169,26 @@ where &self, entity_id: EntityId, mut record: ConsumerRecord, - ) -> Option> { + ) -> Result, CannotConsume> { use streambed::commit_log::{Header, HeaderKey}; - streambed::decrypt_buf( + let secret_path = self.secret_path(&entity_id); + let event = streambed::decrypt_buf( self.secret_store(), - &self.secret_path(&entity_id), + &secret_path, &mut record.value, |value| ciborium::de::from_reader(value), ) .await - .and_then(|event| { - let seq_nr = record.headers.iter().find_map(|Header { key, value }| { + .ok_or(CannotConsume::new( + entity_id.clone(), + format!("Cannot decrypt with key: {secret_path}"), + ))?; + + let seq_nr = record + .headers + .iter() + .find_map(|Header { key, value }| { if key == &HeaderKey::from("seq_nr") { if value.len() >= 8 { if let Ok(value) = value[0..8].try_into() { @@ -152,18 +202,22 @@ where } else { None } - }); - seq_nr.and_then(|seq_nr| { - record.timestamp.map(|timestamp| EventEnvelope { - persistence_id: PersistenceId::new(self.entity_type(), entity_id), - seq_nr, - timestamp, - event, - offset: record.offset, - tags: vec![], - }) }) - }) + .ok_or(CannotConsume::new( + entity_id.clone(), + "Cannot find seq_nr header", + ))?; + + let envelope = record.timestamp.map(|timestamp| EventEnvelope { + persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()), + seq_nr, + timestamp, + event, + offset: record.offset, + tags: vec![], + }); + + envelope.ok_or(CannotConsume::new(entity_id, "No timestamp")) } #[cfg(not(feature = "cbor"))] @@ -181,13 +235,15 @@ where seq_nr: u64, timestamp: DateTime, event: &E, - ) -> Option { + ) -> Result { use streambed::commit_log::{Header, HeaderKey}; - let key = self.to_compaction_key(&entity_id, event)?; + // Being unable to derive a compaction key is a non-recoverable error. + let key = self.to_compaction_key(&entity_id, event); + let secret_path = self.secret_path(&entity_id); let buf = streambed::encrypt_struct( self.secret_store(), - &self.secret_path(&entity_id), + &secret_path, |event| { let mut buf = Vec::new(); ciborium::ser::into_writer(event, &mut buf).map(|_| buf) @@ -195,8 +251,13 @@ where rand::thread_rng, &event, ) - .await?; - Some(ProducerRecord { + .await + .ok_or(CannotProduce::new( + entity_id, + format!("Problem encrypting and serializing with secret path: {secret_path}"), + ))?; + + Ok(ProducerRecord { topic, headers: vec![Header { key: HeaderKey::from("seq_nr"), @@ -287,15 +348,18 @@ where Ok(Box::pin(stream!({ while let Some(consumer_record) = consumer_records.next().await { if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) { - if let Some(envelope) = - marshaller.envelope(record_entity_id, consumer_record).await - { - yield EntityManagerEventEnvelope::new( - envelope.persistence_id.entity_id, - envelope.seq_nr, - envelope.timestamp, - envelope.event, - ); + match marshaller.envelope(record_entity_id, consumer_record).await { + Ok(envelope) => { + yield EntityManagerEventEnvelope::new( + envelope.persistence_id.entity_id, + envelope.seq_nr, + envelope.timestamp, + envelope.event, + ); + } + Err(e) => { + warn!("When initially consuming: {e:?}"); + } } } } @@ -324,15 +388,18 @@ where while let Some(consumer_record) = consumer_records.next().await { if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) { if &record_entity_id == entity_id { - if let Some(envelope) = - marshaller.envelope(record_entity_id, consumer_record).await - { - yield EntityManagerEventEnvelope::new( - envelope.persistence_id.entity_id, - envelope.seq_nr, - envelope.timestamp, - envelope.event, - ); + match marshaller.envelope(record_entity_id, consumer_record).await { + Ok(envelope) => { + yield EntityManagerEventEnvelope::new( + envelope.persistence_id.entity_id, + envelope.seq_nr, + envelope.timestamp, + envelope.event, + ); + } + Err(e) => { + warn!("When consuming: {e:?}"); + } } } } @@ -398,7 +465,7 @@ where &envelope.event, ) .await - .ok_or_else(|| { + .map_err(|_| { io::Error::new( io::ErrorKind::Other, "A problem occurred converting a envelope when producing", @@ -444,19 +511,18 @@ pub mod cbor { impl CommitLogMarshaller for Marshaller where for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, - F: Fn(&E) -> Option + Sync, + F: Fn(&E) -> u32 + Sync, SS: SecretStore, { fn entity_type(&self) -> EntityType { self.entity_type.clone() } - fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option { + fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key { let record_type = (self.to_record_type)(event); - record_type.and_then(|record_type| { - let entity_id = entity_id.parse::().ok()?; - Some((record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64) - }) + // It is an unrecoverable error if the entity id is non-numeric. + let entity_id = entity_id.parse::().unwrap(); + (record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64 } fn to_entity_id(&self, record: &ConsumerRecord) -> Option { @@ -469,7 +535,7 @@ pub mod cbor { &self, entity_id: EntityId, record: ConsumerRecord, - ) -> Option> { + ) -> Result, CannotConsume> { self.decrypted_envelope(entity_id, record).await } @@ -480,7 +546,7 @@ pub mod cbor { seq_nr: u64, timestamp: DateTime, event: &E, - ) -> Option { + ) -> Result { self.encrypted_producer_record(topic, entity_id, seq_nr, timestamp, event) .await } @@ -490,7 +556,7 @@ pub mod cbor { impl EncryptedCommitLogMarshaller for Marshaller where for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, - F: Fn(&E) -> Option + Sync, + F: Fn(&E) -> u32 + Sync, SS: SecretStore, { type SecretStore = SS; @@ -516,7 +582,7 @@ pub mod cbor { ) -> Marshaller where for<'a> E: DeserializeOwned + Serialize + Send + Sync + 'a, - F: Fn(&E) -> Option + Sync, + F: Fn(&E) -> u32 + Sync, SS: SecretStore, S: ToString, { @@ -593,7 +659,7 @@ mod tests { EntityType::from("some-entity-type") } - fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option { + fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Key { panic!("should not be called") } @@ -609,17 +675,20 @@ mod tests { &self, entity_id: EntityId, record: ConsumerRecord, - ) -> Option> { - let value = String::from_utf8(record.value).ok()?; + ) -> Result, CannotConsume> { + let value = String::from_utf8(record.value) + .ok() + .ok_or(CannotConsume::new(entity_id.clone(), "bad entity id"))?; let event = MyEvent { value }; - record.timestamp.map(|timestamp| EventEnvelope { - persistence_id: PersistenceId::new(self.entity_type(), entity_id), + let envelope = record.timestamp.map(|timestamp| EventEnvelope { + persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()), seq_nr: 1, timestamp, event, offset: 0, tags: vec![], - }) + }); + envelope.ok_or(CannotConsume::new(entity_id, "No timestamp")) } async fn producer_record( @@ -629,12 +698,12 @@ mod tests { _seq_nr: u64, timestamp: DateTime, event: &MyEvent, - ) -> Option { + ) -> Result { let headers = vec![Header { key: HeaderKey::from("entity-id"), value: entity_id.as_bytes().into(), }]; - Some(ProducerRecord { + Ok(ProducerRecord { topic, headers, timestamp: Some(timestamp), diff --git a/akka-persistence-rs/benches/benches.rs b/akka-persistence-rs/benches/benches.rs index c1921cf..ce4d972 100644 --- a/akka-persistence-rs/benches/benches.rs +++ b/akka-persistence-rs/benches/benches.rs @@ -1,7 +1,7 @@ use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc}; use akka_persistence_rs::{ - effect::{emit_event, Effect, EffectExt}, + effect::{persist_event, Effect, EffectExt}, entity::{Context, EventSourcedBehavior}, entity_manager::{self, EventEnvelope, Handler, SourceProvider}, EntityId, Message, @@ -32,7 +32,7 @@ impl EventSourcedBehavior for Behavior { _state: &Self::State, _command: Self::Command, ) -> Box> { - emit_event(Event).boxed() + persist_event(Event).boxed() } fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {} diff --git a/akka-persistence-rs/src/effect.rs b/akka-persistence-rs/src/effect.rs index 8c06749..8e3836f 100644 --- a/akka-persistence-rs/src/effect.rs +++ b/akka-persistence-rs/src/effect.rs @@ -125,7 +125,7 @@ where /// Perform a side effect to run a function asynchronously after this current one. /// The associated behavior is available so that communication channels, for /// example, can be accessed by the side-effect. Additionally, the - /// latest state given any previous effect having emitted an event, + /// latest state given any previous effect having persisted an event, /// or else the state at the outset of the effects being applied, /// is also available. fn and_then(self, f: F) -> And> @@ -145,19 +145,19 @@ where } } - /// An effect to emit an event. The latest state given any previous effect - /// having emitted an event, or else the state at the outset of the effects + /// An effect to persist an event. The latest state given any previous effect + /// having persisted an event, or else the state at the outset of the effects /// being applied, is also available. /// /// Only applied when the previous result succeeded. #[allow(clippy::type_complexity)] - fn then_emit_event( + fn then_persist_event( self, f: F, ) -> And< B, Self, - ThenEmitEvent< + ThenPersistEvent< B, Box< dyn FnOnce( @@ -181,7 +181,7 @@ where }); And { _l: self, - _r: ThenEmitEvent { + _r: ThenPersistEvent { deletion_event: false, f: Some(f), phantom: PhantomData, @@ -191,7 +191,7 @@ where } /// An effect to reply an envelope. The latest state given any previous effect - /// having emitted an event, or else the state at the outset of the effects + /// having persisted an event, or else the state at the outset of the effects /// being applied, is also available. /// /// Only applied when the previous result succeeded. @@ -238,8 +238,8 @@ where } } -/// The return type of [emit_event] and [emit_deletion_event]. -pub struct EmitEvent +/// The return type of [persist_event] and [persist_deletion_event]. +pub struct PersistEvent where B: EventSourcedBehavior, { @@ -248,7 +248,7 @@ where } #[async_trait] -impl Effect for EmitEvent +impl Effect for PersistEvent where B: EventSourcedBehavior + Send + Sync + 'static, B::State: Send, @@ -289,7 +289,7 @@ where } } -impl EffectExt for EmitEvent +impl EffectExt for PersistEvent where B: EventSourcedBehavior + Send + Sync + 'static, B::State: Send, @@ -297,26 +297,25 @@ where { } -/// An effect to emit an event upon having successfully handed it off to -/// be persisted. -pub fn emit_event(event: B::Event) -> EmitEvent +/// An effect to persist an event. +pub fn persist_event(event: B::Event) -> PersistEvent where B: EventSourcedBehavior + Send + Sync + 'static, { - EmitEvent { + PersistEvent { deletion_event: false, event: Some(event), } } -/// An effect to emit an event upon having successfully handed it off to +/// An effect to persist an event upon having successfully handed it off to /// be persisted. The event will be flagged to represent the deletion of /// an entity instance. -pub fn emit_deletion_event(event: B::Event) -> EmitEvent +pub fn persist_deletion_event(event: B::Event) -> PersistEvent where B: EventSourcedBehavior + Send + Sync + 'static, { - EmitEvent { + PersistEvent { deletion_event: true, event: Some(event), } @@ -368,7 +367,7 @@ where } /// An effect to reply an envelope if the previous effect completed -/// successfully. Note that if state from having emitted an event +/// successfully. Note that if state from having persisted an event /// via a prior effect is required, then use a [then] effect instead. pub fn reply(reply_to: oneshot::Sender, reply: T) -> Reply { Reply { @@ -421,7 +420,7 @@ where /// A side effect to run a function asynchronously. The associated /// behavior is available so that communication channels, for /// example, can be accessed by the side-effect. Additionally, the -/// latest state given any previous effect having emitted an event, +/// latest state given any previous effect having persisted an event, /// or else the state at the outset of the effects being applied, /// is also available. pub fn then(f: F) -> Then @@ -437,8 +436,8 @@ where } } -/// The return type of [EffectExt::and_then_emit_event] and [EffectExt::and_then_emit_deletion_event]. -pub struct ThenEmitEvent +/// The return type of [EffectExt::then_persist_event]. +pub struct ThenPersistEvent where B: EventSourcedBehavior, { @@ -448,7 +447,7 @@ where } #[async_trait] -impl Effect for ThenEmitEvent +impl Effect for ThenPersistEvent where B: EventSourcedBehavior + Send + Sync + 'static, B::State: Send + Sync, @@ -469,7 +468,7 @@ where if let Some(f) = f { let result = f(behavior, entities.get(entity_id), prev_result).await; if let Ok(event) = result { - let mut effect = EmitEvent:: { + let mut effect = PersistEvent:: { deletion_event: self.deletion_event, event, }; @@ -485,7 +484,7 @@ where } } -impl EffectExt for ThenEmitEvent +impl EffectExt for ThenPersistEvent where B: EventSourcedBehavior + Send + Sync + 'static, B::State: Send + Sync, @@ -655,7 +654,7 @@ mod tests { } #[test(tokio::test)] - async fn test_emit_then_reply() { + async fn test_persist_then_reply() { let entity_id = EntityId::from("entity-id"); let expected = EventEnvelope { deletion_event: false, @@ -675,7 +674,7 @@ mod tests { let (reply_to, reply_to_receiver) = oneshot::channel(); let reply_value = 1; - assert!(emit_event(TestEvent) + assert!(persist_event(TestEvent) .then_reply(move |_s| Some((reply_to, reply_value))) .process( &TestBehavior, @@ -692,7 +691,7 @@ mod tests { } #[test(tokio::test)] - async fn test_reply_then_emit() { + async fn test_reply_then_persist() { let entity_id = EntityId::from("entity-id"); let expected = EventEnvelope { deletion_event: false, @@ -713,7 +712,7 @@ mod tests { let reply_value = 1; assert!(reply(reply_to, reply_value) - .then_emit_event(|_s| Some(TestEvent)) + .then_persist_event(|_s| Some(TestEvent)) .process( &TestBehavior, &mut handler, diff --git a/akka-persistence-rs/src/entity.rs b/akka-persistence-rs/src/entity.rs index 693dff7..39232a3 100644 --- a/akka-persistence-rs/src/entity.rs +++ b/akka-persistence-rs/src/entity.rs @@ -27,10 +27,10 @@ pub trait EventSourcedBehavior { type State: Default; /// The command(s) that are able to be processed by the entity. type Command; - /// The event emitted having performed an effect. + /// The event produced having performed an effect. type Event; - /// Given a state and command, optionally emit an effect that may cause an + /// Given a state and command, optionally produce an effect that may cause an /// event transition. Events are responsible for mutating state. /// State can also be associated with the behavior so that other effects can be /// performed. For example, a behavior might be created with a channel sender diff --git a/akka-persistence-rs/src/entity_manager.rs b/akka-persistence-rs/src/entity_manager.rs index 909a30b..d10ddee 100644 --- a/akka-persistence-rs/src/entity_manager.rs +++ b/akka-persistence-rs/src/entity_manager.rs @@ -119,9 +119,10 @@ where } /// Manages the lifecycle of entities given a specific behavior. -/// Entity managers are established given a source of events associated +/// Entity managers are established given an adapter of persistent events associated /// with an entity type. That source is consumed by subsequently telling -/// the entity manager to run, generally on its own task. +/// the entity manager to run, generally on its own task. Events are persisted by +/// calling on the adapter's handler. /// /// Commands are sent to a channel established for the entity manager. /// Effects may be produced as a result of performing a command, which may, @@ -224,7 +225,7 @@ where } // Given an entity, send it the command, possibly producing an effect. - // Effects may emit events that will update state on success. + // Effects may persist events that will update state on success. let context = Context { entity_id: &message.entity_id, @@ -307,7 +308,7 @@ mod tests { use super::*; use crate::{ - effect::{emit_deletion_event, emit_event, reply, unhandled, Effect, EffectExt}, + effect::{persist_deletion_event, persist_event, reply, unhandled, Effect, EffectExt}, entity::Context, }; use async_trait::async_trait; @@ -362,11 +363,11 @@ mod tests { ) -> Box> { match command { TempCommand::Register if !state.registered => { - emit_event(TempEvent::Registered).boxed() + persist_event(TempEvent::Registered).boxed() } TempCommand::Deregister if state.registered => { - emit_deletion_event(TempEvent::Deregistered).boxed() + persist_deletion_event(TempEvent::Deregistered).boxed() } TempCommand::GetTemperature { reply_to } if state.registered => { @@ -374,7 +375,7 @@ mod tests { } TempCommand::UpdateTemperature { temp } if state.registered => { - emit_event(TempEvent::TemperatureUpdated { temp }) + persist_event(TempEvent::TemperatureUpdated { temp }) .and_then(|behavior: &Self, new_state, prev_result| { let updated = behavior.updated.clone(); let temp = new_state.map_or(0, |s| s.temp); @@ -411,7 +412,7 @@ mod tests { // The following adapter is not normally created by a developer, but we // declare one here so that we can provide a source of events and capture - // ones emitted by the entity manager. + // ones persisted by the entity manager. struct VecEventEnvelopeAdapter { initial_events: Option>>, captured_events: mpsc::Sender>, diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 25b4df8..550b661 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -94,11 +94,12 @@ where let persistence_id = PersistenceId::new(marshaller.entity_type(), record_entity_id); if self.slice_range.contains(&persistence_id.slice()) { - if let Some(envelope) = marshaller + match marshaller .envelope(persistence_id.entity_id, consumer_record) .await { - yield envelope; + Ok(envelope) => yield envelope, + Err(e) => log::info!("Problem consuming record: {e:?}"), } } } @@ -143,6 +144,7 @@ mod tests { use super::*; use akka_persistence_rs::{EntityId, EntityType}; + use akka_persistence_rs_commitlog::{CannotConsume, CannotProduce}; use chrono::{DateTime, Utc}; use serde::Deserialize; use streambed::commit_log::{ConsumerRecord, Header, Key, ProducerRecord}; @@ -175,7 +177,7 @@ mod tests { EntityType::from("some-topic") } - fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option { + fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Key { panic!("should not be called") } @@ -191,17 +193,19 @@ mod tests { &self, entity_id: EntityId, record: ConsumerRecord, - ) -> Option> { - let value = String::from_utf8(record.value).ok()?; + ) -> Result, CannotConsume> { + let value = String::from_utf8(record.value) + .map_err(|_| CannotConsume::new(entity_id.clone(), "Non numeric entity id"))?; let event = MyEvent { value }; - record.timestamp.map(|timestamp| EventEnvelope { - persistence_id: PersistenceId::new(self.entity_type(), entity_id), + let envelope = record.timestamp.map(|timestamp| EventEnvelope { + persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()), seq_nr: 1, timestamp, event, offset: 0, tags: vec![], - }) + }); + envelope.ok_or(CannotConsume::new(entity_id, "No timestamp")) } async fn producer_record( @@ -211,12 +215,12 @@ mod tests { _seq_nr: u64, timestamp: DateTime, event: &MyEvent, - ) -> Option { + ) -> Result { let headers = vec![Header { key: Topic::from("entity-id"), value: entity_id.as_bytes().into(), }]; - Some(ProducerRecord { + Ok(ProducerRecord { topic, headers, timestamp: Some(timestamp), diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs index c7017ba..080b6b2 100644 --- a/akka-projection-rs-commitlog/src/offset_store.rs +++ b/akka-projection-rs-commitlog/src/offset_store.rs @@ -1,7 +1,7 @@ //! An offset store for use with the Streambed commit log. use akka_persistence_rs::{ - effect::{emit_event, reply, Effect, EffectExt}, + effect::{persist_event, reply, Effect, EffectExt}, entity::{Context, EventSourcedBehavior}, entity_manager, EntityId, EntityType, Message, Offset, PersistenceId, }; @@ -20,6 +20,7 @@ use tokio::sync::{mpsc, oneshot, watch, Notify}; mod internal { use std::sync::Arc; + use akka_persistence_rs_commitlog::{CannotConsume, CannotProduce}; use tokio::sync::Notify; use super::*; @@ -140,7 +141,7 @@ mod internal { Command::Save { persistence_id, offset, - } => emit_event(Event::Saved { + } => persist_event(Event::Saved { persistence_id: persistence_id.clone(), offset, }) @@ -195,13 +196,13 @@ mod internal { #[async_trait] impl CommitLogMarshaller for OffsetStoreEventMarshaller where - F: Fn(&EntityId, &Event) -> Option + Send + Sync, + F: Fn(&EntityId, &Event) -> Key + Send + Sync, { fn entity_type(&self) -> EntityType { self.entity_type.clone() } - fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Option { + fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Key { (self.to_compaction_key)(entity_id, event) } @@ -217,16 +218,22 @@ mod internal { &self, entity_id: EntityId, record: ConsumerRecord, - ) -> Option> { - let event = ciborium::de::from_reader(record.value.as_slice()).ok()?; - record.timestamp.map(|timestamp| EventEnvelope { - persistence_id: PersistenceId::new(self.entity_type(), entity_id), + ) -> Result, CannotConsume> { + let event = ciborium::de::from_reader(record.value.as_slice()).map_err(|e| { + CannotConsume::new( + entity_id.clone(), + format!("CBOR deserialization issue: {e}"), + ) + })?; + let envelope = record.timestamp.map(|timestamp| EventEnvelope { + persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()), seq_nr: 0, // We don't care about sequence numbers with the offset store as they won't be projected anywhere timestamp, event, offset: record.offset, tags: vec![], - }) + }); + envelope.ok_or(CannotConsume::new(entity_id, "No timestamp")) } async fn producer_record( @@ -236,16 +243,18 @@ mod internal { _seq_nr: u64, timestamp: DateTime, event: &Event, - ) -> Option { + ) -> Result { let mut value = Vec::new(); - ciborium::ser::into_writer(event, &mut value).ok()?; + ciborium::ser::into_writer(event, &mut value).map_err(|e| { + CannotProduce::new(entity_id.clone(), format!("CBOR serialization issue: {e}")) + })?; let headers = vec![Header { key: HeaderKey::from("entity-id"), value: entity_id.as_bytes().into(), }]; - let key = self.to_compaction_key(&entity_id, event)?; - Some(ProducerRecord { + let key = self.to_compaction_key(&entity_id, event); + Ok(ProducerRecord { topic, headers, timestamp: Some(timestamp), @@ -321,9 +330,9 @@ pub async fn run( .await .unwrap(); - let to_compaction_key = |_: &EntityId, event: &Event| -> Option { + let to_compaction_key = |_: &EntityId, event: &Event| -> Key { let Event::Saved { persistence_id, .. } = event; - Some(persistence_id.jdk_string_hash() as u64) + persistence_id.jdk_string_hash() as u64 }; let file_log_topic_adapter = CommitLogTopicAdapter::new(