diff --git a/blockdb.go b/blockdb.go
index f3fbaa2cf0..38b23835b5 100644
--- a/blockdb.go
+++ b/blockdb.go
@@ -244,7 +244,7 @@ func dumpBlockChain(params *chaincfg.Params, b *blockchain.BlockChain) error {
progressLogger.LogBlockHeight(bl.MsgBlock(), tipHeight)
}
- bmgrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.",
+ srvrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.",
tipHeight, cfg.DumpBlockchain)
return nil
diff --git a/docs/json_rpc_api.mediawiki b/docs/json_rpc_api.mediawiki
index d1fe4c520b..675b916b1e 100644
--- a/docs/json_rpc_api.mediawiki
+++ b/docs/json_rpc_api.mediawiki
@@ -549,7 +549,7 @@ the method name for further details such as parameter and return information.
: Dynamically changes the debug logging level.
: The levelspec can either be a debug level or of the form =,=,...
: The valid debug levels are trace
, debug
, info
, warn
, error
, and critical
.
-: The valid subsystems are AMGR
, ADXR
, BCDB
, BMGR
, DCRD
, CHAN
, DISC
, PEER
, RPCS
, SCRP
, SRVR
, and TXMP
.
+: The valid subsystems are AMGR
, ADXR
, BCDB
, DCRD
, CHAN
, DISC
, PEER
, RPCS
, SCRP
, SRVR
, SYNC
, and TXMP
.
: Additionally, the special keyword show
can be used to get a list of the available subsystems.
|-
!Returns
@@ -559,7 +559,7 @@ the method name for further details such as parameter and return information.
|Done.
|-
!Example show
Return
-|Supported subsystems [AMGR ADXR BCDB BMGR DCRD CHAN DISC PEER RPCS SCRP SRVR TXMP]
+|Supported subsystems [AMGR ADXR BCDB DCRD CHAN DISC PEER RPCS SCRP SRVR SYNC TXMP]
|}
----
diff --git a/internal/fees/estimator.go b/internal/fees/estimator.go
index aeee8837ed..7bfe90ef36 100644
--- a/internal/fees/estimator.go
+++ b/internal/fees/estimator.go
@@ -623,11 +623,11 @@ func (stats *Estimator) removeFromMemPool(blocksInMemPool int32, rate feeRate) {
// function but not on a previous newMemPoolTx. This leaves the fee db
// in an undefined state and should never happen in regular use. If this
// happens, then there is a logic or coding error somewhere, either in
- // the estimator itself or on its hooking to the mempool/blockmanager.
- // Either way, the easiest way to fix this is to completely delete the
- // database and start again.
- // During development, you can use a panic() here and we might return it
- // after being confident that the estimator is completely bug free.
+ // the estimator itself or on its hooking to the mempool/network sync
+ // manager. Either way, the easiest way to fix this is to completely
+ // delete the database and start again. During development, you can use
+ // a panic() here and we might return it after being confident that the
+ // estimator is completely bug free.
log.Errorf("Transaction count in bucket index %d and confirmation "+
"index %d became < 0", bucketIdx, confirmIdx)
}
diff --git a/internal/mining/interface.go b/internal/mining/interface.go
index 998f91ef63..2a18271b77 100644
--- a/internal/mining/interface.go
+++ b/internal/mining/interface.go
@@ -67,7 +67,7 @@ type blockManagerFacade interface {
// best chain.
ForceReorganization(formerBest, newBest chainhash.Hash) error
- // IsCurrent returns whether or not the block manager believes it is synced
- // with the connected peers.
+ // IsCurrent returns whether or not the net sync manager believes it is
+ // synced with the connected peers.
IsCurrent() bool
}
diff --git a/internal/netsync/README.md b/internal/netsync/README.md
new file mode 100644
index 0000000000..3391fad683
--- /dev/null
+++ b/internal/netsync/README.md
@@ -0,0 +1,21 @@
+netsync
+=======
+
+[![Build Status](https://github.com/decred/dcrd/workflows/Build%20and%20Test/badge.svg)](https://github.com/decred/dcrd/actions)
+[![ISC License](https://img.shields.io/badge/license-ISC-blue.svg)](http://copyfree.org)
+[![Doc](https://img.shields.io/badge/doc-reference-blue.svg)](https://pkg.go.dev/github.com/decred/dcrd/internal/netsync)
+
+Package netsync implements a concurrency safe block syncing protocol.
+
+## Overview
+
+The provided implementation of SyncManager communicates with connected peers to
+perform an initial block download, keep the chain in sync, and announce new
+blocks connected to the chain. Currently the sync manager selects a single sync
+peer that it downloads all blocks from until it is up to date with the longest
+chain the sync peer is aware of.
+
+## License
+
+Package netsync is licensed under the [copyfree](http://copyfree.org) ISC
+License.
diff --git a/internal/netsync/doc.go b/internal/netsync/doc.go
new file mode 100644
index 0000000000..5fde072364
--- /dev/null
+++ b/internal/netsync/doc.go
@@ -0,0 +1,14 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+/*
+Package netsync implements a concurrency safe block syncing protocol.
+
+The provided implementation of SyncManager communicates with connected peers to
+perform an initial block download, keep the chain in sync, and announce new
+blocks connected to the chain. Currently the sync manager selects a single sync
+peer that it downloads all blocks from until it is up to date with the longest
+chain the sync peer is aware of.
+*/
+package netsync
diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go
new file mode 100644
index 0000000000..4a9ac79b55
--- /dev/null
+++ b/internal/netsync/interface.go
@@ -0,0 +1,23 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package netsync
+
+import (
+ "github.com/decred/dcrd/chaincfg/chainhash"
+ "github.com/decred/dcrd/dcrutil/v3"
+ "github.com/decred/dcrd/peer/v2"
+)
+
+// PeerNotifier provides an interface to notify peers of status changes related
+// to blocks and transactions.
+type PeerNotifier interface {
+ // AnnounceNewTransactions generates and relays inventory vectors and
+ // notifies websocket clients of the passed transactions.
+ AnnounceNewTransactions(txns []*dcrutil.Tx)
+
+ // UpdatePeerHeights updates the heights of all peers who have announced the
+ // latest connected main chain block, or a recognized orphan.
+ UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peer.Peer)
+}
diff --git a/internal/netsync/log.go b/internal/netsync/log.go
new file mode 100644
index 0000000000..cbe84d3a1e
--- /dev/null
+++ b/internal/netsync/log.go
@@ -0,0 +1,22 @@
+// Copyright (c) 2020 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package netsync
+
+import (
+ "github.com/decred/slog"
+)
+
+// log is a logger that is initialized with no output filters. This
+// means the package will not perform any logging by default until the caller
+// requests it.
+// The default amount of logging is none.
+var log = slog.Disabled
+
+// UseLogger uses a specified Logger to output package logging info.
+// This should be used in preference to SetLogWriter if the caller is also
+// using slog.
+func UseLogger(logger slog.Logger) {
+ log = logger
+}
diff --git a/blockmanager.go b/internal/netsync/manager.go
similarity index 70%
rename from blockmanager.go
rename to internal/netsync/manager.go
index 177cc3499b..ce70b197ec 100644
--- a/blockmanager.go
+++ b/internal/netsync/manager.go
@@ -3,7 +3,7 @@
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
-package main
+package netsync
import (
"container/list"
@@ -13,7 +13,6 @@ import (
"sync/atomic"
"time"
- "github.com/decred/dcrd/blockchain/standalone/v2"
"github.com/decred/dcrd/blockchain/v4"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/chaincfg/v3"
@@ -108,7 +107,7 @@ type getSyncPeerMsg struct {
// requestFromPeerMsg is a message type to be sent across the message channel
// for requesting either blocks or transactions from a given peer. It routes
-// this through the block manager so the block manager doesn't ban the peer
+// this through the sync manager so the sync manager doesn't ban the peer
// when it sends this information back.
type requestFromPeerMsg struct {
peer *peerpkg.Peer
@@ -197,56 +196,7 @@ type headerNode struct {
hash *chainhash.Hash
}
-// peerNotifier provides an interface for server peer notifications.
-type peerNotifier interface {
- // AnnounceNewTransactions generates and relays inventory vectors and
- // notifies websocket clients of the passed transactions.
- AnnounceNewTransactions(txns []*dcrutil.Tx)
-
- // UpdatePeerHeights updates the heights of all peers who have
- // announced the latest connected main chain block, or a recognized orphan.
- UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peerpkg.Peer)
-}
-
-// blockManangerConfig is a configuration struct for a blockManager.
-type blockManagerConfig struct {
- PeerNotifier peerNotifier
-
- // The following fields are for accessing the chain and its configuration.
- Chain *blockchain.BlockChain
- ChainParams *chaincfg.Params
- SubsidyCache *standalone.SubsidyCache
-
- // SigCache defines the signature cache to use.
- SigCache *txscript.SigCache
-
- // The following field provides access to the mempool.
- TxMemPool *mempool.TxPool
-
- // RpcServer returns an instance of an RPC server to use for notifications.
- // It may return nil if there is no active RPC server.
- RpcServer func() *rpcserver.Server
-
- // DisableCheckpoints indicates whether or not the block manager should make
- // use of checkpoints.
- DisableCheckpoints bool
-
- // NoMiningStateSync indicates whether or not the block manager should
- // perform an initial mining state synchronization with peers once they are
- // believed to be fully synced.
- NoMiningStateSync bool
-
- // MaxPeers specifies the maximum number of peers the server is expected to
- // be connected with. It is primarily used as a hint for more efficient
- // synchronization.
- MaxPeers int
-
- // MaxOrphanTxs specifies the maximum number of orphan transactions the
- // transaction pool associated with the server supports.
- MaxOrphanTxs int
-}
-
-// peerSyncState stores additional information that the blockManager tracks
+// peerSyncState stores additional information that the sync manager tracks
// about a peer.
type peerSyncState struct {
syncCandidate bool
@@ -262,10 +212,10 @@ type orphanBlock struct {
expiration time.Time
}
-// blockManager provides a concurrency safe block manager for handling all
+// SyncManager provides a concurrency safe sync manager for handling all
// incoming blocks.
-type blockManager struct {
- cfg *blockManagerConfig
+type SyncManager struct {
+ cfg Config
started int32
shutdown int32
rejectedTxns map[chainhash.Hash]struct{}
@@ -304,33 +254,33 @@ type blockManager struct {
// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
-func (b *blockManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) {
- b.headersFirstMode = false
- b.headerList.Init()
- b.startHeader = nil
+func (m *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) {
+ m.headersFirstMode = false
+ m.headerList.Init()
+ m.startHeader = nil
// When there is a next checkpoint, add an entry for the latest known
// block into the header pool. This allows the next downloaded header
// to prove it links to the chain properly.
- if b.nextCheckpoint != nil {
+ if m.nextCheckpoint != nil {
node := headerNode{height: newestHeight, hash: newestHash}
- b.headerList.PushBack(&node)
+ m.headerList.PushBack(&node)
}
}
// SyncHeight returns latest known block being synced to.
-func (b *blockManager) SyncHeight() int64 {
- b.syncHeightMtx.Lock()
- defer b.syncHeightMtx.Unlock()
- return b.syncHeight
+func (m *SyncManager) SyncHeight() int64 {
+ m.syncHeightMtx.Lock()
+ defer m.syncHeightMtx.Unlock()
+ return m.syncHeight
}
// findNextHeaderCheckpoint returns the next checkpoint after the passed height.
// It returns nil when there is not one either because the height is already
// later than the final checkpoint or some other reason such as disabled
// checkpoints.
-func (b *blockManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint {
- checkpoints := b.cfg.Chain.Checkpoints()
+func (m *SyncManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint {
+ checkpoints := m.cfg.Chain.Checkpoints()
if len(checkpoints) == 0 {
return nil
}
@@ -371,15 +321,15 @@ func chainBlockLocatorToHashes(locator blockchain.BlockLocator) []chainhash.Hash
// download/sync the blockchain from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
-func (b *blockManager) startSync() {
+func (m *SyncManager) startSync() {
// Return now if we're already syncing.
- if b.syncPeer != nil {
+ if m.syncPeer != nil {
return
}
- best := b.cfg.Chain.BestSnapshot()
+ best := m.cfg.Chain.BestSnapshot()
var bestPeer *peerpkg.Peer
- for peer, state := range b.peerStates {
+ for peer, state := range m.peerStates {
if !state.syncCandidate {
continue
}
@@ -408,9 +358,9 @@ func (b *blockManager) startSync() {
// fully synced to whatever the chain believes when there is no candidate
// for a sync peer.
if bestPeer == nil {
- b.isCurrentMtx.Lock()
- b.isCurrent = b.cfg.Chain.IsCurrent()
- b.isCurrentMtx.Unlock()
+ m.isCurrentMtx.Lock()
+ m.isCurrent = m.cfg.Chain.IsCurrent()
+ m.isCurrentMtx.Unlock()
}
// Start syncing from the best peer if one was selected.
@@ -418,25 +368,25 @@ func (b *blockManager) startSync() {
// Clear the requestedBlocks if the sync peer changes, otherwise
// we may ignore blocks we need that the last sync peer failed
// to send.
- b.requestedBlocks = make(map[chainhash.Hash]struct{})
+ m.requestedBlocks = make(map[chainhash.Hash]struct{})
- blkLocator, err := b.cfg.Chain.LatestBlockLocator()
+ blkLocator, err := m.cfg.Chain.LatestBlockLocator()
if err != nil {
- bmgrLog.Errorf("Failed to get block locator for the "+
- "latest block: %v", err)
+ log.Errorf("Failed to get block locator for the latest block: %v",
+ err)
return
}
locator := chainBlockLocatorToHashes(blkLocator)
- bmgrLog.Infof("Syncing to block height %d from peer %v",
+ log.Infof("Syncing to block height %d from peer %v",
bestPeer.LastBlock(), bestPeer.Addr())
// The chain is not synced whenever the current best height is less than
// the height to sync to.
if best.Height < bestPeer.LastBlock() {
- b.isCurrentMtx.Lock()
- b.isCurrent = false
- b.isCurrentMtx.Unlock()
+ m.isCurrentMtx.Lock()
+ m.isCurrent = false
+ m.isCurrentMtx.Unlock()
}
// When the current height is less than a known checkpoint we
@@ -456,60 +406,59 @@ func (b *blockManager) startSync() {
// and fully validate them. Finally, regression test mode does
// not support the headers-first approach so do normal block
// downloads when in regression test mode.
- if b.nextCheckpoint != nil &&
- best.Height < b.nextCheckpoint.Height &&
- !b.cfg.DisableCheckpoints {
+ if m.nextCheckpoint != nil &&
+ best.Height < m.nextCheckpoint.Height &&
+ !m.cfg.DisableCheckpoints {
- err := bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
+ err := bestPeer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash)
if err != nil {
- bmgrLog.Errorf("Failed to push getheadermsg for the "+
- "latest blocks: %v", err)
+ log.Errorf("Failed to push getheadermsg for the latest "+
+ "blocks: %v", err)
return
}
- b.headersFirstMode = true
- bmgrLog.Infof("Downloading headers for blocks %d to "+
- "%d from peer %s", best.Height+1,
- b.nextCheckpoint.Height, bestPeer.Addr())
+ m.headersFirstMode = true
+ log.Infof("Downloading headers for blocks %d to %d from peer %s",
+ best.Height+1, m.nextCheckpoint.Height, bestPeer.Addr())
} else {
err := bestPeer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
- bmgrLog.Errorf("Failed to push getblocksmsg for the "+
- "latest blocks: %v", err)
+ log.Errorf("Failed to push getblocksmsg for the latest "+
+ "blocks: %v", err)
return
}
}
- b.syncPeer = bestPeer
- b.syncHeightMtx.Lock()
- b.syncHeight = bestPeer.LastBlock()
- b.syncHeightMtx.Unlock()
+ m.syncPeer = bestPeer
+ m.syncHeightMtx.Lock()
+ m.syncHeight = bestPeer.LastBlock()
+ m.syncHeightMtx.Unlock()
} else {
- bmgrLog.Warnf("No sync peer candidates available")
+ log.Warnf("No sync peer candidates available")
}
}
// isSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from.
-func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool {
+func (m *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
// The peer is not a candidate for sync if it's not a full node.
return peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
}
-// syncMiningStateAfterSync polls the blockManager for the current sync
-// state; if the manager is synced, it executes a call to the peer to
-// sync the mining state to the network.
-func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) {
+// syncMiningStateAfterSync polls the sync manager for the current sync state
+// and once the manager believes the chain is fully synced, it executes a call
+// to the peer to sync the mining state.
+func (m *SyncManager) syncMiningStateAfterSync(peer *peerpkg.Peer) {
go func() {
for {
select {
case <-time.After(3 * time.Second):
- case <-b.quit:
+ case <-m.quit:
return
}
if !peer.Connected() {
return
}
- if !b.IsCurrent() {
+ if !m.IsCurrent() {
continue
}
@@ -532,9 +481,8 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) {
wire.InitStateHeadBlockVotes,
wire.InitStateTSpends)
if err != nil {
- bmgrLog.Errorf("Unexpected error while "+
- "building getinitstate msg: %v",
- err)
+ log.Errorf("Unexpected error while building getinitstate "+
+ "msg: %v", err)
return
}
msg = m
@@ -549,30 +497,30 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) {
// handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine.
-func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) {
+func (m *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
// Ignore if in the process of shutting down.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
return
}
- bmgrLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
+ log.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
// Initialize the peer state
- isSyncCandidate := b.isSyncCandidate(peer)
- b.peerStates[peer] = &peerSyncState{
+ isSyncCandidate := m.isSyncCandidate(peer)
+ m.peerStates[peer] = &peerSyncState{
syncCandidate: isSyncCandidate,
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
}
// Start syncing by choosing the best candidate if needed.
- if isSyncCandidate && b.syncPeer == nil {
- b.startSync()
+ if isSyncCandidate && m.syncPeer == nil {
+ m.startSync()
}
// Grab the mining state from this peer once synced when enabled.
- if !b.cfg.NoMiningStateSync {
- b.syncMiningStateAfterSync(peer)
+ if !m.cfg.NoMiningStateSync {
+ m.syncMiningStateAfterSync(peer)
}
}
@@ -580,20 +528,20 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) {
// removes the peer as a candidate for syncing and in the case where it was
// the current sync peer, attempts to select a new best peer to sync from. It
// is invoked from the syncHandler goroutine.
-func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) {
- state, exists := b.peerStates[peer]
+func (m *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
+ state, exists := m.peerStates[peer]
if !exists {
- bmgrLog.Warnf("Received done peer message for unknown peer %s", peer)
+ log.Warnf("Received done peer message for unknown peer %s", peer)
return
}
// Remove the peer from the list of candidate peers.
- delete(b.peerStates, peer)
+ delete(m.peerStates, peer)
// Remove requested transactions from the global map so that they will
// be fetched from elsewhere next time we get an inv.
for txHash := range state.requestedTxns {
- delete(b.requestedTxns, txHash)
+ delete(m.requestedTxns, txHash)
}
// Remove requested blocks from the global map so that they will be
@@ -601,19 +549,19 @@ func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) {
// TODO(oga) we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
for blockHash := range state.requestedBlocks {
- delete(b.requestedBlocks, blockHash)
+ delete(m.requestedBlocks, blockHash)
}
// Attempt to find a new peer to sync from if the quitting peer is the
// sync peer. Also, reset the headers-first state if in headers-first
// mode so
- if b.syncPeer == peer {
- b.syncPeer = nil
- if b.headersFirstMode {
- best := b.cfg.Chain.BestSnapshot()
- b.resetHeaderState(&best.Hash, best.Height)
+ if m.syncPeer == peer {
+ m.syncPeer = nil
+ if m.headersFirstMode {
+ best := m.cfg.Chain.BestSnapshot()
+ m.resetHeaderState(&best.Hash, best.Height)
}
- b.startSync()
+ m.startSync()
}
}
@@ -685,11 +633,11 @@ func errToWireRejectCode(err error) (wire.RejectCode, string) {
}
// handleTxMsg handles transaction messages from all peers.
-func (b *blockManager) handleTxMsg(tmsg *txMsg) {
+func (m *SyncManager) handleTxMsg(tmsg *txMsg) {
peer := tmsg.peer
- state, exists := b.peerStates[peer]
+ state, exists := m.peerStates[peer]
if !exists {
- bmgrLog.Warnf("Received tx message from unknown peer %s", peer)
+ log.Warnf("Received tx message from unknown peer %s", peer)
return
}
@@ -706,16 +654,16 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// Ignore transactions that we have already rejected. Do not
// send a reject message here because if the transaction was already
// rejected, the transaction was unsolicited.
- if _, exists = b.rejectedTxns[*txHash]; exists {
- bmgrLog.Debugf("Ignoring unsolicited previously rejected "+
- "transaction %v from %s", txHash, peer)
+ if _, exists = m.rejectedTxns[*txHash]; exists {
+ log.Debugf("Ignoring unsolicited previously rejected transaction %v "+
+ "from %s", txHash, peer)
return
}
// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
- allowOrphans := b.cfg.MaxOrphanTxs > 0
- acceptedTxs, err := b.cfg.TxMemPool.ProcessTransaction(tmsg.tx,
+ allowOrphans := m.cfg.MaxOrphanTxs > 0
+ acceptedTxs, err := m.cfg.TxMemPool.ProcessTransaction(tmsg.tx,
allowOrphans, true, true, mempool.Tag(tmsg.peer.ID()))
// Remove transaction from request maps. Either the mempool/chain
@@ -723,12 +671,12 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// instances of trying to fetch it, or we failed to insert and thus
// we'll retry next time we get an inv.
delete(state.requestedTxns, *txHash)
- delete(b.requestedTxns, *txHash)
+ delete(m.requestedTxns, *txHash)
if err != nil {
// Do not request this transaction again until a new block
// has been processed.
- limitAdd(b.rejectedTxns, *txHash, maxRejectedTxns)
+ limitAdd(m.rejectedTxns, *txHash, maxRejectedTxns)
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
@@ -736,11 +684,9 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// so log it as an actual error.
var rErr mempool.RuleError
if errors.As(err, &rErr) {
- bmgrLog.Debugf("Rejected transaction %v from %s: %v",
- txHash, peer, err)
+ log.Debugf("Rejected transaction %v from %s: %v", txHash, peer, err)
} else {
- bmgrLog.Errorf("Failed to process transaction %v: %v",
- txHash, err)
+ log.Errorf("Failed to process transaction %v: %v", txHash, err)
}
// Convert the error into an appropriate reject message and
@@ -750,7 +696,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
return
}
- b.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs)
+ m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs)
}
// isKnownOrphan returns whether the passed hash is currently a known orphan.
@@ -762,12 +708,12 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// orphans and react accordingly.
//
// This function is safe for concurrent access.
-func (b *blockManager) isKnownOrphan(hash *chainhash.Hash) bool {
+func (m *SyncManager) isKnownOrphan(hash *chainhash.Hash) bool {
// Protect concurrent access. Using a read lock only so multiple readers
// can query without blocking each other.
- b.orphanLock.RLock()
- _, exists := b.orphans[*hash]
- b.orphanLock.RUnlock()
+ m.orphanLock.RLock()
+ _, exists := m.orphans[*hash]
+ m.orphanLock.RUnlock()
return exists
}
@@ -775,18 +721,18 @@ func (b *blockManager) isKnownOrphan(hash *chainhash.Hash) bool {
// of orphan blocks.
//
// This function is safe for concurrent access.
-func (b *blockManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash {
+func (m *SyncManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash {
// Protect concurrent access. Using a read lock only so multiple
// readers can query without blocking each other.
- b.orphanLock.RLock()
- defer b.orphanLock.RUnlock()
+ m.orphanLock.RLock()
+ defer m.orphanLock.RUnlock()
// Keep looping while the parent of each orphaned block is known and is an
// orphan itself.
orphanRoot := hash
prevHash := hash
for {
- orphan, exists := b.orphans[*prevHash]
+ orphan, exists := m.orphans[*prevHash]
if !exists {
break
}
@@ -799,21 +745,21 @@ func (b *blockManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash {
// removeOrphanBlock removes the passed orphan block from the orphan pool and
// previous orphan index.
-func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) {
+func (m *SyncManager) removeOrphanBlock(orphan *orphanBlock) {
// Protect concurrent access.
- b.orphanLock.Lock()
- defer b.orphanLock.Unlock()
+ m.orphanLock.Lock()
+ defer m.orphanLock.Unlock()
// Remove the orphan block from the orphan pool.
orphanHash := orphan.block.Hash()
- delete(b.orphans, *orphanHash)
+ delete(m.orphans, *orphanHash)
// Remove the reference from the previous orphan index too. An indexing
// for loop is intentionally used over a range here as range does not
// reevaluate the slice on each iteration nor does it adjust the index
// for the modified slice.
prevHash := &orphan.block.MsgBlock().Header.PrevBlock
- orphans := b.prevOrphans[*prevHash]
+ orphans := m.prevOrphans[*prevHash]
for i := 0; i < len(orphans); i++ {
hash := orphans[i].block.Hash()
if hash.IsEqual(orphanHash) {
@@ -823,12 +769,12 @@ func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) {
i--
}
}
- b.prevOrphans[*prevHash] = orphans
+ m.prevOrphans[*prevHash] = orphans
// Remove the map entry altogether if there are no longer any orphans
// which depend on the parent hash.
- if len(b.prevOrphans[*prevHash]) == 0 {
- delete(b.prevOrphans, *prevHash)
+ if len(m.prevOrphans[*prevHash]) == 0 {
+ delete(m.prevOrphans, *prevHash)
}
}
@@ -837,34 +783,34 @@ func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) {
// any expired blocks so a separate cleanup poller doesn't need to be run. It
// also imposes a maximum limit on the number of outstanding orphan blocks and
// will remove the oldest received orphan block if the limit is exceeded.
-func (b *blockManager) addOrphanBlock(block *dcrutil.Block) {
+func (m *SyncManager) addOrphanBlock(block *dcrutil.Block) {
// Remove expired orphan blocks.
- for _, oBlock := range b.orphans {
+ for _, oBlock := range m.orphans {
if time.Now().After(oBlock.expiration) {
- b.removeOrphanBlock(oBlock)
+ m.removeOrphanBlock(oBlock)
continue
}
// Update the oldest orphan block pointer so it can be discarded
// in case the orphan pool fills up.
- if b.oldestOrphan == nil ||
- oBlock.expiration.Before(b.oldestOrphan.expiration) {
- b.oldestOrphan = oBlock
+ if m.oldestOrphan == nil ||
+ oBlock.expiration.Before(m.oldestOrphan.expiration) {
+ m.oldestOrphan = oBlock
}
}
// Limit orphan blocks to prevent memory exhaustion.
- if len(b.orphans)+1 > maxOrphanBlocks {
+ if len(m.orphans)+1 > maxOrphanBlocks {
// Remove the oldest orphan to make room for the new one.
- b.removeOrphanBlock(b.oldestOrphan)
- b.oldestOrphan = nil
+ m.removeOrphanBlock(m.oldestOrphan)
+ m.oldestOrphan = nil
}
// Protect concurrent access. This is intentionally done here instead
// of near the top since removeOrphanBlock does its own locking and
// the range iterator is not invalidated by removing map entries.
- b.orphanLock.Lock()
- defer b.orphanLock.Unlock()
+ m.orphanLock.Lock()
+ defer m.orphanLock.Unlock()
// Insert the block into the orphan map with an expiration time
// 1 hour from now.
@@ -873,11 +819,11 @@ func (b *blockManager) addOrphanBlock(block *dcrutil.Block) {
block: block,
expiration: expiration,
}
- b.orphans[*block.Hash()] = oBlock
+ m.orphans[*block.Hash()] = oBlock
// Add to previous hash lookup index for faster dependency lookups.
prevHash := &block.MsgBlock().Header.PrevBlock
- b.prevOrphans[*prevHash] = append(b.prevOrphans[*prevHash], oBlock)
+ m.prevOrphans[*prevHash] = append(m.prevOrphans[*prevHash], oBlock)
}
// processOrphans determines if there are any orphans which depend on the passed
@@ -887,7 +833,7 @@ func (b *blockManager) addOrphanBlock(block *dcrutil.Block) {
//
// The flags do not modify the behavior of this function directly, however they
// are needed to pass along to maybeAcceptBlock.
-func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error {
+func (m *SyncManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error {
// Start with processing at least the passed hash. Leave a little room for
// additional orphan blocks that need to be processed without needing to
// grow the array in the common case.
@@ -906,21 +852,21 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh
// is intentionally used over a range here as range does not reevaluate
// the slice on each iteration nor does it adjust the index for the
// modified slice.
- for i := 0; i < len(b.prevOrphans[*processHash]); i++ {
- orphan := b.prevOrphans[*processHash][i]
+ for i := 0; i < len(m.prevOrphans[*processHash]); i++ {
+ orphan := m.prevOrphans[*processHash][i]
if orphan == nil {
- bmgrLog.Warnf("Found a nil entry at index %d in the orphan "+
+ log.Warnf("Found a nil entry at index %d in the orphan "+
"dependency list for block %v", i, processHash)
continue
}
// Remove the orphan from the orphan pool.
orphanHash := orphan.block.Hash()
- b.removeOrphanBlock(orphan)
+ m.removeOrphanBlock(orphan)
i--
// Potentially accept the block into the block chain.
- _, err := b.cfg.Chain.ProcessBlock(orphan.block, flags)
+ _, err := m.cfg.Chain.ProcessBlock(orphan.block, flags)
if err != nil {
return err
}
@@ -944,17 +890,17 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh
// whether or not the block is an orphan, in which case the fork length will
// also be zero as expected, because it, by definition, does not connect to the
// best chain.
-func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) {
+func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) {
// Process the block to include validation, best chain selection, etc.
//
- // Also, keep track of orphan blocks in the block manager when the error
+ // Also, keep track of orphan blocks in the sync manager when the error
// returned indicates the block is an orphan.
blockHash := block.Hash()
- forkLen, err := b.cfg.Chain.ProcessBlock(block, flags)
+ forkLen, err := m.cfg.Chain.ProcessBlock(block, flags)
if errors.Is(err, blockchain.ErrMissingParent) {
- bmgrLog.Infof("Adding orphan block %v with parent %v", blockHash,
+ log.Infof("Adding orphan block %v with parent %v", blockHash,
block.MsgBlock().Header.PrevBlock)
- b.addOrphanBlock(block)
+ m.addOrphanBlock(block)
// The fork length of orphans is unknown since they, by definition, do
// not connect to the best chain.
@@ -966,37 +912,37 @@ func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockc
// Accept any orphan blocks that depend on this block (they are no longer
// orphans) and repeat for those accepted blocks until there are no more.
- if err := b.processOrphans(blockHash, flags); err != nil {
+ if err := m.processOrphans(blockHash, flags); err != nil {
return 0, false, err
}
// The chain is considered synced once both the blockchain believes it is
// current and the sync height is reached or exceeded.
- best := b.cfg.Chain.BestSnapshot()
- syncHeight := b.SyncHeight()
- if best.Height >= syncHeight && b.cfg.Chain.IsCurrent() {
- b.isCurrentMtx.Lock()
- b.isCurrent = true
- b.isCurrentMtx.Unlock()
+ best := m.cfg.Chain.BestSnapshot()
+ syncHeight := m.SyncHeight()
+ if best.Height >= syncHeight && m.cfg.Chain.IsCurrent() {
+ m.isCurrentMtx.Lock()
+ m.isCurrent = true
+ m.isCurrentMtx.Unlock()
}
return forkLen, false, nil
}
// handleBlockMsg handles block messages from all peers.
-func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
+func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
- state, exists := b.peerStates[peer]
+ state, exists := m.peerStates[peer]
if !exists {
- bmgrLog.Warnf("Received block message from unknown peer %s", peer)
+ log.Warnf("Received block message from unknown peer %s", peer)
return
}
// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists := state.requestedBlocks[*blockHash]; !exists {
- bmgrLog.Warnf("Got unrequested block %v from %s -- "+
- "disconnecting", blockHash, bmsg.peer.Addr())
+ log.Warnf("Got unrequested block %v from %s -- disconnecting",
+ blockHash, bmsg.peer.Addr())
bmsg.peer.Disconnect()
return
}
@@ -1010,16 +956,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// properly.
isCheckpointBlock := false
behaviorFlags := blockchain.BFNone
- if b.headersFirstMode {
- firstNodeEl := b.headerList.Front()
+ if m.headersFirstMode {
+ firstNodeEl := m.headerList.Front()
if firstNodeEl != nil {
firstNode := firstNodeEl.Value.(*headerNode)
if blockHash.IsEqual(firstNode.hash) {
behaviorFlags |= blockchain.BFFastAdd
- if firstNode.hash.IsEqual(b.nextCheckpoint.Hash) {
+ if firstNode.hash.IsEqual(m.nextCheckpoint.Hash) {
isCheckpointBlock = true
} else {
- b.headerList.Remove(firstNodeEl)
+ m.headerList.Remove(firstNodeEl)
}
}
}
@@ -1029,11 +975,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(state.requestedBlocks, *blockHash)
- delete(b.requestedBlocks, *blockHash)
+ delete(m.requestedBlocks, *blockHash)
// Process the block to include validation, best chain selection, orphan
// handling, etc.
- forkLen, isOrphan, err := b.processBlockAndOrphans(bmsg.block, behaviorFlags)
+ forkLen, isOrphan, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
@@ -1041,16 +987,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// it as an actual error.
var rErr blockchain.RuleError
if errors.As(err, &rErr) {
- bmgrLog.Infof("Rejected block %v from %s: %v", blockHash,
- peer, err)
+ log.Infof("Rejected block %v from %s: %v", blockHash, peer, err)
} else {
- bmgrLog.Errorf("Failed to process block %v: %v",
- blockHash, err)
+ log.Errorf("Failed to process block %v: %v", blockHash, err)
}
var dbErr database.Error
if errors.As(err, &dbErr) && dbErr.ErrorCode ==
database.ErrCorruption {
- bmgrLog.Errorf("Critical failure: %v", dbErr.Error())
+ log.Errorf("Critical failure: %v", err)
}
// Convert the error into an appropriate reject message and
@@ -1063,29 +1007,29 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Request the parents for the orphan block from the peer that sent it.
onMainChain := !isOrphan && forkLen == 0
if isOrphan {
- orphanRoot := b.orphanRoot(blockHash)
- blkLocator, err := b.cfg.Chain.LatestBlockLocator()
+ orphanRoot := m.orphanRoot(blockHash)
+ blkLocator, err := m.cfg.Chain.LatestBlockLocator()
if err != nil {
- bmgrLog.Warnf("Failed to get block locator for the "+
- "latest block: %v", err)
+ log.Warnf("Failed to get block locator for the latest block: %v",
+ err)
} else {
locator := chainBlockLocatorToHashes(blkLocator)
err = peer.PushGetBlocksMsg(locator, orphanRoot)
if err != nil {
- bmgrLog.Warnf("Failed to push getblocksmsg for the "+
- "latest block: %v", err)
+ log.Warnf("Failed to push getblocksmsg for the latest block: "+
+ "%v", err)
}
}
} else {
// When the block is not an orphan, log information about it and
// update the chain state.
- b.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), b.SyncHeight())
+ m.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), m.SyncHeight())
if onMainChain {
// Notify stake difficulty subscribers and prune invalidated
// transactions.
- best := b.cfg.Chain.BestSnapshot()
- if r := b.cfg.RpcServer(); r != nil {
+ best := m.cfg.Chain.BestSnapshot()
+ if r := m.cfg.RpcServer(); r != nil {
// Update registered websocket clients on the
// current stake difficulty.
r.NotifyStakeDifficulty(
@@ -1095,14 +1039,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
StakeDifficulty: best.NextStakeDiff,
})
}
- b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height)
- b.cfg.TxMemPool.PruneExpiredTx()
+ m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height)
+ m.cfg.TxMemPool.PruneExpiredTx()
// Clear the rejected transactions.
- b.rejectedTxns = make(map[chainhash.Hash]struct{})
+ m.rejectedTxns = make(map[chainhash.Hash]struct{})
// Proactively evict SigCache entries.
- b.proactivelyEvictSigCacheEntries(best.Height)
+ m.proactivelyEvictSigCacheEntries(best.Height)
}
}
@@ -1115,13 +1059,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// chain was not yet current or lost the lock announcement race.
blockHeight := int64(bmsg.block.MsgBlock().Header.Height)
peer.UpdateLastBlockHeight(blockHeight)
- if isOrphan || (onMainChain && b.IsCurrent()) {
- go b.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight,
+ if isOrphan || (onMainChain && m.IsCurrent()) {
+ go m.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight,
bmsg.peer)
}
// Nothing more to do if we aren't in headers-first mode.
- if !b.headersFirstMode {
+ if !m.headersFirstMode {
return
}
@@ -1129,9 +1073,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// request more blocks using the header list when the request queue is
// getting short.
if !isCheckpointBlock {
- if b.startHeader != nil &&
+ if m.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
- b.fetchHeaderBlocks()
+ m.fetchHeaderBlocks()
}
return
}
@@ -1140,32 +1084,32 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// there is a next checkpoint, get the next round of headers by asking
// for headers starting from the block after this one up to the next
// checkpoint.
- prevHeight := b.nextCheckpoint.Height
- prevHash := b.nextCheckpoint.Hash
- b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight)
- if b.nextCheckpoint != nil {
+ prevHeight := m.nextCheckpoint.Height
+ prevHash := m.nextCheckpoint.Hash
+ m.nextCheckpoint = m.findNextHeaderCheckpoint(prevHeight)
+ if m.nextCheckpoint != nil {
locator := []chainhash.Hash{*prevHash}
- err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
+ err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash)
if err != nil {
- bmgrLog.Warnf("Failed to send getheaders message to "+
- "peer %s: %v", peer.Addr(), err)
+ log.Warnf("Failed to send getheaders message to peer %s: %v",
+ peer.Addr(), err)
return
}
- bmgrLog.Infof("Downloading headers for blocks %d to %d from peer %s",
- prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr())
+ log.Infof("Downloading headers for blocks %d to %d from peer %s",
+ prevHeight+1, m.nextCheckpoint.Height, m.syncPeer.Addr())
return
}
// This is headers-first mode, the block is a checkpoint, and there are
// no more checkpoints, so switch to normal mode by requesting blocks
// from the block after this one up to the end of the chain (zero hash).
- b.headersFirstMode = false
- b.headerList.Init()
- bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode")
+ m.headersFirstMode = false
+ m.headerList.Init()
+ log.Infof("Reached the final checkpoint -- switching to normal mode")
locator := []chainhash.Hash{*blockHash}
err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
- bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v",
+ log.Warnf("Failed to send getblocks message to peer %s: %v",
peer.Addr(), err)
return
}
@@ -1174,88 +1118,87 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// proactivelyEvictSigCacheEntries fetches the block that is
// txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to
// SigCache to evict the entries associated with the transactions in that block.
-func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) {
+func (m *SyncManager) proactivelyEvictSigCacheEntries(bestHeight int64) {
// Nothing to do before the eviction depth is reached.
if bestHeight <= txscript.ProactiveEvictionDepth {
return
}
evictHeight := bestHeight - txscript.ProactiveEvictionDepth
- block, err := b.cfg.Chain.BlockByHeight(evictHeight)
+ block, err := m.cfg.Chain.BlockByHeight(evictHeight)
if err != nil {
- bmgrLog.Warnf("Failed to retrieve the block at height %d: %v",
- evictHeight, err)
+ log.Warnf("Failed to retrieve the block at height %d: %v", evictHeight,
+ err)
return
}
- b.cfg.SigCache.EvictEntries(block.MsgBlock())
+ m.cfg.SigCache.EvictEntries(block.MsgBlock())
}
// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
-func (b *blockManager) fetchHeaderBlocks() {
+func (m *SyncManager) fetchHeaderBlocks() {
// Nothing to do if there is no start header.
- if b.startHeader == nil {
- bmgrLog.Warnf("fetchHeaderBlocks called with no start header")
+ if m.startHeader == nil {
+ log.Warnf("fetchHeaderBlocks called with no start header")
return
}
// Build up a getdata request for the list of blocks the headers
// describe. The size hint will be limited to wire.MaxInvPerMsg by
// the function, so no need to double check it here.
- gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len()))
+ gdmsg := wire.NewMsgGetDataSizeHint(uint(m.headerList.Len()))
numRequested := 0
- for e := b.startHeader; e != nil; e = e.Next() {
+ for e := m.startHeader; e != nil; e = e.Next() {
node, ok := e.Value.(*headerNode)
if !ok {
- bmgrLog.Warn("Header list node type is not a headerNode")
+ log.Warn("Header list node type is not a headerNode")
continue
}
iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
- haveInv, err := b.haveInventory(iv)
+ haveInv, err := m.haveInventory(iv)
if err != nil {
- bmgrLog.Warnf("Unexpected failure when checking for "+
- "existing inventory during header block "+
- "fetch: %v", err)
+ log.Warnf("Unexpected failure when checking for existing "+
+ "inventory during header block fetch: %v", err)
continue
}
if !haveInv {
- b.requestedBlocks[*node.hash] = struct{}{}
- syncPeerState := b.peerStates[b.syncPeer]
+ m.requestedBlocks[*node.hash] = struct{}{}
+ syncPeerState := m.peerStates[m.syncPeer]
syncPeerState.requestedBlocks[*node.hash] = struct{}{}
err = gdmsg.AddInvVect(iv)
if err != nil {
- bmgrLog.Warnf("Failed to add invvect while fetching "+
- "block headers: %v", err)
+ log.Warnf("Failed to add invvect while fetching block "+
+ "headers: %v", err)
}
numRequested++
}
- b.startHeader = e.Next()
+ m.startHeader = e.Next()
if numRequested >= wire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
- b.syncPeer.QueueMessage(gdmsg, nil)
+ m.syncPeer.QueueMessage(gdmsg, nil)
}
}
// handleHeadersMsg handles headers messages from all peers.
-func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
+func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
peer := hmsg.peer
- _, exists := b.peerStates[peer]
+ _, exists := m.peerStates[peer]
if !exists {
- bmgrLog.Warnf("Received headers message from unknown peer %s", peer)
+ log.Warnf("Received headers message from unknown peer %s", peer)
return
}
// The remote peer is misbehaving if we didn't request headers.
msg := hmsg.headers
numHeaders := len(msg.Headers)
- if !b.headersFirstMode {
- bmgrLog.Warnf("Got %d unrequested headers from %s -- "+
- "disconnecting", numHeaders, peer.Addr())
+ if !m.headersFirstMode {
+ log.Warnf("Got %d unrequested headers from %s -- disconnecting",
+ numHeaders, peer.Addr())
peer.Disconnect()
return
}
@@ -1274,10 +1217,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
finalHash = &blockHash
// Ensure there is a previous header to compare against.
- prevNodeEl := b.headerList.Back()
+ prevNodeEl := m.headerList.Back()
if prevNodeEl == nil {
- bmgrLog.Warnf("Header list does not contain a previous " +
- "element as expected -- disconnecting peer")
+ log.Warnf("Header list does not contain a previous element as " +
+ "expected -- disconnecting peer")
peer.Disconnect()
return
}
@@ -1288,31 +1231,28 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
prevNode := prevNodeEl.Value.(*headerNode)
if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
node.height = prevNode.height + 1
- e := b.headerList.PushBack(&node)
- if b.startHeader == nil {
- b.startHeader = e
+ e := m.headerList.PushBack(&node)
+ if m.startHeader == nil {
+ m.startHeader = e
}
} else {
- bmgrLog.Warnf("Received block header that does not "+
- "properly connect to the chain from peer %s "+
- "-- disconnecting", peer.Addr())
+ log.Warnf("Received block header that does not properly connect "+
+ "to the chain from peer %s -- disconnecting", peer.Addr())
peer.Disconnect()
return
}
// Verify the header at the next checkpoint height matches.
- if node.height == b.nextCheckpoint.Height {
- if node.hash.IsEqual(b.nextCheckpoint.Hash) {
+ if node.height == m.nextCheckpoint.Height {
+ if node.hash.IsEqual(m.nextCheckpoint.Hash) {
receivedCheckpoint = true
- bmgrLog.Infof("Verified downloaded block "+
- "header against checkpoint at height "+
- "%d/hash %s", node.height, node.hash)
+ log.Infof("Verified downloaded block header against "+
+ "checkpoint at height %d/hash %s", node.height, node.hash)
} else {
- bmgrLog.Warnf("Block header at height %d/hash "+
- "%s from peer %s does NOT match "+
- "expected checkpoint hash of %s -- "+
- "disconnecting", node.height, node.hash,
- peer.Addr(), b.nextCheckpoint.Hash)
+ log.Warnf("Block header at height %d/hash %s from peer %s "+
+ "does NOT match expected checkpoint hash of %s -- "+
+ "disconnecting", node.height, node.hash, peer.Addr(),
+ m.nextCheckpoint.Hash)
peer.Disconnect()
return
}
@@ -1327,11 +1267,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
// that is already in the database and is only used to ensure
// the next header links properly, it must be removed before
// fetching the blocks.
- b.headerList.Remove(b.headerList.Front())
- bmgrLog.Infof("Received %v block headers: Fetching blocks",
- b.headerList.Len())
- b.progressLogger.SetLastLogTime(time.Now())
- b.fetchHeaderBlocks()
+ m.headerList.Remove(m.headerList.Front())
+ log.Infof("Received %v block headers: Fetching blocks",
+ m.headerList.Len())
+ m.progressLogger.SetLastLogTime(time.Now())
+ m.fetchHeaderBlocks()
return
}
@@ -1339,20 +1279,20 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
// headers starting from the latest known header and ending with the
// next checkpoint.
locator := []chainhash.Hash{*finalHash}
- err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
+ err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash)
if err != nil {
- bmgrLog.Warnf("Failed to send getheaders message to "+
- "peer %s: %v", peer.Addr(), err)
+ log.Warnf("Failed to send getheaders message to peer %s: %v",
+ peer.Addr(), err)
return
}
}
// handleNotFoundMsg handles notfound messages from all peers.
-func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
+func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
peer := nfmsg.peer
- state, exists := b.peerStates[peer]
+ state, exists := m.peerStates[peer]
if !exists {
- bmgrLog.Warnf("Received notfound message from unknown peer %s", peer)
+ log.Warnf("Received notfound message from unknown peer %s", peer)
return
}
for _, inv := range nfmsg.notFound.InvList {
@@ -1362,12 +1302,12 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
case wire.InvTypeBlock:
if _, exists := state.requestedBlocks[inv.Hash]; exists {
delete(state.requestedBlocks, inv.Hash)
- delete(b.requestedBlocks, inv.Hash)
+ delete(m.requestedBlocks, inv.Hash)
}
case wire.InvTypeTx:
if _, exists := state.requestedTxns[inv.Hash]; exists {
delete(state.requestedTxns, inv.Hash)
- delete(b.requestedTxns, inv.Hash)
+ delete(m.requestedTxns, inv.Hash)
}
}
}
@@ -1378,24 +1318,24 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
// inventory can be when it is in different states such as blocks that are part
// of the main chain, on a side chain, in the orphan pool, and transactions that
// are in the memory pool (either the main pool or orphan pool).
-func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) {
+func (m *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
switch invVect.Type {
case wire.InvTypeBlock:
// Determine if the block is known in any form (main chain, side
// chain, or orphan).
hash := &invVect.Hash
- return b.isKnownOrphan(hash) || b.cfg.Chain.HaveBlock(hash), nil
+ return m.isKnownOrphan(hash) || m.cfg.Chain.HaveBlock(hash), nil
case wire.InvTypeTx:
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
- if b.cfg.TxMemPool.HaveTransaction(&invVect.Hash) {
+ if m.cfg.TxMemPool.HaveTransaction(&invVect.Hash) {
return true, nil
}
// Check if the transaction exists from the point of view of the
// end of the main chain.
- entry, err := b.cfg.Chain.FetchUtxoEntry(&invVect.Hash)
+ entry, err := m.cfg.Chain.FetchUtxoEntry(&invVect.Hash)
if err != nil {
return false, err
}
@@ -1409,11 +1349,11 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) {
// handleInvMsg handles inv messages from all peers.
// We examine the inventory advertised by the remote peer and act accordingly.
-func (b *blockManager) handleInvMsg(imsg *invMsg) {
+func (m *SyncManager) handleInvMsg(imsg *invMsg) {
peer := imsg.peer
- state, exists := b.peerStates[peer]
+ state, exists := m.peerStates[peer]
if !exists {
- bmgrLog.Warnf("Received inv message from unknown peer %s", peer)
+ log.Warnf("Received inv message from unknown peer %s", peer)
return
}
@@ -1428,8 +1368,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}
}
- fromSyncPeer := peer == b.syncPeer
- isCurrent := b.IsCurrent()
+ fromSyncPeer := peer == m.syncPeer
+ isCurrent := m.IsCurrent()
// If this inv contains a block announcement, and this isn't coming from
// our current sync peer or we're current, then update the last
@@ -1449,7 +1389,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// If our chain is current and a peer announces a block we already
// know of, then update their current block height.
if lastBlock != -1 && isCurrent {
- blkHeight, err := b.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash)
+ blkHeight, err := m.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash)
if err == nil {
imsg.peer.UpdateLastBlockHeight(blkHeight)
}
@@ -1471,23 +1411,22 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
peer.AddKnownInventory(iv)
// Ignore inventory when we're in headers-first mode.
- if b.headersFirstMode {
+ if m.headersFirstMode {
continue
}
// Request the inventory if we don't already have it.
- haveInv, err := b.haveInventory(iv)
+ haveInv, err := m.haveInventory(iv)
if err != nil {
- bmgrLog.Warnf("Unexpected failure when checking for "+
- "existing inventory during inv message "+
- "processing: %v", err)
+ log.Warnf("Unexpected failure when checking for existing "+
+ "inventory during inv message processing: %v", err)
continue
}
if !haveInv {
if iv.Type == wire.InvTypeTx {
// Skip the transaction if it has already been
// rejected.
- if _, exists := b.rejectedTxns[iv.Hash]; exists {
+ if _, exists := m.rejectedTxns[iv.Hash]; exists {
continue
}
}
@@ -1508,23 +1447,22 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// resending the orphan block as an available block
// to signal there are more missing blocks that need to
// be requested.
- if b.isKnownOrphan(&iv.Hash) {
+ if m.isKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known
// up to the root of the orphan that just came
// in.
- orphanRoot := b.orphanRoot(&iv.Hash)
- blkLocator, err := b.cfg.Chain.LatestBlockLocator()
+ orphanRoot := m.orphanRoot(&iv.Hash)
+ blkLocator, err := m.cfg.Chain.LatestBlockLocator()
if err != nil {
- bmgrLog.Errorf("PEER: Failed to get block "+
- "locator for the latest block: "+
- "%v", err)
+ log.Errorf("Failed to get block locator for the latest "+
+ "block: %v", err)
continue
}
locator := chainBlockLocatorToHashes(blkLocator)
err = peer.PushGetBlocksMsg(locator, orphanRoot)
if err != nil {
- bmgrLog.Errorf("PEER: Failed to push getblocksmsg "+
- "for orphan chain: %v", err)
+ log.Errorf("Failed to push getblocksmsg for orphan chain: "+
+ "%v", err)
}
continue
}
@@ -1537,12 +1475,11 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// Request blocks after this one up to the
// final one the remote peer knows about (zero
// stop hash).
- blkLocator := b.cfg.Chain.BlockLocatorFromHash(&iv.Hash)
+ blkLocator := m.cfg.Chain.BlockLocatorFromHash(&iv.Hash)
locator := chainBlockLocatorToHashes(blkLocator)
err = imsg.peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
- bmgrLog.Errorf("PEER: Failed to push getblocksmsg: "+
- "%v", err)
+ log.Errorf("PEER: Failed to push getblocksmsg: %v", err)
}
}
}
@@ -1556,8 +1493,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeBlock:
// Request the block if there is not already a pending
// request.
- if _, exists := b.requestedBlocks[iv.Hash]; !exists {
- limitAdd(b.requestedBlocks, iv.Hash, maxRequestedBlocks)
+ if _, exists := m.requestedBlocks[iv.Hash]; !exists {
+ limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks)
limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks)
gdmsg.AddInvVect(iv)
numRequested++
@@ -1566,8 +1503,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeTx:
// Request the transaction if there is not already a
// pending request.
- if _, exists := b.requestedTxns[iv.Hash]; !exists {
- limitAdd(b.requestedTxns, iv.Hash, maxRequestedTxns)
+ if _, exists := m.requestedTxns[iv.Hash]; !exists {
+ limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns)
limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns)
gdmsg.AddInvVect(iv)
numRequested++
@@ -1610,63 +1547,63 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
m[hash] = struct{}{}
}
-// blockHandler is the main handler for the block manager. It must be run
-// as a goroutine. It processes block and inv messages in a separate goroutine
-// from the peer handlers so the block (MsgBlock) messages are handled by a
-// single thread without needing to lock memory data structures. This is
-// important because the block manager controls which blocks are needed and how
-// the fetching should proceed.
-func (b *blockManager) blockHandler() {
+// blockHandler is the main handler for the sync manager. It must be run as a
+// goroutine. It processes block and inv messages in a separate goroutine from
+// the peer handlers so the block (MsgBlock) messages are handled by a single
+// thread without needing to lock memory data structures. This is important
+// because the sync manager controls which blocks are needed and how the
+// fetching should proceed.
+func (m *SyncManager) blockHandler() {
out:
for {
select {
- case m := <-b.msgChan:
- switch msg := m.(type) {
+ case data := <-m.msgChan:
+ switch msg := data.(type) {
case *newPeerMsg:
- b.handleNewPeerMsg(msg.peer)
+ m.handleNewPeerMsg(msg.peer)
case *txMsg:
- b.handleTxMsg(msg)
+ m.handleTxMsg(msg)
msg.reply <- struct{}{}
case *blockMsg:
- b.handleBlockMsg(msg)
+ m.handleBlockMsg(msg)
msg.reply <- struct{}{}
case *invMsg:
- b.handleInvMsg(msg)
+ m.handleInvMsg(msg)
case *headersMsg:
- b.handleHeadersMsg(msg)
+ m.handleHeadersMsg(msg)
case *notFoundMsg:
- b.handleNotFoundMsg(msg)
+ m.handleNotFoundMsg(msg)
case *donePeerMsg:
- b.handleDonePeerMsg(msg.peer)
+ m.handleDonePeerMsg(msg.peer)
case getSyncPeerMsg:
var peerID int32
- if b.syncPeer != nil {
- peerID = b.syncPeer.ID()
+ if m.syncPeer != nil {
+ peerID = m.syncPeer.ID()
}
msg.reply <- peerID
case requestFromPeerMsg:
- err := b.requestFromPeer(msg.peer, msg.blocks, msg.txs)
+ err := m.requestFromPeer(msg.peer, msg.blocks, msg.txs)
msg.reply <- requestFromPeerResponse{
err: err,
}
case forceReorganizationMsg:
- err := b.cfg.Chain.ForceHeadReorganization(
+ err := m.cfg.Chain.ForceHeadReorganization(
msg.formerBest, msg.newBest)
if err == nil {
// Notify stake difficulty subscribers and prune
// invalidated transactions.
- best := b.cfg.Chain.BestSnapshot()
- if r := b.cfg.RpcServer(); r != nil {
+ best := m.cfg.Chain.BestSnapshot()
+ if r := m.cfg.RpcServer(); r != nil {
r.NotifyStakeDifficulty(
&rpcserver.StakeDifficultyNtfnData{
BlockHash: best.Hash,
@@ -1674,9 +1611,9 @@ out:
StakeDifficulty: best.NextStakeDiff,
})
}
- b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff,
+ m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff,
best.Height)
- b.cfg.TxMemPool.PruneExpiredTx()
+ m.cfg.TxMemPool.PruneExpiredTx()
}
msg.reply <- forceReorganizationResponse{
@@ -1684,14 +1621,14 @@ out:
}
case tipGenerationMsg:
- g, err := b.cfg.Chain.TipGeneration()
+ g, err := m.cfg.Chain.TipGeneration()
msg.reply <- tipGenerationResponse{
hashes: g,
err: err,
}
case processBlockMsg:
- forkLen, isOrphan, err := b.processBlockAndOrphans(msg.block,
+ forkLen, isOrphan, err := m.processBlockAndOrphans(msg.block,
msg.flags)
if err != nil {
msg.reply <- processBlockResponse{
@@ -1706,8 +1643,8 @@ out:
if onMainChain {
// Notify stake difficulty subscribers and prune
// invalidated transactions.
- best := b.cfg.Chain.BestSnapshot()
- if r := b.cfg.RpcServer(); r != nil {
+ best := m.cfg.Chain.BestSnapshot()
+ if r := m.cfg.RpcServer(); r != nil {
r.NotifyStakeDifficulty(
&rpcserver.StakeDifficultyNtfnData{
BlockHash: best.Hash,
@@ -1715,9 +1652,9 @@ out:
StakeDifficulty: best.NextStakeDiff,
})
}
- b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff,
+ m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff,
best.Height)
- b.cfg.TxMemPool.PruneExpiredTx()
+ m.cfg.TxMemPool.PruneExpiredTx()
}
msg.reply <- processBlockResponse{
@@ -1726,7 +1663,7 @@ out:
}
case processTransactionMsg:
- acceptedTxs, err := b.cfg.TxMemPool.ProcessTransaction(msg.tx,
+ acceptedTxs, err := m.cfg.TxMemPool.ProcessTransaction(msg.tx,
msg.allowOrphans, msg.rateLimit, msg.allowHighFees, msg.tag)
msg.reply <- processTransactionResponse{
acceptedTxs: acceptedTxs,
@@ -1734,145 +1671,144 @@ out:
}
default:
- bmgrLog.Warnf("Invalid message type in block handler: %T", msg)
+ log.Warnf("Invalid message type in block handler: %T", msg)
}
- case <-b.quit:
+ case <-m.quit:
break out
}
}
- b.wg.Done()
- bmgrLog.Trace("Block handler done")
+ m.wg.Done()
+ log.Trace("Sync manager done")
}
-// NewPeer informs the block manager of a newly active peer.
-func (b *blockManager) NewPeer(peer *peerpkg.Peer) {
+// NewPeer informs the sync manager of a newly active peer.
+func (m *SyncManager) NewPeer(peer *peerpkg.Peer) {
// Ignore if we are shutting down.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
return
}
- b.msgChan <- &newPeerMsg{peer: peer}
+ m.msgChan <- &newPeerMsg{peer: peer}
}
// QueueTx adds the passed transaction message and peer to the block handling
// queue.
-func (b *blockManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
+func (m *SyncManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
// Don't accept more transactions if we're shutting down.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
done <- struct{}{}
return
}
- b.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
+ m.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
}
// QueueBlock adds the passed block message and peer to the block handling queue.
-func (b *blockManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) {
+func (m *SyncManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) {
// Don't accept more blocks if we're shutting down.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
done <- struct{}{}
return
}
- b.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
+ m.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
}
// QueueInv adds the passed inv message and peer to the block handling queue.
-func (b *blockManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
+func (m *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on inv
// messages.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
return
}
- b.msgChan <- &invMsg{inv: inv, peer: peer}
+ m.msgChan <- &invMsg{inv: inv, peer: peer}
}
// QueueHeaders adds the passed headers message and peer to the block handling
// queue.
-func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
+func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on
// headers messages.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
return
}
- b.msgChan <- &headersMsg{headers: headers, peer: peer}
+ m.msgChan <- &headersMsg{headers: headers, peer: peer}
}
// QueueNotFound adds the passed notfound message and peer to the block handling
// queue.
-func (b *blockManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
+func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on
// reject messages.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
return
}
- b.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer}
+ m.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer}
}
-// DonePeer informs the blockmanager that a peer has disconnected.
-func (b *blockManager) DonePeer(peer *peerpkg.Peer) {
+// DonePeer informs the sync manager that a peer has disconnected.
+func (m *SyncManager) DonePeer(peer *peerpkg.Peer) {
// Ignore if we are shutting down.
- if atomic.LoadInt32(&b.shutdown) != 0 {
+ if atomic.LoadInt32(&m.shutdown) != 0 {
return
}
- b.msgChan <- &donePeerMsg{peer: peer}
+ m.msgChan <- &donePeerMsg{peer: peer}
}
// Start begins the core block handler which processes block and inv messages.
-func (b *blockManager) Start() {
+func (m *SyncManager) Start() {
// Already started?
- if atomic.AddInt32(&b.started, 1) != 1 {
+ if atomic.AddInt32(&m.started, 1) != 1 {
return
}
- bmgrLog.Trace("Starting block manager")
- b.wg.Add(1)
- go b.blockHandler()
+ log.Trace("Starting sync manager")
+ m.wg.Add(1)
+ go m.blockHandler()
}
-// Stop gracefully shuts down the block manager by stopping all asynchronous
+// Stop gracefully shuts down the sync manager by stopping all asynchronous
// handlers and waiting for them to finish.
-func (b *blockManager) Stop() error {
- if atomic.AddInt32(&b.shutdown, 1) != 1 {
- bmgrLog.Warnf("Block manager is already in the process of " +
- "shutting down")
+func (m *SyncManager) Stop() error {
+ if atomic.AddInt32(&m.shutdown, 1) != 1 {
+ log.Warnf("Sync manager is already in the process of shutting down")
return nil
}
- bmgrLog.Infof("Block manager shutting down")
- close(b.quit)
- b.wg.Wait()
+ log.Infof("Sync manager shutting down")
+ close(m.quit)
+ m.wg.Wait()
return nil
}
// SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
-func (b *blockManager) SyncPeerID() int32 {
+func (m *SyncManager) SyncPeerID() int32 {
reply := make(chan int32)
- b.msgChan <- getSyncPeerMsg{reply: reply}
+ m.msgChan <- getSyncPeerMsg{reply: reply}
return <-reply
}
// RequestFromPeer allows an outside caller to request blocks or transactions
-// from a peer. The requests are logged in the blockmanager's internal map of
-// requests so they do not later ban the peer for sending the respective data.
-func (b *blockManager) RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error {
+// from a peer. The requests are logged in the internal map of requests so the
+// peer is not later banned for sending the respective data.
+func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error {
reply := make(chan requestFromPeerResponse)
- b.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, txs: txs,
+ m.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, txs: txs,
reply: reply}
response := <-reply
return response.err
}
-func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error {
+func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error {
msgResp := wire.NewMsgGetData()
- state, exists := b.peerStates[p]
+ state, exists := m.peerStates[p]
if !exists {
return fmt.Errorf("unknown peer %s", p)
}
@@ -1881,14 +1817,14 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash
for _, bh := range blocks {
// If we've already requested this block, skip it.
_, alreadyReqP := state.requestedBlocks[*bh]
- _, alreadyReqB := b.requestedBlocks[*bh]
+ _, alreadyReqB := m.requestedBlocks[*bh]
if alreadyReqP || alreadyReqB {
continue
}
// Skip the block when it is already known.
- if b.isKnownOrphan(bh) || b.cfg.Chain.HaveBlock(bh) {
+ if m.isKnownOrphan(bh) || m.cfg.Chain.HaveBlock(bh) {
continue
}
@@ -1900,14 +1836,14 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash
}
state.requestedBlocks[*bh] = struct{}{}
- b.requestedBlocks[*bh] = struct{}{}
+ m.requestedBlocks[*bh] = struct{}{}
}
// Add the vote transactions to the request.
for _, vh := range txs {
// If we've already requested this transaction, skip it.
_, alreadyReqP := state.requestedTxns[*vh]
- _, alreadyReqB := b.requestedTxns[*vh]
+ _, alreadyReqB := m.requestedTxns[*vh]
if alreadyReqP || alreadyReqB {
continue
@@ -1915,13 +1851,13 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
- if b.cfg.TxMemPool.HaveTransaction(vh) {
+ if m.cfg.TxMemPool.HaveTransaction(vh) {
continue
}
// Check if the transaction exists from the point of view of the
// end of the main chain.
- entry, err := b.cfg.Chain.FetchUtxoEntry(vh)
+ entry, err := m.cfg.Chain.FetchUtxoEntry(vh)
if err != nil {
return err
}
@@ -1937,7 +1873,7 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash
}
state.requestedTxns[*vh] = struct{}{}
- b.requestedTxns[*vh] = struct{}{}
+ m.requestedTxns[*vh] = struct{}{}
}
if len(msgResp.InvList) > 0 {
@@ -1949,11 +1885,11 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash
// ForceReorganization forces a reorganization of the block chain to the block
// hash requested, so long as it matches up with the current organization of the
-// best chain. It is funneled through the block manager since blockchain is not
+// best chain. It is funneled through the sync manager since blockchain is not
// safe for concurrent access.
-func (b *blockManager) ForceReorganization(formerBest, newBest chainhash.Hash) error {
+func (m *SyncManager) ForceReorganization(formerBest, newBest chainhash.Hash) error {
reply := make(chan forceReorganizationResponse)
- b.msgChan <- forceReorganizationMsg{
+ m.msgChan <- forceReorganizationMsg{
formerBest: formerBest,
newBest: newBest,
reply: reply}
@@ -1962,64 +1898,108 @@ func (b *blockManager) ForceReorganization(formerBest, newBest chainhash.Hash) e
}
// TipGeneration returns the hashes of all the children of the current best
-// chain tip. It is funneled through the block manager since blockchain is not
+// chain tip. It is funneled through the sync manager since blockchain is not
// safe for concurrent access.
-func (b *blockManager) TipGeneration() ([]chainhash.Hash, error) {
+func (m *SyncManager) TipGeneration() ([]chainhash.Hash, error) {
reply := make(chan tipGenerationResponse)
- b.msgChan <- tipGenerationMsg{reply: reply}
+ m.msgChan <- tipGenerationMsg{reply: reply}
response := <-reply
return response.hashes, response.err
}
// ProcessBlock makes use of ProcessBlock on an internal instance of a block
-// chain. It is funneled through the block manager since blockchain is not safe
+// chain. It is funneled through the sync manager since blockchain is not safe
// for concurrent access.
-func (b *blockManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
+func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
reply := make(chan processBlockResponse, 1)
- b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
+ m.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
response := <-reply
return response.isOrphan, response.err
}
// ProcessTransaction makes use of ProcessTransaction on an internal instance of
-// a block chain. It is funneled through the block manager since blockchain is
+// a block chain. It is funneled through the sync manager since blockchain is
// not safe for concurrent access.
-func (b *blockManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool,
+func (m *SyncManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool,
rateLimit bool, allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) {
reply := make(chan processTransactionResponse, 1)
- b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit,
+ m.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit,
allowHighFees, tag, reply}
response := <-reply
return response.acceptedTxs, response.err
}
-// IsCurrent returns whether or not the block manager believes it is synced with
+// IsCurrent returns whether or not the sync manager believes it is synced with
// the connected peers.
//
// This function is safe for concurrent access.
-func (b *blockManager) IsCurrent() bool {
- b.isCurrentMtx.RLock()
- isCurrent := b.isCurrent
- b.isCurrentMtx.RUnlock()
+func (m *SyncManager) IsCurrent() bool {
+ m.isCurrentMtx.RLock()
+ isCurrent := m.isCurrent
+ m.isCurrentMtx.RUnlock()
return isCurrent
}
// TicketPoolValue returns the current value of the total stake in the ticket
// pool.
-func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) {
- return b.cfg.Chain.TicketPoolValue()
+func (m *SyncManager) TicketPoolValue() (dcrutil.Amount, error) {
+ return m.cfg.Chain.TicketPoolValue()
+}
+
+// Config holds the configuration options related to the network chain
+// synchronization manager.
+type Config struct {
+ // PeerNotifier specifies an implementation to use for notifying peers of
+ // status changes related to blocks and transactions.
+ PeerNotifier PeerNotifier
+
+ // ChainParams identifies which chain parameters the manager is associated
+ // with.
+ ChainParams *chaincfg.Params
+
+ // Chain specifies the chain instance to use for processing blocks and
+ // transactions.
+ Chain *blockchain.BlockChain
+
+ // SigCache defines the signature cache to use when validating signatures.
+ SigCache *txscript.SigCache
+
+ // TxMemPool specifies the mempool to use for processing transactions.
+ TxMemPool *mempool.TxPool
+
+ // RpcServer returns an instance of an RPC server to use for notifications.
+ // It may return nil if there is no active RPC server.
+ RpcServer func() *rpcserver.Server
+
+ // DisableCheckpoints indicates whether or not the sync manager should make
+ // use of checkpoints.
+ DisableCheckpoints bool
+
+ // NoMiningStateSync indicates whether or not the sync manager should
+ // perform an initial mining state synchronization with peers once they are
+ // believed to be fully synced.
+ NoMiningStateSync bool
+
+ // MaxPeers specifies the maximum number of peers the server is expected to
+ // be connected with. It is primarily used as a hint for more efficient
+ // synchronization.
+ MaxPeers int
+
+ // MaxOrphanTxs specifies the maximum number of orphan transactions the
+ // transaction pool associated with the server supports.
+ MaxOrphanTxs int
}
-// newBlockManager returns a new Decred block manager.
-// Use Start to begin processing asynchronous block and inv updates.
-func newBlockManager(config *blockManagerConfig) (*blockManager, error) {
- bm := blockManager{
- cfg: config,
+// New returns a new network chain synchronization manager. Use Start to begin
+// processing asynchronous block and inv updates.
+func New(config *Config) (*SyncManager, error) {
+ m := SyncManager{
+ cfg: *config,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
- progressLogger: progresslog.New("Processed", bmgrLog),
+ progressLogger: progresslog.New("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
@@ -2028,20 +2008,20 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) {
isCurrent: config.Chain.IsCurrent(),
}
- best := bm.cfg.Chain.BestSnapshot()
- if !bm.cfg.DisableCheckpoints {
+ best := m.cfg.Chain.BestSnapshot()
+ if !m.cfg.DisableCheckpoints {
// Initialize the next checkpoint based on the current height.
- bm.nextCheckpoint = bm.findNextHeaderCheckpoint(best.Height)
- if bm.nextCheckpoint != nil {
- bm.resetHeaderState(&best.Hash, best.Height)
+ m.nextCheckpoint = m.findNextHeaderCheckpoint(best.Height)
+ if m.nextCheckpoint != nil {
+ m.resetHeaderState(&best.Hash, best.Height)
}
} else {
- bmgrLog.Info("Checkpoints are disabled")
+ log.Info("Checkpoints are disabled")
}
- bm.syncHeightMtx.Lock()
- bm.syncHeight = best.Height
- bm.syncHeightMtx.Unlock()
+ m.syncHeightMtx.Lock()
+ m.syncHeight = best.Height
+ m.syncHeightMtx.Unlock()
- return &bm, nil
+ return &m, nil
}
diff --git a/internal/rpcserver/rpcserverhelp.go b/internal/rpcserver/rpcserverhelp.go
index cc000d0c4c..5006d7a8a2 100644
--- a/internal/rpcserver/rpcserverhelp.go
+++ b/internal/rpcserver/rpcserverhelp.go
@@ -22,7 +22,7 @@ var helpDescsEnUS = map[string]string{
"The levelspec can either a debug level or of the form:\n" +
"=,=,...\n" +
"The valid debug levels are trace, debug, info, warn, error, and critical.\n" +
- "The valid subsystems are AMGR, ADXR, BCDB, BMGR, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, and TXMP.\n" +
+ "The valid subsystems are AMGR, ADXR, BCDB, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, SYNC, and TXMP.\n" +
"Finally the keyword 'show' will return a list of the available subsystems.",
"debuglevel-levelspec": "The debug level(s) to use or the keyword 'show'",
"debuglevel--condition0": "levelspec!=show",
diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go
index fb8fdc72a9..2d488545f3 100644
--- a/internal/rpcserver/rpcwebsocket.go
+++ b/internal/rpcserver/rpcwebsocket.go
@@ -2028,8 +2028,8 @@ var ErrClientQuit = errors.New("client quit")
// QueueNotification queues the passed notification to be sent to the websocket
// client. This function, as the name implies, is only intended for
// notifications since it has additional logic to prevent other subsystems, such
-// as the memory pool and block manager, from blocking even when the send
-// channel is full.
+// as the memory pool and sync manager, from blocking even when the send channel
+// is full.
//
// If the client is in the process of shutting down, this function returns
// ErrClientQuit. This is intended to be checked by long-running notification
diff --git a/log.go b/log.go
index 28843d3600..e7289a6c61 100644
--- a/log.go
+++ b/log.go
@@ -20,6 +20,7 @@ import (
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/mining"
"github.com/decred/dcrd/internal/mining/cpuminer"
+ "github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/peer/v2"
"github.com/decred/dcrd/txscript/v3"
@@ -60,7 +61,6 @@ var (
adxrLog = backendLog.Logger("ADXR")
amgrLog = backendLog.Logger("AMGR")
bcdbLog = backendLog.Logger("BCDB")
- bmgrLog = backendLog.Logger("BMGR")
chanLog = backendLog.Logger("CHAN")
cmgrLog = backendLog.Logger("CMGR")
dcrdLog = backendLog.Logger("DCRD")
@@ -73,6 +73,7 @@ var (
scrpLog = backendLog.Logger("SCRP")
srvrLog = backendLog.Logger("SRVR")
stkeLog = backendLog.Logger("STKE")
+ syncLog = backendLog.Logger("SYNC")
txmpLog = backendLog.Logger("TXMP")
trsyLog = backendLog.Logger("TRSY")
)
@@ -92,6 +93,7 @@ func init() {
peer.UseLogger(peerLog)
rpcserver.UseLogger(rpcsLog)
stake.UseLogger(stkeLog)
+ netsync.UseLogger(syncLog)
txscript.UseLogger(scrpLog)
}
@@ -100,7 +102,6 @@ var subsystemLoggers = map[string]slog.Logger{
"ADXR": adxrLog,
"AMGR": amgrLog,
"BCDB": bcdbLog,
- "BMGR": bmgrLog,
"CHAN": chanLog,
"CMGR": cmgrLog,
"DCRD": dcrdLog,
@@ -113,6 +114,7 @@ var subsystemLoggers = map[string]slog.Logger{
"SCRP": scrpLog,
"SRVR": srvrLog,
"STKE": stkeLog,
+ "SYNC": syncLog,
"TXMP": txmpLog,
"TRSY": trsyLog,
}
diff --git a/rpcadaptors.go b/rpcadaptors.go
index 9f60a7832c..cd871a5bb9 100644
--- a/rpcadaptors.go
+++ b/rpcadaptors.go
@@ -19,6 +19,7 @@ import (
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/mining"
"github.com/decred/dcrd/internal/mining/cpuminer"
+ "github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/peer/v2"
"github.com/decred/dcrd/wire"
@@ -306,23 +307,23 @@ func (*rpcConnManager) Lookup(host string) ([]net.IP, error) {
return dcrdLookup(host)
}
-// rpcSyncMgr provides a block manager for use with the RPC server and
-// implements the rpcserver.SyncManager interface.
+// rpcSyncMgr provides an adaptor for use with the RPC server and implements the
+// rpcserver.SyncManager interface.
type rpcSyncMgr struct {
- server *server
- blockMgr *blockManager
+ server *server
+ syncMgr *netsync.SyncManager
}
// Ensure rpcSyncMgr implements the rpcserver.SyncManager interface.
var _ rpcserver.SyncManager = (*rpcSyncMgr)(nil)
-// IsCurrent returns whether or not the sync manager believes the chain is
+// IsCurrent returns whether or not the net sync manager believes the chain is
// current as compared to the rest of the network.
//
// This function is safe for concurrent access and is part of the
// rpcserver.SyncManager interface implementation.
func (b *rpcSyncMgr) IsCurrent() bool {
- return b.blockMgr.IsCurrent()
+ return b.syncMgr.IsCurrent()
}
// SubmitBlock submits the provided block to the network after processing it
@@ -331,7 +332,7 @@ func (b *rpcSyncMgr) IsCurrent() bool {
// This function is safe for concurrent access and is part of the
// rpcserver.SyncManager interface implementation.
func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
- return b.blockMgr.ProcessBlock(block, flags)
+ return b.syncMgr.ProcessBlock(block, flags)
}
// SyncPeer returns the id of the current peer being synced with.
@@ -339,7 +340,7 @@ func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.Behavior
// This function is safe for concurrent access and is part of the
// rpcserver.SyncManager interface implementation.
func (b *rpcSyncMgr) SyncPeerID() int32 {
- return b.blockMgr.SyncPeerID()
+ return b.syncMgr.SyncPeerID()
}
// LocateBlocks returns the hashes of the blocks after the first known block in
@@ -355,19 +356,19 @@ func (b *rpcSyncMgr) LocateBlocks(locator blockchain.BlockLocator, hashStop *cha
// TipGeneration returns the entire generation of blocks stemming from the
// parent of the current tip.
func (b *rpcSyncMgr) TipGeneration() ([]chainhash.Hash, error) {
- return b.blockMgr.TipGeneration()
+ return b.syncMgr.TipGeneration()
}
// SyncHeight returns latest known block being synced to.
func (b *rpcSyncMgr) SyncHeight() int64 {
- return b.blockMgr.SyncHeight()
+ return b.syncMgr.SyncHeight()
}
// ProcessTransaction relays the provided transaction validation and insertion
// into the memory pool.
func (b *rpcSyncMgr) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool,
rateLimit bool, allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) {
- return b.blockMgr.ProcessTransaction(tx, allowOrphans,
+ return b.syncMgr.ProcessTransaction(tx, allowOrphans,
rateLimit, allowHighFees, tag)
}
diff --git a/server.go b/server.go
index c5e5c36b4c..e30d8970ae 100644
--- a/server.go
+++ b/server.go
@@ -42,6 +42,7 @@ import (
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/mining"
"github.com/decred/dcrd/internal/mining/cpuminer"
+ "github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/internal/version"
"github.com/decred/dcrd/lru"
@@ -161,9 +162,9 @@ type relayMsg struct {
immediate bool
}
-// updatePeerHeightsMsg is a message sent from the blockmanager to the server
-// after a new block has been accepted. The purpose of the message is to update
-// the heights of peers that were known to announce the block before we
+// updatePeerHeightsMsg is a message sent from the net sync manager to the
+// server after a new block has been accepted. The purpose of the message is to
+// update the heights of peers that were known to announce the block before we
// connected it to the main chain or recognized it as an orphan. With these
// updates, peer heights will be kept up to date, allowing for fresh data when
// selecting sync peer candidacy.
@@ -462,7 +463,7 @@ type server struct {
sigCache *txscript.SigCache
subsidyCache *standalone.SubsidyCache
rpcServer *rpcserver.Server
- blockManager *blockManager
+ syncManager *netsync.SyncManager
bg *mining.BgBlkTmplGenerator
chain *blockchain.BlockChain
txMemPool *mempool.TxPool
@@ -501,8 +502,7 @@ type server struct {
lotteryDataBroadcast map[chainhash.Hash]struct{}
}
-// serverPeer extends the peer to maintain state shared by the server and
-// the blockmanager.
+// serverPeer extends the peer to maintain state shared by the server.
type serverPeer struct {
*peer.Peer
@@ -524,7 +524,8 @@ type serverPeer struct {
getMiningStateSent bool
initStateSent bool
- // The following chans are used to sync blockmanager and server.
+ // The following chans are used to synchronize the net sync manager and
+ // server.
txProcessed chan struct{}
blockProcessed chan struct{}
@@ -699,7 +700,7 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej
// Advertise the local address when the server accepts incoming
// connections and it believes itself to be close to the best
// known tip.
- if !cfg.DisableListen && sp.server.blockManager.IsCurrent() {
+ if !cfg.DisableListen && sp.server.syncManager.IsCurrent() {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(remoteAddr)
if addrmgr.IsRoutable(lna) {
@@ -730,8 +731,8 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej
// the local clock to keep the network time in sync.
sp.server.timeSource.AddTimeSample(p.Addr(), msg.Timestamp)
- // Signal the block manager this peer is a new sync candidate.
- sp.server.blockManager.NewPeer(sp.Peer)
+ // Signal the net sync manager this peer is a new sync candidate.
+ sp.server.syncManager.NewPeer(sp.Peer)
// Add valid peer to the server.
sp.server.AddPeer(sp)
@@ -807,12 +808,8 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState
}
sp.getMiningStateSent = true
- // Access the block manager and get the list of best blocks to mine on.
- bm := sp.server.blockManager
- mp := sp.server.txMemPool
- best := sp.server.chain.BestSnapshot()
-
// Send out blank mining states if it's early in the blockchain.
+ best := sp.server.chain.BestSnapshot()
if best.Height < sp.server.chainParams.StakeValidationHeight-1 {
err := sp.pushMiningStateMsg(0, nil, nil)
if err != nil {
@@ -825,9 +822,10 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState
// Obtain the entire generation of blocks stemming from the parent of
// the current tip.
- children, err := bm.TipGeneration()
+ sm := sp.server.syncManager
+ children, err := sm.TipGeneration()
if err != nil {
- peerLog.Warnf("failed to access block manager to get the generation "+
+ peerLog.Warnf("failed to access sync manager to get the generation "+
"for a mining state request (block: %v): %v", best.Hash, err)
return
}
@@ -836,6 +834,7 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState
// list to the maximum number of allowed eligible block hashes per
// mining state message. There is nothing to send when there are no
// eligible blocks.
+ mp := sp.server.txMemPool
blockHashes := mining.SortParentsByVotes(mp, best.Hash, children,
sp.server.chainParams)
numBlocks := len(blockHashes)
@@ -871,7 +870,7 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState
// OnMiningState is invoked when a peer receives a miningstate wire message. It
// requests the data advertised in the message from the peer.
func (sp *serverPeer) OnMiningState(p *peer.Peer, msg *wire.MsgMiningState) {
- err := sp.server.blockManager.RequestFromPeer(sp.Peer, msg.BlockHashes,
+ err := sp.server.syncManager.RequestFromPeer(sp.Peer, msg.BlockHashes,
msg.VoteHashes)
if err != nil {
peerLog.Warnf("couldn't handle mining state message: %v",
@@ -888,12 +887,8 @@ func (sp *serverPeer) OnGetInitState(p *peer.Peer, msg *wire.MsgGetInitState) {
}
sp.initStateSent = true
- // Access the block manager and get the list of best blocks to mine on.
- bm := sp.server.blockManager
- mp := sp.server.txMemPool
- best := sp.server.chain.BestSnapshot()
-
// Send out blank mining states if it's early in the blockchain.
+ best := sp.server.chain.BestSnapshot()
if best.Height < sp.server.chainParams.StakeValidationHeight-1 {
sp.QueueMessage(wire.NewMsgInitState(), nil)
return
@@ -913,12 +908,14 @@ func (sp *serverPeer) OnGetInitState(p *peer.Peer, msg *wire.MsgGetInitState) {
// Fetch head block hashes if we need to send either them or their
// votes.
+ mp := sp.server.txMemPool
if wantBlocks || wantVotes {
// Obtain the entire generation of blocks stemming from the
// parent of the current tip.
- children, err := bm.TipGeneration()
+ sm := sp.server.syncManager
+ children, err := sm.TipGeneration()
if err != nil {
- peerLog.Warnf("Failed to access block manager to get the generation "+
+ peerLog.Warnf("Failed to access sync manager to get the generation "+
"for a init state request (block: %v): %v", best.Hash, err)
return
}
@@ -979,7 +976,7 @@ func (sp *serverPeer) OnInitState(p *peer.Peer, msg *wire.MsgInitState) {
txHashes = append(txHashes, &msg.TSpendHashes[i])
}
- err := sp.server.blockManager.RequestFromPeer(sp.Peer, blockHashes,
+ err := sp.server.syncManager.RequestFromPeer(sp.Peer, blockHashes,
txHashes)
if err != nil {
peerLog.Warnf("couldn't handle init state message: %v", err)
@@ -1004,12 +1001,12 @@ func (sp *serverPeer) OnTx(p *peer.Peer, msg *wire.MsgTx) {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
p.AddKnownInventory(iv)
- // Queue the transaction up to be handled by the block manager and
+ // Queue the transaction up to be handled by the net sync manager and
// intentionally block further receives until the transaction is fully
// processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad transactions before disconnecting (or
// being disconnected) and wasting memory.
- sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed)
+ sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
<-sp.txProcessed
}
@@ -1024,23 +1021,22 @@ func (sp *serverPeer) OnBlock(p *peer.Peer, msg *wire.MsgBlock, buf []byte) {
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
p.AddKnownInventory(iv)
- // Queue the block up to be handled by the block manager and
+ // Queue the block up to be handled by the net sync manager and
// intentionally block further receives until the network block is fully
// processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad blocks before disconnecting (or being
// disconnected) and wasting memory. Additionally, this behavior is
- // depended on by at least the block acceptance test tool as the
- // reference implementation processes blocks in the same thread and
- // therefore blocks further messages until the network block has been
- // fully processed.
- sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
+ // depended on by at least the block acceptance test tool as the reference
+ // implementation processes blocks in the same thread and therefore blocks
+ // further messages until the network block has been fully processed.
+ sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
<-sp.blockProcessed
}
// OnInv is invoked when a peer receives an inv wire message and is used to
// examine the inventory being advertised by the remote peer and react
-// accordingly. We pass the message down to blockmanager which will call
-// QueueMessage with any appropriate responses.
+// accordingly. We pass the message down to the net sync manager which will
+// call QueueMessage with any appropriate responses.
func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
// Ban peers sending empty inventory requests.
if len(msg.InvList) == 0 {
@@ -1049,7 +1045,7 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
}
if !cfg.BlocksOnly {
- sp.server.blockManager.QueueInv(msg, sp.Peer)
+ sp.server.syncManager.QueueInv(msg, sp.Peer)
return
}
@@ -1068,11 +1064,11 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
}
}
- sp.server.blockManager.QueueInv(newInv, sp.Peer)
+ sp.server.syncManager.QueueInv(newInv, sp.Peer)
}
// OnHeaders is invoked when a peer receives a headers wire message. The
-// message is passed down to the block manager.
+// message is passed down to the net sync manager.
func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
// Ban peers sending empty headers requests.
if len(msg.Headers) == 0 {
@@ -1080,7 +1076,7 @@ func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
return
}
- sp.server.blockManager.QueueHeaders(msg, sp.Peer)
+ sp.server.syncManager.QueueHeaders(msg, sp.Peer)
}
// handleGetData is invoked when a peer receives a getdata wire message and is
@@ -1206,7 +1202,7 @@ func (sp *serverPeer) OnGetBlocks(p *peer.Peer, msg *wire.MsgGetBlocks) {
// OnGetHeaders is invoked when a peer receives a getheaders wire message.
func (sp *serverPeer) OnGetHeaders(p *peer.Peer, msg *wire.MsgGetHeaders) {
// Ignore getheaders requests if not in sync.
- if !sp.server.blockManager.IsCurrent() {
+ if !sp.server.syncManager.IsCurrent() {
return
}
@@ -1268,7 +1264,7 @@ func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) {
}
// Ignore request if CFs are disabled or the chain is not yet synced.
- if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() {
+ if cfg.NoCFilters || !sp.server.syncManager.IsCurrent() {
return
}
@@ -1337,7 +1333,7 @@ func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) {
// OnGetCFilterV2 is invoked when a peer receives a getcfilterv2 wire message.
func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) {
// Ignore request if the chain is not yet synced.
- if !sp.server.blockManager.IsCurrent() {
+ if !sp.server.syncManager.IsCurrent() {
return
}
@@ -1370,7 +1366,7 @@ func (sp *serverPeer) OnGetCFHeaders(p *peer.Peer, msg *wire.MsgGetCFHeaders) {
}
// Ignore request if CFs are disabled or the chain is not yet synced.
- if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() {
+ if cfg.NoCFilters || !sp.server.syncManager.IsCurrent() {
return
}
@@ -1579,7 +1575,7 @@ func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) {
return
}
}
- sp.server.blockManager.QueueNotFound(msg, p)
+ sp.server.syncManager.QueueNotFound(msg, p)
}
// randomUint16Number returns a random uint16 in a specified input range. Note
@@ -2317,9 +2313,10 @@ func (s *server) peerDoneHandler(sp *serverPeer) {
sp.WaitForDisconnect()
s.donePeers <- sp
- // Only tell block manager we are gone if we ever told it we existed.
+ // Notify the net sync manager the peer is gone if it was ever notified that
+ // the peer existed.
if sp.VersionKnown() {
- s.blockManager.DonePeer(sp.Peer)
+ s.syncManager.DonePeer(sp.Peer)
tipHash := &s.chain.BestSnapshot().Hash
isTreasuryEnabled, err := s.chain.IsTreasuryAgendaActive(tipHash)
@@ -2342,13 +2339,12 @@ func (s *server) peerDoneHandler(sp *serverPeer) {
// peers to and from the server, banning peers, and broadcasting messages to
// peers. It must be run in a goroutine.
func (s *server) peerHandler(ctx context.Context) {
- // Start the address manager and block manager, both of which are needed
- // by peers. This is done here since their lifecycle is closely tied
- // to this handler and rather than adding more channels to synchronize
- // things, it's easier and slightly faster to simply start and stop them
- // in this handler.
+ // Start the address manager and sync manager, both of which are needed by
+ // peers. This is done here since their lifecycle is closely tied to this
+ // handler and rather than adding more channels to synchronize things, it's
+ // easier and slightly faster to simply start and stop them in this handler.
s.addrManager.Start()
- s.blockManager.Start()
+ s.syncManager.Start()
srvrLog.Tracef("Starting peer handler")
@@ -2405,7 +2401,7 @@ out:
}
}
- s.blockManager.Stop()
+ s.syncManager.Stop()
s.addrManager.Stop()
// Drain channels before exiting so nothing is left waiting around
@@ -2652,7 +2648,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// which could result in a deadlock.
block, ok := notification.Data.(*dcrutil.Block)
if !ok {
- bmgrLog.Warnf("New tip block checked notification is not a block.")
+ syncLog.Warnf("New tip block checked notification is not a block.")
break
}
@@ -2670,13 +2666,13 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// Don't relay or notify RPC clients with winning tickets if we are not
// current. Other peers that are current should already know about it
// and clients, such as wallets, shouldn't be voting on old blocks.
- if !s.blockManager.IsCurrent() {
+ if !s.syncManager.IsCurrent() {
return
}
band, ok := notification.Data.(*blockchain.BlockAcceptedNtfnsData)
if !ok {
- bmgrLog.Warnf("Chain accepted notification is not " +
+ syncLog.Warnf("Chain accepted notification is not " +
"BlockAcceptedNtfnsData.")
break
}
@@ -2720,7 +2716,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// blockchain.
wt, _, _, err := s.chain.LotteryDataForBlock(blockHash)
if err != nil {
- bmgrLog.Errorf("Couldn't calculate winning tickets for "+
+ syncLog.Errorf("Couldn't calculate winning tickets for "+
"accepted block %v: %v", blockHash, err.Error())
} else {
// Notify registered websocket clients of newly eligible tickets
@@ -2765,8 +2761,8 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
case blockchain.NTBlockConnected:
ntfn, ok := notification.Data.(*blockchain.BlockConnectedNtfnsData)
if !ok {
- bmgrLog.Warnf("Block connected notification is not " +
- "BlockConnectedNtfnsData.")
+ syncLog.Warnf("Block connected notification is not " +
+ "BlockConnectedNtfnsData")
break
}
block := ntfn.Block
@@ -2875,7 +2871,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
case blockchain.NTSpentAndMissedTickets:
tnd, ok := notification.Data.(*blockchain.TicketNotificationsData)
if !ok {
- bmgrLog.Warnf("Tickets connected notification is not " +
+ syncLog.Warnf("Tickets connected notification is not " +
"TicketNotificationsData")
break
}
@@ -2888,7 +2884,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
case blockchain.NTNewTickets:
tnd, ok := notification.Data.(*blockchain.TicketNotificationsData)
if !ok {
- bmgrLog.Warnf("Tickets connected notification is not " +
+ syncLog.Warnf("Tickets connected notification is not " +
"TicketNotificationsData")
break
}
@@ -2901,7 +2897,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
case blockchain.NTBlockDisconnected:
ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData)
if !ok {
- bmgrLog.Warnf("Block disconnected notification is not " +
+ syncLog.Warnf("Block disconnected notification is not " +
"BlockDisconnectedNtfnsData.")
break
}
@@ -2994,7 +2990,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
case blockchain.NTReorganization:
rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData)
if !ok {
- bmgrLog.Warnf("Chain reorganization notification is malformed")
+ syncLog.Warnf("Chain reorganization notification is malformed")
break
}
@@ -3650,12 +3646,11 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
},
}
s.txMemPool = mempool.New(&txC)
- s.blockManager, err = newBlockManager(&blockManagerConfig{
+ s.syncManager, err = netsync.New(&netsync.Config{
PeerNotifier: &s,
Chain: s.chain,
ChainParams: s.chainParams,
SigCache: s.sigCache,
- SubsidyCache: s.subsidyCache,
TxMemPool: s.txMemPool,
RpcServer: func() *rpcserver.Server {
return s.rpcServer
@@ -3703,7 +3698,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
TimeSource: s.timeSource,
SubsidyCache: s.subsidyCache,
ChainParams: s.chainParams,
- BlockManager: s.blockManager,
+ BlockManager: s.syncManager,
MiningTimeOffset: cfg.MiningTimeOffset,
BestSnapshot: s.chain.BestSnapshot,
BlockByHash: s.chain.BlockByHash,
@@ -3747,9 +3742,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
PermitConnectionlessMining: cfg.SimNet || cfg.RegNet,
BgBlkTmplGenerator: s.bg,
MiningAddrs: cfg.miningAddrs,
- ProcessBlock: s.blockManager.ProcessBlock,
+ ProcessBlock: s.syncManager.ProcessBlock,
ConnectedCount: s.ConnectedCount,
- IsCurrent: s.blockManager.IsCurrent,
+ IsCurrent: s.syncManager.IsCurrent,
})
}
@@ -3852,7 +3847,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP
rpcsConfig := rpcserver.Config{
Listeners: rpcListeners,
ConnMgr: &rpcConnManager{&s},
- SyncMgr: &rpcSyncMgr{server: &s, blockMgr: s.blockManager},
+ SyncMgr: &rpcSyncMgr{server: &s, syncMgr: s.syncManager},
FeeEstimator: s.feeEstimator,
TimeSource: s.timeSource,
Services: s.services,