Refactor "emit" to "persist"
Refactors "emit" as "persist" and attempts to convey the intention of associated operations that persistence is required.

As a consequence, the commit log marshaller is refactored to remove the optionality of some return types to support that all events must be persisted. Along the way, I also refactored some options into a result with increased error reporting and logging.
huntc committed Nov 11, 2023
1 parent 8ac4a5d commit c91c93a
Showing 8 changed files with 210 additions and 127 deletions.
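For orientation, the rename shows up directly in behaviour implementations. The following is a minimal sketch modelled on the benches change in this commit; the `Behavior`, `State`, `Command`, and `Event` types are stand-ins, and the trait is assumed to need only the items shown.

```rust
use akka_persistence_rs::{
    effect::{persist_event, Effect, EffectExt},
    entity::{Context, EventSourcedBehavior},
};

struct Behavior;

#[derive(Default)]
struct State;
struct Command;
struct Event;

impl EventSourcedBehavior for Behavior {
    type State = State;
    type Command = Command;
    type Event = Event;

    fn on_command(
        _context: &Context,
        _state: &Self::State,
        _command: Self::Command,
    ) -> Box<dyn Effect<Self>> {
        // Previously `emit_event(Event).boxed()`; the new name makes it clear
        // that the event must be persisted, not merely emitted.
        persist_event(Event).boxed()
    }

    fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {}
}
```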
1 change: 1 addition & 0 deletions akka-persistence-rs-commitlog/Cargo.toml
@@ -15,6 +15,7 @@ async-trait = { workspace = true }
chrono = { workspace = true }
ciborium = { workspace = true, optional = true }
itoa = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
191 changes: 130 additions & 61 deletions akka-persistence-rs-commitlog/src/lib.rs
@@ -8,6 +8,7 @@ use akka_persistence_rs::{
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use log::warn;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
@@ -69,6 +70,44 @@ impl<E> WithTimestamp for EventEnvelope<E> {
}
}

/// This describes an error when there has been some run-time issue in attempting to consume records.
#[derive(Debug)]
pub struct CannotConsume {
_entity_id: EntityId,
_cause: String,
}

impl CannotConsume {
pub fn new<S>(entity_id: EntityId, cause: S) -> Self
where
S: ToString,
{
Self {
_entity_id: entity_id,
_cause: cause.to_string(),
}
}
}

/// This describes an error when there has been some run-time issue in attempting to produce records.
#[derive(Debug)]
pub struct CannotProduce {
_entity_id: EntityId,
_cause: String,
}

impl CannotProduce {
pub fn new<S>(entity_id: EntityId, cause: S) -> Self
where
S: ToString,
{
Self {
_entity_id: entity_id,
_cause: cause.to_string(),
}
}
}

/// Provides the ability to transform the memory representation of Akka Persistence events from
/// and to the records that a CommitLog expects.
#[async_trait]
@@ -82,17 +121,20 @@ where
/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise an event type value held in
/// the high bits, and the entity id in the lower bits.
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key>;
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key;

/// Extract an entity id from a consumer envelope.
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId>;

/// Produce an event envelope from a consumer record.
/// Note that this may not always be possible due to record formats having
/// changed, in which case we want the consumer to continue and skip it.
/// Changes in a record's layout should not prevent the system from working.
async fn envelope(
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<E>>;
) -> Result<EventEnvelope<E>, CannotConsume>;

/// Produce a producer record from an event and its entity info.
async fn producer_record(
@@ -102,7 +144,7 @@ where
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord>;
) -> Result<ProducerRecord, CannotProduce>;
}

/// Provides the ability to transform the memory representation of Akka Persistence events from
@@ -127,18 +169,26 @@ where
&self,
entity_id: EntityId,
mut record: ConsumerRecord,
) -> Option<EventEnvelope<E>> {
) -> Result<EventEnvelope<E>, CannotConsume> {
use streambed::commit_log::{Header, HeaderKey};

streambed::decrypt_buf(
let secret_path = self.secret_path(&entity_id);
let event = streambed::decrypt_buf(
self.secret_store(),
&self.secret_path(&entity_id),
&secret_path,
&mut record.value,
|value| ciborium::de::from_reader(value),
)
.await
.and_then(|event| {
let seq_nr = record.headers.iter().find_map(|Header { key, value }| {
.ok_or(CannotConsume::new(
entity_id.clone(),
format!("Cannot decrypt with key: {secret_path}"),
))?;

let seq_nr = record
.headers
.iter()
.find_map(|Header { key, value }| {
if key == &HeaderKey::from("seq_nr") {
if value.len() >= 8 {
if let Ok(value) = value[0..8].try_into() {
@@ -152,18 +202,22 @@ where
} else {
None
}
});
seq_nr.and_then(|seq_nr| {
record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
})
})
})
.ok_or(CannotConsume::new(
entity_id.clone(),
"Cannot find seq_nr header",
))?;

let envelope = record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
});

envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
}

#[cfg(not(feature = "cbor"))]
@@ -181,22 +235,29 @@ where
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord> {
) -> Result<ProducerRecord, CannotProduce> {
use streambed::commit_log::{Header, HeaderKey};

let key = self.to_compaction_key(&entity_id, event)?;
// Being unable to derive a compaction key is a non-recoverable error.
let key = self.to_compaction_key(&entity_id, event);
let secret_path = self.secret_path(&entity_id);
let buf = streambed::encrypt_struct(
self.secret_store(),
&self.secret_path(&entity_id),
&secret_path,
|event| {
let mut buf = Vec::new();
ciborium::ser::into_writer(event, &mut buf).map(|_| buf)
},
rand::thread_rng,
&event,
)
.await?;
Some(ProducerRecord {
.await
.ok_or(CannotProduce::new(
entity_id,
format!("Problem encrypting and serializing with secret path: {secret_path}"),
))?;

Ok(ProducerRecord {
topic,
headers: vec![Header {
key: HeaderKey::from("seq_nr"),
@@ -287,15 +348,18 @@ where
Ok(Box::pin(stream!({
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) {
if let Some(envelope) =
marshaller.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
match marshaller.envelope(record_entity_id, consumer_record).await {
Ok(envelope) => {
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
}
Err(e) => {
warn!("When initially consuming: {e:?}");
}
}
}
}
@@ -324,15 +388,18 @@ where
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) {
if &record_entity_id == entity_id {
if let Some(envelope) =
marshaller.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
match marshaller.envelope(record_entity_id, consumer_record).await {
Ok(envelope) => {
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
}
Err(e) => {
warn!("When consuming: {e:?}");
}
}
}
}
@@ -398,7 +465,7 @@ where
&envelope.event,
)
.await
.ok_or_else(|| {
.map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"A problem occurred converting a envelope when producing",
@@ -444,19 +511,18 @@ pub mod cbor {
impl<E, F, SS> CommitLogMarshaller<E> for Marshaller<E, F, SS>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
F: Fn(&E) -> Option<u32> + Sync,
F: Fn(&E) -> u32 + Sync,
SS: SecretStore,
{
fn entity_type(&self) -> EntityType {
self.entity_type.clone()
}

fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key> {
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key {
let record_type = (self.to_record_type)(event);
record_type.and_then(|record_type| {
let entity_id = entity_id.parse::<u32>().ok()?;
Some((record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64)
})
// It is an unrecoverable error if the entity id is non-numeric.
let entity_id = entity_id.parse::<u32>().unwrap();
(record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64
}

fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
@@ -469,7 +535,7 @@ pub mod cbor {
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<E>> {
) -> Result<EventEnvelope<E>, CannotConsume> {
self.decrypted_envelope(entity_id, record).await
}

@@ -480,7 +546,7 @@ pub mod cbor {
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord> {
) -> Result<ProducerRecord, CannotProduce> {
self.encrypted_producer_record(topic, entity_id, seq_nr, timestamp, event)
.await
}
@@ -490,7 +556,7 @@ pub mod cbor {
impl<E, F, SS> EncryptedCommitLogMarshaller<E> for Marshaller<E, F, SS>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
F: Fn(&E) -> Option<u32> + Sync,
F: Fn(&E) -> u32 + Sync,
SS: SecretStore,
{
type SecretStore = SS;
@@ -516,7 +582,7 @@ pub mod cbor {
) -> Marshaller<E, F, SS>
where
for<'a> E: DeserializeOwned + Serialize + Send + Sync + 'a,
F: Fn(&E) -> Option<u32> + Sync,
F: Fn(&E) -> u32 + Sync,
SS: SecretStore,
S: ToString,
{
Expand Down Expand Up @@ -593,7 +659,7 @@ mod tests {
EntityType::from("some-entity-type")
}

fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Key {
panic!("should not be called")
}

Expand All @@ -609,17 +675,20 @@ mod tests {
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<MyEvent>> {
let value = String::from_utf8(record.value).ok()?;
) -> Result<EventEnvelope<MyEvent>, CannotConsume> {
let value = String::from_utf8(record.value)
.ok()
.ok_or(CannotConsume::new(entity_id.clone(), "bad entity id"))?;
let event = MyEvent { value };
record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
let envelope = record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
seq_nr: 1,
timestamp,
event,
offset: 0,
tags: vec![],
})
});
envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
}

async fn producer_record(
Expand All @@ -629,12 +698,12 @@ mod tests {
_seq_nr: u64,
timestamp: DateTime<Utc>,
event: &MyEvent,
) -> Option<ProducerRecord> {
) -> Result<ProducerRecord, CannotProduce> {
let headers = vec![Header {
key: HeaderKey::from("entity-id"),
value: entity_id.as_bytes().into(),
}];
Some(ProducerRecord {
Ok(ProducerRecord {
topic,
headers,
timestamp: Some(timestamp),
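With the marshaller now returning `Result` rather than `Option`, callers can report why a record was skipped instead of dropping it silently, mirroring the match/warn pattern introduced above. A sketch of that pattern follows; the helper name, module paths, and exact trait bounds are illustrative assumptions rather than part of this commit.

```rust
use akka_persistence_rs::EntityId;
use akka_persistence_rs_commitlog::{CommitLogMarshaller, EventEnvelope};
use log::warn;
use serde::{de::DeserializeOwned, Serialize};
use streambed::commit_log::ConsumerRecord;

// Hypothetical helper: convert one consumer record into an envelope, logging
// rather than silently discarding records that cannot be consumed.
async fn envelope_or_warn<E, M>(
    marshaller: &M,
    entity_id: EntityId,
    record: ConsumerRecord,
) -> Option<EventEnvelope<E>>
where
    E: DeserializeOwned + Serialize + Send + Sync + 'static,
    M: CommitLogMarshaller<E>,
{
    match marshaller.envelope(entity_id, record).await {
        Ok(envelope) => Some(envelope),
        Err(e) => {
            // CannotConsume derives Debug and carries the entity id and a cause.
            warn!("When consuming: {e:?}");
            None
        }
    }
}
```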
4 changes: 2 additions & 2 deletions akka-persistence-rs/benches/benches.rs
@@ -1,7 +1,7 @@
use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc};

use akka_persistence_rs::{
effect::{emit_event, Effect, EffectExt},
effect::{persist_event, Effect, EffectExt},
entity::{Context, EventSourcedBehavior},
entity_manager::{self, EventEnvelope, Handler, SourceProvider},
EntityId, Message,
Expand Down Expand Up @@ -32,7 +32,7 @@ impl EventSourcedBehavior for Behavior {
_state: &Self::State,
_command: Self::Command,
) -> Box<dyn Effect<Self>> {
emit_event(Event).boxed()
persist_event(Event).boxed()
}

fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {}
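Finally, a note on the compaction key change in the cbor marshaller above: `to_compaction_key` is now infallible, packing the record type into the high bits and the numeric entity id into the low bits of the key. A standalone sketch of that layout follows; the crate's actual `EVENT_TYPE_BIT_SHIFT` value is not shown in this diff, so 32 is assumed here for illustration.

```rust
// Assumed shift; the crate defines its own EVENT_TYPE_BIT_SHIFT.
const EVENT_TYPE_BIT_SHIFT: u64 = 32;

fn compaction_key(record_type: u32, entity_id: &str) -> u64 {
    // As in the commit, a non-numeric entity id is treated as unrecoverable.
    let entity_id = entity_id.parse::<u32>().expect("entity ids must be numeric");
    (record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64
}

fn main() {
    // Record type 1 and entity id 42 occupy disjoint halves of the key.
    let key = compaction_key(1, "42");
    assert_eq!(key, (1u64 << 32) | 42);
    println!("{key:#018x}"); // 0x000000010000002a
}
```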