Volatile offset store #128

Merged 3 commits on Nov 16, 2023
Changes from 1 commit
9 changes: 3 additions & 6 deletions akka-projection-rs/benches/benches.rs
@@ -10,7 +10,7 @@ use akka_projection_rs::{
 use async_stream::stream;
 use async_trait::async_trait;
 use criterion::{criterion_group, criterion_main, Criterion};
-use tokio::sync::{mpsc, Notify};
+use tokio::sync::Notify;
 use tokio_stream::Stream;
 
 const NUM_EVENTS: usize = 10_000;

@@ -106,18 +106,15 @@ fn criterion_benchmark(c: &mut Criterion) {
             .unwrap();
 
         let events_processed = Arc::new(Notify::new());
-        let (offset_store, mut offset_store_receiver) = mpsc::channel(1);
-        let offset_store_task =
-            async move { while let Some(_) = offset_store_receiver.recv().await {} };
         let (projection_task, _kill_switch) = consumer::task(
-            offset_store,
+            None,
             TestSourceProvider,
             TestHandler {
                 events_processed: events_processed.clone(),
             },
         );
 
-        let _ = rt.spawn(async move { tokio::join!(offset_store_task, projection_task) });
+        let _ = rt.spawn(projection_task);
 
         b.to_async(&rt).iter(|| {
             let task_events_processed = events_processed.clone();

147 changes: 82 additions & 65 deletions akka-projection-rs/src/consumer.rs
@@ -26,7 +26,7 @@ struct StorableState
 /// meaning, for multiple runs of a projection, it is possible for events to repeat
 /// from previous runs.
 pub fn task<A, B, Envelope, EE, IH, SP>(
-    offset_store: mpsc::Sender<offset_store::Command>,
+    offset_store: Option<mpsc::Sender<offset_store::Command>>,
     source_provider: SP,
     handler: IH,
 ) -> (impl Future<Output = ()>, oneshot::Sender<()>)

@@ -49,13 +49,20 @@ where
 
     'outer: loop {
         let mut source = source_provider
-            .source(|| async {
-                let (reply_to, reply_to_receiver) = oneshot::channel();
-                offset_store
-                    .send(offset_store::Command::GetLastOffset { reply_to })
-                    .await
-                    .ok()?;
-                reply_to_receiver.await.ok()?
+            .source(|| {
+                let offset_store = offset_store.clone();
+                async {
+                    if let Some(offset_store) = offset_store {
+                        let (reply_to, reply_to_receiver) = oneshot::channel();
+                        offset_store
+                            .send(offset_store::Command::GetLastOffset { reply_to })
+                            .await
+                            .ok()?;
+                        reply_to_receiver.await.ok()?
+                    } else {
+                        None
+                    }
+                }
             })
             .await;

@@ -75,60 +82,64 @@
 
             // Validate timestamp offsets if we have one.
             let envelope = if matches!(offset, Offset::Timestamp(_)) {
-                // Process the sequence number. If it isn't what we expect then we go round again.
-
-                let seq_nr = envelope.seq_nr();
-
-                let (reply_to, reply_to_receiver) = oneshot::channel();
-                if offset_store
-                    .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to })
-                    .await
-                    .is_err()
-                {
-                    warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id);
-                    break;
-                }
-
-                let next_seq_nr = if let Ok(offset) = reply_to_receiver.await {
-                    if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset {
-                        seq_nr.wrapping_add(1)
-                    } else {
-                        1
-                    }
-                } else {
-                    warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id);
-                    break
-                };
-
-                let source = envelope.source();
-
-                if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack {
-                    // This shouldn't happen, if so then abort.
-                    warn!("Back track received for a future event: {}. Aborting stream.", persistence_id);
-                    break;
-                } else if seq_nr != next_seq_nr {
-                    // Duplicate or gap
-                    continue;
-                }
-
-                // If the sequence number is what we expect and the producer is backtracking, then
-                // request its payload. If we can't get its payload then we abort as it is an error.
-
-                let resolved_envelope = if source == Source::Backtrack {
-                    if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr)
-                        .await
-                    {
-                        Some(event)
-                    } else {
-                        warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id);
-                        None
-                    }
-                } else {
-                    Some(envelope)
-                };
-
-                let Some(envelope) = resolved_envelope else { break; };
-                envelope
+                if let Some(offset_store) = &offset_store {
+                    // Process the sequence number. If it isn't what we expect then we go round again.
+
+                    let seq_nr = envelope.seq_nr();
+
+                    let (reply_to, reply_to_receiver) = oneshot::channel();
+                    if offset_store
+                        .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to })
+                        .await
+                        .is_err()
+                    {
+                        warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id);
+                        break;
+                    }
+
+                    let next_seq_nr = if let Ok(offset) = reply_to_receiver.await {
+                        if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset {
+                            seq_nr.wrapping_add(1)
+                        } else {
+                            1
+                        }
+                    } else {
+                        warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id);
+                        break
+                    };
+
+                    let source = envelope.source();
+
+                    if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack {
+                        // This shouldn't happen, if so then abort.
+                        warn!("Back track received for a future event: {}. Aborting stream.", persistence_id);
+                        break;
+                    } else if seq_nr != next_seq_nr {
+                        // Duplicate or gap
+                        continue;
+                    }
+
+                    // If the sequence number is what we expect and the producer is backtracking, then
+                    // request its payload. If we can't get its payload then we abort as it is an error.
+
+                    let resolved_envelope = if source == Source::Backtrack {
+                        if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr)
+                            .await
+                        {
+                            Some(event)
+                        } else {
+                            warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id);
+                            None
+                        }
+                    } else {
+                        Some(envelope)
+                    };
+
+                    let Some(envelope) = resolved_envelope else { break; };
+                    envelope
+                } else {
+                    envelope
+                }
             } else {
                 envelope
             };

Review comments on this change:

Member:
Using no offset store together with remote gRPC projections will produce weird results, but I guess we don't have an easy way to fail fast for that "unsupported" combination? Maybe we should fail here anyway when the offset is an Offset::Timestamp, with an error message saying that an offset store is required when using gRPC projections?
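
For illustration, a minimal sketch of the kind of fail-fast guard being suggested, using stand-in types: the Offset enum, the offset_supported helper, and the log wording below are hypothetical, not the crate's actual API (the real code logs via warn! and breaks out of the stream loop).

// Hypothetical stand-in for the crate's Offset type; only its shape matters here.
enum Offset {
    Sequence(u64),
    Timestamp(u64),
}

// Report the unsupported combination so the caller can abort the stream:
// timestamp (gRPC) offsets need an offset store to deduplicate and order events.
fn offset_supported(offset: &Offset, have_offset_store: bool) -> bool {
    match offset {
        Offset::Timestamp(_) if !have_offset_store => {
            eprintln!("An offset store is required when using gRPC projections. Aborting stream.");
            false
        }
        _ => true,
    }
}

fn main() {
    assert!(offset_supported(&Offset::Sequence(1), false));
    assert!(!offset_supported(&Offset::Timestamp(42), false));
}

As it turned out, the merged change didn't need this guard, since an offset store always ends up being supplied (see the UPDATE below).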

Collaborator Author:
Depends what we mean by fail... I want to avoid panicking, as the source can be from anywhere, including something remote. We don't want remote providers of events to be able to halt our process.

Collaborator Author:
Sorry, looking further... we can fail as we do for other conditions such as when there's a problem with the offset store. The existing failures were considered in the light of being transient, but I think I've already broken that rule.

Collaborator Author:
UPDATE: offset stores are always provided as before. I've introduced a new volatile_offset_store::task function to produce an in-memory store designed to always start from the beginning of a journal. The type of offset is immaterial.
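
As a rough illustration of what such a volatile (in-memory) offset store amounts to, here is a sketch driven by the Command variants visible in this diff (GetLastOffset, GetOffset, SaveOffset). The PersistenceId and Offset stand-ins, the function name, and the channel capacity are assumptions for the sketch, not the actual volatile_offset_store::task API.

use std::collections::HashMap;

use tokio::sync::{mpsc, oneshot};

// Stand-ins: the real crate has richer persistence id and offset types.
type PersistenceId = String;

#[derive(Clone)]
enum Offset {
    Sequence(u64),
}

// Mirrors the offset_store::Command usage seen in consumer.rs above.
enum Command {
    GetLastOffset {
        reply_to: oneshot::Sender<Option<Offset>>,
    },
    GetOffset {
        persistence_id: PersistenceId,
        reply_to: oneshot::Sender<Option<Offset>>,
    },
    SaveOffset {
        persistence_id: PersistenceId,
        offset: Offset,
    },
}

// Offsets live only in memory, so every run of the projection starts from the
// beginning of the journal - the "volatile" behaviour described above.
async fn volatile_offset_store(mut receiver: mpsc::Receiver<Command>) {
    let mut offsets: HashMap<PersistenceId, Offset> = HashMap::new();
    let mut last_offset: Option<Offset> = None;
    while let Some(command) = receiver.recv().await {
        match command {
            Command::GetLastOffset { reply_to } => {
                let _ = reply_to.send(last_offset.clone());
            }
            Command::GetOffset { persistence_id, reply_to } => {
                let _ = reply_to.send(offsets.get(&persistence_id).cloned());
            }
            Command::SaveOffset { persistence_id, offset } => {
                last_offset = Some(offset.clone());
                offsets.insert(persistence_id, offset);
            }
        }
    }
}

// Requires tokio with the rt, macros and sync features (e.g. features = ["full"]).
#[tokio::main]
async fn main() {
    let (offset_store, receiver) = mpsc::channel(16);
    tokio::spawn(volatile_offset_store(receiver));

    // Nothing has been saved yet, so the store reports no offset and the
    // projection would source events from the start of the journal.
    let (reply_to, reply) = oneshot::channel();
    let _ = offset_store
        .send(Command::GetOffset { persistence_id: "entity-1".to_string(), reply_to })
        .await;
    assert!(reply.await.unwrap().is_none());

    // Offsets saved during a run are visible to later queries, but are lost on restart.
    let _ = offset_store
        .send(Command::SaveOffset { persistence_id: "entity-1".to_string(), offset: Offset::Sequence(1) })
        .await;
    let (reply_to, reply) = oneshot::channel();
    let _ = offset_store.send(Command::GetLastOffset { reply_to }).await;
    assert!(reply.await.unwrap().is_some());
}

The real volatile_offset_store::task presumably wraps something like this loop and hands back the mpsc::Sender so it can be passed to consumer::task just as a persistent offset store would be.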

@@ -142,12 +153,14 @@
                         break;
                     }
                 }
-                if offset_store
-                    .send(offset_store::Command::SaveOffset { persistence_id, offset })
-                    .await
-                    .is_err()
-                {
-                    break;
+                if let Some(offset_store) = &offset_store {
+                    if offset_store
+                        .send(offset_store::Command::SaveOffset { persistence_id, offset })
+                        .await
+                        .is_err()
+                    {
+                        break;
+                    }
                 }
             }
             Handlers::Pending(handler, _) => {

@@ -180,13 +193,17 @@
                     active_source = &mut source;
 
                     // If all is well with our pending future so we can finally cause the offset to be persisted.
-                    if pending.is_err()
-                        || offset_store
-                            .send(offset_store::Command::SaveOffset { persistence_id, offset })
-                            .await
-                            .is_err()
-                    {
-                        break;
-                    }
+                    if pending.is_err() {
+                        break;
+                    }
+
+                    if let Some(offset_store) = &offset_store {
+                        if offset_store
+                            .send(offset_store::Command::SaveOffset { persistence_id, offset })
+                            .await
+                            .is_err() {
+                            break;
+                        }
+                    }
                 }

@@ -401,7 +418,7 @@ mod tests {
         let (offset_store, mut offset_store_receiver) = mpsc::channel(1);
         let task_persistence_id = persistence_id.clone();
         let (projection_task, _kill_switch) = task(
-            offset_store,
+            Some(offset_store),
             MySourceProvider {
                 persistence_id: task_persistence_id.clone(),
                 event_value: event_value.clone(),