Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional logging #40

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ impl<E> WithTimestampOffset for EventEnvelope<E> {
fn timestamp_offset(&self) -> TimestampOffset {
    // Produce an offset carrying this envelope's timestamp. No "seen"
    // persistence ids are recorded for commit-log envelopes, so the list
    // starts out empty.
    let seen = Vec::new();
    TimestampOffset {
        timestamp: self.timestamp,
        seen,
    }
}
Expand Down Expand Up @@ -651,12 +650,13 @@ mod tests {

let (_, my_command_receiver) = mpsc::channel(10);

entity_manager::run(
assert!(entity_manager::run(
my_behavior,
file_log_topic_adapter,
my_command_receiver,
NonZeroUsize::new(1).unwrap(),
)
.await;
.await
.is_ok());
}
}
1 change: 1 addition & 0 deletions akka-persistence-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
log = { workspace = true }
lru = { workspace = true }
serde = { workspace = true, features = ["derive"] }
smol_str = { workspace = true, features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions akka-persistence-rs/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
};

/// Errors that can occur when applying effects.
#[derive(Debug)]
pub enum Error {
// Wraps the underlying `io::Error` that caused the effect to fail.
IoError(io::Error),
}
Expand Down
58 changes: 34 additions & 24 deletions akka-persistence-rs/src/entity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use log::debug;
use log::warn;
use lru::LruCache;
use std::io;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -97,7 +99,8 @@ pub async fn run<A, B>(
mut adapter: A,
mut receiver: Receiver<Message<B::Command>>,
capacity: NonZeroUsize,
) where
) -> io::Result<()>
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::Command: Send,
B::State: Send + Sync,
Expand All @@ -107,7 +110,9 @@ pub async fn run<A, B>(

let mut entities = LruCache::new(capacity);

if let Ok(envelopes) = adapter.source_initial().await {
let envelopes = adapter.source_initial().await?;

{
tokio::pin!(envelopes);
while let Some(envelope) = envelopes.next().await {
update_entity::<B>(&mut entities, envelope);
Expand All @@ -118,9 +123,6 @@ pub async fn run<A, B>(
.on_recovery_completed(&context, &entity_status.state)
.await;
}
} else {
// A problem sourcing initial events is regarded as fatal.
return;
}

// Receive commands for the entities and process them.
Expand All @@ -131,26 +133,24 @@ pub async fn run<A, B>(
let mut entity_status = entities.get(&message.entity_id);

if entity_status.is_none() {
if let Ok(envelopes) = adapter.source(&message.entity_id).await {
tokio::pin!(envelopes);
while let Some(envelope) = envelopes.next().await {
update_entity::<B>(&mut entities, envelope);
}
entity_status = entities.get(&message.entity_id);
let context = Context {
entity_id: &message.entity_id,
};
behavior
.on_recovery_completed(
&context,
&entity_status
.unwrap_or(&EntityStatus::<B::State>::default())
.state,
)
.await;
} else {
continue;
let envelopes = adapter.source(&message.entity_id).await?;

tokio::pin!(envelopes);
while let Some(envelope) = envelopes.next().await {
update_entity::<B>(&mut entities, envelope);
}
entity_status = entities.get(&message.entity_id);
let context = Context {
entity_id: &message.entity_id,
};
behavior
.on_recovery_completed(
&context,
&entity_status
.unwrap_or(&EntityStatus::<B::State>::default())
.state,
)
.await;
}

// Given an entity, send it the command, possibly producing an effect.
Expand Down Expand Up @@ -181,9 +181,15 @@ pub async fn run<A, B>(
)
.await;
if result.is_err() {
warn!(
"An error occurred when processing an effect for {}. Result: {result:?} Evicting it.",
context.entity_id
);
entities.pop(context.entity_id);
}
}

Ok(())
}

fn update_entity<B>(
Expand All @@ -203,6 +209,8 @@ where
entity_state.last_seq_nr = envelope.seq_nr;
entity_state
} else {
debug!("Inserting new entity: {}", envelope.entity_id);

// We're avoiding the use of get_or_insert so that we can avoid
// cloning the entity id unless necessary.
entities.push(
Expand All @@ -216,6 +224,8 @@ where
};
B::on_event(&context, &mut entity_state.state, envelope.event);
} else {
debug!("Removing entity: {}", envelope.entity_id);

entities.pop(&envelope.entity_id);
}
envelope.seq_nr
Expand Down
4 changes: 2 additions & 2 deletions akka-projection-rs-commitlog/src/offset_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An offset store for use with the Streambed commit log.

use std::num::NonZeroUsize;
use std::{io, num::NonZeroUsize};

use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId};
use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope};
Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn run(
offset_store_id: OffsetStoreId,
offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
) {
) -> io::Result<()> {
let events_entity_type = EntityType::from(offset_store_id.clone());
let events_topic = Topic::from(offset_store_id.clone());

Expand Down
31 changes: 25 additions & 6 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use chrono::NaiveDateTime;
use chrono::TimeZone;
use chrono::Timelike;
use chrono::Utc;
use log::warn;
use prost::Message;
use prost_types::Timestamp;
use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};
Expand Down Expand Up @@ -196,7 +197,10 @@ where
if let Ok(proto::StreamOut{ message: Some(proto::stream_out::Message::Event(streamed_event)) }) = stream_out {
// If we can't parse the persistence id then we abort.

let Ok(persistence_id) = streamed_event.persistence_id.parse::<PersistenceId>() else { break };
let Ok(persistence_id) = streamed_event.persistence_id.parse::<PersistenceId>() else {
warn!("Cannot parse persistence id: {}. Aborting stream.", streamed_event.persistence_id);
break
};

// Process the sequence number. If it isn't what we expect then we go round again.

Expand All @@ -211,17 +215,20 @@ where
.await
.is_err()
{
warn!("Cannot send to the offset store: {}. Aborting stream.", streamed_event.persistence_id);
break;
}

let next_seq_nr = if let Ok(offset_store::State { last_seq_nr }) = reply_to_receiver.await {
last_seq_nr.wrapping_add(1)
} else {
warn!("Cannot receive from the offset store: {}. Aborting stream.", streamed_event.persistence_id);
break
};

if seq_nr > next_seq_nr && streamed_event.source == "BT" {
// This shouldn't happen, if so then abort.
warn!("Back track received for a future event: {}. Aborting stream.", streamed_event.persistence_id);
break;
} else if seq_nr != next_seq_nr {
// Duplicate or gap
Expand All @@ -231,7 +238,7 @@ where
// If the sequence number is what we expect and the producer is backtracking, then
// request its payload. If we can't get its payload then we abort as it is an error.

let streamed_event = if streamed_event.source == "BT" {
let resolved_streamed_event = if streamed_event.source == "BT" {
if let Ok(response) = stream_connection
.load_event(proto::LoadEventRequest {
stream_id: stream_stream_id.clone(),
Expand All @@ -245,21 +252,24 @@ where
{
Some(event)
} else {
warn!("Cannot receive a backtrack event: {}. Aborting stream.", streamed_event.persistence_id);
None
}
} else {
warn!("Cannot obtain a backtrack event: {}. Aborting stream.", streamed_event.persistence_id);
None
}
} else {
Some(streamed_event)
};

let Some(streamed_event) = streamed_event else { break; };
let Some(streamed_event) = resolved_streamed_event else { break; };

// Parse the event and abort if we can't.

let event = if let Some(payload) = streamed_event.payload {
if !payload.type_url.starts_with("type.googleapis.com/") {
warn!("Payload type was not expected: {}: {}. Aborting stream.", streamed_event.persistence_id, payload.type_url);
break
}
let Ok(event) = E::decode(Bytes::from(payload.value)) else { break };
Expand All @@ -268,9 +278,18 @@ where
None
};

let Some(offset) = streamed_event.offset else { break };
let Some(timestamp) = offset.timestamp else { break };
let Some(timestamp) = NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32) else { break };
let Some(offset) = streamed_event.offset else {
warn!("Payload offset was not present: {}. Aborting stream.", streamed_event.persistence_id);
break;
};
let Some(timestamp) = offset.timestamp else {
warn!("Payload timestamp was not present: {}. Aborting stream.", streamed_event.persistence_id);
break;
};
let Some(timestamp) = NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32) else {
warn!("Payload timestamp was not able to be converted: {}: {}. Aborting stream.", streamed_event.persistence_id, timestamp);
break;
};
let timestamp = Utc.from_utc_datetime(&timestamp);
let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid|(pid, pis.seq_nr as u64))).collect();
let offset = TimestampOffset { timestamp, seen };
Expand Down
11 changes: 9 additions & 2 deletions akka-projection-rs-grpc/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub async fn run<E, EC, ECR>(

'outer: loop {
if let Err(oneshot::error::TryRecvError::Closed) = kill_switch.try_recv() {
debug!("gRPC producer killed.");
break;
}

Expand Down Expand Up @@ -287,7 +288,10 @@ pub async fn run<E, EC, ECR>(
}
},

_ = &mut kill_switch => break 'outer,
_ = &mut kill_switch => {
debug!("gRPC producer killed.");
break 'outer
}
}
}

Expand Down Expand Up @@ -342,7 +346,10 @@ pub async fn run<E, EC, ECR>(
}
},

_ = &mut kill_switch => break 'outer,
_ = &mut kill_switch => {
debug!("gRPC producer killed.");
break 'outer
}

else => break
}
Expand Down
3 changes: 2 additions & 1 deletion akka-projection-rs-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::VecDeque, path::Path, pin::Pin, time::Duration};
use akka_persistence_rs::{Offset, WithOffset};
use akka_projection_rs::{Handler, HandlerError, Handlers, PendingHandler, SourceProvider};
use futures::{self, future, stream, Future, Stream};
use log::error;
use log::{debug, error};
use serde::{Deserialize, Serialize};
use streambed::secret_store::SecretStore;
use tokio::{sync::oneshot, time};
Expand Down Expand Up @@ -143,6 +143,7 @@ pub async fn run<A, B, E, IH, SP>(
}

_ = &mut kill_switch => {
debug!("storage killed.");
break 'outer;
}

Expand Down
2 changes: 1 addition & 1 deletion examples/iot-service/backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

info!("IoT service ready");

tokio::spawn(temperature::task(
let _ = tokio::spawn(temperature::task(
cl,
ss,
temperature_events_key_secret_path,
Expand Down
2 changes: 1 addition & 1 deletion examples/iot-service/backend/src/temperature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ pub async fn task(
events_key_secret_path: String,
command_receiver: mpsc::Receiver<Message<Command>>,
events: broadcast::Sender<BroadcastEvent>,
) {
) -> io::Result<()> {
// We register a compaction strategy for our topic such that when we use up
// 64KB of disk space (the default), we will run compaction so that unwanted
// events are removed. In our scenario, unwanted events can be removed when
Expand Down