diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 63113a58d..9ff5bbb42 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -158,7 +158,6 @@ pub struct Chain { orphans: Arc, txhashset: Arc>, header_pmmr: Arc>>, - sync_pmmr: Arc>>, pibd_segmenter: Arc>>, pibd_desegmenter: Arc>>, // POW verification function @@ -194,21 +193,8 @@ impl Chain { ProtocolVersion(1), None, )?; - let mut sync_pmmr = PMMRHandle::new( - Path::new(&db_root).join("header").join("sync_head"), - false, - ProtocolVersion(1), - None, - )?; - setup_head( - &genesis, - &store, - &mut header_pmmr, - &mut sync_pmmr, - &mut txhashset, - false, - )?; + setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset, false)?; // Initialize the output_pos index based on UTXO set // and NRD kernel_pos index based recent kernel history. @@ -226,7 +212,6 @@ impl Chain { orphans: Arc::new(OrphanBlockPool::new()), txhashset: Arc::new(RwLock::new(txhashset)), header_pmmr: Arc::new(RwLock::new(header_pmmr)), - sync_pmmr: Arc::new(RwLock::new(sync_pmmr)), pibd_segmenter: Arc::new(RwLock::new(None)), pibd_desegmenter: Arc::new(RwLock::new(None)), pow_verifier, @@ -239,9 +224,6 @@ impl Chain { // Suppress any errors here in case we cannot find chain.rewind_bad_block()?; - let header_head = chain.header_head()?; - chain.rebuild_sync_mmr(&header_head)?; - chain.log_heads()?; Ok(chain) @@ -306,7 +288,6 @@ impl Chain { /// restart the output PMMRs from scratch pub fn reset_chain_head_to_genesis(&self) -> Result<(), Error> { let mut header_pmmr = self.header_pmmr.write(); - let mut sync_pmmr = self.sync_pmmr.write(); let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; @@ -322,7 +303,6 @@ impl Chain { &self.genesis, &self.store, &mut header_pmmr, - &mut sync_pmmr, &mut txhashset, true, )?; @@ -482,17 +462,6 @@ impl Chain { }; log_head("head", self.head()?); log_head("header_head", self.header_head()?); - log_head("sync_head", self.get_sync_head()?); - - // Needed for Node State tracking... - let sync_head = self.get_sync_head()?; - info!( - "init: sync_head: {} @ {} [{}]", - sync_head.total_difficulty.to_num(), - sync_head.height, - sync_head.last_block_h, - ); - Ok(()) } @@ -1231,21 +1200,6 @@ impl Chain { Ok(()) } - /// Rebuild the sync MMR based on current header_head. - /// We rebuild the sync MMR when first entering sync mode so ensure we - /// have an MMR we can safely rewind based on the headers received from a peer. - pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> { - let mut sync_pmmr = self.sync_pmmr.write(); - let mut batch = self.store.batch()?; - let header = batch.get_block_header(&head.hash())?; - txhashset::header_extending(&mut sync_pmmr, &mut batch, |ext, batch| { - self.rewind_and_apply_header_fork(&header, ext, batch)?; - Ok(()) - })?; - batch.commit()?; - Ok(()) - } - /// Finds the "fork point" where header chain diverges from full block chain. /// If we are syncing this will correspond to the last full block where /// the next header is known but we do not yet have the full block. @@ -1914,16 +1868,7 @@ impl Chain { } } - /// Get the tip of the current "sync" header chain. - /// This may be significantly different to current header chain. - pub fn get_sync_head(&self) -> Result { - let hash = self.sync_pmmr.read().head_hash()?; - let header = self.store.get_block_header(&hash)?; - Ok(Tip::from_header(&header)) - } - /// Gets multiple headers at the provided heights. - /// Note: Uses the sync pmmr, not the header pmmr. /// Note: This is based on the provided sync_head to support syncing against a fork. pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result, Error> { let mut header_pmmr = self.header_pmmr.write(); @@ -1961,13 +1906,12 @@ fn setup_head( genesis: &Block, store: &store::ChainStore, header_pmmr: &mut txhashset::PMMRHandle, - sync_pmmr: &mut txhashset::PMMRHandle, txhashset: &mut txhashset::TxHashSet, resetting_pibd: bool, ) -> Result<(), Error> { let mut batch = store.batch()?; - // Apply the genesis header to header and sync MMRs. + // Apply the genesis header to header MMR. { if batch.get_block_header(&genesis.hash()).is_err() { batch.save_block_header(&genesis.header)?; @@ -1978,12 +1922,6 @@ fn setup_head( ext.apply_header(&genesis.header) })?; } - - if sync_pmmr.size == 0 { - txhashset::header_extending(sync_pmmr, &mut batch, |ext, _| { - ext.apply_header(&genesis.header) - })?; - } } // Make sure our header PMMR is consistent with header_head from db if it exists. diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 408781f4b..8d829aca8 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -83,7 +83,6 @@ impl Peer { info: PeerInfo, conn: TcpStream, adapter: Arc, - header_cache_size: u64, server: Server, ) -> std::io::Result { let state = Arc::new(RwLock::new(State::Connected)); @@ -93,7 +92,6 @@ impl Peer { Arc::new(tracking_adapter.clone()), info.clone(), state_sync_requested.clone(), - header_cache_size, server, ); let tracker = Arc::new(conn::Tracker::new()); @@ -117,13 +115,12 @@ impl Peer { total_difficulty: Difficulty, hs: &Handshake, adapter: Arc, - header_cache_size: u64, server: Server, ) -> Result { debug!("accept: handshaking from {:?}", conn.peer_addr()); let info = hs.accept(capab, total_difficulty, &mut conn); match info { - Ok(info) => Ok(Peer::new(info, conn, adapter, header_cache_size, server)?), + Ok(info) => Ok(Peer::new(info, conn, adapter, server)?), Err(e) => { debug!( "accept: handshaking from {:?} failed with error: {:?}", @@ -145,7 +142,6 @@ impl Peer { self_addr: PeerAddr, hs: &Handshake, adapter: Arc, - header_cache_size: u64, peer_addr: Option, server: Server, ) -> Result { @@ -163,7 +159,7 @@ impl Peer { hs.initiate(capab, total_difficulty, self_addr, &mut conn, None) }; match info { - Ok(info) => Ok(Peer::new(info, conn, adapter, header_cache_size, server)?), + Ok(info) => Ok(Peer::new(info, conn, adapter, server)?), Err(e) => { if peer_addr.is_some() { debug!( @@ -612,7 +608,6 @@ impl ChainAdapter for TrackingAdapter { &self, bh: &[core::BlockHeader], peer_info: &PeerInfo, - header_sync_cache_size: u64, ) -> Result { trace!( "peer = {:?}, set header sync = false (in headers)", @@ -633,17 +628,7 @@ impl ChainAdapter for TrackingAdapter { peer_info.header_sync_requested.store(0, Ordering::Relaxed); } trace!("header sync for {} is {}", peer_info.addr, val); - self.adapter - .headers_received(bh, peer_info, header_sync_cache_size) - } - - // note: not needed because adapter is called from headers_received and header_recevied - fn process_add_headers_sync( - &self, - _: &[core::BlockHeader], - _: u64, - ) -> Result { - unimplemented!() + self.adapter.headers_received(bh, peer_info) } fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index e9e3d4118..881e1e81a 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -561,12 +561,8 @@ impl ChainAdapter for Peers { &self, headers: &[core::BlockHeader], peer_info: &PeerInfo, - header_sync_cache_size: u64, ) -> Result { - if !self - .adapter - .headers_received(headers, peer_info, header_sync_cache_size)? - { + if !self.adapter.headers_received(headers, peer_info)? { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban self.ban_peer(peer_info.addr.clone(), ReasonForBan::BadBlockHeader) @@ -577,15 +573,6 @@ impl ChainAdapter for Peers { } } - // note not needed to implement because adapter is called by headers_received and header_received - fn process_add_headers_sync( - &self, - _: &[core::BlockHeader], - _: u64, - ) -> Result { - unimplemented!() - } - fn locate_headers(&self, hs: &[Hash]) -> Result, chain::Error> { self.adapter.locate_headers(hs) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index ac5b76f26..018fd7411 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -32,7 +32,6 @@ pub struct Protocol { adapter: Arc, peer_info: PeerInfo, state_sync_requested: Arc, - header_cache_size: u64, server: Server, } @@ -41,14 +40,12 @@ impl Protocol { adapter: Arc, peer_info: PeerInfo, state_sync_requested: Arc, - header_cache_size: u64, server: Server, ) -> Protocol { Protocol { adapter, peer_info, state_sync_requested, - header_cache_size, server, } } @@ -236,7 +233,7 @@ impl MessageHandler for Protocol { } Message::Headers(data) => { - adapter.headers_received(&data.headers, &self.peer_info, self.header_cache_size)?; + adapter.headers_received(&data.headers, &self.peer_info)?; Consumed::None } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 4964ad9f0..dc7f37390 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -88,7 +88,7 @@ impl Server { /// Starts a new TCP server and listen to incoming connections. This is a /// blocking call until the TCP server stops. - pub fn listen(&self, header_cache_size: u64) -> Result<(), Error> { + pub fn listen(&self) -> Result<(), Error> { // start TCP listener and handle incoming connections let addr = SocketAddr::new(self.config.host, self.config.port); let listener = TcpListener::bind(addr)?; @@ -139,7 +139,7 @@ impl Server { } continue; } - match self.handle_new_peer(stream, header_cache_size) { + match self.handle_new_peer(stream) { Err(Error::ConnectionClose(err)) => { debug!("shutting down, ignoring a new peer, {}", err) } @@ -167,7 +167,7 @@ impl Server { /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. - pub fn connect(&self, addr: PeerAddr, header_cache_size: u64) -> Result, Error> { + pub fn connect(&self, addr: PeerAddr) -> Result, Error> { if self.stop_state.is_stopped() { return Err(Error::ConnectionClose(String::from("node is stopping"))); } @@ -281,7 +281,6 @@ impl Server { self_addr, &self.handshake, self.peers.clone(), - header_cache_size, peer_addr, (*self).clone(), )?; @@ -302,7 +301,7 @@ impl Server { } } - fn handle_new_peer(&self, stream: TcpStream, header_cache_size: u64) -> Result<(), Error> { + fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> { if self.stop_state.is_stopped() { return Err(Error::ConnectionClose(String::from("Server is stopping"))); } @@ -315,7 +314,6 @@ impl Server { total_diff, &self.handshake, self.peers.clone(), - header_cache_size, self.clone(), )?; self.peers.add_connected(Arc::new(peer))?; @@ -432,7 +430,6 @@ impl ChainAdapter for DummyAdapter { &self, _: &[core::BlockHeader], _: &PeerInfo, - _: u64, ) -> Result { Ok(true) } @@ -450,14 +447,6 @@ impl ChainAdapter for DummyAdapter { unimplemented!() } - fn process_add_headers_sync( - &self, - _: &[core::BlockHeader], - _: u64, - ) -> Result { - unimplemented!() - } - fn txhashset_receive_ready(&self) -> bool { false } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 5292d4b26..7e75ed476 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -701,12 +701,6 @@ pub trait ChainAdapter: Sync + Send { peer_info: &PeerInfo, ) -> Result; - fn process_add_headers_sync( - &self, - bh: &[core::BlockHeader], - header_cache_size: u64, - ) -> Result; - /// A set of block header has been received, typically in response to a /// block /// header request. @@ -714,7 +708,6 @@ pub trait ChainAdapter: Sync + Send { &self, bh: &[core::BlockHeader], peer_info: &PeerInfo, - header_sync_cache_size: u64, ) -> Result; /// Finds a list of block headers based on the provided locator. Tries to diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 94bdb0250..ac4d0436a 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -57,45 +57,6 @@ const BITMAP_SEGMENT_HEIGHT_RANGE: Range = 9..14; const OUTPUT_SEGMENT_HEIGHT_RANGE: Range = 11..16; const RANGEPROOF_SEGMENT_HEIGHT_RANGE: Range = 7..12; -// NetToChainAdapter need a memory cache to prevent data overloading for network core nodes (non leaf nodes) -// This cache will drop sequence of the events during the second -struct EventCache { - event: RwLock, - time: AtomicI64, -} - -impl EventCache { - fn new() -> Self { - EventCache { - event: RwLock::new(Hash::default()), - time: AtomicI64::new(0), - } - } - - // Check if it is contain the hash value - pub fn contains(&self, hash: &Hash, update: bool) -> bool { - let now = Utc::now().timestamp_millis(); - let time_limit = now - 1000; // lock for a 1 second, should be enough to reduce the load. - if self.time.load(Ordering::Relaxed) < time_limit { - if update { - *(self.event.write()) = *hash; - self.time.store(now, Ordering::Relaxed); - } - return false; - } - - if *self.event.read() == *hash { - true - } else { - if update { - *(self.event.write()) = *hash; - self.time.store(now, Ordering::Relaxed); - } - false - } - } -} - /// Implementation of the NetAdapter for the . Gets notified when new /// blocks and transactions are received and forwards to the chain and pool /// implementations. @@ -110,15 +71,6 @@ where peers: OneTime>, config: ServerConfig, hooks: Vec>, - - // local in mem cache - processed_headers: EventCache, - processed_blocks: EventCache, - processed_transactions: EventCache, - - header_cache: Arc>>, - tip_processed: Arc>, - reset_tip: Arc>, } impl p2p::ChainAdapter for NetToChainAdapter @@ -166,16 +118,6 @@ where return Ok(true); } - let tx_hash = tx.hash(); - // For transaction we allow double processing, we want to be sure that TX will be stored in the pool - // because there is no recovery plan for transactions. So we want to use natural retry to help us handle failures - if self.processed_transactions.contains(&tx_hash, false) { - debug!("transaction_received, cache for {} Rejected", tx_hash); - return Ok(true); - } else { - debug!("transaction_received, cache for {} OK", tx_hash); - } - let source = pool::TxSource::Broadcast; let header = self.chain().head_header()?; @@ -184,12 +126,11 @@ where hook.on_transaction_received(&tx); } + let tx_hash = tx.hash(); + let mut tx_pool = self.tx_pool.write(); match tx_pool.add_to_pool(source, tx, stem, &header) { - Ok(_) => { - self.processed_transactions.contains(&tx_hash, true); - Ok(true) - } + Ok(_) => Ok(true), Err(e) => { debug!("Transaction {} rejected: {:?}", tx_hash, e); Ok(false) @@ -203,14 +144,6 @@ where peer_info: &PeerInfo, opts: chain::Options, ) -> Result { - let b_hash = b.hash(); - if self.processed_blocks.contains(&b_hash, true) { - debug!("block_received, cache for {} Rejected", b_hash); - return Ok(true); - } else { - debug!("block_received, cache for {} OK", b_hash); - } - if self.chain().is_known(&b.header).is_err() { return Ok(true); } @@ -356,13 +289,6 @@ where return Ok(false); } - if self.processed_headers.contains(&bh_hash, true) { - debug!("header_received, cache for {} Rejected", bh_hash); - return Ok(true); - } else { - debug!("header_received, cache for {} OK", bh_hash); - } - if self.chain().block_exists(bh.hash())? { return Ok(true); } @@ -399,7 +325,6 @@ where &self, bhs: &[core::BlockHeader], peer_info: &PeerInfo, - header_cache_size: u64, ) -> Result { info!( "Received {} block headers from {}", @@ -407,132 +332,9 @@ where peer_info.addr ); - let tip_processed = { - let mut tip_processed = self.tip_processed.lock().unwrap(); - let sync_head_height = self.chain().get_sync_head()?.height; - - let mut reset_tip = self.reset_tip.lock().unwrap(); - if *reset_tip != 0 { - warn!( - "reset of tip to {} from {} due to differing headers.", - *reset_tip, *tip_processed - ); - *tip_processed = *reset_tip; - *reset_tip = 0; - } else if *tip_processed < sync_head_height { - *tip_processed = sync_head_height; - } - - *tip_processed - }; - if bhs.is_empty() { return Ok(false); } - let bad_block = Hash::from_hex(crate::chain::BLOCK_TO_BAN)?; - if bhs.iter().find(|h| h.hash() == bad_block).is_some() { - debug!("headers_received: found known bad header, all data is rejected"); - return Ok(false); - } - - info!( - "Received {} block headers from {}, height {}, hash = {}, tip_processed = {}", - bhs.len(), - peer_info.addr, - bhs[0].height, - bhs[0].hash(), - tip_processed, - ); - - if bhs[0].height > tip_processed + 1 { - // we can't process this yet. - // try to process anything in the cache that we can - - if header_cache_size > 0 { - for bh in bhs { - let mut hashmap = self.header_cache.lock().unwrap(); - hashmap.insert(bh.height, bh.clone()); - if bh.height > header_cache_size { - hashmap.remove(&(bh.height - header_cache_size)); - } - } - } - return Ok(true); - } - if header_cache_size > 0 { - let mut itt = tip_processed + 1; - let mut bh_backlog: Vec = Vec::new(); - let mut backlog_processed = false; - loop { - { - let hashmap = self.header_cache.lock().unwrap(); - let next = hashmap.get(&itt); - if !next.is_some() { - break; - } - let next = next.unwrap(); - //info!("adding headers to the backlog: {}", next.height); - bh_backlog.push(next.clone()); - } - - if bh_backlog.len() >= 256 { - // getting too big, process and continue - self.process_add_headers_sync(&bh_backlog.as_slice(), header_cache_size)?; - bh_backlog = Vec::new(); - backlog_processed = true; - } - - itt = itt + 1; - } - - if bh_backlog.len() > 0 { - self.process_add_headers_sync(&bh_backlog.as_slice(), header_cache_size)?; - return Ok(true); - } - if backlog_processed { - return Ok(true); - } - } - - let first_height = bhs[0].height; - for bh in bhs { - if header_cache_size > 0 { - // set highest processed block - let mut hashmap = self.header_cache.lock().unwrap(); - let value = hashmap.get(&bh.height); - if value.is_some() { - // we already have something here. - // does it match? If so return. - let cache_value = value.unwrap(); - if bh.prev_hash == cache_value.prev_hash { - if first_height <= tip_processed { - return Ok(true); - } - } else { - // it doesn't match! there must have - // been a reorg or someone gave us bad headers. - // clear the entire hashmap to be safe. - // go back to previous logic at this point hashmap.clear(); - warn!( - "different header value at height = {}. clearing cache.", - bh.height - ); - hashmap.clear(); - *(self.reset_tip.lock().unwrap()) = first_height - 1; - break; - } - } - } - } - self.process_add_headers_sync(bhs, header_cache_size) - } - - fn process_add_headers_sync( - &self, - bhs: &[core::BlockHeader], - header_cache_size: u64, - ) -> Result { - let mut hashmap = self.header_cache.lock().unwrap(); // Read our sync_head if we are in header_sync. // If not then we can ignore this batch of headers. @@ -544,7 +346,6 @@ where } }; - // try to add headers to our header chain match self .chain() .sync_block_headers(bhs, sync_head, chain::Options::SYNC) @@ -555,19 +356,6 @@ where if let Some(sync_head) = sync_head { self.sync_state.update_header_sync(sync_head); } - - for bh in bhs { - let mut tip_processed = self.tip_processed.lock().unwrap(); - if *tip_processed < bh.height { - *tip_processed = bh.height; - } - if header_cache_size > 0 { - hashmap.insert(bh.height, bh.clone()); - if bh.height > header_cache_size { - hashmap.remove(&(bh.height - header_cache_size)); - } - } - } Ok(true) } Err(e) => { @@ -939,12 +727,6 @@ where peers: OneTime::new(), config, hooks, - processed_headers: EventCache::new(), - processed_blocks: EventCache::new(), - processed_transactions: EventCache::new(), - header_cache: Arc::new(Mutex::new(HashMap::new())), - tip_processed: Arc::new(Mutex::new(0)), - reset_tip: Arc::new(Mutex::new(0)), } } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 5b0dd23a9..ec1505b0f 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -220,7 +220,7 @@ pub struct ServerConfig { /// Header cache size /// Set to 0 for now - pub header_cache_size: Option, + //pub header_cache_size: Option, /// Invalid Block hash list /// (Default: none) @@ -288,7 +288,7 @@ impl Default for ServerConfig { chain_validation_mode: ChainValidationMode::default(), pool_config: pool::PoolConfig::default(), skip_sync_wait: Some(false), - header_cache_size: Some(0), + // header_cache_size: Some(0), invalid_block_hashes: Some(vec![]), duration_sync_short: Some(30), duration_sync_long: Some(50), diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index e821aedeb..d86b10eb9 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -41,7 +41,6 @@ pub fn connect_and_monitor( seed_list: Box Vec + Send>, config: P2PConfig, stop_state: Arc, - header_cache_size: u64, ) -> std::io::Result> { thread::Builder::new() .name("seed".to_string()) @@ -118,7 +117,6 @@ pub fn connect_and_monitor( p2p_server.clone(), &rx, &mut connecting_history, - header_cache_size, connect_all, ); @@ -339,7 +337,6 @@ fn listen_for_addrs( p2p: Arc, rx: &mpsc::Receiver, connecting_history: &mut HashMap>, - header_cache_size: u64, attempt_all: bool, ) { // Pull everything currently on the queue off the queue. @@ -402,7 +399,7 @@ fn listen_for_addrs( }; if update_possible { - match p2p_c.connect(addr.clone(), header_cache_size) { + match p2p_c.connect(addr.clone()) { Ok(p) => { // If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST. // We want to build a local db of possible peers to connect to. diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 2e86290f4..50974228e 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -210,7 +210,7 @@ impl Server { stop_state: Option>, api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) -> Result { - let header_cache_size = config.header_cache_size.unwrap_or(25_000); + //let header_cache_size = config.header_cache_size.unwrap_or(25_000); //let duration_sync_long = config.duration_sync_long.unwrap_or(150); //let duration_sync_short = config.duration_sync_short.unwrap_or(100); @@ -544,7 +544,6 @@ impl Server { seed_list, config.p2p_config.clone(), stop_state.clone(), - header_cache_size, )?); } @@ -564,7 +563,7 @@ impl Server { let _ = thread::Builder::new() .name("p2p-server".to_string()) .spawn(move || { - if let Err(e) = p2p_inner.listen(header_cache_size) { + if let Err(e) = p2p_inner.listen() { error!("P2P server failed with erorr: {:?}", e); } })?; @@ -780,8 +779,8 @@ impl Server { } /// Asks the server to connect to a peer at the provided network address. - pub fn connect_peer(&self, addr: PeerAddr, header_cache_size: u64) -> Result<(), Error> { - self.p2p.connect(addr, header_cache_size)?; + pub fn connect_peer(&self, addr: PeerAddr) -> Result<(), Error> { + self.p2p.connect(addr)?; Ok(()) } diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index 136c6c6b0..5befc0a3f 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -53,28 +53,10 @@ impl HeaderSync { let do_run = match self.sync_state.status() { SyncStatus::BodySync { .. } | SyncStatus::HeaderSync { .. } - | SyncStatus::TxHashsetDone => true, - SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => { - let sync_head_sync = self.chain.get_sync_head()?; - debug!( - "sync: initial transition to HeaderSync. sync_head: {} at {}, resetting to: {} at {}", - sync_head_sync.hash(), - sync_head_sync.height, - sync_head.hash(), - sync_head.height, - ); - - // Reset sync_head to header_head on transition to HeaderSync, - // but ONLY on initial transition to HeaderSync state. - // - // The header_head and sync_head may diverge here in the presence of a fork - // in the header chain. Ensure we track the new advertised header chain here - // correctly, so reset any previous (and potentially stale) sync_head to match - // our last known "good" header_head. - // - self.chain.rebuild_sync_mmr(&sync_head)?; - true - } + | SyncStatus::TxHashsetDone + | SyncStatus::NoSync + | SyncStatus::Initial + | SyncStatus::AwaitingPeers(_) => true, _ => false, };