Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriber) expose server parts #451

Merged
merged 6 commits into from
Jul 28, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 113 additions & 12 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::resources::resource;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
use std::{
cell::RefCell,
Expand All @@ -15,7 +15,10 @@ use std::{
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
Expand Down Expand Up @@ -933,18 +936,15 @@ impl Server {
///
/// [`tonic`]: https://docs.rs/tonic/
pub async fn serve_with(
mut self,
self,
mut builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr.clone();
let router = builder.add_service(
proto::instrument::instrument_server::InstrumentServer::new(self),
);
let ServerParts {
instrument_server: service,
aggregator_handle: aggregate,
} = self.into_parts();
let router = builder.add_service(service);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
Expand All @@ -957,9 +957,110 @@ impl Server {
spawn_named(serve, "console::serve").await
}
};
aggregate.abort();
drop(aggregate);
res?.map_err(Into::into)
}

/// Returns the parts needed to spawn a gRPC server and keep the aggregation
/// worker running.
///
/// Note that a server spawned in this way will overwrite any value set by
hds marked this conversation as resolved.
Show resolved Hide resolved
/// [`Builder::server_addr`] as the user becomes responsible for defining
hds marked this conversation as resolved.
Show resolved Hide resolved
/// the address when calling [`Router::serve`].
///
/// # Examples
///
/// The parts can be used to serve the instrument server together with
/// other endpoints from the same gRPC server.
///
/// ```
/// use console_subscriber::{ConsoleLayer, ServerParts};
///
/// # let runtime = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .unwrap();
/// # runtime.block_on(async {
/// let (console_layer, server) = ConsoleLayer::builder().build();
/// let ServerParts {
/// instrument_server,
/// aggregator_handle,
/// ..
/// } = server.into_parts();
///
/// let router = tonic::transport::Server::builder()
/// //.add_service(some_other_service)
/// .add_service(instrument_server);
/// let serve = router.serve(std::net::SocketAddr::new(
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
/// 6669,
/// ));
///
/// // Finally, spawn the server.
/// tokio::spawn(serve);
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
/// # drop(console_layer);
/// # drop(aggregator_handle);
/// # });
/// ```
///
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
pub fn into_parts(mut self) -> ServerParts {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");

let service = proto::instrument::instrument_server::InstrumentServer::new(self);

ServerParts {
instrument_server: service,
aggregator_handle: AggregatorHandle {
join_handle: aggregate,
},
}
}
}

/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
/// further parts in the future, an as such is marked as `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The [`AggregatorHandle`] must be kept until after the server has been
/// shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
/// The instrument server.
///
/// See the documentation for [`InstrumentServer`] for details.
pub instrument_server: InstrumentServer<Server>,

/// The aggregate handle.
hds marked this conversation as resolved.
Show resolved Hide resolved
///
/// See the documentation for [`AggregatorHandle`] for details.
hds marked this conversation as resolved.
Show resolved Hide resolved
pub aggregator_handle: AggregatorHandle,
}

/// Aggregator handle.
hds marked this conversation as resolved.
Show resolved Hide resolved
///
/// This object is returned from [`Server::into_parts`] and must be
/// kept as long as the `InstrumentServer<Server>` - which is also
/// returned - is in use.
pub struct AggregatorHandle {
join_handle: JoinHandle<()>,
}

impl Drop for AggregatorHandle {
fn drop(&mut self) {
self.join_handle.abort();
}
hds marked this conversation as resolved.
Show resolved Hide resolved
}

#[tonic::async_trait]
Expand Down