Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: more docstrings #606

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ const (
)

type apiMeta struct {
namespaced bool
namespaced bool
// watchCancel stops the watch of all resources for this API. This gets called when the cache is invalidated or when
// the watched API ceases to exist (e.g. a CRD gets deleted).
watchCancel context.CancelFunc
}

Expand Down Expand Up @@ -468,7 +470,7 @@ func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
}
}

// startMissingWatches lists supported cluster resources and start watching for changes unless watch is already running
// startMissingWatches lists supported cluster resources and starts watching for changes unless watch is already running
func (c *clusterCache) startMissingWatches() error {
apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
if err != nil {
Expand Down Expand Up @@ -567,6 +569,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
return resourceVersion, callback(listPager)
}

// loadInitialState loads the state of all the resources retrieved by the given resource client.
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
var items []*Resource
Expand Down Expand Up @@ -724,6 +727,9 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
})
}

// processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
// call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
// If we're managing specific namespaces, we call the callback for each namespace.
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
resClient := client.Resource(api.GroupVersionResource)
switch {
Expand Down Expand Up @@ -793,6 +799,17 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth
return true, nil
}

// sync retrieves the current state of the cluster and stores relevant information in the clusterCache fields.
//
// First we get some metadata from the cluster, like the server version, OpenAPI document, and the list of all API
// resources.
//
// Then we get a list of the preferred versions of all API resources which are to be monitored (it's possible to exclude
// resources from monitoring). We loop through those APIs asynchronously and for each API we list all resources. We also
// kick off a goroutine to watch the resources for that API and update the cache constantly.
//
// When this function exits, the cluster cache is up to date, and the appropriate resources are being watched for
// changes.
func (c *clusterCache) sync() error {
c.log.Info("Start syncing cluster")

Expand Down Expand Up @@ -839,6 +856,8 @@ func (c *clusterCache) sync() error {
if err != nil {
return err
}

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
api := apis[i]
Expand Down
Loading