From 0aeae0cbc787357d4b1da0fafce763c207ab26d4 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:18 -0600 Subject: [PATCH 01/10] progresslog: Make logger more generic. This modifies the progress logger to make it a bit more generic so that it can support logging header sync progress in the future too and modernizes the code while here. --- blockdb.go | 6 +- internal/netsync/manager.go | 6 +- internal/progresslog/blocklogger.go | 109 ---------------- internal/progresslog/blocklogger_test.go | 156 ----------------------- internal/progresslog/doc.go | 4 +- internal/progresslog/logger.go | 103 +++++++++++++++ internal/progresslog/logger_test.go | 156 +++++++++++++++++++++++ 7 files changed, 269 insertions(+), 271 deletions(-) delete mode 100644 internal/progresslog/blocklogger.go delete mode 100644 internal/progresslog/blocklogger_test.go create mode 100644 internal/progresslog/logger.go create mode 100644 internal/progresslog/logger_test.go diff --git a/blockdb.go b/blockdb.go index 45a6f02ef5..9cf6fa942a 100644 --- a/blockdb.go +++ b/blockdb.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. 
@@ -238,7 +238,9 @@ func dumpBlockChain(params *chaincfg.Params, b *blockchain.BlockChain) error { return err } - progressLogger.LogBlockHeight(bl.MsgBlock(), tipHeight) + msgBlock := bl.MsgBlock() + forceLog := int64(msgBlock.Header.Height) >= tipHeight + progressLogger.LogProgress(msgBlock, forceLog) } srvrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 1320e42f6c..d597f28e8b 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -203,7 +203,7 @@ type SyncManager struct { rejectedTxns map[chainhash.Hash]struct{} requestedTxns map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{} - progressLogger *progresslog.BlockLogger + progressLogger *progresslog.Logger syncPeer *syncMgrPeer msgChan chan interface{} peers map[*peerpkg.Peer]*syncMgrPeer @@ -1015,7 +1015,9 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { } else { // When the block is not an orphan, log information about it and // update the chain state. - m.progressLogger.LogBlockHeight(bmsg.block.MsgBlock(), m.SyncHeight()) + msgBlock := bmsg.block.MsgBlock() + forceLog := int64(msgBlock.Header.Height) >= m.SyncHeight() + m.progressLogger.LogProgress(msgBlock, forceLog) if onMainChain { // Notify stake difficulty subscribers and prune invalidated diff --git a/internal/progresslog/blocklogger.go b/internal/progresslog/blocklogger.go deleted file mode 100644 index d0274880a7..0000000000 --- a/internal/progresslog/blocklogger.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (c) 2015-2020 The Decred developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. 
- -package progresslog - -import ( - "sync" - "time" - - "github.com/decred/dcrd/wire" - "github.com/decred/slog" -) - -// BlockLogger provides periodic logging for other services in order to show -// users progress of certain "actions" involving some or all current blocks. -// For example, syncing to best chain, indexing all blocks, etc. -type BlockLogger struct { - sync.Mutex - subsystemLogger slog.Logger - progressAction string - - receivedLogBlocks int64 - receivedLogTx int64 - receivedLogVotes int64 - receivedLogRevocations int64 - receivedLogTickets int64 - lastBlockLogTime time.Time -} - -// New returns a new block progress logger. -// -// The progress message is templated as follows: -// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} -// ({numTxs} {transactions|transaction}, {numTickets} {tickets|ticket}, -// {numVotes} {votes|vote}, {numRevocations} {revocations|revocation}, -// height {lastBlockHeight}, {lastBlockTimeStamp}) -func New(progressMessage string, logger slog.Logger) *BlockLogger { - return &BlockLogger{ - lastBlockLogTime: time.Now(), - progressAction: progressMessage, - subsystemLogger: logger, - } -} - -// LogBlockHeight logs a new block height as an information message to show -// progress to the user. In order to prevent spam, it limits logging to one -// message every 10 seconds with duration and totals included. -func (b *BlockLogger) LogBlockHeight(block *wire.MsgBlock, syncHeight int64) { - b.Lock() - defer b.Unlock() - - header := &block.Header - b.receivedLogBlocks++ - b.receivedLogTx += int64(len(block.Transactions)) - b.receivedLogVotes += int64(header.Voters) - b.receivedLogRevocations += int64(header.Revocations) - b.receivedLogTickets += int64(header.FreshStake) - now := time.Now() - duration := now.Sub(b.lastBlockLogTime) - if int64(header.Height) < syncHeight && duration < time.Second*10 { - return - } - - // Truncate the duration to 10s of milliseconds. 
- durationMillis := int64(duration / time.Millisecond) - tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) - - // Log information about new block height. - blockStr := "blocks" - if b.receivedLogBlocks == 1 { - blockStr = "block" - } - txStr := "transactions" - if b.receivedLogTx == 1 { - txStr = "transaction" - } - ticketStr := "tickets" - if b.receivedLogTickets == 1 { - ticketStr = "ticket" - } - revocationStr := "revocations" - if b.receivedLogRevocations == 1 { - revocationStr = "revocation" - } - voteStr := "votes" - if b.receivedLogVotes == 1 { - voteStr = "vote" - } - b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, %d %s, %d %s, "+ - "%d %s, height %d, %s)", b.progressAction, b.receivedLogBlocks, - blockStr, tDuration, b.receivedLogTx, txStr, b.receivedLogTickets, - ticketStr, b.receivedLogVotes, voteStr, b.receivedLogRevocations, - revocationStr, header.Height, header.Timestamp) - - b.receivedLogBlocks = 0 - b.receivedLogTx = 0 - b.receivedLogVotes = 0 - b.receivedLogTickets = 0 - b.receivedLogRevocations = 0 - b.lastBlockLogTime = now -} - -// SetLastLogTime updates the last time data was logged to the provided time. -func (b *BlockLogger) SetLastLogTime(time time.Time) { - b.Lock() - b.lastBlockLogTime = time - b.Unlock() -} diff --git a/internal/progresslog/blocklogger_test.go b/internal/progresslog/blocklogger_test.go deleted file mode 100644 index e640134296..0000000000 --- a/internal/progresslog/blocklogger_test.go +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright (c) 2020 The Decred developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. 
- -package progresslog - -import ( - "io/ioutil" - "reflect" - "testing" - "time" - - "github.com/decred/dcrd/wire" - "github.com/decred/slog" -) - -var ( - backendLog = slog.NewBackend(ioutil.Discard) - testLog = backendLog.Logger("TEST") -) - -// TestLogBlockHeight ensures the logging functionality works as expected via -// a test logger. -func TestLogBlockHeight(t *testing.T) { - testBlocks := []wire.MsgBlock{{ - Header: wire.BlockHeader{ - Version: 1, - Height: 100000, - Timestamp: time.Unix(1293623863, 0), // 2010-12-29 11:57:43 +0000 UTC - Voters: 2, - Revocations: 1, - FreshStake: 4, - }, - Transactions: make([]*wire.MsgTx, 4), - STransactions: nil, - }, { - Header: wire.BlockHeader{ - Version: 1, - Height: 100001, - Timestamp: time.Unix(1293624163, 0), // 2010-12-29 12:02:43 +0000 UTC - Voters: 3, - Revocations: 2, - FreshStake: 2, - }, - Transactions: make([]*wire.MsgTx, 2), - STransactions: nil, - }, { - Header: wire.BlockHeader{ - Version: 1, - Height: 100002, - Timestamp: time.Unix(1293624463, 0), // 2010-12-29 12:07:43 +0000 UTC - Voters: 1, - Revocations: 3, - FreshStake: 1, - }, - Transactions: make([]*wire.MsgTx, 3), - STransactions: nil, - }} - - tests := []struct { - name string - reset bool - inputBlock *wire.MsgBlock - inputSyncHeight int64 - inputLastLogTime time.Time - wantReceivedLogBlocks int64 - wantReceivedLogTx int64 - wantReceivedLogVotes int64 - wantReceivedLogRevocations int64 - wantReceivedLogTickets int64 - }{{ - name: "round 1, block 0, last log time < 10 secs ago, < sync height", - inputBlock: &testBlocks[0], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 1, - wantReceivedLogTx: 4, - wantReceivedLogVotes: 2, - wantReceivedLogRevocations: 1, - wantReceivedLogTickets: 4, - }, { - name: "round 1, block 1, last log time < 10 secs ago, < sync height", - inputBlock: &testBlocks[1], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 2, - wantReceivedLogTx: 6, - 
wantReceivedLogVotes: 5, - wantReceivedLogRevocations: 3, - wantReceivedLogTickets: 6, - }, { - name: "round 1, block 2, last log time < 10 secs ago, < sync height", - inputBlock: &testBlocks[2], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 0, - wantReceivedLogTx: 0, - wantReceivedLogVotes: 0, - wantReceivedLogRevocations: 0, - wantReceivedLogTickets: 0, - }, { - name: "round 2, block 0, last log time < 10 secs ago, < sync height", - reset: true, - inputBlock: &testBlocks[0], - inputSyncHeight: 100002, - inputLastLogTime: time.Now(), - wantReceivedLogBlocks: 1, - wantReceivedLogTx: 4, - wantReceivedLogVotes: 2, - wantReceivedLogRevocations: 1, - wantReceivedLogTickets: 4, - }, { - name: "round 2, block 1, last log time > 10 secs ago, < sync height", - inputBlock: &testBlocks[1], - inputSyncHeight: 100002, - inputLastLogTime: time.Now().Add(-11 * time.Second), - wantReceivedLogBlocks: 0, - wantReceivedLogTx: 0, - wantReceivedLogVotes: 0, - wantReceivedLogRevocations: 0, - wantReceivedLogTickets: 0, - }, { - name: "round 2, block 2, last log time > 10 secs ago, == sync height", - inputBlock: &testBlocks[2], - inputSyncHeight: 100002, - inputLastLogTime: time.Now().Add(-11 * time.Second), - wantReceivedLogBlocks: 0, - wantReceivedLogTx: 0, - wantReceivedLogVotes: 0, - wantReceivedLogRevocations: 0, - wantReceivedLogTickets: 0, - }} - - progressLogger := New("Wrote", testLog) - for _, test := range tests { - if test.reset { - progressLogger = New("Wrote", testLog) - } - progressLogger.SetLastLogTime(test.inputLastLogTime) - progressLogger.LogBlockHeight(test.inputBlock, test.inputSyncHeight) - wantBlockProgressLogger := &BlockLogger{ - receivedLogBlocks: test.wantReceivedLogBlocks, - receivedLogTx: test.wantReceivedLogTx, - receivedLogVotes: test.wantReceivedLogVotes, - receivedLogRevocations: test.wantReceivedLogRevocations, - receivedLogTickets: test.wantReceivedLogTickets, - lastBlockLogTime: progressLogger.lastBlockLogTime, - 
progressAction: progressLogger.progressAction, - subsystemLogger: progressLogger.subsystemLogger, - } - if !reflect.DeepEqual(progressLogger, wantBlockProgressLogger) { - t.Errorf("%s:\nwant: %+v\ngot: %+v\n", test.name, - wantBlockProgressLogger, progressLogger) - } - } -} diff --git a/internal/progresslog/doc.go b/internal/progresslog/doc.go index 0758a3b653..0a4a10d0af 100644 --- a/internal/progresslog/doc.go +++ b/internal/progresslog/doc.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 The Decred developers +// Copyright (c) 2020-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -16,6 +16,6 @@ Tests are included to ensure proper functionality. - Total number of tickets - Total number of revocations - Logs all cumulative data every 10 seconds -- Immediately logs any outstanding data when the provided sync height is reached +- Immediately logs any outstanding data when requested by the caller */ package progresslog diff --git a/internal/progresslog/logger.go b/internal/progresslog/logger.go new file mode 100644 index 0000000000..636be36c94 --- /dev/null +++ b/internal/progresslog/logger.go @@ -0,0 +1,103 @@ +// Copyright (c) 2015-2021 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package progresslog + +import ( + "sync" + "time" + + "github.com/decred/dcrd/wire" + "github.com/decred/slog" +) + +// pickNoun returns the singular or plural form of a noun depending on the +// provided count. +func pickNoun(n uint64, singular, plural string) string { + if n == 1 { + return singular + } + return plural +} + +// Logger provides periodic logging of progress towards some action such as +// syncing the chain. +type Logger struct { + sync.Mutex + subsystemLogger slog.Logger + progressAction string + + // lastLogTime tracks the last time a log statement was shown. 
+ lastLogTime time.Time + + // These fields accumulate information about blocks between log statements. + receivedBlocks uint64 + receivedTxns uint64 + receivedVotes uint64 + receivedRevokes uint64 + receivedTickets uint64 +} + +// New returns a new block progress logger. +func New(progressAction string, logger slog.Logger) *Logger { + return &Logger{ + lastLogTime: time.Now(), + progressAction: progressAction, + subsystemLogger: logger, + } +} + +// LogProgress accumulates details for the provided block and periodically +// (every 10 seconds) logs an information message to show progress to the user +// along with duration and totals included. +// +// The force flag may be used to force a log message to be shown regardless of +// the time the last one was shown. +// +// The progress message is templated as follows: +// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} +// ({numTxs} {transactions|transaction}, {numTickets} {tickets|ticket}, +// {numVotes} {votes|vote}, {numRevocations} {revocations|revocation}, +// height {lastBlockHeight}, {lastBlockTimeStamp}) +func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool) { + l.Lock() + defer l.Unlock() + + header := &block.Header + l.receivedBlocks++ + l.receivedTxns += uint64(len(block.Transactions)) + l.receivedVotes += uint64(header.Voters) + l.receivedRevokes += uint64(header.Revocations) + l.receivedTickets += uint64(header.FreshStake) + now := time.Now() + duration := now.Sub(l.lastLogTime) + if !forceLog && duration < time.Second*10 { + return + } + + // Log information about chain progress. 
+ l.subsystemLogger.Infof("%s %d %s in the last %0.2fs (%d %s, %d %s, %d %s,"+ + " %d %s, height %d, %s)", l.progressAction, + l.receivedBlocks, pickNoun(l.receivedBlocks, "block", "blocks"), + duration.Seconds(), + l.receivedTxns, pickNoun(l.receivedTxns, "transaction", "transactions"), + l.receivedTickets, pickNoun(l.receivedTickets, "ticket", "tickets"), + l.receivedVotes, pickNoun(l.receivedVotes, "vote", "votes"), + l.receivedRevokes, pickNoun(l.receivedRevokes, "revocation", "revocations"), + header.Height, header.Timestamp) + + l.receivedBlocks = 0 + l.receivedTxns = 0 + l.receivedVotes = 0 + l.receivedTickets = 0 + l.receivedRevokes = 0 + l.lastLogTime = now +} + +// SetLastLogTime updates the last time data was logged to the provided time. +func (l *Logger) SetLastLogTime(time time.Time) { + l.Lock() + l.lastLogTime = time + l.Unlock() +} diff --git a/internal/progresslog/logger_test.go b/internal/progresslog/logger_test.go new file mode 100644 index 0000000000..b6ee50c65a --- /dev/null +++ b/internal/progresslog/logger_test.go @@ -0,0 +1,156 @@ +// Copyright (c) 2021 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package progresslog + +import ( + "io/ioutil" + "reflect" + "testing" + "time" + + "github.com/decred/dcrd/wire" + "github.com/decred/slog" +) + +var ( + backendLog = slog.NewBackend(ioutil.Discard) + testLog = backendLog.Logger("TEST") +) + +// TestLogProgress ensures the logging functionality works as expected via a +// test logger. 
+func TestLogProgress(t *testing.T) { + testBlocks := []wire.MsgBlock{{ + Header: wire.BlockHeader{ + Version: 1, + Height: 100000, + Timestamp: time.Unix(1293623863, 0), // 2010-12-29 11:57:43 +0000 UTC + Voters: 2, + Revocations: 1, + FreshStake: 4, + }, + Transactions: make([]*wire.MsgTx, 4), + STransactions: nil, + }, { + Header: wire.BlockHeader{ + Version: 1, + Height: 100001, + Timestamp: time.Unix(1293624163, 0), // 2010-12-29 12:02:43 +0000 UTC + Voters: 3, + Revocations: 2, + FreshStake: 2, + }, + Transactions: make([]*wire.MsgTx, 2), + STransactions: nil, + }, { + Header: wire.BlockHeader{ + Version: 1, + Height: 100002, + Timestamp: time.Unix(1293624463, 0), // 2010-12-29 12:07:43 +0000 UTC + Voters: 1, + Revocations: 3, + FreshStake: 1, + }, + Transactions: make([]*wire.MsgTx, 3), + STransactions: nil, + }} + + tests := []struct { + name string + reset bool + inputBlock *wire.MsgBlock + forceLog bool + inputLastLogTime time.Time + wantReceivedBlocks uint64 + wantReceivedTxns uint64 + wantReceivedVotes uint64 + wantReceivedRevokes uint64 + wantReceivedTickets uint64 + }{{ + name: "round 1, block 0, last log time < 10 secs ago, not forced", + inputBlock: &testBlocks[0], + forceLog: false, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 1, + wantReceivedTxns: 4, + wantReceivedVotes: 2, + wantReceivedRevokes: 1, + wantReceivedTickets: 4, + }, { + name: "round 1, block 1, last log time < 10 secs ago, not forced", + inputBlock: &testBlocks[1], + forceLog: false, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 2, + wantReceivedTxns: 6, + wantReceivedVotes: 5, + wantReceivedRevokes: 3, + wantReceivedTickets: 6, + }, { + name: "round 1, block 2, last log time < 10 secs ago, forced", + inputBlock: &testBlocks[2], + forceLog: true, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 0, + wantReceivedTxns: 0, + wantReceivedVotes: 0, + wantReceivedRevokes: 0, + wantReceivedTickets: 0, + }, { + name: "round 2, block 0, last log time < 10 secs ago, not 
forced", + reset: true, + inputBlock: &testBlocks[0], + forceLog: false, + inputLastLogTime: time.Now(), + wantReceivedBlocks: 1, + wantReceivedTxns: 4, + wantReceivedVotes: 2, + wantReceivedRevokes: 1, + wantReceivedTickets: 4, + }, { + name: "round 2, block 1, last log time > 10 secs ago, not forced", + inputBlock: &testBlocks[1], + forceLog: false, + inputLastLogTime: time.Now().Add(-11 * time.Second), + wantReceivedBlocks: 0, + wantReceivedTxns: 0, + wantReceivedVotes: 0, + wantReceivedRevokes: 0, + wantReceivedTickets: 0, + }, { + name: "round 2, block 2, last log time > 10 secs ago, forced", + inputBlock: &testBlocks[2], + forceLog: true, + inputLastLogTime: time.Now().Add(-11 * time.Second), + wantReceivedBlocks: 0, + wantReceivedTxns: 0, + wantReceivedVotes: 0, + wantReceivedRevokes: 0, + wantReceivedTickets: 0, + }} + + progressLogger := New("Wrote", testLog) + for _, test := range tests { + if test.reset { + progressLogger = New("Wrote", testLog) + } + progressLogger.SetLastLogTime(test.inputLastLogTime) + progressLogger.LogProgress(test.inputBlock, test.forceLog) + wantBlockProgressLogger := &Logger{ + receivedBlocks: test.wantReceivedBlocks, + receivedTxns: test.wantReceivedTxns, + receivedVotes: test.wantReceivedVotes, + receivedRevokes: test.wantReceivedRevokes, + receivedTickets: test.wantReceivedTickets, + lastLogTime: progressLogger.lastLogTime, + progressAction: progressLogger.progressAction, + subsystemLogger: progressLogger.subsystemLogger, + } + if !reflect.DeepEqual(progressLogger, wantBlockProgressLogger) { + t.Errorf("%s:\nwant: %+v\ngot: %+v\n", test.name, + wantBlockProgressLogger, progressLogger) + } + } +} From bc380439edc7e517eec84c82eb9a015315669487 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:19 -0600 Subject: [PATCH 02/10] netsync: Remove unused submit block flags param. 
This removes the flag from the SubmitBlock method of the sync manager since those flags are not really intended for general block submission which is why it is only ever called with blockchain.BFNone. Also, the intention is to modify the way the sync manager deals with block submission in the future which this change help simplify. --- internal/rpcserver/interface.go | 4 ++-- internal/rpcserver/rpcserver.go | 6 +++--- internal/rpcserver/rpcserverhandlers_test.go | 4 ++-- rpcadaptors.go | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index dc3ebc24fa..5c046b4a4b 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 The Decred developers +// Copyright (c) 2019-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -148,7 +148,7 @@ type SyncManager interface { // SubmitBlock submits the provided block to the network after // processing it locally. - SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) + SubmitBlock(block *dcrutil.Block) (bool, error) // SyncPeerID returns the id of the current peer being synced with. SyncPeerID() int32 diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index b06e21a576..8735580e23 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -3783,7 +3783,7 @@ func handleGetWorkSubmission(_ context.Context, s *Server, hexData string) (inte // Process this block using the same rules as blocks coming from other // nodes. 
This will in turn relay it to the network like normal. - isOrphan, err := s.cfg.SyncMgr.SubmitBlock(block, blockchain.BFNone) + isOrphan, err := s.cfg.SyncMgr.SubmitBlock(block) if err != nil { // Anything other than a rule violation is an unexpected error, // so return that error as an internal error. @@ -4745,7 +4745,7 @@ func handleSubmitBlock(_ context.Context, s *Server, cmd interface{}) (interface return nil, rpcInternalError(err.Error(), "Block decode") } - _, err = s.cfg.SyncMgr.SubmitBlock(block, blockchain.BFNone) + _, err = s.cfg.SyncMgr.SubmitBlock(block) if err != nil { return fmt.Sprintf("rejected: %v", err), nil } diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index eedb74a925..1e3a7b8ae6 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 The Decred developers +// Copyright (c) 2020-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -516,7 +516,7 @@ func (s *testSyncManager) IsCurrent() bool { // SubmitBlock provides a mock implementation for submitting the provided block // to the network after processing it locally. -func (s *testSyncManager) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { +func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) (bool, error) { return s.isOrphan, s.submitBlockErr } diff --git a/rpcadaptors.go b/rpcadaptors.go index 3f89c6184c..9a90f466c9 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -1,5 +1,5 @@ // Copyright (c) 2017 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. 
@@ -330,8 +330,8 @@ func (b *rpcSyncMgr) IsCurrent() bool { // // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. -func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { - return b.syncMgr.ProcessBlock(block, flags) +func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block) (bool, error) { + return b.syncMgr.ProcessBlock(block, blockchain.BFNone) } // SyncPeer returns the id of the current peer being synced with. From 9447247d815de8e80ce6ea879ed69e56afcc4b6c Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:20 -0600 Subject: [PATCH 03/10] netsync: Remove submit/processblock orphan flag. This removes the orphan return flag from the SubmitBlock and ProcessBlock netsync methods in favor of allowing the caller to detect it from the error. This is a step towards the goal of removing handling of orphan blocks altogether in favor of using block headers for sync. --- internal/mining/cpuminer/cpuminer.go | 17 ++++--- internal/netsync/manager.go | 51 +++++++++----------- internal/rpcserver/interface.go | 2 +- internal/rpcserver/rpcserver.go | 16 +++--- internal/rpcserver/rpcserverhandlers_test.go | 16 +++--- rpcadaptors.go | 2 +- 6 files changed, 52 insertions(+), 52 deletions(-) diff --git a/internal/mining/cpuminer/cpuminer.go b/internal/mining/cpuminer/cpuminer.go index 17ba42ce2d..8951d26df5 100644 --- a/internal/mining/cpuminer/cpuminer.go +++ b/internal/mining/cpuminer/cpuminer.go @@ -1,5 +1,5 @@ // Copyright (c) 2014-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -96,7 +96,7 @@ type Config struct { // ProcessBlock defines the function to call with any solved blocks. 
// It typically must run the provided block through the same set of // rules and handling as any other block coming from the network. - ProcessBlock func(*dcrutil.Block, blockchain.BehaviorFlags) (bool, error) + ProcessBlock func(*dcrutil.Block, blockchain.BehaviorFlags) error // ConnectedCount defines the function to use to obtain how many other // peers the server is connected to. This is used by the automatic @@ -200,8 +200,14 @@ func (m *CPUMiner) submitBlock(block *dcrutil.Block) bool { // Process this block using the same rules as blocks coming from other // nodes. This will in turn relay it to the network like normal. - isOrphan, err := m.cfg.ProcessBlock(block, blockchain.BFNone) + err := m.cfg.ProcessBlock(block, blockchain.BFNone) if err != nil { + if errors.Is(err, blockchain.ErrMissingParent) { + log.Errorf("Block submitted via CPU miner is an orphan building "+ + "on parent %v", block.MsgBlock().Header.PrevBlock) + return false + } + // Anything other than a rule violation is an unexpected error, // so log that error as an internal error. var rErr blockchain.RuleError @@ -228,11 +234,6 @@ func (m *CPUMiner) submitBlock(block *dcrutil.Block) bool { log.Errorf("Block submitted via CPU miner rejected: %v", err) return false } - if isOrphan { - log.Errorf("Block submitted via CPU miner is an orphan building on "+ - "parent %v", block.MsgBlock().Header.PrevBlock) - return false - } // The block was accepted. coinbaseTxOuts := block.MsgBlock().Transactions[0].TxOut diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index d597f28e8b..b6dcde6f5c 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -128,9 +128,8 @@ type requestFromPeerResponse struct { // processBlockResponse is a response sent to the reply channel of a // processBlockMsg. 
type processBlockResponse struct { - forkLen int64 - isOrphan bool - err error + forkLen int64 + err error } // processBlockMsg is a message type to be sent across the message channel @@ -894,11 +893,11 @@ func (m *SyncManager) processOrphans(hash *chainhash.Hash, flags blockchain.Beha // When no errors occurred during processing, the first return value indicates // the length of the fork the block extended. In the case it either extended // the best chain or is now the tip of the best chain due to causing a -// reorganize, the fork length will be 0. The second return value indicates -// whether or not the block is an orphan, in which case the fork length will -// also be zero as expected, because it, by definition, does not connect to the -// best chain. -func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, bool, error) { +// reorganize, the fork length will be 0. The fork length will also be zero as +// expected when the block is an orphan, because it, by definition, does not +// connect to the best chain. The caller can determine if the block is an +// orphan by check if the error is blockchain.ErrMissingParent. +func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, error) { // Process the block to include validation, best chain selection, etc. // // Also, keep track of orphan blocks in the sync manager when the error @@ -909,13 +908,9 @@ func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockch log.Infof("Adding orphan block %v with parent %v", blockHash, block.MsgBlock().Header.PrevBlock) m.addOrphanBlock(block) - - // The fork length of orphans is unknown since they, by definition, do - // not connect to the best chain. 
- return 0, true, nil } if err != nil { - return 0, false, err + return 0, err } m.isCurrentMtx.Lock() m.maybeUpdateIsCurrent() @@ -924,10 +919,10 @@ func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockch // Accept any orphan blocks that depend on this block (they are no longer // orphans) and repeat for those accepted blocks until there are no more. if err := m.processOrphans(blockHash, flags); err != nil { - return 0, false, err + return 0, err } - return forkLen, false, nil + return forkLen, nil } // handleBlockMsg handles block messages from all peers. @@ -978,7 +973,12 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { // Process the block to include validation, best chain selection, orphan // handling, etc. - forkLen, isOrphan, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags) + var isOrphan bool + forkLen, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags) + if errors.Is(err, blockchain.ErrMissingParent) { + isOrphan = true + err = nil + } if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -1514,18 +1514,16 @@ out: } case processBlockMsg: - forkLen, isOrphan, err := m.processBlockAndOrphans(msg.block, - msg.flags) + forkLen, err := m.processBlockAndOrphans(msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ - forkLen: forkLen, - isOrphan: isOrphan, - err: err, + forkLen: forkLen, + err: err, } continue } - onMainChain := !isOrphan && forkLen == 0 + onMainChain := forkLen == 0 if onMainChain { // Prune invalidated transactions. best := m.cfg.Chain.BestSnapshot() @@ -1535,8 +1533,7 @@ out: } msg.reply <- processBlockResponse{ - isOrphan: isOrphan, - err: nil, + err: nil, } case processTransactionMsg: @@ -1780,7 +1777,7 @@ func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, voteHashes, // ProcessBlock makes use of ProcessBlock on an internal instance of a block // chain. 
It is funneled through the sync manager since blockchain is not safe // for concurrent access. -func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (bool, error) { +func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) error { reply := make(chan processBlockResponse, 1) select { case m.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}: @@ -1789,9 +1786,9 @@ func (m *SyncManager) ProcessBlock(block *dcrutil.Block, flags blockchain.Behavi select { case response := <-reply: - return response.isOrphan, response.err + return response.err case <-m.quit: - return false, fmt.Errorf("sync manager stopped") + return fmt.Errorf("sync manager stopped") } } diff --git a/internal/rpcserver/interface.go b/internal/rpcserver/interface.go index 5c046b4a4b..c5cdb7c871 100644 --- a/internal/rpcserver/interface.go +++ b/internal/rpcserver/interface.go @@ -148,7 +148,7 @@ type SyncManager interface { // SubmitBlock submits the provided block to the network after // processing it locally. - SubmitBlock(block *dcrutil.Block) (bool, error) + SubmitBlock(block *dcrutil.Block) error // SyncPeerID returns the id of the current peer being synced with. SyncPeerID() int32 diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 8735580e23..1e08c3ff5c 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -3783,8 +3783,14 @@ func handleGetWorkSubmission(_ context.Context, s *Server, hexData string) (inte // Process this block using the same rules as blocks coming from other // nodes. This will in turn relay it to the network like normal. 
- isOrphan, err := s.cfg.SyncMgr.SubmitBlock(block) + err = s.cfg.SyncMgr.SubmitBlock(block) if err != nil { + if errors.Is(err, blockchain.ErrMissingParent) { + log.Infof("Block submitted via getwork rejected: orphan building "+ + "on parent %v", block.MsgBlock().Header.PrevBlock) + return false, nil + } + // Anything other than a rule violation is an unexpected error, // so return that error as an internal error. var rErr blockchain.RuleError @@ -3797,12 +3803,6 @@ func handleGetWorkSubmission(_ context.Context, s *Server, hexData string) (inte return false, nil } - if isOrphan { - log.Infof("Block submitted via getwork rejected: an orphan "+ - "building on parent %v", block.MsgBlock().Header.PrevBlock) - return false, nil - } - // The block was accepted. log.Infof("Block submitted via getwork accepted: %s (height %d)", block.Hash(), msgBlock.Header.Height) @@ -4745,7 +4745,7 @@ func handleSubmitBlock(_ context.Context, s *Server, cmd interface{}) (interface return nil, rpcInternalError(err.Error(), "Block decode") } - _, err = s.cfg.SyncMgr.SubmitBlock(block) + err = s.cfg.SyncMgr.SubmitBlock(block) if err != nil { return fmt.Sprintf("rejected: %v", err), nil } diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index 1e3a7b8ae6..0ca81a350c 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -500,7 +500,6 @@ func (c *testAddrManager) LocalAddresses() []addrmgr.LocalAddr { // SyncManager interface. type testSyncManager struct { isCurrent bool - isOrphan bool submitBlockErr error syncPeerID int32 syncHeight int64 @@ -516,8 +515,8 @@ func (s *testSyncManager) IsCurrent() bool { // SubmitBlock provides a mock implementation for submitting the provided block // to the network after processing it locally. 
-func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) (bool, error) { - return s.isOrphan, s.submitBlockErr +func (s *testSyncManager) SubmitBlock(block *dcrutil.Block) error { + return s.submitBlockErr } // SyncPeer returns a mocked id of the current peer being synced with. @@ -1584,7 +1583,6 @@ func defaultMockDB() *testDB { // *testSyncManager. func defaultMockSyncManager() *testSyncManager { return &testSyncManager{ - isOrphan: false, syncHeight: 463074, } } @@ -5929,9 +5927,14 @@ func TestHandleGetWork(t *testing.T) { hex.Encode(submissionB, data) submission := string(submissionB) - truncatedSubmission := submission[1:] lessThanGetWorkDataLen := submission[10:] + // Create an orphan block by mutating the prevblock field of the data. + orphanData := make([]byte, len(data)) + copy(orphanData, data) + orphanData[4] ^= 0x55 + orphanSubmission := hex.EncodeToString(orphanData) + buf = &bytes.Buffer{} buf.Write(submissionB[:10]) buf.WriteRune('g') @@ -6086,12 +6089,11 @@ func TestHandleGetWork(t *testing.T) { name: "handleGetWork: submission is an orphan", handler: handleGetWork, cmd: &types.GetWorkCmd{ - Data: &truncatedSubmission, + Data: &orphanSubmission, }, mockMiningState: mine(), mockSyncManager: func() *testSyncManager { syncManager := defaultMockSyncManager() - syncManager.isOrphan = true return syncManager }(), result: false, diff --git a/rpcadaptors.go b/rpcadaptors.go index 9a90f466c9..b7f59c24a6 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -330,7 +330,7 @@ func (b *rpcSyncMgr) IsCurrent() bool { // // This function is safe for concurrent access and is part of the // rpcserver.SyncManager interface implementation. 
-func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block) (bool, error) { +func (b *rpcSyncMgr) SubmitBlock(block *dcrutil.Block) error { return b.syncMgr.ProcessBlock(block, blockchain.BFNone) } From fea818c297baa55e596c8aab8ab7ae6e6eb4536b Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:21 -0600 Subject: [PATCH 04/10] netsync: Remove orphan block handling. This removes all orphan block handling from the syncing process and is part of the overall effort to move to full headers-first syncing as opposed to the current partial header-first checkpoint-based process. It should be noted that this change is made in such a way that the syncing process continues to work properly, however, it is only intended to be an intermediate step, and, as such, it does result in suboptimal behavior once it passes the final checkpoint. That will no longer be the case once the entire sync process conversion is complete. --- internal/netsync/manager.go | 334 +++++------------------------------- 1 file changed, 40 insertions(+), 294 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index b6dcde6f5c..982fbd067f 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -34,10 +34,6 @@ const ( // more. minInFlightBlocks = 10 - // maxOrphanBlocks is the maximum number of orphan blocks that can be - // queued. - maxOrphanBlocks = 500 - // maxRejectedTxns is the maximum number of rejected transactions // hashes to store in memory. maxRejectedTxns = 1000 @@ -179,14 +175,6 @@ type syncMgrPeer struct { requestedBlocks map[chainhash.Hash]struct{} } -// orphanBlock represents a block for which the parent is not yet available. It -// is a normal block plus an expiration time to prevent caching the orphan -// forever. -type orphanBlock struct { - block *dcrutil.Block - expiration time.Time -} - // SyncManager provides a concurrency safe sync manager for handling all // incoming blocks.
type SyncManager struct { @@ -213,13 +201,6 @@ type SyncManager struct { startHeader *list.Element nextCheckpoint *chaincfg.Checkpoint - // These fields are related to handling of orphan blocks. They are - // protected by the orphan lock. - orphanLock sync.RWMutex - orphans map[chainhash.Hash]*orphanBlock - prevOrphans map[chainhash.Hash][]*orphanBlock - oldestOrphan *orphanBlock - // The following fields are used to track the height being synced to from // peers. syncHeightMtx sync.Mutex @@ -684,135 +665,8 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) { m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs) } -// isKnownOrphan returns whether the passed hash is currently a known orphan. -// Keep in mind that only a limited number of orphans are held onto for a -// limited amount of time, so this function must not be used as an absolute way -// to test if a block is an orphan block. A full block (as opposed to just its -// hash) must be passed to ProcessBlock for that purpose. This function -// provides a mechanism for a caller to intelligently detect *recent* duplicate -// orphans and react accordingly. -// -// This function is safe for concurrent access. -func (m *SyncManager) isKnownOrphan(hash *chainhash.Hash) bool { - // Protect concurrent access. Using a read lock only so multiple readers - // can query without blocking each other. - m.orphanLock.RLock() - _, exists := m.orphans[*hash] - m.orphanLock.RUnlock() - return exists -} - -// orphanRoot returns the head of the chain for the provided hash from the map -// of orphan blocks. -// -// This function is safe for concurrent access. -func (m *SyncManager) orphanRoot(hash *chainhash.Hash) *chainhash.Hash { - // Protect concurrent access. Using a read lock only so multiple - // readers can query without blocking each other. - m.orphanLock.RLock() - defer m.orphanLock.RUnlock() - - // Keep looping while the parent of each orphaned block is known and is an - // orphan itself. 
- orphanRoot := hash - prevHash := hash - for { - orphan, exists := m.orphans[*prevHash] - if !exists { - break - } - orphanRoot = prevHash - prevHash = &orphan.block.MsgBlock().Header.PrevBlock - } - - return orphanRoot -} - -// removeOrphanBlock removes the passed orphan block from the orphan pool and -// previous orphan index. -func (m *SyncManager) removeOrphanBlock(orphan *orphanBlock) { - // Protect concurrent access. - m.orphanLock.Lock() - defer m.orphanLock.Unlock() - - // Remove the orphan block from the orphan pool. - orphanHash := orphan.block.Hash() - delete(m.orphans, *orphanHash) - - // Remove the reference from the previous orphan index too. An indexing - // for loop is intentionally used over a range here as range does not - // reevaluate the slice on each iteration nor does it adjust the index - // for the modified slice. - prevHash := &orphan.block.MsgBlock().Header.PrevBlock - orphans := m.prevOrphans[*prevHash] - for i := 0; i < len(orphans); i++ { - hash := orphans[i].block.Hash() - if hash.IsEqual(orphanHash) { - copy(orphans[i:], orphans[i+1:]) - orphans[len(orphans)-1] = nil - orphans = orphans[:len(orphans)-1] - i-- - } - } - m.prevOrphans[*prevHash] = orphans - - // Remove the map entry altogether if there are no longer any orphans - // which depend on the parent hash. - if len(m.prevOrphans[*prevHash]) == 0 { - delete(m.prevOrphans, *prevHash) - } -} - -// addOrphanBlock adds the passed block (which is already determined to be an -// orphan prior calling this function) to the orphan pool. It lazily cleans up -// any expired blocks so a separate cleanup poller doesn't need to be run. It -// also imposes a maximum limit on the number of outstanding orphan blocks and -// will remove the oldest received orphan block if the limit is exceeded. -func (m *SyncManager) addOrphanBlock(block *dcrutil.Block) { - // Remove expired orphan blocks. 
- for _, oBlock := range m.orphans { - if time.Now().After(oBlock.expiration) { - m.removeOrphanBlock(oBlock) - continue - } - - // Update the oldest orphan block pointer so it can be discarded - // in case the orphan pool fills up. - if m.oldestOrphan == nil || - oBlock.expiration.Before(m.oldestOrphan.expiration) { - m.oldestOrphan = oBlock - } - } - - // Limit orphan blocks to prevent memory exhaustion. - if len(m.orphans)+1 > maxOrphanBlocks { - // Remove the oldest orphan to make room for the new one. - m.removeOrphanBlock(m.oldestOrphan) - m.oldestOrphan = nil - } - - // Protect concurrent access. This is intentionally done here instead - // of near the top since removeOrphanBlock does its own locking and - // the range iterator is not invalidated by removing map entries. - m.orphanLock.Lock() - defer m.orphanLock.Unlock() - - // Insert the block into the orphan map with an expiration time - // 1 hour from now. - expiration := time.Now().Add(time.Hour) - oBlock := &orphanBlock{ - block: block, - expiration: expiration, - } - m.orphans[*block.Hash()] = oBlock - - // Add to previous hash lookup index for faster dependency lookups. - prevHash := &block.MsgBlock().Header.PrevBlock - m.prevOrphans[*prevHash] = append(m.prevOrphans[*prevHash], oBlock) -} - -// maybeUpdateIsCurrent potentially updates the manager to signal it believes the -// chain is considered synced. +// maybeUpdateIsCurrent potentially updates the manager to signal it believes +// the chain is considered synced. // // This function MUST be called with the is current mutex held (for writes). func (m *SyncManager) maybeUpdateIsCurrent() { @@ -830,85 +684,16 @@ func (m *SyncManager) maybeUpdateIsCurrent() { } } -// processOrphans determines if there are any orphans which depend on the passed -// block hash (they are no longer orphans if true) and potentially accepts them. 
-// It repeats the process for the newly accepted blocks (to detect further -// orphans which may no longer be orphans) until there are no more. -// -// The flags do not modify the behavior of this function directly, however they -// are needed to pass along to maybeAcceptBlock. -func (m *SyncManager) processOrphans(hash *chainhash.Hash, flags blockchain.BehaviorFlags) error { - // Start with processing at least the passed hash. Leave a little room for - // additional orphan blocks that need to be processed without needing to - // grow the array in the common case. - processHashes := make([]*chainhash.Hash, 0, 10) - processHashes = append(processHashes, hash) - for len(processHashes) > 0 { - // Pop the first hash to process from the slice. - processHash := processHashes[0] - processHashes[0] = nil // Prevent GC leak. - processHashes = processHashes[1:] - - // Look up all orphans that are parented by the block we just accepted. - // This will typically only be one, but it could be multiple if multiple - // blocks are mined and broadcast around the same time. The one with - // the most proof of work will eventually win out. An indexing for loop - // is intentionally used over a range here as range does not reevaluate - // the slice on each iteration nor does it adjust the index for the - // modified slice. - for i := 0; i < len(m.prevOrphans[*processHash]); i++ { - orphan := m.prevOrphans[*processHash][i] - if orphan == nil { - log.Warnf("Found a nil entry at index %d in the orphan "+ - "dependency list for block %v", i, processHash) - continue - } - - // Remove the orphan from the orphan pool. - orphanHash := orphan.block.Hash() - m.removeOrphanBlock(orphan) - i-- - - // Potentially accept the block into the block chain. 
- _, err := m.cfg.Chain.ProcessBlock(orphan.block, flags) - if err != nil { - return err - } - m.isCurrentMtx.Lock() - m.maybeUpdateIsCurrent() - m.isCurrentMtx.Unlock() - - // Add this block to the list of blocks to process so any orphan - // blocks that depend on this block are handled too. - processHashes = append(processHashes, orphanHash) - } - } - return nil -} - -// processBlockAndOrphans processes the provided block using the internal chain -// instance while keeping track of orphan blocks and also processing any orphans -// that depend on the passed block to potentially accept as well. +// processBlock processes the provided block using the internal chain instance. // // When no errors occurred during processing, the first return value indicates // the length of the fork the block extended. In the case it either extended // the best chain or is now the tip of the best chain due to causing a -// reorganize, the fork length will be 0. The fork length will also be zero as -// expected when the block is an orphan, because it, by definition, does not -// connect to the best chain. The caller can determine if the block is an -// orphan by check if the error is blockchain.ErrMissingParent. -func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, error) { +// reorganize, the fork length will be 0. Orphans are rejected and can be +// detected by checking if the error is blockchain.ErrMissingParent. +func (m *SyncManager) processBlock(block *dcrutil.Block, flags blockchain.BehaviorFlags) (int64, error) { // Process the block to include validation, best chain selection, etc. - // - // Also, keep track of orphan blocks in the sync manager when the error - // returned indicates the block is an orphan. 
- blockHash := block.Hash() forkLen, err := m.cfg.Chain.ProcessBlock(block, flags) - if errors.Is(err, blockchain.ErrMissingParent) { - log.Infof("Adding orphan block %v with parent %v", blockHash, - block.MsgBlock().Header.PrevBlock) - m.addOrphanBlock(block) - } if err != nil { return 0, err } @@ -916,12 +701,6 @@ func (m *SyncManager) processBlockAndOrphans(block *dcrutil.Block, flags blockch m.maybeUpdateIsCurrent() m.isCurrentMtx.Unlock() - // Accept any orphan blocks that depend on this block (they are no longer - // orphans) and repeat for those accepted blocks until there are no more. - if err := m.processOrphans(blockHash, flags); err != nil { - return 0, err - } - return forkLen, nil } @@ -971,15 +750,14 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { delete(peer.requestedBlocks, *blockHash) delete(m.requestedBlocks, *blockHash) - // Process the block to include validation, best chain selection, orphan - // handling, etc. - var isOrphan bool - forkLen, err := m.processBlockAndOrphans(bmsg.block, behaviorFlags) - if errors.Is(err, blockchain.ErrMissingParent) { - isOrphan = true - err = nil - } + // Process the block to include validation, best chain selection, etc. + forkLen, err := m.processBlock(bmsg.block, behaviorFlags) if err != nil { + // Ignore orphan blocks. + if errors.Is(err, blockchain.ErrMissingParent) { + return + } + // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log // it as such. Otherwise, something really did go wrong, so log @@ -1001,46 +779,32 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { return } - // Request the parents for the orphan block from the peer that sent it. 
- onMainChain := !isOrphan && forkLen == 0 - if isOrphan { - orphanRoot := m.orphanRoot(blockHash) - blkLocator := m.cfg.Chain.LatestBlockLocator() - locator := chainBlockLocatorToHashes(blkLocator) - err := peer.PushGetBlocksMsg(locator, orphanRoot) - if err != nil { - log.Warnf("Failed to push getblocksmsg for the latest block: %v", - err) - } - } else { - // When the block is not an orphan, log information about it and - // update the chain state. - msgBlock := bmsg.block.MsgBlock() - forceLog := int64(msgBlock.Header.Height) >= m.SyncHeight() - m.progressLogger.LogProgress(msgBlock, forceLog) - - if onMainChain { - // Notify stake difficulty subscribers and prune invalidated - // transactions. - best := m.cfg.Chain.BestSnapshot() - m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) - m.cfg.TxMemPool.PruneExpiredTx() + // Log information about the block and update the chain state. + msgBlock := bmsg.block.MsgBlock() + forceLog := int64(msgBlock.Header.Height) >= m.SyncHeight() + m.progressLogger.LogProgress(msgBlock, forceLog) - // Clear the rejected transactions. - m.rejectedTxns = make(map[chainhash.Hash]struct{}) - } + onMainChain := forkLen == 0 + if onMainChain { + // Prune invalidated transactions. + best := m.cfg.Chain.BestSnapshot() + m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) + m.cfg.TxMemPool.PruneExpiredTx() + + // Clear the rejected transactions. + m.rejectedTxns = make(map[chainhash.Hash]struct{}) } // Update the latest block height for the peer to avoid stale heights when // looking for future potential sync node candidacy. // - // Also, when the block is an orphan or the chain is considered current and - // the block was accepted to the main chain, update the heights of other - // peers whose invs may have been ignored when actively syncing while the - // chain was not yet current or lost the lock announcement race. 
+ // Also, when the chain is considered current and the block was accepted to + // the main chain, update the heights of other peers whose invs may have + // been ignored when actively syncing while the chain was not yet current or + // lost the lock announcement race. blockHeight := int64(bmsg.block.MsgBlock().Header.Height) peer.UpdateLastBlockHeight(blockHeight) - if isOrphan || (onMainChain && m.IsCurrent()) { + if onMainChain && m.IsCurrent() { for _, p := range m.peers { // The height for the sending peer is already updated. if p == peer { @@ -1277,7 +1041,7 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { // needBlock returns whether or not the block needs to be downloaded. For // example, it does not need to be downloaded when it is already known. func (m *SyncManager) needBlock(hash *chainhash.Hash) bool { - return !m.isKnownOrphan(hash) && !m.cfg.Chain.HaveBlock(hash) + return !m.cfg.Chain.HaveBlock(hash) } // needTx returns whether or not the transaction needs to be downloaded. For @@ -1374,26 +1138,6 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { } if lastBlock != nil { - // When the block is an orphan that we already have, the missing parent - // blocks were requested when the orphan was processed. In that case, - // there were more blocks missing than are allowed into a single - // inventory message. As a result, once this peer requested the final - // advertised block, the remote peer noticed and is now resending the - // orphan block as an available block to signal there are more missing - // blocks that need to be requested. - if m.isKnownOrphan(&lastBlock.Hash) { - // Request blocks starting at the latest known up to the root of the - // orphan that just came in. 
- orphanRoot := m.orphanRoot(&lastBlock.Hash) - blkLocator := m.cfg.Chain.LatestBlockLocator() - locator := chainBlockLocatorToHashes(blkLocator) - err := peer.PushGetBlocksMsg(locator, orphanRoot) - if err != nil { - log.Errorf("Failed to push getblocksmsg for orphan chain: %v", - err) - } - } - // Update the last announced block to the final one in the announced // inventory above (if any). In the case the header for that block is // already known, use that information to update the height for the peer @@ -1514,7 +1258,7 @@ out: } case processBlockMsg: - forkLen, err := m.processBlockAndOrphans(msg.block, msg.flags) + forkLen, err := m.processBlock(msg.block, msg.flags) if err != nil { msg.reply <- processBlockResponse{ forkLen: forkLen, @@ -1643,8 +1387,12 @@ func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, voteHashes, reply := make(chan requestFromPeerResponse, 1) select { - case m.msgChan <- requestFromPeerMsg{peer: p, blocks: blocks, - voteHashes: voteHashes, tSpendHashes: tSpendHashes, reply: reply}: + case m.msgChan <- requestFromPeerMsg{ + peer: p, + blocks: blocks, + voteHashes: voteHashes, + tSpendHashes: tSpendHashes, + reply: reply}: case <-m.quit: } @@ -1676,7 +1424,7 @@ func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, voteHashes, } // Skip the block when it is already known. - if m.isKnownOrphan(bh) || m.cfg.Chain.HaveBlock(bh) { + if m.cfg.Chain.HaveBlock(bh) { continue } @@ -1905,8 +1653,6 @@ func New(config *Config) *SyncManager { msgChan: make(chan interface{}, config.MaxPeers*3), headerList: list.New(), quit: make(chan struct{}), - orphans: make(map[chainhash.Hash]*orphanBlock), - prevOrphans: make(map[chainhash.Hash][]*orphanBlock), isCurrent: config.Chain.IsCurrent(), } From accb6c9472c4f18ae50671bbfdf8a9623949a57b Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:22 -0600 Subject: [PATCH 05/10] netsync: Rework sync model to use hdr announcements.
This overhauls the primary code that deals with synchronizing the blockchain with other peers on the network to use a model based on header announcements instead of inventory vectors. Currently, all blocks are discovered via a combination of inventory vector announcements and the getblocks protocol message, both of which only include the hash of the blocks in question. This is not ideal because it means blocks are blindly downloaded based on those announcements without knowing if they're actually likely to be something that is useful since there is no further information such as what blocks they connect to. It also means that extra logic is needed to deal with orphan blocks (those whose parents are not yet known) such as caching them, determining known orphan roots, and expiration. In short, the current method generally ends up wasting bandwidth and also makes it much more difficult to detect certain classes of malicious behavior. The recently added capability of blockchain to process headers independently from blocks while the block data is added out of order later opened the door for a much better model that addresses all of the aforementioned issues as well as paves the way for many other related enhancements. The new model discovers and downloads all of the headers prior to any block data via the getheaders protocol message and then uses those headers to determine exactly which blocks need to be downloaded to reach the tip of the best chain. Notably, this means orphan blocks are now a thing of the past as blocks that do not connect are no longer downloaded under any circumstance. Further, the new model also makes use of sendheaders so that all block announcements are made via the associated block header. This in turn is used to better determine if an announced block is likely to be useful prior to downloading it. 
It should be noted that the changes herein are intentionally limited to those necessary to use the new sync model based on headers in an incremental fashion to help simplify review and make it easier to assert correctness. There are many more improvements that this model paves the way to support planned for future commits. For example, syncing from multiple peers in parallel and improved DoS protection. The following is a high-level overview of the key features: - All headers are downloaded and examined prior to downloading any blocks - The initial header sync process: - Detects and recovers from stalled/malicious peers - The concept of orphan blocks no longer exists - This means blocks which are not already known to connect are not downloaded - All block announcements are handled via header announcements - Detects and prevents malicious behavior related to orphan headers - The chain sync process: - Starts once the headers are downloaded and entails both downloading blocks as well as verifying them - Uses the headers to determine the best blocks to download - Pipelines the requests to increase throughput - Actively avoids downloading duplicate blocks - The sync height is dynamically updated as new headers are discovered --- blockchain/chain.go | 33 +- blockchain/chainquery.go | 81 +++- internal/netsync/manager.go | 834 +++++++++++++++++++----------------- peer/log.go | 10 +- server.go | 22 +- 5 files changed, 575 insertions(+), 405 deletions(-) diff --git a/blockchain/chain.go b/blockchain/chain.go index d1d984b935..f047c66483 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -348,6 +348,19 @@ func (b *BlockChain) DisableVerify(disable bool) { b.chainLock.Unlock() } +// HaveHeader returns whether or not the chain instance has the block header +// represented by the passed hash. Note that this will return true for both the +// main chain and any side chains. +// +// This function is safe for concurrent access. 
+func (b *BlockChain) HaveHeader(hash *chainhash.Hash) bool { + b.index.RLock() + node := b.index.index[*hash] + headerKnown := node != nil + b.index.RUnlock() + return headerKnown +} + // HaveBlock returns whether or not the chain instance has the block represented // by the passed hash. This includes checking the various places a block can // be like part of the main chain or on a side chain. @@ -1436,7 +1449,7 @@ func (b *BlockChain) isOldTimestamp(node *blockNode) bool { } // maybeUpdateIsCurrent potentially updates whether or not the chain believes it -// is current. +// is current using the provided best chain tip. // // It makes use of a latching approach such that once the chain becomes current // it will only switch back to false in the case no new blocks have been seen @@ -1474,7 +1487,20 @@ func (b *BlockChain) maybeUpdateIsCurrent(curBest *blockNode) { log.Debugf("Chain latched to current at block %s (height %d)", curBest.hash, curBest.height) } +} +// MaybeUpdateIsCurrent potentially updates whether or not the chain believes it +// is current. +// +// It makes use of a latching approach such that once the chain becomes current +// it will only switch back to false in the case no new blocks have been seen +// for an extended period of time. +// +// This function is safe for concurrent access. 
+func (b *BlockChain) MaybeUpdateIsCurrent() { + b.chainLock.Lock() + b.maybeUpdateIsCurrent(b.bestChain.Tip()) + b.chainLock.Unlock() } // isCurrent returns whether or not the chain believes it is current based on @@ -2189,9 +2215,8 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { b.dbInfo.bidxVer) tip := b.bestChain.Tip() - log.Infof("Chain state: height %d, hash %v, total transactions %d, "+ - "work %v, stake version %v", tip.height, tip.hash, - b.stateSnapshot.TotalTxns, tip.workSum, 0) + log.Infof("Chain state: height %d, hash %v, total transactions %d, work %v", + tip.height, tip.hash, b.stateSnapshot.TotalTxns, tip.workSum) return &b, nil } diff --git a/blockchain/chainquery.go b/blockchain/chainquery.go index dd3ffe33e6..1ce5d03eab 100644 --- a/blockchain/chainquery.go +++ b/blockchain/chainquery.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2020 The Decred developers +// Copyright (c) 2018-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -154,3 +154,82 @@ func (b *BlockChain) BestInvalidHeader() chainhash.Hash { b.index.RUnlock() return hash } + +// NextNeededBlocks returns hashes for the next blocks after the current best +// chain tip that are needed to make progress towards the current best known +// header skipping any blocks that are already known or in the provided map of +// blocks to exclude. Typically the caller would want to exclude all blocks +// that have outstanding requests. +// +// The maximum number of results is limited to the provided value or the maximum +// allowed by the internal lookahead buffer in the case the requested number of +// max results exceeds that value. +// +// This function is safe for concurrent access. +func (b *BlockChain) NextNeededBlocks(maxResults uint8, exclude map[chainhash.Hash]struct{}) []*chainhash.Hash { + // Nothing to do when no results are requested. 
+ if maxResults == 0 { + return nil + } + + // Determine the common ancestor between the current best chain tip and the + // current best known header. In practice this should never be nil because + // orphan headers are not allowed into the block index, but be paranoid and + // check anyway in case things change in the future. + b.index.RLock() + bestHeader := b.index.bestHeader + fork := b.bestChain.FindFork(bestHeader) + if fork == nil { + b.index.RUnlock() + return nil + } + + // Determine the final block to consider for determining the next needed + // blocks by determining the descendants of the current best chain tip on + // the branch that leads to the best known header while clamping the number + // of descendants to consider to a lookahead buffer. + const lookaheadBuffer = 512 + numBlocksToConsider := bestHeader.height - fork.height + if numBlocksToConsider == 0 { + b.index.RUnlock() + return nil + } + if numBlocksToConsider > lookaheadBuffer { + bestHeader = bestHeader.Ancestor(fork.height + lookaheadBuffer) + numBlocksToConsider = lookaheadBuffer + } + + // Walk backwards from the final block to consider to the current best chain + // tip excluding any blocks that already have their data available or that + // the caller asked to be excluded (likely because they've already been + // requested). + neededBlocks := make([]*chainhash.Hash, 0, numBlocksToConsider) + for node := bestHeader; node != nil && node != fork; node = node.parent { + _, isExcluded := exclude[node.hash] + if isExcluded || node.status.HaveData() { + continue + } + + neededBlocks = append(neededBlocks, &node.hash) + } + b.index.RUnlock() + + // Reverse the needed blocks so they are in forwards order. + reverse := func(s []*chainhash.Hash) { + slen := len(s) + for i := 0; i < slen/2; i++ { + s[i], s[slen-1-i] = s[slen-1-i], s[i] + } + } + reverse(neededBlocks) + + // Clamp the number of results to the lower of the requested max or number + // available. 
+ if int64(maxResults) > numBlocksToConsider { + maxResults = uint8(numBlocksToConsider) + } + if uint16(len(neededBlocks)) > uint16(maxResults) { + neededBlocks = neededBlocks[:maxResults] + } + return neededBlocks +} diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 982fbd067f..cda1fbf286 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -6,10 +6,10 @@ package netsync import ( - "container/list" "context" "errors" "fmt" + "math" "runtime/debug" "sync" "time" @@ -30,10 +30,13 @@ import ( const ( // minInFlightBlocks is the minimum number of blocks that should be - // in the request queue for headers-first mode before requesting - // more. + // in the request queue before requesting more. minInFlightBlocks = 10 + // maxInFlightBlocks is the maximum number of blocks to allow in the sync + // peer request queue. + maxInFlightBlocks = 16 + // maxRejectedTxns is the maximum number of rejected transactions // hashes to store in memory. maxRejectedTxns = 1000 @@ -45,6 +48,21 @@ const ( // maxRequestedTxns is the maximum number of requested transactions // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg + + // maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in + // a single message that is expected when determining when the message + // appears to be announcing new blocks. + maxExpectedHeaderAnnouncementsPerMsg = 12 + + // maxConsecutiveOrphanHeaders is the maximum number of consecutive header + // messages that contain headers which do not connect a peer can send before + // it is deemed to have diverged so far it is no longer useful. + maxConsecutiveOrphanHeaders = 10 + + // headerSyncStallTimeoutSecs is the number of seconds to wait for progress + // during the header sync process before stalling the sync and disconnecting + // the peer. + headerSyncStallTimeoutSecs = (3 + wire.MaxBlockHeadersPerMsg/1000) * 2 ) // zeroHash is the zero value hash (all zeros). 
It is defined as a convenience. @@ -158,13 +176,6 @@ type processTransactionMsg struct { reply chan processTransactionResponse } -// headerNode is used as a node in a list of headers that are linked together -// between checkpoints. -type headerNode struct { - height int64 - hash *chainhash.Hash -} - // syncMgrPeer extends a peer to maintain additional state maintained by the // sync manager. type syncMgrPeer struct { @@ -173,6 +184,71 @@ type syncMgrPeer struct { syncCandidate bool requestedTxns map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{} + + // numConsecutiveOrphanHeaders tracks the number of consecutive header + // messages sent by the peer that contain headers which do not connect. It + // is used to detect peers that have either diverged so far they are no + // longer useful or are otherwise being malicious. + numConsecutiveOrphanHeaders int32 +} + +// headerSyncState houses the state used to track the header sync progress and +// related stall handling. +type headerSyncState struct { + // headersSynced tracks whether or not the headers are synced to a point + // that is recent enough to start downloading blocks. + headersSynced bool + + // These fields are used to implement a progress stall timeout that can be + // reset at any time without needing to create a new one and the associated + // extra garbage. + // + // stallTimer is an underlying timer that is used to implement the timeout. + // + // stallChanDrained indicates whether or not the channel for the stall timer + // has already been read and is used when resetting the timer to ensure the + // channel is drained when the timer is stopped as described in the timer + // documentation. + stallTimer *time.Timer + stallChanDrained bool +} + +// makeHeaderSyncState returns a header sync state that is ready to use. 
+func makeHeaderSyncState() headerSyncState { + stallTimer := time.NewTimer(math.MaxInt64) + stallTimer.Stop() + return headerSyncState{ + stallTimer: stallTimer, + stallChanDrained: true, + } +} + +// stopStallTimeout stops the progress stall timer while ensuring to read from +// the timer's channel in the case the timer already expired which can happen +// due to the fact the stop happens in between channel reads. This behavior is +// well documented in the Timer docs. +// +// NOTE: This function must not be called concurrent with any other receives on +// the timer's channel. +func (state *headerSyncState) stopStallTimeout() { + t := state.stallTimer + if !t.Stop() && !state.stallChanDrained { + <-t.C + } + state.stallChanDrained = true +} + +// resetStallTimeout resets the progress stall timer while ensuring to read from +// the timer's channel in the case the timer already expired which can happen +// due to the fact the reset happens in between channel reads. This behavior is +// well documented in the Timer docs. +// +// NOTE: This function must not be called concurrent with any other receives on +// the timer's channel. +func (state *headerSyncState) resetStallTimeout() { + state.stopStallTimeout() + state.stallTimer.Reset(headerSyncStallTimeoutSecs * time.Second) + state.stallChanDrained = false } // SyncManager provides a concurrency safe sync manager for handling all @@ -195,11 +271,9 @@ type SyncManager struct { msgChan chan interface{} peers map[*peerpkg.Peer]*syncMgrPeer - // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint + // hdrSyncState houses the state used to track the initial header sync + // process and related stall handling. + hdrSyncState headerSyncState // The following fields are used to track the height being synced to from // peers. 
@@ -225,55 +299,12 @@ func lookupPeer(peer *peerpkg.Peer, peers map[*peerpkg.Peer]*syncMgrPeer) *syncM return sp } -// resetHeaderState sets the headers-first mode state to values appropriate for -// syncing from a new peer. -func (m *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int64) { - m.headersFirstMode = false - m.headerList.Init() - m.startHeader = nil - - // When there is a next checkpoint, add an entry for the latest known - // block into the header pool. This allows the next downloaded header - // to prove it links to the chain properly. - if m.nextCheckpoint != nil { - node := headerNode{height: newestHeight, hash: newestHash} - m.headerList.PushBack(&node) - } -} - // SyncHeight returns latest known block being synced to. func (m *SyncManager) SyncHeight() int64 { m.syncHeightMtx.Lock() - defer m.syncHeightMtx.Unlock() - return m.syncHeight -} - -// findNextHeaderCheckpoint returns the next checkpoint after the passed height. -// It returns nil when there is not one either because the height is already -// later than the final checkpoint or some other reason such as disabled -// checkpoints. -func (m *SyncManager) findNextHeaderCheckpoint(height int64) *chaincfg.Checkpoint { - checkpoints := m.cfg.Chain.Checkpoints() - if len(checkpoints) == 0 { - return nil - } - - // There is no next checkpoint if the height is already after the final - // checkpoint. - finalCheckpoint := &checkpoints[len(checkpoints)-1] - if height >= finalCheckpoint.Height { - return nil - } - - // Find the next checkpoint. 
- nextCheckpoint := finalCheckpoint - for i := len(checkpoints) - 2; i >= 0; i-- { - if height >= checkpoints[i].Height { - break - } - nextCheckpoint = &checkpoints[i] - } - return nextCheckpoint + syncHeight := m.syncHeight + m.syncHeightMtx.Unlock() + return syncHeight } // chainBlockLocatorToHashes converts a block locator from chain to a slice @@ -290,17 +321,61 @@ func chainBlockLocatorToHashes(locator blockchain.BlockLocator) []chainhash.Hash return result } +// fetchNextBlocks creates and sends a request to the provided peer for the next +// blocks to be downloaded based on the current headers. +func (m *SyncManager) fetchNextBlocks(peer *syncMgrPeer) { + // Nothing to do if the target maximum number of blocks to request from the + // peer at the same time are already in flight. + numInFlight := len(peer.requestedBlocks) + if numInFlight >= maxInFlightBlocks { + return + } + + // Determine the next blocks to download based on the final block that has + // already been requested and the next blocks in the branch leading up to + // best known header. + chain := m.cfg.Chain + maxNeeded := uint8(maxInFlightBlocks - numInFlight) + neededBlocks := chain.NextNeededBlocks(maxNeeded, m.requestedBlocks) + if len(neededBlocks) == 0 { + return + } + + // Ensure the number of needed blocks does not exceed the max inventory + // per message. This should never happen because the constants above limit + // it to relatively small values. However, the code below relies on this + // assumption, so assert it. + if len(neededBlocks) > wire.MaxInvPerMsg { + log.Warnf("%d needed blocks exceeds max allowed per message", + len(neededBlocks)) + return + } + + // Build and send a getdata request for needed blocks. 
+ gdmsg := wire.NewMsgGetDataSizeHint(uint(len(neededBlocks))) + for i := range neededBlocks { + hash := neededBlocks[i] + iv := wire.NewInvVect(wire.InvTypeBlock, hash) + m.requestedBlocks[*hash] = struct{}{} + peer.requestedBlocks[*hash] = struct{}{} + gdmsg.AddInvVect(iv) + } + peer.QueueMessage(gdmsg, nil) + +} + // startSync will choose the best peer among the available candidate peers to // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. func (m *SyncManager) startSync() { - // Return now if we're already syncing. + // Nothing more to do when already syncing. if m.syncPeer != nil { return } - best := m.cfg.Chain.BestSnapshot() + chain := m.cfg.Chain + best := chain.BestSnapshot() var bestPeer *syncMgrPeer for _, peer := range m.peers { if !peer.syncCandidate { @@ -318,7 +393,7 @@ func (m *SyncManager) startSync() { continue } - // the best sync candidate is the most updated peer + // The best sync candidate is the most updated peer. if bestPeer == nil { bestPeer = peer } @@ -330,77 +405,78 @@ func (m *SyncManager) startSync() { // Update the state of whether or not the manager believes the chain is // fully synced to whatever the chain believes when there is no candidate // for a sync peer. + // + // Also, return now when there isn't a sync peer candidate as there is + // nothing more to do without one. if bestPeer == nil { m.isCurrentMtx.Lock() - m.isCurrent = m.cfg.Chain.IsCurrent() + m.isCurrent = chain.IsCurrent() m.isCurrentMtx.Unlock() + log.Warnf("No sync peer candidates available") + return } - // Start syncing from the best peer if one was selected. - if bestPeer != nil { - // Clear the requestedBlocks if the sync peer changes, otherwise - // we may ignore blocks we need that the last sync peer failed - // to send. - m.requestedBlocks = make(map[chainhash.Hash]struct{}) + // Start syncing from the best peer. 
- blkLocator := m.cfg.Chain.LatestBlockLocator() - locator := chainBlockLocatorToHashes(blkLocator) + // Clear the requestedBlocks if the sync peer changes, otherwise + // we may ignore blocks we need that the last sync peer failed + // to send. + m.requestedBlocks = make(map[chainhash.Hash]struct{}) - log.Infof("Syncing to block height %d from peer %v", - bestPeer.LastBlock(), bestPeer.Addr()) + syncHeight := bestPeer.LastBlock() - // The chain is not synced whenever the current best height is less than - // the height to sync to. - if best.Height < bestPeer.LastBlock() { - m.isCurrentMtx.Lock() - m.isCurrent = false - m.isCurrentMtx.Unlock() - } + headersSynced := m.hdrSyncState.headersSynced + if !headersSynced { + log.Infof("Syncing headers to block height %d from peer %v", syncHeight, + bestPeer) + } - // When the current height is less than a known checkpoint we - // can use block headers to learn about which blocks comprise - // the chain up to the checkpoint and perform less validation - // for them. This is possible since each header contains the - // hash of the previous header and a merkle root. Therefore if - // we validate all of the received headers link together - // properly and the checkpoint hashes match, we can be sure the - // hashes for the blocks in between are accurate. Further, once - // the full blocks are downloaded, the merkle root is computed - // and compared against the value in the header which proves the - // full block hasn't been tampered with. - // - // Once we have passed the final checkpoint, or checkpoints are - // disabled, use standard inv messages learn about the blocks - // and fully validate them. Finally, regression test mode does - // not support the headers-first approach so do normal block - // downloads when in regression test mode. 
- if m.nextCheckpoint != nil && - best.Height < m.nextCheckpoint.Height && - !m.cfg.DisableCheckpoints { - - err := bestPeer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) - if err != nil { - log.Errorf("Failed to push getheadermsg for the latest "+ - "blocks: %v", err) - return - } - m.headersFirstMode = true - log.Infof("Downloading headers for blocks %d to %d from peer %s", - best.Height+1, m.nextCheckpoint.Height, bestPeer.Addr()) - } else { - err := bestPeer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Errorf("Failed to push getblocksmsg for the latest "+ - "blocks: %v", err) - return - } - } - m.syncPeer = bestPeer - m.syncHeightMtx.Lock() - m.syncHeight = bestPeer.LastBlock() - m.syncHeightMtx.Unlock() - } else { - log.Warnf("No sync peer candidates available") + // The chain is not synced whenever the current best height is less than the + // height to sync to. + if best.Height < syncHeight { + m.isCurrentMtx.Lock() + m.isCurrent = false + m.isCurrentMtx.Unlock() + } + + // Request headers to discover any blocks that are not already known + // starting from the parent of the best known header for the local chain. + // The parent is used as a means to accurately discover the best known block + // of the remote peer in the case both tips are the same where it would + // otherwise result in an empty response. + bestHeaderHash, _ := chain.BestHeader() + parentHash := bestHeaderHash + header, err := chain.HeaderByHash(&bestHeaderHash) + if err == nil { + parentHash = header.PrevBlock + } + blkLocator := chain.BlockLocatorFromHash(&parentHash) + locator := chainBlockLocatorToHashes(blkLocator) + bestPeer.PushGetHeadersMsg(locator, &zeroHash) + + // Track the sync peer and update the sync height when it is higher than the + // currently best known value. 
+ m.syncPeer = bestPeer + m.syncHeightMtx.Lock() + if syncHeight > m.syncHeight { + m.syncHeight = syncHeight + } + m.syncHeightMtx.Unlock() + + // Start the header sync progress stall timeout when the initial headers + // sync is not already done. + if !headersSynced { + m.hdrSyncState.resetStallTimeout() + } + + // Download any blocks needed to catch the local chain up to the best + // known header (if any) when the initial headers sync is already done. + // + // This is done in addition to the header request above to avoid waiting + // for the round trip when there are still blocks that are needed + // regardless of the headers response. + if headersSynced { + m.fetchNextBlocks(m.syncPeer) } } @@ -507,28 +583,23 @@ func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) { delete(m.peers, p) // Remove requested transactions from the global map so that they will - // be fetched from elsewhere next time we get an inv. + // be fetched from elsewhere. for txHash := range peer.requestedTxns { delete(m.requestedTxns, txHash) } // Remove requested blocks from the global map so that they will be - // fetched from elsewhere next time we get an inv. + // fetched from elsewhere. // TODO(oga) we could possibly here check which peers have these blocks // and request them now to speed things up a little. for blockHash := range peer.requestedBlocks { delete(m.requestedBlocks, blockHash) } - // Attempt to find a new peer to sync from if the quitting peer is the - // sync peer. Also, reset the headers-first state if in headers-first - // mode so + // Attempt to find a new peer to sync from and reset the final requested + // block when the quitting peer is the sync peer. 
if m.syncPeer == peer { m.syncPeer = nil - if m.headersFirstMode { - best := m.cfg.Chain.BestSnapshot() - m.resetHeaderState(&best.Hash, best.Height) - } m.startSync() } } @@ -715,53 +786,33 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { blockHash := bmsg.block.Hash() if _, exists := peer.requestedBlocks[*blockHash]; !exists { log.Warnf("Got unrequested block %v from %s -- disconnecting", - blockHash, peer.Addr()) + blockHash, peer) peer.Disconnect() return } - // When in headers-first mode, if the block matches the hash of the - // first header in the list of headers that are being fetched, it's - // eligible for less validation since the headers have already been - // verified to link together and are valid up to the next checkpoint. - // Also, remove the list entry for all blocks except the checkpoint - // since it is needed to verify the next round of headers links - // properly. - isCheckpointBlock := false - behaviorFlags := blockchain.BFNone - if m.headersFirstMode { - firstNodeEl := m.headerList.Front() - if firstNodeEl != nil { - firstNode := firstNodeEl.Value.(*headerNode) - if blockHash.IsEqual(firstNode.hash) { - behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(m.nextCheckpoint.Hash) { - isCheckpointBlock = true - } else { - m.headerList.Remove(firstNodeEl) - } - } - } - } - - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. + // Process the block to include validation, best chain selection, etc. + // + // Also, remove the block from the request maps once it has been processed. + // This ensures chain is aware of the block before it is removed from the + // maps in order to help prevent duplicate requests. 
+ forkLen, err := m.processBlock(bmsg.block, blockchain.BFNone) delete(peer.requestedBlocks, *blockHash) delete(m.requestedBlocks, *blockHash) - - // Process the block to include validation, best chain selection, etc. - forkLen, err := m.processBlock(bmsg.block, behaviorFlags) if err != nil { - // Ignore orphan blocks. - if errors.Is(err, blockchain.ErrMissingParent) { + // Ideally there should never be any requests for duplicate blocks, but + // ignore any that manage to make it through. + if errors.Is(err, blockchain.ErrDuplicateBlock) { return } // When the error is a rule error, it means the block was simply - // rejected as opposed to something actually going wrong, so log - // it as such. Otherwise, something really did go wrong, so log - // it as an actual error. + // rejected as opposed to something actually going wrong, so log it as + // such. Otherwise, something really did go wrong, so log it as an + // actual error. + // + // Note that orphan blocks are never requested so there is no need to + // test for that rule error separately. var rErr blockchain.RuleError if errors.As(err, &rErr) { log.Infof("Rejected block %v from %s: %v", blockHash, peer, err) @@ -772,8 +823,7 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { log.Errorf("Critical failure: %v", err) } - // Convert the error into an appropriate reject message and - // send it. + // Convert the error into an appropriate reject message and send it. code, reason := errToWireRejectCode(err) peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) return @@ -819,197 +869,230 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // Nothing more to do if we aren't in headers-first mode. - if !m.headersFirstMode { - return - } - - // This is headers-first mode, so if the block is not a checkpoint - // request more blocks using the header list when the request queue is - // getting short. 
- if !isCheckpointBlock { - if m.startHeader != nil && - len(peer.requestedBlocks) < minInFlightBlocks { - m.fetchHeaderBlocks() - } - return - } - - // This is headers-first mode and the block is a checkpoint. When - // there is a next checkpoint, get the next round of headers by asking - // for headers starting from the block after this one up to the next - // checkpoint. - prevHeight := m.nextCheckpoint.Height - prevHash := m.nextCheckpoint.Hash - m.nextCheckpoint = m.findNextHeaderCheckpoint(prevHeight) - if m.nextCheckpoint != nil { - locator := []chainhash.Hash{*prevHash} - err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) - if err != nil { - log.Warnf("Failed to send getheaders message to peer %s: %v", - peer.Addr(), err) - return - } - log.Infof("Downloading headers for blocks %d to %d from peer %s", - prevHeight+1, m.nextCheckpoint.Height, m.syncPeer.Addr()) - return + // Request more blocks using the headers when the request queue is getting + // short. + if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks { + m.fetchNextBlocks(peer) } +} - // This is headers-first mode, the block is a checkpoint, and there are - // no more checkpoints, so switch to normal mode by requesting blocks - // from the block after this one up to the end of the chain (zero hash). - m.headersFirstMode = false - m.headerList.Init() - log.Infof("Reached the final checkpoint -- switching to normal mode") - locator := []chainhash.Hash{*blockHash} - err = peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Warnf("Failed to send getblocks message to peer %s: %v", - peer.Addr(), err) +// handleHeadersMsg handles headers messages from all peers. +func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { + peer := lookupPeer(hmsg.peer, m.peers) + if peer == nil { return } -} -// fetchHeaderBlocks creates and sends a request to the syncPeer for the next -// list of blocks to be downloaded based on the current list of headers. 
-func (m *SyncManager) fetchHeaderBlocks() { - // Nothing to do if there is no start header. - if m.startHeader == nil { - log.Warnf("fetchHeaderBlocks called with no start header") + // Nothing to do for an empty headers message as it means the sending peer + // does not have any additional headers for the requested block locator. + headers := hmsg.headers.Headers + numHeaders := len(headers) + if numHeaders == 0 { return } - // Build up a getdata request for the list of blocks the headers - // describe. The size hint will be limited to wire.MaxInvPerMsg by - // the function, so no need to double check it here. - gdmsg := wire.NewMsgGetDataSizeHint(uint(m.headerList.Len())) - numRequested := 0 - for e := m.startHeader; e != nil; e = e.Next() { - node, ok := e.Value.(*headerNode) - if !ok { - log.Warn("Header list node type is not a headerNode") - continue + // Handle the case where the first header does not connect to any known + // headers specially. + chain := m.cfg.Chain + firstHeader := headers[0] + firstHeaderHash := firstHeader.BlockHash() + firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock) + headersSynced := m.hdrSyncState.headersSynced + if !firstHeaderConnects { + // Ignore headers that do not connect to any known headers when the + // initial headers sync is taking place. It is expected that headers + // will be announced that are not yet known. + if !headersSynced { + return } - if m.needBlock(node.hash) { - iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - m.requestedBlocks[*node.hash] = struct{}{} - m.syncPeer.requestedBlocks[*node.hash] = struct{}{} - err := gdmsg.AddInvVect(iv) - if err != nil { - log.Warnf("Failed to add invvect while fetching block "+ - "headers: %v", err) + // Attempt to detect block announcements which do not connect to any + // known headers and request any headers starting from the best header + // the local chain knows in order to (hopefully) discover the missing + // headers. 
+ // + // Meanwhile, also keep track of how many times the peer has + // consecutively sent a headers message that does not connect and + // disconnect it once the max allowed threshold has been reached. + if numHeaders < maxExpectedHeaderAnnouncementsPerMsg { + peer.numConsecutiveOrphanHeaders++ + if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders { + log.Debugf("Received %d consecutive headers messages that do "+ + "not connect from peer %s -- disconnecting", + peer.numConsecutiveOrphanHeaders, peer) + peer.Disconnect() } - numRequested++ - } - m.startHeader = e.Next() - if numRequested >= wire.MaxInvPerMsg { - break - } - } - if len(gdmsg.InvList) > 0 { - m.syncPeer.QueueMessage(gdmsg, nil) - } -} -// handleHeadersMsg handles headers messages from all peers. -func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { - peer := lookupPeer(hmsg.peer, m.peers) - if peer == nil { - return - } + log.Debugf("Requesting missing parents for header %s (height %d) "+ + "received from peer %s", firstHeaderHash, firstHeader.Height, + peer) + bestHeaderHash, _ := chain.BestHeader() + blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash) + locator := chainBlockLocatorToHashes(blkLocator) + peer.PushGetHeadersMsg(locator, &zeroHash) - // The remote peer is misbehaving if we didn't request headers. - msg := hmsg.headers - numHeaders := len(msg.Headers) - if !m.headersFirstMode { - log.Warnf("Got %d unrequested headers from %s -- disconnecting", - numHeaders, peer.Addr()) + return + } + + // The initial headers sync process is done and this does not appear to + // be a block announcement, so disconnect the peer. + log.Debugf("Received orphan header from peer %s -- disconnecting", peer) peer.Disconnect() return } - // Nothing to do for an empty headers message. - if numHeaders == 0 { - return + // Ensure all of the received headers connect the previous one before + // attempting to perform any further processing on any of them. 
+ headerHashes := make([]chainhash.Hash, 0, len(headers)) + headerHashes = append(headerHashes, firstHeaderHash) + for prevIdx, header := range headers[1:] { + prevHash := &headerHashes[prevIdx] + prevHeight := headers[prevIdx].Height + if header.PrevBlock != *prevHash || header.Height != prevHeight+1 { + log.Debugf("Received block header that does not properly connect "+ + "to previous one from peer %s -- disconnecting", peer) + peer.Disconnect() + return + } + headerHashes = append(headerHashes, header.BlockHash()) } - // Process all of the received headers ensuring each one connects to the - // previous and that checkpoints match. - receivedCheckpoint := false - var finalHash *chainhash.Hash - for _, blockHeader := range msg.Headers { - blockHash := blockHeader.BlockHash() - finalHash = &blockHash + // Save the current best known header height prior to processing the headers + // so the code later is able to determine if any new useful headers were + // provided. + _, prevBestHeaderHeight := chain.BestHeader() - // Ensure there is a previous header to compare against. - prevNodeEl := m.headerList.Back() - if prevNodeEl == nil { - log.Warnf("Header list does not contain a previous element as " + - "expected -- disconnecting peer") + // Process all of the received headers. + for _, header := range headers { + err := chain.ProcessBlockHeader(header, blockchain.BFNone) + if err != nil { + // Note that there is no need to check for an orphan header here + // because they were already verified to connect above. + + log.Debugf("Failed to process block header %s from peer %s: %v -- "+ + "disconnecting", header.BlockHash(), peer, err) peer.Disconnect() return } + } - // Ensure the header properly connects to the previous one and - // add it to the list of headers. 
- node := headerNode{hash: &blockHash} - prevNode := prevNodeEl.Value.(*headerNode) - if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { - node.height = prevNode.height + 1 - e := m.headerList.PushBack(&node) - if m.startHeader == nil { - m.startHeader = e - } - } else { - log.Warnf("Received block header that does not properly connect "+ - "to the chain from peer %s -- disconnecting", peer.Addr()) - peer.Disconnect() - return + // All of the headers were either accepted or already known valid at this + // point. + + // Reset the header sync progress stall timeout when the headers are not + // already synced and progress was made. + newBestHeaderHash, newBestHeaderHeight := chain.BestHeader() + if peer == m.syncPeer && !headersSynced { + if newBestHeaderHeight > prevBestHeaderHeight { + m.hdrSyncState.resetStallTimeout() } + } + + // Reset the count of consecutive headers messages that contained headers + // which do not connect. Note that this is intentionally only done when all + // of the provided headers are successfully processed above. + peer.numConsecutiveOrphanHeaders = 0 + + // Update the last announced block to the final one in the announced headers + // above and update the height for the peer too. + finalHeader := headers[len(headers)-1] + finalReceivedHash := &headerHashes[len(headerHashes)-1] + peer.UpdateLastAnnouncedBlock(finalReceivedHash) + peer.UpdateLastBlockHeight(int64(finalHeader.Height)) + + // Update the sync height if the new best known header height exceeds it. + syncHeight := m.SyncHeight() + if newBestHeaderHeight > syncHeight { + syncHeight = newBestHeaderHeight + m.syncHeightMtx.Lock() + m.syncHeight = syncHeight + m.syncHeightMtx.Unlock() + } - // Verify the header at the next checkpoint height matches. 
- if node.height == m.nextCheckpoint.Height { - if node.hash.IsEqual(m.nextCheckpoint.Hash) { - receivedCheckpoint = true - log.Infof("Verified downloaded block header against "+ - "checkpoint at height %d/hash %s", node.height, node.hash) - } else { - log.Warnf("Block header at height %d/hash %s from peer %s "+ - "does NOT match expected checkpoint hash of %s -- "+ - "disconnecting", node.height, node.hash, peer.Addr(), - m.nextCheckpoint.Hash) + // Disconnect outbound peers that have less cumulative work than the minimum + // value already known to have been achieved on the network a priori while + // the initial sync is still underway. This is determined by noting that a + // peer only sends fewer than the maximum number of headers per message when + // it has reached its best known header. + isChainCurrent := chain.IsCurrent() + receivedMaxHeaders := len(headers) == wire.MaxBlockHeadersPerMsg + if !isChainCurrent && !peer.Inbound() && !receivedMaxHeaders { + minKnownWork := m.cfg.ChainParams.MinKnownChainWork + if minKnownWork != nil { + workSum, err := chain.ChainWork(finalReceivedHash) + if err == nil && workSum.Cmp(minKnownWork) < 0 { + log.Debugf("Best known chain for peer %s has too little "+ + "cumulative work -- disconnecting", peer) peer.Disconnect() return } - break } } - // When this header is a checkpoint, switch to fetching the blocks for - // all of the headers since the last checkpoint. - if receivedCheckpoint { - // Since the first entry of the list is always the final block - // that is already in the database and is only used to ensure - // the next header links properly, it must be removed before - // fetching the blocks. 
- m.headerList.Remove(m.headerList.Front()) - log.Infof("Received %v block headers: Fetching blocks", - m.headerList.Len()) - m.progressLogger.SetLastLogTime(time.Now()) - m.fetchHeaderBlocks() - return + // Request more headers when the peer announced the maximum number of + // headers that can be sent in a single message since it probably has more. + if receivedMaxHeaders { + blkLocator := chain.BlockLocatorFromHash(finalReceivedHash) + locator := chainBlockLocatorToHashes(blkLocator) + peer.PushGetHeadersMsg(locator, &zeroHash) + } + + // Consider the headers synced once the sync peer sends a message with a + // final header that is within a few blocks of the sync height. + if !headersSynced && peer == m.syncPeer { + const syncHeightFetchOffset = 6 + if int64(finalHeader.Height)+syncHeightFetchOffset > syncHeight { + headersSynced = true + m.hdrSyncState.headersSynced = headersSynced + m.hdrSyncState.stopStallTimeout() + + log.Infof("Initial headers sync complete (best header hash %s, "+ + "height %d)", newBestHeaderHash, newBestHeaderHeight) + log.Info("Syncing chain") + m.progressLogger.SetLastLogTime(time.Now()) + + // Potentially update whether the chain believes it is current now + // that the headers are synced. + chain.MaybeUpdateIsCurrent() + isChainCurrent = chain.IsCurrent() + } } - // This header is not a checkpoint, so request the next batch of - // headers starting from the latest known header and ending with the - // next checkpoint. - locator := []chainhash.Hash{*finalHash} - err := peer.PushGetHeadersMsg(locator, m.nextCheckpoint.Hash) - if err != nil { - log.Warnf("Failed to send getheaders message to peer %s: %v", - peer.Addr(), err) - return + // Immediately download blocks associated with the announced headers once + // the chain is current. This allows downloading from whichever peer + // announces it first and also ensures any side chain blocks are downloaded + // for vote consideration. 
+ // + // Ultimately, it would likely be better for this functionality to be moved + // to the code which determines the next blocks to request based on the + // available headers once that code supports downloading from multiple peers + // and associated infrastructure to efficiently determinine which peers have + // the associated block(s). + if isChainCurrent { + gdmsg := wire.NewMsgGetDataSizeHint(uint(len(headers))) + for i := range headerHashes { + // Skip the block when it has already been requested or is otherwise + // already known. + hash := &headerHashes[i] + _, isRequestedBlock := m.requestedBlocks[*hash] + if isRequestedBlock || chain.HaveBlock(hash) { + continue + } + + iv := wire.NewInvVect(wire.InvTypeBlock, hash) + limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks) + limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks) + gdmsg.AddInvVect(iv) + } + if len(gdmsg.InvList) > 0 { + peer.QueueMessage(gdmsg, nil) + } + } + + // Download any blocks needed to catch the local chain up to the best known + // header (if any) once the initial headers sync is done. + if headersSynced && m.syncPeer != nil { + m.fetchNextBlocks(m.syncPeer) } } @@ -1038,12 +1121,6 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { } } -// needBlock returns whether or not the block needs to be downloaded. For -// example, it does not need to be downloaded when it is already known. -func (m *SyncManager) needBlock(hash *chainhash.Hash) bool { - return !m.cfg.Chain.HaveBlock(hash) -} - // needTx returns whether or not the transaction needs to be downloaded. For // example, it does not need to be downloaded when it is already known. func (m *SyncManager) needTx(hash *chainhash.Hash) bool { @@ -1066,14 +1143,14 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool { return false } -// handleInvMsg handles inv messages from all peers. -// We examine the inventory advertised by the remote peer and act accordingly. 
+// handleInvMsg handles inv messages from all peers. This entails examining the +// inventory advertised by the remote peer for block and transaction +// announcements and acting accordingly. func (m *SyncManager) handleInvMsg(imsg *invMsg) { peer := lookupPeer(imsg.peer, m.peers) if peer == nil { return } - fromSyncPeer := peer == m.syncPeer isCurrent := m.IsCurrent() // Update state information regarding per-peer known inventory and determine @@ -1087,6 +1164,14 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { for _, iv := range imsg.inv.InvList { switch iv.Type { case wire.InvTypeBlock: + // NOTE: All block announcements are now made via headers and the + // decisions regarding which blocks to download are based on those + // headers. Therefore, there is no need to request anything here. + // + // Also, there realistically should not typically be any inv + // messages with a type of block for the same reason. However, it + // doesn't hurt to update the state accordingly just in case. + // Add the block to the cache of known inventory for the peer. This // helps avoid sending blocks to the peer that it is already known // to have. @@ -1095,22 +1180,6 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { // Update the last block in the announced inventory. lastBlock = iv - // Ignore block announcements for blocks that are from peers other - // than the sync peer before the chain is current or are otherwise - // not needed, such as when they're already available. - // - // This helps prevent fetching a mass of orphans. - if (!fromSyncPeer && !isCurrent) || !m.needBlock(&iv.Hash) { - continue - } - - // Request the block if there is not one already pending. - if _, exists := m.requestedBlocks[iv.Hash]; !exists { - limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks) - limitAdd(peer.requestedBlocks, iv.Hash, maxRequestedBlocks) - requestQueue = append(requestQueue, iv) - } - case wire.InvTypeTx: // Add the tx to the cache of known inventory for the peer. 
This // helps avoid sending transactions to the peer that it is already @@ -1149,15 +1218,6 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { peer.UpdateLastBlockHeight(int64(header.Height)) } } - - // Request blocks after the last one advertised up to the final one the - // remote peer knows about (zero stop hash). - blkLocator := m.cfg.Chain.BlockLocatorFromHash(&lastBlock.Hash) - locator := chainBlockLocatorToHashes(blkLocator) - err := peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Errorf("Failed to push getblocksmsg: %v", err) - } } // Request as much as possible at once. @@ -1292,6 +1352,18 @@ out: log.Warnf("Invalid message type in event handler: %T", msg) } + case <-m.hdrSyncState.stallTimer.C: + // Mark the timer's channel as having been drained so the timer can + // safely be reset. + m.hdrSyncState.stallChanDrained = true + + // Disconnect the sync peer due to stalling the header sync process. + if m.syncPeer != nil { + log.Debugf("Header sync progress stalled from peer %s -- "+ + "disconnecting", m.syncPeer) + m.syncPeer.Disconnect() + } + case <-ctx.Done(): break out } @@ -1386,13 +1458,15 @@ func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, voteHashes, tSpendHashes []*chainhash.Hash) error { reply := make(chan requestFromPeerResponse, 1) - select { - case m.msgChan <- requestFromPeerMsg{ + request := requestFromPeerMsg{ peer: p, blocks: blocks, voteHashes: voteHashes, tSpendHashes: tSpendHashes, - reply: reply}: + reply: reply, + } + select { + case m.msgChan <- request: case <-m.quit: } @@ -1616,10 +1690,6 @@ type Config struct { // It may return nil if there is no active RPC server. RpcServer func() *rpcserver.Server - // DisableCheckpoints indicates whether or not the sync manager should make - // use of checkpoints. 
- DisableCheckpoints bool - // NoMiningStateSync indicates whether or not the sync manager should // perform an initial mining state synchronization with peers once they are // believed to be fully synced. @@ -1643,33 +1713,17 @@ type Config struct { // New returns a new network chain synchronization manager. Use Run to begin // processing asynchronous events. func New(config *Config) *SyncManager { - m := SyncManager{ + return &SyncManager{ cfg: *config, rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), peers: make(map[*peerpkg.Peer]*syncMgrPeer), + hdrSyncState: makeHeaderSyncState(), progressLogger: progresslog.New("Processed", log), msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), quit: make(chan struct{}), + syncHeight: config.Chain.BestSnapshot().Height, isCurrent: config.Chain.IsCurrent(), } - - best := m.cfg.Chain.BestSnapshot() - if !m.cfg.DisableCheckpoints { - // Initialize the next checkpoint based on the current height. - m.nextCheckpoint = m.findNextHeaderCheckpoint(best.Height) - if m.nextCheckpoint != nil { - m.resetHeaderState(&best.Hash, best.Height) - } - } else { - log.Info("Checkpoints are disabled") - } - - m.syncHeightMtx.Lock() - m.syncHeight = best.Height - m.syncHeightMtx.Unlock() - - return &m } diff --git a/peer/log.go b/peer/log.go index 30feeb35a8..6c7233b790 100644 --- a/peer/log.go +++ b/peer/log.go @@ -1,5 +1,5 @@ // Copyright (c) 2015-2016 The btcsuite developers -// Copyright (c) 2016-2020 The Decred developers +// Copyright (c) 2016-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. 
@@ -188,7 +188,13 @@ func messageSummary(msg wire.Message) string { return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop) case *wire.MsgHeaders: - return fmt.Sprintf("num %d", len(msg.Headers)) + summary := fmt.Sprintf("num %d", len(msg.Headers)) + if len(msg.Headers) > 0 { + finalHeader := msg.Headers[len(msg.Headers)-1] + summary = fmt.Sprintf("%s, final hash %s, height %d", summary, + finalHeader.BlockHash(), finalHeader.Height) + } + return summary case *wire.MsgReject: // Ensure the variable length strings don't contain any diff --git a/server.go b/server.go index 6db838159e..6bafdad379 100644 --- a/server.go +++ b/server.go @@ -673,7 +673,7 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // Ignore peers that have a protocol version that is too old. The peer // negotiation logic will disconnect it after this callback returns. - if msg.ProtocolVersion < int32(wire.InitialProcotolVersion) { + if msg.ProtocolVersion < int32(wire.SendHeadersVersion) { return nil } @@ -737,6 +737,13 @@ func (sp *serverPeer) OnVersion(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej return nil } +// OnVerAck is invoked when a peer receives a verack wire message. It creates +// and sends a sendheaders message to request all block annoucements are made +// via full headers instead of the inv message. +func (sp *serverPeer) OnVerAck(_ *peer.Peer, msg *wire.MsgVerAck) { + sp.QueueMessage(wire.NewMsgSendHeaders(), nil) +} + // OnMemPool is invoked when a peer receives a mempool wire message. It creates // and sends an inventory message with the contents of the memory pool up to the // maximum inventory allowed per message. @@ -1067,12 +1074,6 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { // OnHeaders is invoked when a peer receives a headers wire message. The // message is passed down to the net sync manager. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { - // Ban peers sending empty headers requests. 
- if len(msg.Headers) == 0 { - sp.server.BanPeer(sp) - return - } - sp.server.syncManager.QueueHeaders(msg, sp.Peer) } @@ -2068,6 +2069,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { return &peer.Config{ Listeners: peer.MessageListeners{ OnVersion: sp.OnVersion, + OnVerAck: sp.OnVerAck, OnMemPool: sp.OnMemPool, OnGetMiningState: sp.OnGetMiningState, OnMiningState: sp.OnMiningState, @@ -3508,6 +3510,11 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP }, } s.txMemPool = mempool.New(&txC) + + // Create a new sync manager instance with the appropriate configuration. + if cfg.DisableCheckpoints { + srvrLog.Info("Checkpoints are disabled") + } s.syncManager = netsync.New(&netsync.Config{ PeerNotifier: &s, Chain: s.chain, @@ -3516,7 +3523,6 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP RpcServer: func() *rpcserver.Server { return s.rpcServer }, - DisableCheckpoints: cfg.DisableCheckpoints, NoMiningStateSync: cfg.NoMiningStateSync, MaxPeers: cfg.MaxPeers, MaxOrphanTxs: cfg.MaxOrphanTxs, From c9de511f8cf0a6d8844673f2b4be2c45616babb4 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:23 -0600 Subject: [PATCH 06/10] progresslog: Add support for header sync progress. This modifies the progress logger to expose a new method for logging the progress of header syncing and adds tests to ensure proper functionality. --- internal/progresslog/logger.go | 32 +++++++++++++ internal/progresslog/logger_test.go | 71 +++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/internal/progresslog/logger.go b/internal/progresslog/logger.go index 636be36c94..f37823410f 100644 --- a/internal/progresslog/logger.go +++ b/internal/progresslog/logger.go @@ -37,6 +37,9 @@ type Logger struct { receivedVotes uint64 receivedRevokes uint64 receivedTickets uint64 + + // These fields accumulate information about headers between log statements. 
+ receivedHeaders uint64 } // New returns a new block progress logger. @@ -95,6 +98,35 @@ func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool) { l.lastLogTime = now } +// LogHeaderProgress accumulates the provided number of processed headers and +// periodically (every 10 seconds) logs an information message to show the +// header sync progress to the user along with duration and totals included. +// +// The force flag may be used to force a log message to be shown regardless of +// the time the last one was shown. +// +// The progress message is templated as follows: +// {progressAction} {numProcessed} {headers|header} in the last {timePeriod} +// (progress ~{progress}%) +func (l *Logger) LogHeaderProgress(processedHeaders uint64, forceLog bool, progressFn func() float64) { + l.receivedHeaders += processedHeaders + + now := time.Now() + duration := now.Sub(l.lastLogTime) + if !forceLog && duration < time.Second*10 { + return + } + + // Log information about header progress. + l.subsystemLogger.Infof("%s %d %s in the last %0.2fs (progress %0.2f%%)", + l.progressAction, l.receivedHeaders, + pickNoun(l.receivedHeaders, "header", "headers"), + duration.Seconds(), progressFn()) + + l.receivedHeaders = 0 + l.lastLogTime = now +} + // SetLastLogTime updates the last time data was logged to the provided time. func (l *Logger) SetLastLogTime(time time.Time) { l.Lock() diff --git a/internal/progresslog/logger_test.go b/internal/progresslog/logger_test.go index b6ee50c65a..f7214ca7ac 100644 --- a/internal/progresslog/logger_test.go +++ b/internal/progresslog/logger_test.go @@ -154,3 +154,74 @@ func TestLogProgress(t *testing.T) { } } } + +// TestLogHeaderProgress ensures the logging functionality for headers works as +// expected via a test logger. 
+func TestLogHeaderProgress(t *testing.T) { + tests := []struct { + name string + reset bool + numHeaders uint64 + forceLog bool + lastLogTime time.Time + wantReceivedHeaders uint64 + }{{ + name: "round 1, batch 1, last log time < 10 secs ago, not forced", + numHeaders: 35500, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 35500, + }, { + name: "round 1, batch 2, last log time < 10 secs ago, not forced", + numHeaders: 40000, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 75500, + }, { + name: "round 1, batch 3, last log time < 10 secs ago, not forced", + numHeaders: 66500, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 142000, + }, { + name: "round 2, batch 1, last log time < 10 secs ago, not forced", + reset: true, + numHeaders: 40000, + forceLog: false, + lastLogTime: time.Now(), + wantReceivedHeaders: 40000, + }, { + name: "round 2, batch 2, last log time > 10 secs ago, not forced", + numHeaders: 66500, + forceLog: false, + lastLogTime: time.Now().Add(-11 * time.Second), + wantReceivedHeaders: 0, + }, { + name: "round 2, batch 1, last log time < 10 secs ago, forced", + reset: true, + numHeaders: 54330, + forceLog: true, + lastLogTime: time.Now(), + wantReceivedHeaders: 0, + }} + + progressFn := func() float64 { return 0.0 } + logger := New("Wrote", testLog) + for _, test := range tests { + if test.reset { + logger = New("Wrote", testLog) + } + logger.SetLastLogTime(test.lastLogTime) + logger.LogHeaderProgress(test.numHeaders, test.forceLog, progressFn) + + want := &Logger{ + subsystemLogger: logger.subsystemLogger, + progressAction: logger.progressAction, + lastLogTime: logger.lastLogTime, + receivedHeaders: test.wantReceivedHeaders, + } + if !reflect.DeepEqual(logger, want) { + t.Errorf("%s:\nwant: %+v\ngot: %+v\n", test.name, want, logger) + } + } +} From 320852c495b81fac4716156993120db89388cf1d Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:23 -0600 Subject: [PATCH 
07/10] netsync: Add header sync progress log. This modifies netsync to provide progress logging for the initial headers sync including a progress percentage. A new method is added that determines the progress of the headers sync based on an algorithm that calculates the expected number of headers there should be in the time interval since the current best known header and the current time given the target block time for the network. This is the preferred approach because, unlike the sync height reported by remote peers, it is difficult to game since it is based on the target proof of work, but it assumes consistent mining, which is not the case on all networks, so it is limited to the main and test networks where that applies. For the other networks, it falls back to the sync height reported by the remote peers. The best known header is now logged at initial load time in blockchain and the netsync code is updated to make use of the new headers sync progress logger to tally the number of processed headers and periodically log the progress.
--- blockchain/chain.go | 6 ++++ blockchain/checkpoints.go | 4 +-- internal/netsync/manager.go | 68 +++++++++++++++++++++++++++++++++++++ server.go | 1 + 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/blockchain/chain.go b/blockchain/chain.go index f047c66483..6a6ba3d784 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -2214,6 +2214,12 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { "%d, block index: %d", b.dbInfo.version, b.dbInfo.compVer, b.dbInfo.bidxVer) + b.index.RLock() + bestHdr := b.index.bestHeader + b.index.RUnlock() + log.Infof("Best known header: height %d, hash %v", bestHdr.height, + bestHdr.hash) + tip := b.bestChain.Tip() log.Infof("Chain state: height %d, hash %v, total transactions %d, work %v", tip.height, tip.hash, b.stateSnapshot.TotalTxns, tip.workSum) diff --git a/blockchain/checkpoints.go b/blockchain/checkpoints.go index e674a3f195..cbeecb32f1 100644 --- a/blockchain/checkpoints.go +++ b/blockchain/checkpoints.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2015-2020 The Decred developers +// Copyright (c) 2015-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. 
@@ -65,7 +65,7 @@ func (b *BlockChain) verifyCheckpoint(height int64, hash *chainhash.Hash) bool { return false } - log.Infof("Verified checkpoint at height %d/block %s", checkpoint.Height, + log.Debugf("Verified checkpoint at height %d/block %s", checkpoint.Height, checkpoint.Hash) return true } diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index cda1fbf286..164f0801d4 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -876,6 +876,65 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } +// guessHeaderSyncProgress returns a percentage that is a guess of the progress +// of the header sync progress for the given currently best known header based +// on an algorithm that considers the total number of expected headers based on +// the target time per block of the network. It should only be used for the +// main and test networks because it relies on relatively consistent mining +// which is not the case for other network such as the simulation test network. +// +// This function is safe for concurrent access. +func (m *SyncManager) guessHeaderSyncProgress(header *wire.BlockHeader) float64 { + // Calculate the expected total number of blocks to reach the current time + // by considering the number there already are plus the expected number of + // remaining ones there should be in the time interval since the provided + // best known header and the current time given the target block time. + // + // This approach is used as opposed to calculating the total expected since + // the genesis block since it gets more accurate as more headers are + // processed and thus provide more information. It is also more robust + // against networks with dynamic difficulty readjustment such as the test + // network. 
+ curTimestamp := m.cfg.TimeSource.AdjustedTime().Unix() + targetSecsPerBlock := int64(m.cfg.ChainParams.TargetTimePerBlock.Seconds()) + remaining := (curTimestamp - header.Timestamp.Unix()) / targetSecsPerBlock + expectedTotal := int64(header.Height) + remaining + + // Finally the progress guess is simply the ratio of the current number of + // known headers to the total expected number of headers. + return math.Min(float64(header.Height)/float64(expectedTotal), 1.0) * 100 +} + +// headerSyncProgress returns a percentage that is a guess of the progress of +// of the header sync process. +// +// This function is safe for concurrent access. +func (m *SyncManager) headerSyncProgress() float64 { + hash, _ := m.cfg.Chain.BestHeader() + header, err := m.cfg.Chain.HeaderByHash(&hash) + if err != nil { + return 0.0 + } + + // Use an algorithm that considers the total number of expected headers + // based on the target time per block of the network for the main and test + // networks. This is the preferred approach because, unlike the sync height + // reported by remote peers, it is difficult to game since it is based on + // the target proof of work, but it assumes consistent mining, which is not + // the case on all networks, so limit it to the two where that applies. + net := m.cfg.ChainParams.Net + if net == wire.MainNet || net == wire.TestNet3 { + return m.guessHeaderSyncProgress(&header) + } + + // Fall back to using the sync height reported by the remote peer otherwise. + syncHeight := m.SyncHeight() + if syncHeight == 0 { + return 0.0 + } + return math.Min(float64(header.Height)/float64(syncHeight), 1.0) * 100 +} + // handleHeadersMsg handles headers messages from all peers. 
func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { peer := lookupPeer(hmsg.peer, m.peers) @@ -1035,6 +1094,9 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { blkLocator := chain.BlockLocatorFromHash(finalReceivedHash) locator := chainBlockLocatorToHashes(blkLocator) peer.PushGetHeadersMsg(locator, &zeroHash) + + m.progressLogger.LogHeaderProgress(uint64(len(headers)), headersSynced, + m.headerSyncProgress) } // Consider the headers synced once the sync peer sends a message with a @@ -1046,6 +1108,8 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { m.hdrSyncState.headersSynced = headersSynced m.hdrSyncState.stopStallTimeout() + m.progressLogger.LogHeaderProgress(uint64(len(headers)), + headersSynced, m.headerSyncProgress) log.Infof("Initial headers sync complete (best header hash %s, "+ "height %d)", newBestHeaderHash, newBestHeaderHeight) log.Info("Syncing chain") @@ -1683,6 +1747,10 @@ type Config struct { // transactions. Chain *blockchain.BlockChain + // TimeSource defines the median time source which is used to retrieve the + // current time adjusted by the median time offset. + TimeSource blockchain.MedianTimeSource + // TxMemPool specifies the mempool to use for processing transactions. TxMemPool *mempool.TxPool diff --git a/server.go b/server.go index 6bafdad379..a23789fd00 100644 --- a/server.go +++ b/server.go @@ -3519,6 +3519,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP PeerNotifier: &s, Chain: s.chain, ChainParams: s.chainParams, + TimeSource: s.timeSource, TxMemPool: s.txMemPool, RpcServer: func() *rpcserver.Server { return s.rpcServer From ae4281bcfce6e06c7712890dfab46970375c3432 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 04:37:24 -0600 Subject: [PATCH 08/10] multi: Add chain verify progress percentage. This modifies blockchain, progresslog, and netsync to provide a progress percentage when logging the main chain verification process. 
A new method is added to blockchain that determines the verification progress of the current best chain based on how far along it towards the best known header and blockchain is also modified to log the verification progress in the initial chain state message when the blockchain is first loaded. In addition, the progess logger is modified to log the verification progress instead of the timestamp of the last block and the netsync code is updated to make use of the new verification progress func per above. Finally, the netsync code now determines when the chain is considered current and logs new blocks as they are connected along with their hash and additional details. This provides a cleaner distinction between the initial verification and catchup phase and steady state operation. --- blockchain/chain.go | 5 +-- blockchain/chainquery.go | 17 ++++++++++ blockdb.go | 4 ++- internal/netsync/log.go | 11 ++++++- internal/netsync/manager.go | 51 ++++++++++++++++++++++++++--- internal/progresslog/logger.go | 12 +++---- internal/progresslog/logger_test.go | 3 +- 7 files changed, 87 insertions(+), 16 deletions(-) diff --git a/blockchain/chain.go b/blockchain/chain.go index 6a6ba3d784..92f1f40a96 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -2221,8 +2221,9 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { bestHdr.hash) tip := b.bestChain.Tip() - log.Infof("Chain state: height %d, hash %v, total transactions %d, work %v", - tip.height, tip.hash, b.stateSnapshot.TotalTxns, tip.workSum) + log.Infof("Chain state: height %d, hash %v, total transactions %d, work "+ + "%v, progress %0.2f%%", tip.height, tip.hash, + b.stateSnapshot.TotalTxns, tip.workSum, b.VerifyProgress()) return &b, nil } diff --git a/blockchain/chainquery.go b/blockchain/chainquery.go index 1ce5d03eab..00008ea874 100644 --- a/blockchain/chainquery.go +++ b/blockchain/chainquery.go @@ -6,6 +6,7 @@ package blockchain import ( "bytes" + "math" "sort" 
"github.com/decred/dcrd/chaincfg/chainhash" @@ -233,3 +234,19 @@ func (b *BlockChain) NextNeededBlocks(maxResults uint8, exclude map[chainhash.Ha } return neededBlocks } + +// VerifyProgress returns a percentage that is a guess of the progress of the +// chain verification process. +// +// This function is safe for concurrent access. +func (b *BlockChain) VerifyProgress() float64 { + b.index.RLock() + bestHdr := b.index.bestHeader + b.index.RUnlock() + if bestHdr.height == 0 { + return 0.0 + } + + tip := b.bestChain.Tip() + return math.Min(float64(tip.height)/float64(bestHdr.height), 1.0) * 100 +} diff --git a/blockdb.go b/blockdb.go index 9cf6fa942a..977ace0374 100644 --- a/blockdb.go +++ b/blockdb.go @@ -240,7 +240,9 @@ func dumpBlockChain(params *chaincfg.Params, b *blockchain.BlockChain) error { msgBlock := bl.MsgBlock() forceLog := int64(msgBlock.Header.Height) >= tipHeight - progressLogger.LogProgress(msgBlock, forceLog) + progressLogger.LogProgress(msgBlock, forceLog, func() float64 { + return float64(msgBlock.Header.Height) / float64(tipHeight) * 100 + }) } srvrLog.Infof("Successfully dumped the blockchain (%v blocks) to %v.", diff --git a/internal/netsync/log.go b/internal/netsync/log.go index cbe84d3a1e..8db82c2a8d 100644 --- a/internal/netsync/log.go +++ b/internal/netsync/log.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 The Decred developers +// Copyright (c) 2020-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -20,3 +20,12 @@ var log = slog.Disabled func UseLogger(logger slog.Logger) { log = logger } + +// pickNoun returns the singular or plural form of a noun depending on the +// provided count. 
+func pickNoun(n uint64, singular, plural string) string { + if n == 1 { + return singular + } + return plural +} diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 164f0801d4..2d64b42e79 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -791,6 +791,11 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { return } + // Save whether or not the chain believes it is current prior to processing + // the block for use below in determining logging behavior. + chain := m.cfg.Chain + wasChainCurrent := chain.IsCurrent() + // Process the block to include validation, best chain selection, etc. // // Also, remove the block from the request maps once it has been processed. @@ -829,15 +834,46 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { return } - // Log information about the block and update the chain state. + // Log information about the block. Use the progress logger when the chain + // was not already current prior to processing the block to provide nicer + // periodic logging with a progress percentage. Otherwise, log the block + // individually along with some stats. 
msgBlock := bmsg.block.MsgBlock() - forceLog := int64(msgBlock.Header.Height) >= m.SyncHeight() - m.progressLogger.LogProgress(msgBlock, forceLog) + header := &msgBlock.Header + if !wasChainCurrent { + forceLog := int64(header.Height) >= m.SyncHeight() + m.progressLogger.LogProgress(msgBlock, forceLog, chain.VerifyProgress) + if chain.IsCurrent() { + best := chain.BestSnapshot() + log.Infof("Initial chain sync complete (hash %s, height %d)", + best.Hash, best.Height) + } + } else { + var interval string + prevBlockHeader, err := chain.HeaderByHash(&header.PrevBlock) + if err == nil { + diff := header.Timestamp.Sub(prevBlockHeader.Timestamp) + interval = ", interval " + diff.Round(time.Second).String() + } + numTxns := uint64(len(msgBlock.Transactions)) + numTickets := uint64(header.FreshStake) + numVotes := uint64(header.Voters) + numRevokes := uint64(header.Revocations) + log.Infof("New block %s (%d %s, %d %s, %d %s, %d %s, height %d%s)", + blockHash, numTxns, pickNoun(numTxns, "transaction", "transactions"), + numTickets, pickNoun(numTickets, "ticket", "tickets"), + numVotes, pickNoun(numVotes, "vote", "votes"), + numRevokes, pickNoun(numRevokes, "revocation", "revocations"), + header.Height, interval) + } + + // Perform some additional processing when the block extended the main + // chain. onMainChain := forkLen == 0 if onMainChain { // Prune invalidated transactions. - best := m.cfg.Chain.BestSnapshot() + best := chain.BestSnapshot() m.cfg.TxMemPool.PruneStakeTx(best.NextStakeDiff, best.Height) m.cfg.TxMemPool.PruneExpiredTx() @@ -852,7 +888,7 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { // the main chain, update the heights of other peers whose invs may have // been ignored when actively syncing while the chain was not yet current or // lost the lock announcement race. 
- blockHeight := int64(bmsg.block.MsgBlock().Header.Height) + blockHeight := int64(header.Height) peer.UpdateLastBlockHeight(blockHeight) if onMainChain && m.IsCurrent() { for _, p := range m.peers { @@ -1119,6 +1155,11 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // that the headers are synced. chain.MaybeUpdateIsCurrent() isChainCurrent = chain.IsCurrent() + if isChainCurrent { + best := chain.BestSnapshot() + log.Infof("Initial chain sync complete (hash %s, height %d)", + best.Hash, best.Height) + } } } diff --git a/internal/progresslog/logger.go b/internal/progresslog/logger.go index f37823410f..e3017824ce 100644 --- a/internal/progresslog/logger.go +++ b/internal/progresslog/logger.go @@ -62,8 +62,8 @@ func New(progressAction string, logger slog.Logger) *Logger { // {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} // ({numTxs} {transactions|transaction}, {numTickets} {tickets|ticket}, // {numVotes} {votes|vote}, {numRevocations} {revocations|revocation}, -// height {lastBlockHeight}, {lastBlockTimeStamp}) -func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool) { +// height {lastBlockHeight}, progress {progress}%) +func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool, progressFn func() float64) { l.Lock() defer l.Unlock() @@ -80,15 +80,15 @@ func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool) { } // Log information about chain progress. 
- l.subsystemLogger.Infof("%s %d %s in the last %0.2fs (%d %s, %d %s, %d %s,"+ - " %d %s, height %d, %s)", l.progressAction, + l.subsystemLogger.Infof("%s %d %s in the last %0.2fs (%d %s, %d %s, %d %s, "+ + "%d %s, height %d, progress %0.2f%%)", l.progressAction, l.receivedBlocks, pickNoun(l.receivedBlocks, "block", "blocks"), duration.Seconds(), l.receivedTxns, pickNoun(l.receivedTxns, "transaction", "transactions"), l.receivedTickets, pickNoun(l.receivedTickets, "ticket", "tickets"), l.receivedVotes, pickNoun(l.receivedVotes, "vote", "votes"), l.receivedRevokes, pickNoun(l.receivedRevokes, "revocation", "revocations"), - header.Height, header.Timestamp) + header.Height, progressFn()) l.receivedBlocks = 0 l.receivedTxns = 0 @@ -107,7 +107,7 @@ func (l *Logger) LogProgress(block *wire.MsgBlock, forceLog bool) { // // The progress message is templated as follows: // {progressAction} {numProcessed} {headers|header} in the last {timePeriod} -// (progress ~{progress}%) +// (progress {progress}%) func (l *Logger) LogHeaderProgress(processedHeaders uint64, forceLog bool, progressFn func() float64) { l.receivedHeaders += processedHeaders diff --git a/internal/progresslog/logger_test.go b/internal/progresslog/logger_test.go index f7214ca7ac..37add29e04 100644 --- a/internal/progresslog/logger_test.go +++ b/internal/progresslog/logger_test.go @@ -131,13 +131,14 @@ func TestLogProgress(t *testing.T) { wantReceivedTickets: 0, }} + progressFn := func() float64 { return 0.0 } progressLogger := New("Wrote", testLog) for _, test := range tests { if test.reset { progressLogger = New("Wrote", testLog) } progressLogger.SetLastLogTime(test.inputLastLogTime) - progressLogger.LogProgress(test.inputBlock, test.forceLog) + progressLogger.LogProgress(test.inputBlock, test.forceLog, progressFn) wantBlockProgressLogger := &Logger{ receivedBlocks: test.wantReceivedBlocks, receivedTxns: test.wantReceivedTxns, From d8af6cf44273a3c7420eaefadf085a84c5020707 Mon Sep 17 00:00:00 2001 From: Dave 
Collins Date: Sun, 17 Jan 2021 04:37:25 -0600 Subject: [PATCH 09/10] peer: Remove getheaders response deadline. This removes the response deadline for getheaders since there is no guaranteed response if the remote peer does not have any headers for the locator. Also, while here, modify the debug logging to only log that a deadline was added when one actually was. --- peer/peer.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/peer/peer.go b/peer/peer.go index d7f2cd776a..5caa65beaf 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers -// Copyright (c) 2016-2020 The Decred developers +// Copyright (c) 2016-2021 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -1090,40 +1090,45 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st // sent asynchronously and as a result of a long backlog of messages, // such as is typical in the case of initial block download, the // response won't be received in time. - log.Debugf("Adding deadline for command %s for peer %s", msgCmd, p.addr) - + // + // Also, getheaders is intentionally ignored since there is no guaranteed + // response if the remote peer does not have any headers for the locator. + var addedDeadline bool deadline := time.Now().Add(stallResponseTimeout) switch msgCmd { case wire.CmdVersion: // Expects a verack message. pendingResponses[wire.CmdVerAck] = deadline + addedDeadline = true case wire.CmdMemPool: // Expects an inv message. pendingResponses[wire.CmdInv] = deadline + addedDeadline = true case wire.CmdGetBlocks: // Expects an inv message. pendingResponses[wire.CmdInv] = deadline + addedDeadline = true case wire.CmdGetData: // Expects a block, tx, or notfound message.
pendingResponses[wire.CmdBlock] = deadline pendingResponses[wire.CmdTx] = deadline pendingResponses[wire.CmdNotFound] = deadline - - case wire.CmdGetHeaders: - // Expects a headers message. Use a longer deadline since it - // can take a while for the remote peer to load all of the - // headers. - deadline = time.Now().Add(stallResponseTimeout * 3) - pendingResponses[wire.CmdHeaders] = deadline + addedDeadline = true case wire.CmdGetMiningState: pendingResponses[wire.CmdMiningState] = deadline + addedDeadline = true case wire.CmdGetInitState: pendingResponses[wire.CmdInitState] = deadline + addedDeadline = true + } + + if addedDeadline { + log.Debugf("Adding deadline for command %s for peer %s", msgCmd, p.addr) } } From cbe730a56127e0eb405e1592d10b45942d6b6801 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 17 Jan 2021 15:46:17 -0600 Subject: [PATCH 10/10] rpcserver: Calc verify progress based on best hdr. This modifies the RPC server getblockchaininfo to base the verification progress calculation on the best header now that all headers are determined prior to downloading and verifying blocks. --- internal/rpcserver/rpcserver.go | 8 ++++---- internal/rpcserver/rpcserverhandlers_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/rpcserver/rpcserver.go b/internal/rpcserver/rpcserver.go index 1e08c3ff5c..292e8dfc56 100644 --- a/internal/rpcserver/rpcserver.go +++ b/internal/rpcserver/rpcserver.go @@ -2035,10 +2035,10 @@ func handleGetBlockchainInfo(_ context.Context, s *Server, cmd interface{}) (int } // Estimate the verification progress of the node. 
- syncHeight := s.cfg.SyncMgr.SyncHeight() var verifyProgress float64 - if syncHeight > 0 { - verifyProgress = math.Min(float64(best.Height)/float64(syncHeight), 1.0) + if bestHeaderHeight > 0 { + progress := float64(best.Height) / float64(bestHeaderHeight) + verifyProgress = math.Min(progress, 1.0) } // Fetch the maximum allowed block size for all blocks other than the @@ -2101,7 +2101,7 @@ func handleGetBlockchainInfo(_ context.Context, s *Server, cmd interface{}) (int Chain: params.Name, Blocks: best.Height, Headers: bestHeaderHeight, - SyncHeight: syncHeight, + SyncHeight: s.cfg.SyncMgr.SyncHeight(), ChainWork: fmt.Sprintf("%064x", chainWork), InitialBlockDownload: !chain.IsCurrent(), VerificationProgress: verifyProgress, diff --git a/internal/rpcserver/rpcserverhandlers_test.go b/internal/rpcserver/rpcserverhandlers_test.go index 0ca81a350c..53267ad57c 100644 --- a/internal/rpcserver/rpcserverhandlers_test.go +++ b/internal/rpcserver/rpcserverhandlers_test.go @@ -3519,7 +3519,7 @@ func TestHandleGetBlockchainInfo(t *testing.T) { SyncHeight: int64(463074), ChainWork: "000000000000000000000000000000000000000000115d2833849090b0026506", InitialBlockDownload: true, - VerificationProgress: float64(0.9999978405179302), + VerificationProgress: float64(1), BestBlockHash: "00000000000000001e6ec1501c858506de1de4703d1be8bab4061126e8f61480", Difficulty: uint32(404696953), DifficultyRatio: float64(35256672611.3862),