diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8d30f66f6ff..bef52cca4a1 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -47,6 +47,25 @@ impl RuntimeMetrics { self.handle.inner.num_workers() } + /// Returns the number of active tasks in the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.active_tasks_count(); + /// println!("Runtime has {} active tasks", n); + /// } + /// ``` + pub fn active_tasks_count(&self) -> usize { + self.handle.inner.active_tasks_count() + } + cfg_unstable_metrics! { /// Returns the number of additional threads spawned by the runtime. @@ -75,25 +94,6 @@ impl RuntimeMetrics { self.handle.inner.num_blocking_threads() } - /// Returns the number of active tasks in the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.active_tasks_count(); - /// println!("Runtime has {} active tasks", n); - /// } - /// ``` - pub fn active_tasks_count(&self) -> usize { - self.handle.inner.active_tasks_count() - } - /// Returns the number of idle threads, which have spawned by the runtime /// for `spawn_blocking` calls. /// diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index b9c23837a58..ccc146ba435 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -500,6 +500,10 @@ impl Handle { pub(crate) fn reset_woken(&self) -> bool { self.shared.woken.swap(false, AcqRel) } + + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } } cfg_unstable_metrics! { @@ -533,9 +537,6 @@ cfg_unstable_metrics! { self.blocking_spawner.queue_depth() } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 3cbba11b752..679b05e7875 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -173,6 +173,10 @@ cfg_rt! { Handle::MultiThreadAlt(handle) => handle.num_workers(), } } + + pub(crate) fn active_tasks_count(&self) -> usize { + match_flavor!(self, Handle(handle) => handle.active_tasks_count()) + } } cfg_unstable_metrics! { @@ -187,9 +191,6 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads()) } - pub(crate) fn active_tasks_count(&self) -> usize { - match_flavor!(self, Handle(handle) => handle.active_tasks_count()) - } pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { match_flavor!(self, Handle(handle) => handle.scheduler_metrics()) diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 6ced245ee5b..1e478cd153a 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -9,6 +9,10 @@ impl Handle { self.shared.worker_metrics.len() } + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } + cfg_unstable_metrics! 
{ pub(crate) fn num_blocking_threads(&self) -> usize { // workers are currently spawned using spawn_blocking @@ -21,9 +25,6 @@ impl Handle { self.blocking_spawner.num_idle_threads() } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 2446deb6b41..b522eeebcce 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,20 +1,8 @@ #![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] -#![cfg(all( - feature = "full", - tokio_unstable, - not(target_os = "wasi"), - target_has_atomic = "64" -))] - -use std::future::Future; -use std::sync::{Arc, Barrier, Mutex}; -use std::task::Poll; -use tokio::macros::support::poll_fn; +#![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))] use tokio::runtime::Runtime; -use tokio::task::consume_budget; -use tokio::time::{self, Duration}; #[test] fn num_workers() { @@ -25,71 +13,12 @@ fn num_workers() { assert_eq!(2, rt.metrics().num_workers()); } -#[test] -fn num_blocking_threads() { - let rt = current_thread(); - assert_eq!(0, rt.metrics().num_blocking_threads()); - let _ = rt.block_on(rt.spawn_blocking(move || {})); - assert_eq!(1, rt.metrics().num_blocking_threads()); - - let rt = threaded(); - assert_eq!(0, rt.metrics().num_blocking_threads()); - let _ = rt.block_on(rt.spawn_blocking(move || {})); - assert_eq!(1, rt.metrics().num_blocking_threads()); -} - -#[test] -fn num_idle_blocking_threads() { - let rt = current_thread(); - assert_eq!(0, rt.metrics().num_idle_blocking_threads()); - let _ = rt.block_on(rt.spawn_blocking(move || {})); - rt.block_on(async { - time::sleep(Duration::from_millis(5)).await; - }); - - // We need to wait until the blocking thread has become idle. Usually 5ms is - // enough for this to happen, but not always. When it isn't enough, sleep - // for another second. We don't always wait for a whole second since we want - // the test suite to finish quickly. - // - // Note that the timeout for idle threads to be killed is 10 seconds. 
- if 0 == rt.metrics().num_idle_blocking_threads() { - rt.block_on(async { - time::sleep(Duration::from_secs(1)).await; - }); - } - - assert_eq!(1, rt.metrics().num_idle_blocking_threads()); -} - -#[test] -fn blocking_queue_depth() { - let rt = tokio::runtime::Builder::new_current_thread() +fn threaded() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) .enable_all() - .max_blocking_threads(1) .build() - .unwrap(); - - assert_eq!(0, rt.metrics().blocking_queue_depth()); - - let ready = Arc::new(Mutex::new(())); - let guard = ready.lock().unwrap(); - - let ready_cloned = ready.clone(); - let wait_until_ready = move || { - let _unused = ready_cloned.lock().unwrap(); - }; - - let h1 = rt.spawn_blocking(wait_until_ready.clone()); - let h2 = rt.spawn_blocking(wait_until_ready); - assert!(rt.metrics().blocking_queue_depth() > 0); - - drop(guard); - - let _ = rt.block_on(h1); - let _ = rt.block_on(h2); - - assert_eq!(0, rt.metrics().blocking_queue_depth()); + .unwrap() } #[test] @@ -109,635 +38,707 @@ fn active_tasks_count() { }); } -#[test] -fn remote_schedule_count() { - use std::thread; - - let rt = current_thread(); - let handle = rt.handle().clone(); - let task = thread::spawn(move || { - handle.spawn(async { - // DO nothing - }) - }) - .join() - .unwrap(); +fn current_thread() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} - rt.block_on(task).unwrap(); +#[cfg(tokio_unstable)] +mod metric_tests_unstable { + use std::future::Future; + use std::sync::{Arc, Barrier, Mutex}; + use std::task::Poll; + use tokio::macros::support::poll_fn; + + use super::{current_thread, threaded}; + use tokio::runtime::Runtime; + use tokio::task::consume_budget; + use tokio::time::{self, Duration}; + + #[test] + fn num_blocking_threads() { + let rt = current_thread(); + assert_eq!(0, rt.metrics().num_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + assert_eq!(1, rt.metrics().num_blocking_threads()); + + let rt = threaded(); + assert_eq!(0, rt.metrics().num_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + assert_eq!(1, rt.metrics().num_blocking_threads()); + } - assert_eq!(1, rt.metrics().remote_schedule_count()); + #[test] + fn num_idle_blocking_threads() { + let rt = current_thread(); + assert_eq!(0, rt.metrics().num_idle_blocking_threads()); + let _ = rt.block_on(rt.spawn_blocking(move || {})); + rt.block_on(async { + time::sleep(Duration::from_millis(5)).await; + }); - let rt = threaded(); - let handle = rt.handle().clone(); - let task = thread::spawn(move || { - handle.spawn(async { - // DO nothing - }) - }) - .join() - .unwrap(); + // We need to wait until the blocking thread has become idle. Usually 5ms is + // enough for this to happen, but not always. When it isn't enough, sleep + // for another second. We don't always wait for a whole second since we want + // the test suite to finish quickly. + // + // Note that the timeout for idle threads to be killed is 10 seconds. 
+ if 0 == rt.metrics().num_idle_blocking_threads() { + rt.block_on(async { + time::sleep(Duration::from_secs(1)).await; + }); + } - rt.block_on(task).unwrap(); + assert_eq!(1, rt.metrics().num_idle_blocking_threads()); + } - assert_eq!(1, rt.metrics().remote_schedule_count()); -} + #[test] + fn blocking_queue_depth() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(1) + .build() + .unwrap(); -#[test] -fn worker_park_count() { - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); - drop(rt); - assert!(1 <= metrics.worker_park_count(0)); + assert_eq!(0, rt.metrics().blocking_queue_depth()); - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); - drop(rt); - assert!(1 <= metrics.worker_park_count(0)); - assert!(1 <= metrics.worker_park_count(1)); -} + let ready = Arc::new(Mutex::new(())); + let guard = ready.lock().unwrap(); -#[test] -fn worker_noop_count() { - // There isn't really a great way to generate no-op parks as they happen as - // false-positive events under concurrency. + let ready_cloned = ready.clone(); + let wait_until_ready = move || { + let _unused = ready_cloned.lock().unwrap(); + }; - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); - drop(rt); - assert!(0 < metrics.worker_noop_count(0)); + let h1 = rt.spawn_blocking(wait_until_ready.clone()); + let h2 = rt.spawn_blocking(wait_until_ready); + assert!(rt.metrics().blocking_queue_depth() > 0); - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); - drop(rt); - assert!(0 < metrics.worker_noop_count(0)); - assert!(0 < metrics.worker_noop_count(1)); -} + drop(guard); -#[test] -#[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470 -fn worker_steal_count() { - // This metric only applies to the multi-threaded runtime. - // - // We use a blocking channel to backup one worker thread. - use std::sync::mpsc::channel; - - let rt = threaded_no_lifo(); - let metrics = rt.metrics(); + let _ = rt.block_on(h1); + let _ = rt.block_on(h2); - rt.block_on(async { - let (tx, rx) = channel(); + assert_eq!(0, rt.metrics().blocking_queue_depth()); + } - // Move to the runtime. - tokio::spawn(async move { - // Spawn the task that sends to the channel - // - // Since the lifo slot is disabled, this task is stealable. - tokio::spawn(async move { - tx.send(()).unwrap(); - }); + #[test] + fn remote_schedule_count() { + use std::thread; - // Blocking receive on the channel. 
- rx.recv().unwrap(); + let rt = current_thread(); + let handle = rt.handle().clone(); + let task = thread::spawn(move || { + handle.spawn(async { + // DO nothing + }) }) - .await + .join() .unwrap(); - }); - drop(rt); + rt.block_on(task).unwrap(); - let n: u64 = (0..metrics.num_workers()) - .map(|i| metrics.worker_steal_count(i)) - .sum(); + assert_eq!(1, rt.metrics().remote_schedule_count()); - assert_eq!(1, n); -} + let rt = threaded(); + let handle = rt.handle().clone(); + let task = thread::spawn(move || { + handle.spawn(async { + // DO nothing + }) + }) + .join() + .unwrap(); -#[test] -fn worker_poll_count_and_time() { - const N: u64 = 5; + rt.block_on(task).unwrap(); - async fn task() { - // Sync sleep - std::thread::sleep(std::time::Duration::from_micros(10)); + assert_eq!(1, rt.metrics().remote_schedule_count()); } - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(async { - for _ in 0..N { - tokio::spawn(task()).await.unwrap(); - } - }); - drop(rt); - assert_eq!(N, metrics.worker_poll_count(0)); - // Not currently supported for current-thread runtime - assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0)); - - // Does not populate the histogram - assert!(!metrics.poll_count_histogram_enabled()); - for i in 0..10 { - assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, i)); + #[test] + fn worker_park_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(1 <= metrics.worker_park_count(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(1 <= metrics.worker_park_count(0)); + assert!(1 <= metrics.worker_park_count(1)); } - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async { - for _ in 0..N { - tokio::spawn(task()).await.unwrap(); - } - }); - drop(rt); - // Account for the `block_on` task - let n = (0..metrics.num_workers()) - .map(|i| metrics.worker_poll_count(i)) - .sum(); + #[test] + fn worker_noop_count() { + // There isn't really a great way to generate no-op parks as they happen as + // false-positive events under concurrency. + + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(0 < metrics.worker_noop_count(0)); - assert_eq!(N, n); + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(0 < metrics.worker_noop_count(0)); + assert!(0 < metrics.worker_noop_count(1)); + } - let n: Duration = (0..metrics.num_workers()) - .map(|i| metrics.worker_mean_poll_time(i)) - .sum(); + #[test] + #[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470 + fn worker_steal_count() { + // This metric only applies to the multi-threaded runtime. + // + // We use a blocking channel to backup one worker thread. + use std::sync::mpsc::channel; - assert!(n > Duration::default()); + let rt = threaded_no_lifo(); + let metrics = rt.metrics(); - // Does not populate the histogram - assert!(!metrics.poll_count_histogram_enabled()); - for n in 0..metrics.num_workers() { - for i in 0..10 { - assert_eq!(0, metrics.poll_count_histogram_bucket_count(n, i)); - } + rt.block_on(async { + let (tx, rx) = channel(); + + // Move to the runtime. 
+ tokio::spawn(async move { + // Spawn the task that sends to the channel + // + // Since the lifo slot is disabled, this task is stealable. + tokio::spawn(async move { + tx.send(()).unwrap(); + }); + + // Blocking receive on the channel. + rx.recv().unwrap(); + }) + .await + .unwrap(); + }); + + drop(rt); + + let n: u64 = (0..metrics.num_workers()) + .map(|i| metrics.worker_steal_count(i)) + .sum(); + + assert_eq!(1, n); } -} -#[test] -fn worker_poll_count_histogram() { - const N: u64 = 5; + #[test] + fn worker_poll_count_and_time() { + const N: u64 = 5; - let rts = [ - tokio::runtime::Builder::new_current_thread() - .enable_all() - .enable_metrics_poll_count_histogram() - .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) - .metrics_poll_count_histogram_buckets(3) - .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) - .build() - .unwrap(), - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .enable_metrics_poll_count_histogram() - .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) - .metrics_poll_count_histogram_buckets(3) - .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) - .build() - .unwrap(), - ]; + async fn task() { + // Sync sleep + std::thread::sleep(std::time::Duration::from_micros(10)); + } - for rt in rts { + let rt = current_thread(); let metrics = rt.metrics(); rt.block_on(async { for _ in 0..N { - tokio::spawn(async {}).await.unwrap(); + tokio::spawn(task()).await.unwrap(); } }); drop(rt); + assert_eq!(N, metrics.worker_poll_count(0)); + // Not currently supported for current-thread runtime + assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0)); - let num_workers = metrics.num_workers(); - let num_buckets = metrics.poll_count_histogram_num_buckets(); - - assert!(metrics.poll_count_histogram_enabled()); - assert_eq!(num_buckets, 3); + // Does not populate the histogram + assert!(!metrics.poll_count_histogram_enabled()); + for i in 0..10 { + assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, i)); + } - let n = (0..num_workers) - .flat_map(|i| (0..num_buckets).map(move |j| (i, j))) - .map(|(worker, bucket)| metrics.poll_count_histogram_bucket_count(worker, bucket)) + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(task()).await.unwrap(); + } + }); + drop(rt); + // Account for the `block_on` task + let n = (0..metrics.num_workers()) + .map(|i| metrics.worker_poll_count(i)) .sum(); + assert_eq!(N, n); - } -} -#[test] -fn worker_poll_count_histogram_range() { - let max = Duration::from_nanos(u64::MAX); + let n: Duration = (0..metrics.num_workers()) + .map(|i| metrics.worker_mean_poll_time(i)) + .sum(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .enable_metrics_poll_count_histogram() - .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) - .metrics_poll_count_histogram_buckets(3) - .metrics_poll_count_histogram_resolution(us(50)) - .build() - .unwrap(); - let metrics = rt.metrics(); + assert!(n > Duration::default()); - assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..us(50)); - assert_eq!( - metrics.poll_count_histogram_bucket_range(1), - us(50)..us(100) - ); - assert_eq!(metrics.poll_count_histogram_bucket_range(2), us(100)..max); + // Does not populate the histogram + assert!(!metrics.poll_count_histogram_enabled()); + for n in 0..metrics.num_workers() { + for i in 0..10 { + assert_eq!(0, 
metrics.poll_count_histogram_bucket_count(n, i)); + } + } + } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .enable_metrics_poll_count_histogram() - .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log) - .metrics_poll_count_histogram_buckets(3) - .metrics_poll_count_histogram_resolution(us(50)) - .build() - .unwrap(); - let metrics = rt.metrics(); + #[test] + fn worker_poll_count_histogram() { + const N: u64 = 5; + + let rts = [ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .enable_metrics_poll_count_histogram() + .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) + .metrics_poll_count_histogram_buckets(3) + .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) + .build() + .unwrap(), + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .enable_metrics_poll_count_histogram() + .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) + .metrics_poll_count_histogram_buckets(3) + .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) + .build() + .unwrap(), + ]; + + for rt in rts { + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async {}).await.unwrap(); + } + }); + drop(rt); - let a = Duration::from_nanos(50000_u64.next_power_of_two()); - let b = a * 2; + let num_workers = metrics.num_workers(); + let num_buckets = metrics.poll_count_histogram_num_buckets(); - assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..a); - assert_eq!(metrics.poll_count_histogram_bucket_range(1), a..b); - assert_eq!(metrics.poll_count_histogram_bucket_range(2), b..max); -} + assert!(metrics.poll_count_histogram_enabled()); + assert_eq!(num_buckets, 3); -#[test] -fn worker_poll_count_histogram_disabled_without_explicit_enable() { - let rts = [ - tokio::runtime::Builder::new_current_thread() + let n = (0..num_workers) + .flat_map(|i| (0..num_buckets).map(move |j| (i, j))) + .map(|(worker, bucket)| metrics.poll_count_histogram_bucket_count(worker, bucket)) + .sum(); + assert_eq!(N, n); + } + } + + #[test] + fn worker_poll_count_histogram_range() { + let max = Duration::from_nanos(u64::MAX); + + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() + .enable_metrics_poll_count_histogram() .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) .metrics_poll_count_histogram_buckets(3) - .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) + .metrics_poll_count_histogram_resolution(us(50)) .build() - .unwrap(), - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) + .unwrap(); + let metrics = rt.metrics(); + + assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..us(50)); + assert_eq!( + metrics.poll_count_histogram_bucket_range(1), + us(50)..us(100) + ); + assert_eq!(metrics.poll_count_histogram_bucket_range(2), us(100)..max); + + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) + .enable_metrics_poll_count_histogram() + .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log) .metrics_poll_count_histogram_buckets(3) - .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) + .metrics_poll_count_histogram_resolution(us(50)) .build() - .unwrap(), - ]; - - for rt in rts { + .unwrap(); let metrics = rt.metrics(); - assert!(!metrics.poll_count_histogram_enabled()); - } -} - -#[test] -fn 
worker_total_busy_duration() { - const N: usize = 5; - let zero = Duration::from_millis(0); + let a = Duration::from_nanos(50000_u64.next_power_of_two()); + let b = a * 2; - let rt = current_thread(); - let metrics = rt.metrics(); + assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..a); + assert_eq!(metrics.poll_count_histogram_bucket_range(1), a..b); + assert_eq!(metrics.poll_count_histogram_bucket_range(2), b..max); + } - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async { - tokio::task::yield_now().await; - }) - .await - .unwrap(); + #[test] + fn worker_poll_count_histogram_disabled_without_explicit_enable() { + let rts = [ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) + .metrics_poll_count_histogram_buckets(3) + .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) + .build() + .unwrap(), + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear) + .metrics_poll_count_histogram_buckets(3) + .metrics_poll_count_histogram_resolution(Duration::from_millis(50)) + .build() + .unwrap(), + ]; + + for rt in rts { + let metrics = rt.metrics(); + assert!(!metrics.poll_count_histogram_enabled()); } - }); + } - drop(rt); + #[test] + fn worker_total_busy_duration() { + const N: usize = 5; - assert!(zero < metrics.worker_total_busy_duration(0)); + let zero = Duration::from_millis(0); - let rt = threaded(); - let metrics = rt.metrics(); + let rt = current_thread(); + let metrics = rt.metrics(); - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async { - tokio::task::yield_now().await; - }) - .await - .unwrap(); - } - }); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); - drop(rt); + drop(rt); - for i in 0..metrics.num_workers() { - assert!(zero < metrics.worker_total_busy_duration(i)); - } -} + assert!(zero < metrics.worker_total_busy_duration(0)); -#[test] -fn worker_local_schedule_count() { - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(async { - tokio::spawn(async {}).await.unwrap(); - }); - drop(rt); - - assert_eq!(1, metrics.worker_local_schedule_count(0)); - assert_eq!(0, metrics.remote_schedule_count()); + let rt = threaded(); + let metrics = rt.metrics(); - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async { - // Move to the runtime - tokio::spawn(async { - tokio::spawn(async {}).await.unwrap(); - }) - .await - .unwrap(); - }); - drop(rt); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); - let n: u64 = (0..metrics.num_workers()) - .map(|i| metrics.worker_local_schedule_count(i)) - .sum(); + drop(rt); - assert_eq!(2, n); - assert_eq!(1, metrics.remote_schedule_count()); -} + for i in 0..metrics.num_workers() { + assert!(zero < metrics.worker_total_busy_duration(i)); + } + } -#[test] -fn worker_overflow_count() { - // Only applies to the threaded worker - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async { - // Move to the runtime - tokio::spawn(async { - let (tx1, rx1) = std::sync::mpsc::channel(); - let (tx2, rx2) = std::sync::mpsc::channel(); - - // First, we need to block the other worker until all tasks have - // been spawned. 
- // - // We spawn from outside the runtime to ensure that the other worker - // will pick it up: - // - tokio::task::spawn_blocking(|| { - tokio::spawn(async move { - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - }); - }); + #[test] + fn worker_local_schedule_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(async { + tokio::spawn(async {}).await.unwrap(); + }); + drop(rt); - rx1.recv().unwrap(); + assert_eq!(1, metrics.worker_local_schedule_count(0)); + assert_eq!(0, metrics.remote_schedule_count()); - // Spawn many tasks - for _ in 0..300 { - tokio::spawn(async {}); - } + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + // Move to the runtime + tokio::spawn(async { + tokio::spawn(async {}).await.unwrap(); + }) + .await + .unwrap(); + }); + drop(rt); - tx2.send(()).unwrap(); - }) - .await - .unwrap(); - }); - drop(rt); + let n: u64 = (0..metrics.num_workers()) + .map(|i| metrics.worker_local_schedule_count(i)) + .sum(); - let n: u64 = (0..metrics.num_workers()) - .map(|i| metrics.worker_overflow_count(i)) - .sum(); + assert_eq!(2, n); + assert_eq!(1, metrics.remote_schedule_count()); + } - assert_eq!(1, n); -} + #[test] + fn worker_overflow_count() { + // Only applies to the threaded worker + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + // Move to the runtime + tokio::spawn(async { + let (tx1, rx1) = std::sync::mpsc::channel(); + let (tx2, rx2) = std::sync::mpsc::channel(); + + // First, we need to block the other worker until all tasks have + // been spawned. + // + // We spawn from outside the runtime to ensure that the other worker + // will pick it up: + // + tokio::task::spawn_blocking(|| { + tokio::spawn(async move { + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); -#[test] -fn injection_queue_depth_current_thread() { - use std::thread; + rx1.recv().unwrap(); - let rt = current_thread(); - let handle = rt.handle().clone(); - let metrics = rt.metrics(); + // Spawn many tasks + for _ in 0..300 { + tokio::spawn(async {}); + } - thread::spawn(move || { - handle.spawn(async {}); - }) - .join() - .unwrap(); + tx2.send(()).unwrap(); + }) + .await + .unwrap(); + }); + drop(rt); - assert_eq!(1, metrics.injection_queue_depth()); -} + let n: u64 = (0..metrics.num_workers()) + .map(|i| metrics.worker_overflow_count(i)) + .sum(); -#[test] -fn injection_queue_depth_multi_thread() { - let rt = threaded(); - let metrics = rt.metrics(); + assert_eq!(1, n); + } - let barrier1 = Arc::new(Barrier::new(3)); - let barrier2 = Arc::new(Barrier::new(3)); + #[test] + fn injection_queue_depth_current_thread() { + use std::thread; - // Spawn a task per runtime worker to block it. 
- for _ in 0..2 { - let barrier1 = barrier1.clone(); - let barrier2 = barrier2.clone(); - rt.spawn(async move { - barrier1.wait(); - barrier2.wait(); - }); - } + let rt = current_thread(); + let handle = rt.handle().clone(); + let metrics = rt.metrics(); - barrier1.wait(); + thread::spawn(move || { + handle.spawn(async {}); + }) + .join() + .unwrap(); - for i in 0..10 { - assert_eq!(i, metrics.injection_queue_depth()); - rt.spawn(async {}); + assert_eq!(1, metrics.injection_queue_depth()); } - barrier2.wait(); -} + #[test] + fn injection_queue_depth_multi_thread() { + let rt = threaded(); + let metrics = rt.metrics(); -#[test] -fn worker_local_queue_depth() { - const N: usize = 100; + let barrier1 = Arc::new(Barrier::new(3)); + let barrier2 = Arc::new(Barrier::new(3)); - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async {}); + // Spawn a task per runtime worker to block it. + for _ in 0..2 { + let barrier1 = barrier1.clone(); + let barrier2 = barrier2.clone(); + rt.spawn(async move { + barrier1.wait(); + barrier2.wait(); + }); } - assert_eq!(N, metrics.worker_local_queue_depth(0)); - }); + barrier1.wait(); - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async move { - // Move to the runtime - tokio::spawn(async move { - let (tx1, rx1) = std::sync::mpsc::channel(); - let (tx2, rx2) = std::sync::mpsc::channel(); - - // First, we need to block the other worker until all tasks have - // been spawned. - tokio::spawn(async move { - tx1.send(()).unwrap(); - rx2.recv().unwrap(); - }); + for i in 0..10 { + assert_eq!(i, metrics.injection_queue_depth()); + rt.spawn(async {}); + } - // Bump the next-run spawn - tokio::spawn(async {}); + barrier2.wait(); + } - rx1.recv().unwrap(); + #[test] + fn worker_local_queue_depth() { + const N: usize = 100; - // Spawn some tasks - for _ in 0..100 { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { tokio::spawn(async {}); } - let n: usize = (0..metrics.num_workers()) - .map(|i| metrics.worker_local_queue_depth(i)) - .sum(); + assert_eq!(N, metrics.worker_local_queue_depth(0)); + }); - assert_eq!(n, N); + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async move { + // Move to the runtime + tokio::spawn(async move { + let (tx1, rx1) = std::sync::mpsc::channel(); + let (tx2, rx2) = std::sync::mpsc::channel(); - tx2.send(()).unwrap(); - }) - .await - .unwrap(); - }); -} + // First, we need to block the other worker until all tasks have + // been spawned. 
+ tokio::spawn(async move { + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); -#[test] -fn budget_exhaustion_yield() { - let rt = current_thread(); - let metrics = rt.metrics(); + // Bump the next-run spawn + tokio::spawn(async {}); - assert_eq!(0, metrics.budget_forced_yield_count()); + rx1.recv().unwrap(); - let mut did_yield = false; + // Spawn some tasks + for _ in 0..100 { + tokio::spawn(async {}); + } - // block on a task which consumes budget until it yields - rt.block_on(poll_fn(|cx| loop { - if did_yield { - return Poll::Ready(()); - } + let n: usize = (0..metrics.num_workers()) + .map(|i| metrics.worker_local_queue_depth(i)) + .sum(); - let fut = consume_budget(); - tokio::pin!(fut); + assert_eq!(n, N); - if fut.poll(cx).is_pending() { - did_yield = true; - return Poll::Pending; - } - })); + tx2.send(()).unwrap(); + }) + .await + .unwrap(); + }); + } - assert_eq!(1, rt.metrics().budget_forced_yield_count()); -} + #[test] + fn budget_exhaustion_yield() { + let rt = current_thread(); + let metrics = rt.metrics(); -#[test] -fn budget_exhaustion_yield_with_joins() { - let rt = current_thread(); - let metrics = rt.metrics(); + assert_eq!(0, metrics.budget_forced_yield_count()); - assert_eq!(0, metrics.budget_forced_yield_count()); + let mut did_yield = false; - let mut did_yield_1 = false; - let mut did_yield_2 = false; + // block on a task which consumes budget until it yields + rt.block_on(poll_fn(|cx| loop { + if did_yield { + return Poll::Ready(()); + } - // block on a task which consumes budget until it yields - rt.block_on(async { - tokio::join!( - poll_fn(|cx| loop { - if did_yield_1 { - return Poll::Ready(()); - } + let fut = consume_budget(); + tokio::pin!(fut); - let fut = consume_budget(); - tokio::pin!(fut); + if fut.poll(cx).is_pending() { + did_yield = true; + return Poll::Pending; + } + })); - if fut.poll(cx).is_pending() { - did_yield_1 = true; - return Poll::Pending; - } - }), - poll_fn(|cx| loop { - if did_yield_2 { - return Poll::Ready(()); - } + assert_eq!(1, rt.metrics().budget_forced_yield_count()); + } - let fut = consume_budget(); - tokio::pin!(fut); + #[test] + fn budget_exhaustion_yield_with_joins() { + let rt = current_thread(); + let metrics = rt.metrics(); - if fut.poll(cx).is_pending() { - did_yield_2 = true; - return Poll::Pending; - } - }) - ) - }); + assert_eq!(0, metrics.budget_forced_yield_count()); - assert_eq!(1, rt.metrics().budget_forced_yield_count()); -} + let mut did_yield_1 = false; + let mut did_yield_2 = false; -#[cfg(any(target_os = "linux", target_os = "macos"))] -#[test] -fn io_driver_fd_count() { - let rt = current_thread(); - let metrics = rt.metrics(); + // block on a task which consumes budget until it yields + rt.block_on(async { + tokio::join!( + poll_fn(|cx| loop { + if did_yield_1 { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield_1 = true; + return Poll::Pending; + } + }), + poll_fn(|cx| loop { + if did_yield_2 { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield_2 = true; + return Poll::Pending; + } + }) + ) + }); - assert_eq!(metrics.io_driver_fd_registered_count(), 0); + assert_eq!(1, rt.metrics().budget_forced_yield_count()); + } - let stream = tokio::net::TcpStream::connect("google.com:80"); - let stream = rt.block_on(async move { stream.await.unwrap() }); + #[cfg(any(target_os = "linux", target_os = "macos"))] + #[test] + fn io_driver_fd_count() { + let rt = 
current_thread(); + let metrics = rt.metrics(); - assert_eq!(metrics.io_driver_fd_registered_count(), 1); - assert_eq!(metrics.io_driver_fd_deregistered_count(), 0); + assert_eq!(metrics.io_driver_fd_registered_count(), 0); - drop(stream); + let stream = tokio::net::TcpStream::connect("google.com:80"); + let stream = rt.block_on(async move { stream.await.unwrap() }); - assert_eq!(metrics.io_driver_fd_deregistered_count(), 1); - assert_eq!(metrics.io_driver_fd_registered_count(), 1); -} + assert_eq!(metrics.io_driver_fd_registered_count(), 1); + assert_eq!(metrics.io_driver_fd_deregistered_count(), 0); -#[cfg(any(target_os = "linux", target_os = "macos"))] -#[test] -fn io_driver_ready_count() { - let rt = current_thread(); - let metrics = rt.metrics(); + drop(stream); - let stream = tokio::net::TcpStream::connect("google.com:80"); - let _stream = rt.block_on(async move { stream.await.unwrap() }); + assert_eq!(metrics.io_driver_fd_deregistered_count(), 1); + assert_eq!(metrics.io_driver_fd_registered_count(), 1); + } - assert_eq!(metrics.io_driver_ready_count(), 1); -} + #[cfg(any(target_os = "linux", target_os = "macos"))] + #[test] + fn io_driver_ready_count() { + let rt = current_thread(); + let metrics = rt.metrics(); -fn current_thread() -> Runtime { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() -} + let stream = tokio::net::TcpStream::connect("google.com:80"); + let _stream = rt.block_on(async move { stream.await.unwrap() }); -fn threaded() -> Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .unwrap() -} + assert_eq!(metrics.io_driver_ready_count(), 1); + } -fn threaded_no_lifo() -> Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .disable_lifo_slot() - .enable_all() - .build() - .unwrap() -} + fn threaded_no_lifo() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .disable_lifo_slot() + .enable_all() + .build() + .unwrap() + } -fn us(n: u64) -> Duration { - Duration::from_micros(n) + fn us(n: u64) -> Duration { + Duration::from_micros(n) + } }
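
For context, a minimal usage sketch of the API this patch stabilizes. It is not part of the diff; it assumes a binary crate depending on tokio with the "full" feature so that the `rt`, `macros`, and `time` features are available:

use std::time::Duration;
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    let metrics = Handle::current().metrics();

    // Keep a task alive long enough to observe it.
    let task = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
    });

    // With this change, active_tasks_count() no longer sits behind
    // cfg_unstable_metrics!, so no `--cfg tokio_unstable` build flag is
    // needed to call it. The value is a point-in-time snapshot and
    // depends on timing, so treat it as approximate.
    println!("active tasks: {}", metrics.active_tasks_count());

    task.await.unwrap();
}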
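
And a sketch, also not part of the diff, of how downstream code can mix the two surfaces after this change: `active_tasks_count()` is callable unconditionally, while metrics that remain inside `cfg_unstable_metrics!` (for example `num_blocking_threads()`) are still only available when the build sets `RUSTFLAGS="--cfg tokio_unstable"`, which is the same cfg the relocated `metric_tests_unstable` test module requires:

fn report(metrics: &tokio::runtime::RuntimeMetrics) {
    // Stable after this patch: available without any extra cfg.
    println!("active tasks: {}", metrics.active_tasks_count());

    // Still unstable: gate the call the same way the test file above
    // gates the `metric_tests_unstable` module.
    #[cfg(tokio_unstable)]
    println!("blocking threads: {}", metrics.num_blocking_threads());
}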