From b2d1393ecd523d2f6b65662b81adabe8f6036547 Mon Sep 17 00:00:00 2001 From: Luke Date: Mon, 20 May 2024 14:45:14 +0900 Subject: [PATCH] handle deleting nodes --- pkg/controllers/provisioning/provisioner.go | 26 +++++++++++++-------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 4a507b2224..6f63eeda4d 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -160,7 +160,7 @@ func (p *Provisioner) GetPendingPods(ctx context.Context) ([]*v1.Pod, error) { } return lo.Reject(pods, func(po *v1.Pod, _ int) bool { if err := p.Validate(ctx, po); err != nil { - logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(po)).Infof("ignoring pod, %s", err) + logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(po)).Debugf("ignoring pod, %s", err) return true } p.consolidationWarnings(ctx, po) @@ -314,7 +314,13 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { if len(pods) == 0 { return scheduler.Results{}, nil } - s, err := p.NewScheduler(ctx, pods, nodes.Active()) + targetPods := lo.FilterMap(pods, func(pod *v1.Pod, _ int) (*v1.Pod, bool) { + if !p.isPodHandled(ctx, pod) { + return pod, true + } + return nil, false + }) + s, err := p.NewScheduler(ctx, targetPods, nodes.Active()) if err != nil { if errors.Is(err, ErrNodePoolsNotFound) { logging.FromContext(ctx).Info(ErrNodePoolsNotFound) @@ -322,8 +328,8 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { } return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err) } - results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes) - logging.FromContext(ctx).With("pods", pretty.Slice(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() }), 5)). 
+ results := s.Solve(ctx, targetPods).TruncateInstanceTypes(scheduler.MaxInstanceTypes) + logging.FromContext(ctx).With("pods", pretty.Slice(lo.Map(targetPods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() }), 5)). With("duration", time.Since(start)). Infof("found provisionable pod(s)") results.Record(ctx, p.recorder, p.cluster) @@ -416,11 +422,10 @@ func (p *Provisioner) Validate(ctx context.Context, pod *v1.Pod) error { validateNodeSelector(pod), validateAffinity(pod), p.volumeTopology.ValidatePersistentVolumeClaims(ctx, pod), - p.isPodHandled(ctx, pod), ) } -func (p *Provisioner) isPodHandled(ctx context.Context, pod *v1.Pod) (err error) { +func (p *Provisioner) isPodHandled(ctx context.Context, pod *v1.Pod) bool { var events v1.EventList filter := client.MatchingFields{ "namespace": pod.Namespace, @@ -428,20 +433,21 @@ func (p *Provisioner) isPodHandled(ctx context.Context, pod *v1.Pod) "involvedObject.name": pod.Name, "reason": "HandledByKarpenter", } - logging.FromContext(ctx).Debugf("get event for %s/%s", pod.Namespace, pod.Name) + logging.FromContext(ctx).Debugf("get event for %s/%s", pod.Namespace, pod.Name) if err := p.kubeClient.List(ctx, &events, filter); err == nil { for _, event := range events.Items { - logging.FromContext(ctx).Debugf("process event %s", event.Name) + logging.FromContext(ctx).Debugf("process event %s", event.Name) // ignore the pod if it's already handled in 3 minute if !time.Now().Add(3 * time.Minute).After(event.LastTimestamp.Time) { - return fmt.Errorf("pod %s/%s is handled", pod.Namespace, pod.Name) + logging.FromContext(ctx).Debugf("pod %s/%s is handled", pod.Namespace, pod.Name) + return true } } } else { logging.FromContext(ctx).Errorf("failed to get event for %s/%s: %w", pod.Namespace, pod.Name, err) } p.recorder.Publish(scheduler.PodHandledEvent(pod)) - return nil + return false } // validateKarpenterManagedLabelCanExist provides a more clear error message in the event of scheduling a 
pod that specifically doesn't