Bring back mempool and sundry fixes. #284

Merged · 5 commits · Oct 23, 2024
39 changes: 16 additions & 23 deletions service/tbc/crawler.go
@@ -39,6 +39,7 @@ var (
ErrAlreadyIndexing = errors.New("already indexing")

testnet3Checkpoints = map[chainhash.Hash]uint64{
s2h("0000000000001242d96bedebc9f45a2ecdd40d393ca0d725f500fb4977a50582"): 3100000,
s2h("0000000000003c46fc60e56b9c2ae202b1efec83fcc7899d21de16757dea40a4"): 3000000,
s2h("000000000000001669469c0354b3f341a36b10ab099d1962f7ec4fae528b1f1d"): 2900000,
s2h("0000000000000093bcb68c03a9a168ae252572d348a2eaeba2cdf9231d73206f"): 2500000,
@@ -997,7 +998,7 @@ func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.Block
log.Tracef("TxIndexerUnwind")
defer log.Tracef("TxIndexerUnwind exit")

// XXX dedup with TxIndexedWind; it's basically the same code but with the direction, start anf endhas flipped
// XXX dedup with TxIndexerWind; it's basically the same code but with the direction, start and end hash flipped

s.mtx.Lock()
if !s.indexing {
@@ -1302,30 +1303,10 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e
// Mark indexing done.
s.mtx.Lock()
s.indexing = false
bhb, err := s.db.BlockHeaderBest(ctx)
if err != nil {
s.mtx.Unlock()
log.Errorf("sync indexers best: %v", err)
return
}
// get a random peer
p, err := s.pm.Random()
if err != nil {
s.mtx.Unlock()
log.Errorf("sync indexers random peer: %v", err)
return
}
s.mtx.Unlock()

// XXX explain why we need to get more headers here
// continue getting headers, XXX this does not belong here either
// XXX if bh download fails we will get jammed. We need a queued "must execute this command" added to peer/service.
// XXX we may not want to do this when in special "driver mode"
log.Infof("resuming block header download at: %v", bhb.Height)
if err = s.getHeaders(ctx, p, bhb.BlockHash()); err != nil {
log.Errorf("sync indexers: %v", err)
return
}
// Get block headers
s.pm.All(ctx, s.headersPeer)
}()

log.Debugf("Syncing indexes to: %v", hash)
@@ -1341,6 +1322,12 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e
}
log.Debugf("Done syncing to: %v", hash)

bh, err := s.db.BlockHeaderByHash(ctx, hash)
if err != nil {
return err
}
log.Infof("Syncing complete at: %v", bh.HH())

return nil
}

@@ -1417,6 +1404,12 @@ func (s *Server) syncIndexersToBest(ctx context.Context) error {
return fmt.Errorf("tx indexer: %w", err)
}

bh, err := s.db.BlockHeaderByHash(ctx, &bhb.Hash)
if err != nil {
return err
}
log.Infof("Syncing complete at: %v", bh.HH())

return nil
}

127 changes: 99 additions & 28 deletions service/tbc/tbc.go
@@ -138,9 +138,11 @@ type Server struct {
db tbcd.Database

// Prometheus
prom struct {
syncInfo SyncInfo
connected, good, bad int
promPollVerbose bool // set to true to print stats during poll
prom struct {
syncInfo SyncInfo
connected, good, bad int
mempoolCount, mempoolSize int
} // periodically updated by promPoll
isRunning bool
cmdsProcessed prometheus.Counter
@@ -178,19 +180,16 @@ func NewServer(cfg *Config) (*Server, error) {
Name: "rpc_calls_total",
Help: "The total number of successful RPC commands",
}),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
sessions: make(map[string]*tbcWs),
requestTimeout: defaultRequestTimeout,
broadcast: make(map[chainhash.Hash]*wire.MsgTx, 16),
promPollVerbose: false,
}

log.Infof("MEMPOOL IS CURRENTLY BROKEN AND HAS BEEN DISABLED")
s.cfg.MempoolEnabled = false
if false {
if s.cfg.MempoolEnabled {
s.mempool, err = mempoolNew()
if err != nil {
return nil, err
}
if s.cfg.MempoolEnabled {
s.mempool, err = mempoolNew()
if err != nil {
return nil, err
}
}

@@ -275,6 +274,36 @@ func (s *Server) pingPeer(ctx context.Context, p *peer) {
s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil)
}

func (s *Server) mempoolPeer(ctx context.Context, p *peer) {
log.Tracef("mempoolPeer %v", p)
defer log.Tracef("mempoolPeer %v exit", p)

if !s.cfg.MempoolEnabled {
return
}

err := p.write(defaultCmdTimeout, wire.NewMsgMemPool())
if err != nil {
log.Debugf("mempool %v: %v", p, err)
return
}
}

func (s *Server) headersPeer(ctx context.Context, p *peer) {
log.Tracef("headersPeer %v", p)
defer log.Tracef("headersPeer %v exit", p)

bhb, err := s.db.BlockHeaderBest(ctx)
if err != nil {
log.Errorf("headers peer block header best: %v %v", p, err)
return
}
if err = s.getHeaders(ctx, p, bhb.BlockHash()); err != nil {
log.Errorf("headers peer sync indexers: %v", err)
return
}
}
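The new headersPeer and mempoolPeer callbacks are fanned out with s.pm.All(ctx, ...). The peer manager is not part of this diff; the sketch below only illustrates the assumed shape of All, iterating a snapshot of connected peers and invoking the callback on each. The PeerManager type and its mtx and peers fields are assumptions, not the repository's actual implementation.

// All invokes f on every currently connected peer. Hypothetical sketch;
// the real peer manager in this repository may differ.
func (pm *PeerManager) All(ctx context.Context, f func(ctx context.Context, p *peer)) {
	// Snapshot the peer set so f runs without holding the lock.
	pm.mtx.RLock()
	peers := make([]*peer, 0, len(pm.peers))
	for _, p := range pm.peers {
		peers = append(peers, p)
	}
	pm.mtx.RUnlock()

	for _, p := range peers {
		select {
		case <-ctx.Done():
			return // stop fanning out once the context is cancelled
		default:
		}
		f(ctx, p)
	}
}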

func (s *Server) handleGeneric(ctx context.Context, p *peer, msg wire.Message, raw []byte) (bool, error) {
// Do accept addr and ping commands before we consider the peer up.
switch m := msg.(type) {
@@ -609,7 +638,7 @@ func (s *Server) sod(ctx context.Context, p *peer) (*chainhash.Hash, error) {
return &bhb.Hash, nil
}
if bhb.Height > uint64(p.remoteVersion.LastBlock) {
log.Debugf("sod: %v our tip is greater %v > %v",
log.Infof("sod: %v our tip is greater %v > %v",
p, bhb.Height, p.remoteVersion.LastBlock)
return &bhb.Hash, nil
}
@@ -665,11 +694,11 @@ func (s *Server) handlePeer(ctx context.Context, p *peer) error {
return err
}

if s.cfg.MempoolEnabled {
// Start building the mempool.
err = p.write(defaultCmdTimeout, wire.NewMsgMemPool())
// If we are caught up, start collecting mempool data.
if s.cfg.MempoolEnabled && s.Synced(ctx).Synced {
err := p.write(defaultCmdTimeout, wire.NewMsgMemPool())
if err != nil {
return err
return fmt.Errorf("mempool %v: %w", p, err)
}
}

@@ -800,6 +829,18 @@ func (s *Server) promBadPeers() float64 {
return float64(s.prom.bad)
}

func (s *Server) promMempoolCount() float64 {
s.mtx.Lock()
defer s.mtx.Unlock()
return float64(s.prom.mempoolCount)
}

func (s *Server) promMempoolSize() float64 {
s.mtx.Lock()
defer s.mtx.Unlock()
return float64(s.prom.mempoolSize)
}

func (s *Server) promPoll(ctx context.Context) error {
for {
select {
@@ -810,6 +851,19 @@ func (s *Server) promPoll(ctx context.Context) error {

s.prom.syncInfo = s.Synced(ctx)
s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats()
if s.cfg.MempoolEnabled {
s.prom.mempoolCount, s.prom.mempoolSize = s.mempool.stats(ctx)
}

if s.promPollVerbose {
s.mtx.RLock()
log.Infof("Pending blocks %v/%v connected peers %v "+
"good peers %v bad peers %v mempool %v %v",
s.blocks.Len(), defaultPendingBlocks, s.prom.connected,
s.prom.good, s.prom.bad, s.prom.mempoolCount,
humanize.Bytes(uint64(s.prom.mempoolSize)))
s.mtx.RUnlock()
}

}
}
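promPoll now reads s.mempool.stats(ctx) for a transaction count and an approximate byte size. The mempool type itself is outside this diff, so the following is only a sketch under the assumption that it tracks wire.MsgTx entries in a mutex-guarded map; the txs and mtx field names are assumptions.

// stats returns the number of tracked transactions and their approximate
// total serialized size in bytes. Hypothetical sketch only.
func (m *mempool) stats(ctx context.Context) (int, int) {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	size := 0
	for _, tx := range m.txs {
		if tx != nil {
			size += tx.SerializeSize()
		}
	}
	return len(m.txs), size
}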
@@ -1046,6 +1100,9 @@ func (s *Server) syncBlocks(ctx context.Context) {
// XXX this is probably not a panic.
panic(fmt.Errorf("sync blocks: %w", err))
}

// Get block headers
s.pm.All(ctx, s.headersPeer)
}()
return
}
@@ -1077,15 +1134,19 @@ func (s *Server) handleHeaders(ctx context.Context, p *peer, msg *wire.MsgHeader
defer log.Tracef("handleHeaders exit (%v): %v", p, len(msg.Headers))

if len(msg.Headers) == 0 {
// This may signify the end of IBD but isn't 100%. We can fart
// around with mean block time to determine if this peer is
// just behind or if we are nominally where we should be. This
// test will never be 100% accurate.

// only do this if peer is synced, not sure how to detect that.

go s.syncBlocks(ctx)

// This may signify the end of IBD but isn't 100%.
if s.blksMissing(ctx) {
bhb, err := s.db.BlockHeaderBest(ctx)
if err == nil {
log.Infof("blockheaders caught up at: %v", bhb.HH())
}
go s.syncBlocks(ctx)
} else {
if s.cfg.MempoolEnabled {
// Start building the mempool.
s.pm.All(ctx, s.mempoolPeer)
}
}
return nil
}

@@ -1990,6 +2051,16 @@ func (s *Server) Run(pctx context.Context) error {
Name: "peers_bad",
Help: "Number of bad peers.",
}, s.promBadPeers),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Subsystem: promSubsystem,
Name: "mempool_count",
Help: "Number of txs in mempool.",
}, s.promMempoolCount),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Subsystem: promSubsystem,
Name: "mempool_size",
Help: "Size of mempool in bytes.",
}, s.promMempoolSize),
}
s.wg.Add(1)
go func() {
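The two new collectors rely on prometheus.NewGaugeFunc, which samples a float64 getter at scrape time instead of requiring explicit Set calls. Below is a standalone sketch of the same pattern using the upstream client_golang library; the subsystem name, listen address, and the mempoolCount variable are illustrative only, not this server's wiring.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	mempoolCount := 0 // stand-in for the value promPoll would maintain

	reg := prometheus.NewRegistry()
	reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Subsystem: "tbc",
		Name:      "mempool_count",
		Help:      "Number of txs in mempool.",
	}, func() float64 {
		// Called on every scrape; return the current value.
		return float64(mempoolCount)
	}))

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}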