From 1bfb780d108c3218d15dd6e5862803ce62c25df3 Mon Sep 17 00:00:00 2001 From: Roberts Pumpurs <33699735+roberts-pumpurs@users.noreply.github.com> Date: Fri, 18 Oct 2024 13:11:58 +0300 Subject: [PATCH] feat: solana logs get parsed after the relayers initial startup (#18) * refactor: using ws endpoint for fetching logs* refactor: improve "catchup" mechanism fetching logic * refactor: clean up log fetching logic * refactor: make linter happy * docs: adr for Solana event ingestion * refactor: added websocket recovery when waiting for initial msg --- Cargo.lock | 27 +- config.example.toml | 8 +- crates/solana-axelar-relayer/src/main.rs | 17 +- .../solana-event-forwarder/src/component.rs | 5 +- crates/solana-listener/Cargo.toml | 3 +- crates/solana-listener/src/component.rs | 432 ++---------------- .../src/component/log_processor.rs | 78 ++++ .../src/component/signature_batch_scanner.rs | 193 ++++++++ .../component/signature_realtime_scanner.rs | 91 ++++ crates/solana-listener/src/config.rs | 45 +- crates/solana-listener/src/lib.rs | 2 +- .../src/retrying_http_sender.rs | 28 +- doc/adr/0003-solana-event-fetching.md | 64 +++ 13 files changed, 555 insertions(+), 438 deletions(-) create mode 100644 crates/solana-listener/src/component/log_processor.rs create mode 100644 crates/solana-listener/src/component/signature_batch_scanner.rs create mode 100644 crates/solana-listener/src/component/signature_realtime_scanner.rs create mode 100644 doc/adr/0003-solana-event-fetching.md diff --git a/Cargo.lock b/Cargo.lock index 227ee33..75fcd98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.14" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "998282f8f49ccd6116b0ed8a4de0fbd3151697920e7c7533416d6e25e76434a7" +checksum = "e26a9844c659a2a293d239c7910b752f8487fe122c6c8bd1659bf85a6507c302" dependencies = [ "brotli", "flate2", @@ -1053,9 +1053,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" dependencies = [ "bytemuck_derive", ] @@ -1098,9 +1098,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.29" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58e804ac3194a48bb129643eb1d62fcc20d18c6b8c181704489353d13120bcd1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -3604,9 +3604,9 @@ dependencies = [ [[package]] name = "pest" -version = "2.7.13" +version = "2.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9" +checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442" dependencies = [ "memchr", "thiserror", @@ -4634,9 +4634,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" @@ -4661,9 +4661,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "rusty-fork" @@ -5271,10 +5271,10 @@ version = "0.1.0" dependencies = [ "async-trait", "backoff", + "chrono", "common-serde-utils", "eyre", "futures", - "futures-concurrency", "gmp-gateway", "relayer-engine", "serde", @@ -5284,7 +5284,6 @@ dependencies = [ "solana-rpc-client-api", "solana-sdk", "solana-transaction-status", - "thiserror", "tokio", "tracing", "typed-builder", diff --git a/config.example.toml b/config.example.toml index cb06964..44bb431 100644 --- a/config.example.toml +++ b/config.example.toml @@ -16,4 +16,10 @@ chain = "solana-devnet" bind_addr = "0.0.0.0:3000" [solana_listener_component] -solana_rpc = "https://api.devnet.solana.com" +solana_http_rpc = "https://api.devnet.solana.com" + +# the officia ws endpoints hosted by solana sometimes refuse to accept new connections / drop the connection frequently. +# using helius hosted nodes (even the free tier) often at times yields better results. +solana_ws = "wss://api.devnet.solana.com" +# solana_ws = "wss://devnet.helius-rpc.com/?{your api key}" +missed_signature_catchup_strategy = "until_beginning" diff --git a/crates/solana-axelar-relayer/src/main.rs b/crates/solana-axelar-relayer/src/main.rs index dd27d27..c025a06 100644 --- a/crates/solana-axelar-relayer/src/main.rs +++ b/crates/solana-axelar-relayer/src/main.rs @@ -78,6 +78,8 @@ mod tests { use amplifier_api::identity::Identity; use pretty_assertions::assert_eq; use solana_listener::solana_sdk::pubkey::Pubkey; + use solana_listener::solana_sdk::signature::Signature; + use solana_listener::MissedSignatureCatchupStrategy; use crate::Config; @@ -88,11 +90,14 @@ mod tests { let chain = "solana-devnet"; let gateway_program_address = Pubkey::new_unique(); let gateway_program_address_as_str = gateway_program_address.to_string(); - let solana_rpc = "https://solana-devnet.com".parse()?; + let solana_rpc = "https://api.solana-devnet.com".parse()?; + let solana_ws = "wss://api.solana-devnet.com".parse()?; let solana_tx_scan_poll_period = Duration::from_millis(42); let solana_tx_scan_poll_period_ms = solana_tx_scan_poll_period.as_millis(); let max_concurrent_rpc_requests = 100; + let latest_processed_signature = Signature::new_unique().to_string(); let identity = identity_fixture(); + let missed_signature_catchup_strategy = "until_beginning"; let input = indoc::formatdoc! {r#" [amplifier_component] identity = ''' @@ -107,9 +112,12 @@ mod tests { [solana_listener_component] gateway_program_address = "{gateway_program_address_as_str}" - solana_rpc = "{solana_rpc}" + solana_http_rpc = "{solana_rpc}" + solana_ws = "{solana_ws}" tx_scan_poll_period_in_milliseconds = {solana_tx_scan_poll_period_ms} max_concurrent_rpc_requests = {max_concurrent_rpc_requests} + missed_signature_catchup_strategy = "{missed_signature_catchup_strategy}" + latest_processed_signature = "{latest_processed_signature}" "#}; let parsed: Config = toml::from_str(&input)?; @@ -126,9 +134,12 @@ mod tests { }, solana_listener_component: solana_listener::Config { gateway_program_address, - solana_rpc, + solana_http_rpc: solana_rpc, tx_scan_poll_period: solana_tx_scan_poll_period, max_concurrent_rpc_requests, + solana_ws, + missed_signature_catchup_strategy: MissedSignatureCatchupStrategy::UntilBeginning, + latest_processed_signature: Some(Signature::from_str(&latest_processed_signature)?), }, }; assert_eq!(parsed, expected); diff --git a/crates/solana-event-forwarder/src/component.rs b/crates/solana-event-forwarder/src/component.rs index c0531f7..c3bad43 100644 --- a/crates/solana-event-forwarder/src/component.rs +++ b/crates/solana-event-forwarder/src/component.rs @@ -3,7 +3,6 @@ use core::pin::Pin; use futures::{SinkExt, StreamExt}; use gmp_gateway::events::{EventContainer, GatewayEvent}; -use relayer_amplifier_api_integration::amplifier_api::chrono::DateTime; use relayer_amplifier_api_integration::amplifier_api::types::{ CallEvent, Event, EventBase, EventId, EventMetadata, GatewayV2Message, MessageId, PublishEventsRequest, TxId, @@ -200,9 +199,7 @@ fn map_gateway_event_to_amplifier_event( event_id, meta: Some(EventMetadata { tx_id: Some(tx_id), - timestamp: message - .block_time - .and_then(|date_time| DateTime::from_timestamp(date_time, 0)), + timestamp: message.timestamp, from_address: Some(source_address.clone()), finalized: Some(true), }), diff --git a/crates/solana-listener/Cargo.toml b/crates/solana-listener/Cargo.toml index dd2d369..938721d 100644 --- a/crates/solana-listener/Cargo.toml +++ b/crates/solana-listener/Cargo.toml @@ -11,16 +11,15 @@ edition.workspace = true typed-builder.workspace = true futures.workspace = true tracing.workspace = true -futures-concurrency.workspace = true tokio.workspace = true eyre.workspace = true url.workspace = true serde.workspace = true relayer-engine.workspace = true -thiserror.workspace = true backoff.workspace = true async-trait.workspace = true serde_json.workspace = true +chrono.workspace = true gmp-gateway.workspace = true common-serde-utils.workspace = true diff --git a/crates/solana-listener/src/component.rs b/crates/solana-listener/src/component.rs index f991b17..94653ba 100644 --- a/crates/solana-listener/src/component.rs +++ b/crates/solana-listener/src/component.rs @@ -1,18 +1,19 @@ use core::future::Future; use core::pin::Pin; -use core::str::FromStr; use std::sync::Arc; -use futures_concurrency::future::FutureExt; +use chrono::{DateTime, Utc}; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config; +use solana_client::rpc_client::RpcClientConfig; use solana_sdk::commitment_config::CommitmentConfig; -use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; -use tokio::sync::Semaphore; -use tokio::task::JoinSet; use crate::config; +use crate::retrying_http_sender::RetryingHttpSender; + +mod log_processor; +mod signature_batch_scanner; +mod signature_realtime_scanner; /// Typical message with the produced work. /// Contains the handle to a task that resolves into a @@ -21,10 +22,10 @@ use crate::config; pub struct SolanaTransaction { /// signature of the transaction (id) pub signature: Signature, + /// optional timespamp + pub timestamp: Option>, /// The raw transaction logs pub logs: Vec, - /// the regisetred timestamp of the tx - pub block_time: Option, /// the slot number of the tx pub slot: u64, } @@ -76,409 +77,30 @@ impl SolanaListener { #[tracing::instrument(skip_all, name = "Solana Listener")] pub(crate) async fn process_internal(self) -> eyre::Result<()> { - let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_rpc_requests)); - let (tx, rx) = futures::channel::mpsc::unbounded(); - let scanner = signature_scanner::run( - self.config.gateway_program_address, - self.config.solana_rpc.clone(), - tx, - self.config.tx_scan_poll_period, - Arc::clone(&semaphore), - self.config.max_concurrent_rpc_requests, - ); - - let tx_retriever = - transaction_retriever::run(self.config.solana_rpc, rx, self.sender, semaphore); - - scanner.race(tx_retriever).await?; - eyre::bail!("listener crashed"); - } -} - -/// Functions to obtain transaction signatures from Solana RPC. -pub(crate) mod signature_scanner { - - use core::time::Duration; - use std::sync::Arc; - - use eyre::Context; - use futures::channel::mpsc::UnboundedSender; - use futures::SinkExt; - use tokio::sync::Semaphore; - use tracing::trace; - use url::Url; - - use super::*; - - /// Continuously fetches signatures from RPC and pipe them over a channel to - /// further processing. - /// - /// # Cancelation Safety - /// - /// This function is cancel safe. All lost work can be recovered as the - /// task's savepoint is sourced from the persistence layer, which - /// remains unchanged in this context. - #[tracing::instrument(name = "signature scanner", skip_all, err)] - pub async fn run( - address: Pubkey, - url: Url, - signature_sender: UnboundedSender, - period: Duration, - semaphore: Arc, - max_concurrent_rpc_requests: usize, - ) -> eyre::Result<()> { - let mut interval = tokio::time::interval(period); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let rpc_client = Arc::new(RpcClient::new(url.to_string())); - let max_concurrent_rpc_requests = u32::try_from(max_concurrent_rpc_requests)?; - - let mut last_processed_signature = None; - loop { - // Greedly ask for all available permits from this semaphore, to ensure this - // task will not concur with `transaction_retriever::fetch` tasks. - let all_permits = Arc::clone(&semaphore) - .acquire_many_owned(max_concurrent_rpc_requests) - .await?; - - trace!("acquired all semaphore permits (exclusive access)"); - - // Now that we have all permits, scan for signatures - collect_and_process_signatures( - address, - Arc::clone(&rpc_client), - signature_sender.clone(), - &mut last_processed_signature, - ) - .await?; - - // Give all permits back to the semaphore. - drop(all_permits); - - // Give some time for downstream futures to acquire permits from this semaphore. - trace!("sleeping"); - interval.tick().await; - } - } - - /// Calls Solana RPC after relevant transaction signatures and send results - /// over a channel. - #[tracing::instrument(skip_all, err)] - async fn collect_and_process_signatures( - address: Pubkey, - rpc_client: Arc, - mut signature_sender: UnboundedSender, - last_processed_signature: &mut Option, - ) -> eyre::Result<()> { - let collected_signatures = fetch_signatures_until_exhaustion( - Arc::clone(&rpc_client), - address, - None, - last_processed_signature, - ) - .await?; - - // Iterate backwards so oldest signatures are picked up first on the other end. - for signature in collected_signatures.into_iter().rev() { - signature_sender - .send(signature) - .await - .wrap_err("signature sender dropped")?; - } - Ok(()) - } - - /// Fetches all Solana transaction signatures for an address until a - /// specified signature is reached or no more transactions are - /// available. - #[tracing::instrument(skip(rpc_client, address), err)] - async fn fetch_signatures_until_exhaustion( - rpc_client: Arc, - address: Pubkey, - until: Option, - last_visited: &mut Option, - ) -> eyre::Result> { - /// This is the max number of signatures returned by the Solana RPC. It - /// is used as an indicator to tell if we need to continue - /// querying the RPC for more signatures. - const LIMIT: usize = 1_000; - - // Helper function to setup the configuration at each loop - let config = |before: Option| GetConfirmedSignaturesForAddress2Config { - // Only signatures before the specified signature. - before, - // Only signatures after the specified signature. - until, - limit: Some(LIMIT), - commitment: Some(CommitmentConfig::finalized()), - }; - - let mut collected_signatures = vec![]; - loop { - let batch = rpc_client - .get_signatures_for_address_with_config(&address, config(*last_visited)) - .await?; - - // Get the last (oldest) signature on this batch or break if it is empty - let Some(oldest) = batch.last() else { break }; - - // Set up following calls to start from the point this one had left - last_visited.replace(Signature::from_str(&oldest.signature)?); - - let batch_size = batch.len(); - collected_signatures.extend(batch.into_iter()); - - // If the results are less than the limit, then it means we have all the - // signatures we need. - if batch_size < LIMIT { - break; - }; - } - - Ok(collected_signatures - .into_iter() - .map(|status| Signature::from_str(&status.signature)) - .collect::, _>>()?) - } -} - -/// Functions to resolve transaction signatures into full transactions, with -/// metadata. -pub(crate) mod transaction_retriever { - use core::task::Poll; - use std::sync::Arc; - - use eyre::Context; - use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; - use futures::stream::FusedStream; - use futures::StreamExt; - use solana_client::client_error::ClientError; - use solana_client::rpc_client::RpcClientConfig; - use solana_client::rpc_config::RpcTransactionConfig; - use solana_transaction_status::option_serializer::OptionSerializer; - use solana_transaction_status::{ - EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding, - }; - use thiserror::Error; - use tokio::sync::{AcquireError, Semaphore}; - use tracing::{debug, info_span, Instrument}; - use url::Url; - - use super::*; - use crate::retrying_http_sender::RetryingHttpSender; - - #[derive(Error, Debug)] - pub(crate) enum TransactionRetrieverError { - #[error("Failed to decode solana transaction: {signature}")] - TransactionDecode { signature: Signature }, - #[error(transparent)] - NonFatal(#[from] NonFatalError), - /// This variant's value needs to be boxed to prevent a recursive type - /// definition, since this error is also part of - /// [`TransactionScannerMessage`]. - #[error("Failed to send processed transaction for event analysis: {0}")] - SendTransactionError(#[from] Box), - #[error(transparent)] - SolanaClient(#[from] ClientError), - #[error("Failed to acquire a semaphore permit")] - SemaphoreClosed(#[from] AcquireError), - } - - /// Errors that shouldn't halt the Sentinel. - #[derive(Error, Debug)] - pub(crate) enum NonFatalError { - #[error("Got wrong signature from RPC. Expected: {expected}, received: {received}")] - WrongTransactionReceived { - expected: Signature, - received: Signature, - }, - #[error("Got a transaction without meta attribute: {signature}")] - TransactionWithoutMeta { signature: Signature }, - #[error("Got a transaction without logs: {signature}")] - TransactionWithoutLogs { signature: Signature }, - } - - /// Asynchronously processes incoming transaction signatures by spawning - /// Tokio tasks to retrieve full transaction details. - /// - /// Tasks wait acquiring a semaphore permit before reaching the Solana RPC - /// endpoint. - /// - /// Successfully fetched transactions are sent through a channel for - /// further processing. - /// - /// # Cancellation Safety - /// - /// This function is cancel safe. All lost work can be recovered as the - /// task's savepoint is sourced from the persistence layer, which - /// remains unchanged in this context. - #[tracing::instrument(name = "transaction-retriever", skip_all)] - pub async fn run( - url: Url, - signature_receiver: UnboundedReceiver, - transaction_sender: UnboundedSender, - semaphore: Arc, - ) -> eyre::Result<()> { let rpc_client = { - let sender = RetryingHttpSender::new(url.to_string()); - let config = RpcClientConfig::with_commitment(CommitmentConfig::confirmed()); + let sender = RetryingHttpSender::new( + self.config.solana_http_rpc.to_string(), + self.config.max_concurrent_rpc_requests, + ); + let config = RpcClientConfig::with_commitment(CommitmentConfig::finalized()); let client = RpcClient::new_sender(sender, config); Arc::new(client) }; - let mut join_set = JoinSet::>::new(); - let mut signature_receiver = signature_receiver.fuse(); - let mut task = futures::stream::poll_fn(move |cx| { - match signature_receiver.poll_next_unpin(cx) { - Poll::Ready(Some(signature)) => { - join_set.spawn({ - let semaphore = Arc::clone(&semaphore); - let rpc_client = Arc::clone(&rpc_client); - let transaction_sender = transaction_sender.clone(); - async move { - let tx_result = - fetch_with_permit(signature, rpc_client, semaphore).await; - - match tx_result { - Ok(tx) => { - tracing::debug!(?tx, "solana tx retrieved"); - tracing::info!(?tx.signature, "solana tx retrieved"); - transaction_sender - .unbounded_send(tx) - .wrap_err("transaction sender failed")?; - } - Err(err) => match err { - TransactionRetrieverError::NonFatal(non_fatal_error) => { - tracing::warn!( - ?non_fatal_error, - "tx scanner returned non-fatal error" - ); - } - fatal_error @ - (TransactionRetrieverError::TransactionDecode { .. } | - TransactionRetrieverError::SendTransactionError(_) | - TransactionRetrieverError::SolanaClient(_) | - TransactionRetrieverError::SemaphoreClosed(_)) => { - return Err(fatal_error).wrap_err("tx retriever error") - } - }, - } - - Ok(()) - } - .instrument(info_span!("fetching tx data from signature")) - }); - } - Poll::Pending => (), - Poll::Ready(None) => { - tracing::error!("interval stream closed"); - join_set.abort_all(); - } - } - - // check if any background tasks are done - match join_set.poll_join_next(cx) { - Poll::Ready(Some(res)) => Poll::Ready(Some(res)), - // join set returns `Poll::Ready(None)` when it's empty - Poll::Ready(None) => { - if signature_receiver.is_terminated() { - return Poll::Ready(None) - } - Poll::Pending - } - Poll::Pending => Poll::Pending, - } - }); - - while let Some(task_result) = task.next().await { - let Ok(res) = task_result else { - tracing::error!(?task_result, "background task panicked"); - continue; - }; - let Err(err) = res else { - continue; - }; - - tracing::error!(?err, "background task returned an error"); - } - eyre::bail!("signature receiver closed") - } - - /// Fetches a Solana transaction by calling the `getTransactionWithConfig` - /// RPC method with its signature and decoding the result. - #[tracing::instrument(skip(rpc_client))] - async fn fetch( - signature: Signature, - rpc_client: Arc, - ) -> Result { - let config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Base64), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let EncodedConfirmedTransactionWithStatusMeta { - block_time, - slot, - transaction: transaction_with_meta, - } = rpc_client - .get_transaction_with_config(&signature, config) - .await?; - - let decoded_transaction = transaction_with_meta - .transaction - .decode() - .ok_or_else(|| TransactionRetrieverError::TransactionDecode { signature })?; - - // Check: This is the transaction we asked - if !decoded_transaction.signatures.contains(&signature) { - return Err(NonFatalError::WrongTransactionReceived { - expected: signature, - received: *decoded_transaction - .signatures - .first() - .expect("Solana transaction should have at least one signature"), - } - .into()); - } - - let meta = transaction_with_meta - .meta - .ok_or(NonFatalError::TransactionWithoutMeta { signature })?; - - let OptionSerializer::Some(logs) = meta.log_messages else { - return Err(NonFatalError::TransactionWithoutLogs { signature }.into()) - }; - - let transaction = SolanaTransaction { - signature, - logs, - block_time, - slot, - }; - - debug!( - block_time = ?transaction.block_time, - slot = %transaction.slot, - "found solana transaction" - ); + // we fetch potentially missed signatures based on the provided the config + let latest = + signature_batch_scanner::scan_old_signatures(&self.config, &self.sender, &rpc_client) + .await?; - Ok(transaction) - } + // we start processing realtime logs + signature_realtime_scanner::process_realtime_logs( + self.config, + latest, + rpc_client, + self.sender, + ) + .await?; - /// Fetches a Solana transaction for the given signature once a semaphore - /// permit is acquired. - /// - /// # Cancellation Safety - /// - /// This function is cancel safe. It will return without reaching the Solana - /// RPC endpoint if a cancellation signal is received while waiting for - /// a semaphore permit. - async fn fetch_with_permit( - signature: Signature, - rpc_client: Arc, - semaphore: Arc, - ) -> Result { - let _permit = semaphore.acquire_owned().await?; - fetch(signature, rpc_client).await + eyre::bail!("listener crashed"); } } diff --git a/crates/solana-listener/src/component/log_processor.rs b/crates/solana-listener/src/component/log_processor.rs new file mode 100644 index 0000000..e4da794 --- /dev/null +++ b/crates/solana-listener/src/component/log_processor.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; + +use chrono::DateTime; +use eyre::OptionExt; +use futures::SinkExt; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::signature::Signature; +use solana_transaction_status::option_serializer::OptionSerializer; +use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding}; +use tokio::task::JoinSet; + +use super::{MessageSender, SolanaTransaction}; + +pub(crate) async fn fetch_and_send( + fetched_signatures: impl Iterator, + rpc_client: Arc, + signature_sender: MessageSender, +) -> Result<(), eyre::Error> { + let mut log_fetch_js = JoinSet::new(); + for signature in fetched_signatures { + log_fetch_js.spawn({ + let rpc_client = Arc::clone(&rpc_client); + let mut signature_sender = signature_sender.clone(); + async move { + let tx = fetch_logs(signature, &rpc_client).await?; + signature_sender.send(tx).await?; + Result::<_, eyre::Report>::Ok(()) + } + }); + } + while let Some(item) = log_fetch_js.join_next().await { + if let Err(err) = item? { + tracing::warn!(?err, "error when parsing tx"); + } + } + Ok(()) +} + +async fn fetch_logs( + signature: Signature, + rpc_client: &RpcClient, +) -> eyre::Result { + use solana_client::rpc_config::RpcTransactionConfig; + let config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; + + let EncodedConfirmedTransactionWithStatusMeta { + slot, + transaction: transaction_with_meta, + block_time, + } = rpc_client + .get_transaction_with_config(&signature, config) + .await?; + + let meta = transaction_with_meta + .meta + .ok_or_eyre("metadat not included with logs")?; + + let OptionSerializer::Some(logs) = meta.log_messages else { + eyre::bail!("logs not included"); + }; + if meta.err.is_some() { + eyre::bail!("tx was not successful"); + } + + let transaction = SolanaTransaction { + signature, + logs, + slot, + timestamp: block_time.and_then(|secs| DateTime::from_timestamp(secs, 0)), + }; + + Ok(transaction) +} diff --git a/crates/solana-listener/src/component/signature_batch_scanner.rs b/crates/solana-listener/src/component/signature_batch_scanner.rs new file mode 100644 index 0000000..d829577 --- /dev/null +++ b/crates/solana-listener/src/component/signature_batch_scanner.rs @@ -0,0 +1,193 @@ +use core::str::FromStr; +use std::sync::Arc; + +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; + +use super::{MessageSender, SolanaTransaction}; +use crate::component::log_processor; +use crate::config::MissedSignatureCatchupStrategy; + +#[tracing::instrument(skip_all, name = "scan old signatures")] +pub(crate) async fn scan_old_signatures( + config: &crate::Config, + signature_sender: &futures::channel::mpsc::UnboundedSender, + rpc_client: &Arc, +) -> Result, eyre::Error> { + let latest_processed_signature = match ( + &config.missed_signature_catchup_strategy, + config.latest_processed_signature, + ) { + (&MissedSignatureCatchupStrategy::None, None) => { + tracing::info!( + "Starting from the latest available signature as no catch-up is configured and no latest signature is known." + ); + None + } + (&MissedSignatureCatchupStrategy::None, Some(latest_signature)) => { + tracing::info!( + ?latest_signature, + "Starting from the latest processed signature", + ); + Some(latest_signature) + } + ( + &MissedSignatureCatchupStrategy::UntilSignatureReached(target_signature), + latest_signature, + ) => { + tracing::info!( + ?target_signature, + ?latest_signature, + "Catching up missed signatures until target signature", + ); + fetch_batches_in_range( + config, + Arc::clone(rpc_client), + signature_sender, + Some(target_signature), + latest_signature, + ) + .await? + } + (&MissedSignatureCatchupStrategy::UntilBeginning, latest_signature) => { + tracing::info!( + ?latest_signature, + "Catching up all missed signatures starting from", + ); + fetch_batches_in_range( + config, + Arc::clone(rpc_client), + signature_sender, + None, + latest_signature, + ) + .await? + } + }; + + Ok(latest_processed_signature) +} + +#[tracing::instrument(skip_all, err)] +pub(crate) async fn fetch_batches_in_range( + config: &crate::Config, + rpc_client: Arc, + signature_sender: &MessageSender, + t1_signature: Option, + mut t2_signature: Option, +) -> Result, eyre::Error> { + let mut interval = tokio::time::interval(config.tx_scan_poll_period); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Track the chronologically youngest t2 that we've seen + let mut chronologically_newest_signature = t2_signature; + + loop { + let mut fetcher = SignatureRangeFetcher { + t1: t1_signature, + t2: t2_signature, + rpc_client: Arc::clone(&rpc_client), + address: config.gateway_program_address, + signature_sender: signature_sender.clone(), + }; + + let res = fetcher.fetch().await?; + match res { + FetchingState::Completed => break, + FetchingState::FetchAgain { new_t2 } => { + // Set the newest signature only once + if chronologically_newest_signature.is_none() { + chronologically_newest_signature = Some(new_t2); + } + + // Update t2 to fetch older signatures + t2_signature = Some(new_t2); + } + } + // Sleep to avoid rate limiting + interval.tick().await; + } + Ok(chronologically_newest_signature) +} + +enum FetchingState { + Completed, + FetchAgain { new_t2: Signature }, +} + +struct SignatureRangeFetcher { + t1: Option, + t2: Option, + rpc_client: Arc, + address: Pubkey, + signature_sender: MessageSender, +} + +impl SignatureRangeFetcher { + #[tracing::instrument(skip(self), fields(t1 = ?self.t1, t2 = ?self.t2))] + async fn fetch(&mut self) -> eyre::Result { + /// The maximum allowed by the Solana RPC is 1000. We use a smaller limit to reduce load. + const LIMIT: usize = 10; + + tracing::debug!(?self.address, "Fetching signatures"); + + let fetched_signatures = self + .rpc_client + .get_signatures_for_address_with_config( + &self.address, + GetConfirmedSignaturesForAddress2Config { + // start searching backwards from this transaction signature. If not provided + // the search starts from the top of the highest max confirmed block. + before: self.t2, + // search until this transaction signature, if found before limit reached + until: self.t1, + limit: Some(LIMIT), + commitment: Some(CommitmentConfig::finalized()), + }, + ) + .await?; + + let total_signatures = fetched_signatures.len(); + tracing::info!(total_signatures, "Fetched new set of signatures"); + + if fetched_signatures.is_empty() { + tracing::info!("No more signatures to fetch"); + return Ok(FetchingState::Completed); + } + + let (chronologically_oldest_signature, _) = + match (fetched_signatures.last(), fetched_signatures.first()) { + (Some(oldest), Some(newest)) => ( + Signature::from_str(&oldest.signature)?, + Signature::from_str(&newest.signature)?, + ), + _ => return Ok(FetchingState::Completed), + }; + + let fetched_signatures_iter = fetched_signatures + .into_iter() + .flat_map(|status| Signature::from_str(&status.signature)) + .rev(); + + // Fetch logs and send them via the sender + log_processor::fetch_and_send( + fetched_signatures_iter, + Arc::clone(&self.rpc_client), + self.signature_sender.clone(), + ) + .await?; + + if total_signatures < LIMIT { + tracing::info!("Fetched all available signatures in the range"); + Ok(FetchingState::Completed) + } else { + tracing::info!("More signatures available, continuing fetch"); + Ok(FetchingState::FetchAgain { + new_t2: chronologically_oldest_signature, + }) + } + } +} diff --git a/crates/solana-listener/src/component/signature_realtime_scanner.rs b/crates/solana-listener/src/component/signature_realtime_scanner.rs new file mode 100644 index 0000000..6c5fc4c --- /dev/null +++ b/crates/solana-listener/src/component/signature_realtime_scanner.rs @@ -0,0 +1,91 @@ +use core::str::FromStr; +use std::sync::Arc; + +use futures::{SinkExt, StreamExt}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter}; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::signature::Signature; +use tracing::{info_span, Instrument}; + +use super::MessageSender; +use crate::component::signature_batch_scanner; +use crate::SolanaTransaction; + +#[tracing::instrument(skip_all, err, name = "realtime log ingestion")] +pub(crate) async fn process_realtime_logs( + config: crate::Config, + latest_processed_signature: Option, + rpc_client: Arc, + mut signature_sender: MessageSender, +) -> Result<(), eyre::Error> { + let gateway_program_address = config.gateway_program_address; + loop { + tracing::info!(endpoint =? config.solana_ws.as_str(), ?gateway_program_address, "init new WS connection"); + let client = + solana_client::nonblocking::pubsub_client::PubsubClient::new(config.solana_ws.as_str()) + .await?; + let (ws_stream, _unsubscribe) = client + .logs_subscribe( + RpcTransactionLogsFilter::Mentions(vec![gateway_program_address.to_string()]), + RpcTransactionLogsConfig { + commitment: Some(CommitmentConfig::finalized()), + }, + ) + .await?; + let mut ws_stream = ws_stream + .filter(|item| { + // only keep non-error items + core::future::ready(item.value.err.is_none()) + }) + .filter_map(|item| { + // parse the returned data into a format we can forward to other components + core::future::ready({ + Signature::from_str(&item.value.signature) + .map(|signature| { + SolanaTransaction { + // timestamp not available via the the WS API + timestamp: None, + signature, + logs: item.value.logs, + slot: item.context.slot, + } + }) + .ok() + }) + }) + .inspect(|item| { + tracing::info!(item = ?item.signature, "found tx"); + }) + .boxed(); + + // It takes a few seconds for the Solana node to accept the WS connection. + // During this time we might have already missed a few signatures. + // We attempt to fetch the diff here. + // This will only trigger upon the very first WS returned signature + let next = ws_stream.next().await; + let Some(t2_signature) = next else { + // reconnect if connection dropped + continue; + }; + + signature_batch_scanner::fetch_batches_in_range( + &config, + Arc::clone(&rpc_client), + &signature_sender, + Some(t2_signature.signature), + latest_processed_signature, + ) + .instrument(info_span!("fetching missed signatures")) + .await?; + // forward the tx data to be processed + signature_sender.send(t2_signature).await?; + + // start processing the rest of the messages + tracing::info!("waiting realtime logs"); + while let Some(item) = ws_stream.next().await { + signature_sender.send(item).await?; + } + tracing::warn!("websocket stream exited"); + } +} diff --git a/crates/solana-listener/src/config.rs b/crates/solana-listener/src/config.rs index 7478d07..916cfde 100644 --- a/crates/solana-listener/src/config.rs +++ b/crates/solana-listener/src/config.rs @@ -4,6 +4,7 @@ use core::time::Duration; use serde::Deserialize; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; use typed_builder::TypedBuilder; /// Top-level configuration for the solana component. @@ -16,7 +17,18 @@ pub struct Config { pub gateway_program_address: Pubkey, /// The rpc of the solana node - pub solana_rpc: url::Url, + pub solana_http_rpc: url::Url, + + /// The websocket endpoint of the solana node + pub solana_ws: url::Url, + + /// This defines how to handle missed signatures upon startup + pub missed_signature_catchup_strategy: MissedSignatureCatchupStrategy, + + /// This defines the latest signature that we have parsed + #[serde(default)] + #[serde(deserialize_with = "serde_utils::signature_decode")] + pub latest_processed_signature: Option, /// How often we want to poll the network for new signatures #[builder(default = config_defaults::tx_scan_poll_period())] @@ -36,6 +48,21 @@ pub struct Config { pub max_concurrent_rpc_requests: usize, } +/// The strategy which defines on how we want to handle parsing historical signatures. +/// +/// It is useful for when you want to double-check or suspect that the relayer has missed some txs +/// in the past. +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MissedSignatureCatchupStrategy { + /// Don't parse historical signatures at all + None, + /// Parse all signatures until the initial gateway deployment + UntilBeginning, + /// Parse all signtatures until we reach the desired end signature. + UntilSignatureReached(Signature), +} + pub(crate) mod config_defaults { use core::time::Duration; @@ -59,6 +86,7 @@ mod serde_utils { use serde::{Deserialize, Deserializer}; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::Signature; pub(crate) fn pubkey_decode<'de, D>(deserializer: D) -> Result where @@ -67,9 +95,22 @@ mod serde_utils { let raw_string = String::deserialize(deserializer)?; let pubkey = Pubkey::from_str(raw_string.as_str()) .inspect_err(|err| { - tracing::error!(?err, "cannot parse base64 data"); + tracing::error!(?err, "cannot parse base58 data"); }) .map_err(serde::de::Error::custom)?; Ok(pubkey) } + + pub(crate) fn signature_decode<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + Option::::deserialize(deserializer)? + .map(|raw_string| { + Signature::from_str(&raw_string).map_err(|err| { + serde::de::Error::custom(format!("Cannot parse signature: {err}")) + }) + }) + .transpose() + } } diff --git a/crates/solana-listener/src/lib.rs b/crates/solana-listener/src/lib.rs index 4ae8c9b..a1256b2 100644 --- a/crates/solana-listener/src/lib.rs +++ b/crates/solana-listener/src/lib.rs @@ -5,5 +5,5 @@ mod config; mod retrying_http_sender; pub use component::{SolanaListener, SolanaListenerClient, SolanaTransaction}; -pub use config::Config; +pub use config::{Config, MissedSignatureCatchupStrategy}; pub use solana_sdk; diff --git a/crates/solana-listener/src/retrying_http_sender.rs b/crates/solana-listener/src/retrying_http_sender.rs index e0a160a..00d145e 100644 --- a/crates/solana-listener/src/retrying_http_sender.rs +++ b/crates/solana-listener/src/retrying_http_sender.rs @@ -1,4 +1,5 @@ use core::time::Duration; +use std::sync::Arc; use async_trait::async_trait; use backoff::future::retry; @@ -10,6 +11,7 @@ use solana_rpc_client::http_sender::HttpSender; use solana_rpc_client::rpc_sender::RpcSender; use solana_rpc_client_api::client_error::Result as ClientResult; use solana_rpc_client_api::request::RpcRequest; +use tokio::sync::Semaphore; use tracing::error; /// The maximum elapsed time for retrying failed requests. @@ -17,12 +19,19 @@ const TWO_MINUTES: Duration = Duration::from_millis(2 * 60 * 1_000); /// A wrapper around `HttpSender` that adds retry logic for sending RPC /// requests. -pub(crate) struct RetryingHttpSender(HttpSender); +pub(crate) struct RetryingHttpSender { + http_client: HttpSender, + request_permit: Arc, +} impl RetryingHttpSender { - pub(crate) fn new(url: String) -> Self { + pub(crate) fn new(url: String, max_concurrent_requests: usize) -> Self { let http = HttpSender::new(url); - Self(http) + let request_permit = Arc::new(Semaphore::new(max_concurrent_requests)); + Self { + http_client: http, + request_permit, + } } async fn send_internal( @@ -33,7 +42,14 @@ impl RetryingHttpSender { use ClientErrorKind::{ Custom, Io, Middleware, Reqwest, RpcError, SerdeJson, SigningError, TransactionError, }; - self.0 + // get the permit to make the request + let _permit = Arc::clone(&self.request_permit) + .acquire_owned() + .await + .expect("the semaphore will never be closed"); + + // make the actual request + self.http_client .send(request, params.clone()) .await .inspect_err(|error| error!(%error)) @@ -61,10 +77,10 @@ impl RpcSender for RetryingHttpSender { } fn get_transport_stats(&self) -> RpcTransportStats { - self.0.get_transport_stats() + self.http_client.get_transport_stats() } fn url(&self) -> String { - self.0.url() + self.http_client.url() } } diff --git a/doc/adr/0003-solana-event-fetching.md b/doc/adr/0003-solana-event-fetching.md new file mode 100644 index 0000000..ff7b187 --- /dev/null +++ b/doc/adr/0003-solana-event-fetching.md @@ -0,0 +1,64 @@ +# 3. Solana Event Parsing + +Date: 2024-10-17 + +## Status + +Accepted + +## Context + +We need to fetch logs from the Solana blockchain, examine them, compose them into an Amplifier API event, and forward it to the Axelar blockchain. On Solana, logs are attached to `signatures`, which act as unique transaction identifiers. We are interested in signatures that involve the Gateway program ID. + +Solana's RPC offers methods for interacting with the blockchain: + +- The [`getSignaturesForAddress`](https://solana.com/docs/rpc/http/getsignaturesforaddress) method allows fetching signatures in a range. It can be used to poll the node and query historical signatures in batches. However, this method does not return the logs associated with the signatures. +- The [`getTransaction`](https://solana.com/docs/rpc/http/gettransaction) method can be used to fetch the transaction details, including logs, for a given signature. This means that we need at least two RPC calls for every signature: one to fetch the signatures and another to fetch the transaction logs. + +Additionally, Solana offers a WebSocket API: + +- The [`logsSubscribe`](https://solana.com/docs/rpc/websocket/logssubscribe) method allows subscribing to live log updates for specified accounts or programs. This method provides real-time logs but does not provide historical data. + +In order to efficiently process both historical and real-time logs, we need a strategy that combines both the HTTP RPC and WebSocket APIs, minimizing the number of RPC calls while ensuring no data is missed. + +## Decision + +We have designed the Solana Listener component to process logs in a multi-step execution flow, incorporating strategies for catching up missed signatures: + +1. **Fetching Historical Signatures** + + - Upon component startup, using the provided configuration, we determine the catch-up strategy for missed signatures. The strategies include: + - **None:** Start from the latest available signature without fetching historical data. + - **UntilSignatureReached:** Fetch signatures until a specified target signature is reached. + - **UntilBeginning:** Fetch all missed signatures starting from the beginning. + - We fetch signatures using the HTTP RPC endpoint `getSignaturesForAddress` in batches. + - For each signature, we fetch the corresponding transaction logs using `getTransaction`. + - We keep track of the latest signature we have processed to avoid duplicate processing. + +2. **Establishing a WebSocket Connection** + + - We establish a new WebSocket connection to the Solana node using `logsSubscribe`, subscribing to logs involving the Gateway program ID. + - We await the first message from the WebSocket stream. + +3. **Fetching Signatures Missed During WebSocket Connection Establishment** + + - Establishing the WebSocket connection can take a few seconds, during which we might miss some signatures. + - To handle this, we use the HTTP RPC methods to fetch any signatures that might have been missed between the last processed signature and the first signature received via WebSocket. + - This ensures continuity in the event stream. + +4. **Ingesting New Signatures Using WebSocket Data** + + - After handling historical and potentially missed signatures, we process new incoming logs from the WebSocket connection in real-time. + - We monitor the WebSocket connection and handle reconnections if necessary to maintain real-time data ingestion. + +## Consequences + +Pros: + +- **Improved Reliability:** By implementing configurable catch-up strategies and combining HTTP RPC and WebSocket APIs, we ensure that no events are missed, even during startup or reconnection periods. +- **Flexibility:** The ability to configure the catch-up strategy allows us to balance between startup time and data completeness based on specific needs. + +Cons: + +- **Complexity:** The implementation involves coordinating between HTTP RPC calls and WebSocket subscriptions, handling potential overlaps. This can be hard to grasp and may seem as unnecessary complexity. +- **Resource Utilization:** Fetching historical data may increase the number of RPC calls, to evade rate limitations, the RPC uses a rate-limiting semaphore and internal sleeps.