diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index 66e28b5e..0666f02b 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -911,6 +911,10 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK log.Tracef("BlockTxUpdate") defer log.Tracef("BlockTxUpdate exit") + if !(direction == 1 || direction == -1) { + return fmt.Errorf("invalid direction: %v", direction) + } + // transactions txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB) if err != nil { @@ -933,10 +937,11 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK default: return fmt.Errorf("invalid cache entry: %v", spew.Sdump(k)) } - if direction <= 0 { - txsBatch.Put(key, value) - } else { + switch direction { + case -1: txsBatch.Delete(key) + case 1: + txsBatch.Put(key, value) } // log.Infof("%v:%v", spew.Sdump(key), spew.Sdump(value)) // // XXX this probably should be done by the caller but we do it diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 274719ac..dcbb568c 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -398,7 +398,7 @@ func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error case 1: return s.TxIndexerWind(ctx, startBH, endBH) case -1: - return s.TxIndexerUnwind(ctx, endBH, startBH) + return s.TxIndexerUnwind(ctx, startBH, endBH) } return ErrNotLinear } @@ -521,11 +521,148 @@ func (s *Server) indexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, return blocksProcessed, last, nil } -func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, toBH *tbcd.BlockHeader) error { +// unindexTxsInBlocks indexes txs from the last processed block until the +// provided end hash, inclusive. It returns the number of blocks processed and +// the last hash it has processedd. +func (s *Server) unindexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, txs map[tbcd.TxKey]*tbcd.TxValue) (int, *HashHeight, error) { + log.Tracef("unindexTxsInBlocks") + defer log.Tracef("unindexTxsInBlocks exit") + + // indicates if we have processed endHash and thus have hit the exit + // condition. + var last *HashHeight + + // Find start hash + txHH, err := s.TxIndexHash(ctx) + if err != nil { + if !errors.Is(err, database.ErrNotFound) { + return 0, last, fmt.Errorf("tx index hash: %w", err) + } + txHH = &HashHeight{ + Hash: *s.chainParams.GenesisHash, + Height: 0, + } + } + + txsPercentage := 95 // flush cache at >95% capacity + blocksProcessed := 0 + hh := txHH + for { + log.Debugf("unindexing txs: %v", hh) + log.Infof("unindexing txs: %v", hh) + + hash := hh.Hash + bh, err := s.db.BlockHeaderByHash(ctx, hash[:]) + if err != nil { + return 0, last, fmt.Errorf("block header %v: %w", hash, err) + } + + // Index block + eb, err := s.db.BlockByHash(ctx, bh.Hash) + if err != nil { + return 0, last, fmt.Errorf("block by hash %v: %w", bh, err) + } + b, err := btcutil.NewBlockFromBytes(eb.Block) + if err != nil { + return 0, last, fmt.Errorf("could not decode block %v: %w", hh, err) + } + + err = processTxs(s.chainParams, b.Hash(), b.Transactions(), txs) + if err != nil { + return 0, last, fmt.Errorf("process txs %v: %w", hh, err) + } + + blocksProcessed++ + + // Try not to overshoot the cache to prevent costly allocations + cp := len(txs) * 100 / s.cfg.MaxCachedTxs + if bh.Height%10000 == 0 || cp > txsPercentage || blocksProcessed == 1 { + log.Infof("Tx unindexer: %v tx cache %v%%", hh, cp) + } + if cp > txsPercentage { + // Set txsMax to the largest tx capacity seen + s.cfg.MaxCachedTxs = max(len(txs), s.cfg.MaxCachedTxs) + last = hh + // Flush + break + } + + // Exit if we processed the provided end hash + if endHash.IsEqual(&hash) { + last = hh + break + } + + // Move to previous block + height := bh.Height - 1 + pbh, err := s.db.BlockHeaderByHash(ctx, bh.ParentHash()[:]) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + log.Infof("No more blocks at: %v", height) + break + } + return 0, last, fmt.Errorf("block headers by height %v: %w", + height, err) + } + hh.Hash = *pbh.BlockHash() + hh.Height = pbh.Height + } + + return blocksProcessed, last, nil +} + +func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error { log.Tracef("TxIndexerUnwind") defer log.Tracef("TxIndexerUnwind exit") - return fmt.Errorf("TxIndexerUnwind not yet") + // XXX dedup with TxIndexedWind; it's basically the same code but with the direction, start anf endhas flipped + + // Allocate here so that we don't waste space when not indexing. + txs := make(map[tbcd.TxKey]*tbcd.TxValue, s.cfg.MaxCachedTxs) + defer clear(txs) + + log.Infof("Start unwinding Txs at hash %v height %v", startBH, startBH.Height) + log.Infof("End unwinding Txs at hash %v height %v", endBH, endBH.Height) + endHash := endBH.BlockHash() + for { + start := time.Now() + blocksProcessed, last, err := s.unindexTxsInBlocks(ctx, endHash, txs) + if err != nil { + return fmt.Errorf("index tx in blocks: %w", err) + } + if blocksProcessed == 0 { + return nil + } + txsCached := len(txs) + log.Infof("Tx unwinder blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v", + blocksProcessed, time.Since(start), txsCached, + s.cfg.MaxCachedTxs-txsCached, txsCached/blocksProcessed) + + // Flush to disk + start = time.Now() + if err = s.db.BlockTxUpdate(ctx, -1, txs); err != nil { + return fmt.Errorf("block tx update: %w", err) + } + // leveldb does all kinds of allocations, force GC to lower + // memory preassure. + logMemStats() + runtime.GC() + + log.Infof("Flushing unwind txs complete %v took %v", + txsCached, time.Since(start)) + + // Record height in metadata + err = s.db.MetadataPut(ctx, TxIndexHashKey, last.Hash[:]) + if err != nil { + return fmt.Errorf("metadata tx hash: %w", err) + } + + if endHash.IsEqual(&last.Hash) { + break + } + + } + return nil } func (s *Server) TxIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error { @@ -555,7 +692,7 @@ func (s *Server) TxIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHe // Flush to disk start = time.Now() - if err = s.db.BlockTxUpdate(ctx, 0, txs); err != nil { + if err = s.db.BlockTxUpdate(ctx, 1, txs); err != nil { return fmt.Errorf("block tx update: %w", err) } // leveldb does all kinds of allocations, force GC to lower @@ -585,6 +722,8 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { log.Tracef("TxIndexer") defer log.Tracef("TxIndexer exit") + // XXX this is basically duplicate from TxIndexIsLinear + // Verify exit condition hash if endHash == nil { return errors.New("must provide an end hash") @@ -611,13 +750,21 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { if err != nil { return fmt.Errorf("blockheader hash: %w", err) } - switch endBH.Difficulty.Cmp(&startBH.Difficulty) { + direction, err := s.TxIndexIsLinear(ctx, endHash) + if err != nil { + return fmt.Errorf("TxIndexIsLinear: %w", err) + } + // switch endBH.Difficulty.Cmp(&startBH.Difficulty) { + switch direction { case 1: return s.TxIndexerWind(ctx, startBH, endBH) case -1: - return s.TxIndexerUnwind(ctx, endBH, startBH) + return s.TxIndexerUnwind(ctx, startBH, endBH) + case 0: + // XXX dedup TxIndexIsLinear with the above code so that it isn't so awkward. + return nil // because we call TxIndexIsLinear we know it's the same block } - return ErrNotLinear + return fmt.Errorf("wtf") // XXX fix code so thatw e can't get here } func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (int, error) { @@ -650,9 +797,12 @@ func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) ( return 0, fmt.Errorf("blockheader hash: %w", err) } direction := endBH.Difficulty.Cmp(&startBH.Difficulty) - log.Debugf("startBH %v %v", startBH, startBH.Difficulty) - log.Debugf("endBH %v %v", endBH, endBH.Difficulty) - log.Debugf("direction %v", direction) + log.Infof("startBH %v %v", startBH, startBH.Difficulty) + log.Infof("endBH %v %v", endBH, endBH.Difficulty) + log.Infof("direction %v", direction) + if startBH.BlockHash().IsEqual(endBH.BlockHash()) { + return 0, nil + } // Expensive linear test, this needs some performance love. We can // memoize it keep snapshot heights whereto we know the chain is @@ -671,9 +821,6 @@ func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) ( default: return 0, ErrNotLinear } - if direction > 0 { - } else { - } for { bh, err := s.db.BlockHeaderByHash(ctx, h[:]) if err != nil { diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go index ac54031e..c666637a 100644 --- a/service/tbc/tbcfork_test.go +++ b/service/tbc/tbcfork_test.go @@ -972,6 +972,7 @@ func TestIndexFork(t *testing.T) { // XXX verify indexes // Verify linear indexing. Current TxIndex is sitting at b3 + t.Logf("b3: %v", b3) // b3 -> genesis should work with postive direction (cdiff is greater than target) direction, err = s.TxIndexIsLinear(ctx, s.chainParams.GenesisHash) @@ -1002,10 +1003,37 @@ func TestIndexFork(t *testing.T) { t.Fatalf("b2b is not linear to b3: %v", err) } - // unwind back to genesis + // make sure syncing to iself is non linear err = s.SyncIndexersToHash(ctx, b3.Hash()) - if !errors.Is(err, ErrNotLinear) { - t.Fatalf("at b3, should have returned not linear, got %v", err) + if err != nil { + t.Fatalf("at b3, should have returned nil, got %v", err) + } + + // unwind back to genesis + err = s.SyncIndexersToHash(ctx, s.chainParams.GenesisHash) + if err != nil { + t.Fatalf("unwinding to genesis should have returned nil, got %v", err) + } + + // XXX verify indexes + txHH, err := s.TxIndexHash(ctx) + if err != nil { + t.Fatalf("expected success getting tx index hash, got: %v", err) + } + if !txHH.Hash.IsEqual(s.chainParams.GenesisHash) { + t.Fatalf("expected tx index hash to be equal to genesis, got: %v", txHH) + } + if txHH.Height != 0 { + t.Fatalf("expected tx index height to be 0, got: %v", txHH.Height) + } + + // see if we can move to b2z + direction, err = s.TxIndexIsLinear(ctx, b2a.Hash()) + if err != nil { + t.Fatalf("expected success genesis -> b2a, got %v", err) + } + if direction != 1 { + t.Fatalf("expected 1 going from genesis to b2a, got %v", direction) } //// Should fail