Skip to content

Commit

Permalink
Restore the implementation of oneshot_to_stream using flatten_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
twright committed Nov 12, 2024
1 parent 0e2b022 commit 1966b27
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,21 @@ use crate::core::Specification;
use crate::core::{OutputStream, StreamContext, StreamData, VarName};

/* Converts a `oneshot::Receiver` of an `OutputStream` into an `OutputStream`.
* Is done by first waiting for the oneshot to resolve to an OutputStream and then continously yielding
* the values from the stream.
* Is done by first waiting for the oneshot to resolve to an OutputStream and
* then continuously yielding the values from the stream. This is implemented
* using the `flatten_stream` combinator from the `futures` crate, which
* is essentially a general version of this function (except for handling the
* case where the oneshot resolves to an error due to the sender going away).
*/
fn oneshot_to_stream<T: StreamData>(
receiver: oneshot::Receiver<OutputStream<T>>,
) -> OutputStream<T> {
Box::pin(async_stream::stream! {
match receiver.await {
Ok(mut s) => {
while let Some(val) = s.next().await {
yield val;
}
}
Err(_) => {
return;
}
}
})
let empty_stream = Box::pin(stream::empty());
Box::pin(
receiver
.map(|res| res.unwrap_or(empty_stream))
.flatten_stream(),
)
}

/* Wrap a stream in a drop guard to ensure that the associated cancellation
Expand Down

0 comments on commit 1966b27

Please sign in to comment.