chore(auditor): further improve auditor performance
maqi committed Jun 10, 2024
1 parent 5d67202 commit ba57a35
Showing 2 changed files with 62 additions and 61 deletions.
38 changes: 18 additions & 20 deletions sn_auditor/src/dag_db.rs
@@ -34,7 +34,7 @@ lazy_static! {
/// time in seconds UTXOs are refetched in DAG crawl
static ref UTXO_REATTEMPT_INTERVAL: Duration = Duration::from_secs(
std::env::var("UTXO_REATTEMPT_INTERVAL")
.unwrap_or("3600".to_string())
.unwrap_or("1800".to_string())
.parse::<u64>()
.unwrap_or(300)
);
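
As an aside on the config hunk above: the reattempt interval is read from an environment variable, falling back to a default when the variable is unset (and to 300 seconds when the value fails to parse). A minimal, self-contained sketch of that env-var-with-fallback pattern — the `duration_from_env` helper is hypothetical, and it uses a single default for both the unset and unparsable cases, unlike the diffed code:

```rust
use std::time::Duration;

/// Read a duration (in seconds) from an environment variable, falling back to
/// `default_secs` when the variable is unset or does not parse as a u64.
fn duration_from_env(var: &str, default_secs: u64) -> Duration {
    let secs = std::env::var(var)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(default_secs);
    Duration::from_secs(secs)
}

fn main() {
    // e.g. run with `UTXO_REATTEMPT_INTERVAL=900 cargo run` to override.
    let interval = duration_from_env("UTXO_REATTEMPT_INTERVAL", 1800);
    println!("reattempt interval: {interval:?}");
}
```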
@@ -237,17 +237,14 @@ impl SpendDagDb {
let mut addrs_to_get = BTreeSet::new();

loop {
// get current utxos to fetch
// `addrs_to_get` is always empty when reaching this point
// get expired utxos for further fetching
let utxos_to_fetch;
let now = Instant::now();

// Always track new outputs first
if addrs_to_get.is_empty() {
let utxos_to_fetch;
(utxo_addresses, utxos_to_fetch) = utxo_addresses
.into_iter()
.partition(|(_address, time_stamp)| *time_stamp > now);
addrs_to_get.extend(utxos_to_fetch.keys().cloned().collect::<BTreeSet<_>>());
}
(utxo_addresses, utxos_to_fetch) = utxo_addresses
.into_iter()
.partition(|(_address, time_stamp)| *time_stamp > now);
addrs_to_get.extend(utxos_to_fetch.keys().cloned().collect::<BTreeSet<_>>());

if addrs_to_get.is_empty() {
debug!(
@@ -273,15 +270,16 @@ impl SpendDagDb {
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
} else if let Some(sender) = spend_processing.clone() {
let (reattempt_addrs, new_utxos) =
client.crawl_to_next_utxos(&addrs_to_get, sender).await?;
utxo_addresses.extend(
reattempt_addrs
.into_iter()
.map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)),
);
addrs_to_get.clear();
addrs_to_get.extend(new_utxos);
if let Ok(reattempt_addrs) = client
.crawl_to_next_utxos(
&mut addrs_to_get,
sender.clone(),
*UTXO_REATTEMPT_INTERVAL,
)
.await
{
utxo_addresses.extend(reattempt_addrs);
}
} else {
panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments.");
};
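
The dag_db.rs change above drops the old `if addrs_to_get.is_empty()` guard: on every loop iteration the reattempt map is partitioned by timestamp, and every expired address is moved into `addrs_to_get` before crawling. A minimal sketch of that partition-by-deadline step, with a hypothetical `Addr` alias standing in for `SpendAddress`:

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::time::{Duration, Instant};

// Hypothetical address type standing in for SpendAddress.
type Addr = u64;

/// Move every address whose reattempt deadline has passed out of the tracking
/// map and into the "fetch now" set, keeping the rest for later.
fn take_expired(
    tracked: BTreeMap<Addr, Instant>,
    fetch_now: &mut BTreeSet<Addr>,
) -> BTreeMap<Addr, Instant> {
    let now = Instant::now();
    // `partition` splits the map in a single pass: still-pending entries on
    // the left, expired ones on the right.
    let (pending, expired): (BTreeMap<_, _>, BTreeMap<_, _>) =
        tracked.into_iter().partition(|(_addr, deadline)| *deadline > now);
    fetch_now.extend(expired.into_keys());
    pending
}

fn main() {
    let mut fetch_now = BTreeSet::new();
    let mut tracked = BTreeMap::new();
    tracked.insert(1, Instant::now()); // deadline already reached
    tracked.insert(2, Instant::now() + Duration::from_secs(1800)); // still pending
    tracked = take_expired(tracked, &mut fetch_now);
    assert!(fetch_now.contains(&1));
    assert_eq!(tracked.len(), 1);
}
```

Because `partition` consumes the map in a single pass, still-pending entries keep their original deadlines, so nothing is refetched before its interval has elapsed.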
85 changes: 44 additions & 41 deletions sn_client/src/audit/dag_crawling.rs
@@ -14,8 +14,11 @@ use sn_transfers::{
SignedSpend, SpendAddress, SpendReason, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY,
NETWORK_ROYALTIES_PK,
};
use std::collections::BTreeSet;
use tokio::sync::mpsc::Sender;
use std::{
collections::{BTreeMap, BTreeSet},
time::{Duration, Instant},
};
use tokio::{sync::mpsc::Sender, task::JoinSet};

const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;

@@ -129,51 +132,51 @@ impl Client {
Ok(dag)
}

/// Get spends for a set of given SpendAddresses
/// Notifies the UTXOs that need to be further tracked down.
/// returns: (addresses_for_reattempt, new_utxos_for_furthertracking)
/// Get spends from a set of given SpendAddresses
/// Recursively fetches until reaching the frontline (UTXOs) of the DAG tree.
/// Returns the UTXOs to re-attempt later, each paired with its reattempt deadline.
pub async fn crawl_to_next_utxos(
&self,
from: &BTreeSet<SpendAddress>,
spend_processing: Sender<(SignedSpend, u64)>,
) -> WalletResult<(BTreeSet<SpendAddress>, BTreeSet<SpendAddress>)> {
let spends = join_all(from.iter().map(|&address| {
let client_clone = self.clone();
async move { (client_clone.crawl_spend(address).await, address) }
}))
.await;

let mut failed_utxos = BTreeSet::new();
let mut new_utxos = BTreeSet::new();

for (result, address) in spends {
let spend = match result {
InternalGetNetworkSpend::Spend(s) => *s,
InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => {
warn!("Detected double spend regarding {address:?}");
continue;
}
InternalGetNetworkSpend::NotFound => {
let _ = failed_utxos.insert(address);
continue;
}
InternalGetNetworkSpend::Error(e) => {
warn!("Got a fetching error {e:?}");
continue;
addrs_to_get: &mut BTreeSet<SpendAddress>,
sender: Sender<(SignedSpend, u64)>,
reattempt_interval: Duration,
) -> WalletResult<BTreeMap<SpendAddress, Instant>> {
let mut failed_utxos = BTreeMap::new();
let mut tasks = JoinSet::new();

while !addrs_to_get.is_empty() || !tasks.is_empty() {
while tasks.len() < 32 && !addrs_to_get.is_empty() {
if let Some(addr) = addrs_to_get.pop_first() {
let client_clone = self.clone();
let _ =
tasks.spawn(async move { (client_clone.crawl_spend(addr).await, addr) });
}
};

let for_further_track = beta_track_analyze_spend(&spend);

spend_processing
.send((spend, for_further_track.len() as u64))
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
}

new_utxos.extend(for_further_track);
if let Some(Ok((result, address))) = tasks.join_next().await {
match result {
InternalGetNetworkSpend::Spend(spend) => {
let for_further_track = beta_track_analyze_spend(&spend);
let _ = sender
.send((*spend, for_further_track.len() as u64))
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()));
addrs_to_get.extend(for_further_track);
}
InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => {
warn!("Detected double spend regarding {address:?}");
}
InternalGetNetworkSpend::NotFound => {
let _ = failed_utxos.insert(address, Instant::now() + reattempt_interval);
}
InternalGetNetworkSpend::Error(e) => {
warn!("Got a fetching error {e:?}");
}
}
}
}

Ok((failed_utxos, new_utxos))
Ok(failed_utxos)
}

/// Crawls the Spend Dag from a given SpendAddress recursively
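
The dag_crawling.rs rewrite is the main performance change: instead of issuing one `join_all` over the whole batch and only analysing results after every fetch has returned, it keeps a `tokio::task::JoinSet` topped up to at most 32 in-flight fetches and feeds newly discovered addresses straight back into the work queue. A self-contained sketch of that bounded-concurrency crawl loop, assuming tokio with the `rt-multi-thread` and `macros` features; the `fetch` function is a hypothetical stand-in for `crawl_spend`, and the mpsc forwarding of spends is omitted:

```rust
use std::collections::BTreeSet;
use tokio::task::JoinSet;

// Hypothetical stand-in for SpendAddress.
type Addr = u32;

async fn fetch(addr: Addr) -> Vec<Addr> {
    // Pretend each spend reveals two descendant addresses, capped for the demo.
    if addr < 8 { vec![addr * 2, addr * 2 + 1] } else { vec![] }
}

#[tokio::main]
async fn main() {
    let mut to_get: BTreeSet<Addr> = BTreeSet::from([1]);
    let mut tasks = JoinSet::new();
    let mut fetched = 0usize;

    // Keep at most 32 fetches in flight; stop only when both the work queue
    // and the in-flight set are empty.
    while !to_get.is_empty() || !tasks.is_empty() {
        while tasks.len() < 32 && !to_get.is_empty() {
            if let Some(addr) = to_get.pop_first() {
                let _ = tasks.spawn(async move { (addr, fetch(addr).await) });
            }
        }
        if let Some(Ok((addr, descendants))) = tasks.join_next().await {
            fetched += 1;
            println!("fetched {addr}, found {} descendants", descendants.len());
            // Newly discovered addresses go straight back into the work queue.
            to_get.extend(descendants);
        }
    }
    println!("crawled {fetched} addresses");
}
```

Capping the in-flight tasks bounds memory and network pressure, while draining `join_next` as results arrive lets UTXOs discovered by one spend start fetching before the rest of the batch has finished.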
