diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs
index 7359970b5..4496cba28 100644
--- a/console-subscriber/src/aggregator/mod.rs
+++ b/console-subscriber/src/aggregator/mod.rs
@@ -22,7 +22,13 @@ mod shrink;
 use self::id_data::{IdData, Include};
 use self::shrink::{ShrinkMap, ShrinkVec};
 
-pub(crate) struct Aggregator {
+/// Aggregates instrumentation traces and prepares state for the instrument
+/// server.
+///
+/// The `Aggregator` is responsible for receiving and organizing the
+/// instrumented events and preparing the data to be served to an instrument
+/// client.
+pub struct Aggregator {
     /// Channel of incoming events emitted by `TaskLayer`s.
     events: mpsc::Receiver<Event>,
 
@@ -157,7 +163,12 @@ impl Aggregator {
         }
     }
 
-    pub(crate) async fn run(mut self) {
+    /// Runs the aggregator.
+    ///
+    /// This method starts the aggregator loop and should run for as long as
+    /// the instrument server is running. If the instrument server stops,
+    /// this future can be aborted.
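+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch of driving the aggregator; here, `aggregator` is
+    /// assumed to have been obtained from `Server::into_parts`, and the
+    /// example is not compiled as a doctest:
+    ///
+    /// ```ignore
+    /// // Spawn the aggregator loop alongside the instrument server.
+    /// let aggregate = tokio::spawn(aggregator.run());
+    ///
+    /// // ... serve the instrument server ...
+    ///
+    /// // Once the server has stopped, the aggregator can be aborted.
+    /// aggregate.abort();
+    /// ```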
+    pub async fn run(mut self) {
         let mut publish = tokio::time::interval(self.publish_interval);
         loop {
             let should_send = tokio::select! {
diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs
index 6b0c1a75e..8df105cce 100644
--- a/console-subscriber/src/lib.rs
+++ b/console-subscriber/src/lib.rs
@@ -1,6 +1,6 @@
 #![doc = include_str!("../README.md")]
 use console_api as proto;
-use proto::resources::resource;
+use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
 use serde::Serialize;
 use std::{
     cell::RefCell,
@@ -15,7 +15,10 @@ use std::{
 use thread_local::ThreadLocal;
 #[cfg(unix)]
 use tokio::net::UnixListener;
-use tokio::sync::{mpsc, oneshot};
+use tokio::{
+    sync::{mpsc, oneshot},
+    task::JoinHandle,
+};
 #[cfg(unix)]
 use tokio_stream::wrappers::UnixListenerStream;
 use tracing_core::{
@@ -39,7 +42,7 @@ mod stats;
 pub(crate) mod sync;
 mod visitors;
 
-use aggregator::Aggregator;
+pub use aggregator::Aggregator;
 pub use builder::{Builder, ServerAddr};
 use callsites::Callsites;
 use record::Recorder;
@@ -933,18 +936,16 @@ impl Server {
     ///
     /// [`tonic`]: https://docs.rs/tonic/
     pub async fn serve_with(
-        mut self,
+        self,
         mut builder: tonic::transport::Server,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
-        let aggregate = self
-            .aggregator
-            .take()
-            .expect("cannot start server multiple times");
-        let aggregate = spawn_named(aggregate.run(), "console::aggregate");
         let addr = self.addr.clone();
-        let router = builder.add_service(
-            proto::instrument::instrument_server::InstrumentServer::new(self),
-        );
+        let ServerParts {
+            instrument_server,
+            aggregator,
+        } = self.into_parts();
+        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
+        let router = builder.add_service(instrument_server);
         let res = match addr {
             ServerAddr::Tcp(addr) => {
                 let serve = router.serve(addr);
@@ -960,6 +961,135 @@ impl Server {
         aggregate.abort();
         res?.map_err(Into::into)
     }
+
+    /// Returns the parts needed to spawn a gRPC server and the aggregator that
+    /// supplies it.
+    ///
+    /// Note that a server spawned in this way will disregard any value set by
+    /// [`Builder::server_addr`], as the user becomes responsible for defining
+    /// the address when calling [`Router::serve`].
+    ///
+    /// Additionally, the user of this API must ensure that the [`Aggregator`]
+    /// is running for as long as the gRPC server is. If the server stops
+    /// running, the aggregator task can be aborted.
+    ///
+    /// # Examples
+    ///
+    /// The parts can be used to serve the instrument server together with
+    /// other endpoints from the same gRPC server.
+    ///
+    /// ```
+    /// use console_subscriber::{ConsoleLayer, ServerParts};
+    ///
+    /// # let runtime = tokio::runtime::Builder::new_current_thread()
+    /// #     .enable_all()
+    /// #     .build()
+    /// #     .unwrap();
+    /// # runtime.block_on(async {
+    /// let (console_layer, server) = ConsoleLayer::builder().build();
+    /// let ServerParts {
+    ///     instrument_server,
+    ///     aggregator,
+    ///     ..
+    /// } = server.into_parts();
+    ///
+    /// let aggregator_handle = tokio::spawn(aggregator.run());
+    /// let router = tonic::transport::Server::builder()
+    ///     //.add_service(some_other_service)
+    ///     .add_service(instrument_server);
+    /// let serve = router.serve(std::net::SocketAddr::new(
+    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
+    ///     6669,
+    /// ));
+    ///
+    /// // Finally, spawn the server.
+    /// tokio::spawn(serve);
+    /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
+    /// # drop(console_layer);
+    /// # let mut aggregator_handle = aggregator_handle;
+    /// # aggregator_handle.abort();
+    /// # });
+    /// ```
+    ///
+    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
+    pub fn into_parts(mut self) -> ServerParts {
+        let aggregator = self
+            .aggregator
+            .take()
+            .expect("cannot start server multiple times");
+
+        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);
+
+        ServerParts {
+            instrument_server,
+            aggregator,
+        }
+    }
+}
+
+/// Server Parts
+///
+/// This struct contains the parts returned by [`Server::into_parts`]. It may
+/// contain further parts in the future, and as such is marked as
+/// `#[non_exhaustive]`.
+///
+/// The `InstrumentServer` can be used to construct a router which
+/// can be added to a [`tonic`] gRPC server.
+///
+/// The `aggregator`'s `run` method returns a future which should be running
+/// for as long as the server is. Generally, this future should be spawned
+/// onto an appropriate runtime and then aborted if the server gets shut down.
+///
+/// See the [`Server::into_parts`] documentation for usage.
+#[non_exhaustive]
+pub struct ServerParts {
+    /// The instrument server.
+    ///
+    /// See the documentation for [`InstrumentServer`] for details.
+    pub instrument_server: InstrumentServer<Server>,
+
+    /// The aggregator.
+    ///
+    /// Responsible for collecting and preparing traces for the instrument
+    /// server to send to its clients.
+    ///
+    /// The aggregator should be [`run`] when the instrument server is started.
+    /// If the server stops running for any reason, the aggregator task can be
+    /// aborted.
+    ///
+    /// [`run`]: fn@crate::Aggregator::run
+    pub aggregator: Aggregator,
+}
+
+/// Aggregator handle.
+///
+/// This object wraps the [`JoinHandle`] of a spawned aggregator task and can
+/// be used to abort that task.
+///
+/// The aggregator collects the traces emitted by the instrumented async
+/// runtime being observed and prepares them to be served by the gRPC server.
+///
+/// Normally, if the server started with [`Server::serve`] or
+/// [`Server::serve_with`] stops for any reason, the aggregator is aborted;
+/// however, if the server was started with the [`InstrumentServer`] returned
+/// from [`Server::into_parts`], then it is the responsibility of the user
+/// of the API to stop the aggregator task by calling [`abort`] on this
+/// object.
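+///
+/// # Examples
+///
+/// A minimal sketch of that shutdown sequence; `serve` stands in for the
+/// future returned by `Router::serve` and `aggregator_handle` is assumed to
+/// be a value of this type. The example is not compiled as a doctest:
+///
+/// ```ignore
+/// // Run the instrument server until it stops for any reason.
+/// serve.await?;
+///
+/// // Without a server to supply, the aggregator task would keep running
+/// // forever; abort it explicitly.
+/// aggregator_handle.abort();
+/// ```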
+///
+/// [`abort`]: fn@crate::AggregatorHandle::abort
+pub struct AggregatorHandle {
+    join_handle: JoinHandle<()>,
+}
+
+impl AggregatorHandle {
+    /// Aborts the task running this aggregator.
+    ///
+    /// To avoid having a disconnected aggregator running forever, this
+    /// method should be called when the [`tonic::transport::Server`] started
+    /// with the [`InstrumentServer`] also returned from [`Server::into_parts`]
+    /// stops running.
+    pub fn abort(&mut self) {
+        self.join_handle.abort();
+    }
 }
 
 #[tonic::async_trait]