diff --git a/blockchain/chain.go b/blockchain/chain.go index d1d984b935..92f1f40a96 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -348,6 +348,19 @@ func (b *BlockChain) DisableVerify(disable bool) { b.chainLock.Unlock() } +// HaveHeader returns whether or not the chain instance has the block header +// represented by the passed hash. Note that this will return true for both the +// main chain and any side chains. +// +// This function is safe for concurrent access. +func (b *BlockChain) HaveHeader(hash *chainhash.Hash) bool { + b.index.RLock() + node := b.index.index[*hash] + headerKnown := node != nil + b.index.RUnlock() + return headerKnown +} + // HaveBlock returns whether or not the chain instance has the block represented // by the passed hash. This includes checking the various places a block can // be like part of the main chain or on a side chain. @@ -1436,7 +1449,7 @@ func (b *BlockChain) isOldTimestamp(node *blockNode) bool { } // maybeUpdateIsCurrent potentially updates whether or not the chain believes it -// is current. +// is current using the provided best chain tip. // // It makes use of a latching approach such that once the chain becomes current // it will only switch back to false in the case no new blocks have been seen @@ -1474,7 +1487,20 @@ func (b *BlockChain) maybeUpdateIsCurrent(curBest *blockNode) { log.Debugf("Chain latched to current at block %s (height %d)", curBest.hash, curBest.height) } +} +// MaybeUpdateIsCurrent potentially updates whether or not the chain believes it +// is current. +// +// It makes use of a latching approach such that once the chain becomes current +// it will only switch back to false in the case no new blocks have been seen +// for an extended period of time. +// +// This function is safe for concurrent access. +func (b *BlockChain) MaybeUpdateIsCurrent() { + b.chainLock.Lock() + b.maybeUpdateIsCurrent(b.bestChain.Tip()) + b.chainLock.Unlock() } // isCurrent returns whether or not the chain believes it is current based on @@ -2188,10 +2214,16 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { "%d, block index: %d", b.dbInfo.version, b.dbInfo.compVer, b.dbInfo.bidxVer) + b.index.RLock() + bestHdr := b.index.bestHeader + b.index.RUnlock() + log.Infof("Best known header: height %d, hash %v", bestHdr.height, + bestHdr.hash) + tip := b.bestChain.Tip() - log.Infof("Chain state: height %d, hash %v, total transactions %d, "+ - "work %v, stake version %v", tip.height, tip.hash, - b.stateSnapshot.TotalTxns, tip.workSum, 0) + log.Infof("Chain state: height %d, hash %v, total transactions %d, work "+ + "%v, progress %0.2f%%", tip.height, tip.hash, + b.stateSnapshot.TotalTxns, tip.workSum, b.VerifyProgress()) return &b, nil } diff --git a/blockchain/chainquery.go b/blockchain/chainquery.go index dd3ffe33e6..00008ea874 100644 --- a/blockchain/chainquery.go +++ b/blockchain/chainquery.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2020 The Decred developers +// Copyright (c) 2018-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. 
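Aside (editorial illustration, not part of the diff): the progress figure added to the "Chain state" startup log above comes from the VerifyProgress helper introduced in chainquery.go below; it is simply the ratio of the best chain tip height to the best known header height, clamped to 100%. A minimal standalone sketch of that calculation, using hypothetical heights:

package main

import (
	"fmt"
	"math"
)

// verifyProgress mirrors the clamped ratio used by the new VerifyProgress
// method: best chain tip height divided by best known header height, capped
// at 100%.
func verifyProgress(tipHeight, bestHeaderHeight int64) float64 {
	if bestHeaderHeight == 0 {
		return 0.0
	}
	return math.Min(float64(tipHeight)/float64(bestHeaderHeight), 1.0) * 100
}

func main() {
	// A node whose best chain tip is at height 250000 while its best known
	// header is at height 500000 would report 50.00% progress.
	fmt.Printf("progress %0.2f%%\n", verifyProgress(250000, 500000))
}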
@@ -6,6 +6,7 @@ package blockchain import ( "bytes" + "math" "sort" "github.com/decred/dcrd/chaincfg/chainhash" @@ -154,3 +155,98 @@ func (b *BlockChain) BestInvalidHeader() chainhash.Hash { b.index.RUnlock() return hash } + +// NextNeededBlocks returns hashes for the next blocks after the current best +// chain tip that are needed to make progress towards the current best known +// header skipping any blocks that are already known or in the provided map of +// blocks to exclude. Typically the caller would want to exclude all blocks +// that have outstanding requests. +// +// The maximum number of results is limited to the provided value or the maximum +// allowed by the internal lookahead buffer in the case the requested number of +// max results exceeds that value. +// +// This function is safe for concurrent access. +func (b *BlockChain) NextNeededBlocks(maxResults uint8, exclude map[chainhash.Hash]struct{}) []*chainhash.Hash { + // Nothing to do when no results are requested. + if maxResults == 0 { + return nil + } + + // Determine the common ancestor between the current best chain tip and the + // current best known header. In practice this should never be nil because + // orphan headers are not allowed into the block index, but be paranoid and + // check anyway in case things change in the future. + b.index.RLock() + bestHeader := b.index.bestHeader + fork := b.bestChain.FindFork(bestHeader) + if fork == nil { + b.index.RUnlock() + return nil + } + + // Determine the final block to consider for determining the next needed + // blocks by determining the descendants of the current best chain tip on + // the branch that leads to the best known header while clamping the number + // of descendants to consider to a lookahead buffer. + const lookaheadBuffer = 512 + numBlocksToConsider := bestHeader.height - fork.height + if numBlocksToConsider == 0 { + b.index.RUnlock() + return nil + } + if numBlocksToConsider > lookaheadBuffer { + bestHeader = bestHeader.Ancestor(fork.height + lookaheadBuffer) + numBlocksToConsider = lookaheadBuffer + } + + // Walk backwards from the final block to consider to the current best chain + // tip excluding any blocks that already have their data available or that + // the caller asked to be excluded (likely because they've already been + // requested). + neededBlocks := make([]*chainhash.Hash, 0, numBlocksToConsider) + for node := bestHeader; node != nil && node != fork; node = node.parent { + _, isExcluded := exclude[node.hash] + if isExcluded || node.status.HaveData() { + continue + } + + neededBlocks = append(neededBlocks, &node.hash) + } + b.index.RUnlock() + + // Reverse the needed blocks so they are in forwards order. + reverse := func(s []*chainhash.Hash) { + slen := len(s) + for i := 0; i < slen/2; i++ { + s[i], s[slen-1-i] = s[slen-1-i], s[i] + } + } + reverse(neededBlocks) + + // Clamp the number of results to the lower of the requested max or number + // available. + if int64(maxResults) > numBlocksToConsider { + maxResults = uint8(numBlocksToConsider) + } + if uint16(len(neededBlocks)) > uint16(maxResults) { + neededBlocks = neededBlocks[:maxResults] + } + return neededBlocks +} + +// VerifyProgress returns a percentage that is a guess of the progress of the +// chain verification process. +// +// This function is safe for concurrent access. 
+func (b *BlockChain) VerifyProgress() float64 { + b.index.RLock() + bestHdr := b.index.bestHeader + b.index.RUnlock() + if bestHdr.height == 0 { + return 0.0 + } + + tip := b.bestChain.Tip() + return math.Min(float64(tip.height)/float64(bestHdr.height), 1.0) * 100 +} diff --git a/blockchain/checkpoints.go b/blockchain/checkpoints.go index e674a3f195..cbeecb32f1 100644 --- a/blockchain/checkpoints.go +++ b/blockchain/checkpoints.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -65,7 +65,7 @@ func (b *BlockChain) verifyCheckpoint(height int64, hash *chainhash.Hash) bool { return false } - log.Infof("Verified checkpoint at height %d/block %s", checkpoint.Height, + log.Debugf("Verified checkpoint at height %d/block %s", checkpoint.Height, checkpoint.Hash) return true } diff --git a/blockdb.go b/blockdb.go index 45a6f02ef5..977ace0374 100644 --- a/blockdb.go +++ b/blockdb.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -238,7 +238,11 @@ func dumpBlockChain(params *chaincfg.Params, b *blockchain.BlockChain) error { return err } - progressLogger.LogBlockHeight(bl.MsgBlock(), tipHeight) + msgBlock := bl.MsgBlock() + forceLog := int64(msgBlock.Header.Height) >= tipHeight + progressLogger.LogProgress(msgBlock, forceLog, func() float64 { + return float64(msgBlock.Header.Height) / float64(tipHeight) * 100 + }) } srvrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", diff --git a/internal/mining/cpuminer/cpuminer.go b/internal/mining/cpuminer/cpuminer.go index 17ba42ce2d..8951d26df5 100644 --- a/internal/mining/cpuminer/cpuminer.go +++ b/internal/mining/cpuminer/cpuminer.go @@ -1,5 +1,5 @@ // Copyright (c) 2014-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -96,7 +96,7 @@ type Config struct { // ProcessBlock defines the function to call with any solved blocks. // It typically must run the provided block through the same set of // rules and handling as any other block coming from the network. - ProcessBlock func(*dcrutil.Block, blockchain.BehaviorFlags) (bool, error) + ProcessBlock func(*dcrutil.Block, blockchain.BehaviorFlags) error // ConnectedCount defines the function to use to obtain how many other // peers the server is connected to. This is used by the automatic @@ -200,8 +200,14 @@ func (m *CPUMiner) submitBlock(block *dcrutil.Block) bool { // Process this block using the same rules as blocks coming from other // nodes. This will in turn relay it to the network like normal. - isOrphan, err := m.cfg.ProcessBlock(block, blockchain.BFNone) + err := m.cfg.ProcessBlock(block, blockchain.BFNone) if err != nil { + if errors.Is(err, blockchain.ErrMissingParent) { + log.Errorf("Block submitted via CPU miner is an orphan building "+ + "on parent %v", block.MsgBlock().Header.PrevBlock) + return false + } + // Anything other than a rule violation is an unexpected error, // so log that error as an internal error. 
var rErr blockchain.RuleError @@ -228,11 +234,6 @@ func (m *CPUMiner) submitBlock(block *dcrutil.Block) bool { log.Errorf("Block submitted via CPU miner rejected: %v", err) return false } - if isOrphan { - log.Errorf("Block submitted via CPU miner is an orphan building on "+ - "parent %v", block.MsgBlock().Header.PrevBlock) - return false - } // The block was accepted. coinbaseTxOuts := block.MsgBlock().Transactions[0].TxOut diff --git a/internal/netsync/log.go b/internal/netsync/log.go index cbe84d3a1e..8db82c2a8d 100644 --- a/internal/netsync/log.go +++ b/internal/netsync/log.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 The Decred developers +// Copyright (c) 2020-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -20,3 +20,12 @@ var log = slog.Disabled func UseLogger(logger slog.Logger) { log = logger } + +// pickNoun returns the singular or plural form of a noun depending on the +// provided count. +func pickNoun(n uint64, singular, plural string) string { + if n == 1 { + return singular + } + return plural +} diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 1320e42f6c..2d64b42e79 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -6,10 +6,10 @@ package netsync import ( - "container/list" "context" "errors" "fmt" + "math" "runtime/debug" "sync" "time" @@ -30,13 +30,12 @@ import ( const ( // minInFlightBlocks is the minimum number of blocks that should be - // in the request queue for headers-first mode before requesting - // more. + // in the request queue before requesting more. minInFlightBlocks = 10 - // maxOrphanBlocks is the maximum number of orphan blocks that can be - // queued. - maxOrphanBlocks = 500 + // maxInFlightBlocks is the maximum number of blocks to allow in the sync + // peer request queue. + maxInFlightBlocks = 16 // maxRejectedTxns is the maximum number of rejected transactions // hashes to store in memory. @@ -49,6 +48,21 @@ const ( // maxRequestedTxns is the maximum number of requested transactions // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg + + // maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in + // a single message that is expected when determining when the message + // appears to be announcing new blocks. + maxExpectedHeaderAnnouncementsPerMsg = 12 + + // maxConsecutiveOrphanHeaders is the maximum number of consecutive header + // messages that contain headers which do not connect a peer can send before + // it is deemed to have diverged so far it is no longer useful. + maxConsecutiveOrphanHeaders = 10 + + // headerSyncStallTimeoutSecs is the number of seconds to wait for progress + // during the header sync process before stalling the sync and disconnecting + // the peer. + headerSyncStallTimeoutSecs = (3 + wire.MaxBlockHeadersPerMsg/1000) * 2 ) // zeroHash is the zero value hash (all zeros). It is defined as a convenience. @@ -128,9 +142,8 @@ type requestFromPeerResponse struct { // processBlockResponse is a response sent to the reply channel of a // processBlockMsg. type processBlockResponse struct { - forkLen int64 - isOrphan bool - err error + forkLen int64 + err error } // processBlockMsg is a message type to be sent across the message channel @@ -163,13 +176,6 @@ type processTransactionMsg struct { reply chan processTransactionResponse } -// headerNode is used as a node in a list of headers that are linked together -// between checkpoints. 
-type headerNode struct { - height int64 - hash *chainhash.Hash -} - // syncMgrPeer extends a peer to maintain additional state maintained by the // sync manager. type syncMgrPeer struct { @@ -178,14 +184,71 @@ type syncMgrPeer struct { syncCandidate bool requestedTxns map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{} + + // numConsecutiveOrphanHeaders tracks the number of consecutive header + // messages sent by the peer that contain headers which do not connect. It + // is used to detect peers that have either diverged so far they are no + // longer useful or are otherwise being malicious. + numConsecutiveOrphanHeaders int32 +} + +// headerSyncState houses the state used to track the header sync progress and +// related stall handling. +type headerSyncState struct { + // headersSynced tracks whether or not the headers are synced to a point + // that is recent enough to start downloading blocks. + headersSynced bool + + // These fields are used to implement a progress stall timeout that can be + // reset at any time without needing to create a new one and the associated + // extra garbage. + // + // stallTimer is an underlying timer that is used to implement the timeout. + // + // stallChanDrained indicates whether or not the channel for the stall timer + // has already been read and is used when resetting the timer to ensure the + // channel is drained when the timer is stopped as described in the timer + // documentation. + stallTimer *time.Timer + stallChanDrained bool +} + +// makeHeaderSyncState returns a header sync state that is ready to use. +func makeHeaderSyncState() headerSyncState { + stallTimer := time.NewTimer(math.MaxInt64) + stallTimer.Stop() + return headerSyncState{ + stallTimer: stallTimer, + stallChanDrained: true, + } +} + +// stopStallTimeout stops the progress stall timer while ensuring to read from +// the timer's channel in the case the timer already expired which can happen +// due to the fact the stop happens in between channel reads. This behavior is +// well documented in the Timer docs. +// +// NOTE: This function must not be called concurrent with any other receives on +// the timer's channel. +func (state *headerSyncState) stopStallTimeout() { + t := state.stallTimer + if !t.Stop() && !state.stallChanDrained { + <-t.C + } + state.stallChanDrained = true } -// orphanBlock represents a block for which the parent is not yet available. It -// is a normal block plus an expiration time to prevent caching the orphan -// forever. -type orphanBlock struct { - block *dcrutil.Block - expiration time.Time +// resetStallTimeout resets the progress stall timer while ensuring to read from +// the timer's channel in the case the timer already expired which can happen +// due to the fact the reset happens in between channel reads. This behavior is +// well documented in the Timer docs. +// +// NOTE: This function must not be called concurrent with any other receives on +// the timer's channel. 
+func (state *headerSyncState) resetStallTimeout() { + state.stopStallTimeout() + state.stallTimer.Reset(headerSyncStallTimeoutSecs * time.Second) + state.stallChanDrained = false } // SyncManager provides a concurrency safe sync manager for handling all @@ -203,23 +266,14 @@ type SyncManager struct { rejectedTxns map[chainhash.Hash]struct{} requestedTxns map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{} - progressLogger *progresslog.BlockLogger + progressLogger *progresslog.Logger syncPeer *syncMgrPeer msgChan chan interface{} peers map[*peerpkg.Peer]*syncMgrPeer - // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint - - // These fields are related to handling of orphan blocks. They are - // protected by the orphan lock. - orphanLock sync.RWMutex - orphans map[chainhash.Hash]*orphanBlock - prevOrphans map[chainhash.Hash][]*orphanBlock - oldestOrphan *orphanBlock + // hdrSyncState houses the state used to track the initial header sync + // process and related stall handling. + hdrSyncState headerSyncState // The following fields are used to track the height being synced to from // peers. @@ -245,55 +299,12 @@ func lookupPeer(peer *peerpkg.Peer, peers map[*peerpkg.Peer]*syncMgrPeer) *syncM return sp } -// resetHeaderState sets the headers-first mode state to values appropriate for -// syncing from a new peer. -func (m *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) { - m.headersFirstMode = false - m.headerList.Init() - m.startHeader = nil - - // When there is a next checkpoint, add an entry for the latest known - // block into the header pool. This allows the next downloaded header - // to prove it links to the chain properly. - if m.nextCheckpoint != nil { - node := headerNode{height: newestHeight, hash: newestHash} - m.headerList.PushBack(&node) - } -} - // SyncHeight returns latest known block being synced to. func (m *SyncManager) SyncHeight() int64 { m.syncHeightMtx.Lock() - defer m.syncHeightMtx.Unlock() - return m.syncHeight -} - -// findNextHeaderCheckpoint returns the next checkpoint after the passed height. -// It returns nil when there is not one either because the height is already -// later than the final checkpoint or some other reason such as disabled -// checkpoints. -func (m *SyncManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint { - checkpoints := m.cfg.Chain.Checkpoints() - if len(checkpoints) == 0 { - return nil - } - - // There is no next checkpoint if the height is already after the final - // checkpoint. - finalCheckpoint := &checkpoints[len(checkpoints)-1] - if height >= finalCheckpoint.Height { - return nil - } - - // Find the next checkpoint. - nextCheckpoint := finalCheckpoint - for i := len(checkpoints) - 2; i >= 0; i-- { - if height >= checkpoints[i].Height { - break - } - nextCheckpoint = &checkpoints[i] - } - return nextCheckpoint + syncHeight := m.syncHeight + m.syncHeightMtx.Unlock() + return syncHeight } // chainBlockLocatorToHashes converts a block locator from chain to a slice @@ -310,17 +321,61 @@ func chainBlockLocatorToHashes(locator blockchain.BlockLocator) []chainhash.Hash return result } +// fetchNextBlocks creates and sends a request to the provided peer for the next +// blocks to be downloaded based on the current headers. 
+func (m *SyncManager) fetchNextBlocks(peer *syncMgrPeer) { + // Nothing to do if the target maximum number of blocks to request from the + // peer at the same time are already in flight. + numInFlight := len(peer.requestedBlocks) + if numInFlight >= maxInFlightBlocks { + return + } + + // Determine the next blocks to download based on the final block that has + // already been requested and the next blocks in the branch leading up to + // best known header. + chain := m.cfg.Chain + maxNeeded := uint8(maxInFlightBlocks - numInFlight) + neededBlocks := chain.NextNeededBlocks(maxNeeded, m.requestedBlocks) + if len(neededBlocks) == 0 { + return + } + + // Ensure the number of needed blocks does not exceed the max inventory + // per message. This should never happen because the constants above limit + // it to relatively small values. However, the code below relies on this + // assumption, so assert it. + if len(neededBlocks) > wire.MaxInvPerMsg { + log.Warnf("%d needed blocks exceeds max allowed per message", + len(neededBlocks)) + return + } + + // Build and send a getdata request for needed blocks. + gdmsg := wire.NewMsgGetDataSizeHint(uint(len(neededBlocks))) + for i := range neededBlocks { + hash := neededBlocks[i] + iv := wire.NewInvVect(wire.InvTypeBlock, hash) + m.requestedBlocks[*hash] = struct{}{} + peer.requestedBlocks[*hash] = struct{}{} + gdmsg.AddInvVect(iv) + } + peer.QueueMessage(gdmsg, nil) + +} + // startSync will choose the best peer among the available candidate peers to // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. func (m *SyncManager) startSync() { - // Return now if we're already syncing. + // Nothing more to do when already syncing. if m.syncPeer != nil { return } - best := m.cfg.Chain.BestSnapshot() + chain := m.cfg.Chain + best := chain.BestSnapshot() var bestPeer *syncMgrPeer for _, peer := range m.peers { if !peer.syncCandidate { @@ -338,7 +393,7 @@ func (m *SyncManager) startSync() { continue } - // the best sync candidate is the most updated peer + // The best sync candidate is the most updated peer. if bestPeer == nil { bestPeer = peer } @@ -350,77 +405,78 @@ func (m *SyncManager) startSync() { // Update the state of whether or not the manager believes the chain is // fully synced to whatever the chain believes when there is no candidate // for a sync peer. + // + // Also, return now when there isn't a sync peer candidate as there is + // nothing more to do without one. if bestPeer == nil { m.isCurrentMtx.Lock() - m.isCurrent = m.cfg.Chain.IsCurrent() + m.isCurrent = chain.IsCurrent() m.isCurrentMtx.Unlock() + log.Warnf("No sync peer candidates available") + return } - // Start syncing from the best peer if one was selected. - if bestPeer != nil { - // Clear the requestedBlocks if the sync peer changes, otherwise - // we may ignore blocks we need that the last sync peer failed - // to send. - m.requestedBlocks = make(map[chainhash.Hash]struct{}) + // Start syncing from the best peer. - blkLocator := m.cfg.Chain.LatestBlockLocator() - locator := chainBlockLocatorToHashes(blkLocator) + // Clear the requestedBlocks if the sync peer changes, otherwise + // we may ignore blocks we need that the last sync peer failed + // to send. 
+ m.requestedBlocks = make(map[chainhash.Hash]struct{}) - log.Infof("Syncing to block height %d from peer %v", - bestPeer.LastBlock(), bestPeer.Addr()) + syncHeight := bestPeer.LastBlock() - // The chain is not synced whenever the current best height is less than - // the height to sync to. - if best.Height < bestPeer.LastBlock() { - m.isCurrentMtx.Lock() - m.isCurrent = false - m.isCurrentMtx.Unlock() - } + headersSynced := m.hdrSyncState.headersSynced + if !headersSynced { + log.Infof("Syncing headers to block height %d from peer %v", syncHeight, + bestPeer) + } - // When the current height is less than a known checkpoint we - // can use block headers to learn about which blocks comprise - // the chain up to the checkpoint and perform less validation - // for them. This is possible since each header contains the - // hash of the previous header and a merkle root. Therefore if - // we validate all of the received headers link together - // properly and the checkpoint hashes match, we can be sure the - // hashes for the blocks in between are accurate. Further, once - // the full blocks are downloaded, the merkle root is computed - // and compared against the value in the header which proves the - // full block hasn't been tampered with. - // - // Once we have passed the final checkpoint, or checkpoints are - // disabled, use standard inv messages learn about the blocks - // and fully validate them. Finally, regression test mode does - // not support the headers-first approach so do normal block - // downloads when in regression test mode. - if m.nextCheckpoint != nil && - best.Height < m.nextCheckpoint.Height && - !m.cfg.DisableCheckpoints { - - err := bestPeer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) - if err != nil { - log.Errorf("Failed to push getheadermsg for the latest "+ - "blocks: %v", err) - return - } - m.headersFirstMode = true - log.Infof("Downloading headers for blocks %d to %d from peer %s", - best.Height+1, m.nextCheckpoint.Height, bestPeer.Addr()) - } else { - err := bestPeer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Errorf("Failed to push getblocksmsg for the latest "+ - "blocks: %v", err) - return - } - } - m.syncPeer = bestPeer - m.syncHeightMtx.Lock() - m.syncHeight = bestPeer.LastBlock() - m.syncHeightMtx.Unlock() - } else { - log.Warnf("No sync peer candidates available") + // The chain is not synced whenever the current best height is less than the + // height to sync to. + if best.Height < syncHeight { + m.isCurrentMtx.Lock() + m.isCurrent = false + m.isCurrentMtx.Unlock() + } + + // Request headers to discover any blocks that are not already known + // starting from the parent of the best known header for the local chain. + // The parent is used as a means to accurately discover the best known block + // of the remote peer in the case both tips are the same where it would + // otherwise result in an empty response. + bestHeaderHash, _ := chain.BestHeader() + parentHash := bestHeaderHash + header, err := chain.HeaderByHash(&bestHeaderHash) + if err == nil { + parentHash = header.PrevBlock + } + blkLocator := chain.BlockLocatorFromHash(&parentHash) + locator := chainBlockLocatorToHashes(blkLocator) + bestPeer.PushGetHeadersMsg(locator, &zeroHash) + + // Track the sync peer and update the sync height when it is higher than the + // currently best known value. 
+ m.syncPeer = bestPeer + m.syncHeightMtx.Lock() + if syncHeight > m.syncHeight { + m.syncHeight = syncHeight + } + m.syncHeightMtx.Unlock() + + // Start the header sync progress stall timeout when the initial headers + // sync is not already done. + if !headersSynced { + m.hdrSyncState.resetStallTimeout() + } + + // Download any blocks needed to catch the local chain up to the best + // known header (if any) when the initial headers sync is already done. + // + // This is done in addition to the header request above to avoid waiting + // for the round trip when there are still blocks that are needed + // regardless of the headers response. + if headersSynced { + m.fetchNextBlocks(m.syncPeer) } } @@ -527,28 +583,23 @@ func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) { delete(m.peers, p) // Remove requested transactions from the global map so that they will - // be fetched from elsewhere next time we get an inv. + // be fetched from elsewhere. for txHash := range peer.requestedTxns { delete(m.requestedTxns, txHash) } // Remove requested blocks from the global map so that they will be - // fetched from elsewhere next time we get an inv. + // fetched from elsewhere. // TODO(oga) we could possibly here check which peers have these blocks // and request them now to speed things up a little. for blockHash := range peer.requestedBlocks { delete(m.requestedBlocks, blockHash) } - // Attempt to find a new peer to sync from if the quitting peer is the - // sync peer. Also, reset the headers-first state if in headers-first - // mode so + // Attempt to find a new peer to sync from and reset the final requested + // block when the quitting peer is the sync peer. if m.syncPeer == peer { m.syncPeer = nil - if m.headersFirstMode { - best := m.cfg.Chain.BestSnapshot() - m.resetHeaderState(&best.Hash, best.Height) - } m.startSync() } } @@ -685,135 +736,8 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) { m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } -// isKnownOrphan returns whether the passed hash is currently a known orphan. -// Keep in mind that only a limited number of orphans are held onto for a -// limited amount of time, so this function must not be used as an absolute way -// to test if a block is an orphan block. A full block (as opposed to just its -// hash) must be passed to ProcessBlock for that purpose. This function -// provides a mechanism for a caller to intelligently detect *recent* duplicate -// orphans and react accordingly. -// -// This function is safe for concurrent access. -func (m *SyncManager) isKnownOrphan(hash *chainhash.Hash) bool { - // Protect concurrent access. Using a read lock only so multiple readers - // can query without blocking each other. - m.orphanLock.RLock() - _, exists := m.orphans[*hash] - m.orphanLock.RUnlock() - return exists -} - -// orphanRoot returns the head of the chain for the provided hash from the map -// of orphan blocks. -// -// This function is safe for concurrent access. -func (m *SyncManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { - // Protect concurrent access. Using a read lock only so multiple - // readers can query without blocking each other. - m.orphanLock.RLock() - defer m.orphanLock.RUnlock() - - // Keep looping while the parent of each orphaned block is known and is an - // orphan itself. 
- orphanRoot := hash - prevHash := hash - for { - orphan, exists := m.orphans[*prevHash] - if !exists { - break - } - orphanRoot = prevHash - prevHash = &orphan.block.MsgBlock().Header.PrevBlock - } - - return orphanRoot -} - -// removeOrphanBlock removes the passed orphan block from the orphan pool and -// previous orphan index. -func (m *SyncManager) removeOrphanBlock(orphan *orphanBlock) { - // Protect concurrent access. - m.orphanLock.Lock() - defer m.orphanLock.Unlock() - - // Remove the orphan block from the orphan pool. - orphanHash := orphan.block.Hash() - delete(m.orphans, *orphanHash) - - // Remove the reference from the previous orphan index too. An indexing - // for loop is intentionally used over a range here as range does not - // reevaluate the slice on each iteration nor does it adjust the index - // for the modified slice. - prevHash := &orphan.block.MsgBlock().Header.PrevBlock - orphans := m.prevOrphans[*prevHash] - for i := 0; i < len(orphans); i++ { - hash := orphans[i].block.Hash() - if hash.IsEqual(orphanHash) { - copy(orphans[i:], orphans[i+1:]) - orphans[len(orphans)-1] = nil - orphans = orphans[:len(orphans)-1] - i-- - } - } - m.prevOrphans[*prevHash] = orphans - - // Remove the map entry altogether if there are no longer any orphans - // which depend on the parent hash. - if len(m.prevOrphans[*prevHash]) == 0 { - delete(m.prevOrphans, *prevHash) - } -} - -// addOrphanBlock adds the passed block (which is already determined to be an -// orphan prior calling this function) to the orphan pool. It lazily cleans up -// any expired blocks so a separate cleanup poller doesn't need to be run. It -// also imposes a maximum limit on the number of outstanding orphan blocks and -// will remove the oldest received orphan block if the limit is exceeded. -func (m *SyncManager) addOrphanBlock(block *dcrutil.Block) { - // Remove expired orphan blocks. - for _, oBlock := range m.orphans { - if time.Now().After(oBlock.expiration) { - m.removeOrphanBlock(oBlock) - continue - } - - // Update the oldest orphan block pointer so it can be discarded - // in case the orphan pool fills up. - if m.oldestOrphan == nil || - oBlock.expiration.Before(m.oldestOrphan.expiration) { - m.oldestOrphan = oBlock - } - } - - // Limit orphan blocks to prevent memory exhaustion. - if len(m.orphans)+1 > maxOrphanBlocks { - // Remove the oldest orphan to make room for the new one. - m.removeOrphanBlock(m.oldestOrphan) - m.oldestOrphan = nil - } - - // Protect concurrent access. This is intentionally done here instead - // of near the top since removeOrphanBlock does its own locking and - // the range iterator is not invalidated by removing map entries. - m.orphanLock.Lock() - defer m.orphanLock.Unlock() - - // Insert the block into the orphan map with an expiration time - // 1 hour from now. - expiration := time.Now().Add(time.Hour) - oBlock := &orphanBlock{ - block: block, - expiration: expiration, - } - m.orphans[*block.Hash()] = oBlock - - // Add to previous hash lookup index for faster dependency lookups. - prevHash := &block.MsgBlock().Header.PrevBlock - m.prevOrphans[*prevHash] = append(m.prevOrphans[*prevHash], oBlock) -} - -// maybeUpdateIsCurrent potentially updates the manager to signal it believes the -// chain is considered synced. +// maybeUpdateIsCurrent potentially updates the manager to signal it believes +// the chain is considered synced. // // This function MUST be called with the is current mutex held (for writes). 
func (m *SyncManager) maybeUpdateIsCurrent() { @@ -831,103 +755,24 @@ func (m *SyncManager) maybeUpdateIsCurrent() { } } -// processOrphans determines if there are any orphans which depend on the passed -// block hash (they are no longer orphans if true) and potentially accepts them. -// It repeats the process for the newly accepted blocks (to detect further -// orphans which may no longer be orphans) until there are no more. -// -// The flags do not modify the behavior of this function directly, however they -// are needed to pass along to maybeAcceptBlock. -func (m *SyncManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error { - // Start with processing at least the passed hash. Leave a little room for - // additional orphan blocks that need to be processed without needing to - // grow the array in the common case. - processHashes := make([]*chainhash.Hash, 0, 10) - processHashes = append(processHashes, hash) - for len(processHashes) > 0 { - // Pop the first hash to process from the slice. - processHash := processHashes[0] - processHashes[0] = nil // Prevent GC leak. - processHashes = processHashes[1:] - - // Look up all orphans that are parented by the block we just accepted. - // This will typically only be one, but it could be multiple if multiple - // blocks are mined and broadcast around the same time. The one with - // the most proof of work will eventually win out. An indexing for loop - // is intentionally used over a range here as range does not reevaluate - // the slice on each iteration nor does it adjust the index for the - // modified slice. - for i := 0; i < len(m.prevOrphans[*processHash]); i++ { - orphan := m.prevOrphans[*processHash][i] - if orphan == nil { - log.Warnf("Found a nil entry at index %d in the orphan "+ - "dependency list for block %v", i, processHash) - continue - } - - // Remove the orphan from the orphan pool. - orphanHash := orphan.block.Hash() - m.removeOrphanBlock(orphan) - i-- - - // Potentially accept the block into the block chain. - _, err := m.cfg.Chain.ProcessBlock(orphan.block, flags) - if err != nil { - return err - } - m.isCurrentMtx.Lock() - m.maybeUpdateIsCurrent() - m.isCurrentMtx.Unlock() - - // Add this block to the list of blocks to process so any orphan - // blocks that depend on this block are handled too. - processHashes = append(processHashes, orphanHash) - } - } - return nil -} - -// processBlockAndOrphans processes the provided block using the internal chain -// instance while keeping track of orphan blocks and also processing any orphans -// that depend on the passed block to potentially accept as well. +// processBlock processes the provided block using the internal chain instance. // // When no errors occurred during processing, the first return value indicates // the length of the fork the block extended. In the case it either extended // the best chain or is now the tip of the best chain due to causing a -// reorganize, the fork length will be 0. The second return value indicates -// whether or not the block is an orphan, in which case the fork length will -// also be zero as expected, because it, by definition, does not connect to the -// best chain. -func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) { +// reorganize, the fork length will be 0. Orphans are rejected and can be +// detected by checking if the error is blockchain.ErrMissingParent. 
+func (m *SyncManager) processBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, error) { // Process the block to include validation, best chain selection, etc. - // - // Also, keep track of orphan blocks in the sync manager when the error - // returned indicates the block is an orphan. - blockHash := block.Hash() forkLen, err := m.cfg.Chain.ProcessBlock(block, flags) - if errors.Is(err, blockchain.ErrMissingParent) { - log.Infof("Adding orphan block %v with parent %v", blockHash, - block.MsgBlock().Header.PrevBlock) - m.addOrphanBlock(block) - - // The fork length of orphans is unknown since they, by definition, do - // not connect to the best chain. - return 0, true, nil - } if err != nil { - return 0, false, err + return 0, err } m.isCurrentMtx.Lock() m.maybeUpdateIsCurrent() m.isCurrentMtx.Unlock() - // Accept any orphan blocks that depend on this block (they are no longer - // orphans) and repeat for those accepted blocks until there are no more. - if err := m.processOrphans(blockHash, flags); err != nil { - return 0, false, err - } - - return forkLen, false, nil + return forkLen, nil } // handleBlockMsg handles block messages from all peers. @@ -941,49 +786,38 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { blockHash := bmsg.block.Hash() if _, exists := peer.requestedBlocks[*blockHash]; !exists { log.Warnf("Got unrequested block %v from %s -- disconnecting", - blockHash, peer.Addr()) + blockHash, peer) peer.Disconnect() return } - // When in headers-first mode, if the block matches the hash of the - // first header in the list of headers that are being fetched, it's - // eligible for less validation since the headers have already been - // verified to link together and are valid up to the next checkpoint. - // Also, remove the list entry for all blocks except the checkpoint - // since it is needed to verify the next round of headers links - // properly. - isCheckpointBlock := false - behaviorFlags := blockchain.BFNone - if m.headersFirstMode { - firstNodeEl := m.headerList.Front() - if firstNodeEl != nil { - firstNode := firstNodeEl.Value.(*headerNode) - if blockHash.IsEqual(firstNode.hash) { - behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(m.nextCheckpoint.Hash) { - isCheckpointBlock = true - } else { - m.headerList.Remove(firstNodeEl) - } - } - } - } + // Save whether or not the chain believes it is current prior to processing + // the block for use below in determining logging behavior. + chain := m.cfg.Chain + wasChainCurrent := chain.IsCurrent() - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. + // Process the block to include validation, best chain selection, etc. + // + // Also, remove the block from the request maps once it has been processed. + // This ensures chain is aware of the block before it is removed from the + // maps in order to help prevent duplicate requests. + forkLen, err := m.processBlock(bmsg.block, blockchain.BFNone) delete(peer.requestedBlocks, *blockHash) delete(m.requestedBlocks, *blockHash) - - // Process the block to include validation, best chain selection, orphan - // handling, etc. - forkLen, isOrphan, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags) if err != nil { + // Ideally there should never be any requests for duplicate blocks, but + // ignore any that manage to make it through. 
+ if errors.Is(err, blockchain.ErrDuplicateBlock) { + return + } + // When the error is a rule error, it means the block was simply - // rejected as opposed to something actually going wrong, so log - // it as such. Otherwise, something really did go wrong, so log - // it as an actual error. + // rejected as opposed to something actually going wrong, so log it as + // such. Otherwise, something really did go wrong, so log it as an + // actual error. + // + // Note that orphan blocks are never requested so there is no need to + // test for that rule error separately. var rErr blockchain.RuleError if errors.As(err, &rErr) { log.Infof("Rejected block %v from %s: %v", blockHash, peer, err) @@ -994,51 +828,69 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { log.Errorf("Critical failure: %v", err) } - // Convert the error into an appropriate reject message and - // send it. + // Convert the error into an appropriate reject message and send it. code, reason := errToWireRejectCode(err) peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) return } - // Request the parents for the orphan block from the peer that sent it. - onMainChain := !isOrphan && forkLen == 0 - if isOrphan { - orphanRoot := m.orphanRoot(blockHash) - blkLocator := m.cfg.Chain.LatestBlockLocator() - locator := chainBlockLocatorToHashes(blkLocator) - err := peer.PushGetBlocksMsg(locator, orphanRoot) - if err != nil { - log.Warnf("Failed to push getblocksmsg for the latest block: %v", - err) + // Log information about the block. Use the progress logger when the chain + // was not already current prior to processing the block to provide nicer + // periodic logging with a progress percentage. Otherwise, log the block + // individually along with some stats. + msgBlock := bmsg.block.MsgBlock() + header := &msgBlock.Header + if !wasChainCurrent { + forceLog := int64(header.Height) >= m.SyncHeight() + m.progressLogger.LogProgress(msgBlock, forceLog, chain.VerifyProgress) + if chain.IsCurrent() { + best := chain.BestSnapshot() + log.Infof("Initial chain sync complete (hash %s, height %d)", + best.Hash, best.Height) } } else { - // When the block is not an orphan, log information about it and - // update the chain state. - m.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), m.SyncHeight()) - - if onMainChain { - // Notify stake difficulty subscribers and prune invalidated - // transactions. - best := m.cfg.Chain.BestSnapshot() - m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - m.cfg.TxMemPool.PruneExpiredTx() - - // Clear the rejected transactions. - m.rejectedTxns = make(map[chainhash.Hash]struct{}) + var interval string + prevBlockHeader, err := chain.HeaderByHash(&header.PrevBlock) + if err == nil { + diff := header.Timestamp.Sub(prevBlockHeader.Timestamp) + interval = ", interval " + diff.Round(time.Second).String() } + + numTxns := uint64(len(msgBlock.Transactions)) + numTickets := uint64(header.FreshStake) + numVotes := uint64(header.Voters) + numRevokes := uint64(header.Revocations) + log.Infof("New block %s (%d %s, %d %s, %d %s, %d %s, height %d%s)", + blockHash, numTxns, pickNoun(numTxns, "transaction", "transactions"), + numTickets, pickNoun(numTickets, "ticket", "tickets"), + numVotes, pickNoun(numVotes, "vote", "votes"), + numRevokes, pickNoun(numRevokes, "revocation", "revocations"), + header.Height, interval) + } + + // Perform some additional processing when the block extended the main + // chain. + onMainChain := forkLen == 0 + if onMainChain { + // Prune invalidated transactions. 
+ best := chain.BestSnapshot() + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) + m.cfg.TxMemPool.PruneExpiredTx() + + // Clear the rejected transactions. + m.rejectedTxns = make(map[chainhash.Hash]struct{}) } // Update the latest block height for the peer to avoid stale heights when // looking for future potential sync node candidacy. // - // Also, when the block is an orphan or the chain is considered current and - // the block was accepted to the main chain, update the heights of other - // peers whose invs may have been ignored when actively syncing while the - // chain was not yet current or lost the lock announcement race. - blockHeight := int64(bmsg.block.MsgBlock().Header.Height) + // Also, when the chain is considered current and the block was accepted to + // the main chain, update the heights of other peers whose invs may have + // been ignored when actively syncing while the chain was not yet current or + // lost the lock announcement race. + blockHeight := int64(header.Height) peer.UpdateLastBlockHeight(blockHeight) - if isOrphan || (onMainChain && m.IsCurrent()) { + if onMainChain && m.IsCurrent() { for _, p := range m.peers { // The height for the sending peer is already updated. if p == peer { @@ -1053,97 +905,70 @@ } } - // Nothing more to do if we aren't in headers-first mode. - if !m.headersFirstMode { - return - } - - // This is headers-first mode, so if the block is not a checkpoint - // request more blocks using the header list when the request queue is - // getting short. - if !isCheckpointBlock { - if m.startHeader != nil && - len(peer.requestedBlocks) < minInFlightBlocks { - m.fetchHeaderBlocks() - } - return + // Request more blocks using the headers when the request queue is getting + // short. + if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks { + m.fetchNextBlocks(peer) } +} - // This is headers-first mode and the block is a checkpoint. When - // there is a next checkpoint, get the next round of headers by asking - // for headers starting from the block after this one up to the next - // checkpoint. - prevHeight := m.nextCheckpoint.Height - prevHash := m.nextCheckpoint.Hash - m.nextCheckpoint = m.findNextHeaderCheckpoint(prevHeight) - if m.nextCheckpoint != nil { - locator := []chainhash.Hash{*prevHash} - err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) - if err != nil { - log.Warnf("Failed to send getheaders message to peer %s: %v", - peer.Addr(), err) - return - } - log.Infof("Downloading headers for blocks %d to %d from peer %s", - prevHeight+1, m.nextCheckpoint.Height, m.syncPeer.Addr()) - return - } +// guessHeaderSyncProgress returns a percentage that is a guess of the progress +// of the header sync process for the given currently best known header based +// on an algorithm that considers the total number of expected headers based on +// the target time per block of the network. It should only be used for the +// main and test networks because it relies on relatively consistent mining, +// which is not the case for other networks such as the simulation test network. +// +// This function is safe for concurrent access.
+func (m *SyncManager) guessHeaderSyncProgress(header *wire.BlockHeader) float64 { + // Calculate the expected total number of blocks to reach the current time + // by considering the number there already are plus the expected number of + // remaining ones there should be in the time interval between the provided + // best known header and the current time given the target block time. + // + // This approach is used as opposed to calculating the total expected since + // the genesis block since it gets more accurate as more headers are + // processed and thus provide more information. It is also more robust + // against networks with dynamic difficulty readjustment such as the test + // network. + curTimestamp := m.cfg.TimeSource.AdjustedTime().Unix() + targetSecsPerBlock := int64(m.cfg.ChainParams.TargetTimePerBlock.Seconds()) + remaining := (curTimestamp - header.Timestamp.Unix()) / targetSecsPerBlock + expectedTotal := int64(header.Height) + remaining + + // Finally the progress guess is simply the ratio of the current number of + // known headers to the total expected number of headers. + return math.Min(float64(header.Height)/float64(expectedTotal), 1.0) * 100 +} - // This is headers-first mode, the block is a checkpoint, and there are - // no more checkpoints, so switch to normal mode by requesting blocks - // from the block after this one up to the end of the chain (zero hash). - m.headersFirstMode = false - m.headerList.Init() - log.Infof("Reached the final checkpoint -- switching to normal mode") - locator := []chainhash.Hash{*blockHash} - err = peer.PushGetBlocksMsg(locator, &zeroHash) +// headerSyncProgress returns a percentage that is a guess of the progress +// of the header sync process. +// +// This function is safe for concurrent access. +func (m *SyncManager) headerSyncProgress() float64 { + hash, _ := m.cfg.Chain.BestHeader() + header, err := m.cfg.Chain.HeaderByHash(&hash) if err != nil { - log.Warnf("Failed to send getheaders message to peer %s: %v", - peer.Addr(), err) - return + return 0.0 } -} -// fetchHeaderBlocks creates and sends a request to the syncPeer for the next -// list of blocks to be downloaded based on the current list of headers. -func (m *SyncManager) fetchHeaderBlocks() { - // Nothing to do if there is no start header. - if m.startHeader == nil { - log.Warnf("fetchHeaderBlocks called with no start header") - return + // Use an algorithm that considers the total number of expected headers + // based on the target time per block of the network for the main and test + // networks. This is the preferred approach because, unlike the sync height + // reported by remote peers, it is difficult to game since it is based on + // the target proof of work, but it assumes consistent mining, which is not + // the case on all networks, so limit it to the two where that applies. + net := m.cfg.ChainParams.Net + if net == wire.MainNet || net == wire.TestNet3 { + return m.guessHeaderSyncProgress(&header) } - // Build up a getdata request for the list of blocks the headers - // describe. The size hint will be limited to wire.MaxInvPerMsg by - // the function, so no need to double check it here.
- gdmsg := wire.NewMsgGetDataSizeHint(uint(m.headerList.Len())) - numRequested := 0 - for e := m.startHeader; e != nil; e = e.Next() { - node, ok := e.Value.(*headerNode) - if !ok { - log.Warn("Header list node type is not a headerNode") - continue - } - - if m.needBlock(node.hash) { - iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - m.requestedBlocks[*node.hash] = struct{}{} - m.syncPeer.requestedBlocks[*node.hash] = struct{}{} - err := gdmsg.AddInvVect(iv) - if err != nil { - log.Warnf("Failed to add invvect while fetching block "+ - "headers: %v", err) - } - numRequested++ - } - m.startHeader = e.Next() - if numRequested >= wire.MaxInvPerMsg { - break - } - } - if len(gdmsg.InvList) > 0 { - m.syncPeer.QueueMessage(gdmsg, nil) + // Fall back to using the sync height reported by the remote peer otherwise. + syncHeight := m.SyncHeight() + if syncHeight == 0 { + return 0.0 } + return math.Min(float64(header.Height)/float64(syncHeight), 1.0) * 100 } // handleHeadersMsg handles headers messages from all peers. @@ -1153,97 +978,226 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { return } - // The remote peer is misbehaving if we didn't request headers. - msg := hmsg.headers - numHeaders := len(msg.Headers) - if !m.headersFirstMode { - log.Warnf("Got %d unrequested headers from %s -- disconnecting", - numHeaders, peer.Addr()) - peer.Disconnect() + // Nothing to do for an empty headers message as it means the sending peer + // does not have any additional headers for the requested block locator. + headers := hmsg.headers.Headers + numHeaders := len(headers) + if numHeaders == 0 { return } - // Nothing to do for an empty headers message. - if numHeaders == 0 { + // Handle the case where the first header does not connect to any known + // headers specially. + chain := m.cfg.Chain + firstHeader := headers[0] + firstHeaderHash := firstHeader.BlockHash() + firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock) + headersSynced := m.hdrSyncState.headersSynced + if !firstHeaderConnects { + // Ignore headers that do not connect to any known headers when the + // initial headers sync is taking place. It is expected that headers + // will be announced that are not yet known. + if !headersSynced { + return + } + + // Attempt to detect block announcements which do not connect to any + // known headers and request any headers starting from the best header + // the local chain knows in order to (hopefully) discover the missing + // headers. + // + // Meanwhile, also keep track of how many times the peer has + // consecutively sent a headers message that does not connect and + // disconnect it once the max allowed threshold has been reached. + if numHeaders < maxExpectedHeaderAnnouncementsPerMsg { + peer.numConsecutiveOrphanHeaders++ + if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders { + log.Debugf("Received %d consecutive headers messages that do "+ + "not connect from peer %s -- disconnecting", + peer.numConsecutiveOrphanHeaders, peer) + peer.Disconnect() + } + + log.Debugf("Requesting missing parents for header %s (height %d) "+ + "received from peer %s", firstHeaderHash, firstHeader.Height, + peer) + bestHeaderHash, _ := chain.BestHeader() + blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash) + locator := chainBlockLocatorToHashes(blkLocator) + peer.PushGetHeadersMsg(locator, &zeroHash) + + return + } + + // The initial headers sync process is done and this does not appear to + // be a block announcement, so disconnect the peer. 
+ log.Debugf("Received orphan header from peer %s -- disconnecting", peer) + peer.Disconnect() return } - // Process all of the received headers ensuring each one connects to the - // previous and that checkpoints match. - receivedCheckpoint := false - var finalHash *chainhash.Hash - for _, blockHeader := range msg.Headers { - blockHash := blockHeader.BlockHash() - finalHash = &blockHash - - // Ensure there is a previous header to compare against. - prevNodeEl := m.headerList.Back() - if prevNodeEl == nil { - log.Warnf("Header list does not contain a previous element as " + - "expected -- disconnecting peer") + // Ensure all of the received headers connect the previous one before + // attempting to perform any further processing on any of them. + headerHashes := make([]chainhash.Hash, 0, len(headers)) + headerHashes = append(headerHashes, firstHeaderHash) + for prevIdx, header := range headers[1:] { + prevHash := &headerHashes[prevIdx] + prevHeight := headers[prevIdx].Height + if header.PrevBlock != *prevHash || header.Height != prevHeight+1 { + log.Debugf("Received block header that does not properly connect "+ + "to previous one from peer %s -- disconnecting", peer) peer.Disconnect() return } + headerHashes = append(headerHashes, header.BlockHash()) + } - // Ensure the header properly connects to the previous one and - // add it to the list of headers. - node := headerNode{hash: &blockHash} - prevNode := prevNodeEl.Value.(*headerNode) - if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { - node.height = prevNode.height + 1 - e := m.headerList.PushBack(&node) - if m.startHeader == nil { - m.startHeader = e - } - } else { - log.Warnf("Received block header that does not properly connect "+ - "to the chain from peer %s -- disconnecting", peer.Addr()) + // Save the current best known header height prior to processing the headers + // so the code later is able to determine if any new useful headers were + // provided. + _, prevBestHeaderHeight := chain.BestHeader() + + // Process all of the received headers. + for _, header := range headers { + err := chain.ProcessBlockHeader(header, blockchain.BFNone) + if err != nil { + // Note that there is no need to check for an orphan header here + // because they were already verified to connect above. + + log.Debugf("Failed to process block header %s from peer %s: %v -- "+ + "disconnecting", header.BlockHash(), peer, err) peer.Disconnect() return } + } + + // All of the headers were either accepted or already known valid at this + // point. - // Verify the header at the next checkpoint height matches. - if node.height == m.nextCheckpoint.Height { - if node.hash.IsEqual(m.nextCheckpoint.Hash) { - receivedCheckpoint = true - log.Infof("Verified downloaded block header against "+ - "checkpoint at height %d/hash %s", node.height, node.hash) - } else { - log.Warnf("Block header at height %d/hash %s from peer %s "+ - "does NOT match expected checkpoint hash of %s -- "+ - "disconnecting", node.height, node.hash, peer.Addr(), - m.nextCheckpoint.Hash) + // Reset the header sync progress stall timeout when the headers are not + // already synced and progress was made. + newBestHeaderHash, newBestHeaderHeight := chain.BestHeader() + if peer == m.syncPeer && !headersSynced { + if newBestHeaderHeight > prevBestHeaderHeight { + m.hdrSyncState.resetStallTimeout() + } + } + + // Reset the count of consecutive headers messages that contained headers + // which do not connect. 
Note that this is intentionally only done when all + // of the provided headers are successfully processed above. + peer.numConsecutiveOrphanHeaders = 0 + + // Update the last announced block to the final one in the announced headers + // above and update the height for the peer too. + finalHeader := headers[len(headers)-1] + finalReceivedHash := &headerHashes[len(headerHashes)-1] + peer.UpdateLastAnnouncedBlock(finalReceivedHash) + peer.UpdateLastBlockHeight(int64(finalHeader.Height)) + + // Update the sync height if the new best known header height exceeds it. + syncHeight := m.SyncHeight() + if newBestHeaderHeight > syncHeight { + syncHeight = newBestHeaderHeight + m.syncHeightMtx.Lock() + m.syncHeight = syncHeight + m.syncHeightMtx.Unlock() + } + + // Disconnect outbound peers that have less cumulative work than the minimum + // value already known to have been achieved on the network a priori while + // the initial sync is still underway. This is determined by noting that a + // peer only sends fewer than the maximum number of headers per message when + // it has reached its best known header. + isChainCurrent := chain.IsCurrent() + receivedMaxHeaders := len(headers) == wire.MaxBlockHeadersPerMsg + if !isChainCurrent && !peer.Inbound() && !receivedMaxHeaders { + minKnownWork := m.cfg.ChainParams.MinKnownChainWork + if minKnownWork != nil { + workSum, err := chain.ChainWork(finalReceivedHash) + if err == nil && workSum.Cmp(minKnownWork) < 0 { + log.Debugf("Best known chain for peer %s has too little "+ + "cumulative work -- disconnecting", peer) peer.Disconnect() return } - break } } - // When this header is a checkpoint, switch to fetching the blocks for - // all of the headers since the last checkpoint. - if receivedCheckpoint { - // Since the first entry of the list is always the final block - // that is already in the database and is only used to ensure - // the next header links properly, it must be removed before - // fetching the blocks. - m.headerList.Remove(m.headerList.Front()) - log.Infof("Received %v block headers: Fetching blocks", - m.headerList.Len()) - m.progressLogger.SetLastLogTime(time.Now()) - m.fetchHeaderBlocks() - return + // Request more headers when the peer announced the maximum number of + // headers that can be sent in a single message since it probably has more. + if receivedMaxHeaders { + blkLocator := chain.BlockLocatorFromHash(finalReceivedHash) + locator := chainBlockLocatorToHashes(blkLocator) + peer.PushGetHeadersMsg(locator, &zeroHash) + + m.progressLogger.LogHeaderProgress(uint64(len(headers)), headersSynced, + m.headerSyncProgress) + } + + // Consider the headers synced once the sync peer sends a message with a + // final header that is within a few blocks of the sync height. + if !headersSynced && peer == m.syncPeer { + const syncHeightFetchOffset = 6 + if int64(finalHeader.Height)+syncHeightFetchOffset > syncHeight { + headersSynced = true + m.hdrSyncState.headersSynced = headersSynced + m.hdrSyncState.stopStallTimeout() + + m.progressLogger.LogHeaderProgress(uint64(len(headers)), + headersSynced, m.headerSyncProgress) + log.Infof("Initial headers sync complete (best header hash %s, "+ + "height %d)", newBestHeaderHash, newBestHeaderHeight) + log.Info("Syncing chain") + m.progressLogger.SetLastLogTime(time.Now()) + + // Potentially update whether the chain believes it is current now + // that the headers are synced. 
+			chain.MaybeUpdateIsCurrent()
+			isChainCurrent = chain.IsCurrent()
+			if isChainCurrent {
+				best := chain.BestSnapshot()
+				log.Infof("Initial chain sync complete (hash %s, height %d)",
+					best.Hash, best.Height)
+			}
+		}
 	}
 
-	// This header is not a checkpoint, so request the next batch of
-	// headers starting from the latest known header and ending with the
-	// next checkpoint.
-	locator := []chainhash.Hash{*finalHash}
-	err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash)
-	if err != nil {
-		log.Warnf("Failed to send getheaders message to peer %s: %v",
-			peer.Addr(), err)
-		return
+	// Immediately download blocks associated with the announced headers once
+	// the chain is current. This allows downloading from whichever peer
+	// announces it first and also ensures any side chain blocks are downloaded
+	// for vote consideration.
+	//
+	// Ultimately, it would likely be better for this functionality to be moved
+	// to the code which determines the next blocks to request based on the
+	// available headers once that code supports downloading from multiple peers
+	// and associated infrastructure to efficiently determine which peers have
+	// the associated block(s).
+	if isChainCurrent {
+		gdmsg := wire.NewMsgGetDataSizeHint(uint(len(headers)))
+		for i := range headerHashes {
+			// Skip the block when it has already been requested or is otherwise
+			// already known.
+			hash := &headerHashes[i]
+			_, isRequestedBlock := m.requestedBlocks[*hash]
+			if isRequestedBlock || chain.HaveBlock(hash) {
+				continue
+			}
+
+			iv := wire.NewInvVect(wire.InvTypeBlock, hash)
+			limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks)
+			limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks)
+			gdmsg.AddInvVect(iv)
+		}
+		if len(gdmsg.InvList) > 0 {
+			peer.QueueMessage(gdmsg, nil)
+		}
+	}
+
+	// Download any blocks needed to catch the local chain up to the best known
+	// header (if any) once the initial headers sync is done.
+	if headersSynced && m.syncPeer != nil {
+		m.fetchNextBlocks(m.syncPeer)
+	}
 }
 
@@ -1272,12 +1226,6 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
 	}
 }
 
-// needBlock returns whether or not the block needs to be downloaded. For
-// example, it does not need to be downloaded when it is already known.
-func (m *SyncManager) needBlock(hash *chainhash.Hash) bool {
-	return !m.isKnownOrphan(hash) && !m.cfg.Chain.HaveBlock(hash)
-}
-
 // needTx returns whether or not the transaction needs to be downloaded. For
 // example, it does not need to be downloaded when it is already known.
 func (m *SyncManager) needTx(hash *chainhash.Hash) bool {
@@ -1300,14 +1248,14 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool {
 	return false
 }
 
-// handleInvMsg handles inv messages from all peers.
-// We examine the inventory advertised by the remote peer and act accordingly.
+// handleInvMsg handles inv messages from all peers. This entails examining the
+// inventory advertised by the remote peer for block and transaction
+// announcements and acting accordingly.
func (m *SyncManager) handleInvMsg(imsg *invMsg) { peer := lookupPeer(imsg.peer, m.peers) if peer == nil { return } - fromSyncPeer := peer == m.syncPeer isCurrent := m.IsCurrent() // Update state information regarding per-peer known inventory and determine @@ -1321,6 +1269,14 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { for _, iv := range imsg.inv.InvList { switch iv.Type { case wire.InvTypeBlock: + // NOTE: All block announcements are now made via headers and the + // decisions regarding which blocks to download are based on those + // headers. Therefore, there is no need to request anything here. + // + // Also, there realistically should not typically be any inv + // messages with a type of block for the same reason. However, it + // doesn't hurt to update the state accordingly just in case. + // Add the block to the cache of known inventory for the peer. This // helps avoid sending blocks to the peer that it is already known // to have. @@ -1329,22 +1285,6 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { // Update the last block in the announced inventory. lastBlock = iv - // Ignore block announcements for blocks that are from peers other - // than the sync peer before the chain is current or are otherwise - // not needed, such as when they're already available. - // - // This helps prevent fetching a mass of orphans. - if (!fromSyncPeer && !isCurrent) || !m.needBlock(&iv.Hash) { - continue - } - - // Request the block if there is not one already pending. - if _, exists := m.requestedBlocks[iv.Hash]; !exists { - limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks) - limitAdd(peer.requestedBlocks, iv.Hash, maxRequestedBlocks) - requestQueue = append(requestQueue, iv) - } - case wire.InvTypeTx: // Add the tx to the cache of known inventory for the peer. This // helps avoid sending transactions to the peer that it is already @@ -1372,26 +1312,6 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { } if lastBlock != nil { - // When the block is an orphan that we already have, the missing parent - // blocks were requested when the orphan was processed. In that case, - // there were more blocks missing than are allowed into a single - // inventory message. As a result, once this peer requested the final - // advertised block, the remote peer noticed and is now resending the - // orphan block as an available block to signal there are more missing - // blocks that need to be requested. - if m.isKnownOrphan(&lastBlock.Hash) { - // Request blocks starting at the latest known up to the root of the - // orphan that just came in. - orphanRoot := m.orphanRoot(&lastBlock.Hash) - blkLocator := m.cfg.Chain.LatestBlockLocator() - locator := chainBlockLocatorToHashes(blkLocator) - err := peer.PushGetBlocksMsg(locator, orphanRoot) - if err != nil { - log.Errorf("Failed to push getblocksmsg for orphan chain: %v", - err) - } - } - // Update the last announced block to the final one in the announced // inventory above (if any). In the case the header for that block is // already known, use that information to update the height for the peer @@ -1403,15 +1323,6 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { peer.UpdateLastBlockHeight(int64(header.Height)) } } - - // Request blocks after the last one advertised up to the final one the - // remote peer knows about (zero stop hash). 
- blkLocator := m.cfg.Chain.BlockLocatorFromHash(&lastBlock.Hash) - locator := chainBlockLocatorToHashes(blkLocator) - err := peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Errorf("Failed to push getblocksmsg: %v", err) - } } // Request as much as possible at once. @@ -1512,18 +1423,16 @@ out: } case processBlockMsg: - forkLen, isOrphan, err := m.processBlockAndOrphans(msg.block, - msg.flags) + forkLen, err := m.processBlock(msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ - forkLen: forkLen, - isOrphan: isOrphan, - err: err, + forkLen: forkLen, + err: err, } continue } - onMainChain := !isOrphan && forkLen == 0 + onMainChain := forkLen == 0 if onMainChain { // Prune invalidated transactions. best := m.cfg.Chain.BestSnapshot() @@ -1533,8 +1442,7 @@ out: } msg.reply <- processBlockResponse{ - isOrphan: isOrphan, - err: nil, + err: nil, } case processTransactionMsg: @@ -1549,6 +1457,18 @@ out: log.Warnf("Invalid message type in event handler: %T", msg) } + case <-m.hdrSyncState.stallTimer.C: + // Mark the timer's channel as having been drained so the timer can + // safely be reset. + m.hdrSyncState.stallChanDrained = true + + // Disconnect the sync peer due to stalling the header sync process. + if m.syncPeer != nil { + log.Debugf("Header sync progress stalled from peer %s -- "+ + "disconnecting", m.syncPeer) + m.syncPeer.Disconnect() + } + case <-ctx.Done(): break out } @@ -1643,9 +1563,15 @@ func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, voteHashes, tSpendHashes []*chainhash.Hash) error { reply := make(chan requestFromPeerResponse, 1) + request := requestFromPeerMsg{ + peer: p, + blocks: blocks, + voteHashes: voteHashes, + tSpendHashes: tSpendHashes, + reply: reply, + } select { - case m.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, - voteHashes: voteHashes, tSpendHashes: tSpendHashes, reply: reply}: + case m.msgChan <- request: case <-m.quit: } @@ -1677,7 +1603,7 @@ func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, voteHashes, } // Skip the block when it is already known. - if m.isKnownOrphan(bh) || m.cfg.Chain.HaveBlock(bh) { + if m.cfg.Chain.HaveBlock(bh) { continue } @@ -1778,7 +1704,7 @@ func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, voteHashes, // ProcessBlock makes use of ProcessBlock on an internal instance of a block // chain. It is funneled through the sync manager since blockchain is not safe // for concurrent access. -func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { +func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) error { reply := make(chan processBlockResponse, 1) select { case m.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}: @@ -1787,9 +1713,9 @@ func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.Behavi select { case response := <-reply: - return response.isOrphan, response.err + return response.err case <-m.quit: - return false, fmt.Errorf("sync manager stopped") + return fmt.Errorf("sync manager stopped") } } @@ -1862,6 +1788,10 @@ type Config struct { // transactions. Chain *blockchain.BlockChain + // TimeSource defines the median time source which is used to retrieve the + // current time adjusted by the median time offset. + TimeSource blockchain.MedianTimeSource + // TxMemPool specifies the mempool to use for processing transactions. 
TxMemPool *mempool.TxPool @@ -1869,10 +1799,6 @@ type Config struct { // It may return nil if there is no active RPC server. RpcServer func() *rpcserver.Server - // DisableCheckpoints indicates whether or not the sync manager should make - // use of checkpoints. - DisableCheckpoints bool - // NoMiningStateSync indicates whether or not the sync manager should // perform an initial mining state synchronization with peers once they are // believed to be fully synced. @@ -1896,35 +1822,17 @@ type Config struct { // New returns a new network chain synchronization manager. Use Run to begin // processing asynchronous events. func New(config *Config) *SyncManager { - m := SyncManager{ + return &SyncManager{ cfg: *config, rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), peers: make(map[*peerpkg.Peer]*syncMgrPeer), + hdrSyncState: makeHeaderSyncState(), progressLogger: progresslog.New("Processed", log), msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), quit: make(chan struct{}), - orphans: make(map[chainhash.Hash]*orphanBlock), - prevOrphans: make(map[chainhash.Hash][]*orphanBlock), + syncHeight: config.Chain.BestSnapshot().Height, isCurrent: config.Chain.IsCurrent(), } - - best := m.cfg.Chain.BestSnapshot() - if !m.cfg.DisableCheckpoints { - // Initialize the next checkpoint based on the current height. - m.nextCheckpoint = m.findNextHeaderCheckpoint(best.Height) - if m.nextCheckpoint != nil { - m.resetHeaderState(&best.Hash, best.Height) - } - } else { - log.Info("Checkpoints are disabled") - } - - m.syncHeightMtx.Lock() - m.syncHeight = best.Height - m.syncHeightMtx.Unlock() - - return &m } diff --git a/internal/progresslog/blocklogger.go b/internal/progresslog/blocklogger.go deleted file mode 100644 index d0274880a7..0000000000 --- a/internal/progresslog/blocklogger.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (c) 2015-2020 The Decred developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package progresslog - -import ( - "sync" - "time" - - "github.com/decred/dcrd/wire" - "github.com/decred/slog" -) - -// BlockLogger provides periodic logging for other services in order to show -// users progress of certain "actions" involving some or all current blocks. -// For example, syncing to best chain, indexing all blocks, etc. -type BlockLogger struct { - sync.Mutex - subsystemLogger slog.Logger - progressAction string - - receivedLogBlocks int64 - receivedLogTx int64 - receivedLogVotes int64 - receivedLogRevocations int64 - receivedLogTickets int64 - lastBlockLogTime time.Time -} - -// New returns a new block progress logger. -// -// The progress message is templated as follows: -// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} -// ({numTxs} {transactions|transaction}, {numTickets} {tickets|ticket}, -// {numVotes} {votes|vote}, {numRevocations} {revocations|revocation}, -// height {lastBlockHeight}, {lastBlockTimeStamp}) -func New(progressMessage string, logger slog.Logger) *BlockLogger { - return &BlockLogger{ - lastBlockLogTime: time.Now(), - progressAction: progressMessage, - subsystemLogger: logger, - } -} - -// LogBlockHeight logs a new block height as an information message to show -// progress to the user. In order to prevent spam, it limits logging to one -// message every 10 seconds with duration and totals included. 
-func (b *BlockLogger) LogBlockHeight(block *wire.MsgBlock, syncHeight int64) { - b.Lock() - defer b.Unlock() - - header := &block.Header - b.receivedLogBlocks++ - b.receivedLogTx += int64(len(block.Transactions)) - b.receivedLogVotes += int64(header.Voters) - b.receivedLogRevocations += int64(header.Revocations) - b.receivedLogTickets += int64(header.FreshStake) - now := time.Now() - duration := now.Sub(b.lastBlockLogTime) - if int64(header.Height) < syncHeight && duration < time.Second*10 { - return - } - - // Truncate the duration to 10s of milliseconds. - durationMillis := int64(duration / time.Millisecond) - tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) - - // Log information about new block height. - blockStr := "blocks" - if b.receivedLogBlocks == 1 { - blockStr = "block" - } - txStr := "transactions" - if b.receivedLogTx == 1 { - txStr = "transaction" - } - ticketStr := "tickets" - if b.receivedLogTickets == 1 { - ticketStr = "ticket" - } - revocationStr := "revocations" - if b.receivedLogRevocations == 1 { - revocationStr = "revocation" - } - voteStr := "votes" - if b.receivedLogVotes == 1 { - voteStr = "vote" - } - b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, %d %s, %d %s, "+ - "%d %s, height %d, %s)", b.progressAction, b.receivedLogBlocks, - blockStr, tDuration, b.receivedLogTx, txStr, b.receivedLogTickets, - ticketStr, b.receivedLogVotes, voteStr, b.receivedLogRevocations, - revocationStr, header.Height, header.Timestamp) - - b.receivedLogBlocks = 0 - b.receivedLogTx = 0 - b.receivedLogVotes = 0 - b.receivedLogTickets = 0 - b.receivedLogRevocations = 0 - b.lastBlockLogTime = now -} - -// SetLastLogTime updates the last time data was logged to the provided time. -func (b *BlockLogger) SetLastLogTime(time time.Time) { - b.Lock() - b.lastBlockLogTime = time - b.Unlock() -} diff --git a/internal/progresslog/blocklogger_test.go b/internal/progresslog/blocklogger_test.go deleted file mode 100644 index e640134296..0000000000 --- a/internal/progresslog/blocklogger_test.go +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright (c) 2020 The Decred developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package progresslog - -import ( - "io/ioutil" - "reflect" - "testing" - "time" - - "github.com/decred/dcrd/wire" - "github.com/decred/slog" -) - -var ( - backendLog = slog.NewBackend(ioutil.Discard) - testLog = backendLog.Logger("TEST") -) - -// TestLogBlockHeight ensures the logging functionality works as expected via -// a test logger. 
-func TestLogBlockHeight(t *testing.T) { - testBlocks := []wire.MsgBlock{{ - Header: wire.BlockHeader{ - Version: 1, - Height: 100000, - Timestamp: time.Unix(1293623863, 0), // 2010-12-29 11:57:43 +0000 UTC - Voters: 2, - Revocations: 1, - FreshStake: 4, - }, - Transactions: make([]*wire.MsgTx, 4), - STransactions: nil, - }, { - Header: wire.BlockHeader{ - Version: 1, - Height: 100001, - Timestamp: time.Unix(1293624163, 0), // 2010-12-29 12:02:43 +0000 UTC - Voters: 3, - Revocations: 2, - FreshStake: 2, - }, - Transactions: make([]*wire.MsgTx, 2), - STransactions: nil, - }, { - Header: wire.BlockHeader{ - Version: 1, - Height: 100002, - Timestamp: time.Unix(1293624463, 0), // 2010-12-29 12:07:43 +0000 UTC - Voters: 1, - Revocations: 3, - FreshStake: 1, - }, - Transactions: make([]*wire.MsgTx, 3), - STransactions: nil, - }} - - tests := []struct { - name string - reset bool - inputBlock *wire.MsgBlock - inputSyncHeight int64 - inputLastLogTime time.Time - wantReceivedLogBlocks int64 - wantReceivedLogTx int64 - wantReceivedLogVotes int64 - wantReceivedLogRevocations int64 - wantReceivedLogTickets int64 - }{{ - name: "round 1, block 0, last log time < 10 secs ago, < sync height", - inputBlock: &testBlocks[0], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 1, - wantReceivedLogTx: 4, - wantReceivedLogVotes: 2, - wantReceivedLogRevocations: 1, - wantReceivedLogTickets: 4, - }, { - name: "round 1, block 1, last log time < 10 secs ago, < sync height", - inputBlock: &testBlocks[1], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 2, - wantReceivedLogTx: 6, - wantReceivedLogVotes: 5, - wantReceivedLogRevocations: 3, - wantReceivedLogTickets: 6, - }, { - name: "round 1, block 2, last log time < 10 secs ago, < sync height", - inputBlock: &testBlocks[2], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 0, - wantReceivedLogTx: 0, - wantReceivedLogVotes: 0, - wantReceivedLogRevocations: 0, - wantReceivedLogTickets: 0, - }, { - name: "round 2, block 0, last log time < 10 secs ago, < sync height", - reset: true, - inputBlock: &testBlocks[0], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 1, - wantReceivedLogTx: 4, - wantReceivedLogVotes: 2, - wantReceivedLogRevocations: 1, - wantReceivedLogTickets: 4, - }, { - name: "round 2, block 1, last log time > 10 secs ago, < sync height", - inputBlock: &testBlocks[1], - inputSyncHeight: 100002, - inputLastLogTime: time.Now().Add(-11 * time.Second), - wantReceivedLogBlocks: 0, - wantReceivedLogTx: 0, - wantReceivedLogVotes: 0, - wantReceivedLogRevocations: 0, - wantReceivedLogTickets: 0, - }, { - name: "round 2, block 2, last log time > 10 secs ago, == sync height", - inputBlock: &testBlocks[2], - inputSyncHeight: 100002, - inputLastLogTime: time.Now().Add(-11 * time.Second), - wantReceivedLogBlocks: 0, - wantReceivedLogTx: 0, - wantReceivedLogVotes: 0, - wantReceivedLogRevocations: 0, - wantReceivedLogTickets: 0, - }} - - progressLogger := New("Wrote", testLog) - for _, test := range tests { - if test.reset { - progressLogger = New("Wrote", testLog) - } - progressLogger.SetLastLogTime(test.inputLastLogTime) - progressLogger.LogBlockHeight(test.inputBlock, test.inputSyncHeight) - wantBlockProgressLogger := &BlockLogger{ - receivedLogBlocks: test.wantReceivedLogBlocks, - receivedLogTx: test.wantReceivedLogTx, - receivedLogVotes: test.wantReceivedLogVotes, - receivedLogRevocations: test.wantReceivedLogRevocations, - 
receivedLogTickets: test.wantReceivedLogTickets, - lastBlockLogTime: progressLogger.lastBlockLogTime, - progressAction: progressLogger.progressAction, - subsystemLogger: progressLogger.subsystemLogger, - } - if !reflect.DeepEqual(progressLogger, wantBlockProgressLogger) { - t.Errorf("%s:\nwant: %+v\ngot: %+v\n", test.name, - wantBlockProgressLogger, progressLogger) - } - } -} diff --git a/internal/progresslog/doc.go b/internal/progresslog/doc.go index 0758a3b653..0a4a10d0af 100644 --- a/internal/progresslog/doc.go +++ b/internal/progresslog/doc.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 The Decred developers +// Copyright (c) 2020-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -16,6 +16,6 @@ Tests are included to ensure proper functionality. - Total number of tickets - Total number of revocations - Logs all cumulative data every 10 seconds -- Immediately logs any outstanding data when the provided sync height is reached +- Immediately logs any outstanding data when requested by the caller */ package progresslog diff --git a/internal/progresslog/logger.go b/internal/progresslog/logger.go new file mode 100644 index 0000000000..e3017824ce --- /dev/null +++ b/internal/progresslog/logger.go @@ -0,0 +1,135 @@ +// Copyright (c) 2015-2021 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package progresslog + +import ( + "sync" + "time" + + "github.com/decred/dcrd/wire" + "github.com/decred/slog" +) + +// pickNoun returns the singular or plural form of a noun depending on the +// provided count. +func pickNoun(n uint64, singular, plural string) string { + if n == 1 { + return singular + } + return plural +} + +// Logger provides periodic logging of progress towards some action such as +// syncing the chain. +type Logger struct { + sync.Mutex + subsystemLogger slog.Logger + progressAction string + + // lastLogTime tracks the last time a log statement was shown. + lastLogTime time.Time + + // These fields accumulate information about blocks between log statements. + receivedBlocks uint64 + receivedTxns uint64 + receivedVotes uint64 + receivedRevokes uint64 + receivedTickets uint64 + + // These fields accumulate information about headers between log statements. + receivedHeaders uint64 +} + +// New returns a new block progress logger. +func New(progressAction string, logger slog.Logger) *Logger { + return &Logger{ + lastLogTime: time.Now(), + progressAction: progressAction, + subsystemLogger: logger, + } +} + +// LogProgress accumulates details for the provided block and periodically +// (every 10 seconds) logs an information message to show progress to the user +// along with duration and totals included. +// +// The force flag may be used to force a log message to be shown regardless of +// the time the last one was shown. 
+// +// The progress message is templated as follows: +// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} +// ({numTxs} {transactions|transaction}, {numTickets} {tickets|ticket}, +// {numVotes} {votes|vote}, {numRevocations} {revocations|revocation}, +// height {lastBlockHeight}, progress {progress}%) +func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool, progressFn func() float64) { + l.Lock() + defer l.Unlock() + + header := &block.Header + l.receivedBlocks++ + l.receivedTxns += uint64(len(block.Transactions)) + l.receivedVotes += uint64(header.Voters) + l.receivedRevokes += uint64(header.Revocations) + l.receivedTickets += uint64(header.FreshStake) + now := time.Now() + duration := now.Sub(l.lastLogTime) + if !forceLog && duration < time.Second*10 { + return + } + + // Log information about chain progress. + l.subsystemLogger.Infof("%s %d %s in the last %0.2fs (%d %s, %d %s, %d %s, "+ + "%d %s, height %d, progress %0.2f%%)", l.progressAction, + l.receivedBlocks, pickNoun(l.receivedBlocks, "block", "blocks"), + duration.Seconds(), + l.receivedTxns, pickNoun(l.receivedTxns, "transaction", "transactions"), + l.receivedTickets, pickNoun(l.receivedTickets, "ticket", "tickets"), + l.receivedVotes, pickNoun(l.receivedVotes, "vote", "votes"), + l.receivedRevokes, pickNoun(l.receivedRevokes, "revocation", "revocations"), + header.Height, progressFn()) + + l.receivedBlocks = 0 + l.receivedTxns = 0 + l.receivedVotes = 0 + l.receivedTickets = 0 + l.receivedRevokes = 0 + l.lastLogTime = now +} + +// LogHeaderProgress accumulates the provided number of processed headers and +// periodically (every 10 seconds) logs an information message to show the +// header sync progress to the user along with duration and totals included. +// +// The force flag may be used to force a log message to be shown regardless of +// the time the last one was shown. +// +// The progress message is templated as follows: +// {progressAction} {numProcessed} {headers|header} in the last {timePeriod} +// (progress {progress}%) +func (l *Logger) LogHeaderProgress(processedHeaders uint64, forceLog bool, progressFn func() float64) { + l.receivedHeaders += processedHeaders + + now := time.Now() + duration := now.Sub(l.lastLogTime) + if !forceLog && duration < time.Second*10 { + return + } + + // Log information about header progress. + l.subsystemLogger.Infof("%s %d %s in the last %0.2fs (progress %0.2f%%)", + l.progressAction, l.receivedHeaders, + pickNoun(l.receivedHeaders, "header", "headers"), + duration.Seconds(), progressFn()) + + l.receivedHeaders = 0 + l.lastLogTime = now +} + +// SetLastLogTime updates the last time data was logged to the provided time. +func (l *Logger) SetLastLogTime(time time.Time) { + l.Lock() + l.lastLogTime = time + l.Unlock() +} diff --git a/internal/progresslog/logger_test.go b/internal/progresslog/logger_test.go new file mode 100644 index 0000000000..37add29e04 --- /dev/null +++ b/internal/progresslog/logger_test.go @@ -0,0 +1,228 @@ +// Copyright (c) 2021 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package progresslog + +import ( + "io/ioutil" + "reflect" + "testing" + "time" + + "github.com/decred/dcrd/wire" + "github.com/decred/slog" +) + +var ( + backendLog = slog.NewBackend(ioutil.Discard) + testLog = backendLog.Logger("TEST") +) + +// TestLogProgress ensures the logging functionality works as expected via a +// test logger. 
+func TestLogProgress(t *testing.T) { + testBlocks := []wire.MsgBlock{{ + Header: wire.BlockHeader{ + Version: 1, + Height: 100000, + Timestamp: time.Unix(1293623863, 0), // 2010-12-29 11:57:43 +0000 UTC + Voters: 2, + Revocations: 1, + FreshStake: 4, + }, + Transactions: make([]*wire.MsgTx, 4), + STransactions: nil, + }, { + Header: wire.BlockHeader{ + Version: 1, + Height: 100001, + Timestamp: time.Unix(1293624163, 0), // 2010-12-29 12:02:43 +0000 UTC + Voters: 3, + Revocations: 2, + FreshStake: 2, + }, + Transactions: make([]*wire.MsgTx, 2), + STransactions: nil, + }, { + Header: wire.BlockHeader{ + Version: 1, + Height: 100002, + Timestamp: time.Unix(1293624463, 0), // 2010-12-29 12:07:43 +0000 UTC + Voters: 1, + Revocations: 3, + FreshStake: 1, + }, + Transactions: make([]*wire.MsgTx, 3), + STransactions: nil, + }} + + tests := []struct { + name string + reset bool + inputBlock *wire.MsgBlock + forceLog bool + inputLastLogTime time.Time + wantReceivedBlocks uint64 + wantReceivedTxns uint64 + wantReceivedVotes uint64 + wantReceivedRevokes uint64 + wantReceivedTickets uint64 + }{{ + name: "round 1, block 0, last log time < 10 secs ago, not forced", + inputBlock: &testBlocks[0], + forceLog: false, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 1, + wantReceivedTxns: 4, + wantReceivedVotes: 2, + wantReceivedRevokes: 1, + wantReceivedTickets: 4, + }, { + name: "round 1, block 1, last log time < 10 secs ago, not forced", + inputBlock: &testBlocks[1], + forceLog: false, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 2, + wantReceivedTxns: 6, + wantReceivedVotes: 5, + wantReceivedRevokes: 3, + wantReceivedTickets: 6, + }, { + name: "round 1, block 2, last log time < 10 secs ago, forced", + inputBlock: &testBlocks[2], + forceLog: true, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 0, + wantReceivedTxns: 0, + wantReceivedVotes: 0, + wantReceivedRevokes: 0, + wantReceivedTickets: 0, + }, { + name: "round 2, block 0, last log time < 10 secs ago, not forced", + reset: true, + inputBlock: &testBlocks[0], + forceLog: false, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 1, + wantReceivedTxns: 4, + wantReceivedVotes: 2, + wantReceivedRevokes: 1, + wantReceivedTickets: 4, + }, { + name: "round 2, block 1, last log time > 10 secs ago, not forced", + inputBlock: &testBlocks[1], + forceLog: false, + inputLastLogTime: time.Now().Add(-11 * time.Second), + wantReceivedBlocks: 0, + wantReceivedTxns: 0, + wantReceivedVotes: 0, + wantReceivedRevokes: 0, + wantReceivedTickets: 0, + }, { + name: "round 2, block 2, last log time > 10 secs ago, forced", + inputBlock: &testBlocks[2], + forceLog: true, + inputLastLogTime: time.Now().Add(-11 * time.Second), + wantReceivedBlocks: 0, + wantReceivedTxns: 0, + wantReceivedVotes: 0, + wantReceivedRevokes: 0, + wantReceivedTickets: 0, + }} + + progressFn := func() float64 { return 0.0 } + progressLogger := New("Wrote", testLog) + for _, test := range tests { + if test.reset { + progressLogger = New("Wrote", testLog) + } + progressLogger.SetLastLogTime(test.inputLastLogTime) + progressLogger.LogProgress(test.inputBlock, test.forceLog, progressFn) + wantBlockProgressLogger := &Logger{ + receivedBlocks: test.wantReceivedBlocks, + receivedTxns: test.wantReceivedTxns, + receivedVotes: test.wantReceivedVotes, + receivedRevokes: test.wantReceivedRevokes, + receivedTickets: test.wantReceivedTickets, + lastLogTime: progressLogger.lastLogTime, + progressAction: progressLogger.progressAction, + subsystemLogger: progressLogger.subsystemLogger, + } + if 
!reflect.DeepEqual(progressLogger, wantBlockProgressLogger) { + t.Errorf("%s:\nwant: %+v\ngot: %+v\n", test.name, + wantBlockProgressLogger, progressLogger) + } + } +} + +// TestLogHeaderProgress ensures the logging functionality for headers works as +// expected via a test logger. +func TestLogHeaderProgress(t *testing.T) { + tests := []struct { + name string + reset bool + numHeaders uint64 + forceLog bool + lastLogTime time.Time + wantReceivedHeaders uint64 + }{{ + name: "round 1, batch 1, last log time < 10 secs ago, not forced", + numHeaders: 35500, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 35500, + }, { + name: "round 1, batch 2, last log time < 10 secs ago, not forced", + numHeaders: 40000, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 75500, + }, { + name: "round 1, batch 3, last log time < 10 secs ago, not forced", + numHeaders: 66500, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 142000, + }, { + name: "round 2, batch 1, last log time < 10 secs ago, not forced", + reset: true, + numHeaders: 40000, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 40000, + }, { + name: "round 2, batch 2, last log time > 10 secs ago, not forced", + numHeaders: 66500, + forceLog: false, + lastLogTime: time.Now().Add(-11 * time.Second), + wantReceivedHeaders: 0, + }, { + name: "round 2, batch 1, last log time < 10 secs ago, forced", + reset: true, + numHeaders: 54330, + forceLog: true, + lastLogTime: time.Now(), + wantReceivedHeaders: 0, + }} + + progressFn := func() float64 { return 0.0 } + logger := New("Wrote", testLog) + for _, test := range tests { + if test.reset { + logger = New("Wrote", testLog) + } + logger.SetLastLogTime(test.lastLogTime) + logger.LogHeaderProgress(test.numHeaders, test.forceLog, progressFn) + + want := &Logger{ + subsystemLogger: logger.subsystemLogger, + progressAction: logger.progressAction, + lastLogTime: logger.lastLogTime, + receivedHeaders: test.wantReceivedHeaders, + } + if !reflect.DeepEqual(logger, want) { + t.Errorf("%s:\nwant: %+v\ngot: %+v\n", test.name, want, logger) + } + } +} diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index dc3ebc24fa..c5cdb7c871 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 The Decred developers +// Copyright (c) 2019-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -148,7 +148,7 @@ type SyncManager interface { // SubmitBlock submits the provided block to the network after // processing it locally. - SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) + SubmitBlock(block *dcrutil.Block) error // SyncPeerID returns the id of the current peer being synced with. SyncPeerID() int32 diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index b06e21a576..292e8dfc56 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -2035,10 +2035,10 @@ func handleGetBlockchainInfo(_ context.Context, s *Server, cmd interface{}) (int } // Estimate the verification progress of the node. 
- syncHeight := s.cfg.SyncMgr.SyncHeight() var verifyProgress float64 - if syncHeight > 0 { - verifyProgress = math.Min(float64(best.Height)/float64(syncHeight), 1.0) + if bestHeaderHeight > 0 { + progress := float64(best.Height) / float64(bestHeaderHeight) + verifyProgress = math.Min(progress, 1.0) } // Fetch the maximum allowed block size for all blocks other than the @@ -2101,7 +2101,7 @@ func handleGetBlockchainInfo(_ context.Context, s *Server, cmd interface{}) (int Chain: params.Name, Blocks: best.Height, Headers: bestHeaderHeight, - SyncHeight: syncHeight, + SyncHeight: s.cfg.SyncMgr.SyncHeight(), ChainWork: fmt.Sprintf("%064x", chainWork), InitialBlockDownload: !chain.IsCurrent(), VerificationProgress: verifyProgress, @@ -3783,8 +3783,14 @@ func handleGetWorkSubmission(_ context.Context, s *Server, hexData string) (inte // Process this block using the same rules as blocks coming from other // nodes. This will in turn relay it to the network like normal. - isOrphan, err := s.cfg.SyncMgr.SubmitBlock(block, blockchain.BFNone) + err = s.cfg.SyncMgr.SubmitBlock(block) if err != nil { + if errors.Is(err, blockchain.ErrMissingParent) { + log.Infof("Block submitted via getwork rejected: orphan building "+ + "on parent %v", block.MsgBlock().Header.PrevBlock) + return false, nil + } + // Anything other than a rule violation is an unexpected error, // so return that error as an internal error. var rErr blockchain.RuleError @@ -3797,12 +3803,6 @@ func handleGetWorkSubmission(_ context.Context, s *Server, hexData string) (inte return false, nil } - if isOrphan { - log.Infof("Block submitted via getwork rejected: an orphan "+ - "building on parent %v", block.MsgBlock().Header.PrevBlock) - return false, nil - } - // The block was accepted. log.Infof("Block submitted via getwork accepted: %s (height %d)", block.Hash(), msgBlock.Header.Height) @@ -4745,7 +4745,7 @@ func handleSubmitBlock(_ context.Context, s *Server, cmd interface{}) (interface return nil, rpcInternalError(err.Error(), "Block decode") } - _, err = s.cfg.SyncMgr.SubmitBlock(block, blockchain.BFNone) + err = s.cfg.SyncMgr.SubmitBlock(block) if err != nil { return fmt.Sprintf("rejected: %v", err), nil } diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index eedb74a925..53267ad57c 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 The Decred developers +// Copyright (c) 2020-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -500,7 +500,6 @@ func (c *testAddrManager) LocalAddresses() []addrmgr.LocalAddr { // SyncManager interface. type testSyncManager struct { isCurrent bool - isOrphan bool submitBlockErr error syncPeerID int32 syncHeight int64 @@ -516,8 +515,8 @@ func (s *testSyncManager) IsCurrent() bool { // SubmitBlock provides a mock implementation for submitting the provided block // to the network after processing it locally. -func (s *testSyncManager) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { - return s.isOrphan, s.submitBlockErr +func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) error { + return s.submitBlockErr } // SyncPeer returns a mocked id of the current peer being synced with. @@ -1584,7 +1583,6 @@ func defaultMockDB() *testDB { // *testSyncManager. 
func defaultMockSyncManager() *testSyncManager { return &testSyncManager{ - isOrphan: false, syncHeight: 463074, } } @@ -3521,7 +3519,7 @@ func TestHandleGetBlockchainInfo(t *testing.T) { SyncHeight: int64(463074), ChainWork: "000000000000000000000000000000000000000000115d2833849090b0026506", InitialBlockDownload: true, - VerificationProgress: float64(0.9999978405179302), + VerificationProgress: float64(1), BestBlockHash: "00000000000000001e6ec1501c858506de1de4703d1be8bab4061126e8f61480", Difficulty: uint32(404696953), DifficultyRatio: float64(35256672611.3862), @@ -5929,9 +5927,14 @@ func TestHandleGetWork(t *testing.T) { hex.Encode(submissionB, data) submission := string(submissionB) - truncatedSubmission := submission[1:] lessThanGetWorkDataLen := submission[10:] + // Create an orphan block by mutating the prevblock field of the data. + orphanData := make([]byte, len(data)) + copy(orphanData, data) + orphanData[4] ^= 0x55 + orphanSubmission := hex.EncodeToString(orphanData) + buf = &bytes.Buffer{} buf.Write(submissionB[:10]) buf.WriteRune('g') @@ -6086,12 +6089,11 @@ func TestHandleGetWork(t *testing.T) { name: "handleGetWork: submission is an orphan", handler: handleGetWork, cmd: &types.GetWorkCmd{ - Data: &truncatedSubmission, + Data: &orphanSubmission, }, mockMiningState: mine(), mockSyncManager: func() *testSyncManager { syncManager := defaultMockSyncManager() - syncManager.isOrphan = true return syncManager }(), result: false, diff --git a/peer/log.go b/peer/log.go index 30feeb35a8..6c7233b790 100644 --- a/peer/log.go +++ b/peer/log.go @@ -1,5 +1,5 @@ // Copyright (c) 2015-2016 The btcsuite developers -// Copyright (c) 2016-2020 The Decred developers +// Copyright (c) 2016-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -188,7 +188,13 @@ func messageSummary(msg wire.Message) string { return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop) case *wire.MsgHeaders: - return fmt.Sprintf("num %d", len(msg.Headers)) + summary := fmt.Sprintf("num %d", len(msg.Headers)) + if len(msg.Headers) > 0 { + finalHeader := msg.Headers[len(msg.Headers)-1] + summary = fmt.Sprintf("%s, final hash %s, height %d", summary, + finalHeader.BlockHash(), finalHeader.Height) + } + return summary case *wire.MsgReject: // Ensure the variable length strings don't contain any diff --git a/peer/peer.go b/peer/peer.go index d7f2cd776a..5caa65beaf 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2016-2020 The Decred developers +// Copyright (c) 2016-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -1090,40 +1090,45 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st // sent asynchronously and as a result of a long backlog of messages, // such as is typical in the case of initial block download, the // response won't be received in time. - log.Debugf("Adding deadline for command %s for peer %s", msgCmd, p.addr) - + // + // Also, getheaders is intentionally ignored since there is no guaranteed + // response if the remote peer does not have any headers for the locator. + var addedDeadline bool deadline := time.Now().Add(stallResponseTimeout) switch msgCmd { case wire.CmdVersion: // Expects a verack message. pendingResponses[wire.CmdVerAck] = deadline + addedDeadline = true case wire.CmdMemPool: // Expects an inv message. 
 		pendingResponses[wire.CmdInv] = deadline
+		addedDeadline = true
 
 	case wire.CmdGetBlocks:
 		// Expects an inv message.
 		pendingResponses[wire.CmdInv] = deadline
+		addedDeadline = true
 
 	case wire.CmdGetData:
 		// Expects a block, tx, or notfound message.
 		pendingResponses[wire.CmdBlock] = deadline
 		pendingResponses[wire.CmdTx] = deadline
 		pendingResponses[wire.CmdNotFound] = deadline
-
-	case wire.CmdGetHeaders:
-		// Expects a headers message. Use a longer deadline since it
-		// can take a while for the remote peer to load all of the
-		// headers.
-		deadline = time.Now().Add(stallResponseTimeout * 3)
-		pendingResponses[wire.CmdHeaders] = deadline
+		addedDeadline = true
 
 	case wire.CmdGetMiningState:
 		pendingResponses[wire.CmdMiningState] = deadline
+		addedDeadline = true
 
 	case wire.CmdGetInitState:
 		pendingResponses[wire.CmdInitState] = deadline
+		addedDeadline = true
+	}
+
+	if addedDeadline {
+		log.Debugf("Adding deadline for command %s for peer %s", msgCmd, p.addr)
+	}
 }
diff --git a/rpcadaptors.go b/rpcadaptors.go
index 3f89c6184c..b7f59c24a6 100644
--- a/rpcadaptors.go
+++ b/rpcadaptors.go
@@ -1,5 +1,5 @@
 // Copyright (c) 2017 The btcsuite developers
-// Copyright (c) 2015-2020 The Decred developers
+// Copyright (c) 2015-2021 The Decred developers
 // Use of this source code is governed by an ISC
 // license that can be found in the LICENSE file.
@@ -330,8 +330,8 @@ func (b *rpcSyncMgr) IsCurrent() bool {
 //
 // This function is safe for concurrent access and is part of the
 // rpcserver.SyncManager interface implementation.
-func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
-	return b.syncMgr.ProcessBlock(block, flags)
+func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block) error {
+	return b.syncMgr.ProcessBlock(block, blockchain.BFNone)
 }
 
 // SyncPeer returns the id of the current peer being synced with.
diff --git a/server.go b/server.go
index 6db838159e..a23789fd00 100644
--- a/server.go
+++ b/server.go
@@ -673,7 +673,7 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej
 
 	// Ignore peers that have a protocol version that is too old. The peer
 	// negotiation logic will disconnect it after this callback returns.
-	if msg.ProtocolVersion < int32(wire.InitialProcotolVersion) {
+	if msg.ProtocolVersion < int32(wire.SendHeadersVersion) {
 		return nil
 	}
 
@@ -737,6 +737,13 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej
 	return nil
 }
 
+// OnVerAck is invoked when a peer receives a verack wire message. It creates
+// and sends a sendheaders message to request all block announcements are made
+// via full headers instead of the inv message.
+func (sp *serverPeer) OnVerAck(_ *peer.Peer, msg *wire.MsgVerAck) {
+	sp.QueueMessage(wire.NewMsgSendHeaders(), nil)
+}
+
 // OnMemPool is invoked when a peer receives a mempool wire message. It creates
 // and sends an inventory message with the contents of the memory pool up to the
 // maximum inventory allowed per message.
@@ -1067,12 +1074,6 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
 // OnHeaders is invoked when a peer receives a headers wire message. The
 // message is passed down to the net sync manager.
 func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
-	// Ban peers sending empty headers requests.
- if len(msg.Headers) == 0 { - sp.server.BanPeer(sp) - return - } - sp.server.syncManager.QueueHeaders(msg, sp.Peer) } @@ -2068,6 +2069,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { return &peer.Config{ Listeners: peer.MessageListeners{ OnVersion: sp.OnVersion, + OnVerAck: sp.OnVerAck, OnMemPool: sp.OnMemPool, OnGetMiningState: sp.OnGetMiningState, OnMiningState: sp.OnMiningState, @@ -3508,15 +3510,20 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP }, } s.txMemPool = mempool.New(&txC) + + // Create a new sync manager instance with the appropriate configuration. + if cfg.DisableCheckpoints { + srvrLog.Info("Checkpoints are disabled") + } s.syncManager = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, ChainParams: s.chainParams, + TimeSource: s.timeSource, TxMemPool: s.txMemPool, RpcServer: func() *rpcserver.Server { return s.rpcServer }, - DisableCheckpoints: cfg.DisableCheckpoints, NoMiningStateSync: cfg.NoMiningStateSync, MaxPeers: cfg.MaxPeers, MaxOrphanTxs: cfg.MaxOrphanTxs,