diff --git a/Cargo.toml b/Cargo.toml index 99428b8..9f86af2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ serde_json = "1.0.133" # mqtt client # Disable the default dependency on SSL to avoid a build dependency on OpenSSL paho-mqtt = {version = "0.12.5", features=["bundled"]} +async-trait = "0.1.51" [features] ros = ["dep:r2r"] diff --git a/benches/simple_add.rs b/benches/simple_add.rs index 0e9f072..7895c10 100644 --- a/benches/simple_add.rs +++ b/benches/simple_add.rs @@ -5,13 +5,11 @@ use criterion::BenchmarkId; use criterion::Criterion; use criterion::SamplingMode; use criterion::{criterion_group, criterion_main}; -use futures::{ - stream::{self, BoxStream}, - StreamExt, -}; +use futures::stream::{self, BoxStream}; +use trustworthiness_checker::null_output_handler::NullOutputHandler; use trustworthiness_checker::type_checking::type_check; use trustworthiness_checker::OutputStream; -use trustworthiness_checker::{Value, Monitor, VarName}; +use trustworthiness_checker::{Monitor, Value, VarName}; pub fn spec_simple_add_monitor() -> &'static str { "in x\n\ @@ -27,22 +25,18 @@ pub fn spec_simple_add_monitor_typed() -> &'static str { z = x + y" } -pub fn input_streams_concrete( - size: usize, -) -> BTreeMap> { +pub fn input_streams_concrete(size: usize) -> BTreeMap> { let size = size as i64; let mut input_streams = BTreeMap::new(); input_streams.insert( VarName("x".into()), - Box::pin(stream::iter( - (0..size).map(|x| Value::Int(2 * x)), - )) as Pin + std::marker::Send>>, + Box::pin(stream::iter((0..size).map(|x| Value::Int(2 * x)))) + as Pin + std::marker::Send>>, ); input_streams.insert( VarName("y".into()), - Box::pin(stream::iter( - (0..size).map(|y| Value::Int(2 * y + 1)), - )) as Pin + std::marker::Send>>, + Box::pin(stream::iter((0..size).map(|y| Value::Int(2 * y + 1)))) + as Pin + std::marker::Send>>, ); input_streams } @@ -52,15 +46,11 @@ pub fn input_streams_typed(size: usize) -> BTreeMap let size = size as i64; input_streams.insert( VarName("x".into()), - Box::pin(stream::iter( - (0..size).map(|x| Value::Int(2 * x)), - )) as OutputStream, + Box::pin(stream::iter((0..size).map(|x| Value::Int(2 * x)))) as OutputStream, ); input_streams.insert( VarName("y".into()), - Box::pin(stream::iter( - (0..size).map(|y| Value::Int(2 * y + 1)), - )), + Box::pin(stream::iter((0..size).map(|y| Value::Int(2 * y + 1)))), ); input_streams } @@ -68,32 +58,28 @@ pub fn input_streams_typed(size: usize) -> BTreeMap async fn monitor_outputs_untyped_constraints(num_outputs: usize) { let mut input_streams = input_streams_concrete(num_outputs); let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor()).unwrap(); - let mut async_monitor = + let output_handler = NullOutputHandler::new(spec.output_vars.clone()); + let async_monitor = trustworthiness_checker::constraint_based_runtime::ConstraintBasedMonitor::new( spec, &mut input_streams, + output_handler, ); - let _outputs: Vec> = async_monitor - .monitor_outputs() - .take(num_outputs) - .collect() - .await; + async_monitor.run().await; } async fn monitor_outputs_untyped_async(num_outputs: usize) { let mut input_streams = input_streams_concrete(num_outputs); let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor()).unwrap(); - let mut async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::< + let output_handler = NullOutputHandler::new(spec.output_vars.clone()); + let async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::< _, _, trustworthiness_checker::UntimedLolaSemantics, trustworthiness_checker::LOLASpecification, - >::new(spec, &mut input_streams); - let _outputs: Vec> = async_monitor - .monitor_outputs() - .take(num_outputs) - .collect() - .await; + _, + >::new(spec, &mut input_streams, output_handler); + async_monitor.run().await; } async fn monitor_outputs_typed_async(num_outputs: usize) { @@ -101,33 +87,29 @@ async fn monitor_outputs_typed_async(num_outputs: usize) { let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor_typed()).unwrap(); let spec = type_check(spec).expect("Type check failed"); - let mut async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::< + let output_handler = NullOutputHandler::new(spec.output_vars.clone()); + let async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::< _, _, trustworthiness_checker::TypedUntimedLolaSemantics, _, - >::new(spec, &mut input_streams); - let _outputs: Vec> = async_monitor - .monitor_outputs() - .take(num_outputs) - .collect() - .await; + _, + >::new(spec, &mut input_streams, output_handler); + async_monitor.run().await; } async fn monitor_outputs_untyped_queuing(num_outputs: usize) { let mut input_streams = input_streams_concrete(num_outputs); let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor()).unwrap(); - let mut async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::< + let output_handler = NullOutputHandler::new(spec.output_vars.clone()); + let async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::< _, _, trustworthiness_checker::UntimedLolaSemantics, trustworthiness_checker::LOLASpecification, - >::new(spec, &mut input_streams); - let _outputs: Vec> = async_monitor - .monitor_outputs() - .take(num_outputs) - .collect() - .await; + _, + >::new(spec, &mut input_streams, output_handler); + async_monitor.run().await; } async fn monitor_outputs_typed_queuing(num_outputs: usize) { @@ -135,17 +117,15 @@ async fn monitor_outputs_typed_queuing(num_outputs: usize) { let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor_typed()).unwrap(); let spec = type_check(spec).expect("Type check failed"); - let mut async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::< + let output_handler = NullOutputHandler::new(spec.output_vars.clone()); + let async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::< _, _, trustworthiness_checker::TypedUntimedLolaSemantics, _, - >::new(spec, &mut input_streams); - let _outputs: Vec> = async_monitor - .monitor_outputs() - // .take(num_outputs) - .collect() - .await; + _, + >::new(spec, &mut input_streams, output_handler); + async_monitor.run().await; } fn from_elem(c: &mut Criterion) { diff --git a/src/async_runtime.rs b/src/async_runtime.rs index 5fa0ead..f6add7a 100644 --- a/src/async_runtime.rs +++ b/src/async_runtime.rs @@ -1,11 +1,9 @@ use std::collections::BTreeMap; use std::marker::PhantomData; -use std::mem; use std::sync::Arc; +use async_trait::async_trait; use futures::future::join_all; -use futures::stream; -use futures::stream::BoxStream; use futures::StreamExt; use tokio::select; use tokio::sync::broadcast; @@ -20,6 +18,7 @@ use tokio_util::sync::DropGuard; use crate::core::InputProvider; use crate::core::Monitor; use crate::core::MonitoringSemantics; +use crate::core::OutputHandler; use crate::core::Specification; use crate::core::{OutputStream, StreamContext, StreamData, VarName}; use crate::stream_utils::{drop_guard_stream, oneshot_to_stream}; @@ -416,12 +415,16 @@ impl StreamContext for SubMonitor { * expressions as streams. * - The M type parameter is the model/specification being monitored. */ -pub struct AsyncMonitorRunner +pub struct AsyncMonitorRunner where + Val: StreamData, S: MonitoringSemantics, M: Specification, + H: OutputHandler + Send, + Expr: Sync + Send { model: M, + output_handler: H, output_streams: BTreeMap>, #[allow(dead_code)] // This is used for RAII to cancel background tasks when the async var @@ -431,13 +434,15 @@ where semantics_t: PhantomData, } -impl Monitor for AsyncMonitorRunner +#[async_trait] +impl Monitor for AsyncMonitorRunner where Val: StreamData, S: MonitoringSemantics, M: Specification, + H: OutputHandler + Send + Sync, { - fn new(model: M, input_streams: &mut dyn InputProvider) -> Self { + fn new(model: M, input_streams: &mut dyn InputProvider, output: H) -> Self { let cancellation_token = CancellationToken::new(); let cancellation_guard = Arc::new(cancellation_token.clone().drop_guard()); @@ -525,39 +530,16 @@ where semantics_t: PhantomData, cancellation_guard, expr_t: PhantomData, + output_handler: output, } } fn spec(&self) -> &M { &self.model } - - fn monitor_outputs(&mut self) -> BoxStream<'static, BTreeMap> { - let output_streams = mem::take(&mut self.output_streams); - let mut outputs = self.model.output_vars(); - outputs.sort(); - - Box::pin(stream::unfold( - (output_streams, outputs), - |(mut output_streams, outputs)| async move { - let mut futures = vec![]; - for (_, stream) in output_streams.iter_mut() { - futures.push(stream.next()); - } - - let next_vals = join_all(futures).await; - let mut res: BTreeMap = BTreeMap::new(); - for (var, val) in outputs.clone().iter().zip(next_vals) { - res.insert( - var.clone(), - match val { - Some(val) => val, - None => return None, - }, - ); - } - return Some((res, (output_streams, outputs))); - }, - )) as BoxStream<'static, BTreeMap> + + async fn run(mut self) { + self.output_handler.provide_streams(self.output_streams); + self.output_handler.run().await; } -} +} \ No newline at end of file diff --git a/src/constraint_based_runtime.rs b/src/constraint_based_runtime.rs index dea5cac..d9817a5 100644 --- a/src/constraint_based_runtime.rs +++ b/src/constraint_based_runtime.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use futures::future::join_all; use futures::stream; use futures::stream::BoxStream; @@ -7,14 +8,14 @@ use std::iter::zip; use std::mem; use crate::ast::LOLASpecification; +use crate::ast::SExpr; use crate::constraint_solver::*; use crate::core::InputProvider; use crate::core::Monitor; +use crate::core::OutputHandler; use crate::core::Specification; use crate::core::Value; use crate::core::VarName; -use crate::ast::SExpr; -use crate::OutputStream; use crate::is_enum_variant; #[derive(Default)] @@ -41,21 +42,19 @@ impl ValStreamCollection { } } -fn constraints_to_outputs<'a>( +/* Get a stream of the values of a variable from a constraint store over time */ +fn var_output_stream<'a>( + var_name: VarName, constraints: BoxStream<'a, ConstraintStore>, - output_vars: Vec, -) -> BoxStream<'a, BTreeMap> { - Box::pin( - constraints.enumerate().map(move |(index, cs)| { - let mut res = BTreeMap::new(); - for var in &output_vars { - if let Some(val) = cs.get_from_outputs_resolved(&var, &index) { - res.insert(var.clone(), val.clone()); - } - } - res - }) - ) +) -> BoxStream<'a, Value> { + Box::pin(constraints.enumerate().map(move |(index, cs)| { + if let Some(val) = cs.get_from_outputs_resolved(&var_name, &index) { + // Return the value if it is resolved + val.clone() + } else { + Value::Unknown + } + })) } #[derive(Debug, Default)] @@ -146,16 +145,17 @@ impl SyncConstraintBasedRuntime { self.resolve_possible(); self.time += 1; } - } -pub struct ConstraintBasedMonitor { +pub struct ConstraintBasedMonitor> { input_streams: ValStreamCollection, model: LOLASpecification, + output_handler: H, } -impl Monitor for ConstraintBasedMonitor { - fn new(model: LOLASpecification, input: &mut dyn InputProvider) -> Self { +#[async_trait] +impl> Monitor for ConstraintBasedMonitor { + fn new(model: LOLASpecification, input: &mut dyn InputProvider, output: H) -> Self { let input_streams = model .input_vars() .iter() @@ -169,6 +169,7 @@ impl Monitor for ConstraintBasedMonitor { ConstraintBasedMonitor { input_streams, model, + output_handler: output, } } @@ -176,26 +177,28 @@ impl Monitor for ConstraintBasedMonitor { &self.model } - fn monitor_outputs(&mut self) -> OutputStream> { - constraints_to_outputs( - self.stream_output_constraints(), - self.model.output_vars.clone(), - ) + async fn run(mut self) { + let outputs = self + .model + .output_vars() + .into_iter() + .map(|var| (var.clone(), self.output_stream(&var))) + .collect(); + self.output_handler.provide_streams(outputs); + self.output_handler.run().await; } } -impl ConstraintBasedMonitor { - fn stream_output_constraints( - &mut self, - ) -> BoxStream<'static, ConstraintStore> { +impl> ConstraintBasedMonitor { + fn stream_output_constraints(&mut self) -> BoxStream<'static, ConstraintStore> { let inputs_stream = mem::take(&mut self.input_streams).into_stream(); let mut runtime_initial = SyncConstraintBasedRuntime::default(); runtime_initial.store = model_constraints(self.model.clone()); Box::pin(stream::unfold( (inputs_stream, runtime_initial), |(mut inputs_stream, mut runtime)| async move { - // Add the new contraints to the constraint store - let new_inputs= inputs_stream.next().await?; + // Add the new constraints to the constraint store + let new_inputs = inputs_stream.next().await?; runtime.step(&new_inputs); // Keep unfolding @@ -203,4 +206,8 @@ impl ConstraintBasedMonitor { }, )) } + + fn output_stream(&mut self, var: &VarName) -> BoxStream<'static, Value> { + var_output_stream(var.clone(), self.stream_output_constraints()) + } } diff --git a/src/core.rs b/src/core.rs index bd6bcff..dfc6799 100644 --- a/src/core.rs +++ b/src/core.rs @@ -3,6 +3,7 @@ use std::{ fmt::{Debug, Display}, }; +use async_trait::async_trait; use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; // use serde_json::{Deserializer, Sserializer}; @@ -165,11 +166,11 @@ pub trait StreamContext: Send + 'static { fn advance(&self); } -pub trait MonitoringSemantics: Clone + Send + 'static { +pub trait MonitoringSemantics: Clone + Sync + Send + 'static { fn to_async_stream(expr: Expr, ctx: &dyn StreamContext) -> OutputStream; } -pub trait Specification { +pub trait Specification: Sync + Send { fn input_vars(&self) -> Vec; fn output_vars(&self) -> Vec; @@ -177,6 +178,24 @@ pub trait Specification { fn var_expr(&self, var: &VarName) -> Option; } +// This could alternatively implement Sink +// The constructor (which is not specified by the trait) should provide any +// configuration details needed by the output handler (e.g. host, port, +// output file name, etc.) whilst provide_streams is called by the runtime to +// finish the setup of the output handler by providing the streams to be output, +// and finally run is called to start the output handler. +#[async_trait] +pub trait OutputHandler: Send { + // async fn handle_output(&mut self, var: &VarName, value: V); + // This should only be called once by the runtime to provide the streams + fn provide_streams(&mut self, streams: BTreeMap>); + + // Essentially this is of type + // async fn run(&mut self); + async fn run(mut self); + // -> Pin + 'static + Send>>; +} + /* * A runtime monitor for a model/specification of type M over streams with * values of type V. @@ -185,10 +204,14 @@ pub trait Specification { * type of input provider to be provided and allows the output * to borrow from the input provider without worrying about lifetimes. */ -pub trait Monitor { - fn new(model: M, input: &mut dyn InputProvider) -> Self; +#[async_trait] +pub trait Monitor>: Send { + fn new(model: M, input: &mut dyn InputProvider, output: H) -> Self; fn spec(&self) -> &M; - fn monitor_outputs(&mut self) -> BoxStream<'static, BTreeMap>; + // Should usually wait on the output provider + async fn run(mut self); + + // fn monitor_outputs(&mut self) -> BoxStream<'static, BTreeMap>; } diff --git a/src/lib.rs b/src/lib.rs index 39ebef8..ec57cdc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,12 +25,15 @@ pub mod type_checking; pub mod untimed_monitoring_combinators; pub use file_handling::parse_file; pub mod commandline_args; +pub mod macros; +pub mod manual_output_handler; pub mod mqtt_input_provider; +pub mod null_output_handler; #[cfg(feature = "ros")] pub mod ros_input_provider; #[cfg(feature = "ros")] pub mod ros_topic_stream_mapping; +pub mod stdout_output_handler; pub mod stream_utils; pub mod typed_monitoring_combinators; pub mod typed_monitoring_semantics; -pub mod macros; diff --git a/src/main.rs b/src/main.rs index d5eaa60..2060f0b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,13 +2,13 @@ use core::panic; // #![deny(warnings)] use clap::Parser; -use futures::StreamExt; use trustworthiness_checker::InputProvider; use trustworthiness_checker::{self as tc, parse_file, type_checking::type_check, Monitor}; use trustworthiness_checker::commandline_args::{Cli, Language, Runtime, Semantics}; #[cfg(feature = "ros")] use trustworthiness_checker::ros_input_provider; +use trustworthiness_checker::stdout_output_handler::StdoutOutputHandler; const MQTT_HOSTNAME: &str = "localhost"; @@ -40,10 +40,10 @@ async fn main() { .await .expect("Input file could not be parsed"), ) - } else if let Some(input_ros_topics) = input_mode.input_ros_topics { + } else if let Some(_input_ros_topics) = input_mode.input_ros_topics { #[cfg(feature = "ros")] { - let input_mapping_str = std::fs::read_to_string(&input_ros_topics) + let input_mapping_str = std::fs::read_to_string(&_input_ros_topics) .expect("Input mapping file could not be read"); let input_mapping = tc::ros_topic_stream_mapping::json_to_mapping(&input_mapping_str) @@ -76,64 +76,67 @@ async fn main() { .await .expect("Model file could not be parsed"); + let output_handler = StdoutOutputHandler::::new(model.output_vars.clone()); + // println!("Outputs: {:?}", model.output_vars); // println!("Inputs: {:?}", model.input_vars); // println!("Model: {:?}", model); // Get the outputs from the Monitor - let outputs = match (runtime, semantics) { + let task = match (runtime, semantics) { (Runtime::Async, Semantics::Untimed) => { - let mut runner = tc::AsyncMonitorRunner::<_, _, tc::UntimedLolaSemantics, _>::new( - model, - &mut *input_streams, + let runner = Box::new( + tc::AsyncMonitorRunner::<_, _, tc::UntimedLolaSemantics, _, _>::new( + model, + &mut *input_streams, + output_handler, + ), ); - runner.monitor_outputs() + tokio::spawn(runner.run()) } (Runtime::Queuing, Semantics::Untimed) => { - let mut runner = tc::queuing_runtime::QueuingMonitorRunner::< + let runner = tc::queuing_runtime::QueuingMonitorRunner::< _, _, tc::UntimedLolaSemantics, _, - >::new(model, &mut *input_streams); - runner.monitor_outputs() + _, + >::new(model, &mut *input_streams, output_handler); + tokio::spawn(runner.run()) } (Runtime::Async, Semantics::TypedUntimed) => { let typed_model = type_check(model).expect("Model failed to type check"); // let typed_input_streams = d - let mut runner = tc::AsyncMonitorRunner::<_, _, tc::TypedUntimedLolaSemantics, _>::new( + let runner = tc::AsyncMonitorRunner::<_, _, tc::TypedUntimedLolaSemantics, _, _>::new( typed_model, &mut *input_streams, + output_handler, ); - runner.monitor_outputs() + tokio::spawn(runner.run()) } (Runtime::Queuing, Semantics::TypedUntimed) => { let typed_model = type_check(model).expect("Model failed to type check"); - let mut runner = tc::queuing_runtime::QueuingMonitorRunner::< + let runner = tc::queuing_runtime::QueuingMonitorRunner::< _, _, tc::TypedUntimedLolaSemantics, _, - >::new(typed_model, &mut *input_streams); - runner.monitor_outputs() + _, + >::new(typed_model, &mut *input_streams, output_handler); + tokio::spawn(runner.run()) } (Runtime::Constraints, Semantics::Untimed) => { - let mut runner = tc::constraint_based_runtime::ConstraintBasedMonitor::new( + let runner = tc::constraint_based_runtime::ConstraintBasedMonitor::new( model, &mut *input_streams, + output_handler, ); - runner.monitor_outputs() + tokio::spawn(runner.run()) } _ => unimplemented!(), }; - // Print the outputs - let mut enumerated_outputs = outputs.enumerate(); - while let Some((i, output)) = enumerated_outputs.next().await { - for (var, data) in output { - println!("{}[{}] = {:?}", var, i, data); - } - } + task.await.expect("Monitor failed to run"); } diff --git a/src/manual_output_handler.rs b/src/manual_output_handler.rs new file mode 100644 index 0000000..1159d01 --- /dev/null +++ b/src/manual_output_handler.rs @@ -0,0 +1,148 @@ +use std::{collections::BTreeMap, mem}; + +use async_trait::async_trait; +use futures::future::join_all; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; + +use crate::core::{OutputHandler, OutputStream, StreamData, VarName}; + +/* Some members are defined as Option as either they are provided after + * construction by provide_streams or once they are used they are taken and + * cannot be used again; this allows us to manage the lifetimes of our data + * without mutexes or arcs. */ +pub struct ManualOutputHandler { + var_names: Vec, + stream_senders: Option>>>, + stream_receivers: Option>>>, + output_sender: mpsc::Sender>, + output_receiver: Option>>, +} + +impl ManualOutputHandler { + pub fn new(var_names: Vec) -> Self { + let (stream_senders, stream_receivers): ( + Vec>>, + Vec>>, + ) = var_names.iter().map(|_| oneshot::channel()).unzip(); + let (output_sender, output_receiver) = mpsc::channel(10); + Self { + var_names, + stream_senders: Some(stream_senders), + stream_receivers: Some(stream_receivers), + output_receiver: Some(output_receiver), + output_sender, + } + } + + pub fn get_output(&mut self) -> OutputStream> { + Box::pin(ReceiverStream::new( + self.output_receiver + .take() + .expect("Output receiver missing"), + )) + } +} + +#[async_trait] +impl OutputHandler for ManualOutputHandler { + fn provide_streams(&mut self, mut streams: BTreeMap>) { + for (var_name, sender) in self + .var_names + .iter() + .zip(self.stream_senders.take().unwrap()) + { + let stream = streams + .remove(var_name) + .expect(format!("Stream for {} not found", var_name).as_str()); + assert!(sender.send(stream).is_ok()); + } + } + + async fn run(mut self) { + let receivers = mem::take(&mut self.stream_receivers).expect("Stream receivers not found"); + let mut streams: Vec<_> = receivers + .into_iter() + .map(|mut r| r.try_recv().unwrap()) + .collect(); + let output_sender = self.output_sender.clone(); + let var_names = self.var_names.clone(); + + // let receivers = receivers; + // let mut streams = streams; + // let output_sender = output_sender; + + loop { + let nexts = streams.iter_mut().map(|s| s.next()); + + // Stop outputting when any of the streams ends, otherwise collect + // all of the values + if let Some(vals) = join_all(nexts) + .await + .into_iter() + .collect::>>() + { + // Combine the values into a single map + let mut output = BTreeMap::new(); + for (var_name, value) in var_names.iter().zip(vals) { + output.insert(var_name.clone(), value); + } + // Output the combined data + output_sender.send(output).await.unwrap(); + } else { + break; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + // use tokio_stream as stream; + + use crate::{OutputStream, Value, VarName}; + use futures::stream; + + use super::*; + + #[tokio::test] + async fn test_combined_output() { + let x_stream: OutputStream = Box::pin(stream::iter((0..10).map(|x| (x * 2).into()))); + let y_stream: OutputStream = + Box::pin(stream::iter((0..10).map(|x| (x * 2 + 1).into()))); + let xy_expected: Vec> = (0..10) + .map(|x| { + vec![ + (VarName("x".to_string()), (x * 2).into()), + (VarName("y".to_string()), (x * 2 + 1).into()), + ] + .into_iter() + .collect() + }) + .collect(); + let mut handler: ManualOutputHandler = + ManualOutputHandler::new(vec![VarName("x".to_string()), VarName("y".to_string())]); + + handler.provide_streams( + vec![ + (VarName("x".to_string()), x_stream), + (VarName("y".to_string()), y_stream), + ] + .into_iter() + .collect(), + ); + + // + let output_stream = handler.get_output(); + + let task = tokio::spawn(handler.run()); + + let output: Vec> = output_stream.collect().await; + + assert_eq!(output, xy_expected); + + task.await.unwrap(); + } +} diff --git a/src/null_output_handler.rs b/src/null_output_handler.rs new file mode 100644 index 0000000..7281939 --- /dev/null +++ b/src/null_output_handler.rs @@ -0,0 +1,75 @@ +use std::collections::BTreeMap; + +use async_trait::async_trait; +use futures::StreamExt; + +use crate::{ + core::{OutputHandler, OutputStream, StreamData, VarName}, + manual_output_handler::ManualOutputHandler, +}; + +/* Some members are defined as Option as either they are provided after + * construction by provide_streams or once they are used they are taken and + * cannot be used again; this allows us to manage the lifetimes of our data + * without mutexes or arcs. */ +pub struct NullOutputHandler { + manual_output_handler: ManualOutputHandler, +} + +impl NullOutputHandler { + pub fn new(var_names: Vec) -> Self { + let combined_output_handler = ManualOutputHandler::new(var_names); + + Self { + manual_output_handler: combined_output_handler, + } + } +} + +#[async_trait] +impl OutputHandler for NullOutputHandler { + fn provide_streams(&mut self, streams: BTreeMap>) { + self.manual_output_handler.provide_streams(streams); + } + + async fn run(mut self) { + let output_stream = self.manual_output_handler.get_output(); + // let mut enumerated_outputs = output_stream.enumerate(); + let task = tokio::spawn(self.manual_output_handler.run()); + + let _ = output_stream.collect::>().await; + + task.await.unwrap(); + } +} + +#[cfg(test)] +mod tests { + use crate::core::{OutputStream, Value, VarName}; + use futures::stream; + + use super::*; + + #[tokio::test] + async fn test_run_stdout_output_handler() { + let x_stream: OutputStream = Box::pin(stream::iter((0..10).map(|x| (x * 2).into()))); + let y_stream: OutputStream = + Box::pin(stream::iter((0..10).map(|x| (x * 2 + 1).into()))); + let mut handler: NullOutputHandler = + NullOutputHandler::new(vec![VarName("x".to_string()), VarName("y".to_string())]); + + handler.provide_streams( + vec![ + (VarName("x".to_string()), x_stream), + (VarName("y".to_string()), y_stream), + ] + .into_iter() + .collect(), + ); + + let task = tokio::spawn(handler.run()); + + // + task.await.unwrap(); + } +} diff --git a/src/queuing_runtime.rs b/src/queuing_runtime.rs index 156e505..cb85359 100644 --- a/src/queuing_runtime.rs +++ b/src/queuing_runtime.rs @@ -2,17 +2,16 @@ use std::collections::BTreeMap; use std::marker::PhantomData; use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::vec; +use async_trait::async_trait; use tokio::sync::Mutex; -use futures::future::join_all; use futures::stream; -use futures::stream::BoxStream; use futures::StreamExt; use crate::core::InputProvider; use crate::core::Monitor; use crate::core::MonitoringSemantics; +use crate::core::OutputHandler; use crate::core::Specification; use crate::core::StreamData; use crate::core::{OutputStream, StreamContext, VarName}; @@ -247,22 +246,30 @@ impl StreamContext for SubMonitor { * expressions as streams. * - The M type parameter is the model/specification being monitored. */ -pub struct QueuingMonitorRunner +pub struct QueuingMonitorRunner where Val: StreamData, S: MonitoringSemantics, M: Specification, + H: OutputHandler, { model: M, var_exchange: Arc>, semantics_t: PhantomData, expr_t: PhantomData, + output_handler: H, } -impl, M: Specification> - Monitor for QueuingMonitorRunner +#[async_trait] +impl< + Val: StreamData, + Expr: Send, + S: MonitoringSemantics, + M: Specification, + H: OutputHandler, + > Monitor for QueuingMonitorRunner { - fn new(model: M, input_streams: &mut dyn InputProvider) -> Self { + fn new(model: M, input_streams: &mut dyn InputProvider, output: H) -> Self { let var_names: Vec = model .input_vars() .into_iter() @@ -307,6 +314,7 @@ impl, M: Specification< var_exchange, semantics_t: PhantomData, expr_t: PhantomData, + output_handler: output, } } @@ -314,44 +322,25 @@ impl, M: Specification< &self.model } - fn monitor_outputs(&mut self) -> BoxStream<'static, BTreeMap> { - let outputs = self.model.output_vars(); - let mut output_streams: Vec> = vec![]; - for output in outputs.iter().cloned() { - output_streams.push(Box::pin(self.output_stream(output))); - } - - Box::pin(stream::unfold( - (output_streams, outputs), - |(mut output_streams, outputs)| async move { - let mut futures = vec![]; - for output_stream in output_streams.iter_mut() { - futures.push(output_stream.next()); - } - - let next_vals: Vec> = join_all(futures).await; - let mut res: BTreeMap = BTreeMap::new(); - for (var, val) in outputs.clone().iter().zip(next_vals) { - res.insert( - var.clone(), - match val { - Some(val) => val, - None => return None, - }, - ); - } - Some((res, (output_streams, outputs))) - as Option<( - BTreeMap, - (Vec>, Vec), - )> - }, - )) + async fn run(mut self) { + let output_streams = self + .model + .output_vars() + .into_iter() + .map(|var| (var.clone(), self.output_stream(var))) + .collect(); + self.output_handler.provide_streams(output_streams); + self.output_handler.run().await; } } -impl, M: Specification> - QueuingMonitorRunner +impl< + Val: StreamData, + Expr, + S: MonitoringSemantics, + M: Specification, + H: OutputHandler, + > QueuingMonitorRunner { fn output_stream(&self, var: VarName) -> OutputStream { self.var_exchange.var(&var).unwrap() diff --git a/src/stdout_output_handler.rs b/src/stdout_output_handler.rs new file mode 100644 index 0000000..5f3a184 --- /dev/null +++ b/src/stdout_output_handler.rs @@ -0,0 +1,79 @@ +use std::collections::BTreeMap; + +use async_trait::async_trait; +use futures::StreamExt; + +use crate::{ + core::{OutputHandler, OutputStream, StreamData, VarName}, + manual_output_handler::ManualOutputHandler, +}; + +/* Some members are defined as Option as either they are provided after + * construction by provide_streams or once they are used they are taken and + * cannot be used again; this allows us to manage the lifetimes of our data + * without mutexes or arcs. */ +pub struct StdoutOutputHandler { + manual_output_handler: ManualOutputHandler, +} + +impl StdoutOutputHandler { + pub fn new(var_names: Vec) -> Self { + let combined_output_handler = ManualOutputHandler::new(var_names); + + Self { + manual_output_handler: combined_output_handler, + } + } +} + +#[async_trait] +impl OutputHandler for StdoutOutputHandler { + fn provide_streams(&mut self, streams: BTreeMap>) { + self.manual_output_handler.provide_streams(streams); + } + + async fn run(mut self) { + let output_stream = self.manual_output_handler.get_output(); + let mut enumerated_outputs = output_stream.enumerate(); + let task = tokio::spawn(self.manual_output_handler.run()); + + while let Some((i, output)) = enumerated_outputs.next().await { + for (var, data) in output { + println!("{}[{}] = {:?}", var, i, data); + } + } + + task.await.unwrap(); + } +} + +#[cfg(test)] +mod tests { + use crate::core::{OutputStream, Value, VarName}; + use futures::stream; + + use super::*; + + #[tokio::test] + async fn test_run_stdout_output_handler() { + let x_stream: OutputStream = Box::pin(stream::iter((0..10).map(|x| (x * 2).into()))); + let y_stream: OutputStream = + Box::pin(stream::iter((0..10).map(|x| (x * 2 + 1).into()))); + let mut handler: StdoutOutputHandler = + StdoutOutputHandler::new(vec![VarName("x".to_string()), VarName("y".to_string())]); + + handler.provide_streams( + vec![ + (VarName("x".to_string()), x_stream), + (VarName("y".to_string()), y_stream), + ] + .into_iter() + .collect(), + ); + + let task = tokio::spawn(handler.run()); + + // + task.await.unwrap(); + } +} diff --git a/tests/constraint_based_lola.rs b/tests/constraint_based_lola.rs index 4cae364..8cf4bc1 100644 --- a/tests/constraint_based_lola.rs +++ b/tests/constraint_based_lola.rs @@ -4,24 +4,27 @@ use futures::stream::StreamExt; use std::collections::BTreeMap; use trustworthiness_checker::constraint_based_runtime::ConstraintBasedMonitor; use trustworthiness_checker::lola_specification; +use trustworthiness_checker::manual_output_handler::ManualOutputHandler; use trustworthiness_checker::{Monitor, Value, VarName}; mod lola_fixtures; -use lola_fixtures::*; use futures::stream; use futures::stream::BoxStream; +use lola_fixtures::*; use std::pin::Pin; pub fn input_streams1() -> BTreeMap> { let mut input_streams = BTreeMap::new(); input_streams.insert( VarName("x".into()), - Box::pin(stream::iter(vec![Value::Int(1), Value::Int(3), Value::Int(5)].into_iter())) - as Pin + std::marker::Send>>, + Box::pin(stream::iter( + vec![Value::Int(1), Value::Int(3), Value::Int(5)].into_iter(), + )) as Pin + std::marker::Send>>, ); input_streams.insert( VarName("y".into()), - Box::pin(stream::iter(vec![Value::Int(2), Value::Int(4), Value::Int(6)].into_iter())) - as Pin + std::marker::Send>>, + Box::pin(stream::iter( + vec![Value::Int(2), Value::Int(4), Value::Int(6)].into_iter(), + )) as Pin + std::marker::Send>>, ); input_streams } @@ -30,9 +33,11 @@ pub fn input_streams1() -> BTreeMap> { async fn test_simple_add_monitor() { let mut input_streams = input_streams1(); let spec = lola_specification(&mut spec_simple_add_monitor()).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![ @@ -62,8 +67,11 @@ async fn test_simple_add_monitor() { async fn test_runtime_initialization() { let mut input_streams = input_empty(); let spec = lola_specification(&mut spec_empty()).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); - let outputs: Vec< BTreeMap> = monitor.monitor_outputs().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); + let outputs: Vec> = outputs.collect().await; assert_eq!(outputs.len(), 0); } @@ -72,9 +80,11 @@ async fn test_var() { let mut input_streams = input_streams1(); let mut spec = "in x\nout z\nz =x"; let spec = lola_specification(&mut spec).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert!(outputs.len() == 3); assert_eq!( outputs, @@ -106,9 +116,12 @@ async fn test_literal_expression() { let mut input_streams = input_streams1(); let mut spec = "out z\nz =42"; let spec = lola_specification(&mut spec).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); - let outputs : Vec<(usize, BTreeMap)> - = monitor.monitor_outputs().take(3).enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = + outputs.take(3).enumerate().collect().await; assert!(outputs.len() == 3); assert_eq!( outputs, @@ -140,9 +153,11 @@ async fn test_addition() { let mut input_streams = input_streams1(); let mut spec = "in x\nout z\nz =x+1"; let spec = lola_specification(&mut spec).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert!(outputs.len() == 3); assert_eq!( outputs, @@ -174,9 +189,12 @@ async fn test_subtraction() { let mut input_streams = input_streams1(); let mut spec = "in x\nout z\nz =x-10"; let spec = lola_specification(&mut spec).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); let outputs: Vec<(usize, BTreeMap)> = - monitor.monitor_outputs().enumerate().collect().await; + outputs.enumerate().collect().await; assert!(outputs.len() == 3); assert_eq!( outputs, @@ -208,9 +226,12 @@ async fn test_index_past() { let mut input_streams = input_streams1(); let mut spec = "in x\nout z\nz =x[-1, 0]"; let spec = lola_specification(&mut spec).unwrap(); - let mut monitor = ConstraintBasedMonitor::new(spec, &mut input_streams); + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let monitor = ConstraintBasedMonitor::new(spec, &mut input_streams, output_handler); + tokio::spawn(monitor.run()); let outputs: Vec<(usize, BTreeMap)> = - monitor.monitor_outputs().enumerate().collect().await; + outputs.enumerate().collect().await; assert!(outputs.len() == 3); assert_eq!( outputs, @@ -240,7 +261,6 @@ async fn test_index_past() { ); } - // #[tokio::test] // async fn test_index_future() { // let mut input_streams = input_streams1(); diff --git a/tests/typed_untimed_semantics_async.rs b/tests/typed_untimed_semantics_async.rs index ec99e39..fec99b3 100644 --- a/tests/typed_untimed_semantics_async.rs +++ b/tests/typed_untimed_semantics_async.rs @@ -2,6 +2,7 @@ use futures::stream::{BoxStream, StreamExt}; use std::collections::BTreeMap; +use trustworthiness_checker::manual_output_handler::ManualOutputHandler; use trustworthiness_checker::queuing_runtime::QueuingMonitorRunner; use trustworthiness_checker::type_checking::type_check; use trustworthiness_checker::{ @@ -16,10 +17,15 @@ async fn test_simple_add_monitor() { let mut input_streams = input_streams3(); let spec = lola_specification(&mut spec_simple_add_monitor_typed()).unwrap(); let spec = type_check(spec).expect("Type check failed"); - let mut async_monitor = - AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![ @@ -46,10 +52,15 @@ async fn test_concat_monitor() { let spec = type_check(spec).expect("Type check failed"); // let mut async_monitor = // AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _>::new(spec, input_streams); - let mut async_monitor = - QueuingMonitorRunner::<_, _, TypedUntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = QueuingMonitorRunner::<_, _, TypedUntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![ @@ -74,14 +85,16 @@ async fn test_count_monitor() { let mut input_streams: BTreeMap> = BTreeMap::new(); let spec = lola_specification(&mut spec_typed_count_monitor()).unwrap(); let spec = type_check(spec).expect("Type check failed"); - let mut async_monitor = - AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = async_monitor - .monitor_outputs() - .take(4) - .enumerate() - .collect() - .await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = + outputs.take(4).enumerate().collect().await; assert_eq!( outputs, vec![ @@ -120,10 +133,15 @@ async fn test_eval_monitor() { let spec = lola_specification(&mut spec_typed_eval_monitor()).unwrap(); let spec = type_check(spec).expect("Type check failed"); println!("{:?}", spec); - let mut async_monitor = - AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = AsyncMonitorRunner::<_, _, TypedUntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![ diff --git a/tests/untimed_semantics_async.rs b/tests/untimed_semantics_async.rs index 7baedeb..1840f28 100644 --- a/tests/untimed_semantics_async.rs +++ b/tests/untimed_semantics_async.rs @@ -2,6 +2,7 @@ use futures::stream::{BoxStream, StreamExt}; use std::collections::BTreeMap; +use trustworthiness_checker::manual_output_handler::ManualOutputHandler; use trustworthiness_checker::UntimedLolaSemantics; use trustworthiness_checker::{ async_runtime::AsyncMonitorRunner, lola_specification, Monitor, Value, VarName, @@ -13,10 +14,15 @@ use lola_fixtures::*; async fn test_simple_add_monitor() { let mut input_streams = input_streams1(); let spec = lola_specification(&mut spec_simple_add_monitor()).unwrap(); - let mut async_monitor = - AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![ @@ -41,9 +47,15 @@ async fn test_simple_add_monitor_does_not_go_away() { let mut input_streams = input_streams1(); let spec = lola_specification(&mut spec_simple_add_monitor()).unwrap(); let outputs = { - let mut async_monitor = - AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); - async_monitor.monitor_outputs() + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + outputs }; let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( @@ -67,17 +79,18 @@ async fn test_simple_add_monitor_does_not_go_away() { #[tokio::test] async fn test_count_monitor() { - let mut input_streams: BTreeMap> = - BTreeMap::new(); + let mut input_streams: BTreeMap> = BTreeMap::new(); let spec = lola_specification(&mut spec_count_monitor()).unwrap(); - let mut async_monitor = - AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = async_monitor - .monitor_outputs() - .take(4) - .enumerate() - .collect() - .await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = + outputs.take(4).enumerate().collect().await; assert_eq!( outputs, vec![ @@ -113,10 +126,13 @@ async fn test_count_monitor() { async fn test_eval_monitor() { let mut input_streams = input_streams2(); let spec = lola_specification(&mut spec_eval_monitor()).unwrap(); - let mut async_monitor = - AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = + AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new(spec, &mut input_streams, output_handler); + tokio::spawn(async_monitor.run()); let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + outputs.enumerate().collect().await; assert_eq!( outputs, vec![ diff --git a/tests/untimed_semantics_queuing.rs b/tests/untimed_semantics_queuing.rs index d65b63c..1014929 100644 --- a/tests/untimed_semantics_queuing.rs +++ b/tests/untimed_semantics_queuing.rs @@ -2,10 +2,9 @@ use futures::stream::{BoxStream, StreamExt}; use std::collections::BTreeMap; +use trustworthiness_checker::manual_output_handler::ManualOutputHandler; use trustworthiness_checker::{lola_specification, UntimedLolaSemantics}; -use trustworthiness_checker::{ - queuing_runtime::QueuingMonitorRunner, Value, Monitor, VarName, -}; +use trustworthiness_checker::{queuing_runtime::QueuingMonitorRunner, Monitor, Value, VarName}; mod lola_fixtures; use lola_fixtures::*; @@ -13,10 +12,15 @@ use lola_fixtures::*; async fn test_simple_add_monitor() { let mut input_streams = input_streams1(); let spec = lola_specification(&mut spec_simple_add_monitor()).unwrap(); - let mut async_monitor = - QueuingMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = QueuingMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![ @@ -40,14 +44,16 @@ async fn test_simple_add_monitor() { async fn test_count_monitor() { let mut input_streams: BTreeMap> = BTreeMap::new(); let spec = lola_specification(&mut spec_count_monitor()).unwrap(); - let mut async_monitor = - QueuingMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = async_monitor - .monitor_outputs() - .take(4) - .enumerate() - .collect() - .await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = QueuingMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = + outputs.take(4).enumerate().collect().await; assert_eq!( outputs, vec![ @@ -83,10 +89,15 @@ async fn test_count_monitor() { async fn test_eval_monitor() { let mut input_streams = input_streams2(); let spec = lola_specification(&mut spec_eval_monitor()).unwrap(); - let mut async_monitor = - QueuingMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams); - let outputs: Vec<(usize, BTreeMap)> = - async_monitor.monitor_outputs().enumerate().collect().await; + let mut output_handler = ManualOutputHandler::new(spec.output_vars.clone()); + let outputs = output_handler.get_output(); + let async_monitor = QueuingMonitorRunner::<_, _, UntimedLolaSemantics, _, _>::new( + spec, + &mut input_streams, + output_handler, + ); + tokio::spawn(async_monitor.run()); + let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!( outputs, vec![