Skip to content

Commit

Permalink
Bring back mempool and sundry fixes. (#284)
Browse files Browse the repository at this point in the history
* Rebase drama

* push some debug so we can test elsewhere

* Silence debug

* Silence is golden

* When synced on connect ask mempool and add prom gauges for mempool
  • Loading branch information
marcopeereboom authored Oct 23, 2024
1 parent 2160146 commit 87a6b60
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 51 deletions.
39 changes: 16 additions & 23 deletions service/tbc/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
ErrAlreadyIndexing = errors.New("already indexing")

testnet3Checkpoints = map[chainhash.Hash]uint64{
s2h("0000000000001242d96bedebc9f45a2ecdd40d393ca0d725f500fb4977a50582"): 3100000,
s2h("0000000000003c46fc60e56b9c2ae202b1efec83fcc7899d21de16757dea40a4"): 3000000,
s2h("000000000000001669469c0354b3f341a36b10ab099d1962f7ec4fae528b1f1d"): 2900000,
s2h("0000000000000093bcb68c03a9a168ae252572d348a2eaeba2cdf9231d73206f"): 2500000,
Expand Down Expand Up @@ -997,7 +998,7 @@ func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.Block
log.Tracef("TxIndexerUnwind")
defer log.Tracef("TxIndexerUnwind exit")

// XXX dedup with TxIndexedWind; it's basically the same code but with the direction, start anf endhas flipped
// XXX dedup with TxIndexerWind; it's basically the same code but with the direction, start anf endhas flipped

s.mtx.Lock()
if !s.indexing {
Expand Down Expand Up @@ -1302,30 +1303,10 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e
// Mark indexing done.
s.mtx.Lock()
s.indexing = false
bhb, err := s.db.BlockHeaderBest(ctx)
if err != nil {
s.mtx.Unlock()
log.Errorf("sync indexers best: %v", err)
return
}
// get a random peer
p, err := s.pm.Random()
if err != nil {
s.mtx.Unlock()
log.Errorf("sync indexers random peer: %v", err)
return
}
s.mtx.Unlock()

// XXX explain why we need to get more headers here
// continue getting headers, XXX this does not belong here either
// XXX if bh download fails we will get jammed. We need a queued "must execute this command" added to peer/service.
// XXX we may not want to do this when in special "driver mode"
log.Infof("resuming block header download at: %v", bhb.Height)
if err = s.getHeaders(ctx, p, bhb.BlockHash()); err != nil {
log.Errorf("sync indexers: %v", err)
return
}
// Get block headers
s.pm.All(ctx, s.headersPeer)
}()

log.Debugf("Syncing indexes to: %v", hash)
Expand All @@ -1341,6 +1322,12 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e
}
log.Debugf("Done syncing to: %v", hash)

bh, err := s.db.BlockHeaderByHash(ctx, hash)
if err == nil {
return err
}
log.Infof("Syncing complete at: %v", bh.HH())

return nil
}

Expand Down Expand Up @@ -1417,6 +1404,12 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error {
return fmt.Errorf("tx indexer: %w", err)
}

bh, err := s.db.BlockHeaderByHash(ctx, &bhb.Hash)
if err == nil {
return err
}
log.Infof("Syncing complete at: %v", bh.HH())

return nil
}

Expand Down
127 changes: 99 additions & 28 deletions service/tbc/tbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,11 @@ type Server struct {
db tbcd.Database

// Prometheus
prom struct {
syncInfo SyncInfo
connected, good, bad int
promPollVerbose bool // set to true to print stats during poll
prom struct {
syncInfo SyncInfo
connected, good, bad int
mempoolCount, mempoolSize int
} // periodically updated by promPoll
isRunning bool
cmdsProcessed prometheus.Counter
Expand Down Expand Up @@ -178,19 +180,16 @@ func NewServer(cfg *Config) (*Server, error) {
Name: "rpc_calls_total",
Help: "The total number of successful RPC commands",
}),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
promPollVerbose: false,
}

log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED")
s.cfg.MempoolEnabled = false
if false {
if s.cfg.MempoolEnabled {
s.mempool, err = mempoolNew()
if err != nil {
return nil, err
}
if s.cfg.MempoolEnabled {
s.mempool, err = mempoolNew()
if err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -275,6 +274,36 @@ func (s *Server) pingPeer(ctx context.Context, p *peer) {
s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil)
}

func (s *Server) mempoolPeer(ctx context.Context, p *peer) {
log.Tracef("mempoolPeer %v", p)
defer log.Tracef("mempoolPeer %v exit", p)

if !s.cfg.MempoolEnabled {
return
}

err := p.write(defaultCmdTimeout, wire.NewMsgMemPool())
if err != nil {
log.Debugf("mempool %v: %v", p, err)
return
}
}

func (s *Server) headersPeer(ctx context.Context, p *peer) {
log.Tracef("headersPeer %v", p)
defer log.Tracef("headersPeer %v exit", p)

bhb, err := s.db.BlockHeaderBest(ctx)
if err != nil {
log.Errorf("headers peer block header best: %v %v", p, err)
return
}
if err = s.getHeaders(ctx, p, bhb.BlockHash()); err != nil {
log.Errorf("headers peer sync indexers: %v", err)
return
}
}

func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, raw []byte) (bool, error) {
// Do accept addr and ping commands before we consider the peer up.
switch m := msg.(type) {
Expand Down Expand Up @@ -609,7 +638,7 @@ func (s *Server) sod(ctx context.Context, p *peer) (*chainhash.Hash, error) {
return &bhb.Hash, nil
}
if bhb.Height > uint64(p.remoteVersion.LastBlock) {
log.Debugf("sod: %v our tip is greater %v > %v",
log.Infof("sod: %v our tip is greater %v > %v",
p, bhb.Height, p.remoteVersion.LastBlock)
return &bhb.Hash, nil
}
Expand Down Expand Up @@ -665,11 +694,11 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error {
return err
}

if s.cfg.MempoolEnabled {
// Start building the mempool.
err = p.write(defaultCmdTimeout, wire.NewMsgMemPool())
// If we are caught up and start collecting mempool data.
if s.cfg.MempoolEnabled && s.Synced(ctx).Synced {
err := p.write(defaultCmdTimeout, wire.NewMsgMemPool())
if err != nil {
return err
return fmt.Errorf("mempool %v: %w", p, err)
}
}

Expand Down Expand Up @@ -800,6 +829,18 @@ func (s *Server) promBadPeers() float64 {
return float64(s.prom.bad)
}

func (s *Server) promMempoolCount() float64 {
s.mtx.Lock()
defer s.mtx.Unlock()
return float64(s.prom.mempoolCount)
}

func (s *Server) promMempoolSize() float64 {
s.mtx.Lock()
defer s.mtx.Unlock()
return float64(s.prom.mempoolSize)
}

func (s *Server) promPoll(ctx context.Context) error {
for {
select {
Expand All @@ -810,6 +851,19 @@ func (s *Server) promPoll(ctx context.Context) error {

s.prom.syncInfo = s.Synced(ctx)
s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats()
if s.cfg.MempoolEnabled {
s.prom.mempoolCount, s.prom.mempoolSize = s.mempool.stats(ctx)
}

if s.promPollVerbose {
s.mtx.RLock()
log.Infof("Pending blocks %v/%v connected peers %v "+
"good peers %v bad peers %v mempool %v %v",
s.blocks.Len(), defaultPendingBlocks, s.prom.connected,
s.prom.good, s.prom.bad, s.prom.mempoolCount,
humanize.Bytes(uint64(s.prom.mempoolSize)))
s.mtx.RUnlock()
}

}
}
Expand Down Expand Up @@ -1046,6 +1100,9 @@ func (s *Server) syncBlocks(ctx context.Context) {
// XXX this is probably not a panic.
panic(fmt.Errorf("sync blocks: %w", err))
}

// Get block headers
s.pm.All(ctx, s.headersPeer)
}()
return
}
Expand Down Expand Up @@ -1077,15 +1134,19 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader
defer log.Tracef("handleHeaders exit (%v): %v", p, len(msg.Headers))

if len(msg.Headers) == 0 {
// This may signify the end of IBD but isn't 100%. We can fart
// around with mean block time to determine if this peer is
// just behind or if we are nominally where we should be. This
// test will never be 100% accurate.

// only do this if peer is synced, not sure how to detect that.

go s.syncBlocks(ctx)

// This may signify the end of IBD but isn't 100%.
if s.blksMissing(ctx) {
bhb, err := s.db.BlockHeaderBest(ctx)
if err == nil {
log.Infof("blockheaders caught up at: %v", bhb.HH())
}
go s.syncBlocks(ctx)
} else {
if s.cfg.MempoolEnabled {
// Start building the mempool.
s.pm.All(ctx, s.mempoolPeer)
}
}
return nil
}

Expand Down Expand Up @@ -1990,6 +2051,16 @@ func (s *Server) Run(pctx context.Context) error {
Name: "peers_bad",
Help: "Number of bad peers.",
}, s.promBadPeers),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Subsystem: promSubsystem,
Name: "mempool_count",
Help: "Number of txs in mempool.",
}, s.promMempoolCount),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Subsystem: promSubsystem,
Name: "mempool_size",
Help: "Size of mempool in bytes.",
}, s.promMempoolSize),
}
s.wg.Add(1)
go func() {
Expand Down

0 comments on commit 87a6b60

Please sign in to comment.