Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:vocdoni/vocdoni-node into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
p4u committed Oct 10, 2023
2 parents 5fe633b + 00f284e commit 0207f21
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 170 deletions.
4 changes: 1 addition & 3 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/metrics"
"go.vocdoni.io/dvote/types"
)

Expand All @@ -19,8 +18,7 @@ type Storage interface {
Unpin(ctx context.Context, path string) error
ListPins(ctx context.Context) (map[string]string, error)
URIprefix() string
Stats(ctx context.Context) map[string]any
CollectMetrics(ctx context.Context, ma *metrics.Agent) error
Stats() map[string]any
Stop() error
}

Expand Down
7 changes: 1 addition & 6 deletions data/datamocktest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/metrics"
"go.vocdoni.io/dvote/test/testcommon/testutil"
"go.vocdoni.io/dvote/types"
)
Expand Down Expand Up @@ -80,11 +79,7 @@ func (d *DataMockTest) URIprefix() string {
return d.prefix
}

func (*DataMockTest) Stats(_ context.Context) map[string]any {
return nil
}

func (*DataMockTest) CollectMetrics(_ context.Context, _ *metrics.Agent) error {
func (*DataMockTest) Stats() map[string]any {
return nil
}

Expand Down
40 changes: 26 additions & 14 deletions data/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (i *Handler) Init(d *types.DataStore) error {
return err
}

i.registerMetrics()

return nil
}

Expand Down Expand Up @@ -207,39 +209,49 @@ func (i *Handler) Unpin(ctx context.Context, path string) error {
}

// Stats returns stats about the IPFS node.
func (i *Handler) Stats(ctx context.Context) map[string]any {
peers, err := i.CoreAPI.Swarm().Peers(ctx)
if err != nil {
return map[string]any{"error": err.Error()}
}
addresses, err := i.CoreAPI.Swarm().KnownAddrs(ctx)
func (i *Handler) Stats() map[string]any {
return map[string]any{"peers": i.countPeers(), "addresses": i.countKnownAddrs(), "pins": i.countPins()}
}

func (i *Handler) countPeers() float64 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
v, err := i.CoreAPI.Swarm().Peers(ctx)
if err != nil {
return map[string]any{"error": err.Error()}
return -1
}
pins, err := i.countPins(ctx)
return float64(len(v))
}

func (i *Handler) countKnownAddrs() float64 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
v, err := i.CoreAPI.Swarm().KnownAddrs(ctx)
if err != nil {
return map[string]any{"error": err.Error()}
return -1
}
return map[string]any{"peers": len(peers), "addresses": len(addresses), "pins": pins}
return float64(len(v))
}

func (i *Handler) countPins(ctx context.Context) (int, error) {
func (i *Handler) countPins() float64 {
// Note that pins is a channel that gets closed when finished.
// We MUST range over the entire channel to not leak goroutines.
// Maybe there is a way to get the total number of pins without
// iterating over them?
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
pins, err := i.CoreAPI.Pin().Ls(ctx)
if err != nil {
return 0, err
return -1
}
count := 0
for pin := range pins {
if err := pin.Err(); err != nil {
return 0, err
return -1
}
count++
}
return count, nil
return float64(count)
}

// ListPins returns a map of all pinned CIDs and their types
Expand Down
73 changes: 13 additions & 60 deletions data/ipfs/metrics.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,31 @@
package ipfs

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"

"go.vocdoni.io/dvote/metrics"
)

// File collectors
var (
// FilePeers ...
FilePeers = prometheus.NewGauge(prometheus.GaugeOpts{
// registerMetrics registers prometheus metrics
func (i *Handler) registerMetrics() {
metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "file",
Name: "peers",
Help: "The number of connected peers",
})
// FileAddresses ...
FileAddresses = prometheus.NewGauge(prometheus.GaugeOpts{
},
i.countPeers))

metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "file",
Name: "addresses",
Help: "The number of registered addresses",
})
// FilePins ...
FilePins = prometheus.NewGauge(prometheus.GaugeOpts{
},
i.countKnownAddrs))

metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "file",
Name: "pins",
Help: "The number of pinned files",
})
)

// RegisterMetrics to initialize the metrics to the agent
func (*Handler) registerMetrics(ma *metrics.Agent) {
ma.Register(FilePeers)
ma.Register(FileAddresses)
ma.Register(FilePins)
}

// setMetrics to be called as a loop and grab metrics
func (i *Handler) setMetrics(ctx context.Context) error {
peers, err := i.CoreAPI.Swarm().Peers(ctx)
if err != nil {
return err
}
FilePeers.Set(float64(len(peers)))
addresses, err := i.CoreAPI.Swarm().KnownAddrs(ctx)
if err != nil {
return err
}
FileAddresses.Set(float64(len(addresses)))
pins, err := i.countPins(ctx)
if err != nil {
return err
}
FilePins.Set(float64(pins))
return nil
}

// CollectMetrics constantly updates the metric values for prometheus
// The function is blocking, should be called in a go routine
// If the metrics Agent is nil, do nothing
func (i *Handler) CollectMetrics(ctx context.Context, ma *metrics.Agent) error {
if ma != nil {
i.registerMetrics(ma)
for {
time.Sleep(ma.RefreshInterval)
tctx, cancel := context.WithTimeout(ctx, time.Minute)
err := i.setMetrics(tctx)
cancel()
if err != nil {
return err
}
}
}
return nil
},
i.countPins))
}
4 changes: 2 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func NewAgent(path string, interval time.Duration, router *httprouter.HTTProuter
return &ma
}

// Register adds a prometheus collector
func (*Agent) Register(c prometheus.Collector) {
// Register the provided prometheus collector, ignoring any error returned (simply logs a Warn)
func Register(c prometheus.Collector) {
err := prometheus.Register(c)
if err != nil {
log.Warnf("cannot register metrics: (%s) (%+v)", err, c)
Expand Down
7 changes: 1 addition & 6 deletions service/ipfs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"context"
"os"
"time"

Expand All @@ -25,14 +24,10 @@ func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage
go func() {
for {
time.Sleep(time.Second * 120)
tctx, cancel := context.WithTimeout(context.Background(), time.Minute)
log.Monitor("ipfs storage", storage.Stats(tctx))
cancel()
log.Monitor("ipfs storage", storage.Stats())
}
}()

go storage.CollectMetrics(context.Background(), vs.MetricsAgent)

if len(ipfsconfig.ConnectKey) > 0 {
log.Infow("starting ipfsconnect service", "key", ipfsconfig.ConnectKey)
ipfsconn := ipfsconnect.New(
Expand Down
3 changes: 0 additions & 3 deletions service/vochain.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ func (vs *VocdoniService) Start() error {
if vs.Stats == nil {
vs.Stats = vochaininfo.NewVochainInfo(vs.App)
go vs.Stats.Start(10)

// Grab metrics
go vs.Stats.CollectMetrics(vs.MetricsAgent)
}

if !vs.Config.NoWaitSync {
Expand Down
17 changes: 16 additions & 1 deletion subpub/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@ import (
discrouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
discutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/metrics"
)

// Metrics exported via prometheus
var (
dhtLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "file",
Name: "peers_dht_latency",
Help: "The time it takes FindPeers to discover peers",
})
)

// setupDiscovery creates a DHT discovery service and attaches it to the libp2p Host.
// This lets us automatically discover peers and connect to them.
func (s *SubPub) setupDiscovery(ctx context.Context) {

// Set a function as stream handler. This function is called when a peer
// initiates a connection and starts a stream with this peer.
if !s.OnlyDiscover {
Expand All @@ -29,6 +39,8 @@ func (s *SubPub) setupDiscovery(ctx context.Context) {
s.routing = discrouting.NewRoutingDiscovery(s.node.DHT)
discutil.Advertise(ctx, s.routing, s.Topic)

metrics.Register(dhtLatency)

// Discover new peers periodically
go func() { // this spawns a single background task per instance
for {
Expand All @@ -46,6 +58,7 @@ func (s *SubPub) setupDiscovery(ctx context.Context) {
}

func (s *SubPub) discover(ctx context.Context) {
dhtLatencyTimer := prometheus.NewTimer(dhtLatency)
// Now, look for others who have announced.
// This is like your friend telling you the location to meet you.
log.Debugf("looking for peers in topic %s", s.Topic)
Expand All @@ -69,6 +82,8 @@ func (s *SubPub) discover(ctx context.Context) {
continue
}
// new peer; let's connect to it
// first update the latency metrics
dhtLatencyTimer.ObserveDuration()
connectCtx, cancel := context.WithTimeout(ctx, time.Second*10)
if err := s.node.PeerHost.Connect(connectCtx, peer); err != nil {
cancel()
Expand Down
Loading

0 comments on commit 0207f21

Please sign in to comment.