Skip to content

Commit

Permalink
Merge branch 'main' into go-to-task-info-448
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Jul 28, 2023
2 parents b3c8465 + f4536af commit 56b6a86
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 14 deletions.
1 change: 1 addition & 0 deletions console-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tonic = { version = "0.9", default-features = false, features = [
prost = "0.11"
prost-types = "0.11"
tracing-core = "0.1.17"
futures-core = "0.3"

[dev-dependencies]
tonic-build = { version = "0.9", default-features = false, features = [
Expand Down
15 changes: 13 additions & 2 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

pub(crate) struct Aggregator {
/// Aggregates instrumentation traces and prepares state for the instrument
/// server.
///
/// The `Aggregator` is responsible for receiving and organizing the
/// instrumentated events and preparing the data to be served to a instrument
/// client.
pub struct Aggregator {
/// Channel of incoming events emitted by `TaskLayer`s.
events: mpsc::Receiver<Event>,

Expand Down Expand Up @@ -157,7 +163,12 @@ impl Aggregator {
}
}

pub(crate) async fn run(mut self) {
/// Runs the aggregator.
///
/// This method will start the aggregator loop and should run as long as
/// the instrument server is running. If the instrument server stops,
/// this future can be aborted.
pub async fn run(mut self) {
let mut publish = tokio::time::interval(self.publish_interval);
loop {
let should_send = tokio::select! {
Expand Down
154 changes: 142 additions & 12 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::resources::resource;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
use std::{
cell::RefCell,
Expand All @@ -15,7 +15,10 @@ use std::{
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
Expand All @@ -39,7 +42,7 @@ mod stats;
pub(crate) mod sync;
mod visitors;

use aggregator::Aggregator;
pub use aggregator::Aggregator;
pub use builder::{Builder, ServerAddr};
use callsites::Callsites;
use record::Recorder;
Expand Down Expand Up @@ -933,18 +936,16 @@ impl Server {
///
/// [`tonic`]: https://docs.rs/tonic/
pub async fn serve_with(
mut self,
self,
mut builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr.clone();
let router = builder.add_service(
proto::instrument::instrument_server::InstrumentServer::new(self),
);
let ServerParts {
instrument_server,
aggregator,
} = self.into_parts();
let aggregate = spawn_named(aggregator.run(), "console::aggregate");
let router = builder.add_service(instrument_server);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
Expand All @@ -960,6 +961,135 @@ impl Server {
aggregate.abort();
res?.map_err(Into::into)
}

/// Returns the parts needed to spawn a gRPC server and the aggregator that
/// supplies it.
///
/// Note that a server spawned in this way will disregard any value set by
/// [`Builder::server_addr`], as the user becomes responsible for defining
/// the address when calling [`Router::serve`].
///
/// Additionally, the user of this API must ensure that the [`Aggregator`]
/// is running for as long as the gRPC server is. If the server stops
/// running, the aggregator task can be aborted.
///
/// # Examples
///
/// The parts can be used to serve the instrument server together with
/// other endpoints from the same gRPC server.
///
/// ```
/// use console_subscriber::{ConsoleLayer, ServerParts};
///
/// # let runtime = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .unwrap();
/// # runtime.block_on(async {
/// let (console_layer, server) = ConsoleLayer::builder().build();
/// let ServerParts {
/// instrument_server,
/// aggregator,
/// ..
/// } = server.into_parts();
///
/// let aggregator_handle = tokio::spawn(aggregator.run());
/// let router = tonic::transport::Server::builder()
/// //.add_service(some_other_service)
/// .add_service(instrument_server);
/// let serve = router.serve(std::net::SocketAddr::new(
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
/// 6669,
/// ));
///
/// // Finally, spawn the server.
/// tokio::spawn(serve);
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
/// # drop(console_layer);
/// # let mut aggregator_handle = aggregator_handle;
/// # aggregator_handle.abort();
/// # });
/// ```
///
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
pub fn into_parts(mut self) -> ServerParts {
let aggregator = self
.aggregator
.take()
.expect("cannot start server multiple times");

let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);

ServerParts {
instrument_server,
aggregator,
}
}
}

/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
/// further parts in the future, an as such is marked as `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The `aggregator` is a future which should be running as long as the server is.
/// Generally, this future should be spawned onto an appropriate runtime and then
/// aborted if the server gets shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
/// The instrument server.
///
/// See the documentation for [`InstrumentServer`] for details.
pub instrument_server: InstrumentServer<Server>,

/// The aggregator.
///
/// Responsible for collecting and preparing traces for the instrument server
/// to send its clients.
///
/// The aggregator should be [`run`] when the instrument server is started.
/// If the server stops running for any reason, the aggregator task can be
/// aborted.
///
/// [`run`]: fn@crate::Aggregator::run
pub aggregator: Aggregator,
}

/// Aggregator handle.
///
/// This object is returned from [`Server::into_parts`]. It can be
/// used to abort the aggregator task.
///
/// The aggregator collects the traces that implement the async runtime
/// being observed and prepares them to be served by the gRPC server.
///
/// Normally, if the server, started with [`Server::serve`] or
/// [`Server::serve_with`] stops for any reason, the aggregator is aborted,
/// hoewver, if the server was started with the [`InstrumentServer`] returned
/// from [`Server::into_parts`], then it is the responsibility of the user
/// of the API to stop the aggregator task by calling [`abort`] on this
/// object.
///
/// [`abort`]: fn@crate::AggregatorHandle::abort
pub struct AggregatorHandle {
join_handle: JoinHandle<()>,
}

impl AggregatorHandle {
/// Aborts the task running this aggregator.
///
/// To avoid having a disconnected aggregator running forever, this
/// method should be called when the [`tonic::transport::Server`] started
/// with the [`InstrumentServer`] also returned from [`Server::into_parts`]
/// stops running.
pub fn abort(&mut self) {
self.join_handle.abort();
}
}

#[tonic::async_trait]
Expand Down

0 comments on commit 56b6a86

Please sign in to comment.