Skip to content

Commit

Permalink
Simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
huntc committed Sep 28, 2023
1 parent f402199 commit 6d5c7cd
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 167 deletions.
4 changes: 3 additions & 1 deletion akka-persistence-rs/src/entity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use lru::LruCache;
use serde::Deserialize;
use serde::Serialize;
use std::io;
use std::num::NonZeroUsize;
use std::pin::Pin;
Expand All @@ -20,7 +22,7 @@ use crate::entity::EventSourcedBehavior;
use crate::{EntityId, Message};

/// An envelope wraps an event associated with a specific entity.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct EventEnvelope<E> {
/// Flags whether the associated event is to be considered
/// as one that represents an entity instance being deleted.
Expand Down
90 changes: 36 additions & 54 deletions examples/iot-service/frontend/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use akka_persistence_rs::{entity_manager::EventEnvelope, EntityId, Message as EntityMessage};
use tokio::sync::{broadcast, mpsc, oneshot};
use akka_persistence_rs::{
entity::Context as EntityContext, entity::EventSourcedBehavior, entity_manager::EventEnvelope,
EntityId,
};
use tokio::sync::{broadcast, mpsc};
use web_sys::HtmlInputElement;
use yew::{platform, prelude::*};

Expand All @@ -10,22 +13,19 @@ pub struct Props;

pub struct App {
entity_id: EntityId,
temperature_command: mpsc::Sender<EntityMessage<temperature::Command>>,
temperature: Option<u32>,
query: mpsc::Sender<EntityId>,
temperature: temperature::State,
}

const MAX_TEMPERATURE_COMMANDS: usize = 10;
const MAX_TEMPERATURE_QUERIES: usize = 10;
const MAX_TEMPERATURE_EVENTS: usize = 10;

pub enum Message {
EntityIdChanged {
entity_id: EntityId,
},
Submit,
UpdateWithState {
temperature: Option<u32>,
},
UpdateWithEvent {
Update {
envelope: EventEnvelope<temperature::Event>,
},
}
Expand All @@ -36,24 +36,19 @@ impl Component for App {
type Properties = Props;

fn create(ctx: &Context<Self>) -> Self {
let (temperature_command, temperature_command_receiver) =
mpsc::channel(MAX_TEMPERATURE_COMMANDS);

let (temperature_event, mut temperature_event_receiver) =
let (temperature_events, mut temperature_event_receiver) =
broadcast::channel(MAX_TEMPERATURE_EVENTS);

platform::spawn_local(temperature::task(
temperature_command.clone(),
temperature_command_receiver,
temperature_event,
));
let (query, query_receiver) = mpsc::channel(MAX_TEMPERATURE_QUERIES);

platform::spawn_local(temperature::task(query_receiver, temperature_events));

let task_link = ctx.link().clone();
platform::spawn_local(async move {
loop {
let result = temperature_event_receiver.recv().await;
match result {
Ok(envelope) => task_link.send_message(Message::UpdateWithEvent { envelope }),
Ok(envelope) => task_link.send_message(Message::Update { envelope }),
Err(broadcast::error::RecvError::Lagged(_)) => (),
Err(_) => break,
}
Expand All @@ -62,8 +57,8 @@ impl Component for App {

Self {
entity_id: EntityId::from(""),
temperature_command,
temperature: None,
query,
temperature: temperature::State::default(),
}
}

Expand All @@ -73,51 +68,34 @@ impl Component for App {
self.entity_id = entity_id;
false
}

Message::Submit => {
self.temperature = None;
self.temperature = temperature::State::default();

let task_query = self.query.clone();
let task_entity_id = self.entity_id.clone();
let task_temperature_command = self.temperature_command.clone();
ctx.link().send_future_batch(async move {
let mut messages = Vec::with_capacity(1);
let (reply_to, reply_to_receiver) = oneshot::channel();
if task_temperature_command
.send(EntityMessage::new(
task_entity_id,
temperature::Command::Get { reply_to },
))
.await
.is_ok()
{
if let Ok(history) = reply_to_receiver.await {
messages.push(Message::UpdateWithState {
temperature: history.last().cloned(),
})
}
}
messages
let _ = task_query.send(task_entity_id).await;
vec![]
});

true
}
Message::UpdateWithState { temperature } => {
if self.temperature.is_none() {
self.temperature = temperature;

Message::Update { envelope } => {
if envelope.entity_id == self.entity_id {
temperature::Behavior::on_event(
&EntityContext {
entity_id: &envelope.entity_id,
},
&mut self.temperature,
envelope.event,
);
true
} else {
false
}
}
Message::UpdateWithEvent { envelope } => {
let mut update = false;
if envelope.entity_id == self.entity_id {
if let temperature::Event::TemperatureRead { temperature } = envelope.event {
self.temperature = Some(temperature);
update = true;
}
}
update
}
}
}

Expand All @@ -136,7 +114,11 @@ impl Component for App {

let temperature = self
.temperature
.map_or(String::from("-"), |t| format!("{t}C"));
.history
.iter()
.last()
.map(|t| format!("{t}C"))
.unwrap_or(String::from("-"));

html! {
<main>
Expand Down
142 changes: 32 additions & 110 deletions examples/iot-service/frontend/src/temperature.rs
Original file line number Diff line number Diff line change
@@ -1,131 +1,53 @@
// Declare the entity

use std::{io, num::NonZeroUsize, pin::Pin};

use akka_persistence_rs::{
entity_manager::{self, EventEnvelope, Handler, SourceProvider},
EntityId, Message,
};
use async_trait::async_trait;
use akka_persistence_rs::{entity_manager::EventEnvelope, EntityId};
use gloo_net::eventsource::futures::EventSource;
use log::{error, warn};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{Stream, StreamExt};
use yew::platform;
use tokio_stream::StreamExt;

pub use iot_service_model::temperature::{Behavior, Command, Event, SecretDataValue, State};

// FIXME this adapter should be generalised

pub struct EventSourceAdapter {
query: mpsc::UnboundedSender<EntityId>,
event: broadcast::Sender<EventEnvelope<Event>>,
}

impl EventSourceAdapter {
pub fn new(
query: mpsc::UnboundedSender<EntityId>,
event: broadcast::Sender<EventEnvelope<Event>>,
) -> Self {
Self { query, event }
}
}

#[async_trait]
impl SourceProvider<Event> for EventSourceAdapter {
async fn source_initial(
&mut self,
) -> io::Result<Pin<Box<dyn Stream<Item = EventEnvelope<Event>> + Send + 'async_trait>>> {
Ok(Box::pin(tokio_stream::empty()))
}

async fn source(
&mut self,
entity_id: &EntityId,
) -> io::Result<Pin<Box<dyn Stream<Item = EventEnvelope<Event>> + Send + 'async_trait>>> {
if self.query.send(entity_id.clone()).is_ok() {
Ok(Box::pin(tokio_stream::empty()))
} else {
Err(io::Error::new(io::ErrorKind::Other, "Cannot send query"))
}
}
}

#[async_trait]
impl Handler<Event> for EventSourceAdapter {
async fn process(
&mut self,
envelope: EventEnvelope<Event>,
) -> io::Result<EventEnvelope<Event>> {
self.event
.send(envelope.clone())
.map(|_| envelope)
.map_err(|_context| io::Error::new(io::ErrorKind::Other, "Cannot broadcast"))
}
}

// Execute the entity manager for this entity

pub async fn task(
command: mpsc::Sender<Message<Command>>,
command_receiver: mpsc::Receiver<Message<Command>>,
event: broadcast::Sender<EventEnvelope<Event>>,
mut query_receiver: mpsc::Receiver<EntityId>,
events: broadcast::Sender<EventEnvelope<Event>>,
) {
let (query, mut query_receiver) = mpsc::unbounded_channel();

platform::spawn_local(async move {
if let Some(mut entity_id) = query_receiver.recv().await {
'outer: loop {
let url: &str = &format!("/api/temperature/events/{entity_id}");
let mut temperature_es = EventSource::new(url).unwrap();
let mut temperature_events = temperature_es.subscribe("message").unwrap();

loop {
tokio::select! {
Some(Ok((_, message))) = temperature_events.next() => {
let data: Option<String> = message.data().as_string();

if let Some(data) = data {
let update_command = match serde_json::from_str::<Event>(&data) {
Ok(event) => match event {
Event::Registered { secret } => Some(Command::Register { secret }),
Event::TemperatureRead { temperature } => {
Some(Command::Post { temperature })
}
},
Err(e) => {
error!("Failed to parse event: {}", e);
None
}
};
if let Some(update_command) = update_command {
let _ = command
.send(Message::new(EntityId::from(""), update_command))
.await;
if let Some(mut entity_id) = query_receiver.recv().await {
'outer: loop {
let url: &str = &format!("/api/temperature/events/{entity_id}");
let mut temperature_es = EventSource::new(url).unwrap();
let mut temperature_events = temperature_es.subscribe("message").unwrap();

loop {
tokio::select! {
Some(Ok((_, message))) = temperature_events.next() => {
let data: Option<String> = message.data().as_string();

if let Some(data) = data {
match serde_json::from_str::<EventEnvelope<Event>>(&data) {
Ok(envelope) => {
let _ = events.send(envelope);
}
} else {
warn!("Received event with no data");
}

Err(e) => {
error!("Failed to parse event: {}", e);
}
};
} else {
warn!("Received event with no data");
}

Some(next_entity_id) = query_receiver.recv() => {
entity_id = next_entity_id;
break
}
}

else => break 'outer,
Some(next_entity_id) = query_receiver.recv() => {
entity_id = next_entity_id;
break
}

else => break 'outer,
}
}
}
});

entity_manager::run(
Behavior,
EventSourceAdapter::new(query, event),
command_receiver,
NonZeroUsize::new(10).unwrap(),
)
.await
}
}
4 changes: 2 additions & 2 deletions examples/iot-service/model/src/temperature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tokio::sync::oneshot;

#[derive(Default)]
pub struct State {
history: VecDeque<u32>,
secret: SecretDataValue,
pub history: VecDeque<u32>,
pub secret: SecretDataValue,
}

pub type SecretDataValue = SmolStr;
Expand Down

0 comments on commit 6d5c7cd

Please sign in to comment.