networking-bench: Update benchmarks payload (#7056)
# Description

- Used 10 notifications and requests within the benchmarks. After moving the network workers' initialization out of the benchmarks, this small number is enough to keep the measurements precise (see the throughput sketch below).
- Removed the 128MB payload, which consumed most of the execution time.
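
For context, the shape of the updated benchmarks reduces to the minimal Criterion sketch below (not part of this commit): a single `PAYLOAD` table, one shared message count, and `Throughput::Bytes` set to count × payload size so Criterion reports bytes per second. The real benchmarks drive network workers that are set up outside the measured section; here a plain buffer copy stands in so the snippet is self-contained. It assumes `criterion` is available as a dev-dependency and uses only a subset of the payload table.

```rust
// Standalone sketch of the benchmark structure used in this commit.
// The network workers are replaced by a buffer copy so it compiles on its own.
use criterion::{
    criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration,
    Throughput,
};
use std::hint::black_box;

const NUMBER_OF_NOTIFICATIONS: usize = 100; // value from the diff below
const PAYLOAD: &[(u32, &str)] = &[
    // (Exponent of size, label) — subset of the table used in the diff
    (6, "64B"),
    (12, "4KB"),
    (15, "64KB"),
];

fn run_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("notifications_protocol_sketch");
    group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
    group.sample_size(10);

    for &(exponent, label) in PAYLOAD {
        let size = 2usize.pow(exponent);
        // Total bytes moved per iteration; Criterion divides this by the
        // measured time to report throughput.
        group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64));
        group.bench_with_input(BenchmarkId::new("copy/serially", label), &size, |b, &size| {
            b.iter(|| {
                let payload = vec![0u8; size];
                for _ in 0..NUMBER_OF_NOTIFICATIONS {
                    black_box(payload.clone());
                }
            });
        });
    }
    group.finish();
}

criterion_group!(benches, run_benchmark);
criterion_main!(benches);
```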
AndreiEres authored Jan 9, 2025
1 parent 2f17958 commit 6bfe452
Showing 3 changed files with 103 additions and 100 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/benchmarks-networking.yml
@@ -92,6 +92,7 @@ jobs:
uses: benchmark-action/github-action-benchmark@v1
with:
tool: "cargo"
name: ${{ env.BENCH }}
output-file-path: ./charts/${{ env.BENCH }}.txt
benchmark-data-dir-path: ./bench/${{ env.BENCH }}
github-token: ${{ steps.app-token.outputs.token }}
@@ -103,6 +104,7 @@ jobs:
uses: benchmark-action/github-action-benchmark@v1
with:
tool: "cargo"
name: ${{ env.BENCH }}
output-file-path: ./charts/${{ env.BENCH }}.txt
benchmark-data-dir-path: ./bench/${{ env.BENCH }}
github-token: ${{ steps.app-token.outputs.token }}
99 changes: 46 additions & 53 deletions substrate/client/network/benches/notifications_protocol.rs
@@ -36,19 +36,16 @@ use std::{sync::Arc, time::Duration};
use substrate_test_runtime_client::runtime;
use tokio::{sync::Mutex, task::JoinHandle};

const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[
// (Exponent of size, number of notifications, label)
(6, 100, "64B"),
(9, 100, "512B"),
(12, 100, "4KB"),
(15, 100, "64KB"),
];
const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[
// (Exponent of size, number of notifications, label)
(18, 10, "256KB"),
(21, 10, "2MB"),
(24, 10, "16MB"),
(27, 10, "128MB"),
const NUMBER_OF_NOTIFICATIONS: usize = 100;
const PAYLOAD: &[(u32, &'static str)] = &[
// (Exponent of size, label)
(6, "64B"),
(9, "512B"),
(12, "4KB"),
(15, "64KB"),
(18, "256KB"),
(21, "2MB"),
(24, "16MB"),
];
const MAX_SIZE: u64 = 2u64.pow(30);

@@ -156,12 +153,19 @@ where
tokio::select! {
Some(event) = notification_service1.next_event() => {
if let NotificationEvent::NotificationStreamOpened { .. } = event {
break;
// Send a 32MB notification to preheat the network
notification_service1.send_async_notification(&peer_id2, vec![0; 2usize.pow(25)]).await.unwrap();
}
},
Some(event) = notification_service2.next_event() => {
if let NotificationEvent::ValidateInboundSubstream { result_tx, .. } = event {
result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
match event {
NotificationEvent::ValidateInboundSubstream { result_tx, .. } => {
result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
},
NotificationEvent::NotificationReceived { .. } => {
break;
}
_ => {}
}
},
}
@@ -255,64 +259,53 @@ async fn run_with_backpressure(setup: Arc<BenchSetup>, size: usize, limit: usize
let _ = tokio::join!(network1, network2);
}

fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) {
fn run_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
let mut group = c.benchmark_group(group);
let mut group = c.benchmark_group("notifications_protocol");
group.plot_config(plot_config);
group.sample_size(10);

let libp2p_setup = setup_workers::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(&rt);
for &(exponent, limit, label) in payload.iter() {
for &(exponent, label) in PAYLOAD.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(limit as u64 * size as u64));
group.bench_with_input(
BenchmarkId::new("libp2p/serially", label),
&(size, limit),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit));
},
);
group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64));
group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| {
b.to_async(&rt)
.iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_NOTIFICATIONS));
});
group.bench_with_input(
BenchmarkId::new("libp2p/with_backpressure", label),
&(size, limit),
|b, &(size, limit)| {
b.to_async(&rt)
.iter(|| run_with_backpressure(Arc::clone(&libp2p_setup), size, limit));
&size,
|b, &size| {
b.to_async(&rt).iter(|| {
run_with_backpressure(Arc::clone(&libp2p_setup), size, NUMBER_OF_NOTIFICATIONS)
});
},
);
}
drop(libp2p_setup);

let litep2p_setup = setup_workers::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(&rt);
for &(exponent, limit, label) in payload.iter() {
for &(exponent, label) in PAYLOAD.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(limit as u64 * size as u64));
group.bench_with_input(
BenchmarkId::new("litep2p/serially", label),
&(size, limit),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit));
},
);
group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64));
group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| {
b.to_async(&rt)
.iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_NOTIFICATIONS));
});
group.bench_with_input(
BenchmarkId::new("litep2p/with_backpressure", label),
&(size, limit),
|b, &(size, limit)| {
b.to_async(&rt)
.iter(|| run_with_backpressure(Arc::clone(&litep2p_setup), size, limit));
&size,
|b, &size| {
b.to_async(&rt).iter(|| {
run_with_backpressure(Arc::clone(&litep2p_setup), size, NUMBER_OF_NOTIFICATIONS)
});
},
);
}
drop(litep2p_setup);
}

fn run_benchmark_with_small_payload(c: &mut Criterion) {
run_benchmark(c, SMALL_PAYLOAD, "notifications_protocol/small_payload");
}

fn run_benchmark_with_large_payload(c: &mut Criterion) {
run_benchmark(c, LARGE_PAYLOAD, "notifications_protocol/large_payload");
}

criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload);
criterion_group!(benches, run_benchmark);
criterion_main!(benches);
102 changes: 55 additions & 47 deletions substrate/client/network/benches/request_response_protocol.rs
@@ -37,19 +37,16 @@ use substrate_test_runtime_client::runtime;
use tokio::{sync::Mutex, task::JoinHandle};

const MAX_SIZE: u64 = 2u64.pow(30);
const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[
// (Exponent of size, number of requests, label)
(6, 100, "64B"),
(9, 100, "512B"),
(12, 100, "4KB"),
(15, 100, "64KB"),
];
const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[
// (Exponent of size, number of requests, label)
(18, 10, "256KB"),
(21, 10, "2MB"),
(24, 10, "16MB"),
(27, 10, "128MB"),
const NUMBER_OF_REQUESTS: usize = 100;
const PAYLOAD: &[(u32, &'static str)] = &[
// (Exponent of size, label)
(6, "64B"),
(9, "512B"),
(12, "4KB"),
(15, "64KB"),
(18, "256KB"),
(21, "2MB"),
(24, "16MB"),
];

pub fn create_network_worker<B, H, N>() -> (
@@ -154,6 +151,21 @@ where
let handle1 = tokio::spawn(worker1.run());
let handle2 = tokio::spawn(worker2.run());

let _ = tokio::spawn({
let rx2 = rx2.clone();

async move {
let req = rx2.recv().await.unwrap();
req.pending_response
.send(OutgoingResponse {
result: Ok(vec![0; 2usize.pow(25)]),
reputation_changes: vec![],
sent_feedback: None,
})
.unwrap();
}
});

let ready = tokio::spawn({
let network_service1 = Arc::clone(&network_service1);

@@ -165,6 +177,16 @@ where
network_service2.listen_addresses()[0].clone()
};
network_service1.add_known_address(peer_id2, listen_address2.into());
let _ = network_service1
.request(
peer_id2.into(),
"/request-response/1".into(),
vec![0; 2],
None,
IfDisconnected::TryConnect,
)
.await
.unwrap();
}
});

@@ -210,8 +232,8 @@ async fn run_serially(setup: Arc<BenchSetup>, size: usize, limit: usize) {
async move {
loop {
tokio::select! {
res = rx2.recv() => {
let IncomingRequest { pending_response, .. } = res.unwrap();
req = rx2.recv() => {
let IncomingRequest { pending_response, .. } = req.unwrap();
pending_response.send(OutgoingResponse {
result: Ok(vec![0; size]),
reputation_changes: vec![],
@@ -269,49 +291,35 @@ async fn run_with_backpressure(setup: Arc<BenchSetup>, size: usize, limit: usize
let _ = tokio::join!(network1, network2);
}

fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) {
fn run_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
let mut group = c.benchmark_group(group);
let mut group = c.benchmark_group("request_response_protocol");
group.plot_config(plot_config);
group.sample_size(10);

let libp2p_setup = setup_workers::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(&rt);
for &(exponent, limit, label) in payload.iter() {
for &(exponent, label) in PAYLOAD.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(limit as u64 * size as u64));
group.bench_with_input(
BenchmarkId::new("libp2p/serially", label),
&(size, limit),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit));
},
);
group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64));
group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| {
b.to_async(&rt)
.iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_REQUESTS));
});
}
drop(libp2p_setup);

// TODO: NetworkRequest::request should be implemented for Litep2pNetworkService
let litep2p_setup = setup_workers::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(&rt);
// for &(exponent, limit, label) in payload.iter() {
// let size = 2usize.pow(exponent);
// group.throughput(Throughput::Bytes(limit as u64 * size as u64));
// group.bench_with_input(
// BenchmarkId::new("litep2p/serially", label),
// &(size, limit),
// |b, &(size, limit)| {
// b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit));
// },
// );
// }
for &(exponent, label) in PAYLOAD.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64));
group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| {
b.to_async(&rt)
.iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_REQUESTS));
});
}
drop(litep2p_setup);
}

fn run_benchmark_with_small_payload(c: &mut Criterion) {
run_benchmark(c, SMALL_PAYLOAD, "request_response_benchmark/small_payload");
}

fn run_benchmark_with_large_payload(c: &mut Criterion) {
run_benchmark(c, LARGE_PAYLOAD, "request_response_benchmark/large_payload");
}

criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload);
criterion_group!(benches, run_benchmark);
criterion_main!(benches);
