diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index e0ab485f9..a02435fd8 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -178,11 +178,11 @@ impl OrphanBlockPool {
 /// maintains locking for the pipeline to avoid conflicting processing.
 pub struct Chain {
     db_root: String,
-    store: Arc<store::ChainStore>,
+    store: Arc<store::ChainStore>, // Lock order (with children): 3
     adapter: Arc<dyn ChainAdapter + Send + Sync>,
     orphans: Arc<OrphanBlockPool>,
-    txhashset: Arc<RwLock<txhashset::TxHashSet>>,
-    header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
+    txhashset: Arc<RwLock<txhashset::TxHashSet>>, // Lock order (with children): 2
+    header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>, // Lock order (with children): 1
     pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
     pibd_desegmenter: Arc<RwLock<Option<Desegmenter>>>,
     reset_pibd_desegmenter: Arc<RwLock<bool>>,
@@ -387,12 +387,14 @@ impl Chain {
     }

     /// Return our shared header MMR handle.
-    pub fn header_pmmr(&self) -> Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>> {
+    /// Note: the caller is responsible for locking in the correct order. See the comment at the declaration.
+    pub fn get_header_pmmr_for_test(&self) -> Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>> {
         self.header_pmmr.clone()
     }

     /// Return our shared txhashset instance.
-    pub fn txhashset(&self) -> Arc<RwLock<txhashset::TxHashSet>> {
+    /// Note: the caller is responsible for locking in the correct order. See the comment at the declaration.
+    pub fn get_txhashset_for_test(&self) -> Arc<RwLock<txhashset::TxHashSet>> {
         self.txhashset.clone()
     }

@@ -402,7 +404,8 @@ impl Chain {
     }

     /// Shared store instance.
-    pub fn store(&self) -> Arc<store::ChainStore> {
+    /// Note: the caller is responsible for locking in the correct order. See the comment at the declaration.
+    pub fn get_store_for_tests(&self) -> Arc<store::ChainStore> {
         self.store.clone()
     }

@@ -1041,7 +1044,8 @@ impl Chain {
     /// Do we need to do the check here? we are doing check for every tx regardless of the kernel version.
     pub fn replay_attack_check(&self, tx: &Transaction) -> Result<(), Error> {
         let mut header_pmmr = self.header_pmmr.write();
-        txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
+        let batch_read = self.store.batch_read()?;
+        txhashset::header_extending_readonly(&mut header_pmmr, batch_read, |ext, batch| {
             pipe::check_against_spent_output(&tx.body, None, None, ext, batch)?;
             Ok(())
         })
@@ -1079,8 +1083,9 @@ impl Chain {
     /// Sets prev_root on a brand new block header by applying the previous header to the header MMR.
     pub fn set_prev_root_only(&self, header: &mut BlockHeader) -> Result<(), Error> {
         let mut header_pmmr = self.header_pmmr.write();
+        let batch_read = self.store.batch_read()?;
         let prev_root =
-            txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
+            txhashset::header_extending_readonly(&mut header_pmmr, batch_read, |ext, batch| {
                 let prev_header = batch.get_previous_header(header)?;
                 self.rewind_and_apply_header_fork(&prev_header, ext, batch)?;
                 ext.root()
@@ -1301,7 +1306,7 @@ impl Chain {

         Ok(Segmenter::new(
             Arc::new(RwLock::new(segm_header_pmmr_backend)),
-            self.txhashset(),
+            self.txhashset.clone(),
             bitmap_snapshot,
             header.clone(),
         ))
@@ -1360,7 +1365,7 @@ impl Chain {
         );

         Ok(Desegmenter::new(
-            self.txhashset(),
+            self.txhashset.clone(),
             self.header_pmmr.clone(),
             archive_header.clone(),
             bitmap_root_hash,
@@ -2100,7 +2105,8 @@ impl Chain {
     /// Note: This is based on the provided sync_head to support syncing against a fork.
     pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
         let mut header_pmmr = self.header_pmmr.write();
-        txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
+        let batch_read = self.store.batch_read()?;
+        txhashset::header_extending_readonly(&mut header_pmmr, batch_read, |ext, batch| {
             let header = batch.get_block_header(&sync_head.hash())?;
             self.rewind_and_apply_header_fork(&header, ext, batch)?;

@@ -2128,6 +2134,61 @@ impl Chain {
             .block_exists(h)
             .map_err(|e| Error::StoreErr(e, "chain block exists".to_owned()))
     }
+
+    /// Locate headers from the main chain.
+    pub fn locate_headers(
+        &self,
+        locator: &[Hash],
+        block_header_num: u32,
+    ) -> Result<Vec<BlockHeader>, crate::Error> {
+        debug!("locator: {:?}", locator);
+
+        let header = match self.find_common_header(locator) {
+            Some(header) => header,
+            None => return Ok(vec![]),
+        };
+
+        let max_height = self.header_head()?.height;
+
+        let header_pmmr = self.header_pmmr.read();
+
+        // looks like we know one, getting as many following headers as allowed
+        let hh = header.height;
+        let mut headers = vec![];
+        for h in (hh + 1)..=(hh + (block_header_num as u64)) {
+            if h > max_height {
+                break;
+            }
+
+            if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) {
+                let header = self.get_block_header(&hash)?;
+                headers.push(header);
+            } else {
+                error!("Failed to locate headers successfully.");
+                break;
+            }
+        }
+        debug!("returning headers: {}", headers.len());
+        Ok(headers)
+    }
+
+    // Find the first locator hash that refers to a known header on our main chain.
+    fn find_common_header(&self, locator: &[Hash]) -> Option<BlockHeader> {
+        let header_pmmr = self.header_pmmr.read();
+
+        for hash in locator {
+            if let Ok(header) = self.get_block_header(&hash) {
+                if let Ok(hash_at_height) = header_pmmr.get_header_hash_by_height(header.height) {
+                    if let Ok(header_at_height) = self.get_block_header(&hash_at_height) {
+                        if header.hash() == header_at_height.hash() {
+                            return Some(header);
+                        }
+                    }
+                }
+            }
+        }
+        None
+    }
 }

 fn setup_head(
diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs
index d665f0ebf..1dbf418d0 100644
--- a/chain/src/pibd_params.rs
+++ b/chain/src/pibd_params.rs
@@ -221,7 +221,7 @@ impl PibdParams {
         match self.cpu_num {
             1 => 2,
             2 => 4,
-            _ => 8,
+            _ => 6,
         }
     }

diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs
index 64f672c49..db7a05b17 100644
--- a/chain/src/txhashset/desegmenter.rs
+++ b/chain/src/txhashset/desegmenter.rs
@@ -44,6 +44,7 @@ use tokio::runtime::Builder;
 use tokio::task;

 /// Desegmenter for rebuilding a txhashset from PIBD segments
+/// Note!!! header_pmmr, txhashset & store come from the Chain, so the same locking rules apply.
 pub struct Desegmenter {
     txhashset: Arc<RwLock<TxHashSet>>,
     header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
@@ -293,10 +294,14 @@ impl Desegmenter {
         // Check NRD relative height rules for full kernel history.
         {
             info!("desegmenter validation: validating kernel history");
-            let txhashset = self.txhashset.read();
-            Chain::validate_kernel_history(&self.archive_header, &txhashset)?;
-
+            // Note: locking order is header_pmmr -> txhashset -> batch !!!
+            {
+                // validate_kernel_history is a long operation, that is why we lock txhashset twice.
+                let txhashset = self.txhashset.read();
+                Chain::validate_kernel_history(&self.archive_header, &txhashset)?;
+            }
             let header_pmmr = self.header_pmmr.read();
+            let txhashset = self.txhashset.read();
             let batch = self.store.batch_read()?;
             txhashset.verify_kernel_pos_index(
                 &self.genesis,
diff --git a/chain/src/txhashset/segmenter.rs b/chain/src/txhashset/segmenter.rs
index bb0030f9e..5736903c2 100644
--- a/chain/src/txhashset/segmenter.rs
+++ b/chain/src/txhashset/segmenter.rs
@@ -26,6 +26,7 @@ use mwc_core::core::pmmr::{ReadonlyPMMR, VecBackend};
 use std::{sync::Arc, time::Instant};

 /// Segmenter for generating PIBD segments.
+/// Note!!! header_pmmr, txhashset & store come from the Chain, so the same locking rules apply.
 #[derive(Clone)]
 pub struct Segmenter {
     // every 512th header (HEADERS_PER_BATCH) must be here, we don't need all header hashes
diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs
index c77b55df2..7501e943a 100644
--- a/chain/src/txhashset/txhashset.rs
+++ b/chain/src/txhashset/txhashset.rs
@@ -866,17 +866,15 @@ where
 /// to allow headers to be validated before we receive the full block data.
 pub fn header_extending_readonly<'a, F, T>(
     handle: &'a mut PMMRHandle<BlockHeader>,
-    store: &ChainStore,
+    batch_read: Batch<'_>,
     inner: F,
 ) -> Result<T, Error>
 where
     F: FnOnce(&mut HeaderExtension<'_>, &Batch<'_>) -> Result<T, Error>,
 {
-    let batch = store.batch_read()?;
-
     let head = match handle.head_hash() {
         Ok(hash) => {
-            let header = batch.get_block_header(&hash)?;
+            let header = batch_read.get_block_header(&hash)?;
             Tip::from_header(&header)
         }
         Err(_) => Tip::default(),
@@ -884,7 +882,7 @@ where
     let pmmr = PMMR::at(&mut handle.backend, handle.size);
     let mut extension = HeaderExtension::new(pmmr, head);
-    let res = inner(&mut extension, &batch);
+    let res = inner(&mut extension, &batch_read);

     handle.backend.discard();

diff --git a/chain/tests/process_block_cut_through.rs b/chain/tests/process_block_cut_through.rs
index 260049450..82aa23d1c 100644
--- a/chain/tests/process_block_cut_through.rs
+++ b/chain/tests/process_block_cut_through.rs
@@ -181,9 +181,9 @@ fn process_block_cut_through() -> Result<(), chain::Error> {
     // Now exercise the internal call to pipe::process_block() directly so we can introspect the error
     // without it being wrapped as above.
     {
-        let store = chain.store();
-        let header_pmmr = chain.header_pmmr();
-        let txhashset = chain.txhashset();
+        let store = chain.get_store_for_tests();
+        let header_pmmr = chain.get_header_pmmr_for_test();
+        let txhashset = chain.get_txhashset_for_test();
         let mut header_pmmr = header_pmmr.write();
         let mut txhashset = txhashset.write();

diff --git a/chain/tests/store_indices.rs b/chain/tests/store_indices.rs
index 20ef13890..5749fe49e 100644
--- a/chain/tests/store_indices.rs
+++ b/chain/tests/store_indices.rs
@@ -50,7 +50,7 @@ fn test_store_indices() {
     {
         // Start a new batch and delete the block.
-        let store = chain.store();
+        let store = chain.get_store_for_tests();
         let batch = store.batch_write().unwrap();
         assert!(batch.delete_block(&block_hash).is_ok());

diff --git a/chain/tests/test_block_known.rs b/chain/tests/test_block_known.rs
index 413928e6d..bc6c9b42d 100644
--- a/chain/tests/test_block_known.rs
+++ b/chain/tests/test_block_known.rs
@@ -61,7 +61,7 @@ fn check_known() {
     // reset chain head to earlier state
     {
         let chain = init_chain(chain_dir, genesis.clone());
-        let store = chain.store();
+        let store = chain.get_store_for_tests();
         let batch = store.batch_write().unwrap();
         let head_header = chain.head_header().unwrap();
         let prev = batch.get_previous_header(&head_header).unwrap();
diff --git a/chain/tests/test_pibd_copy.rs b/chain/tests/test_pibd_copy.rs
index 16b10c3f6..7ffabaaa5 100644
--- a/chain/tests/test_pibd_copy.rs
+++ b/chain/tests/test_pibd_copy.rs
@@ -229,7 +229,7 @@ impl DesegmenterRequestor {
         {
             let max_height = src_chain.header_head().unwrap().height;
-            let header_pmmr = src_chain.header_pmmr();
+            let header_pmmr = src_chain.get_header_pmmr_for_test();
             let header_pmmr = header_pmmr.read();

             let header = src_chain.get_block_header(&target_hash).unwrap();
@@ -343,7 +343,7 @@ impl DesegmenterRequestor {
     }

     pub fn check_roots(&self, archive_header_height: u64) {
-        let roots = self.chain.txhashset().read().roots().unwrap();
+        let roots = self.chain.get_txhashset_for_test().read().roots().unwrap();
         let archive_header = self
             .chain
             .get_header_by_height(archive_header_height)
diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index 8d6c1667a..9f020a360 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -418,38 +418,7 @@ where
     }

     fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
-        debug!("locator: {:?}", locator);
-
-        let header = match self.find_common_header(locator) {
-            Some(header) => header,
-            None => return Ok(vec![]),
-        };
-
-        let max_height = self.chain().header_head()?.height;
-
-        let header_pmmr = self.chain().header_pmmr();
-        let header_pmmr = header_pmmr.read();
-
-        // looks like we know one, getting as many following headers as allowed
-        let hh = header.height;
-        let mut headers = vec![];
-        for h in (hh + 1)..=(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
-            if h > max_height {
-                break;
-            }
-
-            if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) {
-                let header = self.chain().get_block_header(&hash)?;
-                headers.push(header);
-            } else {
-                error!("Failed to locate headers successfully.");
-                break;
-            }
-        }
-
-        debug!("returning headers: {}", headers.len());
-
-        Ok(headers)
+        self.chain().locate_headers(locator, p2p::MAX_BLOCK_HEADERS)
     }

     /// Gets a full block by its hash.
@@ -815,25 +784,6 @@ where
             .expect("Failed to upgrade weak ref to our chain.")
     }

-    // Find the first locator hash that refers to a known header on our main chain.
-    fn find_common_header(&self, locator: &[Hash]) -> Option<core::BlockHeader> {
-        let header_pmmr = self.chain().header_pmmr();
-        let header_pmmr = header_pmmr.read();
-
-        for hash in locator {
-            if let Ok(header) = self.chain().get_block_header(&hash) {
-                if let Ok(hash_at_height) = header_pmmr.get_header_hash_by_height(header.height) {
-                    if let Ok(header_at_height) = self.chain().get_block_header(&hash_at_height) {
-                        if header.hash() == header_at_height.hash() {
-                            return Some(header);
-                        }
-                    }
-                }
-            }
-        }
-        None
-    }
-
     // pushing the new block through the chain pipeline
     // remembering to reset the head if we have a bad block
     fn process_block(
diff --git a/servers/src/mwc/sync/state_sync.rs b/servers/src/mwc/sync/state_sync.rs
index 146f925a4..9f1a4cd53 100644
--- a/servers/src/mwc/sync/state_sync.rs
+++ b/servers/src/mwc/sync/state_sync.rs
@@ -526,6 +526,14 @@ impl StateSync {
         None
     }

+    fn is_expected_peer(&self, key: &(SegmentType, u64), peer: &PeerAddr) -> bool {
+        if let Some(p) = self.request_tracker.get_expected_peer(key) {
+            *peer == p
+        } else {
+            false
+        }
+    }
+
     // return true if peer matched registered, so we get response from whom it was requested
     fn track_and_request_more_segments(
         &mut self,
@@ -533,11 +541,9 @@ impl StateSync {
         peer: &PeerAddr,
         peers: &Arc<p2p::Peers>,
         sync_peers: &mut SyncPeers,
-    ) -> bool {
-        let mut expected_peer = false;
+    ) {
         if let Some(peer_addr) = self.request_tracker.remove_request(key) {
             if peer_addr == *peer {
-                expected_peer = true;
                 if self.request_tracker.get_update_requests_to_next_ask() == 0 {
                     let (peers, excluded_requests) = sync_utils::get_sync_peers(
                         peers,
@@ -548,7 +554,7 @@ impl StateSync {
                         &self.request_tracker.get_peers_queue_size(),
                     );
                     if peers.is_empty() {
-                        return expected_peer;
+                        return;
                     }

                     let desegmenter = self.chain.get_desegmenter();
@@ -573,7 +579,7 @@ impl StateSync {
                     }

                     if root_hash_peers.is_empty() {
-                        return expected_peer;
+                        return;
                     }

                     let need_request = self.request_tracker.calculate_needed_requests(
@@ -651,7 +657,6 @@ impl StateSync {
                 }
             }
         }
-        expected_peer
     }

     pub fn receive_bitmap_segment(
@@ -662,12 +667,8 @@ impl StateSync {
         peers: &Arc<p2p::Peers>,
         sync_peers: &mut SyncPeers,
     ) {
-        let expected_peer = self.track_and_request_more_segments(
-            &(SegmentType::Bitmap, segment.id().idx),
-            peer,
-            peers,
-            sync_peers,
-        );
+        let key = (SegmentType::Bitmap, segment.id().idx);
+        let expected_peer = self.is_expected_peer(&key, peer);

         if let Some(root_hash) = self.validate_root_hash(peer, archive_header_hash) {
             let desegmenter = self.chain.get_desegmenter();
@@ -694,6 +695,8 @@ impl StateSync {
             sync_peers
                 .report_error_response(peer, "bitmap_segment, validate_root_hash failure".into());
         }
+
+        self.track_and_request_more_segments(&key, peer, peers, sync_peers);
     }

     pub fn receive_output_segment(
@@ -704,12 +707,8 @@ impl StateSync {
         peers: &Arc<p2p::Peers>,
         sync_peers: &mut SyncPeers,
     ) {
-        let expected_peer = self.track_and_request_more_segments(
-            &(SegmentType::Output, segment.id().idx),
-            peer,
-            peers,
-            sync_peers,
-        );
+        let key = (SegmentType::Output, segment.id().idx);
+        let expected_peer = self.is_expected_peer(&key, peer);

         if let Some(root_hash) = self.validate_root_hash(peer, archive_header_hash) {
             let desegmenter = self.chain.get_desegmenter();
@@ -735,6 +734,8 @@ impl StateSync {
         } else {
             sync_peers.report_error_response(peer, "validate_root_hash failed".into());
         }
+
+        self.track_and_request_more_segments(&key, peer, peers, sync_peers);
     }

     pub fn receive_rangeproof_segment(
@@ -745,16 +746,10 @@ impl StateSync {
         peers: &Arc<p2p::Peers>,
         sync_peers: &mut SyncPeers,
     ) {
-        let expected_peer = self.track_and_request_more_segments(
-            &(SegmentType::RangeProof, segment.id().idx),
-            peer,
-            peers,
-            sync_peers,
-        );
-
-        self.request_tracker
-            .remove_request(&(SegmentType::RangeProof, segment.id().idx));
+        let key = (SegmentType::RangeProof, segment.id().idx);
+        let expected_peer = self.is_expected_peer(&key, peer);

+        // Process first, unregister after. During unregistering we might issue more requests.
         if let Some(root_hash) = self.validate_root_hash(peer, archive_header_hash) {
             let desegmenter = self.chain.get_desegmenter();
             let mut desegmenter = desegmenter.write();
@@ -779,6 +774,8 @@ impl StateSync {
         } else {
             sync_peers.report_error_response(peer, "validate_root_hash error".into());
         }
+
+        self.track_and_request_more_segments(&key, peer, peers, sync_peers);
     }

     pub fn receive_kernel_segment(
@@ -789,12 +786,8 @@ impl StateSync {
         peers: &Arc<p2p::Peers>,
         sync_peers: &mut SyncPeers,
     ) {
-        let expected_peer = self.track_and_request_more_segments(
-            &(SegmentType::Kernel, segment.id().idx),
-            peer,
-            peers,
-            sync_peers,
-        );
+        let key = (SegmentType::Kernel, segment.id().idx);
+        let expected_peer = self.is_expected_peer(&key, peer);

         if let Some(root_hash) = self.validate_root_hash(peer, archive_header_hash) {
             let desegmenter = self.chain.get_desegmenter();
@@ -820,5 +813,7 @@ impl StateSync {
         } else {
             sync_peers.report_error_response(peer, "validate_root_hash failed".into());
         }
+
+        self.track_and_request_more_segments(&key, peer, peers, sync_peers);
     }
 }
diff --git a/servers/src/mwc/sync/sync_manager.rs b/servers/src/mwc/sync/sync_manager.rs
index 1fd0b4c68..81c9ddca0 100644
--- a/servers/src/mwc/sync/sync_manager.rs
+++ b/servers/src/mwc/sync/sync_manager.rs
@@ -202,7 +202,7 @@ impl SyncManager {
                     Some(CachedResponse::new(resp.clone(), Duration::seconds(180)));
                 return resp;
             } else {
-                SyncResponse::new(
+                return SyncResponse::new(
                     SyncRequestResponses::Syncing,
                     self.body.get_peer_capabilities(),
                     "Waiting for headers, even body is done, more is expected".into(),
diff --git a/servers/src/mwc/sync/sync_peers.rs b/servers/src/mwc/sync/sync_peers.rs
index cfb62aca6..3452280e6 100644
--- a/servers/src/mwc/sync/sync_peers.rs
+++ b/servers/src/mwc/sync/sync_peers.rs
@@ -28,7 +28,7 @@ enum PeerStatusEvent {
     Ban(String),
 }

-const MIN_RESPONSE_NUM: usize = 10;
+const MIN_RESPONSE_NUM: usize = 13; // 6*2+1, 6 requests per peer are expected, see get_segments_request_per_peer()

 pub struct PeerPibdStatus {
     responses: VecDeque<PeerStatusEvent>,
diff --git a/servers/src/mwc/sync/sync_utils.rs b/servers/src/mwc/sync/sync_utils.rs
index 9e6010c1c..491df255f 100644
--- a/servers/src/mwc/sync/sync_utils.rs
+++ b/servers/src/mwc/sync/sync_utils.rs
@@ -195,6 +195,14 @@ where
             None
         }
     }
+
+    pub fn get_expected_peer(&self, key: &K) -> Option<PeerAddr> {
+        if let Some((peer, _time, _message)) = self.requested_hashes.get(key) {
+            Some(peer.clone())
+        } else {
+            None
+        }
+    }
 }

 /// Get a list of qualify peers. Peers that has needed height and capability