From 7284cc399d4044e4f4c6c353c0a4649d1238f92c Mon Sep 17 00:00:00 2001 From: Miguel Date: Wed, 19 Apr 2023 21:09:15 -0400 Subject: [PATCH 1/7] Add quinn and s2n-quic benchmarks --- crates/ursa-pod/Cargo.toml | 7 + crates/ursa-pod/benches/e2e.rs | 368 ++++++++++++++++++++++++++++++ crates/ursa-pod/src/client.rs | 6 +- crates/ursa-pod/src/connection.rs | 7 +- crates/ursa-pod/src/server.rs | 4 +- 5 files changed, 385 insertions(+), 7 deletions(-) diff --git a/crates/ursa-pod/Cargo.toml b/crates/ursa-pod/Cargo.toml index 48d9b819..aba770f8 100644 --- a/crates/ursa-pod/Cargo.toml +++ b/crates/ursa-pod/Cargo.toml @@ -27,6 +27,11 @@ tracing = { version = "0.1", optional = true } # Criterion benchmark dependencies hyper = { version = "1.0.0-rc.3", features = ["full"], optional = true } http-body-util = { version = "0.1.0-rc.2", optional = true } +quinn = { version = "0.9.3", optional = true } +rcgen = { version = "0.10.0", optional = true } +rustls = { version = "0.20.8", default-features = false, features = ["dangerous_configuration"], optional = true } +s2n-quic = { version = "1.18.0", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] , optional = true } +zeroize_derive = { version = "<1.4", optional = true } # Binary benchmark dependencies serde = { version = "1.0", features = ["derive"], optional = true } @@ -47,6 +52,8 @@ client = ["tokio", "tokio-stream", "futures", "tracing"] server = ["tokio", "tokio-stream", "futures", "tracing"] bench-hyper = ["tokio", "hyper", "http-body-util"] benchmarks = ["serde", "serde_json", "rayon", "fnv", "clap", "gnuplot"] +bench-quinn = ["tokio", "quinn", "rcgen", "rustls"] +bench-s2n-quic = ["tokio", "s2n-quic", "zeroize_derive", "rcgen", "rustls"] [[bench]] name = "encrypt" diff --git a/crates/ursa-pod/benches/e2e.rs b/crates/ursa-pod/benches/e2e.rs index 3e903524..3f204284 100644 --- a/crates/ursa-pod/benches/e2e.rs +++ b/crates/ursa-pod/benches/e2e.rs @@ -90,6 +90,7 @@ fn protocol_benchmarks(c: &mut Criterion) { ("Content Size (Kilobyte)", KILOBYTE_FILES, 1024), ("Content Size (Megabyte)", MEGABYTE_FILES, 1024 * 1024), ] { + #[cfg(all(not(feature = "bench-quinn"), not(feature = "bench-s2n-quic")))] { let mut g = c.benchmark_group(format!("TCP UFDP/{range}")); g.sample_size(20); @@ -114,6 +115,32 @@ fn protocol_benchmarks(c: &mut Criterion) { http_hyper::server_loop, ); } + + #[cfg(feature = "bench-quinn")] + { + let mut g = c.benchmark_group(format!("QUINN UFDP/{range}")); + g.sample_size(20); + benchmark_sizes( + &mut g, + files, + unit, + quinn_ufdp::client_loop, + quinn_ufdp::server_loop, + ); + } + + #[cfg(feature = "bench-s2n-quic")] + { + let mut g = c.benchmark_group(format!("S2N-QUIC UFDP/{range}")); + g.sample_size(20); + benchmark_sizes( + &mut g, + files, + unit, + s2n_quic_ufdp::client_loop, + s2n_quic_ufdp::server_loop, + ); + } } } @@ -267,5 +294,346 @@ mod http_hyper { } } +#[cfg(feature = "bench-quinn")] +mod quinn_ufdp { + use super::tls_utils::{client_config, server_config}; + use futures::future::join_all; + use quinn::{ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig, TransportConfig, VarInt, WriteError}; + use std::{ + io::Error, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + }; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + use tokio::task; + use ursa_pod::{ + client::UfdpClient, + connection::consts::MAX_BLOCK_SIZE, + server::{Backend, UfdpHandler}, + types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, + }; + + const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; + const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; + const CID: Blake3Cid = Blake3Cid([3u8; 32]); + + #[derive(Clone, Copy)] + struct DummyBackend { + content: &'static [u8], + } + + impl Backend for DummyBackend { + fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { + let s = block as usize * MAX_BLOCK_SIZE; + if s < self.content.len() { + let e = self.content.len().min(s + MAX_BLOCK_SIZE); + Some(&self.content[s..e]) + } else { + None + } + } + + fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { + (DECRYPTION_KEY, 0) + } + + fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { + 9001 + } + + fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { + Ok(()) + } + } + + pub async fn server_loop( + addr: String, + content: &'static [u8], + tx_started: tokio::sync::oneshot::Sender, + ) { + let mut server_config = ServerConfig::with_crypto(Arc::new(server_config())); + let server = Endpoint::server(server_config, addr.parse().unwrap()).unwrap(); + let port = server.local_addr().unwrap().port(); + + tx_started.send(port).unwrap(); + + while let Some(connecting) = server.accept().await { + let connection = connecting.await.unwrap(); + let content_clone = content.clone(); + task::spawn(async move { + loop { + match connection.accept_bi().await { + Ok((tx, rx)) => { + task::spawn(async { + let stream = BiStream { tx, rx }; + let mut handler = UfdpHandler::new( + stream, + DummyBackend { + content: content_clone, + }, + 0, + ); + if let Err(e) = handler.serve().await { + println!("server error: {e:?}"); + } + }); + } + Err(ConnectionError::ApplicationClosed(_)) => { + // Client closed the connection. + break; + } + Err(e) => panic!("{e:?}"), + } + } + }); + } + } + + pub async fn client_loop(addr: String, iterations: usize) { + let mut tasks = vec![]; + let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap()).unwrap(); + let mut client_config = quinn::ClientConfig::new(Arc::new(client_config())); + endpoint.set_default_client_config(client_config); + let stream = endpoint + .connect(addr.parse().unwrap(), "localhost") + .unwrap() + .await + .unwrap(); + for _ in 0..iterations { + let (tx, rx) = stream.open_bi().await.unwrap(); + let stream = BiStream { tx, rx }; + let task = task::spawn(async move { + let mut client = UfdpClient::new(stream, CLIENT_PUB_KEY, None).await.unwrap(); + client.request(CID).await.unwrap(); + let mut stream = client.finish(); + stream.tx.finish().await.unwrap(); + }); + tasks.push(task); + } + join_all(tasks).await; + } + + // Bidirectional QUIC stream for sending one request. + struct BiStream { + tx: SendStream, + rx: RecvStream, + } + + impl AsyncRead for BiStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.rx).poll_read(cx, buf) + } + } + + impl AsyncWrite for BiStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.tx).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.tx).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.tx).poll_shutdown(cx) + } + } +} + +#[cfg(feature = "bench-s2n-quic")] +mod s2n_quic_ufdp { + use super::tls_utils::{client_config, server_config}; + use futures::future::join_all; + use s2n_quic::{client::Connect, provider::tls, Client, Server}; + use std::net::SocketAddr; + use tokio::task; + use ursa_pod::{ + client::UfdpClient, + connection::consts::MAX_BLOCK_SIZE, + server::{Backend, UfdpHandler}, + types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, + }; + + const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; + const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; + const CID: Blake3Cid = Blake3Cid([3u8; 32]); + + #[derive(Clone, Copy)] + struct DummyBackend { + content: &'static [u8], + } + + impl Backend for DummyBackend { + fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { + let s = block as usize * MAX_BLOCK_SIZE; + if s < self.content.len() { + let e = self.content.len().min(s + MAX_BLOCK_SIZE); + Some(&self.content[s..e]) + } else { + None + } + } + + fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { + (DECRYPTION_KEY, 0) + } + + fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { + 9001 + } + + fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { + Ok(()) + } + } + + pub struct TlsProvider; + + impl tls::Provider for TlsProvider { + type Server = tls::rustls::Server; + type Client = tls::rustls::Client; + type Error = rustls::Error; + + fn start_server(self) -> Result { + Ok(server_config().into()) + } + + fn start_client(self) -> Result { + Ok(client_config().into()) + } + } + + pub async fn server_loop( + addr: String, + content: &'static [u8], + tx_started: tokio::sync::oneshot::Sender, + ) { + let mut server = Server::builder() + .with_tls(TlsProvider) + .unwrap() + .with_io(addr.as_str()) + .unwrap() + .start() + .unwrap(); + + tx_started + .send(server.local_addr().unwrap().port()) + .unwrap(); + + while let Some(mut conn) = server.accept().await { + let content_clone = content.clone(); + task::spawn(async move { + loop { + match conn.accept_bidirectional_stream().await { + Ok(Some(stream)) => { + task::spawn(async { + let handler = UfdpHandler::new( + stream, + DummyBackend { + content: content_clone, + }, + 0, + ); + if let Err(e) = handler.serve().await { + println!("server error: {e:?}"); + } + }); + } + Ok(None) => break, + Err(s2n_quic::connection::Error::Closed { .. }) => break, + Err(e) => panic!("{e:?}"), + } + } + }); + } + } + + pub async fn client_loop(addr: String, iterations: usize) { + let mut tasks = vec![]; + let client = Client::builder() + .with_tls(TlsProvider) + .unwrap() + .with_io("0.0.0.0:0") + .unwrap() + .start() + .unwrap(); + let addr: SocketAddr = addr.parse().unwrap(); + let mut connection = client + .connect(Connect::new(addr).with_server_name("localhost")) + .await + .unwrap(); + for _ in 0..iterations { + let stream = connection.open_bidirectional_stream().await.unwrap(); + let task = task::spawn(async move { + let mut client = UfdpClient::new(stream, CLIENT_PUB_KEY, None).await.unwrap(); + client.request(CID).await.unwrap(); + }); + tasks.push(task); + } + join_all(tasks).await; + } +} + +#[cfg(any(feature = "bench-quinn", feature = "bench-s2n-quic"))] +mod tls_utils { + use std::sync::Arc; + + pub struct SkipServerVerification; + + impl SkipServerVerification { + fn new() -> Arc { + Arc::new(Self) + } + } + + impl rustls::client::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } + } + + pub fn server_config() -> rustls::ServerConfig { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap(); + let key = rustls::PrivateKey(cert.serialize_private_key_der()); + let cert = vec![rustls::Certificate(cert.serialize_der().unwrap())]; + let mut config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(cert, key) + .unwrap(); + config.alpn_protocols = vec![b"ufdp".to_vec()]; + config + } + + pub fn client_config() -> rustls::ClientConfig { + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_no_client_auth(); + config.alpn_protocols = vec![b"ufdp".to_vec()]; + config + } +} + criterion_group!(benches, protocol_benchmarks); criterion_main!(benches); diff --git a/crates/ursa-pod/src/client.rs b/crates/ursa-pod/src/client.rs index d21e4214..f19eb624 100644 --- a/crates/ursa-pod/src/client.rs +++ b/crates/ursa-pod/src/client.rs @@ -12,7 +12,7 @@ use crate::{ /// UFDP Client. Accepts any stream of bytes supporting [`AsyncRead`] + [`AsyncWrite`] pub struct UfdpClient { - conn: UfdpConnection, + pub conn: UfdpConnection, lane: u8, } @@ -143,6 +143,10 @@ where Ok(size) } + pub fn finish(self) -> S { + self.conn.stream + } + /// Get the lane assigned to the connection pub fn lane(&self) -> u8 { self.lane diff --git a/crates/ursa-pod/src/connection.rs b/crates/ursa-pod/src/connection.rs index 7064d701..4ae81089 100644 --- a/crates/ursa-pod/src/connection.rs +++ b/crates/ursa-pod/src/connection.rs @@ -479,6 +479,9 @@ where #[inline(always)] pub async fn read_frame(&mut self, filter: Option) -> std::io::Result> { loop { + if let Some(frame) = self.parse_frame(filter)? { + return Ok(Some(frame)); + } if 0 == self.stream.read_buf(&mut self.read_buffer).await? { // The remote closed the connection. For this to be // a clean shutdown, there should be no data in the @@ -493,10 +496,6 @@ where )); } } - - if let Some(frame) = self.parse_frame(filter)? { - return Ok(Some(frame)); - } } } diff --git a/crates/ursa-pod/src/server.rs b/crates/ursa-pod/src/server.rs index 9b66aca2..b1d7cbb0 100644 --- a/crates/ursa-pod/src/server.rs +++ b/crates/ursa-pod/src/server.rs @@ -47,7 +47,7 @@ impl UfdpHandler { } /// Begin serving a request. Accepts a handshake, and then begins the request loop. - pub async fn serve(mut self) -> Result<(), UrsaCodecError> { + pub async fn serve(mut self) -> Result { // Step 1: Perform the handshake. instrument!( self.handshake().await?, @@ -89,7 +89,7 @@ impl UfdpHandler { } } - Ok(()) + Ok(self.conn.stream) } /// Await and respond to a handshake From e0edbcfdc1cc2dacaadb0e0ee87c9527e4d8e344 Mon Sep 17 00:00:00 2001 From: Miguel Date: Wed, 19 Apr 2023 21:19:50 -0400 Subject: [PATCH 2/7] Reuse code --- crates/ursa-pod/benches/e2e.rs | 139 +++++++++++---------------------- 1 file changed, 46 insertions(+), 93 deletions(-) diff --git a/crates/ursa-pod/benches/e2e.rs b/crates/ursa-pod/benches/e2e.rs index 3f204284..76135c44 100644 --- a/crates/ursa-pod/benches/e2e.rs +++ b/crates/ursa-pod/benches/e2e.rs @@ -2,8 +2,12 @@ use criterion::{measurement::Measurement, *}; use futures::Future; use std::time::Duration; use tokio::sync::oneshot; +use ursa_pod::connection::consts::MAX_BLOCK_SIZE; +use ursa_pod::server::Backend; +use ursa_pod::types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}; const MAX_REQUESTS: usize = 64; +const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; const KILOBYTE_FILES: &[&[u8]] = &[ &[0u8; 1024], @@ -144,7 +148,37 @@ fn protocol_benchmarks(c: &mut Criterion) { } } +#[derive(Clone, Copy)] +struct DummyBackend { + content: &'static [u8], +} + +impl Backend for DummyBackend { + fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { + let s = block as usize * MAX_BLOCK_SIZE; + if s < self.content.len() { + let e = self.content.len().min(s + MAX_BLOCK_SIZE); + Some(&self.content[s..e]) + } else { + None + } + } + + fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { + (DECRYPTION_KEY, 0) + } + + fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { + 9001 + } + + fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { + Ok(()) + } +} + mod tcp_ufdp { + use super::DummyBackend; use futures::future::join_all; use tokio::{ net::{TcpListener, TcpStream}, @@ -157,39 +191,9 @@ mod tcp_ufdp { types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, }; - const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); - #[derive(Clone, Copy)] - struct DummyBackend { - content: &'static [u8], - } - - impl Backend for DummyBackend { - fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { - let s = block as usize * MAX_BLOCK_SIZE; - if s < self.content.len() { - let e = self.content.len().min(s + MAX_BLOCK_SIZE); - Some(&self.content[s..e]) - } else { - None - } - } - - fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { - (DECRYPTION_KEY, 0) - } - - fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { - 9001 - } - - fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { - Ok(()) - } - } - /// Simple tcp server loop that replies with static content pub async fn server_loop( addr: String, @@ -296,9 +300,15 @@ mod http_hyper { #[cfg(feature = "bench-quinn")] mod quinn_ufdp { - use super::tls_utils::{client_config, server_config}; + use super::{ + tls_utils::{client_config, server_config}, + DummyBackend, + }; use futures::future::join_all; - use quinn::{ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig, TransportConfig, VarInt, WriteError}; + use quinn::{ + ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig, TransportConfig, VarInt, + WriteError, + }; use std::{ io::Error, pin::Pin, @@ -314,39 +324,9 @@ mod quinn_ufdp { types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, }; - const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); - #[derive(Clone, Copy)] - struct DummyBackend { - content: &'static [u8], - } - - impl Backend for DummyBackend { - fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { - let s = block as usize * MAX_BLOCK_SIZE; - if s < self.content.len() { - let e = self.content.len().min(s + MAX_BLOCK_SIZE); - Some(&self.content[s..e]) - } else { - None - } - } - - fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { - (DECRYPTION_KEY, 0) - } - - fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { - 9001 - } - - fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { - Ok(()) - } - } - pub async fn server_loop( addr: String, content: &'static [u8], @@ -454,7 +434,10 @@ mod quinn_ufdp { #[cfg(feature = "bench-s2n-quic")] mod s2n_quic_ufdp { - use super::tls_utils::{client_config, server_config}; + use super::{ + tls_utils::{client_config, server_config}, + DummyBackend, + }; use futures::future::join_all; use s2n_quic::{client::Connect, provider::tls, Client, Server}; use std::net::SocketAddr; @@ -466,39 +449,9 @@ mod s2n_quic_ufdp { types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, }; - const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); - #[derive(Clone, Copy)] - struct DummyBackend { - content: &'static [u8], - } - - impl Backend for DummyBackend { - fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { - let s = block as usize * MAX_BLOCK_SIZE; - if s < self.content.len() { - let e = self.content.len().min(s + MAX_BLOCK_SIZE); - Some(&self.content[s..e]) - } else { - None - } - } - - fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { - (DECRYPTION_KEY, 0) - } - - fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { - 9001 - } - - fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { - Ok(()) - } - } - pub struct TlsProvider; impl tls::Provider for TlsProvider { From 3bac43de1c2a7ae932d41d079b75c21f0f12d068 Mon Sep 17 00:00:00 2001 From: Miguel Date: Thu, 20 Apr 2023 15:44:47 -0400 Subject: [PATCH 3/7] Add one feature flag for all quic tests --- crates/ursa-pod/Cargo.toml | 4 +--- crates/ursa-pod/benches/e2e.rs | 12 +++++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/ursa-pod/Cargo.toml b/crates/ursa-pod/Cargo.toml index aba770f8..daf8d095 100644 --- a/crates/ursa-pod/Cargo.toml +++ b/crates/ursa-pod/Cargo.toml @@ -31,7 +31,6 @@ quinn = { version = "0.9.3", optional = true } rcgen = { version = "0.10.0", optional = true } rustls = { version = "0.20.8", default-features = false, features = ["dangerous_configuration"], optional = true } s2n-quic = { version = "1.18.0", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] , optional = true } -zeroize_derive = { version = "<1.4", optional = true } # Binary benchmark dependencies serde = { version = "1.0", features = ["derive"], optional = true } @@ -52,8 +51,7 @@ client = ["tokio", "tokio-stream", "futures", "tracing"] server = ["tokio", "tokio-stream", "futures", "tracing"] bench-hyper = ["tokio", "hyper", "http-body-util"] benchmarks = ["serde", "serde_json", "rayon", "fnv", "clap", "gnuplot"] -bench-quinn = ["tokio", "quinn", "rcgen", "rustls"] -bench-s2n-quic = ["tokio", "s2n-quic", "zeroize_derive", "rcgen", "rustls"] +bench-quic = ["tokio", "quinn", "s2n-quic", "rcgen", "rustls"] [[bench]] name = "encrypt" diff --git a/crates/ursa-pod/benches/e2e.rs b/crates/ursa-pod/benches/e2e.rs index 76135c44..2d8ce1e5 100644 --- a/crates/ursa-pod/benches/e2e.rs +++ b/crates/ursa-pod/benches/e2e.rs @@ -94,7 +94,6 @@ fn protocol_benchmarks(c: &mut Criterion) { ("Content Size (Kilobyte)", KILOBYTE_FILES, 1024), ("Content Size (Megabyte)", MEGABYTE_FILES, 1024 * 1024), ] { - #[cfg(all(not(feature = "bench-quinn"), not(feature = "bench-s2n-quic")))] { let mut g = c.benchmark_group(format!("TCP UFDP/{range}")); g.sample_size(20); @@ -120,7 +119,7 @@ fn protocol_benchmarks(c: &mut Criterion) { ); } - #[cfg(feature = "bench-quinn")] + #[cfg(feature = "bench-quic")] { let mut g = c.benchmark_group(format!("QUINN UFDP/{range}")); g.sample_size(20); @@ -133,7 +132,7 @@ fn protocol_benchmarks(c: &mut Criterion) { ); } - #[cfg(feature = "bench-s2n-quic")] + #[cfg(feature = "bench-quic")] { let mut g = c.benchmark_group(format!("S2N-QUIC UFDP/{range}")); g.sample_size(20); @@ -186,7 +185,6 @@ mod tcp_ufdp { }; use ursa_pod::{ client::UfdpClient, - connection::consts::MAX_BLOCK_SIZE, server::{Backend, UfdpHandler}, types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, }; @@ -298,7 +296,7 @@ mod http_hyper { } } -#[cfg(feature = "bench-quinn")] +#[cfg(feature = "bench-quic")] mod quinn_ufdp { use super::{ tls_utils::{client_config, server_config}, @@ -432,7 +430,7 @@ mod quinn_ufdp { } } -#[cfg(feature = "bench-s2n-quic")] +#[cfg(feature = "bench-quic")] mod s2n_quic_ufdp { use super::{ tls_utils::{client_config, server_config}, @@ -539,7 +537,7 @@ mod s2n_quic_ufdp { } } -#[cfg(any(feature = "bench-quinn", feature = "bench-s2n-quic"))] +#[cfg(feature = "bench-quic")] mod tls_utils { use std::sync::Arc; From f41488b00e0490510ca96d0121e8c65085238376 Mon Sep 17 00:00:00 2001 From: Miguel Date: Thu, 20 Apr 2023 15:51:07 -0400 Subject: [PATCH 4/7] Clean up --- crates/ursa-pod/benches/e2e.rs | 31 +++++++------------------------ 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/crates/ursa-pod/benches/e2e.rs b/crates/ursa-pod/benches/e2e.rs index 2d8ce1e5..c3407c48 100644 --- a/crates/ursa-pod/benches/e2e.rs +++ b/crates/ursa-pod/benches/e2e.rs @@ -183,11 +183,7 @@ mod tcp_ufdp { net::{TcpListener, TcpStream}, task, }; - use ursa_pod::{ - client::UfdpClient, - server::{Backend, UfdpHandler}, - types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, - }; + use ursa_pod::{client::UfdpClient, server::UfdpHandler, types::Blake3Cid}; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); @@ -303,10 +299,7 @@ mod quinn_ufdp { DummyBackend, }; use futures::future::join_all; - use quinn::{ - ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig, TransportConfig, VarInt, - WriteError, - }; + use quinn::{ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig}; use std::{ io::Error, pin::Pin, @@ -315,12 +308,7 @@ mod quinn_ufdp { }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::task; - use ursa_pod::{ - client::UfdpClient, - connection::consts::MAX_BLOCK_SIZE, - server::{Backend, UfdpHandler}, - types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, - }; + use ursa_pod::{client::UfdpClient, server::UfdpHandler, types::Blake3Cid}; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); @@ -330,7 +318,7 @@ mod quinn_ufdp { content: &'static [u8], tx_started: tokio::sync::oneshot::Sender, ) { - let mut server_config = ServerConfig::with_crypto(Arc::new(server_config())); + let server_config = ServerConfig::with_crypto(Arc::new(server_config())); let server = Endpoint::server(server_config, addr.parse().unwrap()).unwrap(); let port = server.local_addr().unwrap().port(); @@ -345,7 +333,7 @@ mod quinn_ufdp { Ok((tx, rx)) => { task::spawn(async { let stream = BiStream { tx, rx }; - let mut handler = UfdpHandler::new( + let handler = UfdpHandler::new( stream, DummyBackend { content: content_clone, @@ -371,7 +359,7 @@ mod quinn_ufdp { pub async fn client_loop(addr: String, iterations: usize) { let mut tasks = vec![]; let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap()).unwrap(); - let mut client_config = quinn::ClientConfig::new(Arc::new(client_config())); + let client_config = quinn::ClientConfig::new(Arc::new(client_config())); endpoint.set_default_client_config(client_config); let stream = endpoint .connect(addr.parse().unwrap(), "localhost") @@ -440,12 +428,7 @@ mod s2n_quic_ufdp { use s2n_quic::{client::Connect, provider::tls, Client, Server}; use std::net::SocketAddr; use tokio::task; - use ursa_pod::{ - client::UfdpClient, - connection::consts::MAX_BLOCK_SIZE, - server::{Backend, UfdpHandler}, - types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, - }; + use ursa_pod::{client::UfdpClient, server::UfdpHandler, types::Blake3Cid}; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); From 14918c3eb05cda911088823df0948ff7507d0546 Mon Sep 17 00:00:00 2001 From: Miguel Date: Thu, 20 Apr 2023 15:52:53 -0400 Subject: [PATCH 5/7] Add doc --- crates/ursa-pod/src/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/ursa-pod/src/client.rs b/crates/ursa-pod/src/client.rs index f19eb624..79fc5da6 100644 --- a/crates/ursa-pod/src/client.rs +++ b/crates/ursa-pod/src/client.rs @@ -143,6 +143,7 @@ where Ok(size) } + /// Consumes the client and returns the underlying stream. pub fn finish(self) -> S { self.conn.stream } From 8a5b3ef0b0f83f140bcd47589dc8a3c83bdf5fa0 Mon Sep 17 00:00:00 2001 From: Miguel Date: Thu, 20 Apr 2023 15:59:17 -0400 Subject: [PATCH 6/7] Make field private again --- crates/ursa-pod/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/ursa-pod/src/client.rs b/crates/ursa-pod/src/client.rs index 79fc5da6..9ec72478 100644 --- a/crates/ursa-pod/src/client.rs +++ b/crates/ursa-pod/src/client.rs @@ -12,7 +12,7 @@ use crate::{ /// UFDP Client. Accepts any stream of bytes supporting [`AsyncRead`] + [`AsyncWrite`] pub struct UfdpClient { - pub conn: UfdpConnection, + conn: UfdpConnection, lane: u8, } From 27fbaf30be6a69d8a0d28200852e2d628da07944 Mon Sep 17 00:00:00 2001 From: Miguel Date: Thu, 20 Apr 2023 16:08:25 -0400 Subject: [PATCH 7/7] Clean up cargo --- crates/ursa-pod/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ursa-pod/Cargo.toml b/crates/ursa-pod/Cargo.toml index daf8d095..29b9c002 100644 --- a/crates/ursa-pod/Cargo.toml +++ b/crates/ursa-pod/Cargo.toml @@ -27,10 +27,10 @@ tracing = { version = "0.1", optional = true } # Criterion benchmark dependencies hyper = { version = "1.0.0-rc.3", features = ["full"], optional = true } http-body-util = { version = "0.1.0-rc.2", optional = true } -quinn = { version = "0.9.3", optional = true } -rcgen = { version = "0.10.0", optional = true } -rustls = { version = "0.20.8", default-features = false, features = ["dangerous_configuration"], optional = true } -s2n-quic = { version = "1.18.0", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] , optional = true } +quinn = { version = "0.9", optional = true } +rcgen = { version = "0.10", optional = true } +rustls = { version = "0.20", default-features = false, features = ["dangerous_configuration"], optional = true } +s2n-quic = { version = "1.18", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] , optional = true } # Binary benchmark dependencies serde = { version = "1.0", features = ["derive"], optional = true }