Skip to content

Commit

Permalink
Add documentation for the async runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
twright committed Nov 7, 2024
1 parent a32b0d6 commit 9ee8ea4
Showing 1 changed file with 114 additions and 19 deletions.
133 changes: 114 additions & 19 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,25 @@ use crate::core::MonitoringSemantics;
use crate::core::Specification;
use crate::core::{OutputStream, StreamContext, StreamData, VarName};

/* A struct containing a stream of type S which is lazily provided via a
* oneshot channel. This is used to allow a stream to be provided lazily to a
* consumer.
*
* This is either in the state Arrived(S) if the stream has already arrived, or
* Waiting(oneshot::Receiver<S>) when we are still waiting to receive the
* stream on the channel.
*/
enum WaitingStream<S> {
Arrived(S),
Waiting(oneshot::Receiver<S>),
}

// Convert a receiver of a stream to a stream which waits for the stream to
// arrive before supplying the first element
/* This function takes a receiver of a stream and returns a new stream which
* first waits for the stream to be received on the channel before the first
* element is supplied. This is useful for providing code which depend on a
* stream with this stream before it has been defined, and make it possible to
* define multiple mutually recursive streams.
*/
fn oneshot_to_stream<T: 'static>(receiver: oneshot::Receiver<OutputStream<T>>) -> OutputStream<T> {
Box::pin(stream::unfold(
WaitingStream::Waiting(receiver),
Expand All @@ -48,8 +60,10 @@ fn oneshot_to_stream<T: 'static>(receiver: oneshot::Receiver<OutputStream<T>>) -
))
}

// Wrap a stream in a drop guard to ensure that the associated cancellation
// token is not dropped before the stream has completed or been dropped
/* Wrap a stream in a drop guard to ensure that the associated cancellation
* token is not dropped before the stream has completed or been dropped.
* This is used for automatic cleanup of background tasks when all consumers
* of an output stream have gone away. */
fn drop_guard_stream<T: 'static>(
stream: OutputStream<T>,
drop_guard: Arc<DropGuard>,
Expand All @@ -67,8 +81,28 @@ fn drop_guard_stream<T: 'static>(
))
}

// An actor which manages a single variable by providing channels to get
// the output stream and publishing values to all subscribers
/* An actor which manages access to a stream variable by tracking the
* subscribers to the variable and creating independent output streams to
* forwards new data to each subscribers.
*
* This actor goes through two stages:
* 1. Gathering subscribers: In this stage, the actor waits for all subscribers
* to request output streams
* 2. Distributing data: In this stage, the actor forwards data from the input
* stream to all subscribers.
*
* This has parameters:
* - var: the name of the variable being managed
* - input_stream: the stream of inputs which we are distributing
* - channel_request_rx: a mpsc channel on which we receive requests for a
* new subscription to the variable. We are passed a oneshot channel which
* we can used to send the output stream to the requester.
* - ready: a watch channel which is used to signal when all subscribers have
* requested the stream and determines when we move to stage 2 to start
* distributing data
* - cancel: a cancellation token which is used to signal when the actor should
* terminate
*/
async fn manage_var<V: StreamData>(
var: VarName,
mut input_stream: OutputStream<V>,
Expand All @@ -80,9 +114,9 @@ async fn manage_var<V: StreamData>(
let mut send_requests = vec![];
// println!("Starting manage_var for {}", var);
// Gathering senders

/* Stage 1. Gathering subscribers */
loop {
// println!("check stage 1");
// println!("Ready: {:?}", *ready.borrow());
select! {
biased;
_ = cancel.cancelled() => {
Expand All @@ -103,29 +137,30 @@ async fn manage_var<V: StreamData>(
}
}

// Sending subscriptions
/* Send subscriptions to everyone who has requested one */
if send_requests.len() == 1 {
// Special case handling for a single request; just send the input stream
/* Special case handling for a single subscription request: just send
* the input stream directly to the subscriber and skip stage 2 */
let channel_sender = send_requests.pop().unwrap();
if let Err(_) = channel_sender.send(input_stream) {
panic!("Failed to send stream for {var} to requester");
}
// We directly re-forwarded the input stream, so we are done
// println!("Direct reforward for {}", var);
return;
} else {
/* Normal case: create a new channel for each subscriber and send
* them a stream generated from the receiving end of this channel */
for channel_sender in send_requests {
let (tx, rx) = mpsc::channel(10);
senders.push(tx);
let stream = ReceiverStream::new(rx);
// let typed_stream = SS::to_typed_stream(typ, Box::pin(stream));
if let Err(_) = channel_sender.send(Box::pin(stream)) {
// panic!("Failed to send stream for {var} to requester");
}
}
}

// Distributing data
/* Stage 2. Distributing data */
loop {
select! {
biased;
Expand Down Expand Up @@ -153,8 +188,20 @@ async fn manage_var<V: StreamData>(
}
}

// Task for moving data from a channel to an broadcast channel
// with a clock used to control the rate of data distribution
/* Task for moving data from a channel to an broadcast channel
* with a clock used to control the rate of data distribution
*
* This is used alongside the monitor task to implement subcontexts
* by buffering data from the input stream and distributing it to subscribers
* when the clock advances (i.e. when subcontext.advance() is called)
*
* This has parameters:
* - input_stream: the stream of inputs which we are distributing
* - send: the broadcast channel to which we are sending data
* - clock: a watch channel which is used to signal when the clock advances
* - cancellation_token: a cancellation token which is used to signal when to
* terminate the task
*/
async fn distribute<V: Clone + Debug + Send + 'static>(
mut input_stream: OutputStream<V>,
send: broadcast::Sender<V>,
Expand Down Expand Up @@ -205,7 +252,18 @@ async fn distribute<V: Clone + Debug + Send + 'static>(
}
}

// Task for moving data from an input stream to an output channel
/* Task for moving data from an input stream to an output channel
*
* This is used in the implementation of subcontexts to buffer data before it
* is sent to distribute
*
* This has parameters:
* - input_stream: the stream of inputs which we are monitoring
* - send: the channel to which we are sending data (corresponding to the
* other half of the channel held by distribute)
* - cancellation_token: a cancellation token which is used to signal when to
* terminate the task
*/
async fn monitor<V: StreamData>(
mut input_stream: OutputStream<V>,
send: mpsc::Sender<V>,
Expand Down Expand Up @@ -239,7 +297,25 @@ struct VarData<Val> {
requester: mpsc::Sender<oneshot::Sender<OutputStream<Val>>>,
}

// A struct representing the top-level stream context for an async monitor
/*
* A StreamContext that manages subscriptions to each variable and provides
* an asynchronous stream when requested by the monitoring semantics.
*
* This includes:
* - var_data: a map from variable names to the data associated with each
* variable, including the requester channel used to request the stream
* - cancellation_token: a cancellation token used to signal when any async
* actors we have launched should terminate
* - drop_guard: a reference-counted drop guard associated with the
* cancellation token which signals to cancel all background tasks when there
* it is dropped (i.e. when the reference count goes to zero)
* - vars_requested: a watch channel used to track the number of outstanding
* requests for subscriptions to each variable (this is used to determine
* when it is safe for managed_var to start distributing data)
*
* Most of the logic is in the manage_var actor which is launched for each
* variable to manage subscriptions and distribute data.
*/
struct AsyncVarExchange<Val: StreamData> {
var_data: BTreeMap<VarName, VarData<Val>>,
cancellation_token: CancellationToken,
Expand Down Expand Up @@ -305,8 +381,13 @@ impl<Val: StreamData> StreamContext<Val> for Arc<AsyncVarExchange<Val>> {
}
}

// A subcontext which consumes data for a subset of the variables and makes
// it available when evaluating a deferred expression
/* A subcontext which consumes data for a subset of the variables and makes
* it available when evaluating a deferred expression
*
* This is implemented via multiple background monitor and distribute actor
* pairs which buffer data from the input stream and distribute it to
* each subscriber to the subcontext
*/
struct SubMonitor<Val: StreamData> {
parent: Arc<AsyncVarExchange<Val>>,
senders: BTreeMap<VarName, broadcast::Sender<Val>>,
Expand Down Expand Up @@ -379,6 +460,20 @@ impl<Val: StreamData> StreamContext<Val> for SubMonitor<Val> {
}
}

/*
* A Monitor instance implementing the Async Runtime.
*
* This runtime uses async actors to keep track of dependencies between
* channels and to distribute data between them, pass data around via async
* streams, and automatically perform garbage collection of the data contained
* in the streams.
*
* - The Expr type parameter is the type of the expressions in the model.
* - The Val type parameter is the type of the values used in the channels.
* - The S type parameter is the monitoring semantics used to evaluate the
* expressions as streams.
* - The M type parameter is the model/specification being monitored.
*/
pub struct AsyncMonitorRunner<Expr, Val, S, M>
where
S: MonitoringSemantics<Expr, Val>,
Expand Down

0 comments on commit 9ee8ea4

Please sign in to comment.