From 2f8e3ea67900f82ad734012ee67fce697bb0574e Mon Sep 17 00:00:00 2001 From: "chang.qiangqiang" Date: Fri, 22 Nov 2024 19:17:21 +0800 Subject: [PATCH] feat(detector): resource detector matched policy potimization Signed-off-by: chang.qiangqiang --- .../app/controllermanager.go | 5 + pkg/detector/compare.go | 8 + pkg/detector/detector.go | 110 ++++++-------- pkg/detector/detector_test.go | 142 ++++-------------- pkg/detector/policy.go | 26 +--- pkg/detector/preemption.go | 34 +---- pkg/detector/preemption_test.go | 98 ++++++------ .../genericmanager/generated-manager.go | 102 +++++++++++++ 8 files changed, 244 insertions(+), 281 deletions(-) create mode 100644 pkg/util/fedinformer/genericmanager/generated-manager.go diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 3429981480b4..52cd86ff23fa 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -71,6 +71,7 @@ import ( "github.com/karmada-io/karmada/pkg/dependenciesdistributor" "github.com/karmada-io/karmada/pkg/detector" "github.com/karmada-io/karmada/pkg/features" + generatedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" "github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient" "github.com/karmada-io/karmada/pkg/metrics" "github.com/karmada-io/karmada/pkg/resourceinterpreter" @@ -725,6 +726,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop dynamicClientSet := dynamic.NewForConfigOrDie(restConfig) discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig) kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig) + generatedClientSet := generatedclientset.NewForConfigOrDie(restConfig) overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName)) skippedResourceConfig := util.NewSkippedResourceConfig() @@ -748,10 +750,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter) + generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, opts.ResyncPeriod.Duration, stopChan) + resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, Client: mgr.GetClient(), InformerManager: controlPlaneInformerManager, + GeneratedInformerManager: generatedInformerManager, RESTMapper: mgr.GetRESTMapper(), DynamicClient: dynamicClientSet, SkippedResourceConfig: skippedResourceConfig, diff --git a/pkg/detector/compare.go b/pkg/detector/compare.go index 80c30b239456..f19785dc1e28 100644 --- a/pkg/detector/compare.go +++ b/pkg/detector/compare.go @@ -33,6 +33,10 @@ func getHighestPriorityPropagationPolicy(policies []*policyv1alpha1.PropagationP var matchedPolicy *policyv1alpha1.PropagationPolicy for _, policy := range policies { + if !policy.DeletionTimestamp.IsZero() { + klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name) + continue + } implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...) if implicitPriority <= util.PriorityMisMatch { continue @@ -70,6 +74,10 @@ func getHighestPriorityClusterPropagationPolicy(policies []*policyv1alpha1.Clust var matchedClusterPolicy *policyv1alpha1.ClusterPropagationPolicy for _, policy := range policies { + if !policy.DeletionTimestamp.IsZero() { + klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name) + continue + } implicitPriority := util.ResourceMatchSelectorsPriority(resource, policy.Spec.ResourceSelectors...) if implicitPriority <= util.PriorityMisMatch { continue diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index aacac44ae78e..5a59d17485fd 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -48,6 +48,7 @@ import ( workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/features" + generatedpolicylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1" "github.com/karmada-io/karmada/pkg/metrics" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" @@ -71,6 +72,7 @@ type ResourceDetector struct { // DynamicClient used to fetch arbitrary resources. DynamicClient dynamic.Interface InformerManager genericmanager.SingleClusterInformerManager + GeneratedInformerManager genericmanager.GeneratedInformerManager EventHandler cache.ResourceEventHandler Processor util.AsyncWorker SkippedResourceConfig *util.SkippedResourceConfig @@ -81,12 +83,12 @@ type ResourceDetector struct { // policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and // a reconcile function to consume the items in queue. policyReconcileWorker util.AsyncWorker - propagationPolicyLister cache.GenericLister + propagationPolicyLister generatedpolicylister.PropagationPolicyLister // clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and // a reconcile function to consume the items in queue. clusterPolicyReconcileWorker util.AsyncWorker - clusterPropagationPolicyLister cache.GenericLister + clusterPropagationPolicyLister generatedpolicylister.ClusterPropagationPolicyLister RESTMapper meta.RESTMapper @@ -132,24 +134,14 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.clusterPolicyReconcileWorker.Run(d.ConcurrentClusterPropagationPolicySyncs, d.stopCh) // watch and enqueue PropagationPolicy changes. - propagationPolicyGVR := schema.GroupVersionResource{ - Group: policyv1alpha1.GroupVersion.Group, - Version: policyv1alpha1.GroupVersion.Version, - Resource: policyv1alpha1.ResourcePluralPropagationPolicy, - } policyHandler := fedinformer.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, nil) - d.InformerManager.ForResource(propagationPolicyGVR, policyHandler) - d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR) + d.GeneratedInformerManager.ForPropagationPolicy(policyHandler) + d.propagationPolicyLister = d.GeneratedInformerManager.PropagationPolicyLister() // watch and enqueue ClusterPropagationPolicy changes. - clusterPropagationPolicyGVR := schema.GroupVersionResource{ - Group: policyv1alpha1.GroupVersion.Group, - Version: policyv1alpha1.GroupVersion.Version, - Resource: policyv1alpha1.ResourcePluralClusterPropagationPolicy, - } clusterPolicyHandler := fedinformer.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, nil) - d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler) - d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR) + d.GeneratedInformerManager.ForClusterPropagationPolicy(clusterPolicyHandler) + d.clusterPropagationPolicyLister = d.GeneratedInformerManager.ClusterPropagationPolicyLister() detectorWorkerOptions := util.Options{ Name: "resource detector", @@ -161,6 +153,8 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.EventHandler = fedinformer.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker(detectorWorkerOptions) d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh) + d.GeneratedInformerManager.Start() + d.GeneratedInformerManager.WaitForCacheSync() go d.discoverResources(30 * time.Second) <-d.stopCh @@ -359,7 +353,7 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure } klog.V(2).Infof("Attempts to match policy for resource(%s)", objectKey) - policyObjects, err := d.propagationPolicyLister.ByNamespace(objectKey.Namespace).List(labels.Everything()) + policyObjects, err := d.propagationPolicyLister.PropagationPolicies(objectKey.Namespace).List(labels.Everything()) if err != nil { klog.Errorf("Failed to list propagation policy: %v", err) return nil, err @@ -369,22 +363,7 @@ func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructure return nil, nil } - policyList := make([]*policyv1alpha1.PropagationPolicy, 0) - for index := range policyObjects { - policy := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil { - klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err) - return nil, err - } - - if !policy.DeletionTimestamp.IsZero() { - klog.V(4).Infof("Propagation policy(%s/%s) cannot match any resource template because it's being deleted.", policy.Namespace, policy.Name) - continue - } - policyList = append(policyList, policy) - } - - return getHighestPriorityPropagationPolicy(policyList, object, objectKey), nil + return getHighestPriorityPropagationPolicy(policyObjects, object, objectKey), nil } // LookForMatchedClusterPolicy tries to find a ClusterPropagationPolicy for object referenced by object key. @@ -400,22 +379,7 @@ func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unst return nil, nil } - policyList := make([]*policyv1alpha1.ClusterPropagationPolicy, 0) - for index := range policyObjects { - policy := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObjects[index], policy); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err) - return nil, err - } - - if !policy.DeletionTimestamp.IsZero() { - klog.V(4).Infof("Cluster propagation policy(%s) cannot match any resource template because it's being deleted.", policy.Name) - continue - } - policyList = append(policyList, policy) - } - - return getHighestPriorityClusterPropagationPolicy(policyList, object, objectKey), nil + return getHighestPriorityClusterPropagationPolicy(policyObjects, object, objectKey), nil } // ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy. @@ -849,12 +813,24 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour // OnPropagationPolicyAdd handles object add event and push the object to queue. func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { - d.policyReconcileWorker.Enqueue(obj) + policyObj := obj.(*policyv1alpha1.PropagationPolicy) + policyObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: policyv1alpha1.GroupVersion.Group, + Version: policyv1alpha1.GroupVersion.Version, + Kind: policyv1alpha1.ResourceKindPropagationPolicy, + }) + d.policyReconcileWorker.Enqueue(policyObj) } // OnPropagationPolicyUpdate handles object update event and push the object to queue. func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) { - d.policyReconcileWorker.Enqueue(newObj) + policyObj := newObj.(*policyv1alpha1.PropagationPolicy) + policyObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: policyv1alpha1.GroupVersion.Group, + Version: policyv1alpha1.GroupVersion.Version, + Kind: policyv1alpha1.ResourceKindPropagationPolicy, + }) + d.policyReconcileWorker.Enqueue(policyObj) // Temporary solution of corner case: After the priority(.spec.priority) of // PropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3), @@ -915,7 +891,7 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return fmt.Errorf("invalid key") } - unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey()) + propagationObject, err := d.propagationPolicyLister.PropagationPolicies(ckey.Namespace).Get(ckey.Name) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -924,12 +900,6 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { return err } - propagationObject := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil { - klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err) - return err - } - if !propagationObject.DeletionTimestamp.IsZero() { klog.Infof("PropagationPolicy(%s) is being deleted.", ckey.NamespaceKey()) if err = d.HandlePropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.PropagationPolicyPermanentIDLabel]); err != nil { @@ -950,12 +920,24 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error { // OnClusterPropagationPolicyAdd handles object add event and push the object to queue. func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) { - d.clusterPolicyReconcileWorker.Enqueue(obj) + policyObj := obj.(*policyv1alpha1.ClusterPropagationPolicy) + policyObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: policyv1alpha1.GroupVersion.Group, + Version: policyv1alpha1.GroupVersion.Version, + Kind: policyv1alpha1.ResourceKindClusterPropagationPolicy, + }) + d.clusterPolicyReconcileWorker.Enqueue(policyObj) } // OnClusterPropagationPolicyUpdate handles object update event and push the object to queue. func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) { - d.clusterPolicyReconcileWorker.Enqueue(newObj) + policyObj := newObj.(*policyv1alpha1.ClusterPropagationPolicy) + policyObj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: policyv1alpha1.GroupVersion.Group, + Version: policyv1alpha1.GroupVersion.Version, + Kind: policyv1alpha1.ResourceKindClusterPropagationPolicy, + }) + d.clusterPolicyReconcileWorker.Enqueue(policyObj) // Temporary solution of corner case: After the priority(.spec.priority) of // ClusterPropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3), @@ -1016,7 +998,7 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey) return fmt.Errorf("invalid key") } - unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey()) + propagationObject, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey()) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -1026,12 +1008,6 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey) return err } - propagationObject := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(unstructuredObj, propagationObject); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err) - return err - } - if !propagationObject.DeletionTimestamp.IsZero() { klog.Infof("ClusterPropagationPolicy(%s) is being deleted.", ckey.NamespaceKey()) if err = d.HandleClusterPropagationPolicyDeletion(propagationObject.Labels[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]); err != nil { diff --git a/pkg/detector/detector_test.go b/pkg/detector/detector_test.go index 10f77fa06343..a20514f40efb 100644 --- a/pkg/detector/detector_test.go +++ b/pkg/detector/detector_test.go @@ -29,11 +29,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" dynamicfake "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -41,7 +39,9 @@ import ( configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + fakegeneratedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" ) @@ -677,13 +677,14 @@ func TestLookForMatchedPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { scheme := setupTestScheme() + stopChan := make(chan struct{}) fakeClient := dynamicfake.NewSimpleDynamicClient(scheme) + generatedClientSet := fakegeneratedclientset.NewSimpleClientset() + generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopChan) d := &ResourceDetector{ - DynamicClient: fakeClient, - propagationPolicyLister: &mockPropagationPolicyLister{ - policies: tt.policies, - }, + DynamicClient: fakeClient, + propagationPolicyLister: generatedInformerManager.PropagationPolicyLister(), } objectKey := keys.ClusterWideKey{ @@ -691,6 +692,14 @@ func TestLookForMatchedPolicy(t *testing.T) { Namespace: tt.object.GetNamespace(), Kind: tt.object.GetKind(), } + for _, object := range tt.policies { + _, err := generatedClientSet.PolicyV1alpha1().PropagationPolicies(object.Namespace).Create(context.TODO(), object, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Create PropagationPolicy failed: %v", err) + } + } + generatedInformerManager.Start() + generatedInformerManager.WaitForCacheSync() policy, err := d.LookForMatchedPolicy(tt.object, objectKey) @@ -766,13 +775,14 @@ func TestLookForMatchedClusterPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { scheme := setupTestScheme() + stopChan := make(chan struct{}) fakeClient := dynamicfake.NewSimpleDynamicClient(scheme) + generatedClientSet := fakegeneratedclientset.NewSimpleClientset() + generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopChan) d := &ResourceDetector{ - DynamicClient: fakeClient, - clusterPropagationPolicyLister: &mockClusterPropagationPolicyLister{ - policies: tt.policies, - }, + DynamicClient: fakeClient, + clusterPropagationPolicyLister: generatedInformerManager.ClusterPropagationPolicyLister(), } objectKey := keys.ClusterWideKey{ @@ -780,6 +790,14 @@ func TestLookForMatchedClusterPolicy(t *testing.T) { Namespace: tt.object.GetNamespace(), Kind: tt.object.GetKind(), } + for _, object := range tt.policies { + _, err := generatedClientSet.PolicyV1alpha1().ClusterPropagationPolicies().Create(context.TODO(), object, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Create PropagationPolicy failed: %v", err) + } + } + generatedInformerManager.Start() + generatedInformerManager.WaitForCacheSync() policy, err := d.LookForMatchedClusterPolicy(tt.object, objectKey) @@ -1133,110 +1151,6 @@ func (m *mockResourceDetector) BuildClusterResourceBinding(object *unstructured. return &workv1alpha2.ClusterResourceBinding{}, nil } -// mockPropagationPolicyLister is a mock implementation of the PropagationPolicyLister -type mockPropagationPolicyLister struct { - policies []*policyv1alpha1.PropagationPolicy -} - -func (m *mockPropagationPolicyLister) List(_ labels.Selector) (ret []runtime.Object, err error) { - var result []runtime.Object - for _, p := range m.policies { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - result = append(result, &unstructured.Unstructured{Object: u}) - } - return result, nil -} - -func (m *mockPropagationPolicyLister) Get(name string) (runtime.Object, error) { - for _, p := range m.policies { - if p.Name == name { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{Object: u}, nil - } - } - return nil, nil -} - -func (m *mockPropagationPolicyLister) ByNamespace(namespace string) cache.GenericNamespaceLister { - return &mockGenericNamespaceLister{ - policies: m.policies, - namespace: namespace, - } -} - -// mockGenericNamespaceLister is a mock implementation of cache.GenericNamespaceLister -type mockGenericNamespaceLister struct { - policies []*policyv1alpha1.PropagationPolicy - namespace string -} - -func (m *mockGenericNamespaceLister) List(_ labels.Selector) (ret []runtime.Object, err error) { - var result []runtime.Object - for _, p := range m.policies { - if p.Namespace == m.namespace { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - result = append(result, &unstructured.Unstructured{Object: u}) - } - } - return result, nil -} - -func (m *mockGenericNamespaceLister) Get(name string) (runtime.Object, error) { - for _, p := range m.policies { - if p.Name == name && p.Namespace == m.namespace { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{Object: u}, nil - } - } - return nil, nil -} - -// mockClusterPropagationPolicyLister is a mock implementation of the ClusterPropagationPolicyLister -type mockClusterPropagationPolicyLister struct { - policies []*policyv1alpha1.ClusterPropagationPolicy -} - -func (m *mockClusterPropagationPolicyLister) List(_ labels.Selector) (ret []runtime.Object, err error) { - var result []runtime.Object - for _, p := range m.policies { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - result = append(result, &unstructured.Unstructured{Object: u}) - } - return result, nil -} - -func (m *mockClusterPropagationPolicyLister) Get(name string) (runtime.Object, error) { - for _, p := range m.policies { - if p.Name == name { - u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{Object: u}, nil - } - } - return nil, nil -} - -func (m *mockClusterPropagationPolicyLister) ByNamespace(_ string) cache.GenericNamespaceLister { - return nil // ClusterPropagationPolicies are not namespaced -} - // mockResourceInterpreter is a mock implementation of the ResourceInterpreter interface type mockResourceInterpreter struct{} diff --git a/pkg/detector/policy.go b/pkg/detector/policy.go index 1a1b7c69c3fe..b94c9cdee5f4 100644 --- a/pkg/detector/policy.go +++ b/pkg/detector/policy.go @@ -107,7 +107,7 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured, func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, resourceChangeByKarmada bool, policyNamespace, policyName, claimedID string) error { - policyObject, err := d.propagationPolicyLister.ByNamespace(policyNamespace).Get(policyName) + policyObject, err := d.propagationPolicyLister.PropagationPolicies(policyNamespace).Get(policyName) if err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("PropagationPolicy(%s/%s) has been removed.", policyNamespace, policyName) @@ -117,12 +117,6 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, return err } - matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObject, matchedPropagationPolicy); err != nil { - klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err) - return err - } - // Some resources are available in more than one group in the same kubernetes version. // Therefore, the following scenarios occurs: // In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups. @@ -130,17 +124,17 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, // to the member clusters, the detector will listen two resource creation events: // Ingress(networking.k8s.io/v1) and Ingress(extensions/v1beta1). In order to prevent // Ingress(extensions/v1beta1) from being propagated, we need to ignore it. - if !util.ResourceMatchSelectors(object, matchedPropagationPolicy.Spec.ResourceSelectors...) { + if !util.ResourceMatchSelectors(object, policyObject.Spec.ResourceSelectors...) { return nil } // return err when dependents not present, that we can retry at next reconcile. - if present, err := helper.IsDependentOverridesPresent(d.Client, matchedPropagationPolicy); err != nil || !present { + if present, err := helper.IsDependentOverridesPresent(d.Client, policyObject); err != nil || !present { klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", policyNamespace, policyName) return fmt.Errorf("waiting for dependent overrides") } - return d.ApplyPolicy(object, objectKey, resourceChangeByKarmada, matchedPropagationPolicy) + return d.ApplyPolicy(object, objectKey, resourceChangeByKarmada, policyObject) } func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, @@ -156,12 +150,6 @@ func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstruc return err } - matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(policyObject, matchedClusterPropagationPolicy); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err) - return err - } - // Some resources are available in more than one group in the same kubernetes version. // Therefore, the following scenarios occurs: // In v1.21 kubernetes cluster, Ingress are available in both networking.k8s.io and extensions groups. @@ -169,17 +157,17 @@ func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstruc // propagate it to the member clusters, the detector will listen two resource creation events: // Ingress(networking.k8s.io/v1) and Ingress(extensions/v1beta1). In order to prevent // Ingress(extensions/v1beta1) from being propagated, we need to ignore it. - if !util.ResourceMatchSelectors(object, matchedClusterPropagationPolicy.Spec.ResourceSelectors...) { + if !util.ResourceMatchSelectors(object, policyObject.Spec.ResourceSelectors...) { return nil } // return err when dependents not present, that we can retry at next reconcile. - if present, err := helper.IsDependentClusterOverridesPresent(d.Client, matchedClusterPropagationPolicy); err != nil || !present { + if present, err := helper.IsDependentClusterOverridesPresent(d.Client, policyObject); err != nil || !present { klog.Infof("Waiting for dependent overrides present for policy(%s)", policyName) return fmt.Errorf("waiting for dependent overrides") } - return d.ApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, matchedClusterPropagationPolicy) + return d.ApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, policyObject) } func (d *ResourceDetector) cleanPPUnmatchedRBs(policyID, policyNamespace, policyName string, selectors []policyv1alpha1.ResourceSelector) error { diff --git a/pkg/detector/preemption.go b/pkg/detector/preemption.go index 3b71818a3ed7..99892bc7d546 100755 --- a/pkg/detector/preemption.go +++ b/pkg/detector/preemption.go @@ -118,18 +118,12 @@ func (d *ResourceDetector) preemptPropagationPolicy(resourceTemplate *unstructur return nil } - claimedPolicyObj, err := d.propagationPolicyLister.ByNamespace(claimedPolicyNamespace).Get(claimedPolicyName) + claimedPolicy, err := d.propagationPolicyLister.PropagationPolicies(claimedPolicyNamespace).Get(claimedPolicyName) if err != nil { klog.Errorf("Failed to retrieve claimed propagation policy(%s/%s): %v.", claimedPolicyNamespace, claimedPolicyName, err) return err } - claimedPolicy := &policyv1alpha1.PropagationPolicy{} - if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil { - klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v.", err) - return err - } - if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() { klog.V(2).Infof("Propagation policy(%s/%s) cannot preempt another propagation policy(%s/%s) due to insufficient priority.", policy.Namespace, policy.Name, claimedPolicyNamespace, claimedPolicyName) @@ -196,18 +190,12 @@ func (d *ResourceDetector) preemptClusterPropagationPolicy(resourceTemplate *uns return nil } - claimedPolicyObj, err := d.clusterPropagationPolicyLister.Get(claimedPolicyName) + claimedPolicy, err := d.clusterPropagationPolicyLister.Get(claimedPolicyName) if err != nil { klog.Errorf("Failed to retrieve claimed cluster propagation policy(%s): %v.", claimedPolicyName, err) return err } - claimedPolicy := &policyv1alpha1.ClusterPropagationPolicy{} - if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil { - klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v.", err) - return err - } - if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() { klog.V(2).Infof("Cluster propagation policy(%s) cannot preempt another cluster propagation policy(%s) due to insufficient priority.", policy.Name, claimedPolicyName) @@ -263,7 +251,7 @@ func (d *ResourceDetector) fetchResourceTemplate(rs policyv1alpha1.ResourceSelec // and put the PropagationPolicy in the queue to trigger preemption. func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policyv1alpha1.PropagationPolicy, newPolicy policyv1alpha1.PropagationPolicy) { klog.Infof("PropagationPolicy(%s/%s) priority changed from %d to %d", newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority) - policies, err := d.propagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything()) + policies, err := d.propagationPolicyLister.PropagationPolicies(newPolicy.GetNamespace()).List(labels.Everything()) if err != nil { klog.Errorf("Failed to list PropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err) return @@ -277,11 +265,7 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy // multiple preemption. sortedPotentialKeys := pq.NewWith(priorityDescendingComparator) for i := range policies { - var potentialPolicy policyv1alpha1.PropagationPolicy - if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil { - klog.Errorf("Failed to convert typed PropagationPolicy: %v", err) - continue - } + potentialPolicy := policies[i] // Re-queue the polies that enables preemption and with the priority // in range (new priority, old priority). // For the polices with higher priority than old priority, it can @@ -294,7 +278,7 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() { klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes.", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName()) sortedPotentialKeys.Enqueue(&PriorityKey{ - Object: &potentialPolicy, + Object: potentialPolicy, Priority: potentialPolicy.ExplicitPriority(), }) } @@ -323,11 +307,7 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy // multiple preemption. sortedPotentialKeys := pq.NewWith(priorityDescendingComparator) for i := range policies { - var potentialPolicy policyv1alpha1.ClusterPropagationPolicy - if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil { - klog.Errorf("Failed to convert typed ClusterPropagationPolicy: %v", err) - continue - } + potentialPolicy := policies[i] // Re-queue the polies that enables preemption and with the priority // in range (new priority, old priority). // For the polices with higher priority than old priority, it can @@ -341,7 +321,7 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy klog.Infof("Enqueuing ClusterPropagationPolicy(%s) in case of ClusterPropagationPolicy(%s) priority changes.", potentialPolicy.GetName(), newPolicy.GetName()) sortedPotentialKeys.Enqueue(&PriorityKey{ - Object: &potentialPolicy, + Object: potentialPolicy, Priority: potentialPolicy.ExplicitPriority(), }) } diff --git a/pkg/detector/preemption_test.go b/pkg/detector/preemption_test.go index bcda63c957c8..9d9656647c66 100644 --- a/pkg/detector/preemption_test.go +++ b/pkg/detector/preemption_test.go @@ -17,6 +17,7 @@ limitations under the License. package detector import ( + "context" "fmt" "testing" "time" @@ -24,9 +25,7 @@ import ( appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - dynamicfake "k8s.io/client-go/dynamic/fake" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,6 +33,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + fakegeneratedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" ) @@ -79,17 +79,11 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { utilruntime.Must(v1alpha2.Install(scheme)) utilruntime.Must(policyv1alpha1.Install(scheme)) - propagationPolicyGVR := schema.GroupVersionResource{ - Group: policyv1alpha1.GroupVersion.Group, - Version: policyv1alpha1.GroupVersion.Version, - Resource: policyv1alpha1.ResourcePluralPropagationPolicy, - } - tests := []struct { name string newPolicy *policyv1alpha1.PropagationPolicy oldPolicy *policyv1alpha1.PropagationPolicy - objects []runtime.Object + objects []*policyv1alpha1.PropagationPolicy setupClient func() *fake.ClientBuilder wantQueueSize int }{ @@ -123,8 +117,8 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { }, }, }, - objects: []runtime.Object{ - &policyv1alpha1.PropagationPolicy{ + objects: []*policyv1alpha1.PropagationPolicy{ + { ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: "test", @@ -202,8 +196,8 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { }, }, }, - objects: []runtime.Object{ - &policyv1alpha1.PropagationPolicy{ + objects: []*policyv1alpha1.PropagationPolicy{ + { ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: "test", @@ -224,7 +218,7 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { }, }, }, - &policyv1alpha1.PropagationPolicy{ + { ObjectMeta: metav1.ObjectMeta{ Name: "foo-2", Namespace: "test", @@ -336,20 +330,24 @@ func TestHandleDeprioritizedPropagationPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fakeClient := tt.setupClient().Build() + generatedClientSet := fakegeneratedclientset.NewSimpleClientset() stopCh := make(chan struct{}) defer close(stopCh) - fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme, tt.objects...) - genMgr := genericmanager.NewSingleClusterInformerManager(fakeDynamicClient, 0, stopCh) + generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopCh) resourceDetector := &ResourceDetector{ Client: fakeClient, - DynamicClient: fakeDynamicClient, - InformerManager: genMgr, - propagationPolicyLister: genMgr.Lister(propagationPolicyGVR), + propagationPolicyLister: generatedInformerManager.PropagationPolicyLister(), + } + for _, policyObj := range tt.objects { + _, err := generatedClientSet.PolicyV1alpha1().PropagationPolicies(policyObj.Namespace).Create(context.TODO(), policyObj, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create propagationPolicy (%s/%s), err:%v", policyObj.Namespace, policyObj.Name, err) + } } mockWorker := &MockAsyncWorker{} resourceDetector.policyReconcileWorker = mockWorker - resourceDetector.InformerManager.Start() - resourceDetector.InformerManager.WaitForCacheSync() + generatedInformerManager.Start() + generatedInformerManager.WaitForCacheSync() resourceDetector.HandleDeprioritizedPropagationPolicy(*tt.oldPolicy, *tt.newPolicy) @@ -368,24 +366,18 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { utilruntime.Must(v1alpha2.Install(scheme)) utilruntime.Must(policyv1alpha1.Install(scheme)) - clusterPropagationPolicyGVR := schema.GroupVersionResource{ - Group: policyv1alpha1.GroupVersion.Group, - Version: policyv1alpha1.GroupVersion.Version, - Resource: policyv1alpha1.ResourcePluralClusterPropagationPolicy, - } - tests := []struct { name string newPolicy *policyv1alpha1.ClusterPropagationPolicy oldPolicy *policyv1alpha1.ClusterPropagationPolicy - objects []runtime.Object + objects []*policyv1alpha1.ClusterPropagationPolicy setupClient func() *fake.ClientBuilder wantQueueSize int }{ { name: "preempt deprioritized cluster propagation policy of len 1", newPolicy: &policyv1alpha1.ClusterPropagationPolicy{ - ObjectMeta: metav1.ObjectMeta{Name: "app", Namespace: "test"}, + ObjectMeta: metav1.ObjectMeta{Name: "app"}, Spec: policyv1alpha1.PropagationSpec{ Priority: ptr.To[int32](2), ResourceSelectors: []policyv1alpha1.ResourceSelector{ @@ -399,7 +391,7 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { }, }, oldPolicy: &policyv1alpha1.ClusterPropagationPolicy{ - ObjectMeta: metav1.ObjectMeta{Name: "app", Namespace: "test"}, + ObjectMeta: metav1.ObjectMeta{Name: "app"}, Spec: policyv1alpha1.PropagationSpec{ Priority: ptr.To[int32](4), ResourceSelectors: []policyv1alpha1.ResourceSelector{ @@ -412,11 +404,10 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { }, }, }, - objects: []runtime.Object{ + objects: []*policyv1alpha1.ClusterPropagationPolicy{ &policyv1alpha1.ClusterPropagationPolicy{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "bar", + Name: "foo", Labels: map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "policy-1", }, @@ -438,8 +429,7 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { setupClient: func() *fake.ClientBuilder { obj := &policyv1alpha1.ClusterPropagationPolicy{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "bar", + Name: "foo", Labels: map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "policy-1", }, @@ -464,7 +454,7 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { { name: "preempt deprioritized cluster propagation policy of len 2", newPolicy: &policyv1alpha1.ClusterPropagationPolicy{ - ObjectMeta: metav1.ObjectMeta{Name: "app", Namespace: "test"}, + ObjectMeta: metav1.ObjectMeta{Name: "app"}, Spec: policyv1alpha1.PropagationSpec{ Priority: ptr.To[int32](2), ResourceSelectors: []policyv1alpha1.ResourceSelector{ @@ -491,11 +481,10 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { }, }, }, - objects: []runtime.Object{ - &policyv1alpha1.ClusterPropagationPolicy{ + objects: []*policyv1alpha1.ClusterPropagationPolicy{ + { ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "bar", + Name: "foo", Labels: map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "policy-1", }, @@ -513,10 +502,9 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { }, }, }, - &policyv1alpha1.ClusterPropagationPolicy{ + { ObjectMeta: metav1.ObjectMeta{ - Name: "foo-2", - Namespace: "bar-2", + Name: "foo-2", Labels: map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "policy-2", }, @@ -539,8 +527,7 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { obj := []client.Object{ &policyv1alpha1.ClusterPropagationPolicy{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "bar", + Name: "foo", Labels: map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "policy-1", }, @@ -560,8 +547,7 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { }, &policyv1alpha1.ClusterPropagationPolicy{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo-2", - Namespace: "bar-2", + Name: "foo-2", Labels: map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "policy-2", }, @@ -625,20 +611,24 @@ func TestHandleDeprioritizedClusterPropagationPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fakeClient := tt.setupClient().Build() + generatedClientSet := fakegeneratedclientset.NewSimpleClientset() stopCh := make(chan struct{}) defer close(stopCh) - fakeDynamicClient := dynamicfake.NewSimpleDynamicClient(scheme, tt.objects...) - genMgr := genericmanager.NewSingleClusterInformerManager(fakeDynamicClient, 0, stopCh) + generatedInformerManager := genericmanager.NewGeneratedInformerManager(generatedClientSet, time.Second, stopCh) + for _, policyObj := range tt.objects { + _, err := generatedClientSet.PolicyV1alpha1().ClusterPropagationPolicies().Create(context.TODO(), policyObj, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create clusterPropagationPolicy (%s), err: %v", policyObj.Name, err) + } + } resourceDetector := &ResourceDetector{ Client: fakeClient, - DynamicClient: fakeDynamicClient, - InformerManager: genMgr, - clusterPropagationPolicyLister: genMgr.Lister(clusterPropagationPolicyGVR), + clusterPropagationPolicyLister: generatedInformerManager.ClusterPropagationPolicyLister(), } mockWorker := &MockAsyncWorker{} resourceDetector.clusterPolicyReconcileWorker = mockWorker - resourceDetector.InformerManager.Start() - resourceDetector.InformerManager.WaitForCacheSync() + generatedInformerManager.Start() + generatedInformerManager.WaitForCacheSync() resourceDetector.HandleDeprioritizedClusterPropagationPolicy(*tt.oldPolicy, *tt.newPolicy) diff --git a/pkg/util/fedinformer/genericmanager/generated-manager.go b/pkg/util/fedinformer/genericmanager/generated-manager.go new file mode 100644 index 000000000000..c84aefdcd207 --- /dev/null +++ b/pkg/util/fedinformer/genericmanager/generated-manager.go @@ -0,0 +1,102 @@ +package genericmanager + +import ( + "context" + "time" + + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + generatedclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" + generatedinformer "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + generatedpolicylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" +) + +// GeneratedInformerManager manages karmada shared informer for propagationPolicy and clusterPropagationPolicy. +type GeneratedInformerManager interface { + // ForPropagationPolicy builds a propagationPolicy informer,the event handler will be appended to the informer. + ForPropagationPolicy(handler cache.ResourceEventHandler) + // ForClusterPropagationPolicy builds a clusterPropagationPolicy informer,the event handler will be appended to the informer. + ForClusterPropagationPolicy(handler cache.ResourceEventHandler) + // PropagationPolicyLister builds a clusterPropagationPolicy lister. + PropagationPolicyLister() generatedpolicylister.PropagationPolicyLister + // ClusterPropagationPolicyLister builds a clusterPropagationPolicy lister. + ClusterPropagationPolicyLister() generatedpolicylister.ClusterPropagationPolicyLister + // Start will run all informers, the informers will keep running until the channel closed. + Start() + // Stop stops all informers. Once it is stopped, it will be not able + // to Start again. + Stop() + // WaitForCacheSync waits for all caches to populate. + WaitForCacheSync() + // Context returns the single cluster context. + Context() context.Context + // GetClient returns the karmada clientset client. + GetClient() generatedclientset.Interface +} + +// NewGeneratedInformerManager constructs a new instance of generatedInformerManagerImpl. +// defaultResync with value '0' means no re-sync. +func NewGeneratedInformerManager(client generatedclientset.Interface, defaultResync time.Duration, parentCh <-chan struct{}) GeneratedInformerManager { + ctx, cancel := util.ContextForChannel(parentCh) + return &generatedInformerManagerImpl{ + generatedInformer: generatedinformer.NewSharedInformerFactory(client, defaultResync), + ctx: ctx, + cancel: cancel, + client: client, + } +} + +type generatedInformerManagerImpl struct { + ctx context.Context + cancel context.CancelFunc + + generatedInformer generatedinformer.SharedInformerFactory + + client generatedclientset.Interface +} + +func (g *generatedInformerManagerImpl) ForPropagationPolicy(handler cache.ResourceEventHandler) { + _, err := g.generatedInformer.Policy().V1alpha1().PropagationPolicies().Informer().AddEventHandler(handler) + if err != nil { + klog.Errorf("Failed to add handler for propagationPolicy: %v", err) + return + } +} + +func (g *generatedInformerManagerImpl) ForClusterPropagationPolicy(handler cache.ResourceEventHandler) { + _, err := g.generatedInformer.Policy().V1alpha1().ClusterPropagationPolicies().Informer().AddEventHandler(handler) + if err != nil { + klog.Errorf("Failed to add handler for clusterPropagationPolicy: %v", err) + return + } +} + +func (g *generatedInformerManagerImpl) PropagationPolicyLister() generatedpolicylister.PropagationPolicyLister { + return g.generatedInformer.Policy().V1alpha1().PropagationPolicies().Lister() +} + +func (g *generatedInformerManagerImpl) ClusterPropagationPolicyLister() generatedpolicylister.ClusterPropagationPolicyLister { + return g.generatedInformer.Policy().V1alpha1().ClusterPropagationPolicies().Lister() +} + +func (g *generatedInformerManagerImpl) Start() { + g.generatedInformer.Start(g.ctx.Done()) +} + +func (g *generatedInformerManagerImpl) Stop() { + g.cancel() +} + +func (g *generatedInformerManagerImpl) WaitForCacheSync() { + g.generatedInformer.WaitForCacheSync(g.ctx.Done()) +} + +func (g *generatedInformerManagerImpl) Context() context.Context { + return g.ctx +} + +func (g *generatedInformerManagerImpl) GetClient() generatedclientset.Interface { + return g.client +}