diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 18f93b1ec..562440855 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -26,7 +26,8 @@ "ms-azuretools.vscode-docker", "RemiMarche.cspell-tech", "streetsidesoftware.code-spell-checker", - "netcorext.uuid-generator" + "netcorext.uuid-generator", + "ms-vscode.makefile-tools" ] } }, diff --git a/internal/pubsub/assignments.go b/internal/pubsub/assignments.go deleted file mode 100644 index 42d95c53d..000000000 --- a/internal/pubsub/assignments.go +++ /dev/null @@ -1,10 +0,0 @@ -package pubsub - -import ( - "go.infratographer.com/x/pubsubx" -) - -// NewAssignmentMessage creates a new assignment event message -func NewAssignmentMessage(actorURN string, tenantURN string, assignmentURN string, additionalSubjectURNs ...string) (*pubsubx.Message, error) { - return newMessage(actorURN, assignmentURN, additionalSubjectURNs...), nil -} diff --git a/internal/pubsub/errors.go b/internal/pubsub/errors.go index c3ac9f1db..bbe6862c6 100644 --- a/internal/pubsub/errors.go +++ b/internal/pubsub/errors.go @@ -2,10 +2,13 @@ package pubsub import "errors" -// nolint var ( - ErrInvalidActorURN = errors.New("invalid actor urn") - ErrInvalidTenantURN = errors.New("invalid tenant urn") - ErrInvalidAssignmentURN = errors.New("invalid assignment urn") - ErrInvalidURN = errors.New("invalid urn") + // ErrMissingEventSubjectURN is returned when the event subject urn is missing + ErrMissingEventSubjectURN = errors.New("missing event subject urn") + + // ErrMissingEventActorURN is returned when the event actor urn is missing + ErrMissingEventActorURN = errors.New("missing event actor urn") + + // ErrMissingEventSource is returned when the event source is missing + ErrMissingEventSource = errors.New("missing event source") ) diff --git a/internal/pubsub/frontend.go b/internal/pubsub/frontend.go deleted file mode 100644 index 509c97194..000000000 --- a/internal/pubsub/frontend.go +++ /dev/null @@ -1,10 +0,0 @@ -package pubsub - -import ( - "go.infratographer.com/x/pubsubx" -) - -// NewPortMessage creates a new port message -func NewPortMessage(actorURN string, tenantURN string, portURN string, additionalSubjectURNs ...string) (*pubsubx.Message, error) { - return newMessage(actorURN, portURN, additionalSubjectURNs...), nil -} diff --git a/internal/pubsub/load_balancer.go b/internal/pubsub/load_balancer.go deleted file mode 100644 index e89f9c66b..000000000 --- a/internal/pubsub/load_balancer.go +++ /dev/null @@ -1,10 +0,0 @@ -package pubsub - -import ( - "go.infratographer.com/x/pubsubx" -) - -// NewLoadBalancerMessage creates a new loadbalancer event message -func NewLoadBalancerMessage(actorURN string, tenantURN string, loadBalancerURN string, additionalSubjectURNs ...string) (*pubsubx.Message, error) { - return newMessage(actorURN, loadBalancerURN, additionalSubjectURNs...), nil -} diff --git a/internal/pubsub/message.go b/internal/pubsub/message.go new file mode 100644 index 000000000..a6bcd7204 --- /dev/null +++ b/internal/pubsub/message.go @@ -0,0 +1,108 @@ +package pubsub + +import ( + "time" + + "go.infratographer.com/x/pubsubx" +) + +const ( + // DefaultMessageSource is the default source for messages + DefaultMessageSource = "load-balancer-api" +) + +// NewMessage functionally generates a new pubsub message and appends the tenantURN +// to the list of additional subject urns +func NewMessage(tenantURN string, opts ...MsgOption) (*pubsubx.Message, error) { + msg := pubsubx.Message{ + Timestamp: time.Now().UTC(), + Source: DefaultMessageSource, + } + + for _, opt := range opts { + opt(&msg) + } + + msg.AdditionalSubjectURNs = append(msg.AdditionalSubjectURNs, tenantURN) + + if msg.SubjectFields == nil { + msg.SubjectFields = make(map[string]string) + } + + msg.SubjectFields["tenant_urn"] = tenantURN + + if err := validatePubsubMessage(&msg); err != nil { + return nil, err + } + + return &msg, nil +} + +// MsgOption is a functional argument for NewMessage +type MsgOption func(m *pubsubx.Message) + +// WithEventType sets the event type of the message +func WithEventType(e string) MsgOption { + return func(m *pubsubx.Message) { + m.EventType = e + } +} + +// WithSource sets the source of the message +func WithSource(s string) MsgOption { + return func(m *pubsubx.Message) { + m.Source = s + } +} + +// WithActorURN sets the actor urn of the message +func WithActorURN(u string) MsgOption { + return func(m *pubsubx.Message) { + m.ActorURN = u + } +} + +// WithSubjectURN sets the subject urn of the message +func WithSubjectURN(s string) MsgOption { + return func(m *pubsubx.Message) { + m.SubjectURN = s + } +} + +// WithAdditionalSubjectURNs sets the additional subject urns of the message +func WithAdditionalSubjectURNs(a ...string) MsgOption { + return func(m *pubsubx.Message) { + m.AdditionalSubjectURNs = a + } +} + +// WithSubjectFields sets the subject fields of the message +func WithSubjectFields(f map[string]string) MsgOption { + return func(m *pubsubx.Message) { + m.SubjectFields = f + } +} + +// WithAdditionalData sets the additional data of the message +func WithAdditionalData(d map[string]interface{}) MsgOption { + return func(m *pubsubx.Message) { + m.AdditionalData = d + } +} + +// validatePubsubMessage validates a pubsub message for required fields +func validatePubsubMessage(msg *pubsubx.Message) error { + if msg.SubjectURN == "" { + return ErrMissingEventSubjectURN + } + + if msg.ActorURN == "" { + return ErrMissingEventActorURN + } + + if msg.Source == "" { + return ErrMissingEventSource + } + + return nil +} diff --git a/internal/pubsub/message_test.go b/internal/pubsub/message_test.go new file mode 100644 index 000000000..a77d7f68d --- /dev/null +++ b/internal/pubsub/message_test.go @@ -0,0 +1,64 @@ +package pubsub + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.infratographer.com/x/pubsubx" +) + +func Test_validatePubsubMessage(t *testing.T) { + tests := []struct { + name string + msg *pubsubx.Message + wantErr bool + }{ + { + name: "valid message", + msg: &pubsubx.Message{ + EventType: "test", + Source: "test", + SubjectURN: "foo", + ActorURN: "bar", + }, + wantErr: false, + }, + { + name: "missing source", + msg: &pubsubx.Message{ + EventType: "test", + SubjectURN: "foo", + ActorURN: "bar", + }, + wantErr: true, + }, + { + name: "missing subject urn", + msg: &pubsubx.Message{ + EventType: "test", + Source: "test", + ActorURN: "bar", + }, + wantErr: true, + }, + { + name: "missing actor urn", + msg: &pubsubx.Message{ + EventType: "test", + Source: "test", + SubjectURN: "foo", + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validatePubsubMessage(tt.msg) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +} diff --git a/internal/pubsub/origins.go b/internal/pubsub/origins.go deleted file mode 100644 index aa73a6394..000000000 --- a/internal/pubsub/origins.go +++ /dev/null @@ -1,10 +0,0 @@ -package pubsub - -import ( - "go.infratographer.com/x/pubsubx" -) - -// NewOriginMessage creates a new origin message -func NewOriginMessage(actorURN string, tenantURN string, originURN string, additionalSubjectURNs ...string) (*pubsubx.Message, error) { - return newMessage(actorURN, originURN, additionalSubjectURNs...), nil -} diff --git a/internal/pubsub/pools.go b/internal/pubsub/pools.go deleted file mode 100644 index a543c163a..000000000 --- a/internal/pubsub/pools.go +++ /dev/null @@ -1,10 +0,0 @@ -package pubsub - -import ( - "go.infratographer.com/x/pubsubx" -) - -// NewPoolMessage creates a new pool event message -func NewPoolMessage(actorURN string, tenantURN string, poolURN string, additionalSubjectURNs ...string) (*pubsubx.Message, error) { - return newMessage(actorURN, poolURN, additionalSubjectURNs...), nil -} diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index c393bfcfe..2306d52e7 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/nats-io/nats.go" "go.infratographer.com/x/pubsubx" @@ -24,16 +23,6 @@ const ( // May be a config option later var prefix = "com.infratographer.events" -func newMessage(actorURN string, subjectURN string, additionalSubjectURNs ...string) *pubsubx.Message { - return &pubsubx.Message{ - SubjectURN: subjectURN, - ActorURN: actorURN, // comes from the jwt eventually - Timestamp: time.Now().UTC(), - Source: "lbapi", - AdditionalSubjectURNs: additionalSubjectURNs, - } -} - // PublishCreate publishes a create event func (c *Client) PublishCreate(ctx context.Context, actor, location string, data *pubsubx.Message) error { data.EventType = CreateEventType diff --git a/pkg/api/v1/assignments_create.go b/pkg/api/v1/assignments_create.go index 71fce003f..9f7027969 100644 --- a/pkg/api/v1/assignments_create.go +++ b/pkg/api/v1/assignments_create.go @@ -47,12 +47,22 @@ func (r *Router) assignmentsCreate(c echo.Context) error { return v1BadRequestResponse(c, err) } - msg, err := pubsub.NewAssignmentMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(tenantID), - pubsub.NewAssignmentURN(assignmentID), - pubsub.NewLoadBalancerURN(port.LoadBalancerID), - pubsub.NewPoolURN(payload.PoolID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewAssignmentURN(assignmentID), + ), + pubsub.WithAdditionalSubjectURNs( + pubsub.NewLoadBalancerURN(port.LoadBalancerID), + pubsub.NewPoolURN(payload.PoolID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": tenantID, + "tenant_urn": pubsub.NewTenantURN(tenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/assignments_delete.go b/pkg/api/v1/assignments_delete.go index 43aca447c..2cc08f9f5 100644 --- a/pkg/api/v1/assignments_delete.go +++ b/pkg/api/v1/assignments_delete.go @@ -43,11 +43,24 @@ func (r *Router) assignmentsDelete(c echo.Context) error { r.logger.Error("error fetching port", zap.Error(err)) } - msg, err := pubsub.NewAssignmentMessage( - someTestJWTURN, - pubsub.NewTenantURN(assignments[0].TenantID), - pubsub.NewAssignmentURN(assignments[0].AssignmentID), - pubsub.NewLoadBalancerURN(feModel.LoadBalancerID), + tenantID := assignments[0].TenantID + assignmentID := assignments[0].AssignmentID + + msg, err := pubsub.NewMessage( + pubsub.NewTenantURN(tenantID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewAssignmentURN(assignmentID), + ), + pubsub.WithAdditionalSubjectURNs( + pubsub.NewLoadBalancerURN(feModel.LoadBalancerID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": tenantID, + "tenant_urn": pubsub.NewTenantURN(tenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/load_balancers_create.go b/pkg/api/v1/load_balancers_create.go index 8d8f46a31..3d03e6042 100644 --- a/pkg/api/v1/load_balancers_create.go +++ b/pkg/api/v1/load_balancers_create.go @@ -126,11 +126,19 @@ func (r *Router) loadBalancerCreate(c echo.Context) error { r.logger.Info("created new load balancer", zap.Any("loadbalancer.id", lb.LoadBalancerID)) - msg, err := pubsub.NewLoadBalancerMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(tenantID), - pubsub.NewLoadBalancerURN(lb.LoadBalancerID), - additionalURNs..., + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewLoadBalancerURN(lb.LoadBalancerID), + ), + pubsub.WithAdditionalSubjectURNs(additionalURNs...), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": tenantID, + "tenant_urn": pubsub.NewTenantURN(tenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/load_balancers_delete.go b/pkg/api/v1/load_balancers_delete.go index 55d6fffc4..4da15c617 100644 --- a/pkg/api/v1/load_balancers_delete.go +++ b/pkg/api/v1/load_balancers_delete.go @@ -47,10 +47,21 @@ func (r *Router) loadBalancerDelete(c echo.Context) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewLoadBalancerMessage( - someTestJWTURN, - pubsub.NewLoadBalancerURN(lb[0].TenantID), - pubsub.NewLoadBalancerURN(lb[0].LoadBalancerID), + tenantID := lb[0].TenantID + lbID := lb[0].LoadBalancerID + + msg, err := pubsub.NewMessage( + pubsub.NewTenantURN(tenantID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewLoadBalancerURN(lbID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": tenantID, + "tenant_urn": pubsub.NewTenantURN(tenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/load_balancers_update.go b/pkg/api/v1/load_balancers_update.go index 8c4137496..a26e09dd8 100644 --- a/pkg/api/v1/load_balancers_update.go +++ b/pkg/api/v1/load_balancers_update.go @@ -114,10 +114,18 @@ func (r *Router) updateLoadBalancer(c echo.Context, lb *models.LoadBalancer) err return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewLoadBalancerMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(lb.TenantID), - pubsub.NewLoadBalancerURN(lb.LoadBalancerID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewLoadBalancerURN(lb.LoadBalancerID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": lb.TenantID, + "tenant_urn": pubsub.NewTenantURN(lb.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/origins_create.go b/pkg/api/v1/origins_create.go index 2ce066621..36c00abb2 100644 --- a/pkg/api/v1/origins_create.go +++ b/pkg/api/v1/origins_create.go @@ -46,11 +46,21 @@ func (r *Router) originsCreate(c echo.Context) error { return v1BadRequestResponse(c, err) } - msg, err := pubsub.NewOriginMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(pool.TenantID), - pubsub.NewOriginURN(originID), - pubsub.NewPoolURN(pool.PoolID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewOriginURN(originID), + ), + pubsub.WithAdditionalSubjectURNs( + pubsub.NewPoolURN(pool.PoolID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": pool.TenantID, + "tenant_urn": pubsub.NewTenantURN(pool.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/origins_delete.go b/pkg/api/v1/origins_delete.go index 712c1c14e..3c4533d57 100644 --- a/pkg/api/v1/origins_delete.go +++ b/pkg/api/v1/origins_delete.go @@ -39,11 +39,21 @@ func (r *Router) originsDelete(c echo.Context) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewOriginMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(origin.R.Pool.TenantID), - pubsub.NewOriginURN(origin.OriginID), - pubsub.NewPoolURN(origin.R.Pool.PoolID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewOriginURN(origin.OriginID), + ), + pubsub.WithAdditionalSubjectURNs( + pubsub.NewPoolURN(origin.R.Pool.PoolID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": origin.R.Pool.TenantID, + "tenant_urn": pubsub.NewTenantURN(origin.R.Pool.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/origins_update.go b/pkg/api/v1/origins_update.go index bbdd98850..a9f48d3ff 100644 --- a/pkg/api/v1/origins_update.go +++ b/pkg/api/v1/origins_update.go @@ -129,11 +129,21 @@ func (r *Router) updateOrigin(c echo.Context, origin *models.Origin) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewOriginMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(origin.R.Pool.TenantID), - pubsub.NewOriginURN(origin.OriginID), - additionalURNs..., + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewOriginURN(origin.OriginID), + ), + pubsub.WithAdditionalSubjectURNs( + additionalURNs..., + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": origin.R.Pool.TenantID, + "tenant_urn": pubsub.NewTenantURN(origin.R.Pool.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/pools_create.go b/pkg/api/v1/pools_create.go index 14a9f72e4..e504e5be9 100644 --- a/pkg/api/v1/pools_create.go +++ b/pkg/api/v1/pools_create.go @@ -87,11 +87,21 @@ func (r *Router) poolCreate(c echo.Context) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewPoolMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(tenantID), - pubsub.NewPoolURN(pool.PoolID), - additionalURNs..., + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewPoolURN(pool.PoolID), + ), + pubsub.WithAdditionalSubjectURNs( + additionalURNs..., + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": tenantID, + "tenant_urn": pubsub.NewTenantURN(tenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/pools_delete.go b/pkg/api/v1/pools_delete.go index 275036d70..d589bc1eb 100644 --- a/pkg/api/v1/pools_delete.go +++ b/pkg/api/v1/pools_delete.go @@ -89,12 +89,21 @@ func (r *Router) poolDelete(c echo.Context) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewPoolMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(pool.TenantID), - pubsub.NewPoolURN(pool.PoolID), - append(assignments, origins...)..., - ) + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewPoolURN(pool.PoolID), + ), + pubsub.WithAdditionalSubjectURNs( + append(assignments, origins...)..., + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": pool.TenantID, + "tenant_urn": pubsub.NewTenantURN(pool.TenantID), + }, + )) if err != nil { // TODO: add status to reconcile and requeue this r.logger.Error("error creating pool message", zap.Error(err)) diff --git a/pkg/api/v1/pools_update.go b/pkg/api/v1/pools_update.go index 5b63a12f8..73d0f2f8f 100644 --- a/pkg/api/v1/pools_update.go +++ b/pkg/api/v1/pools_update.go @@ -103,10 +103,18 @@ func (r *Router) updatePool(c echo.Context, pool *models.Pool) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewPoolMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(pool.TenantID), - pubsub.NewPoolURN(pool.PoolID), + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewPoolURN(pool.PoolID), + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": pool.TenantID, + "tenant_urn": pubsub.NewTenantURN(pool.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/ports_create.go b/pkg/api/v1/ports_create.go index e43baec37..34a57461e 100644 --- a/pkg/api/v1/ports_create.go +++ b/pkg/api/v1/ports_create.go @@ -103,11 +103,21 @@ func (r *Router) portCreate(c echo.Context) error { zap.String("loadbalancer.id", lb.LoadBalancerID), ) - msg, err := pubsub.NewPortMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(lb.TenantID), - pubsub.NewPortURN(port.PortID), - additionalURNs..., + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewPortURN(port.PortID), + ), + pubsub.WithAdditionalSubjectURNs( + additionalURNs..., + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": lb.TenantID, + "tenant_urn": pubsub.NewTenantURN(lb.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/ports_delete.go b/pkg/api/v1/ports_delete.go index 9393ec30a..e5aee9565 100644 --- a/pkg/api/v1/ports_delete.go +++ b/pkg/api/v1/ports_delete.go @@ -82,11 +82,21 @@ func (r *Router) portDelete(c echo.Context) error { return v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewPortMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(loadBalancer.TenantID), - pubsub.NewPortURN(port.PortID), - append(assignments, pubsub.NewLoadBalancerURN(loadBalancer.LoadBalancerID))..., + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewPortURN(port.PortID), + ), + pubsub.WithAdditionalSubjectURNs( + append(assignments, pubsub.NewLoadBalancerURN(loadBalancer.LoadBalancerID))..., + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": loadBalancer.TenantID, + "tenant_urn": pubsub.NewTenantURN(loadBalancer.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this diff --git a/pkg/api/v1/ports_update.go b/pkg/api/v1/ports_update.go index ed35118b5..32b4ddab8 100644 --- a/pkg/api/v1/ports_update.go +++ b/pkg/api/v1/ports_update.go @@ -209,11 +209,21 @@ func (r *Router) updatePort(c echo.Context, port *models.Port, origPools, newPoo return "", v1InternalServerErrorResponse(c, err) } - msg, err := pubsub.NewPortMessage( - someTestJWTURN, + msg, err := pubsub.NewMessage( pubsub.NewTenantURN(lb.TenantID), - pubsub.NewPortURN(port.PortID), - additionalURNs..., + pubsub.WithActorURN(someTestJWTURN), + pubsub.WithSubjectURN( + pubsub.NewPortURN(port.PortID), + ), + pubsub.WithAdditionalSubjectURNs( + additionalURNs..., + ), + pubsub.WithSubjectFields( + map[string]string{ + "tenant_id": lb.TenantID, + "tenant_urn": pubsub.NewTenantURN(lb.TenantID), + }, + ), ) if err != nil { // TODO: add status to reconcile and requeue this