From 7b85ae1fb577923e395453c72df37859adfafde3 Mon Sep 17 00:00:00 2001 From: D074096 Date: Mon, 13 Sep 2021 15:07:37 +0200 Subject: [PATCH] Ignore source and change ref.UID The event source is actually irrelevant. --- controllers/node_controller_test.go | 2 +- controllers/suite_test.go | 9 ++---- event/event.go | 43 ++++++++--------------------- main.go | 17 +++--------- state/state.go | 15 ++++------ 5 files changed, 23 insertions(+), 63 deletions(-) diff --git a/controllers/node_controller_test.go b/controllers/node_controller_test.go index 43b3a26..02698a0 100644 --- a/controllers/node_controller_test.go +++ b/controllers/node_controller_test.go @@ -106,7 +106,7 @@ var _ = Describe("The controller", func() { err = k8sClient.List(context.Background(), events) Expect(err).To(Succeed()) Expect(events.Items).ToNot(HaveLen(0)) - Expect(events.Items[0].Source.Host).To(Equal("targetnode")) + Expect(events.Items[0].InvolvedObject.UID).To(BeEquivalentTo("targetnode")) }) It("should annotate the last used profile", func() { diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 60d758c..3920843 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -25,12 +25,10 @@ import ( "path/filepath" "testing" - "github.com/go-logr/logr" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sapcc/maintenance-controller/event" corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" @@ -129,18 +127,15 @@ var _ = BeforeSuite(func() { k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{ Scheme: scheme.Scheme, MetricsBindAddress: "0", + EventBroadcaster: event.NewNodeBroadcaster(), }) Expect(err).ToNot(HaveOccurred()) - clientSet, err := kubernetes.NewForConfig(cfg) - Expect(err).To(Succeed()) - - eventRecorder := event.MakeRecorder(logr.Discard(), scheme.Scheme, clientSet) err = (&NodeReconciler{ Client: k8sManager.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("maintenance"), Scheme: k8sManager.GetScheme(), - Recorder: eventRecorder, + Recorder: k8sManager.GetEventRecorderFor("controller"), }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) diff --git a/event/event.go b/event/event.go index ee1a56e..81a3cde 100644 --- a/event/event.go +++ b/event/event.go @@ -22,15 +22,13 @@ import ( "math/rand" "time" - "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record/util" @@ -53,7 +51,7 @@ type eventBroadcasterImpl struct { } // Creates a new event broadcaster. -func NewSourcingBroadcaster() record.EventBroadcaster { +func NewNodeBroadcaster() record.EventBroadcaster { return &eventBroadcasterImpl{ Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, @@ -195,17 +193,17 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) w // NewRecorder returns an EventRecorder that records events with the given event source. func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder { - return &SourcingRecorder{scheme, source, e.Broadcaster, clock.RealClock{}} + return &NodeRecorder{scheme, source, e.Broadcaster, clock.RealClock{}} } -type SourcingRecorder struct { +type NodeRecorder struct { scheme *runtime.Scheme source v1.EventSource *watch.Broadcaster clock clock.Clock } -func (recorder *SourcingRecorder) generateEvent(object runtime.Object, annotations map[string]string, +func (recorder *NodeRecorder) generateEvent(object runtime.Object, annotations map[string]string, source *v1.EventSource, eventtype, reason, message string) { ref, err := ref.GetReference(recorder.scheme, object) if err != nil { @@ -233,33 +231,25 @@ func (recorder *SourcingRecorder) generateEvent(object runtime.Object, annotatio }() } -func (recorder *SourcingRecorder) Event(object runtime.Object, eventtype, reason, message string) { +func (recorder *NodeRecorder) Event(object runtime.Object, eventtype, reason, message string) { recorder.generateEvent(object, nil, nil, eventtype, reason, message) } -func (recorder *SourcingRecorder) Eventf(object runtime.Object, +func (recorder *NodeRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...)) } -func (recorder *SourcingRecorder) AnnotatedEventf(object runtime.Object, +func (recorder *NodeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { recorder.generateEvent(object, annotations, nil, eventtype, reason, fmt.Sprintf(messageFmt, args...)) } -func (recorder *SourcingRecorder) SourcedEvent(object runtime.Object, source v1.EventSource, - eventtype, reason, message string) { - recorder.generateEvent(object, nil, &source, eventtype, reason, message) -} - -func (recorder *SourcingRecorder) SourcedEventf(object runtime.Object, source v1.EventSource, eventtype, reason, - messageFmt string, args ...interface{}) { - recorder.SourcedEvent(object, source, eventtype, reason, fmt.Sprintf(messageFmt, args...)) -} - -func (recorder *SourcingRecorder) makeEvent(ref *v1.ObjectReference, annotations map[string]string, +func (recorder *NodeRecorder) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event { t := metav1.Time{Time: recorder.clock.Now()} + // this makes the event appear in kubectl describe node. + ref.UID = types.UID(ref.Name) namespace := ref.Namespace if namespace == "" { namespace = metav1.NamespaceDefault @@ -279,14 +269,3 @@ func (recorder *SourcingRecorder) makeEvent(ref *v1.ObjectReference, annotations Type: eventtype, } } - -func MakeRecorder(log logr.Logger, scheme *runtime.Scheme, clientSet *kubernetes.Clientset) record.EventRecorder { - eventBroadcaster := NewSourcingBroadcaster() - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) - eventBroadcaster.StartEventWatcher( - func(e *v1.Event) { - log.Info("Send event", "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message) - }) - eventRecorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "maintenance-controller"}) - return eventRecorder -} diff --git a/main.go b/main.go index e01e84e..40c1ac6 100644 --- a/main.go +++ b/main.go @@ -26,9 +26,7 @@ import ( // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. v1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/client-go/rest" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -97,6 +95,7 @@ func main() { HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "6a2f7a03.cloud.sap", + EventBroadcaster: event.NewNodeBroadcaster(), }) if err != nil { setupLog.Error(err, "unable to start manager") @@ -104,7 +103,7 @@ func main() { } setupChecks(mgr) - err = setupReconcilers(mgr, enableESXMaintenance, restConfig) + err = setupReconcilers(mgr, enableESXMaintenance) if err != nil { setupLog.Error(err, "problem setting up reconcilers") os.Exit(1) @@ -130,20 +129,12 @@ func setupChecks(mgr manager.Manager) { } } -func setupReconcilers(mgr manager.Manager, enableESXMaintenance bool, restConfig *rest.Config) error { - clientSet, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return fmt.Errorf("Failed to init clientSet: %w", err) - } - - eventLog := ctrl.Log.WithName("controllers").WithName("events") - eventRecorder := event.MakeRecorder(eventLog, mgr.GetScheme(), clientSet) - +func setupReconcilers(mgr manager.Manager, enableESXMaintenance bool) error { if err := (&controllers.NodeReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("maintenance"), Scheme: mgr.GetScheme(), - Recorder: eventRecorder, + Recorder: mgr.GetEventRecorderFor("maintenance"), }).SetupWithManager(mgr); err != nil { return fmt.Errorf("Failed to setup maintenance controller node reconciler: %w", err) } diff --git a/state/state.go b/state/state.go index 6f4e6bb..8f12fb5 100644 --- a/state/state.go +++ b/state/state.go @@ -24,7 +24,6 @@ import ( "strings" "time" - "github.com/sapcc/maintenance-controller/event" "github.com/sapcc/maintenance-controller/plugin" v1 "k8s.io/api/core/v1" ) @@ -95,22 +94,18 @@ func FromLabel(label NodeStateLabel, chains PluginChains, interval time.Duration // Returns the next node state. // In case of an error state.Label() is retuned alongside with the error. func Apply(state NodeState, node *v1.Node, data *Data, params plugin.Parameters) (NodeStateLabel, error) { - recorder := params.Recorder.(*event.SourcingRecorder) - source := v1.EventSource{ - Component: "maintenance-controller", - Host: node.Name, - } + recorder := params.Recorder // invoke notifications and check for transition err := state.Notify(params, data) if err != nil { - recorder.SourcedEventf(node, source, "Normal", "ChangeMaintenanceStateFailed", + recorder.Eventf(node, "Normal", "ChangeMaintenanceStateFailed", "At least one notification plugin failed for profile %v: Will stay in %v state", params.Profile.Current, params.State) return state.Label(), fmt.Errorf("failed to notify for profile %v: %w", params.Profile.Current, err) } next, err := state.Transition(params, data) if err != nil { - recorder.SourcedEventf(node, source, "Normal", "ChangeMaintenanceStateFailed", + recorder.Eventf(node, "Normal", "ChangeMaintenanceStateFailed", "At least one check plugin failed for profile %v: Will stay in %v state", params.Profile.Current, params.State) params.Log.Error(err, "Failed to check for state transition", "state", params.State, @@ -123,12 +118,12 @@ func Apply(state NodeState, node *v1.Node, data *Data, params plugin.Parameters) err = state.Trigger(params, data) if err != nil { params.Log.Error(err, "Failed to execute triggers", "state", params.State, "profile", params.Profile.Current) - recorder.SourcedEventf(node, source, "Normal", "ChangeMaintenanceStateFailed", + recorder.Eventf(node, "Normal", "ChangeMaintenanceStateFailed", "At least one trigger plugin failed for profile %v: Will stay in %v state", params.Profile.Current, params.State) return state.Label(), err } else { params.Log.Info("Moved node to next state", "state", string(next), "profile", params.Profile.Current) - recorder.SourcedEventf(node, source, "Normal", "ChangedMaintenanceState", + recorder.Eventf(node, "Normal", "ChangedMaintenanceState", "The node is now in the %v state caused by profile %v", string(next), params.Profile.Current) return next, nil }