Skip to content

Commit

Permalink
[db] Share db for two contract indexer (#4307)
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc authored Jun 24, 2024
1 parent f001407 commit afa504c
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 112 deletions.
74 changes: 36 additions & 38 deletions blockchain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,25 @@ import (
type (
// Config is the config struct for blockchain package
Config struct {
ChainDBPath string `yaml:"chainDBPath"`
TrieDBPatchFile string `yaml:"trieDBPatchFile"`
TrieDBPath string `yaml:"trieDBPath"`
StakingPatchDir string `yaml:"stakingPatchDir"`
IndexDBPath string `yaml:"indexDBPath"`
BloomfilterIndexDBPath string `yaml:"bloomfilterIndexDBPath"`
CandidateIndexDBPath string `yaml:"candidateIndexDBPath"`
StakingIndexDBPath string `yaml:"stakingIndexDBPath"`
SGDIndexDBPath string `yaml:"sgdIndexDBPath"`
ContractStakingIndexDBPath string `yaml:"contractStakingIndexDBPath"`
ContractStakingIndexV2DBPath string `yaml:"contractStakingIndexV2DBPath"`
ID uint32 `yaml:"id"`
EVMNetworkID uint32 `yaml:"evmNetworkID"`
Address string `yaml:"address"`
ProducerPrivKey string `yaml:"producerPrivKey"`
ProducerPrivKeySchema string `yaml:"producerPrivKeySchema"`
SignatureScheme []string `yaml:"signatureScheme"`
EmptyGenesis bool `yaml:"emptyGenesis"`
GravityChainDB db.Config `yaml:"gravityChainDB"`
Committee committee.Config `yaml:"committee"`
ChainDBPath string `yaml:"chainDBPath"`
TrieDBPatchFile string `yaml:"trieDBPatchFile"`
TrieDBPath string `yaml:"trieDBPath"`
StakingPatchDir string `yaml:"stakingPatchDir"`
IndexDBPath string `yaml:"indexDBPath"`
BloomfilterIndexDBPath string `yaml:"bloomfilterIndexDBPath"`
CandidateIndexDBPath string `yaml:"candidateIndexDBPath"`
StakingIndexDBPath string `yaml:"stakingIndexDBPath"`
SGDIndexDBPath string `yaml:"sgdIndexDBPath"`
ContractStakingIndexDBPath string `yaml:"contractStakingIndexDBPath"`
ID uint32 `yaml:"id"`
EVMNetworkID uint32 `yaml:"evmNetworkID"`
Address string `yaml:"address"`
ProducerPrivKey string `yaml:"producerPrivKey"`
ProducerPrivKeySchema string `yaml:"producerPrivKeySchema"`
SignatureScheme []string `yaml:"signatureScheme"`
EmptyGenesis bool `yaml:"emptyGenesis"`
GravityChainDB db.Config `yaml:"gravityChainDB"`
Committee committee.Config `yaml:"committee"`

EnableTrielessStateDB bool `yaml:"enableTrielessStateDB"`
// EnableStateDBCaching enables cachedStateDBOption
Expand Down Expand Up @@ -78,24 +77,23 @@ type (
var (
// DefaultConfig is the default config of chain
DefaultConfig = Config{
ChainDBPath: "/var/data/chain.db",
TrieDBPatchFile: "/var/data/trie.db.patch",
TrieDBPath: "/var/data/trie.db",
StakingPatchDir: "/var/data",
IndexDBPath: "/var/data/index.db",
BloomfilterIndexDBPath: "/var/data/bloomfilter.index.db",
CandidateIndexDBPath: "/var/data/candidate.index.db",
StakingIndexDBPath: "/var/data/staking.index.db",
SGDIndexDBPath: "/var/data/sgd.index.db",
ContractStakingIndexDBPath: "/var/data/contractstaking.index.db",
ContractStakingIndexV2DBPath: "/var/data/contractstaking.index.v2.db",
ID: 1,
EVMNetworkID: 4689,
Address: "",
ProducerPrivKey: generateRandomKey(SigP256k1),
SignatureScheme: []string{SigP256k1},
EmptyGenesis: false,
GravityChainDB: db.Config{DbPath: "/var/data/poll.db", NumRetries: 10},
ChainDBPath: "/var/data/chain.db",
TrieDBPatchFile: "/var/data/trie.db.patch",
TrieDBPath: "/var/data/trie.db",
StakingPatchDir: "/var/data",
IndexDBPath: "/var/data/index.db",
BloomfilterIndexDBPath: "/var/data/bloomfilter.index.db",
CandidateIndexDBPath: "/var/data/candidate.index.db",
StakingIndexDBPath: "/var/data/staking.index.db",
SGDIndexDBPath: "/var/data/sgd.index.db",
ContractStakingIndexDBPath: "/var/data/contractstaking.index.db",
ID: 1,
EVMNetworkID: 4689,
Address: "",
ProducerPrivKey: generateRandomKey(SigP256k1),
SignatureScheme: []string{SigP256k1},
EmptyGenesis: false,
GravityChainDB: db.Config{DbPath: "/var/data/poll.db", NumRetries: 10},
Committee: committee.Config{
GravityChainAPIs: []string{},
},
Expand Down
71 changes: 29 additions & 42 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,52 +324,42 @@ func (builder *Builder) buildContractStakingIndexer(forTest bool) error {
if !builder.cfg.Chain.EnableStakingProtocol {
return nil
}
if builder.cs.contractStakingIndexer != nil {
return nil
}
if forTest || builder.cfg.Genesis.SystemStakingContractAddress == "" {
if forTest {
builder.cs.contractStakingIndexer = nil
builder.cs.contractStakingIndexerV2 = nil
return nil
}
dbConfig := builder.cfg.DB
dbConfig.DbPath = builder.cfg.Chain.ContractStakingIndexDBPath
voteCalcConsts := builder.cfg.Genesis.VoteWeightCalConsts
indexer, err := contractstaking.NewContractStakingIndexer(
db.NewBoltDB(dbConfig),
contractstaking.Config{
ContractAddress: builder.cfg.Genesis.SystemStakingContractAddress,
ContractDeployHeight: builder.cfg.Genesis.SystemStakingContractHeight,
CalculateVoteWeight: func(v *staking.VoteBucket) *big.Int {
return staking.CalculateVoteWeight(voteCalcConsts, v, false)
},
BlockInterval: builder.cfg.DardanellesUpgrade.BlockInterval,
})
if err != nil {
return err
kvstore := db.NewBoltDB(dbConfig)
// build contract staking indexer
if builder.cs.contractStakingIndexer == nil && len(builder.cfg.Genesis.SystemStakingContractAddress) > 0 {
voteCalcConsts := builder.cfg.Genesis.VoteWeightCalConsts
indexer, err := contractstaking.NewContractStakingIndexer(
kvstore,
contractstaking.Config{
ContractAddress: builder.cfg.Genesis.SystemStakingContractAddress,
ContractDeployHeight: builder.cfg.Genesis.SystemStakingContractHeight,
CalculateVoteWeight: func(v *staking.VoteBucket) *big.Int {
return staking.CalculateVoteWeight(voteCalcConsts, v, false)
},
BlockInterval: builder.cfg.DardanellesUpgrade.BlockInterval,
})
if err != nil {
return err
}
builder.cs.contractStakingIndexer = indexer
}
// build contract staking indexer v2
if builder.cs.contractStakingIndexerV2 == nil && len(builder.cfg.Genesis.SystemStakingContractV2Address) > 0 {
indexer := stakingindex.NewIndexer(
kvstore,
builder.cfg.Genesis.SystemStakingContractV2Address,
builder.cfg.Genesis.SystemStakingContractV2Height, builder.cfg.DardanellesUpgrade.BlockInterval,
)
builder.cs.contractStakingIndexerV2 = indexer
}
builder.cs.contractStakingIndexer = indexer
return nil
}

func (builder *Builder) buildContractStakingIndexerV2(forTest bool) error {
if !builder.cfg.Chain.EnableStakingProtocol {
return nil
}
if builder.cs.contractStakingIndexerV2 != nil {
return nil
}
if forTest || builder.cfg.Genesis.SystemStakingContractV2Address == "" {
builder.cs.contractStakingIndexerV2 = nil
return nil
}
dbConfig := builder.cfg.DB
dbConfig.DbPath = builder.cfg.Chain.ContractStakingIndexV2DBPath
indexerV2 := stakingindex.NewIndexer(
db.NewBoltDB(dbConfig),
builder.cfg.Genesis.SystemStakingContractV2Address,
builder.cfg.Genesis.SystemStakingContractV2Height, builder.cfg.DardanellesUpgrade.BlockInterval,
)
builder.cs.contractStakingIndexerV2 = indexerV2
return nil
}

Expand Down Expand Up @@ -765,9 +755,6 @@ func (builder *Builder) build(forSubChain, forTest bool) (*ChainService, error)
if err := builder.buildContractStakingIndexer(forTest); err != nil {
return nil, err
}
if err := builder.buildContractStakingIndexerV2(forTest); err != nil {
return nil, err
}
if err := builder.buildBlockDAO(forTest); err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions db/db_bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package db
import (
"bytes"
"context"
"sync"
"syscall"

"github.com/pkg/errors"
Expand Down Expand Up @@ -43,6 +44,7 @@ type BoltDB struct {
db *bolt.DB
path string
config Config
mutex sync.Mutex
}

// NewBoltDB instantiates an BoltDB with implements KVStore
Expand All @@ -56,6 +58,12 @@ func NewBoltDB(cfg Config) *BoltDB {

// Start opens the BoltDB (creates new file if not existing yet)
func (b *BoltDB) Start(_ context.Context) error {
b.mutex.Lock()
defer b.mutex.Unlock()

if b.IsReady() {
return nil
}
opts := *bolt.DefaultOptions
if b.config.ReadOnly {
opts.ReadOnly = true
Expand All @@ -70,6 +78,12 @@ func (b *BoltDB) Start(_ context.Context) error {

// Stop closes the BoltDB
func (b *BoltDB) Stop(_ context.Context) error {
b.mutex.Lock()
defer b.mutex.Unlock()

if !b.IsReady() {
return nil
}
if err := b.TurnOff(); err != nil {
return err
}
Expand Down
4 changes: 0 additions & 4 deletions e2etest/e2etest.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ func initDBPaths(r *require.Assertions, cfg *config.Config) {
r.NoError(err)
testContractIndexPath, err := testutil.PathOfTempFile("contractindex")
r.NoError(err)
testContractIndexPathV2, err := testutil.PathOfTempFile("contractindexv2")
r.NoError(err)
testSGDIndexPath, err := testutil.PathOfTempFile("sgdindex")
r.NoError(err)
testBloomfilterIndexPath, err := testutil.PathOfTempFile("bloomfilterindex")
Expand All @@ -185,7 +183,6 @@ func initDBPaths(r *require.Assertions, cfg *config.Config) {
cfg.Chain.ChainDBPath = testDBPath
cfg.Chain.IndexDBPath = testIndexPath
cfg.Chain.ContractStakingIndexDBPath = testContractIndexPath
cfg.Chain.ContractStakingIndexV2DBPath = testContractIndexPathV2
cfg.Chain.SGDIndexDBPath = testSGDIndexPath
cfg.Chain.BloomfilterIndexDBPath = testBloomfilterIndexPath
cfg.Chain.CandidateIndexDBPath = testCandidateIndexPath
Expand All @@ -200,7 +197,6 @@ func clearDBPaths(cfg *config.Config) {
testutil.CleanupPath(cfg.Chain.CandidateIndexDBPath)
testutil.CleanupPath(cfg.Chain.StakingIndexDBPath)
testutil.CleanupPath(cfg.Chain.ContractStakingIndexDBPath)
testutil.CleanupPath(cfg.Chain.ContractStakingIndexV2DBPath)
testutil.CleanupPath(cfg.DB.DbPath)
testutil.CleanupPath(cfg.Chain.IndexDBPath)
testutil.CleanupPath(cfg.Chain.SGDIndexDBPath)
Expand Down
6 changes: 0 additions & 6 deletions e2etest/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,6 @@ func TestLocalSync(t *testing.T) {
require.NoError(err)
contractIndexDBPath2, err := testutil.PathOfTempFile(_dBPath2)
require.NoError(err)
contractIndexDBV2Path2, err := testutil.PathOfTempFile(_dBPath2 + "v2")
require.NoError(err)
indexSGDDBPath2, err := testutil.PathOfTempFile(_dBPath2 + "_sgd")
require.NoError(err)
cfg, err = newTestConfig()
Expand All @@ -377,7 +375,6 @@ func TestLocalSync(t *testing.T) {
cfg.Chain.ChainDBPath = testDBPath2
cfg.Chain.IndexDBPath = indexDBPath2
cfg.Chain.ContractStakingIndexDBPath = contractIndexDBPath2
cfg.Chain.ContractStakingIndexV2DBPath = contractIndexDBV2Path2
cfg.Chain.SGDIndexDBPath = indexSGDDBPath2
defer func() {
testutil.CleanupPath(testTriePath2)
Expand Down Expand Up @@ -435,8 +432,6 @@ func TestStartExistingBlockchain(t *testing.T) {
require.NoError(err)
testContractStakeIndexPath, err := testutil.PathOfTempFile(_dBPath)
require.NoError(err)
testContractStakeIndexPathV2, err := testutil.PathOfTempFile(_dBPath + "v2")
require.NoError(err)
testSGDIndexPath, err := testutil.PathOfTempFile(_dBPath + "_sgd")
require.NoError(err)
// Disable block reward to make bookkeeping easier
Expand All @@ -446,7 +441,6 @@ func TestStartExistingBlockchain(t *testing.T) {
cfg.Chain.ChainDBPath = testDBPath
cfg.Chain.IndexDBPath = testIndexPath
cfg.Chain.ContractStakingIndexDBPath = testContractStakeIndexPath
cfg.Chain.ContractStakingIndexV2DBPath = testContractStakeIndexPathV2
cfg.Chain.SGDIndexDBPath = testSGDIndexPath
cfg.Chain.EnableAsyncIndexWrite = false
cfg.ActPool.MinGasPriceStr = "0"
Expand Down
1 change: 0 additions & 1 deletion e2etest/local_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,6 @@ func newTransferConfig(
cfg.System.SystemLogDBPath = systemLogDBPath
cfg.Chain.CandidateIndexDBPath = candidateIndexDBPath
cfg.Chain.ContractStakingIndexDBPath = contractstakeIndexDBPath
cfg.Chain.ContractStakingIndexV2DBPath = contractstakeIndexDBPathV2
cfg.Chain.EnableAsyncIndexWrite = true
cfg.ActPool.MinGasPriceStr = "0"
cfg.Consensus.Scheme = config.StandaloneScheme
Expand Down
1 change: 0 additions & 1 deletion e2etest/nodeinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func newConfigForNodeInfoTest(triePath, dBPath, idxDBPath, contractIdxDBPath str
cfg.Chain.ChainDBPath = testDBPath
cfg.Chain.IndexDBPath = indexDBPath
cfg.Chain.ContractStakingIndexDBPath = contractIndexDBPath
cfg.Chain.ContractStakingIndexV2DBPath = contractIndexV2DBPath
cfg.Chain.SGDIndexDBPath = sgdIndexDBPath
return cfg, func() {
testutil.CleanupPath(testTriePath)
Expand Down
1 change: 0 additions & 1 deletion server/itx/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func newConfig(t *testing.T) (config.Config, func()) {
cfg.Chain.SGDIndexDBPath = sgdPath
cfg.Chain.TrieDBPatchFile = ""
cfg.Chain.ContractStakingIndexDBPath = contractIndexPath
cfg.Chain.ContractStakingIndexV2DBPath = contractIndexPathV2
return cfg, func() {
testutil.CleanupPath(dbPath)
testutil.CleanupPath(triePath)
Expand Down
11 changes: 7 additions & 4 deletions systemcontractindex/stakingindex/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@ type cache struct {
buckets map[uint64]*Bucket
bucketsByCandidate map[string]map[uint64]struct{}
totalBucketCount uint64
ns, bucketNS string
}

func newCache() *cache {
func newCache(ns, bucketNS string) *cache {
return &cache{
buckets: make(map[uint64]*Bucket),
bucketsByCandidate: make(map[string]map[uint64]struct{}),
ns: ns,
bucketNS: bucketNS,
}
}

func (s *cache) Load(kvstore db.KVStore) error {
// load total bucket count
var totalBucketCount uint64
tbc, err := kvstore.Get(stakingNS, stakingTotalBucketCountKey)
tbc, err := kvstore.Get(s.ns, stakingTotalBucketCountKey)
if err != nil {
if !errors.Is(err, db.ErrNotExist) {
return err
Expand All @@ -39,7 +42,7 @@ func (s *cache) Load(kvstore db.KVStore) error {
s.totalBucketCount = totalBucketCount

// load buckets
ks, vs, err := kvstore.Filter(stakingBucketNS, func(k, v []byte) bool { return true }, nil, nil)
ks, vs, err := kvstore.Filter(s.bucketNS, func(k, v []byte) bool { return true }, nil, nil)
if err != nil && !errors.Is(err, db.ErrBucketNotExist) {
return err
}
Expand All @@ -54,7 +57,7 @@ func (s *cache) Load(kvstore db.KVStore) error {
}

func (s *cache) Copy() *cache {
c := newCache()
c := newCache(s.ns, s.bucketNS)
for k, v := range s.buckets {
c.buckets[k] = v.Clone()
}
Expand Down
26 changes: 14 additions & 12 deletions systemcontractindex/stakingindex/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ const (
maxBlockNumber uint64 = math.MaxUint64
)

type eventHandler struct {
dirty *cache // dirty cache, a view for current block
delta batch.KVStoreBatch // delta for db to store buckets of current block
tokenOwner map[uint64]address.Address
}

var (
stakingContractABI = staking.StakingContractABI

// ErrBucketNotExist is the error when bucket does not exist
ErrBucketNotExist = errors.New("bucket does not exist")
)

func newEventHandler(dirty *cache) *eventHandler {
type eventHandler struct {
stakingBucketNS string
dirty *cache // dirty cache, a view for current block
delta batch.KVStoreBatch // delta for db to store buckets of current block
tokenOwner map[uint64]address.Address
}

func newEventHandler(bucketNS string, dirty *cache) *eventHandler {
return &eventHandler{
dirty: dirty,
delta: batch.NewBatch(),
tokenOwner: make(map[uint64]address.Address),
stakingBucketNS: bucketNS,
dirty: dirty,
delta: batch.NewBatch(),
tokenOwner: make(map[uint64]address.Address),
}
}

Expand Down Expand Up @@ -303,10 +305,10 @@ func (eh *eventHandler) Finalize() (batch.KVStoreBatch, *cache) {

func (eh *eventHandler) putBucket(id uint64, bkt *Bucket) {
eh.dirty.PutBucket(id, bkt)
eh.delta.Put(stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), bkt.Serialize(), "failed to put bucket")
eh.delta.Put(eh.stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), bkt.Serialize(), "failed to put bucket")
}

func (eh *eventHandler) delBucket(id uint64) {
eh.dirty.DeleteBucket(id)
eh.delta.Delete(stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), "failed to delete bucket")
eh.delta.Delete(eh.stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), "failed to delete bucket")
}
Loading

0 comments on commit afa504c

Please sign in to comment.