diff --git a/Cargo.toml b/Cargo.toml index 61bf799..a88c086 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "compliance/compliance-api", "dealer/renegade-dealer", "dealer/renegade-dealer-api", - "fee-sweeper", + "funds-manager", "price-reporter", ] diff --git a/fee-sweeper/src/indexer/index_fees.rs b/fee-sweeper/src/indexer/index_fees.rs deleted file mode 100644 index b0e4bb3..0000000 --- a/fee-sweeper/src/indexer/index_fees.rs +++ /dev/null @@ -1,125 +0,0 @@ -//! Phase one of the sweeper's execution; index all fees since the last -//! consistent block - -use alloy_sol_types::SolCall; -use arbitrum_client::abi::settleOfflineFeeCall; -use arbitrum_client::{ - abi::NotePostedFilter, constants::SELECTOR_LEN, - helpers::parse_note_ciphertext_from_settle_offline_fee, -}; -use ethers::contract::LogMeta; -use ethers::middleware::Middleware; -use ethers::types::TxHash; -use renegade_circuit_types::elgamal::ElGamalCiphertext; -use renegade_circuit_types::native_helpers::elgamal_decrypt; -use renegade_circuit_types::note::{Note, NOTE_CIPHERTEXT_SIZE}; -use renegade_circuit_types::wallet::NoteCommitment; -use renegade_constants::Scalar; -use renegade_crypto::fields::{scalar_to_biguint, scalar_to_u128, u256_to_scalar}; -use renegade_util::raw_err_str; -use tracing::info; - -use crate::db::models::NewFee; -use crate::Indexer; - -impl Indexer { - /// Index all fees since the given block - pub async fn index_fees(&mut self) -> Result<(), String> { - let block_number = self.get_latest_block()?; - info!("indexing fees from block {block_number}"); - - let filter = self - .arbitrum_client - .get_darkpool_client() - .event::() - .from_block(block_number); - - let events = filter - .query_with_meta() - .await - .map_err(raw_err_str!("failed to create note posted stream: {}"))?; - - let mut most_recent_block = block_number; - for (event, meta) in events { - let block = meta.block_number.as_u64(); - let note_comm = u256_to_scalar(&event.note_commitment); - self.index_note(note_comm, meta).await?; - - if block > most_recent_block { - most_recent_block = block; - self.update_latest_block(most_recent_block)?; - } - } - - Ok(()) - } - - /// Index a note - async fn index_note(&mut self, note_comm: NoteCommitment, meta: LogMeta) -> Result<(), String> { - let note = self.get_note_from_tx(meta.transaction_hash).await?; - let tx = format!("{:#x}", meta.transaction_hash); - if note.commitment() != note_comm { - info!("not receiver, skipping"); - return Ok(()); - } else { - info!("indexing note from tx: {tx}"); - } - - // Check that the note's nullifier has not been spent - let nullifier = note.nullifier(); - if self - .arbitrum_client - .check_nullifier_used(nullifier) - .await - .map_err(raw_err_str!("failed to check nullifier: {}"))? - { - info!("note nullifier already spent, skipping"); - return Ok(()); - } - - // Otherwise, index the note - let fee = NewFee::new_from_note(¬e, tx); - self.insert_fee(fee) - } - - /// Get a note from a transaction body - pub(crate) async fn get_note_from_tx(&self, tx_hash: TxHash) -> Result { - // Parse the note from the tx - let tx = self - .arbitrum_client - .get_darkpool_client() - .client() - .get_transaction(tx_hash) - .await - .map_err(raw_err_str!("failed to query tx: {}"))? - .ok_or_else(|| format!("tx not found: {}", tx_hash))?; - - let calldata: Vec = tx.input.to_vec(); - let selector: [u8; 4] = calldata[..SELECTOR_LEN].try_into().unwrap(); - let encryption = match selector { - ::SELECTOR => { - parse_note_ciphertext_from_settle_offline_fee(&calldata) - .map_err(raw_err_str!("failed to parse ciphertext: {}"))? - }, - sel => return Err(format!("invalid selector when parsing note: {sel:?}")), - }; - - // Decrypt the note - let note = self.decrypt_note(&encryption); - Ok(note) - } - - /// Decrypt a note using the decryption key - fn decrypt_note(&self, note: &ElGamalCiphertext) -> Note { - // The ciphertext stores all note values except the encryption key - let cleartext_values: [Scalar; NOTE_CIPHERTEXT_SIZE] = - elgamal_decrypt(note, &self.decryption_key); - - Note { - mint: scalar_to_biguint(&cleartext_values[0]), - amount: scalar_to_u128(&cleartext_values[1]), - receiver: self.decryption_key.public_key(), - blinder: cleartext_values[2], - } - } -} diff --git a/fee-sweeper/Cargo.toml b/funds-manager/Cargo.toml similarity index 90% rename from fee-sweeper/Cargo.toml rename to funds-manager/Cargo.toml index e7700f6..3346be7 100644 --- a/fee-sweeper/Cargo.toml +++ b/funds-manager/Cargo.toml @@ -1,12 +1,15 @@ [package] name = "fee-sweeper" +description = "Manages custody of funds for protocol operator" version = "0.1.0" edition = "2021" [dependencies] -# === CLI + Runtime === # +# === CLI + Server === # clap = { version = "4.5.3", features = ["derive", "env"] } +http-body-util = "0.1.0" tokio = { version = "1.10", features = ["full"] } +warp = "0.3" # === Infra === # aws-sdk-secretsmanager = "1.37" diff --git a/fee-sweeper/diesel.toml b/funds-manager/diesel.toml similarity index 100% rename from fee-sweeper/diesel.toml rename to funds-manager/diesel.toml diff --git a/fee-sweeper/migrations/.keep b/funds-manager/migrations/.keep similarity index 100% rename from fee-sweeper/migrations/.keep rename to funds-manager/migrations/.keep diff --git a/fee-sweeper/migrations/00000000000000_diesel_initial_setup/down.sql b/funds-manager/migrations/00000000000000_diesel_initial_setup/down.sql similarity index 100% rename from fee-sweeper/migrations/00000000000000_diesel_initial_setup/down.sql rename to funds-manager/migrations/00000000000000_diesel_initial_setup/down.sql diff --git a/fee-sweeper/migrations/00000000000000_diesel_initial_setup/up.sql b/funds-manager/migrations/00000000000000_diesel_initial_setup/up.sql similarity index 100% rename from fee-sweeper/migrations/00000000000000_diesel_initial_setup/up.sql rename to funds-manager/migrations/00000000000000_diesel_initial_setup/up.sql diff --git a/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/down.sql b/funds-manager/migrations/2024-06-15-202249_create_last_indexed_table/down.sql similarity index 100% rename from fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/down.sql rename to funds-manager/migrations/2024-06-15-202249_create_last_indexed_table/down.sql diff --git a/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/up.sql b/funds-manager/migrations/2024-06-15-202249_create_last_indexed_table/up.sql similarity index 100% rename from fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/up.sql rename to funds-manager/migrations/2024-06-15-202249_create_last_indexed_table/up.sql diff --git a/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/down.sql b/funds-manager/migrations/2024-06-15-203503_create_fees_table/down.sql similarity index 100% rename from fee-sweeper/migrations/2024-06-15-203503_create_fees_table/down.sql rename to funds-manager/migrations/2024-06-15-203503_create_fees_table/down.sql diff --git a/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/up.sql b/funds-manager/migrations/2024-06-15-203503_create_fees_table/up.sql similarity index 100% rename from fee-sweeper/migrations/2024-06-15-203503_create_fees_table/up.sql rename to funds-manager/migrations/2024-06-15-203503_create_fees_table/up.sql diff --git a/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/down.sql b/funds-manager/migrations/2024-06-16-003335_create_wallets_table/down.sql similarity index 100% rename from fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/down.sql rename to funds-manager/migrations/2024-06-16-003335_create_wallets_table/down.sql diff --git a/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/up.sql b/funds-manager/migrations/2024-06-16-003335_create_wallets_table/up.sql similarity index 100% rename from fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/up.sql rename to funds-manager/migrations/2024-06-16-003335_create_wallets_table/up.sql diff --git a/fee-sweeper/src/db/mod.rs b/funds-manager/src/db/mod.rs similarity index 100% rename from fee-sweeper/src/db/mod.rs rename to funds-manager/src/db/mod.rs diff --git a/fee-sweeper/src/db/models.rs b/funds-manager/src/db/models.rs similarity index 100% rename from fee-sweeper/src/db/models.rs rename to funds-manager/src/db/models.rs diff --git a/fee-sweeper/src/db/schema.rs b/funds-manager/src/db/schema.rs similarity index 100% rename from fee-sweeper/src/db/schema.rs rename to funds-manager/src/db/schema.rs diff --git a/funds-manager/src/error.rs b/funds-manager/src/error.rs new file mode 100644 index 0000000..b3a29a9 --- /dev/null +++ b/funds-manager/src/error.rs @@ -0,0 +1,69 @@ +//! Error types for the funds manager + +use std::{error::Error, fmt::Display}; + +use warp::reject::Reject; + +/// The error type emitted by the funds manager +#[derive(Debug, Clone)] +pub enum FundsManagerError { + /// An error with the arbitrum client + Arbitrum(String), + /// An error with a database query + Db(String), + /// An error executing an HTTP request + Http(String), + /// An error parsing a value + Parse(String), + /// An error with AWS secrets manager + SecretsManager(String), + /// A miscellaneous error + Custom(String), +} + +impl FundsManagerError { + /// Create an arbitrum error + pub fn arbitrum(msg: T) -> FundsManagerError { + FundsManagerError::Arbitrum(msg.to_string()) + } + + /// Create a database error + pub fn db(msg: T) -> FundsManagerError { + FundsManagerError::Db(msg.to_string()) + } + + /// Create an HTTP error + pub fn http(msg: T) -> FundsManagerError { + FundsManagerError::Http(msg.to_string()) + } + + /// Create a parse error + pub fn parse(msg: T) -> FundsManagerError { + FundsManagerError::Parse(msg.to_string()) + } + + /// Create a secrets manager error + pub fn secrets_manager(msg: T) -> FundsManagerError { + FundsManagerError::SecretsManager(msg.to_string()) + } + + /// Create a custom error + pub fn custom(msg: T) -> FundsManagerError { + FundsManagerError::Custom(msg.to_string()) + } +} + +impl Display for FundsManagerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FundsManagerError::Arbitrum(e) => write!(f, "Arbitrum error: {}", e), + FundsManagerError::Db(e) => write!(f, "Database error: {}", e), + FundsManagerError::Http(e) => write!(f, "HTTP error: {}", e), + FundsManagerError::Parse(e) => write!(f, "Parse error: {}", e), + FundsManagerError::SecretsManager(e) => write!(f, "Secrets manager error: {}", e), + FundsManagerError::Custom(e) => write!(f, "Custom error: {}", e), + } + } +} +impl Error for FundsManagerError {} +impl Reject for FundsManagerError {} diff --git a/funds-manager/src/indexer/index_fees.rs b/funds-manager/src/indexer/index_fees.rs new file mode 100644 index 0000000..7d653e0 --- /dev/null +++ b/funds-manager/src/indexer/index_fees.rs @@ -0,0 +1,182 @@ +//! Phase one of the sweeper's execution; index all fees since the last +//! consistent block + +use alloy_sol_types::SolCall; +use arbitrum_client::abi::settleOfflineFeeCall; +use arbitrum_client::{ + abi::NotePostedFilter, constants::SELECTOR_LEN, + helpers::parse_note_ciphertext_from_settle_offline_fee, +}; +use ethers::contract::LogMeta; +use ethers::middleware::Middleware; +use ethers::types::TxHash; +use renegade_circuit_types::elgamal::{DecryptionKey, ElGamalCiphertext}; +use renegade_circuit_types::native_helpers::elgamal_decrypt; +use renegade_circuit_types::note::{Note, NOTE_CIPHERTEXT_SIZE}; +use renegade_circuit_types::wallet::NoteCommitment; +use renegade_constants::Scalar; +use renegade_crypto::fields::{scalar_to_biguint, scalar_to_u128, u256_to_scalar}; +use renegade_util::err_str; +use tracing::info; + +use crate::db::models::NewFee; +use crate::error::FundsManagerError; +use crate::Indexer; + +impl Indexer { + /// Index all fees since the given block + pub async fn index_fees(&mut self) -> Result<(), FundsManagerError> { + let block_number = self.get_latest_block()?; + info!("indexing fees from block {block_number}"); + + let filter = self + .arbitrum_client + .get_darkpool_client() + .event::() + .from_block(block_number); + + let events = filter + .query_with_meta() + .await + .map_err(|_| FundsManagerError::arbitrum("failed to create note posted stream"))?; + + let mut most_recent_block = block_number; + for (event, meta) in events { + let block = meta.block_number.as_u64(); + let note_comm = u256_to_scalar(&event.note_commitment); + self.index_note(note_comm, meta).await?; + + if block > most_recent_block { + most_recent_block = block; + self.update_latest_block(most_recent_block)?; + } + } + + Ok(()) + } + + /// Index a note + async fn index_note( + &mut self, + note_comm: NoteCommitment, + meta: LogMeta, + ) -> Result<(), FundsManagerError> { + let maybe_note = self.get_note_from_tx(meta.transaction_hash, note_comm).await?; + let tx = format!("{:#x}", meta.transaction_hash); + let note = match maybe_note { + Some(note) => note, + None => { + info!("note not found, skipping"); + return Ok(()); + }, + }; + info!("indexing note from tx: {tx}"); + + // Check that the note's nullifier has not been spent + let nullifier = note.nullifier(); + if self + .arbitrum_client + .check_nullifier_used(nullifier) + .await + .map_err(|_| FundsManagerError::db("failed to check nullifier"))? + { + info!("note nullifier already spent, skipping"); + return Ok(()); + } + + // Otherwise, index the note + let fee = NewFee::new_from_note(¬e, tx); + self.insert_fee(fee) + } + + /// Get a note from a transaction body + /// + /// Checks the note's commitment against the provided commitment, returning + /// `None` if they do not match + pub(crate) async fn get_note_from_tx( + &self, + tx_hash: TxHash, + note_comm: NoteCommitment, + ) -> Result, FundsManagerError> { + // Parse the note from the tx then decrypt it + let cipher = self.get_ciphertext_from_tx(tx_hash).await?; + Ok(self.decrypt_note(&cipher, note_comm)) + } + + /// Get a note from a transaction body using the given key to decrypt it + pub(crate) async fn get_note_from_tx_with_key( + &self, + tx_hash: TxHash, + decryption_key: &DecryptionKey, + ) -> Result { + // Parse the note from the tx the decrypt + let cipher = self.get_ciphertext_from_tx(tx_hash).await?; + Ok(self.decrypt_note_with_key(&cipher, decryption_key)) + } + + /// Get the ciphertext of a note from a tx body + async fn get_ciphertext_from_tx( + &self, + tx_hash: TxHash, + ) -> Result, FundsManagerError> { + let tx = self + .arbitrum_client + .get_darkpool_client() + .client() + .get_transaction(tx_hash) + .await + .map_err(err_str!(FundsManagerError::Arbitrum))? + .ok_or_else(|| FundsManagerError::arbitrum("tx not found"))?; + + let calldata: Vec = tx.input.to_vec(); + let selector: [u8; 4] = calldata[..SELECTOR_LEN].try_into().unwrap(); + let encryption = match selector { + ::SELECTOR => { + parse_note_ciphertext_from_settle_offline_fee(&calldata) + .map_err(err_str!(FundsManagerError::Arbitrum))? + }, + sel => { + return Err(FundsManagerError::arbitrum(format!( + "invalid selector when parsing note: {sel:?}" + ))) + }, + }; + + Ok(encryption) + } + + /// Decrypt a note using the decryption key + /// + /// Checks the decryption against the note's expected commitment, returns + /// `None` if the note does not match for any of the provided key + fn decrypt_note( + &self, + note: &ElGamalCiphertext, + note_comm: NoteCommitment, + ) -> Option { + // The ciphertext stores all note values except the encryption key + for key in self.decryption_keys.iter() { + let note = self.decrypt_note_with_key(note, key); + if note.commitment() == note_comm { + return Some(note); + } + } + + None + } + + /// Decrypt a note using the given key + fn decrypt_note_with_key( + &self, + note: &ElGamalCiphertext, + key: &DecryptionKey, + ) -> Note { + let cleartext_values: [Scalar; NOTE_CIPHERTEXT_SIZE] = elgamal_decrypt(note, key); + Note { + mint: scalar_to_biguint(&cleartext_values[0]), + amount: scalar_to_u128(&cleartext_values[1]), + receiver: key.public_key(), + blinder: cleartext_values[2], + } + } +} diff --git a/fee-sweeper/src/indexer/mod.rs b/funds-manager/src/indexer/mod.rs similarity index 72% rename from fee-sweeper/src/indexer/mod.rs rename to funds-manager/src/indexer/mod.rs index e089d91..9faa4ca 100644 --- a/fee-sweeper/src/indexer/mod.rs +++ b/funds-manager/src/indexer/mod.rs @@ -3,7 +3,7 @@ use arbitrum_client::{client::ArbitrumClient, constants::Chain}; use aws_config::SdkConfig as AwsConfig; use diesel::PgConnection; -use renegade_circuit_types::elgamal::DecryptionKey; +use renegade_circuit_types::elgamal::{DecryptionKey, EncryptionKey}; use crate::relayer_client::RelayerClient; @@ -22,7 +22,7 @@ pub(crate) struct Indexer { /// The Arbitrum client pub arbitrum_client: ArbitrumClient, /// The decryption key - pub decryption_key: DecryptionKey, + pub decryption_keys: Vec, /// A connection to the DB pub db_conn: PgConnection, /// The AWS config @@ -36,7 +36,7 @@ impl Indexer { chain: Chain, aws_config: AwsConfig, arbitrum_client: ArbitrumClient, - decryption_key: DecryptionKey, + decryption_keys: Vec, db_conn: PgConnection, relayer_client: RelayerClient, ) -> Self { @@ -44,10 +44,16 @@ impl Indexer { chain_id, chain, arbitrum_client, - decryption_key, + decryption_keys, db_conn, relayer_client, aws_config, } } + + /// Get the decryption key for a given encryption key, referred to as a + /// receiver in this context + pub fn get_key_for_receiver(&self, receiver: EncryptionKey) -> Option<&DecryptionKey> { + self.decryption_keys.iter().find(|key| key.public_key() == receiver) + } } diff --git a/fee-sweeper/src/indexer/queries.rs b/funds-manager/src/indexer/queries.rs similarity index 77% rename from fee-sweeper/src/indexer/queries.rs rename to funds-manager/src/indexer/queries.rs index 87171f2..a5b4174 100644 --- a/fee-sweeper/src/indexer/queries.rs +++ b/funds-manager/src/indexer/queries.rs @@ -12,7 +12,6 @@ use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text}; use diesel::PgArrayExpressionMethods; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; use renegade_constants::MAX_BALANCES; -use renegade_util::raw_err_str; use crate::db::models::WalletMetadata; use crate::db::models::{Metadata, NewFee}; @@ -25,6 +24,7 @@ use crate::db::schema::{ }, wallets::dsl::{mints as managed_mints_col, wallets as wallet_table}, }; +use crate::error::FundsManagerError; use crate::Indexer; use super::redeem_fees::MAX_FEES_REDEEMED; @@ -56,6 +56,9 @@ pub(crate) struct FeeValue { /// The mint of the fee #[sql_type = "Text"] pub mint: String, + /// The receiver of the mint + #[sql_type = "Text"] + pub receiver: String, /// The value of the fee #[sql_type = "Numeric"] #[allow(unused)] @@ -72,24 +75,30 @@ impl Indexer { // ------------------ /// Get the latest block number - pub(crate) fn get_latest_block(&mut self) -> Result { + pub(crate) fn get_latest_block(&mut self) -> Result { let entry = metadata_table .filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY)) .limit(1) .load(&mut self.db_conn) .map(|res: Vec| res[0].clone()) - .map_err(raw_err_str!("failed to query latest block: {}"))?; + .map_err(|_| FundsManagerError::db("failed to query latest block"))?; - entry.value.parse::().map_err(raw_err_str!("failed to parse latest block: {}")) + entry + .value + .parse::() + .map_err(|_| FundsManagerError::db("could not parse latest block")) } /// Update the latest block number - pub(crate) fn update_latest_block(&mut self, block_number: u64) -> Result<(), String> { + pub(crate) fn update_latest_block( + &mut self, + block_number: u64, + ) -> Result<(), FundsManagerError> { let block_string = block_number.to_string(); diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY)) .set(metadata_value.eq(block_string)) .execute(&mut self.db_conn) - .map_err(raw_err_str!("failed to update latest block: {}")) + .map_err(|_| FundsManagerError::db("failed to update latest block")) .map(|_| ()) } @@ -98,33 +107,33 @@ impl Indexer { // -------------- /// Insert a fee into the fees table - pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), String> { + pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> { diesel::insert_into(fees_table) .values(vec![fee]) .execute(&mut self.db_conn) - .map_err(raw_err_str!("failed to insert fee: {}")) + .map_err(|_| FundsManagerError::db("failed to insert fee: {}")) .map(|_| ()) } /// Get all mints that have unredeemed fees - pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result, String> { + pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result, FundsManagerError> { let mints = fees_table .select(mint_col) .filter(redeemed_col.eq(false)) .distinct() .load(&mut self.db_conn) - .map_err(raw_err_str!("failed to query unredeemed fees: {}"))?; + .map_err(|_| FundsManagerError::db("failed to query unredeemed fees"))?; Ok(mints) } /// Mark a fee as redeemed - pub(crate) fn mark_fee_as_redeemed(&mut self, tx_hash: &str) -> Result<(), String> { + pub(crate) fn mark_fee_as_redeemed(&mut self, tx_hash: &str) -> Result<(), FundsManagerError> { let filter = tx_hash_col.eq(tx_hash); diesel::update(fees_table.filter(filter)) .set(redeemed_col.eq(true)) .execute(&mut self.db_conn) - .map_err(raw_err_str!("failed to mark fee as redeemed: {}")) + .map_err(|_| FundsManagerError::db("failed to mark fee as redeemed")) .map(|_| ()) } @@ -134,8 +143,7 @@ impl Indexer { pub(crate) fn get_most_valuable_fees( &mut self, prices: HashMap, - receiver: &str, - ) -> Result, String> { + ) -> Result, FundsManagerError> { if prices.is_empty() { return Ok(vec![]); } @@ -152,7 +160,7 @@ impl Indexer { // FROM fees // ORDER BY value DESC; let mut query_string = String::new(); - query_string.push_str("SELECT tx_hash, mint, "); + query_string.push_str("SELECT tx_hash, mint, receiver, "); query_string.push_str("CASE "); // Add the cases @@ -160,8 +168,7 @@ impl Indexer { query_string.push_str(&format!("WHEN mint = '{}' then amount * {} ", mint, price)); } query_string.push_str("ELSE 0 END as value "); - query_string - .push_str(&format!("FROM fees WHERE redeemed = false and receiver = '{}'", receiver)); + query_string.push_str("FROM fees WHERE redeemed = false "); // Sort and limit query_string.push_str(&format!("ORDER BY value DESC LIMIT {};", MAX_FEES_REDEEMED)); @@ -169,7 +176,7 @@ impl Indexer { // Query for the tx hashes sql_query(query_string) .load(&mut self.db_conn) - .map_err(raw_err_str!("failed to query most valuable fees: {}")) + .map_err(|_| FundsManagerError::db("failed to query most valuable fees")) } // ----------------- @@ -182,11 +189,11 @@ impl Indexer { pub(crate) fn get_wallet_for_mint( &mut self, mint: &str, - ) -> Result, String> { + ) -> Result, FundsManagerError> { let wallets: Vec = wallet_table .filter(managed_mints_col.contains(vec![mint])) .load(&mut self.db_conn) - .map_err(raw_err_str!("failed to query wallet for mint: {}"))?; + .map_err(|_| FundsManagerError::db("failed to query wallet for mint"))?; Ok(wallets.first().cloned()) } @@ -194,22 +201,25 @@ impl Indexer { /// Find a wallet with an empty balance slot, if one exists pub(crate) fn find_wallet_with_empty_balance( &mut self, - ) -> Result, String> { + ) -> Result, FundsManagerError> { let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0); let wallets = wallet_table .filter(n_mints.lt(MAX_BALANCES as i32)) .load(&mut self.db_conn) - .map_err(raw_err_str!("failed to query wallets with empty balances: {}"))?; + .map_err(|_| FundsManagerError::db("failed to query wallets with empty balances"))?; Ok(wallets.first().cloned()) } /// Insert a new wallet into the wallets table - pub(crate) fn insert_wallet(&mut self, wallet: WalletMetadata) -> Result<(), String> { + pub(crate) fn insert_wallet( + &mut self, + wallet: WalletMetadata, + ) -> Result<(), FundsManagerError> { diesel::insert_into(wallet_table) .values(vec![wallet]) .execute(&mut self.db_conn) - .map_err(raw_err_str!("failed to insert wallet: {}")) + .map_err(|_| FundsManagerError::db("failed to insert wallet")) .map(|_| ()) } } diff --git a/fee-sweeper/src/indexer/redeem_fees.rs b/funds-manager/src/indexer/redeem_fees.rs similarity index 77% rename from fee-sweeper/src/indexer/redeem_fees.rs rename to funds-manager/src/indexer/redeem_fees.rs index ec03ef3..e71e0c0 100644 --- a/fee-sweeper/src/indexer/redeem_fees.rs +++ b/funds-manager/src/indexer/redeem_fees.rs @@ -9,16 +9,17 @@ use ethers::signers::LocalWallet; use ethers::types::TxHash; use ethers::utils::hex; use renegade_api::http::wallet::RedeemNoteRequest; +use renegade_circuit_types::elgamal::DecryptionKey; use renegade_circuit_types::note::Note; use renegade_common::types::wallet::derivation::{ derive_blinder_seed, derive_share_seed, derive_wallet_id, derive_wallet_keychain, }; use renegade_common::types::wallet::{Wallet, WalletIdentifier}; -use renegade_util::hex::jubjub_to_hex_string; -use renegade_util::raw_err_str; +use renegade_util::err_str; use tracing::{info, warn}; use crate::db::models::WalletMetadata; +use crate::error::FundsManagerError; use crate::Indexer; /// The maximum number of fees to redeem in a given run of the indexer @@ -26,7 +27,7 @@ pub(crate) const MAX_FEES_REDEEMED: usize = 20; impl Indexer { /// Redeem the most valuable open fees - pub async fn redeem_fees(&mut self) -> Result<(), String> { + pub async fn redeem_fees(&mut self) -> Result<(), FundsManagerError> { info!("redeeming fees..."); // Get all mints that have unredeemed fees @@ -45,14 +46,13 @@ impl Indexer { } // Get the most valuable fees and redeem them - let recv = jubjub_to_hex_string(&self.decryption_key.public_key()); - let most_valuable_fees = self.get_most_valuable_fees(prices, &recv)?; + let most_valuable_fees = self.get_most_valuable_fees(prices)?; // TODO: Filter by those fees whose present value exceeds the expected gas costs // to redeem for fee in most_valuable_fees.into_iter() { let wallet = self.get_or_create_wallet(&fee.mint).await?; - self.redeem_note_into_wallet(fee.tx_hash.clone(), wallet).await?; + self.redeem_note_into_wallet(fee.tx_hash.clone(), fee.receiver, wallet).await?; } Ok(()) @@ -63,7 +63,10 @@ impl Indexer { // ------------------- /// Find or create a wallet to store balances of a given mint - async fn get_or_create_wallet(&mut self, mint: &str) -> Result { + async fn get_or_create_wallet( + &mut self, + mint: &str, + ) -> Result { let maybe_wallet = self.get_wallet_for_mint(mint)?; let maybe_wallet = maybe_wallet.or_else(|| self.find_wallet_with_empty_balance().ok().flatten()); @@ -80,7 +83,7 @@ impl Indexer { /// Create a new wallet for managing a given mint /// /// Return the new wallet's metadata - async fn create_new_wallet(&mut self) -> Result { + async fn create_new_wallet(&mut self) -> Result { // 1. Create the new wallet on-chain let (wallet_id, root_key) = self.create_renegade_wallet().await?; @@ -95,13 +98,16 @@ impl Indexer { } /// Create a new Renegade wallet on-chain - async fn create_renegade_wallet(&mut self) -> Result<(WalletIdentifier, LocalWallet), String> { + async fn create_renegade_wallet( + &mut self, + ) -> Result<(WalletIdentifier, LocalWallet), FundsManagerError> { let root_key = LocalWallet::new(&mut thread_rng()); - let wallet_id = derive_wallet_id(&root_key)?; - let blinder_seed = derive_blinder_seed(&root_key)?; - let share_seed = derive_share_seed(&root_key)?; - let key_chain = derive_wallet_keychain(&root_key, self.chain_id)?; + let wallet_id = derive_wallet_id(&root_key).map_err(FundsManagerError::custom)?; + let blinder_seed = derive_blinder_seed(&root_key).map_err(FundsManagerError::custom)?; + let share_seed = derive_share_seed(&root_key).map_err(FundsManagerError::custom)?; + let key_chain = + derive_wallet_keychain(&root_key, self.chain_id).map_err(FundsManagerError::custom)?; let wallet = Wallet::new_empty_wallet(wallet_id, blinder_seed, share_seed, key_chain); self.relayer_client.create_new_wallet(wallet).await?; @@ -118,8 +124,9 @@ impl Indexer { pub async fn redeem_note_into_wallet( &mut self, tx: String, + receiver: String, wallet: WalletMetadata, - ) -> Result { + ) -> Result { info!("redeeming fee into {}", wallet.id); // Get the wallet key for the given wallet let eth_key = self.get_wallet_private_key(&wallet).await?; @@ -129,11 +136,12 @@ impl Indexer { self.relayer_client.check_wallet_indexed(wallet.id, self.chain_id, ð_key).await?; // Find the note in the tx body - let tx_hash = TxHash::from_str(&tx).map_err(raw_err_str!("invalid tx hash: {}"))?; - let note = self.get_note_from_tx(tx_hash).await?; + let tx_hash = TxHash::from_str(&tx).map_err(err_str!(FundsManagerError::Parse))?; + let receiver = DecryptionKey::from_hex_str(&receiver).unwrap(); + let note = self.get_note_from_tx_with_key(tx_hash, &receiver).await?; // Redeem the note through the relayer - let req = RedeemNoteRequest { note: note.clone(), decryption_key: self.decryption_key }; + let req = RedeemNoteRequest { note: note.clone(), decryption_key: receiver }; self.relayer_client.redeem_note(wallet.id, req, &root_key).await?; // Mark the fee as redeemed @@ -142,13 +150,17 @@ impl Indexer { } /// Mark a fee as redeemed if its nullifier is spent on-chain - async fn maybe_mark_redeemed(&mut self, tx_hash: &str, note: &Note) -> Result<(), String> { + async fn maybe_mark_redeemed( + &mut self, + tx_hash: &str, + note: &Note, + ) -> Result<(), FundsManagerError> { let nullifier = note.nullifier(); if !self .arbitrum_client .check_nullifier_used(nullifier) .await - .map_err(raw_err_str!("failed to check nullifier: {}"))? + .map_err(err_str!(FundsManagerError::Arbitrum))? { return Ok(()); } @@ -169,7 +181,7 @@ impl Indexer { &mut self, id: WalletIdentifier, wallet: LocalWallet, - ) -> Result { + ) -> Result { let client = SecretsManagerClient::new(&self.aws_config); let secret_name = format!("redemption-wallet-{}-{id}", self.chain); let secret_val = hex::encode(wallet.signer().to_bytes()); @@ -185,7 +197,7 @@ impl Indexer { .description("Wallet used for fee redemption") .send() .await - .map_err(raw_err_str!("Error creating secret: {}"))?; + .map_err(err_str!(FundsManagerError::SecretsManager))?; Ok(secret_name) } @@ -194,7 +206,7 @@ impl Indexer { async fn get_wallet_private_key( &mut self, metadata: &WalletMetadata, - ) -> Result { + ) -> Result { let client = SecretsManagerClient::new(&self.aws_config); let secret_name = format!("redemption-wallet-{}-{}", self.chain, metadata.id); @@ -203,11 +215,11 @@ impl Indexer { .secret_id(secret_name) .send() .await - .map_err(raw_err_str!("Error fetching secret: {}"))?; + .map_err(err_str!(FundsManagerError::SecretsManager))?; let secret_str = secret.secret_string().unwrap(); let wallet = - LocalWallet::from_str(secret_str).map_err(raw_err_str!("Invalid wallet secret: {}"))?; + LocalWallet::from_str(secret_str).map_err(err_str!(FundsManagerError::Parse))?; Ok(wallet) } } diff --git a/fee-sweeper/src/main.rs b/funds-manager/src/main.rs similarity index 50% rename from fee-sweeper/src/main.rs rename to funds-manager/src/main.rs index 65da756..ee09215 100644 --- a/fee-sweeper/src/main.rs +++ b/funds-manager/src/main.rs @@ -7,27 +7,30 @@ #![feature(trivial_bounds)] pub mod db; +pub mod error; pub mod indexer; pub mod relayer_client; -use aws_config::{BehaviorVersion, Region}; +use aws_config::{BehaviorVersion, Region, SdkConfig}; use diesel::{pg::PgConnection, Connection}; +use error::FundsManagerError; use ethers::signers::LocalWallet; use indexer::Indexer; use relayer_client::RelayerClient; use renegade_circuit_types::elgamal::DecryptionKey; use renegade_util::{ - raw_err_str, + err_str, raw_err_str, telemetry::{setup_system_logger, LevelFilter}, }; -use std::{error::Error, str::FromStr}; +use std::{error::Error, str::FromStr, sync::Arc}; use arbitrum_client::{ client::{ArbitrumClient, ArbitrumClientConfig}, constants::Chain, }; use clap::Parser; +use warp::{reply::Json, Filter}; // ------------- // | Constants | @@ -43,13 +46,13 @@ const DEFAULT_REGION: &str = "us-east-2"; // ------- /// The cli for the fee sweeper -#[derive(Debug, Parser)] +#[derive(Clone, Debug, Parser)] struct Cli { /// The URL of the relayer to use #[clap(long)] relayer_url: String, /// The Arbitrum RPC url to use - #[clap(short, long)] + #[clap(short, long, env = "RPC_URL")] rpc_url: String, /// The address of the darkpool contract #[clap(short = 'a', long)] @@ -58,24 +61,62 @@ struct Cli { #[clap(long, default_value = "mainnet")] chain: Chain, /// The fee decryption key to use - #[clap(short, long)] - decryption_key: String, + #[clap(short, long, env = "RELAYER_DECRYPTION_KEY")] + relayer_decryption_key: String, + /// The fee decryption key to use for the protocol fees + /// + /// This argument is not necessary, protocol fee indexing is skipped if this + /// is omitted + #[clap(short, long, env = "PROTOCOL_DECRYPTION_KEY")] + protocol_decryption_key: Option, /// The arbitrum private key used to submit transactions - #[clap(long = "pkey")] + #[clap(long = "pkey", env = "ARBITRUM_PRIVATE_KEY")] arbitrum_private_key: String, /// The database url - #[clap(long)] + #[clap(long, env = "DATABASE_URL")] db_url: String, /// The token address of the USDC token, used to get prices for fee /// redemption #[clap(long)] usdc_mint: String, + /// The port to run the server on + #[clap(long, default_value = "3000")] + port: u16, +} + +/// The server +#[derive(Clone)] +struct Server { + /// The id of the chain this indexer targets + pub chain_id: u64, + /// The chain this indexer targets + pub chain: Chain, + /// A client for interacting with the relayer + pub relayer_client: RelayerClient, + /// The Arbitrum client + pub arbitrum_client: ArbitrumClient, + /// The decryption key + pub decryption_keys: Vec, + /// The DB url + pub db_url: String, + /// The AWS config + pub aws_config: SdkConfig, } -impl Cli { - /// Build a connection to the DB - pub fn build_db_conn(&self) -> Result { - PgConnection::establish(&self.db_url).map_err(|e| e.to_string()) +impl Server { + /// Build an indexer + pub fn build_indexer(&self) -> Result { + let db_conn = + PgConnection::establish(&self.db_url).map_err(err_str!(FundsManagerError::Db))?; + Ok(Indexer::new( + self.chain_id, + self.chain, + self.aws_config.clone(), + self.arbitrum_client.clone(), + self.decryption_keys.clone(), + db_conn, + self.relayer_client.clone(), + )) } } @@ -84,7 +125,6 @@ impl Cli { async fn main() -> Result<(), Box> { setup_system_logger(LevelFilter::INFO); let cli = Cli::parse(); - let db_conn = cli.build_db_conn()?; // Parse an AWS config let config = aws_config::defaults(BehaviorVersion::latest()) @@ -105,15 +145,29 @@ async fn main() -> Result<(), Box> { let chain_id = client.chain_id().await.map_err(raw_err_str!("Error fetching chain ID: {}"))?; // Build the indexer - let key = DecryptionKey::from_hex_str(&cli.decryption_key)?; + let mut decryption_keys = vec![DecryptionKey::from_hex_str(&cli.relayer_decryption_key)?]; + if let Some(protocol_key) = cli.protocol_decryption_key { + decryption_keys.push(DecryptionKey::from_hex_str(&protocol_key)?); + } + let relayer_client = RelayerClient::new(&cli.relayer_url, &cli.usdc_mint); - let mut indexer = - Indexer::new(chain_id, cli.chain, config, client, key, db_conn, relayer_client); + let server = Server { + chain_id, + chain: cli.chain, + relayer_client: relayer_client.clone(), + arbitrum_client: client.clone(), + decryption_keys, + db_url: cli.db_url, + aws_config: config, + }; + + // Define routes + let ping = warp::get() + .and(warp::path("ping")) + .map(|| warp::reply::with_status("PONG", warp::http::StatusCode::OK)); - // 1. Index all new fees in the DB - indexer.index_fees().await?; - // 2. Redeem fees according to the redemption policy - indexer.redeem_fees().await?; + let routes = ping; + warp::serve(routes).run(([0, 0, 0, 0], cli.port)).await; Ok(()) } diff --git a/fee-sweeper/src/relayer_client.rs b/funds-manager/src/relayer_client.rs similarity index 83% rename from fee-sweeper/src/relayer_client.rs rename to funds-manager/src/relayer_client.rs index 1d7b12d..9e229fd 100644 --- a/fee-sweeper/src/relayer_client.rs +++ b/funds-manager/src/relayer_client.rs @@ -32,18 +32,21 @@ use renegade_common::types::{ }, }; use renegade_crypto::fields::scalar_to_biguint; -use renegade_util::{get_current_time_millis, raw_err_str}; +use renegade_util::{err_str, get_current_time_millis}; use reqwest::{Body, Client}; use serde::{Deserialize, Serialize}; use tracing::warn; use uuid::Uuid; +use crate::error::FundsManagerError; + /// The interval at which to poll relayer task status const POLL_INTERVAL_MS: u64 = 1000; /// The amount of time (ms) to declare a wallet signature value for const SIG_EXPIRATION_BUFFER_MS: u64 = 5000; /// A client for interacting with a configured relayer +#[derive(Clone)] pub struct RelayerClient { /// The base URL of the relayer base_url: String, @@ -58,7 +61,7 @@ impl RelayerClient { } /// Get the price for a given mint - pub async fn get_binance_price(&self, mint: &str) -> Result, String> { + pub async fn get_binance_price(&self, mint: &str) -> Result, FundsManagerError> { if mint == self.usdc_mint { return Ok(Some(1.0)); } @@ -88,7 +91,7 @@ impl RelayerClient { wallet_id: WalletIdentifier, chain_id: u64, eth_key: &LocalWallet, - ) -> Result<(), String> { + ) -> Result<(), FundsManagerError> { let mut path = GET_WALLET_ROUTE.to_string(); path = path.replace(":wallet_id", &wallet_id.to_string()); @@ -103,7 +106,11 @@ impl RelayerClient { } /// Lookup a wallet in the configured relayer - async fn lookup_wallet(&self, chain_id: u64, eth_key: &LocalWallet) -> Result<(), String> { + async fn lookup_wallet( + &self, + chain_id: u64, + eth_key: &LocalWallet, + ) -> Result<(), FundsManagerError> { let path = FIND_WALLET_ROUTE.to_string(); let wallet_id = derive_wallet_id(eth_key).unwrap(); let blinder_seed = derive_blinder_seed(eth_key).unwrap(); @@ -123,7 +130,7 @@ impl RelayerClient { } /// Create a new wallet via the configured relayer - pub(crate) async fn create_new_wallet(&self, wallet: Wallet) -> Result<(), String> { + pub(crate) async fn create_new_wallet(&self, wallet: Wallet) -> Result<(), FundsManagerError> { let body = CreateWalletRequest { wallet: wallet.into() }; let resp: CreateWalletResponse = self.post_relayer(CREATE_WALLET_ROUTE, &body).await?; @@ -136,7 +143,7 @@ impl RelayerClient { wallet_id: WalletIdentifier, req: RedeemNoteRequest, root_key: &SecretSigningKey, - ) -> Result<(), String> { + ) -> Result<(), FundsManagerError> { let mut path = REDEEM_NOTE_ROUTE.to_string(); path = path.replace(":wallet_id", &wallet_id.to_string()); @@ -149,7 +156,11 @@ impl RelayerClient { // ----------- /// Post to the relayer URL - async fn post_relayer(&self, path: &str, body: &Req) -> Result + async fn post_relayer( + &self, + path: &str, + body: &Req, + ) -> Result where Req: Serialize, Resp: for<'de> Deserialize<'de>, @@ -163,14 +174,14 @@ impl RelayerClient { path: &str, body: &Req, root_key: &SecretSigningKey, - ) -> Result + ) -> Result where Req: Serialize, Resp: for<'de> Deserialize<'de>, { - let body_ser = - serde_json::to_vec(body).map_err(raw_err_str!("Failed to serialize body: {}"))?; - let headers = build_auth_headers(root_key, &body_ser)?; + let body_ser = serde_json::to_vec(body).map_err(err_str!(FundsManagerError::Custom))?; + let headers = + build_auth_headers(root_key, &body_ser).map_err(err_str!(FundsManagerError::custom))?; self.post_relayer_with_headers(path, body, &headers).await } @@ -180,7 +191,7 @@ impl RelayerClient { path: &str, body: &Req, headers: &HeaderMap, - ) -> Result + ) -> Result where Req: Serialize, Resp: for<'de> Deserialize<'de>, @@ -194,18 +205,21 @@ impl RelayerClient { .headers(headers.clone()) .send() .await - .map_err(raw_err_str!("Failed to send request: {}"))?; + .map_err(err_str!(FundsManagerError::Http))?; // Deserialize the response if !resp.status().is_success() { - return Err(format!("Failed to send request: {}", resp.status())); + return Err(FundsManagerError::http(format!( + "Failed to send request: {}", + resp.status() + ))); } - resp.json::().await.map_err(raw_err_str!("Failed to parse response: {}")) + resp.json::().await.map_err(err_str!(FundsManagerError::Parse)) } /// Get from the relayer URL - async fn get_relayer(&self, path: &str) -> Result + async fn get_relayer(&self, path: &str) -> Result where Resp: for<'de> Deserialize<'de>, { @@ -217,11 +231,12 @@ impl RelayerClient { &self, path: &str, root_key: &SecretSigningKey, - ) -> Result + ) -> Result where Resp: for<'de> Deserialize<'de>, { - let headers = build_auth_headers(root_key, &[])?; + let headers = + build_auth_headers(root_key, &[]).map_err(err_str!(FundsManagerError::Custom))?; self.get_relayer_with_headers(path, &headers).await } @@ -230,7 +245,7 @@ impl RelayerClient { &self, path: &str, headers: &HeaderMap, - ) -> Result + ) -> Result where Resp: for<'de> Deserialize<'de>, { @@ -241,18 +256,21 @@ impl RelayerClient { .headers(headers.clone()) .send() .await - .map_err(raw_err_str!("Failed to get relayer path: {}"))?; + .map_err(err_str!(FundsManagerError::Http))?; // Parse the response if !resp.status().is_success() { - return Err(format!("Failed to get relayer path: {}", resp.status())); + return Err(FundsManagerError::http(format!( + "Failed to get relayer path: {}", + resp.status() + ))); } - resp.json::().await.map_err(raw_err_str!("Failed to parse response: {}")) + resp.json::().await.map_err(err_str!(FundsManagerError::Parse)) } /// Await a relayer task - async fn await_relayer_task(&self, task_id: Uuid) -> Result<(), String> { + async fn await_relayer_task(&self, task_id: Uuid) -> Result<(), FundsManagerError> { let mut path = GET_TASK_STATUS_ROUTE.to_string(); path = path.replace(":task_id", &task_id.to_string()); @@ -279,11 +297,11 @@ impl RelayerClient { // ----------- /// Build a reqwest client -fn reqwest_client() -> Result { +fn reqwest_client() -> Result { Client::builder() .user_agent("fee-sweeper") .build() - .map_err(raw_err_str!("Failed to create reqwest client: {}")) + .map_err(|_| FundsManagerError::custom("Failed to create reqwest client")) } /// Build authentication headers for a request