Skip to content

Commit

Permalink
parse message index and process all log entries
Browse files Browse the repository at this point in the history
  • Loading branch information
mmsinclair committed Dec 6, 2024
1 parent d38abba commit 4877a93
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 46 deletions.
4 changes: 3 additions & 1 deletion nyx-chain-watcher/src/chain_scraper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use nyxd_scraper::{storage::ScraperStorage, NyxdScraper, PruningOptions};

pub(crate) async fn run_chain_scraper(config: &crate::config::Config) -> anyhow::Result<ScraperStorage> {
pub(crate) async fn run_chain_scraper(
config: &crate::config::Config,
) -> anyhow::Result<ScraperStorage> {
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");

let rpc_url = std::env::var("NYXD").expect("NYXD not defined");
Expand Down
14 changes: 12 additions & 2 deletions nyx-chain-watcher/src/cli/commands/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
let db_path = config.database_path();

info!("Config is {config:#?}");
info!("Database path is {:?}", std::path::Path::new(&db_path).canonicalize().unwrap_or_default());
info!("Chain History Database path is {:?}", std::path::Path::new(&config.chain_scraper_database_path()).canonicalize().unwrap_or_default());
info!(
"Database path is {:?}",
std::path::Path::new(&db_path)
.canonicalize()
.unwrap_or_default()
);
info!(
"Chain History Database path is {:?}",
std::path::Path::new(&config.chain_scraper_database_path())
.canonicalize()
.unwrap_or_default()
);

// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(&db_path).parent() {
Expand Down
108 changes: 65 additions & 43 deletions nyx-chain-watcher/src/payment_listener/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config::payments_watcher::HttpAuthenticationOptions;
use crate::config::PaymentWatcherConfig;
use crate::db::queries;
use crate::models::WebhookPayload;
Expand All @@ -9,7 +10,7 @@ use serde_json::Value;
use sqlx::SqlitePool;
use std::str::FromStr;
use tokio::time::{self, Duration};
use tracing::{error, info};
use tracing::{error, info, trace};

#[derive(Debug)]
struct TransferEvent {
Expand Down Expand Up @@ -59,10 +60,6 @@ pub(crate) async fn run_payment_listener(
}

for tx in transactions {
info!(
"[watcher = {}] Processing transaction: {}",
watcher.id, tx.hash
);
if let Some(raw_log) = tx.raw_log.as_deref() {
if let Some(watch_for_transfer_recipient_accounts) =
&watcher.watch_for_transfer_recipient_accounts
Expand All @@ -73,6 +70,13 @@ pub(crate) async fn run_payment_listener(
watch_for_transfer_recipient_accounts,
) {
Ok(transfer_events) => {
if !transfer_events.is_empty() {
info!(
"[watcher = {}] Processing transaction: {} - {} payment events found",
watcher.id, tx.hash, transfer_events.len()
);
}

for transfer in transfer_events {
let amount: f64 = parse_unym_amount(&transfer.amount)?;

Expand All @@ -96,12 +100,19 @@ pub(crate) async fn run_payment_listener(
height: tx.height as u128,
memo: tx.memo.clone(),
};
match client
.post(&watcher.webhook_url)
.json(&webhook_data)
.send()
.await
{

let mut request_builder =
client.post(&watcher.webhook_url).json(&webhook_data);

if let Some(auth) = &watcher.authentication {
match auth {
HttpAuthenticationOptions::AuthorizationBearerToken { token } => {
request_builder = request_builder.bearer_auth(token);
}
}
}

match request_builder.send().await {
Ok(res) => info!(
"[watcher = {}] ✅ Webhook {} {} - tx {}, index {}",
watcher.id,
Expand Down Expand Up @@ -142,41 +153,52 @@ fn parse_transfer_from_raw_log(

let mut transfers: Vec<TransferEvent> = vec![];

if let Some(events) = log_value[0]["events"].as_array() {
for transfer_event in events.iter().filter(|e| e["type"] == "transfer") {
if let Some(attrs) = transfer_event["attributes"].as_array() {
let mut recipient: Option<AccountId> = None;
let mut sender: Option<AccountId> = None;
let mut amount: Option<String> = None;
let message_index: Option<u64> = Some(0u64);

for attr in attrs {
match attr["key"].as_str() {
Some("recipient") => {
recipient =
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
}
Some("sender") => {
sender = AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
}
Some("amount") => {
amount = Some(attr["value"].as_str().unwrap_or("").to_string())
let default_value = vec![];
let log_entries: &Vec<Value> = log_value.as_array().unwrap_or(&default_value);

trace!("contains {} log entries", log_entries.len());

for log_entry in log_entries {
let message_index = log_entry["msg_index"].as_u64().unwrap_or_default();

trace!("entry - {message_index}...");

if let Some(events) = log_entry["events"].as_array() {
for transfer_event in events.iter().filter(|e| e["type"] == "transfer") {
if let Some(attrs) = transfer_event["attributes"].as_array() {
let mut recipient: Option<AccountId> = None;
let mut sender: Option<AccountId> = None;
let mut amount: Option<String> = None;

for attr in attrs {
match attr["key"].as_str() {
Some("recipient") => {
recipient =
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
}
Some("sender") => {
sender =
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
}
Some("amount") => {
amount = Some(attr["value"].as_str().unwrap_or("").to_string())
}
// TODO: parse message index
_ => continue,
}
// TODO: parse message index
_ => continue,
}
}

if let (Some(recipient), Some(sender), Some(amount), Some(message_index)) =
(recipient, sender, amount, message_index)
{
if watch_for_transfer_recipient_accounts.contains(&recipient) {
transfers.push(TransferEvent {
recipient,
sender,
amount,
message_index,
});
if let (Some(recipient), Some(sender), Some(amount)) =
(recipient, sender, amount)
{
if watch_for_transfer_recipient_accounts.contains(&recipient) {
transfers.push(TransferEvent {
recipient,
sender,
amount,
message_index,
});
}
}
}
}
Expand Down

0 comments on commit 4877a93

Please sign in to comment.