From dc41f154b6d482cadf1011ef4289cec9ab097ca1 Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 27 Dec 2024 21:36:26 +0300 Subject: [PATCH 01/13] setup req-res --- Cargo.lock | 5 ++++ p2p/Cargo.toml | 4 ++++ p2p/src/behaviour.rs | 24 ++++++++++++++++++- p2p/src/client.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++ p2p/src/lib.rs | 2 ++ p2p/src/protocol.rs | 13 ++++++++++ p2p/src/reqres.rs | 14 +++++++++++ 7 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 p2p/src/reqres.rs diff --git a/Cargo.lock b/Cargo.lock index 69fb50a..36d4fdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1035,6 +1035,8 @@ dependencies = [ "libp2p", "libp2p-identity", "log", + "serde", + "serde_json", "tokio 1.42.0", "tokio-util 0.7.13", ] @@ -2468,6 +2470,7 @@ dependencies = [ "libp2p-ping", "libp2p-quic", "libp2p-relay", + "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -2833,6 +2836,8 @@ dependencies = [ "libp2p-identity", "libp2p-swarm", "rand 0.8.5", + "serde", + "serde_json", "smallvec", "tracing", "void", diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index caae8d5..587f7bb 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -22,6 +22,8 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9 "mdns", "noise", "macros", + "request-response", + "json", "tcp", "yamux", "quic", @@ -30,6 +32,8 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9 libp2p-identity = { version = "0.2.9", features = ["secp256k1"] } log.workspace = true eyre.workspace = true +serde.workspace = true +serde_json.workspace = true tokio-util.workspace = true tokio.workspace = true diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index e9694b7..6f135d0 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -6,7 +6,11 @@ use eyre::{eyre, Context, Result}; use libp2p::identity::{Keypair, PeerId, PublicKey}; use libp2p::kad::store::MemoryStore; use libp2p::StreamProtocol; -use libp2p::{autonat, connection_limits, dcutr, gossipsub, identify, kad, relay}; +use libp2p::{ + autonat, connection_limits, dcutr, gossipsub, identify, kad, relay, request_response, +}; + +use crate::reqres; #[derive(libp2p::swarm::NetworkBehaviour)] pub struct DriaBehaviour { @@ -17,6 +21,8 @@ pub struct DriaBehaviour { pub autonat: autonat::Behaviour, pub dcutr: dcutr::Behaviour, pub connection_limits: connection_limits::Behaviour, + pub request_response: + request_response::json::Behaviour, } impl DriaBehaviour { @@ -25,6 +31,7 @@ impl DriaBehaviour { relay_behaviour: relay::client::Behaviour, identity_protocol: String, kademlia_protocol: StreamProtocol, + reqres_protocol: StreamProtocol, ) -> Result { let public_key = key.public(); let peer_id = public_key.to_peer_id(); @@ -38,10 +45,25 @@ impl DriaBehaviour { dcutr: create_dcutr_behaviour(peer_id), identify: create_identify_behaviour(public_key, identity_protocol), connection_limits: create_connection_limits_behaviour(), + request_response: create_request_response_behaviour(reqres_protocol), }) } } +/// Configures the request-response behaviour for the node. +#[inline] +fn create_request_response_behaviour( + protocol_name: StreamProtocol, +) -> request_response::json::Behaviour { + // TODO: use json instead here? + use request_response::{Behaviour, ProtocolSupport}; + + Behaviour::new( + [(protocol_name, ProtocolSupport::Full)], + request_response::Config::default(), + ) +} + /// Configures the connection limits. #[inline] fn create_connection_limits_behaviour() -> connection_limits::Behaviour { diff --git a/p2p/src/client.rs b/p2p/src/client.rs index 492a842..b99afae 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -2,6 +2,7 @@ use eyre::Result; use libp2p::futures::StreamExt; use libp2p::gossipsub::{Message, MessageId}; use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult}; +use libp2p::request_response; use libp2p::swarm::SwarmEvent; use libp2p::{autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, tcp, yamux}; use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder}; @@ -72,6 +73,7 @@ impl DriaP2PClient { relay_behaviour, protocol.identity(), protocol.kademlia(), + protocol.request_response(), ) .map_err(Into::into) })? @@ -303,6 +305,60 @@ impl DriaP2PClient { log::warn!("AutoNAT status changed from {:?} to {:?}", old, new); } + // request-response events + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::Message { message, .. }, + )) => match message { + request_response::Message::Request { + request, channel, .. + } => { + // TODO: handle request + } + request_response::Message::Response { + request_id, + response, + } => { + // TODO: handle response + } + }, + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::ResponseSent { peer, request_id }, + )) => { + log::debug!( + "Request-Response: Response sent to peer {} with request_id {}", + peer, + request_id + ) + } + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::OutboundFailure { + peer, + request_id, + error, + }, + )) => { + log::error!( + "Request-Response: Outbound failure to peer {} with request_id {}: {:?}", + peer, + request_id, + error + ); + } + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::InboundFailure { + peer, + request_id, + error, + }, + )) => { + log::error!( + "Request-Response: Inbound failure to peer {} with request_id {}: {:?}", + peer, + request_id, + error + ); + } + // log listen addreses SwarmEvent::NewListenAddr { address, .. } => { log::warn!("Local node is listening on {}", address); diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 74b8913..829aea8 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -20,3 +20,5 @@ pub use nodes::DriaNodes; // re-exports pub use libp2p; pub use libp2p_identity; + +mod reqres; diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 6315dd8..e50163f 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -18,6 +18,12 @@ pub struct DriaP2PProtocol { /// which is mandatory for a `StreamProtocol`. /// pub kademlia: StreamProtocol, + /// Request-response protocol, must match with other peers in the network. + /// + /// This is usually `/{name}/rr/{version}`, notice the `/` at the start + /// which is mandatory for a `StreamProtocol`. + /// + pub request_response: StreamProtocol, } impl std::fmt::Display for DriaP2PProtocol { @@ -38,12 +44,14 @@ impl DriaP2PProtocol { pub fn new(name: &str, version: &str) -> Self { let identity = format!("{}/{}", name, version); let kademlia = format!("/{}/kad/{}", name, version); + let request_response = format!("/{}/rr/{}", name, version); Self { name: name.to_string(), version: version.to_string(), identity, kademlia: StreamProtocol::try_from_owned(kademlia).unwrap(), // guaranteed to unwrap + request_response: StreamProtocol::try_from_owned(request_response).unwrap(), // guaranteed to unwrap } } @@ -69,6 +77,11 @@ impl DriaP2PProtocol { self.kademlia.clone() } + /// Returns the request-response protocol, e.g. `/dria/rr/0.2`. + pub fn request_response(&self) -> StreamProtocol { + self.request_response.clone() + } + /// Returns `true` if the given protocol has a matching prefix with out Kademlia protocol. /// Otherwise, returns `false`. pub fn is_common_kademlia(&self, protocol: &StreamProtocol) -> bool { diff --git a/p2p/src/reqres.rs b/p2p/src/reqres.rs new file mode 100644 index 0000000..f52fecd --- /dev/null +++ b/p2p/src/reqres.rs @@ -0,0 +1,14 @@ +use serde::{Deserialize, Serialize}; + +/// Request-Response protocol, request type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ReqresRequest { + pub request_id: String, +} + +/// Request-Response protocol, response type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ReqresResponse { + pub spec: String, + pub location: String, +} From 457777361bc6716aedf8d420f6c1217fb8c0e35a Mon Sep 17 00:00:00 2001 From: erhant Date: Mon, 30 Dec 2024 14:43:55 +0300 Subject: [PATCH 02/13] added request channel & handler, todo specs --- Cargo.lock | 11 ++++- compute/src/lib.rs | 6 +++ compute/src/node.rs | 28 +++++++++-- compute/src/responders/mod.rs | 13 +++++ compute/src/responders/specs.rs | 29 ++++++++++++ monitor/src/main.rs | 2 +- p2p/Cargo.toml | 2 +- p2p/src/behaviour.rs | 9 ++-- p2p/src/client.rs | 84 +++++++++++++++++++++------------ p2p/src/commands.rs | 8 +++- p2p/src/lib.rs | 2 - p2p/src/reqres.rs | 14 ------ p2p/tests/listen_test.rs | 2 +- 13 files changed, 150 insertions(+), 60 deletions(-) create mode 100644 compute/src/responders/mod.rs create mode 100644 compute/src/responders/specs.rs delete mode 100644 p2p/src/reqres.rs diff --git a/Cargo.lock b/Cargo.lock index 36d4fdd..fd977c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,6 +464,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b02b629252fe8ef6460461409564e2c21d0c8e77e0944f3d189ff06c4e932ad" +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.4" @@ -2829,6 +2838,7 @@ version = "0.27.0" source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=7ce9f9e#7ce9f9e65ddbe1fdac3913f0f3c1d94edc1de25e" dependencies = [ "async-trait", + "cbor4ii", "futures", "futures-bounded", "futures-timer", @@ -2837,7 +2847,6 @@ dependencies = [ "libp2p-swarm", "rand 0.8.5", "serde", - "serde_json", "smallvec", "tracing", "void", diff --git a/compute/src/lib.rs b/compute/src/lib.rs index c399688..51d5161 100644 --- a/compute/src/lib.rs +++ b/compute/src/lib.rs @@ -1,5 +1,11 @@ pub mod config; + +/// Gossipsub message handlers. pub mod handlers; + +// Request-response handlers. +pub mod responders; + pub mod node; pub mod payloads; pub mod utils; diff --git a/compute/src/node.rs b/compute/src/node.rs index 872e5ad..4252d31 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -1,6 +1,7 @@ use dkn_p2p::{ libp2p::{ gossipsub::{Message, MessageAcceptance, MessageId}, + request_response::ResponseChannel, PeerId, }, DriaNodes, DriaP2PClient, DriaP2PCommander, DriaP2PProtocol, @@ -33,6 +34,8 @@ pub struct DriaComputeNode { pub p2p: DriaP2PCommander, /// Gossipsub message receiver, used by peer-to-peer client in a separate thread. message_rx: mpsc::Receiver<(PeerId, MessageId, Message)>, + /// Request-response request receiver. + request_rx: mpsc::Receiver<(Vec, ResponseChannel>)>, /// Publish receiver to receive messages to be published, publish_rx: mpsc::Receiver, /// Workflow transmitter to send batchable tasks. @@ -78,7 +81,7 @@ impl DriaComputeNode { log::info!("Using identity: {}", protocol); // create p2p client - let (p2p_client, p2p_commander, message_rx) = DriaP2PClient::new( + let (p2p_client, p2p_commander, message_rx, request_rx) = DriaP2PClient::new( keypair, config.p2p_listen_addr.clone(), &available_nodes, @@ -111,8 +114,9 @@ impl DriaComputeNode { config, p2p: p2p_commander, dria_nodes: available_nodes, - message_rx, publish_rx, + message_rx, + request_rx, workflow_batch_tx, workflow_single_tx, pending_tasks_single: HashSet::new(), @@ -312,6 +316,12 @@ impl DriaComputeNode { } } + /// Handles a request-response request received from the network. + /// + /// Internally, the data is expected to be some JSON serialized data that is expected to be parsed and handled. + async fn handle_request(&mut self, data: Vec, channel: ResponseChannel>) { + // TODO: !!! + } /// Runs the main loop of the compute node. /// This method is not expected to return until cancellation occurs for the given token. pub async fn run(&mut self, cancellation: CancellationToken) -> Result<()> { @@ -331,8 +341,6 @@ impl DriaComputeNode { loop { tokio::select! { - // prioritize the branches in the order below - biased; // a Workflow message to be published is received from the channel // this is expected to be sent by the workflow worker @@ -375,7 +383,17 @@ impl DriaComputeNode { log::error!("Error validating message {}: {:?}", message_id, e); } } else { - log::error!("Message channel closed unexpectedly."); + log::error!("message_rx channel closed unexpectedly."); + break; + }; + }, + // a Response message is received from the channel + // this is expected to be sent by the p2p client + request_msg_opt = self.request_rx.recv() => { + if let Some((data, channel)) = request_msg_opt { + self.handle_request(data, channel).await; + } else { + log::error!("request_rx channel closed unexpectedly."); break; }; }, diff --git a/compute/src/responders/mod.rs b/compute/src/responders/mod.rs new file mode 100644 index 0000000..dd10f98 --- /dev/null +++ b/compute/src/responders/mod.rs @@ -0,0 +1,13 @@ +mod specs; +use eyre::Context; +use serde::{de::DeserializeOwned, Serialize}; +pub use specs::SpecResponder; + +pub trait IsResponder { + type Request: Serialize + DeserializeOwned; + type Response: Serialize + DeserializeOwned; + + fn try_parse_request<'a>(data: &[u8]) -> eyre::Result { + serde_json::from_slice(data).wrap_err("could not parse request") + } +} diff --git a/compute/src/responders/specs.rs b/compute/src/responders/specs.rs new file mode 100644 index 0000000..17a48f7 --- /dev/null +++ b/compute/src/responders/specs.rs @@ -0,0 +1,29 @@ +use super::IsResponder; +use eyre::Result; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct Request { + /// UUID of the specs request, prevents replay attacks. + request_id: String, +} + +#[derive(Serialize, Deserialize)] +pub struct Response { + request_id: String, + response: String, +} + +pub struct SpecResponder; + +impl IsResponder for SpecResponder { + type Request = Request; + type Response = Response; +} + +impl SpecResponder { + pub fn respond(request: Request) -> Response { + // TODO: collect specs + } +} diff --git a/monitor/src/main.rs b/monitor/src/main.rs index bf10c3d..f68e530 100644 --- a/monitor/src/main.rs +++ b/monitor/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> eyre::Result<()> { log::info!("Listen Address: {}", listen_addr); let keypair = Keypair::generate_secp256k1(); log::info!("PeerID: {}", keypair.public().to_peer_id()); - let (client, commander, msg_rx) = DriaP2PClient::new( + let (client, commander, msg_rx, _) = DriaP2PClient::new( keypair, listen_addr, &nodes, diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 587f7bb..594079e 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -23,7 +23,7 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9 "noise", "macros", "request-response", - "json", + "cbor", "tcp", "yamux", "quic", diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index 6f135d0..ea1d0ca 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -10,8 +10,6 @@ use libp2p::{ autonat, connection_limits, dcutr, gossipsub, identify, kad, relay, request_response, }; -use crate::reqres; - #[derive(libp2p::swarm::NetworkBehaviour)] pub struct DriaBehaviour { pub relay: relay::client::Behaviour, @@ -21,8 +19,7 @@ pub struct DriaBehaviour { pub autonat: autonat::Behaviour, pub dcutr: dcutr::Behaviour, pub connection_limits: connection_limits::Behaviour, - pub request_response: - request_response::json::Behaviour, + pub request_response: request_response::cbor::Behaviour, Vec>, } impl DriaBehaviour { @@ -51,10 +48,12 @@ impl DriaBehaviour { } /// Configures the request-response behaviour for the node. +/// +/// The protocol supports bytes only, #[inline] fn create_request_response_behaviour( protocol_name: StreamProtocol, -) -> request_response::json::Behaviour { +) -> request_response::cbor::Behaviour, Vec> { // TODO: use json instead here? use request_response::{Behaviour, ProtocolSupport}; diff --git a/p2p/src/client.rs b/p2p/src/client.rs index b99afae..f1f5023 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -2,7 +2,7 @@ use eyre::Result; use libp2p::futures::StreamExt; use libp2p::gossipsub::{Message, MessageId}; use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult}; -use libp2p::request_response; +use libp2p::request_response::{self, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, tcp, yamux}; use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder}; @@ -22,8 +22,10 @@ pub struct DriaP2PClient { swarm: Swarm, /// Dria protocol, used for identifying the client. protocol: DriaP2PProtocol, - /// Gossipsub message sender. + /// Gossipsub protoocol, gossip message sender. msg_tx: mpsc::Sender<(PeerId, MessageId, Message)>, + /// Request-response protocol, request sender. + req_tx: mpsc::Sender<(Vec, ResponseChannel>)>, /// Command receiver. cmd_rx: mpsc::Receiver, } @@ -53,6 +55,7 @@ impl DriaP2PClient { DriaP2PClient, DriaP2PCommander, mpsc::Receiver<(PeerId, MessageId, Message)>, + mpsc::Receiver<(Vec, ResponseChannel>)>, )> { // this is our peerId let node_peerid = keypair.public().to_peer_id(); @@ -139,14 +142,16 @@ impl DriaP2PClient { // create p2p client itself let (msg_tx, msg_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE); + let (req_tx, req_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE); let client = Self { swarm, protocol, msg_tx, + req_tx, cmd_rx, }; - Ok((client, commander, msg_rx)) + Ok((client, commander, msg_rx, req_rx)) } /// Waits for swarm events and Node commands at the same time. @@ -208,6 +213,19 @@ impl DriaP2PClient { .publish(gossipsub::IdentTopic::new(topic), data), ); } + DriaP2PCommand::Respond { + data, + channel, + sender, + } => { + let _ = sender.send( + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, data) + .map_err(|_| eyre::eyre!("could not send response, channel is closed?")), + ); + } DriaP2PCommand::ValidateMessage { msg_id, propagation_source, @@ -278,33 +296,10 @@ impl DriaP2PClient { message, })) => { if let Err(e) = self.msg_tx.send((peer_id, message_id, message)).await { - log::error!("Error sending message: {:?}", e); + log::error!("Could not send Gossipsub message: {:?}", e); } } - // kademlia events - SwarmEvent::Behaviour(DriaBehaviourEvent::Kademlia( - kad::Event::OutboundQueryProgressed { - result: QueryResult::GetClosestPeers(result), - .. - }, - )) => self.handle_closest_peers_result(result), - - // identify events - SwarmEvent::Behaviour(DriaBehaviourEvent::Identify(identify::Event::Received { - peer_id, - info, - .. - })) => self.handle_identify_event(peer_id, info), - - // autonat events - SwarmEvent::Behaviour(DriaBehaviourEvent::Autonat(autonat::Event::StatusChanged { - old, - new, - })) => { - log::warn!("AutoNAT status changed from {:?} to {:?}", old, new); - } - // request-response events SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( request_response::Event::Message { message, .. }, @@ -312,13 +307,21 @@ impl DriaP2PClient { request_response::Message::Request { request, channel, .. } => { - // TODO: handle request + if let Err(e) = self.req_tx.send((request, channel)).await { + log::error!("Could not send request-response request: {:?}", e); + } } request_response::Message::Response { request_id, response, } => { - // TODO: handle response + // while we support the protocol, we dont really make any requests + // TODO: p2p crate should support this + log::warn!( + "Unexpected response message with request_id {}: {:?}", + request_id, + response + ); } }, SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( @@ -359,6 +362,29 @@ impl DriaP2PClient { ); } + // kademlia events + SwarmEvent::Behaviour(DriaBehaviourEvent::Kademlia( + kad::Event::OutboundQueryProgressed { + result: QueryResult::GetClosestPeers(result), + .. + }, + )) => self.handle_closest_peers_result(result), + + // identify events + SwarmEvent::Behaviour(DriaBehaviourEvent::Identify(identify::Event::Received { + peer_id, + info, + .. + })) => self.handle_identify_event(peer_id, info), + + // autonat events + SwarmEvent::Behaviour(DriaBehaviourEvent::Autonat(autonat::Event::StatusChanged { + old, + new, + })) => { + log::warn!("AutoNAT status changed from {:?} to {:?}", old, new); + } + // log listen addreses SwarmEvent::NewListenAddr { address, .. } => { log::warn!("Local node is listening on {}", address); diff --git a/p2p/src/commands.rs b/p2p/src/commands.rs index d6c8869..d502647 100644 --- a/p2p/src/commands.rs +++ b/p2p/src/commands.rs @@ -1,5 +1,5 @@ use eyre::{Context, Result}; -use libp2p::{gossipsub, kad, swarm, Multiaddr, PeerId}; +use libp2p::{gossipsub, kad, request_response, swarm, Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use crate::DriaP2PProtocol; @@ -43,6 +43,12 @@ pub enum DriaP2PCommand { data: Vec, sender: oneshot::Sender>, }, + /// Respond to a request-response message. + Respond { + data: Vec, + channel: request_response::ResponseChannel>, + sender: oneshot::Sender>, + }, /// Validates a GossipSub message for propagation, returns whether the message existed in cache. /// /// - `Accept`: Accept the message and propagate it. diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 829aea8..74b8913 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -20,5 +20,3 @@ pub use nodes::DriaNodes; // re-exports pub use libp2p; pub use libp2p_identity; - -mod reqres; diff --git a/p2p/src/reqres.rs b/p2p/src/reqres.rs deleted file mode 100644 index f52fecd..0000000 --- a/p2p/src/reqres.rs +++ /dev/null @@ -1,14 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// Request-Response protocol, request type. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ReqresRequest { - pub request_id: String, -} - -/// Request-Response protocol, response type. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ReqresResponse { - pub spec: String, - pub location: String, -} diff --git a/p2p/tests/listen_test.rs b/p2p/tests/listen_test.rs index e4e70ef..400ea76 100644 --- a/p2p/tests/listen_test.rs +++ b/p2p/tests/listen_test.rs @@ -22,7 +22,7 @@ async fn test_listen_topic_once() -> Result<()> { .with_relay_nodes(["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?]); // spawn P2P client in another task - let (client, mut commander, mut msg_rx) = DriaP2PClient::new( + let (client, mut commander, mut msg_rx, _) = DriaP2PClient::new( Keypair::generate_secp256k1(), listen_addr, &nodes, From 49b9abb9f271e0bad3831bc504fda2a27d27e45d Mon Sep 17 00:00:00 2001 From: erhant Date: Mon, 30 Dec 2024 20:56:08 +0300 Subject: [PATCH 03/13] added spec collector --- Cargo.lock | 597 +++++++++++++++++++++++++++++++- compute/Cargo.toml | 9 + compute/src/node.rs | 15 +- compute/src/responders/specs.rs | 14 +- compute/src/utils/mod.rs | 3 + compute/src/utils/specs.rs | 75 ++++ workflows/src/bin/tps.rs | 1 + 7 files changed, 696 insertions(+), 18 deletions(-) create mode 100644 compute/src/utils/specs.rs diff --git a/Cargo.lock b/Cargo.lock index fd977c8..2eeef19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,18 @@ name = "arrayvec" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +dependencies = [ + "serde", +] + +[[package]] +name = "ash" +version = "0.38.0+1.3.281" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bb44936d800fea8f016d7f2311c6a4f97aebd5dc86f09906139ec848cf3a46f" +dependencies = [ + "libloading", +] [[package]] name = "asn1-rs" @@ -374,7 +386,16 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" dependencies = [ - "bit-vec", + "bit-vec 0.6.3", +] + +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + "bit-vec 0.8.0", ] [[package]] @@ -383,6 +404,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "1.3.2" @@ -404,6 +431,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "block" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d8c1fef690941d3e7788d328517591fecc684c084084702d6ff1641e993699a" + [[package]] name = "block-buffer" version = "0.9.0" @@ -437,6 +470,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytemuck" +version = "1.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" + [[package]] name = "byteorder" version = "1.5.0" @@ -494,6 +533,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -549,6 +594,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -602,6 +657,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core-graphics-types" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45390e6114f68f718cc7a830514a96f903cccd70d02a8f6d9f643ac4ba45afaf" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "libc", +] + [[package]] name = "core2" version = "0.4.0" @@ -949,6 +1015,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "directories" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -959,6 +1034,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -1001,17 +1088,20 @@ dependencies = [ "log", "openssl", "port_check", + "public-ip-address", "rand 0.8.5", "reqwest 0.12.9", "serde", "serde_json", "sha2 0.10.8", "sha3", + "sysinfo 0.33.1", "tokio 1.42.0", "tokio-util 0.7.13", "url", "urlencoding", "uuid", + "wgpu", ] [[package]] @@ -1069,11 +1159,20 @@ dependencies = [ "reqwest 0.12.9", "serde", "serde_json", - "sysinfo", + "sysinfo 0.32.1", "tokio 1.42.0", "tokio-util 0.7.13", ] +[[package]] +name = "document-features" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6969eaabd2421f8a2775cfd2471a2b634372b4a25d41e3bd647b79912850a0" +dependencies = [ + "litrs", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -1327,7 +1426,28 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" dependencies = [ - "foreign-types-shared", + "foreign-types-shared 0.1.1", +] + +[[package]] +name = "foreign-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" +dependencies = [ + "foreign-types-macros", + "foreign-types-shared 0.3.1", +] + +[[package]] +name = "foreign-types-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", ] [[package]] @@ -1336,6 +1456,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "foreign-types-shared" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1596,6 +1722,89 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "gl_generator" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a95dfc23a2b4a9a2f5ab41d194f8bfda3cabec42af4e39f08c339eb2a0c124d" +dependencies = [ + "khronos_api", + "log", + "xml-rs", +] + +[[package]] +name = "glow" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51fa363f025f5c111e03f13eda21162faeacb6911fe8caa0c0349f9cf0c4483" +dependencies = [ + "js-sys", + "slotmap", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "glutin_wgl_sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a4e1951bbd9434a81aa496fe59ccc2235af3820d27b85f9314e279609211e2c" +dependencies = [ + "gl_generator", +] + +[[package]] +name = "gpu-alloc" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbcd2dba93594b227a1f57ee09b8b9da8892c34d55aa332e034a228d0fe6a171" +dependencies = [ + "bitflags 2.6.0", + "gpu-alloc-types", +] + +[[package]] +name = "gpu-alloc-types" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98ff03b468aa837d70984d55f5d3f846f6ec31fe34bbb97c4f85219caeee1ca4" +dependencies = [ + "bitflags 2.6.0", +] + +[[package]] +name = "gpu-allocator" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c151a2a5ef800297b4e79efa4f4bec035c5f51d5ae587287c9b952bdf734cacd" +dependencies = [ + "log", + "presser", + "thiserror 1.0.69", + "windows 0.53.0", +] + +[[package]] +name = "gpu-descriptor" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf29e94d6d243368b7a56caa16bc213e4f9f8ed38c4d9557069527b5d5281ca" +dependencies = [ + "bitflags 2.6.0", + "gpu-descriptor-types", + "hashbrown 0.15.2", +] + +[[package]] +name = "gpu-descriptor-types" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdf242682df893b86f33a73828fb09ca4b2d3bb6cc95249707fc684d27484b91" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "h2" version = "0.2.7" @@ -1722,6 +1931,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" +[[package]] +name = "hexf-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfa686283ad6dd069f105e5ab091b04c62850d3e4cf5d67debad1933f55023df" + [[package]] name = "hickory-proto" version = "0.24.2" @@ -2412,6 +2627,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "js-sys" version = "0.3.76" @@ -2441,6 +2662,23 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "khronos-egl" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aae1df220ece3c0ada96b8153459b67eebe9ae9212258bb0134ae60416fdf76" +dependencies = [ + "libc", + "libloading", + "pkg-config", +] + +[[package]] +name = "khronos_api" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" + [[package]] name = "lazy_static" version = "1.5.0" @@ -2453,6 +2691,16 @@ version = "0.2.168" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if 1.0.0", + "windows-targets 0.52.6", +] + [[package]] name = "libp2p" version = "0.54.1" @@ -3026,6 +3274,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" +[[package]] +name = "litrs" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" + [[package]] name = "lock_api" version = "0.4.12" @@ -3072,6 +3326,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" +[[package]] +name = "malloc_buf" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62bb907fe88d54d8d9ce32a3cceab4218ed2f6b7d35617cafe9adf84e43919cb" +dependencies = [ + "libc", +] + [[package]] name = "markup5ever" version = "0.10.1" @@ -3124,12 +3387,38 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metal" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ecfd3296f8c56b7c1f6fbac3c71cefa9d78ce009850c45000015f206dc7fa21" +dependencies = [ + "bitflags 2.6.0", + "block", + "core-graphics-types", + "foreign-types 0.5.0", + "log", + "objc", + "paste", +] + [[package]] name = "mime" version = "0.3.17" @@ -3256,6 +3545,27 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "naga" +version = "23.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "364f94bc34f61332abebe8cad6f6cd82a5b65cff22c828d05d0968911462ca4f" +dependencies = [ + "arrayvec", + "bit-set 0.8.0", + "bitflags 2.6.0", + "cfg_aliases 0.1.1", + "codespan-reporting", + "hexf-parse", + "indexmap 2.7.0", + "log", + "rustc-hash 1.1.0", + "spirv", + "termcolor", + "thiserror 1.0.69", + "unicode-xid", +] + [[package]] name = "native-tls" version = "0.2.12" @@ -3273,6 +3583,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ndk-sys" +version = "0.5.0+25.2.9519653" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c196769dd60fd4f363e11d948139556a344e79d451aeb2fa2fd040738ef7691" +dependencies = [ + "jni-sys", +] + [[package]] name = "net2" version = "0.2.39" @@ -3441,6 +3760,15 @@ dependencies = [ "libc", ] +[[package]] +name = "objc" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915b1b472bc21c53464d6c8461c9d3af805ba1ef837e1cac254428f4a77177b1" +dependencies = [ + "malloc_buf", +] + [[package]] name = "object" version = "0.36.5" @@ -3540,7 +3868,7 @@ checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ "bitflags 2.6.0", "cfg-if 1.0.0", - "foreign-types", + "foreign-types 0.3.2", "libc", "once_cell", "openssl-macros", @@ -3586,6 +3914,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "parking" version = "2.2.1" @@ -3900,6 +4234,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "presser" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8cf8e6a8aa66ce33f63993ffc4ea4271eb5b0530a9002db8455ea6050c77bfa" + [[package]] name = "pretty_env_logger" version = "0.5.0" @@ -3939,6 +4279,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "profiling" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afbdc74edc00b6f6a218ca6a5364d6226a259d4b8ea1af4a0ea063f27e179f4d" + [[package]] name = "prometheus-client" version = "0.22.3" @@ -3962,6 +4308,21 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "public-ip-address" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "761cf3bcffbc326e841fcbaf0849759dc2e30876b89c454e0991f20ceca40f4c" +dependencies = [ + "directories", + "log", + "maybe-async", + "reqwest 0.12.9", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -4000,7 +4361,7 @@ dependencies = [ "pin-project-lite 0.2.15", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.1.0", "rustls", "socket2 0.5.8", "thiserror 2.0.8", @@ -4018,7 +4379,7 @@ dependencies = [ "getrandom 0.2.15", "rand 0.8.5", "ring 0.17.8", - "rustc-hash", + "rustc-hash 2.1.0", "rustls", "rustls-pki-types", "slab", @@ -4034,7 +4395,7 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c40286217b4ba3a71d644d752e6a0b71f13f1b6a2c5311acfcbe0c2418ed904" dependencies = [ - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2 0.5.8", @@ -4132,6 +4493,18 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "range-alloc" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8a99fddc9f0ba0a85884b8d14e3592853e787d581ca1816c91349b10e4eeab" + +[[package]] +name = "raw-window-handle" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" + [[package]] name = "rayon" version = "1.10.0" @@ -4213,6 +4586,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "renderdoc-sys" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" + [[package]] name = "reqwest" version = "0.10.10" @@ -4380,6 +4759,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.0" @@ -4581,7 +4966,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ee061f90afcc8678bef7a78d0d121683f0ba753f740ff7005f833ec445876b7" dependencies = [ - "bit-set", + "bit-set 0.5.3", "html5ever 0.25.2", "markup5ever_rcdom", ] @@ -4789,6 +5174,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slotmap" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbff4acf519f630b3a3ddcfaea6c06b42174d9a44bc70c620e9ed1649d58b82a" +dependencies = [ + "version_check", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -4845,6 +5239,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spirv" +version = "0.3.0+sdk-1.3.268.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eda41003dc44290527a59b13432d4a0379379fa074b70174882adfbdfd917844" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "spki" version = "0.7.3" @@ -4983,6 +5386,20 @@ dependencies = [ "windows 0.57.0", ] +[[package]] +name = "sysinfo" +version = "0.33.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fc858248ea01b66f19d8e8a6d55f41deaf91e9d495246fd01368d99935c6c01" +dependencies = [ + "core-foundation-sys", + "libc", + "memchr", + "ntapi", + "rayon", + "windows 0.57.0", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -5379,6 +5796,12 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "universal-hash" version = "0.5.1" @@ -5613,6 +6036,115 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "wgpu" +version = "23.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80f70000db37c469ea9d67defdc13024ddf9a5f1b89cb2941b812ad7cde1735a" +dependencies = [ + "arrayvec", + "cfg_aliases 0.1.1", + "document-features", + "js-sys", + "log", + "naga", + "parking_lot", + "profiling", + "raw-window-handle", + "serde", + "smallvec", + "static_assertions", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "wgpu-core", + "wgpu-hal", + "wgpu-types", +] + +[[package]] +name = "wgpu-core" +version = "23.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d63c3c478de8e7e01786479919c8769f62a22eec16788d8c2ac77ce2c132778a" +dependencies = [ + "arrayvec", + "bit-vec 0.8.0", + "bitflags 2.6.0", + "cfg_aliases 0.1.1", + "document-features", + "indexmap 2.7.0", + "log", + "naga", + "once_cell", + "parking_lot", + "profiling", + "raw-window-handle", + "rustc-hash 1.1.0", + "serde", + "smallvec", + "thiserror 1.0.69", + "wgpu-hal", + "wgpu-types", +] + +[[package]] +name = "wgpu-hal" +version = "23.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89364b8a0b211adc7b16aeaf1bd5ad4a919c1154b44c9ce27838213ba05fd821" +dependencies = [ + "android_system_properties", + "arrayvec", + "ash", + "bit-set 0.8.0", + "bitflags 2.6.0", + "block", + "bytemuck", + "cfg_aliases 0.1.1", + "core-graphics-types", + "glow", + "glutin_wgl_sys", + "gpu-alloc", + "gpu-allocator", + "gpu-descriptor", + "js-sys", + "khronos-egl", + "libc", + "libloading", + "log", + "metal", + "naga", + "ndk-sys", + "objc", + "once_cell", + "parking_lot", + "profiling", + "range-alloc", + "raw-window-handle", + "renderdoc-sys", + "rustc-hash 1.1.0", + "smallvec", + "thiserror 1.0.69", + "wasm-bindgen", + "web-sys", + "wgpu-types", + "windows 0.58.0", + "windows-core 0.58.0", +] + +[[package]] +name = "wgpu-types" +version = "23.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "610f6ff27778148c31093f3b03abc4840f9636d58d597ca2f5977433acfe0068" +dependencies = [ + "bitflags 2.6.0", + "js-sys", + "serde", + "web-sys", +] + [[package]] name = "widestring" version = "1.1.0" @@ -5682,6 +6214,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -5707,12 +6249,25 @@ version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.57.0", + "windows-interface 0.57.0", "windows-result 0.1.2", "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement 0.58.0", + "windows-interface 0.58.0", + "windows-result 0.2.0", + "windows-strings", + "windows-targets 0.52.6", +] + [[package]] name = "windows-implement" version = "0.57.0" @@ -5724,6 +6279,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-interface" version = "0.57.0" @@ -5735,6 +6301,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/compute/Cargo.toml b/compute/Cargo.toml index 14cbfd2..08c4e73 100644 --- a/compute/Cargo.toml +++ b/compute/Cargo.toml @@ -42,11 +42,20 @@ sha2 = "0.10.8" sha3 = "0.10.8" fastbloom-rs = "0.5.9" +# machine diagnostics +# system info +sysinfo = "0.33.1" +# gpu info +wgpu = { version = "23.0.1", features = ["serde"] } +# public ip +public-ip-address = "0.3.2" + # dria subcrates dkn-p2p = { path = "../p2p" } dkn-utils = { path = "../utils" } dkn-workflows = { path = "../workflows" } + # vendor OpenSSL so that its easier to build cross-platform packages [dependencies.openssl] version = "*" diff --git a/compute/src/node.rs b/compute/src/node.rs index 4252d31..189eb0e 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -14,7 +14,8 @@ use tokio_util::{either::Either, sync::CancellationToken}; use crate::{ config::*, handlers::*, - utils::{crypto::secret_to_keypair, refresh_dria_nodes, DriaMessage}, + responders::{IsResponder, SpecResponder}, + utils::{crypto::secret_to_keypair, refresh_dria_nodes, DriaMessage, SpecCollector}, workers::workflow::{WorkflowsWorker, WorkflowsWorkerInput, WorkflowsWorkerOutput}, DRIA_COMPUTE_NODE_VERSION, }; @@ -50,6 +51,8 @@ pub struct DriaComputeNode { completed_tasks_single: usize, /// Completed batch tasks count completed_tasks_batch: usize, + /// Spec collector for the node. + spec_collector: SpecCollector, } impl DriaComputeNode { @@ -123,6 +126,7 @@ impl DriaComputeNode { pending_tasks_batch: HashSet::new(), completed_tasks_single: 0, completed_tasks_batch: 0, + spec_collector: SpecCollector::new(), }, p2p_client, workflows_batch_worker, @@ -319,8 +323,13 @@ impl DriaComputeNode { /// Handles a request-response request received from the network. /// /// Internally, the data is expected to be some JSON serialized data that is expected to be parsed and handled. - async fn handle_request(&mut self, data: Vec, channel: ResponseChannel>) { - // TODO: !!! + async fn handle_request(&mut self, data: Vec, mut channel: ResponseChannel>) { + if let Ok(req) = SpecResponder::try_parse_request(&data) { + let response = SpecResponder::respond(req, self.spec_collector.collect().await); + // TODO: send response + } else { + log::warn!("Received unknown request: {:?}", data); + } } /// Runs the main loop of the compute node. /// This method is not expected to return until cancellation occurs for the given token. diff --git a/compute/src/responders/specs.rs b/compute/src/responders/specs.rs index 17a48f7..83a159e 100644 --- a/compute/src/responders/specs.rs +++ b/compute/src/responders/specs.rs @@ -1,6 +1,6 @@ -use super::IsResponder; -use eyre::Result; +use crate::utils::Specs; +use super::IsResponder; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] @@ -12,7 +12,8 @@ pub struct Request { #[derive(Serialize, Deserialize)] pub struct Response { request_id: String, - response: String, + #[serde(flatten)] + specs: Specs, } pub struct SpecResponder; @@ -23,7 +24,10 @@ impl IsResponder for SpecResponder { } impl SpecResponder { - pub fn respond(request: Request) -> Response { - // TODO: collect specs + pub fn respond(request: Request, specs: Specs) -> Response { + Response { + request_id: request.request_id, + specs, + } } } diff --git a/compute/src/utils/mod.rs b/compute/src/utils/mod.rs index 9485dad..c7b4bae 100644 --- a/compute/src/utils/mod.rs +++ b/compute/src/utils/mod.rs @@ -9,3 +9,6 @@ pub use misc::*; mod nodes; pub use nodes::*; + +mod specs; +pub use specs::*; diff --git a/compute/src/utils/specs.rs b/compute/src/utils/specs.rs new file mode 100644 index 0000000..3729c6e --- /dev/null +++ b/compute/src/utils/specs.rs @@ -0,0 +1,75 @@ +use serde::{Deserialize, Serialize}; +use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind}; + +/// Machine info & location. +#[derive(Debug, Serialize, Deserialize)] +pub struct Specs { + /// Total memory in bytes + total_mem: u64, + /// Free memory in bytes + free_mem: u64, + /// Number of physical CPU cores. + num_cpus: Option, + cpu_usage: f32, + os: String, + arch: String, + family: String, + gpus: Vec, + lookup: Option, +} + +pub struct SpecCollector { + /// System information object, this is expected to be created only once + /// as per the [docs](https://github.com/GuillaumeGomez/sysinfo?tab=readme-ov-file#good-practice--performance-tips). + system: sysinfo::System, + /// GPU adapter infos, showing information about the available GPUs. + gpus: Vec, +} + +impl SpecCollector { + pub fn new() -> Self { + SpecCollector { + system: sysinfo::System::new_with_specifics(Self::get_refresh_specifics()), + gpus: wgpu::Instance::default() + .enumerate_adapters(wgpu::Backends::all()) + .into_iter() + .map(|a| a.get_info()) + .collect(), + } + } + + #[inline(always)] + fn get_refresh_specifics() -> RefreshKind { + RefreshKind::nothing() + .with_cpu(CpuRefreshKind::everything()) + .with_memory(MemoryRefreshKind::everything()) + } + + pub async fn collect(&mut self) -> Specs { + self.system.refresh_specifics(Self::get_refresh_specifics()); + + Specs { + total_mem: self.system.total_memory(), + free_mem: self.system.free_memory(), + num_cpus: self.system.physical_core_count(), + cpu_usage: self.system.global_cpu_usage(), + os: std::env::consts::OS.to_string(), + arch: std::env::consts::ARCH.to_string(), + family: std::env::consts::FAMILY.to_string(), + gpus: self.gpus.clone(), + lookup: public_ip_address::perform_lookup(None).await.ok(), + } + } +} +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[ignore = "run manually"] + async fn test_print_specs() { + let mut spec_collector = SpecCollector::new(); + let specs = spec_collector.collect().await; + println!("{}", serde_json::to_string_pretty(&specs).unwrap()); + } +} diff --git a/workflows/src/bin/tps.rs b/workflows/src/bin/tps.rs index 8284278..6e889ec 100644 --- a/workflows/src/bin/tps.rs +++ b/workflows/src/bin/tps.rs @@ -113,6 +113,7 @@ async fn main() { * 1_000_000_000f64; // add row to table + // FIXME: this should be updated on each iteration table.add_row(Row::new(vec![ Cell::new(&model.to_string()), Cell::new(&tps.to_string()), From 9d42cc133f3260b71df1d7bc8389dc9731609a7e Mon Sep 17 00:00:00 2001 From: erhant Date: Mon, 30 Dec 2024 21:01:59 +0300 Subject: [PATCH 04/13] added response handler --- compute/src/node.rs | 21 +++++++++++++++------ p2p/src/commands.rs | 22 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/compute/src/node.rs b/compute/src/node.rs index 189eb0e..64f0a67 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -323,14 +323,21 @@ impl DriaComputeNode { /// Handles a request-response request received from the network. /// /// Internally, the data is expected to be some JSON serialized data that is expected to be parsed and handled. - async fn handle_request(&mut self, data: Vec, mut channel: ResponseChannel>) { - if let Ok(req) = SpecResponder::try_parse_request(&data) { + async fn handle_request( + &mut self, + data: Vec, + channel: ResponseChannel>, + ) -> Result<()> { + let response_data = if let Ok(req) = SpecResponder::try_parse_request(&data) { let response = SpecResponder::respond(req, self.spec_collector.collect().await); - // TODO: send response + serde_json::to_vec(&response).unwrap() } else { - log::warn!("Received unknown request: {:?}", data); - } + return Err(eyre::eyre!("Received unknown request: {:?}", data)); + }; + + self.p2p.respond(response_data, channel).await } + /// Runs the main loop of the compute node. /// This method is not expected to return until cancellation occurs for the given token. pub async fn run(&mut self, cancellation: CancellationToken) -> Result<()> { @@ -400,7 +407,9 @@ impl DriaComputeNode { // this is expected to be sent by the p2p client request_msg_opt = self.request_rx.recv() => { if let Some((data, channel)) = request_msg_opt { - self.handle_request(data, channel).await; + if let Err(e) = self.handle_request(data, channel).await { + log::error!("Error handling request: {:?}", e); + } } else { log::error!("request_rx channel closed unexpectedly."); break; diff --git a/p2p/src/commands.rs b/p2p/src/commands.rs index d502647..a0a8bb2 100644 --- a/p2p/src/commands.rs +++ b/p2p/src/commands.rs @@ -159,6 +159,28 @@ impl DriaP2PCommander { .wrap_err("could not publish") } + pub async fn respond( + &mut self, + data: Vec, + channel: request_response::ResponseChannel>, + ) -> Result<()> { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(DriaP2PCommand::Respond { + data, + channel, + sender, + }) + .await + .wrap_err("could not send")?; + + receiver + .await + .wrap_err("could not receive")? + .wrap_err("could not publish") + } + /// Dials a given peer. pub async fn dial(&mut self, peer_id: Multiaddr) -> Result<()> { let (sender, receiver) = oneshot::channel(); From 1b569a4fb97b14d59beed44b06391ed6bddbfe92 Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 2 Jan 2025 14:16:03 +0300 Subject: [PATCH 05/13] added ping liveness warning, rpc peerid check on request --- compute/src/handlers/pingpong.rs | 4 +++ compute/src/handlers/workflow.rs | 2 +- compute/src/node.rs | 45 +++++++++++++++++++++++++------- p2p/src/client.rs | 8 +++--- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/compute/src/handlers/pingpong.rs b/compute/src/handlers/pingpong.rs index 1326c37..efc47c0 100644 --- a/compute/src/handlers/pingpong.rs +++ b/compute/src/handlers/pingpong.rs @@ -4,6 +4,7 @@ use dkn_utils::get_current_time_nanos; use dkn_workflows::{Model, ModelProvider}; use eyre::{Context, Result}; use serde::{Deserialize, Serialize}; +use tokio::time::Instant; pub struct PingpongHandler; @@ -60,6 +61,9 @@ impl PingpongHandler { return Ok(MessageAcceptance::Ignore); } + // record ping moment + node.last_pinged_at = Instant::now(); + // respond let response_body = PingpongResponse { uuid: pingpong.uuid.clone(), diff --git a/compute/src/handlers/workflow.rs b/compute/src/handlers/workflow.rs index ae651e9..f7683ca 100644 --- a/compute/src/handlers/workflow.rs +++ b/compute/src/handlers/workflow.rs @@ -55,7 +55,7 @@ impl WorkflowHandler { // check task inclusion via the bloom filter if !task.filter.contains(&node.config.address)? { - log::info!("Task {} ignored due to filter.", task.task_id); + log::debug!("Task {} ignored due to filter.", task.task_id); // accept the message, someone else may be included in filter return Ok(Either::Left(MessageAcceptance::Accept)); diff --git a/compute/src/node.rs b/compute/src/node.rs index 64f0a67..056b5ad 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -8,7 +8,10 @@ use dkn_p2p::{ }; use eyre::Result; use std::collections::HashSet; -use tokio::{sync::mpsc, time::Duration}; +use tokio::{ + sync::mpsc, + time::{Duration, Instant}, +}; use tokio_util::{either::Either, sync::CancellationToken}; use crate::{ @@ -24,6 +27,8 @@ use crate::{ const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30; /// Number of seconds between refreshing the available nodes. const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 30 * 60; // 30 minutes +/// Number of seconds such that if the last ping is older than this, the node is considered unreachable. +const PING_LIVENESS_SECS: u64 = 150; /// Buffer size for message publishes. const PUBLISH_CHANNEL_BUFSIZE: usize = 1024; @@ -33,10 +38,13 @@ pub struct DriaComputeNode { pub dria_nodes: DriaNodes, /// Peer-to-peer client commander to interact with the network. pub p2p: DriaP2PCommander, + /// The last time the node was pinged by the network. + /// If this is too much, we can say that the node is not reachable by RPC. + pub last_pinged_at: Instant, /// Gossipsub message receiver, used by peer-to-peer client in a separate thread. message_rx: mpsc::Receiver<(PeerId, MessageId, Message)>, /// Request-response request receiver. - request_rx: mpsc::Receiver<(Vec, ResponseChannel>)>, + request_rx: mpsc::Receiver<(PeerId, Vec, ResponseChannel>)>, /// Publish receiver to receive messages to be published, publish_rx: mpsc::Receiver, /// Workflow transmitter to send batchable tasks. @@ -127,6 +135,7 @@ impl DriaComputeNode { completed_tasks_single: 0, completed_tasks_batch: 0, spec_collector: SpecCollector::new(), + last_pinged_at: Instant::now(), }, p2p_client, workflows_batch_worker, @@ -325,9 +334,19 @@ impl DriaComputeNode { /// Internally, the data is expected to be some JSON serialized data that is expected to be parsed and handled. async fn handle_request( &mut self, - data: Vec, - channel: ResponseChannel>, + (peer_id, data, channel): (PeerId, Vec, ResponseChannel>), ) -> Result<()> { + // ensure that message is from the known RPCs + if !self.dria_nodes.rpc_peerids.contains(&peer_id) { + log::warn!("Received request from unauthorized source: {}", peer_id); + log::debug!("Allowed sources: {:#?}", self.dria_nodes.rpc_peerids); + return Err(eyre::eyre!( + "Received unauthorized request from {}", + peer_id + )); + } + + // respond w.r.t data let response_data = if let Ok(req) = SpecResponder::try_parse_request(&data) { let response = SpecResponder::respond(req, self.spec_collector.collect().await); serde_json::to_vec(&response).unwrap() @@ -342,9 +361,9 @@ impl DriaComputeNode { /// This method is not expected to return until cancellation occurs for the given token. pub async fn run(&mut self, cancellation: CancellationToken) -> Result<()> { // prepare durations for sleeps - let mut peer_refresh_interval = + let mut diagnostic_refresh_interval = tokio::time::interval(Duration::from_secs(DIAGNOSTIC_REFRESH_INTERVAL_SECS)); - peer_refresh_interval.tick().await; // move one tick + diagnostic_refresh_interval.tick().await; // move one tick let mut available_node_refresh_interval = tokio::time::interval(Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS)); available_node_refresh_interval.tick().await; // move one tick @@ -383,7 +402,7 @@ impl DriaComputeNode { }, // check peer count every now and then - _ = peer_refresh_interval.tick() => self.handle_diagnostic_refresh().await, + _ = diagnostic_refresh_interval.tick() => self.handle_diagnostic_refresh().await, // available nodes are refreshed every now and then _ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await, // a GossipSub message is received from the channel @@ -406,8 +425,8 @@ impl DriaComputeNode { // a Response message is received from the channel // this is expected to be sent by the p2p client request_msg_opt = self.request_rx.recv() => { - if let Some((data, channel)) = request_msg_opt { - if let Err(e) = self.handle_request(data, channel).await { + if let Some((peer_id, data, channel)) = request_msg_opt { + if let Err(e) = self.handle_request((peer_id, data, channel)).await { log::error!("Error handling request: {:?}", e); } } else { @@ -453,6 +472,7 @@ impl DriaComputeNode { /// Peer refresh simply reports the peer count to the user. async fn handle_diagnostic_refresh(&self) { let mut diagnostics = Vec::new(); + // print peer counts match self.p2p.peer_counts().await { Ok((mesh, all)) => { @@ -480,6 +500,13 @@ impl DriaComputeNode { diagnostics.push(format!("Version: v{}", DRIA_COMPUTE_NODE_VERSION)); log::info!("{}", diagnostics.join(" | ")); + + if self.last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS) { + log::error!( + "Node has not received any pings for at least {} seconds & it may be unreachable!\nPlease restart your node!", + PING_LIVENESS_SECS + ); + } } /// Updates the local list of available nodes by refreshing it. diff --git a/p2p/src/client.rs b/p2p/src/client.rs index f1f5023..a52bd4c 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -25,7 +25,7 @@ pub struct DriaP2PClient { /// Gossipsub protoocol, gossip message sender. msg_tx: mpsc::Sender<(PeerId, MessageId, Message)>, /// Request-response protocol, request sender. - req_tx: mpsc::Sender<(Vec, ResponseChannel>)>, + req_tx: mpsc::Sender<(PeerId, Vec, ResponseChannel>)>, /// Command receiver. cmd_rx: mpsc::Receiver, } @@ -55,7 +55,7 @@ impl DriaP2PClient { DriaP2PClient, DriaP2PCommander, mpsc::Receiver<(PeerId, MessageId, Message)>, - mpsc::Receiver<(Vec, ResponseChannel>)>, + mpsc::Receiver<(PeerId, Vec, ResponseChannel>)>, )> { // this is our peerId let node_peerid = keypair.public().to_peer_id(); @@ -302,12 +302,12 @@ impl DriaP2PClient { // request-response events SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( - request_response::Event::Message { message, .. }, + request_response::Event::Message { message, peer }, )) => match message { request_response::Message::Request { request, channel, .. } => { - if let Err(e) = self.req_tx.send((request, channel)).await { + if let Err(e) = self.req_tx.send((peer, request, channel)).await { log::error!("Could not send request-response request: {:?}", e); } } From b75a2e9e46f25bbde1d5ee5afbae89776af87900 Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 2 Jan 2025 14:20:35 +0300 Subject: [PATCH 06/13] bump ver --- Cargo.lock | 10 +++++----- Cargo.toml | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2eeef19..dee226b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "dkn-compute" -version = "0.2.31" +version = "0.2.32" dependencies = [ "async-trait", "base64 0.22.1", @@ -1106,7 +1106,7 @@ dependencies = [ [[package]] name = "dkn-monitor" -version = "0.2.31" +version = "0.2.32" dependencies = [ "async-trait", "dkn-compute", @@ -1126,7 +1126,7 @@ dependencies = [ [[package]] name = "dkn-p2p" -version = "0.2.31" +version = "0.2.32" dependencies = [ "dkn-utils", "env_logger 0.11.5", @@ -1142,11 +1142,11 @@ dependencies = [ [[package]] name = "dkn-utils" -version = "0.2.31" +version = "0.2.32" [[package]] name = "dkn-workflows" -version = "0.2.31" +version = "0.2.32" dependencies = [ "dkn-utils", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 2d986d4..ba3c621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ default-members = ["compute"] [workspace.package] edition = "2021" -version = "0.2.31" +version = "0.2.32" license = "Apache-2.0" readme = "README.md" From 5f7c0efc6431728a3d85ada02c28920dafc92801 Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 2 Jan 2025 16:11:42 +0300 Subject: [PATCH 07/13] fix some tps bench errors --- .github/workflows/tests.yml | 2 + workflows/src/bin/tps.rs | 157 +++++++++++++++++++++--------------- 2 files changed, 92 insertions(+), 67 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0b1d4ea..90bb9c1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,11 +8,13 @@ on: # Source files in each member - "compute/src/**" - "p2p/src/**" + - "monitor/src/**" - "workflows/src/**" # Cargo in each member - "compute/Cargo.toml" - "p2p/Cargo.toml" - "workflows/Cargo.toml" + - "monitor/Cargo.toml" # root-level Cargo - "Cargo.lock" # workflow itself diff --git a/workflows/src/bin/tps.rs b/workflows/src/bin/tps.rs index 6e889ec..4fb457a 100644 --- a/workflows/src/bin/tps.rs +++ b/workflows/src/bin/tps.rs @@ -1,3 +1,10 @@ +use dkn_workflows::OllamaConfig; +use ollama_workflows::ollama_rs::{ + generation::completion::{request::GenerationRequest, GenerationResponse}, + Ollama, +}; +use ollama_workflows::Model; + #[cfg(not(feature = "profiling"))] fn main() { unimplemented!("this binary requires the 'profiling' feature to be enabled"); @@ -6,14 +13,12 @@ fn main() { #[cfg(feature = "profiling")] #[tokio::main] async fn main() { - use dkn_workflows::{DriaWorkflowsConfig, OllamaConfig}; - use ollama_workflows::ollama_rs::{generation::completion::request::GenerationRequest, Ollama}; - use ollama_workflows::Model; - use prettytable::{Cell, Row, Table}; - use sysinfo::{CpuRefreshKind, RefreshKind, System, MINIMUM_CPU_UPDATE_INTERVAL}; - - // initialize logger - env_logger::init(); + env_logger::builder() + .filter_level(log::LevelFilter::Off) + .filter_module("tps", log::LevelFilter::Info) + .filter_module("dkn_workflows", log::LevelFilter::Debug) + .parse_default_env() + .init(); let models = vec![ Model::NousTheta, @@ -21,8 +26,6 @@ async fn main() { Model::Phi3Medium128k, Model::Phi3_5Mini, Model::Phi3_5MiniFp16, - Model::Gemma2_9B, - Model::Gemma2_9BFp16, Model::Llama3_1_8B, Model::Llama3_1_8Bq8, Model::Llama3_1_8Bf16, @@ -43,53 +46,54 @@ async fn main() { Model::Qwen2_5coder7Bf16, Model::DeepSeekCoder6_7B, Model::Mixtral8_7b, - Model::GPT4Turbo, - Model::GPT4o, - Model::GPT4oMini, - Model::O1Preview, - Model::O1Mini, - Model::Gemini15ProExp0827, - Model::Gemini15Pro, - Model::Gemini15Flash, - Model::Gemini10Pro, - Model::Gemma2_2bIt, - Model::Gemma2_27bIt, + Model::Gemma2_9B, + Model::Gemma2_9BFp16, ]; - let cfg = DriaWorkflowsConfig::new(models); let config = OllamaConfig::default(); let ollama = Ollama::new(config.host, config.port); - log::debug!("Starting..."); - // ensure that all lists of CPUs and processes are filled - let mut system = System::new_all(); - // update all information of the system - system.refresh_all(); + + run_benchmark(ollama, models).await; +} + +#[cfg(feature = "profiling")] +async fn run_benchmark(ollama: Ollama, models: Vec) { + use dkn_workflows::ModelProvider; + use prettytable::{Cell, Row, Table}; + use sysinfo::{ + CpuRefreshKind, MemoryRefreshKind, RefreshKind, System, MINIMUM_CPU_UPDATE_INTERVAL, + }; + + // create & update system info + let mut system = System::new_with_specifics( + RefreshKind::new() + .with_cpu(CpuRefreshKind::everything()) + .with_memory(MemoryRefreshKind::everything()), + ); + system.refresh_cpu_usage(); + system.refresh_memory(); log::debug!("Getting system information..."); let brand = system.cpus()[0].brand().to_string(); let os_name = System::name().unwrap_or_else(|| "Unknown".to_string()); let os_version = System::long_os_version().unwrap_or_else(|| "Unknown".to_string()); - let cpu_usage = system.global_cpu_usage(); - let total_memory = system.total_memory(); - let used_memory = system.used_memory(); - let mut tps: f64; - let mut table = Table::new(); + log::info!("{} {} ({})", brand, os_name, os_version); - // Add a row with the headers + let mut table = Table::new(); table.add_row(Row::new(vec![ Cell::new("Model"), Cell::new("TPS"), - Cell::new("OS"), - Cell::new("Version"), Cell::new("CPU Usage (%)"), Cell::new("Total Memory (KB)"), Cell::new("Used Memory (KB)"), ])); - for (_, model) in cfg.models { + // iterate over Ollama models + for model in models + .into_iter() + .filter(|m| ModelProvider::from(m.clone()) == ModelProvider::Ollama) + { log::debug!("Pulling model: {}", model); - - // pull model match ollama.pull_model(model.to_string(), false).await { Ok(status) => log::debug!("Status: {}", status.message), Err(err) => { @@ -97,48 +101,67 @@ async fn main() { } } - log::debug!("Creating request..."); - // create dummy request as a warm-up - let generation_request = - GenerationRequest::new(model.to_string(), "compute 6780 * 1200".to_string()); - - // generate response - match ollama.generate(generation_request).await { + match ollama + .generate(GenerationRequest::new( + model.to_string(), + "Write a poem about Julius Caesar.".to_string(), + )) + .await + { Ok(response) => { log::debug!("Got response for model {}", model); - // compute TPS - tps = (response.eval_count.unwrap_or_default() as f64) - / (response.eval_duration.unwrap_or(1) as f64) - * 1_000_000_000f64; - - // add row to table - // FIXME: this should be updated on each iteration + system.refresh_cpu_usage(); + system.refresh_memory(); table.add_row(Row::new(vec![ Cell::new(&model.to_string()), - Cell::new(&tps.to_string()), - Cell::new(&format!("{} {}", brand, os_name)), - Cell::new(&os_version), - Cell::new(&cpu_usage.to_string()), - Cell::new(&total_memory.to_string()), - Cell::new(&used_memory.to_string()), + Cell::new(&get_response_tps(&response).to_string()), + Cell::new(&system.global_cpu_usage().to_string()), + Cell::new(&(system.total_memory() / 1000).to_string()), + Cell::new(&(system.used_memory() / 1000).to_string()), ])); + // TODO: should add GPU usage here as well } Err(e) => { log::warn!("Ignoring model {}: Workflow failed with error {}", model, e); } } - table.printstd(); - // print system info - // refresh CPU usage (https://docs.rs/sysinfo/latest/sysinfo/struct.Cpu.html#method.cpu_usage) - system = - System::new_with_specifics(RefreshKind::new().with_cpu(CpuRefreshKind::everything())); + // wait a bit because CPU usage is based on diff std::thread::sleep(MINIMUM_CPU_UPDATE_INTERVAL); - // refresh CPUs again to get actual value - system.refresh_cpu_usage(); } - // print system info + + // print the final result table.printstd(); - log::debug!("Finished"); +} + +/// Computes the TPS. +#[inline(always)] +fn get_response_tps(res: &GenerationResponse) -> f64 { + (res.eval_count.unwrap_or_default() as f64) / (res.eval_duration.unwrap_or(1) as f64) + * 1_000_000_000f64 +} + +#[cfg(feature = "profiling")] +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_single() { + env_logger::builder() + .filter_level(log::LevelFilter::Off) + .filter_module("tps", log::LevelFilter::Debug) + .filter_module("dkn_workflows", log::LevelFilter::Debug) + .parse_default_env() + .is_test(true) + .init(); + + let models = vec![Model::Llama3_2_3B, Model::Llama3_2_1B]; + + let config = OllamaConfig::default(); + let ollama = Ollama::new(config.host, config.port); + + run_benchmark(ollama, models).await; + } } From f19d681bc931d7b1dd8a654d17ed5a978a971bc2 Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 2 Jan 2025 17:00:43 +0300 Subject: [PATCH 08/13] changed behavior ordering, changed gossip msg log --- compute/src/handlers/pingpong.rs | 1 + compute/src/handlers/workflow.rs | 2 ++ compute/src/node.rs | 19 +++++++++---------- p2p/src/behaviour.rs | 11 +++++------ 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/compute/src/handlers/pingpong.rs b/compute/src/handlers/pingpong.rs index efc47c0..1d13a40 100644 --- a/compute/src/handlers/pingpong.rs +++ b/compute/src/handlers/pingpong.rs @@ -61,6 +61,7 @@ impl PingpongHandler { return Ok(MessageAcceptance::Ignore); } + log::info!("Received a ping for: {}", pingpong.uuid); // record ping moment node.last_pinged_at = Instant::now(); diff --git a/compute/src/handlers/workflow.rs b/compute/src/handlers/workflow.rs index f7683ca..e13810c 100644 --- a/compute/src/handlers/workflow.rs +++ b/compute/src/handlers/workflow.rs @@ -61,6 +61,8 @@ impl WorkflowHandler { return Ok(Either::Left(MessageAcceptance::Accept)); } + log::info!("Received a task with id: {}", task.task_id); + // obtain public key from the payload // do this early to avoid unnecessary processing let task_public_key_bytes = diff --git a/compute/src/node.rs b/compute/src/node.rs index 056b5ad..3a98e93 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -211,14 +211,6 @@ impl DriaComputeNode { return MessageAcceptance::Ignore; }; - // log the received message - log::info!( - "Received {} message ({}) from {}", - gossipsub_message.topic, - message_id, - peer_id, - ); - // ensure that message is from the known RPCs if !self.dria_nodes.rpc_peerids.contains(&source_peer_id) { log::warn!( @@ -244,6 +236,15 @@ impl DriaComputeNode { } }; + // debug-log the received message + log::debug!( + "Received {} message ({}) from {}\n{}", + gossipsub_message.topic, + message_id, + peer_id, + message + ); + // check signature match message.is_signed(&self.config.admin_public_key) { Ok(true) => { /* message is signed correctly, nothing to do here */ } @@ -257,8 +258,6 @@ impl DriaComputeNode { } } - log::debug!("Parsed: {}", message); - // handle the DKN message with respect to the topic let handler_result = match message.topic.as_str() { WorkflowHandler::LISTEN_TOPIC => { diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index ea1d0ca..88e2680 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -34,14 +34,13 @@ impl DriaBehaviour { let peer_id = public_key.to_peer_id(); Ok(Self { + connection_limits: create_connection_limits_behaviour(), relay: relay_behaviour, - gossipsub: create_gossipsub_behaviour(peer_id) - .wrap_err("could not create Gossipsub behaviour")?, - kademlia: create_kademlia_behaviour(peer_id, kademlia_protocol), - autonat: create_autonat_behaviour(peer_id), dcutr: create_dcutr_behaviour(peer_id), + autonat: create_autonat_behaviour(peer_id), identify: create_identify_behaviour(public_key, identity_protocol), - connection_limits: create_connection_limits_behaviour(), + kademlia: create_kademlia_behaviour(peer_id, kademlia_protocol), + gossipsub: create_gossipsub_behaviour(peer_id)?, request_response: create_request_response_behaviour(reqres_protocol), }) } @@ -70,7 +69,7 @@ fn create_connection_limits_behaviour() -> connection_limits::Behaviour { /// Number of established outgoing connections limit, this is directly correlated to peer count /// so limiting this will cause a limitation on peers as well. - const EST_OUTGOING_LIMIT: u32 = 300; + const EST_OUTGOING_LIMIT: u32 = 20; let limits = ConnectionLimits::default().with_max_established_outgoing(Some(EST_OUTGOING_LIMIT)); From e9f5ee580ffaec7730f00799423acc3d3cdcbf88 Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 2 Jan 2025 17:02:27 +0300 Subject: [PATCH 09/13] revert outgoing limit num --- p2p/src/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index 88e2680..fc07a15 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -69,7 +69,7 @@ fn create_connection_limits_behaviour() -> connection_limits::Behaviour { /// Number of established outgoing connections limit, this is directly correlated to peer count /// so limiting this will cause a limitation on peers as well. - const EST_OUTGOING_LIMIT: u32 = 20; + const EST_OUTGOING_LIMIT: u32 = 300; let limits = ConnectionLimits::default().with_max_established_outgoing(Some(EST_OUTGOING_LIMIT)); From 5f2295c194fdaf143614a49b55c87c47f627c8bc Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 3 Jan 2025 22:15:37 +0300 Subject: [PATCH 10/13] added request command and a small test --- compute/src/workers/workflow.rs | 8 ++- p2p/src/client.rs | 23 ++++++- p2p/src/commands.rs | 27 ++++++++ .../{listen_test.rs => gossipsub_test.rs} | 12 +++- p2p/tests/request_test.rs | 69 +++++++++++++++++++ 5 files changed, 133 insertions(+), 6 deletions(-) rename p2p/tests/{listen_test.rs => gossipsub_test.rs} (84%) create mode 100644 p2p/tests/request_test.rs diff --git a/compute/src/workers/workflow.rs b/compute/src/workers/workflow.rs index ad763da..e0191e7 100644 --- a/compute/src/workers/workflow.rs +++ b/compute/src/workers/workflow.rs @@ -246,7 +246,13 @@ mod tests { use libsecp256k1::{PublicKey, SecretKey}; use tokio::sync::mpsc; - // cargo test --package dkn-compute --lib --all-features -- workers::workflow::tests::test_workflows_worker --exact --show-output --nocapture --ignored + /// Tests the workflows worker with a single task sent within a batch. + /// + /// ## Run command + /// + /// ```sh + /// cargo test --package dkn-compute --lib --all-features -- workers::workflow::tests::test_workflows_worker --exact --show-output --nocapture --ignored + /// ``` #[tokio::test] #[ignore = "run manually"] async fn test_workflows_worker() { diff --git a/p2p/src/client.rs b/p2p/src/client.rs index a52bd4c..01e44aa 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -18,6 +18,8 @@ use super::DriaP2PCommander; /// Peer-to-peer client for Dria Knowledge Network. pub struct DriaP2PClient { + /// Your peer id. + pub peer_id: PeerId, /// `Swarm` instance, everything p2p-related are accessed through this instace. swarm: Swarm, /// Dria protocol, used for identifying the client. @@ -58,8 +60,8 @@ impl DriaP2PClient { mpsc::Receiver<(PeerId, Vec, ResponseChannel>)>, )> { // this is our peerId - let node_peerid = keypair.public().to_peer_id(); - log::info!("Compute node peer address: {}", node_peerid); + let peer_id = keypair.public().to_peer_id(); + log::info!("Compute node peer address: {}", peer_id); let mut swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() @@ -144,6 +146,7 @@ impl DriaP2PClient { let (msg_tx, msg_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE); let (req_tx, req_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE); let client = Self { + peer_id, swarm, protocol, msg_tx, @@ -226,6 +229,18 @@ impl DriaP2PClient { .map_err(|_| eyre::eyre!("could not send response, channel is closed?")), ); } + DriaP2PCommand::Request { + data, + peer_id, + sender, + } => { + let _ = sender.send( + self.swarm + .behaviour_mut() + .request_response + .send_request(&peer_id, data), + ); + } DriaP2PCommand::ValidateMessage { msg_id, propagation_source, @@ -304,6 +319,8 @@ impl DriaP2PClient { SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( request_response::Event::Message { message, peer }, )) => match message { + // a request has been made with us as the target, and we should respond + // using the created `channel`; we simply forward this to the request channel request_response::Message::Request { request, channel, .. } => { @@ -316,7 +333,7 @@ impl DriaP2PClient { response, } => { // while we support the protocol, we dont really make any requests - // TODO: p2p crate should support this + // TODO: should p2p crate support this? log::warn!( "Unexpected response message with request_id {}: {:?}", request_id, diff --git a/p2p/src/commands.rs b/p2p/src/commands.rs index a0a8bb2..79d871b 100644 --- a/p2p/src/commands.rs +++ b/p2p/src/commands.rs @@ -49,6 +49,14 @@ pub enum DriaP2PCommand { channel: request_response::ResponseChannel>, sender: oneshot::Sender>, }, + /// Request a request-response message. + /// Note that you are likely to be caught by the RPC peer id check, + /// and your messages will be ignored. + Request { + peer_id: PeerId, + data: Vec, + sender: oneshot::Sender, + }, /// Validates a GossipSub message for propagation, returns whether the message existed in cache. /// /// - `Accept`: Accept the message and propagate it. @@ -181,6 +189,25 @@ impl DriaP2PCommander { .wrap_err("could not publish") } + pub async fn request( + &mut self, + peer_id: PeerId, + data: Vec, + ) -> Result { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(DriaP2PCommand::Request { + data, + peer_id, + sender, + }) + .await + .wrap_err("could not send")?; + + receiver.await.wrap_err("could not receive") + } + /// Dials a given peer. pub async fn dial(&mut self, peer_id: Multiaddr) -> Result<()> { let (sender, receiver) = oneshot::channel(); diff --git a/p2p/tests/listen_test.rs b/p2p/tests/gossipsub_test.rs similarity index 84% rename from p2p/tests/listen_test.rs rename to p2p/tests/gossipsub_test.rs index 400ea76..1b62443 100644 --- a/p2p/tests/listen_test.rs +++ b/p2p/tests/gossipsub_test.rs @@ -2,14 +2,22 @@ use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol}; use eyre::Result; use libp2p_identity::Keypair; +/// A gossipsub test that listens for a single message on a given topic. +/// Terminates when a message is received. +/// +/// ## Run command +/// +/// ```sh +/// cargo test --package dkn-p2p --test gossipsub_test --all-features -- test_gossipsub --exact --show-output --ignored +/// ``` #[tokio::test] #[ignore = "run this manually"] -async fn test_listen_topic_once() -> Result<()> { +async fn test_gossipsub() -> Result<()> { const TOPIC: &str = "pong"; let _ = env_logger::builder() .filter_level(log::LevelFilter::Off) - .filter_module("listen_test", log::LevelFilter::Debug) + .filter_module("gossipsub_test", log::LevelFilter::Debug) .filter_module("dkn_p2p", log::LevelFilter::Debug) .is_test(true) .try_init(); diff --git a/p2p/tests/request_test.rs b/p2p/tests/request_test.rs new file mode 100644 index 0000000..7f5dadc --- /dev/null +++ b/p2p/tests/request_test.rs @@ -0,0 +1,69 @@ +use std::str::FromStr; + +use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol}; +use eyre::Result; +use libp2p::PeerId; +use libp2p_identity::Keypair; + +/// Makes a dummy request to some peer hardcoded within the test. +/// +/// ## Run command +/// +/// ```sh +/// cargo test --package dkn-p2p --test request_test --all-features -- test_request_message --exact --show-output --ignored +/// ``` +#[tokio::test] +#[ignore = "run this manually"] +async fn test_request_message() -> Result<()> { + let _ = env_logger::builder() + .filter_level(log::LevelFilter::Off) + .filter_module("request_test", log::LevelFilter::Debug) + .filter_module("dkn_p2p", log::LevelFilter::Debug) + .is_test(true) + .try_init(); + + let listen_addr = "/ip4/0.0.0.0/tcp/4001".parse()?; + + // prepare nodes + let nodes = DriaNodes::new(dkn_p2p::DriaNetworkType::Community) + .with_bootstrap_nodes(["/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4".parse()?]) + .with_relay_nodes(["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?]); + + // spawn P2P client in another task + let (client, mut commander, mut msg_rx, mut req_rx) = DriaP2PClient::new( + Keypair::generate_secp256k1(), + listen_addr, + &nodes, + DriaP2PProtocol::default(), + ) + .expect("could not create p2p client"); + + // spawn task + let task_handle = tokio::spawn(async move { client.run().await }); + + log::info!("Waiting a bit until we have enough peers"); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + let peer_id = + PeerId::from_str("16Uiu2HAmB5HGdwLNHX81u7ey1fvDx5Mr4ofa2PdSSVxFKrrcErAN").unwrap(); + log::info!("Making a request to peer: {}", peer_id); + commander + .request(peer_id, b"here is some data".into()) + .await?; + + log::info!("Waiting for response logs for a few moments..."); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // close command channel + commander.shutdown().await.expect("could not shutdown"); + + // close other channels + msg_rx.close(); + req_rx.close(); + + log::info!("Waiting for p2p task to finish..."); + task_handle.await?; + + log::info!("Done!"); + Ok(()) +} From 65cc197fc5a1d73790c52e7d17a678e4c72e0499 Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 10 Jan 2025 17:31:46 +0300 Subject: [PATCH 11/13] added preliminary handling for task requests --- compute/src/config.rs | 15 ++- compute/src/handlers/pingpong.rs | 3 +- compute/src/node.rs | 14 ++- compute/src/responders/mod.rs | 11 +- compute/src/responders/workflow.rs | 155 +++++++++++++++++++++++++++++ compute/src/utils/message.rs | 5 +- compute/src/utils/specs.rs | 17 +++- compute/src/workers/workflow.rs | 6 +- p2p/src/behaviour.rs | 1 - 9 files changed, 205 insertions(+), 22 deletions(-) create mode 100644 compute/src/responders/workflow.rs diff --git a/compute/src/config.rs b/compute/src/config.rs index 23fa8c0..a178f77 100644 --- a/compute/src/config.rs +++ b/compute/src/config.rs @@ -1,16 +1,16 @@ -use crate::utils::{ - address_in_use, - crypto::{secret_to_keypair, to_address}, -}; use dkn_p2p::{libp2p::Multiaddr, DriaNetworkType}; use dkn_workflows::DriaWorkflowsConfig; use eyre::{eyre, Result}; use libsecp256k1::{PublicKey, SecretKey}; - use std::{env, str::FromStr}; -// TODO: make this configurable later +use crate::utils::{ + address_in_use, + crypto::{secret_to_keypair, to_address}, +}; + const DEFAULT_WORKFLOW_BATCH_SIZE: usize = 5; +const DEFAULT_P2P_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/4001"; #[derive(Debug, Clone)] pub struct DriaComputeNodeConfig { @@ -35,9 +35,6 @@ pub struct DriaComputeNodeConfig { pub batch_size: usize, } -/// The default P2P network listen address. -pub(crate) const DEFAULT_P2P_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/4001"; - #[allow(clippy::new_without_default)] impl DriaComputeNodeConfig { /// Creates new config from environment variables. diff --git a/compute/src/handlers/pingpong.rs b/compute/src/handlers/pingpong.rs index 1d13a40..34cbf09 100644 --- a/compute/src/handlers/pingpong.rs +++ b/compute/src/handlers/pingpong.rs @@ -1,4 +1,3 @@ -use crate::{utils::DriaMessage, DriaComputeNode}; use dkn_p2p::libp2p::gossipsub::MessageAcceptance; use dkn_utils::get_current_time_nanos; use dkn_workflows::{Model, ModelProvider}; @@ -6,6 +5,8 @@ use eyre::{Context, Result}; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use crate::{utils::DriaMessage, DriaComputeNode}; + pub struct PingpongHandler; #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/compute/src/node.rs b/compute/src/node.rs index 3a98e93..f4d9f79 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -17,7 +17,7 @@ use tokio_util::{either::Either, sync::CancellationToken}; use crate::{ config::*, handlers::*, - responders::{IsResponder, SpecResponder}, + responders::{IsResponder, SpecResponder, WorkflowResponder}, utils::{crypto::secret_to_keypair, refresh_dria_nodes, DriaMessage, SpecCollector}, workers::workflow::{WorkflowsWorker, WorkflowsWorkerInput, WorkflowsWorkerOutput}, DRIA_COMPUTE_NODE_VERSION, @@ -144,6 +144,9 @@ impl DriaComputeNode { } /// Subscribe to a certain task with its topic. + /// + /// These are likely to be called once, so can be inlined. + #[inline] pub async fn subscribe(&mut self, topic: &str) -> Result<()> { let ok = self.p2p.subscribe(topic).await?; if ok { @@ -155,6 +158,9 @@ impl DriaComputeNode { } /// Unsubscribe from a certain task with its topic. + /// + /// These are likely to be called once, so can be inlined. + #[inline] pub async fn unsubscribe(&mut self, topic: &str) -> Result<()> { let ok = self.p2p.unsubscribe(topic).await?; if ok { @@ -166,6 +172,7 @@ impl DriaComputeNode { } /// Returns the task count within the channels, `single` and `batch`. + #[inline] pub fn get_pending_task_count(&self) -> [usize; 2] { [ self.pending_tasks_single.len(), @@ -349,6 +356,11 @@ impl DriaComputeNode { let response_data = if let Ok(req) = SpecResponder::try_parse_request(&data) { let response = SpecResponder::respond(req, self.spec_collector.collect().await); serde_json::to_vec(&response).unwrap() + } else if let Ok(req) = WorkflowResponder::try_parse_request(&data) { + log::info!("Received a task request with id: {}", req.task_id); + return Err(eyre::eyre!( + "REQUEST RESPONSE FOR TASKS ARE NOT IMPLEMENTED YET" + )); } else { return Err(eyre::eyre!("Received unknown request: {:?}", data)); }; diff --git a/compute/src/responders/mod.rs b/compute/src/responders/mod.rs index dd10f98..b7fa2f4 100644 --- a/compute/src/responders/mod.rs +++ b/compute/src/responders/mod.rs @@ -1,10 +1,17 @@ -mod specs; use eyre::Context; use serde::{de::DeserializeOwned, Serialize}; + +mod specs; pub use specs::SpecResponder; +mod workflow; +pub use workflow::WorkflowResponder; + +/// A responder should implement a request & response type, both serializable. +/// +/// The `try_parse_request` is automatically implemented using `serde-json` for a byte slice. pub trait IsResponder { - type Request: Serialize + DeserializeOwned; + type Request: DeserializeOwned; type Response: Serialize + DeserializeOwned; fn try_parse_request<'a>(data: &[u8]) -> eyre::Result { diff --git a/compute/src/responders/workflow.rs b/compute/src/responders/workflow.rs new file mode 100644 index 0000000..5ab2472 --- /dev/null +++ b/compute/src/responders/workflow.rs @@ -0,0 +1,155 @@ +#![allow(unused)] + +use dkn_utils::get_current_time_nanos; +use dkn_workflows::{Entry, Executor, ModelProvider, Workflow}; +use eyre::{Context, Result}; +use libsecp256k1::PublicKey; +use serde::Deserialize; + +use crate::payloads::*; +use crate::utils::DriaMessage; +use crate::workers::workflow::*; +use crate::DriaComputeNode; + +use super::IsResponder; + +pub struct WorkflowResponder; + +impl IsResponder for WorkflowResponder { + type Request = TaskRequestPayload; + type Response = TaskResponsePayload; +} + +#[derive(Debug, Deserialize)] +pub struct WorkflowPayload { + /// [Workflow](https://github.com/andthattoo/ollama-workflows/blob/main/src/program/workflow.rs) object to be parsed. + pub(crate) workflow: Workflow, + /// A lıst of model (that can be parsed into `Model`) or model provider names. + /// If model provider is given, the first matching model in the node config is used for that. + /// From the given list, a random choice will be made for the task. + pub(crate) model: Vec, + /// Prompts can be provided within the workflow itself, in which case this is `None`. + /// Otherwise, the prompt is expected to be `Some` here. + pub(crate) prompt: Option, +} + +impl WorkflowResponder { + pub(crate) async fn handle_compute( + node: &mut DriaComputeNode, + compute_message: &DriaMessage, + ) -> Result> { + let stats = TaskStats::new().record_received_at(); + + // parse payload + let task = compute_message + .parse_payload::>(true) + .wrap_err("could not parse workflow task")?; + + // check if deadline is past or not + if get_current_time_nanos() >= task.deadline { + log::debug!("Task {} is past the deadline, ignoring", task.task_id,); + return Ok(None); + } + + // TODO: we dont check the filter at all, because this was a request to the given peer + + log::info!("Received a task with id: {}", task.task_id); + + // obtain public key from the payload + // do this early to avoid unnecessary processing + let task_public_key_bytes = + hex::decode(&task.public_key).wrap_err("could not decode public key")?; + let task_public_key = PublicKey::parse_slice(&task_public_key_bytes, None)?; + + // read model / provider from the task + let (model_provider, model) = node + .config + .workflows + .get_any_matching_model(task.input.model)?; + let model_name = model.to_string(); // get model name, we will pass it in payload + log::info!("Using model {} for task {}", model_name, task.task_id); + + // prepare workflow executor + let (executor, batchable) = if model_provider == ModelProvider::Ollama { + ( + Executor::new_at( + model, + &node.config.workflows.ollama.host, + node.config.workflows.ollama.port, + ), + false, + ) + } else { + (Executor::new(model), true) + }; + + // prepare entry from prompt + let entry: Option = task + .input + .prompt + .map(|prompt| Entry::try_value_or_str(&prompt)); + + // get workflow as well + let workflow = task.input.workflow; + + Ok(Some(WorkflowsWorkerInput { + entry, + executor, + workflow, + model_name, + task_id: task.task_id, + public_key: task_public_key, + stats, + batchable, + })) + } + + /// Handles the result of a workflow task. + pub(crate) async fn handle_respond( + node: &mut DriaComputeNode, + task: WorkflowsWorkerOutput, + ) -> Result<()> { + // TODO: handle response + let _response = match task.result { + Ok(result) => { + // prepare signed and encrypted payload + let payload = TaskResponsePayload::new( + result, + &task.task_id, + &task.public_key, + &node.config.secret_key, + task.model_name, + task.stats.record_published_at(), + )?; + + // convert payload to message + let payload_str = serde_json::json!(payload).to_string(); + log::info!("Publishing result for task {}", task.task_id); + + DriaMessage::new(payload_str, "response") + } + Err(err) => { + // use pretty display string for error logging with causes + let err_string = format!("{:#}", err); + log::error!("Task {} failed: {}", task.task_id, err_string); + + // prepare error payload + let error_payload = TaskErrorPayload { + task_id: task.task_id.clone(), + error: err_string, + model: task.model_name, + stats: task.stats.record_published_at(), + }; + let error_payload_str = serde_json::json!(error_payload).to_string(); + + // prepare signed message + DriaMessage::new_signed(error_payload_str, "response", &node.config.secret_key) + } + }; + + // respond through the channel + // TODO: !!! + + Ok(()) + } +} diff --git a/compute/src/utils/message.rs b/compute/src/utils/message.rs index 9eec768..144cdea 100644 --- a/compute/src/utils/message.rs +++ b/compute/src/utils/message.rs @@ -1,5 +1,3 @@ -use crate::utils::crypto::{sha256hash, sign_bytes_recoverable}; -use crate::DRIA_COMPUTE_NODE_VERSION; use base64::{prelude::BASE64_STANDARD, Engine}; use core::fmt; use dkn_utils::get_current_time_nanos; @@ -8,6 +6,9 @@ use eyre::{Context, Result}; use libsecp256k1::{verify, Message, SecretKey, Signature}; use serde::{Deserialize, Serialize}; +use crate::utils::crypto::{sha256hash, sign_bytes_recoverable}; +use crate::DRIA_COMPUTE_NODE_VERSION; + /// A message within Dria Knowledge Network. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DriaMessage { diff --git a/compute/src/utils/specs.rs b/compute/src/utils/specs.rs index 3729c6e..737da4c 100644 --- a/compute/src/utils/specs.rs +++ b/compute/src/utils/specs.rs @@ -1,5 +1,7 @@ +use public_ip_address::response::LookupResponse; use serde::{Deserialize, Serialize}; use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind}; +use wgpu::AdapterInfo; /// Machine info & location. #[derive(Debug, Serialize, Deserialize)] @@ -10,12 +12,16 @@ pub struct Specs { free_mem: u64, /// Number of physical CPU cores. num_cpus: Option, + /// Global CPU usage, in percentage. cpu_usage: f32, + /// Operating system name, e.g. `linux`, `macos`, `windows`. os: String, + /// CPU architecture, e.g. `x86_64`, `aarch64`. arch: String, - family: String, - gpus: Vec, - lookup: Option, + /// GPU adapter infos, showing information about the available GPUs. + gpus: Vec, + /// Public IP lookup response. + lookup: Option, } pub struct SpecCollector { @@ -23,7 +29,7 @@ pub struct SpecCollector { /// as per the [docs](https://github.com/GuillaumeGomez/sysinfo?tab=readme-ov-file#good-practice--performance-tips). system: sysinfo::System, /// GPU adapter infos, showing information about the available GPUs. - gpus: Vec, + gpus: Vec, } impl SpecCollector { @@ -38,6 +44,8 @@ impl SpecCollector { } } + /// Returns the selected refresh kinds. It is important to ignore + /// process values here because it will consume a lot of file-descriptors. #[inline(always)] fn get_refresh_specifics() -> RefreshKind { RefreshKind::nothing() @@ -55,7 +63,6 @@ impl SpecCollector { cpu_usage: self.system.global_cpu_usage(), os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), - family: std::env::consts::FAMILY.to_string(), gpus: self.gpus.clone(), lookup: public_ip_address::perform_lookup(None).await.ok(), } diff --git a/compute/src/workers/workflow.rs b/compute/src/workers/workflow.rs index e0191e7..82e4e2a 100644 --- a/compute/src/workers/workflow.rs +++ b/compute/src/workers/workflow.rs @@ -106,7 +106,11 @@ impl WorkflowsWorker { // (1) there are no tasks, or, // (2) there are tasks less than the batch size and the channel is not empty while tasks.is_empty() || (tasks.len() < batch_size && !self.workflow_rx.is_empty()) { - log::info!("Waiting for more workflows to process ({})", tasks.len()); + log::info!( + "Worker is waiting for tasks ({} < {})", + tasks.len(), + batch_size + ); let limit = batch_size - tasks.len(); match self.workflow_rx.recv_many(&mut tasks, limit).await { // 0 tasks returned means that the channel is closed diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index fc07a15..38181f9 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -53,7 +53,6 @@ impl DriaBehaviour { fn create_request_response_behaviour( protocol_name: StreamProtocol, ) -> request_response::cbor::Behaviour, Vec> { - // TODO: use json instead here? use request_response::{Behaviour, ProtocolSupport}; Behaviour::new( From 0a98c18d0a677b5fc8a654c0e3eb02c0aefe2bd0 Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 10 Jan 2025 17:34:13 +0300 Subject: [PATCH 12/13] small build and lint fixes --- compute/src/responders/mod.rs | 2 +- compute/src/utils/specs.rs | 6 ++++++ workflows/src/bin/tps.rs | 6 ++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/compute/src/responders/mod.rs b/compute/src/responders/mod.rs index b7fa2f4..a398cda 100644 --- a/compute/src/responders/mod.rs +++ b/compute/src/responders/mod.rs @@ -14,7 +14,7 @@ pub trait IsResponder { type Request: DeserializeOwned; type Response: Serialize + DeserializeOwned; - fn try_parse_request<'a>(data: &[u8]) -> eyre::Result { + fn try_parse_request(data: &[u8]) -> eyre::Result { serde_json::from_slice(data).wrap_err("could not parse request") } } diff --git a/compute/src/utils/specs.rs b/compute/src/utils/specs.rs index 737da4c..64e4e3c 100644 --- a/compute/src/utils/specs.rs +++ b/compute/src/utils/specs.rs @@ -32,6 +32,12 @@ pub struct SpecCollector { gpus: Vec, } +impl Default for SpecCollector { + fn default() -> Self { + Self::new() + } +} + impl SpecCollector { pub fn new() -> Self { SpecCollector { diff --git a/workflows/src/bin/tps.rs b/workflows/src/bin/tps.rs index 4fb457a..1c0d2d5 100644 --- a/workflows/src/bin/tps.rs +++ b/workflows/src/bin/tps.rs @@ -1,9 +1,10 @@ -use dkn_workflows::OllamaConfig; +#[cfg(feature = "profiling")] +use dkn_workflows::{Model, OllamaConfig}; +#[cfg(feature = "profiling")] use ollama_workflows::ollama_rs::{ generation::completion::{request::GenerationRequest, GenerationResponse}, Ollama, }; -use ollama_workflows::Model; #[cfg(not(feature = "profiling"))] fn main() { @@ -136,6 +137,7 @@ async fn run_benchmark(ollama: Ollama, models: Vec) { } /// Computes the TPS. +#[cfg(feature = "profiling")] #[inline(always)] fn get_response_tps(res: &GenerationResponse) -> f64 { (res.eval_count.unwrap_or_default() as f64) / (res.eval_duration.unwrap_or(1) as f64) From 5b7fc051330649702a2d5427a9752a3351b95bcd Mon Sep 17 00:00:00 2001 From: erhant Date: Wed, 15 Jan 2025 14:25:36 +0300 Subject: [PATCH 13/13] added logs for req-res --- compute/src/node.rs | 15 +++++++++++++-- compute/src/responders/specs.rs | 2 +- p2p/src/behaviour.rs | 7 ++----- p2p/src/protocol.rs | 19 ++++++++++++------- p2p/tests/gossipsub_test.rs | 27 ++++++--------------------- p2p/tests/request_test.rs | 7 ++++--- 6 files changed, 38 insertions(+), 39 deletions(-) diff --git a/compute/src/node.rs b/compute/src/node.rs index f4d9f79..58dc23c 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -354,17 +354,28 @@ impl DriaComputeNode { // respond w.r.t data let response_data = if let Ok(req) = SpecResponder::try_parse_request(&data) { + log::info!( + "Got a spec request from peer {} with id {}", + peer_id, + req.request_id + ); + let response = SpecResponder::respond(req, self.spec_collector.collect().await); - serde_json::to_vec(&response).unwrap() + serde_json::to_vec(&response)? } else if let Ok(req) = WorkflowResponder::try_parse_request(&data) { log::info!("Received a task request with id: {}", req.task_id); return Err(eyre::eyre!( "REQUEST RESPONSE FOR TASKS ARE NOT IMPLEMENTED YET" )); } else { - return Err(eyre::eyre!("Received unknown request: {:?}", data)); + return Err(eyre::eyre!( + "Received unknown request from {}: {:?}", + peer_id, + data, + )); }; + log::info!("Responding to peer {}", peer_id); self.p2p.respond(response_data, channel).await } diff --git a/compute/src/responders/specs.rs b/compute/src/responders/specs.rs index 83a159e..179573b 100644 --- a/compute/src/responders/specs.rs +++ b/compute/src/responders/specs.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct Request { /// UUID of the specs request, prevents replay attacks. - request_id: String, + pub request_id: String, } #[derive(Serialize, Deserialize)] diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index 38181f9..49f8f0e 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -53,12 +53,9 @@ impl DriaBehaviour { fn create_request_response_behaviour( protocol_name: StreamProtocol, ) -> request_response::cbor::Behaviour, Vec> { - use request_response::{Behaviour, ProtocolSupport}; + use request_response::{Behaviour, Config, ProtocolSupport}; - Behaviour::new( - [(protocol_name, ProtocolSupport::Full)], - request_response::Config::default(), - ) + Behaviour::new([(protocol_name, ProtocolSupport::Full)], Config::default()) } /// Configures the connection limits. diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index e50163f..bd824b5 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -41,17 +41,22 @@ impl Default for DriaP2PProtocol { impl DriaP2PProtocol { /// Creates a new instance of the protocol with the given `name` and `version`. - pub fn new(name: &str, version: &str) -> Self { + pub fn new(name: impl ToString, version: impl ToString) -> Self { + let name = name.to_string(); + let version = version.to_string(); + let identity = format!("{}/{}", name, version); - let kademlia = format!("/{}/kad/{}", name, version); - let request_response = format!("/{}/rr/{}", name, version); + let kademlia = + StreamProtocol::try_from_owned(format!("/{}/kad/{}", name, version)).unwrap(); + let request_response = + StreamProtocol::try_from_owned(format!("/{}/rr/{}", name, version)).unwrap(); Self { - name: name.to_string(), - version: version.to_string(), + name, + version, identity, - kademlia: StreamProtocol::try_from_owned(kademlia).unwrap(), // guaranteed to unwrap - request_response: StreamProtocol::try_from_owned(request_response).unwrap(), // guaranteed to unwrap + kademlia, + request_response, } } diff --git a/p2p/tests/gossipsub_test.rs b/p2p/tests/gossipsub_test.rs index 1b62443..0d8f61c 100644 --- a/p2p/tests/gossipsub_test.rs +++ b/p2p/tests/gossipsub_test.rs @@ -35,19 +35,11 @@ async fn test_gossipsub() -> Result<()> { listen_addr, &nodes, DriaP2PProtocol::default(), - ) - .expect("could not create p2p client"); - - // spawn task + )?; let task_handle = tokio::spawn(async move { client.run().await }); - // subscribe to the given topic - commander - .subscribe(TOPIC) - .await - .expect("could not subscribe"); - // wait for a single gossipsub message on this topic + commander.subscribe(TOPIC).await?; log::info!("Waiting for messages..."); let message = msg_rx.recv().await; match message { @@ -58,20 +50,13 @@ async fn test_gossipsub() -> Result<()> { log::warn!("No message received for topic: {}", TOPIC); } } + commander.unsubscribe(TOPIC).await?; - // unsubscribe to the given topic - commander - .unsubscribe(TOPIC) - .await - .expect("could not unsubscribe"); - - // close command channel - commander.shutdown().await.expect("could not shutdown"); - - // close message channel + // close everything + commander.shutdown().await?; msg_rx.close(); - log::info!("Waiting for p2p task to finish..."); + // wait for handle to return task_handle.await?; log::info!("Done!"); diff --git a/p2p/tests/request_test.rs b/p2p/tests/request_test.rs index 7f5dadc..485fc39 100644 --- a/p2p/tests/request_test.rs +++ b/p2p/tests/request_test.rs @@ -1,5 +1,6 @@ use std::str::FromStr; +use dkn_p2p::DriaNetworkType::Community; use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol}; use eyre::Result; use libp2p::PeerId; @@ -25,9 +26,9 @@ async fn test_request_message() -> Result<()> { let listen_addr = "/ip4/0.0.0.0/tcp/4001".parse()?; // prepare nodes - let nodes = DriaNodes::new(dkn_p2p::DriaNetworkType::Community) - .with_bootstrap_nodes(["/ip4/44.206.245.139/tcp/4001/p2p/16Uiu2HAm4q3LZU2T9kgjKK4ysy6KZYKLq8KiXQyae4RHdF7uqSt4".parse()?]) - .with_relay_nodes(["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?]); + let nodes = DriaNodes::new(Community) + .with_bootstrap_nodes(Community.get_static_bootstrap_nodes()) + .with_relay_nodes(Community.get_static_relay_nodes()); // spawn P2P client in another task let (client, mut commander, mut msg_rx, mut req_rx) = DriaP2PClient::new(