diff --git a/.github/workflows/release-chat-example.yml b/.github/workflows/release-chat-example.yml new file mode 100644 index 0000000..6f06123 --- /dev/null +++ b/.github/workflows/release-chat-example.yml @@ -0,0 +1,72 @@ +name: Build and release chat example +on: + push: + branches: + - master + - feat/chat-example +permissions: write-all +jobs: + metadata: + name: Get release metadata + runs-on: ubuntu-latest + outputs: + version: ${{ steps.get_version.outputs.version }} + release_exists: ${{ steps.check_release.outputs.exists }} + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Get version + id: get_version + run: echo "version=chat-example-$(cargo read-manifest --manifest-path examples/chat/Cargo.toml | jq -r '.version')" >> $GITHUB_OUTPUT + + - name: Check if release exists + id: check_release + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + RELEASE_URL=$(curl --silent "https://api.github.com/repos/calimero-network/relay-server/releases/tags/${{ steps.get_version.outputs.version }}" \ + -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.v3+json" | jq -r '.url') + if [[ "$RELEASE_URL" != "null" ]]; then + echo "exists=true" >> $GITHUB_OUTPUT + else + echo "exists=false" >> $GITHUB_OUTPUT + fi + + release: + name: Build and release + runs-on: ubuntu-latest + needs: metadata + if: needs.metadata.outputs.release_exists == 'false' + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup rust toolchain + run: rustup toolchain install stable --profile minimal + + - name: Setup rust cache + uses: Swatinem/rust-cache@v2 + + - name: Build for Intel Linux + run: cargo build -p chat-example --release --target=x86_64-unknown-linux-gnu + + - name: Build for Aarch Linux + run: cross build -p chat-example --release --target=aarch64-unknown-linux-gnu + + - name: Create artifacts directory + run: | + mkdir -p artifacts + cp target/x86_64-unknown-linux-gnu/release/chat-example artifacts/chat-example-x86_64-unknown-linux + cp target/aarch64-unknown-linux-gnu/release/chat-example artifacts/chat-example-aarch64-unknown-linux + + - name: Create GitHub Release + uses: softprops/action-gh-release@v2 + with: + tag_name: ${{ needs.metadata.outputs.version }} + files: | + examples/chat/README.md + artifacts/* + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/Cargo.lock b/Cargo.lock index 7c90fef..0f9075b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.0" @@ -394,6 +400,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "chat-example" +version = "0.1.0" +dependencies = [ + "clap", + "eyre", + "libp2p", + "multiaddr", + "owo-colors", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "cipher" version = "0.4.4" @@ -871,6 +893,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-ticker" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e" +dependencies = [ + "futures", + "futures-timer", + "instant", +] + [[package]] name = "futures-timer" version = "3.0.3" @@ -979,6 +1012,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hickory-proto" version = "0.24.1" @@ -1276,6 +1315,7 @@ dependencies = [ "libp2p-core", "libp2p-dcutr", "libp2p-dns", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-mdns", @@ -1386,6 +1426,37 @@ dependencies = [ "tracing", ] +[[package]] +name = "libp2p-gossipsub" +version = "0.46.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d665144a616dadebdc5fff186b1233488cdcd8bfb1223218ff084b6d052c94f7" +dependencies = [ + "asynchronous-codec 0.7.0", + "base64 0.21.7", + "byteorder", + "bytes", + "either", + "fnv", + "futures", + "futures-ticker", + "getrandom", + "hex_fmt", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "prometheus-client", + "quick-protobuf", + "quick-protobuf-codec 0.3.1", + "rand", + "regex", + "sha2", + "smallvec", + "tracing", + "void", +] + [[package]] name = "libp2p-identify" version = "0.44.1" @@ -1458,6 +1529,7 @@ dependencies = [ "instant", "libp2p-core", "libp2p-dcutr", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-ping", @@ -2002,6 +2074,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caff54706df99d2a78a5a4e3455ff45448d81ef1bb63c22cd14052ca0e993a3f" + [[package]] name = "parking" version = "2.2.0" @@ -2043,7 +2121,7 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" dependencies = [ - "base64", + "base64 0.22.0", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 6b98fd3..5475072 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "examples/dcutr"] +members = [".", "examples/dcutr", "examples/chat"] [package] name = "relay-server" diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml new file mode 100644 index 0000000..cdda457 --- /dev/null +++ b/examples/chat/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "chat-example" +version = "0.1.0" +authors = ["Calimero Limited "] +edition = "2021" +repository = "https://github.com/calimero-network/relay-server" +license = "MIT OR Apache-2.0" + +[dependencies] +clap = { version = "4.5.4", features = ["derive", "env"] } +eyre = "0.6.12" +libp2p = { version = "0.53.2", features = [ + "dcutr", + "dns", + "gossipsub", + "identify", + "macros", + "noise", + "ping", + "quic", + "relay", + "tokio", + "tcp", + "tls", + "yamux", +] } +multiaddr = "0.18.1" +owo-colors = "4.0.0" +serde = "1.0.196" +serde_json = "1.0.113" +tokio = { version = "1.35.1", features = [ + "io-std", + "macros", + "rt", + "rt-multi-thread", +] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } diff --git a/examples/chat/README.md b/examples/chat/README.md new file mode 100644 index 0000000..bf5a474 --- /dev/null +++ b/examples/chat/README.md @@ -0,0 +1,47 @@ +# Chat +This examples show cases how to manually dial (connect to) either local peer or remote peer has a reservation on relay-server. + +## Run local only +This examples shows how to run two sessions locally and connect sessions by manually dialing local peer. + +Run first chat session in echo mode. +``` +cargo run -p chat-example -- --mode echo --port 4002 --secret-key-seed 102 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF +``` + +Run second chat session in interactive mode with local peer dial. +``` +cargo run -p chat-example -- --mode interactive --port 4003 --secret-key-seed 103 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/127.0.0.1/udp/4002/quic-v1/p2p/12D3KooWMpeKAbMK4BTPsQY3rG7XwtdstseHGcq7kffY8LToYYKK --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF +``` + +In the interactive session publish new message manually: +``` +publish calimero-network/examples/chat/v0.0.1 ola +``` + +## Run locally with remote peer dial in +This examples shows how to run two sessions locally and connect sessions manually by dialing private remote peer from each session. For the gossip message to pass from one local session to second local session it needs to go "the long way" around (local -> remote -> local). + +Additional info: +- Remote instance is running in a private subnet behind NAT. +- Remote instance PeerId: `12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp` +- Remote instance address at the relay server: `ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp` +Run first chat session in interactive mode with remote peer dial. +``` +cargo run -p chat-example -- --mode interactive --port 4002 --secret-key-seed 102 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF +``` + +Run second chat session in interactive mode with remote peer dial. +``` +cargo run -p chat-example -- --mode interactive --port 4003 --secret-key-seed 103 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF +``` + +In any interactive session publish new message manually: +``` +publish calimero-network/examples/chat/v0.0.1 ola +``` + +## Debugging and known issues +- If multiple people are running the same example, some will fail to get reservation on relay server because the same PeerId already exists. + - Fix: change `secret-key-seed` to something else + diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs new file mode 100644 index 0000000..227740f --- /dev/null +++ b/examples/chat/src/main.rs @@ -0,0 +1,312 @@ +use std::str::FromStr; + +use clap::Parser; +use clap::ValueEnum; +use libp2p::gossipsub; +use libp2p::identity; +use libp2p::PeerId; +use multiaddr::Multiaddr; +use tokio::io::AsyncBufReadExt; +use tracing::debug; +use tracing::{error, info}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::EnvFilter; + +mod network; + +#[derive(Debug, Parser)] +#[clap(name = "Chat example")] +struct Opt { + /// The mode (interactive, echo). + #[clap(long, value_enum)] + mode: Mode, + + /// The port used to listen on all interfaces + #[clap(long)] + port: u16, + + /// Fixed value to generate deterministic peer id. + #[clap(long)] + secret_key_seed: u8, + + /// The listening address of a relay server to connect to. + #[clap(long)] + relay_address: Multiaddr, + + /// Optional list of peer addresses to dial immediately after network bootstrap. + #[clap(long)] + dial_peer_addrs: Option>, + + /// Optional list of gossip topic names to subscribe immediately after network bootstrap. + #[clap(long)] + gossip_topic_names: Option>, +} + +#[derive(Clone, Debug, PartialEq, Parser, ValueEnum)] +enum Mode { + Interactive, + Echo, +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + tracing_subscriber::registry() + // "info,chat_example=debug,{}", + .with(EnvFilter::builder().parse(format!( + "info,chat_example=debug,{}", + std::env::var("RUST_LOG").unwrap_or_default() + ))?) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let opt = Opt::parse(); + + let keypair = generate_ed25519(opt.secret_key_seed); + + let (network_client, mut network_events) = + network::run(keypair.clone(), opt.port, opt.relay_address.clone()).await?; + + if let Some(peer_addrs) = opt.dial_peer_addrs { + for addr in peer_addrs { + info!("Dialing peer: {}", addr); + network_client.dial(addr).await?; + } + } + + if let Some(topic_names) = opt.gossip_topic_names { + for topic_name in topic_names { + info!("Subscribing to topic: {}", topic_name); + let topic = gossipsub::IdentTopic::new(topic_name); + network_client.subscribe(topic).await?; + } + } + + let peer_id = keypair.public().to_peer_id(); + match opt.mode { + Mode::Interactive => { + let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines(); + + loop { + tokio::select! { + event = network_events.recv() => { + let Some(event) = event else { + break; + }; + handle_network_event(network_client.clone(), event, peer_id, false).await?; + } + line = stdin.next_line() => { + if let Some(line) = line? { + handle_line(network_client.clone(), line).await?; + } + } + } + } + } + Mode::Echo => { + while let Some(event) = network_events.recv().await { + handle_network_event(network_client.clone(), event, peer_id, true).await?; + } + } + } + + Ok(()) +} + +fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair { + let mut bytes = [0u8; 32]; + bytes[0] = secret_key_seed; + + identity::Keypair::ed25519_from_bytes(bytes).expect("only errors on wrong length") +} + +const LINE_START: &str = ">>>>>>>>>> "; + +async fn handle_network_event( + network_client: network::client::NetworkClient, + event: network::types::NetworkEvent, + peer_id: PeerId, + is_echo: bool, +) -> eyre::Result<()> { + match event { + network::types::NetworkEvent::IdentifySent { peer_id } => { + debug!("Identify sent to {:?}", peer_id); + } + network::types::NetworkEvent::IdentifyReceived { + peer_id, + observed_addr, + } => { + debug!( + "Identify received from {:?} at {:?}", + peer_id, observed_addr + ); + } + network::types::NetworkEvent::Message { message, .. } => { + let text = String::from_utf8_lossy(&message.data); + println!("{LINE_START} Received message: {:?}", text); + + if is_echo { + if text.starts_with("echo") { + debug!("Ignoring echo message"); + return Ok(()); + } + let text = format!("echo ({}): '{}'", peer_id, text); + + match network_client + .publish(message.topic, text.into_bytes()) + .await + { + Ok(_) => debug!("Echoed message back"), + Err(err) => error!(%err, "Failed to echo message back"), + }; + } + } + network::types::NetworkEvent::Subscribed { topic, .. } => { + debug!("Subscribed to {:?}", topic); + } + network::types::NetworkEvent::ListeningOn { address, .. } => { + info!("Listening on: {}", address); + } + event => { + info!("Unhandled event: {:?}", event); + } + } + Ok(()) +} + +async fn handle_line( + network_client: network::client::NetworkClient, + line: String, +) -> eyre::Result<()> { + let (command, args) = match line.split_once(' ') { + Some((method, payload)) => (method, Some(payload)), + None => (line.as_str(), None), + }; + + match command { + "dial" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: dial "); + return Ok(()); + } + }; + + let addr = match Multiaddr::from_str(args) { + Ok(addr) => addr, + Err(err) => { + println!("{LINE_START} Failed to parse MultiAddr: {:?}", err); + return Ok(()); + } + }; + + info!("{LINE_START} Dialing {:?}", addr); + + match network_client.dial(addr).await { + Ok(_) => { + println!("{LINE_START} Peer dialed"); + } + Err(err) => { + println!("{LINE_START} Failed to dial peer: {:?}", err); + } + }; + } + "subscribe" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: subscribe "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(args.to_string()); + match network_client.subscribe(topic).await { + Ok(_) => { + println!("{LINE_START} Peer dialed"); + } + Err(err) => { + println!("{LINE_START} Failed to parse peer id: {:?}", err); + } + }; + } + "unsubscribe" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: unsubscribe "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(args.to_string()); + match network_client.unsubscribe(topic).await { + Ok(_) => { + println!("{LINE_START} Peer dialed"); + } + Err(err) => { + println!("{LINE_START} Failed to parse peer id: {:?}", err); + } + }; + } + "publish" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: message "); + return Ok(()); + } + }; + + let mut args_iter = args.split_whitespace(); + let topic_name = match args_iter.next() { + Some(topic) => topic, + None => { + println!("{LINE_START} Usage: message "); + return Ok(()); + } + }; + + let message_data = match args_iter.next() { + Some(data) => data, + None => { + println!("{LINE_START} Usage: message "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(topic_name.to_string()); + match network_client + .publish(topic.hash(), message_data.as_bytes().to_vec()) + .await + { + Ok(_) => { + println!("{LINE_START} Message published successfully"); + } + Err(err) => { + println!("{LINE_START} Failed to publish message: {:?}", err); + } + }; + } + "peers" => { + let peer_info = network_client.peer_info().await; + println!("{LINE_START} Peer info: {:?}", peer_info); + } + "mesh-peers" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: mesh-peers "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(args.to_string()); + let mesh_peer_info = network_client.mesh_peer_info(topic.hash()).await; + println!("{LINE_START} Mesh peer info: {:?}", mesh_peer_info); + } + _ => println!("{LINE_START} Unknown command"), + } + + Ok(()) +} diff --git a/examples/chat/src/network.rs b/examples/chat/src/network.rs new file mode 100644 index 0000000..62145a9 --- /dev/null +++ b/examples/chat/src/network.rs @@ -0,0 +1,499 @@ +use std::collections::hash_map::{self, HashMap}; +use std::time::Duration; + +use libp2p::futures::prelude::*; +use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::{dcutr, gossipsub, identify, identity, noise, ping, relay, yamux, PeerId}; +use multiaddr::Multiaddr; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tracing::{debug, error, info, trace, warn}; + +pub mod client; +pub mod events; +pub mod types; + +use client::NetworkClient; + +const PROTOCOL_VERSION: &str = concat!("/", env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); + +#[derive(NetworkBehaviour)] +struct Behaviour { + dcutr: dcutr::Behaviour, + identify: identify::Behaviour, + gossipsub: gossipsub::Behaviour, + ping: ping::Behaviour, + relay_client: relay::client::Behaviour, +} + +pub async fn run( + keypair: identity::Keypair, + port: u16, + relay_address: Multiaddr, +) -> eyre::Result<(NetworkClient, mpsc::Receiver)> { + let (client, mut event_receiver, event_loop) = init(keypair).await?; + + tokio::spawn(event_loop.run()); + + let swarm_listen: Vec = vec![ + format!("/ip4/0.0.0.0/udp/{}/quic-v1", port).parse()?, + format!("/ip4/0.0.0.0/tcp/{}", port).parse()?, + ]; + for addr in swarm_listen { + client.listen_on(addr.clone()).await?; + } + + // Reference: https://github.com/libp2p/rust-libp2p/blob/60fd566a955a33c42a6ab6eefc1f0fedef9f8b83/examples/dcutr/src/main.rs#L118 + loop { + tokio::select! { + Some(event) = event_receiver.recv() => { + match event { + types::NetworkEvent::ListeningOn { address, .. } => { + info!("Listening on: {}", address); + } + _ => { + error!("Recieved unexpected network event: {:?}", event) + } + } + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // Likely listening on all interfaces now, thus continuing by breaking the loop. + break; + } + } + } + + // Connect to the relay server. Not for the reservation or relayed connection, but to (a) learn + // our local public address and (b) enable a freshly started relay to learn its public address. + client.dial(relay_address.clone()).await?; + + let mut learned_observed_addr = false; + let mut told_relay_observed_addr = false; + let relay_peer_id = match relay_address.iter().find_map(|protocol| { + if let libp2p::multiaddr::Protocol::P2p(peer_id) = protocol { + Some(peer_id) + } else { + None + } + }) { + Some(peer_id) => peer_id, + None => eyre::bail!("Failed to get PeerId from relay address"), + }; + + loop { + match event_receiver.recv().await.unwrap() { + types::NetworkEvent::IdentifySent { peer_id } => { + if peer_id == relay_peer_id { + info!("Told relay its public address"); + told_relay_observed_addr = true; + } + } + types::NetworkEvent::IdentifyReceived { + peer_id, + observed_addr, + } => { + if peer_id == relay_peer_id { + info!("Relay told us our observed address: {}", observed_addr); + learned_observed_addr = true; + } + } + event => info!("unexpected: {event:?}"), + }; + + if learned_observed_addr && told_relay_observed_addr { + break; + } + } + + // Create reservation on relay server and wait for it to be accepted ... + client + .listen_on(relay_address.with(multiaddr::Protocol::P2pCircuit)) + .await?; + + loop { + match event_receiver.recv().await.unwrap() { + types::NetworkEvent::RelayReservationAccepted => { + info!("Relay accepted our reservation"); + break; + } + event => info!("unexpected: {event:?}"), + }; + } + + // ... and now wait until we are listening on the "relayed" interfaces + // Reference: https://github.com/libp2p/rust-libp2p/blob/60fd566a955a33c42a6ab6eefc1f0fedef9f8b83/examples/dcutr/src/main.rs#L118 + loop { + tokio::select! { + Some(event) = event_receiver.recv() => { + match event { + types::NetworkEvent::ListeningOn { address, .. } => { + info!("Listening on: {}", address); + } + _ => { + error!("Recieved unexpected network event: {:?}", event) + } + } + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // Likely listening on all interfaces now, thus continuing by breaking the loop. + break; + } + } + } + + Ok((client, event_receiver)) +} + +async fn init( + keypair: identity::Keypair, +) -> eyre::Result<( + NetworkClient, + mpsc::Receiver, + EventLoop, +)> { + let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair.clone()) + .with_tokio() + .with_tcp( + Default::default(), + (libp2p::tls::Config::new, libp2p::noise::Config::new), + libp2p::yamux::Config::default, + )? + .with_quic() + .with_relay_client(noise::Config::new, yamux::Config::default)? + .with_behaviour(|keypair, relay_behaviour| Behaviour { + dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()), + identify: identify::Behaviour::new( + identify::Config::new(PROTOCOL_VERSION.to_owned(), keypair.public()) + .with_push_listen_addr_updates(true), + ), + gossipsub: gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Signed(keypair.clone()), + gossipsub::Config::default(), + ) + .expect("Valid gossipsub config."), + ping: ping::Behaviour::default(), + relay_client: relay_behaviour, + })? + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(time::Duration::from_secs(u64::MAX)) + }) + .build(); + + let (command_sender, command_receiver) = mpsc::channel(32); + let (event_sender, event_receiver) = mpsc::channel(32); + + let client = NetworkClient { + sender: command_sender, + }; + + let event_loop = EventLoop::new(swarm, command_receiver, event_sender); + + Ok((client, event_receiver, event_loop)) +} + +pub(crate) struct EventLoop { + swarm: Swarm, + command_receiver: mpsc::Receiver, + event_sender: mpsc::Sender, + pending_dial: HashMap>>>, +} + +impl EventLoop { + fn new( + swarm: Swarm, + command_receiver: mpsc::Receiver, + event_sender: mpsc::Sender, + ) -> Self { + Self { + swarm, + command_receiver, + event_sender, + pending_dial: Default::default(), + } + } + + pub(crate) async fn run(mut self) { + loop { + tokio::select! { + event = self.swarm.next() => self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await, + command = self.command_receiver.recv() => { + let Some(c) = command else { break }; + self.handle_command(c).await; + } + } + } + } + + async fn handle_command(&mut self, command: Command) { + match command { + Command::ListenOn { addr, sender } => { + let _ = match self.swarm.listen_on(addr) { + Ok(_) => sender.send(Ok(())), + Err(e) => sender.send(Err(eyre::eyre!(e))), + }; + } + Command::Dial { peer_addr, sender } => { + let addr_meta = match MultiaddrMeta::try_from(&peer_addr) { + Ok(meta) => meta, + Err(e) => { + let _ = sender.send(Err(eyre::eyre!(e))); + return; + } + }; + + match self.pending_dial.entry(*addr_meta.peer_id()) { + hash_map::Entry::Occupied(_) => { + let _ = sender.send(Ok(None)); + } + hash_map::Entry::Vacant(entry) => { + match self.swarm.dial(peer_addr) { + Ok(_) => { + entry.insert(sender); + } + Err(err) => { + let _ = sender.send(Err(eyre::eyre!(err))); + } + }; + } + } + } + Command::Subscribe { topic, sender } => { + if let Err(err) = self.swarm.behaviour_mut().gossipsub.subscribe(&topic) { + let _ = sender.send(Err(eyre::eyre!(err))); + return; + } + + let _ = sender.send(Ok(topic)); + } + Command::Unsubscribe { topic, sender } => { + if let Err(err) = self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic) { + let _ = sender.send(Err(eyre::eyre!(err))); + return; + } + + let _ = sender.send(Ok(topic)); + } + Command::PeerInfo { sender } => { + let peers: Vec = self + .swarm + .connected_peers() + .into_iter() + .map(|peer| peer.clone()) + .collect(); + let count = peers.len(); + + let _ = sender.send(PeerInfo { count, peers }); + } + Command::MeshPeerCount { topic, sender } => { + let peers: Vec = self + .swarm + .behaviour_mut() + .gossipsub + .mesh_peers(&topic) + .map(|peer| peer.clone()) + .collect(); + let count = peers.len(); + + let _ = sender.send(MeshPeerInfo { count, peers }); + } + Command::Publish { + topic, + data, + sender, + } => { + let id = match self.swarm.behaviour_mut().gossipsub.publish(topic, data) { + Ok(id) => id, + Err(err) => { + let _ = sender.send(Err(eyre::eyre!(err))); + return; + } + }; + + let _ = sender.send(Ok(id)); + } + } + } +} + +#[derive(Debug)] +pub(crate) enum Command { + ListenOn { + addr: Multiaddr, + sender: oneshot::Sender>, + }, + Dial { + peer_addr: Multiaddr, + sender: oneshot::Sender>>, + }, + Subscribe { + topic: gossipsub::IdentTopic, + sender: oneshot::Sender>, + }, + Unsubscribe { + topic: gossipsub::IdentTopic, + sender: oneshot::Sender>, + }, + PeerInfo { + sender: oneshot::Sender, + }, + MeshPeerCount { + topic: gossipsub::TopicHash, + sender: oneshot::Sender, + }, + Publish { + topic: gossipsub::TopicHash, + data: Vec, + sender: oneshot::Sender>, + }, +} + +#[allow(dead_code)] // Info structs for pretty printing +#[derive(Debug)] +pub(crate) struct PeerInfo { + count: usize, + peers: Vec, +} + +#[allow(dead_code)] // Info structs for pretty printing +#[derive(Debug)] +pub(crate) struct MeshPeerInfo { + count: usize, + peers: Vec, +} + +#[derive(Debug)] +pub(crate) struct MultiaddrMeta { + peer_id: PeerId, + relay_peer_ids: Vec, +} + +impl TryFrom<&Multiaddr> for MultiaddrMeta { + type Error = &'static str; + + fn try_from(value: &Multiaddr) -> Result { + let mut peer_ids = Vec::new(); + + let mut iter = value.iter(); + while let Some(protocol) = iter.next() { + match protocol { + multiaddr::Protocol::P2pCircuit => { + if peer_ids.is_empty() { + return Err("expected at least one p2p proto before P2pCircuit"); + } + let Some(multiaddr::Protocol::P2p(id)) = iter.next() else { + return Err("expected p2p proto after P2pCircuit"); + }; + peer_ids.push(id); + } + multiaddr::Protocol::P2p(id) => { + peer_ids.push(id); + } + _ => {} + } + } + + if let Some(peer_id) = peer_ids.pop() { + Ok(Self { + peer_id, + relay_peer_ids: peer_ids, + }) + } else { + Err("expected at least one p2p proto") + } + } +} + +impl MultiaddrMeta { + fn peer_id(&self) -> &PeerId { + &self.peer_id + } + + fn is_relayed(&self) -> bool { + !self.relay_peer_ids.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_multiaddr() { + let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp/p2p-circuit/p2p/12D3KooWMpeKAbMK4BTPsQY3rG7XwtdstseHGcq7kffY8LToYYKK"; + let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); + + let meta = MultiaddrMeta::try_from(&multiaddr).expect("valid MultiaddrMeta"); + let expected_peer_id: PeerId = "12D3KooWMpeKAbMK4BTPsQY3rG7XwtdstseHGcq7kffY8LToYYKK" + .parse() + .expect("valid peer id"); + let relay_peer_ids: Vec = vec![ + "12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF" + .parse() + .expect("valid peer id"), + "12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp" + .parse() + .expect("valid peer id"), + ]; + + assert_eq!(meta.peer_id, expected_peer_id); + assert_eq!(meta.relay_peer_ids, relay_peer_ids); + assert!(meta.is_relayed()); + } + + #[test] + fn test_no_p2p_proto() { + let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1"; + let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); + + let result = MultiaddrMeta::try_from(&multiaddr); + assert!(result.is_err()); + assert_eq!(result.err(), Some("expected at least one p2p proto")); + } + + #[test] + fn test_p2p_circuit_without_previous_p2p() { + let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p-circuit"; + let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); + + let result = MultiaddrMeta::try_from(&multiaddr); + assert!(result.is_err()); + assert_eq!( + result.err(), + Some("expected at least one p2p proto before P2pCircuit") + ); + } + + #[test] + fn test_single_p2p_no_circuit() { + let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF"; + let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); + + let meta = MultiaddrMeta::try_from(&multiaddr).expect("valid MultiaddrMeta"); + let expected_peer_id: PeerId = "12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF" + .parse() + .expect("valid peer id"); + + assert_eq!(meta.peer_id, expected_peer_id); + assert!(meta.relay_peer_ids.is_empty()); + assert!(!meta.is_relayed()); + } + + #[test] + fn test_p2p_circuit_with_single_p2p() { + let addr_str = "/ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp"; + let multiaddr: Multiaddr = addr_str.parse().expect("valid multiaddr"); + + let meta = MultiaddrMeta::try_from(&multiaddr).expect("valid MultiaddrMeta"); + let expected_peer_id: PeerId = "12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp" + .parse() + .expect("valid peer id"); + let relay_peer_ids: Vec = + vec!["12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF" + .parse() + .expect("valid peer id")]; + + assert_eq!(meta.peer_id, expected_peer_id); + assert_eq!(meta.relay_peer_ids, relay_peer_ids); + assert!(meta.is_relayed()); + } +} diff --git a/examples/chat/src/network/client.rs b/examples/chat/src/network/client.rs new file mode 100644 index 0000000..cfbf9b4 --- /dev/null +++ b/examples/chat/src/network/client.rs @@ -0,0 +1,102 @@ +use libp2p::{gossipsub, Multiaddr}; +use tokio::sync::{mpsc, oneshot}; + +use super::Command; + +#[derive(Clone)] +pub struct NetworkClient { + pub(crate) sender: mpsc::Sender, +} + +impl NetworkClient { + pub async fn listen_on(&self, addr: Multiaddr) -> eyre::Result<()> { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::ListenOn { addr, sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn subscribe( + &self, + topic: gossipsub::IdentTopic, + ) -> eyre::Result { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::Subscribe { topic, sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn unsubscribe( + &self, + topic: gossipsub::IdentTopic, + ) -> eyre::Result { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::Unsubscribe { topic, sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn peer_info(&self) -> super::PeerInfo { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::PeerInfo { sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeerInfo { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::MeshPeerCount { topic, sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn publish( + &self, + topic: gossipsub::TopicHash, + data: Vec, + ) -> eyre::Result { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::Publish { + topic, + data, + sender, + }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + + pub async fn dial(&self, peer_addr: Multiaddr) -> eyre::Result> { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::Dial { peer_addr, sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } +} diff --git a/examples/chat/src/network/events.rs b/examples/chat/src/network/events.rs new file mode 100644 index 0000000..89cc8e8 --- /dev/null +++ b/examples/chat/src/network/events.rs @@ -0,0 +1,115 @@ +use tracing::error; + +use super::*; + +mod dcutr; +mod gossipsub; +mod identify; +mod ping; +mod relay_client; + +pub trait EventHandler { + async fn handle(&mut self, event: E); +} + +impl EventLoop { + pub(super) async fn handle_swarm_event(&mut self, event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(event) => match event { + BehaviourEvent::Identify(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Gossipsub(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::RelayClient(event) => { + events::EventHandler::handle(self, event).await + } + BehaviourEvent::Ping(event) => events::EventHandler::handle(self, event).await, + BehaviourEvent::Dcutr(event) => events::EventHandler::handle(self, event).await, + }, + SwarmEvent::NewListenAddr { + listener_id, + address, + } => { + let local_peer_id = *self.swarm.local_peer_id(); + if let Err(err) = self + .event_sender + .send(types::NetworkEvent::ListeningOn { + listener_id, + address: address.with(multiaddr::Protocol::P2p(local_peer_id)), + }) + .await + { + error!("Failed to send listening on event: {:?}", err); + } + } + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + debug!(%peer_id, ?endpoint, "Connection established"); + match endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => { + let addr_meta = match MultiaddrMeta::try_from(&address) { + Ok(meta) => meta, + Err(e) => { + error!(%e, "Failed to parse dialer address meta for established connection"); + return; + } + }; + + if addr_meta.is_relayed() { + debug!("Connection established via relay"); + } + + if let Some(sender) = self.pending_dial.remove(&peer_id) { + let _ = sender.send(Ok(Some(()))); + } + } + _ => {} + } + } + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + endpoint, + num_established, + cause, + } => { + debug!( + "Connection closed: {} {:?} {:?} {} {:?}", + peer_id, connection_id, endpoint, num_established, cause + ); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + debug!(%error, ?peer_id, "Outgoing connection error"); + if let Some(peer_id) = peer_id { + if let Some(sender) = self.pending_dial.remove(&peer_id) { + let _ = sender.send(Err(eyre::eyre!(error))); + } + } + } + SwarmEvent::IncomingConnectionError { error, .. } => { + debug!(%error, "Incoming connection error") + } + SwarmEvent::Dialing { + peer_id: Some(peer_id), + .. + } => trace!("Dialing peer: {}", peer_id), + SwarmEvent::ExpiredListenAddr { address, .. } => { + trace!("Expired listen address: {}", address) + } + SwarmEvent::ListenerClosed { + addresses, reason, .. + } => trace!("Listener closed: {:?} {:?}", addresses, reason.err()), + SwarmEvent::ListenerError { error, .. } => trace!(%error, "Listener error"), + SwarmEvent::NewExternalAddrCandidate { address } => { + trace!("New external address candidate: {}", address) + } + SwarmEvent::ExternalAddrConfirmed { address } => { + trace!("External address confirmed: {}", address) + } + SwarmEvent::ExternalAddrExpired { address } => { + trace!("External address expired: {}", address) + } + unhandled => warn!("Unhandled event: {:?}", unhandled), + } + } +} diff --git a/examples/chat/src/network/events/dcutr.rs b/examples/chat/src/network/events/dcutr.rs new file mode 100644 index 0000000..7c4840f --- /dev/null +++ b/examples/chat/src/network/events/dcutr.rs @@ -0,0 +1,11 @@ +use libp2p::dcutr; +use owo_colors::OwoColorize; +use tracing::debug; + +use super::{EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: dcutr::Event) { + debug!("{}: {:?}", "dcutr".yellow(), event); + } +} diff --git a/examples/chat/src/network/events/gossipsub.rs b/examples/chat/src/network/events/gossipsub.rs new file mode 100644 index 0000000..b235a7f --- /dev/null +++ b/examples/chat/src/network/events/gossipsub.rs @@ -0,0 +1,37 @@ +use libp2p::gossipsub; +use owo_colors::OwoColorize; +use tracing::{debug, error}; + +use super::{types, EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: gossipsub::Event) { + debug!("{}: {:?}", "gossipsub".yellow(), event); + + match event { + gossipsub::Event::Message { + message_id: id, + message, + .. + } => { + if let Err(err) = self + .event_sender + .send(types::NetworkEvent::Message { id, message }) + .await + { + error!("Failed to send message event: {:?}", err); + } + } + gossipsub::Event::Subscribed { peer_id, topic } => { + if let Err(_) = self + .event_sender + .send(types::NetworkEvent::Subscribed { peer_id, topic }) + .await + { + error!("Failed to send subscribed event"); + } + } + _ => {} + } + } +} diff --git a/examples/chat/src/network/events/identify.rs b/examples/chat/src/network/events/identify.rs new file mode 100644 index 0000000..b51ebbd --- /dev/null +++ b/examples/chat/src/network/events/identify.rs @@ -0,0 +1,39 @@ +use libp2p::identify; +use owo_colors::OwoColorize; +use tracing::{debug, error}; + +use super::{types, EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: identify::Event) { + debug!("{}: {:?}", "identify".yellow(), event); + + match event { + identify::Event::Received { + peer_id, + info: identify::Info { observed_addr, .. }, + } => { + if let Err(err) = self + .event_sender + .send(types::NetworkEvent::IdentifyReceived { + peer_id, + observed_addr, + }) + .await + { + error!("Failed to send message event: {:?}", err); + } + } + identify::Event::Sent { peer_id } => { + if let Err(err) = self + .event_sender + .send(types::NetworkEvent::IdentifySent { peer_id }) + .await + { + error!("Failed to send message event: {:?}", err); + } + } + _ => {} + } + } +} diff --git a/examples/chat/src/network/events/ping.rs b/examples/chat/src/network/events/ping.rs new file mode 100644 index 0000000..3c0efab --- /dev/null +++ b/examples/chat/src/network/events/ping.rs @@ -0,0 +1,11 @@ +use libp2p::ping; +use owo_colors::OwoColorize; +use tracing::debug; + +use super::{EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: ping::Event) { + debug!("{}: {:?}", "ping".yellow(), event); + } +} diff --git a/examples/chat/src/network/events/relay_client.rs b/examples/chat/src/network/events/relay_client.rs new file mode 100644 index 0000000..ed35f3a --- /dev/null +++ b/examples/chat/src/network/events/relay_client.rs @@ -0,0 +1,24 @@ +use libp2p::relay; +use owo_colors::OwoColorize; +use tracing::{debug, error}; + +use super::{types, EventHandler, EventLoop}; + +impl EventHandler for EventLoop { + async fn handle(&mut self, event: relay::client::Event) { + debug!("{}: {:?}", "relay_client".yellow(), event); + + match event { + relay::client::Event::ReservationReqAccepted { .. } => { + if let Err(err) = self + .event_sender + .send(types::NetworkEvent::RelayReservationAccepted) + .await + { + error!("Failed to send message event: {:?}", err); + } + } + _ => {} + } + } +} diff --git a/examples/chat/src/network/types.rs b/examples/chat/src/network/types.rs new file mode 100644 index 0000000..1f5f099 --- /dev/null +++ b/examples/chat/src/network/types.rs @@ -0,0 +1,27 @@ +use libp2p::core::transport; +pub use libp2p::gossipsub::{Message, MessageId, TopicHash}; +pub use libp2p::identity::PeerId; + +#[derive(Debug)] +pub enum NetworkEvent { + ListeningOn { + listener_id: transport::ListenerId, + address: libp2p::Multiaddr, + }, + Subscribed { + peer_id: PeerId, + topic: TopicHash, + }, + Message { + id: MessageId, + message: Message, + }, + IdentifySent { + peer_id: PeerId, + }, + IdentifyReceived { + peer_id: PeerId, + observed_addr: libp2p::Multiaddr, + }, + RelayReservationAccepted, +}