Skip to content

Commit

Permalink
Review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Santtu Lakkala <santtu.lakkala@unikie.com>
  • Loading branch information
slakkala committed Oct 2, 2024
1 parent 719cc37 commit e2ceb07
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
42 changes: 26 additions & 16 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use anyhow::bail;
use async_channel::Receiver;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tracing::debug;

use givc_common::address::EndpointAddress;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;

use crate::endpoint::{EndpointConfig, TlsConfig};
Expand All @@ -18,6 +19,8 @@ pub struct WatchResult {
// Design defence: we use `async-channel` here, as it could be used with both
// tokio's and glib's eventloop, and recommended by gtk4-rs developers:
pub channel: Receiver<Event>,

_quit: mpsc::Sender<()>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -165,6 +168,7 @@ impl AdminClient {
use pb::admin::watch_item::Status;
use pb::admin::WatchItem;
let (tx, rx) = async_channel::bounded::<Event>(10);
let (quittx, mut quitrx) = mpsc::channel(1);

let mut watch = self
.connect_to()
Expand All @@ -181,29 +185,35 @@ impl AdminClient {
};

tokio::spawn(async move {
loop {
if let Ok(Some(event)) = watch.try_next().await {
let event = match Event::try_from(event) {
Ok(event) => event,
Err(e) => {
debug!("Fail to decode: {e}");
tokio::select! {
_ = async move {
loop {
if let Ok(Some(event)) = watch.try_next().await {
let event = match Event::try_from(event) {
Ok(event) => event,
Err(e) => {
debug!("Fail to decode: {e}");
break;
}
};
if let Err(e) = tx.send(event).await {
debug!("Fail to send event: {e}");
break;
}
} else {
debug!("Stream closed by server");
break;
}
};
if let Err(e) = tx.send(event).await {
debug!("Fail to send event: {e}");
break;
}
} else {
debug!("Stream closed by server");
break;
}
} => {}
_ = quitrx.recv() => {}
}
});

let result = WatchResult {
initial: list,
channel: rx,
_quit: quittx,
};
Ok(result)
}
Expand Down
8 changes: 4 additions & 4 deletions src/bin/givc-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::{Parser, Subcommand};
use givc::endpoint::TlsConfig;
use givc::types::*;
use givc::utils::vsock::parse_vsock_addr;
use givc_client::{client::WatchResult, AdminClient};
use givc_client::client::AdminClient;
use givc_common::address::EndpointAddress;
use serde::ser::Serialize;
use std::path::PathBuf;
Expand Down Expand Up @@ -193,16 +193,16 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
limit,
initial: dump_initial,
} => {
let WatchResult { initial, channel } = admin.watch().await?;
let watch = admin.watch().await?;
let mut limit = limit.map(|l| 0..l);

if dump_initial {
dump(initial, as_json)?
dump(watch.initial, as_json)?
}

// Change to Option::is_none_or() with rust >1.82
while !limit.as_mut().is_some_and(|l| l.next().is_none()) {
dump(channel.recv().await?, as_json)?;
dump(watch.channel.recv().await?, as_json)?;
}
}
};
Expand Down

0 comments on commit e2ceb07

Please sign in to comment.