From a14800deabcb081bfe5054affa17f39426b9fe53 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Tue, 5 Nov 2024 07:21:59 -0500 Subject: [PATCH] Make invBlock an array instead of a map to maintain some sort of order. Also add a new error type in database so that the caller can figure out which block is missing. --- database/database.go | 24 ++++++++++++-- database/database_test.go | 21 ++++++++++++ database/tbcd/level/level.go | 2 +- service/tbc/tbc.go | 64 ++++++++++++++++++++++++++---------- 4 files changed, 89 insertions(+), 22 deletions(-) diff --git a/database/database.go b/database/database.go index 285860ad..bdc80d97 100644 --- a/database/database.go +++ b/database/database.go @@ -14,6 +14,8 @@ import ( "strconv" "strings" "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" ) type Database interface { @@ -35,6 +37,21 @@ func (nfe NotFoundError) Is(target error) bool { return ok } +type BlockNotFoundError chainhash.Hash + +func (bnfe BlockNotFoundError) Error() string { + return fmt.Sprintf("block not found: %v", chainhash.Hash(bnfe).String()) +} + +func (bnfe BlockNotFoundError) Is(target error) bool { + _, ok := target.(BlockNotFoundError) + return ok +} + +func (bnfe BlockNotFoundError) Hash() chainhash.Hash { + return chainhash.Hash(bnfe) +} + type DuplicateError string func (de DuplicateError) Error() string { @@ -69,9 +86,10 @@ func (ze ZeroRowsError) Is(target error) bool { } var ( - ErrDuplicate = DuplicateError("duplicate") - ErrNotFound = NotFoundError("not found") - ErrValidation = ValidationError("validation") + ErrDuplicate = DuplicateError("duplicate") + ErrNotFound = NotFoundError("not found") + ErrValidation = ValidationError("validation") + ErrBlockNotFound = BlockNotFoundError(chainhash.Hash{}) ) // ByteArray is a type that corresponds to BYTEA in a database. It supports diff --git a/database/database_test.go b/database/database_test.go index 80b5ce2a..ece15d78 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -7,11 +7,13 @@ package database import ( "bytes" "encoding/json" + "errors" "fmt" "reflect" "testing" "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/davecgh/go-spew/spew" ) @@ -329,3 +331,22 @@ func TestTimeZoneScan(t *testing.T) { } } } + +func TestErrors(t *testing.T) { + hash, err := chainhash.NewHashFromStr("000000000000098faa89ab34c3ec0e6e037698e3e54c8d1bbb9dcfe0054a8e7a") + if err != nil { + t.Fatal(err) + } + err = BlockNotFoundError(*hash) + if !errors.Is(err, ErrBlockNotFound) { + t.Fatalf("expected block not found, got %T", err) + } + err = fmt.Errorf("wrap %w", err) + if !errors.Is(err, ErrBlockNotFound) { + t.Fatalf("expected wrapped block not found, got %T", err) + } + err = errors.New("moo") + if errors.Is(err, ErrBlockNotFound) { + t.Fatalf("did not expected block not found, got %T", err) + } +} diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index 3759e2de..e0e5a063 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -746,7 +746,7 @@ func (l *ldb) BlockByHash(ctx context.Context, hash *chainhash.Hash) (*btcutil.B eb, err := bDB.Get(hash[:]) if err != nil { if errors.Is(err, leveldb.ErrNotFound) { - return nil, database.NotFoundError(fmt.Sprintf("block not found: %v", hash)) + return nil, database.BlockNotFoundError(*hash) } return nil, fmt.Errorf("block get: %w", err) } diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 637b1ce2..fd03d889 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -122,7 +122,7 @@ type Server struct { broadcast map[chainhash.Hash]*wire.MsgTx // inv blocks see - invBlocks map[chainhash.Hash]struct{} + invBlocks []*chainhash.Hash // bitcoin network seeds []string // XXX remove @@ -185,7 +185,7 @@ func NewServer(cfg *Config) (*Server, error) { sessions: make(map[string]*tbcWs), requestTimeout: defaultRequestTimeout, broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16), - invBlocks: make(map[chainhash.Hash]struct{}, 16), + invBlocks: make([]*chainhash.Hash, 0, 16), promPollVerbose: false, } @@ -227,6 +227,24 @@ func NewServer(cfg *Config) (*Server, error) { return s, nil } +func (s *Server) invInsertUnlocked(h chainhash.Hash) bool { + for k := range s.invBlocks { + if s.invBlocks[k].IsEqual(&h) { + return false + } + } + + // Not found, thus return true for inserted + s.invBlocks = append(s.invBlocks, &h) + return true +} + +func (s *Server) invInsert(h chainhash.Hash) bool { + s.mtx.Lock() + defer s.mtx.Unlock() + return s.invInsertUnlocked(h) +} + func (s *Server) getHeadersByHashes(ctx context.Context, p *peer, hashes ...*chainhash.Hash) error { log.Tracef("getHeadersByHashes %v %v", p, hashes) defer log.Tracef("getHeadersByHashes exit %v %v", p, hashes) @@ -852,29 +870,37 @@ func (s *Server) syncBlocks(ctx context.Context) { } // XXX rethink closure, this is because of index flag mutex. go func() { - if err = s.SyncIndexersToBest(ctx); err != nil && !errors.Is(err, ErrAlreadyIndexing) && !errors.Is(err, context.Canceled) { - // XXX this is probably not a panic. + err := s.SyncIndexersToBest(ctx) + switch { + case errors.Is(err, nil): + fallthrough + case errors.Is(err, context.Canceled): + return + case errors.Is(err, ErrAlreadyIndexing): + return + case errors.Is(err, database.ErrBlockNotFound): + panic(err) + default: panic(fmt.Errorf("sync blocks: %T %w", err, err)) } // Get block headers that we missed during indexing. if s.Synced(ctx).Synced { s.mtx.Lock() - ib := make([]*chainhash.Hash, 0, len(s.invBlocks)) - for k := range s.invBlocks { - ib = append(ib, &k) - } - clear(s.invBlocks) + ib := s.invBlocks + s.invBlocks = make([]*chainhash.Hash, 0, 16) s.mtx.Unlock() // Fixup ib array to not ask for block headers // we already have. + log.Infof("flush missed headers %v", len(ib)) ib = slices.DeleteFunc(ib, func(h *chainhash.Hash) bool { _, _, err := s.BlockHeaderByHash(ctx, h) return err == nil }) // Flush out blocks we saw during quiece. + log.Infof("flush missed headers AFTER %v", len(ib)) log.Debugf("flush missed headers %v", len(ib)) if len(ib) == 0 { @@ -928,12 +954,11 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader // When quiesced do not handle headers but do cache them. s.mtx.Lock() if s.indexing { + x := len(s.invBlocks) for k := range msg.Headers { - if _, ok := s.invBlocks[msg.Headers[k].BlockHash()]; !ok { - s.invBlocks[msg.Headers[k].BlockHash()] = struct{}{} - } + s.invInsertUnlocked(msg.Headers[k].BlockHash()) } - if len(s.invBlocks) != 0 { + if len(s.invBlocks) != x { log.Infof("handleHeaders indexing %v %v", len(msg.Headers), len(s.invBlocks)) } @@ -1193,12 +1218,15 @@ func (s *Server) handleInv(ctx context.Context, p *peer, msg *wire.MsgInv, raw [ // at a time while taking a mutex. txsFound = true case wire.InvTypeBlock: - log.Infof("inventory block: %v", v.Hash) - s.mtx.Lock() - if _, ok := s.invBlocks[v.Hash]; !ok { - s.invBlocks[v.Hash] = struct{}{} + // Make sure we haven't seen block header yet. + _, _, err := s.BlockHeaderByHash(ctx, &v.Hash) + if err == nil { + return nil + } + // XXX we should grab it from p2p if not indexing + if s.invInsert(v.Hash) { + log.Infof("inventory block: %v", v.Hash) } - s.mtx.Unlock() case wire.InvTypeFilteredBlock: log.Debugf("inventory filtered block: %v", v.Hash) case wire.InvTypeWitnessBlock: