Skip to content

Commit

Permalink
Fix Infinite Reaper Loop (#62)
Browse files Browse the repository at this point in the history
Watches are flaky, and I should know this by now!  Seems the channel was
closed by something, then it sat in an infinite loop constantly
receiving nil.  Fix this so it restarts the watch.  Also add a field
selector so that we only get the events we care about.
  • Loading branch information
spjmurray authored Apr 25, 2024
1 parent 2410cd1 commit dff438a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
2 changes: 1 addition & 1 deletion charts/unikorn/templates/unikorn-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ rules:
- list
- watch
- apiGroups:
- events.k8s.io
- ""
resources:
- events
verbs:
Expand Down
59 changes: 39 additions & 20 deletions pkg/server/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"github.com/unikorn-cloud/core/pkg/constants"
"github.com/unikorn-cloud/unikorn/pkg/server/handler/region"

eventsv1 "k8s.io/api/events/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"

"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -34,7 +35,8 @@ import (
var (
ErrInvalidClient = errors.New("client invalid")

ErrDataMissing = errors.New("data missing")
ErrDataMissing = errors.New("data missing")

ErrUnexpectedType = errors.New("unexpected type")
)

Expand All @@ -58,42 +60,59 @@ func (r *Reaper) Run(ctx context.Context) error {
return fmt.Errorf("%w: client does not implement watches", ErrInvalidClient)
}

var events eventsv1.EventList

watcher, err := watchingClient.Watch(ctx, &events, &client.ListOptions{})
if err != nil {
return err
options := &client.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{
"involvedObject.kind": "KubernetesCluster",
}),
}

eventStream := watcher.ResultChan()

// Please note when using Kubernetes watches of events that you may see
// some historical events for things in the last hour.
go func() {
for {
event := <-eventStream
var zeroEvent watch.Event

log.V(1).Info("witnessed an event", "event", event)
for {
var events corev1.EventList

if event.Type != watch.Added {
watcher, err := watchingClient.Watch(ctx, &events, options)
if err != nil {
log.Error(err, "failed to setup watch")
continue
}

realEvent, ok := event.Object.(*eventsv1.Event)
if !ok {
log.Error(ErrUnexpectedType, "unable to decode event")
}
eventStream := watcher.ResultChan()

for {
event := <-eventStream

// Zero value returned, closed channel, this happens, start it back
// up again...
if event == zeroEvent {
break
}

log.V(1).Info("witnessed an event", "event", event)

if event.Type != watch.Added {
continue
}

realEvent, ok := event.Object.(*corev1.Event)
if !ok {
log.Error(ErrUnexpectedType, "unable to decode event")
}

if err := r.handleEvent(ctx, realEvent); err != nil {
log.Error(err, "event handling failed")
if err := r.handleEvent(ctx, realEvent); err != nil {
log.Error(err, "event handling failed")
}
}
}
}()

return nil
}

func (r *Reaper) handleEvent(ctx context.Context, event *eventsv1.Event) error {
func (r *Reaper) handleEvent(ctx context.Context, event *corev1.Event) error {
log := log.FromContext(ctx)

if event.Reason == "ClusterDeleted" {
Expand Down

0 comments on commit dff438a

Please sign in to comment.