Skip to content

Commit

Permalink
Make invBlock an array instead of a map to maintain some sort of order.
Browse files Browse the repository at this point in the history
Also add a new error type in database so that the caller can figure out
which block is missing.
  • Loading branch information
marcopeereboom committed Nov 5, 2024
1 parent 5742d57 commit a14800d
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 22 deletions.
24 changes: 21 additions & 3 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"strconv"
"strings"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
)

type Database interface {
Expand All @@ -35,6 +37,21 @@ func (nfe NotFoundError) Is(target error) bool {
return ok
}

type BlockNotFoundError chainhash.Hash

func (bnfe BlockNotFoundError) Error() string {
return fmt.Sprintf("block not found: %v", chainhash.Hash(bnfe).String())
}

func (bnfe BlockNotFoundError) Is(target error) bool {
_, ok := target.(BlockNotFoundError)
return ok
}

func (bnfe BlockNotFoundError) Hash() chainhash.Hash {
return chainhash.Hash(bnfe)
}

type DuplicateError string

func (de DuplicateError) Error() string {
Expand Down Expand Up @@ -69,9 +86,10 @@ func (ze ZeroRowsError) Is(target error) bool {
}

var (
ErrDuplicate = DuplicateError("duplicate")
ErrNotFound = NotFoundError("not found")
ErrValidation = ValidationError("validation")
ErrDuplicate = DuplicateError("duplicate")
ErrNotFound = NotFoundError("not found")
ErrValidation = ValidationError("validation")
ErrBlockNotFound = BlockNotFoundError(chainhash.Hash{})
)

// ByteArray is a type that corresponds to BYTEA in a database. It supports
Expand Down
21 changes: 21 additions & 0 deletions database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package database
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"reflect"
"testing"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/davecgh/go-spew/spew"
)

Expand Down Expand Up @@ -329,3 +331,22 @@ func TestTimeZoneScan(t *testing.T) {
}
}
}

func TestErrors(t *testing.T) {
hash, err := chainhash.NewHashFromStr("000000000000098faa89ab34c3ec0e6e037698e3e54c8d1bbb9dcfe0054a8e7a")
if err != nil {
t.Fatal(err)
}
err = BlockNotFoundError(*hash)
if !errors.Is(err, ErrBlockNotFound) {
t.Fatalf("expected block not found, got %T", err)
}
err = fmt.Errorf("wrap %w", err)
if !errors.Is(err, ErrBlockNotFound) {
t.Fatalf("expected wrapped block not found, got %T", err)
}
err = errors.New("moo")
if errors.Is(err, ErrBlockNotFound) {
t.Fatalf("did not expected block not found, got %T", err)
}
}
2 changes: 1 addition & 1 deletion database/tbcd/level/level.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (l *ldb) BlockByHash(ctx context.Context, hash *chainhash.Hash) (*btcutil.B
eb, err := bDB.Get(hash[:])
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return nil, database.NotFoundError(fmt.Sprintf("block not found: %v", hash))
return nil, database.BlockNotFoundError(*hash)
}
return nil, fmt.Errorf("block get: %w", err)
}
Expand Down
64 changes: 46 additions & 18 deletions service/tbc/tbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type Server struct {
broadcast map[chainhash.Hash]*wire.MsgTx

// inv blocks see
invBlocks map[chainhash.Hash]struct{}
invBlocks []*chainhash.Hash

// bitcoin network
seeds []string // XXX remove
Expand Down Expand Up @@ -185,7 +185,7 @@ func NewServer(cfg *Config) (*Server, error) {
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
invBlocks: make(map[chainhash.Hash]struct{}, 16),
invBlocks: make([]*chainhash.Hash, 0, 16),
promPollVerbose: false,
}

Expand Down Expand Up @@ -227,6 +227,24 @@ func NewServer(cfg *Config) (*Server, error) {
return s, nil
}

func (s *Server) invInsertUnlocked(h chainhash.Hash) bool {
for k := range s.invBlocks {
if s.invBlocks[k].IsEqual(&h) {
return false
}
}

// Not found, thus return true for inserted
s.invBlocks = append(s.invBlocks, &h)
return true
}

func (s *Server) invInsert(h chainhash.Hash) bool {
s.mtx.Lock()
defer s.mtx.Unlock()
return s.invInsertUnlocked(h)
}

func (s *Server) getHeadersByHashes(ctx context.Context, p *peer, hashes ...*chainhash.Hash) error {
log.Tracef("getHeadersByHashes %v %v", p, hashes)
defer log.Tracef("getHeadersByHashes exit %v %v", p, hashes)
Expand Down Expand Up @@ -852,29 +870,37 @@ func (s *Server) syncBlocks(ctx context.Context) {
}
// XXX rethink closure, this is because of index flag mutex.
go func() {
if err = s.SyncIndexersToBest(ctx); err != nil && !errors.Is(err, ErrAlreadyIndexing) && !errors.Is(err, context.Canceled) {
// XXX this is probably not a panic.
err := s.SyncIndexersToBest(ctx)
switch {
case errors.Is(err, nil):
fallthrough
case errors.Is(err, context.Canceled):
return
case errors.Is(err, ErrAlreadyIndexing):
return
case errors.Is(err, database.ErrBlockNotFound):
panic(err)
default:
panic(fmt.Errorf("sync blocks: %T %w", err, err))
}

// Get block headers that we missed during indexing.
if s.Synced(ctx).Synced {
s.mtx.Lock()
ib := make([]*chainhash.Hash, 0, len(s.invBlocks))
for k := range s.invBlocks {
ib = append(ib, &k)
}
clear(s.invBlocks)
ib := s.invBlocks
s.invBlocks = make([]*chainhash.Hash, 0, 16)
s.mtx.Unlock()

// Fixup ib array to not ask for block headers
// we already have.
log.Infof("flush missed headers %v", len(ib))
ib = slices.DeleteFunc(ib, func(h *chainhash.Hash) bool {
_, _, err := s.BlockHeaderByHash(ctx, h)
return err == nil
})

// Flush out blocks we saw during quiece.
log.Infof("flush missed headers AFTER %v", len(ib))
log.Debugf("flush missed headers %v", len(ib))

if len(ib) == 0 {
Expand Down Expand Up @@ -928,12 +954,11 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader
// When quiesced do not handle headers but do cache them.
s.mtx.Lock()
if s.indexing {
x := len(s.invBlocks)
for k := range msg.Headers {
if _, ok := s.invBlocks[msg.Headers[k].BlockHash()]; !ok {
s.invBlocks[msg.Headers[k].BlockHash()] = struct{}{}
}
s.invInsertUnlocked(msg.Headers[k].BlockHash())
}
if len(s.invBlocks) != 0 {
if len(s.invBlocks) != x {
log.Infof("handleHeaders indexing %v %v",
len(msg.Headers), len(s.invBlocks))
}
Expand Down Expand Up @@ -1193,12 +1218,15 @@ func (s *Server) handleInv(ctx context.Context, p *peer, msg *wire.MsgInv, raw [
// at a time while taking a mutex.
txsFound = true
case wire.InvTypeBlock:
log.Infof("inventory block: %v", v.Hash)
s.mtx.Lock()
if _, ok := s.invBlocks[v.Hash]; !ok {
s.invBlocks[v.Hash] = struct{}{}
// Make sure we haven't seen block header yet.
_, _, err := s.BlockHeaderByHash(ctx, &v.Hash)
if err == nil {
return nil
}
// XXX we should grab it from p2p if not indexing
if s.invInsert(v.Hash) {
log.Infof("inventory block: %v", v.Hash)
}
s.mtx.Unlock()
case wire.InvTypeFilteredBlock:
log.Debugf("inventory filtered block: %v", v.Hash)
case wire.InvTypeWitnessBlock:
Expand Down

0 comments on commit a14800d

Please sign in to comment.