From 2861b61f337bae7a90fd1cb7f8400e3bc37a9458 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:39:00 +0200 Subject: [PATCH] Only watch metadata for ReplicaSets in K8s (#41100) (#41214) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Bump github.com/elastic/elastic-agent-autodiscover to v0.9.0 * Only watch metadata for ReplicaSets in k8s autodiscovery * Only watch metadata for ReplicaSets in add_kubernetes_metadata processor * Fix linter warnings * Merge changelog entries (cherry picked from commit ee780d252892469931b887643c006714dff08b22) # Conflicts: # NOTICE.txt # go.mod # go.sum # libbeat/autodiscover/providers/kubernetes/pod.go # libbeat/processors/add_kubernetes_metadata/kubernetes.go Co-authored-by: Mikołaj Świątek --- CHANGELOG.next.asciidoc | 2 + NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- .../autodiscover/providers/kubernetes/pod.go | 71 ++++++++++++------- .../providers/kubernetes/pod_test.go | 13 +++- .../add_kubernetes_metadata/kubernetes.go | 33 ++++++--- 7 files changed, 86 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c7c4c313026..f794c8ab902 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572] - The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669] - Update to Go 1.22.7. {pull}41018[41018] +- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942] +- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 5172fabcec3..1bda6eb8a03 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12612,11 +12612,11 @@ various licenses: -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-autodiscover -Version: v0.8.1 +Version: v0.9.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.8.1/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.9.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index a1847eaf938..e4472b0253e 100644 --- a/go.mod +++ b/go.mod @@ -198,7 +198,7 @@ require ( github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.6.0 - github.com/elastic/elastic-agent-autodiscover v0.8.1 + github.com/elastic/elastic-agent-autodiscover v0.9.0 github.com/elastic/elastic-agent-libs v0.12.1 github.com/elastic/elastic-agent-system-metrics v0.10.4-0.20240826151019-9db0a02d3b85 github.com/elastic/go-elasticsearch/v8 v8.14.0 diff --git a/go.sum b/go.sum index f30c362a4eb..0840721cb23 100644 --- a/go.sum +++ b/go.sum @@ -558,8 +558,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/ebpfevents v0.6.0 h1:BrL3m7JFK7U6h2jkbk3xAWWs//IZnugCHEDds5u2v68= github.com/elastic/ebpfevents v0.6.0/go.mod h1:ESG9gw7N+n5yCCMgdg1IIJENKWSmX7+X0Fi9GUs9nvU= -github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYBRo4b/P5RZmx/rSvT10A= -github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0= +github.com/elastic/elastic-agent-autodiscover v0.9.0 h1:+iWIKh0u3e8I+CJa3FfWe9h0JojNasPgYIA47gpuuns= +github.com/elastic/elastic-agent-autodiscover v0.9.0/go.mod h1:5iUxLHhVdaGSWYTveSwfJEY4RqPXTG13LPiFoxcpFd4= github.com/elastic/elastic-agent-client/v7 v7.13.0 h1:ENCfV5XIMmjWo9/0J7t//5N7xgm43Ktg0SyIomupRcA= github.com/elastic/elastic-agent-client/v7 v7.13.0/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= github.com/elastic/elastic-agent-libs v0.12.1 h1:5jkxMx15Bna8cq7/Sz/XUIVUXfNWiJ80iSk4ICQ7KJ0= diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index c5f9c721eb9..e6ec1424602 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -25,6 +25,8 @@ import ( "time" "github.com/gofrs/uuid" + "k8s.io/apimachinery/pkg/runtime/schema" + k8s "k8s.io/client-go/kubernetes" "github.com/elastic/elastic-agent-autodiscover/bus" @@ -126,11 +128,23 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - HonorReSyncs: true, - }, nil) + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + logger.Errorf("Error creating metadata client due to error %+v", err) + } + replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + HonorReSyncs: true, + }, + nil, + metadata.RemoveUnnecessaryReplicaSetData, + ) if err != nil { logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err) } @@ -216,23 +230,26 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { var kubeMeta, container mapstr.M annotations := make(mapstr.M, 0) - rawMeta, ok := event["kubernetes"] - if ok { - kubeMeta = rawMeta.(mapstr.M) - // The builder base config can configure any of the field values of kubernetes if need be. - e["kubernetes"] = kubeMeta - if rawAnn, ok := kubeMeta["annotations"]; ok { - anns, _ := rawAnn.(mapstr.M) - if len(anns) != 0 { - annotations = anns.Clone() + rawMeta, found := event["kubernetes"] + if found { + kubeMetaMap, ok := rawMeta.(mapstr.M) + if ok { + kubeMeta = kubeMetaMap + // The builder base config can configure any of the field values of kubernetes if need be. + e["kubernetes"] = kubeMeta + if rawAnn, ok := kubeMeta["annotations"]; ok { + anns, _ := rawAnn.(mapstr.M) + if len(anns) != 0 { + annotations = anns.Clone() + } } - } - // Look at all the namespace level default annotations and do a merge with priority going to the pod annotations. - if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok { - namespaceAnnotations, _ := rawNsAnn.(mapstr.M) - if len(namespaceAnnotations) != 0 { - annotations.DeepUpdateNoOverwrite(namespaceAnnotations) + // Look at all the namespace level default annotations and do a merge with priority going to the pod annotations. + if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok { + namespaceAnnotations, _ := rawNsAnn.(mapstr.M) + if len(namespaceAnnotations) != 0 { + annotations.DeepUpdateNoOverwrite(namespaceAnnotations) + } } } } @@ -246,12 +263,14 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { e["ports"] = ports } - if rawCont, ok := kubeMeta["container"]; ok { - container = rawCont.(mapstr.M) - // This would end up adding a runtime entry into the event. This would make sure - // that there is not an attempt to spin up a docker input for a rkt container and when a - // rkt input exists it would be natively supported. - e["container"] = container + if rawCont, found := kubeMeta["container"]; found { + if containerMap, ok := rawCont.(mapstr.M); ok { + container = containerMap + // This would end up adding a runtime entry into the event. This would make sure + // that there is not an attempt to spin up a docker input for a rkt container and when a + // rkt input exists it would be natively supported. + e["container"] = container + } } cname := utils.GetContainerName(container) diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 84712615ec1..c63cd7a32cf 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -44,16 +44,19 @@ import ( func TestGenerateHints(t *testing.T) { tests := []struct { + name string event bus.Event result bus.Event }{ // Empty events should return empty hints { + name: "empty", event: bus.Event{}, result: bus.Event{}, }, // Only kubernetes payload must return only kubernetes as part of the hint { + name: "only kubernetes", event: bus.Event{ "kubernetes": mapstr.M{ "pod": mapstr.M{ @@ -71,6 +74,7 @@ func TestGenerateHints(t *testing.T) { }, // Kubernetes payload with container info must be bubbled to top level { + name: "kubernetes container info top level", event: bus.Event{ "kubernetes": mapstr.M{ "container": mapstr.M{ @@ -102,6 +106,7 @@ func TestGenerateHints(t *testing.T) { // not.to.include must not be part of hints // period is annotated at both container and pod level. Container level value must be in hints { + name: "multiple hints", event: bus.Event{ "kubernetes": mapstr.M{ "annotations": getNestedAnnotations(mapstr.M{ @@ -163,6 +168,7 @@ func TestGenerateHints(t *testing.T) { // Have one set of hints come from the pod and the other come from namespaces // The resultant hints should have a combination of both { + name: "hints from Pod and Namespace", event: bus.Event{ "kubernetes": mapstr.M{ "annotations": getNestedAnnotations(mapstr.M{ @@ -227,6 +233,7 @@ func TestGenerateHints(t *testing.T) { // Have one set of hints come from the pod and the same keys come from namespaces // The resultant hints should honor only pods and not namespace. { + name: "pod hints win over namespace", event: bus.Event{ "kubernetes": mapstr.M{ "annotations": getNestedAnnotations(mapstr.M{ @@ -288,6 +295,7 @@ func TestGenerateHints(t *testing.T) { // Have no hints on the pod and have namespace level defaults. // The resultant hints should honor only namespace defaults. { + name: "namespace defaults", event: bus.Event{ "kubernetes": mapstr.M{ "namespace_annotations": getNestedAnnotations(mapstr.M{ @@ -339,7 +347,10 @@ func TestGenerateHints(t *testing.T) { logger: logp.NewLogger("kubernetes.pod"), } for _, test := range tests { - assert.Equal(t, p.GenerateHints(test.event), test.result) + test := test + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.result, p.GenerateHints(test.event)) + }) } } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index e8600b6d85c..79dc76171d1 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + k8sclient "k8s.io/client-go/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes" @@ -226,11 +228,23 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - HonorReSyncs: true, - }, nil) + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + k.log.Errorf("Error creating metadata client due to error %+v", err) + } + replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + HonorReSyncs: true, + }, + nil, + metadata.RemoveUnnecessaryReplicaSetData, + ) if err != nil { k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err) } @@ -259,18 +273,15 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - pod := obj.(*kubernetes.Pod) - k.log.Debugf("Adding kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName()) + pod, _ := obj.(*kubernetes.Pod) k.addPod(pod) }, UpdateFunc: func(obj interface{}) { - pod := obj.(*kubernetes.Pod) - k.log.Debugf("Updating kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName()) + pod, _ := obj.(*kubernetes.Pod) k.updatePod(pod) }, DeleteFunc: func(obj interface{}) { - pod := obj.(*kubernetes.Pod) - k.log.Debugf("Removing pod: %s/%s", pod.GetNamespace(), pod.GetName()) + pod, _ := obj.(*kubernetes.Pod) k.removePod(pod) }, })