bring back tx indexer
marcopeereboom committed Jun 21, 2024
1 parent 9956515 commit dd52ff0
Showing 4 changed files with 169 additions and 125 deletions.
55 changes: 21 additions & 34 deletions cmd/hemictl/hemictl.go
@@ -429,40 +429,27 @@ func tbcdb() error {
}

case "txindex":
panic("txindex")
//var h, c, mc uint64
//height := args["height"]
//if height == "" {
// // Get height from db
// he, err := s.DB().MetadataGet(ctx, tbc.TxIndexHeightKey)
// if err != nil {
// if !errors.Is(err, database.ErrNotFound) {
// return fmt.Errorf("metadata %v: %w",
// string(tbc.TxIndexHeightKey), err)
// }
// he = make([]byte, 8)
// }
// h = binary.BigEndian.Uint64(he)
//} else if h, err = strconv.ParseUint(height, 10, 64); err != nil {
// return fmt.Errorf("height: %w", err)
//}
//count := args["count"]
//if count == "" {
// c = 0
//} else if c, err = strconv.ParseUint(count, 10, 64); err != nil {
// return fmt.Errorf("count: %w", err)
//}
//maxCache := args["maxcache"]
//if maxCache != "" {
// if mc, err = strconv.ParseUint(maxCache, 10, 64); err != nil {
// return fmt.Errorf("maxCache: %w", err)
// }
// cfg.MaxCachedTxs = int(mc)
//}
//err = s.TxIndexer(ctx, h, c)
//if err != nil {
// return fmt.Errorf("indexer: %w", err)
//}
hash := args["hash"]
if hash == "" {
return fmt.Errorf("must provide hash")
}
eh, err := chainhash.NewHashFromStr(hash)
if err != nil {
return fmt.Errorf("chainhash: %v", err)
}

maxCache := args["maxcache"]
var mc uint64
if maxCache != "" {
if mc, err = strconv.ParseUint(maxCache, 10, 64); err != nil {
return fmt.Errorf("maxCache: %w", err)
}
cfg.MaxCachedTxs = int(mc)
}
err = s.TxIndexer(ctx, eh)
if err != nil {
return fmt.Errorf("indexer: %w", err)
}

case "blocksbytxid":
txid := args["txid"]
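Note on the new txindex handler above: chainhash.NewHashFromStr parses the byte-reversed display form of a block hash, i.e. the same hex string a block explorer or bitcoind prints, so the hash argument can be pasted in verbatim. A minimal, self-contained sketch of that parse (the genesis hash is just an example value, and the btcsuite import path is an assumption; hemi may vendor it differently):

package main

import (
	"fmt"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
)

func main() {
	// NewHashFromStr expects the reversed (display) byte order, which is
	// how block hashes are conventionally written out.
	eh, err := chainhash.NewHashFromStr(
		"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f")
	if err != nil {
		panic(err)
	}
	fmt.Println(eh) // round-trips to the same display-order string
}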
213 changes: 129 additions & 84 deletions service/tbc/crawler.go
@@ -61,8 +61,8 @@ func (s *Server) UtxoIndexHash(ctx context.Context) (*HashHeight, error) {
return s.mdHashHeight(ctx, UtxoIndexHashKey)
}

// UtxoIndexHash returns the last hash that has been Tx indexed.
func (s *Server) txIndexHash(ctx context.Context) (*HashHeight, error) {
// TxIndexHash returns the last hash that has been Tx indexed.
func (s *Server) TxIndexHash(ctx context.Context) (*HashHeight, error) {
return s.mdHashHeight(ctx, TxIndexHashKey)
}

@@ -331,7 +331,7 @@ func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error
// Record height in metadata
err = s.db.MetadataPut(ctx, UtxoIndexHashKey, last.Hash[:])
if err != nil {
return fmt.Errorf("metadata utxo height: %w", err)
return fmt.Errorf("metadata utxo hash: %w", err)
}

if endHash.IsEqual(&last.Hash) {
@@ -361,134 +361,179 @@ func processTxs(cp *chaincfg.Params, blockHash *chainhash.Hash, txs []*btcutil.T
return nil
}

func (s *Server) indexTxsInBlocks(ctx context.Context, startHeight, maxHeight uint64, txs map[tbcd.TxKey]*tbcd.TxValue) (int, error) {
// indexTxsInBlocks indexes txs from the last processed block until the
// provided end hash, inclusive. It returns the number of blocks processed and
// the last hash it has processed.
func (s *Server) indexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, txs map[tbcd.TxKey]*tbcd.TxValue) (int, *HashHeight, error) {
log.Tracef("indexTxsInBlocks")
defer log.Tracef("indexTxsInBlocks exit")

circuitBreaker := false
if maxHeight != 0 {
circuitBreaker = true
// indicates if we have processed endHash and thus have hit the exit
// condition.
var last *HashHeight

// Find start hash
txHH, err := s.TxIndexHash(ctx)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return 0, last, fmt.Errorf("tx index hash: %w", err)
}
txHH = &HashHeight{
Hash: *s.chainParams.GenesisHash,
Height: 0,
}
}

txsPercentage := 95 // flush cache at >95% capacity
blocksProcessed := 0
for height := startHeight; ; height++ {
bhs, err := s.db.BlockHeadersByHeight(ctx, height)
hh := txHH
for {
log.Debugf("indexing txs: %v", hh)

hash := hh.Hash
bh, err := s.db.BlockHeaderByHash(ctx, hash[:])
if err != nil {
if errors.Is(err, database.ErrNotFound) {
log.Infof("No more blocks at: %v", height)
break
}
return 0, fmt.Errorf("block headers by height %v: %w", height, err)
return 0, last, fmt.Errorf("block header %v: %w", hash, err)
}
eb, err := s.db.BlockByHash(ctx, bhs[0].Hash)

// Index block
eb, err := s.db.BlockByHash(ctx, bh.Hash)
if err != nil {
return 0, fmt.Errorf("block by hash %v: %w", height, err)
return 0, last, fmt.Errorf("block by hash %v: %w", bh, err)
}
b, err := btcutil.NewBlockFromBytes(eb.Block)
if err != nil {
ch, _ := chainhash.NewHash(bhs[0].Hash)
return 0, fmt.Errorf("could not decode block %v %v: %v",
height, ch, err)
return 0, last, fmt.Errorf("could not decode block %v: %v", hh, err)
}

err = processTxs(s.chainParams, b.Hash(), b.Transactions(), txs)
if err != nil {
return 0, fmt.Errorf("process txs %v: %w", height, err)
return 0, last, fmt.Errorf("process txs %v: %w", hh, err)
}

blocksProcessed++

// Try not to overshoot the cache to prevent costly allocations
cp := len(txs) * 100 / s.cfg.MaxCachedTxs
if height%10000 == 0 || cp > txsPercentage || blocksProcessed == 1 {
log.Infof("Tx indexer height: %v tx cache %v%%", height, cp)
if bh.Height%10000 == 0 || cp > txsPercentage || blocksProcessed == 1 {
log.Infof("Tx indexer: %v tx cache %v%%", hh, cp)
}
if cp > txsPercentage {
// Set txsMax to the largest tx capacity seen
s.cfg.MaxCachedTxs = max(len(txs), s.cfg.MaxCachedTxs)
last = hh
// Flush
break
}

// If set we may have to exit early
if circuitBreaker {
if height >= maxHeight-1 {
// Exit if we processed the provided end hash
if endHash.IsEqual(&hash) {
last = hh
break
}

// Move to next block
height := bh.Height + 1
bhs, err := s.db.BlockHeadersByHeight(ctx, height)
if err != nil {
if errors.Is(err, database.ErrNotFound) {
log.Infof("No more blocks at: %v", height)
break
}
return 0, last, fmt.Errorf("block headers by height %v: %w",
height, err)
}
if len(bhs) > 1 {
panic("FIXME handle multiple block headers")
}
// Verify it connects to parent
if !hash.IsEqual(bhs[0].ParentHash()) {
return 0, last, fmt.Errorf("%v does not connect to: %v",
bhs[0], hash)
}
hh.Hash = *bhs[0].BlockHash()
hh.Height = bhs[0].Height
}

return blocksProcessed, nil
return blocksProcessed, last, nil
}
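The flush decision in indexTxsInBlocks is plain integer arithmetic: flush once the cache crosses txsPercentage of MaxCachedTxs, and ratchet MaxCachedTxs up to the largest cache actually seen so the next allocation is sized to fit. A self-contained sketch of the same check, with illustrative numbers rather than values from the commit:

package main

import "fmt"

func main() {
	// Illustrative numbers only; in the commit these come from
	// s.cfg.MaxCachedTxs and len(txs).
	maxCachedTxs := 1_000_000
	cached := 960_000
	txsPercentage := 95 // flush cache at >95% capacity

	cp := cached * 100 / maxCachedTxs // integer percent, here 96
	if cp > txsPercentage {
		// Same ratchet as the commit: keep the high-water mark so the
		// next cache allocation is big enough.
		maxCachedTxs = max(cached, maxCachedTxs)
		fmt.Printf("flush at %v%%, max cache now %v\n", cp, maxCachedTxs)
	}
}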

// TxIndexer indexes transactions from the last indexed block through the
// provided end hash, inclusive. It does NOT verify that the provided end
// hash is on the canonical chain. This is the version of the function that
// has no training wheels and is meant for internal use only.
func (s *Server) TxIndexer(ctx context.Context, height, count uint64) error {
func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error {
log.Tracef("TxIndexer")
defer log.Tracef("TxIndexer exit")

return fmt.Errorf("TxIndexer not yet")
//var maxHeight uint64
//circuitBreaker := false
//if count != 0 {
// circuitBreaker = true
// maxHeight = height + count
//}
// Verify exit condition hash
if endHash == nil {
return fmt.Errorf("must provide an end hash")
}
_, endHeight, err := s.BlockHeaderByHash(ctx, endHash)
if err != nil {
return fmt.Errorf("blockheader hash: %w", err)
}

// Verify start point is not after the end point
txHH, err := s.TxIndexHash(ctx)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return fmt.Errorf("tx indexer : %w", err)
}
txHH = &HashHeight{
Hash: *s.chainParams.GenesisHash,
Height: 0,
}
}

//// Allocate here so that we don't waste space when not indexing.
//txs := make(map[tbcd.TxKey]*tbcd.TxValue, s.cfg.MaxCachedTxs)
//// log.Infof("max %v %v", s.cfg.MaxCachedTxs, s.cfg.MaxCachedTxs*(105))
//// return nil
//defer clear(txs)
// XXX we need training wheels here. We can't blindly accept the end
// hash without asserting whether it is higher in the chain or a
// forced fork.
// XXX check cumulative? check fork?

// log.Infof("Start indexing transactions at height %v count %v", height, count)
// for {
// start := time.Now()
// blocksProcessed, err := s.indexTxsInBlocks(ctx, height, maxHeight, txs)
// if err != nil {
// return fmt.Errorf("index blocks: %w", err)
// }
// if blocksProcessed == 0 {
// return nil
// }
// txsCached := len(txs)
// log.Infof("Tx indexer blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v",
// blocksProcessed, time.Since(start), txsCached,
// s.cfg.MaxCachedTxs-txsCached, txsCached/blocksProcessed)

// start = time.Now()
// if err = s.db.BlockTxUpdate(ctx, txs); err != nil {
// return fmt.Errorf("block tx update: %w", err)
// }
// // leveldb does all kinds of allocations, force GC to lower
// // memory pressure.
// logMemStats()
// runtime.GC()
// Allocate here so that we don't waste space when not indexing.
txs := make(map[tbcd.TxKey]*tbcd.TxValue, s.cfg.MaxCachedTxs)
defer clear(txs)

log.Infof("Start indexing Txs at hash %v height %v", txHH.Hash, txHH.Height)
log.Infof("End indexing Txs at hash %v height %v", endHash, endHeight)
for {
start := time.Now()
blocksProcessed, last, err := s.indexTxsInBlocks(ctx, endHash, txs)
if err != nil {
return fmt.Errorf("index blocks: %w", err)
}
if blocksProcessed == 0 {
return nil
}
txsCached := len(txs)
log.Infof("Tx indexer blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v",
blocksProcessed, time.Since(start), txsCached,
s.cfg.MaxCachedTxs-txsCached, txsCached/blocksProcessed)

// Flush to disk
start = time.Now()
if err = s.db.BlockTxUpdate(ctx, txs); err != nil {
return fmt.Errorf("block tx update: %w", err)
}
// leveldb does all kinds of allocations, force GC to lower
// memory pressure.
logMemStats()
runtime.GC()

// log.Infof("Flushing txs complete %v took %v",
// txsCached, time.Since(start))
log.Infof("Flushing txs complete %v took %v",
txsCached, time.Since(start))

// height += uint64(blocksProcessed)
// Record height in metadata
err = s.db.MetadataPut(ctx, TxIndexHashKey, last.Hash[:])
if err != nil {
return fmt.Errorf("metadata tx hash: %w", err)
}

// // Record height in metadata
// var dbHeight [8]byte
// binary.BigEndian.PutUint64(dbHeight[:], height)
// err = s.db.MetadataPut(ctx, TxIndexHeightKey, dbHeight[:])
// if err != nil {
// return fmt.Errorf("metadata tx height: %w", err)
// }
if endHash.IsEqual(&last.Hash) {
break
}

// // If set we may have to exit early
// if circuitBreaker {
// log.Infof("Indexed transactions to height: %v", height-1)
// if height >= maxHeight {
// return nil
// }
// }
//}
}

return nil
}
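One consequence of the new signature: a caller that previously passed count = 0 to mean "index to tip" now has to name the tip hash explicitly. A sketch of that driver, assuming the surrounding package's imports; bestHeaderHash is hypothetical and stands in for whatever best-header lookup the server actually exposes:

// indexToTip is a sketch, not part of this commit: it drives TxIndexer
// to the canonical tip.
func indexToTip(ctx context.Context, s *Server) error {
	tip, err := bestHeaderHash(ctx, s) // hypothetical helper
	if err != nil {
		return fmt.Errorf("best header: %w", err)
	}
	// Safe to re-run: every flush checkpoints last.Hash under
	// TxIndexHashKey, so an interrupted pass resumes from the last
	// flushed block instead of genesis.
	return s.TxIndexer(ctx, tip)
}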

// SyncIndexersToHash tries to move the various indexers to the supplied
