implement preserveResourcesOnDeletion to support migration rollback
Signed-off-by: Amir Alavi <amiralavi7@gmail.com>
a7i committed Sep 23, 2024
1 parent 721107d commit 21604d0
Showing 14 changed files with 259 additions and 41 deletions.
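This commit threads a new PreserveResourcesOnDeletion flag from the propagation policy spec, through ResourceBinding/ClusterResourceBinding, down to each Work, and teaches the execution controller to leave resources in member clusters in place when a flagged Work is deleted, so a migration can be rolled back without destroying the workloads. As part of the plumbing, helper.CreateOrUpdateWork drops its trailing *bool argument in favor of variadic functional options. The pkg/util/helper changes themselves are not among the hunks captured below; the following is only a sketch, with an assumed WorkOption type name, of what the two options used at the call sites would look like:

    package helper

    import workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"

    // WorkOption mutates a Work before it is created or updated.
    // (Assumed name; the real definition lives in pkg/util/helper, not shown here.)
    type WorkOption func(work *workv1alpha1.Work)

    // WithSuspendDispatching stops the Work's manifests from being dispatched
    // to the member cluster while suspended.
    func WithSuspendDispatching(suspend bool) WorkOption {
        return func(work *workv1alpha1.Work) {
            work.Spec.SuspendDispatching = &suspend
        }
    }

    // WithPreserveResourcesOnDeletion asks the execution controller to keep the
    // propagated resources in the member cluster when the Work is deleted.
    func WithPreserveResourcesOnDeletion(preserve bool) WorkOption {
        return func(work *workv1alpha1.Work) {
            work.Spec.PreserveResourcesOnDeletion = &preserve
        }
    }

Call sites that previously passed a trailing nil simply omit the options, as the smaller hunks later in this commit show.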
pkg/controllers/binding/common.go (11 additions, 3 deletions)
@@ -48,6 +48,7 @@ func ensureWork(
var replicas int32
var conflictResolutionInBinding policyv1alpha1.ConflictResolution
var suspension *policyv1alpha1.Suspension
var preserveResourcesOnDeletion *bool
switch scope {
case apiextensionsv1.NamespaceScoped:
bindingObj := binding.(*workv1alpha2.ResourceBinding)
@@ -57,6 +58,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
case apiextensionsv1.ClusterScoped:
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
targetClusters = bindingObj.Spec.Clusters
@@ -65,6 +67,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
}

targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
@@ -133,9 +136,14 @@ func ensureWork(
Annotations: annotations,
}

suspendDispatching := shouldSuspendDispatching(suspension, targetCluster)

if err = helper.CreateOrUpdateWork(ctx, c, workMeta, clonedWorkload, &suspendDispatching); err != nil {
if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
}
}
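The net effect of ensureWork's change is that every Work it stamps out for a target cluster now carries the binding's preservation intent. A sketch of the resulting object, with only the relevant fields populated (the name and karmada-es-* namespace are illustrative):

    package example

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/utils/ptr"

        workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
    )

    // exampleWork shows the shape of a Work produced by ensureWork when the
    // binding sets PreserveResourcesOnDeletion.
    func exampleWork() *workv1alpha1.Work {
        return &workv1alpha1.Work{
            ObjectMeta: metav1.ObjectMeta{Name: "nginx-work", Namespace: "karmada-es-member1"},
            Spec: workv1alpha1.WorkSpec{
                PreserveResourcesOnDeletion: ptr.To(true),
                // Workload.Manifests would carry the JSON-encoded resource here.
            },
        }
    }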
pkg/controllers/execution/execution_controller.go (24 additions, 9 deletions)
@@ -30,13 +30,15 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/metrics"
@@ -102,15 +104,8 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
}

if !work.DeletionTimestamp.IsZero() {
// Abort deleting the workload if the cluster is unready when unjoining; otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{}, err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
if err := c.handleWorkDelete(ctx, work, cluster); err != nil {
return controllerruntime.Result{}, err
}

return c.removeFinalizer(ctx, work)
@@ -161,6 +156,26 @@ func (c *Controller) syncWork(ctx context.Context, clusterName string, work *wor
return controllerruntime.Result{}, nil
}

func (c *Controller) handleWorkDelete(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
if ptr.Deref(work.Spec.PreserveResourcesOnDeletion, false) {
klog.V(4).Infof("Preserving resource on deletion from work(%s/%s) on cluster(%s)", work.Namespace, work.Name, cluster.Name)
return nil
}

// Abort deleting the workload if the cluster is unready when unjoining; otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, cluster.Name, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return nil
}

// tryDeleteWorkload tries to delete resources in the given member cluster.
func (c *Controller) tryDeleteWorkload(ctx context.Context, clusterName string, work *workv1alpha1.Work) error {
for _, manifest := range work.Spec.Workload.Manifests {
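The new handleWorkDelete gates everything on ptr.Deref(work.Spec.PreserveResourcesOnDeletion, false), so a nil pointer behaves exactly like an explicit false and deletion remains the default. A tiny standalone illustration of that defaulting:

    package main

    import (
        "fmt"

        "k8s.io/utils/ptr"
    )

    func main() {
        var unset *bool
        fmt.Println(ptr.Deref(unset, false))        // false: nil falls back to the default
        fmt.Println(ptr.Deref(ptr.To(true), false)) // true: an explicit value wins
    }

This is why the "PreserveResourcesOnDeletion unset" test case below expects the resource to be deleted.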
pkg/controllers/execution/execution_controller_test.go (76 additions, 8 deletions)
@@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -33,18 +34,27 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/gclient"
utilhelper "github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
testhelper "github.com/karmada-io/karmada/test/helper"
)

const (
podNamespace = "default"
podName = "test"
clusterName = "cluster"
)

func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
@@ -54,6 +64,7 @@ func TestExecutionController_Reconcile(t *testing.T) {
expectCondition *metav1.Condition
expectEventMessage string
existErr bool
resourceExists *bool
}{
{
name: "work dispatching is suspended, no error, no apply",
@@ -112,6 +123,44 @@ func TestExecutionController_Reconcile(t *testing.T) {
work.Spec.SuspendDispatching = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=true, deletion timestamp set, does not delete resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(true),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=false, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(false)
}),
},
{
name: "PreserveResourcesOnDeletion unset, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
}),
},
}

for _, tt := range tests {
@@ -125,6 +174,7 @@ func TestExecutionController_Reconcile(t *testing.T) {

eventRecorder := record.NewFakeRecorder(1)
c := newController(tt.work, eventRecorder)
defer c.InformerManager.Stop(clusterName)
res, err := c.Reconcile(context.Background(), req)
assert.Equal(t, tt.expectRes, res)
if tt.existErr {
@@ -143,32 +193,50 @@ func TestExecutionController_Reconcile(t *testing.T) {
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}

if tt.resourceExists != nil {
_, err = utilhelper.GetObjectFromCache(c.RESTMapper, c.InformerManager, keys.FederatedKey{Cluster: clusterName, ClusterWideKey: keys.ClusterWideKey{
Version: "v1", Kind: "Pod", Namespace: podNamespace, Name: podName,
}})
if *tt.resourceExists {
assert.NoErrorf(t, err, "unable to query pod (%s/%s)", podNamespace, podName)
} else {
assert.True(t, apierrors.IsNotFound(err), "pod (%s/%s) was not deleted", podNamespace, podName)
}
}
})
}
}

func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller {
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod("default", "test")
client := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).Build()
func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Controller {
cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod(podNamespace, podName)
pod.SetLabels(map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue})
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
restMapper.Add(corev1.SchemeGroupVersion.WithKind(pod.Kind), meta.RESTScopeNamespace)
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).WithRESTMapper(restMapper).Build()
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod)
informerManager := genericmanager.GetInstance()
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
}, nil
}
return Controller{
Client: client,
Client: fakeClient,
InformerManager: informerManager,
EventRecorder: eventRecorder,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(client, restMapper, util.NewClusterDynamicClientSetForAgent, nil),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil),
}
}

func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
pod := testhelper.NewPod("default", "test")
pod := testhelper.NewPod(podNamespace, podName)
bytes, _ := json.Marshal(pod)
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes)
if applyFunc != nil {
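Note the tri-state resourceExists *bool in the test table: nil means the case makes no claim about the member cluster (the suspension cases never dispatch anything), while a non-nil value asserts that the pod either survived or was removed. Stripped of the table plumbing, the assertion pattern is simply this schematic fragment, where getFromCache is a stand-in for the GetObjectFromCache call above:

    // resourceExists == nil    -> skip the check entirely
    // *resourceExists == true  -> the object must still be queryable from the cluster cache
    // *resourceExists == false -> the lookup must fail with a NotFound error
    if tt.resourceExists != nil {
        _, err := getFromCache()
        if *tt.resourceExists {
            assert.NoError(t, err)
        } else {
            assert.True(t, apierrors.IsNotFound(err))
        }
    }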
(file name not captured in this view)
@@ -183,7 +183,7 @@ func (c *SyncController) buildWorks(ctx context.Context, quota *policyv1alpha1.F
},
}

err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj, nil)
err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj)
if err != nil {
errs = append(errs, err)
}
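The remaining call sites change mechanically: with options now variadic, the old trailing nil disappears and the defaults (no suspension, no preservation) apply. Schematically:

    // before: a trailing *bool controlled suspend-dispatching
    err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj, nil)

    // after: options are variadic, so omitting them keeps the previous behavior
    err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj)

The same one-line simplification repeats in the service-export, EndpointSlice, MultiClusterService, namespace-sync, and unified-auth controllers below.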
pkg/controllers/mcs/service_export_controller.go (1 addition, 1 deletion)
@@ -494,7 +494,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
return err
}

(file name not captured in this view)
@@ -381,7 +381,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
(file name not captured in this view)
@@ -393,7 +393,7 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
pkg/controllers/multiclusterservice/mcs_controller.go (1 addition, 1 deletion)
@@ -309,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
pkg/controllers/namespace/namespace_sync_controller.go (1 addition, 1 deletion)
@@ -157,7 +157,7 @@ func (c *Controller) buildWorks(ctx context.Context, namespace *corev1.Namespace
Annotations: annotations,
}

if err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, clonedNamespaced, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, clonedNamespaced); err != nil {
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
return
}
pkg/controllers/unifiedauth/unified_auth_controller.go (1 addition, 1 deletion)
@@ -237,7 +237,7 @@ func (c *Controller) buildWorks(ctx context.Context, cluster *clusterv1alpha1.Cl
},
}

if err := helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, obj, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, obj); err != nil {
return err
}

pkg/detector/detector.go (17 additions, 12 deletions)
@@ -501,6 +501,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
excludeClusterPolicy(bindingCopy.Labels)
return nil
})
@@ -596,6 +597,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
return nil
})
return err
@@ -642,6 +644,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
return nil
})
return err
@@ -763,12 +766,13 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
Finalizers: []string{util.BindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
@@ -808,12 +812,13 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
Finalizers: []string{util.ClusterResourceBindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
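detector.go completes the chain: BuildResourceBinding and BuildClusterResourceBinding copy PreserveResourcesOnDeletion from the policy spec onto new bindings, and ApplyPolicy/ApplyClusterPolicy keep it in sync on updates. Assuming the API field added elsewhere in this commit, opting in from a user's perspective is a one-field change on the policy, sketched here in Go:

    package example

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/utils/ptr"

        policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
    )

    // examplePolicy propagates an nginx Deployment; if the resulting Works are
    // later deleted, the Deployment is left running in the member clusters,
    // which is what enables rolling back a migration.
    func examplePolicy() *policyv1alpha1.PropagationPolicy {
        return &policyv1alpha1.PropagationPolicy{
            ObjectMeta: metav1.ObjectMeta{Name: "nginx-propagation", Namespace: "default"},
            Spec: policyv1alpha1.PropagationSpec{
                PreserveResourcesOnDeletion: ptr.To(true), // field added by this commit
                ResourceSelectors: []policyv1alpha1.ResourceSelector{
                    {APIVersion: "apps/v1", Kind: "Deployment", Name: "nginx"},
                },
            },
        }
    }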
(diff truncated: 3 of the 14 changed files are not shown)
