diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index ce86d6c2c6..1320e42f6c 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -23,6 +23,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/progresslog" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/lru" peerpkg "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/wire" ) @@ -1124,17 +1125,11 @@ func (m *SyncManager) fetchHeaderBlocks() { continue } - iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - haveInv, err := m.haveInventory(iv) - if err != nil { - log.Warnf("Unexpected failure when checking for existing "+ - "inventory during header block fetch: %v", err) - continue - } - if !haveInv { + if m.needBlock(node.hash) { + iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) m.requestedBlocks[*node.hash] = struct{}{} m.syncPeer.requestedBlocks[*node.hash] = struct{}{} - err = gdmsg.AddInvVect(iv) + err := gdmsg.AddInvVect(iv) if err != nil { log.Warnf("Failed to add invvect while fetching block "+ "headers: %v", err) @@ -1277,59 +1272,32 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { } } -// haveInventory returns whether or not the inventory represented by the passed -// inventory vector is known. This includes checking all of the various places -// inventory can be when it is in different states such as blocks that are part -// of the main chain, on a side chain, in the orphan pool, and transactions that -// are in the memory pool (either the main pool or orphan pool). -func (m *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { - switch invVect.Type { - case wire.InvTypeBlock: - // Determine if the block is known in any form (main chain, side - // chain, or orphan). - hash := &invVect.Hash - return m.isKnownOrphan(hash) || m.cfg.Chain.HaveBlock(hash), nil - - case wire.InvTypeTx: - // Ask the transaction memory pool if the transaction is known - // to it in any form (main pool or orphan). - if m.cfg.TxMemPool.HaveTransaction(&invVect.Hash) { - return true, nil - } +// needBlock returns whether or not the block needs to be downloaded. For +// example, it does not need to be downloaded when it is already known. +func (m *SyncManager) needBlock(hash *chainhash.Hash) bool { + return !m.isKnownOrphan(hash) && !m.cfg.Chain.HaveBlock(hash) +} - // Check if the transaction exists from the point of view of the main chain - // tip. Note that this is only a best effort since it is expensive to check - // existence of every output and the only purpose of this check is to avoid - // downloading already known transactions. - // - // The first three outputs are checked because it covers the majority of - // common scenarios, including: - // - The vast majority of regular transactions consist of two outputs - // where one is some form of "pay-to-somebody-else" and the other is a - // change output. - // - For some stake transactions (e.g. votes) the first two outputs will - // never be found as a utxo, since they are not spendable, so we need to - // check through at least the third output. - outpoint := wire.OutPoint{Hash: invVect.Hash} - trees := []int8{wire.TxTreeRegular, wire.TxTreeStake} - for _, tree := range trees { - outpoint.Tree = tree - for i := uint32(0); i < 3; i++ { - outpoint.Index = i - entry, err := m.cfg.Chain.FetchUtxoEntry(outpoint) - if err != nil { - return false, err - } - if entry != nil && !entry.IsSpent() { - return true, nil - } - } - } +// needTx returns whether or not the transaction needs to be downloaded. For +// example, it does not need to be downloaded when it is already known. +func (m *SyncManager) needTx(hash *chainhash.Hash) bool { + // No need for transactions that have already been rejected. + if _, exists := m.rejectedTxns[*hash]; exists { + return false } - // The requested inventory is an unsupported type, so just claim - // it is known to avoid requesting it. - return true, nil + // No need for transactions that are already available in the transaction + // memory pool (main pool or orphan). + if m.cfg.TxMemPool.HaveTransaction(hash) { + return false + } + + // No need for transactions that were recently confirmed. + if m.cfg.RecentlyConfirmedTxns.Contains(*hash) { + return false + } + + return false } // handleInvMsg handles inv messages from all peers. @@ -1339,167 +1307,125 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { if peer == nil { return } - - // Attempt to find the final block in the inventory list. There may - // not be one. - lastBlock := -1 - invVects := imsg.inv.InvList - for i := len(invVects) - 1; i >= 0; i-- { - if invVects[i].Type == wire.InvTypeBlock { - lastBlock = i - break - } - } - fromSyncPeer := peer == m.syncPeer isCurrent := m.IsCurrent() - // If this inv contains a block announcement, and this isn't coming from - // our current sync peer or we're current, then update the last - // announced block for this peer. We'll use this information later to - // update the heights of peers based on blocks we've accepted that they - // previously announced. - if lastBlock != -1 && (!fromSyncPeer || isCurrent) { - peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) - } - - // Ignore invs from peers that aren't the sync if we are not current. - // Helps prevent fetching a mass of orphans. - if !fromSyncPeer && !isCurrent { - return - } - - // If our chain is current and a peer announces a block we already - // know of, then update their current block height. - if lastBlock != -1 && isCurrent { - blkHeight, err := m.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash) - if err == nil { - peer.UpdateLastBlockHeight(blkHeight) - } - } - - // Request the advertised inventory if we don't already have it. Also, - // request parent blocks of orphans if we receive one we already have. - // Finally, attempt to detect potential stalls due to long side chains - // we already have and request more blocks to prevent them. + // Update state information regarding per-peer known inventory and determine + // what inventory to request based on factors such as the current sync state + // and whether or not the data is already available. + // + // Also, keep track of the final announced block (when there is one) so the + // peer can be updated with that information accordingly. + var lastBlock *wire.InvVect var requestQueue []*wire.InvVect - for i, iv := range invVects { - // Ignore unsupported inventory types. - if iv.Type != wire.InvTypeBlock && iv.Type != wire.InvTypeTx { - continue - } - - // Add the inventory to the cache of known inventory - // for the peer. - peer.AddKnownInventory(iv) - - // Ignore inventory when we're in headers-first mode. - if m.headersFirstMode { - continue - } - - // Request the inventory if we don't already have it. - haveInv, err := m.haveInventory(iv) - if err != nil { - log.Warnf("Unexpected failure when checking for existing "+ - "inventory during inv message processing: %v", err) - continue - } - if !haveInv { - if iv.Type == wire.InvTypeTx { - // Skip the transaction if it has already been - // rejected. - if _, exists := m.rejectedTxns[iv.Hash]; exists { - continue - } - } + for _, iv := range imsg.inv.InvList { + switch iv.Type { + case wire.InvTypeBlock: + // Add the block to the cache of known inventory for the peer. This + // helps avoid sending blocks to the peer that it is already known + // to have. + peer.AddKnownInventory(iv) - // Add it to the request queue. - requestQueue = append(requestQueue, iv) - continue - } + // Update the last block in the announced inventory. + lastBlock = iv - if iv.Type == wire.InvTypeBlock { - // The block is an orphan block that we already have. - // When the existing orphan was processed, it requested - // the missing parent blocks. When this scenario - // happens, it means there were more blocks missing - // than are allowed into a single inventory message. As - // a result, once this peer requested the final - // advertised block, the remote peer noticed and is now - // resending the orphan block as an available block - // to signal there are more missing blocks that need to - // be requested. - if m.isKnownOrphan(&iv.Hash) { - // Request blocks starting at the latest known - // up to the root of the orphan that just came - // in. - orphanRoot := m.orphanRoot(&iv.Hash) - blkLocator, err := m.cfg.Chain.LatestBlockLocator() - if err != nil { - log.Errorf("Failed to get block locator for the latest "+ - "block: %v", err) - continue - } - locator := chainBlockLocatorToHashes(blkLocator) - err = peer.PushGetBlocksMsg(locator, orphanRoot) - if err != nil { - log.Errorf("Failed to push getblocksmsg for orphan chain: "+ - "%v", err) - } + // Ignore block announcements for blocks that are from peers other + // than the sync peer before the chain is current or are otherwise + // not needed, such as when they're already available. + // + // This helps prevent fetching a mass of orphans. + if (!fromSyncPeer && !isCurrent) || !m.needBlock(&iv.Hash) { continue } - // We already have the final block advertised by this - // inventory message, so force a request for more. This - // should only happen if we're on a really long side - // chain. - if i == lastBlock { - // Request blocks after this one up to the - // final one the remote peer knows about (zero - // stop hash). - blkLocator := m.cfg.Chain.BlockLocatorFromHash(&iv.Hash) - locator := chainBlockLocatorToHashes(blkLocator) - err = peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Errorf("PEER: Failed to push getblocksmsg: %v", err) - } - } - } - } - - // Request as much as possible at once. - numRequested := 0 - gdmsg := wire.NewMsgGetData() - for _, iv := range requestQueue { - switch iv.Type { - case wire.InvTypeBlock: - // Request the block if there is not already a pending - // request. + // Request the block if there is not one already pending. if _, exists := m.requestedBlocks[iv.Hash]; !exists { limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks) limitAdd(peer.requestedBlocks, iv.Hash, maxRequestedBlocks) - gdmsg.AddInvVect(iv) - numRequested++ + requestQueue = append(requestQueue, iv) } case wire.InvTypeTx: - // Request the transaction if there is not already a - // pending request. + // Add the tx to the cache of known inventory for the peer. This + // helps avoid sending transactions to the peer that it is already + // known to have. + peer.AddKnownInventory(iv) + + // Ignore transaction announcements before the chain is current or + // are otherwise not needed, such as when they were recently + // rejected or are already known. + // + // Transaction announcements are based on the state of the fully + // synced ledger, so they are likely to be invalid before the chain + // is current. + if !isCurrent || !m.needTx(&iv.Hash) { + continue + } + + // Request the transaction if there is not one already pending. if _, exists := m.requestedTxns[iv.Hash]; !exists { limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns) limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns) - gdmsg.AddInvVect(iv) - numRequested++ + requestQueue = append(requestQueue, iv) } } + } + if lastBlock != nil { + // When the block is an orphan that we already have, the missing parent + // blocks were requested when the orphan was processed. In that case, + // there were more blocks missing than are allowed into a single + // inventory message. As a result, once this peer requested the final + // advertised block, the remote peer noticed and is now resending the + // orphan block as an available block to signal there are more missing + // blocks that need to be requested. + if m.isKnownOrphan(&lastBlock.Hash) { + // Request blocks starting at the latest known up to the root of the + // orphan that just came in. + orphanRoot := m.orphanRoot(&lastBlock.Hash) + blkLocator := m.cfg.Chain.LatestBlockLocator() + locator := chainBlockLocatorToHashes(blkLocator) + err := peer.PushGetBlocksMsg(locator, orphanRoot) + if err != nil { + log.Errorf("Failed to push getblocksmsg for orphan chain: %v", + err) + } + } + + // Update the last announced block to the final one in the announced + // inventory above (if any). In the case the header for that block is + // already known, use that information to update the height for the peer + // too. + peer.UpdateLastAnnouncedBlock(&lastBlock.Hash) + if isCurrent { + header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash) + if err == nil { + peer.UpdateLastBlockHeight(int64(header.Height)) + } + } + + // Request blocks after the last one advertised up to the final one the + // remote peer knows about (zero stop hash). + blkLocator := m.cfg.Chain.BlockLocatorFromHash(&lastBlock.Hash) + locator := chainBlockLocatorToHashes(blkLocator) + err := peer.PushGetBlocksMsg(locator, &zeroHash) + if err != nil { + log.Errorf("Failed to push getblocksmsg: %v", err) + } + } + + // Request as much as possible at once. + var numRequested int32 + gdmsg := wire.NewMsgGetData() + for _, iv := range requestQueue { + gdmsg.AddInvVect(iv) + numRequested++ if numRequested == wire.MaxInvPerMsg { // Send full getdata message and reset. // - // NOTE: There should never be more than wire.MaxInvPerMsg - // in the inv request, so we could return after the - // QueueMessage, but this is more safe. + // NOTE: There should never be more than wire.MaxInvPerMsg in the + // inv request, so this could return after the QueueMessage, but + // this is safer. peer.QueueMessage(gdmsg, nil) gdmsg = wire.NewMsgGetData() numRequested = 0 @@ -1960,6 +1886,11 @@ type Config struct { // MaxOrphanTxs specifies the maximum number of orphan transactions the // transaction pool associated with the server supports. MaxOrphanTxs int + + // RecentlyConfirmedTxns specifies a size limited set to use for tracking + // and querying the most recently confirmed transactions. It is useful for + // preventing duplicate requests. + RecentlyConfirmedTxns *lru.Cache } // New returns a new network chain synchronization manager. Use Run to begin diff --git a/server.go b/server.go index 6afd502a88..2ac5b832fc 100644 --- a/server.go +++ b/server.go @@ -97,6 +97,15 @@ const ( // otherwise arise from sending old orphan blocks and forcing nodes to do // expensive lottery data calculations for them. maxReorgDepthNotify = 6 + + // maxRecentlyConfirmedTxns specifies the maximum number of recently + // confirmed transactions to track. This value is set to target tracking + // the maximum number transactions of the minimum realistic size (~206 + // bytes) in approximately one hour of blocks on the main network. + // + // Since each hash in the cache will occupy 32 bytes, this value will result + // in around 718KiB memory usage plus the overhead of the structure. + maxRecentlyConfirmedTxns = 23000 ) var ( @@ -485,6 +494,10 @@ type server struct { // anouncements. lotteryDataBroadcastMtx sync.RWMutex lotteryDataBroadcast map[chainhash.Hash]struct{} + + // recentlyConfirmedTxns tracks transactions that have been confirmed in the + // most recent blocks. + recentlyConfirmedTxns lru.Cache } // serverPeer extends the peer to maintain state shared by the server. @@ -1494,11 +1507,15 @@ func (s *server) AnnounceNewTransactions(txns []*dcrutil.Tx) { } // TransactionConfirmed marks the provided single confirmation transaction as -// no longer needing rebroadcasting. +// no longer needing rebroadcasting and keeps track of it for use when avoiding +// requests for recently confirmed transactions. func (s *server) TransactionConfirmed(tx *dcrutil.Tx) { + txHash := tx.Hash() + s.recentlyConfirmedTxns.Add(*txHash) + // Rebroadcasting is only necessary when the RPC server is active. if s.rpcServer != nil { - iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + iv := wire.NewInvVect(wire.InvTypeTx, txHash) s.RemoveRebroadcastInventory(iv) } } @@ -2625,7 +2642,8 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // Now that this block is in the blockchain, mark the // transaction (except the coinbase) as no longer needing - // rebroadcasting. + // rebroadcasting and keep track of it for use when avoiding + // requests for recently confirmed transactions. s.TransactionConfirmed(tx) } } @@ -3325,22 +3343,23 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP } s := server{ - chainParams: chainParams, - addrManager: amgr, - newPeers: make(chan *serverPeer, cfg.MaxPeers), - donePeers: make(chan *serverPeer, cfg.MaxPeers), - banPeers: make(chan *serverPeer, cfg.MaxPeers), - query: make(chan interface{}), - relayInv: make(chan relayMsg, cfg.MaxPeers), - broadcast: make(chan broadcastMsg, cfg.MaxPeers), - modifyRebroadcastInv: make(chan interface{}), - nat: nat, - db: db, - timeSource: blockchain.NewMedianTime(), - services: services, - sigCache: sigCache, - subsidyCache: standalone.NewSubsidyCache(chainParams), - lotteryDataBroadcast: make(map[chainhash.Hash]struct{}), + chainParams: chainParams, + addrManager: amgr, + newPeers: make(chan *serverPeer, cfg.MaxPeers), + donePeers: make(chan *serverPeer, cfg.MaxPeers), + banPeers: make(chan *serverPeer, cfg.MaxPeers), + query: make(chan interface{}), + relayInv: make(chan relayMsg, cfg.MaxPeers), + broadcast: make(chan broadcastMsg, cfg.MaxPeers), + modifyRebroadcastInv: make(chan interface{}), + nat: nat, + db: db, + timeSource: blockchain.NewMedianTime(), + services: services, + sigCache: sigCache, + subsidyCache: standalone.NewSubsidyCache(chainParams), + lotteryDataBroadcast: make(map[chainhash.Hash]struct{}), + recentlyConfirmedTxns: lru.NewCache(maxRecentlyConfirmedTxns), } // Create the transaction and address indexes if needed. @@ -3497,10 +3516,11 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP RpcServer: func() *rpcserver.Server { return s.rpcServer }, - DisableCheckpoints: cfg.DisableCheckpoints, - NoMiningStateSync: cfg.NoMiningStateSync, - MaxPeers: cfg.MaxPeers, - MaxOrphanTxs: cfg.MaxOrphanTxs, + DisableCheckpoints: cfg.DisableCheckpoints, + NoMiningStateSync: cfg.NoMiningStateSync, + MaxPeers: cfg.MaxPeers, + MaxOrphanTxs: cfg.MaxOrphanTxs, + RecentlyConfirmedTxns: &s.recentlyConfirmedTxns, }) // Dump the blockchain and quit if requested.