From 9f847d9460deb6820f46e3372af8a3f24c3fbde3 Mon Sep 17 00:00:00 2001 From: xujiahao Date: Mon, 2 Sep 2024 12:46:12 +0000 Subject: [PATCH] feat: support validate policy of opsrequest --- .../v1alpha1/opsrequest_validation.go | 16 +- pkg/constant/const.go | 6 + pkg/operations/horizontal_scaling.go | 201 +++++++++++++++--- pkg/operations/horizontal_scaling_test.go | 140 ++++++++++-- pkg/operations/ops_progress_util_test.go | 4 +- pkg/operations/ops_util.go | 11 + pkg/operations/ops_util_test.go | 12 +- 7 files changed, 322 insertions(+), 68 deletions(-) diff --git a/apis/operations/v1alpha1/opsrequest_validation.go b/apis/operations/v1alpha1/opsrequest_validation.go index 948f3a0c05e0..5cb329807fad 100644 --- a/apis/operations/v1alpha1/opsrequest_validation.go +++ b/apis/operations/v1alpha1/opsrequest_validation.go @@ -436,12 +436,16 @@ func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, com if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil { return err } - if len(scaleOut.OfflineInstancesToOnline) > 0 { - offlineInstanceSet := sets.New(compSpec.OfflineInstances...) 
- for _, offlineInsName := range scaleOut.OfflineInstancesToOnline { - if _, ok := offlineInstanceSet[offlineInsName]; !ok { - return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName) - } + } + // instance cannot be both in OfflineInstancesToOnline and OnlineInstancesToOffline + if scaleIn != nil && scaleOut != nil { + offlineToOnlineSet := make(map[string]struct{}) + for _, instance := range scaleIn.OnlineInstancesToOffline { + offlineToOnlineSet[instance] = struct{}{} + } + for _, instance := range scaleOut.OfflineInstancesToOnline { + if _, exists := offlineToOnlineSet[instance]; exists { + return fmt.Errorf(`instance "%s" cannot be both in "OfflineInstancesToOnline" and "OnlineInstancesToOffline"`, instance) } } } diff --git a/pkg/constant/const.go b/pkg/constant/const.go index ec8c3d915c2f..17da1e1f0b8d 100644 --- a/pkg/constant/const.go +++ b/pkg/constant/const.go @@ -67,3 +67,9 @@ const ( const InvalidContainerPort int32 = 0 const EmptyInsTemplateName = "" + +const ( + HscaleValidatePolicyKey = "apps.kubeblocks.io/hscale-validate-policy" + HscaleValidatePolicyStrict = "strict" + HscaleValidatePolicyIgnore = "ignore" +) diff --git a/pkg/operations/horizontal_scaling.go b/pkg/operations/horizontal_scaling.go index 86ca71999058..28d80b653808 100644 --- a/pkg/operations/horizontal_scaling.go +++ b/pkg/operations/horizontal_scaling.go @@ -21,7 +21,9 @@ package operations import ( "fmt" + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" "slices" + "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -97,18 +99,9 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1.ClusterComponentSpec, obj ComponentOpsInterface) error { horizontalScaling := obj.(opsv1alpha1.HorizontalScaling) lastCompConfiguration := 
opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()] - if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { - // check if the instances are online. - currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances, - opsRes.Cluster.Name, obj.GetComponentName()) - if err != nil { - return err - } - for _, onlineIns := range horizontalScaling.ScaleIn.OnlineInstancesToOffline { - if _, ok := currPodSet[onlineIns]; !ok { - return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, onlineIns)) - } - } + + if err := hs.validateHorizontalScalingWithPolicy(opsRes, lastCompConfiguration, obj); err != nil { + return err } replicas, instances, offlineInstances, err := hs.getExpectedCompValues(opsRes, compSpec.DeepCopy(), lastCompConfiguration, horizontalScaling) @@ -205,6 +198,16 @@ func (hs horizontalScalingOpsHandler) getCreateAndDeletePodSet(opsRes *OpsResour deletePodSet[k] = appsv1.GetInstanceTemplateName(clusterName, fullCompName, k) } } + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + for _, v := range horizontalScaling.ScaleIn.OnlineInstancesToOffline { + deletePodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v) + } + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + for _, v := range horizontalScaling.ScaleOut.OfflineInstancesToOnline { + createPodSet[v] = appsv1alpha1.GetInstanceTemplateName(clusterName, fullCompName, v) + } + } if opsRes.OpsRequest.Status.Phase == opsv1alpha1.OpsCancellingPhase { // when cancelling this opsRequest, revert the changes. 
return deletePodSet, createPodSet, nil @@ -294,16 +297,56 @@ func (hs horizontalScalingOpsHandler) getExpectedCompValues( compReplicas := *lastCompConfiguration.Replicas compInstanceTpls := slices.Clone(lastCompConfiguration.Instances) compOfflineInstances := lastCompConfiguration.OfflineInstances - expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, horizontalScaling) - err := hs.autoSyncReplicaChanges(opsRes, horizontalScaling, compReplicas, compInstanceTpls, expectOfflineInstances) + filteredHorizontal, err := filterHorizontalScalingSpec(opsRes, compReplicas, compInstanceTpls, compOfflineInstances, horizontalScaling.DeepCopy()) if err != nil { return 0, nil, nil, err } - return hs.getCompExpectReplicas(horizontalScaling, compReplicas), - hs.getCompExpectedInstances(compInstanceTpls, horizontalScaling), + expectOfflineInstances := hs.getCompExpectedOfflineInstances(compOfflineInstances, *filteredHorizontal) + err = hs.autoSyncReplicaChanges(opsRes, *filteredHorizontal, compReplicas, compInstanceTpls, expectOfflineInstances) + if err != nil { + return 0, nil, nil, err + } + return hs.getCompExpectReplicas(*filteredHorizontal, compReplicas), + hs.getCompExpectedInstances(compInstanceTpls, *filteredHorizontal), expectOfflineInstances, nil } +// only offlined instances could be taken online. +// and only onlined instances could be taken offline. +func filterHorizontalScalingSpec( + opsRes *OpsResource, + compReplicas int32, + compInstanceTpls []appsv1.InstanceTemplate, + compOfflineInstances []string, + horizontalScaling *opsv1alpha1.HorizontalScaling) (*opsv1alpha1.HorizontalScaling, error) { + offlineInstances := sets.New(compOfflineInstances...) 
+ podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compOfflineInstances, + opsRes.Cluster.Name, horizontalScaling.ComponentName) + if err != nil { + return nil, err + } + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + onlinedInstanceFromOps := sets.Set[string]{} + for _, insName := range horizontalScaling.ScaleIn.OnlineInstancesToOffline { + if _, ok := podSet[insName]; ok { + onlinedInstanceFromOps.Insert(insName) + } + } + horizontalScaling.ScaleIn.OnlineInstancesToOffline = onlinedInstanceFromOps.UnsortedList() + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + offlinedInstanceFromOps := sets.Set[string]{} + for _, insName := range horizontalScaling.ScaleOut.OfflineInstancesToOnline { + if _, ok := offlineInstances[insName]; ok { + offlinedInstanceFromOps.Insert(insName) + } + } + horizontalScaling.ScaleOut.OfflineInstancesToOnline = offlinedInstanceFromOps.UnsortedList() + } + return horizontalScaling, nil + +} + // autoSyncReplicaChanges auto-sync the replicaChanges of the component and instance templates. func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges( opsRes *OpsResource, @@ -339,6 +382,7 @@ func (hs horizontalScalingOpsHandler) autoSyncReplicaChanges( } return replicaChanger.Instances, &allReplicaChanges } + // auto sync the replicaChanges. 
scaleIn := horizontalScaling.ScaleIn if scaleIn != nil { @@ -347,21 +391,7 @@ } scaleOut := horizontalScaling.ScaleOut if scaleOut != nil { - // get the pod set when removing the specified instances from offlineInstances slice - podSet, err := intctrlcomp.GenerateAllPodNamesToSet(compReplicas, compInstanceTpls, compExpectOfflineInstances, - opsRes.Cluster.Name, horizontalScaling.ComponentName) - if err != nil { - return err - } - onlineInsCountMap := map[string]int32{} - for _, insName := range scaleOut.OfflineInstancesToOnline { - if _, ok := podSet[insName]; !ok { - // if the specified instance will not be created, continue - continue - } - insTplName := appsv1.GetInstanceTemplateName(opsRes.Cluster.Name, horizontalScaling.ComponentName, insName) - onlineInsCountMap[insTplName]++ - } + onlineInsCountMap := opsRes.OpsRequest.CountOfflineOrOnlineInstances(opsRes.Cluster.Name, horizontalScaling.ComponentName, scaleOut.OfflineInstancesToOnline) scaleOut.Instances, scaleOut.ReplicaChanges = getSyncedInstancesAndReplicaChanges(onlineInsCountMap, scaleOut.ReplicaChanger, scaleOut.NewInstances) } return nil } @@ -433,3 +463,112 @@ func (hs horizontalScalingOpsHandler) getCompExpectedOfflineInstances( } return compOfflineInstances } + +// validate if there is any instance specified in the request that does not exist; if so, return an error. +// if the HscaleValidatePolicy annotation is HscaleValidatePolicyStrict or unset, it also validates whether the instances are already offlined or onlined. 
+func (hs horizontalScalingOpsHandler) validateHorizontalScalingWithPolicy( + opsRes *OpsResource, + lastCompConfiguration opsv1alpha1.LastComponentConfiguration, + obj ComponentOpsInterface, +) error { + horizontalScaling := obj.(opsv1alpha1.HorizontalScaling) + currPodSet, err := intctrlcomp.GenerateAllPodNamesToSet(*lastCompConfiguration.Replicas, lastCompConfiguration.Instances, lastCompConfiguration.OfflineInstances, + opsRes.Cluster.Name, obj.GetComponentName()) + if err != nil { + return err + } + offlineInstances := sets.New(lastCompConfiguration.OfflineInstances...) + onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps, notExistInstanceFromOps := hs.collecteAllTypeOfInstancesFromOps(horizontalScaling, currPodSet, offlineInstances) + if notExistInstanceFromOps.Len() > 0 { + return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(notExistInstanceFromOps.UnsortedList(), ", "))) + } + + if policy, exist := opsRes.OpsRequest.Annotations[constant.HscaleValidatePolicyKey]; exist && policy != constant.HscaleValidatePolicyStrict { + return nil + } + + if err := hs.strictPolicyValidation(horizontalScaling, onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps); err != nil { + return err + } + + return nil +} + +// collecteAllTypeOfInstancesFromOps collects the online and offline instances specified in the request. 
+func (hs horizontalScalingOpsHandler) collecteAllTypeOfInstancesFromOps( + horizontalScaling opsv1alpha1.HorizontalScaling, + currPodSet map[string]string, + offlineInstances sets.Set[string]) (onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps, notExistInstanceFromOps sets.Set[string]) { + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + notExistInstanceFromScaleIn := sets.Set[string]{} + onlinedInstanceFromScaleInOps, _, notExistInstanceFromScaleIn = hs.collectOnlineAndOfflineAndNotExistInstances( + horizontalScaling.ScaleIn.OnlineInstancesToOffline, + offlineInstances, + currPodSet) + if notExistInstanceFromScaleIn.Len() > 0 { + notExistInstanceFromOps = notExistInstanceFromOps.Union(notExistInstanceFromScaleIn) + } + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + notExistInstanceFromScaleOut := sets.Set[string]{} + _, offlinedInstanceFromScaleOutOps, notExistInstanceFromScaleOut = hs.collectOnlineAndOfflineAndNotExistInstances( + horizontalScaling.ScaleOut.OfflineInstancesToOnline, + offlineInstances, + currPodSet) + if notExistInstanceFromScaleOut.Len() > 0 { + notExistInstanceFromOps = notExistInstanceFromOps.Union(notExistInstanceFromScaleOut) + } + } + return +} + +// collect the online and offline instances specified in the request. 
+func (hs horizontalScalingOpsHandler) collectOnlineAndOfflineAndNotExistInstances( + instance []string, + offlineInstances sets.Set[string], + currPodSet map[string]string) (sets.Set[string], sets.Set[string], sets.Set[string]) { + + offlinedInstanceFromOps := sets.Set[string]{} + onlinedInstanceFromOps := sets.Set[string]{} + notExistInstanceFromOps := sets.Set[string]{} + for _, insName := range instance { + if _, ok := offlineInstances[insName]; ok { + offlinedInstanceFromOps.Insert(insName) + continue + } + if _, ok := currPodSet[insName]; ok { + onlinedInstanceFromOps.Insert(insName) + continue + } + notExistInstanceFromOps.Insert(insName) + } + return onlinedInstanceFromOps, offlinedInstanceFromOps, notExistInstanceFromOps +} + +// check when setting strict validate policy +// if the instances specified in the request are not offline, return error. +// if the instances duplicate in the request, return error. +func (hs horizontalScalingOpsHandler) strictPolicyValidation( + horizontalScaling opsv1alpha1.HorizontalScaling, + onlinedInstanceFromScaleInOps, offlinedInstanceFromScaleOutOps sets.Set[string]) error { + + if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 { + if onlinedInstanceFromScaleInOps.Len() != len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) { + unscalablePods := getMissingElementsInSetFromList(onlinedInstanceFromScaleInOps, horizontalScaling.ScaleIn.OnlineInstancesToOffline) + if unscalablePods == nil { + return intctrlutil.NewFatalError("instances specified in onlineInstancesToOffline has duplicates") + } + return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in onlineInstancesToOffline is not online or not exist`, strings.Join(unscalablePods, ", "))) + } + } + if horizontalScaling.ScaleOut != nil && len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) > 0 { + if offlinedInstanceFromScaleOutOps.Len() != len(horizontalScaling.ScaleOut.OfflineInstancesToOnline) { + 
unscalablePods := getMissingElementsInSetFromList(offlinedInstanceFromScaleOutOps, horizontalScaling.ScaleOut.OfflineInstancesToOnline) + if unscalablePods == nil { + return intctrlutil.NewFatalError("instances specified in offlineInstancesToOnline has duplicates") + } + return intctrlutil.NewFatalError(fmt.Sprintf(`instances "%s" specified in offlineInstancesToOnline is not offline or not exist`, strings.Join(unscalablePods, ", "))) + } + } + return nil +} diff --git a/pkg/operations/horizontal_scaling_test.go index 0716ce18b495..ddf9705be0d2 100644 --- a/pkg/operations/horizontal_scaling_test.go +++ b/pkg/operations/horizontal_scaling_test.go @@ -21,6 +21,7 @@ package operations import ( "fmt" + "strings" "time" . "github.com/onsi/ginkgo/v2" @@ -86,7 +87,8 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { Context("Test OpsRequest", func() { commonHScaleConsensusCompTest := func(reqCtx intctrlutil.RequestCtx, changeClusterSpec func(cluster *appsv1.Cluster), - horizontalScaling opsv1alpha1.HorizontalScaling) (*OpsResource, []*corev1.Pod) { + horizontalScaling opsv1alpha1.HorizontalScaling, + hscaleValidatePolicy string) (*OpsResource, []*corev1.Pod) { By("init operations resources with CLusterDefinition/Hybrid components Cluster/consensus Pods") opsRes, _, _ := initOperationsResources(compDefName, clusterName) its := testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) @@ -99,7 +101,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("create opsRequest for horizontal scaling of consensus component") initClusterAnnotationAndPhaseForOps(opsRes) horizontalScaling.ComponentName = defaultCompName - opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling) + opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling, hscaleValidatePolicy) // set ops phase to Pending opsRes.OpsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase
mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingClusterCompPhase, defaultCompName) @@ -190,7 +192,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { horizontalScaling opsv1alpha1.HorizontalScaling, mockHScale func(podList []*corev1.Pod)) { reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} - opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling) + opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling, constant.HscaleValidatePolicyStrict) mockHScale(podList) testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName) checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) @@ -200,7 +202,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { horizontalScaling opsv1alpha1.HorizontalScaling, isScaleDown bool) { reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} - opsRes, podList := commonHScaleConsensusCompTest(reqCtx, nil, horizontalScaling) + opsRes, podList := commonHScaleConsensusCompTest(reqCtx, nil, horizontalScaling, constant.HscaleValidatePolicyStrict) var pod *corev1.Pod if isScaleDown { By("delete the pod") @@ -289,9 +291,10 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { testHScaleWithSpecifiedPod := func(changeClusterSpec func(cluster *appsv1.Cluster), horizontalScaling opsv1alpha1.HorizontalScaling, expectOfflineInstances []string, - mockHScale func(podList []*corev1.Pod)) *OpsResource { + mockHScale func(podList []*corev1.Pod), + hscaleValidatePolicy string) *OpsResource { reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} - opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling) + opsRes, podList := commonHScaleConsensusCompTest(reqCtx, changeClusterSpec, horizontalScaling, hscaleValidatePolicy) By("verify cluster spec is correct") targetSpec := opsRes.Cluster.Spec.GetComponentByName(defaultCompName) Expect(targetSpec.OfflineInstances).Should(HaveLen(len(expectOfflineInstances))) @@ -323,7 
+326,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }, offlineInstances, func(podList []*corev1.Pod) { By(fmt.Sprintf(`delete the specified pod "%s"`, toDeletePodName)) deletePods(podList[2]) - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1")) }) @@ -347,7 +350,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { deletePods(podList[2]) By("create a new pod(ordinal:2) by replicas") createPods("", 2) - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2")) }) @@ -365,7 +368,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }, offlineInstances, func(podList []*corev1.Pod) { By("delete the specified pod " + offlineInstanceName) deletePods(podList[0]) - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1")) By("expect replicas to 2 and template " + insTplName + " replicas to 0") compSpec := opsRes.Cluster.Spec.GetComponentByName(defaultCompName) @@ -387,7 +390,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }, []string{}, func(podList []*corev1.Pod) { By("create the specified pod " + offlineInstanceName) testapps.MockInstanceSetPod(&testCtx, nil, clusterName, defaultCompName, offlineInstanceName, "follower", "Readonly") - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1")) By("expect replicas to 4") compSpec := opsRes.Cluster.Spec.GetComponentByName(defaultCompName) @@ -415,7 +418,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By(fmt.Sprintf(`create the pod "%s" which is removed from offlineInstances`, onlinePodName)) createPods("", 1) - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2")) By("expect replicas to 3") Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(3)) @@ -439,7 +442,7 
@@ var _ = Describe("HorizontalScaling OpsRequest", func() { ScaleIn: &opsv1alpha1.ScaleIn{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(3)}, }, - }) + }, constant.HscaleValidatePolicyStrict) By("verify cluster spec is correct") var targetSpec *appsv1.ClusterComponentSpec for i := range opsRes.Cluster.Spec.ComponentSpecs { @@ -461,9 +464,9 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName) checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) }) - createOpsAndToCreatingPhase := func(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, horizontalScaling opsv1alpha1.HorizontalScaling) *opsv1alpha1.OpsRequest { + createOpsAndToCreatingPhase := func(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, horizontalScaling opsv1alpha1.HorizontalScaling, policy string) *opsv1alpha1.OpsRequest { horizontalScaling.ComponentName = defaultCompName - opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling) + opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling, policy) opsRes.OpsRequest.Spec.Force = true // set ops phase to Pending opsRes.OpsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase @@ -480,24 +483,113 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { return opsRes.OpsRequest } - It("test offline the specified pod but it is not online", func() { + It("test offline the specified pod but it is not online with the ignore policy", func() { By("init operations resources with CLusterDefinition/Hybrid components Cluster/consensus Pods") opsRes, _, _ := initOperationsResources(compDefName, clusterName) testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) reqCtx := intctrlutil.RequestCtx{Ctx: ctx} - By("offline the specified pod but it is not online") + By("offline the specified pod but it is not exist, expect replicas not be changed") + offlineInsName := fmt.Sprintf("%s-%s-4", clusterName, defaultCompName) 
+ _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{offlineInsName}, + }, + }, constant.HscaleValidatePolicyIgnore) + By("expect replicas not be changed") + Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + }) + + It("test offline the specified pod but it is not exist", func() { + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + opsRes, _, _ := initOperationsResources(compDefName, clusterName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + + By("offline the specified pod but it is not exist") offlineInsName := fmt.Sprintf("%s-%s-4", clusterName, defaultCompName) _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleIn: &opsv1alpha1.ScaleIn{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, OnlineInstancesToOffline: []string{offlineInsName}, }, - }) + }, constant.HscaleValidatePolicyStrict) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) conditions := opsRes.OpsRequest.Status.Conditions + unscalablePods := []string{offlineInsName} Expect(conditions[len(conditions)-1].Message).Should(ContainSubstring( - fmt.Sprintf(`instance "%s" specified in onlineInstancesToOffline is not online`, offlineInsName))) + fmt.Sprintf(`instances "%s" specified in the request is not exist`, strings.Join(unscalablePods, ", ")))) + }) + + It("test offline two specified pods with same pod name with ignore policy", func() { + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + opsRes, _, _ := 
initOperationsResources(compDefName, clusterName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + testPodName := fmt.Sprintf("%s-%s-1", clusterName, defaultCompName) + + By("offline two pod with same pod name") + _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{testPodName, testPodName}, + }, + }, constant.HscaleValidatePolicyIgnore) + Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase)) + Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(2)) + // expect the not exist pod still in opsRequest + onlineToOfflineInstances := opsRes.OpsRequest.Spec.HorizontalScalingList[0].ScaleIn.OnlineInstancesToOffline + Expect(onlineToOfflineInstances).Should(Equal([]string{testPodName, testPodName}), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + // expect for opsRequest phase is Succeed after pods has been scaled and component phase is Running + checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) + }) + + It("test online two specified pods with same pod name with ignore policy", func() { + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + By("init operations resources with CLusterDefinition/ClusterVersion/Hybrid components Cluster/consensus Pods") + opsRes, _, _ := initOperationsResources(compDefName, clusterName) + testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) + reqCtx := intctrlutil.RequestCtx{Ctx: ctx} + testPodName := fmt.Sprintf("%s-%s-1", clusterName, defaultCompName) + + By("offline two pod with same pod name") + _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{testPodName, testPodName}, + }, 
+ }, constant.HscaleValidatePolicyIgnore) + Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase)) + Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(2)) + By("expect the not exist pod still in opsRequest") + onlineToOfflineInstances := opsRes.OpsRequest.Spec.HorizontalScalingList[0].ScaleIn.OnlineInstancesToOffline + Expect(onlineToOfflineInstances).Should(Equal([]string{testPodName, testPodName}), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + By("expect for opsRequest phase is Succeed after pods has been scaled and component phase is Running") + checkOpsRequestPhaseIsSucceed(reqCtx, opsRes) + Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("1/1"), fmt.Sprintf("info: %v", opsRes.OpsRequest)) + + }) + + It("test offline and online two pods in the same time with the ignore policy", func() { + onlinePodName := fmt.Sprintf("%s-%s-1", clusterName, defaultCompName) + offlinePodName := fmt.Sprintf("%s-%s-%s-0", clusterName, defaultCompName, insTplName) + opsRes := testHScaleWithSpecifiedPod(func(cluster *appsv1.Cluster) { + setClusterCompSpec(cluster, []appsv1.InstanceTemplate{ + {Name: insTplName, Replicas: pointer.Int32(1)}, + }, []string{onlinePodName}) + }, opsv1alpha1.HorizontalScaling{ + ScaleIn: &opsv1alpha1.ScaleIn{ + OnlineInstancesToOffline: []string{offlinePodName}, + }, + ScaleOut: &opsv1alpha1.ScaleOut{ + OfflineInstancesToOnline: []string{onlinePodName}, + }, + }, []string{offlinePodName}, func(podList []*corev1.Pod) { + By(fmt.Sprintf(`delete the specified pod"%s"`, offlinePodName)) + deletePods(podList[0]) + + By(fmt.Sprintf(`create the pod "%s" which is removed from offlineInstances`, onlinePodName)) + createPods("", 1) + }, constant.HscaleValidatePolicyIgnore) + Expect(opsRes.OpsRequest.Status.Progress).Should(Equal("2/2")) }) It("test run multi horizontalScaling opsRequest with force flag", func() { @@ -508,13 
+600,13 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("create first opsRequest to add 1 replicas with `scaleOut` field and expect replicas to 4") createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleOut: &opsv1alpha1.ScaleOut{ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}}, - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(4)) By("create secondary opsRequest to add 1 replicas with `replicasToAdd` field and expect replicas to 5") createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleOut: &opsv1alpha1.ScaleOut{ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}}, - }) + }, constant.HscaleValidatePolicyStrict) Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(5)) By("create third opsRequest to offline a pod which is created by another running opsRequest and expect it to fail") @@ -524,7 +616,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, OnlineInstancesToOffline: []string{offlineInsName}, }, - }) + }, constant.HscaleValidatePolicyStrict) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) conditions := opsRes.OpsRequest.Status.Conditions Expect(conditions[len(conditions)-1].Message).Should(ContainSubstring(fmt.Sprintf(`instance "%s" cannot be taken offline as it has been created by another running opsRequest`, offlineInsName))) @@ -532,7 +624,7 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { By("create a opsRequest to delete 1 replicas which is created by another running opsRequest and expect it to fail") _ = createOpsAndToCreatingPhase(reqCtx, opsRes, opsv1alpha1.HorizontalScaling{ ScaleIn: 
&opsv1alpha1.ScaleIn{ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}}, - }) + }, constant.HscaleValidatePolicyStrict) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) conditions = opsRes.OpsRequest.Status.Conditions Expect(conditions[len(conditions)-1].Message).Should(ContainSubstring(`cannot be taken offline as it has been created by another running opsRequest`)) @@ -540,13 +632,15 @@ var _ = Describe("HorizontalScaling OpsRequest", func() { }) }) -func createHorizontalScaling(clusterName string, horizontalScaling opsv1alpha1.HorizontalScaling) *opsv1alpha1.OpsRequest { +func createHorizontalScaling(clusterName string, horizontalScaling opsv1alpha1.HorizontalScaling, policy string) *opsv1alpha1.OpsRequest { horizontalOpsName := "horizontal-scaling-ops-" + testCtx.GetRandomStr() ops := testops.NewOpsRequestObj(horizontalOpsName, testCtx.DefaultNamespace, clusterName, opsv1alpha1.HorizontalScalingType) ops.Spec.HorizontalScalingList = []opsv1alpha1.HorizontalScaling{ horizontalScaling, } + ops.Annotations = map[string]string{} + ops.Annotations[constant.HscaleValidatePolicyKey] = policy opsRequest := testops.CreateOpsRequest(ctx, testCtx, ops) opsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase return opsRequest diff --git a/pkg/operations/ops_progress_util_test.go b/pkg/operations/ops_progress_util_test.go index 75c49448ed86..09760d27681d 100644 --- a/pkg/operations/ops_progress_util_test.go +++ b/pkg/operations/ops_progress_util_test.go @@ -131,7 +131,7 @@ var _ = Describe("Ops ProgressDetails", func() { ReplicaChanges: pointer.Int32(2), }, }, - }) + }, constant.HscaleValidatePolicyStrict) mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingClusterCompPhase, defaultCompName) // appsv1.HorizontalScalingPhase initClusterForOps(opsRes) @@ -188,7 +188,7 @@ var _ = Describe("Ops ProgressDetails", func() { ReplicaChanges: pointer.Int32(1), }, }, - 
}) + }, constant.HscaleValidatePolicyStrict) mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingClusterCompPhase, defaultCompName) // appsv1.HorizontalScalingPhase initClusterForOps(opsRes) diff --git a/pkg/operations/ops_util.go b/pkg/operations/ops_util.go index b188c34d74de..033a7f55f672 100644 --- a/pkg/operations/ops_util.go +++ b/pkg/operations/ops_util.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" @@ -323,3 +324,13 @@ func getComponentSpecOrShardingTemplate(cluster *appsv1.Cluster, componentName s } return nil } + +func getMissingElementsInSetFromList(set sets.Set[string], list []string) []string { + var diff []string + for _, v := range list { + if !set.Has(v) { + diff = append(diff, v) + } + } + return diff +} diff --git a/pkg/operations/ops_util_test.go b/pkg/operations/ops_util_test.go index 2c78bbd59728..85e8d3a026f8 100644 --- a/pkg/operations/ops_util_test.go +++ b/pkg/operations/ops_util_test.go @@ -87,7 +87,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(2), }, }, - }) + }, constant.HscaleValidatePolicyStrict) Expect(patchValidateErrorCondition(ctx, k8sClient, opsRes, "validate error")).Should(Succeed()) Expect(PatchOpsHandlerNotSupported(ctx, k8sClient, opsRes)).Should(Succeed()) Expect(isOpsRequestFailedPhase(opsv1alpha1.OpsFailedPhase)).Should(BeTrue()) @@ -209,7 +209,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, constant.HscaleValidatePolicyStrict) opsRes.OpsRequest = ops _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) @@ -266,7 +266,7 @@ var _ = Describe("OpsUtil functions", func() { ScaleIn: &opsv1alpha1.ScaleIn{ ReplicaChanger: 
opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, }, - }) + }, constant.HscaleValidatePolicyStrict) opsRes.OpsRequest = ops1 _, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes) Expect(err).ShouldNot(HaveOccurred()) @@ -278,7 +278,7 @@ var _ = Describe("OpsUtil functions", func() { ScaleOut: &opsv1alpha1.ScaleOut{ ReplicaChanger: opsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)}, }, - }) + }, constant.HscaleValidatePolicyStrict) ops2.Annotations = map[string]string{constant.OpsDependentOnSuccessfulOpsAnnoKey: ops1.Name} ops2.Spec.Force = true opsRes.OpsRequest = ops2 @@ -329,7 +329,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, constant.HscaleValidatePolicyStrict) reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} _, _ = GetOpsManager().Do(reqCtx, k8sClient, opsRes) Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsFailedPhase)) @@ -342,7 +342,7 @@ var _ = Describe("OpsUtil functions", func() { ReplicaChanges: pointer.Int32(1), }, }, - }) + }, constant.HscaleValidatePolicyStrict) opsRes.OpsRequest.Spec.Force = true opsRes.OpsRequest.Spec.EnqueueOnForce = true opsRes.OpsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase