Skip to content

Commit

Permalink
funds-manager: custody-client: Add handler for reporting active peers
Browse files Browse the repository at this point in the history
  • Loading branch information
joeykraut committed Aug 2, 2024
1 parent 2683ea5 commit 283156e
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 43 deletions.
12 changes: 12 additions & 0 deletions funds-manager/funds-manager-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub const WITHDRAW_CUSTODY_ROUTE: &str = "withdraw";
pub const WITHDRAW_GAS_ROUTE: &str = "withdraw-gas";
/// The route to register a gas wallet for a peer
pub const REGISTER_GAS_WALLET_ROUTE: &str = "register-gas-wallet";
/// The route to report active peers
pub const REPORT_ACTIVE_PEERS_ROUTE: &str = "report-active-peers";

/// The route to get fee wallets
pub const GET_FEE_WALLETS_ROUTE: &str = "get-fee-wallets";
Expand Down Expand Up @@ -114,6 +116,16 @@ pub struct RegisterGasWalletResponse {
pub key: String,
}

/// A request reporting active peers in the network
///
/// The funds manager uses such a request to mark gas wallets as active or
/// inactive
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportActivePeersRequest {
/// The list of active peers
pub peers: Vec<String>,
}

// --- Hot Wallets --- //

/// The request body for creating a hot wallet
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Handlers for gas wallet operations

use std::str::FromStr;

use ethers::{
signers::{LocalWallet, Signer},
utils::hex::ToHexExt,
Expand All @@ -8,6 +10,7 @@ use rand::thread_rng;
use tracing::info;

use crate::{
db::models::GasWalletStatus,
error::FundsManagerError,
helpers::{create_secrets_manager_entry_with_description, get_secret},
};
Expand Down Expand Up @@ -61,6 +64,41 @@ impl CustodyClient {
Ok(secret_value)
}

/// Record the set of active peers, marking their gas wallets as active and
/// transitioning the rest to inactive or pending if necessary
pub(crate) async fn record_active_gas_wallet(
&self,
active_peers: Vec<String>,
) -> Result<(), FundsManagerError> {
// Fetch all gas wallets
let all_wallets = self.get_all_gas_wallets().await?;

// For those gas wallets whose peer is not in the active peers list, mark them
// as inactive
for wallet in all_wallets {
let state =
GasWalletStatus::from_str(&wallet.status).expect("invalid gas wallet status");
let peer_id = match wallet.peer_id {
Some(peer_id) => peer_id,
None => continue,
};

if !active_peers.contains(&peer_id) {
match state.transition_inactive() {
GasWalletStatus::Pending => {
self.mark_gas_wallet_pending(&wallet.address).await?;
},
GasWalletStatus::Inactive => {
self.mark_gas_wallet_inactive(&wallet.address).await?;
},
_ => unreachable!(),
}
}
}

Ok(())
}

// -----------
// | Helpers |
// -----------
Expand Down
121 changes: 87 additions & 34 deletions funds-manager/funds-manager-server/src/custody_client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,40 @@ use diesel_async::RunQueryDsl;
use renegade_util::err_str;
use uuid::Uuid;

use crate::db::models::{GasWallet, HotWallet};
use crate::db::models::{GasWallet, GasWalletStatus, HotWallet};
use crate::db::schema::gas_wallets;
use crate::db::schema::hot_wallets;
use crate::error::FundsManagerError;
use crate::CustodyClient;

impl CustodyClient {
// --- Gas Wallets --- //
// ---------------
// | Gas Wallets |
// ---------------

// --- Getters --- //

/// Get all gas wallets
pub async fn get_all_gas_wallets(&self) -> Result<Vec<GasWallet>, FundsManagerError> {
let mut conn = self.get_db_conn().await?;
gas_wallets::table
.load::<GasWallet>(&mut conn)
.await
.map_err(err_str!(FundsManagerError::Db))
}

/// Find an inactive gas wallet
pub async fn find_inactive_gas_wallet(&self) -> Result<GasWallet, FundsManagerError> {
let mut conn = self.get_db_conn().await?;
let inactive = GasWalletStatus::Inactive.to_string();
gas_wallets::table
.filter(gas_wallets::status.eq(inactive))
.first::<GasWallet>(&mut conn)
.await
.map_err(err_str!(FundsManagerError::Db))
}

// --- Setters --- //

/// Add a new gas wallet
pub async fn add_gas_wallet(&self, address: &str) -> Result<(), FundsManagerError> {
Expand All @@ -27,14 +53,34 @@ impl CustodyClient {
Ok(())
}

/// Find an inactive gas wallet
pub async fn find_inactive_gas_wallet(&self) -> Result<GasWallet, FundsManagerError> {
/// Mark a gas wallet as inactive
pub async fn mark_gas_wallet_inactive(&self, address: &str) -> Result<(), FundsManagerError> {
let mut conn = self.get_db_conn().await?;
gas_wallets::table
.filter(gas_wallets::active.eq(false))
.first::<GasWallet>(&mut conn)
let updates = (
gas_wallets::status.eq(GasWalletStatus::Inactive.to_string()),
gas_wallets::peer_id.eq(None::<String>),
);

diesel::update(gas_wallets::table.filter(gas_wallets::address.eq(address)))
.set(updates)
.execute(&mut conn)
.await
.map_err(err_str!(FundsManagerError::Db))
.map_err(err_str!(FundsManagerError::Db))?;

Ok(())
}

/// Update a gas wallet to pending
pub async fn mark_gas_wallet_pending(&self, address: &str) -> Result<(), FundsManagerError> {
let mut conn = self.get_db_conn().await?;
let pending = GasWalletStatus::Pending.to_string();
diesel::update(gas_wallets::table.filter(gas_wallets::address.eq(address)))
.set(gas_wallets::status.eq(pending))
.execute(&mut conn)
.await
.map_err(err_str!(FundsManagerError::Db))?;

Ok(())
}

/// Mark a gas wallet as active
Expand All @@ -44,7 +90,8 @@ impl CustodyClient {
peer_id: &str,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_db_conn().await?;
let updates = (gas_wallets::active.eq(true), gas_wallets::peer_id.eq(peer_id));
let active = GasWalletStatus::Active.to_string();
let updates = (gas_wallets::status.eq(active), gas_wallets::peer_id.eq(peer_id));
diesel::update(gas_wallets::table.filter(gas_wallets::address.eq(address)))
.set(updates)
.execute(&mut conn)
Expand All @@ -54,7 +101,11 @@ impl CustodyClient {
Ok(())
}

// --- Hot Wallets --- //
// ---------------
// | Hot Wallets |
// ---------------

// --- Getters --- //

/// Get all hot wallets
pub async fn get_all_hot_wallets(&self) -> Result<Vec<HotWallet>, FundsManagerError> {
Expand All @@ -67,30 +118,6 @@ impl CustodyClient {
Ok(wallets)
}

/// Insert a new hot wallet into the database
pub async fn insert_hot_wallet(
&self,
address: &str,
vault: &str,
secret_id: &str,
internal_wallet_id: &Uuid,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_db_conn().await?;
let entry = HotWallet::new(
secret_id.to_string(),
vault.to_string(),
address.to_string(),
*internal_wallet_id,
);
diesel::insert_into(hot_wallets::table)
.values(entry)
.execute(&mut conn)
.await
.map_err(err_str!(FundsManagerError::Db))?;

Ok(())
}

/// Get a hot wallet by its address
pub async fn get_hot_wallet_by_address(
&self,
Expand All @@ -116,4 +143,30 @@ impl CustodyClient {
.await
.map_err(err_str!(FundsManagerError::Db))
}

// --- Setters --- //

/// Insert a new hot wallet into the database
pub async fn insert_hot_wallet(
&self,
address: &str,
vault: &str,
secret_id: &str,
internal_wallet_id: &Uuid,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_db_conn().await?;
let entry = HotWallet::new(
secret_id.to_string(),
vault.to_string(),
address.to_string(),
*internal_wallet_id,
);
diesel::insert_into(hot_wallets::table)
.values(entry)
.execute(&mut conn)
.await
.map_err(err_str!(FundsManagerError::Db))?;

Ok(())
}
}
17 changes: 15 additions & 2 deletions funds-manager/funds-manager-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use bytes::Bytes;
use funds_manager_api::{
CreateGasWalletResponse, CreateHotWalletRequest, CreateHotWalletResponse,
DepositAddressResponse, FeeWalletsResponse, HotWalletBalancesResponse,
RegisterGasWalletRequest, RegisterGasWalletResponse, TransferToVaultRequest,
WithdrawFeeBalanceRequest, WithdrawFundsRequest, WithdrawGasRequest,
RegisterGasWalletRequest, RegisterGasWalletResponse, ReportActivePeersRequest,
TransferToVaultRequest, WithdrawFeeBalanceRequest, WithdrawFundsRequest, WithdrawGasRequest,
WithdrawToHotWalletRequest,
};
use itertools::Itertools;
Expand Down Expand Up @@ -158,6 +158,19 @@ pub(crate) async fn register_gas_wallet_handler(
Ok(warp::reply::json(&resp))
}

/// Handler for reporting active peers
pub(crate) async fn report_active_peers_handler(
req: ReportActivePeersRequest,
server: Arc<Server>,
) -> Result<Json, warp::Rejection> {
server
.custody_client
.record_active_gas_wallet(req.peers)
.await
.map_err(|e| warp::reject::custom(ApiError::InternalError(e.to_string())))?;
Ok(warp::reply::json(&{}))
}

// --- Hot Wallets --- //

/// Handler for creating a hot wallet
Expand Down
26 changes: 19 additions & 7 deletions funds-manager/funds-manager-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ use error::FundsManagerError;
use ethers::signers::LocalWallet;
use fee_indexer::Indexer;
use funds_manager_api::{
CreateHotWalletRequest, RegisterGasWalletRequest, TransferToVaultRequest,
WithdrawFeeBalanceRequest, WithdrawGasRequest, WithdrawToHotWalletRequest,
GET_DEPOSIT_ADDRESS_ROUTE, GET_FEE_WALLETS_ROUTE, INDEX_FEES_ROUTE, PING_ROUTE,
REDEEM_FEES_ROUTE, REGISTER_GAS_WALLET_ROUTE, TRANSFER_TO_VAULT_ROUTE, WITHDRAW_CUSTODY_ROUTE,
WITHDRAW_FEE_BALANCE_ROUTE, WITHDRAW_GAS_ROUTE, WITHDRAW_TO_HOT_WALLET_ROUTE,
CreateHotWalletRequest, RegisterGasWalletRequest, ReportActivePeersRequest,
TransferToVaultRequest, WithdrawFeeBalanceRequest, WithdrawGasRequest,
WithdrawToHotWalletRequest, GET_DEPOSIT_ADDRESS_ROUTE, GET_FEE_WALLETS_ROUTE, INDEX_FEES_ROUTE,
PING_ROUTE, REDEEM_FEES_ROUTE, REGISTER_GAS_WALLET_ROUTE, REPORT_ACTIVE_PEERS_ROUTE,
TRANSFER_TO_VAULT_ROUTE, WITHDRAW_CUSTODY_ROUTE, WITHDRAW_FEE_BALANCE_ROUTE,
WITHDRAW_GAS_ROUTE, WITHDRAW_TO_HOT_WALLET_ROUTE,
};
use handlers::{
create_gas_wallet_handler, create_hot_wallet_handler, get_deposit_address_handler,
get_fee_wallets_handler, get_hot_wallet_balances_handler, index_fees_handler,
quoter_withdraw_handler, redeem_fees_handler, register_gas_wallet_handler,
transfer_to_vault_handler, withdraw_fee_balance_handler, withdraw_from_vault_handler,
withdraw_gas_handler,
report_active_peers_handler, transfer_to_vault_handler, withdraw_fee_balance_handler,
withdraw_from_vault_handler, withdraw_gas_handler,
};
use middleware::{identity, with_hmac_auth, with_json_body};
use relayer_client::RelayerClient;
Expand Down Expand Up @@ -359,6 +360,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
.and(with_server(server.clone()))
.and_then(register_gas_wallet_handler);

let report_active_peers = warp::post()
.and(warp::path("custody"))
.and(warp::path("gas-wallets"))
.and(warp::path(REPORT_ACTIVE_PEERS_ROUTE))
.and(with_hmac_auth(server.clone()))
.map(with_json_body::<ReportActivePeersRequest>)
.and_then(identity)
.and(with_server(server.clone()))
.and_then(report_active_peers_handler);

// --- Hot Wallets --- //

let create_hot_wallet = warp::post()
Expand Down Expand Up @@ -404,6 +415,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.or(withdraw_custody)
.or(get_deposit_address)
.or(withdraw_gas)
.or(report_active_peers)
.or(register_gas_wallet)
.or(add_gas_wallet)
.or(get_balances)
Expand Down

0 comments on commit 283156e

Please sign in to comment.