diff --git a/blockdb.go b/blockdb.go index f3fbaa2cf0..38b23835b5 100644 --- a/blockdb.go +++ b/blockdb.go @@ -244,7 +244,7 @@ func dumpBlockChain(params *chaincfg.Params, b *blockchain.BlockChain) error { progressLogger.LogBlockHeight(bl.MsgBlock(), tipHeight) } - bmgrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", + srvrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", tipHeight, cfg.DumpBlockchain) return nil diff --git a/docs/json_rpc_api.mediawiki b/docs/json_rpc_api.mediawiki index d1fe4c520b..675b916b1e 100644 --- a/docs/json_rpc_api.mediawiki +++ b/docs/json_rpc_api.mediawiki @@ -549,7 +549,7 @@ the method name for further details such as parameter and return information. : Dynamically changes the debug logging level. : The levelspec can either be a debug level or of the form <subsystem>=<level>,<subsystem2>=<level2>,... : The valid debug levels are trace, debug, info, warn, error, and critical. -: The valid subsystems are AMGR, ADXR, BCDB, BMGR, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, and TXMP. +: The valid subsystems are AMGR, ADXR, BCDB, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, SYNC, and TXMP. : Additionally, the special keyword show can be used to get a list of the available subsystems. |- !Returns @@ -559,7 +559,7 @@ the method name for further details such as parameter and return information. |Done. |- !Example show Return -|Supported subsystems [AMGR ADXR BCDB BMGR DCRD CHAN DISC PEER RPCS SCRP SRVR TXMP] +|Supported subsystems [AMGR ADXR BCDB DCRD CHAN DISC PEER RPCS SCRP SRVR SYNC TXMP] |} ---- diff --git a/internal/fees/estimator.go b/internal/fees/estimator.go index aeee8837ed..7bfe90ef36 100644 --- a/internal/fees/estimator.go +++ b/internal/fees/estimator.go @@ -623,11 +623,11 @@ func (stats *Estimator) removeFromMemPool(blocksInMemPool int32, rate feeRate) { // function but not on a previous newMemPoolTx. This leaves the fee db // in an undefined state and should never happen in regular use. If this // happens, then there is a logic or coding error somewhere, either in - // the estimator itself or on its hooking to the mempool/blockmanager. - // Either way, the easiest way to fix this is to completely delete the - // database and start again. - // During development, you can use a panic() here and we might return it - // after being confident that the estimator is completely bug free. + // the estimator itself or on its hooking to the mempool/network sync + // manager. Either way, the easiest way to fix this is to completely + // delete the database and start again. During development, you can use + // a panic() here and we might return it after being confident that the + // estimator is completely bug free. log.Errorf("Transaction count in bucket index %d and confirmation "+ "index %d became < 0", bucketIdx, confirmIdx) } diff --git a/internal/mining/interface.go b/internal/mining/interface.go index 998f91ef63..2a18271b77 100644 --- a/internal/mining/interface.go +++ b/internal/mining/interface.go @@ -67,7 +67,7 @@ type blockManagerFacade interface { // best chain. ForceReorganization(formerBest, newBest chainhash.Hash) error - // IsCurrent returns whether or not the block manager believes it is synced - // with the connected peers. + // IsCurrent returns whether or not the net sync manager believes it is + // synced with the connected peers.
IsCurrent() bool } diff --git a/internal/netsync/README.md b/internal/netsync/README.md new file mode 100644 index 0000000000..3391fad683 --- /dev/null +++ b/internal/netsync/README.md @@ -0,0 +1,21 @@ +netsync +======= + +[![Build Status](https://github.com/decred/dcrd/workflows/Build%20and%20Test/badge.svg)](https://github.com/decred/dcrd/actions) +[![ISC License](https://img.shields.io/badge/license-ISC-blue.svg)](http://copyfree.org) +[![Doc](https://img.shields.io/badge/doc-reference-blue.svg)](https://pkg.go.dev/github.com/decred/dcrd/internal/netsync) + +Package netsync implements a concurrency safe block syncing protocol. + +## Overview + +The provided implementation of SyncManager communicates with connected peers to +perform an initial block download, keep the chain in sync, and announce new +blocks connected to the chain. Currently the sync manager selects a single sync +peer that it downloads all blocks from until it is up to date with the longest +chain the sync peer is aware of. + +## License + +Package netsync is licensed under the [copyfree](http://copyfree.org) ISC +License. diff --git a/internal/netsync/doc.go b/internal/netsync/doc.go new file mode 100644 index 0000000000..5fde072364 --- /dev/null +++ b/internal/netsync/doc.go @@ -0,0 +1,14 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +Package netsync implements a concurrency safe block syncing protocol. + +The provided implementation of SyncManager communicates with connected peers to +perform an initial block download, keep the chain in sync, and announce new +blocks connected to the chain. Currently the sync manager selects a single sync +peer that it downloads all blocks from until it is up to date with the longest +chain the sync peer is aware of. +*/ +package netsync diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go new file mode 100644 index 0000000000..4a9ac79b55 --- /dev/null +++ b/internal/netsync/interface.go @@ -0,0 +1,23 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package netsync + +import ( + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/dcrutil/v3" + "github.com/decred/dcrd/peer/v2" +) + +// PeerNotifier provides an interface to notify peers of status changes related +// to blocks and transactions. +type PeerNotifier interface { + // AnnounceNewTransactions generates and relays inventory vectors and + // notifies websocket clients of the passed transactions. + AnnounceNewTransactions(txns []*dcrutil.Tx) + + // UpdatePeerHeights updates the heights of all peers who have announced the + // latest connected main chain block, or a recognized orphan. + UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peer.Peer) +} diff --git a/internal/netsync/log.go b/internal/netsync/log.go new file mode 100644 index 0000000000..cbe84d3a1e --- /dev/null +++ b/internal/netsync/log.go @@ -0,0 +1,22 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package netsync + +import ( + "github.com/decred/slog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +// The default amount of logging is none. 
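As an aside, the PeerNotifier contract introduced above is small enough to satisfy with a stub. A minimal sketch (not part of the patch; stubNotifier is a hypothetical name) that compiles against the same dcrd packages the new interface file imports:

```go
package main

import (
	"fmt"

	"github.com/decred/dcrd/chaincfg/chainhash"
	"github.com/decred/dcrd/dcrutil/v3"
	"github.com/decred/dcrd/peer/v2"
)

// stubNotifier is a hypothetical PeerNotifier implementation that only
// reports what a real server would relay to peers and websocket clients.
type stubNotifier struct{}

// AnnounceNewTransactions would build inventory vectors for the passed
// transactions and relay them; the stub just prints the hashes.
func (stubNotifier) AnnounceNewTransactions(txns []*dcrutil.Tx) {
	for _, tx := range txns {
		fmt.Printf("announce tx %v\n", tx.Hash())
	}
}

// UpdatePeerHeights would bump the recorded height of every peer that
// announced the new tip; the stub just prints the tip.
func (stubNotifier) UpdatePeerHeights(blkHash *chainhash.Hash, height int64, src *peer.Peer) {
	_ = src // a real server skips the originating peer during the update
	fmt.Printf("new tip %v at height %d\n", blkHash, height)
}

func main() {
	var n stubNotifier
	n.UpdatePeerHeights(new(chainhash.Hash), 0, nil)
}
```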
+var log = slog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using slog. +func UseLogger(logger slog.Logger) { + log = logger +} diff --git a/blockmanager.go b/internal/netsync/manager.go similarity index 70% rename from blockmanager.go rename to internal/netsync/manager.go index 177cc3499b..ce70b197ec 100644 --- a/blockmanager.go +++ b/internal/netsync/manager.go @@ -3,7 +3,7 @@ // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. -package main +package netsync import ( "container/list" @@ -13,7 +13,6 @@ import ( "sync/atomic" "time" - "github.com/decred/dcrd/blockchain/standalone/v2" "github.com/decred/dcrd/blockchain/v4" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/chaincfg/v3" @@ -108,7 +107,7 @@ type getSyncPeerMsg struct { // requestFromPeerMsg is a message type to be sent across the message channel // for requesting either blocks or transactions from a given peer. It routes -// this through the block manager so the block manager doesn't ban the peer +// this through the sync manager so the sync manager doesn't ban the peer // when it sends this information back. type requestFromPeerMsg struct { peer *peerpkg.Peer @@ -197,56 +196,7 @@ type headerNode struct { hash *chainhash.Hash } -// peerNotifier provides an interface for server peer notifications. -type peerNotifier interface { - // AnnounceNewTransactions generates and relays inventory vectors and - // notifies websocket clients of the passed transactions. - AnnounceNewTransactions(txns []*dcrutil.Tx) - - // UpdatePeerHeights updates the heights of all peers who have - // announced the latest connected main chain block, or a recognized orphan. - UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peerpkg.Peer) -} - -// blockManangerConfig is a configuration struct for a blockManager. -type blockManagerConfig struct { - PeerNotifier peerNotifier - - // The following fields are for accessing the chain and its configuration. - Chain *blockchain.BlockChain - ChainParams *chaincfg.Params - SubsidyCache *standalone.SubsidyCache - - // SigCache defines the signature cache to use. - SigCache *txscript.SigCache - - // The following field provides access to the mempool. - TxMemPool *mempool.TxPool - - // RpcServer returns an instance of an RPC server to use for notifications. - // It may return nil if there is no active RPC server. - RpcServer func() *rpcserver.Server - - // DisableCheckpoints indicates whether or not the block manager should make - // use of checkpoints. - DisableCheckpoints bool - - // NoMiningStateSync indicates whether or not the block manager should - // perform an initial mining state synchronization with peers once they are - // believed to be fully synced. - NoMiningStateSync bool - - // MaxPeers specifies the maximum number of peers the server is expected to - // be connected with. It is primarily used as a hint for more efficient - // synchronization. - MaxPeers int - - // MaxOrphanTxs specifies the maximum number of orphan transactions the - // transaction pool associated with the server supports. - MaxOrphanTxs int -} - -// peerSyncState stores additional information that the blockManager tracks +// peerSyncState stores additional information that the sync manager tracks // about a peer. 
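The disabled-by-default logger above follows the usual dcrd subsystem pattern: packages stay silent until the caller injects a logger. A minimal standalone sketch of how a caller might wire it up, assuming decred/slog's btclog-style backend API:

```go
package main

import (
	"os"

	"github.com/decred/slog"
)

// log mirrors the package-level logger pattern above: disabled until the
// caller provides one via UseLogger.
var log = slog.Disabled

// UseLogger mirrors netsync.UseLogger.
func UseLogger(logger slog.Logger) { log = logger }

func main() {
	// Create a backend that writes to stdout and derive a subsystem
	// logger from it, similar to how dcrd wires its SYNC subsystem.
	backend := slog.NewBackend(os.Stdout)
	logger := backend.Logger("SYNC")
	logger.SetLevel(slog.LevelDebug)

	UseLogger(logger)
	log.Infof("sync manager logging enabled")
}
```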
type peerSyncState struct { syncCandidate bool @@ -262,10 +212,10 @@ type orphanBlock struct { expiration time.Time } -// blockManager provides a concurrency safe block manager for handling all +// SyncManager provides a concurrency safe sync manager for handling all // incoming blocks. -type blockManager struct { - cfg *blockManagerConfig +type SyncManager struct { + cfg Config started int32 shutdown int32 rejectedTxns map[chainhash.Hash]struct{} @@ -304,33 +254,33 @@ type blockManager struct { // resetHeaderState sets the headers-first mode state to values appropriate for // syncing from a new peer. -func (b *blockManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) { - b.headersFirstMode = false - b.headerList.Init() - b.startHeader = nil +func (m *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) { + m.headersFirstMode = false + m.headerList.Init() + m.startHeader = nil // When there is a next checkpoint, add an entry for the latest known // block into the header pool. This allows the next downloaded header // to prove it links to the chain properly. - if b.nextCheckpoint != nil { + if m.nextCheckpoint != nil { node := headerNode{height: newestHeight, hash: newestHash} - b.headerList.PushBack(&node) + m.headerList.PushBack(&node) } } // SyncHeight returns latest known block being synced to. -func (b *blockManager) SyncHeight() int64 { - b.syncHeightMtx.Lock() - defer b.syncHeightMtx.Unlock() - return b.syncHeight +func (m *SyncManager) SyncHeight() int64 { + m.syncHeightMtx.Lock() + defer m.syncHeightMtx.Unlock() + return m.syncHeight } // findNextHeaderCheckpoint returns the next checkpoint after the passed height. // It returns nil when there is not one either because the height is already // later than the final checkpoint or some other reason such as disabled // checkpoints. -func (b *blockManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint { - checkpoints := b.cfg.Chain.Checkpoints() +func (m *SyncManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint { + checkpoints := m.cfg.Chain.Checkpoints() if len(checkpoints) == 0 { return nil } @@ -371,15 +321,15 @@ func chainBlockLocatorToHashes(locator blockchain.BlockLocator) []chainhash.Hash // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. -func (b *blockManager) startSync() { +func (m *SyncManager) startSync() { // Return now if we're already syncing. - if b.syncPeer != nil { + if m.syncPeer != nil { return } - best := b.cfg.Chain.BestSnapshot() + best := m.cfg.Chain.BestSnapshot() var bestPeer *peerpkg.Peer - for peer, state := range b.peerStates { + for peer, state := range m.peerStates { if !state.syncCandidate { continue } @@ -408,9 +358,9 @@ func (b *blockManager) startSync() { // fully synced to whatever the chain believes when there is no candidate // for a sync peer. if bestPeer == nil { - b.isCurrentMtx.Lock() - b.isCurrent = b.cfg.Chain.IsCurrent() - b.isCurrentMtx.Unlock() + m.isCurrentMtx.Lock() + m.isCurrent = m.cfg.Chain.IsCurrent() + m.isCurrentMtx.Unlock() } // Start syncing from the best peer if one was selected. @@ -418,25 +368,25 @@ func (b *blockManager) startSync() { // Clear the requestedBlocks if the sync peer changes, otherwise // we may ignore blocks we need that the last sync peer failed // to send. 
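For illustration, findNextHeaderCheckpoint amounts to a scan over a height-sorted checkpoint slice. A simplified standalone sketch (hypothetical names, illustrative heights):

```go
package main

import "fmt"

// checkpoint pairs a block height with its expected hash, mirroring
// chaincfg.Checkpoint; the hash is elided here for brevity.
type checkpoint struct {
	Height int64
}

// findNextCheckpoint returns the first checkpoint strictly after height, or
// nil when height is already past the final checkpoint. Checkpoints are
// assumed sorted by height, as in the chain parameters.
func findNextCheckpoint(checkpoints []checkpoint, height int64) *checkpoint {
	if len(checkpoints) == 0 {
		return nil
	}
	// Nothing to do when the height is already after the final checkpoint.
	if height >= checkpoints[len(checkpoints)-1].Height {
		return nil
	}
	for i := range checkpoints {
		if height < checkpoints[i].Height {
			return &checkpoints[i]
		}
	}
	return nil
}

func main() {
	cps := []checkpoint{{Height: 440}, {Height: 24480}, {Height: 142300}}
	if cp := findNextCheckpoint(cps, 500); cp != nil {
		fmt.Println("next checkpoint height:", cp.Height) // 24480
	}
}
```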
- b.requestedBlocks = make(map[chainhash.Hash]struct{}) + m.requestedBlocks = make(map[chainhash.Hash]struct{}) - blkLocator, err := b.cfg.Chain.LatestBlockLocator() + blkLocator, err := m.cfg.Chain.LatestBlockLocator() if err != nil { - bmgrLog.Errorf("Failed to get block locator for the "+ - "latest block: %v", err) + log.Errorf("Failed to get block locator for the latest block: %v", + err) return } locator := chainBlockLocatorToHashes(blkLocator) - bmgrLog.Infof("Syncing to block height %d from peer %v", + log.Infof("Syncing to block height %d from peer %v", bestPeer.LastBlock(), bestPeer.Addr()) // The chain is not synced whenever the current best height is less than // the height to sync to. if best.Height < bestPeer.LastBlock() { - b.isCurrentMtx.Lock() - b.isCurrent = false - b.isCurrentMtx.Unlock() + m.isCurrentMtx.Lock() + m.isCurrent = false + m.isCurrentMtx.Unlock() } // When the current height is less than a known checkpoint we @@ -456,60 +406,59 @@ func (b *blockManager) startSync() { // and fully validate them. Finally, regression test mode does // not support the headers-first approach so do normal block // downloads when in regression test mode. - if b.nextCheckpoint != nil && - best.Height < b.nextCheckpoint.Height && - !b.cfg.DisableCheckpoints { + if m.nextCheckpoint != nil && + best.Height < m.nextCheckpoint.Height && + !m.cfg.DisableCheckpoints { - err := bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := bestPeer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) if err != nil { - bmgrLog.Errorf("Failed to push getheadermsg for the "+ - "latest blocks: %v", err) + log.Errorf("Failed to push getheadermsg for the latest "+ + "blocks: %v", err) return } - b.headersFirstMode = true - bmgrLog.Infof("Downloading headers for blocks %d to "+ - "%d from peer %s", best.Height+1, - b.nextCheckpoint.Height, bestPeer.Addr()) + m.headersFirstMode = true + log.Infof("Downloading headers for blocks %d to %d from peer %s", + best.Height+1, m.nextCheckpoint.Height, bestPeer.Addr()) } else { err := bestPeer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - bmgrLog.Errorf("Failed to push getblocksmsg for the "+ - "latest blocks: %v", err) + log.Errorf("Failed to push getblocksmsg for the latest "+ + "blocks: %v", err) return } } - b.syncPeer = bestPeer - b.syncHeightMtx.Lock() - b.syncHeight = bestPeer.LastBlock() - b.syncHeightMtx.Unlock() + m.syncPeer = bestPeer + m.syncHeightMtx.Lock() + m.syncHeight = bestPeer.LastBlock() + m.syncHeightMtx.Unlock() } else { - bmgrLog.Warnf("No sync peer candidates available") + log.Warnf("No sync peer candidates available") } } // isSyncCandidate returns whether or not the peer is a candidate to consider // syncing from. -func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool { +func (m *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { // The peer is not a candidate for sync if it's not a full node. return peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork } -// syncMiningStateAfterSync polls the blockManager for the current sync -// state; if the manager is synced, it executes a call to the peer to -// sync the mining state to the network. -func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { +// syncMiningStateAfterSync polls the sync manager for the current sync state +// and once the manager believes the chain is fully synced, it executes a call +// to the peer to sync the mining state. 
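The mining-state sync that follows is a poll-until-current loop guarded by the quit channel. A generic standalone sketch of that pattern (the interval is shortened here for demonstration):

```go
package main

import (
	"fmt"
	"time"
)

// pollUntil mirrors the syncMiningStateAfterSync shape: a goroutine that
// wakes every interval, gives up when quit closes, and fires act once the
// condition holds. isCurrent and act stand in for the manager and peer calls.
func pollUntil(interval time.Duration, quit <-chan struct{}, isCurrent func() bool, act func()) {
	go func() {
		for {
			select {
			case <-time.After(interval):
			case <-quit:
				return
			}
			if !isCurrent() {
				continue
			}
			act()
			return
		}
	}()
}

func main() {
	quit := make(chan struct{})
	done := make(chan struct{})
	start := time.Now()
	pollUntil(10*time.Millisecond, quit, func() bool {
		return time.Since(start) > 25*time.Millisecond
	}, func() {
		fmt.Println("chain current: requesting mining state")
		close(done)
	})
	<-done
	close(quit)
}
```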
+func (m *SyncManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { go func() { for { select { case <-time.After(3 * time.Second): - case <-b.quit: + case <-m.quit: return } if !peer.Connected() { return } - if !b.IsCurrent() { + if !m.IsCurrent() { continue } @@ -532,9 +481,8 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { wire.InitStateHeadBlockVotes, wire.InitStateTSpends) if err != nil { - bmgrLog.Errorf("Unexpected error while "+ - "building getinitstate msg: %v", - err) + log.Errorf("Unexpected error while building getinitstate "+ + "msg: %v", err) return } msg = m @@ -549,30 +497,30 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { +func (m *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { // Ignore if in the process of shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - bmgrLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) + log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) // Initialize the peer state - isSyncCandidate := b.isSyncCandidate(peer) - b.peerStates[peer] = &peerSyncState{ + isSyncCandidate := m.isSyncCandidate(peer) + m.peerStates[peer] = &peerSyncState{ syncCandidate: isSyncCandidate, requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), } // Start syncing by choosing the best candidate if needed. - if isSyncCandidate && b.syncPeer == nil { - b.startSync() + if isSyncCandidate && m.syncPeer == nil { + m.startSync() } // Grab the mining state from this peer once synced when enabled. - if !b.cfg.NoMiningStateSync { - b.syncMiningStateAfterSync(peer) + if !m.cfg.NoMiningStateSync { + m.syncMiningStateAfterSync(peer) } } @@ -580,20 +528,20 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { - state, exists := b.peerStates[peer] +func (m *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) { + state, exists := m.peerStates[peer] if !exists { - bmgrLog.Warnf("Received done peer message for unknown peer %s", peer) + log.Warnf("Received done peer message for unknown peer %s", peer) return } // Remove the peer from the list of candidate peers. - delete(b.peerStates, peer) + delete(m.peerStates, peer) // Remove requested transactions from the global map so that they will // be fetched from elsewhere next time we get an inv. for txHash := range state.requestedTxns { - delete(b.requestedTxns, txHash) + delete(m.requestedTxns, txHash) } // Remove requested blocks from the global map so that they will be @@ -601,19 +549,19 @@ func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { // TODO(oga) we could possibly here check which peers have these blocks // and request them now to speed things up a little. for blockHash := range state.requestedBlocks { - delete(b.requestedBlocks, blockHash) + delete(m.requestedBlocks, blockHash) } // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. 
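The new/done peer handlers in this region maintain per-peer request sets plus a global in-flight set, so a disconnecting peer's pending requests can be re-issued to another peer. A pared-down standalone sketch of that bookkeeping (string hashes stand in for chainhash.Hash):

```go
package main

import "fmt"

// peerState mirrors peerSyncState: the blocks requested from one peer.
type peerState struct {
	requestedBlocks map[string]struct{}
}

// tracker holds per-peer request sets plus a global in-flight set.
type tracker struct {
	peers           map[string]*peerState
	requestedBlocks map[string]struct{}
}

func (t *tracker) newPeer(id string) {
	t.peers[id] = &peerState{requestedBlocks: make(map[string]struct{})}
}

// donePeer mirrors handleDonePeerMsg: drop the peer and release its pending
// requests from the global map so they are fetched from elsewhere next inv.
func (t *tracker) donePeer(id string) {
	state, exists := t.peers[id]
	if !exists {
		return
	}
	delete(t.peers, id)
	for hash := range state.requestedBlocks {
		delete(t.requestedBlocks, hash)
	}
}

func main() {
	t := &tracker{peers: make(map[string]*peerState), requestedBlocks: make(map[string]struct{})}
	t.newPeer("peer1")
	t.peers["peer1"].requestedBlocks["blk"] = struct{}{}
	t.requestedBlocks["blk"] = struct{}{}
	t.donePeer("peer1")
	fmt.Println(len(t.requestedBlocks)) // 0: blk can be requested again
}
```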
Also, reset the headers-first state if in headers-first // mode so the header state can be rebuilt for the new sync peer. - if b.syncPeer == peer { - b.syncPeer = nil - if b.headersFirstMode { - best := b.cfg.Chain.BestSnapshot() - b.resetHeaderState(&best.Hash, best.Height) + if m.syncPeer == peer { + m.syncPeer = nil + if m.headersFirstMode { + best := m.cfg.Chain.BestSnapshot() + m.resetHeaderState(&best.Hash, best.Height) } - b.startSync() + m.startSync() } } @@ -685,11 +633,11 @@ func errToWireRejectCode(err error) (wire.RejectCode, string) { } // handleTxMsg handles transaction messages from all peers. -func (b *blockManager) handleTxMsg(tmsg *txMsg) { +func (m *SyncManager) handleTxMsg(tmsg *txMsg) { peer := tmsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { - bmgrLog.Warnf("Received tx message from unknown peer %s", peer) + log.Warnf("Received tx message from unknown peer %s", peer) return } @@ -706,16 +654,16 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Ignore transactions that we have already rejected. Do not // send a reject message here because if the transaction was already // rejected, the transaction was unsolicited. - if _, exists = b.rejectedTxns[*txHash]; exists { - bmgrLog.Debugf("Ignoring unsolicited previously rejected "+ - "transaction %v from %s", txHash, peer) + if _, exists = m.rejectedTxns[*txHash]; exists { + log.Debugf("Ignoring unsolicited previously rejected transaction %v "+ + "from %s", txHash, peer) return } // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. - allowOrphans := b.cfg.MaxOrphanTxs > 0 - acceptedTxs, err := b.cfg.TxMemPool.ProcessTransaction(tmsg.tx, + allowOrphans := m.cfg.MaxOrphanTxs > 0 + acceptedTxs, err := m.cfg.TxMemPool.ProcessTransaction(tmsg.tx, allowOrphans, true, true, mempool.Tag(tmsg.peer.ID())) // Remove transaction from request maps. Either the mempool/chain @@ -723,12 +671,12 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // instances of trying to fetch it, or we failed to insert and thus // we'll retry next time we get an inv. delete(state.requestedTxns, *txHash) - delete(b.requestedTxns, *txHash) + delete(m.requestedTxns, *txHash) if err != nil { // Do not request this transaction again until a new block // has been processed. - limitAdd(b.rejectedTxns, *txHash, maxRejectedTxns) + limitAdd(m.rejectedTxns, *txHash, maxRejectedTxns) // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -736,11 +684,9 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // so log it as an actual error. var rErr mempool.RuleError if errors.As(err, &rErr) { - bmgrLog.Debugf("Rejected transaction %v from %s: %v", - txHash, peer, err) + log.Debugf("Rejected transaction %v from %s: %v", txHash, peer, err) } else { - bmgrLog.Errorf("Failed to process transaction %v: %v", - txHash, err) + log.Errorf("Failed to process transaction %v: %v", txHash, err) } // Convert the error into an appropriate reject message and @@ -750,7 +696,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { return } - b.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) + m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } // isKnownOrphan returns whether the passed hash is currently a known orphan. @@ -762,12 +708,12 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // orphans and react accordingly. // // This function is safe for concurrent access.
-func (b *blockManager) isKnownOrphan(hash *chainhash.Hash) bool { +func (m *SyncManager) isKnownOrphan(hash *chainhash.Hash) bool { // Protect concurrent access. Using a read lock only so multiple readers // can query without blocking each other. - b.orphanLock.RLock() - _, exists := b.orphans[*hash] - b.orphanLock.RUnlock() + m.orphanLock.RLock() + _, exists := m.orphans[*hash] + m.orphanLock.RUnlock() return exists } @@ -775,18 +721,18 @@ func (b *blockManager) isKnownOrphan(hash *chainhash.Hash) bool { // of orphan blocks. // // This function is safe for concurrent access. -func (b *blockManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { +func (m *SyncManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { // Protect concurrent access. Using a read lock only so multiple // readers can query without blocking each other. - b.orphanLock.RLock() - defer b.orphanLock.RUnlock() + m.orphanLock.RLock() + defer m.orphanLock.RUnlock() // Keep looping while the parent of each orphaned block is known and is an // orphan itself. orphanRoot := hash prevHash := hash for { - orphan, exists := b.orphans[*prevHash] + orphan, exists := m.orphans[*prevHash] if !exists { break } @@ -799,21 +745,21 @@ func (b *blockManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { // removeOrphanBlock removes the passed orphan block from the orphan pool and // previous orphan index. -func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) { +func (m *SyncManager) removeOrphanBlock(orphan *orphanBlock) { // Protect concurrent access. - b.orphanLock.Lock() - defer b.orphanLock.Unlock() + m.orphanLock.Lock() + defer m.orphanLock.Unlock() // Remove the orphan block from the orphan pool. orphanHash := orphan.block.Hash() - delete(b.orphans, *orphanHash) + delete(m.orphans, *orphanHash) // Remove the reference from the previous orphan index too. An indexing // for loop is intentionally used over a range here as range does not // reevaluate the slice on each iteration nor does it adjust the index // for the modified slice. prevHash := &orphan.block.MsgBlock().Header.PrevBlock - orphans := b.prevOrphans[*prevHash] + orphans := m.prevOrphans[*prevHash] for i := 0; i < len(orphans); i++ { hash := orphans[i].block.Hash() if hash.IsEqual(orphanHash) { @@ -823,12 +769,12 @@ func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) { i-- } } - b.prevOrphans[*prevHash] = orphans + m.prevOrphans[*prevHash] = orphans // Remove the map entry altogether if there are no longer any orphans // which depend on the parent hash. - if len(b.prevOrphans[*prevHash]) == 0 { - delete(b.prevOrphans, *prevHash) + if len(m.prevOrphans[*prevHash]) == 0 { + delete(m.prevOrphans, *prevHash) } } @@ -837,34 +783,34 @@ func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) { // any expired blocks so a separate cleanup poller doesn't need to be run. It // also imposes a maximum limit on the number of outstanding orphan blocks and // will remove the oldest received orphan block if the limit is exceeded. -func (b *blockManager) addOrphanBlock(block *dcrutil.Block) { +func (m *SyncManager) addOrphanBlock(block *dcrutil.Block) { // Remove expired orphan blocks. - for _, oBlock := range b.orphans { + for _, oBlock := range m.orphans { if time.Now().After(oBlock.expiration) { - b.removeOrphanBlock(oBlock) + m.removeOrphanBlock(oBlock) continue } // Update the oldest orphan block pointer so it can be discarded // in case the orphan pool fills up. 
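The orphan pool methods above share one RWMutex so concurrent readers never block each other. A pared-down standalone sketch of the orphanRoot walk under those assumptions ([4]byte hashes stand in for chainhash.Hash):

```go
package main

import (
	"fmt"
	"sync"
)

// hash is a stand-in for chainhash.Hash.
type hash [4]byte

// orphanPool is a pared-down version of the sync manager's orphan tracking:
// orphan parent links keyed by block hash, guarded by a read/write mutex.
type orphanPool struct {
	mtx     sync.RWMutex
	parents map[hash]hash // orphan block hash -> parent hash
}

// orphanRoot walks the parent links of known orphans and returns the earliest
// known orphan in the chain, mirroring the loop in the method above. Blocks
// are then requested up to that root to fill in the missing ancestors.
func (p *orphanPool) orphanRoot(h hash) hash {
	p.mtx.RLock()
	defer p.mtx.RUnlock()

	root := h
	prev := h
	for {
		parent, exists := p.parents[prev]
		if !exists {
			break
		}
		root = prev
		prev = parent
	}
	return root
}

func main() {
	// c's parent is b, b's parent is a; a itself is unknown, so the root of
	// c's orphan chain is b, the earliest orphan we actually hold.
	p := &orphanPool{parents: map[hash]hash{
		{3}: {2}, // c -> b
		{2}: {1}, // b -> a
	}}
	fmt.Println(p.orphanRoot(hash{3}) == hash{2}) // true
}
```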
- if b.oldestOrphan == nil || - oBlock.expiration.Before(b.oldestOrphan.expiration) { - b.oldestOrphan = oBlock + if m.oldestOrphan == nil || + oBlock.expiration.Before(m.oldestOrphan.expiration) { + m.oldestOrphan = oBlock } } // Limit orphan blocks to prevent memory exhaustion. - if len(b.orphans)+1 > maxOrphanBlocks { + if len(m.orphans)+1 > maxOrphanBlocks { // Remove the oldest orphan to make room for the new one. - b.removeOrphanBlock(b.oldestOrphan) - b.oldestOrphan = nil + m.removeOrphanBlock(m.oldestOrphan) + m.oldestOrphan = nil } // Protect concurrent access. This is intentionally done here instead // of near the top since removeOrphanBlock does its own locking and // the range iterator is not invalidated by removing map entries. - b.orphanLock.Lock() - defer b.orphanLock.Unlock() + m.orphanLock.Lock() + defer m.orphanLock.Unlock() // Insert the block into the orphan map with an expiration time // 1 hour from now. @@ -873,11 +819,11 @@ func (b *blockManager) addOrphanBlock(block *dcrutil.Block) { block: block, expiration: expiration, } - b.orphans[*block.Hash()] = oBlock + m.orphans[*block.Hash()] = oBlock // Add to previous hash lookup index for faster dependency lookups. prevHash := &block.MsgBlock().Header.PrevBlock - b.prevOrphans[*prevHash] = append(b.prevOrphans[*prevHash], oBlock) + m.prevOrphans[*prevHash] = append(m.prevOrphans[*prevHash], oBlock) } // processOrphans determines if there are any orphans which depend on the passed @@ -887,7 +833,7 @@ func (b *blockManager) addOrphanBlock(block *dcrutil.Block) { // // The flags do not modify the behavior of this function directly, however they // are needed to pass along to maybeAcceptBlock. -func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error { +func (m *SyncManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error { // Start with processing at least the passed hash. Leave a little room for // additional orphan blocks that need to be processed without needing to // grow the array in the common case. @@ -906,21 +852,21 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh // is intentionally used over a range here as range does not reevaluate // the slice on each iteration nor does it adjust the index for the // modified slice. - for i := 0; i < len(b.prevOrphans[*processHash]); i++ { - orphan := b.prevOrphans[*processHash][i] + for i := 0; i < len(m.prevOrphans[*processHash]); i++ { + orphan := m.prevOrphans[*processHash][i] if orphan == nil { - bmgrLog.Warnf("Found a nil entry at index %d in the orphan "+ + log.Warnf("Found a nil entry at index %d in the orphan "+ "dependency list for block %v", i, processHash) continue } // Remove the orphan from the orphan pool. orphanHash := orphan.block.Hash() - b.removeOrphanBlock(orphan) + m.removeOrphanBlock(orphan) i-- // Potentially accept the block into the block chain. - _, err := b.cfg.Chain.ProcessBlock(orphan.block, flags) + _, err := m.cfg.Chain.ProcessBlock(orphan.block, flags) if err != nil { return err } @@ -944,17 +890,17 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh // whether or not the block is an orphan, in which case the fork length will // also be zero as expected, because it, by definition, does not connect to the // best chain. 
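processOrphans, shown next, is a worklist algorithm: accepting one block can free further dependents, which are then processed in turn. A standalone sketch of that loop (accept stands in for chain.ProcessBlock; string hashes for brevity):

```go
package main

import "fmt"

// processOrphans mirrors the worklist style of the method: accept any orphans
// whose parent just connected, then repeat for the newly accepted blocks
// until no dependents remain. deps maps a parent hash to the hashes of
// orphans waiting on it.
func processOrphans(start string, deps map[string][]string, accept func(string) error) error {
	processHashes := []string{start}
	for len(processHashes) > 0 {
		processHash := processHashes[0]
		processHashes = processHashes[1:]

		for _, orphan := range deps[processHash] {
			if err := accept(orphan); err != nil {
				return err
			}
			// Newly accepted blocks may free more orphans.
			processHashes = append(processHashes, orphan)
		}
		delete(deps, processHash)
	}
	return nil
}

func main() {
	deps := map[string][]string{
		"a": {"b"},
		"b": {"c", "d"},
	}
	// Connecting "a" frees "b", which in turn frees "c" and "d".
	_ = processOrphans("a", deps, func(h string) error {
		fmt.Println("accepted", h)
		return nil
	})
}
```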
-func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) { +func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) { // Process the block to include validation, best chain selection, etc. // - // Also, keep track of orphan blocks in the block manager when the error + // Also, keep track of orphan blocks in the sync manager when the error // returned indicates the block is an orphan. blockHash := block.Hash() - forkLen, err := b.cfg.Chain.ProcessBlock(block, flags) + forkLen, err := m.cfg.Chain.ProcessBlock(block, flags) if errors.Is(err, blockchain.ErrMissingParent) { - bmgrLog.Infof("Adding orphan block %v with parent %v", blockHash, + log.Infof("Adding orphan block %v with parent %v", blockHash, block.MsgBlock().Header.PrevBlock) - b.addOrphanBlock(block) + m.addOrphanBlock(block) // The fork length of orphans is unknown since they, by definition, do // not connect to the best chain. @@ -966,37 +912,37 @@ func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockc // Accept any orphan blocks that depend on this block (they are no longer // orphans) and repeat for those accepted blocks until there are no more. - if err := b.processOrphans(blockHash, flags); err != nil { + if err := m.processOrphans(blockHash, flags); err != nil { return 0, false, err } // The chain is considered synced once both the blockchain believes it is // current and the sync height is reached or exceeded. - best := b.cfg.Chain.BestSnapshot() - syncHeight := b.SyncHeight() - if best.Height >= syncHeight && b.cfg.Chain.IsCurrent() { - b.isCurrentMtx.Lock() - b.isCurrent = true - b.isCurrentMtx.Unlock() + best := m.cfg.Chain.BestSnapshot() + syncHeight := m.SyncHeight() + if best.Height >= syncHeight && m.cfg.Chain.IsCurrent() { + m.isCurrentMtx.Lock() + m.isCurrent = true + m.isCurrentMtx.Unlock() } return forkLen, false, nil } // handleBlockMsg handles block messages from all peers. -func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { +func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { - bmgrLog.Warnf("Received block message from unknown peer %s", peer) + log.Warnf("Received block message from unknown peer %s", peer) return } // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() if _, exists := state.requestedBlocks[*blockHash]; !exists { - bmgrLog.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, bmsg.peer.Addr()) + log.Warnf("Got unrequested block %v from %s -- disconnecting", + blockHash, bmsg.peer.Addr()) bmsg.peer.Disconnect() return } @@ -1010,16 +956,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // properly. 
isCheckpointBlock := false behaviorFlags := blockchain.BFNone - if b.headersFirstMode { - firstNodeEl := b.headerList.Front() + if m.headersFirstMode { + firstNodeEl := m.headerList.Front() if firstNodeEl != nil { firstNode := firstNodeEl.Value.(*headerNode) if blockHash.IsEqual(firstNode.hash) { behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(b.nextCheckpoint.Hash) { + if firstNode.hash.IsEqual(m.nextCheckpoint.Hash) { isCheckpointBlock = true } else { - b.headerList.Remove(firstNodeEl) + m.headerList.Remove(firstNodeEl) } } } @@ -1029,11 +975,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // so we shouldn't have any more instances of trying to fetch it, or we // will fail the insert and thus we'll retry next time we get an inv. delete(state.requestedBlocks, *blockHash) - delete(b.requestedBlocks, *blockHash) + delete(m.requestedBlocks, *blockHash) // Process the block to include validation, best chain selection, orphan // handling, etc. - forkLen, isOrphan, err := b.processBlockAndOrphans(bmsg.block, behaviorFlags) + forkLen, isOrphan, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags) if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -1041,16 +987,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // it as an actual error. var rErr blockchain.RuleError if errors.As(err, &rErr) { - bmgrLog.Infof("Rejected block %v from %s: %v", blockHash, - peer, err) + log.Infof("Rejected block %v from %s: %v", blockHash, peer, err) } else { - bmgrLog.Errorf("Failed to process block %v: %v", - blockHash, err) + log.Errorf("Failed to process block %v: %v", blockHash, err) } var dbErr database.Error if errors.As(err, &dbErr) && dbErr.ErrorCode == database.ErrCorruption { - bmgrLog.Errorf("Critical failure: %v", dbErr.Error()) + log.Errorf("Critical failure: %v", err) } // Convert the error into an appropriate reject message and @@ -1063,29 +1007,29 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Request the parents for the orphan block from the peer that sent it. onMainChain := !isOrphan && forkLen == 0 if isOrphan { - orphanRoot := b.orphanRoot(blockHash) - blkLocator, err := b.cfg.Chain.LatestBlockLocator() + orphanRoot := m.orphanRoot(blockHash) + blkLocator, err := m.cfg.Chain.LatestBlockLocator() if err != nil { - bmgrLog.Warnf("Failed to get block locator for the "+ - "latest block: %v", err) + log.Warnf("Failed to get block locator for the latest block: %v", + err) } else { locator := chainBlockLocatorToHashes(blkLocator) err = peer.PushGetBlocksMsg(locator, orphanRoot) if err != nil { - bmgrLog.Warnf("Failed to push getblocksmsg for the "+ - "latest block: %v", err) + log.Warnf("Failed to push getblocksmsg for the latest block: "+ + "%v", err) } } } else { // When the block is not an orphan, log information about it and // update the chain state. - b.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), b.SyncHeight()) + m.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), m.SyncHeight()) if onMainChain { // Notify stake difficulty subscribers and prune invalidated // transactions. - best := b.cfg.Chain.BestSnapshot() - if r := b.cfg.RpcServer(); r != nil { + best := m.cfg.Chain.BestSnapshot() + if r := m.cfg.RpcServer(); r != nil { // Update registered websocket clients on the // current stake difficulty. 
r.NotifyStakeDifficulty( @@ -1095,14 +1039,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { StakeDifficulty: best.NextStakeDiff, }) } - b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - b.cfg.TxMemPool.PruneExpiredTx() + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) + m.cfg.TxMemPool.PruneExpiredTx() // Clear the rejected transactions. - b.rejectedTxns = make(map[chainhash.Hash]struct{}) + m.rejectedTxns = make(map[chainhash.Hash]struct{}) // Proactively evict SigCache entries. - b.proactivelyEvictSigCacheEntries(best.Height) + m.proactivelyEvictSigCacheEntries(best.Height) } } @@ -1115,13 +1059,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // chain was not yet current or lost the lock announcement race. blockHeight := int64(bmsg.block.MsgBlock().Header.Height) peer.UpdateLastBlockHeight(blockHeight) - if isOrphan || (onMainChain && b.IsCurrent()) { - go b.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight, + if isOrphan || (onMainChain && m.IsCurrent()) { + go m.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight, bmsg.peer) } // Nothing more to do if we aren't in headers-first mode. - if !b.headersFirstMode { + if !m.headersFirstMode { return } @@ -1129,9 +1073,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { - if b.startHeader != nil && + if m.startHeader != nil && len(state.requestedBlocks) < minInFlightBlocks { - b.fetchHeaderBlocks() + m.fetchHeaderBlocks() } return } @@ -1140,32 +1084,32 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // there is a next checkpoint, get the next round of headers by asking // for headers starting from the block after this one up to the next // checkpoint. - prevHeight := b.nextCheckpoint.Height - prevHash := b.nextCheckpoint.Hash - b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) - if b.nextCheckpoint != nil { + prevHeight := m.nextCheckpoint.Height + prevHash := m.nextCheckpoint.Hash + m.nextCheckpoint = m.findNextHeaderCheckpoint(prevHeight) + if m.nextCheckpoint != nil { locator := []chainhash.Hash{*prevHash} - err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) if err != nil { - bmgrLog.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", peer.Addr(), err) + log.Warnf("Failed to send getheaders message to peer %s: %v", + peer.Addr(), err) return } - bmgrLog.Infof("Downloading headers for blocks %d to %d from peer %s", - prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr()) + log.Infof("Downloading headers for blocks %d to %d from peer %s", + prevHeight+1, m.nextCheckpoint.Height, m.syncPeer.Addr()) return } // This is headers-first mode, the block is a checkpoint, and there are // no more checkpoints, so switch to normal mode by requesting blocks // from the block after this one up to the end of the chain (zero hash). 
- b.headersFirstMode = false - b.headerList.Init() - bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode") + m.headersFirstMode = false + m.headerList.Init() + log.Infof("Reached the final checkpoint -- switching to normal mode") locator := []chainhash.Hash{*blockHash} err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v", + log.Warnf("Failed to send getblocks message to peer %s: %v", peer.Addr(), err) return } @@ -1174,88 +1118,87 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // proactivelyEvictSigCacheEntries fetches the block that is // txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to // SigCache to evict the entries associated with the transactions in that block. -func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) { +func (m *SyncManager) proactivelyEvictSigCacheEntries(bestHeight int64) { // Nothing to do before the eviction depth is reached. if bestHeight <= txscript.ProactiveEvictionDepth { return } evictHeight := bestHeight - txscript.ProactiveEvictionDepth - block, err := b.cfg.Chain.BlockByHeight(evictHeight) + block, err := m.cfg.Chain.BlockByHeight(evictHeight) if err != nil { - bmgrLog.Warnf("Failed to retrieve the block at height %d: %v", - evictHeight, err) + log.Warnf("Failed to retrieve the block at height %d: %v", evictHeight, + err) return } - b.cfg.SigCache.EvictEntries(block.MsgBlock()) + m.cfg.SigCache.EvictEntries(block.MsgBlock()) } // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. -func (b *blockManager) fetchHeaderBlocks() { +func (m *SyncManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. - if b.startHeader == nil { - bmgrLog.Warnf("fetchHeaderBlocks called with no start header") + if m.startHeader == nil { + log.Warnf("fetchHeaderBlocks called with no start header") return } // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. 
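fetchHeaderBlocks, coming up next, batches block requests from the pending header list, skipping inventory already held and capping each getdata message at the protocol limit. A simplified standalone sketch of that batching rule:

```go
package main

import "fmt"

// maxInvPerMsg stands in for wire.MaxInvPerMsg.
const maxInvPerMsg = 50000

// batchRequests mirrors the fetchHeaderBlocks loop: walk the pending list,
// skip items already held, and close a batch once the limit is reached,
// returning the remainder for the next round.
func batchRequests(pending []string, have func(string) bool) (batch, rest []string) {
	for i, h := range pending {
		if have(h) {
			continue
		}
		batch = append(batch, h)
		if len(batch) >= maxInvPerMsg {
			rest = pending[i+1:]
			return batch, rest
		}
	}
	return batch, nil
}

func main() {
	pending := []string{"h1", "h2", "h3"}
	batch, _ := batchRequests(pending, func(h string) bool { return h == "h2" })
	fmt.Println(batch) // [h1 h3]
}
```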
- gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) + gdmsg := wire.NewMsgGetDataSizeHint(uint(m.headerList.Len())) numRequested := 0 - for e := b.startHeader; e != nil; e = e.Next() { + for e := m.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { - bmgrLog.Warn("Header list node type is not a headerNode") + log.Warn("Header list node type is not a headerNode") continue } iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - haveInv, err := b.haveInventory(iv) + haveInv, err := m.haveInventory(iv) if err != nil { - bmgrLog.Warnf("Unexpected failure when checking for "+ - "existing inventory during header block "+ - "fetch: %v", err) + log.Warnf("Unexpected failure when checking for existing "+ + "inventory during header block fetch: %v", err) continue } if !haveInv { - b.requestedBlocks[*node.hash] = struct{}{} - syncPeerState := b.peerStates[b.syncPeer] + m.requestedBlocks[*node.hash] = struct{}{} + syncPeerState := m.peerStates[m.syncPeer] syncPeerState.requestedBlocks[*node.hash] = struct{}{} err = gdmsg.AddInvVect(iv) if err != nil { - bmgrLog.Warnf("Failed to add invvect while fetching "+ - "block headers: %v", err) + log.Warnf("Failed to add invvect while fetching block "+ + "headers: %v", err) } numRequested++ } - b.startHeader = e.Next() + m.startHeader = e.Next() if numRequested >= wire.MaxInvPerMsg { break } } if len(gdmsg.InvList) > 0 { - b.syncPeer.QueueMessage(gdmsg, nil) + m.syncPeer.QueueMessage(gdmsg, nil) } } // handleHeadersMsg handles headers messages from all peers. -func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { +func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { peer := hmsg.peer - _, exists := b.peerStates[peer] + _, exists := m.peerStates[peer] if !exists { - bmgrLog.Warnf("Received headers message from unknown peer %s", peer) + log.Warnf("Received headers message from unknown peer %s", peer) return } // The remote peer is misbehaving if we didn't request headers. msg := hmsg.headers numHeaders := len(msg.Headers) - if !b.headersFirstMode { - bmgrLog.Warnf("Got %d unrequested headers from %s -- "+ - "disconnecting", numHeaders, peer.Addr()) + if !m.headersFirstMode { + log.Warnf("Got %d unrequested headers from %s -- disconnecting", + numHeaders, peer.Addr()) peer.Disconnect() return } @@ -1274,10 +1217,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { finalHash = &blockHash // Ensure there is a previous header to compare against. 
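The headers handling below checks that each received header links to the previous node and assigns heights locally rather than trusting the peer. A minimal standalone sketch of that linkage check:

```go
package main

import (
	"errors"
	"fmt"
)

// header is a pared-down block header: its own hash and its parent's hash.
type header struct {
	hash, prev string
}

// connectHeaders mirrors the linkage check in handleHeadersMsg: each header
// must extend the previous one, and heights are computed incrementally from
// the last known node.
func connectHeaders(lastHash string, lastHeight int64, headers []header) (int64, error) {
	for _, h := range headers {
		if h.prev != lastHash {
			return lastHeight, errors.New("header does not connect to chain")
		}
		lastHash = h.hash
		lastHeight++
	}
	return lastHeight, nil
}

func main() {
	height, err := connectHeaders("genesis", 0, []header{
		{hash: "a", prev: "genesis"},
		{hash: "b", prev: "a"},
	})
	fmt.Println(height, err) // 2 <nil>
}
```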
- prevNodeEl := b.headerList.Back() + prevNodeEl := m.headerList.Back() if prevNodeEl == nil { - bmgrLog.Warnf("Header list does not contain a previous " + - "element as expected -- disconnecting peer") + log.Warnf("Header list does not contain a previous element as " + + "expected -- disconnecting peer") peer.Disconnect() return } @@ -1288,31 +1231,28 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { prevNode := prevNodeEl.Value.(*headerNode) if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { node.height = prevNode.height + 1 - e := b.headerList.PushBack(&node) - if b.startHeader == nil { - b.startHeader = e + e := m.headerList.PushBack(&node) + if m.startHeader == nil { + m.startHeader = e } } else { - bmgrLog.Warnf("Received block header that does not "+ - "properly connect to the chain from peer %s "+ - "-- disconnecting", peer.Addr()) + log.Warnf("Received block header that does not properly connect "+ + "to the chain from peer %s -- disconnecting", peer.Addr()) peer.Disconnect() return } // Verify the header at the next checkpoint height matches. - if node.height == b.nextCheckpoint.Height { - if node.hash.IsEqual(b.nextCheckpoint.Hash) { + if node.height == m.nextCheckpoint.Height { + if node.hash.IsEqual(m.nextCheckpoint.Hash) { receivedCheckpoint = true - bmgrLog.Infof("Verified downloaded block "+ - "header against checkpoint at height "+ - "%d/hash %s", node.height, node.hash) + log.Infof("Verified downloaded block header against "+ + "checkpoint at height %d/hash %s", node.height, node.hash) } else { - bmgrLog.Warnf("Block header at height %d/hash "+ - "%s from peer %s does NOT match "+ - "expected checkpoint hash of %s -- "+ - "disconnecting", node.height, node.hash, - peer.Addr(), b.nextCheckpoint.Hash) + log.Warnf("Block header at height %d/hash %s from peer %s "+ + "does NOT match expected checkpoint hash of %s -- "+ + "disconnecting", node.height, node.hash, peer.Addr(), + m.nextCheckpoint.Hash) peer.Disconnect() return } @@ -1327,11 +1267,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // that is already in the database and is only used to ensure // the next header links properly, it must be removed before // fetching the blocks. - b.headerList.Remove(b.headerList.Front()) - bmgrLog.Infof("Received %v block headers: Fetching blocks", - b.headerList.Len()) - b.progressLogger.SetLastLogTime(time.Now()) - b.fetchHeaderBlocks() + m.headerList.Remove(m.headerList.Front()) + log.Infof("Received %v block headers: Fetching blocks", + m.headerList.Len()) + m.progressLogger.SetLastLogTime(time.Now()) + m.fetchHeaderBlocks() return } @@ -1339,20 +1279,20 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // headers starting from the latest known header and ending with the // next checkpoint. locator := []chainhash.Hash{*finalHash} - err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) if err != nil { - bmgrLog.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", peer.Addr(), err) + log.Warnf("Failed to send getheaders message to peer %s: %v", + peer.Addr(), err) return } } // handleNotFoundMsg handles notfound messages from all peers. 
-func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { +func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { peer := nfmsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { - bmgrLog.Warnf("Received notfound message from unknown peer %s", peer) + log.Warnf("Received notfound message from unknown peer %s", peer) return } for _, inv := range nfmsg.notFound.InvList { @@ -1362,12 +1302,12 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { case wire.InvTypeBlock: if _, exists := state.requestedBlocks[inv.Hash]; exists { delete(state.requestedBlocks, inv.Hash) - delete(b.requestedBlocks, inv.Hash) + delete(m.requestedBlocks, inv.Hash) } case wire.InvTypeTx: if _, exists := state.requestedTxns[inv.Hash]; exists { delete(state.requestedTxns, inv.Hash) - delete(b.requestedTxns, inv.Hash) + delete(m.requestedTxns, inv.Hash) } } } @@ -1378,24 +1318,24 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { // inventory can be when it is in different states such as blocks that are part // of the main chain, on a side chain, in the orphan pool, and transactions that // are in the memory pool (either the main pool or orphan pool). -func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { +func (m *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { switch invVect.Type { case wire.InvTypeBlock: // Determine if the block is known in any form (main chain, side // chain, or orphan). hash := &invVect.Hash - return b.isKnownOrphan(hash) || b.cfg.Chain.HaveBlock(hash), nil + return m.isKnownOrphan(hash) || m.cfg.Chain.HaveBlock(hash), nil case wire.InvTypeTx: // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if b.cfg.TxMemPool.HaveTransaction(&invVect.Hash) { + if m.cfg.TxMemPool.HaveTransaction(&invVect.Hash) { return true, nil } // Check if the transaction exists from the point of view of the // end of the main chain. - entry, err := b.cfg.Chain.FetchUtxoEntry(&invVect.Hash) + entry, err := m.cfg.Chain.FetchUtxoEntry(&invVect.Hash) if err != nil { return false, err } @@ -1409,11 +1349,11 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { // handleInvMsg handles inv messages from all peers. // We examine the inventory advertised by the remote peer and act accordingly. -func (b *blockManager) handleInvMsg(imsg *invMsg) { +func (m *SyncManager) handleInvMsg(imsg *invMsg) { peer := imsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { - bmgrLog.Warnf("Received inv message from unknown peer %s", peer) + log.Warnf("Received inv message from unknown peer %s", peer) return } @@ -1428,8 +1368,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { } } - fromSyncPeer := peer == b.syncPeer - isCurrent := b.IsCurrent() + fromSyncPeer := peer == m.syncPeer + isCurrent := m.IsCurrent() // If this inv contains a block announcement, and this isn't coming from // our current sync peer or we're current, then update the last @@ -1449,7 +1389,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // If our chain is current and a peer announces a block we already // know of, then update their current block height. 
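haveInventory answers "do we already have this?" differently per inventory type: blocks count as known if they are anywhere in the chain or the orphan pool, transactions if the mempool has them or they are still spendable from the main chain tip. A pared-down sketch of that dispatch (sets of string hashes stand in for the chain, orphan pool, mempool, and utxo view):

```go
package main

import "fmt"

// invType mirrors the two inventory kinds the sync manager cares about.
type invType int

const (
	invBlock invType = iota
	invTx
)

// inventory stands in for the chain, orphan pool, and mempool lookups that
// haveInventory consults.
type inventory struct {
	blocks, orphans, mempool, utxos map[string]bool
}

// have mirrors haveInventory's dispatch; unrecognized types are claimed as
// known so they are never requested.
func (inv *inventory) have(t invType, hash string) bool {
	switch t {
	case invBlock:
		return inv.orphans[hash] || inv.blocks[hash]
	case invTx:
		return inv.mempool[hash] || inv.utxos[hash]
	}
	return true
}

func main() {
	inv := &inventory{
		blocks:  map[string]bool{"b1": true},
		orphans: map[string]bool{},
		mempool: map[string]bool{"t1": true},
		utxos:   map[string]bool{},
	}
	fmt.Println(inv.have(invBlock, "b1"), inv.have(invTx, "t2")) // true false
}
```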
if lastBlock != -1 && isCurrent { - blkHeight, err := b.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash) + blkHeight, err := m.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash) if err == nil { imsg.peer.UpdateLastBlockHeight(blkHeight) } @@ -1471,23 +1411,22 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { peer.AddKnownInventory(iv) // Ignore inventory when we're in headers-first mode. - if b.headersFirstMode { + if m.headersFirstMode { continue } // Request the inventory if we don't already have it. - haveInv, err := b.haveInventory(iv) + haveInv, err := m.haveInventory(iv) if err != nil { - bmgrLog.Warnf("Unexpected failure when checking for "+ - "existing inventory during inv message "+ - "processing: %v", err) + log.Warnf("Unexpected failure when checking for existing "+ + "inventory during inv message processing: %v", err) continue } if !haveInv { if iv.Type == wire.InvTypeTx { // Skip the transaction if it has already been // rejected. - if _, exists := b.rejectedTxns[iv.Hash]; exists { + if _, exists := m.rejectedTxns[iv.Hash]; exists { continue } } @@ -1508,23 +1447,22 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // resending the orphan block as an available block // to signal there are more missing blocks that need to // be requested. - if b.isKnownOrphan(&iv.Hash) { + if m.isKnownOrphan(&iv.Hash) { // Request blocks starting at the latest known // up to the root of the orphan that just came // in. - orphanRoot := b.orphanRoot(&iv.Hash) - blkLocator, err := b.cfg.Chain.LatestBlockLocator() + orphanRoot := m.orphanRoot(&iv.Hash) + blkLocator, err := m.cfg.Chain.LatestBlockLocator() if err != nil { - bmgrLog.Errorf("PEER: Failed to get block "+ - "locator for the latest block: "+ - "%v", err) + log.Errorf("Failed to get block locator for the latest "+ + "block: %v", err) continue } locator := chainBlockLocatorToHashes(blkLocator) err = peer.PushGetBlocksMsg(locator, orphanRoot) if err != nil { - bmgrLog.Errorf("PEER: Failed to push getblocksmsg "+ - "for orphan chain: %v", err) + log.Errorf("Failed to push getblocksmsg for orphan chain: "+ + "%v", err) } continue } @@ -1537,12 +1475,11 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Request blocks after this one up to the // final one the remote peer knows about (zero // stop hash). - blkLocator := b.cfg.Chain.BlockLocatorFromHash(&iv.Hash) + blkLocator := m.cfg.Chain.BlockLocatorFromHash(&iv.Hash) locator := chainBlockLocatorToHashes(blkLocator) err = imsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - bmgrLog.Errorf("PEER: Failed to push getblocksmsg: "+ - "%v", err) + log.Errorf("PEER: Failed to push getblocksmsg: %v", err) } } } @@ -1556,8 +1493,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeBlock: // Request the block if there is not already a pending // request. - if _, exists := b.requestedBlocks[iv.Hash]; !exists { - limitAdd(b.requestedBlocks, iv.Hash, maxRequestedBlocks) + if _, exists := m.requestedBlocks[iv.Hash]; !exists { + limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks) limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks) gdmsg.AddInvVect(iv) numRequested++ @@ -1566,8 +1503,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeTx: // Request the transaction if there is not already a // pending request. 
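The requested-inventory maps in this region are bounded via limitAdd, whose definition appears in the next hunk. A standalone sketch assuming the usual btcd-style behavior of evicting an arbitrary entry once the map is full:

```go
package main

import "fmt"

// limitAdd mirrors the helper of the same name: before adding a new entry,
// evict an arbitrary existing one once the map is at its limit. Go map
// iteration order is unspecified, so ranging a single step picks a
// quasi-random victim without extra bookkeeping.
func limitAdd(m map[string]struct{}, key string, limit int) {
	if len(m)+1 > limit {
		for k := range m {
			delete(m, k)
			break
		}
	}
	m[key] = struct{}{}
}

func main() {
	seen := make(map[string]struct{})
	for _, h := range []string{"a", "b", "c", "d"} {
		limitAdd(seen, h, 3)
	}
	fmt.Println(len(seen)) // 3: bounded regardless of how many were added
}
```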
- if _, exists := b.requestedTxns[iv.Hash]; !exists { - limitAdd(b.requestedTxns, iv.Hash, maxRequestedTxns) + if _, exists := m.requestedTxns[iv.Hash]; !exists { + limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns) limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns) gdmsg.AddInvVect(iv) numRequested++ @@ -1610,63 +1547,63 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) { m[hash] = struct{}{} } -// blockHandler is the main handler for the block manager. It must be run -// as a goroutine. It processes block and inv messages in a separate goroutine -// from the peer handlers so the block (MsgBlock) messages are handled by a -// single thread without needing to lock memory data structures. This is -// important because the block manager controls which blocks are needed and how -// the fetching should proceed. -func (b *blockManager) blockHandler() { +// blockHandler is the main handler for the sync manager. It must be run as a +// goroutine. It processes block and inv messages in a separate goroutine from +// the peer handlers so the block (MsgBlock) messages are handled by a single +// thread without needing to lock memory data structures. This is important +// because the sync manager controls which blocks are needed and how the +// fetching should proceed. +func (m *SyncManager) blockHandler() { out: for { select { - case m := <-b.msgChan: - switch msg := m.(type) { + case data := <-m.msgChan: + switch msg := data.(type) { case *newPeerMsg: - b.handleNewPeerMsg(msg.peer) + m.handleNewPeerMsg(msg.peer) case *txMsg: - b.handleTxMsg(msg) + m.handleTxMsg(msg) msg.reply <- struct{}{} case *blockMsg: - b.handleBlockMsg(msg) + m.handleBlockMsg(msg) msg.reply <- struct{}{} case *invMsg: - b.handleInvMsg(msg) + m.handleInvMsg(msg) case *headersMsg: - b.handleHeadersMsg(msg) + m.handleHeadersMsg(msg) case *notFoundMsg: - b.handleNotFoundMsg(msg) + m.handleNotFoundMsg(msg) case *donePeerMsg: - b.handleDonePeerMsg(msg.peer) + m.handleDonePeerMsg(msg.peer) case getSyncPeerMsg: var peerID int32 - if b.syncPeer != nil { - peerID = b.syncPeer.ID() + if m.syncPeer != nil { + peerID = m.syncPeer.ID() } msg.reply <- peerID case requestFromPeerMsg: - err := b.requestFromPeer(msg.peer, msg.blocks, msg.txs) + err := m.requestFromPeer(msg.peer, msg.blocks, msg.txs) msg.reply <- requestFromPeerResponse{ err: err, } case forceReorganizationMsg: - err := b.cfg.Chain.ForceHeadReorganization( + err := m.cfg.Chain.ForceHeadReorganization( msg.formerBest, msg.newBest) if err == nil { // Notify stake difficulty subscribers and prune // invalidated transactions. 
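blockHandler, shown next, is a single-goroutine event loop: one goroutine owns all sync state, so no locks are needed around it, and callers that need an answer embed a reply channel in their message (as getSyncPeerMsg and friends do). A minimal runnable sketch of the pattern:

```go
package main

import "fmt"

// getHeightMsg carries a reply channel, mirroring getSyncPeerMsg.
type getHeightMsg struct {
	reply chan int64
}

type manager struct {
	msgChan chan interface{}
	quit    chan struct{}
	height  int64
}

// eventLoop mirrors blockHandler: a type switch dispatches messages, and the
// loop exits when quit closes.
func (m *manager) eventLoop() {
	for {
		select {
		case data := <-m.msgChan:
			switch msg := data.(type) {
			case getHeightMsg:
				msg.reply <- m.height
			default:
				fmt.Printf("invalid message type: %T\n", msg)
			}
		case <-m.quit:
			return
		}
	}
}

func main() {
	m := &manager{msgChan: make(chan interface{}), quit: make(chan struct{}), height: 42}
	go m.eventLoop()

	reply := make(chan int64)
	m.msgChan <- getHeightMsg{reply: reply}
	fmt.Println(<-reply) // 42
	close(m.quit)
}
```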
- best := b.cfg.Chain.BestSnapshot() - if r := b.cfg.RpcServer(); r != nil { + best := m.cfg.Chain.BestSnapshot() + if r := m.cfg.RpcServer(); r != nil { r.NotifyStakeDifficulty( &rpcserver.StakeDifficultyNtfnData{ BlockHash: best.Hash, @@ -1674,9 +1611,9 @@ out: StakeDifficulty: best.NextStakeDiff, }) } - b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - b.cfg.TxMemPool.PruneExpiredTx() + m.cfg.TxMemPool.PruneExpiredTx() } msg.reply <- forceReorganizationResponse{ @@ -1684,14 +1621,14 @@ out: } case tipGenerationMsg: - g, err := b.cfg.Chain.TipGeneration() + g, err := m.cfg.Chain.TipGeneration() msg.reply <- tipGenerationResponse{ hashes: g, err: err, } case processBlockMsg: - forkLen, isOrphan, err := b.processBlockAndOrphans(msg.block, + forkLen, isOrphan, err := m.processBlockAndOrphans(msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ @@ -1706,8 +1643,8 @@ out: if onMainChain { // Notify stake difficulty subscribers and prune // invalidated transactions. - best := b.cfg.Chain.BestSnapshot() - if r := b.cfg.RpcServer(); r != nil { + best := m.cfg.Chain.BestSnapshot() + if r := m.cfg.RpcServer(); r != nil { r.NotifyStakeDifficulty( &rpcserver.StakeDifficultyNtfnData{ BlockHash: best.Hash, @@ -1715,9 +1652,9 @@ out: StakeDifficulty: best.NextStakeDiff, }) } - b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - b.cfg.TxMemPool.PruneExpiredTx() + m.cfg.TxMemPool.PruneExpiredTx() } msg.reply <- processBlockResponse{ @@ -1726,7 +1663,7 @@ out: } case processTransactionMsg: - acceptedTxs, err := b.cfg.TxMemPool.ProcessTransaction(msg.tx, + acceptedTxs, err := m.cfg.TxMemPool.ProcessTransaction(msg.tx, msg.allowOrphans, msg.rateLimit, msg.allowHighFees, msg.tag) msg.reply <- processTransactionResponse{ acceptedTxs: acceptedTxs, @@ -1734,145 +1671,144 @@ out: } default: - bmgrLog.Warnf("Invalid message type in block handler: %T", msg) + log.Warnf("Invalid message type in block handler: %T", msg) } - case <-b.quit: + case <-m.quit: break out } } - b.wg.Done() - bmgrLog.Trace("Block handler done") + m.wg.Done() + log.Trace("Sync manager done") } -// NewPeer informs the block manager of a newly active peer. -func (b *blockManager) NewPeer(peer *peerpkg.Peer) { +// NewPeer informs the sync manager of a newly active peer. +func (m *SyncManager) NewPeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &newPeerMsg{peer: peer} + m.msgChan <- &newPeerMsg{peer: peer} } // QueueTx adds the passed transaction message and peer to the block handling // queue. -func (b *blockManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) { +func (m *SyncManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more transactions if we're shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} + m.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} } // QueueBlock adds the passed block message and peer to the block handling queue. -func (b *blockManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) { +func (m *SyncManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more blocks if we're shutting down. 
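QueueTx and QueueBlock pair each unit of work with a done channel: the peer's receive loop blocks on that channel until the manager finishes processing, which is what throttles a peer flooding bad data, while the shutdown check signals done immediately so callers never hang during teardown. A rough sketch of that contract, with blockSink standing in for the manager:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// blockSink is an illustrative stand-in for the sync manager: a shutdown
// flag plus a channel of queued work.
type blockSink struct {
	shutdown int32
	msgChan  chan func()
}

// queueBlock mimics the QueueBlock contract: when shutting down it signals
// done immediately so the caller never deadlocks; otherwise it enqueues the
// work and lets the handler signal done once processing finishes.
func (s *blockSink) queueBlock(name string, done chan struct{}) {
	if atomic.LoadInt32(&s.shutdown) != 0 {
		done <- struct{}{}
		return
	}
	s.msgChan <- func() {
		time.Sleep(10 * time.Millisecond) // simulate validation work
		fmt.Println("processed", name)
		done <- struct{}{}
	}
}

func main() {
	s := &blockSink{msgChan: make(chan func(), 1)}
	go func() {
		for work := range s.msgChan {
			work()
		}
	}()

	// The caller blocks until the block is fully processed, which is what
	// throttles a peer that queues blocks before disconnecting.
	done := make(chan struct{})
	s.queueBlock("block-1", done)
	<-done
}
```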
- if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &blockMsg{block: block, peer: peer, reply: done} + m.msgChan <- &blockMsg{block: block, peer: peer, reply: done} } // QueueInv adds the passed inv message and peer to the block handling queue. -func (b *blockManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { +func (m *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on inv // messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &invMsg{inv: inv, peer: peer} + m.msgChan <- &invMsg{inv: inv, peer: peer} } // QueueHeaders adds the passed headers message and peer to the block handling // queue. -func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { +func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on // headers messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &headersMsg{headers: headers, peer: peer} + m.msgChan <- &headersMsg{headers: headers, peer: peer} } // QueueNotFound adds the passed notfound message and peer to the block handling // queue. -func (b *blockManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) { +func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on // reject messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer} + m.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer} } -// DonePeer informs the blockmanager that a peer has disconnected. -func (b *blockManager) DonePeer(peer *peerpkg.Peer) { +// DonePeer informs the sync manager that a peer has disconnected. +func (m *SyncManager) DonePeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &donePeerMsg{peer: peer} + m.msgChan <- &donePeerMsg{peer: peer} } // Start begins the core block handler which processes block and inv messages. -func (b *blockManager) Start() { +func (m *SyncManager) Start() { // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + if atomic.AddInt32(&m.started, 1) != 1 { return } - bmgrLog.Trace("Starting block manager") - b.wg.Add(1) - go b.blockHandler() + log.Trace("Starting sync manager") + m.wg.Add(1) + go m.blockHandler() } -// Stop gracefully shuts down the block manager by stopping all asynchronous +// Stop gracefully shuts down the sync manager by stopping all asynchronous // handlers and waiting for them to finish. -func (b *blockManager) Stop() error { - if atomic.AddInt32(&b.shutdown, 1) != 1 { - bmgrLog.Warnf("Block manager is already in the process of " + - "shutting down") +func (m *SyncManager) Stop() error { + if atomic.AddInt32(&m.shutdown, 1) != 1 { + log.Warnf("Sync manager is already in the process of shutting down") return nil } - bmgrLog.Infof("Block manager shutting down") - close(b.quit) - b.wg.Wait() + log.Infof("Sync manager shutting down") + close(m.quit) + m.wg.Wait() return nil } // SyncPeerID returns the ID of the current sync peer, or 0 if there is none. 
-func (b *blockManager) SyncPeerID() int32 { +func (m *SyncManager) SyncPeerID() int32 { reply := make(chan int32) - b.msgChan <- getSyncPeerMsg{reply: reply} + m.msgChan <- getSyncPeerMsg{reply: reply} return <-reply } // RequestFromPeer allows an outside caller to request blocks or transactions -// from a peer. The requests are logged in the blockmanager's internal map of -// requests so they do not later ban the peer for sending the respective data. -func (b *blockManager) RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { +// from a peer. The requests are logged in the internal map of requests so the +// peer is not later banned for sending the respective data. +func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { reply := make(chan requestFromPeerResponse) - b.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, txs: txs, + m.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, txs: txs, reply: reply} response := <-reply return response.err } -func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { +func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { msgResp := wire.NewMsgGetData() - state, exists := b.peerStates[p] + state, exists := m.peerStates[p] if !exists { return fmt.Errorf("unknown peer %s", p) } @@ -1881,14 +1817,14 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash for _, bh := range blocks { // If we've already requested this block, skip it. _, alreadyReqP := state.requestedBlocks[*bh] - _, alreadyReqB := b.requestedBlocks[*bh] + _, alreadyReqB := m.requestedBlocks[*bh] if alreadyReqP || alreadyReqB { continue } // Skip the block when it is already known. - if b.isKnownOrphan(bh) || b.cfg.Chain.HaveBlock(bh) { + if m.isKnownOrphan(bh) || m.cfg.Chain.HaveBlock(bh) { continue } @@ -1900,14 +1836,14 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash } state.requestedBlocks[*bh] = struct{}{} - b.requestedBlocks[*bh] = struct{}{} + m.requestedBlocks[*bh] = struct{}{} } // Add the vote transactions to the request. for _, vh := range txs { // If we've already requested this transaction, skip it. _, alreadyReqP := state.requestedTxns[*vh] - _, alreadyReqB := b.requestedTxns[*vh] + _, alreadyReqB := m.requestedTxns[*vh] if alreadyReqP || alreadyReqB { continue @@ -1915,13 +1851,13 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if b.cfg.TxMemPool.HaveTransaction(vh) { + if m.cfg.TxMemPool.HaveTransaction(vh) { continue } // Check if the transaction exists from the point of view of the // end of the main chain. - entry, err := b.cfg.Chain.FetchUtxoEntry(vh) + entry, err := m.cfg.Chain.FetchUtxoEntry(vh) if err != nil { return err } @@ -1937,7 +1873,7 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash } state.requestedTxns[*vh] = struct{}{} - b.requestedTxns[*vh] = struct{}{} + m.requestedTxns[*vh] = struct{}{} } if len(msgResp.InvList) > 0 { @@ -1949,11 +1885,11 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash // ForceReorganization forces a reorganization of the block chain to the block // hash requested, so long as it matches up with the current organization of the -// best chain. It is funneled through the block manager since blockchain is not +// best chain. 
It is funneled through the sync manager since blockchain is not // safe for concurrent access. -func (b *blockManager) ForceReorganization(formerBest, newBest chainhash.Hash) error { +func (m *SyncManager) ForceReorganization(formerBest, newBest chainhash.Hash) error { reply := make(chan forceReorganizationResponse) - b.msgChan <- forceReorganizationMsg{ + m.msgChan <- forceReorganizationMsg{ formerBest: formerBest, newBest: newBest, reply: reply} @@ -1962,64 +1898,108 @@ func (b *blockManager) ForceReorganization(formerBest, newBest chainhash.Hash) e } // TipGeneration returns the hashes of all the children of the current best -// chain tip. It is funneled through the block manager since blockchain is not +// chain tip. It is funneled through the sync manager since blockchain is not // safe for concurrent access. -func (b *blockManager) TipGeneration() ([]chainhash.Hash, error) { +func (m *SyncManager) TipGeneration() ([]chainhash.Hash, error) { reply := make(chan tipGenerationResponse) - b.msgChan <- tipGenerationMsg{reply: reply} + m.msgChan <- tipGenerationMsg{reply: reply} response := <-reply return response.hashes, response.err } // ProcessBlock makes use of ProcessBlock on an internal instance of a block -// chain. It is funneled through the block manager since blockchain is not safe +// chain. It is funneled through the sync manager since blockchain is not safe // for concurrent access. -func (b *blockManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { +func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { reply := make(chan processBlockResponse, 1) - b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} + m.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} response := <-reply return response.isOrphan, response.err } // ProcessTransaction makes use of ProcessTransaction on an internal instance of -// a block chain. It is funneled through the block manager since blockchain is +// a block chain. It is funneled through the sync manager since blockchain is // not safe for concurrent access. -func (b *blockManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, +func (m *SyncManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) { reply := make(chan processTransactionResponse, 1) - b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, + m.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, allowHighFees, tag, reply} response := <-reply return response.acceptedTxs, response.err } -// IsCurrent returns whether or not the block manager believes it is synced with +// IsCurrent returns whether or not the sync manager believes it is synced with // the connected peers. // // This function is safe for concurrent access. -func (b *blockManager) IsCurrent() bool { - b.isCurrentMtx.RLock() - isCurrent := b.isCurrent - b.isCurrentMtx.RUnlock() +func (m *SyncManager) IsCurrent() bool { + m.isCurrentMtx.RLock() + isCurrent := m.isCurrent + m.isCurrentMtx.RUnlock() return isCurrent } // TicketPoolValue returns the current value of the total stake in the ticket // pool. 
-func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) { - return b.cfg.Chain.TicketPoolValue() +func (m *SyncManager) TicketPoolValue() (dcrutil.Amount, error) { + return m.cfg.Chain.TicketPoolValue() +} + +// Config holds the configuration options related to the network chain +// synchronization manager. +type Config struct { + // PeerNotifier specifies an implementation to use for notifying peers of + // status changes related to blocks and transactions. + PeerNotifier PeerNotifier + + // ChainParams identifies which chain parameters the manager is associated + // with. + ChainParams *chaincfg.Params + + // Chain specifies the chain instance to use for processing blocks and + // transactions. + Chain *blockchain.BlockChain + + // SigCache defines the signature cache to use when validating signatures. + SigCache *txscript.SigCache + + // TxMemPool specifies the mempool to use for processing transactions. + TxMemPool *mempool.TxPool + + // RpcServer returns an instance of an RPC server to use for notifications. + // It may return nil if there is no active RPC server. + RpcServer func() *rpcserver.Server + + // DisableCheckpoints indicates whether or not the sync manager should make + // use of checkpoints. + DisableCheckpoints bool + + // NoMiningStateSync indicates whether or not the sync manager should + // perform an initial mining state synchronization with peers once they are + // believed to be fully synced. + NoMiningStateSync bool + + // MaxPeers specifies the maximum number of peers the server is expected to + // be connected with. It is primarily used as a hint for more efficient + // synchronization. + MaxPeers int + + // MaxOrphanTxs specifies the maximum number of orphan transactions the + // transaction pool associated with the server supports. + MaxOrphanTxs int } -// newBlockManager returns a new Decred block manager. -// Use Start to begin processing asynchronous block and inv updates. -func newBlockManager(config *blockManagerConfig) (*blockManager, error) { - bm := blockManager{ - cfg: config, +// New returns a new network chain synchronization manager. Use Start to begin +// processing asynchronous block and inv updates. +func New(config *Config) (*SyncManager, error) { + m := SyncManager{ + cfg: *config, rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: progresslog.New("Processed", bmgrLog), + progressLogger: progresslog.New("Processed", log), msgChan: make(chan interface{}, config.MaxPeers*3), headerList: list.New(), quit: make(chan struct{}), @@ -2028,20 +2008,20 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { isCurrent: config.Chain.IsCurrent(), } - best := bm.cfg.Chain.BestSnapshot() - if !bm.cfg.DisableCheckpoints { + best := m.cfg.Chain.BestSnapshot() + if !m.cfg.DisableCheckpoints { // Initialize the next checkpoint based on the current height. 
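Each of the exported query methods above (SyncPeerID, TipGeneration, ForceReorganization, ProcessBlock, and so on) follows the same funneling recipe: build a message that carries its own reply channel, hand it to the handler goroutine, and block on the reply. A minimal sketch of one such round trip, using an invented counter type rather than the real manager:

```go
package main

import "fmt"

// getCountMsg mirrors getSyncPeerMsg: a query that carries its own reply
// channel so the single handler goroutine can answer without shared locks.
type getCountMsg struct {
	reply chan int
}

type counter struct {
	msgChan chan interface{}
	count   int
}

func (c *counter) handler() {
	for data := range c.msgChan {
		switch msg := data.(type) {
		case *getCountMsg:
			c.count++
			msg.reply <- c.count
		default:
			fmt.Printf("invalid message type: %T\n", msg)
		}
	}
}

// Count is the public, concurrency-safe face of the query, like SyncPeerID:
// it funnels the request through the handler and blocks on the reply.
func (c *counter) Count() int {
	reply := make(chan int)
	c.msgChan <- &getCountMsg{reply: reply}
	return <-reply
}

func main() {
	c := &counter{msgChan: make(chan interface{})}
	go c.handler()
	fmt.Println(c.Count()) // 1
	fmt.Println(c.Count()) // 2
}
```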
- bm.nextCheckpoint = bm.findNextHeaderCheckpoint(best.Height) - if bm.nextCheckpoint != nil { - bm.resetHeaderState(&best.Hash, best.Height) + m.nextCheckpoint = m.findNextHeaderCheckpoint(best.Height) + if m.nextCheckpoint != nil { + m.resetHeaderState(&best.Hash, best.Height) } } else { - bmgrLog.Info("Checkpoints are disabled") + log.Info("Checkpoints are disabled") } - bm.syncHeightMtx.Lock() - bm.syncHeight = best.Height - bm.syncHeightMtx.Unlock() + m.syncHeightMtx.Lock() + m.syncHeight = best.Height + m.syncHeightMtx.Unlock() - return &bm, nil + return &m, nil } diff --git a/internal/rpcserver/rpcserverhelp.go b/internal/rpcserver/rpcserverhelp.go index cc000d0c4c..5006d7a8a2 100644 --- a/internal/rpcserver/rpcserverhelp.go +++ b/internal/rpcserver/rpcserverhelp.go @@ -22,7 +22,7 @@ var helpDescsEnUS = map[string]string{ "The levelspec can either a debug level or of the form:\n" + "=,=,...\n" + "The valid debug levels are trace, debug, info, warn, error, and critical.\n" + - "The valid subsystems are AMGR, ADXR, BCDB, BMGR, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, and TXMP.\n" + + "The valid subsystems are AMGR, ADXR, BCDB, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, SYNC, and TXMP.\n" + "Finally the keyword 'show' will return a list of the available subsystems.", "debuglevel-levelspec": "The debug level(s) to use or the keyword 'show'", "debuglevel--condition0": "levelspec!=show", diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index fb8fdc72a9..2d488545f3 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -2028,8 +2028,8 @@ var ErrClientQuit = errors.New("client quit") // QueueNotification queues the passed notification to be sent to the websocket // client. This function, as the name implies, is only intended for // notifications since it has additional logic to prevent other subsystems, such -// as the memory pool and block manager, from blocking even when the send -// channel is full. +// as the memory pool and sync manager, from blocking even when the send channel +// is full. // // If the client is in the process of shutting down, this function returns // ErrClientQuit. 
This is intended to be checked by long-running notification diff --git a/log.go b/log.go index 28843d3600..e7289a6c61 100644 --- a/log.go +++ b/log.go @@ -20,6 +20,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/mining/cpuminer" + "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/txscript/v3" @@ -60,7 +61,6 @@ var ( adxrLog = backendLog.Logger("ADXR") amgrLog = backendLog.Logger("AMGR") bcdbLog = backendLog.Logger("BCDB") - bmgrLog = backendLog.Logger("BMGR") chanLog = backendLog.Logger("CHAN") cmgrLog = backendLog.Logger("CMGR") dcrdLog = backendLog.Logger("DCRD") @@ -73,6 +73,7 @@ var ( scrpLog = backendLog.Logger("SCRP") srvrLog = backendLog.Logger("SRVR") stkeLog = backendLog.Logger("STKE") + syncLog = backendLog.Logger("SYNC") txmpLog = backendLog.Logger("TXMP") trsyLog = backendLog.Logger("TRSY") ) @@ -92,6 +93,7 @@ func init() { peer.UseLogger(peerLog) rpcserver.UseLogger(rpcsLog) stake.UseLogger(stkeLog) + netsync.UseLogger(syncLog) txscript.UseLogger(scrpLog) } @@ -100,7 +102,6 @@ var subsystemLoggers = map[string]slog.Logger{ "ADXR": adxrLog, "AMGR": amgrLog, "BCDB": bcdbLog, - "BMGR": bmgrLog, "CHAN": chanLog, "CMGR": cmgrLog, "DCRD": dcrdLog, @@ -113,6 +114,7 @@ var subsystemLoggers = map[string]slog.Logger{ "SCRP": scrpLog, "SRVR": srvrLog, "STKE": stkeLog, + "SYNC": syncLog, "TXMP": txmpLog, "TRSY": trsyLog, } diff --git a/rpcadaptors.go b/rpcadaptors.go index 9f60a7832c..cd871a5bb9 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -19,6 +19,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/mining/cpuminer" + "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/wire" @@ -306,23 +307,23 @@ func (*rpcConnManager) Lookup(host string) ([]net.IP, error) { return dcrdLookup(host) } -// rpcSyncMgr provides a block manager for use with the RPC server and -// implements the rpcserver.SyncManager interface. +// rpcSyncMgr provides an adaptor for use with the RPC server and implements the +// rpcserver.SyncManager interface. type rpcSyncMgr struct { - server *server - blockMgr *blockManager + server *server + syncMgr *netsync.SyncManager } // Ensure rpcSyncMgr implements the rpcserver.SyncManager interface. var _ rpcserver.SyncManager = (*rpcSyncMgr)(nil) -// IsCurrent returns whether or not the sync manager believes the chain is +// IsCurrent returns whether or not the net sync manager believes the chain is // current as compared to the rest of the network. // // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. func (b *rpcSyncMgr) IsCurrent() bool { - return b.blockMgr.IsCurrent() + return b.syncMgr.IsCurrent() } // SubmitBlock submits the provided block to the network after processing it @@ -331,7 +332,7 @@ func (b *rpcSyncMgr) IsCurrent() bool { // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { - return b.blockMgr.ProcessBlock(block, flags) + return b.syncMgr.ProcessBlock(block, flags) } // SyncPeer returns the id of the current peer being synced with. 
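rpcSyncMgr is a pure adaptor: it holds a concrete *netsync.SyncManager and forwards calls so the RPC server only ever sees the narrow rpcserver.SyncManager interface. A compact sketch of that shape, with SyncStatus, engine, and rpcAdaptor as invented stand-ins:

```go
package main

import "fmt"

// SyncStatus is a stand-in for the narrow interface an RPC layer consumes,
// analogous to rpcserver.SyncManager.
type SyncStatus interface {
	IsCurrent() bool
}

// engine is a stand-in for the concrete *netsync.SyncManager.
type engine struct{ current bool }

func (e *engine) IsCurrent() bool { return e.current }

// rpcAdaptor mirrors rpcSyncMgr: it owns no logic of its own and simply
// forwards each call, keeping the RPC server decoupled from the concrete
// sync implementation.
type rpcAdaptor struct {
	eng *engine
}

// Compile-time check, matching the `var _ rpcserver.SyncManager = ...` idiom.
var _ SyncStatus = (*rpcAdaptor)(nil)

func (a *rpcAdaptor) IsCurrent() bool { return a.eng.IsCurrent() }

func main() {
	var s SyncStatus = &rpcAdaptor{eng: &engine{current: true}}
	fmt.Println(s.IsCurrent()) // true
}
```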
@@ -339,7 +340,7 @@ func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.Behavior // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. func (b *rpcSyncMgr) SyncPeerID() int32 { - return b.blockMgr.SyncPeerID() + return b.syncMgr.SyncPeerID() } // LocateBlocks returns the hashes of the blocks after the first known block in @@ -355,19 +356,19 @@ func (b *rpcSyncMgr) LocateBlocks(locator blockchain.BlockLocator, hashStop *cha // TipGeneration returns the entire generation of blocks stemming from the // parent of the current tip. func (b *rpcSyncMgr) TipGeneration() ([]chainhash.Hash, error) { - return b.blockMgr.TipGeneration() + return b.syncMgr.TipGeneration() } // SyncHeight returns latest known block being synced to. func (b *rpcSyncMgr) SyncHeight() int64 { - return b.blockMgr.SyncHeight() + return b.syncMgr.SyncHeight() } // ProcessTransaction relays the provided transaction validation and insertion // into the memory pool. func (b *rpcSyncMgr) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) { - return b.blockMgr.ProcessTransaction(tx, allowOrphans, + return b.syncMgr.ProcessTransaction(tx, allowOrphans, rateLimit, allowHighFees, tag) } diff --git a/server.go b/server.go index c5e5c36b4c..e30d8970ae 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/mining/cpuminer" + "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/lru" @@ -161,9 +162,9 @@ type relayMsg struct { immediate bool } -// updatePeerHeightsMsg is a message sent from the blockmanager to the server -// after a new block has been accepted. The purpose of the message is to update -// the heights of peers that were known to announce the block before we +// updatePeerHeightsMsg is a message sent from the net sync manager to the +// server after a new block has been accepted. The purpose of the message is to +// update the heights of peers that were known to announce the block before we // connected it to the main chain or recognized it as an orphan. With these // updates, peer heights will be kept up to date, allowing for fresh data when // selecting sync peer candidacy. @@ -462,7 +463,7 @@ type server struct { sigCache *txscript.SigCache subsidyCache *standalone.SubsidyCache rpcServer *rpcserver.Server - blockManager *blockManager + syncManager *netsync.SyncManager bg *mining.BgBlkTmplGenerator chain *blockchain.BlockChain txMemPool *mempool.TxPool @@ -501,8 +502,7 @@ type server struct { lotteryDataBroadcast map[chainhash.Hash]struct{} } -// serverPeer extends the peer to maintain state shared by the server and -// the blockmanager. +// serverPeer extends the peer to maintain state shared by the server. type serverPeer struct { *peer.Peer @@ -524,7 +524,8 @@ type serverPeer struct { getMiningStateSent bool initStateSent bool - // The following chans are used to sync blockmanager and server. + // The following chans are used to synchronize the net sync manager and + // server. 
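The updatePeerHeightsMsg flow described above credits a new height to every peer that had already announced the now-connected block, except the peer the update originated from. A simplified sketch of that bookkeeping; peerState and its fields are illustrative, not the real server types:

```go
package main

import "fmt"

// peerState is an illustrative stand-in for a connected peer: it remembers
// the last block hash the peer announced and the height we credit it with.
type peerState struct {
	id                int32
	lastAnnouncedHash string
	height            int64
}

// updatePeerHeights applies the updatePeerHeightsMsg idea: every peer that
// announced the connected block (other than the one that delivered it) gets
// its height bumped, keeping sync-peer selection data fresh.
func updatePeerHeights(peers []*peerState, hash string, height int64, sourceID int32) {
	for _, p := range peers {
		if p.id != sourceID && p.lastAnnouncedHash == hash {
			p.height = height
			p.lastAnnouncedHash = ""
		}
	}
}

func main() {
	peers := []*peerState{
		{id: 1, lastAnnouncedHash: "abcd", height: 100},
		{id: 2, lastAnnouncedHash: "abcd", height: 100},
		{id: 3, lastAnnouncedHash: "ffff", height: 99},
	}
	updatePeerHeights(peers, "abcd", 101, 1) // peer 1 was the source
	for _, p := range peers {
		fmt.Printf("peer %d height %d\n", p.id, p.height)
	}
	// peer 1 height 100, peer 2 height 101, peer 3 height 99
}
```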
txProcessed chan struct{} blockProcessed chan struct{} @@ -699,7 +700,7 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // Advertise the local address when the server accepts incoming // connections and it believes itself to be close to the best // known tip. - if !cfg.DisableListen && sp.server.blockManager.IsCurrent() { + if !cfg.DisableListen && sp.server.syncManager.IsCurrent() { // Get address that best matches. lna := addrManager.GetBestLocalAddress(remoteAddr) if addrmgr.IsRoutable(lna) { @@ -730,8 +731,8 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // the local clock to keep the network time in sync. sp.server.timeSource.AddTimeSample(p.Addr(), msg.Timestamp) - // Signal the block manager this peer is a new sync candidate. - sp.server.blockManager.NewPeer(sp.Peer) + // Signal the net sync manager this peer is a new sync candidate. + sp.server.syncManager.NewPeer(sp.Peer) // Add valid peer to the server. sp.server.AddPeer(sp) @@ -807,12 +808,8 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState } sp.getMiningStateSent = true - // Access the block manager and get the list of best blocks to mine on. - bm := sp.server.blockManager - mp := sp.server.txMemPool - best := sp.server.chain.BestSnapshot() - // Send out blank mining states if it's early in the blockchain. + best := sp.server.chain.BestSnapshot() if best.Height < sp.server.chainParams.StakeValidationHeight-1 { err := sp.pushMiningStateMsg(0, nil, nil) if err != nil { @@ -825,9 +822,10 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState // Obtain the entire generation of blocks stemming from the parent of // the current tip. - children, err := bm.TipGeneration() + sm := sp.server.syncManager + children, err := sm.TipGeneration() if err != nil { - peerLog.Warnf("failed to access block manager to get the generation "+ + peerLog.Warnf("failed to access sync manager to get the generation "+ "for a mining state request (block: %v): %v", best.Hash, err) return } @@ -836,6 +834,7 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState // list to the maximum number of allowed eligible block hashes per // mining state message. There is nothing to send when there are no // eligible blocks. + mp := sp.server.txMemPool blockHashes := mining.SortParentsByVotes(mp, best.Hash, children, sp.server.chainParams) numBlocks := len(blockHashes) @@ -871,7 +870,7 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState // OnMiningState is invoked when a peer receives a miningstate wire message. It // requests the data advertised in the message from the peer. func (sp *serverPeer) OnMiningState(p *peer.Peer, msg *wire.MsgMiningState) { - err := sp.server.blockManager.RequestFromPeer(sp.Peer, msg.BlockHashes, + err := sp.server.syncManager.RequestFromPeer(sp.Peer, msg.BlockHashes, msg.VoteHashes) if err != nil { peerLog.Warnf("couldn't handle mining state message: %v", @@ -888,12 +887,8 @@ func (sp *serverPeer) OnGetInitState(p *peer.Peer, msg *wire.MsgGetInitState) { } sp.initStateSent = true - // Access the block manager and get the list of best blocks to mine on. - bm := sp.server.blockManager - mp := sp.server.txMemPool - best := sp.server.chain.BestSnapshot() - // Send out blank mining states if it's early in the blockchain. 
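The gating in OnGetMiningState reads cleanly on its own: before roughly StakeValidationHeight a blank mining state goes out, and afterwards the tip's sibling blocks are returned, capped at a protocol maximum. A toy sketch of that decision, with invented names and heights:

```go
package main

import "fmt"

// miningStateReply is a simplified stand-in for the miningstate response.
type miningStateReply struct {
	blockHashes []string
}

// buildMiningState mirrors the gating in OnGetMiningState: before the
// (hypothetical) stake validation height there is nothing useful to share,
// so an empty reply goes out; afterwards the tip's sibling blocks are
// returned, truncated to a maximum count.
func buildMiningState(tipHeight, stakeValidationHeight int64, children []string, max int) miningStateReply {
	if tipHeight < stakeValidationHeight-1 {
		return miningStateReply{} // blank state early in the chain
	}
	if len(children) > max {
		children = children[:max]
	}
	return miningStateReply{blockHashes: children}
}

func main() {
	early := buildMiningState(100, 4096, []string{"a", "b"}, 8)
	fmt.Println(len(early.blockHashes)) // 0

	later := buildMiningState(5000, 4096, []string{"a", "b", "c"}, 2)
	fmt.Println(later.blockHashes) // [a b]
}
```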
+ best := sp.server.chain.BestSnapshot() if best.Height < sp.server.chainParams.StakeValidationHeight-1 { sp.QueueMessage(wire.NewMsgInitState(), nil) return @@ -913,12 +908,14 @@ func (sp *serverPeer) OnGetInitState(p *peer.Peer, msg *wire.MsgGetInitState) { // Fetch head block hashes if we need to send either them or their // votes. + mp := sp.server.txMemPool if wantBlocks || wantVotes { // Obtain the entire generation of blocks stemming from the // parent of the current tip. - children, err := bm.TipGeneration() + sm := sp.server.syncManager + children, err := sm.TipGeneration() if err != nil { - peerLog.Warnf("Failed to access block manager to get the generation "+ + peerLog.Warnf("Failed to access sync manager to get the generation "+ "for a init state request (block: %v): %v", best.Hash, err) return } @@ -979,7 +976,7 @@ func (sp *serverPeer) OnInitState(p *peer.Peer, msg *wire.MsgInitState) { txHashes = append(txHashes, &msg.TSpendHashes[i]) } - err := sp.server.blockManager.RequestFromPeer(sp.Peer, blockHashes, + err := sp.server.syncManager.RequestFromPeer(sp.Peer, blockHashes, txHashes) if err != nil { peerLog.Warnf("couldn't handle init state message: %v", err) @@ -1004,12 +1001,12 @@ func (sp *serverPeer) OnTx(p *peer.Peer, msg *wire.MsgTx) { iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) p.AddKnownInventory(iv) - // Queue the transaction up to be handled by the block manager and + // Queue the transaction up to be handled by the net sync manager and // intentionally block further receives until the transaction is fully // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed) + sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed) <-sp.txProcessed } @@ -1024,23 +1021,22 @@ func (sp *serverPeer) OnBlock(p *peer.Peer, msg *wire.MsgBlock, buf []byte) { iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) p.AddKnownInventory(iv) - // Queue the block up to be handled by the block manager and + // Queue the block up to be handled by the net sync manager and // intentionally block further receives until the network block is fully // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad blocks before disconnecting (or being // disconnected) and wasting memory. Additionally, this behavior is - // depended on by at least the block acceptance test tool as the - // reference implementation processes blocks in the same thread and - // therefore blocks further messages until the network block has been - // fully processed. - sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed) + // depended on by at least the block acceptance test tool as the reference + // implementation processes blocks in the same thread and therefore blocks + // further messages until the network block has been fully processed. + sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed) <-sp.blockProcessed } // OnInv is invoked when a peer receives an inv wire message and is used to // examine the inventory being advertised by the remote peer and react -// accordingly. We pass the message down to blockmanager which will call -// QueueMessage with any appropriate responses. +// accordingly. We pass the message down to the net sync manager which will +// call QueueMessage with any appropriate responses. 
func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { // Ban peers sending empty inventory requests. if len(msg.InvList) == 0 { @@ -1049,7 +1045,7 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { } if !cfg.BlocksOnly { - sp.server.blockManager.QueueInv(msg, sp.Peer) + sp.server.syncManager.QueueInv(msg, sp.Peer) return } @@ -1068,11 +1064,11 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { } } - sp.server.blockManager.QueueInv(newInv, sp.Peer) + sp.server.syncManager.QueueInv(newInv, sp.Peer) } // OnHeaders is invoked when a peer receives a headers wire message. The -// message is passed down to the block manager. +// message is passed down to the net sync manager. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { // Ban peers sending empty headers requests. if len(msg.Headers) == 0 { @@ -1080,7 +1076,7 @@ func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { return } - sp.server.blockManager.QueueHeaders(msg, sp.Peer) + sp.server.syncManager.QueueHeaders(msg, sp.Peer) } // handleGetData is invoked when a peer receives a getdata wire message and is @@ -1206,7 +1202,7 @@ func (sp *serverPeer) OnGetBlocks(p *peer.Peer, msg *wire.MsgGetBlocks) { // OnGetHeaders is invoked when a peer receives a getheaders wire message. func (sp *serverPeer) OnGetHeaders(p *peer.Peer, msg *wire.MsgGetHeaders) { // Ignore getheaders requests if not in sync. - if !sp.server.blockManager.IsCurrent() { + if !sp.server.syncManager.IsCurrent() { return } @@ -1268,7 +1264,7 @@ func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) { } // Ignore request if CFs are disabled or the chain is not yet synced. - if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() { + if cfg.NoCFilters || !sp.server.syncManager.IsCurrent() { return } @@ -1337,7 +1333,7 @@ func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) { // OnGetCFilterV2 is invoked when a peer receives a getcfilterv2 wire message. func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) { // Ignore request if the chain is not yet synced. - if !sp.server.blockManager.IsCurrent() { + if !sp.server.syncManager.IsCurrent() { return } @@ -1370,7 +1366,7 @@ func (sp *serverPeer) OnGetCFHeaders(p *peer.Peer, msg *wire.MsgGetCFHeaders) { } // Ignore request if CFs are disabled or the chain is not yet synced. - if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() { + if cfg.NoCFilters || !sp.server.syncManager.IsCurrent() { return } @@ -1579,7 +1575,7 @@ func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) { return } } - sp.server.blockManager.QueueNotFound(msg, p) + sp.server.syncManager.QueueNotFound(msg, p) } // randomUint16Number returns a random uint16 in a specified input range. Note @@ -2317,9 +2313,10 @@ func (s *server) peerDoneHandler(sp *serverPeer) { sp.WaitForDisconnect() s.donePeers <- sp - // Only tell block manager we are gone if we ever told it we existed. + // Notify the net sync manager the peer is gone if it was ever notified that + // the peer existed. if sp.VersionKnown() { - s.blockManager.DonePeer(sp.Peer) + s.syncManager.DonePeer(sp.Peer) tipHash := &s.chain.BestSnapshot().Hash isTreasuryEnabled, err := s.chain.IsTreasuryAgendaActive(tipHash) @@ -2342,13 +2339,12 @@ func (s *server) peerDoneHandler(sp *serverPeer) { // peers to and from the server, banning peers, and broadcasting messages to // peers. It must be run in a goroutine. 
func (s *server) peerHandler(ctx context.Context) { - // Start the address manager and block manager, both of which are needed - // by peers. This is done here since their lifecycle is closely tied - // to this handler and rather than adding more channels to synchronize - // things, it's easier and slightly faster to simply start and stop them - // in this handler. + // Start the address manager and sync manager, both of which are needed by + // peers. This is done here since their lifecycle is closely tied to this + // handler and rather than adding more channels to synchronize things, it's + // easier and slightly faster to simply start and stop them in this handler. s.addrManager.Start() - s.blockManager.Start() + s.syncManager.Start() srvrLog.Tracef("Starting peer handler") @@ -2405,7 +2401,7 @@ out: } } - s.blockManager.Stop() + s.syncManager.Stop() s.addrManager.Stop() // Drain channels before exiting so nothing is left waiting around @@ -2652,7 +2648,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // which could result in a deadlock. block, ok := notification.Data.(*dcrutil.Block) if !ok { - bmgrLog.Warnf("New tip block checked notification is not a block.") + syncLog.Warnf("New tip block checked notification is not a block.") break } @@ -2670,13 +2666,13 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // Don't relay or notify RPC clients with winning tickets if we are not // current. Other peers that are current should already know about it // and clients, such as wallets, shouldn't be voting on old blocks. - if !s.blockManager.IsCurrent() { + if !s.syncManager.IsCurrent() { return } band, ok := notification.Data.(*blockchain.BlockAcceptedNtfnsData) if !ok { - bmgrLog.Warnf("Chain accepted notification is not " + + syncLog.Warnf("Chain accepted notification is not " + "BlockAcceptedNtfnsData.") break } @@ -2720,7 +2716,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // blockchain. 
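handleBlockchainNotification, whose log calls migrate to syncLog below, dispatches on the notification type and defensively type-asserts each payload, logging a warning rather than panicking when the data is malformed, since the callback crosses a subsystem boundary. A stripped-down sketch of that pattern with invented notification types:

```go
package main

import "fmt"

// notificationType and notification stand in for blockchain.NotificationType
// and blockchain.Notification in this sketch.
type notificationType int

const (
	ntBlockConnected notificationType = iota
	ntBlockDisconnected
)

type notification struct {
	kind notificationType
	data interface{}
}

type blockConnectedData struct{ height int64 }

// handleNotification mirrors handleBlockchainNotification: every case first
// type-asserts its payload and logs a warning instead of panicking when the
// assertion fails, because callbacks arrive from another subsystem.
func handleNotification(n *notification) {
	switch n.kind {
	case ntBlockConnected:
		data, ok := n.data.(*blockConnectedData)
		if !ok {
			fmt.Println("block connected notification is not blockConnectedData")
			return
		}
		fmt.Println("connected block at height", data.height)
	case ntBlockDisconnected:
		fmt.Println("block disconnected")
	}
}

func main() {
	handleNotification(&notification{kind: ntBlockConnected, data: &blockConnectedData{height: 42}})
	handleNotification(&notification{kind: ntBlockConnected, data: "garbage"})
}
```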
wt, _, _, err := s.chain.LotteryDataForBlock(blockHash) if err != nil { - bmgrLog.Errorf("Couldn't calculate winning tickets for "+ + syncLog.Errorf("Couldn't calculate winning tickets for "+ "accepted block %v: %v", blockHash, err.Error()) } else { // Notify registered websocket clients of newly eligible tickets @@ -2765,8 +2761,8 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTBlockConnected: ntfn, ok := notification.Data.(*blockchain.BlockConnectedNtfnsData) if !ok { - bmgrLog.Warnf("Block connected notification is not " + - "BlockConnectedNtfnsData.") + syncLog.Warnf("Block connected notification is not " + + "BlockConnectedNtfnsData") break } block := ntfn.Block @@ -2875,7 +2871,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTSpentAndMissedTickets: tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) if !ok { - bmgrLog.Warnf("Tickets connected notification is not " + + syncLog.Warnf("Tickets connected notification is not " + "TicketNotificationsData") break } @@ -2888,7 +2884,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTNewTickets: tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) if !ok { - bmgrLog.Warnf("Tickets connected notification is not " + + syncLog.Warnf("Tickets connected notification is not " + "TicketNotificationsData") break } @@ -2901,7 +2897,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTBlockDisconnected: ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData) if !ok { - bmgrLog.Warnf("Block disconnected notification is not " + + syncLog.Warnf("Block disconnected notification is not " + "BlockDisconnectedNtfnsData.") break } @@ -2994,7 +2990,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTReorganization: rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData) if !ok { - bmgrLog.Warnf("Chain reorganization notification is malformed") + syncLog.Warnf("Chain reorganization notification is malformed") break } @@ -3650,12 +3646,11 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP }, } s.txMemPool = mempool.New(&txC) - s.blockManager, err = newBlockManager(&blockManagerConfig{ + s.syncManager, err = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, ChainParams: s.chainParams, SigCache: s.sigCache, - SubsidyCache: s.subsidyCache, TxMemPool: s.txMemPool, RpcServer: func() *rpcserver.Server { return s.rpcServer @@ -3703,7 +3698,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP TimeSource: s.timeSource, SubsidyCache: s.subsidyCache, ChainParams: s.chainParams, - BlockManager: s.blockManager, + BlockManager: s.syncManager, MiningTimeOffset: cfg.MiningTimeOffset, BestSnapshot: s.chain.BestSnapshot, BlockByHash: s.chain.BlockByHash, @@ -3747,9 +3742,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP PermitConnectionlessMining: cfg.SimNet || cfg.RegNet, BgBlkTmplGenerator: s.bg, MiningAddrs: cfg.miningAddrs, - ProcessBlock: s.blockManager.ProcessBlock, + ProcessBlock: s.syncManager.ProcessBlock, ConnectedCount: s.ConnectedCount, - IsCurrent: s.blockManager.IsCurrent, + IsCurrent: s.syncManager.IsCurrent, }) } @@ -3852,7 +3847,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP rpcsConfig := 
rpcserver.Config{ Listeners: rpcListeners, ConnMgr: &rpcConnManager{&s}, - SyncMgr: &rpcSyncMgr{server: &s, blockMgr: s.blockManager}, + SyncMgr: &rpcSyncMgr{server: &s, syncMgr: s.syncManager}, FeeEstimator: s.feeEstimator, TimeSource: s.timeSource, Services: s.services,