-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathscraper.go
138 lines (117 loc) · 4.21 KB
/
scraper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package rabbitmqreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver"
import (
"context"
"errors"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/rabbitmqreceiver/internal/models"
)
// Keys looked up in a queue's message_stats structure.
const (
	deliverStat        = "deliver"
	publishStat        = "publish"
	ackStat            = "ack"
	dropUnroutableStat = "drop_unroutable"
)

var (
	// errClientNotInit is returned by scrape when the scraper has no client.
	errClientNotInit = errors.New("client not initialized")

	// messageStatMetrics lists, in processing order, the message_stats keys
	// gathered for every queue.
	messageStatMetrics = []string{deliverStat, publishStat, ackStat, dropUnroutableStat}
)
// rabbitmqScraper scrapes queue metrics from RabbitMQ through its client and
// records them with the metrics builder.
type rabbitmqScraper struct {
// client fetches queue data; it is nil until start assigns it.
client client
// logger receives the debug and warning messages written while collecting queue stats.
logger *zap.Logger
// cfg is the receiver configuration; it is handed to newClient and supplies the MetricsBuilderConfig.
cfg *Config
// settings holds the telemetry settings passed to newClient.
settings component.TelemetrySettings
// mb accumulates data points and emits them as pmetric.Metrics.
mb *metadata.MetricsBuilder
}
// newScraper builds a rabbitmqScraper from the given logger, config and
// receiver settings. The client is left unset here; start creates it.
func newScraper(logger *zap.Logger, cfg *Config, settings receiver.Settings) *rabbitmqScraper {
	scraper := new(rabbitmqScraper)
	scraper.logger = logger
	scraper.cfg = cfg
	scraper.settings = settings.TelemetrySettings
	scraper.mb = metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings)
	return scraper
}
// start creates the client used for scraping and stores it on the scraper.
// Whatever newClient returns is stored, and its error is passed back unchanged.
func (r *rabbitmqScraper) start(ctx context.Context, host component.Host) (err error) {
	created, err := newClient(ctx, r.cfg, host, r.settings, r.logger)
	r.client = created
	return err
}
// scrape fetches the queues from the client and records metrics for each one.
// It returns empty metrics together with errClientNotInit when no client has
// been set, or with the client's error when the queues cannot be fetched.
func (r *rabbitmqScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
	// Every data point recorded during this scrape shares one timestamp.
	scrapeStart := time.Now()
	scrapeTime := pcommon.NewTimestampFromTime(scrapeStart)

	// Guard against scraping before start has created the client.
	if r.client == nil {
		return pmetric.NewMetrics(), errClientNotInit
	}

	queueList, fetchErr := r.client.GetQueues(ctx)
	if fetchErr != nil {
		return pmetric.NewMetrics(), fetchErr
	}

	for _, q := range queueList {
		r.collectQueue(q, scrapeTime)
	}

	return r.mb.Emit(), nil
}
// collectQueue records every data point for a single queue and then emits
// them under a resource carrying that queue's name, node and vhost.
func (r *rabbitmqScraper) collectQueue(queue *models.Queue, now pcommon.Timestamp) {
	// Values read directly from the queue.
	r.mb.RecordRabbitmqConsumerCountDataPoint(now, queue.Consumers)
	r.mb.RecordRabbitmqMessageCurrentDataPoint(now, queue.UnacknowledgedMessages, metadata.AttributeMessageStateUnacknowledged)
	r.mb.RecordRabbitmqMessageCurrentDataPoint(now, queue.ReadyMessages, metadata.AttributeMessageStateReady)

	// Values read from the queue's message_stats map.
	for _, statName := range messageStatMetrics {
		raw, present := queue.MessageStats[statName]
		if !present {
			// A stat is absent until the action that increments it has happened.
			r.logger.Debug("metric not found", zap.String("Metric", statName), zap.String("Queue", queue.Name))
			continue
		}

		count, converted := convertValToInt64(raw)
		if !converted {
			// The value is not in the expected format, so it is skipped.
			r.logger.Warn("metric not int64", zap.String("Metric", statName), zap.String("Queue", queue.Name))
			continue
		}

		r.recordMessageStat(statName, count, now)
	}

	resource := r.mb.NewResourceBuilder()
	resource.SetRabbitmqQueueName(queue.Name)
	resource.SetRabbitmqNodeName(queue.Node)
	resource.SetRabbitmqVhostName(queue.VHost)
	r.mb.EmitForResource(metadata.WithResource(resource.Emit()))
}

// recordMessageStat records count on the data point that corresponds to the
// message_stats key statName; keys without a matching metric are ignored.
func (r *rabbitmqScraper) recordMessageStat(statName string, count int64, now pcommon.Timestamp) {
	switch statName {
	case deliverStat:
		r.mb.RecordRabbitmqMessageDeliveredDataPoint(now, count)
	case publishStat:
		r.mb.RecordRabbitmqMessagePublishedDataPoint(now, count)
	case ackStat:
		r.mb.RecordRabbitmqMessageAcknowledgedDataPoint(now, count)
	case dropUnroutableStat:
		r.mb.RecordRabbitmqMessageDroppedDataPoint(now, count)
	}
}
// convertValToInt64 converts a message_stats value to an int64.
//
// Values in message_stats unmarshal as float64 even though they represent
// integer counters, so val is type-asserted to float64 and then converted;
// any fractional part is truncated toward zero.
//
// The second result is false, and the first is 0, when val is not a float64
// or when it is NaN, infinite, or outside the int64 range. Converting such a
// float64 to int64 gives an implementation-dependent result in Go, so those
// values are rejected instead of being reported as valid counts.
func convertValToInt64(val any) (int64, bool) {
	const (
		// -2^63 is exactly the smallest int64.
		minInt64Float = -float64(1 << 63)
		// 2^63 is the first float64 above the largest int64.
		maxInt64Float = float64(1 << 63)
	)

	f64Val, ok := val.(float64)
	if !ok {
		return 0, false
	}

	// Written as a negated range check on purpose: every comparison with NaN
	// is false, so NaN is rejected here together with out-of-range values.
	if !(f64Val >= minInt64Float && f64Val < maxInt64Float) {
		return 0, false
	}

	return int64(f64Val), true
}