diff --git a/Cargo.lock b/Cargo.lock index dbccf3e7d747..f7f1dcf606bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13454,6 +13454,7 @@ version = "1.0.0" dependencies = [ "assert_matches", "async-trait", + "bitvec", "clap 4.4.18", "clap-num", "color-eyre", @@ -13462,12 +13463,17 @@ dependencies = [ "futures", "futures-timer", "itertools 0.11.0", + "kvdb-memorydb", "log", "orchestra", "parity-scale-codec", "paste", + "polkadot-availability-bitfield-distribution", + "polkadot-availability-distribution", "polkadot-availability-recovery", "polkadot-erasure-coding", + "polkadot-node-core-av-store", + "polkadot-node-core-chain-api", "polkadot-node-metrics", "polkadot-node-network-protocol", "polkadot-node-primitives", @@ -13482,12 +13488,14 @@ dependencies = [ "pyroscope", "pyroscope_pprofrs", "rand", + "rand_distr", "sc-keystore", "sc-network", "sc-service", "serde", "serde_yaml", "sp-application-crypto", + "sp-consensus", "sp-core", "sp-keyring", "sp-keystore", @@ -17079,9 +17087,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.30" +version = "0.9.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" dependencies = [ "indexmap 2.0.0", "itoa", @@ -20752,9 +20760,9 @@ dependencies = [ [[package]] name = "unsafe-libyaml" -version = "0.2.10" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" +checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa" [[package]] name = "unsigned-varint" diff --git a/Cargo.toml b/Cargo.toml index 20cc16039fe4..1edc64217fdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,6 +159,7 @@ members = [ "polkadot/node/gum/proc-macro", "polkadot/node/jaeger", "polkadot/node/malus", + "polkadot/node/subsystem-bench", "polkadot/node/metrics", "polkadot/node/network/approval-distribution", "polkadot/node/network/availability-distribution", diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 750f7a7e2f83..40702411d8b2 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -22,12 +22,16 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } -polkadot-availability-recovery = { path = "../network/availability-recovery", features = ["subsystem-benchmarks"] } +polkadot-availability-recovery = { path = "../network/availability-recovery", features=["subsystem-benchmarks"]} +polkadot-availability-distribution = { path = "../network/availability-distribution"} +polkadot-node-core-av-store = { path = "../core/av-store"} +polkadot-node-core-chain-api = { path = "../core/chain-api"} +polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution"} color-eyre = { version = "0.6.1", default-features = false } -polkadot-overseer = { path = "../overseer" } +polkadot-overseer = { path = "../overseer" } colored = "2.0.4" assert_matches = "1.5" -async-trait = "0.1.74" +async-trait = "0.1.57" sp-keystore = { path = "../../../substrate/primitives/keystore" } sc-keystore = { path = "../../../substrate/client/keystore" } sp-core = { path = "../../../substrate/primitives/core" } 
@@ -39,7 +43,12 @@ polkadot-erasure-coding = { package = "polkadot-erasure-coding", path = "../../e log = "0.4.17" env_logger = "0.9.0" rand = "0.8.5" -parity-scale-codec = { version = "3.6.1", features = ["derive", "std"] } +# `rand` only supports uniform distribution, we need normal distribution for latency. +rand_distr = "0.4.3" +bitvec="1.0.1" +kvdb-memorydb = "0.13.0" + +parity-scale-codec = { version = "3.6.1", features = ["std", "derive"] } tokio = "1.24.2" clap-num = "1.0.2" polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } @@ -47,6 +56,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" } sp-application-crypto = { path = "../../../substrate/primitives/application-crypto" } sc-network = { path = "../../../substrate/client/network" } sc-service = { path = "../../../substrate/client/service" } +sp-consensus = { path = "../../../substrate/primitives/consensus/common" } polkadot-node-metrics = { path = "../metrics" } itertools = "0.11.0" polkadot-primitives-test-helpers = { path = "../../primitives/test-helpers" } diff --git a/polkadot/node/subsystem-bench/README.md b/polkadot/node/subsystem-bench/README.md index 1ff5b129e1e4..e090a0392cb7 100644 --- a/polkadot/node/subsystem-bench/README.md +++ b/polkadot/node/subsystem-bench/README.md @@ -1,6 +1,6 @@ # Subsystem benchmark client -Run parachain consensus stress and performance tests on your development machine. +Run parachain consensus stress and performance tests on your development machine. ## Motivation @@ -111,30 +111,28 @@ Commands: ``` Note: `test-sequence` is a special test objective that wraps up an arbitrary number of test objectives. It is tipically -used to run a suite of tests defined in a `yaml` file like in this [example](examples/availability_read.yaml). + used to run a suite of tests defined in a `yaml` file like in this [example](examples/availability_read.yaml). ### Standard test options - + ``` -Options: - --network The type of network to be emulated [default: ideal] [possible - values: ideal, healthy, degraded] - --n-cores Number of cores to fetch availability for [default: 100] - --n-validators Number of validators to fetch chunks from [default: 500] - --min-pov-size The minimum pov size in KiB [default: 5120] - --max-pov-size The maximum pov size bytes [default: 5120] --n, --num-blocks The number of blocks the test is going to run [default: 1] --p, --peer-bandwidth The bandwidth of simulated remote peers in KiB --b, --bandwidth The bandwidth of our simulated node in KiB - --peer-error Simulated conection error ratio [0-100] - --peer-min-latency Minimum remote peer latency in milliseconds [0-5000] - --peer-max-latency Maximum remote peer latency in milliseconds [0-5000] - --profile Enable CPU Profiling with Pyroscope - --pyroscope-url Pyroscope Server URL [default: http://localhost:4040] - --pyroscope-sample-rate Pyroscope Sample Rate [default: 113] - --cache-misses Enable Cache Misses Profiling with Valgrind. 
Linux only, Valgrind - must be in the PATH --h, --help Print help + --network The type of network to be emulated [default: ideal] [possible values: ideal, healthy, + degraded] + --n-cores Number of cores to fetch availability for [default: 100] + --n-validators Number of validators to fetch chunks from [default: 500] + --min-pov-size The minimum pov size in KiB [default: 5120] + --max-pov-size The maximum pov size bytes [default: 5120] + -n, --num-blocks The number of blocks the test is going to run [default: 1] + -p, --peer-bandwidth The bandwidth of emulated remote peers in KiB + -b, --bandwidth The bandwidth of our node in KiB + --connectivity Emulated peer connection ratio [0-100] + --peer-mean-latency Mean remote peer latency in milliseconds [0-5000] + --peer-latency-std-dev Remote peer latency standard deviation + --profile Enable CPU Profiling with Pyroscope + --pyroscope-url Pyroscope Server URL [default: http://localhost:4040] + --pyroscope-sample-rate Pyroscope Sample Rate [default: 113] + --cache-misses Enable Cache Misses Profiling with Valgrind. Linux only, Valgrind must be in the PATH + -h, --help Print help ``` These apply to all test objectives, except `test-sequence` which relies on the values being specified in a file. @@ -152,8 +150,8 @@ Benchmark availability recovery strategies Usage: subsystem-bench data-availability-read [OPTIONS] Options: - -f, --fetch-from-backers Turbo boost AD Read by fetching the full availability datafrom backers first. Saves CPU - as we don't need to re-construct from chunks. Tipically this is only faster if nodes + -f, --fetch-from-backers Turbo boost AD Read by fetching the full availability datafrom backers first. Saves CPU + as we don't need to re-construct from chunks. Tipically this is only faster if nodes have enough bandwidth -h, --help Print help ``` @@ -181,9 +179,9 @@ Let's run an availabilty read test which will recover availability for 10 cores node validator network. ``` - target/testnet/subsystem-bench --n-cores 10 data-availability-read -[2023-11-28T09:01:59Z INFO subsystem_bench::core::display] n_validators = 500, n_cores = 10, pov_size = 5120 - 5120, - error = 0, latency = None + target/testnet/subsystem-bench --n-cores 10 data-availability-read +[2023-11-28T09:01:59Z INFO subsystem_bench::core::display] n_validators = 500, n_cores = 10, pov_size = 5120 - 5120, + latency = None [2023-11-28T09:01:59Z INFO subsystem-bench::availability] Generating template candidate index=0 pov_size=5242880 [2023-11-28T09:01:59Z INFO subsystem-bench::availability] Created test environment. [2023-11-28T09:01:59Z INFO subsystem-bench::availability] Pre-generating 10 candidates. @@ -196,8 +194,8 @@ node validator network. 
[2023-11-28T09:02:07Z INFO subsystem_bench::availability] All blocks processed in 6001ms [2023-11-28T09:02:07Z INFO subsystem_bench::availability] Throughput: 51200 KiB/block [2023-11-28T09:02:07Z INFO subsystem_bench::availability] Block time: 6001 ms -[2023-11-28T09:02:07Z INFO subsystem_bench::availability] - +[2023-11-28T09:02:07Z INFO subsystem_bench::availability] + Total received from network: 66 MiB Total sent to network: 58 KiB Total subsystem CPU usage 4.16s diff --git a/polkadot/node/subsystem-bench/examples/availability_read.yaml b/polkadot/node/subsystem-bench/examples/availability_read.yaml index 311ea972141f..82355b0e2973 100644 --- a/polkadot/node/subsystem-bench/examples/availability_read.yaml +++ b/polkadot/node/subsystem-bench/examples/availability_read.yaml @@ -1,7 +1,7 @@ TestConfiguration: # Test 1 - objective: !DataAvailabilityRead - fetch_from_backers: false + fetch_from_backers: true n_validators: 300 n_cores: 20 min_pov_size: 5120 @@ -9,18 +9,14 @@ TestConfiguration: peer_bandwidth: 52428800 bandwidth: 52428800 latency: - min_latency: - secs: 0 - nanos: 1000000 - max_latency: - secs: 0 - nanos: 100000000 - error: 3 + mean_latency_ms: 100 + std_dev: 1 num_blocks: 3 + connectivity: 90 # Test 2 - objective: !DataAvailabilityRead - fetch_from_backers: false + fetch_from_backers: true n_validators: 500 n_cores: 20 min_pov_size: 5120 @@ -28,18 +24,14 @@ TestConfiguration: peer_bandwidth: 52428800 bandwidth: 52428800 latency: - min_latency: - secs: 0 - nanos: 1000000 - max_latency: - secs: 0 - nanos: 100000000 - error: 3 + mean_latency_ms: 100 + std_dev: 1 num_blocks: 3 + connectivity: 90 # Test 3 - objective: !DataAvailabilityRead - fetch_from_backers: false + fetch_from_backers: true n_validators: 1000 n_cores: 20 min_pov_size: 5120 @@ -47,11 +39,7 @@ TestConfiguration: peer_bandwidth: 52428800 bandwidth: 52428800 latency: - min_latency: - secs: 0 - nanos: 1000000 - max_latency: - secs: 0 - nanos: 100000000 - error: 3 + mean_latency_ms: 100 + std_dev: 1 num_blocks: 3 + connectivity: 90 diff --git a/polkadot/node/subsystem-bench/examples/availability_write.yaml b/polkadot/node/subsystem-bench/examples/availability_write.yaml new file mode 100644 index 000000000000..64e07d769692 --- /dev/null +++ b/polkadot/node/subsystem-bench/examples/availability_write.yaml @@ -0,0 +1,15 @@ +TestConfiguration: +# Test 1kV, 200 cores, max Pov +- objective: DataAvailabilityWrite + n_validators: 1000 + n_cores: 200 + max_validators_per_core: 5 + min_pov_size: 5120 + max_pov_size: 5120 + peer_bandwidth: 52428800 + bandwidth: 52428800 + latency: + mean_latency_ms: 30 + std_dev: 2.0 + connectivity: 75 + num_blocks: 3 diff --git a/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs new file mode 100644 index 000000000000..18ea2f72891f --- /dev/null +++ b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs @@ -0,0 +1,57 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::*; + +use polkadot_node_metrics::metrics::Metrics; + +use polkadot_node_core_av_store::Config; +use polkadot_node_subsystem_util::database::Database; + +use polkadot_node_core_av_store::AvailabilityStoreSubsystem; + +mod columns { + pub const DATA: u32 = 0; + pub const META: u32 = 1; + pub const NUM_COLUMNS: u32 = 2; +} + +const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META }; + +struct DumbOracle; + +impl sp_consensus::SyncOracle for DumbOracle { + fn is_major_syncing(&self) -> bool { + false + } + + fn is_offline(&self) -> bool { + unimplemented!("oh no!") + } +} + +pub fn new_av_store(dependencies: &TestEnvironmentDependencies) -> AvailabilityStoreSubsystem { + let metrics = Metrics::try_register(&dependencies.registry).unwrap(); + + AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(DumbOracle), metrics) +} + +fn test_store() -> Arc { + let db = kvdb_memorydb::create(columns::NUM_COLUMNS); + let db = + polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[columns::META]); + Arc::new(db) +} diff --git a/polkadot/node/subsystem-bench/src/availability/mod.rs b/polkadot/node/subsystem-bench/src/availability/mod.rs index faedccdf3e42..3a42190e6e57 100644 --- a/polkadot/node/subsystem-bench/src/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/availability/mod.rs @@ -13,25 +13,41 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use crate::{core::mock::ChainApiState, TestEnvironment}; +use av_store::NetworkAvailabilityState; +use bitvec::bitvec; +use colored::Colorize; use itertools::Itertools; -use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant}; - -use crate::TestEnvironment; +use polkadot_availability_bitfield_distribution::BitfieldDistribution; +use polkadot_node_core_av_store::AvailabilityStoreSubsystem; use polkadot_node_subsystem::{Overseer, OverseerConnector, SpawnGlue}; -use polkadot_node_subsystem_test_helpers::derive_erasure_chunks_with_proofs_and_root; +use polkadot_node_subsystem_types::{ + messages::{AvailabilityStoreMessage, NetworkBridgeEvent}, + Span, +}; use polkadot_overseer::Handle as OverseerHandle; -use sc_network::request_responses::ProtocolConfig; - -use colored::Colorize; +use sc_network::{request_responses::ProtocolConfig, PeerId}; +use sp_core::H256; +use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant}; +use av_store_helpers::new_av_store; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; +use polkadot_availability_distribution::{ + AvailabilityDistributionSubsystem, IncomingRequestReceivers, +}; use polkadot_node_metrics::metrics::Metrics; use polkadot_availability_recovery::AvailabilityRecoverySubsystem; +use polkadot_node_primitives::{AvailableData, ErasureChunk}; use crate::GENESIS_HASH; use parity_scale_codec::Encode; -use polkadot_node_network_protocol::request_response::{IncomingRequest, ReqProtocolNames}; +use polkadot_node_network_protocol::{ + request_response::{v1::ChunkFetchingRequest, IncomingRequest, ReqProtocolNames}, + OurView, Versioned, VersionedValidationProtocol, +}; +use sc_network::request_responses::IncomingRequest as RawIncomingRequest; + use polkadot_node_primitives::{BlockData, PoV}; use polkadot_node_subsystem::messages::{AllMessages, AvailabilityRecoveryMessage}; @@ -39,8 +55,8 @@ use crate::core::{ 
environment::TestEnvironmentDependencies, mock::{ av_store, - network_bridge::{self, MockNetworkBridgeTx, NetworkAvailabilityState}, - runtime_api, MockAvailabilityStore, MockRuntimeApi, + network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx}, + runtime_api, MockAvailabilityStore, MockChainApi, MockRuntimeApi, }, }; @@ -48,24 +64,26 @@ use super::core::{configuration::TestConfiguration, mock::dummy_builder, network const LOG_TARGET: &str = "subsystem-bench::availability"; -use polkadot_node_primitives::{AvailableData, ErasureChunk}; - use super::{cli::TestObjective, core::mock::AlwaysSupportsParachains}; -use polkadot_node_subsystem_test_helpers::mock::new_block_import_info; +use polkadot_node_subsystem_test_helpers::{ + derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info, +}; use polkadot_primitives::{ - CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData, PersistedValidationData, + AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData, + Header, PersistedValidationData, Signed, SigningContext, ValidatorIndex, }; use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash}; use sc_service::SpawnTaskHandle; +mod av_store_helpers; mod cli; pub use cli::{DataAvailabilityReadOptions, NetworkEmulation}; -fn build_overseer( +fn build_overseer_for_availability_read( spawn_task_handle: SpawnTaskHandle, runtime_api: MockRuntimeApi, av_store: MockAvailabilityStore, - network_bridge: MockNetworkBridgeTx, + network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx), availability_recovery: AvailabilityRecoverySubsystem, ) -> (Overseer, AlwaysSupportsParachains>, OverseerHandle) { let overseer_connector = OverseerConnector::with_event_capacity(64000); @@ -73,7 +91,8 @@ fn build_overseer( let builder = dummy .replace_runtime_api(|_| runtime_api) .replace_availability_store(|_| av_store) - .replace_network_bridge_tx(|_| network_bridge) + .replace_network_bridge_tx(|_| network_bridge.0) + .replace_network_bridge_rx(|_| network_bridge.1) .replace_availability_recovery(|_| availability_recovery); let (overseer, raw_handle) = @@ -82,11 +101,38 @@ fn build_overseer( (overseer, OverseerHandle::new(raw_handle)) } -/// Takes a test configuration and uses it to creates the `TestEnvironment`. +fn build_overseer_for_availability_write( + spawn_task_handle: SpawnTaskHandle, + runtime_api: MockRuntimeApi, + network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx), + availability_distribution: AvailabilityDistributionSubsystem, + chain_api: MockChainApi, + availability_store: AvailabilityStoreSubsystem, + bitfield_distribution: BitfieldDistribution, +) -> (Overseer, AlwaysSupportsParachains>, OverseerHandle) { + let overseer_connector = OverseerConnector::with_event_capacity(64000); + let dummy = dummy_builder!(spawn_task_handle); + let builder = dummy + .replace_runtime_api(|_| runtime_api) + .replace_availability_store(|_| availability_store) + .replace_network_bridge_tx(|_| network_bridge.0) + .replace_network_bridge_rx(|_| network_bridge.1) + .replace_chain_api(|_| chain_api) + .replace_bitfield_distribution(|_| bitfield_distribution) + // This is needed to test own chunk recovery for `n_cores`. + .replace_availability_distribution(|_| availability_distribution); + + let (overseer, raw_handle) = + builder.build_with_connector(overseer_connector).expect("Should not fail"); + + (overseer, OverseerHandle::new(raw_handle)) +} + +/// Takes a test configuration and uses it to create the `TestEnvironment`. 
pub fn prepare_test( config: TestConfiguration, state: &mut TestState, -) -> (TestEnvironment, ProtocolConfig) { +) -> (TestEnvironment, Vec) { prepare_test_inner(config, state, TestEnvironmentDependencies::default()) } @@ -94,14 +140,38 @@ fn prepare_test_inner( config: TestConfiguration, state: &mut TestState, dependencies: TestEnvironmentDependencies, -) -> (TestEnvironment, ProtocolConfig) { +) -> (TestEnvironment, Vec) { // Generate test authorities. let test_authorities = config.generate_authorities(); - let runtime_api = runtime_api::MockRuntimeApi::new(config.clone(), test_authorities.clone()); + let mut candidate_hashes: HashMap> = HashMap::new(); - let av_store = - av_store::MockAvailabilityStore::new(state.chunks.clone(), state.candidate_hashes.clone()); + // Prepare per block candidates. + // Genesis block is always finalized, so we start at 1. + for block_num in 1..=config.num_blocks { + for _ in 0..config.n_cores { + candidate_hashes + .entry(Hash::repeat_byte(block_num as u8)) + .or_default() + .push(state.next_candidate().expect("Cycle iterator")) + } + + // First candidate is our backed candidate. + state.backed_candidates.push( + candidate_hashes + .get(&Hash::repeat_byte(block_num as u8)) + .expect("just inserted above") + .get(0) + .expect("just inserted above") + .clone(), + ); + } + + let runtime_api = runtime_api::MockRuntimeApi::new( + config.clone(), + test_authorities.clone(), + candidate_hashes, + ); let availability_state = NetworkAvailabilityState { candidate_hashes: state.candidate_hashes.clone(), @@ -109,45 +179,112 @@ fn prepare_test_inner( chunks: state.chunks.clone(), }; - let req_protocol_names = ReqProtocolNames::new(GENESIS_HASH, None); - let (collation_req_receiver, req_cfg) = - IncomingRequest::get_config_receiver(&req_protocol_names); + let mut req_cfgs = Vec::new(); - let network = - NetworkEmulator::new(&config, &dependencies, &test_authorities, req_protocol_names); + let (collation_req_receiver, collation_req_cfg) = + IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None)); + req_cfgs.push(collation_req_cfg); + + let (pov_req_receiver, pov_req_cfg) = + IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None)); + + let (chunk_req_receiver, chunk_req_cfg) = + IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None)); + req_cfgs.push(pov_req_cfg); + + let (network, network_interface, network_receiver) = + new_network(&config, &dependencies, &test_authorities, vec![Arc::new(availability_state)]); let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new( - config.clone(), - availability_state, network.clone(), + network_interface.subsystem_sender(), ); - let use_fast_path = match &state.config().objective { - TestObjective::DataAvailabilityRead(options) => options.fetch_from_backers, - _ => panic!("Unexpected objective"), - }; + let network_bridge_rx = + network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg.clone())); + + let (overseer, overseer_handle) = match &state.config().objective { + TestObjective::DataAvailabilityRead(options) => { + let use_fast_path = options.fetch_from_backers; + + let subsystem = if use_fast_path { + AvailabilityRecoverySubsystem::with_fast_path( + collation_req_receiver, + Metrics::try_register(&dependencies.registry).unwrap(), + ) + } else { + AvailabilityRecoverySubsystem::with_chunks_only( + collation_req_receiver, + Metrics::try_register(&dependencies.registry).unwrap(), + ) + }; - let subsystem = if use_fast_path { - 
AvailabilityRecoverySubsystem::with_fast_path( - collation_req_receiver, - Metrics::try_register(&dependencies.registry).unwrap(), - ) - } else { - AvailabilityRecoverySubsystem::with_chunks_only( - collation_req_receiver, - Metrics::try_register(&dependencies.registry).unwrap(), - ) - }; + // Use a mocked av-store. + let av_store = av_store::MockAvailabilityStore::new( + state.chunks.clone(), + state.candidate_hashes.clone(), + ); - let (overseer, overseer_handle) = build_overseer( - dependencies.task_manager.spawn_handle(), - runtime_api, - av_store, - network_bridge_tx, - subsystem, - ); + build_overseer_for_availability_read( + dependencies.task_manager.spawn_handle(), + runtime_api, + av_store, + (network_bridge_tx, network_bridge_rx), + subsystem, + ) + }, + TestObjective::DataAvailabilityWrite => { + let availability_distribution = AvailabilityDistributionSubsystem::new( + test_authorities.keyring.keystore(), + IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, + Metrics::try_register(&dependencies.registry).unwrap(), + ); + + let block_headers = (0..=config.num_blocks) + .map(|block_number| { + ( + Hash::repeat_byte(block_number as u8), + Header { + digest: Default::default(), + number: block_number as BlockNumber, + parent_hash: Default::default(), + extrinsics_root: Default::default(), + state_root: Default::default(), + }, + ) + }) + .collect::>(); + + let chain_api_state = ChainApiState { block_headers }; + let chain_api = MockChainApi::new(chain_api_state); + let bitfield_distribution = + BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap()); + build_overseer_for_availability_write( + dependencies.task_manager.spawn_handle(), + runtime_api, + (network_bridge_tx, network_bridge_rx), + availability_distribution, + chain_api, + new_av_store(&dependencies), + bitfield_distribution, + ) + }, + _ => { + unimplemented!("Invalid test objective") + }, + }; - (TestEnvironment::new(dependencies, config, network, overseer, overseer_handle), req_cfg) + ( + TestEnvironment::new( + dependencies, + config, + network, + overseer, + overseer_handle, + test_authorities, + ), + req_cfgs, + ) } #[derive(Clone)] @@ -169,6 +306,8 @@ pub struct TestState { available_data: Vec, // Per candiadte index chunks chunks: Vec>, + // Per relay chain block - candidate backed by our backing group + backed_candidates: Vec, } impl TestState { @@ -255,24 +394,27 @@ impl TestState { candidate_receipt_templates.push(candidate_receipt); } - let pov_sizes = config.pov_sizes().to_owned(); - let pov_sizes = pov_sizes.into_iter().cycle(); gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue()); let mut _self = Self { - config, available_data, candidate_receipt_templates, chunks, pov_size_to_candidate, - pov_sizes, + pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(), candidate_hashes: HashMap::new(), candidates: Vec::new().into_iter().cycle(), + backed_candidates: Vec::new(), + config, }; _self.generate_candidates(); _self } + + pub fn backed_candidates(&mut self) -> &mut Vec { + &mut self.backed_candidates + } } pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: TestState) { @@ -280,15 +422,15 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await; - let start_marker = Instant::now(); + let test_start = Instant::now(); let mut batch = FuturesUnordered::new(); let mut availability_bytes = 0u128; 
env.metrics().set_n_validators(config.n_validators); env.metrics().set_n_cores(config.n_cores); - for block_num in 0..env.config().num_blocks { - gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks); + for block_num in 1..=env.config().num_blocks { + gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks); env.metrics().set_current_block(block_num); let block_start_ts = Instant::now(); @@ -311,7 +453,7 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T env.send_message(message).await; } - gum::info!("{}", format!("{} recoveries pending", batch.len()).bright_black()); + gum::info!(target: LOG_TARGET, "{}", format!("{} recoveries pending", batch.len()).bright_black()); while let Some(completed) = batch.next().await { let available_data = completed.unwrap().unwrap(); env.metrics().on_pov_size(available_data.encoded_size()); @@ -320,22 +462,199 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T let block_time = Instant::now().sub(block_start_ts).as_millis() as u64; env.metrics().set_block_time(block_time); - gum::info!("All work for block completed in {}", format!("{:?}ms", block_time).cyan()); + gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{:?}ms", block_time).cyan()); } - let duration: u128 = start_marker.elapsed().as_millis(); + let duration: u128 = test_start.elapsed().as_millis(); let availability_bytes = availability_bytes / 1024; - gum::info!("All blocks processed in {}", format!("{:?}ms", duration).cyan()); - gum::info!( + gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan()); + gum::info!(target: LOG_TARGET, "Throughput: {}", format!("{} KiB/block", availability_bytes / env.config().num_blocks as u128).bright_red() ); - gum::info!( - "Block time: {}", - format!("{} ms", start_marker.elapsed().as_millis() / env.config().num_blocks as u128) - .red() + gum::info!(target: LOG_TARGET, + "Avg block time: {}", + format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red() ); - gum::info!("{}", &env); + env.display_network_usage(); + env.display_cpu_usage(&["availability-recovery"]); env.stop().await; } + +pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state: TestState) { + let config = env.config().clone(); + + env.metrics().set_n_validators(config.n_validators); + env.metrics().set_n_cores(config.n_cores); + + gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ..."); + for backed_candidate in state.backed_candidates().clone() { + let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap(); + let available_data = state.available_data[candidate_index].clone(); + let (tx, rx) = oneshot::channel(); + env.send_message(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData { + candidate_hash: backed_candidate.hash(), + n_validators: config.n_validators as u32, + available_data, + expected_erasure_root: backed_candidate.descriptor().erasure_root, + tx, + }, + )) + .await; + + rx.await + .unwrap() + .expect("Test candidates are stored nicely in availability store"); + } + + gum::info!(target: LOG_TARGET, "Done"); + + let test_start = Instant::now(); + + for block_num in 1..=env.config().num_blocks { + gum::info!(target: LOG_TARGET, "Current block #{}", block_num); + env.metrics().set_current_block(block_num); + + let block_start_ts = Instant::now(); + let 
relay_block_hash = Hash::repeat_byte(block_num as u8); + env.import_block(new_block_import_info(relay_block_hash, block_num as BlockNumber)) + .await; + + // Inform bitfield distribution about our view of current test block + let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::OurViewChange(OurView::new(vec![(relay_block_hash, Arc::new(Span::Disabled))], 0)) + ); + env.send_message(AllMessages::BitfieldDistribution(message)).await; + + let chunk_fetch_start_ts = Instant::now(); + + // Request chunks of our own backed candidate from all other validators. + let mut receivers = Vec::new(); + for index in 1..config.n_validators { + let (pending_response, pending_response_receiver) = oneshot::channel(); + + let request = RawIncomingRequest { + peer: PeerId::random(), + payload: ChunkFetchingRequest { + candidate_hash: state.backed_candidates()[block_num - 1].hash(), + index: ValidatorIndex(index as u32), + } + .encode(), + pending_response, + }; + + let peer = env + .authorities() + .validator_authority_id + .get(index) + .expect("all validators have keys"); + + if env.network().is_peer_connected(peer) && + env.network().send_request_from_peer(peer, request).is_ok() + { + receivers.push(pending_response_receiver); + } + } + + gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ..."); + for receiver in receivers.into_iter() { + let response = receiver.await.expect("Chunk is always served succesfully"); + // TODO: check if chunk is the one the peer expects to receive. + assert!(response.result.is_ok()); + } + + let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis(); + + gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration); + + let signing_context = SigningContext { session_index: 0, parent_hash: relay_block_hash }; + let network = env.network().clone(); + let authorities = env.authorities().clone(); + let n_validators = config.n_validators; + + // Spawn a task that will generate `n_validator` - 1 signed bitfiends and + // send them from the emulated peers to the subsystem. + // TODO: Implement topology. + env.spawn_blocking("send-bitfields", async move { + for index in 1..n_validators { + let validator_public = + authorities.validator_public.get(index).expect("All validator keys are known"); + + // Node has all the chunks in the world. + let payload: AvailabilityBitfield = + AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]); + // TODO(soon): Use pre-signed messages. This is quite intensive on the CPU. + let signed_bitfield = Signed::::sign( + &authorities.keyring.keystore(), + payload, + &signing_context, + ValidatorIndex(index as u32), + validator_public, + ) + .ok() + .flatten() + .expect("should be signed"); + + let from_peer = &authorities.validator_authority_id[index]; + + let message = peer_bitfield_message_v2(relay_block_hash, signed_bitfield); + + // Send the action from peer only if it is connected to our node. + if network.is_peer_connected(from_peer) { + let _ = network.send_message_from_peer(from_peer, message); + } + } + }); + + gum::info!( + "Waiting for {} bitfields to be received and processed", + config.connected_count() + ); + + // Wait for all bitfields to be processed. 
+ env.wait_until_metric_eq( + "polkadot_parachain_received_availabilty_bitfields_total", + config.connected_count() * block_num, + ) + .await; + + gum::info!(target: LOG_TARGET, "All bitfields processed"); + + let block_time = Instant::now().sub(block_start_ts).as_millis() as u64; + env.metrics().set_block_time(block_time); + gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{:?}ms", block_time).cyan()); + } + + let duration: u128 = test_start.elapsed().as_millis(); + gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan()); + gum::info!(target: LOG_TARGET, + "Avg block time: {}", + format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red() + ); + + env.display_network_usage(); + + env.display_cpu_usage(&[ + "availability-distribution", + "bitfield-distribution", + "availability-store", + ]); + + env.stop().await; +} + +pub fn peer_bitfield_message_v2( + relay_hash: H256, + signed_bitfield: Signed, +) -> VersionedValidationProtocol { + let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield( + relay_hash, + signed_bitfield.into(), + ); + + Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution( + bitfield, + )) +} diff --git a/polkadot/node/subsystem-bench/src/cli.rs b/polkadot/node/subsystem-bench/src/cli.rs index 3352f33a3503..7213713eb6ba 100644 --- a/polkadot/node/subsystem-bench/src/cli.rs +++ b/polkadot/node/subsystem-bench/src/cli.rs @@ -24,12 +24,14 @@ pub struct TestSequenceOptions { pub path: String, } -/// Define the supported benchmarks targets +/// Supported test objectives #[derive(Debug, Clone, clap::Parser, Serialize, Deserialize)] #[command(rename_all = "kebab-case")] pub enum TestObjective { /// Benchmark availability recovery strategies. DataAvailabilityRead(DataAvailabilityReadOptions), + /// Benchmark availability and bitfield distribution. + DataAvailabilityWrite, /// Run a test sequence specified in a file TestSequence(TestSequenceOptions), } diff --git a/polkadot/node/subsystem-bench/src/core/configuration.rs b/polkadot/node/subsystem-bench/src/core/configuration.rs index 164addb51900..66da8a1db45d 100644 --- a/polkadot/node/subsystem-bench/src/core/configuration.rs +++ b/polkadot/node/subsystem-bench/src/core/configuration.rs @@ -17,11 +17,13 @@ //! Test configuration definition and helpers. use super::*; use keyring::Keyring; -use std::{path::Path, time::Duration}; +use std::path::Path; pub use crate::cli::TestObjective; use polkadot_primitives::{AuthorityDiscoveryId, ValidatorId}; -use rand::{distributions::Uniform, prelude::Distribution, thread_rng}; +use rand::thread_rng; +use rand_distr::{Distribution, Normal, Uniform}; + use serde::{Deserialize, Serialize}; pub fn random_pov_size(min_pov_size: usize, max_pov_size: usize) -> usize { @@ -34,13 +36,13 @@ fn random_uniform_sample + From>(min_value: T, max_value: .into() } -/// Peer response latency configuration. +/// Peer networking latency configuration. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct PeerLatency { - /// Min latency for `NetworkAction` completion. - pub min_latency: Duration, - /// Max latency or `NetworkAction` completion. - pub max_latency: Duration, + /// The mean latency(milliseconds) of the peers. + pub mean_latency_ms: usize, + /// The standard deviation + pub std_dev: f64, } // Default PoV size in KiB. 
@@ -58,6 +60,11 @@ fn default_connectivity() -> usize { 100 } +// Default backing group size +fn default_backing_group_size() -> usize { + 5 +} + /// The test input parameters #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TestConfiguration { @@ -67,6 +74,9 @@ pub struct TestConfiguration { pub n_validators: usize, /// Number of cores pub n_cores: usize, + /// Maximum backing group size + #[serde(default = "default_backing_group_size")] + pub max_validators_per_core: usize, /// The min PoV size #[serde(default = "default_pov_size")] pub min_pov_size: usize, @@ -82,12 +92,9 @@ pub struct TestConfiguration { /// The amount of bandiwdth our node has. #[serde(default = "default_bandwidth")] pub bandwidth: usize, - /// Optional peer emulation latency + /// Optional peer emulation latency (round trip time) wrt node under test #[serde(default)] pub latency: Option, - /// Error probability, applies to sending messages to the emulated network peers - #[serde(default)] - pub error: usize, /// Connectivity ratio, the percentage of peers we are not connected to, but ar part of /// the topology. #[serde(default = "default_connectivity")] @@ -129,7 +136,7 @@ impl TestSequence { /// Helper struct for authority related state. #[derive(Clone)] pub struct TestAuthorities { - pub keyrings: Vec, + pub keyring: Keyring, pub validator_public: Vec, pub validator_authority_id: Vec, } @@ -146,25 +153,27 @@ impl TestConfiguration { pub fn pov_sizes(&self) -> &[usize] { &self.pov_sizes } + /// Return the number of peers connected to our node. + pub fn connected_count(&self) -> usize { + ((self.n_validators - 1) as f64 / (100.0 / self.connectivity as f64)) as usize + } /// Generates the authority keys we need for the network emulation. pub fn generate_authorities(&self) -> TestAuthorities { - let keyrings = (0..self.n_validators) - .map(|peer_index| Keyring::new(format!("Node{}", peer_index))) + let keyring = Keyring::default(); + + let keys = (0..self.n_validators) + .map(|peer_index| keyring.sr25519_new(format!("Node{}", peer_index))) .collect::>(); // Generate `AuthorityDiscoveryId`` for each peer - let validator_public: Vec = keyrings - .iter() - .map(|keyring: &Keyring| keyring.clone().public().into()) - .collect::>(); + let validator_public: Vec = + keys.iter().map(|key| (*key).into()).collect::>(); - let validator_authority_id: Vec = keyrings - .iter() - .map(|keyring| keyring.clone().public().into()) - .collect::>(); + let validator_authority_id: Vec = + keys.iter().map(|key| (*key).into()).collect::>(); - TestAuthorities { keyrings, validator_public, validator_authority_id } + TestAuthorities { keyring, validator_public, validator_authority_id } } /// An unconstrained standard configuration matching Polkadot/Kusama @@ -180,12 +189,12 @@ impl TestConfiguration { objective, n_cores, n_validators, + max_validators_per_core: 5, pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size), bandwidth: 50 * 1024 * 1024, peer_bandwidth: 50 * 1024 * 1024, // No latency latency: None, - error: 0, num_blocks, min_pov_size, max_pov_size, @@ -205,14 +214,11 @@ impl TestConfiguration { objective, n_cores, n_validators, + max_validators_per_core: 5, pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size), bandwidth: 50 * 1024 * 1024, peer_bandwidth: 50 * 1024 * 1024, - latency: Some(PeerLatency { - min_latency: Duration::from_millis(1), - max_latency: Duration::from_millis(100), - }), - error: 3, + latency: Some(PeerLatency { mean_latency_ms: 50, std_dev: 12.5 }), num_blocks, min_pov_size, 
max_pov_size, @@ -232,14 +238,11 @@ impl TestConfiguration { objective, n_cores, n_validators, + max_validators_per_core: 5, pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size), bandwidth: 50 * 1024 * 1024, peer_bandwidth: 50 * 1024 * 1024, - latency: Some(PeerLatency { - min_latency: Duration::from_millis(10), - max_latency: Duration::from_millis(500), - }), - error: 33, + latency: Some(PeerLatency { mean_latency_ms: 150, std_dev: 40.0 }), num_blocks, min_pov_size, max_pov_size, @@ -248,15 +251,14 @@ impl TestConfiguration { } } -/// Produce a randomized duration between `min` and `max`. -pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> Option { - maybe_peer_latency.map(|peer_latency| { - Uniform::from(peer_latency.min_latency..=peer_latency.max_latency).sample(&mut thread_rng()) - }) -} - -/// Generate a random error based on `probability`. -/// `probability` should be a number between 0 and 100. -pub fn random_error(probability: usize) -> bool { - Uniform::from(0..=99).sample(&mut thread_rng()) < probability +/// Sample latency (in milliseconds) from a normal distribution with parameters +/// specified in `maybe_peer_latency`. +pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> usize { + maybe_peer_latency + .map(|latency_config| { + Normal::new(latency_config.mean_latency_ms as f64, latency_config.std_dev) + .expect("normal distribution parameters are good") + .sample(&mut thread_rng()) + }) + .unwrap_or(0.0) as usize } diff --git a/polkadot/node/subsystem-bench/src/core/display.rs b/polkadot/node/subsystem-bench/src/core/display.rs index d600cc484c14..bca82d7b90ae 100644 --- a/polkadot/node/subsystem-bench/src/core/display.rs +++ b/polkadot/node/subsystem-bench/src/core/display.rs @@ -180,12 +180,13 @@ pub fn parse_metrics(registry: &Registry) -> MetricCollection { pub fn display_configuration(test_config: &TestConfiguration) { gum::info!( - "{}, {}, {}, {}, {}", + "[{}] {}, {}, {}, {}, {}", + format!("objective = {:?}", test_config.objective).green(), format!("n_validators = {}", test_config.n_validators).blue(), format!("n_cores = {}", test_config.n_cores).blue(), format!("pov_size = {} - {}", test_config.min_pov_size, test_config.max_pov_size) .bright_black(), - format!("error = {}", test_config.error).bright_black(), + format!("connectivity = {}", test_config.connectivity).bright_black(), format!("latency = {:?}", test_config.latency).bright_black(), ); } diff --git a/polkadot/node/subsystem-bench/src/core/environment.rs b/polkadot/node/subsystem-bench/src/core/environment.rs index 247596474078..b6846316430b 100644 --- a/polkadot/node/subsystem-bench/src/core/environment.rs +++ b/polkadot/node/subsystem-bench/src/core/environment.rs @@ -15,12 +15,12 @@ // along with Polkadot. If not, see . //! 
Test environment implementation use crate::{ - core::{mock::AlwaysSupportsParachains, network::NetworkEmulator}, + core::{mock::AlwaysSupportsParachains, network::NetworkEmulatorHandle}, TestConfiguration, }; use colored::Colorize; use core::time::Duration; -use futures::FutureExt; +use futures::{Future, FutureExt}; use polkadot_overseer::{BlockInfo, Handle as OverseerHandle}; use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt}; @@ -29,15 +29,12 @@ use polkadot_node_subsystem_util::metrics::prometheus::{ self, Gauge, Histogram, PrometheusError, Registry, U64, }; -use sc_network::peer_store::LOG_TARGET; use sc_service::{SpawnTaskHandle, TaskManager}; -use std::{ - fmt::Display, - net::{Ipv4Addr, SocketAddr}, -}; +use std::net::{Ipv4Addr, SocketAddr}; use tokio::runtime::Handle; -const MIB: f64 = 1024.0 * 1024.0; +const LOG_TARGET: &str = "subsystem-bench::environment"; +use super::configuration::TestAuthorities; /// Test environment/configuration metrics #[derive(Clone)] @@ -56,9 +53,8 @@ pub struct TestEnvironmentMetrics { impl TestEnvironmentMetrics { pub fn new(registry: &Registry) -> Result { - let mut buckets = prometheus::exponential_buckets(16384.0, 2.0, 9) + let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9) .expect("arguments are always valid; qed"); - buckets.extend(vec![5.0 * MIB, 6.0 * MIB, 7.0 * MIB, 8.0 * MIB, 9.0 * MIB, 10.0 * MIB]); Ok(Self { n_validators: prometheus::register( @@ -150,7 +146,7 @@ pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff); // We use this to bail out sending messages to the subsystem if it is overloaded such that // the time of flight is breaches 5s. // This should eventually be a test parameter. -const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000); +pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000); /// The test environment is the high level wrapper of all things required to test /// a certain subsystem. @@ -189,9 +185,11 @@ pub struct TestEnvironment { /// The test configuration. config: TestConfiguration, /// A handle to the network emulator. - network: NetworkEmulator, + network: NetworkEmulatorHandle, /// Configuration/env metrics metrics: TestEnvironmentMetrics, + /// Test authorities generated from the configuration. + authorities: TestAuthorities, } impl TestEnvironment { @@ -199,9 +197,10 @@ impl TestEnvironment { pub fn new( dependencies: TestEnvironmentDependencies, config: TestConfiguration, - network: NetworkEmulator, + network: NetworkEmulatorHandle, overseer: Overseer, AlwaysSupportsParachains>, overseer_handle: OverseerHandle, + authorities: TestAuthorities, ) -> Self { let metrics = TestEnvironmentMetrics::new(&dependencies.registry) .expect("Metrics need to be registered"); @@ -230,30 +229,62 @@ impl TestEnvironment { config, network, metrics, + authorities, } } + /// Returns the test configuration. pub fn config(&self) -> &TestConfiguration { &self.config } - pub fn network(&self) -> &NetworkEmulator { + /// Returns a reference to the inner network emulator handle. + pub fn network(&self) -> &NetworkEmulatorHandle { &self.network } + /// Returns the Prometheus registry. pub fn registry(&self) -> &Registry { &self.dependencies.registry } + /// Spawn a named task in the `test-environment` task group. 
+ #[allow(unused)] + pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { + self.dependencies + .task_manager + .spawn_handle() + .spawn(name, "test-environment", task); + } + + /// Spawn a blocking named task in the `test-environment` task group. + pub fn spawn_blocking( + &self, + name: &'static str, + task: impl Future + Send + 'static, + ) { + self.dependencies.task_manager.spawn_handle().spawn_blocking( + name, + "test-environment", + task, + ); + } + /// Returns a reference to the test environment metrics instance pub fn metrics(&self) -> &TestEnvironmentMetrics { &self.metrics } + /// Returns a handle to the tokio runtime. pub fn runtime(&self) -> Handle { self.runtime_handle.clone() } - // Send a message to the subsystem under test environment. + /// Returns a reference to the authority keys used in the test. + pub fn authorities(&self) -> &TestAuthorities { + &self.authorities + } + + /// Send a message to the subsystem under test environment. pub async fn send_message(&mut self, msg: AllMessages) { self.overseer_handle .send_msg(msg, LOG_TARGET) @@ -264,7 +295,7 @@ impl TestEnvironment { }); } - // Send an `ActiveLeavesUpdate` signal to all subsystems under test. + /// Send an `ActiveLeavesUpdate` signal to all subsystems under test. pub async fn import_block(&mut self, block: BlockInfo) { self.overseer_handle .block_imported(block) @@ -275,59 +306,79 @@ impl TestEnvironment { }); } - // Stop overseer and subsystems. + /// Stop overseer and subsystems. pub async fn stop(&mut self) { self.overseer_handle.stop().await; } -} -impl Display for TestEnvironment { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let stats = self.network().stats(); - - writeln!(f, "\n")?; - writeln!( - f, - "Total received from network: {}", - format!( - "{} MiB", - stats - .iter() - .enumerate() - .map(|(_index, stats)| stats.tx_bytes_total as u128) - .sum::() / (1024 * 1024) - ) - .cyan() - )?; - writeln!( - f, - "Total sent to network: {}", - format!("{} KiB", stats[0].tx_bytes_total / (1024)).cyan() - )?; + /// Blocks until `metric_name` == `value` + pub async fn wait_until_metric_eq(&self, metric_name: &str, value: usize) { + let value = value as f64; + loop { + let test_metrics = super::display::parse_metrics(self.registry()); + let current_value = test_metrics.sum_by(metric_name); + gum::debug!(target: LOG_TARGET, metric_name, current_value, value, "Waiting for metric"); + if current_value == value { + break + } + + // Check value every 50ms. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + } + + /// Display network usage stats. + pub fn display_network_usage(&self) { + let stats = self.network().peer_stats(0); + + let total_node_received = stats.received() / 1024; + let total_node_sent = stats.sent() / 1024; + + println!( + "\nPayload bytes received from peers: {}, {}", + format!("{:.2} KiB total", total_node_received).blue(), + format!("{:.2} KiB/block", total_node_received / self.config().num_blocks) + .bright_blue() + ); + + println!( + "Payload bytes sent to peers: {}, {}", + format!("{:.2} KiB total", total_node_sent).blue(), + format!("{:.2} KiB/block", total_node_sent / self.config().num_blocks).bright_blue() + ); + } + + /// Print CPU usage stats in the CLI. 
+ pub fn display_cpu_usage(&self, subsystems_under_test: &[&str]) { let test_metrics = super::display::parse_metrics(self.registry()); - let subsystem_cpu_metrics = - test_metrics.subset_with_label_value("task_group", "availability-recovery"); - let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum"); - writeln!(f, "Total subsystem CPU usage {}", format!("{:.2}s", total_cpu).bright_purple())?; - writeln!( - f, - "CPU usage per block {}", - format!("{:.2}s", total_cpu / self.config().num_blocks as f64).bright_purple() - )?; + + for subsystem in subsystems_under_test.iter() { + let subsystem_cpu_metrics = + test_metrics.subset_with_label_value("task_group", subsystem); + let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum"); + println!( + "{} CPU usage {}", + subsystem.to_string().bright_green(), + format!("{:.3}s", total_cpu).bright_purple() + ); + println!( + "{} CPU usage per block {}", + subsystem.to_string().bright_green(), + format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple() + ); + } let test_env_cpu_metrics = test_metrics.subset_with_label_value("task_group", "test-environment"); let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum"); - writeln!( - f, + println!( "Total test environment CPU usage {}", - format!("{:.2}s", total_cpu).bright_purple() - )?; - writeln!( - f, - "CPU usage per block {}", - format!("{:.2}s", total_cpu / self.config().num_blocks as f64).bright_purple() + format!("{:.3}s", total_cpu).bright_purple() + ); + println!( + "Test environment CPU usage per block {}", + format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple() ) } } diff --git a/polkadot/node/subsystem-bench/src/core/keyring.rs b/polkadot/node/subsystem-bench/src/core/keyring.rs index 68e78069a918..6cf031f5712f 100644 --- a/polkadot/node/subsystem-bench/src/core/keyring.rs +++ b/polkadot/node/subsystem-bench/src/core/keyring.rs @@ -14,26 +14,34 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use sp_core::{ - sr25519::{Pair, Public}, - Pair as PairT, -}; -/// Set of test accounts. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +use polkadot_primitives::ValidatorId; +use sc_keystore::LocalKeystore; +use sp_application_crypto::AppCrypto; +pub use sp_core::sr25519; +use sp_core::sr25519::Public; +use sp_keystore::Keystore; +use std::sync::Arc; + +/// Set of test accounts generated and kept safe by a keystore. +#[derive(Clone)] pub struct Keyring { - name: String, + keystore: Arc, } -impl Keyring { - pub fn new(name: String) -> Keyring { - Self { name } +impl Default for Keyring { + fn default() -> Self { + Self { keystore: Arc::new(LocalKeystore::in_memory()) } } +} - pub fn pair(self) -> Pair { - Pair::from_string(&format!("//{}", self.name), None).expect("input is always good; qed") +impl Keyring { + pub fn sr25519_new(&self, name: String) -> Public { + self.keystore + .sr25519_generate_new(ValidatorId::ID, Some(&format!("//{}", name))) + .expect("Insert key into keystore") } - pub fn public(self) -> Public { - self.pair().public() + pub fn keystore(&self) -> Arc { + self.keystore.clone() } } diff --git a/polkadot/node/subsystem-bench/src/core/mock/av_store.rs b/polkadot/node/subsystem-bench/src/core/mock/av_store.rs index a471230f1b3f..76609ab5dba6 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/av_store.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/av_store.rs @@ -17,13 +17,18 @@ //! 
A generic av store subsystem mockup suitable to be used in benchmarks. use parity_scale_codec::Encode; +use polkadot_node_network_protocol::request_response::{ + v1::{AvailableDataFetchingResponse, ChunkFetchingResponse, ChunkResponse}, + Requests, +}; use polkadot_primitives::CandidateHash; +use sc_network::ProtocolName; use std::collections::HashMap; use futures::{channel::oneshot, FutureExt}; -use polkadot_node_primitives::ErasureChunk; +use polkadot_node_primitives::{AvailableData, ErasureChunk}; use polkadot_node_subsystem::{ messages::AvailabilityStoreMessage, overseer, SpawnedSubsystem, SubsystemError, @@ -31,6 +36,8 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_types::OverseerSignal; +use crate::core::network::{HandleNetworkMessage, NetworkMessage}; + pub struct AvailabilityStoreState { candidate_hashes: HashMap, chunks: Vec>, @@ -38,6 +45,75 @@ pub struct AvailabilityStoreState { const LOG_TARGET: &str = "subsystem-bench::av-store-mock"; +/// Mockup helper. Contains Ccunks and full availability data of all parachain blocks +/// used in a test. +pub struct NetworkAvailabilityState { + pub candidate_hashes: HashMap, + pub available_data: Vec, + pub chunks: Vec>, +} + +// Implement access to the state. +impl HandleNetworkMessage for NetworkAvailabilityState { + fn handle( + &self, + message: NetworkMessage, + _node_sender: &mut futures::channel::mpsc::UnboundedSender, + ) -> Option { + match message { + NetworkMessage::RequestFromNode(peer, request) => match request { + Requests::ChunkFetchingV1(outgoing_request) => { + gum::debug!(target: LOG_TARGET, request = ?outgoing_request, "Received `RequestFromNode`"); + let validator_index: usize = outgoing_request.payload.index.0 as usize; + let candidate_hash = outgoing_request.payload.candidate_hash; + + let candidate_index = self + .candidate_hashes + .get(&candidate_hash) + .expect("candidate was generated previously; qed"); + gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index"); + + let chunk: ChunkResponse = + self.chunks.get(*candidate_index).unwrap()[validator_index].clone().into(); + let response = Ok(( + ChunkFetchingResponse::from(Some(chunk)).encode(), + ProtocolName::Static("dummy"), + )); + + if let Err(err) = outgoing_request.pending_response.send(response) { + gum::error!(target: LOG_TARGET, ?err, "Failed to send `ChunkFetchingResponse`"); + } + + None + }, + Requests::AvailableDataFetchingV1(outgoing_request) => { + let candidate_hash = outgoing_request.payload.candidate_hash; + let candidate_index = self + .candidate_hashes + .get(&candidate_hash) + .expect("candidate was generated previously; qed"); + gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index"); + + let available_data = self.available_data.get(*candidate_index).unwrap().clone(); + + let response = Ok(( + AvailableDataFetchingResponse::from(Some(available_data)).encode(), + ProtocolName::Static("dummy"), + )); + outgoing_request + .pending_response + .send(response) + .expect("Response is always sent succesfully"); + None + }, + _ => Some(NetworkMessage::RequestFromNode(peer, request)), + }, + + message => Some(message), + } + } +} + /// A mock of the availability store subsystem. 
This one also generates all the /// candidates that a pub struct MockAvailabilityStore { @@ -127,6 +203,10 @@ impl MockAvailabilityStore { self.state.chunks.get(*candidate_index).unwrap()[0].encoded_size(); let _ = tx.send(Some(chunk_size)); }, + AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk, tx } => { + gum::debug!(target: LOG_TARGET, chunk_index = ?chunk.index ,candidate_hash = ?candidate_hash, "Responding to StoreChunk"); + let _ = tx.send(Ok(())); + }, _ => { unimplemented!("Unexpected av-store message") }, diff --git a/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs b/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs new file mode 100644 index 000000000000..008d8eef106a --- /dev/null +++ b/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs @@ -0,0 +1,92 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . +//! +//! A generic runtime api subsystem mockup suitable to be used in benchmarks. + +use polkadot_primitives::Header; + +use polkadot_node_subsystem::{ + messages::ChainApiMessage, overseer, SpawnedSubsystem, SubsystemError, +}; +use polkadot_node_subsystem_types::OverseerSignal; +use sp_core::H256; +use std::collections::HashMap; + +use futures::FutureExt; + +const LOG_TARGET: &str = "subsystem-bench::chain-api-mock"; + +/// State used to respond to `BlockHeader` requests. +pub struct ChainApiState { + pub block_headers: HashMap, +} + +pub struct MockChainApi { + state: ChainApiState, +} + +impl MockChainApi { + pub fn new(state: ChainApiState) -> MockChainApi { + Self { state } + } +} + +#[overseer::subsystem(ChainApi, error=SubsystemError, prefix=self::overseer)] +impl MockChainApi { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = self.run(ctx).map(|_| Ok(())).boxed(); + + SpawnedSubsystem { name: "test-environment", future } + } +} + +#[overseer::contextbounds(ChainApi, prefix = self::overseer)] +impl MockChainApi { + async fn run(self, mut ctx: Context) { + loop { + let msg = ctx.recv().await.expect("Overseer never fails us"); + + match msg { + orchestra::FromOrchestra::Signal(signal) => + if signal == OverseerSignal::Conclude { + return + }, + orchestra::FromOrchestra::Communication { msg } => { + gum::debug!(target: LOG_TARGET, msg=?msg, "recv message"); + + match msg { + ChainApiMessage::BlockHeader(hash, response_channel) => { + let _ = response_channel.send(Ok(Some( + self.state + .block_headers + .get(&hash) + .cloned() + .expect("Relay chain block hashes are known"), + ))); + }, + ChainApiMessage::Ancestors { hash: _hash, k: _k, response_channel } => { + // For our purposes, no ancestors is fine. 
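+                        // The mock keeps no chain state beyond block headers, so an empty ancestry is a valid reply.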
+ let _ = response_channel.send(Ok(Vec::new())); + }, + _ => { + unimplemented!("Unexpected chain-api message") + }, + } + }, + } + } + } +} diff --git a/polkadot/node/subsystem-bench/src/core/mock/dummy.rs b/polkadot/node/subsystem-bench/src/core/mock/dummy.rs index 0628368a49c0..a0a908750c51 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/dummy.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/dummy.rs @@ -73,6 +73,7 @@ macro_rules! mock { }; } +// Generate dummy implementation for all subsystems mock!(AvailabilityStore); mock!(StatementDistribution); mock!(BitfieldSigning); diff --git a/polkadot/node/subsystem-bench/src/core/mock/mod.rs b/polkadot/node/subsystem-bench/src/core/mock/mod.rs index 76fd581c3fb6..2bcc0c08c57b 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/mod.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/mod.rs @@ -18,11 +18,14 @@ use polkadot_node_subsystem::HeadSupportsParachains; use polkadot_node_subsystem_types::Hash; pub mod av_store; +pub mod chain_api; pub mod dummy; pub mod network_bridge; pub mod runtime_api; pub use av_store::*; +pub use chain_api::*; +pub use network_bridge::*; pub use runtime_api::*; pub struct AlwaysSupportsParachains {} diff --git a/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs b/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs index 5d534e37c991..a2be853ef8d5 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs @@ -14,289 +14,89 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! -//! A generic av store subsystem mockup suitable to be used in benchmarks. - -use futures::Future; -use parity_scale_codec::Encode; -use polkadot_node_subsystem_types::OverseerSignal; -use std::{collections::HashMap, pin::Pin}; - -use futures::FutureExt; - -use polkadot_node_primitives::{AvailableData, ErasureChunk}; +//! Mocked `network-bridge` subsystems that uses a `NetworkInterface` to access +//! the emulated network. +use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt}; +use polkadot_node_subsystem_types::{ + messages::{BitfieldDistributionMessage, NetworkBridgeEvent}, + OverseerSignal, +}; -use polkadot_primitives::CandidateHash; -use sc_network::{OutboundFailure, RequestFailure}; +use sc_network::{request_responses::ProtocolConfig, PeerId, RequestFailure}; use polkadot_node_subsystem::{ messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError, }; -use polkadot_node_network_protocol::request_response::{ - self as req_res, - v1::{AvailableDataFetchingRequest, ChunkFetchingRequest, ChunkResponse}, - IsRequest, Requests, -}; -use polkadot_primitives::AuthorityDiscoveryId; +use polkadot_node_network_protocol::Versioned; -use crate::core::{ - configuration::{random_error, random_latency, TestConfiguration}, - network::{NetworkAction, NetworkEmulator, RateLimit}, +use crate::core::network::{ + NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt, }; -/// The availability store state of all emulated peers. 
-/// The network bridge tx mock will respond to requests as if the request is being serviced -/// by a remote peer on the network -pub struct NetworkAvailabilityState { - pub candidate_hashes: HashMap, - pub available_data: Vec, - pub chunks: Vec>, -} - -const LOG_TARGET: &str = "subsystem-bench::network-bridge-tx-mock"; +const LOG_TARGET: &str = "subsystem-bench::network-bridge"; +const CHUNK_REQ_PROTOCOL_NAME_V1: &str = + "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1"; /// A mock of the network bridge tx subsystem. pub struct MockNetworkBridgeTx { - /// The test configurationg - config: TestConfiguration, - /// The network availability state - availabilty: NetworkAvailabilityState, - /// A network emulator instance - network: NetworkEmulator, + /// A network emulator handle + network: NetworkEmulatorHandle, + /// A channel to the network interface, + to_network_interface: UnboundedSender, +} + +/// A mock of the network bridge tx subsystem. +pub struct MockNetworkBridgeRx { + /// A network interface receiver + network_receiver: NetworkInterfaceReceiver, + /// Chunk request sender + chunk_request_sender: Option, } impl MockNetworkBridgeTx { pub fn new( - config: TestConfiguration, - availabilty: NetworkAvailabilityState, - network: NetworkEmulator, + network: NetworkEmulatorHandle, + to_network_interface: UnboundedSender, ) -> MockNetworkBridgeTx { - Self { config, availabilty, network } + Self { network, to_network_interface } } +} - fn not_connected_response( - &self, - authority_discovery_id: &AuthorityDiscoveryId, - future: Pin + Send>>, - ) -> NetworkAction { - // The network action will send the error after a random delay expires. - return NetworkAction::new( - authority_discovery_id.clone(), - future, - 0, - // Generate a random latency based on configuration. - random_latency(self.config.latency.as_ref()), - ) +impl MockNetworkBridgeRx { + pub fn new( + network_receiver: NetworkInterfaceReceiver, + chunk_request_sender: Option, + ) -> MockNetworkBridgeRx { + Self { network_receiver, chunk_request_sender } } - /// Returns an `NetworkAction` corresponding to the peer sending the response. If - /// the peer is connected, the error is sent with a randomized latency as defined in - /// configuration. - fn respond_to_send_request( - &mut self, - request: Requests, - ingress_tx: &mut tokio::sync::mpsc::UnboundedSender, - ) -> NetworkAction { - let ingress_tx = ingress_tx.clone(); - - match request { - Requests::ChunkFetchingV1(outgoing_request) => { - let authority_discovery_id = match outgoing_request.peer { - req_res::Recipient::Authority(authority_discovery_id) => authority_discovery_id, - _ => unimplemented!("Peer recipient not supported yet"), - }; - // Account our sent request bytes. - self.network.peer_stats(0).inc_sent(outgoing_request.payload.encoded_size()); - - // If peer is disconnected return an error - if !self.network.is_peer_connected(&authority_discovery_id) { - // We always send `NotConnected` error and we ignore `IfDisconnected` value in - // the caller. - let future = async move { - let _ = outgoing_request - .pending_response - .send(Err(RequestFailure::NotConnected)); - } - .boxed(); - return self.not_connected_response(&authority_discovery_id, future) - } - - // Account for remote received request bytes. 
- self.network - .peer_stats_by_id(&authority_discovery_id) - .inc_received(outgoing_request.payload.encoded_size()); - - let validator_index: usize = outgoing_request.payload.index.0 as usize; - let candidate_hash = outgoing_request.payload.candidate_hash; - - let candidate_index = self - .availabilty - .candidate_hashes - .get(&candidate_hash) - .expect("candidate was generated previously; qed"); - gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index"); - - let chunk: ChunkResponse = self.availabilty.chunks.get(*candidate_index).unwrap() - [validator_index] - .clone() - .into(); - let mut size = chunk.encoded_size(); - - let response = if random_error(self.config.error) { - // Error will not account to any bandwidth used. - size = 0; - Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)) - } else { - Ok(( - req_res::v1::ChunkFetchingResponse::from(Some(chunk)).encode(), - self.network.req_protocol_names().get_name(ChunkFetchingRequest::PROTOCOL), - )) - }; - - let authority_discovery_id_clone = authority_discovery_id.clone(); - - let future = async move { - let _ = outgoing_request.pending_response.send(response); - } - .boxed(); - - let future_wrapper = async move { - // Forward the response to the ingress channel of our node. - // On receive side we apply our node receiving rate limit. - let action = - NetworkAction::new(authority_discovery_id_clone, future, size, None); - ingress_tx.send(action).unwrap(); - } - .boxed(); - - NetworkAction::new( - authority_discovery_id, - future_wrapper, - size, - // Generate a random latency based on configuration. - random_latency(self.config.latency.as_ref()), - ) - }, - Requests::AvailableDataFetchingV1(outgoing_request) => { - let candidate_hash = outgoing_request.payload.candidate_hash; - let candidate_index = self - .availabilty - .candidate_hashes - .get(&candidate_hash) - .expect("candidate was generated previously; qed"); - gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index"); - - let authority_discovery_id = match outgoing_request.peer { - req_res::Recipient::Authority(authority_discovery_id) => authority_discovery_id, - _ => unimplemented!("Peer recipient not supported yet"), - }; - - // Account our sent request bytes. - self.network.peer_stats(0).inc_sent(outgoing_request.payload.encoded_size()); - - // If peer is disconnected return an error - if !self.network.is_peer_connected(&authority_discovery_id) { - let future = async move { - let _ = outgoing_request - .pending_response - .send(Err(RequestFailure::NotConnected)); - } - .boxed(); - return self.not_connected_response(&authority_discovery_id, future) - } - - // Account for remote received request bytes. 
- self.network - .peer_stats_by_id(&authority_discovery_id) - .inc_received(outgoing_request.payload.encoded_size()); - - let available_data = - self.availabilty.available_data.get(*candidate_index).unwrap().clone(); - - let size = available_data.encoded_size(); - - let response = if random_error(self.config.error) { - Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)) - } else { - Ok(( - req_res::v1::AvailableDataFetchingResponse::from(Some(available_data)) - .encode(), - self.network - .req_protocol_names() - .get_name(AvailableDataFetchingRequest::PROTOCOL), - )) - }; - - let future = async move { - let _ = outgoing_request.pending_response.send(response); - } - .boxed(); - - let authority_discovery_id_clone = authority_discovery_id.clone(); +} - let future_wrapper = async move { - // Forward the response to the ingress channel of our node. - // On receive side we apply our node receiving rate limit. - let action = - NetworkAction::new(authority_discovery_id_clone, future, size, None); - ingress_tx.send(action).unwrap(); - } - .boxed(); +#[overseer::subsystem(NetworkBridgeTx, error=SubsystemError, prefix=self::overseer)] +impl MockNetworkBridgeTx { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = self.run(ctx).map(|_| Ok(())).boxed(); - NetworkAction::new( - authority_discovery_id, - future_wrapper, - size, - // Generate a random latency based on configuration. - random_latency(self.config.latency.as_ref()), - ) - }, - _ => panic!("received an unexpected request"), - } + SpawnedSubsystem { name: "network-bridge-tx", future } } } -#[overseer::subsystem(NetworkBridgeTx, error=SubsystemError, prefix=self::overseer)] -impl MockNetworkBridgeTx { +#[overseer::subsystem(NetworkBridgeRx, error=SubsystemError, prefix=self::overseer)] +impl MockNetworkBridgeRx { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = self.run(ctx).map(|_| Ok(())).boxed(); - SpawnedSubsystem { name: "test-environment", future } + SpawnedSubsystem { name: "network-bridge-rx", future } } } #[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)] impl MockNetworkBridgeTx { - async fn run(mut self, mut ctx: Context) { - let (mut ingress_tx, mut ingress_rx) = - tokio::sync::mpsc::unbounded_channel::(); - - // Initialize our node bandwidth limits. - let mut rx_limiter = RateLimit::new(10, self.config.bandwidth); - - let our_network = self.network.clone(); - - // This task will handle node messages receipt from the simulated network. - ctx.spawn_blocking( - "network-receive", - async move { - while let Some(action) = ingress_rx.recv().await { - let size = action.size(); - - // account for our node receiving the data. - our_network.inc_received(size); - rx_limiter.reap(size).await; - action.run().await; - } - } - .boxed(), - ) - .expect("We never fail to spawn tasks"); - + async fn run(self, mut ctx: Context) { // Main subsystem loop. 
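        // Forward `SendRequests` to the network interface; requests addressed to disconnected peers fail immediately with `RequestFailure::NotConnected`.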
loop { - let msg = ctx.recv().await.expect("Overseer never fails us"); - - match msg { + let subsystem_message = ctx.recv().await.expect("Overseer never fails us"); + match subsystem_message { orchestra::FromOrchestra::Signal(signal) => if signal == OverseerSignal::Conclude { return @@ -305,14 +105,27 @@ impl MockNetworkBridgeTx { NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => { for request in requests { gum::debug!(target: LOG_TARGET, request = ?request, "Processing request"); - self.network.inc_sent(request_size(&request)); - let action = self.respond_to_send_request(request, &mut ingress_tx); - - // Will account for our node sending the request over the emulated - // network. - self.network.submit_peer_action(action.peer(), action); + let peer_id = + request.authority_id().expect("all nodes are authorities").clone(); + + if !self.network.is_peer_connected(&peer_id) { + // Attempting to send a request to a disconnected peer. + request + .into_response_sender() + .send(Err(RequestFailure::NotConnected)) + .expect("send never fails"); + continue + } + + let peer_message = + NetworkMessage::RequestFromNode(peer_id.clone(), request); + + let _ = self.to_network_interface.unbounded_send(peer_message); } }, + NetworkBridgeTxMessage::ReportPeer(_) => { + // ingore rep changes + }, _ => { unimplemented!("Unexpected network bridge message") }, @@ -322,12 +135,56 @@ impl MockNetworkBridgeTx { } } -// A helper to determine the request payload size. -fn request_size(request: &Requests) -> usize { - match request { - Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.payload.encoded_size(), - Requests::AvailableDataFetchingV1(outgoing_request) => - outgoing_request.payload.encoded_size(), - _ => unimplemented!("received an unexpected request"), +#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] +impl MockNetworkBridgeRx { + async fn run(mut self, mut ctx: Context) { + // Main subsystem loop. + let mut from_network_interface = self.network_receiver.0; + loop { + futures::select! 
{ + maybe_peer_message = from_network_interface.next() => { + if let Some(message) = maybe_peer_message { + match message { + NetworkMessage::MessageFromPeer(message) => match message { + Versioned::V2( + polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution( + bitfield, + ), + ) => { + ctx.send_message( + BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(PeerId::random(), polkadot_node_network_protocol::Versioned::V2(bitfield))) + ).await; + }, + _ => { + unimplemented!("We only talk v2 network protocol") + }, + }, + NetworkMessage::RequestFromPeer(request) => { + if let Some(protocol) = self.chunk_request_sender.as_mut() { + assert_eq!(&*protocol.name, CHUNK_REQ_PROTOCOL_NAME_V1); + if let Some(inbound_queue) = protocol.inbound_queue.as_ref() { + inbound_queue + .send(request) + .await + .expect("Forwarding requests to subsystem never fails"); + } + } + }, + _ => { + panic!("NetworkMessage::RequestFromNode is not expected to be received from a peer") + } + } + } + }, + subsystem_message = ctx.recv().fuse() => { + match subsystem_message.expect("Overseer never fails us") { + orchestra::FromOrchestra::Signal(signal) => if signal == OverseerSignal::Conclude { return }, + _ => { + unimplemented!("Unexpected network bridge rx message") + }, + } + } + } + } } } diff --git a/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs b/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs index d664ebead3cc..caefe068efff 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs @@ -16,31 +16,45 @@ //! //! A generic runtime api subsystem mockup suitable to be used in benchmarks. -use polkadot_primitives::{GroupIndex, IndexedVec, SessionInfo, ValidatorIndex}; +use polkadot_primitives::{ + CandidateReceipt, CoreState, GroupIndex, IndexedVec, OccupiedCore, SessionInfo, ValidatorIndex, +}; +use bitvec::prelude::BitVec; use polkadot_node_subsystem::{ messages::{RuntimeApiMessage, RuntimeApiRequest}, overseer, SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_types::OverseerSignal; +use sp_core::H256; +use std::collections::HashMap; use crate::core::configuration::{TestAuthorities, TestConfiguration}; use futures::FutureExt; const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock"; +/// Minimal state to answer requests. pub struct RuntimeApiState { + // All authorities in the test, authorities: TestAuthorities, + // Candidate + candidate_hashes: HashMap>, } +/// A mocked `runtime-api` subsystem. 
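+/// It answers only the runtime queries needed by the benchmarked subsystems, using state generated at test startup.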
pub struct MockRuntimeApi { state: RuntimeApiState, config: TestConfiguration, } impl MockRuntimeApi { - pub fn new(config: TestConfiguration, authorities: TestAuthorities) -> MockRuntimeApi { - Self { state: RuntimeApiState { authorities }, config } + pub fn new( + config: TestConfiguration, + authorities: TestAuthorities, + candidate_hashes: HashMap>, + ) -> MockRuntimeApi { + Self { state: RuntimeApiState { authorities, candidate_hashes }, config } } fn session_info(&self) -> SessionInfo { @@ -48,8 +62,10 @@ impl MockRuntimeApi { .map(|i| ValidatorIndex(i as _)) .collect::>(); - let validator_groups = all_validators.chunks(5).map(Vec::from).collect::>(); - + let validator_groups = all_validators + .chunks(self.config.max_validators_per_core) + .map(Vec::from) + .collect::>(); SessionInfo { validators: self.state.authorities.validator_public.clone().into(), discovery_keys: self.state.authorities.validator_authority_id.clone(), @@ -80,6 +96,8 @@ impl MockRuntimeApi { #[overseer::contextbounds(RuntimeApi, prefix = self::overseer)] impl MockRuntimeApi { async fn run(self, mut ctx: Context) { + let validator_group_count = self.session_info().validator_groups.len(); + loop { let msg = ctx.recv().await.expect("Overseer never fails us"); @@ -93,14 +111,79 @@ impl MockRuntimeApi { match msg { RuntimeApiMessage::Request( - _request, + _block_hash, RuntimeApiRequest::SessionInfo(_session_index, sender), ) => { let _ = sender.send(Ok(Some(self.session_info()))); }, + RuntimeApiMessage::Request( + _block_hash, + RuntimeApiRequest::SessionExecutorParams(_session_index, sender), + ) => { + let _ = sender.send(Ok(Some(Default::default()))); + }, + RuntimeApiMessage::Request( + _block_hash, + RuntimeApiRequest::Validators(sender), + ) => { + let _ = + sender.send(Ok(self.state.authorities.validator_public.clone())); + }, + RuntimeApiMessage::Request( + _block_hash, + RuntimeApiRequest::CandidateEvents(sender), + ) => { + let _ = sender.send(Ok(Default::default())); + }, + RuntimeApiMessage::Request( + _block_hash, + RuntimeApiRequest::SessionIndexForChild(sender), + ) => { + // Session is always the same. + let _ = sender.send(Ok(0)); + }, + RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::AvailabilityCores(sender), + ) => { + let candidate_hashes = self + .state + .candidate_hashes + .get(&block_hash) + .expect("Relay chain block hashes are generated at test start"); + + // All cores are always occupied. + let cores = candidate_hashes + .iter() + .enumerate() + .map(|(index, candidate_receipt)| { + // Ensure test breaks if badly configured. + assert!(index < validator_group_count); + + CoreState::Occupied(OccupiedCore { + next_up_on_available: None, + occupied_since: 0, + time_out_at: 0, + next_up_on_time_out: None, + availability: BitVec::default(), + group_responsible: GroupIndex(index as u32), + candidate_hash: candidate_receipt.hash(), + candidate_descriptor: candidate_receipt.descriptor.clone(), + }) + }) + .collect::>(); + + let _ = sender.send(Ok(cores)); + }, + RuntimeApiMessage::Request( + _block_hash, + RuntimeApiRequest::NodeFeatures(_session_index, sender), + ) => { + let _ = sender.send(Ok(Default::default())); + }, // Long term TODO: implement more as needed. 
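                        // Any other request is unexpected in these benchmarks and will panic below.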
- _ => { - unimplemented!("Unexpected runtime-api message") + message => { + unimplemented!("Unexpected runtime-api message: {:?}", message) }, } }, diff --git a/polkadot/node/subsystem-bench/src/core/network.rs b/polkadot/node/subsystem-bench/src/core/network.rs index bbf61425f73d..e2932bf0f51b 100644 --- a/polkadot/node/subsystem-bench/src/core/network.rs +++ b/polkadot/node/subsystem-bench/src/core/network.rs @@ -13,27 +13,65 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +//! +//! Implements network emulation and interfaces to control and specialize +//! network peer behaviour. +// +// [TestEnvironment] +// [NetworkEmulatorHandle] +// || +// +-------+--||--+-------+ +// | | | | +// Peer1 Peer2 Peer3 Peer4 +// \ | | / +// \ | | / +// \ | | / +// \ | | / +// \ | | / +// [Network Interface] +// | +// [Emulated Network Bridge] +// | +// Subsystems under test + +use crate::core::configuration::random_latency; + use super::{ configuration::{TestAuthorities, TestConfiguration}, environment::TestEnvironmentDependencies, *, }; use colored::Colorize; -use polkadot_node_network_protocol::request_response::ReqProtocolNames; +use futures::{ + channel::{mpsc, oneshot}, + lock::Mutex, + stream::FuturesUnordered, +}; + +use net_protocol::{ + request_response::{Recipient, Requests, ResponseSender}, + VersionedValidationProtocol, +}; +use parity_scale_codec::Encode; use polkadot_primitives::AuthorityDiscoveryId; use prometheus_endpoint::U64; use rand::{seq::SliceRandom, thread_rng}; +use sc_network::{ + request_responses::{IncomingRequest, OutgoingResponse}, + RequestFailure, +}; use sc_service::SpawnTaskHandle; use std::{ collections::HashMap, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::Arc, time::{Duration, Instant}, }; -use tokio::sync::mpsc::UnboundedSender; +use polkadot_node_network_protocol::{self as net_protocol, Versioned}; + +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; + +use futures::{Future, FutureExt, StreamExt}; // An emulated node egress traffic rate_limiter. #[derive(Debug)] pub struct RateLimit { @@ -100,185 +138,553 @@ impl RateLimit { } } -#[cfg(test)] -mod tests { - use std::time::Instant; - - use super::RateLimit; - - #[tokio::test] - async fn test_expected_rate() { - let tick_rate = 200; - let budget = 1_000_000; - // rate must not exceeed 100 credits per second - let mut rate_limiter = RateLimit::new(tick_rate, budget); - let mut total_sent = 0usize; - let start = Instant::now(); +/// A wrapper for both gossip and request/response protocols along with the destination +/// peer(`AuthorityDiscoveryId``). +pub enum NetworkMessage { + /// A gossip message from peer to node. + MessageFromPeer(VersionedValidationProtocol), + /// A gossip message from node to a peer. 
+ MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol), + /// A request originating from our node + RequestFromNode(AuthorityDiscoveryId, Requests), + /// A request originating from an emultated peer + RequestFromPeer(IncomingRequest), +} - let mut reap_amount = 0; - while rate_limiter.total_ticks < tick_rate { - reap_amount += 1; - reap_amount %= 100; +impl NetworkMessage { + /// Returns the size of the encoded message or request + pub fn size(&self) -> usize { + match &self { + NetworkMessage::MessageFromPeer(Versioned::V2(message)) => message.encoded_size(), + NetworkMessage::MessageFromPeer(Versioned::V1(message)) => message.encoded_size(), + NetworkMessage::MessageFromPeer(Versioned::V3(message)) => message.encoded_size(), + NetworkMessage::MessageFromNode(_peer_id, Versioned::V2(message)) => + message.encoded_size(), + NetworkMessage::MessageFromNode(_peer_id, Versioned::V1(message)) => + message.encoded_size(), + NetworkMessage::MessageFromNode(_peer_id, Versioned::V3(message)) => + message.encoded_size(), + NetworkMessage::RequestFromNode(_peer_id, incoming) => incoming.size(), + NetworkMessage::RequestFromPeer(request) => request.payload.encoded_size(), + } + } - rate_limiter.reap(reap_amount).await; - total_sent += reap_amount; + /// Returns the destination peer from the message or `None` if it originates from a peer. + pub fn peer(&self) -> Option<&AuthorityDiscoveryId> { + match &self { + NetworkMessage::MessageFromNode(peer_id, _) | + NetworkMessage::RequestFromNode(peer_id, _) => Some(peer_id), + _ => None, } + } +} - let end = Instant::now(); +/// A network interface of the node under test. +pub struct NetworkInterface { + // Sender for subsystems. + bridge_to_interface_sender: UnboundedSender, +} - println!("duration: {}", (end - start).as_millis()); +// Wraps the receiving side of a interface to bridge channel. It is a required +// parameter of the `network-bridge` mock. +pub struct NetworkInterfaceReceiver(pub UnboundedReceiver); - // Allow up to `budget/max_refill` error tolerance - let lower_bound = budget as u128 * ((end - start).as_millis() / 1000u128); - let upper_bound = budget as u128 * - ((end - start).as_millis() / 1000u128 + rate_limiter.max_refill as u128); - assert!(total_sent as u128 >= lower_bound); - assert!(total_sent as u128 <= upper_bound); - } +struct ProxiedRequest { + sender: Option>, + receiver: oneshot::Receiver, } -// A network peer emulator. It spawns a task that accepts `NetworkActions` and -// executes them with a configurable delay and bandwidth constraints. Tipically -// these actions wrap a future that performs a channel send to the subsystem(s) under test. -#[derive(Clone)] -struct PeerEmulator { - // The queue of requests waiting to be served by the emulator - actions_tx: UnboundedSender, +struct ProxiedResponse { + pub sender: oneshot::Sender, + pub result: Result, RequestFailure>, +} + +use std::task::Poll; + +impl Future for ProxiedRequest { + // The sender and result. 
+ type Output = ProxiedResponse; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + match self.receiver.poll_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(response) => Poll::Ready(ProxiedResponse { + sender: self.sender.take().expect("sender already used"), + result: response + .expect("Response is always succesfully received.") + .result + .map_err(|_| RequestFailure::Refused), + }), + } + } } -impl PeerEmulator { +impl NetworkInterface { + /// Create a new `NetworkInterface` pub fn new( - bandwidth: usize, spawn_task_handle: SpawnTaskHandle, - stats: Arc, - ) -> Self { - let (actions_tx, mut actions_rx) = tokio::sync::mpsc::unbounded_channel(); - - spawn_task_handle - .clone() - .spawn("peer-emulator", "test-environment", async move { - // Rate limit peer send. - let mut rate_limiter = RateLimit::new(10, bandwidth); - loop { - let stats_clone = stats.clone(); - let maybe_action: Option = actions_rx.recv().await; - if let Some(action) = maybe_action { - let size = action.size(); - rate_limiter.reap(size).await; - if let Some(latency) = action.latency { - spawn_task_handle.spawn( - "peer-emulator-latency", - "test-environment", - async move { - tokio::time::sleep(latency).await; - action.run().await; - stats_clone.inc_sent(size); - }, - ) + network: NetworkEmulatorHandle, + bandwidth_bps: usize, + mut from_network: UnboundedReceiver, + ) -> (NetworkInterface, NetworkInterfaceReceiver) { + let rx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps))); + let tx_limiter = Arc::new(Mutex::new(RateLimit::new(10, bandwidth_bps))); + + // Channel for receiving messages from the network bridge subsystem. + let (bridge_to_interface_sender, mut bridge_to_interface_receiver) = + mpsc::unbounded::(); + + // Channel for forwarding messages to the network bridge subsystem. + let (interface_to_bridge_sender, interface_to_bridge_receiver) = + mpsc::unbounded::(); + + let rx_network = network.clone(); + let tx_network = network; + + let rx_task_bridge_sender = interface_to_bridge_sender.clone(); + + let task_rx_limiter = rx_limiter.clone(); + let task_tx_limiter = tx_limiter.clone(); + + // A task that forwards messages from emulated peers to the node (emulated network bridge). + let rx_task = async move { + let mut proxied_requests = FuturesUnordered::new(); + + loop { + let mut from_network = from_network.next().fuse(); + futures::select! { + maybe_peer_message = from_network => { + if let Some(peer_message) = maybe_peer_message { + let size = peer_message.size(); + task_rx_limiter.lock().await.reap(size).await; + rx_network.inc_received(size); + + // To be able to apply the configured bandwidth limits for responses being sent + // over channels, we need to implement a simple proxy that allows this loop + // to receive the response and enforce the configured bandwidth before + // sending it to the original recipient. + if let NetworkMessage::RequestFromPeer(request) = peer_message { + let (response_sender, response_receiver) = oneshot::channel(); + + // Create a new `IncomingRequest` that we forward to the network bridge. + let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender}; + proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver}); + + // Send the new message to network bridge subsystem. 
+ rx_task_bridge_sender + .unbounded_send(NetworkMessage::RequestFromPeer(new_request)) + .expect("network bridge subsystem is alive"); + continue + } + + // Forward the message to the bridge. + rx_task_bridge_sender + .unbounded_send(peer_message) + .expect("network bridge subsystem is alive"); } else { - action.run().await; - stats_clone.inc_sent(size); + gum::info!(target: LOG_TARGET, "Uplink channel closed, network interface task exiting"); + break + } + }, + proxied_request = proxied_requests.next() => { + if let Some(proxied_request) = proxied_request { + match proxied_request.result { + Ok(result) => { + let bytes = result.encoded_size(); + gum::trace!(target: LOG_TARGET, size = bytes, "proxied request completed"); + + // Enforce bandwidth based on the response the node has sent. + // TODO: Fix the stall of RX when TX lock() takes a while to refill + // the token bucket. Good idea would be to create a task for each request. + task_tx_limiter.lock().await.reap(bytes).await; + rx_network.inc_sent(bytes); + + // Forward the response to original recipient. + proxied_request.sender.send( + OutgoingResponse { + reputation_changes: Vec::new(), + result: Ok(result), + sent_feedback: None + } + ).expect("network is alive"); + } + Err(e) => { + gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e) + } + } + } else { + gum::debug!(target: LOG_TARGET, "No more active proxied requests"); + // break } - } else { - break } } - }); + } + } + .boxed(); + + let task_spawn_handle = spawn_task_handle.clone(); + let task_rx_limiter = rx_limiter.clone(); + let task_tx_limiter = tx_limiter.clone(); + + // A task that forwards messages from the node to emulated peers. + let tx_task = async move { + // Wrap it in an `Arc` to avoid `clone()` the inner data as we need to share it across + // many send tasks. + let tx_network = Arc::new(tx_network); + + loop { + if let Some(peer_message) = bridge_to_interface_receiver.next().await { + let size = peer_message.size(); + // Ensure bandwidth used is limited. + task_tx_limiter.lock().await.reap(size).await; + + match peer_message { + NetworkMessage::MessageFromNode(peer, message) => + tx_network.send_message_to_peer(&peer, message), + NetworkMessage::RequestFromNode(peer, request) => { + // Send request through a proxy so we can account and limit bandwidth + // usage for the node. + let send_task = Self::proxy_send_request( + peer.clone(), + request, + tx_network.clone(), + task_rx_limiter.clone(), + ) + .boxed(); - Self { actions_tx } + task_spawn_handle.spawn("request-proxy", "test-environment", send_task); + }, + _ => panic!( + "Unexpected network message received from emulated network bridge" + ), + } + + tx_network.inc_sent(size); + } else { + gum::info!(target: LOG_TARGET, "Downlink channel closed, network interface task exiting"); + break + } + } + } + .boxed(); + + spawn_task_handle.spawn("network-interface-rx", "test-environment", rx_task); + spawn_task_handle.spawn("network-interface-tx", "test-environment", tx_task); + + ( + Self { bridge_to_interface_sender }, + NetworkInterfaceReceiver(interface_to_bridge_receiver), + ) + } + + /// Get a sender that can be used by a subsystem to send network actions to the network. + pub fn subsystem_sender(&self) -> UnboundedSender { + self.bridge_to_interface_sender.clone() } - // Queue a send request from the emulated peer. 
- pub fn send(&mut self, action: NetworkAction) { - self.actions_tx.send(action).expect("peer emulator task lives"); + /// Helper method that proxies a request from node to peer and implements rate limiting and + /// accounting. + async fn proxy_send_request( + peer: AuthorityDiscoveryId, + mut request: Requests, + tx_network: Arc, + task_rx_limiter: Arc>, + ) { + let (proxy_sender, proxy_receiver) = oneshot::channel(); + + // Modify the request response sender so we can intercept the answer + let sender = request.swap_response_sender(proxy_sender); + + // Send the modified request to the peer. + tx_network.send_request_to_peer(&peer, request); + + // Wait for answer (intercept the response). + match proxy_receiver.await { + Err(_) => { + panic!("Emulated peer hangup"); + }, + Ok(Err(err)) => { + sender.send(Err(err)).expect("Oneshot send always works."); + }, + Ok(Ok((response, protocol_name))) => { + let response_size = response.encoded_size(); + task_rx_limiter.lock().await.reap(response_size).await; + tx_network.inc_received(response_size); + + // Send the response to the original request sender. + if sender.send(Ok((response, protocol_name))).is_err() { + gum::warn!(target: LOG_TARGET, response_size, "response oneshot canceled by node") + } + }, + }; } } -pub type ActionFuture = std::pin::Pin + std::marker::Send>>; -/// An network action to be completed by the emulator task. -pub struct NetworkAction { - // The function that performs the action - run: ActionFuture, - // The payload size that we simulate sending/receiving from a peer - size: usize, - // Peer which should run the action. - peer: AuthorityDiscoveryId, - // The amount of time to delay the polling `run` - latency: Option, +/// A handle for controlling an emulated peer. +#[derive(Clone)] +pub struct EmulatedPeerHandle { + /// Send messages to be processed by the peer. + messages_tx: UnboundedSender, + /// Send actions to be performed by the peer. + actions_tx: UnboundedSender, } -unsafe impl Send for NetworkAction {} +impl EmulatedPeerHandle { + /// Receive and process a message from the node. + pub fn receive(&self, message: NetworkMessage) { + self.messages_tx.unbounded_send(message).expect("Peer message channel hangup"); + } -/// Book keeping of sent and received bytes. -pub struct PeerEmulatorStats { - rx_bytes_total: AtomicU64, - tx_bytes_total: AtomicU64, - metrics: Metrics, - peer_index: usize, + /// Send a message to the node. + pub fn send_message(&self, message: VersionedValidationProtocol) { + self.actions_tx + .unbounded_send(NetworkMessage::MessageFromPeer(message)) + .expect("Peer action channel hangup"); + } + + /// Send a `request` to the node. + pub fn send_request(&self, request: IncomingRequest) { + self.actions_tx + .unbounded_send(NetworkMessage::RequestFromPeer(request)) + .expect("Peer action channel hangup"); + } } -impl PeerEmulatorStats { - pub(crate) fn new(peer_index: usize, metrics: Metrics) -> Self { - Self { - metrics, - rx_bytes_total: AtomicU64::from(0), - tx_bytes_total: AtomicU64::from(0), - peer_index, +// A network peer emulator. +struct EmulatedPeer { + spawn_handle: SpawnTaskHandle, + to_node: UnboundedSender, + tx_limiter: RateLimit, + rx_limiter: RateLimit, + latency_ms: usize, +} + +impl EmulatedPeer { + /// Send a message to the node. 
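+    /// Sending is shaped by the peer's tx rate limiter and, if configured, delayed to emulate link latency.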
+ pub async fn send_message(&mut self, message: NetworkMessage) { + self.tx_limiter.reap(message.size()).await; + + if self.latency_ms == 0 { + self.to_node.unbounded_send(message).expect("Sending to the node never fails"); + } else { + let to_node = self.to_node.clone(); + let latency_ms = std::time::Duration::from_millis(self.latency_ms as u64); + + // Emulate RTT latency + self.spawn_handle + .spawn("peer-latency-emulator", "test-environment", async move { + tokio::time::sleep(latency_ms).await; + to_node.unbounded_send(message).expect("Sending to the node never fails"); + }); } } - pub fn inc_sent(&self, bytes: usize) { - self.tx_bytes_total.fetch_add(bytes as u64, Ordering::Relaxed); - self.metrics.on_peer_sent(self.peer_index, bytes); + /// Returns the rx bandwidth limiter. + pub fn rx_limiter(&mut self) -> &mut RateLimit { + &mut self.rx_limiter } +} - pub fn inc_received(&self, bytes: usize) { - self.rx_bytes_total.fetch_add(bytes as u64, Ordering::Relaxed); - self.metrics.on_peer_received(self.peer_index, bytes); - } +/// Interceptor pattern for handling messages. +pub trait HandleNetworkMessage { + /// Returns `None` if the message was handled, or the `message` + /// otherwise. + /// + /// `node_sender` allows sending of messages to the node in response + /// to the handled message. + fn handle( + &self, + message: NetworkMessage, + node_sender: &mut UnboundedSender, + ) -> Option; +} - pub fn sent(&self) -> u64 { - self.tx_bytes_total.load(Ordering::Relaxed) +impl HandleNetworkMessage for Arc +where + T: HandleNetworkMessage, +{ + fn handle( + &self, + message: NetworkMessage, + node_sender: &mut UnboundedSender, + ) -> Option { + self.as_ref().handle(message, node_sender) } +} - pub fn received(&self) -> u64 { - self.rx_bytes_total.load(Ordering::Relaxed) +// This loop is responsible for handling of messages/requests between the peer and the node. +async fn emulated_peer_loop( + handlers: Vec>, + stats: Arc, + mut emulated_peer: EmulatedPeer, + messages_rx: UnboundedReceiver, + actions_rx: UnboundedReceiver, + mut to_network_interface: UnboundedSender, +) { + let mut proxied_requests = FuturesUnordered::new(); + let mut messages_rx = messages_rx.fuse(); + let mut actions_rx = actions_rx.fuse(); + + loop { + futures::select! { + maybe_peer_message = messages_rx.next() => { + if let Some(peer_message) = maybe_peer_message { + let size = peer_message.size(); + + emulated_peer.rx_limiter().reap(size).await; + stats.inc_received(size); + + let mut message = Some(peer_message); + + // Try all handlers until the message gets processed. + // Panic if the message is not consumed. + for handler in handlers.iter() { + // The check below guarantees that message is always `Some`: we are still + // inside the loop. + message = handler.handle(message.unwrap(), &mut to_network_interface); + if message.is_none() { + break + } + } + if let Some(message) = message { + panic!("Emulated message from peer {:?} not handled", message.peer()); + } + } else { + gum::debug!(target: LOG_TARGET, "Downlink channel closed, peer task exiting"); + break + } + }, + maybe_action = actions_rx.next() => { + match maybe_action { + // We proxy any request being sent to the node to limit bandwidth as we + // do in the `NetworkInterface` task. + Some(NetworkMessage::RequestFromPeer(request)) => { + let (response_sender, response_receiver) = oneshot::channel(); + // Create a new `IncomingRequest` that we forward to the network interface. 
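+                        // The original response sender is parked in a `ProxiedRequest` so the node's answer can be charged against this peer's rx bandwidth before delivery.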
+ let new_request = IncomingRequest {payload: request.payload, peer: request.peer, pending_response: response_sender}; + + proxied_requests.push(ProxiedRequest {sender: Some(request.pending_response), receiver: response_receiver}); + + emulated_peer.send_message(NetworkMessage::RequestFromPeer(new_request)).await; + }, + Some(message) => emulated_peer.send_message(message).await, + None => { + gum::debug!(target: LOG_TARGET, "Action channel closed, peer task exiting"); + break + } + } + }, + proxied_request = proxied_requests.next() => { + if let Some(proxied_request) = proxied_request { + match proxied_request.result { + Ok(result) => { + let bytes = result.encoded_size(); + gum::trace!(target: LOG_TARGET, size = bytes, "Peer proxied request completed"); + + emulated_peer.rx_limiter().reap(bytes).await; + stats.inc_received(bytes); + + proxied_request.sender.send( + OutgoingResponse { + reputation_changes: Vec::new(), + result: Ok(result), + sent_feedback: None + } + ).expect("network is alive"); + } + Err(e) => { + gum::warn!(target: LOG_TARGET, "Node req/response failure: {:?}", e) + } + } + } + } + } } } -#[derive(Debug, Default)] -pub struct PeerStats { - pub rx_bytes_total: u64, - pub tx_bytes_total: u64, +/// Creates a new peer emulator task and returns a handle to it. +pub fn new_peer( + bandwidth: usize, + spawn_task_handle: SpawnTaskHandle, + handlers: Vec>, + stats: Arc, + to_network_interface: UnboundedSender, + latency_ms: usize, +) -> EmulatedPeerHandle { + let (messages_tx, messages_rx) = mpsc::unbounded::(); + let (actions_tx, actions_rx) = mpsc::unbounded::(); + + let rx_limiter = RateLimit::new(10, bandwidth); + let tx_limiter = RateLimit::new(10, bandwidth); + let emulated_peer = EmulatedPeer { + spawn_handle: spawn_task_handle.clone(), + rx_limiter, + tx_limiter, + to_node: to_network_interface.clone(), + latency_ms, + }; + + spawn_task_handle.clone().spawn( + "peer-emulator", + "test-environment", + emulated_peer_loop( + handlers, + stats, + emulated_peer, + messages_rx, + actions_rx, + to_network_interface, + ) + .boxed(), + ); + + EmulatedPeerHandle { messages_tx, actions_tx } } -impl NetworkAction { - pub fn new( - peer: AuthorityDiscoveryId, - run: ActionFuture, - size: usize, - latency: Option, - ) -> Self { - Self { run, size, peer, latency } + +/// Book keeping of sent and received bytes. +pub struct PeerEmulatorStats { + metrics: Metrics, + peer_index: usize, +} + +impl PeerEmulatorStats { + pub(crate) fn new(peer_index: usize, metrics: Metrics) -> Self { + Self { metrics, peer_index } } - pub fn size(&self) -> usize { - self.size + pub fn inc_sent(&self, bytes: usize) { + self.metrics.on_peer_sent(self.peer_index, bytes); + } + + pub fn inc_received(&self, bytes: usize) { + self.metrics.on_peer_received(self.peer_index, bytes); } - pub async fn run(self) { - self.run.await; + pub fn sent(&self) -> usize { + self.metrics + .peer_total_sent + .get_metric_with_label_values(&[&format!("node{}", self.peer_index)]) + .expect("Metric exists") + .get() as usize } - pub fn peer(&self) -> AuthorityDiscoveryId { - self.peer.clone() + pub fn received(&self) -> usize { + self.metrics + .peer_total_received + .get_metric_with_label_values(&[&format!("node{}", self.peer_index)]) + .expect("Metric exists") + .get() as usize } } /// The state of a peer on the emulated network. 
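/// Disconnected peers keep their handle, but the node will not send them any traffic.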
#[derive(Clone)] enum Peer { - Connected(PeerEmulator), - Disconnected(PeerEmulator), + Connected(EmulatedPeerHandle), + Disconnected(EmulatedPeerHandle), } impl Peer { @@ -294,99 +700,154 @@ impl Peer { matches!(self, Peer::Connected(_)) } - pub fn emulator(&mut self) -> &mut PeerEmulator { + pub fn handle(&self) -> &EmulatedPeerHandle { match self { - Peer::Connected(ref mut emulator) => emulator, - Peer::Disconnected(ref mut emulator) => emulator, + Peer::Connected(ref emulator) => emulator, + Peer::Disconnected(ref emulator) => emulator, } } } -/// Mocks the network bridge and an arbitrary number of connected peer nodes. -/// Implements network latency, bandwidth and connection errors. +/// A ha emulated network implementation. #[derive(Clone)] -pub struct NetworkEmulator { +pub struct NetworkEmulatorHandle { // Per peer network emulation. peers: Vec, /// Per peer stats. stats: Vec>, /// Each emulated peer is a validator. validator_authority_ids: HashMap, - /// Request protocol names - req_protocol_names: ReqProtocolNames, } -impl NetworkEmulator { - pub fn new( - config: &TestConfiguration, - dependencies: &TestEnvironmentDependencies, - authorities: &TestAuthorities, - req_protocol_names: ReqProtocolNames, - ) -> Self { - let n_peers = config.n_validators; - gum::info!(target: LOG_TARGET, "{}",format!("Initializing emulation for a {} peer network.", n_peers).bright_blue()); - gum::info!(target: LOG_TARGET, "{}",format!("connectivity {}%, error {}%", config.connectivity, config.error).bright_black()); - - let metrics = - Metrics::new(&dependencies.registry).expect("Metrics always register succesfully"); - let mut validator_authority_id_mapping = HashMap::new(); - - // Create a `PeerEmulator` for each peer. - let (stats, mut peers): (_, Vec<_>) = (0..n_peers) - .zip(authorities.validator_authority_id.clone()) - .map(|(peer_index, authority_id)| { - validator_authority_id_mapping.insert(authority_id, peer_index); - let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone())); - ( - stats.clone(), - Peer::Connected(PeerEmulator::new( - config.peer_bandwidth, - dependencies.task_manager.spawn_handle(), - stats, - )), - ) - }) - .unzip(); - - let connected_count = config.n_validators as f64 / (100.0 / config.connectivity as f64); - - let (_connected, to_disconnect) = - peers.partial_shuffle(&mut thread_rng(), connected_count as usize); - - for peer in to_disconnect { - peer.disconnect(); - } +/// Create a new emulated network based on `config`. +/// Each emulated peer will run the specified `handlers` to process incoming messages. +pub fn new_network( + config: &TestConfiguration, + dependencies: &TestEnvironmentDependencies, + authorities: &TestAuthorities, + handlers: Vec>, +) -> (NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver) { + let n_peers = config.n_validators; + gum::info!(target: LOG_TARGET, "{}",format!("Initializing emulation for a {} peer network.", n_peers).bright_blue()); + gum::info!(target: LOG_TARGET, "{}",format!("connectivity {}%, latency {:?}", config.connectivity, config.latency).bright_black()); + + let metrics = + Metrics::new(&dependencies.registry).expect("Metrics always register succesfully"); + let mut validator_authority_id_mapping = HashMap::new(); + + // Create the channel from `peer` to `NetworkInterface` . + let (to_network_interface, from_network) = mpsc::unbounded(); + + // Create a `PeerEmulator` for each peer. 
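+    // Each peer gets its own stats, tx/rx rate limiters and a latency sampled from the configured distribution.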
+ let (stats, mut peers): (_, Vec<_>) = (0..n_peers) + .zip(authorities.validator_authority_id.clone()) + .map(|(peer_index, authority_id)| { + validator_authority_id_mapping.insert(authority_id, peer_index); + let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone())); + ( + stats.clone(), + Peer::Connected(new_peer( + config.peer_bandwidth, + dependencies.task_manager.spawn_handle(), + handlers.clone(), + stats, + to_network_interface.clone(), + random_latency(config.latency.as_ref()), + )), + ) + }) + .unzip(); - gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {}", connected_count).bright_black()); + let connected_count = config.connected_count(); - Self { - peers, - stats, - validator_authority_ids: validator_authority_id_mapping, - req_protocol_names, - } + let (_connected, to_disconnect) = peers.partial_shuffle(&mut thread_rng(), connected_count); + + for peer in to_disconnect { + peer.disconnect(); } + gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {}", connected_count).bright_black()); + + let handle = NetworkEmulatorHandle { + peers, + stats, + validator_authority_ids: validator_authority_id_mapping, + }; + + // Finally create the `NetworkInterface` with the `from_network` receiver. + let (network_interface, network_interface_receiver) = NetworkInterface::new( + dependencies.task_manager.spawn_handle(), + handle.clone(), + config.bandwidth, + from_network, + ); + + (handle, network_interface, network_interface_receiver) +} + +/// Errors that can happen when sending data to emulated peers. +pub enum EmulatedPeerError { + NotConnected, +} + +impl NetworkEmulatorHandle { + /// Returns true if the emulated peer is connected to the node under test. pub fn is_peer_connected(&self, peer: &AuthorityDiscoveryId) -> bool { self.peer(peer).is_connected() } - pub fn submit_peer_action(&mut self, peer: AuthorityDiscoveryId, action: NetworkAction) { - let index = self - .validator_authority_ids - .get(&peer) - .expect("all test authorities are valid; qed"); + /// Forward notification `message` to an emulated `peer`. + /// Panics if peer is not connected. + pub fn send_message_to_peer( + &self, + peer_id: &AuthorityDiscoveryId, + message: VersionedValidationProtocol, + ) { + let peer = self.peer(peer_id); + assert!(peer.is_connected(), "forward message only for connected peers."); + peer.handle().receive(NetworkMessage::MessageFromNode(peer_id.clone(), message)); + } - let peer = self.peers.get_mut(*index).expect("We just retrieved the index above; qed"); + /// Forward a `request`` to an emulated `peer`. + /// Panics if peer is not connected. + pub fn send_request_to_peer(&self, peer_id: &AuthorityDiscoveryId, request: Requests) { + let peer = self.peer(peer_id); + assert!(peer.is_connected(), "forward request only for connected peers."); + peer.handle().receive(NetworkMessage::RequestFromNode(peer_id.clone(), request)); + } - // Only actions of size 0 are allowed on disconnected peers. - // Typically this are delayed error response sends. - if action.size() > 0 && !peer.is_connected() { - gum::warn!(target: LOG_TARGET, peer_index = index, "Attempted to send data from a disconnected peer, operation ignored"); - return + /// Send a message from a peer to the node. 
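+    /// Returns `EmulatedPeerError::NotConnected` if the peer is not connected to our node.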
+ pub fn send_message_from_peer( + &self, + from_peer: &AuthorityDiscoveryId, + message: VersionedValidationProtocol, + ) -> Result<(), EmulatedPeerError> { + let dst_peer = self.peer(from_peer); + + if !dst_peer.is_connected() { + gum::warn!(target: LOG_TARGET, "Attempted to send message from a peer not connected to our node, operation ignored"); + return Err(EmulatedPeerError::NotConnected) } - peer.emulator().send(action); + dst_peer.handle().send_message(message); + Ok(()) + } + + /// Send a request from a peer to the node. + pub fn send_request_from_peer( + &self, + from_peer: &AuthorityDiscoveryId, + request: IncomingRequest, + ) -> Result<(), EmulatedPeerError> { + let dst_peer = self.peer(from_peer); + + if !dst_peer.is_connected() { + gum::warn!(target: LOG_TARGET, "Attempted to send request from a peer not connected to our node, operation ignored"); + return Err(EmulatedPeerError::NotConnected) + } + + dst_peer.handle().send_request(request); + Ok(()) } // Returns the sent/received stats for `peer_index`. @@ -406,42 +867,18 @@ impl NetworkEmulator { fn peer(&self, peer: &AuthorityDiscoveryId) -> &Peer { &self.peers[self.peer_index(peer)] } - // Returns the sent/received stats for `peer`. - pub fn peer_stats_by_id(&mut self, peer: &AuthorityDiscoveryId) -> Arc { - let peer_index = self.peer_index(peer); - - self.stats[peer_index].clone() - } - - // Returns the sent/received stats for all peers. - pub fn stats(&self) -> Vec { - let r = self - .stats - .iter() - .map(|stats| PeerStats { - rx_bytes_total: stats.received(), - tx_bytes_total: stats.sent(), - }) - .collect::>(); - r - } // Increment bytes sent by our node (the node that contains the subsystem under test) pub fn inc_sent(&self, bytes: usize) { - // Our node always is peer 0. + // Our node is always peer 0. self.peer_stats(0).inc_sent(bytes); } // Increment bytes received by our node (the node that contains the subsystem under test) pub fn inc_received(&self, bytes: usize) { - // Our node always is peer 0. + // Our node is always peer 0. self.peer_stats(0).inc_received(bytes); } - - // Get the request protocol names - pub fn req_protocol_names(&self) -> &ReqProtocolNames { - &self.req_protocol_names - } } use polkadot_node_subsystem_util::metrics::prometheus::{ @@ -497,3 +934,106 @@ impl Metrics { .inc_by(bytes as u64); } } + +// Helper trait for low level access to `Requests` variants. +pub trait RequestExt { + /// Get the authority id if any from the request. + fn authority_id(&self) -> Option<&AuthorityDiscoveryId>; + /// Consume self and return the response sender. + fn into_response_sender(self) -> ResponseSender; + /// Allows to change the `ResponseSender` in place. + fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender; + /// Returns the size in bytes of the request payload. 
+ fn size(&self) -> usize; +} + +impl RequestExt for Requests { + fn authority_id(&self) -> Option<&AuthorityDiscoveryId> { + match self { + Requests::ChunkFetchingV1(request) => { + if let Recipient::Authority(authority_id) = &request.peer { + Some(authority_id) + } else { + None + } + }, + Requests::AvailableDataFetchingV1(request) => { + if let Recipient::Authority(authority_id) = &request.peer { + Some(authority_id) + } else { + None + } + }, + request => { + unimplemented!("RequestAuthority not implemented for {:?}", request) + }, + } + } + + fn into_response_sender(self) -> ResponseSender { + match self { + Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.pending_response, + Requests::AvailableDataFetchingV1(outgoing_request) => + outgoing_request.pending_response, + _ => unimplemented!("unsupported request type"), + } + } + + /// Swaps the `ResponseSender` and returns the previous value. + fn swap_response_sender(&mut self, new_sender: ResponseSender) -> ResponseSender { + match self { + Requests::ChunkFetchingV1(outgoing_request) => + std::mem::replace(&mut outgoing_request.pending_response, new_sender), + Requests::AvailableDataFetchingV1(outgoing_request) => + std::mem::replace(&mut outgoing_request.pending_response, new_sender), + _ => unimplemented!("unsupported request type"), + } + } + + /// Returns the size in bytes of the request payload. + fn size(&self) -> usize { + match self { + Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.payload.encoded_size(), + Requests::AvailableDataFetchingV1(outgoing_request) => + outgoing_request.payload.encoded_size(), + _ => unimplemented!("received an unexpected request"), + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Instant; + + use super::RateLimit; + + #[tokio::test] + async fn test_expected_rate() { + let tick_rate = 200; + let budget = 1_000_000; + // rate must not exceeed 100 credits per second + let mut rate_limiter = RateLimit::new(tick_rate, budget); + let mut total_sent = 0usize; + let start = Instant::now(); + + let mut reap_amount = 0; + while rate_limiter.total_ticks < tick_rate { + reap_amount += 1; + reap_amount %= 100; + + rate_limiter.reap(reap_amount).await; + total_sent += reap_amount; + } + + let end = Instant::now(); + + println!("duration: {}", (end - start).as_millis()); + + // Allow up to `budget/max_refill` error tolerance + let lower_bound = budget as u128 * ((end - start).as_millis() / 1000u128); + let upper_bound = budget as u128 * + ((end - start).as_millis() / 1000u128 + rate_limiter.max_refill as u128); + assert!(total_sent as u128 >= lower_bound); + assert!(total_sent as u128 <= upper_bound); + } +} diff --git a/polkadot/node/subsystem-bench/src/subsystem-bench.rs b/polkadot/node/subsystem-bench/src/subsystem-bench.rs index 8669ee4e8b1d..8633ebb703aa 100644 --- a/polkadot/node/subsystem-bench/src/subsystem-bench.rs +++ b/polkadot/node/subsystem-bench/src/subsystem-bench.rs @@ -16,20 +16,23 @@ //! A tool for running subsystem benchmark tests designed for development and //! CI regression testing. 
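// A minimal sketch of a custom peer behaviour for the emulated network: anything
// implementing `HandleNetworkMessage` can be passed to `new_network` next to
// `NetworkAvailabilityState`. The handler name below is hypothetical; it fails every
// chunk request and hands all other traffic to the next handler in the chain.
use crate::core::network::{HandleNetworkMessage, NetworkMessage};
use futures::channel::mpsc::UnboundedSender;
use polkadot_node_network_protocol::request_response::Requests;
use sc_network::RequestFailure;

struct FailChunkRequests;

impl HandleNetworkMessage for FailChunkRequests {
    fn handle(
        &self,
        message: NetworkMessage,
        _node_sender: &mut UnboundedSender<NetworkMessage>,
    ) -> Option<NetworkMessage> {
        match message {
            NetworkMessage::RequestFromNode(_peer, Requests::ChunkFetchingV1(outgoing_request)) => {
                // Fail the chunk request so the node observes it as a network error.
                let _ = outgoing_request.pending_response.send(Err(RequestFailure::NotConnected));
                None
            },
            // Everything else is left for the next handler (e.g. `NetworkAvailabilityState`).
            message => Some(message),
        }
    }
}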
- use clap::Parser; + +use colored::Colorize; + use color_eyre::eyre; use pyroscope::PyroscopeAgent; use pyroscope_pprofrs::{pprof_backend, PprofConfig}; -use colored::Colorize; -use std::{path::Path, time::Duration}; +use std::path::Path; pub(crate) mod availability; pub(crate) mod cli; pub(crate) mod core; mod valgrind; +const LOG_TARGET: &str = "subsystem-bench"; + use availability::{prepare_test, NetworkEmulation, TestState}; use cli::TestObjective; @@ -61,24 +64,24 @@ struct BenchCli { pub standard_configuration: cli::StandardTestOptions, #[clap(short, long)] - /// The bandwidth of simulated remote peers in KiB + /// The bandwidth of emulated remote peers in KiB pub peer_bandwidth: Option, #[clap(short, long)] - /// The bandwidth of our simulated node in KiB + /// The bandwidth of our node in KiB pub bandwidth: Option, #[clap(long, value_parser=le_100)] - /// Simulated conection error ratio [0-100]. - pub peer_error: Option, + /// Emulated peer connection ratio [0-100]. + pub connectivity: Option, #[clap(long, value_parser=le_5000)] - /// Minimum remote peer latency in milliseconds [0-5000]. - pub peer_min_latency: Option, + /// Mean remote peer latency in milliseconds [0-5000]. + pub peer_mean_latency: Option, #[clap(long, value_parser=le_5000)] - /// Maximum remote peer latency in milliseconds [0-5000]. - pub peer_max_latency: Option, + /// Remote peer latency standard deviation + pub peer_latency_std_dev: Option, #[clap(long, default_value_t = false)] /// Enable CPU Profiling with Pyroscope @@ -101,6 +104,37 @@ struct BenchCli { } impl BenchCli { + fn create_test_configuration(&self) -> TestConfiguration { + let configuration = &self.standard_configuration; + + match self.network { + NetworkEmulation::Healthy => TestConfiguration::healthy_network( + self.objective.clone(), + configuration.num_blocks, + configuration.n_validators, + configuration.n_cores, + configuration.min_pov_size, + configuration.max_pov_size, + ), + NetworkEmulation::Degraded => TestConfiguration::degraded_network( + self.objective.clone(), + configuration.num_blocks, + configuration.n_validators, + configuration.n_cores, + configuration.min_pov_size, + configuration.max_pov_size, + ), + NetworkEmulation::Ideal => TestConfiguration::ideal_network( + self.objective.clone(), + configuration.num_blocks, + configuration.n_validators, + configuration.n_cores, + configuration.min_pov_size, + configuration.max_pov_size, + ), + } + } + fn launch(self) -> eyre::Result<()> { let is_valgrind_running = valgrind::is_valgrind_running(); if !is_valgrind_running && self.cache_misses { @@ -117,7 +151,6 @@ impl BenchCli { None }; - let configuration = self.standard_configuration; let mut test_config = match self.objective { TestObjective::TestSequence(options) => { let test_sequence = @@ -130,56 +163,48 @@ impl BenchCli { format!("Sequence contains {} step(s)", num_steps).bright_purple() ); for (index, test_config) in test_sequence.into_iter().enumerate() { - gum::info!("{}", format!("Step {}/{}", index + 1, num_steps).bright_purple(),); + gum::info!(target: LOG_TARGET, "{}", format!("Step {}/{}", index + 1, num_steps).bright_purple(),); display_configuration(&test_config); - let mut state = TestState::new(&test_config); - let (mut env, _protocol_config) = prepare_test(test_config, &mut state); - env.runtime() - .block_on(availability::benchmark_availability_read(&mut env, state)); + match test_config.objective { + TestObjective::DataAvailabilityRead(ref _opts) => { + let mut state = TestState::new(&test_config); + let (mut env, 
_protocol_config) = prepare_test(test_config, &mut state); + env.runtime().block_on(availability::benchmark_availability_read( + &mut env, state, + )); + }, + TestObjective::DataAvailabilityWrite => { + let mut state = TestState::new(&test_config); + let (mut env, _protocol_config) = prepare_test(test_config, &mut state); + env.runtime().block_on(availability::benchmark_availability_write( + &mut env, state, + )); + }, + _ => gum::error!("Invalid test objective in sequence"), + } } return Ok(()) }, - TestObjective::DataAvailabilityRead(ref _options) => match self.network { - NetworkEmulation::Healthy => TestConfiguration::healthy_network( - self.objective, - configuration.num_blocks, - configuration.n_validators, - configuration.n_cores, - configuration.min_pov_size, - configuration.max_pov_size, - ), - NetworkEmulation::Degraded => TestConfiguration::degraded_network( - self.objective, - configuration.num_blocks, - configuration.n_validators, - configuration.n_cores, - configuration.min_pov_size, - configuration.max_pov_size, - ), - NetworkEmulation::Ideal => TestConfiguration::ideal_network( - self.objective, - configuration.num_blocks, - configuration.n_validators, - configuration.n_cores, - configuration.min_pov_size, - configuration.max_pov_size, - ), - }, + TestObjective::DataAvailabilityRead(ref _options) => self.create_test_configuration(), + TestObjective::DataAvailabilityWrite => self.create_test_configuration(), }; let mut latency_config = test_config.latency.clone().unwrap_or_default(); - if let Some(latency) = self.peer_min_latency { - latency_config.min_latency = Duration::from_millis(latency); + if let Some(latency) = self.peer_mean_latency { + latency_config.mean_latency_ms = latency; } - if let Some(latency) = self.peer_max_latency { - latency_config.max_latency = Duration::from_millis(latency); + if let Some(std_dev) = self.peer_latency_std_dev { + latency_config.std_dev = std_dev; } - if let Some(error) = self.peer_error { - test_config.error = error; + // Write back the updated latency. 
+ test_config.latency = Some(latency_config);
+
+ if let Some(connectivity) = self.connectivity {
+ test_config.connectivity = connectivity;
}
 if let Some(bandwidth) = self.peer_bandwidth {
@@ -197,8 +222,17 @@ impl BenchCli {
 let mut state = TestState::new(&test_config);
 let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
- env.runtime()
- .block_on(availability::benchmark_availability_read(&mut env, state));
+ match self.objective {
+ TestObjective::DataAvailabilityRead(_options) => {
+ env.runtime()
+ .block_on(availability::benchmark_availability_read(&mut env, state));
+ },
+ TestObjective::DataAvailabilityWrite => {
+ env.runtime()
+ .block_on(availability::benchmark_availability_write(&mut env, state));
+ },
+ TestObjective::TestSequence(_options) => {},
+ }
 if let Some(agent_running) = agent_running {
 let agent_ready = agent_running.stop()?;
@@ -216,6 +250,7 @@ fn main() -> eyre::Result<()> {
 // Avoid `Terminating due to subsystem exit subsystem` warnings
 .filter(Some("polkadot_overseer"), log::LevelFilter::Error)
 .filter(None, log::LevelFilter::Info)
+ .format_timestamp_millis()
 // .filter(None, log::LevelFilter::Trace)
 .try_init()
 .unwrap();
diff --git a/prdoc/pr_2970.prdoc b/prdoc/pr_2970.prdoc
new file mode 100644
index 000000000000..1db8f7bb334d
--- /dev/null
+++ b/prdoc/pr_2970.prdoc
@@ -0,0 +1,15 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Add `availability-distribution` and `bitfield-distribution` subsystem benchmark
+
+doc:
+ - audience: Node Dev
+ description: |
+ The new subsystem benchmark test objective (`DataAvailabilityWrite`) is designed to stress-test
+ the part of the pipeline that takes a backed candidate as input and then distributes
+ it as erasure-coded chunks to other validators. The test pulls in the `av-store`,
+ `bitfield-distribution` and `availability-distribution` subsystems, while the whole network and the
+ rest of the node's subsystems are emulated.
+
+crates: [ ]
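
For readers skimming the prdoc: the `DataAvailabilityWrite` objective is driven by the same three steps as the existing read benchmark, namely build a `TestConfiguration`, materialize a `TestState`, then call `prepare_test` and block on the benchmark future. The snippet below is a minimal sketch of that flow, using only helpers visible in this PR (`TestConfiguration::ideal_network`, `TestState::new`, `prepare_test`, `availability::benchmark_availability_write`) as it runs inside `BenchCli::launch`; the literal values are illustrative only.

```rust
// Sketch only: mirrors the DataAvailabilityWrite dispatch added in BenchCli::launch.
// The numeric arguments are illustrative; PoV sizes are expressed in KiB.
let test_config = TestConfiguration::ideal_network(
    TestObjective::DataAvailabilityWrite,
    /* num_blocks */ 3,
    /* n_validators */ 500,
    /* n_cores */ 100,
    /* min_pov_size */ 5120,
    /* max_pov_size */ 5120,
);

// Derive per-test state from the configuration, spin up the emulated network and
// subsystems, then run the write benchmark to completion on the test runtime.
let mut state = TestState::new(&test_config);
let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
env.runtime()
    .block_on(availability::benchmark_availability_write(&mut env, state));
```

The `TestSequence` branch repeats exactly this pattern for every step loaded from a YAML file, which is why the sequence loop and the single-objective path can share the same helpers.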