Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support validate policy of opsrequest #8232

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,16 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com
if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil {
return err
}
if len(scaleOut.OfflineInstancesToOnline) > 0 {
offlineInstanceSet := sets.New(compSpec.OfflineInstances...)
for _, offlineInsName := range scaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstanceSet[offlineInsName]; !ok {
return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName)
}
}
// instance cannot be both in OfflineInstancesToOnline and OnlineInstancesToOffline
if scaleIn != nil && scaleOut != nil {
offlineToOnlineSet := make(map[string]struct{})
for _, instance := range scaleIn.OnlineInstancesToOffline {
offlineToOnlineSet[instance] = struct{}{}
}
for _, instance := range scaleOut.OfflineInstancesToOnline {
if _, exists := offlineToOnlineSet[instance]; exists {
return fmt.Errorf(`instance "%s" cannot be both in "OfflineInstancesToOnline" and "OnlineInstancesToOffline"`, instance)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,9 @@ const (
const InvalidContainerPort int32 = 0

const EmptyInsTemplateName = ""

const (
HscaleValidatePolicyKey = "apps.kubeblocks.io/hscale-validate-policy"
HscaleValidatePolicyStrict = "strict"
HscaleValidatePolicyIgnore = "ignore"
)
201 changes: 170 additions & 31 deletions pkg/operations/horizontal_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import (
"fmt"
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"slices"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -97,18 +99,9 @@
if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1.ClusterComponentSpec, obj ComponentOpsInterface) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()]
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
// check if the instances are online.
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
for _, onlineIns := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := currPodSet[onlineIns]; !ok {
return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, onlineIns))
}
}

if err := hs.validateHorizontalScalingWithPolicy(opsRes, lastCompConfiguration, obj); err != nil {
return err
}
replicas, instances, offlineInstances, err := hs.getExpectedCompValues(opsRes, compSpec.DeepCopy(),
lastCompConfiguration, horizontalScaling)
Expand Down Expand Up @@ -205,6 +198,16 @@
deletePodSet[k] = appsv1.GetInstanceTemplateName(clusterName, fullCompName, k)
}
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
for _, v := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
deletePodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
for _, v := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
createPodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if opsRes.OpsRequest.Status.Phase == opsv1alpha1.OpsCancellingPhase {
// when cancelling this opsRequest, revert the changes.
return deletePodSet, createPodSet, nil
Expand Down Expand Up @@ -294,16 +297,56 @@
compReplicas := *lastCompConfiguration.Replicas
compInstanceTpls := slices.Clone(lastCompConfiguration.Instances)
compOfflineInstances := lastCompConfiguration.OfflineInstances
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, horizontalScaling)
err := hs.autoSyncReplicaChanges(opsRes, horizontalScaling, compReplicas, compInstanceTpls, expectOfflineInstances)
filteredHorizontal, err := filterHorizontalScalingSpec(opsRes, compReplicas, compInstanceTpls, compOfflineInstances, horizontalScaling.DeepCopy())
if err != nil {
return 0, nil, nil, err
}
return hs.getCompExpectReplicas(horizontalScaling, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, horizontalScaling),
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, *filteredHorizontal)
err = hs.autoSyncReplicaChanges(opsRes, *filteredHorizontal, compReplicas, compInstanceTpls, expectOfflineInstances)
if err != nil {
return 0, nil, nil, err
}
return hs.getCompExpectReplicas(*filteredHorizontal, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, *filteredHorizontal),
expectOfflineInstances, nil
}

// only offlined instances could be taken online.
// and only onlined instances could be taken offline.
func filterHorizontalScalingSpec(
opsRes *OpsResource,
compReplicas int32,
compInstanceTpls []appsv1.InstanceTemplate,
compOfflineInstances []string,
horizontalScaling *opsv1alpha1.HorizontalScaling) (*opsv1alpha1.HorizontalScaling, error) {
offlineInstances := sets.New(compOfflineInstances...)
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return nil, err
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
onlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := podSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleIn.OnlineInstancesToOffline = onlinedInstanceFromOps.UnsortedList()
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
offlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleOut.OfflineInstancesToOnline = offlinedInstanceFromOps.UnsortedList()
}
return horizontalScaling, nil

}

// autoSyncReplicaChanges auto-sync the replicaChanges of the component and instance templates.
func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
opsRes *OpsResource,
Expand Down Expand Up @@ -339,6 +382,7 @@
}
return replicaChanger.Instances, &allReplicaChanges
}

// auto sync the replicaChanges.
scaleIn := horizontalScaling.ScaleIn
if scaleIn != nil {
Expand All @@ -347,21 +391,7 @@
}
scaleOut := horizontalScaling.ScaleOut
if scaleOut != nil {
// get the pod set when removing the specified instances from offlineInstances slice
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compExpectOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return err
}
onlineInsCountMap := map[string]int32{}
for _, insName := range scaleOut.OfflineInstancesToOnline {
if _, ok := podSet[insName]; !ok {
// if the specified instance will not be created, continue
continue
}
insTplName := appsv1.GetInstanceTemplateName(opsRes.Cluster.Name, horizontalScaling.ComponentName, insName)
onlineInsCountMap[insTplName]++
}
onlineInsCountMap := opsRes.OpsRequest.CountOfflineOrOnlineInstances(opsRes.Cluster.Name, horizontalScaling.ComponentName, scaleOut.OfflineInstancesToOnline)
scaleOut.Instances, scaleOut.ReplicaChanges = getSyncedInstancesAndReplicaChanges(onlineInsCountMap, scaleOut.ReplicaChanger, scaleOut.NewInstances)
}
return nil
Expand Down Expand Up @@ -433,3 +463,112 @@
}
return compOfflineInstances
}

// validate if there is any instance specified in the request that is not exist, return error.
// if HscaleValidatePolicy is StrictScalePolicy or empty, it would validate the instances if they are already offlined or onlined.
func (hs horizontalScalingOpsHandler) validateHorizontalScalingWithPolicy(
opsRes *OpsResource,
lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
obj ComponentOpsInterface,
) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
offlineInstances := sets.New(lastCompConfiguration.OfflineInstances...)
onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps, notExistInstanceFromOps := hs.collecteAllTypeOfInstancesFromOps(horizontalScaling, currPodSet, offlineInstances)
if notExistInstanceFromOps.Len() > 0 {
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(notExistInstanceFromOps.UnsortedList(), ", ")))
}

if policy, exist := opsRes.OpsRequest.Annotations[constant.HscaleValidatePolicyKey]; exist && policy != constant.HscaleValidatePolicyStrict {
return nil
}

if err := hs.strictPolicyValidation(horizontalScaling, onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps); err != nil {
return err
}

return nil
}

// collecteAllTypeOfInstancesFromOps collects the online and offline instances specified in the request.
func (hs horizontalScalingOpsHandler) collecteAllTypeOfInstancesFromOps(
horizontalScaling opsv1alpha1.HorizontalScaling,
currPodSet map[string]string,
offlineInstances sets.Set[string]) (onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps, notExistInstanceFromOps sets.Set[string]) {
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
notExistInstanceFromScaleIn := sets.Set[string]{}

Check failure on line 503 in pkg/operations/horizontal_scaling.go

View workflow job for this annotation

GitHub Actions / pr-pre-check (lint)

ineffectual assignment to notExistInstanceFromScaleIn (ineffassign)
onlinedInstanceFromScaleInOps, _, notExistInstanceFromScaleIn = hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleIn.OnlineInstancesToOffline,
offlineInstances,
currPodSet)
if notExistInstanceFromScaleIn.Len() > 0 {
notExistInstanceFromOps = notExistInstanceFromOps.Union(notExistInstanceFromScaleIn)
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
notExistInstanceFromScaleOut := sets.Set[string]{}

Check failure on line 513 in pkg/operations/horizontal_scaling.go

View workflow job for this annotation

GitHub Actions / pr-pre-check (lint)

ineffectual assignment to notExistInstanceFromScaleOut (ineffassign)
_, offlinedInstanceFromScaleOutOps, notExistInstanceFromScaleOut = hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleOut.OfflineInstancesToOnline,
offlineInstances,
currPodSet)
if notExistInstanceFromScaleOut.Len() > 0 {
notExistInstanceFromOps = notExistInstanceFromOps.Union(notExistInstanceFromScaleOut)
}
}
return
}

// collect the online and offline instances specified in the request.
func (hs horizontalScalingOpsHandler) collectOnlineAndOfflineAndNotExistInstances(
instance []string,
offlineInstances sets.Set[string],
currPodSet map[string]string) (sets.Set[string], sets.Set[string], sets.Set[string]) {

offlinedInstanceFromOps := sets.Set[string]{}
onlinedInstanceFromOps := sets.Set[string]{}
notExistInstanceFromOps := sets.Set[string]{}
for _, insName := range instance {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
continue
}
if _, ok := currPodSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
continue
}
notExistInstanceFromOps.Insert(insName)
}
return onlinedInstanceFromOps, offlinedInstanceFromOps, notExistInstanceFromOps
}

// check when setting strict validate policy
// if the instances specified in the request are not offline, return error.
// if the instances duplicate in the request, return error.
func (hs horizontalScalingOpsHandler) strictPolicyValidation(
horizontalScaling opsv1alpha1.HorizontalScaling,
onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps sets.Set[string]) error {

if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
if onlinedInstanceFromScaleInOps.Len() != len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) {
unscalablePods := getMissingElementsInSetFromList(onlinedInstanceFromScaleInOps, horizontalScaling.ScaleIn.OnlineInstancesToOffline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in onlineInstancesToOffline is not online or not exist`, strings.Join(unscalablePods, ", ")))
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
if offlinedInstanceFromScaleOutOps.Len() != len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) {
unscalablePods := getMissingElementsInSetFromList(offlinedInstanceFromScaleOutOps, horizontalScaling.ScaleOut.OfflineInstancesToOnline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in offlineInstancesToOnline is not offline or not exist`, strings.Join(unscalablePods, ", ")))
}
}
return nil
}
Loading
Loading