diff --git a/Cargo.lock b/Cargo.lock index b14b16f..565a634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -655,6 +655,18 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "axelar-executable" +version = "0.1.0" +source = "git+https://github.com/eigerco/solana-axelar.git?branch=main#09c8619fb9dc3ea61cf855150886266f01f208a7" +dependencies = [ + "axelar-message-primitives", + "axelar-rkyv-encoding", + "borsh 1.5.1", + "gmp-gateway", + "solana-program", +] + [[package]] name = "axelar-message-primitives" version = "0.1.0" @@ -5409,7 +5421,10 @@ dependencies = [ name = "solana-event-forwarder" version = "0.1.0" dependencies = [ + "axelar-message-primitives", + "axelar-rkyv-encoding", "base64 0.22.1", + "bs58", "eyre", "futures", "gmp-gateway", @@ -5428,6 +5443,7 @@ version = "0.1.0" dependencies = [ "amplifier-api", "async-trait", + "axelar-executable", "axelar-rkyv-encoding", "bs58", "common-serde-utils", @@ -5436,6 +5452,7 @@ dependencies = [ "futures", "gmp-gateway", "mockall", + "num-traits", "relayer-amplifier-api-integration", "relayer-engine", "serde", @@ -7367,7 +7384,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d6259ab..ab1dbbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ effective-tx-sender = { path = "crates/effective-tx-sender" } # Solana Gateway gmp-gateway = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main", features = ["no-entrypoint"] } axelar-rkyv-encoding = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main" } +axelar-message-primitives = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main" } axelar-executable = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main" } # CLI @@ -100,6 +101,7 @@ quanta = "0.12" backoff = { version = "0.4", features = ["tokio"] } indoc = "2" itertools = "0.12" +num-traits = "0.2" # Serde serde = { version = "1", features = ["derive"] } diff --git a/crates/amplifier-api/src/client/requests.rs b/crates/amplifier-api/src/client/requests.rs index 114ac70..bcafc0a 100644 --- a/crates/amplifier-api/src/client/requests.rs +++ b/crates/amplifier-api/src/client/requests.rs @@ -69,10 +69,10 @@ impl AmplifierApiRequest for HealthCheck { pub struct GetChains<'a> { /// The name of the cain that we want to query and get the tasks for pub chain: &'a WithTrailingSlash, - #[builder(setter(strip_option), default)] + #[builder(default)] /// The earliers task id pub after: Option, - #[builder(setter(strip_option), default)] + #[builder(default)] /// The latest task id pub before: Option, /// the amount of results to return diff --git a/crates/amplifier-api/src/types.rs b/crates/amplifier-api/src/types.rs index b848ae2..c9dd30e 100644 --- a/crates/amplifier-api/src/types.rs +++ b/crates/amplifier-api/src/types.rs @@ -2,10 +2,11 @@ //! Contsructed form the following API spec [link](https://github.com/axelarnetwork/axelar-eds-mirror/blob/main/oapi/gmp/schema.yaml) pub use big_int::BigInt; -use bnum::types::U256; +pub use bnum; use chrono::{DateTime, Utc}; pub use id::*; use serde::{Deserialize, Deserializer, Serialize}; +use typed_builder::TypedBuilder; /// Represents an address as a non-empty string. pub type Address = String; @@ -122,81 +123,86 @@ pub enum MessageExecutionStatus { } /// Represents metadata associated with an event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct EventMetadata { +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct EventMetadata { /// tx id of the underlying event #[serde(rename = "txID", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub tx_id: Option, /// timestamp of the underlying event #[serde(rename = "timestamp", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub timestamp: Option>, /// sender address #[serde(rename = "fromAddress", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub from_address: Option
, /// weather the event is finalized or not #[serde(default, skip_serializing_if = "Option::is_none")] + #[builder(default)] pub finalized: Option, + /// Extra fields that are dependant on the core event + #[serde(flatten)] + pub extra: T, } /// Specialized metadata for `CallEvent`. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct CallEventMetadata { - /// common event data - #[serde(flatten)] - pub base: EventMetadata, /// the message id that's responsible for the event #[serde(rename = "parentMessageID", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub parent_message_id: Option, } /// Specialized metadata for `MessageApprovedEvent`. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct MessageApprovedEventMetadata { - /// common event data - #[serde(flatten)] - pub base: EventMetadata, /// The command id that corresponds to the approved message #[serde(rename = "commandID", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub command_id: Option, } /// Specialized metadata for `MessageExecutedEvent`. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct MessageExecutedEventMetadata { - /// common event data - #[serde(flatten)] - pub base: EventMetadata, /// The command id that corresponds to the executed message #[serde(rename = "commandID", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub command_id: Option, /// The message #[serde(rename = "childMessageIDs", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub child_message_ids: Option>, } /// Specialized metadata for `CannotExecuteMessageEvent`. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct CannotExecuteMessageEventMetadata { /// The initiator of the message #[serde(rename = "fromAddress", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub from_address: Option
, /// timestamp of the event #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] pub timestamp: Option>, } /// Represents a token amount, possibly with a token ID. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct Token { /// indicates that amount is in native token if left blank #[serde(rename = "tokenID", skip_serializing_if = "Option::is_none")] + #[builder(default)] pub token_id: Option, /// the amount in token’s denominator pub amount: BigInt, } /// Represents a cross-chain message. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct GatewayV2Message { /// the message id of a GMP call #[serde(rename = "messageID")] @@ -220,18 +226,19 @@ pub struct GatewayV2Message { } /// Base struct for events. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct EventBase { +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct EventBase { /// The event id #[serde(rename = "eventID")] pub event_id: EventId, /// Metadata of the event #[serde(skip_serializing_if = "Option::is_none")] - pub meta: Option, + #[builder(default)] + pub meta: Option>, } /// Represents a Gas Credit Event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct GasCreditEvent { /// Event base #[serde(flatten)] @@ -247,7 +254,7 @@ pub struct GasCreditEvent { } /// Represents a Gas Refunded Event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct GasRefundedEvent { /// Event base #[serde(flatten)] @@ -266,11 +273,11 @@ pub struct GasRefundedEvent { } /// Represents a Call Event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct CallEvent { /// Event base #[serde(flatten)] - pub base: EventBase, + pub base: EventBase, /// The cross chain message pub message: GatewayV2Message, /// Name of the destination chain @@ -285,26 +292,47 @@ pub struct CallEvent { } /// Represents a Message Approved Event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct MessageApprovedEvent { /// Event base #[serde(flatten)] - pub base: EventBase, + pub base: EventBase, /// The cross chain message pub message: GatewayV2Message, /// the cost of the approval. (#of approvals in transaction / transaction cost) pub cost: Token, - /// Event metadata - #[serde(skip_serializing_if = "Option::is_none")] - pub meta: Option, +} + +/// Event that gets emitted upon signer rotatoin +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct SignersRotatedEvent { + /// Event base + #[serde(flatten)] + pub base: EventBase, + /// the cost of the approval. (#of approvals in transaction / transaction cost) + pub cost: Token, +} + +/// Represents extra metadata that can be added to the signers rotated event +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct SignersRotatedMetadata { + /// The hash of the new signer set + #[serde(rename = "signerHash")] + #[serde( + deserialize_with = "serde_utils::base64_decode", + serialize_with = "serde_utils::base64_encode" + )] + pub signer_hash: Vec, + /// The epoch of the new signer set + pub epoch: u64, } /// Represents a Message Executed Event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct MessageExecutedEvent { /// Event base #[serde(flatten)] - pub base: EventBase, + pub base: EventBase, /// message id #[serde(rename = "messageID")] pub message_id: MessageId, @@ -315,17 +343,14 @@ pub struct MessageExecutedEvent { pub status: MessageExecutionStatus, /// the cost of the transaction containing the execution pub cost: Token, - /// event metadata - #[serde(skip_serializing_if = "Option::is_none")] - pub meta: Option, } /// Represents a Cannot Execute Message Event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct CannotExecuteMessageEvent { - /// tevent base + /// Event base #[serde(flatten)] - pub base: EventBase, + pub base: EventBase, /// task id #[serde(rename = "taskItemID")] pub task_item_id: TaskItemId, @@ -333,9 +358,6 @@ pub struct CannotExecuteMessageEvent { pub reason: CannotExecuteMessageReason, /// details of the error pub details: String, - /// event metadata - #[serde(skip_serializing_if = "Option::is_none")] - pub meta: Option, } /// Represents a generic Event, which can be any of the specific event types. @@ -354,24 +376,26 @@ pub enum Event { MessageExecuted(MessageExecutedEvent), /// cannot execute message event CannotExecuteMessage(CannotExecuteMessageEvent), + /// Signers have been rotated + SignersRotated(SignersRotatedEvent), } /// Represents the request payload for posting events. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct PublishEventsRequest { /// list of events to publish pub events: Vec, } /// Base struct for publish event result items. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct PublishEventResultItemBase { /// index of the event pub index: usize, } /// Represents an accepted publish event result. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct PublishEventAcceptedResult { /// event base #[serde(flatten)] @@ -379,7 +403,7 @@ pub struct PublishEventAcceptedResult { } /// Represents an error in publishing an event. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct PublishEventErrorResult { /// event base #[serde(flatten)] @@ -401,14 +425,14 @@ pub enum PublishEventResultItem { } /// Represents the response from posting events. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct PublishEventsResult { /// The result array pub results: Vec, } /// Represents a Verify Task. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct VerifyTask { /// the cross chain message pub message: GatewayV2Message, @@ -421,7 +445,7 @@ pub struct VerifyTask { } /// Represents a Gateway Transaction Task. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct GatewayTransactionTask { /// the execute data for the gateway #[serde( @@ -433,7 +457,7 @@ pub struct GatewayTransactionTask { } /// Represents an Execute Task. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct ExecuteTask { /// the cross-chain message pub message: GatewayV2Message, @@ -449,7 +473,7 @@ pub struct ExecuteTask { } /// Represents a Refund Task. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct RefundTask { /// the cross-chain message pub message: GatewayV2Message, @@ -476,7 +500,7 @@ pub enum Task { } /// Represents an individual Task Item. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct TaskItem { /// UUID of current task pub id: TaskItemId, @@ -488,7 +512,7 @@ pub struct TaskItem { } /// Represents the response from fetching tasks. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct GetTasksResult { /// Array of tasks matching the filters pub tasks: Vec, @@ -511,11 +535,11 @@ mod big_int { /// Represents a big integer as a string matching the pattern `^(0|[1-9]\d*)$`. #[derive(Debug, PartialEq, Eq)] - pub struct BigInt(pub bnum::types::U256); + pub struct BigInt(pub bnum::types::I512); impl BigInt { /// Creates a new [`BigInt`]. #[must_use] - pub const fn new(num: U256) -> Self { + pub const fn new(num: bnum::types::I512) -> Self { Self(num) } @@ -542,7 +566,7 @@ mod big_int { D: Deserializer<'de>, { let string = String::deserialize(deserializer)?; - let number = bnum::types::U256::parse_str_radix(string.as_str(), 10); + let number = bnum::types::I512::parse_str_radix(string.as_str(), 10); Ok(Self(number)) } } @@ -623,6 +647,7 @@ mod tests { .ok(), from_address: Some("0xEA12282BaC49497793622d67e2CD43bf1065a819".to_owned()), finalized: Some(true), + extra: (), }), }, message_id: MessageId::new( @@ -676,6 +701,7 @@ mod tests { .ok(), from_address: Some("0xEA12282BaC49497793622d67e2CD43bf1065a819".to_owned()), finalized: Some(true), + extra: (), }), }, message_id: MessageId::new( @@ -827,8 +853,9 @@ mod tests { .unwrap() .into_bytes(); - let type_in_rust = PublishEventsRequest { - events: vec![Event::Call(CallEvent { + let type_in_rust = + PublishEventsRequest { + events: vec![Event::Call(CallEvent { base: EventBase { event_id: EventId::new( "0x9b447614be654eeea0c5de0319b3f2c243ab45bebd914a1f7319f4bb599d8968", @@ -844,6 +871,7 @@ mod tests { .ok(), from_address: Some("0xba76c6980428A0b10CFC5d8ccb61949677A61233".to_owned()), finalized: Some(true), + extra: CallEventMetadata { parent_message_id: None }, }), }, message: GatewayV2Message { @@ -859,7 +887,7 @@ mod tests { destination_chain: "test-avalanche".to_owned(), payload: payload_bytes(), })], - }; + }; test_serialization(&type_in_rust, reference_json); } @@ -922,7 +950,6 @@ mod tests { task_item_id: TaskItemId("550e8400-e29b-41d4-a716-446655440000".parse().unwrap()), reason: CannotExecuteMessageReason::InsufficientGas, details: "Not enough gas to execute the message".to_owned(), - meta: None, }); test_serialization(&type_in_rust, reference_json); @@ -938,4 +965,60 @@ mod tests { test_serialization(&type_in_rust, reference_json.to_vec()); } + + #[test] + fn test_deserialize_tasks_response() { + // Given JSON + let mut json_data = r#" + { + "tasks": [ + { + "id": "01924c97-a26b-7eff-8289-c3bdf2b37446", + "task": { + "executeData": "YXZhbGFuY2hlLWZ1amkweDUxZWM2MmI0YWI0YzY1OTM4YTNmZTNlMjlhN2Y4OTMwYzkyODk3MWI1ZTc5MTQxODA4ZjI3OTZlYjgxYzU4NzItMDB4NDM2NmEwNDFiQTQyMzdGOWI3NTUzQjhhNzNlOEFGMWYyZWUxRjRkMXNvbGFuYS1kZXZuZXRtZW1RdUtNR0JvdW5od1A1eXc5cW9tWU5VOTdFcWN4OWM0WHdEVW82dUdWzjepvJoJHFQzw1uyXXd48x3os6pzUWvq8CLZoHNYpLgOAAAALP///0QAAAAy////KgAAAG7///8NAAAAkP///ysAAACV////AAAAAAAAAAABAAAAAAOkQJ+JUVaKfZGWXgR9L4KOCebkpdpCe1FfVVxF9+CBMwEAtcL4qBLWMHodl+/x6UzKZ+1v6InbUUK82UTyJPGkN2Vev/IRM1aZxgGVS97+qW8mfehYwHvk69Ei0masgbXJYhwBAAAAAAAAAAAAAAAAAAAAAAAAAAAAADD///8BAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAdF8wAAAAAAB0XzAAAAAAAAEAAAAg////AQAAAAAAAAA=" + }, + "timestamp": "2024-10-02T09:37:39.608929Z", + "type": "GATEWAY_TX" + }, + { + "id": "0192d87b-faab-7dd7-9750-f6261541cf2b", + "task": { + "executeData": "YXZhbGFuY2hlLWZ1amkweDFiM2RkNmI2OTYyZmE3OWQ1NzFmNjAxMjhiMGY0OTIyNzQ4ODM1NDNjNGQ0MDg5YTdjMzZmYjQ3NGFmNDVkZWItMDB4RTJjZEI0MDQwMDM0ZTA1Yjc4RDUzYUMzMjIwNWEzNDdhMjEzYzkwNXNvbGFuYS1kZXZuZXRtZW1RdUtNR0JvdW5od1A1eXc5cW9tWU5VOTdFcWN4OWM0WHdEVW82dUdWbom8u2+OKANkGqUpecD+cRHd4C+YjMPyDduiSvwOm4QOAAAALP///0QAAAAy////KgAAAG7///8NAAAAkP///ysAAACV////AAAAAAAAAAABAAAAAAOkQJ+JUVaKfZGWXgR9L4KOCebkpdpCe1FfVVxF9+CBMwEApmIkQcyrccWA6IGBMyDrCbWfDlSpCkBEzVzAz3FFd68bCnvViSwHWdx71h/6JYKOii2fFP4haZrn2c3+Wkip6xwBAAAAAAAAAAAAAAAAAAAAAAAAAAAAADD///8BAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAdF8wAAAAAAB0XzAAAAAAAAEAAAAg////AQAAAAAAAAA=" + }, + "timestamp": "2024-10-29T13:34:16.754126Z", + "type": "GATEWAY_TX" + }, + { + "id": "0192d881-9433-7831-8c67-98eaa7727676", + "task": { + "availableGasBalance": { + "amount": "-864042" + }, + "message": { + "destinationAddress": "memQuKMGBounhwP5yw9qomYNU97Eqcx9c4XwDUo6uGV", + "messageID": "0x1b3dd6b6962fa79d571f60128b0f492274883543c4d4089a7c36fb474af45deb-0", + "payloadHash": "bom8u2+OKANkGqUpecD+cRHd4C+YjMPyDduiSvwOm4Q=", + "sourceAddress": "0xE2cdB4040034e05b78D53aC32205a347a213c905", + "sourceChain": "avalanche-fuji" + }, + "payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACWhlbGxv8J+QqgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAHo0Z7lybs+RcfP+DxXZV1GDSdTSXtHjzHyWWXizQKXPQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAE=" + }, + "timestamp": "2024-10-29T13:40:23.732532Z", + "type": "EXECUTE" + }, + { + "id": "0192d882-0310-780c-bda4-47770cce7248", + "task": { + "executeData": "YXZhbGFuY2hlLWZ1amkweGVkZTFkYWY1ZmEyZjJkZTAwNGQ0NzljMjRhMjJmNDI5YmFiOGZhOGQwMTgxOWNhNzZmN2JlN2VmYjNmNGUyZjUtMDB4RTJjZEI0MDQwMDM0ZTA1Yjc4RDUzYUMzMjIwNWEzNDdhMjEzYzkwNXNvbGFuYS1kZXZuZXRtZW1RdUtNR0JvdW5od1A1eXc5cW9tWU5VOTdFcWN4OWM0WHdEVW82dUdWbom8u2+OKANkGqUpecD+cRHd4C+YjMPyDduiSvwOm4QOAAAALP///0QAAAAy////KgAAAG7///8NAAAAkP///ysAAACV////AAAAAAAAAAABAAAAAAOkQJ+JUVaKfZGWXgR9L4KOCebkpdpCe1FfVVxF9+CBMwEAzQXXxwk7hn3x8p6/PzqdKGric/f1xyVOxChGshfK1G08/QWRtLexC6M5+aAYadUXaJkGYmYP0F0bPhYDJ0be4xwBAAAAAAAAAAAAAAAAAAAAAAAAAAAAADD///8BAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAdF8wAAAAAAB0XzAAAAAAAAEAAAAg////AQAAAAAAAAA=" + }, + "timestamp": "2024-10-29T13:40:52.1338Z", + "type": "GATEWAY_TX" + } + ] + } + "#.to_owned() + .into_bytes(); + + let _deserialized: GetTasksResult = from_slice(json_data.as_mut_slice()).unwrap(); + } } diff --git a/crates/relayer-amplifier-api-integration/src/component.rs b/crates/relayer-amplifier-api-integration/src/component.rs index d4efc51..5992300 100644 --- a/crates/relayer-amplifier-api-integration/src/component.rs +++ b/crates/relayer-amplifier-api-integration/src/component.rs @@ -6,7 +6,7 @@ use futures_concurrency::future::FutureExt as _; use quanta::Upkeep; use tracing::{info_span, Instrument as _}; -use crate::{config, healthcheck, listener, subscriber}; +use crate::{config, from_amplifier, healthcheck, to_amplifier}; /// A valid command that the Amplifier component can act upon #[derive(Debug)] @@ -83,20 +83,14 @@ impl Amplifier { healthcheck::process_healthcheck(self.config.clone(), clock, client.clone()) .instrument(info_span!("healthcheck")) .in_current_span(); - let to_amplifier_msgs = subscriber::process_msgs_to_amplifier( - self.config.clone(), - self.receiver, - client.clone(), - ) - .instrument(info_span!("subscriber")) - .in_current_span(); - let from_amplifier_msgs = listener::process_msgs_from_amplifier( - self.config.clone(), - client.clone(), - self.sender.clone(), - ) - .instrument(info_span!("listener")) - .in_current_span(); + let to_amplifier_msgs = + to_amplifier::process(self.config.clone(), self.receiver, client.clone()) + .instrument(info_span!("to amplifier")) + .in_current_span(); + let from_amplifier_msgs = + from_amplifier::process(self.config.clone(), client.clone(), self.sender.clone()) + .instrument(info_span!("from amplifier")) + .in_current_span(); // await tasks until one of them exits (fatal) healthcheck diff --git a/crates/relayer-amplifier-api-integration/src/config.rs b/crates/relayer-amplifier-api-integration/src/config.rs index 521f9e3..0c2e792 100644 --- a/crates/relayer-amplifier-api-integration/src/config.rs +++ b/crates/relayer-amplifier-api-integration/src/config.rs @@ -51,7 +51,7 @@ pub(crate) mod config_defaults { } pub(crate) const fn get_chains_poll_interval() -> Duration { - Duration::from_secs(60) + Duration::from_secs(10) } #[expect(clippy::unnecessary_wraps, reason = "fine for config defaults")] diff --git a/crates/relayer-amplifier-api-integration/src/listener.rs b/crates/relayer-amplifier-api-integration/src/from_amplifier.rs similarity index 70% rename from crates/relayer-amplifier-api-integration/src/listener.rs rename to crates/relayer-amplifier-api-integration/src/from_amplifier.rs index 3a43377..3daf6cb 100644 --- a/crates/relayer-amplifier-api-integration/src/listener.rs +++ b/crates/relayer-amplifier-api-integration/src/from_amplifier.rs @@ -1,7 +1,8 @@ use core::task::Poll; +use std::sync::{Arc, Mutex}; use amplifier_api::requests::{self, WithTrailingSlash}; -use amplifier_api::types::{ErrorResponse, GetTasksResult}; +use amplifier_api::types::{ErrorResponse, GetTasksResult, TaskItemId}; use amplifier_api::AmplifierRequest; use futures::stream::StreamExt as _; use futures::SinkExt as _; @@ -14,7 +15,7 @@ use crate::config::Config; // process incoming messages (aka `tasks`) coming in form Amplifier API // 1. periodically check if we have new tasks for processing // 2. if we do, try to act on them; spawning handlers concurrently -pub(crate) async fn process_msgs_from_amplifier( +pub(crate) async fn process( config: Config, client: amplifier_api::AmplifierApiClient, fan_out_sender: AmplifierTaskSender, @@ -25,22 +26,28 @@ pub(crate) async fn process_msgs_from_amplifier( let chain_with_trailing_slash = WithTrailingSlash::new(config.chain.clone()); let mut join_set = JoinSet::>::new(); - let mut interval_stream = - IntervalStream::new(tokio::time::interval(config.get_chains_poll_interval)); + let mut interval_stream = IntervalStream::new({ + let mut interval = tokio::time::interval(config.get_chains_poll_interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval + }); + let latest_task = Arc::new(Mutex::new(Option::::None)); let mut task_stream = futures::stream::poll_fn(move |cx| { - // periodically query new tasks + // periodically query the API for new tasks but only if the downstream processor is ready to + // accept match interval_stream.poll_next_unpin(cx) { Poll::Ready(Some(_res)) => { let res = internal( &config, + Arc::clone(&latest_task), &chain_with_trailing_slash, &client, fan_out_sender.clone(), &mut join_set, ); - // in case we were awoken by join_set being ready, let's re-run this function, while - // returning the result of `internal`. + // in case we were awoken by join_set being ready, let's re-run this function, + // while returning the result of `internal`. cx.waker().wake_by_ref(); return Poll::Ready(Some(Ok(res))); } @@ -76,29 +83,44 @@ pub(crate) async fn process_msgs_from_amplifier( pub(crate) fn internal( config: &Config, + tasks_after: Arc>>, chain_with_trailing_slash: &WithTrailingSlash, client: &lifier_api::AmplifierApiClient, fan_out_sender: AmplifierTaskSender, to_join_set: &mut JoinSet>, ) -> eyre::Result<()> { + if !fan_out_sender.is_empty() { + // the downstream client is still processing the events, don't send any new ones + return Ok(()) + } + let tasks_after_internal = tasks_after.lock().expect("lock poisoned").clone(); let request = requests::GetChains::builder() .chain(chain_with_trailing_slash) .limit(config.get_chains_limit) + .after(tasks_after_internal) .build(); let request = client.build_request(&request)?; - to_join_set.spawn(process_task_request(request, fan_out_sender)); + to_join_set.spawn(process_task_request(request, tasks_after, fan_out_sender)); Ok(()) } async fn process_task_request( request: AmplifierRequest, + tasks_after: Arc>>, mut fan_out_sender: AmplifierTaskSender, ) -> eyre::Result<()> { let res = request.execute().await?; let res = res.json().await??; + let Some(last_task) = res.tasks.last().map(|x| x.id.clone()) else { + return Ok(()); + }; tracing::info!(task_count = ?res.tasks.len(), "received new tasks"); let mut iter = futures::stream::iter(res.tasks.into_iter().map(Ok)); fan_out_sender.send_all(&mut iter).await?; + { + let mut lock = tasks_after.lock().expect("lock poisoned"); + lock.replace(last_task); + }; Ok(()) } diff --git a/crates/relayer-amplifier-api-integration/src/lib.rs b/crates/relayer-amplifier-api-integration/src/lib.rs index 6525f3d..078b29b 100644 --- a/crates/relayer-amplifier-api-integration/src/lib.rs +++ b/crates/relayer-amplifier-api-integration/src/lib.rs @@ -2,9 +2,9 @@ mod component; mod config; +mod from_amplifier; mod healthcheck; -mod listener; -mod subscriber; +mod to_amplifier; pub use amplifier_api; pub use component::{Amplifier, AmplifierCommand, AmplifierCommandClient, AmplifierTaskReceiver}; diff --git a/crates/relayer-amplifier-api-integration/src/subscriber.rs b/crates/relayer-amplifier-api-integration/src/to_amplifier.rs similarity index 98% rename from crates/relayer-amplifier-api-integration/src/subscriber.rs rename to crates/relayer-amplifier-api-integration/src/to_amplifier.rs index 0c4c573..b5def4f 100644 --- a/crates/relayer-amplifier-api-integration/src/subscriber.rs +++ b/crates/relayer-amplifier-api-integration/src/to_amplifier.rs @@ -11,7 +11,7 @@ use tracing::{info_span, Instrument as _}; use super::component::{AmplifierCommand, CommandReceiver}; use super::config::Config; -pub(crate) async fn process_msgs_to_amplifier( +pub(crate) async fn process( config: Config, mut receiver: CommandReceiver, client: amplifier_api::AmplifierApiClient, diff --git a/crates/retrying-solana-http-sender/src/lib.rs b/crates/retrying-solana-http-sender/src/lib.rs index b1df9db..a7c2887 100644 --- a/crates/retrying-solana-http-sender/src/lib.rs +++ b/crates/retrying-solana-http-sender/src/lib.rs @@ -33,7 +33,7 @@ pub fn new_client(config: &Config) -> Arc { config.solana_http_rpc.to_string(), config.max_concurrent_rpc_requests, ); - let config = RpcClientConfig::with_commitment(CommitmentConfig::confirmed()); + let config = RpcClientConfig::with_commitment(CommitmentConfig::finalized()); let client = RpcClient::new_sender(sender, config); Arc::new(client) } diff --git a/crates/solana-axelar-relayer/src/main.rs b/crates/solana-axelar-relayer/src/main.rs index 5b28fc4..4e132f2 100644 --- a/crates/solana-axelar-relayer/src/main.rs +++ b/crates/solana-axelar-relayer/src/main.rs @@ -30,10 +30,12 @@ async fn main() { &config.solana_listener_component, &config.amplifier_component, ); + let name_on_amplifier = config.amplifier_component.chain.clone(); let (amplifier_component, amplifier_client, amplifier_task_receiver) = Amplifier::new(config.amplifier_component); let gateway_task_processor = solana_gateway_task_processor::SolanaTxPusher::new( config.solana_gateway_task_processor, + name_on_amplifier.clone(), Arc::clone(&rpc_client), amplifier_task_receiver, ); diff --git a/crates/solana-event-forwarder/Cargo.toml b/crates/solana-event-forwarder/Cargo.toml index 1264b04..5735927 100644 --- a/crates/solana-event-forwarder/Cargo.toml +++ b/crates/solana-event-forwarder/Cargo.toml @@ -15,7 +15,10 @@ relayer-engine.workspace = true gmp-gateway.workspace = true tracing.workspace = true eyre.workspace = true +bs58.workspace = true solana-sdk.workspace = true +axelar-message-primitives.workspace = true +axelar-rkyv-encoding.workspace = true [dev-dependencies] base64.workspace = true diff --git a/crates/solana-event-forwarder/src/component.rs b/crates/solana-event-forwarder/src/component.rs index 1a189fa..43a1c29 100644 --- a/crates/solana-event-forwarder/src/component.rs +++ b/crates/solana-event-forwarder/src/component.rs @@ -1,11 +1,14 @@ use core::future::Future; use core::pin::Pin; +use axelar_message_primitives::U256; +use axelar_rkyv_encoding::rkyv::{self, Deserialize as _}; use futures::{SinkExt as _, StreamExt as _}; use gmp_gateway::events::{EventContainer, GatewayEvent}; use relayer_amplifier_api_integration::amplifier_api::types::{ - CallEvent, Event, EventBase, EventId, EventMetadata, GatewayV2Message, MessageId, - PublishEventsRequest, TxId, + BigInt, CallEvent, CallEventMetadata, CommandId, Event, EventBase, EventId, EventMetadata, + GatewayV2Message, MessageApprovedEvent, MessageApprovedEventMetadata, MessageId, + PublishEventsRequest, SignersRotatedEvent, SignersRotatedMetadata, Token, TxEvent, TxId, }; use relayer_amplifier_api_integration::AmplifierCommand; use solana_sdk::pubkey::Pubkey; @@ -49,24 +52,37 @@ impl SolanaEventForwarder { while let Some(message) = self.solana_listener_client.log_receiver.next().await { let gateway_program_stack = build_program_event_stack(&match_context, &message.logs); + let total_cost = message.cost_in_lamports; - // After processing all logs, collect events from successful invocations - let events_to_send = gateway_program_stack + // Collect all successful events into a vector + let events_vec = gateway_program_stack .into_iter() .filter_map(|x| { if let ProgramInvocationState::Succeeded(events) = x { - return Some(events) + Some(events) + } else { + None } - None }) .flatten() + .collect::>(); + + // Calculate the number of events + let num_events = events_vec.len(); + + // Compute the price per event, handling the case where num_events is zero + let price_for_event = total_cost.checked_div(num_events.try_into()?).unwrap_or(0); + + // Map the events to amplifier events with the calculated price + let events_to_send = events_vec + .into_iter() .filter_map(|(log_index, event)| { - // transform gateway events to amplifier events map_gateway_event_to_amplifier_event( self.config.source_chain_name.as_str(), &event, &message, log_index, + price_for_event, ) }) .collect::>(); @@ -177,55 +193,168 @@ fn handle_success_log(program_stack: &mut Vec) { } } +#[expect( + clippy::too_many_lines, + clippy::cognitive_complexity, + reason = "easier to read when all the transformations in one place rather than scattered around" +)] fn map_gateway_event_to_amplifier_event( source_chain: &str, event: &EventContainer, message: &solana_listener::SolanaTransaction, log_index: usize, + price_per_event_in_lamports: u64, ) -> Option { use gmp_gateway::events::ArchivedGatewayEvent::{ CallContract, MessageApproved, MessageExecuted, OperatorshipTransferred, SignersRotated, }; - let parse = event.parse(); - match *parse { + let signature = message.signature.to_string(); + let event_id = EventId::new(&signature, log_index); + let tx_id = TxId(signature.clone()); + + #[expect( + clippy::little_endian_bytes, + reason = "we are guaranteed correct conversion" + )] + match *event.parse() { CallContract(ref call_contract) => { - let signature = message.signature.to_string(); - let event_id = EventId::new(&signature, log_index); - let source_address = Pubkey::new_from_array(call_contract.sender).to_string(); let message_id = MessageId::new(&signature, log_index); - let tx_id = TxId(signature); - let amplifier_event = Event::Call(CallEvent { - base: EventBase { - event_id, - meta: Some(EventMetadata { - tx_id: Some(tx_id), - timestamp: message.timestamp, - from_address: Some(source_address.clone()), - finalized: Some(true), - }), - }, - message: GatewayV2Message { - message_id, - source_chain: source_chain.to_owned(), - source_address, - destination_address: call_contract.destination_address.to_string(), - payload_hash: call_contract.payload_hash.to_vec(), - }, - destination_chain: call_contract.destination_chain.to_string(), - payload: call_contract.payload.to_vec(), - }); + let source_address = Pubkey::new_from_array(call_contract.sender).to_string(); + let amplifier_event = Event::Call( + CallEvent::builder() + .base( + EventBase::builder() + .event_id(event_id) + .meta(Some( + EventMetadata::builder() + .tx_id(Some(tx_id)) + .timestamp(message.timestamp) + .from_address(Some(source_address.clone())) + .finalized(Some(true)) + .extra(CallEventMetadata::builder().build()) + .build(), + )) + .build(), + ) + .message( + GatewayV2Message::builder() + .message_id(message_id) + .source_chain(source_chain.to_owned()) + .source_address(source_address) + .destination_address(call_contract.destination_address.to_string()) + .payload_hash(call_contract.payload_hash.to_vec()) + .build(), + ) + .destination_chain(call_contract.destination_chain.to_string()) + .payload(call_contract.payload.to_vec()) + .build(), + ); Some(amplifier_event) } SignersRotated(ref signers) => { tracing::info!(?signers, "Signers rotated"); - None + + let decoded_u256: U256 = signers + .new_epoch + .deserialize(&mut rkyv::Infallible) + .unwrap(); + let le_bytes = decoded_u256.to_le_bytes(); + let (le_u64, _) = le_bytes.split_first_chunk::<8>()?; + let epoch = u64::from_le_bytes(*le_u64); + + let amplifier_event = Event::SignersRotated( + SignersRotatedEvent::builder() + .base( + EventBase::builder() + .event_id(event_id) + .meta(Some( + EventMetadata::builder() + .tx_id(Some(tx_id)) + .timestamp(message.timestamp) + .finalized(Some(true)) + .extra( + SignersRotatedMetadata::builder() + .signer_hash(signers.new_signers_hash.to_vec()) + .epoch(epoch) + .build(), + ) + .build(), + )) + .build(), + ) + .cost( + Token::builder() + .token_id(None) + .amount(BigInt::from_u64(price_per_event_in_lamports)) + .build(), + ) + .build(), + ); + Some(amplifier_event) } MessageApproved(ref approved_message) => { + let command_id = approved_message.command_id; + let message_id = TxEvent( + String::from_utf8(approved_message.message_id.to_vec()) + .expect("message id is not a valid String"), + ); + let amplifier_event = Event::MessageApproved( + MessageApprovedEvent::builder() + .base( + EventBase::builder() + .event_id(event_id) + .meta(Some( + EventMetadata::builder() + .tx_id(Some(tx_id)) + .timestamp(message.timestamp) + .from_address(Some( + String::from_utf8(approved_message.source_address.to_vec()) + .expect("source address is not a valid string"), + )) + .finalized(Some(true)) + .extra( + MessageApprovedEventMetadata::builder() + .command_id(Some(CommandId( + bs58::encode(command_id).into_string(), + ))) + .build(), + ) + .build(), + )) + .build(), + ) + .message( + GatewayV2Message::builder() + .message_id(message_id) + .source_chain( + String::from_utf8(approved_message.source_chain.to_vec()) + .expect("invalid source chain"), + ) + .source_address( + String::from_utf8(approved_message.source_address.to_vec()) + .expect("invalid source address"), + ) + .destination_address( + Pubkey::new_from_array(approved_message.destination_address) + .to_string(), + ) + .payload_hash(approved_message.payload_hash.to_vec()) + .build(), + ) + .cost( + Token::builder() + .amount(BigInt::from_u64(price_per_event_in_lamports)) + .build(), + ) + .build(), + ); tracing::info!(?approved_message, "Message approved"); - None + Some(amplifier_event) } - MessageExecuted(ref executed_message) => { - tracing::info!(?executed_message, "Message executed"); + MessageExecuted(ref _executed_message) => { + tracing::warn!( + "current gateway event does not produce enough artifacts to relay this message" + ); None } OperatorshipTransferred(ref new_operatorship) => { diff --git a/crates/solana-gateway-task-processor/Cargo.toml b/crates/solana-gateway-task-processor/Cargo.toml index a3578a6..031bf83 100644 --- a/crates/solana-gateway-task-processor/Cargo.toml +++ b/crates/solana-gateway-task-processor/Cargo.toml @@ -24,6 +24,8 @@ tokio.workspace = true amplifier-api.workspace = true relayer-amplifier-api-integration.workspace = true effective-tx-sender.workspace = true +axelar-executable.workspace = true +num-traits.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/crates/solana-gateway-task-processor/src/component.rs b/crates/solana-gateway-task-processor/src/component.rs index 6b2d99a..31be57a 100644 --- a/crates/solana-gateway-task-processor/src/component.rs +++ b/crates/solana-gateway-task-processor/src/component.rs @@ -12,6 +12,7 @@ use futures::StreamExt as _; use gmp_gateway::commands::OwnedCommand; use gmp_gateway::state::GatewayApprovedCommand; use gmp_gateway::{hasher_impl, instructions}; +use num_traits::FromPrimitive as _; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::rpc_response::RpcSimulateTransactionResult; use solana_sdk::instruction::{Instruction, InstructionError}; @@ -21,7 +22,7 @@ use solana_sdk::signature::{Keypair, Signature}; use solana_sdk::signer::Signer as _; use solana_sdk::transaction::TransactionError; use tokio::task::JoinSet; -use tracing::{instrument, Instrument as _}; +use tracing::{info_span, instrument, Instrument as _}; use crate::config; @@ -29,6 +30,7 @@ use crate::config; /// The transactions to push are dependant on the events that the Amplifier API will provide pub struct SolanaTxPusher { config: config::Config, + name_on_amplifier: String, rpc_client: Arc, task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver, } @@ -46,11 +48,13 @@ impl SolanaTxPusher { #[must_use] pub const fn new( config: config::Config, + name_on_amplifier: String, rpc_client: Arc, task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver, ) -> Self { Self { config, + name_on_amplifier, rpc_client, task_receiver, } @@ -120,12 +124,14 @@ impl SolanaTxPusher { let config_metadata = ConfigMetadata { gateway_root_pda, domain_separator: root_config.domain_separator, + name_of_the_solana_chain: self.name_on_amplifier.clone(), }; Ok(config_metadata) } } struct ConfigMetadata { + name_of_the_solana_chain: String, gateway_root_pda: Pubkey, domain_separator: [u8; 32], } @@ -146,6 +152,10 @@ async fn process_task( clippy::todo, reason = "fine for the time being, will be refactored later" )] + #[expect( + clippy::unreachable, + reason = "will be removed in the future, only there because of outdated gateway API" + )] match task.task { Verify(_verify_task) => todo!(), GatewayTx(gateway_transaction_task) => { @@ -191,7 +201,86 @@ async fn process_task( } } } - Execute(_execute_task) => todo!(), + Execute(execute_task) => { + // communicate with the destination program + async { + let payload = execute_task.payload; + let message = axelar_rkyv_encoding::types::Message::new( + axelar_rkyv_encoding::types::CrossChainId::new( + execute_task.message.source_chain, + execute_task.message.message_id.0, + ), + execute_task.message.source_address, + metadata.name_of_the_solana_chain.clone(), + execute_task.message.destination_address, + execute_task + .message + .payload_hash + .try_into() + .unwrap_or_default(), + ); + + // this interface will be refactored in the next gateway version + let command = OwnedCommand::ApproveMessage(message); + let (gateway_approved_message_pda, _, _) = + GatewayApprovedCommand::pda(&gateway_root_pda, &command); + let OwnedCommand::ApproveMessage(message) = command else { + unreachable!() + }; + tracing::debug!(?gateway_approved_message_pda, "approved message PDA"); + + let ix = axelar_executable::construct_axelar_executable_ix( + message, + payload, + gateway_approved_message_pda, + gateway_root_pda, + )?; + let send_transaction_result = + send_transaction(solana_rpc_client, keypair, ix).await; + + let Err(err) = send_transaction_result else { + // tx was successfully executed + return Ok(()) + }; + + // tx was not executed -- inspect root cause + let ComputeBudgetError::SimulationError(ref simulation) = err else { + // some kid of irrecoverable error + return Err(eyre::Error::from(err)) + }; + tracing::warn!(?simulation.err,"simulation err"); + + // NOTE: this error makes it look like the command is not approved, but in fact it's + // the error that is returned if a message was approved & executed. + // This will be altered in the future Gateway impl + let command_already_executed_error = simulation + .err + .as_ref() + .and_then(|err| { + if let TransactionError::InstructionError( + 1, // <-- 0th idx is the ComputeBudget prefix + InstructionError::Custom(err_code), + ) = *err + { + return gmp_gateway::error::GatewayError::from_u32(err_code) + } + None + }) + .is_some_and(|received_err| { + gmp_gateway::error::GatewayError::GatewayCommandNotApproved == received_err + }); + if command_already_executed_error { + tracing::warn!("message already executed"); + return eyre::Result::Ok(()); + } + + // Return the simulation error + Err(eyre::Error::from(err)) + } + .instrument(info_span!("execute task")) + .in_current_span() + .await?; + } Refund(_refund_task) => todo!(), }; @@ -211,7 +300,7 @@ struct ProcessMessages<'a> { } impl<'a> ProcessMessages<'a> { - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, name = "approve messages flow")] async fn execute(&self) -> eyre::Result<()> { let execute_data_pda = InitializeApproveMessagesExecuteData::builder() .signer(self.signer) @@ -222,6 +311,8 @@ impl<'a> ProcessMessages<'a> { .keypair(self.keypair) .build() .execute() + .instrument(info_span!("init execute data")) + .in_current_span() .await?; // Compose messages @@ -245,7 +336,7 @@ impl<'a> ProcessMessages<'a> { let Err(err) = send_transaction_result else { // tx was successfully executed - return Ok(execute_data_pda) + return Ok(approved_message_pda) }; // tx was not executed -- inspect root cause @@ -254,6 +345,7 @@ impl<'a> ProcessMessages<'a> { return Err(eyre::Error::from(err)) }; + // this is the error for when a message PDA was already registered if matches!( simulation.err, Some(TransactionError::InstructionError( @@ -270,7 +362,9 @@ impl<'a> ProcessMessages<'a> { .instrument(tracing::info_span!( "registering command PDA", ?approved_message_pda - )); + )) + .in_current_span(); + Some(output) }) .collect::>(); @@ -290,6 +384,7 @@ impl<'a> ProcessMessages<'a> { .keypair(self.keypair) .build() .execute() + .instrument(info_span!("verify signatures")) .await?; Ok(()) diff --git a/crates/solana-listener/src/component.rs b/crates/solana-listener/src/component.rs index de447be..7ae0d8e 100644 --- a/crates/solana-listener/src/component.rs +++ b/crates/solana-listener/src/component.rs @@ -25,6 +25,8 @@ pub struct SolanaTransaction { pub logs: Vec, /// the slot number of the tx pub slot: u64, + /// How expensive was the transaction expressed in lamports + pub cost_in_lamports: u64, } pub(crate) type MessageSender = futures::channel::mpsc::UnboundedSender; diff --git a/crates/solana-listener/src/component/log_processor.rs b/crates/solana-listener/src/component/log_processor.rs index 014292b..d31cd14 100644 --- a/crates/solana-listener/src/component/log_processor.rs +++ b/crates/solana-listener/src/component/log_processor.rs @@ -37,13 +37,13 @@ pub(crate) async fn fetch_and_send( Ok(()) } -async fn fetch_logs( +pub(crate) async fn fetch_logs( signature: Signature, rpc_client: &RpcClient, ) -> eyre::Result { use solana_client::rpc_config::RpcTransactionConfig; let config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Base64), + encoding: Some(UiTransactionEncoding::Binary), commitment: Some(CommitmentConfig::confirmed()), max_supported_transaction_version: Some(0), }; @@ -58,7 +58,7 @@ async fn fetch_logs( let meta = transaction_with_meta .meta - .ok_or_eyre("metadat not included with logs")?; + .ok_or_eyre("metadata not included with logs")?; let OptionSerializer::Some(logs) = meta.log_messages else { eyre::bail!("logs not included"); @@ -72,6 +72,7 @@ async fn fetch_logs( logs, slot, timestamp: block_time.and_then(|secs| DateTime::from_timestamp(secs, 0)), + cost_in_lamports: meta.fee, }; Ok(transaction) diff --git a/crates/solana-listener/src/component/signature_realtime_scanner.rs b/crates/solana-listener/src/component/signature_realtime_scanner.rs index e5947ea..6a0de9b 100644 --- a/crates/solana-listener/src/component/signature_realtime_scanner.rs +++ b/crates/solana-listener/src/component/signature_realtime_scanner.rs @@ -1,7 +1,10 @@ +use core::pin::Pin; use core::str::FromStr as _; use std::sync::Arc; -use futures::{SinkExt as _, StreamExt as _}; +use futures::stream::{poll_fn, FuturesUnordered, StreamExt as _}; +use futures::task::Poll; +use futures::{SinkExt as _, Stream as _}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter}; use solana_sdk::commitment_config::CommitmentConfig; @@ -9,6 +12,7 @@ use solana_sdk::signature::Signature; use tracing::{info_span, Instrument as _}; use super::MessageSender; +use crate::component::log_processor::fetch_logs; use crate::component::signature_batch_scanner; use crate::SolanaTransaction; @@ -20,8 +24,12 @@ pub(crate) async fn process_realtime_logs( mut signature_sender: MessageSender, ) -> Result<(), eyre::Error> { let gateway_program_address = config.gateway_program_address; - loop { - tracing::info!(endpoint =? config.solana_ws.as_str(), ?gateway_program_address, "init new WS connection"); + 'outer: loop { + tracing::info!( + endpoint = ?config.solana_ws.as_str(), + ?gateway_program_address, + "init new WS connection" + ); let client = solana_client::nonblocking::pubsub_client::PubsubClient::new(config.solana_ws.as_str()) .await?; @@ -33,59 +41,98 @@ pub(crate) async fn process_realtime_logs( }, ) .await?; - let mut ws_stream = ws_stream - .filter(|item| { - // only keep non-error items - core::future::ready(item.value.err.is_none()) - }) - .filter_map(|item| { - // parse the returned data into a format we can forward to other components - core::future::ready({ - Signature::from_str(&item.value.signature) - .map(|signature| { - SolanaTransaction { - // timestamp not available via the the WS API - timestamp: None, - signature, - logs: item.value.logs, - slot: item.context.slot, - } - }) - .ok() - }) - }) - .inspect(|item| { - tracing::info!(item = ?item.signature, "found tx"); - }) - .boxed(); + let mut ws_stream = ws_stream.fuse(); - // It takes a few seconds for the Solana node to accept the WS connection. - // During this time we might have already missed a few signatures. - // We attempt to fetch the diff here. - // This will only trigger upon the very first WS returned signature - let next = ws_stream.next().await; - let Some(t2_signature) = next else { - // reconnect if connection dropped - continue; - }; + 'first: loop { + // Get the first item from the ws_stream + let first_item = ws_stream.next().await; + let Some(first_item) = first_item else { + // Reconnect if connection dropped + continue 'outer; + }; + // Process the first item + if first_item.value.err.is_none() { + if let Ok(sig) = Signature::from_str(&first_item.value.signature) { + let t2_signature = fetch_logs(sig, &rpc_client).await?; - signature_batch_scanner::fetch_batches_in_range( - &config, - Arc::clone(&rpc_client), - &signature_sender, - Some(t2_signature.signature), - latest_processed_signature, - ) - .instrument(info_span!("fetching missed signatures")) - .await?; - // forward the tx data to be processed - signature_sender.send(t2_signature).await?; + // Fetch missed batches + signature_batch_scanner::fetch_batches_in_range( + &config, + Arc::clone(&rpc_client), + &signature_sender, + Some(t2_signature.signature), + latest_processed_signature, + ) + .instrument(info_span!("fetching missed signatures")) + .await?; + // Send the first item + signature_sender.send(t2_signature).await?; + break 'first; + } + } + } + + // Create the FuturesUnordered + let mut fetch_futures = FuturesUnordered::new(); - // start processing the rest of the messages + // Manual polling using poll_fn tracing::info!("waiting realtime logs"); - while let Some(item) = ws_stream.next().await { - signature_sender.send(item).await?; + + let rpc_client = Arc::clone(&rpc_client); + let mut merged_stream = poll_fn(move |cx| { + // Poll fetch_futures + let poll_next_unpin = fetch_futures.poll_next_unpin(cx); + match poll_next_unpin { + Poll::Ready(Some(fetch_result)) => { + cx.waker().wake_by_ref(); + return Poll::Ready(Some(fetch_result)) + } + Poll::Ready(None) | Poll::Pending => {} // No more futures to poll + } + + // Poll ws_stream + match Pin::new(&mut ws_stream).poll_next(cx) { + Poll::Ready(Some(item)) => { + if item.value.err.is_none() { + if let Ok(sig) = Signature::from_str(&item.value.signature) { + // Push fetch_logs future into fetch_futures + let rpc_client = Arc::clone(&rpc_client); + let fetch_future = async move { + let log_item = fetch_logs(sig, &rpc_client).await?; + tracing::info!(item = ?log_item.signature, "found tx"); + eyre::Result::Ok(log_item) + }; + fetch_futures.push(fetch_future); + } + } + // We return Pending here because the actual result will come from + // fetch_futures + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Ready(None) => { + // WS stream ended + tracing::warn!("websocket stream exited"); + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + }); + + // Process the merged stream + while let Option::>::Some(result) = + merged_stream.next().await + { + match result { + Ok(log_item) => { + // Send the fetched log item + signature_sender.send(log_item).await?; + } + Err(err) => { + // Handle error in fetch_logs + tracing::error!(?err, "Error in merged stream"); + } + } } - tracing::warn!("websocket stream exited"); } } diff --git a/deny.toml b/deny.toml index 627b394..e905e63 100644 --- a/deny.toml +++ b/deny.toml @@ -140,6 +140,10 @@ license-files = [] crate = "axelar-message-primitives" expression = "MIT" license-files = [] +[[licenses.clarify]] +crate = "axelar-executable" +expression = "MIT" +license-files = [] [[licenses.clarify]] # The package spec the clarification applies to diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 60cf0e9..ab77334 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -10,7 +10,10 @@ struct Args { #[derive(Subcommand, Debug)] enum Commands { - Deny, + Deny { + #[clap(last = true)] + args: Vec, + }, Test { #[clap(short, long, default_value_t = false)] coverage: bool, @@ -29,10 +32,10 @@ fn main() -> eyre::Result<()> { let args = Args::parse(); match args.command { - Commands::Deny => { + Commands::Deny { args } => { println!("cargo deny"); cmd!(sh, "cargo install cargo-deny").run()?; - cmd!(sh, "cargo deny check").run()?; + cmd!(sh, "cargo deny check {args...}").run()?; } Commands::Test { args, coverage } => { println!("cargo test");