Skip to content

Commit

Permalink
Feature: Add oom monitor for cgroupv2.
Browse files Browse the repository at this point in the history
  • Loading branch information
aa624545345 committed Apr 7, 2024
1 parent c696ce4 commit e83b1d0
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 44 deletions.
2 changes: 1 addition & 1 deletion crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ tokio = { workspace = true, features = ["full"] }

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"
nix = { workspace = true, features = ["event"] }
nix = { workspace = true, features = ["event", "inotify"] }
207 changes: 166 additions & 41 deletions crates/runc-shim/src/cgroup_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@
#![cfg(target_os = "linux")]

use std::{
os::unix::io::{AsRawFd, FromRawFd},
collections::HashMap,
mem::size_of,
os::{
fd::AsFd,
unix::io::{AsRawFd, FromRawFd},
},
path::Path,
sync::Arc,
};

use containerd_shim::{
error::{Error, Result},
io_error, other_error,
};
use nix::sys::eventfd::{EfdFlags, EventFd};
use nix::sys::{
eventfd::{EfdFlags, EventFd},
inotify,
};
use tokio::{
fs::{self, read_to_string, File},
io::AsyncReadExt,
sync::mpsc::{self, Receiver},
};

pub const DEFAULT_CGROUPV2_PATH: &str = "/sys/fs/cgroup";

pub async fn get_path_from_cgorup(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(&proc_path)
Expand Down Expand Up @@ -126,6 +137,111 @@ pub async fn register_memory_event(
Ok(receiver)
}

/// Creates an inotify instance that watches a cgroup directory's event files.
///
/// `IN_MODIFY` watches are added for `memory.events` and `cgroup.events`
/// inside `path`. The instance is initialised with empty flags.
///
/// # Errors
///
/// Returns an error if the inotify instance cannot be created or if either
/// watch cannot be added.
fn memory_event_fd(path: &Path) -> Result<inotify::Inotify> {
    let watcher = inotify::Inotify::init(inotify::InitFlags::empty())?;

    // Same order as before: memory.events first, then cgroup.events.
    for name in ["memory.events", "cgroup.events"] {
        watcher.add_watch(&path.join(name), inotify::AddWatchFlags::IN_MODIFY)?;
    }

    Ok(watcher)
}

/// Reads `file` inside `cg_dir` and parses it as `key value` lines.
///
/// Each line is split at its first space; the left part becomes the key and
/// the right part is parsed as a `u32`. Lines that contain no space are
/// skipped. If a key appears more than once, the last value wins.
///
/// # Errors
///
/// Returns an error if the file cannot be read, or if any value fails to
/// parse as a `u32`.
async fn parse_kv_file(cg_dir: &Path, file: &str) -> Result<HashMap<String, u32>> {
    let path = cg_dir.join(file);

    let contents = read_to_string(&path).await.map_err(io_error!(
        e,
        "open {}.",
        path.to_string_lossy()
    ))?;

    // Collecting into a `Result` stops at the first value that fails to parse.
    let entries = contents
        .lines()
        .filter_map(|line| line.split_once(" "))
        .map(|(name, raw)| raw.parse::<u32>().map(|count| (name.to_string(), count)))
        .collect::<std::result::Result<HashMap<String, u32>, _>>()?;

    Ok(entries)
}

pub async fn register_memory_event_v2(key: &str, cg_dir: &Path) -> Result<Receiver<String>> {
let (sender, receiver) = mpsc::channel(128);
let cg_dir = Arc::new(Box::from(cg_dir));
let key = key.to_string();

tokio::spawn(async move {
let inotify = memory_event_fd(&cg_dir).unwrap();
let mut eventfd_file = unsafe { File::from_raw_fd(inotify.as_fd().as_raw_fd()) };
let mut buffer: [u8; 4096] = [0u8; 4096];

let mut lastoom_map: HashMap<String, u32> = HashMap::new();
loop {
let nread = match eventfd_file.read(&mut buffer).await {
Ok(nread) => nread,
Err(_) => return,
};
if nread >= size_of::<libc::inotify_event>() {
match parse_kv_file(&cg_dir, "memory.events").await {
Ok(mem_map) => {
let last_oom_kill = match lastoom_map.get(&key) {
Some(v) => v,
None => &0,
};

let oom_kill = match mem_map.get("oom_kill") {
Some(v) => v,
None => return,
};

if *oom_kill > *last_oom_kill {
sender.send(key.clone()).await.unwrap();
}
if *oom_kill > 0 {
lastoom_map.insert(key.to_string(), *oom_kill);
}
}
Err(_) => return,
};

let cg_map = match parse_kv_file(&cg_dir, "cgroup.events").await {
Ok(cg_map) => cg_map,
Err(_) => return,
};
match cg_map.get("populated") {
Some(v) if *v == 0 => return,
Some(_) => (),
None => return,
};
}
}
});

Ok(receiver)
}

pub async fn get_path_from_cgorup_v2(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(&proc_path)
.await
.map_err(io_error!(e, "open {}.", &proc_path))?;

let (_, path) = path_string
.lines()
.nth(0)
.ok_or(Error::Other(
"Error happened while geting the path from cgroup of container process pid.".into(),
))?
.split_once("::")
.ok_or(Error::Other("Failed to parse memory line".into()))?;

Ok(path.to_string())
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand All @@ -140,63 +256,72 @@ mod tests {
use crate::cgroup_memory;

#[tokio::test]
async fn test_cgroupv1_oom_monitor() {
if !is_cgroup2_unified_mode() {
// Create a memory cgroup with limits on both memory and swap.
let path = "cgroupv1_oom_monitor";
let cg = Cgroup::new(hierarchies::auto(), path).unwrap();

let mem_controller: &MemController = cg.controller_of().unwrap();
mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M
mem_controller.set_swappiness(0).unwrap();

// Create a sh sub process, and let it wait for the stdinput.
let mut child_process = Command::new("sh")
.stdin(std::process::Stdio::piped())
.spawn()
.unwrap();
async fn test_cgroup_oom_monitor() {
// Create a memory cgroup with limits on both memory and swap.
let path = "cgroupv1_oom_monitor";
let cg = Cgroup::new(hierarchies::auto(), path).unwrap();

let mem_controller: &MemController = cg.controller_of().unwrap();
mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M
mem_controller.set_swappiness(0).unwrap();

// Create a sh sub process, and let it wait for the stdinput.
let mut child_process = Command::new("sh")
.stdin(std::process::Stdio::piped())
.spawn()
.unwrap();

let pid = child_process.id().unwrap();
let pid = child_process.id().unwrap();

// Add the sh subprocess to the cgroup.
cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap();
// Add the sh subprocess to the cgroup.
cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap();
let mut rx: tokio::sync::mpsc::Receiver<String>;

// Set oom monitor
if !is_cgroup2_unified_mode() {
// Set cgroupv1 oom monitor
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await.unwrap();
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup)
.await
.unwrap();

let mem_cgroup_path = mount_point + &mount_root;
let mut rx = cgroup_memory::register_memory_event(
rx = cgroup_memory::register_memory_event(
pid.to_string().as_str(),
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.unwrap();
} else {
// Set cgroupv2 oom monitor
let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await.unwrap();
let mem_cgroup_path =
cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup;

// Exec the sh subprocess to a dd command that consumes more than 10M of memory.
if let Some(mut stdin) = child_process.stdin.take() {
stdin
.write_all(
b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n",
)
.await
.unwrap();
stdin.flush().await.unwrap();
}

// Wait for the oom message.
if let Some(item) = rx.recv().await {
assert_eq!(pid.to_string(), item, "Receive error oom message");
}
rx = cgroup_memory::register_memory_event_v2(
pid.to_string().as_str(),
Path::new(&mem_cgroup_path),
)
.await
.unwrap();
}
// Exec the sh subprocess to a dd command that consumes more than 10M of memory.
if let Some(mut stdin) = child_process.stdin.take() {
stdin
.write_all(b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n")
.await
.unwrap();
stdin.flush().await.unwrap();
}

// Clean.
child_process.wait().await.unwrap();
cg.delete().unwrap();
remove_file("/tmp/test_oom_monitor_file").await.unwrap();
// Wait for the oom message.
if let Some(item) = rx.recv().await {
assert_eq!(pid.to_string(), item, "Receive error oom message");
}
// Clean.
child_process.wait().await.unwrap();
cg.delete().unwrap();
remove_file("/tmp/test_oom_monitor_file").await.unwrap();
}
}
13 changes: 11 additions & 2 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,31 @@ fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {

#[cfg(target_os = "linux")]
async fn monitor_oom(id: &String, pid: u32, tx: EventSender) -> Result<()> {
// std::thread::sleep(std::time::Duration::from_secs(20));
let rx: Receiver<String>;
if !is_cgroup2_unified_mode() {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await?;
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;

let mem_cgroup_path = mount_point + &mount_root;
let rx = cgroup_memory::register_memory_event(
rx = cgroup_memory::register_memory_event(
id,
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;
} else {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await?;
let mem_cgroup_path = cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup;

run_oom_monitor(rx, id.to_string(), tx);
rx = cgroup_memory::register_memory_event_v2(id, Path::new(&mem_cgroup_path))
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;
}

run_oom_monitor(rx, id.to_string(), tx);
Ok(())
}

Expand Down

0 comments on commit e83b1d0

Please sign in to comment.