From 60172c08cce3c5c0301e80479ad18d79e60086c1 Mon Sep 17 00:00:00 2001 From: huntc Date: Wed, 23 Aug 2023 18:02:45 +1000 Subject: [PATCH 1/4] Implement local projections Fleshed out the projection implementation. I'm not entirely across the required behaviour of slices, but I think the gRPC implementation will help me there. For now, I've removed some of the slice concerns when sourcing from the local commit log. The example README has been updated and registrations and their projections are fully working in this local scenario. I also improved some Streambed performance by migrating topics to SmolStr, as per what we do in this lib. Consequently, there's an update to dependencies. --- Cargo.toml | 7 +- akka-persistence-rs-commitlog/src/lib.rs | 22 ++--- akka-persistence-rs/src/lib.rs | 2 +- akka-projection-rs-commitlog/Cargo.toml | 2 + akka-projection-rs-commitlog/src/lib.rs | 83 ++++++++++++------ akka-projection-rs-grpc/Cargo.toml | 1 + akka-projection-rs-grpc/src/consumer.rs | 18 ++-- akka-projection-rs-storage/Cargo.toml | 8 ++ akka-projection-rs-storage/src/lib.rs | 85 ++++++++++++++++++- akka-projection-rs/src/lib.rs | 12 ++- examples/iot-service/README.md | 21 ++++- examples/iot-service/src/main.rs | 13 ++- examples/iot-service/src/registration.rs | 7 +- .../src/registration_projection.rs | 33 ++++--- examples/iot-service/src/temperature.rs | 10 ++- 15 files changed, 237 insertions(+), 87 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fdba106..09a161d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,10 @@ rand = "0.8" scopeguard = "1.1" serde = "1.0.151" smol_str = "0.2.0" -streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" } -streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" } -streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" 
} +streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" } +streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" } +streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" } +streambed-storage = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" } test-log = "0.2.11" tokio = "1.23.0" tokio-stream = "0.1.14" diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index d05d1ce..5fe7f19 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -9,9 +9,7 @@ use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; use std::{io, marker::PhantomData, pin::Pin, sync::Arc}; use streambed::{ - commit_log::{ - CommitLog, ConsumerRecord, Key, Offset, ProducerRecord, Subscription, Topic, TopicRef, - }, + commit_log::{CommitLog, ConsumerRecord, Key, Offset, ProducerRecord, Subscription, Topic}, secret_store::SecretStore, }; use tokio_stream::{Stream, StreamExt}; @@ -155,12 +153,12 @@ where M: CommitLogEventEnvelopeMarshaler, for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { - pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self { + pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self { Self { commit_log, consumer_group_name: consumer_group_name.into(), marshaler, - topic: topic.into(), + topic, phantom: PhantomData, } } @@ -466,7 +464,7 @@ mod tests { event: MyEvent, ) -> Option { let headers = vec![Header { - key: "entity-id".to_string(), + key: Topic::from("entity-id"), value: entity_id.as_bytes().into(), }]; Some(ProducerRecord { @@ -496,7 +494,7 @@ mod tests { commit_log.clone(), marshaler, 
"some-consumer", - "some-topic", + Topic::from("some-topic"), ); // Scaffolding @@ -552,7 +550,7 @@ mod tests { for _ in 0..10 { let last_offset = commit_log - .offsets("some-topic".to_string(), 0) + .offsets(Topic::from("some-topic"), 0) .await .map(|lo| lo.end_offset); if last_offset == Some(3) { @@ -582,8 +580,12 @@ mod tests { let marshaler = MyEventMarshaler; - let file_log_topic_adapter = - CommitLogTopicAdapter::new(commit_log, marshaler, "some-consumer", "some-topic"); + let file_log_topic_adapter = CommitLogTopicAdapter::new( + commit_log, + marshaler, + "some-consumer", + Topic::from("some-topic"), + ); let my_behavior = MyBehavior; diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index 07e15d9..c61ff4a 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -59,7 +59,7 @@ impl Message { } /// A slice is deterministically defined based on the persistence id. -/// `numberOfSlices` is not configurable because changing the value would result in +/// `NUMBER_OF_SLICES` is not configurable because changing the value would result in /// different slice for a persistence id than what was used before, which would /// result in invalid events_by_slices call on a source provider. 
pub const NUMBER_OF_SLICES: u32 = 1024; diff --git a/akka-projection-rs-commitlog/Cargo.toml b/akka-projection-rs-commitlog/Cargo.toml index 554a4ad..1ecc239 100644 --- a/akka-projection-rs-commitlog/Cargo.toml +++ b/akka-projection-rs-commitlog/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] +async-stream = { workspace = true } +async-trait = { workspace = true } serde = { workspace = true } streambed = { workspace = true } tokio-stream = { workspace = true } diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 8701dd8..69bc197 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -1,59 +1,88 @@ #![doc = include_str!("../README.md")] -use std::{marker::PhantomData, ops::Range, pin::Pin}; +use std::{marker::PhantomData, pin::Pin}; use akka_persistence_rs::EntityType; use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, EventEnvelope}; use akka_projection_rs::{Offset, SourceProvider}; +use async_stream::stream; +use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; -use streambed::commit_log::{CommitLog, Topic, TopicRef}; -use tokio_stream::Stream; +use streambed::commit_log::{CommitLog, ConsumerOffset, Subscription, Topic}; +use tokio_stream::{Stream, StreamExt}; /// Source events for a projection from a Streambed commit log. 
pub struct CommitLogSourceProvider { - _commit_log: CL, - _consumer_group_name: String, - _marshaler: M, - _slice_range: Range, - _topic: Topic, + commit_log: CL, + consumer_group_name: String, + marshaler: M, + topic: Topic, phantom: PhantomData, } impl CommitLogSourceProvider { - pub fn new( - commit_log: CL, - marshaler: M, - consumer_group_name: &str, - topic: TopicRef, - slice_range: Range, - ) -> Self { + pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self { Self { - _commit_log: commit_log, - _consumer_group_name: consumer_group_name.into(), - _marshaler: marshaler, - _slice_range: slice_range, - _topic: topic.into(), + commit_log, + consumer_group_name: consumer_group_name.into(), + marshaler, + topic, phantom: PhantomData, } } } +#[async_trait] impl SourceProvider for CommitLogSourceProvider where CL: CommitLog, - M: CommitLogEventEnvelopeMarshaler, + M: CommitLogEventEnvelopeMarshaler + Sync, for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { type Envelope = EventEnvelope; - type Offset = Offset; - - fn events_by_slices( + async fn events_by_slices( + &self, + // The entity type means nothing in this context as we have a topic. 
_entity_type: EntityType, + // Slices are not used with the commit log _min_slice: u32, _max_slice: u32, - _offset: Offset, - ) -> Pin> + Send>> { - todo!() + offset: Option, + ) -> Pin> + Send + 'async_trait>> { + let offsets = if let Some(Offset::Sequence(offset)) = offset { + vec![ConsumerOffset { + partition: 0, + offset, + topic: self.topic.clone(), + }] + } else { + vec![] + }; + + let subscriptions = vec![Subscription { + topic: self.topic.clone(), + }]; + + let mut records = self.commit_log.scoped_subscribe( + self.consumer_group_name.as_str(), + offsets, + subscriptions, + None, + ); + + let marshaler = &self.marshaler; + + Box::pin(stream!({ + while let Some(consumer_record) = records.next().await { + if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { + if let Some(envelope) = + marshaler.envelope(record_entity_id, consumer_record).await + { + yield envelope; + } + } + } + })) } } diff --git a/akka-projection-rs-grpc/Cargo.toml b/akka-projection-rs-grpc/Cargo.toml index 02ee8e2..b0264ba 100644 --- a/akka-projection-rs-grpc/Cargo.toml +++ b/akka-projection-rs-grpc/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait = { workspace = true } tokio-stream = { workspace = true } akka-persistence-rs = { path = "../akka-persistence-rs" } diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index 058089a..a5aea0f 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -2,6 +2,7 @@ use std::{marker::PhantomData, pin::Pin}; use akka_persistence_rs::EntityType; use akka_projection_rs::{Offset, SourceProvider}; +use async_trait::async_trait; use tokio_stream::Stream; use crate::EventEnvelope; @@ -10,19 +11,20 @@ pub struct GrpcSourceProvider { phantom: PhantomData, } -impl SourceProvider for GrpcSourceProvider { - /// The envelope processed by the provider. 
+#[async_trait] +impl SourceProvider for GrpcSourceProvider +where + E: Sync, +{ type Envelope = EventEnvelope; - /// The type that describes offsets into a journal - type Offset = Offset; - - fn events_by_slices( + async fn events_by_slices( + &self, _entity_type: EntityType, _min_slice: u32, _max_slice: u32, - _offset: Offset, - ) -> Pin> + Send>> { + _offset: Option, + ) -> Pin> + Send + 'async_trait>> { todo!() } } diff --git a/akka-projection-rs-storage/Cargo.toml b/akka-projection-rs-storage/Cargo.toml index 6ca5cfc..84f8a79 100644 --- a/akka-projection-rs-storage/Cargo.toml +++ b/akka-projection-rs-storage/Cargo.toml @@ -7,6 +7,14 @@ edition = "2021" [dependencies] async-trait = { workspace = true } +ciborium = { workspace = true } +log = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } +streambed = { workspace = true } +streambed-storage = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } +akka-persistence-rs = { path = "../akka-persistence-rs" } akka-projection-rs = { path = "../akka-projection-rs" } diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index 7f97272..b7b1db7 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -1,18 +1,97 @@ #![doc = include_str!("../README.md")] -use akka_projection_rs::{Handler, SourceProvider}; +use std::path::Path; + +use akka_persistence_rs::EntityType; +use akka_projection_rs::{Handler, Offset, SourceProvider}; +use log::error; +use serde::{Deserialize, Serialize}; +use streambed::{commit_log::Offset as CommitLogOffset, secret_store::SecretStore}; use tokio::sync::mpsc::Receiver; +use tokio_stream::StreamExt; /// The commands that a projection task is receptive to. pub enum Command { Stop, } +#[derive(Default, Deserialize, Serialize)] +struct StorableState { + last_offset: Option, +} + /// Provides local file system based storage for projection offsets. 
-pub async fn run(_receiver: Receiver, _source_provider: FSP, _handler: H) -where +pub async fn run( + secret_store: &impl SecretStore, + secret_path: &str, + state_storage_path: &Path, + mut receiver: Receiver, + mut source_provider: FSP, + handler: H, +) where H: Handler, FSP: FnMut(u32) -> Option, SP: SourceProvider, { + // Source our last offset recorded, if we have one. + + let mut storable_state = + streambed_storage::load_struct(state_storage_path, secret_store, secret_path, |bytes| { + ciborium::de::from_reader::(bytes) + }) + .await + .unwrap_or_default(); + + // For now, we're going to produce a source provider for just one slice. + // When we implement the gRPC consumer, we will likely have to do more. + + if let Some(source_provider) = source_provider(0) { + // FIXME what to provide for entity type. + // FIXME what to provide for slice min/max. + let mut source = source_provider + .events_by_slices( + EntityType::from(""), + 0, + 0, + storable_state.last_offset.map(Offset::Sequence), + ) + .await; + + let serializer = |state: &StorableState| { + let mut buf = Vec::new(); + ciborium::ser::into_writer(state, &mut buf).map(|_| buf) + }; + + loop { + tokio::select! { + envelope = source.next() => { + if let Some(envelope) = envelope { + if let Ok(Offset::Sequence(offset)) = handler.process(envelope).await { + storable_state.last_offset = Some(offset); + if streambed_storage::save_struct( + state_storage_path, + secret_store, + secret_path, + serializer, + rand::thread_rng, + &storable_state + ) + .await.is_err() { + error!("Cannot persist offsets"); + } + + } + } + } + _ = receiver.recv() => { + break; + } + else => { + break; + } + } + } + } else { + error!("Cannot obtain a source provider. 
Exiting the projection runner."); + } } diff --git a/akka-projection-rs/src/lib.rs b/akka-projection-rs/src/lib.rs index 6f6c348..bd88895 100644 --- a/akka-projection-rs/src/lib.rs +++ b/akka-projection-rs/src/lib.rs @@ -31,7 +31,7 @@ pub trait Handler { /// Process an envelope. /// An offset will be persisted given a successful result. - async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError>; + async fn process(&self, envelope: Self::Envelope) -> Result; } /// Errors for event processing by a handler. @@ -43,9 +43,6 @@ pub trait SourceProvider { /// The envelope processed by the provider. type Envelope; - /// The type that describes offsets into a journal - type Offset; - /// Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to /// evenly distribute all persistence ids over the slices. /// @@ -69,10 +66,11 @@ pub trait SourceProvider { /// /// The stream is not completed when it reaches the end of the currently stored events, but it continues to push new /// events when new events are persisted. - fn events_by_slices( + async fn events_by_slices( + &self, entity_type: EntityType, min_slice: u32, max_slice: u32, - offset: Self::Offset, - ) -> Pin + Send>>; + offset: Option, + ) -> Pin + Send + 'async_trait>>; } diff --git a/examples/iot-service/README.md b/examples/iot-service/README.md index c7126d8..145e0be 100644 --- a/examples/iot-service/README.md +++ b/examples/iot-service/README.md @@ -32,15 +32,26 @@ of authentication where, in the real-world, a shared key between the device and would be conveyed. That key would then be used to encrypt data. We simply use the key as a registration mechanism and do not accept data for devices where we have no key. +Let's first query for a sensor's data... 
it will fail as we have no sensors yet: + +``` +curl -v "127.0.0.1:8080/api/temperature/1" +``` + +So, let's now provision one: + +``` curl -v -d '"1"' -H"Content-Type: application/json" "127.0.0.1:8080/api/temperature" +``` -You should now be able to query for the current state of a temperature sensor: +You should now be able to query for the current state of a temperature sensor, although +there'll be no observations recorded for it yet: ``` curl -v "127.0.0.1:8080/api/temperature/1" ``` -You should also be able to post database events to the UDP socket. Note that +Let's now post database events to the UDP socket so that the sensor has observations. Note that we're using Postcard to deserialize binary data. Postcard uses variable length integers where the top bit, when set, indicates that the next byte also contains data. See [Postcard](https://docs.rs/postcard/latest/postcard/) for more details. @@ -50,7 +61,11 @@ echo -n "\x01\x02" | nc -w0 127.0.0.1 -u 8081 ``` You should see a `DEBUG` log indicating that the post has been received. You should -also be able to query the database again with the id that was sent (`1`). +also be able to query the database again with the id that was sent (`1`): + +``` +curl -v "127.0.0.1:8080/api/temperature/1" +``` You should also be able to see events being written to the log file store itself: diff --git a/examples/iot-service/src/main.rs b/examples/iot-service/src/main.rs index dc91168..c0e3c5d 100644 --- a/examples/iot-service/src/main.rs +++ b/examples/iot-service/src/main.rs @@ -75,6 +75,11 @@ async fn main() -> Result<(), Box> { let temperature_events_key_secret_path = format!("{}/secrets.temperature-events.key", args.ss_args.ss_ns); + // The path of a key to use to encrypt offsets for registration projections. We'll + // just re-use the above key for convenience, but ordinarily, these keys would be + // different. 
+ let registration_offset_key_secret_path = temperature_events_key_secret_path.clone(); + if let Ok(None) = ss.get_secret(&temperature_events_key_secret_path).await { // If we can't write this initial secret then all bets are off let mut key = vec![0; 16]; @@ -116,18 +121,18 @@ async fn main() -> Result<(), Box> { ss.clone(), temperature_events_key_secret_path.clone(), registration_command_receiver, - )) - .await?; + )); // Start up a task to manage registration projections tokio::spawn(registration_projection::task( cl.clone(), ss.clone(), temperature_events_key_secret_path.clone(), + registration_offset_key_secret_path.clone(), registration_projection_command_receiver, + args.cl_args.cl_root_path.join("registration-offsets"), temperature_command, - )) - .await?; + )); // All things started up but our temperature. We're running // that in our main task. Therefore, we will return once the diff --git a/examples/iot-service/src/registration.rs b/examples/iot-service/src/registration.rs index a63aa17..0d69284 100644 --- a/examples/iot-service/src/registration.rs +++ b/examples/iot-service/src/registration.rs @@ -13,7 +13,7 @@ use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, CommitLogTo use async_trait::async_trait; use serde::{Deserialize, Serialize}; use smol_str::SmolStr; -use streambed::commit_log::Key; +use streambed::commit_log::{Key, Topic}; use streambed_confidant::FileSecretStore; use streambed_logged::{compaction::KeyBasedRetention, FileLog}; use tokio::sync::mpsc; @@ -126,9 +126,10 @@ pub async fn task( events_key_secret_path: String, command_receiver: mpsc::Receiver>, ) { + let events_topic = Topic::from(EVENTS_TOPIC); commit_log .register_compaction( - EVENTS_TOPIC.to_string(), + events_topic.clone(), KeyBasedRetention::new(MAX_TOPIC_COMPACTION_KEYS), ) .await @@ -141,7 +142,7 @@ pub async fn task( secret_store, }, "iot-service", - EVENTS_TOPIC, + events_topic, ); entity_manager::run( diff --git 
a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs index a3b5106..9152c35 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -1,14 +1,15 @@ //! Handle registration projection concerns //! -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; -use akka_persistence_rs::{slice_ranges, Message}; +use akka_persistence_rs::Message; use akka_persistence_rs_commitlog::EventEnvelope; -use akka_projection_rs::{Handler, HandlerError}; +use akka_projection_rs::{Handler, HandlerError, Offset}; use akka_projection_rs_commitlog::CommitLogSourceProvider; use akka_projection_rs_storage::Command; use async_trait::async_trait; +use streambed::commit_log::Topic; use streambed_confidant::FileSecretStore; use streambed_logged::FileLog; use tokio::sync::mpsc; @@ -28,7 +29,7 @@ pub struct RegistrationHandler { impl Handler for RegistrationHandler { type Envelope = EventEnvelope; - async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> { + async fn process(&self, envelope: Self::Envelope) -> Result { let registration::Event::Registered { secret } = envelope.event; self.temperature_sender .send(Message::new( @@ -36,7 +37,7 @@ impl Handler for RegistrationHandler { temperature::Command::Register { secret }, )) .await - .map(|_| ()) + .map(|_| Offset::Sequence(envelope.offset)) .map_err(|_| HandlerError) } } @@ -46,18 +47,15 @@ pub async fn task( commit_log: FileLog, secret_store: FileSecretStore, events_key_secret_path: String, + offsets_key_secret_path: String, receiver: mpsc::Receiver, + state_storage_path: PathBuf, temperature_sender: mpsc::Sender>, ) { let events_key_secret_path: Arc = Arc::from(events_key_secret_path); - // When it comes to having a projection sourced from a local - // commit log, there's little benefit if having many of them. - // We therefore manage all slices from just one projection. 
- let slice_ranges = slice_ranges(1); - // A closure to establish our source of events as a commit log. - let source_provider = |slice| { + let source_provider = |_slice| { Some(CommitLogSourceProvider::new( commit_log.clone(), EventEnvelopeMarshaler { @@ -65,8 +63,7 @@ pub async fn task( secret_store: secret_store.clone(), }, "iot-service-projection", - registration::EVENTS_TOPIC, - slice_ranges.get(slice as usize).cloned()?, + Topic::from(registration::EVENTS_TOPIC), )) }; @@ -76,5 +73,13 @@ pub async fn task( // Finally, start up a projection that will use Streambed storage // to remember the offset consumed. This then permits us to restart // from a specific point in the source given restarts. - akka_projection_rs_storage::run(receiver, source_provider, handler).await + akka_projection_rs_storage::run( + &secret_store, + &offsets_key_secret_path, + &state_storage_path, + receiver, + source_provider, + handler, + ) + .await } diff --git a/examples/iot-service/src/temperature.rs b/examples/iot-service/src/temperature.rs index 10dd2f3..b135425 100644 --- a/examples/iot-service/src/temperature.rs +++ b/examples/iot-service/src/temperature.rs @@ -12,7 +12,7 @@ use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, CommitLogTo use async_trait::async_trait; use serde::{Deserialize, Serialize}; use smol_str::SmolStr; -use streambed::commit_log::Key; +use streambed::commit_log::{Key, Topic}; use streambed_confidant::FileSecretStore; use streambed_logged::{compaction::NthKeyBasedRetention, FileLog}; use tokio::sync::{mpsc, oneshot}; @@ -54,7 +54,7 @@ impl EventSourcedBehavior for Behavior { command: Self::Command, ) -> Box> { match command { - Command::Get { reply_to } if !state.history.is_empty() => { + Command::Get { reply_to } if !state.secret.is_empty() => { reply(reply_to, state.history.clone().into()).boxed() } @@ -164,9 +164,11 @@ pub async fn task( // events are removed. 
In our scenario, unwanted events can be removed when // the exceed MAX_HISTORY_EVENTS as we do not have a requirement to ever // return more than that. + let events_topic = Topic::from(EVENTS_TOPIC); + commit_log .register_compaction( - EVENTS_TOPIC.to_string(), + events_topic.clone(), NthKeyBasedRetention::new(MAX_TOPIC_COMPACTION_KEYS, MAX_HISTORY_EVENTS), ) .await @@ -179,7 +181,7 @@ pub async fn task( secret_store, }, "iot-service", - EVENTS_TOPIC, + events_topic, ); entity_manager::run( From da3a3022a5050f56bd756fba817f9309ca86c48c Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 24 Aug 2023 09:30:32 +1000 Subject: [PATCH 2/4] Relocate Offset The declaration of Offset has been moved to Akka persistence (previously known only to Akka projection). This permits us to constrain Envelope types to those that can provide an offset. This move is in line with the JVM where Offset is also an Akka persistence type. --- akka-persistence-rs-commitlog/src/lib.rs | 17 +++++++++++++---- akka-persistence-rs/src/lib.rs | 17 +++++++++++++++++ akka-projection-rs-commitlog/src/lib.rs | 4 ++-- akka-projection-rs-grpc/src/consumer.rs | 4 ++-- akka-projection-rs-storage/src/lib.rs | 9 +++++---- akka-projection-rs/src/lib.rs | 17 ++--------------- .../iot-service/src/registration_projection.rs | 6 +++--- 7 files changed, 44 insertions(+), 30 deletions(-) diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 5fe7f19..5c4b2e0 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -2,14 +2,17 @@ use akka_persistence_rs::{ entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider}, - EntityId, + EntityId, Offset, WithOffset, }; use async_stream::stream; use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; use std::{io, marker::PhantomData, pin::Pin, sync::Arc}; use streambed::{ - commit_log::{CommitLog, ConsumerRecord, Key, Offset, 
ProducerRecord, Subscription, Topic}, + commit_log::{ + CommitLog, ConsumerRecord, Key, Offset as CommitLogOffset, ProducerRecord, Subscription, + Topic, + }, secret_store::SecretStore, }; use tokio_stream::{Stream, StreamExt}; @@ -19,11 +22,11 @@ use tokio_stream::{Stream, StreamExt}; pub struct EventEnvelope { pub entity_id: EntityId, pub event: E, - pub offset: Offset, + pub offset: CommitLogOffset, } impl EventEnvelope { - pub fn new(entity_id: EI, event: E, offset: Offset) -> Self + pub fn new(entity_id: EI, event: E, offset: CommitLogOffset) -> Self where EI: Into, { @@ -35,6 +38,12 @@ impl EventEnvelope { } } +impl WithOffset for EventEnvelope { + fn offset(&self) -> Offset { + Offset::Sequence(self.offset) + } +} + /// Provides the ability to transform the the memory representation of Akka Persistence events from /// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization. /// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected. diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index c61ff4a..7978984 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -58,6 +58,23 @@ impl Message { } } +pub enum Offset { + /// Corresponds to an ordered sequence number for the events. Note that the corresponding + /// offset of each event is provided in an Envelope, + /// which makes it possible to resume the stream at a later point from a given offset. + /// + /// The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + /// in the returned stream. This means that you can use the offset that is returned in an `Envelope` + /// as the `offset` parameter in a subsequent query. + /// + Sequence(u64), +} + +/// Implemented by structures that can return an offset. +pub trait WithOffset { + fn offset(&self) -> Offset; +} + /// A slice is deterministically defined based on the persistence id. 
/// `NUMBER_OF_SLICES` is not configurable because changing the value would result in /// different slice for a persistence id than what was used before, which would diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 69bc197..3ccf938 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -2,9 +2,9 @@ use std::{marker::PhantomData, pin::Pin}; -use akka_persistence_rs::EntityType; +use akka_persistence_rs::{EntityType, Offset}; use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, EventEnvelope}; -use akka_projection_rs::{Offset, SourceProvider}; +use akka_projection_rs::SourceProvider; use async_stream::stream; use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index a5aea0f..e4a98da 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -1,7 +1,7 @@ use std::{marker::PhantomData, pin::Pin}; -use akka_persistence_rs::EntityType; -use akka_projection_rs::{Offset, SourceProvider}; +use akka_persistence_rs::{EntityType, Offset}; +use akka_projection_rs::SourceProvider; use async_trait::async_trait; use tokio_stream::Stream; diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index b7b1db7..ecf68e7 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -2,8 +2,8 @@ use std::path::Path; -use akka_persistence_rs::EntityType; -use akka_projection_rs::{Handler, Offset, SourceProvider}; +use akka_persistence_rs::{EntityType, Offset, WithOffset}; +use akka_projection_rs::{Handler, SourceProvider}; use log::error; use serde::{Deserialize, Serialize}; use streambed::{commit_log::Offset as CommitLogOffset, secret_store::SecretStore}; @@ -29,6 +29,7 @@ pub async fn run( mut source_provider: FSP, handler: H, ) where + E: 
WithOffset, H: Handler, FSP: FnMut(u32) -> Option, SP: SourceProvider, @@ -66,7 +67,8 @@ pub async fn run( tokio::select! { envelope = source.next() => { if let Some(envelope) = envelope { - if let Ok(Offset::Sequence(offset)) = handler.process(envelope).await { + let Offset::Sequence(offset) = envelope.offset(); + if handler.process(envelope).await.is_ok() { storable_state.last_offset = Some(offset); if streambed_storage::save_struct( state_storage_path, @@ -79,7 +81,6 @@ pub async fn run( .await.is_err() { error!("Cannot persist offsets"); } - } } } diff --git a/akka-projection-rs/src/lib.rs b/akka-projection-rs/src/lib.rs index bd88895..608c7ce 100644 --- a/akka-projection-rs/src/lib.rs +++ b/akka-projection-rs/src/lib.rs @@ -4,22 +4,10 @@ use std::pin::Pin; -use akka_persistence_rs::EntityType; +use akka_persistence_rs::{EntityType, Offset}; use async_trait::async_trait; use tokio_stream::Stream; -pub enum Offset { - /// Corresponds to an ordered sequence number for the events. Note that the corresponding - /// offset of each event is provided in an Envelope, - /// which makes it possible to resume the stream at a later point from a given offset. - /// - /// The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included - /// in the returned stream. This means that you can use the offset that is returned in an `Envelope` - /// as the `offset` parameter in a subsequent query. - /// - Sequence(u64), -} - /// Errors for event processing by a handler. pub struct HandlerError; @@ -30,8 +18,7 @@ pub trait Handler { type Envelope; /// Process an envelope. - /// An offset will be persisted given a successful result. - async fn process(&self, envelope: Self::Envelope) -> Result; + async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError>; } /// Errors for event processing by a handler. 
diff --git a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs index 9152c35..57bac62 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -5,7 +5,7 @@ use std::{path::PathBuf, sync::Arc}; use akka_persistence_rs::Message; use akka_persistence_rs_commitlog::EventEnvelope; -use akka_projection_rs::{Handler, HandlerError, Offset}; +use akka_projection_rs::{Handler, HandlerError}; use akka_projection_rs_commitlog::CommitLogSourceProvider; use akka_projection_rs_storage::Command; use async_trait::async_trait; @@ -29,7 +29,7 @@ pub struct RegistrationHandler { impl Handler for RegistrationHandler { type Envelope = EventEnvelope; - async fn process(&self, envelope: Self::Envelope) -> Result { + async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> { let registration::Event::Registered { secret } = envelope.event; self.temperature_sender .send(Message::new( @@ -37,7 +37,7 @@ impl Handler for RegistrationHandler { temperature::Command::Register { secret }, )) .await - .map(|_| Offset::Sequence(envelope.offset)) + .map(|_| ()) .map_err(|_| HandlerError) } } From ec06b31ca41aed941eabd1486d7a71d0f6ba4b99 Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 24 Aug 2023 12:05:08 +1000 Subject: [PATCH 3/4] Re-instate the projection source method Re-instated the source method of the provider and made the slices query method private to the provider implementation. The query method now also filters persistence ids so that they are within the slice range. EntityType remains distinct from a Topic name, but they can be the same in our example. Topics can be namespaced using a ':' delimiter so they should remain their own thing.
--- akka-persistence-rs/src/lib.rs | 90 +++++++++---------- akka-projection-rs-commitlog/src/lib.rs | 78 ++++++++++------ akka-projection-rs-grpc/src/consumer.rs | 17 ++-- akka-projection-rs-storage/src/lib.rs | 38 ++++---- akka-projection-rs/src/lib.rs | 66 +++++++------- .../src/registration_projection.rs | 11 ++- 6 files changed, 166 insertions(+), 134 deletions(-) diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index 7978984..d06755c 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -16,6 +16,43 @@ pub type EntityType = smol_str::SmolStr; /// Uniquely identifies an entity, or entity instance. pub type EntityId = smol_str::SmolStr; +/// A slice is deterministically defined based on the persistence id. +/// `NUMBER_OF_SLICES` is not configurable because changing the value would result in +/// different slice for a persistence id than what was used before, which would +/// result in invalid events_by_slices call on a source provider. +pub const NUMBER_OF_SLICES: u32 = 1024; + +/// Split the total number of slices into ranges by the given `number_of_ranges`. +/// For example, `NUMBER_OF_SLICES` is 1024 and given 4 `number_of_ranges` this method will +/// return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023). +pub fn slice_ranges(number_of_ranges: u32) -> Vec> { + let range_size = NUMBER_OF_SLICES / number_of_ranges; + assert!( + number_of_ranges * range_size == NUMBER_OF_SLICES, + "number_of_ranges must be a whole number divisor of numberOfSlices." 
+ ); + let mut ranges = Vec::with_capacity(number_of_ranges as usize); + for i in 0..number_of_ranges { + ranges.push(i * range_size..i * range_size + range_size) + } + ranges +} + +// Implementation of the JDK8 string hashcode: +// https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#hashCode +fn jdk_string_hashcode(s: &str) -> i32 { + let mut hash = Wrapping(0i32); + const MULTIPLIER: Wrapping = Wrapping(31); + let count = s.len(); + if count > 0 { + let mut chars = s.chars(); + for _ in 0..count { + hash = hash * MULTIPLIER + Wrapping(chars.next().unwrap() as i32); + } + } + hash.0 +} + /// A namespaced entity id given an entity type. pub struct PersistenceId { pub entity_type: EntityType, @@ -29,6 +66,10 @@ impl PersistenceId { entity_id, } } + + pub fn slice(&self) -> u32 { + (jdk_string_hashcode(&self.to_string()) % NUMBER_OF_SLICES as i32).unsigned_abs() + } } impl Display for PersistenceId { @@ -75,50 +116,6 @@ pub trait WithOffset { fn offset(&self) -> Offset; } -/// A slice is deterministically defined based on the persistence id. -/// `NUMBER_OF_SLICES` is not configurable because changing the value would result in -/// different slice for a persistence id than what was used before, which would -/// result in invalid events_by_slices call on a source provider. -pub const NUMBER_OF_SLICES: u32 = 1024; - -/// A slice is deterministically defined based on the persistence id. The purpose is to -/// evenly distribute all persistence ids over the slices and be able to query the -/// events for a range of slices. -pub fn slice_for_persistence_id(persistence_id: &PersistenceId) -> u32 { - (jdk_string_hashcode(&persistence_id.to_string()) % NUMBER_OF_SLICES as i32).unsigned_abs() -} - -/// Split the total number of slices into ranges by the given `number_of_ranges`. -/// For example, `NUMBER_OF_SLICES` is 1024 and given 4 `number_of_ranges` this method will -/// return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023). 
-pub fn slice_ranges(number_of_ranges: u32) -> Vec> { - let range_size = NUMBER_OF_SLICES / number_of_ranges; - assert!( - number_of_ranges * range_size == NUMBER_OF_SLICES, - "number_of_ranges must be a whole number divisor of numberOfSlices." - ); - let mut ranges = Vec::with_capacity(number_of_ranges as usize); - for i in 0..number_of_ranges { - ranges.push(i * range_size..i * range_size + range_size) - } - ranges -} - -// Implementation of the JDK8 string hashcode: -// https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#hashCode -fn jdk_string_hashcode(s: &str) -> i32 { - let mut hash = Wrapping(0i32); - const MULTIPLIER: Wrapping = Wrapping(31); - let count = s.len(); - if count > 0 { - let mut chars = s.chars(); - for _ in 0..count { - hash = hash * MULTIPLIER + Wrapping(chars.next().unwrap() as i32); - } - } - hash.0 -} - #[cfg(test)] mod tests { use smol_str::SmolStr; @@ -135,10 +132,11 @@ mod tests { #[test] fn test_slice_for_persistence_id() { assert_eq!( - slice_for_persistence_id(&PersistenceId::new( + PersistenceId::new( SmolStr::from("some-entity-type"), SmolStr::from("some-entity-id") - )), + ) + .slice(), 451 ); } diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 3ccf938..3ec0e1d 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -1,8 +1,8 @@ #![doc = include_str!("../README.md")] -use std::{marker::PhantomData, pin::Pin}; +use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin}; -use akka_persistence_rs::{EntityType, Offset}; +use akka_persistence_rs::{EntityType, Offset, PersistenceId}; use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, EventEnvelope}; use akka_projection_rs::SourceProvider; use async_stream::stream; @@ -11,45 +11,46 @@ use serde::{de::DeserializeOwned, Serialize}; use streambed::commit_log::{CommitLog, ConsumerOffset, Subscription, Topic}; use tokio_stream::{Stream, StreamExt}; -/// 
Source events for a projection from a Streambed commit log. +/// Source events for given slices from a Streambed commit log. pub struct CommitLogSourceProvider { commit_log: CL, consumer_group_name: String, + entity_type: EntityType, marshaler: M, + slice_range: Range, topic: Topic, phantom: PhantomData, } -impl CommitLogSourceProvider { - pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self { +impl CommitLogSourceProvider +where + CL: CommitLog, + M: CommitLogEventEnvelopeMarshaler + Sync, + for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, +{ + pub fn new( + commit_log: CL, + marshaler: M, + slice_range: Range, + consumer_group_name: &str, + topic: Topic, + entity_type: EntityType, + ) -> Self { Self { commit_log, consumer_group_name: consumer_group_name.into(), marshaler, + slice_range, topic, + entity_type, phantom: PhantomData, } } -} - -#[async_trait] -impl SourceProvider for CommitLogSourceProvider -where - CL: CommitLog, - M: CommitLogEventEnvelopeMarshaler + Sync, - for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, -{ - type Envelope = EventEnvelope; async fn events_by_slices( &self, - // The entity type means nothing in this context as we have a topic. 
- _entity_type: EntityType, - // Slices are not used with the commit log - _min_slice: u32, - _max_slice: u32, offset: Option, - ) -> Pin> + Send + 'async_trait>> { + ) -> Pin> + Send + '_>> { let offsets = if let Some(Offset::Sequence(offset)) = offset { vec![ConsumerOffset { partition: 0, @@ -76,13 +77,40 @@ where Box::pin(stream!({ while let Some(consumer_record) = records.next().await { if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { - if let Some(envelope) = - marshaler.envelope(record_entity_id, consumer_record).await - { - yield envelope; + let persistence_id = + PersistenceId::new(self.entity_type.clone(), record_entity_id); + if self.slice_range.contains(&persistence_id.slice()) { + if let Some(envelope) = marshaler + .envelope(persistence_id.entity_id, consumer_record) + .await + { + yield envelope; + } } } } })) } } + +#[async_trait] +impl SourceProvider for CommitLogSourceProvider +where + CL: CommitLog, + M: CommitLogEventEnvelopeMarshaler + Send + Sync, + for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, +{ + type Envelope = EventEnvelope; + + async fn source( + &self, + offset: F, + ) -> Pin + Send + 'async_trait>> + where + F: FnOnce() -> FR + Send, + FR: Future> + Send, + { + let offset = offset().await; + self.events_by_slices(offset).await + } +} diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index e4a98da..6bddfdc 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -1,6 +1,6 @@ -use std::{marker::PhantomData, pin::Pin}; +use std::{future::Future, marker::PhantomData, pin::Pin}; -use akka_persistence_rs::{EntityType, Offset}; +use akka_persistence_rs::Offset; use akka_projection_rs::SourceProvider; use async_trait::async_trait; use tokio_stream::Stream; @@ -18,13 +18,14 @@ where { type Envelope = EventEnvelope; - async fn events_by_slices( + async fn source( &self, - _entity_type: EntityType, - 
_min_slice: u32, - _max_slice: u32, - _offset: Option, - ) -> Pin> + Send + 'async_trait>> { + _offset: F, + ) -> Pin + Send + 'async_trait>> + where + F: FnOnce() -> FR + Send, + FR: Future> + Send, + { todo!() } } diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index ecf68e7..bc56422 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -2,7 +2,7 @@ use std::path::Path; -use akka_persistence_rs::{EntityType, Offset, WithOffset}; +use akka_persistence_rs::{Offset, WithOffset}; use akka_projection_rs::{Handler, SourceProvider}; use log::error; use serde::{Deserialize, Serialize}; @@ -34,28 +34,22 @@ pub async fn run( FSP: FnMut(u32) -> Option, SP: SourceProvider, { - // Source our last offset recorded, if we have one. - - let mut storable_state = - streambed_storage::load_struct(state_storage_path, secret_store, secret_path, |bytes| { - ciborium::de::from_reader::(bytes) - }) - .await - .unwrap_or_default(); - // For now, we're going to produce a source provider for just one slice. // When we implement the gRPC consumer, we will likely have to do more. if let Some(source_provider) = source_provider(0) { - // FIXME what to provide for entity type. - // FIXME what to provide for slice min/max. 
let mut source = source_provider - .events_by_slices( - EntityType::from(""), - 0, - 0, - storable_state.last_offset.map(Offset::Sequence), - ) + .source(|| async { + streambed_storage::load_struct( + state_storage_path, + secret_store, + secret_path, + |bytes| ciborium::de::from_reader::(bytes), + ) + .await + .ok() + .and_then(|s| s.last_offset.map(Offset::Sequence)) + }) .await; let serializer = |state: &StorableState| { @@ -69,7 +63,10 @@ pub async fn run( if let Some(envelope) = envelope { let Offset::Sequence(offset) = envelope.offset(); if handler.process(envelope).await.is_ok() { - storable_state.last_offset = Some(offset); + // FIXME: Make this a periodic task + let storable_state = StorableState { + last_offset: Some(offset) + }; if streambed_storage::save_struct( state_storage_path, secret_store, @@ -77,8 +74,7 @@ pub async fn run( serializer, rand::thread_rng, &storable_state - ) - .await.is_err() { + ).await.is_err() { error!("Cannot persist offsets"); } } diff --git a/akka-projection-rs/src/lib.rs b/akka-projection-rs/src/lib.rs index 608c7ce..0d395a1 100644 --- a/akka-projection-rs/src/lib.rs +++ b/akka-projection-rs/src/lib.rs @@ -2,9 +2,9 @@ //! Each envelope is associated with an offset representing the position in the stream. This offset is used for resuming //! the stream from that position when the projection is restarted. -use std::pin::Pin; +use std::{future::Future, pin::Pin}; -use akka_persistence_rs::{EntityType, Offset}; +use akka_persistence_rs::Offset; use async_trait::async_trait; use tokio_stream::Stream; @@ -24,40 +24,42 @@ pub trait Handler { /// Errors for event processing by a handler. pub struct SourceProviderError; -/// Provides a source of envelopes. +/// Provides a source of envelopes using slices as a query. +/// +/// A slice is deterministically defined based on the persistence id. The purpose is to +/// evenly distribute all persistence ids over the slices. 
+/// +/// The consumer can keep track of its current position in the event stream by storing the `offset` and restart the +/// query from a given `offset` after a crash/restart. +/// +/// The exact meaning of the `offset` depends on the journal and must be documented by it. It may +/// be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed +/// data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a +/// timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since +/// clocks on different machines may not be synchronized. +/// +/// In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the +/// next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned +/// an event with the given `offset`, and this invocation, no events are missed. Depending on the journal +/// implementation, this may mean that this invocation will return events that were already returned by the previous +/// invocation, including the event with the passed in `offset`. +/// +/// The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for +/// a distributed data store. The order must be documented by the journal implementation. +/// +/// The stream is not completed when it reaches the end of the currently stored events, but it continues to push new +/// events when new events are persisted. #[async_trait] pub trait SourceProvider { /// The envelope processed by the provider. type Envelope; - /// Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to - /// evenly distribute all persistence ids over the slices. 
- /// - /// The consumer can keep track of its current position in the event stream by storing the `offset` and restart the - /// query from a given `offset` after a crash/restart. - /// - /// The exact meaning of the `offset` depends on the journal and must be documented by it. It may - /// be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed - /// data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a - /// timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since - /// clocks on different machines may not be synchronized. - /// - /// In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the - /// next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned - /// an event with the given `offset`, and this invocation, no events are missed. Depending on the journal - /// implementation, this may mean that this invocation will return events that were already returned by the previous - /// invocation, including the event with the passed in `offset`. - /// - /// The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for - /// a distributed data store. The order must be documented by the journal implementation. - /// - /// The stream is not completed when it reaches the end of the currently stored events, but it continues to push new - /// events when new events are persisted. - async fn events_by_slices( + /// Given a closure that returns an offset, source envelopes. 
+ async fn source( &self, - entity_type: EntityType, - min_slice: u32, - max_slice: u32, - offset: Option, - ) -> Pin + Send + 'async_trait>>; + offset: F, + ) -> Pin + Send + 'async_trait>> + where + F: FnOnce() -> FR + Send, + FR: Future> + Send; } diff --git a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs index 57bac62..62125dc 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc}; -use akka_persistence_rs::Message; +use akka_persistence_rs::{EntityType, Message}; use akka_persistence_rs_commitlog::EventEnvelope; use akka_projection_rs::{Handler, HandlerError}; use akka_projection_rs_commitlog::CommitLogSourceProvider; @@ -54,16 +54,23 @@ pub async fn task( ) { let events_key_secret_path: Arc = Arc::from(events_key_secret_path); + // When it comes to having a projection sourced from a local + // commit log, there's little benefit in having many of them. + // We therefore manage all slices from just one projection. + let slice_ranges = akka_persistence_rs::slice_ranges(1); + // A closure to establish our source of events as a commit log.
- let source_provider = |_slice| { + let source_provider = |slice| { Some(CommitLogSourceProvider::new( commit_log.clone(), EventEnvelopeMarshaler { events_key_secret_path: events_key_secret_path.clone(), secret_store: secret_store.clone(), }, + slice_ranges.get(slice as usize).cloned()?, "iot-service-projection", Topic::from(registration::EVENTS_TOPIC), + EntityType::from(registration::EVENTS_TOPIC), )) }; From 29246a483e26eaab4d645a7864720937cdce6d7c Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 24 Aug 2023 12:35:01 +1000 Subject: [PATCH 4/4] Type corrections --- akka-persistence-rs/src/lib.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index d06755c..35f5cb4 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -118,8 +118,6 @@ pub trait WithOffset { #[cfg(test)] mod tests { - use smol_str::SmolStr; - use super::*; #[test] @@ -133,8 +131,8 @@ mod tests { fn test_slice_for_persistence_id() { assert_eq!( PersistenceId::new( - SmolStr::from("some-entity-type"), - SmolStr::from("some-entity-id") + EntityType::from("some-entity-type"), + EntityId::from("some-entity-id") ) .slice(), 451