Skip to content

Commit

Permalink
feat: support validate policy of opsrequest
Browse files Browse the repository at this point in the history
  • Loading branch information
xujiahao authored and ian-hui committed Sep 29, 2024
1 parent edf8598 commit 9f847d9
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 68 deletions.
16 changes: 10 additions & 6 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,16 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com
if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil {
return err
}
if len(scaleOut.OfflineInstancesToOnline) > 0 {
offlineInstanceSet := sets.New(compSpec.OfflineInstances...)
for _, offlineInsName := range scaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstanceSet[offlineInsName]; !ok {
return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName)
}
}
// instance cannot be both in OfflineInstancesToOnline and OnlineInstancesToOffline
if scaleIn != nil && scaleOut != nil {
offlineToOnlineSet := make(map[string]struct{})
for _, instance := range scaleIn.OnlineInstancesToOffline {
offlineToOnlineSet[instance] = struct{}{}
}
for _, instance := range scaleOut.OfflineInstancesToOnline {
if _, exists := offlineToOnlineSet[instance]; exists {
return fmt.Errorf(`instance "%s" cannot be both in "OfflineInstancesToOnline" and "OnlineInstancesToOffline"`, instance)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,9 @@ const (
const InvalidContainerPort int32 = 0

const EmptyInsTemplateName = ""

const (
HscaleValidatePolicyKey = "apps.kubeblocks.io/hscale-validate-policy"
HscaleValidatePolicyStrict = "strict"
HscaleValidatePolicyIgnore = "ignore"
)
201 changes: 170 additions & 31 deletions pkg/operations/horizontal_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ package operations

import (
"fmt"
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"slices"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -97,18 +99,9 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli
if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1.ClusterComponentSpec, obj ComponentOpsInterface) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()]
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
// check if the instances are online.
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
for _, onlineIns := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := currPodSet[onlineIns]; !ok {
return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, onlineIns))
}
}

if err := hs.validateHorizontalScalingWithPolicy(opsRes, lastCompConfiguration, obj); err != nil {
return err
}
replicas, instances, offlineInstances, err := hs.getExpectedCompValues(opsRes, compSpec.DeepCopy(),
lastCompConfiguration, horizontalScaling)
Expand Down Expand Up @@ -205,6 +198,16 @@ func (hs horizontalScalingOpsHandler) getCreateAndDeletePodSet(opsRes *OpsResour
deletePodSet[k] = appsv1.GetInstanceTemplateName(clusterName, fullCompName, k)
}
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
for _, v := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
deletePodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
for _, v := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
createPodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v)
}
}
if opsRes.OpsRequest.Status.Phase == opsv1alpha1.OpsCancellingPhase {
// when cancelling this opsRequest, revert the changes.
return deletePodSet, createPodSet, nil
Expand Down Expand Up @@ -294,16 +297,56 @@ func (hs horizontalScalingOpsHandler) getExpectedCompValues(
compReplicas := *lastCompConfiguration.Replicas
compInstanceTpls := slices.Clone(lastCompConfiguration.Instances)
compOfflineInstances := lastCompConfiguration.OfflineInstances
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, horizontalScaling)
err := hs.autoSyncReplicaChanges(opsRes, horizontalScaling, compReplicas, compInstanceTpls, expectOfflineInstances)
filteredHorizontal, err := filterHorizontalScalingSpec(opsRes, compReplicas, compInstanceTpls, compOfflineInstances, horizontalScaling.DeepCopy())
if err != nil {
return 0, nil, nil, err
}
return hs.getCompExpectReplicas(horizontalScaling, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, horizontalScaling),
expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, *filteredHorizontal)
err = hs.autoSyncReplicaChanges(opsRes, *filteredHorizontal, compReplicas, compInstanceTpls, expectOfflineInstances)
if err != nil {
return 0, nil, nil, err
}
return hs.getCompExpectReplicas(*filteredHorizontal, compReplicas),
hs.getCompExpectedInstances(compInstanceTpls, *filteredHorizontal),
expectOfflineInstances, nil
}

// only offlined instances could be taken online.
// and only onlined instances could be taken offline.
func filterHorizontalScalingSpec(
opsRes *OpsResource,
compReplicas int32,
compInstanceTpls []appsv1.InstanceTemplate,
compOfflineInstances []string,
horizontalScaling *opsv1alpha1.HorizontalScaling) (*opsv1alpha1.HorizontalScaling, error) {
offlineInstances := sets.New(compOfflineInstances...)
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return nil, err
}
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
onlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleIn.OnlineInstancesToOffline {
if _, ok := podSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleIn.OnlineInstancesToOffline = onlinedInstanceFromOps.UnsortedList()
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
offlinedInstanceFromOps := sets.Set[string]{}
for _, insName := range horizontalScaling.ScaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
}
}
horizontalScaling.ScaleOut.OfflineInstancesToOnline = offlinedInstanceFromOps.UnsortedList()
}
return horizontalScaling, nil

}

// autoSyncReplicaChanges auto-sync the replicaChanges of the component and instance templates.
func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
opsRes *OpsResource,
Expand Down Expand Up @@ -339,6 +382,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
}
return replicaChanger.Instances, &allReplicaChanges
}

// auto sync the replicaChanges.
scaleIn := horizontalScaling.ScaleIn
if scaleIn != nil {
Expand All @@ -347,21 +391,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges(
}
scaleOut := horizontalScaling.ScaleOut
if scaleOut != nil {
// get the pod set when removing the specified instances from offlineInstances slice
podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compExpectOfflineInstances,
opsRes.Cluster.Name, horizontalScaling.ComponentName)
if err != nil {
return err
}
onlineInsCountMap := map[string]int32{}
for _, insName := range scaleOut.OfflineInstancesToOnline {
if _, ok := podSet[insName]; !ok {
// if the specified instance will not be created, continue
continue
}
insTplName := appsv1.GetInstanceTemplateName(opsRes.Cluster.Name, horizontalScaling.ComponentName, insName)
onlineInsCountMap[insTplName]++
}
onlineInsCountMap := opsRes.OpsRequest.CountOfflineOrOnlineInstances(opsRes.Cluster.Name, horizontalScaling.ComponentName, scaleOut.OfflineInstancesToOnline)
scaleOut.Instances, scaleOut.ReplicaChanges = getSyncedInstancesAndReplicaChanges(onlineInsCountMap, scaleOut.ReplicaChanger, scaleOut.NewInstances)
}
return nil
Expand Down Expand Up @@ -433,3 +463,112 @@ func (hs horizontalScalingOpsHandler) getCompExpectedOfflineInstances(
}
return compOfflineInstances
}

// validate if there is any instance specified in the request that is not exist, return error.
// if HscaleValidatePolicy is StrictScalePolicy or empty, it would validate the instances if they are already offlined or onlined.
func (hs horizontalScalingOpsHandler) validateHorizontalScalingWithPolicy(
opsRes *OpsResource,
lastCompConfiguration opsv1alpha1.LastComponentConfiguration,
obj ComponentOpsInterface,
) error {
horizontalScaling := obj.(opsv1alpha1.HorizontalScaling)
currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances,
opsRes.Cluster.Name, obj.GetComponentName())
if err != nil {
return err
}
offlineInstances := sets.New(lastCompConfiguration.OfflineInstances...)
onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps, notExistInstanceFromOps := hs.collecteAllTypeOfInstancesFromOps(horizontalScaling, currPodSet, offlineInstances)
if notExistInstanceFromOps.Len() > 0 {
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(notExistInstanceFromOps.UnsortedList(), ", ")))
}

if policy, exist := opsRes.OpsRequest.Annotations[constant.HscaleValidatePolicyKey]; exist && policy != constant.HscaleValidatePolicyStrict {
return nil
}

if err := hs.strictPolicyValidation(horizontalScaling, onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps); err != nil {
return err
}

return nil
}

// collecteAllTypeOfInstancesFromOps collects the online and offline instances specified in the request.
func (hs horizontalScalingOpsHandler) collecteAllTypeOfInstancesFromOps(
horizontalScaling opsv1alpha1.HorizontalScaling,
currPodSet map[string]string,
offlineInstances sets.Set[string]) (onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps, notExistInstanceFromOps sets.Set[string]) {
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
notExistInstanceFromScaleIn := sets.Set[string]{}

Check failure on line 503 in pkg/operations/horizontal_scaling.go

View workflow job for this annotation

GitHub Actions / pr-pre-check (lint)

ineffectual assignment to notExistInstanceFromScaleIn (ineffassign)
onlinedInstanceFromScaleInOps, _, notExistInstanceFromScaleIn = hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleIn.OnlineInstancesToOffline,
offlineInstances,
currPodSet)
if notExistInstanceFromScaleIn.Len() > 0 {
notExistInstanceFromOps = notExistInstanceFromOps.Union(notExistInstanceFromScaleIn)
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
notExistInstanceFromScaleOut := sets.Set[string]{}

Check failure on line 513 in pkg/operations/horizontal_scaling.go

View workflow job for this annotation

GitHub Actions / pr-pre-check (lint)

ineffectual assignment to notExistInstanceFromScaleOut (ineffassign)
_, offlinedInstanceFromScaleOutOps, notExistInstanceFromScaleOut = hs.collectOnlineAndOfflineAndNotExistInstances(
horizontalScaling.ScaleOut.OfflineInstancesToOnline,
offlineInstances,
currPodSet)
if notExistInstanceFromScaleOut.Len() > 0 {
notExistInstanceFromOps = notExistInstanceFromOps.Union(notExistInstanceFromScaleOut)
}
}
return
}

// collect the online and offline instances specified in the request.
func (hs horizontalScalingOpsHandler) collectOnlineAndOfflineAndNotExistInstances(
instance []string,
offlineInstances sets.Set[string],
currPodSet map[string]string) (sets.Set[string], sets.Set[string], sets.Set[string]) {

offlinedInstanceFromOps := sets.Set[string]{}
onlinedInstanceFromOps := sets.Set[string]{}
notExistInstanceFromOps := sets.Set[string]{}
for _, insName := range instance {
if _, ok := offlineInstances[insName]; ok {
offlinedInstanceFromOps.Insert(insName)
continue
}
if _, ok := currPodSet[insName]; ok {
onlinedInstanceFromOps.Insert(insName)
continue
}
notExistInstanceFromOps.Insert(insName)
}
return onlinedInstanceFromOps, offlinedInstanceFromOps, notExistInstanceFromOps
}

// check when setting strict validate policy
// if the instances specified in the request are not offline, return error.
// if the instances duplicate in the request, return error.
func (hs horizontalScalingOpsHandler) strictPolicyValidation(
horizontalScaling opsv1alpha1.HorizontalScaling,
onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps sets.Set[string]) error {

if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
if onlinedInstanceFromScaleInOps.Len() != len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) {
unscalablePods := getMissingElementsInSetFromList(onlinedInstanceFromScaleInOps, horizontalScaling.ScaleIn.OnlineInstancesToOffline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in onlineInstancesToOffline is not online or not exist`, strings.Join(unscalablePods, ", ")))
}
}
if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 {
if offlinedInstanceFromScaleOutOps.Len() != len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) {
unscalablePods := getMissingElementsInSetFromList(offlinedInstanceFromScaleOutOps, horizontalScaling.ScaleOut.OfflineInstancesToOnline)
if unscalablePods == nil {
return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates")
}
return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in offlineInstancesToOnline is not offline or not exist`, strings.Join(unscalablePods, ", ")))
}
}
return nil
}
Loading

0 comments on commit 9f847d9

Please sign in to comment.