Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Staking clients #22

Merged
merged 12 commits into from
Sep 26, 2023
Merged
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ The configuration is read from `toml` file.
The settings for `[db]`, `[logger]` are the same as for the indexer above.
Specific settings are listed below.

**Note:** We recommend that the user accessing the database is not the same as for the indexer. The user for the services should only have read permissions enabled!

Config file can be specified using the command line parameter `--config`, e.g., `./services --config config.local.toml`. The default config file name is `config.toml`.

```toml
Expand Down
7 changes: 0 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"flag"
"flare-indexer/utils"
"fmt"
"log"
Expand Down Expand Up @@ -99,9 +98,3 @@ func ReadEnv(cfg interface{}) error {
}
return nil
}

func ConfigFileName() string {
cfgFlag := flag.String("config", CONFIG_FILE, "Configuration file (toml format)")
flag.Parse()
return *cfgFlag
}
4 changes: 4 additions & 0 deletions database/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,7 @@ func PersistUptimeAggregations(db *gorm.DB, aggregations []*UptimeAggregation) e
}
return db.Create(aggregations).Error
}

func DeleteUptimesBefore(db *gorm.DB, timestamp time.Time) error {
return db.Where("timestamp < ?", timestamp).Delete(&UptimeCronjob{}).Error
}
8 changes: 4 additions & 4 deletions indexer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type VotingConfig struct {
type UptimeConfig struct {
CronjobConfig
config.EpochConfig
EnableVoting bool `toml:"enable_voting"`
UptimeThreshold float64 `toml:"uptime_threshold"`
EnableVoting bool `toml:"enable_voting"`
UptimeThreshold float64 `toml:"uptime_threshold"`
DeleteOldUptimesEpochThreshold int64 `toml:"delete_old_uptimes_epoch_threshold"`
}

func newConfig() *Config {
Expand Down Expand Up @@ -93,8 +94,7 @@ func (c Config) ChainConfig() config.ChainConfig {
return c.Chain
}

func BuildConfig() (*Config, error) {
cfgFileName := config.ConfigFileName()
func BuildConfig(cfgFileName string) (*Config, error) {
cfg := newConfig()
err := config.ParseConfigFile(cfg, cfgFileName, false)
if err != nil {
Expand Down
45 changes: 39 additions & 6 deletions indexer/context/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package context

import (
"flag"
globalConfig "flare-indexer/config"
"flare-indexer/database"
"flare-indexer/indexer/config"
Expand All @@ -11,30 +12,62 @@ import (
type IndexerContext interface {
Config() *config.Config
DB() *gorm.DB
Flags() *IndexerFlags
}

type IndexerFlags struct {
ConfigFileName string

// Set start epoch for voting cronjob to this value, overrides config and database value,
// valid value is > 0
ResetVotingCronjob int64

// Set start epoch for mirroring cronjob to this value, overrides config and database value,
// valid value is > 0
ResetMirrorCronjob int64
}

type indexerContext struct {
config *config.Config
db *gorm.DB
flags *IndexerFlags
}

func BuildContext() (IndexerContext, error) {
ctx := indexerContext{}

cfg, err := config.BuildConfig()
flags := parseIndexerFlags()
cfg, err := config.BuildConfig(flags.ConfigFileName)
if err != nil {
return nil, err
}
ctx.config = cfg
globalConfig.GlobalConfigCallback.Call(cfg)

ctx.db, err = database.ConnectAndInitialize(&cfg.DB)
db, err := database.ConnectAndInitialize(&cfg.DB)
if err != nil {
return nil, err
}
return &ctx, nil

return &indexerContext{
config: cfg,
db: db,
flags: flags,
}, nil
}

func (c *indexerContext) Config() *config.Config { return c.config }

func (c *indexerContext) DB() *gorm.DB { return c.db }

func (c *indexerContext) Flags() *IndexerFlags { return c.flags }

func parseIndexerFlags() *IndexerFlags {
cfgFlag := flag.String("config", globalConfig.CONFIG_FILE, "Configuration file (toml format)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a general comment, personally I like using the argparse library for parsing command-line args, I prefer it to the builtin flag package. Not expecting you to change it here but just in case you want to check it out.

resetVotingFlag := flag.Int64("reset-voting", 0, "Set start epoch for voting cronjob to this value, overrides config and database value, valid values are > 0")
resetMirrorFlag := flag.Int64("reset-mirroring", 0, "Set start epoch for mirroring cronjob to this value, overrides config and database value, valid values are > 0")
flag.Parse()

return &IndexerFlags{
ConfigFileName: *cfgFlag,
ResetVotingCronjob: *resetVotingFlag,
ResetMirrorCronjob: *resetMirrorFlag,
}
}
24 changes: 20 additions & 4 deletions indexer/cronjob/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type mirrorCronJob struct {

type mirrorDB interface {
FetchState(name string) (database.State, error)
UpdateJobState(epoch int64) error
UpdateJobState(epoch int64, force bool) error
GetPChainTxsForEpoch(start, end time.Time) ([]database.PChainTxData, error)
GetPChainTx(txID string, address string) (*database.PChainTxData, error)
}
Expand All @@ -56,11 +56,13 @@ func NewMirrorCronjob(ctx indexerctx.IndexerContext) (Cronjob, error) {
return nil, err
}

return &mirrorCronJob{
mc := &mirrorCronJob{
epochCronjob: newEpochCronjob(&cfg.Mirror.CronjobConfig, &cfg.Epochs),
db: NewMirrorDBGorm(ctx.DB()),
contracts: contracts,
}, nil
}
mc.reset(ctx.Flags().ResetMirrorCronjob)
return mc, nil
}

func (c *mirrorCronJob) Name() string {
Expand Down Expand Up @@ -108,7 +110,7 @@ func (c *mirrorCronJob) Call() error {

logger.Debug("successfully mirrored epochs %d-%d", epochRange.start, epochRange.end)

if err := c.db.UpdateJobState(epochRange.end); err != nil {
if err := c.db.UpdateJobState(epochRange.end+1, false); err != nil {
return err
}

Expand Down Expand Up @@ -323,3 +325,17 @@ func (c *mirrorCronJob) registerAddress(txID string, address string) error {
}
return nil
}

func (c *mirrorCronJob) reset(firstEpoch int64) error {
if firstEpoch <= 0 {
return nil
}

logger.Info("Resetting mirroring cronjob state to epoch %d", firstEpoch)
err := c.db.UpdateJobState(firstEpoch, true)
if err != nil {
return err
}
c.epochs.First = firstEpoch
return nil
}
4 changes: 2 additions & 2 deletions indexer/cronjob/mirror_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func (m mirrorDBGorm) FetchState(name string) (database.State, error) {
return database.FetchState(m.db, name)
}

func (m mirrorDBGorm) UpdateJobState(epoch int64) error {
func (m mirrorDBGorm) UpdateJobState(epoch int64, force bool) error {
return m.db.Transaction(func(tx *gorm.DB) error {
jobState, err := database.FetchState(tx, mirrorStateName)
if err != nil {
return errors.Wrap(err, "database.FetchState")
}

if jobState.NextDBIndex >= uint64(epoch) {
if !force && jobState.NextDBIndex >= uint64(epoch) {
logger.Debug("job state already up to date")
return nil
}
Expand Down
11 changes: 5 additions & 6 deletions indexer/cronjob/mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestOneTransaction(t *testing.T) {

db := testMirror(t, txs, contracts, epochs)

require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(3))
require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(4))
}

func TestMultipleTransactionsInEpoch(t *testing.T) {
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestMultipleTransactionsInEpoch(t *testing.T) {

db := testMirror(t, txsMap, contracts, epochs)

require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(3))
require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(4))
}

func TestMultipleTransactionsInSeparateEpochs(t *testing.T) {
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestMultipleTransactionsInSeparateEpochs(t *testing.T) {

db := testMirror(t, txsMap, contracts, epochs)

require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(2))
require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(3))
}

func TestAlreadyMirrored(t *testing.T) {
Expand Down Expand Up @@ -225,7 +225,7 @@ func testMirrorErrors(t *testing.T, errorMsg string) {

db := testMirror(t, txs, contracts, epochs)

require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(3))
require.Equal(t, db.states[mirrorStateName].NextDBIndex, uint64(4))
}

func initEpochs() staking.EpochInfo {
Expand Down Expand Up @@ -288,12 +288,11 @@ func (db testDB) FetchState(name string) (database.State, error) {
return state, nil
}

func (db testDB) UpdateJobState(epoch int64) error {
func (db testDB) UpdateJobState(epoch int64, force bool) error {
db.states[mirrorStateName] = database.State{
Name: mirrorStateName,
NextDBIndex: uint64(epoch),
}

return nil
}

Expand Down
41 changes: 36 additions & 5 deletions indexer/cronjob/uptime_voting.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type uptimeVotingCronjob struct {
// It is set to the last finished aggregation epoch
lastAggregatedEpoch int64

// Delete all uptimes that are older than the current epoch-deleteOldUptimesEpochThreshold
// If deleteOldUptimesEpochThreshold is set to 0, no uptimes will be deleted
// If it is set to > 0, minimum is 5
deleteOldUptimesEpochThreshold int64

uptimeThreshold float64

votingContract *voting.Voting
Expand Down Expand Up @@ -70,11 +75,12 @@ func NewUptimeVotingCronjob(ctx context.IndexerContext) (*uptimeVotingCronjob, e
timeout: config.Timeout,
epochs: staking.NewEpochInfo(&ctx.Config().UptimeCronjob.EpochConfig),
},
lastAggregatedEpoch: -1,
uptimeThreshold: config.UptimeThreshold,
votingContract: votingContract,
txOpts: txOpts,
db: ctx.DB(),
lastAggregatedEpoch: -1,
deleteOldUptimesEpochThreshold: config.DeleteOldUptimesEpochThreshold,
uptimeThreshold: config.UptimeThreshold,
votingContract: votingContract,
txOpts: txOpts,
db: ctx.DB(),
}, nil

}
Expand Down Expand Up @@ -136,6 +142,12 @@ func (c *uptimeVotingCronjob) Call() error {
return fmt.Errorf("failed persisting uptime aggregations %w", err)
}
c.lastAggregatedEpoch = lastAggregatedEpoch

err = c.deleteOldUptimes()
if err != nil {
// Error is non-fatal, we only log it
logger.Error("Failed deleting old uptimes: %v", err)
}
return nil
}

Expand Down Expand Up @@ -248,6 +260,25 @@ func (c *uptimeVotingCronjob) submitVotes(epoch int64, nodeAggregations []*datab
return err
}

func (c *uptimeVotingCronjob) deleteOldUptimes() error {
if c.deleteOldUptimesEpochThreshold <= 0 {
return nil
}

var lastEpochToDelete int64
if c.deleteOldUptimesEpochThreshold < 5 {
lastEpochToDelete = c.lastAggregatedEpoch - 5
} else {
lastEpochToDelete = c.lastAggregatedEpoch - c.deleteOldUptimesEpochThreshold
}
if lastEpochToDelete < 0 {
return nil
}

_, epochEnd := c.epochs.GetTimeRange(lastEpochToDelete)
return database.DeleteUptimesBefore(c.db, epochEnd)
}

type nodeStakingInterval struct {
nodeID string
start int64
Expand Down
29 changes: 27 additions & 2 deletions indexer/cronjob/voting.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ func NewVotingCronjob(ctx indexerctx.IndexerContext) (*votingCronjob, error) {
return nil, err
}

return &votingCronjob{
vc := &votingCronjob{
epochCronjob: newEpochCronjob(&cfg.VotingCronjob.CronjobConfig, &cfg.Epochs),
db: db,
contract: contract,
}, nil
}

err = vc.reset(ctx.Flags().ResetVotingCronjob)
if err != nil {
return nil, err
}
return vc, nil
}

func (c *votingCronjob) Name() string {
Expand Down Expand Up @@ -137,3 +143,22 @@ func (c *votingCronjob) submitVotes(e int64, votingData []database.PChainTxData)
err = c.contract.SubmitVote(big.NewInt(e), [32]byte(merkleRoot))
return err
}

func (c *votingCronjob) reset(firstEpoch int64) error {
if firstEpoch <= 0 {
return nil
}

logger.Info("Resetting voting cronjob state to epoch %d", firstEpoch)
state, err := c.db.FetchState(votingStateName)
if err != nil {
return err
}
state.NextDBIndex = uint64(firstEpoch)
err = c.db.UpdateState(&state)
if err != nil {
return err
}
c.epochs.First = firstEpoch
return nil
}
3 changes: 1 addition & 2 deletions services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func (c Config) ChainConfig() config.ChainConfig {
return c.Chain
}

func BuildConfig() (*Config, error) {
cfgFileName := config.ConfigFileName()
func BuildConfig(cfgFileName string) (*Config, error) {
cfg := newConfig()
err := config.ParseConfigFile(cfg, cfgFileName, false)
if err != nil {
Expand Down
Loading
Loading