Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add track_all_path support for the prometheus pump #714

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 93 additions & 91 deletions pumps/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,97 +206,99 @@
continue
}
metric.aggregatedObservations = p.conf.AggregateObservations
if errInit := metric.InitVec(); errInit != nil {
p.log.Error(errInit)
}
trimmedAllMetrics = append(trimmedAllMetrics, metric)
}
p.allMetrics = trimmedAllMetrics
}

// InitCustomMetrics initialise custom prometheus metrics based on p.conf.CustomMetrics and add them into p.allMetrics
func (p *PrometheusPump) InitCustomMetrics() {
if len(p.conf.CustomMetrics) > 0 {
customMetrics := []*PrometheusMetric{}
for i := range p.conf.CustomMetrics {
newMetric := &p.conf.CustomMetrics[i]
newMetric.aggregatedObservations = p.conf.AggregateObservations
errInit := newMetric.InitVec()
if errInit != nil {
p.log.Error("there was an error initialising custom prometheus metric ", newMetric.Name, " error:", errInit)
} else {
p.log.Info("added custom prometheus metric:", newMetric.Name)
customMetrics = append(customMetrics, newMetric)
}
}

p.allMetrics = append(p.allMetrics, customMetrics...)
}
}

func (p *PrometheusPump) WriteData(ctx context.Context, data []interface{}) error {
p.log.Debug("Attempting to write ", len(data), " records...")

for i, item := range data {
select {
case <-ctx.Done():
p.log.Warn("Purged ", i, " of ", len(data), " because of timeout.")
return errors.New("prometheus pump couldn't write all the analytics records")
default:
}
record := item.(analytics.AnalyticsRecord)
// we loop through all the metrics available.
for _, metric := range p.allMetrics {
if metric.enabled {
p.log.Debug("Processing metric:", metric.Name)
// we get the values for that metric required labels
values := metric.GetLabelsValues(record)

switch metric.MetricType {
case counterType:
if metric.counterVec != nil {
// if the metric is a counter, we increment the counter memory map
err := metric.Inc(values...)
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": metric.MetricType,
"metric_name": metric.Name,
}).Error("error incrementing prometheus metric value:", err)
}
}
case histogramType:
if metric.histogramVec != nil {
// if the metric is an histogram, we Observe the request time with the given values
err := metric.Observe(record.RequestTime, values...)
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": metric.MetricType,
"metric_name": metric.Name,
}).Error("error incrementing prometheus metric value:", err)
}
}
default:
p.log.Debug("trying to process an invalid prometheus metric type:", metric.MetricType)
}
}
}
}

// after looping through all the analytics records, we expose the metrics to prometheus endpoint
for _, customMetric := range p.allMetrics {
err := customMetric.Expose()
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": customMetric.MetricType,
"metric_name": customMetric.Name,
}).Error("error writing prometheus metric:", err)
}
}

p.log.Info("Purged ", len(data), " records...")

return nil
}
type PrometheusConf struct {
type PrometheusConf struct {

Check failure on line 210 in pumps/prometheus.go

View workflow job for this annotation

GitHub Actions / Go 1.19 tests

syntax error: unexpected type, expecting field name or embedded type
EnvPrefix string `mapstructure:"meta_env_prefix"`
Addr string `json:"listen_address" mapstructure:"listen_address"`
Path string `json:"path" mapstructure:"path"`
AggregateObservations bool `json:"aggregate_observations" mapstructure:"aggregate_observations"`
DisabledMetrics []string `json:"disabled_metrics" mapstructure:"disabled_metrics"`
CustomMetrics CustomMetrics `json:"custom_metrics" mapstructure:"custom_metrics"`
TrackAllPaths bool `json:"track_all_paths" mapstructure:"track_all_paths"`
}

type CustomMetrics []PrometheusMetric

type PrometheusMetric struct {
Name string `json:"name" mapstructure:"name"`
Help string `json:"help" mapstructure:"help"`
MetricType string `json:"metric_type" mapstructure:"metric_type"`
Buckets []float64 `json:"buckets" mapstructure:"buckets"`
Labels []string `json:"labels" mapstructure:"labels"`

enabled bool
counterVec *prometheus.CounterVec
histogramVec *prometheus.HistogramVec

counterMap map[string]counterStruct

histogramMap map[string]histogramCounter
aggregatedObservations bool
}

func (p *PrometheusPump) WriteData(ctx context.Context, data []interface{}) error {

Check failure on line 239 in pumps/prometheus.go

View workflow job for this annotation

GitHub Actions / Go 1.19 tests

syntax error: unexpected context in argument list; possibly missing comma or )
p.log.Debug("Attempting to write ", len(data), " records...")

for i, item := range data {
select {
case <-ctx.Done():
p.log.Warn("Purged ", i, " of ", len(data), " because of timeout.")
return errors.New("prometheus pump couldn't write all the analytics records")
default:
}
record := item.(analytics.AnalyticsRecord)
// we loop through all the metrics available.
for _, metric := range p.allMetrics {
if metric.enabled {
p.log.Debug("Processing metric:", metric.Name)
// we get the values for that metric required labels
values := metric.GetLabelsValues(record)

switch metric.MetricType {
case counterType:
if metric.counterVec != nil {
// if the metric is a counter, we increment the counter memory map
err := metric.Inc(values...)
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": metric.MetricType,
"metric_name": metric.Name,
}).Error("error incrementing prometheus metric value:", err)
}
}
case histogramType:
if metric.histogramVec != nil {
// if the metric is an histogram, we Observe the request time with the given values
err := metric.Observe(record.RequestTime, values...)
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": metric.MetricType,
"metric_name": metric.Name,
}).Error("error incrementing prometheus metric value:", err)
}
}
default:
p.log.Debug("trying to process an invalid prometheus metric type:", metric.MetricType)
}
}
}
}

// after looping through all the analytics records, we expose the metrics to prometheus endpoint
for _, customMetric := range p.allMetrics {
err := customMetric.Expose()
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": customMetric.MetricType,
"metric_name": customMetric.Name,
}).Error("error writing prometheus metric:", err)
}
}

p.log.Info("Purged ", len(data), " records...")

return nil
}

// InitVec inits the prometheus metric based on the metric_type. It only can create counter and histogram,
// if the metric_type is anything else it returns an error
Expand Down
Loading