Skip to content

Commit

Permalink
funds-manager-server: Use connection pool instead of raw connections
Browse files Browse the repository at this point in the history
  • Loading branch information
joeykraut committed Jul 30, 2024
1 parent 43b7c0b commit f879777
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 62 deletions.
3 changes: 2 additions & 1 deletion funds-manager/funds-manager-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ warp = "0.3"
# === Infra === #
aws-sdk-secretsmanager = "1.37"
aws-config = "1.5"
bb8 = "0.8"
diesel = { workspace = true, features = ["postgres", "numeric", "uuid"] }
diesel-async = { workspace = true, features = ["postgres"] }
diesel-async = { workspace = true, features = ["postgres", "bb8"] }
fireblocks-sdk = { git = "https://github.com/renegade-fi/fireblocks-sdk-rs" }
native-tls = "0.2"
postgres-native-tls = "0.5"
Expand Down
11 changes: 10 additions & 1 deletion funds-manager/funds-manager-server/src/custody_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::info;

use crate::db::{DbConn, DbPool};
use crate::error::FundsManagerError;

abigen!(
Expand Down Expand Up @@ -58,6 +59,8 @@ pub struct CustodyClient {
fireblocks_api_secret: Vec<u8>,
/// The arbitrum RPC url to use for the custody client
arbitrum_rpc_url: String,
/// The database connection pool
db_pool: Arc<DbPool>,
}

impl CustodyClient {
Expand All @@ -67,9 +70,10 @@ impl CustodyClient {
fireblocks_api_key: String,
fireblocks_api_secret: String,
arbitrum_rpc_url: String,
db_pool: Arc<DbPool>,
) -> Self {
let fireblocks_api_secret = fireblocks_api_secret.as_bytes().to_vec();
Self { fireblocks_api_key, fireblocks_api_secret, arbitrum_rpc_url }
Self { fireblocks_api_key, fireblocks_api_secret, arbitrum_rpc_url, db_pool }
}

/// Get a fireblocks client
Expand Down Expand Up @@ -142,4 +146,9 @@ impl CustodyClient {
.map_err(FundsManagerError::fireblocks)
.map(|(tx, _rid)| tx)
}

/// Get a database connection from the pool
pub async fn get_db_conn(&self) -> Result<DbConn, FundsManagerError> {
self.db_pool.get().await.map_err(|e| FundsManagerError::Db(e.to_string()))
}
}
29 changes: 24 additions & 5 deletions funds-manager/funds-manager-server/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
//! Database code

use diesel_async::AsyncPgConnection;
use bb8::{Pool, PooledConnection};
use diesel::ConnectionError;
use diesel_async::{
pooled_connection::{AsyncDieselConnectionManager, ManagerConfig},
AsyncPgConnection,
};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use renegade_util::err_str;
Expand All @@ -12,19 +17,33 @@ pub mod models;
#[allow(missing_docs)]
pub mod schema;

/// The DB connection type
pub type DbConn<'a> = PooledConnection<'a, AsyncDieselConnectionManager<AsyncPgConnection>>;
/// The DB pool type
pub type DbPool = Pool<AsyncDieselConnectionManager<AsyncPgConnection>>;

/// Create a connection pool for the given database url
pub async fn create_db_pool(db_url: &str) -> Result<DbPool, FundsManagerError> {
let mut conf = ManagerConfig::default();
conf.custom_setup = Box::new(move |url| Box::pin(establish_connection(url)));

let manager = AsyncDieselConnectionManager::new_with_config(db_url, conf);
Pool::builder().build(manager).await.map_err(err_str!(FundsManagerError::Db))
}

/// Establish a connection to the database
pub async fn establish_connection(db_url: &str) -> Result<AsyncPgConnection, FundsManagerError> {
pub async fn establish_connection(db_url: &str) -> Result<AsyncPgConnection, ConnectionError> {
// Build a TLS connector, we don't validate certificates for simplicity.
// Practically this is unnecessary because we will be limiting our traffic to
// within a siloed environment when deployed
let connector = TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.map_err(err_str!(FundsManagerError::Db))?;
.expect("failed to build tls connector");
let connector = MakeTlsConnector::new(connector);
let (client, conn) = tokio_postgres::connect(db_url, connector)
.await
.map_err(err_str!(FundsManagerError::Db))?;
.map_err(err_str!(ConnectionError::BadConnection))?;

// Spawn the connection handle in a separate task
tokio::spawn(async move {
Expand All @@ -33,5 +52,5 @@ pub async fn establish_connection(db_url: &str) -> Result<AsyncPgConnection, Fun
}
});

AsyncPgConnection::try_from(client).await.map_err(err_str!(FundsManagerError::Db))
AsyncPgConnection::try_from(client).await
}
18 changes: 13 additions & 5 deletions funds-manager/funds-manager-server/src/fee_indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

use arbitrum_client::{client::ArbitrumClient, constants::Chain};
use aws_config::SdkConfig as AwsConfig;
use diesel_async::AsyncPgConnection;
use renegade_circuit_types::elgamal::DecryptionKey;
use renegade_util::err_str;
use renegade_util::hex::jubjub_from_hex_string;
use std::sync::Arc;

use crate::custody_client::CustodyClient;
use crate::db::{DbConn, DbPool};
use crate::error::FundsManagerError;
use crate::relayer_client::RelayerClient;

pub mod fee_balances;
Expand All @@ -26,8 +29,8 @@ pub(crate) struct Indexer {
pub arbitrum_client: ArbitrumClient,
/// The decryption key
pub decryption_keys: Vec<DecryptionKey>,
/// A connection to the DB
pub db_conn: AsyncPgConnection,
/// The database connection pool
pub db_pool: Arc<DbPool>,
/// The AWS config
pub aws_config: AwsConfig,
/// The custody client
Expand All @@ -43,7 +46,7 @@ impl Indexer {
aws_config: AwsConfig,
arbitrum_client: ArbitrumClient,
decryption_keys: Vec<DecryptionKey>,
db_conn: AsyncPgConnection,
db_pool: Arc<DbPool>,
relayer_client: RelayerClient,
custody_client: CustodyClient,
) -> Self {
Expand All @@ -52,7 +55,7 @@ impl Indexer {
chain,
arbitrum_client,
decryption_keys,
db_conn,
db_pool,
relayer_client,
aws_config,
custody_client,
Expand All @@ -65,4 +68,9 @@ impl Indexer {
let key = jubjub_from_hex_string(receiver).ok()?;
self.decryption_keys.iter().find(|k| k.public_key() == key)
}

/// Get a connection from the pool
pub async fn get_conn(&self) -> Result<DbConn, FundsManagerError> {
self.db_pool.get().await.map_err(err_str!(FundsManagerError::Db))
}
}
45 changes: 28 additions & 17 deletions funds-manager/funds-manager-server/src/fee_indexer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use diesel::sql_function;
use diesel::sql_query;
use diesel::sql_types::SingleValue;
use diesel::sql_types::{Array, Integer, Nullable, Numeric, Text};
use diesel::ExpressionMethods;
use diesel::PgArrayExpressionMethods;
use diesel::{ExpressionMethods, QueryDsl};
use diesel::QueryDsl;
use diesel_async::RunQueryDsl;
use renegade_common::types::wallet::WalletIdentifier;
use renegade_constants::MAX_BALANCES;
Expand Down Expand Up @@ -88,10 +89,11 @@ impl Indexer {

/// Get the latest block number
pub(crate) async fn get_latest_block(&mut self) -> Result<u64, FundsManagerError> {
let mut conn = self.get_conn().await?;
let entry = metadata_table
.filter(metadata_key.eq(LAST_INDEXED_BLOCK_KEY))
.limit(1)
.load::<Metadata>(&mut self.db_conn)
.load::<Metadata>(&mut conn)
.await
.map(|res| res[0].clone())
.map_err(|_| FundsManagerError::db("failed to query latest block"))?;
Expand All @@ -107,10 +109,11 @@ impl Indexer {
&mut self,
block_number: u64,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_conn().await?;
let block_string = block_number.to_string();
diesel::update(metadata_table.find(LAST_INDEXED_BLOCK_KEY))
.set(metadata_value.eq(block_string))
.execute(&mut self.db_conn)
.execute(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to update latest block"))
.map(|_| ())
Expand All @@ -122,7 +125,8 @@ impl Indexer {

/// Insert a fee into the fees table
pub(crate) async fn insert_fee(&mut self, fee: NewFee) -> Result<(), FundsManagerError> {
match diesel::insert_into(fees_table).values(vec![fee]).execute(&mut self.db_conn).await {
let mut conn = self.get_conn().await?;
match diesel::insert_into(fees_table).values(vec![fee]).execute(&mut conn).await {
Ok(_) => Ok(()),
Err(DieselError::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
Expand All @@ -139,11 +143,12 @@ impl Indexer {
pub(crate) async fn get_unredeemed_fee_mints(
&mut self,
) -> Result<Vec<String>, FundsManagerError> {
let mut conn = self.get_conn().await?;
let mints = fees_table
.select(mint_col)
.filter(redeemed_col.eq(false))
.distinct()
.load::<String>(&mut self.db_conn)
.load::<String>(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to query unredeemed fees"))?;

Expand All @@ -155,18 +160,19 @@ impl Indexer {
&mut self,
tx_hash: &str,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_conn().await?;
let filter = tx_hash_col.eq(tx_hash);
diesel::update(fees_table.filter(filter))
.set(redeemed_col.eq(true))
.execute(&mut self.db_conn)
.execute(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to mark fee as redeemed"))
.map(|_| ())
}

/// Get the most valuable fees to be redeemed
///
/// Returns the tx hashes of the most valuable fees to be redeemed
/// Returns the `MAX_FEES_REDEEMED` most valuable fees to be redeemed
pub(crate) async fn get_most_valuable_fees(
&mut self,
prices: HashMap<String, f64>,
Expand Down Expand Up @@ -201,8 +207,9 @@ impl Indexer {
query_string.push_str(&format!("ORDER BY value DESC LIMIT {};", MAX_FEES_REDEEMED));

// Query for the tx hashes
let mut conn = self.get_conn().await?;
sql_query(query_string)
.load::<FeeValue>(&mut self.db_conn)
.load::<FeeValue>(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to query most valuable fees"))
}
Expand All @@ -216,9 +223,10 @@ impl Indexer {
&mut self,
wallet_id: &Uuid,
) -> Result<WalletMetadata, FundsManagerError> {
let mut conn = self.get_conn().await?;
renegade_wallet_table
.filter(wallet_id_col.eq(wallet_id))
.first::<WalletMetadata>(&mut self.db_conn)
.first::<WalletMetadata>(&mut conn)
.await
.map_err(|e| FundsManagerError::db(format!("failed to get wallet by ID: {}", e)))
}
Expand All @@ -227,23 +235,23 @@ impl Indexer {
pub(crate) async fn get_all_wallets(
&mut self,
) -> Result<Vec<WalletMetadata>, FundsManagerError> {
let mut conn = self.get_conn().await?;
let wallets = renegade_wallet_table
.load::<WalletMetadata>(&mut self.db_conn)
.load::<WalletMetadata>(&mut conn)
.await
.map_err(|e| FundsManagerError::db(format!("failed to load wallets: {}", e)))?;
Ok(wallets)
}

/// Get the wallet managing an mint, if it exists
///
/// Returns the id and secret id of the wallet
/// Get the wallet managing a mint, if it exists
pub(crate) async fn get_wallet_for_mint(
&mut self,
mint: &str,
) -> Result<Option<WalletMetadata>, FundsManagerError> {
let mut conn = self.get_conn().await?;
let wallets: Vec<WalletMetadata> = renegade_wallet_table
.filter(managed_mints_col.contains(vec![mint]))
.load::<WalletMetadata>(&mut self.db_conn)
.load::<WalletMetadata>(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to query wallet for mint"))?;

Expand All @@ -254,10 +262,11 @@ impl Indexer {
pub(crate) async fn find_wallet_with_empty_balance(
&mut self,
) -> Result<Option<WalletMetadata>, FundsManagerError> {
let mut conn = self.get_conn().await?;
let n_mints = coalesce(array_length(managed_mints_col, 1 /* dim */), 0);
let wallets = renegade_wallet_table
.filter(n_mints.lt(MAX_BALANCES as i32))
.load::<WalletMetadata>(&mut self.db_conn)
.load::<WalletMetadata>(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to query wallets with empty balances"))?;

Expand All @@ -269,9 +278,10 @@ impl Indexer {
&mut self,
wallet: WalletMetadata,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_conn().await?;
diesel::insert_into(renegade_wallet_table)
.values(vec![wallet])
.execute(&mut self.db_conn)
.execute(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to insert wallet"))
.map(|_| ())
Expand All @@ -283,9 +293,10 @@ impl Indexer {
wallet_id: &WalletIdentifier,
mint: &str,
) -> Result<(), FundsManagerError> {
let mut conn = self.get_conn().await?;
diesel::update(renegade_wallet_table.find(wallet_id))
.set(managed_mints_col.eq(array_append(managed_mints_col, mint)))
.execute(&mut self.db_conn)
.execute(&mut conn)
.await
.map_err(|_| FundsManagerError::db("failed to add mint to wallet"))
.map(|_| ())
Expand Down
6 changes: 2 additions & 4 deletions funds-manager/funds-manager-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub const MAX_GAS_WITHDRAWAL_AMOUNT: f64 = 0.1; // 0.1 ETH
pub(crate) async fn index_fees_handler(server: Arc<Server>) -> Result<Json, warp::Rejection> {
let mut indexer = server
.build_indexer()
.await
.map_err(|e| warp::reject::custom(ApiError::InternalError(e.to_string())))?;
indexer
.index_fees()
Expand All @@ -36,7 +35,6 @@ pub(crate) async fn index_fees_handler(server: Arc<Server>) -> Result<Json, warp
pub(crate) async fn redeem_fees_handler(server: Arc<Server>) -> Result<Json, warp::Rejection> {
let mut indexer = server
.build_indexer()
.await
.map_err(|e| warp::reject::custom(ApiError::InternalError(e.to_string())))?;
indexer
.redeem_fees()
Expand Down Expand Up @@ -113,7 +111,7 @@ pub(crate) async fn get_fee_wallets_handler(
_body: Bytes, // no body
server: Arc<Server>,
) -> Result<Json, warp::Rejection> {
let mut indexer = server.build_indexer().await?;
let mut indexer = server.build_indexer()?;
let wallets = indexer.fetch_fee_wallets().await?;
Ok(warp::reply::json(&FeeWalletsResponse { wallets }))
}
Expand All @@ -123,7 +121,7 @@ pub(crate) async fn withdraw_fee_balance_handler(
req: WithdrawFeeBalanceRequest,
server: Arc<Server>,
) -> Result<Json, warp::Rejection> {
let mut indexer = server.build_indexer().await?;
let mut indexer = server.build_indexer()?;
indexer
.withdraw_fee_balance(req.wallet_id, req.mint)
.await
Expand Down
Loading

0 comments on commit f879777

Please sign in to comment.