From 9ee8ea4822e84233b00b93edc0a2567764896990 Mon Sep 17 00:00:00 2001 From: Thomas Wright Date: Thu, 7 Nov 2024 12:13:46 +0100 Subject: [PATCH] Add documentation for the async runtime --- src/async_runtime.rs | 133 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 19 deletions(-) diff --git a/src/async_runtime.rs b/src/async_runtime.rs index 819c0b8..2c388a1 100644 --- a/src/async_runtime.rs +++ b/src/async_runtime.rs @@ -24,13 +24,25 @@ use crate::core::MonitoringSemantics; use crate::core::Specification; use crate::core::{OutputStream, StreamContext, StreamData, VarName}; +/* A struct containing a stream of type S which is lazily provided via a + * oneshot channel. This is used to allow a stream to be provided lazily to a + * consumer. + * + * This is either in the state Arrived(S) if the stream has already arrived, or + * Waiting(oneshot::Receiver) when we are still waiting to receive the + * stream on the channel. + */ enum WaitingStream { Arrived(S), Waiting(oneshot::Receiver), } -// Convert a receiver of a stream to a stream which waits for the stream to -// arrive before supplying the first element +/* This function takes a receiver of a stream and returns a new stream which + * first waits for the stream to be received on the channel before the first + * element is supplied. This is useful for providing code which depend on a + * stream with this stream before it has been defined, and make it possible to + * define multiple mutually recursive streams. + */ fn oneshot_to_stream(receiver: oneshot::Receiver>) -> OutputStream { Box::pin(stream::unfold( WaitingStream::Waiting(receiver), @@ -48,8 +60,10 @@ fn oneshot_to_stream(receiver: oneshot::Receiver>) - )) } -// Wrap a stream in a drop guard to ensure that the associated cancellation -// token is not dropped before the stream has completed or been dropped +/* Wrap a stream in a drop guard to ensure that the associated cancellation + * token is not dropped before the stream has completed or been dropped. + * This is used for automatic cleanup of background tasks when all consumers + * of an output stream have gone away. */ fn drop_guard_stream( stream: OutputStream, drop_guard: Arc, @@ -67,8 +81,28 @@ fn drop_guard_stream( )) } -// An actor which manages a single variable by providing channels to get -// the output stream and publishing values to all subscribers +/* An actor which manages access to a stream variable by tracking the + * subscribers to the variable and creating independent output streams to + * forwards new data to each subscribers. + * + * This actor goes through two stages: + * 1. Gathering subscribers: In this stage, the actor waits for all subscribers + * to request output streams + * 2. Distributing data: In this stage, the actor forwards data from the input + * stream to all subscribers. + * + * This has parameters: + * - var: the name of the variable being managed + * - input_stream: the stream of inputs which we are distributing + * - channel_request_rx: a mpsc channel on which we receive requests for a + * new subscription to the variable. We are passed a oneshot channel which + * we can used to send the output stream to the requester. + * - ready: a watch channel which is used to signal when all subscribers have + * requested the stream and determines when we move to stage 2 to start + * distributing data + * - cancel: a cancellation token which is used to signal when the actor should + * terminate + */ async fn manage_var( var: VarName, mut input_stream: OutputStream, @@ -80,9 +114,9 @@ async fn manage_var( let mut send_requests = vec![]; // println!("Starting manage_var for {}", var); // Gathering senders + + /* Stage 1. Gathering subscribers */ loop { - // println!("check stage 1"); - // println!("Ready: {:?}", *ready.borrow()); select! { biased; _ = cancel.cancelled() => { @@ -103,29 +137,30 @@ async fn manage_var( } } - // Sending subscriptions + /* Send subscriptions to everyone who has requested one */ if send_requests.len() == 1 { - // Special case handling for a single request; just send the input stream + /* Special case handling for a single subscription request: just send + * the input stream directly to the subscriber and skip stage 2 */ let channel_sender = send_requests.pop().unwrap(); if let Err(_) = channel_sender.send(input_stream) { panic!("Failed to send stream for {var} to requester"); } // We directly re-forwarded the input stream, so we are done - // println!("Direct reforward for {}", var); return; } else { + /* Normal case: create a new channel for each subscriber and send + * them a stream generated from the receiving end of this channel */ for channel_sender in send_requests { let (tx, rx) = mpsc::channel(10); senders.push(tx); let stream = ReceiverStream::new(rx); - // let typed_stream = SS::to_typed_stream(typ, Box::pin(stream)); if let Err(_) = channel_sender.send(Box::pin(stream)) { // panic!("Failed to send stream for {var} to requester"); } } } - // Distributing data + /* Stage 2. Distributing data */ loop { select! { biased; @@ -153,8 +188,20 @@ async fn manage_var( } } -// Task for moving data from a channel to an broadcast channel -// with a clock used to control the rate of data distribution +/* Task for moving data from a channel to an broadcast channel + * with a clock used to control the rate of data distribution + * + * This is used alongside the monitor task to implement subcontexts + * by buffering data from the input stream and distributing it to subscribers + * when the clock advances (i.e. when subcontext.advance() is called) + * + * This has parameters: + * - input_stream: the stream of inputs which we are distributing + * - send: the broadcast channel to which we are sending data + * - clock: a watch channel which is used to signal when the clock advances + * - cancellation_token: a cancellation token which is used to signal when to + * terminate the task + */ async fn distribute( mut input_stream: OutputStream, send: broadcast::Sender, @@ -205,7 +252,18 @@ async fn distribute( } } -// Task for moving data from an input stream to an output channel +/* Task for moving data from an input stream to an output channel + * + * This is used in the implementation of subcontexts to buffer data before it + * is sent to distribute + * + * This has parameters: + * - input_stream: the stream of inputs which we are monitoring + * - send: the channel to which we are sending data (corresponding to the + * other half of the channel held by distribute) + * - cancellation_token: a cancellation token which is used to signal when to + * terminate the task + */ async fn monitor( mut input_stream: OutputStream, send: mpsc::Sender, @@ -239,7 +297,25 @@ struct VarData { requester: mpsc::Sender>>, } -// A struct representing the top-level stream context for an async monitor +/* + * A StreamContext that manages subscriptions to each variable and provides + * an asynchronous stream when requested by the monitoring semantics. + * + * This includes: + * - var_data: a map from variable names to the data associated with each + * variable, including the requester channel used to request the stream + * - cancellation_token: a cancellation token used to signal when any async + * actors we have launched should terminate + * - drop_guard: a reference-counted drop guard associated with the + * cancellation token which signals to cancel all background tasks when there + * it is dropped (i.e. when the reference count goes to zero) + * - vars_requested: a watch channel used to track the number of outstanding + * requests for subscriptions to each variable (this is used to determine + * when it is safe for managed_var to start distributing data) + * + * Most of the logic is in the manage_var actor which is launched for each + * variable to manage subscriptions and distribute data. + */ struct AsyncVarExchange { var_data: BTreeMap>, cancellation_token: CancellationToken, @@ -305,8 +381,13 @@ impl StreamContext for Arc> { } } -// A subcontext which consumes data for a subset of the variables and makes -// it available when evaluating a deferred expression +/* A subcontext which consumes data for a subset of the variables and makes + * it available when evaluating a deferred expression + * + * This is implemented via multiple background monitor and distribute actor + * pairs which buffer data from the input stream and distribute it to + * each subscriber to the subcontext + */ struct SubMonitor { parent: Arc>, senders: BTreeMap>, @@ -379,6 +460,20 @@ impl StreamContext for SubMonitor { } } +/* + * A Monitor instance implementing the Async Runtime. + * + * This runtime uses async actors to keep track of dependencies between + * channels and to distribute data between them, pass data around via async + * streams, and automatically perform garbage collection of the data contained + * in the streams. + * + * - The Expr type parameter is the type of the expressions in the model. + * - The Val type parameter is the type of the values used in the channels. + * - The S type parameter is the monitoring semantics used to evaluate the + * expressions as streams. + * - The M type parameter is the model/specification being monitored. + */ pub struct AsyncMonitorRunner where S: MonitoringSemantics,