Skip to content

Commit

Permalink
Monitor errors in listener #9
Browse files Browse the repository at this point in the history
  • Loading branch information
janekolszak committed Jan 26, 2023
1 parent 81ff480 commit 2b88ac8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (self *Controller) run() (err error) {

// Listening for arweave transactions
listener := listener.NewListener(self.config).
WithMonitor(self.monitor).
WithStartHeight(startHeight).
WithClient(client)

Expand Down
15 changes: 14 additions & 1 deletion src/utils/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syncer/src/utils/config"
"syncer/src/utils/logger"
"syncer/src/utils/model"
"syncer/src/utils/monitor"
"syncer/src/utils/warp"
"time"

Expand All @@ -23,6 +24,7 @@ type Listener struct {
config *config.Config
log *logrus.Entry
interactionParser *warp.InteractionParser
monitor *monitor.Monitor

// Stopping
isStopping *atomic.Bool
Expand Down Expand Up @@ -85,6 +87,11 @@ func NewListener(config *config.Config) (self *Listener) {
return
}

func (self *Listener) WithMonitor(monitor *monitor.Monitor) *Listener {
self.monitor = monitor
return self
}

func (self *Listener) WithClient(client *arweave.Client) *Listener {
self.client = client
return self
Expand Down Expand Up @@ -145,6 +152,7 @@ func (self *Listener) monitorNetwork() {
networkInfo, err := self.client.GetNetworkInfo(ctx)
if err != nil {
self.log.WithError(err).Error("Failed to get Arweave network info")
self.monitor.Increment(monitor.Kind(monitor.NetworkInfoDownloadErrors))
return
}

Expand Down Expand Up @@ -207,6 +215,8 @@ func (self *Listener) monitorBlocks() {
// This will completly reset the HTTP client and possibly help in solving the problem
self.client.Reset()

self.monitor.Increment(monitor.Kind(monitor.BlockDownloadErrors))

time.Sleep(self.config.ListenerRetryFailedTransactionDownloadInterval)
if self.isStopping.Load() {
// Neglect this block and close the goroutine
Expand All @@ -220,7 +230,7 @@ func (self *Listener) monitorBlocks() {
if !block.IsValid() {
self.log.WithField("height", height).Panic("Block hash isn't valid")
// self.log.WithField("height", height).Error("Block hash isn't valid, blacklisting peer for ever and retrying")

self.monitor.Increment(monitor.Kind(monitor.BlockValidationErrors))
goto retry
// FIXME: Inform downstream something's wrong
// FIXME: Blacklist peer, retry downloading block
Expand Down Expand Up @@ -282,6 +292,8 @@ func (self *Listener) downloadTransactions(block *arweave.Block) (out []*arweave
// This will completly reset the HTTP client and possibly help in solving the problem
self.client.Reset()

self.monitor.Increment(monitor.Kind(monitor.TxDownloadErrors))

time.Sleep(self.config.ListenerRetryFailedTransactionDownloadInterval)
if self.isStopping.Load() {
// Neglect this block and close the goroutine
Expand Down Expand Up @@ -382,6 +394,7 @@ func (self *Listener) verifyTransactions(transactions []*arweave.Transaction) (e
for _, tx := range transactions {
err = tx.Verify()
if err != nil {
self.monitor.Increment(monitor.Kind(monitor.TxValidationErrors))
self.log.Error("Transaction failed to verify")
return
}
Expand Down

0 comments on commit 2b88ac8

Please sign in to comment.