Skip to content

Commit

Permalink
Merge pull request #26 from flare-foundation/testing-fixes
Browse files Browse the repository at this point in the history
Testing fixes
  • Loading branch information
mboben authored Oct 4, 2023
2 parents c3f1717 + 2e66e7e commit 84f2d44
Show file tree
Hide file tree
Showing 18 changed files with 519 additions and 196 deletions.
4 changes: 2 additions & 2 deletions database/pchain_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func FetchPChainStakingTransactions(
query = query.Joins("left join p_chain_tx_inputs as inputs on inputs.tx_id = p_chain_txes.tx_id").
Where("inputs.address = ?", address)
}
err := query.Offset(offset).Limit(limit).Order("p_chain_txes.id").
err := query.Offset(offset).Limit(limit).Order("p_chain_txes.tx_id").
Distinct().Select("p_chain_txes.tx_id").Find(&validatorTxs).Error
if err != nil {
return nil, err
Expand Down Expand Up @@ -140,7 +140,7 @@ func FetchPChainTransferTransactions(
Where("inputs.address = ?", address)
}
}
err := query.Offset(offset).Limit(limit).Order("p_chain_txes.id").
err := query.Offset(offset).Limit(limit).Order("p_chain_txes.tx_id").
Distinct().Select("p_chain_txes.tx_id").Find(&txs).Error
if err != nil {
return nil, err
Expand Down
248 changes: 248 additions & 0 deletions indexer/cronjob/address_binder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package cronjob

import (
"flare-indexer/database"
indexerctx "flare-indexer/indexer/context"
"flare-indexer/logger"
"flare-indexer/utils"
"flare-indexer/utils/chain"
"flare-indexer/utils/staking"
"time"

"github.com/ava-labs/avalanchego/utils/crypto"
mapset "github.com/deckarep/golang-set/v2"
"github.com/pkg/errors"
)

const addressBinderStateName = "address_binder_cronjob"

type addressBinderCronJob struct {
epochCronjob
db addressBinderDB
contracts addressBinderContracts
time utils.ShiftedTime
}

type addressBinderDB interface {
FetchState(name string) (database.State, error)
UpdateJobState(epoch int64, force bool) error
GetPChainTxsForEpoch(start, end time.Time) ([]database.PChainTxData, error)
GetPChainTx(txID string, address string) (*database.PChainTxData, error)
}

type addressBinderContracts interface {
GetMerkleRoot(epoch int64) ([32]byte, error)
IsAddressRegistered(address string) (bool, error)
RegisterPublicKey(publicKey crypto.PublicKey) error
EpochConfig() (time.Time, time.Duration, error)
}

func NewAddressBinderCronjob(ctx indexerctx.IndexerContext) (Cronjob, error) {
cfg := ctx.Config()

if !cfg.Mirror.Enabled {
return &addressBinderCronJob{}, nil
}

contracts, err := initAddressBinderJobContracts(cfg)
if err != nil {
return nil, err
}

start, period, err := contracts.EpochConfig()
if err != nil {
return nil, err
}

epochs := staking.NewEpochInfo(&cfg.Mirror.EpochConfig, start, period)

mc := &addressBinderCronJob{
epochCronjob: newEpochCronjob(&cfg.Mirror.CronjobConfig, epochs),
db: NewAddressBinderDBGorm(ctx.DB()),
contracts: contracts,
}

err = mc.reset(ctx.Flags().ResetMirrorCronjob)

return mc, err
}

func (c *addressBinderCronJob) Name() string {
return "address_binder"
}

func (c *addressBinderCronJob) OnStart() error {
return nil
}

func (c *addressBinderCronJob) Call() error {
epochRange, err := c.getEpochRange()
if err != nil {
if errors.Is(err, errNoEpochsToRegisterAddresses) {
logger.Debug("no epochs to register addresses")
return nil
}

return err
}

logger.Debug("registering addresses for epochs %d-%d", epochRange.start, epochRange.end)
registeredAddresses := mapset.NewSet[string]()
for epoch := epochRange.start; epoch <= epochRange.end; epoch++ {
logger.Debug("registering addresses for epoch %d", epoch)
if err := c.registerEpoch(registeredAddresses, epoch); err != nil {
return err
}
}

logger.Debug("successfully registered addresses for epochs %d-%d", epochRange.start, epochRange.end)

if err := c.db.UpdateJobState(epochRange.end+1, false); err != nil {
return err
}

return nil
}

var errNoEpochsToRegisterAddresses = errors.New("no epochs to register addresses")

func (c *addressBinderCronJob) getEpochRange() (*epochRange, error) {
startEpoch, err := c.getStartEpoch()
if err != nil {
return nil, err
}

logger.Debug("start epoch: %d", startEpoch)

endEpoch, err := c.getEndEpoch(startEpoch)
if err != nil {
return nil, err
}
logger.Debug("Registering addresses needed for epochs [%d, %d]", startEpoch, endEpoch)
return c.getTrimmedEpochRange(startEpoch, endEpoch), nil
}

func (c *addressBinderCronJob) getStartEpoch() (int64, error) {
jobState, err := c.db.FetchState(addressBinderStateName)
if err != nil {
return 0, err
}

return int64(jobState.NextDBIndex), nil
}

func (c *addressBinderCronJob) getEndEpoch(startEpoch int64) (int64, error) {
currEpoch := c.epochs.GetEpochIndex(c.time.Now())
logger.Debug("current epoch: %d", currEpoch)

for epoch := currEpoch; epoch > startEpoch; epoch-- {
confirmed, err := c.isEpochConfirmed(epoch)
if err != nil {
return 0, err
}

if confirmed {
return epoch, nil
}
}

return 0, errNoEpochsToRegisterAddresses
}

func (c *addressBinderCronJob) isEpochConfirmed(epoch int64) (bool, error) {
merkleRoot, err := c.contracts.GetMerkleRoot(epoch)
if err != nil {
return false, errors.Wrap(err, "votingContract.GetMerkleRoot")
}

return merkleRoot != [32]byte{}, nil
}

func (c *addressBinderCronJob) registerEpoch(registeredAddresses mapset.Set[string], epoch int64) error {
txs, err := c.getEpochTxs(epoch)
if err != nil {
return err
}

if len(txs) == 0 {
logger.Debug("no unregistered txs found")
return nil
}

logger.Info("registering %d txs", len(txs))
if err := c.registerTxs(registeredAddresses, txs, epoch); err != nil {
return err
}

return nil
}

func (c *addressBinderCronJob) getEpochTxs(epoch int64) ([]database.PChainTxData, error) {
startTimestamp, endTimestamp := c.epochs.GetTimeRange(epoch)

txs, err := c.db.GetPChainTxsForEpoch(startTimestamp, endTimestamp)
if err != nil {
return nil, err
}

return staking.DedupeTxs(txs), nil
}

func (c *addressBinderCronJob) registerTxs(registeredAddresses mapset.Set[string], txs []database.PChainTxData, epochID int64) error {
for _, tx := range txs {
if registeredAddresses.Contains(tx.InputAddress) {
continue
}
if err := c.registerAddress(*tx.TxID, tx.InputAddress); err != nil {
// Non-fatal error, continue registering other addresses
logger.Error("error registering address: %s", err.Error())
}
registeredAddresses.Add(tx.InputAddress)
}
return nil
}

// Register address on AddressBinder contract if it is not already registered
func (c *addressBinderCronJob) registerAddress(txID string, address string) error {
registered, err := c.contracts.IsAddressRegistered(address)
if err != nil || registered {
return err
}
tx, err := c.db.GetPChainTx(txID, address)
if err != nil {
return err
}
if tx == nil {
return errors.New("tx not found")
}
publicKeys, err := chain.PublicKeysFromPChainBlock(tx.Bytes)
if err != nil {
return err
}
if tx.InputIndex >= uint32(len(publicKeys)) {
return errors.New("input index out of range")
}
publicKey := publicKeys[tx.InputIndex]
for _, k := range publicKey {
err := c.contracts.RegisterPublicKey(k)
if err != nil {
return errors.Wrap(err, "mirroringContract.RegisterPublicKey")
}
}
logger.Info("registered address %s on address binder contract", address)
return nil
}

func (c *addressBinderCronJob) reset(firstEpoch int64) error {
if firstEpoch <= 0 {
return nil
}

logger.Info("Resetting address binder cronjob state to epoch %d", firstEpoch)
err := c.db.UpdateJobState(firstEpoch, true)
if err != nil {
return err
}
c.epochs.First = firstEpoch
return nil
}
Loading

0 comments on commit 84f2d44

Please sign in to comment.