From c5b5d6dd9efade573c1cdc58e5a16fe00bfb6a79 Mon Sep 17 00:00:00 2001 From: Simon Murray Date: Thu, 25 Apr 2024 10:58:57 +0100 Subject: [PATCH] Fix Infinite Reaper Loop Watches are flaky, and I should know this by now! Seems the channel was closed by something, then it sat in an infinite loop constantly receiving nil. Fix this so it restarts the watch. Also add a field selector so that we only get the events we care about. --- charts/unikorn/templates/unikorn-server.yaml | 2 +- pkg/server/reaper/reaper.go | 59 +++++++++++++------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/charts/unikorn/templates/unikorn-server.yaml b/charts/unikorn/templates/unikorn-server.yaml index 2de1f2bd..95dfbfc5 100644 --- a/charts/unikorn/templates/unikorn-server.yaml +++ b/charts/unikorn/templates/unikorn-server.yaml @@ -59,7 +59,7 @@ rules: - list - watch - apiGroups: - - events.k8s.io + - "" resources: - events verbs: diff --git a/pkg/server/reaper/reaper.go b/pkg/server/reaper/reaper.go index 72309b4d..00384dd0 100644 --- a/pkg/server/reaper/reaper.go +++ b/pkg/server/reaper/reaper.go @@ -24,7 +24,8 @@ import ( "github.com/unikorn-cloud/core/pkg/constants" "github.com/unikorn-cloud/unikorn/pkg/server/handler/region" - eventsv1 "k8s.io/api/events/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,7 +35,8 @@ import ( var ( ErrInvalidClient = errors.New("client invalid") - ErrDataMissing = errors.New("data missing") + ErrDataMissing = errors.New("data missing") + ErrUnexpectedType = errors.New("unexpected type") ) @@ -58,34 +60,51 @@ func (r *Reaper) Run(ctx context.Context) error { return fmt.Errorf("%w: client does not implement watches", ErrInvalidClient) } - var events eventsv1.EventList - - watcher, err := watchingClient.Watch(ctx, &events, &client.ListOptions{}) - if err != nil { - return err + options := &client.ListOptions{ + FieldSelector: fields.SelectorFromSet(fields.Set{ + "involvedObject.kind": "KubernetesCluster", + }), } - eventStream := watcher.ResultChan() - // Please note when using Kubernetes watches of events that you may see // some historical events for things in the last hour. go func() { - for { - event := <-eventStream + var zeroEvent watch.Event - log.V(1).Info("witnessed an event", "event", event) + for { + var events corev1.EventList - if event.Type != watch.Added { + watcher, err := watchingClient.Watch(ctx, &events, options) + if err != nil { + log.Error(err, "failed to setup watch") continue } - realEvent, ok := event.Object.(*eventsv1.Event) - if !ok { - log.Error(ErrUnexpectedType, "unable to decode event") - } + eventStream := watcher.ResultChan() + + for { + event := <-eventStream + + // Zero value returned, closed channel, this happens, start it back + // up again... + if event == zeroEvent { + break + } + + log.V(1).Info("witnessed an event", "event", event) + + if event.Type != watch.Added { + continue + } + + realEvent, ok := event.Object.(*corev1.Event) + if !ok { + log.Error(ErrUnexpectedType, "unable to decode event") + } - if err := r.handleEvent(ctx, realEvent); err != nil { - log.Error(err, "event handling failed") + if err := r.handleEvent(ctx, realEvent); err != nil { + log.Error(err, "event handling failed") + } } } }() @@ -93,7 +112,7 @@ func (r *Reaper) Run(ctx context.Context) error { return nil } -func (r *Reaper) handleEvent(ctx context.Context, event *eventsv1.Event) error { +func (r *Reaper) handleEvent(ctx context.Context, event *corev1.Event) error { log := log.FromContext(ctx) if event.Reason == "ClusterDeleted" {