When synced on connect ask mempool and add prom gauges for mempool
marcopeereboom committed Oct 23, 2024
1 parent e8606ca commit 854965e
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions service/tbc/tbc.go
@@ -140,8 +140,9 @@ type Server struct {
// Prometheus
promPollVerbose bool // set to true to print stats during poll
prom struct {
syncInfo SyncInfo
connected, good, bad int
syncInfo SyncInfo
connected, good, bad int
mempoolCount, mempoolSize int
} // periodically updated by promPoll
isRunning bool
cmdsProcessed prometheus.Counter
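
The two new fields extend the existing prom snapshot: promPoll writes them and the gauge callbacks added later in this commit read them back, both under s.mtx. A minimal sketch of that pattern, using simplified names rather than the actual tbc types:

// Sketch only: simplified names, not the actual tbc Server type.
package snapshot

import "sync"

type promSnapshot struct {
	mtx sync.RWMutex

	mempoolCount int // transactions currently tracked in the mempool
	mempoolSize  int // aggregate size of those transactions, in bytes
}

// update is the write side, called from a poll loop.
func (p *promSnapshot) update(count, size int) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.mempoolCount, p.mempoolSize = count, size
}

// read is the scrape side; prometheus.GaugeFunc callbacks can wrap it.
func (p *promSnapshot) read() (count, size float64) {
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	return float64(p.mempoolCount), float64(p.mempoolSize)
}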
@@ -179,12 +180,12 @@ func NewServer(cfg *Config) (*Server, error) {
Name: "rpc_calls_total",
Help: "The total number of successful RPC commands",
}),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
promPollVerbose: false,
}

s.cfg.MempoolEnabled = false
if s.cfg.MempoolEnabled {
s.mempool, err = mempoolNew()
if err != nil {
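
Dropping the hard-coded s.cfg.MempoolEnabled = false means the value supplied in Config is now honored. A hypothetical caller-side sketch; the import path and the literal Config construction are assumptions, only the MempoolEnabled field itself appears in this diff:

// Hypothetical caller-side sketch. The import path and building Config as a
// literal are assumptions; only the MempoolEnabled field appears in this diff.
package main

import (
	"log"

	tbc "github.com/hemilabs/heminetwork/service/tbc" // assumed module path
)

func main() {
	cfg := &tbc.Config{
		MempoolEnabled: true, // previously NewServer silently forced this to false
	}
	s, err := tbc.NewServer(cfg)
	if err != nil {
		log.Fatalf("new tbc server: %v", err)
	}
	_ = s // s.Run(ctx) would start the service
}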
@@ -277,16 +278,17 @@ func (s *Server) mempoolPeer(ctx context.Context, p *peer) {
log.Tracef("mempoolPeer %v", p)
defer log.Tracef("mempoolPeer %v exit", p)

s.mtx.Lock()
log.Infof("starting to collect mempool tx' %v %v", p, s.indexing)
s.mtx.Unlock()
if !s.cfg.MempoolEnabled {
return
}

err := p.write(defaultCmdTimeout, wire.NewMsgMemPool())
if err != nil {
log.Debugf("mempool %v: %v", p, err)
return
}
}

func (s *Server) headersPeer(ctx context.Context, p *peer) {
log.Tracef("headersPeer %v", p)
defer log.Tracef("headersPeer %v exit", p)
@@ -692,6 +694,14 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error {
return err
}

// If we are caught up, start collecting mempool data.
if s.cfg.MempoolEnabled && s.Synced(ctx).Synced {
err := p.write(defaultCmdTimeout, wire.NewMsgMemPool())
if err != nil {
return fmt.Errorf("mempool %v: %w", p, err)
}
}

// XXX wave hands here for now but we should get 3 peers to agree that
// this is a fork indeed.
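
wire.NewMsgMemPool() builds the BIP 35 mempool message from btcd's wire package; a peer that receives it answers with inv messages advertising its mempool transactions. A hedged sketch of sending that request over an already version-negotiated connection, without tbc's peer wrapper or write timeout:

// Hedged sketch: the BIP 35 "mempool" request via btcd's wire package. conn,
// pver and network are assumptions; tbc sends the same message through its
// own peer type with defaultCmdTimeout.
package peersketch

import (
	"net"

	"github.com/btcsuite/btcd/wire"
)

// requestMempool asks an already version-negotiated peer to advertise its
// mempool; the peer replies with inv messages that can be fetched via getdata.
func requestMempool(conn net.Conn, pver uint32, network wire.BitcoinNet) error {
	return wire.WriteMessage(conn, wire.NewMsgMemPool(), pver, network)
}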

@@ -819,6 +829,18 @@ func (s *Server) promBadPeers() float64 {
return float64(s.prom.bad)
}

func (s *Server) promMempoolCount() float64 {
s.mtx.Lock()
defer s.mtx.Unlock()
return float64(s.prom.mempoolCount)
}

func (s *Server) promMempoolSize() float64 {
s.mtx.Lock()
defer s.mtx.Unlock()
return float64(s.prom.mempoolSize)
}

func (s *Server) promPoll(ctx context.Context) error {
for {
select {
@@ -829,23 +851,19 @@ func (s *Server) promPoll(ctx context.Context) error {

s.prom.syncInfo = s.Synced(ctx)
s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats()

s.mtx.RLock()
var (
mempoolCount int
mempoolSize int
)
if s.cfg.MempoolEnabled {
mempoolCount, mempoolSize = s.mempool.stats(ctx)
s.prom.mempoolCount, s.prom.mempoolSize = s.mempool.stats(ctx)
}

if s.promPollVerbose {
s.mtx.RLock()
log.Infof("Pending blocks %v/%v connected peers %v "+
"good peers %v bad peers %v mempool %v %v",
s.blocks.Len(), defaultPendingBlocks, s.prom.connected,
s.prom.good, s.prom.bad, mempoolCount,
humanize.Bytes(uint64(mempoolSize)))
s.prom.good, s.prom.bad, s.prom.mempoolCount,
humanize.Bytes(uint64(s.prom.mempoolSize)))
s.mtx.RUnlock()
}
s.mtx.RUnlock()

}
}
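
Continuing the promSnapshot sketch above, a promPoll-style loop that refreshes the snapshot on a timer; the five-second interval and the stats callback are assumptions, and the real refresh lives in promPoll itself (this adds context and time to that sketch's imports):

// Continues the promSnapshot sketch above (adds context and time imports).
// The five-second tick and the stats callback are assumptions; tbc's real
// refresh happens inside promPoll.
func pollLoop(ctx context.Context, snap *promSnapshot, stats func(context.Context) (count, size int)) error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
		// Store the latest (count, bytes) pair under the write lock so that
		// concurrent gauge reads never see a torn update.
		snap.update(stats(ctx))
	}
}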
@@ -2033,6 +2051,16 @@ func (s *Server) Run(pctx context.Context) error {
Name: "peers_bad",
Help: "Number of bad peers.",
}, s.promBadPeers),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Subsystem: promSubsystem,
Name: "mempool_count",
Help: "Number of txs in mempool.",
}, s.promMempoolCount),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Subsystem: promSubsystem,
Name: "mempool_size",
Help: "Size of mempool in bytes.",
}, s.promMempoolSize),
}
s.wg.Add(1)
go func() {
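
The last hunk wires the two getters into prometheus.GaugeFunc collectors. A self-contained sketch of the same construction; the "tbc" subsystem label, the registry, and the /metrics HTTP handler are assumptions not shown in this diff:

// Self-contained sketch, not the tbc wiring: stand-in callbacks replace
// s.promMempoolCount / s.promMempoolSize, and the subsystem label is assumed.
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	mempoolCount := func() float64 { return 42 }   // stand-in for s.promMempoolCount
	mempoolSize := func() float64 { return 12345 } // stand-in for s.promMempoolSize

	reg := prometheus.NewRegistry()
	reg.MustRegister(
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Subsystem: "tbc", // promSubsystem's value is not shown in the diff
			Name:      "mempool_count",
			Help:      "Number of txs in mempool.",
		}, mempoolCount),
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Subsystem: "tbc",
			Name:      "mempool_size",
			Help:      "Size of mempool in bytes.",
		}, mempoolSize),
	)

	// Each scrape of /metrics invokes the callbacks, so values stay current.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}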