diff --git a/.github/workflows/benchmarks-networking.yml b/.github/workflows/benchmarks-networking.yml index 79494b9a015c..8f4246c79548 100644 --- a/.github/workflows/benchmarks-networking.yml +++ b/.github/workflows/benchmarks-networking.yml @@ -92,6 +92,7 @@ jobs: uses: benchmark-action/github-action-benchmark@v1 with: tool: "cargo" + name: ${{ env.BENCH }} output-file-path: ./charts/${{ env.BENCH }}.txt benchmark-data-dir-path: ./bench/${{ env.BENCH }} github-token: ${{ steps.app-token.outputs.token }} @@ -103,6 +104,7 @@ jobs: uses: benchmark-action/github-action-benchmark@v1 with: tool: "cargo" + name: ${{ env.BENCH }} output-file-path: ./charts/${{ env.BENCH }}.txt benchmark-data-dir-path: ./bench/${{ env.BENCH }} github-token: ${{ steps.app-token.outputs.token }} diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 40a810d616b5..a406e328d5a6 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -36,19 +36,16 @@ use std::{sync::Arc, time::Duration}; use substrate_test_runtime_client::runtime; use tokio::{sync::Mutex, task::JoinHandle}; -const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[ - // (Exponent of size, number of notifications, label) - (6, 100, "64B"), - (9, 100, "512B"), - (12, 100, "4KB"), - (15, 100, "64KB"), -]; -const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[ - // (Exponent of size, number of notifications, label) - (18, 10, "256KB"), - (21, 10, "2MB"), - (24, 10, "16MB"), - (27, 10, "128MB"), +const NUMBER_OF_NOTIFICATIONS: usize = 100; +const PAYLOAD: &[(u32, &'static str)] = &[ + // (Exponent of size, label) + (6, "64B"), + (9, "512B"), + (12, "4KB"), + (15, "64KB"), + (18, "256KB"), + (21, "2MB"), + (24, "16MB"), ]; const MAX_SIZE: u64 = 2u64.pow(30); @@ -156,12 +153,19 @@ where tokio::select! { Some(event) = notification_service1.next_event() => { if let NotificationEvent::NotificationStreamOpened { .. } = event { - break; + // Send a 32MB notification to preheat the network + notification_service1.send_async_notification(&peer_id2, vec![0; 2usize.pow(25)]).await.unwrap(); } }, Some(event) = notification_service2.next_event() => { - if let NotificationEvent::ValidateInboundSubstream { result_tx, .. } = event { - result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); + match event { + NotificationEvent::ValidateInboundSubstream { result_tx, .. } => { + result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); + }, + NotificationEvent::NotificationReceived { .. } => { + break; + } + _ => {} } }, } @@ -255,64 +259,53 @@ async fn run_with_backpressure(setup: Arc, size: usize, limit: usize let _ = tokio::join!(network1, network2); } -fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) { +fn run_benchmark(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group(group); + let mut group = c.benchmark_group("notifications_protocol"); group.plot_config(plot_config); + group.sample_size(10); let libp2p_setup = setup_workers::>(&rt); - for &(exponent, limit, label) in payload.iter() { + for &(exponent, label) in PAYLOAD.iter() { let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(limit as u64 * size as u64)); - group.bench_with_input( - BenchmarkId::new("libp2p/serially", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit)); - }, - ); + group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64)); + group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| { + b.to_async(&rt) + .iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_NOTIFICATIONS)); + }); group.bench_with_input( BenchmarkId::new("libp2p/with_backpressure", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt) - .iter(|| run_with_backpressure(Arc::clone(&libp2p_setup), size, limit)); + &size, + |b, &size| { + b.to_async(&rt).iter(|| { + run_with_backpressure(Arc::clone(&libp2p_setup), size, NUMBER_OF_NOTIFICATIONS) + }); }, ); } drop(libp2p_setup); let litep2p_setup = setup_workers::(&rt); - for &(exponent, limit, label) in payload.iter() { + for &(exponent, label) in PAYLOAD.iter() { let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(limit as u64 * size as u64)); - group.bench_with_input( - BenchmarkId::new("litep2p/serially", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit)); - }, - ); + group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64)); + group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| { + b.to_async(&rt) + .iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_NOTIFICATIONS)); + }); group.bench_with_input( BenchmarkId::new("litep2p/with_backpressure", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt) - .iter(|| run_with_backpressure(Arc::clone(&litep2p_setup), size, limit)); + &size, + |b, &size| { + b.to_async(&rt).iter(|| { + run_with_backpressure(Arc::clone(&litep2p_setup), size, NUMBER_OF_NOTIFICATIONS) + }); }, ); } drop(litep2p_setup); } -fn run_benchmark_with_small_payload(c: &mut Criterion) { - run_benchmark(c, SMALL_PAYLOAD, "notifications_protocol/small_payload"); -} - -fn run_benchmark_with_large_payload(c: &mut Criterion) { - run_benchmark(c, LARGE_PAYLOAD, "notifications_protocol/large_payload"); -} - -criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload); +criterion_group!(benches, run_benchmark); criterion_main!(benches); diff --git a/substrate/client/network/benches/request_response_protocol.rs b/substrate/client/network/benches/request_response_protocol.rs index 85381112b753..97c6d72ddf1e 100644 --- a/substrate/client/network/benches/request_response_protocol.rs +++ b/substrate/client/network/benches/request_response_protocol.rs @@ -37,19 +37,16 @@ use substrate_test_runtime_client::runtime; use tokio::{sync::Mutex, task::JoinHandle}; const MAX_SIZE: u64 = 2u64.pow(30); -const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[ - // (Exponent of size, number of requests, label) - (6, 100, "64B"), - (9, 100, "512B"), - (12, 100, "4KB"), - (15, 100, "64KB"), -]; -const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[ - // (Exponent of size, number of requests, label) - (18, 10, "256KB"), - (21, 10, "2MB"), - (24, 10, "16MB"), - (27, 10, "128MB"), +const NUMBER_OF_REQUESTS: usize = 100; +const PAYLOAD: &[(u32, &'static str)] = &[ + // (Exponent of size, label) + (6, "64B"), + (9, "512B"), + (12, "4KB"), + (15, "64KB"), + (18, "256KB"), + (21, "2MB"), + (24, "16MB"), ]; pub fn create_network_worker() -> ( @@ -154,6 +151,21 @@ where let handle1 = tokio::spawn(worker1.run()); let handle2 = tokio::spawn(worker2.run()); + let _ = tokio::spawn({ + let rx2 = rx2.clone(); + + async move { + let req = rx2.recv().await.unwrap(); + req.pending_response + .send(OutgoingResponse { + result: Ok(vec![0; 2usize.pow(25)]), + reputation_changes: vec![], + sent_feedback: None, + }) + .unwrap(); + } + }); + let ready = tokio::spawn({ let network_service1 = Arc::clone(&network_service1); @@ -165,6 +177,16 @@ where network_service2.listen_addresses()[0].clone() }; network_service1.add_known_address(peer_id2, listen_address2.into()); + let _ = network_service1 + .request( + peer_id2.into(), + "/request-response/1".into(), + vec![0; 2], + None, + IfDisconnected::TryConnect, + ) + .await + .unwrap(); } }); @@ -210,8 +232,8 @@ async fn run_serially(setup: Arc, size: usize, limit: usize) { async move { loop { tokio::select! { - res = rx2.recv() => { - let IncomingRequest { pending_response, .. } = res.unwrap(); + req = rx2.recv() => { + let IncomingRequest { pending_response, .. } = req.unwrap(); pending_response.send(OutgoingResponse { result: Ok(vec![0; size]), reputation_changes: vec![], @@ -269,49 +291,35 @@ async fn run_with_backpressure(setup: Arc, size: usize, limit: usize let _ = tokio::join!(network1, network2); } -fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) { +fn run_benchmark(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - let mut group = c.benchmark_group(group); + let mut group = c.benchmark_group("request_response_protocol"); group.plot_config(plot_config); + group.sample_size(10); let libp2p_setup = setup_workers::>(&rt); - for &(exponent, limit, label) in payload.iter() { + for &(exponent, label) in PAYLOAD.iter() { let size = 2usize.pow(exponent); - group.throughput(Throughput::Bytes(limit as u64 * size as u64)); - group.bench_with_input( - BenchmarkId::new("libp2p/serially", label), - &(size, limit), - |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit)); - }, - ); + group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64)); + group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| { + b.to_async(&rt) + .iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_REQUESTS)); + }); } drop(libp2p_setup); - // TODO: NetworkRequest::request should be implemented for Litep2pNetworkService let litep2p_setup = setup_workers::(&rt); - // for &(exponent, limit, label) in payload.iter() { - // let size = 2usize.pow(exponent); - // group.throughput(Throughput::Bytes(limit as u64 * size as u64)); - // group.bench_with_input( - // BenchmarkId::new("litep2p/serially", label), - // &(size, limit), - // |b, &(size, limit)| { - // b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit)); - // }, - // ); - // } + for &(exponent, label) in PAYLOAD.iter() { + let size = 2usize.pow(exponent); + group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64)); + group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| { + b.to_async(&rt) + .iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_REQUESTS)); + }); + } drop(litep2p_setup); } -fn run_benchmark_with_small_payload(c: &mut Criterion) { - run_benchmark(c, SMALL_PAYLOAD, "request_response_benchmark/small_payload"); -} - -fn run_benchmark_with_large_payload(c: &mut Criterion) { - run_benchmark(c, LARGE_PAYLOAD, "request_response_benchmark/large_payload"); -} - -criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload); +criterion_group!(benches, run_benchmark); criterion_main!(benches);