From e2ceb07b67014e6b191af4b581a128f32eeea98b Mon Sep 17 00:00:00 2001 From: Santtu Lakkala Date: Wed, 2 Oct 2024 16:48:26 +0300 Subject: [PATCH] Review changes Signed-off-by: Santtu Lakkala --- client/src/client.rs | 42 ++++++++++++++++++++++++++---------------- src/bin/givc-cli.rs | 8 ++++---- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index d096329..83725ef 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -1,12 +1,13 @@ use anyhow::bail; use async_channel::Receiver; -use givc_common::pb; -pub use givc_common::query::{Event, QueryResult}; +use tokio::sync::mpsc; use tokio_stream::StreamExt; use tonic::transport::Channel; use tracing::debug; use givc_common::address::EndpointAddress; +use givc_common::pb; +pub use givc_common::query::{Event, QueryResult}; use givc_common::types::*; use crate::endpoint::{EndpointConfig, TlsConfig}; @@ -18,6 +19,8 @@ pub struct WatchResult { // Design defence: we use `async-channel` here, as it could be used with both // tokio's and glib's eventloop, and recommended by gtk4-rs developers: pub channel: Receiver, + + _quit: mpsc::Sender<()>, } #[derive(Debug)] @@ -165,6 +168,7 @@ impl AdminClient { use pb::admin::watch_item::Status; use pb::admin::WatchItem; let (tx, rx) = async_channel::bounded::(10); + let (quittx, mut quitrx) = mpsc::channel(1); let mut watch = self .connect_to() @@ -181,29 +185,35 @@ impl AdminClient { }; tokio::spawn(async move { - loop { - if let Ok(Some(event)) = watch.try_next().await { - let event = match Event::try_from(event) { - Ok(event) => event, - Err(e) => { - debug!("Fail to decode: {e}"); + tokio::select! { + _ = async move { + loop { + if let Ok(Some(event)) = watch.try_next().await { + let event = match Event::try_from(event) { + Ok(event) => event, + Err(e) => { + debug!("Fail to decode: {e}"); + break; + } + }; + if let Err(e) = tx.send(event).await { + debug!("Fail to send event: {e}"); + break; + } + } else { + debug!("Stream closed by server"); break; } - }; - if let Err(e) = tx.send(event).await { - debug!("Fail to send event: {e}"); - break; } - } else { - debug!("Stream closed by server"); - break; - } + } => {} + _ = quitrx.recv() => {} } }); let result = WatchResult { initial: list, channel: rx, + _quit: quittx, }; Ok(result) } diff --git a/src/bin/givc-cli.rs b/src/bin/givc-cli.rs index 7e9d728..46483be 100644 --- a/src/bin/givc-cli.rs +++ b/src/bin/givc-cli.rs @@ -2,7 +2,7 @@ use clap::{Parser, Subcommand}; use givc::endpoint::TlsConfig; use givc::types::*; use givc::utils::vsock::parse_vsock_addr; -use givc_client::{client::WatchResult, AdminClient}; +use givc_client::client::AdminClient; use givc_common::address::EndpointAddress; use serde::ser::Serialize; use std::path::PathBuf; @@ -193,16 +193,16 @@ async fn main() -> std::result::Result<(), Box> { limit, initial: dump_initial, } => { - let WatchResult { initial, channel } = admin.watch().await?; + let watch = admin.watch().await?; let mut limit = limit.map(|l| 0..l); if dump_initial { - dump(initial, as_json)? + dump(watch.initial, as_json)? } // Change to Option::is_none_or() with rust >1.82 while !limit.as_mut().is_some_and(|l| l.next().is_none()) { - dump(channel.recv().await?, as_json)?; + dump(watch.channel.recv().await?, as_json)?; } } };