diff --git a/nym-api/src/network_monitor/mod.rs b/nym-api/src/network_monitor/mod.rs index 699498b2872..1df86ef3b81 100644 --- a/nym-api/src/network_monitor/mod.rs +++ b/nym-api/src/network_monitor/mod.rs @@ -12,6 +12,7 @@ use crate::network_monitor::monitor::sender::PacketSender; use crate::network_monitor::monitor::summary_producer::SummaryProducer; use crate::network_monitor::monitor::Monitor; use crate::node_describe_cache::DescribedNodes; +use crate::node_status_api::NodeStatusCache; use crate::nym_contract_cache::cache::NymContractCache; use crate::storage::NymApiStorage; use crate::support::caching::cache::SharedCache; @@ -38,6 +39,7 @@ pub(crate) fn setup<'a>( config: &'a config::NetworkMonitor, nym_contract_cache: &NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, storage: &NymApiStorage, nyxd_client: nyxd::Client, ) -> NetworkMonitorBuilder<'a> { @@ -47,6 +49,7 @@ pub(crate) fn setup<'a>( storage.to_owned(), nym_contract_cache.clone(), described_cache, + node_status_cache, ) } @@ -56,6 +59,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> { node_status_storage: NymApiStorage, contract_cache: NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, } impl<'a> NetworkMonitorBuilder<'a> { @@ -65,6 +69,7 @@ impl<'a> NetworkMonitorBuilder<'a> { node_status_storage: NymApiStorage, contract_cache: NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, ) -> Self { NetworkMonitorBuilder { config, @@ -72,6 +77,7 @@ impl<'a> NetworkMonitorBuilder<'a> { node_status_storage, contract_cache, described_cache, + node_status_cache, } } @@ -94,6 +100,7 @@ impl<'a> NetworkMonitorBuilder<'a> { let packet_preparer = new_packet_preparer( self.contract_cache, self.described_cache, + self.node_status_cache, self.config.debug.per_node_test_packets, Arc::clone(&ack_key), *identity_keypair.public_key(), @@ -169,6 +176,7 @@ impl NetworkMonitorRunnables { fn new_packet_preparer( contract_cache: NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, per_node_test_packets: usize, ack_key: Arc, self_public_identity: identity::PublicKey, @@ -177,6 +185,7 @@ fn new_packet_preparer( PacketPreparer::new( contract_cache, described_cache, + node_status_cache, per_node_test_packets, ack_key, self_public_identity, @@ -231,6 +240,7 @@ pub(crate) async fn start( config: &config::NetworkMonitor, nym_contract_cache: &NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, storage: &NymApiStorage, nyxd_client: nyxd::Client, shutdown: &TaskManager, @@ -239,6 +249,7 @@ pub(crate) async fn start( config, nym_contract_cache, described_cache, + node_status_cache, storage, nyxd_client, ); diff --git a/nym-api/src/network_monitor/monitor/mod.rs b/nym-api/src/network_monitor/monitor/mod.rs index 4d24cb0504d..b99ce6a0cec 100644 --- a/nym-api/src/network_monitor/monitor/mod.rs +++ b/nym-api/src/network_monitor/monitor/mod.rs @@ -197,7 +197,7 @@ impl Monitor { // the actual target let candidates = match self .packet_preparer - .prepare_test_routes(remaining * 2, &mut blacklist) + .prepare_test_routes(remaining * 2) .await { Some(candidates) => candidates, diff --git a/nym-api/src/network_monitor/monitor/preparer.rs b/nym-api/src/network_monitor/monitor/preparer.rs index 04fdceebefb..50c3be80baa 100644 --- a/nym-api/src/network_monitor/monitor/preparer.rs +++ b/nym-api/src/network_monitor/monitor/preparer.rs @@ -4,10 +4,12 @@ use crate::network_monitor::monitor::sender::GatewayPackets; use crate::network_monitor::test_route::TestRoute; use crate::node_describe_cache::{DescribedNodes, NodeDescriptionTopologyExt}; +use crate::node_status_api::NodeStatusCache; use crate::nym_contract_cache::cache::{CachedRewardedSet, NymContractCache}; use crate::support::caching::cache::SharedCache; use nym_api_requests::legacy::{LegacyGatewayBondWithId, LegacyMixNodeBondWithLayer}; -use nym_api_requests::models::NymNodeDescription; +use nym_api_requests::models::{NodeAnnotation, NymNodeDescription}; +use nym_contracts_common::NaiveFloat; use nym_crypto::asymmetric::{encryption, identity}; use nym_mixnet_contract_common::{LegacyMixLayer, NodeId}; use nym_node_tester_utils::node::TestableNode; @@ -21,7 +23,7 @@ use nym_topology::mix::MixnodeConversionError; use nym_topology::{gateway, mix}; use rand::prelude::SliceRandom; use rand::{rngs::ThreadRng, thread_rng, Rng}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::{self, Display, Formatter}; use std::sync::Arc; use std::time::Duration; @@ -77,6 +79,7 @@ pub(crate) struct PreparedPackets { pub(crate) struct PacketPreparer { contract_cache: NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, /// Number of test packets sent to each node per_node_test_packets: usize, @@ -94,6 +97,7 @@ impl PacketPreparer { pub(crate) fn new( contract_cache: NymContractCache, described_cache: SharedCache, + node_status_cache: NodeStatusCache, per_node_test_packets: usize, ack_key: Arc, self_public_identity: identity::PublicKey, @@ -102,6 +106,7 @@ impl PacketPreparer { PacketPreparer { contract_cache, described_cache, + node_status_cache, per_node_test_packets, ack_key, self_public_identity, @@ -205,20 +210,6 @@ impl PacketPreparer { (mixnodes, gateways) } - async fn filtered_legacy_mixnodes_and_gateways( - &self, - ) -> ( - Vec, - Vec, - ) { - info!("Obtaining network topology..."); - - let mixnodes = self.contract_cache.legacy_mixnodes_filtered_basic().await; - let gateways = self.contract_cache.legacy_gateways_filtered().await; - - (mixnodes, gateways) - } - pub(crate) fn try_parse_mix_bond( &self, bond: &LegacyMixNodeBondWithLayer, @@ -276,58 +267,50 @@ impl PacketPreparer { parse_bond(gateway).map_err(|_| identity) } - fn layered_mixes<'a, R: Rng>( + fn to_legacy_layered_mixes<'a, R: Rng>( &self, rng: &mut R, - blacklist: &mut HashSet, rewarded_set: &CachedRewardedSet, - legacy_mixnodes: Vec, + node_statuses: &HashMap, mixing_nym_nodes: impl Iterator + 'a, - ) -> HashMap> { + ) -> HashMap> { let mut layered_mixes = HashMap::new(); - for mix in legacy_mixnodes { - let layer = mix.layer; - let layer_mixes = layered_mixes.entry(layer).or_insert_with(Vec::new); - let Ok(parsed_node) = self.try_parse_mix_bond(&mix) else { - blacklist.insert(mix.mix_id); - continue; - }; - layer_mixes.push(parsed_node) - } for mixing_nym_node in mixing_nym_nodes { let Some(parsed_node) = self.nym_node_to_legacy_mix(rng, rewarded_set, mixing_nym_node) else { continue; }; + // if the node is not present, default to 0.5 + let weight = node_statuses + .get(&mixing_nym_node.node_id) + .map(|node| node.last_24h_performance.naive_to_f64()) + .unwrap_or(0.5); let layer = parsed_node.layer; let layer_mixes = layered_mixes.entry(layer).or_insert_with(Vec::new); - layer_mixes.push(parsed_node) + layer_mixes.push((parsed_node, weight)) } layered_mixes } - fn all_gateways<'a>( + fn to_legacy_gateway_nodes<'a>( &self, - blacklist: &mut HashSet, - legacy_gateways: Vec, + node_statuses: &HashMap, gateway_capable_nym_nodes: impl Iterator + 'a, - ) -> Vec { + ) -> Vec<(gateway::LegacyNode, f64)> { let mut gateways = Vec::new(); - for gateway in legacy_gateways { - let Ok(parsed_node) = self.try_parse_gateway_bond(&gateway) else { - blacklist.insert(gateway.node_id); - continue; - }; - gateways.push(parsed_node) - } for gateway_capable_node in gateway_capable_nym_nodes { let Some(parsed_node) = self.nym_node_to_legacy_gateway(gateway_capable_node) else { continue; }; - gateways.push(parsed_node) + // if the node is not present, default to 0.5 + let weight = node_statuses + .get(&gateway_capable_node.node_id) + .map(|node| node.last_24h_performance.naive_to_f64()) + .unwrap_or(0.5); + gateways.push((parsed_node, weight)) } gateways @@ -337,15 +320,11 @@ impl PacketPreparer { // if failed to get parsed => onto the blacklist they go // if generated fewer than n, blacklist will be updated by external function with correctly generated // routes so that they wouldn't be reused - pub(crate) async fn prepare_test_routes( - &self, - n: usize, - blacklist: &mut HashSet, - ) -> Option> { - let (legacy_mixnodes, legacy_gateways) = self.filtered_legacy_mixnodes_and_gateways().await; + pub(crate) async fn prepare_test_routes(&self, n: usize) -> Option> { let rewarded_set = self.contract_cache.rewarded_set().await?; let descriptions = self.described_cache.get().await.ok()?; + let statuses = self.node_status_cache.node_annotations().await?; let mixing_nym_nodes = descriptions.mixing_nym_nodes(); // last I checked `gatewaying` wasn't a word : ) @@ -353,15 +332,10 @@ impl PacketPreparer { let mut rng = thread_rng(); - // separate mixes into layers for easier selection - let layered_mixes = self.layered_mixes( - &mut rng, - blacklist, - &rewarded_set, - legacy_mixnodes, - mixing_nym_nodes, - ); - let gateways = self.all_gateways(blacklist, legacy_gateways, gateway_capable_nym_nodes); + // separate mixes into layers for easier selection alongside the selection weights + let layered_mixes = + self.to_legacy_layered_mixes(&mut rng, &rewarded_set, &statuses, mixing_nym_nodes); + let gateways = self.to_legacy_gateway_nodes(&statuses, gateway_capable_nym_nodes); // get all nodes from each layer... let l1 = layered_mixes.get(&LegacyMixLayer::One)?; @@ -369,10 +343,26 @@ impl PacketPreparer { let l3 = layered_mixes.get(&LegacyMixLayer::Three)?; // try to choose n nodes from each of them (+ gateways)... - let rand_l1 = l1.choose_multiple(&mut rng, n).collect::>(); - let rand_l2 = l2.choose_multiple(&mut rng, n).collect::>(); - let rand_l3 = l3.choose_multiple(&mut rng, n).collect::>(); - let rand_gateways = gateways.choose_multiple(&mut rng, n).collect::>(); + let rand_l1 = l1 + .choose_multiple_weighted(&mut rng, n, |item| item.1) + .ok()? + .map(|node| node.0.clone()) + .collect::>(); + let rand_l2 = l2 + .choose_multiple_weighted(&mut rng, n, |item| item.1) + .ok()? + .map(|node| node.0.clone()) + .collect::>(); + let rand_l3 = l3 + .choose_multiple_weighted(&mut rng, n, |item| item.1) + .ok()? + .map(|node| node.0.clone()) + .collect::>(); + let rand_gateways = gateways + .choose_multiple_weighted(&mut rng, n, |item| item.1) + .ok()? + .map(|node| node.0.clone()) + .collect::>(); // the unwrap on `min()` is fine as we know the iterator is not empty let most_available = *[ diff --git a/nym-api/src/nym_contract_cache/cache/mod.rs b/nym-api/src/nym_contract_cache/cache/mod.rs index eeb9b2a7c28..ed5db5cb3c9 100644 --- a/nym-api/src/nym_contract_cache/cache/mod.rs +++ b/nym-api/src/nym_contract_cache/cache/mod.rs @@ -219,14 +219,6 @@ impl NymContractCache { .into_inner() } - pub async fn legacy_mixnodes_filtered_basic(&self) -> Vec { - self.legacy_mixnodes_filtered() - .await - .into_iter() - .map(|bond| bond.bond_information) - .collect() - } - pub async fn legacy_mixnodes_all_basic(&self) -> Vec { self.legacy_mixnodes_all() .await diff --git a/nym-api/src/support/cli/run.rs b/nym-api/src/support/cli/run.rs index 6436f19be15..9f3feb0d1f7 100644 --- a/nym-api/src/support/cli/run.rs +++ b/nym-api/src/support/cli/run.rs @@ -259,6 +259,7 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result