From 3832c4bbe0b2ab143fcc902c5c0dda2032c36504 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 15:10:23 -0600 Subject: [PATCH 1/6] server: Rename blockManager field to syncManager. This renames the field that houses the block manager to syncManager to more accurately reflect its purpose and updates comments accordingly. This is part of an overall effort to split the block manager out into a separate internal netsync package. --- internal/fees/estimator.go | 10 +-- internal/mining/interface.go | 4 +- internal/rpcserver/rpcwebsocket.go | 4 +- rpcadaptors.go | 22 +++--- server.go | 115 ++++++++++++++--------------- 5 files changed, 75 insertions(+), 80 deletions(-) diff --git a/internal/fees/estimator.go b/internal/fees/estimator.go index aeee8837ed..7bfe90ef36 100644 --- a/internal/fees/estimator.go +++ b/internal/fees/estimator.go @@ -623,11 +623,11 @@ func (stats *Estimator) removeFromMemPool(blocksInMemPool int32, rate feeRate) { // function but not on a previous newMemPoolTx. This leaves the fee db // in an undefined state and should never happen in regular use. If this // happens, then there is a logic or coding error somewhere, either in - // the estimator itself or on its hooking to the mempool/blockmanager. - // Either way, the easiest way to fix this is to completely delete the - // database and start again. - // During development, you can use a panic() here and we might return it - // after being confident that the estimator is completely bug free. + // the estimator itself or on its hooking to the mempool/network sync + // manager. Either way, the easiest way to fix this is to completely + // delete the database and start again. During development, you can use + // a panic() here and we might return it after being confident that the + // estimator is completely bug free. log.Errorf("Transaction count in bucket index %d and confirmation "+ "index %d became < 0", bucketIdx, confirmIdx) } diff --git a/internal/mining/interface.go b/internal/mining/interface.go index 998f91ef63..2a18271b77 100644 --- a/internal/mining/interface.go +++ b/internal/mining/interface.go @@ -67,7 +67,7 @@ type blockManagerFacade interface { // best chain. ForceReorganization(formerBest, newBest chainhash.Hash) error - // IsCurrent returns whether or not the block manager believes it is synced - // with the connected peers. + // IsCurrent returns whether or not the net sync manager believes it is + // synced with the connected peers. IsCurrent() bool } diff --git a/internal/rpcserver/rpcwebsocket.go b/internal/rpcserver/rpcwebsocket.go index fb8fdc72a9..2d488545f3 100644 --- a/internal/rpcserver/rpcwebsocket.go +++ b/internal/rpcserver/rpcwebsocket.go @@ -2028,8 +2028,8 @@ var ErrClientQuit = errors.New("client quit") // QueueNotification queues the passed notification to be sent to the websocket // client. This function, as the name implies, is only intended for // notifications since it has additional logic to prevent other subsystems, such -// as the memory pool and block manager, from blocking even when the send -// channel is full. +// as the memory pool and sync manager, from blocking even when the send channel +// is full. // // If the client is in the process of shutting down, this function returns // ErrClientQuit. This is intended to be checked by long-running notification diff --git a/rpcadaptors.go b/rpcadaptors.go index 9f60a7832c..0631c35e6e 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -306,23 +306,23 @@ func (*rpcConnManager) Lookup(host string) ([]net.IP, error) { return dcrdLookup(host) } -// rpcSyncMgr provides a block manager for use with the RPC server and -// implements the rpcserver.SyncManager interface. +// rpcSyncMgr provides an adaptor for use with the RPC server and implements the +// rpcserver.SyncManager interface. type rpcSyncMgr struct { - server *server - blockMgr *blockManager + server *server + syncMgr *blockManager } // Ensure rpcSyncMgr implements the rpcserver.SyncManager interface. var _ rpcserver.SyncManager = (*rpcSyncMgr)(nil) -// IsCurrent returns whether or not the sync manager believes the chain is +// IsCurrent returns whether or not the net sync manager believes the chain is // current as compared to the rest of the network. // // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. func (b *rpcSyncMgr) IsCurrent() bool { - return b.blockMgr.IsCurrent() + return b.syncMgr.IsCurrent() } // SubmitBlock submits the provided block to the network after processing it @@ -331,7 +331,7 @@ func (b *rpcSyncMgr) IsCurrent() bool { // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { - return b.blockMgr.ProcessBlock(block, flags) + return b.syncMgr.ProcessBlock(block, flags) } // SyncPeer returns the id of the current peer being synced with. @@ -339,7 +339,7 @@ func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.Behavior // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. func (b *rpcSyncMgr) SyncPeerID() int32 { - return b.blockMgr.SyncPeerID() + return b.syncMgr.SyncPeerID() } // LocateBlocks returns the hashes of the blocks after the first known block in @@ -355,19 +355,19 @@ func (b *rpcSyncMgr) LocateBlocks(locator blockchain.BlockLocator, hashStop *cha // TipGeneration returns the entire generation of blocks stemming from the // parent of the current tip. func (b *rpcSyncMgr) TipGeneration() ([]chainhash.Hash, error) { - return b.blockMgr.TipGeneration() + return b.syncMgr.TipGeneration() } // SyncHeight returns latest known block being synced to. func (b *rpcSyncMgr) SyncHeight() int64 { - return b.blockMgr.SyncHeight() + return b.syncMgr.SyncHeight() } // ProcessTransaction relays the provided transaction validation and insertion // into the memory pool. func (b *rpcSyncMgr) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) { - return b.blockMgr.ProcessTransaction(tx, allowOrphans, + return b.syncMgr.ProcessTransaction(tx, allowOrphans, rateLimit, allowHighFees, tag) } diff --git a/server.go b/server.go index c5e5c36b4c..05d5dfe4c7 100644 --- a/server.go +++ b/server.go @@ -161,9 +161,9 @@ type relayMsg struct { immediate bool } -// updatePeerHeightsMsg is a message sent from the blockmanager to the server -// after a new block has been accepted. The purpose of the message is to update -// the heights of peers that were known to announce the block before we +// updatePeerHeightsMsg is a message sent from the net sync manager to the +// server after a new block has been accepted. The purpose of the message is to +// update the heights of peers that were known to announce the block before we // connected it to the main chain or recognized it as an orphan. With these // updates, peer heights will be kept up to date, allowing for fresh data when // selecting sync peer candidacy. @@ -462,7 +462,7 @@ type server struct { sigCache *txscript.SigCache subsidyCache *standalone.SubsidyCache rpcServer *rpcserver.Server - blockManager *blockManager + syncManager *blockManager bg *mining.BgBlkTmplGenerator chain *blockchain.BlockChain txMemPool *mempool.TxPool @@ -501,8 +501,7 @@ type server struct { lotteryDataBroadcast map[chainhash.Hash]struct{} } -// serverPeer extends the peer to maintain state shared by the server and -// the blockmanager. +// serverPeer extends the peer to maintain state shared by the server. type serverPeer struct { *peer.Peer @@ -524,7 +523,8 @@ type serverPeer struct { getMiningStateSent bool initStateSent bool - // The following chans are used to sync blockmanager and server. + // The following chans are used to synchronize the net sync manager and + // server. txProcessed chan struct{} blockProcessed chan struct{} @@ -699,7 +699,7 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // Advertise the local address when the server accepts incoming // connections and it believes itself to be close to the best // known tip. - if !cfg.DisableListen && sp.server.blockManager.IsCurrent() { + if !cfg.DisableListen && sp.server.syncManager.IsCurrent() { // Get address that best matches. lna := addrManager.GetBestLocalAddress(remoteAddr) if addrmgr.IsRoutable(lna) { @@ -730,8 +730,8 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // the local clock to keep the network time in sync. sp.server.timeSource.AddTimeSample(p.Addr(), msg.Timestamp) - // Signal the block manager this peer is a new sync candidate. - sp.server.blockManager.NewPeer(sp.Peer) + // Signal the net sync manager this peer is a new sync candidate. + sp.server.syncManager.NewPeer(sp.Peer) // Add valid peer to the server. sp.server.AddPeer(sp) @@ -807,12 +807,8 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState } sp.getMiningStateSent = true - // Access the block manager and get the list of best blocks to mine on. - bm := sp.server.blockManager - mp := sp.server.txMemPool - best := sp.server.chain.BestSnapshot() - // Send out blank mining states if it's early in the blockchain. + best := sp.server.chain.BestSnapshot() if best.Height < sp.server.chainParams.StakeValidationHeight-1 { err := sp.pushMiningStateMsg(0, nil, nil) if err != nil { @@ -825,9 +821,10 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState // Obtain the entire generation of blocks stemming from the parent of // the current tip. - children, err := bm.TipGeneration() + sm := sp.server.syncManager + children, err := sm.TipGeneration() if err != nil { - peerLog.Warnf("failed to access block manager to get the generation "+ + peerLog.Warnf("failed to access sync manager to get the generation "+ "for a mining state request (block: %v): %v", best.Hash, err) return } @@ -836,6 +833,7 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState // list to the maximum number of allowed eligible block hashes per // mining state message. There is nothing to send when there are no // eligible blocks. + mp := sp.server.txMemPool blockHashes := mining.SortParentsByVotes(mp, best.Hash, children, sp.server.chainParams) numBlocks := len(blockHashes) @@ -871,7 +869,7 @@ func (sp *serverPeer) OnGetMiningState(p *peer.Peer, msg *wire.MsgGetMiningState // OnMiningState is invoked when a peer receives a miningstate wire message. It // requests the data advertised in the message from the peer. func (sp *serverPeer) OnMiningState(p *peer.Peer, msg *wire.MsgMiningState) { - err := sp.server.blockManager.RequestFromPeer(sp.Peer, msg.BlockHashes, + err := sp.server.syncManager.RequestFromPeer(sp.Peer, msg.BlockHashes, msg.VoteHashes) if err != nil { peerLog.Warnf("couldn't handle mining state message: %v", @@ -888,12 +886,8 @@ func (sp *serverPeer) OnGetInitState(p *peer.Peer, msg *wire.MsgGetInitState) { } sp.initStateSent = true - // Access the block manager and get the list of best blocks to mine on. - bm := sp.server.blockManager - mp := sp.server.txMemPool - best := sp.server.chain.BestSnapshot() - // Send out blank mining states if it's early in the blockchain. + best := sp.server.chain.BestSnapshot() if best.Height < sp.server.chainParams.StakeValidationHeight-1 { sp.QueueMessage(wire.NewMsgInitState(), nil) return @@ -913,12 +907,14 @@ func (sp *serverPeer) OnGetInitState(p *peer.Peer, msg *wire.MsgGetInitState) { // Fetch head block hashes if we need to send either them or their // votes. + mp := sp.server.txMemPool if wantBlocks || wantVotes { // Obtain the entire generation of blocks stemming from the // parent of the current tip. - children, err := bm.TipGeneration() + sm := sp.server.syncManager + children, err := sm.TipGeneration() if err != nil { - peerLog.Warnf("Failed to access block manager to get the generation "+ + peerLog.Warnf("Failed to access sync manager to get the generation "+ "for a init state request (block: %v): %v", best.Hash, err) return } @@ -979,7 +975,7 @@ func (sp *serverPeer) OnInitState(p *peer.Peer, msg *wire.MsgInitState) { txHashes = append(txHashes, &msg.TSpendHashes[i]) } - err := sp.server.blockManager.RequestFromPeer(sp.Peer, blockHashes, + err := sp.server.syncManager.RequestFromPeer(sp.Peer, blockHashes, txHashes) if err != nil { peerLog.Warnf("couldn't handle init state message: %v", err) @@ -1004,12 +1000,12 @@ func (sp *serverPeer) OnTx(p *peer.Peer, msg *wire.MsgTx) { iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) p.AddKnownInventory(iv) - // Queue the transaction up to be handled by the block manager and + // Queue the transaction up to be handled by the net sync manager and // intentionally block further receives until the transaction is fully // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed) + sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed) <-sp.txProcessed } @@ -1024,23 +1020,22 @@ func (sp *serverPeer) OnBlock(p *peer.Peer, msg *wire.MsgBlock, buf []byte) { iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) p.AddKnownInventory(iv) - // Queue the block up to be handled by the block manager and + // Queue the block up to be handled by the net sync manager and // intentionally block further receives until the network block is fully // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad blocks before disconnecting (or being // disconnected) and wasting memory. Additionally, this behavior is - // depended on by at least the block acceptance test tool as the - // reference implementation processes blocks in the same thread and - // therefore blocks further messages until the network block has been - // fully processed. - sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed) + // depended on by at least the block acceptance test tool as the reference + // implementation processes blocks in the same thread and therefore blocks + // further messages until the network block has been fully processed. + sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed) <-sp.blockProcessed } // OnInv is invoked when a peer receives an inv wire message and is used to // examine the inventory being advertised by the remote peer and react -// accordingly. We pass the message down to blockmanager which will call -// QueueMessage with any appropriate responses. +// accordingly. We pass the message down to the net sync manager which will +// call QueueMessage with any appropriate responses. func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { // Ban peers sending empty inventory requests. if len(msg.InvList) == 0 { @@ -1049,7 +1044,7 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { } if !cfg.BlocksOnly { - sp.server.blockManager.QueueInv(msg, sp.Peer) + sp.server.syncManager.QueueInv(msg, sp.Peer) return } @@ -1068,11 +1063,11 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { } } - sp.server.blockManager.QueueInv(newInv, sp.Peer) + sp.server.syncManager.QueueInv(newInv, sp.Peer) } // OnHeaders is invoked when a peer receives a headers wire message. The -// message is passed down to the block manager. +// message is passed down to the net sync manager. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { // Ban peers sending empty headers requests. if len(msg.Headers) == 0 { @@ -1080,7 +1075,7 @@ func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { return } - sp.server.blockManager.QueueHeaders(msg, sp.Peer) + sp.server.syncManager.QueueHeaders(msg, sp.Peer) } // handleGetData is invoked when a peer receives a getdata wire message and is @@ -1206,7 +1201,7 @@ func (sp *serverPeer) OnGetBlocks(p *peer.Peer, msg *wire.MsgGetBlocks) { // OnGetHeaders is invoked when a peer receives a getheaders wire message. func (sp *serverPeer) OnGetHeaders(p *peer.Peer, msg *wire.MsgGetHeaders) { // Ignore getheaders requests if not in sync. - if !sp.server.blockManager.IsCurrent() { + if !sp.server.syncManager.IsCurrent() { return } @@ -1268,7 +1263,7 @@ func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) { } // Ignore request if CFs are disabled or the chain is not yet synced. - if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() { + if cfg.NoCFilters || !sp.server.syncManager.IsCurrent() { return } @@ -1337,7 +1332,7 @@ func (sp *serverPeer) OnGetCFilter(p *peer.Peer, msg *wire.MsgGetCFilter) { // OnGetCFilterV2 is invoked when a peer receives a getcfilterv2 wire message. func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) { // Ignore request if the chain is not yet synced. - if !sp.server.blockManager.IsCurrent() { + if !sp.server.syncManager.IsCurrent() { return } @@ -1370,7 +1365,7 @@ func (sp *serverPeer) OnGetCFHeaders(p *peer.Peer, msg *wire.MsgGetCFHeaders) { } // Ignore request if CFs are disabled or the chain is not yet synced. - if cfg.NoCFilters || !sp.server.blockManager.IsCurrent() { + if cfg.NoCFilters || !sp.server.syncManager.IsCurrent() { return } @@ -1579,7 +1574,7 @@ func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) { return } } - sp.server.blockManager.QueueNotFound(msg, p) + sp.server.syncManager.QueueNotFound(msg, p) } // randomUint16Number returns a random uint16 in a specified input range. Note @@ -2317,9 +2312,10 @@ func (s *server) peerDoneHandler(sp *serverPeer) { sp.WaitForDisconnect() s.donePeers <- sp - // Only tell block manager we are gone if we ever told it we existed. + // Notify the net sync manager the peer is gone if it was ever notified that + // the peer existed. if sp.VersionKnown() { - s.blockManager.DonePeer(sp.Peer) + s.syncManager.DonePeer(sp.Peer) tipHash := &s.chain.BestSnapshot().Hash isTreasuryEnabled, err := s.chain.IsTreasuryAgendaActive(tipHash) @@ -2342,13 +2338,12 @@ func (s *server) peerDoneHandler(sp *serverPeer) { // peers to and from the server, banning peers, and broadcasting messages to // peers. It must be run in a goroutine. func (s *server) peerHandler(ctx context.Context) { - // Start the address manager and block manager, both of which are needed - // by peers. This is done here since their lifecycle is closely tied - // to this handler and rather than adding more channels to synchronize - // things, it's easier and slightly faster to simply start and stop them - // in this handler. + // Start the address manager and sync manager, both of which are needed by + // peers. This is done here since their lifecycle is closely tied to this + // handler and rather than adding more channels to synchronize things, it's + // easier and slightly faster to simply start and stop them in this handler. s.addrManager.Start() - s.blockManager.Start() + s.syncManager.Start() srvrLog.Tracef("Starting peer handler") @@ -2405,7 +2400,7 @@ out: } } - s.blockManager.Stop() + s.syncManager.Stop() s.addrManager.Stop() // Drain channels before exiting so nothing is left waiting around @@ -2670,7 +2665,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // Don't relay or notify RPC clients with winning tickets if we are not // current. Other peers that are current should already know about it // and clients, such as wallets, shouldn't be voting on old blocks. - if !s.blockManager.IsCurrent() { + if !s.syncManager.IsCurrent() { return } @@ -3650,7 +3645,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP }, } s.txMemPool = mempool.New(&txC) - s.blockManager, err = newBlockManager(&blockManagerConfig{ + s.syncManager, err = newBlockManager(&blockManagerConfig{ PeerNotifier: &s, Chain: s.chain, ChainParams: s.chainParams, @@ -3703,7 +3698,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP TimeSource: s.timeSource, SubsidyCache: s.subsidyCache, ChainParams: s.chainParams, - BlockManager: s.blockManager, + BlockManager: s.syncManager, MiningTimeOffset: cfg.MiningTimeOffset, BestSnapshot: s.chain.BestSnapshot, BlockByHash: s.chain.BlockByHash, @@ -3747,9 +3742,9 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP PermitConnectionlessMining: cfg.SimNet || cfg.RegNet, BgBlkTmplGenerator: s.bg, MiningAddrs: cfg.miningAddrs, - ProcessBlock: s.blockManager.ProcessBlock, + ProcessBlock: s.syncManager.ProcessBlock, ConnectedCount: s.ConnectedCount, - IsCurrent: s.blockManager.IsCurrent, + IsCurrent: s.syncManager.IsCurrent, }) } @@ -3852,7 +3847,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP rpcsConfig := rpcserver.Config{ Listeners: rpcListeners, ConnMgr: &rpcConnManager{&s}, - SyncMgr: &rpcSyncMgr{server: &s, blockMgr: s.blockManager}, + SyncMgr: &rpcSyncMgr{server: &s, syncMgr: s.syncManager}, FeeEstimator: s.feeEstimator, TimeSource: s.timeSource, Services: s.services, From 25fbc14e3fe23133b0b18b0fa65f4d38580e1894 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 15:10:23 -0600 Subject: [PATCH 2/6] multi: Rename BMGR subsystem to SYNC. This renames the block manager logging subsystem from BMGR to SYNC and updates all related documentation accordingly. This is part of an overall effort to split the block manager out into a separate internal netsync package. --- blockdb.go | 2 +- blockmanager.go | 104 ++++++++++++++-------------- docs/json_rpc_api.mediawiki | 4 +- internal/rpcserver/rpcserverhelp.go | 2 +- log.go | 4 +- server.go | 18 ++--- 6 files changed, 67 insertions(+), 67 deletions(-) diff --git a/blockdb.go b/blockdb.go index f3fbaa2cf0..38b23835b5 100644 --- a/blockdb.go +++ b/blockdb.go @@ -244,7 +244,7 @@ func dumpBlockChain(params *chaincfg.Params, b *blockchain.BlockChain) error { progressLogger.LogBlockHeight(bl.MsgBlock(), tipHeight) } - bmgrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", + srvrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", tipHeight, cfg.DumpBlockchain) return nil diff --git a/blockmanager.go b/blockmanager.go index 177cc3499b..5d01cd877d 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -422,13 +422,13 @@ func (b *blockManager) startSync() { blkLocator, err := b.cfg.Chain.LatestBlockLocator() if err != nil { - bmgrLog.Errorf("Failed to get block locator for the "+ + syncLog.Errorf("Failed to get block locator for the "+ "latest block: %v", err) return } locator := chainBlockLocatorToHashes(blkLocator) - bmgrLog.Infof("Syncing to block height %d from peer %v", + syncLog.Infof("Syncing to block height %d from peer %v", bestPeer.LastBlock(), bestPeer.Addr()) // The chain is not synced whenever the current best height is less than @@ -462,18 +462,18 @@ func (b *blockManager) startSync() { err := bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - bmgrLog.Errorf("Failed to push getheadermsg for the "+ + syncLog.Errorf("Failed to push getheadermsg for the "+ "latest blocks: %v", err) return } b.headersFirstMode = true - bmgrLog.Infof("Downloading headers for blocks %d to "+ + syncLog.Infof("Downloading headers for blocks %d to "+ "%d from peer %s", best.Height+1, b.nextCheckpoint.Height, bestPeer.Addr()) } else { err := bestPeer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - bmgrLog.Errorf("Failed to push getblocksmsg for the "+ + syncLog.Errorf("Failed to push getblocksmsg for the "+ "latest blocks: %v", err) return } @@ -483,7 +483,7 @@ func (b *blockManager) startSync() { b.syncHeight = bestPeer.LastBlock() b.syncHeightMtx.Unlock() } else { - bmgrLog.Warnf("No sync peer candidates available") + syncLog.Warnf("No sync peer candidates available") } } @@ -532,7 +532,7 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { wire.InitStateHeadBlockVotes, wire.InitStateTSpends) if err != nil { - bmgrLog.Errorf("Unexpected error while "+ + syncLog.Errorf("Unexpected error while "+ "building getinitstate msg: %v", err) return @@ -555,7 +555,7 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { return } - bmgrLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) + syncLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) // Initialize the peer state isSyncCandidate := b.isSyncCandidate(peer) @@ -583,7 +583,7 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { state, exists := b.peerStates[peer] if !exists { - bmgrLog.Warnf("Received done peer message for unknown peer %s", peer) + syncLog.Warnf("Received done peer message for unknown peer %s", peer) return } @@ -689,7 +689,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { peer := tmsg.peer state, exists := b.peerStates[peer] if !exists { - bmgrLog.Warnf("Received tx message from unknown peer %s", peer) + syncLog.Warnf("Received tx message from unknown peer %s", peer) return } @@ -707,7 +707,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // send a reject message here because if the transaction was already // rejected, the transaction was unsolicited. if _, exists = b.rejectedTxns[*txHash]; exists { - bmgrLog.Debugf("Ignoring unsolicited previously rejected "+ + syncLog.Debugf("Ignoring unsolicited previously rejected "+ "transaction %v from %s", txHash, peer) return } @@ -736,10 +736,10 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // so log it as an actual error. var rErr mempool.RuleError if errors.As(err, &rErr) { - bmgrLog.Debugf("Rejected transaction %v from %s: %v", + syncLog.Debugf("Rejected transaction %v from %s: %v", txHash, peer, err) } else { - bmgrLog.Errorf("Failed to process transaction %v: %v", + syncLog.Errorf("Failed to process transaction %v: %v", txHash, err) } @@ -909,7 +909,7 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh for i := 0; i < len(b.prevOrphans[*processHash]); i++ { orphan := b.prevOrphans[*processHash][i] if orphan == nil { - bmgrLog.Warnf("Found a nil entry at index %d in the orphan "+ + syncLog.Warnf("Found a nil entry at index %d in the orphan "+ "dependency list for block %v", i, processHash) continue } @@ -952,7 +952,7 @@ func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockc blockHash := block.Hash() forkLen, err := b.cfg.Chain.ProcessBlock(block, flags) if errors.Is(err, blockchain.ErrMissingParent) { - bmgrLog.Infof("Adding orphan block %v with parent %v", blockHash, + syncLog.Infof("Adding orphan block %v with parent %v", blockHash, block.MsgBlock().Header.PrevBlock) b.addOrphanBlock(block) @@ -988,14 +988,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer state, exists := b.peerStates[peer] if !exists { - bmgrLog.Warnf("Received block message from unknown peer %s", peer) + syncLog.Warnf("Received block message from unknown peer %s", peer) return } // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() if _, exists := state.requestedBlocks[*blockHash]; !exists { - bmgrLog.Warnf("Got unrequested block %v from %s -- "+ + syncLog.Warnf("Got unrequested block %v from %s -- "+ "disconnecting", blockHash, bmsg.peer.Addr()) bmsg.peer.Disconnect() return @@ -1041,16 +1041,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // it as an actual error. var rErr blockchain.RuleError if errors.As(err, &rErr) { - bmgrLog.Infof("Rejected block %v from %s: %v", blockHash, + syncLog.Infof("Rejected block %v from %s: %v", blockHash, peer, err) } else { - bmgrLog.Errorf("Failed to process block %v: %v", + syncLog.Errorf("Failed to process block %v: %v", blockHash, err) } var dbErr database.Error if errors.As(err, &dbErr) && dbErr.ErrorCode == database.ErrCorruption { - bmgrLog.Errorf("Critical failure: %v", dbErr.Error()) + syncLog.Errorf("Critical failure: %v", dbErr.Error()) } // Convert the error into an appropriate reject message and @@ -1066,13 +1066,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { orphanRoot := b.orphanRoot(blockHash) blkLocator, err := b.cfg.Chain.LatestBlockLocator() if err != nil { - bmgrLog.Warnf("Failed to get block locator for the "+ + syncLog.Warnf("Failed to get block locator for the "+ "latest block: %v", err) } else { locator := chainBlockLocatorToHashes(blkLocator) err = peer.PushGetBlocksMsg(locator, orphanRoot) if err != nil { - bmgrLog.Warnf("Failed to push getblocksmsg for the "+ + syncLog.Warnf("Failed to push getblocksmsg for the "+ "latest block: %v", err) } } @@ -1147,11 +1147,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { locator := []chainhash.Hash{*prevHash} err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - bmgrLog.Warnf("Failed to send getheaders message to "+ + syncLog.Warnf("Failed to send getheaders message to "+ "peer %s: %v", peer.Addr(), err) return } - bmgrLog.Infof("Downloading headers for blocks %d to %d from peer %s", + syncLog.Infof("Downloading headers for blocks %d to %d from peer %s", prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr()) return } @@ -1161,11 +1161,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // from the block after this one up to the end of the chain (zero hash). b.headersFirstMode = false b.headerList.Init() - bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode") + syncLog.Infof("Reached the final checkpoint -- switching to normal mode") locator := []chainhash.Hash{*blockHash} err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v", + syncLog.Warnf("Failed to send getblocks message to peer %s: %v", peer.Addr(), err) return } @@ -1183,7 +1183,7 @@ func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) { evictHeight := bestHeight - txscript.ProactiveEvictionDepth block, err := b.cfg.Chain.BlockByHeight(evictHeight) if err != nil { - bmgrLog.Warnf("Failed to retrieve the block at height %d: %v", + syncLog.Warnf("Failed to retrieve the block at height %d: %v", evictHeight, err) return } @@ -1196,7 +1196,7 @@ func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) { func (b *blockManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. if b.startHeader == nil { - bmgrLog.Warnf("fetchHeaderBlocks called with no start header") + syncLog.Warnf("fetchHeaderBlocks called with no start header") return } @@ -1208,14 +1208,14 @@ func (b *blockManager) fetchHeaderBlocks() { for e := b.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { - bmgrLog.Warn("Header list node type is not a headerNode") + syncLog.Warn("Header list node type is not a headerNode") continue } iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) haveInv, err := b.haveInventory(iv) if err != nil { - bmgrLog.Warnf("Unexpected failure when checking for "+ + syncLog.Warnf("Unexpected failure when checking for "+ "existing inventory during header block "+ "fetch: %v", err) continue @@ -1226,7 +1226,7 @@ func (b *blockManager) fetchHeaderBlocks() { syncPeerState.requestedBlocks[*node.hash] = struct{}{} err = gdmsg.AddInvVect(iv) if err != nil { - bmgrLog.Warnf("Failed to add invvect while fetching "+ + syncLog.Warnf("Failed to add invvect while fetching "+ "block headers: %v", err) } numRequested++ @@ -1246,7 +1246,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { peer := hmsg.peer _, exists := b.peerStates[peer] if !exists { - bmgrLog.Warnf("Received headers message from unknown peer %s", peer) + syncLog.Warnf("Received headers message from unknown peer %s", peer) return } @@ -1254,7 +1254,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { msg := hmsg.headers numHeaders := len(msg.Headers) if !b.headersFirstMode { - bmgrLog.Warnf("Got %d unrequested headers from %s -- "+ + syncLog.Warnf("Got %d unrequested headers from %s -- "+ "disconnecting", numHeaders, peer.Addr()) peer.Disconnect() return @@ -1276,7 +1276,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // Ensure there is a previous header to compare against. prevNodeEl := b.headerList.Back() if prevNodeEl == nil { - bmgrLog.Warnf("Header list does not contain a previous " + + syncLog.Warnf("Header list does not contain a previous " + "element as expected -- disconnecting peer") peer.Disconnect() return @@ -1293,7 +1293,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { b.startHeader = e } } else { - bmgrLog.Warnf("Received block header that does not "+ + syncLog.Warnf("Received block header that does not "+ "properly connect to the chain from peer %s "+ "-- disconnecting", peer.Addr()) peer.Disconnect() @@ -1304,11 +1304,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { if node.height == b.nextCheckpoint.Height { if node.hash.IsEqual(b.nextCheckpoint.Hash) { receivedCheckpoint = true - bmgrLog.Infof("Verified downloaded block "+ + syncLog.Infof("Verified downloaded block "+ "header against checkpoint at height "+ "%d/hash %s", node.height, node.hash) } else { - bmgrLog.Warnf("Block header at height %d/hash "+ + syncLog.Warnf("Block header at height %d/hash "+ "%s from peer %s does NOT match "+ "expected checkpoint hash of %s -- "+ "disconnecting", node.height, node.hash, @@ -1328,7 +1328,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // the next header links properly, it must be removed before // fetching the blocks. b.headerList.Remove(b.headerList.Front()) - bmgrLog.Infof("Received %v block headers: Fetching blocks", + syncLog.Infof("Received %v block headers: Fetching blocks", b.headerList.Len()) b.progressLogger.SetLastLogTime(time.Now()) b.fetchHeaderBlocks() @@ -1341,7 +1341,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { locator := []chainhash.Hash{*finalHash} err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - bmgrLog.Warnf("Failed to send getheaders message to "+ + syncLog.Warnf("Failed to send getheaders message to "+ "peer %s: %v", peer.Addr(), err) return } @@ -1352,7 +1352,7 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { peer := nfmsg.peer state, exists := b.peerStates[peer] if !exists { - bmgrLog.Warnf("Received notfound message from unknown peer %s", peer) + syncLog.Warnf("Received notfound message from unknown peer %s", peer) return } for _, inv := range nfmsg.notFound.InvList { @@ -1413,7 +1413,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { peer := imsg.peer state, exists := b.peerStates[peer] if !exists { - bmgrLog.Warnf("Received inv message from unknown peer %s", peer) + syncLog.Warnf("Received inv message from unknown peer %s", peer) return } @@ -1478,7 +1478,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Request the inventory if we don't already have it. haveInv, err := b.haveInventory(iv) if err != nil { - bmgrLog.Warnf("Unexpected failure when checking for "+ + syncLog.Warnf("Unexpected failure when checking for "+ "existing inventory during inv message "+ "processing: %v", err) continue @@ -1515,7 +1515,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { orphanRoot := b.orphanRoot(&iv.Hash) blkLocator, err := b.cfg.Chain.LatestBlockLocator() if err != nil { - bmgrLog.Errorf("PEER: Failed to get block "+ + syncLog.Errorf("PEER: Failed to get block "+ "locator for the latest block: "+ "%v", err) continue @@ -1523,7 +1523,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { locator := chainBlockLocatorToHashes(blkLocator) err = peer.PushGetBlocksMsg(locator, orphanRoot) if err != nil { - bmgrLog.Errorf("PEER: Failed to push getblocksmsg "+ + syncLog.Errorf("PEER: Failed to push getblocksmsg "+ "for orphan chain: %v", err) } continue @@ -1541,7 +1541,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { locator := chainBlockLocatorToHashes(blkLocator) err = imsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - bmgrLog.Errorf("PEER: Failed to push getblocksmsg: "+ + syncLog.Errorf("PEER: Failed to push getblocksmsg: "+ "%v", err) } } @@ -1734,7 +1734,7 @@ out: } default: - bmgrLog.Warnf("Invalid message type in block handler: %T", msg) + syncLog.Warnf("Invalid message type in block handler: %T", msg) } case <-b.quit: @@ -1743,7 +1743,7 @@ out: } b.wg.Done() - bmgrLog.Trace("Block handler done") + syncLog.Trace("Block handler done") } // NewPeer informs the block manager of a newly active peer. @@ -1830,7 +1830,7 @@ func (b *blockManager) Start() { return } - bmgrLog.Trace("Starting block manager") + syncLog.Trace("Starting block manager") b.wg.Add(1) go b.blockHandler() } @@ -1839,12 +1839,12 @@ func (b *blockManager) Start() { // handlers and waiting for them to finish. func (b *blockManager) Stop() error { if atomic.AddInt32(&b.shutdown, 1) != 1 { - bmgrLog.Warnf("Block manager is already in the process of " + + syncLog.Warnf("Block manager is already in the process of " + "shutting down") return nil } - bmgrLog.Infof("Block manager shutting down") + syncLog.Infof("Block manager shutting down") close(b.quit) b.wg.Wait() return nil @@ -2019,7 +2019,7 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: progresslog.New("Processed", bmgrLog), + progressLogger: progresslog.New("Processed", syncLog), msgChan: make(chan interface{}, config.MaxPeers*3), headerList: list.New(), quit: make(chan struct{}), @@ -2036,7 +2036,7 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { bm.resetHeaderState(&best.Hash, best.Height) } } else { - bmgrLog.Info("Checkpoints are disabled") + syncLog.Info("Checkpoints are disabled") } bm.syncHeightMtx.Lock() diff --git a/docs/json_rpc_api.mediawiki b/docs/json_rpc_api.mediawiki index d1fe4c520b..675b916b1e 100644 --- a/docs/json_rpc_api.mediawiki +++ b/docs/json_rpc_api.mediawiki @@ -549,7 +549,7 @@ the method name for further details such as parameter and return information. : Dynamically changes the debug logging level. : The levelspec can either be a debug level or of the form =,=,... : The valid debug levels are trace, debug, info, warn, error, and critical. -: The valid subsystems are AMGR, ADXR, BCDB, BMGR, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, and TXMP. +: The valid subsystems are AMGR, ADXR, BCDB, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, SYNC, and TXMP. : Additionally, the special keyword show can be used to get a list of the available subsystems. |- !Returns @@ -559,7 +559,7 @@ the method name for further details such as parameter and return information. |Done. |- !Example show Return -|Supported subsystems [AMGR ADXR BCDB BMGR DCRD CHAN DISC PEER RPCS SCRP SRVR TXMP] +|Supported subsystems [AMGR ADXR BCDB DCRD CHAN DISC PEER RPCS SCRP SRVR SYNC TXMP] |} ---- diff --git a/internal/rpcserver/rpcserverhelp.go b/internal/rpcserver/rpcserverhelp.go index cc000d0c4c..5006d7a8a2 100644 --- a/internal/rpcserver/rpcserverhelp.go +++ b/internal/rpcserver/rpcserverhelp.go @@ -22,7 +22,7 @@ var helpDescsEnUS = map[string]string{ "The levelspec can either a debug level or of the form:\n" + "=,=,...\n" + "The valid debug levels are trace, debug, info, warn, error, and critical.\n" + - "The valid subsystems are AMGR, ADXR, BCDB, BMGR, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, and TXMP.\n" + + "The valid subsystems are AMGR, ADXR, BCDB, DCRD, CHAN, DISC, PEER, RPCS, SCRP, SRVR, SYNC, and TXMP.\n" + "Finally the keyword 'show' will return a list of the available subsystems.", "debuglevel-levelspec": "The debug level(s) to use or the keyword 'show'", "debuglevel--condition0": "levelspec!=show", diff --git a/log.go b/log.go index 28843d3600..b199958acc 100644 --- a/log.go +++ b/log.go @@ -60,7 +60,6 @@ var ( adxrLog = backendLog.Logger("ADXR") amgrLog = backendLog.Logger("AMGR") bcdbLog = backendLog.Logger("BCDB") - bmgrLog = backendLog.Logger("BMGR") chanLog = backendLog.Logger("CHAN") cmgrLog = backendLog.Logger("CMGR") dcrdLog = backendLog.Logger("DCRD") @@ -73,6 +72,7 @@ var ( scrpLog = backendLog.Logger("SCRP") srvrLog = backendLog.Logger("SRVR") stkeLog = backendLog.Logger("STKE") + syncLog = backendLog.Logger("SYNC") txmpLog = backendLog.Logger("TXMP") trsyLog = backendLog.Logger("TRSY") ) @@ -100,7 +100,6 @@ var subsystemLoggers = map[string]slog.Logger{ "ADXR": adxrLog, "AMGR": amgrLog, "BCDB": bcdbLog, - "BMGR": bmgrLog, "CHAN": chanLog, "CMGR": cmgrLog, "DCRD": dcrdLog, @@ -113,6 +112,7 @@ var subsystemLoggers = map[string]slog.Logger{ "SCRP": scrpLog, "SRVR": srvrLog, "STKE": stkeLog, + "SYNC": syncLog, "TXMP": txmpLog, "TRSY": trsyLog, } diff --git a/server.go b/server.go index 05d5dfe4c7..a299883986 100644 --- a/server.go +++ b/server.go @@ -2647,7 +2647,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // which could result in a deadlock. block, ok := notification.Data.(*dcrutil.Block) if !ok { - bmgrLog.Warnf("New tip block checked notification is not a block.") + syncLog.Warnf("New tip block checked notification is not a block.") break } @@ -2671,7 +2671,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat band, ok := notification.Data.(*blockchain.BlockAcceptedNtfnsData) if !ok { - bmgrLog.Warnf("Chain accepted notification is not " + + syncLog.Warnf("Chain accepted notification is not " + "BlockAcceptedNtfnsData.") break } @@ -2715,7 +2715,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // blockchain. wt, _, _, err := s.chain.LotteryDataForBlock(blockHash) if err != nil { - bmgrLog.Errorf("Couldn't calculate winning tickets for "+ + syncLog.Errorf("Couldn't calculate winning tickets for "+ "accepted block %v: %v", blockHash, err.Error()) } else { // Notify registered websocket clients of newly eligible tickets @@ -2760,8 +2760,8 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTBlockConnected: ntfn, ok := notification.Data.(*blockchain.BlockConnectedNtfnsData) if !ok { - bmgrLog.Warnf("Block connected notification is not " + - "BlockConnectedNtfnsData.") + syncLog.Warnf("Block connected notification is not " + + "BlockConnectedNtfnsData") break } block := ntfn.Block @@ -2870,7 +2870,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTSpentAndMissedTickets: tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) if !ok { - bmgrLog.Warnf("Tickets connected notification is not " + + syncLog.Warnf("Tickets connected notification is not " + "TicketNotificationsData") break } @@ -2883,7 +2883,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTNewTickets: tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) if !ok { - bmgrLog.Warnf("Tickets connected notification is not " + + syncLog.Warnf("Tickets connected notification is not " + "TicketNotificationsData") break } @@ -2896,7 +2896,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTBlockDisconnected: ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData) if !ok { - bmgrLog.Warnf("Block disconnected notification is not " + + syncLog.Warnf("Block disconnected notification is not " + "BlockDisconnectedNtfnsData.") break } @@ -2989,7 +2989,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat case blockchain.NTReorganization: rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData) if !ok { - bmgrLog.Warnf("Chain reorganization notification is malformed") + syncLog.Warnf("Chain reorganization notification is malformed") break } From f20f1d5fb15bc5f5731cab218ecaac315e510d89 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 15:10:24 -0600 Subject: [PATCH 3/6] server: Add temp sync manager interface. This adds a temporary sync manager interface to facilitate cleaner diffs when moving the block manager to a new package. This is part of an overall effort to split the block manager out into a separate internal netsync package. --- blockmanager.go | 24 ++++++++++++++++++++++++ rpcadaptors.go | 2 +- server.go | 2 +- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 5d01cd877d..e130adfe56 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -2010,6 +2010,30 @@ func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) { return b.cfg.Chain.TicketPoolValue() } +// syncManager is a temporary interface to facilitate cleaner diffs when moving +// the block manager to a new package, so none of its members are documented. +// It will be removed in a future commit. +type syncManager interface { + NewPeer(p *peerpkg.Peer) + IsCurrent() bool + TipGeneration() ([]chainhash.Hash, error) + RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error + QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) + QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) + QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) + QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) + QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) + DonePeer(peer *peerpkg.Peer) + Start() + Stop() error + ForceReorganization(formerBest, newBest chainhash.Hash) error + ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) + SyncPeerID() int32 + SyncHeight() int64 + ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, + allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) +} + // newBlockManager returns a new Decred block manager. // Use Start to begin processing asynchronous block and inv updates. func newBlockManager(config *blockManagerConfig) (*blockManager, error) { diff --git a/rpcadaptors.go b/rpcadaptors.go index 0631c35e6e..88633b26ab 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -310,7 +310,7 @@ func (*rpcConnManager) Lookup(host string) ([]net.IP, error) { // rpcserver.SyncManager interface. type rpcSyncMgr struct { server *server - syncMgr *blockManager + syncMgr syncManager } // Ensure rpcSyncMgr implements the rpcserver.SyncManager interface. diff --git a/server.go b/server.go index a299883986..3f7d65cf11 100644 --- a/server.go +++ b/server.go @@ -462,7 +462,7 @@ type server struct { sigCache *txscript.SigCache subsidyCache *standalone.SubsidyCache rpcServer *rpcserver.Server - syncManager *blockManager + syncManager syncManager bg *mining.BgBlkTmplGenerator chain *blockchain.BlockChain txMemPool *mempool.TxPool From 1229cc35890e9677ca33a621bbb48ba7c834bf3d Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 15:10:25 -0600 Subject: [PATCH 4/6] netsync: Split blockmanager into separate package. This does the minimum work necessary to refactor the block manager into its own internal package named netsync. The main motivation is that separating this code into its own package will improve its testability and more cleanly split the details related to syncing with the network from consensus validation. The API will certainly need some additional cleanup and changes to make it more usable outside of the specific circumstances it currently supports, however it is better to do that in future commits in order to keep the changeset as small as possible during this refactor. Overview of the major changes: - Create the new package - Move blockmanager.go -> internal/netsync/manager.go - Update logging to use the new netsync package logger - Rename blockManagerConfig to Config (now netsync.Config) - Move peerNotifier to netsync/interface.go (now netsync.PeerNotifier) - Move syncManager to netsync/interface.go (now netsync.SyncManager) - Rename newBlockManager to New (now netsync.New) - Update all references to the block manager to use the package - Add package logger - Initialize new package logger with sync logger --- internal/netsync/interface.go | 50 +++ internal/netsync/log.go | 22 ++ .../netsync/manager.go | 296 ++++++++---------- log.go | 2 + rpcadaptors.go | 3 +- server.go | 6 +- 6 files changed, 205 insertions(+), 174 deletions(-) create mode 100644 internal/netsync/interface.go create mode 100644 internal/netsync/log.go rename blockmanager.go => internal/netsync/manager.go (90%) diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go new file mode 100644 index 0000000000..d1e4f7963b --- /dev/null +++ b/internal/netsync/interface.go @@ -0,0 +1,50 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package netsync + +import ( + "github.com/decred/dcrd/blockchain/v4" + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/dcrutil/v3" + "github.com/decred/dcrd/internal/mempool" + "github.com/decred/dcrd/peer/v2" + "github.com/decred/dcrd/wire" +) + +// PeerNotifier provides an interface to notify peers of status changes related +// to blocks and transactions. +type PeerNotifier interface { + // AnnounceNewTransactions generates and relays inventory vectors and + // notifies websocket clients of the passed transactions. + AnnounceNewTransactions(txns []*dcrutil.Tx) + + // UpdatePeerHeights updates the heights of all peers who have announced the + // latest connected main chain block, or a recognized orphan. + UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peer.Peer) +} + +// SyncManager is a temporary interface to facilitate cleaner diffs when moving +// the block manager to a new package, so none of its members are documented. +// It will be removed in a future commit. +type SyncManager interface { + NewPeer(p *peer.Peer) + IsCurrent() bool + TipGeneration() ([]chainhash.Hash, error) + RequestFromPeer(p *peer.Peer, blocks, txs []*chainhash.Hash) error + QueueTx(tx *dcrutil.Tx, peer *peer.Peer, done chan struct{}) + QueueBlock(block *dcrutil.Block, peer *peer.Peer, done chan struct{}) + QueueInv(inv *wire.MsgInv, peer *peer.Peer) + QueueHeaders(headers *wire.MsgHeaders, peer *peer.Peer) + QueueNotFound(notFound *wire.MsgNotFound, peer *peer.Peer) + DonePeer(peer *peer.Peer) + Start() + Stop() error + ForceReorganization(formerBest, newBest chainhash.Hash) error + ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) + SyncPeerID() int32 + SyncHeight() int64 + ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, + allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) +} diff --git a/internal/netsync/log.go b/internal/netsync/log.go new file mode 100644 index 0000000000..cbe84d3a1e --- /dev/null +++ b/internal/netsync/log.go @@ -0,0 +1,22 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package netsync + +import ( + "github.com/decred/slog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +// The default amount of logging is none. +var log = slog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using slog. +func UseLogger(logger slog.Logger) { + log = logger +} diff --git a/blockmanager.go b/internal/netsync/manager.go similarity index 90% rename from blockmanager.go rename to internal/netsync/manager.go index e130adfe56..c830480a6d 100644 --- a/blockmanager.go +++ b/internal/netsync/manager.go @@ -3,7 +3,7 @@ // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. -package main +package netsync import ( "container/list" @@ -13,7 +13,6 @@ import ( "sync/atomic" "time" - "github.com/decred/dcrd/blockchain/standalone/v2" "github.com/decred/dcrd/blockchain/v4" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/chaincfg/v3" @@ -197,55 +196,6 @@ type headerNode struct { hash *chainhash.Hash } -// peerNotifier provides an interface for server peer notifications. -type peerNotifier interface { - // AnnounceNewTransactions generates and relays inventory vectors and - // notifies websocket clients of the passed transactions. - AnnounceNewTransactions(txns []*dcrutil.Tx) - - // UpdatePeerHeights updates the heights of all peers who have - // announced the latest connected main chain block, or a recognized orphan. - UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peerpkg.Peer) -} - -// blockManangerConfig is a configuration struct for a blockManager. -type blockManagerConfig struct { - PeerNotifier peerNotifier - - // The following fields are for accessing the chain and its configuration. - Chain *blockchain.BlockChain - ChainParams *chaincfg.Params - SubsidyCache *standalone.SubsidyCache - - // SigCache defines the signature cache to use. - SigCache *txscript.SigCache - - // The following field provides access to the mempool. - TxMemPool *mempool.TxPool - - // RpcServer returns an instance of an RPC server to use for notifications. - // It may return nil if there is no active RPC server. - RpcServer func() *rpcserver.Server - - // DisableCheckpoints indicates whether or not the block manager should make - // use of checkpoints. - DisableCheckpoints bool - - // NoMiningStateSync indicates whether or not the block manager should - // perform an initial mining state synchronization with peers once they are - // believed to be fully synced. - NoMiningStateSync bool - - // MaxPeers specifies the maximum number of peers the server is expected to - // be connected with. It is primarily used as a hint for more efficient - // synchronization. - MaxPeers int - - // MaxOrphanTxs specifies the maximum number of orphan transactions the - // transaction pool associated with the server supports. - MaxOrphanTxs int -} - // peerSyncState stores additional information that the blockManager tracks // about a peer. type peerSyncState struct { @@ -265,7 +215,7 @@ type orphanBlock struct { // blockManager provides a concurrency safe block manager for handling all // incoming blocks. type blockManager struct { - cfg *blockManagerConfig + cfg Config started int32 shutdown int32 rejectedTxns map[chainhash.Hash]struct{} @@ -422,13 +372,13 @@ func (b *blockManager) startSync() { blkLocator, err := b.cfg.Chain.LatestBlockLocator() if err != nil { - syncLog.Errorf("Failed to get block locator for the "+ - "latest block: %v", err) + log.Errorf("Failed to get block locator for the latest block: %v", + err) return } locator := chainBlockLocatorToHashes(blkLocator) - syncLog.Infof("Syncing to block height %d from peer %v", + log.Infof("Syncing to block height %d from peer %v", bestPeer.LastBlock(), bestPeer.Addr()) // The chain is not synced whenever the current best height is less than @@ -462,19 +412,18 @@ func (b *blockManager) startSync() { err := bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - syncLog.Errorf("Failed to push getheadermsg for the "+ - "latest blocks: %v", err) + log.Errorf("Failed to push getheadermsg for the latest "+ + "blocks: %v", err) return } b.headersFirstMode = true - syncLog.Infof("Downloading headers for blocks %d to "+ - "%d from peer %s", best.Height+1, - b.nextCheckpoint.Height, bestPeer.Addr()) + log.Infof("Downloading headers for blocks %d to %d from peer %s", + best.Height+1, b.nextCheckpoint.Height, bestPeer.Addr()) } else { err := bestPeer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - syncLog.Errorf("Failed to push getblocksmsg for the "+ - "latest blocks: %v", err) + log.Errorf("Failed to push getblocksmsg for the latest "+ + "blocks: %v", err) return } } @@ -483,7 +432,7 @@ func (b *blockManager) startSync() { b.syncHeight = bestPeer.LastBlock() b.syncHeightMtx.Unlock() } else { - syncLog.Warnf("No sync peer candidates available") + log.Warnf("No sync peer candidates available") } } @@ -532,9 +481,8 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { wire.InitStateHeadBlockVotes, wire.InitStateTSpends) if err != nil { - syncLog.Errorf("Unexpected error while "+ - "building getinitstate msg: %v", - err) + log.Errorf("Unexpected error while building getinitstate "+ + "msg: %v", err) return } msg = m @@ -555,7 +503,7 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { return } - syncLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) + log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) // Initialize the peer state isSyncCandidate := b.isSyncCandidate(peer) @@ -583,7 +531,7 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { state, exists := b.peerStates[peer] if !exists { - syncLog.Warnf("Received done peer message for unknown peer %s", peer) + log.Warnf("Received done peer message for unknown peer %s", peer) return } @@ -689,7 +637,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { peer := tmsg.peer state, exists := b.peerStates[peer] if !exists { - syncLog.Warnf("Received tx message from unknown peer %s", peer) + log.Warnf("Received tx message from unknown peer %s", peer) return } @@ -707,8 +655,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // send a reject message here because if the transaction was already // rejected, the transaction was unsolicited. if _, exists = b.rejectedTxns[*txHash]; exists { - syncLog.Debugf("Ignoring unsolicited previously rejected "+ - "transaction %v from %s", txHash, peer) + log.Debugf("Ignoring unsolicited previously rejected transaction %v "+ + "from %s", txHash, peer) return } @@ -736,11 +684,9 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // so log it as an actual error. var rErr mempool.RuleError if errors.As(err, &rErr) { - syncLog.Debugf("Rejected transaction %v from %s: %v", - txHash, peer, err) + log.Debugf("Rejected transaction %v from %s: %v", txHash, peer, err) } else { - syncLog.Errorf("Failed to process transaction %v: %v", - txHash, err) + log.Errorf("Failed to process transaction %v: %v", txHash, err) } // Convert the error into an appropriate reject message and @@ -909,7 +855,7 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh for i := 0; i < len(b.prevOrphans[*processHash]); i++ { orphan := b.prevOrphans[*processHash][i] if orphan == nil { - syncLog.Warnf("Found a nil entry at index %d in the orphan "+ + log.Warnf("Found a nil entry at index %d in the orphan "+ "dependency list for block %v", i, processHash) continue } @@ -952,7 +898,7 @@ func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockc blockHash := block.Hash() forkLen, err := b.cfg.Chain.ProcessBlock(block, flags) if errors.Is(err, blockchain.ErrMissingParent) { - syncLog.Infof("Adding orphan block %v with parent %v", blockHash, + log.Infof("Adding orphan block %v with parent %v", blockHash, block.MsgBlock().Header.PrevBlock) b.addOrphanBlock(block) @@ -988,15 +934,15 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer state, exists := b.peerStates[peer] if !exists { - syncLog.Warnf("Received block message from unknown peer %s", peer) + log.Warnf("Received block message from unknown peer %s", peer) return } // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() if _, exists := state.requestedBlocks[*blockHash]; !exists { - syncLog.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, bmsg.peer.Addr()) + log.Warnf("Got unrequested block %v from %s -- disconnecting", + blockHash, bmsg.peer.Addr()) bmsg.peer.Disconnect() return } @@ -1041,16 +987,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // it as an actual error. var rErr blockchain.RuleError if errors.As(err, &rErr) { - syncLog.Infof("Rejected block %v from %s: %v", blockHash, - peer, err) + log.Infof("Rejected block %v from %s: %v", blockHash, peer, err) } else { - syncLog.Errorf("Failed to process block %v: %v", - blockHash, err) + log.Errorf("Failed to process block %v: %v", blockHash, err) } var dbErr database.Error if errors.As(err, &dbErr) && dbErr.ErrorCode == database.ErrCorruption { - syncLog.Errorf("Critical failure: %v", dbErr.Error()) + log.Errorf("Critical failure: %v", err) } // Convert the error into an appropriate reject message and @@ -1066,14 +1010,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { orphanRoot := b.orphanRoot(blockHash) blkLocator, err := b.cfg.Chain.LatestBlockLocator() if err != nil { - syncLog.Warnf("Failed to get block locator for the "+ - "latest block: %v", err) + log.Warnf("Failed to get block locator for the latest block: %v", + err) } else { locator := chainBlockLocatorToHashes(blkLocator) err = peer.PushGetBlocksMsg(locator, orphanRoot) if err != nil { - syncLog.Warnf("Failed to push getblocksmsg for the "+ - "latest block: %v", err) + log.Warnf("Failed to push getblocksmsg for the latest block: "+ + "%v", err) } } } else { @@ -1147,11 +1091,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { locator := []chainhash.Hash{*prevHash} err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - syncLog.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", peer.Addr(), err) + log.Warnf("Failed to send getheaders message to peer %s: %v", + peer.Addr(), err) return } - syncLog.Infof("Downloading headers for blocks %d to %d from peer %s", + log.Infof("Downloading headers for blocks %d to %d from peer %s", prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr()) return } @@ -1161,11 +1105,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // from the block after this one up to the end of the chain (zero hash). b.headersFirstMode = false b.headerList.Init() - syncLog.Infof("Reached the final checkpoint -- switching to normal mode") + log.Infof("Reached the final checkpoint -- switching to normal mode") locator := []chainhash.Hash{*blockHash} err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - syncLog.Warnf("Failed to send getblocks message to peer %s: %v", + log.Warnf("Failed to send getblocks message to peer %s: %v", peer.Addr(), err) return } @@ -1183,8 +1127,8 @@ func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) { evictHeight := bestHeight - txscript.ProactiveEvictionDepth block, err := b.cfg.Chain.BlockByHeight(evictHeight) if err != nil { - syncLog.Warnf("Failed to retrieve the block at height %d: %v", - evictHeight, err) + log.Warnf("Failed to retrieve the block at height %d: %v", evictHeight, + err) return } @@ -1196,7 +1140,7 @@ func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) { func (b *blockManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. if b.startHeader == nil { - syncLog.Warnf("fetchHeaderBlocks called with no start header") + log.Warnf("fetchHeaderBlocks called with no start header") return } @@ -1208,16 +1152,15 @@ func (b *blockManager) fetchHeaderBlocks() { for e := b.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { - syncLog.Warn("Header list node type is not a headerNode") + log.Warn("Header list node type is not a headerNode") continue } iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) haveInv, err := b.haveInventory(iv) if err != nil { - syncLog.Warnf("Unexpected failure when checking for "+ - "existing inventory during header block "+ - "fetch: %v", err) + log.Warnf("Unexpected failure when checking for existing "+ + "inventory during header block fetch: %v", err) continue } if !haveInv { @@ -1226,8 +1169,8 @@ func (b *blockManager) fetchHeaderBlocks() { syncPeerState.requestedBlocks[*node.hash] = struct{}{} err = gdmsg.AddInvVect(iv) if err != nil { - syncLog.Warnf("Failed to add invvect while fetching "+ - "block headers: %v", err) + log.Warnf("Failed to add invvect while fetching block "+ + "headers: %v", err) } numRequested++ } @@ -1246,7 +1189,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { peer := hmsg.peer _, exists := b.peerStates[peer] if !exists { - syncLog.Warnf("Received headers message from unknown peer %s", peer) + log.Warnf("Received headers message from unknown peer %s", peer) return } @@ -1254,8 +1197,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { msg := hmsg.headers numHeaders := len(msg.Headers) if !b.headersFirstMode { - syncLog.Warnf("Got %d unrequested headers from %s -- "+ - "disconnecting", numHeaders, peer.Addr()) + log.Warnf("Got %d unrequested headers from %s -- disconnecting", + numHeaders, peer.Addr()) peer.Disconnect() return } @@ -1276,8 +1219,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // Ensure there is a previous header to compare against. prevNodeEl := b.headerList.Back() if prevNodeEl == nil { - syncLog.Warnf("Header list does not contain a previous " + - "element as expected -- disconnecting peer") + log.Warnf("Header list does not contain a previous element as " + + "expected -- disconnecting peer") peer.Disconnect() return } @@ -1293,9 +1236,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { b.startHeader = e } } else { - syncLog.Warnf("Received block header that does not "+ - "properly connect to the chain from peer %s "+ - "-- disconnecting", peer.Addr()) + log.Warnf("Received block header that does not properly connect "+ + "to the chain from peer %s -- disconnecting", peer.Addr()) peer.Disconnect() return } @@ -1304,15 +1246,13 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { if node.height == b.nextCheckpoint.Height { if node.hash.IsEqual(b.nextCheckpoint.Hash) { receivedCheckpoint = true - syncLog.Infof("Verified downloaded block "+ - "header against checkpoint at height "+ - "%d/hash %s", node.height, node.hash) + log.Infof("Verified downloaded block header against "+ + "checkpoint at height %d/hash %s", node.height, node.hash) } else { - syncLog.Warnf("Block header at height %d/hash "+ - "%s from peer %s does NOT match "+ - "expected checkpoint hash of %s -- "+ - "disconnecting", node.height, node.hash, - peer.Addr(), b.nextCheckpoint.Hash) + log.Warnf("Block header at height %d/hash %s from peer %s "+ + "does NOT match expected checkpoint hash of %s -- "+ + "disconnecting", node.height, node.hash, peer.Addr(), + b.nextCheckpoint.Hash) peer.Disconnect() return } @@ -1328,7 +1268,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // the next header links properly, it must be removed before // fetching the blocks. b.headerList.Remove(b.headerList.Front()) - syncLog.Infof("Received %v block headers: Fetching blocks", + log.Infof("Received %v block headers: Fetching blocks", b.headerList.Len()) b.progressLogger.SetLastLogTime(time.Now()) b.fetchHeaderBlocks() @@ -1341,8 +1281,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { locator := []chainhash.Hash{*finalHash} err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - syncLog.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", peer.Addr(), err) + log.Warnf("Failed to send getheaders message to peer %s: %v", + peer.Addr(), err) return } } @@ -1352,7 +1292,7 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { peer := nfmsg.peer state, exists := b.peerStates[peer] if !exists { - syncLog.Warnf("Received notfound message from unknown peer %s", peer) + log.Warnf("Received notfound message from unknown peer %s", peer) return } for _, inv := range nfmsg.notFound.InvList { @@ -1413,7 +1353,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { peer := imsg.peer state, exists := b.peerStates[peer] if !exists { - syncLog.Warnf("Received inv message from unknown peer %s", peer) + log.Warnf("Received inv message from unknown peer %s", peer) return } @@ -1478,9 +1418,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Request the inventory if we don't already have it. haveInv, err := b.haveInventory(iv) if err != nil { - syncLog.Warnf("Unexpected failure when checking for "+ - "existing inventory during inv message "+ - "processing: %v", err) + log.Warnf("Unexpected failure when checking for existing "+ + "inventory during inv message processing: %v", err) continue } if !haveInv { @@ -1515,16 +1454,15 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { orphanRoot := b.orphanRoot(&iv.Hash) blkLocator, err := b.cfg.Chain.LatestBlockLocator() if err != nil { - syncLog.Errorf("PEER: Failed to get block "+ - "locator for the latest block: "+ - "%v", err) + log.Errorf("Failed to get block locator for the latest "+ + "block: %v", err) continue } locator := chainBlockLocatorToHashes(blkLocator) err = peer.PushGetBlocksMsg(locator, orphanRoot) if err != nil { - syncLog.Errorf("PEER: Failed to push getblocksmsg "+ - "for orphan chain: %v", err) + log.Errorf("Failed to push getblocksmsg for orphan chain: "+ + "%v", err) } continue } @@ -1541,8 +1479,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { locator := chainBlockLocatorToHashes(blkLocator) err = imsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { - syncLog.Errorf("PEER: Failed to push getblocksmsg: "+ - "%v", err) + log.Errorf("PEER: Failed to push getblocksmsg: %v", err) } } } @@ -1734,7 +1671,7 @@ out: } default: - syncLog.Warnf("Invalid message type in block handler: %T", msg) + log.Warnf("Invalid message type in block handler: %T", msg) } case <-b.quit: @@ -1743,7 +1680,7 @@ out: } b.wg.Done() - syncLog.Trace("Block handler done") + log.Trace("Block handler done") } // NewPeer informs the block manager of a newly active peer. @@ -1830,7 +1767,7 @@ func (b *blockManager) Start() { return } - syncLog.Trace("Starting block manager") + log.Trace("Starting block manager") b.wg.Add(1) go b.blockHandler() } @@ -1839,12 +1776,11 @@ func (b *blockManager) Start() { // handlers and waiting for them to finish. func (b *blockManager) Stop() error { if atomic.AddInt32(&b.shutdown, 1) != 1 { - syncLog.Warnf("Block manager is already in the process of " + - "shutting down") + log.Warnf("Block manager is already in the process of shutting down") return nil } - syncLog.Infof("Block manager shutting down") + log.Infof("Block manager shutting down") close(b.quit) b.wg.Wait() return nil @@ -2010,40 +1946,60 @@ func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) { return b.cfg.Chain.TicketPoolValue() } -// syncManager is a temporary interface to facilitate cleaner diffs when moving -// the block manager to a new package, so none of its members are documented. -// It will be removed in a future commit. -type syncManager interface { - NewPeer(p *peerpkg.Peer) - IsCurrent() bool - TipGeneration() ([]chainhash.Hash, error) - RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error - QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) - QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) - QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) - QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) - QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) - DonePeer(peer *peerpkg.Peer) - Start() - Stop() error - ForceReorganization(formerBest, newBest chainhash.Hash) error - ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) - SyncPeerID() int32 - SyncHeight() int64 - ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, - allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) -} - -// newBlockManager returns a new Decred block manager. -// Use Start to begin processing asynchronous block and inv updates. -func newBlockManager(config *blockManagerConfig) (*blockManager, error) { +// Config holds the configuration options related to the network chain +// synchronization manager. +type Config struct { + // PeerNotifier specifies an implementation to use for notifying peers of + // status changes related to blocks and transactions. + PeerNotifier PeerNotifier + + // ChainParams identifies which chain parameters the manager is associated + // with. + ChainParams *chaincfg.Params + + // Chain specifies the chain instance to use for processing blocks and + // transactions. + Chain *blockchain.BlockChain + + // SigCache defines the signature cache to use when validating signatures. + SigCache *txscript.SigCache + + // TxMemPool specifies the mempool to use for processing transactions. + TxMemPool *mempool.TxPool + + // RpcServer returns an instance of an RPC server to use for notifications. + // It may return nil if there is no active RPC server. + RpcServer func() *rpcserver.Server + + // DisableCheckpoints indicates whether or not the block manager should make + // use of checkpoints. + DisableCheckpoints bool + + // NoMiningStateSync indicates whether or not the block manager should + // perform an initial mining state synchronization with peers once they are + // believed to be fully synced. + NoMiningStateSync bool + + // MaxPeers specifies the maximum number of peers the server is expected to + // be connected with. It is primarily used as a hint for more efficient + // synchronization. + MaxPeers int + + // MaxOrphanTxs specifies the maximum number of orphan transactions the + // transaction pool associated with the server supports. + MaxOrphanTxs int +} + +// New returns a new network chain synchronization manager. Use Start to begin +// processing asynchronous block and inv updates. +func New(config *Config) (SyncManager, error) { bm := blockManager{ - cfg: config, + cfg: *config, rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: progresslog.New("Processed", syncLog), + progressLogger: progresslog.New("Processed", log), msgChan: make(chan interface{}, config.MaxPeers*3), headerList: list.New(), quit: make(chan struct{}), @@ -2060,7 +2016,7 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { bm.resetHeaderState(&best.Hash, best.Height) } } else { - syncLog.Info("Checkpoints are disabled") + log.Info("Checkpoints are disabled") } bm.syncHeightMtx.Lock() diff --git a/log.go b/log.go index b199958acc..e7289a6c61 100644 --- a/log.go +++ b/log.go @@ -20,6 +20,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/mining/cpuminer" + "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/txscript/v3" @@ -92,6 +93,7 @@ func init() { peer.UseLogger(peerLog) rpcserver.UseLogger(rpcsLog) stake.UseLogger(stkeLog) + netsync.UseLogger(syncLog) txscript.UseLogger(scrpLog) } diff --git a/rpcadaptors.go b/rpcadaptors.go index 88633b26ab..72c5998373 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -19,6 +19,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/mining/cpuminer" + "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/wire" @@ -310,7 +311,7 @@ func (*rpcConnManager) Lookup(host string) ([]net.IP, error) { // rpcserver.SyncManager interface. type rpcSyncMgr struct { server *server - syncMgr syncManager + syncMgr netsync.SyncManager } // Ensure rpcSyncMgr implements the rpcserver.SyncManager interface. diff --git a/server.go b/server.go index 3f7d65cf11..3e5fedd94c 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/mining" "github.com/decred/dcrd/internal/mining/cpuminer" + "github.com/decred/dcrd/internal/netsync" "github.com/decred/dcrd/internal/rpcserver" "github.com/decred/dcrd/internal/version" "github.com/decred/dcrd/lru" @@ -462,7 +463,7 @@ type server struct { sigCache *txscript.SigCache subsidyCache *standalone.SubsidyCache rpcServer *rpcserver.Server - syncManager syncManager + syncManager netsync.SyncManager bg *mining.BgBlkTmplGenerator chain *blockchain.BlockChain txMemPool *mempool.TxPool @@ -3645,12 +3646,11 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP }, } s.txMemPool = mempool.New(&txC) - s.syncManager, err = newBlockManager(&blockManagerConfig{ + s.syncManager, err = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, ChainParams: s.chainParams, SigCache: s.sigCache, - SubsidyCache: s.subsidyCache, TxMemPool: s.txMemPool, RpcServer: func() *rpcserver.Server { return s.rpcServer From 4a9380cce8465557a87059999f0ea8428dcb56c7 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 15:10:26 -0600 Subject: [PATCH 5/6] netsync: Rename blockManager to SyncManager. This removes the temporary netsync.SyncManager interface that was added to facilitate refactoring the block manager to the new netsync package and renames the blockManager struct to SyncManager so it is exported. It also updates callers to use the concrete struct versus the removed interface. --- internal/netsync/interface.go | 27 -- internal/netsync/manager.go | 628 +++++++++++++++++----------------- rpcadaptors.go | 2 +- server.go | 2 +- 4 files changed, 316 insertions(+), 343 deletions(-) diff --git a/internal/netsync/interface.go b/internal/netsync/interface.go index d1e4f7963b..4a9ac79b55 100644 --- a/internal/netsync/interface.go +++ b/internal/netsync/interface.go @@ -5,12 +5,9 @@ package netsync import ( - "github.com/decred/dcrd/blockchain/v4" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/dcrutil/v3" - "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/peer/v2" - "github.com/decred/dcrd/wire" ) // PeerNotifier provides an interface to notify peers of status changes related @@ -24,27 +21,3 @@ type PeerNotifier interface { // latest connected main chain block, or a recognized orphan. UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int64, updateSource *peer.Peer) } - -// SyncManager is a temporary interface to facilitate cleaner diffs when moving -// the block manager to a new package, so none of its members are documented. -// It will be removed in a future commit. -type SyncManager interface { - NewPeer(p *peer.Peer) - IsCurrent() bool - TipGeneration() ([]chainhash.Hash, error) - RequestFromPeer(p *peer.Peer, blocks, txs []*chainhash.Hash) error - QueueTx(tx *dcrutil.Tx, peer *peer.Peer, done chan struct{}) - QueueBlock(block *dcrutil.Block, peer *peer.Peer, done chan struct{}) - QueueInv(inv *wire.MsgInv, peer *peer.Peer) - QueueHeaders(headers *wire.MsgHeaders, peer *peer.Peer) - QueueNotFound(notFound *wire.MsgNotFound, peer *peer.Peer) - DonePeer(peer *peer.Peer) - Start() - Stop() error - ForceReorganization(formerBest, newBest chainhash.Hash) error - ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) - SyncPeerID() int32 - SyncHeight() int64 - ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, - allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) -} diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index c830480a6d..ce70b197ec 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -107,7 +107,7 @@ type getSyncPeerMsg struct { // requestFromPeerMsg is a message type to be sent across the message channel // for requesting either blocks or transactions from a given peer. It routes -// this through the block manager so the block manager doesn't ban the peer +// this through the sync manager so the sync manager doesn't ban the peer // when it sends this information back. type requestFromPeerMsg struct { peer *peerpkg.Peer @@ -196,7 +196,7 @@ type headerNode struct { hash *chainhash.Hash } -// peerSyncState stores additional information that the blockManager tracks +// peerSyncState stores additional information that the sync manager tracks // about a peer. type peerSyncState struct { syncCandidate bool @@ -212,9 +212,9 @@ type orphanBlock struct { expiration time.Time } -// blockManager provides a concurrency safe block manager for handling all +// SyncManager provides a concurrency safe sync manager for handling all // incoming blocks. -type blockManager struct { +type SyncManager struct { cfg Config started int32 shutdown int32 @@ -254,33 +254,33 @@ type blockManager struct { // resetHeaderState sets the headers-first mode state to values appropriate for // syncing from a new peer. -func (b *blockManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) { - b.headersFirstMode = false - b.headerList.Init() - b.startHeader = nil +func (m *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) { + m.headersFirstMode = false + m.headerList.Init() + m.startHeader = nil // When there is a next checkpoint, add an entry for the latest known // block into the header pool. This allows the next downloaded header // to prove it links to the chain properly. - if b.nextCheckpoint != nil { + if m.nextCheckpoint != nil { node := headerNode{height: newestHeight, hash: newestHash} - b.headerList.PushBack(&node) + m.headerList.PushBack(&node) } } // SyncHeight returns latest known block being synced to. -func (b *blockManager) SyncHeight() int64 { - b.syncHeightMtx.Lock() - defer b.syncHeightMtx.Unlock() - return b.syncHeight +func (m *SyncManager) SyncHeight() int64 { + m.syncHeightMtx.Lock() + defer m.syncHeightMtx.Unlock() + return m.syncHeight } // findNextHeaderCheckpoint returns the next checkpoint after the passed height. // It returns nil when there is not one either because the height is already // later than the final checkpoint or some other reason such as disabled // checkpoints. -func (b *blockManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint { - checkpoints := b.cfg.Chain.Checkpoints() +func (m *SyncManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint { + checkpoints := m.cfg.Chain.Checkpoints() if len(checkpoints) == 0 { return nil } @@ -321,15 +321,15 @@ func chainBlockLocatorToHashes(locator blockchain.BlockLocator) []chainhash.Hash // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. -func (b *blockManager) startSync() { +func (m *SyncManager) startSync() { // Return now if we're already syncing. - if b.syncPeer != nil { + if m.syncPeer != nil { return } - best := b.cfg.Chain.BestSnapshot() + best := m.cfg.Chain.BestSnapshot() var bestPeer *peerpkg.Peer - for peer, state := range b.peerStates { + for peer, state := range m.peerStates { if !state.syncCandidate { continue } @@ -358,9 +358,9 @@ func (b *blockManager) startSync() { // fully synced to whatever the chain believes when there is no candidate // for a sync peer. if bestPeer == nil { - b.isCurrentMtx.Lock() - b.isCurrent = b.cfg.Chain.IsCurrent() - b.isCurrentMtx.Unlock() + m.isCurrentMtx.Lock() + m.isCurrent = m.cfg.Chain.IsCurrent() + m.isCurrentMtx.Unlock() } // Start syncing from the best peer if one was selected. @@ -368,9 +368,9 @@ func (b *blockManager) startSync() { // Clear the requestedBlocks if the sync peer changes, otherwise // we may ignore blocks we need that the last sync peer failed // to send. - b.requestedBlocks = make(map[chainhash.Hash]struct{}) + m.requestedBlocks = make(map[chainhash.Hash]struct{}) - blkLocator, err := b.cfg.Chain.LatestBlockLocator() + blkLocator, err := m.cfg.Chain.LatestBlockLocator() if err != nil { log.Errorf("Failed to get block locator for the latest block: %v", err) @@ -384,9 +384,9 @@ func (b *blockManager) startSync() { // The chain is not synced whenever the current best height is less than // the height to sync to. if best.Height < bestPeer.LastBlock() { - b.isCurrentMtx.Lock() - b.isCurrent = false - b.isCurrentMtx.Unlock() + m.isCurrentMtx.Lock() + m.isCurrent = false + m.isCurrentMtx.Unlock() } // When the current height is less than a known checkpoint we @@ -406,19 +406,19 @@ func (b *blockManager) startSync() { // and fully validate them. Finally, regression test mode does // not support the headers-first approach so do normal block // downloads when in regression test mode. - if b.nextCheckpoint != nil && - best.Height < b.nextCheckpoint.Height && - !b.cfg.DisableCheckpoints { + if m.nextCheckpoint != nil && + best.Height < m.nextCheckpoint.Height && + !m.cfg.DisableCheckpoints { - err := bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := bestPeer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) if err != nil { log.Errorf("Failed to push getheadermsg for the latest "+ "blocks: %v", err) return } - b.headersFirstMode = true + m.headersFirstMode = true log.Infof("Downloading headers for blocks %d to %d from peer %s", - best.Height+1, b.nextCheckpoint.Height, bestPeer.Addr()) + best.Height+1, m.nextCheckpoint.Height, bestPeer.Addr()) } else { err := bestPeer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { @@ -427,10 +427,10 @@ func (b *blockManager) startSync() { return } } - b.syncPeer = bestPeer - b.syncHeightMtx.Lock() - b.syncHeight = bestPeer.LastBlock() - b.syncHeightMtx.Unlock() + m.syncPeer = bestPeer + m.syncHeightMtx.Lock() + m.syncHeight = bestPeer.LastBlock() + m.syncHeightMtx.Unlock() } else { log.Warnf("No sync peer candidates available") } @@ -438,27 +438,27 @@ func (b *blockManager) startSync() { // isSyncCandidate returns whether or not the peer is a candidate to consider // syncing from. -func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool { +func (m *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { // The peer is not a candidate for sync if it's not a full node. return peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork } -// syncMiningStateAfterSync polls the blockManager for the current sync -// state; if the manager is synced, it executes a call to the peer to -// sync the mining state to the network. -func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { +// syncMiningStateAfterSync polls the sync manager for the current sync state +// and once the manager believes the chain is fully synced, it executes a call +// to the peer to sync the mining state. +func (m *SyncManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { go func() { for { select { case <-time.After(3 * time.Second): - case <-b.quit: + case <-m.quit: return } if !peer.Connected() { return } - if !b.IsCurrent() { + if !m.IsCurrent() { continue } @@ -497,30 +497,30 @@ func (b *blockManager) syncMiningStateAfterSync(peer *peerpkg.Peer) { // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { +func (m *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { // Ignore if in the process of shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) // Initialize the peer state - isSyncCandidate := b.isSyncCandidate(peer) - b.peerStates[peer] = &peerSyncState{ + isSyncCandidate := m.isSyncCandidate(peer) + m.peerStates[peer] = &peerSyncState{ syncCandidate: isSyncCandidate, requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), } // Start syncing by choosing the best candidate if needed. - if isSyncCandidate && b.syncPeer == nil { - b.startSync() + if isSyncCandidate && m.syncPeer == nil { + m.startSync() } // Grab the mining state from this peer once synced when enabled. - if !b.cfg.NoMiningStateSync { - b.syncMiningStateAfterSync(peer) + if !m.cfg.NoMiningStateSync { + m.syncMiningStateAfterSync(peer) } } @@ -528,20 +528,20 @@ func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { - state, exists := b.peerStates[peer] +func (m *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) { + state, exists := m.peerStates[peer] if !exists { log.Warnf("Received done peer message for unknown peer %s", peer) return } // Remove the peer from the list of candidate peers. - delete(b.peerStates, peer) + delete(m.peerStates, peer) // Remove requested transactions from the global map so that they will // be fetched from elsewhere next time we get an inv. for txHash := range state.requestedTxns { - delete(b.requestedTxns, txHash) + delete(m.requestedTxns, txHash) } // Remove requested blocks from the global map so that they will be @@ -549,19 +549,19 @@ func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { // TODO(oga) we could possibly here check which peers have these blocks // and request them now to speed things up a little. for blockHash := range state.requestedBlocks { - delete(b.requestedBlocks, blockHash) + delete(m.requestedBlocks, blockHash) } // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the headers-first state if in headers-first // mode so - if b.syncPeer == peer { - b.syncPeer = nil - if b.headersFirstMode { - best := b.cfg.Chain.BestSnapshot() - b.resetHeaderState(&best.Hash, best.Height) + if m.syncPeer == peer { + m.syncPeer = nil + if m.headersFirstMode { + best := m.cfg.Chain.BestSnapshot() + m.resetHeaderState(&best.Hash, best.Height) } - b.startSync() + m.startSync() } } @@ -633,9 +633,9 @@ func errToWireRejectCode(err error) (wire.RejectCode, string) { } // handleTxMsg handles transaction messages from all peers. -func (b *blockManager) handleTxMsg(tmsg *txMsg) { +func (m *SyncManager) handleTxMsg(tmsg *txMsg) { peer := tmsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { log.Warnf("Received tx message from unknown peer %s", peer) return @@ -654,7 +654,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Ignore transactions that we have already rejected. Do not // send a reject message here because if the transaction was already // rejected, the transaction was unsolicited. - if _, exists = b.rejectedTxns[*txHash]; exists { + if _, exists = m.rejectedTxns[*txHash]; exists { log.Debugf("Ignoring unsolicited previously rejected transaction %v "+ "from %s", txHash, peer) return @@ -662,8 +662,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. - allowOrphans := b.cfg.MaxOrphanTxs > 0 - acceptedTxs, err := b.cfg.TxMemPool.ProcessTransaction(tmsg.tx, + allowOrphans := m.cfg.MaxOrphanTxs > 0 + acceptedTxs, err := m.cfg.TxMemPool.ProcessTransaction(tmsg.tx, allowOrphans, true, true, mempool.Tag(tmsg.peer.ID())) // Remove transaction from request maps. Either the mempool/chain @@ -671,12 +671,12 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // instances of trying to fetch it, or we failed to insert and thus // we'll retry next time we get an inv. delete(state.requestedTxns, *txHash) - delete(b.requestedTxns, *txHash) + delete(m.requestedTxns, *txHash) if err != nil { // Do not request this transaction again until a new block // has been processed. - limitAdd(b.rejectedTxns, *txHash, maxRejectedTxns) + limitAdd(m.rejectedTxns, *txHash, maxRejectedTxns) // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -696,7 +696,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { return } - b.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) + m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } // isKnownOrphan returns whether the passed hash is currently a known orphan. @@ -708,12 +708,12 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // orphans and react accordingly. // // This function is safe for concurrent access. -func (b *blockManager) isKnownOrphan(hash *chainhash.Hash) bool { +func (m *SyncManager) isKnownOrphan(hash *chainhash.Hash) bool { // Protect concurrent access. Using a read lock only so multiple readers // can query without blocking each other. - b.orphanLock.RLock() - _, exists := b.orphans[*hash] - b.orphanLock.RUnlock() + m.orphanLock.RLock() + _, exists := m.orphans[*hash] + m.orphanLock.RUnlock() return exists } @@ -721,18 +721,18 @@ func (b *blockManager) isKnownOrphan(hash *chainhash.Hash) bool { // of orphan blocks. // // This function is safe for concurrent access. -func (b *blockManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { +func (m *SyncManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { // Protect concurrent access. Using a read lock only so multiple // readers can query without blocking each other. - b.orphanLock.RLock() - defer b.orphanLock.RUnlock() + m.orphanLock.RLock() + defer m.orphanLock.RUnlock() // Keep looping while the parent of each orphaned block is known and is an // orphan itself. orphanRoot := hash prevHash := hash for { - orphan, exists := b.orphans[*prevHash] + orphan, exists := m.orphans[*prevHash] if !exists { break } @@ -745,21 +745,21 @@ func (b *blockManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { // removeOrphanBlock removes the passed orphan block from the orphan pool and // previous orphan index. -func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) { +func (m *SyncManager) removeOrphanBlock(orphan *orphanBlock) { // Protect concurrent access. - b.orphanLock.Lock() - defer b.orphanLock.Unlock() + m.orphanLock.Lock() + defer m.orphanLock.Unlock() // Remove the orphan block from the orphan pool. orphanHash := orphan.block.Hash() - delete(b.orphans, *orphanHash) + delete(m.orphans, *orphanHash) // Remove the reference from the previous orphan index too. An indexing // for loop is intentionally used over a range here as range does not // reevaluate the slice on each iteration nor does it adjust the index // for the modified slice. prevHash := &orphan.block.MsgBlock().Header.PrevBlock - orphans := b.prevOrphans[*prevHash] + orphans := m.prevOrphans[*prevHash] for i := 0; i < len(orphans); i++ { hash := orphans[i].block.Hash() if hash.IsEqual(orphanHash) { @@ -769,12 +769,12 @@ func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) { i-- } } - b.prevOrphans[*prevHash] = orphans + m.prevOrphans[*prevHash] = orphans // Remove the map entry altogether if there are no longer any orphans // which depend on the parent hash. - if len(b.prevOrphans[*prevHash]) == 0 { - delete(b.prevOrphans, *prevHash) + if len(m.prevOrphans[*prevHash]) == 0 { + delete(m.prevOrphans, *prevHash) } } @@ -783,34 +783,34 @@ func (b *blockManager) removeOrphanBlock(orphan *orphanBlock) { // any expired blocks so a separate cleanup poller doesn't need to be run. It // also imposes a maximum limit on the number of outstanding orphan blocks and // will remove the oldest received orphan block if the limit is exceeded. -func (b *blockManager) addOrphanBlock(block *dcrutil.Block) { +func (m *SyncManager) addOrphanBlock(block *dcrutil.Block) { // Remove expired orphan blocks. - for _, oBlock := range b.orphans { + for _, oBlock := range m.orphans { if time.Now().After(oBlock.expiration) { - b.removeOrphanBlock(oBlock) + m.removeOrphanBlock(oBlock) continue } // Update the oldest orphan block pointer so it can be discarded // in case the orphan pool fills up. - if b.oldestOrphan == nil || - oBlock.expiration.Before(b.oldestOrphan.expiration) { - b.oldestOrphan = oBlock + if m.oldestOrphan == nil || + oBlock.expiration.Before(m.oldestOrphan.expiration) { + m.oldestOrphan = oBlock } } // Limit orphan blocks to prevent memory exhaustion. - if len(b.orphans)+1 > maxOrphanBlocks { + if len(m.orphans)+1 > maxOrphanBlocks { // Remove the oldest orphan to make room for the new one. - b.removeOrphanBlock(b.oldestOrphan) - b.oldestOrphan = nil + m.removeOrphanBlock(m.oldestOrphan) + m.oldestOrphan = nil } // Protect concurrent access. This is intentionally done here instead // of near the top since removeOrphanBlock does its own locking and // the range iterator is not invalidated by removing map entries. - b.orphanLock.Lock() - defer b.orphanLock.Unlock() + m.orphanLock.Lock() + defer m.orphanLock.Unlock() // Insert the block into the orphan map with an expiration time // 1 hour from now. @@ -819,11 +819,11 @@ func (b *blockManager) addOrphanBlock(block *dcrutil.Block) { block: block, expiration: expiration, } - b.orphans[*block.Hash()] = oBlock + m.orphans[*block.Hash()] = oBlock // Add to previous hash lookup index for faster dependency lookups. prevHash := &block.MsgBlock().Header.PrevBlock - b.prevOrphans[*prevHash] = append(b.prevOrphans[*prevHash], oBlock) + m.prevOrphans[*prevHash] = append(m.prevOrphans[*prevHash], oBlock) } // processOrphans determines if there are any orphans which depend on the passed @@ -833,7 +833,7 @@ func (b *blockManager) addOrphanBlock(block *dcrutil.Block) { // // The flags do not modify the behavior of this function directly, however they // are needed to pass along to maybeAcceptBlock. -func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error { +func (m *SyncManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error { // Start with processing at least the passed hash. Leave a little room for // additional orphan blocks that need to be processed without needing to // grow the array in the common case. @@ -852,8 +852,8 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh // is intentionally used over a range here as range does not reevaluate // the slice on each iteration nor does it adjust the index for the // modified slice. - for i := 0; i < len(b.prevOrphans[*processHash]); i++ { - orphan := b.prevOrphans[*processHash][i] + for i := 0; i < len(m.prevOrphans[*processHash]); i++ { + orphan := m.prevOrphans[*processHash][i] if orphan == nil { log.Warnf("Found a nil entry at index %d in the orphan "+ "dependency list for block %v", i, processHash) @@ -862,11 +862,11 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh // Remove the orphan from the orphan pool. orphanHash := orphan.block.Hash() - b.removeOrphanBlock(orphan) + m.removeOrphanBlock(orphan) i-- // Potentially accept the block into the block chain. - _, err := b.cfg.Chain.ProcessBlock(orphan.block, flags) + _, err := m.cfg.Chain.ProcessBlock(orphan.block, flags) if err != nil { return err } @@ -890,17 +890,17 @@ func (b *blockManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beh // whether or not the block is an orphan, in which case the fork length will // also be zero as expected, because it, by definition, does not connect to the // best chain. -func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) { +func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) { // Process the block to include validation, best chain selection, etc. // - // Also, keep track of orphan blocks in the block manager when the error + // Also, keep track of orphan blocks in the sync manager when the error // returned indicates the block is an orphan. blockHash := block.Hash() - forkLen, err := b.cfg.Chain.ProcessBlock(block, flags) + forkLen, err := m.cfg.Chain.ProcessBlock(block, flags) if errors.Is(err, blockchain.ErrMissingParent) { log.Infof("Adding orphan block %v with parent %v", blockHash, block.MsgBlock().Header.PrevBlock) - b.addOrphanBlock(block) + m.addOrphanBlock(block) // The fork length of orphans is unknown since they, by definition, do // not connect to the best chain. @@ -912,27 +912,27 @@ func (b *blockManager) processBlockAndOrphans(block *dcrutil.Block, flags blockc // Accept any orphan blocks that depend on this block (they are no longer // orphans) and repeat for those accepted blocks until there are no more. - if err := b.processOrphans(blockHash, flags); err != nil { + if err := m.processOrphans(blockHash, flags); err != nil { return 0, false, err } // The chain is considered synced once both the blockchain believes it is // current and the sync height is reached or exceeded. - best := b.cfg.Chain.BestSnapshot() - syncHeight := b.SyncHeight() - if best.Height >= syncHeight && b.cfg.Chain.IsCurrent() { - b.isCurrentMtx.Lock() - b.isCurrent = true - b.isCurrentMtx.Unlock() + best := m.cfg.Chain.BestSnapshot() + syncHeight := m.SyncHeight() + if best.Height >= syncHeight && m.cfg.Chain.IsCurrent() { + m.isCurrentMtx.Lock() + m.isCurrent = true + m.isCurrentMtx.Unlock() } return forkLen, false, nil } // handleBlockMsg handles block messages from all peers. -func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { +func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { peer := bmsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { log.Warnf("Received block message from unknown peer %s", peer) return @@ -956,16 +956,16 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // properly. isCheckpointBlock := false behaviorFlags := blockchain.BFNone - if b.headersFirstMode { - firstNodeEl := b.headerList.Front() + if m.headersFirstMode { + firstNodeEl := m.headerList.Front() if firstNodeEl != nil { firstNode := firstNodeEl.Value.(*headerNode) if blockHash.IsEqual(firstNode.hash) { behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(b.nextCheckpoint.Hash) { + if firstNode.hash.IsEqual(m.nextCheckpoint.Hash) { isCheckpointBlock = true } else { - b.headerList.Remove(firstNodeEl) + m.headerList.Remove(firstNodeEl) } } } @@ -975,11 +975,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // so we shouldn't have any more instances of trying to fetch it, or we // will fail the insert and thus we'll retry next time we get an inv. delete(state.requestedBlocks, *blockHash) - delete(b.requestedBlocks, *blockHash) + delete(m.requestedBlocks, *blockHash) // Process the block to include validation, best chain selection, orphan // handling, etc. - forkLen, isOrphan, err := b.processBlockAndOrphans(bmsg.block, behaviorFlags) + forkLen, isOrphan, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags) if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -1007,8 +1007,8 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Request the parents for the orphan block from the peer that sent it. onMainChain := !isOrphan && forkLen == 0 if isOrphan { - orphanRoot := b.orphanRoot(blockHash) - blkLocator, err := b.cfg.Chain.LatestBlockLocator() + orphanRoot := m.orphanRoot(blockHash) + blkLocator, err := m.cfg.Chain.LatestBlockLocator() if err != nil { log.Warnf("Failed to get block locator for the latest block: %v", err) @@ -1023,13 +1023,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } else { // When the block is not an orphan, log information about it and // update the chain state. - b.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), b.SyncHeight()) + m.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), m.SyncHeight()) if onMainChain { // Notify stake difficulty subscribers and prune invalidated // transactions. - best := b.cfg.Chain.BestSnapshot() - if r := b.cfg.RpcServer(); r != nil { + best := m.cfg.Chain.BestSnapshot() + if r := m.cfg.RpcServer(); r != nil { // Update registered websocket clients on the // current stake difficulty. r.NotifyStakeDifficulty( @@ -1039,14 +1039,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { StakeDifficulty: best.NextStakeDiff, }) } - b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - b.cfg.TxMemPool.PruneExpiredTx() + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) + m.cfg.TxMemPool.PruneExpiredTx() // Clear the rejected transactions. - b.rejectedTxns = make(map[chainhash.Hash]struct{}) + m.rejectedTxns = make(map[chainhash.Hash]struct{}) // Proactively evict SigCache entries. - b.proactivelyEvictSigCacheEntries(best.Height) + m.proactivelyEvictSigCacheEntries(best.Height) } } @@ -1059,13 +1059,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // chain was not yet current or lost the lock announcement race. blockHeight := int64(bmsg.block.MsgBlock().Header.Height) peer.UpdateLastBlockHeight(blockHeight) - if isOrphan || (onMainChain && b.IsCurrent()) { - go b.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight, + if isOrphan || (onMainChain && m.IsCurrent()) { + go m.cfg.PeerNotifier.UpdatePeerHeights(blockHash, blockHeight, bmsg.peer) } // Nothing more to do if we aren't in headers-first mode. - if !b.headersFirstMode { + if !m.headersFirstMode { return } @@ -1073,9 +1073,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { - if b.startHeader != nil && + if m.startHeader != nil && len(state.requestedBlocks) < minInFlightBlocks { - b.fetchHeaderBlocks() + m.fetchHeaderBlocks() } return } @@ -1084,27 +1084,27 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // there is a next checkpoint, get the next round of headers by asking // for headers starting from the block after this one up to the next // checkpoint. - prevHeight := b.nextCheckpoint.Height - prevHash := b.nextCheckpoint.Hash - b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) - if b.nextCheckpoint != nil { + prevHeight := m.nextCheckpoint.Height + prevHash := m.nextCheckpoint.Hash + m.nextCheckpoint = m.findNextHeaderCheckpoint(prevHeight) + if m.nextCheckpoint != nil { locator := []chainhash.Hash{*prevHash} - err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) if err != nil { log.Warnf("Failed to send getheaders message to peer %s: %v", peer.Addr(), err) return } log.Infof("Downloading headers for blocks %d to %d from peer %s", - prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr()) + prevHeight+1, m.nextCheckpoint.Height, m.syncPeer.Addr()) return } // This is headers-first mode, the block is a checkpoint, and there are // no more checkpoints, so switch to normal mode by requesting blocks // from the block after this one up to the end of the chain (zero hash). - b.headersFirstMode = false - b.headerList.Init() + m.headersFirstMode = false + m.headerList.Init() log.Infof("Reached the final checkpoint -- switching to normal mode") locator := []chainhash.Hash{*blockHash} err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) @@ -1118,28 +1118,28 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // proactivelyEvictSigCacheEntries fetches the block that is // txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to // SigCache to evict the entries associated with the transactions in that block. -func (b *blockManager) proactivelyEvictSigCacheEntries(bestHeight int64) { +func (m *SyncManager) proactivelyEvictSigCacheEntries(bestHeight int64) { // Nothing to do before the eviction depth is reached. if bestHeight <= txscript.ProactiveEvictionDepth { return } evictHeight := bestHeight - txscript.ProactiveEvictionDepth - block, err := b.cfg.Chain.BlockByHeight(evictHeight) + block, err := m.cfg.Chain.BlockByHeight(evictHeight) if err != nil { log.Warnf("Failed to retrieve the block at height %d: %v", evictHeight, err) return } - b.cfg.SigCache.EvictEntries(block.MsgBlock()) + m.cfg.SigCache.EvictEntries(block.MsgBlock()) } // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. -func (b *blockManager) fetchHeaderBlocks() { +func (m *SyncManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. - if b.startHeader == nil { + if m.startHeader == nil { log.Warnf("fetchHeaderBlocks called with no start header") return } @@ -1147,9 +1147,9 @@ func (b *blockManager) fetchHeaderBlocks() { // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. - gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) + gdmsg := wire.NewMsgGetDataSizeHint(uint(m.headerList.Len())) numRequested := 0 - for e := b.startHeader; e != nil; e = e.Next() { + for e := m.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { log.Warn("Header list node type is not a headerNode") @@ -1157,15 +1157,15 @@ func (b *blockManager) fetchHeaderBlocks() { } iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - haveInv, err := b.haveInventory(iv) + haveInv, err := m.haveInventory(iv) if err != nil { log.Warnf("Unexpected failure when checking for existing "+ "inventory during header block fetch: %v", err) continue } if !haveInv { - b.requestedBlocks[*node.hash] = struct{}{} - syncPeerState := b.peerStates[b.syncPeer] + m.requestedBlocks[*node.hash] = struct{}{} + syncPeerState := m.peerStates[m.syncPeer] syncPeerState.requestedBlocks[*node.hash] = struct{}{} err = gdmsg.AddInvVect(iv) if err != nil { @@ -1174,20 +1174,20 @@ func (b *blockManager) fetchHeaderBlocks() { } numRequested++ } - b.startHeader = e.Next() + m.startHeader = e.Next() if numRequested >= wire.MaxInvPerMsg { break } } if len(gdmsg.InvList) > 0 { - b.syncPeer.QueueMessage(gdmsg, nil) + m.syncPeer.QueueMessage(gdmsg, nil) } } // handleHeadersMsg handles headers messages from all peers. -func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { +func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { peer := hmsg.peer - _, exists := b.peerStates[peer] + _, exists := m.peerStates[peer] if !exists { log.Warnf("Received headers message from unknown peer %s", peer) return @@ -1196,7 +1196,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // The remote peer is misbehaving if we didn't request headers. msg := hmsg.headers numHeaders := len(msg.Headers) - if !b.headersFirstMode { + if !m.headersFirstMode { log.Warnf("Got %d unrequested headers from %s -- disconnecting", numHeaders, peer.Addr()) peer.Disconnect() @@ -1217,7 +1217,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { finalHash = &blockHash // Ensure there is a previous header to compare against. - prevNodeEl := b.headerList.Back() + prevNodeEl := m.headerList.Back() if prevNodeEl == nil { log.Warnf("Header list does not contain a previous element as " + "expected -- disconnecting peer") @@ -1231,9 +1231,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { prevNode := prevNodeEl.Value.(*headerNode) if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { node.height = prevNode.height + 1 - e := b.headerList.PushBack(&node) - if b.startHeader == nil { - b.startHeader = e + e := m.headerList.PushBack(&node) + if m.startHeader == nil { + m.startHeader = e } } else { log.Warnf("Received block header that does not properly connect "+ @@ -1243,8 +1243,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } // Verify the header at the next checkpoint height matches. - if node.height == b.nextCheckpoint.Height { - if node.hash.IsEqual(b.nextCheckpoint.Hash) { + if node.height == m.nextCheckpoint.Height { + if node.hash.IsEqual(m.nextCheckpoint.Hash) { receivedCheckpoint = true log.Infof("Verified downloaded block header against "+ "checkpoint at height %d/hash %s", node.height, node.hash) @@ -1252,7 +1252,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { log.Warnf("Block header at height %d/hash %s from peer %s "+ "does NOT match expected checkpoint hash of %s -- "+ "disconnecting", node.height, node.hash, peer.Addr(), - b.nextCheckpoint.Hash) + m.nextCheckpoint.Hash) peer.Disconnect() return } @@ -1267,11 +1267,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // that is already in the database and is only used to ensure // the next header links properly, it must be removed before // fetching the blocks. - b.headerList.Remove(b.headerList.Front()) + m.headerList.Remove(m.headerList.Front()) log.Infof("Received %v block headers: Fetching blocks", - b.headerList.Len()) - b.progressLogger.SetLastLogTime(time.Now()) - b.fetchHeaderBlocks() + m.headerList.Len()) + m.progressLogger.SetLastLogTime(time.Now()) + m.fetchHeaderBlocks() return } @@ -1279,7 +1279,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // headers starting from the latest known header and ending with the // next checkpoint. locator := []chainhash.Hash{*finalHash} - err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) if err != nil { log.Warnf("Failed to send getheaders message to peer %s: %v", peer.Addr(), err) @@ -1288,9 +1288,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } // handleNotFoundMsg handles notfound messages from all peers. -func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { +func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { peer := nfmsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { log.Warnf("Received notfound message from unknown peer %s", peer) return @@ -1302,12 +1302,12 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { case wire.InvTypeBlock: if _, exists := state.requestedBlocks[inv.Hash]; exists { delete(state.requestedBlocks, inv.Hash) - delete(b.requestedBlocks, inv.Hash) + delete(m.requestedBlocks, inv.Hash) } case wire.InvTypeTx: if _, exists := state.requestedTxns[inv.Hash]; exists { delete(state.requestedTxns, inv.Hash) - delete(b.requestedTxns, inv.Hash) + delete(m.requestedTxns, inv.Hash) } } } @@ -1318,24 +1318,24 @@ func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) { // inventory can be when it is in different states such as blocks that are part // of the main chain, on a side chain, in the orphan pool, and transactions that // are in the memory pool (either the main pool or orphan pool). -func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { +func (m *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { switch invVect.Type { case wire.InvTypeBlock: // Determine if the block is known in any form (main chain, side // chain, or orphan). hash := &invVect.Hash - return b.isKnownOrphan(hash) || b.cfg.Chain.HaveBlock(hash), nil + return m.isKnownOrphan(hash) || m.cfg.Chain.HaveBlock(hash), nil case wire.InvTypeTx: // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if b.cfg.TxMemPool.HaveTransaction(&invVect.Hash) { + if m.cfg.TxMemPool.HaveTransaction(&invVect.Hash) { return true, nil } // Check if the transaction exists from the point of view of the // end of the main chain. - entry, err := b.cfg.Chain.FetchUtxoEntry(&invVect.Hash) + entry, err := m.cfg.Chain.FetchUtxoEntry(&invVect.Hash) if err != nil { return false, err } @@ -1349,9 +1349,9 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { // handleInvMsg handles inv messages from all peers. // We examine the inventory advertised by the remote peer and act accordingly. -func (b *blockManager) handleInvMsg(imsg *invMsg) { +func (m *SyncManager) handleInvMsg(imsg *invMsg) { peer := imsg.peer - state, exists := b.peerStates[peer] + state, exists := m.peerStates[peer] if !exists { log.Warnf("Received inv message from unknown peer %s", peer) return @@ -1368,8 +1368,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { } } - fromSyncPeer := peer == b.syncPeer - isCurrent := b.IsCurrent() + fromSyncPeer := peer == m.syncPeer + isCurrent := m.IsCurrent() // If this inv contains a block announcement, and this isn't coming from // our current sync peer or we're current, then update the last @@ -1389,7 +1389,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // If our chain is current and a peer announces a block we already // know of, then update their current block height. if lastBlock != -1 && isCurrent { - blkHeight, err := b.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash) + blkHeight, err := m.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash) if err == nil { imsg.peer.UpdateLastBlockHeight(blkHeight) } @@ -1411,12 +1411,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { peer.AddKnownInventory(iv) // Ignore inventory when we're in headers-first mode. - if b.headersFirstMode { + if m.headersFirstMode { continue } // Request the inventory if we don't already have it. - haveInv, err := b.haveInventory(iv) + haveInv, err := m.haveInventory(iv) if err != nil { log.Warnf("Unexpected failure when checking for existing "+ "inventory during inv message processing: %v", err) @@ -1426,7 +1426,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { if iv.Type == wire.InvTypeTx { // Skip the transaction if it has already been // rejected. - if _, exists := b.rejectedTxns[iv.Hash]; exists { + if _, exists := m.rejectedTxns[iv.Hash]; exists { continue } } @@ -1447,12 +1447,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // resending the orphan block as an available block // to signal there are more missing blocks that need to // be requested. - if b.isKnownOrphan(&iv.Hash) { + if m.isKnownOrphan(&iv.Hash) { // Request blocks starting at the latest known // up to the root of the orphan that just came // in. - orphanRoot := b.orphanRoot(&iv.Hash) - blkLocator, err := b.cfg.Chain.LatestBlockLocator() + orphanRoot := m.orphanRoot(&iv.Hash) + blkLocator, err := m.cfg.Chain.LatestBlockLocator() if err != nil { log.Errorf("Failed to get block locator for the latest "+ "block: %v", err) @@ -1475,7 +1475,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Request blocks after this one up to the // final one the remote peer knows about (zero // stop hash). - blkLocator := b.cfg.Chain.BlockLocatorFromHash(&iv.Hash) + blkLocator := m.cfg.Chain.BlockLocatorFromHash(&iv.Hash) locator := chainBlockLocatorToHashes(blkLocator) err = imsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { @@ -1493,8 +1493,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeBlock: // Request the block if there is not already a pending // request. - if _, exists := b.requestedBlocks[iv.Hash]; !exists { - limitAdd(b.requestedBlocks, iv.Hash, maxRequestedBlocks) + if _, exists := m.requestedBlocks[iv.Hash]; !exists { + limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks) limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks) gdmsg.AddInvVect(iv) numRequested++ @@ -1503,8 +1503,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeTx: // Request the transaction if there is not already a // pending request. - if _, exists := b.requestedTxns[iv.Hash]; !exists { - limitAdd(b.requestedTxns, iv.Hash, maxRequestedTxns) + if _, exists := m.requestedTxns[iv.Hash]; !exists { + limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns) limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns) gdmsg.AddInvVect(iv) numRequested++ @@ -1547,63 +1547,63 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) { m[hash] = struct{}{} } -// blockHandler is the main handler for the block manager. It must be run -// as a goroutine. It processes block and inv messages in a separate goroutine -// from the peer handlers so the block (MsgBlock) messages are handled by a -// single thread without needing to lock memory data structures. This is -// important because the block manager controls which blocks are needed and how -// the fetching should proceed. -func (b *blockManager) blockHandler() { +// blockHandler is the main handler for the sync manager. It must be run as a +// goroutine. It processes block and inv messages in a separate goroutine from +// the peer handlers so the block (MsgBlock) messages are handled by a single +// thread without needing to lock memory data structures. This is important +// because the sync manager controls which blocks are needed and how the +// fetching should proceed. +func (m *SyncManager) blockHandler() { out: for { select { - case m := <-b.msgChan: - switch msg := m.(type) { + case data := <-m.msgChan: + switch msg := data.(type) { case *newPeerMsg: - b.handleNewPeerMsg(msg.peer) + m.handleNewPeerMsg(msg.peer) case *txMsg: - b.handleTxMsg(msg) + m.handleTxMsg(msg) msg.reply <- struct{}{} case *blockMsg: - b.handleBlockMsg(msg) + m.handleBlockMsg(msg) msg.reply <- struct{}{} case *invMsg: - b.handleInvMsg(msg) + m.handleInvMsg(msg) case *headersMsg: - b.handleHeadersMsg(msg) + m.handleHeadersMsg(msg) case *notFoundMsg: - b.handleNotFoundMsg(msg) + m.handleNotFoundMsg(msg) case *donePeerMsg: - b.handleDonePeerMsg(msg.peer) + m.handleDonePeerMsg(msg.peer) case getSyncPeerMsg: var peerID int32 - if b.syncPeer != nil { - peerID = b.syncPeer.ID() + if m.syncPeer != nil { + peerID = m.syncPeer.ID() } msg.reply <- peerID case requestFromPeerMsg: - err := b.requestFromPeer(msg.peer, msg.blocks, msg.txs) + err := m.requestFromPeer(msg.peer, msg.blocks, msg.txs) msg.reply <- requestFromPeerResponse{ err: err, } case forceReorganizationMsg: - err := b.cfg.Chain.ForceHeadReorganization( + err := m.cfg.Chain.ForceHeadReorganization( msg.formerBest, msg.newBest) if err == nil { // Notify stake difficulty subscribers and prune // invalidated transactions. - best := b.cfg.Chain.BestSnapshot() - if r := b.cfg.RpcServer(); r != nil { + best := m.cfg.Chain.BestSnapshot() + if r := m.cfg.RpcServer(); r != nil { r.NotifyStakeDifficulty( &rpcserver.StakeDifficultyNtfnData{ BlockHash: best.Hash, @@ -1611,9 +1611,9 @@ out: StakeDifficulty: best.NextStakeDiff, }) } - b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - b.cfg.TxMemPool.PruneExpiredTx() + m.cfg.TxMemPool.PruneExpiredTx() } msg.reply <- forceReorganizationResponse{ @@ -1621,14 +1621,14 @@ out: } case tipGenerationMsg: - g, err := b.cfg.Chain.TipGeneration() + g, err := m.cfg.Chain.TipGeneration() msg.reply <- tipGenerationResponse{ hashes: g, err: err, } case processBlockMsg: - forkLen, isOrphan, err := b.processBlockAndOrphans(msg.block, + forkLen, isOrphan, err := m.processBlockAndOrphans(msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ @@ -1643,8 +1643,8 @@ out: if onMainChain { // Notify stake difficulty subscribers and prune // invalidated transactions. - best := b.cfg.Chain.BestSnapshot() - if r := b.cfg.RpcServer(); r != nil { + best := m.cfg.Chain.BestSnapshot() + if r := m.cfg.RpcServer(); r != nil { r.NotifyStakeDifficulty( &rpcserver.StakeDifficultyNtfnData{ BlockHash: best.Hash, @@ -1652,9 +1652,9 @@ out: StakeDifficulty: best.NextStakeDiff, }) } - b.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - b.cfg.TxMemPool.PruneExpiredTx() + m.cfg.TxMemPool.PruneExpiredTx() } msg.reply <- processBlockResponse{ @@ -1663,7 +1663,7 @@ out: } case processTransactionMsg: - acceptedTxs, err := b.cfg.TxMemPool.ProcessTransaction(msg.tx, + acceptedTxs, err := m.cfg.TxMemPool.ProcessTransaction(msg.tx, msg.allowOrphans, msg.rateLimit, msg.allowHighFees, msg.tag) msg.reply <- processTransactionResponse{ acceptedTxs: acceptedTxs, @@ -1674,141 +1674,141 @@ out: log.Warnf("Invalid message type in block handler: %T", msg) } - case <-b.quit: + case <-m.quit: break out } } - b.wg.Done() - log.Trace("Block handler done") + m.wg.Done() + log.Trace("Sync manager done") } -// NewPeer informs the block manager of a newly active peer. -func (b *blockManager) NewPeer(peer *peerpkg.Peer) { +// NewPeer informs the sync manager of a newly active peer. +func (m *SyncManager) NewPeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &newPeerMsg{peer: peer} + m.msgChan <- &newPeerMsg{peer: peer} } // QueueTx adds the passed transaction message and peer to the block handling // queue. -func (b *blockManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) { +func (m *SyncManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more transactions if we're shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} + m.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} } // QueueBlock adds the passed block message and peer to the block handling queue. -func (b *blockManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) { +func (m *SyncManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more blocks if we're shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &blockMsg{block: block, peer: peer, reply: done} + m.msgChan <- &blockMsg{block: block, peer: peer, reply: done} } // QueueInv adds the passed inv message and peer to the block handling queue. -func (b *blockManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { +func (m *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on inv // messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &invMsg{inv: inv, peer: peer} + m.msgChan <- &invMsg{inv: inv, peer: peer} } // QueueHeaders adds the passed headers message and peer to the block handling // queue. -func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { +func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on // headers messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &headersMsg{headers: headers, peer: peer} + m.msgChan <- &headersMsg{headers: headers, peer: peer} } // QueueNotFound adds the passed notfound message and peer to the block handling // queue. -func (b *blockManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) { +func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on // reject messages. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer} + m.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer} } -// DonePeer informs the blockmanager that a peer has disconnected. -func (b *blockManager) DonePeer(peer *peerpkg.Peer) { +// DonePeer informs the sync manager that a peer has disconnected. +func (m *SyncManager) DonePeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { + if atomic.LoadInt32(&m.shutdown) != 0 { return } - b.msgChan <- &donePeerMsg{peer: peer} + m.msgChan <- &donePeerMsg{peer: peer} } // Start begins the core block handler which processes block and inv messages. -func (b *blockManager) Start() { +func (m *SyncManager) Start() { // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + if atomic.AddInt32(&m.started, 1) != 1 { return } - log.Trace("Starting block manager") - b.wg.Add(1) - go b.blockHandler() + log.Trace("Starting sync manager") + m.wg.Add(1) + go m.blockHandler() } -// Stop gracefully shuts down the block manager by stopping all asynchronous +// Stop gracefully shuts down the sync manager by stopping all asynchronous // handlers and waiting for them to finish. -func (b *blockManager) Stop() error { - if atomic.AddInt32(&b.shutdown, 1) != 1 { - log.Warnf("Block manager is already in the process of shutting down") +func (m *SyncManager) Stop() error { + if atomic.AddInt32(&m.shutdown, 1) != 1 { + log.Warnf("Sync manager is already in the process of shutting down") return nil } - log.Infof("Block manager shutting down") - close(b.quit) - b.wg.Wait() + log.Infof("Sync manager shutting down") + close(m.quit) + m.wg.Wait() return nil } // SyncPeerID returns the ID of the current sync peer, or 0 if there is none. -func (b *blockManager) SyncPeerID() int32 { +func (m *SyncManager) SyncPeerID() int32 { reply := make(chan int32) - b.msgChan <- getSyncPeerMsg{reply: reply} + m.msgChan <- getSyncPeerMsg{reply: reply} return <-reply } // RequestFromPeer allows an outside caller to request blocks or transactions -// from a peer. The requests are logged in the blockmanager's internal map of -// requests so they do not later ban the peer for sending the respective data. -func (b *blockManager) RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { +// from a peer. The requests are logged in the internal map of requests so the +// peer is not later banned for sending the respective data. +func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { reply := make(chan requestFromPeerResponse) - b.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, txs: txs, + m.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, txs: txs, reply: reply} response := <-reply return response.err } -func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { +func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash.Hash) error { msgResp := wire.NewMsgGetData() - state, exists := b.peerStates[p] + state, exists := m.peerStates[p] if !exists { return fmt.Errorf("unknown peer %s", p) } @@ -1817,14 +1817,14 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash for _, bh := range blocks { // If we've already requested this block, skip it. _, alreadyReqP := state.requestedBlocks[*bh] - _, alreadyReqB := b.requestedBlocks[*bh] + _, alreadyReqB := m.requestedBlocks[*bh] if alreadyReqP || alreadyReqB { continue } // Skip the block when it is already known. - if b.isKnownOrphan(bh) || b.cfg.Chain.HaveBlock(bh) { + if m.isKnownOrphan(bh) || m.cfg.Chain.HaveBlock(bh) { continue } @@ -1836,14 +1836,14 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash } state.requestedBlocks[*bh] = struct{}{} - b.requestedBlocks[*bh] = struct{}{} + m.requestedBlocks[*bh] = struct{}{} } // Add the vote transactions to the request. for _, vh := range txs { // If we've already requested this transaction, skip it. _, alreadyReqP := state.requestedTxns[*vh] - _, alreadyReqB := b.requestedTxns[*vh] + _, alreadyReqB := m.requestedTxns[*vh] if alreadyReqP || alreadyReqB { continue @@ -1851,13 +1851,13 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if b.cfg.TxMemPool.HaveTransaction(vh) { + if m.cfg.TxMemPool.HaveTransaction(vh) { continue } // Check if the transaction exists from the point of view of the // end of the main chain. - entry, err := b.cfg.Chain.FetchUtxoEntry(vh) + entry, err := m.cfg.Chain.FetchUtxoEntry(vh) if err != nil { return err } @@ -1873,7 +1873,7 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash } state.requestedTxns[*vh] = struct{}{} - b.requestedTxns[*vh] = struct{}{} + m.requestedTxns[*vh] = struct{}{} } if len(msgResp.InvList) > 0 { @@ -1885,11 +1885,11 @@ func (b *blockManager) requestFromPeer(p *peerpkg.Peer, blocks, txs []*chainhash // ForceReorganization forces a reorganization of the block chain to the block // hash requested, so long as it matches up with the current organization of the -// best chain. It is funneled through the block manager since blockchain is not +// best chain. It is funneled through the sync manager since blockchain is not // safe for concurrent access. -func (b *blockManager) ForceReorganization(formerBest, newBest chainhash.Hash) error { +func (m *SyncManager) ForceReorganization(formerBest, newBest chainhash.Hash) error { reply := make(chan forceReorganizationResponse) - b.msgChan <- forceReorganizationMsg{ + m.msgChan <- forceReorganizationMsg{ formerBest: formerBest, newBest: newBest, reply: reply} @@ -1898,52 +1898,52 @@ func (b *blockManager) ForceReorganization(formerBest, newBest chainhash.Hash) e } // TipGeneration returns the hashes of all the children of the current best -// chain tip. It is funneled through the block manager since blockchain is not +// chain tip. It is funneled through the sync manager since blockchain is not // safe for concurrent access. -func (b *blockManager) TipGeneration() ([]chainhash.Hash, error) { +func (m *SyncManager) TipGeneration() ([]chainhash.Hash, error) { reply := make(chan tipGenerationResponse) - b.msgChan <- tipGenerationMsg{reply: reply} + m.msgChan <- tipGenerationMsg{reply: reply} response := <-reply return response.hashes, response.err } // ProcessBlock makes use of ProcessBlock on an internal instance of a block -// chain. It is funneled through the block manager since blockchain is not safe +// chain. It is funneled through the sync manager since blockchain is not safe // for concurrent access. -func (b *blockManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { +func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { reply := make(chan processBlockResponse, 1) - b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} + m.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} response := <-reply return response.isOrphan, response.err } // ProcessTransaction makes use of ProcessTransaction on an internal instance of -// a block chain. It is funneled through the block manager since blockchain is +// a block chain. It is funneled through the sync manager since blockchain is // not safe for concurrent access. -func (b *blockManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, +func (m *SyncManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, rateLimit bool, allowHighFees bool, tag mempool.Tag) ([]*dcrutil.Tx, error) { reply := make(chan processTransactionResponse, 1) - b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, + m.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, allowHighFees, tag, reply} response := <-reply return response.acceptedTxs, response.err } -// IsCurrent returns whether or not the block manager believes it is synced with +// IsCurrent returns whether or not the sync manager believes it is synced with // the connected peers. // // This function is safe for concurrent access. -func (b *blockManager) IsCurrent() bool { - b.isCurrentMtx.RLock() - isCurrent := b.isCurrent - b.isCurrentMtx.RUnlock() +func (m *SyncManager) IsCurrent() bool { + m.isCurrentMtx.RLock() + isCurrent := m.isCurrent + m.isCurrentMtx.RUnlock() return isCurrent } // TicketPoolValue returns the current value of the total stake in the ticket // pool. -func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) { - return b.cfg.Chain.TicketPoolValue() +func (m *SyncManager) TicketPoolValue() (dcrutil.Amount, error) { + return m.cfg.Chain.TicketPoolValue() } // Config holds the configuration options related to the network chain @@ -1971,11 +1971,11 @@ type Config struct { // It may return nil if there is no active RPC server. RpcServer func() *rpcserver.Server - // DisableCheckpoints indicates whether or not the block manager should make + // DisableCheckpoints indicates whether or not the sync manager should make // use of checkpoints. DisableCheckpoints bool - // NoMiningStateSync indicates whether or not the block manager should + // NoMiningStateSync indicates whether or not the sync manager should // perform an initial mining state synchronization with peers once they are // believed to be fully synced. NoMiningStateSync bool @@ -1992,8 +1992,8 @@ type Config struct { // New returns a new network chain synchronization manager. Use Start to begin // processing asynchronous block and inv updates. -func New(config *Config) (SyncManager, error) { - bm := blockManager{ +func New(config *Config) (*SyncManager, error) { + m := SyncManager{ cfg: *config, rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), @@ -2008,20 +2008,20 @@ func New(config *Config) (SyncManager, error) { isCurrent: config.Chain.IsCurrent(), } - best := bm.cfg.Chain.BestSnapshot() - if !bm.cfg.DisableCheckpoints { + best := m.cfg.Chain.BestSnapshot() + if !m.cfg.DisableCheckpoints { // Initialize the next checkpoint based on the current height. - bm.nextCheckpoint = bm.findNextHeaderCheckpoint(best.Height) - if bm.nextCheckpoint != nil { - bm.resetHeaderState(&best.Hash, best.Height) + m.nextCheckpoint = m.findNextHeaderCheckpoint(best.Height) + if m.nextCheckpoint != nil { + m.resetHeaderState(&best.Hash, best.Height) } } else { log.Info("Checkpoints are disabled") } - bm.syncHeightMtx.Lock() - bm.syncHeight = best.Height - bm.syncHeightMtx.Unlock() + m.syncHeightMtx.Lock() + m.syncHeight = best.Height + m.syncHeightMtx.Unlock() - return &bm, nil + return &m, nil } diff --git a/rpcadaptors.go b/rpcadaptors.go index 72c5998373..cd871a5bb9 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -311,7 +311,7 @@ func (*rpcConnManager) Lookup(host string) ([]net.IP, error) { // rpcserver.SyncManager interface. type rpcSyncMgr struct { server *server - syncMgr netsync.SyncManager + syncMgr *netsync.SyncManager } // Ensure rpcSyncMgr implements the rpcserver.SyncManager interface. diff --git a/server.go b/server.go index 3e5fedd94c..e30d8970ae 100644 --- a/server.go +++ b/server.go @@ -463,7 +463,7 @@ type server struct { sigCache *txscript.SigCache subsidyCache *standalone.SubsidyCache rpcServer *rpcserver.Server - syncManager netsync.SyncManager + syncManager *netsync.SyncManager bg *mining.BgBlkTmplGenerator chain *blockchain.BlockChain txMemPool *mempool.TxPool From e7a3670915b4b78998789bf9411c7b73ede8eb4c Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Dec 2020 15:10:27 -0600 Subject: [PATCH 6/6] netsync: Add package documentation. --- internal/netsync/README.md | 21 +++++++++++++++++++++ internal/netsync/doc.go | 14 ++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 internal/netsync/README.md create mode 100644 internal/netsync/doc.go diff --git a/internal/netsync/README.md b/internal/netsync/README.md new file mode 100644 index 0000000000..3391fad683 --- /dev/null +++ b/internal/netsync/README.md @@ -0,0 +1,21 @@ +netsync +======= + +[![Build Status](https://github.com/decred/dcrd/workflows/Build%20and%20Test/badge.svg)](https://github.com/decred/dcrd/actions) +[![ISC License](https://img.shields.io/badge/license-ISC-blue.svg)](http://copyfree.org) +[![Doc](https://img.shields.io/badge/doc-reference-blue.svg)](https://pkg.go.dev/github.com/decred/dcrd/internal/netsync) + +Package netsync implements a concurrency safe block syncing protocol. + +## Overview + +The provided implementation of SyncManager communicates with connected peers to +perform an initial block download, keep the chain in sync, and announce new +blocks connected to the chain. Currently the sync manager selects a single sync +peer that it downloads all blocks from until it is up to date with the longest +chain the sync peer is aware of. + +## License + +Package netsync is licensed under the [copyfree](http://copyfree.org) ISC +License. diff --git a/internal/netsync/doc.go b/internal/netsync/doc.go new file mode 100644 index 0000000000..5fde072364 --- /dev/null +++ b/internal/netsync/doc.go @@ -0,0 +1,14 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +Package netsync implements a concurrency safe block syncing protocol. + +The provided implementation of SyncManager communicates with connected peers to +perform an initial block download, keep the chain in sync, and announce new +blocks connected to the chain. Currently the sync manager selects a single sync +peer that it downloads all blocks from until it is up to date with the longest +chain the sync peer is aware of. +*/ +package netsync