Skip to content

Commit

Permalink
unindex transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcopeereboom committed Jul 3, 2024
1 parent 9dcfb76 commit 9d8da0f
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 19 deletions.
11 changes: 8 additions & 3 deletions database/tbcd/level/level.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,10 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK
log.Tracef("BlockTxUpdate")
defer log.Tracef("BlockTxUpdate exit")

if !(direction == 1 || direction == -1) {
return fmt.Errorf("invalid direction: %v", direction)
}

// transactions
txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB)
if err != nil {
Expand All @@ -933,10 +937,11 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK
default:
return fmt.Errorf("invalid cache entry: %v", spew.Sdump(k))
}
if direction <= 0 {
txsBatch.Put(key, value)
} else {
switch direction {
case -1:
txsBatch.Delete(key)
case 1:
txsBatch.Put(key, value)
}
// log.Infof("%v:%v", spew.Sdump(key), spew.Sdump(value))
// // XXX this probably should be done by the caller but we do it
Expand Down
173 changes: 160 additions & 13 deletions service/tbc/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error
case 1:
return s.TxIndexerWind(ctx, startBH, endBH)
case -1:
return s.TxIndexerUnwind(ctx, endBH, startBH)
return s.TxIndexerUnwind(ctx, startBH, endBH)
}
return ErrNotLinear
}
Expand Down Expand Up @@ -521,11 +521,148 @@ func (s *Server) indexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash,
return blocksProcessed, last, nil
}

func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, toBH *tbcd.BlockHeader) error {
// unindexTxsInBlocks walks the chain backwards, starting at the block the tx
// index currently points at (genesis if no tx index hash has been recorded),
// and hands the transactions of every visited block to processTxs so that
// they end up in txs for the caller to remove from the index.
//
// The walk stops once endHash has been processed (inclusive) or once txs is
// more than 95% of s.cfg.MaxCachedTxs, whichever comes first. It returns the
// number of blocks processed and the hash/height of the last processed block.
// On error the returned count is 0 and the returned hash is nil.
func (s *Server) unindexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, txs map[tbcd.TxKey]*tbcd.TxValue) (int, *HashHeight, error) {
	log.Tracef("unindexTxsInBlocks")
	defer log.Tracef("unindexTxsInBlocks exit")

	// last is only set right before a successful exit from the loop; it
	// stays nil on every error path.
	var last *HashHeight

	// Find start hash
	txHH, err := s.TxIndexHash(ctx)
	if err != nil {
		if !errors.Is(err, database.ErrNotFound) {
			return 0, last, fmt.Errorf("tx index hash: %w", err)
		}
		txHH = &HashHeight{
			Hash:   *s.chainParams.GenesisHash,
			Height: 0,
		}
	}

	txsPercentage := 95 // flush cache at >95% capacity
	blocksProcessed := 0
	hh := txHH
	for {
		// Per-block output stays at debug level; the throttled Infof
		// below is the progress indicator.
		log.Debugf("unindexing txs: %v", hh)

		hash := hh.Hash
		bh, err := s.db.BlockHeaderByHash(ctx, hash[:])
		if err != nil {
			return 0, last, fmt.Errorf("block header %v: %w", hash, err)
		}

		// Unindex block
		eb, err := s.db.BlockByHash(ctx, bh.Hash)
		if err != nil {
			return 0, last, fmt.Errorf("block by hash %v: %w", bh, err)
		}
		b, err := btcutil.NewBlockFromBytes(eb.Block)
		if err != nil {
			return 0, last, fmt.Errorf("could not decode block %v: %w", hh, err)
		}

		err = processTxs(s.chainParams, b.Hash(), b.Transactions(), txs)
		if err != nil {
			return 0, last, fmt.Errorf("process txs %v: %w", hh, err)
		}

		blocksProcessed++

		// Try not to overshoot the cache to prevent costly allocations
		cp := len(txs) * 100 / s.cfg.MaxCachedTxs
		if bh.Height%10000 == 0 || cp > txsPercentage || blocksProcessed == 1 {
			log.Infof("Tx unindexer: %v tx cache %v%%", hh, cp)
		}
		if cp > txsPercentage {
			// Remember the largest cache size seen so far.
			s.cfg.MaxCachedTxs = max(len(txs), s.cfg.MaxCachedTxs)
			last = hh
			// Flush
			break
		}

		// Exit if we processed the provided end hash
		if endHash.IsEqual(&hash) {
			last = hh
			break
		}

		// Move to previous block
		height := bh.Height - 1
		pbh, err := s.db.BlockHeaderByHash(ctx, bh.ParentHash()[:])
		if err != nil {
			if errors.Is(err, database.ErrNotFound) {
				// We ran out of blocks before seeing endHash.
				// Returning a non-zero count with a nil last
				// hash would make the caller dereference nil,
				// so report the condition as an error.
				log.Infof("No more blocks at: %v", height)
				return 0, last, fmt.Errorf("no more blocks at %v before reaching %v: %w",
					height, endHash, err)
			}
			return 0, last, fmt.Errorf("parent block header at height %v: %w",
				height, err)
		}
		hh.Hash = *pbh.BlockHash()
		hh.Height = pbh.Height
	}

	return blocksProcessed, last, nil
}

// TxIndexerUnwind removes transactions from the tx index in batches until
// endBH has been processed. Each round lets unindexTxsInBlocks fill the txs
// cache, flushes the cache with BlockTxUpdate in the -1 direction and then
// stores the hash of the last processed block under TxIndexHashKey.
//
// startBH is only used for logging here; the walk itself starts wherever
// unindexTxsInBlocks picks up.
func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error {
	log.Tracef("TxIndexerUnwind")
	defer log.Tracef("TxIndexerUnwind exit")

	// XXX dedup with TxIndexerWind; it's basically the same code but with
	// the direction, start and end hash flipped.

	// Allocate here so that we don't waste space when not indexing.
	txs := make(map[tbcd.TxKey]*tbcd.TxValue, s.cfg.MaxCachedTxs)
	defer clear(txs)

	log.Infof("Start unwinding Txs at hash %v height %v", startBH, startBH.Height)
	log.Infof("End unwinding Txs at hash %v height %v", endBH, endBH.Height)
	endHash := endBH.BlockHash()
	for {
		start := time.Now()
		blocksProcessed, last, err := s.unindexTxsInBlocks(ctx, endHash, txs)
		if err != nil {
			return fmt.Errorf("unindex txs in blocks: %w", err)
		}
		if blocksProcessed == 0 {
			return nil
		}
		if last == nil {
			// Blocks were processed but no stopping point was
			// reported. Bail before touching the database so the
			// index and its recorded hash stay consistent.
			return fmt.Errorf("unindex txs in blocks: no last hash after %v blocks",
				blocksProcessed)
		}
		txsCached := len(txs)
		log.Infof("Tx unwinder blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v",
			blocksProcessed, time.Since(start), txsCached,
			s.cfg.MaxCachedTxs-txsCached, txsCached/blocksProcessed)

		// Flush to disk
		start = time.Now()
		if err = s.db.BlockTxUpdate(ctx, -1, txs); err != nil {
			return fmt.Errorf("block tx update: %w", err)
		}
		// leveldb does all kinds of allocations, force GC to lower
		// memory pressure.
		logMemStats()
		runtime.GC()

		log.Infof("Flushing unwind txs complete %v took %v",
			txsCached, time.Since(start))

		// Record the hash of the last processed block in metadata
		err = s.db.MetadataPut(ctx, TxIndexHashKey, last.Hash[:])
		if err != nil {
			return fmt.Errorf("metadata tx hash: %w", err)
		}

		// Done once the end hash itself has been processed.
		if endHash.IsEqual(&last.Hash) {
			return nil
		}
	}
}

func (s *Server) TxIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error {
Expand Down Expand Up @@ -555,7 +692,7 @@ func (s *Server) TxIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHe

// Flush to disk
start = time.Now()
if err = s.db.BlockTxUpdate(ctx, 0, txs); err != nil {
if err = s.db.BlockTxUpdate(ctx, 1, txs); err != nil {
return fmt.Errorf("block tx update: %w", err)
}
// leveldb does all kinds of allocations, force GC to lower
Expand Down Expand Up @@ -585,6 +722,8 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error {
log.Tracef("TxIndexer")
defer log.Tracef("TxIndexer exit")

// XXX this is basically duplicate from TxIndexIsLinear

// Verify exit condition hash
if endHash == nil {
return errors.New("must provide an end hash")
Expand All @@ -611,13 +750,21 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error {
if err != nil {
return fmt.Errorf("blockheader hash: %w", err)
}
switch endBH.Difficulty.Cmp(&startBH.Difficulty) {
direction, err := s.TxIndexIsLinear(ctx, endHash)
if err != nil {
return fmt.Errorf("TxIndexIsLinear: %w", err)
}
// switch endBH.Difficulty.Cmp(&startBH.Difficulty) {
switch direction {
case 1:
return s.TxIndexerWind(ctx, startBH, endBH)
case -1:
return s.TxIndexerUnwind(ctx, endBH, startBH)
return s.TxIndexerUnwind(ctx, startBH, endBH)
case 0:
// XXX dedup TxIndexIsLinear with the above code so that it isn't so awkward.
return nil // because we call TxIndexIsLinear we know it's the same block
}
return ErrNotLinear
return fmt.Errorf("wtf") // XXX fix code so thatw e can't get here
}

func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (int, error) {
Expand Down Expand Up @@ -650,9 +797,12 @@ func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (
return 0, fmt.Errorf("blockheader hash: %w", err)
}
direction := endBH.Difficulty.Cmp(&startBH.Difficulty)
log.Debugf("startBH %v %v", startBH, startBH.Difficulty)
log.Debugf("endBH %v %v", endBH, endBH.Difficulty)
log.Debugf("direction %v", direction)
log.Infof("startBH %v %v", startBH, startBH.Difficulty)
log.Infof("endBH %v %v", endBH, endBH.Difficulty)
log.Infof("direction %v", direction)
if startBH.BlockHash().IsEqual(endBH.BlockHash()) {
return 0, nil
}

// Expensive linear test, this needs some performance love. We can
// memoize it keep snapshot heights whereto we know the chain is
Expand All @@ -671,9 +821,6 @@ func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (
default:
return 0, ErrNotLinear
}
if direction > 0 {
} else {
}
for {
bh, err := s.db.BlockHeaderByHash(ctx, h[:])
if err != nil {
Expand Down
34 changes: 31 additions & 3 deletions service/tbc/tbcfork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ func TestIndexFork(t *testing.T) {
// XXX verify indexes

// Verify linear indexing. Current TxIndex is sitting at b3
t.Logf("b3: %v", b3)

// b3 -> genesis should work with positive direction (cdiff is greater than target)
direction, err = s.TxIndexIsLinear(ctx, s.chainParams.GenesisHash)
Expand Down Expand Up @@ -1002,10 +1003,37 @@ func TestIndexFork(t *testing.T) {
t.Fatalf("b2b is not linear to b3: %v", err)
}

// unwind back to genesis
// make sure syncing to itself is a no-op that returns no error
err = s.SyncIndexersToHash(ctx, b3.Hash())
if !errors.Is(err, ErrNotLinear) {
t.Fatalf("at b3, should have returned not linear, got %v", err)
if err != nil {
t.Fatalf("at b3, should have returned nil, got %v", err)
}

// unwind back to genesis
err = s.SyncIndexersToHash(ctx, s.chainParams.GenesisHash)
if err != nil {
t.Fatalf("unwinding to genesis should have returned nil, got %v", err)
}

// XXX verify indexes
txHH, err := s.TxIndexHash(ctx)
if err != nil {
t.Fatalf("expected success getting tx index hash, got: %v", err)
}
if !txHH.Hash.IsEqual(s.chainParams.GenesisHash) {
t.Fatalf("expected tx index hash to be equal to genesis, got: %v", txHH)
}
if txHH.Height != 0 {
t.Fatalf("expected tx index height to be 0, got: %v", txHH.Height)
}

// see if we can move to b2a
direction, err = s.TxIndexIsLinear(ctx, b2a.Hash())
if err != nil {
t.Fatalf("expected success genesis -> b2a, got %v", err)
}
if direction != 1 {
t.Fatalf("expected 1 going from genesis to b2a, got %v", direction)
}

//// Should fail
Expand Down

0 comments on commit 9d8da0f

Please sign in to comment.