Skip to content

Commit

Permalink
Track out of sequence events when consuming projection
Browse files Browse the repository at this point in the history
An offset store similar in function to the JVM is introduced so that sequence numbers arriving out of order can be detected. For this, we use an entity manager to track the last sequence number seen for a given entity id.
  • Loading branch information
huntc committed Sep 25, 2023
1 parent ed7c91f commit 82dba87
Show file tree
Hide file tree
Showing 18 changed files with 477 additions and 216 deletions.
5 changes: 5 additions & 0 deletions akka-persistence-rs-commitlog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

This crate adapts Streambed's CommitLog and SecretStore traits for the purposes of it being used with Akka Persistence of akka-edge-rs.

> A `u64` "key" field is used with Streambed's abstraction of commit logs as modelled off [Kafka](https://kafka.apache.org/). Keys are
used to distinguish and entity and its record type. Therefore, you must have an entity id that can be represented directly in 64 bits,
along with some bits reserved for the record type. Devices at the edge are often represented with a 32 bit address, and so this should
not present a problem at the edge in general. Hashes are to be avoided due to their potential for collisions.

[Streambed](https://github.com/streambed/streambed-rs) provides an implementation of a commit log named ["Logged"](https://github.com/streambed/streambed-rs/tree/main/streambed-logged).
Logged is a library focused on conserving storage and is particularly suited for use at the edge that uses flash based
storage. Other forms of commit log are also supported by Streabmed, including a [Kafka](https://kafka.apache.org/)-like HTTP interface.
Expand Down
131 changes: 48 additions & 83 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,47 @@ impl<E> WithTimestampOffset for EventEnvelope<E> {
}

/// Provides the ability to transform the the memory representation of Akka Persistence events from
/// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization.
/// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected.
/// and to the records that a CommitLog expects.
#[async_trait]
pub trait CommitLogEventEnvelopeMarshaler<E>
pub trait CommitLogMarshaler<E>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
type SecretStore: SecretStore;

/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise and event type value held in
/// the high bits, and the entity id in the lower bits.
fn to_compaction_key(entity_id: &EntityId, event: &E) -> Option<Key>;
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key>;

/// Extract an entity id from a consumer envelope.
fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId>;
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId>;

/// Produce an event envelope from a consumer record.
async fn envelope(
&self,
entity_id: EntityId,
record: ConsumerRecord,
) -> Option<EventEnvelope<E>>;

/// Produce a producer record from an event and its entity info.
async fn producer_record(
&self,
topic: Topic,
entity_id: EntityId,
seq_nr: u64,
timestamp: DateTime<Utc>,
event: E,
) -> Option<ProducerRecord>;
}

/// Provides the ability to transform the the memory representation of Akka Persistence events from
/// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization.
/// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected.
#[async_trait]
pub trait EncryptedCommitLogMarshaler<E>: CommitLogMarshaler<E>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
type SecretStore: SecretStore;

/// Return a reference to a secret store for encryption/decryption.
fn secret_store(&self) -> &Self::SecretStore;
Expand All @@ -103,7 +128,7 @@ where
fn secret_path(&self, entity_id: &EntityId) -> Arc<str>;

#[cfg(feature = "cbor")]
async fn envelope(
async fn decrypted_envelope(
&self,
entity_id: EntityId,
mut record: ConsumerRecord,
Expand Down Expand Up @@ -142,14 +167,14 @@ where
}

#[cfg(not(feature = "cbor"))]
async fn envelope(
async fn decrypted_envelope(
&self,
entity_id: EntityId,
record: ConsumerRecord,
mut record: ConsumerRecord,
) -> Option<EventEnvelope<E>>;

#[cfg(feature = "cbor")]
async fn producer_record(
async fn encrypted_producer_record(
&self,
topic: Topic,
entity_id: EntityId,
Expand All @@ -159,7 +184,7 @@ where
) -> Option<ProducerRecord> {
use streambed::commit_log::{Header, HeaderKey};

let key = Self::to_compaction_key(&entity_id, &event)?;
let key = self.to_compaction_key(&entity_id, &event)?;
let buf = streambed::encrypt_struct(
self.secret_store(),
&self.secret_path(&entity_id),
Expand All @@ -185,7 +210,7 @@ where
}

#[cfg(not(feature = "cbor"))]
async fn producer_record(
async fn encrypted_producer_record(
&self,
topic: Topic,
entity_id: EntityId,
Expand All @@ -211,7 +236,7 @@ where
pub struct CommitLogTopicAdapter<CL, E, M>
where
CL: CommitLog,
M: CommitLogEventEnvelopeMarshaler<E>,
M: CommitLogMarshaler<E>,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
commit_log: CL,
Expand All @@ -224,7 +249,7 @@ where
impl<CL, E, M> CommitLogTopicAdapter<CL, E, M>
where
CL: CommitLog,
M: CommitLogEventEnvelopeMarshaler<E>,
M: CommitLogMarshaler<E>,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self {
Expand All @@ -242,7 +267,7 @@ where
impl<CL, E, M> SourceProvider<E> for CommitLogTopicAdapter<CL, E, M>
where
CL: CommitLog,
M: CommitLogEventEnvelopeMarshaler<E> + Send + Sync,
M: CommitLogMarshaler<E> + Send + Sync,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
async fn source_initial(
Expand All @@ -261,7 +286,7 @@ where
if let Ok(mut consumer_records) = consumer_records {
Ok(Box::pin(stream!({
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = M::to_entity_id(&consumer_record) {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
if let Some(envelope) =
marshaler.envelope(record_entity_id, consumer_record).await
{
Expand Down Expand Up @@ -297,7 +322,7 @@ where
if let Ok(mut consumer_records) = consumer_records {
Ok(Box::pin(stream!({
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = M::to_entity_id(&consumer_record) {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
if &record_entity_id == entity_id {
if let Some(envelope) =
marshaler.envelope(record_entity_id, consumer_record).await
Expand Down Expand Up @@ -356,7 +381,7 @@ async fn produce_to_last_offset<'async_trait>(
impl<CL, E, M> Handler<E> for CommitLogTopicAdapter<CL, E, M>
where
CL: CommitLog,
M: CommitLogEventEnvelopeMarshaler<E> + Send + Sync,
M: CommitLogMarshaler<E> + Send + Sync,
for<'async_trait> E: Clone + DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
async fn process(
Expand Down Expand Up @@ -399,10 +424,7 @@ mod tests {
use super::*;
use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager};
use serde::Deserialize;
use streambed::{
commit_log::{Header, HeaderKey},
secret_store::{AppRoleAuthReply, Error, GetSecretReply, SecretData, UserPassAuthReply},
};
use streambed::commit_log::{Header, HeaderKey};
use streambed_logged::FileLog;
use test_log::test;
use tokio::{sync::mpsc, time};
Expand Down Expand Up @@ -450,79 +472,22 @@ mod tests {
// can lay out an envelope any way that you would like to. Note that secret keys
// are important though.

#[derive(Clone)]
struct NoopSecretStore;

#[async_trait]
impl SecretStore for NoopSecretStore {
async fn approle_auth(
&self,
_role_id: &str,
_secret_id: &str,
) -> Result<AppRoleAuthReply, Error> {
panic!("should not be called")
}

async fn create_secret(
&self,
_secret_path: &str,
_secret_data: SecretData,
) -> Result<(), Error> {
panic!("should not be called")
}

async fn get_secret(&self, _secret_path: &str) -> Result<Option<GetSecretReply>, Error> {
panic!("should not be called")
}

async fn token_auth(&self, _token: &str) -> Result<(), Error> {
panic!("should not be called")
}

async fn userpass_auth(
&self,
_username: &str,
_password: &str,
) -> Result<UserPassAuthReply, Error> {
panic!("should not be called")
}

async fn userpass_create_update_user(
&self,
_current_username: &str,
_username: &str,
_password: &str,
) -> Result<(), Error> {
panic!("should not be called")
}
}

struct MyEventMarshaler;

#[async_trait]
impl CommitLogEventEnvelopeMarshaler<MyEvent> for MyEventMarshaler {
type SecretStore = NoopSecretStore;

fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}

fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId> {
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
let Header { value, .. } = record
.headers
.iter()
.find(|header| header.key == "entity-id")?;
std::str::from_utf8(value).ok().map(EntityId::from)
}

fn secret_store(&self) -> &Self::SecretStore {
panic!("should not be called")
}

fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
panic!("should not be called")
}

async fn envelope(
&self,
entity_id: EntityId,
Expand Down
2 changes: 1 addition & 1 deletion akka-persistence-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub struct TimestampOffset {
pub seen: Vec<(PersistenceId, u64)>,
}

#[derive(Deserialize, Serialize)]
#[derive(Clone, Deserialize, Serialize)]
pub enum Offset {
/// Corresponds to an ordered sequence number for the events. Note that the corresponding
/// offset of each event is provided in an Envelope,
Expand Down
5 changes: 4 additions & 1 deletion akka-projection-rs-commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ edition = "2021"
[dependencies]
async-stream = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
streambed = { workspace = true }
streambed-logged = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }

akka-persistence-rs = { path = "../akka-persistence-rs" }
akka-persistence-rs-commitlog = { path = "../akka-persistence-rs-commitlog" }
akka-projection-rs = { path = "../akka-projection-rs" }

[dev-dependencies]
chrono = { workspace = true }
env_logger = { workspace = true }
streambed-logged = { workspace = true }
test-log = { workspace = true }
Expand Down
Loading

0 comments on commit 82dba87

Please sign in to comment.