From bc81b8aaeacd6d2b1dc803b4759523690920864c Mon Sep 17 00:00:00 2001 From: jefftt Date: Thu, 14 Sep 2023 10:08:21 -0400 Subject: [PATCH] feat(console): add warning for tasks that never yield (#439) feat(console): add warning for tasks that never yield (#439) Adds a new warning for tasks that have never yielded. This is more complicated than it sounds, because under the previous behavior, tasks are only linted when they are created or updated. A task that never yields doesn't get updated, so if it gets linted before the "never yielded" threshold is reached (defaults to 1 second), the warning returns false and the task won't get linted again. To solve this problem, warnings can now return a response that requests to be rechecked later. In practice, this later is on the next update cycle, at which point the task may again request to be rechecked later. For this warning, the overhead will likely be only a single recheck after task creation (and the fact that we need to store those task IDs between updates). We will have to consider the performance impact of any new warnings which may request rechecks as they are added. Co-authored-by: Hayden Stainsby --- console-subscriber/examples/app.rs | 21 +++++ tokio-console/src/main.rs | 1 + tokio-console/src/state/tasks.rs | 50 +++++++++-- tokio-console/src/warnings.rs | 136 ++++++++++++++++++++++++----- 4 files changed, 180 insertions(+), 28 deletions(-) diff --git a/console-subscriber/examples/app.rs b/console-subscriber/examples/app.rs index f1247ed62..7a1b50c11 100644 --- a/console-subscriber/examples/app.rs +++ b/console-subscriber/examples/app.rs @@ -11,6 +11,7 @@ OPTIONS: blocks Includes a (misbehaving) blocking task burn Includes a (misbehaving) task that spins CPU with self-wakes coma Includes a (misbehaving) task that forgets to register a waker + noyield Includes a (misbehaving) task that spawns tasks that never yield "#; #[tokio::main] @@ -38,6 +39,12 @@ async fn main() -> Result<(), Box> { .spawn(burn(1, 10)) .unwrap(); } + "noyield" => { + tokio::task::Builder::new() + .name("noyield") + .spawn(no_yield(20)) + .unwrap(); + } "help" | "-h" => { eprintln!("{}", HELP); return Ok(()); @@ -114,3 +121,17 @@ async fn burn(min: u64, max: u64) { } } } + +#[tracing::instrument] +async fn no_yield(seconds: u64) { + loop { + let handle = tokio::task::Builder::new() + .name("greedy") + .spawn(async move { + std::thread::sleep(Duration::from_secs(seconds)); + }) + .expect("Couldn't spawn greedy task"); + + _ = handle.await; + } +} diff --git a/tokio-console/src/main.rs b/tokio-console/src/main.rs index 4fc6630bb..8545e30b9 100644 --- a/tokio-console/src/main.rs +++ b/tokio-console/src/main.rs @@ -65,6 +65,7 @@ async fn main() -> color_eyre::Result<()> { .with_task_linters(vec![ warnings::Linter::new(warnings::SelfWakePercent::default()), warnings::Linter::new(warnings::LostWaker), + warnings::Linter::new(warnings::NeverYielded::default()), ]) .with_retain_for(retain_for); let mut input = input::EventStream::new(); diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs index bf9c8a2c0..97aa4f35b 100644 --- a/tokio-console/src/state/tasks.rs +++ b/tokio-console/src/state/tasks.rs @@ -9,13 +9,13 @@ use crate::{ }, util::Percentage, view, - warnings::Linter, + warnings::{Lint, Linter}, }; use console_api as proto; use ratatui::{style::Color, text::Span}; use std::{ cell::RefCell, - collections::HashMap, + collections::{HashMap, HashSet}, convert::{TryFrom, TryInto}, rc::{Rc, Weak}, time::{Duration, SystemTime}, @@ -24,6 +24,7 @@ use std::{ #[derive(Default, Debug)] pub(crate) struct TasksState { tasks: Store, + pending_lint: HashSet>, pub(crate) linters: Vec>, dropped_events: u64, } @@ -145,6 +146,9 @@ impl TasksState { let mut stats_update = update.stats_update; let linters = &self.linters; + // Gathers the tasks that need to be linted again on the next update cycle + let mut next_pending_lint = HashSet::new(); + self.tasks .insert_with(visibility, update.new_tasks, |ids, mut task| { if task.id.is_none() { @@ -217,15 +221,30 @@ impl TasksState { warnings: Vec::new(), location, }; - task.lint(linters); + if let TaskLintResult::RequiresRecheck = task.lint(linters) { + next_pending_lint.insert(task.id); + } Some((id, task)) }); for (stats, mut task) in self.tasks.updated(stats_update) { tracing::trace!(?task, ?stats, "processing stats update for"); task.stats = stats.into(); - task.lint(linters); + match task.lint(linters) { + TaskLintResult::RequiresRecheck => next_pending_lint.insert(task.id), + // Avoid linting this task again this cycle + _ => self.pending_lint.remove(&task.id), + }; + } + + for id in &self.pending_lint { + if let Some(task) = self.tasks.get(*id) { + if let TaskLintResult::RequiresRecheck = task.borrow_mut().lint(linters) { + next_pending_lint.insert(*id); + } + } } + self.pending_lint = next_pending_lint; self.dropped_events += update.dropped_events; } @@ -430,15 +449,25 @@ impl Task { &self.warnings[..] } - fn lint(&mut self, linters: &[Linter]) { + fn lint(&mut self, linters: &[Linter]) -> TaskLintResult { self.warnings.clear(); + let mut recheck = false; for lint in linters { tracing::debug!(?lint, task = ?self, "checking..."); - if let Some(warning) = lint.check(self) { - tracing::info!(?warning, task = ?self, "found a warning!"); - self.warnings.push(warning) + match lint.check(self) { + Lint::Warning(warning) => { + tracing::info!(?warning, task = ?self, "found a warning!"); + self.warnings.push(warning); + } + Lint::Ok => {} + Lint::Recheck => recheck = true, } } + if recheck { + TaskLintResult::RequiresRecheck + } else { + TaskLintResult::Linted + } } pub(crate) fn location(&self) -> &str { @@ -446,6 +475,11 @@ impl Task { } } +enum TaskLintResult { + Linted, + RequiresRecheck, +} + impl From for TaskStats { fn from(pb: proto::tasks::Stats) -> Self { let created_at = pb diff --git a/tokio-console/src/warnings.rs b/tokio-console/src/warnings.rs index 818b140bc..59d43f99f 100644 --- a/tokio-console/src/warnings.rs +++ b/tokio-console/src/warnings.rs @@ -1,5 +1,9 @@ -use crate::state::tasks::Task; -use std::{fmt::Debug, rc::Rc}; +use crate::state::tasks::{Task, TaskState}; +use std::{ + fmt::Debug, + rc::Rc, + time::{Duration, SystemTime}, +}; /// A warning for a particular type of monitored entity (e.g. task or resource). /// @@ -7,8 +11,8 @@ use std::{fmt::Debug, rc::Rc}; /// generating a warning message describing it. The [`Linter`] type wraps an /// instance of this trait to track active instances of the warning. pub trait Warn: Debug { - /// Returns `true` if the warning applies to `val`. - fn check(&self, val: &T) -> bool; + /// Returns if the warning applies to `val`. + fn check(&self, val: &T) -> Warning; /// Formats a description of the warning detected for a *specific* `val`. /// @@ -46,6 +50,19 @@ pub trait Warn: Debug { fn summary(&self) -> &str; } +/// A result for a warning check +pub enum Warning { + /// No warning for this entity. + Ok, + + /// A warning has been detected for this entity. + Warn, + + /// The warning should be rechecked as the conditions to allow for checking + /// are not satisfied yet + Recheck, +} + #[derive(Debug)] pub(crate) struct Linter(Rc>); @@ -57,17 +74,12 @@ impl Linter { Self(Rc::new(warning)) } - /// Checks if the warning applies to a particular entity, returning a clone - /// of `Self` if it does. - /// - /// The cloned instance of `Self` should be held by the entity that - /// generated the warning, so that it can be formatted. Holding the clone of - /// `Self` will increment the warning count for that entity. - pub(crate) fn check(&self, val: &T) -> Option { - if self.0.check(val) { - Some(Self(self.0.clone())) - } else { - None + /// Checks if the warning applies to a particular entity + pub(crate) fn check(&self, val: &T) -> Lint { + match self.0.check(val) { + Warning::Ok => Lint::Ok, + Warning::Warn => Lint::Warning(Self(self.0.clone())), + Warning::Recheck => Lint::Recheck, } } @@ -78,7 +90,7 @@ impl Linter { pub(crate) fn format(&self, val: &T) -> String { debug_assert!( - self.0.check(val), + matches!(self.0.check(val), Warning::Warn), "tried to format a warning for a {} that did not have that warning!", std::any::type_name::() ); @@ -90,6 +102,21 @@ impl Linter { } } +/// A result for a linter check +pub(crate) enum Lint { + /// No warning applies to the entity + Ok, + + /// The cloned instance of `Self` should be held by the entity that + /// generated the warning, so that it can be formatted. Holding the clone of + /// `Self` will increment the warning count for that entity. + Warning(Linter), + + /// The lint should be rechecked as the conditions to allow for checking are + /// not satisfied yet + Recheck, +} + #[derive(Clone, Debug)] pub(crate) struct SelfWakePercent { min_percent: u64, @@ -120,9 +147,13 @@ impl Warn for SelfWakePercent { self.description.as_str() } - fn check(&self, task: &Task) -> bool { + fn check(&self, task: &Task) -> Warning { let self_wakes = task.self_wake_percent(); - self_wakes > self.min_percent + if self_wakes > self.min_percent { + Warning::Warn + } else { + Warning::Ok + } } fn format(&self, task: &Task) -> String { @@ -142,11 +173,76 @@ impl Warn for LostWaker { "tasks have lost their waker" } - fn check(&self, task: &Task) -> bool { - !task.is_completed() && task.waker_count() == 0 && !task.is_running() && !task.is_awakened() + fn check(&self, task: &Task) -> Warning { + if !task.is_completed() + && task.waker_count() == 0 + && !task.is_running() + && !task.is_awakened() + { + Warning::Warn + } else { + Warning::Ok + } } fn format(&self, _: &Task) -> String { "This task has lost its waker, and will never be woken again.".into() } } + +/// Warning for if a task has never yielded +#[derive(Clone, Debug)] +pub(crate) struct NeverYielded { + min_duration: Duration, + description: String, +} + +impl NeverYielded { + pub(crate) const DEFAULT_DURATION: Duration = Duration::from_secs(1); + pub(crate) fn new(min_duration: Duration) -> Self { + Self { + min_duration, + description: format!( + "tasks have never yielded (threshold {}ms)", + min_duration.as_millis() + ), + } + } +} + +impl Default for NeverYielded { + fn default() -> Self { + Self::new(Self::DEFAULT_DURATION) + } +} + +impl Warn for NeverYielded { + fn summary(&self) -> &str { + self.description.as_str() + } + + fn check(&self, task: &Task) -> Warning { + // Don't fire warning for tasks that are waiting to run + if task.state() != TaskState::Running { + return Warning::Ok; + } + + if task.total_polls() > 1 { + return Warning::Ok; + } + + // Avoid short-lived task false positives + if task.busy(SystemTime::now()) >= self.min_duration { + Warning::Warn + } else { + Warning::Recheck + } + } + + fn format(&self, task: &Task) -> String { + format!( + "This task has never yielded ({:?})", + task.busy(SystemTime::now()), + ) + } +}