diff --git a/fee-sweeper/src/src/db/mod.rs b/fee-sweeper/src/src/db/mod.rs new file mode 100644 index 0000000..5402351 --- /dev/null +++ b/fee-sweeper/src/src/db/mod.rs @@ -0,0 +1,5 @@ +//! Database code + +pub mod models; +#[allow(missing_docs)] +pub mod schema; diff --git a/fee-sweeper/src/src/db/models.rs b/fee-sweeper/src/src/db/models.rs new file mode 100644 index 0000000..5ad7bfa --- /dev/null +++ b/fee-sweeper/src/src/db/models.rs @@ -0,0 +1,88 @@ +#![allow(missing_docs)] +#![allow(trivial_bounds)] + +use bigdecimal::BigDecimal; +use diesel::prelude::*; +use num_bigint::BigInt; +use renegade_circuit_types::note::Note; +use renegade_crypto::fields::scalar_to_bigint; +use renegade_util::hex::{biguint_to_hex_addr, jubjub_to_hex_string}; +use uuid::Uuid; + +use crate::db::schema::fees; + +/// A fee that has been indexed by the indexer +#[derive(Queryable, Selectable)] +#[diesel(table_name = crate::db::schema::fees)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[allow(missing_docs, clippy::missing_docs_in_private_items)] +pub struct Fee { + pub id: i32, + pub tx_hash: String, + pub mint: String, + pub amount: BigDecimal, + pub blinder: BigDecimal, + pub receiver: String, + pub redeemed: bool, +} + +/// A new fee inserted into the database +#[derive(Insertable)] +#[diesel(table_name = fees)] +pub struct NewFee { + pub tx_hash: String, + pub mint: String, + pub amount: BigDecimal, + pub blinder: BigDecimal, + pub receiver: String, +} + +impl NewFee { + /// Construct a fee from a note + pub fn new_from_note(note: &Note, tx_hash: String) -> Self { + let mint = biguint_to_hex_addr(¬e.mint); + let amount = BigInt::from(note.amount).into(); + let blinder = scalar_to_bigint(¬e.blinder).into(); + let receiver = jubjub_to_hex_string(¬e.receiver); + + NewFee { + tx_hash, + mint, + amount, + blinder, + receiver, + } + } +} + +/// Metadata information maintained by the indexer +#[derive(Clone, Queryable, Selectable)] +#[diesel(table_name = crate::db::schema::indexing_metadata)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[allow(missing_docs, clippy::missing_docs_in_private_items)] +pub struct Metadata { + pub key: String, + pub value: String, +} + +/// A metadata entry for a wallet managed by the indexer +#[derive(Clone, Queryable, Selectable, Insertable)] +#[diesel(table_name = crate::db::schema::wallets)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[allow(missing_docs, clippy::missing_docs_in_private_items)] +pub struct WalletMetadata { + pub id: Uuid, + pub mints: Vec>, + pub secret_id: String, +} + +impl WalletMetadata { + /// Construct a new wallet metadata entry + pub fn empty(id: Uuid, secret_id: String) -> Self { + WalletMetadata { + id, + mints: vec![], + secret_id, + } + } +} diff --git a/fee-sweeper/src/src/db/schema.rs b/fee-sweeper/src/src/db/schema.rs new file mode 100644 index 0000000..2363365 --- /dev/null +++ b/fee-sweeper/src/src/db/schema.rs @@ -0,0 +1,34 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + fees (id) { + id -> Int4, + tx_hash -> Text, + mint -> Text, + amount -> Numeric, + blinder -> Numeric, + receiver -> Text, + redeemed -> Bool, + } +} + +diesel::table! { + indexing_metadata (key) { + key -> Text, + value -> Text, + } +} + +diesel::table! { + wallets (id) { + id -> Uuid, + mints -> Array>, + secret_id -> Text, + } +} + +diesel::allow_tables_to_appear_in_same_query!( + fees, + indexing_metadata, + wallets, +); diff --git a/fee-sweeper/src/src/indexer/index_fees.rs b/fee-sweeper/src/src/indexer/index_fees.rs new file mode 100644 index 0000000..b8c1f02 --- /dev/null +++ b/fee-sweeper/src/src/indexer/index_fees.rs @@ -0,0 +1,124 @@ +//! Phase one of the sweeper's execution; index all fees since the last consistent block + +use alloy_sol_types::SolCall; +use arbitrum_client::abi::settleOfflineFeeCall; +use arbitrum_client::{ + abi::NotePostedFilter, constants::SELECTOR_LEN, + helpers::parse_note_ciphertext_from_settle_offline_fee, +}; +use ethers::contract::LogMeta; +use ethers::middleware::Middleware; +use ethers::types::TxHash; +use renegade_circuit_types::elgamal::ElGamalCiphertext; +use renegade_circuit_types::native_helpers::elgamal_decrypt; +use renegade_circuit_types::note::{Note, NOTE_CIPHERTEXT_SIZE}; +use renegade_circuit_types::wallet::NoteCommitment; +use renegade_constants::Scalar; +use renegade_crypto::fields::{scalar_to_biguint, scalar_to_u128, u256_to_scalar}; +use renegade_util::raw_err_str; +use tracing::info; + +use crate::db::models::NewFee; +use crate::Indexer; + +impl Indexer { + /// Index all fees since the given block + pub async fn index_fees(&mut self) -> Result<(), String> { + let block_number = self.get_latest_block()?; + info!("indexing fees from block {block_number}"); + + let filter = self + .arbitrum_client + .get_darkpool_client() + .event::() + .from_block(block_number); + + let events = filter + .query_with_meta() + .await + .map_err(raw_err_str!("failed to create note posted stream: {}"))?; + + let mut most_recent_block = block_number; + for (event, meta) in events { + let block = meta.block_number.as_u64(); + let note_comm = u256_to_scalar(&event.note_commitment); + self.index_note(note_comm, meta).await?; + + if block > most_recent_block { + most_recent_block = block; + self.update_latest_block(most_recent_block)?; + } + } + + Ok(()) + } + + /// Index a note + async fn index_note(&mut self, note_comm: NoteCommitment, meta: LogMeta) -> Result<(), String> { + let note = self.get_note_from_tx(meta.transaction_hash).await?; + let tx = format!("{:#x}", meta.transaction_hash); + if note.commitment() != note_comm { + info!("not receiver, skipping"); + return Ok(()); + } else { + info!("indexing note from tx: {tx}"); + } + + // Check that the note's nullifier has not been spent + let nullifier = note.nullifier(); + if self + .arbitrum_client + .check_nullifier_used(nullifier) + .await + .map_err(raw_err_str!("failed to check nullifier: {}"))? + { + info!("note nullifier already spent, skipping"); + return Ok(()); + } + + // Otherwise, index the note + let fee = NewFee::new_from_note(¬e, tx); + self.insert_fee(fee) + } + + /// Get a note from a transaction body + pub(crate) async fn get_note_from_tx(&self, tx_hash: TxHash) -> Result { + // Parse the note from the tx + let tx = self + .arbitrum_client + .get_darkpool_client() + .client() + .get_transaction(tx_hash) + .await + .map_err(raw_err_str!("failed to query tx: {}"))? + .ok_or_else(|| format!("tx not found: {}", tx_hash))?; + + let calldata: Vec = tx.input.to_vec(); + let selector: [u8; 4] = calldata[..SELECTOR_LEN].try_into().unwrap(); + let encryption = match selector { + ::SELECTOR => { + parse_note_ciphertext_from_settle_offline_fee(&calldata) + .map_err(raw_err_str!("failed to parse ciphertext: {}"))? + } + sel => return Err(format!("invalid selector when parsing note: {sel:?}")), + }; + + // Decrypt the note + let note = self.decrypt_note(&encryption); + Ok(note) + } + + /// Decrypt a note using the decryption key + fn decrypt_note(&self, note: &ElGamalCiphertext) -> Note { + // The ciphertext stores all note values except the encryption key + let cleartext_values: [Scalar; NOTE_CIPHERTEXT_SIZE] = + elgamal_decrypt(note, &self.decryption_key); + + Note { + mint: scalar_to_biguint(&cleartext_values[0]), + amount: scalar_to_u128(&cleartext_values[1]), + receiver: self.decryption_key.public_key(), + blinder: cleartext_values[2], + } + } +} diff --git a/fee-sweeper/src/src/indexer/mod.rs b/fee-sweeper/src/src/indexer/mod.rs new file mode 100644 index 0000000..e089d91 --- /dev/null +++ b/fee-sweeper/src/src/indexer/mod.rs @@ -0,0 +1,53 @@ +//! The indexer handles the indexing and redemption of fee notes + +use arbitrum_client::{client::ArbitrumClient, constants::Chain}; +use aws_config::SdkConfig as AwsConfig; +use diesel::PgConnection; +use renegade_circuit_types::elgamal::DecryptionKey; + +use crate::relayer_client::RelayerClient; + +pub mod index_fees; +pub mod queries; +pub mod redeem_fees; + +/// Stores the dependencies needed to index the chain +pub(crate) struct Indexer { + /// The id of the chain this indexer targets + pub chain_id: u64, + /// The chain this indexer targets + pub chain: Chain, + /// A client for interacting with the relayer + pub relayer_client: RelayerClient, + /// The Arbitrum client + pub arbitrum_client: ArbitrumClient, + /// The decryption key + pub decryption_key: DecryptionKey, + /// A connection to the DB + pub db_conn: PgConnection, + /// The AWS config + pub aws_config: AwsConfig, +} + +impl Indexer { + /// Constructor + pub fn new( + chain_id: u64, + chain: Chain, + aws_config: AwsConfig, + arbitrum_client: ArbitrumClient, + decryption_key: DecryptionKey, + db_conn: PgConnection, + relayer_client: RelayerClient, + ) -> Self { + Indexer { + chain_id, + chain, + arbitrum_client, + decryption_key, + db_conn, + relayer_client, + aws_config, + } + } +} diff --git a/fee-sweeper/src/src/indexer/queries.rs b/fee-sweeper/src/src/indexer/queries.rs new file mode 100644 index 0000000..6f162b4 --- /dev/null +++ b/fee-sweeper/src/src/indexer/queries.rs @@ -0,0 +1,222 @@ +//! Groups query logic for the indexer + +use std::collections::HashMap; + +use bigdecimal::BigDecimal; +use diesel::define_sql_function; +use diesel::deserialize::Queryable; +use diesel::deserialize::QueryableByName; +use diesel::sql_query; +use diesel::sql_types::SingleValue; +use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text}; +use diesel::PgArrayExpressionMethods; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; +use renegade_constants::MAX_BALANCES; +use renegade_util::raw_err_str; + +use crate::db::models::WalletMetadata; +use crate::db::models::{Metadata, NewFee}; +use crate::db::schema::{ + fees::dsl::{ + fees as fees_table, mint as mint_col, redeemed as redeemed_col, tx_hash as tx_hash_col, + }, + indexing_metadata::dsl::{ + indexing_metadata as metadata_table, key as metadata_key, value as metadata_value, + }, + wallets::dsl::{mints as managed_mints_col, wallets as wallet_table}, +}; +use crate::Indexer; + +use super::redeem_fees::MAX_FEES_REDEEMED; + +/// The metadata key for the last indexed block +pub(crate) const LAST_INDEXED_BLOCK_KEY: &str = "latest_block"; + +// Define the `array_length` function +define_sql_function! { + /// Calculate the length of an array + fn array_length(array: Array, dim: Integer) -> Nullable; +} + +define_sql_function! { + /// Coalesce a nullable value with a default value + fn coalesce(x: Nullable, y: T) -> T; +} + +// --------------- +// | Query Types | +// --------------- + +/// A sub-query of the most valuable fees to be redeemed +#[derive(Debug, Queryable, QueryableByName)] +pub(crate) struct FeeValue { + /// The tx hash of the fee + #[sql_type = "Text"] + pub tx_hash: String, + /// The mint of the fee + #[sql_type = "Text"] + pub mint: String, + /// The value of the fee + #[sql_type = "Numeric"] + #[allow(unused)] + pub value: BigDecimal, +} + +// ------------------------- +// | Query Implementations | +// ------------------------- + +impl Indexer { + // ------------------ + // | Metadata Table | + // ------------------ + + /// Get the latest block number + pub(crate) fn get_latest_block(&mut self) -> Result { + let entry = metadata_table + .filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY)) + .limit(1) + .load(&mut self.db_conn) + .map(|res: Vec| res[0].clone()) + .map_err(raw_err_str!("failed to query latest block: {}"))?; + + entry + .value + .parse::() + .map_err(raw_err_str!("failed to parse latest block: {}")) + } + + /// Update the latest block number + pub(crate) fn update_latest_block(&mut self, block_number: u64) -> Result<(), String> { + let block_string = block_number.to_string(); + diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY)) + .set(metadata_value.eq(block_string)) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to update latest block: {}")) + .map(|_| ()) + } + + // -------------- + // | Fees Table | + // -------------- + + /// Insert a fee into the fees table + pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), String> { + diesel::insert_into(fees_table) + .values(vec![fee]) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to insert fee: {}")) + .map(|_| ()) + } + + /// Get all mints that have unredeemed fees + pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result, String> { + let mints = fees_table + .select(mint_col) + .filter(redeemed_col.eq(false)) + .distinct() + .load(&mut self.db_conn) + .map_err(raw_err_str!("failed to query unredeemed fees: {}"))?; + + Ok(mints) + } + + /// Mark a fee as redeemed + pub(crate) fn mark_fee_as_redeemed(&mut self, tx_hash: &str) -> Result<(), String> { + let filter = tx_hash_col.eq(tx_hash); + diesel::update(fees_table.filter(filter)) + .set(redeemed_col.eq(true)) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to mark fee as redeemed: {}")) + .map(|_| ()) + } + + /// Get the most valuable fees to be redeemed + /// + /// Returns the tx hashes of the most valuable fees to be redeemed + pub(crate) fn get_most_valuable_fees( + &mut self, + prices: HashMap, + receiver: &str, + ) -> Result, String> { + if prices.is_empty() { + return Ok(vec![]); + } + + // We query the fees table with a transformation that calculates the value of each fee using the prices passed in. + // This query looks something like: + // SELECT tx_hash, mint, amount, + // CASE + // WHEN mint = '' then amount * + // WHEN mint = '' then amount * + // ... + // ELSE 0 + // END as value + // FROM fees + // ORDER BY value DESC; + let mut query_string = String::new(); + query_string.push_str("SELECT tx_hash, mint, "); + query_string.push_str("CASE "); + + // Add the cases + for (mint, price) in prices.into_iter() { + query_string.push_str(&format!("WHEN mint = '{}' then amount * {} ", mint, price)); + } + query_string.push_str("ELSE 0 END as value "); + query_string.push_str(&format!( + "FROM fees WHERE redeemed = false and receiver = '{}'", + receiver + )); + + // Sort and limit + query_string.push_str(&format!("ORDER BY value DESC LIMIT {};", MAX_FEES_REDEEMED)); + + // Query for the tx hashes + sql_query(query_string) + .load(&mut self.db_conn) + .map_err(raw_err_str!("failed to query most valuable fees: {}")) + } + + // ----------------- + // | Wallets Table | + // ----------------- + + /// Get the wallet managing an mint, if it exists + /// + /// Returns the id and secret id of the wallet + pub(crate) fn get_wallet_for_mint( + &mut self, + mint: &str, + ) -> Result, String> { + let wallets: Vec = wallet_table + .filter(managed_mints_col.contains(vec![mint])) + .load(&mut self.db_conn) + .map_err(raw_err_str!("failed to query wallet for mint: {}"))?; + + Ok(wallets.first().cloned()) + } + + /// Find a wallet with an empty balance slot, if one exists + pub(crate) fn find_wallet_with_empty_balance( + &mut self, + ) -> Result, String> { + let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0); + let wallets = wallet_table + .filter(n_mints.lt(MAX_BALANCES as i32)) + .load(&mut self.db_conn) + .map_err(raw_err_str!( + "failed to query wallets with empty balances: {}" + ))?; + + Ok(wallets.first().cloned()) + } + + /// Insert a new wallet into the wallets table + pub(crate) fn insert_wallet(&mut self, wallet: WalletMetadata) -> Result<(), String> { + diesel::insert_into(wallet_table) + .values(vec![wallet]) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to insert wallet: {}")) + .map(|_| ()) + } +} diff --git a/fee-sweeper/src/src/indexer/redeem_fees.rs b/fee-sweeper/src/src/indexer/redeem_fees.rs new file mode 100644 index 0000000..ec03ef3 --- /dev/null +++ b/fee-sweeper/src/src/indexer/redeem_fees.rs @@ -0,0 +1,213 @@ +//! Fee redemption logic + +use std::collections::HashMap; +use std::str::FromStr; + +use aws_sdk_secretsmanager::Client as SecretsManagerClient; +use ethers::core::rand::thread_rng; +use ethers::signers::LocalWallet; +use ethers::types::TxHash; +use ethers::utils::hex; +use renegade_api::http::wallet::RedeemNoteRequest; +use renegade_circuit_types::note::Note; +use renegade_common::types::wallet::derivation::{ + derive_blinder_seed, derive_share_seed, derive_wallet_id, derive_wallet_keychain, +}; +use renegade_common::types::wallet::{Wallet, WalletIdentifier}; +use renegade_util::hex::jubjub_to_hex_string; +use renegade_util::raw_err_str; +use tracing::{info, warn}; + +use crate::db::models::WalletMetadata; +use crate::Indexer; + +/// The maximum number of fees to redeem in a given run of the indexer +pub(crate) const MAX_FEES_REDEEMED: usize = 20; + +impl Indexer { + /// Redeem the most valuable open fees + pub async fn redeem_fees(&mut self) -> Result<(), String> { + info!("redeeming fees..."); + + // Get all mints that have unredeemed fees + let mints = self.get_unredeemed_fee_mints()?; + + // Get the prices of each redeemable mint, we want to redeem the most profitable + // fees first + let mut prices = HashMap::new(); + for mint in mints.into_iter() { + let maybe_price = self.relayer_client.get_binance_price(&mint).await?; + if let Some(price) = maybe_price { + prices.insert(mint, price); + } else { + warn!("{}: no price", mint); + } + } + + // Get the most valuable fees and redeem them + let recv = jubjub_to_hex_string(&self.decryption_key.public_key()); + let most_valuable_fees = self.get_most_valuable_fees(prices, &recv)?; + + // TODO: Filter by those fees whose present value exceeds the expected gas costs + // to redeem + for fee in most_valuable_fees.into_iter() { + let wallet = self.get_or_create_wallet(&fee.mint).await?; + self.redeem_note_into_wallet(fee.tx_hash.clone(), wallet).await?; + } + + Ok(()) + } + + // ------------------- + // | Wallet Creation | + // ------------------- + + /// Find or create a wallet to store balances of a given mint + async fn get_or_create_wallet(&mut self, mint: &str) -> Result { + let maybe_wallet = self.get_wallet_for_mint(mint)?; + let maybe_wallet = + maybe_wallet.or_else(|| self.find_wallet_with_empty_balance().ok().flatten()); + + match maybe_wallet { + Some(wallet) => Ok(wallet), + None => { + info!("creating new wallet for {mint}"); + self.create_new_wallet().await + }, + } + } + + /// Create a new wallet for managing a given mint + /// + /// Return the new wallet's metadata + async fn create_new_wallet(&mut self) -> Result { + // 1. Create the new wallet on-chain + let (wallet_id, root_key) = self.create_renegade_wallet().await?; + + // 2. Create a secrets manager entry for the new wallet + let secret_name = self.create_secrets_manager_entry(wallet_id, root_key).await?; + + // 3. Add an entry in the wallets table for the newly created wallet + let entry = WalletMetadata::empty(wallet_id, secret_name); + self.insert_wallet(entry.clone())?; + + Ok(entry) + } + + /// Create a new Renegade wallet on-chain + async fn create_renegade_wallet(&mut self) -> Result<(WalletIdentifier, LocalWallet), String> { + let root_key = LocalWallet::new(&mut thread_rng()); + + let wallet_id = derive_wallet_id(&root_key)?; + let blinder_seed = derive_blinder_seed(&root_key)?; + let share_seed = derive_share_seed(&root_key)?; + let key_chain = derive_wallet_keychain(&root_key, self.chain_id)?; + + let wallet = Wallet::new_empty_wallet(wallet_id, blinder_seed, share_seed, key_chain); + self.relayer_client.create_new_wallet(wallet).await?; + info!("created new wallet for fee redemption"); + + Ok((wallet_id, root_key)) + } + + // ------------------ + // | Fee Redemption | + // ------------------ + + /// Redeem a note into a wallet + pub async fn redeem_note_into_wallet( + &mut self, + tx: String, + wallet: WalletMetadata, + ) -> Result { + info!("redeeming fee into {}", wallet.id); + // Get the wallet key for the given wallet + let eth_key = self.get_wallet_private_key(&wallet).await?; + let wallet_keychain = derive_wallet_keychain(ð_key, self.chain_id).unwrap(); + let root_key = wallet_keychain.secret_keys.sk_root.clone().unwrap(); + + self.relayer_client.check_wallet_indexed(wallet.id, self.chain_id, ð_key).await?; + + // Find the note in the tx body + let tx_hash = TxHash::from_str(&tx).map_err(raw_err_str!("invalid tx hash: {}"))?; + let note = self.get_note_from_tx(tx_hash).await?; + + // Redeem the note through the relayer + let req = RedeemNoteRequest { note: note.clone(), decryption_key: self.decryption_key }; + self.relayer_client.redeem_note(wallet.id, req, &root_key).await?; + + // Mark the fee as redeemed + self.maybe_mark_redeemed(&tx, ¬e).await?; + Ok(note) + } + + /// Mark a fee as redeemed if its nullifier is spent on-chain + async fn maybe_mark_redeemed(&mut self, tx_hash: &str, note: &Note) -> Result<(), String> { + let nullifier = note.nullifier(); + if !self + .arbitrum_client + .check_nullifier_used(nullifier) + .await + .map_err(raw_err_str!("failed to check nullifier: {}"))? + { + return Ok(()); + } + + info!("successfully redeemed fee from tx: {}", tx_hash); + self.mark_fee_as_redeemed(tx_hash) + } + + // ------------------- + // | Secrets Manager | + // ------------------- + + /// Add a Renegade wallet to the secrets manager entry so that it may be + /// recovered later + /// + /// Returns the name of the secret + async fn create_secrets_manager_entry( + &mut self, + id: WalletIdentifier, + wallet: LocalWallet, + ) -> Result { + let client = SecretsManagerClient::new(&self.aws_config); + let secret_name = format!("redemption-wallet-{}-{id}", self.chain); + let secret_val = hex::encode(wallet.signer().to_bytes()); + + // Check that the `LocalWallet` recovers the same + debug_assert_eq!(LocalWallet::from_str(&secret_val).unwrap(), wallet); + + // Store the secret in AWS + client + .create_secret() + .name(secret_name.clone()) + .secret_string(secret_val) + .description("Wallet used for fee redemption") + .send() + .await + .map_err(raw_err_str!("Error creating secret: {}"))?; + + Ok(secret_name) + } + + /// Get the private key for a wallet specified by its metadata + async fn get_wallet_private_key( + &mut self, + metadata: &WalletMetadata, + ) -> Result { + let client = SecretsManagerClient::new(&self.aws_config); + let secret_name = format!("redemption-wallet-{}-{}", self.chain, metadata.id); + + let secret = client + .get_secret_value() + .secret_id(secret_name) + .send() + .await + .map_err(raw_err_str!("Error fetching secret: {}"))?; + + let secret_str = secret.secret_string().unwrap(); + let wallet = + LocalWallet::from_str(secret_str).map_err(raw_err_str!("Invalid wallet secret: {}"))?; + Ok(wallet) + } +} diff --git a/fee-sweeper/src/src/main.rs b/fee-sweeper/src/src/main.rs new file mode 100644 index 0000000..5bc6a54 --- /dev/null +++ b/fee-sweeper/src/src/main.rs @@ -0,0 +1,127 @@ +//! The fee sweeper, sweeps for unredeemed fees in the Renegade protocol and redeems them +#![deny(missing_docs)] +#![deny(clippy::missing_docs_in_private_items)] +#![deny(unsafe_code)] +#![deny(clippy::needless_pass_by_ref_mut)] +#![feature(trivial_bounds)] + +pub mod db; +pub mod indexer; +pub mod relayer_client; + +use aws_config::{BehaviorVersion, Region}; +use diesel::{pg::PgConnection, Connection}; +use ethers::signers::LocalWallet; +use indexer::Indexer; +use relayer_client::RelayerClient; +use renegade_circuit_types::elgamal::DecryptionKey; +use renegade_util::{ + raw_err_str, + telemetry::{setup_system_logger, LevelFilter}, +}; + +use std::{error::Error, str::FromStr}; + +use arbitrum_client::{ + client::{ArbitrumClient, ArbitrumClientConfig}, + constants::Chain, +}; +use clap::Parser; + +// ------------- +// | Constants | +// ------------- + +/// The block polling interval for the Arbitrum client +const BLOCK_POLLING_INTERVAL_MS: u64 = 100; +/// The default region in which to provision secrets manager secrets +const DEFAULT_REGION: &str = "us-east-2"; + +// ------- +// | Cli | +// ------- + +/// The cli for the fee sweeper +#[derive(Debug, Parser)] +struct Cli { + /// The URL of the relayer to use + #[clap(long)] + relayer_url: String, + /// The Arbitrum RPC url to use + #[clap(short, long)] + rpc_url: String, + /// The address of the darkpool contract + #[clap(short = 'a', long)] + darkpool_address: String, + /// The chain to redeem fees for + #[clap(long, default_value = "mainnet")] + chain: Chain, + /// The fee decryption key to use + #[clap(short, long)] + decryption_key: String, + /// The arbitrum private key used to submit transactions + #[clap(long = "pkey")] + arbitrum_private_key: String, + /// The database url + #[clap(long)] + db_url: String, + /// The token address of the USDC token, used to get prices for fee redemption + #[clap(long)] + usdc_mint: String, +} + +impl Cli { + /// Build a connection to the DB + pub fn build_db_conn(&self) -> Result { + PgConnection::establish(&self.db_url).map_err(|e| e.to_string()) + } +} + +/// Main +#[tokio::main] +async fn main() -> Result<(), Box> { + setup_system_logger(LevelFilter::INFO); + let cli = Cli::parse(); + let db_conn = cli.build_db_conn()?; + + // Parse an AWS config + let config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new(DEFAULT_REGION)) + .load() + .await; + + // Build an Arbitrum client + let wallet = LocalWallet::from_str(&cli.arbitrum_private_key)?; + let conf = ArbitrumClientConfig { + darkpool_addr: cli.darkpool_address, + chain: cli.chain, + rpc_url: cli.rpc_url, + arb_priv_keys: vec![wallet], + block_polling_interval_ms: BLOCK_POLLING_INTERVAL_MS, + }; + let client = ArbitrumClient::new(conf).await?; + let chain_id = client + .chain_id() + .await + .map_err(raw_err_str!("Error fetching chain ID: {}"))?; + + // Build the indexer + let key = DecryptionKey::from_hex_str(&cli.decryption_key)?; + let relayer_client = RelayerClient::new(&cli.relayer_url, &cli.usdc_mint); + let mut indexer = Indexer::new( + chain_id, + cli.chain, + config, + client, + key, + db_conn, + relayer_client, + ); + + // 1. Index all new fees in the DB + indexer.index_fees().await?; + // 2. Redeem fees according to the redemption policy + indexer.redeem_fees().await?; + + Ok(()) +} diff --git a/fee-sweeper/src/src/relayer_client.rs b/fee-sweeper/src/src/relayer_client.rs new file mode 100644 index 0000000..3be9f66 --- /dev/null +++ b/fee-sweeper/src/src/relayer_client.rs @@ -0,0 +1,328 @@ +//! Client code for interacting with a configured relayer + +use std::time::Duration; + +use base64::engine::{general_purpose as b64_general_purpose, Engine}; +use ethers::{ + core::k256::ecdsa::{signature::Signer, Signature, SigningKey}, + signers::LocalWallet, +}; +use http::{HeaderMap, HeaderValue}; +use renegade_api::{ + http::{ + price_report::{GetPriceReportRequest, GetPriceReportResponse, PRICE_REPORT_ROUTE}, + task::{GetTaskStatusResponse, GET_TASK_STATUS_ROUTE}, + wallet::{ + CreateWalletRequest, CreateWalletResponse, FindWalletRequest, FindWalletResponse, + GetWalletResponse, RedeemNoteRequest, RedeemNoteResponse, CREATE_WALLET_ROUTE, + FIND_WALLET_ROUTE, GET_WALLET_ROUTE, REDEEM_NOTE_ROUTE, + }, + }, + RENEGADE_AUTH_HEADER_NAME, RENEGADE_SIG_EXPIRATION_HEADER_NAME, +}; +use renegade_circuit_types::keychain::SecretSigningKey; +use renegade_common::types::{ + exchange::PriceReporterState, + token::Token, + wallet::{ + derivation::{ + derive_blinder_seed, derive_share_seed, derive_wallet_id, derive_wallet_keychain, + }, + Wallet, WalletIdentifier, + }, +}; +use renegade_crypto::fields::scalar_to_biguint; +use renegade_util::{get_current_time_millis, raw_err_str}; +use reqwest::{Body, Client}; +use serde::{Deserialize, Serialize}; +use tracing::warn; +use uuid::Uuid; + +/// The interval at which to poll relayer task status +const POLL_INTERVAL_MS: u64 = 1000; +/// The amount of time (ms) to declare a wallet signature value for +const SIG_EXPIRATION_BUFFER_MS: u64 = 5000; + +/// A client for interacting with a configured relayer +pub struct RelayerClient { + /// The base URL of the relayer + base_url: String, + /// The mind of the USDC token + usdc_mint: String, +} + +impl RelayerClient { + /// Create a new relayer client + pub fn new(base_url: &str, usdc_mint: &str) -> Self { + Self { + base_url: base_url.to_string(), + usdc_mint: usdc_mint.to_string(), + } + } + + /// Get the price for a given mint + pub async fn get_binance_price(&self, mint: &str) -> Result, String> { + if mint == self.usdc_mint { + return Ok(Some(1.0)); + } + + let body = GetPriceReportRequest { + base_token: Token::from_addr(mint), + quote_token: Token::from_addr(&self.usdc_mint), + }; + let response: GetPriceReportResponse = self.post_relayer(PRICE_REPORT_ROUTE, &body).await?; + + match response.price_report { + PriceReporterState::Nominal(report) => Ok(Some(report.price)), + state => { + warn!("Price report state: {state:?}"); + Ok(None) + } + } + } + + // ------------------ + // | Wallet Methods | + // ------------------ + + /// Check that the relayer has a given wallet, lookup the wallet if not + pub async fn check_wallet_indexed( + &self, + wallet_id: WalletIdentifier, + chain_id: u64, + eth_key: &LocalWallet, + ) -> Result<(), String> { + let mut path = GET_WALLET_ROUTE.to_string(); + path = path.replace(":wallet_id", &wallet_id.to_string()); + + let keychain = derive_wallet_keychain(eth_key, chain_id).unwrap(); + let root_key = keychain.secret_keys.sk_root.unwrap(); + if self + .get_relayer_with_auth::(&path, &root_key) + .await + .is_ok() + { + return Ok(()); + } + + // Otherwise lookup the wallet + self.lookup_wallet(chain_id, eth_key).await + } + + /// Lookup a wallet in the configured relayer + async fn lookup_wallet(&self, chain_id: u64, eth_key: &LocalWallet) -> Result<(), String> { + let path = FIND_WALLET_ROUTE.to_string(); + let wallet_id = derive_wallet_id(eth_key).unwrap(); + let blinder_seed = derive_blinder_seed(eth_key).unwrap(); + let share_seed = derive_share_seed(eth_key).unwrap(); + let keychain = derive_wallet_keychain(eth_key, chain_id).unwrap(); + let root_key = keychain.secret_keys.sk_root.clone().unwrap(); + + let body = FindWalletRequest { + wallet_id, + secret_share_seed: scalar_to_biguint(&share_seed), + blinder_seed: scalar_to_biguint(&blinder_seed), + key_chain: keychain.into(), + }; + + let resp: FindWalletResponse = self.post_relayer_with_auth(&path, &body, &root_key).await?; + self.await_relayer_task(resp.task_id).await + } + + /// Create a new wallet via the configured relayer + pub(crate) async fn create_new_wallet(&self, wallet: Wallet) -> Result<(), String> { + let body = CreateWalletRequest { + wallet: wallet.into(), + }; + + let resp: CreateWalletResponse = self.post_relayer(CREATE_WALLET_ROUTE, &body).await?; + self.await_relayer_task(resp.task_id).await + } + + /// Redeem a note into a wallet + pub(crate) async fn redeem_note( + &self, + wallet_id: WalletIdentifier, + req: RedeemNoteRequest, + root_key: &SecretSigningKey, + ) -> Result<(), String> { + let mut path = REDEEM_NOTE_ROUTE.to_string(); + path = path.replace(":wallet_id", &wallet_id.to_string()); + + let resp: RedeemNoteResponse = self.post_relayer_with_auth(&path, &req, root_key).await?; + self.await_relayer_task(resp.task_id).await + } + + // ----------- + // | Helpers | + // ----------- + + /// Post to the relayer URL + async fn post_relayer(&self, path: &str, body: &Req) -> Result + where + Req: Serialize, + Resp: for<'de> Deserialize<'de>, + { + self.post_relayer_with_headers(path, body, &HeaderMap::new()) + .await + } + + /// Post to the relayer with wallet auth + async fn post_relayer_with_auth( + &self, + path: &str, + body: &Req, + root_key: &SecretSigningKey, + ) -> Result + where + Req: Serialize, + Resp: for<'de> Deserialize<'de>, + { + let body_ser = + serde_json::to_vec(body).map_err(raw_err_str!("Failed to serialize body: {}"))?; + let headers = build_auth_headers(root_key, &body_ser)?; + self.post_relayer_with_headers(path, body, &headers).await + } + + /// Post to the relayer with given headers + async fn post_relayer_with_headers( + &self, + path: &str, + body: &Req, + headers: &HeaderMap, + ) -> Result + where + Req: Serialize, + Resp: for<'de> Deserialize<'de>, + { + // Send a request + let client = reqwest_client()?; + let route = format!("{}{}", self.base_url, path); + let resp = client + .post(route) + .json(body) + .headers(headers.clone()) + .send() + .await + .map_err(raw_err_str!("Failed to send request: {}"))?; + + // Deserialize the response + if !resp.status().is_success() { + return Err(format!("Failed to send request: {}", resp.status())); + } + + resp.json::() + .await + .map_err(raw_err_str!("Failed to parse response: {}")) + } + + /// Get from the relayer URL + async fn get_relayer(&self, path: &str) -> Result + where + Resp: for<'de> Deserialize<'de>, + { + self.get_relayer_with_headers(path, &HeaderMap::new()).await + } + + /// Get from the relayer URL with wallet auth + async fn get_relayer_with_auth( + &self, + path: &str, + root_key: &SecretSigningKey, + ) -> Result + where + Resp: for<'de> Deserialize<'de>, + { + let headers = build_auth_headers(root_key, &[])?; + self.get_relayer_with_headers(path, &headers).await + } + + /// Get from the relayer URL with given headers + async fn get_relayer_with_headers( + &self, + path: &str, + headers: &HeaderMap, + ) -> Result + where + Resp: for<'de> Deserialize<'de>, + { + let client = reqwest_client()?; + let url = format!("{}{}", self.base_url, path); + let resp = client + .get(url) + .headers(headers.clone()) + .send() + .await + .map_err(raw_err_str!("Failed to get relayer path: {}"))?; + + // Parse the response + if !resp.status().is_success() { + return Err(format!("Failed to get relayer path: {}", resp.status())); + } + + resp.json::() + .await + .map_err(raw_err_str!("Failed to parse response: {}")) + } + + /// Await a relayer task + async fn await_relayer_task(&self, task_id: Uuid) -> Result<(), String> { + let mut path = GET_TASK_STATUS_ROUTE.to_string(); + path = path.replace(":task_id", &task_id.to_string()); + + // Enter a polling loop until the task finishes + let poll_interval = Duration::from_millis(POLL_INTERVAL_MS); + loop { + // For now, we assume that an error is a 404 in which case the task has completed + // TODO: Improve this break condition if it proves problematic + if self + .get_relayer::(&path) + .await + .is_err() + { + break; + } + + // Sleep for a bit before polling again + std::thread::sleep(poll_interval); + } + + Ok(()) + } +} + +// ----------- +// | Helpers | +// ----------- + +/// Build a reqwest client +fn reqwest_client() -> Result { + Client::builder() + .user_agent("fee-sweeper") + .build() + .map_err(raw_err_str!("Failed to create reqwest client: {}")) +} + +/// Build authentication headers for a request +fn build_auth_headers(key: &SecretSigningKey, req_bytes: &[u8]) -> Result { + let mut headers = HeaderMap::new(); + let expiration = get_current_time_millis() + SIG_EXPIRATION_BUFFER_MS; + headers.insert(RENEGADE_SIG_EXPIRATION_HEADER_NAME, expiration.into()); + + let root_key: SigningKey = key.try_into()?; + + // Sign the concatenation of the message and the expiration timestamp + let body = Body::from(req_bytes.to_vec()); + let msg_bytes = body.as_bytes().unwrap(); + let payload = [msg_bytes, &expiration.to_le_bytes()].concat(); + + let signature: Signature = root_key.sign(&payload); + let encoded_sig = b64_general_purpose::STANDARD_NO_PAD.encode(signature.to_bytes()); + + headers.insert( + RENEGADE_AUTH_HEADER_NAME, + HeaderValue::from_str(&encoded_sig).unwrap(), + ); + + Ok(headers) +}