Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement preserveResourcesOnDeletion to support migration rollback #5597

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions pkg/controllers/binding/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func ensureWork(
var replicas int32
var conflictResolutionInBinding policyv1alpha1.ConflictResolution
var suspension *policyv1alpha1.Suspension
var preserveResourcesOnDeletion *bool
switch scope {
case apiextensionsv1.NamespaceScoped:
bindingObj := binding.(*workv1alpha2.ResourceBinding)
Expand All @@ -57,6 +58,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
case apiextensionsv1.ClusterScoped:
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
targetClusters = bindingObj.Spec.Clusters
Expand All @@ -65,6 +67,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
}

targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
Expand Down Expand Up @@ -133,9 +136,14 @@ func ensureWork(
Annotations: annotations,
}

suspendDispatching := shouldSuspendDispatching(suspension, targetCluster)

if err = helper.CreateOrUpdateWork(ctx, c, workMeta, clonedWorkload, &suspendDispatching); err != nil {
if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
}
}
Expand Down
97 changes: 86 additions & 11 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
Expand Down Expand Up @@ -102,15 +106,8 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
}

if !work.DeletionTimestamp.IsZero() {
// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{}, err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
if err := c.handleWorkDelete(ctx, work, cluster); err != nil {
return controllerruntime.Result{}, err
}

return c.removeFinalizer(ctx, work)
Expand Down Expand Up @@ -151,16 +148,94 @@ func (c *Controller) syncWork(ctx context.Context, clusterName string, work *wor
metrics.ObserveSyncWorkloadLatency(err, start)
if err != nil {
msg := fmt.Sprintf("Failed to sync work(%s/%s) to cluster(%s), err: %v", work.Namespace, work.Name, clusterName, err)
klog.Errorf(msg)
klog.Error(msg)
c.EventRecorder.Event(work, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, msg)
return controllerruntime.Result{}, err
}
msg := fmt.Sprintf("Sync work(%s/%s) to cluster(%s) successful.", work.Namespace, work.Name, clusterName)
klog.V(4).Infof(msg)
klog.V(4).Info(msg)
c.EventRecorder.Event(work, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, msg)
return controllerruntime.Result{}, nil
}

func (c *Controller) handleWorkDelete(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
if ptr.Deref(work.Spec.PreserveResourcesOnDeletion, false) {
if err := c.cleanupPolicyClaimMetadata(ctx, work, cluster); err != nil {
klog.Errorf("Failed to remove annotations and labels in on cluster(%s)", cluster.Name)
return err
}
klog.V(4).Infof("Preserving resource on deletion from work(%s/%s) on cluster(%s)", work.Namespace, work.Name, cluster.Name)
a7i marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed.
a7i marked this conversation as resolved.
Show resolved Hide resolved
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, cluster.Name, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return nil
}

// cleanupPolicyClaimMetadata strips Karmada ownership metadata (policy claim
// labels/annotations, binding and work identifiers, and the managed-by label)
// from every manifest of the given work on the member cluster, so the
// resources can be left behind intact when the work is deleted with
// PreserveResourcesOnDeletion enabled.
func (c *Controller) cleanupPolicyClaimMetadata(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
	for _, manifest := range work.Spec.Workload.Manifests {
		workload := &unstructured.Unstructured{}
		if err := workload.UnmarshalJSON(manifest.Raw); err != nil {
			// Format args must match the verbs: namespace/name for %s/%s, then the error for %v.
			klog.Errorf("Failed to unmarshal workload from work(%s/%s), error is: %v", work.GetNamespace(), work.GetName(), err)
			return err
		}

		fedKey, err := keys.FederatedKeyFunc(cluster.Name, workload)
		if err != nil {
			klog.Errorf("Failed to get the federated key resource(kind=%s, %s/%s) from member cluster(%s), err is %v ",
				workload.GetKind(), workload.GetNamespace(), workload.GetName(), cluster.Name, err)
			return err
		}

		// Fetch the live object from the member-cluster informer cache; the
		// metadata update below is applied against this current state.
		clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
		if err != nil {
			klog.Errorf("Failed to get the resource(kind=%s, %s/%s) from member cluster(%s) cache, err is %v ",
				workload.GetKind(), workload.GetNamespace(), workload.GetName(), cluster.Name, err)
			return err
		}

		// Cluster-scoped resources carry ClusterPropagationPolicy claim
		// metadata; namespace-scoped ones carry PropagationPolicy claim metadata.
		if workload.GetNamespace() == corev1.NamespaceAll {
			detector.CleanupCPPClaimMetadata(workload)
		} else {
			detector.CleanupPPClaimMetadata(workload)
		}
		util.RemoveLabels(
			workload,
			workv1alpha2.ResourceBindingPermanentIDLabel,
			workv1alpha2.WorkPermanentIDLabel,
			util.ManagedByKarmadaLabel,
		)
		util.RemoveAnnotations(
			workload,
			workv1alpha2.ManagedAnnotation,
			workv1alpha2.ManagedLabels,
			workv1alpha2.ResourceBindingNamespaceAnnotationKey,
			workv1alpha2.ResourceBindingNameAnnotationKey,
			workv1alpha2.ResourceTemplateUIDAnnotation,
			workv1alpha2.ResourceTemplateGenerationAnnotationKey,
			workv1alpha2.WorkNameAnnotation,
			workv1alpha2.WorkNamespaceAnnotation,
		)

		if err := c.ObjectWatcher.Update(ctx, cluster.Name, workload, clusterObj); err != nil {
			klog.Errorf("Failed to update metadata in the given member cluster %v, err is %v", cluster.Name, err)
			return err
		}
	}

	return nil
}

// tryDeleteWorkload tries to delete resources in the given member cluster.
func (c *Controller) tryDeleteWorkload(ctx context.Context, clusterName string, work *workv1alpha1.Work) error {
for _, manifest := range work.Spec.Workload.Manifests {
Expand Down
98 changes: 90 additions & 8 deletions pkg/controllers/execution/execution_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -33,18 +34,33 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
testhelper "github.com/karmada-io/karmada/test/helper"
)

type FakeResourceInterpreter struct {
*native.DefaultInterpreter
}

var _ resourceinterpreter.ResourceInterpreter = &FakeResourceInterpreter{}

const (
podNamespace = "default"
podName = "test"
clusterName = "cluster"
)

func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
Expand All @@ -54,6 +70,7 @@ func TestExecutionController_Reconcile(t *testing.T) {
expectCondition *metav1.Condition
expectEventMessage string
existErr bool
resourceExists *bool
}{
{
name: "work dispatching is suspended, no error, no apply",
Expand Down Expand Up @@ -112,10 +129,52 @@ func TestExecutionController_Reconcile(t *testing.T) {
work.Spec.SuspendDispatching = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=true, deletion timestamp set, does not delete resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(true),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=false, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(false)
}),
},
{
name: "PreserveResourcesOnDeletion unset, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
}),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Cleanup(func() {
genericmanager.GetInstance().Stop(clusterName)
})

req := controllerruntime.Request{
NamespacedName: types.NamespacedName{
Name: "work",
Expand Down Expand Up @@ -143,32 +202,51 @@ func TestExecutionController_Reconcile(t *testing.T) {
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}

if tt.resourceExists != nil {
resourceInterface := c.InformerManager.GetSingleClusterManager(clusterName).GetClient().
Resource(corev1.SchemeGroupVersion.WithResource("pods")).Namespace(podNamespace)
_, err = resourceInterface.Get(context.TODO(), podName, metav1.GetOptions{})
if *tt.resourceExists {
assert.NoErrorf(t, err, "unable to query pod (%s/%s)", podNamespace, podName)
} else {
assert.True(t, apierrors.IsNotFound(err), "pod (%s/%s) was not deleted", podNamespace, podName)
}
}
})
}
}

func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller {
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod("default", "test")
client := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).Build()
// newController builds an execution Controller wired against fakes: a fake
// controller-runtime client holding the cluster and work, and a fake dynamic
// client (pre-populated with a managed pod) backing the member-cluster
// informer manager and object watcher.
func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Controller {
	// Member cluster fixture and the pod that the work's manifest refers to.
	memberCluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
	pod := testhelper.NewPod(podNamespace, podName)
	pod.SetLabels(map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue})

	// REST mapper that knows pods are namespace-scoped.
	mapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
	mapper.Add(corev1.SchemeGroupVersion.WithKind(pod.Kind), meta.RESTScopeNamespace)

	ctrlClient := fake.NewClientBuilder().
		WithScheme(gclient.NewSchema()).
		WithObjects(memberCluster, work).
		WithStatusSubresource(work).
		WithRESTMapper(mapper).
		Build()

	// Dynamic client simulating the member cluster, fed into the shared
	// informer manager so cache lookups for pods succeed.
	memberDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod)
	informerManager := genericmanager.GetInstance()
	informerManager.ForCluster(memberCluster.Name, memberDynamicClient, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
	informerManager.Start(memberCluster.Name)
	informerManager.WaitForCacheSync(memberCluster.Name)

	// The object watcher resolves member-cluster clients through this stub.
	newClusterClient := func(string, client.Client) (*util.DynamicClusterClient, error) {
		return &util.DynamicClusterClient{
			ClusterName:      clusterName,
			DynamicClientSet: memberDynamicClient,
		}, nil
	}
	interpreter := FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()}

	return Controller{
		Client:          ctrlClient,
		InformerManager: informerManager,
		EventRecorder:   recorder,
		RESTMapper:      mapper,
		ObjectWatcher:   objectwatcher.NewObjectWatcher(ctrlClient, mapper, newClusterClient, interpreter),
	}
}

func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
pod := testhelper.NewPod("default", "test")
pod := testhelper.NewPod(podNamespace, podName)
bytes, _ := json.Marshal(pod)
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes)
if applyFunc != nil {
Expand All @@ -193,3 +271,7 @@ func newCluster(name string, clusterType string, clusterStatus metav1.ConditionS
},
}
}

// Start implements the resourceinterpreter.ResourceInterpreter interface;
// the fake interpreter has no background workers to run, so it returns nil
// immediately.
func (f FakeResourceInterpreter) Start(context.Context) error {
	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *SyncController) buildWorks(ctx context.Context, quota *policyv1alpha1.F
},
}

err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj, nil)
err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj)
if err != nil {
errs = append(errs, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
Expand Down
Loading