expose the Aggregator instead of a handle to abort it
As discussed in code review, for such a low-level API as
`Server::into_parts`, it makes sense to allow the user to spawn the
aggregator where they like, rather than spawning it internally.

Since I couldn't find a way to return and use a `dyn Future` (boxed or
otherwise), the `Aggregator` has been made public with a single (async)
function `run()` which will start its run loop.
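For illustration, a minimal sketch of the resulting usage (the listen
address, error handling, and tracing-subscriber wiring here are
assumptions for the example, not part of this change):

```rust
use console_subscriber::{ConsoleLayer, ServerParts};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // The layer would normally be registered with a tracing-subscriber
    // registry; that wiring is omitted here.
    let (_console_layer, server) = ConsoleLayer::builder().build();

    // `into_parts` now hands back the `Aggregator` itself instead of a
    // handle to an internally spawned task.
    let ServerParts {
        instrument_server,
        aggregator,
        ..
    } = server.into_parts();

    // The caller chooses where the aggregator's run loop executes.
    let aggregator_handle = tokio::spawn(aggregator.run());

    // Serve the instrument API on an address of the caller's choosing.
    tonic::transport::Server::builder()
        .add_service(instrument_server)
        .serve("127.0.0.1:6669".parse()?)
        .await?;

    // Once the server has stopped, the aggregator task can be aborted.
    aggregator_handle.abort();
    Ok(())
}
```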
hds committed Jul 27, 2023
1 parent adca82c commit 7a2912c
Showing 2 changed files with 44 additions and 22 deletions.
console-subscriber/src/aggregator/mod.rs (13 additions & 2 deletions)
@@ -22,7 +22,13 @@ mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

-pub(crate) struct Aggregator {
+/// Aggregates instrumentation traces and prepares state for the instrument
+/// server.
+///
+/// The `Aggregator` is responsible for receiving and organizing the
+/// instrumented events and preparing the data to be served to an instrument
+/// client.
+pub struct Aggregator {
    /// Channel of incoming events emitted by `TaskLayer`s.
    events: mpsc::Receiver<Event>,

@@ -157,7 +163,12 @@ impl Aggregator {
        }
    }

-    pub(crate) async fn run(mut self) {
+    /// Runs the aggregator.
+    ///
+    /// This method will start the aggregator loop and should run as long as
+    /// the instrument server is running. If the instrument server stops,
+    /// this future can be aborted.
+    pub async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
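The run/abort contract described in the new doc comment is the same one
`serve_with` follows in the `lib.rs` changes below: spawn the `run()`
future, then abort the task once the server future resolves. As a
condensed sketch (assuming `aggregator` from `Server::into_parts` and a
gRPC `serve` future are already in scope):

```rust
// Sketch: `aggregator` comes from Server::into_parts(); `serve` is the
// caller's gRPC server future. Both are assumed to be in scope.
let aggregator_task = tokio::spawn(aggregator.run());
let result = serve.await;
// Aborting is the documented way to stop the run loop once the server exits.
aggregator_task.abort();
```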
console-subscriber/src/lib.rs (31 additions & 20 deletions)
@@ -42,7 +42,7 @@ mod stats;
pub(crate) mod sync;
mod visitors;

-use aggregator::Aggregator;
+pub use aggregator::Aggregator;
pub use builder::{Builder, ServerAddr};
use callsites::Callsites;
use record::Recorder;
@@ -941,10 +941,11 @@ impl Server {
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
-            instrument_server: service,
-            aggregator_handle: aggregate,
+            instrument_server,
+            aggregator,
        } = self.into_parts();
-        let router = builder.add_service(service);
+        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
+        let router = builder.add_service(instrument_server);
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
@@ -957,17 +958,21 @@
                spawn_named(serve, "console::serve").await
            }
        };
-        drop(aggregate);
+        aggregate.abort();
        res?.map_err(Into::into)
    }

-    /// Returns the parts needed to spawn a gRPC server and keep the aggregation
-    /// worker running.
+    /// Returns the parts needed to spawn a gRPC server and the aggregator that
+    /// supplies it.
    ///
    /// Note that a server spawned in this way will disregard any value set by
    /// [`Builder::server_addr`], as the user becomes responsible for defining
    /// the address when calling [`Router::serve`].
    ///
+    /// Additionally, the user of this API must ensure that the [`Aggregator`]
+    /// is running for as long as the gRPC server is. If the server stops
+    /// running, the aggregator task can be aborted.
+    ///
    /// # Examples
    ///
    /// The parts can be used to serve the instrument server together with
@@ -984,10 +989,11 @@ impl Server {
    /// let (console_layer, server) = ConsoleLayer::builder().build();
    /// let ServerParts {
    ///     instrument_server,
-    ///     aggregator_handle,
+    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    ///
+    /// let aggregator_handle = tokio::spawn(aggregator.run());
    /// let router = tonic::transport::Server::builder()
    ///     //.add_service(some_other_service)
    ///     .add_service(instrument_server);
@@ -1007,19 +1013,16 @@
    ///
    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
    pub fn into_parts(mut self) -> ServerParts {
-        let aggregate = self
+        let aggregator = self
            .aggregator
            .take()
            .expect("cannot start server multiple times");
-        let aggregate = spawn_named(aggregate.run(), "console::aggregate");

-        let service = proto::instrument::instrument_server::InstrumentServer::new(self);
+        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);

        ServerParts {
-            instrument_server: service,
-            aggregator_handle: AggregatorHandle {
-                join_handle: aggregate,
-            },
+            instrument_server,
+            aggregator,
        }
    }
}
@@ -1032,8 +1035,9 @@ impl Server {
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
-/// The [`AggregatorHandle`] can be used to abort the associated aggregator task
-/// after the server has been shut down.
+/// The `aggregator`'s `run()` method returns a future which should be kept
+/// running as long as the server is. Generally, this future should be spawned
+/// onto an appropriate runtime and then aborted if the server gets shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
@@ -1043,10 +1047,17 @@ pub struct ServerParts {
    /// See the documentation for [`InstrumentServer`] for details.
    pub instrument_server: InstrumentServer<Server>,

-    /// A handle to the background worker task responsible for aggregating trace data.
-    ///
-    /// See the documentation for [`AggregatorHandle`] for details.
-    pub aggregator_handle: AggregatorHandle,
+    /// The aggregator.
+    ///
+    /// Responsible for collecting and preparing traces for the instrument
+    /// server to send to its clients.
+    ///
+    /// The aggregator should be [`run`] when the instrument server is started.
+    /// If the server stops running for any reason, the aggregator task can be
+    /// aborted.
+    ///
+    /// [`run`]: fn@crate::Aggregator::run
+    pub aggregator: Aggregator,
}

/// Aggregator handle.
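One further detail of the `ServerParts` change: the struct stays
`#[non_exhaustive]`, so destructuring it requires the rest pattern even
when every current field is named, as the doc example above shows:

```rust
// `ServerParts` is #[non_exhaustive], so the `..` is required.
let ServerParts {
    instrument_server,
    aggregator,
    ..
} = server.into_parts();
```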
