diff --git a/crates/ursa-pod/Cargo.toml b/crates/ursa-pod/Cargo.toml index 48d9b819..29b9c002 100644 --- a/crates/ursa-pod/Cargo.toml +++ b/crates/ursa-pod/Cargo.toml @@ -27,6 +27,10 @@ tracing = { version = "0.1", optional = true } # Criterion benchmark dependencies hyper = { version = "1.0.0-rc.3", features = ["full"], optional = true } http-body-util = { version = "0.1.0-rc.2", optional = true } +quinn = { version = "0.9", optional = true } +rcgen = { version = "0.10", optional = true } +rustls = { version = "0.20", default-features = false, features = ["dangerous_configuration"], optional = true } +s2n-quic = { version = "1.18", default-features = false, features = ["provider-address-token-default", "provider-tls-rustls"] , optional = true } # Binary benchmark dependencies serde = { version = "1.0", features = ["derive"], optional = true } @@ -47,6 +51,7 @@ client = ["tokio", "tokio-stream", "futures", "tracing"] server = ["tokio", "tokio-stream", "futures", "tracing"] bench-hyper = ["tokio", "hyper", "http-body-util"] benchmarks = ["serde", "serde_json", "rayon", "fnv", "clap", "gnuplot"] +bench-quic = ["tokio", "quinn", "s2n-quic", "rcgen", "rustls"] [[bench]] name = "encrypt" diff --git a/crates/ursa-pod/benches/e2e.rs b/crates/ursa-pod/benches/e2e.rs index 3e903524..c3407c48 100644 --- a/crates/ursa-pod/benches/e2e.rs +++ b/crates/ursa-pod/benches/e2e.rs @@ -2,8 +2,12 @@ use criterion::{measurement::Measurement, *}; use futures::Future; use std::time::Duration; use tokio::sync::oneshot; +use ursa_pod::connection::consts::MAX_BLOCK_SIZE; +use ursa_pod::server::Backend; +use ursa_pod::types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}; const MAX_REQUESTS: usize = 64; +const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; const KILOBYTE_FILES: &[&[u8]] = &[ &[0u8; 1024], @@ -114,55 +118,76 @@ fn protocol_benchmarks(c: &mut Criterion) { http_hyper::server_loop, ); } + + #[cfg(feature = "bench-quic")] + { + let mut g = c.benchmark_group(format!("QUINN UFDP/{range}")); + g.sample_size(20); + benchmark_sizes( + &mut g, + files, + unit, + quinn_ufdp::client_loop, + quinn_ufdp::server_loop, + ); + } + + #[cfg(feature = "bench-quic")] + { + let mut g = c.benchmark_group(format!("S2N-QUIC UFDP/{range}")); + g.sample_size(20); + benchmark_sizes( + &mut g, + files, + unit, + s2n_quic_ufdp::client_loop, + s2n_quic_ufdp::server_loop, + ); + } + } +} + +#[derive(Clone, Copy)] +struct DummyBackend { + content: &'static [u8], +} + +impl Backend for DummyBackend { + fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { + let s = block as usize * MAX_BLOCK_SIZE; + if s < self.content.len() { + let e = self.content.len().min(s + MAX_BLOCK_SIZE); + Some(&self.content[s..e]) + } else { + None + } + } + + fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { + (DECRYPTION_KEY, 0) + } + + fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { + 9001 + } + + fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { + Ok(()) } } mod tcp_ufdp { + use super::DummyBackend; use futures::future::join_all; use tokio::{ net::{TcpListener, TcpStream}, task, }; - use ursa_pod::{ - client::UfdpClient, - connection::consts::MAX_BLOCK_SIZE, - server::{Backend, UfdpHandler}, - types::{Blake3Cid, BlsSignature, Secp256k1PublicKey}, - }; + use ursa_pod::{client::UfdpClient, server::UfdpHandler, types::Blake3Cid}; - const DECRYPTION_KEY: [u8; 33] = [3u8; 33]; const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; const CID: Blake3Cid = Blake3Cid([3u8; 32]); - #[derive(Clone, Copy)] - struct DummyBackend { - content: &'static [u8], - } - - impl Backend for DummyBackend { - fn raw_block(&self, _cid: &Blake3Cid, block: u64) -> Option<&[u8]> { - let s = block as usize * MAX_BLOCK_SIZE; - if s < self.content.len() { - let e = self.content.len().min(s + MAX_BLOCK_SIZE); - Some(&self.content[s..e]) - } else { - None - } - } - - fn decryption_key(&self, _request_id: u64) -> (ursa_pod::types::Secp256k1AffinePoint, u64) { - (DECRYPTION_KEY, 0) - } - - fn get_balance(&self, _pubkey: Secp256k1PublicKey) -> u128 { - 9001 - } - - fn save_batch(&self, _batch: BlsSignature) -> Result<(), String> { - Ok(()) - } - } - /// Simple tcp server loop that replies with static content pub async fn server_loop( addr: String, @@ -267,5 +292,282 @@ mod http_hyper { } } +#[cfg(feature = "bench-quic")] +mod quinn_ufdp { + use super::{ + tls_utils::{client_config, server_config}, + DummyBackend, + }; + use futures::future::join_all; + use quinn::{ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig}; + use std::{ + io::Error, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + }; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + use tokio::task; + use ursa_pod::{client::UfdpClient, server::UfdpHandler, types::Blake3Cid}; + + const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; + const CID: Blake3Cid = Blake3Cid([3u8; 32]); + + pub async fn server_loop( + addr: String, + content: &'static [u8], + tx_started: tokio::sync::oneshot::Sender, + ) { + let server_config = ServerConfig::with_crypto(Arc::new(server_config())); + let server = Endpoint::server(server_config, addr.parse().unwrap()).unwrap(); + let port = server.local_addr().unwrap().port(); + + tx_started.send(port).unwrap(); + + while let Some(connecting) = server.accept().await { + let connection = connecting.await.unwrap(); + let content_clone = content.clone(); + task::spawn(async move { + loop { + match connection.accept_bi().await { + Ok((tx, rx)) => { + task::spawn(async { + let stream = BiStream { tx, rx }; + let handler = UfdpHandler::new( + stream, + DummyBackend { + content: content_clone, + }, + 0, + ); + if let Err(e) = handler.serve().await { + println!("server error: {e:?}"); + } + }); + } + Err(ConnectionError::ApplicationClosed(_)) => { + // Client closed the connection. + break; + } + Err(e) => panic!("{e:?}"), + } + } + }); + } + } + + pub async fn client_loop(addr: String, iterations: usize) { + let mut tasks = vec![]; + let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap()).unwrap(); + let client_config = quinn::ClientConfig::new(Arc::new(client_config())); + endpoint.set_default_client_config(client_config); + let stream = endpoint + .connect(addr.parse().unwrap(), "localhost") + .unwrap() + .await + .unwrap(); + for _ in 0..iterations { + let (tx, rx) = stream.open_bi().await.unwrap(); + let stream = BiStream { tx, rx }; + let task = task::spawn(async move { + let mut client = UfdpClient::new(stream, CLIENT_PUB_KEY, None).await.unwrap(); + client.request(CID).await.unwrap(); + let mut stream = client.finish(); + stream.tx.finish().await.unwrap(); + }); + tasks.push(task); + } + join_all(tasks).await; + } + + // Bidirectional QUIC stream for sending one request. + struct BiStream { + tx: SendStream, + rx: RecvStream, + } + + impl AsyncRead for BiStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.rx).poll_read(cx, buf) + } + } + + impl AsyncWrite for BiStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.tx).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.tx).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.tx).poll_shutdown(cx) + } + } +} + +#[cfg(feature = "bench-quic")] +mod s2n_quic_ufdp { + use super::{ + tls_utils::{client_config, server_config}, + DummyBackend, + }; + use futures::future::join_all; + use s2n_quic::{client::Connect, provider::tls, Client, Server}; + use std::net::SocketAddr; + use tokio::task; + use ursa_pod::{client::UfdpClient, server::UfdpHandler, types::Blake3Cid}; + + const CLIENT_PUB_KEY: [u8; 48] = [3u8; 48]; + const CID: Blake3Cid = Blake3Cid([3u8; 32]); + + pub struct TlsProvider; + + impl tls::Provider for TlsProvider { + type Server = tls::rustls::Server; + type Client = tls::rustls::Client; + type Error = rustls::Error; + + fn start_server(self) -> Result { + Ok(server_config().into()) + } + + fn start_client(self) -> Result { + Ok(client_config().into()) + } + } + + pub async fn server_loop( + addr: String, + content: &'static [u8], + tx_started: tokio::sync::oneshot::Sender, + ) { + let mut server = Server::builder() + .with_tls(TlsProvider) + .unwrap() + .with_io(addr.as_str()) + .unwrap() + .start() + .unwrap(); + + tx_started + .send(server.local_addr().unwrap().port()) + .unwrap(); + + while let Some(mut conn) = server.accept().await { + let content_clone = content.clone(); + task::spawn(async move { + loop { + match conn.accept_bidirectional_stream().await { + Ok(Some(stream)) => { + task::spawn(async { + let handler = UfdpHandler::new( + stream, + DummyBackend { + content: content_clone, + }, + 0, + ); + if let Err(e) = handler.serve().await { + println!("server error: {e:?}"); + } + }); + } + Ok(None) => break, + Err(s2n_quic::connection::Error::Closed { .. }) => break, + Err(e) => panic!("{e:?}"), + } + } + }); + } + } + + pub async fn client_loop(addr: String, iterations: usize) { + let mut tasks = vec![]; + let client = Client::builder() + .with_tls(TlsProvider) + .unwrap() + .with_io("0.0.0.0:0") + .unwrap() + .start() + .unwrap(); + let addr: SocketAddr = addr.parse().unwrap(); + let mut connection = client + .connect(Connect::new(addr).with_server_name("localhost")) + .await + .unwrap(); + for _ in 0..iterations { + let stream = connection.open_bidirectional_stream().await.unwrap(); + let task = task::spawn(async move { + let mut client = UfdpClient::new(stream, CLIENT_PUB_KEY, None).await.unwrap(); + client.request(CID).await.unwrap(); + }); + tasks.push(task); + } + join_all(tasks).await; + } +} + +#[cfg(feature = "bench-quic")] +mod tls_utils { + use std::sync::Arc; + + pub struct SkipServerVerification; + + impl SkipServerVerification { + fn new() -> Arc { + Arc::new(Self) + } + } + + impl rustls::client::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } + } + + pub fn server_config() -> rustls::ServerConfig { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap(); + let key = rustls::PrivateKey(cert.serialize_private_key_der()); + let cert = vec![rustls::Certificate(cert.serialize_der().unwrap())]; + let mut config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(cert, key) + .unwrap(); + config.alpn_protocols = vec![b"ufdp".to_vec()]; + config + } + + pub fn client_config() -> rustls::ClientConfig { + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_no_client_auth(); + config.alpn_protocols = vec![b"ufdp".to_vec()]; + config + } +} + criterion_group!(benches, protocol_benchmarks); criterion_main!(benches); diff --git a/crates/ursa-pod/src/client.rs b/crates/ursa-pod/src/client.rs index d21e4214..9ec72478 100644 --- a/crates/ursa-pod/src/client.rs +++ b/crates/ursa-pod/src/client.rs @@ -143,6 +143,11 @@ where Ok(size) } + /// Consumes the client and returns the underlying stream. + pub fn finish(self) -> S { + self.conn.stream + } + /// Get the lane assigned to the connection pub fn lane(&self) -> u8 { self.lane diff --git a/crates/ursa-pod/src/server.rs b/crates/ursa-pod/src/server.rs index 9b66aca2..b1d7cbb0 100644 --- a/crates/ursa-pod/src/server.rs +++ b/crates/ursa-pod/src/server.rs @@ -47,7 +47,7 @@ impl UfdpHandler { } /// Begin serving a request. Accepts a handshake, and then begins the request loop. - pub async fn serve(mut self) -> Result<(), UrsaCodecError> { + pub async fn serve(mut self) -> Result { // Step 1: Perform the handshake. instrument!( self.handshake().await?, @@ -89,7 +89,7 @@ impl UfdpHandler { } } - Ok(()) + Ok(self.conn.stream) } /// Await and respond to a handshake