Skip to content

Commit

Permalink
feat: send task execution feedback on failure
Browse files Browse the repository at this point in the history
Signed-off-by: Guilherme Felipe da Silva <gfsilva.eng@gmail.com>
  • Loading branch information
frenzox committed Dec 18, 2024
1 parent 0388822 commit 5db5cd4
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 30 deletions.
25 changes: 13 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/solana-axelar-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn main() {
name_on_amplifier.clone(),
Arc::clone(&rpc_client),
amplifier_task_receiver,
amplifier_client.clone(),
file_based_storage,
);
let (solana_listener_component, solana_listener_client) = solana_listener::SolanaListener::new(
Expand Down
1 change: 1 addition & 0 deletions crates/solana-gateway-task-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ axelar-executable.workspace = true
num-traits.workspace = true
relayer-amplifier-state.workspace = true
its-instruction-builder.workspace = true
solana-listener.workspace = true

[dev-dependencies]
serde_json.workspace = true
Expand Down
138 changes: 120 additions & 18 deletions crates/solana-gateway-task-processor/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ use core::task::Poll;
use std::collections::VecDeque;
use std::sync::Arc;

use amplifier_api::types::TaskItem;
use amplifier_api::types::{
BigInt, CannotExecuteMessageEventV2, CannotExecuteMessageEventV2Metadata,
CannotExecuteMessageReason, Event, EventBase, EventId, EventMetadata, MessageExecutedEvent,
MessageExecutedEventMetadata, MessageExecutionStatus, PublishEventsRequest, TaskItem,
TaskItemId, Token, TxEvent,
};
use axelar_executable::AxelarMessagePayload;
use axelar_solana_encoding::borsh::BorshDeserialize as _;
use axelar_solana_encoding::types::execute_data::{ExecuteData, MerkleisedPayload};
Expand All @@ -14,8 +19,9 @@ use axelar_solana_gateway::state::incoming_message::command_id;
use effective_tx_sender::ComputeBudgetError;
use eyre::Context as _;
use futures::stream::{FusedStream as _, FuturesOrdered, FuturesUnordered};
use futures::StreamExt as _;
use futures::{SinkExt as _, StreamExt as _};
use num_traits::FromPrimitive as _;
use relayer_amplifier_api_integration::AmplifierCommand;
use relayer_amplifier_state::State;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_response::RpcSimulateTransactionResult;
Expand All @@ -35,6 +41,7 @@ pub struct SolanaTxPusher<S: State> {
name_on_amplifier: String,
rpc_client: Arc<RpcClient>,
task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver,
amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient,
state: S,
}

Expand All @@ -54,13 +61,15 @@ impl<S: State> SolanaTxPusher<S> {
name_on_amplifier: String,
rpc_client: Arc<RpcClient>,
task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver,
amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient,
state: S,
) -> Self {
Self {
config,
name_on_amplifier,
rpc_client,
task_receiver,
amplifier_client,
state,
}
}
Expand All @@ -82,11 +91,17 @@ impl<S: State> SolanaTxPusher<S> {
let solana_rpc_client = Arc::clone(&self.rpc_client);
let keypair = Arc::clone(&keypair);
let config_metadata = Arc::clone(&config_metadata);
let amplifier_client = self.amplifier_client.clone();
async move {
let command_id = task.id.clone();
let res =
process_task(&keypair, &solana_rpc_client, task, &config_metadata)
.await;
let res = process_task(
&keypair,
&solana_rpc_client,
amplifier_client,
task,
&config_metadata,
)
.await;
(command_id, res)
}
});
Expand Down Expand Up @@ -140,26 +155,67 @@ struct ConfigMetadata {
async fn process_task(
keypair: &Keypair,
solana_rpc_client: &RpcClient,
task: TaskItem,
mut amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient,
task_item: TaskItem,
metadata: &ConfigMetadata,
) -> eyre::Result<()> {
use amplifier_api::types::Task::{Execute, GatewayTx, Refund, Verify};
let signer = keypair.pubkey();
let gateway_root_pda = metadata.gateway_root_pda;

match task.task {
match task_item.task {
Verify(_verify_task) => {
tracing::warn!("solana blockchain is not supposed to receive the `verify_task`");
}
GatewayTx(task) => {
gateway_tx_task(task, gateway_root_pda, signer, solana_rpc_client, keypair).await?;
}
Execute(task) => {
let source_chain = task.message.source_chain.clone();
let message_id = task.message.message_id.clone();

// communicate with the destination program
execute_task(task, metadata, signer, solana_rpc_client, keypair)
if let Err(error) = execute_task(task, metadata, signer, solana_rpc_client, keypair)
.instrument(info_span!("execute task"))
.in_current_span()
.await?;
.await
{
let event = match error.downcast_ref::<ComputeBudgetError>() {
Some(&ComputeBudgetError::TransactionError {
source: ref _source,
signature,
}) => {
let tx = solana_listener::fetch_logs(signature, solana_rpc_client).await?;
message_executed_event(
signature,
source_chain,
message_id,
MessageExecutionStatus::Reverted,
Token {
token_id: None,
amount: BigInt::from_u64(tx.cost_in_lamports),
},
)
}
_ => {
// Any other error, probably happening before execution: Simulation eror,
// error building an instruction, parsing pubkey, rpc transport error,
// etc.
cannot_execute_message_event(
task_item.id,
source_chain,
message_id,
CannotExecuteMessageReason::Error,
error.to_string(),
)
}
};

let command = AmplifierCommand::PublishEvents(PublishEventsRequest {
events: vec![event],
});
amplifier_client.sender.send(command).await?;
};
}
Refund(_refund_task) => {
tracing::error!("refund task not implemented");
Expand All @@ -169,6 +225,54 @@ async fn process_task(
Ok(())
}

fn message_executed_event(
tx_signature: Signature,
source_chain: String,
message_id: TxEvent,
status: MessageExecutionStatus,
cost: Token,
) -> Event {
let event_id = EventId::tx_reverted_event_id(&tx_signature.to_string());
let metadata = MessageExecutedEventMetadata::builder().build();
let event_metadata = EventMetadata::builder().extra(metadata).build();
let event_base = EventBase::builder()
.event_id(event_id)
.meta(Some(event_metadata))
.build();
Event::MessageExecuted(MessageExecutedEvent {
base: event_base,
message_id,
source_chain,
status,
cost,
})
}

fn cannot_execute_message_event(
task_item_id: TaskItemId,
source_chain: String,
message_id: TxEvent,
reason: CannotExecuteMessageReason,
details: String,
) -> Event {
let event_id = EventId::cannot_execute_task_event_id(&task_item_id);
let metadata = CannotExecuteMessageEventV2Metadata::builder()
.task_item_id(task_item_id)
.build();
let event_metadata = EventMetadata::builder().extra(metadata).build();
let event_base = EventBase::builder()
.meta(Some(event_metadata))
.event_id(event_id)
.build();
Event::CannotExecuteMessageV2(CannotExecuteMessageEventV2 {
base: event_base,
reason,
details,
message_id,
source_chain,
})
}

async fn execute_task(
execute_task: amplifier_api::types::ExecuteTask,
metadata: &ConfigMetadata,
Expand Down Expand Up @@ -211,7 +315,7 @@ async fn execute_task(
)
.await?;

send_tx(solana_rpc_client, keypair, ix).await?;
send_transaction(solana_rpc_client, keypair, ix).await?;
}
axelar_solana_governance::ID => {
// todo Governance specific handling
Expand Down Expand Up @@ -240,7 +344,7 @@ async fn execute_task(
&payload,
gateway_incoming_message_pda,
)?;
send_tx(solana_rpc_client, keypair, ix).await?;
send_transaction(solana_rpc_client, keypair, ix).await?;
}
}
Ok(())
Expand Down Expand Up @@ -269,7 +373,7 @@ async fn gateway_tx_task(
gateway_root_pda,
execute_data.payload_merkle_root,
)?;
send_tx(solana_rpc_client, keypair, ix).await?;
send_gateway_tx(solana_rpc_client, keypair, ix).await?;

// verify each signature in the signing session
let mut verifier_ver_future_set = execute_data
Expand All @@ -283,7 +387,7 @@ async fn gateway_tx_task(
verifier_info,
)
.ok()?;
Some(send_tx(solana_rpc_client, keypair, ix))
Some(send_gateway_tx(solana_rpc_client, keypair, ix))
})
.collect::<FuturesUnordered<_>>();
while let Some(result) = verifier_ver_future_set.next().await {
Expand All @@ -306,7 +410,7 @@ async fn gateway_tx_task(
None,
new_verifier_set_merkle_root,
)?;
send_tx(solana_rpc_client, keypair, ix).await?;
send_gateway_tx(solana_rpc_client, keypair, ix).await?;
}
MerkleisedPayload::NewMessages { messages } => {
let mut merkelised_message_f_set = messages
Expand All @@ -326,7 +430,7 @@ async fn gateway_tx_task(
pda,
)
.ok()?;
Some(send_tx(solana_rpc_client, keypair, ix))
Some(send_gateway_tx(solana_rpc_client, keypair, ix))
})
.collect::<FuturesUnordered<_>>();
while let Some(result) = merkelised_message_f_set.next().await {
Expand All @@ -343,7 +447,7 @@ async fn gateway_tx_task(
///
/// In case the transaction fails and the error is not recoverable relayer will stop processing
/// and return the error.
async fn send_tx(
async fn send_gateway_tx(
solana_rpc_client: &RpcClient,
keypair: &Keypair,
ix: Instruction,
Expand Down Expand Up @@ -388,6 +492,4 @@ async fn send_transaction(
.await?
.send_tx()
.await
.map_err(eyre::Error::from)
.map_err(ComputeBudgetError::Generic)
}

0 comments on commit 5db5cd4

Please sign in to comment.