Skip to content

Commit

Permalink
change a little requests retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bayk committed Dec 23, 2024
1 parent 3141b7f commit f34053e
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 36 deletions.
15 changes: 5 additions & 10 deletions chain/src/pibd_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ use sysinfo::{MemoryRefreshKind, RefreshKind, System};
/// Segment heights for Header Hashes. Note, this number needs to be the same for all network
pub const PIBD_MESSAGE_SIZE_LIMIT: usize = 256 * 1034; // Let's use 256k messages max. I think we should be good to handle that

/// Retry request to the header if next 10 are already returned.
pub const HEADERS_RETRY_DELTA: u64 = 10;

/// Retry request to the segments if next 5 are already returned.
pub const SEGMENTS_RETRY_DELTA: usize = 5;

/// Retry request to the blocks if next 10 are already returned.
pub const BLOCKS_RETRY_DELTA: u64 = 10;

// Here are series for different available resources. Mem and CPU thresholds are always the same.
const HEADERS_HASH_BUFFER_LEN: [usize; 4] = [10, 20, 30, 60];

Expand Down Expand Up @@ -196,7 +187,7 @@ impl PibdParams {
}

fn get_network_speed_multiplier(&self, average_latency_ms: u32) -> f64 {
if average_latency_ms == 0 {
if average_latency_ms == 0 || average_latency_ms == 30000 {
return 1.0;
}
if (Utc::now() - self.network_speed.read().last_network_speed_update).num_seconds() > 5 {
Expand All @@ -217,6 +208,10 @@ impl PibdParams {
network_speed.network_speed_multiplier = 0.05f64
.max((network_speed.network_speed_multiplier) / (1.0 + 0.15 * update_mul));
}
debug!(
"for current latency {} ms the new network speed multiplier is {}",
average_latency_ms, network_speed.network_speed_multiplier
);
network_speed.network_speed_multiplier
} else {
self.network_speed.read().network_speed_multiplier
Expand Down
3 changes: 2 additions & 1 deletion chain/src/txhashset/headers_desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ impl<T> HeadersRecieveCache<T> {
let mut first_in_cache = 0;
let mut last_in_cache = 0;
let mut has10_idx = 0;
let headers_to_retry = headers_cache_size_limit as u64 / 5;

for hash_idx in base_hash_idx..=max_idx {
// let's check if cache already have it
Expand All @@ -323,7 +324,7 @@ impl<T> HeadersRecieveCache<T> {
}

if last_in_cache > 0 {
if last_in_cache - first_in_cache > pibd_params::HEADERS_RETRY_DELTA {
if last_in_cache - first_in_cache > headers_to_retry {
has10_idx = first_in_cache;
}
first_in_cache = 0;
Expand Down
4 changes: 2 additions & 2 deletions chain/src/txhashset/segments_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! Manages the segments caching
use crate::error::Error;
use crate::pibd_params;
use crate::txhashset::request_lookup::RequestLookup;
use mwc_core::core::{Segment, SegmentIdentifier, SegmentType};
use std::cmp;
Expand Down Expand Up @@ -106,6 +105,7 @@ impl<T> SegmentsCache<T> {
let mut first_in_cache = 0;
let mut last_in_cache = 0;
let mut has_5_idx = 0;
let retry_delta = cache_size_limit / 5;

for idx in self.received_segments..max_segm_idx {
let segm = &self.required_segments[idx];
Expand All @@ -120,7 +120,7 @@ impl<T> SegmentsCache<T> {
}

if last_in_cache > 0 {
if last_in_cache - first_in_cache > pibd_params::SEGMENTS_RETRY_DELTA {
if last_in_cache - first_in_cache > retry_delta {
has_5_idx = first_in_cache;
}
first_in_cache = 0;
Expand Down
19 changes: 13 additions & 6 deletions servers/src/mwc/sync/body_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use mwc_util::RwLock;
use p2p::Capabilities;
use rand::prelude::*;
use std::cmp;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;

pub struct BodySync {
Expand All @@ -38,6 +38,7 @@ pub struct BodySync {
pibd_params: Arc<PibdParams>,
last_retry_height: RwLock<u64>,
retry_expiration_times: RwLock<VecDeque<DateTime<Utc>>>,
excluded_peers: RwLock<HashSet<PeerAddr>>,
}

impl BodySync {
Expand All @@ -50,6 +51,7 @@ impl BodySync {
request_series: RwLock::new(Vec::new()),
last_retry_height: RwLock::new(0),
retry_expiration_times: RwLock::new(VecDeque::new()),
excluded_peers: RwLock::new(HashSet::new()),
}
}

Expand Down Expand Up @@ -122,12 +124,19 @@ impl BodySync {
};
*self.required_capabilities.write() = required_capabilities;

// requested_blocks, check for expiration
let excluded_peers = self
.request_tracker
.retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers);
*self.excluded_peers.write() = excluded_peers;

let (peers, excluded_requests, excluded_peers) = sync_utils::get_sync_peers(
in_peers,
self.pibd_params.get_blocks_request_per_peer(),
peer_capabilities,
head.height,
&self.request_tracker,
&*self.excluded_peers.read(),
);
if peers.is_empty() {
if excluded_peers == 0 {
Expand All @@ -152,10 +161,6 @@ impl BodySync {
}
}

// requested_blocks, check for expiration
self.request_tracker
.retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers);

sync_state.update(SyncStatus::BodySync {
archive_height: if self.chain.archive_mode() {
0
Expand Down Expand Up @@ -293,6 +298,7 @@ impl BodySync {
*self.required_capabilities.read(),
head.height,
&self.request_tracker,
&*self.excluded_peers.read(),
);
if !peers.is_empty() {
// requested_blocks, check for expiration
Expand Down Expand Up @@ -363,6 +369,7 @@ impl BodySync {
let mut first_in_cache = 0;
let mut last_in_cache = 0;
let mut has10_idx = 0;
let retry_delta = cmp::max(7, request_series.len() as u64 / 5);

for (hash, height) in request_series.iter().rev() {
if self.is_block_recieved(&hash)? {
Expand All @@ -376,7 +383,7 @@ impl BodySync {
}

if last_in_cache > 0 {
if last_in_cache - first_in_cache > pibd_params::BLOCKS_RETRY_DELTA {
if last_in_cache - first_in_cache > retry_delta {
has10_idx = first_in_cache;
}
first_in_cache = 0;
Expand Down
10 changes: 8 additions & 2 deletions servers/src/mwc/sync/header_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use mwc_p2p::PeerAddr;
use mwc_util::RwLock;
use rand::seq::IteratorRandom;
use rand::seq::SliceRandom;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;

pub struct HeaderSync {
Expand All @@ -49,6 +49,7 @@ pub struct HeaderSync {
last_retry_height: RwLock<u64>,
retry_expiration_times: RwLock<VecDeque<DateTime<Utc>>>,
send_requests_lock: RwLock<u8>,
excluded_peers: RwLock<HashSet<PeerAddr>>,
}

impl HeaderSync {
Expand All @@ -63,6 +64,7 @@ impl HeaderSync {
last_retry_height: RwLock::new(0),
retry_expiration_times: RwLock::new(VecDeque::new()),
send_requests_lock: RwLock::new(0),
excluded_peers: RwLock::new(HashSet::new()),
}
}

Expand Down Expand Up @@ -102,8 +104,10 @@ impl HeaderSync {
return resp;
}

self.request_tracker
let excluded_peers = self
.request_tracker
.retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers);
*self.excluded_peers.write() = excluded_peers;

// it is the initial status flag
if !header_hashes.is_pibd_headers_are_loaded() {
Expand Down Expand Up @@ -165,6 +169,7 @@ impl HeaderSync {
Capabilities::HEADER_HIST,
header_hashes.get_target_archive_height(),
&self.request_tracker,
&*self.excluded_peers.read(),
);
if peers.is_empty() {
if excluded_peers == 0 {
Expand Down Expand Up @@ -363,6 +368,7 @@ impl HeaderSync {
Capabilities::HEADER_HIST,
headers_hash_desegmenter.get_target_height(),
&self.request_tracker,
&*self.excluded_peers.read(),
);

if !peers.is_empty() {
Expand Down
14 changes: 10 additions & 4 deletions servers/src/mwc/sync/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct StateSync {
last_retry_idx: RwLock<HashMap<SegmentType, u64>>,
retry_expiration_times: RwLock<VecDeque<DateTime<Utc>>>,

excluded_peers: RwLock<HashSet<PeerAddr>>,
send_requests_lock: RwLock<u8>,
}

Expand All @@ -79,6 +80,7 @@ impl StateSync {
is_complete: AtomicBool::new(false),
last_retry_idx: RwLock::new(HashMap::new()),
retry_expiration_times: RwLock::new(VecDeque::new()),
excluded_peers: RwLock::new(HashSet::new()),
send_requests_lock: RwLock::new(0),
}
}
Expand Down Expand Up @@ -156,13 +158,19 @@ impl StateSync {
}
};

let excluded_peers = self
.request_tracker
.retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers);
*self.excluded_peers.write() = excluded_peers;

// Requesting root_hash...
let (peers, excluded_requests, excluded_peers) = sync_utils::get_sync_peers(
in_peers,
self.pibd_params.get_segments_request_per_peer(),
Capabilities::PIBD_HIST,
target_archive_height,
&self.request_tracker,
&*self.excluded_peers.read(),
);
if peers.is_empty() {
if excluded_peers == 0 {
Expand Down Expand Up @@ -370,9 +378,6 @@ impl StateSync {

debug_assert!(!desegmenter.is_complete());

self.request_tracker
.retain_expired(pibd_params::PIBD_REQUESTS_TIMEOUT_SECS, sync_peers);

sync_state.update(desegmenter.get_pibd_progress());

// let's check which peers with the root hash exist
Expand Down Expand Up @@ -534,6 +539,7 @@ impl StateSync {
Capabilities::PIBD_HIST,
self.target_archive_height.load(Ordering::Relaxed),
&self.request_tracker,
&*self.excluded_peers.read(),
);
if peers.is_empty() {
return;
Expand Down Expand Up @@ -801,7 +807,7 @@ impl StateSync {
.cloned()
.unwrap_or(0);

if segm.identifier.leaf_offset() < retry_idx {
if segm.identifier.leaf_offset() <= retry_idx {
continue;
}

Expand Down
39 changes: 28 additions & 11 deletions servers/src/mwc/sync/sync_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use mwc_chain::{pibd_params, Chain};
use mwc_p2p::{Capabilities, Peer, PeerAddr, Peers};
use mwc_util::RwLock;
use std::cmp;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -147,7 +147,7 @@ impl LatencyTracker {
} else {
self.latency_sum / self.latency_history.len() as i64
};
Duration::microseconds(dur_ms)
Duration::milliseconds(dur_ms)
}
}

Expand Down Expand Up @@ -190,24 +190,32 @@ where
}
}

pub fn retain_expired(&self, expiration_time_interval_sec: i64, sync_peers: &SyncPeers) {
pub fn retain_expired(
&self,
expiration_time_interval_sec: i64,
sync_peers: &SyncPeers,
) -> HashSet<PeerAddr> {
let mut requested = self.requested.write();
let peers_stats = &mut self.peers_stats.write();
let now = Utc::now();

let mut res: HashSet<PeerAddr> = HashSet::new();

// first let's clean up stale requests...
requested.retain(|_, request_data| {
let peer_stat = peers_stats.get_mut(&request_data.peer);
if (now - request_data.request_time).num_seconds() > expiration_time_interval_sec {
sync_peers
.report_no_response(&request_data.peer, request_data.request_message.clone());
res.insert(request_data.peer.clone());
if let Some(n) = peer_stat {
n.requests = n.requests.saturating_sub(1);
}
return false;
}
true
});
res
}

pub fn clear(&self) {
Expand Down Expand Up @@ -335,6 +343,7 @@ pub fn get_sync_peers<T: std::cmp::Eq + std::hash::Hash>(
capabilities: Capabilities,
min_height: u64,
request_tracker: &RequestTracker<T>,
excluded_peer_addr: &HashSet<PeerAddr>,
) -> (Vec<Arc<Peer>>, u32, u32) {
// Excluding peers with totally full Q
let peer_requests_limit = expected_requests_per_peer as u32;
Expand All @@ -350,16 +359,20 @@ pub fn get_sync_peers<T: std::cmp::Eq + std::hash::Hash>(
.outbound()
.with_min_height(min_height)
{
let mut excluded = excluded_peer_addr.contains(&peer.info.addr);
found_outbound = true;
if let Some(track_data) = request_tracker.get_peer_track_data(&peer.info.addr) {
if track_data.requests < peer_requests_limit {
if !excluded && track_data.requests < peer_requests_limit {
excluded_requests = excluded_requests.saturating_sub(track_data.requests as usize);
} else {
excluded_peers += 1;
continue;
excluded = true;
}
}
res.push(peer);
if !excluded {
res.push(peer);
} else {
excluded_peers += 1;
}
}
if !found_outbound {
// adding inbounds since no outbound is found...
Expand All @@ -370,16 +383,20 @@ pub fn get_sync_peers<T: std::cmp::Eq + std::hash::Hash>(
.inbound()
.with_min_height(min_height)
{
let mut excluded = excluded_peer_addr.contains(&peer.info.addr);
if let Some(track_data) = request_tracker.get_peer_track_data(&peer.info.addr) {
if track_data.requests < peer_requests_limit {
if !excluded && track_data.requests < peer_requests_limit {
excluded_requests =
excluded_requests.saturating_sub(track_data.requests as usize);
} else {
excluded_peers += 1;
continue;
excluded = true;
}
}
res.push(peer);
if !excluded {
res.push(peer);
} else {
excluded_peers += 1;
}
}
}
(res, excluded_requests as u32, excluded_peers)
Expand Down

0 comments on commit f34053e

Please sign in to comment.