Skip to content

Commit

Permalink
fix: enhance the member cluster leave flow (#865)
Browse files Browse the repository at this point in the history
Co-authored-by: Ryan Zhang <zhangryan@microsoft.com>
  • Loading branch information
ryanzhang-oss and Ryan Zhang authored Jul 2, 2024
1 parent 3b31d46 commit fccc470
Show file tree
Hide file tree
Showing 6 changed files with 467 additions and 86 deletions.
151 changes: 101 additions & 50 deletions pkg/controllers/membercluster/v1beta1/membercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"time"

"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -33,6 +34,7 @@ import (
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
)

Expand Down Expand Up @@ -74,38 +76,38 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim

var mc clusterv1beta1.MemberCluster
if err := r.Client.Get(ctx, req.NamespacedName, &mc); err != nil {
klog.ErrorS(err, "failed to get member cluster", "memberCluster", req.Name)
klog.ErrorS(err, "Failed to get member cluster", "memberCluster", req.Name)
return runtime.Result{}, client.IgnoreNotFound(err)
}
mcObjRef := klog.KObj(&mc)

// Handle deleting/leaving member cluster, garbage collect all the resources in the cluster namespace
if !mc.DeletionTimestamp.IsZero() {
klog.V(2).InfoS("the member cluster is leaving", "memberCluster", mcObjRef)
klog.V(2).InfoS("The member cluster is leaving", "memberCluster", mcObjRef)
return r.handleDelete(ctx, mc.DeepCopy())
}

// Add the finalizer to the member cluster
if err := r.ensureFinalizer(ctx, &mc); err != nil {
klog.ErrorS(err, "failed to add the finalizer to member cluster", "memberCluster", mcObjRef)
klog.ErrorS(err, "Failed to add the finalizer to member cluster", "memberCluster", mcObjRef)
return runtime.Result{}, err
}
currentIMC, err := r.getInternalMemberCluster(ctx, mc.GetName())
if err != nil {
return runtime.Result{}, err
}
if err := r.join(ctx, &mc, currentIMC); err != nil {
klog.ErrorS(err, "failed to join", "memberCluster", mcObjRef)
klog.ErrorS(err, "Failed to join", "memberCluster", mcObjRef)
return runtime.Result{}, err
}

// Copy status from InternalMemberCluster to MemberCluster.
r.syncInternalMemberClusterStatus(currentIMC, &mc)
if err := r.updateMemberClusterStatus(ctx, &mc); err != nil {
if apierrors.IsConflict(err) {
klog.V(2).InfoS("failed to update status due to conflicts", "memberCluster", mcObjRef)
klog.V(2).InfoS("Failed to update status due to conflicts", "memberCluster", mcObjRef)
} else {
klog.ErrorS(err, "failed to update status", "memberCluster", mcObjRef)
klog.ErrorS(err, "Failed to update status", "memberCluster", mcObjRef)
}
return runtime.Result{}, client.IgnoreNotFound(err)
}
Expand All @@ -121,86 +123,135 @@ func (r *Reconciler) handleDelete(ctx context.Context, mc *clusterv1beta1.Member
klog.V(2).InfoS("No need to do anything for the deleting member cluster without a finalizer", "memberCluster", mcObjRef)
return runtime.Result{}, nil
}
currentImc, err := r.getInternalMemberCluster(ctx, mc.GetName())
if err != nil {
return runtime.Result{}, err
// check if the namespace still exist
var currentNS corev1.Namespace
namespaceName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)
if err := r.Client.Get(ctx, types.NamespacedName{Name: namespaceName}, &currentNS); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to get the member cluster namespace", "memberCluster", mcObjRef)
return runtime.Result{}, controller.NewAPIServerError(true, err)
}
klog.V(2).InfoS("The member cluster namespace is not found, remove the finalizer", "memberCluster", mcObjRef)
controllerutil.RemoveFinalizer(mc, placementv1beta1.MemberClusterFinalizer)
return runtime.Result{}, controller.NewUpdateIgnoreConflictError(r.Update(ctx, mc))
}
// check if the namespace is being deleted already, just wait for it to be deleted
if !currentNS.DeletionTimestamp.IsZero() {
klog.V(2).InfoS("The member cluster namespace is still being deleted", "memberCluster", mcObjRef, "deleteTimestamp", currentNS.DeletionTimestamp)
var stuckErr error
if time.Now().After(currentNS.DeletionTimestamp.Add(5 * time.Minute)) {
// alert if the namespace is stuck in deleting for more than 5 minutes
stuckErr = controller.NewUnexpectedBehaviorError(fmt.Errorf("the member cluster namespace %s has been deleting since %s", namespaceName, currentNS.DeletionTimestamp.Format(time.RFC3339)))
}
return runtime.Result{RequeueAfter: time.Second}, stuckErr
}
currentImc := &clusterv1beta1.InternalMemberCluster{}
imcNamespacedName := types.NamespacedName{Namespace: namespaceName, Name: mc.Name}
if err := r.Client.Get(ctx, imcNamespacedName, currentImc); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to get internal member cluster", "internalMemberCluster", imcNamespacedName)
return runtime.Result{}, controller.NewAPIServerError(true, err)
}
// this is possible since we garbage collect the internal member cluster first before deleting the MC
klog.V(2).Info("InternalMemberCluster not found, start garbage collecting", "memberCluster", mcObjRef)
return runtime.Result{Requeue: true}, r.garbageCollect(ctx, mc)
}
// calculate the current status of the member cluster from imc status
r.syncInternalMemberClusterStatus(currentImc, mc)
cond := meta.FindStatusCondition(mc.Status.Conditions, string(clusterv1beta1.AgentJoined))
// cluster already left
if cond != nil && cond.Status == metav1.ConditionFalse && cond.ObservedGeneration == mc.GetGeneration() {
// TODO: check the last heartbeat time from all agents and assume the member cluster is left if they haven't sent heartbeat
// beyond a pre-agreed threshold.
// check if the cluster is already left
mcJoinedCondition := meta.FindStatusCondition(mc.Status.Conditions, string(clusterv1beta1.ConditionTypeMemberClusterJoined))
if condition.IsConditionStatusFalse(mcJoinedCondition, mc.GetGeneration()) {
klog.V(2).InfoS("Agent already left, start garbage collecting", "memberCluster", mcObjRef)
return r.garbageCollectWork(ctx, mc)
if gcErr := r.garbageCollect(ctx, mc); gcErr != nil {
return runtime.Result{}, gcErr
}
return runtime.Result{Requeue: true}, controller.NewUpdateIgnoreConflictError(r.updateMemberClusterStatus(ctx, mc))
}
klog.V(2).InfoS("Need to wait for agent to leave", "memberCluster", mcObjRef, "agentJoinedCondition", cond)
// mark the imc as left again to make sure the agent is leaving the fleet
klog.V(2).InfoS("Need to wait for the agent to leave", "memberCluster", mcObjRef, "joinedCondition", mcJoinedCondition)
// mark the imc as left to make sure the agent is leaving the fleet
if err := r.leave(ctx, mc, currentImc); err != nil {
klog.ErrorS(err, "failed to leave", "memberCluster", mcObjRef)
klog.ErrorS(err, "Failed to mark the imc as leave", "memberCluster", mcObjRef)
return runtime.Result{}, err
}
// update the mc status while we wait for all the agents to leave
err = r.updateMemberClusterStatus(ctx, mc)
return runtime.Result{}, controller.NewUpdateIgnoreConflictError(err)
// update the mc status to track the leaving status while we wait for all the agents to leave.
// once the imc is updated, the mc controller will reconcile again.
return runtime.Result{}, controller.NewUpdateIgnoreConflictError(r.updateMemberClusterStatus(ctx, mc))
}

func (r *Reconciler) getInternalMemberCluster(ctx context.Context, name string) (*clusterv1beta1.InternalMemberCluster, error) {
// Get current internal member cluster.
namespaceName := fmt.Sprintf(utils.NamespaceNameFormat, name)
imcNamespacedName := types.NamespacedName{Namespace: namespaceName, Name: name}
var imc clusterv1beta1.InternalMemberCluster
currentImc := &imc
if err := r.Client.Get(ctx, imcNamespacedName, &imc); err != nil {
currentIMC := &clusterv1beta1.InternalMemberCluster{}
if err := r.Client.Get(ctx, imcNamespacedName, currentIMC); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "failed to get internal member cluster", "internalMemberCluster", imcNamespacedName)
klog.ErrorS(err, "Failed to get internal member cluster", "internalMemberCluster", imcNamespacedName)
return nil, err
}
// Not found.
currentImc = nil
currentIMC = nil
}
return currentImc, nil
return currentIMC, nil
}

// garbageCollectWork remove all the finalizers on the work that are in the cluster namespace
func (r *Reconciler) garbageCollectWork(ctx context.Context, mc *clusterv1beta1.MemberCluster) (runtime.Result, error) {
var works placementv1beta1.WorkList
var clusterNS corev1.Namespace
// check if the namespace still exist
namespaceName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)
if err := r.Client.Get(ctx, types.NamespacedName{Name: namespaceName}, &clusterNS); apierrors.IsNotFound(err) {
klog.V(2).InfoS("the member cluster namespace is successfully deleted", "memberCluster", klog.KObj(mc))
return runtime.Result{}, nil
}
func (r *Reconciler) garbageCollectWork(ctx context.Context, mc *clusterv1beta1.MemberCluster, namespaceName string) error {
// list all the work object we created in the member cluster namespace
var works placementv1beta1.WorkList
listOpts := []client.ListOption{
client.MatchingLabels{utils.LabelFleetObj: utils.LabelFleetObjValue},
client.InNamespace(namespaceName),
}
if err := r.Client.List(ctx, &works, listOpts...); err != nil {
klog.ErrorS(err, "failed to list all the work object", "memberCluster", klog.KObj(mc))
return runtime.Result{}, client.IgnoreNotFound(err)
klog.ErrorS(err, "Failed to list all the work object", "memberCluster", klog.KObj(mc))
return client.IgnoreNotFound(err)
}
// remove all the finalizers on the work objects in parallel
errs, cctx := errgroup.WithContext(ctx)
for _, work := range works.Items {
staleWork := work.DeepCopy()
staleWork.SetFinalizers(nil)
if updateErr := r.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil {
klog.ErrorS(updateErr, "failed to remove the finalizer from the work",
"memberCluster", klog.KObj(mc), "work", klog.KObj(staleWork))
return runtime.Result{}, updateErr
}
errs.Go(func() error {
staleWork.SetFinalizers(nil)
if updateErr := r.Update(cctx, staleWork, &client.UpdateOptions{}); updateErr != nil {
klog.ErrorS(updateErr, "Failed to remove the finalizer from the work",
"memberCluster", klog.KObj(mc), "work", klog.KObj(staleWork))
return updateErr
}
return nil
})
}
klog.V(2).InfoS("successfully removed all the work finalizers in the cluster namespace",
klog.V(2).InfoS("Try to remove all the work finalizers in the cluster namespace",
"memberCluster", klog.KObj(mc), "number of work", len(works.Items))
controllerutil.RemoveFinalizer(mc, placementv1beta1.MemberClusterFinalizer)
return runtime.Result{}, r.Update(ctx, mc, &client.UpdateOptions{})
return controller.NewUpdateIgnoreConflictError(errs.Wait())
}

// garbageCollect cleans up the cluster namespace that belongs to the given member cluster.
// It fetches the namespace, strips the finalizers from the work objects inside it via
// garbageCollectWork, and then issues a delete for the namespace itself.
// Any failure along the way is returned to the caller; nil means the delete request was accepted.
func (r *Reconciler) garbageCollect(ctx context.Context, mc *clusterv1beta1.MemberCluster) error {
	mcRef := klog.KObj(mc)
	nsName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)

	// The namespace must be readable before anything inside it is touched.
	ns := &corev1.Namespace{}
	if getErr := r.Client.Get(ctx, types.NamespacedName{Name: nsName}, ns); getErr != nil {
		klog.ErrorS(getErr, "Failed to get the member cluster namespace", "memberCluster", mcRef)
		return controller.NewAPIServerError(true, getErr)
	}

	// Release the work objects first so that they do not hold up the namespace deletion.
	if gcErr := r.garbageCollectWork(ctx, mc, nsName); gcErr != nil {
		return gcErr
	}

	if delErr := r.Delete(ctx, ns); delErr != nil {
		klog.ErrorS(delErr, "Failed to remove the cluster namespace", "memberCluster", mcRef, "namespace", nsName)
		return controller.NewAPIServerError(false, delErr)
	}

	klog.V(2).InfoS("Deleted the member cluster namespace", "memberCluster", mcRef)
	return nil
}

// ensureFinalizer makes sure that the member cluster CR has a finalizer on it.
//
// It is a no-op when placementv1beta1.MemberClusterFinalizer is already present. Otherwise it adds
// the finalizer to mc and persists the change with an Update, returning the Update error, if any.
func (r *Reconciler) ensureFinalizer(ctx context.Context, mc *clusterv1beta1.MemberCluster) error {
	if controllerutil.ContainsFinalizer(mc, placementv1beta1.MemberClusterFinalizer) {
		return nil
	}
	// Logged exactly once, before the update is sent; a failed update surfaces through the returned error.
	klog.InfoS("Added the member cluster finalizer", "memberCluster", klog.KObj(mc))
	controllerutil.AddFinalizer(mc, placementv1beta1.MemberClusterFinalizer)
	return r.Update(ctx, mc, client.FieldOwner(utils.MCControllerFieldManagerName))
}
Expand Down Expand Up @@ -413,7 +464,7 @@ func (r *Reconciler) syncInternalMemberCluster(ctx context.Context, mc *clusterv
if currentImc == nil {
klog.V(2).InfoS("creating internal member cluster", "InternalMemberCluster", klog.KObj(&expectedImc), "spec", expectedImc.Spec)
if err := r.Client.Create(ctx, &expectedImc, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return nil, fmt.Errorf("failed to create internal member cluster %s with spec %+v: %w", klog.KObj(&expectedImc), expectedImc.Spec, err)
return nil, controller.NewAPIServerError(false, fmt.Errorf("failed to create internal member cluster %s with spec %+v: %w", klog.KObj(&expectedImc), expectedImc.Spec, err))
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonIMCCreated, "Internal member cluster was created")
klog.V(2).InfoS("created internal member cluster", "InternalMemberCluster", klog.KObj(&expectedImc), "spec", expectedImc.Spec)
Expand All @@ -427,7 +478,7 @@ func (r *Reconciler) syncInternalMemberCluster(ctx context.Context, mc *clusterv
currentImc.Spec = expectedImc.Spec
klog.V(2).InfoS("updating internal member cluster", "InternalMemberCluster", klog.KObj(currentImc), "spec", currentImc.Spec)
if err := r.Client.Update(ctx, currentImc, client.FieldOwner(utils.MCControllerFieldManagerName)); err != nil {
return nil, fmt.Errorf("failed to update internal member cluster %s with spec %+v: %w", klog.KObj(currentImc), currentImc.Spec, err)
return nil, controller.NewAPIServerError(false, fmt.Errorf("failed to update internal member cluster %s with spec %+v: %w", klog.KObj(currentImc), currentImc.Spec, err))
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonIMCSpecUpdated, "internal member cluster spec updated")
klog.V(2).InfoS("updated internal member cluster", "InternalMemberCluster", klog.KObj(currentImc), "spec", currentImc.Spec)
Expand Down Expand Up @@ -489,7 +540,7 @@ func (r *Reconciler) aggregateJoinedCondition(mc *clusterv1beta1.MemberCluster)
reportedAgents := make(map[clusterv1beta1.AgentType]bool)
for _, agentStatus := range mc.Status.AgentStatus {
if !r.agents[agentStatus.Type] {
klog.V(2).InfoS("Ignoring unexpected agent type status", "agentStatus", agentStatus)
_ = controller.NewUnexpectedBehaviorError(fmt.Errorf("find an unexpected agent type %s for member cluster %s", agentStatus.Type, mc.Name))
continue // ignore any unexpected agent type
}
condition := meta.FindStatusCondition(agentStatus.Conditions, string(clusterv1beta1.AgentJoined))
Expand Down
Loading

0 comments on commit fccc470

Please sign in to comment.