Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Guilherme Felipe da Silva <gfsilva.eng@gmail.com>
  • Loading branch information
frenzox committed Dec 11, 2024
1 parent a7a4c57 commit 0dc6cdf
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 85 deletions.
258 changes: 183 additions & 75 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ solana-listener = { path = "crates/solana-listener" }
common-serde-utils = { path = "crates/common-serde-utils" }
solana-event-forwarder = { path = "crates/solana-event-forwarder" }
solana-tx-pusher = { path = "crates/solana-tx-pusher" }
rest-service = { path = "crates/rest-service" }
retrying-solana-http-sender = { path = "crates/retrying-solana-http-sender" }
solana-gateway-task-processor = { path = "crates/solana-gateway-task-processor" }
effective-tx-sender = { path = "crates/effective-tx-sender" }
Expand Down
11 changes: 11 additions & 0 deletions crates/rest-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,22 @@ license.workspace = true
edition.workspace = true

[dependencies]
amplifier-api.workspace = true
axum.workspace = true
eyre.workspace = true
futures.workspace = true
relayer-amplifier-api-integration.workspace = true
relayer-engine.workspace = true
serde.workspace = true
solana-client.workspace = true
solana-listener.workspace = true
solana-sdk.workspace = true
solana-transaction-status.workspace = true
thiserror.workspace = true
tokio.workspace = true

axelar-solana-gateway = { git = "ssh://git@github.com/eigerco/solana-axelar-internal.git", branch = "feat/gtw-large-outgoing-messages" , features = ["no-entrypoint"] }
gateway-event-stack = { git = "ssh://git@github.com/eigerco/solana-axelar-internal.git", branch = "feat/gtw-large-outgoing-messages" }

[lints]
workspace = true
151 changes: 151 additions & 0 deletions crates/rest-service/src/call_contract_offchain_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use std::sync::Arc;

use amplifier_api::types::{
CallEvent, CallEventMetadata, Event, EventBase, EventId, EventMetadata, GatewayV2Message,
MessageId, PublishEventsRequest, TxId,
};
use axelar_solana_gateway::processor::{CallContractOffchainDataEvent, GatewayEvent};
use axum::extract::{Json, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use futures::SinkExt as _;
use gateway_event_stack::{MatchContext, ProgramInvocationState};
use relayer_amplifier_api_integration::AmplifierCommand;
use serde::Deserialize;
use solana_listener::{fetch_logs, SolanaTransaction};
use solana_sdk::signature::Signature;
use thiserror::Error;

use crate::component::ServiceState;

pub(crate) const PATH: &str = "/call-contract-offchain-data";

#[derive(Deserialize)]
pub(crate) struct CallContractOffchainData {
transaction_signature: Signature,
data: Vec<u8>,
}

#[derive(Debug, Error)]
pub(crate) enum CallContractOffchainDataError {
/// Failed to fetch transaction logs.
#[error("Failed to fetch transaction logs: {0}")]
FailedToFetchTransactionLogs(#[from] eyre::Report),

/// The hash of the given payload doesn't match the one emitted in the transaction logs.
#[error("Payload hashes don't match")]
PayloadHashMismatch,

#[error("Successful transaction with CallContractOffchainDataEvent not found")]
EventNotFound,

#[error("Failed to relay message to Axelar Amplifier")]
FailedToRelayToAmplifier,
}

impl IntoResponse for CallContractOffchainDataError {
fn into_response(self) -> axum::response::Response {
let error_tuple = match self {
Self::FailedToFetchTransactionLogs(report) => {
(StatusCode::INTERNAL_SERVER_ERROR, report.to_string())
}
Self::FailedToRelayToAmplifier => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),

Self::PayloadHashMismatch => (StatusCode::BAD_REQUEST, self.to_string()),
Self::EventNotFound => (StatusCode::NOT_FOUND, self.to_string()),
};

error_tuple.into_response()
}
}

pub(crate) async fn handler(
State(state): State<Arc<ServiceState>>,
Json(payload): Json<CallContractOffchainData>,
) -> Result<(), CallContractOffchainDataError> {
let solana_transaction = fetch_logs(payload.transaction_signature, &state.rpc_client()).await?;

let match_context = MatchContext::new(&axelar_solana_gateway::id().to_string());
let invocations = gateway_event_stack::build_program_event_stack(
&match_context,
solana_transaction.logs.as_slice(),
gateway_event_stack::parse_gateway_logs,
);

for invocation in invocations {
if let ProgramInvocationState::Succeeded(events) = invocation {
for (idx, event) in events {
if let GatewayEvent::CallContractOffchainData(event_data) = event {
if event_data.payload_hash == solana_sdk::hash::hash(&payload.data).to_bytes() {
let amplifier_event = build_amplifier_event(
state.chain_name().to_owned(),
&solana_transaction,
event_data,
payload,
idx,
);

let command = AmplifierCommand::PublishEvents(PublishEventsRequest {
events: vec![amplifier_event],
});

let mut sender = state.amplifier_client().sender.clone();

sender.send(command).await.map_err(|_err| {
CallContractOffchainDataError::FailedToRelayToAmplifier
})?;

return Ok(());
}

return Err(CallContractOffchainDataError::PayloadHashMismatch);
}
}
}
}

Err(CallContractOffchainDataError::EventNotFound)
}

fn build_amplifier_event(
source_chain: String,
transaction: &SolanaTransaction,
solana_event: CallContractOffchainDataEvent,
payload: CallContractOffchainData,
log_index: usize,
) -> Event {
let tx_id = TxId(transaction.signature.to_string());
let message_id = MessageId::new(&transaction.signature.to_string(), log_index);
let event_id = EventId::new(&transaction.signature.to_string(), log_index);
let source_address = solana_event.sender_key.to_string();

Event::Call(
CallEvent::builder()
.base(
EventBase::builder()
.event_id(event_id)
.meta(Some(
EventMetadata::builder()
.tx_id(Some(tx_id))
.timestamp(None)
.from_address(Some(source_address.clone()))
.finalized(Some(true))
.extra(CallEventMetadata::builder().build())
.build(),
))
.build(),
)
.message(
GatewayV2Message::builder()
.message_id(message_id)
.source_chain(source_chain)
.source_address(source_address)
.destination_address(solana_event.destination_contract_address)
.payload_hash(solana_event.payload_hash.to_vec())
.build(),
)
.destination_chain(solana_event.destination_chain)
.payload(payload.data)
.build(),
)
}
61 changes: 55 additions & 6 deletions crates/rest-service/src/component.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
//! REST service component for the relayer.
use core::future::Future;
use core::net::SocketAddrV4;
use core::net::SocketAddr;
use core::pin::Pin;
use std::sync::Arc;

use axum::extract::DefaultBodyLimit;
use axum::routing::post;
use axum::Router;
use relayer_amplifier_api_integration::AmplifierCommandClient;
use solana_client::nonblocking::rpc_client::RpcClient;

/// TODO: Doc
use crate::call_contract_offchain_data;

/// The REST service component for the relayer.
pub struct RestService {
router: Router,
socket_addr: SocketAddrV4,
socket_addr: SocketAddr,
}

impl relayer_engine::RelayerComponent for RestService {
Expand All @@ -18,11 +26,52 @@ impl relayer_engine::RelayerComponent for RestService {
}
}

pub(crate) struct ServiceState {
amplifier_client: AmplifierCommandClient,
rpc_client: Arc<RpcClient>,
chain_name: String,
}

impl ServiceState {
pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
Arc::clone(&self.rpc_client)
}

pub(crate) fn chain_name(&self) -> &str {
&self.chain_name
}

pub(crate) const fn amplifier_client(&self) -> &AmplifierCommandClient {
&self.amplifier_client
}
}

impl RestService {
#[must_use]
/// TODO: Doc
pub fn new(config: &crate::Config) -> Self {
let router = Router::new();
/// Create a new REST service component with the given configuration and RPC client.
pub fn new(
config: crate::Config,
rpc_client: Arc<RpcClient>,
amplifier_client: AmplifierCommandClient,
) -> Self {
let state = ServiceState {
amplifier_client,
rpc_client,
chain_name: config.chain_name,
};
let router = Router::new()
.route(
call_contract_offchain_data::PATH,
post(call_contract_offchain_data::handler),
)
.with_state(Arc::new(state))
.layer(DefaultBodyLimit::max(
config
.call_contract_offchain_data_size_limit
.saturating_add(size_of::<
call_contract_offchain_data::CallContractOffchainData,
>()),
));

Self {
router,
Expand Down
15 changes: 13 additions & 2 deletions crates/rest-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
use std::net::SocketAddrV4;
//! This crate provides a REST service component for the relayer.
use core::net::SocketAddr;

use serde::{Deserialize, Serialize};

mod call_contract_offchain_data;
pub mod component;

/// Configuration for the REST service.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct Config {
pub socket_addr: SocketAddrV4,
/// Source chain name.
pub chain_name: String,
/// The socket address to bind to.
pub socket_addr: SocketAddr,
/// The maximum size of the data in a contract call with offchain data handling.
pub call_contract_offchain_data_size_limit: usize,
}
1 change: 1 addition & 0 deletions crates/solana-axelar-relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ toml.workspace = true
tokio.workspace = true
serde.workspace = true
relayer-amplifier-api-integration.workspace = true
rest-service.workspace = true
solana-listener.workspace = true
solana-event-forwarder.workspace = true
solana-gateway-task-processor.workspace = true
Expand Down
22 changes: 22 additions & 0 deletions crates/solana-axelar-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@ async fn main() {
let solana_event_forwarder_component = solana_event_forwarder::SolanaEventForwarder::new(
event_forwarder_config,
solana_listener_client,
amplifier_client.clone(),
);

let rest_service_component = rest_service::component::RestService::new(
config.rest_service,
Arc::clone(&rpc_client),
amplifier_client,
);

let components: Vec<Box<dyn RelayerComponent>> = vec![
Box::new(amplifier_component),
Box::new(solana_listener_component),
Box::new(solana_event_forwarder_component),
Box::new(gateway_task_processor),
Box::new(rest_service_component),
];
RelayerEngine::new(config.relayer_engine, components)
.start_and_wait_for_shutdown()
Expand Down Expand Up @@ -86,6 +94,8 @@ pub struct Config {
pub solana_rpc: retrying_solana_http_sender::Config,
/// Path to the storage configuration file
pub storage_path: std::path::PathBuf,
/// Configuration for the REST service
pub rest_service: rest_service::Config,
}

#[expect(
Expand Down Expand Up @@ -123,6 +133,8 @@ mod tests {
let latest_processed_signature = Signature::new_unique().to_string();
let identity = identity_fixture();
let missed_signature_catchup_strategy = "until_beginning";
let rest_service_bind_addr = "127.0.0.1:80";
let call_contract_offchain_data_size_limit = 10 * 1024 * 1024;
let input = indoc::formatdoc! {r#"
storage_path = "./store"
Expand Down Expand Up @@ -151,6 +163,11 @@ mod tests {
[solana_rpc]
max_concurrent_rpc_requests = {max_concurrent_rpc_requests}
solana_http_rpc = "{solana_rpc}"
[rest_service]
chain_name = "{chain}"
socket_addr = "{rest_service_bind_addr}"
call_contract_offchain_data_size_limit = {call_contract_offchain_data_size_limit}
"#};

let parsed: Config = toml::from_str(&input)?;
Expand Down Expand Up @@ -181,6 +198,11 @@ mod tests {
solana_http_rpc: solana_rpc,
},
storage_path: "./store".parse().unwrap(),
rest_service: rest_service::Config {
chain_name: chain.to_owned(),
socket_addr: SocketAddr::from_str(rest_service_bind_addr)?,
call_contract_offchain_data_size_limit,
},
};
assert_eq!(parsed, expected);
Ok(())
Expand Down
1 change: 1 addition & 0 deletions crates/solana-listener/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod log_processor;
mod signature_batch_scanner;
mod signature_realtime_scanner;

pub use log_processor::fetch_logs;
/// Typical message with the produced work.
/// Contains the handle to a task that resolves into a
/// [`SolanaTransaction`].
Expand Down
10 changes: 9 additions & 1 deletion crates/solana-listener/src/component/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ pub(crate) async fn fetch_and_send(
Ok(())
}

pub(crate) async fn fetch_logs(
/// Fetch the logs of a Solana transaction.
///
/// # Errors
///
/// - If request to the Solana RPC fails
/// - If the metadata is not included with the logs
/// - If the logs are not included
/// - If the transaction was not successful
pub async fn fetch_logs(
signature: Signature,
rpc_client: &RpcClient,
) -> eyre::Result<SolanaTransaction> {
Expand Down
2 changes: 1 addition & 1 deletion crates/solana-listener/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
mod component;
mod config;

pub use component::{SolanaListener, SolanaListenerClient, SolanaTransaction};
pub use component::{fetch_logs, SolanaListener, SolanaListenerClient, SolanaTransaction};
pub use config::{Config, MissedSignatureCatchupStrategy};
pub use solana_sdk;

0 comments on commit 0dc6cdf

Please sign in to comment.