From 253d4089e45edef987898bbb8154b8765705fd6d Mon Sep 17 00:00:00 2001 From: Christopher Hunt Date: Thu, 19 Oct 2023 18:23:55 +1100 Subject: [PATCH] Save offsets on the consumer and correctly uses entity id (#53) Consumer offsets were not being saved. They now are. Also, we were incorrectly using the persistence id in place of where an entity id was expected. This was causing a parsing issue and preventing an event from being saved correctly. --- akka-projection-rs-grpc/src/consumer.rs | 40 ++++++++++++++++++------- akka-projection-rs-grpc/src/producer.rs | 2 -- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index f5a2641..037798b 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -1,4 +1,3 @@ -use akka_persistence_rs::EntityId; use akka_persistence_rs::Message as EntityMessage; use akka_persistence_rs::Offset; use akka_persistence_rs::PersistenceId; @@ -202,6 +201,8 @@ where break }; + let entity_id = persistence_id.entity_id.clone(); + // Process the sequence number. If it isn't what we expect then we go round again. let seq_nr = streamed_event.seq_nr as u64; @@ -209,7 +210,7 @@ where let (reply_to, reply_to_receiver) = oneshot::channel(); if stream_offset_store .send(EntityMessage::new( - EntityId::from(streamed_event.persistence_id.clone()), + entity_id.clone(), offset_store::Command::Get { reply_to }, )) .await @@ -294,6 +295,20 @@ where let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid|(pid, pis.seq_nr as u64))).collect(); let offset = TimestampOffset { timestamp, seen }; + // Save the offset + + if stream_offset_store + .send(EntityMessage::new( + entity_id, + offset_store::Command::Save { seq_nr }, + )) + .await + .is_err() + { + warn!("Cannot save to the offset store: {}. Aborting stream.", streamed_event.persistence_id); + break; + } + // All is well, so emit the event. yield EventEnvelope {persistence_id, seq_nr, event, offset}; @@ -473,10 +488,10 @@ mod tests { let event_seen_by = vec![(persistence_id.clone(), 1)]; let server_kill_switch = Arc::new(Notify::new()); + let offset_saved = Arc::new(Notify::new()); let task_event_time = event_time; let task_event_seen_by = event_seen_by.clone(); - let task_kill_switch = server_kill_switch.clone(); tokio::spawn(async move { Server::builder() .add_service( @@ -489,21 +504,28 @@ mod tests { ) .serve_with_shutdown( "127.0.0.1:50051".to_socket_addrs().unwrap().next().unwrap(), - task_kill_switch.notified(), + server_kill_switch.notified(), ) .await .unwrap(); }); let (offset_store, mut offset_store_receiver) = mpsc::channel(10); - let task_persistence_id = persistence_id.clone(); + let task_entity_id = entity_id.clone(); + let task_offset_saved = offset_saved.clone(); tokio::spawn(async move { while let Some(EntityMessage { entity_id, command }) = offset_store_receiver.recv().await { - if entity_id == task_persistence_id.to_string() { - if let offset_store::Command::Get { reply_to } = command { - let _ = reply_to.send(offset_store::State { last_seq_nr: 0 }); + if entity_id == task_entity_id.to_string() { + match command { + offset_store::Command::Get { reply_to } => { + let _ = reply_to.send(offset_store::State { last_seq_nr: 0 }); + } + offset_store::Command::Save { seq_nr } => { + assert_eq!(seq_nr, 1); + task_offset_saved.notify_one(); + } } } } @@ -565,7 +587,5 @@ mod tests { break; } - - server_kill_switch.notified(); } } diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index 1e734b0..339bd12 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -494,7 +494,5 @@ mod tests { assert!(!consumer_filters_receiver.borrow().is_empty()); assert!(reply_receiver.await.is_ok()); - - server_kill_switch.notified(); } }