From 66dab1af3620034ff95c2f096fcf2ac02c6eab9d Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Wed, 5 Jun 2024 12:04:55 +0100 Subject: [PATCH] mostly working but ugly interleaved start/stop downloading/indexing --- service/tbc/crawler.go | 55 ++++- service/tbc/tbc.go | 395 ++++++++++++++++++++---------------- service/tbc/tbcfork_test.go | 2 +- 3 files changed, 268 insertions(+), 184 deletions(-) diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 2a7218275..19cc51a42 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -8,7 +8,6 @@ import ( "context" "crypto/sha256" "encoding/binary" - "encoding/hex" "errors" "fmt" "runtime" @@ -56,7 +55,7 @@ func processUtxos(cp *chaincfg.Params, txs []*btcutil.Tx, utxos map[tbcd.Outpoin op := tbcd.NewOutpoint(txIn.PreviousOutPoint.Hash, txIn.PreviousOutPoint.Index) if utxo, ok := utxos[op]; ok && !utxo.IsDelete() { - log.Infof("deleting utxo %s value %d", hex.EncodeToString(utxo.ScriptHashSlice()), utxo.Value()) + // log.Infof("deleting utxo %s value %d", hex.EncodeToString(utxo.ScriptHashSlice()), utxo.Value()) delete(utxos, op) continue } @@ -66,8 +65,8 @@ func processUtxos(cp *chaincfg.Params, txs []*btcutil.Tx, utxos map[tbcd.Outpoin continue } - scriptHash := sha256.Sum256(txOut.PkScript) - log.Infof("adding utxo to script hash %s value %d", hex.EncodeToString(scriptHash[:]), uint64(txOut.Value)) + // scriptHash := sha256.Sum256(txOut.PkScript) + // log.Infof("adding utxo to script hash %s value %d", hex.EncodeToString(scriptHash[:]), uint64(txOut.Value)) utxos[tbcd.NewOutpoint(*tx.Hash(), uint32(outIndex))] = tbcd.NewCacheOutput( sha256.Sum256(txOut.PkScript), @@ -164,7 +163,7 @@ func (s *Server) indexUtxosInBlocks(ctx context.Context, startHeight, maxHeight } // At this point we can lockless since it is all single // threaded again. - log.Infof("processing utxo at height %d", height) + // log.Infof("processing utxo at height %d", height) err = processUtxos(s.chainParams, b.Transactions(), utxos) if err != nil { return 0, fmt.Errorf("process utxos %v: %w", height, err) @@ -407,12 +406,55 @@ func (s *Server) TxIndexer(ctx context.Context, height, count uint64) error { } } -// SyncIndexersToHeight tries to move the various indexers to the suplied +// SyncIndexersToHeight tries to move the various indexers to the supplied // height (inclusive). func (s *Server) SyncIndexersToHeight(ctx context.Context, height uint64) error { + log.Infof("SyncIndexersToHeight %v", height) + defer log.Infof("SyncIndexersToHeight exit") + log.Tracef("SyncIndexersToHeight") defer log.Tracef("SyncIndexersToHeight exit") + s.mtx.Lock() + if s.indexing { + s.mtx.Unlock() + return fmt.Errorf("already indexing") + } + s.indexing = true + s.mtx.Unlock() + + defer func() { + // unquiesce + s.mtx.Lock() + s.quiesced = false + s.indexing = false + s.clipped = false + actualHeight, bhb, err := s.RawBlockHeaderBest(ctx) + if err != nil { + log.Errorf("sync indexers best: %v", err) + s.mtx.Unlock() + return + } + // get a random peer + p, err := s.randomPeer(ctx) + if err != nil { + log.Errorf("sync indexers random peer: %v", err) + s.mtx.Unlock() + return + } + s.mtx.Unlock() + + // continue getting headers, XXX this does not belong here either + // XXX if bh download fails we will get jammed. We need a queued "must execute this command" added to peer/service. + log.Infof("resuming block header download at: %v", actualHeight) + err = s.getHeaders(ctx, p, bhb) + if err != nil { + log.Errorf("sync indexers: %v", err) + return + } + }() + + log.Infof("working") // Outputs index uhBE, err := s.db.MetadataGet(ctx, UtxoIndexHeightKey) if err != nil { @@ -446,6 +488,7 @@ func (s *Server) SyncIndexersToHeight(ctx context.Context, height uint64) error return fmt.Errorf("tx indexer: %w", err) } } + log.Infof("done working") return nil } diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index fe5976cd1..84f902950 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -220,8 +220,11 @@ type Server struct { pings *ttl.TTL // outstanding pings // reentrancy flags for the indexers - utxoIndexerRunning bool - txIndexerRunning bool + // utxoIndexerRunning bool + // txIndexerRunning bool + quiesced bool // when set do not accept blockheaders and ot blocks. + clipped bool // XXX kill including all surrounding code, this is for test only + indexing bool // prevent re-entrant indexing db tbcd.Database @@ -688,30 +691,40 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { spew.Sdump(msg) } - // XXX send wire message to pool reader + // Commands that are always accepted. switch m := msg.(type) { case *wire.MsgAddr: go s.handleAddr(ctx, p, m) + continue case *wire.MsgAddrV2: go s.handleAddrV2(ctx, p, m) + continue - case *wire.MsgBlock: - go s.handleBlock(ctx, p, m) + case *wire.MsgPing: + go s.handlePing(ctx, p, m) + continue - case *wire.MsgFeeFilter: - // XXX shut up + case *wire.MsgPong: + go s.handlePong(ctx, p, m) + continue + } - case *wire.MsgInv: - go s.handleInv(ctx, p, m) + // When quiesced do not handle other p2p commands. + s.mtx.Lock() + quiesced := s.quiesced + s.mtx.Unlock() + if quiesced { + continue + } + switch m := msg.(type) { case *wire.MsgHeaders: go s.handleHeaders(ctx, p, m) - case *wire.MsgPing: - go s.handlePing(ctx, p, m) - case *wire.MsgPong: - go s.handlePong(ctx, p, m) + case *wire.MsgBlock: + go s.handleBlock(ctx, p, m) + default: log.Tracef("unhandled message type %v: %T\n", p, msg) } @@ -826,164 +839,164 @@ func (s *Server) handlePong(ctx context.Context, p *peer, pong *wire.MsgPong) { log.Tracef("handlePong %v: pong %v", p.address, pong.Nonce) } -func (s *Server) handleInv(ctx context.Context, p *peer, msg *wire.MsgInv) { - log.Tracef("handleInv (%v)", p) - defer log.Tracef("handleInv exit (%v)", p) - - // XXX this currently does nothing. Blocks are requested elsewhere. - if true { - return - } - - var bis []tbcd.BlockIdentifier - for k := range msg.InvList { - switch msg.InvList[k].Type { - case wire.InvTypeBlock: - - // XXX height is missing here, looks right but assert - // that this isn't broken. - log.Infof("handleInv: block %v", msg.InvList[k].Hash) - - bis = append(bis, tbcd.BlockIdentifier{ - Hash: msg.InvList[k].Hash[:], // fake out - }) - log.Infof("handleInv: block %v", msg.InvList[k].Hash) - case wire.InvTypeTx: - // XXX silence mempool for now - return - default: - log.Infof("handleInv: skipping inv type %v", msg.InvList[k].Type) - return - } - } - - // XXX This happens during block header download, we should not react - // Probably move into the invtype switch - // log.Infof("download blocks if we like them") - // if len(bis) > 0 { - // s.mtx.Lock() - // defer s.mtx.Unlock() - // err := s.downloadBlocks(ctx, bis) - // if err != nil { - // log.Errorf("download blocks: %v", err) - // return - // } - // } -} - -func (s *Server) txIndexer(ctx context.Context) { - log.Tracef("txIndexer") - defer log.Tracef("txIndexer exit") - - if !s.cfg.AutoIndex { - return - } - - // only one txIndexer may run at any given time - s.mtx.Lock() - if s.txIndexerRunning { - s.mtx.Unlock() - return - } - s.txIndexerRunning = true - s.mtx.Unlock() - - // mark txIndexer not running on exit - defer func() { - s.mtx.Lock() - s.txIndexerRunning = false - s.mtx.Unlock() - }() - - if s.blocksMissing(ctx) { - return - } - - // Get height from db - he, err := s.db.MetadataGet(ctx, TxIndexHeightKey) - if err != nil { - if !errors.Is(err, database.ErrNotFound) { - log.Errorf("tx indexer metadata get: %v", err) - return - } - he = make([]byte, 8) - } - h := binary.BigEndian.Uint64(he) - - // Skip txIndexer if we are at best block height. This is a bit racy. - bhb, err := s.db.BlockHeaderBest(ctx) - if err != nil { - log.Errorf("utxo indexer block headers best: %v", err) - return - } - if bhb.Height != h-1 { - err = s.TxIndexer(ctx, h, 0) - if err != nil { - log.Errorf("tx indexer: %v", err) - return - } - } -} - -func (s *Server) utxoIndexer(ctx context.Context) { - log.Tracef("utxoIndexer") - defer log.Tracef("utxoIndexer exit") - - if !s.cfg.AutoIndex { - return - } - - // only one utxoIndexer may run at any given time - s.mtx.Lock() - if s.utxoIndexerRunning { - s.mtx.Unlock() - return - } - s.utxoIndexerRunning = true - s.mtx.Unlock() - - // mark utxoIndexer not running on exit - defer func() { - s.mtx.Lock() - s.utxoIndexerRunning = false - s.mtx.Unlock() - }() - - // exit if we aren't synced - if s.blocksMissing(ctx) { - return - } - - // Index all utxos - - // Get height from db - he, err := s.db.MetadataGet(ctx, UtxoIndexHeightKey) - if err != nil { - if !errors.Is(err, database.ErrNotFound) { - log.Errorf("utxo indexer metadata get: %v", err) - return - } - he = make([]byte, 8) - } - h := binary.BigEndian.Uint64(he) - - // Skip UtxoIndex if we are at best block height. This is a bit racy. - bhb, err := s.db.BlockHeaderBest(ctx) - if err != nil { - log.Errorf("utxo indexer block headers best: %v", err) - return - } - if bhb.Height != h-1 { - err = s.UtxoIndexer(ctx, h, 0) - if err != nil { - log.Errorf("utxo indexer: %v", err) - return - } - } - - // When utxo sync completes kick off tx sync - go s.txIndexer(ctx) -} +//func (s *Server) handleInv(ctx context.Context, p *peer, msg *wire.MsgInv) { +// log.Tracef("handleInv (%v)", p) +// defer log.Tracef("handleInv exit (%v)", p) +// +// // XXX this currently does nothing. Blocks are requested elsewhere. +// if true { +// return +// } +// +// var bis []tbcd.BlockIdentifier +// for k := range msg.InvList { +// switch msg.InvList[k].Type { +// case wire.InvTypeBlock: +// +// // XXX height is missing here, looks right but assert +// // that this isn't broken. +// log.Infof("handleInv: block %v", msg.InvList[k].Hash) +// +// bis = append(bis, tbcd.BlockIdentifier{ +// Hash: msg.InvList[k].Hash[:], // fake out +// }) +// log.Infof("handleInv: block %v", msg.InvList[k].Hash) +// case wire.InvTypeTx: +// // XXX silence mempool for now +// return +// default: +// log.Infof("handleInv: skipping inv type %v", msg.InvList[k].Type) +// return +// } +// } +// +// // XXX This happens during block header download, we should not react +// // Probably move into the invtype switch +// // log.Infof("download blocks if we like them") +// // if len(bis) > 0 { +// // s.mtx.Lock() +// // defer s.mtx.Unlock() +// // err := s.downloadBlocks(ctx, bis) +// // if err != nil { +// // log.Errorf("download blocks: %v", err) +// // return +// // } +// // } +//} + +//func (s *Server) txIndexer(ctx context.Context) { +// log.Tracef("txIndexer") +// defer log.Tracef("txIndexer exit") +// +// if !s.cfg.AutoIndex { +// return +// } +// +// // only one txIndexer may run at any given time +// s.mtx.Lock() +// if s.txIndexerRunning { +// s.mtx.Unlock() +// return +// } +// s.txIndexerRunning = true +// s.mtx.Unlock() +// +// // mark txIndexer not running on exit +// defer func() { +// s.mtx.Lock() +// s.txIndexerRunning = false +// s.mtx.Unlock() +// }() +// +// if s.blocksMissing(ctx) { +// return +// } +// +// // Get height from db +// he, err := s.db.MetadataGet(ctx, TxIndexHeightKey) +// if err != nil { +// if !errors.Is(err, database.ErrNotFound) { +// log.Errorf("tx indexer metadata get: %v", err) +// return +// } +// he = make([]byte, 8) +// } +// h := binary.BigEndian.Uint64(he) +// +// // Skip txIndexer if we are at best block height. This is a bit racy. +// bhb, err := s.db.BlockHeaderBest(ctx) +// if err != nil { +// log.Errorf("utxo indexer block headers best: %v", err) +// return +// } +// if bhb.Height != h-1 { +// err = s.TxIndexer(ctx, h, 0) +// if err != nil { +// log.Errorf("tx indexer: %v", err) +// return +// } +// } +//} + +//func (s *Server) utxoIndexer(ctx context.Context) { +// log.Tracef("utxoIndexer") +// defer log.Tracef("utxoIndexer exit") +// +// if !s.cfg.AutoIndex { +// return +// } +// +// // only one utxoIndexer may run at any given time +// s.mtx.Lock() +// if s.utxoIndexerRunning { +// s.mtx.Unlock() +// return +// } +// s.utxoIndexerRunning = true +// s.mtx.Unlock() +// +// // mark utxoIndexer not running on exit +// defer func() { +// s.mtx.Lock() +// s.utxoIndexerRunning = false +// s.mtx.Unlock() +// }() +// +// // exit if we aren't synced +// if s.blocksMissing(ctx) { +// return +// } +// +// // Index all utxos +// +// // Get height from db +// he, err := s.db.MetadataGet(ctx, UtxoIndexHeightKey) +// if err != nil { +// if !errors.Is(err, database.ErrNotFound) { +// log.Errorf("utxo indexer metadata get: %v", err) +// return +// } +// he = make([]byte, 8) +// } +// h := binary.BigEndian.Uint64(he) +// +// // Skip UtxoIndex if we are at best block height. This is a bit racy. +// bhb, err := s.db.BlockHeaderBest(ctx) +// if err != nil { +// log.Errorf("utxo indexer block headers best: %v", err) +// return +// } +// if bhb.Height != h-1 { +// err = s.UtxoIndexer(ctx, h, 0) +// if err != nil { +// log.Errorf("utxo indexer: %v", err) +// return +// } +// } +// +// // When utxo sync completes kick off tx sync +// go s.txIndexer(ctx) +//} func (s *Server) downloadBlock(ctx context.Context, p *peer, ch *chainhash.Hash) { log.Tracef("downloadBlock") @@ -1043,7 +1056,8 @@ func (s *Server) syncBlocks(ctx context.Context) { log.Tracef("syncBlocks") defer log.Tracef("syncBlocks exit") - // Prevent race condition with 'want', which may cause the cache capacity to be exceeded. + // Prevent race condition with 'want', which may cause the cache + // capacity to be exceeded. s.mtx.Lock() defer s.mtx.Unlock() @@ -1057,6 +1071,27 @@ func (s *Server) syncBlocks(ctx context.Context) { return } + if len(bm) == 0 { + // if we are complete we need to kick off utxo sync + s.quiesced = true // XXX if it's set and we exit with an error, what should we do?? + // XXX this really should be hash based + bhb, err := s.db.BlockHeaderBest(ctx) + if err != nil { + log.Errorf("sync blocks best block header: %v", err) + return + } + go func() { + // we really want to push the indexing reentrancy into this call + log.Infof("quiescing p2p and indexing to: %v", bhb.Height) + err = s.SyncIndexersToHeight(ctx, bhb.Height) + if err != nil { + log.Errorf("sync blocks: %v", err) + return + } + }() + return + } + for k := range bm { bi := bm[k] hash, _ := chainhash.NewHash(bi.Hash[:]) @@ -1077,17 +1112,19 @@ func (s *Server) syncBlocks(ctx context.Context) { s.blockExpired, nil) go s.downloadBlock(ctx, rp, hash) } - - if len(bm) == 0 { - // if we are complete we need to kick off utxo sync - go s.utxoIndexer(ctx) - } } func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeaders) { log.Tracef("handleHeaders (%v): %v", p, len(msg.Headers)) defer log.Tracef("handleHeaders exit (%v): %v", p, len(msg.Headers)) + s.mtx.Lock() + if s.clipped { + log.Infof("pretend we are at the height") + msg.Headers = msg.Headers[0:0] + } + s.mtx.Unlock() + if len(msg.Headers) == 0 { // This may signify the end of IBD but isn't 100%. We can fart // around with mean block time to determine if this peer is @@ -1189,6 +1226,10 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader log.Infof("Inserted (%v) %v block headers height %v", it, len(headers), lbh.Height) + s.mtx.Lock() + s.clipped = true + s.mtx.Unlock() + log.Infof("clipped at %v", lbh.Height) } } diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go index 7ed2d6574..ec3ae538d 100644 --- a/service/tbc/tbcfork_test.go +++ b/service/tbc/tbcfork_test.go @@ -524,7 +524,7 @@ func TestFork(t *testing.T) { // Connect tbc service cfg := &Config{ - AutoIndex: false, + AutoIndex: true, BlockSanity: false, LevelDBHome: t.TempDir(), ListenAddress: tbcapi.DefaultListen, // TODO: should use random free port