Skip to content

Commit

Permalink
netsync: Rework inventory announcement handling.
Browse files Browse the repository at this point in the history
This modifies the way inventory announcements are handled to improve
efficiency and readability.

Specifically, it switches to a more explicit approach that handles the
specific recognized types (blocks and transactions) independently and
now determines if they are needed with unique logic for each type as
opposed to the more generic inventory-only based approach.

Next, and probably the most important change overall, is that recently
confirmed transactions are now tracked by the server and the logic which
determines if a transaction is needed now makes use of those tracked
transactions as opposed to the previous rather expensive utxo-based
query approach.

The reasoning for this change is that the only time honest nodes notify
others about transactions are when they view those transactions as
unconfirmed and they consider themselves to be current.  In practice
this means the only duplicate announcements are for unconfirmed and
recently confirmed transactions.  In the case of malicious nodes, the
transactions are rejected (either the transactions are actually invalid
or they are old transactions that were valid in the past but will now
fail due to trying to spend outputs they already spent).

So, given the above discussion, there are 3 cases to handle:

1) Duplicate announcements for unconfirmed transactions
2) Duplicate announcements for txns that have already been rejected
3) Duplicate announcements for recently-confirmed transactions

For the first case, the duplicate request is filtered by the mempool
since it already knows the unconfirmed transaction.  For the second
case, rejected transactions are tracked separately and filtered.  Thus,
only the third case of recently confirmed transactions remains.

Finally, the last block is now tracked by the hash instead of an index
into the index which allows the relevant updates to take place after the
main loop versus inside of it conditionally.
  • Loading branch information
davecgh committed Jan 11, 2021
1 parent c715121 commit 33c39f2
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 219 deletions.
323 changes: 127 additions & 196 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/progresslog"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/lru"
peerpkg "github.com/decred/dcrd/peer/v2"
"github.com/decred/dcrd/wire"
)
Expand Down Expand Up @@ -1124,17 +1125,11 @@ func (m *SyncManager) fetchHeaderBlocks() {
continue
}

iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
haveInv, err := m.haveInventory(iv)
if err != nil {
log.Warnf("Unexpected failure when checking for existing "+
"inventory during header block fetch: %v", err)
continue
}
if !haveInv {
if m.needBlock(node.hash) {
iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
m.requestedBlocks[*node.hash] = struct{}{}
m.syncPeer.requestedBlocks[*node.hash] = struct{}{}
err = gdmsg.AddInvVect(iv)
err := gdmsg.AddInvVect(iv)
if err != nil {
log.Warnf("Failed to add invvect while fetching block "+
"headers: %v", err)
Expand Down Expand Up @@ -1277,59 +1272,32 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
}
}

// haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part
// of the main chain, on a side chain, in the orphan pool, and transactions that
// are in the memory pool (either the main pool or orphan pool).
func (m *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
switch invVect.Type {
case wire.InvTypeBlock:
// Determine if the block is known in any form (main chain, side
// chain, or orphan).
hash := &invVect.Hash
return m.isKnownOrphan(hash) || m.cfg.Chain.HaveBlock(hash), nil

case wire.InvTypeTx:
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
if m.cfg.TxMemPool.HaveTransaction(&invVect.Hash) {
return true, nil
}
// needBlock returns whether or not the block needs to be downloaded. For
// example, it does not need to be downloaded when it is already known.
func (m *SyncManager) needBlock(hash *chainhash.Hash) bool {
return !m.isKnownOrphan(hash) && !m.cfg.Chain.HaveBlock(hash)
}

// Check if the transaction exists from the point of view of the main chain
// tip. Note that this is only a best effort since it is expensive to check
// existence of every output and the only purpose of this check is to avoid
// downloading already known transactions.
//
// The first three outputs are checked because it covers the majority of
// common scenarios, including:
// - The vast majority of regular transactions consist of two outputs
// where one is some form of "pay-to-somebody-else" and the other is a
// change output.
// - For some stake transactions (e.g. votes) the first two outputs will
// never be found as a utxo, since they are not spendable, so we need to
// check through at least the third output.
outpoint := wire.OutPoint{Hash: invVect.Hash}
trees := []int8{wire.TxTreeRegular, wire.TxTreeStake}
for _, tree := range trees {
outpoint.Tree = tree
for i := uint32(0); i < 3; i++ {
outpoint.Index = i
entry, err := m.cfg.Chain.FetchUtxoEntry(outpoint)
if err != nil {
return false, err
}
if entry != nil && !entry.IsSpent() {
return true, nil
}
}
}
// needTx returns whether or not the transaction needs to be downloaded. For
// example, it does not need to be downloaded when it is already known.
func (m *SyncManager) needTx(hash *chainhash.Hash) bool {
// No need for transactions that have already been rejected.
if _, exists := m.rejectedTxns[*hash]; exists {
return false
}

// The requested inventory is an unsupported type, so just claim
// it is known to avoid requesting it.
return true, nil
// No need for transactions that are already available in the transaction
// memory pool (main pool or orphan).
if m.cfg.TxMemPool.HaveTransaction(hash) {
return false
}

// No need for transactions that were recently confirmed.
if m.cfg.RecentlyConfirmedTxns.Contains(*hash) {
return false
}

return false
}

// handleInvMsg handles inv messages from all peers.
Expand All @@ -1339,167 +1307,125 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) {
if peer == nil {
return
}

// Attempt to find the final block in the inventory list. There may
// not be one.
lastBlock := -1
invVects := imsg.inv.InvList
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == wire.InvTypeBlock {
lastBlock = i
break
}
}

fromSyncPeer := peer == m.syncPeer
isCurrent := m.IsCurrent()

// If this inv contains a block announcement, and this isn't coming from
// our current sync peer or we're current, then update the last
// announced block for this peer. We'll use this information later to
// update the heights of peers based on blocks we've accepted that they
// previously announced.
if lastBlock != -1 && (!fromSyncPeer || isCurrent) {
peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
}

// Ignore invs from peers that aren't the sync if we are not current.
// Helps prevent fetching a mass of orphans.
if !fromSyncPeer && !isCurrent {
return
}

// If our chain is current and a peer announces a block we already
// know of, then update their current block height.
if lastBlock != -1 && isCurrent {
blkHeight, err := m.cfg.Chain.BlockHeightByHash(&invVects[lastBlock].Hash)
if err == nil {
peer.UpdateLastBlockHeight(blkHeight)
}
}

// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
// we already have and request more blocks to prevent them.
// Update state information regarding per-peer known inventory and determine
// what inventory to request based on factors such as the current sync state
// and whether or not the data is already available.
//
// Also, keep track of the final announced block (when there is one) so the
// peer can be updated with that information accordingly.
var lastBlock *wire.InvVect
var requestQueue []*wire.InvVect
for i, iv := range invVects {
// Ignore unsupported inventory types.
if iv.Type != wire.InvTypeBlock && iv.Type != wire.InvTypeTx {
continue
}

// Add the inventory to the cache of known inventory
// for the peer.
peer.AddKnownInventory(iv)

// Ignore inventory when we're in headers-first mode.
if m.headersFirstMode {
continue
}

// Request the inventory if we don't already have it.
haveInv, err := m.haveInventory(iv)
if err != nil {
log.Warnf("Unexpected failure when checking for existing "+
"inventory during inv message processing: %v", err)
continue
}
if !haveInv {
if iv.Type == wire.InvTypeTx {
// Skip the transaction if it has already been
// rejected.
if _, exists := m.rejectedTxns[iv.Hash]; exists {
continue
}
}
for _, iv := range imsg.inv.InvList {
switch iv.Type {
case wire.InvTypeBlock:
// Add the block to the cache of known inventory for the peer. This
// helps avoid sending blocks to the peer that it is already known
// to have.
peer.AddKnownInventory(iv)

// Add it to the request queue.
requestQueue = append(requestQueue, iv)
continue
}
// Update the last block in the announced inventory.
lastBlock = iv

if iv.Type == wire.InvTypeBlock {
// The block is an orphan block that we already have.
// When the existing orphan was processed, it requested
// the missing parent blocks. When this scenario
// happens, it means there were more blocks missing
// than are allowed into a single inventory message. As
// a result, once this peer requested the final
// advertised block, the remote peer noticed and is now
// resending the orphan block as an available block
// to signal there are more missing blocks that need to
// be requested.
if m.isKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known
// up to the root of the orphan that just came
// in.
orphanRoot := m.orphanRoot(&iv.Hash)
blkLocator, err := m.cfg.Chain.LatestBlockLocator()
if err != nil {
log.Errorf("Failed to get block locator for the latest "+
"block: %v", err)
continue
}
locator := chainBlockLocatorToHashes(blkLocator)
err = peer.PushGetBlocksMsg(locator, orphanRoot)
if err != nil {
log.Errorf("Failed to push getblocksmsg for orphan chain: "+
"%v", err)
}
// Ignore block announcements for blocks that are from peers other
// than the sync peer before the chain is current or are otherwise
// not needed, such as when they're already available.
//
// This helps prevent fetching a mass of orphans.
if (!fromSyncPeer && !isCurrent) || !m.needBlock(&iv.Hash) {
continue
}

// We already have the final block advertised by this
// inventory message, so force a request for more. This
// should only happen if we're on a really long side
// chain.
if i == lastBlock {
// Request blocks after this one up to the
// final one the remote peer knows about (zero
// stop hash).
blkLocator := m.cfg.Chain.BlockLocatorFromHash(&iv.Hash)
locator := chainBlockLocatorToHashes(blkLocator)
err = peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
log.Errorf("PEER: Failed to push getblocksmsg: %v", err)
}
}
}
}

// Request as much as possible at once.
numRequested := 0
gdmsg := wire.NewMsgGetData()
for _, iv := range requestQueue {
switch iv.Type {
case wire.InvTypeBlock:
// Request the block if there is not already a pending
// request.
// Request the block if there is not one already pending.
if _, exists := m.requestedBlocks[iv.Hash]; !exists {
limitAdd(m.requestedBlocks, iv.Hash, maxRequestedBlocks)
limitAdd(peer.requestedBlocks, iv.Hash, maxRequestedBlocks)
gdmsg.AddInvVect(iv)
numRequested++
requestQueue = append(requestQueue, iv)
}

case wire.InvTypeTx:
// Request the transaction if there is not already a
// pending request.
// Add the tx to the cache of known inventory for the peer. This
// helps avoid sending transactions to the peer that it is already
// known to have.
peer.AddKnownInventory(iv)

// Ignore transaction announcements before the chain is current or
// are otherwise not needed, such as when they were recently
// rejected or are already known.
//
// Transaction announcements are based on the state of the fully
// synced ledger, so they are likely to be invalid before the chain
// is current.
if !isCurrent || !m.needTx(&iv.Hash) {
continue
}

// Request the transaction if there is not one already pending.
if _, exists := m.requestedTxns[iv.Hash]; !exists {
limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns)
limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns)
gdmsg.AddInvVect(iv)
numRequested++
requestQueue = append(requestQueue, iv)
}
}
}

if lastBlock != nil {
// When the block is an orphan that we already have, the missing parent
// blocks were requested when the orphan was processed. In that case,
// there were more blocks missing than are allowed into a single
// inventory message. As a result, once this peer requested the final
// advertised block, the remote peer noticed and is now resending the
// orphan block as an available block to signal there are more missing
// blocks that need to be requested.
if m.isKnownOrphan(&lastBlock.Hash) {
// Request blocks starting at the latest known up to the root of the
// orphan that just came in.
orphanRoot := m.orphanRoot(&lastBlock.Hash)
blkLocator := m.cfg.Chain.LatestBlockLocator()
locator := chainBlockLocatorToHashes(blkLocator)
err := peer.PushGetBlocksMsg(locator, orphanRoot)
if err != nil {
log.Errorf("Failed to push getblocksmsg for orphan chain: %v",
err)
}
}

// Update the last announced block to the final one in the announced
// inventory above (if any). In the case the header for that block is
// already known, use that information to update the height for the peer
// too.
peer.UpdateLastAnnouncedBlock(&lastBlock.Hash)
if isCurrent {
header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash)
if err == nil {
peer.UpdateLastBlockHeight(int64(header.Height))
}
}

// Request blocks after the last one advertised up to the final one the
// remote peer knows about (zero stop hash).
blkLocator := m.cfg.Chain.BlockLocatorFromHash(&lastBlock.Hash)
locator := chainBlockLocatorToHashes(blkLocator)
err := peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
log.Errorf("Failed to push getblocksmsg: %v", err)
}
}

// Request as much as possible at once.
var numRequested int32
gdmsg := wire.NewMsgGetData()
for _, iv := range requestQueue {
gdmsg.AddInvVect(iv)
numRequested++
if numRequested == wire.MaxInvPerMsg {
// Send full getdata message and reset.
//
// NOTE: There should never be more than wire.MaxInvPerMsg
// in the inv request, so we could return after the
// QueueMessage, but this is more safe.
// NOTE: There should never be more than wire.MaxInvPerMsg in the
// inv request, so this could return after the QueueMessage, but
// this is safer.
peer.QueueMessage(gdmsg, nil)
gdmsg = wire.NewMsgGetData()
numRequested = 0
Expand Down Expand Up @@ -1960,6 +1886,11 @@ type Config struct {
// MaxOrphanTxs specifies the maximum number of orphan transactions the
// transaction pool associated with the server supports.
MaxOrphanTxs int

// RecentlyConfirmedTxns specifies a size limited set to use for tracking
// and querying the most recently confirmed transactions. It is useful for
// preventing duplicate requests.
RecentlyConfirmedTxns *lru.Cache
}

// New returns a new network chain synchronization manager. Use Run to begin
Expand Down
Loading

0 comments on commit 33c39f2

Please sign in to comment.