Skip to content

Commit

Permalink
Benchmarking the projection
Browse files Browse the repository at this point in the history
Principally, I've introduced a benchmark for the Streambed storage projection. I checked heap allocations and CPU and nothing untoward was found. My machine (M1Max) yields about 10 million events per second, and we're pretty much down to the time it takes for channels and streams to process.

The benchmark did highlight that both the source provider and handler traits should operate on a mutable self, and this is now consistent with their equivalents for the entity manager.
  • Loading branch information
huntc committed Aug 25, 2023
1 parent 10830a1 commit 810259c
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 12 deletions.
4 changes: 2 additions & 2 deletions akka-projection-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
type Envelope = EventEnvelope<E>;

async fn source<F, FR>(
&self,
&mut self,
offset: F,
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>>
where
Expand Down Expand Up @@ -313,7 +313,7 @@ mod tests {

// Source that event just produced.

let source_provider = CommitLogSourceProvider::new(
let mut source_provider = CommitLogSourceProvider::new(
commit_log.clone(),
MyEventMarshaler,
"some-consumer",
Expand Down
4 changes: 2 additions & 2 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ pub struct GrpcSourceProvider<E> {
#[async_trait]
impl<E> SourceProvider for GrpcSourceProvider<E>
where
E: Sync,
E: Send + Sync,
{
type Envelope = EventEnvelope<E>;

async fn source<F, FR>(
&self,
&mut self,
_offset: F,
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>>
where
Expand Down
5 changes: 5 additions & 0 deletions akka-projection-rs-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ akka-projection-rs = { path = "../akka-projection-rs" }

[dev-dependencies]
async-stream = { workspace = true }
criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
env_logger = { workspace = true }
test-log = { workspace = true }

akka-persistence-rs-commitlog = { path = "../akka-persistence-rs-commitlog" }

[[bench]]
name = "benches"
harness = false
18 changes: 18 additions & 0 deletions akka-projection-rs-storage/benches/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Benchmarking
===

To invoke a benchmark:

```
cd akka-projection-rs-storage
cargo bench
```

The above will compare with any previous benchmark run. Thus, you can checkout a commit, run the benchmark, then
re-run having applied any changes you have made.

To instrument on OS X (requires `cargo install cargo-instruments` and the Xcode dev tools):

```
cargo instruments --bench "benches" --profile "bench-debug" -t "time" -- --bench
```
168 changes: 168 additions & 0 deletions akka-projection-rs-storage/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::{
collections::HashMap, future::Future, path::PathBuf, pin::Pin, sync::Arc, time::Duration,
};

use akka_persistence_rs::{Offset, WithOffset};
use akka_projection_rs::{Handler, HandlerError, SourceProvider};
use async_stream::stream;
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use streambed::secret_store::{
AppRoleAuthReply, Error, GetSecretReply, SecretData, SecretStore, UserPassAuthReply,
};
use tokio::sync::{mpsc, Notify};
use tokio_stream::Stream;

const NUM_EVENTS: usize = 10_000;

struct TestEnvelope {
offset: u64,
}

impl WithOffset for TestEnvelope {
fn offset(&self) -> Offset {
Offset::Sequence(self.offset)
}
}

struct TestSourceProvider;

#[async_trait]
impl SourceProvider for TestSourceProvider {
type Envelope = TestEnvelope;

async fn source<F, FR>(
&mut self,
offset: F,
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>>
where
F: Fn() -> FR + Send + Sync,
FR: Future<Output = Option<Offset>> + Send,
{
let _ = offset().await;
Box::pin(stream!(for offset in 0..NUM_EVENTS as u64 {
yield TestEnvelope { offset };
}))
}
}

struct TestHandler {
events_processed: Arc<Notify>,
}

#[async_trait]
impl Handler for TestHandler {
type Envelope = TestEnvelope;

async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> {
const LAST_OFFSET: u64 = NUM_EVENTS as u64 - 1;
if envelope.offset == LAST_OFFSET {
self.events_processed.notify_one();
}
Ok(())
}
}

#[derive(Clone)]
struct NoopSecretStore;

#[async_trait]
impl SecretStore for NoopSecretStore {
async fn approle_auth(
&self,
_role_id: &str,
_secret_id: &str,
) -> Result<AppRoleAuthReply, Error> {
panic!("should not be called")
}

async fn create_secret(
&self,
_secret_path: &str,
_secret_data: SecretData,
) -> Result<(), Error> {
panic!("should not be called")
}

async fn get_secret(&self, _secret_path: &str) -> Result<Option<GetSecretReply>, Error> {
let mut data = HashMap::new();
data.insert(
"value".to_string(),
"ed31e94c161aea6ff2300c72b17741f71b616463f294dac0542324bbdbf8a2de".to_string(),
);

Ok(Some(GetSecretReply {
lease_duration: 10,
data: SecretData { data },
}))
}

async fn token_auth(&self, _token: &str) -> Result<(), Error> {
panic!("should not be called")
}

async fn userpass_auth(
&self,
_username: &str,
_password: &str,
) -> Result<UserPassAuthReply, Error> {
panic!("should not be called")
}

async fn userpass_create_update_user(
&self,
_current_username: &str,
_username: &str,
_password: &str,
) -> Result<(), Error> {
panic!("should not be called")
}
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("project events", move |b| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();

let events_processed = Arc::new(Notify::new());
let (_registration_projection_command, registration_projection_command_receiver) =
mpsc::channel(1);

let task_events_processed = events_processed.clone();
let _ = rt.spawn(async move {
let storage_path = PathBuf::from("/dev/null");

akka_projection_rs_storage::run(
&NoopSecretStore,
&"some-secret-path",
&storage_path,
registration_projection_command_receiver,
TestSourceProvider,
TestHandler {
events_processed: task_events_processed,
},
Duration::from_secs(1), // Not testing out fs performance
)
.await
});

b.to_async(&rt).iter(|| {
let task_events_processed = events_processed.clone();
async move {
tokio::spawn(async move {
task_events_processed.notified().await;
})
.await
}
})
});
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = criterion_benchmark
}
criterion_main!(benches);
8 changes: 4 additions & 4 deletions akka-projection-rs-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ pub async fn run<E, H, SP>(
secret_path: &str,
state_storage_path: &Path,
mut receiver: Receiver<Command>,
source_provider: SP,
handler: H,
mut source_provider: SP,
mut handler: H,
min_save_offset_interval: Duration,
) where
E: WithOffset,
Expand Down Expand Up @@ -214,7 +214,7 @@ mod tests {
type Envelope = EventEnvelope<MyEvent>;

async fn source<F, FR>(
&self,
&mut self,
offset: F,
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>>
where
Expand All @@ -240,7 +240,7 @@ mod tests {
type Envelope = EventEnvelope<MyEvent>;

/// Process an envelope.
async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> {
async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> {
assert_eq!(
envelope,
EventEnvelope {
Expand Down
4 changes: 2 additions & 2 deletions akka-projection-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait Handler {
type Envelope;

/// Process an envelope.
async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError>;
async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError>;
}

/// Errors for event processing by a handler.
Expand Down Expand Up @@ -56,7 +56,7 @@ pub trait SourceProvider {

/// Given a closure that returns an offset, source envelopes.
async fn source<F, FR>(
&self,
&mut self,
offset: F,
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>>
where
Expand Down
2 changes: 1 addition & 1 deletion examples/iot-service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct Args {
}

const MAX_REGISTRATION_MANAGER_COMMANDS: usize = 10;
const MAX_REGISTRATION_PROJECTION_MANAGER_COMMANDS: usize = 10;
const MAX_REGISTRATION_PROJECTION_MANAGER_COMMANDS: usize = 1;
const MAX_TEMPERATURE_MANAGER_COMMANDS: usize = 10;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/iot-service/src/registration_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct RegistrationHandler {
impl Handler for RegistrationHandler {
type Envelope = EventEnvelope<registration::Event>;

async fn process(&self, envelope: Self::Envelope) -> Result<(), HandlerError> {
async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> {
let registration::Event::Registered { secret } = envelope.event;
self.temperature_sender
.send(Message::new(
Expand Down

0 comments on commit 810259c

Please sign in to comment.