diff --git a/babushka-core/tests/test_client_cmd.rs b/babushka-core/tests/test_client_cmd.rs index 2edd6d34c6..fc0011fd35 100644 --- a/babushka-core/tests/test_client_cmd.rs +++ b/babushka-core/tests/test_client_cmd.rs @@ -174,7 +174,9 @@ mod client_cmd_tests { block_on_all(async { let mut client = ClientCMD::create_client(connection_request).await.unwrap(); - mocks.drain(1..config.number_of_replicas_dropped_after_connection + 1); + for mock in mocks.drain(1..config.number_of_replicas_dropped_after_connection + 1) { + mock.close().await; + } for _ in 0..config.number_of_requests_sent { let _ = client.send_packed_command(&cmd).await; } diff --git a/babushka-core/tests/utilities/mocks.rs b/babushka-core/tests/utilities/mocks.rs index 5e140aa94c..8c4226354d 100644 --- a/babushka-core/tests/utilities/mocks.rs +++ b/babushka-core/tests/utilities/mocks.rs @@ -24,6 +24,7 @@ pub struct ServerMock { received_commands: Arc, runtime: Option, // option so that we can take the runtime on drop. closing_signal: Arc, + closing_completed_signal: Arc, } async fn read_from_socket(buffer: &mut Vec, socket: &mut TcpStream) -> Option { @@ -113,6 +114,8 @@ impl ServerMock { ); let closing_signal = Arc::new(ManualResetEvent::new(false)); let closing_signal_clone = closing_signal.clone(); + let closing_completed_signal = Arc::new(ManualResetEvent::new(false)); + let closing_completed_signal_clone = closing_completed_signal.clone(); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .thread_name(format!("ServerMock - {address}")) @@ -134,6 +137,7 @@ impl ServerMock { .await {} + closing_completed_signal_clone.set(); log_warn("mocks", format!("closing {:?} mock", listener.local_addr())); }); Self { @@ -142,8 +146,14 @@ impl ServerMock { received_commands, runtime: Some(runtime), closing_signal, + closing_completed_signal, } } + + pub async fn close(self) { + self.closing_signal.set(); + self.closing_completed_signal.wait().await; + } } impl Mock for ServerMock {