diff --git a/config/config.go b/config/config.go index 47d3ad20..a492c70c 100755 --- a/config/config.go +++ b/config/config.go @@ -94,6 +94,19 @@ type Config struct { // For watching specific namespace, leave it empty for watching all. // this config is ignored when watching namespaces Namespace string `json:"namespace,omitempty"` + + // Specify fields to skip sending object update. Will be applied to all objects. + // If after removal of these fields from k8s object all remaining fields will be equal, + // handler won't trigger sending update. Removing array elements is not supported. + // For example, + // ignorefields: + // status: + // metadata: + // resourceVersion: + // managedFields: + // will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields" + // from k8s object before comparing old & new k8s objects. + IgnoredFields map[string]interface{} `json:"ignoredfields,omitempty"` } // Slack contains slack configuration diff --git a/go.mod b/go.mod index f0d0739e..cddcbd58 100755 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/fatih/structtag v1.2.0 + github.com/google/go-cmp v0.6.0 github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect github.com/hashicorp/hcl v0.0.0-20171017181929-23c074d0eceb // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5bb907a3..60ef1c00 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -54,6 +54,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/google/go-cmp/cmp" ) const maxRetries = 5 @@ -131,7 +133,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics) + allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopAllCoreEventsCh := make(chan struct{}) defer close(stopAllCoreEventsCh) @@ -155,7 +157,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics) + allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopAllEventsCh := make(chan struct{}) defer close(stopAllEventsCh) @@ -177,7 +179,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -199,7 +201,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -222,7 +224,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -244,7 +246,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -266,7 +268,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -288,7 +290,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -310,7 +312,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -332,7 +334,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -354,7 +356,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -376,7 +378,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -398,7 +400,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -420,7 +422,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -442,7 +444,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -464,7 +466,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -486,7 +488,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -508,7 +510,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -530,7 +532,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -552,7 +554,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -583,7 +585,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -596,7 +598,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { <-sigterm } -func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec) *Controller { +func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec, ignoredFields map[string]interface{}) *Controller { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) var newEvent Event var err error @@ -634,6 +636,15 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha if !ok { logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot convert old to runtime.Object for update on %v", old) } + if len(ignoredFields) > 0 { + diff, errDiff := diffObjects(old, new, ignoredFields) + if errDiff != nil { + logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot diff old & new objects %v and %v: %v", old, new, errDiff) + } else if len(diff) == 0 { + logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Ignoring update to %v: %s", resourceType, newEvent.key) + return + } + } logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Processing update to %v: %s", resourceType, newEvent.key) if err == nil { queue.Add(newEvent) @@ -670,6 +681,38 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha } } +func diffObjects(old, new interface{}, ignoredFields map[string]interface{}) (string, error) { + oldContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(old) + if err != nil { + return "", err + } + newContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(new) + if err != nil { + return "", err + } + recursiveDelete(oldContent, ignoredFields) + recursiveDelete(newContent, ignoredFields) + return cmp.Diff(oldContent, newContent), nil +} + +// recursiveDelete recursively removes key from object +// value of key should be either nil or nested map[string]interface{} +// value of object to delete from should be nested map[string]interface{} +func recursiveDelete(object map[string]interface{}, key map[string]interface{}) { + for k, v := range key { + if v == nil { + delete(object, k) + continue + } + if recursiveKey, ok := v.(map[string]interface{}); ok { + if recursiveObj, ok := object[k].(map[string]interface{}); ok { + recursiveDelete(recursiveObj, recursiveKey) + } + } + } + return +} + // Run starts the kubewatch controller func (c *Controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash()