Skip to content

Commit

Permalink
Merge pull request #193 from kcalvinalvin/2024-07-03-recoverable-bridge
Browse files Browse the repository at this point in the history
Make bridge nodes recoverable
  • Loading branch information
kcalvinalvin authored Aug 28, 2024
2 parents b9fcbfc + 1546209 commit f68f79d
Show file tree
Hide file tree
Showing 26 changed files with 1,589 additions and 443 deletions.
29 changes: 29 additions & 0 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,14 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
return err
}

// Flush the indexes if they need to be flushed.
if b.indexManager != nil {
err := b.indexManager.Flush(&state.Hash, FlushIfNeeded, true)
if err != nil {
return err
}
}

// Prune fully spent entries and mark all entries in the view unmodified
// now that the modifications have been committed to the database.
if view != nil {
Expand Down Expand Up @@ -2328,6 +2336,27 @@ type IndexManager interface {
// PruneBlock is invoked when an older block is deleted after it's been
// processed. This lowers the storage requirement for a node.
PruneBlocks(database.Tx, int32, func(int32) (*chainhash.Hash, error)) error

// Flush flushes the relevant indexes if they need to be flushed.
Flush(*chainhash.Hash, FlushMode, bool) error
}

// FlushUtxoCache flushes the indexes if a flush is needed with the given flush mode.
// If the flush is on a block connect and not a reorg, the onConnect bool should be true.
//
// This function is safe for concurrent access.
func (b *BlockChain) FlushIndexes(mode FlushMode, onConnect bool) error {
b.chainLock.Lock()
defer b.chainLock.Unlock()

if b.indexManager != nil {
err := b.indexManager.Flush(&b.BestSnapshot().Hash, mode, onConnect)
if err != nil {
return err
}
}

return nil
}

// Config is a descriptor which specifies the blockchain instance configuration.
Expand Down
11 changes: 9 additions & 2 deletions blockchain/indexers/addrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (idx *AddrIndex) NeedsInputs() bool {
// initialize for this index.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) Init(_ *blockchain.BlockChain) error {
func (idx *AddrIndex) Init(_ *blockchain.BlockChain, _ *chainhash.Hash, _ int32) error {
// Nothing to do.
return nil
}
Expand Down Expand Up @@ -816,7 +816,14 @@ func (idx *AddrIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
// supported with pruning.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error {
func (idx *AddrIndex) PruneBlock(_ database.Tx, _ *chainhash.Hash, _ int32) error {
return nil
}

// For AddrIndex, flush is a no-op.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) Flush(_ *chainhash.Hash, _ blockchain.FlushMode, _ bool) error {
return nil
}

Expand Down
11 changes: 9 additions & 2 deletions blockchain/indexers/cfindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (idx *CfIndex) NeedsInputs() bool {

// Init initializes the hash-based cf index. This is part of the Indexer
// interface.
func (idx *CfIndex) Init(_ *blockchain.BlockChain) error {
func (idx *CfIndex) Init(_ *blockchain.BlockChain, _ *chainhash.Hash, _ int32) error {
return nil // Nothing to do.
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (idx *CfIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
// reindexing as a pruned node.
//
// This is part of the Indexer interface.
func (idx *CfIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error {
func (idx *CfIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash, _ int32) error {
for _, key := range cfIndexKeys {
err := dbDeleteFilterIdxEntry(dbTx, key, blockHash)
if err != nil {
Expand All @@ -286,6 +286,13 @@ func (idx *CfIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) erro
return nil
}

// For CfIndex, flush is a no-op.
//
// This is part of the Indexer interface.
func (idx *CfIndex) Flush(_ *chainhash.Hash, _ blockchain.FlushMode, _ bool) error {
return nil
}

// entryByBlockHash fetches a filter index entry of a particular type
// (eg. filter, filter header, etc) for a filter type and block hash.
func (idx *CfIndex) entryByBlockHash(filterTypeKeys [][]byte,
Expand Down
7 changes: 5 additions & 2 deletions blockchain/indexers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Indexer interface {
// Init is invoked when the index manager is first initializing the
// index. This differs from the Create method in that it is called on
// every load, including the case the index was just created.
Init(*blockchain.BlockChain) error
Init(*blockchain.BlockChain, *chainhash.Hash, int32) error

// ConnectBlock is invoked when a new block has been connected to the
// main chain. The set of output spent within a block is also passed in
Expand All @@ -65,7 +65,10 @@ type Indexer interface {

// PruneBlock is invoked when an older block is deleted after it's been
// processed.
PruneBlock(database.Tx, *chainhash.Hash) error
PruneBlock(dbTx database.Tx, deletedBlock *chainhash.Hash, lastKeptHeight int32) error

// Flush flushes the index.
Flush(*chainhash.Hash, blockchain.FlushMode, bool) error
}

// AssertError identifies an error that indicates an internal code consistency
Expand Down
98 changes: 90 additions & 8 deletions blockchain/indexers/flatfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (

var (
// magicBytes are the bytes prepended to any entry in the dataFiles.
magicBytes = []byte{0xaa, 0xff, 0xaa, 0xff}
magicBytes = [4]byte{0xaa, 0xff, 0xaa, 0xff}
)

// FlatFileState is the shared state for storing flatfiles. It is specifically designed
Expand Down Expand Up @@ -59,6 +59,72 @@ type FlatFileState struct {
offsets []int64
}

// recoverOffsetFile recovers the offset file to the latest readable offset.
func (ff *FlatFileState) recoverOffsetFile(fileSize int64) error {
offsetFileSize := (fileSize / 8) * 8
return ff.offsetFile.Truncate(offsetFileSize)
}

// recover recovers the flat file state to a readable state by rolling back to the latest
// reable stored data.
func (ff *FlatFileState) recover() error {
log.Infof("Recovering flatfile as it's not consistent")
buf := make([]byte, 8)
for ; ff.currentHeight > 0; ff.currentHeight-- {
// Read from the dataFile. This read will grab the magic bytes and the
// size bytes.
offset := ff.offsets[ff.currentHeight]

_, err := ff.dataFile.ReadAt(buf, offset)
if err == nil && bytes.Equal(buf[:4], magicBytes[:]) {
// Size of the actual data we want to fetch.
size := binary.BigEndian.Uint32(buf[4:])

// Read the data.
dataBuf := make([]byte, size)
read, err := ff.dataFile.ReadAt(dataBuf, offset+8)
if err == nil && uint32(read) == size {
_, err := ff.FetchData(ff.currentHeight)
if err == nil {
// If we're able to read the data bytes, then return here.
return nil
}
}
}

// Truncating when the offset is bigger will append 0s.
// Only truncate when the offset is less than the data file size.
dataFileSize, err := ff.dataFile.Seek(0, 2)
if err != nil {
return err
}
if offset < dataFileSize {
err = ff.dataFile.Truncate(offset)
if err != nil {
return err
}
}

offsetFileSize, err := ff.offsetFile.Seek(0, 2)
if err != nil {
return err
}
// Each offset is 8 bytes.
err = ff.offsetFile.Truncate(offsetFileSize - 8)
if err != nil {
return err
}

// Set the currentOffset as the last offset.
ff.currentOffset = ff.offsets[len(ff.offsets)-1]

// Pop the offset in memory.
ff.offsets = ff.offsets[:len(ff.offsets)-1]
}

return nil
}

// Init initializes the FlatFileState. If resuming, it loads the offsets onto memory.
// If starting new, it creates an offsetFile and a dataFile along with the directories
// those belong in.
Expand Down Expand Up @@ -90,8 +156,12 @@ func (ff *FlatFileState) Init(path, dataName string) error {

// Offsets are always 8 bytes each.
if offsetFileSize%8 != 0 {
return fmt.Errorf("FlatFileState.Init(): Corrupt FlatFileState. " +
"offsetFile not mulitple of 8 bytes")
log.Infof("Recovering flatfile offsets as it's not consistent")
// recover the offsetfile if it's not in 8 byte increments.
err = ff.recoverOffsetFile(offsetFileSize)
if err != nil {
return err
}
}

// If the file size is bigger than 0, we're resuming and will read all
Expand Down Expand Up @@ -131,11 +201,18 @@ func (ff *FlatFileState) Init(path, dataName string) error {
return err
}

// Oo the same with the in-ram slice.
// Do the same with the in-ram slice.
ff.offsets = make([]int64, 1)
}

return nil
// Test if we can fetch the last stored data.
_, err = ff.FetchData(ff.currentHeight)
if err == nil {
return nil
}

// If we can't fetch the last stored data, recover to the last readable data.
return ff.recover()
}

// StoreData stores the given byte slice as a new entry in the dataFile.
Expand Down Expand Up @@ -179,7 +256,7 @@ func (ff *FlatFileState) StoreData(height int32, data []byte) error {
buf = buf[:len(data)+8]

// Add the magic bytes, size, and the data to the buffer to be written.
copy(buf[:4], magicBytes)
copy(buf[:4], magicBytes[:])
binary.BigEndian.PutUint32(buf[4:8], uint32(len(data)))
copy(buf[8:], data)

Expand Down Expand Up @@ -225,7 +302,7 @@ func (ff *FlatFileState) FetchData(height int32) ([]byte, error) {
}

// Sanity check. If wrong magic was read, then error out.
if !bytes.Equal(buf[:4], magicBytes) {
if !bytes.Equal(buf[:4], magicBytes[:]) {
return nil, fmt.Errorf("Read wrong magic bytes. Expect %x but got %x",
magicBytes, buf[:4])
}
Expand Down Expand Up @@ -266,7 +343,7 @@ func (ff *FlatFileState) DisconnectBlock(height int32) error {
return err
}

if !bytes.Equal(buf[:4], magicBytes) {
if !bytes.Equal(buf[:4], magicBytes[:]) {
return fmt.Errorf("read wrong magic of %x", buf[:4])
}

Expand Down Expand Up @@ -305,6 +382,11 @@ func (ff *FlatFileState) DisconnectBlock(height int32) error {
return nil
}

// BestHeight returns the current latest height of the flat file state.
func (ff *FlatFileState) BestHeight() int32 {
return ff.currentHeight
}

// deleteFileFile removes the flat file state directory and all the contents
// in it.
func deleteFlatFile(path string) error {
Expand Down
Loading

0 comments on commit f68f79d

Please sign in to comment.