From ff93e315ca5577d1234b906733af528a4280f219 Mon Sep 17 00:00:00 2001 From: huntc Date: Wed, 15 Nov 2023 13:59:33 +1100 Subject: [PATCH 1/3] Optional offset store This commit permits a projection consumer to optionally receive an offset store. Where it is not supplied, the source is requested to provided all of the events that it has. --- akka-projection-rs/benches/benches.rs | 9 +- akka-projection-rs/src/consumer.rs | 147 ++++++++++++++------------ 2 files changed, 85 insertions(+), 71 deletions(-) diff --git a/akka-projection-rs/benches/benches.rs b/akka-projection-rs/benches/benches.rs index b434add..8a5685b 100644 --- a/akka-projection-rs/benches/benches.rs +++ b/akka-projection-rs/benches/benches.rs @@ -10,7 +10,7 @@ use akka_projection_rs::{ use async_stream::stream; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::Notify; use tokio_stream::Stream; const NUM_EVENTS: usize = 10_000; @@ -106,18 +106,15 @@ fn criterion_benchmark(c: &mut Criterion) { .unwrap(); let events_processed = Arc::new(Notify::new()); - let (offset_store, mut offset_store_receiver) = mpsc::channel(1); - let offset_store_task = - async move { while let Some(_) = offset_store_receiver.recv().await {} }; let (projection_task, _kill_switch) = consumer::task( - offset_store, + None, TestSourceProvider, TestHandler { events_processed: events_processed.clone(), }, ); - let _ = rt.spawn(async move { tokio::join!(offset_store_task, projection_task) }); + let _ = rt.spawn(projection_task); b.to_async(&rt).iter(|| { let task_events_processed = events_processed.clone(); diff --git a/akka-projection-rs/src/consumer.rs b/akka-projection-rs/src/consumer.rs index 445e7ce..a2f0bb3 100644 --- a/akka-projection-rs/src/consumer.rs +++ b/akka-projection-rs/src/consumer.rs @@ -26,7 +26,7 @@ struct StorableState { /// meaning, for multiple runs of a projection, it is possible for events to repeat /// from previous runs. pub fn task( - offset_store: mpsc::Sender, + offset_store: Option>, source_provider: SP, handler: IH, ) -> (impl Future, oneshot::Sender<()>) @@ -49,13 +49,20 @@ where 'outer: loop { let mut source = source_provider - .source(|| async { - let (reply_to, reply_to_receiver) = oneshot::channel(); - offset_store - .send(offset_store::Command::GetLastOffset { reply_to }) - .await - .ok()?; - reply_to_receiver.await.ok()? + .source(|| { + let offset_store = offset_store.clone(); + async { + if let Some(offset_store) = offset_store { + let (reply_to, reply_to_receiver) = oneshot::channel(); + offset_store + .send(offset_store::Command::GetLastOffset { reply_to }) + .await + .ok()?; + reply_to_receiver.await.ok()? + } else { + None + } + } }) .await; @@ -75,60 +82,64 @@ where // Validate timestamp offsets if we have one. let envelope = if matches!(offset, Offset::Timestamp(_)) { - // Process the sequence number. If it isn't what we expect then we go round again. + if let Some(offset_store) = &offset_store { + // Process the sequence number. If it isn't what we expect then we go round again. - let seq_nr = envelope.seq_nr(); + let seq_nr = envelope.seq_nr(); - let (reply_to, reply_to_receiver) = oneshot::channel(); - if offset_store - .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to }) - .await - .is_err() - { - warn!("Cannot send to the offset store: {}. 
Aborting stream.", persistence_id); - break; - } - - let next_seq_nr = if let Ok(offset) = reply_to_receiver.await { - if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset { - seq_nr.wrapping_add(1) - } else { - 1 + let (reply_to, reply_to_receiver) = oneshot::channel(); + if offset_store + .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to }) + .await + .is_err() + { + warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id); + break; } - } else { - warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id); - break - }; - let source = envelope.source(); + let next_seq_nr = if let Ok(offset) = reply_to_receiver.await { + if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset { + seq_nr.wrapping_add(1) + } else { + 1 + } + } else { + warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id); + break + }; - if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack { - // This shouldn't happen, if so then abort. - warn!("Back track received for a future event: {}. Aborting stream.", persistence_id); - break; - } else if seq_nr != next_seq_nr { - // Duplicate or gap - continue; - } + let source = envelope.source(); - // If the sequence number is what we expect and the producer is backtracking, then - // request its payload. If we can't get its payload then we abort as it is an error. + if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack { + // This shouldn't happen, if so then abort. + warn!("Back track received for a future event: {}. Aborting stream.", persistence_id); + break; + } else if seq_nr != next_seq_nr { + // Duplicate or gap + continue; + } - let resolved_envelope = if source == Source::Backtrack { - if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr) - .await - { - Some(event) + // If the sequence number is what we expect and the producer is backtracking, then + // request its payload. If we can't get its payload then we abort as it is an error. + + let resolved_envelope = if source == Source::Backtrack { + if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr) + .await + { + Some(event) + } else { + warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id); + None + } } else { - warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id); - None - } - } else { - Some(envelope) - }; + Some(envelope) + }; - let Some(envelope) = resolved_envelope else { break; }; - envelope + let Some(envelope) = resolved_envelope else { break; }; + envelope + } else { + envelope + } } else { envelope }; @@ -142,12 +153,14 @@ where break; } } - if offset_store - .send(offset_store::Command::SaveOffset { persistence_id, offset }) - .await - .is_err() - { - break; + if let Some(offset_store) = &offset_store { + if offset_store + .send(offset_store::Command::SaveOffset { persistence_id, offset }) + .await + .is_err() + { + break; + } } } Handlers::Pending(handler, _) => { @@ -180,13 +193,17 @@ where active_source = &mut source; // If all is well with our pending future so we can finally cause the offset to be persisted. 
- if pending.is_err() - || offset_store + if pending.is_err() { + break; + } + + if let Some(offset_store) = &offset_store { + if offset_store .send(offset_store::Command::SaveOffset { persistence_id, offset }) .await - .is_err() - { - break; + .is_err() { + break; + } } } @@ -401,7 +418,7 @@ mod tests { let (offset_store, mut offset_store_receiver) = mpsc::channel(1); let task_persistence_id = persistence_id.clone(); let (projection_task, _kill_switch) = task( - offset_store, + Some(offset_store), MySourceProvider { persistence_id: task_persistence_id.clone(), event_value: event_value.clone(), From 3f4eae6ff3076ea865728ffed425dbdd7fc22ada Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 16 Nov 2023 09:31:31 +1100 Subject: [PATCH 2/3] Revert "Optional offset store" This reverts commit ff93e315ca5577d1234b906733af528a4280f219. --- akka-projection-rs/benches/benches.rs | 9 +- akka-projection-rs/src/consumer.rs | 147 ++++++++++++-------------- 2 files changed, 71 insertions(+), 85 deletions(-) diff --git a/akka-projection-rs/benches/benches.rs b/akka-projection-rs/benches/benches.rs index 8a5685b..b434add 100644 --- a/akka-projection-rs/benches/benches.rs +++ b/akka-projection-rs/benches/benches.rs @@ -10,7 +10,7 @@ use akka_projection_rs::{ use async_stream::stream; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::sync::Notify; +use tokio::sync::{mpsc, Notify}; use tokio_stream::Stream; const NUM_EVENTS: usize = 10_000; @@ -106,15 +106,18 @@ fn criterion_benchmark(c: &mut Criterion) { .unwrap(); let events_processed = Arc::new(Notify::new()); + let (offset_store, mut offset_store_receiver) = mpsc::channel(1); + let offset_store_task = + async move { while let Some(_) = offset_store_receiver.recv().await {} }; let (projection_task, _kill_switch) = consumer::task( - None, + offset_store, TestSourceProvider, TestHandler { events_processed: events_processed.clone(), }, ); - let _ = rt.spawn(projection_task); + let _ = rt.spawn(async move { tokio::join!(offset_store_task, projection_task) }); b.to_async(&rt).iter(|| { let task_events_processed = events_processed.clone(); diff --git a/akka-projection-rs/src/consumer.rs b/akka-projection-rs/src/consumer.rs index a2f0bb3..445e7ce 100644 --- a/akka-projection-rs/src/consumer.rs +++ b/akka-projection-rs/src/consumer.rs @@ -26,7 +26,7 @@ struct StorableState { /// meaning, for multiple runs of a projection, it is possible for events to repeat /// from previous runs. pub fn task( - offset_store: Option>, + offset_store: mpsc::Sender, source_provider: SP, handler: IH, ) -> (impl Future, oneshot::Sender<()>) @@ -49,20 +49,13 @@ where 'outer: loop { let mut source = source_provider - .source(|| { - let offset_store = offset_store.clone(); - async { - if let Some(offset_store) = offset_store { - let (reply_to, reply_to_receiver) = oneshot::channel(); - offset_store - .send(offset_store::Command::GetLastOffset { reply_to }) - .await - .ok()?; - reply_to_receiver.await.ok()? - } else { - None - } - } + .source(|| async { + let (reply_to, reply_to_receiver) = oneshot::channel(); + offset_store + .send(offset_store::Command::GetLastOffset { reply_to }) + .await + .ok()?; + reply_to_receiver.await.ok()? }) .await; @@ -82,64 +75,60 @@ where // Validate timestamp offsets if we have one. let envelope = if matches!(offset, Offset::Timestamp(_)) { - if let Some(offset_store) = &offset_store { - // Process the sequence number. If it isn't what we expect then we go round again. + // Process the sequence number. 
If it isn't what we expect then we go round again. - let seq_nr = envelope.seq_nr(); + let seq_nr = envelope.seq_nr(); - let (reply_to, reply_to_receiver) = oneshot::channel(); - if offset_store - .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to }) - .await - .is_err() - { - warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id); - break; - } + let (reply_to, reply_to_receiver) = oneshot::channel(); + if offset_store + .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to }) + .await + .is_err() + { + warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id); + break; + } - let next_seq_nr = if let Ok(offset) = reply_to_receiver.await { - if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset { - seq_nr.wrapping_add(1) - } else { - 1 - } + let next_seq_nr = if let Ok(offset) = reply_to_receiver.await { + if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset { + seq_nr.wrapping_add(1) } else { - warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id); - break - }; + 1 + } + } else { + warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id); + break + }; - let source = envelope.source(); + let source = envelope.source(); - if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack { - // This shouldn't happen, if so then abort. - warn!("Back track received for a future event: {}. Aborting stream.", persistence_id); - break; - } else if seq_nr != next_seq_nr { - // Duplicate or gap - continue; - } + if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack { + // This shouldn't happen, if so then abort. + warn!("Back track received for a future event: {}. Aborting stream.", persistence_id); + break; + } else if seq_nr != next_seq_nr { + // Duplicate or gap + continue; + } - // If the sequence number is what we expect and the producer is backtracking, then - // request its payload. If we can't get its payload then we abort as it is an error. - - let resolved_envelope = if source == Source::Backtrack { - if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr) - .await - { - Some(event) - } else { - warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id); - None - } - } else { - Some(envelope) - }; + // If the sequence number is what we expect and the producer is backtracking, then + // request its payload. If we can't get its payload then we abort as it is an error. - let Some(envelope) = resolved_envelope else { break; }; - envelope + let resolved_envelope = if source == Source::Backtrack { + if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr) + .await + { + Some(event) + } else { + warn!("Cannot obtain an backtrack envelope: {}. 
Aborting stream.", persistence_id); + None + } } else { - envelope - } + Some(envelope) + }; + + let Some(envelope) = resolved_envelope else { break; }; + envelope } else { envelope }; @@ -153,14 +142,12 @@ where break; } } - if let Some(offset_store) = &offset_store { - if offset_store - .send(offset_store::Command::SaveOffset { persistence_id, offset }) - .await - .is_err() - { - break; - } + if offset_store + .send(offset_store::Command::SaveOffset { persistence_id, offset }) + .await + .is_err() + { + break; } } Handlers::Pending(handler, _) => { @@ -193,17 +180,13 @@ where active_source = &mut source; // If all is well with our pending future so we can finally cause the offset to be persisted. - if pending.is_err() { - break; - } - - if let Some(offset_store) = &offset_store { - if offset_store + if pending.is_err() + || offset_store .send(offset_store::Command::SaveOffset { persistence_id, offset }) .await - .is_err() { - break; - } + .is_err() + { + break; } } @@ -418,7 +401,7 @@ mod tests { let (offset_store, mut offset_store_receiver) = mpsc::channel(1); let task_persistence_id = persistence_id.clone(); let (projection_task, _kill_switch) = task( - Some(offset_store), + offset_store, MySourceProvider { persistence_id: task_persistence_id.clone(), event_value: event_value.clone(), From 68a079cd2900b58ae0451d3bdb4a326a90f51d58 Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 16 Nov 2023 12:11:12 +1100 Subject: [PATCH 3/3] Provides a volatile offset store ...and joins the offset store with the consumer for convenience. Also saves creating another task for the projection as these two things will always go hand-in-hand. --- .../src/offset_store.rs | 3 +- akka-projection-rs/benches/benches.rs | 11 +- akka-projection-rs/src/consumer.rs | 22 +++- akka-projection-rs/src/lib.rs | 1 + .../src/volatile_offset_store.rs | 115 ++++++++++++++++++ 5 files changed, 138 insertions(+), 14 deletions(-) create mode 100644 akka-projection-rs/src/volatile_offset_store.rs diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs index 9f5046a..1ff4b40 100644 --- a/akka-projection-rs-commitlog/src/offset_store.rs +++ b/akka-projection-rs-commitlog/src/offset_store.rs @@ -367,11 +367,10 @@ pub fn task( #[cfg(test)] mod tests { - use std::{env, fs}; - use super::*; use akka_persistence_rs::TimestampOffset; + use std::{env, fs}; use test_log::test; #[test(tokio::test)] diff --git a/akka-projection-rs/benches/benches.rs b/akka-projection-rs/benches/benches.rs index b434add..3890b83 100644 --- a/akka-projection-rs/benches/benches.rs +++ b/akka-projection-rs/benches/benches.rs @@ -5,12 +5,13 @@ use akka_persistence_rs::{ WithSource, }; use akka_projection_rs::{ - consumer, offset_store::LastOffset, Handler, HandlerError, SourceProvider, + consumer, offset_store::LastOffset, volatile_offset_store, Handler, HandlerError, + SourceProvider, }; use async_stream::stream; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::Notify; use tokio_stream::Stream; const NUM_EVENTS: usize = 10_000; @@ -106,9 +107,7 @@ fn criterion_benchmark(c: &mut Criterion) { .unwrap(); let events_processed = Arc::new(Notify::new()); - let (offset_store, mut offset_store_receiver) = mpsc::channel(1); - let offset_store_task = - async move { while let Some(_) = offset_store_receiver.recv().await {} }; + let offset_store = volatile_offset_store::task(1); let (projection_task, _kill_switch) = 
consumer::task( offset_store, TestSourceProvider, @@ -117,7 +116,7 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); - let _ = rt.spawn(async move { tokio::join!(offset_store_task, projection_task) }); + let _ = rt.spawn(projection_task); b.to_async(&rt).iter(|| { let task_events_processed = events_processed.clone(); diff --git a/akka-projection-rs/src/consumer.rs b/akka-projection-rs/src/consumer.rs index 445e7ce..ff98af0 100644 --- a/akka-projection-rs/src/consumer.rs +++ b/akka-projection-rs/src/consumer.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../README.md")] -use std::{collections::VecDeque, pin::Pin}; +use std::{collections::VecDeque, io, pin::Pin}; use crate::{ offset_store::{self}, @@ -26,10 +26,13 @@ struct StorableState { /// meaning, for multiple runs of a projection, it is possible for events to repeat /// from previous runs. pub fn task( - offset_store: mpsc::Sender, + offset_store: ( + impl Future>, + mpsc::Sender, + ), source_provider: SP, handler: IH, -) -> (impl Future, oneshot::Sender<()>) +) -> (impl Future>, oneshot::Sender<()>) where A: Handler + Send, B: PendingHandler + Send, @@ -40,7 +43,9 @@ where { let (kill_switch, mut kill_switch_receiver) = oneshot::channel(); - let task = async move { + let (offset_store_task, offset_store) = offset_store; + + let projection_task = async move { let mut handler = handler.into(); let mut always_pending_handler: Pin< @@ -203,7 +208,12 @@ where } }; - (task, kill_switch) + let combined_task = async { + let (_, r) = tokio::join!(projection_task, offset_store_task); + r + }; + + (combined_task, kill_switch) } #[cfg(test)] @@ -401,7 +411,7 @@ mod tests { let (offset_store, mut offset_store_receiver) = mpsc::channel(1); let task_persistence_id = persistence_id.clone(); let (projection_task, _kill_switch) = task( - offset_store, + (future::pending(), offset_store), MySourceProvider { persistence_id: task_persistence_id.clone(), event_value: event_value.clone(), diff --git a/akka-projection-rs/src/lib.rs b/akka-projection-rs/src/lib.rs index 438682c..d332a71 100644 --- a/akka-projection-rs/src/lib.rs +++ b/akka-projection-rs/src/lib.rs @@ -10,6 +10,7 @@ use tokio_stream::Stream; pub mod consumer; pub mod consumer_filter; pub mod offset_store; +pub mod volatile_offset_store; /// Captures the various types of handlers and the way they are performed. pub enum Handlers diff --git a/akka-projection-rs/src/volatile_offset_store.rs b/akka-projection-rs/src/volatile_offset_store.rs new file mode 100644 index 0000000..1977601 --- /dev/null +++ b/akka-projection-rs/src/volatile_offset_store.rs @@ -0,0 +1,115 @@ +//! A volatile offset store is provided for situations where +//! events are always sourced events from their earliest offset. +//! An example use case is when events are queried over HTTP from +//! a web browser that does not retain the offset where it is up to. +//! +//! All offset data for a given persistence id is retained in +//! memory. + +use std::{collections::HashMap, io}; + +use futures::Future; +use tokio::sync::mpsc; + +use crate::offset_store; + +/// Provides an asynchronous task and a command channel that can run and drive an in-memory offset store. 
+pub fn task( + keys_expected: usize, +) -> ( + impl Future>, + mpsc::Sender, +) { + let (sender, mut receiver) = mpsc::channel(1); + let task = async move { + let mut offsets = HashMap::with_capacity(keys_expected); + while let Some(command) = receiver.recv().await { + match command { + offset_store::Command::GetLastOffset { reply_to } => { + let _ = reply_to.send(None); + offsets.clear(); + } + offset_store::Command::GetOffset { + persistence_id, + reply_to, + } => { + let _ = reply_to.send(offsets.get(&persistence_id).cloned()); + } + offset_store::Command::SaveOffset { + persistence_id, + offset, + } => { + offsets + .entry(persistence_id) + .and_modify(|v| *v = offset.clone()) + .or_insert(offset); + } + } + } + Ok(()) + }; + (task, sender) +} + +#[cfg(test)] +mod tests { + use super::*; + + use akka_persistence_rs::{EntityId, EntityType, Offset, PersistenceId}; + use test_log::test; + use tokio::sync::oneshot; + + #[test(tokio::test)] + async fn test_basic_ops() { + let (task, commands) = task(1); + + tokio::spawn(task); + + let (reply_to, reply_to_receiver) = oneshot::channel(); + assert!(commands + .send(offset_store::Command::GetLastOffset { reply_to }) + .await + .is_ok()); + assert_eq!(reply_to_receiver.await, Ok(None)); + + let persistence_id = + PersistenceId::new(EntityType::from("entity-type"), EntityId::from("entity-id")); + + let offset = Offset::Sequence(10); + + assert!(commands + .send(offset_store::Command::SaveOffset { + persistence_id: persistence_id.clone(), + offset: offset.clone() + }) + .await + .is_ok()); + + let (reply_to, reply_to_receiver) = oneshot::channel(); + assert!(commands + .send(offset_store::Command::GetOffset { + persistence_id: persistence_id.clone(), + reply_to + }) + .await + .is_ok()); + assert_eq!(reply_to_receiver.await, Ok(Some(offset))); + + let (reply_to, reply_to_receiver) = oneshot::channel(); + assert!(commands + .send(offset_store::Command::GetLastOffset { reply_to }) + .await + .is_ok()); + assert_eq!(reply_to_receiver.await, Ok(None)); + + let (reply_to, reply_to_receiver) = oneshot::channel(); + assert!(commands + .send(offset_store::Command::GetOffset { + persistence_id, + reply_to + }) + .await + .is_ok()); + assert_eq!(reply_to_receiver.await, Ok(None)); + } +}
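
For illustration, a minimal sketch of wiring the volatile offset store from PATCH 3/3 into a projection, following the shapes used in the updated benches; it is not part of the commits above. `MySourceProvider` and `MyHandler` are hypothetical stand-ins for application-specific `SourceProvider` and `Handler` implementations, and the capacity passed to `volatile_offset_store::task` is an arbitrary example value.

    use akka_projection_rs::{consumer, volatile_offset_store};

    #[tokio::main]
    async fn main() {
        // Size the in-memory store for the number of distinct persistence ids
        // expected; 100 here is only an illustrative value.
        let offset_store = volatile_offset_store::task(100);

        // `MySourceProvider` and `MyHandler` are hypothetical placeholders for
        // application-specific `SourceProvider` and `Handler` implementations.
        let (projection_task, _kill_switch) = consumer::task(
            offset_store,
            MySourceProvider,
            MyHandler,
        );

        // A single task now drives both the projection and the offset store,
        // because `consumer::task` joins the two futures internally.
        let _ = projection_task.await;
    }

Because the volatile store replies `None` to `GetLastOffset` and clears its in-memory state, each run of the projection re-sources events from the earliest offset, as described in the module documentation of `volatile_offset_store.rs`.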