diff --git a/Cargo.lock b/Cargo.lock index 5b2879e1ba9b6..a645ecec60d86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9275,6 +9275,7 @@ dependencies = [ "sc-chain-spec", "sc-client-api", "sc-transaction-pool-api", + "sc-utils", "serde", "serde_json", "sp-api", @@ -11954,7 +11955,7 @@ checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", "digest 0.10.6", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 43fb189081bae..23b96877f3b17 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -43,4 +43,5 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime" sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } +sc-utils = { version = "4.0.0-dev", path = "../utils" } assert_matches = "1.3.0" diff --git a/client/rpc-spec-v2/src/chain_head/mod.rs b/client/rpc-spec-v2/src/chain_head/mod.rs index afa8d3b2189ae..1c489d323f195 100644 --- a/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/mod.rs @@ -22,6 +22,8 @@ //! //! Methods are prefixed by `chainHead`. +#[cfg(test)] +mod test_utils; #[cfg(test)] mod tests; diff --git a/client/rpc-spec-v2/src/chain_head/test_utils.rs b/client/rpc-spec-v2/src/chain_head/test_utils.rs new file mode 100644 index 0000000000000..ee563debb4502 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/test_utils.rs @@ -0,0 +1,320 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use parking_lot::Mutex; +use sc_client_api::{ + execution_extensions::ExecutionExtensions, BlockBackend, BlockImportNotification, + BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, FinalityNotification, + FinalityNotifications, FinalizeSummary, ImportNotifications, KeysIter, PairsIter, StorageData, + StorageEventStream, StorageKey, StorageProvider, +}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; +use sp_api::{CallApiAt, CallApiAtParams, NumberFor, RuntimeVersion}; +use sp_blockchain::{BlockStatus, CachedHeaderMetadata, HeaderBackend, HeaderMetadata, Info}; +use sp_consensus::BlockOrigin; +use sp_runtime::{ + generic::SignedBlock, + traits::{Block as BlockT, Header as HeaderT}, + Justifications, +}; +use std::sync::Arc; +use substrate_test_runtime::{Block, Hash, Header}; + +pub struct ChainHeadMockClient { + client: Arc, + import_sinks: Mutex>>>, + finality_sinks: Mutex>>>, +} + +impl ChainHeadMockClient { + pub fn new(client: Arc) -> Self { + ChainHeadMockClient { + client, + import_sinks: Default::default(), + finality_sinks: Default::default(), + } + } + + pub async fn trigger_import_stream(&self, header: Header) { + // Ensure the client called the `import_notification_stream`. + while self.import_sinks.lock().is_empty() { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + + // Build the notification. + let (sink, _stream) = tracing_unbounded("test_sink", 100_000); + let notification = + BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink); + + for sink in self.import_sinks.lock().iter_mut() { + sink.unbounded_send(notification.clone()).unwrap(); + } + } + + pub async fn trigger_finality_stream(&self, header: Header) { + // Ensure the client called the `finality_notification_stream`. + while self.finality_sinks.lock().is_empty() { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + + // Build the notification. + let (sink, _stream) = tracing_unbounded("test_sink", 100_000); + let summary = FinalizeSummary { + header: header.clone(), + finalized: vec![header.hash()], + stale_heads: vec![], + }; + let notification = FinalityNotification::from_summary(summary, sink); + + for sink in self.finality_sinks.lock().iter_mut() { + sink.unbounded_send(notification.clone()).unwrap(); + } + } +} + +// ChainHead calls `import_notification_stream` and `finality_notification_stream` in order to +// subscribe to block events. +impl BlockchainEvents for ChainHeadMockClient { + fn import_notification_stream(&self) -> ImportNotifications { + let (sink, stream) = tracing_unbounded("import_notification_stream", 1024); + self.import_sinks.lock().push(sink); + stream + } + + fn every_import_notification_stream(&self) -> ImportNotifications { + unimplemented!() + } + + fn finality_notification_stream(&self) -> FinalityNotifications { + let (sink, stream) = tracing_unbounded("finality_notification_stream", 1024); + self.finality_sinks.lock().push(sink); + stream + } + + fn storage_changes_notification_stream( + &self, + _filter_keys: Option<&[StorageKey]>, + _child_filter_keys: Option<&[(StorageKey, Option>)]>, + ) -> sp_blockchain::Result> { + unimplemented!() + } +} + +// The following implementations are imposed by the `chainHead` trait bounds. + +impl, Client: ExecutorProvider> + ExecutorProvider for ChainHeadMockClient +{ + type Executor = >::Executor; + + fn executor(&self) -> &Self::Executor { + self.client.executor() + } + + fn execution_extensions(&self) -> &ExecutionExtensions { + self.client.execution_extensions() + } +} + +impl< + BE: sc_client_api::backend::Backend + Send + Sync + 'static, + Block: BlockT, + Client: StorageProvider, + > StorageProvider for ChainHeadMockClient +{ + fn storage( + &self, + hash: Block::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result> { + self.client.storage(hash, key) + } + + fn storage_hash( + &self, + hash: Block::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result> { + self.client.storage_hash(hash, key) + } + + fn storage_keys( + &self, + hash: Block::Hash, + prefix: Option<&StorageKey>, + start_key: Option<&StorageKey>, + ) -> sp_blockchain::Result> { + self.client.storage_keys(hash, prefix, start_key) + } + + fn storage_pairs( + &self, + hash: ::Hash, + prefix: Option<&StorageKey>, + start_key: Option<&StorageKey>, + ) -> sp_blockchain::Result> { + self.client.storage_pairs(hash, prefix, start_key) + } + + fn child_storage( + &self, + hash: Block::Hash, + child_info: &ChildInfo, + key: &StorageKey, + ) -> sp_blockchain::Result> { + self.client.child_storage(hash, child_info, key) + } + + fn child_storage_keys( + &self, + hash: Block::Hash, + child_info: ChildInfo, + prefix: Option<&StorageKey>, + start_key: Option<&StorageKey>, + ) -> sp_blockchain::Result> { + self.client.child_storage_keys(hash, child_info, prefix, start_key) + } + + fn child_storage_hash( + &self, + hash: Block::Hash, + child_info: &ChildInfo, + key: &StorageKey, + ) -> sp_blockchain::Result> { + self.client.child_storage_hash(hash, child_info, key) + } +} + +impl> CallApiAt for ChainHeadMockClient { + type StateBackend = >::StateBackend; + + fn call_api_at( + &self, + params: CallApiAtParams>::StateBackend>, + ) -> Result, sp_api::ApiError> { + self.client.call_api_at(params) + } + + fn runtime_version_at(&self, hash: Block::Hash) -> Result { + self.client.runtime_version_at(hash) + } + + fn state_at(&self, at: Block::Hash) -> Result { + self.client.state_at(at) + } +} + +impl> BlockBackend + for ChainHeadMockClient +{ + fn block_body( + &self, + hash: Block::Hash, + ) -> sp_blockchain::Result::Extrinsic>>> { + self.client.block_body(hash) + } + + fn block(&self, hash: Block::Hash) -> sp_blockchain::Result>> { + self.client.block(hash) + } + + fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result { + self.client.block_status(hash) + } + + fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result> { + self.client.justifications(hash) + } + + fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result> { + self.client.block_hash(number) + } + + fn indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result>> { + self.client.indexed_transaction(hash) + } + + fn has_indexed_transaction(&self, hash: Block::Hash) -> sp_blockchain::Result { + self.client.has_indexed_transaction(hash) + } + + fn block_indexed_body(&self, hash: Block::Hash) -> sp_blockchain::Result>>> { + self.client.block_indexed_body(hash) + } + fn requires_full_sync(&self) -> bool { + self.client.requires_full_sync() + } +} + +impl + Send + Sync> HeaderMetadata + for ChainHeadMockClient +{ + type Error = >::Error; + + fn header_metadata( + &self, + hash: Block::Hash, + ) -> Result, Self::Error> { + self.client.header_metadata(hash) + } + + fn insert_header_metadata( + &self, + hash: Block::Hash, + header_metadata: CachedHeaderMetadata, + ) { + self.client.insert_header_metadata(hash, header_metadata) + } + + fn remove_header_metadata(&self, hash: Block::Hash) { + self.client.remove_header_metadata(hash) + } +} + +impl + Send + Sync> HeaderBackend + for ChainHeadMockClient +{ + fn header( + &self, + hash: Block::Hash, + ) -> sp_blockchain::Result::Header>> { + self.client.header(hash) + } + + fn info(&self) -> Info { + self.client.info() + } + + fn status(&self, hash: Block::Hash) -> sc_client_api::blockchain::Result { + self.client.status(hash) + } + + fn number( + &self, + hash: Block::Hash, + ) -> sc_client_api::blockchain::Result::Header as HeaderT>::Number>> { + self.client.number(hash) + } + + fn hash( + &self, + number: <::Header as HeaderT>::Number, + ) -> sp_blockchain::Result> { + self.client.hash(number) + } +} diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index fcd906dcf5be0..1d5cb8da26305 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1,6 +1,9 @@ +use crate::chain_head::test_utils::ChainHeadMockClient; + use super::*; use assert_matches::assert_matches; use codec::{Decode, Encode}; +use futures::Future; use jsonrpsee::{ core::{error::Error, server::rpc_module::Subscription as RpcSubscription}, types::{error::CallError, EmptyServerParams as EmptyParams}, @@ -33,7 +36,7 @@ const CHILD_STORAGE_KEY: &[u8] = b"child"; const CHILD_VALUE: &[u8] = b"child value"; async fn get_next_event(sub: &mut RpcSubscription) -> T { - let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next()) + let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) .await .unwrap() .unwrap() @@ -41,6 +44,12 @@ async fn get_next_event(sub: &mut RpcSubscriptio event } +async fn run_with_timeout(future: F) -> ::Output { + tokio::time::timeout(std::time::Duration::from_secs(60 * 10), future) + .await + .unwrap() +} + async fn setup_api() -> ( Arc>, RpcModule>>, @@ -1317,3 +1326,95 @@ async fn follow_report_multiple_pruned_block() { }); assert_eq!(event, expected); } + +#[tokio::test] +async fn follow_finalized_before_new_block() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); + + let api = ChainHead::new( + client_mock.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + // Make sure the block is imported for it to be pinned. + let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_1_hash = block_1.header.hash(); + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + // Trigger the `FinalizedNotification` for block 1 before the `BlockImportNotification`, and + // expect for the `chainHead` to generate `NewBlock`, `BestBlock` and `Finalized` events. + + // Trigger the Finalized notification before the NewBlock one. + run_with_timeout(client_mock.trigger_finality_stream(block_1.header.clone())).await; + + // Initialized must always be reported first. + let finalized_hash = client.info().finalized_hash; + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", finalized_hash), + finalized_block_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // Block 1 must be reported because we triggered the finalized notification. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_1_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_1_hash), + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", block_1_hash)], + pruned_block_hashes: vec![], + }); + assert_eq!(event, expected); + + let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_2_hash = block_2.header.hash(); + client.import(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + // Triggering the `BlockImportNotification` notification for block 1 should have no effect + // on the notification because the events were handled by the `FinalizedNotification`. + // Also trigger the `BlockImportNotification` notification for block 2 to ensure + // `NewBlock and `BestBlock` events are generated. + + // Trigger NewBlock notification for block 1 and block 2. + run_with_timeout(client_mock.trigger_import_stream(block_1.header)).await; + run_with_timeout(client_mock.trigger_import_stream(block_2.header)).await; + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_2_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_2_hash), + }); + assert_eq!(event, expected); +}