Skip to content

Commit

Permalink
Refactoring together
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenhaahr committed Nov 12, 2024
1 parent 6c19cd6 commit 5d11ba1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dyn-clone = "1.0.17"
backtrace-on-stack-overflow = "0.3.0"
tokio-util = "0.7.11"
clap = { version = "4.5.17", features = ["derive"] }
async-stream = "0.3.6"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] }
Expand Down
26 changes: 15 additions & 11 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@ use crate::core::MonitoringSemantics;
use crate::core::Specification;
use crate::core::{OutputStream, StreamContext, StreamData, VarName};

/* This function takes a receiver of a stream and returns a new stream which
* first waits for the stream to be received on the channel before the first
* element is supplied. This is useful for providing code which depend on a
* stream with this stream before it has been defined, and make it possible to
* define multiple mutually recursive streams.
/* Converts a `oneshot::Receiver` of an `OutputStream` into an `OutputStream`.
* Is done by first waiting for the oneshot to resolve to an OutputStream and then continously yielding
* the values from the stream.
*/
fn oneshot_to_stream<T: StreamData>(
receiver: oneshot::Receiver<OutputStream<T>>,
) -> OutputStream<T> {
let empty_stream = Box::pin(stream::empty());
Box::pin(
receiver
.map(|res| res.unwrap_or(empty_stream))
.flatten_stream()
)
Box::pin(async_stream::stream! {
match receiver.await {
Ok(mut s) => {
while let Some(val) = s.next().await {
yield val;
}
}
Err(_) => {
return;
}
}
})
}

/* Wrap a stream in a drop guard to ensure that the associated cancellation
Expand Down
1 change: 1 addition & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use futures::stream::BoxStream;

#[derive(Clone, Debug, PartialEq, Eq)]
// TODO: Rename to Value
pub enum ConcreteStreamData {
Int(i64),
Str(String),
Expand Down

0 comments on commit 5d11ba1

Please sign in to comment.