From 42fb829fdb2eab8f3ba6897903891eb7f1bdc949 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Mon, 17 Jul 2023 17:46:10 +0200 Subject: [PATCH 01/20] test(subscriber): add initial integration tests The `console-subscriber` crate has no integration tests. There are some unit tests, but without very high coverage of features. Recently, we've found or fixed a few errors which probably could have been caught by a medium level of integration testing. However, testing `console-subscriber` isn't straight forward. It is effectively a tracing subscriber (or layer) on one end, and a gRPC server on the other end. This change adds enough of a testing framework to write some initial integration tests. It is the first step towards closing #450. Each test comprises 2 parts: - One or more "expcted tasks" - A future which will be driven to completion on a dedicated Tokio runtime. Behind the scenes, a console subscriber layer is created and it's server part is connected to a duplex stream. The client of the duplex stream then records incoming updates and reconstructs "actual tasks". The layer itself is set as the default subscriber for the duration of `block_on` which is used to drive the provided future to completioin. The expected tasks have a set of "matches", which is how we find the actual task that we want to validate against. Currently, the only value we match on is the task's name. The expected tasks also have a set of expectations. These are other fields on the actual task which are validated once a matching task is found. Currently, the two fields which can have expectations set on them are the `wakes` and `self_wakes` fields. So, to construct an expected task, which will match a task with the name `"my-task"` and then validate that the matched task gets woken once, the code would be: ```rust ExpectedTask::default() .match_name("my-task") .expect_wakes(1); ``` A future which passes this test could be: ```rust async { task::Builder::new() .name("my-task") .spawn(async { tokio::time::sleep(std::time::Duration::ZERO).await }) } ``` The full test would then look like: ```rust fn wakes_once() { let expected_task = ExpectedTask::default() .match_name("my-task") .expect_wakes(1); let future = async { task::Builder::new() .name("my-task") .spawn(async { tokio::time::sleep(std::time::Duration::ZERO).await }) }; assert_task(expected_task, future); } ``` The PR depends on 2 others: - #447 which fixes an error in the logic that determines whether a task is retained in the aggregator or not. - #451 which exposes the server parts and is necessary to allow us to connect the instrument server and client via a duplex channel. This change contains some initial tests for wakes and self wakes which would have caught the error fixed in #430. Additionally there are tests for the functionality of the testing framework itself. --- Cargo.lock | 2 + console-subscriber/Cargo.toml | 1 + console-subscriber/src/aggregator/id_data.rs | 8 +- console-subscriber/tests/framework.rs | 184 ++++++++++ console-subscriber/tests/support/mod.rs | 47 +++ console-subscriber/tests/support/state.rs | 139 ++++++++ .../tests/support/subscriber.rs | 318 ++++++++++++++++++ console-subscriber/tests/support/task.rs | 228 +++++++++++++ console-subscriber/tests/wake.rs | 48 +++ 9 files changed, 971 insertions(+), 4 deletions(-) create mode 100644 console-subscriber/tests/framework.rs create mode 100644 console-subscriber/tests/support/mod.rs create mode 100644 console-subscriber/tests/support/state.rs create mode 100644 console-subscriber/tests/support/subscriber.rs create mode 100644 console-subscriber/tests/support/task.rs create mode 100644 console-subscriber/tests/wake.rs diff --git a/Cargo.lock b/Cargo.lock index 27d4c3088..2b6a44e0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -257,6 +257,7 @@ dependencies = [ name = "console-api" version = "0.5.0" dependencies = [ + "futures-core", "prost", "prost-build", "prost-types", @@ -283,6 +284,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tower", "tracing", "tracing-core", "tracing-subscriber", diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index d3c414e3b..b79e6e3d5 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -55,6 +55,7 @@ crossbeam-channel = "0.5" [dev-dependencies] tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] } +tower = "0.4" futures = "0.3" [package.metadata.docs.rs] diff --git a/console-subscriber/src/aggregator/id_data.rs b/console-subscriber/src/aggregator/id_data.rs index b9010b445..2ad2c74b0 100644 --- a/console-subscriber/src/aggregator/id_data.rs +++ b/console-subscriber/src/aggregator/id_data.rs @@ -104,18 +104,18 @@ impl IdData { if let Some(dropped_at) = stats.dropped_at() { let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default(); let dirty = stats.is_unsent(); - let should_drop = + let should_retain = // if there are any clients watching, retain all dirty tasks regardless of age (dirty && has_watchers) - || dropped_for > retention; + || dropped_for <= retention; tracing::trace!( stats.id = ?id, stats.dropped_at = ?dropped_at, stats.dropped_for = ?dropped_for, stats.dirty = dirty, - should_drop, + should_retain, ); - return !should_drop; + return should_retain; } true diff --git a/console-subscriber/tests/framework.rs b/console-subscriber/tests/framework.rs new file mode 100644 index 000000000..68bf2a0ce --- /dev/null +++ b/console-subscriber/tests/framework.rs @@ -0,0 +1,184 @@ +//! Framework tests +//! +//! The tests in this module are here to verify the testing framework itself. +//! As such, some of these tests may be repeated elsewhere (where we wish to +//! actually test the functionality of `console-subscriber`) and others are +//! negative tests that should panic. + +use std::time::Duration; + +use tokio::{task, time::sleep}; + +mod support; +use support::{assert_task, assert_tasks, ExpectedTask}; + +#[test] +fn expect_present() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_present(); + + let future = async { + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task: no expectations set, if you want to just expect that a matching task is present, use `expect_present()` +")] +fn fail_no_expectations() { + let expected_task = ExpectedTask::default().match_default_name(); + + let future = async { + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +fn wakes() { + let expected_task = ExpectedTask::default().match_default_name().expect_wakes(1); + + let future = async { + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task: expected `wakes` to be 5, but actual was 1 +")] +fn fail_wakes() { + let expected_task = ExpectedTask::default().match_default_name().expect_wakes(5); + + let future = async { + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +fn self_wakes() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_self_wakes(1); + + let future = async { task::yield_now().await }; + + assert_task(expected_task, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task: expected `self_wakes` to be 1, but actual was 0 +")] +fn fail_self_wake() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_self_wakes(1); + + let future = async { + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +fn test_spawned_task() { + let expected_task = ExpectedTask::default() + .match_name("another-name".into()) + .expect_present(); + + let future = async { + task::Builder::new() + .name("another-name") + .spawn(async { task::yield_now().await }) + }; + + assert_task(expected_task, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task: no matching actual task was found +")] +fn fail_wrong_task_name() { + let expected_task = ExpectedTask::default().match_name("wrong-name".into()); + + let future = async { task::yield_now().await }; + + assert_task(expected_task, future); +} + +#[test] +fn multiple_tasks() { + let expected_tasks = vec![ + ExpectedTask::default() + .match_name("task-1".into()) + .expect_wakes(1), + ExpectedTask::default() + .match_name("task-2".into()) + .expect_wakes(1), + ]; + + let future = async { + let task1 = task::Builder::new() + .name("task-1") + .spawn(async { task::yield_now().await }) + .unwrap(); + let task2 = task::Builder::new() + .name("task-2") + .spawn(async { task::yield_now().await }) + .unwrap(); + + tokio::try_join! { + task1, + task2, + } + .unwrap(); + }; + + assert_tasks(expected_tasks, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task: expected `wakes` to be 2, but actual was 1 +")] +fn fail_1_of_2_expected_tasks() { + let expected_tasks = vec![ + ExpectedTask::default() + .match_name("task-1".into()) + .expect_wakes(1), + ExpectedTask::default() + .match_name("task-2".into()) + .expect_wakes(2), + ]; + + let future = async { + let task1 = task::Builder::new() + .name("task-1") + .spawn(async { task::yield_now().await }) + .unwrap(); + let task2 = task::Builder::new() + .name("task-2") + .spawn(async { task::yield_now().await }) + .unwrap(); + + tokio::try_join! { + task1, + task2, + } + .unwrap(); + }; + + assert_tasks(expected_tasks, future); +} diff --git a/console-subscriber/tests/support/mod.rs b/console-subscriber/tests/support/mod.rs new file mode 100644 index 000000000..4937aff6a --- /dev/null +++ b/console-subscriber/tests/support/mod.rs @@ -0,0 +1,47 @@ +use futures::Future; + +mod state; +mod subscriber; +mod task; + +use subscriber::run_test; + +pub(crate) use subscriber::MAIN_TASK_NAME; +pub(crate) use task::ExpectedTask; + +/// Assert that an `expected_task` is recorded by a console-subscriber +/// when driving the provided `future` to completion. +/// +/// This function is equivalent to calling [`assert_tasks`] with a vector +/// containing a single task. +/// +/// # Panics +/// +/// This function will panic if the expectations on the expected task are not +/// met or if a matching task is not recorded. +#[track_caller] +#[allow(dead_code)] +pub(crate) fn assert_task(expected_task: ExpectedTask, future: Fut) +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + run_test(vec![expected_task], future) +} + +/// Assert that the `expected_tasks` are recorded by a console-subscriber +/// when driving the provided `future` to completion. +/// +/// # Panics +/// +/// This function will panic if the expectations on any of the expected tasks +/// are not met or if matching tasks are not recorded for all expected tasks. +#[track_caller] +#[allow(dead_code)] +pub(crate) fn assert_tasks(expected_tasks: Vec, future: Fut) +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + run_test(expected_tasks, future) +} diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs new file mode 100644 index 000000000..6fc663808 --- /dev/null +++ b/console-subscriber/tests/support/state.rs @@ -0,0 +1,139 @@ +use std::fmt; + +use tokio::sync::broadcast::{ + self, + error::{RecvError, TryRecvError}, +}; + +/// A step in the running of the test +#[derive(Clone, Debug, PartialEq, PartialOrd)] +pub(super) enum TestStep { + /// The overall test has begun + Start, + /// The instrument server has been started + ServerStarted, + /// The client has connected to the instrument server + ClientConnected, + /// The future being driven has completed + TestFinished, + /// The client has finished recording updates + UpdatesRecorded, +} + +impl fmt::Display for TestStep { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (self as &dyn fmt::Debug).fmt(f) + } +} + +/// The state of the test. +/// +/// This struct is used by various parts of the test framework to wait until +/// a specific test step has been reached and advance the test state to a new +/// step. +pub(super) struct TestState { + receiver: broadcast::Receiver, + sender: broadcast::Sender, + step: TestStep, +} + +impl TestState { + pub(super) fn new() -> Self { + let (sender, receiver) = broadcast::channel(1); + Self { + receiver, + sender, + step: TestStep::Start, + } + } + + /// Block asynchronously until the desired step has been reached. + /// + /// # Panics + /// + /// This function will panic if the underlying channel gets closed. + pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { + loop { + if self.step >= desired_step { + break; + } + + match self.receiver.recv().await { + Ok(step) => self.step = step, + Err(RecvError::Lagged(_)) => { + // we don't mind being lagged, we'll just get the latest state + } + Err(RecvError::Closed) => { + panic!("failed to receive current step, waiting for step: {desired_step}, did the test abort?"); + } + } + } + } + + /// Check whether the desired step has been reached without blocking. + pub(super) fn try_wait_for_step(&mut self, desired_step: TestStep) -> bool { + self.update_step(); + + self.step == desired_step + } + + /// Advance to the next step. + /// + /// The test must be at the step prior to the next step before starting. + /// Being in a different step is likely to indicate a logic error in the + /// test framework. + /// + /// # Panics + /// + /// This method will panic if the test state is not at the step prior to + /// `next_step` or if the underlying channel is closed. + #[track_caller] + pub(super) fn advance_to_step(&mut self, next_step: TestStep) { + self.update_step(); + + if self.step >= next_step { + panic!( + "cannot advance to previous or current step! current step: {current}, next step: {next_step}", + current = self.step); + } + + match (&self.step, &next_step) { + (TestStep::Start, TestStep::ServerStarted) | + (TestStep::ServerStarted, TestStep::ClientConnected) | + (TestStep::ClientConnected, TestStep::TestFinished) | + (TestStep::TestFinished, TestStep::UpdatesRecorded) => {}, + (_, _) => panic!( + "cannot advance more than one step! current step: {current}, next step: {next_step}", + current = self.step), + } + + self.sender + .send(next_step) + .expect("failed to send the next test step, did the test abort?"); + } + + fn update_step(&mut self) { + loop { + match self.receiver.try_recv() { + Ok(step) => self.step = step, + Err(TryRecvError::Lagged(_)) => { + // we don't mind being lagged, we'll just get the latest state + } + Err(TryRecvError::Closed) => { + panic!("failed to update current step, did the test abort?") + } + Err(TryRecvError::Empty) => break, + } + } + } +} + +impl Clone for TestState { + fn clone(&self) -> Self { + Self { + receiver: self.receiver.resubscribe(), + sender: self.sender.clone(), + step: self.step.clone(), + } + } +} diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs new file mode 100644 index 000000000..36888ad5a --- /dev/null +++ b/console-subscriber/tests/support/subscriber.rs @@ -0,0 +1,318 @@ +use std::{collections::HashMap, fmt, future::Future, thread}; + +use console_api::{ + field::Value, + instrument::{instrument_client::InstrumentClient, InstrumentRequest}, +}; +use console_subscriber::ServerParts; +use futures::stream::StreamExt; +use tokio::{io::DuplexStream, task}; +use tonic::transport::{Channel, Endpoint, Server, Uri}; +use tower::service_fn; + +use super::state::{TestState, TestStep}; +use super::task::{ActualTask, ExpectedTask, TaskValidationFailure}; + +pub(crate) const MAIN_TASK_NAME: &str = "main"; + +#[derive(Debug)] +struct TestFailure { + failures: Vec, +} + +impl fmt::Display for TestFailure { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Task validation failed:\n")?; + for failure in &self.failures { + write!(f, " - {failure}\n")?; + } + Ok(()) + } +} + +/// Runs the test +/// +/// This function runs the whole test. It sets up a `console-subscriber` layer +/// together with the gRPC server and connects a client to it. The subscriber +/// is then used to record traces as the provided future is driven to +/// completion on a current thread tokio runtime. +/// +/// This function will panic if the expectations on any of the expected tasks +/// are not met or if matching tasks are not recorded for all expected tasks. +#[track_caller] +pub(super) fn run_test(expected_tasks: Vec, future: Fut) +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + use tracing_subscriber::prelude::*; + + let (client_stream, server_stream) = tokio::io::duplex(1024); + let (console_layer, server) = console_subscriber::ConsoleLayer::builder().build(); + let registry = tracing_subscriber::registry().with(console_layer); + + let mut test_state = TestState::new(); + let mut test_state_test = test_state.clone(); + + let join_handle = thread::Builder::new() + .name("console::subscriber".into()) + .spawn(move || { + let _subscriber_guard = + tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default()); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .expect("console-test error: failed to initialize console subscriber runtime"); + + runtime.block_on(async move { + task::Builder::new() + .name("console::serve") + .spawn(console_server(server, server_stream, test_state.clone())) + .expect("console-test error: could not spawn 'console-server' task"); + + let actual_tasks = task::Builder::new() + .name("console::client") + .spawn(console_client(client_stream, test_state.clone())) + .expect("console-test error: could not spawn 'console-client' task") + .await + .expect("console-test error: failed to await 'console-client' task"); + + test_state.advance_to_step(TestStep::UpdatesRecorded); + actual_tasks + }) + }) + .expect("console subscriber could not spawn thread"); + + tracing::subscriber::with_default(registry, || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async move { + test_state_test + .wait_for_step(TestStep::ClientConnected) + .await; + + // Run the future that we are testing. + _ = tokio::task::Builder::new() + .name(MAIN_TASK_NAME) + .spawn(future) + .expect("console-test error: couldn't spawn test task") + .await; + test_state_test.advance_to_step(TestStep::TestFinished); + + test_state_test + .wait_for_step(TestStep::UpdatesRecorded) + .await; + }); + }); + + let actual_tasks = join_handle + .join() + .expect("console-test error: failed to join 'console-subscriber' thread"); + + if let Err(test_failure) = validate_expected_tasks(expected_tasks, actual_tasks) { + panic!("Test failed: {test_failure}") + } +} + +/// Starts the console server. +/// +/// The server will start serving over its side of the duplex stream. +/// +/// Once the server gets spawned into its task, the test state is advanced +/// to the `ServerStarted` step. This function will then wait until the test +/// state reaches the `UpdatesRecorded` step (indicating that all validation of the +/// received updates has been completed) before dropping the aggregator. +/// +/// # Test State +/// +/// 1. Advances to: `ServerStarted` +/// 2. Waits for: `UpdatesRecorded` +async fn console_server( + server: console_subscriber::Server, + server_stream: DuplexStream, + mut test_state: TestState, +) { + let ServerParts { + instrument_server: service, + aggregator, + .. + } = server.into_parts(); + let aggregate = task::Builder::new() + .name("console::aggregate") + .spawn(aggregator.run()) + .expect("client-console error: couldn't spawn aggregator"); + Server::builder() + .add_service(service) + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( + server_stream, + )])) + .await + .expect("client-console error: couldn't start instrument server."); + test_state.advance_to_step(TestStep::ServerStarted); + + test_state.wait_for_step(TestStep::UpdatesRecorded).await; + aggregate.abort(); +} + +/// Starts the console client and validates the expected tasks. +/// +/// First we wait until the server has started (test step `ServerStarted`), then +/// the client is connected to its half of the duplex stream and we start recording +/// the actual tasks. +/// +/// Once recording finishes (see [`record_actual_tasks()`] for details on the test +/// state condition), the actual tasks returned. +/// +/// # Test State +/// +/// 1. Waits for: `ServerStarted` +/// 2. Advances to: `ClientConnected` +async fn console_client(client_stream: DuplexStream, mut test_state: TestState) -> Vec { + test_state.wait_for_step(TestStep::ServerStarted).await; + + let mut client_stream = Some(client_stream); + let channel = Endpoint::try_from("http://[::]:6669") + .expect("Could not create endpoint") + .connect_with_connector(service_fn(move |_: Uri| { + let client = client_stream.take(); + + async move { + if let Some(client) = client { + Ok(client) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Client already taken", + )) + } + } + })) + .await + .expect("client-console error: couldn't create client"); + test_state.advance_to_step(TestStep::ClientConnected); + + record_actual_tasks(channel, test_state.clone()).await +} + +/// Records the actual tasks which are received by the client channel. +/// +/// Updates will be received until the test state reaches the `TestFinished` step +/// (indicating that the test itself has finished running), at which point we wait +/// for a final update before returning all the actual tasks which were recorded. +/// +/// # Test State +/// +/// 1. Waits for: `TestFinished` +async fn record_actual_tasks( + client_channel: Channel, + mut test_state: TestState, +) -> Vec { + let mut client = InstrumentClient::new(client_channel); + + let mut stream = loop { + let request = tonic::Request::new(InstrumentRequest {}); + match client.watch_updates(request).await { + Ok(stream) => break stream.into_inner(), + Err(err) => panic!("Client cannot connect to watch updates: {err}"), + } + }; + + let mut tasks = HashMap::new(); + + let mut last_update = false; + while let Some(update) = stream.next().await { + match update { + Ok(update) => { + if let Some(task_update) = &update.task_update { + for new_task in &task_update.new_tasks { + if let Some(id) = &new_task.id { + let mut actual_task = ActualTask::new(id.id); + for field in &new_task.fields { + if let Some(console_api::field::Name::StrName(field_name)) = + &field.name + { + if field_name == "task.name" { + actual_task.name = match &field.value { + Some(Value::DebugVal(value)) => Some(value.clone()), + Some(Value::StrVal(value)) => Some(value.clone()), + _ => None, // Anything that isn't string-like shouldn't be used as a name. + }; + } + } + } + tasks.insert(actual_task.id, actual_task); + } + } + + for (id, stats) in &task_update.stats_update { + if let Some(mut task) = tasks.get_mut(id) { + task.wakes = stats.wakes; + task.self_wakes = stats.self_wakes; + } + } + } + } + Err(e) => { + panic!("update stream error: {}", e); + } + } + + if last_update { + break; + } + + if test_state.try_wait_for_step(TestStep::TestFinished) { + // Once the test finishes running, we will get one further update and finish. + last_update = true; + } + } + + tasks.into_values().collect() +} + +/// Validate the expected tasks against the actual tasks. +/// +/// Each expected task is checked in turn. +/// +/// A matching actual task is searched for. If one is found it, the +/// expected task is validated against the actual task. +/// +/// Any validation errors result in failure. If no matches +fn validate_expected_tasks( + expected_tasks: Vec, + actual_tasks: Vec, +) -> Result<(), TestFailure> { + let failures: Vec<_> = expected_tasks + .iter() + .map(|expected| validate_expected_task(expected, &actual_tasks)) + .filter_map(|r| match r { + Ok(_) => None, + Err(validation_error) => Some(validation_error), + }) + .collect(); + + if failures.is_empty() { + Ok(()) + } else { + Err(TestFailure { failures: failures }) + } +} + +fn validate_expected_task( + expected: &ExpectedTask, + actual_tasks: &Vec, +) -> Result<(), TaskValidationFailure> { + for actual in actual_tasks { + if expected.matches_actual_task(actual) { + // We only match a single task. + // FIXME(hds): We should probably create an error or a warning if multiple tasks match. + return expected.validate_actual_task(actual); + } + } + + expected.no_match_error() +} diff --git a/console-subscriber/tests/support/task.rs b/console-subscriber/tests/support/task.rs new file mode 100644 index 000000000..6df878b1b --- /dev/null +++ b/console-subscriber/tests/support/task.rs @@ -0,0 +1,228 @@ +use std::{error, fmt}; + +use super::MAIN_TASK_NAME; + +/// An actual task +/// +/// This struct contains the values recorded from the console subscriber +/// client and represents what is known about an actual task running on +/// the test's runtime. +#[derive(Clone, Debug)] +pub(super) struct ActualTask { + pub(super) id: u64, + pub(super) name: Option, + pub(super) wakes: u64, + pub(super) self_wakes: u64, +} + +impl ActualTask { + pub(super) fn new(id: u64) -> Self { + Self { + id, + name: None, + wakes: 0, + self_wakes: 0, + } + } +} + +/// An error in task validation. +pub(super) struct TaskValidationFailure { + /// The expected task whose expectations were not met. + expected: ExpectedTask, + /// The actual task which failed the validation + actual: Option, + /// A textual description of the validation failure + failure: String, +} + +impl error::Error for TaskValidationFailure {} + +impl fmt::Display for TaskValidationFailure { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.failure) + } +} + +impl fmt::Debug for TaskValidationFailure { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.actual { + Some(actual) => write!( + f, + "Task Validation Failed!\n Expected Task: {expected:?}\n Actual Task: {actual:?}\n Failure: {failure}", + expected = self.expected, failure = self.failure), + None => write!( + f, + "Task Validation Failed!\n Expected Task: {expected:?}\n Failure: {failure}", + expected = self.expected, failure = self.failure), + } + } +} + +/// An expected task. +/// +/// This struct contains the fields that an expected task will attempt to match +/// actual tasks on, as well as the expectations that will be used to validate +/// which the actual task is as expected. +#[derive(Clone, Debug)] +pub(crate) struct ExpectedTask { + match_name: Option, + + expect_present: Option, + expect_wakes: Option, + expect_self_wakes: Option, +} + +impl Default for ExpectedTask { + fn default() -> Self { + Self { + match_name: None, + expect_present: None, + expect_wakes: None, + expect_self_wakes: None, + } + } +} + +impl ExpectedTask { + /// Returns whether or not an actual task matches this expected task. + /// + /// All matching rules will be run, if they all succeed, then `true` will + /// be returned, otherwise `false`. + pub(super) fn matches_actual_task(&self, actual_task: &ActualTask) -> bool { + if let Some(match_name) = &self.match_name { + if Some(match_name) == actual_task.name.as_ref() { + return true; + } + } + + false + } + + /// Returns an error specifying that no match was found for this expected + /// task. + pub(super) fn no_match_error(&self) -> Result<(), TaskValidationFailure> { + Err(TaskValidationFailure { + expected: self.clone(), + actual: None, + failure: format!("{self}: no matching actual task was found"), + }) + } + + /// Validates all expectations against the provided actual task. + /// + /// No check that the actual task matches is performed. That must have been + /// done prior. + /// + /// If all expections are met, this method returns `Ok(())`. If any + /// expectations are not met, then the first incorrect expectation will + /// be returned as an `Err`. + pub(super) fn validate_actual_task( + &self, + actual_task: &ActualTask, + ) -> Result<(), TaskValidationFailure> { + let mut no_expectations = true; + if let Some(_expected) = self.expect_present { + no_expectations = false; + } + + if let Some(expected_wakes) = self.expect_wakes { + no_expectations = false; + if expected_wakes != actual_task.wakes { + return Err(TaskValidationFailure { + expected: self.clone(), + actual: Some(actual_task.clone()), + failure: format!( + "{self}: expected `wakes` to be {expected_wakes}, but actual was {actual_wakes}", + actual_wakes = actual_task.wakes), + }); + } + } + + if let Some(expected_self_wakes) = self.expect_self_wakes { + no_expectations = false; + if expected_self_wakes != actual_task.self_wakes { + return Err(TaskValidationFailure { + expected: self.clone(), + actual: Some(actual_task.clone()), + failure: format!( + "{self}: expected `self_wakes` to be {expected_self_wakes}, but actual was {actual_self_wakes}", + actual_self_wakes = actual_task.self_wakes), + }); + } + } + + if no_expectations { + return Err(TaskValidationFailure { + expected: self.clone(), + actual: Some(actual_task.clone()), + failure: format!( + "{self}: no expectations set, if you want to just expect that a matching task is present, use `expect_present()`") + }); + } + + Ok(()) + } + + /// Matches tasks by name. + /// + /// To match this expected task, an actual task must have the name `name`. + #[allow(dead_code)] + pub(crate) fn match_name(mut self, name: String) -> Self { + self.match_name = Some(name); + self + } + + /// Matches tasks by the default task name. + /// + /// To match this expected task, an actual task must have the default name + /// assigned to the task which runs the future provided to [`assert_task`] + /// or [`assert_tasks`]. + /// + /// [`assert_task`]: fn@support::assert_task + /// [`assert_tasks`]: fn@support::assert_tasks + #[allow(dead_code)] + pub(crate) fn match_default_name(mut self) -> Self { + self.match_name = Some(MAIN_TASK_NAME.into()); + self + } + + /// Expects that a task is present. + /// + /// To validate, an actual task matching this expected task must be found. + #[allow(dead_code)] + pub(crate) fn expect_present(mut self) -> Self { + self.expect_present = Some(true); + self + } + + /// Expects that a task has a specific value for `wakes`. + /// + /// To validate, the actual task matching this expected task must have + /// a count of wakes equal to `wakes`. + #[allow(dead_code)] + pub(crate) fn expect_wakes(mut self, wakes: u64) -> Self { + self.expect_wakes = Some(wakes); + self + } + + /// Expects that a task has a specific value for `self_wakes`. + /// + /// To validate, the actual task matching this expected task must have + /// a count of self wakes equal to `self_wakes`. + #[allow(dead_code)] + pub(crate) fn expect_self_wakes(mut self, self_wakes: u64) -> Self { + self.expect_self_wakes = Some(self_wakes); + self + } +} + +impl fmt::Display for ExpectedTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let fields = match &self.match_name { + Some(name) => format!("name={name}"), + None => "(no fields to match on)".into(), + }; + write!(f, "Task<{fields}>") + } +} diff --git a/console-subscriber/tests/wake.rs b/console-subscriber/tests/wake.rs new file mode 100644 index 000000000..e64e87a6e --- /dev/null +++ b/console-subscriber/tests/wake.rs @@ -0,0 +1,48 @@ +mod support; +use std::time::Duration; + +use support::{assert_task, ExpectedTask}; +use tokio::{task, time::sleep}; + +#[test] +fn sleep_wakes() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_wakes(1) + .expect_self_wakes(0); + + let future = async { + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +fn double_sleep_wakes() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_wakes(2) + .expect_self_wakes(0); + + let future = async { + sleep(Duration::ZERO).await; + sleep(Duration::ZERO).await; + }; + + assert_task(expected_task, future); +} + +#[test] +fn self_wake() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_wakes(1) + .expect_self_wakes(1); + + let future = async { + task::yield_now().await; + }; + + assert_task(expected_task, future); +} From 813d02099d78ccee8fba1771c171ba4704602fba Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 1 Aug 2023 18:14:02 +0200 Subject: [PATCH 02/20] fixed warning (unneeded `mut`) --- console-subscriber/tests/support/subscriber.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 36888ad5a..4c1bd928e 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -249,7 +249,7 @@ async fn record_actual_tasks( } for (id, stats) in &task_update.stats_update { - if let Some(mut task) = tasks.get_mut(id) { + if let Some(task) = tasks.get_mut(id) { task.wakes = stats.wakes; task.self_wakes = stats.self_wakes; } From 82abe73534a9450c53abc074314e03856483210f Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 17 Aug 2023 13:11:33 +0200 Subject: [PATCH 03/20] wait for additional update from subscriber After the test ends, we were waiting for a single further update before evaluating the actual tasks (vs. the expected tasks). Now we wait for 2 updates. --- .github/workflows/ci.yaml | 25 ++++++------- console-subscriber/src/aggregator/mod.rs | 21 ++++++++++- console-subscriber/tests/support/state.rs | 17 +++++++++ .../tests/support/subscriber.rs | 36 +++++++++++++++---- 4 files changed, 79 insertions(+), 20 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d0c01f7f1..d5f738aab 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -48,16 +48,17 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-latest, windows-latest] + # os: [ubuntu-latest, macos-latest, windows-latest] + os: [macos-latest] rust: [stable] - include: - # Make 1.60 MSRV, as it's Tonic 0.9's MSRV. - - rust: 1.60.0 - os: ubuntu-latest + # include: + # # Make 1.60 MSRV, as it's Tonic 0.9's MSRV. + # - rust: 1.60.0 + # os: ubuntu-latest # Try to build on the latest nightly. This job is allowed to fail, but # it's useful to help catch bugs in upcoming Rust versions. - - rust: nightly - os: ubuntu-latest + # - rust: nightly + # os: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 @@ -73,14 +74,14 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} - - name: Run cargo test (API) - run: cargo test -p console-api + # - name: Run cargo test (API) + # run: cargo test -p console-api - name: Run cargo test (subscriber) - run: cargo test -p console-subscriber + run: cargo test -p console-subscriber - - name: Run cargo test (console) - run: cargo test -p tokio-console --locked + # - name: Run cargo test (console) + # run: cargo test -p tokio-console --locked lints: name: Lints diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 4496cba28..fbb82e2a8 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -221,9 +221,21 @@ impl Aggregator { // to be woken when the flush interval has elapsed, or when the // channel is almost full. let mut drained = false; + let mut count_async_op = 0; + let mut count_metadata = 0; + let mut count_poll_op = 0; + let mut count_resource = 0; + let mut count_spawn = 0; while let Some(event) = self.events.recv().now_or_never() { match event { Some(event) => { + match &event { + Event::AsyncResourceOp { .. } => count_async_op += 1, + Event::Metadata(_) => count_metadata += 1, + Event::PollOp { .. } => count_poll_op += 1, + Event::Resource { .. } => count_resource += 1, + Event::Spawn { .. } => count_spawn += 1, + } self.update_state(event); drained = true; } @@ -235,7 +247,14 @@ impl Aggregator { } }; } - + tracing::debug!( + count_async_op, + count_metadata, + count_poll_op, + count_resource, + count_spawn, + "received events" + ); // flush data to clients, if there are any currently subscribed // watchers and we should send a new update. if !self.watchers.is_empty() && should_send { diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index 6fc663808..c424e8308 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -53,6 +53,13 @@ impl TestState { /// /// This function will panic if the underlying channel gets closed. pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { + self.update_step(); + tracing::info!( + target: "console_test::support::state", + "wait_for_step: {current} -> {desired_step}", + current = self.step, + ); + loop { if self.step >= desired_step { break; @@ -73,6 +80,11 @@ impl TestState { /// Check whether the desired step has been reached without blocking. pub(super) fn try_wait_for_step(&mut self, desired_step: TestStep) -> bool { self.update_step(); + tracing::info!( + target: "console_test::support::state", + "try_wait_for_step: {current} -> {desired_step}", + current = self.step, + ); self.step == desired_step } @@ -90,6 +102,11 @@ impl TestState { #[track_caller] pub(super) fn advance_to_step(&mut self, next_step: TestStep) { self.update_step(); + tracing::info!( + target: "console_test::support::state", + "advance_to_step: {current} -> {next_step}", + current = self.step, + ); if self.step >= next_step { panic!( diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 4c1bd928e..174221b58 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -9,6 +9,8 @@ use futures::stream::StreamExt; use tokio::{io::DuplexStream, task}; use tonic::transport::{Channel, Endpoint, Server, Uri}; use tower::service_fn; +use tracing_core::dispatcher::DefaultGuard; +use tracing_subscriber::EnvFilter; use super::state::{TestState, TestStep}; use super::task::{ActualTask, ExpectedTask, TaskValidationFailure}; @@ -30,6 +32,15 @@ impl fmt::Display for TestFailure { } } +fn set_debug_subscriber() -> DefaultGuard { + let subscriber = tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder().parse_lossy("console_subscriber=trace,console_test=info,info"), + ) + .finish(); + tracing::subscriber::set_default(subscriber) +} + /// Runs the test /// /// This function runs the whole test. It sets up a `console-subscriber` layer @@ -45,8 +56,12 @@ where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { + let _subscriber_guard = set_debug_subscriber(); use tracing_subscriber::prelude::*; + print!("\n\n"); + tracing::info!(target: "console_test::support", ?expected_tasks, "run_test"); + let (client_stream, server_stream) = tokio::io::duplex(1024); let (console_layer, server) = console_subscriber::ConsoleLayer::builder().build(); let registry = tracing_subscriber::registry().with(console_layer); @@ -57,8 +72,8 @@ where let join_handle = thread::Builder::new() .name("console::subscriber".into()) .spawn(move || { - let _subscriber_guard = - tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default()); + let _subscriber_guard = set_debug_subscriber(); + let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() @@ -223,7 +238,7 @@ async fn record_actual_tasks( let mut tasks = HashMap::new(); - let mut last_update = false; + let mut last_updates: Option = None; while let Some(update) = stream.next().await { match update { Ok(update) => { @@ -261,13 +276,20 @@ async fn record_actual_tasks( } } - if last_update { - break; + if let Some(count) = last_updates.as_mut() { + tracing::info!(count, "last updates"); + *count -= 1; + + if *count <= 0 { + break; + } + continue; } if test_state.try_wait_for_step(TestStep::TestFinished) { - // Once the test finishes running, we will get one further update and finish. - last_update = true; + // Once the test finishes running, we will check for 2 further updates and then finish. + // FIXME(hds): Why 2? Is there some way we can do this more deterministically? + last_updates = Some(2); } } From 5d3245d54a9837cd4dffcc52706b0f475d8c46f2 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 17 Aug 2023 13:31:28 +0200 Subject: [PATCH 04/20] single threaded tests to debug --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d5f738aab..7434cbdfb 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -78,7 +78,7 @@ jobs: # run: cargo test -p console-api - name: Run cargo test (subscriber) - run: cargo test -p console-subscriber + run: cargo test -p console-subscriber -- --nocapture --test-threads=1 # - name: Run cargo test (console) # run: cargo test -p tokio-console --locked From c1a256585e0f049793d09f700095ee7e5445ef35 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 18 Aug 2023 18:45:04 +0200 Subject: [PATCH 05/20] some more debugging stuff --- console-subscriber/src/aggregator/mod.rs | 11 +++++- console-subscriber/src/lib.rs | 15 ++++++++ .../tests/support/subscriber.rs | 35 +++++++++++++++---- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index fbb82e2a8..20d5c5360 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -234,7 +234,16 @@ impl Aggregator { Event::Metadata(_) => count_metadata += 1, Event::PollOp { .. } => count_poll_op += 1, Event::Resource { .. } => count_resource += 1, - Event::Spawn { .. } => count_spawn += 1, + Event::Spawn { + id, + metadata: _, + stats: _, + fields, + location: _, + } => { + tracing::debug!(?id, ?fields, "spawn"); + count_spawn += 1; + } } self.update_state(event); drained = true; diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 8df105cce..d13d73158 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -457,6 +457,10 @@ impl ConsoleLayer { Err(TrySendError::Closed(_)) => { // we should warn here eventually, but nop for now because we // can't trigger tracing events... + println!( + "send_stats<{obj_type}> closed", + obj_type = std::any::type_name::(), + ); None } Err(TrySendError::Full(_)) => { @@ -470,6 +474,17 @@ impl ConsoleLayer { }; let capacity = self.tx.capacity(); + let stats_type = std::any::type_name::(); + if stats_type != "()" { + let time = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("whoops"); + println!( + "{timestamp:.6} send_stats capacity={capacity} did_send={did_send} will_flush={will_flush} stats_type={stats_type}", + timestamp = time.as_secs_f64(), + did_send = sent.is_some(), + will_flush = (capacity <= self.flush_under_capacity)); + } if capacity <= self.flush_under_capacity { self.shared.flush.trigger(); } diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 174221b58..5db41aef9 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, future::Future, thread}; +use std::{collections::HashMap, fmt, fs::File, future::Future, panic, thread}; use console_api::{ field::Value, @@ -32,10 +32,11 @@ impl fmt::Display for TestFailure { } } -fn set_debug_subscriber() -> DefaultGuard { +fn set_debug_subscriber(file: File) -> DefaultGuard { let subscriber = tracing_subscriber::fmt() + .with_writer(file) .with_env_filter( - EnvFilter::builder().parse_lossy("console_subscriber=trace,console_test=info,info"), + EnvFilter::builder().parse_lossy("console_subscriber=debug,console_test=info,info"), ) .finish(); tracing::subscriber::set_default(subscriber) @@ -56,7 +57,25 @@ where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - let _subscriber_guard = set_debug_subscriber(); + let caller = std::panic::Location::caller(); + let caller_file = match caller.file().rfind("/") { + Some(index) => { + if index >= caller.file().len() { + "weird" + } else { + &caller.file()[index + 1..] + } + } + None => caller.file(), + }; + let source_line = std::panic::Location::caller().line(); + let writer = File::create(format!( + "console_test_log-file_{}-line_{}-run_test.log", + caller_file, source_line + )) + .unwrap(); + let _subscriber_guard = + set_debug_subscriber(writer.try_clone().expect("couldn't clone file handle")); use tracing_subscriber::prelude::*; print!("\n\n"); @@ -72,7 +91,7 @@ where let join_handle = thread::Builder::new() .name("console::subscriber".into()) .spawn(move || { - let _subscriber_guard = set_debug_subscriber(); + let _subscriber_guard = set_debug_subscriber(writer); let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() @@ -129,7 +148,10 @@ where .expect("console-test error: failed to join 'console-subscriber' thread"); if let Err(test_failure) = validate_expected_tasks(expected_tasks, actual_tasks) { + tracing::info!(target: "console_test::support", "Test failed: {test_failure}", test_failure = test_failure); panic!("Test failed: {test_failure}") + } else { + tracing::info!(target: "console_test::support", "Test passed"); } } @@ -277,9 +299,8 @@ async fn record_actual_tasks( } if let Some(count) = last_updates.as_mut() { - tracing::info!(count, "last updates"); *count -= 1; - + tracing::info!(count, "last updates"); if *count <= 0 { break; } From b56c12392f768ecdcbef99e3b58836112c68a422 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 23 Aug 2023 16:14:59 +0200 Subject: [PATCH 06/20] Trace to stdout for CI debugging sigh. --- console-subscriber/tests/support/subscriber.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 5db41aef9..d531a1824 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -32,9 +32,9 @@ impl fmt::Display for TestFailure { } } -fn set_debug_subscriber(file: File) -> DefaultGuard { +fn set_debug_subscriber(_file: File) -> DefaultGuard { let subscriber = tracing_subscriber::fmt() - .with_writer(file) + // .with_writer(file) .with_env_filter( EnvFilter::builder().parse_lossy("console_subscriber=debug,console_test=info,info"), ) @@ -77,6 +77,8 @@ where let _subscriber_guard = set_debug_subscriber(writer.try_clone().expect("couldn't clone file handle")); use tracing_subscriber::prelude::*; + let span = tracing::info_span!(target: "console_test::support", "run_test", file = %caller_file, line = source_line); + let _span_guard = span.enter(); print!("\n\n"); tracing::info!(target: "console_test::support", ?expected_tasks, "run_test"); @@ -92,6 +94,8 @@ where .name("console::subscriber".into()) .spawn(move || { let _subscriber_guard = set_debug_subscriber(writer); + let span = tracing::info_span!(target: "console_test::support", "run_test", file = %caller_file, line = source_line); + let _span_guard = span.enter(); let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() From 380345ad53aaaaa11adb3477e428196d354bc75b Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 23 Aug 2023 16:16:13 +0200 Subject: [PATCH 07/20] enable all OSes --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7434cbdfb..30c93fd0e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -48,8 +48,8 @@ jobs: strategy: fail-fast: false matrix: - # os: [ubuntu-latest, macos-latest, windows-latest] - os: [macos-latest] + os: [ubuntu-latest, macos-latest, windows-latest] + # os: [macos-latest] rust: [stable] # include: # # Make 1.60 MSRV, as it's Tonic 0.9's MSRV. From 695644e178e00c187f7497a8b41460e6060cff86 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 23 Aug 2023 17:21:02 +0200 Subject: [PATCH 08/20] bit more tracing --- console-subscriber/tests/support/subscriber.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index d531a1824..deb84e8cf 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -134,11 +134,13 @@ where .await; // Run the future that we are testing. + tracing::info!("Future under test: will spawn."); _ = tokio::task::Builder::new() .name(MAIN_TASK_NAME) .spawn(future) .expect("console-test error: couldn't spawn test task") .await; + tracing::info!("Future under test: completed."); test_state_test.advance_to_step(TestStep::TestFinished); test_state_test From bd9c3159668a660616ae76d0e3e8413c5bcd7602 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 24 Aug 2023 13:38:18 +0200 Subject: [PATCH 09/20] output traces from within runtime under test to console --- console-subscriber/tests/support/subscriber.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index deb84e8cf..1532c03da 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -85,7 +85,13 @@ where let (client_stream, server_stream) = tokio::io::duplex(1024); let (console_layer, server) = console_subscriber::ConsoleLayer::builder().build(); - let registry = tracing_subscriber::registry().with(console_layer); + let fmt_layer = tracing_subscriber::fmt::layer().with_filter( + EnvFilter::builder().parse_lossy("console_subscriber=debug,console_test=info,info"), + ); + + let registry = tracing_subscriber::registry() + .with(console_layer) + .with(fmt_layer); let mut test_state = TestState::new(); let mut test_state_test = test_state.clone(); @@ -129,6 +135,9 @@ where .unwrap(); runtime.block_on(async move { + let span = tracing::info_span!(target: "console_test::support", "run_test", file = %caller_file, line = source_line); + let _span_guard = span.enter(); + test_state_test .wait_for_step(TestStep::ClientConnected) .await; From a61bdd63375578b40720493665e8d8647621fa1c Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 24 Aug 2023 16:12:54 +0200 Subject: [PATCH 10/20] confirming a suspicion --- console-subscriber/tests/support/state.rs | 33 ++++++++++++++++++----- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index c424e8308..f5a3ca51c 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -53,7 +53,10 @@ impl TestState { /// /// This function will panic if the underlying channel gets closed. pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { - self.update_step(); + { + let _guard = tracing::info_span!("wait_for_step").entered(); + self.update_step(); + } tracing::info!( target: "console_test::support::state", "wait_for_step: {current} -> {desired_step}", @@ -79,7 +82,10 @@ impl TestState { /// Check whether the desired step has been reached without blocking. pub(super) fn try_wait_for_step(&mut self, desired_step: TestStep) -> bool { - self.update_step(); + { + let _guard = tracing::info_span!("try_wait_for_step").entered(); + self.update_step(); + } tracing::info!( target: "console_test::support::state", "try_wait_for_step: {current} -> {desired_step}", @@ -101,7 +107,10 @@ impl TestState { /// `next_step` or if the underlying channel is closed. #[track_caller] pub(super) fn advance_to_step(&mut self, next_step: TestStep) { - self.update_step(); + { + let _guard = tracing::info_span!("advance_to_step").entered(); + self.update_step(); + } tracing::info!( target: "console_test::support::state", "advance_to_step: {current} -> {next_step}", @@ -132,9 +141,21 @@ impl TestState { fn update_step(&mut self) { loop { match self.receiver.try_recv() { - Ok(step) => self.step = step, - Err(TryRecvError::Lagged(_)) => { - // we don't mind being lagged, we'll just get the latest state + Ok(step) => { + tracing::info!( + target: "console_test::support::state", + "update_step: {previous} -> {current}.", + previous = self.step, + current = step, + ); + self.step = step; + } + Err(TryRecvError::Lagged(count)) => { + tracing::info!( + target: "console_test::support::state", + "update_step: lagged by {count}! This is actually a big problem.", + count= count, + ); } Err(TryRecvError::Closed) => { panic!("failed to update current step, did the test abort?") From 76bb0616ccb9d960424df957a9fa86fc5692ccdb Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 24 Aug 2023 16:33:09 +0200 Subject: [PATCH 11/20] Increase test state channel capacity --- console-subscriber/tests/support/state.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index f5a3ca51c..16e0438eb 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -39,7 +39,8 @@ pub(super) struct TestState { impl TestState { pub(super) fn new() -> Self { - let (sender, receiver) = broadcast::channel(1); + // Capacity sufficient for all states (will never lose anything) + let (sender, receiver) = broadcast::channel(5); Self { receiver, sender, From 7b32fdce5c48fd37ca131ebbfe336f10a590451c Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 24 Aug 2023 17:20:16 +0200 Subject: [PATCH 12/20] maybe avoid a data race --- console-subscriber/tests/support/subscriber.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 1532c03da..8125788fa 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -245,9 +245,10 @@ async fn console_client(client_stream: DuplexStream, mut test_state: TestState) })) .await .expect("client-console error: couldn't create client"); + let record_test_state = test_state.clone(); test_state.advance_to_step(TestStep::ClientConnected); - record_actual_tasks(channel, test_state.clone()).await + record_actual_tasks(channel, record_test_state).await } /// Records the actual tasks which are received by the client channel. From 3057541326c304d3b56e2e8df48dca7aafecab99 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 25 Aug 2023 15:07:14 +0200 Subject: [PATCH 13/20] undo all testing changes This commit has the same changeset as the original commit. --- .github/workflows/ci.yaml | 23 +++--- console-subscriber/src/aggregator/mod.rs | 30 +------ console-subscriber/src/lib.rs | 15 ---- console-subscriber/tests/support/state.rs | 51 ++---------- .../tests/support/subscriber.rs | 81 +++---------------- 5 files changed, 29 insertions(+), 171 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 30c93fd0e..d0c01f7f1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -49,16 +49,15 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] - # os: [macos-latest] rust: [stable] - # include: - # # Make 1.60 MSRV, as it's Tonic 0.9's MSRV. - # - rust: 1.60.0 - # os: ubuntu-latest + include: + # Make 1.60 MSRV, as it's Tonic 0.9's MSRV. + - rust: 1.60.0 + os: ubuntu-latest # Try to build on the latest nightly. This job is allowed to fail, but # it's useful to help catch bugs in upcoming Rust versions. - # - rust: nightly - # os: ubuntu-latest + - rust: nightly + os: ubuntu-latest steps: - name: Checkout sources uses: actions/checkout@v3 @@ -74,14 +73,14 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} - # - name: Run cargo test (API) - # run: cargo test -p console-api + - name: Run cargo test (API) + run: cargo test -p console-api - name: Run cargo test (subscriber) - run: cargo test -p console-subscriber -- --nocapture --test-threads=1 + run: cargo test -p console-subscriber - # - name: Run cargo test (console) - # run: cargo test -p tokio-console --locked + - name: Run cargo test (console) + run: cargo test -p tokio-console --locked lints: name: Lints diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 20d5c5360..4496cba28 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -221,30 +221,9 @@ impl Aggregator { // to be woken when the flush interval has elapsed, or when the // channel is almost full. let mut drained = false; - let mut count_async_op = 0; - let mut count_metadata = 0; - let mut count_poll_op = 0; - let mut count_resource = 0; - let mut count_spawn = 0; while let Some(event) = self.events.recv().now_or_never() { match event { Some(event) => { - match &event { - Event::AsyncResourceOp { .. } => count_async_op += 1, - Event::Metadata(_) => count_metadata += 1, - Event::PollOp { .. } => count_poll_op += 1, - Event::Resource { .. } => count_resource += 1, - Event::Spawn { - id, - metadata: _, - stats: _, - fields, - location: _, - } => { - tracing::debug!(?id, ?fields, "spawn"); - count_spawn += 1; - } - } self.update_state(event); drained = true; } @@ -256,14 +235,7 @@ impl Aggregator { } }; } - tracing::debug!( - count_async_op, - count_metadata, - count_poll_op, - count_resource, - count_spawn, - "received events" - ); + // flush data to clients, if there are any currently subscribed // watchers and we should send a new update. if !self.watchers.is_empty() && should_send { diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index d13d73158..8df105cce 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -457,10 +457,6 @@ impl ConsoleLayer { Err(TrySendError::Closed(_)) => { // we should warn here eventually, but nop for now because we // can't trigger tracing events... - println!( - "send_stats<{obj_type}> closed", - obj_type = std::any::type_name::(), - ); None } Err(TrySendError::Full(_)) => { @@ -474,17 +470,6 @@ impl ConsoleLayer { }; let capacity = self.tx.capacity(); - let stats_type = std::any::type_name::(); - if stats_type != "()" { - let time = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .expect("whoops"); - println!( - "{timestamp:.6} send_stats capacity={capacity} did_send={did_send} will_flush={will_flush} stats_type={stats_type}", - timestamp = time.as_secs_f64(), - did_send = sent.is_some(), - will_flush = (capacity <= self.flush_under_capacity)); - } if capacity <= self.flush_under_capacity { self.shared.flush.trigger(); } diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index 16e0438eb..6fc663808 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -39,8 +39,7 @@ pub(super) struct TestState { impl TestState { pub(super) fn new() -> Self { - // Capacity sufficient for all states (will never lose anything) - let (sender, receiver) = broadcast::channel(5); + let (sender, receiver) = broadcast::channel(1); Self { receiver, sender, @@ -54,16 +53,6 @@ impl TestState { /// /// This function will panic if the underlying channel gets closed. pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { - { - let _guard = tracing::info_span!("wait_for_step").entered(); - self.update_step(); - } - tracing::info!( - target: "console_test::support::state", - "wait_for_step: {current} -> {desired_step}", - current = self.step, - ); - loop { if self.step >= desired_step { break; @@ -83,15 +72,7 @@ impl TestState { /// Check whether the desired step has been reached without blocking. pub(super) fn try_wait_for_step(&mut self, desired_step: TestStep) -> bool { - { - let _guard = tracing::info_span!("try_wait_for_step").entered(); - self.update_step(); - } - tracing::info!( - target: "console_test::support::state", - "try_wait_for_step: {current} -> {desired_step}", - current = self.step, - ); + self.update_step(); self.step == desired_step } @@ -108,15 +89,7 @@ impl TestState { /// `next_step` or if the underlying channel is closed. #[track_caller] pub(super) fn advance_to_step(&mut self, next_step: TestStep) { - { - let _guard = tracing::info_span!("advance_to_step").entered(); - self.update_step(); - } - tracing::info!( - target: "console_test::support::state", - "advance_to_step: {current} -> {next_step}", - current = self.step, - ); + self.update_step(); if self.step >= next_step { panic!( @@ -142,21 +115,9 @@ impl TestState { fn update_step(&mut self) { loop { match self.receiver.try_recv() { - Ok(step) => { - tracing::info!( - target: "console_test::support::state", - "update_step: {previous} -> {current}.", - previous = self.step, - current = step, - ); - self.step = step; - } - Err(TryRecvError::Lagged(count)) => { - tracing::info!( - target: "console_test::support::state", - "update_step: lagged by {count}! This is actually a big problem.", - count= count, - ); + Ok(step) => self.step = step, + Err(TryRecvError::Lagged(_)) => { + // we don't mind being lagged, we'll just get the latest state } Err(TryRecvError::Closed) => { panic!("failed to update current step, did the test abort?") diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 8125788fa..36888ad5a 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, fs::File, future::Future, panic, thread}; +use std::{collections::HashMap, fmt, future::Future, thread}; use console_api::{ field::Value, @@ -9,8 +9,6 @@ use futures::stream::StreamExt; use tokio::{io::DuplexStream, task}; use tonic::transport::{Channel, Endpoint, Server, Uri}; use tower::service_fn; -use tracing_core::dispatcher::DefaultGuard; -use tracing_subscriber::EnvFilter; use super::state::{TestState, TestStep}; use super::task::{ActualTask, ExpectedTask, TaskValidationFailure}; @@ -32,16 +30,6 @@ impl fmt::Display for TestFailure { } } -fn set_debug_subscriber(_file: File) -> DefaultGuard { - let subscriber = tracing_subscriber::fmt() - // .with_writer(file) - .with_env_filter( - EnvFilter::builder().parse_lossy("console_subscriber=debug,console_test=info,info"), - ) - .finish(); - tracing::subscriber::set_default(subscriber) -} - /// Runs the test /// /// This function runs the whole test. It sets up a `console-subscriber` layer @@ -57,41 +45,11 @@ where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - let caller = std::panic::Location::caller(); - let caller_file = match caller.file().rfind("/") { - Some(index) => { - if index >= caller.file().len() { - "weird" - } else { - &caller.file()[index + 1..] - } - } - None => caller.file(), - }; - let source_line = std::panic::Location::caller().line(); - let writer = File::create(format!( - "console_test_log-file_{}-line_{}-run_test.log", - caller_file, source_line - )) - .unwrap(); - let _subscriber_guard = - set_debug_subscriber(writer.try_clone().expect("couldn't clone file handle")); use tracing_subscriber::prelude::*; - let span = tracing::info_span!(target: "console_test::support", "run_test", file = %caller_file, line = source_line); - let _span_guard = span.enter(); - - print!("\n\n"); - tracing::info!(target: "console_test::support", ?expected_tasks, "run_test"); let (client_stream, server_stream) = tokio::io::duplex(1024); let (console_layer, server) = console_subscriber::ConsoleLayer::builder().build(); - let fmt_layer = tracing_subscriber::fmt::layer().with_filter( - EnvFilter::builder().parse_lossy("console_subscriber=debug,console_test=info,info"), - ); - - let registry = tracing_subscriber::registry() - .with(console_layer) - .with(fmt_layer); + let registry = tracing_subscriber::registry().with(console_layer); let mut test_state = TestState::new(); let mut test_state_test = test_state.clone(); @@ -99,10 +57,8 @@ where let join_handle = thread::Builder::new() .name("console::subscriber".into()) .spawn(move || { - let _subscriber_guard = set_debug_subscriber(writer); - let span = tracing::info_span!(target: "console_test::support", "run_test", file = %caller_file, line = source_line); - let _span_guard = span.enter(); - + let _subscriber_guard = + tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() @@ -135,21 +91,16 @@ where .unwrap(); runtime.block_on(async move { - let span = tracing::info_span!(target: "console_test::support", "run_test", file = %caller_file, line = source_line); - let _span_guard = span.enter(); - test_state_test .wait_for_step(TestStep::ClientConnected) .await; // Run the future that we are testing. - tracing::info!("Future under test: will spawn."); _ = tokio::task::Builder::new() .name(MAIN_TASK_NAME) .spawn(future) .expect("console-test error: couldn't spawn test task") .await; - tracing::info!("Future under test: completed."); test_state_test.advance_to_step(TestStep::TestFinished); test_state_test @@ -163,10 +114,7 @@ where .expect("console-test error: failed to join 'console-subscriber' thread"); if let Err(test_failure) = validate_expected_tasks(expected_tasks, actual_tasks) { - tracing::info!(target: "console_test::support", "Test failed: {test_failure}", test_failure = test_failure); panic!("Test failed: {test_failure}") - } else { - tracing::info!(target: "console_test::support", "Test passed"); } } @@ -245,10 +193,9 @@ async fn console_client(client_stream: DuplexStream, mut test_state: TestState) })) .await .expect("client-console error: couldn't create client"); - let record_test_state = test_state.clone(); test_state.advance_to_step(TestStep::ClientConnected); - record_actual_tasks(channel, record_test_state).await + record_actual_tasks(channel, test_state.clone()).await } /// Records the actual tasks which are received by the client channel. @@ -276,7 +223,7 @@ async fn record_actual_tasks( let mut tasks = HashMap::new(); - let mut last_updates: Option = None; + let mut last_update = false; while let Some(update) = stream.next().await { match update { Ok(update) => { @@ -302,7 +249,7 @@ async fn record_actual_tasks( } for (id, stats) in &task_update.stats_update { - if let Some(task) = tasks.get_mut(id) { + if let Some(mut task) = tasks.get_mut(id) { task.wakes = stats.wakes; task.self_wakes = stats.self_wakes; } @@ -314,19 +261,13 @@ async fn record_actual_tasks( } } - if let Some(count) = last_updates.as_mut() { - *count -= 1; - tracing::info!(count, "last updates"); - if *count <= 0 { - break; - } - continue; + if last_update { + break; } if test_state.try_wait_for_step(TestStep::TestFinished) { - // Once the test finishes running, we will check for 2 further updates and then finish. - // FIXME(hds): Why 2? Is there some way we can do this more deterministically? - last_updates = Some(2); + // Once the test finishes running, we will get one further update and finish. + last_update = true; } } From d7744839231039aafb2712df9111f769353c329d Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 25 Aug 2023 15:27:23 +0200 Subject: [PATCH 14/20] Apply suggestions from code review Co-authored-by: Eliza Weisman --- console-subscriber/tests/support/state.rs | 27 ++++++++----------- .../tests/support/subscriber.rs | 27 ++++++++----------- console-subscriber/tests/support/task.rs | 20 +++++++++----- 3 files changed, 35 insertions(+), 39 deletions(-) diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index 6fc663808..9a69f9632 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -47,16 +47,13 @@ impl TestState { } } - /// Block asynchronously until the desired step has been reached. + /// Wait asynchronously until the desired step has been reached. /// /// # Panics /// /// This function will panic if the underlying channel gets closed. pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { - loop { - if self.step >= desired_step { - break; - } + while self.step < desired_step { match self.receiver.recv().await { Ok(step) => self.step = step, @@ -70,8 +67,8 @@ impl TestState { } } - /// Check whether the desired step has been reached without blocking. - pub(super) fn try_wait_for_step(&mut self, desired_step: TestStep) -> bool { + /// Returns `true` if the current step is `desired_step` or later. + pub(super) fn is_step(&mut self, desired_step: TestStep) -> bool { self.update_step(); self.step == desired_step @@ -86,25 +83,23 @@ impl TestState { /// # Panics /// /// This method will panic if the test state is not at the step prior to - /// `next_step` or if the underlying channel is closed. + /// `next_step`, or if the underlying channel is closed. #[track_caller] pub(super) fn advance_to_step(&mut self, next_step: TestStep) { self.update_step(); - if self.step >= next_step { - panic!( - "cannot advance to previous or current step! current step: {current}, next step: {next_step}", - current = self.step); - } + assert!( + self.step < next_step, + "cannot advance to previous or current step! current step: {current}, next step: {next_step}", + current = self.step, + ); match (&self.step, &next_step) { (TestStep::Start, TestStep::ServerStarted) | (TestStep::ServerStarted, TestStep::ClientConnected) | (TestStep::ClientConnected, TestStep::TestFinished) | (TestStep::TestFinished, TestStep::UpdatesRecorded) => {}, - (_, _) => panic!( - "cannot advance more than one step! current step: {current}, next step: {next_step}", - current = self.step), + (current, _) => panic!("cannot advance more than one step! current step: {current}, next step: {next_step}"), } self.sender diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 36888ad5a..c8496c5eb 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -175,20 +175,18 @@ async fn console_client(client_stream: DuplexStream, mut test_state: TestState) test_state.wait_for_step(TestStep::ServerStarted).await; let mut client_stream = Some(client_stream); - let channel = Endpoint::try_from("http://[::]:6669") - .expect("Could not create endpoint") + // Note: we won't actually try to connect to this port on localhost, + // because we will call `connect_with_connector` with a service that + // just returns the `DuplexStream`, instead of making an actual + // network connection. + let endpoint = Endpoint::try_from("http://[::]:6669") + .expect("Could not create endpoint"); + let channel = endpoint .connect_with_connector(service_fn(move |_: Uri| { let client = client_stream.take(); async move { - if let Some(client) = client { - Ok(client) - } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Client already taken", - )) - } + client.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "Client already taken")) } })) .await @@ -213,12 +211,9 @@ async fn record_actual_tasks( ) -> Vec { let mut client = InstrumentClient::new(client_channel); - let mut stream = loop { - let request = tonic::Request::new(InstrumentRequest {}); - match client.watch_updates(request).await { - Ok(stream) => break stream.into_inner(), - Err(err) => panic!("Client cannot connect to watch updates: {err}"), - } + let mut stream = match client.watch_updates(tonic::Request::new(InstrumentRequest {})).await { + Ok(stream) => stream.into_inner(), + Err(err) => panic!("Client cannot connect to watch updates: {err}"), }; let mut tasks = HashMap::new(); diff --git a/console-subscriber/tests/support/task.rs b/console-subscriber/tests/support/task.rs index 6df878b1b..ea4a929f9 100644 --- a/console-subscriber/tests/support/task.rs +++ b/console-subscriber/tests/support/task.rs @@ -50,11 +50,15 @@ impl fmt::Debug for TaskValidationFailure { Some(actual) => write!( f, "Task Validation Failed!\n Expected Task: {expected:?}\n Actual Task: {actual:?}\n Failure: {failure}", - expected = self.expected, failure = self.failure), + expected = self.expected, + failure = self.failure, + ), None => write!( f, "Task Validation Failed!\n Expected Task: {expected:?}\n Failure: {failure}", - expected = self.expected, failure = self.failure), + expected = self.expected, + failure = self.failure, + ), } } } @@ -67,7 +71,6 @@ impl fmt::Debug for TaskValidationFailure { #[derive(Clone, Debug)] pub(crate) struct ExpectedTask { match_name: Option, - expect_present: Option, expect_wakes: Option, expect_self_wakes: Option, @@ -134,7 +137,8 @@ impl ExpectedTask { actual: Some(actual_task.clone()), failure: format!( "{self}: expected `wakes` to be {expected_wakes}, but actual was {actual_wakes}", - actual_wakes = actual_task.wakes), + actual_wakes = actual_task.wakes, + ), }); } } @@ -147,7 +151,8 @@ impl ExpectedTask { actual: Some(actual_task.clone()), failure: format!( "{self}: expected `self_wakes` to be {expected_self_wakes}, but actual was {actual_self_wakes}", - actual_self_wakes = actual_task.self_wakes), + actual_self_wakes = actual_task.self_wakes, + ), }); } } @@ -157,7 +162,8 @@ impl ExpectedTask { expected: self.clone(), actual: Some(actual_task.clone()), failure: format!( - "{self}: no expectations set, if you want to just expect that a matching task is present, use `expect_present()`") + "{self}: no expectations set, if you want to just expect that a matching task is present, use `expect_present()`", + ), }); } @@ -223,6 +229,6 @@ impl fmt::Display for ExpectedTask { Some(name) => format!("name={name}"), None => "(no fields to match on)".into(), }; - write!(f, "Task<{fields}>") + write!(f, "Task< {{ {fields} }}") } } From 4dac412abf2e30056e160a7d71fd10de7a6fcd9f Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 25 Aug 2023 16:34:56 +0200 Subject: [PATCH 15/20] add signal task at end of test Rather than relying on all the tasks becoming visible N update iterations after the test ends, we spawn a signal task which we then look for. Once the test has completed (which will almost certainly happen first) and the signal task has been read, we finish parsing the current update and then finish immediately. --- console-subscriber/tests/framework.rs | 10 ++--- console-subscriber/tests/support/state.rs | 1 - .../tests/support/subscriber.rs | 45 ++++++++++++------- console-subscriber/tests/support/task.rs | 2 +- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/console-subscriber/tests/framework.rs b/console-subscriber/tests/framework.rs index 68bf2a0ce..855f778ac 100644 --- a/console-subscriber/tests/framework.rs +++ b/console-subscriber/tests/framework.rs @@ -27,7 +27,7 @@ fn expect_present() { #[test] #[should_panic(expected = "Test failed: Task validation failed: - - Task: no expectations set, if you want to just expect that a matching task is present, use `expect_present()` + - Task { name=console-test::main }: no expectations set, if you want to just expect that a matching task is present, use `expect_present()` ")] fn fail_no_expectations() { let expected_task = ExpectedTask::default().match_default_name(); @@ -52,7 +52,7 @@ fn wakes() { #[test] #[should_panic(expected = "Test failed: Task validation failed: - - Task: expected `wakes` to be 5, but actual was 1 + - Task { name=console-test::main }: expected `wakes` to be 5, but actual was 1 ")] fn fail_wakes() { let expected_task = ExpectedTask::default().match_default_name().expect_wakes(5); @@ -77,7 +77,7 @@ fn self_wakes() { #[test] #[should_panic(expected = "Test failed: Task validation failed: - - Task: expected `self_wakes` to be 1, but actual was 0 + - Task { name=console-test::main }: expected `self_wakes` to be 1, but actual was 0 ")] fn fail_self_wake() { let expected_task = ExpectedTask::default() @@ -108,7 +108,7 @@ fn test_spawned_task() { #[test] #[should_panic(expected = "Test failed: Task validation failed: - - Task: no matching actual task was found + - Task { name=wrong-name }: no matching actual task was found ")] fn fail_wrong_task_name() { let expected_task = ExpectedTask::default().match_name("wrong-name".into()); @@ -151,7 +151,7 @@ fn multiple_tasks() { #[test] #[should_panic(expected = "Test failed: Task validation failed: - - Task: expected `wakes` to be 2, but actual was 1 + - Task { name=task-2 }: expected `wakes` to be 2, but actual was 1 ")] fn fail_1_of_2_expected_tasks() { let expected_tasks = vec![ diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index 9a69f9632..3c7f20187 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -54,7 +54,6 @@ impl TestState { /// This function will panic if the underlying channel gets closed. pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { while self.step < desired_step { - match self.receiver.recv().await { Ok(step) => self.step = step, Err(RecvError::Lagged(_)) => { diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index c8496c5eb..55121c216 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -13,7 +13,8 @@ use tower::service_fn; use super::state::{TestState, TestStep}; use super::task::{ActualTask, ExpectedTask, TaskValidationFailure}; -pub(crate) const MAIN_TASK_NAME: &str = "main"; +pub(crate) const MAIN_TASK_NAME: &str = "console-test::main"; +const END_SIGNAL_TASK_NAME: &str = "console-test::signal"; #[derive(Debug)] struct TestFailure { @@ -96,11 +97,16 @@ where .await; // Run the future that we are testing. - _ = tokio::task::Builder::new() + _ = task::Builder::new() .name(MAIN_TASK_NAME) .spawn(future) .expect("console-test error: couldn't spawn test task") .await; + _ = task::Builder::new() + .name(END_SIGNAL_TASK_NAME) + .spawn(futures::future::ready(())) + .expect("console-test error: couldn't spawn end signal task") + .await; test_state_test.advance_to_step(TestStep::TestFinished); test_state_test @@ -177,23 +183,24 @@ async fn console_client(client_stream: DuplexStream, mut test_state: TestState) let mut client_stream = Some(client_stream); // Note: we won't actually try to connect to this port on localhost, // because we will call `connect_with_connector` with a service that - // just returns the `DuplexStream`, instead of making an actual + // just returns the `DuplexStream`, instead of making an actual // network connection. - let endpoint = Endpoint::try_from("http://[::]:6669") - .expect("Could not create endpoint"); + let endpoint = Endpoint::try_from("http://[::]:6669").expect("Could not create endpoint"); let channel = endpoint .connect_with_connector(service_fn(move |_: Uri| { let client = client_stream.take(); async move { - client.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "Client already taken")) + client.ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::Other, "Client already taken") + }) } })) .await .expect("client-console error: couldn't create client"); test_state.advance_to_step(TestStep::ClientConnected); - record_actual_tasks(channel, test_state.clone()).await + record_actual_tasks(channel, test_state).await } /// Records the actual tasks which are received by the client channel. @@ -211,14 +218,18 @@ async fn record_actual_tasks( ) -> Vec { let mut client = InstrumentClient::new(client_channel); - let mut stream = match client.watch_updates(tonic::Request::new(InstrumentRequest {})).await { + let mut stream = match client + .watch_updates(tonic::Request::new(InstrumentRequest {})) + .await + { Ok(stream) => stream.into_inner(), Err(err) => panic!("Client cannot connect to watch updates: {err}"), }; let mut tasks = HashMap::new(); - let mut last_update = false; + let signal_task = ExpectedTask::default().match_name(END_SIGNAL_TASK_NAME.into()); + let mut signal_task_read = false; while let Some(update) = stream.next().await { match update { Ok(update) => { @@ -239,12 +250,16 @@ async fn record_actual_tasks( } } } - tasks.insert(actual_task.id, actual_task); + if signal_task.matches_actual_task(&actual_task) { + signal_task_read = true; + } else { + tasks.insert(actual_task.id, actual_task); + } } } for (id, stats) in &task_update.stats_update { - if let Some(mut task) = tasks.get_mut(id) { + if let Some(task) = tasks.get_mut(id) { task.wakes = stats.wakes; task.self_wakes = stats.self_wakes; } @@ -256,14 +271,10 @@ async fn record_actual_tasks( } } - if last_update { + if test_state.is_step(TestStep::TestFinished) && signal_task_read { + // Once the test finishes running and we've read the signal task, the test ends. break; } - - if test_state.try_wait_for_step(TestStep::TestFinished) { - // Once the test finishes running, we will get one further update and finish. - last_update = true; - } } tasks.into_values().collect() diff --git a/console-subscriber/tests/support/task.rs b/console-subscriber/tests/support/task.rs index ea4a929f9..3f050382d 100644 --- a/console-subscriber/tests/support/task.rs +++ b/console-subscriber/tests/support/task.rs @@ -229,6 +229,6 @@ impl fmt::Display for ExpectedTask { Some(name) => format!("name={name}"), None => "(no fields to match on)".into(), }; - write!(f, "Task< {{ {fields} }}") + write!(f, "Task {{ {fields} }}") } } From fee7b0aba022e812f4cd1a2429bb8afd2c992331 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 25 Aug 2023 17:12:29 +0200 Subject: [PATCH 16/20] improved rightward drift in update loop The one in the instrumentation client. --- .../tests/support/subscriber.rs | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 55121c216..45cd68820 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -231,43 +231,42 @@ async fn record_actual_tasks( let signal_task = ExpectedTask::default().match_name(END_SIGNAL_TASK_NAME.into()); let mut signal_task_read = false; while let Some(update) = stream.next().await { - match update { - Ok(update) => { - if let Some(task_update) = &update.task_update { - for new_task in &task_update.new_tasks { - if let Some(id) = &new_task.id { - let mut actual_task = ActualTask::new(id.id); - for field in &new_task.fields { - if let Some(console_api::field::Name::StrName(field_name)) = - &field.name - { - if field_name == "task.name" { - actual_task.name = match &field.value { - Some(Value::DebugVal(value)) => Some(value.clone()), - Some(Value::StrVal(value)) => Some(value.clone()), - _ => None, // Anything that isn't string-like shouldn't be used as a name. - }; - } - } - } - if signal_task.matches_actual_task(&actual_task) { - signal_task_read = true; - } else { - tasks.insert(actual_task.id, actual_task); - } + let update = update.expect("update stream error"); + + if let Some(task_update) = &update.task_update { + for new_task in &task_update.new_tasks { + let mut actual_task = match new_task.id { + Some(id) => ActualTask::new(id.id), + None => continue, + }; + let name = new_task + .fields + .iter() + .find_map(|field| match field.name.as_ref()? { + console_api::field::Name::StrName(name) if name == "task.name" => { + Some(field.value.clone()) } - } - - for (id, stats) in &task_update.stats_update { - if let Some(task) = tasks.get_mut(id) { - task.wakes = stats.wakes; - task.self_wakes = stats.self_wakes; - } - } + _ => None, + }) + .flatten(); + actual_task.name = match name { + Some(Value::DebugVal(value)) => Some(value.clone()), + Some(Value::StrVal(value)) => Some(value.clone()), + _ => None, // Anything that isn't string-like shouldn't be used as a name. + }; + + if signal_task.matches_actual_task(&actual_task) { + signal_task_read = true; + } else { + tasks.insert(actual_task.id, actual_task); } } - Err(e) => { - panic!("update stream error: {}", e); + + for (id, stats) in &task_update.stats_update { + if let Some(task) = tasks.get_mut(id) { + task.wakes = stats.wakes; + task.self_wakes = stats.self_wakes; + } } } From 514aba1e093afc90b43beeed632680f1b50ecb6d Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 6 Sep 2023 13:48:38 +0200 Subject: [PATCH 17/20] Apply suggestions from code review Co-authored-by: Eliza Weisman --- console-subscriber/tests/support/state.rs | 13 ++++++++++--- console-subscriber/tests/support/subscriber.rs | 14 ++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index 3c7f20187..fd41c11f0 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -89,7 +89,8 @@ impl TestState { assert!( self.step < next_step, - "cannot advance to previous or current step! current step: {current}, next step: {next_step}", + "console-test error: cannot advance to previous or current step! \ + current step: {current}, next step: {next_step}", current = self.step, ); @@ -98,12 +99,18 @@ impl TestState { (TestStep::ServerStarted, TestStep::ClientConnected) | (TestStep::ClientConnected, TestStep::TestFinished) | (TestStep::TestFinished, TestStep::UpdatesRecorded) => {}, - (current, _) => panic!("cannot advance more than one step! current step: {current}, next step: {next_step}"), + (current, _) => panic!( + "console-test error: test cannot advance more than one step! \ + current step: {current}, next step: {next_step}" + ), } self.sender .send(next_step) - .expect("failed to send the next test step, did the test abort?"); + .expect( + "console-test error: failed to send the next test step, \ + did the test abort?" + ); } fn update_step(&mut self) { diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 45cd68820..128b537e0 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -54,9 +54,19 @@ where let mut test_state = TestState::new(); let mut test_state_test = test_state.clone(); - + + let thread_name = { + // Include the name of the test thread in the spawned subscriber thread, + // to make it clearer which test it belongs to. + let test = thread::current().name().unwrap_or("); + format!("{test}-console::subscriber") + } let join_handle = thread::Builder::new() - .name("console::subscriber".into()) + .name(thread_name) + // Run the test's console server and client tasks in a separate thread + // from the main test, ensuring that any `tracing` emitted by the + // console worker and the client are not collected by the subscriber + // under test. .spawn(move || { let _subscriber_guard = tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default()); From 4af221a4655488e7b42be3d57cc3bd4aa7256bad Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 6 Sep 2023 13:49:35 +0200 Subject: [PATCH 18/20] Apply suggestions from code review (part 2) Co-authored-by: Eliza Weisman --- .../tests/support/subscriber.rs | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 128b537e0..bab3dcd25 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -160,14 +160,14 @@ async fn console_server( let aggregate = task::Builder::new() .name("console::aggregate") .spawn(aggregator.run()) - .expect("client-console error: couldn't spawn aggregator"); + .expect("console-test error: couldn't spawn aggregator"); Server::builder() .add_service(service) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( server_stream, )])) .await - .expect("client-console error: couldn't start instrument server."); + .expect("console-test error: couldn't start instrument server."); test_state.advance_to_step(TestStep::ServerStarted); test_state.wait_for_step(TestStep::UpdatesRecorded).await; @@ -201,13 +201,11 @@ async fn console_client(client_stream: DuplexStream, mut test_state: TestState) let client = client_stream.take(); async move { - client.ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::Other, "Client already taken") - }) + Ok(client.expect("console-test error: client already taken (this shouldn't happen)")) } })) .await - .expect("client-console error: couldn't create client"); + .expect("console-test client error: couldn't create client"); test_state.advance_to_step(TestStep::ClientConnected); record_actual_tasks(channel, test_state).await @@ -241,7 +239,7 @@ async fn record_actual_tasks( let signal_task = ExpectedTask::default().match_name(END_SIGNAL_TASK_NAME.into()); let mut signal_task_read = false; while let Some(update) = stream.next().await { - let update = update.expect("update stream error"); + let update = update.expect("console-test error: update stream error"); if let Some(task_update) = &update.task_update { for new_task in &task_update.new_tasks { @@ -249,21 +247,18 @@ async fn record_actual_tasks( Some(id) => ActualTask::new(id.id), None => continue, }; - let name = new_task - .fields - .iter() - .find_map(|field| match field.name.as_ref()? { + for field in new_task.fields { + match field.name.as_ref() { console_api::field::Name::StrName(name) if name == "task.name" => { - Some(field.value.clone()) - } - _ => None, - }) - .flatten(); - actual_task.name = match name { - Some(Value::DebugVal(value)) => Some(value.clone()), - Some(Value::StrVal(value)) => Some(value.clone()), - _ => None, // Anything that isn't string-like shouldn't be used as a name. - }; + actual_task.name = match field.value.as_ref() { + Value::DebugVal(value) => actual_task.name = Some(value), + Value::StrVal(value) => actual_task.name = Some(value), + _ => continue; + } + }, + _ => {} + } + } if signal_task.matches_actual_task(&actual_task) { signal_task_read = true; @@ -304,10 +299,7 @@ fn validate_expected_tasks( let failures: Vec<_> = expected_tasks .iter() .map(|expected| validate_expected_task(expected, &actual_tasks)) - .filter_map(|r| match r { - Ok(_) => None, - Err(validation_error) => Some(validation_error), - }) + .filter_map(Result::err) .collect(); if failures.is_empty() { From 76197412364df2b04312ca7606128f520d686cea Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 6 Sep 2023 14:41:08 +0200 Subject: [PATCH 19/20] Update console-subscriber/tests/support/subscriber.rs Co-authored-by: Eliza Weisman --- console-subscriber/tests/support/subscriber.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index bab3dcd25..81c963122 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -163,9 +163,7 @@ async fn console_server( .expect("console-test error: couldn't spawn aggregator"); Server::builder() .add_service(service) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( - server_stream, - )])) + .serve_with_incoming(futures::stream::once(Ok::<_, std::io::Error>(server_stream))) .await .expect("console-test error: couldn't start instrument server."); test_state.advance_to_step(TestStep::ServerStarted); From 6fce209a09066147cbd9f8044810f24ee7763cff Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 6 Sep 2023 15:01:26 +0200 Subject: [PATCH 20/20] Suggestions from code review Thanks! --- console-subscriber/Cargo.toml | 2 +- console-subscriber/tests/support/state.rs | 39 ++++++++-------- .../tests/support/subscriber.rs | 44 +++++++++++++------ console-subscriber/tests/support/task.rs | 18 +++++--- 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index ca3a9f25c..1bfb60e24 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -55,7 +55,7 @@ crossbeam-channel = "0.5" [dev-dependencies] tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] } -tower = "0.4" +tower = { version = "0.4", default-features = false } futures = "0.3" [package.metadata.docs.rs] diff --git a/console-subscriber/tests/support/state.rs b/console-subscriber/tests/support/state.rs index fd41c11f0..97aef9d48 100644 --- a/console-subscriber/tests/support/state.rs +++ b/console-subscriber/tests/support/state.rs @@ -59,9 +59,11 @@ impl TestState { Err(RecvError::Lagged(_)) => { // we don't mind being lagged, we'll just get the latest state } - Err(RecvError::Closed) => { - panic!("failed to receive current step, waiting for step: {desired_step}, did the test abort?"); - } + Err(RecvError::Closed) => panic!( + "console-test error: failed to receive current step, \ + waiting for step: {desired_step}. This shouldn't happen, \ + did the test abort?" + ), } } } @@ -90,27 +92,27 @@ impl TestState { assert!( self.step < next_step, "console-test error: cannot advance to previous or current step! \ - current step: {current}, next step: {next_step}", + current step: {current}, next step: {next_step}. This shouldn't \ + happen.", current = self.step, ); match (&self.step, &next_step) { - (TestStep::Start, TestStep::ServerStarted) | - (TestStep::ServerStarted, TestStep::ClientConnected) | - (TestStep::ClientConnected, TestStep::TestFinished) | - (TestStep::TestFinished, TestStep::UpdatesRecorded) => {}, + (TestStep::Start, TestStep::ServerStarted) + | (TestStep::ServerStarted, TestStep::ClientConnected) + | (TestStep::ClientConnected, TestStep::TestFinished) + | (TestStep::TestFinished, TestStep::UpdatesRecorded) => {} (current, _) => panic!( "console-test error: test cannot advance more than one step! \ - current step: {current}, next step: {next_step}" + current step: {current}, next step: {next_step}. This \ + shouldn't happen." ), } - self.sender - .send(next_step) - .expect( - "console-test error: failed to send the next test step, \ - did the test abort?" - ); + self.sender.send(next_step).expect( + "console-test error: failed to send the next test step. \ + This shouldn't happen, did the test abort?", + ); } fn update_step(&mut self) { @@ -120,9 +122,10 @@ impl TestState { Err(TryRecvError::Lagged(_)) => { // we don't mind being lagged, we'll just get the latest state } - Err(TryRecvError::Closed) => { - panic!("failed to update current step, did the test abort?") - } + Err(TryRecvError::Closed) => panic!( + "console-test error: failed to update current step, did \ + the test abort?" + ), Err(TryRecvError::Empty) => break, } } diff --git a/console-subscriber/tests/support/subscriber.rs b/console-subscriber/tests/support/subscriber.rs index 81c963122..ace48397d 100644 --- a/console-subscriber/tests/support/subscriber.rs +++ b/console-subscriber/tests/support/subscriber.rs @@ -54,13 +54,14 @@ where let mut test_state = TestState::new(); let mut test_state_test = test_state.clone(); - + let thread_name = { // Include the name of the test thread in the spawned subscriber thread, // to make it clearer which test it belongs to. - let test = thread::current().name().unwrap_or("); + let current_thread = thread::current(); + let test = current_thread.name().unwrap_or(""); format!("{test}-console::subscriber") - } + }; let join_handle = thread::Builder::new() .name(thread_name) // Run the test's console server and client tasks in a separate thread @@ -93,7 +94,7 @@ where actual_tasks }) }) - .expect("console subscriber could not spawn thread"); + .expect("console-test error: console subscriber could not spawn thread"); tracing::subscriber::with_default(registry, || { let runtime = tokio::runtime::Builder::new_current_thread() @@ -163,7 +164,10 @@ async fn console_server( .expect("console-test error: couldn't spawn aggregator"); Server::builder() .add_service(service) - .serve_with_incoming(futures::stream::once(Ok::<_, std::io::Error>(server_stream))) + // .serve_with_incoming(futures::stream::once(Ok::<_, std::io::Error>(server_stream))) + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( + server_stream, + )])) .await .expect("console-test error: couldn't start instrument server."); test_state.advance_to_step(TestStep::ServerStarted); @@ -199,7 +203,14 @@ async fn console_client(client_stream: DuplexStream, mut test_state: TestState) let client = client_stream.take(); async move { - Ok(client.expect("console-test error: client already taken (this shouldn't happen)")) + // We need to return a Result from this async block, which is + // why we don't unwrap the `client` here. + client.ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + "console-test error: client already taken. This shouldn't happen.", + ) + }) } })) .await @@ -229,11 +240,16 @@ async fn record_actual_tasks( .await { Ok(stream) => stream.into_inner(), - Err(err) => panic!("Client cannot connect to watch updates: {err}"), + Err(err) => panic!("console-test error: client cannot connect to watch updates: {err}"), }; let mut tasks = HashMap::new(); + // The console-subscriber aggregator is a bit of an unknown entity for us, + // especially with respect to its update loops. We can't guarantee that + // it will have seen all the tasks in our test N iterations after the test + // ends for some known N. For this reason we need to use a signal task to + // check for and end the collection of events at that point. let signal_task = ExpectedTask::default().match_name(END_SIGNAL_TASK_NAME.into()); let mut signal_task_read = false; while let Some(update) = stream.next().await { @@ -245,15 +261,15 @@ async fn record_actual_tasks( Some(id) => ActualTask::new(id.id), None => continue, }; - for field in new_task.fields { + for field in &new_task.fields { match field.name.as_ref() { - console_api::field::Name::StrName(name) if name == "task.name" => { + Some(console_api::field::Name::StrName(name)) if name == "task.name" => { actual_task.name = match field.value.as_ref() { - Value::DebugVal(value) => actual_task.name = Some(value), - Value::StrVal(value) => actual_task.name = Some(value), - _ => continue; - } - }, + Some(Value::DebugVal(value)) => Some(value.clone()), + Some(Value::StrVal(value)) => Some(value.clone()), + _ => continue, + }; + } _ => {} } } diff --git a/console-subscriber/tests/support/task.rs b/console-subscriber/tests/support/task.rs index 3f050382d..63814d016 100644 --- a/console-subscriber/tests/support/task.rs +++ b/console-subscriber/tests/support/task.rs @@ -49,13 +49,17 @@ impl fmt::Debug for TaskValidationFailure { match &self.actual { Some(actual) => write!( f, - "Task Validation Failed!\n Expected Task: {expected:?}\n Actual Task: {actual:?}\n Failure: {failure}", + "Task Validation Failed!\n Expected Task: {expected:?}\ + \n Actual Task: {actual:?}\ + \n Failure: {failure}", expected = self.expected, failure = self.failure, ), None => write!( f, - "Task Validation Failed!\n Expected Task: {expected:?}\n Failure: {failure}", + "Task Validation Failed!\n Expected Task: {expected:?}\ + \n Actual Task: \ + \n Failure: {failure}", expected = self.expected, failure = self.failure, ), @@ -136,7 +140,8 @@ impl ExpectedTask { expected: self.clone(), actual: Some(actual_task.clone()), failure: format!( - "{self}: expected `wakes` to be {expected_wakes}, but actual was {actual_wakes}", + "{self}: expected `wakes` to be {expected_wakes}, but \ + actual was {actual_wakes}", actual_wakes = actual_task.wakes, ), }); @@ -150,7 +155,9 @@ impl ExpectedTask { expected: self.clone(), actual: Some(actual_task.clone()), failure: format!( - "{self}: expected `self_wakes` to be {expected_self_wakes}, but actual was {actual_self_wakes}", + "{self}: expected `self_wakes` to be \ + {expected_self_wakes}, but actual was \ + {actual_self_wakes}", actual_self_wakes = actual_task.self_wakes, ), }); @@ -162,7 +169,8 @@ impl ExpectedTask { expected: self.clone(), actual: Some(actual_task.clone()), failure: format!( - "{self}: no expectations set, if you want to just expect that a matching task is present, use `expect_present()`", + "{self}: no expectations set, if you want to just expect \ + that a matching task is present, use `expect_present()`", ), }); }