diff --git a/charts/unikorn/templates/unikorn-server.yaml b/charts/unikorn/templates/unikorn-server.yaml index 2de1f2bd..95dfbfc5 100644 --- a/charts/unikorn/templates/unikorn-server.yaml +++ b/charts/unikorn/templates/unikorn-server.yaml @@ -59,7 +59,7 @@ rules: - list - watch - apiGroups: - - events.k8s.io + - "" resources: - events verbs: diff --git a/pkg/server/reaper/reaper.go b/pkg/server/reaper/reaper.go index 72309b4d..00384dd0 100644 --- a/pkg/server/reaper/reaper.go +++ b/pkg/server/reaper/reaper.go @@ -24,7 +24,8 @@ import ( "github.com/unikorn-cloud/core/pkg/constants" "github.com/unikorn-cloud/unikorn/pkg/server/handler/region" - eventsv1 "k8s.io/api/events/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,7 +35,8 @@ import ( var ( ErrInvalidClient = errors.New("client invalid") - ErrDataMissing = errors.New("data missing") + ErrDataMissing = errors.New("data missing") + ErrUnexpectedType = errors.New("unexpected type") ) @@ -58,34 +60,51 @@ func (r *Reaper) Run(ctx context.Context) error { return fmt.Errorf("%w: client does not implement watches", ErrInvalidClient) } - var events eventsv1.EventList - - watcher, err := watchingClient.Watch(ctx, &events, &client.ListOptions{}) - if err != nil { - return err + options := &client.ListOptions{ + FieldSelector: fields.SelectorFromSet(fields.Set{ + "involvedObject.kind": "KubernetesCluster", + }), } - eventStream := watcher.ResultChan() - // Please note when using Kubernetes watches of events that you may see // some historical events for things in the last hour. go func() { - for { - event := <-eventStream + var zeroEvent watch.Event - log.V(1).Info("witnessed an event", "event", event) + for { + var events corev1.EventList - if event.Type != watch.Added { + watcher, err := watchingClient.Watch(ctx, &events, options) + if err != nil { + log.Error(err, "failed to setup watch") continue } - realEvent, ok := event.Object.(*eventsv1.Event) - if !ok { - log.Error(ErrUnexpectedType, "unable to decode event") - } + eventStream := watcher.ResultChan() + + for { + event := <-eventStream + + // Zero value returned, closed channel, this happens, start it back + // up again... + if event == zeroEvent { + break + } + + log.V(1).Info("witnessed an event", "event", event) + + if event.Type != watch.Added { + continue + } + + realEvent, ok := event.Object.(*corev1.Event) + if !ok { + log.Error(ErrUnexpectedType, "unable to decode event") + } - if err := r.handleEvent(ctx, realEvent); err != nil { - log.Error(err, "event handling failed") + if err := r.handleEvent(ctx, realEvent); err != nil { + log.Error(err, "event handling failed") + } } } }() @@ -93,7 +112,7 @@ func (r *Reaper) Run(ctx context.Context) error { return nil } -func (r *Reaper) handleEvent(ctx context.Context, event *eventsv1.Event) error { +func (r *Reaper) handleEvent(ctx context.Context, event *corev1.Event) error { log := log.FromContext(ctx) if event.Reason == "ClusterDeleted" {