
Commit

Merge pull request #155 from firstbatchxyz/erhant/task-monitor
feat: task monitor
erhant authored Dec 4, 2024
2 parents 014c5c8 + ecea7f3 commit f589c11
Showing 24 changed files with 474 additions and 122 deletions.
.env.example (3 changes: 1 addition & 2 deletions)

@@ -32,8 +32,7 @@ GEMINI_API_KEY=
 OPENROUTER_API_KEY=

 ## Ollama (if used, optional) ##
-# do not change this, it is used by Docker
-OLLAMA_HOST=http://host.docker.internal
+OLLAMA_HOST=http://localhost
 # you can change the port if you would like
 OLLAMA_PORT=11434
 # if "true", automatically pull models from Ollama
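Note: this is an inference from the removed comment, not something stated in the diff. With the default OLLAMA_HOST now pointing at localhost rather than the Docker-internal hostname, Docker-based setups would presumably need to set OLLAMA_HOST=http://host.docker.internal explicitly in their environment.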
Cargo.lock (28 changes: 24 additions & 4 deletions)

Generated lockfile; diff not rendered.

Cargo.toml (7 changes: 2 additions & 5 deletions)

@@ -1,13 +1,10 @@
 [workspace]
 resolver = "2"
-members = ["compute", "p2p", "workflows", "utils"]
-# compute node is the default member, until Oracle comes in
-# then, a Launcher will be the default member
-default-members = ["compute"]
+members = ["compute", "p2p", "workflows", "utils", "monitor"]

 [workspace.package]
 edition = "2021"
-version = "0.2.25"
+version = "0.2.26"
 license = "Apache-2.0"
 readme = "README.md"
Makefile (18 changes: 12 additions & 6 deletions)

@@ -7,31 +7,37 @@ endif
 ###############################################################################
 .PHONY: launch # | Run with INFO logs in release mode
 launch:
-	RUST_LOG=none,dkn_compute=info,dkn_workflows=info,dkn_p2p=info cargo run --release
+	cargo run --release --bin dkn-compute

 .PHONY: run # | Run with INFO logs
 run:
-	cargo run
+	cargo run --bin dkn-compute

+.PHONY: monitor # | Run monitor node with INFO logs
+monitor:
+	cargo run --bin dkn-monitor
+
 .PHONY: debug # | Run with DEBUG logs with INFO log-level workflows
 debug:
-	RUST_LOG=warn,dkn_compute=debug,dkn_workflows=debug,dkn_p2p=debug,ollama_workflows=info cargo run
+	RUST_LOG=warn,dkn_compute=debug,dkn_workflows=debug,dkn_p2p=debug,ollama_workflows=info \
+	cargo run --bin dkn-compute

 .PHONY: trace # | Run with TRACE logs
 trace:
-	RUST_LOG=warn,dkn_compute=trace,libp2p=debug cargo run
+	RUST_LOG=warn,dkn_compute=trace,libp2p=debug \
+	cargo run --bin dkn-compute

 .PHONY: build # | Build
 build:
 	cargo build --workspace

 .PHONY: profile-cpu # | Profile CPU usage with flamegraph
 profile-cpu:
-	DKN_EXIT_TIMEOUT=120 cargo flamegraph --root --profile=profiling
+	DKN_EXIT_TIMEOUT=120 cargo flamegraph --root --profile=profiling --bin dkn-compute

 .PHONY: profile-mem # | Profile memory usage with instruments
 profile-mem:
-	DKN_EXIT_TIMEOUT=120 cargo instruments --profile=profiling -t Allocations
+	DKN_EXIT_TIMEOUT=120 cargo instruments --profile=profiling -t Allocations --bin dkn-compute

 .PHONY: ollama-versions
 ollama-versions:
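With the new target in place, the monitor binary can be started with `make monitor`, which is shorthand for the `cargo run --bin dkn-monitor` invocation defined above.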
compute/Cargo.toml (2 changes: 1 addition & 1 deletion)

@@ -28,9 +28,9 @@ base64 = "0.22.0"
 hex = "0.4.3"
 hex-literal = "0.4.1"
 uuid = { version = "1.8.0", features = ["v4"] }
+rand.workspace = true

 # logging & errors
-rand.workspace = true
 env_logger.workspace = true
 log.workspace = true
 eyre.workspace = true
compute/src/config.rs (14 changes: 3 additions & 11 deletions)

@@ -33,7 +33,7 @@ pub(crate) const DEFAULT_P2P_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/4001";
 #[allow(clippy::new_without_default)]
 impl DriaComputeNodeConfig {
     /// Creates new config from environment variables.
-    pub fn new() -> Self {
+    pub fn new(workflows: DriaWorkflowsConfig) -> Self {
         let secret_key = match env::var("DKN_WALLET_SECRET_KEY") {
             Ok(secret_env) => {
                 let secret_dec = hex::decode(secret_env.trim_start_matches("0x"))
@@ -91,15 +91,7 @@ impl DriaComputeNodeConfig {
             hex::encode(admin_public_key.serialize_compressed())
         );

-        let workflows =
-            DriaWorkflowsConfig::new_from_csv(&env::var("DKN_MODELS").unwrap_or_default());
-        #[cfg(not(test))]
-        if workflows.models.is_empty() {
-            log::error!("No models were provided, make sure to restart with at least one model provided within DKN_MODELS.");
-            panic!("No models provided.");
-        }
-        log::info!("Configured models: {:?}", workflows.models);
-
         // parse listen address
         let p2p_listen_addr_str = env::var("DKN_P2P_LISTEN_ADDR")
             .map(|addr| addr.trim_matches('"').to_string())
             .unwrap_or(DEFAULT_P2P_LISTEN_ADDR.to_string());
@@ -152,7 +144,7 @@ impl Default for DriaComputeNodeConfig {
         );
         env::set_var("DKN_MODELS", "gpt-3.5-turbo");

-        Self::new()
+        Self::new(Default::default())
     }
 }
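For orientation, a minimal sketch of how the changed constructor is meant to be called, assembled from this hunk and the main.rs hunk further below (not an additional change in this commit):

use dkn_compute::DriaComputeNodeConfig;
use dkn_workflows::DriaWorkflowsConfig;

fn build_config() -> DriaComputeNodeConfig {
    // the caller now builds the workflows config from DKN_MODELS and passes it in,
    // instead of DriaComputeNodeConfig::new() reading DKN_MODELS itself
    let workflows =
        DriaWorkflowsConfig::new_from_csv(&std::env::var("DKN_MODELS").unwrap_or_default());
    DriaComputeNodeConfig::new(workflows)
}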
compute/src/handlers/mod.rs (4 changes: 2 additions & 2 deletions)

@@ -1,5 +1,5 @@
 mod pingpong;
-pub use pingpong::PingpongHandler;
+pub use pingpong::*;

 mod workflow;
-pub use workflow::WorkflowHandler;
+pub use workflow::*;
compute/src/handlers/pingpong.rs (8 changes: 4 additions & 4 deletions)

@@ -8,15 +8,15 @@ use serde::{Deserialize, Serialize};
 pub struct PingpongHandler;

 #[derive(Serialize, Deserialize, Debug, Clone)]
-struct PingpongPayload {
+pub struct PingpongPayload {
     /// UUID of the ping request, prevents replay attacks.
     uuid: String,
     /// Deadline for the ping request.
     deadline: u128,
 }

 #[derive(Serialize, Deserialize, Debug, Clone)]
-struct PingpongResponse {
+pub struct PingpongResponse {
     /// UUID as given in the ping payload.
     pub(crate) uuid: String,
     /// Models available in the node.
@@ -26,8 +26,8 @@ struct PingpongResponse {
 }

 impl PingpongHandler {
-    pub(crate) const LISTEN_TOPIC: &'static str = "ping";
-    pub(crate) const RESPONSE_TOPIC: &'static str = "pong";
+    pub const LISTEN_TOPIC: &'static str = "ping";
+    pub const RESPONSE_TOPIC: &'static str = "pong";

     /// Handles the ping message and responds with a pong message.
     ///
compute/src/handlers/workflow.rs (8 changes: 4 additions & 4 deletions)

@@ -6,15 +6,15 @@ use libsecp256k1::PublicKey;
 use serde::Deserialize;
 use tokio_util::either::Either;

-use crate::payloads::{TaskErrorPayload, TaskRequestPayload, TaskResponsePayload, TaskStats};
+use crate::payloads::*;
 use crate::utils::DriaMessage;
 use crate::workers::workflow::*;
 use crate::DriaComputeNode;

 pub struct WorkflowHandler;

 #[derive(Debug, Deserialize)]
-struct WorkflowPayload {
+pub struct WorkflowPayload {
     /// [Workflow](https://github.com/andthattoo/ollama-workflows/blob/main/src/program/workflow.rs) object to be parsed.
     pub(crate) workflow: Workflow,
     /// A list of model (that can be parsed into `Model`) or model provider names.
@@ -27,8 +27,8 @@ struct WorkflowPayload {
 }

 impl WorkflowHandler {
-    pub(crate) const LISTEN_TOPIC: &'static str = "task";
-    pub(crate) const RESPONSE_TOPIC: &'static str = "results";
+    pub const LISTEN_TOPIC: &'static str = "task";
+    pub const RESPONSE_TOPIC: &'static str = "results";

     pub(crate) async fn handle_compute(
         node: &mut DriaComputeNode,
compute/src/lib.rs (14 changes: 8 additions & 6 deletions)

@@ -1,13 +1,15 @@
-pub(crate) mod config;
-pub(crate) mod handlers;
-pub(crate) mod node;
-pub(crate) mod payloads;
-pub(crate) mod utils;
-pub(crate) mod workers;
+pub mod config;
+pub mod handlers;
+pub mod node;
+pub mod payloads;
+pub mod utils;
+pub mod workers;

 /// Crate version of the compute node.
 /// This value is attached within the published messages.
 pub const DRIA_COMPUTE_NODE_VERSION: &str = env!("CARGO_PKG_VERSION");

+pub use utils::refresh_dria_nodes;
+
 pub use config::DriaComputeNodeConfig;
 pub use node::DriaComputeNode;
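A small sketch of what the widened visibility allows from outside the crate, for example from the new monitor binary (an assumption about intended use, not part of this diff):

// another crate in the workspace can now import the handlers directly
use dkn_compute::handlers::{PingpongHandler, WorkflowHandler};

// the topic constants made `pub` above are reachable here as well
const TOPICS: [&str; 4] = [
    PingpongHandler::LISTEN_TOPIC,   // "ping"
    PingpongHandler::RESPONSE_TOPIC, // "pong"
    WorkflowHandler::LISTEN_TOPIC,   // "task"
    WorkflowHandler::RESPONSE_TOPIC, // "results"
];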
compute/src/main.rs (19 changes: 15 additions & 4 deletions)

@@ -1,4 +1,5 @@
 use dkn_compute::*;
+use dkn_workflows::DriaWorkflowsConfig;
 use eyre::Result;
 use std::env;
 use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -15,9 +16,6 @@ async fn main() -> Result<()> {
         .filter_module("dkn_workflows", log::LevelFilter::Info)
         .parse_default_env() // reads RUST_LOG variable
         .init();
-    if let Err(e) = dotenv_result {
-        log::warn!("could not load .env file: {}", e);
-    }

     log::info!(
         r#"
@@ -31,6 +29,12 @@ async fn main() -> Result<()> {
     "#
     );

+    // log about env usage
+    match dotenv_result {
+        Ok(path) => log::info!("Loaded .env file at: {}", path.display()),
+        Err(e) => log::warn!("Could not load .env file: {}", e),
+    }
+
     // task tracker for multiple threads
     let task_tracker = TaskTracker::new();
     let cancellation = CancellationToken::new();
@@ -61,7 +65,14 @@ async fn main() -> Result<()> {
     });

     // create configurations & check required services & address in use
-    let mut config = DriaComputeNodeConfig::new();
+    let workflows_config =
+        DriaWorkflowsConfig::new_from_csv(&env::var("DKN_MODELS").unwrap_or_default());
+    if workflows_config.models.is_empty() {
+        return Err(eyre::eyre!("No models were provided, make sure to restart with at least one model provided within DKN_MODELS."));
+    }
+
+    log::info!("Configured models: {:?}", workflows_config.models);
+    let mut config = DriaComputeNodeConfig::new(workflows_config);
     config.assert_address_not_in_use()?;
     // check services & models, will exit if there is an error
     // since service check can take time, we allow early-exit here as well
(The remaining changed files are not rendered in this view.)
