diff --git a/main.go b/main.go index add56d97c49..4f95a86cbe9 100644 --- a/main.go +++ b/main.go @@ -47,11 +47,11 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/drivers/k8scel" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/externaldata" "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner" "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" @@ -444,7 +444,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle mutationSystem := mutation.NewSystem(mutationOpts) expansionSystem := expansion.NewSystem(mutationSystem) - pubsubSystem := pubsub.NewSystem() + exportSystem := export.NewSystem() c := mgr.GetCache() dc, ok := c.(watch.RemovableCache) @@ -518,7 +518,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle MutationSystem: mutationSystem, ExpansionSystem: expansionSystem, ProviderCache: providerCache, - PubsubSystem: pubsubSystem, + ExportSystem: exportSystem, } if err := controller.AddToManager(mgr, &opts); err != nil { @@ -548,7 +548,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle ProcessExcluder: processExcluder, CacheLister: auditCache, ExpansionSystem: expansionSystem, - PubSubSystem: pubsubSystem, + ExportSystem: exportSystem, } if err := audit.AddToManager(mgr, &auditDeps); err != nil { setupLog.Error(err, "unable to register audit with the manager") diff --git a/pkg/audit/controller.go b/pkg/audit/controller.go index 97f1bba7692..d01b9bf9bca 100644 --- a/pkg/audit/controller.go +++ b/pkg/audit/controller.go @@ -16,7 +16,7 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -25,7 +25,7 @@ type Dependencies struct { ProcessExcluder *process.Excluder CacheLister *CacheLister ExpansionSystem *expansion.System - PubSubSystem *pubsub.System + ExportSystem *export.System } // AddToManager adds audit manager to the Manager. diff --git a/pkg/audit/manager.go b/pkg/audit/manager.go index 8d8a6761d23..1afd096f59b 100644 --- a/pkg/audit/manager.go +++ b/pkg/audit/manager.go @@ -18,11 +18,11 @@ import ( constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/client/reviews" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" - pubsubController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub" + exportController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" mutationtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" corev1 "k8s.io/api/core/v1" @@ -91,7 +91,7 @@ type Manager struct { auditCache *CacheLister expansionSystem *expansion.System - pubsubSystem *pubsub.System + exportSystem *export.System } // StatusViolation represents each violation under status. @@ -107,7 +107,7 @@ type StatusViolation struct { } // ConstraintMsg represents publish message for each constraint. -type PubsubMsg struct { +type ExportMsg struct { ID string `json:"id,omitempty"` Details interface{} `json:"details,omitempty"` EventType string `json:"eventType,omitempty"` @@ -269,7 +269,7 @@ func New(mgr manager.Manager, deps *Dependencies) (*Manager, error) { gkNamespace: util.GetNamespace(), auditCache: deps.CacheLister, expansionSystem: deps.ExpansionSystem, - pubsubSystem: deps.PubSubSystem, + exportSystem: deps.ExportSystem, } return am, nil } @@ -902,10 +902,10 @@ func (am *Manager) addAuditResponsesToUpdateLists( details := r.Metadata["details"] labels := r.obj.GetLabels() logViolation(am.log, constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels) - if *pubsubController.PubsubEnabled { - err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp)) + if *exportController.ExportEnabled { + err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp)) if err != nil { - am.log.Error(err, "pubsub audit Publishing") + am.log.Error(err, "error exporting audit violation") } } if *emitAuditEvents { @@ -1161,7 +1161,7 @@ func violationMsg(constraint *unstructured.Unstructured, enforcementAction util. userConstraintAnnotations := constraint.GetAnnotations() delete(userConstraintAnnotations, "kubectl.kubernetes.io/last-applied-configuration") - return PubsubMsg{ + return ExportMsg{ Message: message, Details: details, ID: timestamp, diff --git a/pkg/controller/add_pubsub.go b/pkg/controller/add_export.go similarity index 83% rename from pkg/controller/add_pubsub.go rename to pkg/controller/add_export.go index 52904f36d50..feb9cb30e1c 100644 --- a/pkg/controller/add_pubsub.go +++ b/pkg/controller/add_export.go @@ -16,9 +16,9 @@ limitations under the License. package controller import ( - "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export" ) func init() { - Injectors = append(Injectors, &pubsub.Adder{}) + Injectors = append(Injectors, &export.Adder{}) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d129dc03e02..ef8872f3581 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -28,9 +28,9 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" @@ -57,8 +57,8 @@ type GetPodInjector interface { InjectGetPod(func(context.Context) (*corev1.Pod, error)) } -type PubsubInjector interface { - InjectPubsubSystem(pubsubSystem *pubsub.System) +type ExportInjector interface { + InjectExportSystem(exportSystem *export.System) } type DataClientInjector interface { @@ -103,7 +103,7 @@ type Dependencies struct { MutationSystem *mutation.System ExpansionSystem *expansion.System ProviderCache *externaldata.ProviderCache - PubsubSystem *pubsub.System + ExportSystem *export.System SyncEventsCh chan event.GenericEvent CacheMgr *cm.CacheManager } @@ -215,8 +215,8 @@ func AddToManager(m manager.Manager, deps *Dependencies) error { if a2, ok := a.(GetPodInjector); ok { a2.InjectGetPod(deps.GetPod) } - if a2, ok := a.(PubsubInjector); ok { - a2.InjectPubsubSystem(deps.PubsubSystem) + if a2, ok := a.(ExportInjector); ok { + a2.InjectExportSystem(deps.ExportSystem) } if a2, ok := a.(CacheManagerInjector); ok { // this is used by the config controller to sync diff --git a/pkg/controller/pubsub/pubsub_config_controller.go b/pkg/controller/export/export_config_controller.go similarity index 80% rename from pkg/controller/pubsub/pubsub_config_controller.go rename to pkg/controller/export/export_config_controller.go index c34fa1b97b8..c1efe9bc2a6 100644 --- a/pkg/controller/pubsub/pubsub_config_controller.go +++ b/pkg/controller/export/export_config_controller.go @@ -1,4 +1,4 @@ -package pubsub +package export import ( "context" @@ -6,8 +6,8 @@ import ( "flag" "fmt" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" @@ -26,19 +26,19 @@ import ( ) var ( - PubsubEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages") + ExportEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages") log = logf.Log.WithName("controller").WithValues(logging.Process, "pubsub_controller") ) type Adder struct { - PubsubSystem *pubsub.System + ExportSystem *export.System } func (a *Adder) Add(mgr manager.Manager) error { - if !*PubsubEnabled { + if !*ExportEnabled { return nil } - r := newReconciler(mgr, a.PubsubSystem) + r := newReconciler(mgr, a.ExportSystem) return add(mgr, r) } @@ -46,17 +46,17 @@ func (a *Adder) InjectControllerSwitch(_ *watch.ControllerSwitch) {} func (a *Adder) InjectTracker(_ *readiness.Tracker) {} -func (a *Adder) InjectPubsubSystem(pubsubSystem *pubsub.System) { - a.PubsubSystem = pubsubSystem +func (a *Adder) InjectExportSystem(exportSystem *export.System) { + a.ExportSystem = exportSystem } type Reconciler struct { client.Client scheme *runtime.Scheme - system *pubsub.System + system *export.System } -func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler { +func newReconciler(mgr manager.Manager, system *export.System) *Reconciler { return &Reconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), @@ -65,7 +65,7 @@ func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler { } func add(mgr manager.Manager, r reconcile.Reconciler) error { - c, err := controller.New("pubsub-config-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.New("export-config-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } @@ -113,10 +113,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( } if len(cfg.Data) == 0 { - return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName)) + return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName)) } - if _, ok := cfg.Data["provider"]; !ok { - return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName)) + if _, ok := cfg.Data["driver"]; !ok { + return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing driver field in configmap %s, unable to establish connection", request.NamespacedName)) } var config interface{} err = json.Unmarshal([]byte(cfg.Data["config"]), &config) @@ -124,11 +124,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{}, err } - err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"]) + err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["driver"]) if err != nil { return reconcile.Result{}, err } - log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Data["provider"]) + log.Info("Connection upsert successful", "name", request.Name, "driver", cfg.Data["driver"]) return reconcile.Result{}, nil } diff --git a/pkg/controller/pubsub/pubsub_config_controller_test.go b/pkg/controller/export/export_config_controller_test.go similarity index 92% rename from pkg/controller/pubsub/pubsub_config_controller_test.go rename to pkg/controller/export/export_config_controller_test.go index 258b092c309..791468a4c9c 100644 --- a/pkg/controller/pubsub/pubsub_config_controller_test.go +++ b/pkg/controller/export/export_config_controller_test.go @@ -1,4 +1,4 @@ -package pubsub +package export import ( "context" @@ -6,7 +6,7 @@ import ( "fmt" "testing" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -48,7 +48,7 @@ func TestReconcile(t *testing.T) { }, }, wantErr: true, - errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName), + errorMsg: fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName), }, } for _, tc := range testCases { diff --git a/pkg/export/dapr/dapr.go b/pkg/export/dapr/dapr.go new file mode 100644 index 00000000000..ba4ca8e4fdd --- /dev/null +++ b/pkg/export/dapr/dapr.go @@ -0,0 +1,88 @@ +package dapr + +import ( + "context" + "encoding/json" + "fmt" + + daprClient "github.com/dapr/go-sdk/client" +) + +type Connection struct { + // Name of the component object to use in Dapr + component string + + client daprClient.Client +} + +// Dapr represents driver to use Dapr. +type Dapr struct { + openConnections map[string]Connection +} + +const ( + Name = "dapr" +) + +var Connections = &Dapr{ + openConnections: make(map[string]Connection), +} + +func (r *Dapr) Publish(_ context.Context, connectionName string, data interface{}, topic string) error { + jsonData, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("error marshaling data: %w", err) + } + + conn, ok := r.openConnections[connectionName] + if !ok { + return fmt.Errorf("connection not found: %s for Dapr driver", connectionName) + } + err = conn.client.PublishEvent(context.Background(), conn.component, topic, jsonData) + if err != nil { + return fmt.Errorf("error publishing message to dapr: %w", err) + } + + return nil +} + +func (r *Dapr) CloseConnection(connectionName string) error { + delete(r.openConnections, connectionName) + return nil +} + +func (r *Dapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error { + cfg, ok := config.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + component, ok := cfg["component"].(string) + if !ok { + return fmt.Errorf("failed to get value of component") + } + conn := r.openConnections[connectionName] + conn.component = component + r.openConnections[connectionName] = conn + return nil +} + +func (r *Dapr) CreateConnection(_ context.Context, connectionName string, config interface{}) error { + var conn Connection + cfg, ok := config.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + conn.component, ok = cfg["component"].(string) + if !ok { + return fmt.Errorf("failed to get value of component") + } + + tmp, err := daprClient.NewClient() + if err != nil { + return err + } + + conn.client = tmp + r.openConnections[connectionName] = conn + return nil +} diff --git a/pkg/export/dapr/dapr_test.go b/pkg/export/dapr/dapr_test.go new file mode 100644 index 00000000000..d53e2e819ed --- /dev/null +++ b/pkg/export/dapr/dapr_test.go @@ -0,0 +1,163 @@ +package dapr + +import ( + "context" + "os" + "testing" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" + "github.com/stretchr/testify/assert" +) + +var testClient driver.Driver + +func TestMain(m *testing.M) { + c, f := FakeConnection() + testClient = c + r := m.Run() + f() + + if r != 0 { + os.Exit(r) + } +} + +func TestCreate(t *testing.T) { + tests := []struct { + name string + config interface{} + expectedConnections int + errorMsg string + }{ + { + name: "invalid config", + config: "test", + expectedConnections: 1, + errorMsg: "invalid type assertion, config is not in expected format", + }, + { + name: "config with missing component", + config: map[string]interface{}{"enableBatching": true}, + expectedConnections: 1, + errorMsg: "failed to get value of component", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := testClient.CreateConnection(context.TODO(), "another-test", tc.config) + tmp, ok := testClient.(*Dapr) + if !ok { + t.Errorf("failed to type assert") + } + assert.Equal(t, tc.expectedConnections, len(tmp.openConnections)) + assert.EqualError(t, err, tc.errorMsg) + }) + } +} + +func TestDapr_Publish(t *testing.T) { + ctx := context.Background() + + type args struct { + ctx context.Context + data interface{} + topic string + connectionName string + } + + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "test publish", + args: args{ + ctx: ctx, + data: map[string]interface{}{ + "test": "test", + }, + topic: "test", + connectionName: "test", + }, + wantErr: false, + }, + { + name: "test publish without data", + args: args{ + ctx: ctx, + data: nil, + topic: "test", + connectionName: "test", + }, + wantErr: false, + }, + { + name: "test publish without topic", + args: args{ + ctx: ctx, + data: map[string]interface{}{ + "test": "test", + }, + topic: "", + connectionName: "test", + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := testClient + if err := r.Publish(tt.args.ctx, tt.args.connectionName, tt.args.data, tt.args.topic); (err != nil) != tt.wantErr { + t.Errorf("Dapr.Publish() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestDapr_Update(t *testing.T) { + tests := []struct { + name string + config interface{} + connectionName string + wantErr bool + }{ + { + name: "test update connection", + config: map[string]interface{}{ + "component": "foo", + }, + wantErr: false, + connectionName: "test", + }, + { + name: "test update connection with invalid config", + config: map[string]interface{}{ + "foo": "bar", + }, + connectionName: "test", + wantErr: true, + }, + { + name: "test update connection with nil config", + config: nil, + connectionName: "test", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := testClient + if err := r.UpdateConnection(context.Background(), tt.connectionName, tt.config); (err != nil) != tt.wantErr { + t.Errorf("Dapr.Update() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr { + cmp, ok := tt.config.(map[string]interface{})["component"].(string) + assert.True(t, ok) + tmp, ok := r.(*Dapr) + assert.True(t, ok) + assert.Equal(t, cmp, tmp.openConnections[tt.connectionName].component) + } + }) + } +} diff --git a/pkg/pubsub/dapr/fake_dapr_client.go b/pkg/export/dapr/fake_dapr_client.go similarity index 87% rename from pkg/pubsub/dapr/fake_dapr_client.go rename to pkg/export/dapr/fake_dapr_client.go index 4bd36da5ecd..62236c1c74f 100644 --- a/pkg/pubsub/dapr/fake_dapr_client.go +++ b/pkg/export/dapr/fake_dapr_client.go @@ -17,7 +17,7 @@ import ( pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" "github.com/golang/protobuf/ptypes/empty" "github.com/google/uuid" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" @@ -326,66 +326,76 @@ func (s *testDaprServer) BulkPublishEventAlpha1(_ context.Context, req *pb.BulkP return &pb.BulkPublishResponse{FailedEntries: failedEntries}, nil } -func FakeConnection() (connection.Connection, func()) { +func FakeConnection() (driver.Driver, func()) { ctx := context.Background() c, f := getTestClient(ctx) return &Dapr{ - client: c, - pubSubComponent: "test", + openConnections: map[string]Connection{ + "test": { + client: c, + component: "test", + }, + }, }, f } -type FakeDapr struct { - // Array of clients to talk to different endpoints - client daprClient.Client - - // Name of the pubsub component - pubSubComponent string +type FakeDaprConnection struct { + component string + client daprClient.Client // closing function f func() } -func (r *FakeDapr) Publish(_ context.Context, _ interface{}, _ string) error { +type FakeDapr struct { + openConnections map[string]FakeDaprConnection +} + +func (r *FakeDapr) Publish(_ context.Context, _ string, _ interface{}, _ string) error { return nil } -func (r *FakeDapr) CloseConnection() error { - r.f() +func (r *FakeDapr) CloseConnection(connectionName string) error { + if len(r.openConnections) == 1 { + r.openConnections[connectionName].f() + } + delete(r.openConnections, connectionName) return nil } -func (r *FakeDapr) UpdateConnection(_ context.Context, config interface{}) error { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) +func (r *FakeDapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error { + cfg, ok := config.(map[string]interface{}) if !ok { return fmt.Errorf("invalid type assertion, config is not in expected format") } - cfg.Component, ok = m["component"].(string) + component, ok := cfg["component"].(string) if !ok { return fmt.Errorf("failed to get value of component") } - r.pubSubComponent = cfg.Component + conn := r.openConnections[connectionName] + conn.component = component + r.openConnections[connectionName] = conn return nil } -// Returns a fake client for dapr. -func FakeNewConnection(ctx context.Context, config interface{}) (connection.Connection, error) { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) +func (r *FakeDapr) CreateConnection(ctx context.Context, connectionName string, config interface{}) error { + var conn FakeDaprConnection + cfg, ok := config.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("invalid type assertion, config is not in expected format") + return fmt.Errorf("invalid type assertion, config is not in expected format") } - cfg.Component, ok = m["component"].(string) + conn.component, ok = cfg["component"].(string) if !ok { - return nil, fmt.Errorf("failed to get value of component") + return fmt.Errorf("failed to get value of component") } c, f := getTestClient(ctx) + conn.client = c + conn.f = f + r.openConnections[connectionName] = conn + return nil +} - return &FakeDapr{ - client: c, - pubSubComponent: cfg.Component, - f: f, - }, nil +var FakeConn = &FakeDapr{ + openConnections: map[string]FakeDaprConnection{}, } diff --git a/pkg/export/driver/driver.go b/pkg/export/driver/driver.go new file mode 100644 index 00000000000..3b5a4b561d4 --- /dev/null +++ b/pkg/export/driver/driver.go @@ -0,0 +1,19 @@ +package driver + +import ( + "context" +) + +type Driver interface { + // Publish publishes single message with specific subject using a connection + Publish(ctx context.Context, connectionName string, data interface{}, subject string) error + + // CloseConnection closes a connection + CloseConnection(connectionName string) error + + // UpdateConnection updates an existing connection + UpdateConnection(ctx context.Context, connectionName string, config interface{}) error + + // CreateConnection creates new connection + CreateConnection(ctx context.Context, connectionName string, config interface{}) error +} diff --git a/pkg/export/system.go b/pkg/export/system.go new file mode 100644 index 00000000000..d9863c35a65 --- /dev/null +++ b/pkg/export/system.go @@ -0,0 +1,81 @@ +package export + +import ( + "context" + "fmt" + "sync" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" +) + +var SupportedDrivers = map[string]driver.Driver{ + dapr.Name: dapr.Connections, +} + +type System struct { + mux sync.RWMutex + connectionToDriver map[string]string +} + +func NewSystem() *System { + return &System{ + connectionToDriver: map[string]string{}, + } +} + +func (s *System) Publish(_ context.Context, connectionName string, subject string, msg interface{}) error { + s.mux.RLock() + defer s.mux.RUnlock() + if dName, ok := s.connectionToDriver[connectionName]; ok { + return SupportedDrivers[dName].Publish(context.Background(), connectionName, msg, subject) + } + return fmt.Errorf("connection is not initialized, name: %s ", connectionName) +} + +func (s *System) UpsertConnection(ctx context.Context, config interface{}, connectionName string, newDriver string) error { + s.mux.Lock() + defer s.mux.Unlock() + // Check if the connection already exists. + if oldDriver, ok := s.connectionToDriver[connectionName]; ok { + // If the provider is the same, update the existing connection. + if oldDriver == newDriver { + return SupportedDrivers[newDriver].UpdateConnection(ctx, connectionName, config) + } + } + // Check if the provider is supported. + if d, ok := SupportedDrivers[newDriver]; ok { + err := d.CreateConnection(ctx, connectionName, config) + if err != nil { + return err + } + + // Close the existing connection after successfully creating the new one. + if err := s.closeConnection(connectionName); err != nil { + return err + } + // Add the new connection and provider to the maps. + s.connectionToDriver[connectionName] = newDriver + return nil + } + return fmt.Errorf("driver %s is not supported", newDriver) +} + +func (s *System) CloseConnection(connectionName string) error { + s.mux.Lock() + defer s.mux.Unlock() + return s.closeConnection(connectionName) +} + +func (s *System) closeConnection(connectionName string) error { + if c, ok := s.connectionToDriver[connectionName]; ok { + if conn, ok := SupportedDrivers[c]; ok { + err := conn.CloseConnection(connectionName) + if err != nil { + return err + } + } + delete(s.connectionToDriver, connectionName) + } + return nil +} diff --git a/pkg/export/system_test.go b/pkg/export/system_test.go new file mode 100644 index 00000000000..5daa62df3e0 --- /dev/null +++ b/pkg/export/system_test.go @@ -0,0 +1,241 @@ +package export + +import ( + "context" + "os" + "sync" + "testing" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/driver" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/testdriver" + "github.com/stretchr/testify/assert" +) + +var testSystem *System + +func TestMain(m *testing.M) { + ctx := context.Background() + SupportedDrivers = map[string]driver.Driver{ + dapr.Name: dapr.FakeConn, + } + testSystem = NewSystem() + cfg := map[string]interface{}{ + dapr.Name: map[string]interface{}{ + "component": "pubsub", + }, + } + for name, fakeConn := range SupportedDrivers { + testSystem.connectionToDriver[name] = name + _ = fakeConn.CreateConnection(ctx, name, cfg[name]) + } + r := m.Run() + for name, fakeConn := range testSystem.connectionToDriver { + _ = SupportedDrivers[fakeConn].CloseConnection(name) + } + + if r != 0 { + os.Exit(r) + } +} + +func TestNewSystem(t *testing.T) { + tests := []struct { + name string + input string + want *System + }{ + { + name: "requesting system", + want: &System{ + connectionToDriver: map[string]string{}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ret := NewSystem() + assert.Equal(t, ret, tc.want) + }) + } +} + +func TestSystem_UpsertConnection(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + config interface{} + connectionName string + newDriver string + setup func(*System) error + wantErr bool + }{ + { + name: "new connection with supported driver", + config: map[string]interface{}{"component": "pubsub"}, + connectionName: "conn1", + newDriver: dapr.Name, + setup: func(s *System) error { + s.connectionToDriver = map[string]string{} + SupportedDrivers[dapr.Name] = dapr.FakeConn + return nil + }, + wantErr: false, + }, + { + name: "update existing connection with same driver", + config: map[string]interface{}{"component": "pubsub1"}, + connectionName: "conn1", + newDriver: dapr.Name, + setup: func(s *System) error { + s.connectionToDriver["conn1"] = dapr.Name + SupportedDrivers[dapr.Name] = dapr.FakeConn + return SupportedDrivers[dapr.Name].CreateConnection(ctx, "conn1", map[string]interface{}{"component": "pubsub"}) + }, + wantErr: false, + }, + { + name: "new connection with unsupported driver", + config: map[string]interface{}{"component": "pubsub"}, + connectionName: "conn3", + newDriver: "unsupportedDriver", + setup: func(_ *System) error { return nil }, + wantErr: true, + }, + { + name: "update existing connection with different driver", + config: map[string]interface{}{"component": "pubsub"}, + connectionName: "conn4", + newDriver: dapr.Name, + setup: func(s *System) error { + s.connectionToDriver["conn4"] = testdriver.Name + SupportedDrivers[dapr.Name] = dapr.FakeConn + SupportedDrivers[testdriver.Name] = testdriver.FakeConn + return SupportedDrivers[testdriver.Name].CreateConnection(ctx, "conn4", "config4") + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + system := NewSystem() + if err := tt.setup(system); err != nil { + t.Fatalf("failed to setup test: %v", err) + } + + err := system.UpsertConnection(ctx, tt.config, tt.connectionName, tt.newDriver) + if (err != nil) != tt.wantErr { + t.Errorf("UpsertConnection() error = %v, wantErr %v", err, tt.wantErr) + } + + if !tt.wantErr { + if driver, ok := system.connectionToDriver[tt.connectionName]; !ok || driver != tt.newDriver { + t.Errorf("connection %s not found or driver mismatch: got %v, want %v", tt.connectionName, driver, tt.newDriver) + } + } + }) + } +} + +func TestSystem_CloseConnection(t *testing.T) { + tests := []struct { + name string + setup func(*System) + connectionName string + wantErr bool + }{ + { + name: "close existing connection", + setup: func(s *System) { + s.connectionToDriver["test-connection"] = dapr.Name + SupportedDrivers[dapr.Name] = dapr.FakeConn + _ = dapr.FakeConn.CreateConnection(context.TODO(), "test-connection", map[string]interface{}{"component": "pubsub"}) + }, + connectionName: "test-connection", + wantErr: false, + }, + { + name: "close non-existing connection", + setup: func(s *System) { + // No setup needed for non-existing connection + s.connectionToDriver = map[string]string{} + }, + connectionName: "non-existing-connection", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := NewSystem() + if tt.setup != nil { + tt.setup(s) + } + + err := s.CloseConnection(tt.connectionName) + if (err != nil) != tt.wantErr { + t.Errorf("CloseConnection() error = %v, wantErr %v", err, tt.wantErr) + } + + if _, exists := s.connectionToDriver[tt.connectionName]; exists && !tt.wantErr { + t.Errorf("connection %s still exists after CloseConnection", tt.connectionName) + } + }) + } +} + +func TestSystem_Publish(t *testing.T) { + type fields struct { + connections map[string]string + } + type args struct { + ctx context.Context + connection string + topic string + msg interface{} + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "There are no connections established", + fields: fields{ + connections: nil, + }, + args: args{ctx: context.Background(), connection: "audit", topic: "test", msg: nil}, + wantErr: true, + }, + { + name: "Publishing to a connection that does not exist", + fields: fields{ + connections: map[string]string{"audit": dapr.Name}, + }, + args: args{ctx: context.Background(), connection: "test", topic: "test", msg: nil}, + wantErr: true, + }, + { + name: "Publishing to a connection that does exist", + fields: fields{ + connections: testSystem.connectionToDriver, + }, + args: args{ctx: context.Background(), connection: "dapr", topic: "test", msg: nil}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &System{ + mux: sync.RWMutex{}, + connectionToDriver: tt.fields.connections, + } + if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr { + t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/export/testdriver/testdriver.go b/pkg/export/testdriver/testdriver.go new file mode 100644 index 00000000000..4283a256a5c --- /dev/null +++ b/pkg/export/testdriver/testdriver.go @@ -0,0 +1,48 @@ +package testdriver + +import ( + "context" + "fmt" +) + +const Name = "testdriver" + +var FakeConn = &Connection{ + openConnections: make(map[string]FakeConnection), +} + +// Connection represents driver to use testdriver. +type Connection struct { + openConnections map[string]FakeConnection +} + +type FakeConnection struct { + name string +} + +func (r *Connection) Publish(_ context.Context, _ string, _ interface{}, _ string) error { + return nil +} + +func (r *Connection) CloseConnection(connectionName string) error { + delete(r.openConnections, connectionName) + return nil +} + +func (r *Connection) UpdateConnection(_ context.Context, connectionName string, config interface{}) error { + name, ok := config.(string) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + r.openConnections[connectionName] = FakeConnection{name: name} + return nil +} + +func (r *Connection) CreateConnection(_ context.Context, connectionName string, config interface{}) error { + name, ok := config.(string) + if !ok { + return fmt.Errorf("invalid type assertion, config is not in expected format") + } + r.openConnections[connectionName] = FakeConnection{name: name} + return nil +} diff --git a/pkg/pubsub/connection/connection.go b/pkg/pubsub/connection/connection.go deleted file mode 100644 index 0edb6a74daf..00000000000 --- a/pkg/pubsub/connection/connection.go +++ /dev/null @@ -1,17 +0,0 @@ -package connection - -import ( - "context" -) - -// PubSub is the interface that wraps pubsub methods. -type Connection interface { - // Publish single message over a specific topic/channel - Publish(ctx context.Context, data interface{}, topic string) error - - // Close connections - CloseConnection() error - - // Update connection - UpdateConnection(ctx context.Context, data interface{}) error -} diff --git a/pkg/pubsub/dapr/dapr.go b/pkg/pubsub/dapr/dapr.go deleted file mode 100644 index 0db60445494..00000000000 --- a/pkg/pubsub/dapr/dapr.go +++ /dev/null @@ -1,83 +0,0 @@ -package dapr - -import ( - "context" - "encoding/json" - "fmt" - - daprClient "github.com/dapr/go-sdk/client" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" -) - -type ClientConfig struct { - // Name of the component to be used for pub sub messaging - Component string `json:"component"` -} - -// Dapr represents driver for interacting with pub sub using dapr. -type Dapr struct { - // Array of clients to talk to different endpoints - client daprClient.Client - - // Name of the pubsub component - pubSubComponent string -} - -const ( - Name = "dapr" -) - -func (r *Dapr) Publish(_ context.Context, data interface{}, topic string) error { - jsonData, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("error marshaling data: %w", err) - } - - err = r.client.PublishEvent(context.Background(), r.pubSubComponent, topic, jsonData) - if err != nil { - return fmt.Errorf("error publishing message to dapr: %w", err) - } - - return nil -} - -func (r *Dapr) CloseConnection() error { - return nil -} - -func (r *Dapr) UpdateConnection(_ context.Context, config interface{}) error { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) - if !ok { - return fmt.Errorf("invalid type assertion, config is not in expected format") - } - cfg.Component, ok = m["component"].(string) - if !ok { - return fmt.Errorf("failed to get value of component") - } - r.pubSubComponent = cfg.Component - return nil -} - -// Returns a new client for dapr. -func NewConnection(_ context.Context, config interface{}) (connection.Connection, error) { - var cfg ClientConfig - m, ok := config.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("invalid type assertion, config is not in expected format") - } - cfg.Component, ok = m["component"].(string) - if !ok { - return nil, fmt.Errorf("failed to get value of component") - } - - tmp, err := daprClient.NewClient() - if err != nil { - return nil, err - } - - return &Dapr{ - client: tmp, - pubSubComponent: cfg.Component, - }, nil -} diff --git a/pkg/pubsub/dapr/dapr_test.go b/pkg/pubsub/dapr/dapr_test.go deleted file mode 100644 index 5a2e72615b1..00000000000 --- a/pkg/pubsub/dapr/dapr_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package dapr - -import ( - "context" - "os" - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - "github.com/stretchr/testify/assert" -) - -var testClient connection.Connection - -func TestMain(m *testing.M) { - c, f := FakeConnection() - testClient = c - r := m.Run() - f() - - if r != 0 { - os.Exit(r) - } -} - -func TestNewConnection(t *testing.T) { - tests := []struct { - name string - config interface{} - expected connection.Connection - errorMsg string - }{ - { - name: "invalid config", - config: "test", - expected: nil, - errorMsg: "invalid type assertion, config is not in expected format", - }, - { - name: "config with missing component", - config: map[string]interface{}{"enableBatching": true}, - expected: nil, - errorMsg: "failed to get value of component", - }, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ret, err := NewConnection(context.TODO(), tc.config) - assert.Equal(t, ret, tc.expected) - assert.EqualError(t, err, tc.errorMsg) - }) - } -} - -func TestDapr_Publish(t *testing.T) { - ctx := context.Background() - - type args struct { - ctx context.Context - data interface{} - topic string - } - - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "test publish", - args: args{ - ctx: ctx, - data: map[string]interface{}{ - "test": "test", - }, - topic: "test", - }, - wantErr: false, - }, - { - name: "test publish without data", - args: args{ - ctx: ctx, - data: nil, - topic: "test", - }, - wantErr: false, - }, - { - name: "test publish without topic", - args: args{ - ctx: ctx, - data: map[string]interface{}{ - "test": "test", - }, - topic: "", - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := testClient - if err := r.Publish(tt.args.ctx, tt.args.data, tt.args.topic); (err != nil) != tt.wantErr { - t.Errorf("Dapr.Publish() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestDapr_UpdateConnection(t *testing.T) { - tests := []struct { - name string - config interface{} - wantErr bool - }{ - { - name: "test update connection", - config: map[string]interface{}{ - "component": "foo", - }, - wantErr: false, - }, - { - name: "test update connection with invalid config", - config: map[string]interface{}{ - "foo": "bar", - }, - wantErr: true, - }, - { - name: "test update connection with nil config", - config: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := testClient - if err := r.UpdateConnection(context.Background(), tt.config); (err != nil) != tt.wantErr { - t.Errorf("Dapr.UpdateConnection() error = %v, wantErr %v", err, tt.wantErr) - } - if !tt.wantErr { - cmp, ok := tt.config.(map[string]interface{})["component"].(string) - assert.True(t, ok) - tmp, ok := r.(*Dapr) - assert.True(t, ok) - assert.Equal(t, cmp, tmp.pubSubComponent) - } - }) - } -} diff --git a/pkg/pubsub/provider/fake_provider.go b/pkg/pubsub/provider/fake_provider.go deleted file mode 100644 index 6e1173eb9b7..00000000000 --- a/pkg/pubsub/provider/fake_provider.go +++ /dev/null @@ -1,11 +0,0 @@ -package provider - -import ( - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" -) - -func FakeProviders() { - pubSubs = newPubSubSet(map[string]InitiateConnection{ - dapr.Name: dapr.FakeNewConnection, - }) -} diff --git a/pkg/pubsub/provider/provider.go b/pkg/pubsub/provider/provider.go deleted file mode 100644 index 5e1d0601014..00000000000 --- a/pkg/pubsub/provider/provider.go +++ /dev/null @@ -1,39 +0,0 @@ -package provider - -import ( - "context" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" -) - -var pubSubs = newPubSubSet(map[string]InitiateConnection{ - dapr.Name: dapr.NewConnection, -}, -) - -type pubSubSet struct { - supportedPubSub map[string]InitiateConnection -} - -// returns new client for pub sub tool. -type InitiateConnection func(ctx context.Context, config interface{}) (connection.Connection, error) - -func newPubSubSet(pubSubs map[string]InitiateConnection) *pubSubSet { - supported := make(map[string]InitiateConnection) - set := &pubSubSet{ - supportedPubSub: supported, - } - for name := range pubSubs { - set.supportedPubSub[name] = pubSubs[name] - } - return set -} - -func List() map[string]InitiateConnection { - ret := make(map[string]InitiateConnection) - for name, new := range pubSubs.supportedPubSub { - ret[name] = new - } - return ret -} diff --git a/pkg/pubsub/provider/provider_test.go b/pkg/pubsub/provider/provider_test.go deleted file mode 100644 index c81602525ac..00000000000 --- a/pkg/pubsub/provider/provider_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package provider - -import ( - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" -) - -func Test_newPubSubSet(t *testing.T) { - tests := []struct { - name string - pubSubs map[string]InitiateConnection - wantKey string - }{ - { - name: "only one provider is available", - pubSubs: map[string]InitiateConnection{ - dapr.Name: dapr.NewConnection, - }, - wantKey: dapr.Name, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := newPubSubSet(tt.pubSubs) - if _, ok := got.supportedPubSub[tt.wantKey]; !ok { - t.Errorf("newPubSubSet() = %#v, want key %#v", got.supportedPubSub, tt.wantKey) - } - }) - } -} - -func TestList(t *testing.T) { - tests := []struct { - name string - wantKey string - }{ - { - name: "only one provider is available", - wantKey: dapr.Name, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := List() - if _, ok := got[tt.wantKey]; !ok { - t.Errorf("List() = %#v, want key %#v", got, tt.wantKey) - } - }) - } -} diff --git a/pkg/pubsub/system.go b/pkg/pubsub/system.go deleted file mode 100644 index da60a1be8e6..00000000000 --- a/pkg/pubsub/system.go +++ /dev/null @@ -1,87 +0,0 @@ -package pubsub - -import ( - "context" - "fmt" - "sync" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - prvd "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/provider" -) - -type System struct { - mux sync.RWMutex - connections map[string]connection.Connection - providers map[string]string -} - -func NewSystem() *System { - return &System{} -} - -func (s *System) Publish(_ context.Context, connection string, topic string, msg interface{}) error { - s.mux.RLock() - defer s.mux.RUnlock() - if len(s.connections) > 0 { - if c, ok := s.connections[connection]; ok { - return c.Publish(context.Background(), msg, topic) - } - return fmt.Errorf("connection is not initialized, name: %s ", connection) - } - return fmt.Errorf("No connections are established") -} - -func (s *System) UpsertConnection(ctx context.Context, config interface{}, name string, provider string) error { - s.mux.Lock() - defer s.mux.Unlock() - // Check if the connection already exists. - if conn, ok := s.connections[name]; ok { - // If the provider is the same, update the existing connection. - if s.providers[name] == provider { - return conn.UpdateConnection(ctx, config) - } - } - // Check if the provider is supported. - if newConnFunc, ok := prvd.List()[provider]; ok { - newConn, err := newConnFunc(ctx, config) - if err != nil { - return err - } - - // Close the existing connection after successfully creating the new one. - if err := s.closeConnection(name); err != nil { - return err - } - // Add the new connection and provider to the maps. - if s.connections == nil { - s.connections = map[string]connection.Connection{} - } - if s.providers == nil { - s.providers = map[string]string{} - } - s.connections[name] = newConn - s.providers[name] = provider - return nil - } - return fmt.Errorf("pub-sub provider %s is not supported", provider) -} - -func (s *System) CloseConnection(connection string) error { - s.mux.Lock() - defer s.mux.Unlock() - return s.closeConnection(connection) -} - -func (s *System) closeConnection(connection string) error { - if len(s.connections) > 0 { - if c, ok := s.connections[connection]; ok { - err := c.CloseConnection() - if err != nil { - return err - } - delete(s.connections, connection) - delete(s.providers, connection) - } - } - return nil -} diff --git a/pkg/pubsub/system_test.go b/pkg/pubsub/system_test.go deleted file mode 100644 index a58e43c8cb7..00000000000 --- a/pkg/pubsub/system_test.go +++ /dev/null @@ -1,256 +0,0 @@ -package pubsub - -import ( - "context" - "os" - "sync" - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr" - "github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/provider" - "github.com/stretchr/testify/assert" -) - -var testSystem *System - -func TestMain(m *testing.M) { - ctx := context.Background() - provider.FakeProviders() - tmp := provider.List() - testSystem = NewSystem() - testSystem.connections = make(map[string]connection.Connection) - testSystem.providers = make(map[string]string) - cfg := map[string]interface{}{ - dapr.Name: map[string]interface{}{ - "component": "pubsub", - }, - } - for name, fakeConn := range tmp { - testSystem.providers[name] = name - testSystem.connections[name], _ = fakeConn(ctx, cfg[name]) - } - r := m.Run() - for _, fakeConn := range testSystem.connections { - _ = fakeConn.CloseConnection() - } - - if r != 0 { - os.Exit(r) - } -} - -func TestNewSystem(t *testing.T) { - tests := []struct { - name string - input string - want *System - }{ - { - name: "requesting system", - want: &System{}, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ret := NewSystem() - assert.Equal(t, ret, tc.want) - }) - } -} - -func TestSystem_UpsertConnection(t *testing.T) { - type fields struct { - connections map[string]connection.Connection - providers map[string]string - s *System - } - type args struct { - ctx context.Context - config interface{} - name string - provider string - } - tests := []struct { - name string - fields fields - args args - wantErr bool - match bool - }{ - { - name: "Create a new connection with dapr provider", - fields: fields{ - connections: testSystem.connections, - providers: testSystem.providers, - s: &System{}, - }, - args: args{ - ctx: context.Background(), - config: map[string]interface{}{ - "component": "pubsub", - }, - name: "dapr", - provider: "dapr", - }, - wantErr: false, - match: true, - }, - { - name: "Update a connection to use test provider", - fields: fields{ - connections: nil, - providers: map[string]string{"audit": "dapr"}, - s: &System{ - mux: sync.RWMutex{}, - providers: map[string]string{"audit": "dapr"}, - }, - }, - args: args{ - ctx: context.Background(), - config: map[string]interface{}{ - "component": "pubsub", - }, - name: "audit", - provider: "test", - }, - wantErr: true, - match: true, - }, - { - name: "Update a connection using same provider", - fields: fields{ - connections: testSystem.connections, - providers: map[string]string{"dapr": "dapr"}, - s: &System{ - mux: sync.RWMutex{}, - providers: testSystem.providers, - connections: testSystem.connections, - }, - }, - args: args{ - ctx: context.Background(), - config: map[string]interface{}{ - "component": "test", - }, - name: "audit", - provider: "dapr", - }, - wantErr: false, - match: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := tt.fields.s.UpsertConnection(tt.args.ctx, tt.args.config, tt.args.name, tt.args.provider); (err != nil) != tt.wantErr { - t.Errorf("System.UpsertConnection() error = %v, wantErr %v", err, tt.wantErr) - } - assert.NotEqual(t, nil, tt.fields.s.connections) - if tt.match { - assert.Equal(t, tt.fields.providers, tt.fields.s.providers) - } else { - assert.NotEqual(t, tt.fields.providers, tt.fields.s.providers) - } - }) - } -} - -func TestSystem_CloseConnection(t *testing.T) { - type fields struct { - connections map[string]connection.Connection - providers map[string]string - } - type args struct { - connection string - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "closing connection", - fields: fields{ - connections: map[string]connection.Connection{"audit": &dapr.Dapr{}}, - providers: map[string]string{"audit": "dapr"}, - }, - args: args{connection: "audit"}, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &System{ - mux: sync.RWMutex{}, - connections: tt.fields.connections, - providers: tt.fields.providers, - } - if err := s.CloseConnection(tt.args.connection); (err != nil) != tt.wantErr { - t.Errorf("System.CloseConnection() error = %v, wantErr %v", err, tt.wantErr) - _, ok := s.connections[tt.args.connection] - assert.False(t, ok) - } - }) - } -} - -func TestSystem_Publish(t *testing.T) { - type fields struct { - connections map[string]connection.Connection - providers map[string]string - } - type args struct { - ctx context.Context - connection string - topic string - msg interface{} - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "There are no connections established", - fields: fields{ - connections: nil, - providers: nil, - }, - args: args{ctx: context.Background(), connection: "audit", topic: "test", msg: nil}, - wantErr: true, - }, - { - name: "Publishing to a connection that does not exist", - fields: fields{ - connections: map[string]connection.Connection{"audit": &dapr.Dapr{}}, - providers: map[string]string{"audit": "dapr"}, - }, - args: args{ctx: context.Background(), connection: "test", topic: "test", msg: nil}, - wantErr: true, - }, - { - name: "Publishing to a connection that does exist", - fields: fields{ - connections: testSystem.connections, - providers: testSystem.providers, - }, - args: args{ctx: context.Background(), connection: "dapr", topic: "test", msg: nil}, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &System{ - mux: sync.RWMutex{}, - connections: tt.fields.connections, - providers: tt.fields.providers, - } - if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr { - t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/test/pubsub/fake-subscriber/main.go b/test/pubsub/fake-subscriber/main.go index fadd3aac2c9..96271c8a4d7 100644 --- a/test/pubsub/fake-subscriber/main.go +++ b/test/pubsub/fake-subscriber/main.go @@ -11,7 +11,7 @@ import ( daprd "github.com/dapr/go-sdk/service/http" ) -type PubsubMsg struct { +type ExportMsg struct { ID string `json:"id,omitempty"` Details interface{} `json:"details,omitempty"` EventType string `json:"eventType,omitempty"` @@ -52,7 +52,7 @@ func main() { } func eventHandler(_ context.Context, e *common.TopicEvent) (retry bool, err error) { - var msg PubsubMsg + var msg ExportMsg jsonInput, err := strconv.Unquote(string(e.RawData)) if err != nil { log.Fatalf("error unquoting %v", err) diff --git a/test/pubsub/publish-components.yaml b/test/pubsub/publish-components.yaml index 9686935dd01..e623acabe6f 100644 --- a/test/pubsub/publish-components.yaml +++ b/test/pubsub/publish-components.yaml @@ -21,7 +21,7 @@ metadata: name: audit namespace: gatekeeper-system data: - provider: "dapr" + driver: "dapr" config: | { "component": "pubsub" diff --git a/website/docs/audit.md b/website/docs/audit.md index f73b9ae0649..9ce938be687 100644 --- a/website/docs/audit.md +++ b/website/docs/audit.md @@ -133,14 +133,14 @@ In addition to violations, these other audit events may be useful (all uniquely All of these events (including `violation_audited`) are marked with the same `audit_id` for a given audit run. -### Pubsub channel +### Export violations -This feature uses publish and subscribe (pubsub) model that allows Gatekeeper to export audit violations over a broker that can be consumed by a subscriber independently. Therefore, pubsub violations are not subject to reporting limits. Please refer to [this](pubsub.md) guide to configure audit to push violations over a channel. +This feature allows plugging in different backends that allows Gatekeeper to export audit violations. Therefore, violations are not subject to reporting limits. Please refer to [this](export.md) guide to configure audit to push violations via this feature. -Limitations/drawbacks of getting violations using pubsub channel: +Limitations/drawbacks of exporting violations: -- There is an inherent risk of messages getting dropped. You might not receive all the published violations. -- Additional dependency on pubsub broker. +- There is a risk of messages getting dropped. You might not receive all the exported violations. This depends on the type of backend you are using for delivery. For example, using a network as backend to export violation has the risk of messages getting dropped. +- Additional dependency depending on what is plugged in. For example, using pubsub tools to export violations. ## Running Audit For more details on how to deploy audit and @@ -148,7 +148,7 @@ number of instances to run, please refer to [operations audit](operations.md#aud ## Configuring Audit -- Audit violations per constraint: set `--constraint-violations-limit=123` (defaults to `20`). NOTE: This flag only impacts when gathering audit results using the constraint status model. If you are gathering audit results using the pubsub model, please refer to the [pubsub](pubsub.md) guide. Both approaches for getting audit violations can be configured independently and work simultaneously without any interference. +- Audit violations per constraint: set `--constraint-violations-limit=123` (defaults to `20`). NOTE: This flag only impacts when gathering audit results using the constraint status model. If you want to gather audit results via other means, please refer to the [exporting audit results](export.md) guide. Both approaches for getting audit violations can be configured independently and work simultaneously without any interference. - Audit chunk size: set `--audit-chunk-size=400` (defaults to `500`, `0` = infinite) Lower chunk size can reduce memory consumption of the auditing `Pod` but can increase the number requests to the Kubernetes API server. - Audit interval: set `--audit-interval=123` (defaults to every `60` seconds). Disable audit interval by setting `--audit-interval=0` - Audit api server cache write to disk (Gatekeeper v3.7.0+): Starting from v3.7.0, by default, audit writes api server cache to the disk attached to the node. This reduces the memory consumption of the audit `pod`. If there are concerns with high IOPS, then switch audit to write cache to a tmpfs ramdisk instead. NOTE: write to ramdisk will increase memory footprint of the audit `pod`. diff --git a/website/docs/export-driver-walkthrough.md b/website/docs/export-driver-walkthrough.md new file mode 100644 index 00000000000..c5b0fc9e5ce --- /dev/null +++ b/website/docs/export-driver-walkthrough.md @@ -0,0 +1,100 @@ +--- +id: export-driver +title: Export Interface/Driver walkthrough +--- + +This guide provides an overview of the driver interface, including details of its structure and functionality. Additionally, it offers instructions on adding a new driver and utilizing different backends to export violations. + +## Driver interface + +```go +type Driver interface { + // Publish publishes single message with specific subject using a connection + Publish(ctx context.Context, connectionName string, data interface{}, subject string) error + + // CloseConnection closes a connection + CloseConnection(connectionName string) error + + // UpdateConnection updates an existing connection + UpdateConnection(ctx context.Context, connectionName string, config interface{}) error + + // CreateConnection creates new connection + CreateConnection(ctx context.Context, connectionName string, config interface{}) error +} +``` + +As an example, the Dapr driver implements these methods to publish message and manage connection to do so. Please refer to [dapr.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/export/dapr/dapr.go) to understand the logic that goes in each of these methods. + +### How to add new driver to export violations to foo backend + +A driver must maintain a map of open connections associated with backend `foo`. + +```go +type Connection struct { + // properties needed for individual connection +} + +type Foo struct { + openConnections map[string]Connection +} + +const ( + Name = "foo" +) + +var Connections = &Foo{ + openConnections: make(map[string]Connection), +} + +``` + +A driver must implement the `Driver` interface. + +```go +func (r *Foo) Publish(ctx context.Context, connectionName string, data interface{}, subject string) error { + ... +} + +func (r *Foo) loseConnection(connectionName string) error { + ... +} + +func (r *Foo) UpdateConnection(ctx context.Context, connectionName string, config interface{}) error { + ... +} + +func (r *Foo) CreateConnection(ctx context.Context, connectionName string, config interface{}) error { + ... +} +``` + +This newly added driver's `Connections` exported variable must be added to the map of `SupportedDrivers` in [system.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/export/provider/system.go). For example, + +```go +var SupportedDrivers = map[string]driver.Driver{ + dapr.Name: dapr.Connections, + foo.Name: foo.Connections, +} +``` + +And thats it! Exporter system will take the newly added driver into account and whenever a configMap to establish connection to export message is created. + +### How to establish connections to different backend + +To enable audit to use this driver to publish messages, a connection configMap with appropriate `config` and `driver` is needed. For example, + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: audit + namespace: gatekeeper-system +data: + driver: "foo" + config: | + { + + } +``` + +> The `data.driver` field must exist and must match one of the keys of the `SupportedDrivers` map that was defined earlier to use the corresponding driver. The `data.config` field in the configuration can vary depending on the driver being used. For dapr driver, `data.config` must be `{"component": "pubsub"}`. diff --git a/website/docs/pubsub.md b/website/docs/export.md similarity index 92% rename from website/docs/pubsub.md rename to website/docs/export.md index 8c1df5fb3c0..5af87327c8a 100644 --- a/website/docs/pubsub.md +++ b/website/docs/export.md @@ -1,6 +1,6 @@ --- -id: pubsub -title: Consuming violations using Pubsub +id: export +title: Exporting violations --- `Feature State`: Gatekeeper version v3.13+ (alpha) @@ -9,7 +9,7 @@ title: Consuming violations using Pubsub ## Description -This feature pushes audit violations to a pubsub service. Users can subscribe to pubsub service to consume violations. +This feature exports audit violations to a backend from where users can consume violations. > To gain insights into different methods of obtaining audit violations and the respective trade-offs for each approach, please refer to [Reading Audit Results](audit.md#reading-audit-results). @@ -17,11 +17,11 @@ This feature pushes audit violations to a pubsub service. Users can subscribe to Install prerequisites such as a pubsub tool, a message broker etc. -### Setting up audit with pubsub enabled +### Setting up audit to export violations In the audit deployment, set the `--enable-pub-sub` flag to `true` to publish audit violations. Additionally, use `--audit-connection` (defaults to `audit-connection`) and `--audit-channel`(defaults to `audit-channel`) flags to allow audit to publish violations using desired connection onto desired channel. `--audit-connection` must be set to the name of the connection config, and `--audit-channel` must be set to name of the channel where violations should get published. -A ConfigMap that contains `provider` and `config` fields in `data` is required to establish connection for sending violations over the channel. Following is an example ConfigMap to establish a connection that uses Dapr to publish messages: +A ConfigMap that contains `driver` and `config` fields in `data` is required to establish connection for sending violations over the channel. Following is an example ConfigMap to establish a connection that uses Dapr to publish messages: ```yaml apiVersion: v1 @@ -30,20 +30,20 @@ metadata: name: audit-connection namespace: gatekeeper-system data: - provider: "dapr" + driver: "dapr" config: | { "component": "pubsub" } ``` -- `provider` field determines which tool/driver should be used to establish a connection. Valid values are: `dapr` +- `driver` field determines which tool/driver should be used to establish a connection. Valid values are: `dapr` - `config` field is a json object that configures how the connection is made. E.g. which queue messages should be sent to. -#### Available Pubsub drivers +#### Available drivers Dapr: https://dapr.io/ -### Quick start with publishing violations using Dapr and Redis +### Quick start with exporting violations using Dapr and Redis #### Prerequisites @@ -130,7 +130,7 @@ Dapr: https://dapr.io/ > [!IMPORTANT] > Please make sure `fake-subscriber` image is built and available in your cluster. Dockerfile to build image for `fake-subscriber` is under [gatekeeper/test/fake-subscriber](https://github.com/open-policy-agent/gatekeeper/tree/master/test/pubsub/fake-subscriber). -#### Configure Gatekeeper with Pubsub enabled +#### Configure Gatekeeper with Export enabled 1. Create Gatekeeper namespace, and create Dapr pubsub component and Redis secret in Gatekeeper's namespace (`gatekeeper-system` by default). Please make sure to update `gatekeeper-system` namespace for the next steps if your cluster's Gatekeeper namespace is different. @@ -180,7 +180,7 @@ Dapr: https://dapr.io/ name: audit-connection namespace: gatekeeper-system data: - provider: "dapr" + driver: "dapr" config: | { "component": "pubsub" diff --git a/website/docs/pubsub-driver-walkthrough.md b/website/docs/pubsub-driver-walkthrough.md deleted file mode 100644 index 4d598946640..00000000000 --- a/website/docs/pubsub-driver-walkthrough.md +++ /dev/null @@ -1,61 +0,0 @@ ---- -id: pubsub-driver -title: Pubsub Interface/Driver walkthrough ---- - -This guide provides an overview of the pubsub interface, including details on its structure and functionality. Additionally, it offers instructions on adding a new driver and utilizing providers other than the default provider Dapr. - -## Pubsub interface and Driver walkthrough - -Pubsub's connection interface looks like -```go -// Connection is the interface that wraps pubsub methods. -type Connection interface { - // Publish single message over a specific topic/channel - Publish(ctx context.Context, message interface{}, topic string) error - - // Close connections - CloseConnection() error - - // Update an existing connection with new configuration - UpdateConnection(ctx context.Context, config interface{}) error -} -``` - -As an example, the Dapr driver implements these three methods to publish message, close connection, and update connection respectively. Please refer to [dapr.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/pubsub/dapr/dapr.go) to understand the logic that goes in each of these methods. Additionally, the Dapr driver also implements `func NewConnection(_ context.Context, config interface{}) (connection.Connection, error)` method that returns a new client for dapr. - -### How to add new drivers - -**Note:** For example, if we want to add a new driver to use `foo` instead of Dapr as a tool to publish violations. - -A driver must implement the `Connection` interface and a new `func NewConnection(_ context.Context, config interface{}) (connection.Connection, error)` method that returns a client for the respective tool. - -This newly added driver's `NewConnection` method must be used to create a new `pubSubs` object in [provider.go](https://github.com/open-policy-agent/gatekeeper/blob/master/pkg/pubsub/provider/provider.go). For example, - -```go -var pubSubs = newPubSubSet(map[string]InitiateConnection{ - dapr.Name: dapr.NewConnection, - "foo": foo.NewConnection, -}, -) -``` - -### How to use different providers - -To enable audit to use this driver to publish messages, a connection configMap with appropriate `config` and `provider` is needed. For example, - -```yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: audit - namespace: gatekeeper-system -data: - provider: "foo" - config: | - { - - } -``` - -> The `data.provider` field must exist and must match one of the keys of the `pubSubs` map that was defined earlier to use the corresponding driver. The `data.config` field in the configuration can vary depending on the driver being used. For dapr driver, `data.config` must be `{"component": "pubsub"}`. diff --git a/website/sidebars.js b/website/sidebars.js index f81ac8de716..5b558445f3c 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -34,7 +34,7 @@ module.exports = { 'expansion', 'gator', 'workload-resources', - 'pubsub', + 'export', 'validating-admission-policy', 'enforcement-points' ], @@ -66,7 +66,7 @@ module.exports = { 'developers', 'help', 'security', - 'pubsub-driver' + 'export-driver' ], } ]