Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add syncset controller, feat: syncset readiness #3030

Merged
merged 47 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e34eed0
add syncset controller
acpana Oct 2, 2023
3d43814
add syncset readiness, observe config
acpana Oct 3, 2023
0dc69bd
reconcile data expectations
acpana Oct 3, 2023
46dfde2
enable syncset crd
acpana Oct 3, 2023
686af5d
interpret registrar err
acpana Oct 5, 2023
7e94cdd
review: move EM, rename to EP
acpana Oct 5, 2023
31979ac
review: remove waitgroup
acpana Oct 6, 2023
b290867
smoll fix for test
acpana Oct 7, 2023
49d652e
Merge branch 'master' into acpana/syncsets-rt-r
acpana Oct 9, 2023
2192d65
use logging constant
acpana Oct 9, 2023
592e243
add TryCancelData
acpana Oct 9, 2023
8da842c
fix lock
acpana Oct 9, 2023
50f7abd
review: rm comments, mgr starts pruner
acpana Oct 11, 2023
7296196
review: move interface, IsUniversal
acpana Oct 11, 2023
6a260a3
review: explicitly remove source
acpana Oct 11, 2023
a1e153f
Merge branch 'master' into acpana/syncsets-rt-r
acpana Oct 12, 2023
689b252
limit name to 63 char
acpana Oct 13, 2023
4708b4d
review: comments, naming, HasBalidationOperation
acpana Oct 13, 2023
9937605
only add syncset controller for validation
acpana Oct 13, 2023
05af5c9
flesh out errorList
acpana Oct 13, 2023
dda78b6
TryCancel all gvks if universal
acpana Oct 16, 2023
5e48091
review feedback
acpana Oct 19, 2023
24f9237
review feedback
acpana Oct 26, 2023
53a7625
review feedback
acpana Oct 28, 2023
c8537e2
use fake client builder
acpana Oct 29, 2023
7cd0440
use fake client builder in readiness pkg too
acpana Oct 29, 2023
23d9ef7
Merge branch 'master' into acpana/syncsets-rt-r
acpana Oct 30, 2023
c4ea7cb
review feedback
acpana Nov 2, 2023
b660943
review: remove Test_ExpectationsMgr_DeletedSyncSets
acpana Nov 2, 2023
79dce07
refactor: agg err, restore agg
acpana Nov 2, 2023
09842ff
revert: restore agg
acpana Nov 3, 2023
3d07778
revert: infer podGVK
acpana Nov 3, 2023
ccf2160
add RemoveGVKErr
acpana Nov 3, 2023
449b39a
account for dangling watches
acpana Nov 4, 2023
d0ebd6a
better handling for dangling watches
acpana Nov 6, 2023
eeba2bf
check err nullability, naming
acpana Nov 7, 2023
034e826
use remove set instead of forceWipe
acpana Nov 7, 2023
854306e
set instead of book for dangling watches
acpana Nov 9, 2023
d7bb9b5
general error watch dangling
acpana Nov 9, 2023
7faecbc
review feedback
acpana Nov 10, 2023
03565fa
refactor: use tt
acpana Nov 15, 2023
3766d66
fix: general err dangling watches
acpana Nov 15, 2023
352ae72
rf: add set directly
acpana Nov 16, 2023
a216503
Merge branch 'master' into acpana/syncsets-rt-r
acpana Nov 17, 2023
a280fb7
Apply suggestions from code review
acpana Nov 22, 2023
c87e934
review suggestions
acpana Nov 22, 2023
430deea
Merge branch 'master' into acpana/syncsets-rt-r
acpana Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expectationsmgr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/externaldata"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/upgrade"
Expand Down Expand Up @@ -487,8 +487,8 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle
return err
}

em := expectationsmgr.NewExpecationsManager(cm, tracker)
go em.Run(ctx)
p := pruner.NewExpecationsPruner(cm, tracker)
go p.Run(ctx)
acpana marked this conversation as resolved.
Show resolved Hide resolved

opts := controller.Dependencies{
CFClient: client,
Expand Down
69 changes: 52 additions & 17 deletions pkg/cachemanager/cachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package cachemanager

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/open-policy-agent/frameworks/constraint/pkg/types"
"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
Expand Down Expand Up @@ -59,8 +61,6 @@ type CacheManager struct {
registrar *watch.Registrar
backgroundManagementTicker time.Ticker
reader client.Reader

started bool
}

// CFDataClient is an interface for caching data.
Expand Down Expand Up @@ -106,21 +106,10 @@ func NewCacheManager(config *Config) (*CacheManager, error) {
func (c *CacheManager) Start(ctx context.Context) error {
go c.manageCache(ctx)

c.mu.Lock()
c.started = true
c.mu.Unlock()

<-ctx.Done()
return nil
}

func (c *CacheManager) Started() bool {
c.mu.RLock()
defer c.mu.RUnlock()

return c.started
}

// UpsertSource adjusts the watched set of gvks according to the newGVKs passed in
// for a given sourceKey. Callers are responsible for retrying on error.
func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Key, newGVKs []schema.GroupVersionKind) error {
Expand All @@ -141,8 +130,12 @@ func (c *CacheManager) UpsertSource(ctx context.Context, sourceKey aggregator.Ke
// may become unreferenced and need to be deleted; this will be handled async
// in the manageCache loop.

if err := c.replaceWatchSet(ctx); err != nil {
return fmt.Errorf("error watching new gvks: %w", err)
err := c.replaceWatchSet(ctx)
acpana marked this conversation as resolved.
Show resolved Hide resolved
if failedGVKs, interpreted := interpretErr(log, err, newGVKs); interpreted != nil {
acpana marked this conversation as resolved.
Show resolved Hide resolved
for _, g := range failedGVKs {
c.tracker.TryCancelData(g)
}
return fmt.Errorf("error establishing watches: %w", interpreted)
}

return nil
Expand Down Expand Up @@ -170,6 +163,45 @@ func (c *CacheManager) replaceWatchSet(ctx context.Context) error {
return innerError
}

// interpretErr looks at the error e and unwraps it looking to find an error FailingGVKs() in the error chain.
// If found and there is an intersection with the given gvks parameter with the failing gvks, we return the failing
acpana marked this conversation as resolved.
Show resolved Hide resolved
// gvks and an error that is meant to bubble up to controllers. If there is no intersection, we log the error
// so as not to fully swallow any issues. If no error implemeting FailingGVKs() is found, we consider that
// to be a "global error", so we return all gvks passed in and the error.
func interpretErr(logger logr.Logger, e error, gvks []schema.GroupVersionKind) ([]schema.GroupVersionKind, error) {
if e == nil {
return nil, nil
}

var f interface {
acpana marked this conversation as resolved.
Show resolved Hide resolved
FailingGVKs() []schema.GroupVersionKind
Error() string
}

// search for the first error that implements the FailingGVKs() interface
if errors.As(e, &f) {
acpana marked this conversation as resolved.
Show resolved Hide resolved
acpana marked this conversation as resolved.
Show resolved Hide resolved
// this error includes failing gvks; let's match against the original list
acpana marked this conversation as resolved.
Show resolved Hide resolved
failedGvks := watch.NewSet()
failedGvks.Add(f.FailingGVKs()...)
gvksSet := watch.NewSet()
acpana marked this conversation as resolved.
Show resolved Hide resolved
gvksSet.Add(gvks...)

common := failedGvks.Intersection(gvksSet)
if common.Size() > 0 {
// then this error is pertinent to the gvks and needs to be returned
acpana marked this conversation as resolved.
Show resolved Hide resolved
return common.Items(), e
}

// if no intersection, this error is not about the gvks in this request
acpana marked this conversation as resolved.
Show resolved Hide resolved
// but we still log it for visibility
logger.Info("encountered unrelated error when replacing watch set", "error", e)
return nil, nil
}

// otherwise, this is a "global error"
return gvks, e
}

// RemoveSource removes the watches of the GVKs for a given aggregator.Key. Callers are responsible for retrying on error.
func (c *CacheManager) RemoveSource(ctx context.Context, sourceKey aggregator.Key) error {
c.mu.Lock()
Expand All @@ -179,8 +211,11 @@ func (c *CacheManager) RemoveSource(ctx context.Context, sourceKey aggregator.Ke
return fmt.Errorf("internal error removing source: %w", err)
}

if err := c.replaceWatchSet(ctx); err != nil {
return fmt.Errorf("error removing watches for source %v: %w", sourceKey, err)
err := c.replaceWatchSet(ctx)
if _, interpreted := interpretErr(log, err, []schema.GroupVersionKind{}); interpreted != nil {
// unlike UpsertSource, we cannot TryCancel or Cancel any expectations as these
acpana marked this conversation as resolved.
Show resolved Hide resolved
// GVKs may be required by other sync sources.
return fmt.Errorf("error removing watches for source %v: %w", sourceKey, interpreted)
}

return nil
Expand Down
Loading
Loading