Skip to content

Commit

Permalink
chore: attend pr review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
guidiaz committed Aug 20, 2024
1 parent 0a983af commit 26c27e1
Showing 1 changed file with 135 additions and 122 deletions.
257 changes: 135 additions & 122 deletions bridges/centralized-ethereum/src/actors/watch_dog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
use web3::{
contract::Contract,
transports::Http,
types::{H160, U256},
types::H160,
};
use witnet_net::client::tcp::{jsonrpc, JsonRpcClient};
use witnet_node::utils::stop_system_if_panicking;
Expand Down Expand Up @@ -68,6 +68,32 @@ impl Actor for WatchDog {
}
}

#[derive(Debug)]
enum WatchDogStatus {
// Add all of them here
UpAndRunning,
EvmDisconnect,
EvmSyncing,
EvmErrors,
WitDisconnect,
WitSyncing,
WitErrors,
}

impl WatchDogStatus {
fn to_string(&self) -> String {
match self {
WatchDogStatus::UpAndRunning => "up-and-running".to_string(),
WatchDogStatus::EvmDisconnect => "evm-disconnect".to_string(),
WatchDogStatus::EvmSyncing => "evm-syncing".to_string(),
WatchDogStatus::EvmErrors => format!("evm-errors"),
WatchDogStatus::WitDisconnect => "wit-disconnect".to_string(),
WatchDogStatus::WitSyncing => "wit-syncing".to_string(),
WatchDogStatus::WitErrors => format!("wit-errors"),
}
}
}

/// Required trait for being able to retrieve WatchDog address from system registry
impl actix::Supervised for WatchDog {}
impl SystemService for WatchDog {}
Expand Down Expand Up @@ -114,66 +140,34 @@ impl WatchDog {
let running_secs = self.start_ts.unwrap().elapsed().as_secs();

let fut = async move {
let mut status = "up-and-running".to_string();
let mut status = WatchDogStatus::UpAndRunning;

if let Err(err) = check_wit_connection_status(&wit_jsonrpc_socket).await {
status = err;
}
let wit_client = JsonRpcClient::start(&wit_jsonrpc_socket)
.expect("cannot start JSON/WIT connection");
let wit_account = match fetch_wit_account(&wit_client).await {
Ok(pkh) => pkh,
let wit_client = match JsonRpcClient::start(&wit_jsonrpc_socket) {
Ok(client) => client,
Err(_) => return (None, None),
};
let (wit_account, wit_balance, wit_utxos_above_threshold) = match fetch_wit_info(
&wit_client,
wit_utxo_min_value_threshold
).await {
Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => {
(wit_account, wit_balance, wit_utxos_above_threshold)
}
Err(err) => {
if status.eq("up-and-running") {
if status == WatchDogStatus::UpAndRunning {
status = err;
}
None
}
};

let wit_balance = match wit_account.clone() {
Some(pkh) => match fetch_wit_account_balance(&wit_client, pkh.as_str()).await {
Ok(wit_balance) => wit_balance,
Err(err) => {
if status.eq("up-and-running") {
status = err;
}
None
}
},
None => None,
};

let wit_utxos_above_threshold = match wit_account.clone() {
Some(pkh) => {
match fetch_wit_account_count_utxos_above(
&wit_client,
pkh.as_str(),
wit_utxo_min_value_threshold,
)
.await
{
Ok(wit_utxos_above_threshold) => wit_utxos_above_threshold,
Err(err) => {
if status.eq("up-and-running") {
status = err;
}
None
}
}
(None, None, None)
}
None => None,
};

let eth_balance = match check_eth_account_balance(&eth_jsonrpc_url, eth_account).await {
Ok(Some(eth_balance)) => {
let eth_balance: f64 = eth_balance.to_string().parse().unwrap_or_default();
//Some(Unit::Wei(&eth_balance.to_string()).to_eth_str().unwrap_or_default()),
Some(eth_balance / 1000000000000000000.0)
}
Ok(None) => None,
Ok(eth_balance) => eth_balance,
Err(err) => {
if status.eq("up-and-running") {
if status == WatchDogStatus::UpAndRunning {
status = err;
}
None
Expand Down Expand Up @@ -224,7 +218,7 @@ impl WatchDog {
));
}
metrics.push_str(&format!("\"runningSecs\": {running_secs}, "));
metrics.push_str(&format!("\"status\": \"{status}\""));
metrics.push_str(&format!("\"status\": \"{}\"", status.to_string()));
metrics.push_str("}}");
log::info!("{metrics}");

Expand All @@ -248,27 +242,34 @@ impl WatchDog {
async fn check_eth_account_balance(
eth_jsonrpc_url: &str,
eth_account: H160,
) -> Result<Option<U256>, String> {
) -> Result<Option<f64>, WatchDogStatus> {
let web3_http = web3::transports::Http::new(eth_jsonrpc_url)
.map_err(|_e| "evm-disconnect".to_string())
.map_err(|_e| WatchDogStatus::EvmDisconnect)
.unwrap();

let web3 = web3::Web3::new(web3_http);
match web3.eth().syncing().await {
Ok(syncing) => match syncing {
web3::types::SyncState::NotSyncing => {
match web3.eth().balance(eth_account, None).await {
Ok(balance) => Ok(Some(balance)),
Ok(eth_balance) => {
let eth_balance: f64 = eth_balance.to_string().parse().unwrap_or_default();
Ok(Some(eth_balance / 1000000000000000000.0))
}
_ => Ok(None),
}
}
web3::types::SyncState::Syncing(_) => Err("evm-syncing".to_string()),
web3::types::SyncState::Syncing(_) => Err(WatchDogStatus::EvmSyncing),
},
Err(_e) => Err("evm-errors".to_string()),
Err(e) => {
log::debug!("check_eth_account_balance => {}", e);

Err(WatchDogStatus::EvmErrors)
}
}
}

async fn check_wit_connection_status(wit_jsonrpc_socket: &str) -> Result<(), String> {
async fn check_wit_connection_status(wit_jsonrpc_socket: &str) -> Result<(), WatchDogStatus> {
let (_handle, wit_client) = TcpSocket::new(wit_jsonrpc_socket).unwrap();
let wit_client = Arc::new(wit_client);
let res = wit_client.execute("syncStatus", json!(null));
Expand All @@ -277,88 +278,100 @@ async fn check_wit_connection_status(wit_jsonrpc_socket: &str) -> Result<(), Str

match res {
Ok(Ok(_)) => Ok(()),
Ok(Err(_)) => Err("wit-syncing".to_string()),
Err(_elapse) => Err("wit-disconnect".to_string()),
Ok(Err(_)) => Err(WatchDogStatus::WitSyncing),
Err(_elapse) => Err(WatchDogStatus::WitDisconnect),
}
}

async fn fetch_wit_account(wit_client: &Addr<JsonRpcClient>) -> Result<Option<String>, String> {
async fn fetch_wit_info (
wit_client: &Addr<JsonRpcClient>,
wit_utxos_min_threshold: u64,
) -> Result<(Option<String>, Option<f64>, Option<u64>), WatchDogStatus> {
let req = jsonrpc::Request::method("getPkh").timeout(Duration::from_secs(5));
let res = wit_client.send(req).await;
match res {
let wit_account = match res {
Ok(Ok(res)) => match serde_json::from_value::<String>(res) {
Ok(pkh) => Ok(Some(pkh)),
Err(_) => Ok(None),
Ok(pkh) => Some(pkh),
Err(_) => None,
},
Ok(Err(_)) => Ok(None),
Err(_) => Err("wit-errors-getPkh".to_string()),
}
}

async fn fetch_wit_account_balance(
wit_client: &Addr<JsonRpcClient>,
wit_account: &str,
) -> Result<Option<f64>, String> {
let req = jsonrpc::Request::method("getBalance")
.timeout(Duration::from_secs(5))
.params(vec![wit_account, "true"])
.expect("getBalance wrong params");

let res = wit_client.send(req).await;
let res = match res {
Ok(res) => res,
Err(_) => {
return Err("wit-errors-getBalance".to_string());
Ok(Err(_)) => None,
Err(err) => {
log::debug!("fetch_wit_info => {}", err);
return Err(WatchDogStatus::WitErrors);
}
};

match res {
Ok(value) => match value.get("total") {
Some(value) => match value.as_f64() {
Some(value) => Ok(Some(value / 1000000000.0)),
None => Ok(None),
},
None => Ok(None),
},
Err(_) => Err("wit-errors-getBalance".to_string()),
}
}

async fn fetch_wit_account_count_utxos_above(
wit_client: &Addr<JsonRpcClient>,
wit_account: &str,
threshold: u64,
) -> Result<Option<u64>, String> {
let req = jsonrpc::Request::method("getUtxoInfo")
.timeout(Duration::from_secs(5))
.params(wit_account)
.expect("getUtxoInfo wrong params");

let res = wit_client.send(req).await;
let res = match res {
Ok(res) => res,
Err(_) => {
return Err("wit-errors-getUtxoInfo".to_string());
let wit_account_balance = match wit_account.clone() {
Some(wit_account) => {
let req = jsonrpc::Request::method("getBalance")
.timeout(Duration::from_secs(5))
.params(wit_account)
.expect("getBalance wrong params");
let res = wit_client.send(req).await;
let res = match res {
Ok(res) => res,
Err(err) => {
log::debug!("fetch_wit_info => {}", err);
return Err(WatchDogStatus::WitErrors);
}
};
match res {
Ok(value) => match value.get("total") {
Some(value) => match value.as_f64() {
Some(value) => Some(value / 1000000000.0),
None => None,
},
None => None,
},
Err(err) => {
log::debug!("fetch_wit_info => {}", err);
return Err(WatchDogStatus::WitErrors);
}
}
}
None => None,
};

match res {
Ok(utxo_info) => {
if let Some(utxos) = utxo_info["utxos"].as_array() {
let mut counter: u64 = u64::default();
for utxo in utxos {
if let Some(value) = utxo["value"].as_u64() {
if value >= threshold {
counter += 1;
let wit_utxos_above_threshold = match wit_account.clone() {
Some(wit_account) => {
let req = jsonrpc::Request::method("getUtxoInfo")
.timeout(Duration::from_secs(5))
.params(wit_account)
.expect("getUtxoInfo wrong params");
let res = wit_client.send(req).await;
let res = match res {
Ok(res) => res,
Err(err) => {
log::debug!("fetch_wit_info => {}", err);
return Err(WatchDogStatus::WitErrors);
}
};
match res {
Ok(utxo_info) => {
if let Some(utxos) = utxo_info["utxos"].as_array() {
let mut counter: u64 = u64::default();
for utxo in utxos {
if let Some(value) = utxo["value"].as_u64() {
if value >= wit_utxos_min_threshold {
counter += 1;
}
}
}

Some(counter)
} else {
None
}
}

Ok(Some(counter))
} else {
Ok(None)
Err(err) => {
log::debug!("fetch_wit_info => {}", err);
return Err(WatchDogStatus::WitErrors);
}
}
}
Err(_) => Err("wit-errors-getUtxoInfo".to_string()),
}
None => None,
};


Ok((wit_account, wit_account_balance, wit_utxos_above_threshold))
}

0 comments on commit 26c27e1

Please sign in to comment.