diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index fb2a197..8701dd8 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../README.md")] -use std::{marker::PhantomData, pin::Pin}; +use std::{marker::PhantomData, ops::Range, pin::Pin}; use akka_persistence_rs::EntityType; use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, EventEnvelope}; @@ -14,16 +14,24 @@ pub struct CommitLogSourceProvider { _commit_log: CL, _consumer_group_name: String, _marshaler: M, + _slice_range: Range, _topic: Topic, phantom: PhantomData, } impl CommitLogSourceProvider { - pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self { + pub fn new( + commit_log: CL, + marshaler: M, + consumer_group_name: &str, + topic: TopicRef, + slice_range: Range, + ) -> Self { Self { _commit_log: commit_log, _consumer_group_name: consumer_group_name.into(), _marshaler: marshaler, + _slice_range: slice_range, _topic: topic.into(), phantom: PhantomData, } diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index 9773450..7f97272 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -9,9 +9,10 @@ pub enum Command { } /// Provides local file system based storage for projection offsets. -pub async fn run(_receiver: Receiver, _source_provider: SP, _handler: H) +pub async fn run(_receiver: Receiver, _source_provider: FSP, _handler: H) where H: Handler, + FSP: FnMut(u32) -> Option, SP: SourceProvider, { } diff --git a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs index 649ddf5..3cf87fc 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use akka_persistence_rs::Message; +use akka_persistence_rs::{slice_ranges, Message}; use akka_persistence_rs_commitlog::EventEnvelope; use akka_projection_rs::{Handler, HandlerError}; use akka_projection_rs_commitlog::CommitLogSourceProvider; @@ -49,18 +49,32 @@ pub async fn task( receiver: mpsc::Receiver, temperature_sender: mpsc::Sender>, ) { - // Establish our source of events as a commit log. - let source_provider = CommitLogSourceProvider::new( - commit_log, - EventEnvelopeMarshaler { - events_key_secret_path: Arc::from(events_key_secret_path), - secret_store, - }, - "iot-service-projection", - registration::EVENTS_TOPIC, - ); + let events_key_secret_path: Arc = Arc::from(events_key_secret_path); + // When it comes to having a projection sourced from a local + // commit log, there's little benefit if having many of them. + // We therefore manage all slices from just one projection. + let slice_ranges = slice_ranges(1); + + // A closure to establish our source of events as a commit log. + let source_provider = move |slice| { + Some(CommitLogSourceProvider::new( + commit_log.clone(), + EventEnvelopeMarshaler { + events_key_secret_path: events_key_secret_path.clone(), + secret_store: secret_store.clone(), + }, + "iot-service-projection", + registration::EVENTS_TOPIC, + slice_ranges.get(slice as usize).cloned()?, + )) + }; + + // Declare a handler to forward projection events on to the temperature entity. let handler = RegistrationHandler { temperature_sender }; + // Finally, start up a projection that will use Streambed storage + // to remember the offset consumed. This then permits us to restart + // from a specific point in the source given restarts. akka_projection_rs_storage::run(receiver, source_provider, handler).await }