Skip to content

Commit

Permalink
Avoid duplicate polling loops with context (#196)
Browse files Browse the repository at this point in the history
* Setup all polling metrics with a context that is cancelled when a new websocket is opened

* Updatate libs

* libs 0.16.1
  • Loading branch information
cmmarslender authored Nov 12, 2024
1 parent 755bcad commit 2f6189b
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/chia-network/chia-exporter
go 1.21

require (
github.com/chia-network/go-chia-libs v0.16.0
github.com/chia-network/go-chia-libs v0.16.1
github.com/chia-network/go-modules v0.0.8
github.com/go-sql-driver/mysql v1.8.1
github.com/oschwald/maxminddb-golang v1.13.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chia-network/go-chia-libs v0.16.0 h1:xIgzoNV+iPe7jGtIhJJDcVy2hwhnCTKFYeOQhkWTwpQ=
github.com/chia-network/go-chia-libs v0.16.0/go.mod h1:npTqaFSjTdMxE7hc0LOmWJmWGqcs+IERarK5fDxXk/I=
github.com/chia-network/go-chia-libs v0.16.1 h1:oMOfvGodh7aCXX0xGFGU6NtmC+u/bz5BUptUbirH1jI=
github.com/chia-network/go-chia-libs v0.16.1/go.mod h1:npTqaFSjTdMxE7hc0LOmWJmWGqcs+IERarK5fDxXk/I=
github.com/chia-network/go-modules v0.0.8 h1:VATMxehRISOhaRwPo/GL735IKWW0G7sUYH2OmBofsfE=
github.com/chia-network/go-modules v0.0.8/go.mod h1:OdvlWftyJc3+i3QYv5cfQsiQASL7Em7fJnzdmPmj07M=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
Expand Down
3 changes: 2 additions & 1 deletion internal/metrics/crawler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *CrawlerServiceMetrics) InitialData() {
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *CrawlerServiceMetrics) SetupPollingMetrics() {}
func (s *CrawlerServiceMetrics) SetupPollingMetrics(ctx context.Context) {}

// Disconnected clears/unregisters metrics when the connection drops
func (s *CrawlerServiceMetrics) Disconnected() {
Expand Down
3 changes: 2 additions & 1 deletion internal/metrics/farmer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"encoding/json"
"fmt"

Expand Down Expand Up @@ -102,7 +103,7 @@ func (s *FarmerServiceMetrics) InitialData() {
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *FarmerServiceMetrics) SetupPollingMetrics() {}
func (s *FarmerServiceMetrics) SetupPollingMetrics(ctx context.Context) {}

// Disconnected clears/unregisters metrics when the connection drops
func (s *FarmerServiceMetrics) Disconnected() {
Expand Down
21 changes: 14 additions & 7 deletions internal/metrics/fullnode.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -164,19 +165,25 @@ func (s *FullNodeServiceMetrics) InitialData() {
utils.LogErr(s.metrics.client.FullNodeService.GetBlockchainState()) // Also calls get_connections once we get the response
utils.LogErr(s.metrics.client.FullNodeService.GetBlockCountMetrics())
s.GetFeeEstimates()
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *FullNodeServiceMetrics) SetupPollingMetrics(ctx context.Context) {
// Things that update in the background
go func() {
go func(ctx context.Context) {
for {
s.RefreshFileSizes()
time.Sleep(30 * time.Second)
select {
case <-ctx.Done():
// Exit the loop if the context is canceled
return
default:
s.RefreshFileSizes()
time.Sleep(30 * time.Second)
}
}
}()
}(ctx)
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *FullNodeServiceMetrics) SetupPollingMetrics() {}

// Disconnected clears/unregisters metrics when the connection drops
func (s *FullNodeServiceMetrics) Disconnected() {
s.version.Reset()
Expand Down
18 changes: 13 additions & 5 deletions internal/metrics/harvester.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -83,13 +84,20 @@ func (s *HarvesterServiceMetrics) InitialData() {
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *HarvesterServiceMetrics) SetupPollingMetrics() {
go func() {
func (s *HarvesterServiceMetrics) SetupPollingMetrics(ctx context.Context) {
// Things that update in the background
go func(ctx context.Context) {
for {
utils.LogErr(s.metrics.client.HarvesterService.GetConnections(&rpc.GetConnectionsOptions{}))
time.Sleep(15 * time.Second)
select {
case <-ctx.Done():
// Exit the loop if the context is canceled
return
default:
utils.LogErr(s.metrics.client.HarvesterService.GetConnections(&rpc.GetConnectionsOptions{}))
time.Sleep(15 * time.Second)
}
}
}()
}(ctx)
}

func (s *HarvesterServiceMetrics) httpGetPlots() {
Expand Down
7 changes: 5 additions & 2 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -42,7 +43,7 @@ type serviceMetrics interface {

// SetupPollingMetrics Some services need data that doesn't have a good event to hook into
// In those cases, we have to fall back to polling
SetupPollingMetrics()
SetupPollingMetrics(ctx context.Context)

// ReceiveResponse is called when a response is received for the particular metrics service
ReceiveResponse(*types.WebsocketResponse)
Expand Down Expand Up @@ -332,9 +333,10 @@ func (m *Metrics) OpenWebsocket() error {
}
}

ctx, cancel := context.WithCancel(context.Background())
for _, service := range m.serviceMetrics {
service.InitialData()
service.SetupPollingMetrics()
service.SetupPollingMetrics(ctx)
}

m.lastReceive = time.Now()
Expand All @@ -344,6 +346,7 @@ func (m *Metrics) OpenWebsocket() error {
time.Sleep(10 * time.Second)

if m.lastReceive.Before(time.Now().Add(-5 * time.Minute)) {
cancel()
log.Info("Websocket connection seems down. Recreating...")
m.disconnectHandler()
err := m.setNewClient()
Expand Down
3 changes: 2 additions & 1 deletion internal/metrics/timelord.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"encoding/json"

"github.com/chia-network/go-chia-libs/pkg/rpc"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (s *TimelordServiceMetrics) InitialData() {
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *TimelordServiceMetrics) SetupPollingMetrics() {}
func (s *TimelordServiceMetrics) SetupPollingMetrics(ctx context.Context) {}

// Disconnected clears/unregisters metrics when the connection drops
func (s *TimelordServiceMetrics) Disconnected() {
Expand Down
18 changes: 13 additions & 5 deletions internal/metrics/wallet.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -73,13 +74,20 @@ func (s *WalletServiceMetrics) InitialData() {
}

// SetupPollingMetrics starts any metrics that happen on an interval
func (s *WalletServiceMetrics) SetupPollingMetrics() {
go func() {
func (s *WalletServiceMetrics) SetupPollingMetrics(ctx context.Context) {
// Things that update in the background
go func(ctx context.Context) {
for {
utils.LogErr(s.metrics.client.WalletService.GetConnections(&rpc.GetConnectionsOptions{}))
time.Sleep(15 * time.Second)
select {
case <-ctx.Done():
// Exit the loop if the context is canceled
return
default:
utils.LogErr(s.metrics.client.WalletService.GetConnections(&rpc.GetConnectionsOptions{}))
time.Sleep(15 * time.Second)
}
}
}()
}(ctx)
}

// Disconnected clears/unregisters metrics when the connection drops
Expand Down

0 comments on commit 2f6189b

Please sign in to comment.