diff --git a/src/untimed_monitoring_combinators.rs b/src/untimed_monitoring_combinators.rs index aaa4a2a..d95cacb 100644 --- a/src/untimed_monitoring_combinators.rs +++ b/src/untimed_monitoring_combinators.rs @@ -275,41 +275,42 @@ pub fn var(ctx: &dyn StreamContext, x: VarName) -> OutputStream { // Defer for an UntimedLolaExpression using the lola_expression parser pub fn defer( ctx: &dyn StreamContext, - prop_stream: OutputStream, + mut prop_stream: OutputStream, history_length: usize, ) -> OutputStream { /* Subcontext with current values only*/ let subcontext = ctx.subcontext(history_length); - /*unfold() creates a Stream from a seed value.*/ - Box::pin(stream::unfold( - (subcontext, prop_stream, None::), - |(subcontext, mut x, saved)| async move { - /* x.next() returns None if we are done unfolding. Return in that case.*/ - let current = x.next().await?; - /* If we have a saved state then use that otherwise use current */ - let defer_str = saved.unwrap_or_else(|| current); - - match defer_str { + + Box::pin(stream! { + let mut eval_output_stream: Option> = None; + + // Yield Unknown until we have a value to evaluate, then evaluate it + while let Some(current) = prop_stream.next().await { + match current { Value::Str(defer_s) => { + // We have a string to evaluate so do so let defer_parse = &mut defer_s.as_str(); - let expr = match lola_expression.parse_next(defer_parse) { - Ok(expr) => expr, - Err(_) => unimplemented!("Invalid eval str"), - }; - let mut es = UntimedLolaSemantics::to_async_stream(expr, subcontext.deref()); - let eval_res = es.next().await?; - subcontext.advance(); - return Some((eval_res, (subcontext, x, Some(Value::Str(defer_s))))); + let expr = lola_expression.parse_next(defer_parse) + .expect("Invalid eval str"); + eval_output_stream = Some(UntimedLolaSemantics::to_async_stream(expr, subcontext.deref())); + break; } Value::Unknown => { // Consume a sample from the subcontext but return Unknown (aka. Waiting) subcontext.advance(); - Some((Value::Unknown, (subcontext, x, None))) + yield Value::Unknown; } - _ => panic!("We did not have memory and defer_str was not a Str"), + _ => panic!("Invalid defer property type {:?}", current) } - }, - )) + } + let mut eval_output_stream = eval_output_stream.expect("No eval stream"); + + // Yield the saved value until the inner stream is done + while let Some(eval_res) = eval_output_stream.next().await { + subcontext.advance(); + yield eval_res; + } + }) } // Update for a synchronized language - in this case UntimedLolaSemantics.