Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix event mapping implementation for statsd module #36925

Merged
merged 14 commits into from
Oct 30, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ is collected by it.
- Add missing 'TransactionType' dimension for Azure Storage Account. {pull}36413[36413]
- Add log error when statsd server fails to start {pull}36477[36477]
- Fix CassandraConnectionClosures metric configuration {pull}34742[34742]
- Fix event mapping implementation for statsd module {pull}36925[36925]

*Osquerybeat*

Expand Down
8 changes: 4 additions & 4 deletions x-pack/metricbeat/module/airflow/statsd/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
"@timestamp": "2017-10-12T08:05:34.853Z",
"airflow": {
"dag_duration": {
"15m_rate": 0.2,
"1m_rate": 0.2,
"5m_rate": 0.2,
"15m_rate": 0,
"1m_rate": 0,
"5m_rate": 0,
"count": 1,
"max": 200,
"mean": 200,
"mean_rate": 0.2222490946071946,
"mean_rate": 38960.532980091164,
agithomas marked this conversation as resolved.
Show resolved Hide resolved
"median": 200,
"min": 200,
"p75": 200,
Expand Down
16 changes: 8 additions & 8 deletions x-pack/metricbeat/module/airflow/statsd/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (
"sync"
"testing"

"github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/auditbeat/core"
_ "github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
_ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server"
)

func init() {
Expand All @@ -42,14 +41,14 @@ func getConfig() map[string]interface{} {
}
}

func createEvent(t *testing.T) {
func createEvent(data string, t *testing.T) {
udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", STATSD_HOST, STATSD_PORT))
require.NoError(t, err)

conn, err := net.DialUDP("udp", nil, udpAddr)
require.NoError(t, err)

_, err = fmt.Fprint(conn, "dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2")
_, err = fmt.Fprint(conn, data)
require.NoError(t, err)
}

Expand All @@ -70,15 +69,16 @@ func TestData(t *testing.T) {
wg.Done()

go ms.Run(reporter)
events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(1)
events = reporter.(*mbtest.CapturingPushReporterV2).BlockingCapture(2)

close(done)
}(wg)

wg.Wait()
createEvent(t)
createEvent("dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2", t)
createEvent("dagrun.duration.failed.b_dagid:500|ms|#k1:v1,k2:v2", t)
<-done

assert.Len(t, events, 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this test fail without the new changes in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed running the test before making this change, but I've run it now and made the necessary updates(the tags needed to be identical for both the events to test this change). Thanks!

if len(events) == 0 {
t.Fatal("received no events")
}
Expand Down
20 changes: 11 additions & 9 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,26 @@ func parse(b []byte) ([]statsdMetric, error) {
return metrics, nil
}

func eventMapping(metricName string, metricValue interface{}, metricSetFields mapstr.M, mappings map[string]StatsdMapping) {
func eventMapping(metricName string, metricValue interface{}, mappings map[string]StatsdMapping) mapstr.M {
m := mapstr.M{}
if len(mappings) == 0 {
metricSetFields[common.DeDot(metricName)] = metricValue
return
m[common.DeDot(metricName)] = metricValue
return m
}

for _, mapping := range mappings {
// The metricname match the one with no labels in mappings
// Let's insert it dedotted and continue
if metricName == mapping.Metric {
metricSetFields[mapping.Value.Field] = metricValue
return
m[mapping.Value.Field] = metricValue
tommyers-elastic marked this conversation as resolved.
Show resolved Hide resolved
return m
}

res := mapping.regex.FindStringSubmatch(metricName)

// Not all labels match
// Skip and continue to next mapping
if len(res) != (len(mapping.Labels) + 1) {
logger.Debug("not all labels match in statsd.mapping, skipped")
logger.Debug("not all labels match in statsd.mappings, skipped")
continue
}

Expand All @@ -133,13 +133,15 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
continue
}

metricSetFields[label.Field] = res[i]
m[label.Field] = res[i]
}
}

// Let's add the metric with the value field
metricSetFields[mapping.Value.Field] = metricValue
m[mapping.Value.Field] = metricValue
break
}
return m
}

func newMetricProcessor(ttl time.Duration) *metricProcessor {
Expand Down
22 changes: 16 additions & 6 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,11 +737,9 @@ func TestEventMapping(t *testing.T) {
},
} {
t.Run(test.metricName, func(t *testing.T) {
metricSetFields := mapstr.M{}
builtMappings, _ := buildMappings(mappings)
eventMapping(test.metricName, test.metricValue, metricSetFields, builtMappings)

assert.Equal(t, test.expected, metricSetFields)
ms := eventMapping(test.metricName, test.metricValue, builtMappings)
assert.Equal(t, test.expected, ms)
})
}
}
Expand Down Expand Up @@ -1132,7 +1130,7 @@ func TestTagsGrouping(t *testing.T) {
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 2)
assert.Len(t, events, 4)

actualTags := []mapstr.M{}
for _, e := range events {
Expand All @@ -1146,6 +1144,18 @@ func TestTagsGrouping(t *testing.T) {
"k2": "v2",
},
},
{
"labels": mapstr.M{
"k1": "v1",
"k2": "v2",
},
},
{
"labels": mapstr.M{
"k1": "v2",
"k2": "v3",
},
},
{
"labels": mapstr.M{
"k1": "v2",
Expand Down Expand Up @@ -1224,7 +1234,7 @@ func TestData(t *testing.T) {
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 1)
assert.Len(t, events, 10)

mbevent := mbtest.StandardizeEvent(ms, *events[0])
mbtest.WriteEventToDataJSON(t, mbevent, "")
Expand Down
42 changes: 24 additions & 18 deletions x-pack/metricbeat/module/statsd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

mappings, err := buildMappings(config.Mappings)
if err != nil {
return nil, fmt.Errorf("invalid mapping configuration for `statsd.mapping`: %w", err)
return nil, fmt.Errorf("invalid mapping configuration for `statsd.mappings`: %w", err)
}
return &MetricSet{
BaseMetricSet: base,
Expand All @@ -107,8 +107,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Host returns the hostname or other module specific value that identifies a
// specific host or service instance from which to collect metrics.
func (b *MetricSet) Host() string {
return b.server.(*udp.UdpServer).GetHost()
func (m *MetricSet) Host() string {
return m.server.(*udp.UdpServer).GetHost()
}

func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) {
Expand Down Expand Up @@ -163,30 +163,36 @@ func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) {
return mappings, nil
}

// It processes metric groups, applies event mappings, and creates Metricbeat events.
// The generated events include metric fields, labels, and the namespace associated with the MetricSet.
// Returns a slice of Metricbeat events.
func (m *MetricSet) getEvents() []*mb.Event {
ritalwar marked this conversation as resolved.
Show resolved Hide resolved
groups := m.processor.GetAll()
events := make([]*mb.Event, len(groups))

for idx, tagGroup := range groups {

mapstrTags := mapstr.M{}
// If there are no metric groups, return nil to indicate no events.
if len(groups) == 0 {
return nil
}
events := make([]*mb.Event, 0, len(groups))
for _, tagGroup := range groups {
mapstrTags := make(mapstr.M, len(tagGroup.tags))
for k, v := range tagGroup.tags {
mapstrTags[k] = v
}

sanitizedMetrics := mapstr.M{}
for k, v := range tagGroup.metrics {
eventMapping(k, v, sanitizedMetrics, m.mappings)
}
// Apply event mapping to the metric and get MetricSetFields.
ms := eventMapping(k, v, m.mappings)

if len(sanitizedMetrics) == 0 {
continue
}

events[idx] = &mb.Event{
MetricSetFields: sanitizedMetrics,
RootFields: mapstr.M{"labels": mapstrTags},
Namespace: m.Module().Name(),
// If no MetricSetFields were generated, continue to the next metric.
if len(ms) == 0 {
continue
}
events = append(events, &mb.Event{
MetricSetFields: ms,
RootFields: mapstr.M{"labels": mapstrTags},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to worry about increasing storage cost by duplicating the tags across many more metric documents?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tags aren't always there. We're not adding them ourselves; they are optional. In fact, I couldn't find them in the case of Airflow, so there is no extra overhead involved. More about StatsD tags in general, as they are supported by every StatsD implementation, such as Datadog and Atlassian. Same we are supporting this here.

Namespace: m.Module().Name(),
})
}
}
return events
Expand Down
Loading