Skip to content

Commit

Permalink
Allow including ports on load balancer create POST, get ports in load…
Browse files Browse the repository at this point in the history
… balancer GET responses (#88)

* allow including ports on load balancer create POST, get ports in load balancer GET responses

Signed-off-by: E Camden Fisher <efisher@equinix.com>

* disable logging in tests

Signed-off-by: E Camden Fisher <efisher@equinix.com>

* remove stutter

Signed-off-by: E Camden Fisher <efisher@equinix.com>

* make routes separate functions in routes.go

Signed-off-by: E Camden Fisher <efisher@equinix.com>

* remove unnecessary length check on lb port pools

Signed-off-by: E Camden Fisher <efisher@equinix.com>

---------

Signed-off-by: E Camden Fisher <efisher@equinix.com>
  • Loading branch information
fishnix authored Mar 28, 2023
1 parent 9492627 commit de2435d
Show file tree
Hide file tree
Showing 25 changed files with 432 additions and 150 deletions.
6 changes: 3 additions & 3 deletions internal/pubsub/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// Client is an event bus client with some configuration
type Client struct {
js nats.JetStreamContext
logger *zap.SugaredLogger
logger *zap.Logger
prefix, stream string
}

Expand All @@ -20,7 +20,7 @@ type Option func(c *Client)
// NewClient configures and establishes a new event bus client connection
func NewClient(opts ...Option) *Client {
client := Client{
logger: zap.NewNop().Sugar(),
logger: zap.NewNop(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -54,7 +54,7 @@ func WithSubjectPrefix(p string) Option {
// WithLogger sets the client logger
func WithLogger(l *zap.SugaredLogger) Option {
return func(c *Client) {
c.logger = l
c.logger = l.Desugar()
}
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/api/v1/assignments_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/volatiletech/sqlboiler/v4/boil"
"github.com/volatiletech/sqlboiler/v4/queries/qm"
"go.uber.org/zap"

"go.infratographer.com/load-balancer-api/internal/models"
"go.infratographer.com/load-balancer-api/internal/pubsub"
Expand All @@ -19,7 +20,7 @@ func (r *Router) assignmentsCreate(c echo.Context) error {
}{}

if err := c.Bind(&payload); err != nil {
r.logger.Errorw("error binding payload", "error", err)
r.logger.Error("error binding payload", zap.Error(err))
return v1BadRequestResponse(c, err)
}

Expand All @@ -34,27 +35,30 @@ func (r *Router) assignmentsCreate(c echo.Context) error {
qm.Load("LoadBalancer"),
).One(ctx, r.db)
if err != nil {
r.logger.Errorw("error fetching port", "error", err)
r.logger.Error("error fetching port", zap.Error(err))
return v1BadRequestResponse(c, err)
}

// validate pool exists
pool, err := models.Pools(
models.PoolWhere.PoolID.EQ(payload.PoolID),
models.PoolWhere.TenantID.EQ(tenantID),
).One(ctx, r.db)
if err != nil {
r.logger.Errorw("error fetching pool", "error", err)
r.logger.Error("error fetching pool", zap.Error(err))
return v1BadRequestResponse(c, err)
}

r.logger.Debug("validated pool exists", zap.Any("pool", pool))

assignment := models.Assignment{
TenantID: tenantID,
PortID: port.PortID,
PoolID: pool.PoolID,
}

if err := assignment.Insert(ctx, r.db, boil.Infer()); err != nil {
r.logger.Errorw("error inserting assignment", "error", err)
r.logger.Error("error inserting assignment", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

Expand All @@ -67,12 +71,12 @@ func (r *Router) assignmentsCreate(c echo.Context) error {
)
if err != nil {
// TODO: add status to reconcile and requeue this
r.logger.Errorw("error creating assignment message", "error", err)
r.logger.Error("error creating assignment message", zap.Error(err))
}

if err := r.pubsub.PublishCreate(ctx, "load-balancer-assignment", "global", msg); err != nil {
// TODO: add status to reconcile and requeue this
r.logger.Errorw("error publishing assignment event", "error", err)
r.logger.Error("error publishing assignment event", zap.Error(err))
}

return v1AssignmentsCreatedResponse(c, assignment.AssignmentID)
Expand Down
13 changes: 7 additions & 6 deletions pkg/api/v1/assignments_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/labstack/echo/v4"
"go.infratographer.com/load-balancer-api/internal/models"
"go.infratographer.com/load-balancer-api/internal/pubsub"
"go.uber.org/zap"
)

// assignmentsDelete handles the DELETE /assignments route
Expand All @@ -12,13 +13,13 @@ func (r *Router) assignmentsDelete(c echo.Context) error {

mods, err := r.assignmentParamsBinding(c)
if err != nil {
r.logger.Errorw("error parsing query params", "error", err)
r.logger.Error("error parsing query params", zap.Error(err))
return v1BadRequestResponse(c, err)
}

assignments, err := models.Assignments(mods...).All(ctx, r.db)
if err != nil {
r.logger.Errorw("error getting assignments", "error", err)
r.logger.Error("error getting assignments", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

Expand All @@ -27,7 +28,7 @@ func (r *Router) assignmentsDelete(c echo.Context) error {
return v1NotFoundResponse(c)
case 1:
if _, err := assignments[0].Delete(ctx, r.db, false); err != nil {
r.logger.Errorw("error deleting assignment", "error", err)
r.logger.Error("error deleting assignment", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

Expand All @@ -36,7 +37,7 @@ func (r *Router) assignmentsDelete(c echo.Context) error {
feModel, err := models.Ports(feMods).One(ctx, r.db)
if err != nil {
// TODO: add status to reconcile and requeue this
r.logger.Errorw("error fetching port", "error", err)
r.logger.Error("error fetching port", zap.Error(err))
}

msg, err := pubsub.NewAssignmentMessage(
Expand All @@ -47,12 +48,12 @@ func (r *Router) assignmentsDelete(c echo.Context) error {
)
if err != nil {
// TODO: add status to reconcile and requeue this
r.logger.Errorw("error creating message", "error", err)
r.logger.Error("error creating message", zap.Error(err))
}

if err := r.pubsub.PublishDelete(ctx, "assignment", "global", msg); err != nil {
// TODO: add status to reconcile and requeue this
r.logger.Errorw("error publishing event", "error", err)
r.logger.Error("error publishing event", zap.Error(err))
}

return v1DeletedResponse(c)
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/v1/assignments_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"github.com/labstack/echo/v4"
"go.infratographer.com/load-balancer-api/internal/models"
"go.uber.org/zap"
)

// assignmentsGet handles the GET /assignments route
Expand All @@ -11,13 +12,13 @@ func (r *Router) assignmentsGet(c echo.Context) error {

mods, err := r.assignmentParamsBinding(c)
if err != nil {
r.logger.Errorw("error parsing query params", "error", err)
r.logger.Error("error parsing query params", zap.Error(err))
return v1BadRequestResponse(c, err)
}

assignments, err := models.Assignments(mods...).All(ctx, r.db)
if err != nil {
r.logger.Errorw("error getting assignments", "error", err)
r.logger.Error("error getting assignments", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/api/v1/assignments_param_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"github.com/labstack/echo/v4"
"github.com/volatiletech/sqlboiler/v4/queries/qm"
"go.uber.org/zap"

"go.infratographer.com/load-balancer-api/internal/models"
)
Expand All @@ -25,7 +26,7 @@ func (r *Router) assignmentParamsBinding(c echo.Context) ([]qm.QueryMod, error)
mods = queryParamsToQueryMods(qpb, qp, mods)

if len(c.QueryParam(qp)) > 0 {
r.logger.Debugw("query param", "query_param", qp, "param_vale", c.QueryParam(qp))
r.logger.Debug("assignment query parameters", zap.String("query.key", qp), zap.String("query.value", c.QueryParam(qp)))
}
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/api/v1/assignments_routes.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/api/v1/assignments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func Test_Assignments(t *testing.T) {
defer cleanupFE(t)

// Create a pool
pool, cleanupPool := createPool(t, srv, "marlin", loadBalancer.TenantID)
pool, cleanupPool := createPool(t, srv, "marlin", tenantID)
defer cleanupPool(t)

// Create an origin in the pool
_, cleanupOrigin := createOrigin(t, srv, "bruce", pool.ID)
defer cleanupOrigin(t)

// poll2
pool2, cleanupPool2 := createPool(t, srv, "dory", loadBalancer.TenantID)
pool2, cleanupPool2 := createPool(t, srv, "dory", tenantID)
defer cleanupPool2(t)

// origin2
Expand Down
122 changes: 121 additions & 1 deletion pkg/api/v1/load_balancers_create.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package api

import (
"context"
"database/sql"

"github.com/google/uuid"
"github.com/gosimple/slug"
"github.com/labstack/echo/v4"
Expand All @@ -20,6 +23,11 @@ func (r *Router) loadBalancerCreate(c echo.Context) error {
LoadBalancerType string `json:"load_balancer_type"`
IPAddressID string `json:"ip_address_id"`
LocationID string `json:"location_id"`
Ports []struct {
Name string `json:"name"`
Port int64 `json:"port"`
Pools []string `json:"pools"`
} `json:"ports"`
}{}

if err := c.Bind(&payload); err != nil {
Expand Down Expand Up @@ -61,8 +69,59 @@ func (r *Router) loadBalancerCreate(c echo.Context) error {
return v1BadRequestResponse(c, err)
}

if err = lb.Insert(ctx, r.db, boil.Infer()); err != nil {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
r.logger.Error("failed to begin transaction", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

if err = lb.Insert(ctx, tx, boil.Infer()); err != nil {
r.logger.Error("failed to create load balancer, rolling back transaction", zap.Error(err))

if err := tx.Rollback(); err != nil {
r.logger.Error("error rolling back transaction", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

return v1InternalServerErrorResponse(c, err)
}

additionalURNs := []string{}

for _, p := range payload.Ports {
portID, err := r.loadBalancerPortCreate(ctx, tx, lb.LoadBalancerID, p.Name, p.Port)
if err != nil {
r.logger.Error("failed to create load balancer port, rolling back transaction", zap.Error(err))

if err := tx.Rollback(); err != nil {
r.logger.Error("error rolling back transaction", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

return v1BadRequestResponse(c, err)
}

additionalURNs = append(additionalURNs, pubsub.NewPortURN(portID))

for _, pool := range p.Pools {
assignmentID, err := r.loadBalancerAssignmentCreate(ctx, tx, tenantID, lb.LoadBalancerID, pool, portID)
if err != nil {
r.logger.Error("failed to create load balancer assignment, rolling back transaction", zap.Error(err))

if err := tx.Rollback(); err != nil {
r.logger.Error("error rolling back transaction", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

return v1BadRequestResponse(c, err)
}

additionalURNs = append(additionalURNs, pubsub.NewAssignmentURN(assignmentID))
}
}

if err := tx.Commit(); err != nil {
r.logger.Error("failed to commit transaction", zap.Error(err))
return v1InternalServerErrorResponse(c, err)
}

Expand All @@ -72,6 +131,7 @@ func (r *Router) loadBalancerCreate(c echo.Context) error {
someTestJWTURN,
pubsub.NewTenantURN(tenantID),
pubsub.NewLoadBalancerURN(lb.LoadBalancerID),
additionalURNs...,
)
if err != nil {
// TODO: add status to reconcile and requeue this
Expand All @@ -85,3 +145,63 @@ func (r *Router) loadBalancerCreate(c echo.Context) error {

return v1LoadBalancerCreatedResponse(c, lb.LoadBalancerID)
}

func (r *Router) loadBalancerPortCreate(ctx context.Context, tx *sql.Tx, loadBalancerID string, portName string, portNumber int64) (string, error) {
r.logger.Debug("creating loadbalancer port",
zap.String("loadbalancer.id", loadBalancerID),
zap.String("port.name", portName),
zap.Int64("port.number", portNumber),
)

port := models.Port{
Name: portName,
Port: portNumber,
LoadBalancerID: loadBalancerID,
Slug: slug.Make(portName),
CurrentState: "pending",
}

if err := validatePort(&port); err != nil {
r.logger.Error("failed to validate port", zap.Error(err))
return "", err
}

if err := port.Insert(ctx, tx, boil.Infer()); err != nil {
r.logger.Error("failed to insert port", zap.Error(err))
return "", err
}

return port.PortID, nil
}

func (r *Router) loadBalancerAssignmentCreate(ctx context.Context, tx *sql.Tx, tenantID, loadBalancerID, poolID, portID string) (string, error) {
r.logger.Debug("creating loadbalancer assignment",
zap.String("tenant.id", tenantID),
zap.String("loadbalancer.id", loadBalancerID),
zap.String("pool.id", poolID),
zap.String("port.id", portID),
)

// validate pool exists
pool, err := models.Pools(
models.PoolWhere.PoolID.EQ(poolID),
models.PoolWhere.TenantID.EQ(tenantID),
).One(ctx, r.db)
if err != nil {
r.logger.Error("error fetching pool", zap.Error(err))
return "", err
}

assignment := models.Assignment{
TenantID: tenantID,
PortID: portID,
PoolID: pool.PoolID,
}

if err := assignment.Insert(ctx, tx, boil.Infer()); err != nil {
r.logger.Error("error inserting assignment", zap.Error(err))
return "", err
}

return assignment.AssignmentID, nil
}
Loading

0 comments on commit de2435d

Please sign in to comment.