Skip to content

Commit

Permalink
Merge pull request #65 from Burning1020/clh-vcpu
Browse files Browse the repository at this point in the history
vmm: add vcpus of clh to sandbox cgroup
  • Loading branch information
flyflypeng authored Aug 23, 2023
2 parents f1b950a + 5ce600e commit 4860bba
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 46 deletions.
57 changes: 40 additions & 17 deletions vmm/sandbox/src/cloud_hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::{collections::HashMap, os::unix::io::RawFd, process::Stdio};
use std::{os::unix::io::RawFd, process::Stdio};

use anyhow::anyhow;
use async_trait::async_trait;
Expand All @@ -39,7 +39,7 @@ use crate::{
device::{BusType, DeviceInfo},
impl_recoverable,
param::ToCmdLineParams,
utils::{read_file, read_std, set_cmd_fd, set_cmd_netns, wait_pid, write_file_atomic},
utils::{read_std, set_cmd_fd, set_cmd_netns, wait_pid, write_file_atomic},
vm::{Pids, VcpuThreads, VM},
};

Expand All @@ -49,6 +49,8 @@ pub mod devices;
pub mod factory;
pub mod hooks;

const VCPU_PREFIX: &str = "vcpu";

#[derive(Default, Serialize, Deserialize)]
pub struct CloudHypervisorVM {
id: String,
Expand All @@ -64,6 +66,7 @@ pub struct CloudHypervisorVM {
#[serde(skip)]
client: Option<ChClient>,
fds: Vec<RawFd>,
pids: Pids,
}

impl CloudHypervisorVM {
Expand All @@ -88,20 +91,19 @@ impl CloudHypervisorVM {
wait_chan: None,
client: None,
fds: vec![],
pids: Pids::default(),
}
}

pub fn add_device(&mut self, device: impl CloudHypervisorDevice + Sync + Send + 'static) {
self.devices.push(Box::new(device));
}

async fn pid(&self) -> Result<u32> {
let pid_file = format!("{}/pid", self.base_dir);
let pid = read_file(&*pid_file).await.and_then(|x| {
x.parse::<u32>()
.map_err(|e| anyhow!("failed to parse pid file {}, {}", x, e).into())
})?;
Ok(pid)
fn pid(&self) -> Result<u32> {
match self.pids.vmm_pid {
None => Err(anyhow!("empty pid from vmm_pid").into()),
Some(pid) => Ok(pid),
}
}

async fn create_client(&self) -> Result<ChClient> {
Expand All @@ -114,7 +116,7 @@ impl CloudHypervisorVM {
))
}

async fn start_virtiofsd(&self) -> Result<()> {
async fn start_virtiofsd(&self) -> Result<u32> {
create_dir_all(&self.virtiofsd_config.shared_dir).await?;
let params = self.virtiofsd_config.to_cmdline_params("--");
let mut cmd = tokio::process::Command::new(&self.virtiofsd_config.path);
Expand All @@ -126,8 +128,11 @@ impl CloudHypervisorVM {
let child = cmd
.spawn()
.map_err(|e| anyhow!("failed to spawn virtiofsd command: {}", e))?;
let pid = child
.id()
.ok_or(anyhow!("the virtiofsd has been polled to completion"))?;
spawn_wait(child, "virtiofsd".to_string(), None, None);
Ok(())
Ok(pid)
}

fn append_fd(&mut self, fd: RawFd) -> usize {
Expand All @@ -141,7 +146,7 @@ impl VM for CloudHypervisorVM {
async fn start(&mut self) -> Result<u32> {
debug!("start vm {}", self.id);
create_dir_all(&self.base_dir).await?;
self.start_virtiofsd().await?;
let virtiofsd_pid = self.start_virtiofsd().await?;
let mut params = self.config.to_cmdline_params("--");
for d in self.devices.iter() {
params.extend(d.to_cmdline_params("--"));
Expand Down Expand Up @@ -174,11 +179,16 @@ impl VM for CloudHypervisorVM {
);
self.client = Some(self.create_client().await?);
self.wait_chan = Some(rx);

// update vmm related pids
self.pids.vmm_pid = pid;
self.pids.affilicated_pids.push(virtiofsd_pid);
// TODO: add child virtiofsd process
Ok(pid.unwrap_or_default())
}

async fn stop(&mut self, force: bool) -> Result<()> {
let pid = self.pid().await?;
let pid = self.pid()?;
if pid == 0 {
return Ok(());
}
Expand Down Expand Up @@ -247,15 +257,28 @@ impl VM for CloudHypervisorVM {
}

async fn vcpus(&self) -> Result<VcpuThreads> {
// TODO: support get vcpu threads id
// Refer to https://github.com/firecracker-microvm/firecracker/issues/718
Ok(VcpuThreads {
vcpus: HashMap::new(),
vcpus: procfs::process::Process::new(self.pid()? as i32)
.map_err(|e| anyhow!("failed to get process {}", e))?
.tasks()
.map_err(|e| anyhow!("failed to get tasks {}", e))?
.flatten()
.filter_map(|t| {
t.stat()
.map_err(|e| anyhow!("failed to get stat {}", e))
.ok()?
.comm
.strip_prefix(VCPU_PREFIX)
.and_then(|comm| comm.parse().ok())
.map(|index| (index, t.tid as i64))
})
.collect(),
})
}

fn pids(&self) -> Pids {
// TODO: support get all vmm related pids
Pids::default()
self.pids.clone()
}
}

Expand Down
23 changes: 11 additions & 12 deletions vmm/sandbox/src/qemu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::{
qmp_client::QmpClient,
utils::detect_pid,
},
utils::{read_file, read_std, wait_channel, wait_pid},
utils::{read_std, wait_channel, wait_pid},
vm::{BlockDriver, Pids, VcpuThreads, VM},
};

Expand Down Expand Up @@ -82,6 +82,7 @@ pub struct QemuVM {
console_socket: String,
agent_socket: String,
netns: String,
pids: Pids,
#[serde(skip)]
block_driver: BlockDriver,
#[serde(skip)]
Expand Down Expand Up @@ -126,7 +127,9 @@ impl VM for QemuVM {
error!("failed to read console log, {}", e);
});
});
// TODO return qemu pid
// update vmm related pids
let vmm_pid = detect_pid(self.config.pid_file.as_str(), self.config.path.as_str()).await?;
self.pids.vmm_pid = Some(vmm_pid);
Ok(0)
}

Expand All @@ -143,7 +146,7 @@ impl VM for QemuVM {

if let Err(e) = self.wait_stop(Duration::from_secs(10)).await {
if force {
if let Ok(pid) = self.pid().await {
if let Ok(pid) = self.pid() {
unsafe { kill(pid as i32, 9) };
}
} else {
Expand Down Expand Up @@ -324,6 +327,7 @@ impl QemuVM {
console_socket: format!("{}/console.sock", base_dir),
agent_socket: "".to_string(),
netns: netns.to_string(),
pids: Pids::default(),
block_driver: Default::default(),
wait_chan: None,
client: None,
Expand Down Expand Up @@ -442,16 +446,11 @@ impl QemuVM {
Ok(())
}

async fn pid(&self) -> Result<u32> {
if self.config.pid_file.is_empty() {
return Err(anyhow!("failed to get pid file of sandbox").into());
fn pid(&self) -> Result<u32> {
match self.pids.vmm_pid {
None => Err(anyhow!("empty pid from vmm_pid").into()),
Some(pid) => Ok(pid),
}
let pid = read_file(&*self.config.pid_file).await.and_then(|x| {
x.trim()
.parse::<u32>()
.map_err(|e| anyhow!("failed to parse qemu.pid, {}", e).into())
})?;
Ok(pid)
}

async fn hot_attach_device<T: QemuHotAttachable + Sync + Send + 'static>(
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@ where
sandbox
.sandbox_cgroups
.add_process_into_sandbox_cgroups(vmm_pid, Some(vcpu_threads))?;
// move the virtiofsd process into sandbox cgroup
if let Some(virtiofsd_pid) = sandbox.vm.pids().virtiofsd_pid {
// move all vmm-related process into sandbox cgroup
for pid in sandbox.vm.pids().affilicated_pids {
sandbox
.sandbox_cgroups
.add_process_into_sandbox_cgroups(virtiofsd_pid, None)?;
.add_process_into_sandbox_cgroups(pid, None)?;
}
} else {
return Err(Error::Other(anyhow!(
Expand Down
24 changes: 12 additions & 12 deletions vmm/sandbox/src/stratovirt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::{
utils::detect_pid,
virtiofs::VirtiofsDaemon,
},
utils::{read_file, read_std, wait_channel, wait_pid},
utils::{read_std, wait_channel, wait_pid},
vm::{BlockDriver, Pids, VcpuThreads, VM},
};

Expand Down Expand Up @@ -140,7 +140,9 @@ impl VM for StratoVirtVM {
let vmm_pid = detect_pid(self.config.pid_file.as_str(), self.config.path.as_str()).await?;
self.pids.vmm_pid = Some(vmm_pid);
if let Some(virtiofsd) = &self.virtiofs_daemon {
self.pids.virtiofsd_pid = virtiofsd.pid;
if let Some(pid) = virtiofsd.pid {
self.pids.affilicated_pids.push(pid);
}
}

Ok(vmm_pid)
Expand All @@ -164,7 +166,10 @@ impl VM for StratoVirtVM {

if let Err(e) = self.wait_stop(Duration::from_secs(10)).await {
if force {
if let Ok(pid) = self.pid().await {
if let Ok(pid) = self.pid() {
if pid == 0 {
return Ok(());
}
unsafe { kill(pid as i32, 9) };
}
} else {
Expand Down Expand Up @@ -408,16 +413,11 @@ impl StratoVirtVM {
Ok(())
}

async fn pid(&self) -> Result<u32> {
if self.config.pid_file.is_empty() {
return Err(anyhow!("failed to get pid file of sandbox").into());
fn pid(&self) -> Result<u32> {
match self.pids.vmm_pid {
None => Err(anyhow!("empty pid from vmm_pid").into()),
Some(pid) => Ok(pid),
}
let pid = read_file(&*self.config.pid_file).await.and_then(|x| {
x.trim()
.parse::<u32>()
.map_err(|e| anyhow!("failed to parse stratovirt.pid, {}", e).into())
})?;
Ok(pid)
}

async fn hot_attach_device<T: StratoVirtHotAttachable + Sync + Send + 'static>(
Expand Down
4 changes: 2 additions & 2 deletions vmm/sandbox/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ macro_rules! impl_recoverable {
impl $crate::vm::Recoverable for $ty {
async fn recover(&mut self) -> Result<()> {
self.client = Some(self.create_client().await?);
let pid = self.pid().await?;
let pid = self.pid()?;
let (tx, rx) = channel((0u32, 0i128));
tokio::spawn(async move {
let wait_result = wait_pid(pid as i32).await;
Expand Down Expand Up @@ -210,5 +210,5 @@ pub struct VcpuThreads {
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Pids {
pub vmm_pid: Option<u32>,
pub virtiofsd_pid: Option<u32>,
pub affilicated_pids: Vec<u32>,
}

0 comments on commit 4860bba

Please sign in to comment.