From 810259cfb69c4efe2d5d61010a5b71585ab7a801 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 25 Aug 2023 11:42:22 +1000 Subject: [PATCH] Benchmarking the projection Principally, I've introduced a benchmark for the Streambed storage projection. I checked heap allocations and CPU and nothing untoward was found. My machine (M1Max) yields about 10 million events per second, and we're pretty much down to the time it takes for channels and streams to process. The benchmark did highlight that both the source provider and handler traits should operate on a mutable self, and this is now consistent with their equivalents for the entity manager. --- akka-projection-rs-commitlog/src/lib.rs | 4 +- akka-projection-rs-grpc/src/consumer.rs | 4 +- akka-projection-rs-storage/Cargo.toml | 5 + akka-projection-rs-storage/benches/README.md | 18 ++ akka-projection-rs-storage/benches/benches.rs | 168 ++++++++++++++++++ akka-projection-rs-storage/src/lib.rs | 8 +- akka-projection-rs/src/lib.rs | 4 +- examples/iot-service/src/main.rs | 2 +- .../src/registration_projection.rs | 2 +- 9 files changed, 203 insertions(+), 12 deletions(-) create mode 100644 akka-projection-rs-storage/benches/README.md create mode 100644 akka-projection-rs-storage/benches/benches.rs diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index d7c1fde..304878e 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -125,7 +125,7 @@ where type Envelope = EventEnvelope; async fn source( - &self, + &mut self, offset: F, ) -> Pin + Send + 'async_trait>> where @@ -313,7 +313,7 @@ mod tests { // Source that event just produced. - let source_provider = CommitLogSourceProvider::new( + let mut source_provider = CommitLogSourceProvider::new( commit_log.clone(), MyEventMarshaler, "some-consumer", diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index bd0ac85..415ee31 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -14,12 +14,12 @@ pub struct GrpcSourceProvider { #[async_trait] impl SourceProvider for GrpcSourceProvider where - E: Sync, + E: Send + Sync, { type Envelope = EventEnvelope; async fn source( - &self, + &mut self, _offset: F, ) -> Pin + Send + 'async_trait>> where diff --git a/akka-projection-rs-storage/Cargo.toml b/akka-projection-rs-storage/Cargo.toml index c621c2d..c855c0a 100644 --- a/akka-projection-rs-storage/Cargo.toml +++ b/akka-projection-rs-storage/Cargo.toml @@ -19,7 +19,12 @@ akka-projection-rs = { path = "../akka-projection-rs" } [dev-dependencies] async-stream = { workspace = true } +criterion = { workspace = true, features = ["async_tokio", "html_reports"] } env_logger = { workspace = true } test-log = { workspace = true } akka-persistence-rs-commitlog = { path = "../akka-persistence-rs-commitlog" } + +[[bench]] +name = "benches" +harness = false diff --git a/akka-projection-rs-storage/benches/README.md b/akka-projection-rs-storage/benches/README.md new file mode 100644 index 0000000..9420992 --- /dev/null +++ b/akka-projection-rs-storage/benches/README.md @@ -0,0 +1,18 @@ +Benchmarking +=== + +To invoke a benchmark: + +``` +cd akka-projection-rs-storage +cargo bench +``` + +The above will compare with any previous benchmark run. Thus, you can checkout a commit, run the benchmark, then +re-run having applied any changes you have made. + +To instrument on OS X (requires `cargo install cargo-instruments` and the Xcode dev tools): + +``` +cargo instruments --bench "benches" --profile "bench-debug" -t "time" -- --bench +``` \ No newline at end of file diff --git a/akka-projection-rs-storage/benches/benches.rs b/akka-projection-rs-storage/benches/benches.rs new file mode 100644 index 0000000..a722869 --- /dev/null +++ b/akka-projection-rs-storage/benches/benches.rs @@ -0,0 +1,168 @@ +use std::{ + collections::HashMap, future::Future, path::PathBuf, pin::Pin, sync::Arc, time::Duration, +}; + +use akka_persistence_rs::{Offset, WithOffset}; +use akka_projection_rs::{Handler, HandlerError, SourceProvider}; +use async_stream::stream; +use async_trait::async_trait; +use criterion::{criterion_group, criterion_main, Criterion}; +use streambed::secret_store::{ + AppRoleAuthReply, Error, GetSecretReply, SecretData, SecretStore, UserPassAuthReply, +}; +use tokio::sync::{mpsc, Notify}; +use tokio_stream::Stream; + +const NUM_EVENTS: usize = 10_000; + +struct TestEnvelope { + offset: u64, +} + +impl WithOffset for TestEnvelope { + fn offset(&self) -> Offset { + Offset::Sequence(self.offset) + } +} + +struct TestSourceProvider; + +#[async_trait] +impl SourceProvider for TestSourceProvider { + type Envelope = TestEnvelope; + + async fn source( + &mut self, + offset: F, + ) -> Pin + Send + 'async_trait>> + where + F: Fn() -> FR + Send + Sync, + FR: Future> + Send, + { + let _ = offset().await; + Box::pin(stream!(for offset in 0..NUM_EVENTS as u64 { + yield TestEnvelope { offset }; + })) + } +} + +struct TestHandler { + events_processed: Arc, +} + +#[async_trait] +impl Handler for TestHandler { + type Envelope = TestEnvelope; + + async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> { + const LAST_OFFSET: u64 = NUM_EVENTS as u64 - 1; + if envelope.offset == LAST_OFFSET { + self.events_processed.notify_one(); + } + Ok(()) + } +} + +#[derive(Clone)] +struct NoopSecretStore; + +#[async_trait] +impl SecretStore for NoopSecretStore { + async fn approle_auth( + &self, + _role_id: &str, + _secret_id: &str, + ) -> Result { + panic!("should not be called") + } + + async fn create_secret( + &self, + _secret_path: &str, + _secret_data: SecretData, + ) -> Result<(), Error> { + panic!("should not be called") + } + + async fn get_secret(&self, _secret_path: &str) -> Result, Error> { + let mut data = HashMap::new(); + data.insert( + "value".to_string(), + "ed31e94c161aea6ff2300c72b17741f71b616463f294dac0542324bbdbf8a2de".to_string(), + ); + + Ok(Some(GetSecretReply { + lease_duration: 10, + data: SecretData { data }, + })) + } + + async fn token_auth(&self, _token: &str) -> Result<(), Error> { + panic!("should not be called") + } + + async fn userpass_auth( + &self, + _username: &str, + _password: &str, + ) -> Result { + panic!("should not be called") + } + + async fn userpass_create_update_user( + &self, + _current_username: &str, + _username: &str, + _password: &str, + ) -> Result<(), Error> { + panic!("should not be called") + } +} + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("project events", move |b| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + let events_processed = Arc::new(Notify::new()); + let (_registration_projection_command, registration_projection_command_receiver) = + mpsc::channel(1); + + let task_events_processed = events_processed.clone(); + let _ = rt.spawn(async move { + let storage_path = PathBuf::from("/dev/null"); + + akka_projection_rs_storage::run( + &NoopSecretStore, + &"some-secret-path", + &storage_path, + registration_projection_command_receiver, + TestSourceProvider, + TestHandler { + events_processed: task_events_processed, + }, + Duration::from_secs(1), // Not testing out fs performance + ) + .await + }); + + b.to_async(&rt).iter(|| { + let task_events_processed = events_processed.clone(); + async move { + tokio::spawn(async move { + task_events_processed.notified().await; + }) + .await + } + }) + }); +} + +criterion_group! { + name = benches; + config = Criterion::default().sample_size(10); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index 5ae77c3..f6b2cc2 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -34,8 +34,8 @@ pub async fn run( secret_path: &str, state_storage_path: &Path, mut receiver: Receiver, - source_provider: SP, - handler: H, + mut source_provider: SP, + mut handler: H, min_save_offset_interval: Duration, ) where E: WithOffset, @@ -214,7 +214,7 @@ mod tests { type Envelope = EventEnvelope; async fn source( - &self, + &mut self, offset: F, ) -> Pin + Send + 'async_trait>> where @@ -240,7 +240,7 @@ mod tests { type Envelope = EventEnvelope; /// Process an envelope. - async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> { + async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> { assert_eq!( envelope, EventEnvelope { diff --git a/akka-projection-rs/src/lib.rs b/akka-projection-rs/src/lib.rs index 83f08be..45b685d 100644 --- a/akka-projection-rs/src/lib.rs +++ b/akka-projection-rs/src/lib.rs @@ -18,7 +18,7 @@ pub trait Handler { type Envelope; /// Process an envelope. - async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError>; + async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError>; } /// Errors for event processing by a handler. @@ -56,7 +56,7 @@ pub trait SourceProvider { /// Given a closure that returns an offset, source envelopes. async fn source( - &self, + &mut self, offset: F, ) -> Pin + Send + 'async_trait>> where diff --git a/examples/iot-service/src/main.rs b/examples/iot-service/src/main.rs index c0e3c5d..ff5465e 100644 --- a/examples/iot-service/src/main.rs +++ b/examples/iot-service/src/main.rs @@ -40,7 +40,7 @@ struct Args { } const MAX_REGISTRATION_MANAGER_COMMANDS: usize = 10; -const MAX_REGISTRATION_PROJECTION_MANAGER_COMMANDS: usize = 10; +const MAX_REGISTRATION_PROJECTION_MANAGER_COMMANDS: usize = 1; const MAX_TEMPERATURE_MANAGER_COMMANDS: usize = 10; #[tokio::main] diff --git a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs index 1d8e8f9..cbcdaba 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -29,7 +29,7 @@ pub struct RegistrationHandler { impl Handler for RegistrationHandler { type Envelope = EventEnvelope; - async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> { + async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> { let registration::Event::Registered { secret } = envelope.event; self.temperature_sender .send(Message::new(