Skip to content

Commit

Permalink
Message creation to support subject fields + additional data, ensure …
Browse files Browse the repository at this point in the history
…tenant id exists in events (#112)

* rework message creation to take functional arguments, support subject fields + additional data. add tenant to messages in additional subject urns and subject fields.

Signed-off-by: E Camden Fisher <efisher@equinix.com>

* add tenant to subject fields

Signed-off-by: E Camden Fisher <efisher@equinix.com>

* Update internal/pubsub/message.go

Co-authored-by: Matt Siwiec <rizzza@users.noreply.github.com>
Signed-off-by: E Camden Fisher <fish@fishnix.net>

---------

Signed-off-by: E Camden Fisher <efisher@equinix.com>
Signed-off-by: E Camden Fisher <fish@fishnix.net>
Co-authored-by: Matt Siwiec <rizzza@users.noreply.github.com>
  • Loading branch information
fishnix and rizzza authored Apr 26, 2023
1 parent ebc5cfd commit 9b2e66c
Show file tree
Hide file tree
Showing 24 changed files with 376 additions and 124 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
"ms-azuretools.vscode-docker",
"RemiMarche.cspell-tech",
"streetsidesoftware.code-spell-checker",
"netcorext.uuid-generator"
"netcorext.uuid-generator",
"ms-vscode.makefile-tools"
]
}
},
Expand Down
10 changes: 0 additions & 10 deletions internal/pubsub/assignments.go

This file was deleted.

13 changes: 8 additions & 5 deletions internal/pubsub/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package pubsub

import "errors"

// nolint
var (
ErrInvalidActorURN = errors.New("invalid actor urn")
ErrInvalidTenantURN = errors.New("invalid tenant urn")
ErrInvalidAssignmentURN = errors.New("invalid assignment urn")
ErrInvalidURN = errors.New("invalid urn")
// ErrMissingEventSubjectURN is returned when the event subject urn is missing
ErrMissingEventSubjectURN = errors.New("missing event subject urn")

// ErrMissingEventActorURN is returned when the event actor urn is missing
ErrMissingEventActorURN = errors.New("missing event actor urn")

// ErrMissingEventSource is returned when the event source is missing
ErrMissingEventSource = errors.New("missing event source")
)
10 changes: 0 additions & 10 deletions internal/pubsub/frontend.go

This file was deleted.

10 changes: 0 additions & 10 deletions internal/pubsub/load_balancer.go

This file was deleted.

108 changes: 108 additions & 0 deletions internal/pubsub/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package pubsub

import (
"time"

"go.infratographer.com/x/pubsubx"
)

const (
// DefaultMessageSource is the default source for messages
DefaultMessageSource = "load-balancer-api"
)

// NewMessage functionally generates a new pubsub message and appends the tenantURN
// to the list of additional subject urns
func NewMessage(tenantURN string, opts ...MsgOption) (*pubsubx.Message, error) {
msg := pubsubx.Message{
Timestamp: time.Now().UTC(),
Source: DefaultMessageSource,
}

for _, opt := range opts {
opt(&msg)
}

msg.AdditionalSubjectURNs = append(msg.AdditionalSubjectURNs, tenantURN)

if msg.SubjectFields == nil {
msg.SubjectFields = make(map[string]string)
}

msg.SubjectFields["tenant_urn"] = tenantURN

if err := validatePubsubMessage(&msg); err != nil {
return nil, err
}

return &msg, nil
}

// MsgOption is a functional argument for NewMessage
type MsgOption func(m *pubsubx.Message)

// WithEventType sets the event type of the message
func WithEventType(e string) MsgOption {
return func(m *pubsubx.Message) {
m.EventType = e
}
}

// WithSource sets the source of the message
func WithSource(s string) MsgOption {
return func(m *pubsubx.Message) {
m.Source = s
}
}

// WithActorURN sets the actor urn of the message
func WithActorURN(u string) MsgOption {
return func(m *pubsubx.Message) {
m.ActorURN = u
}
}

// WithSubjectURN sets the subject urn of the message
func WithSubjectURN(s string) MsgOption {
return func(m *pubsubx.Message) {
m.SubjectURN = s
}
}

// WithAdditionalSubjectURNs sets the additional subject urns of the message
func WithAdditionalSubjectURNs(a ...string) MsgOption {
return func(m *pubsubx.Message) {
m.AdditionalSubjectURNs = a
}
}

// WithSubjectFields sets the subject fields of the message
func WithSubjectFields(f map[string]string) MsgOption {
return func(m *pubsubx.Message) {
m.SubjectFields = f
}
}

// WithAdditionalData sets the additional data of the message
func WithAdditionalData(d map[string]interface{}) MsgOption {
return func(m *pubsubx.Message) {
m.AdditionalData = d
}
}

// validatePubsubMessage validates a pubsub message for required fields
func validatePubsubMessage(msg *pubsubx.Message) error {
if msg.SubjectURN == "" {
return ErrMissingEventSubjectURN
}

if msg.ActorURN == "" {
return ErrMissingEventActorURN
}

if msg.Source == "" {
return ErrMissingEventSource
}

return nil
}
64 changes: 64 additions & 0 deletions internal/pubsub/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package pubsub

import (
"testing"

"github.com/stretchr/testify/assert"
"go.infratographer.com/x/pubsubx"
)

func Test_validatePubsubMessage(t *testing.T) {
tests := []struct {
name string
msg *pubsubx.Message
wantErr bool
}{
{
name: "valid message",
msg: &pubsubx.Message{
EventType: "test",
Source: "test",
SubjectURN: "foo",
ActorURN: "bar",
},
wantErr: false,
},
{
name: "missing source",
msg: &pubsubx.Message{
EventType: "test",
SubjectURN: "foo",
ActorURN: "bar",
},
wantErr: true,
},
{
name: "missing subject urn",
msg: &pubsubx.Message{
EventType: "test",
Source: "test",
ActorURN: "bar",
},
wantErr: true,
},
{
name: "missing actor urn",
msg: &pubsubx.Message{
EventType: "test",
Source: "test",
SubjectURN: "foo",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validatePubsubMessage(tt.msg)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
})
}
}
10 changes: 0 additions & 10 deletions internal/pubsub/origins.go

This file was deleted.

10 changes: 0 additions & 10 deletions internal/pubsub/pools.go

This file was deleted.

11 changes: 0 additions & 11 deletions internal/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/nats-io/nats.go"
"go.infratographer.com/x/pubsubx"
Expand All @@ -24,16 +23,6 @@ const (
// May be a config option later
var prefix = "com.infratographer.events"

func newMessage(actorURN string, subjectURN string, additionalSubjectURNs ...string) *pubsubx.Message {
return &pubsubx.Message{
SubjectURN: subjectURN,
ActorURN: actorURN, // comes from the jwt eventually
Timestamp: time.Now().UTC(),
Source: "lbapi",
AdditionalSubjectURNs: additionalSubjectURNs,
}
}

// PublishCreate publishes a create event
func (c *Client) PublishCreate(ctx context.Context, actor, location string, data *pubsubx.Message) error {
data.EventType = CreateEventType
Expand Down
20 changes: 15 additions & 5 deletions pkg/api/v1/assignments_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,22 @@ func (r *Router) assignmentsCreate(c echo.Context) error {
return v1BadRequestResponse(c, err)
}

msg, err := pubsub.NewAssignmentMessage(
someTestJWTURN,
msg, err := pubsub.NewMessage(
pubsub.NewTenantURN(tenantID),
pubsub.NewAssignmentURN(assignmentID),
pubsub.NewLoadBalancerURN(port.LoadBalancerID),
pubsub.NewPoolURN(payload.PoolID),
pubsub.WithActorURN(someTestJWTURN),
pubsub.WithSubjectURN(
pubsub.NewAssignmentURN(assignmentID),
),
pubsub.WithAdditionalSubjectURNs(
pubsub.NewLoadBalancerURN(port.LoadBalancerID),
pubsub.NewPoolURN(payload.PoolID),
),
pubsub.WithSubjectFields(
map[string]string{
"tenant_id": tenantID,
"tenant_urn": pubsub.NewTenantURN(tenantID),
},
),
)
if err != nil {
// TODO: add status to reconcile and requeue this
Expand Down
23 changes: 18 additions & 5 deletions pkg/api/v1/assignments_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,24 @@ func (r *Router) assignmentsDelete(c echo.Context) error {
r.logger.Error("error fetching port", zap.Error(err))
}

msg, err := pubsub.NewAssignmentMessage(
someTestJWTURN,
pubsub.NewTenantURN(assignments[0].TenantID),
pubsub.NewAssignmentURN(assignments[0].AssignmentID),
pubsub.NewLoadBalancerURN(feModel.LoadBalancerID),
tenantID := assignments[0].TenantID
assignmentID := assignments[0].AssignmentID

msg, err := pubsub.NewMessage(
pubsub.NewTenantURN(tenantID),
pubsub.WithActorURN(someTestJWTURN),
pubsub.WithSubjectURN(
pubsub.NewAssignmentURN(assignmentID),
),
pubsub.WithAdditionalSubjectURNs(
pubsub.NewLoadBalancerURN(feModel.LoadBalancerID),
),
pubsub.WithSubjectFields(
map[string]string{
"tenant_id": tenantID,
"tenant_urn": pubsub.NewTenantURN(tenantID),
},
),
)
if err != nil {
// TODO: add status to reconcile and requeue this
Expand Down
16 changes: 12 additions & 4 deletions pkg/api/v1/load_balancers_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,19 @@ func (r *Router) loadBalancerCreate(c echo.Context) error {

r.logger.Info("created new load balancer", zap.Any("loadbalancer.id", lb.LoadBalancerID))

msg, err := pubsub.NewLoadBalancerMessage(
someTestJWTURN,
msg, err := pubsub.NewMessage(
pubsub.NewTenantURN(tenantID),
pubsub.NewLoadBalancerURN(lb.LoadBalancerID),
additionalURNs...,
pubsub.WithActorURN(someTestJWTURN),
pubsub.WithSubjectURN(
pubsub.NewLoadBalancerURN(lb.LoadBalancerID),
),
pubsub.WithAdditionalSubjectURNs(additionalURNs...),
pubsub.WithSubjectFields(
map[string]string{
"tenant_id": tenantID,
"tenant_urn": pubsub.NewTenantURN(tenantID),
},
),
)
if err != nil {
// TODO: add status to reconcile and requeue this
Expand Down
19 changes: 15 additions & 4 deletions pkg/api/v1/load_balancers_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,21 @@ func (r *Router) loadBalancerDelete(c echo.Context) error {
return v1InternalServerErrorResponse(c, err)
}

msg, err := pubsub.NewLoadBalancerMessage(
someTestJWTURN,
pubsub.NewLoadBalancerURN(lb[0].TenantID),
pubsub.NewLoadBalancerURN(lb[0].LoadBalancerID),
tenantID := lb[0].TenantID
lbID := lb[0].LoadBalancerID

msg, err := pubsub.NewMessage(
pubsub.NewTenantURN(tenantID),
pubsub.WithActorURN(someTestJWTURN),
pubsub.WithSubjectURN(
pubsub.NewLoadBalancerURN(lbID),
),
pubsub.WithSubjectFields(
map[string]string{
"tenant_id": tenantID,
"tenant_urn": pubsub.NewTenantURN(tenantID),
},
),
)
if err != nil {
// TODO: add status to reconcile and requeue this
Expand Down
Loading

0 comments on commit 9b2e66c

Please sign in to comment.