Skip to content

Commit

Permalink
funds-manager: Use diesel-async and add warp endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
joeykraut committed Jul 18, 2024
1 parent bf02ad0 commit 97ddcb7
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 50 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,9 @@ renegade-circuit-types = { package = "circuit-types", git = "https://github.com/
renegade-crypto = { git = "https://github.com/renegade-fi/renegade.git" }
renegade-util = { package = "util", git = "https://github.com/renegade-fi/renegade.git" }

# === Database Dependencies === #
diesel = { version = "2.1" }
diesel-async = { version = "0.4" }

# === Misc Dependencies === #
tracing = "0.1"
2 changes: 1 addition & 1 deletion compliance/compliance-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ warp = "0.3"
compliance-api = { path = "../compliance-api" }

# === Database === #
diesel = { version = "2.2", features = ["postgres", "r2d2"] }
diesel = { workspace = true, features = ["postgres", "r2d2"] }

# === Renegade Dependencies === #
renegade-util = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions funds-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "fee-sweeper"
name = "funds-manager"
description = "Manages custody of funds for protocol operator"
version = "0.1.0"
edition = "2021"
Expand All @@ -14,7 +14,8 @@ warp = "0.3"
# === Infra === #
aws-sdk-secretsmanager = "1.37"
aws-config = "1.5"
diesel = { version = "2.2", features = ["postgres", "numeric", "uuid"] }
diesel = { workspace = true, features = ["postgres", "numeric", "uuid"] }
diesel-async = { workspace = true, features = ["postgres"] }

# === Blockchain Interaction === #
alloy-sol-types = "0.3.1"
Expand Down
6 changes: 3 additions & 3 deletions funds-manager/src/indexer/index_fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::Indexer;
impl Indexer {
/// Index all fees since the given block
pub async fn index_fees(&mut self) -> Result<(), FundsManagerError> {
let block_number = self.get_latest_block()?;
let block_number = self.get_latest_block().await?;
info!("indexing fees from block {block_number}");

let filter = self
Expand All @@ -48,7 +48,7 @@ impl Indexer {

if block > most_recent_block {
most_recent_block = block;
self.update_latest_block(most_recent_block)?;
self.update_latest_block(most_recent_block).await?;
}
}

Expand Down Expand Up @@ -86,7 +86,7 @@ impl Indexer {

// Otherwise, index the note
let fee = NewFee::new_from_note(&note, tx);
self.insert_fee(fee)
self.insert_fee(fee).await
}

/// Get a note from a transaction body
Expand Down
14 changes: 8 additions & 6 deletions funds-manager/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use arbitrum_client::{client::ArbitrumClient, constants::Chain};
use aws_config::SdkConfig as AwsConfig;
use diesel::PgConnection;
use renegade_circuit_types::elgamal::{DecryptionKey, EncryptionKey};
use diesel_async::AsyncPgConnection;
use renegade_circuit_types::elgamal::DecryptionKey;
use renegade_util::hex::jubjub_from_hex_string;

use crate::relayer_client::RelayerClient;

Expand All @@ -24,7 +25,7 @@ pub(crate) struct Indexer {
/// The decryption key
pub decryption_keys: Vec<DecryptionKey>,
/// A connection to the DB
pub db_conn: PgConnection,
pub db_conn: AsyncPgConnection,
/// The AWS config
pub aws_config: AwsConfig,
}
Expand All @@ -37,7 +38,7 @@ impl Indexer {
aws_config: AwsConfig,
arbitrum_client: ArbitrumClient,
decryption_keys: Vec<DecryptionKey>,
db_conn: PgConnection,
db_conn: AsyncPgConnection,
relayer_client: RelayerClient,
) -> Self {
Indexer {
Expand All @@ -53,7 +54,8 @@ impl Indexer {

/// Get the decryption key for a given encryption key, referred to as a
/// receiver in this context
pub fn get_key_for_receiver(&self, receiver: EncryptionKey) -> Option<&DecryptionKey> {
self.decryption_keys.iter().find(|key| key.public_key() == receiver)
pub fn get_key_for_receiver(&self, receiver: &str) -> Option<&DecryptionKey> {
let key = jubjub_from_hex_string(receiver).ok()?;
self.decryption_keys.iter().find(|k| k.public_key() == key)
}
}
57 changes: 36 additions & 21 deletions funds-manager/src/indexer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
use std::collections::HashMap;

use bigdecimal::BigDecimal;
use diesel::define_sql_function;
use diesel::deserialize::Queryable;
use diesel::deserialize::QueryableByName;
use diesel::sql_function;
use diesel::sql_query;
use diesel::sql_types::SingleValue;
use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text};
use diesel::PgArrayExpressionMethods;
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use renegade_constants::MAX_BALANCES;

use crate::db::models::WalletMetadata;
Expand All @@ -33,12 +34,12 @@ use super::redeem_fees::MAX_FEES_REDEEMED;
pub(crate) const LAST_INDEXED_BLOCK_KEY: &str = "latest_block";

// Define the `array_length` function
define_sql_function! {
sql_function! {
/// Calculate the length of an array
fn array_length<T>(array: Array<T>, dim: Integer) -> Nullable<Integer>;
}

define_sql_function! {
sql_function! {
/// Coalesce a nullable value with a default value
fn coalesce<T: SingleValue>(x: Nullable<T>, y: T) -> T;
}
Expand Down Expand Up @@ -75,12 +76,13 @@ impl Indexer {
// ------------------

/// Get the latest block number
pub(crate) fn get_latest_block(&mut self) -> Result<u64, FundsManagerError> {
pub(crate) async fn get_latest_block(&mut self) -> Result<u64, FundsManagerError> {
let entry = metadata_table
.filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY))
.limit(1)
.load(&mut self.db_conn)
.map(|res: Vec<Metadata>| res[0].clone())
.load::<Metadata>(&mut self.db_conn)
.await
.map(|res| res[0].clone())
.map_err(|_| FundsManagerError::db("failed to query latest block"))?;

entry
Expand All @@ -90,14 +92,15 @@ impl Indexer {
}

/// Update the latest block number
pub(crate) fn update_latest_block(
pub(crate) async fn update_latest_block(
&mut self,
block_number: u64,
) -> Result<(), FundsManagerError> {
let block_string = block_number.to_string();
diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY))
.set(metadata_value.eq(block_string))
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to update latest block"))
.map(|_| ())
}
Expand All @@ -107,40 +110,48 @@ impl Indexer {
// --------------

/// Insert a fee into the fees table
pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> {
pub(crate) async fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> {
diesel::insert_into(fees_table)
.values(vec![fee])
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to insert fee: {}"))
.map(|_| ())
}

/// Get all mints that have unredeemed fees
pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result<Vec<String>, FundsManagerError> {
pub(crate) async fn get_unredeemed_fee_mints(
&mut self,
) -> Result<Vec<String>, FundsManagerError> {
let mints = fees_table
.select(mint_col)
.filter(redeemed_col.eq(false))
.distinct()
.load(&mut self.db_conn)
.load::<String>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query unredeemed fees"))?;

Ok(mints)
}

/// Mark a fee as redeemed
pub(crate) fn mark_fee_as_redeemed(&mut self, tx_hash: &str) -> Result<(), FundsManagerError> {
pub(crate) async fn mark_fee_as_redeemed(
&mut self,
tx_hash: &str,
) -> Result<(), FundsManagerError> {
let filter = tx_hash_col.eq(tx_hash);
diesel::update(fees_table.filter(filter))
.set(redeemed_col.eq(true))
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to mark fee as redeemed"))
.map(|_| ())
}

/// Get the most valuable fees to be redeemed
///
/// Returns the tx hashes of the most valuable fees to be redeemed
pub(crate) fn get_most_valuable_fees(
pub(crate) async fn get_most_valuable_fees(
&mut self,
prices: HashMap<String, f64>,
) -> Result<Vec<FeeValue>, FundsManagerError> {
Expand Down Expand Up @@ -175,7 +186,8 @@ impl Indexer {

// Query for the tx hashes
sql_query(query_string)
.load(&mut self.db_conn)
.load::<FeeValue>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query most valuable fees"))
}

Expand All @@ -186,39 +198,42 @@ impl Indexer {
/// Get the wallet managing an mint, if it exists
///
/// Returns the id and secret id of the wallet
pub(crate) fn get_wallet_for_mint(
pub(crate) async fn get_wallet_for_mint(
&mut self,
mint: &str,
) -> Result<Option<WalletMetadata>, FundsManagerError> {
let wallets: Vec<WalletMetadata> = wallet_table
.filter(managed_mints_col.contains(vec![mint]))
.load(&mut self.db_conn)
.load::<WalletMetadata>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query wallet for mint"))?;

Ok(wallets.first().cloned())
Ok(wallets.into_iter().next())
}

/// Find a wallet with an empty balance slot, if one exists
pub(crate) fn find_wallet_with_empty_balance(
pub(crate) async fn find_wallet_with_empty_balance(
&mut self,
) -> Result<Option<WalletMetadata>, FundsManagerError> {
let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0);
let wallets = wallet_table
.filter(n_mints.lt(MAX_BALANCES as i32))
.load(&mut self.db_conn)
.load::<WalletMetadata>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query wallets with empty balances"))?;

Ok(wallets.first().cloned())
Ok(wallets.into_iter().next())
}

/// Insert a new wallet into the wallets table
pub(crate) fn insert_wallet(
pub(crate) async fn insert_wallet(
&mut self,
wallet: WalletMetadata,
) -> Result<(), FundsManagerError> {
diesel::insert_into(wallet_table)
.values(vec![wallet])
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to insert wallet"))
.map(|_| ())
}
Expand Down
26 changes: 15 additions & 11 deletions funds-manager/src/indexer/redeem_fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use ethers::signers::LocalWallet;
use ethers::types::TxHash;
use ethers::utils::hex;
use renegade_api::http::wallet::RedeemNoteRequest;
use renegade_circuit_types::elgamal::DecryptionKey;
use renegade_circuit_types::note::Note;
use renegade_common::types::wallet::derivation::{
derive_blinder_seed, derive_share_seed, derive_wallet_id, derive_wallet_keychain,
Expand All @@ -31,7 +30,7 @@ impl Indexer {
info!("redeeming fees...");

// Get all mints that have unredeemed fees
let mints = self.get_unredeemed_fee_mints()?;
let mints = self.get_unredeemed_fee_mints().await?;

// Get the prices of each redeemable mint, we want to redeem the most profitable
// fees first
Expand All @@ -46,7 +45,7 @@ impl Indexer {
}

// Get the most valuable fees and redeem them
let most_valuable_fees = self.get_most_valuable_fees(prices)?;
let most_valuable_fees = self.get_most_valuable_fees(prices).await?;

// TODO: Filter by those fees whose present value exceeds the expected gas costs
// to redeem
Expand All @@ -67,9 +66,12 @@ impl Indexer {
&mut self,
mint: &str,
) -> Result<WalletMetadata, FundsManagerError> {
let maybe_wallet = self.get_wallet_for_mint(mint)?;
let maybe_wallet =
maybe_wallet.or_else(|| self.find_wallet_with_empty_balance().ok().flatten());
let maybe_wallet = self.get_wallet_for_mint(mint).await?;
let maybe_wallet = if maybe_wallet.is_none() {
self.find_wallet_with_empty_balance().await?
} else {
maybe_wallet
};

match maybe_wallet {
Some(wallet) => Ok(wallet),
Expand All @@ -92,7 +94,7 @@ impl Indexer {

// 3. Add an entry in the wallets table for the newly created wallet
let entry = WalletMetadata::empty(wallet_id, secret_name);
self.insert_wallet(entry.clone())?;
self.insert_wallet(entry.clone()).await?;

Ok(entry)
}
Expand Down Expand Up @@ -137,11 +139,13 @@ impl Indexer {

// Find the note in the tx body
let tx_hash = TxHash::from_str(&tx).map_err(err_str!(FundsManagerError::Parse))?;
let receiver = DecryptionKey::from_hex_str(&receiver).unwrap();
let note = self.get_note_from_tx_with_key(tx_hash, &receiver).await?;
let key = self
.get_key_for_receiver(&receiver)
.ok_or(FundsManagerError::custom("no key found for receiver"))?;
let note = self.get_note_from_tx_with_key(tx_hash, key).await?;

// Redeem the note through the relayer
let req = RedeemNoteRequest { note: note.clone(), decryption_key: receiver };
let req = RedeemNoteRequest { note: note.clone(), decryption_key: *key };
self.relayer_client.redeem_note(wallet.id, req, &root_key).await?;

// Mark the fee as redeemed
Expand All @@ -166,7 +170,7 @@ impl Indexer {
}

info!("successfully redeemed fee from tx: {}", tx_hash);
self.mark_fee_as_redeemed(tx_hash)
self.mark_fee_as_redeemed(tx_hash).await
}

// -------------------
Expand Down
Loading

0 comments on commit 97ddcb7

Please sign in to comment.