Skip to content

Commit

Permalink
ignore legacy nodes for test route selection and bias selection with …
Browse files Browse the repository at this point in the history
…existing score
  • Loading branch information
jstuczyn committed Dec 4, 2024
1 parent 29ea462 commit df4ba70
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 70 deletions.
11 changes: 11 additions & 0 deletions nym-api/src/network_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::network_monitor::monitor::sender::PacketSender;
use crate::network_monitor::monitor::summary_producer::SummaryProducer;
use crate::network_monitor::monitor::Monitor;
use crate::node_describe_cache::DescribedNodes;
use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::storage::NymApiStorage;
use crate::support::caching::cache::SharedCache;
Expand All @@ -38,6 +39,7 @@ pub(crate) fn setup<'a>(
config: &'a config::NetworkMonitor,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
storage: &NymApiStorage,
nyxd_client: nyxd::Client,
) -> NetworkMonitorBuilder<'a> {
Expand All @@ -47,6 +49,7 @@ pub(crate) fn setup<'a>(
storage.to_owned(),
nym_contract_cache.clone(),
described_cache,
node_status_cache,
)
}

Expand All @@ -56,6 +59,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> {
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
}

impl<'a> NetworkMonitorBuilder<'a> {
Expand All @@ -65,13 +69,15 @@ impl<'a> NetworkMonitorBuilder<'a> {
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
) -> Self {
NetworkMonitorBuilder {
config,
nyxd_client,
node_status_storage,
contract_cache,
described_cache,
node_status_cache,
}
}

Expand All @@ -94,6 +100,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
let packet_preparer = new_packet_preparer(
self.contract_cache,
self.described_cache,
self.node_status_cache,
self.config.debug.per_node_test_packets,
Arc::clone(&ack_key),
*identity_keypair.public_key(),
Expand Down Expand Up @@ -169,6 +176,7 @@ impl<R: MessageReceiver + Send + 'static> NetworkMonitorRunnables<R> {
fn new_packet_preparer(
contract_cache: NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
per_node_test_packets: usize,
ack_key: Arc<AckKey>,
self_public_identity: identity::PublicKey,
Expand All @@ -177,6 +185,7 @@ fn new_packet_preparer(
PacketPreparer::new(
contract_cache,
described_cache,
node_status_cache,
per_node_test_packets,
ack_key,
self_public_identity,
Expand Down Expand Up @@ -231,6 +240,7 @@ pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
config: &config::NetworkMonitor,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
storage: &NymApiStorage,
nyxd_client: nyxd::Client,
shutdown: &TaskManager,
Expand All @@ -239,6 +249,7 @@ pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
config,
nym_contract_cache,
described_cache,
node_status_cache,
storage,
nyxd_client,
);
Expand Down
2 changes: 1 addition & 1 deletion nym-api/src/network_monitor/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
// the actual target
let candidates = match self
.packet_preparer
.prepare_test_routes(remaining * 2, &mut blacklist)
.prepare_test_routes(remaining * 2)
.await
{
Some(candidates) => candidates,
Expand Down
112 changes: 51 additions & 61 deletions nym-api/src/network_monitor/monitor/preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
use crate::network_monitor::monitor::sender::GatewayPackets;
use crate::network_monitor::test_route::TestRoute;
use crate::node_describe_cache::{DescribedNodes, NodeDescriptionTopologyExt};
use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::{CachedRewardedSet, NymContractCache};
use crate::support::caching::cache::SharedCache;
use nym_api_requests::legacy::{LegacyGatewayBondWithId, LegacyMixNodeBondWithLayer};
use nym_api_requests::models::NymNodeDescription;
use nym_api_requests::models::{NodeAnnotation, NymNodeDescription};
use nym_contracts_common::NaiveFloat;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_contract_common::{LegacyMixLayer, NodeId};
use nym_node_tester_utils::node::TestableNode;
Expand All @@ -21,7 +23,7 @@ use nym_topology::mix::MixnodeConversionError;
use nym_topology::{gateway, mix};
use rand::prelude::SliceRandom;
use rand::{rngs::ThreadRng, thread_rng, Rng};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -77,6 +79,7 @@ pub(crate) struct PreparedPackets {
pub(crate) struct PacketPreparer {
contract_cache: NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,

/// Number of test packets sent to each node
per_node_test_packets: usize,
Expand All @@ -94,6 +97,7 @@ impl PacketPreparer {
pub(crate) fn new(
contract_cache: NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
per_node_test_packets: usize,
ack_key: Arc<AckKey>,
self_public_identity: identity::PublicKey,
Expand All @@ -102,6 +106,7 @@ impl PacketPreparer {
PacketPreparer {
contract_cache,
described_cache,
node_status_cache,
per_node_test_packets,
ack_key,
self_public_identity,
Expand Down Expand Up @@ -205,20 +210,6 @@ impl PacketPreparer {
(mixnodes, gateways)
}

async fn filtered_legacy_mixnodes_and_gateways(
&self,
) -> (
Vec<LegacyMixNodeBondWithLayer>,
Vec<LegacyGatewayBondWithId>,
) {
info!("Obtaining network topology...");

let mixnodes = self.contract_cache.legacy_mixnodes_filtered_basic().await;
let gateways = self.contract_cache.legacy_gateways_filtered().await;

(mixnodes, gateways)
}

pub(crate) fn try_parse_mix_bond(
&self,
bond: &LegacyMixNodeBondWithLayer,
Expand Down Expand Up @@ -276,58 +267,50 @@ impl PacketPreparer {
parse_bond(gateway).map_err(|_| identity)
}

fn layered_mixes<'a, R: Rng>(
fn to_legacy_layered_mixes<'a, R: Rng>(
&self,
rng: &mut R,
blacklist: &mut HashSet<NodeId>,
rewarded_set: &CachedRewardedSet,
legacy_mixnodes: Vec<LegacyMixNodeBondWithLayer>,
node_statuses: &HashMap<NodeId, NodeAnnotation>,
mixing_nym_nodes: impl Iterator<Item = &'a NymNodeDescription> + 'a,
) -> HashMap<LegacyMixLayer, Vec<mix::LegacyNode>> {
) -> HashMap<LegacyMixLayer, Vec<(mix::LegacyNode, f64)>> {
let mut layered_mixes = HashMap::new();
for mix in legacy_mixnodes {
let layer = mix.layer;
let layer_mixes = layered_mixes.entry(layer).or_insert_with(Vec::new);
let Ok(parsed_node) = self.try_parse_mix_bond(&mix) else {
blacklist.insert(mix.mix_id);
continue;
};
layer_mixes.push(parsed_node)
}

for mixing_nym_node in mixing_nym_nodes {
let Some(parsed_node) = self.nym_node_to_legacy_mix(rng, rewarded_set, mixing_nym_node)
else {
continue;
};
// if the node is not present, default to 0.5
let weight = node_statuses
.get(&mixing_nym_node.node_id)
.map(|node| node.last_24h_performance.naive_to_f64())
.unwrap_or(0.5);
let layer = parsed_node.layer;
let layer_mixes = layered_mixes.entry(layer).or_insert_with(Vec::new);
layer_mixes.push(parsed_node)
layer_mixes.push((parsed_node, weight))
}

layered_mixes
}

fn all_gateways<'a>(
fn to_legacy_gateway_nodes<'a>(
&self,
blacklist: &mut HashSet<NodeId>,
legacy_gateways: Vec<LegacyGatewayBondWithId>,
node_statuses: &HashMap<NodeId, NodeAnnotation>,
gateway_capable_nym_nodes: impl Iterator<Item = &'a NymNodeDescription> + 'a,
) -> Vec<gateway::LegacyNode> {
) -> Vec<(gateway::LegacyNode, f64)> {
let mut gateways = Vec::new();
for gateway in legacy_gateways {
let Ok(parsed_node) = self.try_parse_gateway_bond(&gateway) else {
blacklist.insert(gateway.node_id);
continue;
};
gateways.push(parsed_node)
}

for gateway_capable_node in gateway_capable_nym_nodes {
let Some(parsed_node) = self.nym_node_to_legacy_gateway(gateway_capable_node) else {
continue;
};
gateways.push(parsed_node)
// if the node is not present, default to 0.5
let weight = node_statuses
.get(&gateway_capable_node.node_id)
.map(|node| node.last_24h_performance.naive_to_f64())
.unwrap_or(0.5);
gateways.push((parsed_node, weight))
}

gateways
Expand All @@ -337,42 +320,49 @@ impl PacketPreparer {
// if failed to get parsed => onto the blacklist they go
// if generated fewer than n, blacklist will be updated by external function with correctly generated
// routes so that they wouldn't be reused
pub(crate) async fn prepare_test_routes(
&self,
n: usize,
blacklist: &mut HashSet<NodeId>,
) -> Option<Vec<TestRoute>> {
let (legacy_mixnodes, legacy_gateways) = self.filtered_legacy_mixnodes_and_gateways().await;
pub(crate) async fn prepare_test_routes(&self, n: usize) -> Option<Vec<TestRoute>> {
let rewarded_set = self.contract_cache.rewarded_set().await?;

let descriptions = self.described_cache.get().await.ok()?;
let statuses = self.node_status_cache.node_annotations().await?;

let mixing_nym_nodes = descriptions.mixing_nym_nodes();
// last I checked `gatewaying` wasn't a word : )
let gateway_capable_nym_nodes = descriptions.entry_capable_nym_nodes();

let mut rng = thread_rng();

// separate mixes into layers for easier selection
let layered_mixes = self.layered_mixes(
&mut rng,
blacklist,
&rewarded_set,
legacy_mixnodes,
mixing_nym_nodes,
);
let gateways = self.all_gateways(blacklist, legacy_gateways, gateway_capable_nym_nodes);
// separate mixes into layers for easier selection alongside the selection weights
let layered_mixes =
self.to_legacy_layered_mixes(&mut rng, &rewarded_set, &statuses, mixing_nym_nodes);
let gateways = self.to_legacy_gateway_nodes(&statuses, gateway_capable_nym_nodes);

// get all nodes from each layer...
let l1 = layered_mixes.get(&LegacyMixLayer::One)?;
let l2 = layered_mixes.get(&LegacyMixLayer::Two)?;
let l3 = layered_mixes.get(&LegacyMixLayer::Three)?;

// try to choose n nodes from each of them (+ gateways)...
let rand_l1 = l1.choose_multiple(&mut rng, n).collect::<Vec<_>>();
let rand_l2 = l2.choose_multiple(&mut rng, n).collect::<Vec<_>>();
let rand_l3 = l3.choose_multiple(&mut rng, n).collect::<Vec<_>>();
let rand_gateways = gateways.choose_multiple(&mut rng, n).collect::<Vec<_>>();
let rand_l1 = l1
.choose_multiple_weighted(&mut rng, n, |item| item.1)
.ok()?
.map(|node| node.0.clone())
.collect::<Vec<_>>();
let rand_l2 = l2
.choose_multiple_weighted(&mut rng, n, |item| item.1)
.ok()?
.map(|node| node.0.clone())
.collect::<Vec<_>>();
let rand_l3 = l3
.choose_multiple_weighted(&mut rng, n, |item| item.1)
.ok()?
.map(|node| node.0.clone())
.collect::<Vec<_>>();
let rand_gateways = gateways
.choose_multiple_weighted(&mut rng, n, |item| item.1)
.ok()?
.map(|node| node.0.clone())
.collect::<Vec<_>>();

// the unwrap on `min()` is fine as we know the iterator is not empty
let most_available = *[
Expand Down
8 changes: 0 additions & 8 deletions nym-api/src/nym_contract_cache/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,6 @@ impl NymContractCache {
.into_inner()
}

pub async fn legacy_mixnodes_filtered_basic(&self) -> Vec<LegacyMixNodeBondWithLayer> {
self.legacy_mixnodes_filtered()
.await
.into_iter()
.map(|bond| bond.bond_information)
.collect()
}

pub async fn legacy_mixnodes_all_basic(&self) -> Vec<LegacyMixNodeBondWithLayer> {
self.legacy_mixnodes_all()
.await
Expand Down
1 change: 1 addition & 0 deletions nym-api/src/support/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
&config.network_monitor,
&nym_contract_cache_state,
described_nodes_cache.clone(),
node_status_cache_state.clone(),
&storage,
nyxd_client.clone(),
&task_manager,
Expand Down

0 comments on commit df4ba70

Please sign in to comment.