Fix indexer and add method to get outpoints
alanorwick authored and jdowning100 committed May 24, 2024
1 parent d9b5887 commit bfd3819
Showing 27 changed files with 767 additions and 274 deletions.
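For orientation before the per-file diffs: the changes thread an address-to-outpoint index of type map[string]map[string]*types.OutpointAndDenomination through the indexer, keyed first by address hex string and then by outpoint key. A minimal, self-contained sketch of that shape using stand-in types that mirror the ones in core/types (the Key encoding and the sample values are hypothetical):

package main

import "fmt"

// Stand-ins mirroring types.OutPoint / types.OutpointAndDenomination from core/types.
type OutpointAndDenomination struct {
    TxHash       [32]byte
    Index        uint16
    Denomination uint8
}

// Key plays the role of OutPoint.Key(); the real encoding in the repo may differ.
func (o *OutpointAndDenomination) Key() string {
    return fmt.Sprintf("%x:%d", o.TxHash, o.Index)
}

func main() {
    // Outer key: address hex. Inner key: outpoint key.
    index := make(map[string]map[string]*OutpointAndDenomination)

    addr := "0x00c0ffee" // hypothetical address hex
    out := &OutpointAndDenomination{Index: 0, Denomination: 3}

    // Minting an output adds it under the receiving address.
    if _, ok := index[addr]; !ok {
        index[addr] = make(map[string]*OutpointAndDenomination)
    }
    index[addr][out.Key()] = out

    // Spending it later deletes the entry again.
    delete(index[addr], out.Key())
    fmt.Println(len(index[addr])) // 0
}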
2 changes: 1 addition & 1 deletion common/proto_common.pb.go

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion consensus/blake3pow/consensus.go
@@ -627,7 +627,11 @@ func (blake3pow *Blake3pow) Finalize(chain consensus.ChainHeaderReader, header *
continue
}
}
core.AddGenesisUtxos(state, nodeLocation, blake3pow.logger)
addressOutpointMap := make(map[string]map[string]*types.OutpointAndDenomination)
core.AddGenesisUtxos(state, nodeLocation, addressOutpointMap, blake3pow.logger)
if chain.Config().IndexAddressUtxos {
chain.WriteAddressOutpoints(addressOutpointMap)
}
}
header.Header().SetUTXORoot(state.UTXORoot())
header.Header().SetEVMRoot(state.IntermediateRoot(true))
3 changes: 3 additions & 0 deletions consensus/consensus.go
@@ -58,6 +58,9 @@ type ChainHeaderReader interface {

// UpdateEtxEligibleSlices updates the etx eligible slice for the given zone location
UpdateEtxEligibleSlices(header *types.WorkObject, location common.Location) common.Hash

// WriteAddressOutpoints writes the address outpoints to the database
WriteAddressOutpoints(outpointsMap map[string]map[string]*types.OutpointAndDenomination) error
}

// ChainReader defines a small collection of methods needed to access the local
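Since WriteAddressOutpoints is now part of ChainHeaderReader, every implementation and every test double has to provide it. A hypothetical in-memory stub is sketched below, assuming only the types package from this repo; the merge-into-a-map behavior is illustrative, while the real header chain persists the index through rawdb:

package consensustest // hypothetical test package

import "github.com/dominant-strategies/go-quai/core/types"

// stubHeaderReader is a hypothetical test double; only the new method is shown,
// the remaining ChainHeaderReader methods are omitted here.
type stubHeaderReader struct {
    outpoints map[string]map[string]*types.OutpointAndDenomination
}

func (s *stubHeaderReader) WriteAddressOutpoints(m map[string]map[string]*types.OutpointAndDenomination) error {
    if s.outpoints == nil {
        s.outpoints = make(map[string]map[string]*types.OutpointAndDenomination)
    }
    for addr, byKey := range m {
        if _, ok := s.outpoints[addr]; !ok {
            s.outpoints[addr] = make(map[string]*types.OutpointAndDenomination, len(byKey))
        }
        for key, outpoint := range byKey {
            s.outpoints[addr][key] = outpoint
        }
    }
    return nil
}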
6 changes: 5 additions & 1 deletion consensus/progpow/consensus.go
@@ -677,7 +677,11 @@ func (progpow *Progpow) Finalize(chain consensus.ChainHeaderReader, header *type
continue
}
}
core.AddGenesisUtxos(state, nodeLocation, progpow.logger)
addressOutpointMap := make(map[string]map[string]*types.OutpointAndDenomination)
core.AddGenesisUtxos(state, nodeLocation, addressOutpointMap, progpow.logger)
if chain.Config().IndexAddressUtxos {
chain.WriteAddressOutpoints(addressOutpointMap)
}
}
header.Header().SetUTXORoot(state.UTXORoot())
header.Header().SetEVMRoot(state.IntermediateRoot(true))
4 changes: 2 additions & 2 deletions core/bloom_indexer.go
@@ -49,15 +49,15 @@ type BloomIndexer struct {

// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
func NewBloomIndexer(db ethdb.Database, size, confirms uint64, nodeCtx int, logger *log.Logger) *ChainIndexer {
func NewBloomIndexer(db ethdb.Database, size, confirms uint64, nodeCtx int, logger *log.Logger, indexAddressUtxos bool) *ChainIndexer {
backend := &BloomIndexer{
db: db,
size: size,
logger: logger,
}
table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix), db.Location(), db.Logger())

return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits", nodeCtx, logger)
return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits", nodeCtx, logger, indexAddressUtxos)
}

// Reset implements core.ChainIndexerBackend, starting a new bloombits index
186 changes: 172 additions & 14 deletions core/chain_indexer.go
@@ -27,10 +27,13 @@ import (

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/rawdb"
"github.com/dominant-strategies/go-quai/core/state"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/crypto"
"github.com/dominant-strategies/go-quai/ethdb"
"github.com/dominant-strategies/go-quai/event"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/params"
)

// ChainIndexerBackend defines the methods needed to process chain segments in
@@ -62,6 +65,8 @@ type ChainIndexerChain interface {
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
// NodeCtx returns the context of the chain
NodeCtx() int
// StateAt returns the state for a state trie root and utxo root
StateAt(root common.Hash, utxoRoot common.Hash, etxRoot common.Hash) (*state.StateDB, error)
}

// ChainIndexer does a post-processing job for equally sized sections of the
@@ -79,6 +84,7 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
GetBloom func(common.Hash) (*types.Bloom, error)
StateAt func(common.Hash, common.Hash, common.Hash) (*state.StateDB, error)
active uint32 // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
@@ -96,22 +102,25 @@ type ChainIndexer struct {

logger *log.Logger
lock sync.Mutex

indexAddressUtxos bool
}

// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after certain number of confirmations passed.
// The throttling parameter might be used to prevent database thrashing.
func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string, nodeCtx int, logger *log.Logger) *ChainIndexer {
func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string, nodeCtx int, logger *log.Logger, indexAddressUtxos bool) *ChainIndexer {
c := &ChainIndexer{
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
update: make(chan struct{}, 1),
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
logger: logger,
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
update: make(chan struct{}, 1),
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
logger: logger,
indexAddressUtxos: indexAddressUtxos,
}
// Initialize database dependent fields and start the updater
c.loadValidSections()
@@ -125,11 +134,12 @@ func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend Cha
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
func (c *ChainIndexer) Start(chain ChainIndexerChain, config params.ChainConfig) {
events := make(chan ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(events)
c.GetBloom = chain.GetBloom
go c.eventLoop(chain.CurrentHeader(), events, sub, chain.NodeCtx())
c.StateAt = chain.StateAt
go c.eventLoop(chain.CurrentHeader(), events, sub, chain.NodeCtx(), config)
}

// Close tears down all goroutines belonging to the indexer and returns any error
@@ -174,13 +184,13 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.WorkObject, events chan ChainHeadEvent, sub event.Subscription, nodeCtx int) {
func (c *ChainIndexer) eventLoop(currentHeader *types.WorkObject, events chan ChainHeadEvent, sub event.Subscription, nodeCtx int, config params.ChainConfig) {
defer func() {
if r := recover(); r != nil {
c.logger.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Fatal("Go-Quai Panicked")
}).Error("Go-Quai Panicked")
}
}()
// Mark the chain indexer as active, requiring an additional teardown
@@ -210,18 +220,61 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.WorkObject, events chan Ch
return
}
header := ev.Block

var validUtxoIndex bool
var addressOutpoints map[string]map[string]*types.OutpointAndDenomination
if c.indexAddressUtxos {
validUtxoIndex = true
addressOutpoints = rawdb.ReadAddressOutpoints(c.chainDb, config.Location)
}

if header.ParentHash(nodeCtx) != prevHash {
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
// TODO: This seems a bit brittle, can we detect this case explicitly?

if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.NumberU64(nodeCtx)) != prevHash {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header, nodeCtx); h != nil {

// If indexAddressUtxos flag is enabled, update the address utxo map
// TODO: Need to be able to turn on/off indexer and fix corrupted state
if c.indexAddressUtxos {
reorgHeaders := make([]*types.WorkObject, 0)
for prev := prevHeader; prev.Hash() != h.Hash(); prev = rawdb.ReadHeader(c.chainDb, prev.ParentHash(nodeCtx)) {
reorgHeaders = append(reorgHeaders, prev)
}

// Reorg out all outpoints of the reorg headers
err := c.reorgUtxoIndexer(reorgHeaders, addressOutpoints, nodeCtx, config)
if err != nil {
c.logger.WithFields(log.Fields{"err": err}).Error("Failed to reorg utxo indexer")
validUtxoIndex = false
}

// Add new blocks from current hash back to common ancestor
for curr := header; curr.Hash() != h.Hash(); curr = rawdb.ReadHeader(c.chainDb, curr.ParentHash(nodeCtx)) {
block := rawdb.ReadWorkObject(c.chainDb, curr.Hash(), types.BlockObject)
c.addOutpointsToIndexer(addressOutpoints, nodeCtx, config, block)
}
}

c.newHead(h.NumberU64(nodeCtx), true)
}
}
}

if c.indexAddressUtxos {
c.addOutpointsToIndexer(addressOutpoints, nodeCtx, config, ev.Block)
}

c.newHead(header.NumberU64(nodeCtx), false)

if c.indexAddressUtxos && validUtxoIndex {
err := rawdb.WriteAddressOutpoints(c.chainDb, addressOutpoints)
if err != nil {
panic(err)
}
}

prevHeader, prevHash = header, header.Hash()
}
}
@@ -490,3 +543,108 @@ func (c *ChainIndexer) removeSectionHead(section uint64) {

c.indexDb.Delete(append([]byte("shead"), data[:]...))
}

// addOutpointsToIndexer removes the spent outpoints and adds new utxos to the indexer.
func (c *ChainIndexer) addOutpointsToIndexer(addressOutpoints map[string]map[string]*types.OutpointAndDenomination, nodeCtx int, config params.ChainConfig, block *types.WorkObject) {
utxos := block.QiTransactions()

for _, tx := range utxos {
for _, in := range tx.TxIn() {

// Skip Coinbase TxIns since they do not have a previous outpoint
if types.IsCoinBaseTx(tx, block.ParentHash(nodeCtx), config.Location) {
continue
}

outpoint := in.PreviousOutPoint

address := crypto.PubkeyBytesToAddress(in.PubKey, config.Location).Hex()
outpointsForAddress := addressOutpoints[address]

delete(outpointsForAddress, outpoint.Key())
}

for i, out := range tx.TxOut() {

addrBytes := out.Address
outpoint := types.OutPoint{
TxHash: tx.Hash(),
Index: uint16(i),
}

address := common.BytesToAddress(addrBytes, config.Location).Hex()

outpointAndDenom := &types.OutpointAndDenomination{
TxHash: outpoint.TxHash,
Index: outpoint.Index,
Denomination: out.Denomination,
}

if _, ok := addressOutpoints[address]; !ok {
addressOutpoints[address] = make(map[string]*types.OutpointAndDenomination)
}
addressOutpoints[address][outpointAndDenom.Key()] = outpointAndDenom
}
}
}

// reorgUtxoIndexer adds back previously removed outpoints and removes newly added outpoints.
// This is done in reverse order from the old header to the common ancestor.
func (c *ChainIndexer) reorgUtxoIndexer(headers []*types.WorkObject, addressOutpoints map[string]map[string]*types.OutpointAndDenomination, nodeCtx int, config params.ChainConfig) error {
for _, header := range headers {
block := rawdb.ReadWorkObject(c.chainDb, header.Hash(), types.BlockObject)

for _, tx := range block.QiTransactions() {
for i, out := range tx.TxOut() {

address := out.Address

addr := common.BytesToAddress(address, config.Location).Hex()
outpointsForAddress := addressOutpoints[addr]

// reconstruct outpoint to remove it via outpoint.Key()
outpoint := types.OutPoint{
TxHash: tx.Hash(),
Index: uint16(i),
}

delete(outpointsForAddress, outpoint.Key())
}

if types.IsCoinBaseTx(tx, block.ParentHash(nodeCtx), config.Location) {
continue
}

for _, in := range tx.TxIn() {
outpoint := in.PreviousOutPoint
address := crypto.PubkeyBytesToAddress(in.PubKey, config.Location).Hex()

parent := rawdb.ReadHeader(c.chainDb, block.ParentHash(nodeCtx))

state, err := c.StateAt(parent.EVMRoot(), parent.UTXORoot(), parent.EtxSetRoot())
if err != nil {
return err
}

entry := state.GetUTXO(outpoint.TxHash, outpoint.Index)
if entry == nil {
// missing entry while trying to add back outpoint
continue
}

outpointAndDenom := &types.OutpointAndDenomination{
TxHash: outpoint.TxHash,
Index: outpoint.Index,
Denomination: entry.Denomination,
}

if _, ok := addressOutpoints[address]; !ok {
addressOutpoints[address] = make(map[string]*types.OutpointAndDenomination)
}
addressOutpoints[address][outpointAndDenom.Key()] = outpointAndDenom
}

}
}
return nil
}
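The reorg branch of eventLoop above first collects the headers of the abandoned branch so their outpoints can be undone, then replays blocks from the new head back to the common ancestor. A small self-contained sketch of that ancestor walk, with hypothetical stand-in types, just to make the traversal order concrete:

package main

import "fmt"

type header struct {
    hash   string
    parent *header
}

// collectBack gathers headers from tip back to (but not including) the common
// ancestor, newest first, which is the order the indexer undoes or replays them in.
func collectBack(tip, ancestor *header) []*header {
    var out []*header
    for h := tip; h != nil && h.hash != ancestor.hash; h = h.parent {
        out = append(out, h)
    }
    return out
}

func main() {
    a := &header{hash: "a"}            // common ancestor
    b := &header{hash: "b", parent: a} // old (abandoned) head
    c := &header{hash: "c", parent: a} // first block of the new branch
    d := &header{hash: "d", parent: c} // new head

    fmt.Println(len(collectBack(b, a))) // 1: undo outpoints of b
    fmt.Println(len(collectBack(d, a))) // 2: re-index d, then c
}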
33 changes: 25 additions & 8 deletions core/core.go
@@ -78,12 +78,8 @@ type Core struct {
logger *log.Logger
}

type IndexerConfig struct {
IndexAddressUtxos bool
}

func NewCore(db ethdb.Database, config *Config, isLocalBlock func(block *types.WorkObject) bool, txConfig *TxPoolConfig, txLookupLimit *uint64, chainConfig *params.ChainConfig, slicesRunning []common.Location, currentExpansionNumber uint8, genesisBlock *types.WorkObject, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, indexerConfig *IndexerConfig, genesis *Genesis, logger *log.Logger) (*Core, error) {
slice, err := NewSlice(db, config, txConfig, txLookupLimit, isLocalBlock, chainConfig, slicesRunning, currentExpansionNumber, genesisBlock, domClientUrl, subClientUrls, engine, cacheConfig, indexerConfig, vmConfig, genesis, logger)
func NewCore(db ethdb.Database, config *Config, isLocalBlock func(block *types.WorkObject) bool, txConfig *TxPoolConfig, txLookupLimit *uint64, chainConfig *params.ChainConfig, slicesRunning []common.Location, currentExpansionNumber uint8, genesisBlock *types.WorkObject, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, genesis *Genesis, logger *log.Logger) (*Core, error) {
slice, err := NewSlice(db, config, txConfig, txLookupLimit, isLocalBlock, chainConfig, slicesRunning, currentExpansionNumber, genesisBlock, domClientUrl, subClientUrls, engine, cacheConfig, vmConfig, genesis, logger)
if err != nil {
return nil, err
}
@@ -942,6 +938,10 @@ func (c *Core) CheckIfEtxIsEligible(etxEligibleSlices common.Hash, location comm
return c.sl.hc.CheckIfEtxIsEligible(etxEligibleSlices, location)
}

func (c *Core) WriteAddressOutpoints(outpoints map[string]map[string]*types.OutpointAndDenomination) error {
return c.sl.hc.WriteAddressOutpoints(outpoints)
}

//--------------------//
// BlockChain methods //
//--------------------//
@@ -1142,8 +1142,25 @@ func (c *Core) TrieNode(hash common.Hash) ([]byte, error) {
return c.sl.hc.bc.processor.TrieNode(hash)
}

func (c *Core) GetUTXOsByAddress(addr common.Address) ([]*types.UtxoEntry, error) {
return c.sl.hc.bc.processor.GetUTXOsByAddress(addr)
func (c *Core) GetOutpointsByAddress(address common.Address) map[string]*types.OutpointAndDenomination {
outpoints := rawdb.ReadAddressOutpoints(c.sl.hc.bc.db, c.sl.hc.NodeLocation())
outpointsForAddress := outpoints[address.Hex()]
return outpointsForAddress
}

func (c *Core) GetUTXOsByAddressAtState(state *state.StateDB, address common.Address) ([]*types.UtxoEntry, error) {
outpointsForAddress := c.GetOutpointsByAddress(address)
utxos := make([]*types.UtxoEntry, 0, len(outpointsForAddress))

for _, outpoint := range outpointsForAddress {
entry := state.GetUTXO(outpoint.TxHash, outpoint.Index)
if entry == nil {
return nil, errors.New("failed to get UTXO for address")
}
utxos = append(utxos, entry)
}

return utxos, nil
}

//----------------//
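The two accessors added to Core pair naturally: GetOutpointsByAddress reads the persisted index, and GetUTXOsByAddressAtState resolves those outpoints against a specific state. A hedged usage sketch follows; the helper and package names are hypothetical, the import paths follow the repo layout shown in this diff, and c and st are assumed to come from a running node:

package utxotools // hypothetical package

import (
    "fmt"

    "github.com/dominant-strategies/go-quai/common"
    "github.com/dominant-strategies/go-quai/core"
    "github.com/dominant-strategies/go-quai/core/state"
)

// listAddressUtxos is a hypothetical helper showing how a caller might combine
// the new accessors; c and st are assumed to come from the running node.
func listAddressUtxos(c *core.Core, st *state.StateDB, addr common.Address) error {
    // Raw index entries: outpoint key -> outpoint plus denomination.
    for key, op := range c.GetOutpointsByAddress(addr) {
        fmt.Printf("outpoint %s: tx %x index %d denomination %v\n", key, op.TxHash, op.Index, op.Denomination)
    }

    // Resolve the same outpoints into live UTXO entries at the given state.
    utxos, err := c.GetUTXOsByAddressAtState(st, addr)
    if err != nil {
        return err
    }
    fmt.Printf("%d spendable UTXOs for %s\n", len(utxos), addr.Hex())
    return nil
}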