Skip to content

Commit

Permalink
Only watch metadata for ReplicaSets in K8s (#41100) (#41214)
Browse files Browse the repository at this point in the history
* Bump github.com/elastic/elastic-agent-autodiscover to v0.9.0

* Only watch metadata for ReplicaSets in k8s autodiscovery

* Only watch metadata for ReplicaSets in add_kubernetes_metadata processor

* Fix linter warnings

* Merge changelog entries

(cherry picked from commit ee780d2)

# Conflicts:
#	NOTICE.txt
#	go.mod
#	go.sum
#	libbeat/autodiscover/providers/kubernetes/pod.go
#	libbeat/processors/add_kubernetes_metadata/kubernetes.go

Co-authored-by: Mikołaj Świątek <mail@mikolajswiatek.com>
  • Loading branch information
mergify[bot] and swiatekm authored Oct 14, 2024
1 parent f32a872 commit 2861b61
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- Update to Go 1.22.7. {pull}41018[41018]
- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942]
- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12612,11 +12612,11 @@ various licenses:

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-autodiscover
Version: v0.8.1
Version: v0.9.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.8.1/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.9.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ require (
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.8.1
github.com/elastic/elastic-agent-autodiscover v0.9.0
github.com/elastic/elastic-agent-libs v0.12.1
github.com/elastic/elastic-agent-system-metrics v0.10.4-0.20240826151019-9db0a02d3b85
github.com/elastic/go-elasticsearch/v8 v8.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/ebpfevents v0.6.0 h1:BrL3m7JFK7U6h2jkbk3xAWWs//IZnugCHEDds5u2v68=
github.com/elastic/ebpfevents v0.6.0/go.mod h1:ESG9gw7N+n5yCCMgdg1IIJENKWSmX7+X0Fi9GUs9nvU=
github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYBRo4b/P5RZmx/rSvT10A=
github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0=
github.com/elastic/elastic-agent-autodiscover v0.9.0 h1:+iWIKh0u3e8I+CJa3FfWe9h0JojNasPgYIA47gpuuns=
github.com/elastic/elastic-agent-autodiscover v0.9.0/go.mod h1:5iUxLHhVdaGSWYTveSwfJEY4RqPXTG13LPiFoxcpFd4=
github.com/elastic/elastic-agent-client/v7 v7.13.0 h1:ENCfV5XIMmjWo9/0J7t//5N7xgm43Ktg0SyIomupRcA=
github.com/elastic/elastic-agent-client/v7 v7.13.0/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE=
github.com/elastic/elastic-agent-libs v0.12.1 h1:5jkxMx15Bna8cq7/Sz/XUIVUXfNWiJ80iSk4ICQ7KJ0=
Expand Down
71 changes: 45 additions & 26 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"time"

"github.com/gofrs/uuid"
"k8s.io/apimachinery/pkg/runtime/schema"

k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/elastic-agent-autodiscover/bus"
Expand Down Expand Up @@ -126,11 +128,23 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
logger.Errorf("Error creating metadata client due to error %+v", err)
}
replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
},
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
Expand Down Expand Up @@ -216,23 +230,26 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
var kubeMeta, container mapstr.M

annotations := make(mapstr.M, 0)
rawMeta, ok := event["kubernetes"]
if ok {
kubeMeta = rawMeta.(mapstr.M)
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
anns, _ := rawAnn.(mapstr.M)
if len(anns) != 0 {
annotations = anns.Clone()
rawMeta, found := event["kubernetes"]
if found {
kubeMetaMap, ok := rawMeta.(mapstr.M)
if ok {
kubeMeta = kubeMetaMap
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
anns, _ := rawAnn.(mapstr.M)
if len(anns) != 0 {
annotations = anns.Clone()
}
}
}

// Look at all the namespace level default annotations and do a merge with priority going to the pod annotations.
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
namespaceAnnotations, _ := rawNsAnn.(mapstr.M)
if len(namespaceAnnotations) != 0 {
annotations.DeepUpdateNoOverwrite(namespaceAnnotations)
// Look at all the namespace level default annotations and do a merge with priority going to the pod annotations.
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
namespaceAnnotations, _ := rawNsAnn.(mapstr.M)
if len(namespaceAnnotations) != 0 {
annotations.DeepUpdateNoOverwrite(namespaceAnnotations)
}
}
}
}
Expand All @@ -246,12 +263,14 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
e["ports"] = ports
}

if rawCont, ok := kubeMeta["container"]; ok {
container = rawCont.(mapstr.M)
// This would end up adding a runtime entry into the event. This would make sure
// that there is not an attempt to spin up a docker input for a rkt container and when a
// rkt input exists it would be natively supported.
e["container"] = container
if rawCont, found := kubeMeta["container"]; found {
if containerMap, ok := rawCont.(mapstr.M); ok {
container = containerMap
// This would end up adding a runtime entry into the event. This would make sure
// that there is not an attempt to spin up a docker input for a rkt container and when a
// rkt input exists it would be natively supported.
e["container"] = container
}
}

cname := utils.GetContainerName(container)
Expand Down
13 changes: 12 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@ import (

func TestGenerateHints(t *testing.T) {
tests := []struct {
name string
event bus.Event
result bus.Event
}{
// Empty events should return empty hints
{
name: "empty",
event: bus.Event{},
result: bus.Event{},
},
// Only kubernetes payload must return only kubernetes as part of the hint
{
name: "only kubernetes",
event: bus.Event{
"kubernetes": mapstr.M{
"pod": mapstr.M{
Expand All @@ -71,6 +74,7 @@ func TestGenerateHints(t *testing.T) {
},
// Kubernetes payload with container info must be bubbled to top level
{
name: "kubernetes container info top level",
event: bus.Event{
"kubernetes": mapstr.M{
"container": mapstr.M{
Expand Down Expand Up @@ -102,6 +106,7 @@ func TestGenerateHints(t *testing.T) {
// not.to.include must not be part of hints
// period is annotated at both container and pod level. Container level value must be in hints
{
name: "multiple hints",
event: bus.Event{
"kubernetes": mapstr.M{
"annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -163,6 +168,7 @@ func TestGenerateHints(t *testing.T) {
// Have one set of hints come from the pod and the other come from namespaces
// The resultant hints should have a combination of both
{
name: "hints from Pod and Namespace",
event: bus.Event{
"kubernetes": mapstr.M{
"annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -227,6 +233,7 @@ func TestGenerateHints(t *testing.T) {
// Have one set of hints come from the pod and the same keys come from namespaces
// The resultant hints should honor only pods and not namespace.
{
name: "pod hints win over namespace",
event: bus.Event{
"kubernetes": mapstr.M{
"annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -288,6 +295,7 @@ func TestGenerateHints(t *testing.T) {
// Have no hints on the pod and have namespace level defaults.
// The resultant hints should honor only namespace defaults.
{
name: "namespace defaults",
event: bus.Event{
"kubernetes": mapstr.M{
"namespace_annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -339,7 +347,10 @@ func TestGenerateHints(t *testing.T) {
logger: logp.NewLogger("kubernetes.pod"),
}
for _, test := range tests {
assert.Equal(t, p.GenerateHints(test.event), test.result)
test := test
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.result, p.GenerateHints(test.event))
})
}
}

Expand Down
33 changes: 22 additions & 11 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"

k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
Expand Down Expand Up @@ -226,11 +228,23 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
k.log.Errorf("Error creating metadata client due to error %+v", err)
}
replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
},
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
Expand Down Expand Up @@ -259,18 +273,15 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
k.log.Debugf("Adding kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName())
pod, _ := obj.(*kubernetes.Pod)
k.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
k.log.Debugf("Updating kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName())
pod, _ := obj.(*kubernetes.Pod)
k.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
k.log.Debugf("Removing pod: %s/%s", pod.GetNamespace(), pod.GetName())
pod, _ := obj.(*kubernetes.Pod)
k.removePod(pod)
},
})
Expand Down

0 comments on commit 2861b61

Please sign in to comment.