Opportunistically download missed/deleted blocks
marcopeereboom committed Nov 5, 2024
1 parent ee5144a commit dbea1d5
Showing 2 changed files with 35 additions and 16 deletions.
database/tbcd/level/level.go (1 addition & 1 deletion)
@@ -746,7 +746,7 @@ func (l *ldb) BlockByHash(ctx context.Context, hash *chainhash.Hash) (*btcutil.B
eb, err := bDB.Get(hash[:])
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return nil, database.BlockNotFoundError(*hash)
return nil, database.BlockNotFoundError{*hash}
}
return nil, fmt.Errorf("block get: %w", err)
}
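The switch from the conversion database.BlockNotFoundError(*hash) to the composite literal database.BlockNotFoundError{*hash}, together with the eval.Hash read in tbc.go below, suggests the error type is now a struct carrying the missing block hash. Its definition is not part of this diff; a plausible shape consistent with both call sites, shown here only as a sketch, is:

package database

import (
	"fmt"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
)

// Hypothetical sketch; the real definition is not included in this commit.
// A single Hash field satisfies the positional literal BlockNotFoundError{*hash}
// in level.go and the eval.Hash access in tbc.go, and the value receiver keeps
// errors.As usable with a non-pointer target.
type BlockNotFoundError struct {
	Hash chainhash.Hash
}

func (e BlockNotFoundError) Error() string {
	return fmt.Sprintf("block not found: %v", e.Hash)
}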
service/tbc/tbc.go (34 additions & 15 deletions)
@@ -351,7 +351,7 @@ func (s *Server) mempoolPeer(ctx context.Context, p *peer) {
}

func (s *Server) headersPeer(ctx context.Context, p *peer) {
log.Infof("headersPeer %v", p)
log.Tracef("headersPeer %v", p)
defer log.Tracef("headersPeer %v exit", p)

bhb, err := s.db.BlockHeaderBest(ctx)
@@ -743,6 +743,21 @@ func (s *Server) downloadBlock(ctx context.Context, p *peer, ch *chainhash.Hash)
return err
}

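// downloadBlockFromRandomPeer requests the given block from a randomly
// selected peer.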
func (s *Server) downloadBlockFromRandomPeer(ctx context.Context, block *chainhash.Hash) error {
log.Tracef("downloadBlockFromRandomPeer")
defer log.Tracef("downloadBlockFromRandomPeer exit")

rp, err := s.pm.Random()
if err != nil {
return fmt.Errorf("random peer %v: %w", block, err)
}
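// Record the block as pending with a timeout and an expiry callback,
// then fetch it asynchronously from the chosen peer.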
s.blocks.Put(ctx, defaultBlockPendingTimeout, block.String(), rp,
s.blockExpired, nil)
go s.downloadBlock(ctx, rp, block)

return nil
}

func (s *Server) handleBlockExpired(ctx context.Context, key any, value any) error {
log.Tracef("handleBlockExpired")
defer log.Tracef("handleBlockExpired exit")
@@ -870,22 +885,31 @@ func (s *Server) syncBlocks(ctx context.Context) {
}
// XXX rethink closure, this is because of index flag mutex.
go func() {
var eval *database.BlockNotFoundError
var (
eval database.BlockNotFoundError
block chainhash.Hash
)
err := s.SyncIndexersToBest(ctx)
switch {
case errors.Is(err, nil):
fallthrough
case errors.Is(err, context.Canceled):
return
case errors.Is(err, ErrAlreadyIndexing):
return

case errors.As(err, &eval):
block := chainhash.Hash(*eval)
panic(block)
block = eval.Hash
err := s.downloadBlockFromRandomPeer(ctx, &block)
if err != nil {
log.Errorf("download block random peer: %v", err)
} else {
log.Infof("Download missed block: %v", block)
}
return

default:
panic(fmt.Errorf("sync blocks: %T %w", err, err))
}
log.Infof("flush -- %v", spew.Sdump(s.Synced(ctx).Synced))

// Get block headers that we missed during indexing.
if s.Synced(ctx).Synced {
@@ -896,15 +920,13 @@ func (s *Server) syncBlocks(ctx context.Context) {

// Fixup ib array to not ask for block headers
// we already have.
log.Infof("flush missed headers %v", len(ib))
ib = slices.DeleteFunc(ib, func(h *chainhash.Hash) bool {
_, _, err := s.BlockHeaderByHash(ctx, h)
return err == nil
})

// Flush out blocks we saw during quiesce.
log.Infof("flush missed headers AFTER %v", len(ib))
log.Debugf("flush missed headers %v", len(ib))
log.Debugf("download missed block headers %v", len(ib))

if len(ib) == 0 {
log.Debugf("nothing to do")
@@ -936,17 +958,13 @@ func (s *Server) syncBlocks(ctx context.Context) {
// Already being downloaded.
continue
}
rp, err := s.pm.Random()
if err != nil {
if err := s.downloadBlockFromRandomPeer(ctx, hash); err != nil {
// This can happen during startup or when the network
// is starved.
// XXX: Probably too loud, remove later.
log.Errorf("random peer %v: %v", hashS, err)
return
}
s.blocks.Put(ctx, defaultBlockPendingTimeout, hashS, rp,
s.blockExpired, nil)
go s.downloadBlock(ctx, rp, hash)
}
}

@@ -977,7 +995,8 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader
if err != nil {
log.Errorf("blockheaders %v: %v", p, err)
} else {
log.Infof("blockheaders caught up at %v: %v", p, bhb.HH())
log.Debugf("blockheaders caught up at %v: %v",
p, bhb.HH())
}
} else {
if s.cfg.MempoolEnabled {
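For reference, the error handling that syncBlocks above now relies on can be exercised in isolation. The following is a self-contained sketch, not code from this repository: it uses a simplified stand-in for database.BlockNotFoundError (a plain [32]byte hash, so it runs without dependencies) and shows errors.As matching a wrapped, value-typed error, which is what turns an indexer failure into a re-download request instead of a panic.

package main

import (
	"errors"
	"fmt"
)

// BlockNotFoundError is a stand-in for database.BlockNotFoundError; the real
// type is assumed, not shown in this commit.
type BlockNotFoundError struct {
	Hash [32]byte
}

func (e BlockNotFoundError) Error() string {
	return fmt.Sprintf("block not found: %x", e.Hash)
}

// syncIndexersToBest stands in for s.SyncIndexersToBest and fails on a block
// that is missing from the block store.
func syncIndexersToBest() error {
	var missing [32]byte
	missing[31] = 0x2a
	return fmt.Errorf("index blocks: %w", BlockNotFoundError{Hash: missing})
}

func main() {
	var eval BlockNotFoundError
	err := syncIndexersToBest()
	switch {
	case errors.Is(err, nil):
		// Indexing finished cleanly; nothing to do.
	case errors.As(err, &eval):
		// The wrapped error carries the hash of the missing block; this is
		// the point where syncBlocks would call downloadBlockFromRandomPeer.
		fmt.Printf("re-request block %x from a random peer\n", eval.Hash)
	default:
		panic(err)
	}
}

Note that a non-pointer errors.As target, as the diff uses with var eval database.BlockNotFoundError, requires Error() to be declared on the value receiver, which the sketch assumes.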
