Skip to content

Commit

Permalink
[contract_indexer] improve robustness (#3892)
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc authored Jun 26, 2023
1 parent 1abbc44 commit 13f4826
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 45 deletions.
68 changes: 52 additions & 16 deletions blockindex/contractstaking/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type (
bucketTypeMap map[uint64]*BucketType // map[bucketTypeId]BucketType
propertyBucketTypeMap map[int64]map[uint64]uint64 // map[amount][duration]index
totalBucketCount uint64 // total number of buckets including burned buckets
height uint64 // current block height, it's put in cache for consistency on merge
contractAddress string // contract address for the bucket
mutex sync.RWMutex // a RW mutex for the cache to protect concurrent access
}
Expand All @@ -43,6 +44,12 @@ func newContractStakingCache(contractAddr string) *contractStakingCache {
}
}

func (s *contractStakingCache) Height() uint64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.height
}

func (s *contractStakingCache) CandidateVotes(candidate address.Address) *big.Int {
s.mutex.RLock()
defer s.mutex.RUnlock()
Expand Down Expand Up @@ -183,24 +190,18 @@ func (s *contractStakingCache) DeleteBucketInfo(id uint64) {
s.deleteBucketInfo(id)
}

func (s *contractStakingCache) Merge(delta *contractStakingDelta) error {
func (s *contractStakingCache) Merge(delta *contractStakingDelta, height uint64) error {
s.mutex.Lock()
defer s.mutex.Unlock()

if err := s.merge(delta); err != nil {
if err := s.mergeDelta(delta); err != nil {
return err
}
s.putHeight(height)
s.putTotalBucketCount(s.totalBucketCount + delta.AddedBucketCnt())
return nil
}

func (s *contractStakingCache) PutTotalBucketCount(count uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.putTotalBucketCount(count)
}

func (s *contractStakingCache) MatchBucketType(amount *big.Int, duration uint64) (uint64, *BucketType, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
Expand All @@ -223,13 +224,28 @@ func (s *contractStakingCache) LoadFromDB(kvstore db.KVStore) error {
s.mutex.Lock()
defer s.mutex.Unlock()

// load height
var height uint64
h, err := kvstore.Get(_StakingNS, _stakingHeightKey)
if err != nil {
if !errors.Is(err, db.ErrNotExist) {
return err
}
height = 0
} else {
height = byteutil.BytesToUint64BigEndian(h)

}
s.putHeight(height)

// load total bucket count
var totalBucketCount uint64
tbc, err := kvstore.Get(_StakingNS, _stakingTotalBucketCountKey)
if err != nil {
if !errors.Is(err, db.ErrNotExist) {
return err
}
totalBucketCount = 0
} else {
totalBucketCount = byteutil.BytesToUint64BigEndian(tbc)
}
Expand Down Expand Up @@ -320,8 +336,19 @@ func (s *contractStakingCache) getBucket(id uint64) (*Bucket, bool) {
}

func (s *contractStakingCache) putBucketType(id uint64, bt *BucketType) {
amount := bt.Amount.Int64()
// remove old bucket map
if oldBt, existed := s.bucketTypeMap[id]; existed {
amount := oldBt.Amount.Int64()
if _, existed := s.propertyBucketTypeMap[amount]; existed {
delete(s.propertyBucketTypeMap[amount], oldBt.Duration)
if len(s.propertyBucketTypeMap[amount]) == 0 {
delete(s.propertyBucketTypeMap, amount)
}
}
}
// add new bucket map
s.bucketTypeMap[id] = bt
amount := bt.Amount.Int64()
m, ok := s.propertyBucketTypeMap[amount]
if !ok {
s.propertyBucketTypeMap[amount] = make(map[uint64]uint64)
Expand All @@ -340,11 +367,16 @@ func (s *contractStakingCache) putBucketInfo(id uint64, bi *bucketInfo) {
}
s.candidateBucketMap[newDelegate][id] = true
// delete old candidate bucket map
if oldBi != nil {
oldDelegate := oldBi.Delegate.String()
if oldDelegate != newDelegate {
delete(s.candidateBucketMap[oldDelegate], id)
}
if oldBi == nil {
return
}
oldDelegate := oldBi.Delegate.String()
if oldDelegate == newDelegate {
return
}
delete(s.candidateBucketMap[oldDelegate], id)
if len(s.candidateBucketMap[oldDelegate]) == 0 {
delete(s.candidateBucketMap, oldDelegate)
}
}

Expand All @@ -364,7 +396,11 @@ func (s *contractStakingCache) putTotalBucketCount(count uint64) {
s.totalBucketCount = count
}

func (s *contractStakingCache) merge(delta *contractStakingDelta) error {
func (s *contractStakingCache) putHeight(height uint64) {
s.height = height
}

func (s *contractStakingCache) mergeDelta(delta *contractStakingDelta) error {
for state, btMap := range delta.BucketTypeDelta() {
if state == deltaStateAdded || state == deltaStateModified {
for id, bt := range btMap {
Expand Down
14 changes: 8 additions & 6 deletions blockindex/contractstaking/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ func TestContractStakingCache_TotalBucketCount(t *testing.T) {
require.EqualValues(0, cache.TotalBucketCount())

// one bucket
cache.PutTotalBucketCount(1)
cache.putTotalBucketCount(1)
require.EqualValues(1, cache.TotalBucketCount())

// two buckets
cache.PutTotalBucketCount(2)
cache.putTotalBucketCount(2)
require.EqualValues(2, cache.TotalBucketCount())

// delete bucket 1
Expand Down Expand Up @@ -312,25 +312,27 @@ func TestContractStakingCache_ActiveBucketTypes(t *testing.T) {
func TestContractStakingCache_Merge(t *testing.T) {
require := require.New(t)
cache := newContractStakingCache("")
height := uint64(1)

// create delta with one bucket type
delta := newContractStakingDelta()
delta.AddBucketType(1, &BucketType{Amount: big.NewInt(100), Duration: 100, ActivatedAt: 1})
// merge delta into cache
err := cache.Merge(delta)
err := cache.Merge(delta, height)
require.NoError(err)
// check that bucket type was added to cache
activeBucketTypes := cache.ActiveBucketTypes()
require.Len(activeBucketTypes, 1)
require.EqualValues(100, activeBucketTypes[1].Amount.Int64())
require.EqualValues(100, activeBucketTypes[1].Duration)
require.EqualValues(1, activeBucketTypes[1].ActivatedAt)
require.EqualValues(height, cache.Height())

// create delta with one bucket
delta = newContractStakingDelta()
delta.AddBucketInfo(1, &bucketInfo{TypeIndex: 1, CreatedAt: 1, UnlockedAt: maxBlockNumber, UnstakedAt: maxBlockNumber, Delegate: identityset.Address(1), Owner: identityset.Address(2)})
// merge delta into cache
err = cache.Merge(delta)
err = cache.Merge(delta, height)
require.NoError(err)
// check that bucket was added to cache and vote count is correct
require.EqualValues(100, cache.CandidateVotes(identityset.Address(1)).Int64())
Expand All @@ -339,7 +341,7 @@ func TestContractStakingCache_Merge(t *testing.T) {
delta = newContractStakingDelta()
delta.UpdateBucketInfo(1, &bucketInfo{TypeIndex: 1, CreatedAt: 1, UnlockedAt: maxBlockNumber, UnstakedAt: maxBlockNumber, Delegate: identityset.Address(3), Owner: identityset.Address(2)})
// merge delta into cache
err = cache.Merge(delta)
err = cache.Merge(delta, height)
require.NoError(err)
// check that bucket delegate was updated and vote count is correct
require.EqualValues(0, cache.CandidateVotes(identityset.Address(1)).Int64())
Expand All @@ -349,7 +351,7 @@ func TestContractStakingCache_Merge(t *testing.T) {
delta = newContractStakingDelta()
delta.DeleteBucketInfo(1)
// merge delta into cache
err = cache.Merge(delta)
err = cache.Merge(delta, height)
require.NoError(err)
// check that bucket was deleted from cache and vote count is 0
require.EqualValues(0, cache.CandidateVotes(identityset.Address(3)).Int64())
Expand Down
24 changes: 3 additions & 21 deletions blockindex/contractstaking/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package contractstaking
import (
"context"
"math/big"
"sync/atomic"

"github.com/ethereum/go-ethereum/common/math"
"github.com/iotexproject/iotex-address/address"
Expand Down Expand Up @@ -54,7 +53,6 @@ type (
cache *contractStakingCache // in-memory index for clean data, used to query index data
contractAddress string // stake contract address
contractDeployHeight uint64 // height of the contract deployment
height atomic.Value // uint64, current block height
}
)

Expand Down Expand Up @@ -93,7 +91,7 @@ func (s *Indexer) Stop(ctx context.Context) error {

// Height returns the tip block height
func (s *Indexer) Height() (uint64, error) {
return s.height.Load().(uint64), nil
return s.cache.Height(), nil
}

// StartHeight returns the start height of the indexer
Expand Down Expand Up @@ -143,7 +141,7 @@ func (s *Indexer) BucketTypes() ([]*BucketType, error) {

// PutBlock puts a block into indexer
func (s *Indexer) PutBlock(ctx context.Context, blk *block.Block) error {
if blk.Height() < s.contractDeployHeight || blk.Height() <= s.height.Load().(uint64) {
if blk.Height() < s.contractDeployHeight || blk.Height() <= s.cache.Height() {
return nil
}
// new event handler for this block
Expand Down Expand Up @@ -176,7 +174,7 @@ func (s *Indexer) DeleteTipBlock(context.Context, *block.Block) error {
func (s *Indexer) commit(handler *contractStakingEventHandler, height uint64) error {
batch, delta := handler.Result()
// update cache
if err := s.cache.Merge(delta); err != nil {
if err := s.cache.Merge(delta, height); err != nil {
s.reloadCache()
return err
}
Expand All @@ -186,8 +184,6 @@ func (s *Indexer) commit(handler *contractStakingEventHandler, height uint64) er
s.reloadCache()
return err
}
// update indexer height cache
s.height.Store(height)
return nil
}

Expand All @@ -197,19 +193,5 @@ func (s *Indexer) reloadCache() error {
}

func (s *Indexer) loadFromDB() error {
// load height
var height uint64
h, err := s.kvstore.Get(_StakingNS, _stakingHeightKey)
if err != nil {
if !errors.Is(err, db.ErrNotExist) {
return err
}
height = 0
} else {
height = byteutil.BytesToUint64BigEndian(h)

}
s.height.Store(height)
// load cache
return s.cache.LoadFromDB(s.kvstore)
}
4 changes: 2 additions & 2 deletions blockindex/contractstaking/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ func TestIndexer_PutBlock(t *testing.T) {
r.NoError(indexer.Stop(context.Background()))
testutil.CleanupPath(dbPath)
}()
indexer.height.Store(height)
indexer.cache.putHeight(height)
// Create a mock block
builder := block.NewBuilder(block.NewRunnableActionsBuilder().Build())
builder.SetHeight(c.blockHeight)
Expand All @@ -856,7 +856,7 @@ func TestIndexer_PutBlock(t *testing.T) {
err = indexer.PutBlock(context.Background(), &blk)
r.NoError(err)
// Check the block height
r.EqualValues(c.expectedHeight, indexer.height.Load().(uint64))
r.EqualValues(c.expectedHeight, indexer.cache.Height())
})
}

Expand Down

0 comments on commit 13f4826

Please sign in to comment.