Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: add ignored fields #89

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ type Config struct {
// For watching specific namespace, leave it empty for watching all.
// this config is ignored when watching namespaces
Namespace string `json:"namespace,omitempty"`

// Specify fields to skip sending object update. Will be applied to all objects.
// If after removal of these fields from k8s object all remaining fields will be equal,
// handler won't trigger sending update. Removing array elements is not supported.
// For example,
// ignorefields:
// status:
// metadata:
// resourceVersion:
// managedFields:
// will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields"
// from k8s object before comparing old & new k8s objects.
IgnoredFields map[string]interface{} `json:"ignoredfields,omitempty"`
}

// Slack contains slack configuration
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/fatih/structtag v1.2.0
github.com/google/go-cmp v0.6.0
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect
github.com/hashicorp/hcl v0.0.0-20171017181929-23c074d0eceb // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down
87 changes: 65 additions & 22 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/google/go-cmp/cmp"
)

const maxRetries = 5
Expand Down Expand Up @@ -131,7 +133,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics)
allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopAllCoreEventsCh := make(chan struct{})
defer close(stopAllCoreEventsCh)

Expand All @@ -155,7 +157,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics)
allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopAllEventsCh := make(chan struct{})
defer close(stopAllEventsCh)

Expand All @@ -177,7 +179,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -199,7 +201,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -222,7 +224,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -244,7 +246,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -266,7 +268,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -288,7 +290,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -310,7 +312,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -332,7 +334,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -354,7 +356,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -376,7 +378,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -398,7 +400,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -420,7 +422,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -442,7 +444,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -464,7 +466,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -486,7 +488,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -508,7 +510,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -530,7 +532,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -552,7 +554,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -583,7 +585,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -596,7 +598,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
<-sigterm
}

func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec) *Controller {
func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec, ignoredFields map[string]interface{}) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var newEvent Event
var err error
Expand Down Expand Up @@ -634,6 +636,15 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
if !ok {
logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot convert old to runtime.Object for update on %v", old)
}
if len(ignoredFields) > 0 {
diff, errDiff := diffObjects(old, new, ignoredFields)
if errDiff != nil {
logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot diff old & new objects %v and %v: %v", old, new, errDiff)
} else if len(diff) == 0 {
logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Ignoring update to %v: %s", resourceType, newEvent.key)
return
}
}
logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Processing update to %v: %s", resourceType, newEvent.key)
if err == nil {
queue.Add(newEvent)
Expand Down Expand Up @@ -670,6 +681,38 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
}
}

func diffObjects(old, new interface{}, ignoredFields map[string]interface{}) (string, error) {
oldContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(old)
if err != nil {
return "", err
}
newContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(new)
if err != nil {
return "", err
}
recursiveDelete(oldContent, ignoredFields)
recursiveDelete(newContent, ignoredFields)
return cmp.Diff(oldContent, newContent), nil
}

// recursiveDelete recursively removes key from object
// value of key should be either nil or nested map[string]interface{}
// value of object to delete from should be nested map[string]interface{}
func recursiveDelete(object map[string]interface{}, key map[string]interface{}) {
for k, v := range key {
if v == nil {
delete(object, k)
continue
}
if recursiveKey, ok := v.(map[string]interface{}); ok {
if recursiveObj, ok := object[k].(map[string]interface{}); ok {
recursiveDelete(recursiveObj, recursiveKey)
}
}
}
return
}

// Run starts the kubewatch controller
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
Expand Down