Skip to content

Commit

Permalink
Save offsets on the consumer and correctly uses entity id (#53)
Browse files Browse the repository at this point in the history
Consumer offsets were not being saved. They now are.

Also, we were incorrectly using the persistence id in place of where an entity id was expected. This was causing a parsing issue and preventing an event from being saved correctly.
  • Loading branch information
huntc authored Oct 19, 2023
1 parent f3a1268 commit 253d408
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
40 changes: 30 additions & 10 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use akka_persistence_rs::EntityId;
use akka_persistence_rs::Message as EntityMessage;
use akka_persistence_rs::Offset;
use akka_persistence_rs::PersistenceId;
Expand Down Expand Up @@ -202,14 +201,16 @@ where
break
};

let entity_id = persistence_id.entity_id.clone();

// Process the sequence number. If it isn't what we expect then we go round again.

let seq_nr = streamed_event.seq_nr as u64;

let (reply_to, reply_to_receiver) = oneshot::channel();
if stream_offset_store
.send(EntityMessage::new(
EntityId::from(streamed_event.persistence_id.clone()),
entity_id.clone(),
offset_store::Command::Get { reply_to },
))
.await
Expand Down Expand Up @@ -294,6 +295,20 @@ where
let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid|(pid, pis.seq_nr as u64))).collect();
let offset = TimestampOffset { timestamp, seen };

// Save the offset

if stream_offset_store
.send(EntityMessage::new(
entity_id,
offset_store::Command::Save { seq_nr },
))
.await
.is_err()
{
warn!("Cannot save to the offset store: {}. Aborting stream.", streamed_event.persistence_id);
break;
}

// All is well, so emit the event.

yield EventEnvelope {persistence_id, seq_nr, event, offset};
Expand Down Expand Up @@ -473,10 +488,10 @@ mod tests {
let event_seen_by = vec![(persistence_id.clone(), 1)];

let server_kill_switch = Arc::new(Notify::new());
let offset_saved = Arc::new(Notify::new());

let task_event_time = event_time;
let task_event_seen_by = event_seen_by.clone();
let task_kill_switch = server_kill_switch.clone();
tokio::spawn(async move {
Server::builder()
.add_service(
Expand All @@ -489,21 +504,28 @@ mod tests {
)
.serve_with_shutdown(
"127.0.0.1:50051".to_socket_addrs().unwrap().next().unwrap(),
task_kill_switch.notified(),
server_kill_switch.notified(),
)
.await
.unwrap();
});

let (offset_store, mut offset_store_receiver) = mpsc::channel(10);
let task_persistence_id = persistence_id.clone();
let task_entity_id = entity_id.clone();
let task_offset_saved = offset_saved.clone();
tokio::spawn(async move {
while let Some(EntityMessage { entity_id, command }) =
offset_store_receiver.recv().await
{
if entity_id == task_persistence_id.to_string() {
if let offset_store::Command::Get { reply_to } = command {
let _ = reply_to.send(offset_store::State { last_seq_nr: 0 });
if entity_id == task_entity_id.to_string() {
match command {
offset_store::Command::Get { reply_to } => {
let _ = reply_to.send(offset_store::State { last_seq_nr: 0 });
}
offset_store::Command::Save { seq_nr } => {
assert_eq!(seq_nr, 1);
task_offset_saved.notify_one();
}
}
}
}
Expand Down Expand Up @@ -565,7 +587,5 @@ mod tests {

break;
}

server_kill_switch.notified();
}
}
2 changes: 0 additions & 2 deletions akka-projection-rs-grpc/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,5 @@ mod tests {
assert!(!consumer_filters_receiver.borrow().is_empty());

assert!(reply_receiver.await.is_ok());

server_kill_switch.notified();
}
}

0 comments on commit 253d408

Please sign in to comment.