diff --git a/pumps/prometheus.go b/pumps/prometheus.go index b1b64adec..bd497baa9 100644 --- a/pumps/prometheus.go +++ b/pumps/prometheus.go @@ -206,97 +206,99 @@ func (p *PrometheusPump) initBaseMetrics() { continue } metric.aggregatedObservations = p.conf.AggregateObservations - if errInit := metric.InitVec(); errInit != nil { - p.log.Error(errInit) - } - trimmedAllMetrics = append(trimmedAllMetrics, metric) - } - p.allMetrics = trimmedAllMetrics -} - -// InitCustomMetrics initialise custom prometheus metrics based on p.conf.CustomMetrics and add them into p.allMetrics -func (p *PrometheusPump) InitCustomMetrics() { - if len(p.conf.CustomMetrics) > 0 { - customMetrics := []*PrometheusMetric{} - for i := range p.conf.CustomMetrics { - newMetric := &p.conf.CustomMetrics[i] - newMetric.aggregatedObservations = p.conf.AggregateObservations - errInit := newMetric.InitVec() - if errInit != nil { - p.log.Error("there was an error initialising custom prometheus metric ", newMetric.Name, " error:", errInit) - } else { - p.log.Info("added custom prometheus metric:", newMetric.Name) - customMetrics = append(customMetrics, newMetric) - } - } - - p.allMetrics = append(p.allMetrics, customMetrics...) - } -} - -func (p *PrometheusPump) WriteData(ctx context.Context, data []interface{}) error { - p.log.Debug("Attempting to write ", len(data), " records...") - - for i, item := range data { - select { - case <-ctx.Done(): - p.log.Warn("Purged ", i, " of ", len(data), " because of timeout.") - return errors.New("prometheus pump couldn't write all the analytics records") - default: - } - record := item.(analytics.AnalyticsRecord) - // we loop through all the metrics available. - for _, metric := range p.allMetrics { - if metric.enabled { - p.log.Debug("Processing metric:", metric.Name) - // we get the values for that metric required labels - values := metric.GetLabelsValues(record) - - switch metric.MetricType { - case counterType: - if metric.counterVec != nil { - // if the metric is a counter, we increment the counter memory map - err := metric.Inc(values...) - if err != nil { - p.log.WithFields(logrus.Fields{ - "metric_type": metric.MetricType, - "metric_name": metric.Name, - }).Error("error incrementing prometheus metric value:", err) - } - } - case histogramType: - if metric.histogramVec != nil { - // if the metric is an histogram, we Observe the request time with the given values - err := metric.Observe(record.RequestTime, values...) - if err != nil { - p.log.WithFields(logrus.Fields{ - "metric_type": metric.MetricType, - "metric_name": metric.Name, - }).Error("error incrementing prometheus metric value:", err) - } - } - default: - p.log.Debug("trying to process an invalid prometheus metric type:", metric.MetricType) - } - } - } - } - - // after looping through all the analytics records, we expose the metrics to prometheus endpoint - for _, customMetric := range p.allMetrics { - err := customMetric.Expose() - if err != nil { - p.log.WithFields(logrus.Fields{ - "metric_type": customMetric.MetricType, - "metric_name": customMetric.Name, - }).Error("error writing prometheus metric:", err) - } - } - - p.log.Info("Purged ", len(data), " records...") - - return nil -} + type PrometheusConf struct { + type PrometheusConf struct { + EnvPrefix string `mapstructure:"meta_env_prefix"` + Addr string `json:"listen_address" mapstructure:"listen_address"` + Path string `json:"path" mapstructure:"path"` + AggregateObservations bool `json:"aggregate_observations" mapstructure:"aggregate_observations"` + DisabledMetrics []string `json:"disabled_metrics" mapstructure:"disabled_metrics"` + CustomMetrics CustomMetrics `json:"custom_metrics" mapstructure:"custom_metrics"` + TrackAllPaths bool `json:"track_all_paths" mapstructure:"track_all_paths"` + } + + type CustomMetrics []PrometheusMetric + + type PrometheusMetric struct { + Name string `json:"name" mapstructure:"name"` + Help string `json:"help" mapstructure:"help"` + MetricType string `json:"metric_type" mapstructure:"metric_type"` + Buckets []float64 `json:"buckets" mapstructure:"buckets"` + Labels []string `json:"labels" mapstructure:"labels"` + + enabled bool + counterVec *prometheus.CounterVec + histogramVec *prometheus.HistogramVec + + counterMap map[string]counterStruct + + histogramMap map[string]histogramCounter + aggregatedObservations bool + } + + func (p *PrometheusPump) WriteData(ctx context.Context, data []interface{}) error { + p.log.Debug("Attempting to write ", len(data), " records...") + + for i, item := range data { + select { + case <-ctx.Done(): + p.log.Warn("Purged ", i, " of ", len(data), " because of timeout.") + return errors.New("prometheus pump couldn't write all the analytics records") + default: + } + record := item.(analytics.AnalyticsRecord) + // we loop through all the metrics available. + for _, metric := range p.allMetrics { + if metric.enabled { + p.log.Debug("Processing metric:", metric.Name) + // we get the values for that metric required labels + values := metric.GetLabelsValues(record) + + switch metric.MetricType { + case counterType: + if metric.counterVec != nil { + // if the metric is a counter, we increment the counter memory map + err := metric.Inc(values...) + if err != nil { + p.log.WithFields(logrus.Fields{ + "metric_type": metric.MetricType, + "metric_name": metric.Name, + }).Error("error incrementing prometheus metric value:", err) + } + } + case histogramType: + if metric.histogramVec != nil { + // if the metric is an histogram, we Observe the request time with the given values + err := metric.Observe(record.RequestTime, values...) + if err != nil { + p.log.WithFields(logrus.Fields{ + "metric_type": metric.MetricType, + "metric_name": metric.Name, + }).Error("error incrementing prometheus metric value:", err) + } + } + default: + p.log.Debug("trying to process an invalid prometheus metric type:", metric.MetricType) + } + } + } + } + + // after looping through all the analytics records, we expose the metrics to prometheus endpoint + for _, customMetric := range p.allMetrics { + err := customMetric.Expose() + if err != nil { + p.log.WithFields(logrus.Fields{ + "metric_type": customMetric.MetricType, + "metric_name": customMetric.Name, + }).Error("error writing prometheus metric:", err) + } + } + + p.log.Info("Purged ", len(data), " records...") + + return nil + } // InitVec inits the prometheus metric based on the metric_type. It only can create counter and histogram, // if the metric_type is anything else it returns an error