From 15f425babe45ad977c457669693bcd4ebd05c73f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?= Date: Wed, 17 Jan 2024 16:54:44 +0800 Subject: [PATCH] Create informer per resource to avoid huge memory consumption MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create informer per resource to avoid huge memory consumption Fixes #7323 Signed-off-by: Wenkai Yin(尹文开) --- pkg/client/dynamic.go | 8 ++-- pkg/restore/restore.go | 85 ++++++++++++------------------------- pkg/restore/restore_test.go | 1 + pkg/test/fake_dynamic.go | 4 +- 4 files changed, 35 insertions(+), 63 deletions(-) diff --git a/pkg/client/dynamic.go b/pkg/client/dynamic.go index 0e9655b11be..705c28aaa65 100644 --- a/pkg/client/dynamic.go +++ b/pkg/client/dynamic.go @@ -35,8 +35,8 @@ type DynamicFactory interface { // ClientForGroupVersionResource returns a Dynamic client for the given group/version // and resource for the given namespace. ClientForGroupVersionResource(gv schema.GroupVersion, resource metav1.APIResource, namespace string) (Dynamic, error) - // DynamicSharedInformerFactoryForNamespace returns a DynamicSharedInformerFactory for the given namespace. - DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory + // DynamicSharedInformerFactory returns a DynamicSharedInformerFactory. + DynamicSharedInformerFactory() dynamicinformer.DynamicSharedInformerFactory } // dynamicFactory implements DynamicFactory. 
@@ -55,8 +55,8 @@ func (f *dynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersion, r }, nil } -func (f *dynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory { - return dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.dynamicClient, time.Minute, namespace, nil) +func (f *dynamicFactory) DynamicSharedInformerFactory() dynamicinformer.DynamicSharedInformerFactory { + return dynamicinformer.NewDynamicSharedInformerFactory(f.dynamicClient, time.Minute) } // Creator creates an object. diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index e59d14be951..1a341d1e7b0 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -45,7 +45,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" crclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -309,8 +308,6 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( resourceTerminatingTimeout: kr.resourceTerminatingTimeout, resourceTimeout: kr.resourceTimeout, resourceClients: make(map[resourceClientKey]client.Dynamic), - dynamicInformerFactories: make(map[string]*informerFactoryWithContext), - resourceInformers: make(map[resourceClientKey]informers.GenericInformer), restoredItems: req.RestoredItems, renamedPVs: make(map[string]string), pvRenamer: kr.pvRenamer, @@ -362,8 +359,7 @@ type restoreContext struct { resourceTerminatingTimeout time.Duration resourceTimeout time.Duration resourceClients map[resourceClientKey]client.Dynamic - dynamicInformerFactories map[string]*informerFactoryWithContext - resourceInformers map[resourceClientKey]informers.GenericInformer + dynamicInformerFactory *informerFactoryWithContext restoredItems map[itemKey]restoredItemStatus renamedPVs map[string]string pvRenamer func(string) (string, error) @@ -447,11 
+443,16 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { // Need to stop all informers if enabled if !ctx.disableInformerCache { + context, cancel := signal.NotifyContext(go_context.Background(), os.Interrupt) + ctx.dynamicInformerFactory = &informerFactoryWithContext{ + factory: ctx.dynamicFactory.DynamicSharedInformerFactory(), + context: context, + cancel: cancel, + } + defer func() { // Call the cancel func to close the channel for each started informer - for _, factory := range ctx.dynamicInformerFactories { - factory.cancel() - } + ctx.dynamicInformerFactory.cancel() // After upgrading to client-go 0.27 or newer, also call Shutdown for each informer factory }() } @@ -579,28 +580,24 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { // initialize informer caches for selected resources if enabled if !ctx.disableInformerCache { - // CRD informer will have already been initialized if any CRDs were created, - // but already-initialized informers aren't re-initialized because getGenericInformer - // looks for an existing one first. - factoriesToStart := make(map[string]*informerFactoryWithContext) for _, informerResource := range selectedResourceCollection { - gr := schema.ParseGroupResource(informerResource.resource) + if informerResource.totalItems == 0 { + continue + } + version := "" for _, items := range informerResource.selectedItemsByNamespace { - // don't use ns key since it represents original ns, not mapped ns if len(items) == 0 { continue } - // use the first item in the list to initialize the informer. 
The rest of the list - // should share the same gvr and namespace - _, factory := ctx.getGenericInformerInternal(gr, items[0].version, items[0].targetNamespace) - if factory != nil { - factoriesToStart[items[0].targetNamespace] = factory - } + version = items[0].version + break } + gvr := schema.ParseGroupResource(informerResource.resource).WithVersion(version) + ctx.dynamicInformerFactory.factory.ForResource(gvr) } - for _, factoryWithContext := range factoriesToStart { - factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done()) - } + ctx.dynamicInformerFactory.factory.Start(ctx.dynamicInformerFactory.context.Done()) + ctx.log.Info("waiting informer cache sync ...") + ctx.dynamicInformerFactory.factory.WaitForCacheSync(ctx.dynamicInformerFactory.context.Done()) } // reset processedItems and totalItems before processing full resource list @@ -1061,42 +1058,15 @@ func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, return client, nil } -// if new informer is created, non-nil factory is returned -func (ctx *restoreContext) getGenericInformerInternal(groupResource schema.GroupResource, version, namespace string) (informers.GenericInformer, *informerFactoryWithContext) { - var returnFactory *informerFactoryWithContext - - key := getResourceClientKey(groupResource, version, namespace) - factoryWithContext, ok := ctx.dynamicInformerFactories[key.namespace] - if !ok { - factory := ctx.dynamicFactory.DynamicSharedInformerFactoryForNamespace(namespace) - informerContext, informerCancel := signal.NotifyContext(go_context.Background(), os.Interrupt) - factoryWithContext = &informerFactoryWithContext{ - factory: factory, - context: informerContext, - cancel: informerCancel, - } - ctx.dynamicInformerFactories[key.namespace] = factoryWithContext - } - informer, ok := ctx.resourceInformers[key] - if !ok { - ctx.log.Infof("[debug] Creating factory for %s in namespace %s", key.resource, key.namespace) - informer = 
factoryWithContext.factory.ForResource(key.resource) - factoryWithContext.factory.Start(factoryWithContext.context.Done()) - ctx.resourceInformers[key] = informer - returnFactory = factoryWithContext +func (ctx *restoreContext) getResourceLister(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) cache.GenericNamespaceLister { + informer := ctx.dynamicInformerFactory.factory.ForResource(groupResource.WithVersion(obj.GroupVersionKind().Version)) + // if the restore contains CRDs or the RIA returns new resources, need to make sure the corresponding informers are synced + if !informer.Informer().HasSynced() { + ctx.dynamicInformerFactory.factory.Start(ctx.dynamicInformerFactory.context.Done()) + ctx.log.Infof("waiting informer cache sync for %s, %s/%s ...", groupResource, namespace, obj.GetName()) + ctx.dynamicInformerFactory.factory.WaitForCacheSync(ctx.dynamicInformerFactory.context.Done()) } - return informer, returnFactory -} -func (ctx *restoreContext) getGenericInformer(groupResource schema.GroupResource, version, namespace string) informers.GenericInformer { - informer, factoryWithContext := ctx.getGenericInformerInternal(groupResource, version, namespace) - if factoryWithContext != nil { - factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done()) - } - return informer -} -func (ctx *restoreContext) getResourceLister(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) cache.GenericNamespaceLister { - informer := ctx.getGenericInformer(groupResource, obj.GroupVersionKind().Version, namespace) if namespace == "" { return informer.Lister() } else { @@ -1123,6 +1093,7 @@ func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj * ctx.log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", u))).Error("unable to understand entry returned from client") return nil, fmt.Errorf("expected *unstructured.Unstructured but got %T", 
u) } + ctx.log.Debugf("get %s, %s/%s from informer cache", groupResource, namespace, name) return u, nil } diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index 19022bb7055..b80e621911a 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -394,6 +394,7 @@ func TestRestoreResourceFiltering(t *testing.T) { test.Deployments(): {"ns-1/deploy-1"}, }, }, + { name: "IncludeClusterResources=false only restores namespaced resources", restore: defaultRestore().IncludeClusterResources(false).Result(), diff --git a/pkg/test/fake_dynamic.go b/pkg/test/fake_dynamic.go index d184758a98d..1007ecdf366 100644 --- a/pkg/test/fake_dynamic.go +++ b/pkg/test/fake_dynamic.go @@ -38,8 +38,8 @@ func (df *FakeDynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersi return args.Get(0).(client.Dynamic), args.Error(1) } -func (df *FakeDynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory { - args := df.Called(namespace) +func (df *FakeDynamicFactory) DynamicSharedInformerFactory() dynamicinformer.DynamicSharedInformerFactory { + args := df.Called() return args.Get(0).(dynamicinformer.DynamicSharedInformerFactory) }