From 85034916a5343c0c63ccd63da520417a94d0387e Mon Sep 17 00:00:00 2001 From: Joey Kraut Date: Wed, 17 Jul 2024 17:27:32 -0700 Subject: [PATCH] funds-manager: Use `diesel-async` and add `warp` endpoints --- Cargo.toml | 4 ++ compliance/compliance-server/Cargo.toml | 2 +- funds-manager/Cargo.toml | 9 ++- funds-manager/src/db/mod.rs | 29 ++++++++ funds-manager/src/db/schema.rs | 6 +- funds-manager/src/error.rs | 25 +++++++ funds-manager/src/indexer/index_fees.rs | 6 +- funds-manager/src/indexer/mod.rs | 14 ++-- funds-manager/src/indexer/queries.rs | 59 ++++++++++------ funds-manager/src/indexer/redeem_fees.rs | 26 ++++--- funds-manager/src/main.rs | 90 +++++++++++++++++++++--- 11 files changed, 215 insertions(+), 55 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a88c086..b975b22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,5 +33,9 @@ renegade-circuit-types = { package = "circuit-types", git = "https://github.com/ renegade-crypto = { git = "https://github.com/renegade-fi/renegade.git" } renegade-util = { package = "util", git = "https://github.com/renegade-fi/renegade.git" } +# === Database Dependencies === # +diesel = { version = "2.1" } +diesel-async = { version = "0.4" } + # === Misc Dependencies === # tracing = "0.1" diff --git a/compliance/compliance-server/Cargo.toml b/compliance/compliance-server/Cargo.toml index 33aae9e..b59a2a1 100644 --- a/compliance/compliance-server/Cargo.toml +++ b/compliance/compliance-server/Cargo.toml @@ -10,7 +10,7 @@ warp = "0.3" compliance-api = { path = "../compliance-api" } # === Database === # -diesel = { version = "2.2", features = ["postgres", "r2d2"] } +diesel = { workspace = true, features = ["postgres", "r2d2"] } # === Renegade Dependencies === # renegade-util = { workspace = true } diff --git a/funds-manager/Cargo.toml b/funds-manager/Cargo.toml index 3346be7..1f1bf41 100644 --- a/funds-manager/Cargo.toml +++ b/funds-manager/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "fee-sweeper" +name = "funds-manager" description = "Manages custody of funds for protocol operator" version = "0.1.0" edition = "2021" @@ -11,10 +11,15 @@ http-body-util = "0.1.0" tokio = { version = "1.10", features = ["full"] } warp = "0.3" + # === Infra === # aws-sdk-secretsmanager = "1.37" aws-config = "1.5" -diesel = { version = "2.2", features = ["postgres", "numeric", "uuid"] } +diesel = { workspace = true, features = ["postgres", "numeric", "uuid"] } +diesel-async = { workspace = true, features = ["postgres"] } +native-tls = "0.2" +postgres-native-tls = "0.5" +tokio-postgres = "0.7.7" # === Blockchain Interaction === # alloy-sol-types = "0.3.1" diff --git a/funds-manager/src/db/mod.rs b/funds-manager/src/db/mod.rs index 5402351..37209b1 100644 --- a/funds-manager/src/db/mod.rs +++ b/funds-manager/src/db/mod.rs @@ -1,5 +1,34 @@ //! Database code +use diesel_async::{AsyncConnection, AsyncPgConnection}; +use native_tls::TlsConnector; +use postgres_native_tls::MakeTlsConnector; +use renegade_util::err_str; +use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; + +use crate::error::FundsManagerError; + pub mod models; #[allow(missing_docs)] pub mod schema; + +/// Establish a connection to the database +pub async fn establish_connection(db_url: &str) -> Result { + let connector = TlsConnector::builder() + .danger_accept_invalid_certs(true) + .build() + .map_err(err_str!(FundsManagerError::Db))?; + let connector = MakeTlsConnector::new(connector); + let (client, conn) = tokio_postgres::connect(db_url, connector) + .await + .map_err(err_str!(FundsManagerError::Db))?; + + // Spawn the connection handle in a separate task + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("Connection error: {}", e); + } + }); + + AsyncPgConnection::try_from(client).await.map_err(err_str!(FundsManagerError::Db)) +} diff --git a/funds-manager/src/db/schema.rs b/funds-manager/src/db/schema.rs index af949b2..2363365 100644 --- a/funds-manager/src/db/schema.rs +++ b/funds-manager/src/db/schema.rs @@ -27,4 +27,8 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!(fees, indexing_metadata, wallets,); +diesel::allow_tables_to_appear_in_same_query!( + fees, + indexing_metadata, + wallets, +); diff --git a/funds-manager/src/error.rs b/funds-manager/src/error.rs index b3a29a9..071ab0f 100644 --- a/funds-manager/src/error.rs +++ b/funds-manager/src/error.rs @@ -67,3 +67,28 @@ impl Display for FundsManagerError { } impl Error for FundsManagerError {} impl Reject for FundsManagerError {} + +/// API-specific error type +#[derive(Debug)] +pub enum ApiError { + /// Error during fee indexing + IndexingError(String), + /// Error during fee redemption + RedemptionError(String), + /// Internal server error + InternalError(String), +} + +impl Reject for ApiError {} + +impl Display for ApiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ApiError::IndexingError(e) => write!(f, "Indexing error: {}", e), + ApiError::RedemptionError(e) => write!(f, "Redemption error: {}", e), + ApiError::InternalError(e) => write!(f, "Internal error: {}", e), + } + } +} + +impl Error for ApiError {} diff --git a/funds-manager/src/indexer/index_fees.rs b/funds-manager/src/indexer/index_fees.rs index 7d653e0..dcac217 100644 --- a/funds-manager/src/indexer/index_fees.rs +++ b/funds-manager/src/indexer/index_fees.rs @@ -26,7 +26,7 @@ use crate::Indexer; impl Indexer { /// Index all fees since the given block pub async fn index_fees(&mut self) -> Result<(), FundsManagerError> { - let block_number = self.get_latest_block()?; + let block_number = self.get_latest_block().await?; info!("indexing fees from block {block_number}"); let filter = self @@ -48,7 +48,7 @@ impl Indexer { if block > most_recent_block { most_recent_block = block; - self.update_latest_block(most_recent_block)?; + self.update_latest_block(most_recent_block).await?; } } @@ -86,7 +86,7 @@ impl Indexer { // Otherwise, index the note let fee = NewFee::new_from_note(¬e, tx); - self.insert_fee(fee) + self.insert_fee(fee).await } /// Get a note from a transaction body diff --git a/funds-manager/src/indexer/mod.rs b/funds-manager/src/indexer/mod.rs index 9faa4ca..cd37970 100644 --- a/funds-manager/src/indexer/mod.rs +++ b/funds-manager/src/indexer/mod.rs @@ -2,8 +2,9 @@ use arbitrum_client::{client::ArbitrumClient, constants::Chain}; use aws_config::SdkConfig as AwsConfig; -use diesel::PgConnection; -use renegade_circuit_types::elgamal::{DecryptionKey, EncryptionKey}; +use diesel_async::AsyncPgConnection; +use renegade_circuit_types::elgamal::DecryptionKey; +use renegade_util::hex::jubjub_from_hex_string; use crate::relayer_client::RelayerClient; @@ -24,7 +25,7 @@ pub(crate) struct Indexer { /// The decryption key pub decryption_keys: Vec, /// A connection to the DB - pub db_conn: PgConnection, + pub db_conn: AsyncPgConnection, /// The AWS config pub aws_config: AwsConfig, } @@ -37,7 +38,7 @@ impl Indexer { aws_config: AwsConfig, arbitrum_client: ArbitrumClient, decryption_keys: Vec, - db_conn: PgConnection, + db_conn: AsyncPgConnection, relayer_client: RelayerClient, ) -> Self { Indexer { @@ -53,7 +54,8 @@ impl Indexer { /// Get the decryption key for a given encryption key, referred to as a /// receiver in this context - pub fn get_key_for_receiver(&self, receiver: EncryptionKey) -> Option<&DecryptionKey> { - self.decryption_keys.iter().find(|key| key.public_key() == receiver) + pub fn get_key_for_receiver(&self, receiver: &str) -> Option<&DecryptionKey> { + let key = jubjub_from_hex_string(receiver).ok()?; + self.decryption_keys.iter().find(|k| k.public_key() == key) } } diff --git a/funds-manager/src/indexer/queries.rs b/funds-manager/src/indexer/queries.rs index a5b4174..8eccd82 100644 --- a/funds-manager/src/indexer/queries.rs +++ b/funds-manager/src/indexer/queries.rs @@ -3,14 +3,15 @@ use std::collections::HashMap; use bigdecimal::BigDecimal; -use diesel::define_sql_function; use diesel::deserialize::Queryable; use diesel::deserialize::QueryableByName; +use diesel::sql_function; use diesel::sql_query; use diesel::sql_types::SingleValue; use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text}; use diesel::PgArrayExpressionMethods; -use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; +use diesel::{ExpressionMethods, QueryDsl}; +use diesel_async::RunQueryDsl; use renegade_constants::MAX_BALANCES; use crate::db::models::WalletMetadata; @@ -33,12 +34,12 @@ use super::redeem_fees::MAX_FEES_REDEEMED; pub(crate) const LAST_INDEXED_BLOCK_KEY: &str = "latest_block"; // Define the `array_length` function -define_sql_function! { +sql_function! { /// Calculate the length of an array fn array_length(array: Array, dim: Integer) -> Nullable; } -define_sql_function! { +sql_function! { /// Coalesce a nullable value with a default value fn coalesce(x: Nullable, y: T) -> T; } @@ -75,12 +76,13 @@ impl Indexer { // ------------------ /// Get the latest block number - pub(crate) fn get_latest_block(&mut self) -> Result { + pub(crate) async fn get_latest_block(&mut self) -> Result { let entry = metadata_table .filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY)) .limit(1) - .load(&mut self.db_conn) - .map(|res: Vec| res[0].clone()) + .load::(&mut self.db_conn) + .await + .map(|res| res[0].clone()) .map_err(|_| FundsManagerError::db("failed to query latest block"))?; entry @@ -90,7 +92,7 @@ impl Indexer { } /// Update the latest block number - pub(crate) fn update_latest_block( + pub(crate) async fn update_latest_block( &mut self, block_number: u64, ) -> Result<(), FundsManagerError> { @@ -98,6 +100,7 @@ impl Indexer { diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY)) .set(metadata_value.eq(block_string)) .execute(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to update latest block")) .map(|_| ()) } @@ -107,32 +110,40 @@ impl Indexer { // -------------- /// Insert a fee into the fees table - pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> { + pub(crate) async fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> { diesel::insert_into(fees_table) .values(vec![fee]) .execute(&mut self.db_conn) - .map_err(|_| FundsManagerError::db("failed to insert fee: {}")) + .await + .map_err(|e| FundsManagerError::db(format!("failed to insert fee: {e}"))) .map(|_| ()) } /// Get all mints that have unredeemed fees - pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result, FundsManagerError> { + pub(crate) async fn get_unredeemed_fee_mints( + &mut self, + ) -> Result, FundsManagerError> { let mints = fees_table .select(mint_col) .filter(redeemed_col.eq(false)) .distinct() - .load(&mut self.db_conn) + .load::(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to query unredeemed fees"))?; Ok(mints) } /// Mark a fee as redeemed - pub(crate) fn mark_fee_as_redeemed(&mut self, tx_hash: &str) -> Result<(), FundsManagerError> { + pub(crate) async fn mark_fee_as_redeemed( + &mut self, + tx_hash: &str, + ) -> Result<(), FundsManagerError> { let filter = tx_hash_col.eq(tx_hash); diesel::update(fees_table.filter(filter)) .set(redeemed_col.eq(true)) .execute(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to mark fee as redeemed")) .map(|_| ()) } @@ -140,7 +151,7 @@ impl Indexer { /// Get the most valuable fees to be redeemed /// /// Returns the tx hashes of the most valuable fees to be redeemed - pub(crate) fn get_most_valuable_fees( + pub(crate) async fn get_most_valuable_fees( &mut self, prices: HashMap, ) -> Result, FundsManagerError> { @@ -175,7 +186,8 @@ impl Indexer { // Query for the tx hashes sql_query(query_string) - .load(&mut self.db_conn) + .load::(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to query most valuable fees")) } @@ -186,39 +198,42 @@ impl Indexer { /// Get the wallet managing an mint, if it exists /// /// Returns the id and secret id of the wallet - pub(crate) fn get_wallet_for_mint( + pub(crate) async fn get_wallet_for_mint( &mut self, mint: &str, ) -> Result, FundsManagerError> { let wallets: Vec = wallet_table .filter(managed_mints_col.contains(vec![mint])) - .load(&mut self.db_conn) + .load::(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to query wallet for mint"))?; - Ok(wallets.first().cloned()) + Ok(wallets.into_iter().next()) } /// Find a wallet with an empty balance slot, if one exists - pub(crate) fn find_wallet_with_empty_balance( + pub(crate) async fn find_wallet_with_empty_balance( &mut self, ) -> Result, FundsManagerError> { let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0); let wallets = wallet_table .filter(n_mints.lt(MAX_BALANCES as i32)) - .load(&mut self.db_conn) + .load::(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to query wallets with empty balances"))?; - Ok(wallets.first().cloned()) + Ok(wallets.into_iter().next()) } /// Insert a new wallet into the wallets table - pub(crate) fn insert_wallet( + pub(crate) async fn insert_wallet( &mut self, wallet: WalletMetadata, ) -> Result<(), FundsManagerError> { diesel::insert_into(wallet_table) .values(vec![wallet]) .execute(&mut self.db_conn) + .await .map_err(|_| FundsManagerError::db("failed to insert wallet")) .map(|_| ()) } diff --git a/funds-manager/src/indexer/redeem_fees.rs b/funds-manager/src/indexer/redeem_fees.rs index e71e0c0..f4c0ac4 100644 --- a/funds-manager/src/indexer/redeem_fees.rs +++ b/funds-manager/src/indexer/redeem_fees.rs @@ -9,7 +9,6 @@ use ethers::signers::LocalWallet; use ethers::types::TxHash; use ethers::utils::hex; use renegade_api::http::wallet::RedeemNoteRequest; -use renegade_circuit_types::elgamal::DecryptionKey; use renegade_circuit_types::note::Note; use renegade_common::types::wallet::derivation::{ derive_blinder_seed, derive_share_seed, derive_wallet_id, derive_wallet_keychain, @@ -31,7 +30,7 @@ impl Indexer { info!("redeeming fees..."); // Get all mints that have unredeemed fees - let mints = self.get_unredeemed_fee_mints()?; + let mints = self.get_unredeemed_fee_mints().await?; // Get the prices of each redeemable mint, we want to redeem the most profitable // fees first @@ -46,7 +45,7 @@ impl Indexer { } // Get the most valuable fees and redeem them - let most_valuable_fees = self.get_most_valuable_fees(prices)?; + let most_valuable_fees = self.get_most_valuable_fees(prices).await?; // TODO: Filter by those fees whose present value exceeds the expected gas costs // to redeem @@ -67,9 +66,12 @@ impl Indexer { &mut self, mint: &str, ) -> Result { - let maybe_wallet = self.get_wallet_for_mint(mint)?; - let maybe_wallet = - maybe_wallet.or_else(|| self.find_wallet_with_empty_balance().ok().flatten()); + let maybe_wallet = self.get_wallet_for_mint(mint).await?; + let maybe_wallet = if maybe_wallet.is_none() { + self.find_wallet_with_empty_balance().await? + } else { + maybe_wallet + }; match maybe_wallet { Some(wallet) => Ok(wallet), @@ -92,7 +94,7 @@ impl Indexer { // 3. Add an entry in the wallets table for the newly created wallet let entry = WalletMetadata::empty(wallet_id, secret_name); - self.insert_wallet(entry.clone())?; + self.insert_wallet(entry.clone()).await?; Ok(entry) } @@ -137,11 +139,13 @@ impl Indexer { // Find the note in the tx body let tx_hash = TxHash::from_str(&tx).map_err(err_str!(FundsManagerError::Parse))?; - let receiver = DecryptionKey::from_hex_str(&receiver).unwrap(); - let note = self.get_note_from_tx_with_key(tx_hash, &receiver).await?; + let key = self + .get_key_for_receiver(&receiver) + .ok_or(FundsManagerError::custom("no key found for receiver"))?; + let note = self.get_note_from_tx_with_key(tx_hash, key).await?; // Redeem the note through the relayer - let req = RedeemNoteRequest { note: note.clone(), decryption_key: receiver }; + let req = RedeemNoteRequest { note: note.clone(), decryption_key: *key }; self.relayer_client.redeem_note(wallet.id, req, &root_key).await?; // Mark the fee as redeemed @@ -166,7 +170,7 @@ impl Indexer { } info!("successfully redeemed fee from tx: {}", tx_hash); - self.mark_fee_as_redeemed(tx_hash) + self.mark_fee_as_redeemed(tx_hash).await } // ------------------- diff --git a/funds-manager/src/main.rs b/funds-manager/src/main.rs index ee09215..7d5cd5a 100644 --- a/funds-manager/src/main.rs +++ b/funds-manager/src/main.rs @@ -12,7 +12,7 @@ pub mod indexer; pub mod relayer_client; use aws_config::{BehaviorVersion, Region, SdkConfig}; -use diesel::{pg::PgConnection, Connection}; +use diesel_async::{AsyncConnection, AsyncPgConnection}; use error::FundsManagerError; use ethers::signers::LocalWallet; use indexer::Indexer; @@ -30,8 +30,11 @@ use arbitrum_client::{ constants::Chain, }; use clap::Parser; +use tracing::error; use warp::{reply::Json, Filter}; +use crate::error::ApiError; + // ------------- // | Constants | // ------------- @@ -61,13 +64,13 @@ struct Cli { #[clap(long, default_value = "mainnet")] chain: Chain, /// The fee decryption key to use - #[clap(short, long, env = "RELAYER_DECRYPTION_KEY")] + #[clap(long, env = "RELAYER_DECRYPTION_KEY")] relayer_decryption_key: String, /// The fee decryption key to use for the protocol fees /// /// This argument is not necessary, protocol fee indexing is skipped if this /// is omitted - #[clap(short, long, env = "PROTOCOL_DECRYPTION_KEY")] + #[clap(long, env = "PROTOCOL_DECRYPTION_KEY")] protocol_decryption_key: Option, /// The arbitrum private key used to submit transactions #[clap(long = "pkey", env = "ARBITRUM_PRIVATE_KEY")] @@ -105,9 +108,11 @@ struct Server { impl Server { /// Build an indexer - pub fn build_indexer(&self) -> Result { - let db_conn = - PgConnection::establish(&self.db_url).map_err(err_str!(FundsManagerError::Db))?; + pub async fn build_indexer(&self) -> Result { + let db_conn = db::establish_connection(&self.db_url) + .await + .map_err(err_str!(FundsManagerError::Db))?; + Ok(Indexer::new( self.chain_id, self.chain, @@ -120,7 +125,6 @@ impl Server { } } -/// Main #[tokio::main] async fn main() -> Result<(), Box> { setup_system_logger(LevelFilter::INFO); @@ -161,13 +165,81 @@ async fn main() -> Result<(), Box> { aws_config: config, }; - // Define routes + // --- Routes --- // + let ping = warp::get() .and(warp::path("ping")) .map(|| warp::reply::with_status("PONG", warp::http::StatusCode::OK)); - let routes = ping; + let index_fees = warp::post() + .and(warp::path("index-fees")) + .and(with_server(Arc::new(server.clone()))) + .and_then(index_fees_handler); + + let redeem_fees = warp::post() + .and(warp::path("redeem-fees")) + .and(with_server(Arc::new(server.clone()))) + .and_then(redeem_fees_handler); + + let routes = ping.or(index_fees).or(redeem_fees).recover(handle_rejection); warp::serve(routes).run(([0, 0, 0, 0], cli.port)).await; Ok(()) } + +// ------------ +// | Handlers | +// ------------ + +/// Handler for indexing fees +async fn index_fees_handler(server: Arc) -> Result { + let mut indexer = server + .build_indexer() + .await + .map_err(|e| warp::reject::custom(ApiError::InternalError(e.to_string())))?; + indexer + .index_fees() + .await + .map_err(|e| warp::reject::custom(ApiError::IndexingError(e.to_string())))?; + Ok(warp::reply::json(&"Fees indexed successfully")) +} + +/// Handler for redeeming fees +async fn redeem_fees_handler(server: Arc) -> Result { + let mut indexer = server + .build_indexer() + .await + .map_err(|e| warp::reject::custom(ApiError::InternalError(e.to_string())))?; + indexer + .redeem_fees() + .await + .map_err(|e| warp::reject::custom(ApiError::RedemptionError(e.to_string())))?; + Ok(warp::reply::json(&"Fees redeemed successfully")) +} + +// ----------- +// | Helpers | +// ----------- + +/// Handle a rejection from an +async fn handle_rejection(err: warp::Rejection) -> Result { + if let Some(api_error) = err.find::() { + let (code, message) = match api_error { + ApiError::IndexingError(msg) => (warp::http::StatusCode::BAD_REQUEST, msg), + ApiError::RedemptionError(msg) => (warp::http::StatusCode::BAD_REQUEST, msg), + ApiError::InternalError(msg) => (warp::http::StatusCode::INTERNAL_SERVER_ERROR, msg), + }; + error!("API Error: {:?}", api_error); + Ok(warp::reply::with_status(message.clone(), code)) + } else { + error!("Unhandled rejection: {:?}", err); + Err(err) + } +} + +/// Helper function to clone and pass the server to filters +fn with_server( + server: Arc, +) -> impl Filter,), Error = std::convert::Infallible> + Clone { + warp::any().map(move || server.clone()) +}