diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index 63113a58d..8d54ceca9 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -1128,30 +1128,45 @@ impl Chain {
 		))
 	}
 
+	/// instantiate desegmenter for this header. Expects that the handshake is done and, as a result, the header with bitmap_root_hash is known
+	pub fn create_desegmenter(
+		&self,
+		archive_header: &BlockHeader,
+		bitmap_root_hash: Hash,
+	) -> Result<(), Error> {
+		let desegmenter = self.init_desegmenter(archive_header, bitmap_root_hash)?;
+		*self.pibd_desegmenter.write() = Some(desegmenter);
+		Ok(())
+	}
+
 	/// instantiate desegmenter (in same lazy fashion as segmenter, though this should not be as
 	/// expensive an operation)
-	pub fn desegmenter(
+	pub fn get_desegmenter(
 		&self,
 		archive_header: &BlockHeader,
-	) -> Result<Arc<RwLock<Option<Desegmenter>>>, Error> {
+	) -> Arc<RwLock<Option<Desegmenter>>> {
 		// Use our cached desegmenter if we have one and the associated header matches.
 		if let Some(d) = self.pibd_desegmenter.write().as_ref() {
 			if d.header() == archive_header {
-				return Ok(self.pibd_desegmenter.clone());
+				return self.pibd_desegmenter.clone();
 			}
 		}
+		return Arc::new(RwLock::new(None));
+	}
 
-		let desegmenter = self.init_desegmenter(archive_header)?;
-		let mut cache = self.pibd_desegmenter.write();
-		*cache = Some(desegmenter.clone());
-
-		Ok(self.pibd_desegmenter.clone())
+	/// Reset the desegmenter associated with this session
+	pub fn reset_desegmenter(&self) {
+		*self.pibd_desegmenter.write() = None
 	}
 
 	/// initialize a desegmenter, which is capable of extending the hashset by appending
 	/// PIBD segments of the three PMMR trees + Bitmap PMMR
 	/// header should be the same header as selected for the txhashset.zip archive
-	fn init_desegmenter(&self, header: &BlockHeader) -> Result<Desegmenter, Error> {
+	fn init_desegmenter(
+		&self,
+		header: &BlockHeader,
+		bitmap_root_hash: Hash,
+	) -> Result<Desegmenter, Error> {
 		debug!(
 			"init_desegmenter: initializing new desegmenter for {} at {}",
 			header.hash(),
@@ -1162,6 +1177,7 @@ impl Chain {
 			self.txhashset(),
 			self.header_pmmr.clone(),
 			header.clone(),
+			bitmap_root_hash,
 			self.genesis.header.clone(),
 			self.store.clone(),
 		))
diff --git a/chain/src/error.rs b/chain/src/error.rs
index 35fdccf5f..8b656b103 100644
--- a/chain/src/error.rs
+++ b/chain/src/error.rs
@@ -20,6 +20,7 @@ use crate::core::ser;
 use crate::keychain;
 use crate::util::secp;
 use crate::util::secp::pedersen::Commitment;
+use grin_core::core::hash::Hash;
 use grin_store as store;
 use std::io;
 
@@ -179,8 +180,8 @@ pub enum Error {
 	#[error("Aborting PIBD error")]
 	AbortingPIBDError,
 	/// The segmenter is associated to a different block header
-	#[error("Segmenter header mismatch")]
-	SegmenterHeaderMismatch,
+	#[error("Segmenter header mismatch, available {0} at height {1}")]
+	SegmenterHeaderMismatch(Hash, u64),
 	/// Segment height not within allowed range
 	#[error("Invalid segment height")]
 	InvalidSegmentHeight,
diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs
index 44fa77693..4857efc0b 100644
--- a/chain/src/pibd_params.rs
+++ b/chain/src/pibd_params.rs
@@ -40,6 +40,9 @@ pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60;
 /// will always be requested first)
 pub const SEGMENT_REQUEST_COUNT: usize = 15;
 
+/// Maximum stale requests per peer.
If there are more requests, no new data will be requested +pub const STALE_REQUESTS_PER_PEER: u32 = 5; + /// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds /// give up and revert back to the txhashset.zip download method pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60; diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index fbbe37ec7..640233252 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -42,6 +42,7 @@ pub struct Desegmenter { txhashset: Arc>, header_pmmr: Arc>>, archive_header: BlockHeader, + bitmap_root_hash: Hash, // bitmap root hash must come as a result of handshake process store: Arc, genesis: BlockHeader, @@ -78,6 +79,7 @@ impl Desegmenter { txhashset: Arc>, header_pmmr: Arc>>, archive_header: BlockHeader, + bitmap_root_hash: Hash, genesis: BlockHeader, store: Arc, ) -> Desegmenter { @@ -86,6 +88,7 @@ impl Desegmenter { txhashset, header_pmmr, archive_header, + bitmap_root_hash, store, genesis, bitmap_accumulator: BitmapAccumulator::new(), @@ -653,19 +656,12 @@ impl Desegmenter { } /// Adds and validates a bitmap chunk - pub fn add_bitmap_segment( - &mut self, - segment: Segment, - output_root_hash: Hash, - ) -> Result<(), Error> { + pub fn add_bitmap_segment(&mut self, segment: Segment) -> Result<(), Error> { trace!("pibd_desegmenter: add bitmap segment"); - segment.validate_with( + segment.validate( self.bitmap_mmr_size, // Last MMR pos at the height being validated, in this case of the bitmap root None, - self.archive_header.output_root, // Output root we're checking for - self.archive_header.output_mmr_size, - output_root_hash, // Other root - true, + self.bitmap_root_hash, )?; trace!("pibd_desegmenter: adding segment to cache"); // All okay, add to our cached list of bitmap segments @@ -778,24 +774,17 @@ impl Desegmenter { } /// Adds a output segment - pub fn add_output_segment( - &mut self, - segment: Segment, - _bitmap_root: Option, - ) -> Result<(), Error> { + pub fn add_output_segment(&mut self, segment: Segment) -> Result<(), Error> { trace!("pibd_desegmenter: add output segment"); // TODO: This, something very wrong, probably need to reset entire body sync // check bitmap root matches what we already have /*if bitmap_root != Some(self.bitmap_accumulator.root()) { }*/ - segment.validate_with( + segment.validate( self.archive_header.output_mmr_size, // Last MMR pos at the height being validated self.bitmap_cache.as_ref(), self.archive_header.output_root, // Output root we're checking for - self.archive_header.output_mmr_size, - self.bitmap_accumulator.root(), // Other root - false, )?; self.cache_output_segment(segment); Ok(()) diff --git a/chain/src/txhashset/segmenter.rs b/chain/src/txhashset/segmenter.rs index ed6a2aee2..96ed0f9bf 100644 --- a/chain/src/txhashset/segmenter.rs +++ b/chain/src/txhashset/segmenter.rs @@ -70,16 +70,8 @@ impl Segmenter { Ok(segment) } - /// The root of the output PMMR based on size from the header. - fn output_root(&self) -> Result { - let txhashset = self.txhashset.read(); - let pmmr = txhashset.output_pmmr_at(&self.header); - let root = pmmr.root().map_err(&Error::TxHashSetErr)?; - Ok(root) - } - /// The root of the bitmap snapshot PMMR. 
- fn bitmap_root(&self) -> Result { + pub fn bitmap_root(&self) -> Result { let pmmr = self.bitmap_snapshot.readonly_pmmr(); let root = pmmr.root().map_err(&Error::TxHashSetErr)?; Ok(root) @@ -87,14 +79,10 @@ impl Segmenter { /// Create a utxo bitmap segment based on our bitmap "snapshot" and return it with /// the corresponding output root. - pub fn bitmap_segment( - &self, - id: SegmentIdentifier, - ) -> Result<(Segment, Hash), Error> { + pub fn bitmap_segment(&self, id: SegmentIdentifier) -> Result, Error> { let now = Instant::now(); let bitmap_pmmr = self.bitmap_snapshot.readonly_pmmr(); let segment = Segment::from_pmmr(id, &bitmap_pmmr, false)?; - let output_root = self.output_root()?; debug!( "bitmap_segment: id: ({}, {}), leaves: {}, hashes: {}, proof hashes: {}, took {}ms", segment.id().height, @@ -104,19 +92,18 @@ impl Segmenter { segment.proof().size(), now.elapsed().as_millis() ); - Ok((segment, output_root)) + Ok(segment) } /// Create an output segment and return it with the corresponding bitmap root. pub fn output_segment( &self, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), Error> { + ) -> Result, Error> { let now = Instant::now(); let txhashset = self.txhashset.read(); let output_pmmr = txhashset.output_pmmr_at(&self.header); let segment = Segment::from_pmmr(id, &output_pmmr, true)?; - let bitmap_root = self.bitmap_root()?; debug!( "output_segment: id: ({}, {}), leaves: {}, hashes: {}, proof hashes: {}, took {}ms", segment.id().height, @@ -126,7 +113,7 @@ impl Segmenter { segment.proof().size(), now.elapsed().as_millis() ); - Ok((segment, bitmap_root)) + Ok(segment) } /// Create a rangeproof segment. diff --git a/chain/tests/test_pibd_copy.rs b/chain/tests/test_pibd_copy.rs index 1bd9f93a1..339c75cbb 100644 --- a/chain/tests/test_pibd_copy.rs +++ b/chain/tests/test_pibd_copy.rs @@ -19,6 +19,7 @@ use grin_util as util; #[macro_use] extern crate log; +use grin_core::core::BlockHeader; use std::path::Path; use std::sync::Arc; use std::{fs, io}; @@ -87,15 +88,16 @@ impl SegmenterResponder { self.chain.clone() } - pub fn get_bitmap_segment(&self, seg_id: SegmentIdentifier) -> (Segment, Hash) { + pub fn get_bitmap_root_hash(&self) -> Hash { + self.chain.segmenter().unwrap().bitmap_root().unwrap() + } + + pub fn get_bitmap_segment(&self, seg_id: SegmentIdentifier) -> Segment { let segmenter = self.chain.segmenter().unwrap(); segmenter.bitmap_segment(seg_id).unwrap() } - pub fn get_output_segment( - &self, - seg_id: SegmentIdentifier, - ) -> (Segment, Hash) { + pub fn get_output_segment(&self, seg_id: SegmentIdentifier) -> Segment { let segmenter = self.chain.segmenter().unwrap(); segmenter.output_segment(seg_id).unwrap() } @@ -174,11 +176,17 @@ impl DesegmenterRequestor { } } + pub fn init_desegmenter(&mut self, bitmap_root_hash: Hash) { + let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); + self.chain + .create_desegmenter(&archive_header, bitmap_root_hash); + } + // Emulate `continue_pibd` function, which would be called from state sync // return whether is complete pub fn continue_pibd(&mut self) -> bool { let archive_header = self.chain.txhashset_archive_header_header_only().unwrap(); - let desegmenter = self.chain.desegmenter(&archive_header).unwrap(); + let desegmenter = self.chain.get_desegmenter(&archive_header); // Apply segments... TODO: figure out how this should be called, might // need to be a separate thread. 
@@ -205,17 +213,15 @@ impl DesegmenterRequestor { // Perform request and response match seg_id.segment_type { SegmentType::Bitmap => { - let (seg, output_root) = - self.responder.get_bitmap_segment(seg_id.identifier.clone()); + let seg = self.responder.get_bitmap_segment(seg_id.identifier.clone()); if let Some(d) = desegmenter.write().as_mut() { - d.add_bitmap_segment(seg, output_root).unwrap(); + d.add_bitmap_segment(seg).unwrap(); } } SegmentType::Output => { - let (seg, bitmap_root) = - self.responder.get_output_segment(seg_id.identifier.clone()); + let seg = self.responder.get_output_segment(seg_id.identifier.clone()); if let Some(d) = desegmenter.write().as_mut() { - d.add_output_segment(seg, Some(bitmap_root)).unwrap(); + d.add_output_segment(seg).unwrap(); } } SegmentType::RangeProof => { @@ -279,6 +285,7 @@ fn test_pibd_copy_impl( } let src_responder = Arc::new(SegmenterResponder::new(src_root_dir, genesis.clone())); + let bitmap_root_hash = src_responder.get_bitmap_root_hash(); let mut dest_requestor = DesegmenterRequestor::new(dest_root_dir, genesis.clone(), src_responder); @@ -290,6 +297,8 @@ fn test_pibd_copy_impl( } } + dest_requestor.init_desegmenter(bitmap_root_hash); + // Perform until desegmenter reports it's done while !dest_requestor.continue_pibd() {} @@ -327,12 +336,9 @@ fn test_pibd_copy_real() { let copy_headers_to_template = false; // if testing against a real chain, insert location here - let src_root_dir = format!("/home/yeastplume/Projects/grin-project/servers/floo-1/chain_data"); - let dest_template_dir = format!( - "/home/yeastplume/Projects/grin-project/servers/floo-pibd-1/chain_data_headers_only" - ); - let dest_root_dir = - format!("/home/yeastplume/Projects/grin-project/servers/floo-pibd-1/chain_data_test_copy"); + let src_root_dir = format!("/Users/bay/.mwc/_floo/chain_data"); + let dest_template_dir = format!("/Users/bay/.mwc/_floo2/chain_data"); + let dest_root_dir = format!("/Users/bay/.mwc/_floo2/chain_data"); if copy_headers_to_template { clean_output_dir(&dest_template_dir); test_pibd_copy_impl(false, &src_root_dir, &dest_template_dir, None); diff --git a/chain/tests/test_pibd_validation.rs b/chain/tests/test_pibd_validation.rs index 25ffcdce2..c713677b2 100644 --- a/chain/tests/test_pibd_validation.rs +++ b/chain/tests/test_pibd_validation.rs @@ -80,6 +80,12 @@ fn test_pibd_chain_validation_impl(is_test_chain: bool, src_root_dir: &str) { // This is going to use the same block as horizon_header let segmenter = src_chain.segmenter().unwrap(); + let bitmap_root = segmenter.bitmap_root().unwrap(); + println!( + "Bitmap segmenter reports output bitmap root hash is {:?}", + bitmap_root + ); + // BITMAP - Read + Validate, Also recreate bitmap accumulator for target tx hash set // Predict number of leaves (chunks) in the bitmap MMR from the number of outputs let bitmap_mmr_num_leaves = @@ -108,19 +114,12 @@ fn test_pibd_chain_validation_impl(is_test_chain: bool, src_root_dir: &str) { for sid in identifier_iter { println!("Getting bitmap segment with Segment Identifier {:?}", sid); - let (bitmap_segment, output_root_hash) = segmenter.bitmap_segment(sid).unwrap(); - println!( - "Bitmap segmenter reports output root hash is {:?}", - output_root_hash - ); + let bitmap_segment = segmenter.bitmap_segment(sid).unwrap(); // Validate bitmap segment with provided output hash - if let Err(e) = bitmap_segment.validate_with( + if let Err(e) = bitmap_segment.validate( bitmap_pmmr_size, // Last MMR pos at the height being validated, in this case of the bitmap root None, - 
horizon_header.output_root, // Output root we're checking for - horizon_header.output_mmr_size, - output_root_hash, // Other root - true, + bitmap_root, ) { panic!("Unable to validate bitmap_root: {}", e); } @@ -149,19 +148,12 @@ fn test_pibd_chain_validation_impl(is_test_chain: bool, src_root_dir: &str) { for sid in identifier_iter { println!("Getting output segment with Segment Identifier {:?}", sid); - let (output_segment, bitmap_root_hash) = segmenter.output_segment(sid).unwrap(); - println!( - "Output segmenter reports bitmap hash is {:?}", - bitmap_root_hash - ); + let output_segment = segmenter.output_segment(sid).unwrap(); // Validate Output - if let Err(e) = output_segment.validate_with( + if let Err(e) = output_segment.validate( horizon_header.output_mmr_size, // Last MMR pos at the height being validated Some(&bitmap), horizon_header.output_root, // Output root we're checking for - horizon_header.output_mmr_size, - bitmap_root_hash, // Other root - false, ) { panic!("Unable to validate output segment root: {}", e); } @@ -232,6 +224,6 @@ fn test_pibd_chain_validation_sample() { fn test_pibd_chain_validation_real() { util::init_test_logger(); // if testing against a real chain, insert location here - let src_root_dir = format!("/Users/bay/.grin/main/chain_data"); + let src_root_dir = format!("/Users/bay/.mwc/_floo/chain_data"); test_pibd_chain_validation_impl(false, &src_root_dir); } diff --git a/core/src/core/pmmr/segment.rs b/core/src/core/pmmr/segment.rs index 8df8b8f98..18b915b39 100644 --- a/core/src/core/pmmr/segment.rs +++ b/core/src/core/pmmr/segment.rs @@ -536,32 +536,6 @@ where segment_unpruned_pos, ) } - - /// Check validity of the segment by calculating its root and validating the merkle proof - /// This function assumes a final hashing step together with `other_root` - pub fn validate_with( - &self, - mmr_size: u64, - bitmap: Option<&Bitmap>, - mmr_root: Hash, - hash_last_pos: u64, - other_root: Hash, - other_is_left: bool, - ) -> Result<(), SegmentError> { - let (first, last) = self.segment_pos_range(mmr_size); - let (segment_root, segment_unpruned_pos) = self.first_unpruned_parent(mmr_size, bitmap)?; - self.proof.validate_with( - mmr_size, - mmr_root, - first, - last, - segment_root, - segment_unpruned_pos, - hash_last_pos, - other_root, - other_is_left, - ) - } } impl Readable for Segment { @@ -786,39 +760,6 @@ impl SegmentProof { Err(SegmentError::Mismatch) } } - - /// Check validity of the proof by equating the reconstructed root with the actual root - /// This function assumes a final hashing step together with `other_root` - pub fn validate_with( - &self, - last_pos: u64, - mmr_root: Hash, - segment_first_pos: u64, - segment_last_pos: u64, - segment_root: Hash, - segment_unpruned_pos: u64, - hash_last_pos: u64, - other_root: Hash, - other_is_left: bool, - ) -> Result<(), SegmentError> { - let root = self.reconstruct_root( - last_pos, - segment_first_pos, - segment_last_pos, - segment_root, - segment_unpruned_pos, - )?; - let root = if other_is_left { - (other_root, root).hash_with_index(hash_last_pos) - } else { - (root, other_root).hash_with_index(hash_last_pos) - }; - if root == mmr_root { - Ok(()) - } else { - Err(SegmentError::Mismatch) - } - } } impl Readable for SegmentProof { diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index bfab8e918..795dabb0b 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -267,12 +267,15 @@ fn decode_message( Type::TxHashSetArchive => Message::TxHashSetArchive(msg.body()?), Type::GetOutputBitmapSegment => 
Message::GetOutputBitmapSegment(msg.body()?), Type::OutputBitmapSegment => Message::OutputBitmapSegment(msg.body()?), + Type::StartPibdSyncRequest => Message::StartPibdSyncRequest(msg.body()?), + Type::PibdSyncState => Message::PibdSyncState(msg.body()?), Type::GetOutputSegment => Message::GetOutputSegment(msg.body()?), Type::OutputSegment => Message::OutputSegment(msg.body()?), Type::GetRangeProofSegment => Message::GetRangeProofSegment(msg.body()?), Type::RangeProofSegment => Message::RangeProofSegment(msg.body()?), Type::GetKernelSegment => Message::GetKernelSegment(msg.body()?), Type::KernelSegment => Message::KernelSegment(msg.body()?), + Type::HasAnotherArchiveHeader => Message::HasAnotherArchiveHeader(msg.body()?), Type::Error | Type::Hand | Type::Shake | Type::Headers => { return Err(Error::UnexpectedMessage(format!( "get message with type {:?} (code {})", diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index a7c9986b6..f453d3dda 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -77,14 +77,17 @@ enum_from_primitive! { GetTransaction = 19, TransactionKernel = 20, TorAddress = 23, - GetOutputBitmapSegment = 24, - OutputBitmapSegment = 25, - GetOutputSegment = 26, - OutputSegment = 27, - GetRangeProofSegment = 28, - RangeProofSegment = 29, - GetKernelSegment = 30, - KernelSegment = 31, + StartPibdSyncRequest = 24, + GetOutputBitmapSegment = 25, + OutputBitmapSegment = 26, + GetOutputSegment = 27, + OutputSegment = 28, + GetRangeProofSegment = 29, + RangeProofSegment = 30, + GetKernelSegment = 31, + KernelSegment = 32, + HasAnotherArchiveHeader = 33, + PibdSyncState = 34, } } @@ -117,7 +120,7 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::CompactBlock => max_block_size() / 10, Type::StemTransaction => max_block_size(), Type::Transaction => max_block_size(), - Type::TxHashSetRequest => 40, + Type::TxHashSetRequest => 40, // 32+8=40 Type::TxHashSetArchive => 64, Type::BanReason => 64, Type::GetTransaction => 32, @@ -131,6 +134,9 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::RangeProofSegment => 2 * max_block_size(), Type::GetKernelSegment => 41, Type::KernelSegment => 2 * max_block_size(), + Type::StartPibdSyncRequest => 40, // 32+8=40 + Type::HasAnotherArchiveHeader => 40, + Type::PibdSyncState => 72, // 32 + 8 + 32 = 72 } } @@ -763,16 +769,15 @@ impl Readable for BanReason { } } -/// Request to get an archive of the full txhashset store, required to sync -/// a new node. 
-pub struct TxHashSetRequest { +/// Request to get PIBD sync request +pub struct ArchiveHeaderData { /// Hash of the block for which the txhashset should be provided pub hash: Hash, /// Height of the corresponding block pub height: u64, } -impl Writeable for TxHashSetRequest { +impl Writeable for ArchiveHeaderData { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { self.hash.write(writer)?; writer.write_u64(self.height)?; @@ -780,15 +785,43 @@ impl Writeable for TxHashSetRequest { } } -impl Readable for TxHashSetRequest { - fn read(reader: &mut R) -> Result { - Ok(TxHashSetRequest { +impl Readable for ArchiveHeaderData { + fn read(reader: &mut R) -> Result { + Ok(ArchiveHeaderData { hash: Hash::read(reader)?, height: reader.read_u64()?, }) } } +pub struct PibdSyncState { + /// Hash of the block for which the txhashset should be provided + pub header_hash: Hash, + /// Height of the corresponding block + pub header_height: u64, + /// output bitmap root hash + pub output_bitmap_root: Hash, +} + +impl Writeable for PibdSyncState { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.header_hash.write(writer)?; + writer.write_u64(self.header_height)?; + self.output_bitmap_root.write(writer)?; + Ok(()) + } +} + +impl Readable for PibdSyncState { + fn read(reader: &mut R) -> Result { + Ok(PibdSyncState { + header_hash: Hash::read(reader)?, + header_height: reader.read_u64()?, + output_bitmap_root: Hash::read(reader)?, + }) + } +} + /// Request to get a segment of a (P)MMR at a particular block. pub struct SegmentRequest { /// The hash of the block the MMR is associated with @@ -845,25 +878,18 @@ impl Writeable for SegmentResponse { pub struct OutputSegmentResponse { /// The segment response pub response: SegmentResponse, - /// The root hash of the output bitmap MMR - pub output_bitmap_root: Hash, } impl Readable for OutputSegmentResponse { fn read(reader: &mut R) -> Result { let response = Readable::read(reader)?; - let output_bitmap_root = Readable::read(reader)?; - Ok(Self { - response, - output_bitmap_root, - }) + Ok(Self { response }) } } impl Writeable for OutputSegmentResponse { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - Writeable::write(&self.response, writer)?; - Writeable::write(&self.output_bitmap_root, writer) + Writeable::write(&self.response, writer) } } @@ -873,19 +899,15 @@ pub struct OutputBitmapSegmentResponse { pub block_hash: Hash, /// The MMR segment pub segment: BitmapSegment, - /// The root hash of the output PMMR - pub output_root: Hash, } impl Readable for OutputBitmapSegmentResponse { fn read(reader: &mut R) -> Result { let block_hash = Readable::read(reader)?; let segment = Readable::read(reader)?; - let output_root = Readable::read(reader)?; Ok(Self { block_hash, segment, - output_root, }) } } @@ -893,8 +915,7 @@ impl Readable for OutputBitmapSegmentResponse { impl Writeable for OutputBitmapSegmentResponse { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { Writeable::write(&self.block_hash, writer)?; - Writeable::write(&self.segment, writer)?; - Writeable::write(&self.output_root, writer) + Writeable::write(&self.segment, writer) } } @@ -916,10 +937,12 @@ pub enum Message { Headers(HeadersData), GetPeerAddrs(GetPeerAddrs), PeerAddrs(PeerAddrs), - TxHashSetRequest(TxHashSetRequest), + TxHashSetRequest(ArchiveHeaderData), TxHashSetArchive(TxHashSetArchive), Attachment(AttachmentUpdate, Option), TorAddress(TorAddress), + StartPibdSyncRequest(ArchiveHeaderData), + PibdSyncState(PibdSyncState), 
GetOutputBitmapSegment(SegmentRequest), OutputBitmapSegment(OutputBitmapSegmentResponse), GetOutputSegment(SegmentRequest), @@ -928,6 +951,7 @@ pub enum Message { RangeProofSegment(SegmentResponse), GetKernelSegment(SegmentRequest), KernelSegment(SegmentResponse), + HasAnotherArchiveHeader(ArchiveHeaderData), } /// We receive 512 headers from a peer. @@ -973,6 +997,11 @@ impl fmt::Display for Message { Message::RangeProofSegment(_) => write!(f, "range proof segment"), Message::GetKernelSegment(_) => write!(f, "get kernel segment"), Message::KernelSegment(_) => write!(f, "kernel segment"), + Message::PibdSyncState(_) => write!(f, "PIBD sync state"), + Message::StartPibdSyncRequest(_) => write!(f, "start PIBD sync"), + Message::HasAnotherArchiveHeader(_) => { + write!(f, "PIBD error, has another archive header") + } } } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 408781f4b..7f8098433 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -33,7 +33,7 @@ use crate::grin_core::ser::Writeable; use crate::grin_core::{core, global}; use crate::handshake::Handshake; use crate::msg::{ - self, BanReason, GetPeerAddrs, Locator, Msg, Ping, SegmentRequest, TxHashSetRequest, Type, + self, ArchiveHeaderData, BanReason, GetPeerAddrs, Locator, Msg, Ping, SegmentRequest, Type, }; use crate::protocol::Protocol; use crate::types::{ @@ -42,6 +42,7 @@ use crate::types::{ }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::{DateTime, Utc}; +use grin_chain::txhashset::Segmenter; use std::time::Instant; const MAX_TRACK_SIZE: usize = 30; @@ -56,6 +57,60 @@ enum State { Banned, } +pub struct PeerPibdStatus { + /// Hash of the block for which the txhashset should be provided + pub header_hash: Hash, + /// Height of the corresponding block + pub header_height: u64, + /// output bitmap root hash + pub output_bitmap_root: Option, + // History of commited output_bitmaps + header hashes. 
Needed to ban the peer in case of misbehaviour + pub output_bitmap_root_header_history: Vec, + /// Time when request for pibd start was sent last time (timestamp in seconds) + pub initiate_pibd_request_time: i64, + /// Number of requests that was sent after success response recieved + pub no_response_requests: u32, + /// Time when first no responsive request was sent + pub no_response_time: Option, +} + +impl PeerPibdStatus { + pub fn default() -> PeerPibdStatus { + PeerPibdStatus { + header_hash: Hash::default(), + header_height: 0, + output_bitmap_root: None, + output_bitmap_root_header_history: Vec::new(), + initiate_pibd_request_time: 0, + no_response_requests: 0, + no_response_time: None, + } + } + + pub fn update_pibd_status( + &mut self, + header_hash: Hash, + header_height: u64, + output_bitmap_root: Option, + ) { + self.header_hash = header_hash; + self.header_height = header_height; + + match output_bitmap_root { + Some(hash) => { + let hist_hash = (hash, header_hash).hash(); + if !self.output_bitmap_root_header_history.contains(&hist_hash) { + self.output_bitmap_root_header_history.push(hist_hash); + } + self.output_bitmap_root = Some(hash); + } + None => { + self.output_bitmap_root = None; + } + } + } +} + pub struct Peer { pub info: PeerInfo, state: Arc>, @@ -69,6 +124,8 @@ pub struct Peer { stop_handle: Mutex, // Whether or not we requested a txhashset from this peer state_sync_requested: Arc, + // PIBD available data status + pub pibd_status: Arc>, } impl fmt::Debug for Peer { @@ -89,12 +146,14 @@ impl Peer { let state = Arc::new(RwLock::new(State::Connected)); let state_sync_requested = Arc::new(AtomicBool::new(false)); let tracking_adapter = TrackingAdapter::new(adapter); + let pibd_status = Arc::new(Mutex::new(PeerPibdStatus::default())); let handler = Protocol::new( Arc::new(tracking_adapter.clone()), info.clone(), state_sync_requested.clone(), header_cache_size, server, + pibd_status.clone(), ); let tracker = Arc::new(conn::Tracker::new()); let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?; @@ -108,6 +167,7 @@ impl Peer { send_handle, stop_handle, state_sync_requested, + pibd_status, }) } @@ -402,16 +462,37 @@ impl Peer { ); self.state_sync_requested.store(true, Ordering::Relaxed); self.send( - &TxHashSetRequest { hash, height }, + &ArchiveHeaderData { hash, height }, msg::Type::TxHashSetRequest, ) } + pub fn send_start_pibd_sync_request(&self, height: u64, hash: Hash) -> Result<(), Error> { + info!( + "Asking peer {} for pibd sync at {} {}.", + self.info.addr, height, hash + ); + self.report_pibd_request(); + self.send( + &ArchiveHeaderData { hash, height }, + msg::Type::StartPibdSyncRequest, + ) + } + + fn report_pibd_request(&self) { + let mut pibd_status = self.pibd_status.lock(); + pibd_status.no_response_requests += 1; + if pibd_status.no_response_time.is_none() { + pibd_status.no_response_time = Some(Utc::now().timestamp()); + } + } + pub fn send_bitmap_segment_request( &self, h: Hash, identifier: SegmentIdentifier, ) -> Result<(), Error> { + self.report_pibd_request(); self.send( &SegmentRequest { block_hash: h, @@ -426,6 +507,7 @@ impl Peer { h: Hash, identifier: SegmentIdentifier, ) -> Result<(), Error> { + self.report_pibd_request(); self.send( &SegmentRequest { block_hash: h, @@ -440,6 +522,7 @@ impl Peer { h: Hash, identifier: SegmentIdentifier, ) -> Result<(), Error> { + self.report_pibd_request(); self.send( &SegmentRequest { block_hash: h, @@ -454,6 +537,7 @@ impl Peer { h: Hash, identifier: SegmentIdentifier, ) -> 
Result<(), Error> { + self.report_pibd_request(); self.send( &SegmentRequest { block_hash: h, @@ -480,6 +564,26 @@ impl Peer { None => error!("can't get stop lock for peer"), } } + + /// check if this peer ever commited for specific pibd hash + pub fn commited_to_pibd_bitmap_output_root( + &self, + output_bitmap_root_header_hash: &Hash, + ) -> bool { + let status = self.pibd_status.lock(); + status + .output_bitmap_root_header_history + .contains(&output_bitmap_root_header_hash) + } + + /// + pub fn get_pibd_no_response_state(&self) -> Option<(u32, i64)> { + let status = self.pibd_status.lock(); + match status.no_response_time { + None => None, + Some(time) => Some((status.no_response_requests, time)), + } + } } /// Adapter implementation that forwards everything to an underlying adapter @@ -693,6 +797,11 @@ impl ChainAdapter for TrackingAdapter { self.adapter.get_tmpfile_pathname(tmpfile_name) } + /// For MWC handshake we need to have a segmenter ready with output bitmap ready and commited. + fn prepare_segmenter(&self) -> Result { + self.adapter.prepare_segmenter() + } + fn get_kernel_segment( &self, hash: Hash, @@ -705,7 +814,7 @@ impl ChainAdapter for TrackingAdapter { &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { self.adapter.get_bitmap_segment(hash, id) } @@ -713,7 +822,7 @@ impl ChainAdapter for TrackingAdapter { &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { self.adapter.get_output_segment(hash, id) } @@ -728,21 +837,17 @@ impl ChainAdapter for TrackingAdapter { fn receive_bitmap_segment( &self, block_hash: Hash, - output_root: Hash, segment: Segment, ) -> Result { - self.adapter - .receive_bitmap_segment(block_hash, output_root, segment) + self.adapter.receive_bitmap_segment(block_hash, segment) } fn receive_output_segment( &self, block_hash: Hash, - bitmap_root: Hash, segment: Segment, ) -> Result { - self.adapter - .receive_output_segment(block_hash, bitmap_root, segment) + self.adapter.receive_output_segment(block_hash, segment) } fn receive_rangeproof_segment( diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index e9e3d4118..7c0fd4272 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -37,6 +37,7 @@ use crate::types::{ use crate::util::secp::pedersen::RangeProof; use chrono::prelude::*; use chrono::Duration; +use grin_chain::txhashset::Segmenter; use grin_util::StopState; const LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); @@ -643,6 +644,11 @@ impl ChainAdapter for Peers { self.adapter.get_tmpfile_pathname(tmpfile_name) } + /// For MWC handshake we need to have a segmenter ready with output bitmap ready and commited. 
+ fn prepare_segmenter(&self) -> Result { + self.adapter.prepare_segmenter() + } + fn get_kernel_segment( &self, hash: Hash, @@ -655,7 +661,7 @@ impl ChainAdapter for Peers { &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { self.adapter.get_bitmap_segment(hash, id) } @@ -663,7 +669,7 @@ impl ChainAdapter for Peers { &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { self.adapter.get_output_segment(hash, id) } @@ -678,21 +684,17 @@ impl ChainAdapter for Peers { fn receive_bitmap_segment( &self, block_hash: Hash, - output_root: Hash, segment: Segment, ) -> Result { - self.adapter - .receive_bitmap_segment(block_hash, output_root, segment) + self.adapter.receive_bitmap_segment(block_hash, segment) } fn receive_output_segment( &self, block_hash: Hash, - bitmap_root: Hash, segment: Segment, ) -> Result { - self.adapter - .receive_output_segment(block_hash, bitmap_root, segment) + self.adapter.receive_output_segment(block_hash, segment) } fn receive_rangeproof_segment( diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index ac5b76f26..b5a220c71 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -17,12 +17,15 @@ use crate::grin_core::core::{hash::Hashed, CompactBlock}; use crate::{chain, Capabilities}; use crate::msg::{ - Consumed, Headers, Message, Msg, OutputBitmapSegmentResponse, OutputSegmentResponse, PeerAddrs, - Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, Type, + ArchiveHeaderData, Consumed, Headers, Message, Msg, OutputBitmapSegmentResponse, + OutputSegmentResponse, PeerAddrs, PibdSyncState, Pong, SegmentRequest, SegmentResponse, + TxHashSetArchive, Type, }; +use crate::peer::PeerPibdStatus; use crate::serv::Server; use crate::types::{AttachmentMeta, Error, NetAdapter, PeerAddr, PeerAddr::Onion, PeerInfo}; use chrono::prelude::Utc; +use grin_util::Mutex; use rand::{thread_rng, Rng}; use std::fs::{self, File}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -34,6 +37,7 @@ pub struct Protocol { state_sync_requested: Arc, header_cache_size: u64, server: Server, + pibd_status: Arc>, } impl Protocol { @@ -43,6 +47,7 @@ impl Protocol { state_sync_requested: Arc, header_cache_size: u64, server: Server, + pibd_status: Arc>, ) -> Protocol { Protocol { adapter, @@ -50,6 +55,15 @@ impl Protocol { state_sync_requested, header_cache_size, server, + pibd_status, + } + } + + fn report_pibd_response(&self, success: bool) { + if success { + let mut pibd_status = self.pibd_status.lock(); + pibd_status.no_response_requests = 0; + pibd_status.no_response_time = None; } } } @@ -365,26 +379,79 @@ impl MessageHandler for Protocol { Consumed::Attachment(Arc::new(meta), file) } - + Message::StartPibdSyncRequest(sm_req) => { + debug!( + "handle_payload: start PIBD request for {} at {}", + sm_req.hash, sm_req.height + ); + match self.adapter.prepare_segmenter() { + Ok(segmenter) => { + let header = segmenter.header(); + let header_hash = header.hash(); + if header_hash == sm_req.hash && header.height == sm_req.height { + if let Ok(bitmap_root_hash) = segmenter.bitmap_root() { + // we can start the sync process, let's prepare the segmenter + Consumed::Response(Msg::new( + Type::PibdSyncState, + &PibdSyncState { + header_height: header.height, + header_hash: header_hash, + output_bitmap_root: bitmap_root_hash, + }, + self.peer_info.version, + )?) 
+ } else { + Consumed::None + } + } else { + Consumed::Response(Msg::new( + Type::HasAnotherArchiveHeader, + &ArchiveHeaderData { + height: header.height, + hash: header_hash, + }, + self.peer_info.version, + )?) + } + } + Err(e) => { + warn!( + "Unable to prepare segment for PIBD request for {} at {}. Error: {}", + sm_req.hash, sm_req.height, e + ); + Consumed::None + } + } + } Message::GetOutputBitmapSegment(req) => { let SegmentRequest { block_hash, identifier, } = req; - if let Ok((segment, output_root)) = - self.adapter.get_bitmap_segment(block_hash, identifier) - { - Consumed::Response(Msg::new( + + match self.adapter.get_bitmap_segment(block_hash, identifier) { + Ok(segment) => Consumed::Response(Msg::new( Type::OutputBitmapSegment, OutputBitmapSegmentResponse { block_hash, segment: segment.into(), - output_root, }, self.peer_info.version, - )?) - } else { - Consumed::None + )?), + Err(chain::Error::SegmenterHeaderMismatch(hash, height)) => { + Consumed::Response(Msg::new( + Type::HasAnotherArchiveHeader, + &ArchiveHeaderData { + height: height, + hash: hash, + }, + self.peer_info.version, + )?) + } + Err(e) => { + warn!("Failed to process GetOutputBitmapSegment for block_hash={} and identifier={:?}. Error: {}", block_hash, identifier, e); + Consumed::None + } } } Message::GetOutputSegment(req) => { @@ -392,22 +459,32 @@ impl MessageHandler for Protocol { block_hash, identifier, } = req; - if let Ok((segment, output_bitmap_root)) = - self.adapter.get_output_segment(block_hash, identifier) - { - Consumed::Response(Msg::new( + + match self.adapter.get_output_segment(block_hash, identifier) { + Ok(segment) => Consumed::Response(Msg::new( Type::OutputSegment, OutputSegmentResponse { response: SegmentResponse { block_hash, segment, }, - output_bitmap_root, }, self.peer_info.version, - )?) - } else { - Consumed::None + )?), + Err(chain::Error::SegmenterHeaderMismatch(hash, height)) => { + Consumed::Response(Msg::new( + Type::HasAnotherArchiveHeader, + &ArchiveHeaderData { + height: height, + hash: hash, + }, + self.peer_info.version, + )?) + } + Err(e) => { + warn!("Failed to process GetOutputSegment for block_hash={} and identifier={:?}. Error: {}", block_hash, identifier, e); + Consumed::None + } } } Message::GetRangeProofSegment(req) => { @@ -415,17 +492,29 @@ impl MessageHandler for Protocol { block_hash, identifier, } = req; - if let Ok(segment) = self.adapter.get_rangeproof_segment(block_hash, identifier) { - Consumed::Response(Msg::new( + match self.adapter.get_rangeproof_segment(block_hash, identifier) { + Ok(segment) => Consumed::Response(Msg::new( Type::RangeProofSegment, SegmentResponse { block_hash, segment, }, self.peer_info.version, - )?) - } else { - Consumed::None + )?), + Err(chain::Error::SegmenterHeaderMismatch(hash, height)) => { + Consumed::Response(Msg::new( + Type::HasAnotherArchiveHeader, + &ArchiveHeaderData { + height: height, + hash: hash, + }, + self.peer_info.version, + )?) + } + Err(e) => { + warn!("Failed to process GetRangeProofSegment for block_hash={} and identifier={:?}. 
Error: {}", block_hash, identifier, e); + Consumed::None + } } } Message::GetKernelSegment(req) => { @@ -433,48 +522,71 @@ impl MessageHandler for Protocol { block_hash, identifier, } = req; - if let Ok(segment) = self.adapter.get_kernel_segment(block_hash, identifier) { - Consumed::Response(Msg::new( + + match self.adapter.get_kernel_segment(block_hash, identifier) { + Ok(segment) => Consumed::Response(Msg::new( Type::KernelSegment, SegmentResponse { block_hash, segment, }, self.peer_info.version, - )?) - } else { - Consumed::None + )?), + Err(chain::Error::SegmenterHeaderMismatch(hash, height)) => { + Consumed::Response(Msg::new( + Type::HasAnotherArchiveHeader, + &ArchiveHeaderData { + height: height, + hash: hash, + }, + self.peer_info.version, + )?) + } + Err(e) => { + warn!("Failed to process GetKernelSegment for block_hash={} and identifier={:?}. Error: {}", block_hash, identifier, e); + Consumed::None + } } } + Message::PibdSyncState(req) => { + let mut status = self.pibd_status.lock(); + status.update_pibd_status( + req.header_hash, + req.header_height, + Some(req.output_bitmap_root), + ); + self.report_pibd_response(true); + Consumed::None + } + Message::HasAnotherArchiveHeader(req) => { + let mut status = self.pibd_status.lock(); + status.update_pibd_status(req.hash, req.height, None); + Consumed::None + } Message::OutputBitmapSegment(req) => { let OutputBitmapSegmentResponse { block_hash, segment, - output_root, } = req; - trace!( - "Received Output Bitmap Segment: bh, output_root: {}, {}", - block_hash, - output_root - ); - adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?; + trace!("Received Output Bitmap Segment: bh: {}", block_hash,); + + adapter + .receive_bitmap_segment(block_hash, segment.into()) + .and_then(|ok| { + self.report_pibd_response(ok); + Ok(ok) + })?; Consumed::None } Message::OutputSegment(req) => { - let OutputSegmentResponse { - response, - output_bitmap_root, - } = req; - trace!( - "Received Output Segment: bh, bitmap_root: {}, {}", - response.block_hash, - output_bitmap_root - ); - adapter.receive_output_segment( - response.block_hash, - output_bitmap_root, - response.segment.into(), - )?; + let OutputSegmentResponse { response } = req; + trace!("Received Output Segment: bh, {}", response.block_hash,); + adapter + .receive_output_segment(response.block_hash, response.segment.into()) + .and_then(|ok| { + self.report_pibd_response(ok); + Ok(ok) + })?; Consumed::None } Message::RangeProofSegment(req) => { @@ -483,7 +595,12 @@ impl MessageHandler for Protocol { segment, } = req; trace!("Received Rangeproof Segment: bh: {}", block_hash); - adapter.receive_rangeproof_segment(block_hash, segment.into())?; + adapter + .receive_rangeproof_segment(block_hash, segment.into()) + .and_then(|ok| { + self.report_pibd_response(ok); + Ok(ok) + })?; Consumed::None } Message::KernelSegment(req) => { @@ -492,7 +609,12 @@ impl MessageHandler for Protocol { segment, } = req; trace!("Received Kernel Segment: bh: {}", block_hash); - adapter.receive_kernel_segment(block_hash, segment.into())?; + adapter + .receive_kernel_segment(block_hash, segment.into()) + .and_then(|ok| { + self.report_pibd_response(ok); + Ok(ok) + })?; Consumed::None } Message::Unknown(_) => Consumed::None, diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 4964ad9f0..26c4c85ad 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -39,6 +39,7 @@ use crate::types::{ use crate::util::secp::pedersen::RangeProof; use crate::util::StopState; use chrono::prelude::{DateTime, Utc}; +use 
grin_chain::txhashset::Segmenter; /// P2P server implementation, handling bootstrapping to find and connect to /// peers, receiving connections from other peers and keep track of all of them. @@ -488,6 +489,10 @@ impl ChainAdapter for DummyAdapter { unimplemented!() } + fn prepare_segmenter(&self) -> Result { + unimplemented!() + } + fn get_kernel_segment( &self, _hash: Hash, @@ -500,7 +505,7 @@ impl ChainAdapter for DummyAdapter { &self, _hash: Hash, _id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { unimplemented!() } @@ -508,7 +513,7 @@ impl ChainAdapter for DummyAdapter { &self, _hash: Hash, _id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { unimplemented!() } @@ -523,7 +528,6 @@ impl ChainAdapter for DummyAdapter { fn receive_bitmap_segment( &self, _block_hash: Hash, - _output_root: Hash, _segment: Segment, ) -> Result { unimplemented!() @@ -532,7 +536,6 @@ impl ChainAdapter for DummyAdapter { fn receive_output_segment( &self, _block_hash: Hash, - _bitmap_root: Hash, _segment: Segment, ) -> Result { unimplemented!() diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 5292d4b26..a2a56891c 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -40,6 +40,7 @@ use crate::grin_core::ser::{self, ProtocolVersion, Readable, Reader, Writeable, use crate::msg::PeerAddrs; use crate::util::secp::pedersen::RangeProof; use crate::util::RwLock; +use grin_chain::txhashset::Segmenter; use std::time::Instant; /// Maximum number of block headers a peer should ever send @@ -526,6 +527,8 @@ enum_from_primitive! { ManualBan = 5, FraudHeight = 6, BadHandshake = 7, + PibdFailure = 8, + PibdInactive = 9, } } @@ -766,6 +769,9 @@ pub trait ChainAdapter: Sync + Send { /// Delete file if tmp file already exists fn get_tmpfile_pathname(&self, tmpfile_name: String) -> PathBuf; + /// For MWC handshake we need to have a segmenter ready with output bitmap ready and commited. 
+ fn prepare_segmenter(&self) -> Result; + fn get_kernel_segment( &self, hash: Hash, @@ -776,13 +782,13 @@ pub trait ChainAdapter: Sync + Send { &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error>; + ) -> Result, chain::Error>; fn get_output_segment( &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error>; + ) -> Result, chain::Error>; fn get_rangeproof_segment( &self, @@ -793,14 +799,12 @@ pub trait ChainAdapter: Sync + Send { fn receive_bitmap_segment( &self, block_hash: Hash, - output_root: Hash, segment: Segment, ) -> Result; fn receive_output_segment( &self, block_hash: Hash, - bitmap_root: Hash, segment: Segment, ) -> Result; diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 94bdb0250..ede3486eb 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -47,6 +47,7 @@ use crate::util::secp::pedersen::RangeProof; use crate::util::OneTime; use chrono::prelude::*; use chrono::Duration; +use grin_chain::txhashset::Segmenter; use rand::prelude::*; use std::ops::Range; use std::sync::atomic::AtomicI64; @@ -728,6 +729,10 @@ where self.chain().get_tmpfile_pathname(tmpfile_name) } + fn prepare_segmenter(&self) -> Result { + self.chain().segmenter() + } + fn get_kernel_segment( &self, hash: Hash, @@ -737,8 +742,12 @@ where return Err(chain::Error::InvalidSegmentHeight); } let segmenter = self.chain().segmenter()?; - if segmenter.header().hash() != hash { - return Err(chain::Error::SegmenterHeaderMismatch); + let head_hash = segmenter.header().hash(); + if head_hash != hash { + return Err(chain::Error::SegmenterHeaderMismatch( + head_hash, + segmenter.header().height, + )); } segmenter.kernel_segment(id) } @@ -747,13 +756,17 @@ where &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { if !BITMAP_SEGMENT_HEIGHT_RANGE.contains(&id.height) { return Err(chain::Error::InvalidSegmentHeight); } let segmenter = self.chain().segmenter()?; - if segmenter.header().hash() != hash { - return Err(chain::Error::SegmenterHeaderMismatch); + let head_hash = segmenter.header().hash(); + if head_hash != hash { + return Err(chain::Error::SegmenterHeaderMismatch( + head_hash, + segmenter.header().height, + )); } segmenter.bitmap_segment(id) } @@ -762,13 +775,17 @@ where &self, hash: Hash, id: SegmentIdentifier, - ) -> Result<(Segment, Hash), chain::Error> { + ) -> Result, chain::Error> { if !OUTPUT_SEGMENT_HEIGHT_RANGE.contains(&id.height) { return Err(chain::Error::InvalidSegmentHeight); } let segmenter = self.chain().segmenter()?; - if segmenter.header().hash() != hash { - return Err(chain::Error::SegmenterHeaderMismatch); + let head_hash = segmenter.header().hash(); + if head_hash != hash { + return Err(chain::Error::SegmenterHeaderMismatch( + head_hash, + segmenter.header().height, + )); } segmenter.output_segment(id) } @@ -782,8 +799,12 @@ where return Err(chain::Error::InvalidSegmentHeight); } let segmenter = self.chain().segmenter()?; - if segmenter.header().hash() != hash { - return Err(chain::Error::SegmenterHeaderMismatch); + let head_hash = segmenter.header().hash(); + if head_hash != hash { + return Err(chain::Error::SegmenterHeaderMismatch( + head_hash, + segmenter.header().height, + )); } segmenter.rangeproof_segment(id) } @@ -791,22 +812,28 @@ where fn receive_bitmap_segment( &self, block_hash: Hash, - output_root: Hash, segment: Segment, ) -> Result { debug!( - "Received bitmap segment {} for block_hash: {}, 
output_root: {}", + "Received bitmap segment {} for block_hash: {}", segment.identifier().idx, - block_hash, - output_root + block_hash ); // TODO: Entire process needs to be restarted if the horizon block // has changed (perhaps not here, NB this has to go somewhere) let archive_header = self.chain().txhashset_archive_header_header_only()?; + if archive_header.hash() != block_hash { + return Ok(false); + } let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { - let res = d.add_bitmap_segment(segment, output_root); + if let Some(d) = self + .chain() + .get_desegmenter(&archive_header) + .write() + .as_mut() + { + let res = d.add_bitmap_segment(segment); if let Err(e) = res { error!( "Validation of incoming bitmap segment failed: {:?}, reason: {}", @@ -814,6 +841,8 @@ where ); retval = Err(e); } + } else { + retval = Ok(false); } // Remove segment from outgoing list self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { @@ -826,20 +855,26 @@ where fn receive_output_segment( &self, block_hash: Hash, - bitmap_root: Hash, segment: Segment, ) -> Result { debug!( - "Received output segment {} for block_hash: {}, bitmap_root: {:?}", + "Received output segment {} for block_hash: {}", segment.identifier().idx, block_hash, - bitmap_root, ); let archive_header = self.chain().txhashset_archive_header_header_only()?; + if archive_header.hash() != block_hash { + return Ok(false); + } let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { - let res = d.add_output_segment(segment, Some(bitmap_root)); + if let Some(d) = self + .chain() + .get_desegmenter(&archive_header) + .write() + .as_mut() + { + let res = d.add_output_segment(segment); if let Err(e) = res { error!( "Validation of incoming output segment failed: {:?}, reason: {}", @@ -847,6 +882,8 @@ where ); retval = Err(e); } + } else { + retval = Ok(false); } // Remove segment from outgoing list self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { @@ -867,9 +904,17 @@ where block_hash, ); let archive_header = self.chain().txhashset_archive_header_header_only()?; + if archive_header.hash() != block_hash { + return Ok(false); + } let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + if let Some(d) = self + .chain() + .get_desegmenter(&archive_header) + .write() + .as_mut() + { let res = d.add_rangeproof_segment(segment); if let Err(e) = res { error!( @@ -878,6 +923,8 @@ where ); retval = Err(e); } + } else { + retval = Ok(false); } // Remove segment from outgoing list self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { @@ -898,9 +945,17 @@ where block_hash, ); let archive_header = self.chain().txhashset_archive_header_header_only()?; + if archive_header.hash() != block_hash { + return Ok(false); + } let identifier = segment.identifier().clone(); let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { + if let Some(d) = self + .chain() + .get_desegmenter(&archive_header) + .write() + .as_mut() + { let res = d.add_kernel_segment(segment); if let Err(e) = res { error!( @@ -909,6 +964,8 @@ where ); retval = Err(e); } + } else { + retval = Ok(false); } // Remove segment from outgoing list self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { diff --git 
a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs
index 0074b4d3b..3284bb2d4 100644
--- a/servers/src/grin/sync/state_sync.rs
+++ b/servers/src/grin/sync/state_sync.rs
@@ -14,6 +14,11 @@ use chrono::prelude::{DateTime, Utc};
 use chrono::Duration;
+use grin_core::core::hash::Hash;
+use grin_core::core::BlockHeader;
+use grin_p2p::ReasonForBan;
+use grin_util::secp::rand::Rng;
+use rand::seq::SliceRandom;
 use std::sync::Arc;
 
 use crate::chain::{self, pibd_params, SyncState, SyncStatus};
@@ -42,8 +47,13 @@ pub struct StateSync {
 	pibd_aborted: bool,
 	earliest_zero_pibd_peer_time: Option<DateTime<Utc>>,
+
+	// The bitmap_output_root that was used; in case of error we want to ban all related peers
+	output_bitmap_root_header_hash: Option<Hash>,
 }
 
+const MIN_START_PIBD_RESPONCE_LIMIT_SEC: i64 = 60;
+
 impl StateSync {
 	pub fn new(
 		sync_state: Arc<SyncState>,
@@ -60,6 +70,7 @@ impl StateSync {
 			last_download_size: 0,
 			pibd_aborted: false,
 			earliest_zero_pibd_peer_time: None,
+			output_bitmap_root_header_hash: None,
 		}
 	}
 
@@ -108,14 +119,33 @@ impl StateSync {
 		// Check whether we've errored and should restart pibd
 		if using_pibd {
 			if let SyncStatus::TxHashsetPibd { errored: true, .. } = self.sync_state.status() {
+				// So far, an error here always means something bad happened: we were under attack, and the data we got
+				// is not valid as a whole, even if all the individual blocks were fine.
+
+				// That is why we really want to ban all peers that supported the original bitmap_output hash.
+				// The reason: so far this is the only way to fool the node; all other hashes are part of the headers.
+
+				if let Some(output_bitmap_root_header_hash) = self.output_bitmap_root_header_hash {
+					// let's check for supporters and ban whoever committed to the same hash
+					for peer in self.peers.iter() {
+						if peer.commited_to_pibd_bitmap_output_root(&output_bitmap_root_header_hash)
+						{
+							if let Err(err) = self
+								.peers
+								.ban_peer(peer.info.addr.clone(), ReasonForBan::PibdFailure)
+							{
+								error!("Unable to ban the peer {}, error {}", &peer.info.addr, err);
+							}
+						}
+					}
+					self.output_bitmap_root_header_hash = None;
+				}
+
 				let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
 				error!("PIBD Reported Failure - Restarting Sync");
 				// reset desegmenter state
-				let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
+				self.chain.reset_desegmenter();
 
-				if let Some(d) = desegmenter.write().as_mut() {
-					d.reset();
-				};
 				if let Err(e) = self.chain.reset_pibd_head() {
 					error!("pibd_sync restart: reset pibd_head error = {}", e);
 				}
@@ -180,40 +210,70 @@ impl StateSync {
 					self.sync_state
 						.update_pibd_progress(false, false, 0, 1, &archive_header);
 				}
-				// Continue our PIBD process (which returns true if all segments are in)
-				if self.continue_pibd() {
-					let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
-					// All segments in, validate
-					if let Some(d) = desegmenter.write().as_mut() {
-						if let Ok(true) = d.check_progress(self.sync_state.clone()) {
-							if let Err(e) = d.check_update_leaf_set_state() {
-								error!("error updating PIBD leaf set: {}", e);
-								self.sync_state.update_pibd_progress(
-									false,
-									true,
-									0,
-									1,
-									&archive_header,
-								);
-								return false;
-							}
-							if let Err(e) = d.validate_complete_state(
-								self.sync_state.clone(),
-								stop_state.clone(),
-							) {
-								error!("error validating PIBD state: {}", e);
-								self.sync_state.update_pibd_progress(
-									false,
-									true,
-									0,
-									1,
-									&archive_header,
-								);
-								return false;
-							}
-							return true;
+
+				let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
+
+				
self.ban_inactive_pibd_peers(); + self.make_pibd_hand_shake(&archive_header); + + let mut has_segmenter = true; + if self.chain.get_desegmenter(&archive_header).read().is_none() { + has_segmenter = false; + if let Some(bitmap_output_root) = + self.select_pibd_bitmap_output_root(&archive_header) + { + self.output_bitmap_root_header_hash = + Some((bitmap_output_root, archive_header.hash()).hash()); + if let Err(e) = self + .chain + .create_desegmenter(&archive_header, bitmap_output_root) + { + error!( + "Unable to create desegmenter for header at {}, Error: {}", + archive_header.height, e + ); + } else { + has_segmenter = true; } - }; + } + } + + if has_segmenter { + // Continue our PIBD process (which returns true if all segments are in) + if self.continue_pibd(&archive_header) { + let desegmenter = self.chain.get_desegmenter(&archive_header); + // All segments in, validate + if let Some(d) = desegmenter.write().as_mut() { + if let Ok(true) = d.check_progress(self.sync_state.clone()) { + if let Err(e) = d.check_update_leaf_set_state() { + error!("error updating PIBD leaf set: {}", e); + self.sync_state.update_pibd_progress( + false, + true, + 0, + 1, + &archive_header, + ); + return false; + } + if let Err(e) = d.validate_complete_state( + self.sync_state.clone(), + stop_state.clone(), + ) { + error!("error validating PIBD state: {}", e); + self.sync_state.update_pibd_progress( + false, + true, + 0, + 1, + &archive_header, + ); + return false; + } + return true; + } + }; + } } } else { let (go, download_timeout) = self.state_sync_due(); @@ -248,12 +308,144 @@ impl StateSync { true } + fn get_pibd_qualify_peers(&self, archive_header: &BlockHeader) -> Vec> { + // First, get max difficulty or greater peers + self.peers + .iter() + .connected() + .into_iter() + .filter(|peer| { + peer.info.height() > archive_header.height + && peer.info.capabilities.contains(Capabilities::PIBD_HIST) + }) + .collect() + } + + fn get_pibd_ready_peers(&self) -> Vec> { + if let Some(output_bitmap_root_header_hash) = self.output_bitmap_root_header_hash.as_ref() { + // First, get max difficulty or greater peers + self.peers + .iter() + .connected() + .into_iter() + .filter(|peer| { + let pibd_status = peer.pibd_status.lock(); + match &pibd_status.output_bitmap_root { + Some(output_bitmap_root) => { + let peer_output_bitmap_root_header_hash = + (output_bitmap_root, pibd_status.header_hash).hash(); + output_bitmap_root_header_hash == &peer_output_bitmap_root_header_hash + && pibd_status.no_response_requests + <= pibd_params::STALE_REQUESTS_PER_PEER + } + None => false, + } + }) + .collect() + } else { + vec![] + } + } + + fn ban_inactive_pibd_peers(&self) { + let none_active_time_limit = Utc::now().timestamp() - MIN_START_PIBD_RESPONCE_LIMIT_SEC; + let mut banned_peers: Vec> = Vec::new(); + for peer in self.peers.iter().connected().into_iter() { + if let Some((requests, time)) = peer.get_pibd_no_response_state() { + // we can ban this peer if during long time we didn't hear back any correct responses + if time < none_active_time_limit && requests > pibd_params::STALE_REQUESTS_PER_PEER + { + banned_peers.push(peer.clone()); + } + } + } + for peer in banned_peers { + if let Err(err) = self + .peers + .ban_peer(peer.info.addr.clone(), ReasonForBan::PibdInactive) + { + error!("Unable to ban the peer {}, error {}", &peer.info.addr, err); + } + } + } + + fn make_pibd_hand_shake(&self, archive_header: &BlockHeader) { + let peers = self.get_pibd_qualify_peers(archive_header); + + // Minimal interval to send request for 
+
+		let last_handshake_time = Utc::now().timestamp() - MIN_START_PIBD_RESPONCE_LIMIT_SEC;
+
+		for peer in peers {
+			let mut need_sync = false;
+			{
+				// we don't want to hold the lock for long, that is why we use need_sync and make the api call later
+				let mut pibd_status = peer.pibd_status.lock();
+				if pibd_status.header_height < archive_header.height
+					&& pibd_status.initiate_pibd_request_time < last_handshake_time
+				{
+					pibd_status.initiate_pibd_request_time = last_handshake_time;
+					need_sync = true;
+				}
+			}
+
+			if need_sync {
+				if let Err(e) =
+					peer.send_start_pibd_sync_request(archive_header.height, archive_header.hash())
+				{
+					info!(
+						"Error sending start_pibd_sync_request to peer at {}, reason: {:?}",
+						peer.info.addr, e
+					);
+				}
+			}
+		}
+	}
+
+	// Select a random peer and take its hash.
+	// An alternative approach is to select the largest group, but I think that is less attack resistant.
+	// The download process takes time, so even if we ban the whole group afterwards, the majority would still
+	// be able to control the sync process. Choosing at random gives a chance even to a single 'good' peer.
+	fn select_pibd_bitmap_output_root(&self, archive_header: &BlockHeader) -> Option<Hash> {
+		let header_hash = archive_header.hash();
+
+		let handshake_time_limit = Utc::now().timestamp() - MIN_START_PIBD_RESPONCE_LIMIT_SEC / 2;
+
+		let mut min_handshake_time = handshake_time_limit + 1;
+
+		let mut rng = rand::thread_rng();
+
+		let output_bitmap_roots: Vec<Hash> = self
+			.peers
+			.iter()
+			.into_iter()
+			.filter_map(|peer| {
+				let pibd_status = peer.pibd_status.lock();
+				if pibd_status.header_height == archive_header.height
+					&& pibd_status.header_hash == header_hash
+					&& pibd_status.output_bitmap_root.is_some()
+				{
+					min_handshake_time =
+						std::cmp::min(min_handshake_time, pibd_status.initiate_pibd_request_time);
+					Some(pibd_status.output_bitmap_root.unwrap())
+				} else {
+					None
+				}
+			})
+			.collect();
+
+		if output_bitmap_roots.is_empty() || min_handshake_time >= handshake_time_limit {
+			return None;
+		}
+
+		return Some(output_bitmap_roots[rng.gen_range(0, output_bitmap_roots.len())]);
+	}
+
 	/// Continue the PIBD process, returning true if the desegmenter is reporting
 	/// that the process is done
-	fn continue_pibd(&mut self) -> bool {
+	fn continue_pibd(&mut self, archive_header: &BlockHeader) -> bool {
 		// Check the state of our chain to figure out what we should be requesting next
-		let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
-		let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
+		let desegmenter = self.chain.get_desegmenter(&archive_header);
 
 		// Remove stale requests, if we haven't recieved the segment within a minute re-request
 		// TODO: verify timing
@@ -287,6 +479,21 @@ impl StateSync {
 			next_segment_ids = d.next_desired_segments(pibd_params::SEGMENT_REQUEST_COUNT);
 		}
 
+		let pibd_peers = self.get_pibd_ready_peers();
+
+		// Choose a random "most work" peer, preferring outbound if at all possible.
+		let mut outbound_peers: Vec<Arc<Peer>> = Vec::new();
+		let mut inbound_peers: Vec<Arc<Peer>> = Vec::new();
+		let mut rng = rand::thread_rng();
+
+		for p in pibd_peers {
+			if p.info.is_outbound() {
+				outbound_peers.push(p);
+			} else if p.info.is_inbound() {
+				inbound_peers.push(p);
+			}
+		}
+
 		// For each segment, pick a desirable peer and send message
 		// (Provided we're not waiting for a response for this message from someone else)
 		for seg_id in next_segment_ids.iter() {
@@ -295,80 +502,63 @@ impl StateSync {
 				continue;
 			}
 
-			// TODO: urg
-			let peers = self.peers.clone();
-
-			// First, get max difficulty or greater peers
-			let peers_iter = || peers.iter().connected();
-			let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
-			let peers_iter_max = || peers_iter().with_difficulty(|x| x >= max_diff);
-
-			// Then, further filter by PIBD capabilities v1
-			let peers_iter_pibd = || {
-				peers_iter_max()
-					.with_capabilities(Capabilities::PIBD_HIST)
-					.connected()
-			};
-
-			// If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute,
-			// abort PIBD and fall back to txhashset download
-			// Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled
-			// peer having the max difficulty
-			if peers_iter_pibd().count() == 0 {
-				if let None = self.earliest_zero_pibd_peer_time {
-					self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
-				}
-				if self.earliest_zero_pibd_peer_time.unwrap()
-					+ Duration::seconds(pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS)
-					< Utc::now()
-				{
-					// random abort test
-					info!("No PIBD-enabled max-difficulty peers for the past {} seconds - Aborting PIBD and falling back to TxHashset.zip download", pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS);
-					self.sync_state
-						.update_pibd_progress(true, true, 0, 1, &archive_header);
-					self.sync_state
-						.set_sync_error(chain::Error::AbortingPIBDError);
-					self.set_pibd_aborted();
-					return false;
-				}
-			} else {
-				self.set_earliest_zero_pibd_peer_time(None)
-			}
-
-			// Choose a random "most work" peer, preferring outbound if at all possible.
-			let peer = peers_iter_pibd()
-				.outbound()
-				.choose_random()
-				.or_else(|| peers_iter_pibd().inbound().choose_random());
+			let peer = outbound_peers
+				.choose(&mut rng)
+				.or_else(|| inbound_peers.choose(&mut rng));
 			trace!("Chosen peer is {:?}", peer);
-			if let Some(p) = peer {
-				// add to list of segments that are being tracked
-				self.sync_state.add_pibd_segment(seg_id);
-				let res = match seg_id.segment_type {
-					SegmentType::Bitmap => p.send_bitmap_segment_request(
-						archive_header.hash(),
-						seg_id.identifier.clone(),
-					),
-					SegmentType::Output => p.send_output_segment_request(
-						archive_header.hash(),
-						seg_id.identifier.clone(),
-					),
-					SegmentType::RangeProof => p.send_rangeproof_segment_request(
-						archive_header.hash(),
-						seg_id.identifier.clone(),
-					),
-					SegmentType::Kernel => p.send_kernel_segment_request(
-						archive_header.hash(),
-						seg_id.identifier.clone(),
-					),
-				};
-				if let Err(e) = res {
-					info!(
-						"Error sending request to peer at {}, reason: {:?}",
-						p.info.addr, e
-					);
-					self.sync_state.remove_pibd_segment(seg_id);
+			match peer {
+				None => {
+					// If there are no suitable PIBD-enabled peers, AND there hasn't been one for a minute,
+					// abort PIBD and fall back to txhashset download.
+					// Waiting a minute helps ensure that the cancellation isn't simply due to a single non-PIBD enabled
+					// peer having the max difficulty
+					if let None = self.earliest_zero_pibd_peer_time {
+						self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
+					}
+					if self.earliest_zero_pibd_peer_time.unwrap()
+						+ Duration::seconds(pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS)
+						< Utc::now()
+					{
+						// random abort test
+						info!("No PIBD-enabled max-difficulty peers for the past {} seconds - Aborting PIBD and falling back to TxHashset.zip download", pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS);
+						self.sync_state
+							.update_pibd_progress(true, true, 0, 1, &archive_header);
+						self.sync_state
+							.set_sync_error(chain::Error::AbortingPIBDError);
+						self.set_pibd_aborted();
+						return false;
+					}
+				}
+				Some(p) => {
+					self.set_earliest_zero_pibd_peer_time(None);
+
+					self.sync_state.add_pibd_segment(seg_id);
+					let res = match seg_id.segment_type {
+						SegmentType::Bitmap => p.send_bitmap_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						),
+						SegmentType::Output => p.send_output_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						),
+						SegmentType::RangeProof => p.send_rangeproof_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						),
+						SegmentType::Kernel => p.send_kernel_segment_request(
+							archive_header.hash(),
+							seg_id.identifier.clone(),
+						),
+					};
+					if let Err(e) = res {
+						info!(
+							"Error sending request to peer at {}, reason: {:?}",
+							p.info.addr, e
+						);
+						self.sync_state.remove_pibd_segment(seg_id);
+					}
 				}
 			}
 		}
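Note on the commitment scheme used in this patch: the sync loop never trusts a peer's advertised output_bitmap_root directly. It stores only the hash of the pair (output_bitmap_root, archive_header.hash()), and on a PIBD validation failure it bans every peer whose own advertised pair hashes to the same value before restarting the handshake. The standalone sketch below illustrates that commit-and-compare flow in isolation; it is not code from this patch — std's DefaultHasher stands in for the chain's blake2-based Hash type, and the PeerStatus struct, peer addresses, and root values are invented for the example.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for the chain hash: fold the (root, header_hash) pair into one commitment value.
fn commit(output_bitmap_root: u64, archive_header_hash: u64) -> u64 {
	let mut hasher = DefaultHasher::new();
	(output_bitmap_root, archive_header_hash).hash(&mut hasher);
	hasher.finish()
}

// Minimal stand-in for the per-peer PIBD handshake status tracked by the node.
struct PeerStatus {
	addr: &'static str,
	output_bitmap_root: Option<u64>,
	header_hash: u64,
}

fn main() {
	let archive_header_hash = 0xA1CE;

	// Handshake responses; the node picks one advertised root at random and commits to it.
	let peers = vec![
		PeerStatus { addr: "10.0.0.1:3414", output_bitmap_root: Some(7), header_hash: archive_header_hash },
		PeerStatus { addr: "10.0.0.2:3414", output_bitmap_root: Some(9), header_hash: archive_header_hash },
		PeerStatus { addr: "10.0.0.3:3414", output_bitmap_root: None, header_hash: archive_header_hash },
	];
	let chosen_root = 7u64; // pretend this root was the one selected at random
	let committed = commit(chosen_root, archive_header_hash);

	// Later, PIBD validation fails: ban exactly the peers that committed to the same pair.
	for p in &peers {
		let same_commitment = p
			.output_bitmap_root
			.map(|root| commit(root, p.header_hash) == committed)
			.unwrap_or(false);
		if same_commitment {
			println!("would ban {}", p.addr);
		}
	}
}

Hashing the pair rather than the root alone ties the ban decision to the specific archive header the handshake was made for, so a peer that later answers honestly for a different horizon header is not penalised for an old response.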