Skip to content

Commit

Permalink
Additional logging (#40)
Browse files Browse the repository at this point in the history
Some additional logs and error handling.
  • Loading branch information
huntc authored Oct 11, 2023
1 parent f12f526 commit 6629e41
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 40 deletions.
6 changes: 3 additions & 3 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ impl<E> WithTimestampOffset for EventEnvelope<E> {
fn timestamp_offset(&self) -> TimestampOffset {
TimestampOffset {
timestamp: self.timestamp,
// FIXME: Is this correct?
seen: vec![],
}
}
Expand Down Expand Up @@ -651,12 +650,13 @@ mod tests {

let (_, my_command_receiver) = mpsc::channel(10);

entity_manager::run(
assert!(entity_manager::run(
my_behavior,
file_log_topic_adapter,
my_command_receiver,
NonZeroUsize::new(1).unwrap(),
)
.await;
.await
.is_ok());
}
}
1 change: 1 addition & 0 deletions akka-persistence-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
log = { workspace = true }
lru = { workspace = true }
serde = { workspace = true, features = ["derive"] }
smol_str = { workspace = true, features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions akka-persistence-rs/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
};

/// Errors that can occur when applying effects.
#[derive(Debug)]
pub enum Error {
IoError(io::Error),
}
Expand Down
58 changes: 34 additions & 24 deletions akka-persistence-rs/src/entity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use log::debug;
use log::warn;
use lru::LruCache;
use std::io;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -97,7 +99,8 @@ pub async fn run<A, B>(
mut adapter: A,
mut receiver: Receiver<Message<B::Command>>,
capacity: NonZeroUsize,
) where
) -> io::Result<()>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::Command: Send,
B::State: Send + Sync,
Expand All @@ -107,7 +110,9 @@ pub async fn run<A, B>(

let mut entities = LruCache::new(capacity);

if let Ok(envelopes) = adapter.source_initial().await {
let envelopes = adapter.source_initial().await?;

{
tokio::pin!(envelopes);
while let Some(envelope) = envelopes.next().await {
update_entity::<B>(&mut entities, envelope);
Expand All @@ -118,9 +123,6 @@ pub async fn run<A, B>(
.on_recovery_completed(&context, &entity_status.state)
.await;
}
} else {
// A problem sourcing initial events is regarded as fatal.
return;
}

// Receive commands for the entities and process them.
Expand All @@ -131,26 +133,24 @@ pub async fn run<A, B>(
let mut entity_status = entities.get(&message.entity_id);

if entity_status.is_none() {
if let Ok(envelopes) = adapter.source(&message.entity_id).await {
tokio::pin!(envelopes);
while let Some(envelope) = envelopes.next().await {
update_entity::<B>(&mut entities, envelope);
}
entity_status = entities.get(&message.entity_id);
let context = Context {
entity_id: &message.entity_id,
};
behavior
.on_recovery_completed(
&context,
&entity_status
.unwrap_or(&EntityStatus::<B::State>::default())
.state,
)
.await;
} else {
continue;
let envelopes = adapter.source(&message.entity_id).await?;

tokio::pin!(envelopes);
while let Some(envelope) = envelopes.next().await {
update_entity::<B>(&mut entities, envelope);
}
entity_status = entities.get(&message.entity_id);
let context = Context {
entity_id: &message.entity_id,
};
behavior
.on_recovery_completed(
&context,
&entity_status
.unwrap_or(&EntityStatus::<B::State>::default())
.state,
)
.await;
}

// Given an entity, send it the command, possibly producing an effect.
Expand Down Expand Up @@ -181,9 +181,15 @@ pub async fn run<A, B>(
)
.await;
if result.is_err() {
warn!(
"An error occurred when processing an effect for {}. Result: {result:?} Evicting it.",
context.entity_id
);
entities.pop(context.entity_id);
}
}

Ok(())
}

fn update_entity<B>(
Expand All @@ -203,6 +209,8 @@ where
entity_state.last_seq_nr = envelope.seq_nr;
entity_state
} else {
debug!("Inserting new entity: {}", envelope.entity_id);

// We're avoiding the use of get_or_insert so that we can avoid
// cloning the entity id unless necessary.
entities.push(
Expand All @@ -216,6 +224,8 @@ where
};
B::on_event(&context, &mut entity_state.state, envelope.event);
} else {
debug!("Removing entity: {}", envelope.entity_id);

entities.pop(&envelope.entity_id);
}
envelope.seq_nr
Expand Down
4 changes: 2 additions & 2 deletions akka-projection-rs-commitlog/src/offset_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An offset store for use with the Streambed commit log.

use std::num::NonZeroUsize;
use std::{io, num::NonZeroUsize};

use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId};
use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope};
Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn run(
offset_store_id: OffsetStoreId,
offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
) {
) -> io::Result<()> {
let events_entity_type = EntityType::from(offset_store_id.clone());
let events_topic = Topic::from(offset_store_id.clone());

Expand Down
31 changes: 25 additions & 6 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use chrono::NaiveDateTime;
use chrono::TimeZone;
use chrono::Timelike;
use chrono::Utc;
use log::warn;
use prost::Message;
use prost_types::Timestamp;
use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};
Expand Down Expand Up @@ -196,7 +197,10 @@ where
if let Ok(proto::StreamOut{ message: Some(proto::stream_out::Message::Event(streamed_event)) }) = stream_out {
// If we can't parse the persistence id then we abort.

let Ok(persistence_id) = streamed_event.persistence_id.parse::<PersistenceId>() else { break };
let Ok(persistence_id) = streamed_event.persistence_id.parse::<PersistenceId>() else {
warn!("Cannot parse persistence id: {}. Aborting stream.", streamed_event.persistence_id);
break
};

// Process the sequence number. If it isn't what we expect then we go round again.

Expand All @@ -211,17 +215,20 @@ where
.await
.is_err()
{
warn!("Cannot send to the offset store: {}. Aborting stream.", streamed_event.persistence_id);
break;
}

let next_seq_nr = if let Ok(offset_store::State { last_seq_nr }) = reply_to_receiver.await {
last_seq_nr.wrapping_add(1)
} else {
warn!("Cannot receive from the offset store: {}. Aborting stream.", streamed_event.persistence_id);
break
};

if seq_nr > next_seq_nr && streamed_event.source == "BT" {
// This shouldn't happen, if so then abort.
warn!("Back track received for a future event: {}. Aborting stream.", streamed_event.persistence_id);
break;
} else if seq_nr != next_seq_nr {
// Duplicate or gap
Expand All @@ -231,7 +238,7 @@ where
// If the sequence number is what we expect and the producer is backtracking, then
// request its payload. If we can't get its payload then we abort as it is an error.

let streamed_event = if streamed_event.source == "BT" {
let resolved_streamed_event = if streamed_event.source == "BT" {
if let Ok(response) = stream_connection
.load_event(proto::LoadEventRequest {
stream_id: stream_stream_id.clone(),
Expand All @@ -245,21 +252,24 @@ where
{
Some(event)
} else {
warn!("Cannot receive an backtrack event: {}. Aborting stream.", streamed_event.persistence_id);
None
}
} else {
warn!("Cannot obtain an backtrack event: {}. Aborting stream.", streamed_event.persistence_id);
None
}
} else {
Some(streamed_event)
};

let Some(streamed_event) = streamed_event else { break; };
let Some(streamed_event) = resolved_streamed_event else { break; };

// Parse the event and abort if we can't.

let event = if let Some(payload) = streamed_event.payload {
if !payload.type_url.starts_with("type.googleapis.com/") {
warn!("Payload type was not expected: {}: {}. Aborting stream.", streamed_event.persistence_id, payload.type_url);
break
}
let Ok(event) = E::decode(Bytes::from(payload.value)) else { break };
Expand All @@ -268,9 +278,18 @@ where
None
};

let Some(offset) = streamed_event.offset else { break };
let Some(timestamp) = offset.timestamp else { break };
let Some(timestamp) = NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32) else { break };
let Some(offset) = streamed_event.offset else {
warn!("Payload offset was not present: {}. Aborting stream.", streamed_event.persistence_id);
break;
};
let Some(timestamp) = offset.timestamp else {
warn!("Payload timestamp was not present: {}. Aborting stream.", streamed_event.persistence_id);
break;
};
let Some(timestamp) = NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32) else {
warn!("Payload timestamp was not able to be converted: {}: {}. Aborting stream.", streamed_event.persistence_id, timestamp);
break;
};
let timestamp = Utc.from_utc_datetime(&timestamp);
let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid|(pid, pis.seq_nr as u64))).collect();
let offset = TimestampOffset { timestamp, seen };
Expand Down
11 changes: 9 additions & 2 deletions akka-projection-rs-grpc/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub async fn run<E, EC, ECR>(

'outer: loop {
if let Err(oneshot::error::TryRecvError::Closed) = kill_switch.try_recv() {
debug!("gRPC producer killed.");
break;
}

Expand Down Expand Up @@ -287,7 +288,10 @@ pub async fn run<E, EC, ECR>(
}
},

_ = &mut kill_switch => break 'outer,
_ = &mut kill_switch => {
debug!("gRPC producer killed.");
break 'outer
}
}
}

Expand Down Expand Up @@ -342,7 +346,10 @@ pub async fn run<E, EC, ECR>(
}
},

_ = &mut kill_switch => break 'outer,
_ = &mut kill_switch => {
debug!("gRPC producer killed.");
break 'outer
}

else => break
}
Expand Down
3 changes: 2 additions & 1 deletion akka-projection-rs-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::VecDeque, path::Path, pin::Pin, time::Duration};
use akka_persistence_rs::{Offset, WithOffset};
use akka_projection_rs::{Handler, HandlerError, Handlers, PendingHandler, SourceProvider};
use futures::{self, future, stream, Future, Stream};
use log::error;
use log::{debug, error};
use serde::{Deserialize, Serialize};
use streambed::secret_store::SecretStore;
use tokio::{sync::oneshot, time};
Expand Down Expand Up @@ -143,6 +143,7 @@ pub async fn run<A, B, E, IH, SP>(
}

_ = &mut kill_switch => {
debug!("storage killed.");
break 'outer;
}

Expand Down
2 changes: 1 addition & 1 deletion examples/iot-service/backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

info!("IoT service ready");

tokio::spawn(temperature::task(
let _ = tokio::spawn(temperature::task(
cl,
ss,
temperature_events_key_secret_path,
Expand Down
2 changes: 1 addition & 1 deletion examples/iot-service/backend/src/temperature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ pub async fn task(
events_key_secret_path: String,
command_receiver: mpsc::Receiver<Message<Command>>,
events: broadcast::Sender<BroadcastEvent>,
) {
) -> io::Result<()> {
// We register a compaction strategy for our topic such that when we use up
// 64KB of disk space (the default), we will run compaction so that unwanted
// events are removed. In our scenario, unwanted events can be removed when
Expand Down

0 comments on commit 6629e41

Please sign in to comment.