Skip to content

Commit

Permalink
Flip direction logic around; while it was correct it was difficult to…
Browse files Browse the repository at this point in the history
… reason about
  • Loading branch information
marcopeereboom committed Jul 2, 2024
1 parent 138e500 commit ca21b3d
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 37 deletions.
2 changes: 1 addition & 1 deletion database/tbcd/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Database interface {

// Transactions
BlockUtxoUpdate(ctx context.Context, utxos map[Outpoint]CacheOutput) error
BlockTxUpdate(ctx context.Context, txs map[TxKey]*TxValue) error
BlockTxUpdate(ctx context.Context, direction int, txs map[TxKey]*TxValue) error
BlocksByTxId(ctx context.Context, txId TxId) ([]BlockHash, error)
SpendOutputsByTxId(ctx context.Context, txId TxId) ([]SpendInfo, error)

Expand Down
13 changes: 9 additions & 4 deletions database/tbcd/level/level.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, bh [80]byte) error {

hhKey := heightHashToKey(0, bhash[:])
hhBatch.Put(hhKey, []byte{})
ebh := encodeBlockHeader(0, bh, new(big.Int))
cdiff := big.NewInt(0)
cdiff = new(big.Int).Add(cdiff, blockchain.CalcWork(wbh.Bits))
ebh := encodeBlockHeader(0, bh, cdiff)
bhBatch.Put(bhash[:], ebh[:])

bhBatch.Put([]byte(bhsCanonicalTipKey), ebh[:])
Expand Down Expand Up @@ -905,7 +907,7 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, utxos map[tbcd.Outpoint]tbcd.
return nil
}

func (l *ldb) BlockTxUpdate(ctx context.Context, txs map[tbcd.TxKey]*tbcd.TxValue) error {
func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxKey]*tbcd.TxValue) error {
log.Tracef("BlockTxUpdate")
defer log.Tracef("BlockTxUpdate exit")

Expand All @@ -931,8 +933,11 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, txs map[tbcd.TxKey]*tbcd.TxValu
default:
return fmt.Errorf("invalid cache entry: %v", spew.Sdump(k))
}

txsBatch.Put(key, value)
if direction <= 0 {
txsBatch.Put(key, value)
} else {
txsBatch.Delete(key)
}
// log.Infof("%v:%v", spew.Sdump(key), spew.Sdump(value))
// // XXX this probably should be done by the caller but we do it
// // here to lower memory pressure as large gobs of data are
Expand Down
69 changes: 50 additions & 19 deletions service/tbc/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,16 +392,12 @@ func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error
if err != nil {
return fmt.Errorf("blockheader hash: %w", err)
}
direction := startBH.Difficulty.Cmp(&endBH.Difficulty)
switch {
case direction <= 0:
if endBH.Difficulty.Cmp(&startBH.Difficulty) >= 1 {
return s.UtxoIndexerWind(ctx, startBH, endBH)
default:
// start > end thus we must unwind
return s.UtxoIndexerUnwind(ctx, endBH, startBH)
}

return nil
// start > end thus we must unwind
return s.UtxoIndexerUnwind(ctx, endBH, startBH)
}

func processTxs(cp *chaincfg.Params, blockHash *chainhash.Hash, txs []*btcutil.Tx, txsCache map[tbcd.TxKey]*tbcd.TxValue) error {
Expand Down Expand Up @@ -556,7 +552,7 @@ func (s *Server) TxIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHe

// Flush to disk
start = time.Now()
if err = s.db.BlockTxUpdate(ctx, txs); err != nil {
if err = s.db.BlockTxUpdate(ctx, 0, txs); err != nil {
return fmt.Errorf("block tx update: %w", err)
}
// leveldb does all kinds of allocations, force GC to lower
Expand Down Expand Up @@ -606,20 +602,55 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error {
Height: 0,
}
}
// XXX make sure there is no gap between start and end or vice versa.
startBH, err := s.db.BlockHeaderByHash(ctx, txHH.Hash[:])
if err != nil {
return fmt.Errorf("blockheader hash: %w", err)
}
direction := startBH.Difficulty.Cmp(&endBH.Difficulty)
switch {
case direction <= 0:
if endBH.Difficulty.Cmp(&startBH.Difficulty) >= 1 {
return s.TxIndexerWind(ctx, startBH, endBH)
default:
// start > end thus we must unwind
return s.TxIndexerUnwind(ctx, endBH, startBH)
}

return nil
// start > end thus we must unwind
return s.TxIndexerUnwind(ctx, endBH, startBH)
}

func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (int, error) {
log.Tracef("TxIndexIsLinear")
defer log.Tracef("TxIndexIsLinear exit")

// Verify exit condition hash
if endHash == nil {
return 0, errors.New("must provide an end hash")
}
endBH, err := s.db.BlockHeaderByHash(ctx, endHash[:])
if err != nil {
return 0, fmt.Errorf("blockheader hash: %w", err)
}

// Verify start point is not after the end point
txHH, err := s.TxIndexHash(ctx)
if err != nil {
if !errors.Is(err, database.ErrNotFound) {
return 0, fmt.Errorf("tx indexer : %w", err)
}
txHH = &HashHeight{
Hash: *s.chainParams.GenesisHash,
Height: 0,
}
}
// XXX make sure there is no gap between start and end or vice versa.
startBH, err := s.db.BlockHeaderByHash(ctx, txHH.Hash[:])
if err != nil {
return 0, fmt.Errorf("blockheader hash: %w", err)
}
// direction := startBH.Difficulty.Cmp(&endBH.Difficulty)
direction := endBH.Difficulty.Cmp(&startBH.Difficulty)
log.Infof("startBH %v %v", startBH, startBH.Difficulty)
log.Infof("endBH %v %v", endBH, endBH.Difficulty)
log.Infof("direction %v", direction)

return direction, nil
}

// SyncIndexersToHash tries to move the various indexers to the supplied
Expand Down Expand Up @@ -667,10 +698,10 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e
}
}()

log.Debugf("Syncing indexes to: %v", hash)
if err := s.UtxoIndexer(ctx, hash); err != nil {
return fmt.Errorf("utxo indexer: %w", err)
}
//log.Debugf("Syncing indexes to: %v", hash)
//if err := s.UtxoIndexer(ctx, hash); err != nil {
// return fmt.Errorf("utxo indexer: %w", err)
//}

// Transactions index
if err := s.TxIndexer(ctx, hash); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions service/tbc/tbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,13 +686,13 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) {

nbh, err := s.db.BlockHeaderByHash(ctx, bhb.ParentHash()[:])
if err != nil {
panic(err)
panic(err) // XXX
}
bhb = nbh
log.Infof("WALKING BACK TO: %v", bhb)
if err = s.getHeaders(ctx, p, bhb.Header); err != nil {
panic(err)
return
panic(err) // XXX
// return
}
continue
}
Expand Down
63 changes: 53 additions & 10 deletions service/tbc/tbcfork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,25 +953,68 @@ func TestIndexFork(t *testing.T) {
t.Fatal(err)
}

// Verify linear indexing. Current TxIndex is sitting at genesis

// genesis -> b3 should work with negative direction (cdiff is less than target)
direction, err := s.TxIndexIsLinear(ctx, b3.Hash())
if err != nil {
t.Fatalf("expected success g -> b3, got %v", err)
}
if direction <= 0 {
t.Fatalf("expected 1 going from genesis to b3, got %v", direction)
}

// Index to b3
err = s.SyncIndexersToHash(ctx, b3.Hash())
if err != nil {
t.Fatal(err)
}
// XXX verify indexes

// Should fail
t.Logf("=== index b2a ===")
err = s.SyncIndexersToHash(ctx, b2a.Hash())
if err != nil {
t.Fatal(err)
}
// Verify linear indexing. Current TxIndex is sitting at b3

t.Logf("=== index b2b ===")
err = s.SyncIndexersToHash(ctx, b2b.Hash())
// b3 -> genesis should work with postive direction (cdiff is greater than target)
direction, err = s.TxIndexIsLinear(ctx, s.chainParams.GenesisHash)
if err != nil {
t.Fatal(err)
}
t.Fatalf("expected success b3 -> genesis, got %v", err)
}
if direction >= 1 {
t.Fatalf("expected -1 going from b3 to genesis, got %v", direction)
}
_ = b2a
_ = b2b
//// b3 -> b1 should work with negative direction
//direction, err = s.TxIndexIsLinear(ctx, b1.Hash())
//if err == nil {
// t.Fatal("expected -1 going from b1 to genesis")
//}
//if direction != -1 {
// t.Fatal("expected -1 going from b3 to genesis")
//}
//// b3 -> b2a should fail
//direction, err = s.TxIndexIsLinear(ctx, b2a.Hash())
//if err == nil {
// t.Fatal(err)
//}

//// b3 -> b2b should fail
//direction, err = s.TxIndexIsLinear(ctx, b2b.Hash())
//if err == nil {
// t.Fatal(err)
//}

//// Should fail
//t.Logf("=== index b2a ===")
//err = s.SyncIndexersToHash(ctx, b2a.Hash())
//if err != nil {
// t.Fatal(err)
//}

//t.Logf("=== index b2b ===")
//err = s.SyncIndexersToHash(ctx, b2b.Hash())
//if err != nil {
// t.Fatal(err)
//}

time.Sleep(time.Second)
}
Expand Down

0 comments on commit ca21b3d

Please sign in to comment.