Skip to content

Commit

Permalink
Merge pull request #148 from firstbatchxyz/erhant/node-refresh-rfks-p…
Browse files Browse the repository at this point in the history
…eer-fixes

feat: multi-network & peer disconnections
  • Loading branch information
erhant authored Nov 26, 2024
2 parents ee8790f + 5a6feb5 commit 702eb28
Show file tree
Hide file tree
Showing 26 changed files with 1,316 additions and 752 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ default-members = ["compute"]

[workspace.package]
edition = "2021"
version = "0.2.22"
version = "0.2.23"
license = "Apache-2.0"
readme = "README.md"

Expand Down
35 changes: 35 additions & 0 deletions compute/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,33 @@ use libsecp256k1::{PublicKey, SecretKey};

use std::{env, str::FromStr};

/// Network type of a compute node.
///
/// The node configuration reads this from the `DKN_NETWORK` environment variable
/// and falls back to [`DriaNetworkType::Community`] when it is not set.
///
/// `PartialEq`/`Eq` are derived so that callers can compare network types directly
/// (e.g. `network_type == DriaNetworkType::Pro`) instead of pattern-matching.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriaNetworkType {
    /// The community network; this is the default.
    #[default]
    Community,
    /// The pro network.
    Pro,
}

/// Parses a network type from its textual name, e.g. the value of `DKN_NETWORK`.
///
/// Matching ignores surrounding whitespace and ASCII case, so `"pro"`, `"Pro"` and
/// `" PRO "` all yield [`DriaNetworkType::Pro`]. Previously only the exact lowercase
/// spelling was recognized, which made a value such as `Pro` silently select the
/// default network.
///
/// Any unrecognized name falls back to the default network instead of failing.
impl From<&str> for DriaNetworkType {
    fn from(s: &str) -> Self {
        let name = s.trim();
        if name.eq_ignore_ascii_case("pro") {
            DriaNetworkType::Pro
        } else if name.eq_ignore_ascii_case("community") {
            DriaNetworkType::Community
        } else {
            // unknown names are not treated as an error; use the default network
            Self::default()
        }
    }
}

impl DriaNetworkType {
pub fn protocol_name(&self) -> &str {
match self {
DriaNetworkType::Community => "dria",
DriaNetworkType::Pro => "dria-sdk",
}
}
}

#[derive(Debug, Clone)]
pub struct DriaComputeNodeConfig {
/// Wallet secret/private key.
Expand All @@ -23,6 +50,8 @@ pub struct DriaComputeNodeConfig {
pub p2p_listen_addr: Multiaddr,
/// Workflow configurations, e.g. models and providers.
pub workflows: DriaWorkflowsConfig,
/// Network type of the node.
pub network_type: DriaNetworkType,
}

/// The default P2P network listen address.
Expand Down Expand Up @@ -104,13 +133,19 @@ impl DriaComputeNodeConfig {
let p2p_listen_addr = Multiaddr::from_str(&p2p_listen_addr_str)
.expect("Could not parse the given P2P listen address.");

// parse network type
let network_type = env::var("DKN_NETWORK")
.map(|s| DriaNetworkType::from(s.as_str()))
.unwrap_or_default();

Self {
admin_public_key,
secret_key,
public_key,
address,
workflows,
p2p_listen_addr,
network_type,
}
}

Expand Down
2 changes: 1 addition & 1 deletion compute/src/handlers/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl ComputeHandler for PingpongHandler {
Self::RESPONSE_TOPIC,
&node.config.secret_key,
);
node.publish(message)?;
node.publish(message).await?;

Ok(MessageAcceptance::Accept)
}
Expand Down
5 changes: 2 additions & 3 deletions compute/src/handlers/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ impl ComputeHandler for WorkflowHandler {
};

// try publishing the result

if let Err(publish_err) = node.publish(message) {
if let Err(publish_err) = node.publish(message).await {
let err_msg = format!("Could not publish result: {:?}", publish_err);
log::error!("{}", err_msg);

Expand All @@ -175,7 +174,7 @@ impl ComputeHandler for WorkflowHandler {
Self::RESPONSE_TOPIC,
&node.config.secret_key,
);
node.publish(message)?;
node.publish(message).await?;
}

Ok(acceptance)
Expand Down
2 changes: 1 addition & 1 deletion compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ pub(crate) mod utils;
/// This value is attached within the published messages.
pub const DRIA_COMPUTE_NODE_VERSION: &str = env!("CARGO_PKG_VERSION");

pub use config::DriaComputeNodeConfig;
pub use config::{DriaComputeNodeConfig, DriaNetworkType};
pub use node::DriaComputeNode;
69 changes: 36 additions & 33 deletions compute/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dkn_compute::*;
use eyre::{Context, Result};
use eyre::Result;
use std::env;
use tokio_util::sync::CancellationToken;

Expand All @@ -25,7 +25,7 @@ async fn main() -> Result<()> {
██║ ██║██╔══██╗██║██╔══██║ https://dria.co
██████╔╝██║ ██║██║██║ ██║
╚═════╝ ╚═╝ ╚═╝╚═╝╚═╝ ╚═╝
"#,
"#
);

let token = CancellationToken::new();
Expand All @@ -52,10 +52,6 @@ async fn main() -> Result<()> {
let service_check_token = token.clone();
let config = tokio::spawn(async move {
tokio::select! {
_ = service_check_token.cancelled() => {
log::info!("Service check cancelled.");
config
}
result = config.workflows.check_services() => {
if let Err(err) = result {
log::error!("Error checking services: {:?}", err);
Expand All @@ -64,38 +60,44 @@ async fn main() -> Result<()> {
log::warn!("Using models: {:#?}", config.workflows.models);
config
}
_ = service_check_token.cancelled() => {
log::info!("Service check cancelled.");
config
}
}
})
.await
.wrap_err("error during service checks")?;

if !token.is_cancelled() {
// launch the node in a separate thread
let node_token = token.clone();
let node_handle = tokio::spawn(async move {
match DriaComputeNode::new(config, node_token).await {
Ok(mut node) => {
if let Err(err) = node.launch().await {
log::error!("Node launch error: {}", err);
panic!("Node failed.")
};
}
Err(err) => {
log::error!("Node setup error: {}", err);
panic!("Could not setup node.")
}
}
});
.await?;

// wait for tasks to complete
if let Err(err) = node_handle.await {
log::error!("Node handle error: {}", err);
panic!("Could not exit Node thread handle.");
};
} else {
log::warn!("Not launching node due to early exit.");
// check early exit due to failed service check
if token.is_cancelled() {
log::warn!("Not launching node due to early exit, bye!");
return Ok(());
}

let node_token = token.clone();
let (mut node, p2p) = DriaComputeNode::new(config, node_token).await?;

// launch the p2p in a separate thread
log::info!("Spawning peer-to-peer client thread.");
let p2p_handle = tokio::spawn(async move { p2p.run().await });

// launch the node in a separate thread
log::info!("Spawning compute node thread.");
let node_handle = tokio::spawn(async move {
if let Err(err) = node.launch().await {
log::error!("Node launch error: {}", err);
panic!("Node failed.")
};
});

// wait for tasks to complete
if let Err(err) = node_handle.await {
log::error!("Node handle error: {}", err);
};
if let Err(err) = p2p_handle.await {
log::error!("P2P handle error: {}", err);
};

log::info!("Bye!");
Ok(())
}
Expand Down Expand Up @@ -158,6 +160,7 @@ async fn wait_for_termination(cancellation: CancellationToken) -> Result<()> {
Ok(())
}

// #[deprecated]
/// Very CRUDE fix due to launcher log level bug
///
/// TODO: remove me later when the launcher is fixed
Expand Down
Loading

0 comments on commit 702eb28

Please sign in to comment.