Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor "emit" to "persist" #119

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions akka-persistence-rs-commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async-trait = { workspace = true }
chrono = { workspace = true }
ciborium = { workspace = true, optional = true }
itoa = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
Expand Down
191 changes: 130 additions & 61 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use akka_persistence_rs::{
use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use log::warn;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
Expand Down Expand Up @@ -69,6 +70,44 @@ impl<E> WithTimestamp for EventEnvelope<E> {
}
}

/// This describes an error when there has been some run-time issue in attempting to consume records.
#[derive(Debug)]
pub struct CannotConsume {
_entity_id: EntityId,
_cause: String,
}

impl CannotConsume {
pub fn new<S>(entity_id: EntityId, cause: S) -> Self
where
S: ToString,
{
Self {
_entity_id: entity_id,
_cause: cause.to_string(),
}
}
}

/// This describes an error when there has been some run-time issue in attempting to produce records.
#[derive(Debug)]
pub struct CannotProduce {
_entity_id: EntityId,
_cause: String,
}

impl CannotProduce {
pub fn new<S>(entity_id: EntityId, cause: S) -> Self
where
S: ToString,
{
Self {
_entity_id: entity_id,
_cause: cause.to_string(),
}
}
}

/// Provides the ability to transform the the memory representation of Akka Persistence events from
/// and to the records that a CommitLog expects.
#[async_trait]
Expand All @@ -82,17 +121,20 @@ where
/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise and event type value held in
/// the high bits, and the entity id in the lower bits.
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key>;
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key;

/// Extract an entity id from a consumer envelope.
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId>;

/// Produce an event envelope from a consumer record.
/// Note that this may not always be possible due to record formats having
/// changed, in which case we want the consumer to continue and skip it.
/// Changes in a record's layout should not prevent the system from working.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a difference from Akka JVM. There we don't automatically accept and ignore events with invalid format. The reason is that it is most likely a mistake to not evolve the serialization format in a compatible way, and then ignoring such events and continue with other events can lead to data corruption on the consumer side.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m definitely relying on this behaviour with my production system. Given that events can persist in a commit long, potentially indefinitely, it’s a handy feature.

What would Akka do when sourcing events from Kafka? It seems wrong for the app to not be able to recover.

Would you mind creating a separate issue for this so we can move forward? Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be a conscious application decision by the application to skip events that it can't deserialize. For example an application specific serializer implementation that makes that decision.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created issue #122

async fn envelope(
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<E>>;
) -> Result<EventEnvelope<E>, CannotConsume>;

/// Produce a producer record from an event and its entity info.
async fn producer_record(
Expand All @@ -102,7 +144,7 @@ where
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord>;
) -> Result<ProducerRecord, CannotProduce>;
}

/// Provides the ability to transform the the memory representation of Akka Persistence events from
Expand All @@ -127,18 +169,26 @@ where
&self,
entity_id: EntityId,
mut record: ConsumerRecord,
) -> Option<EventEnvelope<E>> {
) -> Result<EventEnvelope<E>, CannotConsume> {
use streambed::commit_log::{Header, HeaderKey};

streambed::decrypt_buf(
let secret_path = self.secret_path(&entity_id);
let event = streambed::decrypt_buf(
self.secret_store(),
&self.secret_path(&entity_id),
&secret_path,
&mut record.value,
|value| ciborium::de::from_reader(value),
)
.await
.and_then(|event| {
let seq_nr = record.headers.iter().find_map(|Header { key, value }| {
.ok_or(CannotConsume::new(
entity_id.clone(),
format!("Cannot decrypt with key: {secret_path}"),
))?;

let seq_nr = record
.headers
.iter()
.find_map(|Header { key, value }| {
if key == &HeaderKey::from("seq_nr") {
if value.len() >= 8 {
if let Ok(value) = value[0..8].try_into() {
Expand All @@ -152,18 +202,22 @@ where
} else {
None
}
});
seq_nr.and_then(|seq_nr| {
record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
})
})
})
.ok_or(CannotConsume::new(
entity_id.clone(),
"Cannot find seq_nr header",
))?;

let envelope = record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
});

envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
}

#[cfg(not(feature = "cbor"))]
Expand All @@ -181,22 +235,29 @@ where
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord> {
) -> Result<ProducerRecord, CannotProduce> {
use streambed::commit_log::{Header, HeaderKey};

let key = self.to_compaction_key(&entity_id, event)?;
// Being unable to derive a compaction key is a non-recoverable error.
let key = self.to_compaction_key(&entity_id, event);
let secret_path = self.secret_path(&entity_id);
let buf = streambed::encrypt_struct(
self.secret_store(),
&self.secret_path(&entity_id),
&secret_path,
|event| {
let mut buf = Vec::new();
ciborium::ser::into_writer(event, &mut buf).map(|_| buf)
},
rand::thread_rng,
&event,
)
.await?;
Some(ProducerRecord {
.await
.ok_or(CannotProduce::new(
entity_id,
format!("Problem encrypting and serializing with secret path: {secret_path}"),
))?;

Ok(ProducerRecord {
topic,
headers: vec![Header {
key: HeaderKey::from("seq_nr"),
Expand Down Expand Up @@ -287,15 +348,18 @@ where
Ok(Box::pin(stream!({
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) {
if let Some(envelope) =
marshaller.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
match marshaller.envelope(record_entity_id, consumer_record).await {
Ok(envelope) => {
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
}
Err(e) => {
warn!("When initially consuming: {e:?}");
}
}
}
}
Expand Down Expand Up @@ -324,15 +388,18 @@ where
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = marshaller.to_entity_id(&consumer_record) {
if &record_entity_id == entity_id {
if let Some(envelope) =
marshaller.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
match marshaller.envelope(record_entity_id, consumer_record).await {
Ok(envelope) => {
yield EntityManagerEventEnvelope::new(
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
);
}
Err(e) => {
warn!("When consuming: {e:?}");
}
}
}
}
Expand Down Expand Up @@ -398,7 +465,7 @@ where
&envelope.event,
)
.await
.ok_or_else(|| {
.map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"A problem occurred converting a envelope when producing",
Expand Down Expand Up @@ -444,19 +511,18 @@ pub mod cbor {
impl<E, F, SS> CommitLogMarshaller<E> for Marshaller<E, F, SS>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
F: Fn(&E) -> Option<u32> + Sync,
F: Fn(&E) -> u32 + Sync,
SS: SecretStore,
{
fn entity_type(&self) -> EntityType {
self.entity_type.clone()
}

fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key> {
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key {
let record_type = (self.to_record_type)(event);
record_type.and_then(|record_type| {
let entity_id = entity_id.parse::<u32>().ok()?;
Some((record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64)
})
// It is an unrecoverable error if the entity id is non-numeric.
let entity_id = entity_id.parse::<u32>().unwrap();
(record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64
}

fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
Expand All @@ -469,7 +535,7 @@ pub mod cbor {
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<E>> {
) -> Result<EventEnvelope<E>, CannotConsume> {
self.decrypted_envelope(entity_id, record).await
}

Expand All @@ -480,7 +546,7 @@ pub mod cbor {
seq_nr: u64,
timestamp: DateTime<Utc>,
event: &E,
) -> Option<ProducerRecord> {
) -> Result<ProducerRecord, CannotProduce> {
self.encrypted_producer_record(topic, entity_id, seq_nr, timestamp, event)
.await
}
Expand All @@ -490,7 +556,7 @@ pub mod cbor {
impl<E, F, SS> EncryptedCommitLogMarshaller<E> for Marshaller<E, F, SS>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
F: Fn(&E) -> Option<u32> + Sync,
F: Fn(&E) -> u32 + Sync,
SS: SecretStore,
{
type SecretStore = SS;
Expand All @@ -516,7 +582,7 @@ pub mod cbor {
) -> Marshaller<E, F, SS>
where
for<'a> E: DeserializeOwned + Serialize + Send + Sync + 'a,
F: Fn(&E) -> Option<u32> + Sync,
F: Fn(&E) -> u32 + Sync,
SS: SecretStore,
S: ToString,
{
Expand Down Expand Up @@ -593,7 +659,7 @@ mod tests {
EntityType::from("some-entity-type")
}

fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Key {
panic!("should not be called")
}

Expand All @@ -609,17 +675,20 @@ mod tests {
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<MyEvent>> {
let value = String::from_utf8(record.value).ok()?;
) -> Result<EventEnvelope<MyEvent>, CannotConsume> {
let value = String::from_utf8(record.value)
.ok()
.ok_or(CannotConsume::new(entity_id.clone(), "bad entity id"))?;
let event = MyEvent { value };
record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
let envelope = record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id.clone()),
seq_nr: 1,
timestamp,
event,
offset: 0,
tags: vec![],
})
});
envelope.ok_or(CannotConsume::new(entity_id, "No timestamp"))
}

async fn producer_record(
Expand All @@ -629,12 +698,12 @@ mod tests {
_seq_nr: u64,
timestamp: DateTime<Utc>,
event: &MyEvent,
) -> Option<ProducerRecord> {
) -> Result<ProducerRecord, CannotProduce> {
let headers = vec![Header {
key: HeaderKey::from("entity-id"),
value: entity_id.as_bytes().into(),
}];
Some(ProducerRecord {
Ok(ProducerRecord {
topic,
headers,
timestamp: Some(timestamp),
Expand Down
4 changes: 2 additions & 2 deletions akka-persistence-rs/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc};

use akka_persistence_rs::{
effect::{emit_event, Effect, EffectExt},
effect::{persist_event, Effect, EffectExt},
entity::{Context, EventSourcedBehavior},
entity_manager::{self, EventEnvelope, Handler, SourceProvider},
EntityId, Message,
Expand Down Expand Up @@ -32,7 +32,7 @@ impl EventSourcedBehavior for Behavior {
_state: &Self::State,
_command: Self::Command,
) -> Box<dyn Effect<Self>> {
emit_event(Event).boxed()
persist_event(Event).boxed()
}

fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {}
Expand Down
Loading