From 835ca991927e7bef89b3c6684600dae0db9295d0 Mon Sep 17 00:00:00 2001 From: Gui Iribarren Date: Mon, 25 Sep 2023 16:40:05 +0200 Subject: [PATCH 1/2] metrics: refactor using NewGaugeFunc * make Register a package func instead of a method * drop storage.CollectMetrics method, stats are collected on demand --- data/data.go | 4 +- data/datamocktest.go | 7 +- data/ipfs/ipfs.go | 40 +++++++---- data/ipfs/metrics.go | 73 ++++--------------- metrics/metrics.go | 4 +- service/ipfs.go | 7 +- service/vochain.go | 3 - vochain/vochaininfo/metrics.go | 109 +++++++++-------------------- vochain/vochaininfo/vochaininfo.go | 1 + 9 files changed, 79 insertions(+), 169 deletions(-) diff --git a/data/data.go b/data/data.go index 1993130b7..350cb18d9 100644 --- a/data/data.go +++ b/data/data.go @@ -6,7 +6,6 @@ import ( "fmt" "go.vocdoni.io/dvote/data/ipfs" - "go.vocdoni.io/dvote/metrics" "go.vocdoni.io/dvote/types" ) @@ -19,8 +18,7 @@ type Storage interface { Unpin(ctx context.Context, path string) error ListPins(ctx context.Context) (map[string]string, error) URIprefix() string - Stats(ctx context.Context) map[string]any - CollectMetrics(ctx context.Context, ma *metrics.Agent) error + Stats() map[string]any Stop() error } diff --git a/data/datamocktest.go b/data/datamocktest.go index 1bfe180ef..4c1608d10 100644 --- a/data/datamocktest.go +++ b/data/datamocktest.go @@ -8,7 +8,6 @@ import ( "time" "go.vocdoni.io/dvote/data/ipfs" - "go.vocdoni.io/dvote/metrics" "go.vocdoni.io/dvote/test/testcommon/testutil" "go.vocdoni.io/dvote/types" ) @@ -80,11 +79,7 @@ func (d *DataMockTest) URIprefix() string { return d.prefix } -func (*DataMockTest) Stats(_ context.Context) map[string]any { - return nil -} - -func (*DataMockTest) CollectMetrics(_ context.Context, _ *metrics.Agent) error { +func (*DataMockTest) Stats() map[string]any { return nil } diff --git a/data/ipfs/ipfs.go b/data/ipfs/ipfs.go index 6960a4446..d0bddf72c 100644 --- a/data/ipfs/ipfs.go +++ b/data/ipfs/ipfs.go @@ -131,6 +131,8 @@ func (i *Handler) Init(d *types.DataStore) error { return err } + i.registerMetrics() + return nil } @@ -207,39 +209,49 @@ func (i *Handler) Unpin(ctx context.Context, path string) error { } // Stats returns stats about the IPFS node. -func (i *Handler) Stats(ctx context.Context) map[string]any { - peers, err := i.CoreAPI.Swarm().Peers(ctx) - if err != nil { - return map[string]any{"error": err.Error()} - } - addresses, err := i.CoreAPI.Swarm().KnownAddrs(ctx) +func (i *Handler) Stats() map[string]any { + return map[string]any{"peers": i.countPeers(), "addresses": i.countKnownAddrs(), "pins": i.countPins()} +} + +func (i *Handler) countPeers() float64 { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + v, err := i.CoreAPI.Swarm().Peers(ctx) if err != nil { - return map[string]any{"error": err.Error()} + return -1 } - pins, err := i.countPins(ctx) + return float64(len(v)) +} + +func (i *Handler) countKnownAddrs() float64 { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + v, err := i.CoreAPI.Swarm().KnownAddrs(ctx) if err != nil { - return map[string]any{"error": err.Error()} + return -1 } - return map[string]any{"peers": len(peers), "addresses": len(addresses), "pins": pins} + return float64(len(v)) } -func (i *Handler) countPins(ctx context.Context) (int, error) { +func (i *Handler) countPins() float64 { // Note that pins is a channel that gets closed when finished. // We MUST range over the entire channel to not leak goroutines. // Maybe there is a way to get the total number of pins without // iterating over them? + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() pins, err := i.CoreAPI.Pin().Ls(ctx) if err != nil { - return 0, err + return -1 } count := 0 for pin := range pins { if err := pin.Err(); err != nil { - return 0, err + return -1 } count++ } - return count, nil + return float64(count) } // ListPins returns a map of all pinned CIDs and their types diff --git a/data/ipfs/metrics.go b/data/ipfs/metrics.go index 6d6599c99..9aa4bc880 100644 --- a/data/ipfs/metrics.go +++ b/data/ipfs/metrics.go @@ -1,78 +1,31 @@ package ipfs import ( - "context" - "time" - "github.com/prometheus/client_golang/prometheus" "go.vocdoni.io/dvote/metrics" ) -// File collectors -var ( - // FilePeers ... - FilePeers = prometheus.NewGauge(prometheus.GaugeOpts{ +// registerMetrics registers prometheus metrics +func (i *Handler) registerMetrics() { + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "file", Name: "peers", Help: "The number of connected peers", - }) - // FileAddresses ... - FileAddresses = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + i.countPeers)) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "file", Name: "addresses", Help: "The number of registered addresses", - }) - // FilePins ... - FilePins = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + i.countKnownAddrs)) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "file", Name: "pins", Help: "The number of pinned files", - }) -) - -// RegisterMetrics to initialize the metrics to the agent -func (*Handler) registerMetrics(ma *metrics.Agent) { - ma.Register(FilePeers) - ma.Register(FileAddresses) - ma.Register(FilePins) -} - -// setMetrics to be called as a loop and grab metrics -func (i *Handler) setMetrics(ctx context.Context) error { - peers, err := i.CoreAPI.Swarm().Peers(ctx) - if err != nil { - return err - } - FilePeers.Set(float64(len(peers))) - addresses, err := i.CoreAPI.Swarm().KnownAddrs(ctx) - if err != nil { - return err - } - FileAddresses.Set(float64(len(addresses))) - pins, err := i.countPins(ctx) - if err != nil { - return err - } - FilePins.Set(float64(pins)) - return nil -} - -// CollectMetrics constantly updates the metric values for prometheus -// The function is blocking, should be called in a go routine -// If the metrics Agent is nil, do nothing -func (i *Handler) CollectMetrics(ctx context.Context, ma *metrics.Agent) error { - if ma != nil { - i.registerMetrics(ma) - for { - time.Sleep(ma.RefreshInterval) - tctx, cancel := context.WithTimeout(ctx, time.Minute) - err := i.setMetrics(tctx) - cancel() - if err != nil { - return err - } - } - } - return nil + }, + i.countPins)) } diff --git a/metrics/metrics.go b/metrics/metrics.go index 615d1ec11..6e08e7538 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -23,8 +23,8 @@ func NewAgent(path string, interval time.Duration, router *httprouter.HTTProuter return &ma } -// Register adds a prometheus collector -func (*Agent) Register(c prometheus.Collector) { +// Register the provided prometheus collector, ignoring any error returned (simply logs a Warn) +func Register(c prometheus.Collector) { err := prometheus.Register(c) if err != nil { log.Warnf("cannot register metrics: (%s) (%+v)", err, c) diff --git a/service/ipfs.go b/service/ipfs.go index abfd64bc6..b80459df1 100644 --- a/service/ipfs.go +++ b/service/ipfs.go @@ -1,7 +1,6 @@ package service import ( - "context" "os" "time" @@ -25,14 +24,10 @@ func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage go func() { for { time.Sleep(time.Second * 120) - tctx, cancel := context.WithTimeout(context.Background(), time.Minute) - log.Monitor("ipfs storage", storage.Stats(tctx)) - cancel() + log.Monitor("ipfs storage", storage.Stats()) } }() - go storage.CollectMetrics(context.Background(), vs.MetricsAgent) - if len(ipfsconfig.ConnectKey) > 0 { log.Infow("starting ipfsconnect service", "key", ipfsconfig.ConnectKey) ipfsconn := ipfsconnect.New( diff --git a/service/vochain.go b/service/vochain.go index 2cd0e3f4f..9d4c3749d 100644 --- a/service/vochain.go +++ b/service/vochain.go @@ -132,9 +132,6 @@ func (vs *VocdoniService) Start() error { if vs.Stats == nil { vs.Stats = vochaininfo.NewVochainInfo(vs.App) go vs.Stats.Start(10) - - // Grab metrics - go vs.Stats.CollectMetrics(vs.MetricsAgent) } if !vs.Config.NoWaitSync { diff --git a/vochain/vochaininfo/metrics.go b/vochain/vochaininfo/metrics.go index b607c6a09..2f7f32d76 100644 --- a/vochain/vochaininfo/metrics.go +++ b/vochain/vochaininfo/metrics.go @@ -1,113 +1,72 @@ package vochaininfo import ( - "time" - "github.com/prometheus/client_golang/prometheus" "go.vocdoni.io/dvote/metrics" ) -// Vochain collectors -var ( - // VochainHeight ... - VochainHeight = prometheus.NewGauge(prometheus.GaugeOpts{ +// registerMetrics registers each of the vochain prometheus metrics +func (vi *VochainInfo) registerMetrics() { + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "height", Help: "Height of the vochain (last block)", - }) - // VochainMempool ... - VochainMempool = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + func() float64 { return float64(vi.Height()) })) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "mempool", Help: "Number of Txs in the mempool", - }) - // VochainAppTree ... - VochainAppTree = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "vochain", - Name: "app_tree", - Help: "Size of the app tree", - }) - // VochainProcessTree ... - VochainProcessTree = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + func() float64 { return float64(vi.MempoolSize()) })) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "process_tree", Help: "Size of the process tree", - }) - // VochainVoteTree ... - VochainVoteTree = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + func() float64 { p, _, _ := vi.TreeSizes(); return float64(p) })) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "vote_tree", Help: "Size of the vote tree", - }) - // VochainVotesPerMinute ... - VochainVotesPerMinute = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + func() float64 { _, v, _ := vi.TreeSizes(); return float64(v) })) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "vote_tree_increase_last_minute", Help: "Number of votes included in the vote tree the last 60 seconds", - }) - // VochainAppTree ... - VochainVoteCache = prometheus.NewGauge(prometheus.GaugeOpts{ + }, + func() float64 { _, _, vxm := vi.TreeSizes(); return float64(vxm) })) + + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "vote_cache", Help: "Size of the current vote cache", - }) + }, + func() float64 { return float64(vi.VoteCacheSize()) })) - VochainAccountTree = prometheus.NewGauge(prometheus.GaugeOpts{ + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "account_tree", Help: "Size of the account tree", - }) + }, + func() float64 { return float64(vi.AccountTreeSize()) })) - VochainSIKTree = prometheus.NewGauge(prometheus.GaugeOpts{ + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "sik_tree", Help: "Size of the SIK tree", - }) + }, + func() float64 { return float64(vi.SIKTreeSize()) })) - VochainTokensBurned = prometheus.NewGauge(prometheus.GaugeOpts{ + metrics.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "vochain", Name: "tokens_burned", Help: "Balance of the burn address", - }) -) - -// registerMetrics registers each of the vochain prometheus metrics -func (*VochainInfo) registerMetrics(ma *metrics.Agent) { - ma.Register(VochainHeight) - ma.Register(VochainMempool) - ma.Register(VochainAppTree) - ma.Register(VochainProcessTree) - ma.Register(VochainVoteTree) - ma.Register(VochainVotesPerMinute) - ma.Register(VochainVoteCache) - ma.Register(VochainAccountTree) - ma.Register(VochainSIKTree) - ma.Register(VochainTokensBurned) -} - -// setMetrics updates the metrics values to the current state -func (vi *VochainInfo) setMetrics() { - VochainHeight.Set(float64(vi.Height())) - VochainMempool.Set(float64(vi.MempoolSize())) - p, v, vxm := vi.TreeSizes() - VochainProcessTree.Set(float64(p)) - VochainVoteTree.Set(float64(v)) - VochainVotesPerMinute.Set(float64(vxm)) - VochainVoteCache.Set(float64(vi.VoteCacheSize())) - VochainAccountTree.Set(float64(vi.AccountTreeSize())) - VochainSIKTree.Set(float64(vi.SIKTreeSize())) - VochainTokensBurned.Set(float64(vi.TokensBurned())) -} - -// CollectMetrics constantly updates the metric values for prometheus -// The function is blocking, should be called in a go routine -// If the metrics Agent is nil, do nothing -func (vi *VochainInfo) CollectMetrics(ma *metrics.Agent) { - if ma != nil { - vi.registerMetrics(ma) - for { - time.Sleep(ma.RefreshInterval) - vi.setMetrics() - } - } + }, + func() float64 { return float64(vi.TokensBurned()) })) } diff --git a/vochain/vochaininfo/vochaininfo.go b/vochain/vochaininfo/vochaininfo.go index 756818817..f30f85356 100644 --- a/vochain/vochaininfo/vochaininfo.go +++ b/vochain/vochaininfo/vochaininfo.go @@ -222,6 +222,7 @@ func (vi *VochainInfo) NPeers() int { // TODO: use time.Duration instead of int64 func (vi *VochainInfo) Start(sleepSecs int64) { log.Infof("starting vochain info service every %d seconds", sleepSecs) + vi.registerMetrics() var duration time.Duration var pheight, height int64 var h1, h10, h60, h360, h1440 int64 From 00f284e638a4ac5593765f50eba1a219352e4bc5 Mon Sep 17 00:00:00 2001 From: Gui Iribarren Date: Wed, 6 Sep 2023 12:24:42 +0200 Subject: [PATCH 2/2] subpub: measure DHT latency and export via prometheus --- subpub/discovery.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/subpub/discovery.go b/subpub/discovery.go index 0f905b856..d6655fa7f 100644 --- a/subpub/discovery.go +++ b/subpub/discovery.go @@ -10,13 +10,23 @@ import ( discrouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" discutil "github.com/libp2p/go-libp2p/p2p/discovery/util" multiaddr "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/metrics" +) + +// Metrics exported via prometheus +var ( + dhtLatency = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "file", + Name: "peers_dht_latency", + Help: "The time it takes FindPeers to discover peers", + }) ) // setupDiscovery creates a DHT discovery service and attaches it to the libp2p Host. // This lets us automatically discover peers and connect to them. func (s *SubPub) setupDiscovery(ctx context.Context) { - // Set a function as stream handler. This function is called when a peer // initiates a connection and starts a stream with this peer. if !s.OnlyDiscover { @@ -29,6 +39,8 @@ func (s *SubPub) setupDiscovery(ctx context.Context) { s.routing = discrouting.NewRoutingDiscovery(s.node.DHT) discutil.Advertise(ctx, s.routing, s.Topic) + metrics.Register(dhtLatency) + // Discover new peers periodically go func() { // this spawns a single background task per instance for { @@ -46,6 +58,7 @@ func (s *SubPub) setupDiscovery(ctx context.Context) { } func (s *SubPub) discover(ctx context.Context) { + dhtLatencyTimer := prometheus.NewTimer(dhtLatency) // Now, look for others who have announced. // This is like your friend telling you the location to meet you. log.Debugf("looking for peers in topic %s", s.Topic) @@ -69,6 +82,8 @@ func (s *SubPub) discover(ctx context.Context) { continue } // new peer; let's connect to it + // first update the latency metrics + dhtLatencyTimer.ObserveDuration() connectCtx, cancel := context.WithTimeout(ctx, time.Second*10) if err := s.node.PeerHost.Connect(connectCtx, peer); err != nil { cancel()