Skip to content

Commit

Permalink
Counters service with counters from ya-gsb-proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
pwalski committed Mar 29, 2024
1 parent 2ad3bac commit 2d53c17
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 85 deletions.
105 changes: 55 additions & 50 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ya-service-bus = "0.7.1"
ya-client-model = "0.6.0"
ya-agreement-utils = "0.5"
ya-transfer = { git = "https://github.com/golemfactory/yagna.git", rev = "b3f1e8238f26b224729c578eae6c29098d8800d7" }
ya-counters = { path = "../yagna/exe-unit/components/counters" }

actix = "0.13"
actix-rt = "2"
Expand All @@ -37,7 +38,8 @@ flexi_logger = { version = "0.28", features = ["colors"] }
regex = "1"
reqwest = { version = "0.11", features = ["blocking", "json"] }
async-stream = "0.3.5"
ya-gsb-http-proxy = { git = "https://github.com/golemfactory/yagna.git", rev = "b3f1e8238f26b224729c578eae6c29098d8800d7" }
# ya-gsb-http-proxy = { git = "https://github.com/golemfactory/yagna.git", rev = "b3f1e8238f26b224729c578eae6c29098d8800d7" }
ya-gsb-http-proxy = { path = "../yagna/exe-unit/components/gsb-http-proxy" }
http = "1.1.0"
bytes = "1.5.0"
humantime = "2.1"
Expand Down
4 changes: 4 additions & 0 deletions src/counter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
mod combined;
mod duration;
mod requests;
Expand All @@ -17,6 +18,8 @@ use self::{
type SharedCounters = Arc<RwLock<Vec<SupportedCounter>>>;
#[derive(Clone, Debug)]
pub struct Counters {
counters: SharedCounters,
Expand Down Expand Up @@ -339,3 +342,4 @@ mod tests {
(x * y).floor() / y
}
}
*/
2 changes: 2 additions & 0 deletions src/counter/combined.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use ya_gsb_http_proxy::monitor::{RequestsMonitor, ResponseMonitor};
Expand Down Expand Up @@ -92,3 +93,4 @@ impl Drop for ResponseMonitors {
self.on_response();
}
}
*/
75 changes: 48 additions & 27 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use actix::prelude::*;
use anyhow::Context;
use chrono::Utc;
use clap::Parser;
use counter::Counters;
use futures::prelude::*;
use process::Runtime;
use tokio::select;
Expand All @@ -20,6 +19,10 @@ use ya_client_model::activity::ExeScriptCommand;
use ya_client_model::activity::{ActivityUsage, CommandResult, ExeScriptCommandResult};
use ya_core_model::activity;
use ya_core_model::activity::RpcMessageError;
use ya_counters::counters::TimeMetric;
use ya_counters::error::MetricError;
use ya_counters::message::GetMetrics;
use ya_counters::service::{MetricsService, MetricsServiceBuilder};
use ya_gsb_http_proxy::gsb_to_http::GsbToHttpProxy;
use ya_gsb_http_proxy::message::GsbHttpCallMessage;
use ya_service_bus::typed as gsb;
Expand All @@ -33,7 +36,6 @@ use crate::signal::SignalMonitor;

mod agreement;
mod cli;
mod counter;
mod logger;
mod offer_template;
mod process;
Expand All @@ -58,28 +60,45 @@ async fn activity_loop<T: process::Runtime + Clone + Unpin + 'static>(
report_url: &str,
activity_id: &str,
process: ProcessController<T>,
counters: Counters,
metrics: Addr<MetricsService>,
) -> anyhow::Result<()> {
let report_service = gsb::service(report_url);

while let Some(()) = process.report() {
let current_usage = counters.current_usage().await;
let timestamp = Utc::now().timestamp();
match report_service
.call(activity::local::SetUsage {
activity_id: activity_id.to_string(),
usage: ActivityUsage {
current_usage,
timestamp,
// make it a function
match metrics.send(GetMetrics).await {
Ok(resp) => match resp {
Ok(current_usage) => {
let timestamp = Utc::now().timestamp();
match report_service
.call(activity::local::SetUsage {
activity_id: activity_id.to_string(),
usage: ActivityUsage {
current_usage: Some(current_usage),
timestamp,
},
timeout: None,
})
.await
{
Ok(Ok(())) => log::debug!("Successfully sent activity usage message"),
Ok(Err(rpc_message_error)) => {
log::error!("rpcMessageError : {:?}", rpc_message_error)
}
Err(err) => log::error!("other error : {:?}", err),
}
}
Err(err) => match err {
MetricError::UsageLimitExceeded(info) => {
log::warn!("Usage limit exceeded: {}", info);
// TODO State::Terminated
}
error => log::warn!("Unable to retrieve metrics: {:?}", error),
},
timeout: None,
})
.await
{
Ok(Ok(())) => log::debug!("Successfully sent activity usage message"),
Ok(Err(rpc_message_error)) => log::error!("rpcMessageError : {:?}", rpc_message_error),
Err(err) => log::error!("other error : {:?}", err),
},
Err(e) => log::warn!("Unable to report activity usage: {:?}", e),
}
//

select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
Expand Down Expand Up @@ -202,7 +221,15 @@ async fn run<RUNTIME: process::Runtime + Clone + Unpin + 'static>(

let agreement = AgreementDesc::load(agreement_path)?;

let counters = Counters::start(&agreement.counters)?;
let mut gsb_proxy = GsbToHttpProxy::new("http://localhost:7861/".into());

let mut metrics = MetricsServiceBuilder::new(agreement.counters.clone(), Some(10000));
metrics
.with_metric(TimeMetric::ID, Box::new(TimeMetric::default()))
.with_metric("ai-runtime.requests", Box::new(gsb_proxy.requests_counter()))
// .with_metric("golem.usage.gpu-sec", gsb_proxy.requests_duration_counter());
;
let metrics = metrics.build().start();

let ctx = ExeUnitContext {
activity_id: activity_id.clone(),
Expand All @@ -223,7 +250,7 @@ async fn run<RUNTIME: process::Runtime + Clone + Unpin + 'static>(
report_url,
activity_id,
ctx.process_controller.clone(),
counters.clone(),
metrics.clone(),
);

#[cfg(target_os = "windows")]
Expand Down Expand Up @@ -402,14 +429,8 @@ async fn run<RUNTIME: process::Runtime + Clone + Unpin + 'static>(
}
});

let counters = counters.clone();
gsb::bind_stream(&exe_unit_url, move |message: GsbHttpCallMessage| {
let requests_monitor = counters.requests_monitor();
let mut proxy = GsbToHttpProxy {
base_url: "http://localhost:7861/".to_string(),
requests_monitor,
};
let stream = proxy.pass(message);
let stream = gsb_proxy.pass(message);
Box::pin(stream.map(Ok))
});
};
Expand Down
Loading

0 comments on commit 2d53c17

Please sign in to comment.