diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs
index f4ff507..3d7d272 100644
--- a/akka-persistence-rs-commitlog/src/lib.rs
+++ b/akka-persistence-rs-commitlog/src/lib.rs
@@ -61,7 +61,6 @@ impl WithTimestampOffset for EventEnvelope {
     fn timestamp_offset(&self) -> TimestampOffset {
         TimestampOffset {
             timestamp: self.timestamp,
-            // FIXME: Is this correct?
             seen: vec![],
         }
     }
@@ -651,12 +650,13 @@ mod tests {
 
         let (_, my_command_receiver) = mpsc::channel(10);
 
-        entity_manager::run(
+        assert!(entity_manager::run(
             my_behavior,
             file_log_topic_adapter,
             my_command_receiver,
             NonZeroUsize::new(1).unwrap(),
         )
-        .await;
+        .await
+        .is_ok());
     }
 }
diff --git a/akka-persistence-rs/Cargo.toml b/akka-persistence-rs/Cargo.toml
index 2a42327..4f23a56 100644
--- a/akka-persistence-rs/Cargo.toml
+++ b/akka-persistence-rs/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 [dependencies]
 async-trait = { workspace = true }
 chrono = { workspace = true, features = ["serde"] }
+log = { workspace = true }
 lru = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
 smol_str = { workspace = true, features = ["serde"] }
diff --git a/akka-persistence-rs/src/effect.rs b/akka-persistence-rs/src/effect.rs
index 1e8912f..4f1ac75 100644
--- a/akka-persistence-rs/src/effect.rs
+++ b/akka-persistence-rs/src/effect.rs
@@ -14,6 +14,7 @@ use crate::{
 };
 
 /// Errors that can occur when applying effects.
+#[derive(Debug)]
 pub enum Error {
     IoError(io::Error),
 }
diff --git a/akka-persistence-rs/src/entity_manager.rs b/akka-persistence-rs/src/entity_manager.rs
index 74f991b..55bb67f 100644
--- a/akka-persistence-rs/src/entity_manager.rs
+++ b/akka-persistence-rs/src/entity_manager.rs
@@ -8,6 +8,8 @@ use async_trait::async_trait;
 use chrono::DateTime;
 use chrono::Utc;
+use log::debug;
+use log::warn;
 use lru::LruCache;
 use std::io;
 use std::num::NonZeroUsize;
@@ -97,7 +99,8 @@ pub async fn run<A, B>(
     mut adapter: A,
     mut receiver: Receiver<Message<B::Command>>,
     capacity: NonZeroUsize,
-) where
+) -> io::Result<()>
+where
     B: EventSourcedBehavior + Send + Sync + 'static,
     B::Command: Send,
     B::State: Send + Sync,
@@ -107,7 +110,9 @@
 
     let mut entities = LruCache::new(capacity);
 
-    if let Ok(envelopes) = adapter.source_initial().await {
+    let envelopes = adapter.source_initial().await?;
+
+    {
         tokio::pin!(envelopes);
         while let Some(envelope) = envelopes.next().await {
             update_entity::<B>(&mut entities, envelope);
@@ -118,9 +123,6 @@
                 .on_recovery_completed(&context, &entity_status.state)
                 .await;
         }
-    } else {
-        // A problem sourcing initial events is regarded as fatal.
-        return;
     }
 
     // Receive commands for the entities and process them.
@@ -131,26 +133,24 @@ pub async fn run<A, B>(
         let mut entity_status = entities.get(&message.entity_id);
         if entity_status.is_none() {
-            if let Ok(envelopes) = adapter.source(&message.entity_id).await {
-                tokio::pin!(envelopes);
-                while let Some(envelope) = envelopes.next().await {
-                    update_entity::<B>(&mut entities, envelope);
-                }
-                entity_status = entities.get(&message.entity_id);
-                let context = Context {
-                    entity_id: &message.entity_id,
-                };
-                behavior
-                    .on_recovery_completed(
-                        &context,
-                        &entity_status
-                            .unwrap_or(&EntityStatus::<B::State>::default())
-                            .state,
-                    )
-                    .await;
-            } else {
-                continue;
+            let envelopes = adapter.source(&message.entity_id).await?;
+
+            tokio::pin!(envelopes);
+            while let Some(envelope) = envelopes.next().await {
+                update_entity::<B>(&mut entities, envelope);
             }
+            entity_status = entities.get(&message.entity_id);
+            let context = Context {
+                entity_id: &message.entity_id,
+            };
+            behavior
+                .on_recovery_completed(
+                    &context,
+                    &entity_status
+                        .unwrap_or(&EntityStatus::<B::State>::default())
+                        .state,
+                )
+                .await;
         }
 
         // Given an entity, send it the command, possibly producing an effect.
@@ -181,9 +181,15 @@
             )
             .await;
         if result.is_err() {
+            warn!(
+                "An error occurred when processing an effect for {}. Result: {result:?}. Evicting it.",
+                context.entity_id
+            );
             entities.pop(context.entity_id);
         }
     }
+
+    Ok(())
 }
 
 fn update_entity<B>(
@@ -203,6 +209,8 @@ where
             entity_state.last_seq_nr = envelope.seq_nr;
             entity_state
         } else {
+            debug!("Inserting new entity: {}", envelope.entity_id);
+
             // We're avoiding the use of get_or_insert so that we can avoid
             // cloning the entity id unless necessary.
             entities.push(
@@ -216,6 +224,8 @@
         };
         B::on_event(&context, &mut entity_state.state, envelope.event);
     } else {
+        debug!("Removing entity: {}", envelope.entity_id);
+
         entities.pop(&envelope.entity_id);
     }
     envelope.seq_nr
diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs
index 487ee94..8631262 100644
--- a/akka-projection-rs-commitlog/src/offset_store.rs
+++ b/akka-projection-rs-commitlog/src/offset_store.rs
@@ -1,6 +1,6 @@
 //! An offset store for use with the Streambed commit log.
 
-use std::num::NonZeroUsize;
+use std::{io, num::NonZeroUsize};
 
 use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId};
 use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope};
@@ -90,7 +90,7 @@ pub async fn run(
     offset_store_id: OffsetStoreId,
     offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
     to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
-) {
+) -> io::Result<()> {
     let events_entity_type = EntityType::from(offset_store_id.clone());
     let events_topic = Topic::from(offset_store_id.clone());
diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs
index dead6c2..f5a2641 100644
--- a/akka-projection-rs-grpc/src/consumer.rs
+++ b/akka-projection-rs-grpc/src/consumer.rs
@@ -13,6 +13,7 @@ use chrono::NaiveDateTime;
 use chrono::TimeZone;
 use chrono::Timelike;
 use chrono::Utc;
+use log::warn;
 use prost::Message;
 use prost_types::Timestamp;
 use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};
@@ -196,7 +197,10 @@ where
             if let Ok(proto::StreamOut{ message: Some(proto::stream_out::Message::Event(streamed_event)) }) = stream_out {
                 // If we can't parse the persistence id then we abort.
-                let Ok(persistence_id) = streamed_event.persistence_id.parse::<PersistenceId>() else { break };
+                let Ok(persistence_id) = streamed_event.persistence_id.parse::<PersistenceId>() else {
+                    warn!("Cannot parse persistence id: {}. Aborting stream.", streamed_event.persistence_id);
+                    break
+                };
 
                 // Process the sequence number. If it isn't what we expect then we go round again.
@@ -211,17 +215,20 @@ where
                     .await
                     .is_err()
                 {
+                    warn!("Cannot send to the offset store: {}. Aborting stream.", streamed_event.persistence_id);
                     break;
                 }
 
                 let next_seq_nr = if let Ok(offset_store::State { last_seq_nr }) = reply_to_receiver.await {
                     last_seq_nr.wrapping_add(1)
                 } else {
+                    warn!("Cannot receive from the offset store: {}. Aborting stream.", streamed_event.persistence_id);
                     break
                 };
 
                 if seq_nr > next_seq_nr && streamed_event.source == "BT" {
                     // This shouldn't happen, if so then abort.
+                    warn!("Backtrack received for a future event: {}. Aborting stream.", streamed_event.persistence_id);
                     break;
                 } else if seq_nr != next_seq_nr {
                     // Duplicate or gap
@@ -231,7 +238,7 @@ where
 
                 // If the sequence number is what we expect and the producer is backtracking, then
                 // request its payload. If we can't get its payload then we abort as it is an error.
-                let streamed_event = if streamed_event.source == "BT" {
+                let resolved_streamed_event = if streamed_event.source == "BT" {
                     if let Ok(response) = stream_connection
                         .load_event(proto::LoadEventRequest {
                             stream_id: stream_stream_id.clone(),
@@ -245,21 +252,24 @@ where
                     {
                         Some(event)
                     } else {
+                        warn!("Cannot receive a backtrack event: {}. Aborting stream.", streamed_event.persistence_id);
                         None
                     }
                 } else {
+                    warn!("Cannot obtain a backtrack event: {}. Aborting stream.", streamed_event.persistence_id);
                     None
                 }
             } else {
                 Some(streamed_event)
             };
 
-            let Some(streamed_event) = streamed_event else { break; };
+            let Some(streamed_event) = resolved_streamed_event else { break; };
 
             // Parse the event and abort if we can't.
             let event = if let Some(payload) = streamed_event.payload {
                 if !payload.type_url.starts_with("type.googleapis.com/") {
+                    warn!("Payload type was not expected: {}: {}. Aborting stream.", streamed_event.persistence_id, payload.type_url);
                     break
                 }
                 let Ok(event) = E::decode(Bytes::from(payload.value)) else { break };
@@ -268,9 +278,18 @@ where
                 None
             };
 
-            let Some(offset) = streamed_event.offset else { break };
-            let Some(timestamp) = offset.timestamp else { break };
-            let Some(timestamp) = NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32) else { break };
+            let Some(offset) = streamed_event.offset else {
+                warn!("Payload offset was not present: {}. Aborting stream.", streamed_event.persistence_id);
+                break;
+            };
+            let Some(timestamp) = offset.timestamp else {
+                warn!("Payload timestamp was not present: {}. Aborting stream.", streamed_event.persistence_id);
+                break;
+            };
+            let Some(timestamp) = NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32) else {
+                warn!("Payload timestamp could not be converted: {}: {}. Aborting stream.", streamed_event.persistence_id, timestamp);
+                break;
+            };
             let timestamp = Utc.from_utc_datetime(&timestamp);
             let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid| (pid, pis.seq_nr as u64))).collect();
             let offset = TimestampOffset { timestamp, seen };
diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs
index fbabb78..1e734b0 100644
--- a/akka-projection-rs-grpc/src/producer.rs
+++ b/akka-projection-rs-grpc/src/producer.rs
@@ -186,6 +186,7 @@ pub async fn run(
 
     'outer: loop {
         if let Err(oneshot::error::TryRecvError::Closed) = kill_switch.try_recv() {
+            debug!("gRPC producer killed.");
             break;
         }
 
@@ -287,7 +288,10 @@ pub async fn run(
                     }
                 },
 
-                _ = &mut kill_switch => break 'outer,
+                _ = &mut kill_switch => {
+                    debug!("gRPC producer killed.");
+                    break 'outer
+                }
             }
         }
 
@@ -342,7 +346,10 @@ pub async fn run(
                     }
                 },
 
-                _ = &mut kill_switch => break 'outer,
+                _ = &mut kill_switch => {
+                    debug!("gRPC producer killed.");
+                    break 'outer
+                }
 
                 else => break
             }
diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs
index 0398933..e00b947 100644
--- a/akka-projection-rs-storage/src/lib.rs
+++ b/akka-projection-rs-storage/src/lib.rs
@@ -5,7 +5,7 @@ use std::{collections::VecDeque, path::Path, pin::Pin, time::Duration};
 use akka_persistence_rs::{Offset, WithOffset};
 use akka_projection_rs::{Handler, HandlerError, Handlers, PendingHandler, SourceProvider};
 use futures::{self, future, stream, Future, Stream};
-use log::error;
+use log::{debug, error};
 use serde::{Deserialize, Serialize};
 use streambed::secret_store::SecretStore;
 use tokio::{sync::oneshot, time};
@@ -143,6 +143,7 @@ pub async fn run(
                 }
 
                 _ = &mut kill_switch => {
+                    debug!("storage killed.");
                     break 'outer;
                 }
diff --git a/examples/iot-service/backend/src/main.rs b/examples/iot-service/backend/src/main.rs
index 1e54e1e..ea2098f 100644
--- a/examples/iot-service/backend/src/main.rs
+++ b/examples/iot-service/backend/src/main.rs
@@ -161,7 +161,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
     info!("IoT service ready");
 
-    tokio::spawn(temperature::task(
+    let _ = tokio::spawn(temperature::task(
         cl,
         ss,
         temperature_events_key_secret_path,
diff --git a/examples/iot-service/backend/src/temperature.rs b/examples/iot-service/backend/src/temperature.rs
index e9193b4..5106992 100644
--- a/examples/iot-service/backend/src/temperature.rs
+++ b/examples/iot-service/backend/src/temperature.rs
@@ -214,7 +214,7 @@ pub async fn task(
     events_key_secret_path: String,
     command_receiver: mpsc::Receiver<Message<Command>>,
     events: broadcast::Sender<Event>,
-) {
+) -> io::Result<()> {
     // We register a compaction strategy for our topic such that when we use up
     // 64KB of disk space (the default), we will run compaction so that unwanted
     // events are removed. In our scenario, unwanted events can be removed when