diff --git a/src/utils/config/warpy_syncer.go b/src/utils/config/warpy_syncer.go index 14ad2558..608691a0 100644 --- a/src/utils/config/warpy_syncer.go +++ b/src/utils/config/warpy_syncer.go @@ -23,6 +23,9 @@ type WarpySyncer struct { // Max time between failed retries to download block BlockDownloaderBackoffInterval time.Duration + // Time between poller task is called from block downloader + BlockDownloaderPollerInterval int64 + // Warpy contract id SyncerContractId string @@ -101,9 +104,6 @@ type WarpySyncer struct { // How long does it wait for the query response PollerSommelierTimeout time.Duration - // Cron which indicates how often to poll for new records - PollerSommelierCron string - // Base for the points multiplication PollerSommelierPointsBase int64 @@ -123,6 +123,7 @@ func setWarpySyncerDefaults() { viper.SetDefault("WarpySyncer.BlockDownloaderBatchSize", 100) viper.SetDefault("WarpySyncer.BlockDownloaderBackoffInterval", "3s") viper.SetDefault("WarpySyncer.BlockDownloaderChannelSize", 100) + viper.SetDefault("WarpySyncer.BlockDownloaderPollerInterval", 3600) viper.SetDefault("WarpySyncer.SyncerContractId", "p5OI99-BaY4QbZts266T7EDwofZqs-wVuYJmMCS0SUU") viper.SetDefault("WarpySyncer.SyncerNameServiceContractId", "p5OI99-BaY4QbZts266T7EDwofZqs-wVuYJmMCS0SUU") viper.SetDefault("WarpySyncer.SyncerChain", eth.Arbitrum) @@ -149,7 +150,6 @@ func setWarpySyncerDefaults() { viper.SetDefault("WarpySyncer.PollerSommelierChannelBufferLength", 100) viper.SetDefault("WarpySyncer.PollerSommelierInterval", "1m") viper.SetDefault("WarpySyncer.PollerSommelierTimeout", "90s") - viper.SetDefault("WarpySyncer.PollerSommelierCron", "0 * * * * *") viper.SetDefault("WarpySyncer.PollerSommelierPointsBase", 1000) viper.SetDefault("WarpySyncer.PollerSommelierSecondsForSelect", 3600) viper.SetDefault("WarpySyncer.WriterBackoffInterval", "3s") diff --git a/src/warpy_sync/block_downloader.go b/src/warpy_sync/block_downloader.go index 49b14e31..057d1fd4 100644 --- a/src/warpy_sync/block_downloader.go +++ b/src/warpy_sync/block_downloader.go @@ -3,6 +3,7 @@ package warpy_sync import ( "context" "errors" + "math" "math/big" "runtime" "sync" @@ -20,7 +21,10 @@ import ( type BlockDownloader struct { *task.Task lastSyncedBlockHeight int64 + pollerCron bool + nextPollBlockHeight int64 Output chan *BlockInfoPayload + OutputPollTxs chan uint64 monitor monitoring.Monitor ethClient *ethclient.Client } @@ -33,6 +37,7 @@ type BlockDownloader struct { func NewBlockDownloader(config *config.Config) (self *BlockDownloader) { self = new(BlockDownloader) self.Output = make(chan *BlockInfoPayload, config.WarpySyncer.BlockDownloaderChannelSize) + self.OutputPollTxs = make(chan uint64, config.WarpySyncer.BlockDownloaderChannelSize) self.Task = task.NewTask(config, "block-downloader"). WithPeriodicSubtaskFunc(config.WarpySyncer.BlockDownloaderInterval, self.run). @@ -49,6 +54,11 @@ func (self *BlockDownloader) WithMonitor(monitor monitoring.Monitor) *BlockDownl return self } +func (self *BlockDownloader) WithPollerCron() *BlockDownloader { + self.pollerCron = true + return self +} + func (self *BlockDownloader) WithEthClient(ethClient *ethclient.Client) *BlockDownloader { self.ethClient = ethClient return self @@ -56,19 +66,26 @@ func (self *BlockDownloader) WithEthClient(ethClient *ethclient.Client) *BlockDo func (self *BlockDownloader) WithInitStartBlockHeight(db *gorm.DB, syncedComponent model.SyncedComponent) *BlockDownloader { self.Task = self.Task.WithOnBeforeStart(func() (err error) { - var lastSyncedBlockHeight int64 + var LastSyncedBlock struct { + FinishedBlockHeight int64 + FinishedBlockTimestamp int64 + } + err = db.WithContext(self.Ctx). - Raw(`SELECT finished_block_height + Raw(`SELECT finished_block_height, finished_block_timestamp FROM sync_state WHERE name = ?;`, syncedComponent). - Scan(&lastSyncedBlockHeight).Error + Scan(&LastSyncedBlock).Error if err != nil { self.Log.WithError(err).Error("Failed to get last synced block height") return } - self.lastSyncedBlockHeight = lastSyncedBlockHeight + self.lastSyncedBlockHeight = LastSyncedBlock.FinishedBlockHeight + if self.pollerCron { + self.nextPollBlockHeight = self.calculateNextFullBlockHeight(self.lastSyncedBlockHeight, LastSyncedBlock.FinishedBlockTimestamp) + } return nil }) return self @@ -118,6 +135,7 @@ func (self *BlockDownloader) downloadBlocks(blocks []int64) (err error) { for _, height := range blocks { height := height + nextPollBlockHeight := self.nextPollBlockHeight self.SubmitToWorker(func() { block, err := self.downloadBlock(height) if err != nil { @@ -134,6 +152,12 @@ func (self *BlockDownloader) downloadBlocks(blocks []int64) (err error) { Hash: block.Hash().String(), Timestamp: block.Time(), }: + if self.pollerCron && height == nextPollBlockHeight { + self.OutputPollTxs <- block.Number().Uint64() + + self.nextPollBlockHeight = self.calculateNextFullBlockHeight(block.Number().Int64(), int64(block.Time())) + self.Log.WithField("next_poll_block_height", self.nextPollBlockHeight).Debug("Next poll block height has been set") + } } end: @@ -186,3 +210,10 @@ func (self *BlockDownloader) getBlockInfo(blockNumber int64) (blockInfo *types.B } return } + +func (self *BlockDownloader) calculateNextFullBlockHeight(lastSyncedBlockHeight int64, lastSyncedBlockTimestamp int64) int64 { + nextPollBlockTimestamp := (lastSyncedBlockTimestamp - (lastSyncedBlockTimestamp % self.Config.WarpySyncer.BlockDownloaderPollerInterval)) + self.Config.WarpySyncer.BlockDownloaderPollerInterval + timestampDiff := nextPollBlockTimestamp - lastSyncedBlockTimestamp + blocksDiff := math.Round(float64(timestampDiff) / float64(0.26)) + return lastSyncedBlockHeight + int64(blocksDiff) +} diff --git a/src/warpy_sync/controller.go b/src/warpy_sync/controller.go index 0e09dde4..e27704f3 100644 --- a/src/warpy_sync/controller.go +++ b/src/warpy_sync/controller.go @@ -101,10 +101,13 @@ func NewController(config *config.Config) (self *Controller, err error) { WithContractAbi(contractAbi). WithDb(db) + blockDownloader.WithPollerCron() + // Polls records from db poller := NewPollerSommelier(config). WithDB(db). - WithMonitor(monitor) + WithMonitor(monitor). + WithInputChannel(blockDownloader.OutputPollTxs) // Writes interaction to Warpy based on the records from the poller writer := NewWriter(config). diff --git a/src/warpy_sync/poller_sommelier.go b/src/warpy_sync/poller_sommelier.go index d0fa9592..3e7e9bec 100644 --- a/src/warpy_sync/poller_sommelier.go +++ b/src/warpy_sync/poller_sommelier.go @@ -11,15 +11,15 @@ import ( "gorm.io/gorm" ) -// Periodically gets new evolved contract sources which are not yet in the db type PollerSommelier struct { *task.Task db *gorm.DB monitor monitoring.Monitor - // Evolve to be sent out Output chan *InteractionPayload + + input chan uint64 } func NewPollerSommelier(config *config.Config) (self *PollerSommelier) { @@ -27,8 +27,8 @@ func NewPollerSommelier(config *config.Config) (self *PollerSommelier) { self.Output = make(chan *InteractionPayload, config.WarpySyncer.PollerSommelierChannelBufferLength) - self.Task = task.NewTask(config, "poller"). - WithCronSubtaskFunc(config.WarpySyncer.PollerSommelierCron, self.handleNew). + self.Task = task.NewTask(config, "poller_sommelier"). + WithSubtaskFunc(self.handleNew). WithOnAfterStop(func() { close(self.Output) }) @@ -46,52 +46,59 @@ func (self *PollerSommelier) WithMonitor(monitor monitoring.Monitor) *PollerSomm return self } -func (self *PollerSommelier) handleNew() (err error) { - self.Log.Debug("Checking for new assets sums...") - ctx, cancel := context.WithTimeout(self.Ctx, self.Config.WarpySyncer.PollerSommelierTimeout) - defer cancel() +func (self *PollerSommelier) WithInputChannel(v chan uint64) *PollerSommelier { + self.input = v + return self +} - var AssetsSums []struct { - FromAddress string - Sum float64 - } - err = self.db.WithContext(ctx). - Raw(`SELECT from_address, +func (self *PollerSommelier) handleNew() (err error) { + for block := range self.input { + block := block + self.Log.WithField("block_height", block).Debug("Checking for new assets sums...") + ctx, cancel := context.WithTimeout(self.Ctx, self.Config.WarpySyncer.PollerSommelierTimeout) + defer cancel() + + var AssetsSums []struct { + FromAddress string + Sum float64 + } + err = self.db.WithContext(ctx). + Raw(`SELECT from_address, SUM(assets) FROM warpy_syncer_assets WHERE timestamp < ? group by from_address; `, time.Now().Unix()-self.Config.WarpySyncer.PollerSommelierSecondsForSelect). - Scan(&AssetsSums).Error + Scan(&AssetsSums).Error - if err != nil { - if err != gorm.ErrRecordNotFound { - self.Log.WithError(err).Error("Failed to get new assets sums") - self.monitor.GetReport().WarpySyncer.Errors.PollerSommelierFetchError.Inc() + if err != nil { + if err != gorm.ErrRecordNotFound { + self.Log.WithError(err).Error("Failed to get new assets sums") + self.monitor.GetReport().WarpySyncer.Errors.PollerSommelierFetchError.Inc() + } + return } - return - } - if len(AssetsSums) > 0 { - self.Log. - WithField("count", len(AssetsSums)). - Debug("Polled new assets sum") - } else { - self.Log.Debug("No new assets sum found") - } + if len(AssetsSums) > 0 { + self.Log. + WithField("count", len(AssetsSums)). + Debug("Polled new assets sum") + } else { + self.Log.Debug("No new assets sum found") + } - for _, sum := range AssetsSums { + for _, sum := range AssetsSums { - self.monitor.GetReport().WarpySyncer.State.PollerSommelierAssetsFromSelects.Inc() + self.monitor.GetReport().WarpySyncer.State.PollerSommelierAssetsFromSelects.Inc() - select { - case <-self.Ctx.Done(): - return - case self.Output <- &InteractionPayload{ - FromAddress: sum.FromAddress, - Points: int64(sum.Sum * float64(self.Config.WarpySyncer.PollerSommelierPointsBase)), - }: + select { + case <-self.Ctx.Done(): + return + case self.Output <- &InteractionPayload{ + FromAddress: sum.FromAddress, + Points: int64(sum.Sum * float64(self.Config.WarpySyncer.PollerSommelierPointsBase)), + }: + } } } - return }