Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Guilherme Felipe da Silva <gfsilva.eng@gmail.com>
  • Loading branch information
frenzox committed Dec 17, 2024
1 parent 9d9666c commit d5c050b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 21 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/solana-axelar-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn main() {
name_on_amplifier.clone(),
Arc::clone(&rpc_client),
amplifier_task_receiver,
amplifier_client.clone(),
file_based_storage,
);
let (solana_listener_component, solana_listener_client) = solana_listener::SolanaListener::new(
Expand Down
78 changes: 69 additions & 9 deletions crates/solana-gateway-task-processor/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use core::task::Poll;
use std::collections::VecDeque;
use std::sync::Arc;

use amplifier_api::types::TaskItem;
use amplifier_api::types::{
CannotExecuteMessageEventV2, CannotExecuteMessageEventV2Metadata, CannotExecuteMessageReason,
Event, EventBase, EventId, EventMetadata, PublishEventsRequest, TaskItem, TaskItemId, TxEvent,
};
use axelar_executable::AxelarMessagePayload;
use axelar_solana_encoding::borsh::BorshDeserialize as _;
use axelar_solana_encoding::types::execute_data::{ExecuteData, MerkleisedPayload};
Expand All @@ -14,8 +17,9 @@ use axelar_solana_gateway::state::incoming_message::command_id;
use effective_tx_sender::ComputeBudgetError;
use eyre::Context as _;
use futures::stream::{FusedStream as _, FuturesOrdered, FuturesUnordered};
use futures::StreamExt as _;
use futures::{SinkExt as _, StreamExt as _};
use num_traits::FromPrimitive as _;
use relayer_amplifier_api_integration::AmplifierCommand;
use relayer_amplifier_state::State;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_response::RpcSimulateTransactionResult;
Expand All @@ -35,6 +39,7 @@ pub struct SolanaTxPusher<S: State> {
name_on_amplifier: String,
rpc_client: Arc<RpcClient>,
task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver,
amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient,
state: S,
}

Expand All @@ -54,13 +59,15 @@ impl<S: State> SolanaTxPusher<S> {
name_on_amplifier: String,
rpc_client: Arc<RpcClient>,
task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver,
amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient,
state: S,
) -> Self {
Self {
config,
name_on_amplifier,
rpc_client,
task_receiver,
amplifier_client,
state,
}
}
Expand All @@ -82,11 +89,17 @@ impl<S: State> SolanaTxPusher<S> {
let solana_rpc_client = Arc::clone(&self.rpc_client);
let keypair = Arc::clone(&keypair);
let config_metadata = Arc::clone(&config_metadata);
let amplifier_client = self.amplifier_client.clone();
async move {
let command_id = task.id.clone();
let res =
process_task(&keypair, &solana_rpc_client, task, &config_metadata)
.await;
let res = process_task(
&keypair,
&solana_rpc_client,
amplifier_client,
task,
&config_metadata,
)
.await;
(command_id, res)
}
});
Expand Down Expand Up @@ -140,26 +153,44 @@ struct ConfigMetadata {
async fn process_task(
keypair: &Keypair,
solana_rpc_client: &RpcClient,
task: TaskItem,
mut amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient,
task_item: TaskItem,
metadata: &ConfigMetadata,
) -> eyre::Result<()> {
use amplifier_api::types::Task::{Execute, GatewayTx, Refund, Verify};
let signer = keypair.pubkey();
let gateway_root_pda = metadata.gateway_root_pda;

match task.task {
match task_item.task {
Verify(_verify_task) => {
tracing::warn!("solana blockchain is not supposed to receive the `verify_task`");
}
GatewayTx(task) => {
gateway_tx_task(task, gateway_root_pda, signer, solana_rpc_client, keypair).await?;
}
Execute(task) => {
let source_chain = task.message.source_chain.clone();
let message_id = task.message.message_id.clone();

// communicate with the destination program
execute_task(task, metadata, signer, solana_rpc_client, keypair)
if let Err(error) = execute_task(task, metadata, signer, solana_rpc_client, keypair)
.instrument(info_span!("execute task"))
.in_current_span()
.await?;
.await
{
let event = cannot_execute_message_event(
task_item.id,
source_chain,
message_id,
CannotExecuteMessageReason::Error,
error.to_string(),
);
let command = AmplifierCommand::PublishEvents(PublishEventsRequest {
events: vec![event],
});

amplifier_client.sender.send(command).await?;
};
}
Refund(_refund_task) => {
tracing::error!("refund task not implemented");
Expand All @@ -169,6 +200,35 @@ async fn process_task(
Ok(())
}

fn cannot_execute_message_event_id(task_id: &TaskItemId) -> EventId {
TxEvent(format!("cannot-execute-task-v2-{}", task_id.0))
}

fn cannot_execute_message_event(
task_item_id: TaskItemId,
source_chain: String,
message_id: TxEvent,
reason: CannotExecuteMessageReason,
details: String,
) -> Event {
let event_id = cannot_execute_message_event_id(&task_item_id);
let metadata = CannotExecuteMessageEventV2Metadata::builder()
.task_item_id(task_item_id)
.build();
let event_metadata = EventMetadata::builder().extra(metadata).build();
let event_base = EventBase::builder()
.meta(Some(event_metadata))
.event_id(event_id)
.build();
Event::CannotExecuteMessageV2(CannotExecuteMessageEventV2 {
base: event_base,
reason,
details,
message_id,
source_chain,
})
}

async fn execute_task(
execute_task: amplifier_api::types::ExecuteTask,
metadata: &ConfigMetadata,
Expand Down

0 comments on commit d5c050b

Please sign in to comment.