From c5ba76c05a47c1301d6d40fdec4d47ce19a2deaf Mon Sep 17 00:00:00 2001 From: ningmingxiao Date: Fri, 27 Sep 2024 18:02:49 +0800 Subject: [PATCH] fix exec hang when tx channel is full --- crates/runc-shim/src/service.rs | 81 ++++++++++++------------- crates/shim/src/asynchronous/monitor.rs | 9 +-- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/crates/runc-shim/src/service.rs b/crates/runc-shim/src/service.rs index 2decc527..5b0b26e1 100644 --- a/crates/runc-shim/src/service.rs +++ b/crates/runc-shim/src/service.rs @@ -159,52 +159,51 @@ async fn process_exits( if let Subject::Pid(pid) = e.subject { debug!("receive exit event: {}", &e); let exit_code = e.exit_code; - for (_k, cont) in containers.lock().await.iter_mut() { - let bundle = cont.bundle.to_string(); - // pid belongs to container init process - if cont.init.pid == pid { - // kill all children process if the container has a private PID namespace - if should_kill_all_on_exit(&bundle).await { - cont.kill(None, 9, true).await.unwrap_or_else(|e| { - error!("failed to kill init's children: {}", e) - }); - } - // set exit for init process - cont.init.set_exited(exit_code).await; - - // publish event - let (_, code, exited_at) = match cont.get_exit_info(None).await { - Ok(info) => info, - Err(_) => break, - }; - - let ts = convert_to_timestamp(exited_at); - let event = TaskExit { - container_id: cont.id.to_string(), - id: cont.id.to_string(), - pid: cont.pid().await as u32, - exit_status: code as u32, - exited_at: Some(ts).into(), - ..Default::default() - }; - let topic = event.topic(); - tx.send((topic.to_string(), Box::new(event))) - .await - .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + let containers = containers.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + for (_k, cont) in containers.lock().await.iter_mut() { + let bundle = cont.bundle.to_string(); + // pid belongs to container init process + if cont.init.pid == pid { + // kill all children process if the container has a private PID namespace + if should_kill_all_on_exit(&bundle).await { + cont.kill(None, 9, true).await.unwrap_or_else(|e| { + error!("failed to kill init's children: {}", e) + }); + } + // set exit for init process + cont.init.set_exited(exit_code).await; + + // publish event + let (_, code, exited_at) = match cont.get_exit_info(None).await { + Ok(info) => info, + Err(_) => break, + }; + + let ts = convert_to_timestamp(exited_at); + let event = TaskExit { + container_id: cont.id.to_string(), + id: cont.id.to_string(), + pid: cont.pid().await as u32, + exit_status: code as u32, + exited_at: Some(ts).into(), + ..Default::default() + }; + let topic = event.topic(); + tx.send((topic.to_string(), Box::new(event))) + .await + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); - break; - } + break; + } - // pid belongs to container common process - for (_exec_id, p) in cont.processes.iter_mut() { - // set exit for exec process - if p.pid == pid { + // pid belongs to container common process + if let Some(p) = cont.processes.values_mut().find(|p| p.pid == pid) { p.set_exited(exit_code).await; - // TODO: publish event - break; } } - } + }); } } monitor_unsubscribe(s.id).await.unwrap_or_default(); diff --git a/crates/shim/src/asynchronous/monitor.rs b/crates/shim/src/asynchronous/monitor.rs index 7ed8c0a2..e8c1de9f 100644 --- a/crates/shim/src/asynchronous/monitor.rs +++ b/crates/shim/src/asynchronous/monitor.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use lazy_static::lazy_static; use log::error; use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + mpsc::{channel, Receiver, Sender}, Mutex, }; @@ -68,17 +68,17 @@ pub struct Monitor { pub(crate) struct Subscriber { pub(crate) topic: Topic, - pub(crate) tx: UnboundedSender, + pub(crate) tx: Sender, } pub struct Subscription { pub id: i64, - pub rx: UnboundedReceiver, + pub rx: Receiver, } impl Monitor { pub fn subscribe(&mut self, topic: Topic) -> Result { - let (tx, rx) = unbounded_channel::(); + let (tx, rx) = channel::(128); let id = self.seq_id; self.seq_id += 1; let subscriber = Subscriber { @@ -117,6 +117,7 @@ impl Monitor { subject: subject.clone(), exit_code, }) + .await .map_err(other_error!(e, "failed to send exit code")); results.push(res); }