From a03b852ff01d01d66a681227a02d822240fad1c2 Mon Sep 17 00:00:00 2001 From: "chang.qiangqiang" Date: Tue, 12 Nov 2024 15:19:10 +0800 Subject: [PATCH] feat(detector): resource detector matched policy potimization Signed-off-by: chang.qiangqiang --- pkg/detector/compare.go | 33 +++++--- pkg/detector/compare_test.go | 40 +++++----- pkg/detector/detector.go | 71 ++++------------- pkg/detector/detector_test.go | 132 +++++--------------------------- pkg/detector/policy.go | 20 ++--- pkg/detector/preemption.go | 48 ++++-------- pkg/detector/preemption_test.go | 27 ++----- 7 files changed, 103 insertions(+), 268 deletions(-) diff --git a/pkg/detector/compare.go b/pkg/detector/compare.go index 80c30b239456..852ff3313a73 100644 --- a/pkg/detector/compare.go +++ b/pkg/detector/compare.go @@ -21,37 +21,43 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog/v2" + "k8s.io/utils/ptr" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" ) -func getHighestPriorityPropagationPolicy(policies []*policyv1alpha1.PropagationPolicy, resource *unstructured.Unstructured, objectKey keys.ClusterWideKey) *policyv1alpha1.PropagationPolicy { +func getHighestPriorityPropagationPolicy(policies []policyv1alpha1.PropagationPolicy, resource *unstructured.Unstructured, objectKey keys.ClusterWideKey) *policyv1alpha1.PropagationPolicy { matchedPolicyImplicitPriority := util.PriorityMisMatch matchedPolicyExplicitPriority := int32(math.MinInt32) var matchedPolicy *policyv1alpha1.PropagationPolicy for _, policy := range policies { - implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...) + policyPointer := ptr.To(policy) + if !policyPointer.DeletionTimestamp.IsZero() { + klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policyPointer.Namespace, policyPointer.Name) + continue + } + implicitPriority := util.ResourceMatchSelectorsPriority(resource, policyPointer.Spec.ResourceSelectors...) if implicitPriority <= util.PriorityMisMatch { continue } - explicitPriority := policy.ExplicitPriority() + explicitPriority := policyPointer.ExplicitPriority() if matchedPolicyExplicitPriority < explicitPriority { matchedPolicyImplicitPriority = implicitPriority matchedPolicyExplicitPriority = explicitPriority - matchedPolicy = policy + matchedPolicy = policyPointer continue } if matchedPolicyExplicitPriority == explicitPriority { if implicitPriority > matchedPolicyImplicitPriority { matchedPolicyImplicitPriority = implicitPriority - matchedPolicy = policy + matchedPolicy = policyPointer } else if implicitPriority == matchedPolicyImplicitPriority { - matchedPolicy = getHigherPriorityPropagationPolicy(matchedPolicy, policy) + matchedPolicy = getHigherPriorityPropagationPolicy(matchedPolicy, policyPointer) } } } @@ -64,31 +70,36 @@ func getHighestPriorityPropagationPolicy(policies []*policyv1alpha1.PropagationP return matchedPolicy } -func getHighestPriorityClusterPropagationPolicy(policies []*policyv1alpha1.ClusterPropagationPolicy, resource *unstructured.Unstructured, objectKey keys.ClusterWideKey) *policyv1alpha1.ClusterPropagationPolicy { +func getHighestPriorityClusterPropagationPolicy(policies []policyv1alpha1.ClusterPropagationPolicy, resource *unstructured.Unstructured, objectKey keys.ClusterWideKey) *policyv1alpha1.ClusterPropagationPolicy { matchedClusterPolicyImplicitPriority := util.PriorityMisMatch matchedClusterPolicyExplicitPriority := int32(math.MinInt32) var matchedClusterPolicy *policyv1alpha1.ClusterPropagationPolicy for _, policy := range policies { + policyPointer := ptr.To(policy) + if !policyPointer.DeletionTimestamp.IsZero() { + klog.V(4).Infof("Cluster propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policyPointer.Namespace, policyPointer.Name) + continue + } implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...) if implicitPriority <= util.PriorityMisMatch { continue } - explicitPriority := policy.ExplicitPriority() + explicitPriority := policyPointer.ExplicitPriority() if matchedClusterPolicyExplicitPriority < explicitPriority { matchedClusterPolicyImplicitPriority = implicitPriority matchedClusterPolicyExplicitPriority = explicitPriority - matchedClusterPolicy = policy + matchedClusterPolicy = policyPointer continue } if matchedClusterPolicyExplicitPriority == explicitPriority { if implicitPriority > matchedClusterPolicyImplicitPriority { matchedClusterPolicyImplicitPriority = implicitPriority - matchedClusterPolicy = policy + matchedClusterPolicy = policyPointer } else if implicitPriority == matchedClusterPolicyImplicitPriority { - matchedClusterPolicy = getHigherPriorityClusterPropagationPolicy(matchedClusterPolicy, policy) + matchedClusterPolicy = getHigherPriorityClusterPropagationPolicy(matchedClusterPolicy, policyPointer) } } } diff --git a/pkg/detector/compare_test.go b/pkg/detector/compare_test.go index cb5a25bd2ca6..b0ae031dc70b 100644 --- a/pkg/detector/compare_test.go +++ b/pkg/detector/compare_test.go @@ -207,7 +207,7 @@ func Test_GetHigherPriorityClusterPropagationPolicy(t *testing.T) { func Test_getHighestPriorityPropagationPolicies(t *testing.T) { type args struct { - policies []*policyv1alpha1.PropagationPolicy + policies []policyv1alpha1.PropagationPolicy resource *unstructured.Unstructured objectKey keys.ClusterWideKey } @@ -219,7 +219,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "empty policies", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{}, + policies: []policyv1alpha1.PropagationPolicy{}, resource: &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "apps/v1", @@ -237,7 +237,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "mo policy match for resource", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "match-with-name", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -278,7 +278,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "different implicit priority policy", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "match-with-name", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -339,7 +339,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "same implicit priority policy", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -390,7 +390,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "one policy with implicit priority, one policy with explicit priority 1", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -449,7 +449,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "one policy with explicit priority 1(name match), one policy with explicit priority 2(label selector match)", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -512,7 +512,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "two policies with explicit priority 1(name match), select the one with lower alphabetical order", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -575,7 +575,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "one policy with explicit priority 1(name match), one policy with explicit priority 1(label selector match)", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -638,7 +638,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { { name: "one policy with explicit priority -1(name match), one policy with implicit priority(label selector match)", args: args{ - policies: []*policyv1alpha1.PropagationPolicy{ + policies: []policyv1alpha1.PropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -702,7 +702,7 @@ func Test_getHighestPriorityPropagationPolicies(t *testing.T) { func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { type args struct { - policies []*policyv1alpha1.ClusterPropagationPolicy + policies []policyv1alpha1.ClusterPropagationPolicy resource *unstructured.Unstructured objectKey keys.ClusterWideKey } @@ -714,7 +714,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "empty policies", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{}, + policies: []policyv1alpha1.ClusterPropagationPolicy{}, resource: &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "apps/v1", @@ -732,7 +732,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "mo policy match for resource", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "match-with-name", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -773,7 +773,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "different implicit priority policy", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "match-with-name", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -834,7 +834,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "same implicit priority policy", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -885,7 +885,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "one policy with implicit priority, one policy with explicit priority 1", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -944,7 +944,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "one policy with explicit priority 1(name match), one policy with explicit priority 2(label selector match)", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -1007,7 +1007,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "two policies with explicit priority 1(name match), select the one with lower alphabetical order", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -1070,7 +1070,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "one policy with explicit priority 1(name match), one policy with explicit priority 1(label selector match)", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ @@ -1133,7 +1133,7 @@ func Test_getHighestPriorityClusterPropagationPolicies(t *testing.T) { { name: "one policy with explicit priority -1(name match), one policy with implicit priority(label selector match)", args: args{ - policies: []*policyv1alpha1.ClusterPropagationPolicy{ + policies: []policyv1alpha1.ClusterPropagationPolicy{ { ObjectMeta: metav1.ObjectMeta{Name: "a-pp", Namespace: "test"}, Spec: policyv1alpha1.PropagationSpec{ diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index aacac44ae78e..8e16a6722c2e 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" @@ -80,13 +81,11 @@ type ResourceDetector struct { EventRecorder record.EventRecorder // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and // a reconcile function to consume the items in queue. - policyReconcileWorker util.AsyncWorker - propagationPolicyLister cache.GenericLister + policyReconcileWorker util.AsyncWorker // clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and // a reconcile function to consume the items in queue. - clusterPolicyReconcileWorker util.AsyncWorker - clusterPropagationPolicyLister cache.GenericLister + clusterPolicyReconcileWorker util.AsyncWorker RESTMapper meta.RESTMapper @@ -139,7 +138,6 @@ func (d *ResourceDetector) Start(ctx context.Context) error { } policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, nil) d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) - d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) // watch and enqueue ClusterPropagationPolicy changes. clusterPropagationPolicyGVR := schema.GroupVersionResource{ @@ -149,7 +147,6 @@ func (d *ResourceDetector) Start(ctx context.Context) error { } clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, nil) d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) - d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) detectorWorkerOptions := util.Options{ Name: "resource detector", @@ -359,63 +356,35 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure } klog.V(2).Infof("Attempts to match policy for resource(%s)", objectKey) - policyObjects, err := d.propagationPolicyLister.ByNamespace(objectKey.Namespace).List(labels.Everything()) + policyObjectList := &policyv1alpha1.PropagationPolicyList{} + err := d.Client.List(context.TODO(), policyObjectList) if err != nil { klog.Errorf("Failed to list propagation policy: %v", err) return nil, err } - if len(policyObjects) == 0 { + if len(policyObjectList.Items) == 0 { klog.V(2).Infof("No propagationpolicy find in namespace(%s).", objectKey.Namespace) return nil, nil } - policyList := make([]*policyv1alpha1.PropagationPolicy, 0) - for index := range policyObjects { - policy := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil { - klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err) - return nil, err - } - - if !policy.DeletionTimestamp.IsZero() { - klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name) - continue - } - policyList = append(policyList, policy) - } - - return getHighestPriorityPropagationPolicy(policyList, object, objectKey), nil + return getHighestPriorityPropagationPolicy(policyObjectList.Items, object, objectKey), nil } // LookForMatchedClusterPolicy tries to find a ClusterPropagationPolicy for object referenced by object key. func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey) (*policyv1alpha1.ClusterPropagationPolicy, error) { klog.V(2).Infof("Attempts to match cluster policy for resource(%s)", objectKey) - policyObjects, err := d.clusterPropagationPolicyLister.List(labels.Everything()) + policyObjectList := &policyv1alpha1.ClusterPropagationPolicyList{} + err := d.Client.List(context.TODO(), policyObjectList) if err != nil { klog.Errorf("Failed to list cluster propagation policy: %v", err) return nil, err } - if len(policyObjects) == 0 { + if len(policyObjectList.Items) == 0 { klog.V(2).Infof("No clusterpropagationpolicy find.") return nil, nil } - policyList := make([]*policyv1alpha1.ClusterPropagationPolicy, 0) - for index := range policyObjects { - policy := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err) - return nil, err - } - - if !policy.DeletionTimestamp.IsZero() { - klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name) - continue - } - policyList = append(policyList, policy) - } - - return getHighestPriorityClusterPropagationPolicy(policyList, object, objectKey), nil + return getHighestPriorityClusterPropagationPolicy(policyObjectList.Items, object, objectKey), nil } // ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy. @@ -915,7 +884,8 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return fmt.Errorf("invalid key") } - unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey()) + propagationObject := &policyv1alpha1.PropagationPolicy{} + err := d.Client.Get(context.TODO(), types.NamespacedName{Namespace: ckey.Namespace, Name: ckey.Name}, propagationObject) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -924,12 +894,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return err } - propagationObject := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil { - klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err) - return err - } - if !propagationObject.DeletionTimestamp.IsZero() { klog.Infof("PropagationPolicy(%s) is being deleted.", ckey.NamespaceKey()) if err = d.HandlePropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.PropagationPolicyPermanentIDLabel]); err != nil { @@ -1016,7 +980,8 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey) return fmt.Errorf("invalid key") } - unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey()) + propagationObject := &policyv1alpha1.ClusterPropagationPolicy{} + err := d.Client.Get(context.TODO(), types.NamespacedName{Name: ckey.Name}, propagationObject) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -1026,12 +991,6 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey) return err } - propagationObject := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err) - return err - } - if !propagationObject.DeletionTimestamp.IsZero() { klog.Infof("ClusterPropagationPolicy(%s) is being deleted.", ckey.NamespaceKey()) if err = d.HandleClusterPropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]); err != nil { diff --git a/pkg/detector/detector_test.go b/pkg/detector/detector_test.go index 10f77fa06343..8d1eb2f7fe00 100644 --- a/pkg/detector/detector_test.go +++ b/pkg/detector/detector_test.go @@ -29,11 +29,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" dynamicfake "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -678,12 +676,17 @@ func TestLookForMatchedPolicy(t *testing.T) { t.Run(tt.name, func(t *testing.T) { scheme := setupTestScheme() fakeClient := dynamicfake.NewSimpleDynamicClient(scheme) - + controllerRuntimeFakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() d := &ResourceDetector{ + Client: controllerRuntimeFakeClient, DynamicClient: fakeClient, - propagationPolicyLister: &mockPropagationPolicyLister{ - policies: tt.policies, - }, + } + + for _, policy := range tt.policies { + err := d.Client.Create(context.TODO(), policy) + if err != nil { + t.Errorf("Create policy by fakeClinet failed. : %v", err) + } } objectKey := keys.ClusterWideKey{ @@ -767,12 +770,16 @@ func TestLookForMatchedClusterPolicy(t *testing.T) { t.Run(tt.name, func(t *testing.T) { scheme := setupTestScheme() fakeClient := dynamicfake.NewSimpleDynamicClient(scheme) - + controllerRuntimeFakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() d := &ResourceDetector{ + Client: controllerRuntimeFakeClient, DynamicClient: fakeClient, - clusterPropagationPolicyLister: &mockClusterPropagationPolicyLister{ - policies: tt.policies, - }, + } + for _, policy := range tt.policies { + err := d.Client.Create(context.TODO(), policy) + if err != nil { + t.Errorf("Create cluster policy by fakeClinet failed. : %v", err) + } } objectKey := keys.ClusterWideKey{ @@ -1030,6 +1037,7 @@ func setupTestScheme() *runtime.Scheme { scheme := runtime.NewScheme() _ = workv1alpha2.Install(scheme) _ = corev1.AddToScheme(scheme) + _ = policyv1alpha1.Install(scheme) return scheme } @@ -1133,110 +1141,6 @@ func (m *mockResourceDetector) BuildClusterResourceBinding(object *unstructured. return &workv1alpha2.ClusterResourceBinding{}, nil } -// mockPropagationPolicyLister is a mock implementation of the PropagationPolicyLister -type mockPropagationPolicyLister struct { - policies []*policyv1alpha1.PropagationPolicy -} - -func (m *mockPropagationPolicyLister) List(_ labels.Selector) (ret []runtime.Object, err error) { - var result []runtime.Object - for _, p := range m.policies { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - result = append(result, &unstructured.Unstructured{Object: u}) - } - return result, nil -} - -func (m *mockPropagationPolicyLister) Get(name string) (runtime.Object, error) { - for _, p := range m.policies { - if p.Name == name { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{Object: u}, nil - } - } - return nil, nil -} - -func (m *mockPropagationPolicyLister) ByNamespace(namespace string) cache.GenericNamespaceLister { - return &mockGenericNamespaceLister{ - policies: m.policies, - namespace: namespace, - } -} - -// mockGenericNamespaceLister is a mock implementation of cache.GenericNamespaceLister -type mockGenericNamespaceLister struct { - policies []*policyv1alpha1.PropagationPolicy - namespace string -} - -func (m *mockGenericNamespaceLister) List(_ labels.Selector) (ret []runtime.Object, err error) { - var result []runtime.Object - for _, p := range m.policies { - if p.Namespace == m.namespace { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - result = append(result, &unstructured.Unstructured{Object: u}) - } - } - return result, nil -} - -func (m *mockGenericNamespaceLister) Get(name string) (runtime.Object, error) { - for _, p := range m.policies { - if p.Name == name && p.Namespace == m.namespace { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{Object: u}, nil - } - } - return nil, nil -} - -// mockClusterPropagationPolicyLister is a mock implementation of the ClusterPropagationPolicyLister -type mockClusterPropagationPolicyLister struct { - policies []*policyv1alpha1.ClusterPropagationPolicy -} - -func (m *mockClusterPropagationPolicyLister) List(_ labels.Selector) (ret []runtime.Object, err error) { - var result []runtime.Object - for _, p := range m.policies { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - result = append(result, &unstructured.Unstructured{Object: u}) - } - return result, nil -} - -func (m *mockClusterPropagationPolicyLister) Get(name string) (runtime.Object, error) { - for _, p := range m.policies { - if p.Name == name { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{Object: u}, nil - } - } - return nil, nil -} - -func (m *mockClusterPropagationPolicyLister) ByNamespace(_ string) cache.GenericNamespaceLister { - return nil // ClusterPropagationPolicies are not namespaced -} - // mockResourceInterpreter is a mock implementation of the ResourceInterpreter interface type mockResourceInterpreter struct{} diff --git a/pkg/detector/policy.go b/pkg/detector/policy.go index 1a1b7c69c3fe..128f98d6cf41 100644 --- a/pkg/detector/policy.go +++ b/pkg/detector/policy.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -107,7 +108,8 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured, func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, resourceChangeByKarmada bool, policyNamespace, policyName, claimedID string) error { - policyObject, err := d.propagationPolicyLister.ByNamespace(policyNamespace).Get(policyName) + matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{} + err := d.Client.Get(context.TODO(), types.NamespacedName{Namespace: policyNamespace, Name: policyName}, matchedPropagationPolicy) if err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("PropagationPolicy(%s/%s) has been removed.", policyNamespace, policyName) @@ -117,12 +119,6 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, return err } - matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObject, matchedPropagationPolicy); err != nil { - klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err) - return err - } - // Some resources are available in more than one group in the same kubernetes version. // Therefore, the following scenarios occurs: // In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups. @@ -145,23 +141,17 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, resourceChangeByKarmada bool, policyName, policyID string) error { - policyObject, err := d.clusterPropagationPolicyLister.Get(policyName) + matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{} + err := d.Client.Get(context.TODO(), types.NamespacedName{Name: policyName}, matchedClusterPropagationPolicy) if err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("ClusterPropagationPolicy(%s) has been removed.", policyName) return d.HandleClusterPropagationPolicyDeletion(policyID) } - klog.Errorf("Failed to get claimed policy(%s),: %v", policyName, err) return err } - matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObject, matchedClusterPropagationPolicy); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err) - return err - } - // Some resources are available in more than one group in the same kubernetes version. // Therefore, the following scenarios occurs: // In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups. diff --git a/pkg/detector/preemption.go b/pkg/detector/preemption.go index 3b71818a3ed7..afa5a221132f 100755 --- a/pkg/detector/preemption.go +++ b/pkg/detector/preemption.go @@ -24,8 +24,8 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" @@ -118,18 +118,13 @@ func (d *ResourceDetector) preemptPropagationPolicy(resourceTemplate *unstructur return nil } - claimedPolicyObj, err := d.propagationPolicyLister.ByNamespace(claimedPolicyNamespace).Get(claimedPolicyName) + claimedPolicy := &policyv1alpha1.PropagationPolicy{} + err = d.Client.Get(context.TODO(), types.NamespacedName{Namespace: claimedPolicyNamespace, Name: claimedPolicyName}, claimedPolicy) if err != nil { klog.Errorf("Failed to retrieve claimed propagation policy(%s/%s): %v.", claimedPolicyNamespace, claimedPolicyName, err) return err } - claimedPolicy := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil { - klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v.", err) - return err - } - if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() { klog.V(2).Infof("Propagation policy(%s/%s) cannot preempt another propagation policy(%s/%s) due to insufficient priority.", policy.Namespace, policy.Name, claimedPolicyNamespace, claimedPolicyName) @@ -196,18 +191,13 @@ func (d *ResourceDetector) preemptClusterPropagationPolicy(resourceTemplate *uns return nil } - claimedPolicyObj, err := d.clusterPropagationPolicyLister.Get(claimedPolicyName) + claimedPolicy := &policyv1alpha1.ClusterPropagationPolicy{} + err = d.Client.Get(context.TODO(), types.NamespacedName{Name: claimedPolicyName}, claimedPolicy) if err != nil { klog.Errorf("Failed to retrieve claimed cluster propagation policy(%s): %v.", claimedPolicyName, err) return err } - claimedPolicy := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v.", err) - return err - } - if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() { klog.V(2).Infof("Cluster propagation policy(%s) cannot preempt another cluster propagation policy(%s) due to insufficient priority.", policy.Name, claimedPolicyName) @@ -263,12 +253,14 @@ func (d *ResourceDetector) fetchResourceTemplate(rs policyv1alpha1.ResourceSelec // and put the PropagationPolicy in the queue to trigger preemption. func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policyv1alpha1.PropagationPolicy, newPolicy policyv1alpha1.PropagationPolicy) { klog.Infof("PropagationPolicy(%s/%s) priority changed from %d to %d", newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority) - policies, err := d.propagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything()) + + policies := &policyv1alpha1.PropagationPolicyList{} + err := d.Client.List(context.TODO(), policies) if err != nil { klog.Errorf("Failed to list PropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err) return } - if len(policies) == 0 { + if len(policies.Items) == 0 { klog.Infof("No PropagationPolicy to preempt the PropagationPolicy(%s/%s).", newPolicy.GetNamespace(), newPolicy.GetName()) } @@ -276,12 +268,8 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy // higher priority PropagationPolicy be process first to avoid possible // multiple preemption. sortedPotentialKeys := pq.NewWith(priorityDescendingComparator) - for i := range policies { - var potentialPolicy policyv1alpha1.PropagationPolicy - if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil { - klog.Errorf("Failed to convert typed PropagationPolicy: %v", err) - continue - } + for i := range policies.Items { + potentialPolicy := policies.Items[i] // Re-queue the polies that enables preemption and with the priority // in range (new priority, old priority). // For the polices with higher priority than old priority, it can @@ -309,12 +297,14 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) { klog.Infof("ClusterPropagationPolicy(%s) priority changed from %d to %d", newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority) - policies, err := d.clusterPropagationPolicyLister.List(labels.Everything()) + + policies := &policyv1alpha1.ClusterPropagationPolicyList{} + err := d.Client.List(context.TODO(), policies) if err != nil { klog.Errorf("Failed to list ClusterPropagationPolicy, error: %v", err) return } - if len(policies) == 0 { + if len(policies.Items) == 0 { klog.Infof("No ClusterPropagationPolicy to preempt the ClusterPropagationPolicy(%s).", newPolicy.GetName()) } @@ -322,12 +312,8 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy // higher priority ClusterPropagationPolicy be process first to avoid possible // multiple preemption. sortedPotentialKeys := pq.NewWith(priorityDescendingComparator) - for i := range policies { - var potentialPolicy policyv1alpha1.ClusterPropagationPolicy - if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil { - klog.Errorf("Failed to convert typed ClusterPropagationPolicy: %v", err) - continue - } + for i := range policies.Items { + potentialPolicy := policies.Items[i] // Re-queue the polies that enables preemption and with the priority // in range (new priority, old priority). // For the polices with higher priority than old priority, it can diff --git a/pkg/detector/preemption_test.go b/pkg/detector/preemption_test.go index bcda63c957c8..0b518595955b 100644 --- a/pkg/detector/preemption_test.go +++ b/pkg/detector/preemption_test.go @@ -24,7 +24,6 @@ import ( appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" dynamicfake "k8s.io/client-go/dynamic/fake" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -79,12 +78,6 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { utilruntime.Must(v1alpha2.Install(scheme)) utilruntime.Must(policyv1alpha1.Install(scheme)) - propagationPolicyGVR := schema.GroupVersionResource{ - Group: policyv1alpha1.GroupVersion.Group, - Version: policyv1alpha1.GroupVersion.Version, - Resource: policyv1alpha1.ResourcePluralPropagationPolicy, - } - tests := []struct { name string newPolicy *policyv1alpha1.PropagationPolicy @@ -341,10 +334,9 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme, tt.objects...) genMgr := genericmanager.NewSingleClusterInformerManager(fakeDynamicClient, 0, stopCh) resourceDetector := &ResourceDetector{ - Client: fakeClient, - DynamicClient: fakeDynamicClient, - InformerManager: genMgr, - propagationPolicyLister: genMgr.Lister(propagationPolicyGVR), + Client: fakeClient, + DynamicClient: fakeDynamicClient, + InformerManager: genMgr, } mockWorker := &MockAsyncWorker{} resourceDetector.policyReconcileWorker = mockWorker @@ -368,12 +360,6 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { utilruntime.Must(v1alpha2.Install(scheme)) utilruntime.Must(policyv1alpha1.Install(scheme)) - clusterPropagationPolicyGVR := schema.GroupVersionResource{ - Group: policyv1alpha1.GroupVersion.Group, - Version: policyv1alpha1.GroupVersion.Version, - Resource: policyv1alpha1.ResourcePluralClusterPropagationPolicy, - } - tests := []struct { name string newPolicy *policyv1alpha1.ClusterPropagationPolicy @@ -630,10 +616,9 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme, tt.objects...) genMgr := genericmanager.NewSingleClusterInformerManager(fakeDynamicClient, 0, stopCh) resourceDetector := &ResourceDetector{ - Client: fakeClient, - DynamicClient: fakeDynamicClient, - InformerManager: genMgr, - clusterPropagationPolicyLister: genMgr.Lister(clusterPropagationPolicyGVR), + Client: fakeClient, + DynamicClient: fakeDynamicClient, + InformerManager: genMgr, } mockWorker := &MockAsyncWorker{} resourceDetector.clusterPolicyReconcileWorker = mockWorker