Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: updating pubsub system #3646

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/drivers/k8scel"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/externaldata"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
Expand Down Expand Up @@ -444,7 +444,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle

mutationSystem := mutation.NewSystem(mutationOpts)
expansionSystem := expansion.NewSystem(mutationSystem)
pubsubSystem := pubsub.NewSystem()
exportSystem := export.NewSystem()

c := mgr.GetCache()
dc, ok := c.(watch.RemovableCache)
Expand Down Expand Up @@ -518,7 +518,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle
MutationSystem: mutationSystem,
ExpansionSystem: expansionSystem,
ProviderCache: providerCache,
PubsubSystem: pubsubSystem,
ExportSystem: exportSystem,
}

if err := controller.AddToManager(mgr, &opts); err != nil {
Expand Down Expand Up @@ -548,7 +548,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, sw *watch.Controlle
ProcessExcluder: processExcluder,
CacheLister: auditCache,
ExpansionSystem: expansionSystem,
PubSubSystem: pubsubSystem,
ExportSystem: exportSystem,
}
if err := audit.AddToManager(mgr, &auditDeps); err != nil {
setupLog.Error(err, "unable to register audit with the manager")
Expand Down
4 changes: 2 additions & 2 deletions pkg/audit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

Expand All @@ -25,7 +25,7 @@ type Dependencies struct {
ProcessExcluder *process.Excluder
CacheLister *CacheLister
ExpansionSystem *expansion.System
PubSubSystem *pubsub.System
ExportSystem *export.System
}

// AddToManager adds audit manager to the Manager.
Expand Down
18 changes: 9 additions & 9 deletions pkg/audit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
"github.com/open-policy-agent/frameworks/constraint/pkg/client/reviews"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
pubsubController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub"
exportController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
mutationtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -91,7 +91,7 @@ type Manager struct {
auditCache *CacheLister

expansionSystem *expansion.System
pubsubSystem *pubsub.System
exportSystem *export.System
}

// StatusViolation represents each violation under status.
Expand All @@ -107,7 +107,7 @@ type StatusViolation struct {
}

// ConstraintMsg represents publish message for each constraint.
type PubsubMsg struct {
type ExportMsg struct {
ID string `json:"id,omitempty"`
Details interface{} `json:"details,omitempty"`
EventType string `json:"eventType,omitempty"`
Expand Down Expand Up @@ -269,7 +269,7 @@ func New(mgr manager.Manager, deps *Dependencies) (*Manager, error) {
gkNamespace: util.GetNamespace(),
auditCache: deps.CacheLister,
expansionSystem: deps.ExpansionSystem,
pubsubSystem: deps.PubSubSystem,
exportSystem: deps.ExportSystem,
}
return am, nil
}
Expand Down Expand Up @@ -902,10 +902,10 @@ func (am *Manager) addAuditResponsesToUpdateLists(
details := r.Metadata["details"]
labels := r.obj.GetLabels()
logViolation(am.log, constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels)
if *pubsubController.PubsubEnabled {
err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if *exportController.ExportEnabled {
err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if err != nil {
am.log.Error(err, "pubsub audit Publishing")
am.log.Error(err, "error exporting audit violation")
}
}
if *emitAuditEvents {
Expand Down Expand Up @@ -1161,7 +1161,7 @@ func violationMsg(constraint *unstructured.Unstructured, enforcementAction util.
userConstraintAnnotations := constraint.GetAnnotations()
delete(userConstraintAnnotations, "kubectl.kubernetes.io/last-applied-configuration")

return PubsubMsg{
return ExportMsg{
Message: message,
Details: details,
ID: timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ limitations under the License.
package controller

import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export"
)

func init() {
Injectors = append(Injectors, &pubsub.Adder{})
Injectors = append(Injectors, &export.Adder{})
}
12 changes: 6 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/fakes"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
Expand All @@ -57,8 +57,8 @@ type GetPodInjector interface {
InjectGetPod(func(context.Context) (*corev1.Pod, error))
}

type PubsubInjector interface {
InjectPubsubSystem(pubsubSystem *pubsub.System)
type ExportInjector interface {
InjectExportSystem(exportSystem *export.System)
}

type DataClientInjector interface {
Expand Down Expand Up @@ -103,7 +103,7 @@ type Dependencies struct {
MutationSystem *mutation.System
ExpansionSystem *expansion.System
ProviderCache *externaldata.ProviderCache
PubsubSystem *pubsub.System
ExportSystem *export.System
SyncEventsCh chan event.GenericEvent
CacheMgr *cm.CacheManager
}
Expand Down Expand Up @@ -215,8 +215,8 @@ func AddToManager(m manager.Manager, deps *Dependencies) error {
if a2, ok := a.(GetPodInjector); ok {
a2.InjectGetPod(deps.GetPod)
}
if a2, ok := a.(PubsubInjector); ok {
a2.InjectPubsubSystem(deps.PubsubSystem)
if a2, ok := a.(ExportInjector); ok {
a2.InjectExportSystem(deps.ExportSystem)
}
if a2, ok := a.(CacheManagerInjector); ok {
// this is used by the config controller to sync
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package pubsub
package export

import (
"context"
"encoding/json"
"flag"
"fmt"

"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
Expand All @@ -26,37 +26,37 @@ import (
)

var (
PubsubEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages")
ExportEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages")
log = logf.Log.WithName("controller").WithValues(logging.Process, "pubsub_controller")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to rename the flag to remove pub-sub word?

)

type Adder struct {
PubsubSystem *pubsub.System
ExportSystem *export.System
}

func (a *Adder) Add(mgr manager.Manager) error {
if !*PubsubEnabled {
if !*ExportEnabled {
return nil
}
r := newReconciler(mgr, a.PubsubSystem)
r := newReconciler(mgr, a.ExportSystem)
return add(mgr, r)
}

func (a *Adder) InjectControllerSwitch(_ *watch.ControllerSwitch) {}

func (a *Adder) InjectTracker(_ *readiness.Tracker) {}

func (a *Adder) InjectPubsubSystem(pubsubSystem *pubsub.System) {
a.PubsubSystem = pubsubSystem
func (a *Adder) InjectExportSystem(exportSystem *export.System) {
a.ExportSystem = exportSystem
}

type Reconciler struct {
client.Client
scheme *runtime.Scheme
system *pubsub.System
system *export.System
}

func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler {
func newReconciler(mgr manager.Manager, system *export.System) *Reconciler {
return &Reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
Expand All @@ -65,7 +65,7 @@ func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler {
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New("pubsub-config-controller", mgr, controller.Options{Reconciler: r})
c, err := controller.New("export-config-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
Expand Down Expand Up @@ -113,22 +113,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
}

if len(cfg.Data) == 0 {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName))
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName))
}
if _, ok := cfg.Data["provider"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName))
if _, ok := cfg.Data["driver"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing driver field in configmap %s, unable to establish connection", request.NamespacedName))
}
var config interface{}
err = json.Unmarshal([]byte(cfg.Data["config"]), &config)
if err != nil {
return reconcile.Result{}, err
}

err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"])
err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["driver"])
if err != nil {
return reconcile.Result{}, err
}

log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Data["provider"])
log.Info("Connection upsert successful", "name", request.Name, "driver", cfg.Data["driver"])
return reconcile.Result{}, nil
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package pubsub
package export

import (
"context"
"flag"
"fmt"
"testing"

"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestReconcile(t *testing.T) {
},
},
wantErr: true,
errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName),
errorMsg: fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName),
},
}
for _, tc := range testCases {
Expand Down
88 changes: 88 additions & 0 deletions pkg/export/dapr/dapr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package dapr

import (
"context"
"encoding/json"
"fmt"

daprClient "github.com/dapr/go-sdk/client"
)

type Connection struct {
// Name of the component object to use in Dapr
component string

client daprClient.Client
}

// Dapr represents driver to use Dapr.
type Dapr struct {
openConnections map[string]Connection
}

const (
Name = "dapr"
)

var Connections = &Dapr{
openConnections: make(map[string]Connection),
}

func (r *Dapr) Publish(_ context.Context, connectionName string, data interface{}, topic string) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

conn, ok := r.openConnections[connectionName]
if !ok {
return fmt.Errorf("connection not found: %s for Dapr driver", connectionName)
}
err = conn.client.PublishEvent(context.Background(), conn.component, topic, jsonData)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}

return nil
}

func (r *Dapr) CloseConnection(connectionName string) error {
delete(r.openConnections, connectionName)
return nil
}

func (r *Dapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error {
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
component, ok := cfg["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
}
conn := r.openConnections[connectionName]
conn.component = component
r.openConnections[connectionName] = conn
return nil
}

func (r *Dapr) CreateConnection(_ context.Context, connectionName string, config interface{}) error {
var conn Connection
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
conn.component, ok = cfg["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
}

tmp, err := daprClient.NewClient()
if err != nil {
return err
}

conn.client = tmp
r.openConnections[connectionName] = conn
return nil
}
Loading
Loading