diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index b617edfaf04..7434cd62fec 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -95,6 +95,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. ==== Added +- Update watchers to be shared between metricsets in Kubernetes module. {pull}37332[37332] - Add new metricset in Kubernetes module, `state_namespace`. {pull}36406[36406] - Add configuration for APM instrumentation and expose the tracer trough the Beat object. {pull}17938[17938] - Make the behavior of clientWorker and netClientWorker consistent when error is returned from publisher pipeline diff --git a/NOTICE.txt b/NOTICE.txt index e5cf2df78b6..c4ae3f617b5 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12313,11 +12313,11 @@ various licenses: -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-autodiscover -Version: v0.6.7 +Version: v0.6.8 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.7/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.8/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 538eb2fbdce..745d47673b6 100644 --- a/go.mod +++ b/go.mod @@ -202,7 +202,7 @@ require ( github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.4.0 - github.com/elastic/elastic-agent-autodiscover v0.6.7 + github.com/elastic/elastic-agent-autodiscover v0.6.8 github.com/elastic/elastic-agent-libs v0.7.5 github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 github.com/elastic/elastic-agent-system-metrics v0.9.2 diff --git a/go.sum b/go.sum index 7623c476c43..ff0abc92363 100644 --- a/go.sum +++ b/go.sum @@ -673,8 +673,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/ebpfevents v0.4.0 h1:M80eAeJnzvGQgU9cjJqkjFca9pjM3aq/TuZxJeom4bI= github.com/elastic/ebpfevents v0.4.0/go.mod h1:o21z5xup/9dK8u0Hg9bZRflSqqj1Zu5h2dg2hSTcUPQ= -github.com/elastic/elastic-agent-autodiscover v0.6.7 h1:+KVjltN0rPsBrU8b156gV4lOTBgG/vt0efFCFARrf3g= -github.com/elastic/elastic-agent-autodiscover v0.6.7/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4= +github.com/elastic/elastic-agent-autodiscover v0.6.8 h1:BSXz+QwjZAEt08G+T3GDGl14Bh9a6zD8luNCvZut/b8= +github.com/elastic/elastic-agent-autodiscover v0.6.8/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4= github.com/elastic/elastic-agent-client/v7 v7.8.1 h1:J9wZc/0mUvSEok0X5iR5+n60Jgb+AWooKddb3XgPWqM= github.com/elastic/elastic-agent-client/v7 v7.8.1/go.mod h1:axl1nkdqc84YRFkeJGD9jExKNPUrOrzf3DFo2m653nY= github.com/elastic/elastic-agent-libs v0.7.5 h1:4UMqB3BREvhwecYTs/L23oQp1hs/XUkcunPlmTZn5yg= diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 1718dbe0752..4cc2d8bb393 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -2150,6 +2150,10 @@ func (s *mockUpdaterWatcher) Store() caches.Store { func (s *mockUpdaterWatcher) AddEventHandler(kubernetes.ResourceEventHandler) { } +func (s *mockUpdaterWatcher) GetEventHandler() kubernetes.ResourceEventHandler { + return nil +} + func (s *mockUpdaterStore) List() []interface{} { return s.objects } diff --git a/metricbeat/helper/kubernetes/state_metricset.go b/metricbeat/helper/kubernetes/state_metricset.go index 51929d73509..aad813e0099 100644 --- a/metricbeat/helper/kubernetes/state_metricset.go +++ b/metricbeat/helper/kubernetes/state_metricset.go @@ -29,8 +29,6 @@ import ( k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) -const prefix = "state_" - /* mappings stores the metrics for each metricset. The key of the map is the name of the metricset and the values are the mapping of the metricset metrics. @@ -45,7 +43,7 @@ var lock sync.RWMutex // The New method will be called after the setup of the module and before starting to fetch data func Init(name string, mapping *prometheus.MetricsMapping) { if name != util.NamespaceResource { - name = prefix + name + name = util.StateMetricsetPrefix + name } lock.Lock() mappings[name] = mapping @@ -79,16 +77,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { mapping := mappings[base.Name()] lock.Unlock() - resourceName := base.Name() - if resourceName != util.NamespaceResource { - resourceName = strings.ReplaceAll(resourceName, prefix, "") - } - return &MetricSet{ BaseMetricSet: base, prometheusClient: prometheusClient, prometheusMapping: mapping, - enricher: util.NewResourceMetadataEnricher(base, resourceName, mod.GetMetricsRepo(), false), + enricher: util.NewResourceMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), false), mod: mod, }, nil } @@ -103,12 +96,12 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // for the state_namespace metricset. resourceName := m.BaseMetricSet.Name() if resourceName != util.NamespaceResource { - resourceName = strings.ReplaceAll(resourceName, prefix, "") + resourceName = strings.ReplaceAll(resourceName, util.StateMetricsetPrefix, "") } else { resourceName = "state_namespace" } - m.enricher.Start() + m.enricher.Start(m.mod.GetResourceWatchers()) families, err := m.mod.GetStateMetricsFamilies(m.prometheusClient) if err != nil { @@ -139,6 +132,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // Close stops this metricset func (m *MetricSet) Close() error { - m.enricher.Stop() + m.enricher.Stop(m.mod.GetResourceWatchers()) return nil } diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index c277406faee..d1071f613de 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), true), + enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), true), mod: mod, }, nil } @@ -84,7 +84,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) { - m.enricher.Start() + m.enricher.Start(m.mod.GetResourceWatchers()) body, err := m.mod.GetKubeletStats(m.http) if err != nil { @@ -131,6 +131,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // Close stops this metricset func (m *MetricSet) Close() error { - m.enricher.Stop() + m.enricher.Stop(m.mod.GetResourceWatchers()) return nil } diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 1cb9ca037f9..23611e0b63c 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -42,6 +42,7 @@ type Module interface { GetStateMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFamily, error) GetKubeletStats(http *helper.HTTP) ([]byte, error) GetMetricsRepo() *util.MetricsRepo + GetResourceWatchers() *util.Watchers } type familiesCache struct { @@ -86,6 +87,7 @@ type module struct { kubeStateMetricsCache *kubeStateMetricsCache kubeletStatsCache *kubeletStatsCache metricsRepo *util.MetricsRepo + resourceWatchers *util.Watchers cacheHash uint64 } @@ -97,6 +99,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { cacheMap: make(map[uint64]*statsCache), } metricsRepo := util.NewMetricsRepo() + resourceWatchers := util.NewWatchers() return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { @@ -108,6 +111,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { kubeStateMetricsCache: kubeStateMetricsCache, kubeletStatsCache: kubeletStatsCache, metricsRepo: metricsRepo, + resourceWatchers: resourceWatchers, cacheHash: hash, } return &m, nil @@ -162,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) { func (m *module) GetMetricsRepo() *util.MetricsRepo { return m.metricsRepo } + +func (m *module) GetResourceWatchers() *util.Watchers { + return m.resourceWatchers +} diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index 69bfcc2139e..e862d83e92a 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, util.NodeResource, mod.GetMetricsRepo(), false), + enricher: util.NewResourceMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), false), mod: mod, }, nil } @@ -84,7 +84,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) { - m.enricher.Start() + m.enricher.Start(m.mod.GetResourceWatchers()) body, err := m.mod.GetKubeletStats(m.http) if err != nil { @@ -115,6 +115,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // Close stops this metricset func (m *MetricSet) Close() error { - m.enricher.Stop() + m.enricher.Stop(m.mod.GetResourceWatchers()) return nil } diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index 485a72f11b7..fe20641b432 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, util.PodResource, mod.GetMetricsRepo(), true), + enricher: util.NewResourceMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), true), mod: mod, }, nil } @@ -85,7 +85,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) { - m.enricher.Start() + m.enricher.Start(m.mod.GetResourceWatchers()) body, err := m.mod.GetKubeletStats(m.http) if err != nil { @@ -133,6 +133,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // Close stops this metricset func (m *MetricSet) Close() error { - m.enricher.Stop() + m.enricher.Stop(m.mod.GetResourceWatchers()) return nil } diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 86ffb6c0782..0c46e60f51a 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -115,7 +115,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), false), + enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), mod.GetResourceWatchers(), false), mod: mod, }, nil } @@ -124,7 +124,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - m.enricher.Start() + m.enricher.Start(m.mod.GetResourceWatchers()) families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { @@ -196,6 +196,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { // Close stops this metricset func (m *MetricSet) Close() error { - m.enricher.Stop() + m.enricher.Stop(m.mod.GetResourceWatchers()) return nil } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 60b3360ab89..d89ca006f0e 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -38,19 +38,6 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -// Enricher takes Kubernetes events and enrich them with k8s metadata -type Enricher interface { - // Start will start the Kubernetes watcher on the first call, does nothing on the rest - // errors are logged as warning - Start() - - // Stop will stop the Kubernetes watcher - Stop() - - // Enrich the given list of events - Enrich([]mapstr.M) -} - type kubernetesConfig struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` @@ -64,22 +51,60 @@ type kubernetesConfig struct { Namespace string `config:"namespace"` } +// Enricher takes Kubernetes events and enrich them with k8s metadata +type Enricher interface { + // Start will start the Kubernetes watcher on the first call, does nothing on the rest + // errors are logged as warning + Start(*Watchers) + + // Stop will stop the Kubernetes watcher + Stop(*Watchers) + + // Enrich the given list of events + Enrich([]mapstr.M) +} + type enricher struct { sync.RWMutex - metadata map[string]mapstr.M - index func(mapstr.M) string - watcher kubernetes.Watcher - watchersStarted bool - watchersStartedLock sync.Mutex - namespaceWatcher kubernetes.Watcher - nodeWatcher kubernetes.Watcher - replicasetWatcher kubernetes.Watcher - jobWatcher kubernetes.Watcher - isPod bool + metadata map[string]mapstr.M + index func(mapstr.M) string + updateFunc func(kubernetes.Resource) map[string]mapstr.M + deleteFunc func(kubernetes.Resource) []string + metricsetName string + resourceName string + isPod bool + config *kubernetesConfig + log *logp.Logger +} + +type nilEnricher struct{} + +func (*nilEnricher) Start(*Watchers) {} +func (*nilEnricher) Stop(*Watchers) {} +func (*nilEnricher) Enrich([]mapstr.M) {} + +type metaWatcher struct { + watcher kubernetes.Watcher // watcher responsible for watching a specific resource + started bool // true if watcher has started, false otherwise + + metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod) + + enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher + metadataObjects map[string]bool // representation of a set of ids(in the form of namespace_name-resource_name) of each object received by the watcher's handler functions + + nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster + restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod) +} + +type Watchers struct { + metaWatchersMap map[string]*metaWatcher + lock sync.RWMutex } const selector = "kubernetes" +const StateMetricsetPrefix = "state_" + const ( PodResource = "pod" ServiceResource = "service" @@ -96,6 +121,13 @@ const ( NamespaceResource = "state_namespace" ) +func NewWatchers() *Watchers { + watchers := &Watchers{ + metaWatchersMap: make(map[string]*metaWatcher), + } + return watchers +} + func getResource(resourceName string) kubernetes.Resource { switch resourceName { case PodResource: @@ -129,155 +161,519 @@ func getResource(resourceName string) kubernetes.Resource { } } -// NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events +// getExtraWatchers returns a list of the extra resources to watch based on some resource. +// The full list can be seen in https://github.com/elastic/beats/issues/37243, at Expected Watchers section. +func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddResourceMetadataConfig) []string { + switch resourceName { + case PodResource: + extra := []string{NamespaceResource, NodeResource} + // We need to create watchers for ReplicaSets and Jobs that it might belong to, + // in order to be able to retrieve 2nd layer Owner metadata like in case of: + // Deployment -> Replicaset -> Pod + // CronJob -> job -> Pod + if addResourceMetadata != nil && addResourceMetadata.Deployment { + extra = append(extra, ReplicaSetResource) + } + if addResourceMetadata != nil && addResourceMetadata.CronJob { + extra = append(extra, JobResource) + } + return extra + case ServiceResource: + return []string{NamespaceResource} + case DeploymentResource: + return []string{NamespaceResource} + case ReplicaSetResource: + return []string{NamespaceResource} + case StatefulSetResource: + return []string{NamespaceResource} + case DaemonSetResource: + return []string{NamespaceResource} + case JobResource: + return []string{NamespaceResource} + case CronJobResource: + return []string{NamespaceResource} + case PersistentVolumeResource: + return []string{} + case PersistentVolumeClaimResource: + return []string{NamespaceResource} + case StorageClassResource: + return []string{} + case NodeResource: + return []string{} + case NamespaceResource: + return []string{} + default: + return []string{} + } +} + +// getResourceName returns the name of the resource for a metricset. +// Example: state_pod metricset uses pod resource. +// Exception is state_namespace. +func getResourceName(metricsetName string) string { + resourceName := metricsetName + if resourceName != NamespaceResource { + resourceName = strings.ReplaceAll(resourceName, StateMetricsetPrefix, "") + } + return resourceName +} + +// getWatchOptions builds the kubernetes.WatchOptions{} needed for the watcher based on the config and nodeScope. +func getWatchOptions(config *kubernetesConfig, nodeScope bool, client k8sclient.Interface, log *logp.Logger) (*kubernetes.WatchOptions, error) { + var err error + options := kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + } + + // Watch objects in the node only. + if nodeScope { + nd := &kubernetes.DiscoverKubernetesNodeParams{ + ConfigHost: config.Node, + Client: client, + IsInCluster: kubernetes.IsInCluster(config.KubeConfig), + HostUtils: &kubernetes.DefaultDiscoveryUtils{}, + } + options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd) + if err != nil { + return nil, fmt.Errorf("couldn't discover kubernetes node: %w", err) + } + } + return &options, err +} + +func isNamespaced(resourceName string) bool { + if resourceName == NodeResource || resourceName == PersistentVolumeResource || resourceName == StorageClassResource || + resourceName == NamespaceResource { + return false + } + return true +} + +// createWatcher creates a watcher for a specific resource if not already created and stores it in the resourceWatchers map. +// resourceName is the key in the resourceWatchers map where the created watcher gets stored. +// options are the watch options for a specific watcher. +// For example a watcher can be configured through options to watch only for resources on a specific node/namespace or in whole cluster. +// resourceWatchers is the store for all created watchers. +// extraWatcher bool sets apart the watchers that are created as main watcher for a resource and the ones that are created as an extra watcher. +func createWatcher( + resourceName string, + resource kubernetes.Resource, + options kubernetes.WatchOptions, + client k8sclient.Interface, + resourceWatchers *Watchers, + namespace string, + extraWatcher bool) (bool, error) { + + // We need to check the node scope to decide on whether a watcher should be updated or not. + nodeScope := false + if options.Node != "" { + nodeScope = true + } + // The nodescope for extra watchers node, namespace, replicaset and job should be always false. + if extraWatcher { + nodeScope = false + options.Node = "" + } + + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + // Check if a watcher for the specific resource already exists. + resourceMetaWatcher, ok := resourceWatchers.metaWatchersMap[resourceName] + + // If it does not exist, create the resourceMetaWatcher. + if !ok { + // Check if we need to add namespace to the watcher's options. + if isNamespaced(resourceName) { + options.Namespace = namespace + } + watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + if err != nil { + return false, err + } + resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{ + watcher: watcher, + started: false, // not started yet + metadataObjects: make(map[string]bool), + enrichers: make(map[string]*enricher), + metricsetsUsing: make([]string, 0), + restartWatcher: nil, + nodeScope: nodeScope, + } + return true, nil + } else if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope { + // It might happen that the watcher already exists, but is only being used to monitor the resources + // of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track + // the resources of whole cluster(e.g. in case of state_pod metricset). + // If it is the case, then we need to update the watcher by changing its watch options (removing options.Node) + // A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options. + // The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options. + + if isNamespaced(resourceName) { + options.Namespace = namespace + } + restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + if err != nil { + return false, err + } + // update the handler of the restartWatcher to match the current watcher's handler. + restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler()) + resourceMetaWatcher.restartWatcher = restartWatcher + resourceMetaWatcher.nodeScope = nodeScope + } + return false, nil +} + +// addToMetricsetsUsing adds metricset identified by metricsetUsing to the list of resources using the shared watcher +// identified by resourceName. The caller of this function should not be holding the lock. +func addToMetricsetsUsing(resourceName string, metricsetUsing string, resourceWatchers *Watchers) { + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + data, ok := resourceWatchers.metaWatchersMap[resourceName] + if ok { + contains := false + for _, which := range data.metricsetsUsing { + if which == metricsetUsing { + contains = true + break + } + } + // add this resource to the list of resources using it + if !contains { + data.metricsetsUsing = append(data.metricsetsUsing, metricsetUsing) + } + } +} + +// removeFromMetricsetsUsing removes the metricset from the list of resources using the shared watcher. +// It returns true if element was removed and new size of array. +// The cache should be locked when called. +func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourceWatchers *Watchers) (bool, int) { + data, ok := resourceWatchers.metaWatchersMap[resourceName] + removed := false + if ok { + newIndex := 0 + for i, which := range data.metricsetsUsing { + if which == notUsingName { + removed = true + } else { + data.metricsetsUsing[newIndex] = data.metricsetsUsing[i] + newIndex++ + } + } + data.metricsetsUsing = data.metricsetsUsing[:newIndex] + return removed, len(data.metricsetsUsing) + } + return removed, 0 +} + +// createAllWatchers creates all the watchers required by a metricset +func createAllWatchers( + client k8sclient.Interface, + metricsetName string, + resourceName string, + nodeScope bool, + config *kubernetesConfig, + log *logp.Logger, + resourceWatchers *Watchers, +) error { + res := getResource(resourceName) + if res == nil { + return fmt.Errorf("resource for name %s does not exist. Watcher cannot be created", resourceName) + } + + options, err := getWatchOptions(config, nodeScope, client, log) + if err != nil { + return err + } + // Create the main watcher for the given resource. + // For example pod metricset's main watcher will be pod watcher. + // If it fails, we return an error, so we can stop the extra watchers from creating. + created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, false) + if err != nil { + return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err) + } else if created { + log.Debugf("Created watcher %s successfully, created by %s.", resourceName, metricsetName) + } + // add this metricset to the ones using the watcher + addToMetricsetsUsing(resourceName, metricsetName, resourceWatchers) + + // Create any extra watchers required by this resource + // For example pod requires also namespace and node watcher and possibly replicaset and job watcher. + extraWatchers := getExtraWatchers(resourceName, config.AddResourceMetadata) + for _, extra := range extraWatchers { + extraRes := getResource(extra) + if extraRes != nil { + created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true) + if err != nil { + log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err) + } else { + if created { + log.Debugf("Created watcher %s successfully, created by %s.", extra, metricsetName) + } + // add this metricset to the ones using the extra watchers + addToMetricsetsUsing(extra, metricsetName, resourceWatchers) + } + } else { + log.Errorf("Resource for name %s does not exist. Watcher cannot be created.", extra) + } + } + + return nil +} + +// createMetadataGen creates and returns the metadata generator for resources other than pod and service +// metaGen is a struct of type Resource and implements Generate method for metadata generation for a given resource kind. +func createMetadataGen(client k8sclient.Interface, commonConfig *conf.C, addResourceMetadata *metadata.AddResourceMetadataConfig, + resourceName string, resourceWatchers *Watchers) (*metadata.Resource, error) { + + resourceWatchers.lock.RLock() + defer resourceWatchers.lock.RUnlock() + + resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName] + // This should not be possible since the watchers should have been created before + if resourceMetaWatcher == nil { + return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName) + } + + var metaGen *metadata.Resource + + namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource] + if namespaceMetaWatcher != nil { + n := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace, + (*namespaceMetaWatcher).watcher.Store(), client) + metaGen = metadata.NewNamespaceAwareResourceMetadataGenerator(commonConfig, client, n) + } else { + metaGen = metadata.NewResourceMetadataGenerator(commonConfig, client) + } + + return metaGen, nil +} + +// createMetadataGenSpecific creates and returns the metadata generator for a specific resource - pod or service +// A metaGen struct implements a MetaGen interface and is designed to utilize the necessary watchers to collect(Generate) metadata for a specific resource. +func createMetadataGenSpecific(client k8sclient.Interface, commonConfig *conf.C, addResourceMetadata *metadata.AddResourceMetadataConfig, + resourceName string, resourceWatchers *Watchers) (metadata.MetaGen, error) { + + resourceWatchers.lock.RLock() + defer resourceWatchers.lock.RUnlock() + // The watcher for the resource needs to exist + resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName] + if resourceMetaWatcher == nil { + return nil, fmt.Errorf("could not create the metadata generator, as the watcher for %s does not exist", resourceName) + } + mainWatcher := (*resourceMetaWatcher).watcher + if (*resourceMetaWatcher).restartWatcher != nil { + mainWatcher = (*resourceMetaWatcher).restartWatcher + } + + var metaGen metadata.MetaGen + if resourceName == PodResource { + var nodeWatcher kubernetes.Watcher + if nodeMetaWatcher := resourceWatchers.metaWatchersMap[NodeResource]; nodeMetaWatcher != nil { + nodeWatcher = (*nodeMetaWatcher).watcher + } + var namespaceWatcher kubernetes.Watcher + if namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource]; namespaceMetaWatcher != nil { + namespaceWatcher = (*namespaceMetaWatcher).watcher + } + var replicaSetWatcher kubernetes.Watcher + if replicasetMetaWatcher := resourceWatchers.metaWatchersMap[ReplicaSetResource]; replicasetMetaWatcher != nil { + replicaSetWatcher = (*replicasetMetaWatcher).watcher + } + var jobWatcher kubernetes.Watcher + if jobMetaWatcher := resourceWatchers.metaWatchersMap[JobResource]; jobMetaWatcher != nil { + jobWatcher = (*jobMetaWatcher).watcher + } + // For example for pod named redis in namespace default, the generator uses the pod watcher for pod metadata, + // collects all node metadata using the node watcher's store and all namespace metadata using the namespacewatcher's store. + metaGen = metadata.GetPodMetaGen(commonConfig, mainWatcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, + jobWatcher, addResourceMetadata) + return metaGen, nil + } else if resourceName == ServiceResource { + namespaceMetaWatcher := resourceWatchers.metaWatchersMap[NamespaceResource] + if namespaceMetaWatcher == nil { + return nil, fmt.Errorf("could not create the metadata generator, as the watcher for namespace does not exist") + } + namespaceMeta := metadata.NewNamespaceMetadataGenerator(addResourceMetadata.Namespace, + (*namespaceMetaWatcher).watcher.Store(), client) + metaGen = metadata.NewServiceMetadataGenerator(commonConfig, (*resourceMetaWatcher).watcher.Store(), + namespaceMeta, client) + return metaGen, nil + } + + // Should never reach this part, as this function is only for service or pod resources + return metaGen, fmt.Errorf("failed to create a metadata generator for resource %s", resourceName) +} + +// NewResourceMetadataEnricher returns a metadata enricher for a given resource +// For the metadata enrichment, resource watchers are used which are shared between +// the different metricsets. For example for pod metricset, a pod watcher, a namespace and +// node watcher are by default needed in addition to job and replicaset watcher according +// to configuration. These watchers will be also used by other metricsets that require them +// like state_pod, state_container, node etc. func NewResourceMetadataEnricher( base mb.BaseMetricSet, - resourceName string, metricsRepo *MetricsRepo, + resourceWatchers *Watchers, nodeScope bool) Enricher { + log := logp.NewLogger(selector) - var replicaSetWatcher, jobWatcher kubernetes.Watcher - + // metricset configuration config, err := GetValidatedConfig(base) if err != nil { - logp.Info("Kubernetes metricset enriching is disabled") + log.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } - res := getResource(resourceName) - if res == nil { + // This type of config is needed for the metadata generator + // and includes detailed settings for metadata enrichment + commonMetaConfig := metadata.Config{} + if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { + log.Errorf("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } + commonConfig, _ := conf.NewConfigFrom(&commonMetaConfig) client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) if err != nil { - logp.Err("Error creating Kubernetes client: %s", err) + log.Errorf("Error creating Kubernetes client: %s", err) return &nilEnricher{} } - watcher, nodeWatcher, namespaceWatcher := getResourceMetadataWatchers(config, res, client, nodeScope) - - if watcher == nil { + metricsetName := base.Name() + resourceName := getResourceName(metricsetName) + // Create all watchers needed for this metricset + err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers) + if err != nil { + log.Errorf("Error starting the watchers: %s", err) return &nilEnricher{} } - // commonMetaConfig stores the metadata configuration of the resource itself - commonMetaConfig := metadata.Config{} - if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { - logp.Err("Error initializing Kubernetes metadata enricher: %s", err) + var specificMetaGen metadata.MetaGen + var generalMetaGen *metadata.Resource + // Create the metadata generator to be used in the watcher's event handler. + // Both specificMetaGen and generalMetaGen implement Generate method for metadata collection. + if resourceName == ServiceResource || resourceName == PodResource { + specificMetaGen, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, resourceName, resourceWatchers) + } else { + generalMetaGen, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, resourceName, resourceWatchers) + } + if err != nil { + log.Errorf("Error trying to create the metadata generators: %s", err) return &nilEnricher{} } - cfg, _ := conf.NewConfigFrom(&commonMetaConfig) - // if Resource is Pod then we need to create watchers for Replicasets and Jobs that it might belongs to - // in order to be able to retrieve 2nd layer Owner metadata like in case of: - // Deployment -> Replicaset -> Pod - // CronJob -> job -> Pod - if resourceName == PodResource { - if config.AddResourceMetadata.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err) - return &nilEnricher{} - } + // updateFunc to be used as the resource watcher's add and update handler. + // The handler function is executed when a watcher is triggered(i.e. new/updated resource). + // It is responsible for generating the metadata for a detected resource by executing the metadata generator's Generate method. + // It is a common handler for all resource watchers. The kind of resource(e.g. pod or deployment) is checked inside the function. + // It returns a map of a resourse identifier(i.e namespace-resource_name) as key and the metadata as value. + updateFunc := func(r kubernetes.Resource) map[string]mapstr.M { + accessor, _ := meta.Accessor(r) + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = join(namespace, id) } - if config.AddResourceMetadata.CronJob { - jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err) - return &nilEnricher{} + switch r := r.(type) { + case *kubernetes.Pod: + return map[string]mapstr.M{id: specificMetaGen.Generate(r)} + + case *kubernetes.Node: + nodeName := r.GetObjectMeta().GetName() + metrics := NewNodeMetrics() + if cpu, ok := r.Status.Capacity["cpu"]; ok { + if q, err := resource.ParseQuantity(cpu.String()); err == nil { + metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000) + } + } + if memory, ok := r.Status.Capacity["memory"]; ok { + if q, err := resource.ParseQuantity(memory.String()); err == nil { + metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value())) + } } + nodeStore, _ := metricsRepo.AddNodeStore(nodeName) + nodeStore.SetNodeMetrics(metrics) + + return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)} + case *kubernetes.Deployment: + return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)} + case *kubernetes.Job: + return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)} + case *kubernetes.CronJob: + return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)} + case *kubernetes.Service: + return map[string]mapstr.M{id: specificMetaGen.Generate(r)} + case *kubernetes.StatefulSet: + return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)} + case *kubernetes.Namespace: + return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)} + case *kubernetes.ReplicaSet: + return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)} + case *kubernetes.DaemonSet: + return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)} + case *kubernetes.PersistentVolume: + return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)} + case *kubernetes.PersistentVolumeClaim: + return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)} + case *kubernetes.StorageClass: + return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)} + default: + return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)} } } - podMetaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, config.AddResourceMetadata) - - namespaceMeta := metadata.NewNamespaceMetadataGenerator(config.AddResourceMetadata.Namespace, namespaceWatcher.Store(), watcher.Client()) - serviceMetaGen := metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta, watcher.Client()) - - metaGen := metadata.NewNamespaceAwareResourceMetadataGenerator(cfg, watcher.Client(), namespaceMeta) + // deleteFunc to be used as the resource watcher's delete handler. + // The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted). + // It returns the identifier of the resource. + deleteFunc := func(r kubernetes.Resource) []string { + accessor, _ := meta.Accessor(r) - enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, - // update - func(m map[string]mapstr.M, r kubernetes.Resource) { - accessor, _ := meta.Accessor(r) - id := join(accessor.GetNamespace(), accessor.GetName()) - - switch r := r.(type) { - case *kubernetes.Pod: - m[id] = podMetaGen.Generate(r) + switch r := r.(type) { + case *kubernetes.Node: + nodeName := r.GetObjectMeta().GetName() + metricsRepo.DeleteNodeStore(nodeName) + } - case *kubernetes.Node: - nodeName := r.GetObjectMeta().GetName() - metrics := NewNodeMetrics() - if cpu, ok := r.Status.Capacity["cpu"]; ok { - if q, err := resource.ParseQuantity(cpu.String()); err == nil { - metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000) - } - } - if memory, ok := r.Status.Capacity["memory"]; ok { - if q, err := resource.ParseQuantity(memory.String()); err == nil { - metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value())) - } - } - nodeStore, _ := metricsRepo.AddNodeStore(nodeName) - nodeStore.SetNodeMetrics(metrics) - - m[id] = metaGen.Generate(NodeResource, r) - - case *kubernetes.Deployment: - m[id] = metaGen.Generate(DeploymentResource, r) - case *kubernetes.Job: - m[id] = metaGen.Generate(JobResource, r) - case *kubernetes.CronJob: - m[id] = metaGen.Generate(CronJobResource, r) - case *kubernetes.Service: - m[id] = serviceMetaGen.Generate(r) - case *kubernetes.StatefulSet: - m[id] = metaGen.Generate(StatefulSetResource, r) - case *kubernetes.Namespace: - m[id] = metaGen.Generate(NamespaceResource, r) - case *kubernetes.ReplicaSet: - m[id] = metaGen.Generate(ReplicaSetResource, r) - case *kubernetes.DaemonSet: - m[id] = metaGen.Generate(DaemonSetResource, r) - case *kubernetes.PersistentVolume: - m[id] = metaGen.Generate(PersistentVolumeResource, r) - case *kubernetes.PersistentVolumeClaim: - m[id] = metaGen.Generate(PersistentVolumeClaimResource, r) - case *kubernetes.StorageClass: - m[id] = metaGen.Generate(StorageClassResource, r) - default: - m[id] = metaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r) - } - }, - // delete - func(m map[string]mapstr.M, r kubernetes.Resource) { - accessor, _ := meta.Accessor(r) - - switch r := r.(type) { - case *kubernetes.Node: - nodeName := r.GetObjectMeta().GetName() - metricsRepo.DeleteNodeStore(nodeName) - } + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = join(namespace, id) + } + return []string{id} + } - id := join(accessor.GetNamespace(), accessor.GetName()) - delete(m, id) - }, - // index - func(e mapstr.M) string { - return join(getString(e, mb.ModuleDataKey+".namespace"), getString(e, "name")) - }, - ) + // indexFunc constructs and returns the resource identifier from a given event. + // If a resource is namespaced(e.g. pod) the identifier is in the form of namespace-resource_name. + // If it is not namespaced(e.g. node) the identifier is the resource's name. + indexFunc := func(e mapstr.M) string { + name := getString(e, "name") + namespace := getString(e, mb.ModuleDataKey+".namespace") + id := "" + if name != "" && namespace != "" { + id = join(namespace, name) + } else if namespace != "" { + id = namespace + } else { + id = name + } + return id + } - // Configure the enricher for Pods, so pod specific metadata ends up in the right place when - // calling Enrich - if _, ok := res.(*kubernetes.Pod); ok { + // create a metadata enricher for this metricset + enricher := buildMetadataEnricher( + metricsetName, + resourceName, + resourceWatchers, + config, + updateFunc, + deleteFunc, + indexFunc, + log) + if resourceName == PodResource { enricher.isPod = true } @@ -288,197 +684,138 @@ func NewResourceMetadataEnricher( func NewContainerMetadataEnricher( base mb.BaseMetricSet, metricsRepo *MetricsRepo, + resourceWatchers *Watchers, nodeScope bool) Enricher { - var replicaSetWatcher, jobWatcher kubernetes.Watcher + log := logp.NewLogger(selector) + config, err := GetValidatedConfig(base) if err != nil { - logp.Info("Kubernetes metricset enriching is disabled") + log.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } - client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) - if err != nil { - logp.Err("Error creating Kubernetes client: %s", err) + // This type of config is needed for the metadata generator + commonMetaConfig := metadata.Config{} + if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { + log.Errorf("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } + commonConfig, _ := conf.NewConfigFrom(&commonMetaConfig) - watcher, nodeWatcher, namespaceWatcher := getResourceMetadataWatchers(config, &kubernetes.Pod{}, client, nodeScope) - if watcher == nil { + client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + log.Errorf("Error creating Kubernetes client: %s", err) return &nilEnricher{} } - // commonMetaConfig stores the metadata configuration of the resource itself - commonMetaConfig := metadata.Config{} - if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { - logp.Err("Error initializing Kubernetes metadata enricher: %s", err) + metricsetName := base.Name() + + err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers) + if err != nil { + log.Errorf("Error starting the watchers: %s", err) return &nilEnricher{} } - cfg, _ := conf.NewConfigFrom(&commonMetaConfig) - // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to - // in order to be able to retrieve 2nd layer Owner metadata like in case of: - // Deployment -> Replicaset -> Pod - // CronJob -> job -> Pod - if config.AddResourceMetadata.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err) - return &nilEnricher{} - } - } - if config.AddResourceMetadata.CronJob { - jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err) - return &nilEnricher{} - } + metaGen, err := createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers) + if err != nil { + log.Errorf("Error trying to create the metadata generators: %s", err) + return &nilEnricher{} } - metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, config.AddResourceMetadata) + updateFunc := func(r kubernetes.Resource) map[string]mapstr.M { + metadataEvents := make(map[string]mapstr.M) - enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, - // update - func(m map[string]mapstr.M, r kubernetes.Resource) { - pod, ok := r.(*kubernetes.Pod) - if !ok { - base.Logger().Debugf("Error while casting event: %s", ok) - } - pmeta := metaGen.Generate(pod) + pod, ok := r.(*kubernetes.Pod) + if !ok { + base.Logger().Debugf("Error while casting event: %s", ok) + } + pmeta := metaGen.Generate(pod) - statuses := make(map[string]*kubernetes.PodContainerStatus) - mapStatuses := func(s []kubernetes.PodContainerStatus) { - for i := range s { - statuses[s[i].Name] = &s[i] - } + statuses := make(map[string]*kubernetes.PodContainerStatus) + mapStatuses := func(s []kubernetes.PodContainerStatus) { + for i := range s { + statuses[s[i].Name] = &s[i] } - mapStatuses(pod.Status.ContainerStatuses) - mapStatuses(pod.Status.InitContainerStatuses) + } + mapStatuses(pod.Status.ContainerStatuses) + mapStatuses(pod.Status.InitContainerStatuses) - nodeStore, _ := metricsRepo.AddNodeStore(pod.Spec.NodeName) - podId := NewPodId(pod.Namespace, pod.Name) - podStore, _ := nodeStore.AddPodStore(podId) + nodeStore, _ := metricsRepo.AddNodeStore(pod.Spec.NodeName) + podId := NewPodId(pod.Namespace, pod.Name) + podStore, _ := nodeStore.AddPodStore(podId) - for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { - cmeta := mapstr.M{} - metrics := NewContainerMetrics() + for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { + cmeta := mapstr.M{} + metrics := NewContainerMetrics() - if cpu, ok := container.Resources.Limits["cpu"]; ok { - if q, err := resource.ParseQuantity(cpu.String()); err == nil { - metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000) - } + if cpu, ok := container.Resources.Limits["cpu"]; ok { + if q, err := resource.ParseQuantity(cpu.String()); err == nil { + metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000) } - if memory, ok := container.Resources.Limits["memory"]; ok { - if q, err := resource.ParseQuantity(memory.String()); err == nil { - metrics.MemoryLimit = NewFloat64Metric(float64(q.Value())) - } + } + if memory, ok := container.Resources.Limits["memory"]; ok { + if q, err := resource.ParseQuantity(memory.String()); err == nil { + metrics.MemoryLimit = NewFloat64Metric(float64(q.Value())) } + } - containerStore, _ := podStore.AddContainerStore(container.Name) - containerStore.SetContainerMetrics(metrics) + containerStore, _ := podStore.AddContainerStore(container.Name) + containerStore.SetContainerMetrics(metrics) - if s, ok := statuses[container.Name]; ok { - // Extracting id and runtime ECS fields from ContainerID - // which is in the form of :// - split := strings.Index(s.ContainerID, "://") - if split != -1 { - kubernetes2.ShouldPut(cmeta, "container.id", s.ContainerID[split+3:], base.Logger()) + if s, ok := statuses[container.Name]; ok { + // Extracting id and runtime ECS fields from ContainerID + // which is in the form of :// + split := strings.Index(s.ContainerID, "://") + if split != -1 { + kubernetes2.ShouldPut(cmeta, "container.id", s.ContainerID[split+3:], base.Logger()) - kubernetes2.ShouldPut(cmeta, "container.runtime", s.ContainerID[:split], base.Logger()) - } + kubernetes2.ShouldPut(cmeta, "container.runtime", s.ContainerID[:split], base.Logger()) } - - id := join(pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) - cmeta.DeepUpdate(pmeta) - m[id] = cmeta } - }, - // delete - func(m map[string]mapstr.M, r kubernetes.Resource) { - pod, ok := r.(*kubernetes.Pod) - if !ok { - base.Logger().Debugf("Error while casting event: %s", ok) - } - podId := NewPodId(pod.Namespace, pod.Name) - nodeStore := metricsRepo.GetNodeStore(pod.Spec.NodeName) - nodeStore.DeletePodStore(podId) - for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { - id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) - delete(m, id) - } - }, - // index - func(e mapstr.M) string { - return join(getString(e, mb.ModuleDataKey+".namespace"), getString(e, mb.ModuleDataKey+".pod.name"), getString(e, "name")) - }, - ) - - return enricher -} + id := join(pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) + cmeta.DeepUpdate(pmeta) -func getResourceMetadataWatchers( - config *kubernetesConfig, - resource kubernetes.Resource, - client k8sclient.Interface, nodeScope bool) (kubernetes.Watcher, kubernetes.Watcher, kubernetes.Watcher) { - - var err error - - options := kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - } - - log := logp.NewLogger(selector) - - // Watch objects in the node only - if nodeScope { - nd := &kubernetes.DiscoverKubernetesNodeParams{ - ConfigHost: config.Node, - Client: client, - IsInCluster: kubernetes.IsInCluster(config.KubeConfig), - HostUtils: &kubernetes.DefaultDiscoveryUtils{}, - } - options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd) - if err != nil { - logp.Err("Couldn't discover kubernetes node: %s", err) - return nil, nil, nil + metadataEvents[id] = cmeta } + return metadataEvents } - log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Node) + deleteFunc := func(r kubernetes.Resource) []string { + ids := make([]string, 0) + pod, ok := r.(*kubernetes.Pod) + if !ok { + base.Logger().Debugf("Error while casting event: %s", ok) + } + podId := NewPodId(pod.Namespace, pod.Name) + nodeStore := metricsRepo.GetNodeStore(pod.Spec.NodeName) + nodeStore.DeletePodStore(podId) - watcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher", client, resource, options, nil) - if err != nil { - logp.Err("Error initializing Kubernetes watcher: %s", err) - return nil, nil, nil - } + for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { + id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) + ids = append(ids, id) + } - nodeWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_node", client, &kubernetes.Node{}, options, nil) - if err != nil { - logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Node{}, err) - return watcher, nil, nil + return ids } - namespaceWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err) - return watcher, nodeWatcher, nil + indexFunc := func(e mapstr.M) string { + return join(getString(e, mb.ModuleDataKey+".namespace"), getString(e, mb.ModuleDataKey+".pod.name"), getString(e, "name")) } - return watcher, nodeWatcher, namespaceWatcher -} + enricher := buildMetadataEnricher( + metricsetName, + PodResource, + resourceWatchers, + config, + updateFunc, + deleteFunc, + indexFunc, + log, + ) -func GetDefaultDisabledMetaConfig() *kubernetesConfig { - return &kubernetesConfig{ - AddMetadata: false, - } + return enricher } func GetValidatedConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { @@ -531,114 +868,228 @@ func join(fields ...string) string { return strings.Join(fields, ":") } +// buildMetadataEnricher builds and returns a metadata enricher for a given metricset. +// It appends the new enricher to the watcher.enrichers map for the given resource watcher. +// It also updates the add, update and delete event handlers of the watcher in order to retrieve +// the metadata of all enrichers associated to that watcher. func buildMetadataEnricher( - watcher kubernetes.Watcher, - nodeWatcher kubernetes.Watcher, - namespaceWatcher kubernetes.Watcher, - replicasetWatcher kubernetes.Watcher, - jobWatcher kubernetes.Watcher, - update func(map[string]mapstr.M, kubernetes.Resource), - delete func(map[string]mapstr.M, kubernetes.Resource), - index func(e mapstr.M) string) *enricher { - - enricher := enricher{ - metadata: map[string]mapstr.M{}, - index: index, - watcher: watcher, - nodeWatcher: nodeWatcher, - namespaceWatcher: namespaceWatcher, - replicasetWatcher: replicasetWatcher, - jobWatcher: jobWatcher, - } - - watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enricher.Lock() - defer enricher.Unlock() - update(enricher.metadata, obj.(kubernetes.Resource)) - }, - UpdateFunc: func(obj interface{}) { - enricher.Lock() - defer enricher.Unlock() - update(enricher.metadata, obj.(kubernetes.Resource)) - }, - DeleteFunc: func(obj interface{}) { - enricher.Lock() - defer enricher.Unlock() - delete(enricher.metadata, obj.(kubernetes.Resource)) - }, - }) - - return &enricher -} + metricsetName string, + resourceName string, + resourceWatchers *Watchers, + config *kubernetesConfig, + updateFunc func(kubernetes.Resource) map[string]mapstr.M, + deleteFunc func(kubernetes.Resource) []string, + indexFunc func(e mapstr.M) string, + log *logp.Logger) *enricher { + + enricher := &enricher{ + metadata: map[string]mapstr.M{}, + index: indexFunc, + updateFunc: updateFunc, + deleteFunc: deleteFunc, + resourceName: resourceName, + metricsetName: metricsetName, + config: config, + log: log, + } -func (m *enricher) Start() { - m.watchersStartedLock.Lock() - defer m.watchersStartedLock.Unlock() - if !m.watchersStarted { - if m.nodeWatcher != nil { - if err := m.nodeWatcher.Start(); err != nil { - logp.Warn("Error starting node watcher: %s", err) + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + // Check if a watcher for this resource already exists. + resourceMetaWatcher := resourceWatchers.metaWatchersMap[resourceName] + if resourceMetaWatcher != nil { + // Append the new enricher to watcher's enrichers map. + resourceMetaWatcher.enrichers[metricsetName] = enricher + + // Check if this shared watcher has already detected resources and collected their + // metadata for another enricher. + // In that case, for each resource, call the updateFunc of the current enricher to + // generate its metadata. This is needed in cases where the watcher has already been + // notified for new/updated resources while the enricher for current metricset has not + // built yet (example is pod, state_pod metricsets). + for key := range resourceMetaWatcher.metadataObjects { + obj, exists, err := resourceMetaWatcher.watcher.Store().GetByKey(key) + if err != nil { + log.Errorf("Error trying to get the object from the store: %s", err) + } else { + if exists { + newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource)) + // add the new metadata to the watcher received metadata + for id, metadata := range newMetadataEvents { + enricher.metadata[id] = metadata + } + } } } - if m.namespaceWatcher != nil { - if err := m.namespaceWatcher.Start(); err != nil { - logp.Warn("Error starting namespace watcher: %s", err) - } - } + // AddEventHandler sets add, update and delete methods of watcher. + // Those methods are triggered when an event is detected for a + // resource creation, update or deletion. + resourceMetaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + // Add object(detected resource) to the list of metadata objects of this watcher, + // so it can be used by enrichers created after the event is triggered. + // The identifier of the object is in the form of namespace/name so that + // it can be easily fetched from watcher's store in previous step. + accessor, _ := meta.Accessor(obj.(kubernetes.Resource)) + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = namespace + "/" + id + } + resourceMetaWatcher.metadataObjects[id] = true + // Execute the updateFunc of each enricher associated to thos watcher. + for _, enricher := range resourceMetaWatcher.enrichers { + enricher.Lock() + newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource)) + // add the new metadata to the watcher received metadata + for id, metadata := range newMetadataEvents { + enricher.metadata[id] = metadata + } + enricher.Unlock() + } + }, + UpdateFunc: func(obj interface{}) { + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + // Add object to the list of metadata objects of this watcher + accessor, _ := meta.Accessor(obj.(kubernetes.Resource)) + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = namespace + "/" + id + } + resourceMetaWatcher.metadataObjects[id] = true - if m.replicasetWatcher != nil { - if err := m.replicasetWatcher.Start(); err != nil { - logp.Warn("Error starting replicaset watcher: %s", err) - } - } + for _, enricher := range resourceMetaWatcher.enrichers { + enricher.Lock() + updatedMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource)) + for id, metadata := range updatedMetadataEvents { + enricher.metadata[id] = metadata + } + enricher.Unlock() + } + }, + DeleteFunc: func(obj interface{}) { + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + // Remove object from the list of metadata objects of this watcher + accessor, _ := meta.Accessor(obj.(kubernetes.Resource)) + id := accessor.GetName() + namespace := accessor.GetNamespace() + if namespace != "" { + id = namespace + "/" + id + } + delete(resourceMetaWatcher.metadataObjects, id) + + for _, enricher := range resourceMetaWatcher.enrichers { + enricher.Lock() + ids := enricher.deleteFunc(obj.(kubernetes.Resource)) + // update this watcher events by removing all the metadata[id] + for _, id := range ids { + delete(enricher.metadata, id) + } + enricher.Unlock() + } + }, + }) + } - if m.jobWatcher != nil { - if err := m.jobWatcher.Start(); err != nil { - logp.Warn("Error starting job watcher: %s", err) + return enricher +} + +// Start starts all the watchers associated with a given enricher's resource. +func (e *enricher) Start(resourceWatchers *Watchers) { + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + // Each resource may require multiple watchers. Firstly, we start the + // extra watchers as they are a dependency for the main resource watcher + // For example a pod watcher requires namespace and node watcher to be started + // first. + extras := getExtraWatchers(e.resourceName, e.config.AddResourceMetadata) + for _, extra := range extras { + extraWatcherMeta := resourceWatchers.metaWatchersMap[extra] + if extraWatcherMeta != nil && !extraWatcherMeta.started { + if err := extraWatcherMeta.watcher.Start(); err != nil { + e.log.Warnf("Error starting %s watcher: %s", extra, err) + } else { + extraWatcherMeta.started = true } } + } - err := m.watcher.Start() - if err != nil { - logp.Warn("Error starting Kubernetes watcher: %s", err) + // Start the main watcher if not already started. + // If there is a restartWatcher defined, stop the old watcher if started and start the restartWatcher. + // restartWatcher replaces the old watcher and resourceMetaWatcher.restartWatcher is set to nil. + resourceMetaWatcher := resourceWatchers.metaWatchersMap[e.resourceName] + if resourceMetaWatcher != nil { + if resourceMetaWatcher.restartWatcher != nil { + if resourceMetaWatcher.started { + resourceMetaWatcher.watcher.Stop() + } + if err := resourceMetaWatcher.restartWatcher.Start(); err != nil { + e.log.Warnf("Error restarting %s watcher: %s", e.resourceName, err) + } else { + resourceMetaWatcher.watcher = resourceMetaWatcher.restartWatcher + resourceMetaWatcher.restartWatcher = nil + resourceMetaWatcher.started = true + } + } else { + if !resourceMetaWatcher.started { + if err := resourceMetaWatcher.watcher.Start(); err != nil { + e.log.Warnf("Error starting %s watcher: %s", e.resourceName, err) + } else { + resourceMetaWatcher.started = true + } + } } - m.watchersStarted = true } } -func (m *enricher) Stop() { - m.watchersStartedLock.Lock() - defer m.watchersStartedLock.Unlock() - if m.watchersStarted { - m.watcher.Stop() - - if m.namespaceWatcher != nil { - m.namespaceWatcher.Stop() - } - - if m.nodeWatcher != nil { - m.nodeWatcher.Stop() - } - - if m.replicasetWatcher != nil { - m.replicasetWatcher.Stop() +// Stop removes the enricher's metricset as a user of the associated watchers. +// If no metricset is using the watchers anymore, the watcher gets stopped. +func (e *enricher) Stop(resourceWatchers *Watchers) { + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + resourceMetaWatcher := resourceWatchers.metaWatchersMap[e.resourceName] + if resourceMetaWatcher != nil && resourceMetaWatcher.started { + _, size := removeFromMetricsetsUsing(e.resourceName, e.metricsetName, resourceWatchers) + if size == 0 { + resourceMetaWatcher.watcher.Stop() + resourceMetaWatcher.started = false } + } - if m.jobWatcher != nil { - m.jobWatcher.Stop() + extras := getExtraWatchers(e.resourceName, e.config.AddResourceMetadata) + for _, extra := range extras { + extraMetaWatcher := resourceWatchers.metaWatchersMap[extra] + if extraMetaWatcher != nil && extraMetaWatcher.started { + _, size := removeFromMetricsetsUsing(extra, e.metricsetName, resourceWatchers) + if size == 0 { + extraMetaWatcher.watcher.Stop() + extraMetaWatcher.started = false + } } - - m.watchersStarted = false } } -func (m *enricher) Enrich(events []mapstr.M) { - m.RLock() - defer m.RUnlock() +// Enrich enriches events with metadata saved in the enricher.metadata map +// This method is executed whenever a new event is created and about to be published. +// The enricher's index method is used to retrieve the resource identifier from each event. +func (e *enricher) Enrich(events []mapstr.M) { + e.RLock() + defer e.RUnlock() + for _, event := range events { - if meta := m.metadata[m.index(event)]; meta != nil { + if meta := e.metadata[e.index(event)]; meta != nil { k8s, err := meta.GetValue("kubernetes") if err != nil { continue @@ -648,7 +1099,7 @@ func (m *enricher) Enrich(events []mapstr.M) { continue } - if m.isPod { + if e.isPod { // apply pod meta at metricset level if podMeta, ok := k8sMeta["pod"].(mapstr.M); ok { event.DeepUpdate(podMeta) @@ -672,12 +1123,6 @@ func (m *enricher) Enrich(events []mapstr.M) { } } -type nilEnricher struct{} - -func (*nilEnricher) Start() {} -func (*nilEnricher) Stop() {} -func (*nilEnricher) Enrich([]mapstr.M) {} - func CreateEvent(event mapstr.M, namespace string) (mb.Event, error) { var moduleFieldsMapStr mapstr.M moduleFields, ok := event[mb.ModuleDataKey] diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 92d60b28b2d..b4e528100a6 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -20,34 +20,451 @@ package util import ( "fmt" "testing" + "time" - k8s "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "github.com/elastic/elastic-agent-autodiscover/kubernetes" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" kubernetes2 "github.com/elastic/beats/v7/libbeat/autodiscover/providers/kubernetes" -) + "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" -var ( - logger = logp.NewLogger("kubernetes") + "github.com/stretchr/testify/require" + k8sfake "k8s.io/client-go/kubernetes/fake" + + "github.com/elastic/elastic-agent-autodiscover/kubernetes" + "github.com/elastic/elastic-agent-libs/logp" ) -func TestBuildMetadataEnricher(t *testing.T) { - watcher := mockWatcher{} - nodeWatcher := mockWatcher{} - namespaceWatcher := mockWatcher{} - rsWatcher := mockWatcher{} - jobWatcher := mockWatcher{} +func TestWatchOptions(t *testing.T) { + log := logp.NewLogger("test") + + client := k8sfake.NewSimpleClientset() + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + } + + options, err := getWatchOptions(config, false, client, log) + require.NoError(t, err) + require.Equal(t, options.SyncTimeout, config.SyncPeriod) + require.NotEqual(t, options.Node, config.Node) + + options, err = getWatchOptions(config, true, client, log) + require.NoError(t, err) + require.Equal(t, options.SyncTimeout, config.SyncPeriod) + require.Equal(t, options.Node, config.Node) +} + +func TestCreateWatcher(t *testing.T) { + resourceWatchers := NewWatchers() + + client := k8sfake.NewSimpleClientset() + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + } + log := logp.NewLogger("test") + + options, err := getWatchOptions(config, false, client, log) + require.NoError(t, err) + + created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, resourceWatchers, config.Namespace, false) + require.True(t, created) + require.NoError(t, err) + + resourceWatchers.lock.Lock() + require.Equal(t, 1, len(resourceWatchers.metaWatchersMap)) + require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource]) + require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher) + resourceWatchers.lock.Unlock() + + created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, resourceWatchers, config.Namespace, true) + require.False(t, created) + require.NoError(t, err) + + resourceWatchers.lock.Lock() + require.Equal(t, 1, len(resourceWatchers.metaWatchersMap)) + require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource]) + require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher) + resourceWatchers.lock.Unlock() + + created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + require.True(t, created) + require.NoError(t, err) + + resourceWatchers.lock.Lock() + require.Equal(t, 2, len(resourceWatchers.metaWatchersMap)) + require.NotNil(t, resourceWatchers.metaWatchersMap[DeploymentResource]) + require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource]) + resourceWatchers.lock.Unlock() +} + +func TestAddToMetricsetsUsing(t *testing.T) { + resourceWatchers := NewWatchers() + + client := k8sfake.NewSimpleClientset() + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + } + log := logp.NewLogger("test") + + options, err := getWatchOptions(config, false, client, log) + require.NoError(t, err) + + // Create the new entry with watcher and nil string array first + created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + require.True(t, created) + require.NoError(t, err) + + resourceWatchers.lock.Lock() + require.NotNil(t, resourceWatchers.metaWatchersMap[DeploymentResource].watcher) + require.Equal(t, []string{}, resourceWatchers.metaWatchersMap[DeploymentResource].metricsetsUsing) + resourceWatchers.lock.Unlock() + + metricsetDeployment := "state_deployment" + addToMetricsetsUsing(DeploymentResource, metricsetDeployment, resourceWatchers) + resourceWatchers.lock.Lock() + require.Equal(t, []string{metricsetDeployment}, resourceWatchers.metaWatchersMap[DeploymentResource].metricsetsUsing) + resourceWatchers.lock.Unlock() + + metricsetContainer := "container" + addToMetricsetsUsing(DeploymentResource, metricsetContainer, resourceWatchers) + resourceWatchers.lock.Lock() + require.Equal(t, []string{metricsetDeployment, metricsetContainer}, resourceWatchers.metaWatchersMap[DeploymentResource].metricsetsUsing) + resourceWatchers.lock.Unlock() +} + +func TestRemoveFromMetricsetsUsing(t *testing.T) { + resourceWatchers := NewWatchers() + + client := k8sfake.NewSimpleClientset() + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + } + log := logp.NewLogger("test") + + options, err := getWatchOptions(config, false, client, log) + require.NoError(t, err) + + // Create the new entry with watcher and nil string array first + created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + require.True(t, created) + require.NoError(t, err) + + metricsetDeployment := "state_deployment" + metricsetPod := "state_pod" + addToMetricsetsUsing(DeploymentResource, metricsetDeployment, resourceWatchers) + addToMetricsetsUsing(DeploymentResource, metricsetPod, resourceWatchers) + + resourceWatchers.lock.Lock() + defer resourceWatchers.lock.Unlock() + + removed, size := removeFromMetricsetsUsing(DeploymentResource, metricsetDeployment, resourceWatchers) + require.True(t, removed) + require.Equal(t, 1, size) + + removed, size = removeFromMetricsetsUsing(DeploymentResource, metricsetDeployment, resourceWatchers) + require.False(t, removed) + require.Equal(t, 1, size) + + removed, size = removeFromMetricsetsUsing(DeploymentResource, metricsetPod, resourceWatchers) + require.True(t, removed) + require.Equal(t, 0, size) +} + +func TestCreateAllWatchers(t *testing.T) { + resourceWatchers := NewWatchers() + + client := k8sfake.NewSimpleClientset() + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: true, + }, + } + log := logp.NewLogger("test") + + // Start watchers based on a resource that does not exist should cause an error + err := createAllWatchers(client, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers) + require.Error(t, err) + resourceWatchers.lock.Lock() + require.Equal(t, 0, len(resourceWatchers.metaWatchersMap)) + resourceWatchers.lock.Unlock() + + // Start watcher for a resource that requires other resources, should start all the watchers + metricsetPod := "pod" + extras := getExtraWatchers(PodResource, config.AddResourceMetadata) + err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers) + require.NoError(t, err) + + // Check that all the required watchers are in the map + resourceWatchers.lock.Lock() + // we add 1 to the expected result to represent the resource itself + require.Equal(t, len(extras)+1, len(resourceWatchers.metaWatchersMap)) + for _, extra := range extras { + require.NotNil(t, resourceWatchers.metaWatchersMap[extra]) + } + resourceWatchers.lock.Unlock() +} + +func TestCreateMetaGen(t *testing.T) { + resourceWatchers := NewWatchers() + + commonMetaConfig := metadata.Config{} + commonConfig, err := conf.NewConfigFrom(&commonMetaConfig) + require.NoError(t, err) + + log := logp.NewLogger("test") + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: true, + }, + } + client := k8sfake.NewSimpleClientset() + + _, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, DeploymentResource, resourceWatchers) + // At this point, no watchers were created + require.Error(t, err) + + // Create the watchers necessary for the metadata generator + metricsetDeployment := "state_deployment" + err = createAllWatchers(client, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers) + require.NoError(t, err) + + // Create the generators, this time without error + _, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, DeploymentResource, resourceWatchers) + require.NoError(t, err) +} + +func TestCreateMetaGenSpecific(t *testing.T) { + resourceWatchers := NewWatchers() + + commonMetaConfig := metadata.Config{} + commonConfig, err := conf.NewConfigFrom(&commonMetaConfig) + require.NoError(t, err) + + log := logp.NewLogger("test") + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: true, + }, + } + client := k8sfake.NewSimpleClientset() + + // For pod: + metricsetPod := "pod" + + _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers) + // At this point, no watchers were created + require.Error(t, err) + + // Create the pod resource + the extras + err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers) + require.NoError(t, err) + + _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers) + require.NoError(t, err) + + // For service: + _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, ServiceResource, resourceWatchers) + // At this point, no watchers were created + require.Error(t, err) + + // Create the service resource + the extras + metricsetService := "state_service" + err = createAllWatchers(client, metricsetService, ServiceResource, false, config, log, resourceWatchers) + require.NoError(t, err) + + _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, ServiceResource, resourceWatchers) + require.NoError(t, err) +} + +func TestBuildMetadataEnricher_Start_Stop(t *testing.T) { + resourceWatchers := NewWatchers() + + metricsetNamespace := "state_namespace" + metricsetDeployment := "state_deployment" + + resourceWatchers.lock.Lock() + resourceWatchers.metaWatchersMap[NamespaceResource] = &metaWatcher{ + watcher: &mockWatcher{}, + started: false, + metricsetsUsing: []string{metricsetNamespace, metricsetDeployment}, + enrichers: make(map[string]*enricher), + } + resourceWatchers.metaWatchersMap[DeploymentResource] = &metaWatcher{ + watcher: &mockWatcher{}, + started: true, + metricsetsUsing: []string{metricsetDeployment}, + enrichers: make(map[string]*enricher), + } + resourceWatchers.lock.Unlock() + + funcs := mockFuncs{} + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: false, + }, + } + + log := logp.NewLogger(selector) + + enricherNamespace := buildMetadataEnricher( + metricsetNamespace, + NamespaceResource, + resourceWatchers, + config, + funcs.update, + funcs.delete, + funcs.index, + log, + ) + resourceWatchers.lock.Lock() + watcher := resourceWatchers.metaWatchersMap[NamespaceResource] + require.False(t, watcher.started) + resourceWatchers.lock.Unlock() + + enricherNamespace.Start(resourceWatchers) + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[NamespaceResource] + require.True(t, watcher.started) + resourceWatchers.lock.Unlock() + + // Stopping should not stop the watcher because it is still being used by deployment metricset + enricherNamespace.Stop(resourceWatchers) + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[NamespaceResource] + require.True(t, watcher.started) + require.Equal(t, []string{metricsetDeployment}, watcher.metricsetsUsing) + resourceWatchers.lock.Unlock() + + // Stopping the deployment watcher should stop now both watchers + enricherDeployment := buildMetadataEnricher( + metricsetDeployment, + DeploymentResource, + resourceWatchers, + config, + funcs.update, + funcs.delete, + funcs.index, + log, + ) + enricherDeployment.Stop(resourceWatchers) + + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[NamespaceResource] + + require.False(t, watcher.started) + require.Equal(t, []string{}, watcher.metricsetsUsing) + + watcher = resourceWatchers.metaWatchersMap[DeploymentResource] + require.False(t, watcher.started) + require.Equal(t, []string{}, watcher.metricsetsUsing) + + resourceWatchers.lock.Unlock() +} + +func TestBuildMetadataEnricher_Start_Stop_SameResources(t *testing.T) { + resourceWatchers := NewWatchers() + + metricsetPod := "pod" + metricsetStatePod := "state_pod" + + resourceWatchers.lock.Lock() + resourceWatchers.metaWatchersMap[PodResource] = &metaWatcher{ + watcher: &mockWatcher{}, + started: false, + metricsetsUsing: []string{metricsetStatePod, metricsetPod}, + enrichers: make(map[string]*enricher), + } + resourceWatchers.lock.Unlock() + + funcs := mockFuncs{} + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: false, + }, + } + + log := logp.NewLogger(selector) + enricherPod := buildMetadataEnricher(metricsetPod, PodResource, resourceWatchers, config, + funcs.update, funcs.delete, funcs.index, log) + resourceWatchers.lock.Lock() + watcher := resourceWatchers.metaWatchersMap[PodResource] + require.False(t, watcher.started) + resourceWatchers.lock.Unlock() + + enricherPod.Start(resourceWatchers) + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[PodResource] + require.True(t, watcher.started) + resourceWatchers.lock.Unlock() + + // Stopping should not stop the watcher because it is still being used by state_pod metricset + enricherPod.Stop(resourceWatchers) + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[PodResource] + require.True(t, watcher.started) + require.Equal(t, []string{metricsetStatePod}, watcher.metricsetsUsing) + resourceWatchers.lock.Unlock() + + // Stopping the state_pod watcher should stop pod watcher + enricherStatePod := buildMetadataEnricher(metricsetStatePod, PodResource, resourceWatchers, config, + funcs.update, funcs.delete, funcs.index, log) + enricherStatePod.Stop(resourceWatchers) + + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[PodResource] + require.False(t, watcher.started) + require.Equal(t, []string{}, watcher.metricsetsUsing) + resourceWatchers.lock.Unlock() +} + +func TestBuildMetadataEnricher_EventHandler(t *testing.T) { + resourceWatchers := NewWatchers() + + resourceWatchers.lock.Lock() + resourceWatchers.metaWatchersMap[PodResource] = &metaWatcher{ + watcher: &mockWatcher{}, + started: false, + metricsetsUsing: []string{"pod"}, + metadataObjects: make(map[string]bool), + enrichers: make(map[string]*enricher), + } + resourceWatchers.lock.Unlock() funcs := mockFuncs{} resource := &v1.Pod{ @@ -60,16 +477,44 @@ func TestBuildMetadataEnricher(t *testing.T) { Namespace: "default", }, } + id := "default/enrich" + metadataObjects := map[string]bool{id: true} + + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: false, + }, + } + + metricset := "pod" + log := logp.NewLogger(selector) + + enricher := buildMetadataEnricher(metricset, PodResource, resourceWatchers, config, + funcs.update, funcs.delete, funcs.index, log) + resourceWatchers.lock.Lock() + wData := resourceWatchers.metaWatchersMap[PodResource] + mockW := wData.watcher.(*mockWatcher) + require.NotNil(t, mockW.handler) + resourceWatchers.lock.Unlock() - enricher := buildMetadataEnricher(&watcher, &nodeWatcher, &namespaceWatcher, &rsWatcher, &jobWatcher, funcs.update, funcs.delete, funcs.index) - assert.NotNil(t, watcher.handler) + enricher.Start(resourceWatchers) + resourceWatchers.lock.Lock() + watcher := resourceWatchers.metaWatchersMap[PodResource] + require.True(t, watcher.started) + mockW = watcher.watcher.(*mockWatcher) + resourceWatchers.lock.Unlock() - enricher.Start() - assert.True(t, watcher.started) + mockW.handler.OnAdd(resource) - // Emit an event - watcher.handler.OnAdd(resource) - assert.Equal(t, resource, funcs.updated) + resourceWatchers.lock.Lock() + require.Equal(t, metadataObjects, watcher.metadataObjects) + resourceWatchers.lock.Unlock() + + require.Equal(t, resource, funcs.updated) // Test enricher events := []mapstr.M{ @@ -78,7 +523,7 @@ func TestBuildMetadataEnricher(t *testing.T) { } enricher.Enrich(events) - assert.Equal(t, []mapstr.M{ + require.Equal(t, []mapstr.M{ {"name": "unknown"}, { "name": "enrich", @@ -95,7 +540,7 @@ func TestBuildMetadataEnricher(t *testing.T) { enricher.isPod = true enricher.Enrich(events) - assert.Equal(t, []mapstr.M{ + require.Equal(t, []mapstr.M{ {"name": "unknown"}, { "name": "enrich", @@ -106,8 +551,18 @@ func TestBuildMetadataEnricher(t *testing.T) { }, events) // Emit delete event - watcher.handler.OnDelete(resource) - assert.Equal(t, resource, funcs.deleted) + resourceWatchers.lock.Lock() + wData = resourceWatchers.metaWatchersMap[PodResource] + mockW = wData.watcher.(*mockWatcher) + resourceWatchers.lock.Unlock() + + mockW.handler.OnDelete(resource) + + resourceWatchers.lock.Lock() + require.Equal(t, map[string]bool{}, watcher.metadataObjects) + resourceWatchers.lock.Unlock() + + require.Equal(t, resource, funcs.deleted) events = []mapstr.M{ {"name": "unknown"}, @@ -115,10 +570,99 @@ func TestBuildMetadataEnricher(t *testing.T) { } enricher.Enrich(events) - assert.Equal(t, []mapstr.M{ + require.Equal(t, []mapstr.M{ {"name": "unknown"}, {"name": "enrich"}, }, events) + + enricher.Stop(resourceWatchers) + resourceWatchers.lock.Lock() + watcher = resourceWatchers.metaWatchersMap[PodResource] + require.False(t, watcher.started) + resourceWatchers.lock.Unlock() +} + +// Test if we can add metadata from past events to an enricher that is associated +// with a resource that had already triggered the handler functions +func TestBuildMetadataEnricher_EventHandler_PastObjects(t *testing.T) { + log := logp.NewLogger(selector) + + resourceWatchers := NewWatchers() + + resourceWatchers.lock.Lock() + resourceWatchers.metaWatchersMap[PodResource] = &metaWatcher{ + watcher: &mockWatcher{}, + started: false, + metricsetsUsing: []string{"pod", "state_pod"}, + metadataObjects: make(map[string]bool), + enrichers: make(map[string]*enricher), + } + resourceWatchers.lock.Unlock() + + funcs := mockFuncs{} + resource1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("mockuid"), + Name: "enrich", + Labels: map[string]string{ + "label": "value", + }, + Namespace: "default", + }, + } + id1 := "default/enrich" + resource2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("mockuid2"), + Name: "enrich-2", + Labels: map[string]string{ + "label": "value", + }, + Namespace: "default-2", + }, + } + id2 := "default-2/enrich-2" + + config := &kubernetesConfig{ + Namespace: "test-ns", + SyncPeriod: time.Minute, + Node: "test-node", + AddResourceMetadata: &metadata.AddResourceMetadataConfig{ + CronJob: false, + Deployment: false, + }, + } + + enricher := buildMetadataEnricher("pod", PodResource, resourceWatchers, config, + funcs.update, funcs.delete, funcs.index, log) + enricher.Start(resourceWatchers) + + resourceWatchers.lock.Lock() + + watcher := resourceWatchers.metaWatchersMap[PodResource] + mockW := watcher.watcher.(*mockWatcher) + resourceWatchers.lock.Unlock() + + mockW.handler.OnAdd(resource1) + + resourceWatchers.lock.Lock() + metadataObjects := map[string]bool{id1: true} + require.Equal(t, metadataObjects, watcher.metadataObjects) + resourceWatchers.lock.Unlock() + + mockW.handler.OnUpdate(resource2) + + resourceWatchers.lock.Lock() + metadataObjects[id2] = true + require.Equal(t, metadataObjects, watcher.metadataObjects) + resourceWatchers.lock.Unlock() + + mockW.handler.OnDelete(resource1) + + resourceWatchers.lock.Lock() + delete(metadataObjects, id1) + require.Equal(t, metadataObjects, watcher.metadataObjects) + resourceWatchers.lock.Unlock() } type mockFuncs struct { @@ -127,7 +671,7 @@ type mockFuncs struct { indexed mapstr.M } -func (f *mockFuncs) update(m map[string]mapstr.M, obj kubernetes.Resource) { +func (f *mockFuncs) update(obj kubernetes.Resource) map[string]mapstr.M { accessor, _ := meta.Accessor(obj) f.updated = obj meta := mapstr.M{ @@ -138,17 +682,19 @@ func (f *mockFuncs) update(m map[string]mapstr.M, obj kubernetes.Resource) { }, }, } + logger := logp.NewLogger("kubernetes") for k, v := range accessor.GetLabels() { kubernetes2.ShouldPut(meta, fmt.Sprintf("kubernetes.%v", k), v, logger) } kubernetes2.ShouldPut(meta, "orchestrator.cluster.name", "gke-4242", logger) - m[accessor.GetName()] = meta + id := accessor.GetName() + return map[string]mapstr.M{id: meta} } -func (f *mockFuncs) delete(m map[string]mapstr.M, obj kubernetes.Resource) { +func (f *mockFuncs) delete(obj kubernetes.Resource) []string { accessor, _ := meta.Accessor(obj) f.deleted = obj - delete(m, accessor.GetName()) + return []string{accessor.GetName()} } func (f *mockFuncs) index(m mapstr.M) string { @@ -158,11 +704,9 @@ func (f *mockFuncs) index(m mapstr.M) string { type mockWatcher struct { handler kubernetes.ResourceEventHandler - started bool } func (m *mockWatcher) Start() error { - m.started = true return nil } @@ -174,6 +718,10 @@ func (m *mockWatcher) AddEventHandler(r kubernetes.ResourceEventHandler) { m.handler = r } +func (m *mockWatcher) GetEventHandler() kubernetes.ResourceEventHandler { + return m.handler +} + func (m *mockWatcher) Store() cache.Store { return nil } diff --git a/x-pack/filebeat/input/internal/httplog/roundtripper.go b/x-pack/filebeat/input/internal/httplog/roundtripper.go index e8d5f8765ca..642245603f8 100644 --- a/x-pack/filebeat/input/internal/httplog/roundtripper.go +++ b/x-pack/filebeat/input/internal/httplog/roundtripper.go @@ -17,10 +17,11 @@ import ( "strconv" "time" - "github.com/elastic/elastic-agent-libs/logp" "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent-libs/logp" ) var _ http.RoundTripper = (*LoggingRoundTripper)(nil)