diff --git a/Cargo.toml b/Cargo.toml index 28fcd80..fdba106 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,10 @@ members = [ "akka-persistence-rs", "akka-persistence-rs-commitlog", + "akka-projection-rs", + "akka-projection-rs-commitlog", + "akka-projection-rs-grpc", + "akka-projection-rs-storage", "examples/iot-service", ] diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 31cb6a9..f6c5e25 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -1,6 +1,9 @@ #![doc = include_str!("../README.md")] -use akka_persistence_rs::{entity_manager::RecordAdapter, EntityId, Record}; +use akka_persistence_rs::{ + entity_manager::{EventEnvelope, Handler, SourceProvider}, + EntityId, +}; use async_stream::stream; use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; @@ -11,11 +14,11 @@ use streambed::{ }; use tokio_stream::{Stream, StreamExt}; -/// Provides the ability to transform the the memory representation of Akka Persistence records from +/// Provides the ability to transform the memory representation of Akka Persistence events from /// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization. /// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected. #[async_trait] -pub trait CommitLogRecordMarshaler +pub trait CommitLogEventEnvelopeMarshaler where for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { @@ -24,9 +27,9 @@ where /// Provide a key we can use for the purposes of log compaction. /// A key would generally comprise an event type value held in /// the high bits, and the entity id in the lower bits. - fn to_compaction_key(record: &Record) -> Option; + fn to_compaction_key(envelope: &EventEnvelope) -> Option; - /// Extract an entity id from a consumer record. + /// Extract an entity id from a consumer envelope. fn to_entity_id(record: &ConsumerRecord) -> Option; /// Return a reference to a secret store for encryption/decryption.
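The compaction key layout described above, an event type in the high bits and the entity id in the low bits, amounts to simple bit arithmetic. A minimal sketch, using the same shift and mask values that the IoT example adopts later in this diff (the free function is illustrative, not part of the library API):

```rust
// Pack an event type and a 32-bit entity id into a single compaction key,
// then recover both parts, mirroring the layout used by the marshalers below.
const EVENT_TYPE_BIT_SHIFT: usize = 52; // top 12 bits carry the event type
const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; // low 32 bits carry the entity id

fn to_compaction_key(event_type: u64, entity_id: u32) -> u64 {
    event_type << EVENT_TYPE_BIT_SHIFT | entity_id as u64
}

fn main() {
    let key = to_compaction_key(1, 42);
    assert_eq!(key & EVENT_ID_BIT_MASK, 42); // the entity id is recovered
    assert_eq!(key >> EVENT_TYPE_BIT_SHIFT, 1); // the event type is recovered
}
```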
@@ -37,7 +40,11 @@ where fn secret_path(&self, entity_id: &EntityId) -> Arc; #[cfg(feature = "cbor")] - async fn record(&self, entity_id: EntityId, mut record: ConsumerRecord) -> Option> { + async fn envelope( + &self, + entity_id: EntityId, + mut record: ConsumerRecord, + ) -> Option> { streambed::decrypt_buf( self.secret_store(), &self.secret_path(&entity_id), @@ -45,28 +52,32 @@ where |value| ciborium::de::from_reader(value), ) .await - .map(|event| Record::new(entity_id, event)) + .map(|event| EventEnvelope::new(entity_id, event)) } #[cfg(not(feature = "cbor"))] - async fn record(&self, entity_id: EntityId, record: ConsumerRecord) -> Option>; + async fn envelope( + &self, + entity_id: EntityId, + record: ConsumerRecord, + ) -> Option>; #[cfg(feature = "cbor")] async fn producer_record( &self, topic: Topic, - record: Record, - ) -> Option<(ProducerRecord, Record)> { - let key = Self::to_compaction_key(&record)?; + envelope: EventEnvelope, + ) -> Option<(ProducerRecord, EventEnvelope)> { + let key = Self::to_compaction_key(&envelope)?; let buf = streambed::encrypt_struct( self.secret_store(), - &self.secret_path(&record.entity_id), + &self.secret_path(&envelope.entity_id), |event| { let mut buf = Vec::new(); ciborium::ser::into_writer(event, &mut buf).map(|_| buf) }, rand::thread_rng, - &record.event, + &envelope.event, ) .await?; Some(( @@ -78,7 +89,7 @@ where value: buf, partition: 0, }, - record, + envelope, )) } @@ -86,8 +97,8 @@ where async fn producer_record( &self, topic: Topic, - record: Record, - ) -> Option<(ProducerRecord, Record)>; + envelope: EventEnvelope, + ) -> Option<(ProducerRecord, EventEnvelope)>; } /// Adapts a Streambed CommitLog for use with Akka Persistence. @@ -99,14 +110,14 @@ where /// As CommitLog is intended for use at the edge, we assume /// that all entities will be event sourced into memory. /// -/// Developers are required to provide implementations of [CommitLogRecordMarshaler] -/// for bytes and records i.e. deserialization/decryption and +/// Developers are required to provide implementations of [CommitLogEventEnvelopeMarshaler] +/// for bytes and events i.e. deserialization/decryption and /// serialization/encryption respectively, along with CommitLog's /// use of keys for compaction including the storage of entities. 
pub struct CommitLogTopicAdapter where CL: CommitLog, - M: CommitLogRecordMarshaler, + M: CommitLogEventEnvelopeMarshaler, for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { commit_log: CL, @@ -119,7 +130,7 @@ where impl CommitLogTopicAdapter where CL: CommitLog, - M: CommitLogRecordMarshaler, + M: CommitLogEventEnvelopeMarshaler, for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self { @@ -134,15 +145,15 @@ where } #[async_trait] -impl RecordAdapter for CommitLogTopicAdapter +impl SourceProvider for CommitLogTopicAdapter where CL: CommitLog, - M: CommitLogRecordMarshaler + Send + Sync, + M: CommitLogEventEnvelopeMarshaler + Send + Sync, for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { - async fn produce_initial( + async fn source_initial( &mut self, - ) -> io::Result> + Send + 'async_trait>>> { + ) -> io::Result> + Send + 'async_trait>>> { let consumer_records = produce_to_last_offset( &self.commit_log, &self.consumer_group_name, @@ -156,10 +167,10 @@ where Ok(Box::pin(stream!({ while let Some(consumer_record) = consumer_records.next().await { if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { - if let Some(record) = - marshaler.record(record_entity_id, consumer_record).await + if let Some(envelope) = + marshaler.envelope(record_entity_id, consumer_record).await { - yield record; + yield envelope; } } } @@ -169,10 +180,10 @@ where } } - async fn produce( + async fn source( &mut self, entity_id: &EntityId, - ) -> io::Result> + Send + 'async_trait>>> { + ) -> io::Result> + Send + 'async_trait>>> { let consumer_records = produce_to_last_offset( &self.commit_log, &self.consumer_group_name, @@ -187,10 +198,10 @@ where while let Some(consumer_record) = consumer_records.next().await { if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { if &record_entity_id == entity_id { - if let Some(record) = - marshaler.record(record_entity_id, consumer_record).await + if let Some(envelope) = + marshaler.envelope(record_entity_id, consumer_record).await { - yield record; + yield envelope; } } } @@ -200,29 +211,6 @@ where Ok(Box::pin(tokio_stream::empty())) } } - - async fn process(&mut self, record: Record) -> io::Result> { - let (producer_record, record) = self - .marshaler - .producer_record(self.topic.clone(), record) - .await - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "A problem occurred converting a record when producing", - ) - })?; - self.commit_log - .produce(producer_record) - .await - .map(|_| record) - .map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "A problem occurred producing a record", - ) - }) - } } async fn produce_to_last_offset<'async_trait>( @@ -258,12 +246,43 @@ } } +#[async_trait] +impl Handler for CommitLogTopicAdapter +where + CL: CommitLog, + M: CommitLogEventEnvelopeMarshaler + Send + Sync, + for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, +{ + async fn process(&mut self, envelope: EventEnvelope) -> io::Result> { + let (producer_record, envelope) = self + .marshaler + .producer_record(self.topic.clone(), envelope) + .await + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "A problem occurred converting an envelope when producing", + ) + })?; + self.commit_log + .produce(producer_record) + .await + .map(|_| envelope) + .map_err(|_| { + io::Error::new(
+ io::ErrorKind::Other, + "A problem occurred producing an envelope", + ) + }) + } +} + #[cfg(test)] mod tests { use std::{env, fs, num::NonZeroUsize, time::Duration}; use super::*; - use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager, RecordMetadata}; + use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager}; use serde::Deserialize; use streambed::{ commit_log::Header, @@ -300,7 +319,7 @@ mod tests { fn on_event( _context: &akka_persistence_rs::entity::Context, _state: &mut Self::State, - _event: &Self::Event, + _event: Self::Event, ) { todo!() } @@ -308,12 +327,12 @@ // Developers are expected to provide a marshaler of events. // The marshaler is responsible for more than just the serialization - // of a record. Extracting/saving an entity id and determining other + // of an envelope. Extracting/saving an entity id and determining other // metadata is also important. We would also expect to see any encryption // and decryption being performed by the marshaler. // The example here overrides the default methods of the marshaler and // effectively ignores the use of a secret key; just to prove that you really - // can lay out a record any way that you would like to. Note that secret keys + // can lay out an envelope any way that you would like to. Note that secret keys // are important though. #[derive(Clone)] struct MyEventMarshaler; #[async_trait] - impl CommitLogRecordMarshaler for MyEventMarshaler { + impl CommitLogEventEnvelopeMarshaler for MyEventMarshaler { type SecretStore = NoopSecretStore; - fn to_compaction_key(_record: &Record) -> Option { + fn to_compaction_key(_envelope: &EventEnvelope) -> Option { panic!("should not be called") } @@ -389,30 +408,28 @@ panic!("should not be called") } - async fn record( + async fn envelope( &self, entity_id: EntityId, record: ConsumerRecord, - ) -> Option> { + ) -> Option> { let value = String::from_utf8(record.value).ok()?; let event = MyEvent { value }; - Some(Record { + Some(EventEnvelope { entity_id, event, - metadata: RecordMetadata { - deletion_event: false, - }, + deletion_event: false, }) } async fn producer_record( &self, topic: Topic, - record: Record, - ) -> Option<(ProducerRecord, Record)> { + envelope: EventEnvelope, + ) -> Option<(ProducerRecord, EventEnvelope)> { let headers = vec![Header { key: "entity-id".to_string(), - value: record.entity_id.as_bytes().into(), + value: envelope.entity_id.as_bytes().into(), }]; Some(( ProducerRecord { @@ -420,10 +437,10 @@ headers, timestamp: None, key: 0, - value: record.event.value.clone().into_bytes(), + value: envelope.event.value.clone().into_bytes(), partition: 0, }, - record, + envelope, )) } } @@ -454,14 +471,14 @@ // Produce a stream given no prior persistence. Should return an empty stream. { - let mut records = adapter.produce_initial().await.unwrap(); - assert!(records.next().await.is_none()); + let mut envelopes = adapter.source_initial().await.unwrap(); + assert!(envelopes.next().await.is_none()); } // Process some events and then produce a stream.
- let record = adapter - .process(Record::new( + let envelope = adapter + .process(EventEnvelope::new( entity_id.clone(), MyEvent { value: "first-event".to_string(), @@ -469,10 +486,10 @@ mod tests { )) .await .unwrap(); - assert_eq!(record.entity_id, entity_id); + assert_eq!(envelope.entity_id, entity_id); - let record = adapter - .process(Record::new( + let envelope = adapter + .process(EventEnvelope::new( entity_id.clone(), MyEvent { value: "second-event".to_string(), @@ -480,12 +497,12 @@ mod tests { )) .await .unwrap(); - assert_eq!(record.entity_id, entity_id); + assert_eq!(envelope.entity_id, entity_id); // Produce to a different entity id, so that we can test out the filtering next. adapter - .process(Record::new( + .process(EventEnvelope::new( "some-other-entity-id", MyEvent { value: "third-event".to_string(), @@ -494,7 +511,7 @@ mod tests { .await .unwrap(); - // Wait until the number of records reported as being written is the number + // Wait until the number of events reported as being written is the number // that we have produced. We should then return those events that have been // produced. @@ -510,17 +527,17 @@ mod tests { } { - let mut records = adapter.produce(&entity_id).await.unwrap(); + let mut envelopes = adapter.source(&entity_id).await.unwrap(); - let record = records.next().await.unwrap(); - assert_eq!(record.entity_id, entity_id); - assert_eq!(record.event.value, "first-event"); + let envelope = envelopes.next().await.unwrap(); + assert_eq!(envelope.entity_id, entity_id); + assert_eq!(envelope.event.value, "first-event"); - let record = records.next().await.unwrap(); - assert_eq!(record.entity_id, entity_id); - assert_eq!(record.event.value, "second-event"); + let envelope = envelopes.next().await.unwrap(); + assert_eq!(envelope.entity_id, entity_id); + assert_eq!(envelope.event.value, "second-event"); - assert!(records.next().await.is_none()); + assert!(envelopes.next().await.is_none()); } } diff --git a/akka-persistence-rs/Cargo.toml b/akka-persistence-rs/Cargo.toml index 8681d20..e32a0e3 100644 --- a/akka-persistence-rs/Cargo.toml +++ b/akka-persistence-rs/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] async-trait = { workspace = true } +chrono = { workspace = true } lru = { workspace = true } smol_str = { workspace = true } tokio = { workspace = true, features = [ diff --git a/akka-persistence-rs/benches/benches.rs b/akka-persistence-rs/benches/benches.rs index b7cb36f..c1921cf 100644 --- a/akka-persistence-rs/benches/benches.rs +++ b/akka-persistence-rs/benches/benches.rs @@ -3,8 +3,8 @@ use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc}; use akka_persistence_rs::{ effect::{emit_event, Effect, EffectExt}, entity::{Context, EventSourcedBehavior}, - entity_manager::{self, RecordAdapter}, - EntityId, Message, Record, + entity_manager::{self, EventEnvelope, Handler, SourceProvider}, + EntityId, Message, }; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; @@ -35,7 +35,7 @@ impl EventSourcedBehavior for Behavior { emit_event(Event).boxed() } - fn on_event(_context: &Context, _state: &mut Self::State, _event: &Self::Event) {} + fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {} } struct Adapter { @@ -44,27 +44,33 @@ struct Adapter { } #[async_trait] -impl RecordAdapter for Adapter { - async fn produce_initial( +impl SourceProvider for Adapter { + async fn source_initial( &mut self, - ) -> io::Result> + Send + 'async_trait>>> { + ) -> io::Result> + Send + 'async_trait>>> { 
Ok(Box::pin(tokio_stream::empty())) } - async fn produce( + async fn source( &mut self, _entity_id: &EntityId, - ) -> io::Result> + Send + 'async_trait>>> { + ) -> io::Result> + Send + 'async_trait>>> { Ok(Box::pin(tokio_stream::empty())) } +} - async fn process(&mut self, record: Record) -> io::Result> { +#[async_trait] +impl Handler for Adapter { + async fn process( + &mut self, + envelope: EventEnvelope, + ) -> io::Result> { self.event_count += 1; if self.event_count == NUM_EVENTS { self.events_processed.notify_one(); self.event_count = 0; } - Ok(record) + Ok(envelope) } } diff --git a/akka-persistence-rs/src/effect.rs b/akka-persistence-rs/src/effect.rs index cbf8da5..f9f7ba2 100644 --- a/akka-persistence-rs/src/effect.rs +++ b/akka-persistence-rs/src/effect.rs @@ -6,7 +6,11 @@ use lru::LruCache; use std::{future::Future, io, marker::PhantomData}; use tokio::sync::oneshot; -use crate::{entity::EventSourcedBehavior, entity_manager::RecordAdapter, EntityId, Record}; +use crate::{ + entity::EventSourcedBehavior, + entity_manager::{EventEnvelope, Handler}, + EntityId, +}; /// Errors that can occur when applying effects. pub enum Error { @@ -27,11 +31,11 @@ where async fn process( &mut self, behavior: &B, - adapter: &mut (dyn RecordAdapter + Send), + handler: &mut (dyn Handler + Send), entities: &mut LruCache, entity_id: &EntityId, prev_result: Result, - update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, EventEnvelope) + Send), ) -> Result; } @@ -54,18 +58,18 @@ where async fn process( &mut self, behavior: &B, - adapter: &mut (dyn RecordAdapter + Send), + handler: &mut (dyn Handler + Send), entities: &mut LruCache, entity_id: &EntityId, prev_result: Result, - update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, EventEnvelope) + Send), ) -> Result { let r = self ._l .process( behavior, - adapter, + handler, entities, entity_id, prev_result, @@ -74,7 +78,7 @@ where .await; if r.is_ok() { self._r - .process(behavior, adapter, entities, entity_id, r, update_entity) + .process(behavior, handler, entities, entity_id, r, update_entity) .await } else { r @@ -138,25 +142,23 @@ where async fn process( &mut self, _behavior: &B, - adapter: &mut (dyn RecordAdapter + Send), + handler: &mut (dyn Handler + Send), entities: &mut LruCache, entity_id: &EntityId, prev_result: Result, - update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, EventEnvelope) + Send), ) -> Result { if prev_result.is_ok() { if let Some(event) = self.event.take() { - let record = Record { + let envelope = EventEnvelope { entity_id: entity_id.clone(), event, - metadata: crate::RecordMetadata { - deletion_event: self.deletion_event, - }, + deletion_event: self.deletion_event, }; - let result = adapter.process(record).await.map_err(Error::IoError); - if let Ok(record) = result { - update_entity(entities, record); + let result = handler.process(envelope).await.map_err(Error::IoError); + if let Ok(envelope) = result { + update_entity(entities, envelope); Ok(()) } else { result.map(|_| ()) @@ -221,11 +223,11 @@ where async fn process( &mut self, _behavior: &B, - _adapter: &mut (dyn RecordAdapter + Send), + _handler: &mut (dyn Handler + Send), _entities: &mut LruCache, _entity_id: &EntityId, prev_result: Result, - _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + _update_entity: &mut (dyn for<'a> FnMut(&'a mut 
LruCache, EventEnvelope) + Send), ) -> Result { if prev_result.is_ok() { @@ -245,7 +247,7 @@ where { } -/// An effect to reply a record if the previous effect completed +/// An effect to reply an envelope if the previous effect completed /// successfully. Note that if state from having emitted an event /// via a prior effect is required, then use a [then] effect instead. pub fn reply(reply_to: oneshot::Sender, reply: T) -> Reply { @@ -273,11 +275,11 @@ where async fn process( &mut self, behavior: &B, - _adapter: &mut (dyn RecordAdapter + Send), + _handler: &mut (dyn Handler + Send), entities: &mut LruCache, entity_id: &EntityId, prev_result: Result, - _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, EventEnvelope) + Send), ) -> Result { let f = self.f.take(); @@ -331,11 +333,11 @@ where async fn process( &mut self, _behavior: &B, - _adapter: &mut (dyn RecordAdapter + Send), + _handler: &mut (dyn Handler + Send), _entities: &mut LruCache, _entity_id: &EntityId, _prev_result: Result, - _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, Record) + _update_entity: &mut (dyn for<'a> FnMut(&'a mut LruCache, EventEnvelope) + Send), ) -> Result { Ok(()) diff --git a/akka-persistence-rs/src/entity.rs b/akka-persistence-rs/src/entity.rs index 6e69720..912fb6a 100644 --- a/akka-persistence-rs/src/entity.rs +++ b/akka-persistence-rs/src/entity.rs @@ -44,7 +44,7 @@ pub trait EventSourcedBehavior { /// Given a state and event, modify state, which could indicate transition to /// the next state. No side effects are to be performed. Can be used to replay /// events to attain a new state i.e. the major function of event sourcing. - fn on_event(context: &Context, state: &mut Self::State, event: &Self::Event); + fn on_event(context: &Context, state: &mut Self::State, event: Self::Event); /// The entity will always receive a "recovery completed" signal, even if there /// are no events sourced, or if it’s a new entity with a previously unused EntityId. diff --git a/akka-persistence-rs/src/entity_manager.rs b/akka-persistence-rs/src/entity_manager.rs index c3265c3..edadabe 100644 --- a/akka-persistence-rs/src/entity_manager.rs +++ b/akka-persistence-rs/src/entity_manager.rs @@ -15,31 +15,59 @@ use tokio_stream::{Stream, StreamExt}; use crate::entity::Context; use crate::entity::EventSourcedBehavior; -use crate::{EntityId, Message, Record}; +use crate::{EntityId, Message}; + +/// An envelope wraps an event associated with a specific entity. +#[derive(Clone, Debug, PartialEq)] +pub struct EventEnvelope { + /// Flags whether the associated event is to be considered + /// as one that represents an entity instance being deleted. + pub deletion_event: bool, + pub entity_id: EntityId, + pub event: E, } -/// Adapts events, in form of records, to another type of record e.g. those -/// that can be persisted using a storage technology. +impl EventEnvelope { + pub fn new(entity_id: EI, event: E) -> Self + where + EI: Into, + { + Self { + deletion_event: false, + entity_id: entity_id.into(), + event, + } + } +} + +/// Sources events, in the form of event envelopes, e.g. those +/// that can be sourced from a storage technology. #[async_trait] -pub trait RecordAdapter { +pub trait SourceProvider { /// Produce an initial source of events, which is called upon an entity /// manager task starting up. Any error from this method is considered fatal /// and will terminate the entity manager.
- async fn produce_initial( + async fn source_initial( &mut self, - ) -> io::Result> + Send + 'async_trait>>>; + ) -> io::Result> + Send + 'async_trait>>>; /// Produce a source of events. An entity id /// is passed to the source method so that the source is /// discriminate regarding the entity events to supply. - async fn produce( + async fn source( &mut self, entity_id: &EntityId, - ) -> io::Result> + Send + 'async_trait>>>; + ) -> io::Result> + Send + 'async_trait>>>; +} - /// Consume a record, performing some processing - /// e.g. persisting a record, and then returning the same record +/// Handles events, in the form of event envelopes, e.g. those +/// that can be persisted using a storage technology. +#[async_trait] +pub trait Handler { + /// Consume an envelope, performing some processing + /// e.g. persisting an envelope, and then returning the same envelope /// if all went well. - async fn process(&mut self, record: Record) -> io::Result>; + async fn process(&mut self, envelope: EventEnvelope) -> io::Result>; } /// Manages the lifecycle of entities given a specific behavior. @@ -59,16 +87,16 @@ B: EventSourcedBehavior + Send + Sync + 'static, B::Command: Send, B::State: Send + Sync, - A: RecordAdapter + Send + 'static, + A: SourceProvider + Handler + Send + 'static, { // Source our initial events and populate our internal entities map. let mut entities = LruCache::new(capacity); - if let Ok(records) = adapter.produce_initial().await { - tokio::pin!(records); - while let Some(record) = records.next().await { - update_entity::(&mut entities, record); + if let Ok(envelopes) = adapter.source_initial().await { + tokio::pin!(envelopes); + while let Some(envelope) = envelopes.next().await { + update_entity::(&mut entities, envelope); } for (entity_id, state) in entities.iter() { let context = Context { entity_id }; @@ -87,10 +115,10 @@ let mut state = entities.get(&message.entity_id); if state.is_none() { - if let Ok(records) = adapter.produce(&message.entity_id).await { - tokio::pin!(records); - while let Some(record) = records.next().await { - update_entity::(&mut entities, record); + if let Ok(envelopes) = adapter.source(&message.entity_id).await { + tokio::pin!(envelopes); + while let Some(envelope) = envelopes.next().await { + update_entity::(&mut entities, envelope); } state = entities.get(&message.entity_id); let context = Context { @@ -122,7 +150,7 @@ &mut entities, context.entity_id, Ok(()), - &mut |entities, record| update_entity::(entities, record), + &mut |entities, envelope| update_entity::(entities, envelope), ) .await; if result.is_err() { @@ -131,28 +159,27 @@ } } -fn update_entity(entities: &mut LruCache, record: Record) +fn update_entity(entities: &mut LruCache, envelope: EventEnvelope) where B: EventSourcedBehavior + Send + Sync + 'static, B::State: Default, { - if !record.metadata.deletion_event { + if !envelope.deletion_event { // Apply an event to state, creating the entity entry if necessary. let context = Context { - entity_id: &record.entity_id, + entity_id: &envelope.entity_id, }; - let state = if let Some(state) = entities.get_mut(&record.entity_id) { + let state = if let Some(state) = entities.get_mut(&envelope.entity_id) { state } else { // We're avoiding the use of get_or_insert so that we can avoid // cloning the entity id unless necessary.
- entities.push(record.entity_id.clone(), B::State::default()); - entities.get_mut(&record.entity_id).unwrap() + entities.push(envelope.entity_id.clone(), B::State::default()); + entities.get_mut(&envelope.entity_id).unwrap() }; - // let state = entities.get_or_insert_mut(record.entity_id, B::State::default); - B::on_event(&context, state, &record.event); + B::on_event(&context, state, envelope.event); } else { - entities.pop(&record.entity_id); + entities.pop(&envelope.entity_id); } } @@ -248,11 +275,11 @@ mod tests { } } - fn on_event(_context: &Context, state: &mut Self::State, event: &Self::Event) { + fn on_event(_context: &Context, state: &mut Self::State, event: Self::Event) { match event { TempEvent::Deregistered => state.registered = false, TempEvent::Registered => state.registered = true, - TempEvent::TemperatureUpdated { temp } => state.temp = *temp, + TempEvent::TemperatureUpdated { temp } => state.temp = temp, } } @@ -267,43 +294,49 @@ mod tests { } // The following adapter is not normally created by a developer, but we - // declare one here so that we can provide a source of records and capture + // declare one here so that we can provide a source of events and capture // ones emitted by the entity manager. - struct VecRecordAdapter { - initial_records: Option>>, - captured_records: mpsc::Sender>, + struct VecEventEnvelopeAdapter { + initial_events: Option>>, + captured_events: mpsc::Sender>, } #[async_trait] - impl RecordAdapter for VecRecordAdapter { - async fn produce_initial( + impl SourceProvider for VecEventEnvelopeAdapter { + async fn source_initial( &mut self, - ) -> io::Result> + Send + 'async_trait>>> + ) -> io::Result> + Send + 'async_trait>>> { - if let Some(records) = self.initial_records.take() { - Ok(Box::pin(tokio_stream::iter(records))) + if let Some(events) = self.initial_events.take() { + Ok(Box::pin(tokio_stream::iter(events))) } else { Ok(Box::pin(tokio_stream::empty())) } } - async fn produce( + async fn source( &mut self, _entity_id: &EntityId, - ) -> io::Result> + Send + 'async_trait>>> + ) -> io::Result> + Send + 'async_trait>>> { Ok(Box::pin(tokio_stream::empty())) } + } - async fn process(&mut self, record: Record) -> io::Result> { - self.captured_records - .send(record.clone()) + #[async_trait] + impl Handler for VecEventEnvelopeAdapter { + async fn process( + &mut self, + envelope: EventEnvelope, + ) -> io::Result> { + self.captured_events + .send(envelope.clone()) .await - .map(|_| record) + .map(|_| envelope) .map_err(|_| { io::Error::new( io::ErrorKind::Other, - "A problem occurred processing a record", + "A problem occurred processing an envelope", ) }) } @@ -327,19 +360,19 @@ mod tests { }; let (temp_sensor_events, mut temp_sensor_events_captured) = mpsc::channel(4); - let temp_sensor_record_adapter = VecRecordAdapter { - initial_records: Some(vec![ - Record::new("id-1", TempEvent::Registered), - Record::new("id-1", TempEvent::TemperatureUpdated { temp: 10 }), + let temp_sensor_event_adapter = VecEventEnvelopeAdapter { + initial_events: Some(vec![ + EventEnvelope::new("id-1", TempEvent::Registered), + EventEnvelope::new("id-1", TempEvent::TemperatureUpdated { temp: 10 }), ]), - captured_records: temp_sensor_events, + captured_events: temp_sensor_events, }; let (temp_sensor, temp_sensor_receiver) = mpsc::channel(10); let entity_manager_task = tokio::spawn(run( temp_sensor_behavior, - temp_sensor_record_adapter, + temp_sensor_event_adapter, temp_sensor_receiver, NonZeroUsize::new(1).unwrap(), )); @@ -425,25 +458,23 @@ mod tests { assert_eq!( 
temp_sensor_events_captured.recv().await.unwrap(), - Record::new("id-1", TempEvent::TemperatureUpdated { temp: 32 }) + EventEnvelope::new("id-1", TempEvent::TemperatureUpdated { temp: 32 }) ); assert_eq!( temp_sensor_events_captured.recv().await.unwrap(), - Record::new("id-1", TempEvent::TemperatureUpdated { temp: 64 }) + EventEnvelope::new("id-1", TempEvent::TemperatureUpdated { temp: 64 }) ); assert_eq!( temp_sensor_events_captured.recv().await.unwrap(), - Record { + EventEnvelope { entity_id: EntityId::from("id-1"), event: TempEvent::Deregistered, - metadata: crate::RecordMetadata { - deletion_event: true - } + deletion_event: true } ); assert_eq!( temp_sensor_events_captured.recv().await.unwrap(), - Record::new("id-2", TempEvent::Registered) + EventEnvelope::new("id-2", TempEvent::Registered) ); assert!(temp_sensor_events_captured.recv().await.is_none()); } diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index fdd2a5f..6d5db67 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -4,9 +4,18 @@ pub mod effect; pub mod entity; pub mod entity_manager; +/// Uniquely identifies the type of an Entity. +pub type EntityType = smol_str::SmolStr; + /// Uniquely identifies an entity, or entity instance. pub type EntityId = smol_str::SmolStr; +/// A namespaced entity id given an entity type. +pub struct PersistenceId { + pub entity_type: EntityType, + pub entity_id: EntityId, +} + /// A message encapsulates a command that is addressed to a specific entity. #[derive(Debug, PartialEq)] pub struct Message { @@ -25,34 +34,3 @@ impl Message { } } } - -/// Additional information associated with a record. -#[derive(Clone, Debug, PartialEq)] -pub struct RecordMetadata { - /// Flags whether the associated event is to be considered - /// as one that represents an entity instance being deleted. - pub deletion_event: bool, -} - -/// A record is an event associated with a specific entity. -#[derive(Clone, Debug, PartialEq)] -pub struct Record { - pub entity_id: EntityId, - pub event: E, - pub metadata: RecordMetadata, -} - -impl Record { - pub fn new(entity_id: EI, event: E) -> Self - where - EI: Into, - { - Self { - entity_id: entity_id.into(), - event, - metadata: RecordMetadata { - deletion_event: false, - }, - } - } -} diff --git a/akka-projection-rs-commitlog/Cargo.toml b/akka-projection-rs-commitlog/Cargo.toml new file mode 100644 index 0000000..554a4ad --- /dev/null +++ b/akka-projection-rs-commitlog/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "akka-projection-rs-commitlog" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { workspace = true } +streambed = { workspace = true } +tokio-stream = { workspace = true } + +akka-persistence-rs = { path = "../akka-persistence-rs" } +akka-persistence-rs-commitlog = { path = "../akka-persistence-rs-commitlog" } +akka-projection-rs = { path = "../akka-projection-rs" } diff --git a/akka-projection-rs-commitlog/README.md b/akka-projection-rs-commitlog/README.md new file mode 100644 index 0000000..811b777 --- /dev/null +++ b/akka-projection-rs-commitlog/README.md @@ -0,0 +1,4 @@ +akka-projection-rs-commitlog +=== + +Uses [streambed](https://github.com/streambed/streambed-rs)'s `CommitLog` to persist projection offsets. 
\ No newline at end of file diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs new file mode 100644 index 0000000..2d54449 --- /dev/null +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -0,0 +1,51 @@ +#![doc = include_str!("../README.md")] + +use std::{marker::PhantomData, pin::Pin}; + +use akka_persistence_rs::{entity_manager::EventEnvelope, EntityType}; +use akka_persistence_rs_commitlog::CommitLogEventEnvelopeMarshaler; +use akka_projection_rs::{Offset, SourceProvider}; +use serde::{de::DeserializeOwned, Serialize}; +use streambed::commit_log::{CommitLog, Topic, TopicRef}; +use tokio_stream::Stream; + +/// Source events for a projection from a Streambed commit log. +pub struct CommitLogSourceProvider { + _commit_log: CL, + _consumer_group_name: String, + _marshaler: M, + _topic: Topic, + phantom: PhantomData, +} + +impl CommitLogSourceProvider { + pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self { + Self { + _commit_log: commit_log, + _consumer_group_name: consumer_group_name.into(), + _marshaler: marshaler, + _topic: topic.into(), + phantom: PhantomData, + } + } +} + +impl SourceProvider for CommitLogSourceProvider +where + CL: CommitLog, + M: CommitLogEventEnvelopeMarshaler, + for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, +{ + type Envelope = EventEnvelope; + + type Offset = Offset; + + fn events_by_slices( + _entity_type: EntityType, + _min_slice: u32, + _max_slice: u32, + _offset: Offset, + ) -> Pin> + Send>> { + todo!() + } +} diff --git a/akka-projection-rs-grpc/Cargo.toml b/akka-projection-rs-grpc/Cargo.toml new file mode 100644 index 0000000..02ee8e2 --- /dev/null +++ b/akka-projection-rs-grpc/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "akka-projection-rs-grpc" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio-stream = { workspace = true } + +akka-persistence-rs = { path = "../akka-persistence-rs" } +akka-projection-rs = { path = "../akka-projection-rs" } diff --git a/akka-projection-rs-grpc/README.md b/akka-projection-rs-grpc/README.md new file mode 100644 index 0000000..811b777 --- /dev/null +++ b/akka-projection-rs-grpc/README.md @@ -0,0 +1,4 @@ +akka-projection-rs-grpc +=== + +Provides gRPC connectivity for projections, e.g. a `GrpcSourceProvider` for consuming event envelopes from a remote producer. \ No newline at end of file diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs new file mode 100644 index 0000000..7396d3b --- /dev/null +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -0,0 +1,26 @@ +use std::{marker::PhantomData, pin::Pin}; + +use akka_persistence_rs::{entity_manager::EventEnvelope, EntityType}; +use akka_projection_rs::{Offset, SourceProvider}; +use tokio_stream::Stream; + +pub struct GrpcSourceProvider { + phantom: PhantomData, +} + +impl SourceProvider for GrpcSourceProvider { + /// The envelope processed by the provider.
+ type Envelope = EventEnvelope; + + /// The type that describes offsets into a journal + type Offset = Offset; + + fn events_by_slices( + _entity_type: EntityType, + _min_slice: u32, + _max_slice: u32, + _offset: Offset, + ) -> Pin> + Send>> { + todo!() + } +} diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs new file mode 100644 index 0000000..909a150 --- /dev/null +++ b/akka-projection-rs-grpc/src/lib.rs @@ -0,0 +1,3 @@ +#![doc = include_str!("../README.md")] + +pub mod consumer; diff --git a/akka-projection-rs-storage/Cargo.toml b/akka-projection-rs-storage/Cargo.toml new file mode 100644 index 0000000..6ca5cfc --- /dev/null +++ b/akka-projection-rs-storage/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "akka-projection-rs-storage" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +tokio = { workspace = true } + +akka-projection-rs = { path = "../akka-projection-rs" } diff --git a/akka-projection-rs-storage/README.md b/akka-projection-rs-storage/README.md new file mode 100644 index 0000000..c9f6fa1 --- /dev/null +++ b/akka-projection-rs-storage/README.md @@ -0,0 +1,4 @@ +akka-projection-rs-storage +=== + +Uses [streambed-storage](https://github.com/streambed/streambed-rs/streambed-storage) to persist projection offsets. \ No newline at end of file diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs new file mode 100644 index 0000000..9773450 --- /dev/null +++ b/akka-projection-rs-storage/src/lib.rs @@ -0,0 +1,17 @@ +#![doc = include_str!("../README.md")] + +use akka_projection_rs::{Handler, SourceProvider}; +use tokio::sync::mpsc::Receiver; + +/// The commands that a projection task is receptive to. +pub enum Command { + Stop, +} + +/// Provides local file system based storage for projection offsets. +pub async fn run(_receiver: Receiver, _source_provider: SP, _handler: H) +where + H: Handler, + SP: SourceProvider, +{ +} diff --git a/akka-projection-rs/Cargo.toml b/akka-projection-rs/Cargo.toml new file mode 100644 index 0000000..d25b922 --- /dev/null +++ b/akka-projection-rs/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "akka-projection-rs" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } + +akka-persistence-rs = { path = "../akka-persistence-rs" } diff --git a/akka-projection-rs/src/lib.rs b/akka-projection-rs/src/lib.rs new file mode 100644 index 0000000..6f6c348 --- /dev/null +++ b/akka-projection-rs/src/lib.rs @@ -0,0 +1,78 @@ +//! In Akka Projections you process a stream of event envelopes from a source to a projected model or external system. +//! Each envelope is associated with an offset representing the position in the stream. This offset is used for resuming +//! the stream from that position when the projection is restarted. + +use std::pin::Pin; + +use akka_persistence_rs::EntityType; +use async_trait::async_trait; +use tokio_stream::Stream; + +pub enum Offset { + /// Corresponds to an ordered sequence number for the events. Note that the corresponding + /// offset of each event is provided in an Envelope, + /// which makes it possible to resume the stream at a later point from a given offset. + /// + /// The `offset` is exclusive, i.e. 
the event with the exact same sequence number will not be included + /// in the returned stream. This means that you can use the offset that is returned in an `Envelope` + /// as the `offset` parameter in a subsequent query. + /// + Sequence(u64), +} + +/// Errors for event processing by a handler. +pub struct HandlerError; + +/// Handle event envelopes in any way that an application requires. +#[async_trait] +pub trait Handler { + /// The envelope processed by the handler. + type Envelope; + + /// Process an envelope. + /// An offset will be persisted given a successful result. + async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError>; +} + +/// Errors for event sourcing by a source provider. +pub struct SourceProviderError; + +/// Provides a source of envelopes. +#[async_trait] +pub trait SourceProvider { + /// The envelope processed by the provider. + type Envelope; + + /// The type that describes offsets into a journal. + type Offset; + + /// Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to + /// evenly distribute all persistence ids over the slices. + /// + /// The consumer can keep track of its current position in the event stream by storing the `offset` and restart the + /// query from a given `offset` after a crash/restart. + /// + /// The exact meaning of the `offset` depends on the journal and must be documented by it. It may + /// be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed + /// data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a + /// timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since + /// clocks on different machines may not be synchronized. + /// + /// In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the + /// next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned + /// an event with the given `offset`, and this invocation, no events are missed. Depending on the journal + /// implementation, this may mean that this invocation will return events that were already returned by the previous + /// invocation, including the event with the passed in `offset`. + /// + /// The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for + /// a distributed data store. The order must be documented by the journal implementation. + /// + /// The stream is not completed when it reaches the end of the currently stored events, but it continues to push new + /// events when new events are persisted.
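To make the exclusive-offset contract above concrete, here is a self-contained sketch using an in-memory journal; `VecSourceProvider` and its tuple-based envelope are invented for illustration and are not part of this crate's API:

```rust
use std::pin::Pin;

use tokio_stream::{Stream, StreamExt};

pub enum Offset {
    Sequence(u64),
}

struct VecSourceProvider;

impl VecSourceProvider {
    // Events are (sequence number, payload) pairs. The returned stream starts
    // strictly after the given offset, matching the exclusive semantics
    // documented above for sequence-based offsets.
    fn events_by_slices(
        journal: Vec<(u64, String)>,
        offset: Offset,
    ) -> Pin<Box<dyn Stream<Item = (u64, String)> + Send>> {
        let Offset::Sequence(after) = offset;
        Box::pin(tokio_stream::iter(
            journal.into_iter().filter(move |(seq, _)| *seq > after),
        ))
    }
}

#[tokio::main]
async fn main() {
    let journal = vec![(1, "a".to_string()), (2, "b".to_string())];
    let mut events = VecSourceProvider::events_by_slices(journal, Offset::Sequence(1));
    // Offset 1 is exclusive, so the stream resumes at sequence number 2.
    assert_eq!(events.next().await, Some((2, "b".to_string())));
    assert_eq!(events.next().await, None);
}
```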
+ fn events_by_slices( + entity_type: EntityType, + min_slice: u32, + max_slice: u32, + offset: Self::Offset, + ) -> Pin + Send>>; +} diff --git a/examples/iot-service/Cargo.toml b/examples/iot-service/Cargo.toml index 2f5c040..0208a0c 100644 --- a/examples/iot-service/Cargo.toml +++ b/examples/iot-service/Cargo.toml @@ -20,6 +20,7 @@ postcard = { workspace = true, default-features = false, features = [ rand = { workspace = true } scopeguard = { workspace = true } serde = { workspace = true, features = ["derive"] } +smol_str = { workspace = true, features = ["serde"] } streambed = { workspace = true } streambed-confidant = { workspace = true } streambed-logged = { workspace = true } @@ -32,6 +33,9 @@ akka-persistence-rs = { path = "../../akka-persistence-rs" } akka-persistence-rs-commitlog = { path = "../../akka-persistence-rs-commitlog", features = [ "cbor", ] } +akka-projection-rs = { path = "../../akka-projection-rs" } +akka-projection-rs-commitlog = { path = "../../akka-projection-rs-commitlog" } +akka-projection-rs-storage = { path = "../../akka-projection-rs-storage" } [dev-dependencies] env_logger = { workspace = true } diff --git a/examples/iot-service/README.md b/examples/iot-service/README.md index af35d77..c7126d8 100644 --- a/examples/iot-service/README.md +++ b/examples/iot-service/README.md @@ -27,6 +27,15 @@ RUST_LOG=debug cargo run -- \ --ss-root-path=/tmp/iot-service/var/lib/confidant ``` +We must first register the device ids that we wish to receive data for. This is a form +of authentication where, in the real world, a shared key between the device and service +would be conveyed. That key would then be used to encrypt data. We simply use the key +as a registration mechanism and do not accept data for devices where we have no key. + +``` +curl -v -d '"1"' -H"Content-Type: application/json" "127.0.0.1:8080/api/temperature" +``` + You should now be able to query for the current state of a temperature sensor: ``` diff --git a/examples/iot-service/src/http_server.rs b/examples/iot-service/src/http_server.rs index 0e7f040..7e330ce 100644 --- a/examples/iot-service/src/http_server.rs +++ b/examples/iot-service/src/http_server.rs @@ -1,15 +1,20 @@ //! Handle http serving concerns //! -use crate::temperature; +use crate::{ + registration::{self, SecretDataValue}, + temperature, +}; use akka_persistence_rs::Message; +use rand::RngCore; use tokio::sync::{mpsc, oneshot}; use warp::{hyper::StatusCode, Filter, Rejection, Reply}; /// Declares routes to serve our HTTP interface. pub fn routes( + registration_command: mpsc::Sender>, temperature_command: mpsc::Sender>, ) -> impl Filter + Clone { - let get_database_route = { + let get_temperature_route = { warp::get() .and(warp::path::param()) .and(warp::path::end()) @@ -47,5 +52,34 @@ }) }; - warp::path("api").and(warp::path("temperature").and(get_database_route)) + let post_registration_route = { + warp::post() + .and(warp::path::end()) + .and(warp::body::json()) + .then(move |id: String| { + let task_registration_command = registration_command.clone(); + async move { + // Generate a random key - a real world app might provide this instead.
+ let mut key = vec![0; 16]; + rand::thread_rng().fill_bytes(&mut key); + + let Ok(_) = task_registration_command + .send(Message::new( + id, + registration::Command::Register { secret: SecretDataValue::from(hex::encode(key)) }, + )) + .await else { + return warp::reply::with_status( + warp::reply::json(&"Service unavailable"), + StatusCode::SERVICE_UNAVAILABLE, + ) + }; + + warp::reply::with_status(warp::reply::json(&"Secret submitted"), StatusCode::OK) + } + }) + }; + + warp::path("api") + .and(warp::path("temperature").and(get_temperature_route.or(post_registration_route))) } diff --git a/examples/iot-service/src/main.rs b/examples/iot-service/src/main.rs index 5fd6ab0..dc91168 100644 --- a/examples/iot-service/src/main.rs +++ b/examples/iot-service/src/main.rs @@ -10,6 +10,8 @@ use streambed_logged::{args::CommitLogArgs, FileLog}; use tokio::{net::UdpSocket, sync::mpsc}; mod http_server; +mod registration; +mod registration_projection; mod temperature; mod udp_server; @@ -37,6 +39,8 @@ struct Args { udp_addr: SocketAddr, } +const MAX_REGISTRATION_MANAGER_COMMANDS: usize = 10; +const MAX_REGISTRATION_PROJECTION_MANAGER_COMMANDS: usize = 10; const MAX_TEMPERATURE_MANAGER_COMMANDS: usize = 10; #[tokio::main] async fn main() -> Result<(), Box> { @@ -67,12 +71,7 @@ async fn main() -> Result<(), Box> { ss }; - // To keep things straightforward for this example, we will use - // a random key to encrypt our events where we've not already - // written a key. In a real-world scenario, our temperature entities - // would be provisioned using some out-of-band mechanism. Upon - // provisioning, the secret store would be updated with a secret - // at a path that is specific for the entity. + // The path of our key we use to encrypt data at rest. let temperature_events_key_secret_path = format!("{}/secrets.temperature-events.key", args.ss_args.ss_ns); @@ -93,18 +92,44 @@ let (temperature_command, temperature_command_receiver) = mpsc::channel(MAX_TEMPERATURE_MANAGER_COMMANDS); + // Establish channels for the registrations + let (registration_command, registration_command_receiver) = + mpsc::channel(MAX_REGISTRATION_MANAGER_COMMANDS); + + // Establish channels for the registration projection + let (_registration_projection_command, registration_projection_command_receiver) = + mpsc::channel(MAX_REGISTRATION_PROJECTION_MANAGER_COMMANDS); + // Start up the http service - let routes = http_server::routes(temperature_command.clone()); + let routes = http_server::routes(registration_command, temperature_command.clone()); tokio::spawn(warp::serve(routes).run(args.http_addr)); info!("HTTP listening on {}", args.http_addr); // Start up the UDP service let socket = UdpSocket::bind(args.udp_addr).await?; - tokio::spawn(udp_server::task(socket, temperature_command)); + tokio::spawn(udp_server::task(socket, temperature_command.clone())); info!("UDP listening on {}", args.udp_addr); + // Start up a task to manage registrations; note that we deliberately do not + // await the spawned task's join handle here, otherwise we would block before + // the temperature entity manager below ever starts + tokio::spawn(registration::task( + cl.clone(), + ss.clone(), + temperature_events_key_secret_path.clone(), + registration_command_receiver, + )); + + // Start up a task to manage registration projections + tokio::spawn(registration_projection::task( + cl.clone(), + ss.clone(), + temperature_events_key_secret_path.clone(), + registration_projection_command_receiver, + temperature_command, + )); + // All things started up but our temperature. We're running - that in our main task. + that in our main task.
Therefore, we will return once the + entity manager has finished. info!("IoT service ready"); @@ -114,6 +139,11 @@ async fn main() -> Result<(), Box> { temperature_events_key_secret_path, temperature_command_receiver, )) - .await - .map_err(|e| e.into()) + .await?; + + // If we get here then we are shutting down. Any other task, + // such as the projection one, will stop automatically given + // that its sender will be dropped. + + Ok(()) } diff --git a/examples/iot-service/src/registration.rs b/examples/iot-service/src/registration.rs new file mode 100644 index 0000000..dda8ec7 --- /dev/null +++ b/examples/iot-service/src/registration.rs @@ -0,0 +1,154 @@ +//! Handle a local registration entity for testing without a remote gRPC endpoint. +//! Illustrates the use of a [akka_projection_rs::eventsource::LocalSourceProvider] +//! +use std::{num::NonZeroUsize, sync::Arc}; + +use akka_persistence_rs::{ + effect::{emit_event, EffectExt}, + entity::EventSourcedBehavior, + entity_manager::{self, EventEnvelope}, + EntityId, Message, +}; + +use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, CommitLogTopicAdapter}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use smol_str::SmolStr; +use streambed::commit_log::Key; +use streambed_confidant::FileSecretStore; +use streambed_logged::{compaction::KeyBasedRetention, FileLog}; +use tokio::sync::mpsc; + +// Declare the entity for the purposes of retaining registration keys for devices. + +pub type SecretDataValue = SmolStr; + +#[derive(Default)] +pub struct State { + secret: SecretDataValue, +} + +pub enum Command { + Register { secret: SecretDataValue }, +} + +#[derive(Clone, Deserialize, Serialize)] +pub enum Event { + Registered { secret: SecretDataValue }, +} + +struct Behavior; + +impl EventSourcedBehavior for Behavior { + type State = State; + + type Command = Command; + + type Event = Event; + + fn for_command( + _context: &akka_persistence_rs::entity::Context, + _state: &Self::State, + command: Self::Command, + ) -> Box> { + let Command::Register { secret } = command; + emit_event(Event::Registered { secret }).boxed() + } + + fn on_event( + _context: &akka_persistence_rs::entity::Context, + state: &mut Self::State, + event: Self::Event, + ) { + let Event::Registered { secret } = event; + + state.secret = secret; + } +} + +// Declare how we marshal our data with FileLog. This is essentially +// down to encoding our event type and entity id to a key used for +// the purposes of log compaction. The marshaler will use CBOR for +// serialization of data persisted with the log. We do this as it has the +// benefits of JSON in terms of schema evolution, but is faster to serialize +// and represents itself as smaller on disk. +// Marshalers can be completely overridden to serialize events and encode +// keys in any way required. + +pub struct EventEnvelopeMarshaler { + pub events_key_secret_path: Arc, + pub secret_store: FileSecretStore, +} + +// Our event keys will occupy the top 12 bits of the key, meaning +// that we can have 4K types of event. We use 32 of the bottom 52 +// bits as the entity id. We choose 32 bits as this is a common size +// for identifiers transmitted from IoT sensors. These identifiers +// are also known as "device addresses" and represent an address +// which may, in turn, equate to a 64 bit address globally unique +// to a device. These globally unique addresses are not generally +// transmitted in order to conserve packet size.
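The reason this key scheme matters is compaction: key-based retention keeps only the most recent record per key, so one key per (event type, entity id) pair retains exactly the latest registration for each device. The following is a minimal simulation of that behaviour using a map, for illustration only; streambed-logged's `KeyBasedRetention`, used further below, does the real work:

```rust
use std::collections::HashMap;

fn main() {
    const EVENT_TYPE_BIT_SHIFT: usize = 52;
    // Event type 0 in the high bits, the device's entity id in the low bits.
    let key = |entity_id: u32| (0u64 << EVENT_TYPE_BIT_SHIFT) | entity_id as u64;

    // Key-based retention modelled as "the last record per key wins".
    let mut compacted: HashMap<u64, &str> = HashMap::new();
    for (entity_id, secret) in [(1, "old-secret"), (2, "other-secret"), (1, "new-secret")] {
        compacted.insert(key(entity_id), secret);
    }

    assert_eq!(compacted[&key(1)], "new-secret"); // only the latest registration survives
    assert_eq!(compacted.len(), 2); // one retained record per device
}
```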
+const EVENT_TYPE_BIT_SHIFT: usize = 52; +const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; + +#[async_trait] +impl CommitLogEventEnvelopeMarshaler for EventEnvelopeMarshaler { + type SecretStore = FileSecretStore; + + fn to_compaction_key(envelope: &EventEnvelope) -> Option { + let entity_id = envelope.entity_id.parse::().ok()?; + let Event::Registered { .. } = envelope.event; + Some(0 << EVENT_TYPE_BIT_SHIFT | entity_id as u64) + } + + fn to_entity_id(record: &streambed::commit_log::ConsumerRecord) -> Option { + let entity_id = (record.key & EVENT_ID_BIT_MASK) as u32; + let mut buffer = itoa::Buffer::new(); + Some(EntityId::from(buffer.format(entity_id))) + } + + fn secret_store(&self) -> &Self::SecretStore { + &self.secret_store + } + + fn secret_path(&self, _entity_id: &EntityId) -> Arc { + self.events_key_secret_path.clone() + } +} + +pub const EVENTS_TOPIC: &str = "registrations"; +// Size the following to the typical number of entities we expect to have in the system. +const MAX_TOPIC_COMPACTION_KEYS: usize = 1; + +/// Manage the registration. +pub async fn task( + mut commit_log: FileLog, + secret_store: FileSecretStore, + events_key_secret_path: String, + command_receiver: mpsc::Receiver>, +) { + commit_log + .register_compaction( + EVENTS_TOPIC.to_string(), + KeyBasedRetention::new(MAX_TOPIC_COMPACTION_KEYS), + ) + .await + .unwrap(); + + let file_log_topic_adapter = CommitLogTopicAdapter::new( + commit_log, + EventEnvelopeMarshaler { + events_key_secret_path: Arc::from(events_key_secret_path), + secret_store, + }, + "iot-service", + EVENTS_TOPIC, + ); + + entity_manager::run( + Behavior, + file_log_topic_adapter, + command_receiver, + NonZeroUsize::new(10).unwrap(), + ) + .await +} diff --git a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs new file mode 100644 index 0000000..c59c5f3 --- /dev/null +++ b/examples/iot-service/src/registration_projection.rs @@ -0,0 +1,65 @@ +//! Handle registration projection concerns +//! + +use std::sync::Arc; + +use akka_persistence_rs::{entity_manager::EventEnvelope, Message}; +use akka_projection_rs::{Handler, HandlerError}; +use akka_projection_rs_commitlog::CommitLogSourceProvider; +use akka_projection_rs_storage::Command; +use async_trait::async_trait; +use streambed_confidant::FileSecretStore; +use streambed_logged::FileLog; +use tokio::sync::mpsc; + +use crate::{ + registration::{self, EventEnvelopeMarshaler}, + temperature, +}; + +/// A handler for forwarding on registration envelopes from a projection source to +/// our temperature sensor entity. +pub struct RegistrationHandler { + temperature_sender: mpsc::Sender>, +} + +#[async_trait] +impl Handler for RegistrationHandler { + type Envelope = EventEnvelope; + + async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> { + let registration::Event::Registered { secret } = envelope.event; + self.temperature_sender + .send(Message::new( + envelope.entity_id, + temperature::Command::Register { secret }, + )) + .await + .map(|_| ()) + .map_err(|_| HandlerError) + } +} + +/// Apply sensor registrations to the temperature sensor entity. +pub async fn task( + commit_log: FileLog, + secret_store: FileSecretStore, + events_key_secret_path: String, + receiver: mpsc::Receiver, + temperature_sender: mpsc::Sender>, +) { + // Establish our source of events as a commit log. 
+ let source_provider = CommitLogSourceProvider::new( + commit_log, + EventEnvelopeMarshaler { + events_key_secret_path: Arc::from(events_key_secret_path), + secret_store, + }, + "iot-service-projection", + registration::EVENTS_TOPIC, + ); + + let handler = RegistrationHandler { temperature_sender }; + + akka_projection_rs_storage::run(receiver, source_provider, handler).await +} diff --git a/examples/iot-service/src/temperature.rs b/examples/iot-service/src/temperature.rs index a93a871..6575104 100644 --- a/examples/iot-service/src/temperature.rs +++ b/examples/iot-service/src/temperature.rs @@ -1,42 +1,41 @@ +//! Handle temperature sensor entity concerns +//! use std::{collections::VecDeque, num::NonZeroUsize, sync::Arc}; use akka_persistence_rs::{ effect::{emit_event, reply, unhandled, EffectExt}, entity::EventSourcedBehavior, - entity_manager, EntityId, Message, Record, + entity_manager::{self, EventEnvelope}, + EntityId, Message, }; -use akka_persistence_rs_commitlog::{CommitLogRecordMarshaler, CommitLogTopicAdapter}; +use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, CommitLogTopicAdapter}; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use smol_str::SmolStr; use streambed::commit_log::Key; use streambed_confidant::FileSecretStore; use streambed_logged::{compaction::NthKeyBasedRetention, FileLog}; use tokio::sync::{mpsc, oneshot}; -const EVENTS_TOPIC: &str = "temperature"; -const MAX_HISTORY_EVENTS: usize = 10; -// Size the following to the typical number of devices we expect to have in the system. -// Note though that it will impact memory, so there is a trade-off. Let's suppose this -// was some LoRaWAN system and that our gateway cannot handle more than 1,000 devices -// being connected. We can work out that 1,000 is therefore a reasonable limit. We can -// have less or more. The overhead is small, but should be calculated and measured for -// a production app. 
-const MAX_TOPIC_COMPACTION_KEYS: usize = 1_000; - // Declare the entity #[derive(Default)] pub struct State { history: VecDeque, + secret: SecretDataValue, } +pub type SecretDataValue = SmolStr; + pub enum Command { Get { reply_to: oneshot::Sender> }, Post { temperature: u32 }, + Register { secret: SecretDataValue }, } #[derive(Clone, Deserialize, Serialize)] pub enum Event { + Registered { secret: SecretDataValue }, TemperatureRead { temperature: u32 }, } @@ -59,10 +58,12 @@ impl EventSourcedBehavior for Behavior { reply(reply_to, state.history.clone().into()).boxed() } - Command::Post { temperature } => { + Command::Post { temperature } if !state.secret.is_empty() => { emit_event(Event::TemperatureRead { temperature }).boxed() } + Command::Register { secret } => emit_event(Event::Registered { secret }).boxed(), + _ => unhandled(), } } @@ -70,14 +71,19 @@ impl EventSourcedBehavior for Behavior { fn on_event( _context: &akka_persistence_rs::entity::Context, state: &mut Self::State, - event: &Self::Event, + event: Self::Event, ) { - let Event::TemperatureRead { temperature } = event; - - if state.history.len() == MAX_HISTORY_EVENTS { - state.history.pop_front(); + match event { + Event::Registered { secret } => { + state.secret = secret; + } + Event::TemperatureRead { temperature } => { + if state.history.len() == MAX_HISTORY_EVENTS { + state.history.pop_front(); + } + state.history.push_back(temperature); + } } - state.history.push_back(*temperature); } } @@ -90,13 +96,13 @@ impl EventSourcedBehavior for Behavior { // Marshalers can be completely overidden to serialize events and encode // keys in any way required. -struct RecordMarshaler { +struct EventEnvelopeMarshaler { events_key_secret_path: Arc, secret_store: FileSecretStore, } // Our event keys will occupy the top 12 bits of the key, meaning -// that we can have 4K types of record. We use 32 of the bottom 52 +// that we can have 4K types of event. We use 32 of the bottom 52 // bits as the entity id. We choose 32 bits as this is a common size // for identifiers transmitted from IoT sensors. These identifiers // are also known as "device addresses" and represent an address @@ -107,13 +113,18 @@ const EVENT_TYPE_BIT_SHIFT: usize = 52; const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; #[async_trait] -impl CommitLogRecordMarshaler for RecordMarshaler { +impl CommitLogEventEnvelopeMarshaler for EventEnvelopeMarshaler { type SecretStore = FileSecretStore; - fn to_compaction_key(record: &Record) -> Option { - let entity_id = record.entity_id.parse::().ok()?; - let Event::TemperatureRead { .. } = record.event; - Some(0 << EVENT_TYPE_BIT_SHIFT | entity_id as u64) + fn to_compaction_key(envelope: &EventEnvelope) -> Option { + let record_type = match &envelope.event { + Event::TemperatureRead { .. } => Some(0), + Event::Registered { .. } => Some(1), + }; + record_type.and_then(|record_type| { + let entity_id = envelope.entity_id.parse::().ok()?; + Some(record_type << EVENT_TYPE_BIT_SHIFT | entity_id as u64) + }) } fn to_entity_id(record: &streambed::commit_log::ConsumerRecord) -> Option { @@ -131,6 +142,16 @@ impl CommitLogRecordMarshaler for RecordMarshaler { } } +const EVENTS_TOPIC: &str = "temperature"; +const MAX_HISTORY_EVENTS: usize = 10; +// Size the following to the typical number of devices we expect to have in the system. +// Note though that it will impact memory, so there is a trade-off. Let's suppose this +// was some LoRaWAN system and that our gateway cannot handle more than 1,000 devices +// being connected. 
We can work out that 1,000 is therefore a reasonable limit. We can +// have less or more. The overhead is small, but should be calculated and measured for +// a production app. +const MAX_TOPIC_COMPACTION_KEYS: usize = 1_000; + /// Manage the temperature sensor. pub async fn task( mut commit_log: FileLog, @@ -153,7 +174,7 @@ pub async fn task( let file_log_topic_adapter = CommitLogTopicAdapter::new( commit_log, - RecordMarshaler { + EventEnvelopeMarshaler { events_key_secret_path: Arc::from(events_key_secret_path), secret_store, }, diff --git a/examples/iot-service/src/udp_server.rs b/examples/iot-service/src/udp_server.rs index 91c8d25..6f86e39 100644 --- a/examples/iot-service/src/udp_server.rs +++ b/examples/iot-service/src/udp_server.rs @@ -1,3 +1,5 @@ +//! Handle UDP sensor serving concerns +//! use akka_persistence_rs::Message; use log::debug; use serde::{Deserialize, Serialize};