Skip to content

Commit

Permalink
Fix unintended skip in metric collection on Azure Monitor (#37203)
Browse files Browse the repository at this point in the history
* Fix unintended skip in metric collection

Due to the use of `time.Now()` to set the value of the reference time
used to decide if collect the value of the metric, the metricset
may skip a metric collection.

Made two changes to the Azure Monitor metricset:

- moved `referenceTime` into the `Fetch()` and truncated its value to
  seconds to have a more predictable reference time for comparison.
- Updated the `MetricRegistry.NeedsUpdate()` method to use
  `referenceTime` vs. using "now" to compare with the time grain
  duration.

Current tests seem fine, with PT5M time grain and collection periods
of 300s and 60s.

I am also adding some structured logging messages to track registry
decisions at the debug log level.

Here's how to parse the structured logs to get a nice table view:

```shell
$ cat metricbeat.log.json | grep "MetricRegistry" | jq -r  '[.key, .needs_update, .reference_time, .now, .time_grain_start_time//"n/a", .last_collection_at//"n/a"] | @TSV'
fdd3a07a3cabd90233c083950a4bc30c        true    2023-11-26T15:51:30.000Z        2023-11-26T15:51:30.967Z        2023-11-26T15:46:30.000Z        2023-11-26T15:46:30.000Z
6ee8809577a906538473e3e5e98dc893        true    2023-11-26T15:51:30.000Z        2023-11-26T15:51:35.257Z        2023-11-26T15:46:30.000Z        2023-11-26T15:46:30.000Z
6aedb7dffafbfe9ca19e0aa01436d30a        false   2023-11-26T15:51:30.000Z        2023-11-26T15:51:35.757Z        2023-11-26T15:46:30.000Z        2023-11-26T15:48:30.000Z
```

---------

Co-authored-by: Richa Talwar <102972658+ritalwar@users.noreply.github.com>
  • Loading branch information
zmoog and ritalwar authored Nov 28, 2023
1 parent 62c5e91 commit 110cc31
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix CPU and memory metrics collection from privileged process on Windows {issue}17314[17314]{pull}37027[37027]
- Enhanced Azure Metrics metricset with refined grouping logic and resolved duplication issues for TSDB compatibility {pull}36823[36823]
- Fix memory leak on Windows {issue}37142[37142] {pull}37171[37171]
- Fix unintended skip in metric collection on Azure Monitor {issue}37204[37204] {pull}37203[37203]

*Osquerybeat*

Expand Down
15 changes: 14 additions & 1 deletion x-pack/metricbeat/module/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure

import (
"fmt"
"time"

"github.com/elastic/beats/v7/metricbeat/mb"
)
Expand Down Expand Up @@ -87,6 +88,18 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
// It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Set the reference time for the current fetch.
//
// The reference time is used to calculate time intervals
// and compare with collection info in the metric
// registry to decide whether to collect metrics or not,
// depending on metric time grain (check `MetricRegistry`
// for more information).
//
// We truncate the reference time to the second to avoid millisecond
// variations in the collection period causing skipped collections.
referenceTime := time.Now().UTC().Truncate(time.Second)

// Initialize cloud resources and monitor metrics
// information.
//
Expand Down Expand Up @@ -116,7 +129,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {

for _, metricsDefinition := range metricsByResourceId {
// Fetch metric values for each resource.
metricValues := m.Client.GetMetricValues(metricsDefinition, report)
metricValues := m.Client.GetMetricValues(referenceTime, metricsDefinition, report)

// Turns metric values into events and sends them to Elasticsearch.
if err := mapToEvents(metricValues, m.Client, report); err != nil {
Expand Down
78 changes: 63 additions & 15 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
)

// NewMetricRegistry instantiates a new metric registry.
func NewMetricRegistry() *MetricRegistry {
func NewMetricRegistry(logger *logp.Logger) *MetricRegistry {
return &MetricRegistry{
logger: logger,
collectionsInfo: make(map[string]MetricCollectionInfo),
}
}
Expand All @@ -29,6 +30,7 @@ func NewMetricRegistry() *MetricRegistry {
// This is used to avoid collecting the same metric values over and over again
// when the time grain is larger than the collection interval.
type MetricRegistry struct {
logger *logp.Logger
collectionsInfo map[string]MetricCollectionInfo
}

Expand All @@ -38,26 +40,70 @@ func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

// NeedsUpdate returns true if the metric needs to be updated.
func (m *MetricRegistry) NeedsUpdate(metric Metric) bool {
// NeedsUpdate returns true if the metric needs to be collected again
// for the given `referenceTime`.
func (m *MetricRegistry) NeedsUpdate(referenceTime time.Time, metric Metric) bool {
// Build a key to store the metric in the registry.
// The key is a combination of the namespace,
// resource ID and metric names.
metricKey := m.buildMetricKey(metric)

if info, exists := m.collectionsInfo[metricKey]; exists {
duration := convertTimeGrainToDuration(info.timeGrain)
// Get the now time in UTC, only to be used for logging.
// It's interesting to see when the registry evaluate each
// metric in relation to the reference time.
now := time.Now().UTC()

if collection, exists := m.collectionsInfo[metricKey]; exists {
// Turn the time grain into a duration (for example, PT5M -> 5 minutes).
timeGrainDuration := convertTimeGrainToDuration(collection.timeGrain)

// Calculate the start time of the time grain in relation to
// the reference time.
timeGrainStartTime := referenceTime.Add(-timeGrainDuration)

// If the last collection time is after the start time of the time grain,
// it means that we already have a value for the given time grain.
//
// In this case, the metricset does not need to collect the metric
// values again.
if collection.timestamp.After(timeGrainStartTime) {
m.logger.Debugw(
"MetricRegistry: Metric does not need an update",
"needs_update", false,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

// Check if the metric has been collected within a
// time period defined by the time grain.
if info.timestamp.After(time.Now().Add(duration * (-1))) {
return false
}

// The last collection time is before the start time of the time grain,
// it means that the metricset needs to collect the metric values again.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
"time_grain_start_time", timeGrainStartTime,
"last_collection_at", collection.timestamp,
)

return true
}

// If the metric is not in the registry, it means that it has never
// been collected before.
//
// In this case, we need to collect the metric.
m.logger.Debugw(
"MetricRegistry: Metric needs an update",
"needs_update", true,
"reference_time", referenceTime,
"now", now,
)

return true
}

Expand Down Expand Up @@ -101,11 +147,13 @@ func NewClient(config Config) (*Client, error) {
return nil, err
}

logger := logp.NewLogger("azure monitor client")

client := &Client{
AzureMonitorService: azureMonitorService,
Config: config,
Log: logp.NewLogger("azure monitor client"),
MetricRegistry: NewMetricRegistry(),
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
}

client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval
Expand Down Expand Up @@ -176,11 +224,10 @@ func (client *Client) InitResources(fn mapResourceMetrics) error {
}

// GetMetricValues returns the metric values for the given cloud resources.
func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2) []Metric {
func (client *Client) GetMetricValues(referenceTime time.Time, metrics []Metric, reporter mb.ReporterV2) []Metric {
var result []Metric

// Same end time for all metrics in the same batch.
referenceTime := time.Now().UTC()
interval := client.Config.Period

// Fetch in the range [{-2 x INTERVAL},{-1 x INTERVAL}) with a delay of {INTERVAL}.
Expand Down Expand Up @@ -208,7 +255,7 @@ func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2)
// the time grain of the metric, we can determine if the metric needs
// to be collected again, or if we can skip it.
//
if !client.MetricRegistry.NeedsUpdate(metric) {
if !client.MetricRegistry.NeedsUpdate(referenceTime, metric) {
continue
}

Expand Down Expand Up @@ -413,11 +460,12 @@ func (client *Client) AddVmToResource(resourceId string, vm VmResource) {
// NewMockClient instantiates a new client with the mock azure service
func NewMockClient() *Client {
azureMockService := new(MockService)
logger := logp.NewLogger("test azure monitor")
client := &Client{
AzureMonitorService: azureMockService,
Config: Config{},
Log: logp.NewLogger("test azure monitor"),
MetricRegistry: NewMetricRegistry(),
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
}
return client
}
7 changes: 5 additions & 2 deletions x-pack/metricbeat/module/azure/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azure
import (
"errors"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
Expand Down Expand Up @@ -66,6 +67,7 @@ func TestGetMetricValues(t *testing.T) {
client.Config = resourceIDConfig

t.Run("return no error when no metric values are returned but log and send event", func(t *testing.T) {
referenceTime := time.Now().UTC().Truncate(time.Second)
client.ResourceConfigurations = ResourceConfiguration{
Metrics: []Metric{
{
Expand All @@ -82,12 +84,13 @@ func TestGetMetricValues(t *testing.T) {
client.AzureMonitorService = m
mr := MockReporterV2{}
mr.On("Error", mock.Anything).Return(true)
metrics := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr)
metrics := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr)
assert.Equal(t, len(metrics), 0)
assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0)
m.AssertExpectations(t)
})
t.Run("return metric values", func(t *testing.T) {
referenceTime := time.Now().UTC().Truncate(time.Second)
client.ResourceConfigurations = ResourceConfiguration{
Metrics: []Metric{
{
Expand All @@ -104,7 +107,7 @@ func TestGetMetricValues(t *testing.T) {
client.AzureMonitorService = m
mr := MockReporterV2{}
mr.On("Error", mock.Anything).Return(true)
metricValues := client.GetMetricValues(client.ResourceConfigurations.Metrics, &mr)
metricValues := client.GetMetricValues(referenceTime, client.ResourceConfigurations.Metrics, &mr)
assert.Equal(t, len(metricValues), 0)
assert.Equal(t, len(client.ResourceConfigurations.Metrics[0].Values), 0)
m.AssertExpectations(t)
Expand Down

0 comments on commit 110cc31

Please sign in to comment.