Skip to content

Commit

Permalink
PIBD variable block size. Download speed control to match network qua…
Browse files Browse the repository at this point in the history
…lity
  • Loading branch information
bayk committed Dec 23, 2024
1 parent ca3df15 commit 25cb842
Show file tree
Hide file tree
Showing 22 changed files with 818 additions and 546 deletions.
10 changes: 0 additions & 10 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1336,16 +1336,6 @@ impl Chain {
assert!(header.kernel_root == kernel_pmmr_root);
}

/*{
use mwc_core::core::pmmr::ReadablePMMR;
let txhashset = self.txhashset.read();
let rangeproof_pmmr = txhashset.rangeproof_pmmr_at(&header);
let rangeproof_pmmr_root = rangeproof_pmmr.root().unwrap();
error!("rangeproof_pmmr_root: {} at height: {}, mmr size: {}", rangeproof_pmmr_root, header.height, header.output_mmr_size);
txhashset.dump_rproof_mmrs()
}*/

Ok(Segmenter::new(
Arc::new(RwLock::new(segm_header_pmmr_backend)),
self.txhashset.clone(),
Expand Down
9 changes: 6 additions & 3 deletions chain/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,12 @@ pub enum Error {
/// Invalid headers root hash. Probably old traffic or somebody attacking as
#[error("Invalid headers root hash")]
InvalidHeadersRoot,
/// Invalid segment height.
#[error("Invalid segment height")]
InvalidSegmentHeght,
/// Invalid prune segment state. Expected that segment is not pruned, but it does
#[error("Not expected segment pruned state")]
InvalidPruneState,
/// Invalid segment id (such segment was never requested).
#[error("Invalid segment id")]
InvalidSegmentId,
/// Invalid genesis hash.
#[error("Invalid genesis hash")]
InvalidGenesisHash,
Expand Down
141 changes: 60 additions & 81 deletions chain/src/pibd_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,17 @@
use chrono::{DateTime, Utc};
use mwc_util::RwLock;
use std::cmp;
use std::ops::Range;
use std::sync::Arc;
use sysinfo::{MemoryRefreshKind, RefreshKind, System};

/// Segment heights for Header Hashes
pub const HEADERS_HASHES_SEGMENT_HEIGHT_RANGE: Range<u8> = 10..13; // ~32b
/// Segment heights for kernels
pub const KERNEL_SEGMENT_HEIGHT_RANGE: Range<u8> = 8..11; // ~ 100 b
/// Segment heights for output bitmaps
pub const BITMAP_SEGMENT_HEIGHT_RANGE: Range<u8> = 8..11; // ~ 128 b
/// Segment heights for outputs
pub const OUTPUT_SEGMENT_HEIGHT_RANGE: Range<u8> = 10..13; // ~ 33 b
/// Segment heights for rangeproofs
pub const RANGEPROOF_SEGMENT_HEIGHT_RANGE: Range<u8> = 6..9; // ~ 675 b
/// Segment heights for Header Hashes. Note, this number is needs to be the same for all network
pub const PIBD_MESSAGE_SIZE_LIMIT: usize = 256 * 1034; // Let's use 256k messages max. I think we should be good to handle that

/// Retry request to the header if next 10 are already returned.
pub const HEADERS_RETRY_DELTA: u64 = 10;

/// Retry request to the segments if next 5 are already returned.
pub const SEGMENTS_RETRY_DELTA: u64 = 5;
pub const SEGMENTS_RETRY_DELTA: usize = 5;

/// Retry request to the blocks if next 10 are already returned.
pub const BLOCKS_RETRY_DELTA: u64 = 10;
Expand Down Expand Up @@ -81,16 +72,16 @@ impl SysMemoryInfo {
}
}

struct NetworkSpeed {
last_network_speed_update: DateTime<Utc>,
network_speed_multiplier: f64,
}

/// Pibd Sync related params. Note, most of settings are dynamic calculated to match available resources.
pub struct PibdParams {
cpu_num: usize,
sys_memory_info: Arc<RwLock<SysMemoryInfo>>,

bitmap_segment_height: u8,
headers_segment_height: u8,
output_segment_height: u8,
rangeproof_segment_height: u8,
kernel_segment_height: u8,
network_speed: RwLock<NetworkSpeed>,
}

impl PibdParams {
Expand All @@ -100,59 +91,20 @@ impl PibdParams {
let mem_info = SysMemoryInfo::update();
let res = PibdParams {
cpu_num: num_cores,
bitmap_segment_height: Self::calc_mem_adequate_val(
&BITMAP_SEGMENT_HEIGHT_RANGE,
mem_info.available_memory_mb,
num_cores,
),
headers_segment_height: Self::calc_mem_adequate_val(
&HEADERS_HASHES_SEGMENT_HEIGHT_RANGE,
mem_info.available_memory_mb,
num_cores,
),
output_segment_height: Self::calc_mem_adequate_val(
&OUTPUT_SEGMENT_HEIGHT_RANGE,
mem_info.available_memory_mb,
num_cores,
),
rangeproof_segment_height: Self::calc_mem_adequate_val(
&RANGEPROOF_SEGMENT_HEIGHT_RANGE,
mem_info.available_memory_mb,
num_cores,
),
kernel_segment_height: Self::calc_mem_adequate_val(
&KERNEL_SEGMENT_HEIGHT_RANGE,
mem_info.available_memory_mb,
num_cores,
),
sys_memory_info: Arc::new(RwLock::new(mem_info)),
network_speed: RwLock::new(NetworkSpeed {
last_network_speed_update: Utc::now(),
network_speed_multiplier: 1.0,
}),
};
debug!("PibdParams config: cpu_num={}, bitmap_segment_height={}, headers_segment_height={}, output_segment_height={}, rangeproof_segment_height={}, kernel_segment_height={}, available_memory_mb={}",
res.cpu_num, res.bitmap_segment_height, res.headers_segment_height, res.output_segment_height, res.rangeproof_segment_height, res.kernel_segment_height, res.sys_memory_info.read().available_memory_mb );
debug!(
"PibdParams config: cpu_num={}, available_memory_mb={}",
res.cpu_num,
res.sys_memory_info.read().available_memory_mb
);
res
}

/// Get segment height for output bitmaps
pub fn get_bitmap_segment_height(&self) -> u8 {
self.bitmap_segment_height
}
/// Get segment height for header hashes
pub fn get_headers_segment_height(&self) -> u8 {
self.headers_segment_height
}
/// Get segment height for outputs
pub fn get_output_segment_height(&self) -> u8 {
self.output_segment_height
}
/// Get segment height for rangeproofs
pub fn get_rangeproof_segment_height(&self) -> u8 {
self.rangeproof_segment_height
}
/// Get segment height for kernels
pub fn get_kernel_segment_height(&self) -> u8 {
self.kernel_segment_height
}

/// Buffer size for header hashes
pub fn get_headers_hash_buffer_len(&self) -> usize {
Self::calc_mem_adequate_val2(
Expand Down Expand Up @@ -208,8 +160,13 @@ impl PibdParams {
}

/// Maxumum number of blocks that can await into the DB as orphans
pub fn get_blocks_request_limit(&self) -> usize {
self.get_orphans_num_limit() / 2
pub fn get_blocks_request_limit(&self, average_latency_ms: u32) -> usize {
let req_limit = self.get_orphans_num_limit() / 2;
cmp::max(
1,
(req_limit as f64 * self.get_network_speed_multiplier(average_latency_ms)).round()
as usize,
)
}

/// Number of simultaneous requests for segments we should make per available peer. Note this is currently
Expand All @@ -225,14 +182,47 @@ impl PibdParams {

/// Maximum number of simultaneous requests. Please note, the data will be processed in a single thread, so
/// don't overload much
pub fn get_segments_requests_limit(&self) -> usize {
Self::calc_mem_adequate_val2(
pub fn get_segments_requests_limit(&self, average_latency_ms: u32) -> usize {
let req_limit = Self::calc_mem_adequate_val2(
&SEGMENTS_REQUEST_LIMIT,
self.get_available_memory_mb(),
self.cpu_num,
);
cmp::max(
1,
(req_limit as f64 * self.get_network_speed_multiplier(average_latency_ms)).round()
as usize,
)
}

fn get_network_speed_multiplier(&self, average_latency_ms: u32) -> f64 {
if average_latency_ms == 0 {
return 1.0;
}
if (Utc::now() - self.network_speed.read().last_network_speed_update).num_seconds() > 5 {
let mut network_speed = self.network_speed.write();
network_speed.last_network_speed_update = Utc::now();
let expected_latency_ms = PIBD_REQUESTS_TIMEOUT_SECS as u32 / 2 * 1000;
if average_latency_ms < expected_latency_ms {
let update_mul =
(expected_latency_ms - average_latency_ms) as f64 / expected_latency_ms as f64;
debug_assert!(update_mul >= 0.0 && update_mul <= 1.0);
network_speed.network_speed_multiplier =
1.0f64.min((network_speed.network_speed_multiplier) * (1.0 + 0.1 * update_mul));
} else {
let update_mul =
(average_latency_ms - expected_latency_ms) as f64 / expected_latency_ms as f64;
let update_mul = 1.0f64.min(update_mul);
debug_assert!(update_mul >= 0.0 && update_mul <= 1.0);
network_speed.network_speed_multiplier = 0.05f64
.max((network_speed.network_speed_multiplier) / (1.0 + 0.15 * update_mul));
}
network_speed.network_speed_multiplier
} else {
self.network_speed.read().network_speed_multiplier
}
}

fn get_available_memory_mb(&self) -> u64 {
let mut sys_memory_info = self.sys_memory_info.write();
if (Utc::now() - sys_memory_info.update_time).num_seconds() > 2 {
Expand All @@ -241,17 +231,6 @@ impl PibdParams {
sys_memory_info.available_memory_mb
}

fn calc_mem_adequate_val(range: &Range<u8>, available_memory_mb: u64, num_cores: usize) -> u8 {
if available_memory_mb < 500 || num_cores <= 1 {
range.start
} else if available_memory_mb < 1000 || num_cores <= 2 {
cmp::min(range.start.saturating_add(1), range.end.saturating_sub(1))
} else {
debug_assert!(range.end - range.start <= 3); // it is not true, add more ifs for memory checking
cmp::min(range.start.saturating_add(2), range.end.saturating_sub(1))
}
}

fn calc_mem_adequate_val2<T: Clone>(
vals: &[T],
available_memory_mb: u64,
Expand Down
3 changes: 2 additions & 1 deletion chain/src/txhashset/bitmap_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ pub struct BitmapChunk(BitVec);

impl BitmapChunk {
const LEN_BITS: usize = 1024;
const LEN_BYTES: usize = Self::LEN_BITS / 8;
/// Size of the bitmap chain in bytes
pub const LEN_BYTES: usize = Self::LEN_BITS / 8;

/// Create a new bitmap chunk, defaulting all bits in the chunk to false.
pub fn new() -> BitmapChunk {
Expand Down
Loading

0 comments on commit 25cb842

Please sign in to comment.