diff --git a/go.mod b/go.mod index b5edb2d8f..6ade4e7aa 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,11 @@ require ( github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.2.2 github.com/golang/mock v1.6.0 + github.com/google/gnostic v0.5.7-v3refs github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.7.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + google.golang.org/protobuf v1.27.1 k8s.io/api v0.24.2 k8s.io/apiextensions-apiserver v0.24.2 k8s.io/apimachinery v0.24.2 @@ -42,7 +44,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/btree v1.0.1 // indirect - github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.5 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect @@ -78,7 +79,6 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.27.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index a594edf3c..7e15f89f8 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -11,8 +11,10 @@ import ( "github.com/go-logr/logr" "golang.org/x/sync/semaphore" + authorizationv1 "k8s.io/api/authorization/v1" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/errors" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -22,6 +24,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + authType1 "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/pager" @@ -55,6 +59,15 @@ const ( defaultListSemaphoreWeight = 50 ) +const ( + // RespectRbacDisabled default value for respectRbac + RespectRbacDisabled = iota + // RespectRbacNormal checks only api response for forbidden/unauthorized errors + RespectRbacNormal + // RespectRbacStrict checks both api response for forbidden/unauthorized errors and SelfSubjectAccessReview + RespectRbacStrict +) + type apiMeta struct { namespaced bool watchCancel context.CancelFunc @@ -208,6 +221,8 @@ type clusterCache struct { eventHandlers map[uint64]OnEventHandler openAPISchema openapi.Resources gvkParser *managedfields.GvkParser + + respectRBAC int } type clusterCacheSync struct { @@ -462,6 +477,10 @@ func (c *clusterCache) startMissingWatches() error { if err != nil { return err } + clientset, err := kubernetes.NewForConfig(c.config) + if err != nil { + return err + } namespacedResources := make(map[schema.GroupKind]bool) for i := range apis { api := apis[i] @@ -470,8 +489,25 @@ func (c *clusterCache) startMissingWatches() error { ctx, cancel := context.WithCancel(context.Background()) c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel} - err = c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error { - go c.watchEvents(ctx, api, resClient, ns, "") + err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error { + resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns) + if err != nil && c.isRestrictedResource(err) { + keep := false + if c.respectRBAC == RespectRbacStrict { + k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api) + if permErr != nil { + return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error()) + } + keep = k + } + // if we are not allowed to list the resource, remove it from the watch list + if !keep { + delete(c.apisMeta, api.GroupKind) + delete(namespacedResources, api.GroupKind) + return nil + } + } + go c.watchEvents(ctx, api, resClient, ns, resourceVersion) return nil }) if err != nil { @@ -530,6 +566,29 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso return resourceVersion, callback(listPager) } +func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string) (string, error) { + return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error { + var items []*Resource + err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error { + if un, ok := obj.(*unstructured.Unstructured); !ok { + return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName()) + } else { + items = append(items, c.newResource(un)) + } + return nil + }) + + if err != nil { + return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err) + } + + return runSynced(&c.lock, func() error { + c.replaceResourceCache(api.GroupKind, items, ns) + return nil + }) + }) +} + func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) { kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) { defer func() { @@ -540,30 +599,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo // load API initial state if no resource version provided if resourceVersion == "" { - var items []*Resource - resourceVersion, err = c.listResources(ctx, resClient, func(listPager *pager.ListPager) error { - err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error { - if un, ok := obj.(*unstructured.Unstructured); !ok { - return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName()) - } else { - items = append(items, c.newResource(un)) - } - return nil - }) - - if err != nil { - return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err) - } - return nil - }) - + resourceVersion, err = c.loadInitialState(ctx, api, resClient, ns) if err != nil { return err } - - c.lock.Lock() - c.replaceResourceCache(api.GroupKind, items, ns) - c.lock.Unlock() } w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{ @@ -687,7 +726,7 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource resClient := client.Resource(api.GroupVersionResource) switch { // if manage whole cluster or resource is cluster level and cluster resources enabled - case len(c.namespaces) == 0 || !api.Meta.Namespaced && c.clusterResources: + case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources): return callback(resClient, "") // if manage some namespaces and resource is namespaced case len(c.namespaces) != 0 && api.Meta.Namespaced: @@ -702,6 +741,56 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource return nil } +// isRestrictedResource checks if the kube api call is unauthorized or forbidden +func (c *clusterCache) isRestrictedResource(err error) bool { + return c.respectRBAC != RespectRbacDisabled && (k8sErrors.IsForbidden(err) || k8sErrors.IsUnauthorized(err)) +} + +// checkPermission runs a self subject access review to check if the controller has permissions to list the resource +func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface authType1.SelfSubjectAccessReviewInterface, api kube.APIResourceInfo) (keep bool, err error) { + sar := &authorizationv1.SelfSubjectAccessReview{ + Spec: authorizationv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Namespace: "*", + Verb: "list", // uses list verb to check for permissions + Resource: api.GroupVersionResource.Resource, + }, + }, + } + + switch { + // if manage whole cluster or resource is cluster level and cluster resources enabled + case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources): + resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{}) + if err != nil { + return false, err + } + if resp != nil && resp.Status.Allowed { + return true, nil + } + // unsupported, remove from watch list + return false, nil + // if manage some namespaces and resource is namespaced + case len(c.namespaces) != 0 && api.Meta.Namespaced: + for _, ns := range c.namespaces { + sar.Spec.ResourceAttributes.Namespace = ns + resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{}) + if err != nil { + return false, err + } + if resp != nil && resp.Status.Allowed { + return true, nil + } else { + // unsupported, remove from watch list + return false, nil + } + } + } + // checkPermission follows the same logic of determining namespace/cluster resource as the processApi function + // so if neither of the cases match it means the controller will not watch for it so it is safe to return true. + return true, nil +} + func (c *clusterCache) sync() error { c.log.Info("Start syncing cluster") @@ -748,6 +837,10 @@ func (c *clusterCache) sync() error { if err != nil { return err } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return err + } lock := sync.Mutex{} err = kube.RunAllAsync(len(apis), func(i int) error { api := apis[i] @@ -773,7 +866,25 @@ func (c *clusterCache) sync() error { }) }) if err != nil { - return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err) + if c.isRestrictedResource(err) { + keep := false + if c.respectRBAC == RespectRbacStrict { + k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api) + if permErr != nil { + return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error()) + } + keep = k + } + // if we are not allowed to list the resource, remove it from the watch list + if !keep { + lock.Lock() + delete(c.apisMeta, api.GroupKind) + delete(c.namespacedResources, api.GroupKind) + lock.Unlock() + return nil + } + } + return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err) } go c.watchEvents(ctx, api, resClient, ns, resourceVersion) diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index 1c87cdee1..a7194d0ca 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -158,3 +158,15 @@ func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc) cache.listRetryFunc = retryFunc } } + +// SetRespectRBAC allows to set whether to respect the controller rbac in list/watches +func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc { + return func(cache *clusterCache) { + // if invalid value is provided disable respect rbac + if respectRBAC < RespectRbacDisabled || respectRBAC > RespectRbacStrict { + cache.respectRBAC = RespectRbacDisabled + } else { + cache.respectRBAC = respectRBAC + } + } +}