diff --git a/funds-manager/funds-manager-api/src/lib.rs b/funds-manager/funds-manager-api/src/lib.rs index 5b43139..954016f 100644 --- a/funds-manager/funds-manager-api/src/lib.rs +++ b/funds-manager/funds-manager-api/src/lib.rs @@ -26,6 +26,8 @@ pub const WITHDRAW_CUSTODY_ROUTE: &str = "withdraw"; pub const WITHDRAW_GAS_ROUTE: &str = "withdraw-gas"; /// The route to register a gas wallet for a peer pub const REGISTER_GAS_WALLET_ROUTE: &str = "register-gas-wallet"; +/// The route to report active peers +pub const REPORT_ACTIVE_PEERS_ROUTE: &str = "report-active-peers"; /// The route to get fee wallets pub const GET_FEE_WALLETS_ROUTE: &str = "get-fee-wallets"; @@ -114,6 +116,16 @@ pub struct RegisterGasWalletResponse { pub key: String, } +/// A request reporting active peers in the network +/// +/// The funds manager uses such a request to mark gas wallets as active or +/// inactive +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ReportActivePeersRequest { + /// The list of active peers + pub peers: Vec, +} + // --- Hot Wallets --- // /// The request body for creating a hot wallet diff --git a/funds-manager/funds-manager-server/src/custody_client/gas_wallets.rs b/funds-manager/funds-manager-server/src/custody_client/gas_wallets.rs index 7b26e0c..94c4add 100644 --- a/funds-manager/funds-manager-server/src/custody_client/gas_wallets.rs +++ b/funds-manager/funds-manager-server/src/custody_client/gas_wallets.rs @@ -1,5 +1,7 @@ //! Handlers for gas wallet operations +use std::str::FromStr; + use ethers::{ signers::{LocalWallet, Signer}, utils::hex::ToHexExt, @@ -8,6 +10,7 @@ use rand::thread_rng; use tracing::info; use crate::{ + db::models::GasWalletStatus, error::FundsManagerError, helpers::{create_secrets_manager_entry_with_description, get_secret}, }; @@ -61,6 +64,41 @@ impl CustodyClient { Ok(secret_value) } + /// Record the set of active peers, marking their gas wallets as active and + /// transitioning the rest to inactive or pending if necessary + pub(crate) async fn record_active_gas_wallet( + &self, + active_peers: Vec, + ) -> Result<(), FundsManagerError> { + // Fetch all gas wallets + let all_wallets = self.get_all_gas_wallets().await?; + + // For those gas wallets whose peer is not in the active peers list, mark them + // as inactive + for wallet in all_wallets { + let state = + GasWalletStatus::from_str(&wallet.status).expect("invalid gas wallet status"); + let peer_id = match wallet.peer_id { + Some(peer_id) => peer_id, + None => continue, + }; + + if !active_peers.contains(&peer_id) { + match state.transition_inactive() { + GasWalletStatus::Pending => { + self.mark_gas_wallet_pending(&wallet.address).await?; + }, + GasWalletStatus::Inactive => { + self.mark_gas_wallet_inactive(&wallet.address).await?; + }, + _ => unreachable!(), + } + } + } + + Ok(()) + } + // ----------- // | Helpers | // ----------- diff --git a/funds-manager/funds-manager-server/src/custody_client/queries.rs b/funds-manager/funds-manager-server/src/custody_client/queries.rs index 09cda82..ec0f599 100644 --- a/funds-manager/funds-manager-server/src/custody_client/queries.rs +++ b/funds-manager/funds-manager-server/src/custody_client/queries.rs @@ -5,14 +5,40 @@ use diesel_async::RunQueryDsl; use renegade_util::err_str; use uuid::Uuid; -use crate::db::models::{GasWallet, HotWallet}; +use crate::db::models::{GasWallet, GasWalletStatus, HotWallet}; use crate::db::schema::gas_wallets; use crate::db::schema::hot_wallets; use crate::error::FundsManagerError; use crate::CustodyClient; impl CustodyClient { - // --- Gas Wallets --- // + // --------------- + // | Gas Wallets | + // --------------- + + // --- Getters --- // + + /// Get all gas wallets + pub async fn get_all_gas_wallets(&self) -> Result, FundsManagerError> { + let mut conn = self.get_db_conn().await?; + gas_wallets::table + .load::(&mut conn) + .await + .map_err(err_str!(FundsManagerError::Db)) + } + + /// Find an inactive gas wallet + pub async fn find_inactive_gas_wallet(&self) -> Result { + let mut conn = self.get_db_conn().await?; + let inactive = GasWalletStatus::Inactive.to_string(); + gas_wallets::table + .filter(gas_wallets::status.eq(inactive)) + .first::(&mut conn) + .await + .map_err(err_str!(FundsManagerError::Db)) + } + + // --- Setters --- // /// Add a new gas wallet pub async fn add_gas_wallet(&self, address: &str) -> Result<(), FundsManagerError> { @@ -27,14 +53,34 @@ impl CustodyClient { Ok(()) } - /// Find an inactive gas wallet - pub async fn find_inactive_gas_wallet(&self) -> Result { + /// Mark a gas wallet as inactive + pub async fn mark_gas_wallet_inactive(&self, address: &str) -> Result<(), FundsManagerError> { let mut conn = self.get_db_conn().await?; - gas_wallets::table - .filter(gas_wallets::active.eq(false)) - .first::(&mut conn) + let updates = ( + gas_wallets::status.eq(GasWalletStatus::Inactive.to_string()), + gas_wallets::peer_id.eq(None::), + ); + + diesel::update(gas_wallets::table.filter(gas_wallets::address.eq(address))) + .set(updates) + .execute(&mut conn) .await - .map_err(err_str!(FundsManagerError::Db)) + .map_err(err_str!(FundsManagerError::Db))?; + + Ok(()) + } + + /// Update a gas wallet to pending + pub async fn mark_gas_wallet_pending(&self, address: &str) -> Result<(), FundsManagerError> { + let mut conn = self.get_db_conn().await?; + let pending = GasWalletStatus::Pending.to_string(); + diesel::update(gas_wallets::table.filter(gas_wallets::address.eq(address))) + .set(gas_wallets::status.eq(pending)) + .execute(&mut conn) + .await + .map_err(err_str!(FundsManagerError::Db))?; + + Ok(()) } /// Mark a gas wallet as active @@ -44,7 +90,8 @@ impl CustodyClient { peer_id: &str, ) -> Result<(), FundsManagerError> { let mut conn = self.get_db_conn().await?; - let updates = (gas_wallets::active.eq(true), gas_wallets::peer_id.eq(peer_id)); + let active = GasWalletStatus::Active.to_string(); + let updates = (gas_wallets::status.eq(active), gas_wallets::peer_id.eq(peer_id)); diesel::update(gas_wallets::table.filter(gas_wallets::address.eq(address))) .set(updates) .execute(&mut conn) @@ -54,7 +101,11 @@ impl CustodyClient { Ok(()) } - // --- Hot Wallets --- // + // --------------- + // | Hot Wallets | + // --------------- + + // --- Getters --- // /// Get all hot wallets pub async fn get_all_hot_wallets(&self) -> Result, FundsManagerError> { @@ -67,30 +118,6 @@ impl CustodyClient { Ok(wallets) } - /// Insert a new hot wallet into the database - pub async fn insert_hot_wallet( - &self, - address: &str, - vault: &str, - secret_id: &str, - internal_wallet_id: &Uuid, - ) -> Result<(), FundsManagerError> { - let mut conn = self.get_db_conn().await?; - let entry = HotWallet::new( - secret_id.to_string(), - vault.to_string(), - address.to_string(), - *internal_wallet_id, - ); - diesel::insert_into(hot_wallets::table) - .values(entry) - .execute(&mut conn) - .await - .map_err(err_str!(FundsManagerError::Db))?; - - Ok(()) - } - /// Get a hot wallet by its address pub async fn get_hot_wallet_by_address( &self, @@ -116,4 +143,30 @@ impl CustodyClient { .await .map_err(err_str!(FundsManagerError::Db)) } + + // --- Setters --- // + + /// Insert a new hot wallet into the database + pub async fn insert_hot_wallet( + &self, + address: &str, + vault: &str, + secret_id: &str, + internal_wallet_id: &Uuid, + ) -> Result<(), FundsManagerError> { + let mut conn = self.get_db_conn().await?; + let entry = HotWallet::new( + secret_id.to_string(), + vault.to_string(), + address.to_string(), + *internal_wallet_id, + ); + diesel::insert_into(hot_wallets::table) + .values(entry) + .execute(&mut conn) + .await + .map_err(err_str!(FundsManagerError::Db))?; + + Ok(()) + } } diff --git a/funds-manager/funds-manager-server/src/handlers.rs b/funds-manager/funds-manager-server/src/handlers.rs index 06b50c8..4d44701 100644 --- a/funds-manager/funds-manager-server/src/handlers.rs +++ b/funds-manager/funds-manager-server/src/handlers.rs @@ -7,8 +7,8 @@ use bytes::Bytes; use funds_manager_api::{ CreateGasWalletResponse, CreateHotWalletRequest, CreateHotWalletResponse, DepositAddressResponse, FeeWalletsResponse, HotWalletBalancesResponse, - RegisterGasWalletRequest, RegisterGasWalletResponse, TransferToVaultRequest, - WithdrawFeeBalanceRequest, WithdrawFundsRequest, WithdrawGasRequest, + RegisterGasWalletRequest, RegisterGasWalletResponse, ReportActivePeersRequest, + TransferToVaultRequest, WithdrawFeeBalanceRequest, WithdrawFundsRequest, WithdrawGasRequest, WithdrawToHotWalletRequest, }; use itertools::Itertools; @@ -158,6 +158,19 @@ pub(crate) async fn register_gas_wallet_handler( Ok(warp::reply::json(&resp)) } +/// Handler for reporting active peers +pub(crate) async fn report_active_peers_handler( + req: ReportActivePeersRequest, + server: Arc, +) -> Result { + server + .custody_client + .record_active_gas_wallet(req.peers) + .await + .map_err(|e| warp::reject::custom(ApiError::InternalError(e.to_string())))?; + Ok(warp::reply::json(&{})) +} + // --- Hot Wallets --- // /// Handler for creating a hot wallet diff --git a/funds-manager/funds-manager-server/src/main.rs b/funds-manager/funds-manager-server/src/main.rs index ca25989..2a13d50 100644 --- a/funds-manager/funds-manager-server/src/main.rs +++ b/funds-manager/funds-manager-server/src/main.rs @@ -21,18 +21,19 @@ use error::FundsManagerError; use ethers::signers::LocalWallet; use fee_indexer::Indexer; use funds_manager_api::{ - CreateHotWalletRequest, RegisterGasWalletRequest, TransferToVaultRequest, - WithdrawFeeBalanceRequest, WithdrawGasRequest, WithdrawToHotWalletRequest, - GET_DEPOSIT_ADDRESS_ROUTE, GET_FEE_WALLETS_ROUTE, INDEX_FEES_ROUTE, PING_ROUTE, - REDEEM_FEES_ROUTE, REGISTER_GAS_WALLET_ROUTE, TRANSFER_TO_VAULT_ROUTE, WITHDRAW_CUSTODY_ROUTE, - WITHDRAW_FEE_BALANCE_ROUTE, WITHDRAW_GAS_ROUTE, WITHDRAW_TO_HOT_WALLET_ROUTE, + CreateHotWalletRequest, RegisterGasWalletRequest, ReportActivePeersRequest, + TransferToVaultRequest, WithdrawFeeBalanceRequest, WithdrawGasRequest, + WithdrawToHotWalletRequest, GET_DEPOSIT_ADDRESS_ROUTE, GET_FEE_WALLETS_ROUTE, INDEX_FEES_ROUTE, + PING_ROUTE, REDEEM_FEES_ROUTE, REGISTER_GAS_WALLET_ROUTE, REPORT_ACTIVE_PEERS_ROUTE, + TRANSFER_TO_VAULT_ROUTE, WITHDRAW_CUSTODY_ROUTE, WITHDRAW_FEE_BALANCE_ROUTE, + WITHDRAW_GAS_ROUTE, WITHDRAW_TO_HOT_WALLET_ROUTE, }; use handlers::{ create_gas_wallet_handler, create_hot_wallet_handler, get_deposit_address_handler, get_fee_wallets_handler, get_hot_wallet_balances_handler, index_fees_handler, quoter_withdraw_handler, redeem_fees_handler, register_gas_wallet_handler, - transfer_to_vault_handler, withdraw_fee_balance_handler, withdraw_from_vault_handler, - withdraw_gas_handler, + report_active_peers_handler, transfer_to_vault_handler, withdraw_fee_balance_handler, + withdraw_from_vault_handler, withdraw_gas_handler, }; use middleware::{identity, with_hmac_auth, with_json_body}; use relayer_client::RelayerClient; @@ -359,6 +360,16 @@ async fn main() -> Result<(), Box> { .and(with_server(server.clone())) .and_then(register_gas_wallet_handler); + let report_active_peers = warp::post() + .and(warp::path("custody")) + .and(warp::path("gas-wallets")) + .and(warp::path(REPORT_ACTIVE_PEERS_ROUTE)) + .and(with_hmac_auth(server.clone())) + .map(with_json_body::) + .and_then(identity) + .and(with_server(server.clone())) + .and_then(report_active_peers_handler); + // --- Hot Wallets --- // let create_hot_wallet = warp::post() @@ -404,6 +415,7 @@ async fn main() -> Result<(), Box> { .or(withdraw_custody) .or(get_deposit_address) .or(withdraw_gas) + .or(report_active_peers) .or(register_gas_wallet) .or(add_gas_wallet) .or(get_balances)