Skip to content

Commit

Permalink
lndmon: cache closedChannels response
Browse files Browse the repository at this point in the history
  • Loading branch information
djkazic committed Jan 31, 2024
1 parent f7d3fdf commit c8e69f9
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 16 deletions.
71 changes: 60 additions & 11 deletions collectors/channels_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/lndclient"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/prometheus/client_golang/prometheus"
)

// Cache refresh interval magic number.
const cacheRefreshInterval = 10 * time.Minute

// ChannelsCollector is a collector that keeps track of channel information.
type ChannelsCollector struct {
channelBalanceDesc *prometheus.Desc
Expand Down Expand Up @@ -51,17 +56,25 @@ type ChannelsCollector struct {
// errChan is a channel that we send any errors that we encounter into.
// This channel should be buffered so that it does not block sends.
errChan chan<- error

// quit is a channel that we use to signal for graceful shutdown.
quit chan struct{}

// cache is for storing results from a ticker to reduce grpc server
// load on lnd.
closedChannelsCache []lndclient.ClosedChannel
cacheMutex sync.RWMutex
}

// NewChannelsCollector returns a new instance of the ChannelsCollector for the
// target lnd client.
func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error,
cfg *MonitoringConfig) *ChannelsCollector {
quitChan chan struct{}, cfg *MonitoringConfig) *ChannelsCollector {

// Our set of labels, status should either be active or inactive. The
// initiator is "true" if we are the initiator, and "false" otherwise.
labels := []string{"chan_id", "status", "initiator", "peer"}
return &ChannelsCollector{
collector := &ChannelsCollector{
channelBalanceDesc: prometheus.NewDesc(
"lnd_channels_open_balance_sat",
"total balance of channels in satoshis",
Expand Down Expand Up @@ -174,10 +187,49 @@ func NewChannelsCollector(lnd lndclient.LightningClient, errChan chan<- error,
[]string{"amount"}, nil,
),

lnd: lnd,
primaryNode: cfg.PrimaryNode,
errChan: errChan,
lnd: lnd,
primaryNode: cfg.PrimaryNode,
closedChannelsCache: nil,
errChan: errChan,
quit: quitChan,
}

// Start a ticker to update the cache once per 10m
go func() {
ticker := time.NewTicker(cacheRefreshInterval)
defer ticker.Stop()

for {
err := collector.refreshClosedChannelsCache()
if err != nil {
errChan <- err
}

select {
case <-ticker.C:
continue

case <-collector.quit:
return
}
}
}()

return collector
}

// refreshClosedChannelsCache acquires a mutex write lock to update
// the closedChannelsCache.
func (c *ChannelsCollector) refreshClosedChannelsCache() error {
data, err := c.lnd.ClosedChannels(context.Background())
if err != nil {
return err
}
c.cacheMutex.Lock()
c.closedChannelsCache = data
c.cacheMutex.Unlock()

return nil
}

// Describe sends the super-set of all possible descriptors of metrics
Expand Down Expand Up @@ -452,12 +504,9 @@ func (c *ChannelsCollector) Collect(ch chan<- prometheus.Metric) {
)

// Get the list of closed channels.
closedChannelsResp, err := c.lnd.ClosedChannels(context.Background())
if err != nil {
c.errChan <- fmt.Errorf("ChannelsCollector ClosedChannels "+
"failed with: %v", err)
return
}
c.cacheMutex.RLock()
closedChannelsResp := c.closedChannelsCache
c.cacheMutex.RUnlock()
closeCounts := make(map[string]int)
for _, channel := range closedChannelsResp {
typeString, ok := closeTypeLabelMap[channel.CloseType]
Expand Down
13 changes: 9 additions & 4 deletions collectors/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func DefaultConfig() *PrometheusConfig {
// NewPrometheusExporter makes a new instance of the PrometheusExporter given
// the address to listen for Prometheus on and an lnd gRPC client.
func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices,
monitoringCfg *MonitoringConfig) *PrometheusExporter {
monitoringCfg *MonitoringConfig, quitChan chan struct{}) *PrometheusExporter {

// We have six collectors and a htlc monitor running, so we buffer our
// error channel by 7 so that we do not need to consume all errors from
Expand All @@ -94,11 +94,16 @@ func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices,

htlcMonitor := newHtlcMonitor(lnd.Router, errChan)

chanCollector := NewChannelsCollector(
lnd.Client, errChan, quitChan, monitoringCfg,
)
go func() {
close(chanCollector.quit)
}()

collectors := []prometheus.Collector{
NewChainCollector(lnd.Client, errChan),
NewChannelsCollector(
lnd.Client, errChan, monitoringCfg,
),
chanCollector,
NewWalletCollector(lnd, errChan),
NewPeerCollector(lnd.Client, errChan),
NewInfoCollector(lnd.Client, errChan),
Expand Down
4 changes: 3 additions & 1 deletion lndmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func start() error {
return err
}

quit := make(chan struct{})
interceptor, err := signal.Intercept()
if err != nil {
return fmt.Errorf("could not intercept signal: %v", err)
Expand Down Expand Up @@ -72,7 +73,7 @@ func start() error {
// Start our Prometheus exporter. This exporter spawns a goroutine
// that pulls metrics from our lnd client on a set interval.
exporter := collectors.NewPrometheusExporter(
cfg.Prometheus, &lnd.LndServices, &monitoringCfg,
cfg.Prometheus, &lnd.LndServices, &monitoringCfg, quit,
)
if err := exporter.Start(); err != nil {
return err
Expand All @@ -83,6 +84,7 @@ func start() error {
var stopErr error
select {
case <-interceptor.ShutdownChannel():
close(quit)
fmt.Println("Exiting lndmon.")

case stopErr = <-exporter.Errors():
Expand Down

0 comments on commit c8e69f9

Please sign in to comment.