From 8c8b7d71d0c979c82ef1a2ae053c6057444ef093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Wed, 24 Aug 2022 14:29:21 +0200 Subject: [PATCH] validator-api: create node status cache with selection probabilities (#1547) * validator-api: create node status cache with selection probabilities Create a node status cache to complement the contract cache. Initially we store the simulated active set selection probabilities. * validator-api: add validator cache watch channel * changelog: add note * validator-api: clippy fixes * validator-api: fix clippy * validator-api: additional fields to inclusion probabilities response * selection chance: revert back to 3 buckets * selection chance: revert buckets again * rustfmt * validator-api: remove the old get_mixnode_inclusion_probability * node-status-cache: return error when refreshing * inclusion-simulator: cap on wall clock time * node status cache: tidy --- CHANGELOG.md | 1 + Cargo.lock | 7 +- common/inclusion-probability/Cargo.toml | 1 + common/inclusion-probability/src/error.rs | 4 +- common/inclusion-probability/src/lib.rs | 155 +++++++++-- .../src/pages/settings/system-variables.tsx | 4 + validator-api/Cargo.toml | 15 +- validator-api/src/contract_cache/mod.rs | 28 +- validator-api/src/main.rs | 36 ++- validator-api/src/node_status_api/cache.rs | 244 ++++++++++++++++++ validator-api/src/node_status_api/mod.rs | 5 + validator-api/src/node_status_api/routes.rs | 78 +++--- .../validator-api-requests/src/models.rs | 31 ++- 13 files changed, 532 insertions(+), 77 deletions(-) create mode 100644 validator-api/src/node_status_api/cache.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 4602044f3eb..b6adc1c8096 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https:// - validator-api: add Swagger to document the REST API ([#1249]). - validator-api: Added new endpoints for coconut spending flow and communications with coconut & multisig contracts ([#1261]) - validator-api: add `uptime`, `estimated_operator_apy`, `estimated_delegators_apy` to `/mixnodes/detailed` endpoint ([#1393]) +- validator-api: add node info cache storing simulated active set inclusion probabilities - network-statistics: a new mixnet service that aggregates and exposes anonymized data about mixnet services ([#1328]) - mixnode: Added basic mixnode hardware reporting to the HTTP API ([#1308]). - validator-api: endpoint, in coconut mode, for returning the validator-api cosmos address ([#1404]).
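A minimal usage sketch of the simulator this patch introduces, before the file-by-file changes. Function and field names come from the diff below; the stake values and the binary scaffolding are purely illustrative, and error handling is elided:

    use std::time::Duration;

    use inclusion_probability::simulate_selection_probability_mixnodes;

    fn main() {
        // Illustrative total bonds (stake) for five mixnodes.
        let stakes: Vec<u128> = vec![100, 100, 3_000, 342, 3_498_234];

        let result = simulate_selection_probability_mixnodes(
            &stakes,
            4,                       // active set size
            3,                       // reserve (standby) set size
            5_000,                   // cap on the number of Monte Carlo samples
            Duration::from_secs(15), // cap on wall clock time
        )
        .expect("the stake list is non-empty");

        for (i, (active, reserve)) in result
            .active_set_probability
            .iter()
            .zip(result.reserve_set_probability.iter())
            .enumerate()
        {
            println!("node {i}: active {active:.3}, reserve {reserve:.3}");
        }
        println!("{} samples in {:?}", result.samples, result.time);
    }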
diff --git a/Cargo.lock b/Cargo.lock index 62fd3c5c806..908c8235667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2500,6 +2500,7 @@ dependencies = [ name = "inclusion-probability" version = "0.1.0" dependencies = [ + "log", "rand 0.8.5", "thiserror", ] @@ -2751,9 +2752,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if 1.0.0", ] @@ -3314,6 +3315,7 @@ dependencies = [ "gateway-client", "getset", "humantime-serde", + "inclusion-probability", "log", "mixnet-contract-common", "multisig-contract-common", @@ -3333,6 +3335,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "tap", "task", "thiserror", "time 0.3.9", diff --git a/common/inclusion-probability/Cargo.toml b/common/inclusion-probability/Cargo.toml index 0df3bfe4e70..73eedb75a08 100644 --- a/common/inclusion-probability/Cargo.toml +++ b/common/inclusion-probability/Cargo.toml @@ -6,5 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +log = "0.4.17" rand = "0.8.5" thiserror = "1.0.32" diff --git a/common/inclusion-probability/src/error.rs b/common/inclusion-probability/src/error.rs index 9e9908a5857..72a02a2a6a5 100644 --- a/common/inclusion-probability/src/error.rs +++ b/common/inclusion-probability/src/error.rs @@ -4,6 +4,8 @@ pub enum Error { EmptyListCumulStake, #[error("Sample point was unexpectedly out of bounds")] SamplePointOutOfBounds, - #[error("Norm computation failed on different size arrarys")] + #[error("Norm computation failed on different size arrays")] NormDifferenceSizeArrays, + #[error("Computed probabilities are fewer than input number of nodes")] + ResultsShorterThanInput, } diff --git a/common/inclusion-probability/src/lib.rs b/common/inclusion-probability/src/lib.rs index b392ab98035..44923ca5af1 100644 --- a/common/inclusion-probability/src/lib.rs +++ b/common/inclusion-probability/src/lib.rs @@ -1,26 +1,45 @@ //! 
Active set inclusion probability simulator +use std::time::{Duration, Instant}; + use error::Error; mod error; const TOLERANCE_L2_NORM: f64 = 1e-4; -const TOLERANCE_MAX_NORM: f64 = 1e-3; +const TOLERANCE_MAX_NORM: f64 = 1e-4; pub struct SelectionProbability { pub active_set_probability: Vec<f64>, pub reserve_set_probability: Vec<f64>, - pub samples: u32, + pub samples: u64, + pub time: Duration, pub delta_l2: f64, pub delta_max: f64, } pub fn simulate_selection_probability_mixnodes( - list_stake_for_mixnodes: &[u64], + list_stake_for_mixnodes: &[u128], active_set_size: usize, reserve_set_size: usize, - max_samples: u32, + max_samples: u64, + max_time: Duration, ) -> Result<SelectionProbability, Error> { + log::trace!("Simulating mixnode active set selection probability"); + + // If the active set size is at least the number of bonded mixnodes, every node has a 100% + // chance of being included and we don't have to go through with the simulation + if list_stake_for_mixnodes.len() <= active_set_size { + return Ok(SelectionProbability { + active_set_probability: vec![1.0; list_stake_for_mixnodes.len()], + reserve_set_probability: vec![0.0; list_stake_for_mixnodes.len()], + samples: 0, + time: Duration::ZERO, + delta_l2: 0.0, + delta_max: 0.0, + }); + } + // Total number of existing (registered) nodes let num_mixnodes = list_stake_for_mixnodes.len(); @@ -37,6 +56,9 @@ pub fn simulate_selection_probability_mixnodes( let mut delta_max; let mut rng = rand::thread_rng(); + // Make sure we bound the time we allow it to run + let start_time = Instant::now(); + loop { samples += 1; let mut sample_active_mixnodes = Vec::new(); @@ -46,7 +68,9 @@ pub fn simulate_selection_probability_mixnodes( let active_set_probability_previous = active_set_probability.clone(); // Select the active nodes for the epoch (hour) - while sample_active_mixnodes.len() < active_set_size { + while sample_active_mixnodes.len() < active_set_size + && sample_active_mixnodes.len() < list_cumul_temp.len() + { let candidate = sample_candidate(&list_cumul_temp, &mut rng)?; if !sample_active_mixnodes.contains(&candidate) { @@ -56,7 +80,9 @@ pub fn simulate_selection_probability_mixnodes( } // Select the reserve nodes for the epoch (hour) - while sample_reserve_mixnodes.len() < reserve_set_size { + while sample_reserve_mixnodes.len() < reserve_set_size + && sample_reserve_mixnodes.len() + sample_active_mixnodes.len() < list_cumul_temp.len() + { let candidate = sample_candidate(&list_cumul_temp, &mut rng)?; if !sample_reserve_mixnodes.contains(&candidate) @@ -78,35 +104,49 @@ pub fn simulate_selection_probability_mixnodes( // Convergence criteria only on active set. // We divide by samples to get the average; that is not really part of the delta // computation. - delta_l2 = l2_diff(&active_set_probability, &active_set_probability_previous)? - / f64::from(samples); - delta_max = max_diff(&active_set_probability, &active_set_probability_previous)? - / f64::from(samples); + delta_l2 = + l2_diff(&active_set_probability, &active_set_probability_previous)? / (samples as f64); + delta_max = + max_diff(&active_set_probability, &active_set_probability_previous)? / (samples as f64); if samples > 10 && delta_l2 < TOLERANCE_L2_NORM && delta_max < TOLERANCE_MAX_NORM || samples >= max_samples { break; } + + // Stop if we run out of time + if start_time.elapsed() > max_time { + log::debug!("Simulation ran out of time, stopping"); + break; + } } + // Divide occurrences by the number of samples once we're done to get the probabilities.
active_set_probability .iter_mut() - .for_each(|x| *x /= f64::from(samples)); + .for_each(|x| *x /= samples as f64); reserve_set_probability .iter_mut() - .for_each(|x| *x /= f64::from(samples)); + .for_each(|x| *x /= samples as f64); + + // Some sanity checks of the output + if active_set_probability.len() != num_mixnodes || reserve_set_probability.len() != num_mixnodes + { + return Err(Error::ResultsShorterThanInput); + } Ok(SelectionProbability { active_set_probability, reserve_set_probability, samples, + time: start_time.elapsed(), delta_l2, delta_max, }) } // Compute the cumulative sum -fn cumul_sum<'a>(list: impl IntoIterator<Item = &'a u64>) -> Vec<u64> { +fn cumul_sum<'a>(list: impl IntoIterator<Item = &'a u128>) -> Vec<u128> { let mut list_cumul = Vec::new(); let mut cumul = 0; for entry in list { @@ -116,7 +156,7 @@ fn cumul_sum<'a>(list: impl IntoIterator<Item = &'a u128>) -> Vec<u128> { list_cumul } -fn sample_candidate(list_cumul: &[u64], rng: &mut rand::rngs::ThreadRng) -> Result<usize, Error> { +fn sample_candidate(list_cumul: &[u128], rng: &mut rand::rngs::ThreadRng) -> Result<usize, Error> { use rand::distributions::{Distribution, Uniform}; let uniform = Uniform::from(0..*list_cumul.last().ok_or(Error::EmptyListCumulStake)?); let r = uniform.sample(rng); @@ -132,7 +172,7 @@ fn sample_candidate(list_cumul: &[u128], rng: &mut rand::rngs::ThreadRng) -> Resu } // Update list of cumulative stake to reflect eliminating the picked node -fn remove_mixnode_from_cumul_stake(candidate: usize, list_cumul_stake: &mut [u64]) { +fn remove_mixnode_from_cumul_stake(candidate: usize, list_cumul_stake: &mut [u128]) { let prob_candidate = if candidate == 0 { list_cumul_stake[0] } else { @@ -212,11 +252,13 @@ mod tests { ]; let max_samples = 100_000; + let max_time = Duration::from_secs(10); let SelectionProbability { active_set_probability, reserve_set_probability, samples, + time: _, delta_l2, delta_max, } = simulate_selection_probability_mixnodes( @@ -224,6 +266,7 @@ mod tests { active_set_size, standby_set_size, max_samples, + max_time, ) .unwrap(); @@ -275,4 +318,86 @@ mod tests { assert!(delta_l2 < TOLERANCE_L2_NORM); assert!(delta_max < TOLERANCE_MAX_NORM); } + + #[test] + fn fewer_nodes_than_active_set_size() { + let active_set_size = 10; + let standby_set_size = 3; + let list_mix = vec![100, 100, 3000]; + let max_samples = 100_000; + let max_time = Duration::from_secs(10); + + let SelectionProbability { + active_set_probability, + reserve_set_probability, + samples, + time: _, + delta_l2, + delta_max, + } = simulate_selection_probability_mixnodes( + &list_mix, + active_set_size, + standby_set_size, + max_samples, + max_time, + ) + .unwrap(); + + // These values come from running the Python simulator for a very long time + let expected_active_set_probability = vec![1.0, 1.0, 1.0]; + let expected_reserve_set_probability = vec![0.0, 0.0, 0.0]; + assert!( + max_diff(&active_set_probability, &expected_active_set_probability).unwrap() + < 1e1 * f64::EPSILON + ); + assert!( + max_diff(&reserve_set_probability, &expected_reserve_set_probability).unwrap() + < 1e1 * f64::EPSILON + ); + + // The early return for this case means the simulation takes no samples at all + assert_eq!(samples, 0); + assert!(delta_l2 < f64::EPSILON); + assert!(delta_max < f64::EPSILON); + } + + #[test] + fn fewer_nodes_than_reward_set_size() { + let active_set_size = 4; + let standby_set_size = 3; + let list_mix = vec![100, 100, 3000, 342, 3_498_234]; + let max_samples = 100_000_000; + let max_time = Duration::from_secs(10); + + let SelectionProbability { + active_set_probability, + reserve_set_probability, + samples, + time: _, + delta_l2, + delta_max, + } = simulate_selection_probability_mixnodes( + &list_mix, + active_set_size, + standby_set_size, + max_samples, + max_time, + ) + .unwrap(); + + // These values come from running the Python simulator for a very long time + let expected_active_set_probability = vec![0.546, 0.538, 0.999, 0.915, 1.0]; + let expected_reserve_set_probability = vec![0.453, 0.461, 0.0005, 0.084, 0.0]; + assert!( + max_diff(&active_set_probability, &expected_active_set_probability).unwrap() < 1e-2, + ); + assert!( + max_diff(&reserve_set_probability, &expected_reserve_set_probability).unwrap() < 1e-2, + ); + + // We converge around 20_000, add another 500 for some slack due to random values + assert!(samples < 20_500); + assert!(delta_l2 < TOLERANCE_L2_NORM); + assert!(delta_max < TOLERANCE_MAX_NORM); + } }
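The sampling step above turns stake into a cumulative sum, draws a uniform point to pick a node, and then subtracts the picked node's stake so it cannot be drawn twice. A compact standalone sketch of that technique follows; it mirrors the helpers in the diff but is not the crate's API, and it assumes rand 0.8:

    use rand::Rng;

    fn main() {
        let stakes: Vec<u128> = vec![100, 300, 600];

        // Cumulative stake: [100, 400, 1000].
        let mut cumul: Vec<u128> = Vec::new();
        let mut acc: u128 = 0;
        for s in &stakes {
            acc += s;
            cumul.push(acc);
        }

        // Draw a uniform point in [0, total) and take the first cumulative entry above it.
        let r: u128 = rand::thread_rng().gen_range(0..*cumul.last().unwrap());
        let picked = cumul.iter().position(|&c| r < c).unwrap();
        println!("picked node {picked}");

        // Subtract the picked node's stake from its own and all later entries, so its
        // interval becomes empty and the node cannot be selected again.
        let picked_stake = if picked == 0 {
            cumul[0]
        } else {
            cumul[picked] - cumul[picked - 1]
        };
        for c in cumul.iter_mut().skip(picked) {
            *c -= picked_stake;
        }
        println!("cumulative stake after removal: {cumul:?}");
    }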
validator-client = { path="../common/client-libs/validator-client", features = ["nymd-client"] } version-checker = { path="../common/version-checker" } -coconut-interface = { path = "../common/coconut-interface", optional = true } -credentials = { path = "../common/credentials", optional = true } -credential-storage = { path = "../common/credential-storage" } -# validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable" -console-subscriber = { version = "0.1.1", optional = true} -cfg-if = "1.0" [features] coconut = ["coconut-interface", "credentials", "gateway-client/coconut", "credentials/coconut", "validator-api-requests/coconut", "nymcoconut"] diff --git a/validator-api/src/contract_cache/mod.rs b/validator-api/src/contract_cache/mod.rs index 593fe63b88d..a7f2b91b0bb 100644 --- a/validator-api/src/contract_cache/mod.rs +++ b/validator-api/src/contract_cache/mod.rs @@ -23,7 +23,7 @@ use std::collections::HashSet; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use tokio::time; use validator_api_requests::models::{MixNodeBondAnnotated, MixnodeStatus}; use validator_client::nymd::CosmWasmClient; @@ -31,6 +31,13 @@ use validator_client::nymd::CosmWasmClient; pub(crate) mod reward_estimate; pub(crate) mod routes; +// The cache can emit notifications to listeners about the current state +#[derive(Debug, PartialEq, Eq)] +pub enum CacheNotification { + Start, + Updated, +} + pub struct ValidatorCacheRefresher<C> { nymd_client: Client<C>, cache: ValidatorCache, @@ -38,6 +45,9 @@ pub struct ValidatorCacheRefresher<C> { // Readonly: some of the quantities cached depend on values from the storage. storage: Option<ValidatorApiStorage>, + + // Notify listeners that the cache has been updated + update_notifier: watch::Sender<CacheNotification>, } #[derive(Clone)] @@ -73,14 +83,14 @@ pub struct Cache<T> { } impl<T: Clone> Cache<T> { - fn new(value: T) -> Self { + pub(super) fn new(value: T) -> Self { Cache { value, as_at: current_unix_timestamp(), } } - fn update(&mut self, value: T) { + pub(super) fn update(&mut self, value: T) { self.value = value; self.as_at = current_unix_timestamp() } @@ -101,11 +111,13 @@ impl<C> ValidatorCacheRefresher<C> { cache: ValidatorCache, storage: Option<ValidatorApiStorage>, ) -> Self { + let (tx, _) = watch::channel(CacheNotification::Start); ValidatorCacheRefresher { nymd_client, cache, caching_interval, storage, + update_notifier: tx, } } @@ -117,6 +129,10 @@ impl<C> ValidatorCacheRefresher<C> { .ok() } + pub fn subscribe(&self) -> watch::Receiver<CacheNotification> { + self.update_notifier.subscribe() + } + async fn annotate_bond_with_details( &self, mixnodes: Vec<MixNodeBond>, @@ -250,6 +266,10 @@ impl<C> ValidatorCacheRefresher<C> { ) .await; + if let Err(err) = self.update_notifier.send(CacheNotification::Updated) { + warn!("Failed to notify listeners of validator cache refresh: {}", err); + } + Ok(()) } @@ -271,7 +291,7 @@ impl<C> ValidatorCacheRefresher<C> { } } _ = shutdown.recv() => { - trace!("UpdateHandler: Received shutdown"); + trace!("ValidatorCacheRefresher: Received shutdown"); } } }
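The watch channel added above keeps only the latest value and hands every listener its own receiver, which is exactly what a "cache was updated" signal needs. A self-contained sketch of the pattern, using tokio's watch API; the enum mirrors CacheNotification from the diff, and the rest of the scaffolding is illustrative:

    use tokio::sync::watch;

    #[derive(Debug, PartialEq, Eq)]
    enum CacheNotification {
        Start,
        Updated,
    }

    #[tokio::main]
    async fn main() {
        // The refresher owns the sender; subscribe() hands out receivers.
        let (tx, mut rx) = watch::channel(CacheNotification::Start);

        tokio::spawn(async move {
            // send() only fails when every receiver has been dropped, which is
            // why the refresher merely logs a warning on error.
            if tx.send(CacheNotification::Updated).is_err() {
                eprintln!("no listeners left");
            }
        });

        // changed() resolves once a value newer than the last seen one exists.
        if rx.changed().await.is_ok() {
            assert_eq!(*rx.borrow(), CacheNotification::Updated);
            println!("cache updated");
        }
    }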
diff --git a/validator-api/src/main.rs b/validator-api/src/main.rs index 6fe0ca07ae8..dd4ec49172c 100644 --- a/validator-api/src/main.rs +++ b/validator-api/src/main.rs @@ -19,6 +19,7 @@ use anyhow::Result; use clap::{crate_version, App, Arg, ArgMatches}; use contract_cache::ValidatorCache; use log::{info, warn}; +use node_status_api::NodeStatusCache; use okapi::openapi3::OpenApi; use rocket::fairing::AdHoc; use rocket::http::Method; @@ -470,7 +471,8 @@ async fn setup_rocket( .mount("/swagger", make_swagger_ui(&swagger::get_docs())) .attach(setup_cors()?) .attach(setup_liftoff_notify(liftoff_notify)) - .attach(ValidatorCache::stage()); + .attach(ValidatorCache::stage()) + .attach(NodeStatusCache::stage()); // This is not a very nice approach. A lazy value would be more suitable, but that's still // a nightly feature: https://github.com/rust-lang/rust/issues/74465 @@ -579,10 +581,11 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> { let monitor_builder = setup_network_monitor(&config, system_version, &rocket); let validator_cache = rocket.state::<ValidatorCache>().unwrap().clone(); + let node_status_cache = rocket.state::<NodeStatusCache>().unwrap().clone(); // if network monitor is disabled, we're not going to be sending any rewarding hence // we're not starting signing client - if config.get_network_monitor_enabled() { + let validator_cache_listener = if config.get_network_monitor_enabled() { // Main storage let storage = rocket.state::<ValidatorApiStorage>().unwrap().clone(); @@ -592,13 +595,14 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> { let shutdown_listener = shutdown.subscribe(); tokio::spawn(async move { uptime_updater.run(shutdown_listener).await }); - // spawn the cache refresher + // spawn the validator cache refresher let validator_cache_refresher = ValidatorCacheRefresher::new( signing_nymd_client.clone(), config.get_caching_interval(), validator_cache.clone(), Some(storage.clone()), ); + let validator_cache_listener = validator_cache_refresher.subscribe(); let shutdown_listener = shutdown.subscribe(); tokio::spawn(async move { validator_cache_refresher.run(shutdown_listener).await }); @@ -606,19 +610,37 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> { let mut rewarded_set_updater = RewardedSetUpdater::new(signing_nymd_client, validator_cache.clone(), storage).await?; tokio::spawn(async move { rewarded_set_updater.run().await.unwrap() }); + + validator_cache_listener } else { + // Spawn the validator cache refresher. + // When the network monitor is not enabled, we spawn the validator cache refresher task + // with just a nymd client, in contrast to a signing client. let nymd_client = Client::new_query(&config); let validator_cache_refresher = ValidatorCacheRefresher::new( nymd_client, config.get_caching_interval(), - validator_cache, + validator_cache.clone(), None, ); - + let validator_cache_listener = validator_cache_refresher.subscribe(); let shutdown_listener = shutdown.subscribe(); - // spawn our cacher tokio::spawn(async move { validator_cache_refresher.run(shutdown_listener).await }); - } + + validator_cache_listener + }; + + // Spawn the node status cache refresher. + // It is primarily refreshed in sync with the validator cache; however, we provide a fallback + // caching interval that is twice the validator cache's. + let mut node_status_cache_refresher = node_status_api::NodeStatusCacheRefresher::new( + node_status_cache, + validator_cache, + validator_cache_listener, + config.get_caching_interval().saturating_mul(2), + ); + let shutdown_listener = shutdown.subscribe(); + tokio::spawn(async move { node_status_cache_refresher.run(shutdown_listener).await }); // launch the rocket! // Rocket handles shutdown on its own, but its shutdown handling should be incorporated
diff --git a/validator-api/src/node_status_api/cache.rs b/validator-api/src/node_status_api/cache.rs new file mode 100644 index 00000000000..07ebc19ad8a --- /dev/null +++ b/validator-api/src/node_status_api/cache.rs @@ -0,0 +1,244 @@ +// Copyright 2022 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use rocket::fairing::AdHoc; +use serde::Serialize; +use tap::TapFallible; +use tokio::{ + sync::{watch, RwLock}, + time, +}; + +use std::{sync::Arc, time::Duration}; + +use mixnet_contract_common::{reward_params::EpochRewardParams, MixNodeBond}; +use task::ShutdownListener; +use validator_api_requests::models::InclusionProbability; + +use crate::contract_cache::{Cache, CacheNotification, ValidatorCache}; + +const CACHE_TIMEOUT_MS: u64 = 100; +const MAX_SIMULATION_SAMPLES: u64 = 5000; +const MAX_SIMULATION_TIME_SEC: u64 = 15; + +enum NodeStatusCacheError { + SimulationFailed, +} + +// A node status cache suitable for caching values computed in one sweep, such as active set +// inclusion probabilities that are computed for all mixnodes at the same time. +// +// The cache can be triggered to update on contract cache changes, and/or periodically on a timer. +#[derive(Clone)] +pub struct NodeStatusCache { + inner: Arc<RwLock<NodeStatusCacheInner>>, +} + +struct NodeStatusCacheInner { + inclusion_probabilities: Cache<InclusionProbabilities>, +} + +#[derive(Clone, Default, Serialize, schemars::JsonSchema)] +pub(crate) struct InclusionProbabilities { + pub inclusion_probabilities: Vec<InclusionProbability>, + pub samples: u64, + pub elapsed: Duration, + pub delta_max: f64, + pub delta_l2: f64, +} + +impl InclusionProbabilities { + pub fn node(&self, id: &str) -> Option<&InclusionProbability> { + self.inclusion_probabilities.iter().find(|x| x.id == id) + } +} + +impl NodeStatusCache { + fn new() -> Self { + NodeStatusCache { + inner: Arc::new(RwLock::new(NodeStatusCacheInner::new())), + } + } + + pub fn stage() -> AdHoc { + AdHoc::on_ignite("Node Status Cache", |rocket| async { + rocket.manage(Self::new()) + }) + } + + async fn update_cache(&self, inclusion_probabilities: InclusionProbabilities) { + match time::timeout(Duration::from_millis(CACHE_TIMEOUT_MS), self.inner.write()).await { + Ok(mut cache) => { + cache + .inclusion_probabilities + .update(inclusion_probabilities); + } + Err(e) => error!("{e}"), + } + } + + pub(crate) async fn inclusion_probabilities(&self) -> Option<Cache<InclusionProbabilities>> { + match time::timeout(Duration::from_millis(CACHE_TIMEOUT_MS), self.inner.read()).await { + Ok(cache) => Some(cache.inclusion_probabilities.clone()), + Err(e) => { + error!("{e}"); + None + } + } + } +} + +impl NodeStatusCacheInner { + pub fn new() -> Self { + Self { + inclusion_probabilities: Default::default(), + } + } +} + +// Long-running task responsible for keeping the cache up-to-date. +pub struct NodeStatusCacheRefresher { + cache: NodeStatusCache, + contract_cache: ValidatorCache, + contract_cache_listener: watch::Receiver<CacheNotification>, + fallback_caching_interval: Duration, +} + +impl NodeStatusCacheRefresher { + pub(crate) fn new( + cache: NodeStatusCache, + contract_cache: ValidatorCache, + contract_cache_listener: watch::Receiver<CacheNotification>, + fallback_caching_interval: Duration, + ) -> Self { + Self { + cache, + contract_cache, + contract_cache_listener, + fallback_caching_interval, + } + } + + pub async fn run(&mut self, mut shutdown: ShutdownListener) { + let mut fallback_interval = time::interval(self.fallback_caching_interval); + while !shutdown.is_shutdown() { + tokio::select!
{ + // Update node status cache when the contract cache / validator cache is updated + Ok(_) = self.contract_cache_listener.changed() => { + self.update_on_notify(&mut fallback_interval).await; + } + // ... however, if we don't receive any notifications we fall back to periodic + // refreshes + _ = fallback_interval.tick() => { + self.update_on_timer().await; + } + _ = shutdown.recv() => { + log::trace!("NodeStatusCacheRefresher: Received shutdown"); + } + } + } + log::info!("NodeStatusCacheRefresher: Exiting"); + } + + async fn update_on_notify(&self, fallback_interval: &mut time::Interval) { + log::debug!( + "Validator cache event detected: {:?}", + &*self.contract_cache_listener.borrow(), + ); + let _ = self.refresh_cache().await; + fallback_interval.reset(); + } + + async fn update_on_timer(&self) { + log::debug!("Timed trigger for the node status cache"); + let have_contract_cache_data = + *self.contract_cache_listener.borrow() != CacheNotification::Start; + + if have_contract_cache_data { + let _ = self.refresh_cache().await; + } else { + log::trace!( + "Skipping node status cache update; the contract cache is not yet available" + ); + } + } + + async fn refresh_cache(&self) -> Result<(), NodeStatusCacheError> { + log::info!("Updating node status cache"); + let mixnode_bonds = self.contract_cache.mixnodes().await; + let params = self.contract_cache.epoch_reward_params().await.into_inner(); + let inclusion_probabilities = compute_inclusion_probabilities(&mixnode_bonds, params) + .ok_or_else(|| { + error!( + "Failed to simulate selection probabilities for mixnodes, not updating cache" + ); + NodeStatusCacheError::SimulationFailed + })?; + + self.cache.update_cache(inclusion_probabilities).await; + Ok(()) + } +} + +fn compute_inclusion_probabilities( + mixnode_bonds: &[MixNodeBond], + params: EpochRewardParams, +) -> Option<InclusionProbabilities> { + let active_set_size = params + .active_set_size() + .try_into() + .tap_err(|e| error!("Active set size unexpectedly large: {e}")) + .ok()?; + let standby_set_size = (params.rewarded_set_size() - params.active_set_size()) + .try_into() + .tap_err(|e| error!("Active set size larger than rewarded set size, a contradiction: {e}")) + .ok()?; + + // Unzip list of total bonds into ids and bonds. + // We need to go through this zip/unzip procedure to make sure we have matching identities + // for the input to the simulator, which assumes the identity is the position in the vec + let (ids, mixnode_total_bonds) = unzip_into_mixnode_ids_and_total_bonds(mixnode_bonds); + + // Compute inclusion probabilities and keep track of how long it took.
+ let results = inclusion_probability::simulate_selection_probability_mixnodes( + &mixnode_total_bonds, + active_set_size, + standby_set_size, + MAX_SIMULATION_SAMPLES, + Duration::from_secs(MAX_SIMULATION_TIME_SEC), + ) + .tap_err(|err| error!("{err}")) + .ok()?; + + Some(InclusionProbabilities { + inclusion_probabilities: zip_ids_together_with_results(&ids, &results), + samples: results.samples, + elapsed: results.time, + delta_max: results.delta_max, + delta_l2: results.delta_l2, + }) +} + +fn unzip_into_mixnode_ids_and_total_bonds( + mixnode_bonds: &[MixNodeBond], +) -> (Vec<&String>, Vec<u128>) { + mixnode_bonds + .iter() + .filter_map(|m| m.total_bond().map(|b| (m.identity(), b))) + .unzip() +} + +fn zip_ids_together_with_results( + ids: &[&String], + results: &inclusion_probability::SelectionProbability, +) -> Vec<InclusionProbability> { + ids.iter() + .zip(results.active_set_probability.iter()) + .zip(results.reserve_set_probability.iter()) + .map(|((id, a), r)| InclusionProbability { + id: (*id).to_string(), + in_active: *a, + in_reserve: *r, + }) + .collect() +} diff --git a/validator-api/src/node_status_api/mod.rs b/validator-api/src/node_status_api/mod.rs index dab874f96d5..fe672202290 100644 --- a/validator-api/src/node_status_api/mod.rs +++ b/validator-api/src/node_status_api/mod.rs @@ -1,11 +1,14 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +pub(crate) use cache::{NodeStatusCache, NodeStatusCacheRefresher}; + use okapi::openapi3::OpenApi; use rocket::Route; use rocket_okapi::{openapi_get_routes_spec, settings::OpenApiSettings}; use std::time::Duration; +pub(crate) mod cache; pub(crate) mod local_guard; pub(crate) mod models; pub(crate) mod routes; @@ -35,6 +38,7 @@ pub(crate) fn node_status_routes( routes::get_mixnode_inclusion_probability, routes::get_mixnode_avg_uptime, routes::get_mixnode_avg_uptimes, + routes::get_mixnode_inclusion_probabilities, ] } else { // in the minimal variant we would not have access to endpoints relying on existence @@ -43,6 +47,7 @@ pub(crate) fn node_status_routes( routes::get_mixnode_status, routes::get_mixnode_stake_saturation, routes::get_mixnode_inclusion_probability, + routes::get_mixnode_inclusion_probabilities, ] } }
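With the route registered above, the aggregated probabilities become reachable over HTTP. A hypothetical client call follows; the host, port, and /v1/status prefix are assumptions, as is the use of reqwest's blocking and json features, while the path segment and JSON field names come from the routes and AllInclusionProbabilitiesResponse in this patch:

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Assumed base URL; only the trailing path segment is defined by this patch.
        let url = "http://localhost:8080/v1/status/mixnodes/inclusion_probability";
        let body: serde_json::Value = reqwest::blocking::get(url)?.json()?;

        if let Some(nodes) = body["inclusion_probabilities"].as_array() {
            for node in nodes {
                println!(
                    "{}: active {}, reserve {}",
                    node["id"], node["in_active"], node["in_reserve"]
                );
            }
        }
        println!("samples: {}, as_at: {}", body["samples"], body["as_at"]);
        Ok(())
    }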
diff --git a/validator-api/src/node_status_api/routes.rs b/validator-api/src/node_status_api/routes.rs index c25d0e6b134..bd71b78c59e 100644 --- a/validator-api/src/node_status_api/routes.rs +++ b/validator-api/src/node_status_api/routes.rs @@ -1,6 +1,7 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use crate::contract_cache::Cache; use crate::node_status_api::models::{ ErrorResponse, GatewayStatusReport, GatewayUptimeHistory, MixnodeStatusReport, MixnodeUptimeHistory, @@ -16,11 +17,12 @@ use rocket_okapi::openapi; use schemars::JsonSchema; use serde::Deserialize; use validator_api_requests::models::{ - CoreNodeStatusResponse, InclusionProbabilityResponse, MixnodeStatusResponse, - RewardEstimationResponse, StakeSaturationResponse, UptimeResponse, + AllInclusionProbabilitiesResponse, CoreNodeStatusResponse, InclusionProbabilityResponse, + MixnodeStatusResponse, RewardEstimationResponse, StakeSaturationResponse, UptimeResponse, }; use super::models::Uptime; +use super::NodeStatusCache; async fn average_mixnode_uptime( identity: &str, @@ -307,43 +309,21 @@ pub(crate) async fn get_mixnode_stake_saturation( #[openapi(tag = "status")] #[get("/mixnode/<identity>/inclusion-probability")] pub(crate) async fn get_mixnode_inclusion_probability( - cache: &State<ValidatorCache>, + node_status_cache: &State<NodeStatusCache>, identity: String, ) -> Json<Option<InclusionProbabilityResponse>> { - let mixnodes = cache.mixnodes().await; - let rewarding_params = cache.epoch_reward_params().await.into_inner(); - - if let Some(target_mixnode) = mixnodes.iter().find(|x| x.identity() == &identity) { - let total_bonded_tokens = mixnodes - .iter() - .fold(0u128, |acc, x| acc + x.total_bond().unwrap_or_default()) - as f64; - - let rewarded_set_size = rewarding_params.rewarded_set_size() as f64; - let active_set_size = rewarding_params.active_set_size() as f64; - - let prob_one_draw = - target_mixnode.total_bond().unwrap_or_default() as f64 / total_bonded_tokens; - // Chance to be selected in any draw for active set - let prob_active_set = if mixnodes.len() <= active_set_size as usize { - 1.0 - } else { - active_set_size * prob_one_draw - }; - // This is likely slightly too high, as we're not correcting form them not being selected in active, should be chance to be selected, minus the chance for being not selected in reserve - let prob_reserve_set = if mixnodes.len() <= rewarded_set_size as usize { - 1.0 - } else { - (rewarded_set_size - active_set_size) * prob_one_draw - }; - - Json(Some(InclusionProbabilityResponse { - in_active: prob_active_set.into(), - in_reserve: prob_reserve_set.into(), - })) - } else { - Json(None) - } + node_status_cache + .inclusion_probabilities() + .await + .map(Cache::into_inner) + .and_then(|p| p.node(&identity).cloned()) + .map(|p| { + Json(Some(InclusionProbabilityResponse { + in_active: p.in_active.into(), + in_reserve: p.in_reserve.into(), + })) + }) + .unwrap_or(Json(None)) } #[openapi(tag = "status")] @@ -384,3 +364,27 @@ pub(crate) async fn get_mixnode_avg_uptimes( Ok(Json(response)) } + +#[openapi(tag = "status")] +#[get("/mixnodes/inclusion_probability")] +pub(crate) async fn get_mixnode_inclusion_probabilities( + cache: &State<NodeStatusCache>, +) -> Result<Json<AllInclusionProbabilitiesResponse>, ErrorResponse> { + if let Some(prob) = cache.inclusion_probabilities().await { + let as_at = prob.timestamp(); + let prob = prob.into_inner(); + Ok(Json(AllInclusionProbabilitiesResponse { + inclusion_probabilities: prob.inclusion_probabilities, + samples: prob.samples, + elapsed: prob.elapsed, + delta_max: prob.delta_max, + delta_l2: prob.delta_l2, + as_at, + })) + } else { + Err(ErrorResponse::new( + "No data available".to_string(), + Status::ServiceUnavailable, + )) + } +}
diff --git a/validator-api/validator-api-requests/src/models.rs b/validator-api/validator-api-requests/src/models.rs index 3018d2b77a6..a2af343a59e 100644 --- a/validator-api/validator-api-requests/src/models.rs +++ b/validator-api/validator-api-requests/src/models.rs @@ -4,7 +4,7 @@ use mixnet_contract_common::{reward_params::RewardParams, MixNode, MixNodeBond}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::fmt; +use std::{fmt, time::Duration}; #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] #[cfg_attr(feature = "generate-ts", derive(ts_rs::TS))] @@ -101,16 +101,20 @@ pub type StakeSaturation = f32; )] pub enum SelectionChance { VeryHigh, + High, Moderate, Low, + VeryLow, } impl From<f64> for SelectionChance { fn from(p: f64) -> SelectionChance { match p { - p if p > 0.15 => SelectionChance::VeryHigh, - p if p >= 0.05 => SelectionChance::Moderate, - _ => SelectionChance::Low, + p if p > 0.98 => SelectionChance::VeryHigh, + p if p > 0.9 => SelectionChance::High, + p if p > 0.7 => SelectionChance::Moderate, + p if p > 0.5 => SelectionChance::Low, + _ => SelectionChance::VeryLow, } } } @@ -119,8 +123,10 @@ impl fmt::Display for SelectionChance { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SelectionChance::VeryHigh => write!(f, "VeryHigh"), + SelectionChance::High => write!(f, "High"), SelectionChance::Moderate => write!(f, "Moderate"), SelectionChance::Low => write!(f, "Low"), + SelectionChance::VeryLow => write!(f, "VeryLow"), } } } @@ -145,3 +151,20 @@ impl fmt::Display for InclusionProbabilityResponse { ) } } + +#[derive(Clone, Serialize, schemars::JsonSchema)] +pub struct AllInclusionProbabilitiesResponse { + pub inclusion_probabilities: Vec<InclusionProbability>, + pub samples: u64, + pub elapsed: Duration, + pub delta_max: f64, + pub delta_l2: f64, + pub as_at: i64, +} + +#[derive(Clone, Serialize, schemars::JsonSchema)] +pub struct InclusionProbability { + pub id: String, + pub in_active: f64, + pub in_reserve: f64, +}
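To make the new bucket boundaries concrete, here is a sketch exercising the From<f64> mapping defined above at a few sample probabilities; it restates the impl from models.rs so it runs standalone, and the sample values are chosen for illustration:

    // The five-bucket mapping from inclusion probability to SelectionChance,
    // as defined in models.rs above.
    #[derive(Debug, PartialEq)]
    enum SelectionChance {
        VeryHigh,
        High,
        Moderate,
        Low,
        VeryLow,
    }

    impl From<f64> for SelectionChance {
        fn from(p: f64) -> SelectionChance {
            match p {
                p if p > 0.98 => SelectionChance::VeryHigh,
                p if p > 0.9 => SelectionChance::High,
                p if p > 0.7 => SelectionChance::Moderate,
                p if p > 0.5 => SelectionChance::Low,
                _ => SelectionChance::VeryLow,
            }
        }
    }

    fn main() {
        assert_eq!(SelectionChance::from(0.99), SelectionChance::VeryHigh);
        assert_eq!(SelectionChance::from(0.95), SelectionChance::High);
        assert_eq!(SelectionChance::from(0.8), SelectionChance::Moderate);
        assert_eq!(SelectionChance::from(0.6), SelectionChance::Low);
        assert_eq!(SelectionChance::from(0.3), SelectionChance::VeryLow);
    }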