Merge branch 'aws:main' into main
garvinp-stripe authored Nov 17, 2023
2 parents c3fd53c + f6988eb commit 5b3ff94
Showing 9 changed files with 154 additions and 164 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/presubmit.yaml
@@ -6,6 +6,8 @@ on:
workflow_dispatch:
jobs:
presubmit:
permissions:
issues: write
runs-on: ubuntu-latest
strategy:
matrix:
2 changes: 2 additions & 0 deletions .github/workflows/release.yaml
@@ -4,6 +4,8 @@ on:
tags: ['v*.*.*']
jobs:
release:
permissions:
contents: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
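
Both workflow changes follow the same least-privilege pattern: granting the job-scoped GITHUB_TOKEN only what the job needs (issue access for presubmit, contents access for release). A minimal sketch of the pattern, with illustrative workflow and job names:

name: example
on:
  workflow_dispatch:
jobs:
  example-job:
    # Job-level permissions replace the default GITHUB_TOKEN grants;
    # list only what the steps below actually need.
    permissions:
      contents: write
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4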
30 changes: 0 additions & 30 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -46,36 +46,6 @@ spec:
expireAfter: 720h
description: Disruption contains the parameters that relate to Karpenter's disruption logic
properties:
budgets:
default:
- maxUnavailable: 10%
description: Budgets is a list of Budgets. If there are multiple active budgets, Karpenter uses the most restrictive maxUnavailable. If left undefined, this will default to one budget with a maxUnavailable to 10%.
items:
description: Budget defines when Karpenter will restrict the number of Node Claims that can be terminating simultaneously.
properties:
crontab:
description: Crontab specifies when a budget begins being active, using the upstream cronjob syntax. If omitted, the budget is always active. Currently timezones are not supported. This is required if Duration is set.
pattern: ^(@(annually|yearly|monthly|weekly|daily|midnight|hourly))|((.*)\s(.*)\s(.*)\s(.*)\s(.*))$
type: string
duration:
description: Duration determines how long a Budget is active since each Crontab hit. If omitted, the budget is always active. This is required if Crontab is set.
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
type: string
maxUnavailable:
anyOf:
- type: integer
- type: string
default: 10%
description: MaxUnavailable dictates how many NodeClaims owned by this NodePool can be terminating at once. It must be set. This only considers NodeClaims with the karpenter.sh/disruption taint.
x-kubernetes-int-or-string: true
required:
- maxUnavailable
type: object
maxItems: 50
type: array
x-kubernetes-validations:
- message: '''crontab'' must be set with ''duration'''
rule: '!self.all(x, (has(x.crontab) && !has(x.duration)) || (!has(x.crontab) && has(x.duration)))'
consolidateAfter:
description: ConsolidateAfter is the duration the controller will wait before attempting to terminate nodes that are underutilized. Refer to ConsolidationPolicy for how underutilization is considered.
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
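
For context, the stanza deleted above configured disruption budgets on a NodePool. A manifest using it would have looked roughly like this (field values are illustrative, not taken from this repository):

apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
  name: default
spec:
  disruption:
    budgets:
    # At most 10% of this NodePool's NodeClaims may be terminating at once,
    # active for 8h after each crontab hit (9:00 on weekdays).
    - maxUnavailable: "10%"
      crontab: "0 9 * * 1-5"
      duration: 8h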
35 changes: 0 additions & 35 deletions pkg/apis/v1beta1/nodepool.go
@@ -22,7 +22,6 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"knative.dev/pkg/ptr"
)

@@ -79,40 +78,6 @@ type Disruption struct {
// +kubebuilder:validation:Schemaless
// +optional
ExpireAfter NillableDuration `json:"expireAfter"`
// Budgets is a list of Budgets.
// If there are multiple active budgets, Karpenter uses
// the most restrictive maxUnavailable. If left undefined,
// this will default to one budget with a maxUnavailable to 10%.
// +kubebuilder:validation:XValidation:message="'crontab' must be set with 'duration'",rule="!self.all(x, (has(x.crontab) && !has(x.duration)) || (!has(x.crontab) && has(x.duration)))"
// +kubebuilder:default:={{maxUnavailable: "10%"}}
// +kubebuilder:validation:MaxItems=50
// +optional
Budgets []Budget `json:"budgets,omitempty" hash:"ignore"`
}

// Budget defines when Karpenter will restrict the
// number of Node Claims that can be terminating simultaneously.
type Budget struct {
// MaxUnavailable dictates how many NodeClaims owned by this NodePool
// can be terminating at once. It must be set.
// This only considers NodeClaims with the karpenter.sh/disruption taint.
// +kubebuilder:validation:XIntOrString
// +kubebuilder:default:="10%"
MaxUnavailable intstr.IntOrString `json:"maxUnavailable" hash:"ignore"`
// Crontab specifies when a budget begins being active,
// using the upstream cronjob syntax. If omitted, the budget is always active.
// Currently timezones are not supported.
// This is required if Duration is set.
// +kubebuilder:validation:Pattern:=`^(@(annually|yearly|monthly|weekly|daily|midnight|hourly))|((.*)\s(.*)\s(.*)\s(.*)\s(.*))$`
// +optional
Crontab *string `json:"crontab,omitempty" hash:"ignore"`
// Duration determines how long a Budget is active since each Crontab hit.
// If omitted, the budget is always active.
// This is required if Crontab is set.
// +kubebuilder:validation:Pattern=`^(([0-9]+(s|m|h))+)|(Never)$`
// +kubebuilder:validation:Type="string"
// +optional
Duration *metav1.Duration `json:"duration,omitempty" hash:"ignore"`
}

type ConsolidationPolicy string
66 changes: 0 additions & 66 deletions pkg/apis/v1beta1/nodepool_validation_cel_test.go
@@ -26,7 +26,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/ptr"

@@ -101,71 +100,6 @@ var _ = Describe("CEL/Validation", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenUnderutilized
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should fail when creating a budget with an invalid cron", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("*"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("30s"))},
}}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should fail when creating a budget with a negative duration", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("* * * * *"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("-30s"))},
}}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should fail when creating a budget with a cron but no duration", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("* * * * *"),
}}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should fail when creating a budget with a duration but no cron", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("-30s"))},
}}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
It("should succeed when creating a budget with both duration and cron", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("* * * * *"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("30s"))},
}}
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should succeed when creating a budget with neither duration nor cron", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
}}
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should succeed when creating a budget with special cased crons", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("@annually"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("30s"))},
}}
Expect(env.Client.Create(ctx, nodePool)).To(Succeed())
})
It("should fail when creating two budgets where one is invalid", func() {
nodePool.Spec.Disruption.Budgets = []Budget{{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("@annually"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("30s"))},
},
{
MaxUnavailable: intstr.FromInt(10),
Crontab: ptr.String("*"),
Duration: &metav1.Duration{Duration: lo.Must(time.ParseDuration("30s"))},
}}
Expect(env.Client.Create(ctx, nodePool)).ToNot(Succeed())
})
})
Context("KubeletConfiguration", func() {
It("should succeed on kubeReserved with invalid keys", func() {
33 changes: 0 additions & 33 deletions pkg/apis/v1beta1/zz_generated.deepcopy.go

(Generated file; diff not rendered by default.)

6 changes: 6 additions & 0 deletions pkg/controllers/disruption/controller.go
@@ -115,6 +115,12 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, fmt.Errorf("removing taint from nodes, %w", err)
}

// Check if the queue is processing an item. If it is, retry again later.
// TODO this should be removed when disruption budgets are added in.
if !c.queue.IsEmpty() {
return reconcile.Result{RequeueAfter: time.Second}, nil
}

// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
c.recordRun(fmt.Sprintf("%T", m))
6 changes: 6 additions & 0 deletions pkg/controllers/disruption/orchestration/queue.go
@@ -326,3 +326,9 @@ func (q *Queue) Reset() {
q.RateLimitingInterface = &controllertest.Queue{Interface: workqueue.New()}
q.providerIDToCommand = map[string]*Command{}
}

func (q *Queue) IsEmpty() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.providerIDToCommand) == 0
}
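
IsEmpty takes only the read lock, presumably because providerIDToCommand is mutated elsewhere under the write lock. A self-contained sketch of the same RWMutex-guarded-map pattern (type and field names are illustrative, not Karpenter's API):

package main

import (
	"fmt"
	"sync"
)

// queue mirrors the shape above: a map of in-flight commands guarded by an RWMutex.
type queue struct {
	mu                  sync.RWMutex
	providerIDToCommand map[string]struct{}
}

// isEmpty takes the read lock so concurrent readers don't block each other;
// writers (add/remove) would take mu.Lock() instead.
func (q *queue) isEmpty() bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return len(q.providerIDToCommand) == 0
}

func main() {
	q := &queue{providerIDToCommand: map[string]struct{}{}}
	fmt.Println(q.isEmpty()) // true
	q.providerIDToCommand["i-0123456789abcdef0"] = struct{}{}
	fmt.Println(q.isEmpty()) // false
}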
138 changes: 138 additions & 0 deletions pkg/controllers/disruption/suite_test.go
@@ -132,6 +132,144 @@ var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
})

// TODO remove this when Budgets are added in
var _ = Describe("Queue Limits", func() {
var nodePool *v1beta1.NodePool
var nodeClaim, nodeClaim2 *v1beta1.NodeClaim
var node, node2 *v1.Node
BeforeEach(func() {
currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
Name: "current-on-demand",
Offerings: []cloudprovider.Offering{
{
CapacityType: v1beta1.CapacityTypeOnDemand,
Zone: "test-zone-1a",
Price: 1.5,
Available: false,
},
},
})
replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
Name: "spot-replacement",
Offerings: []cloudprovider.Offering{
{
CapacityType: v1beta1.CapacityTypeSpot,
Zone: "test-zone-1a",
Price: 1.0,
Available: true,
},
{
CapacityType: v1beta1.CapacityTypeSpot,
Zone: "test-zone-1b",
Price: 0.2,
Available: true,
},
{
CapacityType: v1beta1.CapacityTypeSpot,
Zone: "test-zone-1c",
Price: 0.4,
Available: true,
},
},
})
nodePool = test.NodePool()
nodeClaim, node = test.NodeClaimAndNode(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.LabelInstanceTypeStable: currentInstance.Name,
v1beta1.CapacityTypeLabelKey: currentInstance.Offerings[0].CapacityType,
v1.LabelTopologyZone: currentInstance.Offerings[0].Zone,
v1beta1.NodePoolLabelKey: nodePool.Name,
},
},
Status: v1beta1.NodeClaimStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("3"),
v1.ResourcePods: resource.MustParse("100"),
},
},
})
nodeClaim2, node2 = test.NodeClaimAndNode(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.LabelInstanceTypeStable: currentInstance.Name,
v1beta1.CapacityTypeLabelKey: currentInstance.Offerings[0].CapacityType,
v1.LabelTopologyZone: currentInstance.Offerings[0].Zone,
v1beta1.NodePoolLabelKey: nodePool.Name,
},
},
Status: v1beta1.NodeClaimStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("3"),
v1.ResourcePods: resource.MustParse("100"),
},
},
})
cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{
currentInstance,
replacementInstance,
}
// Mark the nodes as drifted so they'll both be candidates
nodeClaim.StatusConditions().MarkTrue(v1beta1.Drifted)
nodeClaim2.StatusConditions().MarkTrue(v1beta1.Drifted)
})
It("should be able to disrupt two nodes with replace, but only ever be disrupting one at a time", func() {
labels := map[string]string{
"app": "test",
}
// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pods := test.Pods(2, test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2"),
},
},
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: ptr.Bool(true),
BlockOwnerDeletion: ptr.Bool(true),
},
}}})

ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], nodeClaim, nodeClaim2, node, node2, nodePool)

// bind the pods to the nodes so that they're both non-empty
ExpectManualBinding(ctx, env.Client, pods[0], node)
ExpectManualBinding(ctx, env.Client, pods[1], node2)

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node, node2}, []*v1beta1.NodeClaim{nodeClaim, nodeClaim2})

// Do one reconcile to add one node to the queue
ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})

Expect(queue.Len()).To(BeNumerically("==", 1))
// Process the item but still expect it to be in the queue, since its replacements aren't created
ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
ExpectNodeExists(ctx, env.Client, node.Name)
ExpectNodeExists(ctx, env.Client, node2.Name)
Expect(queue.Len()).To(BeNumerically("==", 1))

// Do another reconcile to try to add another node to the queue.
ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
ExpectNodeExists(ctx, env.Client, node.Name)
ExpectNodeExists(ctx, env.Client, node2.Name)
// Expect that the queue length has not increased
Expect(queue.Len()).To(BeNumerically("==", 1))
})
})

var _ = Describe("Disruption Taints", func() {
var nodePool *v1beta1.NodePool
var nodeClaim *v1beta1.NodeClaim
