handle deleting nodes
Luke-Smartnews committed May 20, 2024
1 parent 9a08a56 commit b2d1393
Showing 1 changed file with 16 additions and 10 deletions: pkg/controllers/provisioning/provisioner.go
@@ -160,7 +160,7 @@ func (p *Provisioner) GetPendingPods(ctx context.Context) ([]*v1.Pod, error) {
 	}
 	return lo.Reject(pods, func(po *v1.Pod, _ int) bool {
 		if err := p.Validate(ctx, po); err != nil {
-			logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(po)).Infof("ignoring pod, %s", err)
+			logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(po)).Debugf("ignoring pod, %s", err)
 			return true
 		}
 		p.consolidationWarnings(ctx, po)
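
Context for the hunk above: GetPendingPods keeps only pods that pass Validate by using lo.Reject from the samber/lo library, which drops every element for which the callback returns true. A minimal self-contained sketch of that pattern; the validate helper below is hypothetical, for illustration only:

package main

import (
	"fmt"
	"strings"

	"github.com/samber/lo"
)

// validate is a hypothetical stand-in for Provisioner.Validate: it fails
// for pods we cannot schedule.
func validate(name string) error {
	if strings.HasPrefix(name, "bad-") {
		return fmt.Errorf("pod %s failed validation", name)
	}
	return nil
}

func main() {
	pods := []string{"pod-a", "bad-pod-b", "pod-c"}
	// lo.Reject drops every element for which the callback returns true,
	// mirroring how GetPendingPods filters out pods that fail Validate.
	pending := lo.Reject(pods, func(name string, _ int) bool {
		return validate(name) != nil
	})
	fmt.Println(pending) // [pod-a pod-c]
}
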
@@ -314,16 +314,22 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
 	if len(pods) == 0 {
 		return scheduler.Results{}, nil
 	}
-	s, err := p.NewScheduler(ctx, pods, nodes.Active())
+	targetPods := lo.FilterMap(pods, func(pod *v1.Pod, _ int) (*v1.Pod, bool) {
+		if !p.isPodHandled(ctx, pod) {
+			return pod, true
+		}
+		return nil, false
+	})
+	s, err := p.NewScheduler(ctx, targetPods, nodes.Active())
 	if err != nil {
 		if errors.Is(err, ErrNodePoolsNotFound) {
 			logging.FromContext(ctx).Info(ErrNodePoolsNotFound)
 			return scheduler.Results{}, nil
 		}
 		return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
 	}
-	results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
-	logging.FromContext(ctx).With("pods", pretty.Slice(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() }), 5)).
+	results := s.Solve(ctx, targetPods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
+	logging.FromContext(ctx).With("pods", pretty.Slice(lo.Map(targetPods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() }), 5)).
		With("duration", time.Since(start)).
		Infof("found provisionable pod(s)")
 	results.Record(ctx, p.recorder, p.cluster)
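
Context for the hunk above: the new targetPods filter uses lo.FilterMap, which transforms and filters in a single pass; the callback returns the mapped value plus a boolean that decides whether it is kept. A minimal self-contained sketch of the same pattern; the handled set below is a hypothetical stand-in for isPodHandled, for illustration only:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	pods := []string{"pod-a", "pod-b", "pod-c"}
	// handled is a hypothetical stand-in for isPodHandled: pods that
	// already carry a recent HandledByKarpenter event.
	handled := map[string]bool{"pod-b": true}
	// lo.FilterMap keeps an element only when the callback's second
	// return value is true, so already-handled pods are dropped before
	// scheduling, mirroring how Schedule builds targetPods.
	target := lo.FilterMap(pods, func(name string, _ int) (string, bool) {
		if !handled[name] {
			return name, true
		}
		return "", false
	})
	fmt.Println(target) // [pod-a pod-c]
}
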
@@ -416,32 +422,32 @@ func (p *Provisioner) Validate(ctx context.Context, pod *v1.Pod) error {
 		validateNodeSelector(pod),
 		validateAffinity(pod),
 		p.volumeTopology.ValidatePersistentVolumeClaims(ctx, pod),
-		p.isPodHandled(ctx, pod),
 	)
 }
 
-func (p *Provisioner) isPodHandled(ctx context.Context, pod *v1.Pod) (err error) {
+func (p *Provisioner) isPodHandled(ctx context.Context, pod *v1.Pod) bool {
 	var events v1.EventList
 	filter := client.MatchingFields{
 		"namespace":           pod.Namespace,
 		"involvedObject.kind": "Pod",
 		"involvedObject.name": pod.Name,
 		"reason":              "HandledByKarpenter",
 	}
-	logging.FromContext(ctx).Debugf("get event for %s/%s", pod.Namespace, pod.Name)
+	logging.FromContext(ctx).Infof("get event for %s/%s", pod.Namespace, pod.Name)
 	if err := p.kubeClient.List(ctx, &events, filter); err == nil {
 		for _, event := range events.Items {
-			logging.FromContext(ctx).Debugf("process event %s", event.Name)
+			logging.FromContext(ctx).Infof("process event %s", event.Name)
 			// ignore the pod if it has already been handled within the last 3 minutes
 			if !time.Now().Add(3 * time.Minute).After(event.LastTimestamp.Time) {
-				return fmt.Errorf("pod %s/%s is handled", pod.Namespace, pod.Name)
+				logging.FromContext(ctx).Infof("pod %s/%s is handled", pod.Namespace, pod.Name)
+				return true
 			}
 		}
 	} else {
 		logging.FromContext(ctx).Errorf("failed to get event for %s/%s: %w", pod.Namespace, pod.Name, err)
 	}
 	p.recorder.Publish(scheduler.PodHandledEvent(pod))
-	return nil
+	return false
 }
 
 // validateKarpenterManagedLabelCanExist provides a more clear error message in the event of scheduling a pod that specifically doesn't
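
A caveat worth noting for the hunk above: with controller-runtime's default cached client, a List call using client.MatchingFields only matches on fields that have been registered with the cache's field indexer. A hedged sketch of what that registration could look like; the registerEventIndexes helper and its wiring are assumptions, not part of this diff:

package indexes

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// registerEventIndexes is a hypothetical helper showing the index
// registration a cached controller-runtime client would need before
// isPodHandled's field filter can match Events on these keys.
func registerEventIndexes(ctx context.Context, indexer client.FieldIndexer) error {
	extractors := map[string]func(*v1.Event) string{
		"involvedObject.kind": func(ev *v1.Event) string { return ev.InvolvedObject.Kind },
		"involvedObject.name": func(ev *v1.Event) string { return ev.InvolvedObject.Name },
		"reason":              func(ev *v1.Event) string { return ev.Reason },
	}
	for field, extract := range extractors {
		extract := extract // capture the per-iteration value for the closure
		if err := indexer.IndexField(ctx, &v1.Event{}, field, func(obj client.Object) []string {
			return []string{extract(obj.(*v1.Event))}
		}); err != nil {
			return err
		}
	}
	return nil
}
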
