diff --git a/Cargo.toml b/Cargo.toml index 40dc342bf5..9f75698d18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ exclude = [ "testing/wasm-lightclient-tests", "signer/wasm-tests", "examples/wasm-example", - "examples/parachain-example" + "examples/parachain-example", ] resolver = "2" diff --git a/testing/integration-tests/Cargo.toml b/testing/integration-tests/Cargo.toml index 451a99a500..ecf485d57b 100644 --- a/testing/integration-tests/Cargo.toml +++ b/testing/integration-tests/Cargo.toml @@ -36,7 +36,7 @@ serde = { workspace = true } scale-info = { workspace = true, features = ["bit-vec"] } sp-core = { workspace = true } syn = { workspace = true } -subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat"] } +subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat", "unstable-reconnecting-rpc-client"] } subxt-signer = { workspace = true, features = ["default"] } subxt-codegen = { workspace = true } subxt-metadata = { workspace = true } diff --git a/testing/integration-tests/src/full_client/client/mod.rs b/testing/integration-tests/src/full_client/client/mod.rs index 2685551335..3e08a8055a 100644 --- a/testing/integration-tests/src/full_client/client/mod.rs +++ b/testing/integration-tests/src/full_client/client/mod.rs @@ -2,8 +2,10 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. +use std::collections::HashSet; + use crate::{ - subxt_test, test_context, + subxt_test, test_context, test_context_reconnecting_rpc_client, utils::{node_runtime, wait_for_blocks}, }; use codec::{Decode, Encode}; @@ -409,3 +411,41 @@ async fn partial_fee_estimate_correct() { // Both methods should yield the same fee assert_eq!(partial_fee_1, partial_fee_2); } + +#[subxt_test] +async fn legacy_and_unstable_block_subscription_reconnect() { + let ctx = test_context_reconnecting_rpc_client().await; + + let api = ctx.unstable_client().await; + + let unstable_client_blocks = move |num: usize| { + let api = api.clone(); + async move { + api.blocks() + .subscribe_finalized() + .await + .unwrap() + .take(num) + .map(|x| x.unwrap().hash().to_string()) + .collect::>() + .await + } + }; + + let blocks = unstable_client_blocks(3).await; + let blocks: HashSet = HashSet::from_iter(blocks.into_iter()); + + assert!(blocks.len() == 3); + + let ctx = ctx.restart().await; + + // Make client aware that connection was dropped and force them to reconnect + let _ = ctx.unstable_client().await.backend().genesis_hash().await; + + let unstable_blocks = unstable_client_blocks(6).await; + + let unstable_blocks: HashSet = HashSet::from_iter(unstable_blocks.into_iter()); + let intersection = unstable_blocks.intersection(&blocks).count(); + + assert!(intersection == 3); +} diff --git a/testing/integration-tests/src/utils/context.rs b/testing/integration-tests/src/utils/context.rs index e987763eb3..7542a0dac4 100644 --- a/testing/integration-tests/src/utils/context.rs +++ b/testing/integration-tests/src/utils/context.rs @@ -7,17 +7,20 @@ pub(crate) use crate::{node_runtime, utils::TestNodeProcess}; use subxt::client::OnlineClient; use subxt::SubstrateConfig; +use super::node_proc::RpcClientKind; + /// `substrate-node` should be installed on the $PATH. We fall back /// to also checking for an older `substrate` binary. const SUBSTRATE_NODE_PATHS: &str = "substrate-node,substrate"; -pub async fn test_context_with(authority: String) -> TestContext { +pub async fn test_context_with(authority: String, rpc_client_kind: RpcClientKind) -> TestContext { let paths = std::env::var("SUBSTRATE_NODE_PATH").unwrap_or_else(|_| SUBSTRATE_NODE_PATHS.to_string()); let paths: Vec<_> = paths.split(',').map(|p| p.trim()).collect(); let mut proc = TestContext::build(&paths); proc.with_authority(authority); + proc.with_rpc_client_kind(rpc_client_kind); proc.spawn::().await.unwrap() } @@ -28,5 +31,9 @@ pub type TestContext = TestNodeProcess; pub type TestClient = OnlineClient; pub async fn test_context() -> TestContext { - test_context_with("alice".to_string()).await + test_context_with("alice".to_string(), RpcClientKind::Legacy).await +} + +pub async fn test_context_reconnecting_rpc_client() -> TestContext { + test_context_with("alice".to_string(), RpcClientKind::UnstableReconnecting).await } diff --git a/testing/integration-tests/src/utils/node_proc.rs b/testing/integration-tests/src/utils/node_proc.rs index b2c0197d5f..96e2cbd22a 100644 --- a/testing/integration-tests/src/utils/node_proc.rs +++ b/testing/integration-tests/src/utils/node_proc.rs @@ -5,7 +5,9 @@ use std::cell::RefCell; use std::ffi::{OsStr, OsString}; use std::sync::Arc; +use std::time::Duration; use substrate_runner::SubstrateNode; +use subxt::backend::rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClientBuilder}; use subxt::{ backend::{legacy, rpc, unstable}, Config, OnlineClient, @@ -58,26 +60,29 @@ where TestNodeProcessBuilder::new(paths) } + pub async fn restart(mut self) -> Self { + tokio::task::spawn_blocking(move || { + if let Some(ref mut proc) = &mut self.proc { + proc.restart().unwrap(); + } + self + }) + .await + .expect("to succeed") + } + /// Hand back an RPC client connected to the test node which exposes the legacy RPC methods. pub async fn legacy_rpc_methods(&self) -> legacy::LegacyRpcMethods { - let rpc_client = self.rpc_client().await; + let rpc_client = self.rpc_client.clone(); legacy::LegacyRpcMethods::new(rpc_client) } /// Hand back an RPC client connected to the test node which exposes the unstable RPC methods. pub async fn unstable_rpc_methods(&self) -> unstable::UnstableRpcMethods { - let rpc_client = self.rpc_client().await; + let rpc_client = self.rpc_client.clone(); unstable::UnstableRpcMethods::new(rpc_client) } - /// Hand back an RPC client connected to the test node. - pub async fn rpc_client(&self) -> rpc::RpcClient { - let url = get_url(self.proc.as_ref().map(|p| p.ws_port())); - rpc::RpcClient::from_url(url) - .await - .expect("Unable to connect RPC client to test node") - } - /// Always return a client using the unstable backend. /// Only use for comparing backends; use [`TestNodeProcess::client()`] normally, /// which enables us to run each test against both backends. @@ -109,12 +114,24 @@ where pub fn client(&self) -> OnlineClient { self.client.clone() } + + /// Returns the rpc client connected to the node + pub fn rpc_client(&self) -> rpc::RpcClient { + self.rpc_client.clone() + } +} + +/// Kind of rpc client to use in tests +pub enum RpcClientKind { + Legacy, + UnstableReconnecting, } /// Construct a test node process. pub struct TestNodeProcessBuilder { node_paths: Vec, authority: Option, + rpc_client: RpcClientKind, } impl TestNodeProcessBuilder { @@ -132,9 +149,16 @@ impl TestNodeProcessBuilder { Self { node_paths: paths, authority: None, + rpc_client: RpcClientKind::Legacy, } } + /// Set the testRunner to use a preferred RpcClient impl, ie Legacy or Unstable + pub fn with_rpc_client_kind(&mut self, rpc_client_kind: RpcClientKind) -> &mut Self { + self.rpc_client = rpc_client_kind; + self + } + /// Set the authority dev account for a node in validator mode e.g. --alice. pub fn with_authority(&mut self, account: String) -> &mut Self { self.authority = Some(account); @@ -161,9 +185,11 @@ impl TestNodeProcessBuilder { }; let ws_url = get_url(proc.as_ref().map(|p| p.ws_port())); - let rpc_client = build_rpc_client(&ws_url) - .await - .map_err(|e| format!("Failed to connect to node at {ws_url}: {e}"))?; + let rpc_client = match self.rpc_client { + RpcClientKind::Legacy => build_rpc_client(&ws_url).await, + RpcClientKind::UnstableReconnecting => build_unstable_rpc_client(&ws_url).await, + } + .map_err(|e| format!("Failed to connect to node at {ws_url}: {e}"))?; // Cache whatever client we build, and None for the other. #[allow(unused_assignments, unused_mut)] @@ -206,6 +232,16 @@ async fn build_rpc_client(ws_url: &str) -> Result { Ok(rpc_client) } +async fn build_unstable_rpc_client(ws_url: &str) -> Result { + let client = RpcClientBuilder::new() + .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) + .build(ws_url.to_string()) + .await + .map_err(|e| format!("Cannot construct RPC client: {e}"))?; + + Ok(rpc::RpcClient::new(client)) +} + async fn build_legacy_client( rpc_client: rpc::RpcClient, ) -> Result, String> { diff --git a/testing/substrate-runner/src/lib.rs b/testing/substrate-runner/src/lib.rs index 790d3d6861..82d3ea6258 100644 --- a/testing/substrate-runner/src/lib.rs +++ b/testing/substrate-runner/src/lib.rs @@ -70,16 +70,27 @@ impl SubstrateNodeBuilder { } /// Spawn the node, handing back an object which, when dropped, will stop it. - pub fn spawn(self) -> Result { + pub fn spawn(mut self) -> Result { // Try to spawn the binary at each path, returning the // first "ok" or last error that we encountered. let mut res = Err(io::Error::new( io::ErrorKind::Other, "No binary path provided", )); + + let path = Command::new("mktemp") + .arg("-d") + .output() + .expect("failed to create base dir"); + let path = String::from_utf8(path.stdout).expect("bad path"); + let mut bin_path = OsString::new(); for binary_path in &self.binary_paths { + self.custom_flags + .insert("base-path".into(), Some(path.clone().into())); + res = SubstrateNodeBuilder::try_spawn(binary_path, &self.custom_flags); if res.is_ok() { + bin_path.clone_from(binary_path); break; } } @@ -98,10 +109,13 @@ impl SubstrateNodeBuilder { let p2p_port = p2p_port.ok_or(Error::CouldNotExtractP2pPort)?; Ok(SubstrateNode { + binary_path: bin_path, + custom_flags: self.custom_flags, proc, ws_port, p2p_address, p2p_port, + base_path: path, }) } @@ -131,10 +145,13 @@ impl SubstrateNodeBuilder { } pub struct SubstrateNode { + binary_path: OsString, + custom_flags: HashMap>, proc: process::Child, ws_port: u16, p2p_address: String, p2p_port: u32, + base_path: String, } impl SubstrateNode { @@ -167,11 +184,61 @@ impl SubstrateNode { pub fn kill(&mut self) -> std::io::Result<()> { self.proc.kill() } + + /// restart the node, handing back an object which, when dropped, will stop it. + pub fn restart(&mut self) -> Result<(), std::io::Error> { + let res: Result<(), io::Error> = self.kill(); + + match res { + Ok(_) => (), + Err(e) => { + self.cleanup(); + return Err(e); + } + } + + let proc = self.try_spawn()?; + + self.proc = proc; + // Wait for RPC port to be logged (it's logged to stderr). + + Ok(()) + } + + // Attempt to spawn a binary with the path/flags given. + fn try_spawn(&mut self) -> Result { + let mut cmd = Command::new(&self.binary_path); + + cmd.env("RUST_LOG", "info,libp2p_tcp=debug") + .stdout(process::Stdio::piped()) + .stderr(process::Stdio::piped()) + .arg("--dev"); + + for (key, val) in &self.custom_flags { + let arg = match val { + Some(val) => format!("--{key}={val}"), + None => format!("--{key}"), + }; + cmd.arg(arg); + } + + cmd.arg(format!("--rpc-port={}", self.ws_port)); + cmd.arg(format!("--port={}", self.p2p_port)); + cmd.spawn() + } + + fn cleanup(&self) { + let _ = Command::new("rm") + .args(["-rf", &self.base_path]) + .output() + .expect("success"); + } } impl Drop for SubstrateNode { fn drop(&mut self) { let _ = self.kill(); + self.cleanup() } } diff --git a/testing/test-runtime/Cargo.toml b/testing/test-runtime/Cargo.toml index b7b69f22fb..108f784eda 100644 --- a/testing/test-runtime/Cargo.toml +++ b/testing/test-runtime/Cargo.toml @@ -14,7 +14,10 @@ serde = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } tokio-util = { workspace = true, features = ["compat"] } which = { workspace = true } -jsonrpsee = { workspace = true, features = ["async-client", "client-ws-transport-tls"] } +jsonrpsee = { workspace = true, features = [ + "async-client", + "client-ws-transport-tls", +] } hex = { workspace = true } codec = { workspace = true }