Skip to content

Commit

Permalink
Merge pull request #108 from hikhvar/fix-reloading-with-big-set-of-ta…
Browse files Browse the repository at this point in the history
…rgets

Fix bugs introduced by hot reloading of targets
  • Loading branch information
czerwonk authored Mar 27, 2024
2 parents 64b4119 + 436cd06 commit f5f97ad
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 74 deletions.
97 changes: 57 additions & 40 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,66 @@ import (
"github.com/czerwonk/ping_exporter/config"
)

var (
labelNames []string
type pingCollector struct {
monitor *mon.Monitor
enableDeprecatedMetrics bool
rttUnit rttUnit

cfg *config.Config

mutex sync.RWMutex

customLabels *customLabelSet
metrics map[string]*mon.Metrics

rttDesc scaledMetrics
bestDesc scaledMetrics
worstDesc scaledMetrics
meanDesc scaledMetrics
stddevDesc scaledMetrics
lossDesc *prometheus.Desc
progDesc *prometheus.Desc
mutex *sync.Mutex
)
}

type pingCollector struct {
cfg *config.Config
customLabels *customLabelSet
monitor *mon.Monitor
metrics map[string]*mon.Metrics
func NewPingCollector(enableDeprecatedMetrics bool, unit rttUnit, monitor *mon.Monitor, cfg *config.Config) *pingCollector {
ret := &pingCollector{
monitor: monitor,
enableDeprecatedMetrics: enableDeprecatedMetrics,
rttUnit: unit,
cfg: cfg,
}
ret.customLabels = newCustomLabelSet(cfg.Targets)
ret.createDesc()
return ret
}

func (p *pingCollector) Describe(ch chan<- *prometheus.Desc) {
p.createDesc()
func (p *pingCollector) UpdateConfig(cfg *config.Config) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.cfg.Targets = cfg.Targets
}

if enableDeprecatedMetrics {
rttDesc.Describe(ch)
func (p *pingCollector) Describe(ch chan<- *prometheus.Desc) {
if p.enableDeprecatedMetrics {
p.rttDesc.Describe(ch)
}
bestDesc.Describe(ch)
worstDesc.Describe(ch)
meanDesc.Describe(ch)
stddevDesc.Describe(ch)
ch <- lossDesc
ch <- progDesc
p.bestDesc.Describe(ch)
p.worstDesc.Describe(ch)
p.meanDesc.Describe(ch)
p.stddevDesc.Describe(ch)
ch <- p.lossDesc
ch <- p.progDesc
}

func (p *pingCollector) Collect(ch chan<- prometheus.Metric) {
mutex.Lock()
defer mutex.Unlock()
p.mutex.Lock()
defer p.mutex.Unlock()

if m := p.monitor.Export(); len(m) > 0 {
p.metrics = m
}

ch <- prometheus.MustNewConstMetric(progDesc, prometheus.GaugeValue, 1)
ch <- prometheus.MustNewConstMetric(p.progDesc, prometheus.GaugeValue, 1)

for target, metrics := range p.metrics {
l := strings.SplitN(target, " ", 3)
Expand All @@ -63,35 +81,34 @@ func (p *pingCollector) Collect(ch chan<- prometheus.Metric) {

if metrics.PacketsSent > metrics.PacketsLost {
if enableDeprecatedMetrics {
rttDesc.Collect(ch, metrics.Best, append(l, "best")...)
rttDesc.Collect(ch, metrics.Worst, append(l, "worst")...)
rttDesc.Collect(ch, metrics.Mean, append(l, "mean")...)
rttDesc.Collect(ch, metrics.StdDev, append(l, "std_dev")...)
p.rttDesc.Collect(ch, metrics.Best, append(l, "best")...)
p.rttDesc.Collect(ch, metrics.Worst, append(l, "worst")...)
p.rttDesc.Collect(ch, metrics.Mean, append(l, "mean")...)
p.rttDesc.Collect(ch, metrics.StdDev, append(l, "std_dev")...)
}

bestDesc.Collect(ch, metrics.Best, l...)
worstDesc.Collect(ch, metrics.Worst, l...)
meanDesc.Collect(ch, metrics.Mean, l...)
stddevDesc.Collect(ch, metrics.StdDev, l...)
p.bestDesc.Collect(ch, metrics.Best, l...)
p.worstDesc.Collect(ch, metrics.Worst, l...)
p.meanDesc.Collect(ch, metrics.Mean, l...)
p.stddevDesc.Collect(ch, metrics.StdDev, l...)
}

loss := float64(metrics.PacketsLost) / float64(metrics.PacketsSent)
ch <- prometheus.MustNewConstMetric(lossDesc, prometheus.GaugeValue, loss, l...)
ch <- prometheus.MustNewConstMetric(p.lossDesc, prometheus.GaugeValue, loss, l...)
}
}

func (p *pingCollector) createDesc() {
labelNames = []string{"target", "ip", "ip_version"}
labelNames := []string{"target", "ip", "ip_version"}
labelNames = append(labelNames, p.customLabels.labelNames()...)

rttDesc = newScaledDesc("rtt", "Round trip time", append(labelNames, "type"))
bestDesc = newScaledDesc("rtt_best", "Best round trip time", labelNames)
worstDesc = newScaledDesc("rtt_worst", "Worst round trip time", labelNames)
meanDesc = newScaledDesc("rtt_mean", "Mean round trip time", labelNames)
stddevDesc = newScaledDesc("rtt_std_deviation", "Standard deviation", labelNames)
lossDesc = newDesc("loss_ratio", "Packet loss from 0.0 to 1.0", labelNames, nil)
progDesc = newDesc("up", "ping_exporter version", nil, prometheus.Labels{"version": version})
mutex = &sync.Mutex{}
p.rttDesc = newScaledDesc("rtt", "Round trip time", p.rttUnit, append(labelNames, "type"))
p.bestDesc = newScaledDesc("rtt_best", "Best round trip time", p.rttUnit, labelNames)
p.worstDesc = newScaledDesc("rtt_worst", "Worst round trip time", p.rttUnit, labelNames)
p.meanDesc = newScaledDesc("rtt_mean", "Mean round trip time", p.rttUnit, labelNames)
p.stddevDesc = newScaledDesc("rtt_std_deviation", "Standard deviation", p.rttUnit, labelNames)
p.lossDesc = newDesc("loss_ratio", "Packet loss from 0.0 to 1.0", labelNames, nil)
p.progDesc = newDesc("up", "ping_exporter version", nil, prometheus.Labels{"version": version})
}

func newDesc(name, help string, variableLabels []string, constLabels prometheus.Labels) *prometheus.Desc {
Expand Down
80 changes: 51 additions & 29 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/digineo/go-ping"
Expand Down Expand Up @@ -116,13 +117,18 @@ func main() {
kingpin.FatalUsage("No targets specified")
}

m, err := startMonitor(cfg)
resolver := setupResolver(cfg)

m, err := startMonitor(cfg, resolver)
if err != nil {
log.Errorln(err)
os.Exit(2)
}

startServer(cfg, m)
collector := NewPingCollector(enableDeprecatedMetrics, rttMetricsScale, m, cfg)
go watchConfig(desiredTargets, resolver, m, collector)

startServer(cfg, collector)
}

func printVersion() {
Expand All @@ -132,8 +138,7 @@ func printVersion() {
fmt.Println("Metric exporter for go-icmp")
}

func startMonitor(cfg *config.Config) (*mon.Monitor, error) {
resolver := setupResolver(cfg)
func startMonitor(cfg *config.Config, resolver *net.Resolver) (*mon.Monitor, error) {
var bind4, bind6 string
if ln, err := net.Listen("tcp4", "127.0.0.1:0"); err == nil {
// ipv4 enabled
Expand Down Expand Up @@ -165,42 +170,50 @@ func startMonitor(cfg *config.Config) (*mon.Monitor, error) {
}

go startDNSAutoRefresh(cfg.DNS.Refresh.Duration(), desiredTargets, monitor, cfg)
go watchConfig(desiredTargets, resolver, monitor)
return monitor, nil
}

func upsertTargets(globalTargets *targets, resolver *net.Resolver, cfg *config.Config, monitor *mon.Monitor) error {
oldTargets := globalTargets.Targets()
newTargets := make([]*target, len(cfg.Targets))
var wg sync.WaitGroup
for i, t := range cfg.Targets {
t := &target{
host: t.Addr,
addresses: make([]net.IPAddr, 0),
delay: time.Duration(10*i) * time.Millisecond,
resolver: resolver,
}
newTargets[i] = t

err := t.addOrUpdateMonitor(monitor, targetOpts{
disableIPv4: cfg.Options.DisableIPv4,
disableIPv6: cfg.Options.DisableIPv6,
})
if err != nil {
return fmt.Errorf("failed to setup target: %w", err)
newTarget := globalTargets.Get(t.Addr)
if newTarget == nil {
newTarget = &target{
host: t.Addr,
addresses: make([]net.IPAddr, 0),
delay: time.Duration(10*i) * time.Millisecond,
resolver: resolver,
}
}
}

newTargets[i] = newTarget

wg.Add(1)
go func() {
err := newTarget.addOrUpdateMonitor(monitor, targetOpts{
disableIPv4: cfg.Options.DisableIPv4,
disableIPv6: cfg.Options.DisableIPv6,
})
if err != nil {
log.Errorf("failed to setup target: %v", err)
}
wg.Done()
}()
}
wg.Wait()
globalTargets.SetTargets(newTargets)

removed := removedTargets(oldTargets, globalTargets)
for _, removedTarget := range removed {
log.Infof("remove target: %s\n", removedTarget.host)
log.Infof("remove target: %s", removedTarget.host)
removedTarget.removeFromMonitor(monitor)
}
return nil
}

func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Monitor) {
func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Monitor, collector *pingCollector) {
watcher, err := inotify.NewWatcher()
if err != nil {
log.Fatalf("unable to create file watcher: %v", err)
Expand All @@ -212,17 +225,30 @@ func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Mo
}
for {
select {
case <-watcher.Events:
case event := <-watcher.Events:
log.Debugf("Got file inotify event: %s", event)
// If the file is removed, the inotify watcher will lose track of the file. Add it again.
if event.Op == inotify.Remove {
if err = watcher.Add(*configFile); err != nil {
log.Fatalf("failed to renew watch for file: %v", err)
}
}
cfg, err := loadConfig()
if err != nil {
log.Errorf("unable to load config: %v", err)
continue
}
// We get zero targets if the file was truncated. This happens if an automation tool rewrites
// the complete file, instead of alternating only parts of it.
if len(cfg.Targets) == 0 {
continue
}
log.Infof("reloading config file %s", *configFile)
if err := upsertTargets(globalTargets, resolver, cfg, monitor); err != nil {
log.Errorf("failed to reload config: %v", err)
continue
}
collector.UpdateConfig(cfg)
case err := <-watcher.Errors:
log.Errorf("watching file failed: %v", err)
}
Expand Down Expand Up @@ -264,19 +290,15 @@ func refreshDNS(tar *targets, monitor *mon.Monitor, cfg *config.Config) {
}
}

func startServer(cfg *config.Config, monitor *mon.Monitor) {
func startServer(cfg *config.Config, collector *pingCollector) {
var err error
log.Infof("Starting ping exporter (Version: %s)", version)
http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, indexHTML, *metricsPath)
})

reg := prometheus.NewRegistry()
reg.MustRegister(&pingCollector{
cfg: cfg,
monitor: monitor,
customLabels: newCustomLabelSet(cfg.Targets),
})
reg.MustRegister(collector)

l := log.New()
l.Level = log.ErrorLevel
Expand Down
12 changes: 7 additions & 5 deletions rttscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,30 @@ func rttUnitFromString(s string) rttUnit {
type scaledMetrics struct {
Millis *prometheus.Desc
Seconds *prometheus.Desc
scale rttUnit
}

func (s *scaledMetrics) Describe(ch chan<- *prometheus.Desc) {
if rttMetricsScale == rttInMills || rttMetricsScale == rttBoth {
if s.scale == rttInMills || s.scale == rttBoth {
ch <- s.Millis
}
if rttMetricsScale == rttInSeconds || rttMetricsScale == rttBoth {
if s.scale == rttInSeconds || s.scale == rttBoth {
ch <- s.Seconds
}
}

func (s *scaledMetrics) Collect(ch chan<- prometheus.Metric, value float32, labelValues ...string) {
if rttMetricsScale == rttInMills || rttMetricsScale == rttBoth {
if s.scale == rttInMills || s.scale == rttBoth {
ch <- prometheus.MustNewConstMetric(s.Millis, prometheus.GaugeValue, float64(value), labelValues...)
}
if rttMetricsScale == rttInSeconds || rttMetricsScale == rttBoth {
if s.scale == rttInSeconds || s.scale == rttBoth {
ch <- prometheus.MustNewConstMetric(s.Seconds, prometheus.GaugeValue, float64(value)/1000, labelValues...)
}
}

func newScaledDesc(name, help string, variableLabels []string) scaledMetrics {
func newScaledDesc(name, help string, scale rttUnit, variableLabels []string) scaledMetrics {
return scaledMetrics{
scale: scale,
Millis: newDesc(name+"_ms", help+" in millis (deprecated)", variableLabels, nil),
Seconds: newDesc(name+"_seconds", help+" in seconds", variableLabels, nil),
}
Expand Down
13 changes: 13 additions & 0 deletions target.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (t *targets) SetTargets(tar []*target) {
}

func (t *targets) Contains(tar *target) bool {
t.mutex.RLock()
defer t.mutex.RUnlock()
for _, ta := range t.t {
if ta.host == tar.host {
return true
Expand All @@ -45,6 +47,17 @@ func (t *targets) Contains(tar *target) bool {
return false
}

func (t *targets) Get(host string) *target {
t.mutex.RLock()
defer t.mutex.RUnlock()
for _, ta := range t.t {
if ta.host == host {
return ta
}
}
return nil
}

func (t *targets) Targets() []*target {
t.mutex.RLock()
defer t.mutex.RUnlock()
Expand Down

0 comments on commit f5f97ad

Please sign in to comment.