Skip to content

Commit

Permalink
Refactor defer to use the stream! macro
Browse files Browse the repository at this point in the history
  • Loading branch information
twright committed Nov 20, 2024
1 parent 29f3eb2 commit be5c41b
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions src/untimed_monitoring_combinators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,41 +275,42 @@ pub fn var(ctx: &dyn StreamContext<Value>, x: VarName) -> OutputStream<Value> {
// Defer for an UntimedLolaExpression using the lola_expression parser
pub fn defer(
ctx: &dyn StreamContext<Value>,
prop_stream: OutputStream<Value>,
mut prop_stream: OutputStream<Value>,
history_length: usize,
) -> OutputStream<Value> {
/* Subcontext with current values only*/
let subcontext = ctx.subcontext(history_length);
/*unfold() creates a Stream from a seed value.*/
Box::pin(stream::unfold(
(subcontext, prop_stream, None::<Value>),
|(subcontext, mut x, saved)| async move {
/* x.next() returns None if we are done unfolding. Return in that case.*/
let current = x.next().await?;
/* If we have a saved state then use that otherwise use current */
let defer_str = saved.unwrap_or_else(|| current);

match defer_str {

Box::pin(stream! {
let mut eval_output_stream: Option<OutputStream<Value>> = None;

// Yield Unknown until we have a value to evaluate, then evaluate it
while let Some(current) = prop_stream.next().await {
match current {
Value::Str(defer_s) => {
// We have a string to evaluate so do so
let defer_parse = &mut defer_s.as_str();
let expr = match lola_expression.parse_next(defer_parse) {
Ok(expr) => expr,
Err(_) => unimplemented!("Invalid eval str"),
};
let mut es = UntimedLolaSemantics::to_async_stream(expr, subcontext.deref());
let eval_res = es.next().await?;
subcontext.advance();
return Some((eval_res, (subcontext, x, Some(Value::Str(defer_s)))));
let expr = lola_expression.parse_next(defer_parse)
.expect("Invalid eval str");
eval_output_stream = Some(UntimedLolaSemantics::to_async_stream(expr, subcontext.deref()));
break;
}
Value::Unknown => {
// Consume a sample from the subcontext but return Unknown (aka. Waiting)
subcontext.advance();
Some((Value::Unknown, (subcontext, x, None)))
yield Value::Unknown;
}
_ => panic!("We did not have memory and defer_str was not a Str"),
_ => panic!("Invalid defer property type {:?}", current)
}
},
))
}
let mut eval_output_stream = eval_output_stream.expect("No eval stream");

// Yield the saved value until the inner stream is done
while let Some(eval_res) = eval_output_stream.next().await {
subcontext.advance();
yield eval_res;
}
})
}

// Update for a synchronized language - in this case UntimedLolaSemantics.
Expand Down

0 comments on commit be5c41b

Please sign in to comment.