
Commit

support scale to zero
skonto committed Sep 18, 2024
1 parent c2fa22d commit 0ec66a7
Showing 5 changed files with 339 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/reconciler/autoscaling/hpa/keda_hpa.go
@@ -145,7 +145,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1.
// as initialized until the current replicas are >= the min-scale value.
if !pa.Status.IsScaleTargetInitialized() {
ms := activeThreshold(ctx, pa)
if hpa.Status.CurrentReplicas >= int32(ms) {
if hpa.Status.CurrentReplicas >= int32(ms) || hpa.Status.CurrentReplicas == 0 {
pa.Status.MarkScaleTargetInitialized()
}
}
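The hunk above is what lets a min-scale 0 revision become Ready: when the HPA reports zero current replicas, the scale target is now marked initialized as well, instead of waiting for the replica count to reach the activation threshold. A minimal restatement of the rule (a hypothetical helper for illustration, not the reconciler's actual code):

// scaleTargetInitialized restates the condition above: ms is the activation
// threshold from activeThreshold, current is hpa.Status.CurrentReplicas.
func scaleTargetInitialized(current, ms int32) bool {
    // A scale-to-zero revision may legitimately sit at zero replicas, so zero
    // also counts as initialized; otherwise we wait until the threshold is met.
    return current >= ms || current == 0
}
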
2 changes: 0 additions & 2 deletions pkg/reconciler/autoscaling/hpa/resources/keda.go
@@ -86,8 +86,6 @@ func DesiredScaledObject(ctx context.Context, pa *autoscalingv1alpha1.PodAutosca

if min > 0 {
sO.Spec.MinReplicaCount = ptr.Int32(min)
} else {
sO.Spec.MinReplicaCount = ptr.Int32(1)
}

if target, ok := pa.Target(); ok {
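With the else branch gone, MinReplicaCount is only pinned when min-scale is positive; for min-scale 0 the field stays nil, so KEDA falls back to its default minReplicaCount of 0 and the ScaledObject may scale the workload to zero. A short sketch of the resulting behaviour, assuming the usual KEDA v1alpha1 types and knative.dev/pkg/ptr:

import (
    kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
    "knative.dev/pkg/ptr"
)

// applyMinReplicas mirrors the changed logic: only set MinReplicaCount when the
// revision asks for at least one replica. Leaving it nil lets KEDA apply its
// default of 0, which is what enables scale to zero.
func applyMinReplicas(so *kedav1alpha1.ScaledObject, min int32) {
    if min > 0 {
        so.Spec.MinReplicaCount = ptr.Int32(min)
    }
}
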
243 changes: 228 additions & 15 deletions test/e2e/autoscale_custom_test.go
@@ -34,9 +34,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

autoscalingv2 "k8s.io/api/autoscaling/v2"
resources2 "knative.dev/autoscaler-keda/pkg/reconciler/autoscaling/hpa/resources"
"knative.dev/pkg/kmap"
pkgTest "knative.dev/pkg/test"
"knative.dev/pkg/test/helpers"
"knative.dev/pkg/test/spoof"
"knative.dev/serving/pkg/apis/autoscaling"
v1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -56,34 +58,133 @@ const (
)

func TestUpDownCustomMetric(t *testing.T) {
ctx := setupCustomHPASvc(t, "http_requests_total", 5)
metric := "http_requests_total"
target := 5
configAnnotations := map[string]string{
autoscaling.ClassAnnotationKey: autoscaling.HPA,
autoscaling.MetricAnnotationKey: metric,
autoscaling.TargetAnnotationKey: strconv.Itoa(target),
autoscaling.MinScaleAnnotationKey: "1",
autoscaling.MaxScaleAnnotationKey: fmt.Sprintf("%d", int(maxPods)),
autoscaling.WindowAnnotationKey: "20s",
resources2.KedaAutoscaleAnnotationPrometheusQuery: fmt.Sprintf("sum(rate(http_requests_total{namespace='%s'}[1m]))", test.ServingFlags.TestNamespace),
}
ctx := setupCustomHPASvc(t, metric, target, configAnnotations, "")
test.EnsureTearDown(t, ctx.Clients(), ctx.Names())
assertCustomHPAAutoscaleUpToNumPods(ctx, targetPods, time.After(scaleUpTimeout), true /* quick */)
assertScaleDownToOne(ctx)
assertScaleDownToN(ctx, 1)
assertCustomHPAAutoscaleUpToNumPods(ctx, targetPods, time.After(scaleUpTimeout), true /* quick */)
}

func setupCustomHPASvc(t *testing.T, metric string, target int) *TestContext {
func TestScaleToZero(t *testing.T) {
metric := "raw_scale"
target := 5
configAnnotations := map[string]string{
autoscaling.ClassAnnotationKey: autoscaling.HPA,
autoscaling.MetricAnnotationKey: metric,
autoscaling.TargetAnnotationKey: strconv.Itoa(target),
autoscaling.MinScaleAnnotationKey: "0",
autoscaling.MaxScaleAnnotationKey: fmt.Sprintf("%d", int(maxPods)),
autoscaling.WindowAnnotationKey: "20s",
resources2.KedaAutoscaleAnnotationMetricType: string(autoscalingv2.ValueMetricType),
resources2.KedaAutoscaleAnnotationPrometheusQuery: fmt.Sprintf("sum by (service) (%s{namespace='%s'})", metric, test.ServingFlags.TestNamespace),
}

// Create a ksvc whose metric values will drive another ksvc's scale up/down
configAnnotationsScale := map[string]string{
autoscaling.MinScaleAnnotationKey: "1",
autoscaling.MaxScaleAnnotationKey: "1",
}
scaleSvcName := helpers.MakeK8sNamePrefix(strings.TrimPrefix(t.Name(), "scale"))
ctxScale := setupSvc(t, metric, target, configAnnotationsScale, scaleSvcName)
test.EnsureTearDown(t, ctxScale.Clients(), ctxScale.names)

// Create a ksvc that will be scaled up/down based on a metric value set by another, mimicking external metrics
ctx := setupCustomHPASvcFromZero(t, metric, target, configAnnotations, "")
test.EnsureTearDown(t, ctx.Clients(), ctx.Names())

// Set the scale metric to 20, which should create 20/target=4 pods
ctxScale.names.URL.RawQuery = "scale=20"
t.Logf("URL: %v", ctxScale.names.URL)
if _, err := pkgTest.CheckEndpointState(
context.Background(),
ctxScale.clients.KubeClient,
t.Logf,
ctxScale.names.URL,
spoof.MatchesAllOf(spoof.MatchesBody("Scaling to 20")),
"CheckingEndpointScaleText",
test.ServingFlags.ResolvableDomain,
test.AddRootCAtoTransport(context.Background(), t.Logf, ctxScale.clients, test.ServingFlags.HTTPS),
); err != nil {
t.Fatalf("Error probing %s: %v", ctxScale.names.URL.Hostname(), err)
}

// Wait until the HPA status is available, as it takes some time for the HPA to start collecting metrics.
if err := waitForHPAState(t, ctx.resources.Revision.Name, ctx.resources.Revision.Namespace, ctx.clients); err != nil {
t.Fatalf("Error collecting metrics by HPA: %v", err)
}

assertAutoscaleUpToNumPods(ctx, targetPods*2, time.After(scaleUpTimeout), true /* quick */)

// Set scale metric to zero
ctxScale.names.URL.RawQuery = "scale=0"
if _, err := pkgTest.CheckEndpointState(
context.Background(),
ctxScale.clients.KubeClient,
t.Logf,
ctxScale.names.URL,
spoof.MatchesAllOf(spoof.MatchesBody("Scaling to 0")),
"CheckingEndpointScaleText",
test.ServingFlags.ResolvableDomain,
test.AddRootCAtoTransport(context.Background(), t.Logf, ctxScale.clients, test.ServingFlags.HTTPS),
); err != nil {
t.Fatalf("Error probing %s: %v", ctxScale.names.URL.Hostname(), err)
}

assertScaleDownToN(ctx, 0)

// Set scale metric to 20 again, which should create 20/target=4 pods
ctxScale.names.URL.RawQuery = "scale=20"
if _, err := pkgTest.CheckEndpointState(
context.Background(),
ctxScale.clients.KubeClient,
t.Logf,
ctxScale.names.URL,
spoof.MatchesAllOf(spoof.MatchesBody("Scaling to 20")),
"CheckingEndpointScaleText",
test.ServingFlags.ResolvableDomain,
test.AddRootCAtoTransport(context.Background(), t.Logf, ctxScale.clients, test.ServingFlags.HTTPS),
); err != nil {
t.Fatalf("Error probing %s: %v", ctxScale.names.URL.Hostname(), err)
}

// Wait until the HPA status is available again, as it takes some time for the HPA to start collecting metrics after a scale to zero.
// KEDA deactivates the HPA when the metric is zero, so we need to wait for it to become active again.
if err := waitForHPAState(t, ctx.resources.Revision.Name, ctx.resources.Revision.Namespace, ctx.clients); err != nil {
t.Fatalf("Error collecting metrics by HPA: %v", err)
}
assertAutoscaleUpToNumPods(ctx, targetPods*2, time.After(scaleUpTimeout), true /* quick */)
}

func setupCustomHPASvc(t *testing.T, metric string, target int, annos map[string]string, svcName string) *TestContext {
t.Helper()
clients := test2e.Setup(t)
var svc string
if svcName != "" {
svc = svcName
} else {
svc = test.ObjectNameForTest(t)
}

t.Log("Creating a new Route and Configuration")
names := &test.ResourceNames{
Service: test.ObjectNameForTest(t),
Service: svc,
Image: autoscaleTestImageName,
}
resources, err := v1test.CreateServiceReady(t, clients, names,
[]rtesting.ServiceOption{
withConfigLabels(map[string]string{"metrics-test": "metrics-test"}),
rtesting.WithConfigAnnotations(map[string]string{
autoscaling.ClassAnnotationKey: autoscaling.HPA,
autoscaling.MetricAnnotationKey: metric,
autoscaling.TargetAnnotationKey: strconv.Itoa(target),
autoscaling.MinScaleAnnotationKey: "1",
autoscaling.MaxScaleAnnotationKey: fmt.Sprintf("%d", int(maxPods)),
autoscaling.WindowAnnotationKey: "20s",
resources2.KedaAutoscaleAnnotationPrometheusQuery: fmt.Sprintf("sum(rate(http_requests_total{namespace='%s'}[1m]))", test.ServingFlags.TestNamespace),
}), rtesting.WithResourceRequirements(corev1.ResourceRequirements{
rtesting.WithConfigAnnotations(annos), rtesting.WithResourceRequirements(corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("30m"),
corev1.ResourceMemory: resource.MustParse("20Mi"),
@@ -127,6 +228,107 @@ func setupCustomHPASvc(t *testing.T, metric string, target int) *TestContext {
}
}

func setupCustomHPASvcFromZero(t *testing.T, metric string, target int, annos map[string]string, svcName string) *TestContext {
t.Helper()
clients := test2e.Setup(t)
var svc string
if svcName != "" {
svc = svcName
} else {
svc = test.ObjectNameForTest(t)
}

t.Log("Creating a new Route and Configuration")
names := &test.ResourceNames{
Service: svc,
Image: autoscaleTestImageName,
}
resources, err := CreateServiceReady(t, clients, names,
[]rtesting.ServiceOption{
withConfigLabels(map[string]string{"metrics-test": "metrics-test"}),
rtesting.WithConfigAnnotations(annos), rtesting.WithResourceRequirements(corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("30m"),
corev1.ResourceMemory: resource.MustParse("20Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
},
}),
}...)
if err != nil {
t.Fatalf("Failed to create initial Service: %v: %v", names.Service, err)
}

return &TestContext{
t: t,
clients: clients,
names: names,
resources: resources,
autoscaler: &AutoscalerOptions{
Metric: metric,
Target: target,
},
}
}

func setupSvc(t *testing.T, metric string, target int, annos map[string]string, svcName string) *TestContext {
t.Helper()
clients := test2e.Setup(t)
var svc string
if svcName != "" {
svc = svcName
} else {
svc = test.ObjectNameForTest(t)
}

t.Log("Creating a new Route and Configuration")
names := &test.ResourceNames{
Service: svc,
Image: autoscaleTestImageName,
}
resources, err := v1test.CreateServiceReady(t, clients, names,
[]rtesting.ServiceOption{
withConfigLabels(map[string]string{"metrics-test": "metrics-test"}),
rtesting.WithConfigAnnotations(annos), rtesting.WithResourceRequirements(corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("30m"),
corev1.ResourceMemory: resource.MustParse("20Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
},
}),
}...)
if err != nil {
t.Fatalf("Failed to create initial Service: %v: %v", names.Service, err)
}

if _, err := pkgTest.CheckEndpointState(
context.Background(),
clients.KubeClient,
t.Logf,
names.URL,
spoof.MatchesAllOf(spoof.IsStatusOK),
"CheckingEndpointAfterCreate",
test.ServingFlags.ResolvableDomain,
test.AddRootCAtoTransport(context.Background(), t.Logf, clients, test.ServingFlags.HTTPS),
); err != nil {
t.Fatalf("Error probing %s: %v", names.URL.Hostname(), err)
}

return &TestContext{
t: t,
clients: clients,
names: names,
resources: resources,
autoscaler: &AutoscalerOptions{
Metric: metric,
Target: target,
},
}
}

func assertCustomHPAAutoscaleUpToNumPods(ctx *TestContext, targetPods float64, done <-chan time.Time, quick bool) {
ctx.t.Helper()

@@ -146,7 +348,18 @@ func assertCustomHPAAutoscaleUpToNumPods(ctx *TestContext, targetPods float64, d
}
}

func assertScaleDownToOne(ctx *TestContext) {
func assertAutoscaleUpToNumPods(ctx *TestContext, targetPods float64, done <-chan time.Time, quick bool) {
ctx.t.Helper()
var grp errgroup.Group
grp.Go(func() error {
return checkPodScale(ctx, targetPods, minPods, maxPods, done, quick)
})
if err := grp.Wait(); err != nil {
ctx.t.Fatal(err)
}
}

func assertScaleDownToN(ctx *TestContext, n int) {
deploymentName := resourcenames.Deployment(ctx.resources.Revision)
if err := waitForScaleToOne(ctx.t, deploymentName, ctx.clients); err != nil {
ctx.t.Fatalf("Unable to observe the Deployment named %s scaling down: %v", deploymentName, err)
@@ -157,7 +370,7 @@ func assertScaleDownToN(ctx *TestContext, n int) {
context.Background(),
ctx.clients.KubeClient,
func(p *corev1.PodList) (bool, error) {
if !(len(getDepPods(p.Items, deploymentName)) == 1) {
if !(len(getDepPods(p.Items, deploymentName)) == n) {
return false, nil
}
return true, nil
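TestScaleToZero calls waitForHPAState before each scale-up assertion because the HPA only reports meaningful status once KEDA has (re)activated it and metrics are flowing; after a scale to zero, KEDA keeps the HPA deactivated until the metric is non-zero again. That helper is not part of this diff; the following is only a sketch of what such a wait could look like, assuming it polls the HPA's ScalingActive condition (the repository's actual helper may differ):

import (
    "context"
    "time"

    autoscalingv2 "k8s.io/api/autoscaling/v2"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
)

// waitForHPAActive polls the HPA until its ScalingActive condition is True,
// i.e. metrics are being served and KEDA has (re)activated the HPA.
// This is a sketch under the assumptions above, not the repo's waitForHPAState.
func waitForHPAActive(ctx context.Context, kube kubernetes.Interface, name, ns string) error {
    return wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true,
        func(ctx context.Context) (bool, error) {
            hpa, err := kube.AutoscalingV2().HorizontalPodAutoscalers(ns).Get(ctx, name, metav1.GetOptions{})
            if err != nil {
                return false, err
            }
            for _, c := range hpa.Status.Conditions {
                if c.Type == autoscalingv2.ScalingActive && c.Status == corev1.ConditionTrue {
                    return true, nil
                }
            }
            return false, nil
        })
}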
