From dff938a68fbbb67a7722deb4ff8e1196e9029c4b Mon Sep 17 00:00:00 2001 From: Rustin170506 <29879298+Rustin170506@users.noreply.github.com> Date: Thu, 26 Sep 2024 23:28:41 +0800 Subject: [PATCH] feat: add next_message --- console-subscriber/src/aggregator/mod.rs | 2 +- console-subscriber/src/lib.rs | 11 ++-- tokio-console/src/conn.rs | 65 ++++++++++++++++++------ tokio-console/src/main.rs | 12 ++++- tokio-console/src/state/mod.rs | 24 +++------ 5 files changed, 75 insertions(+), 39 deletions(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 259a139ce..3aa1e5ada 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -89,7 +89,7 @@ pub struct Aggregator { poll_ops: Vec, /// The time "state" of the aggregator, such as paused or live. - temporality: proto::instrument::Temporality, + temporality: proto::instrument::Temporality, /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a /// timestamp that can be sent over the wire. diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index d2bb52e8e..9707a7632 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1252,10 +1252,13 @@ impl proto::instrument::instrument_server::Instrument for Server { &self, _req: tonic::Request, ) -> Result, tonic::Status> { - let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer); - self.subscribe.send(Command::WatchState(Watch(stream_sender))).await.map_err(|_| { - tonic::Status::internal("cannot get state, aggregation task is not running") - })?; + let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer); + self.subscribe + .send(Command::WatchState(Watch(stream_sender))) + .await + .map_err(|_| { + tonic::Status::internal("cannot get state, aggregation task is not running") + })?; let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv); Ok(tonic::Response::new(stream)) } diff --git a/tokio-console/src/conn.rs b/tokio-console/src/conn.rs index 03fb88e30..097994eae 100644 --- a/tokio-console/src/conn.rs +++ b/tokio-console/src/conn.rs @@ -1,3 +1,4 @@ +use console_api::instrument::StateRequest; use console_api::instrument::{ instrument_client::InstrumentClient, InstrumentRequest, PauseRequest, ResumeRequest, TaskDetailsRequest, Update, @@ -30,11 +31,17 @@ pub struct Connection { enum State { Connected { client: InstrumentClient, - stream: Box>, + update_stream: Box>, + state_stream: Box>, }, Disconnected(Duration), } +pub(crate) enum Message { + Update(Update), + State(console_api::instrument::State), +} + macro_rules! with_client { ($me:ident, $client:ident, $block:expr) => ({ loop { @@ -110,9 +117,16 @@ impl Connection { } }; let mut client = InstrumentClient::new(channel); - let request = tonic::Request::new(InstrumentRequest {}); - let stream = Box::new(client.watch_updates(request).await?.into_inner()); - Ok::>(State::Connected { client, stream }) + let update_request = tonic::Request::new(InstrumentRequest {}); + let update_stream = + Box::new(client.watch_updates(update_request).await?.into_inner()); + let state_request = tonic::Request::new(StateRequest {}); + let state_stream = Box::new(client.watch_state(state_request).await?.into_inner()); + Ok::>(State::Connected { + client, + update_stream, + state_stream, + }) }; self.state = match try_connect.await { Ok(connected) => { @@ -128,20 +142,39 @@ impl Connection { } } - pub async fn next_update(&mut self) -> Update { + pub async fn next_message(&mut self) -> Message { loop { - match self.state { - State::Connected { ref mut stream, .. } => match stream.next().await { - Some(Ok(update)) => return update, - Some(Err(status)) => { - tracing::warn!(%status, "error from stream"); - self.state = State::Disconnected(Self::BACKOFF); + match &mut self.state { + State::Connected { + update_stream, + state_stream, + .. + } => { + tokio::select! { + update = update_stream.next() => match update { + Some(Ok(update)) => return Message::Update(update), + Some(Err(status)) => { + tracing::warn!(%status, "error from update stream"); + self.state = State::Disconnected(Self::BACKOFF); + } + None => { + tracing::error!("update stream closed by server"); + self.state = State::Disconnected(Self::BACKOFF); + } + }, + state = state_stream.next() => match state { + Some(Ok(state)) => return Message::State(state), + Some(Err(status)) => { + tracing::warn!(%status, "error from state stream"); + self.state = State::Disconnected(Self::BACKOFF); + } + None => { + tracing::error!("state stream closed by server"); + self.state = State::Disconnected(Self::BACKOFF); + } + }, } - None => { - tracing::error!("stream closed by server"); - self.state = State::Disconnected(Self::BACKOFF); - } - }, + } State::Disconnected(_) => self.connect().await, } } diff --git a/tokio-console/src/main.rs b/tokio-console/src/main.rs index 6b8ed54cf..23d67c6d3 100644 --- a/tokio-console/src/main.rs +++ b/tokio-console/src/main.rs @@ -124,9 +124,17 @@ async fn main() -> color_eyre::Result<()> { _ => {} } }, - instrument_update = conn.next_update() => { - state.update(&view.styles, view.current_view(), instrument_update); + instrument_message = conn.next_message() => { + match instrument_message { + conn::Message::Update(update) => { + state.update(&view.styles, view.current_view(), update); + }, + conn::Message::State(state_update) => { + state.update_state(state_update); + } + } } + details_update = details_rx.recv() => { if let Some(details_update) = details_update { state.update_task_details(details_update); diff --git a/tokio-console/src/state/mod.rs b/tokio-console/src/state/mod.rs index 4b9928a8f..56ffd8589 100644 --- a/tokio-console/src/state/mod.rs +++ b/tokio-console/src/state/mod.rs @@ -34,7 +34,7 @@ pub(crate) type DetailsRef = Rc>>; pub(crate) struct State { metas: HashMap, last_updated_at: Option, - temporality: Temporality, + temporality: proto::instrument::Temporality, tasks_state: TasksState, resources_state: ResourcesState, async_ops_state: AsyncOpsState, @@ -71,12 +71,6 @@ pub(crate) enum FieldValue { Debug(String), } -#[derive(Debug)] -enum Temporality { - Live, - Paused, -} - #[derive(Debug, Eq, PartialEq)] pub(crate) struct Attribute { field: Field, @@ -169,6 +163,10 @@ impl State { } } + pub(crate) fn update_state(&mut self, state: proto::instrument::State) { + self.temporality = proto::instrument::Temporality::try_from(state.temporality).unwrap(); + } + pub(crate) fn retain_active(&mut self) { if self.is_paused() { return; @@ -238,21 +236,15 @@ impl State { // temporality methods pub(crate) fn pause(&mut self) { - self.temporality = Temporality::Paused; + self.temporality = proto::instrument::Temporality::Paused; } pub(crate) fn resume(&mut self) { - self.temporality = Temporality::Live; + self.temporality = proto::instrument::Temporality::Live; } pub(crate) fn is_paused(&self) -> bool { - matches!(self.temporality, Temporality::Paused) - } -} - -impl Default for Temporality { - fn default() -> Self { - Self::Live + matches!(self.temporality, proto::instrument::Temporality::Paused) } }