From 04206a8cc13f73bc8419e2a7cb826777c4586f46 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Wed, 23 Oct 2024 08:54:27 -0400 Subject: [PATCH 1/5] Rebase drama --- service/tbc/tbc.go | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 6397ea00..63d50961 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -183,14 +183,10 @@ func NewServer(cfg *Config) (*Server, error) { broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16), } - log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED") - s.cfg.MempoolEnabled = false - if false { - if s.cfg.MempoolEnabled { - s.mempool, err = mempoolNew() - if err != nil { - return nil, err - } + if s.cfg.MempoolEnabled { + s.mempool, err = mempoolNew() + if err != nil { + return nil, err } } @@ -665,14 +661,6 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error { return err } - if s.cfg.MempoolEnabled { - // Start building the mempool. - err = p.write(defaultCmdTimeout, wire.NewMsgMemPool()) - if err != nil { - return err - } - } - // XXX wave hands here for now but we should get 3 peers to agree that // this is a fork indeed. @@ -1077,15 +1065,19 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader defer log.Tracef("handleHeaders exit (%v): %v", p, len(msg.Headers)) if len(msg.Headers) == 0 { - // This may signify the end of IBD but isn't 100%. We can fart - // around with mean block time to determine if this peer is - // just behind or if we are nominally where we should be. This - // test will never be 100% accurate. - - // only do this if peer is synced, not sure how to detect that. - - go s.syncBlocks(ctx) - + // This may signify the end of IBD but isn't 100%. + if s.blksMissing(ctx) { + go s.syncBlocks(ctx) + } else { + if s.cfg.MempoolEnabled { + // Start building the mempool. + log.Infof("starting to collect mempool tx'") + err := p.write(defaultCmdTimeout, wire.NewMsgMemPool()) + if err != nil { + return err + } + } + } return nil } From b458fb4ff02d4e9837632b0f28aa29bd6eeee8a8 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Tue, 22 Oct 2024 10:26:26 -0400 Subject: [PATCH 2/5] push some debug so we can test elsewhere --- service/tbc/crawler.go | 42 +++++++++++++-------------- service/tbc/tbc.go | 66 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 79 insertions(+), 29 deletions(-) diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 441d87ef..d553151c 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -39,6 +39,7 @@ var ( ErrAlreadyIndexing = errors.New("already indexing") testnet3Checkpoints = map[chainhash.Hash]uint64{ + s2h("0000000000001242d96bedebc9f45a2ecdd40d393ca0d725f500fb4977a50582"): 3100000, s2h("0000000000003c46fc60e56b9c2ae202b1efec83fcc7899d21de16757dea40a4"): 3000000, s2h("000000000000001669469c0354b3f341a36b10ab099d1962f7ec4fae528b1f1d"): 2900000, s2h("0000000000000093bcb68c03a9a168ae252572d348a2eaeba2cdf9231d73206f"): 2500000, @@ -1302,30 +1303,10 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e // Mark indexing done. 
s.mtx.Lock() s.indexing = false - bhb, err := s.db.BlockHeaderBest(ctx) - if err != nil { - s.mtx.Unlock() - log.Errorf("sync indexers best: %v", err) - return - } - // get a random peer - p, err := s.pm.Random() - if err != nil { - s.mtx.Unlock() - log.Errorf("sync indexers random peer: %v", err) - return - } s.mtx.Unlock() - // XXX explain why we need to get more headers here - // continue getting headers, XXX this does not belong here either - // XXX if bh download fails we will get jammed. We need a queued "must execute this command" added to peer/service. - // XXX we may not want to do this when in special "driver mode" - log.Infof("resuming block header download at: %v", bhb.Height) - if err = s.getHeaders(ctx, p, bhb.BlockHash()); err != nil { - log.Errorf("sync indexers: %v", err) - return - } + // Get block headers + s.pm.All(ctx, s.headersPeer) }() log.Debugf("Syncing indexes to: %v", hash) @@ -1341,6 +1322,12 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e } log.Debugf("Done syncing to: %v", hash) + bh, err := s.db.BlockHeaderByHash(ctx, hash) + if err == nil { + return err + } + log.Infof("Syncing complete at: %v", bh.HH()) + return nil } @@ -1353,6 +1340,7 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error { return err } + log.Infof("============ index utxo") // Index Utxo utxoHH, err := s.UtxoIndexHash(ctx) if err != nil { @@ -1385,6 +1373,7 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error { return fmt.Errorf("utxo indexer: %w", err) } + log.Infof("============ index tx") // Index Tx txHH, err := s.TxIndexHash(ctx) if err != nil { @@ -1417,6 +1406,13 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error { return fmt.Errorf("tx indexer: %w", err) } + // XXX remove? 
+ bh, err := s.db.BlockHeaderByHash(ctx, &bhb.Hash) + if err == nil { + return err + } + log.Infof("Syncing complete at: %v", bh.HH()) + return nil } @@ -1429,6 +1425,7 @@ func (s *Server) SyncIndexersToBest(ctx context.Context) error { s.mtx.Unlock() return ErrAlreadyIndexing } + log.Infof("marking indexing true -----------------") s.indexing = true s.mtx.Unlock() @@ -1436,6 +1433,7 @@ func (s *Server) SyncIndexersToBest(ctx context.Context) error { s.mtx.Lock() s.indexing = false s.mtx.Unlock() + log.Infof("marking indexing false -----------------") }() return s.syncIndexersToBest(ctx) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 63d50961..101666b9 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -183,6 +183,7 @@ func NewServer(cfg *Config) (*Server, error) { broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16), } + s.cfg.MempoolEnabled = false if s.cfg.MempoolEnabled { s.mempool, err = mempoolNew() if err != nil { @@ -271,6 +272,39 @@ func (s *Server) pingPeer(ctx context.Context, p *peer) { s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil) } +func (s *Server) mempoolPeer(ctx context.Context, p *peer) { + log.Tracef("mempoolPeer %v", p) + defer log.Tracef("mempoolPeer %v exit", p) + + s.mtx.Lock() + log.Infof("starting to collect mempool tx' %v %v", p, s.indexing) + s.mtx.Unlock() + + err := p.write(defaultCmdTimeout, wire.NewMsgMemPool()) + if err != nil { + log.Debugf("mempool %v: %v", p, err) + return + } +} +func (s *Server) headersPeer(ctx context.Context, p *peer) { + log.Tracef("headersPeer %v", p) + defer log.Tracef("headersPeer %v exit", p) + + s.mtx.Lock() + log.Infof("starting to collect block headers %v", p, s.indexing) + s.mtx.Unlock() + + bhb, err := s.db.BlockHeaderBest(ctx) + if err != nil { + log.Errorf("headers peer block header best: %v %v", p, err) + return + } + if err = s.getHeaders(ctx, p, bhb.BlockHash()); err != nil { + log.Errorf("headers peer sync indexers: %v", err) + return + } +} + func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, raw []byte) (bool, error) { // Do accept addr and ping commands before we consider the peer up. switch m := msg.(type) { @@ -605,7 +639,7 @@ func (s *Server) sod(ctx context.Context, p *peer) (*chainhash.Hash, error) { return &bhb.Hash, nil } if bhb.Height > uint64(p.remoteVersion.LastBlock) { - log.Debugf("sod: %v our tip is greater %v > %v", + log.Infof("sod: %v our tip is greater %v > %v", p, bhb.Height, p.remoteVersion.LastBlock) return &bhb.Hash, nil } @@ -703,7 +737,7 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error { // When quiesced do not handle other p2p commands. s.mtx.Lock() if s.indexing { - log.Debugf("indexing %v", s.indexing) + log.Infof("indexing %v", s.indexing) s.mtx.Unlock() continue } @@ -799,6 +833,20 @@ func (s *Server) promPoll(ctx context.Context) error { s.prom.syncInfo = s.Synced(ctx) s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats() + s.mtx.RLock() + var ( + mempoolCount int + mempoolSize int + ) + if s.cfg.MempoolEnabled { + mempoolCount, mempoolSize = s.mempool.stats(ctx) + } + log.Infof("Pending blocks %v/%v connected peers %v good peers %v "+ + "bad peers %v mempool %v %v", + s.blocks.Len(), defaultPendingBlocks, s.prom.connected, s.prom.good, + s.prom.bad, mempoolCount, humanize.Bytes(uint64(mempoolSize))) + s.mtx.RUnlock() + } } @@ -1034,6 +1082,9 @@ func (s *Server) syncBlocks(ctx context.Context) { // XXX this is probably not a panic. 
panic(fmt.Errorf("sync blocks: %w", err)) } + + // Get block headers + s.pm.All(ctx, s.headersPeer) }() return } @@ -1067,15 +1118,15 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader if len(msg.Headers) == 0 { // This may signify the end of IBD but isn't 100%. if s.blksMissing(ctx) { + bhb, err := s.db.BlockHeaderBest(ctx) + if err == nil { + log.Infof("blockheaders caught up at: %v", bhb.HH()) + } go s.syncBlocks(ctx) } else { if s.cfg.MempoolEnabled { // Start building the mempool. - log.Infof("starting to collect mempool tx'") - err := p.write(defaultCmdTimeout, wire.NewMsgMemPool()) - if err != nil { - return err - } + s.pm.All(ctx, s.mempoolPeer) } } return nil @@ -2030,6 +2081,7 @@ func (s *Server) Run(pctx context.Context) error { return } go func(pp *peer) { + log.Infof("calling handlePeer %v", pp) err := s.handlePeer(ctx, pp) if err != nil { log.Debugf("%v: %v", pp, err) From 916647686b58fe25454398d6812cc361924b8e52 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Wed, 23 Oct 2024 08:52:14 -0400 Subject: [PATCH 3/5] Silence debug --- service/tbc/crawler.go | 4 ---- service/tbc/tbc.go | 17 ++++++++++------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index d553151c..44f579df 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -1340,7 +1340,6 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error { return err } - log.Infof("============ index utxo") // Index Utxo utxoHH, err := s.UtxoIndexHash(ctx) if err != nil { @@ -1373,7 +1372,6 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error { return fmt.Errorf("utxo indexer: %w", err) } - log.Infof("============ index tx") // Index Tx txHH, err := s.TxIndexHash(ctx) if err != nil { @@ -1425,7 +1423,6 @@ func (s *Server) SyncIndexersToBest(ctx context.Context) error { s.mtx.Unlock() return ErrAlreadyIndexing } - log.Infof("marking indexing true -----------------") s.indexing = true s.mtx.Unlock() @@ -1433,7 +1430,6 @@ func (s *Server) SyncIndexersToBest(ctx context.Context) error { s.mtx.Lock() s.indexing = false s.mtx.Unlock() - log.Infof("marking indexing false -----------------") }() return s.syncIndexersToBest(ctx) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 101666b9..a51bb356 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -138,7 +138,8 @@ type Server struct { db tbcd.Database // Prometheus - prom struct { + promPollVerbose bool // set to true to print stats during poll + prom struct { syncInfo SyncInfo connected, good, bad int } // periodically updated by promPoll @@ -737,7 +738,7 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error { // When quiesced do not handle other p2p commands. 
s.mtx.Lock() if s.indexing { - log.Infof("indexing %v", s.indexing) + log.Debugf("indexing %v", s.indexing) s.mtx.Unlock() continue } @@ -841,10 +842,13 @@ func (s *Server) promPoll(ctx context.Context) error { if s.cfg.MempoolEnabled { mempoolCount, mempoolSize = s.mempool.stats(ctx) } - log.Infof("Pending blocks %v/%v connected peers %v good peers %v "+ - "bad peers %v mempool %v %v", - s.blocks.Len(), defaultPendingBlocks, s.prom.connected, s.prom.good, - s.prom.bad, mempoolCount, humanize.Bytes(uint64(mempoolSize))) + if s.promPollVerbose { + log.Infof("Pending blocks %v/%v connected peers %v "+ + "good peers %v bad peers %v mempool %v %v", + s.blocks.Len(), defaultPendingBlocks, s.prom.connected, + s.prom.good, s.prom.bad, mempoolCount, + humanize.Bytes(uint64(mempoolSize))) + } s.mtx.RUnlock() } @@ -2081,7 +2085,6 @@ func (s *Server) Run(pctx context.Context) error { return } go func(pp *peer) { - log.Infof("calling handlePeer %v", pp) err := s.handlePeer(ctx, pp) if err != nil { log.Debugf("%v: %v", pp, err) From e8606caa81b4dbd771d4674a46a40e8c51c616ab Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Wed, 23 Oct 2024 08:58:06 -0400 Subject: [PATCH 4/5] Silence is golden --- service/tbc/crawler.go | 3 +-- service/tbc/tbc.go | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 44f579df..27933fa1 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -998,7 +998,7 @@ func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.Block log.Tracef("TxIndexerUnwind") defer log.Tracef("TxIndexerUnwind exit") - // XXX dedup with TxIndexedWind; it's basically the same code but with the direction, start anf endhas flipped + // XXX dedup with TxIndexerWind; it's basically the same code but with the direction, start anf endhas flipped s.mtx.Lock() if !s.indexing { @@ -1404,7 +1404,6 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error { return fmt.Errorf("tx indexer: %w", err) } - // XXX remove? 
bh, err := s.db.BlockHeaderByHash(ctx, &bhb.Hash) if err == nil { return err diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index a51bb356..001a79b2 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -291,10 +291,6 @@ func (s *Server) headersPeer(ctx context.Context, p *peer) { log.Tracef("headersPeer %v", p) defer log.Tracef("headersPeer %v exit", p) - s.mtx.Lock() - log.Infof("starting to collect block headers %v", p, s.indexing) - s.mtx.Unlock() - bhb, err := s.db.BlockHeaderBest(ctx) if err != nil { log.Errorf("headers peer block header best: %v %v", p, err) From 854965ea5628a9511bd837d5c88a3c630865f5a9 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Wed, 23 Oct 2024 09:30:19 -0400 Subject: [PATCH 5/5] When synced on connect ask mempool and add prom gauges for mempool --- service/tbc/tbc.go | 66 +++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 001a79b2..878c8618 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -140,8 +140,9 @@ type Server struct { // Prometheus promPollVerbose bool // set to true to print stats during poll prom struct { - syncInfo SyncInfo - connected, good, bad int + syncInfo SyncInfo + connected, good, bad int + mempoolCount, mempoolSize int } // periodically updated by promPoll isRunning bool cmdsProcessed prometheus.Counter @@ -179,12 +180,12 @@ func NewServer(cfg *Config) (*Server, error) { Name: "rpc_calls_total", Help: "The total number of successful RPC commands", }), - sessions: make(map[string]*tbcWs), - requestTimeout: defaultRequestTimeout, - broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16), + sessions: make(map[string]*tbcWs), + requestTimeout: defaultRequestTimeout, + broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16), + promPollVerbose: false, } - s.cfg.MempoolEnabled = false if s.cfg.MempoolEnabled { s.mempool, err = mempoolNew() if err != nil { @@ -277,9 +278,9 @@ func (s *Server) mempoolPeer(ctx context.Context, p *peer) { log.Tracef("mempoolPeer %v", p) defer log.Tracef("mempoolPeer %v exit", p) - s.mtx.Lock() - log.Infof("starting to collect mempool tx' %v %v", p, s.indexing) - s.mtx.Unlock() + if !s.cfg.MempoolEnabled { + return + } err := p.write(defaultCmdTimeout, wire.NewMsgMemPool()) if err != nil { @@ -287,6 +288,7 @@ func (s *Server) mempoolPeer(ctx context.Context, p *peer) { return } } + func (s *Server) headersPeer(ctx context.Context, p *peer) { log.Tracef("headersPeer %v", p) defer log.Tracef("headersPeer %v exit", p) @@ -692,6 +694,14 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error { return err } + // If we are caught up and start collecting mempool data. + if s.cfg.MempoolEnabled && s.Synced(ctx).Synced { + err := p.write(defaultCmdTimeout, wire.NewMsgMemPool()) + if err != nil { + return fmt.Errorf("mempool %v: %w", p, err) + } + } + // XXX wave hands here for now but we should get 3 peers to agree that // this is a fork indeed. 
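
// The hunk above only issues the BIP 35 "mempool" request once the node
// reports itself synced. Below is a minimal, standalone sketch (not code from
// this patch) of that request using the btcd wire package; the peer address,
// the direct use of wire.WriteMessage, and the skipped version/verack
// handshake are illustrative assumptions. tbc's own peer type negotiates the
// handshake before p.write(defaultCmdTimeout, wire.NewMsgMemPool()) is called.
package main

import (
	"fmt"
	"net"
	"time"

	"github.com/btcsuite/btcd/wire"
)

// requestMempool asks a remote peer to inv its mempool contents (BIP 35).
func requestMempool(conn net.Conn) error {
	_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
	return wire.WriteMessage(conn, wire.NewMsgMemPool(), wire.ProtocolVersion, wire.TestNet3)
}

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:18333") // hypothetical testnet3 peer
	if err != nil {
		fmt.Println("dial:", err)
		return
	}
	defer conn.Close()
	if err := requestMempool(conn); err != nil {
		fmt.Println("mempool request:", err)
	}
}
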
@@ -819,6 +829,18 @@ func (s *Server) promBadPeers() float64 { return float64(s.prom.bad) } +func (s *Server) promMempoolCount() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.mempoolCount) +} + +func (s *Server) promMempoolSize() float64 { + s.mtx.Lock() + defer s.mtx.Unlock() + return float64(s.prom.mempoolSize) +} + func (s *Server) promPoll(ctx context.Context) error { for { select { @@ -829,23 +851,19 @@ func (s *Server) promPoll(ctx context.Context) error { s.prom.syncInfo = s.Synced(ctx) s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats() - - s.mtx.RLock() - var ( - mempoolCount int - mempoolSize int - ) if s.cfg.MempoolEnabled { - mempoolCount, mempoolSize = s.mempool.stats(ctx) + s.prom.mempoolCount, s.prom.mempoolSize = s.mempool.stats(ctx) } + if s.promPollVerbose { + s.mtx.RLock() log.Infof("Pending blocks %v/%v connected peers %v "+ "good peers %v bad peers %v mempool %v %v", s.blocks.Len(), defaultPendingBlocks, s.prom.connected, - s.prom.good, s.prom.bad, mempoolCount, - humanize.Bytes(uint64(mempoolSize))) + s.prom.good, s.prom.bad, s.prom.mempoolCount, + humanize.Bytes(uint64(s.prom.mempoolSize))) + s.mtx.RUnlock() } - s.mtx.RUnlock() } } @@ -2033,6 +2051,16 @@ func (s *Server) Run(pctx context.Context) error { Name: "peers_bad", Help: "Number of bad peers.", }, s.promBadPeers), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "mempool_count", + Help: "Number of txs in mempool.", + }, s.promMempoolCount), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Subsystem: promSubsystem, + Name: "mempool_size", + Help: "Size of mempool in bytes.", + }, s.promMempoolSize), } s.wg.Add(1) go func() {
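
The two mempool gauges registered in the final hunk follow a cache-and-read pattern: promPoll periodically caches mempool count and size in the prom struct, and the prometheus.NewGaugeFunc collectors read those cached values (via promMempoolCount/promMempoolSize) on scrape. The sketch below shows the same shape in a self-contained program, with a mutex guarding the cached values; the fake stats source, metric names, and registry wiring are illustrative assumptions, not code taken from tbc.

// Standalone sketch of the cached-gauge pattern used by the patch above.
package main

import (
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

type mempoolStats struct {
	mtx   sync.RWMutex
	count int
	size  int
}

// poll stands in for promPoll: it refreshes the cached stats on a timer.
func (m *mempoolStats) poll() {
	for {
		c, sz := rand.Intn(5000), rand.Intn(10_000_000) // stand-in for mempool.stats(ctx)
		m.mtx.Lock()
		m.count, m.size = c, sz
		m.mtx.Unlock()
		time.Sleep(5 * time.Second)
	}
}

func main() {
	m := &mempoolStats{}
	go m.poll()

	reg := prometheus.NewRegistry()
	reg.MustRegister(
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Subsystem: "tbc",
			Name:      "mempool_count",
			Help:      "Number of txs in mempool.",
		}, func() float64 {
			m.mtx.RLock()
			defer m.mtx.RUnlock()
			return float64(m.count)
		}),
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Subsystem: "tbc",
			Name:      "mempool_size",
			Help:      "Size of mempool in bytes.",
		}, func() float64 {
			m.mtx.RLock()
			defer m.mtx.RUnlock()
			return float64(m.size)
		}),
	)

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}
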
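Patches 2 and 5 also replace picking a single random peer with fanning a callback out over every connected peer (s.pm.All(ctx, s.headersPeer) and s.pm.All(ctx, s.mempoolPeer)). The real PeerManager is not part of this series; the sketch below is a hypothetical reimplementation of that fan-out shape, only to make the callback signature func(ctx context.Context, p *peer) concrete. The peer type, map layout, and goroutine-per-peer choice are assumptions for illustration.

// Hypothetical fan-out sketch; not the tbc PeerManager implementation.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type peer struct{ addr string }

type peerManager struct {
	mtx   sync.RWMutex
	peers map[string]*peer
}

// All invokes cb once per currently connected peer, each in its own goroutine,
// and waits for them to finish; ctx is only passed through to the callbacks.
func (pm *peerManager) All(ctx context.Context, cb func(context.Context, *peer)) {
	pm.mtx.RLock()
	snapshot := make([]*peer, 0, len(pm.peers))
	for _, p := range pm.peers {
		snapshot = append(snapshot, p)
	}
	pm.mtx.RUnlock()

	var wg sync.WaitGroup
	for _, p := range snapshot {
		wg.Add(1)
		go func(p *peer) {
			defer wg.Done()
			cb(ctx, p)
		}(p)
	}
	wg.Wait()
}

func main() {
	pm := &peerManager{peers: map[string]*peer{
		"a": {addr: "1.2.3.4:8333"},
		"b": {addr: "5.6.7.8:8333"},
	}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	pm.All(ctx, func(ctx context.Context, p *peer) {
		fmt.Println("would request headers/mempool from", p.addr)
	})
}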