Skip to content

Commit

Permalink
Set up the source provider using slices
Browse files Browse the repository at this point in the history
Sets up the source provider using slices.
  • Loading branch information
huntc committed Aug 22, 2023
1 parent 8b595df commit 2a2a3e3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
12 changes: 10 additions & 2 deletions akka-projection-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]

use std::{marker::PhantomData, pin::Pin};
use std::{marker::PhantomData, ops::Range, pin::Pin};

use akka_persistence_rs::EntityType;
use akka_persistence_rs_commitlog::{CommitLogEventEnvelopeMarshaler, EventEnvelope};
Expand All @@ -14,16 +14,24 @@ pub struct CommitLogSourceProvider<CL, E, M> {
_commit_log: CL,
_consumer_group_name: String,
_marshaler: M,
_slice_range: Range<u32>,
_topic: Topic,
phantom: PhantomData<E>,
}

impl<CL, E, M> CommitLogSourceProvider<CL, E, M> {
pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self {
pub fn new(
commit_log: CL,
marshaler: M,
consumer_group_name: &str,
topic: TopicRef,
slice_range: Range<u32>,
) -> Self {
Self {
_commit_log: commit_log,
_consumer_group_name: consumer_group_name.into(),
_marshaler: marshaler,
_slice_range: slice_range,
_topic: topic.into(),
phantom: PhantomData,
}
Expand Down
3 changes: 2 additions & 1 deletion akka-projection-rs-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ pub enum Command {
}

/// Provides local file system based storage for projection offsets.
pub async fn run<E, H, SP>(_receiver: Receiver<Command>, _source_provider: SP, _handler: H)
pub async fn run<E, FSP, H, SP>(_receiver: Receiver<Command>, _source_provider: FSP, _handler: H)
where
H: Handler<Envelope = E>,
FSP: FnMut(u32) -> Option<SP>,
SP: SourceProvider<Envelope = E>,
{
}
36 changes: 25 additions & 11 deletions examples/iot-service/src/registration_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::sync::Arc;

use akka_persistence_rs::Message;
use akka_persistence_rs::{slice_ranges, Message};
use akka_persistence_rs_commitlog::EventEnvelope;
use akka_projection_rs::{Handler, HandlerError};
use akka_projection_rs_commitlog::CommitLogSourceProvider;
Expand Down Expand Up @@ -49,18 +49,32 @@ pub async fn task(
receiver: mpsc::Receiver<Command>,
temperature_sender: mpsc::Sender<Message<temperature::Command>>,
) {
// Establish our source of events as a commit log.
let source_provider = CommitLogSourceProvider::new(
commit_log,
EventEnvelopeMarshaler {
events_key_secret_path: Arc::from(events_key_secret_path),
secret_store,
},
"iot-service-projection",
registration::EVENTS_TOPIC,
);
let events_key_secret_path: Arc<str> = Arc::from(events_key_secret_path);

// When it comes to having a projection sourced from a local
// commit log, there's little benefit if having many of them.
// We therefore manage all slices from just one projection.
let slice_ranges = slice_ranges(1);

// A closure to establish our source of events as a commit log.
let source_provider = move |slice| {
Some(CommitLogSourceProvider::new(
commit_log.clone(),
EventEnvelopeMarshaler {
events_key_secret_path: events_key_secret_path.clone(),
secret_store: secret_store.clone(),
},
"iot-service-projection",
registration::EVENTS_TOPIC,
slice_ranges.get(slice as usize).cloned()?,
))
};

// Declare a handler to forward projection events on to the temperature entity.
let handler = RegistrationHandler { temperature_sender };

// Finally, start up a projection that will use Streambed storage
// to remember the offset consumed. This then permits us to restart
// from a specific point in the source given restarts.
akka_projection_rs_storage::run(receiver, source_provider, handler).await
}

0 comments on commit 2a2a3e3

Please sign in to comment.