diff --git a/nyx-chain-watcher/src/chain_scraper/mod.rs b/nyx-chain-watcher/src/chain_scraper/mod.rs index 36634a5964..3a00d7f665 100644 --- a/nyx-chain-watcher/src/chain_scraper/mod.rs +++ b/nyx-chain-watcher/src/chain_scraper/mod.rs @@ -1,6 +1,8 @@ use nyxd_scraper::{storage::ScraperStorage, NyxdScraper, PruningOptions}; -pub(crate) async fn run_chain_scraper(config: &crate::config::Config) -> anyhow::Result { +pub(crate) async fn run_chain_scraper( + config: &crate::config::Config, +) -> anyhow::Result { let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined"); let rpc_url = std::env::var("NYXD").expect("NYXD not defined"); diff --git a/nyx-chain-watcher/src/cli/commands/run/mod.rs b/nyx-chain-watcher/src/cli/commands/run/mod.rs index 84d3d8e1e5..d3095fc37d 100644 --- a/nyx-chain-watcher/src/cli/commands/run/mod.rs +++ b/nyx-chain-watcher/src/cli/commands/run/mod.rs @@ -21,8 +21,18 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa let db_path = config.database_path(); info!("Config is {config:#?}"); - info!("Database path is {:?}", std::path::Path::new(&db_path).canonicalize().unwrap_or_default()); - info!("Chain History Database path is {:?}", std::path::Path::new(&config.chain_scraper_database_path()).canonicalize().unwrap_or_default()); + info!( + "Database path is {:?}", + std::path::Path::new(&db_path) + .canonicalize() + .unwrap_or_default() + ); + info!( + "Chain History Database path is {:?}", + std::path::Path::new(&config.chain_scraper_database_path()) + .canonicalize() + .unwrap_or_default() + ); // Ensure parent directory exists if let Some(parent) = std::path::Path::new(&db_path).parent() { diff --git a/nyx-chain-watcher/src/payment_listener/mod.rs b/nyx-chain-watcher/src/payment_listener/mod.rs index e0ba3aa668..4df72b5a42 100644 --- a/nyx-chain-watcher/src/payment_listener/mod.rs +++ b/nyx-chain-watcher/src/payment_listener/mod.rs @@ -1,3 +1,4 @@ +use crate::config::payments_watcher::HttpAuthenticationOptions; use crate::config::PaymentWatcherConfig; use crate::db::queries; use crate::models::WebhookPayload; @@ -9,7 +10,7 @@ use serde_json::Value; use sqlx::SqlitePool; use std::str::FromStr; use tokio::time::{self, Duration}; -use tracing::{error, info}; +use tracing::{error, info, trace}; #[derive(Debug)] struct TransferEvent { @@ -59,10 +60,6 @@ pub(crate) async fn run_payment_listener( } for tx in transactions { - info!( - "[watcher = {}] Processing transaction: {}", - watcher.id, tx.hash - ); if let Some(raw_log) = tx.raw_log.as_deref() { if let Some(watch_for_transfer_recipient_accounts) = &watcher.watch_for_transfer_recipient_accounts @@ -73,6 +70,13 @@ pub(crate) async fn run_payment_listener( watch_for_transfer_recipient_accounts, ) { Ok(transfer_events) => { + if !transfer_events.is_empty() { + info!( + "[watcher = {}] Processing transaction: {} - {} payment events found", + watcher.id, tx.hash, transfer_events.len() + ); + } + for transfer in transfer_events { let amount: f64 = parse_unym_amount(&transfer.amount)?; @@ -96,12 +100,19 @@ pub(crate) async fn run_payment_listener( height: tx.height as u128, memo: tx.memo.clone(), }; - match client - .post(&watcher.webhook_url) - .json(&webhook_data) - .send() - .await - { + + let mut request_builder = + client.post(&watcher.webhook_url).json(&webhook_data); + + if let Some(auth) = &watcher.authentication { + match auth { + HttpAuthenticationOptions::AuthorizationBearerToken { token } => { + request_builder = request_builder.bearer_auth(token); + } + } + } + + match request_builder.send().await { Ok(res) => info!( "[watcher = {}] ✅ Webhook {} {} - tx {}, index {}", watcher.id, @@ -142,41 +153,52 @@ fn parse_transfer_from_raw_log( let mut transfers: Vec = vec![]; - if let Some(events) = log_value[0]["events"].as_array() { - for transfer_event in events.iter().filter(|e| e["type"] == "transfer") { - if let Some(attrs) = transfer_event["attributes"].as_array() { - let mut recipient: Option = None; - let mut sender: Option = None; - let mut amount: Option = None; - let message_index: Option = Some(0u64); - - for attr in attrs { - match attr["key"].as_str() { - Some("recipient") => { - recipient = - AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok(); - } - Some("sender") => { - sender = AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok(); - } - Some("amount") => { - amount = Some(attr["value"].as_str().unwrap_or("").to_string()) + let default_value = vec![]; + let log_entries: &Vec = log_value.as_array().unwrap_or(&default_value); + + trace!("contains {} log entries", log_entries.len()); + + for log_entry in log_entries { + let message_index = log_entry["msg_index"].as_u64().unwrap_or_default(); + + trace!("entry - {message_index}..."); + + if let Some(events) = log_entry["events"].as_array() { + for transfer_event in events.iter().filter(|e| e["type"] == "transfer") { + if let Some(attrs) = transfer_event["attributes"].as_array() { + let mut recipient: Option = None; + let mut sender: Option = None; + let mut amount: Option = None; + + for attr in attrs { + match attr["key"].as_str() { + Some("recipient") => { + recipient = + AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok(); + } + Some("sender") => { + sender = + AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok(); + } + Some("amount") => { + amount = Some(attr["value"].as_str().unwrap_or("").to_string()) + } + // TODO: parse message index + _ => continue, } - // TODO: parse message index - _ => continue, } - } - if let (Some(recipient), Some(sender), Some(amount), Some(message_index)) = - (recipient, sender, amount, message_index) - { - if watch_for_transfer_recipient_accounts.contains(&recipient) { - transfers.push(TransferEvent { - recipient, - sender, - amount, - message_index, - }); + if let (Some(recipient), Some(sender), Some(amount)) = + (recipient, sender, amount) + { + if watch_for_transfer_recipient_accounts.contains(&recipient) { + transfers.push(TransferEvent { + recipient, + sender, + amount, + message_index, + }); + } } } }