Skip to content

Commit

Permalink
[Metricbeat][Kubernetes] Share watchers between metricsets (#37332)
Browse files Browse the repository at this point in the history
* Share watchers between metricsets.

---------

Signed-off-by: constanca <constanca.manteigas@elastic.co>
Co-authored-by: Michael Katsoulis <michaelkatsoulis88@gmail.com>
Co-authored-by: Tetiana Kravchenko <tanya.kravchenko.v@gmail.com>
  • Loading branch information
3 people authored Apr 3, 2024
1 parent aaa4829 commit 460b5c4
Show file tree
Hide file tree
Showing 14 changed files with 1,446 additions and 446 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.

==== Added

- Update watchers to be shared between metricsets in Kubernetes module. {pull}37332[37332]
- Add new metricset in Kubernetes module, `state_namespace`. {pull}36406[36406]
- Add configuration for APM instrumentation and expose the tracer trough the Beat object. {pull}17938[17938]
- Make the behavior of clientWorker and netClientWorker consistent when error is returned from publisher pipeline
Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12313,11 +12313,11 @@ various licenses:

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-autodiscover
Version: v0.6.7
Version: v0.6.8
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.7/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.8/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ require (
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.4.0
github.com/elastic/elastic-agent-autodiscover v0.6.7
github.com/elastic/elastic-agent-autodiscover v0.6.8
github.com/elastic/elastic-agent-libs v0.7.5
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3
github.com/elastic/elastic-agent-system-metrics v0.9.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/ebpfevents v0.4.0 h1:M80eAeJnzvGQgU9cjJqkjFca9pjM3aq/TuZxJeom4bI=
github.com/elastic/ebpfevents v0.4.0/go.mod h1:o21z5xup/9dK8u0Hg9bZRflSqqj1Zu5h2dg2hSTcUPQ=
github.com/elastic/elastic-agent-autodiscover v0.6.7 h1:+KVjltN0rPsBrU8b156gV4lOTBgG/vt0efFCFARrf3g=
github.com/elastic/elastic-agent-autodiscover v0.6.7/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4=
github.com/elastic/elastic-agent-autodiscover v0.6.8 h1:BSXz+QwjZAEt08G+T3GDGl14Bh9a6zD8luNCvZut/b8=
github.com/elastic/elastic-agent-autodiscover v0.6.8/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4=
github.com/elastic/elastic-agent-client/v7 v7.8.1 h1:J9wZc/0mUvSEok0X5iR5+n60Jgb+AWooKddb3XgPWqM=
github.com/elastic/elastic-agent-client/v7 v7.8.1/go.mod h1:axl1nkdqc84YRFkeJGD9jExKNPUrOrzf3DFo2m653nY=
github.com/elastic/elastic-agent-libs v0.7.5 h1:4UMqB3BREvhwecYTs/L23oQp1hs/XUkcunPlmTZn5yg=
Expand Down
4 changes: 4 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,10 @@ func (s *mockUpdaterWatcher) Store() caches.Store {
func (s *mockUpdaterWatcher) AddEventHandler(kubernetes.ResourceEventHandler) {
}

func (s *mockUpdaterWatcher) GetEventHandler() kubernetes.ResourceEventHandler {
return nil
}

func (s *mockUpdaterStore) List() []interface{} {
return s.objects
}
Expand Down
17 changes: 5 additions & 12 deletions metricbeat/helper/kubernetes/state_metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

const prefix = "state_"

/*
mappings stores the metrics for each metricset. The key of the map is the name of the metricset
and the values are the mapping of the metricset metrics.
Expand All @@ -45,7 +43,7 @@ var lock sync.RWMutex
// The New method will be called after the setup of the module and before starting to fetch data
func Init(name string, mapping *prometheus.MetricsMapping) {
if name != util.NamespaceResource {
name = prefix + name
name = util.StateMetricsetPrefix + name
}
lock.Lock()
mappings[name] = mapping
Expand Down Expand Up @@ -79,16 +77,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mapping := mappings[base.Name()]
lock.Unlock()

resourceName := base.Name()
if resourceName != util.NamespaceResource {
resourceName = strings.ReplaceAll(resourceName, prefix, "")
}

return &MetricSet{
BaseMetricSet: base,
prometheusClient: prometheusClient,
prometheusMapping: mapping,
enricher: util.NewResourceMetadataEnricher(base, resourceName, mod.GetMetricsRepo(), false),
enricher: util.NewResourceMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), false),
mod: mod,
}, nil
}
Expand All @@ -103,12 +96,12 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
// for the state_namespace metricset.
resourceName := m.BaseMetricSet.Name()
if resourceName != util.NamespaceResource {
resourceName = strings.ReplaceAll(resourceName, prefix, "")
resourceName = strings.ReplaceAll(resourceName, util.StateMetricsetPrefix, "")
} else {
resourceName = "state_namespace"
}

m.enricher.Start()
m.enricher.Start(m.mod.GetResourceWatchers())

families, err := m.mod.GetStateMetricsFamilies(m.prometheusClient)
if err != nil {
Expand Down Expand Up @@ -139,6 +132,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {

// Close stops this metricset
func (m *MetricSet) Close() error {
m.enricher.Stop()
m.enricher.Stop(m.mod.GetResourceWatchers())
return nil
}
6 changes: 3 additions & 3 deletions metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), true),
mod: mod,
}, nil
}
Expand All @@ -84,7 +84,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()
m.enricher.Start(m.mod.GetResourceWatchers())

body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
Expand Down Expand Up @@ -131,6 +131,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {

// Close stops this metricset
func (m *MetricSet) Close() error {
m.enricher.Stop()
m.enricher.Stop(m.mod.GetResourceWatchers())
return nil
}
8 changes: 8 additions & 0 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Module interface {
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetMetricsRepo() *util.MetricsRepo
GetResourceWatchers() *util.Watchers
}

type familiesCache struct {
Expand Down Expand Up @@ -86,6 +87,7 @@ type module struct {
kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
metricsRepo *util.MetricsRepo
resourceWatchers *util.Watchers
cacheHash uint64
}

Expand All @@ -97,6 +99,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
cacheMap: make(map[uint64]*statsCache),
}
metricsRepo := util.NewMetricsRepo()
resourceWatchers := util.NewWatchers()
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
Expand All @@ -108,6 +111,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
metricsRepo: metricsRepo,
resourceWatchers: resourceWatchers,
cacheHash: hash,
}
return &m, nil
Expand Down Expand Up @@ -162,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) {
func (m *module) GetMetricsRepo() *util.MetricsRepo {
return m.metricsRepo
}

func (m *module) GetResourceWatchers() *util.Watchers {
return m.resourceWatchers
}
6 changes: 3 additions & 3 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, util.NodeResource, mod.GetMetricsRepo(), false),
enricher: util.NewResourceMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), false),
mod: mod,
}, nil
}
Expand All @@ -84,7 +84,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()
m.enricher.Start(m.mod.GetResourceWatchers())

body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
Expand Down Expand Up @@ -115,6 +115,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {

// Close stops this metricset
func (m *MetricSet) Close() error {
m.enricher.Stop()
m.enricher.Stop(m.mod.GetResourceWatchers())
return nil
}
6 changes: 3 additions & 3 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, util.PodResource, mod.GetMetricsRepo(), true),
enricher: util.NewResourceMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), true),
mod: mod,
}, nil
}
Expand All @@ -85,7 +85,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()
m.enricher.Start(m.mod.GetResourceWatchers())

body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
Expand Down Expand Up @@ -133,6 +133,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {

// Close stops this metricset
func (m *MetricSet) Close() error {
m.enricher.Stop()
m.enricher.Stop(m.mod.GetResourceWatchers())
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), false),
enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), false),
mod: mod,
}, nil
}
Expand All @@ -124,7 +124,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()
m.enricher.Start(m.mod.GetResourceWatchers())

families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
Expand Down Expand Up @@ -196,6 +196,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {

// Close stops this metricset
func (m *MetricSet) Close() error {
m.enricher.Stop()
m.enricher.Stop(m.mod.GetResourceWatchers())
return nil
}
Loading

0 comments on commit 460b5c4

Please sign in to comment.