diff --git a/Cargo.lock b/Cargo.lock index bc379f5..2c1baab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2366,7 +2366,7 @@ checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "libp2p" version = "0.54.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "bytes 1.7.1", "either", @@ -2402,7 +2402,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "libp2p-core", "libp2p-identity", @@ -2413,7 +2413,7 @@ dependencies = [ [[package]] name = "libp2p-autonat" version = "0.13.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "async-trait", "asynchronous-codec", @@ -2439,7 +2439,7 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "libp2p-core", "libp2p-identity", @@ -2450,7 +2450,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "fnv", @@ -2477,7 +2477,7 @@ dependencies = [ [[package]] name = "libp2p-dcutr" version = "0.12.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "either", @@ -2499,7 +2499,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "async-trait", "futures", @@ -2514,7 +2514,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.47.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "base64 0.22.1", @@ -2545,7 +2545,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "either", @@ -2587,7 +2587,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.46.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "arrayvec", "asynchronous-codec", @@ -2615,7 +2615,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.46.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "data-encoding", "futures", @@ -2635,7 +2635,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" version = "0.14.2" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "libp2p-core", @@ -2655,7 +2655,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -2680,7 +2680,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "futures", @@ -2697,7 +2697,7 @@ dependencies = [ [[package]] name = "libp2p-quic" version = "0.11.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "bytes 1.7.1", "futures", @@ -2720,7 +2720,7 @@ dependencies = [ [[package]] name = "libp2p-relay" version = "0.18.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -2744,7 +2744,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" version = "0.27.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "async-trait", "futures", @@ -2763,7 +2763,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.45.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "fnv", @@ -2786,7 +2786,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.35.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2797,7 +2797,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "futures-timer", @@ -2813,7 +2813,7 @@ dependencies = [ [[package]] name = "libp2p-tls" version = "0.5.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "futures-rustls", @@ -2831,7 +2831,7 @@ dependencies = [ [[package]] name = "libp2p-upnp" version = "0.3.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "futures-timer", @@ -2846,7 +2846,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.46.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "futures", @@ -3223,7 +3223,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.13.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "bytes 1.7.1", "futures", @@ -3944,7 +3944,7 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" version = "0.3.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -4520,7 +4520,7 @@ checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" [[package]] name = "rw-stream-sink" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=be2ed55#be2ed5544a5d5fbbed4b2f57600398cdebe48ca6" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "pin-project", diff --git a/Cargo.toml b/Cargo.toml index 9302fd2..dc64d5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ fastbloom-rs = "0.5.9" ollama-workflows = { git = "https://github.com/andthattoo/ollama-workflows", rev = "d6b2e1e" } # peer-to-peer -libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "be2ed55", features = [ +libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "3c55e95", features = [ # libp2p = { version = "0.54.1", features = [ "dcutr", "ping", diff --git a/src/node.rs b/src/node.rs index 910337d..20d7f66 100644 --- a/src/node.rs +++ b/src/node.rs @@ -126,89 +126,86 @@ impl DriaComputeNode { self.available_nodes_last_refreshed = tokio::time::Instant::now(); } + let (peer_id, message_id, message) = event; + let topic = message.topic.clone(); + let topic_str = topic.as_str(); + + // handle message w.r.t topic + if std::matches!(topic_str, PINGPONG_LISTEN_TOPIC | WORKFLOW_LISTEN_TOPIC) { + // ensure that the message is from a valid source (origin) + let source_peer_id = match message.source { + Some(peer) => peer, + None => { + log::warn!("Received {} message from {} without source.", topic_str, peer_id); + self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?; + continue; + } + }; + + log::info!( + "Received {} message ({})\nFrom: {}\nSource: {}", + topic_str, + message_id, + peer_id, + source_peer_id + ); + + // ensure that message is from the static RPCs + if !self.available_nodes.rpc_nodes.contains(&source_peer_id) { + log::warn!("Received message from unauthorized source: {}", source_peer_id); + log::debug!("Allowed sources: {:#?}", self.available_nodes.rpc_nodes); + self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?; + continue; + } - if let Some((peer_id, message_id, message)) = event { - let topic = message.topic.clone(); - let topic_str = topic.as_str(); - - // handle message w.r.t topic - if std::matches!(topic_str, PINGPONG_LISTEN_TOPIC | WORKFLOW_LISTEN_TOPIC) { - // ensure that the message is from a valid source (origin) - let source_peer_id = match message.source { - Some(peer) => peer, - None => { - log::warn!("Received {} message from {} without source.", topic_str, peer_id); - self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?; - continue; - } - }; - - log::info!( - "Received {} message ({})\nFrom: {}\nSource: {}", - topic_str, - message_id, - peer_id, - source_peer_id - ); - - // ensure that message is from the static RPCs - if !self.available_nodes.rpc_nodes.contains(&source_peer_id) { - log::warn!("Received message from unauthorized source: {}", source_peer_id); - log::debug!("Allowed sources: {:#?}", self.available_nodes.rpc_nodes); + // first, parse the raw gossipsub message to a prepared message + // if unparseable, + let message = match self.parse_message_to_prepared_message(message.clone()) { + Ok(message) => message, + Err(e) => { + log::error!("Error parsing message: {}", e); + log::debug!("Message: {}", String::from_utf8_lossy(&message.data)); self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?; continue; } + }; - // first, parse the raw gossipsub message to a prepared message - // if unparseable, - let message = match self.parse_message_to_prepared_message(message.clone()) { - Ok(message) => message, - Err(e) => { - log::error!("Error parsing message: {}", e); - log::debug!("Message: {}", String::from_utf8_lossy(&message.data)); - self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Ignore)?; - continue; - } - }; - - // then handle the prepared message - let handle_result = match topic_str { - WORKFLOW_LISTEN_TOPIC => { - WorkflowHandler::handle_compute(self, message, WORKFLOW_RESPONSE_TOPIC).await - } - PINGPONG_LISTEN_TOPIC => { - PingpongHandler::handle_compute(self, message, PINGPONG_RESPONSE_TOPIC).await - } - // TODO: can we do this in a nicer way? - // TODO: yes, cast to enum above and let type-casting do the work - _ => unreachable!() // unreachable because of the if condition - }; - - // validate the message based on the result - match handle_result { - Ok(acceptance) => { - - self.p2p.validate_message(&message_id, &peer_id, acceptance)?; - }, - Err(err) => { - log::error!("Error handling {} message: {}", topic_str, err); - self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Reject)?; - } + // then handle the prepared message + let handle_result = match topic_str { + WORKFLOW_LISTEN_TOPIC => { + WorkflowHandler::handle_compute(self, message, WORKFLOW_RESPONSE_TOPIC).await + } + PINGPONG_LISTEN_TOPIC => { + PingpongHandler::handle_compute(self, message, PINGPONG_RESPONSE_TOPIC).await + } + // TODO: can we do this in a nicer way? + // TODO: yes, cast to enum above and let type-casting do the work + _ => unreachable!() // unreachable because of the if condition + }; + + // validate the message based on the result + match handle_result { + Ok(acceptance) => { + + self.p2p.validate_message(&message_id, &peer_id, acceptance)?; + }, + Err(err) => { + log::error!("Error handling {} message: {}", topic_str, err); + self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Reject)?; } - } else if std::matches!(topic_str, PINGPONG_RESPONSE_TOPIC | WORKFLOW_RESPONSE_TOPIC) { - // since we are responding to these topics, we might receive messages from other compute nodes - // we can gracefully ignore them - log::debug!("Ignoring message for topic: {}", topic_str); - - // accept this message for propagation - self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Accept)?; - } else { - log::warn!("Received message from unexpected topic: {}", topic_str); - - // reject this message as its from a foreign topic - self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Reject)?; } - + } else if std::matches!(topic_str, PINGPONG_RESPONSE_TOPIC | WORKFLOW_RESPONSE_TOPIC) { + // since we are responding to these topics, we might receive messages from other compute nodes + // we can gracefully ignore them + log::debug!("Ignoring message for topic: {}", topic_str); + + // accept this message for propagation + self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Accept)?; + } else { + log::warn!("Received message from unexpected topic: {}", topic_str); + + // reject this message as its from a foreign topic + self.p2p.validate_message(&message_id, &peer_id, gossipsub::MessageAcceptance::Reject)?; } }, _ = self.cancellation.cancelled() => break, diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 92fb150..e5f04c8 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -60,6 +60,9 @@ fn create_identify_behavior(local_public_key: PublicKey) -> identify::Behaviour Behaviour::new(cfg) } +/// Configures the Dcutr behavior to allow nodes to connect via hole-punching. +/// It uses a Relay for the hole-punching process, and if it succeeds the peers are +/// connected directly without the need for the relay; otherwise, they keep using the relay. #[inline] fn create_dcutr_behavior(local_peer_id: PeerId) -> dcutr::Behaviour { use dcutr::Behaviour; @@ -95,6 +98,12 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour { /// and check their fields based on whether they exist or not. const VALIDATION_MODE: ValidationMode = ValidationMode::Permissive; + /// Heartbeat interval in seconds + const HEARTBEAT_INTERVAL_SECS: u64 = 10; + + /// Duplicate cache time in seconds + const DUPLICATE_CACHE_TIME_SECS: u64 = 120; + /// Gossip cache TTL in seconds const GOSSIP_TTL_SECS: u64 = 100; @@ -108,6 +117,10 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour { /// because we don't need historic messages at all const MAX_IHAVE_LENGTH: usize = 100; + /// Max size of the send queue + /// This helps to avoid memory exhaustion during high load + const MAX_SEND_QUEUE_SIZE: usize = 400; + // message id's are simply hashes of the message data let message_id_fn = |message: &Message| { let mut hasher = hash_map::DefaultHasher::new(); @@ -120,15 +133,17 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour { Behaviour::new( MessageAuthenticity::Author(author), ConfigBuilder::default() - .heartbeat_interval(Duration::from_secs(10)) - .max_transmit_size(MAX_TRANSMIT_SIZE) // 256 KB + .heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) + .max_transmit_size(MAX_TRANSMIT_SIZE) .message_id_fn(message_id_fn) + .message_capacity(MESSAGE_CAPACITY) .message_ttl(Duration::from_secs(MESSAGE_TTL_SECS)) .gossip_ttl(Duration::from_secs(GOSSIP_TTL_SECS)) - .message_capacity(MESSAGE_CAPACITY) + .duplicate_cache_time(Duration::from_secs(DUPLICATE_CACHE_TIME_SECS)) + .max_ihave_length(MAX_IHAVE_LENGTH) + .send_queue_size(MAX_SEND_QUEUE_SIZE) .validation_mode(VALIDATION_MODE) .validate_messages() - .max_ihave_length(MAX_IHAVE_LENGTH) .build() .expect("Valid config"), // TODO: better error handling ) diff --git a/src/p2p/client.rs b/src/p2p/client.rs index 17662ae..c798cf1 100644 --- a/src/p2p/client.rs +++ b/src/p2p/client.rs @@ -204,7 +204,7 @@ impl P2PClient { /// /// This method should be called in a loop to keep the client running. /// When a GossipSub message is received, it will be returned. - pub async fn process_events(&mut self) -> Option<(PeerId, MessageId, Message)> { + pub async fn process_events(&mut self) -> (PeerId, MessageId, Message) { loop { // refresh peers self.refresh_peer_counts().await; @@ -227,7 +227,7 @@ impl P2PClient { message, }, )) => { - return Some((peer_id, message_id, message)); + return (peer_id, message_id, message); } SwarmEvent::Behaviour(DriaBehaviourEvent::Autonat( autonat::Event::StatusChanged { old, new }, @@ -326,7 +326,7 @@ impl P2PClient { /// Should be called in a loop. /// /// Returns: (All Peer Count, Mesh Peer Count) - async fn refresh_peer_counts(&mut self) { + pub async fn refresh_peer_counts(&mut self) { if self.peer_last_refreshed.elapsed() > Duration::from_secs(PEER_REFRESH_INTERVAL_SECS) { let random_peer = PeerId::random(); self.swarm diff --git a/src/p2p/message.rs b/src/p2p/message.rs index 928bef6..47f3f42 100644 --- a/src/p2p/message.rs +++ b/src/p2p/message.rs @@ -15,6 +15,12 @@ use serde::{Deserialize, Serialize}; /// A parsed message from gossipsub. When first received, the message data is simply a vector of bytes. /// We treat that bytearray as a stringified JSON object, and parse it into this struct. +/// +/// TODO: these are all available at protocol level as well +/// - payload is the data itself +/// - topic is available as TopicHash of Gossipsub +/// - version is given within the Identify protocol +/// - timestamp is available at protocol level via DataTransform #[derive(Serialize, Deserialize, Debug, Clone)] pub struct P2PMessage { pub(crate) payload: String,