diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs index 30af492a5..56c207b84 100644 --- a/chain/src/pibd_params.rs +++ b/chain/src/pibd_params.rs @@ -26,15 +26,6 @@ use sysinfo::{MemoryRefreshKind, RefreshKind, System}; /// Segment heights for Header Hashes. Note, this number is needs to be the same for all network pub const PIBD_MESSAGE_SIZE_LIMIT: usize = 256 * 1034; // Let's use 256k messages max. I think we should be good to handle that -/// Retry request to the header if next 10 are already returned. -pub const HEADERS_RETRY_DELTA: u64 = 10; - -/// Retry request to the segments if next 5 are already returned. -pub const SEGMENTS_RETRY_DELTA: usize = 5; - -/// Retry request to the blocks if next 10 are already returned. -pub const BLOCKS_RETRY_DELTA: u64 = 10; - // Here are series for different available resources. Mem and CPU thresholds are allways the same. const HEADERS_HASH_BUFFER_LEN: [usize; 4] = [10, 20, 30, 60]; @@ -196,7 +187,7 @@ impl PibdParams { } fn get_network_speed_multiplier(&self, average_latency_ms: u32) -> f64 { - if average_latency_ms == 0 { + if average_latency_ms == 0 || average_latency_ms == 30000 { return 1.0; } if (Utc::now() - self.network_speed.read().last_network_speed_update).num_seconds() > 5 { @@ -217,6 +208,10 @@ impl PibdParams { network_speed.network_speed_multiplier = 0.05f64 .max((network_speed.network_speed_multiplier) / (1.0 + 0.15 * update_mul)); } + debug!( + "for current latency {} ms the new network speed multiplier is {}", + average_latency_ms, network_speed.network_speed_multiplier + ); network_speed.network_speed_multiplier } else { self.network_speed.read().network_speed_multiplier diff --git a/chain/src/txhashset/headers_desegmenter.rs b/chain/src/txhashset/headers_desegmenter.rs index 15948ed6a..8adb0cfa5 100644 --- a/chain/src/txhashset/headers_desegmenter.rs +++ b/chain/src/txhashset/headers_desegmenter.rs @@ -305,6 +305,7 @@ impl HeadersRecieveCache { let mut first_in_cache = 0; let mut last_in_cache = 0; let mut has10_idx = 0; + let headers_to_retry = headers_cache_size_limit as u64 / 5; for hash_idx in base_hash_idx..=max_idx { // let's check if cache already have it @@ -323,7 +324,7 @@ impl HeadersRecieveCache { } if last_in_cache > 0 { - if last_in_cache - first_in_cache > pibd_params::HEADERS_RETRY_DELTA { + if last_in_cache - first_in_cache > headers_to_retry { has10_idx = first_in_cache; } first_in_cache = 0; diff --git a/chain/src/txhashset/segments_cache.rs b/chain/src/txhashset/segments_cache.rs index 7fa6be322..7ee58c500 100644 --- a/chain/src/txhashset/segments_cache.rs +++ b/chain/src/txhashset/segments_cache.rs @@ -15,7 +15,6 @@ //! Manages the segments caching use crate::error::Error; -use crate::pibd_params; use crate::txhashset::request_lookup::RequestLookup; use mwc_core::core::{Segment, SegmentIdentifier, SegmentType}; use std::cmp; @@ -106,6 +105,7 @@ impl SegmentsCache { let mut first_in_cache = 0; let mut last_in_cache = 0; let mut has_5_idx = 0; + let retry_delta = cache_size_limit / 5; for idx in self.received_segments..max_segm_idx { let segm = &self.required_segments[idx]; @@ -120,7 +120,7 @@ impl SegmentsCache { } if last_in_cache > 0 { - if last_in_cache - first_in_cache > pibd_params::SEGMENTS_RETRY_DELTA { + if last_in_cache - first_in_cache > retry_delta { has_5_idx = first_in_cache; } first_in_cache = 0; diff --git a/servers/src/mwc/sync/body_sync.rs b/servers/src/mwc/sync/body_sync.rs index 70a7eb9b2..ba4686e8e 100644 --- a/servers/src/mwc/sync/body_sync.rs +++ b/servers/src/mwc/sync/body_sync.rs @@ -27,7 +27,7 @@ use mwc_util::RwLock; use p2p::Capabilities; use rand::prelude::*; use std::cmp; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::sync::Arc; pub struct BodySync { @@ -38,6 +38,7 @@ pub struct BodySync { pibd_params: Arc, last_retry_height: RwLock, retry_expiration_times: RwLock>>, + excluded_peers: RwLock>, } impl BodySync { @@ -50,6 +51,7 @@ impl BodySync { request_series: RwLock::new(Vec::new()), last_retry_height: RwLock::new(0), retry_expiration_times: RwLock::new(VecDeque::new()), + excluded_peers: RwLock::new(HashSet::new()), } } @@ -122,12 +124,19 @@ impl BodySync { }; *self.required_capabilities.write() = required_capabilities; + // requested_blocks, check for expiration + let excluded_peers = self + .request_tracker + .retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers); + *self.excluded_peers.write() = excluded_peers; + let (peers, excluded_requests, excluded_peers) = sync_utils::get_sync_peers( in_peers, self.pibd_params.get_blocks_request_per_peer(), peer_capabilities, head.height, &self.request_tracker, + &*self.excluded_peers.read(), ); if peers.is_empty() { if excluded_peers == 0 { @@ -152,10 +161,6 @@ impl BodySync { } } - // requested_blocks, check for expiration - self.request_tracker - .retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers); - sync_state.update(SyncStatus::BodySync { archive_height: if self.chain.archive_mode() { 0 @@ -293,6 +298,7 @@ impl BodySync { *self.required_capabilities.read(), head.height, &self.request_tracker, + &*self.excluded_peers.read(), ); if !peers.is_empty() { // requested_blocks, check for expiration @@ -363,6 +369,7 @@ impl BodySync { let mut first_in_cache = 0; let mut last_in_cache = 0; let mut has10_idx = 0; + let retry_delta = cmp::max(7, request_series.len() as u64 / 5); for (hash, height) in request_series.iter().rev() { if self.is_block_recieved(&hash)? { @@ -376,7 +383,7 @@ impl BodySync { } if last_in_cache > 0 { - if last_in_cache - first_in_cache > pibd_params::BLOCKS_RETRY_DELTA { + if last_in_cache - first_in_cache > retry_delta { has10_idx = first_in_cache; } first_in_cache = 0; diff --git a/servers/src/mwc/sync/header_sync.rs b/servers/src/mwc/sync/header_sync.rs index 108e804f0..ca1b03fb8 100644 --- a/servers/src/mwc/sync/header_sync.rs +++ b/servers/src/mwc/sync/header_sync.rs @@ -35,7 +35,7 @@ use mwc_p2p::PeerAddr; use mwc_util::RwLock; use rand::seq::IteratorRandom; use rand::seq::SliceRandom; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; pub struct HeaderSync { @@ -49,6 +49,7 @@ pub struct HeaderSync { last_retry_height: RwLock, retry_expiration_times: RwLock>>, send_requests_lock: RwLock, + excluded_peers: RwLock>, } impl HeaderSync { @@ -63,6 +64,7 @@ impl HeaderSync { last_retry_height: RwLock::new(0), retry_expiration_times: RwLock::new(VecDeque::new()), send_requests_lock: RwLock::new(0), + excluded_peers: RwLock::new(HashSet::new()), } } @@ -102,8 +104,10 @@ impl HeaderSync { return resp; } - self.request_tracker + let excluded_peers = self + .request_tracker .retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers); + *self.excluded_peers.write() = excluded_peers; // it is initial statis flag if !header_hashes.is_pibd_headers_are_loaded() { @@ -165,6 +169,7 @@ impl HeaderSync { Capabilities::HEADER_HIST, header_hashes.get_target_archive_height(), &self.request_tracker, + &*self.excluded_peers.read(), ); if peers.is_empty() { if excluded_peers == 0 { @@ -363,6 +368,7 @@ impl HeaderSync { Capabilities::HEADER_HIST, headers_hash_desegmenter.get_target_height(), &self.request_tracker, + &*self.excluded_peers.read(), ); if !peers.is_empty() { diff --git a/servers/src/mwc/sync/state_sync.rs b/servers/src/mwc/sync/state_sync.rs index ce6004c3b..0dcf7976e 100644 --- a/servers/src/mwc/sync/state_sync.rs +++ b/servers/src/mwc/sync/state_sync.rs @@ -60,6 +60,7 @@ pub struct StateSync { last_retry_idx: RwLock>, retry_expiration_times: RwLock>>, + excluded_peers: RwLock>, send_requests_lock: RwLock, } @@ -79,6 +80,7 @@ impl StateSync { is_complete: AtomicBool::new(false), last_retry_idx: RwLock::new(HashMap::new()), retry_expiration_times: RwLock::new(VecDeque::new()), + excluded_peers: RwLock::new(HashSet::new()), send_requests_lock: RwLock::new(0), } } @@ -156,6 +158,11 @@ impl StateSync { } }; + let excluded_peers = self + .request_tracker + .retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers); + *self.excluded_peers.write() = excluded_peers; + // Requesting root_hash... let (peers, excluded_requests, excluded_peers) = sync_utils::get_sync_peers( in_peers, @@ -163,6 +170,7 @@ impl StateSync { Capabilities::PIBD_HIST, target_archive_height, &self.request_tracker, + &*self.excluded_peers.read(), ); if peers.is_empty() { if excluded_peers == 0 { @@ -370,9 +378,6 @@ impl StateSync { debug_assert!(!desegmenter.is_complete()); - self.request_tracker - .retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers); - sync_state.update(desegmenter.get_pibd_progress()); // let's check what peers with root hash are exist @@ -534,6 +539,7 @@ impl StateSync { Capabilities::PIBD_HIST, self.target_archive_height.load(Ordering::Relaxed), &self.request_tracker, + &*self.excluded_peers.read(), ); if peers.is_empty() { return; @@ -801,7 +807,7 @@ impl StateSync { .cloned() .unwrap_or(0); - if segm.identifier.leaf_offset() < retry_idx { + if segm.identifier.leaf_offset() <= retry_idx { continue; } diff --git a/servers/src/mwc/sync/sync_utils.rs b/servers/src/mwc/sync/sync_utils.rs index 6cbff71f4..03ff221f8 100644 --- a/servers/src/mwc/sync/sync_utils.rs +++ b/servers/src/mwc/sync/sync_utils.rs @@ -22,7 +22,7 @@ use mwc_chain::{pibd_params, Chain}; use mwc_p2p::{Capabilities, Peer, PeerAddr, Peers}; use mwc_util::RwLock; use std::cmp; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; @@ -147,7 +147,7 @@ impl LatencyTracker { } else { self.latency_sum / self.latency_history.len() as i64 }; - Duration::microseconds(dur_ms) + Duration::milliseconds(dur_ms) } } @@ -190,17 +190,24 @@ where } } - pub fn retain_expired(&self, expiration_time_interval_sec: i64, sync_peers: &SyncPeers) { + pub fn retain_expired( + &self, + expiration_time_interval_sec: i64, + sync_peers: &SyncPeers, + ) -> HashSet { let mut requested = self.requested.write(); let peers_stats = &mut self.peers_stats.write(); let now = Utc::now(); + let mut res: HashSet = HashSet::new(); + // first let's clean up stale requests... requested.retain(|_, request_data| { let peer_stat = peers_stats.get_mut(&request_data.peer); if (now - request_data.request_time).num_seconds() > expiration_time_interval_sec { sync_peers .report_no_response(&request_data.peer, request_data.request_message.clone()); + res.insert(request_data.peer.clone()); if let Some(n) = peer_stat { n.requests = n.requests.saturating_sub(1); } @@ -208,6 +215,7 @@ where } true }); + res } pub fn clear(&self) { @@ -335,6 +343,7 @@ pub fn get_sync_peers( capabilities: Capabilities, min_height: u64, request_tracker: &RequestTracker, + excluded_peer_addr: &HashSet, ) -> (Vec>, u32, u32) { // Excluding peers with totally full Q let peer_requests_limit = expected_requests_per_peer as u32; @@ -350,16 +359,20 @@ pub fn get_sync_peers( .outbound() .with_min_height(min_height) { + let mut excluded = excluded_peer_addr.contains(&peer.info.addr); found_outbound = true; if let Some(track_data) = request_tracker.get_peer_track_data(&peer.info.addr) { - if track_data.requests < peer_requests_limit { + if !excluded && track_data.requests < peer_requests_limit { excluded_requests = excluded_requests.saturating_sub(track_data.requests as usize); } else { - excluded_peers += 1; - continue; + excluded = true; } } - res.push(peer); + if !excluded { + res.push(peer); + } else { + excluded_peers += 1; + } } if !found_outbound { // adding inbounds since no outbound is found... @@ -370,16 +383,20 @@ pub fn get_sync_peers( .inbound() .with_min_height(min_height) { + let mut excluded = excluded_peer_addr.contains(&peer.info.addr); if let Some(track_data) = request_tracker.get_peer_track_data(&peer.info.addr) { - if track_data.requests < peer_requests_limit { + if !excluded && track_data.requests < peer_requests_limit { excluded_requests = excluded_requests.saturating_sub(track_data.requests as usize); } else { - excluded_peers += 1; - continue; + excluded = true; } } - res.push(peer); + if !excluded { + res.push(peer); + } else { + excluded_peers += 1; + } } } (res, excluded_requests as u32, excluded_peers)