Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto respect rbac for discovery/sync #532

Merged
merged 6 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/go-logr/logr v1.2.2
github.com/golang/mock v1.6.0
github.com/google/gnostic v0.5.7-v3refs
github.com/spf13/cobra v1.5.0
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/protobuf v1.27.1
k8s.io/api v0.24.2
k8s.io/apiextensions-apiserver v0.24.2
k8s.io/apimachinery v0.24.2
Expand Down Expand Up @@ -42,7 +44,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
Expand Down Expand Up @@ -78,7 +79,6 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
161 changes: 136 additions & 25 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/go-logr/logr"
"golang.org/x/sync/semaphore"
authorizationv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -22,6 +24,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
authType1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/pager"
Expand Down Expand Up @@ -55,6 +59,15 @@ const (
defaultListSemaphoreWeight = 50
)

// Modes accepted by SetRespectRBAC. The numeric values are spelled out because
// callers hand them over as plain ints and the setter range-checks them, so the
// ordering Disabled < Normal < Strict is part of the contract.
const (
	// RespectRbacDisabled is the default mode: forbidden/unauthorized API
	// responses are not given any special treatment.
	RespectRbacDisabled = 0
	// RespectRbacNormal relies solely on forbidden/unauthorized API responses
	// to decide that a resource is restricted.
	RespectRbacNormal = 1
	// RespectRbacStrict additionally confirms a forbidden/unauthorized API
	// response with a SelfSubjectAccessReview.
	RespectRbacStrict = 2
)

type apiMeta struct {
namespaced bool
watchCancel context.CancelFunc
Expand Down Expand Up @@ -208,6 +221,8 @@ type clusterCache struct {
eventHandlers map[uint64]OnEventHandler
openAPISchema openapi.Resources
gvkParser *managedfields.GvkParser

respectRBAC int
}

type clusterCacheSync struct {
Expand Down Expand Up @@ -462,6 +477,10 @@ func (c *clusterCache) startMissingWatches() error {
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(c.config)
if err != nil {
return err
}
namespacedResources := make(map[schema.GroupKind]bool)
for i := range apis {
api := apis[i]
Expand All @@ -470,8 +489,25 @@ func (c *clusterCache) startMissingWatches() error {
ctx, cancel := context.WithCancel(context.Background())
c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}

err = c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
go c.watchEvents(ctx, api, resClient, ns, "")
err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns)
if err != nil && c.isRestrictedResource(err) {
keep := false
if c.respectRBAC == RespectRbacStrict {
k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
if permErr != nil {
return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
}
keep = k
}
// if we are not allowed to list the resource, remove it from the watch list
if !keep {
delete(c.apisMeta, api.GroupKind)
delete(namespacedResources, api.GroupKind)
return nil
}
}
go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
return nil
})
if err != nil {
Expand Down Expand Up @@ -530,6 +566,29 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
return resourceVersion, callback(listPager)
}

// loadInitialState lists the given API through resClient, converts every
// returned object into a cache Resource and replaces the cached entries for
// api.GroupKind and ns with the freshly listed set.
//
// It returns the resource version reported by listResources, which callers pass
// on to watchEvents. Any error from paging through the list is wrapped with %w
// so callers can still inspect its cause.
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string) (string, error) {
	return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
		// Declared inside the callback so each invocation starts from an
		// empty slice rather than appending to a previous attempt's items.
		var items []*Resource
		err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
			un, ok := obj.(*unstructured.Unstructured)
			if !ok {
				// un is nil when the assertion fails, so calling methods on
				// it would panic; report the dynamic type of obj instead.
				return fmt.Errorf("object of resource %s has an unexpected type %T", api.GroupKind.String(), obj)
			}
			items = append(items, c.newResource(un))
			return nil
		})
		if err != nil {
			return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
		}

		// runSynced is handed c.lock; presumably it holds the lock while the
		// cache is swapped — confirm against its definition.
		return runSynced(&c.lock, func() error {
			c.replaceResourceCache(api.GroupKind, items, ns)
			return nil
		})
	})
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
Expand All @@ -540,30 +599,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

// load API initial state if no resource version provided
if resourceVersion == "" {
var items []*Resource
resourceVersion, err = c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
} else {
items = append(items, c.newResource(un))
}
return nil
})

if err != nil {
return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err)
}
return nil
})

resourceVersion, err = c.loadInitialState(ctx, api, resClient, ns)
if err != nil {
return err
}

c.lock.Lock()
c.replaceResourceCache(api.GroupKind, items, ns)
c.lock.Unlock()
}

w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
Expand Down Expand Up @@ -687,7 +726,7 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource
resClient := client.Resource(api.GroupVersionResource)
switch {
// if manage whole cluster or resource is cluster level and cluster resources enabled
case len(c.namespaces) == 0 || !api.Meta.Namespaced && c.clusterResources:
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
return callback(resClient, "")
// if manage some namespaces and resource is namespaced
case len(c.namespaces) != 0 && api.Meta.Namespaced:
Expand All @@ -702,6 +741,56 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource
return nil
}

// isRestrictedResource reports whether err is a forbidden or unauthorized
// Kubernetes API error. It always reports false while respectRBAC is
// RespectRbacDisabled.
func (c *clusterCache) isRestrictedResource(err error) bool {
	if c.respectRBAC == RespectRbacDisabled {
		return false
	}
	if k8sErrors.IsForbidden(err) {
		return true
	}
	return k8sErrors.IsUnauthorized(err)
}

// checkPermission runs a self subject access review to check if the controller has permissions to list the resource
func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface authType1.SelfSubjectAccessReviewInterface, api kube.APIResourceInfo) (keep bool, err error) {
sar := &authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Namespace: "*",
Verb: "list", // uses list verb to check for permissions
jannfis marked this conversation as resolved.
Show resolved Hide resolved
Resource: api.GroupVersionResource.Resource,
},
},
}

switch {
// if manage whole cluster or resource is cluster level and cluster resources enabled
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}
if resp != nil && resp.Status.Allowed {
return true, nil
}
// unsupported, remove from watch list
return false, nil
// if manage some namespaces and resource is namespaced
case len(c.namespaces) != 0 && api.Meta.Namespaced:
for _, ns := range c.namespaces {
sar.Spec.ResourceAttributes.Namespace = ns
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}
if resp != nil && resp.Status.Allowed {
return true, nil
} else {
// unsupported, remove from watch list
return false, nil
}
}
}
// checkPermission follows the same logic of determining namespace/cluster resource as the processApi function
// so if neither of the cases match it means the controller will not watch for it so it is safe to return true.
return true, nil
}

func (c *clusterCache) sync() error {
c.log.Info("Start syncing cluster")

Expand Down Expand Up @@ -748,6 +837,10 @@ func (c *clusterCache) sync() error {
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}
Comment on lines +840 to +843
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a new client here for the core API, could we not just reuse the existing dynamic client and call something like client.Resource(schema.FromAPIVersionAndKind("authorization.k8s.io/v1", "SelfSubjectAccessReviews")).Create() below? Creating a new client IMHO has some side effects (e.g., a new client-side rate limiter).

If there's a specific reason for using a new client, a comment would be nice as to why.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No specific reason; I preferred using the concrete typed client over the dynamic client. I will test it with the dynamic client instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested it with the dynamic client, and for some reason the request to create the access review resource failed with an unauthorized error. I wasn't able to determine why it was happening, but my best guess is that the dynamic client needs a namespace for creating the resource; since an access review is a cluster-scoped resource, the request fails when I pass "" as the namespace.

lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
api := apis[i]
Expand All @@ -773,7 +866,25 @@ func (c *clusterCache) sync() error {
})
})
if err != nil {
return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err)
if c.isRestrictedResource(err) {
keep := false
if c.respectRBAC == RespectRbacStrict {
k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
if permErr != nil {
return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
}
keep = k
}
// if we are not allowed to list the resource, remove it from the watch list
if !keep {
lock.Lock()
delete(c.apisMeta, api.GroupKind)
delete(c.namespacedResources, api.GroupKind)
lock.Unlock()
return nil
}
}
return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}

go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
Expand Down
12 changes: 12 additions & 0 deletions pkg/cache/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,15 @@ func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc)
cache.listRetryFunc = retryFunc
}
}

// SetRespectRBAC returns an UpdateSettingsFunc that stores the requested
// respect-RBAC mode on the cluster cache. Any value other than
// RespectRbacNormal or RespectRbacStrict — including out-of-range input —
// leaves the cache in RespectRbacDisabled mode.
func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc {
	return func(cache *clusterCache) {
		switch respectRBAC {
		case RespectRbacNormal, RespectRbacStrict:
			cache.respectRBAC = respectRBAC
		default:
			// covers RespectRbacDisabled itself and every invalid value
			cache.respectRBAC = RespectRbacDisabled
		}
	}
}
Loading