Skip to content

Commit

Permalink
Add metrics server
Browse files Browse the repository at this point in the history
  • Loading branch information
gemcoder21 committed Jun 26, 2024
1 parent 0ea704d commit 72876b4
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 32 deletions.
19 changes: 7 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,19 @@ use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let config = config::NodeConfig::new().unwrap();
let config = config::NodeConfig::new()?;

//println!("config: {:?}", config);

let node_address = SocketAddr::from((
IpAddr::from_str(config.address.as_str()).unwrap(),
config.port,
));
let node_address = SocketAddr::from((IpAddr::from_str(config.address.as_str())?, config.port));
let metrics_address = SocketAddr::from((
IpAddr::from_str(config.metrics.address.as_str()).unwrap(),
IpAddr::from_str(config.metrics.address.as_str())?,
config.metrics.port,
));

let node_listener = TcpListener::bind(node_address).await?;
let metrics_listener = TcpListener::bind(metrics_address).await?;

let metrics = Metrics::new();
let node_service = NodeService::new(config.domains_map());
let node_service = NodeService::new(config.domains_map(), metrics.clone());
let node_service_clone = node_service.clone();
tokio::task::spawn(async move {
node_service_clone.update_block_numbers().await;
Expand All @@ -50,8 +45,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (stream, _) = node_listener.accept().await.unwrap();
let io = TokioIo::new(stream);

metrics.add_total_requests();

let service = node_service.clone().get_proxy_request().await.clone();

tokio::task::spawn(async move {
Expand All @@ -67,7 +60,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (stream, _) = metrics_listener.accept().await.unwrap();
let io = TokioIo::new(stream);

let metrics_service = MetricsService {};
let metrics_service = MetricsService {
metrics: metrics.clone(),
};

tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
Expand Down
37 changes: 31 additions & 6 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::sync::Arc;

use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Metrics {
registry: Registry,
registry: Arc<Registry>,
proxy_request_counter: Counter<u64>,
proxy_requests: Family<ProxyStateLabels, Gauge>,
proxy_requests: Family<HostStateLabels, Gauge>,
proxy_requests_response: Family<ProxyStateLabels, Gauge>,
node_block_latest: Family<HostStateLabels, Gauge>,
}

Expand All @@ -27,7 +31,8 @@ pub(crate) struct ProxyStateLabels {
impl Metrics {
pub fn new() -> Self {
let proxy_request_counter: Counter<u64> = Default::default();
let proxy_requests = Family::<ProxyStateLabels, Gauge>::default();
let proxy_requests = Family::<HostStateLabels, Gauge>::default();
let proxy_requests_response = Family::<ProxyStateLabels, Gauge>::default();
let node_block_latest = Family::<HostStateLabels, Gauge>::default();

let mut registry = <Registry>::default();
Expand All @@ -41,16 +46,22 @@ impl Metrics {
"Proxy requests by host",
proxy_requests.clone(),
);
registry.register(
"proxy_requests_response",
"Proxy requests served by host",
proxy_requests_response.clone(),
);
registry.register(
"node_block_latest",
"Node block latest",
node_block_latest.clone(),
);

Self {
registry,
registry: Arc::new(registry),
proxy_request_counter,
proxy_requests,
proxy_requests_response,
node_block_latest,
}
}
Expand All @@ -59,8 +70,16 @@ impl Metrics {
self.proxy_request_counter.inc();
}

pub fn add_proxy_request(&self, host: &str, status: u64, latency: u64) {
pub fn add_proxy_request(&self, host: &str) {
self.proxy_requests
.get_or_create(&HostStateLabels {
host: host.to_string(),
})
.inc();
}

pub fn add_proxy_request_response(&self, host: &str, status: u64, latency: u64) {
self.proxy_requests_response
.get_or_create(&ProxyStateLabels {
host: host.to_string(),
status,
Expand All @@ -77,6 +96,12 @@ impl Metrics {
.set(value as i64);
}

pub fn get_metrics(&self) -> String {
let mut buffer = String::new();
encode(&mut buffer, &self.registry).unwrap();
buffer
}

// fn metrics_logger(&self) -> MetricsLogger {
// MetricsLogger {
// request_counter: &self.request_counter,
Expand Down
14 changes: 8 additions & 6 deletions src/metrics_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@ use futures::Future;
use http_body_util::Full;
use hyper::{body::Incoming as IncomingBody, service::Service, Request, Response};

use crate::metrics::Metrics;

#[derive(Debug, Clone)]
pub struct MetricsService {}
pub struct MetricsService {
pub metrics: Metrics,
}

impl Service<Request<IncomingBody>> for MetricsService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&self, _req: Request<IncomingBody>) -> Self::Future {
fn mk_response(s: String) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap())
}

let res = mk_response("oh no! not found".into());
let res = Ok(Response::builder()
.body(Full::new(Bytes::from(self.metrics.get_metrics())))
.unwrap());

Box::pin(async { res })
}
Expand Down
6 changes: 5 additions & 1 deletion src/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

use crate::config::Url;
use crate::metrics::Metrics;
use crate::{
chain_service::ChainService,
chain_type::ChainType,
Expand All @@ -16,6 +17,7 @@ use crate::{
pub struct NodeService {
pub domains: HashMap<String, Domain>,
pub nodes: Arc<Mutex<HashMap<String, NodeDomain>>>,
pub metrics: Metrics,
}

#[derive(Debug)]
Expand All @@ -33,7 +35,7 @@ pub struct NodeResult {
}

impl NodeService {
pub fn new(domains: HashMap<String, Domain>) -> Self {
pub fn new(domains: HashMap<String, Domain>, metrics: Metrics) -> Self {
//
let mut hash_map: HashMap<String, NodeDomain> = HashMap::new();

Expand All @@ -45,12 +47,14 @@ impl NodeService {
Self {
domains,
nodes: Arc::new(Mutex::new(hash_map)),
metrics,
}
}

pub async fn get_proxy_request(&self) -> ProxyRequestService {
ProxyRequestService {
domains: self.get_node_domains().await,
metrics: self.metrics.clone(),
}
}

Expand Down
22 changes: 15 additions & 7 deletions src/proxy_request_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ use std::str::FromStr;

use crate::config::Url;
use crate::logger::{log_incoming_request, log_proxy_response};
use crate::metrics::Metrics;
use crate::request_url::RequestUrl;

#[derive(Debug, Clone)]
pub struct ProxyRequestService {
pub domains: HashMap<String, NodeDomain>,
pub metrics: Metrics,
}

#[derive(Debug, Clone)]
Expand All @@ -40,6 +42,8 @@ impl Service<Request<IncomingBody>> for ProxyRequestService {
.to_str()
.unwrap_or_default();

self.metrics.add_total_requests();

log_incoming_request(&req);

match self.domains.get(host) {
Expand All @@ -51,19 +55,23 @@ impl Service<Request<IncomingBody>> for ProxyRequestService {
req.uri(),
);

self.metrics.add_proxy_request(host);

async move { proxy_pass(req, url).await }.boxed()
}
_ => async move { unsupported_chain(req).await }.boxed(), //async move { handle_request(req).await }.boxed(), //Ok(Response::new(Full::from("unsupported domain")))},
_ => async move { Self::unsupported_chain(req).await }.boxed(), //async move { handle_request(req).await }.boxed(), //Ok(Response::new(Full::from("unsupported domain")))},
}
}
}

async fn unsupported_chain(
_req: Request<IncomingBody>,
) -> Result<Response<Full<Bytes>>, Box<dyn std::error::Error + Send + Sync>> {
Ok(Response::builder()
.body(Full::new(Bytes::from("unsupported domain")))
.unwrap())
impl ProxyRequestService {
async fn unsupported_chain(
_req: Request<IncomingBody>,
) -> Result<Response<Full<Bytes>>, Box<dyn std::error::Error + Send + Sync>> {
Ok(Response::builder()
.body(Full::new(Bytes::from("unsupported domain")))
.unwrap())
}
}

async fn proxy_pass(
Expand Down

0 comments on commit 72876b4

Please sign in to comment.