Skip to content

Commit

Permalink
feat: add next_message
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Sep 26, 2024
1 parent d0810c6 commit dff938a
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 39 deletions.
2 changes: 1 addition & 1 deletion console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct Aggregator {
poll_ops: Vec<proto::resources::PollOp>,

/// The time "state" of the aggregator, such as paused or live.
temporality: proto::instrument::Temporality,
temporality: proto::instrument::Temporality,

/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
/// timestamp that can be sent over the wire.
Expand Down
11 changes: 7 additions & 4 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,10 +1252,13 @@ impl proto::instrument::instrument_server::Instrument for Server {
&self,
_req: tonic::Request<proto::instrument::StateRequest>,
) -> Result<tonic::Response<Self::WatchStateStream>, tonic::Status> {
let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer);
self.subscribe.send(Command::WatchState(Watch(stream_sender))).await.map_err(|_| {
tonic::Status::internal("cannot get state, aggregation task is not running")
})?;
let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer);
self.subscribe
.send(Command::WatchState(Watch(stream_sender)))
.await
.map_err(|_| {
tonic::Status::internal("cannot get state, aggregation task is not running")
})?;
let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv);
Ok(tonic::Response::new(stream))
}
Expand Down
65 changes: 49 additions & 16 deletions tokio-console/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use console_api::instrument::StateRequest;
use console_api::instrument::{
instrument_client::InstrumentClient, InstrumentRequest, PauseRequest, ResumeRequest,
TaskDetailsRequest, Update,
Expand Down Expand Up @@ -30,11 +31,17 @@ pub struct Connection {
enum State {
Connected {
client: InstrumentClient<Channel>,
stream: Box<Streaming<Update>>,
update_stream: Box<Streaming<Update>>,
state_stream: Box<Streaming<console_api::instrument::State>>,
},
Disconnected(Duration),
}

pub(crate) enum Message {
Update(Update),
State(console_api::instrument::State),
}

macro_rules! with_client {
($me:ident, $client:ident, $block:expr) => ({
loop {
Expand Down Expand Up @@ -110,9 +117,16 @@ impl Connection {
}
};
let mut client = InstrumentClient::new(channel);
let request = tonic::Request::new(InstrumentRequest {});
let stream = Box::new(client.watch_updates(request).await?.into_inner());
Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected { client, stream })
let update_request = tonic::Request::new(InstrumentRequest {});
let update_stream =
Box::new(client.watch_updates(update_request).await?.into_inner());
let state_request = tonic::Request::new(StateRequest {});
let state_stream = Box::new(client.watch_state(state_request).await?.into_inner());
Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected {
client,
update_stream,
state_stream,
})
};
self.state = match try_connect.await {
Ok(connected) => {
Expand All @@ -128,20 +142,39 @@ impl Connection {
}
}

pub async fn next_update(&mut self) -> Update {
pub async fn next_message(&mut self) -> Message {
loop {
match self.state {
State::Connected { ref mut stream, .. } => match stream.next().await {
Some(Ok(update)) => return update,
Some(Err(status)) => {
tracing::warn!(%status, "error from stream");
self.state = State::Disconnected(Self::BACKOFF);
match &mut self.state {
State::Connected {
update_stream,
state_stream,
..
} => {
tokio::select! {
update = update_stream.next() => match update {
Some(Ok(update)) => return Message::Update(update),
Some(Err(status)) => {
tracing::warn!(%status, "error from update stream");
self.state = State::Disconnected(Self::BACKOFF);
}
None => {
tracing::error!("update stream closed by server");
self.state = State::Disconnected(Self::BACKOFF);
}
},
state = state_stream.next() => match state {
Some(Ok(state)) => return Message::State(state),
Some(Err(status)) => {
tracing::warn!(%status, "error from state stream");
self.state = State::Disconnected(Self::BACKOFF);
}
None => {
tracing::error!("state stream closed by server");
self.state = State::Disconnected(Self::BACKOFF);
}
},
}
None => {
tracing::error!("stream closed by server");
self.state = State::Disconnected(Self::BACKOFF);
}
},
}
State::Disconnected(_) => self.connect().await,
}
}
Expand Down
12 changes: 10 additions & 2 deletions tokio-console/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,17 @@ async fn main() -> color_eyre::Result<()> {
_ => {}
}
},
instrument_update = conn.next_update() => {
state.update(&view.styles, view.current_view(), instrument_update);
instrument_message = conn.next_message() => {
match instrument_message {
conn::Message::Update(update) => {
state.update(&view.styles, view.current_view(), update);
},
conn::Message::State(state_update) => {
state.update_state(state_update);
}
}
}

details_update = details_rx.recv() => {
if let Some(details_update) = details_update {
state.update_task_details(details_update);
Expand Down
24 changes: 8 additions & 16 deletions tokio-console/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) type DetailsRef = Rc<RefCell<Option<Details>>>;
pub(crate) struct State {
metas: HashMap<u64, Metadata>,
last_updated_at: Option<SystemTime>,
temporality: Temporality,
temporality: proto::instrument::Temporality,
tasks_state: TasksState,
resources_state: ResourcesState,
async_ops_state: AsyncOpsState,
Expand Down Expand Up @@ -71,12 +71,6 @@ pub(crate) enum FieldValue {
Debug(String),
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct Attribute {
field: Field,
Expand Down Expand Up @@ -169,6 +163,10 @@ impl State {
}
}

pub(crate) fn update_state(&mut self, state: proto::instrument::State) {
self.temporality = proto::instrument::Temporality::try_from(state.temporality).unwrap();
}

pub(crate) fn retain_active(&mut self) {
if self.is_paused() {
return;
Expand Down Expand Up @@ -238,21 +236,15 @@ impl State {
// temporality methods

pub(crate) fn pause(&mut self) {
self.temporality = Temporality::Paused;
self.temporality = proto::instrument::Temporality::Paused;
}

pub(crate) fn resume(&mut self) {
self.temporality = Temporality::Live;
self.temporality = proto::instrument::Temporality::Live;
}

pub(crate) fn is_paused(&self) -> bool {
matches!(self.temporality, Temporality::Paused)
}
}

impl Default for Temporality {
fn default() -> Self {
Self::Live
matches!(self.temporality, proto::instrument::Temporality::Paused)
}
}

Expand Down

0 comments on commit dff938a

Please sign in to comment.