chore: upgrade karpenter to 0.37.4
Luke-Smartnews committed Oct 2, 2024
1 parent 46bab1b commit 7f6baca
Showing 12 changed files with 232 additions and 21 deletions.
7 changes: 7 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -617,6 +617,13 @@ spec:
memory leak protection, and disruption testing.
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
type: string
utilizationThreshold:
description: |-
UtilizationThreshold is defined as the sum of requested resources divided by capacity,
below which a node can be considered for disruption.
maximum: 100
minimum: 1
type: integer
type: object
x-kubernetes-validations:
- message: consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized
13 changes: 7 additions & 6 deletions pkg/apis/v1beta1/nodeclaim_status.go
@@ -22,12 +22,13 @@ import (
)

const (
ConditionTypeLaunched = "Launched"
ConditionTypeRegistered = "Registered"
ConditionTypeInitialized = "Initialized"
ConditionTypeEmpty = "Empty"
ConditionTypeDrifted = "Drifted"
ConditionTypeExpired = "Expired"
ConditionTypeLaunched = "Launched"
ConditionTypeRegistered = "Registered"
ConditionTypeInitialized = "Initialized"
ConditionTypeEmpty = "Empty"
ConditionTypeUnderutilized = "Underutilized"
ConditionTypeDrifted = "Drifted"
ConditionTypeExpired = "Expired"
)

// NodeClaimStatus defines the observed state of NodeClaim
6 changes: 6 additions & 0 deletions pkg/apis/v1beta1/nodepool.go
@@ -76,6 +76,12 @@ type Disruption struct {
// +kubebuilder:validation:Enum:={WhenEmpty,WhenUnderutilized}
// +optional
ConsolidationPolicy ConsolidationPolicy `json:"consolidationPolicy,omitempty"`
// UtilizationThreshold is defined as the sum of requested resources divided by capacity,
// below which a node can be considered for disruption.
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=100
// +optional
UtilizationThreshold *int `json:"utilizationThreshold,omitempty"`
// ExpireAfter is the duration the controller will wait
// before terminating a node, measured from when the node is created. This
// is useful to implement features like eventually consistent node upgrade,
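For illustration, a minimal sketch (not part of this commit) of a NodePool disruption block that opts into the new field, written in the same style as the CEL tests further down; the 50% threshold and the 10-minute window are placeholder values, and the sketch assumes the v1beta1 helpers already used in this diff (lo.ToPtr, MustParseNillableDuration):

import (
	"github.com/samber/lo"

	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
)

// exampleDisruption is a hypothetical helper showing how the new field is intended to be set.
func exampleDisruption() v1beta1.Disruption {
	return v1beta1.Disruption{
		// Consider a node for consolidation only when both its CPU and memory requests
		// fall below 50% of allocatable capacity.
		ConsolidationPolicy:  v1beta1.ConsolidationPolicyWhenUnderutilized,
		UtilizationThreshold: lo.ToPtr(50),
		// With the validation removed in this commit, consolidateAfter can now be combined
		// with WhenUnderutilized to require the node to stay underutilized this long
		// before it is disrupted.
		ConsolidateAfter: lo.ToPtr(v1beta1.MustParseNillableDuration("10m")),
	}
}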
3 changes: 0 additions & 3 deletions pkg/apis/v1beta1/nodepool_validation.go
@@ -98,9 +98,6 @@ func (in *NodeClaimTemplate) validateRequirementsNodePoolKeyDoesNotExist() (errs

//nolint:gocyclo
func (in *Disruption) validate() (errs *apis.FieldError) {
if in.ConsolidateAfter != nil && in.ConsolidateAfter.Duration != nil && in.ConsolidationPolicy == ConsolidationPolicyWhenUnderutilized {
return errs.Also(apis.ErrGeneric("consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized"))
}
if in.ConsolidateAfter == nil && in.ConsolidationPolicy == ConsolidationPolicyWhenEmpty {
return errs.Also(apis.ErrGeneric("consolidateAfter must be specified with consolidationPolicy=WhenEmpty"))
}
5 changes: 0 additions & 5 deletions pkg/apis/v1beta1/nodepool_validation_cel_test.go
@@ -94,11 +94,6 @@ var _ = Describe("CEL/Validation", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should fail when setting consolidateAfter with consolidationPolicy=WhenUnderutilized", func() {
nodePool.Spec.Disruption.ConsolidateAfter = lo.ToPtr(MustParseNillableDuration("30s"))
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilized
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should succeed when not setting consolidateAfter to 'Never' with consolidationPolicy=WhenUnderutilized", func() {
nodePool.Spec.Disruption.ConsolidateAfter = lo.ToPtr(MustParseNillableDuration("Never"))
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilized
5 changes: 5 additions & 0 deletions pkg/apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions pkg/controllers/disruption/consolidation.go
@@ -101,6 +101,13 @@ func (c *consolidation) ShouldDisrupt(_ context.Context, cn *Candidate) bool {
c.recorder.Publish(disruptionevents.Unconsolidatable(cn.Node, cn.NodeClaim, fmt.Sprintf("NodePool %q has consolidation disabled", cn.nodePool.Name))...)
return false
}
// Only perform this check when UtilizationThreshold is specified, to remain backward compatible
if cn.nodePool.Spec.Disruption.UtilizationThreshold != nil {
if !cn.NodeClaim.StatusConditions().Get(v1beta1.ConditionTypeUnderutilized).IsTrue() ||
c.clock.Now().Before(cn.NodeClaim.StatusConditions().Get(v1beta1.ConditionTypeUnderutilized).LastTransitionTime.Add(*cn.nodePool.Spec.Disruption.ConsolidateAfter.Duration)) {
return false
}
}
return true
}

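The gate added above means a candidate only becomes eligible once its Underutilized condition has been true for at least the NodePool's consolidateAfter window. A minimal sketch of that timing check in isolation, where markedAt stands in for the condition's LastTransitionTime (the function name is illustrative, not from this commit):

import (
	"time"

	"k8s.io/utils/clock"
)

// underutilizedLongEnough reports whether a node marked Underutilized at markedAt
// has stayed that way for at least consolidateAfter, according to the injected clock.
func underutilizedLongEnough(clk clock.Clock, markedAt time.Time, consolidateAfter time.Duration) bool {
	return !clk.Now().Before(markedAt.Add(consolidateAfter))
}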
137 changes: 137 additions & 0 deletions pkg/controllers/nodeclaim/disruption/consolidation.go
@@ -0,0 +1,137 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package disruption

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

// Consolidation is a nodeclaim sub-controller that adds or removes status conditions on nodeclaims when using the WhenUnderutilized consolidation policy.
type Consolidation struct {
kubeClient client.Client
cluster *state.Cluster
clock clock.Clock
}

//nolint:gocyclo
func (e *Consolidation) Reconcile(ctx context.Context, nodePool *v1beta1.NodePool, nodeClaim *v1beta1.NodeClaim) (reconcile.Result, error) {
hasCondition := nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeUnderutilized) != nil
if nodePool.Spec.Disruption.ConsolidationPolicy != v1beta1.ConsolidationPolicyWhenUnderutilized {
if hasCondition {
_ = nodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeUnderutilized)
}
return reconcile.Result{}, nil
}
if initCond := nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeInitialized); initCond == nil || initCond.IsFalse() {
if hasCondition {
_ = nodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeUnderutilized)
logging.FromContext(ctx).Debugf("removing underutilized status condition, node isn't initialized")
}
return reconcile.Result{}, nil
}
_, err := nodeclaimutil.NodeForNodeClaim(ctx, e.kubeClient, nodeClaim)
if err != nil {
if nodeclaimutil.IsDuplicateNodeError(err) || nodeclaimutil.IsNodeNotFoundError(err) {
_ = nodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeUnderutilized)
if hasCondition {
logging.FromContext(ctx).Debugf("removing underutilized status condition, doesn't have a single node mapping")
}
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

// Get the node to check utilization
n, err := nodeclaimutil.NodeForNodeClaim(ctx, e.kubeClient, nodeClaim)
if err != nil {
if nodeclaimutil.IsDuplicateNodeError(err) || nodeclaimutil.IsNodeNotFoundError(err) {
_ = nodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeUnderutilized)
if hasCondition {
logging.FromContext(ctx).Debugf("removing underutilized status condition, doesn't have a single node mapping")
}
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
pods, err := node.GetPods(ctx, e.kubeClient, n)
if err != nil {
return reconcile.Result{}, fmt.Errorf("retrieving node pods, %w", err)
}
// Check node utilization if utilizationThreshold is specified; the node can be disrupted only if its utilization is below the threshold.
threshold := nodePool.Spec.Disruption.UtilizationThreshold
if threshold != nil {
cpu, err := calculateUtilizationOfResource(n, v1.ResourceCPU, pods)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate CPU, %w", err)
}
memory, err := calculateUtilizationOfResource(n, v1.ResourceMemory, pods)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate memory, %w", err)
}
if cpu < float64(*threshold)/100 && memory < float64(*threshold)/100 {
if !hasCondition {
nodeClaim.StatusConditions().SetTrue(v1beta1.ConditionTypeUnderutilized)
logging.FromContext(ctx).Debugf("marking underutilized")
metrics.NodeClaimsDisruptedCounter.With(prometheus.Labels{
metrics.TypeLabel: metrics.ConsolidationReason,
metrics.NodePoolLabel: nodeClaim.Labels[v1beta1.NodePoolLabelKey],
}).Inc()
}
} else {
if hasCondition {
_ = nodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeUnderutilized)
logging.FromContext(ctx).Debugf("removing underutilized status condition, utilization increased")
}
}
}
return reconcile.Result{}, nil
}

// calculateUtilizationOfResource calculates the utilization of a given resource for a node.
func calculateUtilizationOfResource(node *v1.Node, resourceName v1.ResourceName, pods []*v1.Pod) (float64, error) {
allocatable, found := node.Status.Allocatable[resourceName]
if !found {
return 0, fmt.Errorf("failed to get %v from %s", resourceName, node.Name)
}
if allocatable.MilliValue() == 0 {
return 0, fmt.Errorf("%v is 0 at %s", resourceName, node.Name)
}
podsRequest := resource.MustParse("0")
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if resourceValue, found := container.Resources.Requests[resourceName]; found {
podsRequest.Add(resourceValue)
}
}
}
return float64(podsRequest.MilliValue()) / float64(allocatable.MilliValue()), nil
}
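A short, hypothetical usage example for the helper above, assumed to live in the same package so it can reuse the file's existing imports (fmt, v1, resource): a node with 4 CPUs allocatable and a single pod requesting 1 CPU yields a utilization of 0.25, which is below a utilizationThreshold of 50 (i.e. 0.50). Note that only regular container requests are summed; init container requests are not counted by this helper.

// exampleUtilization is an illustrative sketch, not part of this commit.
func exampleUtilization() {
	node := &v1.Node{
		Status: v1.NodeStatus{
			Allocatable: v1.ResourceList{v1.ResourceCPU: resource.MustParse("4")},
		},
	}
	pods := []*v1.Pod{{
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
			},
		}}},
	}}
	util, err := calculateUtilizationOfResource(node, v1.ResourceCPU, pods)
	fmt.Println(util, err) // 0.25 <nil>: 1000m requested / 4000m allocatable
}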
9 changes: 6 additions & 3 deletions pkg/controllers/nodeclaim/disruption/controller.go
@@ -51,9 +51,10 @@ type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider

drift *Drift
expiration *Expiration
emptiness *Emptiness
drift *Drift
expiration *Expiration
emptiness *Emptiness
consolidation *Consolidation
}

// NewController constructs a nodeclaim disruption controller
@@ -64,6 +65,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cluster *state.Clu
drift: &Drift{cloudProvider: cloudProvider},
expiration: &Expiration{kubeClient: kubeClient, clock: clk},
emptiness: &Emptiness{kubeClient: kubeClient, cluster: cluster, clock: clk},
consolidation: &Consolidation{kubeClient: kubeClient, cluster: cluster, clock: clk},
}
}

@@ -90,6 +92,7 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1beta1.NodeClaim
c.expiration,
c.drift,
c.emptiness,
c.consolidation,
}
for _, reconciler := range reconcilers {
res, err := reconciler.Reconcile(ctx, nodePool, nodeClaim)
38 changes: 34 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
@@ -311,26 +311,56 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
return scheduler.Results{}, err
}
pods := append(pendingPods, deletingNodePods...)
// filter out pods that were already handled in the last 3 minutes
targetPods := lo.FilterMap(pods, func(pod *v1.Pod, _ int) (*v1.Pod, bool) {
if p.isPodHandled(ctx, pod) {
return nil, false
}
return pod, true
})
// nothing to schedule, so just return success
if len(pods) == 0 {
if len(targetPods) == 0 {
return scheduler.Results{}, nil
}
s, err := p.NewScheduler(ctx, pods, nodes.Active())
s, err := p.NewScheduler(ctx, targetPods, nodes.Active())
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
log.FromContext(ctx).Info("no nodepools found")
return scheduler.Results{}, nil
}
return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
results := s.Solve(ctx, targetPods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
if len(results.NewNodeClaims) > 0 {
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *v1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(targetPods, func(p *v1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
}
results.Record(ctx, p.recorder, p.cluster)
return results, nil
}

func (p *Provisioner) isPodHandled(ctx context.Context, pod *v1.Pod) bool {
var events v1.EventList
filter := client.MatchingFields{
"namespace": pod.Namespace,
"involvedObject.kind": "Pod",
"involvedObject.name": pod.Name,
"reason": "HandledByKarpenter",
}
if err := p.kubeClient.List(ctx, &events, filter); err == nil {
for _, event := range events.Items {
// ignore the pod if it was already handled within the last 3 minutes
if time.Now().Before(event.LastTimestamp.Time.Add(3 * time.Minute)) {
log.FromContext(ctx).Info(fmt.Sprintf("pod %s/%s was handled recently, skipping", pod.Namespace, pod.Name))
return true
}
}
} else {
log.FromContext(ctx).Error(err, fmt.Sprintf("failed to get event for %s/%s", pod.Namespace, pod.Name))
}
p.recorder.Publish(scheduler.PodHandledEvent(pod))
return false
}

func (p *Provisioner) Create(ctx context.Context, n *scheduler.NodeClaim, opts ...functional.Option[LaunchOptions]) (string, error) {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodePool", klog.KRef("", n.NodePoolName)))
options := functional.ResolveOptions(opts...)
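The MatchingFields filter used by isPodHandled above only works because the Event fields it queries are indexed in the manager's cache; the matching indexers are registered in pkg/operator/operator.go further down in this commit. A minimal standalone sketch of that pairing, with illustrative names and the index registered before the manager starts:

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// exampleEventIndex is a sketch, not part of this commit.
func exampleEventIndex(ctx context.Context, mgr manager.Manager) error {
	// Register the field index once at startup (mirrors the operator.go change below).
	if err := mgr.GetFieldIndexer().IndexField(ctx, &v1.Event{}, "reason", func(o client.Object) []string {
		return []string{o.(*v1.Event).Reason}
	}); err != nil {
		return err
	}
	// The cached client can then filter Events on that field, as isPodHandled does.
	var events v1.EventList
	return mgr.GetClient().List(ctx, &events, client.MatchingFields{"reason": "HandledByKarpenter"})
}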
11 changes: 11 additions & 0 deletions pkg/controllers/provisioning/scheduling/events.go
@@ -59,3 +59,14 @@ func PodFailedToScheduleEvent(pod *v1.Pod, err error) events.Event {
DedupeTimeout: 5 * time.Minute,
}
}

func PodHandledEvent(pod *v1.Pod) events.Event {
return events.Event{
InvolvedObject: pod,
Type: v1.EventTypeNormal,
Reason: "HandledByKarpenter",
Message: "Pod is handled by karpenter",
DedupeValues: []string{string(pod.UID)},
DedupeTimeout: 5 * time.Minute,
}
}
12 changes: 12 additions & 0 deletions pkg/operator/operator.go
@@ -192,6 +192,18 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.Node{}, "spec.providerID", func(o client.Object) []string {
return []string{o.(*v1.Node).Spec.ProviderID}
}), "failed to setup node provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.Event{}, "involvedObject.kind", func(o client.Object) []string {
return []string{o.(*v1.Event).InvolvedObject.Kind}
}), "failed to setup event kind indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.Event{}, "involvedObject.name", func(o client.Object) []string {
return []string{o.(*v1.Event).InvolvedObject.Name}
}), "failed to setup event name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.Event{}, "namespace", func(o client.Object) []string {
return []string{o.(*v1.Event).Namespace}
}), "failed to setup event namespace indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.Event{}, "reason", func(o client.Object) []string {
return []string{o.(*v1.Event).Reason}
}), "failed to setup event reason indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
