Skip to content

Commit

Permalink
Integration tests for unstable-reconnecting-rpc-client (#1711)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
  • Loading branch information
pkhry and niklasad1 committed Aug 30, 2024
1 parent 6e01451 commit 9ea7b14
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ exclude = [
"testing/wasm-lightclient-tests",
"signer/wasm-tests",
"examples/wasm-example",
"examples/parachain-example"
"examples/parachain-example",
]
resolver = "2"

Expand Down
2 changes: 1 addition & 1 deletion testing/integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ serde = { workspace = true }
scale-info = { workspace = true, features = ["bit-vec"] }
sp-core = { workspace = true }
syn = { workspace = true }
subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat"] }
subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat", "unstable-reconnecting-rpc-client"] }
subxt-signer = { workspace = true, features = ["default"] }
subxt-codegen = { workspace = true }
subxt-metadata = { workspace = true }
Expand Down
42 changes: 41 additions & 1 deletion testing/integration-tests/src/full_client/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use std::collections::HashSet;

use crate::{
subxt_test, test_context,
subxt_test, test_context, test_context_reconnecting_rpc_client,
utils::{node_runtime, wait_for_blocks},
};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -409,3 +411,41 @@ async fn partial_fee_estimate_correct() {
// Both methods should yield the same fee
assert_eq!(partial_fee_1, partial_fee_2);
}

#[subxt_test]
async fn legacy_and_unstable_block_subscription_reconnect() {
let ctx = test_context_reconnecting_rpc_client().await;

let api = ctx.unstable_client().await;

let unstable_client_blocks = move |num: usize| {
let api = api.clone();
async move {
api.blocks()
.subscribe_finalized()
.await
.unwrap()
.take(num)
.map(|x| x.unwrap().hash().to_string())
.collect::<Vec<String>>()
.await
}
};

let blocks = unstable_client_blocks(3).await;
let blocks: HashSet<String> = HashSet::from_iter(blocks.into_iter());

assert!(blocks.len() == 3);

let ctx = ctx.restart().await;

// Make client aware that connection was dropped and force them to reconnect
let _ = ctx.unstable_client().await.backend().genesis_hash().await;

let unstable_blocks = unstable_client_blocks(6).await;

let unstable_blocks: HashSet<String> = HashSet::from_iter(unstable_blocks.into_iter());
let intersection = unstable_blocks.intersection(&blocks).count();

assert!(intersection == 3);
}
11 changes: 9 additions & 2 deletions testing/integration-tests/src/utils/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ pub(crate) use crate::{node_runtime, utils::TestNodeProcess};
use subxt::client::OnlineClient;
use subxt::SubstrateConfig;

use super::node_proc::RpcClientKind;

/// `substrate-node` should be installed on the $PATH. We fall back
/// to also checking for an older `substrate` binary.
const SUBSTRATE_NODE_PATHS: &str = "substrate-node,substrate";

pub async fn test_context_with(authority: String) -> TestContext {
pub async fn test_context_with(authority: String, rpc_client_kind: RpcClientKind) -> TestContext {
let paths =
std::env::var("SUBSTRATE_NODE_PATH").unwrap_or_else(|_| SUBSTRATE_NODE_PATHS.to_string());
let paths: Vec<_> = paths.split(',').map(|p| p.trim()).collect();

let mut proc = TestContext::build(&paths);
proc.with_authority(authority);
proc.with_rpc_client_kind(rpc_client_kind);
proc.spawn::<SubstrateConfig>().await.unwrap()
}

Expand All @@ -28,5 +31,9 @@ pub type TestContext = TestNodeProcess<SubstrateConfig>;
pub type TestClient = OnlineClient<SubstrateConfig>;

pub async fn test_context() -> TestContext {
test_context_with("alice".to_string()).await
test_context_with("alice".to_string(), RpcClientKind::Legacy).await
}

pub async fn test_context_reconnecting_rpc_client() -> TestContext {
test_context_with("alice".to_string(), RpcClientKind::UnstableReconnecting).await
}
62 changes: 49 additions & 13 deletions testing/integration-tests/src/utils/node_proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use std::cell::RefCell;
use std::ffi::{OsStr, OsString};
use std::sync::Arc;
use std::time::Duration;
use substrate_runner::SubstrateNode;
use subxt::backend::rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClientBuilder};
use subxt::{
backend::{legacy, rpc, unstable},
Config, OnlineClient,
Expand Down Expand Up @@ -58,26 +60,29 @@ where
TestNodeProcessBuilder::new(paths)
}

pub async fn restart(mut self) -> Self {
tokio::task::spawn_blocking(move || {
if let Some(ref mut proc) = &mut self.proc {
proc.restart().unwrap();
}
self
})
.await
.expect("to succeed")
}

/// Hand back an RPC client connected to the test node which exposes the legacy RPC methods.
pub async fn legacy_rpc_methods(&self) -> legacy::LegacyRpcMethods<R> {
let rpc_client = self.rpc_client().await;
let rpc_client = self.rpc_client.clone();
legacy::LegacyRpcMethods::new(rpc_client)
}

/// Hand back an RPC client connected to the test node which exposes the unstable RPC methods.
pub async fn unstable_rpc_methods(&self) -> unstable::UnstableRpcMethods<R> {
let rpc_client = self.rpc_client().await;
let rpc_client = self.rpc_client.clone();
unstable::UnstableRpcMethods::new(rpc_client)
}

/// Hand back an RPC client connected to the test node.
pub async fn rpc_client(&self) -> rpc::RpcClient {
let url = get_url(self.proc.as_ref().map(|p| p.ws_port()));
rpc::RpcClient::from_url(url)
.await
.expect("Unable to connect RPC client to test node")
}

/// Always return a client using the unstable backend.
/// Only use for comparing backends; use [`TestNodeProcess::client()`] normally,
/// which enables us to run each test against both backends.
Expand Down Expand Up @@ -109,12 +114,24 @@ where
pub fn client(&self) -> OnlineClient<R> {
self.client.clone()
}

/// Returns the rpc client connected to the node
pub fn rpc_client(&self) -> rpc::RpcClient {
self.rpc_client.clone()
}
}

/// Kind of rpc client to use in tests
pub enum RpcClientKind {
Legacy,
UnstableReconnecting,
}

/// Construct a test node process.
pub struct TestNodeProcessBuilder {
node_paths: Vec<OsString>,
authority: Option<String>,
rpc_client: RpcClientKind,
}

impl TestNodeProcessBuilder {
Expand All @@ -132,9 +149,16 @@ impl TestNodeProcessBuilder {
Self {
node_paths: paths,
authority: None,
rpc_client: RpcClientKind::Legacy,
}
}

/// Set the testRunner to use a preferred RpcClient impl, ie Legacy or Unstable
pub fn with_rpc_client_kind(&mut self, rpc_client_kind: RpcClientKind) -> &mut Self {
self.rpc_client = rpc_client_kind;
self
}

/// Set the authority dev account for a node in validator mode e.g. --alice.
pub fn with_authority(&mut self, account: String) -> &mut Self {
self.authority = Some(account);
Expand All @@ -161,9 +185,11 @@ impl TestNodeProcessBuilder {
};

let ws_url = get_url(proc.as_ref().map(|p| p.ws_port()));
let rpc_client = build_rpc_client(&ws_url)
.await
.map_err(|e| format!("Failed to connect to node at {ws_url}: {e}"))?;
let rpc_client = match self.rpc_client {
RpcClientKind::Legacy => build_rpc_client(&ws_url).await,
RpcClientKind::UnstableReconnecting => build_unstable_rpc_client(&ws_url).await,
}
.map_err(|e| format!("Failed to connect to node at {ws_url}: {e}"))?;

// Cache whatever client we build, and None for the other.
#[allow(unused_assignments, unused_mut)]
Expand Down Expand Up @@ -206,6 +232,16 @@ async fn build_rpc_client(ws_url: &str) -> Result<rpc::RpcClient, String> {
Ok(rpc_client)
}

async fn build_unstable_rpc_client(ws_url: &str) -> Result<rpc::RpcClient, String> {
let client = RpcClientBuilder::new()
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
.build(ws_url.to_string())
.await
.map_err(|e| format!("Cannot construct RPC client: {e}"))?;

Ok(rpc::RpcClient::new(client))
}

async fn build_legacy_client<T: Config>(
rpc_client: rpc::RpcClient,
) -> Result<OnlineClient<T>, String> {
Expand Down
69 changes: 68 additions & 1 deletion testing/substrate-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,27 @@ impl SubstrateNodeBuilder {
}

/// Spawn the node, handing back an object which, when dropped, will stop it.
pub fn spawn(self) -> Result<SubstrateNode, Error> {
pub fn spawn(mut self) -> Result<SubstrateNode, Error> {
// Try to spawn the binary at each path, returning the
// first "ok" or last error that we encountered.
let mut res = Err(io::Error::new(
io::ErrorKind::Other,
"No binary path provided",
));

let path = Command::new("mktemp")
.arg("-d")
.output()
.expect("failed to create base dir");
let path = String::from_utf8(path.stdout).expect("bad path");
let mut bin_path = OsString::new();
for binary_path in &self.binary_paths {
self.custom_flags
.insert("base-path".into(), Some(path.clone().into()));

res = SubstrateNodeBuilder::try_spawn(binary_path, &self.custom_flags);
if res.is_ok() {
bin_path.clone_from(binary_path);
break;
}
}
Expand All @@ -98,10 +109,13 @@ impl SubstrateNodeBuilder {
let p2p_port = p2p_port.ok_or(Error::CouldNotExtractP2pPort)?;

Ok(SubstrateNode {
binary_path: bin_path,
custom_flags: self.custom_flags,
proc,
ws_port,
p2p_address,
p2p_port,
base_path: path,
})
}

Expand Down Expand Up @@ -131,10 +145,13 @@ impl SubstrateNodeBuilder {
}

pub struct SubstrateNode {
binary_path: OsString,
custom_flags: HashMap<CowStr, Option<CowStr>>,
proc: process::Child,
ws_port: u16,
p2p_address: String,
p2p_port: u32,
base_path: String,
}

impl SubstrateNode {
Expand Down Expand Up @@ -167,11 +184,61 @@ impl SubstrateNode {
pub fn kill(&mut self) -> std::io::Result<()> {
self.proc.kill()
}

/// restart the node, handing back an object which, when dropped, will stop it.
pub fn restart(&mut self) -> Result<(), std::io::Error> {
let res: Result<(), io::Error> = self.kill();

match res {
Ok(_) => (),
Err(e) => {
self.cleanup();
return Err(e);
}
}

let proc = self.try_spawn()?;

self.proc = proc;
// Wait for RPC port to be logged (it's logged to stderr).

Ok(())
}

// Attempt to spawn a binary with the path/flags given.
fn try_spawn(&mut self) -> Result<Child, std::io::Error> {
let mut cmd = Command::new(&self.binary_path);

cmd.env("RUST_LOG", "info,libp2p_tcp=debug")
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.arg("--dev");

for (key, val) in &self.custom_flags {
let arg = match val {
Some(val) => format!("--{key}={val}"),
None => format!("--{key}"),
};
cmd.arg(arg);
}

cmd.arg(format!("--rpc-port={}", self.ws_port));
cmd.arg(format!("--port={}", self.p2p_port));
cmd.spawn()
}

fn cleanup(&self) {
let _ = Command::new("rm")
.args(["-rf", &self.base_path])
.output()
.expect("success");
}
}

impl Drop for SubstrateNode {
fn drop(&mut self) {
let _ = self.kill();
self.cleanup()
}
}

Expand Down
5 changes: 4 additions & 1 deletion testing/test-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ serde = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util = { workspace = true, features = ["compat"] }
which = { workspace = true }
jsonrpsee = { workspace = true, features = ["async-client", "client-ws-transport-tls"] }
jsonrpsee = { workspace = true, features = [
"async-client",
"client-ws-transport-tls",
] }
hex = { workspace = true }
codec = { workspace = true }

Expand Down

0 comments on commit 9ea7b14

Please sign in to comment.