Skip to content

Commit

Permalink
Detect stale PIBD sync caused by the bitmap changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bayk committed Jul 12, 2024
1 parent d62e2e3 commit ac001c5
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 50 deletions.
3 changes: 3 additions & 0 deletions chain/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ pub enum Error {
/// Other issue with segment
#[error("Invalid segment: {0}")]
InvalidSegment(String),
/// Sync MMR is reorganized
#[error("MMR is reorganized: {0}")]
SyncMMRChanged(String),
}

impl Error {
Expand Down
2 changes: 1 addition & 1 deletion chain/src/pibd_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ pub const STALE_REQUESTS_PER_PEER: u32 = 5;

/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds
/// give up and revert back to the txhashset.zip download method
pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60;
pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60 + SEGMENT_REQUEST_TIMEOUT_SECS * 2;
40 changes: 34 additions & 6 deletions chain/src/txhashset/desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::pibd_params;
use crate::store;
use crate::txhashset;

use crate::Error::SyncMMRChanged;
use croaring::Bitmap;

/// Desegmenter for rebuilding a txhashset from PIBD segments
Expand Down Expand Up @@ -446,7 +447,10 @@ impl Desegmenter {

/// Return list of the next preferred segments the desegmenter needs based on
/// the current real state of the underlying elements
pub fn next_desired_segments(&mut self, max_elements: usize) -> Vec<SegmentTypeIdentifier> {
pub fn next_desired_segments(
&mut self,
max_elements: usize,
) -> Result<Vec<SegmentTypeIdentifier>, Error> {
let mut return_vec = vec![];
// First check for required bitmap elements
if self.bitmap_cache.is_none() {
Expand All @@ -463,7 +467,7 @@ impl Desegmenter {
if !self.has_bitmap_segment_with_id(id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id));
if return_vec.len() >= max_elements {
return return_vec;
return Ok(return_vec);
}
}
}
Expand All @@ -488,10 +492,14 @@ impl Desegmenter {
);

let mut elems_added = 0;
let mut found_output_first = false;
while let Some(output_id) = output_identifier_iter.next() {
// Advance output iterator to next needed position
let (_first, last) =
let (first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
if first == local_output_mmr_size {
found_output_first = true;
}
if last <= local_output_mmr_size {
continue;
}
Expand All @@ -507,15 +515,23 @@ impl Desegmenter {
}
}

if !found_output_first && local_output_mmr_size > 1 {
return Err(SyncMMRChanged("Outputs MMR is not aligned".into()));
}

let mut rangeproof_identifier_iter = SegmentIdentifier::traversal_iter(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);

elems_added = 0;
let mut found_rangeproof_first = false;
while let Some(rp_id) = rangeproof_identifier_iter.next() {
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
let (first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
// Advance rangeproof iterator to next needed position
if first == local_rangeproof_mmr_size {
found_rangeproof_first = true;
}
if last <= local_rangeproof_mmr_size {
continue;
}
Expand All @@ -531,15 +547,23 @@ impl Desegmenter {
}
}

if !found_rangeproof_first && local_rangeproof_mmr_size > 1 {
return Err(SyncMMRChanged("Rangeproof MMR is not aligned".into()));
}

let mut kernel_identifier_iter = SegmentIdentifier::traversal_iter(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);

elems_added = 0;
let mut found_kernel_checkpoint = false;
while let Some(k_id) = kernel_identifier_iter.next() {
// Advance kernel iterator to next needed position
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
let (first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if local_kernel_mmr_size == first {
found_kernel_checkpoint = true;
}
// Advance kernel iterator to next needed position
if last <= local_kernel_mmr_size {
continue;
Expand All @@ -555,11 +579,15 @@ impl Desegmenter {
break;
}
}

if !found_kernel_checkpoint && local_kernel_mmr_size > 1 {
return Err(SyncMMRChanged("Kernel MMR is not aligned".into()));
}
}
if return_vec.is_empty() && self.bitmap_cache.is_some() {
self.all_segments_complete = true;
}
return_vec
Ok(return_vec)
}

/// 'Finalize' the bitmap accumulator, storing an in-memory copy of the bitmap for
Expand Down
104 changes: 61 additions & 43 deletions servers/src/grin/sync/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ pub struct StateSync {
output_bitmap_root_header_hash: Option<Hash>,
}

const MIN_START_PIBD_RESPONCE_LIMIT_SEC: i64 = 60;

impl StateSync {
pub fn new(
sync_state: Arc<SyncState>,
Expand Down Expand Up @@ -240,39 +238,56 @@ impl StateSync {

if has_segmenter {
// Continue our PIBD process (which returns true if all segments are in)
if self.continue_pibd(&archive_header) {
let desegmenter = self.chain.get_desegmenter(&archive_header);
// All segments in, validate
if let Some(d) = desegmenter.write().as_mut() {
if let Ok(true) = d.check_progress(self.sync_state.clone()) {
if let Err(e) = d.check_update_leaf_set_state() {
error!("error updating PIBD leaf set: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
if let Err(e) = d.validate_complete_state(
self.sync_state.clone(),
stop_state.clone(),
) {
error!("error validating PIBD state: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
match self.continue_pibd(&archive_header) {
Ok(true) => {
let desegmenter = self.chain.get_desegmenter(&archive_header);
// All segments in, validate
if let Some(d) = desegmenter.write().as_mut() {
if let Ok(true) = d.check_progress(self.sync_state.clone()) {
if let Err(e) = d.check_update_leaf_set_state() {
error!("error updating PIBD leaf set: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
if let Err(e) = d.validate_complete_state(
self.sync_state.clone(),
stop_state.clone(),
) {
error!("error validating PIBD state: {}", e);
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
return true;
}
return true;
}
};
};
}
Ok(false) => (), // nothing to do, continue
Err(e) => {
// need to restart the sync process, but not ban the peers; it is not their fault
error!("Need to restart the PIBD resync because of the error {}", e);
// resetting to none, so no peers will be banned
self.output_bitmap_root_header_hash = None;
self.sync_state.update_pibd_progress(
false,
true,
0,
1,
&archive_header,
);
return false;
}
}
}
} else {
Expand Down Expand Up @@ -348,7 +363,8 @@ impl StateSync {
}

fn ban_inactive_pibd_peers(&self) {
let none_active_time_limit = Utc::now().timestamp() - MIN_START_PIBD_RESPONCE_LIMIT_SEC;
let none_active_time_limit =
Utc::now().timestamp() - pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS;
let mut banned_peers: Vec<Arc<Peer>> = Vec::new();
for peer in self.peers.iter().connected().into_iter() {
if let Some((requests, time)) = peer.get_pibd_no_response_state() {
Expand All @@ -374,7 +390,8 @@ impl StateSync {

// Minimal interval to send request for starting the PIBD sync process

let last_handshake_time = Utc::now().timestamp() - MIN_START_PIBD_RESPONCE_LIMIT_SEC;
let last_handshake_time =
Utc::now().timestamp() - pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS;

for peer in peers {
let mut need_sync = false;
Expand Down Expand Up @@ -410,7 +427,8 @@ impl StateSync {
fn select_pibd_bitmap_output_root(&self, archive_header: &BlockHeader) -> Option<Hash> {
let header_hash = archive_header.hash();

let handshake_time_limit = Utc::now().timestamp() - MIN_START_PIBD_RESPONCE_LIMIT_SEC / 2;
let handshake_time_limit =
Utc::now().timestamp() - pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS / 2;

let mut min_handshake_time = handshake_time_limit + 1;

Expand Down Expand Up @@ -444,7 +462,7 @@ impl StateSync {

/// Continue the PIBD process, returning true if the desegmenter is reporting
/// that the process is done
fn continue_pibd(&mut self, archive_header: &BlockHeader) -> bool {
fn continue_pibd(&mut self, archive_header: &BlockHeader) -> Result<bool, grin_chain::Error> {
// Check the state of our chain to figure out what we should be requesting next
let desegmenter = self.chain.get_desegmenter(&archive_header);

Expand All @@ -462,7 +480,7 @@ impl StateSync {
error!("error applying segment: {}", e);
self.sync_state
.update_pibd_progress(false, true, 0, 1, &archive_header);
return false;
return Ok(false);
}
}
}
Expand All @@ -476,12 +494,12 @@ impl StateSync {
let mut next_segment_ids = vec![];
if let Some(d) = desegmenter.write().as_mut() {
if let Ok(true) = d.check_progress(self.sync_state.clone()) {
return true;
return Ok(true);
}
// Figure out the next segments we need
// (12 is divisible by 3, to try and evenly spread the requests among the 3
// main pmmrs. Bitmaps segments will always be requested first)
next_segment_ids = d.next_desired_segments(std::cmp::max(1, desired_segments_num));
next_segment_ids = d.next_desired_segments(std::cmp::max(1, desired_segments_num))?;
}

// Choose a random "most work" peer, preferring outbound if at all possible.
Expand Down Expand Up @@ -530,7 +548,7 @@ impl StateSync {
self.sync_state
.set_sync_error(chain::Error::AbortingPIBDError);
self.set_pibd_aborted();
return false;
return Ok(false);
}
}
Some(p) => {
Expand Down Expand Up @@ -565,7 +583,7 @@ impl StateSync {
}
}
}
false
Ok(false)
}

fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> {
Expand Down

0 comments on commit ac001c5

Please sign in to comment.