diff --git a/apis/workloads/v1alpha1/instanceset_types.go b/apis/workloads/v1alpha1/instanceset_types.go
index 48f29072f42..68f9dcd1f1e 100644
--- a/apis/workloads/v1alpha1/instanceset_types.go
+++ b/apis/workloads/v1alpha1/instanceset_types.go
@@ -605,6 +605,10 @@ const (
     // Or, a NotReady reason with not ready instances encoded in the Message filed will be set.
     InstanceReady ConditionType = "InstanceReady"
 
+    // InstanceAvailable ConditionStatus will be True if all instances(pods) are in the ready condition
+    // and continue for "MinReadySeconds" seconds. Otherwise, it will be set to False.
+    InstanceAvailable ConditionType = "InstanceAvailable"
+
     // InstanceFailure is added in an instance set when at least one of its instances(pods) is in a `Failed` phase.
     InstanceFailure ConditionType = "InstanceFailure"
 )
@@ -616,6 +620,12 @@ const (
     // ReasonReady is a reason for condition InstanceReady.
     ReasonReady = "Ready"
 
+    // ReasonNotAvailable is a reason for condition InstanceAvailable.
+    ReasonNotAvailable = "NotAvailable"
+
+    // ReasonAvailable is a reason for condition InstanceAvailable.
+    ReasonAvailable = "Available"
+
     // ReasonInstanceFailure is a reason for condition InstanceFailure.
     ReasonInstanceFailure = "InstanceFailure"
 )
diff --git a/controllers/apps/operations/ops_progress_util.go b/controllers/apps/operations/ops_progress_util.go
index ffa29b43fd4..ccfd0d7fa46 100644
--- a/controllers/apps/operations/ops_progress_util.go
+++ b/controllers/apps/operations/ops_progress_util.go
@@ -456,6 +456,7 @@ func handleScaleOutProgressWithInstanceSet(
     compStatus *appsv1alpha1.OpsRequestComponentStatus) (completedCount int32, err error) {
     currPodRevisionMap, _ := instanceset.GetRevisions(its.Status.CurrentRevisions)
     notReadyPodSet := instanceset.GetPodNameSetFromInstanceSetCondition(its, workloads.InstanceReady)
+    notAvailablePodSet := instanceset.GetPodNameSetFromInstanceSetCondition(its, workloads.InstanceAvailable)
     failurePodSet := instanceset.GetPodNameSetFromInstanceSetCondition(its, workloads.InstanceFailure)
     pgRes.opsMessageKey = "Create"
     memberStatusMap := map[string]sets.Empty{}
@@ -479,6 +480,9 @@ func handleScaleOutProgressWithInstanceSet(
             updateProgressDetailForHScale(opsRes, pgRes, compStatus, objectKey, appsv1alpha1.ProcessingProgressStatus)
             continue
         }
+        if _, ok := notAvailablePodSet[podName]; ok {
+            continue
+        }
         if _, ok := memberStatusMap[podName]; !ok && needToCheckRole(pgRes) {
             continue
         }
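
The scale-out progress handler above skips any pod named in the message of the InstanceReady, InstanceAvailable, or InstanceFailure condition; the status reconciler later in this diff writes those messages as a JSON-encoded, sorted array of pod names. GetPodNameSetFromInstanceSetCondition itself is not part of this diff, so the following is only a rough sketch of that consumption pattern under that assumption; podNameSetFromCondition is a hypothetical name.

// Sketch only: approximates how a helper such as GetPodNameSetFromInstanceSetCondition
// could recover pod names from an InstanceSet condition message; the real helper in
// pkg/controller/instanceset may differ.
package instancesetsketch

import (
    "encoding/json"

    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podNameSetFromCondition returns the pod names encoded in the message of the named
// condition when that condition is False, and an empty set otherwise.
func podNameSetFromCondition(conditions []metav1.Condition, condType string) map[string]struct{} {
    result := map[string]struct{}{}
    cond := meta.FindStatusCondition(conditions, condType)
    if cond == nil || cond.Status != metav1.ConditionFalse || cond.Message == "" {
        return result
    }
    var names []string
    // The status reconciler marshals the offending pod names as a JSON array.
    if err := json.Unmarshal([]byte(cond.Message), &names); err != nil {
        return result
    }
    for _, name := range names {
        result[name] = struct{}{}
    }
    return result
}
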
diff --git a/controllers/apps/opsrequest_controller.go b/controllers/apps/opsrequest_controller.go
index 4f508caaa69..58ebe761777 100644
--- a/controllers/apps/opsrequest_controller.go
+++ b/controllers/apps/opsrequest_controller.go
@@ -246,7 +246,9 @@ func (r *OpsRequestReconciler) reconcileStatusDuringRunningOrCanceling(reqCtx in
     opsRequest := opsRes.OpsRequest
     // wait for OpsRequest.status.phase to Succeed
     if requeueAfter, err := operations.GetOpsManager().Reconcile(reqCtx, r.Client, opsRes); err != nil {
-        r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsReconcileStatusFailed, "Failed to reconcile the status of OpsRequest: %s", err.Error())
+        if !apierrors.IsConflict(err) {
+            r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsReconcileStatusFailed, "Failed to reconcile the status of OpsRequest: %s", err.Error())
+        }
         return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
     } else if requeueAfter != 0 {
         // if the reconcileAction need requeue, do it
@@ -300,7 +302,9 @@ func (r *OpsRequestReconciler) doOpsRequestAction(reqCtx intctrlutil.RequestCtx,
     opsDeepCopy := opsRequest.DeepCopy()
     res, err := operations.GetOpsManager().Do(reqCtx, r.Client, opsRes)
     if err != nil {
-        r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsDoActionFailed, "Failed to process the operation of OpsRequest: %s", err.Error())
+        if !apierrors.IsConflict(err) {
+            r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsDoActionFailed, "Failed to process the operation of OpsRequest: %s", err.Error())
+        }
         if !reflect.DeepEqual(opsRequest.Status, opsDeepCopy.Status) {
             if patchErr := r.Client.Status().Patch(reqCtx.Ctx, opsRequest, client.MergeFrom(opsDeepCopy)); patchErr != nil {
                 return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
diff --git a/controllers/apps/transformer_component_workload.go b/controllers/apps/transformer_component_workload.go
index 4ba7f2d5bf2..dd2edfb332c 100644
--- a/controllers/apps/transformer_component_workload.go
+++ b/controllers/apps/transformer_component_workload.go
@@ -300,6 +300,7 @@ func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet, synthesizeComp *comp
     itsObjCopy.Spec.Credential = itsProto.Spec.Credential
     itsObjCopy.Spec.Instances = itsProto.Spec.Instances
     itsObjCopy.Spec.OfflineInstances = itsProto.Spec.OfflineInstances
+    itsObjCopy.Spec.MinReadySeconds = itsProto.Spec.MinReadySeconds
 
     if itsProto.Spec.UpdateStrategy.Type != "" || itsProto.Spec.UpdateStrategy.RollingUpdate != nil {
         updateUpdateStrategy(itsObjCopy, itsProto)
diff --git a/controllers/workloads/instanceset_controller.go b/controllers/workloads/instanceset_controller.go
index 205784c0707..9fae154aa67 100644
--- a/controllers/workloads/instanceset_controller.go
+++ b/controllers/workloads/instanceset_controller.go
@@ -90,8 +90,11 @@ func (r *InstanceSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)
         Do(instanceset.NewReplicasAlignmentReconciler()).
         Do(instanceset.NewUpdateReconciler()).
         Commit()
+    if re, ok := err.(intctrlutil.DelayedRequeueError); ok {
+        return intctrlutil.RequeueAfter(re.RequeueAfter(), logger, re.Reason())
+    }
     requeue := false
-    if err != nil && apierrors.IsConflict(err) {
+    if apierrors.IsConflict(err) {
         requeue = true
         err = nil
     }
diff --git a/docs/developer_docs/api-reference/cluster.md b/docs/developer_docs/api-reference/cluster.md
index 7d322f73c6a..cbd2461e970 100644
--- a/docs/developer_docs/api-reference/cluster.md
+++ b/docs/developer_docs/api-reference/cluster.md
@@ -24064,7 +24064,11 @@ string
 <th>Description</th>
 </tr>
 </thead>
-<tbody><tr><td><p>&#34;InstanceFailure&#34;</p></td>
+<tbody><tr><td><p>&#34;InstanceAvailable&#34;</p></td>
+<td><p>InstanceAvailable ConditionStatus will be True if all instances(pods) are in the ready condition
+and continue for &ldquo;MinReadySeconds&rdquo; seconds. Otherwise, it will be set to False.</p>
+</td>
+</tr><tr><td><p>&#34;InstanceFailure&#34;</p></td>
 <td><p>InstanceFailure is added in an instance set when at least one of its instances(pods) is in a <code>Failed</code> phase.</p>
 </td>
 </tr><tr><td><p>&#34;InstanceReady&#34;</p></td>
diff --git a/pkg/controller/instanceset/reconciler_status.go b/pkg/controller/instanceset/reconciler_status.go
index b760b0d7b3e..eafb49ce010 100644
--- a/pkg/controller/instanceset/reconciler_status.go
+++ b/pkg/controller/instanceset/reconciler_status.go
@@ -21,6 +21,7 @@ package instanceset
 
 import (
     "encoding/json"
+    "time"
 
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/meta"
@@ -68,6 +69,7 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (*kubebuilde
     currentReplicas, updatedReplicas := int32(0), int32(0)
     readyReplicas, availableReplicas := int32(0), int32(0)
     notReadyNames := sets.New[string]()
+    notAvailableNames := sets.New[string]()
     currentRevisions := map[string]string{}
     for _, pod := range podList {
         currentRevisions[pod.Name] = getPodRevision(pod)
@@ -80,6 +82,8 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (*kubebuilde
             notReadyNames.Delete(pod.Name)
             if isRunningAndAvailable(pod, its.Spec.MinReadySeconds) {
                 availableReplicas++
+            } else {
+                notAvailableNames.Insert(pod.Name)
             }
         }
         if isCreated(pod) && !isTerminating(pod) {
@@ -116,6 +120,12 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (*kubebuilde
     }
     meta.SetStatusCondition(&its.Status.Conditions, *readyCondition)
 
+    availableCondition, err := buildAvailableCondition(its, availableReplicas >= replicas, notAvailableNames)
+    if err != nil {
+        return nil, err
+    }
+    meta.SetStatusCondition(&its.Status.Conditions, *availableCondition)
+
     // 3. set InstanceFailure condition
     failureCondition, err := buildFailureCondition(its, podList)
     if err != nil {
@@ -134,9 +144,19 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (*kubebuilde
 
     // TODO(free6om): should put this field to the spec
     setReadyWithPrimary(its, podList)
+    if its.Spec.MinReadySeconds > 0 && availableReplicas != readyReplicas {
+        return tree, intctrlutil.NewDelayedRequeueError(time.Second, "requeue for right status update")
+    }
     return tree, nil
 }
 
+func buildConditionMessageWithNames(podNames []string) ([]byte, error) {
+    baseSort(podNames, func(i int) (string, int) {
+        return ParseParentNameAndOrdinal(podNames[i])
+    }, nil, true)
+    return json.Marshal(podNames)
+}
+
 func buildReadyCondition(its *workloads.InstanceSet, ready bool, notReadyNames sets.Set[string]) (*metav1.Condition, error) {
     condition := &metav1.Condition{
         Type:               string(workloads.InstanceReady),
@@ -147,11 +167,26 @@ func buildReadyCondition(its *workloads.InstanceSet, ready bool, notReadyNames s
     if !ready {
         condition.Status = metav1.ConditionFalse
         condition.Reason = workloads.ReasonNotReady
-        names := notReadyNames.UnsortedList()
-        baseSort(names, func(i int) (string, int) {
-            return ParseParentNameAndOrdinal(names[i])
-        }, nil, true)
-        message, err := json.Marshal(names)
+        message, err := buildConditionMessageWithNames(notReadyNames.UnsortedList())
+        if err != nil {
+            return nil, err
+        }
+        condition.Message = string(message)
+    }
+    return condition, nil
+}
+
+func buildAvailableCondition(its *workloads.InstanceSet, available bool, notAvailableNames sets.Set[string]) (*metav1.Condition, error) {
+    condition := &metav1.Condition{
+        Type:               string(workloads.InstanceAvailable),
+        Status:             metav1.ConditionTrue,
+        ObservedGeneration: its.Generation,
+        Reason:             workloads.ReasonAvailable,
+    }
+    if !available {
+        condition.Status = metav1.ConditionFalse
+        condition.Reason = workloads.ReasonNotAvailable
+        message, err := buildConditionMessageWithNames(notAvailableNames.UnsortedList())
         if err != nil {
             return nil, err
         }
         condition.Message = string(message)
     }
@@ -180,10 +215,7 @@ func buildFailureCondition(its *workloads.InstanceSet, pods []*corev1.Pod) (*met
     if len(failureNames) == 0 {
         return nil, nil
     }
-    baseSort(failureNames, func(i int) (string, int) {
-        return ParseParentNameAndOrdinal(failureNames[i])
-    }, nil, true)
-    message, err := json.Marshal(failureNames)
+    message, err := buildConditionMessageWithNames(failureNames)
     if err != nil {
         return nil, err
     }
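
The reconciler above counts a pod as available through isRunningAndAvailable(pod, its.Spec.MinReadySeconds), which is defined outside this diff. The conventional Kubernetes reading of MinReadySeconds is that a pod is available once its Ready condition has stayed True for at least that long, which also explains the test below: pods whose Ready LastTransitionTime is time.Now() remain unavailable, while backdating it by minReadySeconds makes them available. A sketch of that check under that assumption (isPodAvailable is a hypothetical name):

// Sketch of the availability test this status reconciler depends on; the real
// isRunningAndAvailable helper is outside this diff, so this mirrors the
// conventional Kubernetes minReadySeconds check and is an assumption.
package instancesetsketch

import (
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func isPodAvailable(pod *corev1.Pod, minReadySeconds int32, now metav1.Time) bool {
    if pod.Status.Phase != corev1.PodRunning {
        return false
    }
    var ready *corev1.PodCondition
    for i := range pod.Status.Conditions {
        if pod.Status.Conditions[i].Type == corev1.PodReady {
            ready = &pod.Status.Conditions[i]
            break
        }
    }
    if ready == nil || ready.Status != corev1.ConditionTrue {
        return false
    }
    if minReadySeconds == 0 {
        return true
    }
    // Available means the pod has been Ready for at least minReadySeconds.
    minDuration := time.Duration(minReadySeconds) * time.Second
    return !ready.LastTransitionTime.IsZero() &&
        ready.LastTransitionTime.Add(minDuration).Before(now.Time)
}
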
diff --git a/pkg/controller/instanceset/reconciler_status_test.go b/pkg/controller/instanceset/reconciler_status_test.go
index 160b8d488d3..76ba6ea6f8c 100644
--- a/pkg/controller/instanceset/reconciler_status_test.go
+++ b/pkg/controller/instanceset/reconciler_status_test.go
@@ -33,6 +33,7 @@ import (
     workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
     "github.com/apecloud/kubeblocks/pkg/controller/builder"
     "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
+    intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
 )
 
 var _ = Describe("status reconciler test", func() {
@@ -109,12 +110,15 @@ var _ = Describe("status reconciler test", func() {
         condition := corev1.PodCondition{
             Type:               corev1.PodReady,
             Status:             corev1.ConditionTrue,
-            LastTransitionTime: metav1.NewTime(time.Now().Add(-1 * minReadySeconds * time.Second)),
+            LastTransitionTime: metav1.NewTime(time.Now()),
         }
-        makePodAvailableWithRevision := func(pod *corev1.Pod, revision string) {
+        makePodAvailableWithRevision := func(pod *corev1.Pod, revision string, updatePodAvailable bool) {
             pod.Labels[appsv1.ControllerRevisionHashLabelKey] = revision
             pod.Status.Phase = corev1.PodRunning
-            pod.Status.Conditions = append(pod.Status.Conditions, condition)
+            if updatePodAvailable {
+                condition.LastTransitionTime = metav1.NewTime(time.Now().Add(-1 * minReadySeconds * time.Second))
+            }
+            pod.Status.Conditions = []corev1.PodCondition{condition}
         }
         pods := newTree.List(&corev1.Pod{})
         currentRevisionMap := map[string]string{}
@@ -122,18 +126,20 @@ var _ = Describe("status reconciler test", func() {
         for _, object := range pods {
             pod, ok := object.(*corev1.Pod)
             Expect(ok).Should(BeTrue())
-            makePodAvailableWithRevision(pod, oldRevision)
+            makePodAvailableWithRevision(pod, oldRevision, false)
             currentRevisionMap[pod.Name] = oldRevision
         }
         _, err = reconciler.Reconcile(newTree)
-        Expect(err).Should(BeNil())
+        Expect(intctrlutil.IsDelayedRequeueError(err)).Should(BeTrue())
         Expect(its.Status.Replicas).Should(BeEquivalentTo(replicas))
         Expect(its.Status.ReadyReplicas).Should(BeEquivalentTo(replicas))
-        Expect(its.Status.AvailableReplicas).Should(BeEquivalentTo(replicas))
+        Expect(its.Status.AvailableReplicas).Should(BeEquivalentTo(0))
         Expect(its.Status.UpdatedReplicas).Should(BeEquivalentTo(0))
         Expect(its.Status.CurrentReplicas).Should(BeEquivalentTo(replicas))
         currentRevisions, _ := buildRevisions(currentRevisionMap)
         Expect(its.Status.CurrentRevisions).Should(Equal(currentRevisions))
+        Expect(its.Status.Conditions[1].Type).Should(BeEquivalentTo(workloads.InstanceAvailable))
+        Expect(its.Status.Conditions[1].Status).Should(BeEquivalentTo(corev1.ConditionFalse))
 
         By("make all pods available with latest revision")
         updateRevisions, err := GetRevisions(its.Status.UpdateRevisions)
@@ -141,7 +147,7 @@ var _ = Describe("status reconciler test", func() {
         for _, object := range pods {
             pod, ok := object.(*corev1.Pod)
             Expect(ok).Should(BeTrue())
-            makePodAvailableWithRevision(pod, updateRevisions[pod.Name])
+            makePodAvailableWithRevision(pod, updateRevisions[pod.Name], true)
         }
         _, err = reconciler.Reconcile(newTree)
         Expect(err).Should(BeNil())
@@ -151,7 +157,9 @@ var _ = Describe("status reconciler test", func() {
         Expect(its.Status.UpdatedReplicas).Should(BeEquivalentTo(replicas))
         Expect(its.Status.CurrentReplicas).Should(BeEquivalentTo(replicas))
         Expect(its.Status.CurrentRevisions).Should(Equal(its.Status.UpdateRevisions))
-        Expect(its.Status.Conditions).Should(HaveLen(1))
+        Expect(its.Status.Conditions).Should(HaveLen(2))
+        Expect(its.Status.Conditions[1].Type).Should(BeEquivalentTo(workloads.InstanceAvailable))
+        Expect(its.Status.Conditions[1].Status).Should(BeEquivalentTo(corev1.ConditionTrue))
 
         By("make all pods failed")
         for _, object := range pods {
@@ -167,7 +175,7 @@ var _ = Describe("status reconciler test", func() {
         Expect(its.Status.UpdatedReplicas).Should(BeEquivalentTo(replicas))
         Expect(its.Status.CurrentReplicas).Should(BeEquivalentTo(replicas))
         Expect(its.Status.CurrentRevisions).Should(Equal(its.Status.UpdateRevisions))
-        Expect(its.Status.Conditions).Should(HaveLen(2))
+        Expect(its.Status.Conditions).Should(HaveLen(3))
         failureNames := []string{"bar-0", "bar-1", "bar-2", "bar-3", "bar-foo-0", "bar-foo-1", "bar-hello-0"}
         message, err := json.Marshal(failureNames)
         Expect(err).Should(BeNil())
@@ -175,9 +183,9 @@ var _ = Describe("status reconciler test", func() {
         Expect(its.Status.Conditions[0].Status).Should(BeEquivalentTo(metav1.ConditionFalse))
         Expect(its.Status.Conditions[0].Reason).Should(BeEquivalentTo(workloads.ReasonNotReady))
         Expect(its.Status.Conditions[0].Message).Should(BeEquivalentTo(message))
-        Expect(its.Status.Conditions[1].Type).Should(BeEquivalentTo(workloads.InstanceFailure))
-        Expect(its.Status.Conditions[1].Reason).Should(BeEquivalentTo(workloads.ReasonInstanceFailure))
-        Expect(its.Status.Conditions[1].Message).Should(BeEquivalentTo(message))
+        Expect(its.Status.Conditions[2].Type).Should(BeEquivalentTo(workloads.InstanceFailure))
+        Expect(its.Status.Conditions[2].Reason).Should(BeEquivalentTo(workloads.ReasonInstanceFailure))
+        Expect(its.Status.Conditions[2].Message).Should(BeEquivalentTo(message))
     })
 })
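
buildConditionMessageWithNames sorts pod names by parent name and ordinal (via baseSort and ParseParentNameAndOrdinal) before marshalling them, which is why the test above expects the failure message to list "bar-0" through "bar-foo-1" and "bar-hello-0" in that order. Neither helper is shown in this diff, so the following is only an illustrative approximation of that ordering; the function names are hypothetical.

// Sketch of the parent-name/ordinal split that the sorting in
// buildConditionMessageWithNames relies on; ParseParentNameAndOrdinal itself is not
// part of this diff, so this is an approximation, not the project's implementation.
package instancesetsketch

import (
    "regexp"
    "sort"
    "strconv"
)

var ordinalSuffix = regexp.MustCompile(`^(.*)-([0-9]+)$`)

// parseParentAndOrdinal splits "bar-foo-1" into ("bar-foo", 1); names without a
// numeric suffix get ordinal -1.
func parseParentAndOrdinal(name string) (string, int) {
    m := ordinalSuffix.FindStringSubmatch(name)
    if len(m) != 3 {
        return name, -1
    }
    ordinal, _ := strconv.Atoi(m[2])
    return m[1], ordinal
}

// sortPodNames orders names by parent name first, then ordinal, matching the order
// asserted by the test above ("bar-0", ..., "bar-foo-0", "bar-hello-0").
func sortPodNames(names []string) {
    sort.Slice(names, func(i, j int) bool {
        pi, oi := parseParentAndOrdinal(names[i])
        pj, oj := parseParentAndOrdinal(names[j])
        if pi != pj {
            return pi < pj
        }
        return oi < oj
    })
}
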
diff --git a/pkg/controller/kubebuilderx/controller.go b/pkg/controller/kubebuilderx/controller.go
index a9c86d38f47..83d18a62d32 100644
--- a/pkg/controller/kubebuilderx/controller.go
+++ b/pkg/controller/kubebuilderx/controller.go
@@ -30,7 +30,7 @@ import (
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
-    "github.com/apecloud/kubeblocks/pkg/controller/graph"
+    intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
 )
 
 // TODO(free6om): this is a new reconciler framework in the very early stage leaving the following tasks to do:
@@ -100,22 +100,23 @@ func (c *controller) Do(reconcilers ...Reconciler) Controller {
 
 func (c *controller) Commit() error {
     defer c.emitFailureEvent()
-    if c.err != nil {
+    if c.err != nil && !intctrlutil.IsDelayedRequeueError(c.err) {
         return c.err
     }
     if c.oldTree.GetRoot() == nil {
         return nil
     }
     builder := NewPlanBuilder(c.ctx, c.cli, c.oldTree, c.tree, c.recorder, c.logger)
-    if c.err = builder.Init(); c.err != nil {
-        return c.err
+    if err := builder.Init(); err != nil {
+        return err
     }
-    var plan graph.Plan
-    plan, c.err = builder.Build()
-    if c.err != nil {
-        return c.err
+    plan, err := builder.Build()
+    if err != nil {
+        return err
+    }
+    if err = plan.Execute(); err != nil {
+        return err
     }
-    c.err = plan.Execute()
     return c.err
 }
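
Taken together, the reconciler_status.go and kubebuilderx changes make a delayed requeue a first-class outcome: Commit() no longer treats it as a failure, still builds and executes the plan so the status update is written, and hands the error back to the caller, which the InstanceSet controller converts into a RequeueAfter result. The concrete definition of intctrlutil.DelayedRequeueError is not part of this diff; the sketch below only restates the contract implied by the call sites (RequeueAfter()/Reason() methods plus the NewDelayedRequeueError/IsDelayedRequeueError helpers), and resultFromCommit is a hypothetical caller-side converter, not KubeBlocks code.

// Sketch of the contract implied by the call sites in this diff; the real types and
// helpers live in pkg/controllerutil and may differ in detail.
package kubebuilderxsketch

import (
    "errors"
    "time"

    ctrl "sigs.k8s.io/controller-runtime"
)

// DelayedRequeueError carries a requeue delay and a reason. Commit() lets it pass
// through so the generated plan is still executed before the requeue happens.
type DelayedRequeueError interface {
    error
    RequeueAfter() time.Duration
    Reason() string
}

type delayedRequeueError struct {
    after  time.Duration
    reason string
}

// NewDelayedRequeueError mirrors the constructor used by the status reconciler.
func NewDelayedRequeueError(after time.Duration, reason string) error {
    return &delayedRequeueError{after: after, reason: reason}
}

func (e *delayedRequeueError) Error() string               { return e.reason }
func (e *delayedRequeueError) RequeueAfter() time.Duration { return e.after }
func (e *delayedRequeueError) Reason() string              { return e.reason }

// IsDelayedRequeueError mirrors the predicate used in Commit() and in the tests.
func IsDelayedRequeueError(err error) bool {
    var de DelayedRequeueError
    return errors.As(err, &de)
}

// resultFromCommit shows the caller-side conversion performed by the InstanceSet
// controller: a delayed requeue becomes a RequeueAfter result, anything else stays
// an error for controller-runtime to retry.
func resultFromCommit(err error) (ctrl.Result, error) {
    if err == nil {
        return ctrl.Result{}, nil
    }
    var de DelayedRequeueError
    if errors.As(err, &de) {
        return ctrl.Result{RequeueAfter: de.RequeueAfter()}, nil
    }
    return ctrl.Result{}, err
}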