diff --git a/Cargo.lock b/Cargo.lock index ab23ee14020..fadcae6de75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1020,6 +1020,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -2070,6 +2080,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "httlib-huffman" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a9fcbcc408c5526c3ab80d534e5c86e7967c1fb7aa0a8c76abd1edc27deb877" + [[package]] name = "http" version = "0.2.12" @@ -2392,7 +2408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdf9d64cfcf380606e64f9a0bcf493616b65331199f984151a6fa11a7b3cde38" dependencies = [ "async-io", - "core-foundation", + "core-foundation 0.9.4", "fnv", "futures", "if-addrs", @@ -3392,11 +3408,12 @@ dependencies = [ "libp2p-identity", "libp2p-swarm", "libp2p-yamux", - "rcgen", + "rcgen 0.11.3", "ring 0.17.8", "rustls 0.23.20", "rustls-webpki 0.101.7", "thiserror 2.0.9", + "time", "tokio", "x509-parser 0.16.0", "yasna", @@ -3444,7 +3461,7 @@ dependencies = [ "multihash", "quickcheck", "rand 0.8.5", - "rcgen", + "rcgen 0.11.3", "stun 0.7.0", "thiserror 2.0.9", "tokio", @@ -3509,7 +3526,7 @@ dependencies = [ "libp2p-tcp", "parking_lot", "pin-project-lite", - "rcgen", + "rcgen 0.11.3", "rw-stream-sink", "soketto", "thiserror 2.0.9", @@ -3536,6 +3553,30 @@ dependencies = [ "web-sys", ] +[[package]] +name = "libp2p-webtransport" +version = "0.1.0" +dependencies = [ + "futures", + "futures-timer", + "if-watch", + "libp2p-core", + "libp2p-identity", + "libp2p-noise", + "libp2p-tls", + "quickcheck", + "quinn", + "rustls 0.23.20", + "sha2 0.10.8", + "socket2", + "thiserror 1.0.69", + "time", + "tokio", + "tracing", + "tracing-subscriber", + "wtransport", +] + [[package]] name = "libp2p-webtransport-websys" version = "0.4.1" @@ -3888,7 +3929,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -4060,6 +4101,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "octets" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109983a091271ee8916076731ba5fdc9ee22fea871bc7c6ceab9bfd423eb1d99" + [[package]] name = "oid-registry" version = "0.6.1" @@ -4843,6 +4890,19 @@ dependencies = [ "yasna", ] +[[package]] +name = "rcgen" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75e669e5202259b5314d1ea5397316ad400819437857b90861765f24c4cf80a2" +dependencies = [ + "pem", + "ring 0.17.8", + "rustls-pki-types", + "time", + "yasna", +] + [[package]] name = "redis" version = "0.24.0" @@ -5239,6 +5299,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.1.0", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -5382,7 +5454,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81d3f8c9bfcc3cbb6b0179eb57042d75b1582bdc65c3cb95f3fa999509c03cbc" +dependencies = [ + "bitflags 2.6.0", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -5878,7 +5963,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ "bitflags 2.6.0", - "core-foundation", + "core-foundation 0.9.4", "system-configuration-sys", ] @@ -6818,7 +6903,7 @@ dependencies = [ "log", "pem", "rand 0.8.5", - "rcgen", + "rcgen 0.11.3", "regex", "ring 0.16.20", "rtcp", @@ -6882,7 +6967,7 @@ dependencies = [ "pem", "rand 0.8.5", "rand_core 0.6.4", - "rcgen", + "rcgen 0.11.3", "ring 0.16.20", "rustls 0.21.12", "sec1", @@ -7365,6 +7450,43 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "wtransport" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a724f65db90b6a1ffa92ea4966cf03cb7e2bcd3ef7135b84dfe4339640d1b9" +dependencies = [ + "bytes", + "pem", + "quinn", + "rcgen 0.13.2", + "rustls 0.23.20", + "rustls-native-certs", + "rustls-pemfile", + "rustls-pki-types", + "sha2 0.10.8", + "socket2", + "thiserror 1.0.69", + "time", + "tokio", + "tracing", + "url", + "wtransport-proto", + "x509-parser 0.16.0", +] + +[[package]] +name = "wtransport-proto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14e4882c24a62f15024609b3688e6e29a4c2129634d27debc849ccd3f9b9690b" +dependencies = [ + "httlib-huffman", + "octets", + "thiserror 1.0.69", + "url", +] + [[package]] name = "x25519-dalek" version = "2.0.1" diff --git a/Cargo.toml b/Cargo.toml index b10883e1441..e81d115ac83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", + "transports/webtransport", "wasm-tests/webtransport-tests", ] resolver = "2" diff --git a/transports/tls/Cargo.toml b/transports/tls/Cargo.toml index 7702a4361b1..a238be70101 100644 --- a/transports/tls/Cargo.toml +++ b/transports/tls/Cargo.toml @@ -19,6 +19,7 @@ thiserror = { workspace = true } webpki = { version = "0.101.4", package = "rustls-webpki", features = ["std"] } x509-parser = "0.16.0" yasna = "0.5.2" +time = "0.3" # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] diff --git a/transports/tls/src/certificate.rs b/transports/tls/src/certificate.rs index 3e7eeb22bf3..ef402f8e0b5 100644 --- a/transports/tls/src/certificate.rs +++ b/transports/tls/src/certificate.rs @@ -28,6 +28,8 @@ use libp2p_identity as identity; use libp2p_identity::PeerId; use x509_parser::{prelude::*, signature_algorithm::SignatureAlgorithm}; +use ::time::OffsetDateTime; + /// The libp2p Public Key Extension is a X.509 extension /// with the Object Identifier 1.3.6.1.4.1.53594.1.1, /// allocated by IANA to the libp2p project at Protocol Labs. @@ -121,6 +123,47 @@ pub fn generate( Ok((rustls_certificate, rustls_key)) } +pub fn generate_with_validity_period( + identity_keypair: &identity::Keypair, + not_before: OffsetDateTime, + not_after: OffsetDateTime, +) -> Result< + ( + rustls::pki_types::CertificateDer<'static>, + rustls::pki_types::PrivateKeyDer<'static>, + ), + GenError, +> { + // Keypair used to sign the certificate. + // SHOULD NOT be related to the host's key. + // Endpoints MAY generate a new key and certificate + // for every connection attempt, or they MAY reuse the same key + // and certificate for multiple connections. + let certificate_keypair = rcgen::KeyPair::generate(P2P_SIGNATURE_ALGORITHM)?; + let rustls_key = rustls::pki_types::PrivateKeyDer::from( + rustls::pki_types::PrivatePkcs8KeyDer::from(certificate_keypair.serialize_der()), + ); + + let certificate = { + let mut params = rcgen::CertificateParams::new(vec![]); + params.distinguished_name = rcgen::DistinguishedName::new(); + params.custom_extensions.push(make_libp2p_extension( + identity_keypair, + &certificate_keypair, + )?); + params.alg = P2P_SIGNATURE_ALGORITHM; + params.key_pair = Some(certificate_keypair); + params.not_before = not_before; + params.not_after = not_after; + rcgen::Certificate::from_params(params)? + }; + + let rustls_certificate = rustls::pki_types::CertificateDer::from(certificate.serialize_der()?); + + Ok((rustls_certificate, rustls_key)) +} + + /// Attempts to parse the provided bytes as a [`P2pCertificate`]. /// /// For this to succeed, the certificate must contain the specified extension and the signature must @@ -128,7 +171,17 @@ pub fn generate( pub fn parse<'a>( certificate: &'a rustls::pki_types::CertificateDer<'a>, ) -> Result, ParseError> { - let certificate = parse_unverified(certificate.as_ref())?; + parse_binary(certificate.as_ref()) +} + +/// Attempts to parse the provided bytes as a [`P2pCertificate`]. +/// +/// For this to succeed, the certificate must contain the specified extension and the signature must +/// match the embedded public key. +pub fn parse_binary( + der_input: &[u8], +) -> Result { + let certificate = parse_unverified(der_input)?; certificate.verify()?; diff --git a/transports/tls/src/lib.rs b/transports/tls/src/lib.rs index 57d7d69d4bd..4ac50229e06 100644 --- a/transports/tls/src/lib.rs +++ b/transports/tls/src/lib.rs @@ -32,6 +32,8 @@ mod verifier; use std::sync::Arc; use certificate::AlwaysResolvesCert; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; + pub use futures_rustls::TlsStream; use libp2p_identity::{Keypair, PeerId}; pub use upgrade::{Config, UpgradeError}; @@ -89,3 +91,27 @@ pub fn make_server_config( Ok(crypto) } + +/// Create a TLS server configuration for libp2p. +pub fn make_webtransport_server_config( + certificate: &CertificateDer<'static>, + private_key: &PrivateKeyDer, + protocols: Vec>, +) -> rustls::ServerConfig { + let mut provider = rustls::crypto::ring::default_provider(); + provider.cipher_suites = verifier::CIPHERSUITES.to_vec(); + + let cert_resolver = Arc::new( + AlwaysResolvesCert::new(certificate.clone(), private_key) + .expect("Server cert key DER is valid; qed"), + ); + + let mut crypto = rustls::ServerConfig::builder_with_provider(provider.into()) + .with_protocol_versions(verifier::PROTOCOL_VERSIONS) + .expect("Cipher suites and kx groups are configured; qed") + .with_client_cert_verifier(Arc::new(verifier::Libp2pCertificateVerifier::new())) + .with_cert_resolver(cert_resolver); + crypto.alpn_protocols = protocols.to_vec(); + + crypto +} \ No newline at end of file diff --git a/transports/webtransport/Cargo.toml b/transports/webtransport/Cargo.toml new file mode 100644 index 00000000000..75e78cdbb66 --- /dev/null +++ b/transports/webtransport/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "libp2p-webtransport" +version = "0.1.0" +edition = "2021" +rust-version.workspace = true +description = "TLS based QUIC transport implementation for libp2p" +repository = "https://github.com/libp2p/rust-libp2p" +license = "MIT" + +[dependencies] +libp2p-core = { workspace = true } +libp2p-tls = { workspace = true } +libp2p-identity = { workspace = true, features = ["ed25519", "rand"] } +libp2p-noise = { workspace = true } +tokio = { workspace = true, default-features = false, features = ["net", "rt", "time"] } +futures = { workspace = true } +tracing = { workspace = true } +if-watch = { version = "3.2.0", features = ["tokio"] } +futures-timer = "3.0.3" +rustls = { version = "0.23.9", default-features = false } +thiserror = "1.0.61" +time = "0.3.36" +sha2 = "0.10.8" +socket2 = "0.5.7" +quinn = { version = "0.11.2", default-features = false, features = ["rustls", "futures-io"] } +wtransport = { version = "0.5.0", features = ["quinn"] } + +[dev-dependencies] +libp2p-identity = { workspace = true, features = ["rand"] } +quickcheck = "1" +time = "0.3.36" +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } + +[lints] +workspace = true diff --git a/transports/webtransport/src/certificate.rs b/transports/webtransport/src/certificate.rs new file mode 100644 index 00000000000..594ad4d294f --- /dev/null +++ b/transports/webtransport/src/certificate.rs @@ -0,0 +1,169 @@ +use std::io; +use std::io::{Cursor, Read, Write}; + +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use sha2::Digest; +use time::{Duration, OffsetDateTime}; + +use libp2p_core::multihash::Multihash; +use libp2p_tls::certificate; + +const MULTIHASH_SHA256_CODE: u64 = 0x12; +const CERT_VALID_PERIOD: Duration = Duration::days(14); + +pub type CertHash = Multihash<64>; + +/* +I would like to avoid interacting with the file system as much as possible. +My suggestion would be: +- libp2p::webtransport::Transport::new takes a list of certificates (of type libp2p::webtransport::Certificate) +- libp2p::webtransport::Certificate::generate allows users generate a new certificate with certain parameters (validity date etc) +- libp2p::webtransport::Certificate::{parse,to_bytes} allow users to serialize and deserialize certificates +*/ +#[derive(Debug, PartialEq, Eq)] +pub struct Certificate { + pub der: CertificateDer<'static>, + pub private_key_der: PrivateKeyDer<'static>, + pub not_before: OffsetDateTime, + pub not_after: OffsetDateTime, +} + +#[derive(Debug)] +pub enum Error { + GenError(certificate::GenError), + IoError(io::Error), +} + +impl From for Error { + fn from(value: certificate::GenError) -> Self { + Self::GenError(value) + } +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::IoError(value) + } +} + +impl Clone for Certificate { + fn clone(&self) -> Self { + Self { + der: self.der.clone(), + private_key_der: self.private_key_der.clone_key(), + not_before: self.not_before.clone(), + not_after: self.not_after.clone(), + } + } +} + +impl Certificate { + pub fn generate( + identity_keypair: &libp2p_identity::Keypair, + not_before: OffsetDateTime, + ) -> Result { + let not_after = not_before + .clone() + .checked_add(CERT_VALID_PERIOD) + .expect("Addition does not overflow"); + let (cert, private_key) = certificate::generate_with_validity_period( + identity_keypair, + not_before.clone(), + not_after.clone(), + )?; + + Ok(Self { + der: cert, + private_key_der: private_key, + not_before, + not_after, + }) + } + + pub(crate) fn cert_hash(&self) -> CertHash { + Multihash::wrap( + MULTIHASH_SHA256_CODE, + sha2::Sha256::digest(&self.der.as_ref().as_ref()).as_ref(), + ) + .expect("fingerprint's len to be 32 bytes") + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + Self::write_data(&mut bytes, self.der.as_ref()).expect("Write cert data"); + Self::write_data(&mut bytes, self.private_key_der.secret_der()) + .expect("Write private_key data"); + + let nb_buff = self.not_before.unix_timestamp().to_be_bytes(); + std::io::Write::write(&mut bytes, &nb_buff).expect("Write not_before"); + + let na_buff = self.not_after.unix_timestamp().to_be_bytes(); + std::io::Write::write(&mut bytes, &na_buff).expect("Write not_after"); + + bytes + } + + pub fn parse(data: &[u8]) -> Result { + let mut cursor = Cursor::new(data); + let cert_data = Self::read_data(&mut cursor)?; + let private_key_data = Self::read_data(&mut cursor)?; + let nb = Self::read_i64(&mut cursor).unwrap(); + let na = Self::read_i64(&mut cursor).unwrap(); + + let cert = CertificateDer::from(cert_data); + let private_key = PrivateKeyDer::try_from(private_key_data).unwrap(); + let not_before = OffsetDateTime::from_unix_timestamp(nb).unwrap(); + let not_after = OffsetDateTime::from_unix_timestamp(na).unwrap(); + + Ok(Self { + der: cert, + private_key_der: private_key, + not_before, + not_after, + }) + } + + fn write_data(w: &mut W, data: &[u8]) -> Result<(), io::Error> { + let size = data.len() as u64; + let mut size_buf = size.to_be_bytes(); + + w.write_all(&mut size_buf)?; + w.write_all(data)?; + + Ok(()) + } + + fn read_data(r: &mut R) -> Result, io::Error> { + let size = Self::read_i64(r)? as usize; + let mut res = vec![0u8; size]; + + r.read(res.as_mut_slice())?; + + Ok(res) + } + + fn read_i64(r: &mut R) -> Result { + let mut buffer = [0u8; 8]; + r.read(&mut buffer)?; + + Ok(i64::from_be_bytes(buffer)) + } +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + #[test] + fn test_certificate_parsing() { + let keypair = libp2p_identity::Keypair::generate_ed25519(); + let not_before = datetime!(2025-08-08 0:00 UTC); + let cert = super::Certificate::generate(&keypair, not_before).unwrap(); + + let binary_data = cert.to_bytes(); + let actual = super::Certificate::parse(binary_data.as_slice()).unwrap(); + + assert_eq!(actual, cert); + } +} diff --git a/transports/webtransport/src/config.rs b/transports/webtransport/src/config.rs new file mode 100644 index 00000000000..ee54890d2fa --- /dev/null +++ b/transports/webtransport/src/config.rs @@ -0,0 +1,66 @@ +use quinn::{TransportConfig, VarInt}; +use std::sync::Arc; +use std::time::Duration; +use wtransport::config::TlsServerConfig; + +use crate::certificate::{CertHash, Certificate}; + +pub struct Config { + pub quic_transport_config: Arc, + /// Timeout for the initial handshake when establishing a connection. + /// The actual timeout is the minimum of this and the [`Config::max_idle_timeout`]. + pub handshake_timeout: Duration, + /// Libp2p identity of the node. + pub keypair: libp2p_identity::Keypair, + + cert: Certificate, +} + +impl Config { + pub fn new(keypair: &libp2p_identity::Keypair, cert: Certificate) -> Self { + let max_idle_timeout = 10 * 1000; + let max_concurrent_stream_limit = 256; + let keep_alive_interval = Duration::from_secs(5); + let max_connection_data = 15_000_000; + // Ensure that one stream is not consuming the whole connection. + let max_stream_data = 10_000_000; + let mtu_discovery_config = Some(Default::default()); + + let mut transport = quinn::TransportConfig::default(); + // Disable uni-directional streams. + transport.max_concurrent_uni_streams(0u32.into()); + transport.max_concurrent_bidi_streams(max_concurrent_stream_limit.into()); + // Disable datagrams. + transport.datagram_receive_buffer_size(None); + transport.keep_alive_interval(Some(keep_alive_interval)); + transport.max_idle_timeout(Some(VarInt::from_u32(max_idle_timeout).into())); + transport.allow_spin(false); + transport.stream_receive_window(max_stream_data.into()); + transport.receive_window(max_connection_data.into()); + transport.mtu_discovery_config(mtu_discovery_config); + let transport = Arc::new(transport); + + Self { + quic_transport_config: transport, + handshake_timeout: Duration::from_secs(5), + keypair: keypair.clone(), + cert, + } + } + + pub fn server_tls_config(&self) -> TlsServerConfig { + libp2p_tls::make_webtransport_server_config( + &self.cert.der, + &self.cert.private_key_der, + alpn_protocols(), + ) + } + + pub fn cert_hashes(&self) -> Vec { + vec![self.cert.cert_hash()] + } +} + +fn alpn_protocols() -> Vec> { + vec![b"libp2p".to_vec(), b"h3".to_vec()] +} diff --git a/transports/webtransport/src/connection.rs b/transports/webtransport/src/connection.rs new file mode 100644 index 00000000000..9137902f63a --- /dev/null +++ b/transports/webtransport/src/connection.rs @@ -0,0 +1,93 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::future::BoxFuture; +use futures::{ready, FutureExt}; +use wtransport::{error::ConnectionError, RecvStream, SendStream}; + +pub(crate) use connecting::Connecting; +use libp2p_core::muxing::StreamMuxerEvent; +use libp2p_core::StreamMuxer; + +pub(crate) use crate::connection::stream::Stream; +use crate::Error; + +mod connecting; +mod stream; + +/// State for a single opened Webtransport connection. +pub struct Connection { + /// Underlying connection. + connection: wtransport::Connection, + /// Future for accepting a new incoming bidirectional stream. + incoming: Option>>, + /// Future to wait for the connection to be closed. + closing: Option>, +} + +impl Connection { + fn new(connection: wtransport::Connection) -> Self { + Self { + connection, + incoming: None, + closing: None, + } + } +} + +impl StreamMuxer for Connection { + type Substream = Stream; + type Error = Error; + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let incoming = this.incoming.get_or_insert_with(|| { + let connection = this.connection.clone(); + async move { connection.accept_bi().await }.boxed() + }); + + let (send, recv) = ready!(incoming.poll_unpin(cx))?; + this.incoming.take(); + let stream = Stream::new(send, recv); + Poll::Ready(Ok(stream)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + let closing = this.closing.get_or_insert_with(|| { + this.connection.close(From::from(0u32), &[]); + let connection = this.connection.clone(); + async move { connection.closed().await }.boxed() + }); + + match ready!(closing.poll_unpin(cx)) { + // Expected error given that `connection.close` was called above. + ConnectionError::LocallyClosed => {} + error => return Poll::Ready(Err(Error::Connection(error))), + }; + + Poll::Ready(Ok(())) + } + + fn poll_outbound( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + // Doesn't support outbound connections! + Poll::Pending + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + // TODO: If connection migration is enabled (currently disabled) address + // change on the connection needs to be handled. + Poll::Pending + } +} diff --git a/transports/webtransport/src/connection/connecting.rs b/transports/webtransport/src/connection/connecting.rs new file mode 100644 index 00000000000..8be3fb27f60 --- /dev/null +++ b/transports/webtransport/src/connection/connecting.rs @@ -0,0 +1,117 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures::future::{select, BoxFuture, Either, Select}; +use futures::{ready, FutureExt}; +use futures_timer::Delay; +use wtransport::endpoint::IncomingSession; + +use libp2p_core::upgrade::InboundConnectionUpgrade; +use libp2p_identity::PeerId; + +use crate::{Connection, Error}; + +pub(crate) const WEBTRANSPORT_PATH: &str = "/.well-known/libp2p-webtransport?type=noise"; + +/// A Webtransport connection currently being negotiated. +pub struct Connecting { + connecting: Select>, Delay>, +} + +impl Connecting { + pub fn new( + incoming_session: IncomingSession, + noise_config: libp2p_noise::Config, + timeout: Duration, + ) -> Self { + Connecting { + connecting: select( + Self::handshake(incoming_session, noise_config).boxed(), + Delay::new(timeout), + ), + } + } + + async fn handshake( + incoming_session: IncomingSession, + _noise_config: libp2p_noise::Config, + ) -> Result<(PeerId, Connection), Error> { + match incoming_session.await { + Ok(session_request) => { + tracing::debug!("Got session request={:?}", session_request.path()); + + let path = session_request.path(); + if path != WEBTRANSPORT_PATH { + return Err(Error::UnexpectedPath(String::from(path))); + } + match session_request.accept().await { + Ok(wtransport_connection) => { + // The client SHOULD start the handshake right after sending the CONNECT request, + // without waiting for the server's response. + let peer_id = PeerId::random(); + // todo a real noise auth + // let peer_id = + // Self::noise_auth(wtransport_connection.clone(), noise_config).await?; + + tracing::debug!( + "Accepted connection with sessionId={}", + wtransport_connection.session_id() + ); + + let connection = Connection::new(wtransport_connection); + Ok((peer_id, connection)) + } + Err(connection_error) => Err(Error::Connection(connection_error)), + } + } + Err(connection_error) => Err(Error::Connection(connection_error)), + } + } + + async fn noise_auth( + connection: wtransport::Connection, + noise_config: libp2p_noise::Config, + ) -> Result { + fn remote_peer_id(con: &wtransport::Connection) -> PeerId { + let cert_chain = con + .peer_identity() + .expect("connection got identity because it passed TLS handshake; qed"); + let cert = cert_chain + .as_slice() + .first() + .expect("there should be exactly one certificate; qed"); + + let p2p_cert = libp2p_tls::certificate::parse_binary(cert.der()) + .expect("the certificate was validated during TLS handshake; qed"); + + p2p_cert.peer_id() + } + + let (send, recv) = connection.accept_bi().await?; + let stream = crate::Stream::new(send, recv); + let (actual_peer_id, _) = noise_config.upgrade_inbound(stream, "").await?; + + let expected_peer_id = remote_peer_id(&connection); + // TODO: This should be part libp2p-noise + if actual_peer_id != expected_peer_id { + return Err(Error::UnknownRemotePeerId); + } + + Ok(actual_peer_id) + } +} + +impl Future for Connecting { + type Output = Result<(PeerId, Connection), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (peer_id, connection) = match ready!(self.connecting.poll_unpin(cx)) { + Either::Right(_) => return Poll::Ready(Err(Error::HandshakeTimedOut)), + Either::Left((res, _)) => res?, + }; + + Poll::Ready(Ok((peer_id, connection))) + } +} diff --git a/transports/webtransport/src/connection/stream.rs b/transports/webtransport/src/connection/stream.rs new file mode 100644 index 00000000000..a88a5f439d4 --- /dev/null +++ b/transports/webtransport/src/connection/stream.rs @@ -0,0 +1,70 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use wtransport::{RecvStream, SendStream}; + +/// A single stream on a connection +pub struct Stream { + /// A send part of the stream + send: SendStream, + /// A reception part of the stream + recv: RecvStream, + /// Whether the stream is closed or not + close_result: Option>, +} + +impl Stream { + pub(super) fn new(send: SendStream, recv: RecvStream) -> Self { + Self { + send, + recv, + close_result: None, + } + } +} + +impl futures::AsyncRead for Stream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if let Some(close_result) = self.close_result { + if close_result.is_err() { + return Poll::Ready(Ok(0)); + } + } + let mut read_buf = ReadBuf::new(buf); + let res = AsyncRead::poll_read(Pin::new(&mut self.recv), cx, &mut read_buf); + match res { + Poll::Ready(Ok(_)) => Poll::Ready(Ok(read_buf.filled().len())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } +} + +impl futures::AsyncWrite for Stream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + AsyncWrite::poll_write(Pin::new(&mut self.send), cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + AsyncWrite::poll_flush(Pin::new(&mut self.send), cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(close_result) = self.close_result { + // For some reason poll_close needs to be 'fuse'able + return Poll::Ready(close_result.map_err(Into::into)); + } + let close_result = futures::ready!(AsyncWrite::poll_shutdown(Pin::new(&mut self.send), cx)); + self.close_result = Some(close_result.as_ref().map_err(|e| e.kind()).copied()); + Poll::Ready(close_result) + } +} diff --git a/transports/webtransport/src/lib.rs b/transports/webtransport/src/lib.rs new file mode 100644 index 00000000000..c322b6fb150 --- /dev/null +++ b/transports/webtransport/src/lib.rs @@ -0,0 +1,47 @@ +mod certificate; +mod config; +mod connection; +mod transport; + +use wtransport::error::ConnectionError; + +pub use certificate::{CertHash, Certificate}; +pub use config::Config; +pub(crate) use connection::{Connecting, Connection, Stream}; +use libp2p_core::transport::TransportError; +pub use transport::GenTransport; + +/// Errors that may happen on the [`GenTransport`] or a single [`Connection`]. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error after the remote has been reached. + #[error(transparent)] + Connection(#[from] ConnectionError), + + /// I/O Error on a socket. + #[error(transparent)] + Io(#[from] std::io::Error), + + /// The [`Connecting`] future timed out. + #[error("Unexpected HTTP endpoint of a libp2p WebTransport server {0}")] + UnexpectedPath(String), + + #[error(transparent)] + AuthenticationError(#[from] libp2p_noise::Error), + + #[error("Unknown remote peer ID")] + UnknownRemotePeerId, + + /// The [`Connecting`] future timed out. + #[error("Handshake with the remote timed out")] + HandshakeTimedOut, + + #[error("Dial operation is not allowed on a libp2p WebTransport server")] + DialOperationIsNotAllowed, +} + +impl From for TransportError { + fn from(value: Error) -> Self { + TransportError::Other(value) + } +} diff --git a/transports/webtransport/src/transport.rs b/transports/webtransport/src/transport.rs new file mode 100644 index 00000000000..7b56bdbc626 --- /dev/null +++ b/transports/webtransport/src/transport.rs @@ -0,0 +1,657 @@ +use std::collections::HashSet; +use std::future::Pending; +use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; +use std::{fmt, io}; + +use futures::future::BoxFuture; +use futures::{prelude::*, ready, stream::SelectAll}; +use if_watch::tokio::IfWatcher; +use if_watch::IfEvent; +use wtransport::endpoint::{endpoint_side::Server, Endpoint, IncomingSession}; +use wtransport::ServerConfig; + +use libp2p_core::transport::{DialOpts, ListenerId, TransportError, TransportEvent}; +use libp2p_core::{multiaddr::Protocol, Multiaddr, Transport}; +use libp2p_identity::{Keypair, PeerId}; +use socket2::{Domain, Socket, Type}; + +use crate::certificate::CertHash; +use crate::config::Config; +use crate::connection::Connection; +use crate::Connecting; +use crate::Error; + +pub struct GenTransport { + config: Config, + + listeners: SelectAll, + /// Waker to poll the transport again when a new listener is added. + waker: Option, +} + +impl GenTransport { + pub fn new(config: Config) -> Self { + GenTransport { + config, + listeners: SelectAll::new(), + waker: None, + } + } + + /// Extract the addr, quic version and peer id from the given [`Multiaddr`]. + fn remote_multiaddr_to_socketaddr( + &self, + addr: Multiaddr, + check_unspecified_addr: bool, + ) -> Result<(SocketAddr, Option), TransportError<::Error>> { + //todo rewrite: addr.clone() should be avoided. + let (socket_addr, peer_id) = multiaddr_to_socketaddr(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified()) + { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + Ok((socket_addr, peer_id)) + } +} + +impl Transport for GenTransport { + type Output = (PeerId, Connection); + type Error = Error; + type ListenerUpgrade = Connecting; + type Dial = Pending>; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + let (socket_addr, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, false)?; + let socket = create_socket(socket_addr).map_err(Self::Error::from)?; + + let server_tls_config = self.config.server_tls_config(); + let quic_transport_config = Arc::clone(&self.config.quic_transport_config); + + let config = ServerConfig::builder() + .with_bind_socket(socket.try_clone().unwrap()) + .with_custom_tls_and_transport(server_tls_config, quic_transport_config) + .build(); + + let endpoint = + wtransport::Endpoint::server(config).map_err(|e| TransportError::Other(e.into()))?; + let keypair = &self.config.keypair; + let cert_hashes = self.config.cert_hashes(); + let handshake_timeout = self.config.handshake_timeout.clone(); + + let listener = Listener::new( + id, + socket, + endpoint, + keypair, + cert_hashes, + handshake_timeout, + )?; + self.listeners.push(listener); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + + Ok(()) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) { + // Close the listener, which will eventually finish its stream. + // `SelectAll` removes streams once they are finished. + listener.close(Ok(())); + true + } else { + false + } + } + + fn dial( + &mut self, + _addr: Multiaddr, + _opts: DialOpts, + ) -> Result> { + Err(TransportError::Other(Error::DialOperationIsNotAllowed)) + } + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Poll::Ready(Some(ev)) = self.listeners.poll_next_unpin(cx) { + return Poll::Ready(ev); + } + + match &self.waker { + None => self.waker = Some(cx.waker().clone()), + Some(waker) => { + if !waker.will_wake(cx.waker()) { + self.waker = Some(cx.waker().clone()); + } + } + }; + + Poll::Pending + } +} + +/// Listener for incoming connections. +struct Listener { + /// Id of the listener. + listener_id: ListenerId, + /// Endpoint + endpoint: Arc>, + /// Watcher for network interface changes. + /// None if we are only listening on a single interface. + if_watcher: Option, + /// A future to poll new incoming connections. + accept: BoxFuture<'static, IncomingSession>, + /// Timeout for connection establishment on inbound connections. + handshake_timeout: Duration, + /// Whether the listener was closed and the stream should terminate. + is_closed: bool, + /// Pending event to reported. + pending_event: Option<::Item>, + /// The stream must be to awaken after it has been closed to deliver the last event. + close_listener_waker: Option, + + keypair: Keypair, + + cert_hashes: Vec, +} + +impl Listener { + fn new( + listener_id: ListenerId, + socket: UdpSocket, + endpoint: Endpoint, + keypair: &Keypair, + cert_hashes: Vec, + handshake_timeout: Duration, + ) -> Result { + let endpoint = Arc::new(endpoint); + let c_endpoint = Arc::clone(&endpoint); + let accept = async move { c_endpoint.accept().await }.boxed(); + + let if_watcher; + let pending_event; + let local_addr = socket.local_addr()?; + if local_addr.ip().is_unspecified() { + if_watcher = Some(IfWatcher::new()?); + pending_event = None; + } else { + if_watcher = None; + let ma = socketaddr_to_multiaddr_with_hashes(&local_addr, &cert_hashes); + pending_event = Some(TransportEvent::NewAddress { + listener_id, + listen_addr: ma, + }) + } + + Ok(Listener { + listener_id, + endpoint, + if_watcher, + accept, + handshake_timeout, + is_closed: false, + pending_event, + close_listener_waker: None, + keypair: keypair.clone(), + cert_hashes, + }) + } + + /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and + /// terminate the stream. + fn close(&mut self, reason: Result<(), Error>) { + if self.is_closed { + return; + } + self.endpoint.close(From::from(0u32), &[]); + self.pending_event = Some(TransportEvent::ListenerClosed { + listener_id: self.listener_id, + reason, + }); + self.is_closed = true; + + // Wake the stream to deliver the last event. + if let Some(waker) = self.close_listener_waker.take() { + waker.wake(); + } + } + + fn socket_addr(&self) -> SocketAddr { + self.endpoint + .local_addr() + .expect("Cannot fail because the socket is bound") + } + + fn noise_config(&self) -> libp2p_noise::Config { + let res = libp2p_noise::Config::new(&self.keypair).expect("Getting a noise config"); + let set = self.cert_hashes.iter().cloned().collect::>(); + let res = res.with_webtransport_certhashes(set); + + res + } + + fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { + let endpoint_addr = self.socket_addr(); + let Some(if_watcher) = self.if_watcher.as_mut() else { + return Poll::Pending; + }; + loop { + match ready!(if_watcher.poll_if_event(cx)) { + Ok(IfEvent::Up(inet)) => { + if let Some(listen_addr) = + ip_to_listen_addr(&endpoint_addr, inet.addr(), &self.cert_hashes) + { + tracing::debug!( + address=%listen_addr, + "New listen address" + ); + return Poll::Ready(TransportEvent::NewAddress { + listener_id: self.listener_id, + listen_addr, + }); + } + } + Ok(IfEvent::Down(inet)) => { + if let Some(listen_addr) = + ip_to_listen_addr(&endpoint_addr, inet.addr(), &self.cert_hashes) + { + tracing::debug!( + address=%listen_addr, + "Expired listen address" + ); + return Poll::Ready(TransportEvent::AddressExpired { + listener_id: self.listener_id, + listen_addr, + }); + } + } + Err(err) => { + return Poll::Ready(TransportEvent::ListenerError { + listener_id: self.listener_id, + error: err.into(), + }) + } + } + } + } +} + +impl Stream for Listener { + type Item = TransportEvent<::ListenerUpgrade, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if let Some(event) = self.pending_event.take() { + return Poll::Ready(Some(event)); + } + if self.is_closed { + return Poll::Ready(None); + } + if let Poll::Ready(event) = self.poll_if_addr(cx) { + return Poll::Ready(Some(event)); + } + + match self.accept.poll_unpin(cx) { + Poll::Ready(incoming_session) => { + let endpoint = Arc::clone(&self.endpoint); + self.accept = async move { endpoint.accept().await }.boxed(); + let local_addr = + socketaddr_to_multiaddr_with_hashes(&self.socket_addr(), &self.cert_hashes); + let remote_addr = incoming_session.remote_address(); + let send_back_addr = socketaddr_to_multiaddr(&remote_addr); + let noise = self.noise_config(); + + let event = TransportEvent::Incoming { + upgrade: Connecting::new(incoming_session, noise, self.handshake_timeout), + local_addr, + send_back_addr, + listener_id: self.listener_id, + }; + return Poll::Ready(Some(event)); + } + // todo Думаю, что нужно получать не IncomingSession, а + // todo Result с запросом на соединение. Тогда можно будет грамотно обработать ошибки + // todo и закрыть здесь листенер + // Poll::Ready(None) => { + // self.close(Ok(())); + // continue; + // } + Poll::Pending => {} + }; + + match &self.close_listener_waker { + None => self.close_listener_waker = Some(cx.waker().clone()), + Some(waker) => { + if !waker.will_wake(cx.waker()) { + self.close_listener_waker = Some(cx.waker().clone()) + } + } + } + + return Poll::Pending; + } + } +} + +impl fmt::Debug for Listener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Listener") + .field("listener_id", &self.listener_id) + .field("handshake_timeout", &self.handshake_timeout) + .field("is_closed", &self.is_closed) + .field("pending_event", &self.pending_event) + .finish() + } +} + +fn create_socket(socket_addr: SocketAddr) -> io::Result { + let socket = Socket::new( + Domain::for_address(socket_addr), + Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + if socket_addr.is_ipv6() { + socket.set_only_v6(true)?; + } + + socket.bind(&socket_addr.into())?; + + Ok(socket.into()) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) enum SocketFamily { + Ipv4, + Ipv6, +} + +impl SocketFamily { + fn is_same(a: &IpAddr, b: &IpAddr) -> bool { + matches!( + (a, b), + (IpAddr::V4(_), IpAddr::V4(_)) | (IpAddr::V6(_), IpAddr::V6(_)) + ) + } +} + +impl From for SocketFamily { + fn from(ip: IpAddr) -> Self { + match ip { + IpAddr::V4(_) => SocketFamily::Ipv4, + IpAddr::V6(_) => SocketFamily::Ipv6, + } + } +} + +/// Turn an [`IpAddr`] reported by the interface watcher into a +/// listen-address for the endpoint. +/// +/// For this, the `ip` is combined with the port that the endpoint +/// is actually bound. +/// +/// Returns `None` if the `ip` is not the same socket family as the +/// address that the endpoint is bound to. +fn ip_to_listen_addr( + endpoint_addr: &SocketAddr, + ip: IpAddr, + hashes: &Vec, +) -> Option { + // True if either both addresses are Ipv4 or both Ipv6. + if !SocketFamily::is_same(&endpoint_addr.ip(), &ip) { + return None; + } + let socket_addr = SocketAddr::new(ip, endpoint_addr.port()); + Some(socketaddr_to_multiaddr_with_hashes(&socket_addr, hashes)) +} + +/// Turns an IP address and port into the corresponding WebTransport multiaddr. +fn socketaddr_to_multiaddr(socket_addr: &SocketAddr) -> Multiaddr { + Multiaddr::empty() + .with(socket_addr.ip().into()) + .with(Protocol::Udp(socket_addr.port())) + .with(Protocol::QuicV1) + .with(Protocol::WebTransport) +} + +fn socketaddr_to_multiaddr_with_hashes( + socket_addr: &SocketAddr, + hashes: &Vec, +) -> Multiaddr { + let mut res = socketaddr_to_multiaddr(socket_addr); + + if !hashes.is_empty() { + let mut vec = hashes.clone(); + res = res.with(Protocol::Certhash( + vec.pop().expect("Gets the last element"), + )); + if !vec.is_empty() { + res = res.with(Protocol::Certhash( + vec.pop().expect("Gets the last element"), + )); + }; + } + + res +} + +/// Tries to turn a Webtransport multiaddress into a UDP [`SocketAddr`]. Returns None if the format +/// of the multiaddr is wrong. +fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option<(SocketAddr, Option)> { + let mut iter = addr.iter(); + let proto1 = iter.next()?; + let proto2 = iter.next()?; + let proto3 = iter.next()?; + + if !matches!(proto3, Protocol::QuicV1) { + tracing::error!("Cannot listen on a non QUIC address {addr}"); + return None; + } + + let mut peer_id = None; + let mut is_webtransport = false; + for proto in iter { + match proto { + Protocol::P2p(id) => { + peer_id = Some(id); + } + Protocol::WebTransport => { + is_webtransport = true; + } + Protocol::Certhash(_) => { + tracing::error!( + "Cannot listen on a specific certhash for WebTransport address {addr}" + ); + return None; + } + _ => return None, + } + } + + if !is_webtransport { + tracing::error!("Listening address {addr} should be followed by `/webtransport`"); + return None; + } + + match (proto1, proto2) { + (Protocol::Ip4(ip), Protocol::Udp(port)) => { + Some((SocketAddr::new(ip.into(), port), peer_id)) + } + (Protocol::Ip6(ip), Protocol::Udp(port)) => { + Some((SocketAddr::new(ip.into(), port), peer_id)) + } + _ => None, + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::certificate::Certificate; + use futures::future::poll_fn; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use time::ext::NumericalDuration; + use time::OffsetDateTime; + + fn generate_keypair_and_cert() -> (Keypair, Certificate) { + let keypair = Keypair::generate_ed25519(); + let not_before = OffsetDateTime::now_utc().checked_sub(1.days()).unwrap(); + let cert = Certificate::generate(&keypair, not_before).expect("Generate certificate"); + + (keypair, cert) + } + + #[tokio::test] + async fn test_close_listener() { + let (keypair, cert) = generate_keypair_and_cert(); + let config = Config::new(&keypair, cert); + let mut transport = GenTransport::new(config); + + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + + // Run test twice to check that there is no unexpected behaviour if `Transport.listener` + // is temporarily empty. + for _ in 0..2 { + let id = ListenerId::next(); + transport + .listen_on( + id, + "/ip4/0.0.0.0/udp/0/quic-v1/webtransport".parse().unwrap(), + ) + .unwrap(); + + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::NewAddress { + listener_id, + listen_addr, + } => { + assert_eq!(listener_id, id); + assert!( + matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified()) + ); + assert!( + matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0) + ); + assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::QuicV1))); + } + e => panic!("Unexpected event: {e:?}"), + } + assert!(transport.remove_listener(id), "Expect listener to exist."); + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::ListenerClosed { + listener_id, + reason: Ok(()), + } => { + assert_eq!(listener_id, id); + } + e => panic!("Unexpected event: {e:?}"), + } + // Poll once again so that the listener has the chance to return `Poll::Ready(None)` and + // be removed from the list of listeners. + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + assert!(transport.listeners.is_empty()); + } + } + + #[test] + fn socket_to_multiaddr() { + let (_keypair, cert) = generate_keypair_and_cert(); + let certs = vec![cert.cert_hash()]; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345); + let res = socketaddr_to_multiaddr_with_hashes(&addr, &certs); + + assert!(multiaddr_to_socketaddr(&res.to_string().parse::().unwrap()).is_none()); + } + + #[test] + fn multiaddr_to_udp_conversion() { + assert!( + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) + .is_none() + ); + + assert!(multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/1234/quic-v1" + .parse::() + .unwrap() + ) + .is_none()); + + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/12345/quic-v1/webtransport" + .parse::() + .unwrap() + ), + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345), + None + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/255.255.255.255/udp/8080/quic-v1/webtransport" + .parse::() + .unwrap() + ), + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080), + None + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/55148/quic-v1/webtransport/p2p/12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ" + .parse::() + .unwrap() + ), + Some((SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55148), + Some("12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ".parse().unwrap()))) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip6/::1/udp/12345/quic-v1/webtransport" + .parse::() + .unwrap() + ), + Some(( + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345), + None + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic-v1/webtransport" + .parse::() + .unwrap() + ), + Some(( + SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, + )), + 8080, + ), + None + )) + ); + } +} diff --git a/transports/webtransport/tests/smoke.rs b/transports/webtransport/tests/smoke.rs new file mode 100644 index 00000000000..69f96245f10 --- /dev/null +++ b/transports/webtransport/tests/smoke.rs @@ -0,0 +1,116 @@ +use std::net::SocketAddr; + +use futures::join; +use futures::stream::StreamExt; +use time::ext::NumericalDuration; +use time::OffsetDateTime; +use wtransport::ClientConfig; +use wtransport::Endpoint; + +use libp2p_core::multiaddr::Protocol; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::{Boxed, ListenerId, TransportEvent}; +use libp2p_core::{Multiaddr, Transport}; +use libp2p_identity::{Keypair, PeerId}; +use libp2p_webtransport as webtransport; + +#[tokio::test] +async fn smoke() { + let subscriber = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .compact() + .finish(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + let (keypair, cert) = generate_keypair_and_certificate(); + let config = webtransport::Config::new(&keypair, cert); + let mut transport = webtransport::GenTransport::new(config) + .map(|(p, c), _| (p, StreamMuxerBox::new(c))) + .boxed(); + let addr = start_listening(&mut transport, "/ip4/127.0.0.1/udp/0/quic-v1/webtransport").await; + let socket_addr = multiaddr_to_socketaddr(&addr).unwrap(); + + let a = async move { + loop { + match &mut transport.next().await { + Some(TransportEvent::Incoming { + listener_id, + upgrade, + .. + }) => { + tracing::debug!("Got incoming event. listener_id={}", listener_id); + match upgrade.await { + Ok((peer_id, _mutex)) => { + tracing::debug!("Connection is opened. peer_id={}", peer_id); + return Some(peer_id); + } + Err(e) => { + tracing::error!("Upgrade got an error {:?}", e); + return None; + } + } + } + Some(e) => tracing::debug!("Got event {:?}", e), + e => { + tracing::error!("MY_TEST Got an error {:?}", e); + return None; + } + } + } + }; + + let url = format!( + "https://{}/.well-known/libp2p-webtransport?type=noise", + socket_addr + ); + let b = async move { + let client_key_pair = Keypair::generate_ed25519(); + let client_tls = libp2p_tls::make_client_config(&client_key_pair, None).unwrap(); + let config = ClientConfig::builder() + .with_bind_default() + .with_custom_tls(client_tls) + .build(); + + match Endpoint::client(config) + .unwrap() + .connect(url.as_str()) + .await + { + Ok(_) => {} + Err(_) => {} + } + }; + + matches!(join!(a, b), (Some(_id), ())); +} + +fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option { + let mut iter = addr.iter(); + let proto1 = iter.next()?; + let proto2 = iter.next()?; + + match (proto1, proto2) { + (Protocol::Ip4(ip), Protocol::Udp(port)) => Some(SocketAddr::new(ip.into(), port)), + (Protocol::Ip6(ip), Protocol::Udp(port)) => Some(SocketAddr::new(ip.into(), port)), + _ => None, + } +} + +async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr { + transport + .listen_on(ListenerId::next(), addr.parse().unwrap()) + .unwrap(); + match transport.next().await { + Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr, + e => panic!("{e:?}"), + } +} + +fn generate_keypair_and_certificate() -> (Keypair, webtransport::Certificate) { + let keypair = Keypair::generate_ed25519(); + let not_before = OffsetDateTime::now_utc().checked_sub(1.days()).unwrap(); + let cert = + webtransport::Certificate::generate(&keypair, not_before).expect("Generate certificate"); + + (keypair, cert) +}