Skip to content

Commit

Permalink
funds-manager: Use diesel-async and add warp endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
joeykraut committed Jul 18, 2024
1 parent bf02ad0 commit 8503491
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 55 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,9 @@ renegade-circuit-types = { package = "circuit-types", git = "https://github.com/
renegade-crypto = { git = "https://github.com/renegade-fi/renegade.git" }
renegade-util = { package = "util", git = "https://github.com/renegade-fi/renegade.git" }

# === Database Dependencies === #
diesel = { version = "2.1" }
diesel-async = { version = "0.4" }

# === Misc Dependencies === #
tracing = "0.1"
2 changes: 1 addition & 1 deletion compliance/compliance-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ warp = "0.3"
compliance-api = { path = "../compliance-api" }

# === Database === #
diesel = { version = "2.2", features = ["postgres", "r2d2"] }
diesel = { workspace = true, features = ["postgres", "r2d2"] }

# === Renegade Dependencies === #
renegade-util = { workspace = true }
Expand Down
9 changes: 7 additions & 2 deletions funds-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "fee-sweeper"
name = "funds-manager"
description = "Manages custody of funds for protocol operator"
version = "0.1.0"
edition = "2021"
Expand All @@ -11,10 +11,15 @@ http-body-util = "0.1.0"
tokio = { version = "1.10", features = ["full"] }
warp = "0.3"


# === Infra === #
aws-sdk-secretsmanager = "1.37"
aws-config = "1.5"
diesel = { version = "2.2", features = ["postgres", "numeric", "uuid"] }
diesel = { workspace = true, features = ["postgres", "numeric", "uuid"] }
diesel-async = { workspace = true, features = ["postgres"] }
native-tls = "0.2"
postgres-native-tls = "0.5"
tokio-postgres = "0.7.7"

# === Blockchain Interaction === #
alloy-sol-types = "0.3.1"
Expand Down
29 changes: 29 additions & 0 deletions funds-manager/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
//! Database code

use diesel_async::{AsyncConnection, AsyncPgConnection};

Check failure on line 3 in funds-manager/src/db/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `AsyncConnection`

error: unused import: `AsyncConnection` --> funds-manager/src/db/mod.rs:3:20 | 3 | use diesel_async::{AsyncConnection, AsyncPgConnection}; | ^^^^^^^^^^^^^^^ | = note: `-D unused-imports` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_imports)]`
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use renegade_util::err_str;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};

Check failure on line 7 in funds-manager/src/db/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unused imports: `MakeTlsConnect`, `TlsConnect`

error: unused imports: `MakeTlsConnect`, `TlsConnect` --> funds-manager/src/db/mod.rs:7:27 | 7 | use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; | ^^^^^^^^^^^^^^ ^^^^^^^^^^

use crate::error::FundsManagerError;

pub mod models;
#[allow(missing_docs)]
pub mod schema;

/// Establish a connection to the database
pub async fn establish_connection(db_url: &str) -> Result<AsyncPgConnection, FundsManagerError> {
let connector = TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.map_err(err_str!(FundsManagerError::Db))?;
let connector = MakeTlsConnector::new(connector);
let (client, conn) = tokio_postgres::connect(db_url, connector)
.await
.map_err(err_str!(FundsManagerError::Db))?;

// Spawn the connection handle in a separate task
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("Connection error: {}", e);
}
});

AsyncPgConnection::try_from(client).await.map_err(err_str!(FundsManagerError::Db))
}
6 changes: 5 additions & 1 deletion funds-manager/src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ diesel::table! {
}
}

diesel::allow_tables_to_appear_in_same_query!(fees, indexing_metadata, wallets,);
diesel::allow_tables_to_appear_in_same_query!(
fees,
indexing_metadata,
wallets,
);
25 changes: 25 additions & 0 deletions funds-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,28 @@ impl Display for FundsManagerError {
}
impl Error for FundsManagerError {}
impl Reject for FundsManagerError {}

/// API-specific error type
#[derive(Debug)]
pub enum ApiError {
/// Error during fee indexing
IndexingError(String),
/// Error during fee redemption
RedemptionError(String),
/// Internal server error
InternalError(String),
}

impl Reject for ApiError {}

impl Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ApiError::IndexingError(e) => write!(f, "Indexing error: {}", e),
ApiError::RedemptionError(e) => write!(f, "Redemption error: {}", e),
ApiError::InternalError(e) => write!(f, "Internal error: {}", e),
}
}
}

impl Error for ApiError {}
6 changes: 3 additions & 3 deletions funds-manager/src/indexer/index_fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::Indexer;
impl Indexer {
/// Index all fees since the given block
pub async fn index_fees(&mut self) -> Result<(), FundsManagerError> {
let block_number = self.get_latest_block()?;
let block_number = self.get_latest_block().await?;
info!("indexing fees from block {block_number}");

let filter = self
Expand All @@ -48,7 +48,7 @@ impl Indexer {

if block > most_recent_block {
most_recent_block = block;
self.update_latest_block(most_recent_block)?;
self.update_latest_block(most_recent_block).await?;
}
}

Expand Down Expand Up @@ -86,7 +86,7 @@ impl Indexer {

// Otherwise, index the note
let fee = NewFee::new_from_note(&note, tx);
self.insert_fee(fee)
self.insert_fee(fee).await
}

/// Get a note from a transaction body
Expand Down
14 changes: 8 additions & 6 deletions funds-manager/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use arbitrum_client::{client::ArbitrumClient, constants::Chain};
use aws_config::SdkConfig as AwsConfig;
use diesel::PgConnection;
use renegade_circuit_types::elgamal::{DecryptionKey, EncryptionKey};
use diesel_async::AsyncPgConnection;
use renegade_circuit_types::elgamal::DecryptionKey;
use renegade_util::hex::jubjub_from_hex_string;

use crate::relayer_client::RelayerClient;

Expand All @@ -24,7 +25,7 @@ pub(crate) struct Indexer {
/// The decryption key
pub decryption_keys: Vec<DecryptionKey>,
/// A connection to the DB
pub db_conn: PgConnection,
pub db_conn: AsyncPgConnection,
/// The AWS config
pub aws_config: AwsConfig,
}
Expand All @@ -37,7 +38,7 @@ impl Indexer {
aws_config: AwsConfig,
arbitrum_client: ArbitrumClient,
decryption_keys: Vec<DecryptionKey>,
db_conn: PgConnection,
db_conn: AsyncPgConnection,
relayer_client: RelayerClient,
) -> Self {
Indexer {
Expand All @@ -53,7 +54,8 @@ impl Indexer {

/// Get the decryption key for a given encryption key, referred to as a
/// receiver in this context
pub fn get_key_for_receiver(&self, receiver: EncryptionKey) -> Option<&DecryptionKey> {
self.decryption_keys.iter().find(|key| key.public_key() == receiver)
pub fn get_key_for_receiver(&self, receiver: &str) -> Option<&DecryptionKey> {
let key = jubjub_from_hex_string(receiver).ok()?;
self.decryption_keys.iter().find(|k| k.public_key() == key)
}
}
59 changes: 37 additions & 22 deletions funds-manager/src/indexer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
use std::collections::HashMap;

use bigdecimal::BigDecimal;
use diesel::define_sql_function;
use diesel::deserialize::Queryable;
use diesel::deserialize::QueryableByName;
use diesel::sql_function;
use diesel::sql_query;
use diesel::sql_types::SingleValue;
use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text};
use diesel::PgArrayExpressionMethods;
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use renegade_constants::MAX_BALANCES;

use crate::db::models::WalletMetadata;
Expand All @@ -33,12 +34,12 @@ use super::redeem_fees::MAX_FEES_REDEEMED;
pub(crate) const LAST_INDEXED_BLOCK_KEY: &str = "latest_block";

// Define the `array_length` function
define_sql_function! {
sql_function! {
/// Calculate the length of an array
fn array_length<T>(array: Array<T>, dim: Integer) -> Nullable<Integer>;
}

define_sql_function! {
sql_function! {
/// Coalesce a nullable value with a default value
fn coalesce<T: SingleValue>(x: Nullable<T>, y: T) -> T;
}
Expand Down Expand Up @@ -75,12 +76,13 @@ impl Indexer {
// ------------------

/// Get the latest block number
pub(crate) fn get_latest_block(&mut self) -> Result<u64, FundsManagerError> {
pub(crate) async fn get_latest_block(&mut self) -> Result<u64, FundsManagerError> {
let entry = metadata_table
.filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY))
.limit(1)
.load(&mut self.db_conn)
.map(|res: Vec<Metadata>| res[0].clone())
.load::<Metadata>(&mut self.db_conn)
.await
.map(|res| res[0].clone())
.map_err(|_| FundsManagerError::db("failed to query latest block"))?;

entry
Expand All @@ -90,14 +92,15 @@ impl Indexer {
}

/// Update the latest block number
pub(crate) fn update_latest_block(
pub(crate) async fn update_latest_block(
&mut self,
block_number: u64,
) -> Result<(), FundsManagerError> {
let block_string = block_number.to_string();
diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY))
.set(metadata_value.eq(block_string))
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to update latest block"))
.map(|_| ())
}
Expand All @@ -107,40 +110,48 @@ impl Indexer {
// --------------

/// Insert a fee into the fees table
pub(crate) fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> {
pub(crate) async fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> {
diesel::insert_into(fees_table)
.values(vec![fee])
.execute(&mut self.db_conn)
.map_err(|_| FundsManagerError::db("failed to insert fee: {}"))
.await
.map_err(|e| FundsManagerError::db(format!("failed to insert fee: {e}")))
.map(|_| ())
}

/// Get all mints that have unredeemed fees
pub(crate) fn get_unredeemed_fee_mints(&mut self) -> Result<Vec<String>, FundsManagerError> {
pub(crate) async fn get_unredeemed_fee_mints(
&mut self,
) -> Result<Vec<String>, FundsManagerError> {
let mints = fees_table
.select(mint_col)
.filter(redeemed_col.eq(false))
.distinct()
.load(&mut self.db_conn)
.load::<String>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query unredeemed fees"))?;

Ok(mints)
}

/// Mark a fee as redeemed
pub(crate) fn mark_fee_as_redeemed(&mut self, tx_hash: &str) -> Result<(), FundsManagerError> {
pub(crate) async fn mark_fee_as_redeemed(
&mut self,
tx_hash: &str,
) -> Result<(), FundsManagerError> {
let filter = tx_hash_col.eq(tx_hash);
diesel::update(fees_table.filter(filter))
.set(redeemed_col.eq(true))
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to mark fee as redeemed"))
.map(|_| ())
}

/// Get the most valuable fees to be redeemed
///
/// Returns the tx hashes of the most valuable fees to be redeemed
pub(crate) fn get_most_valuable_fees(
pub(crate) async fn get_most_valuable_fees(
&mut self,
prices: HashMap<String, f64>,
) -> Result<Vec<FeeValue>, FundsManagerError> {
Expand Down Expand Up @@ -175,7 +186,8 @@ impl Indexer {

// Query for the tx hashes
sql_query(query_string)
.load(&mut self.db_conn)
.load::<FeeValue>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query most valuable fees"))
}

Expand All @@ -186,39 +198,42 @@ impl Indexer {
/// Get the wallet managing an mint, if it exists
///
/// Returns the id and secret id of the wallet
pub(crate) fn get_wallet_for_mint(
pub(crate) async fn get_wallet_for_mint(
&mut self,
mint: &str,
) -> Result<Option<WalletMetadata>, FundsManagerError> {
let wallets: Vec<WalletMetadata> = wallet_table
.filter(managed_mints_col.contains(vec![mint]))
.load(&mut self.db_conn)
.load::<WalletMetadata>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query wallet for mint"))?;

Ok(wallets.first().cloned())
Ok(wallets.into_iter().next())
}

/// Find a wallet with an empty balance slot, if one exists
pub(crate) fn find_wallet_with_empty_balance(
pub(crate) async fn find_wallet_with_empty_balance(
&mut self,
) -> Result<Option<WalletMetadata>, FundsManagerError> {
let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0);
let wallets = wallet_table
.filter(n_mints.lt(MAX_BALANCES as i32))
.load(&mut self.db_conn)
.load::<WalletMetadata>(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to query wallets with empty balances"))?;

Ok(wallets.first().cloned())
Ok(wallets.into_iter().next())
}

/// Insert a new wallet into the wallets table
pub(crate) fn insert_wallet(
pub(crate) async fn insert_wallet(
&mut self,
wallet: WalletMetadata,
) -> Result<(), FundsManagerError> {
diesel::insert_into(wallet_table)
.values(vec![wallet])
.execute(&mut self.db_conn)
.await
.map_err(|_| FundsManagerError::db("failed to insert wallet"))
.map(|_| ())
}
Expand Down
Loading

0 comments on commit 8503491

Please sign in to comment.