Skip to content

Commit

Permalink
better peer addr, check listen addr in use
Browse files Browse the repository at this point in the history
  • Loading branch information
erhant committed Oct 3, 2024
1 parent 6997c8f commit 9a7c066
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 28 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dkn-compute"
version = "0.2.8"
version = "0.2.9"
edition = "2021"
license = "Apache-2.0"
readme = "README.md"
Expand Down Expand Up @@ -70,6 +70,7 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9
libp2p-identity = { version = "0.2.9", features = ["secp256k1"] }
tracing = { version = "0.1.40" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
port_check = "0.2.1"

# Vendor OpenSSL so that its easier to build cross-platform packages
[dependencies.openssl]
Expand Down
25 changes: 20 additions & 5 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ mod models;
mod ollama;
mod openai;

use crate::utils::crypto::to_address;
use crate::utils::{address_in_use, crypto::to_address};
use eyre::{eyre, Result};
use libp2p::Multiaddr;
use libsecp256k1::{PublicKey, SecretKey};
use models::ModelConfig;
use ollama::OllamaConfig;
use ollama_workflows::ModelProvider;
use openai::OpenAIConfig;

use std::{env, time::Duration};
use std::{env, str::FromStr, time::Duration};

/// Timeout duration for checking model performance during a generation.
const CHECK_TIMEOUT_DURATION: Duration = Duration::from_secs(80);
Expand All @@ -28,8 +29,8 @@ pub struct DriaComputeNodeConfig {
pub address: [u8; 20],
/// Admin public key, used for message authenticity.
pub admin_public_key: PublicKey,
/// P2P listen address as a string, e.g. `/ip4/0.0.0.0/tcp/4001`.
pub p2p_listen_addr: String,
/// P2P listen address, e.g. `/ip4/0.0.0.0/tcp/4001`.
pub p2p_listen_addr: Multiaddr,
/// Available LLM models & providers for the node.
pub model_config: ModelConfig,
/// Even if Ollama is not used, we store the host & port here.
Expand Down Expand Up @@ -104,9 +105,11 @@ impl DriaComputeNodeConfig {
}
log::info!("Models: {:?}", model_config.models);

let p2p_listen_addr = env::var("DKN_P2P_LISTEN_ADDR")
let p2p_listen_addr_str = env::var("DKN_P2P_LISTEN_ADDR")
.map(|addr| addr.trim_matches('"').to_string())
.unwrap_or(DEFAULT_P2P_LISTEN_ADDR.to_string());
let p2p_listen_addr = Multiaddr::from_str(&p2p_listen_addr_str)
.expect("Could not parse the given P2P listen address.");

Self {
admin_public_key,
Expand Down Expand Up @@ -178,6 +181,18 @@ impl DriaComputeNodeConfig {
Ok(())
}
}

// ensure that listen address is free
pub fn check_address_in_use(&self) -> Result<()> {
if address_in_use(&self.p2p_listen_addr) {
return Err(eyre!(
"Listen address {} is already in use.",
self.p2p_listen_addr
));
}

Ok(())
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async fn main() -> Result<()> {

// create configurations & check required services
let config = DriaComputeNodeConfig::new();
config.check_address_in_use()?;
let service_check_token = token.clone();
let mut config_clone = config.clone();
let service_check_handle = tokio::spawn(async move {
Expand Down
7 changes: 3 additions & 4 deletions src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use eyre::{eyre, Result};
use libp2p::{gossipsub, Multiaddr};
use std::{str::FromStr, time::Duration};
use libp2p::gossipsub;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

use crate::{
Expand Down Expand Up @@ -40,7 +40,6 @@ impl DriaComputeNode {
cancellation: CancellationToken,
) -> Result<Self> {
let keypair = secret_to_keypair(&config.secret_key);
let listen_addr = Multiaddr::from_str(config.p2p_listen_addr.as_str())?;

// get available nodes (bootstrap, relay, rpc) for p2p
let available_nodes = AvailableNodes::default()
Expand All @@ -53,7 +52,7 @@ impl DriaComputeNode {
)
.sort_dedup();

let p2p = P2PClient::new(keypair, listen_addr, &available_nodes)?;
let p2p = P2PClient::new(keypair, config.p2p_listen_addr.clone(), &available_nodes)?;

Ok(DriaComputeNode {
p2p,
Expand Down
42 changes: 26 additions & 16 deletions src/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,7 @@ impl P2PClient {
///
/// - For Kademlia, we check the kademlia protocol and then add the address to the Kademlia routing table.
fn handle_identify_event(&mut self, peer_id: PeerId, info: identify::Info) {
// we only care about the observed address, although there may be other addresses at `info.listen_addrs`
// TODO: this may be wrong
let addr = info.observed_addr;

// check protocol string
// check identify protocol string
if info.protocol_version != P2P_PROTOCOL_STRING {
log::warn!(
"Identify: Peer {} has different Identify protocol: (them {}, you {})",
Expand All @@ -259,17 +255,31 @@ impl P2PClient {
{
// if it matches our protocol, add it to the Kademlia routing table
if *kad_protocol == P2P_KADEMLIA_PROTOCOL {
log::info!(
"Identify: {} peer {} identified at {}",
kad_protocol,
peer_id,
addr
);

self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
// filter listen addresses
let addrs = info.listen_addrs.into_iter().filter(|listen_addr| {
if let Some(Protocol::Ip4(ipv4_addr)) = listen_addr.iter().next() {
// ignore private & localhost addresses
!(ipv4_addr.is_private() || ipv4_addr.is_loopback())
} else {
// ignore non ipv4 addresses
false
}
});

// add them to kademlia
for addr in addrs {
log::info!(
"Identify: {} peer {} identified at {}",
kad_protocol,
peer_id,
addr
);

self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
}
} else {
log::warn!(
"Identify: Peer {} has different Kademlia version: (them {}, you {})",
Expand Down
35 changes: 34 additions & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ pub use message::DKNMessage;
mod available_nodes;
pub use available_nodes::AvailableNodes;

use std::time::{Duration, SystemTime};
use libp2p::{multiaddr::Protocol, Multiaddr};
use port_check::is_port_reachable;
use std::{
net::{Ipv4Addr, SocketAddrV4},
time::{Duration, SystemTime},
};

/// Returns the current time in nanoseconds since the Unix epoch.
///
Expand All @@ -23,6 +28,34 @@ pub fn get_current_time_nanos() -> u128 {
.as_nanos()
}

/// Checks if a given address is already in use locally.
/// This is mostly used to see if the P2P address is already in use.
///
/// Simply tries to connect with TCP to the given address.
#[inline]
pub fn address_in_use(addr: &Multiaddr) -> bool {
addr.iter()
// find the port within our multiaddr
.find_map(|p| {
if let Protocol::Tcp(port) = p {
Some(port)
} else {
None
}

// }
})
// check if its reachable or not
.map(|port| is_port_reachable(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
.unwrap_or_else(|| {
log::error!(
"Could not find any TCP port in the given address: {:?}",
addr
);
false
})
}

/// Utility to parse comma-separated string values, mostly read from the environment.
/// - Trims `"` from both ends at the start
/// - For each item, trims whitespace from both ends
Expand Down

0 comments on commit 9a7c066

Please sign in to comment.