diff --git a/zebra-grpc/src/server.rs b/zebra-grpc/src/server.rs index b62cdb1c952..d20d9944028 100644 --- a/zebra-grpc/src/server.rs +++ b/zebra-grpc/src/server.rs @@ -2,9 +2,14 @@ use std::{collections::BTreeMap, net::SocketAddr, pin::Pin}; +use color_eyre::eyre::eyre; use futures_util::future::TryFutureExt; +use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, Stream}; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{ + transport::{server::TcpIncoming, Server}, + Request, Response, Status, +}; use tower::{timeout::error::Elapsed, ServiceExt}; use zebra_chain::{block::Height, transaction}; @@ -436,11 +441,13 @@ impl From for ScanResponse { } } +type ServerTask = JoinHandle>; + /// Initializes the zebra-scan gRPC server pub async fn init( listen_addr: SocketAddr, scan_service: ScanService, -) -> Result<(), color_eyre::Report> +) -> Result<(ServerTask, SocketAddr), color_eyre::Report> where ScanService: tower::Service + Clone @@ -455,11 +462,20 @@ where .build() .unwrap(); - Server::builder() - .add_service(reflection_service) - .add_service(ScannerServer::new(service)) - .serve(listen_addr) - .await?; + let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?; + let listen_addr = tcp_listener.local_addr()?; + let incoming = + TcpIncoming::from_listener(tcp_listener, true, None).map_err(|err| eyre!(err))?; + + let server_task: JoinHandle> = tokio::spawn(async move { + Server::builder() + .add_service(reflection_service) + .add_service(ScannerServer::new(service)) + .serve_with_incoming(incoming) + .await?; + + Ok(()) + }); - Ok(()) + Ok((server_task, listen_addr)) } diff --git a/zebra-grpc/src/tests/snapshot.rs b/zebra-grpc/src/tests/snapshot.rs index 92e8b77aa8d..f468f85cf21 100644 --- a/zebra-grpc/src/tests/snapshot.rs +++ b/zebra-grpc/src/tests/snapshot.rs @@ -29,45 +29,37 @@ use crate::{ pub const ZECPAGES_SAPLING_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz"; #[tokio::test(flavor = "multi_thread")] -#[cfg(not(target_os = "windows"))] async fn test_grpc_response_data() { let _init_guard = zebra_test::init(); tokio::join!( - test_mocked_rpc_response_data_for_network( - Network::Mainnet, - zebra_test::net::random_known_port() - ), - test_mocked_rpc_response_data_for_network( - Network::new_default_testnet(), - zebra_test::net::random_known_port() - ), + test_mocked_rpc_response_data_for_network(Network::Mainnet,), + test_mocked_rpc_response_data_for_network(Network::new_default_testnet(),), ); } -async fn test_mocked_rpc_response_data_for_network(network: Network, random_port: u16) { +async fn test_mocked_rpc_response_data_for_network(network: Network) { // get a mocked scan service let mock_scan_service = MockService::build().for_unit_tests(); // start the gRPC server - let listen_addr: std::net::SocketAddr = format!("127.0.0.1:{random_port}") + let listen_addr: std::net::SocketAddr = "127.0.0.1:0" .parse() .expect("hard-coded IP and u16 port should parse successfully"); - { + let (_server_task, listen_addr) = { let mock_scan_service = mock_scan_service.clone(); - tokio::spawn(async move { - init(listen_addr, mock_scan_service) - .await - .expect("Possible port conflict"); - }); - } + tokio::spawn(init(listen_addr, mock_scan_service)) + .await + .expect("task should join successfully") + .expect("should spawn tonic server") + }; // wait for the server to start sleep(Duration::from_secs(1)); // connect to the gRPC server - let client = ScannerClient::connect(format!("http://127.0.0.1:{random_port}")) + let client = ScannerClient::connect(format!("http://{listen_addr}")) .await .expect("server should receive connection"); diff --git a/zebra-grpc/src/tests/vectors.rs b/zebra-grpc/src/tests/vectors.rs index 9a1bf089d88..5d35eb49ce7 100644 --- a/zebra-grpc/src/tests/vectors.rs +++ b/zebra-grpc/src/tests/vectors.rs @@ -4,10 +4,7 @@ use std::{collections::BTreeMap, thread::sleep, time::Duration}; use tonic::transport::Channel; use zebra_chain::{block::Height, parameters::Network, transaction}; -use zebra_test::{ - mock_service::{MockService, PanicAssertion}, - net::random_known_port, -}; +use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ scanner::{ @@ -26,11 +23,10 @@ pub const ZECPAGES_SAPLING_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45g /// Test the gRPC methods with mocked responses #[tokio::test(flavor = "multi_thread")] -#[cfg(not(target_os = "windows"))] async fn test_grpc_methods_mocked() { let _init_guard = zebra_test::init(); - let (client, mock_scan_service) = start_server_and_get_client(random_known_port()).await; + let (client, mock_scan_service) = start_server_and_get_client().await; test_get_results_errors(client.clone()).await; test_register_keys_errors(client.clone()).await; @@ -231,9 +227,7 @@ async fn test_mocked_delete_keys_for_network( } /// Start the gRPC server, get a client and a mock service -async fn start_server_and_get_client( - random_port: u16, -) -> ( +async fn start_server_and_get_client() -> ( ScannerClient, MockService, ) { @@ -241,24 +235,19 @@ async fn start_server_and_get_client( let mock_scan_service = MockService::build().for_unit_tests(); // start the gRPC server - let listen_addr: std::net::SocketAddr = format!("127.0.0.1:{random_port}") + let listen_addr: std::net::SocketAddr = "127.0.0.1:0" .parse() .expect("hard-coded IP and u16 port should parse successfully"); - { - let mock_scan_service = mock_scan_service.clone(); - tokio::spawn(async move { - init(listen_addr, mock_scan_service) - .await - .expect("Possible port conflict"); - }); - } + let (_server_task, listen_addr) = init(listen_addr, mock_scan_service.clone()) + .await + .expect("Possible port conflict"); // wait for the server to start sleep(Duration::from_secs(1)); // connect to the gRPC server - let client = ScannerClient::connect(format!("http://127.0.0.1:{random_port}")) + let client = ScannerClient::connect(format!("http://{listen_addr}")) .await .expect("server should receive connection"); diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index f440d713211..b87068ef8f0 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -70,6 +70,9 @@ impl fmt::Debug for RpcServer { } } +/// The message to log when logging the RPC server's listen address +pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at "; + impl RpcServer { /// Start a new RPC server endpoint using the supplied configs and services. /// @@ -206,7 +209,7 @@ impl RpcServer { .start_http(&listen_addr) .expect("Unable to start RPC server"); - info!("Opened RPC endpoint at {}", server_instance.address()); + info!("{OPENED_RPC_ENDPOINT_MSG}{}", server_instance.address()); let close_handle = server_instance.close_handle(); diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs index 26e0584777f..78b7bd81516 100644 --- a/zebra-rpc/src/server/tests/vectors.rs +++ b/zebra-rpc/src/server/tests/vectors.rs @@ -23,7 +23,6 @@ use super::super::*; /// Test that the JSON-RPC server spawns when configured with a single thread. #[test] -#[cfg(not(target_os = "windows"))] fn rpc_server_spawn_single_thread() { rpc_server_spawn(false) } @@ -42,9 +41,8 @@ fn rpc_server_spawn_parallel_threads() { fn rpc_server_spawn(parallel_cpu_threads: bool) { let _init_guard = zebra_test::init(); - let port = zebra_test::net::random_known_port(); let config = Config { - listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into()), parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 }, debug_force_finished_sync: false, }; diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs index b0bfb6ad50f..cc5238e70dd 100644 --- a/zebra-scan/src/init.rs +++ b/zebra-scan/src/init.rs @@ -35,7 +35,8 @@ pub async fn init_with_server( info!(?listen_addr, "starting scan gRPC server"); // Start the gRPC server. - zebra_grpc::server::init(listen_addr, scan_service).await?; + let (server_task, _listen_addr) = zebra_grpc::server::init(listen_addr, scan_service).await?; + server_task.await??; Ok(()) } diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index a502447386f..c2a78a8879a 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -169,6 +169,7 @@ use zebra_chain::{ use zebra_consensus::ParameterCheckpoint; use zebra_network::constants::PORT_IN_USE_ERROR; use zebra_node_services::rpc_client::RpcRequestClient; +use zebra_rpc::server::OPENED_RPC_ENDPOINT_MSG; use zebra_state::{constants::LOCK_FILE_ERROR, state_database_format_version_in_code}; use zebra_test::{ @@ -182,10 +183,10 @@ mod common; use common::{ check::{is_zebrad_version, EphemeralCheck, EphemeralConfig}, - config::random_known_rpc_port_config, config::{ config_file_full_path, configs_dir, default_test_config, external_address_test_config, - persistent_test_config, testdir, + os_assigned_rpc_port_config, persistent_test_config, random_known_rpc_port_config, + read_listen_addr_from_logs, testdir, }, launch::{ spawn_zebrad_for_rpc, spawn_zebrad_without_rpc, ZebradTestDirExt, BETWEEN_NODES_DELAY, @@ -1549,7 +1550,6 @@ async fn tracing_endpoint() -> Result<()> { /// Test that the JSON-RPC endpoint responds to a request, /// when configured with a single thread. #[tokio::test] -#[cfg(not(target_os = "windows"))] async fn rpc_endpoint_single_thread() -> Result<()> { rpc_endpoint(false).await } @@ -1557,7 +1557,6 @@ async fn rpc_endpoint_single_thread() -> Result<()> { /// Test that the JSON-RPC endpoint responds to a request, /// when configured with multiple threads. #[tokio::test] -#[cfg(not(target_os = "windows"))] async fn rpc_endpoint_parallel_threads() -> Result<()> { rpc_endpoint(true).await } @@ -1574,18 +1573,15 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { // Write a configuration that has RPC listen_addr set // [Note on port conflict](#Note on port conflict) - let mut config = random_known_rpc_port_config(parallel_cpu_threads, &Mainnet)?; + let mut config = os_assigned_rpc_port_config(parallel_cpu_threads, &Mainnet)?; let dir = testdir()?.with_config(&mut config)?; let mut child = dir.spawn_child(args!["start"])?; // Wait until port is open. - child.expect_stdout_line_matches( - format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), - )?; - + let rpc_address = read_listen_addr_from_logs(&mut child, OPENED_RPC_ENDPOINT_MSG)?; // Create an http client - let client = RpcRequestClient::new(config.rpc.listen_addr.unwrap()); + let client = RpcRequestClient::new(rpc_address); // Make the call to the `getinfo` RPC method let res = client.call("getinfo", "[]".to_string()).await?; @@ -1625,7 +1621,6 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { /// /// https://zcash.github.io/rpc/getblockchaininfo.html #[tokio::test] -#[cfg(not(target_os = "windows"))] async fn rpc_endpoint_client_content_type() -> Result<()> { let _init_guard = zebra_test::init(); if zebra_test::net::zebra_skip_network_tests() { @@ -1640,12 +1635,10 @@ async fn rpc_endpoint_client_content_type() -> Result<()> { let mut child = dir.spawn_child(args!["start"])?; // Wait until port is open. - child.expect_stdout_line_matches( - format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), - )?; + let rpc_address = read_listen_addr_from_logs(&mut child, OPENED_RPC_ENDPOINT_MSG)?; // Create an http client - let client = RpcRequestClient::new(config.rpc.listen_addr.unwrap()); + let client = RpcRequestClient::new(rpc_address); // Call to `getinfo` RPC method with a no content type. let res = client diff --git a/zebrad/tests/common/config.rs b/zebrad/tests/common/config.rs index 560e9d3338c..ed399e9a26d 100644 --- a/zebrad/tests/common/config.rs +++ b/zebrad/tests/common/config.rs @@ -16,7 +16,7 @@ use color_eyre::eyre::Result; use tempfile::TempDir; use zebra_chain::parameters::Network; -use zebra_test::net::random_known_port; +use zebra_test::{command::TestChild, net::random_known_port}; use zebrad::{ components::{mempool, sync, tracing}, config::ZebradConfig, @@ -152,6 +152,27 @@ pub fn random_known_rpc_port_config( ) -> Result { // [Note on port conflict](#Note on port conflict) let listen_port = random_known_port(); + rpc_port_config(listen_port, parallel_cpu_threads, network) +} + +/// Returns a `zebrad` config with an OS-assigned RPC port. +/// +/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores. +pub fn os_assigned_rpc_port_config( + parallel_cpu_threads: bool, + network: &Network, +) -> Result { + rpc_port_config(0, parallel_cpu_threads, network) +} + +/// Returns a `zebrad` config with the provided RPC port. +/// +/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores. +pub fn rpc_port_config( + listen_port: u16, + parallel_cpu_threads: bool, + network: &Network, +) -> Result { let listen_ip = "127.0.0.1".parse().expect("hard-coded IP is valid"); let zebra_rpc_listener = SocketAddr::new(listen_ip, listen_port); @@ -169,3 +190,15 @@ pub fn random_known_rpc_port_config( Ok(config) } + +/// Reads Zebra's RPC server listen address from a testchild's logs +pub fn read_listen_addr_from_logs( + child: &mut TestChild, + expected_msg: &str, +) -> Result { + let line = child.expect_stdout_line_matches(expected_msg)?; + let rpc_addr_position = + line.find(expected_msg).expect("already checked for match") + expected_msg.len(); + let rpc_addr = line[rpc_addr_position..].trim().to_string(); + Ok(rpc_addr.parse()?) +}