From d65536de504b47d54d05eb5f7dca496a1876848a Mon Sep 17 00:00:00 2001 From: Roberts Pumpurs <33699735+roberts-pumpurs@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:07:35 +0300 Subject: [PATCH] feat: handle `GatewayTransactionTask` by sending txs to Solana (#20) * feat: can send txs to solana * docs: adr doc for solana tx sending --- .cargo/config.toml | 3 - Cargo.lock | 583 ++++++++++++------ Cargo.toml | 12 + crates/amplifier-api/src/client.rs | 25 +- crates/amplifier-api/src/client/requests.rs | 6 +- crates/amplifier-api/src/error.rs | 4 - crates/amplifier-api/src/types.rs | 6 +- crates/common-serde-utils/Cargo.toml | 5 + crates/common-serde-utils/src/lib.rs | 27 +- crates/effective-tx-sender/Cargo.toml | 20 + crates/effective-tx-sender/src/lib.rs | 219 +++++++ .../src/component.rs | 43 +- .../src/config.rs | 2 +- .../src/lib.rs | 2 +- .../src/listener.rs | 32 +- .../src/subscriber.rs | 6 +- crates/relayer-engine/src/config.rs | 4 - crates/relayer-engine/src/lib.rs | 2 +- crates/retrying-solana-http-sender/Cargo.toml | 25 + .../src/lib.rs} | 48 +- crates/solana-axelar-relayer/Cargo.toml | 2 + crates/solana-axelar-relayer/src/main.rs | 47 +- crates/solana-axelar-relayer/src/telemetry.rs | 4 +- .../solana-event-forwarder/src/component.rs | 8 +- .../solana-gateway-task-processor/Cargo.toml | 35 ++ .../src/component.rs | 570 +++++++++++++++++ .../src/config.rs | 163 +++++ .../solana-gateway-task-processor/src/lib.rs | 7 + crates/solana-listener/Cargo.toml | 7 +- crates/solana-listener/src/component.rs | 31 +- .../src/component/log_processor.rs | 4 +- .../src/component/signature_batch_scanner.rs | 2 +- .../component/signature_realtime_scanner.rs | 6 +- crates/solana-listener/src/config.rs | 35 +- crates/solana-listener/src/lib.rs | 1 - doc/adr/0004-solana-tx-sending.md | 50 ++ rust-toolchain.toml | 2 +- 37 files changed, 1705 insertions(+), 343 deletions(-) create mode 100644 crates/effective-tx-sender/Cargo.toml create mode 100644 crates/effective-tx-sender/src/lib.rs create mode 100644 crates/retrying-solana-http-sender/Cargo.toml rename crates/{solana-listener/src/retrying_http_sender.rs => retrying-solana-http-sender/src/lib.rs} (63%) create mode 100644 crates/solana-gateway-task-processor/Cargo.toml create mode 100644 crates/solana-gateway-task-processor/src/component.rs create mode 100644 crates/solana-gateway-task-processor/src/config.rs create mode 100644 crates/solana-gateway-task-processor/src/lib.rs create mode 100644 doc/adr/0004-solana-tx-sending.md diff --git a/.cargo/config.toml b/.cargo/config.toml index eb73198..e1d3dfc 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,9 +1,6 @@ [net] git-fetch-with-cli = true -[build] -rustflags = ["-Z", "threads=8", "-Ctarget-cpu=native", "-Zthreads=10"] - [alias] xtask = "run --package xtask --" diff --git a/Cargo.lock b/Cargo.lock index 75fcd98..b14b16f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -133,7 +133,7 @@ dependencies = [ "bytes", "cfg-if", "const-hex", - "derive_more", + "derive_more 0.99.18", "hex-literal", "itoa", "k256", @@ -147,9 +147,9 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26154390b1d205a4a7ac7352aa2eb4f81f391399d4e2f546fb81a2f8bb383f62" +checksum = "da0822426598f95e45dd1ea32a738dac057529a709ee645fcc516ffa4cbde08f" dependencies = [ "arrayvec", "bytes", @@ -166,7 +166,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -182,7 +182,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", "syn-solidity", "tiny-keccak", ] @@ -198,7 +198,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", "syn-solidity", ] @@ -310,9 +310,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.89" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" [[package]] name = "ark-bn254" @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.15" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e26a9844c659a2a293d239c7910b752f8487fe122c6c8bd1659bf85a6507c302" +checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857" dependencies = [ "brotli", "flate2", @@ -607,7 +607,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -618,7 +618,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -646,7 +646,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -951,7 +951,7 @@ dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", "syn_derive", ] @@ -1068,7 +1068,7 @@ checksum = "bcfcc3cd946cb52f0bbfdbbcfa2f4e24f75ebb6c0e1002f7c25904fada18b9ec" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -1079,9 +1079,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" dependencies = [ "serde", ] @@ -1098,9 +1098,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.30" +version = "1.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" +checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" dependencies = [ "jobserver", "libc", @@ -1212,7 +1212,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -1281,6 +1281,8 @@ name = "common-serde-utils" version = "0.1.0" dependencies = [ "serde", + "solana-sdk", + "tracing", ] [[package]] @@ -1356,6 +1358,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "cordyceps" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec10f0a762d93c4498d2e97a333805cb6250d60bead623f71d8034f9a4152ba3" +dependencies = [ + "loom", + "tracing", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1509,7 +1521,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -1533,7 +1545,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -1544,7 +1556,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -1626,7 +1638,27 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.1", - "syn 2.0.79", + "syn 2.0.85", +] + +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.85", ] [[package]] @@ -1682,7 +1714,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -1705,9 +1737,15 @@ checksum = "a6cbae11b3de8fce2a456e8ea3dada226b35fe791f0dc1d360c0941f0bb681f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dunce" version = "1.0.5" @@ -1793,6 +1831,19 @@ dependencies = [ "sha2 0.10.8", ] +[[package]] +name = "effective-tx-sender" +version = "0.1.0" +dependencies = [ + "eyre", + "futures", + "itertools 0.12.1", + "solana-client", + "solana-sdk", + "thiserror", + "tracing", +] + [[package]] name = "either" version = "1.13.0" @@ -1826,9 +1877,9 @@ checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" [[package]] name = "encoding_rs" -version = "0.8.34" +version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ "cfg-if", ] @@ -1850,7 +1901,7 @@ checksum = "a1ab991c1362ac86c61ab6f556cff143daa22e5a15e4e189df818b2fd19fe65b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -2033,6 +2084,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.0.34" @@ -2067,6 +2124,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "funty" version = "2.0.0" @@ -2090,10 +2153,11 @@ dependencies = [ [[package]] name = "futures-buffered" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fa130f3777d0d4b0993653c20bc433026d3290627693c4ed1b18dd237357ab" +checksum = "34acda8ae8b63fbe0b2195c998b180cff89a8212fb2622a78b572a9f1c6f7684" dependencies = [ + "cordyceps", "diatomic-waker", "futures-core", "pin-project-lite", @@ -2111,11 +2175,11 @@ dependencies = [ [[package]] name = "futures-concurrency" -version = "7.6.1" +version = "7.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b14ac911e85d57c5ea6eef76d7b4d4a3177ecd15f4bea2e61927e9e3823e19f" +checksum = "d9b724496da7c26fcce66458526ce68fc2ecf4aaaa994281cf322ded5755520c" dependencies = [ - "bitvec", + "fixedbitset", "futures-buffered", "futures-core", "futures-lite", @@ -2170,7 +2234,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -2209,6 +2273,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2552,9 +2629,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.30" +version = "0.14.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" dependencies = [ "bytes", "futures-channel", @@ -2576,9 +2653,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -2603,7 +2680,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper 0.14.30", + "hyper 0.14.31", "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", @@ -2617,9 +2694,9 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2633,7 +2710,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "pin-project-lite", "tokio", @@ -2651,7 +2728,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.4.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", @@ -2981,9 +3058,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.159" +version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" [[package]] name = "libm" @@ -3061,6 +3138,19 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "matchers" version = "0.1.0" @@ -3163,6 +3253,32 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c28b3fb6d753d28c20e826cd46ee611fda1cf3cde03a443a974043247c065a" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "341014e7f530314e9a1fdbc7400b244efea7122662c96bfa248c31da5bfb2020" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "nix" version = "0.28.0" @@ -3255,7 +3371,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -3328,7 +3444,7 @@ dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -3615,29 +3731,29 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -3700,6 +3816,32 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "predicates" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9086cc7640c29a356d1a29fd134380bee9d8f79a17410aa76e7ad295f42c97" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" + +[[package]] +name = "predicates-tree" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_assertions" version = "1.4.1" @@ -3768,9 +3910,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.87" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" dependencies = [ "unicode-ident", ] @@ -3826,7 +3968,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -3907,7 +4049,7 @@ dependencies = [ "quinn-proto 0.11.8", "quinn-udp 0.5.5", "rustc-hash 2.0.0", - "rustls 0.23.14", + "rustls 0.23.15", "socket2", "thiserror", "tokio", @@ -3942,7 +4084,7 @@ dependencies = [ "rand 0.8.5", "ring 0.17.8", "rustc-hash 2.0.0", - "rustls 0.23.14", + "rustls 0.23.15", "slab", "thiserror", "tinyvec", @@ -4134,14 +4276,14 @@ checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] name = "regex" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -4241,7 +4383,7 @@ dependencies = [ "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.30", + "hyper 0.14.31", "hyper-rustls 0.24.2", "ipnet", "js-sys", @@ -4285,7 +4427,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-rustls 0.27.3", "hyper-util", "ipnet", @@ -4296,7 +4438,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn 0.11.5", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -4331,6 +4473,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "retrying-solana-http-sender" +version = "0.1.0" +dependencies = [ + "async-trait", + "backoff", + "serde", + "serde_json", + "solana-client", + "solana-rpc-client", + "solana-rpc-client-api", + "solana-sdk", + "tokio", + "tracing", + "typed-builder", + "url", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -4468,7 +4628,7 @@ dependencies = [ "regex", "relative-path", "rustc_version 0.4.1", - "syn 2.0.79", + "syn 2.0.85", "unicode-ident", ] @@ -4590,9 +4750,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.14" +version = "0.23.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" +checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" dependencies = [ "once_cell", "ring 0.17.8", @@ -4685,26 +4845,26 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "scale-info" -version = "2.11.3" +version = "2.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eca070c12893629e2cc820a9761bedf6ce1dcddc9852984d1dc734b8bd9bd024" +checksum = "1aa7ffc1c0ef49b0452c6e2986abf2b07743320641ffd5fc63d552458e3b779b" dependencies = [ "cfg-if", - "derive_more", + "derive_more 1.0.0", "parity-scale-codec", "scale-info-derive", ] [[package]] name = "scale-info-derive" -version = "2.11.3" +version = "2.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d35494501194174bda522a32605929eefc9ecf7e0a326c26db1fdd85881eb62" +checksum = "46385cc24172cf615450267463f937c10072516359b3ff1cb24228a4a08bf951" dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.85", ] [[package]] @@ -4716,6 +4876,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -4739,7 +4905,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -4821,9 +4987,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.210" +version = "1.0.213" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" dependencies = [ "serde_derive", ] @@ -4848,20 +5014,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.210" +version = "1.0.213" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] name = "serde_json" -version = "1.0.128" +version = "1.0.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "itoa", "memchr", @@ -4909,7 +5075,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -5080,9 +5246,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d87c6ef8c13eb759fa8d887e12c67afd851799050b6afd501a27726551f52e" +checksum = "ad3f0b04a6f8d8778488fe2c3e77e97866d8b61378c8a4d91e188e1444f98186" dependencies = [ "Inflector", "base64 0.22.1", @@ -5119,8 +5285,10 @@ dependencies = [ "pretty_assertions", "relayer-amplifier-api-integration", "relayer-engine", + "retrying-solana-http-sender", "serde", "solana-event-forwarder", + "solana-gateway-task-processor", "solana-listener", "temp-env", "tokio", @@ -5131,9 +5299,9 @@ dependencies = [ [[package]] name = "solana-clap-utils" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9709683a4d480a0185827292405cad0b6f414abaa479c7d1dfe5e2194aeec8" +checksum = "855bb216b0862bc10ae515b1400a3f677527881f3e3df51e24bb1ba0bd514fcc" dependencies = [ "chrono", "clap 2.34.0", @@ -5148,9 +5316,9 @@ dependencies = [ [[package]] name = "solana-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67169e4f1faabb717ce81b5ca93960da21e3ac5c9b75cb6792f9b3ce38db459f" +checksum = "e17a2e3cf4aa6b7ed64d33ea656507af4e754832ad3c8733fab6ff5eeb8b4249" dependencies = [ "async-trait", "bincode", @@ -5181,9 +5349,9 @@ dependencies = [ [[package]] name = "solana-compute-budget" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acde49a883ca3e099a8050ad8321ea56b02041995dadcf84b0dab14561cc34a" +checksum = "29e90b1be747a3f2373c8b7f2e1bd4291249fc588647524789ff877cc57e7ad8" dependencies = [ "rustc_version 0.4.1", "solana-sdk", @@ -5191,9 +5359,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f638e44fb308bdc1ce99eb0fee194b2cb212917b258999cdb4a8b056d48973d4" +checksum = "193009c562956c9672cb1fd6439a444367b1b0fd67f20f435ab6a4026e5ed187" dependencies = [ "bincode", "chrono", @@ -5205,9 +5373,9 @@ dependencies = [ [[package]] name = "solana-connection-cache" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fd01a4d43b780996970cb3669946b002f71d34e6a26a19bd6d2a74513ecc0aa" +checksum = "fe5a15ec1f3f9860e6171e5dd2d497974e7167f0ba15c8215d448b30f3bee12f" dependencies = [ "async-trait", "bincode", @@ -5226,9 +5394,9 @@ dependencies = [ [[package]] name = "solana-curve25519" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44b61d8eda3319deca3627e3eb3970ce2ad179ad39c106d6c003d06c90e3031d" +checksum = "e0d6ca4dc435e6048f871424cd5ace2aeb06c2c82229d684903b03fb351072d4" dependencies = [ "bytemuck", "bytemuck_derive", @@ -5254,11 +5422,37 @@ dependencies = [ "tracing", ] +[[package]] +name = "solana-gateway-task-processor" +version = "0.1.0" +dependencies = [ + "amplifier-api", + "async-trait", + "axelar-rkyv-encoding", + "bs58", + "common-serde-utils", + "effective-tx-sender", + "eyre", + "futures", + "gmp-gateway", + "mockall", + "relayer-amplifier-api-integration", + "relayer-engine", + "serde", + "serde_json", + "solana-client", + "solana-sdk", + "solana-transaction-status", + "tokio", + "tracing", + "typed-builder", +] + [[package]] name = "solana-inline-spl" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6614014b976112fb6c9bf259f87c6659b8fdea628c656639e02211324d2b34" +checksum = "f94111dea93785d063d62894046f2a1c2fd822f107aa59c82272a34d2b98cfd4" dependencies = [ "bytemuck", "rustc_version 0.4.1", @@ -5269,8 +5463,6 @@ dependencies = [ name = "solana-listener" version = "0.1.0" dependencies = [ - "async-trait", - "backoff", "chrono", "common-serde-utils", "eyre", @@ -5278,10 +5470,7 @@ dependencies = [ "gmp-gateway", "relayer-engine", "serde", - "serde_json", "solana-client", - "solana-rpc-client", - "solana-rpc-client-api", "solana-sdk", "solana-transaction-status", "tokio", @@ -5292,9 +5481,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6b996befdb2bdbd816524fc7afe0e158fced33ff61c36ab29ae803c0462455d" +checksum = "7369915bd82d09dcb14a5ebba3431c4e54f19f2de0521ac56627d38016d45408" dependencies = [ "env_logger", "lazy_static", @@ -5303,9 +5492,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79d44cdbcf9e1489564cdae1cd92b8806b0ee89d05d36a58fef8c0d293ea7c2a" +checksum = "8b583a9a2a43e02231636662663d7804b70c1c0f3a42b1a641ab5d5964bc8ebf" dependencies = [ "log", "solana-sdk", @@ -5313,9 +5502,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68979964a3a004f1af4f1571814817e7e050ef4c1b2a1bdaa3ff35e980072d69" +checksum = "68602687aeb613bd73933f27ce11b1ad58896a6a2fa71497fecae2987e614f61" dependencies = [ "crossbeam-channel", "gethostname", @@ -5328,9 +5517,9 @@ dependencies = [ [[package]] name = "solana-net-utils" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44bb419eb9293a277982cf14a58772e9b9ab30ff6f9421bc4ac0826d40122760" +checksum = "44f41b767c25ec128957b73414efe0797877240e1a9f033e910ab3bcd54748be" dependencies = [ "bincode", "clap 3.2.25", @@ -5351,9 +5540,9 @@ dependencies = [ [[package]] name = "solana-perf" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00c4128122787a61d8f94fdaa04cb71b3dbb017d9939ac4d632264c55ec345de" +checksum = "e4e7809a71f40bc1d551c86203f11c463926dcfe5a892e74357de618aeffe13a" dependencies = [ "ahash 0.8.11", "bincode", @@ -5378,9 +5567,9 @@ dependencies = [ [[package]] name = "solana-program" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29249ce5b5c7bd018013adbb97439b0b1b986f16bb07c54db28f82e97baaa2f1" +checksum = "2625a23c3813b620141ee447819b08d1b9a5f1c69a309754834e3f35798a21fb" dependencies = [ "ark-bn254", "ark-ec", @@ -5424,9 +5613,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "948bfeb10ba38b55a8b2db2de8ccfa8f57b44b6d73c98e8e0de8b10f10ce043b" +checksum = "9f6f48b286f452feb1f2151ffa4243a382affc126d90c60ad804d2759175b996" dependencies = [ "base64 0.22.1", "bincode", @@ -5453,9 +5642,9 @@ dependencies = [ [[package]] name = "solana-pubsub-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ce9fa94ef00f7dfec749fc6835a4c36e8cfa2166c4a80736af1b49ef5bcd8e" +checksum = "f03618c313746b6c69ff04c00d210f8c4549b8c9e172ac90eb1d003e810e5353" dependencies = [ "crossbeam-channel", "futures-util", @@ -5478,9 +5667,9 @@ dependencies = [ [[package]] name = "solana-quic-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00764a5e5e36a94515d05f771e869c920671f5753cfc71ebf366546c891450b4" +checksum = "6f54213cdc6f6a869f86603385563a78bc89012cfeb921e0a7ba8710d32f65fe" dependencies = [ "async-mutex", "async-trait", @@ -5504,9 +5693,9 @@ dependencies = [ [[package]] name = "solana-rayon-threadlimit" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33119350281687a17a8321f897dfd27009fc862711ee6555c26beb5b84d6c08c" +checksum = "369b4d9a8e15906219ee60c1ce6c336ad04fe1217c9c1d625e1960ba5b174d24" dependencies = [ "lazy_static", "num_cpus", @@ -5514,9 +5703,9 @@ dependencies = [ [[package]] name = "solana-remote-wallet" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aba8725448426110b9ac20d7256f43aad1ea46458fe35c63d174cf962af4a9d0" +checksum = "d9d30c4d54739ca1f56113699c086d2dd5e7423eb5f894d0312757cace61e193" dependencies = [ "console", "dialoguer", @@ -5533,9 +5722,9 @@ dependencies = [ [[package]] name = "solana-rpc-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd96f6a505a492544ee2459b608af3fe07da6c8ffc0bd842489e836ac2c3fce6" +checksum = "a12219fb033d7de4e0fe6c613d7ebea5e457d2ca71890ead6c2a3cb5e4534275" dependencies = [ "async-trait", "base64 0.22.1", @@ -5560,9 +5749,9 @@ dependencies = [ [[package]] name = "solana-rpc-client-api" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d04f79b88c53b675d5d885d498e7a7e6a4fdd60ffe56e543faddb5d94c6094ba" +checksum = "6dc975656b2bd12d9a0d3b37d7188be83d5d40928d48c50450f4e1adab0eb795" dependencies = [ "anyhow", "base64 0.22.1", @@ -5584,9 +5773,9 @@ dependencies = [ [[package]] name = "solana-rpc-client-nonce-utils" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d46d162566cbf7d6eb2ae369fbb8a934bc846906cbe959aed9123c1ac92b85" +checksum = "d251b055f02d2fbbf30da82f10229e2bfe2c4ced7724b8d1311b6184a83589bb" dependencies = [ "clap 2.34.0", "solana-clap-utils", @@ -5597,9 +5786,9 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24dae5bda29858add4df3a6c5eaf71c0d2042ca3317a9fd81d7e9f436278a1fe" +checksum = "6bec7d84513d65700740755c512a0d58b9f60dbbce683379c399d2c357b3ceb0" dependencies = [ "bincode", "bitflags 2.6.0", @@ -5646,15 +5835,15 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704c9cacc61a5b9b6f717773cf4b3b45a4239dc7fa8c585258fceaf9b8e1cb94" +checksum = "93a5a1eabc890415d326707afe62cd7a2009236e8d899c1519566fc8f7e3977b" dependencies = [ "bs58", "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -5665,9 +5854,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" [[package]] name = "solana-streamer" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cf77ab19483dce4b4307c9e6f195a8c52f0c219026b78af3a9fae1e63ba9222" +checksum = "9a66633bc2a7bf49f9d94ec7728bc92ca7e1a563ec2d1752b1501170f48392b1" dependencies = [ "async-channel", "bytes", @@ -5699,9 +5888,9 @@ dependencies = [ [[package]] name = "solana-thin-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8c880be4e50ff473b3e82b600162244b6eb28cb5a616dc90ee9232d34998680" +checksum = "dc6717f63e619d8062f9ba9f874dec1fa21e5dbaf85f9a6bc8adba1b97c4df46" dependencies = [ "bincode", "log", @@ -5714,9 +5903,9 @@ dependencies = [ [[package]] name = "solana-tpu-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e65c01edbca303273e735ae383dde54bd5c5b8a051c51162c0ff886b0939ec6" +checksum = "c524c75954fd3ca8f7cdcb386ab70b8861c671a82ed7310e8ed50aa4318b093c" dependencies = [ "async-trait", "bincode", @@ -5738,9 +5927,9 @@ dependencies = [ [[package]] name = "solana-transaction-metrics-tracker" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44727bef1f8c57a6ed9a74761d8b7ddfcf4b4e2237cbcc5dc7f8f59985e07755" +checksum = "439f96c03d9c2f133b51fa82f227a09cf1f8d5fc63b70d7754d75c02bb7f9e5e" dependencies = [ "Inflector", "base64 0.22.1", @@ -5754,9 +5943,9 @@ dependencies = [ [[package]] name = "solana-transaction-status" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51d9d4a6004708f9563a29aa87fdf9960c1e7420b69dd82e8b817cf8f02430b" +checksum = "02b6361f2bb0020a269108e8630c174ad97a72e8ba1fe52a7ccaae27fc1219c7" dependencies = [ "Inflector", "base64 0.22.1", @@ -5781,9 +5970,9 @@ dependencies = [ [[package]] name = "solana-type-overrides" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab21276d6296965dc7181d785075b20e97b6789c76e8376cf363b3e2f7439b6" +checksum = "18ee8181704d686981cf4c1365a15f16e1b680542c160ddd4d07ec33e0be747a" dependencies = [ "lazy_static", "rand 0.8.5", @@ -5791,9 +5980,9 @@ dependencies = [ [[package]] name = "solana-udp-client" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10e902d4dc29cafc0794073805a2db1b48b818251480a9fbaec3959df72aec2f" +checksum = "b50a813338c28da988ec0e0fd8c53f756f93c63f403a3ac46ba2c15138dd60a4" dependencies = [ "async-trait", "solana-connection-cache", @@ -5806,9 +5995,9 @@ dependencies = [ [[package]] name = "solana-version" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bcbc570264e5a61a8f84439dfc254931460769fedfb91ff16253acfc3644c9d" +checksum = "fe60c22d8bd1325ab3379950dcc14027fa40e3de9a39a0b22645f81803d6cfaf" dependencies = [ "log", "rustc_version 0.4.1", @@ -5820,9 +6009,9 @@ dependencies = [ [[package]] name = "solana-vote" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fa1401a42023379f14af9165954f44ad02888a327dfd2a4abce0f18fa7cfab9" +checksum = "62a9aa08d6d925b438d569d5e56bad9eb98fd2acd91bf76274ed59045dc77f9f" dependencies = [ "itertools 0.12.1", "log", @@ -5835,9 +6024,9 @@ dependencies = [ [[package]] name = "solana-vote-program" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfd8e539a9963c2914ff8426dfe92351a902892aea465cd507e36d638ca0b7d6" +checksum = "4e7a9b9023943c6ba747d6e1c5bf16343e510060cbef3f576d8527b33938c48a" dependencies = [ "bincode", "log", @@ -5855,9 +6044,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dd7a8d6843cb3de4c13c2cfec1994519735ea4110b7f36b80b41d57bea1c07" +checksum = "8c9c1b81daa97afa8690da1a72a453f4c7faf4dc05c4205074b2cbd8f4e5490c" dependencies = [ "aes-gcm-siv", "base64 0.22.1", @@ -5960,7 +6149,7 @@ checksum = "d9e8418ea6269dcfb01c712f0444d2c75542c04448b480e87de59d2865edc750" dependencies = [ "quote", "spl-discriminator-syn", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -5972,7 +6161,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.79", + "syn 2.0.85", "thiserror", ] @@ -6021,7 +6210,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6176,7 +6365,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6198,9 +6387,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" dependencies = [ "proc-macro2", "quote", @@ -6216,7 +6405,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6228,7 +6417,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6325,6 +6514,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "test-log" version = "0.2.16" @@ -6343,7 +6538,7 @@ checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6363,22 +6558,22 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" [[package]] name = "thiserror" -version = "1.0.64" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" +checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.64" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" +checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6492,7 +6687,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6511,7 +6706,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pki-types", "tokio", ] @@ -6613,7 +6808,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -6694,7 +6889,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6807,7 +7002,7 @@ checksum = "1f718dfaf347dcb5b983bfc87608144b0bad87970aebcbea5ce44d2a30c08e63" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -6842,12 +7037,9 @@ checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" [[package]] name = "unicase" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] +checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" [[package]] name = "unicode-bidi" @@ -6949,9 +7141,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom 0.2.15", "serde", @@ -7061,7 +7253,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", "wasm-bindgen-shared", ] @@ -7095,7 +7287,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7184,6 +7376,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -7466,7 +7667,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -7486,7 +7687,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c46f97a..d6259ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ pub_with_shorthand = "allow" redundant_pub_crate = "allow" option_if_let_else = "allow" self_named_module_files = "allow" +shadow_unrelated = "allow" [workspace.lints.rust] missing_docs = { level = "warn", priority = -1 } @@ -67,28 +68,38 @@ amplifier-api = { path = "crates/amplifier-api" } solana-listener = { path = "crates/solana-listener" } common-serde-utils = { path = "crates/common-serde-utils" } solana-event-forwarder = { path = "crates/solana-event-forwarder" } +solana-tx-pusher = { path = "crates/solana-tx-pusher" } +retrying-solana-http-sender = { path = "crates/retrying-solana-http-sender" } +solana-gateway-task-processor = { path = "crates/solana-gateway-task-processor" } +effective-tx-sender = { path = "crates/effective-tx-sender" } # Solana Gateway gmp-gateway = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main", features = ["no-entrypoint"] } +axelar-rkyv-encoding = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main" } +axelar-executable = { git = "https://github.com/eigerco/solana-axelar.git", branch = "main" } # CLI clap = { version = "4", features = ["derive"] } xshell = "0.2" # Utils +arrayvec = "0.7" url = { version = "2.5", features = ["serde"] } temp-env = "0.3" chrono = { version = "0.4", default-features = false, features = ["serde", "clock", "std"] } base64 = "0.22" +bs58 = "0.5" redact = { version = "0.1", features = ["serde"] } thiserror = "1" uuid = { version = "1.2", features = ["v4", "serde"] } typed-builder = "0.18" +derive_builder = "0.20" bnum = "0.12" hex = "0.4" quanta = "0.12" backoff = { version = "0.4", features = ["tokio"] } indoc = "2" +itertools = "0.12" # Serde serde = { version = "1", features = ["derive"] } @@ -100,6 +111,7 @@ simd-json = "0.13" rstest = { version = "0.21" } test-log = { version = "0.2", features = ["trace"], default-features = false } pretty_assertions = "1" +mockall = "0.13" # Errors eyre = "0.6" diff --git a/crates/amplifier-api/src/client.rs b/crates/amplifier-api/src/client.rs index 177da24..6dfeff7 100644 --- a/crates/amplifier-api/src/client.rs +++ b/crates/amplifier-api/src/client.rs @@ -8,10 +8,6 @@ use tracing::instrument; use crate::error::AmplifierApiError; /// Client for the Amplifier API -#[expect( - clippy::module_name_repetitions, - reason = "makes the type easir to understand to the user of this crate" -)] #[derive(Clone, Debug)] pub struct AmplifierApiClient { inner: reqwest::Client, @@ -48,14 +44,10 @@ impl AmplifierApiClient { let method = T::METHOD; let client = self.inner.clone(); let payload = simd_json::to_vec(&request.payload())?; - let reqwest_req = client - .request(method, endpoint.as_str()) - .body(payload) - .build()?; + let reqwest_req = client.request(method, endpoint.as_str()).body(payload); Ok(AmplifierRequest { request: reqwest_req, - client, result: PhantomData, err: PhantomData, }) @@ -64,20 +56,25 @@ impl AmplifierApiClient { /// Encalpsulated HTTP request for the Amplifier API pub struct AmplifierRequest { - request: reqwest::Request, - client: reqwest::Client, + request: reqwest::RequestBuilder, result: PhantomData, err: PhantomData, } impl AmplifierRequest { /// execute an Amplifier API request - #[instrument(name = "execute_request", skip(self), fields(method = %self.request.method(), url = %self.request.url()))] + #[instrument(name = "execute_request", skip(self))] pub async fn execute(self) -> Result, AmplifierApiError> { - let response = self.client.execute(self.request).await?; + let (client, request) = self.request.build_split(); + let request = request?; // Capture the current span let span = tracing::Span::current(); + span.record("method", request.method().as_str()); + span.record("url", request.url().as_str()); + + // execute the request + let response = client.execute(request).await?; Ok(AmplifierResponse { response, @@ -230,7 +227,7 @@ pub mod identity { } mod serde_utils { - use serde::{Deserialize, Deserializer}; + use serde::{Deserialize as _, Deserializer}; pub(crate) fn deserialize_identity<'de, D>( deserializer: D, diff --git a/crates/amplifier-api/src/client/requests.rs b/crates/amplifier-api/src/client/requests.rs index 102fdcd..114ac70 100644 --- a/crates/amplifier-api/src/client/requests.rs +++ b/crates/amplifier-api/src/client/requests.rs @@ -1,6 +1,6 @@ //! Bindings for the Amplifier API REST [paths](https://github.com/axelarnetwork/axelar-eds-mirror/blob/3dcef3bc08ecb51af79c6223605d4fbc01660847/oapi/gmp/schema.yaml#L6-L77) -use core::ops::Add; +use core::ops::Add as _; use crate::error::AmplifierApiError; use crate::types::{ @@ -80,7 +80,7 @@ pub struct GetChains<'a> { pub limit: Option, } -impl<'a> AmplifierApiRequest for GetChains<'a> { +impl AmplifierApiRequest for GetChains<'_> { type Res = GetTasksResult; type Error = ErrorResponse; type Payload = (); @@ -123,7 +123,7 @@ pub struct PostEvents<'a, 'b> { pub payload: &'b PublishEventsRequest, } -impl<'a, 'b> AmplifierApiRequest for PostEvents<'a, 'b> { +impl AmplifierApiRequest for PostEvents<'_, '_> { type Res = PublishEventsResult; type Payload = PublishEventsRequest; type Error = ErrorResponse; diff --git a/crates/amplifier-api/src/error.rs b/crates/amplifier-api/src/error.rs index 97f3dff..5276275 100644 --- a/crates/amplifier-api/src/error.rs +++ b/crates/amplifier-api/src/error.rs @@ -1,8 +1,4 @@ #![expect(missing_docs, reason = "the error macro already is descriptive enough")] -#![expect( - clippy::module_name_repetitions, - reason = "This is fine for the error module" -)] /// Error variants for the Amplifier API #[derive(thiserror::Error, Debug)] diff --git a/crates/amplifier-api/src/types.rs b/crates/amplifier-api/src/types.rs index 0b18647..b848ae2 100644 --- a/crates/amplifier-api/src/types.rs +++ b/crates/amplifier-api/src/types.rs @@ -11,10 +11,6 @@ use serde::{Deserialize, Deserializer, Serialize}; pub type Address = String; /// Newtypes for different types of IDs so we don't mix them up in the future -#[expect( - clippy::module_name_repetitions, - reason = "the id suffix makes it easier to distinguish the types" -)] mod id { use super::*; @@ -80,7 +76,7 @@ mod id { mod serde_utils { use base64::prelude::*; - use serde::{Deserialize, Deserializer, Serializer}; + use serde::{Deserialize as _, Deserializer, Serializer}; pub(crate) fn base64_decode<'de, D>(deserializer: D) -> Result, D::Error> where diff --git a/crates/common-serde-utils/Cargo.toml b/crates/common-serde-utils/Cargo.toml index 688efb8..045e245 100644 --- a/crates/common-serde-utils/Cargo.toml +++ b/crates/common-serde-utils/Cargo.toml @@ -9,6 +9,11 @@ edition.workspace = true [dependencies] serde.workspace = true +solana-sdk = { workspace = true, optional = true } +tracing.workspace = true + +[features] +solana-sdk = ["dep:solana-sdk"] [lints] workspace = true diff --git a/crates/common-serde-utils/src/lib.rs b/crates/common-serde-utils/src/lib.rs index 739960a..827d5d9 100644 --- a/crates/common-serde-utils/src/lib.rs +++ b/crates/common-serde-utils/src/lib.rs @@ -2,7 +2,7 @@ use core::time::Duration; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize as _, Deserializer}; /// Decode [`Duratoin`] assuming that the underlying number is representation of duration in /// milliseconds @@ -19,3 +19,28 @@ where let duration = Duration::from_millis(raw_number); Ok(duration) } + +/// Decode [`solana_sdk::pubkey::Pubkey`] from a string in base58 format. +/// +/// # Errors +/// Returns an error if the provided string is not a valid base58-encoded public key. +/// +/// # Errors +/// This function will return an error if: +/// - The deserialized string is not valid base58 data. +/// - The deserialized string cannot be parsed into a [`solana_sdk::pubkey::Pubkey`]. +#[cfg(feature = "solana-sdk")] +pub fn pubkey_decode<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + use core::str::FromStr as _; + + let raw_string = String::deserialize(deserializer)?; + let pubkey = solana_sdk::pubkey::Pubkey::from_str(raw_string.as_str()) + .inspect_err(|err| { + tracing::error!(?err, "cannot parse base58 data"); + }) + .map_err(serde::de::Error::custom)?; + Ok(pubkey) +} diff --git a/crates/effective-tx-sender/Cargo.toml b/crates/effective-tx-sender/Cargo.toml new file mode 100644 index 0000000..052fdfe --- /dev/null +++ b/crates/effective-tx-sender/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "effective-tx-sender" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +eyre.workspace = true +thiserror.workspace = true +solana-sdk.workspace = true +solana-client.workspace = true +itertools.workspace = true +futures.workspace = true +tracing.workspace = true + +[lints] +workspace = true diff --git a/crates/effective-tx-sender/src/lib.rs b/crates/effective-tx-sender/src/lib.rs new file mode 100644 index 0000000..a43922f --- /dev/null +++ b/crates/effective-tx-sender/src/lib.rs @@ -0,0 +1,219 @@ +//! Utility that estimates the optimal compute budget and compute unit price to maximize the +//! chances of successfully submitting a transaction on the Solana network. +//! +//! This module provides the `EffectiveTxSender` struct, which helps simulate transactions +//! to determine the required compute units and appropriate compute unit price based on recent +//! network conditions. + +use core::marker::PhantomData; +use std::collections::VecDeque; + +use futures::TryFutureExt as _; +use itertools::Itertools as _; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_response::RpcSimulateTransactionResult; +use solana_sdk::compute_budget::ComputeBudgetInstruction; +use solana_sdk::instruction::Instruction; +use solana_sdk::signature::{Keypair, Signature}; +use solana_sdk::signer::Signer as _; + +/// Typestate representing the transaction sender before simulation to evaluate compute limits. +pub struct Unevaluated; + +/// Typestate representing the transaction sender after evaluating the best compute limits. +pub struct Evaluated; + +/// A transaction sender that pre-calculates the optimal compute budget and compute unit price +/// to maximize the chances of transaction inclusion. +/// +/// Internally, it maintains a deque of instructions, prepending compute budget and unit price +/// instructions after evaluation. +pub struct EffectiveTxSender<'a, T> { + solana_rpc_client: &'a RpcClient, + ixs: VecDeque, + solana_keypair: &'a Keypair, + hash: solana_sdk::hash::Hash, + _type: PhantomData, +} + +impl<'a> EffectiveTxSender<'a, Unevaluated> { + /// Creates a new `EffectiveTxSender` instance with the provided instructions. + /// + /// After initialization, the internal instruction deque contains the provided instructions. + #[tracing::instrument(skip_all)] + pub fn new( + solana_rpc_client: &'a RpcClient, + solana_keypair: &'a Keypair, + ixs: VecDeque, + ) -> Self { + Self { + solana_rpc_client, + hash: solana_sdk::hash::Hash::new_from_array([0; 32]), + ixs, + solana_keypair, + _type: PhantomData, + } + } + + /// Evaluates the compute budget and compute unit price instructions based on simulation. + /// + /// This method simulates the transaction to determine the optimal compute budget and compute + /// unit price. It updates the instruction deque by prepending the computed instructions. + /// + /// Returns an error if the simulation fails. + #[tracing::instrument(skip_all)] + pub async fn evaluate_compute_ixs( + mut self, + ) -> Result, ComputeBudgetError> { + const MAX_COMPUTE_BUDGET: u32 = 1_399_850; + // Add the max possible compute budget instruction for simulation + let cu_budget_for_simulation = + ComputeBudgetInstruction::set_compute_unit_limit(MAX_COMPUTE_BUDGET); + + self.ixs.push_front(cu_budget_for_simulation); + let valid_slice = self.ixs.make_contiguous(); + + let compute_budget = + compute_budget(valid_slice, self.solana_keypair, self.solana_rpc_client); + let compute_unit_price = compute_unit_price(valid_slice, self.solana_rpc_client) + .map_err(eyre::Error::from) + .map_err(ComputeBudgetError::Generic); + + let (compute_unit_price, (compute_budget, hash)) = + futures::try_join!(compute_unit_price, compute_budget)?; + + self.ixs.push_front(compute_unit_price); + let valid_slice = self.ixs.make_contiguous(); + #[expect( + clippy::indexing_slicing, + reason = "we're guaranteed to have 2 elements at this point" + )] + { + valid_slice[1] = compute_budget; + }; + + Ok(EffectiveTxSender { + solana_rpc_client: self.solana_rpc_client, + ixs: self.ixs, + hash, + solana_keypair: self.solana_keypair, + _type: PhantomData, + }) + } +} + +impl<'a> EffectiveTxSender<'a, Evaluated> { + /// Signs and sends the transaction. + #[tracing::instrument(skip_all, err)] + pub async fn send_tx(self) -> eyre::Result { + let valid_slice = self.ixs.as_slices().0; + let tx = solana_sdk::transaction::Transaction::new_signed_with_payer( + valid_slice, + Some(&self.solana_keypair.pubkey()), + &[self.solana_keypair], + self.hash, + ); + + let signature = self + .solana_rpc_client + .send_and_confirm_transaction(&tx) + .await?; + Ok(signature) + } +} + +/// Error type representing possible failures during compute budget evaluation. +#[derive(thiserror::Error, Debug)] +pub enum ComputeBudgetError { + /// Error occurred during transaction simulation. + #[error("Simulation error: {0:?}")] + SimulationError(RpcSimulateTransactionResult), + /// Generic, non-recoverable error. + #[error("Generic error: {0}")] + Generic(eyre::Error), +} + +/// Computes the optimal compute budget instruction based on transaction simulation. +/// +/// Simulates the transaction to estimate the compute units consumed, then adds a top-up percentage +/// to ensure sufficient compute units during execution. +/// +/// Returns the compute budget instruction and the latest blockhash. +pub(crate) async fn compute_budget( + ixs: &[Instruction], + solana_keypair: &Keypair, + solana_rpc_client: &RpcClient, +) -> Result<(Instruction, solana_sdk::hash::Hash), ComputeBudgetError> { + const PERCENT_POINTS_TO_TOP_UP: u64 = 10; + + let hash = solana_rpc_client + .get_latest_blockhash() + .await + .map_err(eyre::Error::from) + .map_err(ComputeBudgetError::Generic)?; + let tx_to_simulate = solana_sdk::transaction::Transaction::new_signed_with_payer( + ixs, + Some(&solana_keypair.pubkey()), + &[solana_keypair], + hash, + ); + let simulation_result = solana_rpc_client + .simulate_transaction(&tx_to_simulate) + .await + .map_err(eyre::Error::from) + .map_err(ComputeBudgetError::Generic)?; + if simulation_result.value.err.is_some() { + return Err(ComputeBudgetError::SimulationError(simulation_result.value)); + } + let computed_units = simulation_result.value.units_consumed.unwrap_or(0); + let top_up = computed_units + .checked_div(PERCENT_POINTS_TO_TOP_UP) + .unwrap_or(0); + let compute_budget = computed_units.saturating_add(top_up); + let ix = ComputeBudgetInstruction::set_compute_unit_limit( + compute_budget + .try_into() + .map_err(eyre::Error::from) + .map_err(ComputeBudgetError::Generic)?, + ); + Ok((ix, hash)) +} + +/// Computes the optimal compute unit price instruction based on recent prioritization fees. +/// +/// Analyzes recent prioritization fees for accounts involved in the transaction to calculate an +/// average fee, which is then used to set the compute unit price. +/// +/// Returns the compute unit price instruction. +pub(crate) async fn compute_unit_price( + ixs: &[Instruction], + solana_rpc_client: &RpcClient, +) -> Result { + const MAX_ACCOUNTS: usize = 128; + const N_SLOTS_TO_CHECK: usize = 10; + + let all_touched_accounts = ixs + .iter() + .flat_map(|x| x.accounts.as_slice()) + .take(MAX_ACCOUNTS) + .map(|x| x.pubkey) + .collect_vec(); + let fees = solana_rpc_client + .get_recent_prioritization_fees(&all_touched_accounts) + .await?; + let (sum, count) = fees + .into_iter() + .rev() + .take(N_SLOTS_TO_CHECK) + .map(|x| x.prioritization_fee) + // Simple rolling average of the last `N_SLOTS_TO_CHECK` items. + .fold((0_u64, 0_u64), |(sum, count), fee| { + (sum.saturating_add(fee), count.saturating_add(1)) + }); + let average = if count > 0 { + sum.checked_div(count).unwrap_or(0) + } else { + 0 + }; + Ok(ComputeBudgetInstruction::set_compute_unit_price(average)) +} diff --git a/crates/relayer-amplifier-api-integration/src/component.rs b/crates/relayer-amplifier-api-integration/src/component.rs index e98b590..d4efc51 100644 --- a/crates/relayer-amplifier-api-integration/src/component.rs +++ b/crates/relayer-amplifier-api-integration/src/component.rs @@ -1,10 +1,10 @@ use core::future::Future; use core::pin::Pin; -use amplifier_api::types::PublishEventsRequest; -use futures_concurrency::future::FutureExt; +use amplifier_api::types::{PublishEventsRequest, TaskItem}; +use futures_concurrency::future::FutureExt as _; use quanta::Upkeep; -use tracing::{info_span, Instrument}; +use tracing::{info_span, Instrument as _}; use crate::{config, healthcheck, listener, subscriber}; @@ -16,6 +16,7 @@ pub enum AmplifierCommand { } pub(crate) type CommandReceiver = futures::channel::mpsc::UnboundedReceiver; +pub(crate) type AmplifierTaskSender = futures::channel::mpsc::UnboundedSender; /// The core Amplifier API abstraction. /// @@ -27,18 +28,26 @@ pub(crate) type CommandReceiver = futures::channel::mpsc::UnboundedReceiver, } +/// Utility client used for getting data from the `Amplifier` instance +#[derive(Debug)] +pub struct AmplifierTaskReceiver { + /// send commands to the `Amplifier` instance + pub receiver: futures::channel::mpsc::UnboundedReceiver, +} + impl relayer_engine::RelayerComponent for Amplifier { fn process(self: Box) -> Pin> + Send>> { - use futures::FutureExt; + use futures::FutureExt as _; self.process_internal().boxed() } @@ -50,14 +59,17 @@ impl Amplifier { /// The returned variable also returns a helper client that encompasses ways to communicate with /// the underlying Amplifier instance. #[must_use] - pub fn new(config: config::Config) -> (Self, AmplifierClient) { - let (tx, rx) = futures::channel::mpsc::unbounded(); + pub fn new(config: config::Config) -> (Self, AmplifierCommandClient, AmplifierTaskReceiver) { + let (command_tx, command_rx) = futures::channel::mpsc::unbounded(); + let (task_tx, task_rx) = futures::channel::mpsc::unbounded(); let this = Self { config, - receiver: rx, + sender: task_tx, + receiver: command_rx, }; - let client = AmplifierClient { sender: tx }; - (this, client) + let client = AmplifierCommandClient { sender: command_tx }; + let task_client = AmplifierTaskReceiver { receiver: task_rx }; + (this, client, task_client) } #[tracing::instrument(skip_all, name = "Amplifier")] @@ -78,10 +90,13 @@ impl Amplifier { ) .instrument(info_span!("subscriber")) .in_current_span(); - let from_amplifier_msgs = - listener::process_msgs_from_amplifier(self.config.clone(), client.clone()) - .instrument(info_span!("listener")) - .in_current_span(); + let from_amplifier_msgs = listener::process_msgs_from_amplifier( + self.config.clone(), + client.clone(), + self.sender.clone(), + ) + .instrument(info_span!("listener")) + .in_current_span(); // await tasks until one of them exits (fatal) healthcheck diff --git a/crates/relayer-amplifier-api-integration/src/config.rs b/crates/relayer-amplifier-api-integration/src/config.rs index 1088b10..521f9e3 100644 --- a/crates/relayer-amplifier-api-integration/src/config.rs +++ b/crates/relayer-amplifier-api-integration/src/config.rs @@ -51,7 +51,7 @@ pub(crate) mod config_defaults { } pub(crate) const fn get_chains_poll_interval() -> Duration { - Duration::from_secs(3) + Duration::from_secs(60) } #[expect(clippy::unnecessary_wraps, reason = "fine for config defaults")] diff --git a/crates/relayer-amplifier-api-integration/src/lib.rs b/crates/relayer-amplifier-api-integration/src/lib.rs index a6551c3..6525f3d 100644 --- a/crates/relayer-amplifier-api-integration/src/lib.rs +++ b/crates/relayer-amplifier-api-integration/src/lib.rs @@ -7,5 +7,5 @@ mod listener; mod subscriber; pub use amplifier_api; -pub use component::{Amplifier, AmplifierClient, AmplifierCommand}; +pub use component::{Amplifier, AmplifierCommand, AmplifierCommandClient, AmplifierTaskReceiver}; pub use config::Config; diff --git a/crates/relayer-amplifier-api-integration/src/listener.rs b/crates/relayer-amplifier-api-integration/src/listener.rs index e5246d2..3a43377 100644 --- a/crates/relayer-amplifier-api-integration/src/listener.rs +++ b/crates/relayer-amplifier-api-integration/src/listener.rs @@ -3,10 +3,12 @@ use core::task::Poll; use amplifier_api::requests::{self, WithTrailingSlash}; use amplifier_api::types::{ErrorResponse, GetTasksResult}; use amplifier_api::AmplifierRequest; -use futures::stream::StreamExt; +use futures::stream::StreamExt as _; +use futures::SinkExt as _; use tokio::task::JoinSet; use tokio_stream::wrappers::IntervalStream; +use crate::component::AmplifierTaskSender; use crate::config::Config; // process incoming messages (aka `tasks`) coming in form Amplifier API @@ -15,6 +17,7 @@ use crate::config::Config; pub(crate) async fn process_msgs_from_amplifier( config: Config, client: amplifier_api::AmplifierApiClient, + fan_out_sender: AmplifierTaskSender, ) -> eyre::Result<()> { tracing::info!(poll_interval =? config.get_chains_poll_interval, "spawned"); @@ -29,7 +32,13 @@ pub(crate) async fn process_msgs_from_amplifier( // periodically query new tasks match interval_stream.poll_next_unpin(cx) { Poll::Ready(Some(_res)) => { - let res = internal(&config, &chain_with_trailing_slash, &client, &mut join_set); + let res = internal( + &config, + &chain_with_trailing_slash, + &client, + fan_out_sender.clone(), + &mut join_set, + ); // in case we were awoken by join_set being ready, let's re-run this function, while // returning the result of `internal`. cx.waker().wake_by_ref(); @@ -69,6 +78,7 @@ pub(crate) fn internal( config: &Config, chain_with_trailing_slash: &WithTrailingSlash, client: &lifier_api::AmplifierApiClient, + fan_out_sender: AmplifierTaskSender, to_join_set: &mut JoinSet>, ) -> eyre::Result<()> { let request = requests::GetChains::builder() @@ -76,29 +86,19 @@ pub(crate) fn internal( .limit(config.get_chains_limit) .build(); let request = client.build_request(&request)?; - to_join_set.spawn(process_task_request(request)); + to_join_set.spawn(process_task_request(request, fan_out_sender)); Ok(()) } -#[expect(clippy::unimplemented, reason = "will be added in the future")] async fn process_task_request( request: AmplifierRequest, + mut fan_out_sender: AmplifierTaskSender, ) -> eyre::Result<()> { let res = request.execute().await?; let res = res.json().await??; tracing::info!(task_count = ?res.tasks.len(), "received new tasks"); - for task_item in res.tasks { - use amplifier_api::types::Task::{Execute, GatewayTx, Refund, Verify}; - match task_item.task { - Verify(_) => unimplemented!("this will be supported in the future"), - GatewayTx(gateway_tx_task) => { - tracing::info!(incoming_gateway_tx_len = ?gateway_tx_task.execute_data.len(), "handle gateway task"); - // TODO: Send this to a pre-registered handler - } - Execute(_) => unimplemented!("this will be supported in the future"), - Refund(_) => unimplemented!("this will be supported in the future"), - } - } + let mut iter = futures::stream::iter(res.tasks.into_iter().map(Ok)); + fan_out_sender.send_all(&mut iter).await?; Ok(()) } diff --git a/crates/relayer-amplifier-api-integration/src/subscriber.rs b/crates/relayer-amplifier-api-integration/src/subscriber.rs index 4d4cb10..0c4c573 100644 --- a/crates/relayer-amplifier-api-integration/src/subscriber.rs +++ b/crates/relayer-amplifier-api-integration/src/subscriber.rs @@ -3,10 +3,10 @@ use core::task::Poll; use amplifier_api::requests::{self, WithTrailingSlash}; use amplifier_api::types::{ErrorResponse, PublishEventsResult}; use amplifier_api::AmplifierRequest; -use futures::stream::FusedStream; -use futures::StreamExt; +use futures::stream::FusedStream as _; +use futures::StreamExt as _; use tokio::task::JoinSet; -use tracing::{info_span, Instrument}; +use tracing::{info_span, Instrument as _}; use super::component::{AmplifierCommand, CommandReceiver}; use super::config::Config; diff --git a/crates/relayer-engine/src/config.rs b/crates/relayer-engine/src/config.rs index aa5fb62..10d25f2 100644 --- a/crates/relayer-engine/src/config.rs +++ b/crates/relayer-engine/src/config.rs @@ -30,10 +30,6 @@ pub struct Config { pub health_check: HealthCheckConfig, } -#[expect( - clippy::module_name_repetitions, - reason = "makes it clearer that this is a config-only struct" -)] /// Health check server configuration. #[derive(Debug, Deserialize, PartialEq, Eq)] pub struct HealthCheckConfig { diff --git a/crates/relayer-engine/src/lib.rs b/crates/relayer-engine/src/lib.rs index 34f4249..ec9288c 100644 --- a/crates/relayer-engine/src/lib.rs +++ b/crates/relayer-engine/src/lib.rs @@ -4,7 +4,7 @@ mod config; pub use config::{Config, HealthCheckConfig, RelayerComponent}; use tokio::task::JoinSet; -use tracing::{info_span, Instrument}; +use tracing::{info_span, Instrument as _}; pub use url; /// Generic array of components to be consumed by the engine diff --git a/crates/retrying-solana-http-sender/Cargo.toml b/crates/retrying-solana-http-sender/Cargo.toml new file mode 100644 index 0000000..993d5c1 --- /dev/null +++ b/crates/retrying-solana-http-sender/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "retrying-solana-http-sender" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +async-trait.workspace = true +backoff.workspace = true +serde_json.workspace = true +solana-client.workspace = true +solana-sdk.workspace = true +solana-rpc-client.workspace = true +solana-rpc-client-api.workspace = true +tokio.workspace = true +tracing.workspace = true +serde.workspace = true +typed-builder.workspace = true +url.workspace = true + +[lints] +workspace = true diff --git a/crates/solana-listener/src/retrying_http_sender.rs b/crates/retrying-solana-http-sender/src/lib.rs similarity index 63% rename from crates/solana-listener/src/retrying_http_sender.rs rename to crates/retrying-solana-http-sender/src/lib.rs index 00d145e..b1df9db 100644 --- a/crates/solana-listener/src/retrying_http_sender.rs +++ b/crates/retrying-solana-http-sender/src/lib.rs @@ -1,31 +1,54 @@ +//! Simple HTTP client that will limit the amount of concurrent requests. +//! It will also retry the HTTP calls if they failed with a an exponential backoff strategy. +//! Intended to rate-limit Solana RPC calls. + use core::time::Duration; use std::sync::Arc; use async_trait::async_trait; use backoff::future::retry; use backoff::ExponentialBackoffBuilder; +use serde::Deserialize; use serde_json::Value; use solana_client::client_error::{ClientError, ClientErrorKind}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_client::RpcClientConfig; use solana_client::rpc_sender::RpcTransportStats; use solana_rpc_client::http_sender::HttpSender; use solana_rpc_client::rpc_sender::RpcSender; use solana_rpc_client_api::client_error::Result as ClientResult; use solana_rpc_client_api::request::RpcRequest; +use solana_sdk::commitment_config::CommitmentConfig; use tokio::sync::Semaphore; use tracing::error; +use typed_builder::TypedBuilder; /// The maximum elapsed time for retrying failed requests. const TWO_MINUTES: Duration = Duration::from_millis(2 * 60 * 1_000); +/// Create a new Solana RPC client based on the provided config +#[must_use] +pub fn new_client(config: &Config) -> Arc { + let sender = RetryingHttpSender::new( + config.solana_http_rpc.to_string(), + config.max_concurrent_rpc_requests, + ); + let config = RpcClientConfig::with_commitment(CommitmentConfig::confirmed()); + let client = RpcClient::new_sender(sender, config); + Arc::new(client) +} + /// A wrapper around `HttpSender` that adds retry logic for sending RPC /// requests. -pub(crate) struct RetryingHttpSender { +pub struct RetryingHttpSender { http_client: HttpSender, request_permit: Arc, } impl RetryingHttpSender { - pub(crate) fn new(url: String, max_concurrent_requests: usize) -> Self { + /// Initialize a new + #[must_use] + pub fn new(url: String, max_concurrent_requests: usize) -> Self { let http = HttpSender::new(url); let request_permit = Arc::new(Semaphore::new(max_concurrent_requests)); Self { @@ -84,3 +107,24 @@ impl RpcSender for RetryingHttpSender { self.http_client.url() } } + +/// Configuration for initialising the [`RetryingHttpSender`] +#[derive(Debug, Deserialize, Clone, PartialEq, Eq, TypedBuilder)] +pub struct Config { + /// How many rpc requests we process at the same time + #[builder(default = config_defaults::max_concurrent_rpc_requests())] + #[serde( + rename = "max_concurrent_rpc_requests", + default = "config_defaults::max_concurrent_rpc_requests" + )] + pub max_concurrent_rpc_requests: usize, + + /// The rpc of the solana node + pub solana_http_rpc: url::Url, +} + +mod config_defaults { + pub(crate) const fn max_concurrent_rpc_requests() -> usize { + 5 + } +} diff --git a/crates/solana-axelar-relayer/Cargo.toml b/crates/solana-axelar-relayer/Cargo.toml index 917a8c5..1441f61 100644 --- a/crates/solana-axelar-relayer/Cargo.toml +++ b/crates/solana-axelar-relayer/Cargo.toml @@ -17,6 +17,8 @@ serde.workspace = true relayer-amplifier-api-integration.workspace = true solana-listener.workspace = true solana-event-forwarder.workspace = true +solana-gateway-task-processor.workspace = true +retrying-solana-http-sender.workspace = true tracing-subscriber.workspace = true tracing-error.workspace = true opentelemetry.workspace = true diff --git a/crates/solana-axelar-relayer/src/main.rs b/crates/solana-axelar-relayer/src/main.rs index c025a06..5b28fc4 100644 --- a/crates/solana-axelar-relayer/src/main.rs +++ b/crates/solana-axelar-relayer/src/main.rs @@ -1,6 +1,7 @@ //! Transaction relayer for Solana-Axelar integration use std::path::PathBuf; +use std::sync::Arc; use relayer_amplifier_api_integration::Amplifier; use relayer_engine::{RelayerComponent, RelayerEngine}; @@ -24,13 +25,22 @@ async fn main() { let config_file = std::fs::read_to_string(config_file).expect("cannot read config file"); let config = toml::from_str::(&config_file).expect("invalid config file"); + let rpc_client = retrying_solana_http_sender::new_client(&config.solana_rpc); let event_forwarder_config = solana_event_forwarder::Config::new( &config.solana_listener_component, &config.amplifier_component, ); - let (amplifier_component, amplifier_client) = Amplifier::new(config.amplifier_component); - let (solana_listener_component, solana_listener_client) = - solana_listener::SolanaListener::new(config.solana_listener_component); + let (amplifier_component, amplifier_client, amplifier_task_receiver) = + Amplifier::new(config.amplifier_component); + let gateway_task_processor = solana_gateway_task_processor::SolanaTxPusher::new( + config.solana_gateway_task_processor, + Arc::clone(&rpc_client), + amplifier_task_receiver, + ); + let (solana_listener_component, solana_listener_client) = solana_listener::SolanaListener::new( + config.solana_listener_component, + Arc::clone(&rpc_client), + ); let solana_event_forwarder_component = solana_event_forwarder::SolanaEventForwarder::new( event_forwarder_config, solana_listener_client, @@ -40,6 +50,7 @@ async fn main() { Box::new(amplifier_component), Box::new(solana_listener_component), Box::new(solana_event_forwarder_component), + Box::new(gateway_task_processor), ]; RelayerEngine::new(config.relayer_engine, components) .start_and_wait_for_shutdown() @@ -61,8 +72,12 @@ pub struct Config { pub amplifier_component: relayer_amplifier_api_integration::Config, /// Configuration for the Solana transaction listener processor pub solana_listener_component: solana_listener::Config, + /// Configuration for the Solana transaction listener processor + pub solana_gateway_task_processor: solana_gateway_task_processor::Config, /// Meta-configuration on the engine pub relayer_engine: relayer_engine::Config, + /// Shared configuration for the Solana RPC client + pub solana_rpc: retrying_solana_http_sender::Config, } #[expect( @@ -72,13 +87,13 @@ pub struct Config { #[cfg(test)] mod tests { use core::net::SocketAddr; - use core::str::FromStr; + use core::str::FromStr as _; use core::time::Duration; use amplifier_api::identity::Identity; use pretty_assertions::assert_eq; use solana_listener::solana_sdk::pubkey::Pubkey; - use solana_listener::solana_sdk::signature::Signature; + use solana_listener::solana_sdk::signature::{Keypair, Signature}; use solana_listener::MissedSignatureCatchupStrategy; use crate::Config; @@ -95,6 +110,8 @@ mod tests { let solana_tx_scan_poll_period = Duration::from_millis(42); let solana_tx_scan_poll_period_ms = solana_tx_scan_poll_period.as_millis(); let max_concurrent_rpc_requests = 100; + let signing_keypair = Keypair::new(); + let signing_keypair_as_str = signing_keypair.to_base58_string(); let latest_processed_signature = Signature::new_unique().to_string(); let identity = identity_fixture(); let missed_signature_catchup_strategy = "until_beginning"; @@ -112,12 +129,18 @@ mod tests { [solana_listener_component] gateway_program_address = "{gateway_program_address_as_str}" - solana_http_rpc = "{solana_rpc}" solana_ws = "{solana_ws}" tx_scan_poll_period_in_milliseconds = {solana_tx_scan_poll_period_ms} - max_concurrent_rpc_requests = {max_concurrent_rpc_requests} missed_signature_catchup_strategy = "{missed_signature_catchup_strategy}" latest_processed_signature = "{latest_processed_signature}" + + [solana_gateway_task_processor] + signing_keypair = "{signing_keypair_as_str}" + gateway_program_address = "{gateway_program_address_as_str}" + + [solana_rpc] + max_concurrent_rpc_requests = {max_concurrent_rpc_requests} + solana_http_rpc = "{solana_rpc}" "#}; let parsed: Config = toml::from_str(&input)?; @@ -134,13 +157,19 @@ mod tests { }, solana_listener_component: solana_listener::Config { gateway_program_address, - solana_http_rpc: solana_rpc, tx_scan_poll_period: solana_tx_scan_poll_period, - max_concurrent_rpc_requests, solana_ws, missed_signature_catchup_strategy: MissedSignatureCatchupStrategy::UntilBeginning, latest_processed_signature: Some(Signature::from_str(&latest_processed_signature)?), }, + solana_gateway_task_processor: solana_gateway_task_processor::Config { + gateway_program_address, + signing_keypair, + }, + solana_rpc: retrying_solana_http_sender::Config { + max_concurrent_rpc_requests, + solana_http_rpc: solana_rpc, + }, }; assert_eq!(parsed, expected); Ok(()) diff --git a/crates/solana-axelar-relayer/src/telemetry.rs b/crates/solana-axelar-relayer/src/telemetry.rs index e0c0889..6743371 100644 --- a/crates/solana-axelar-relayer/src/telemetry.rs +++ b/crates/solana-axelar-relayer/src/telemetry.rs @@ -3,7 +3,7 @@ use opentelemetry::metrics::MetricsError; use opentelemetry::trace::TraceError; use opentelemetry::{global, KeyValue}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; -use opentelemetry_otlp::{ExportConfig, WithExportConfig}; +use opentelemetry_otlp::{ExportConfig, WithExportConfig as _}; use opentelemetry_sdk::logs::{Logger, LoggerProvider}; use opentelemetry_sdk::trace::Config; use opentelemetry_sdk::{runtime, trace as sdktrace, Resource}; @@ -103,6 +103,8 @@ fn setup_subscriber( .add_directive("amplifier_api=info".parse()?) .add_directive("solana_listener=info".parse()?) .add_directive("solana_event_forwarder=info".parse()?) + .add_directive("solana_gateway_task_processor=info".parse()?) + .add_directive("effective_tx_sender=info".parse()?) .add_directive("hyper=error".parse()?) .add_directive("tonic=error".parse()?) .add_directive("reqwest=error".parse()?) diff --git a/crates/solana-event-forwarder/src/component.rs b/crates/solana-event-forwarder/src/component.rs index c3bad43..1a189fa 100644 --- a/crates/solana-event-forwarder/src/component.rs +++ b/crates/solana-event-forwarder/src/component.rs @@ -1,7 +1,7 @@ use core::future::Future; use core::pin::Pin; -use futures::{SinkExt, StreamExt}; +use futures::{SinkExt as _, StreamExt as _}; use gmp_gateway::events::{EventContainer, GatewayEvent}; use relayer_amplifier_api_integration::amplifier_api::types::{ CallEvent, Event, EventBase, EventId, EventMetadata, GatewayV2Message, MessageId, @@ -17,12 +17,12 @@ use solana_sdk::pubkey::Pubkey; pub struct SolanaEventForwarder { config: crate::Config, solana_listener_client: solana_listener::SolanaListenerClient, - amplifier_client: relayer_amplifier_api_integration::AmplifierClient, + amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient, } impl relayer_engine::RelayerComponent for SolanaEventForwarder { fn process(self: Box) -> Pin> + Send>> { - use futures::FutureExt; + use futures::FutureExt as _; self.process_internal().boxed() } @@ -34,7 +34,7 @@ impl SolanaEventForwarder { pub const fn new( config: crate::Config, solana_listener_client: solana_listener::SolanaListenerClient, - amplifier_client: relayer_amplifier_api_integration::AmplifierClient, + amplifier_client: relayer_amplifier_api_integration::AmplifierCommandClient, ) -> Self { Self { config, diff --git a/crates/solana-gateway-task-processor/Cargo.toml b/crates/solana-gateway-task-processor/Cargo.toml new file mode 100644 index 0000000..a3578a6 --- /dev/null +++ b/crates/solana-gateway-task-processor/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "solana-gateway-task-processor" +version.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +eyre.workspace = true +solana-sdk.workspace = true +typed-builder.workspace = true +solana-client.workspace = true +serde.workspace = true +gmp-gateway.workspace = true +axelar-rkyv-encoding.workspace = true +tracing.workspace = true +futures.workspace = true +common-serde-utils = { workspace = true, features = ["solana-sdk"] } +bs58.workspace = true +relayer-engine.workspace = true +tokio.workspace = true +amplifier-api.workspace = true +relayer-amplifier-api-integration.workspace = true +effective-tx-sender.workspace = true + +[dev-dependencies] +serde_json.workspace = true +mockall.workspace = true +async-trait.workspace = true +solana-transaction-status.workspace = true + +[lints] +workspace = true diff --git a/crates/solana-gateway-task-processor/src/component.rs b/crates/solana-gateway-task-processor/src/component.rs new file mode 100644 index 0000000..6b2d99a --- /dev/null +++ b/crates/solana-gateway-task-processor/src/component.rs @@ -0,0 +1,570 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::Poll; +use std::collections::VecDeque; +use std::sync::Arc; + +use amplifier_api::types::TaskItem; +use axelar_rkyv_encoding::types::{HasheableMessageVec, VerifierSet}; +use effective_tx_sender::ComputeBudgetError; +use futures::stream::{FusedStream as _, FuturesUnordered}; +use futures::StreamExt as _; +use gmp_gateway::commands::OwnedCommand; +use gmp_gateway::state::GatewayApprovedCommand; +use gmp_gateway::{hasher_impl, instructions}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_response::RpcSimulateTransactionResult; +use solana_sdk::instruction::{Instruction, InstructionError}; +use solana_sdk::program_pack::Pack as _; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, Signature}; +use solana_sdk::signer::Signer as _; +use solana_sdk::transaction::TransactionError; +use tokio::task::JoinSet; +use tracing::{instrument, Instrument as _}; + +use crate::config; + +/// A component that pushes transactions over to the Solana blockchain. +/// The transactions to push are dependant on the events that the Amplifier API will provide +pub struct SolanaTxPusher { + config: config::Config, + rpc_client: Arc, + task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver, +} + +impl relayer_engine::RelayerComponent for SolanaTxPusher { + fn process(self: Box) -> Pin> + Send>> { + use futures::FutureExt as _; + + self.process_internal().boxed() + } +} + +impl SolanaTxPusher { + /// Create a new [`SolanaTxPusher`] component + #[must_use] + pub const fn new( + config: config::Config, + rpc_client: Arc, + task_receiver: relayer_amplifier_api_integration::AmplifierTaskReceiver, + ) -> Self { + Self { + config, + rpc_client, + task_receiver, + } + } + + async fn process_internal(self) -> eyre::Result<()> { + let config_metadata = self.get_config_metadata().await.map(Arc::new)?; + + let keypair = Arc::new(self.config.signing_keypair.insecure_clone()); + let mut join_set = JoinSet::>::new(); + let mut rx = self.task_receiver.receiver.fuse(); + let mut task_stream = futures::stream::poll_fn(move |cx| { + // check if we have new requests to add to the join set + match rx.poll_next_unpin(cx) { + Poll::Ready(Some(command)) => { + // spawn the command on the joinset, returning the error + tracing::info!(?command, "received command from amplifier API"); + join_set.spawn({ + let solana_rpc_client = Arc::clone(&self.rpc_client); + let keypair = Arc::clone(&keypair); + let config_metadata = Arc::clone(&config_metadata); + async move { + process_task(&keypair, &solana_rpc_client, command, &config_metadata) + .await + } + }); + } + Poll::Pending => (), + Poll::Ready(None) => { + tracing::error!("receiver channel closed"); + join_set.abort_all(); + } + } + // check if any background tasks are done + match join_set.poll_join_next(cx) { + Poll::Ready(Some(res)) => Poll::Ready(Some(res)), + // join set returns `Poll::Ready(None)` when it's empty + Poll::Ready(None) => { + if rx.is_terminated() { + return Poll::Ready(None) + } + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + }); + + while let Some(task_result) = task_stream.next().await { + let Ok(res) = task_result else { + tracing::error!(?task_result, "background task panicked"); + continue; + }; + let Err(err) = res else { + continue; + }; + + tracing::error!(?err, "background task returned an error"); + } + + eyre::bail!("fatal error") + } + + async fn get_config_metadata(&self) -> Result { + let gateway_root_pda = gmp_gateway::get_gateway_root_config_pda().0; + let data = self.rpc_client.get_account_data(&gateway_root_pda).await?; + let root_config = gmp_gateway::state::GatewayConfig::unpack_from_slice(&data)?; + let config_metadata = ConfigMetadata { + gateway_root_pda, + domain_separator: root_config.domain_separator, + }; + Ok(config_metadata) + } +} + +struct ConfigMetadata { + gateway_root_pda: Pubkey, + domain_separator: [u8; 32], +} + +#[instrument(skip_all)] +async fn process_task( + keypair: &Keypair, + solana_rpc_client: &RpcClient, + task: TaskItem, + metadata: &ConfigMetadata, +) -> eyre::Result<()> { + use amplifier_api::types::Task::{Execute, GatewayTx, Refund, Verify}; + use axelar_rkyv_encoding::types::Payload::{Messages, VerifierSet}; + let signer = keypair.pubkey(); + let gateway_root_pda = gmp_gateway::get_gateway_root_config_pda().0; + + #[expect( + clippy::todo, + reason = "fine for the time being, will be refactored later" + )] + match task.task { + Verify(_verify_task) => todo!(), + GatewayTx(gateway_transaction_task) => { + let execute_data_bytes = gateway_transaction_task.execute_data.as_ref(); + + let decoded_execute_data = + axelar_rkyv_encoding::types::ExecuteData::from_bytes(execute_data_bytes) + .map_err(|_err| eyre::eyre!("cannot decode execute data"))?; + let signing_verifier_set = decoded_execute_data.proof.verifier_set(); + let (signing_verifier_set_pda, _) = gmp_gateway::get_verifier_set_tracker_pda( + &gmp_gateway::id(), + signing_verifier_set.hash(hasher_impl()), + ); + + match decoded_execute_data.payload { + Messages(messages) => { + ProcessMessages::builder() + .messages(messages) + .signer(signer) + .gateway_root_pda(gateway_root_pda) + .metadata(metadata) + .execute_data_bytes(execute_data_bytes) + .signing_verifier_set_pda(signing_verifier_set_pda) + .solana_rpc_client(solana_rpc_client) + .keypair(keypair) + .build() + .execute() + .await?; + } + VerifierSet(new_verifier_set) => { + ProcessVerifierSet::builder() + .new_verifier_set(new_verifier_set) + .signer(signer) + .gateway_root_pda(gateway_root_pda) + .metadata(metadata) + .execute_data_bytes(execute_data_bytes) + .signing_verifier_set_pda(signing_verifier_set_pda) + .solana_rpc_client(solana_rpc_client) + .keypair(keypair) + .build() + .execute() + .await?; + } + } + } + Execute(_execute_task) => todo!(), + Refund(_refund_task) => todo!(), + }; + + Ok(()) +} + +#[derive(typed_builder::TypedBuilder)] +struct ProcessMessages<'a> { + messages: HasheableMessageVec, + signer: Pubkey, + gateway_root_pda: Pubkey, + metadata: &'a ConfigMetadata, + execute_data_bytes: &'a [u8], + signing_verifier_set_pda: Pubkey, + solana_rpc_client: &'a RpcClient, + keypair: &'a Keypair, +} + +impl<'a> ProcessMessages<'a> { + #[tracing::instrument(skip_all)] + async fn execute(&self) -> eyre::Result<()> { + let execute_data_pda = InitializeApproveMessagesExecuteData::builder() + .signer(self.signer) + .gateway_root_pda(self.gateway_root_pda) + .domain_separator(&self.metadata.domain_separator) + .execute_data_bytes(self.execute_data_bytes) + .solana_rpc_client(self.solana_rpc_client) + .keypair(self.keypair) + .build() + .execute() + .await?; + + // Compose messages + let mut future_set = self + .messages + .iter() + .filter_map(|message| { + let command = OwnedCommand::ApproveMessage(message.clone()); + let (approved_message_pda, _bump, _seed) = + GatewayApprovedCommand::pda(&self.metadata.gateway_root_pda, &command); + let ix = instructions::initialize_pending_command( + &self.metadata.gateway_root_pda, + &self.signer, + command, + ) + .ok()?; + + let output = async move { + let send_transaction_result = + send_transaction(self.solana_rpc_client, self.keypair, ix).await; + + let Err(err) = send_transaction_result else { + // tx was successfully executed + return Ok(execute_data_pda) + }; + + // tx was not executed -- inspect root cause + let ComputeBudgetError::SimulationError(ref simulation) = err else { + // some kid of irrecoverable error + return Err(eyre::Error::from(err)) + }; + + if matches!( + simulation.err, + Some(TransactionError::InstructionError( + 1, // <-- 0th idx is the ComputeBudget prefix + InstructionError::InvalidAccountData + )) + ) { + return eyre::Result::Ok(approved_message_pda); + } + + // Return the simulation error + Err(eyre::Error::from(err)) + } + .instrument(tracing::info_span!( + "registering command PDA", + ?approved_message_pda + )); + Some(output) + }) + .collect::>(); + + let mut command_accounts = Vec::new(); + while let Some(result) = future_set.next().await { + let pubkey = result?; + command_accounts.push(pubkey); + } + + ApproveMessages::builder() + .execute_data_pda(execute_data_pda) + .gateway_root_pda(&self.metadata.gateway_root_pda) + .command_accounts(&command_accounts) + .signing_verifier_set_pda(self.signing_verifier_set_pda) + .solana_rpc_client(self.solana_rpc_client) + .keypair(self.keypair) + .build() + .execute() + .await?; + + Ok(()) + } +} + +#[derive(typed_builder::TypedBuilder)] +struct ProcessVerifierSet<'a> { + new_verifier_set: VerifierSet, + signer: Pubkey, + gateway_root_pda: Pubkey, + metadata: &'a ConfigMetadata, + execute_data_bytes: &'a [u8], + signing_verifier_set_pda: Pubkey, + solana_rpc_client: &'a RpcClient, + keypair: &'a Keypair, +} + +impl<'a> ProcessVerifierSet<'a> { + #[instrument(skip_all)] + pub async fn execute(&self) -> eyre::Result<()> { + let execute_data_pda = InitializeRotateSignersExecuteData::builder() + .signer(self.signer) + .gateway_root_pda(self.gateway_root_pda) + .domain_separator(&self.metadata.domain_separator) + .execute_data_bytes(self.execute_data_bytes) + .solana_rpc_client(self.solana_rpc_client) + .keypair(self.keypair) + .build() + .execute() + .await?; + + let new_signing_verifier_set_pda = + get_new_signing_verifier_set_pda(&self.new_verifier_set)?; + + RotateSigners::builder() + .execute_data_pda(execute_data_pda) + .gateway_root_pda(&self.metadata.gateway_root_pda) + .signing_verifier_set_pda(self.signing_verifier_set_pda) + .new_signing_verifier_set_pda(new_signing_verifier_set_pda) + .signer(self.signer) + .solana_rpc_client(self.solana_rpc_client) + .keypair(self.keypair) + .build() + .execute() + .await?; + + Ok(()) + } +} + +#[derive(typed_builder::TypedBuilder)] +struct LogFinder<'a> { + simulation: &'a RpcSimulateTransactionResult, + log_to_search: &'a str, +} + +impl<'a> LogFinder<'a> { + const fn new(simulation: &'a RpcSimulateTransactionResult, log_to_search: &'a str) -> Self { + Self { + simulation, + log_to_search, + } + } + + #[instrument(skip_all)] + pub fn find(&self) -> bool { + self.simulation + .logs + .as_ref() + .is_some_and(|logs| logs.iter().any(|log| log.starts_with(self.log_to_search))) + } +} + +#[derive(typed_builder::TypedBuilder)] +struct InitializeApproveMessagesExecuteData<'a> { + signer: Pubkey, + gateway_root_pda: Pubkey, + domain_separator: &'a [u8; 32], + execute_data_bytes: &'a [u8], + solana_rpc_client: &'a RpcClient, + keypair: &'a Keypair, +} + +impl<'a> InitializeApproveMessagesExecuteData<'a> { + #[instrument(skip_all)] + pub async fn execute(&self) -> eyre::Result { + let (ix, execute_data) = instructions::initialize_approve_messages_execute_data( + self.signer, + self.gateway_root_pda, + self.domain_separator, + self.execute_data_bytes, + )?; + let (execute_data_pda, ..) = gmp_gateway::get_execute_data_pda( + &self.gateway_root_pda, + &execute_data.hash_decoded_contents(), + ); + + let send_transaction_result = + send_transaction(self.solana_rpc_client, self.keypair, ix).await; + + let Err(err) = send_transaction_result else { + // tx was successfully executed + return Ok(execute_data_pda) + }; + + // tx was not executed -- inspect root cause + let ComputeBudgetError::SimulationError(ref simulation) = err else { + // some kid of irrecoverable error + return Err(eyre::Error::from(err)) + }; + + // This happens if the PDA was already initialised + if LogFinder::new( + simulation, + "Program log: Execute Datat PDA already initialized", + ) + .find() + { + // Acceptable simulation error; proceed as successful + return Ok(execute_data_pda) + } + + // Return the simulation error + Err(eyre::Error::from(err)) + } +} + +#[derive(typed_builder::TypedBuilder)] +struct InitializeRotateSignersExecuteData<'a> { + signer: Pubkey, + gateway_root_pda: Pubkey, + domain_separator: &'a [u8; 32], + execute_data_bytes: &'a [u8], + solana_rpc_client: &'a RpcClient, + keypair: &'a Keypair, +} + +impl<'a> InitializeRotateSignersExecuteData<'a> { + #[instrument(skip_all)] + pub async fn execute(&self) -> eyre::Result { + let (ix, execute_data) = instructions::initialize_rotate_signers_execute_data( + self.signer, + self.gateway_root_pda, + self.domain_separator, + self.execute_data_bytes, + )?; + let (execute_data_pda, ..) = gmp_gateway::get_execute_data_pda( + &self.gateway_root_pda, + &execute_data.hash_decoded_contents(), + ); + tracing::info!(?execute_data_pda, "execute data PDA"); + + let send_transaction_result = + send_transaction(self.solana_rpc_client, self.keypair, ix).await; + + let Err(err) = send_transaction_result else { + // tx was successfully executed + return Ok(execute_data_pda) + }; + + // tx was not executed -- inspect root cause + let ComputeBudgetError::SimulationError(ref simulation) = err else { + // some kid of irrecoverable error + return Err(eyre::Error::from(err)) + }; + + // This happens if the PDA was already initialised + if LogFinder::new( + simulation, + "Program log: Execute Datat PDA already initialized", + ) + .find() + { + // Acceptable simulation error; proceed as successful + return Ok(execute_data_pda) + } + + // Return the simulation error + Err(eyre::Error::from(err)) + } +} + +#[derive(typed_builder::TypedBuilder)] +struct ApproveMessages<'a> { + execute_data_pda: Pubkey, + gateway_root_pda: &'a Pubkey, + command_accounts: &'a [Pubkey], + signing_verifier_set_pda: Pubkey, + solana_rpc_client: &'a RpcClient, + keypair: &'a Keypair, +} + +impl<'a> ApproveMessages<'a> { + #[instrument(skip_all)] + pub async fn execute(&self) -> eyre::Result<()> { + let ix = instructions::approve_messages( + self.execute_data_pda, + *self.gateway_root_pda, + self.command_accounts, + self.signing_verifier_set_pda, + )?; + + let send_transaction_result = + send_transaction(self.solana_rpc_client, self.keypair, ix).await; + + let Err(err) = send_transaction_result else { + // tx was successfully executed + return Ok(()) + }; + + // tx was not executed -- inspect root cause + let ComputeBudgetError::SimulationError(ref simulation) = err else { + // some kid of irrecoverable error + return Err(eyre::Error::from(err)) + }; + + // This can happen if the verifier set is too old + if LogFinder::new(simulation, "Program log: Proof validation failed").find() { + // Acceptable simulation error; proceed as successful + return Ok(()) + } + + // Return the simulation error + Err(eyre::Error::from(err)) + } +} + +#[derive(typed_builder::TypedBuilder)] +struct RotateSigners<'a> { + execute_data_pda: Pubkey, + gateway_root_pda: &'a Pubkey, + signing_verifier_set_pda: Pubkey, + new_signing_verifier_set_pda: Pubkey, + signer: Pubkey, + solana_rpc_client: &'a RpcClient, + keypair: &'a Keypair, +} + +impl<'a> RotateSigners<'a> { + #[instrument(skip_all)] + pub async fn execute(&self) -> eyre::Result<()> { + let ix = instructions::rotate_signers( + self.execute_data_pda, + *self.gateway_root_pda, + None, + self.signing_verifier_set_pda, + self.new_signing_verifier_set_pda, + self.signer, + )?; + send_transaction(self.solana_rpc_client, self.keypair, ix).await?; + Ok(()) + } +} + +#[instrument(skip_all)] +fn get_new_signing_verifier_set_pda(new_verifier_set: &VerifierSet) -> eyre::Result { + let (new_signing_verifier_set_pda, _) = gmp_gateway::get_verifier_set_tracker_pda( + &gmp_gateway::id(), + new_verifier_set.hash(hasher_impl()), + ); + Ok(new_signing_verifier_set_pda) +} + +#[instrument(skip_all, ret)] +async fn send_transaction( + solana_rpc_client: &RpcClient, + keypair: &Keypair, + ix: Instruction, +) -> Result { + effective_tx_sender::EffectiveTxSender::new(solana_rpc_client, keypair, VecDeque::from([ix])) + .evaluate_compute_ixs() + .await? + .send_tx() + .await + .map_err(eyre::Error::from) + .map_err(ComputeBudgetError::Generic) +} diff --git a/crates/solana-gateway-task-processor/src/config.rs b/crates/solana-gateway-task-processor/src/config.rs new file mode 100644 index 0000000..cc173f8 --- /dev/null +++ b/crates/solana-gateway-task-processor/src/config.rs @@ -0,0 +1,163 @@ +use serde::Deserialize; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Keypair; +use typed_builder::TypedBuilder; + +/// Configuration for the [`SolanaTxPusher`] component +#[derive(Debug, Deserialize, PartialEq, TypedBuilder)] +pub struct Config { + /// Gateway program id + #[serde(deserialize_with = "common_serde_utils::pubkey_decode")] + #[builder(default = config_defaults::gateway_program_address())] + #[serde(default = "config_defaults::gateway_program_address")] + pub gateway_program_address: Pubkey, + + /// The signing keypair for transactions. + /// Can be represented as a base58 string or 64 element array `[42, 42, ..]` + #[serde(deserialize_with = "serde_utils::deserialize_keypair")] + pub signing_keypair: Keypair, +} + +pub(crate) mod config_defaults { + use solana_sdk::pubkey::Pubkey; + + pub(crate) const fn gateway_program_address() -> Pubkey { + gmp_gateway::id() + } +} + +#[expect(clippy::min_ident_chars, reason = "part of trait definitions")] +mod serde_utils { + + use serde::de::{self, Deserializer, Visitor}; + use solana_sdk::signature::Keypair; + + pub(crate) fn deserialize_keypair<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(KeypairVisitor) + } + + struct KeypairVisitor; + + impl<'de> Visitor<'de> for KeypairVisitor { + type Value = Keypair; + + fn expecting(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + formatter + .write_str("a base58 encoded string or an array of 64 bytes representing a keypair") + } + + fn visit_str(self, v: &str) -> Result + where + E: de::Error, + { + // Try Base58 decoding + if let Ok(keypair_bytes) = bs58::decode(v).into_vec() { + if keypair_bytes.len() == 64 { + return Keypair::from_bytes(&keypair_bytes).map_err(de::Error::custom); + } + } + + Err(de::Error::custom("Invalid keypair encoding or length")) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut bytes = Vec::with_capacity(64); + while let Some(value) = seq.next_element::()? { + bytes.push(value); + } + if bytes.len() != 64 { + return Err(de::Error::custom("Invalid keypair length")); + } + Keypair::from_bytes(&bytes).map_err(de::Error::custom) + } + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + use solana_sdk::signature::Keypair; + + use super::*; + + #[test] + fn test_deserialize_keypair_base58() { + // Generate a new Keypair and encode it in Base58 + let keypair = Keypair::new(); + let keypair_bytes = keypair.to_bytes(); + let base58_encoded = bs58::encode(&keypair_bytes).into_string(); + + // Prepare JSON data + let data = json!({ + "gateway_program_address": Pubkey::new_unique().to_string(), + "signing_keypair": base58_encoded + }); + + // Deserialize Config + let config: Config = serde_json::from_value(data).expect("Failed to deserialize Config"); + + // Check if the deserialized keypair matches the original + assert_eq!(config.signing_keypair.to_bytes(), keypair_bytes); + } + + #[test] + fn test_deserialize_keypair_array() { + // Generate a new Keypair + let keypair = Keypair::new(); + let keypair_bytes = keypair.to_bytes(); + + // Prepare JSON data + let data = json!({ + "gateway_program_address": Pubkey::new_unique().to_string(), + "signing_keypair": keypair_bytes.to_vec() + }); + + // Deserialize Config + let config: Config = serde_json::from_value(data).expect("Failed to deserialize Config"); + + // Check if the deserialized keypair matches the original + assert_eq!(config.signing_keypair.to_bytes(), keypair_bytes); + } + + #[test] + fn test_deserialize_keypair_invalid_length() { + // Create an invalid keypair byte array of incorrect length + let invalid_bytes = vec![0_u8; 63]; // Should be 64 bytes + + // Prepare JSON data + let data = json!({ + "gateway_program_address": Pubkey::new_unique().to_string(), + "signing_keypair": invalid_bytes + }); + + // Attempt to deserialize Config + let result: Result = serde_json::from_value(data); + + // Check that deserialization fails + result.unwrap_err(); + } + + #[test] + fn test_deserialize_keypair_invalid_encoding() { + // Provide an invalid encoded string + let invalid_encoded = "invalid_keypair_string"; + + // Prepare JSON data + let data = json!({ + "gateway_program_address": Pubkey::new_unique().to_string(), + "signing_keypair": invalid_encoded + }); + + // Attempt to deserialize Config + let result: Result = serde_json::from_value(data); + + // Check that deserialization fails + result.unwrap_err(); + } +} diff --git a/crates/solana-gateway-task-processor/src/lib.rs b/crates/solana-gateway-task-processor/src/lib.rs new file mode 100644 index 0000000..044b672 --- /dev/null +++ b/crates/solana-gateway-task-processor/src/lib.rs @@ -0,0 +1,7 @@ +//! Parse Amplifier API events, translate them to transaction actions to exesute on the Solana +//! blockchain + +mod component; +mod config; +pub use component::SolanaTxPusher; +pub use config::Config; diff --git a/crates/solana-listener/Cargo.toml b/crates/solana-listener/Cargo.toml index 938721d..a2c0087 100644 --- a/crates/solana-listener/Cargo.toml +++ b/crates/solana-listener/Cargo.toml @@ -16,17 +16,12 @@ eyre.workspace = true url.workspace = true serde.workspace = true relayer-engine.workspace = true -backoff.workspace = true -async-trait.workspace = true -serde_json.workspace = true chrono.workspace = true gmp-gateway.workspace = true -common-serde-utils.workspace = true +common-serde-utils = { workspace = true, features = ["solana-sdk"] } solana-client.workspace = true -solana-rpc-client.workspace = true -solana-rpc-client-api.workspace = true solana-sdk.workspace = true solana-transaction-status.workspace = true diff --git a/crates/solana-listener/src/component.rs b/crates/solana-listener/src/component.rs index 94653ba..de447be 100644 --- a/crates/solana-listener/src/component.rs +++ b/crates/solana-listener/src/component.rs @@ -4,12 +4,9 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_client::rpc_client::RpcClientConfig; -use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Signature; use crate::config; -use crate::retrying_http_sender::RetryingHttpSender; mod log_processor; mod signature_batch_scanner; @@ -36,9 +33,9 @@ pub(crate) type MessageSender = futures::channel::mpsc::UnboundedSender, sender: MessageSender, } @@ -51,7 +48,7 @@ pub struct SolanaListenerClient { impl relayer_engine::RelayerComponent for SolanaListener { fn process(self: Box) -> Pin> + Send>> { - use futures::FutureExt; + use futures::FutureExt as _; self.process_internal().boxed() } @@ -63,10 +60,11 @@ impl SolanaListener { /// The returned variable also returns a helper client that encompasses ways to communicate with /// the underlying `SolanaListener` instance. #[must_use] - pub fn new(config: config::Config) -> (Self, SolanaListenerClient) { + pub fn new(config: config::Config, rpc_client: Arc) -> (Self, SolanaListenerClient) { let (tx_outgoing, rx_outgoing) = futures::channel::mpsc::unbounded(); let this = Self { config, + rpc_client, sender: tx_outgoing, }; let client = SolanaListenerClient { @@ -77,26 +75,19 @@ impl SolanaListener { #[tracing::instrument(skip_all, name = "Solana Listener")] pub(crate) async fn process_internal(self) -> eyre::Result<()> { - let rpc_client = { - let sender = RetryingHttpSender::new( - self.config.solana_http_rpc.to_string(), - self.config.max_concurrent_rpc_requests, - ); - let config = RpcClientConfig::with_commitment(CommitmentConfig::finalized()); - let client = RpcClient::new_sender(sender, config); - Arc::new(client) - }; - // we fetch potentially missed signatures based on the provided the config - let latest = - signature_batch_scanner::scan_old_signatures(&self.config, &self.sender, &rpc_client) - .await?; + let latest = signature_batch_scanner::scan_old_signatures( + &self.config, + &self.sender, + &self.rpc_client, + ) + .await?; // we start processing realtime logs signature_realtime_scanner::process_realtime_logs( self.config, latest, - rpc_client, + self.rpc_client, self.sender, ) .await?; diff --git a/crates/solana-listener/src/component/log_processor.rs b/crates/solana-listener/src/component/log_processor.rs index e4da794..014292b 100644 --- a/crates/solana-listener/src/component/log_processor.rs +++ b/crates/solana-listener/src/component/log_processor.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use chrono::DateTime; -use eyre::OptionExt; -use futures::SinkExt; +use eyre::OptionExt as _; +use futures::SinkExt as _; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Signature; diff --git a/crates/solana-listener/src/component/signature_batch_scanner.rs b/crates/solana-listener/src/component/signature_batch_scanner.rs index d829577..fc4b9a3 100644 --- a/crates/solana-listener/src/component/signature_batch_scanner.rs +++ b/crates/solana-listener/src/component/signature_batch_scanner.rs @@ -1,4 +1,4 @@ -use core::str::FromStr; +use core::str::FromStr as _; use std::sync::Arc; use solana_client::nonblocking::rpc_client::RpcClient; diff --git a/crates/solana-listener/src/component/signature_realtime_scanner.rs b/crates/solana-listener/src/component/signature_realtime_scanner.rs index 6c5fc4c..e5947ea 100644 --- a/crates/solana-listener/src/component/signature_realtime_scanner.rs +++ b/crates/solana-listener/src/component/signature_realtime_scanner.rs @@ -1,12 +1,12 @@ -use core::str::FromStr; +use core::str::FromStr as _; use std::sync::Arc; -use futures::{SinkExt, StreamExt}; +use futures::{SinkExt as _, StreamExt as _}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter}; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Signature; -use tracing::{info_span, Instrument}; +use tracing::{info_span, Instrument as _}; use super::MessageSender; use crate::component::signature_batch_scanner; diff --git a/crates/solana-listener/src/config.rs b/crates/solana-listener/src/config.rs index 916cfde..d93e757 100644 --- a/crates/solana-listener/src/config.rs +++ b/crates/solana-listener/src/config.rs @@ -11,14 +11,11 @@ use typed_builder::TypedBuilder; #[derive(Debug, Deserialize, Clone, PartialEq, Eq, TypedBuilder)] pub struct Config { /// Gateway program id - #[serde(deserialize_with = "serde_utils::pubkey_decode")] + #[serde(deserialize_with = "common_serde_utils::pubkey_decode")] #[builder(default = config_defaults::gateway_program_address())] #[serde(default = "config_defaults::gateway_program_address")] pub gateway_program_address: Pubkey, - /// The rpc of the solana node - pub solana_http_rpc: url::Url, - /// The websocket endpoint of the solana node pub solana_ws: url::Url, @@ -38,14 +35,6 @@ pub struct Config { deserialize_with = "common_serde_utils::duration_ms_decode" )] pub tx_scan_poll_period: Duration, - - /// How many rpc requests we process at the same time to get data attached to a signature - #[builder(default = config_defaults::max_concurrent_rpc_requests())] - #[serde( - rename = "max_concurrent_rpc_requests", - default = "config_defaults::max_concurrent_rpc_requests" - )] - pub max_concurrent_rpc_requests: usize, } /// The strategy which defines on how we want to handle parsing historical signatures. @@ -75,32 +64,14 @@ pub(crate) mod config_defaults { pub(crate) const fn gateway_program_address() -> Pubkey { gmp_gateway::ID } - - pub(crate) const fn max_concurrent_rpc_requests() -> usize { - 5 - } } mod serde_utils { - use core::str::FromStr; + use core::str::FromStr as _; - use serde::{Deserialize, Deserializer}; - use solana_sdk::pubkey::Pubkey; + use serde::{Deserialize as _, Deserializer}; use solana_sdk::signature::Signature; - pub(crate) fn pubkey_decode<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let raw_string = String::deserialize(deserializer)?; - let pubkey = Pubkey::from_str(raw_string.as_str()) - .inspect_err(|err| { - tracing::error!(?err, "cannot parse base58 data"); - }) - .map_err(serde::de::Error::custom)?; - Ok(pubkey) - } - pub(crate) fn signature_decode<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, diff --git a/crates/solana-listener/src/lib.rs b/crates/solana-listener/src/lib.rs index a1256b2..e61136c 100644 --- a/crates/solana-listener/src/lib.rs +++ b/crates/solana-listener/src/lib.rs @@ -2,7 +2,6 @@ mod component; mod config; -mod retrying_http_sender; pub use component::{SolanaListener, SolanaListenerClient, SolanaTransaction}; pub use config::{Config, MissedSignatureCatchupStrategy}; diff --git a/doc/adr/0004-solana-tx-sending.md b/doc/adr/0004-solana-tx-sending.md new file mode 100644 index 0000000..8b0cd99 --- /dev/null +++ b/doc/adr/0004-solana-tx-sending.md @@ -0,0 +1,50 @@ +# 4. solana tx sending + +Date: 2024-10-25 + +## Status + +Accepted + +## Context + +Our relayer system needs an efficient and reliable way to send transactions to the Solana blockchain based on tasks received from the Amplifier API. These tasks include various operations such as verifying data, executing gateway transactions, processing messages, rotating verifier sets, and handling refunds. + +Challenges influencing this decision: + +- Compute Budget Constraints: Solana imposes a compute budget limit per transaction, which can vary based on network conditions. +- Dynamic Compute Unit Pricing: The cost of compute units can fluctuate, affecting transaction prioritization and inclusion. +- Asynchronous Task Processing: The system must handle multiple tasks concurrently without blocking, ensuring scalability and responsiveness. +- Error Handling and Recovery: Transactions may fail due to simulation errors or network issues, requiring robust error handling mechanisms. + +## Decision + +We implemented a new `SolanaTxPusher` component within the relayer system to handle the submission of transactions to the Solana blockchain. Key decisions and features include: + +- Asynchronous Processing: The `SolanaTxPusher` uses a JoinSet to process incoming tasks concurrently, allowing the system to handle multiple tasks without blocking. +- Task Handling: It supports different task types from the Amplifier API, specifically focusing on GatewayTx tasks, which involve processing messages and updating verifier sets. +- Dynamic Compute Budget Adjustment: + - Introduced an `EffectiveTxSender` utility that simulates transactions to estimate the required compute units. + - Adjusts the compute budget and compute unit price based on simulation results and recent network fees to maximize the chances of transaction inclusion. +- Error Handling: + - Implements detailed error inspection after simulation to handle specific errors like insufficient compute units or invalid account data. + - Uses retries or alternative flows when acceptable simulation errors occur (e.g., already initialized PDAs). + +## Consequences + +Benefits: + +- Increased Transaction Success Rate: By simulating and adjusting compute budgets and unit prices, the system increases the likelihood of transactions being accepted by the Solana network. +- Scalability: Asynchronous task processing allows the system to scale and handle higher loads without significant performance degradation. +- Optimized Resource Usage: Dynamic adjustment of compute budgets helps in efficient utilization of resources, avoiding over-allocation or under-utilization. + +Drawbacks: + +- Increased Complexity: The introduction of transaction simulation and dynamic adjustments adds complexity to the system, which may lead to longer development and debugging times. +- Potential Latency: Additional steps like simulation and fee analysis may introduce latency in transaction processing. More HTTP round-trips. +- Error Handling Complexity: Robust error handling necessitates comprehensive testing to ensure all edge cases are covered, which can be resource-intensive. + +Risks and Mitigation: + +- Simulation Failures: If simulations fail or provide inaccurate estimates, transactions may fail. To mitigate this, the system includes error inspection and alternative handling paths. +- Security Concerns: Handling keypairs and signing transactions introduces security risks. Proper key management and secure coding practices are essential to mitigate potential vulnerabilities. diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 9935f6c..31794b2 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] # https://rust-lang.github.io/rustup-components-history/aarch64-apple-darwin.html -channel = "nightly-2024-09-24" +channel = "nightly-2024-10-24" components = ["rustfmt", "clippy", "cargo", "llvm-tools-preview"]