Skip to content

Commit

Permalink
Semaphore configurable, building transaction to debug only after gett…
Browse files Browse the repository at this point in the history
…ing semaphore
  • Loading branch information
ochaloup committed May 7, 2024
1 parent 7800d2c commit d299364
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions libs/solana-transaction-builder-executor/src/builder_executor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::anyhow;
use async_stream::stream;
use cached::proc_macro::cached;
use log::{error, info};
use log::{debug, error, info};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::SerializableTransaction;
use solana_sdk::{
Expand Down Expand Up @@ -72,7 +72,7 @@ pub async fn execute_transactions_in_sequence(
for (index, async_transaction_builder) in async_transaction_builders.into_iter().enumerate() {
let human_index = index + 1;
let tx_uuid = &async_transaction_builder.tx_uuid;
info!("Building the transaction {human_index}/{sequence_length}: {tx_uuid}");
debug!("Building the transaction {human_index}/{tx_uuid} (size: {sequence_length})");

let async_transaction_builder = async_transaction_builder.clone();
match transaction_executor
Expand All @@ -84,11 +84,11 @@ pub async fn execute_transactions_in_sequence(
})
.await
{
Ok(result) => {
info!("Successfully executed the transaction {tx_uuid} {human_index}/{sequence_length}: {:?}", result)
Ok(sig) => {
info!("Transaction {sig} {human_index}/{tx_uuid} executed successfully");
}
Err(err) => {
anyhow::bail!("Failed to execute the transaction {tx_uuid} {human_index}/{sequence_length}, err: {err}");
anyhow::bail!("Transaction {human_index}/{tx_uuid} failed: {:?}", err);
}
};
}
Expand All @@ -99,10 +99,12 @@ pub async fn execute_transactions_in_sequence(
pub async fn execute_transactions_in_parallel(
transaction_executor: Arc<TransactionExecutor>,
async_transaction_builders: Vec<TransactionBuilderExecutionData>,
parallel_execution_limit: Option<usize>,
) -> anyhow::Result<()> {
let sequence_length = async_transaction_builders.len();

let semaphore = Arc::new(Semaphore::new(PARRALLEL_EXECUTION_LIMIT));
let parallel_execution_limit = parallel_execution_limit.unwrap_or(PARRALLEL_EXECUTION_LIMIT);
let semaphore = Arc::new(Semaphore::new(parallel_execution_limit));

// Prepare the list of futures with their associated tx_uuid and human_index
let futures = async_transaction_builders
Expand All @@ -111,8 +113,8 @@ pub async fn execute_transactions_in_parallel(
.map(|(index, async_transaction_builder)| {
let human_index = index + 1;
let tx_uuid = async_transaction_builder.tx_uuid.clone();
info!("Building the transaction {human_index}/{sequence_length}: {tx_uuid}");
let semaphore = Arc::clone(&semaphore);
debug!("Building the transaction {human_index}/{tx_uuid} (size: {sequence_length})");
let transaction_executor = Arc::clone(&transaction_executor);
let transaction_future = async move {
let _permit = semaphore.acquire().await.expect("Failed to acquire semaphore");
Expand Down Expand Up @@ -141,7 +143,7 @@ pub async fn execute_transactions_in_parallel(
for (tx_uuid, human_index, result) in results {
match result {
Ok(sig) => {
info!("Transaction {sig} ({human_index}/({tx_uuid}) executed successfully.");
info!("Transaction {sig} {human_index}/{tx_uuid} executed successfully");
}
Err(err) => {
let error_msg = format!("Transaction {human_index}/{tx_uuid} failed: {:?}", err);
Expand Down

0 comments on commit d299364

Please sign in to comment.