diff --git a/src/main.rs b/src/main.rs index dff1acc..ca3ac28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,16 +22,11 @@ use tokio::net::TcpListener; #[tokio::main] async fn main() -> Result<(), Box> { - let config = config::NodeConfig::new().unwrap(); + let config = config::NodeConfig::new()?; - //println!("config: {:?}", config); - - let node_address = SocketAddr::from(( - IpAddr::from_str(config.address.as_str()).unwrap(), - config.port, - )); + let node_address = SocketAddr::from((IpAddr::from_str(config.address.as_str())?, config.port)); let metrics_address = SocketAddr::from(( - IpAddr::from_str(config.metrics.address.as_str()).unwrap(), + IpAddr::from_str(config.metrics.address.as_str())?, config.metrics.port, )); @@ -39,7 +34,7 @@ async fn main() -> Result<(), Box> { let metrics_listener = TcpListener::bind(metrics_address).await?; let metrics = Metrics::new(); - let node_service = NodeService::new(config.domains_map()); + let node_service = NodeService::new(config.domains_map(), metrics.clone()); let node_service_clone = node_service.clone(); tokio::task::spawn(async move { node_service_clone.update_block_numbers().await; @@ -50,8 +45,6 @@ async fn main() -> Result<(), Box> { let (stream, _) = node_listener.accept().await.unwrap(); let io = TokioIo::new(stream); - metrics.add_total_requests(); - let service = node_service.clone().get_proxy_request().await.clone(); tokio::task::spawn(async move { @@ -67,7 +60,9 @@ async fn main() -> Result<(), Box> { let (stream, _) = metrics_listener.accept().await.unwrap(); let io = TokioIo::new(stream); - let metrics_service = MetricsService {}; + let metrics_service = MetricsService { + metrics: metrics.clone(), + }; tokio::task::spawn(async move { if let Err(err) = http1::Builder::new() diff --git a/src/metrics.rs b/src/metrics.rs index 960711d..b9762d4 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,14 +1,18 @@ +use std::sync::Arc; + +use prometheus_client::encoding::text::encode; use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Metrics { - registry: Registry, + registry: Arc, proxy_request_counter: Counter, - proxy_requests: Family, + proxy_requests: Family, + proxy_requests_response: Family, node_block_latest: Family, } @@ -27,7 +31,8 @@ pub(crate) struct ProxyStateLabels { impl Metrics { pub fn new() -> Self { let proxy_request_counter: Counter = Default::default(); - let proxy_requests = Family::::default(); + let proxy_requests = Family::::default(); + let proxy_requests_response = Family::::default(); let node_block_latest = Family::::default(); let mut registry = ::default(); @@ -41,6 +46,11 @@ impl Metrics { "Proxy requests by host", proxy_requests.clone(), ); + registry.register( + "proxy_requests_response", + "Proxy requests served by host", + proxy_requests_response.clone(), + ); registry.register( "node_block_latest", "Node block latest", @@ -48,9 +58,10 @@ impl Metrics { ); Self { - registry, + registry: Arc::new(registry), proxy_request_counter, proxy_requests, + proxy_requests_response, node_block_latest, } } @@ -59,8 +70,16 @@ impl Metrics { self.proxy_request_counter.inc(); } - pub fn add_proxy_request(&self, host: &str, status: u64, latency: u64) { + pub fn add_proxy_request(&self, host: &str) { self.proxy_requests + .get_or_create(&HostStateLabels { + host: host.to_string(), + }) + .inc(); + } + + pub fn add_proxy_request_response(&self, host: &str, status: u64, latency: u64) { + self.proxy_requests_response .get_or_create(&ProxyStateLabels { host: host.to_string(), status, @@ -77,6 +96,12 @@ impl Metrics { .set(value as i64); } + pub fn get_metrics(&self) -> String { + let mut buffer = String::new(); + encode(&mut buffer, &self.registry).unwrap(); + buffer + } + // fn metrics_logger(&self) -> MetricsLogger { // MetricsLogger { // request_counter: &self.request_counter, diff --git a/src/metrics_service.rs b/src/metrics_service.rs index e660313..948acea 100644 --- a/src/metrics_service.rs +++ b/src/metrics_service.rs @@ -5,8 +5,12 @@ use futures::Future; use http_body_util::Full; use hyper::{body::Incoming as IncomingBody, service::Service, Request, Response}; +use crate::metrics::Metrics; + #[derive(Debug, Clone)] -pub struct MetricsService {} +pub struct MetricsService { + pub metrics: Metrics, +} impl Service> for MetricsService { type Response = Response>; @@ -14,11 +18,9 @@ impl Service> for MetricsService { type Future = Pin> + Send>>; fn call(&self, _req: Request) -> Self::Future { - fn mk_response(s: String) -> Result>, hyper::Error> { - Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap()) - } - - let res = mk_response("oh no! not found".into()); + let res = Ok(Response::builder() + .body(Full::new(Bytes::from(self.metrics.get_metrics()))) + .unwrap()); Box::pin(async { res }) } diff --git a/src/node_service.rs b/src/node_service.rs index 4b993a3..2e86c70 100644 --- a/src/node_service.rs +++ b/src/node_service.rs @@ -5,6 +5,7 @@ use tokio::sync::Mutex; use tokio::time::{sleep, Duration}; use crate::config::Url; +use crate::metrics::Metrics; use crate::{ chain_service::ChainService, chain_type::ChainType, @@ -16,6 +17,7 @@ use crate::{ pub struct NodeService { pub domains: HashMap, pub nodes: Arc>>, + pub metrics: Metrics, } #[derive(Debug)] @@ -33,7 +35,7 @@ pub struct NodeResult { } impl NodeService { - pub fn new(domains: HashMap) -> Self { + pub fn new(domains: HashMap, metrics: Metrics) -> Self { // let mut hash_map: HashMap = HashMap::new(); @@ -45,12 +47,14 @@ impl NodeService { Self { domains, nodes: Arc::new(Mutex::new(hash_map)), + metrics, } } pub async fn get_proxy_request(&self) -> ProxyRequestService { ProxyRequestService { domains: self.get_node_domains().await, + metrics: self.metrics.clone(), } } diff --git a/src/proxy_request_service.rs b/src/proxy_request_service.rs index a63e661..4bbb7f5 100644 --- a/src/proxy_request_service.rs +++ b/src/proxy_request_service.rs @@ -15,11 +15,13 @@ use std::str::FromStr; use crate::config::Url; use crate::logger::{log_incoming_request, log_proxy_response}; +use crate::metrics::Metrics; use crate::request_url::RequestUrl; #[derive(Debug, Clone)] pub struct ProxyRequestService { pub domains: HashMap, + pub metrics: Metrics, } #[derive(Debug, Clone)] @@ -40,6 +42,8 @@ impl Service> for ProxyRequestService { .to_str() .unwrap_or_default(); + self.metrics.add_total_requests(); + log_incoming_request(&req); match self.domains.get(host) { @@ -51,19 +55,23 @@ impl Service> for ProxyRequestService { req.uri(), ); + self.metrics.add_proxy_request(host); + async move { proxy_pass(req, url).await }.boxed() } - _ => async move { unsupported_chain(req).await }.boxed(), //async move { handle_request(req).await }.boxed(), //Ok(Response::new(Full::from("unsupported domain")))}, + _ => async move { Self::unsupported_chain(req).await }.boxed(), //async move { handle_request(req).await }.boxed(), //Ok(Response::new(Full::from("unsupported domain")))}, } } } -async fn unsupported_chain( - _req: Request, -) -> Result>, Box> { - Ok(Response::builder() - .body(Full::new(Bytes::from("unsupported domain"))) - .unwrap()) +impl ProxyRequestService { + async fn unsupported_chain( + _req: Request, + ) -> Result>, Box> { + Ok(Response::builder() + .body(Full::new(Bytes::from("unsupported domain"))) + .unwrap()) + } } async fn proxy_pass(