diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 50791920e..01280157f 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -69,7 +69,9 @@ const ( ) type apiMeta struct { - namespaced bool + namespaced bool + // watchCancel stops the watch of all resources for this API. This gets called when the cache is invalidated or when + // the watched API ceases to exist (e.g. a CRD gets deleted). watchCancel context.CancelFunc } @@ -468,7 +470,7 @@ func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) { } } -// startMissingWatches lists supported cluster resources and start watching for changes unless watch is already running +// startMissingWatches lists supported cluster resources and starts watching for changes unless watch is already running func (c *clusterCache) startMissingWatches() error { apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter) if err != nil { @@ -567,6 +569,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso return resourceVersion, callback(listPager) } +// loadInitialState loads the state of all the resources retrieved by the given resource client. func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) { return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error { var items []*Resource @@ -724,6 +727,9 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo }) } +// processApi processes all the resources for a given API. First we construct an API client for the given API. Then we +// call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace. +// If we're managing specific namespaces, we call the callback for each namespace. func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error { resClient := client.Resource(api.GroupVersionResource) switch { @@ -793,6 +799,17 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth return true, nil } +// sync retrieves the current state of the cluster and stores relevant information in the clusterCache fields. +// +// First we get some metadata from the cluster, like the server version, OpenAPI document, and the list of all API +// resources. +// +// Then we get a list of the preferred versions of all API resources which are to be monitored (it's possible to exclude +// resources from monitoring). We loop through those APIs asynchronously and for each API we list all resources. We also +// kick off a goroutine to watch the resources for that API and update the cache constantly. +// +// When this function exits, the cluster cache is up to date, and the appropriate resources are being watched for +// changes. func (c *clusterCache) sync() error { c.log.Info("Start syncing cluster") @@ -839,6 +856,8 @@ func (c *clusterCache) sync() error { if err != nil { return err } + + // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. lock := sync.Mutex{} err = kube.RunAllAsync(len(apis), func(i int) error { api := apis[i]