Skip to content

Commit

Permalink
fix(c-bridge): use wit/rpc client instead of tcp::socket to check wit…
Browse files Browse the repository at this point in the history
… connection status
  • Loading branch information
guidiaz committed Aug 21, 2024
1 parent 30c9d9c commit f621c07
Showing 1 changed file with 91 additions and 77 deletions.
168 changes: 91 additions & 77 deletions bridges/centralized-ethereum/src/actors/watch_dog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ use crate::{
config::Config,
};
use actix::prelude::*;
use async_jsonrpc_client::{transports::tcp::TcpSocket, Transport};
use futures_util::compat::Compat01As03;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
sync::Arc,
time::{Duration, Instant},
Expand All @@ -23,7 +19,9 @@ use witnet_node::utils::stop_system_if_panicking;
/// in the DrDatabase
#[derive(Default)]
pub struct WatchDog {
/// JSON RPC connection to Wit/node
/// JSON WIT/RPC client connection to Wit/node
pub wit_client: Option<Addr<JsonRpcClient>>,
/// JSON WIT/RPC socket address
pub wit_jsonrpc_socket: String,
/// Bridge UTXO min value threshold
pub wit_utxo_min_value_threshold: u64,
Expand All @@ -43,11 +41,6 @@ pub struct WatchDog {
pub start_wit_balance: Option<f64>,
}

#[derive(Serialize, Deserialize)]
struct WatchDogOutput {
pub running_secs: u64,
}

impl Drop for WatchDog {
fn drop(&mut self) {
log::trace!("Dropping WatchDog");
Expand All @@ -68,28 +61,31 @@ impl Actor for WatchDog {
}
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
enum WatchDogStatus {
// Add all of them here
UpAndRunning,
EvmDisconnect,
EvmSyncing,
EvmErrors,
EvmSyncing,
WitAlmostSynced,
WitErrors,
WitDisconnect,
WitSyncing,
WitErrors,
WitWaitingConsensus,
UpAndRunning
}

impl WatchDogStatus {
fn to_string(&self) -> String {
match self {
WatchDogStatus::UpAndRunning => "up-and-running".to_string(),
WatchDogStatus::EvmDisconnect => "evm-disconnect".to_string(),
WatchDogStatus::EvmSyncing => "evm-syncing".to_string(),
WatchDogStatus::EvmErrors => format!("evm-errors"),
WatchDogStatus::EvmSyncing => "evm-syncing".to_string(),
WatchDogStatus::WitAlmostSynced => "wit-almost-synced".to_string(),
WatchDogStatus::WitDisconnect => "wit-disconnect".to_string(),
WatchDogStatus::WitSyncing => "wit-syncing".to_string(),
WatchDogStatus::WitErrors => format!("wit-errors"),
WatchDogStatus::WitSyncing => "wit-syncing".to_string(),
WatchDogStatus::WitWaitingConsensus => "wit-waiting-consensus".to_string(),
WatchDogStatus::UpAndRunning => "up-and-running".to_string(),
}
}
}
Expand All @@ -102,6 +98,7 @@ impl WatchDog {
/// Initialize from config
pub fn from_config(config: &Config, eth_contract: Arc<Contract<Http>>) -> Self {
Self {
wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str()).ok(),
wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(),
wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold,
eth_account: config.eth_from,
Expand Down Expand Up @@ -132,6 +129,7 @@ impl WatchDog {
}
let start_eth_balance = self.start_eth_balance;
let start_wit_balance = self.start_wit_balance;
let wit_client = self.wit_client.clone();
let wit_jsonrpc_socket = self.wit_jsonrpc_socket.clone();
let wit_utxo_min_value_threshold = self.wit_utxo_min_value_threshold;
let eth_jsonrpc_url = self.eth_jsonrpc_url.clone();
Expand All @@ -142,27 +140,58 @@ impl WatchDog {
let fut = async move {
let mut status = WatchDogStatus::UpAndRunning;

if let Err(err) = check_wit_connection_status(&wit_jsonrpc_socket).await {
status = err;
}
let wit_client = match JsonRpcClient::start(&wit_jsonrpc_socket) {
Ok(client) => client,
Err(_) => return (None, None),
};
let (wit_account, wit_balance, wit_utxos_above_threshold) = match fetch_wit_info(
&wit_client,
wit_utxo_min_value_threshold
).await {
Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => {
(wit_account, wit_balance, wit_utxos_above_threshold)
let dr_database = DrDatabase::from_registry();
let (_, drs_pending, drs_finished, _) =
dr_database.send(CountDrsPerState).await.unwrap().unwrap();

let mut metrics: String = "{".to_string();
metrics.push_str(&format!("\"drsFinished\": {drs_finished}, "));
metrics.push_str(&format!("\"drsPending\": {drs_pending}, "));
metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", "));

if let Some(wit_client) = wit_client {
if let Err(err) = check_wit_connection_status(&wit_client).await {
status = err;
}
Err(err) => {
if status == WatchDogStatus::UpAndRunning {
status = err;

let (wit_account, wit_balance, wit_utxos_above_threshold) = match fetch_wit_info(
&wit_client,
wit_utxo_min_value_threshold
).await {
Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => {
(wit_account, wit_balance, wit_utxos_above_threshold)
}
(None, None, None)
Err(err) => {
if status == WatchDogStatus::UpAndRunning {
status = err;
}
(None, None, None)
}
};

if wit_account.is_some() {
metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap()));
}
};
if wit_balance.is_some() {
let wit_balance = wit_balance.unwrap();
metrics.push_str(&format!("\"witBalance\": {:.5}, ", wit_balance));
if let Some(start_wit_balance) = start_wit_balance {
let wit_hourly_expenditure =
((start_wit_balance - wit_balance) / running_secs as f64) * 3600_f64;
metrics.push_str(&format!(
"\"witHourlyExpenditure\": {:.1}, ",
wit_hourly_expenditure
));
}
}
metrics.push_str(&format!("\"witNodeSocket\": \"{wit_jsonrpc_socket}\", "));
if wit_utxos_above_threshold.is_some() {
metrics.push_str(&format!(
"\"witUtxosAboveThreshold\": {}, ",
wit_utxos_above_threshold.unwrap()
));
}
}

let eth_balance = match check_eth_account_balance(&eth_jsonrpc_url, eth_account).await {
Ok(eth_balance) => eth_balance,
Expand All @@ -174,14 +203,6 @@ impl WatchDog {
}
};

let dr_database = DrDatabase::from_registry();
let (_, drs_pending, drs_finished, _) =
dr_database.send(CountDrsPerState).await.unwrap().unwrap();

let mut metrics: String = "{".to_string();
metrics.push_str(&format!("\"drsFinished\": {drs_finished}, "));
metrics.push_str(&format!("\"drsPending\": {drs_pending}, "));
metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", "));
if eth_balance.is_some() {
let eth_balance = eth_balance.unwrap();
metrics.push_str(&format!("\"evmBalance\": {:.5}, ", eth_balance));
Expand All @@ -195,33 +216,12 @@ impl WatchDog {
));
}
}
if wit_account.is_some() {
metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap()));
}
if wit_balance.is_some() {
let wit_balance = wit_balance.unwrap();
metrics.push_str(&format!("\"witBalance\": {:.5}, ", wit_balance));
if let Some(start_wit_balance) = start_wit_balance {
let wit_hourly_expenditure =
((start_wit_balance - wit_balance) / running_secs as f64) * 3600_f64;
metrics.push_str(&format!(
"\"witHourlyExpenditure\": {:.1}, ",
wit_hourly_expenditure
));
}
}
metrics.push_str(&format!("\"witNodeSocket\": \"{}\", ", wit_jsonrpc_socket));
if wit_utxos_above_threshold.is_some() {
metrics.push_str(&format!(
"\"witUtxosAboveThreshold\": {}, ",
wit_utxos_above_threshold.unwrap()
));
}

metrics.push_str(&format!("\"runningSecs\": {running_secs}, "));
metrics.push_str(&format!("\"status\": \"{}\"", status.to_string()));
metrics.push_str("}}");
log::info!("{metrics}");

(eth_balance, wit_balance)
};

Expand Down Expand Up @@ -269,17 +269,31 @@ async fn check_eth_account_balance(
}
}

async fn check_wit_connection_status(wit_jsonrpc_socket: &str) -> Result<(), WatchDogStatus> {
let (_handle, wit_client) = TcpSocket::new(wit_jsonrpc_socket).unwrap();
let wit_client = Arc::new(wit_client);
let res = wit_client.execute("syncStatus", json!(null));
let res = Compat01As03::new(res);
let res = tokio::time::timeout(Duration::from_secs(5), res).await;

async fn check_wit_connection_status(wit_client: &Addr<JsonRpcClient>) -> Result<(), WatchDogStatus> {
let req = jsonrpc::Request::method("syncStatus").timeout(Duration::from_secs(5));
let res = wit_client.send(req).await;
match res {
Ok(Ok(_)) => Ok(()),
Ok(Err(_)) => Err(WatchDogStatus::WitSyncing),
Err(_elapse) => Err(WatchDogStatus::WitDisconnect),
Ok(Ok(result)) => {
if let Some(node_state) = result["node_state"].as_str() {
match node_state {
"Synced" => Ok(()),
"AlmostSynced" => Err(WatchDogStatus::WitAlmostSynced),
"WaitingConsensus" => Err(WatchDogStatus::WitWaitingConsensus),
_ => Err(WatchDogStatus::WitSyncing)
}
} else {
log::debug!("check_wit_connection_status => unknown node_state");
Err(WatchDogStatus::WitErrors)
}
}
Ok(Err(err)) => {
log::debug!("check_wit_connection_status => {}", err);
Err(WatchDogStatus::WitDisconnect)
}
Err(err) => {
log::debug!("check_wit_connection_status => {}", err);
Err(WatchDogStatus::WitDisconnect)
}
}
}

Expand Down

0 comments on commit f621c07

Please sign in to comment.