diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index f6c5e25..d05d1ce 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -1,7 +1,7 @@ #![doc = include_str!("../README.md")] use akka_persistence_rs::{ - entity_manager::{EventEnvelope, Handler, SourceProvider}, + entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider}, EntityId, }; use async_stream::stream; @@ -9,11 +9,34 @@ use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; use std::{io, marker::PhantomData, pin::Pin, sync::Arc}; use streambed::{ - commit_log::{CommitLog, ConsumerRecord, Key, ProducerRecord, Subscription, Topic, TopicRef}, + commit_log::{ + CommitLog, ConsumerRecord, Key, Offset, ProducerRecord, Subscription, Topic, TopicRef, + }, secret_store::SecretStore, }; use tokio_stream::{Stream, StreamExt}; +/// An envelope wraps a commit log event associated with a specific entity. +#[derive(Clone, Debug, PartialEq)] +pub struct EventEnvelope { + pub entity_id: EntityId, + pub event: E, + pub offset: Offset, +} + +impl EventEnvelope { + pub fn new(entity_id: EI, event: E, offset: Offset) -> Self + where + EI: Into, + { + Self { + entity_id: entity_id.into(), + event, + offset, + } + } +} + /// Provides the ability to transform the the memory representation of Akka Persistence events from /// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization. /// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected. @@ -27,7 +50,7 @@ where /// Provide a key we can use for the purposes of log compaction. /// A key would generally comprise and event type value held in /// the high bits, and the entity id in the lower bits. 
- fn to_compaction_key(envelope: &EventEnvelope) -> Option; + fn to_compaction_key(entity_id: &EntityId, event: &E) -> Option; /// Extract an entity id from a consumer envelope. fn to_entity_id(record: &ConsumerRecord) -> Option; @@ -52,7 +75,7 @@ where |value| ciborium::de::from_reader(value), ) .await - .map(|event| EventEnvelope::new(entity_id, event)) + .map(|event| EventEnvelope::new(entity_id, event, record.offset)) } #[cfg(not(feature = "cbor"))] @@ -66,39 +89,38 @@ where async fn producer_record( &self, topic: Topic, - envelope: EventEnvelope, - ) -> Option<(ProducerRecord, EventEnvelope)> { - let key = Self::to_compaction_key(&envelope)?; + entity_id: EntityId, + event: E, + ) -> Option { + let key = Self::to_compaction_key(&entity_id, &event)?; let buf = streambed::encrypt_struct( self.secret_store(), - &self.secret_path(&envelope.entity_id), + &self.secret_path(&entity_id), |event| { let mut buf = Vec::new(); ciborium::ser::into_writer(event, &mut buf).map(|_| buf) }, rand::thread_rng, - &envelope.event, + &event, ) .await?; - Some(( - ProducerRecord { - topic, - headers: vec![], - timestamp: None, - key, - value: buf, - partition: 0, - }, - envelope, - )) + Some(ProducerRecord { + topic, + headers: vec![], + timestamp: None, + key, + value: buf, + partition: 0, + }) } #[cfg(not(feature = "cbor"))] async fn producer_record( &self, topic: Topic, - envelope: EventEnvelope, - ) -> Option<(ProducerRecord, EventEnvelope)>; + entity_id: EntityId, + event: E, + ) -> Option; } /// Adapts a Streambed CommitLog for use with Akka Persistence. 
@@ -153,7 +175,8 @@ where { async fn source_initial( &mut self, - ) -> io::Result> + Send + 'async_trait>>> { + ) -> io::Result> + Send + 'async_trait>>> + { let consumer_records = produce_to_last_offset( &self.commit_log, &self.consumer_group_name, @@ -170,7 +193,10 @@ where if let Some(envelope) = marshaler.envelope(record_entity_id, consumer_record).await { - yield envelope; + yield EntityManagerEventEnvelope::new( + envelope.entity_id, + envelope.event, + ); } } } @@ -183,7 +209,8 @@ where async fn source( &mut self, entity_id: &EntityId, - ) -> io::Result> + Send + 'async_trait>>> { + ) -> io::Result> + Send + 'async_trait>>> + { let consumer_records = produce_to_last_offset( &self.commit_log, &self.consumer_group_name, @@ -201,7 +228,10 @@ where if let Some(envelope) = marshaler.envelope(record_entity_id, consumer_record).await { - yield envelope; + yield EntityManagerEventEnvelope::new( + envelope.entity_id, + envelope.event, + ); } } } @@ -251,12 +281,19 @@ impl Handler for CommitLogTopicAdapter where CL: CommitLog, M: CommitLogEventEnvelopeMarshaler + Send + Sync, - for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, + for<'async_trait> E: Clone + DeserializeOwned + Serialize + Send + Sync + 'async_trait, { - async fn process(&mut self, envelope: EventEnvelope) -> io::Result> { - let (producer_record, envelope) = self + async fn process( + &mut self, + envelope: EntityManagerEventEnvelope, + ) -> io::Result> { + let producer_record = self .marshaler - .producer_record(self.topic.clone(), envelope) + .producer_record( + self.topic.clone(), + envelope.entity_id.clone(), + envelope.event.clone(), + ) .await .ok_or_else(|| { io::Error::new( @@ -294,7 +331,7 @@ mod tests { // Scaffolding - #[derive(Deserialize, Serialize)] + #[derive(Clone, Deserialize, Serialize)] struct MyEvent { value: String, } @@ -388,7 +425,7 @@ mod tests { impl CommitLogEventEnvelopeMarshaler for MyEventMarshaler { type SecretStore = NoopSecretStore; - fn 
to_compaction_key(_envelope: &EventEnvelope) -> Option { + fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option { panic!("should not be called") } @@ -418,30 +455,28 @@ mod tests { Some(EventEnvelope { entity_id, event, - deletion_event: false, + offset: 0, }) } async fn producer_record( &self, topic: Topic, - envelope: EventEnvelope, - ) -> Option<(ProducerRecord, EventEnvelope)> { + entity_id: EntityId, + event: MyEvent, + ) -> Option { let headers = vec![Header { key: "entity-id".to_string(), - value: envelope.entity_id.as_bytes().into(), + value: entity_id.as_bytes().into(), }]; - Some(( - ProducerRecord { - topic, - headers, - timestamp: None, - key: 0, - value: envelope.event.value.clone().into_bytes(), - partition: 0, - }, - envelope, - )) + Some(ProducerRecord { + topic, + headers, + timestamp: None, + key: 0, + value: event.value.clone().into_bytes(), + partition: 0, + }) } } @@ -478,7 +513,7 @@ mod tests { // Process some events and then produce a stream. let envelope = adapter - .process(EventEnvelope::new( + .process(EntityManagerEventEnvelope::new( entity_id.clone(), MyEvent { value: "first-event".to_string(), @@ -489,7 +524,7 @@ mod tests { assert_eq!(envelope.entity_id, entity_id); let envelope = adapter - .process(EventEnvelope::new( + .process(EntityManagerEventEnvelope::new( entity_id.clone(), MyEvent { value: "second-event".to_string(), @@ -502,7 +537,7 @@ mod tests { // Produce to a different entity id, so that we can test out the filtering next. 
adapter - .process(EventEnvelope::new( + .process(EntityManagerEventEnvelope::new( "some-other-entity-id", MyEvent { value: "third-event".to_string(), diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index 6d5db67..07e15d9 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -1,5 +1,11 @@ #![doc = include_str!("../README.md")] +use std::{ + fmt::{self, Display, Write}, + num::Wrapping, + ops::Range, +}; + pub mod effect; pub mod entity; pub mod entity_manager; @@ -16,6 +22,23 @@ pub struct PersistenceId { pub entity_id: EntityId, } +impl PersistenceId { + pub fn new(entity_type: EntityType, entity_id: EntityId) -> Self { + Self { + entity_type, + entity_id, + } + } +} + +impl Display for PersistenceId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.entity_type)?; + f.write_char('|')?; + f.write_str(&self.entity_id) + } +} + /// A message encapsulates a command that is addressed to a specific entity. #[derive(Debug, PartialEq)] pub struct Message { @@ -34,3 +57,77 @@ impl Message { } } } + +/// A slice is deterministically defined based on the persistence id. +/// `numberOfSlices` is not configurable because changing the value would result in +/// different slice for a persistence id than what was used before, which would +/// result in invalid events_by_slices call on a source provider. +pub const NUMBER_OF_SLICES: u32 = 1024; + +/// A slice is deterministically defined based on the persistence id. The purpose is to +/// evenly distribute all persistence ids over the slices and be able to query the +/// events for a range of slices. +pub fn slice_for_persistence_id(persistence_id: &PersistenceId) -> u32 { + (jdk_string_hashcode(&persistence_id.to_string()) % NUMBER_OF_SLICES as i32).unsigned_abs() +} + +/// Split the total number of slices into ranges by the given `number_of_ranges`. 
+/// For example, `NUMBER_OF_SLICES` is 1024 and given 4 `number_of_ranges` this method will +/// return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023). +pub fn slice_ranges(number_of_ranges: u32) -> Vec> { + let range_size = NUMBER_OF_SLICES / number_of_ranges; + assert!( + number_of_ranges * range_size == NUMBER_OF_SLICES, + "number_of_ranges must be a whole number divisor of numberOfSlices." + ); + let mut ranges = Vec::with_capacity(number_of_ranges as usize); + for i in 0..number_of_ranges { + ranges.push(i * range_size..i * range_size + range_size) + } + ranges +} + +// Implementation of the JDK8 string hashcode: +// https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#hashCode +fn jdk_string_hashcode(s: &str) -> i32 { + let mut hash = Wrapping(0i32); + const MULTIPLIER: Wrapping = Wrapping(31); + let count = s.len(); + if count > 0 { + let mut chars = s.chars(); + for _ in 0..count { + hash = hash * MULTIPLIER + Wrapping(chars.next().unwrap() as i32); + } + } + hash.0 +} + +#[cfg(test)] +mod tests { + use smol_str::SmolStr; + + use super::*; + + #[test] + fn test_jdk_string_hashcode() { + assert_eq!(jdk_string_hashcode(""), 0); + assert_eq!(jdk_string_hashcode("howtodoinjava.com"), 1894145264); + assert_eq!(jdk_string_hashcode("hello world"), 1794106052); + } + + #[test] + fn test_slice_for_persistence_id() { + assert_eq!( + slice_for_persistence_id(&PersistenceId::new( + SmolStr::from("some-entity-type"), + SmolStr::from("some-entity-id") + )), + 451 + ); + } + + #[test] + fn test_slice_ranges() { + assert_eq!(slice_ranges(4), vec![0..256, 256..512, 512..768, 768..1024]); + } +} diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 2d54449..8701dd8 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -1,9 +1,9 @@ #![doc = include_str!("../README.md")] -use std::{marker::PhantomData, pin::Pin}; +use std::{marker::PhantomData, 
ops::Range, pin::Pin}; -use akka_persistence_rs::{entity_manager::EventEnvelope, EntityType}; -use akka_persistence_rs_commitlog::CommitLogEventEnvelopeMarshaler; +use akka_persistence_rs::EntityType; +use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, EventEnvelope}; use akka_projection_rs::{Offset, SourceProvider}; use serde::{de::DeserializeOwned, Serialize}; use streambed::commit_log::{CommitLog, Topic, TopicRef}; @@ -14,16 +14,24 @@ pub struct CommitLogSourceProvider { _commit_log: CL, _consumer_group_name: String, _marshaler: M, + _slice_range: Range, _topic: Topic, phantom: PhantomData, } impl CommitLogSourceProvider { - pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self { + pub fn new( + commit_log: CL, + marshaler: M, + consumer_group_name: &str, + topic: TopicRef, + slice_range: Range, + ) -> Self { Self { _commit_log: commit_log, _consumer_group_name: consumer_group_name.into(), _marshaler: marshaler, + _slice_range: slice_range, _topic: topic.into(), phantom: PhantomData, } diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index 7396d3b..058089a 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -1,9 +1,11 @@ use std::{marker::PhantomData, pin::Pin}; -use akka_persistence_rs::{entity_manager::EventEnvelope, EntityType}; +use akka_persistence_rs::EntityType; use akka_projection_rs::{Offset, SourceProvider}; use tokio_stream::Stream; +use crate::EventEnvelope; + pub struct GrpcSourceProvider { phantom: PhantomData, } diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 909a150..61f59a7 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -1,3 +1,25 @@ #![doc = include_str!("../README.md")] +use akka_persistence_rs::EntityId; + pub mod consumer; + +/// An envelope wraps a gRPC event associated with a specific entity. 
+/// FIXME: This will have gRPC specific fields. +#[derive(Clone, Debug, PartialEq)] +pub struct EventEnvelope { + pub entity_id: EntityId, + pub event: E, +} + +impl EventEnvelope { + pub fn new(entity_id: EI, event: E) -> Self + where + EI: Into, + { + Self { + entity_id: entity_id.into(), + event, + } + } +} diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index 9773450..7f97272 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -9,9 +9,10 @@ pub enum Command { } /// Provides local file system based storage for projection offsets. -pub async fn run(_receiver: Receiver, _source_provider: SP, _handler: H) +pub async fn run(_receiver: Receiver, _source_provider: FSP, _handler: H) where H: Handler, + FSP: FnMut(u32) -> Option, SP: SourceProvider, { } diff --git a/examples/iot-service/src/registration.rs b/examples/iot-service/src/registration.rs index dda8ec7..a63aa17 100644 --- a/examples/iot-service/src/registration.rs +++ b/examples/iot-service/src/registration.rs @@ -6,7 +6,7 @@ use std::{num::NonZeroUsize, sync::Arc}; use akka_persistence_rs::{ effect::{emit_event, EffectExt}, entity::EventSourcedBehavior, - entity_manager::{self, EventEnvelope}, + entity_manager::{self}, EntityId, Message, }; use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, CommitLogTopicAdapter}; @@ -94,9 +94,9 @@ const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; impl CommitLogEventEnvelopeMarshaler for EventEnvelopeMarshaler { type SecretStore = FileSecretStore; - fn to_compaction_key(envelope: &EventEnvelope) -> Option { - let entity_id = envelope.entity_id.parse::().ok()?; - let Event::Registered { .. } = envelope.event; + fn to_compaction_key(entity_id: &EntityId, event: &Event) -> Option { + let entity_id = entity_id.parse::().ok()?; + let Event::Registered { .. 
} = event; Some(0 << EVENT_TYPE_BIT_SHIFT | entity_id as u64) } diff --git a/examples/iot-service/src/registration_projection.rs index c59c5f3..a3b5106 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -3,7 +3,8 @@ use std::sync::Arc; -use akka_persistence_rs::{entity_manager::EventEnvelope, Message}; +use akka_persistence_rs::{slice_ranges, Message}; +use akka_persistence_rs_commitlog::EventEnvelope; use akka_projection_rs::{Handler, HandlerError}; use akka_projection_rs_commitlog::CommitLogSourceProvider; use akka_projection_rs_storage::Command; @@ -48,18 +49,32 @@ pub async fn task( receiver: mpsc::Receiver, temperature_sender: mpsc::Sender>, ) { - // Establish our source of events as a commit log. - let source_provider = CommitLogSourceProvider::new( - commit_log, - EventEnvelopeMarshaler { - events_key_secret_path: Arc::from(events_key_secret_path), - secret_store, - }, - "iot-service-projection", - registration::EVENTS_TOPIC, - ); + let events_key_secret_path: Arc = Arc::from(events_key_secret_path); + // When it comes to having a projection sourced from a local + // commit log, there's little benefit in having many of them. + // We therefore manage all slices from just one projection. + let slice_ranges = slice_ranges(1); + + // A closure to establish our source of events as a commit log. + let source_provider = |slice| { + Some(CommitLogSourceProvider::new( + commit_log.clone(), + EventEnvelopeMarshaler { + events_key_secret_path: events_key_secret_path.clone(), + secret_store: secret_store.clone(), + }, + "iot-service-projection", + registration::EVENTS_TOPIC, + slice_ranges.get(slice as usize).cloned()?, + )) + }; + + // Declare a handler to forward projection events on to the temperature entity. 
let handler = RegistrationHandler { temperature_sender }; + // Finally, start up a projection that will use Streambed storage + // to remember the offset consumed. This then permits us to restart + // from a specific point in the source given restarts. akka_projection_rs_storage::run(receiver, source_provider, handler).await } diff --git a/examples/iot-service/src/temperature.rs b/examples/iot-service/src/temperature.rs index 6575104..10dd2f3 100644 --- a/examples/iot-service/src/temperature.rs +++ b/examples/iot-service/src/temperature.rs @@ -5,7 +5,7 @@ use std::{collections::VecDeque, num::NonZeroUsize, sync::Arc}; use akka_persistence_rs::{ effect::{emit_event, reply, unhandled, EffectExt}, entity::EventSourcedBehavior, - entity_manager::{self, EventEnvelope}, + entity_manager::{self}, EntityId, Message, }; use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, CommitLogTopicAdapter}; @@ -116,13 +116,13 @@ const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; impl CommitLogEventEnvelopeMarshaler for EventEnvelopeMarshaler { type SecretStore = FileSecretStore; - fn to_compaction_key(envelope: &EventEnvelope) -> Option { - let record_type = match &envelope.event { + fn to_compaction_key(entity_id: &EntityId, event: &Event) -> Option { + let record_type = match event { Event::TemperatureRead { .. } => Some(0), Event::Registered { .. } => Some(1), }; record_type.and_then(|record_type| { - let entity_id = envelope.entity_id.parse::().ok()?; + let entity_id = entity_id.parse::().ok()?; Some(record_type << EVENT_TYPE_BIT_SHIFT | entity_id as u64) }) }