diff --git a/fee-sweeper/Cargo.toml b/fee-sweeper/Cargo.toml new file mode 100644 index 0000000..d3e0545 --- /dev/null +++ b/fee-sweeper/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "fee-sweeper" +version = "0.1.0" +edition = "2021" + +[dependencies] +# === CLI + Runtime === # +clap = { version = "4.5.3", features = ["derive", "env"] } +tokio = { version = "1.10", features = ["full"] } + +# === Infra === # +aws-sdk-secretsmanager = "1.37" +aws-config = "1.5" +diesel = { version = "2.1", features = ["postgres", "numeric", "uuid"] } + +# === Blockchain Interaction === # +alloy-sol-types = "0.3.1" +ethers = "2" + +# === Renegade Dependencies === # +arbitrum-client = { git = "https://github.com/renegade-fi/renegade.git", features = [ + "rand", +] } +renegade-api = { package = "external-api", git = "https://github.com/renegade-fi/renegade.git" } +renegade-common = { package = "common", git = "https://github.com/renegade-fi/renegade.git" } +renegade-constants = { package = "constants", git = "https://github.com/renegade-fi/renegade.git" } +renegade-circuits = { package = "circuits", git = "https://github.com/renegade-fi/renegade.git" } +renegade-circuit-types = { package = "circuit-types", git = "https://github.com/renegade-fi/renegade.git" } +renegade-crypto = { git = "https://github.com/renegade-fi/renegade.git" } +renegade-util = { package = "util", git = "https://github.com/renegade-fi/renegade.git" } + +# === Misc Dependencies === # +base64 = "0.22" +bigdecimal = { version = "0.3", features = ["serde"] } +futures = "0.3" +http = "1.1" +num-bigint = "0.4" +reqwest = { version = "0.12", features = ["json"] } +serde = "1.0" +serde_json = "1.0" +tracing = "0.1" +uuid = "1.8" diff --git a/fee-sweeper/diesel.toml b/fee-sweeper/diesel.toml new file mode 100644 index 0000000..0010335 --- /dev/null +++ b/fee-sweeper/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/db/schema.rs" +custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] + +[migrations_directory] +dir = "./migrations" diff --git a/fee-sweeper/migrations/.keep b/fee-sweeper/migrations/.keep new file mode 100644 index 0000000..e69de29 diff --git a/fee-sweeper/migrations/00000000000000_diesel_initial_setup/down.sql b/fee-sweeper/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000..a9f5260 --- /dev/null +++ b/fee-sweeper/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,6 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + +DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); +DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/fee-sweeper/migrations/00000000000000_diesel_initial_setup/up.sql b/fee-sweeper/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000..d68895b --- /dev/null +++ b/fee-sweeper/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,36 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + + + + +-- Sets up a trigger for the given table to automatically set a column called +-- `updated_at` whenever the row is modified (unless `updated_at` was included +-- in the modified columns) +-- +-- # Example +-- +-- ```sql +-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); +-- +-- SELECT diesel_manage_updated_at('users'); +-- ``` +CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ +BEGIN + EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s + FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ +BEGIN + IF ( + NEW IS DISTINCT FROM OLD AND + NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at + ) THEN + NEW.updated_at := current_timestamp; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/down.sql b/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/down.sql new file mode 100644 index 0000000..d9cbfe2 --- /dev/null +++ b/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/down.sql @@ -0,0 +1,2 @@ +-- Drop the indexing metadata table +DROP TABLE IF EXISTS indexing_metadata; diff --git a/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/up.sql b/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/up.sql new file mode 100644 index 0000000..5899a0f --- /dev/null +++ b/fee-sweeper/migrations/2024-06-15-202249_create_last_indexed_table/up.sql @@ -0,0 +1,8 @@ +-- Create the table that stores indexing metadata +CREATE TABLE indexing_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); + +-- Insert a row with the latest block number set to zero +INSERT INTO indexing_metadata (key, value) VALUES ('latest_block', '0'); diff --git a/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/down.sql b/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/down.sql new file mode 100644 index 0000000..dbf1808 --- /dev/null +++ b/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/down.sql @@ -0,0 +1,4 @@ +-- Drop the fees table and indexes +DROP TABLE IF EXISTS fees; +DROP INDEX IF EXISTS idx_fees_mint; +DROP INDEX IF EXISTS idx_fees_amount; diff --git a/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/up.sql b/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/up.sql new file mode 100644 index 0000000..87ba8d7 --- /dev/null +++ b/fee-sweeper/migrations/2024-06-15-203503_create_fees_table/up.sql @@ -0,0 +1,13 @@ +-- Stores fees and index by mint, amount +CREATE TABLE fees( + id SERIAL PRIMARY KEY, + tx_hash TEXT NOT NULL UNIQUE, + mint TEXT NOT NULL, + amount NUMERIC NOT NULL, + blinder NUMERIC NOT NULL, + receiver TEXT NOT NULL, + redeemed BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE INDEX idx_fees_mint ON fees(mint); +CREATE INDEX idx_fees_amount ON fees(amount); diff --git a/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/down.sql b/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/down.sql new file mode 100644 index 0000000..ab2e55b --- /dev/null +++ b/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/down.sql @@ -0,0 +1,2 @@ +-- Drop the wallets table +DROP TABLE IF EXISTS wallets; diff --git a/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/up.sql b/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/up.sql new file mode 100644 index 0000000..f53a974 --- /dev/null +++ b/fee-sweeper/migrations/2024-06-16-003335_create_wallets_table/up.sql @@ -0,0 +1,7 @@ +-- Create a table for storing wallets and the mints they hold +-- The `secret_id` is the id of the AWS Secrets Manager secret that holds recovery information for the wallet +CREATE TABLE wallets ( + id UUID PRIMARY KEY, + mints TEXT[] not null, + secret_id TEXT not null +); diff --git a/fee-sweeper/src/db/mod.rs b/fee-sweeper/src/db/mod.rs new file mode 100644 index 0000000..5402351 --- /dev/null +++ b/fee-sweeper/src/db/mod.rs @@ -0,0 +1,5 @@ +//! Database code + +pub mod models; +#[allow(missing_docs)] +pub mod schema; diff --git a/fee-sweeper/src/db/models.rs b/fee-sweeper/src/db/models.rs new file mode 100644 index 0000000..5ad7bfa --- /dev/null +++ b/fee-sweeper/src/db/models.rs @@ -0,0 +1,88 @@ +#![allow(missing_docs)] +#![allow(trivial_bounds)] + +use bigdecimal::BigDecimal; +use diesel::prelude::*; +use num_bigint::BigInt; +use renegade_circuit_types::note::Note; +use renegade_crypto::fields::scalar_to_bigint; +use renegade_util::hex::{biguint_to_hex_addr, jubjub_to_hex_string}; +use uuid::Uuid; + +use crate::db::schema::fees; + +/// A fee that has been indexed by the indexer +#[derive(Queryable, Selectable)] +#[diesel(table_name = crate::db::schema::fees)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[allow(missing_docs, clippy::missing_docs_in_private_items)] +pub struct Fee { + pub id: i32, + pub tx_hash: String, + pub mint: String, + pub amount: BigDecimal, + pub blinder: BigDecimal, + pub receiver: String, + pub redeemed: bool, +} + +/// A new fee inserted into the database +#[derive(Insertable)] +#[diesel(table_name = fees)] +pub struct NewFee { + pub tx_hash: String, + pub mint: String, + pub amount: BigDecimal, + pub blinder: BigDecimal, + pub receiver: String, +} + +impl NewFee { + /// Construct a fee from a note + pub fn new_from_note(note: &Note, tx_hash: String) -> Self { + let mint = biguint_to_hex_addr(¬e.mint); + let amount = BigInt::from(note.amount).into(); + let blinder = scalar_to_bigint(¬e.blinder).into(); + let receiver = jubjub_to_hex_string(¬e.receiver); + + NewFee { + tx_hash, + mint, + amount, + blinder, + receiver, + } + } +} + +/// Metadata information maintained by the indexer +#[derive(Clone, Queryable, Selectable)] +#[diesel(table_name = crate::db::schema::indexing_metadata)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[allow(missing_docs, clippy::missing_docs_in_private_items)] +pub struct Metadata { + pub key: String, + pub value: String, +} + +/// A metadata entry for a wallet managed by the indexer +#[derive(Clone, Queryable, Selectable, Insertable)] +#[diesel(table_name = crate::db::schema::wallets)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[allow(missing_docs, clippy::missing_docs_in_private_items)] +pub struct WalletMetadata { + pub id: Uuid, + pub mints: Vec>, + pub secret_id: String, +} + +impl WalletMetadata { + /// Construct a new wallet metadata entry + pub fn empty(id: Uuid, secret_id: String) -> Self { + WalletMetadata { + id, + mints: vec![], + secret_id, + } + } +} diff --git a/fee-sweeper/src/db/schema.rs b/fee-sweeper/src/db/schema.rs new file mode 100644 index 0000000..2363365 --- /dev/null +++ b/fee-sweeper/src/db/schema.rs @@ -0,0 +1,34 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + fees (id) { + id -> Int4, + tx_hash -> Text, + mint -> Text, + amount -> Numeric, + blinder -> Numeric, + receiver -> Text, + redeemed -> Bool, + } +} + +diesel::table! { + indexing_metadata (key) { + key -> Text, + value -> Text, + } +} + +diesel::table! { + wallets (id) { + id -> Uuid, + mints -> Array>, + secret_id -> Text, + } +} + +diesel::allow_tables_to_appear_in_same_query!( + fees, + indexing_metadata, + wallets, +); diff --git a/fee-sweeper/src/indexer/index_fees.rs b/fee-sweeper/src/indexer/index_fees.rs new file mode 100644 index 0000000..f41b9b7 --- /dev/null +++ b/fee-sweeper/src/indexer/index_fees.rs @@ -0,0 +1,105 @@ +//! Phase one of the sweeper's execution; index all fees since the last consistent block + +use alloy_sol_types::SolCall; +use arbitrum_client::abi::settleOfflineFeeCall; +use arbitrum_client::{ + abi::NotePostedFilter, constants::SELECTOR_LEN, + helpers::parse_note_ciphertext_from_settle_offline_fee, +}; +use ethers::contract::LogMeta; +use ethers::middleware::Middleware; +use renegade_circuit_types::elgamal::ElGamalCiphertext; +use renegade_circuit_types::native_helpers::elgamal_decrypt; +use renegade_circuit_types::note::{Note, NOTE_CIPHERTEXT_SIZE}; +use renegade_circuit_types::wallet::NoteCommitment; +use renegade_constants::Scalar; +use renegade_crypto::fields::{scalar_to_biguint, scalar_to_u128, u256_to_scalar}; +use renegade_util::raw_err_str; +use tracing::info; + +use crate::db::models::NewFee; +use crate::Indexer; + +impl Indexer { + /// Index all fees since the given block + pub async fn index_fees(&mut self) -> Result<(), String> { + let block_number = self.get_latest_block()?; + info!("indexing fees from block {block_number}"); + + let filter = self + .arbitrum_client + .get_darkpool_client() + .event::() + .from_block(block_number); + + let events = filter + .query_with_meta() + .await + .map_err(raw_err_str!("failed to create note posted stream: {}"))?; + + let mut most_recent_block = block_number; + for (event, meta) in events { + let block = meta.block_number.as_u64(); + let note_comm = u256_to_scalar(&event.note_commitment); + self.index_note(note_comm, meta).await?; + + if block > most_recent_block { + most_recent_block = block; + self.update_latest_block(most_recent_block)?; + } + } + + Ok(()) + } + + /// Index a note + async fn index_note(&mut self, note_comm: NoteCommitment, meta: LogMeta) -> Result<(), String> { + // Parse the note from the tx + let tx = self + .arbitrum_client + .get_darkpool_client() + .client() + .get_transaction(meta.transaction_hash) + .await + .map_err(raw_err_str!("failed to query tx: {}"))? + .ok_or_else(|| format!("tx not found: {}", meta.transaction_hash))?; + + let calldata: Vec = tx.input.to_vec(); + let selector: [u8; 4] = calldata[..SELECTOR_LEN].try_into().unwrap(); + let encryption = match selector { + ::SELECTOR => { + parse_note_ciphertext_from_settle_offline_fee(&calldata) + .map_err(raw_err_str!("failed to parse ciphertext: {}"))? + } + sel => return Err(format!("invalid selector when parsing note: {sel:?}")), + }; + + // Decrypt the note and check that the commitment matches the expected value; if not we are not the receiver + let note = self.decrypt_note(&encryption); + let tx = format!("{:#x}", meta.transaction_hash); + if note.commitment() != note_comm { + info!("not receiver, skipping"); + return Ok(()); + } else { + info!("indexing note from tx: {tx}"); + } + + // Otherwise, index the note + let fee = NewFee::new_from_note(¬e, tx); + self.insert_fee(fee) + } + + /// Decrypt a note using the decryption key + fn decrypt_note(&self, note: &ElGamalCiphertext) -> Note { + // The ciphertext stores all note values except the encryption key + let cleartext_values: [Scalar; NOTE_CIPHERTEXT_SIZE] = + elgamal_decrypt(note, &self.decryption_key); + + Note { + mint: scalar_to_biguint(&cleartext_values[0]), + amount: scalar_to_u128(&cleartext_values[1]), + receiver: self.decryption_key.public_key(), + blinder: cleartext_values[2], + } + } +} diff --git a/fee-sweeper/src/indexer/mod.rs b/fee-sweeper/src/indexer/mod.rs new file mode 100644 index 0000000..4616597 --- /dev/null +++ b/fee-sweeper/src/indexer/mod.rs @@ -0,0 +1,49 @@ +//! The indexer handles the indexing and redemption of fee notes + +use arbitrum_client::client::ArbitrumClient; +use aws_config::SdkConfig as AwsConfig; +use diesel::PgConnection; +use renegade_circuit_types::elgamal::DecryptionKey; + +use crate::relayer_client::RelayerClient; + +pub mod index_fees; +pub mod queries; +pub mod redeem_fees; + +/// Stores the dependencies needed to index the chain +pub(crate) struct Indexer { + /// The environment this indexer runs in + pub env: String, + /// A client for interacting with the relayer + pub relayer_client: RelayerClient, + /// The Arbitrum client + pub arbitrum_client: ArbitrumClient, + /// The decryption key + pub decryption_key: DecryptionKey, + /// A connection to the DB + pub db_conn: PgConnection, + /// The AWS config + pub aws_config: AwsConfig, +} + +impl Indexer { + /// Constructor + pub fn new( + env: String, + aws_config: AwsConfig, + arbitrum_client: ArbitrumClient, + decryption_key: DecryptionKey, + db_conn: PgConnection, + relayer_client: RelayerClient, + ) -> Self { + Indexer { + env, + arbitrum_client, + decryption_key, + db_conn, + relayer_client, + aws_config, + } + } +} diff --git a/fee-sweeper/src/indexer/queries.rs b/fee-sweeper/src/indexer/queries.rs new file mode 100644 index 0000000..63b6b8d --- /dev/null +++ b/fee-sweeper/src/indexer/queries.rs @@ -0,0 +1,205 @@ +//! Groups query logic for the indexer + +use std::collections::HashMap; + +use bigdecimal::BigDecimal; +use diesel::define_sql_function; +use diesel::deserialize::Queryable; +use diesel::deserialize::QueryableByName; +use diesel::sql_query; +use diesel::sql_types::SingleValue; +use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text}; +use diesel::PgArrayExpressionMethods; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; +use renegade_constants::MAX_BALANCES; +use renegade_util::raw_err_str; + +use crate::db::models::WalletMetadata; +use crate::db::models::{Metadata, NewFee}; +use crate::db::schema::{ + fees::dsl::{fees as fees_table, mint as mint_col, redeemed as redeemed_col}, + indexing_metadata::dsl::{ + indexing_metadata as metadata_table, key as metadata_key, value as metadata_value, + }, + wallets::dsl::{mints as managed_mints_col, wallets as wallet_table}, +}; +use crate::Indexer; + +use super::redeem_fees::MAX_FEES_REDEEMED; + +/// The metadata key for the last indexed block +pub(crate) const LAST_INDEXED_BLOCK_KEY: &str = "latest_block"; + +// Define the `array_length` function +define_sql_function! { + /// Calculate the length of an array + fn array_length(array: Array, dim: Integer) -> Nullable; +} + +define_sql_function! { + /// Coalesce a nullable value with a default value + fn coalesce(x: Nullable, y: T) -> T; +} + +// --------------- +// | Query Types | +// --------------- + +/// A sub-query of the most valuable fees to be redeemed +#[derive(Debug, Queryable, QueryableByName)] +pub(crate) struct FeeValue { + /// The tx hash of the fee + #[sql_type = "Text"] + pub tx_hash: String, + /// The mint of the fee + #[sql_type = "Text"] + pub mint: String, + /// The value of the fee + #[sql_type = "Numeric"] + pub value: BigDecimal, +} + +// ------------------------- +// | Query Implementations | +// ------------------------- + +impl Indexer { + // ------------------ + // | Metadata Table | + // ------------------ + + /// Get the latest block number + pub(crate) fn get_latest_block(&mut self) -> Result { + let entry = metadata_table + .filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY)) + .limit(1) + .load(&mut self.db_conn) + .map(|res: Vec| res[0].clone()) + .map_err(raw_err_str!("failed to query latest block: {}"))?; + + entry + .value + .parse::() + .map_err(raw_err_str!("failed to parse latest block: {}")) + } + + /// Update the latest block number + pub(crate) fn update_latest_block(&mut self, block_number: u64) -> Result<(), String> { + let block_string = block_number.to_string(); + diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY)) + .set(metadata_value.eq(block_string)) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to update latest block: {}")) + .map(|_| ()) + } + + // -------------- + // | Fees Table | + // -------------- + + /// Insert a fee into the fees table + pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), String> { + diesel::insert_into(fees_table) + .values(vec![fee]) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to insert fee: {}")) + .map(|_| ()) + } + + /// Get all mints that have unredeemed fees + pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result, String> { + let mints = fees_table + .select(mint_col) + .filter(redeemed_col.eq(false)) + .distinct() + .load(&mut self.db_conn) + .map_err(raw_err_str!("failed to query unredeemed fees: {}"))?; + + Ok(mints) + } + + /// Get the most valuable fees to be redeemed + /// + /// Returns the tx hashes of the most valuable fees to be redeemed + pub(crate) fn get_most_valuable_fees( + &mut self, + prices: HashMap, + ) -> Result, String> { + if prices.is_empty() { + return Ok(vec![]); + } + + // We query the fees table with a transformation that calculates the value of each fee using the prices passed in. + // This query looks something like: + // SELECT tx_hash, mint, amount, + // CASE + // WHEN mint = '' then amount * + // WHEN mint = '' then amount * + // ... + // ELSE 0 + // END as value + // FROM fees + // ORDER BY value DESC; + let mut query_string = String::new(); + query_string.push_str("SELECT tx_hash, mint, "); + query_string.push_str("CASE "); + + // Add the cases + for (mint, price) in prices.into_iter() { + query_string.push_str(&format!("WHEN mint = '{}' then amount * {} ", mint, price)); + } + query_string.push_str("ELSE 0 END as value "); + query_string.push_str("FROM fees WHERE redeemed = false "); + + // Sort and limit + query_string.push_str(&format!("ORDER BY value DESC LIMIT {};", MAX_FEES_REDEEMED)); + + // Query for the tx hashes + sql_query(query_string) + .load(&mut self.db_conn) + .map_err(raw_err_str!("failed to query most valuable fees: {}")) + } + + // ----------------- + // | Wallets Table | + // ----------------- + + /// Get the wallet managing an mint, if it exists + /// + /// Returns the id and secret id of the wallet + pub(crate) fn get_wallet_for_mint( + &mut self, + mint: &str, + ) -> Result, String> { + let wallets: Vec = wallet_table + .filter(managed_mints_col.contains(vec![mint])) + .load(&mut self.db_conn) + .map_err(raw_err_str!("failed to query wallet for mint: {}"))?; + + Ok(wallets.first().cloned()) + } + + /// Find a wallet with an empty balance slot, if one exists + pub(crate) fn find_wallet_with_empty_balance( + &mut self, + ) -> Result, String> { + let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0); + let wallets = wallet_table + .filter(n_mints.lt(MAX_BALANCES as i32)) + .load(&mut self.db_conn) + .map_err(raw_err_str!( + "failed to query wallets with empty balances: {}" + ))?; + + Ok(wallets.first().cloned()) + } + + /// Insert a new wallet into the wallets table + pub(crate) fn insert_wallet(&mut self, wallet: WalletMetadata) -> Result<(), String> { + diesel::insert_into(wallet_table) + .values(vec![wallet]) + .execute(&mut self.db_conn) + .map_err(raw_err_str!("failed to insert wallet: {}")) + .map(|_| ()) + } +} diff --git a/fee-sweeper/src/indexer/redeem_fees.rs b/fee-sweeper/src/indexer/redeem_fees.rs new file mode 100644 index 0000000..f0b65f1 --- /dev/null +++ b/fee-sweeper/src/indexer/redeem_fees.rs @@ -0,0 +1,136 @@ +//! Fee redemption logic + +use std::collections::HashMap; +use std::str::FromStr; + +use aws_sdk_secretsmanager::Client as SecretsManagerClient; +use ethers::core::rand::thread_rng; +use ethers::signers::LocalWallet; +use ethers::utils::hex; +use renegade_common::types::wallet::derivation::{ + derive_blinder_seed, derive_share_seed, derive_wallet_id, derive_wallet_keychain, +}; +use renegade_common::types::wallet::{Wallet, WalletIdentifier}; +use renegade_util::raw_err_str; +use tracing::{info, warn}; + +use crate::db::models::WalletMetadata; +use crate::Indexer; + +/// The maximum number of fees to redeem in a given run of the indexer +pub(crate) const MAX_FEES_REDEEMED: usize = 20; + +impl Indexer { + /// Redeem the most valuable open fees + pub async fn redeem_fees(&mut self) -> Result<(), String> { + info!("redeeming fees..."); + + // Get all mints that have unredeemed fees + let mints = self.get_unredeemed_fee_mints()?; + + // Get the prices of each redeemable mint, we want to redeem the most profitable fees first + let mut prices = HashMap::new(); + for mint in mints.into_iter() { + let maybe_price = self.relayer_client.get_binance_price(&mint).await?; + if let Some(price) = maybe_price { + prices.insert(mint, price); + } else { + warn!("{}: no price", mint); + } + } + + // Get the most valuable fees and redeem them + let most_valuable_fees = self.get_most_valuable_fees(prices)?; + + // TODO: Filter by those fees whose present value exceeds the expected gas costs to redeem + for fee in most_valuable_fees.into_iter() { + let wallet = self.get_or_create_wallet(&fee.mint).await?; + info!("redeeming into {}", wallet.id); + } + + Ok(()) + } + + /// Find or create a wallet to store balances of a given mint + async fn get_or_create_wallet(&mut self, mint: &str) -> Result { + let maybe_wallet = self.get_wallet_for_mint(mint)?; + let maybe_wallet = + maybe_wallet.or_else(|| self.find_wallet_with_empty_balance().ok().flatten()); + + match maybe_wallet { + Some(wallet) => Ok(wallet), + None => { + info!("creating new wallet for {mint}"); + self.create_new_wallet().await + } + } + } + + /// Create a new wallet for managing a given mint + /// + /// Return the new wallet's metadata + async fn create_new_wallet(&mut self) -> Result { + // 1. Create the new wallet on-chain + let (wallet_id, root_key) = self.create_renegade_wallet().await?; + + // 2. Create a secrets manager entry for the new wallet + let secret_name = self + .create_secrets_manager_entry(wallet_id, root_key) + .await?; + + // 3. Add an entry in the wallets table for the newly created wallet + let entry = WalletMetadata::empty(wallet_id, secret_name); + self.insert_wallet(entry.clone())?; + + Ok(entry) + } + + /// Create a new Renegade wallet on-chain + async fn create_renegade_wallet(&mut self) -> Result<(WalletIdentifier, LocalWallet), String> { + let chain_id = self + .arbitrum_client + .chain_id() + .await + .map_err(raw_err_str!("Error fetching chain ID: {}"))?; + let root_key = LocalWallet::new(&mut thread_rng()); + + let wallet_id = derive_wallet_id(&root_key)?; + let blinder_seed = derive_blinder_seed(&root_key)?; + let share_seed = derive_share_seed(&root_key)?; + let key_chain = derive_wallet_keychain(&root_key, chain_id)?; + + let wallet = Wallet::new_empty_wallet(wallet_id, blinder_seed, share_seed, key_chain); + self.relayer_client.create_new_wallet(wallet).await?; + info!("created new wallet for fee redemption"); + + Ok((wallet_id, root_key)) + } + + /// Add a Renegade wallet to the secrets manager entry so that it may be recovered later + /// + /// Returns the name of the secret + async fn create_secrets_manager_entry( + &mut self, + id: WalletIdentifier, + wallet: LocalWallet, + ) -> Result { + let client = SecretsManagerClient::new(&self.aws_config); + let secret_name = format!("redemption-wallet-{}-{id}", self.env); + let secret_val = hex::encode(wallet.signer().to_bytes()); + + // Check that the `LocalWallet` recovers the same + debug_assert_eq!(LocalWallet::from_str(&secret_val).unwrap(), wallet); + + // Store the secret in AWS + client + .create_secret() + .name(secret_name.clone()) + .secret_string(secret_val) + .description("Wallet used for fee redemption") + .send() + .await + .map_err(raw_err_str!("Error creating secret: {}"))?; + + Ok(secret_name) + } +} diff --git a/fee-sweeper/src/main.rs b/fee-sweeper/src/main.rs new file mode 100644 index 0000000..2390d5c --- /dev/null +++ b/fee-sweeper/src/main.rs @@ -0,0 +1,115 @@ +//! The fee sweeper, sweeps for unredeemed fees in the Renegade protocol and redeems them +#![deny(missing_docs)] +#![deny(clippy::missing_docs_in_private_items)] +#![deny(unsafe_code)] +#![deny(clippy::needless_pass_by_ref_mut)] +#![feature(trivial_bounds)] + +pub mod db; +pub mod indexer; +pub mod relayer_client; + +use aws_config::{BehaviorVersion, Region}; +use diesel::{pg::PgConnection, Connection}; +use ethers::signers::LocalWallet; +use indexer::Indexer; +use relayer_client::RelayerClient; +use renegade_circuit_types::elgamal::DecryptionKey; +use renegade_util::telemetry::{setup_system_logger, LevelFilter}; + +use std::{error::Error, str::FromStr}; + +use arbitrum_client::{ + client::{ArbitrumClient, ArbitrumClientConfig}, + constants::Chain, +}; +use clap::Parser; + +// ------------- +// | Constants | +// ------------- + +/// The block polling interval for the Arbitrum client +const BLOCK_POLLING_INTERVAL_MS: u64 = 100; +/// The default region in which to provision secrets manager secrets +const DEFAULT_REGION: &str = "us-east-2"; + +// ------- +// | Cli | +// ------- + +/// The cli for the fee sweeper +#[derive(Debug, Parser)] +struct Cli { + /// The environment this sweeper runs in + #[clap(short, long, default_value = "testnet")] + env: String, + /// The URL of the relayer to use + #[clap(long)] + relayer_url: String, + /// The Arbitrum RPC url to use + #[clap(short, long)] + rpc_url: String, + /// The address of the darkpool contract + #[clap(short = 'a', long)] + darkpool_address: String, + /// The chain to redeem fees for + #[clap(long, default_value = "mainnet")] + chain: Chain, + /// The fee decryption key to use + #[clap(short, long)] + decryption_key: String, + /// The arbitrum private key used to submit transactions + #[clap(long = "pkey")] + arbitrum_private_key: String, + /// The database url + #[clap(long)] + db_url: String, + /// The token address of the USDC token, used to get prices for fee redemption + #[clap(long)] + usdc_mint: String, +} + +impl Cli { + /// Build a connection to the DB + pub fn build_db_conn(&self) -> Result { + PgConnection::establish(&self.db_url).map_err(|e| e.to_string()) + } +} + +/// Main +#[tokio::main] +async fn main() -> Result<(), Box> { + setup_system_logger(LevelFilter::INFO); + let cli = Cli::parse(); + let db_conn = cli.build_db_conn()?; + + // Parse an AWS config + let config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new(DEFAULT_REGION)) + .load() + .await; + + // Build an Arbitrum client + let wallet = LocalWallet::from_str(&cli.arbitrum_private_key)?; + let conf = ArbitrumClientConfig { + darkpool_addr: cli.darkpool_address, + chain: cli.chain, + rpc_url: cli.rpc_url, + arb_priv_keys: vec![wallet], + block_polling_interval_ms: BLOCK_POLLING_INTERVAL_MS, + }; + let client = ArbitrumClient::new(conf).await?; + + // Build the indexer + let key = DecryptionKey::from_hex_str(&cli.decryption_key)?; + let relayer_client = RelayerClient::new(&cli.relayer_url, &cli.usdc_mint); + let mut indexer = Indexer::new(cli.env, config, client, key, db_conn, relayer_client); + + // 1. Index all new fees in the DB + indexer.index_fees().await?; + // 2. Redeem fees according to the redemption policy + indexer.redeem_fees().await?; + + Ok(()) +} diff --git a/fee-sweeper/src/relayer_client.rs b/fee-sweeper/src/relayer_client.rs new file mode 100644 index 0000000..ed60cb2 --- /dev/null +++ b/fee-sweeper/src/relayer_client.rs @@ -0,0 +1,253 @@ +//! Client code for interacting with a configured relayer + +use std::time::Duration; + +use base64::engine::{general_purpose as b64_general_purpose, Engine}; +use ethers::core::k256::ecdsa::{signature::Signer, Signature, SigningKey}; +use http::{HeaderMap, HeaderValue}; +use renegade_api::{ + http::{ + price_report::{GetPriceReportRequest, GetPriceReportResponse, PRICE_REPORT_ROUTE}, + task::{GetTaskStatusResponse, GET_TASK_STATUS_ROUTE}, + wallet::{CreateWalletRequest, CreateWalletResponse, CREATE_WALLET_ROUTE}, + }, + RENEGADE_AUTH_HEADER_NAME, RENEGADE_SIG_EXPIRATION_HEADER_NAME, +}; +use renegade_circuit_types::keychain::SecretSigningKey; +use renegade_common::types::{exchange::PriceReporterState, token::Token, wallet::Wallet}; +use renegade_util::{get_current_time_millis, raw_err_str}; +use reqwest::{Body, Client}; +use serde::{Deserialize, Serialize}; +use tracing::warn; +use uuid::Uuid; + +/// The interval at which to poll relayer task status +const POLL_INTERVAL_MS: u64 = 1000; +/// The amount of time (ms) to declare a wallet signature value for +const SIG_EXPIRATION_BUFFER_MS: u64 = 5000; + +/// A client for interacting with a configured relayer +pub struct RelayerClient { + /// The base URL of the relayer + base_url: String, + /// The mind of the USDC token + usdc_mint: String, +} + +impl RelayerClient { + /// Create a new relayer client + pub fn new(base_url: &str, usdc_mint: &str) -> Self { + Self { + base_url: base_url.to_string(), + usdc_mint: usdc_mint.to_string(), + } + } + + /// Get the price for a given mint + pub async fn get_binance_price(&self, mint: &str) -> Result, String> { + if mint == self.usdc_mint { + return Ok(Some(1.0)); + } + + let body = GetPriceReportRequest { + base_token: Token::from_addr(mint), + quote_token: Token::from_addr(&self.usdc_mint), + }; + let response: GetPriceReportResponse = self.post_relayer(PRICE_REPORT_ROUTE, &body).await?; + + match response.price_report { + PriceReporterState::Nominal(report) => Ok(Some(report.price)), + state => { + warn!("Price report state: {state:?}"); + Ok(None) + } + } + } + + // ------------------ + // | Wallet Methods | + // ------------------ + + /// Create a new wallet via the configured relayer + pub(crate) async fn create_new_wallet(&self, wallet: Wallet) -> Result<(), String> { + let body = CreateWalletRequest { + wallet: wallet.into(), + }; + + let resp: CreateWalletResponse = self.post_relayer(CREATE_WALLET_ROUTE, &body).await?; + self.await_relayer_task(resp.task_id).await + } + + // ----------- + // | Helpers | + // ----------- + + /// Post to the relayer URL + async fn post_relayer(&self, path: &str, body: &Req) -> Result + where + Req: Serialize, + Resp: for<'de> Deserialize<'de>, + { + self.post_relayer_with_headers(path, body, &HeaderMap::new()) + .await + } + + /// Post to the relayer with wallet auth + async fn post_relayer_with_auth( + &self, + path: &str, + body: &Req, + root_key: &SecretSigningKey, + ) -> Result + where + Req: Serialize, + Resp: for<'de> Deserialize<'de>, + { + let body_ser = + serde_json::to_vec(body).map_err(raw_err_str!("Failed to serialize body: {}"))?; + let headers = build_auth_headers(root_key, &body_ser)?; + self.post_relayer_with_headers(path, body, &headers).await + } + + /// Post to the relayer with given headers + async fn post_relayer_with_headers( + &self, + path: &str, + body: &Req, + headers: &HeaderMap, + ) -> Result + where + Req: Serialize, + Resp: for<'de> Deserialize<'de>, + { + // Send a request + let client = reqwest_client()?; + let route = format!("{}{}", self.base_url, path); + let resp = client + .post(route) + .json(body) + .headers(headers.clone()) + .send() + .await + .map_err(raw_err_str!("Failed to send request: {}"))?; + + // Deserialize the response + if !resp.status().is_success() { + return Err(format!("Failed to send request: {}", resp.status())); + } + + resp.json::() + .await + .map_err(raw_err_str!("Failed to parse response: {}")) + } + + /// Get from the relayer URL + async fn get_relayer(&self, path: &str) -> Result + where + Resp: for<'de> Deserialize<'de>, + { + self.get_relayer_with_headers(path, &HeaderMap::new()).await + } + + /// Get from the relayer URL with wallet auth + async fn get_relayer_with_auth( + &self, + path: &str, + root_key: &SecretSigningKey, + ) -> Result + where + Resp: for<'de> Deserialize<'de>, + { + let headers = build_auth_headers(root_key, &[])?; + self.get_relayer_with_headers(path, &headers).await + } + + /// Get from the relayer URL with given headers + async fn get_relayer_with_headers( + &self, + path: &str, + headers: &HeaderMap, + ) -> Result + where + Resp: for<'de> Deserialize<'de>, + { + let client = reqwest_client()?; + let url = format!("{}{}", self.base_url, path); + let resp = client + .get(url) + .headers(headers.clone()) + .send() + .await + .map_err(raw_err_str!("Failed to get relayer path: {}"))?; + + // Parse the response + if !resp.status().is_success() { + return Err(format!("Failed to get relayer path: {}", resp.status())); + } + + resp.json::() + .await + .map_err(raw_err_str!("Failed to parse response: {}")) + } + + /// Await a relayer task + async fn await_relayer_task(&self, task_id: Uuid) -> Result<(), String> { + let mut path = GET_TASK_STATUS_ROUTE.to_string(); + path = path.replace(":task_id", &task_id.to_string()); + + // Enter a polling loop until the task finishes + let poll_interval = Duration::from_millis(POLL_INTERVAL_MS); + loop { + // For now, we assume that an error is a 404 in which case the task has completed + // TODO: Improve this break condition if it proves problematic + if self + .get_relayer::(&path) + .await + .is_err() + { + break; + } + + // Sleep for a bit before polling again + std::thread::sleep(poll_interval); + } + + Ok(()) + } +} + +// ----------- +// | Helpers | +// ----------- + +/// Build a reqwest client +fn reqwest_client() -> Result { + Client::builder() + .user_agent("fee-sweeper") + .build() + .map_err(raw_err_str!("Failed to create reqwest client: {}")) +} + +/// Build authentication headers for a request +fn build_auth_headers(key: &SecretSigningKey, req_bytes: &[u8]) -> Result { + let mut headers = HeaderMap::new(); + let expiration = get_current_time_millis() + SIG_EXPIRATION_BUFFER_MS; + headers.insert(RENEGADE_SIG_EXPIRATION_HEADER_NAME, expiration.into()); + + let root_key: SigningKey = key.try_into()?; + + // Sign the concatenation of the message and the expiration timestamp + let body = Body::from(req_bytes.to_vec()); + let msg_bytes = body.as_bytes().unwrap(); + let payload = [msg_bytes, &expiration.to_le_bytes()].concat(); + + let signature: Signature = root_key.sign(&payload); + let encoded_sig = b64_general_purpose::STANDARD_NO_PAD.encode(signature.to_bytes()); + + headers.insert( + RENEGADE_AUTH_HEADER_NAME, + HeaderValue::from_str(&encoded_sig).unwrap(), + ); + + Ok(headers) +}