From 1444559c4d5b3db09fcee4c576b18286e0d54047 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 10 Jan 2021 18:41:30 -0600 Subject: [PATCH] netsync: Rework inventory announcement handling. This modifies the way inventory announcements are handled to improve efficiency and readability. Specifically, it switches to a more explicit approach that handles the specific recognized types (blocks and transactions) independently and now determines if they are needed with unique logic for each type as opposed to the more generic inventory-only based approach. Next, and probably the most important change overall is that recently confirmed transactions are now tracked by the server and the logic which determines if a transaction is needed now makes use of those tracked transactions as opposed to the previous rather expensive utxo-based query approach. The reasoning for this change is that the only time honest nodes notify others about transactions are when they view those transactions as unconfirmed and they consider themselves to be current. In practice this means the only duplicate announcements are for unconfirmed (which are filtered by the mempool) and recently confirmed transactions. In the case of malicious nodes, the transactions are rejected (either the transactions are actually invalid or they are old transactions that were valid in the past but will now fail due to trying to spend outputs they already spent). So, given the above discussion, there are 3 cases to handle: 1) Duplicate announcements for unconfirmed transactions 2) Duplicate announcements for txns that have already been rejected 3) Duplicate announcements for recently-confirmed transactions For the first case, the duplicate request is filtered because the mempool already knows the unconfirmed transaction. For the second case, rejected transactions are tracked separately and filtered. Thus, only the third case of recently confirmed transactions remains. 
Finally, the last block is now tracked by the hash instead of an index into the inventory which allows the relevant updates to take place after the main loop versus inside of it conditionally. --- internal/netsync/manager.go | 324 ++++++++++++++---------------------- server.go | 66 +++++--- 2 files changed, 171 insertions(+), 219 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 6580f35e27..bc6341b9c9 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -23,6 +23,7 @@ import ( "github.com/decred/dcrd/internal/mempool" "github.com/decred/dcrd/internal/progresslog" "github.com/decred/dcrd/internal/rpcserver" + "github.com/decred/dcrd/lru" peerpkg "github.com/decred/dcrd/peer/v2" "github.com/decred/dcrd/wire" ) @@ -1120,17 +1121,11 @@ func (m *SyncManager) fetchHeaderBlocks() { continue } - iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - haveInv, err := m.haveInventory(iv) - if err != nil { - log.Warnf("Unexpected failure when checking for existing "+ - "inventory during header block fetch: %v", err) - continue - } - if !haveInv { + if m.needBlock(node.hash) { + iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) m.requestedBlocks[*node.hash] = struct{}{} m.syncPeer.requestedBlocks[*node.hash] = struct{}{} - err = gdmsg.AddInvVect(iv) + err := gdmsg.AddInvVect(iv) if err != nil { log.Warnf("Failed to add invvect while fetching block "+ "headers: %v", err) @@ -1273,59 +1268,32 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { } } -// haveInventory returns whether or not the inventory represented by the passed -// inventory vector is known. This includes checking all of the various places -// inventory can be when it is in different states such as blocks that are part -// of the main chain, on a side chain, in the orphan pool, and transactions that -// are in the memory pool (either the main pool or orphan pool). 
-func (m *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { - switch invVect.Type { - case wire.InvTypeBlock: - // Determine if the block is known in any form (main chain, side - // chain, or orphan). - hash := &invVect.Hash - return m.isKnownOrphan(hash) || m.cfg.Chain.HaveBlock(hash), nil - - case wire.InvTypeTx: - // Ask the transaction memory pool if the transaction is known - // to it in any form (main pool or orphan). - if m.cfg.TxMemPool.HaveTransaction(&invVect.Hash) { - return true, nil - } +// needBlock returns whether or not the block needs to be downloaded. For +// example, it does not need to be downloaded when it is already known. +func (m *SyncManager) needBlock(hash *chainhash.Hash) bool { + return !m.isKnownOrphan(hash) && !m.cfg.Chain.HaveBlock(hash) +} - // Check if the transaction exists from the point of view of the main chain - // tip. Note that this is only a best effort since it is expensive to check - // existence of every output and the only purpose of this check is to avoid - // downloading already known transactions. - // - // The first three outputs are checked because it covers the majority of - // common scenarios, including: - // - The vast majority of regular transactions consist of two outputs - // where one is some form of "pay-to-somebody-else" and the other is a - // change output. - // - For some stake transactions (e.g. votes) the first two outputs will - // never be found as a utxo, since they are not spendable, so we need to - // check through at least the third output. 
- outpoint := wire.OutPoint{Hash: invVect.Hash} - trees := []int8{wire.TxTreeRegular, wire.TxTreeStake} - for _, tree := range trees { - outpoint.Tree = tree - for i := uint32(0); i < 3; i++ { - outpoint.Index = i - entry, err := m.cfg.Chain.FetchUtxoEntry(outpoint) - if err != nil { - return false, err - } - if entry != nil && !entry.IsSpent() { - return true, nil - } - } - } + } - // The requested inventory is an unsupported type, so just claim - it is known to avoid requesting it. - return true, nil + // No need for transactions that are already available in the transaction + // memory pool (main pool or orphan). + if m.cfg.TxMemPool.HaveTransaction(hash) { + return false + } + + // No need for transactions that were recently confirmed. + if m.cfg.RecentlyConfirmedTxns.Contains(*hash) { + return false + } + + return true } // handleInvMsg handles inv messages from all peers. @@ -1335,167 +1303,126 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { if peer == nil { return } - - // Attempt to find the final block in the inventory list. There may - not be one. - lastBlock := -1 - invVects := imsg.inv.InvList - for i := len(invVects) - 1; i >= 0; i-- { - if invVects[i].Type == wire.InvTypeBlock { - lastBlock = i - break - } - } - fromSyncPeer := peer == m.syncPeer isCurrent := m.IsCurrent() - // If this inv contains a block announcement, and this isn't coming from - our current sync peer or we're current, then update the last - announced block for this peer. We'll use this information later to - update the heights of peers based on blocks we've accepted that they - previously announced. 
- if lastBlock != -1 && (!fromSyncPeer || isCurrent) { - peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) - } - - // Ignore invs from peers that aren't the sync if we are not current. - // Helps prevent fetching a mass of orphans. - if !fromSyncPeer && !isCurrent { - return - } - - // If our chain is current and a peer announces a block we already - // know of, then update their current block height. - if lastBlock != -1 && isCurrent { - blkHeight, err := m.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash) - if err == nil { - peer.UpdateLastBlockHeight(blkHeight) - } - } - - // Request the advertised inventory if we don't already have it. Also, - // request parent blocks of orphans if we receive one we already have. - // Finally, attempt to detect potential stalls due to long side chains - // we already have and request more blocks to prevent them. + // Update state information regarding per-peer known inventory and determine + // what inventory to request based on factors such as the current sync state + // and whether or not the data is already available. + // + // Also, keep track of the final announced block (when there is one) so the + // peer can be updated with that information accordingly. + var lastBlock *wire.InvVect var requestQueue []*wire.InvVect - for i, iv := range invVects { - // Ignore unsupported inventory types. - if iv.Type != wire.InvTypeBlock && iv.Type != wire.InvTypeTx { - continue - } - - // Add the inventory to the cache of known inventory - // for the peer. - peer.AddKnownInventory(iv) - - // Ignore inventory when we're in headers-first mode. - if m.headersFirstMode { - continue - } - - // Request the inventory if we don't already have it. 
- haveInv, err := m.haveInventory(iv) - if err != nil { - log.Warnf("Unexpected failure when checking for existing "+ - "inventory during inv message processing: %v", err) - continue - } - if !haveInv { - if iv.Type == wire.InvTypeTx { - // Skip the transaction if it has already been - // rejected. - if _, exists := m.rejectedTxns[iv.Hash]; exists { - continue - } - } + for _, iv := range imsg.inv.InvList { + switch iv.Type { + case wire.InvTypeBlock: + // Add the block to the cache of known inventory for the peer. This + // helps avoid sending blocks to the peer that it is already known + // to have. + peer.AddKnownInventory(iv) - // Add it to the request queue. - requestQueue = append(requestQueue, iv) - continue - } + // Update the last block in the announced inventory. + lastBlock = iv - if iv.Type == wire.InvTypeBlock { - // The block is an orphan block that we already have. - // When the existing orphan was processed, it requested - // the missing parent blocks. When this scenario - // happens, it means there were more blocks missing - // than are allowed into a single inventory message. As - // a result, once this peer requested the final - // advertised block, the remote peer noticed and is now - // resending the orphan block as an available block - // to signal there are more missing blocks that need to - // be requested. - if m.isKnownOrphan(&iv.Hash) { - // Request blocks starting at the latest known - // up to the root of the orphan that just came - // in. 
- orphanRoot := m.orphanRoot(&iv.Hash) - blkLocator, err := m.cfg.Chain.LatestBlockLocator() - if err != nil { - log.Errorf("Failed to get block locator for the latest "+ - "block: %v", err) - continue - } - locator := chainBlockLocatorToHashes(blkLocator) - err = peer.PushGetBlocksMsg(locator, orphanRoot) - if err != nil { - log.Errorf("Failed to push getblocksmsg for orphan chain: "+ - "%v", err) - } + // Ignore block announcements for blocks that are from peers other + // than the sync peer before the chain is current or are otherwise + // not needed, such as when they're already available. + // + // This helps prevent fetching a mass of orphans. + if (!fromSyncPeer && !isCurrent) || !m.needBlock(&iv.Hash) { continue } - // We already have the final block advertised by this - // inventory message, so force a request for more. This - // should only happen if we're on a really long side - // chain. - if i == lastBlock { - // Request blocks after this one up to the - // final one the remote peer knows about (zero - // stop hash). - blkLocator := m.cfg.Chain.BlockLocatorFromHash(&iv.Hash) - locator := chainBlockLocatorToHashes(blkLocator) - err = peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Errorf("PEER: Failed to push getblocksmsg: %v", err) - } - } - } - } - - // Request as much as possible at once. - numRequested := 0 - gdmsg := wire.NewMsgGetData() - for _, iv := range requestQueue { - switch iv.Type { - case wire.InvTypeBlock: - // Request the block if there is not already a pending - // request. + // Request the block if there is not one already pending. if _, exists := m.requestedBlocks[iv.Hash]; !exists { limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks) limitAdd(peer.requestedBlocks, iv.Hash, maxRequestedBlocks) - gdmsg.AddInvVect(iv) - numRequested++ + requestQueue = append(requestQueue, iv) } case wire.InvTypeTx: - // Request the transaction if there is not already a - // pending request. 
+ // Add the tx to the cache of known inventory for the peer. This + // helps avoid sending transactions to the peer that it is already + // known to have. + peer.AddKnownInventory(iv) + + // Ignore transaction announcements before the chain is current or + // are otherwise not needed, such as when they were recently + // rejected or are already known. + // + // Transaction announcements are based on the state of the fully + // synced ledger, so they are likely to be invalid before the chain + // is current. + if !isCurrent || !m.needTx(&iv.Hash) { + continue + } + + // Request the transaction if there is not one already pending. if _, exists := m.requestedTxns[iv.Hash]; !exists { limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns) limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns) - gdmsg.AddInvVect(iv) - numRequested++ + requestQueue = append(requestQueue, iv) } } + } + if lastBlock != nil { + // The block is an orphan block that we already have. When the + // existing orphan was processed, it requested the missing parent + // blocks. When this scenario happens, it means there were more + // blocks missing than are allowed into a single inventory message. + // As a result, once this peer requested the final advertised block, + // the remote peer noticed and is now resending the orphan block as + // an available block to signal there are more missing blocks that + // need to be requested. + if m.isKnownOrphan(&lastBlock.Hash) { + // Request blocks starting at the latest known up to the root of the + // orphan that just came in. + orphanRoot := m.orphanRoot(&lastBlock.Hash) + blkLocator, err := m.cfg.Chain.LatestBlockLocator() + if err != nil { + log.Errorf("Failed to get block locator for the latest block: %v", + err) + } else { + locator := chainBlockLocatorToHashes(blkLocator) + err = peer.PushGetBlocksMsg(locator, orphanRoot) + if err != nil { + log.Errorf("Failed to push getblocksmsg for orphan chain: %v", + err) + } + } + } + + // Update the last announced block to the final one in the announced + // inventory above (if any). 
In the case the header for that block is + // already known, use that information to update the height for the peer + // too. + peer.UpdateLastAnnouncedBlock(&lastBlock.Hash) + if isCurrent { + header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash) + if err == nil { + peer.UpdateLastBlockHeight(int64(header.Height)) + } + } + + // Request blocks after the last one advertised up to the final one the + // remote peer knows about (zero stop hash). + blkLocator := m.cfg.Chain.BlockLocatorFromHash(&lastBlock.Hash) + locator := chainBlockLocatorToHashes(blkLocator) + err := peer.PushGetBlocksMsg(locator, &zeroHash) + if err != nil { + log.Errorf("PEER: Failed to push getblocksmsg: %v", err) + } + } + + // Request as much as possible at once. + var numRequested int32 + gdmsg := wire.NewMsgGetData() + for _, iv := range requestQueue { + gdmsg.AddInvVect(iv) + numRequested++ if numRequested == wire.MaxInvPerMsg { // Send full getdata message and reset. // - // NOTE: There should never be more than wire.MaxInvPerMsg - // in the inv request, so we could return after the - // QueueMessage, but this is more safe. + // NOTE: There should never be more than wire.MaxInvPerMsg in the + // inv request, so this could return after the QueueMessage, but + // this is safer. peer.QueueMessage(gdmsg, nil) gdmsg = wire.NewMsgGetData() numRequested = 0 @@ -1937,6 +1864,11 @@ type Config struct { // MaxOrphanTxs specifies the maximum number of orphan transactions the // transaction pool associated with the server supports. MaxOrphanTxs int + + // RecentlyConfirmedTxns specifies a size limited set to use for tracking + // and querying the most recently confirmed transactions. It is useful for + // preventing duplicate requests. + RecentlyConfirmedTxns *lru.Cache } // New returns a new network chain synchronization manager. 
Use Start to begin diff --git a/server.go b/server.go index b5695368e6..edf22453a4 100644 --- a/server.go +++ b/server.go @@ -97,6 +97,15 @@ const ( // otherwise arise from sending old orphan blocks and forcing nodes to do // expensive lottery data calculations for them. maxReorgDepthNotify = 6 + + // maxRecentlyConfirmedTxns specifies the maximum number of recently + // confirmed transactions to track. This value is set to target tracking + // the maximum number of transactions of the minimum realistic size (~206 + // bytes) in approximately one hour of blocks on the main network. + // + // Since each hash in the cache will occupy 32 bytes, this value will result + // in around 718KiB memory usage plus the overhead of the structure. + maxRecentlyConfirmedTxns = 23000 ) var ( @@ -485,6 +494,10 @@ type server struct { // anouncements. lotteryDataBroadcastMtx sync.RWMutex lotteryDataBroadcast map[chainhash.Hash]struct{} + + // recentlyConfirmedTxns tracks transactions that have been confirmed in the + // most recent blocks. + recentlyConfirmedTxns lru.Cache } // serverPeer extends the peer to maintain state shared by the server. @@ -1494,11 +1507,15 @@ func (s *server) AnnounceNewTransactions(txns []*dcrutil.Tx) { } // TransactionConfirmed marks the provided single confirmation transaction as -// no longer needing rebroadcasting. +// no longer needing rebroadcasting and keeps track of it for use when avoiding +// requests for recently confirmed transactions. func (s *server) TransactionConfirmed(tx *dcrutil.Tx) { + txHash := tx.Hash() + s.recentlyConfirmedTxns.Add(*txHash) + // Rebroadcasting is only necessary when the RPC server is active. 
if s.rpcServer != nil { - iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + iv := wire.NewInvVect(wire.InvTypeTx, txHash) s.RemoveRebroadcastInventory(iv) } } @@ -2627,7 +2644,8 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // Now that this block is in the blockchain, mark the // transaction (except the coinbase) as no longer needing - // rebroadcasting. + // rebroadcasting and keep track of it for use when avoiding + // requests for recently confirmed transactions. s.TransactionConfirmed(tx) } } @@ -3321,22 +3339,23 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP } s := server{ - chainParams: chainParams, - addrManager: amgr, - newPeers: make(chan *serverPeer, cfg.MaxPeers), - donePeers: make(chan *serverPeer, cfg.MaxPeers), - banPeers: make(chan *serverPeer, cfg.MaxPeers), - query: make(chan interface{}), - relayInv: make(chan relayMsg, cfg.MaxPeers), - broadcast: make(chan broadcastMsg, cfg.MaxPeers), - modifyRebroadcastInv: make(chan interface{}), - nat: nat, - db: db, - timeSource: blockchain.NewMedianTime(), - services: services, - sigCache: sigCache, - subsidyCache: standalone.NewSubsidyCache(chainParams), - lotteryDataBroadcast: make(map[chainhash.Hash]struct{}), + chainParams: chainParams, + addrManager: amgr, + newPeers: make(chan *serverPeer, cfg.MaxPeers), + donePeers: make(chan *serverPeer, cfg.MaxPeers), + banPeers: make(chan *serverPeer, cfg.MaxPeers), + query: make(chan interface{}), + relayInv: make(chan relayMsg, cfg.MaxPeers), + broadcast: make(chan broadcastMsg, cfg.MaxPeers), + modifyRebroadcastInv: make(chan interface{}), + nat: nat, + db: db, + timeSource: blockchain.NewMedianTime(), + services: services, + sigCache: sigCache, + subsidyCache: standalone.NewSubsidyCache(chainParams), + lotteryDataBroadcast: make(map[chainhash.Hash]struct{}), + recentlyConfirmedTxns: lru.NewCache(maxRecentlyConfirmedTxns), } // Create the transaction and address indexes if needed. 
@@ -3493,10 +3512,11 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP RpcServer: func() *rpcserver.Server { return s.rpcServer }, - DisableCheckpoints: cfg.DisableCheckpoints, - NoMiningStateSync: cfg.NoMiningStateSync, - MaxPeers: cfg.MaxPeers, - MaxOrphanTxs: cfg.MaxOrphanTxs, + DisableCheckpoints: cfg.DisableCheckpoints, + NoMiningStateSync: cfg.NoMiningStateSync, + MaxPeers: cfg.MaxPeers, + MaxOrphanTxs: cfg.MaxOrphanTxs, + RecentlyConfirmedTxns: &s.recentlyConfirmedTxns, }) if err != nil { return nil, err