From 4ebe380297a069dd12fb79529d78769082c33616 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Sun, 8 Oct 2023 20:07:44 +0200 Subject: [PATCH 1/9] Removed address binder cronjob - integrated registering address into mirror cronjob --- indexer/cronjob/address_binder.go | 248 ------------------------ indexer/cronjob/address_binder_stubs.go | 156 --------------- indexer/cronjob/migrations.go | 11 -- indexer/cronjob/mirror.go | 112 +++++++++-- indexer/cronjob/mirror_stubs.go | 59 +++++- indexer/cronjob/mirror_test.go | 8 +- indexer/cronjob/voting.go | 8 +- indexer/runner/runner.go | 5 - utils/chain/tx_utils.go | 75 +++++++ 9 files changed, 234 insertions(+), 448 deletions(-) delete mode 100644 indexer/cronjob/address_binder.go delete mode 100644 indexer/cronjob/address_binder_stubs.go create mode 100644 utils/chain/tx_utils.go diff --git a/indexer/cronjob/address_binder.go b/indexer/cronjob/address_binder.go deleted file mode 100644 index e192491..0000000 --- a/indexer/cronjob/address_binder.go +++ /dev/null @@ -1,248 +0,0 @@ -package cronjob - -import ( - "flare-indexer/database" - indexerctx "flare-indexer/indexer/context" - "flare-indexer/logger" - "flare-indexer/utils" - "flare-indexer/utils/chain" - "flare-indexer/utils/staking" - "time" - - "github.com/ava-labs/avalanchego/utils/crypto" - mapset "github.com/deckarep/golang-set/v2" - "github.com/pkg/errors" -) - -const addressBinderStateName = "address_binder_cronjob" - -type addressBinderCronJob struct { - epochCronjob - db addressBinderDB - contracts addressBinderContracts - time utils.ShiftedTime -} - -type addressBinderDB interface { - FetchState(name string) (database.State, error) - UpdateJobState(epoch int64, force bool) error - GetPChainTxsForEpoch(start, end time.Time) ([]database.PChainTxData, error) - GetPChainTx(txID string, address string) (*database.PChainTxData, error) -} - -type addressBinderContracts interface { - GetMerkleRoot(epoch int64) ([32]byte, error) - IsAddressRegistered(address string) (bool, error) - RegisterPublicKey(publicKey crypto.PublicKey) error - EpochConfig() (time.Time, time.Duration, error) -} - -func NewAddressBinderCronjob(ctx indexerctx.IndexerContext) (Cronjob, error) { - cfg := ctx.Config() - - if !cfg.Mirror.Enabled { - return &addressBinderCronJob{}, nil - } - - contracts, err := initAddressBinderJobContracts(cfg) - if err != nil { - return nil, err - } - - start, period, err := contracts.EpochConfig() - if err != nil { - return nil, err - } - - epochs := staking.NewEpochInfo(&cfg.Mirror.EpochConfig, start, period) - - mc := &addressBinderCronJob{ - epochCronjob: newEpochCronjob(&cfg.Mirror.CronjobConfig, epochs), - db: NewAddressBinderDBGorm(ctx.DB()), - contracts: contracts, - } - - err = mc.reset(ctx.Flags().ResetMirrorCronjob) - - return mc, err -} - -func (c *addressBinderCronJob) Name() string { - return "address_binder" -} - -func (c *addressBinderCronJob) OnStart() error { - return nil -} - -func (c *addressBinderCronJob) Call() error { - epochRange, err := c.getEpochRange() - if err != nil { - if errors.Is(err, errNoEpochsToRegisterAddresses) { - logger.Debug("no epochs to register addresses") - return nil - } - - return err - } - - logger.Debug("registering addresses for epochs %d-%d", epochRange.start, epochRange.end) - registeredAddresses := mapset.NewSet[string]() - for epoch := epochRange.start; epoch <= epochRange.end; epoch++ { - logger.Debug("registering addresses for epoch %d", epoch) - if err := c.registerEpoch(registeredAddresses, epoch); err != nil { - return err - } - } - - logger.Debug("successfully registered addresses for epochs %d-%d", epochRange.start, epochRange.end) - - if err := c.db.UpdateJobState(epochRange.end+1, false); err != nil { - return err - } - - return nil -} - -var errNoEpochsToRegisterAddresses = errors.New("no epochs to register addresses") - -func (c *addressBinderCronJob) getEpochRange() (*epochRange, error) { - startEpoch, err := c.getStartEpoch() - if err != nil { - return nil, err - } - - logger.Debug("start epoch: %d", startEpoch) - - endEpoch, err := c.getEndEpoch(startEpoch) - if err != nil { - return nil, err - } - logger.Debug("Registering addresses needed for epochs [%d, %d]", startEpoch, endEpoch) - return c.getTrimmedEpochRange(startEpoch, endEpoch), nil -} - -func (c *addressBinderCronJob) getStartEpoch() (int64, error) { - jobState, err := c.db.FetchState(addressBinderStateName) - if err != nil { - return 0, err - } - - return int64(jobState.NextDBIndex), nil -} - -func (c *addressBinderCronJob) getEndEpoch(startEpoch int64) (int64, error) { - currEpoch := c.epochs.GetEpochIndex(c.time.Now()) - logger.Debug("current epoch: %d", currEpoch) - - for epoch := currEpoch; epoch > startEpoch; epoch-- { - confirmed, err := c.isEpochConfirmed(epoch) - if err != nil { - return 0, err - } - - if confirmed { - return epoch, nil - } - } - - return 0, errNoEpochsToRegisterAddresses -} - -func (c *addressBinderCronJob) isEpochConfirmed(epoch int64) (bool, error) { - merkleRoot, err := c.contracts.GetMerkleRoot(epoch) - if err != nil { - return false, errors.Wrap(err, "votingContract.GetMerkleRoot") - } - - return merkleRoot != [32]byte{}, nil -} - -func (c *addressBinderCronJob) registerEpoch(registeredAddresses mapset.Set[string], epoch int64) error { - txs, err := c.getEpochTxs(epoch) - if err != nil { - return err - } - - if len(txs) == 0 { - logger.Debug("no unregistered txs found") - return nil - } - - logger.Info("registering %d txs", len(txs)) - if err := c.registerTxs(registeredAddresses, txs, epoch); err != nil { - return err - } - - return nil -} - -func (c *addressBinderCronJob) getEpochTxs(epoch int64) ([]database.PChainTxData, error) { - startTimestamp, endTimestamp := c.epochs.GetTimeRange(epoch) - - txs, err := c.db.GetPChainTxsForEpoch(startTimestamp, endTimestamp) - if err != nil { - return nil, err - } - - return staking.DedupeTxs(txs), nil -} - -func (c *addressBinderCronJob) registerTxs(registeredAddresses mapset.Set[string], txs []database.PChainTxData, epochID int64) error { - for _, tx := range txs { - if registeredAddresses.Contains(tx.InputAddress) { - continue - } - if err := c.registerAddress(*tx.TxID, tx.InputAddress); err != nil { - // Non-fatal error, continue registering other addresses - logger.Error("error registering address: %s", err.Error()) - } - registeredAddresses.Add(tx.InputAddress) - } - return nil -} - -// Register address on AddressBinder contract if it is not already registered -func (c *addressBinderCronJob) registerAddress(txID string, address string) error { - registered, err := c.contracts.IsAddressRegistered(address) - if err != nil || registered { - return err - } - tx, err := c.db.GetPChainTx(txID, address) - if err != nil { - return err - } - if tx == nil { - return errors.New("tx not found") - } - publicKeys, err := chain.PublicKeysFromPChainBlock(tx.Bytes) - if err != nil { - return err - } - if tx.InputIndex >= uint32(len(publicKeys)) { - return errors.New("input index out of range") - } - publicKey := publicKeys[tx.InputIndex] - for _, k := range publicKey { - err := c.contracts.RegisterPublicKey(k) - if err != nil { - return errors.Wrap(err, "mirroringContract.RegisterPublicKey") - } - } - logger.Info("registered address %s on address binder contract", address) - return nil -} - -func (c *addressBinderCronJob) reset(firstEpoch int64) error { - if firstEpoch <= 0 { - return nil - } - - logger.Info("Resetting address binder cronjob state to epoch %d", firstEpoch) - err := c.db.UpdateJobState(firstEpoch, true) - if err != nil { - return err - } - c.epochs.First = firstEpoch - return nil -} diff --git a/indexer/cronjob/address_binder_stubs.go b/indexer/cronjob/address_binder_stubs.go deleted file mode 100644 index e7b4253..0000000 --- a/indexer/cronjob/address_binder_stubs.go +++ /dev/null @@ -1,156 +0,0 @@ -// Stubs for the address binder cronjob. These handle the direct interactions with DB -// and contracts. The actual logic is in address_binder.go, which is unit-tested. -package cronjob - -import ( - "flare-indexer/database" - "flare-indexer/indexer/config" - "flare-indexer/logger" - "flare-indexer/utils/chain" - "flare-indexer/utils/contracts/addresses" - "flare-indexer/utils/contracts/mirroring" - "flare-indexer/utils/contracts/voting" - "flare-indexer/utils/staking" - "math/big" - "time" - - "github.com/ava-labs/avalanchego/utils/crypto" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/pkg/errors" - "gorm.io/gorm" -) - -type addressBinderDBGorm struct { - db *gorm.DB -} - -func NewAddressBinderDBGorm(db *gorm.DB) addressBinderDB { - return addressBinderDBGorm{db: db} -} - -func (m addressBinderDBGorm) FetchState(name string) (database.State, error) { - return database.FetchState(m.db, name) -} - -func (m addressBinderDBGorm) UpdateJobState(epoch int64, force bool) error { - return m.db.Transaction(func(tx *gorm.DB) error { - jobState, err := database.FetchState(tx, addressBinderStateName) - if err != nil { - return errors.Wrap(err, "database.FetchState") - } - - if !force && jobState.NextDBIndex >= uint64(epoch) { - logger.Debug("job state already up to date") - return nil - } - - jobState.NextDBIndex = uint64(epoch) - return database.UpdateState(tx, &jobState) - }) -} - -func (m addressBinderDBGorm) GetPChainTxsForEpoch(start, end time.Time) ([]database.PChainTxData, error) { - return database.GetPChainTxsForEpoch(&database.GetPChainTxsForEpochInput{ - DB: m.db, - StartTimestamp: start, - EndTimestamp: end, - }) -} - -func (m addressBinderDBGorm) GetPChainTx(txID string, address string) (*database.PChainTxData, error) { - return database.FetchPChainTxData(m.db, txID, address) -} - -type addressBinderContractsCChain struct { - addressBinder *addresses.Binder - txOpts *bind.TransactOpts - voting *voting.Voting -} - -func initAddressBinderJobContracts(cfg *config.Config) (addressBinderContracts, error) { - if cfg.ContractAddresses.Mirroring == (common.Address{}) { - return nil, errors.New("mirroring contract address not set") - } - - if cfg.ContractAddresses.Voting == (common.Address{}) { - return nil, errors.New("voting contract address not set") - } - - eth, err := ethclient.Dial(cfg.Chain.EthRPCURL) - if err != nil { - return nil, err - } - - mirroringContract, err := mirroring.NewMirroring(cfg.ContractAddresses.Mirroring, eth) - if err != nil { - return nil, err - } - - votingContract, err := voting.NewVoting(cfg.ContractAddresses.Voting, eth) - if err != nil { - return nil, err - } - - addressBinderContract, err := newAddressBinderContract(eth, mirroringContract) - if err != nil { - return nil, err - } - - privateKey, err := cfg.Chain.GetPrivateKey() - if err != nil { - return nil, err - } - - txOpts, err := TransactOptsFromPrivateKey(privateKey, cfg.Chain.ChainID) - if err != nil { - return nil, err - } - - return &addressBinderContractsCChain{ - addressBinder: addressBinderContract, - txOpts: txOpts, - voting: votingContract, - }, nil -} - -func newAddressBinderContract( - eth *ethclient.Client, mirroringContract *mirroring.Mirroring, -) (*addresses.Binder, error) { - addressBinderAddress, err := mirroringContract.AddressBinder(new(bind.CallOpts)) - if err != nil { - return nil, err - } - - return addresses.NewBinder(addressBinderAddress, eth) -} - -func (m addressBinderContractsCChain) GetMerkleRoot(epoch int64) ([32]byte, error) { - return m.voting.GetMerkleRoot(new(bind.CallOpts), big.NewInt(epoch)) -} - -func (m addressBinderContractsCChain) IsAddressRegistered(address string) (bool, error) { - addressBytes, err := chain.ParseAddress(address) - if err != nil { - return false, err - } - boundAddress, err := m.addressBinder.PAddressToCAddress(new(bind.CallOpts), addressBytes) - if err != nil { - return false, err - } - return boundAddress != (common.Address{}), nil -} - -func (m addressBinderContractsCChain) RegisterPublicKey(publicKey crypto.PublicKey) error { - ethAddress, err := chain.PublicKeyToEthAddress(publicKey) - if err != nil { - return err - } - _, err = m.addressBinder.RegisterAddresses(m.txOpts, publicKey.Bytes(), publicKey.Address(), ethAddress) - return err -} - -func (m addressBinderContractsCChain) EpochConfig() (start time.Time, period time.Duration, err error) { - return staking.GetEpochConfig(m.voting) -} diff --git a/indexer/cronjob/migrations.go b/indexer/cronjob/migrations.go index fe7e6d4..c86b3d8 100644 --- a/indexer/cronjob/migrations.go +++ b/indexer/cronjob/migrations.go @@ -11,8 +11,6 @@ import ( func init() { migrations.Container.Add("2023-08-25-00-00", "Create initial state for voting cronjob", createVotingCronjobState) migrations.Container.Add("2023-08-30-00-00", "Create initial state for mirror cronjob", createMirrorCronjobState) - migrations.Container.Add("2023-09-30-00-00", "Create initial state for address binder cronjob", createAddressBinderCronjobState) - } func createVotingCronjobState(db *gorm.DB) error { @@ -32,12 +30,3 @@ func createMirrorCronjobState(db *gorm.DB) error { Updated: time.Now(), }) } - -func createAddressBinderCronjobState(db *gorm.DB) error { - return database.CreateState(db, &database.State{ - Name: addressBinderStateName, - NextDBIndex: 0, - LastChainIndex: 0, - Updated: time.Now(), - }) -} diff --git a/indexer/cronjob/mirror.go b/indexer/cronjob/mirror.go index e14b165..f5fce58 100644 --- a/indexer/cronjob/mirror.go +++ b/indexer/cronjob/mirror.go @@ -5,6 +5,7 @@ import ( indexerctx "flare-indexer/indexer/context" "flare-indexer/logger" "flare-indexer/utils" + "flare-indexer/utils/chain" "flare-indexer/utils/contracts/mirroring" "flare-indexer/utils/merkle" "flare-indexer/utils/staking" @@ -12,6 +13,8 @@ import ( "strings" "time" + "github.com/ava-labs/avalanchego/utils/crypto" + mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" ) @@ -22,6 +25,8 @@ type mirrorCronJob struct { db mirrorDB contracts mirrorContracts time utils.ShiftedTime + + registeredAddresses mapset.Set[string] } type mirrorDB interface { @@ -37,6 +42,8 @@ type mirrorContracts interface { stakeData *mirroring.IPChainStakeMirrorVerifierPChainStake, merkleProof [][32]byte, ) error + IsAddressRegistered(address string) (bool, error) + RegisterPublicKey(publicKey crypto.PublicKey) error EpochConfig() (time.Time, time.Duration, error) } @@ -63,6 +70,8 @@ func NewMirrorCronjob(ctx indexerctx.IndexerContext) (Cronjob, error) { epochCronjob: newEpochCronjob(&cfg.Mirror.CronjobConfig, epochs), db: NewMirrorDBGorm(ctx.DB()), contracts: contracts, + + registeredAddresses: mapset.NewSet[string](), } err = mc.reset(ctx.Flags().ResetMirrorCronjob) @@ -110,31 +119,55 @@ func (c *mirrorCronJob) Call() error { var errNoEpochsToMirror = errors.New("no epochs to mirror") func (c *mirrorCronJob) getEpochRange() (*epochRange, error) { - now := c.time.Now() - binderJobState, err := c.db.FetchState(addressBinderStateName) + startEpoch, err := c.getStartEpoch() if err != nil { return nil, err } - jobState, err := c.db.FetchState(mirrorStateName) + + logger.Debug("start epoch: %d", startEpoch) + + endEpoch, err := c.getEndEpoch(startEpoch) if err != nil { return nil, err } - startEpoch := int64(jobState.NextDBIndex) - endEpoch := int64(binderJobState.NextDBIndex) - 1 - for startEpoch <= endEpoch { - endTime := c.epochs.GetEndTime(endEpoch) - if endTime.After(now.Add(-c.epochs.Period)) { - endEpoch-- - } else { - break + logger.Debug("Mirroring needed for epochs [%d, %d]", startEpoch, endEpoch) + return c.getTrimmedEpochRange(startEpoch, endEpoch), nil +} + +func (c *mirrorCronJob) getStartEpoch() (int64, error) { + jobState, err := c.db.FetchState(mirrorStateName) + if err != nil { + return 0, err + } + + return int64(jobState.NextDBIndex), nil +} + +func (c *mirrorCronJob) getEndEpoch(startEpoch int64) (int64, error) { + currEpoch := c.epochs.GetEpochIndex(c.time.Now()) + logger.Debug("current epoch: %d", currEpoch) + + for epoch := currEpoch; epoch > startEpoch; epoch-- { + confirmed, err := c.isEpochConfirmed(epoch) + if err != nil { + return 0, err + } + + if confirmed { + return epoch, nil } } - if startEpoch > endEpoch { - return nil, errNoEpochsToMirror + + return 0, errNoEpochsToMirror +} + +func (c *mirrorCronJob) isEpochConfirmed(epoch int64) (bool, error) { + merkleRoot, err := c.contracts.GetMerkleRoot(epoch) + if err != nil { + return false, errors.Wrap(err, "votingContract.GetMerkleRoot") } - logger.Debug("Mirroring needed for epochs [%d, %d]", startEpoch, endEpoch) - return c.getTrimmedEpochRange(startEpoch, endEpoch), nil + return merkleRoot != [32]byte{}, nil } func (c *mirrorCronJob) mirrorEpoch(epoch int64) error { @@ -217,6 +250,14 @@ type mirrorTxInput struct { } func (c *mirrorCronJob) mirrorTx(in *mirrorTxInput) error { + + // Try to register address and wait until it is mined on timeout occurs + err := c.registerAddress(*in.tx.TxID, in.tx.InputAddress) + if err != nil { + // Non-fatal error, continue mirroring + logger.Error("error registering address: %s", err.Error()) + } + stakeData, err := staking.ToStakeData(in.tx) if err != nil { return err @@ -257,7 +298,48 @@ func (c *mirrorCronJob) mirrorTx(in *mirrorTxInput) error { return errors.Wrap(err, "mirroringContract.MirrorStake") } + return nil +} + +// Register address on AddressBinder contract if it is not already registered +// Checks receipt of tx to see if an error occurred +func (c *mirrorCronJob) registerAddress(txID string, address string) error { + // Avoid contract calls if address is already registered + if c.registeredAddresses.Contains(address) { + return nil + } + registered, err := c.contracts.IsAddressRegistered(address) + if err != nil { + return err + } + if registered { + c.registeredAddresses.Add(address) + return nil + } + tx, err := c.db.GetPChainTx(txID, address) + if err != nil { + return err + } + if tx == nil { + return errors.New("tx not found") + } + publicKeys, err := chain.PublicKeysFromPChainBlock(tx.Bytes) + if err != nil { + return err + } + if tx.InputIndex >= uint32(len(publicKeys)) { + return errors.New("input index out of range") + } + publicKey := publicKeys[tx.InputIndex] + for _, k := range publicKey { + err := c.contracts.RegisterPublicKey(k) + if err != nil { + return errors.Wrap(err, "mirroringContract.RegisterPublicKey") + } + } + c.registeredAddresses.Add(address) + logger.Info("registered address %s on address binder contract", address) return nil } diff --git a/indexer/cronjob/mirror_stubs.go b/indexer/cronjob/mirror_stubs.go index e523a6a..84d9bbe 100644 --- a/indexer/cronjob/mirror_stubs.go +++ b/indexer/cronjob/mirror_stubs.go @@ -6,12 +6,15 @@ import ( "flare-indexer/database" "flare-indexer/indexer/config" "flare-indexer/logger" + "flare-indexer/utils/chain" + "flare-indexer/utils/contracts/addresses" "flare-indexer/utils/contracts/mirroring" "flare-indexer/utils/contracts/voting" "flare-indexer/utils/staking" "math/big" "time" + "github.com/ava-labs/avalanchego/utils/crypto" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -62,9 +65,11 @@ func (m mirrorDBGorm) GetPChainTx(txID string, address string) (*database.PChain } type mirrorContractsCChain struct { - mirroring *mirroring.Mirroring - txOpts *bind.TransactOpts - voting *voting.Voting + mirroring *mirroring.Mirroring + addressBinder *addresses.Binder + txOpts *bind.TransactOpts + voting *voting.Voting + txVerifier *chain.TxVerifier } func initMirrorJobContracts(cfg *config.Config) (mirrorContracts, error) { @@ -91,6 +96,11 @@ func initMirrorJobContracts(cfg *config.Config) (mirrorContracts, error) { return nil, err } + addressBinderContract, err := newAddressBinderContract(eth, mirroringContract) + if err != nil { + return nil, err + } + privateKey, err := cfg.Chain.GetPrivateKey() if err != nil { return nil, err @@ -102,12 +112,25 @@ func initMirrorJobContracts(cfg *config.Config) (mirrorContracts, error) { } return &mirrorContractsCChain{ - mirroring: mirroringContract, - txOpts: txOpts, - voting: votingContract, + mirroring: mirroringContract, + addressBinder: addressBinderContract, + txOpts: txOpts, + voting: votingContract, + txVerifier: chain.NewTxVerifier(eth), }, nil } +func newAddressBinderContract( + eth *ethclient.Client, mirroringContract *mirroring.Mirroring, +) (*addresses.Binder, error) { + addressBinderAddress, err := mirroringContract.AddressBinder(new(bind.CallOpts)) + if err != nil { + return nil, err + } + + return addresses.NewBinder(addressBinderAddress, eth) +} + func (m mirrorContractsCChain) GetMerkleRoot(epoch int64) ([32]byte, error) { return m.voting.GetMerkleRoot(new(bind.CallOpts), big.NewInt(epoch)) } @@ -120,6 +143,30 @@ func (m mirrorContractsCChain) MirrorStake( return err } +func (m mirrorContractsCChain) IsAddressRegistered(address string) (bool, error) { + addressBytes, err := chain.ParseAddress(address) + if err != nil { + return false, err + } + boundAddress, err := m.addressBinder.PAddressToCAddress(new(bind.CallOpts), addressBytes) + if err != nil { + return false, err + } + return boundAddress != (common.Address{}), nil +} + +func (m mirrorContractsCChain) RegisterPublicKey(publicKey crypto.PublicKey) error { + ethAddress, err := chain.PublicKeyToEthAddress(publicKey) + if err != nil { + return err + } + tx, err := m.addressBinder.RegisterAddresses(m.txOpts, publicKey.Bytes(), publicKey.Address(), ethAddress) + if err != nil { + return err + } + return m.txVerifier.WaitUntilMined(m.txOpts.From, tx, 10*time.Second) +} + func (m mirrorContractsCChain) EpochConfig() (start time.Time, period time.Duration, err error) { return staking.GetEpochConfig(m.voting) } diff --git a/indexer/cronjob/mirror_test.go b/indexer/cronjob/mirror_test.go index ca5260a..643f72e 100644 --- a/indexer/cronjob/mirror_test.go +++ b/indexer/cronjob/mirror_test.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/crypto" "github.com/bradleyjkemp/cupaloy" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "github.com/stretchr/testify/require" @@ -169,7 +170,7 @@ func TestMultipleTransactionsInSeparateEpochs(t *testing.T) { db := testMirror(t, txsMap, contracts) - require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(4)) + require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(3)) } func TestAlreadyMirrored(t *testing.T) { @@ -238,10 +239,6 @@ func testMirror( LastChainIndex: 2, }, mirrorStateName: {}, - addressBinderStateName: { - Updated: epochInfo.GetEndTime(999), - NextDBIndex: 4, - }, }, txs: txs, } @@ -253,6 +250,7 @@ func testMirror( enabled: true, epochs: epochInfo, }, + registeredAddresses: mapset.NewSet[string](), } err := j.Call() diff --git a/indexer/cronjob/voting.go b/indexer/cronjob/voting.go index 0d9600d..510dfee 100644 --- a/indexer/cronjob/voting.go +++ b/indexer/cronjob/voting.go @@ -113,7 +113,9 @@ func (c *votingCronjob) Call() error { epochRange := c.getEpochRange(int64(state.NextDBIndex), now) logger.Debug("Voting needed for epochs [%d, %d]", epochRange.start, epochRange.end) - c.metrics.lastEpoch.Set(float64(epochRange.end)) + if c.metrics != nil { + c.metrics.lastEpoch.Set(float64(epochRange.end)) + } votedInBatch := false for e := epochRange.start; e <= epochRange.end; e++ { @@ -141,7 +143,9 @@ func (c *votingCronjob) Call() error { if err := c.db.UpdateState(&state); err != nil { return err } - c.metrics.lastProcessedEpoch.Set(float64(e)) + if c.metrics != nil { + c.metrics.lastProcessedEpoch.Set(float64(e)) + } } logger.Debug("Voting not needed for epoch %d", e) } diff --git a/indexer/runner/runner.go b/indexer/runner/runner.go index e55a5c9..414cdbf 100644 --- a/indexer/runner/runner.go +++ b/indexer/runner/runner.go @@ -16,10 +16,6 @@ func Start(ctx context.IndexerContext) { if err != nil { log.Fatal(err) } - addressBinderCronjob, err := cronjob.NewAddressBinderCronjob(ctx) - if err != nil { - log.Fatal(err) - } mirrorCronjob, err := cronjob.NewMirrorCronjob(ctx) if err != nil { log.Fatal(err) @@ -35,7 +31,6 @@ func Start(ctx context.IndexerContext) { go cronjob.RunCronjob(uptimeCronjob) go cronjob.RunCronjob(votingCronjob) - go cronjob.RunCronjob(addressBinderCronjob) go cronjob.RunCronjob(mirrorCronjob) go cronjob.RunCronjob(uptimeVotingCronjob) } diff --git a/utils/chain/tx_utils.go b/utils/chain/tx_utils.go new file mode 100644 index 0000000..c8e3fb1 --- /dev/null +++ b/utils/chain/tx_utils.go @@ -0,0 +1,75 @@ +package chain + +import ( + "bytes" + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/pkg/errors" +) + +type TxVerifier struct { + eth *ethclient.Client +} + +func NewTxVerifier(eth *ethclient.Client) *TxVerifier { + return &TxVerifier{eth: eth} +} + +func (t TxVerifier) WaitUntilMined(from common.Address, tx *types.Transaction, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + receipt, err := bind.WaitMined(ctx, t.eth, tx) + if err != nil { + return errors.Wrap(err, "bind.WaitMined") + } + if receipt.Status != types.ReceiptStatusSuccessful { + reason, err := errorReason(ctx, t.eth, from, tx, receipt.BlockNumber) + if err != nil { + return err + } + return errors.Errorf("tx failed: %s", reason) + } + return nil +} + +// Taken from: https://ethereum.stackexchange.com/questions/48383/how-to-retrieve-revert-reason-for-past-transactions +func errorReason(ctx context.Context, b ethereum.ContractCaller, from common.Address, tx *types.Transaction, blockNum *big.Int) (string, error) { + msg := ethereum.CallMsg{ + From: from, + To: tx.To(), + Gas: tx.Gas(), + GasPrice: tx.GasPrice(), + Value: tx.Value(), + Data: tx.Data(), + } + res, err := b.CallContract(ctx, msg, blockNum) + if err != nil { + return "", errors.Wrap(err, "CallContract") + } + return unpackError(res) +} + +var ( + errorSig = []byte{0x08, 0xc3, 0x79, 0xa0} // Keccak256("Error(string)")[:4] + abiString, _ = abi.NewType("string", "", nil) +) + +func unpackError(result []byte) (string, error) { + if !bytes.Equal(result[:4], errorSig) { + return "", errors.New("tx result not of type Error(string)") + } + vs, err := abi.Arguments{{Type: abiString}}.UnpackValues(result[4:]) + if err != nil { + return "", errors.Wrap(err, "unpacking revert reason") + } + return vs[0].(string), nil +} From a26eb345d8aaf19c9807e5fea990229a0f34f070 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Mon, 9 Oct 2023 18:41:38 +0200 Subject: [PATCH 2/9] Debug RegisterPublicKey --- indexer/cronjob/mirror_stubs.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/indexer/cronjob/mirror_stubs.go b/indexer/cronjob/mirror_stubs.go index 84d9bbe..3815388 100644 --- a/indexer/cronjob/mirror_stubs.go +++ b/indexer/cronjob/mirror_stubs.go @@ -160,11 +160,25 @@ func (m mirrorContractsCChain) RegisterPublicKey(publicKey crypto.PublicKey) err if err != nil { return err } + logger.Debug("Registering address %s", ethAddress) tx, err := m.addressBinder.RegisterAddresses(m.txOpts, publicKey.Bytes(), publicKey.Address(), ethAddress) if err != nil { return err } - return m.txVerifier.WaitUntilMined(m.txOpts.From, tx, 10*time.Second) + err = m.txVerifier.WaitUntilMined(m.txOpts.From, tx, 20*time.Second) + if err != nil { + return err + } + logger.Debug("Registering address %s mined (tx %s)", ethAddress, tx.Hash().Hex()) + // Check + a := publicKey.Address() + ab, _ := chain.FormatAddressBytes(a[:]) + check, err := m.IsAddressRegistered(ab) + if err != nil { + return err + } + logger.Debug("Address is registered: %v", check) + return nil } func (m mirrorContractsCChain) EpochConfig() (start time.Time, period time.Duration, err error) { From 6a589ed2fc39baa52e237458f22dc66ce1b206e9 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Mon, 9 Oct 2023 21:35:00 +0200 Subject: [PATCH 3/9] Removed code to debug address registration --- indexer/cronjob/mirror_stubs.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/indexer/cronjob/mirror_stubs.go b/indexer/cronjob/mirror_stubs.go index 3815388..3aa4edd 100644 --- a/indexer/cronjob/mirror_stubs.go +++ b/indexer/cronjob/mirror_stubs.go @@ -160,24 +160,15 @@ func (m mirrorContractsCChain) RegisterPublicKey(publicKey crypto.PublicKey) err if err != nil { return err } - logger.Debug("Registering address %s", ethAddress) tx, err := m.addressBinder.RegisterAddresses(m.txOpts, publicKey.Bytes(), publicKey.Address(), ethAddress) if err != nil { return err } - err = m.txVerifier.WaitUntilMined(m.txOpts.From, tx, 20*time.Second) + err = m.txVerifier.WaitUntilMined(m.txOpts.From, tx, 60*time.Second) if err != nil { return err } - logger.Debug("Registering address %s mined (tx %s)", ethAddress, tx.Hash().Hex()) - // Check - a := publicKey.Address() - ab, _ := chain.FormatAddressBytes(a[:]) - check, err := m.IsAddressRegistered(ab) - if err != nil { - return err - } - logger.Debug("Address is registered: %v", check) + logger.Debug("Mined tx %s to register adddress %s", tx.Hash().Hex(), ethAddress) return nil } From c2f65a2c11f661f9ad9feaf42ebd483f2f6b9009 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Mon, 9 Oct 2023 23:19:54 +0200 Subject: [PATCH 4/9] Wait for voting transaction to be mined before advancing processed epoch --- indexer/cronjob/uptime_voting.go | 8 +++++- indexer/cronjob/voting.go | 40 ++++++++++++++--------------- indexer/cronjob/voting_stubs.go | 44 ++++++++++++++++++++------------ 3 files changed, 53 insertions(+), 39 deletions(-) diff --git a/indexer/cronjob/uptime_voting.go b/indexer/cronjob/uptime_voting.go index 9f8a7a7..72d8543 100644 --- a/indexer/cronjob/uptime_voting.go +++ b/indexer/cronjob/uptime_voting.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/ids" mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" "gorm.io/gorm" ) @@ -54,7 +55,12 @@ func NewUptimeVotingCronjob(ctx context.IndexerContext) (*uptimeVotingCronjob, e return &uptimeVotingCronjob{}, nil } - votingContract, err := newVotingContract(cfg) + eth, err := ethclient.Dial(cfg.Chain.EthRPCURL) + if err != nil { + return nil, err + } + + votingContract, err := voting.NewVoting(cfg.ContractAddresses.Voting, eth) if err != nil { return nil, err } diff --git a/indexer/cronjob/voting.go b/indexer/cronjob/voting.go index 510dfee..1574dc6 100644 --- a/indexer/cronjob/voting.go +++ b/indexer/cronjob/voting.go @@ -117,7 +117,6 @@ func (c *votingCronjob) Call() error { c.metrics.lastEpoch.Set(float64(epochRange.end)) } - votedInBatch := false for e := epochRange.start; e <= epochRange.end; e++ { start, end := c.epochs.GetTimeRange(e) @@ -130,39 +129,32 @@ func (c *votingCronjob) Call() error { if err != nil { return err } - voted, err := c.submitVotes(e, votingData) + err = c.submitVotes(e, votingData) if err != nil { return err } - if voted { - votedInBatch = true - logger.Info("Submitted vote for epoch %d", e) - } else { - if !votedInBatch { - state.NextDBIndex = uint64(e + 1) - if err := c.db.UpdateState(&state); err != nil { - return err - } - if c.metrics != nil { - c.metrics.lastProcessedEpoch.Set(float64(e)) - } - } - logger.Debug("Voting not needed for epoch %d", e) + state.NextDBIndex = uint64(e + 1) + if err := c.db.UpdateState(&state); err != nil { + return err + } + if c.metrics != nil { + c.metrics.lastProcessedEpoch.Set(float64(e)) } } return nil } // Return true if the vote was submitted, and false if shouldVote returned false -func (c *votingCronjob) submitVotes(e int64, votingData []database.PChainTxData) (bool, error) { +func (c *votingCronjob) submitVotes(e int64, votingData []database.PChainTxData) error { votingData = staking.DedupeTxs(votingData) shouldVote, err := c.contract.ShouldVote(big.NewInt(e)) if err != nil { - return false, err + return err } if !shouldVote { - return false, nil + logger.Debug("Voting not needed for epoch %d", e) + return nil } var merkleRoot common.Hash @@ -171,11 +163,17 @@ func (c *votingCronjob) submitVotes(e int64, votingData []database.PChainTxData) } else { merkleRoot, err = staking.GetMerkleRoot(votingData) if err != nil { - return false, err + return err } } + + // Submit vote and wait for the transaction to be mined err = c.contract.SubmitVote(big.NewInt(e), [32]byte(merkleRoot)) - return true, err + if err != nil { + return err + } + logger.Info("Submitted vote for epoch %d", e) + return nil } func (c *votingCronjob) reset(firstEpoch int64) error { diff --git a/indexer/cronjob/voting_stubs.go b/indexer/cronjob/voting_stubs.go index a310291..141ccf9 100644 --- a/indexer/cronjob/voting_stubs.go +++ b/indexer/cronjob/voting_stubs.go @@ -3,6 +3,8 @@ package cronjob import ( "flare-indexer/database" "flare-indexer/indexer/config" + "flare-indexer/logger" + "flare-indexer/utils/chain" "flare-indexer/utils/contracts/voting" "flare-indexer/utils/staking" "math/big" @@ -30,13 +32,20 @@ func (db *votingDBGorm) UpdateState(state *database.State) error { } type votingContractCChain struct { - callOpts *bind.CallOpts - txOpts *bind.TransactOpts - voting *voting.Voting + callOpts *bind.CallOpts + txOpts *bind.TransactOpts + voting *voting.Voting + txVerifier *chain.TxVerifier } func newVotingContractCChain(cfg *config.Config) (votingContract, error) { - votingContract, err := newVotingContract(cfg) + + eth, err := ethclient.Dial(cfg.Chain.EthRPCURL) + if err != nil { + return nil, err + } + + votingContract, err := voting.NewVoting(cfg.ContractAddresses.Voting, eth) if err != nil { return nil, err } @@ -55,27 +64,28 @@ func newVotingContractCChain(cfg *config.Config) (votingContract, error) { callOpts := &bind.CallOpts{From: txOpts.From} return &votingContractCChain{ - callOpts: callOpts, - txOpts: txOpts, - voting: votingContract, + callOpts: callOpts, + txOpts: txOpts, + voting: votingContract, + txVerifier: chain.NewTxVerifier(eth), }, nil } -func newVotingContract(cfg *config.Config) (*voting.Voting, error) { - eth, err := ethclient.Dial(cfg.Chain.EthRPCURL) - if err != nil { - return nil, err - } - return voting.NewVoting(cfg.ContractAddresses.Voting, eth) -} - func (c *votingContractCChain) ShouldVote(epoch *big.Int) (bool, error) { return c.voting.ShouldVote(c.callOpts, epoch, c.callOpts.From) } func (c *votingContractCChain) SubmitVote(epoch *big.Int, merkleRoot [32]byte) error { - _, err := c.voting.SubmitVote(c.txOpts, epoch, merkleRoot) - return err + tx, err := c.voting.SubmitVote(c.txOpts, epoch, merkleRoot) + if err != nil { + return err + } + err = c.txVerifier.WaitUntilMined(c.callOpts.From, tx, 60*time.Second) + if err != nil { + return err + } + logger.Debug("Mined voting tx %s", tx.Hash().Hex()) + return nil } func (c *votingContractCChain) EpochConfig() (start time.Time, period time.Duration, err error) { From af95e4a660d86cb4cd9fe13c987c0defc00b980c Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Mon, 9 Oct 2023 23:50:08 +0200 Subject: [PATCH 5/9] Fixed voting test --- indexer/cronjob/voting_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/indexer/cronjob/voting_test.go b/indexer/cronjob/voting_test.go index 9914eab..7a6214a 100644 --- a/indexer/cronjob/voting_test.go +++ b/indexer/cronjob/voting_test.go @@ -119,13 +119,7 @@ func TestVotes(t *testing.T) { cupaloy.SnapshotT(t, contract.submittedVotes) updatedState := db.states[votingStateName] - require.Equal(t, updatedState.NextDBIndex, uint64(1)) - - err = cronjob.Call() - require.NoError(t, err) - - updatedState = db.states[votingStateName] - require.Equal(t, updatedState.NextDBIndex, uint64(6)) + require.Equal(t, updatedState.NextDBIndex, uint64(5)) } func timeRangeForEpoch(cj epochCronjob, epoch int64) timeRange { From 0dac6db5ab2196c1f2b3bbc8fa1d98f5fea2cff2 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Tue, 10 Oct 2023 14:59:21 +0200 Subject: [PATCH 6/9] Revert of transaction with "epoch already finalized" is not an error --- indexer/cronjob/voting_stubs.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/indexer/cronjob/voting_stubs.go b/indexer/cronjob/voting_stubs.go index 141ccf9..19858d8 100644 --- a/indexer/cronjob/voting_stubs.go +++ b/indexer/cronjob/voting_stubs.go @@ -8,6 +8,7 @@ import ( "flare-indexer/utils/contracts/voting" "flare-indexer/utils/staking" "math/big" + "strings" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -82,6 +83,10 @@ func (c *votingContractCChain) SubmitVote(epoch *big.Int, merkleRoot [32]byte) e } err = c.txVerifier.WaitUntilMined(c.callOpts.From, tx, 60*time.Second) if err != nil { + if strings.Contains(err.Error(), "epoch already finalized") { + logger.Info("Epoch %s already finalized", epoch.String()) + return nil + } return err } logger.Debug("Mined voting tx %s", tx.Hash().Hex()) From a8f88e811e87c0a5c259c5d4e1c7134383209331 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Wed, 11 Oct 2023 14:30:18 +0200 Subject: [PATCH 7/9] Updated prometheus metrics, added health api endpoint --- indexer/cronjob/cronjob.go | 57 ++++++---------- indexer/cronjob/metrics.go | 33 +++++++++ indexer/cronjob/mirror.go | 9 ++- indexer/cronjob/uptime.go | 23 +++++-- indexer/cronjob/uptime_voting.go | 2 + indexer/cronjob/voting.go | 13 ++-- indexer/shared/indexer.go | 22 ++++-- indexer/shared/indexer_metrics.go | 14 +++- indexer/shared/metrics.go | 107 ++++++++++++++++++++++++++++++ indexer/shared/prometheus.go | 29 -------- 10 files changed, 221 insertions(+), 88 deletions(-) create mode 100644 indexer/cronjob/metrics.go create mode 100644 indexer/shared/metrics.go delete mode 100644 indexer/shared/prometheus.go diff --git a/indexer/cronjob/cronjob.go b/indexer/cronjob/cronjob.go index 98afd73..43791e0 100644 --- a/indexer/cronjob/cronjob.go +++ b/indexer/cronjob/cronjob.go @@ -3,12 +3,11 @@ package cronjob import ( "flare-indexer/database" "flare-indexer/indexer/config" + "flare-indexer/indexer/shared" "flare-indexer/logger" "flare-indexer/utils" "flare-indexer/utils/staking" "time" - - "github.com/prometheus/client_golang/prometheus" ) type Cronjob interface { @@ -18,12 +17,16 @@ type Cronjob interface { RandomTimeoutDelta() time.Duration Call() error OnStart() error - OnError(err error) + + // Set health status of cronjob + // (can be implemented to ignore the status based on other conditions) + UpdateCronjobStatus(status shared.HealthStatus) } func RunCronjob(c Cronjob) { if !c.Enabled() { logger.Debug("%s cronjob disabled", c.Name()) + c.UpdateCronjobStatus(shared.HealthStatusOk) return } @@ -38,10 +41,13 @@ func RunCronjob(c Cronjob) { ticker := utils.NewRandomizedTicker(c.Timeout(), c.RandomTimeoutDelta()) for { <-ticker + err := c.Call() - if err != nil { + if err == nil { + c.UpdateCronjobStatus(shared.HealthStatusOk) + } else { logger.Error("%s cronjob error %s", c.Name(), err.Error()) - c.OnError(err) + c.UpdateCronjobStatus(shared.HealthStatusError) } } } @@ -64,17 +70,6 @@ type epochRange struct { end int64 } -type epochCronjobMetrics struct { - // Current epoch - lastEpoch prometheus.Gauge - - // Last processsed epoch - lastProcessedEpoch prometheus.Gauge - - // Error count - errorCount prometheus.Counter -} - func newEpochCronjob(cronjobCfg *config.CronjobConfig, epochs staking.EpochInfo) epochCronjob { return epochCronjob{ enabled: cronjobCfg.Enabled, @@ -97,9 +92,9 @@ func (c *epochCronjob) RandomTimeoutDelta() time.Duration { return 0 } -func (c *epochCronjob) OnError(err error) { +func (c *epochCronjob) UpdateCronjobStatus(status shared.HealthStatus) { if c.metrics != nil { - c.metrics.errorCount.Inc() + c.metrics.SetStatus(status) } } @@ -128,22 +123,14 @@ func (c *epochCronjob) indexerBehind(idxState *database.State, epoch int64) bool return epochEnd.After(idxState.Updated.Add(-c.delay)) || idxState.NextDBIndex <= idxState.LastChainIndex } -func newEpochCronjobMetrics(namespace string) *epochCronjobMetrics { - return &epochCronjobMetrics{ - lastEpoch: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "last_epoch", - Help: "Last completed epoch", - }), - lastProcessedEpoch: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "last_processed_epoch", - Help: "Last processed epoch", - }), - errorCount: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "error_count", - Help: "Number of errors", - }), +func (c *epochCronjob) updateLastEpochMetrics(epoch int64) { + if c.metrics != nil { + c.metrics.lastEpoch.Set(float64(epoch)) + } +} + +func (c *epochCronjob) updateLastProcessedEpochMetrics(epoch int64) { + if c.metrics != nil { + c.metrics.lastProcessedEpoch.Set(float64(epoch)) } } diff --git a/indexer/cronjob/metrics.go b/indexer/cronjob/metrics.go new file mode 100644 index 0000000..26ab010 --- /dev/null +++ b/indexer/cronjob/metrics.go @@ -0,0 +1,33 @@ +package cronjob + +import ( + "flare-indexer/indexer/shared" + + "github.com/prometheus/client_golang/prometheus" +) + +type epochCronjobMetrics struct { + shared.MetricsBase + + // Current epoch + lastEpoch prometheus.Gauge + + // Last processsed epoch + lastProcessedEpoch prometheus.Gauge +} + +func newEpochCronjobMetrics(namespace string) *epochCronjobMetrics { + return &epochCronjobMetrics{ + MetricsBase: *shared.NewMetricsBase(namespace), + lastEpoch: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "last_epoch", + Help: "Last completed epoch", + }), + lastProcessedEpoch: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "last_processed_epoch", + Help: "Last processed epoch", + }), + } +} diff --git a/indexer/cronjob/mirror.go b/indexer/cronjob/mirror.go index f5fce58..a47d571 100644 --- a/indexer/cronjob/mirror.go +++ b/indexer/cronjob/mirror.go @@ -75,12 +75,17 @@ func NewMirrorCronjob(ctx indexerctx.IndexerContext) (Cronjob, error) { } err = mc.reset(ctx.Flags().ResetMirrorCronjob) + if err != nil { + return nil, err + } + + mc.metrics = newEpochCronjobMetrics(mirrorStateName) return mc, err } func (c *mirrorCronJob) Name() string { - return "mirror" + return mirrorStateName } func (c *mirrorCronJob) OnStart() error { @@ -99,6 +104,7 @@ func (c *mirrorCronJob) Call() error { } logger.Debug("mirroring epochs %d-%d", epochRange.start, epochRange.end) + c.updateLastEpochMetrics(epochRange.end) for epoch := epochRange.start; epoch <= epochRange.end; epoch++ { logger.Debug("mirroring epoch %d", epoch) @@ -112,6 +118,7 @@ func (c *mirrorCronJob) Call() error { if err := c.db.UpdateJobState(epochRange.end+1, false); err != nil { return err } + c.updateLastProcessedEpochMetrics(epochRange.end) return nil } diff --git a/indexer/cronjob/uptime.go b/indexer/cronjob/uptime.go index f9a76e1..a9e197a 100644 --- a/indexer/cronjob/uptime.go +++ b/indexer/cronjob/uptime.go @@ -4,6 +4,7 @@ import ( "flare-indexer/database" "flare-indexer/indexer/config" "flare-indexer/indexer/context" + "flare-indexer/indexer/shared" "flare-indexer/utils" "flare-indexer/utils/chain" "time" @@ -11,24 +12,28 @@ import ( "gorm.io/gorm" ) +const uptimeCronjobName = "uptime_cronjob" + type uptimeCronjob struct { config config.UptimeConfig db *gorm.DB - client chain.UptimeClient + client chain.UptimeClient + metrics *shared.MetricsBase } func NewUptimeCronjob(ctx context.IndexerContext) Cronjob { endpoint := utils.JoinPaths(ctx.Config().Chain.NodeURL, "ext/bc/P"+chain.RPCClientOptions(ctx.Config().Chain.ApiKey)) return &uptimeCronjob{ - config: ctx.Config().UptimeCronjob, - db: ctx.DB(), - client: chain.NewAvalancheUptimeClient(endpoint), + config: ctx.Config().UptimeCronjob, + db: ctx.DB(), + client: chain.NewAvalancheUptimeClient(endpoint), + metrics: shared.NewMetricsBase(uptimeCronjobName), } } func (c *uptimeCronjob) Name() string { - return "uptime" + return uptimeCronjobName } func (c *uptimeCronjob) Timeout() time.Duration { @@ -43,7 +48,13 @@ func (c *uptimeCronjob) RandomTimeoutDelta() time.Duration { return 0 } -func (c *uptimeCronjob) OnError(err error) { +func (c *uptimeCronjob) UpdateCronjobStatus(status shared.HealthStatus) { + if c.metrics != nil { + c.metrics.SetStatus(status) + } +} + +func (c *uptimeCronjob) SetHealthStatus(status shared.HealthStatus) { } func (c *uptimeCronjob) OnStart() error { diff --git a/indexer/cronjob/uptime_voting.go b/indexer/cronjob/uptime_voting.go index 72d8543..bd8cc8d 100644 --- a/indexer/cronjob/uptime_voting.go +++ b/indexer/cronjob/uptime_voting.go @@ -120,6 +120,7 @@ func (c *uptimeVotingCronjob) Call() error { var aggregations []*database.UptimeAggregation lastAggregatedEpoch := c.lastAggregatedEpoch + c.updateLastEpochMetrics(epochRange.end) // Aggregate missing epochs for all nodes for epoch := epochRange.start; epoch <= epochRange.end; epoch++ { @@ -149,6 +150,7 @@ func (c *uptimeVotingCronjob) Call() error { return fmt.Errorf("failed persisting uptime aggregations %w", err) } c.lastAggregatedEpoch = lastAggregatedEpoch + c.updateLastProcessedEpochMetrics(lastAggregatedEpoch) err = c.deleteOldUptimes() if err != nil { diff --git a/indexer/cronjob/voting.go b/indexer/cronjob/voting.go index 1574dc6..3bb40a2 100644 --- a/indexer/cronjob/voting.go +++ b/indexer/cronjob/voting.go @@ -33,8 +33,7 @@ type votingCronjob struct { contract votingContract // For testing to set "now" to some past date - time utils.ShiftedTime - metrics *epochCronjobMetrics + time utils.ShiftedTime } type votingDB interface { @@ -85,7 +84,7 @@ func NewVotingCronjob(ctx indexerctx.IndexerContext) (*votingCronjob, error) { } func (c *votingCronjob) Name() string { - return "voting" + return votingStateName } func (c *votingCronjob) OnStart() error { @@ -113,9 +112,7 @@ func (c *votingCronjob) Call() error { epochRange := c.getEpochRange(int64(state.NextDBIndex), now) logger.Debug("Voting needed for epochs [%d, %d]", epochRange.start, epochRange.end) - if c.metrics != nil { - c.metrics.lastEpoch.Set(float64(epochRange.end)) - } + c.updateLastEpochMetrics(epochRange.end) for e := epochRange.start; e <= epochRange.end; e++ { start, end := c.epochs.GetTimeRange(e) @@ -137,9 +134,7 @@ func (c *votingCronjob) Call() error { if err := c.db.UpdateState(&state); err != nil { return err } - if c.metrics != nil { - c.metrics.lastProcessedEpoch.Set(float64(e)) - } + c.updateLastProcessedEpochMetrics(e) } return nil } diff --git a/indexer/shared/indexer.go b/indexer/shared/indexer.go index 45001cd..85152a6 100644 --- a/indexer/shared/indexer.go +++ b/indexer/shared/indexer.go @@ -56,14 +56,17 @@ func (ci *ChainIndexerBase) IndexBatch() error { // Nothing to do; no new containers logger.Debug("Nothing to do. Last index %d < next to process %d", lastIndex, nextIndex) - duration := time.Since(startTime).Milliseconds() + // Update time of last run (for other clients to know that the indexer is running) + currentState.UpdateTime() + if err := database.UpdateState(ci.DB, ¤tState); err != nil { + return err + } + if ci.metrics != nil { + duration := time.Since(startTime).Milliseconds() ci.metrics.Update(currentState.LastChainIndex, currentState.NextDBIndex-1, duration) } - - // Update time of last run (for other clients to know that the indexer is running) - currentState.UpdateTime() - return database.UpdateState(ci.DB, ¤tState) + return nil } // Get MaxBatch containers from the chain @@ -122,6 +125,8 @@ func (ci *ChainIndexerBase) ProcessContainers(nextIndex uint64, containers []ind func (ci *ChainIndexerBase) Run() { if !ci.Config.Enabled { + logger.Debug("%s indexer is disabled", ci.IndexerName) + ci.SetStatus(HealthStatusOk) return } ticker := time.NewTicker(ci.Config.Timeout) @@ -129,6 +134,7 @@ func (ci *ChainIndexerBase) Run() { err := ci.IndexBatch() if err != nil { logger.Error("%s indexer error %v", ci.IndexerName, err) + ci.SetStatus(HealthStatusError) } } } @@ -136,3 +142,9 @@ func (ci *ChainIndexerBase) Run() { func (ci *ChainIndexerBase) InitMetrics(namespace string) { ci.metrics = newMetrics(namespace) } + +func (ci *ChainIndexerBase) SetStatus(status HealthStatus) { + if ci.metrics != nil { + ci.metrics.SetStatus(status) + } +} diff --git a/indexer/shared/indexer_metrics.go b/indexer/shared/indexer_metrics.go index f514ac0..21be8b6 100644 --- a/indexer/shared/indexer_metrics.go +++ b/indexer/shared/indexer_metrics.go @@ -6,6 +6,8 @@ import ( ) type metrics struct { + MetricsBase + // Last accepted index on the chain lastAcceptedIndex prometheus.Gauge @@ -18,6 +20,7 @@ type metrics struct { func newMetrics(namespace string) *metrics { return &metrics{ + MetricsBase: *NewMetricsBase(namespace), lastAcceptedIndex: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Name: "last_accepted_index", @@ -36,8 +39,13 @@ func newMetrics(namespace string) *metrics { } } -func (m *metrics) Update(newAcceptedCount uint64, newProcessedCount uint64, processingTime int64) { - m.lastAcceptedIndex.Set(float64(newAcceptedCount)) - m.lastProcessedIndex.Set(float64(newProcessedCount)) +func (m *metrics) Update(lastAcceptedIndex uint64, lastProcessedIndex uint64, processingTime int64) { + m.lastAcceptedIndex.Set(float64(lastAcceptedIndex)) + m.lastProcessedIndex.Set(float64(lastProcessedIndex)) m.processingTime.Set(float64(processingTime)) + if lastAcceptedIndex > lastProcessedIndex { + m.SetStatus(HealthStatusSyncing) + } else { + m.SetStatus(HealthStatusOk) + } } diff --git a/indexer/shared/metrics.go b/indexer/shared/metrics.go new file mode 100644 index 0000000..3c5b1c1 --- /dev/null +++ b/indexer/shared/metrics.go @@ -0,0 +1,107 @@ +package shared + +import ( + "flare-indexer/indexer/config" + "log" + "net/http" + "strings" + + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type HealthStatus int + +const ( + HealthStatusInitializing HealthStatus = 0 // Default prometheus Gauge value, thus it indicates that it was not updated yet + HealthStatusOk HealthStatus = 1 + HealthStatusError HealthStatus = -1 + HealthStatusSyncing HealthStatus = -2 +) + +type MetricsBase struct { + // status onf the client, see HealthStatus constants + status prometheus.Gauge +} + +func NewMetricsBase(namespace string) *MetricsBase { + m := &MetricsBase{ + status: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "health_status", + Help: "Status of the client (0 - initializing, 1 - ok, -1 - error, -2 - syncing)", + }), + } + m.status.Set(float64(HealthStatusInitializing)) + return m +} + +func (m *MetricsBase) SetStatus(status HealthStatus) { + m.status.Set(float64(status)) +} + +func InitMetricsServer(cfg *config.MetricsConfig) { + if len(cfg.PrometheusAddress) == 0 { + return + } + + r := mux.NewRouter() + + r.Path("/metrics").Handler(promhttp.Handler()) + r.Path("/health").HandlerFunc(healthHandler) + + srv := &http.Server{ + Addr: cfg.PrometheusAddress, + Handler: r, + } + go func() { + err := srv.ListenAndServe() + log.Fatal(err) + }() +} + +func healthHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + err := writeHealthResponse(w) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } +} + +func writeHealthResponse(w http.ResponseWriter) (err error) { + ok, err := getHealthStatus() + if err != nil { + return + } + if ok { + _, err = w.Write([]byte("true")) + } else { + _, err = w.Write([]byte("false")) + } + return +} + +func getHealthStatus() (bool, error) { + gatherer := prometheus.DefaultGatherer + mfs, err := gatherer.Gather() + if err != nil { + return false, err + } + + // Check metrics with suffix "health_status" + // Status is healthy if all of them have value HealthStatusOk + for _, mf := range mfs { + if strings.HasSuffix(mf.GetName(), "health_status") { + for _, m := range mf.GetMetric() { + if g := m.GetGauge(); g != nil { + if g.GetValue() != float64(HealthStatusOk) { + return false, nil + } + } + } + } + } + return true, nil +} diff --git a/indexer/shared/prometheus.go b/indexer/shared/prometheus.go deleted file mode 100644 index a28dccb..0000000 --- a/indexer/shared/prometheus.go +++ /dev/null @@ -1,29 +0,0 @@ -package shared - -import ( - "flare-indexer/indexer/config" - "log" - "net/http" - - "github.com/gorilla/mux" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -func InitMetricsServer(cfg *config.MetricsConfig) { - if len(cfg.PrometheusAddress) == 0 { - return - } - - r := mux.NewRouter() - - r.Path("/metrics").Handler(promhttp.Handler()) - - srv := &http.Server{ - Addr: cfg.PrometheusAddress, - Handler: r, - } - go func() { - err := srv.ListenAndServe() - log.Fatal(err) - }() -} From dd2604ff104adaeb8d4501c1486756ebc2a10ae5 Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Wed, 11 Oct 2023 15:28:19 +0200 Subject: [PATCH 8/9] Init metrics in uptime voting cronjob --- indexer/cronjob/metrics.go | 5 +++-- indexer/cronjob/uptime_voting.go | 7 ++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/indexer/cronjob/metrics.go b/indexer/cronjob/metrics.go index 26ab010..d4c97e6 100644 --- a/indexer/cronjob/metrics.go +++ b/indexer/cronjob/metrics.go @@ -4,6 +4,7 @@ import ( "flare-indexer/indexer/shared" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type epochCronjobMetrics struct { @@ -19,12 +20,12 @@ type epochCronjobMetrics struct { func newEpochCronjobMetrics(namespace string) *epochCronjobMetrics { return &epochCronjobMetrics{ MetricsBase: *shared.NewMetricsBase(namespace), - lastEpoch: prometheus.NewGauge(prometheus.GaugeOpts{ + lastEpoch: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Name: "last_epoch", Help: "Last completed epoch", }), - lastProcessedEpoch: prometheus.NewGauge(prometheus.GaugeOpts{ + lastProcessedEpoch: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Name: "last_processed_epoch", Help: "Last processed epoch", diff --git a/indexer/cronjob/uptime_voting.go b/indexer/cronjob/uptime_voting.go index bd8cc8d..f915ff2 100644 --- a/indexer/cronjob/uptime_voting.go +++ b/indexer/cronjob/uptime_voting.go @@ -21,6 +21,10 @@ import ( "gorm.io/gorm" ) +const ( + uptimeVotingCronjobName = "uptime_voting_cronjob" +) + var ( errNoEpochsToAggregate = errors.New("no epochs to aggregate") ) @@ -81,6 +85,7 @@ func NewUptimeVotingCronjob(ctx context.IndexerContext) (*uptimeVotingCronjob, e enabled: config.EnableVoting, timeout: config.Timeout, epochs: staking.NewEpochInfo(&globalConfig.EpochConfig{First: config.First}, config.Start.Time, config.Period), + metrics: newEpochCronjobMetrics(uptimeVotingCronjobName), }, lastAggregatedEpoch: -1, deleteOldUptimesEpochThreshold: config.DeleteOldUptimesEpochThreshold, @@ -93,7 +98,7 @@ func NewUptimeVotingCronjob(ctx context.IndexerContext) (*uptimeVotingCronjob, e } func (c *uptimeVotingCronjob) Name() string { - return "uptime_aggregator" + return uptimeVotingCronjobName } func (c *uptimeVotingCronjob) Timeout() time.Duration { From b31aeccf30819c5889b9c6dcdd9362b141a41b6a Mon Sep 17 00:00:00 2001 From: Marko Boben Date: Thu, 12 Oct 2023 11:42:55 +0200 Subject: [PATCH 9/9] Removed unused function --- indexer/cronjob/uptime.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/indexer/cronjob/uptime.go b/indexer/cronjob/uptime.go index a9e197a..1a9ead7 100644 --- a/indexer/cronjob/uptime.go +++ b/indexer/cronjob/uptime.go @@ -54,9 +54,6 @@ func (c *uptimeCronjob) UpdateCronjobStatus(status shared.HealthStatus) { } } -func (c *uptimeCronjob) SetHealthStatus(status shared.HealthStatus) { -} - func (c *uptimeCronjob) OnStart() error { entities := []*database.UptimeCronjob{&database.UptimeCronjob{ NodeID: nil,