Skip to content

Commit

Permalink
WarpySyncer: replace poller_sommelier cron subtask
Browse files Browse the repository at this point in the history
  • Loading branch information
asiaziola committed Apr 15, 2024
1 parent d14126e commit 71da165
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 47 deletions.
8 changes: 4 additions & 4 deletions src/utils/config/warpy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type WarpySyncer struct {
// Max time between failed retries to download block
BlockDownloaderBackoffInterval time.Duration

// Time between poller task is called from block downloader
BlockDownloaderPollerInterval int64

// Warpy contract id
SyncerContractId string

Expand Down Expand Up @@ -101,9 +104,6 @@ type WarpySyncer struct {
// How long does it wait for the query response
PollerSommelierTimeout time.Duration

// Cron which indicates how often to poll for new records
PollerSommelierCron string

// Base for the points multiplication
PollerSommelierPointsBase int64

Expand All @@ -123,6 +123,7 @@ func setWarpySyncerDefaults() {
viper.SetDefault("WarpySyncer.BlockDownloaderBatchSize", 100)
viper.SetDefault("WarpySyncer.BlockDownloaderBackoffInterval", "3s")
viper.SetDefault("WarpySyncer.BlockDownloaderChannelSize", 100)
viper.SetDefault("WarpySyncer.BlockDownloaderPollerInterval", 3600)
viper.SetDefault("WarpySyncer.SyncerContractId", "p5OI99-BaY4QbZts266T7EDwofZqs-wVuYJmMCS0SUU")
viper.SetDefault("WarpySyncer.SyncerNameServiceContractId", "p5OI99-BaY4QbZts266T7EDwofZqs-wVuYJmMCS0SUU")
viper.SetDefault("WarpySyncer.SyncerChain", eth.Arbitrum)
Expand All @@ -149,7 +150,6 @@ func setWarpySyncerDefaults() {
viper.SetDefault("WarpySyncer.PollerSommelierChannelBufferLength", 100)
viper.SetDefault("WarpySyncer.PollerSommelierInterval", "1m")
viper.SetDefault("WarpySyncer.PollerSommelierTimeout", "90s")
viper.SetDefault("WarpySyncer.PollerSommelierCron", "0 * * * * *")
viper.SetDefault("WarpySyncer.PollerSommelierPointsBase", 1000)
viper.SetDefault("WarpySyncer.PollerSommelierSecondsForSelect", 3600)
viper.SetDefault("WarpySyncer.WriterBackoffInterval", "3s")
Expand Down
39 changes: 35 additions & 4 deletions src/warpy_sync/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package warpy_sync
import (
"context"
"errors"
"math"
"math/big"
"runtime"
"sync"
Expand All @@ -20,7 +21,10 @@ import (
type BlockDownloader struct {
*task.Task
lastSyncedBlockHeight int64
pollerCron bool
nextPollBlockHeight int64
Output chan *BlockInfoPayload
OutputPollTxs chan uint64
monitor monitoring.Monitor
ethClient *ethclient.Client
}
Expand All @@ -33,6 +37,7 @@ type BlockDownloader struct {
func NewBlockDownloader(config *config.Config) (self *BlockDownloader) {
self = new(BlockDownloader)
self.Output = make(chan *BlockInfoPayload, config.WarpySyncer.BlockDownloaderChannelSize)
self.OutputPollTxs = make(chan uint64, config.WarpySyncer.BlockDownloaderChannelSize)

self.Task = task.NewTask(config, "block-downloader").
WithPeriodicSubtaskFunc(config.WarpySyncer.BlockDownloaderInterval, self.run).
Expand All @@ -49,26 +54,38 @@ func (self *BlockDownloader) WithMonitor(monitor monitoring.Monitor) *BlockDownl
return self
}

func (self *BlockDownloader) WithPollerCron() *BlockDownloader {
self.pollerCron = true
return self
}

func (self *BlockDownloader) WithEthClient(ethClient *ethclient.Client) *BlockDownloader {
self.ethClient = ethClient
return self
}

func (self *BlockDownloader) WithInitStartBlockHeight(db *gorm.DB, syncedComponent model.SyncedComponent) *BlockDownloader {
self.Task = self.Task.WithOnBeforeStart(func() (err error) {
var lastSyncedBlockHeight int64
var LastSyncedBlock struct {
FinishedBlockHeight int64
FinishedBlockTimestamp int64
}

err = db.WithContext(self.Ctx).
Raw(`SELECT finished_block_height
Raw(`SELECT finished_block_height, finished_block_timestamp
FROM sync_state
WHERE name = ?;`, syncedComponent).
Scan(&lastSyncedBlockHeight).Error
Scan(&LastSyncedBlock).Error

if err != nil {
self.Log.WithError(err).Error("Failed to get last synced block height")
return
}

self.lastSyncedBlockHeight = lastSyncedBlockHeight
self.lastSyncedBlockHeight = LastSyncedBlock.FinishedBlockHeight
if self.pollerCron {
self.nextPollBlockHeight = self.calculateNextFullBlockHeight(self.lastSyncedBlockHeight, LastSyncedBlock.FinishedBlockTimestamp)
}
return nil
})
return self
Expand Down Expand Up @@ -118,6 +135,7 @@ func (self *BlockDownloader) downloadBlocks(blocks []int64) (err error) {

for _, height := range blocks {
height := height
nextPollBlockHeight := self.nextPollBlockHeight
self.SubmitToWorker(func() {
block, err := self.downloadBlock(height)
if err != nil {
Expand All @@ -134,6 +152,12 @@ func (self *BlockDownloader) downloadBlocks(blocks []int64) (err error) {
Hash: block.Hash().String(),
Timestamp: block.Time(),
}:
if self.pollerCron && height == nextPollBlockHeight {
self.OutputPollTxs <- block.Number().Uint64()

self.nextPollBlockHeight = self.calculateNextFullBlockHeight(block.Number().Int64(), int64(block.Time()))
self.Log.WithField("next_poll_block_height", self.nextPollBlockHeight).Debug("Next poll block height has been set")
}
}

end:
Expand Down Expand Up @@ -186,3 +210,10 @@ func (self *BlockDownloader) getBlockInfo(blockNumber int64) (blockInfo *types.B
}
return
}

func (self *BlockDownloader) calculateNextFullBlockHeight(lastSyncedBlockHeight int64, lastSyncedBlockTimestamp int64) int64 {
nextPollBlockTimestamp := (lastSyncedBlockTimestamp - (lastSyncedBlockTimestamp % self.Config.WarpySyncer.BlockDownloaderPollerInterval)) + self.Config.WarpySyncer.BlockDownloaderPollerInterval
timestampDiff := nextPollBlockTimestamp - lastSyncedBlockTimestamp
blocksDiff := math.Round(float64(timestampDiff) / float64(0.26))
return lastSyncedBlockHeight + int64(blocksDiff)
}
5 changes: 4 additions & 1 deletion src/warpy_sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ func NewController(config *config.Config) (self *Controller, err error) {
WithContractAbi(contractAbi).
WithDb(db)

blockDownloader.WithPollerCron()

// Polls records from db
poller := NewPollerSommelier(config).
WithDB(db).
WithMonitor(monitor)
WithMonitor(monitor).
WithInputChannel(blockDownloader.OutputPollTxs)

// Writes interaction to Warpy based on the records from the poller
writer := NewWriter(config).
Expand Down
83 changes: 45 additions & 38 deletions src/warpy_sync/poller_sommelier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@ import (
"gorm.io/gorm"
)

// Periodically gets new evolved contract sources which are not yet in the db
type PollerSommelier struct {
*task.Task

db *gorm.DB
monitor monitoring.Monitor

// Evolve to be sent out
Output chan *InteractionPayload

input chan uint64
}

func NewPollerSommelier(config *config.Config) (self *PollerSommelier) {
self = new(PollerSommelier)

self.Output = make(chan *InteractionPayload, config.WarpySyncer.PollerSommelierChannelBufferLength)

self.Task = task.NewTask(config, "poller").
WithCronSubtaskFunc(config.WarpySyncer.PollerSommelierCron, self.handleNew).
self.Task = task.NewTask(config, "poller_sommelier").
WithSubtaskFunc(self.handleNew).
WithOnAfterStop(func() {
close(self.Output)
})
Expand All @@ -46,52 +46,59 @@ func (self *PollerSommelier) WithMonitor(monitor monitoring.Monitor) *PollerSomm
return self
}

func (self *PollerSommelier) handleNew() (err error) {
self.Log.Debug("Checking for new assets sums...")
ctx, cancel := context.WithTimeout(self.Ctx, self.Config.WarpySyncer.PollerSommelierTimeout)
defer cancel()
func (self *PollerSommelier) WithInputChannel(v chan uint64) *PollerSommelier {
self.input = v
return self
}

var AssetsSums []struct {
FromAddress string
Sum float64
}
err = self.db.WithContext(ctx).
Raw(`SELECT from_address,
func (self *PollerSommelier) handleNew() (err error) {
for block := range self.input {
block := block
self.Log.WithField("block_height", block).Debug("Checking for new assets sums...")
ctx, cancel := context.WithTimeout(self.Ctx, self.Config.WarpySyncer.PollerSommelierTimeout)
defer cancel()

var AssetsSums []struct {
FromAddress string
Sum float64
}
err = self.db.WithContext(ctx).
Raw(`SELECT from_address,
SUM(assets)
FROM warpy_syncer_assets
WHERE timestamp < ? group by from_address;
`, time.Now().Unix()-self.Config.WarpySyncer.PollerSommelierSecondsForSelect).
Scan(&AssetsSums).Error
Scan(&AssetsSums).Error

if err != nil {
if err != gorm.ErrRecordNotFound {
self.Log.WithError(err).Error("Failed to get new assets sums")
self.monitor.GetReport().WarpySyncer.Errors.PollerSommelierFetchError.Inc()
if err != nil {
if err != gorm.ErrRecordNotFound {
self.Log.WithError(err).Error("Failed to get new assets sums")
self.monitor.GetReport().WarpySyncer.Errors.PollerSommelierFetchError.Inc()
}
return
}
return
}

if len(AssetsSums) > 0 {
self.Log.
WithField("count", len(AssetsSums)).
Debug("Polled new assets sum")
} else {
self.Log.Debug("No new assets sum found")
}
if len(AssetsSums) > 0 {
self.Log.
WithField("count", len(AssetsSums)).
Debug("Polled new assets sum")
} else {
self.Log.Debug("No new assets sum found")
}

for _, sum := range AssetsSums {
for _, sum := range AssetsSums {

self.monitor.GetReport().WarpySyncer.State.PollerSommelierAssetsFromSelects.Inc()
self.monitor.GetReport().WarpySyncer.State.PollerSommelierAssetsFromSelects.Inc()

select {
case <-self.Ctx.Done():
return
case self.Output <- &InteractionPayload{
FromAddress: sum.FromAddress,
Points: int64(sum.Sum * float64(self.Config.WarpySyncer.PollerSommelierPointsBase)),
}:
select {
case <-self.Ctx.Done():
return
case self.Output <- &InteractionPayload{
FromAddress: sum.FromAddress,
Points: int64(sum.Sum * float64(self.Config.WarpySyncer.PollerSommelierPointsBase)),
}:
}
}
}

return
}

0 comments on commit 71da165

Please sign in to comment.