diff --git a/blockchain/chain.go b/blockchain/chain.go index 5287ca0c..78ed8a56 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -760,6 +760,14 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, return err } + // Flush the indexes if they need to be flushed. + if b.indexManager != nil { + err := b.indexManager.Flush(&state.Hash, FlushIfNeeded, true) + if err != nil { + return err + } + } + // Prune fully spent entries and mark all entries in the view unmodified // now that the modifications have been committed to the database. if view != nil { @@ -2328,6 +2336,27 @@ type IndexManager interface { // PruneBlock is invoked when an older block is deleted after it's been // processed. This lowers the storage requirement for a node. PruneBlocks(database.Tx, int32, func(int32) (*chainhash.Hash, error)) error + + // Flush flushes the relevant indexes if they need to be flushed. + Flush(*chainhash.Hash, FlushMode, bool) error +} + +// FlushIndexes flushes the indexes if a flush is needed with the given flush mode. +// If the flush is on a block connect and not a reorg, the onConnect bool should be true. +// +// This function is safe for concurrent access. +func (b *BlockChain) FlushIndexes(mode FlushMode, onConnect bool) error { + b.chainLock.Lock() + defer b.chainLock.Unlock() + + if b.indexManager != nil { + err := b.indexManager.Flush(&b.BestSnapshot().Hash, mode, onConnect) + if err != nil { + return err + } + } + + return nil } // Config is a descriptor which specifies the blockchain instance configuration. diff --git a/blockchain/indexers/addrindex.go b/blockchain/indexers/addrindex.go index 154c091f..cfc2a3ad 100644 --- a/blockchain/indexers/addrindex.go +++ b/blockchain/indexers/addrindex.go @@ -645,7 +645,7 @@ func (idx *AddrIndex) NeedsInputs() bool { // initialize for this index. // // This is part of the Indexer interface. -func (idx *AddrIndex) Init(_ *blockchain.BlockChain) error { +func (idx *AddrIndex) Init(_ *blockchain.BlockChain, _ *chainhash.Hash, _ int32) error { // Nothing to do. return nil } @@ -816,7 +816,14 @@ func (idx *AddrIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, // supported with pruning. // // This is part of the Indexer interface. -func (idx *AddrIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error { +func (idx *AddrIndex) PruneBlock(_ database.Tx, _ *chainhash.Hash, _ int32) error { + return nil +} + +// For AddrIndex, flush is a no-op. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Flush(_ *chainhash.Hash, _ blockchain.FlushMode, _ bool) error { return nil } diff --git a/blockchain/indexers/cfindex.go b/blockchain/indexers/cfindex.go index a1394ccd..8b5c7db0 100644 --- a/blockchain/indexers/cfindex.go +++ b/blockchain/indexers/cfindex.go @@ -96,7 +96,7 @@ func (idx *CfIndex) NeedsInputs() bool { // Init initializes the hash-based cf index. This is part of the Indexer // interface. -func (idx *CfIndex) Init(_ *blockchain.BlockChain) error { +func (idx *CfIndex) Init(_ *blockchain.BlockChain, _ *chainhash.Hash, _ int32) error { return nil // Nothing to do. } @@ -261,7 +261,7 @@ func (idx *CfIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, // reindexing as a pruned node. // // This is part of the Indexer interface.
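The hunk above adds a FlushIndexes method to BlockChain and a Flush method to the IndexManager interface. A minimal caller sketch under those assumptions; only FlushIndexes and the FlushMode constants (FlushIfNeeded, FlushPeriodic, FlushRequired) come from this diff, while the ticker interval, quit channel, and logger are illustrative:

// indexFlushLoop is a hypothetical driver, not part of this patch.
func indexFlushLoop(chain *blockchain.BlockChain, quit <-chan struct{}) {
	// Any periodic trigger works; FlushPeriodic itself checks how long it has
	// been since each index last flushed.
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Only writes if the per-index periodic interval has elapsed.
			if err := chain.FlushIndexes(blockchain.FlushPeriodic, true); err != nil {
				log.Errorf("periodic index flush: %v", err)
			}
		case <-quit:
			// Force a final flush so the indexes are left consistent on
			// shutdown. onConnect is true here since this is not a reorg
			// disconnect (an assumption, not spelled out by the patch).
			if err := chain.FlushIndexes(blockchain.FlushRequired, true); err != nil {
				log.Errorf("final index flush: %v", err)
			}
			return
		}
	}
}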
-func (idx *CfIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error { +func (idx *CfIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash, _ int32) error { for _, key := range cfIndexKeys { err := dbDeleteFilterIdxEntry(dbTx, key, blockHash) if err != nil { @@ -286,6 +286,13 @@ func (idx *CfIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) erro return nil } +// For CfIndex, flush is a no-op. +// +// This is part of the Indexer interface. +func (idx *CfIndex) Flush(_ *chainhash.Hash, _ blockchain.FlushMode, _ bool) error { + return nil +} + // entryByBlockHash fetches a filter index entry of a particular type // (eg. filter, filter header, etc) for a filter type and block hash. func (idx *CfIndex) entryByBlockHash(filterTypeKeys [][]byte, diff --git a/blockchain/indexers/common.go b/blockchain/indexers/common.go index 6bc0106a..dc0ac97e 100644 --- a/blockchain/indexers/common.go +++ b/blockchain/indexers/common.go @@ -49,7 +49,7 @@ type Indexer interface { // Init is invoked when the index manager is first initializing the // index. This differs from the Create method in that it is called on // every load, including the case the index was just created. - Init(*blockchain.BlockChain) error + Init(*blockchain.BlockChain, *chainhash.Hash, int32) error // ConnectBlock is invoked when a new block has been connected to the // main chain. The set of output spent within a block is also passed in @@ -65,7 +65,10 @@ type Indexer interface { // PruneBlock is invoked when an older block is deleted after it's been // processed. - PruneBlock(database.Tx, *chainhash.Hash) error + PruneBlock(dbTx database.Tx, deletedBlock *chainhash.Hash, lastKeptHeight int32) error + + // Flush flushes the index. + Flush(*chainhash.Hash, blockchain.FlushMode, bool) error } // AssertError identifies an error that indicates an internal code consistency diff --git a/blockchain/indexers/flatfile.go b/blockchain/indexers/flatfile.go index fe806f55..562d7541 100644 --- a/blockchain/indexers/flatfile.go +++ b/blockchain/indexers/flatfile.go @@ -29,7 +29,7 @@ const ( var ( // magicBytes are the bytes prepended to any entry in the dataFiles. - magicBytes = []byte{0xaa, 0xff, 0xaa, 0xff} + magicBytes = [4]byte{0xaa, 0xff, 0xaa, 0xff} ) // FlatFileState is the shared state for storing flatfiles. It is specifically designed @@ -59,6 +59,72 @@ type FlatFileState struct { offsets []int64 } +// recoverOffsetFile recovers the offset file to the latest readable offset. +func (ff *FlatFileState) recoverOffsetFile(fileSize int64) error { + offsetFileSize := (fileSize / 8) * 8 + return ff.offsetFile.Truncate(offsetFileSize) +} + +// recover recovers the flat file state to a readable state by rolling back to the latest +// reable stored data. +func (ff *FlatFileState) recover() error { + log.Infof("Recovering flatfile as it's not consistent") + buf := make([]byte, 8) + for ; ff.currentHeight > 0; ff.currentHeight-- { + // Read from the dataFile. This read will grab the magic bytes and the + // size bytes. + offset := ff.offsets[ff.currentHeight] + + _, err := ff.dataFile.ReadAt(buf, offset) + if err == nil && bytes.Equal(buf[:4], magicBytes[:]) { + // Size of the actual data we want to fetch. + size := binary.BigEndian.Uint32(buf[4:]) + + // Read the data. + dataBuf := make([]byte, size) + read, err := ff.dataFile.ReadAt(dataBuf, offset+8) + if err == nil && uint32(read) == size { + _, err := ff.FetchData(ff.currentHeight) + if err == nil { + // If we're able to read the data bytes, then return here. 
+ return nil + } + } + } + + // Truncating when the offset is bigger will append 0s. + // Only truncate when the offset is less than the data file size. + dataFileSize, err := ff.dataFile.Seek(0, 2) + if err != nil { + return err + } + if offset < dataFileSize { + err = ff.dataFile.Truncate(offset) + if err != nil { + return err + } + } + + offsetFileSize, err := ff.offsetFile.Seek(0, 2) + if err != nil { + return err + } + // Each offset is 8 bytes. + err = ff.offsetFile.Truncate(offsetFileSize - 8) + if err != nil { + return err + } + + // Set the currentOffset as the last offset. + ff.currentOffset = ff.offsets[len(ff.offsets)-1] + + // Pop the offset in memory. + ff.offsets = ff.offsets[:len(ff.offsets)-1] + } + + return nil +} + // Init initializes the FlatFileState. If resuming, it loads the offsets onto memory. // If starting new, it creates an offsetFile and a dataFile along with the directories // those belong in. @@ -90,8 +156,12 @@ func (ff *FlatFileState) Init(path, dataName string) error { // Offsets are always 8 bytes each. if offsetFileSize%8 != 0 { - return fmt.Errorf("FlatFileState.Init(): Corrupt FlatFileState. " + - "offsetFile not mulitple of 8 bytes") + log.Infof("Recovering flatfile offsets as it's not consistent") + // recover the offsetfile if it's not in 8 byte increments. + err = ff.recoverOffsetFile(offsetFileSize) + if err != nil { + return err + } } // If the file size is bigger than 0, we're resuming and will read all @@ -131,11 +201,18 @@ func (ff *FlatFileState) Init(path, dataName string) error { return err } - // Oo the same with the in-ram slice. + // Do the same with the in-ram slice. ff.offsets = make([]int64, 1) } - return nil + // Test if we can fetch the last stored data. + _, err = ff.FetchData(ff.currentHeight) + if err == nil { + return nil + } + + // If we can't fetch the last stored data, recover to the last readable data. + return ff.recover() } // StoreData stores the given byte slice as a new entry in the dataFile. @@ -179,7 +256,7 @@ func (ff *FlatFileState) StoreData(height int32, data []byte) error { buf = buf[:len(data)+8] // Add the magic bytes, size, and the data to the buffer to be written. - copy(buf[:4], magicBytes) + copy(buf[:4], magicBytes[:]) binary.BigEndian.PutUint32(buf[4:8], uint32(len(data))) copy(buf[8:], data) @@ -225,7 +302,7 @@ func (ff *FlatFileState) FetchData(height int32) ([]byte, error) { } // Sanity check. If wrong magic was read, then error out. - if !bytes.Equal(buf[:4], magicBytes) { + if !bytes.Equal(buf[:4], magicBytes[:]) { return nil, fmt.Errorf("Read wrong magic bytes. Expect %x but got %x", magicBytes, buf[:4]) } @@ -266,7 +343,7 @@ func (ff *FlatFileState) DisconnectBlock(height int32) error { return err } - if !bytes.Equal(buf[:4], magicBytes) { + if !bytes.Equal(buf[:4], magicBytes[:]) { return fmt.Errorf("read wrong magic of %x", buf[:4]) } @@ -305,6 +382,11 @@ func (ff *FlatFileState) DisconnectBlock(height int32) error { return nil } +// BestHeight returns the current latest height of the flat file state. +func (ff *FlatFileState) BestHeight() int32 { + return ff.currentHeight +} + // deleteFileFile removes the flat file state directory and all the contents // in it. 
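For reference while reading recover() and Init() above: each record in the data file is 4 magic bytes, a 4-byte big-endian payload length, and then the payload, while recoverOffsetFile simply truncates the offset file down to a multiple of 8 bytes. A small sketch of reading a single record at a known offset; the helper name is hypothetical and imports of bytes, encoding/binary, fmt, and io are assumed:

// readRecord mirrors the checks recover() performs for the entry at a given
// offset. Illustrative only, not part of the patch.
func readRecord(dataFile io.ReaderAt, offset int64) ([]byte, error) {
	// header[0:4] is the magic, header[4:8] the big-endian payload length.
	header := make([]byte, 8)
	if _, err := dataFile.ReadAt(header, offset); err != nil {
		return nil, err
	}
	if !bytes.Equal(header[:4], magicBytes[:]) {
		return nil, fmt.Errorf("wrong magic %x at offset %d", header[:4], offset)
	}

	payload := make([]byte, binary.BigEndian.Uint32(header[4:8]))
	if _, err := dataFile.ReadAt(payload, offset+8); err != nil {
		return nil, err
	}
	return payload, nil
}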
func deleteFlatFile(path string) error { diff --git a/blockchain/indexers/flatfile_test.go b/blockchain/indexers/flatfile_test.go index 00d40640..720cfe59 100644 --- a/blockchain/indexers/flatfile_test.go +++ b/blockchain/indexers/flatfile_test.go @@ -327,7 +327,7 @@ func getAfterSizes(ff *FlatFileState, height int32) (int64, int64, error) { if err != nil { return 0, 0, err } - if !bytes.Equal(buf[:4], magicBytes) { + if !bytes.Equal(buf[:4], magicBytes[:]) { return 0, 0, fmt.Errorf("read wrong magic of %x", buf[:4]) } dataSize := binary.BigEndian.Uint32(buf[4:]) @@ -649,3 +649,200 @@ func TestMultipleFetchData(t *testing.T) { wg.Wait() } + +func TestRecover(t *testing.T) { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + tests := []struct { + name string + datas [][]byte + truncateLen int64 + truncateOffsetFile bool + }{ + { + name: "first", + datas: func() [][]byte { + datas := make([][]byte, 0, 100) + for i := int32(1); i <= 100; i++ { + data, err := createRandByteSlice(rnd) + if err != nil { + t.Fatal(err) + } + datas = append(datas, data) + } + + return datas + }(), + truncateLen: 1, + truncateOffsetFile: true, + }, + { + name: "second", + datas: func() [][]byte { + datas := make([][]byte, 0, 100) + for i := int32(1); i <= 100; i++ { + data, err := createRandByteSlice(rnd) + if err != nil { + t.Fatal(err) + } + datas = append(datas, data) + } + + return datas + }(), + truncateLen: 7, + truncateOffsetFile: true, + }, + { + name: "third", + datas: func() [][]byte { + datas := make([][]byte, 0, 100) + for i := int32(1); i <= 100; i++ { + data, err := createRandByteSlice(rnd) + if err != nil { + t.Fatal(err) + } + datas = append(datas, data) + } + + return datas + }(), + truncateLen: 5, + truncateOffsetFile: true, + }, + { + name: "fourth", + datas: func() [][]byte { + datas := make([][]byte, 0, 100) + for i := int32(1); i <= 100; i++ { + data, err := createRandByteSlice(rnd) + if err != nil { + t.Fatal(err) + } + datas = append(datas, data) + } + + return datas + }(), + truncateLen: 1, + truncateOffsetFile: false, + }, + { + name: "fifth", + datas: func() [][]byte { + datas := make([][]byte, 0, 100) + for i := int32(1); i <= 100; i++ { + data, err := createRandByteSlice(rnd) + if err != nil { + t.Fatal(err) + } + datas = append(datas, data) + } + + return datas + }(), + truncateLen: 15, + truncateOffsetFile: false, + }, + { + name: "sixth", + datas: func() [][]byte { + datas := make([][]byte, 0, 100) + for i := int32(1); i <= 100; i++ { + data, err := createRandByteSlice(rnd) + if err != nil { + t.Fatal(err) + } + datas = append(datas, data) + } + + return datas + }(), + truncateLen: 155, + truncateOffsetFile: false, + }, + } + + for _, test := range tests { + tmpDir := t.TempDir() + dir := filepath.Join(tmpDir, "dir_"+test.name) + defer deleteFlatFile(dir) + + // Create and store data in the flat file state to test it. + ff := NewFlatFileState() + err := ff.Init(dir, test.name) + if err != nil { + t.Fatal(err) + } + for i, data := range test.datas { + err = ff.StoreData(int32(i)+1, data) + if err != nil { + t.Fatal(err) + } + } + + // Sanity checking. + for i, data := range test.datas { + fetched, err := ff.FetchData(int32(i) + 1) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(fetched, data) { + t.Fatalf("test %v. for height %v, expected %v, got %v", + test.name, + i, + hex.EncodeToString(test.datas[i-1]), + hex.EncodeToString(fetched)) + } + } + + // Corrupt the flat file state. 
+ if test.truncateOffsetFile { + offsetFileSize, err := ff.offsetFile.Seek(0, 2) + if err != nil { + t.Fatal(err) + } + err = ff.offsetFile.Truncate(offsetFileSize - test.truncateLen) + if err != nil { + t.Fatal(err) + } + } else { + dataFileSize, err := ff.dataFile.Seek(0, 2) + if err != nil { + t.Fatal(err) + } + err = ff.dataFile.Truncate(dataFileSize - test.truncateLen) + if err != nil { + t.Fatal(err) + } + + // Test if we can fetch the last stored data. + _, err = ff.FetchData(ff.currentHeight) + if err == nil { + t.Fatalf("test %v. expected error", test.name) + } + } + + // Calling init here calls the recovery functions. + err = ff.Init(dir, test.name) + if err != nil { + t.Fatal(err) + } + + // Check that the data til the currentHeight is correct. + for i := int32(1); i <= ff.currentHeight; i++ { + fetched, err := ff.FetchData(i) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(fetched, test.datas[i-1]) { + t.Fatalf("test %v. for height %v, expected %v, got %v", + test.name, + i, + hex.EncodeToString(test.datas[i-1]), + hex.EncodeToString(fetched)) + } + } + } +} diff --git a/blockchain/indexers/flatutreexoproofindex.go b/blockchain/indexers/flatutreexoproofindex.go index 72a77463..d27a088b 100644 --- a/blockchain/indexers/flatutreexoproofindex.go +++ b/blockchain/indexers/flatutreexoproofindex.go @@ -13,6 +13,7 @@ import ( "path/filepath" "reflect" "sync" + "time" "github.com/utreexo/utreexo" "github.com/utreexo/utreexod/blockchain" @@ -87,11 +88,9 @@ type FlatUtreexoProofIndex struct { rememberIdxState FlatFileState proofStatsState FlatFileState rootsState FlatFileState - chainParams *chaincfg.Params - dataDir string - // True if the node is pruned. - pruned bool + // All the configurable metadata. + config *UtreexoConfig // The blockchain instance the index corresponds to. chain *blockchain.BlockChain @@ -105,6 +104,9 @@ type FlatUtreexoProofIndex struct { // pStats are the proof size statistics that are kept for research purposes. pStats proofStats + + // The time of when the utreexo state was last flushed. + lastFlushTime time.Time } // NeedsInputs signals that the index requires the referenced inputs in order @@ -115,27 +117,111 @@ func (idx *FlatUtreexoProofIndex) NeedsInputs() bool { return true } +// consistentFlatFileState rolls back all the flat file states to the tip height. +// The data is written to the flat files directly but the index tips are cached and +// then written to disk. This may lead to states where the index tip is lower than the +// data stored in the flat files. Rolling back the flat file state to the index tip +// keep ths entire indexer consistent. 
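The consistentFlatFileState method that follows repeats the same rollback loop once per flat file. As a reading aid, the shared shape of those loops can be written as one hypothetical helper (not part of the patch), using only the BestHeight and DisconnectBlock methods added in this diff:

// rollbackFlatFile keeps disconnecting the newest entry until the flat file is
// no higher than the index tip. Sketch only.
func rollbackFlatFile(ff *FlatFileState, tipHeight int32) error {
	for best := ff.BestHeight(); best > tipHeight && best > 0; best-- {
		if err := ff.DisconnectBlock(best); err != nil {
			return err
		}
	}
	return nil
}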
+func (idx *FlatUtreexoProofIndex) consistentFlatFileState(tipHeight int32) error { + if !idx.config.Pruned { + if idx.proofState.BestHeight() != 0 && + tipHeight < idx.proofState.BestHeight() { + bestHeight := idx.proofState.BestHeight() + for tipHeight != bestHeight && bestHeight > 0 { + err := idx.proofState.DisconnectBlock(bestHeight) + if err != nil { + return err + } + bestHeight-- + } + } + } + + if idx.undoState.BestHeight() != 0 && + tipHeight < idx.undoState.BestHeight() { + bestHeight := idx.undoState.BestHeight() + for tipHeight != bestHeight && bestHeight > 0 { + err := idx.undoState.DisconnectBlock(bestHeight) + if err != nil { + return err + } + bestHeight-- + } + } + + if idx.rememberIdxState.BestHeight() != 0 && + tipHeight < idx.rememberIdxState.BestHeight() { + bestHeight := idx.rememberIdxState.BestHeight() + for tipHeight != bestHeight && bestHeight > 0 { + err := idx.rememberIdxState.DisconnectBlock(bestHeight) + if err != nil { + return err + } + bestHeight-- + } + } + if idx.proofStatsState.BestHeight() != 0 && + tipHeight < idx.proofStatsState.BestHeight() { + bestHeight := idx.proofStatsState.BestHeight() + for tipHeight != bestHeight && bestHeight > 0 { + err := idx.proofStatsState.DisconnectBlock(bestHeight) + if err != nil { + return err + } + bestHeight-- + } + } + if idx.rootsState.BestHeight() != 0 && + tipHeight < idx.rootsState.BestHeight() { + bestHeight := idx.rootsState.BestHeight() + for tipHeight != bestHeight && bestHeight > 0 { + err := idx.rootsState.DisconnectBlock(bestHeight) + if err != nil { + return err + } + bestHeight-- + } + } + + return nil +} + // Init initializes the flat utreexo proof index. This is part of the Indexer // interface. -func (idx *FlatUtreexoProofIndex) Init(chain *blockchain.BlockChain) error { +func (idx *FlatUtreexoProofIndex) Init(chain *blockchain.BlockChain, + tipHash *chainhash.Hash, tipHeight int32) error { + idx.chain = chain + // Init Utreexo State. + uState, err := InitUtreexoState(idx.config, chain, tipHash, tipHeight) + if err != nil { + return err + } + idx.utreexoState = uState + idx.lastFlushTime = time.Now() + + err = idx.consistentFlatFileState(tipHeight) + if err != nil { + return err + } + // Nothing to do if the node is not pruned. // // If the node is pruned, then we need to check if it started off as // a pruned node or if the user switch to being a pruned node. - if !idx.pruned { + if !idx.config.Pruned { return nil } - proofPath := flatFilePath(idx.dataDir, flatUtreexoProofName) - _, err := os.Stat(proofPath) + proofPath := flatFilePath(idx.config.DataDir, flatUtreexoProofName) + _, err = os.Stat(proofPath) if err != nil { // If the error isn't nil, that means the proofpath // doesn't exist. return nil } - proofState, err := loadFlatFileState(idx.dataDir, flatUtreexoProofName) + proofState, err := loadFlatFileState(idx.config.DataDir, flatUtreexoProofName) if err != nil { return err } @@ -212,7 +298,7 @@ func (idx *FlatUtreexoProofIndex) Init(chain *blockchain.BlockChain) error { } // Delete proof stat file since it's not relevant to a pruned bridge node. - proofStatPath := flatFilePath(idx.dataDir, flatUtreexoProofStatsName) + proofStatPath := flatFilePath(idx.config.DataDir, flatUtreexoProofStatsName) err = deleteFlatFile(proofStatPath) if err != nil { return err @@ -287,7 +373,7 @@ func (idx *FlatUtreexoProofIndex) ConnectBlock(dbTx database.Tx, block *btcutil. // undo block in order to undo a block on reorgs. 
If we have all the // proofs block by block, that data can be used for reorgs but these // two modes will not have the proofs available. - if idx.pruned || idx.proofGenInterVal != 1 { + if idx.config.Pruned || idx.proofGenInterVal != 1 { err = idx.storeUndoBlock(block.Height(), uint64(len(adds)), ud.AccProof.Targets, delHashes) if err != nil { @@ -319,7 +405,7 @@ func (idx *FlatUtreexoProofIndex) ConnectBlock(dbTx database.Tx, block *btcutil. } // Don't store proofs if the node is pruned. - if idx.pruned { + if idx.config.Pruned { return nil } @@ -715,7 +801,7 @@ func (idx *FlatUtreexoProofIndex) getUndoData(block *btcutil.Block) (uint64, []u delHashes []utreexo.Hash ) - if !idx.pruned || idx.proofGenInterVal != 1 { + if !idx.config.Pruned || idx.proofGenInterVal != 1 { ud, err := idx.FetchUtreexoProof(block.Height(), false) if err != nil { return 0, nil, nil, err @@ -768,9 +854,16 @@ func (idx *FlatUtreexoProofIndex) DisconnectBlock(dbTx database.Tx, block *btcut return err } + // Always flush the utreexo state on flushes to never leave the utreexoState + // at an unrecoverable state. + err = idx.flushUtreexoState(&block.MsgBlock().Header.PrevBlock) + if err != nil { + return err + } + // Check if we're at a height where proof was generated. Only check if we're not // pruned as we don't keep the historical proofs as a pruned node. - if (block.Height()%idx.proofGenInterVal) == 0 && !idx.pruned { + if (block.Height()%idx.proofGenInterVal) == 0 && !idx.config.Pruned { height := block.Height() / idx.proofGenInterVal err = idx.proofState.DisconnectBlock(height) if err != nil { @@ -795,8 +888,29 @@ func (idx *FlatUtreexoProofIndex) DisconnectBlock(dbTx database.Tx, block *btcut // processed. // // This is part of the Indexer interface. -func (idx *FlatUtreexoProofIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error { - return nil +func (idx *FlatUtreexoProofIndex) PruneBlock(_ database.Tx, _ *chainhash.Hash, lastKeptHeight int32) error { + hash, _, err := dbFetchUtreexoStateConsistency(idx.utreexoState.utreexoStateDB) + if err != nil { + return err + } + + // It's ok to call block by hash here as the utreexo state consistency hash is always + // included in the best chain. + lastFlushHeight, err := idx.chain.BlockHeightByHash(hash) + if err != nil { + return err + } + + // If the last flushed utreexo state is the last or greater than the kept block, + // we can sync up to the tip so a flush is not required. + if lastKeptHeight <= lastFlushHeight { + return nil + } + + // It's ok to fetch the best snapshot here as the block called on pruneblock has not + // been yet connected yet on the utreexo state. So this is indeed the correct hash. + bestHash := idx.chain.BestSnapshot().Hash + return idx.Flush(&bestHash, blockchain.FlushRequired, true) } // FetchUtreexoProof returns the Utreexo proof data for the given block height. 
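The new PruneBlock logic above only forces a flush when the blocks about to be pruned are still needed to roll the utreexo state forward. A worked sketch of that predicate with made-up heights:

// needsFlushBeforePrune is illustrative only. With lastFlushHeight = 1000 and
// lastKeptHeight = 1200, blocks 1001-1199 are about to be deleted even though
// the utreexo state would need them to catch up after a crash, so a
// FlushRequired is issued first. With lastKeptHeight <= lastFlushHeight the
// state can always catch up from the blocks that remain and no flush is needed.
func needsFlushBeforePrune(lastFlushHeight, lastKeptHeight int32) bool {
	return lastKeptHeight > lastFlushHeight
}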
@@ -807,7 +921,7 @@ func (idx *FlatUtreexoProofIndex) FetchUtreexoProof(height int32, excludeAccProo return nil, fmt.Errorf("No Utreexo Proof for height %d", height) } - if idx.pruned { + if idx.config.Pruned { return nil, fmt.Errorf("Cannot fetch historical proof as the node is pruned") } @@ -896,7 +1010,7 @@ func (idx *FlatUtreexoProofIndex) FetchMultiUtreexoProof(height int32) ( return nil, nil, nil, fmt.Errorf("No Utreexo Proof for height %d", height) } - if idx.pruned { + if idx.config.Pruned { return nil, nil, nil, fmt.Errorf("Cannot fetch historical proof as the node is pruned") } @@ -1253,7 +1367,7 @@ func loadFlatFileState(dataDir, name string) (*FlatFileState, error) { // turn is used by the blockchain package. This allows the index to be // seamlessly maintained along with the chain. func NewFlatUtreexoProofIndex(pruned bool, chainParams *chaincfg.Params, - proofGenInterVal *int32, maxMemoryUsage int64, dataDir string) (*FlatUtreexoProofIndex, error) { + proofGenInterVal *int32, maxMemoryUsage int64, dataDir string, flush func() error) (*FlatUtreexoProofIndex, error) { // If the proofGenInterVal argument is nil, use the default value. var intervalToUse int32 @@ -1265,25 +1379,19 @@ func NewFlatUtreexoProofIndex(pruned bool, chainParams *chaincfg.Params, idx := &FlatUtreexoProofIndex{ proofGenInterVal: intervalToUse, - chainParams: chainParams, mtx: new(sync.RWMutex), - dataDir: dataDir, - } - - // Init Utreexo State. - uState, err := InitUtreexoState(&UtreexoConfig{ - DataDir: dataDir, - Name: flatUtreexoProofIndexType, - Params: chainParams, - }, maxMemoryUsage) - if err != nil { - return nil, err + config: &UtreexoConfig{ + MaxMemoryUsage: maxMemoryUsage, + Params: chainParams, + Pruned: pruned, + DataDir: dataDir, + Name: flatUtreexoProofIndexType, + FlushMainDB: flush, + }, } - idx.utreexoState = uState - idx.pruned = pruned // Init the utreexo proof state if the node isn't pruned. 
- if !idx.pruned { + if !idx.config.Pruned { proofState, err := loadFlatFileState(dataDir, flatUtreexoProofName) if err != nil { return nil, err diff --git a/blockchain/indexers/indexers_test.go b/blockchain/indexers/indexers_test.go index a9695294..5c0ab9fc 100644 --- a/blockchain/indexers/indexers_test.go +++ b/blockchain/indexers/indexers_test.go @@ -70,17 +70,17 @@ func createDB(dbName string) (database.DB, string, error) { return db, dbPath, nil } -func initIndexes(interval int32, dbPath string, db *database.DB, params *chaincfg.Params) ( +func initIndexes(interval int32, dbPath string, db database.DB, params *chaincfg.Params) ( *Manager, []Indexer, error) { proofGenInterval := new(int32) *proofGenInterval = interval - flatUtreexoProofIndex, err := NewFlatUtreexoProofIndex(false, params, proofGenInterval, 50*1024*1024, dbPath) + flatUtreexoProofIndex, err := NewFlatUtreexoProofIndex(false, params, proofGenInterval, 50*1024*1024, dbPath, db.Flush) if err != nil { return nil, nil, err } - utreexoProofIndex, err := NewUtreexoProofIndex(*db, false, 50*1024*1024, params, dbPath) + utreexoProofIndex, err := NewUtreexoProofIndex(db, false, 50*1024*1024, params, dbPath, db.Flush) if err != nil { return nil, nil, err } @@ -89,7 +89,7 @@ func initIndexes(interval int32, dbPath string, db *database.DB, params *chaincf utreexoProofIndex, flatUtreexoProofIndex, } - indexManager := NewManager(*db, indexes) + indexManager := NewManager(db, indexes) return indexManager, indexes, nil } @@ -109,7 +109,7 @@ func indexersTestChain(testName string, proofGenInterval int32) (*blockchain.Blo } // Create the indexes to be used in the chain. - indexManager, indexes, err := initIndexes(proofGenInterval, dbPath, &db, ¶ms) + indexManager, indexes, err := initIndexes(proofGenInterval, dbPath, db, ¶ms) if err != nil { tearDown() os.RemoveAll(testDbRoot) @@ -132,14 +132,6 @@ func indexersTestChain(testName string, proofGenInterval int32) (*blockchain.Blo panic(fmt.Errorf("failed to create chain instance: %v", err)) } - // Init the indexes. - err = indexManager.Init(chain, nil) - if err != nil { - tearDown() - os.RemoveAll(testDbRoot) - panic(fmt.Errorf("failed to init indexs: %v", err)) - } - return chain, indexes, ¶ms, indexManager, tearDown } @@ -203,7 +195,7 @@ func compareUtreexoIdx(start, end int32, pruned bool, chain *blockchain.BlockCha return err } - if !idxType.pruned { + if !idxType.config.Pruned { utreexoUD, err = idxType.FetchUtreexoProof(block.Hash()) if err != nil { return err @@ -222,7 +214,7 @@ func compareUtreexoIdx(start, end int32, pruned bool, chain *blockchain.BlockCha case *FlatUtreexoProofIndex: var err error - if !idxType.pruned { + if !idxType.config.Pruned { flatUD, err = idxType.FetchUtreexoProof(b, false) if err != nil { return err @@ -1033,7 +1025,7 @@ func TestBridgeNodePruneUndoDataGen(t *testing.T) { t.Fatal(err) } } - idxType.pruned = true + idxType.config.Pruned = true case *UtreexoProofIndex: for height := int32(1); height <= maxHeight; height++ { @@ -1047,10 +1039,26 @@ func TestBridgeNodePruneUndoDataGen(t *testing.T) { t.Fatal(err) } } - idxType.pruned = true + idxType.config.Pruned = true } } + // Close the databases so that they can be initialized again + // to generate the undo data. 
+ for _, indexer := range indexes { + switch idxType := indexer.(type) { + case *FlatUtreexoProofIndex: + err := idxType.CloseUtreexoState() + if err != nil { + t.Fatal(err) + } + case *UtreexoProofIndex: + err := idxType.CloseUtreexoState() + if err != nil { + t.Fatal(err) + } + } + } // Here we generate the undo data and delete the proof files. err = indexManager.Init(chain, nil) if err != nil { diff --git a/blockchain/indexers/manager.go b/blockchain/indexers/manager.go index 45437d7f..828624d7 100644 --- a/blockchain/indexers/manager.go +++ b/blockchain/indexers/manager.go @@ -311,7 +311,18 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) // Initialize each of the enabled indexes. for _, indexer := range m.enabledIndexes { - if err := indexer.Init(chain); err != nil { + // Fetch the current tip for the index. + var height int32 + var hash *chainhash.Hash + err := m.db.View(func(dbTx database.Tx) error { + idxKey := indexer.Key() + hash, height, err = dbFetchIndexerTip(dbTx, idxKey) + return err + }) + if err != nil { + return err + } + if err := indexer.Init(chain, hash, height); err != nil { return err } } @@ -468,24 +479,6 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) return err } - if interruptRequested(interrupt) { - for _, indexer := range m.enabledIndexes { - switch idxType := indexer.(type) { - case *UtreexoProofIndex: - err := idxType.FlushUtreexoState() - if err != nil { - log.Errorf("Error while flushing utreexo state: %v", err) - } - case *FlatUtreexoProofIndex: - err := idxType.FlushUtreexoState() - if err != nil { - log.Errorf("Error while flushing utreexo state for flat utreexo proof index: %v", err) - } - } - } - return errInterruptRequested - } - // Connect the block for all indexes that need it. var spentTxos []blockchain.SpentTxOut for i, indexer := range m.enabledIndexes { @@ -523,19 +516,33 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) for _, indexer := range m.enabledIndexes { switch idxType := indexer.(type) { case *UtreexoProofIndex: - err := idxType.FlushUtreexoState() + err := idxType.flushUtreexoState(block.Hash()) + if err != nil { + log.Errorf("Error while flushing utreexo state for utreexo proof index: %v", err) + } + err = idxType.utreexoState.utreexoStateDB.Close() if err != nil { - log.Errorf("Error while flushing utreexo state: %v", err) + log.Errorf("Error while closing the utreexo state for utreexo proof index: %v", err) } case *FlatUtreexoProofIndex: - err := idxType.FlushUtreexoState() + err := idxType.flushUtreexoState(block.Hash()) if err != nil { log.Errorf("Error while flushing utreexo state for flat utreexo proof index: %v", err) } + err = idxType.utreexoState.utreexoStateDB.Close() + if err != nil { + log.Errorf("Error while closing the utreexo state for flat utreexo proof index: %v", err) + } } } return errInterruptRequested } + + // Flush indexes if needed. + err = m.Flush(block.Hash(), blockchain.FlushIfNeeded, true) + if err != nil { + return err + } } log.Infof("Indexes caught up to height %d", bestHeight) @@ -645,7 +652,7 @@ func (m *Manager) PruneBlocks(dbTx database.Tx, lastKeptHeight int32, } // Notify the indexer with the connected block so it can prune it. - err = index.PruneBlock(dbTx, blockHash) + err = index.PruneBlock(dbTx, blockHash, lastKeptHeight) if err != nil { return err } @@ -667,6 +674,18 @@ func (m *Manager) PruneBlocks(dbTx database.Tx, lastKeptHeight int32, return nil } +// Flush flushes the enabled indexes. 
For the indexers that do not need to be flushed, it's a no-op. +func (m *Manager) Flush(bestHash *chainhash.Hash, mode blockchain.FlushMode, onConnect bool) error { + for _, index := range m.enabledIndexes { + err := index.Flush(bestHash, mode, onConnect) + if err != nil { + return err + } + } + + return nil +} + // NewManager returns a new index manager with the provided indexes enabled. // // The manager returned satisfies the blockchain.IndexManager interface and thus diff --git a/blockchain/indexers/ttlindex.go b/blockchain/indexers/ttlindex.go index 06009ee1..edfa9793 100644 --- a/blockchain/indexers/ttlindex.go +++ b/blockchain/indexers/ttlindex.go @@ -41,7 +41,7 @@ func (idx *TTLIndex) NeedsInputs() bool { // Init initializes the time to live index. This is part of the Indexer // interface. -func (idx *TTLIndex) Init(_ *blockchain.BlockChain) error { +func (idx *TTLIndex) Init(_ *blockchain.BlockChain, _ *chainhash.Hash, _ int32) error { return nil // Nothing to do. } @@ -97,7 +97,14 @@ func (idx *TTLIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, // supported with pruning. // // This is part of the Indexer interface. -func (idx *TTLIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error { +func (idx *TTLIndex) PruneBlock(_ database.Tx, _ *chainhash.Hash, _ int32) error { + return nil +} + +// For TTLIndex, flush is a no-op. +// +// This is part of the Indexer interface. +func (idx *TTLIndex) Flush(_ *chainhash.Hash, _ blockchain.FlushMode, _ bool) error { return nil } diff --git a/blockchain/indexers/txindex.go b/blockchain/indexers/txindex.go index d08db037..45c0bd84 100644 --- a/blockchain/indexers/txindex.go +++ b/blockchain/indexers/txindex.go @@ -294,7 +294,7 @@ var _ Indexer = (*TxIndex)(nil) // disconnecting blocks. // // This is part of the Indexer interface. -func (idx *TxIndex) Init(_ *blockchain.BlockChain) error { +func (idx *TxIndex) Init(_ *blockchain.BlockChain, _ *chainhash.Hash, _ int32) error { // Find the latest known block id field for the internal block id // index and initialize it. This is done because it's a lot more // efficient to do a single search at initialize time than it is to @@ -437,7 +437,14 @@ func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, // supported with pruning. // // This is part of the Indexer interface. -func (idx *TxIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error { +func (idx *TxIndex) PruneBlock(_ database.Tx, _ *chainhash.Hash, _ int32) error { + return nil +} + +// NOTE: For TxIndex, flush is a no-op. +// +// This is part of the Indexer interface. +func (idx *TxIndex) Flush(_ *chainhash.Hash, _ blockchain.FlushMode, _ bool) error { return nil } diff --git a/blockchain/indexers/utreexobackend.go b/blockchain/indexers/utreexobackend.go index 36c09851..d521e1c1 100644 --- a/blockchain/indexers/utreexobackend.go +++ b/blockchain/indexers/utreexobackend.go @@ -10,25 +10,45 @@ import ( "fmt" "os" "path/filepath" + "time" + "github.com/syndtr/goleveldb/leveldb" "github.com/utreexo/utreexo" "github.com/utreexo/utreexod/blockchain" "github.com/utreexo/utreexod/chaincfg" "github.com/utreexo/utreexod/chaincfg/chainhash" "github.com/utreexo/utreexod/database" + "github.com/utreexo/utreexod/wire" ) const ( // utreexoDirName is the name of the directory in which the utreexo state // is stored. 
- utreexoDirName = "utreexostate" - nodesDBDirName = "nodes" - cachedLeavesDBDirName = "cachedleaves" - defaultUtreexoFileName = "forest.dat" + utreexoDirName = "utreexostate" + + // oldDefaultUtreexoFileName is the file name of the utreexo state that the num leaves + // used to be stored in. + oldDefaultUtreexoFileName = "forest.dat" +) + +var ( + // utreexoStateConsistencyKeyName is name of the db key used to store the consistency + // state for the utreexo accumulator state. + utreexoStateConsistencyKeyName = []byte("utreexostateconsistency") ) // UtreexoConfig is a descriptor which specifies the Utreexo state instance configuration. type UtreexoConfig struct { + // MaxMemoryUsage is the desired memory usage for the utreexo state cache. + MaxMemoryUsage int64 + + // Params are the Bitcoin network parameters. This is used to separately store + // different accumulators. + Params *chaincfg.Params + + // If the node is a pruned node or not. + Pruned bool + // DataDir is the base path of where all the data for this node will be stored. // Utreexo has custom storage method and that data will be stored under this // directory. @@ -38,18 +58,48 @@ type UtreexoConfig struct { // to. Name string - // Params are the Bitcoin network parameters. This is used to separately store - // different accumulators. - Params *chaincfg.Params + // FlushMainDB flushes the main database where all the data is stored. + FlushMainDB func() error } // UtreexoState is a wrapper around the raw accumulator with configuration // information. It contains the entire, non-pruned accumulator. type UtreexoState struct { - config *UtreexoConfig - state utreexo.Utreexo + config *UtreexoConfig + state utreexo.Utreexo + utreexoStateDB *leveldb.DB - closeDB func() error + isFlushNeeded func() bool + flushLeavesAndNodes func(ldbTx *leveldb.Transaction) error +} + +// flush flushes the utreexo state and all the data necessary for the utreexo state to be recoverable +// on sudden crashes. +func (us *UtreexoState) flush(bestHash *chainhash.Hash) error { + ldbTx, err := us.utreexoStateDB.OpenTransaction() + if err != nil { + return err + } + + // Write the best block hash and the numleaves for the utreexo state. + err = dbWriteUtreexoStateConsistency(ldbTx, bestHash, us.state.GetNumLeaves()) + if err != nil { + return err + } + + err = us.flushLeavesAndNodes(ldbTx) + if err != nil { + ldbTx.Discard() + return err + } + + err = ldbTx.Commit() + if err != nil { + ldbTx.Discard() + return err + } + + return nil } // utreexoBasePath returns the base path of where the utreexo state should be @@ -58,17 +108,6 @@ func utreexoBasePath(cfg *UtreexoConfig) string { return filepath.Join(cfg.DataDir, utreexoDirName+"_"+cfg.Name) } -// InitUtreexoState returns an initialized utreexo state. If there isn't an -// existing state on disk, it creates one and returns it. -// maxMemoryUsage of 0 will keep every element on disk. A negaive maxMemoryUsage will -// load every element to the memory. -func InitUtreexoState(cfg *UtreexoConfig, maxMemoryUsage int64) (*UtreexoState, error) { - basePath := utreexoBasePath(cfg) - log.Infof("Initializing Utreexo state from '%s'", basePath) - defer log.Info("Utreexo state loaded") - return initUtreexoState(cfg, maxMemoryUsage, basePath) -} - // deleteUtreexoState removes the utreexo state directory and all the contents // in it. 
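The consistency entry keyed by utreexoStateConsistencyKeyName above is written by dbWriteUtreexoStateConsistency below as a single 40-byte value. A hypothetical decode helper (not in the patch) to make the layout explicit; it assumes only the encoding shown in this diff:

// decodeUtreexoStateConsistency splits the stored value into its two fields:
// 8 bytes of little-endian numLeaves followed by the 32-byte best block hash.
func decodeUtreexoStateConsistency(value []byte) (uint64, *chainhash.Hash, error) {
	if len(value) != 8+chainhash.HashSize {
		return 0, nil, fmt.Errorf("unexpected consistency value length %d", len(value))
	}
	numLeaves := binary.LittleEndian.Uint64(value[:8])
	hash, err := chainhash.NewHash(value[8:])
	if err != nil {
		return 0, nil, err
	}
	return numLeaves, hash, nil
}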
func deleteUtreexoState(path string) error { @@ -84,7 +123,7 @@ func deleteUtreexoState(path string) error { // checkUtreexoExists checks that the data for this utreexo state type specified // in the config is present and should be resumed off of. func checkUtreexoExists(cfg *UtreexoConfig, basePath string) bool { - path := filepath.Join(basePath, defaultUtreexoFileName) + path := filepath.Join(basePath, oldDefaultUtreexoFileName) _, err := os.Stat(path) if err != nil && os.IsNotExist(err) { return false @@ -92,6 +131,36 @@ func checkUtreexoExists(cfg *UtreexoConfig, basePath string) bool { return true } +// dbWriteUtreexoStateConsistency writes the consistency state to the database using the given transaction. +func dbWriteUtreexoStateConsistency(ldbTx *leveldb.Transaction, bestHash *chainhash.Hash, numLeaves uint64) error { + // Create the byte slice to be written. + var buf [8 + chainhash.HashSize]byte + binary.LittleEndian.PutUint64(buf[:8], numLeaves) + copy(buf[8:], bestHash[:]) + + return ldbTx.Put(utreexoStateConsistencyKeyName, buf[:], nil) +} + +// dbFetchUtreexoStateConsistency returns the stored besthash and the numleaves in the database. +func dbFetchUtreexoStateConsistency(db *leveldb.DB) (*chainhash.Hash, uint64, error) { + buf, err := db.Get(utreexoStateConsistencyKeyName, nil) + if err != nil && err != leveldb.ErrNotFound { + return nil, 0, err + } + // Set error to nil as the error may have been ErrNotFound. + err = nil + if buf == nil { + return nil, 0, nil + } + + bestHash, err := chainhash.NewHash(buf[8:]) + if err != nil { + return nil, 0, err + } + + return bestHash, binary.LittleEndian.Uint64(buf[:8]), nil +} + // FetchCurrentUtreexoState returns the current utreexo state. func (idx *UtreexoProofIndex) FetchCurrentUtreexoState() ([]*chainhash.Hash, uint64) { idx.mtx.RLock() @@ -154,46 +223,138 @@ func (idx *FlatUtreexoProofIndex) FetchUtreexoState(blockHeight int32) ([]*chain return chainhashRoots, stump.NumLeaves, nil } -// FlushUtreexoState saves the utreexo state to disk. -func (idx *UtreexoProofIndex) FlushUtreexoState() error { - basePath := utreexoBasePath(idx.utreexoState.config) - if _, err := os.Stat(basePath); err != nil { - os.MkdirAll(basePath, os.ModePerm) +// Flush flushes the utreexo state. The different modes pass in as an argument determine if the utreexo state +// will be flushed or not. +// +// The onConnect bool is if the Flush is called on a block connect or a disconnect. +// It's important as it determines if we flush the main node db before attempting to flush the utreexo state. +// For the utreexo state to be recoverable, it has to be behind whatever tip the main database is at. +// On block connects, we always want to flush first but on disconnects, we want to flush first before the +// data necessary undo data is removed. +func (idx *UtreexoProofIndex) Flush(bestHash *chainhash.Hash, mode blockchain.FlushMode, onConnect bool) error { + switch mode { + case blockchain.FlushPeriodic: + // If the time since the last flush less then the interval, just return. + if time.Since(idx.lastFlushTime) < blockchain.UtxoFlushPeriodicInterval { + return nil + } + case blockchain.FlushIfNeeded: + if !idx.utreexoState.isFlushNeeded() { + return nil + } + case blockchain.FlushRequired: + // Purposely left empty. } - forestFilePath := filepath.Join(basePath, defaultUtreexoFileName) - forestFile, err := os.OpenFile(forestFilePath, os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - return err + + if onConnect { + // Flush the main database first. 
This is because the block and other data may still + // be in the database cache. If we flush the utreexo state before, there's no way to + // undo the utreexo state to the last block where the main database flushed. Flushing + // this before we flush the utreexo state ensures that we leave the database state at + // a recoverable state. + // + // This is different from on disconnect as you want the utreexo state to be flushed + // first as the utreexo state can always catch up to the main db tip but can't undo + // without the main database data. + err := idx.config.FlushMainDB() + if err != nil { + return err + } } - var buf [8]byte - binary.LittleEndian.PutUint64(buf[:], idx.utreexoState.state.GetNumLeaves()) - _, err = forestFile.Write(buf[:]) + err := idx.flushUtreexoState(bestHash) if err != nil { return err } - return idx.utreexoState.closeDB() + // Set the last flush time as now as the flush was successful. + idx.lastFlushTime = time.Now() + return nil } // FlushUtreexoState saves the utreexo state to disk. -func (idx *FlatUtreexoProofIndex) FlushUtreexoState() error { - basePath := utreexoBasePath(idx.utreexoState.config) - if _, err := os.Stat(basePath); err != nil { - os.MkdirAll(basePath, os.ModePerm) - } - forestFilePath := filepath.Join(basePath, defaultUtreexoFileName) - forestFile, err := os.OpenFile(forestFilePath, os.O_RDWR|os.O_CREATE, 0666) +func (idx *UtreexoProofIndex) flushUtreexoState(bestHash *chainhash.Hash) error { + idx.mtx.Lock() + defer idx.mtx.Unlock() + + log.Infof("Flushing the utreexo state to disk...") + return idx.utreexoState.flush(bestHash) +} + +// CloseUtreexoState flushes and closes the utreexo database state. +func (idx *UtreexoProofIndex) CloseUtreexoState() error { + bestHash := idx.chain.BestSnapshot().Hash + err := idx.flushUtreexoState(&bestHash) if err != nil { - return err + log.Warnf("error whiling flushing the utreexo state. %v", err) } - var buf [8]byte - binary.LittleEndian.PutUint64(buf[:], idx.utreexoState.state.GetNumLeaves()) - _, err = forestFile.Write(buf[:]) + return idx.utreexoState.utreexoStateDB.Close() +} + +// Flush flushes the utreexo state. The different modes pass in as an argument determine if the utreexo state +// will be flushed or not. +// +// The onConnect bool is if the Flush is called on a block connect or a disconnect. +// It's important as it determines if we flush the main node db before attempting to flush the utreexo state. +// For the utreexo state to be recoverable, it has to be behind whatever tip the main database is at. +// On block connects, we always want to flush first but on disconnects, we want to flush first before the +// data necessary undo data is removed. +func (idx *FlatUtreexoProofIndex) Flush(bestHash *chainhash.Hash, mode blockchain.FlushMode, onConnect bool) error { + switch mode { + case blockchain.FlushPeriodic: + // If the time since the last flush less then the interval, just return. + if time.Since(idx.lastFlushTime) < blockchain.UtxoFlushPeriodicInterval { + return nil + } + case blockchain.FlushIfNeeded: + if !idx.utreexoState.isFlushNeeded() { + return nil + } + case blockchain.FlushRequired: + // Purposely left empty. + } + + if onConnect { + // Flush the main database first. This is because the block and other data may still + // be in the database cache. If we flush the utreexo state before, there's no way to + // undo the utreexo state to the last block where the main database flushed. 
Flushing + // this before we flush the utreexo state ensures that we leave the database state at + // a recoverable state. + // + // This is different from on disconnect as you want the utreexo state to be flushed + // first as the utreexo state can always catch up to the main db tip but can't undo + // without the main database data. + err := idx.config.FlushMainDB() + if err != nil { + return err + } + } + err := idx.flushUtreexoState(bestHash) if err != nil { return err } - return idx.utreexoState.closeDB() + // Set the last flush time as now as the flush was successful. + idx.lastFlushTime = time.Now() + return nil +} + +// FlushUtreexoState saves the utreexo state to disk. +func (idx *FlatUtreexoProofIndex) flushUtreexoState(bestHash *chainhash.Hash) error { + idx.mtx.Lock() + defer idx.mtx.Unlock() + + log.Infof("Flushing the utreexo state to disk...") + return idx.utreexoState.flush(bestHash) +} + +// CloseUtreexoState flushes and closes the utreexo database state. +func (idx *FlatUtreexoProofIndex) CloseUtreexoState() error { + bestHash := idx.chain.BestSnapshot().Hash + err := idx.flushUtreexoState(&bestHash) + if err != nil { + log.Warnf("error whiling flushing the utreexo state. %v", err) + } + return idx.utreexoState.utreexoStateDB.Close() } // serializeUndoBlock serializes all the data that's needed for undoing a full utreexo state @@ -310,59 +471,266 @@ func deserializeUndoBlock(serialized []byte) (uint64, []uint64, []utreexo.Hash, return numAdds, targets, delHashes, nil } -// initUtreexoState creates a new utreexo state and returns it. maxMemoryUsage of 0 will keep -// every element on disk and a negative maxMemoryUsage will load all the elemnts to memory. -func initUtreexoState(cfg *UtreexoConfig, maxMemoryUsage int64, basePath string) (*UtreexoState, error) { - p := utreexo.NewMapPollard(true) +// upgradeUtreexoState upgrades the utreexo state to be atomic. +func upgradeUtreexoState(cfg *UtreexoConfig, p *utreexo.MapPollard, + db *leveldb.DB, bestHash *chainhash.Hash) error { + + // Check if the current database is an older database that needs to be upgraded. + if !checkUtreexoExists(cfg, utreexoBasePath(cfg)) { + return nil + } - // 60% of the memory for the nodes map, 40% for the cache leaves map. - // TODO Totally arbitrary, it there's something better than change it to that. - maxNodesMem := maxMemoryUsage * 6 / 10 - maxCachedLeavesMem := maxMemoryUsage - maxNodesMem + log.Infof("Upgrading the utreexo state database. Do NOT shut down this process. " + + "This may take a while...") - nodesPath := filepath.Join(basePath, nodesDBDirName) - nodesDB, err := blockchain.InitNodesBackEnd(nodesPath, maxNodesMem) + // Write the nodes to the new database. 
+ nodesPath := filepath.Join(utreexoBasePath(cfg), "nodes") + nodesDB, err := leveldb.OpenFile(nodesPath, nil) if err != nil { - return nil, err + return err } - cachedLeavesPath := filepath.Join(basePath, cachedLeavesDBDirName) - cachedLeavesDB, err := blockchain.InitCachedLeavesBackEnd(cachedLeavesPath, maxCachedLeavesMem) + ldbTx, err := db.OpenTransaction() if err != nil { - return nil, err + return err } - if checkUtreexoExists(cfg, basePath) { - forestFilePath := filepath.Join(basePath, defaultUtreexoFileName) - file, err := os.OpenFile(forestFilePath, os.O_RDWR, 0400) + iter := nodesDB.NewIterator(nil, nil) + for iter.Next() { + err = ldbTx.Put(iter.Key(), iter.Value(), nil) if err != nil { - return nil, err + ldbTx.Discard() + return err } - var buf [8]byte - _, err = file.Read(buf[:]) + } + nodesDB.Close() + + // Write the cached leaves to the new database. + cachedLeavesPath := filepath.Join(utreexoBasePath(cfg), "cachedleaves") + cachedLeavesDB, err := leveldb.OpenFile(cachedLeavesPath, nil) + if err != nil { + return err + } + + iter = cachedLeavesDB.NewIterator(nil, nil) + for iter.Next() { + err = ldbTx.Put(iter.Key(), iter.Value(), nil) if err != nil { - return nil, err + ldbTx.Discard() + return err } - p.NumLeaves = binary.LittleEndian.Uint64(buf[:]) + } + cachedLeavesDB.Close() + + // Open the file and read the numLeaves. + forestFilePath := filepath.Join(utreexoBasePath(cfg), oldDefaultUtreexoFileName) + file, err := os.OpenFile(forestFilePath, os.O_RDWR, 0400) + if err != nil { + return err + } + var buf [8]byte + _, err = file.Read(buf[:]) + if err != nil { + return err } - var closeDB func() error - if maxMemoryUsage >= 0 { + // Save the consistency state + p.NumLeaves = binary.LittleEndian.Uint64(buf[:8]) + err = dbWriteUtreexoStateConsistency(ldbTx, bestHash, p.NumLeaves) + if err != nil { + ldbTx.Discard() + return err + } + + // Commit all the writes to the database. + err = ldbTx.Commit() + if err != nil { + ldbTx.Discard() + return err + } + + // Remove the unnecessary file after the upgrade. + err = os.Remove(forestFilePath) + if err != nil { + return err + } + err = os.RemoveAll(cachedLeavesPath) + if err != nil { + return err + } + err = os.RemoveAll(nodesPath) + if err != nil { + return err + } + + log.Infof("Finished upgrading the utreexo state database.") + return nil +} + +// initConsistentUtreexoState makes the utreexo state consistent with the given tipHash. +func (us *UtreexoState) initConsistentUtreexoState(chain *blockchain.BlockChain, + savedHash, tipHash *chainhash.Hash, tipHeight int32) error { + + // This is a new accumulator state that we're working with. + var empty chainhash.Hash + if tipHeight == -1 && tipHash.IsEqual(&empty) { + return nil + } + + // We're all caught up if both of the hashes are equal. + if savedHash != nil && savedHash.IsEqual(tipHash) { + return nil + } + + currentHeight := int32(-1) + if savedHash != nil { + // Even though this should always be true, make sure the fetched hash is in + // the best chain. + if !chain.MainChainHasBlock(savedHash) { + return fmt.Errorf("last utreexo consistency status contains "+ + "hash that is not in best chain: %v", savedHash) + } + + var err error + currentHeight, err = chain.BlockHeightByHash(savedHash) + if err != nil { + return err + } + + if currentHeight > tipHeight { + return fmt.Errorf("Saved besthash has a heigher height "+ + "of %v than tip height of %v. 
The utreexo state is NOT "+ + "recoverable and should be dropped and reindexed", + currentHeight, tipHeight) + } + } else { + // Mark it as an empty hash for logging below. + savedHash = new(chainhash.Hash) + } + + log.Infof("Reconstructing the Utreexo state after an unclean shutdown. The Utreexo state is "+ + "consistent at block %s (%d) but the index tip is at block %s (%d), This may "+ + "take a long time...", savedHash.String(), currentHeight, tipHash.String(), tipHeight) + + for h := currentHeight + 1; h <= tipHeight; h++ { + block, err := chain.BlockByHeight(h) + if err != nil { + return err + } + + stxos, err := chain.FetchSpendJournal(block) + if err != nil { + return err + } + + _, outCount, inskip, outskip := blockchain.DedupeBlock(block) + dels, _, err := blockchain.BlockToDelLeaves(stxos, chain, block, inskip, -1) + if err != nil { + return err + } + adds := blockchain.BlockToAddLeaves(block, outskip, nil, outCount) + + ud, err := wire.GenerateUData(dels, us.state) + if err != nil { + return err + } + delHashes := make([]utreexo.Hash, len(ud.LeafDatas)) + for i := range delHashes { + delHashes[i] = ud.LeafDatas[i].LeafHash() + } + + err = us.state.Modify(adds, delHashes, ud.AccProof) + if err != nil { + return err + } + + if us.isFlushNeeded() { + log.Infof("Flushing the utreexo state to disk...") + err = us.flush(block.Hash()) + if err != nil { + return err + } + } + } + + return nil +} + +// InitUtreexoState returns an initialized utreexo state. If there isn't an +// existing state on disk, it creates one and returns it. +// maxMemoryUsage of 0 will keep every element on disk. A negaive maxMemoryUsage will +// load every element to the memory. +func InitUtreexoState(cfg *UtreexoConfig, chain *blockchain.BlockChain, + tipHash *chainhash.Hash, tipHeight int32) (*UtreexoState, error) { + + log.Infof("Initializing Utreexo state from '%s'", utreexoBasePath(cfg)) + defer log.Info("Utreexo state loaded") + + p := utreexo.NewMapPollard(true) + + maxNodesMem := cfg.MaxMemoryUsage * 7 / 10 + maxCachedLeavesMem := cfg.MaxMemoryUsage - maxNodesMem + + db, err := leveldb.OpenFile(utreexoBasePath(cfg), nil) + if err != nil { + return nil, err + } + + nodesDB, err := blockchain.InitNodesBackEnd(db, maxNodesMem) + if err != nil { + return nil, err + } + + cachedLeavesDB, err := blockchain.InitCachedLeavesBackEnd(db, maxCachedLeavesMem) + if err != nil { + return nil, err + } + + // The utreexo state may be an older version where the numLeaves were stored in a flat + // file. Upgrade the utreexo state if it needs to be. 
+ err = upgradeUtreexoState(cfg, &p, db, tipHash) + if err != nil { + return nil, err + } + + savedHash, numLeaves, err := dbFetchUtreexoStateConsistency(db) + if err != nil { + return nil, err + } + p.NumLeaves = numLeaves + + var flush func(ldbTx *leveldb.Transaction) error + var isFlushNeeded func() bool + if cfg.MaxMemoryUsage >= 0 { p.Nodes = nodesDB p.CachedLeaves = cachedLeavesDB - closeDB = func() error { - err := nodesDB.Close() + flush = func(ldbTx *leveldb.Transaction) error { + nodesUsed, nodesCapacity := nodesDB.UsageStats() + log.Debugf("Utreexo index nodesDB cache usage: %d/%d (%v%%)\n", + nodesUsed, nodesCapacity, + float64(nodesUsed)/float64(nodesCapacity)) + + cachedLeavesUsed, cachedLeavesCapacity := cachedLeavesDB.UsageStats() + log.Debugf("Utreexo index cachedLeavesDB cache usage: %d/%d (%v%%)\n", + cachedLeavesUsed, cachedLeavesCapacity, + float64(cachedLeavesUsed)/float64(cachedLeavesCapacity)) + + err = nodesDB.Flush(ldbTx) if err != nil { return err } - - err = cachedLeavesDB.Close() + err = cachedLeavesDB.Flush(ldbTx) if err != nil { return err } return nil } + isFlushNeeded = func() bool { + nodesNeedsFlush := nodesDB.IsFlushNeeded() + leavesNeedsFlush := cachedLeavesDB.IsFlushNeeded() + return nodesNeedsFlush || leavesNeedsFlush + } } else { log.Infof("loading the utreexo state from disk...") err = nodesDB.ForEach(func(k uint64, v utreexo.Leaf) error { @@ -383,45 +751,42 @@ func initUtreexoState(cfg *UtreexoConfig, maxMemoryUsage int64, basePath string) log.Infof("Finished loading the utreexo state from disk.") - closeDB = func() error { - log.Infof("Flushing the utreexo state to disk. May take a while...") - - p.Nodes.ForEach(func(k uint64, v utreexo.Leaf) error { - nodesDB.Put(k, v) - return nil + flush = func(ldbTx *leveldb.Transaction) error { + err = p.Nodes.ForEach(func(k uint64, v utreexo.Leaf) error { + return blockchain.NodesBackendPut(ldbTx, k, v) }) - - p.CachedLeaves.ForEach(func(k utreexo.Hash, v uint64) error { - cachedLeavesDB.Put(k, v) - return nil - }) - - // We want to try to close both of the DBs before returning because of an error. - errStr := "" - err := nodesDB.Close() if err != nil { - errStr += fmt.Sprintf("Error while closing nodes db. %v", err.Error()) - } - err = cachedLeavesDB.Close() - if err != nil { - errStr += fmt.Sprintf("Error while closing cached leaves db. %v", err.Error()) + return err } - // If the err string isn't "", then return the error here. - if errStr != "" { - return fmt.Errorf(errStr) + err = p.CachedLeaves.ForEach(func(k utreexo.Hash, v uint64) error { + return blockchain.CachedLeavesBackendPut(ldbTx, k, v) + }) + if err != nil { + return err } - log.Infof("Finished flushing the utreexo state to disk.") - return nil } + + // Flush is never needed since we're keeping everything in memory. + isFlushNeeded = func() bool { + return false + } } uState := &UtreexoState{ - config: cfg, - state: &p, - closeDB: closeDB, + config: cfg, + state: &p, + utreexoStateDB: db, + isFlushNeeded: isFlushNeeded, + flushLeavesAndNodes: flush, + } + + // Make sure that the utreexo state is consistent before returning it. 
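A concrete, made-up example of the unclean-shutdown case handled by the initConsistentUtreexoState call just below, following the flow implemented above:

// Hypothetical heights, flow as in initConsistentUtreexoState:
//
//	dbFetchUtreexoStateConsistency -> savedHash at height 1980
//	indexer tip passed into Init   -> tipHash  at height 2000
//
// Since savedHash differs from tipHash, blocks 1981..2000 are fetched from the
// chain, their adds and dels are reproduced, and the accumulator is modified
// block by block, flushing whenever isFlushNeeded reports the cache is full.
// Had the saved height been above the tip, Init errors out and the utreexo
// state must be dropped and reindexed.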
+ err = uState.initConsistentUtreexoState(chain, savedHash, tipHash, tipHeight) + if err != nil { + return nil, err } return uState, err diff --git a/blockchain/indexers/utreexobackend_test.go b/blockchain/indexers/utreexobackend_test.go new file mode 100644 index 00000000..16f2130a --- /dev/null +++ b/blockchain/indexers/utreexobackend_test.go @@ -0,0 +1,55 @@ +// Copyright (c) 2024 The utreexo developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "math/rand" + "os" + "testing" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/utreexo/utreexod/chaincfg" +) + +func TestUtreexoStateConsistencyWrite(t *testing.T) { + dbPath := t.TempDir() + db, err := leveldb.OpenFile(dbPath, nil) + if err != nil { + t.Fatal(err) + } + defer func() { os.RemoveAll(dbPath) }() + + // Values to write. + numLeaves := rand.Uint64() + hash := chaincfg.MainNetParams.GenesisHash + + // Write the consistency state. + ldbTx, err := db.OpenTransaction() + if err != nil { + t.Fatal(err) + } + err = dbWriteUtreexoStateConsistency(ldbTx, hash, numLeaves) + if err != nil { + t.Fatal(err) + } + err = ldbTx.Commit() + if err != nil { + t.Fatal(err) + } + + // Fetch the consistency state. + gotHash, gotNumLeaves, err := dbFetchUtreexoStateConsistency(db) + if err != nil { + t.Fatal(err) + } + + // Compare. + if *hash != *gotHash { + t.Fatalf("expected %v, got %v", hash.String(), gotHash.String()) + } + if numLeaves != gotNumLeaves { + t.Fatalf("expected %v, got %v", numLeaves, gotNumLeaves) + } +} diff --git a/blockchain/indexers/utreexoproofindex.go b/blockchain/indexers/utreexoproofindex.go index 27224899..b599d9d0 100644 --- a/blockchain/indexers/utreexoproofindex.go +++ b/blockchain/indexers/utreexoproofindex.go @@ -8,6 +8,7 @@ import ( "bytes" "fmt" "sync" + "time" "github.com/utreexo/utreexo" "github.com/utreexo/utreexod/blockchain" @@ -52,11 +53,8 @@ var _ NeedsInputser = (*UtreexoProofIndex)(nil) // UtreexoProofIndex implements a utreexo accumulator proof index for all the blocks. type UtreexoProofIndex struct { - db database.DB - chainParams *chaincfg.Params - - // If the node is a pruned node or not. - pruned bool + db database.DB + config *UtreexoConfig // The blockchain instance the index corresponds to. chain *blockchain.BlockChain @@ -67,6 +65,9 @@ type UtreexoProofIndex struct { // utreexoState represents the Bitcoin UTXO set as a utreexo accumulator. // It keeps all the elements of the forest in order to generate proofs. utreexoState *UtreexoState + + // The time of when the utreexo state was last flushed. + lastFlushTime time.Time } // NeedsInputs signals that the index requires the referenced inputs in order @@ -79,17 +80,27 @@ func (idx *UtreexoProofIndex) NeedsInputs() bool { // Init initializes the utreexo proof index. This is part of the Indexer // interface. -func (idx *UtreexoProofIndex) Init(chain *blockchain.BlockChain) error { +func (idx *UtreexoProofIndex) Init(chain *blockchain.BlockChain, + tipHash *chainhash.Hash, tipHeight int32) error { + idx.chain = chain + // Init Utreexo State. + uState, err := InitUtreexoState(idx.config, chain, tipHash, tipHeight) + if err != nil { + return err + } + idx.utreexoState = uState + idx.lastFlushTime = time.Now() + // Nothing else to do if the node is an archive node. - if !idx.pruned { + if !idx.config.Pruned { return nil } // Check if the utreexo undo bucket exists. 
var exists bool - err := idx.db.View(func(dbTx database.Tx) error { + err = idx.db.View(func(dbTx database.Tx) error { parentBucket := dbTx.Metadata().Bucket(utreexoParentBucketKey) bucket := parentBucket.Bucket(utreexoUndoKey) exists = bucket != nil @@ -226,7 +237,7 @@ func (idx *UtreexoProofIndex) Create(dbTx database.Tx) error { } // Only create the undo bucket if the node is pruned. - if idx.pruned { + if idx.config.Pruned { _, err = utreexoParentBucket.CreateBucket(utreexoUndoKey) if err != nil { return err @@ -267,7 +278,7 @@ func (idx *UtreexoProofIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Bloc } // Only store the proofs if the node is not pruned. - if !idx.pruned { + if !idx.config.Pruned { err = dbStoreUtreexoProof(dbTx, block.Hash(), ud) if err != nil { return err @@ -285,7 +296,7 @@ func (idx *UtreexoProofIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Bloc } // For pruned nodes, the undo data is necessary for reorgs. - if idx.pruned { + if idx.config.Pruned { err = dbStoreUndoData(dbTx, uint64(len(adds)), ud.AccProof.Targets, block.Hash(), delHashes) if err != nil { @@ -312,7 +323,7 @@ func (idx *UtreexoProofIndex) getUndoData(dbTx database.Tx, block *btcutil.Block delHashes []utreexo.Hash ) - if !idx.pruned { + if !idx.config.Pruned { ud, err := idx.FetchUtreexoProof(block.Hash()) if err != nil { return 0, nil, nil, err @@ -365,12 +376,19 @@ func (idx *UtreexoProofIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.B return err } + // Always flush the utreexo state on flushes to never leave the utreexoState + // at an unrecoverable state. + err = idx.flushUtreexoState(&block.MsgBlock().Header.PrevBlock) + if err != nil { + return err + } + err = dbDeleteUtreexoState(dbTx, block.Hash()) if err != nil { return err } - if idx.pruned { + if idx.config.Pruned { err = dbDeleteUndoData(dbTx, block.Hash()) if err != nil { return err @@ -387,7 +405,7 @@ func (idx *UtreexoProofIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.B // FetchUtreexoProof returns the Utreexo proof data for the given block hash. func (idx *UtreexoProofIndex) FetchUtreexoProof(hash *chainhash.Hash) (*wire.UData, error) { - if idx.pruned { + if idx.config.Pruned { return nil, fmt.Errorf("Cannot fetch historical proof as the node is pruned") } @@ -557,8 +575,29 @@ func (idx *UtreexoProofIndex) VerifyAccProof(toProve []utreexo.Hash, // processed. // // This is part of the Indexer interface. -func (idx *UtreexoProofIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash.Hash) error { - return nil +func (idx *UtreexoProofIndex) PruneBlock(_ database.Tx, _ *chainhash.Hash, lastKeptHeight int32) error { + hash, _, err := dbFetchUtreexoStateConsistency(idx.utreexoState.utreexoStateDB) + if err != nil { + return err + } + + // It's ok to call block by hash here as the utreexo state consistency hash is always + // included in the best chain. + lastFlushHeight, err := idx.chain.BlockHeightByHash(hash) + if err != nil { + return err + } + + // If the last flushed utreexo state is the last or greater than the kept block, + // we can sync up to the tip so a flush is not required. + if lastKeptHeight <= lastFlushHeight { + return nil + } + + // It's ok to fetch the best snapshot here as the block called on pruneblock has not + // been yet connected yet on the utreexo state. So this is indeed the correct hash. 
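Init and DisconnectBlock above route flushing through flushUtreexoState, whose policy is not part of this hunk. A hedged sketch of the kind of check such a helper presumably performs, combining an explicit mode, the cache-overflow signal, and the time since the last flush (the names and five-minute interval are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

type FlushMode int

const (
	FlushRequired FlushMode = iota // always write
	FlushPeriodic                  // write if overflowed or the interval elapsed
	FlushIfNeeded                  // write only if the cache overflowed
)

const periodicInterval = 5 * time.Minute

func shouldFlush(mode FlushMode, cacheOverflowed bool, lastFlush time.Time) bool {
	switch mode {
	case FlushRequired:
		return true
	case FlushPeriodic:
		return cacheOverflowed || time.Since(lastFlush) > periodicInterval
	case FlushIfNeeded:
		return cacheOverflowed
	}
	return false
}

func main() {
	last := time.Now().Add(-10 * time.Minute)
	fmt.Println(shouldFlush(FlushPeriodic, false, last)) // true: interval elapsed
	fmt.Println(shouldFlush(FlushIfNeeded, false, last)) // false: cache still fits
}
```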
+ bestHash := idx.chain.BestSnapshot().Hash + return idx.Flush(&bestHash, blockchain.FlushRequired, true) } // NewUtreexoProofIndex returns a new instance of an indexer that is used to create a utreexo @@ -570,25 +609,21 @@ func (idx *UtreexoProofIndex) PruneBlock(dbTx database.Tx, blockHash *chainhash. // turn is used by the blockchain package. This allows the index to be // seamlessly maintained along with the chain. func NewUtreexoProofIndex(db database.DB, pruned bool, maxMemoryUsage int64, - chainParams *chaincfg.Params, dataDir string) (*UtreexoProofIndex, error) { + chainParams *chaincfg.Params, dataDir string, flush func() error) (*UtreexoProofIndex, error) { idx := &UtreexoProofIndex{ - db: db, - chainParams: chainParams, - mtx: new(sync.RWMutex), + db: db, + mtx: new(sync.RWMutex), + config: &UtreexoConfig{ + MaxMemoryUsage: maxMemoryUsage, + Params: chainParams, + Pruned: pruned, + DataDir: dataDir, + Name: db.Type(), + FlushMainDB: flush, + }, } - uState, err := InitUtreexoState(&UtreexoConfig{ - DataDir: dataDir, - Name: db.Type(), - Params: chainParams, - }, maxMemoryUsage) - if err != nil { - return nil, err - } - idx.utreexoState = uState - idx.pruned = pruned - return idx, nil } diff --git a/blockchain/internal/utreexobackends/cachedleavesmap.go b/blockchain/internal/utreexobackends/cachedleavesmap.go index f18111bd..912ae579 100644 --- a/blockchain/internal/utreexobackends/cachedleavesmap.go +++ b/blockchain/internal/utreexobackends/cachedleavesmap.go @@ -13,6 +13,27 @@ const ( cachedLeavesMapBucketSize = 16 + sizehelper.Uint64Size*chainhash.HashSize + sizehelper.Uint64Size*sizehelper.Uint64Size ) +// CachedPosition has the leaf and a flag for the status in the cache. +type CachedPosition struct { + Position uint64 + Flags CachedFlag +} + +// IsFresh returns if the cached Position has never been in the database. +func (c *CachedPosition) IsFresh() bool { + return c.Flags&Fresh == Fresh +} + +// IsModified returns if the cached leaf has been in the database and was modified in the cache. +func (c *CachedPosition) IsModified() bool { + return c.Flags&Modified == Modified +} + +// IsRemoved returns if the key for this cached leaf has been removed. +func (c *CachedPosition) IsRemoved() bool { + return c.Flags&Removed == Removed +} + // CachedLeavesMapSlice is a slice of maps for utxo entries. The slice of maps are needed to // guarantee that the map will only take up N amount of bytes. As of v1.20, the // go runtime will allocate 2^N + few extra buckets, meaning that for large N, we'll @@ -24,7 +45,10 @@ type CachedLeavesMapSlice struct { mtx *sync.Mutex // maps are the underlying maps in the slice of maps. - maps []map[utreexo.Hash]uint64 + maps []map[utreexo.Hash]CachedPosition + + // overflow puts the overflowed entries. + overflow map[utreexo.Hash]CachedPosition // maxEntries is the maximum amount of elemnts that the map is allocated for. maxEntries []int @@ -46,6 +70,8 @@ func (ms *CachedLeavesMapSlice) Length() int { l += len(m) } + l += len(ms.overflow) + return l } @@ -53,11 +79,11 @@ func (ms *CachedLeavesMapSlice) Length() int { // the entry. nil and false is returned if the outpoint is not found. // // This function is safe for concurrent access. 
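The CachedPosition type introduced above tracks each cached entry with Fresh/Modified/Removed bits. A small self-contained illustration of those flag semantics; the bit values here are illustrative, only the predicates mirror the diff:

```go
package main

import "fmt"

type CachedFlag uint8

const (
	Fresh    CachedFlag = 1 << iota // never been written to the database
	Modified                        // exists in the database but changed in the cache
	Removed                         // marked for deletion
)

type CachedPosition struct {
	Position uint64
	Flags    CachedFlag
}

func (c CachedPosition) IsFresh() bool    { return c.Flags&Fresh == Fresh }
func (c CachedPosition) IsModified() bool { return c.Flags&Modified == Modified }
func (c CachedPosition) IsRemoved() bool  { return c.Flags&Removed == Removed }

func main() {
	p := CachedPosition{Position: 7, Flags: Fresh}
	// Deleting a fresh entry can simply drop it; deleting a db-backed one
	// keeps a tombstone so the flush knows to issue a Delete.
	p.Flags |= Removed
	fmt.Println(p.IsFresh(), p.IsRemoved()) // true true: fresh-and-removed never touches disk
}
```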
-func (ms *CachedLeavesMapSlice) Get(k utreexo.Hash) (uint64, bool) { +func (ms *CachedLeavesMapSlice) Get(k utreexo.Hash) (CachedPosition, bool) { ms.mtx.Lock() defer ms.mtx.Unlock() - var v uint64 + var v CachedPosition var found bool for _, m := range ms.maps { @@ -67,7 +93,14 @@ func (ms *CachedLeavesMapSlice) Get(k utreexo.Hash) (uint64, bool) { } } - return 0, false + if len(ms.overflow) > 0 { + v, found = ms.overflow[k] + if found { + return v, found + } + } + + return CachedPosition{}, false } // Put puts the keys and the values into one of the maps in the map slice. If the @@ -75,7 +108,7 @@ func (ms *CachedLeavesMapSlice) Get(k utreexo.Hash) (uint64, bool) { // return false. // // This function is safe for concurrent access. -func (ms *CachedLeavesMapSlice) Put(k utreexo.Hash, v uint64) bool { +func (ms *CachedLeavesMapSlice) Put(k utreexo.Hash, v CachedPosition) bool { ms.mtx.Lock() defer ms.mtx.Unlock() @@ -88,6 +121,14 @@ func (ms *CachedLeavesMapSlice) Put(k utreexo.Hash, v uint64) bool { } } + if len(ms.overflow) > 0 { + _, found := ms.overflow[k] + if found { + ms.overflow[k] = v + return true + } + } + for i, maxNum := range ms.maxEntries { m := ms.maps[i] if len(m) >= maxNum { @@ -101,6 +142,8 @@ func (ms *CachedLeavesMapSlice) Put(k utreexo.Hash, v uint64) bool { return true // Return as we were successful in adding the entry. } + ms.overflow[k] = v + // We only reach this code if we've failed to insert into the map above as // all the current maps were full. return false @@ -117,6 +160,8 @@ func (ms *CachedLeavesMapSlice) Delete(k utreexo.Hash) { for i := 0; i < len(ms.maps); i++ { delete(ms.maps[i], k) } + + delete(ms.overflow, k) } // DeleteMaps deletes all maps and allocate new ones with the maxEntries defined in @@ -127,10 +172,12 @@ func (ms *CachedLeavesMapSlice) DeleteMaps() { ms.mtx.Lock() defer ms.mtx.Unlock() - ms.maps = make([]map[utreexo.Hash]uint64, len(ms.maxEntries)) + ms.maps = make([]map[utreexo.Hash]CachedPosition, len(ms.maxEntries)) for i := range ms.maxEntries { - ms.maps[i] = make(map[utreexo.Hash]uint64, ms.maxEntries[i]) + ms.maps[i] = make(map[utreexo.Hash]CachedPosition, ms.maxEntries[i]) } + + ms.overflow = make(map[utreexo.Hash]CachedPosition) } // ClearMaps clears all maps @@ -145,20 +192,38 @@ func (ms *CachedLeavesMapSlice) ClearMaps() { delete(ms.maps[i], key) } } + + for key := range ms.overflow { + delete(ms.overflow, key) + } } // ForEach loops through all the elements in the cachedleaves map slice and calls fn with the key-value pairs. // // This function is safe for concurrent access. -func (ms *CachedLeavesMapSlice) ForEach(fn func(utreexo.Hash, uint64)) { +func (ms *CachedLeavesMapSlice) ForEach(fn func(utreexo.Hash, CachedPosition) error) error { ms.mtx.Lock() defer ms.mtx.Unlock() for _, m := range ms.maps { for k, v := range m { - fn(k, v) + err := fn(k, v) + if err != nil { + return err + } + } + } + + if len(ms.overflow) > 0 { + for k, v := range ms.overflow { + err := fn(k, v) + if err != nil { + return err + } } } + + return nil } // createMaps creates a slice of maps and returns the total count that the maps @@ -182,14 +247,21 @@ func (ms *CachedLeavesMapSlice) createMaps(maxMemoryUsage int64) int64 { } // Create the maps. 
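The Get/Put/Delete methods above all consult the new overflow map. Below is a simplified stand-in for the map-slice-with-overflow pattern, showing how Put spills into the unbounded overflow map once every size-capped map is full:

```go
package main

import "fmt"

type mapSlice struct {
	maps       []map[uint64]string
	maxEntries []int
	overflow   map[uint64]string
}

func (ms *mapSlice) Put(k uint64, v string) bool {
	// Update in place if the key already lives somewhere.
	for _, m := range ms.maps {
		if _, ok := m[k]; ok {
			m[k] = v
			return true
		}
	}
	if _, ok := ms.overflow[k]; ok {
		ms.overflow[k] = v
		return true
	}
	// Otherwise take the first bounded map with spare capacity.
	for i, limit := range ms.maxEntries {
		if len(ms.maps[i]) < limit {
			ms.maps[i][k] = v
			return true
		}
	}
	// Every bounded map is full: spill over and report it.
	ms.overflow[k] = v
	return false
}

func (ms *mapSlice) Overflowed() bool { return len(ms.overflow) > 0 }

func main() {
	ms := &mapSlice{
		maps:       []map[uint64]string{make(map[uint64]string, 2)},
		maxEntries: []int{2},
		overflow:   make(map[uint64]string),
	}
	ms.Put(1, "a")
	ms.Put(2, "b")
	ms.Put(3, "c") // exceeds the budget
	fmt.Println(ms.Overflowed()) // true: a flush is now needed
}
```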
- ms.maps = make([]map[utreexo.Hash]uint64, len(ms.maxEntries)) + ms.maps = make([]map[utreexo.Hash]CachedPosition, len(ms.maxEntries)) for i := range ms.maxEntries { - ms.maps[i] = make(map[utreexo.Hash]uint64, ms.maxEntries[i]) + ms.maps[i] = make(map[utreexo.Hash]CachedPosition, ms.maxEntries[i]) } + ms.overflow = make(map[utreexo.Hash]CachedPosition) + return int64(totalElemCount) } +// Overflowed returns true if the map slice overflowed. +func (ms *CachedLeavesMapSlice) Overflowed() bool { + return len(ms.overflow) > 0 +} + // NewCachedLeavesMapSlice returns a new CachedLeavesMapSlice and the total amount of elements // that the map slice can accomodate. func NewCachedLeavesMapSlice(maxTotalMemoryUsage int64) (CachedLeavesMapSlice, int64) { diff --git a/blockchain/internal/utreexobackends/cachedleavesmap_test.go b/blockchain/internal/utreexobackends/cachedleavesmap_test.go index 50735df0..72715c7a 100644 --- a/blockchain/internal/utreexobackends/cachedleavesmap_test.go +++ b/blockchain/internal/utreexobackends/cachedleavesmap_test.go @@ -35,7 +35,7 @@ func TestCachedLeaveMapSliceDuplicates(t *testing.T) { m, maxElems := NewCachedLeavesMapSlice(8000) for i := 0; i < 10; i++ { for j := int64(0); j < maxElems; j++ { - if !m.Put(uint64ToHash(uint64(j)), 0) { + if !m.Put(uint64ToHash(uint64(j)), CachedPosition{}) { t.Fatalf("unexpected error on m.put") } } @@ -49,7 +49,7 @@ func TestCachedLeaveMapSliceDuplicates(t *testing.T) { // Try inserting x which should be unique. Should fail as the map is full. x := uint64(0) x -= 1 - if m.Put(uint64ToHash(x), 0) { + if m.Put(uint64ToHash(x), CachedPosition{}) { t.Fatalf("expected error but successfully called put") } @@ -57,22 +57,27 @@ func TestCachedLeaveMapSliceDuplicates(t *testing.T) { // a duplicate element. m.Delete(uint64ToHash(0)) x = uint64(maxElems) - 1 - if !m.Put(uint64ToHash(x), 0) { + if !m.Put(uint64ToHash(x), CachedPosition{}) { t.Fatalf("unexpected failure on put") } // Make sure the length of the map is 1 less than the max elems. - if m.Length() != int(maxElems)-1 { + if m.Length()-len(m.overflow) != int(maxElems)-1 { t.Fatalf("expected length of %v but got %v", maxElems-1, m.Length()) } // Put 0 back in and then compare the map. - if !m.Put(uint64ToHash(0), 0) { + if !m.Put(uint64ToHash(0), CachedPosition{}) { t.Fatalf("didn't expect error but unsuccessfully called put") } - if m.Length() != int(maxElems) { + if m.Length()-len(m.overflow) != int(maxElems) { t.Fatalf("expected length of %v but got %v", maxElems, m.Length()) } + + if len(m.overflow) != 1 { + t.Fatalf("expected length of %v but got %v", + 1, len(m.overflow)) + } } diff --git a/blockchain/internal/utreexobackends/nodesmap.go b/blockchain/internal/utreexobackends/nodesmap.go index 0c2a753c..5b5909ef 100644 --- a/blockchain/internal/utreexobackends/nodesmap.go +++ b/blockchain/internal/utreexobackends/nodesmap.go @@ -64,6 +64,9 @@ type NodesMapSlice struct { // maps are the underlying maps in the slice of maps. maps []map[uint64]CachedLeaf + // overflow puts the overflowed entries. + overflow map[uint64]CachedLeaf + // maxEntries is the maximum amount of elemnts that the map is allocated for. 
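createMaps above turns a byte budget into a set of per-map entry counts. A rough sketch of that budgeting idea, using made-up per-entry and per-map figures rather than the constants in the repo:

```go
package main

import "fmt"

const (
	entrySize = 80      // assumed bytes per cached entry
	maxPerMap = 1 << 20 // assumed cap on entries per individual map
)

// splitBudget returns how many entries each map may hold and the total count.
func splitBudget(maxBytes int64) ([]int, int64) {
	total := maxBytes / entrySize
	var counts []int
	for remaining := total; remaining > 0; {
		n := remaining
		if n > maxPerMap {
			n = maxPerMap
		}
		counts = append(counts, int(n))
		remaining -= n
	}
	return counts, total
}

func main() {
	counts, total := splitBudget(256 * 1024 * 1024)
	fmt.Println(len(counts), total) // several bounded maps, each holding part of the budget
}
```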
maxEntries []int @@ -84,6 +87,8 @@ func (ms *NodesMapSlice) Length() int { l += len(m) } + l += len(ms.overflow) + return l } @@ -105,7 +110,14 @@ func (ms *NodesMapSlice) Get(k uint64) (CachedLeaf, bool) { } } - return v, found + if len(ms.overflow) > 0 { + v, found = ms.overflow[k] + if found { + return v, found + } + } + + return v, false } // put puts the keys and the values into one of the maps in the map slice. If the @@ -126,6 +138,14 @@ func (ms *NodesMapSlice) Put(k uint64, v CachedLeaf) bool { } } + if len(ms.overflow) > 0 { + _, found := ms.overflow[k] + if found { + ms.overflow[k] = v + return true + } + } + for i, maxNum := range ms.maxEntries { m := ms.maps[i] if len(m) >= maxNum { @@ -139,6 +159,8 @@ func (ms *NodesMapSlice) Put(k uint64, v CachedLeaf) bool { return true // Return as we were successful in adding the entry. } + ms.overflow[k] = v + // We only reach this code if we've failed to insert into the map above as // all the current maps were full. return false @@ -155,6 +177,8 @@ func (ms *NodesMapSlice) delete(k uint64) { for i := 0; i < len(ms.maps); i++ { delete(ms.maps[i], k) } + + delete(ms.overflow, k) } // DeleteMaps deletes all maps and allocate new ones with the maxEntries defined in @@ -168,6 +192,8 @@ func (ms *NodesMapSlice) DeleteMaps() { for i := range ms.maxEntries { ms.maps[i] = make(map[uint64]CachedLeaf, ms.maxEntries[i]) } + + ms.overflow = make(map[uint64]CachedLeaf) } // ClearMaps clears all maps @@ -182,20 +208,38 @@ func (ms *NodesMapSlice) ClearMaps() { delete(ms.maps[i], key) } } + + for key := range ms.overflow { + delete(ms.overflow, key) + } } // ForEach loops through all the elements in the nodes map slice and calls fn with the key-value pairs. // // This function is safe for concurrent access. -func (ms *NodesMapSlice) ForEach(fn func(uint64, CachedLeaf)) { +func (ms *NodesMapSlice) ForEach(fn func(uint64, CachedLeaf) error) error { ms.mtx.Lock() defer ms.mtx.Unlock() for _, m := range ms.maps { for k, v := range m { - fn(k, v) + err := fn(k, v) + if err != nil { + return err + } + } + } + + if len(ms.overflow) > 0 { + for k, v := range ms.overflow { + err := fn(k, v) + if err != nil { + return err + } } } + + return nil } // createMaps creates a slice of maps and returns the total count that the maps @@ -224,9 +268,16 @@ func (ms *NodesMapSlice) createMaps(maxMemoryUsage int64) int64 { ms.maps[i] = make(map[uint64]CachedLeaf, ms.maxEntries[i]) } + ms.overflow = make(map[uint64]CachedLeaf) + return int64(totalElemCount) } +// Overflowed returns true if the map slice overflowed. +func (ms *NodesMapSlice) Overflowed() bool { + return len(ms.overflow) > 0 +} + // NewNodesMapSlice returns a new NodesMapSlice and the total amount of elements // that the map slice can accomodate. func NewNodesMapSlice(maxTotalMemoryUsage int64) (NodesMapSlice, int64) { diff --git a/blockchain/internal/utreexobackends/nodesmap_test.go b/blockchain/internal/utreexobackends/nodesmap_test.go index de962924..19230c61 100644 --- a/blockchain/internal/utreexobackends/nodesmap_test.go +++ b/blockchain/internal/utreexobackends/nodesmap_test.go @@ -50,7 +50,7 @@ func TestNodesMapSliceDuplicates(t *testing.T) { } // Make sure the length of the map is 1 less than the max elems. 
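ForEach in both map slices now takes an error-returning callback so a failing write can abort a flush instead of being logged and ignored. A minimal sketch of that iteration pattern over the bounded maps plus the overflow map:

```go
package main

import (
	"errors"
	"fmt"
)

func forEach(maps []map[uint64]int, overflow map[uint64]int,
	fn func(uint64, int) error) error {

	for _, m := range maps {
		for k, v := range m {
			if err := fn(k, v); err != nil {
				return err // abort on the first failure
			}
		}
	}
	for k, v := range overflow {
		if err := fn(k, v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	maps := []map[uint64]int{{1: 10, 2: 20}}
	err := forEach(maps, map[uint64]int{3: 30}, func(k uint64, v int) error {
		if v > 25 {
			return errors.New("simulated write failure")
		}
		return nil
	})
	fmt.Println(err != nil) // true: the overflow entry triggered the failure
}
```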
- if m.Length() != int(maxElems)-1 { + if m.Length()-len(m.overflow) != int(maxElems)-1 { t.Fatalf("expected length of %v but got %v", maxElems-1, m.Length()) } @@ -59,8 +59,13 @@ func TestNodesMapSliceDuplicates(t *testing.T) { if !m.Put(0, CachedLeaf{}) { t.Fatalf("didn't expect error but unsuccessfully called put") } - if m.Length() != int(maxElems) { + if m.Length()-len(m.overflow) != int(maxElems) { t.Fatalf("expected length of %v but got %v", maxElems, m.Length()) } + + if len(m.overflow) != 1 { + t.Fatalf("expected length of %v but got %v", + 1, len(m.overflow)) + } } diff --git a/blockchain/utreexoio.go b/blockchain/utreexoio.go index 0c0a6fd9..9fa87447 100644 --- a/blockchain/utreexoio.go +++ b/blockchain/utreexoio.go @@ -53,12 +53,7 @@ type NodesBackEnd struct { // InitNodesBackEnd returns a newly initialized NodesBackEnd which implements // utreexo.NodesInterface. -func InitNodesBackEnd(datadir string, maxTotalMemoryUsage int64) (*NodesBackEnd, error) { - db, err := leveldb.OpenFile(datadir, nil) - if err != nil { - return nil, err - } - +func InitNodesBackEnd(db *leveldb.DB, maxTotalMemoryUsage int64) (*NodesBackEnd, error) { cache, maxCacheElems := utreexobackends.NewNodesMapSlice(maxTotalMemoryUsage) nb := NodesBackEnd{ db: db, @@ -69,15 +64,6 @@ func InitNodesBackEnd(datadir string, maxTotalMemoryUsage int64) (*NodesBackEnd, return &nb, nil } -// dbPut serializes and puts the key value pair into the database. -func (m *NodesBackEnd) dbPut(k uint64, v utreexo.Leaf) error { - var buf [vlqBufSize]byte - size := putVLQ(buf[:], k) - - serialized := serializeLeaf(v) - return m.db.Put(buf[:size], serialized[:], nil) -} - // dbGet fetches the value from the database and deserializes it and returns // the leaf value and a boolean for whether or not it was successful. func (m *NodesBackEnd) dbGet(k uint64) (utreexo.Leaf, bool) { @@ -98,19 +84,8 @@ func (m *NodesBackEnd) dbGet(k uint64) (utreexo.Leaf, bool) { return leaf, true } -// dbDel removes the key from the database. -func (m *NodesBackEnd) dbDel(k uint64) error { - var buf [vlqBufSize]byte - size := putVLQ(buf[:], k) - return m.db.Delete(buf[:size], nil) -} - // Get returns the leaf from the underlying map. func (m *NodesBackEnd) Get(k uint64) (utreexo.Leaf, bool) { - if m.maxCacheElem == 0 { - return m.dbGet(k) - } - // Look it up on the cache first. cLeaf, found := m.cache.Get(k) if found { @@ -119,13 +94,6 @@ func (m *NodesBackEnd) Get(k uint64) (utreexo.Leaf, bool) { return utreexo.Leaf{}, false } - // If the cache is full, flush the cache then Put - // the leaf in. - if !m.cache.Put(k, cLeaf) { - m.flush() - m.cache.Put(k, cLeaf) - } - // If we found it, return here. return cLeaf.Leaf, true } @@ -139,28 +107,23 @@ func (m *NodesBackEnd) Get(k uint64) (utreexo.Leaf, bool) { } // Cache the leaf before returning it. - if !m.cache.Put(k, utreexobackends.CachedLeaf{Leaf: leaf}) { - m.flush() - m.cache.Put(k, utreexobackends.CachedLeaf{Leaf: leaf}) - } + m.cache.Put(k, utreexobackends.CachedLeaf{Leaf: leaf}) + return leaf, true } -// Put puts the given position and the leaf to the underlying map. -func (m *NodesBackEnd) Put(k uint64, v utreexo.Leaf) { - if m.maxCacheElem == 0 { - err := m.dbPut(k, v) - if err != nil { - log.Warnf("NodesBackEnd dbPut fail. %v", err) - } - - return - } +// NodesBackendPut puts a key-value pair in the given leveldb tx. 
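InitNodesBackEnd (and InitCachedLeavesBackEnd further down) now accept an already-open *leveldb.DB instead of opening their own. A tiny sketch of the resulting ownership model, with the backend types reduced to just what the example needs:

```go
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

type nodesBackEnd struct{ db *leveldb.DB }
type cachedLeavesBackEnd struct{ db *leveldb.DB }

func initNodesBackEnd(db *leveldb.DB) *nodesBackEnd {
	return &nodesBackEnd{db: db}
}

func initCachedLeavesBackEnd(db *leveldb.DB) *cachedLeavesBackEnd {
	return &cachedLeavesBackEnd{db: db}
}

func main() {
	// One OpenFile call; the caller owns the handle and closes it once, so
	// both backends can share transactions against the same database.
	db, err := leveldb.OpenFile("sketch-shared-db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	nodes := initNodesBackEnd(db)
	leaves := initCachedLeavesBackEnd(db)
	_ = nodes
	_ = leaves
}
```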
+func NodesBackendPut(tx *leveldb.Transaction, k uint64, v utreexo.Leaf) error { + size := serializeSizeVLQ(k) + buf := make([]byte, size) + putVLQ(buf, k) - if int64(m.cache.Length()) > m.maxCacheElem { - m.flush() - } + serialized := serializeLeaf(v) + return tx.Put(buf[:], serialized[:], nil) +} +// Put puts the given position and the leaf to the underlying map. +func (m *NodesBackEnd) Put(k uint64, v utreexo.Leaf) { leaf, found := m.cache.Get(k) if found { leaf.Flags &^= utreexobackends.Removed @@ -169,11 +132,7 @@ func (m *NodesBackEnd) Put(k uint64, v utreexo.Leaf) { Flags: leaf.Flags | utreexobackends.Modified, } - // It shouldn't fail here but handle it anyways. - if !m.cache.Put(k, l) { - m.flush() - m.cache.Put(k, l) - } + m.cache.Put(k, l) } else { // If the key isn't found, mark it as fresh. l := utreexobackends.CachedLeaf{ @@ -181,49 +140,59 @@ func (m *NodesBackEnd) Put(k uint64, v utreexo.Leaf) { Flags: utreexobackends.Fresh, } - // It shouldn't fail here but handle it anyways. - if !m.cache.Put(k, l) { - m.flush() - m.cache.Put(k, l) - } + m.cache.Put(k, l) } } +// NodesBackendDelete deletes the corresponding key-value pair from the given leveldb tx. +func NodesBackendDelete(tx *leveldb.Transaction, k uint64) error { + size := serializeSizeVLQ(k) + buf := make([]byte, size) + putVLQ(buf, k) + return tx.Delete(buf, nil) +} + // Delete removes the given key from the underlying map. No-op if the key // doesn't exist. func (m *NodesBackEnd) Delete(k uint64) { - if m.maxCacheElem == 0 { - err := m.dbDel(k) - if err != nil { - log.Warnf("NodesBackEnd dbDel fail. %v", err) - } - - return - } - - leaf, found := m.cache.Get(k) - if !found { - if int64(m.cache.Length()) >= m.maxCacheElem { - m.flush() - } - } + // Don't delete as the same key may get called to be removed multiple times. + // Cache it as removed so that we don't call expensive flushes on keys that + // are not in the database. + leaf, _ := m.cache.Get(k) l := utreexobackends.CachedLeaf{ Leaf: leaf.Leaf, Flags: leaf.Flags | utreexobackends.Removed, } - if !m.cache.Put(k, l) { - m.flush() - m.cache.Put(k, l) - } + + m.cache.Put(k, l) } // Length returns the amount of items in the underlying database. func (m *NodesBackEnd) Length() int { - m.flush() - length := 0 + m.cache.ForEach(func(u uint64, cl utreexobackends.CachedLeaf) error { + // Only count the entry if it's not removed and it's not already + // in the database. + if !cl.IsRemoved() && cl.IsFresh() { + length++ + } + + return nil + }) + iter := m.db.NewIterator(nil, nil) for iter.Next() { + // If the itered key is chainhash.HashSize, it means that the entry is for nodesbackend. + // Skip it since it's not relevant here. + if len(iter.Key()) == 32 { + continue + } + k, _ := deserializeVLQ(iter.Key()) + val, found := m.cache.Get(k) + if found && val.IsRemoved() { + // Skip if the key-value pair has already been removed in the cache. + continue + } length++ } iter.Release() @@ -233,13 +202,31 @@ func (m *NodesBackEnd) Length() int { // ForEach calls the given function for each of the elements in the underlying map. func (m *NodesBackEnd) ForEach(fn func(uint64, utreexo.Leaf) error) error { - m.flush() + m.cache.ForEach(func(u uint64, cl utreexobackends.CachedLeaf) error { + // Only operate on the entry if it's not removed and it's not already + // in the database. 
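NodesBackendPut above encodes the uint64 position with a variable-length quantity before using it as a leveldb key. The stand-in below uses encoding/binary's uvarint purely to illustrate why small positions become short keys; the repo's putVLQ/deserializeVLQ use their own VLQ format, so this is not the actual encoding:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// nodeKey encodes a position compactly so small positions take few bytes.
func nodeKey(pos uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, pos)
	return buf[:n]
}

// nodePos decodes a key back into the position.
func nodePos(key []byte) (uint64, bool) {
	v, n := binary.Uvarint(key)
	return v, n > 0
}

func main() {
	k := nodeKey(300)
	fmt.Println(len(k)) // 2 bytes instead of a fixed 8
	v, ok := nodePos(k)
	fmt.Println(v, ok) // 300 true
}
```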
+ if !cl.IsRemoved() && cl.IsFresh() { + fn(u, cl.Leaf) + } + + return nil + }) iter := m.db.NewIterator(nil, nil) for iter.Next() { + // If the itered key is chainhash.HashSize, it means that the entry is for nodesbackend. + // Skip it since it's not relevant here. + if len(iter.Key()) == 32 { + continue + } // Remember that the contents of the returned slice should not be modified, and // only valid until the next call to Next. k, _ := deserializeVLQ(iter.Key()) + val, found := m.cache.Get(k) + if found && val.IsRemoved() { + // Skip if the key-value pair has already been removed in the cache. + continue + } value := iter.Value() if len(value) != leafLength { @@ -257,34 +244,48 @@ func (m *NodesBackEnd) ForEach(fn func(uint64, utreexo.Leaf) error) error { return iter.Error() } -// flush saves all the cached entries to disk and resets the cache map. -func (m *NodesBackEnd) flush() { - if m.maxCacheElem == 0 { - return - } +// IsFlushNeeded returns true if the backend needs to be flushed. +func (m *NodesBackEnd) IsFlushNeeded() bool { + return m.cache.Overflowed() +} - m.cache.ForEach(func(k uint64, v utreexobackends.CachedLeaf) { - if v.IsRemoved() { - err := m.dbDel(k) - if err != nil { - log.Warnf("NodesBackEnd flush error. %v", err) +// UsageStats returns the currently cached elements and the total amount the cache can hold. +func (m *NodesBackEnd) UsageStats() (int64, int64) { + return int64(m.cache.Length()), m.maxCacheElem +} + +// flush saves all the cached entries to disk and resets the cache map. +func (m *NodesBackEnd) Flush(ldbTx *leveldb.Transaction) error { + err := m.cache.ForEach(func(k uint64, v utreexobackends.CachedLeaf) error { + if v.IsFresh() { + if !v.IsRemoved() { + err := NodesBackendPut(ldbTx, k, v.Leaf) + if err != nil { + return err + } } - } else if v.IsFresh() || v.IsModified() { - err := m.dbPut(k, v.Leaf) - if err != nil { - log.Warnf("NodesBackEnd flush error. %v", err) + } else { + if v.IsRemoved() { + err := NodesBackendDelete(ldbTx, k) + if err != nil { + return err + } + } else if v.IsModified() { + err := NodesBackendPut(ldbTx, k, v.Leaf) + if err != nil { + return err + } } } + + return nil }) + if err != nil { + return fmt.Errorf("NodesBackEnd flush error. %v", err) + } m.cache.ClearMaps() -} - -// Close flushes the cache and closes the underlying database. -func (m *NodesBackEnd) Close() error { - m.flush() - - return m.db.Close() + return nil } var _ utreexo.CachedLeavesInterface = (*CachedLeavesBackEnd)(nil) @@ -297,13 +298,6 @@ type CachedLeavesBackEnd struct { cache utreexobackends.CachedLeavesMapSlice } -// dbPut serializes and puts the key and the value into the database. -func (m *CachedLeavesBackEnd) dbPut(k utreexo.Hash, v uint64) error { - var buf [vlqBufSize]byte - size := putVLQ(buf[:], v) - return m.db.Put(k[:], buf[:size], nil) -} - // dbGet fetches and deserializes the value from the database. func (m *CachedLeavesBackEnd) dbGet(k utreexo.Hash) (uint64, bool) { val, err := m.db.Get(k[:], nil) @@ -317,64 +311,84 @@ func (m *CachedLeavesBackEnd) dbGet(k utreexo.Hash) (uint64, bool) { // InitCachedLeavesBackEnd returns a newly initialized CachedLeavesBackEnd which implements // utreexo.CachedLeavesInterface. 
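NodesBackEnd.Flush above decides per cache entry whether to write, delete, or skip based on the Fresh/Modified/Removed flags. A compact version of that decision table, with the writer functions standing in for NodesBackendPut/NodesBackendDelete on a transaction:

```go
package main

import "fmt"

type flags uint8

const (
	fresh flags = 1 << iota
	modified
	removed
)

// flushOne applies the same rules as the diff: fresh-and-removed entries never
// hit the database, fresh or modified ones are written, removed ones deleted.
func flushOne(f flags, put, del func() error) error {
	if f&fresh != 0 {
		if f&removed != 0 {
			return nil // never persisted, nothing to undo
		}
		return put()
	}
	if f&removed != 0 {
		return del()
	}
	if f&modified != 0 {
		return put()
	}
	return nil // clean cache hit, already on disk
}

func main() {
	puts, dels := 0, 0
	put := func() error { puts++; return nil }
	del := func() error { dels++; return nil }
	for _, f := range []flags{fresh, fresh | removed, modified, removed, 0} {
		flushOne(f, put, del)
	}
	fmt.Println(puts, dels) // 2 1
}
```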
-func InitCachedLeavesBackEnd(datadir string, maxMemoryUsage int64) (*CachedLeavesBackEnd, error) { - db, err := leveldb.OpenFile(datadir, nil) - if err != nil { - return nil, err - } - +func InitCachedLeavesBackEnd(db *leveldb.DB, maxMemoryUsage int64) (*CachedLeavesBackEnd, error) { cache, maxCacheElem := utreexobackends.NewCachedLeavesMapSlice(maxMemoryUsage) return &CachedLeavesBackEnd{maxCacheElem: maxCacheElem, db: db, cache: cache}, nil } // Get returns the data from the underlying cache or the database. func (m *CachedLeavesBackEnd) Get(k utreexo.Hash) (uint64, bool) { - if m.maxCacheElem == 0 { - return m.dbGet(k) - } - pos, found := m.cache.Get(k) if !found { return m.dbGet(k) } + // Even if the entry was found, if the position value is math.MaxUint64, + // then it was already deleted. + if pos.IsRemoved() { + return 0, false + } - return pos, found + return pos.Position, found +} + +// CachedLeavesBackendPut puts a key-value pair in the given leveldb tx. +func CachedLeavesBackendPut(tx *leveldb.Transaction, k utreexo.Hash, v uint64) error { + size := serializeSizeVLQ(v) + buf := make([]byte, size) + putVLQ(buf, v) + return tx.Put(k[:], buf, nil) } // Put puts the given data to the underlying cache. If the cache is full, it evicts // the earliest entries to make room. func (m *CachedLeavesBackEnd) Put(k utreexo.Hash, v uint64) { - if m.maxCacheElem == 0 { - err := m.dbPut(k, v) - if err != nil { - log.Warnf("CachedLeavesBackEnd dbPut fail. %v", err) - } - - return - } - - length := m.cache.Length() - if int64(length) >= m.maxCacheElem { - m.flush() - } - - m.cache.Put(k, v) + m.cache.Put(k, utreexobackends.CachedPosition{ + Position: v, + Flags: utreexobackends.Fresh, + }) } // Delete removes the given key from the underlying map. No-op if the key // doesn't exist. func (m *CachedLeavesBackEnd) Delete(k utreexo.Hash) { - m.cache.Delete(k) - m.db.Delete(k[:], nil) + pos, found := m.cache.Get(k) + if found && pos.IsFresh() { + m.cache.Delete(k) + return + } + p := utreexobackends.CachedPosition{ + Position: pos.Position, + Flags: pos.Flags | utreexobackends.Removed, + } + + m.cache.Put(k, p) } // Length returns the amount of items in the underlying db and the cache. func (m *CachedLeavesBackEnd) Length() int { - m.flush() - length := 0 + m.cache.ForEach(func(k utreexo.Hash, v utreexobackends.CachedPosition) error { + // Only operate on the entry if it's not removed and it's not already + // in the database. + if !v.IsRemoved() && v.IsFresh() { + length++ + } + return nil + }) iter := m.db.NewIterator(nil, nil) for iter.Next() { + // If the itered key is chainhash.HashSize, it means that the entry is for nodesbackend. + // Skip it since it's not relevant here. + if len(iter.Key()) != chainhash.HashSize { + continue + } + k := iter.Key() + val, found := m.cache.Get(*(*[chainhash.HashSize]byte)(k)) + if found && val.IsRemoved() { + // Skip if the key-value pair has already been removed in the cache. + continue + } + length++ } iter.Release() @@ -384,13 +398,29 @@ func (m *CachedLeavesBackEnd) Length() int { // ForEach calls the given function for each of the elements in the underlying map. func (m *CachedLeavesBackEnd) ForEach(fn func(utreexo.Hash, uint64) error) error { - m.flush() - + m.cache.ForEach(func(k utreexo.Hash, v utreexobackends.CachedPosition) error { + // Only operate on the entry if it's not removed and it's not already + // in the database. 
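CachedLeavesBackEnd.Delete above distinguishes fresh entries, which can simply be dropped from the cache, from db-backed ones, which are kept as tombstones until the next flush issues the delete. A simplified stand-in for that logic:

```go
package main

import "fmt"

type entry struct {
	pos     uint64
	fresh   bool
	removed bool
}

type cache map[string]entry

// remove drops never-persisted entries and tombstones everything else.
func (c cache) remove(k string) {
	e, found := c[k]
	if found && e.fresh {
		delete(c, k) // never persisted: forget it entirely
		return
	}
	e.removed = true // persisted (or unknown): delete it on the next flush
	c[k] = e
}

func main() {
	c := cache{}
	c["a"] = entry{pos: 1, fresh: true}
	c["b"] = entry{pos: 2} // pretend this one is already on disk
	c.remove("a")
	c.remove("b")
	_, aStillCached := c["a"]
	fmt.Println(aStillCached, c["b"].removed) // false true
}
```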
+ if !v.IsRemoved() && v.IsFresh() { + fn(k, v.Position) + } + return nil + }) iter := m.db.NewIterator(nil, nil) for iter.Next() { + // If the itered key isn't chainhash.HashSize, it means that the entry is for nodesbackend. + // Skip it since it's not relevant here. + if len(iter.Key()) != chainhash.HashSize { + continue + } // Remember that the contents of the returned slice should not be modified, and // only valid until the next call to Next. k := iter.Key() + val, found := m.cache.Get(*(*[chainhash.HashSize]byte)(k)) + if found && val.IsRemoved() { + // Skip if the key-value pair has already been removed in the cache. + continue + } v, _ := deserializeVLQ(iter.Value()) err := fn(*(*[chainhash.HashSize]byte)(k), v) @@ -402,20 +432,37 @@ func (m *CachedLeavesBackEnd) ForEach(fn func(utreexo.Hash, uint64) error) error return iter.Error() } +// IsFlushNeeded returns true if the backend needs to be flushed. +func (m *CachedLeavesBackEnd) IsFlushNeeded() bool { + return m.cache.Overflowed() +} + +// UsageStats returns the currently cached elements and the total amount the cache can hold. +func (m *CachedLeavesBackEnd) UsageStats() (int64, int64) { + return int64(m.cache.Length()), m.maxCacheElem +} + // Flush resets the cache and saves all the key values onto the database. -func (m *CachedLeavesBackEnd) flush() { - m.cache.ForEach(func(k utreexo.Hash, v uint64) { - err := m.dbPut(k, v) - if err != nil { - log.Warnf("CachedLeavesBackEnd dbPut fail. %v", err) +func (m *CachedLeavesBackEnd) Flush(ldbTx *leveldb.Transaction) error { + err := m.cache.ForEach(func(k utreexo.Hash, v utreexobackends.CachedPosition) error { + if v.IsRemoved() { + err := ldbTx.Delete(k[:], nil) + if err != nil { + return err + } + } else { + err := CachedLeavesBackendPut(ldbTx, k, v.Position) + if err != nil { + return err + } } + + return nil }) + if err != nil { + return fmt.Errorf("CachedLeavesBackEnd flush error. %v", err) + } m.cache.ClearMaps() -} - -// Close flushes all the cached entries and then closes the underlying database. -func (m *CachedLeavesBackEnd) Close() error { - m.flush() - return m.db.Close() + return nil } diff --git a/blockchain/utreexoio_test.go b/blockchain/utreexoio_test.go index 33ced614..e0f1361f 100644 --- a/blockchain/utreexoio_test.go +++ b/blockchain/utreexoio_test.go @@ -8,6 +8,7 @@ import ( "sync" "testing" + "github.com/syndtr/goleveldb/leveldb" "github.com/utreexo/utreexo" "github.com/utreexo/utreexod/blockchain/internal/utreexobackends" ) @@ -19,26 +20,18 @@ func TestCachedLeavesBackEnd(t *testing.T) { }{ { tmpDir: func() string { - return filepath.Join(os.TempDir(), "TestCachedLeavesBackEnd0") - }(), - maxMemUsage: -1, - }, - { - tmpDir: func() string { - return filepath.Join(os.TempDir(), "TestCachedLeavesBackEnd1") - }(), - maxMemUsage: 0, - }, - { - tmpDir: func() string { - return filepath.Join(os.TempDir(), "TestCachedLeavesBackEnd2") + return filepath.Join(os.TempDir(), "TestCachedLeavesBackEnd") }(), maxMemUsage: 1 * 1024 * 1024, }, } for _, test := range tests { - cachedLeavesBackEnd, err := InitCachedLeavesBackEnd(test.tmpDir, test.maxMemUsage) + db, err := leveldb.OpenFile(test.tmpDir, nil) + if err != nil { + t.Fatal(err) + } + cachedLeavesBackEnd, err := InitCachedLeavesBackEnd(db, test.maxMemUsage) if err != nil { t.Fatal(err) } @@ -55,12 +48,26 @@ func TestCachedLeavesBackEnd(t *testing.T) { cachedLeavesBackEnd.Put(hash, i) } + ldbTx, err := db.OpenTransaction() + if err != nil { + t.Fatal(err) + } // Close and reopen the backend. 
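Because both backends now share one database, their iterators skip the other backend's records by key length: cached-leaf entries use 32-byte hash keys while node entries use short VLQ keys. An in-memory illustration of that partitioning (no real iterator involved):

```go
package main

import "fmt"

const hashSize = 32

// countLeafEntries counts only the records that belong to the hash-keyed
// cached-leaves backend, skipping the short position keys.
func countLeafEntries(keys [][]byte) int {
	n := 0
	for _, k := range keys {
		if len(k) != hashSize {
			continue // a nodes-backend (or other) record; not ours
		}
		n++
	}
	return n
}

func main() {
	keys := [][]byte{
		make([]byte, 32), // cached leaf
		{0x8f, 0x2c},     // VLQ-encoded node position
		make([]byte, 32), // cached leaf
	}
	fmt.Println(countLeafEntries(keys)) // 2
}
```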
- err = cachedLeavesBackEnd.Close() + cachedLeavesBackEnd.Flush(ldbTx) + err = ldbTx.Commit() + if err != nil { + t.Fatal(err) + } + err = db.Close() + if err != nil { + t.Fatal(err) + } + + db, err = leveldb.OpenFile(test.tmpDir, nil) if err != nil { t.Fatal(err) } - cachedLeavesBackEnd, err = InitCachedLeavesBackEnd(test.tmpDir, test.maxMemUsage) + cachedLeavesBackEnd, err = InitCachedLeavesBackEnd(db, test.maxMemUsage) if err != nil { t.Fatal(err) } @@ -153,26 +160,18 @@ func TestNodesBackEnd(t *testing.T) { }{ { tmpDir: func() string { - return filepath.Join(os.TempDir(), "TestNodesBackEnd0") - }(), - maxMemUsage: -1, - }, - { - tmpDir: func() string { - return filepath.Join(os.TempDir(), "TestNodesBackEnd1") - }(), - maxMemUsage: 0, - }, - { - tmpDir: func() string { - return filepath.Join(os.TempDir(), "TestNodesBackEnd2") + return filepath.Join(os.TempDir(), "TestNodesBackEnd") }(), maxMemUsage: 1 * 1024 * 1024, }, } for _, test := range tests { - nodesBackEnd, err := InitNodesBackEnd(test.tmpDir, test.maxMemUsage) + db, err := leveldb.OpenFile(test.tmpDir, nil) + if err != nil { + t.Fatal(err) + } + nodesBackEnd, err := InitNodesBackEnd(db, test.maxMemUsage) if err != nil { t.Fatal(err) } @@ -189,12 +188,26 @@ func TestNodesBackEnd(t *testing.T) { nodesBackEnd.Put(i, utreexo.Leaf{Hash: hash}) } + ldbTx, err := db.OpenTransaction() + if err != nil { + t.Fatal(err) + } // Close and reopen the backend. - err = nodesBackEnd.Close() + nodesBackEnd.Flush(ldbTx) + err = ldbTx.Commit() + if err != nil { + t.Fatal(err) + } + err = db.Close() + if err != nil { + t.Fatal(err) + } + + db, err = leveldb.OpenFile(test.tmpDir, nil) if err != nil { t.Fatal(err) } - nodesBackEnd, err = InitNodesBackEnd(test.tmpDir, test.maxMemUsage) + nodesBackEnd, err = InitNodesBackEnd(db, test.maxMemUsage) if err != nil { t.Fatal(err) } diff --git a/blockchain/utxocache.go b/blockchain/utxocache.go index 49872bde..37bc70ef 100644 --- a/blockchain/utxocache.go +++ b/blockchain/utxocache.go @@ -180,11 +180,11 @@ func (ms *mapSlice) deleteMaps() { } const ( - // utxoFlushPeriodicInterval is the interval at which a flush is performed + // UtxoFlushPeriodicInterval is the interval at which a flush is performed // when the flush mode FlushPeriodic is used. This is used when the initial // block download is complete and it's useful to flush periodically in case // of unforseen shutdowns. - utxoFlushPeriodicInterval = time.Minute * 5 + UtxoFlushPeriodicInterval = time.Minute * 5 ) // FlushMode is used to indicate the different urgency types for a flush. @@ -563,7 +563,7 @@ func (s *utxoCache) flush(dbTx database.Tx, mode FlushMode, bestState *BestState case FlushPeriodic: // If the time since the last flush is over the periodic interval, // force a flush. Otherwise just flush when the cache is full. 
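Exporting UtxoFlushPeriodicInterval exposes the FlushPeriodic behaviour shown above: once the interval has elapsed the flush threshold drops to zero, otherwise the cache only flushes when full. A distilled version of that check with simplified usage numbers:

```go
package main

import (
	"fmt"
	"time"
)

const utxoFlushPeriodicInterval = 5 * time.Minute

// flushThreshold returns the usage level at which a periodic flush triggers.
func flushThreshold(lastFlush time.Time, maxUsage uint64) uint64 {
	if time.Since(lastFlush) > utxoFlushPeriodicInterval {
		return 0 // force a flush regardless of how full the cache is
	}
	return maxUsage // flush only when the cache is full
}

func main() {
	fmt.Println(flushThreshold(time.Now(), 1024))                 // 1024: flush only when full
	fmt.Println(flushThreshold(time.Now().Add(-time.Hour), 1024)) // 0: periodic flush is due
}
```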
- if time.Since(s.lastFlushTime) > utxoFlushPeriodicInterval { + if time.Since(s.lastFlushTime) > UtxoFlushPeriodicInterval { threshold = 0 } else { threshold = s.maxTotalMemoryUsage diff --git a/config.go b/config.go index 59f21754..b43710bc 100644 --- a/config.go +++ b/config.go @@ -490,7 +490,7 @@ func loadConfig() (*config, []string, error) { MaxOrphanTxs: defaultMaxOrphanTransactions, SigCacheMaxSize: defaultSigCacheMaxSize, UtxoCacheMaxSizeMiB: defaultUtxoCacheMaxSizeMiB, - UtreexoProofIndexMaxMemory: defaultUtxoCacheMaxSizeMiB, + UtreexoProofIndexMaxMemory: defaultUtxoCacheMaxSizeMiB * 2, Generate: defaultGenerate, TxIndex: defaultTxIndex, TTLIndex: defaultTTLIndex, diff --git a/database/ffldb/db.go b/database/ffldb/db.go index 93523748..3612a93a 100644 --- a/database/ffldb/db.go +++ b/database/ffldb/db.go @@ -2288,6 +2288,22 @@ func (db *db) Update(fn func(database.Tx) error) error { return tx.Commit() } +// Flush flushes the internal cache of the database to the disk. +// +// This function is part of the database.DB interface implementation. +func (db *db) Flush() error { + // Since all transactions have a read lock on this mutex, this will + // cause Flush to wait for all readers to complete. + db.closeLock.Lock() + defer db.closeLock.Unlock() + + if db.closed { + return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil) + } + + return db.cache.flush() +} + // Close cleanly shuts down the database and syncs all data. It will block // until all database transactions have been finalized (rolled back or // committed). diff --git a/database/interface.go b/database/interface.go index 058c90cd..31340752 100644 --- a/database/interface.go +++ b/database/interface.go @@ -504,6 +504,9 @@ type DB interface { // user-supplied function will result in a panic. Update(fn func(tx Tx) error) error + // Flush flushes the internal cache of the database to the disk. + Flush() error + // Close cleanly shuts down the database and syncs all data. It will // block until all database transactions have been finalized (rolled // back or committed). diff --git a/netsync/manager.go b/netsync/manager.go index 5c988669..b7e84863 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -837,14 +837,19 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { // flush the blockchain cache because we don't expect new blocks immediately. // After that, there is nothing more to do. if !sm.headersFirstMode { + // Flush relevant indexes. + if err := sm.chain.FlushIndexes(blockchain.FlushPeriodic, true); err != nil { + log.Errorf("Error while flushing the blockchain cache: %v", err) + } // Only flush if utreexoView is not active since a utreexo node does // not have a utxo cache. if !sm.chain.IsUtreexoViewActive() { if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { log.Errorf("Error while flushing the blockchain cache: %v", err) } - return } + + return } // This is headers-first mode, so if the block is not a checkpoint diff --git a/server.go b/server.go index 74fcc96f..6079d69b 100644 --- a/server.go +++ b/server.go @@ -2574,7 +2574,7 @@ out: // If utreexoProofIndex option is on, flush it after closing down syncManager. if s.utreexoProofIndex != nil { - err := s.utreexoProofIndex.FlushUtreexoState() + err := s.utreexoProofIndex.CloseUtreexoState() if err != nil { btcdLog.Errorf("Error while flushing utreexo state: %v", err) } @@ -2582,7 +2582,7 @@ out: // If flatUtreexoProofIndex option is on, flush it after closing down syncManager. 
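The netsync change above flushes the indexes periodically after every processed block and still skips the utxo-cache flush for utreexo nodes, which have no utxo cache. A trimmed stand-in for that caller-side flow, with the chain reduced to the three methods involved:

```go
package main

import "fmt"

type flushMode int

const flushPeriodic flushMode = iota

type chain interface {
	FlushIndexes(mode flushMode, onConnect bool) error
	IsUtreexoViewActive() bool
	FlushUtxoCache(mode flushMode) error
}

// afterBlock mirrors the post-connect flushing order: indexes first, then the
// utxo cache only when the node actually keeps one.
func afterBlock(c chain) {
	if err := c.FlushIndexes(flushPeriodic, true); err != nil {
		fmt.Println("index flush:", err)
	}
	if !c.IsUtreexoViewActive() {
		if err := c.FlushUtxoCache(flushPeriodic); err != nil {
			fmt.Println("utxo cache flush:", err)
		}
	}
}

type fakeChain struct{ utreexo bool }

func (f fakeChain) FlushIndexes(flushMode, bool) error { return nil }
func (f fakeChain) IsUtreexoViewActive() bool          { return f.utreexo }
func (f fakeChain) FlushUtxoCache(flushMode) error     { return nil }

func main() {
	afterBlock(fakeChain{utreexo: true})
	fmt.Println("done")
}
```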
if s.flatUtreexoProofIndex != nil { - err := s.flatUtreexoProofIndex.FlushUtreexoState() + err := s.flatUtreexoProofIndex.CloseUtreexoState() if err != nil { btcdLog.Errorf("Error while flushing utreexo state: %v", err) } @@ -3239,7 +3239,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, var err error s.utreexoProofIndex, err = indexers.NewUtreexoProofIndex( db, cfg.Prune != 0, cfg.UtreexoProofIndexMaxMemory*1024*1024, - chainParams, cfg.DataDir) + chainParams, cfg.DataDir, db.Flush) if err != nil { return nil, err } @@ -3256,7 +3256,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, var err error s.flatUtreexoProofIndex, err = indexers.NewFlatUtreexoProofIndex( cfg.Prune != 0, chainParams, interval, - cfg.UtreexoProofIndexMaxMemory*1024*1024, cfg.DataDir) + cfg.UtreexoProofIndexMaxMemory*1024*1024, cfg.DataDir, db.Flush) if err != nil { return nil, err }
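Passing db.Flush into the index constructors lets the utreexo state flush the block database alongside its own commit. A hedged sketch of one plausible ordering, flushing the main database before the accumulator state is written so the two stores stay consistent; the FlushMainDB field name follows the UtreexoConfig shown earlier, the rest is illustrative:

```go
package main

import "fmt"

type utreexoConfig struct {
	FlushMainDB func() error
}

// flushState flushes the main database first so its view is never newer than
// the accumulator state that is about to be committed.
func flushState(cfg utreexoConfig, commitAccumulator func() error) error {
	if err := cfg.FlushMainDB(); err != nil {
		return err
	}
	return commitAccumulator()
}

func main() {
	cfg := utreexoConfig{FlushMainDB: func() error {
		fmt.Println("main db flushed")
		return nil
	}}
	err := flushState(cfg, func() error {
		fmt.Println("utreexo state committed")
		return nil
	})
	fmt.Println(err == nil)
}
```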