feat: add support for reconciling check statuses
[skip ci]
adityathebe committed Jul 26, 2023
1 parent b0de55b commit 4e085fd
Showing 4 changed files with 38 additions and 16 deletions.
28 changes: 25 additions & 3 deletions db/upstream.go
@@ -2,17 +2,39 @@ package db

import (
"fmt"
"strings"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/upstream"
"github.com/flanksource/incident-commander/api"
"github.com/google/uuid"
"gorm.io/gorm/clause"
)

func GetAllResourceIDsOfAgent(ctx *api.Context, agentID string, req upstream.PaginateRequest) ([]string, error) {
func GetAllResourceIDsOfAgent(ctx *api.Context, req upstream.PaginateRequest, agentID uuid.UUID) ([]string, error) {
var response []string
query := fmt.Sprintf("SELECT id FROM %s WHERE agent_id = ? AND id > ? ORDER BY id LIMIT ?", req.Table)
err := ctx.DB().Raw(query, agentID, req.From, req.Size).Scan(&response).Error
var err error

switch req.Table {
case "check_statuses":
query := `
SELECT (check_id::TEXT || ',' || time::TEXT)
FROM check_statuses
LEFT JOIN checks ON checks.id = check_statuses.check_id
WHERE checks.agent_id = ? AND (check_statuses.check_id::TEXT, check_statuses.time::TEXT) > (?, ?)
ORDER BY check_statuses.check_id, check_statuses.time
LIMIT ?`
parts := strings.Split(req.From, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("%s is not a valid next cursor. It must consist of check_id and time separated by a comma", req.From)
}

err = ctx.DB().Raw(query, agentID, parts[0], parts[1], req.Size).Scan(&response).Error
default:
query := fmt.Sprintf("SELECT id FROM %s WHERE agent_id = ? AND id::TEXT > ? ORDER BY id LIMIT ?", req.Table)
err = ctx.DB().Raw(query, agentID, req.From, req.Size).Scan(&response).Error
}

return response, err
}

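The check_statuses branch above switches to keyset pagination over the composite key (check_id, time): each row is returned pre-encoded as "check_id,time", so the last row of a page doubles as the From cursor for the next page. Below is a minimal sketch of that paging loop, assuming a hypothetical fetchPage stub in place of the raw SQL call; the stub compares whole strings rather than a true (check_id, time) tuple, which is enough for illustration but is not this commit's query.

package main

import (
	"fmt"
	"strings"
)

// fetchPage stands in for GetAllResourceIDsOfAgent with Table="check_statuses":
// it returns rows encoded as "check_id,time", ordered by that key, starting
// strictly after the cursor. Hypothetical stub, not part of this commit.
func fetchPage(from string, size int) ([]string, error) {
	all := []string{
		"c1,2023-07-26T10:00:00Z",
		"c1,2023-07-26T10:05:00Z",
		"c2,2023-07-26T10:00:00Z",
	}
	var page []string
	for _, row := range all {
		if row > from && len(page) < size {
			page = append(page, row)
		}
	}
	return page, nil
}

func main() {
	from := "," // zero cursor: empty check_id and empty time, still splits into two parts
	for {
		page, err := fetchPage(from, 2)
		if err != nil || len(page) == 0 {
			break
		}
		for _, row := range page {
			parts := strings.Split(row, ",")
			fmt.Printf("check_id=%s time=%s\n", parts[0], parts[1])
		}
		// The last row of a page is already in "check_id,time" form,
		// so it becomes the next cursor verbatim.
		from = page[len(page)-1]
	}
}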
6 changes: 3 additions & 3 deletions upstream/controllers.go
@@ -51,7 +51,7 @@ func PushUpstream(c echo.Context) error {

req.PopulateAgentID(agentID.(uuid.UUID))

logger.Debugf("Pushing %s", req.String())
logger.Tracef("Inserting push data %s", req.String())
if err := db.InsertUpstreamMsg(ctx, &req); err != nil {
return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to upsert upstream message"})
}
@@ -80,7 +80,7 @@ func Pull(c echo.Context) error {
return c.JSON(http.StatusNotFound, api.HTTPError{Message: fmt.Sprintf("agent(name=%s) not found", agentName)})
}

resp, err := db.GetAllResourceIDsOfAgent(ctx, agent.ID.String(), req)
resp, err := db.GetAllResourceIDsOfAgent(ctx, req, agent.ID)
if err != nil {
return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to get resource ids"})
}
@@ -109,7 +109,7 @@ func Status(c echo.Context) error {
return c.JSON(http.StatusNotFound, api.HTTPError{Message: fmt.Sprintf("agent(name=%s) not found", agentName)})
}

response, err := upstream.GetPrimaryKeysHash(ctx, req.Table, req.From, req.Size)
response, err := upstream.GetPrimaryKeysHash(ctx, req, agent.ID)
if err != nil {
return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to push status response"})
}
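On the handler side, both Pull and Status now hand the full upstream.PaginateRequest plus the agent's uuid.UUID down to the db layer instead of unpacked table/from/size values. A hedged sketch of a call site under the new signatures follows; the helper name, the cursor value, and the incident-commander/db import path are inferred for illustration and are not part of this commit.

package example // illustrative file, not part of this commit

import (
	"github.com/flanksource/duty/upstream"
	"github.com/flanksource/incident-commander/api"
	"github.com/flanksource/incident-commander/db"
	"github.com/google/uuid"
)

// pullCheckStatusPage restates the Pull call site with the new argument order
// (ctx, req, agentID) and the composite "check_id,time" cursor for check_statuses.
func pullCheckStatusPage(ctx *api.Context, agentID uuid.UUID, from string) ([]string, error) {
	req := upstream.PaginateRequest{
		Table: "check_statuses",
		From:  from, // "check_id,time" cursor, e.g. "<uuid>,2023-07-26 10:00:00"
		Size:  500,
	}
	// Rows come back in the same "check_id,time" encoding, so the last element
	// of the result can be passed straight back in as the next cursor.
	return db.GetAllResourceIDsOfAgent(ctx, req, agentID)
}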
4 changes: 2 additions & 2 deletions upstream/reconcile.go
@@ -19,9 +19,9 @@ func SyncWithUpstream(ctx *api.Context) error {
_ = db.PersistJobHistory(ctx, jobHistory.End())
}()

syncer := upstream.NewUpstreamSyncer(api.UpstreamConf, ReconcilePageSize)
reconciler := upstream.NewUpstreamReconciler(api.UpstreamConf, ReconcilePageSize)
for _, table := range api.TablesToReconcile {
if err := syncer.SyncTableWithUpstream(ctx, table); err != nil {
if err := reconciler.Sync(ctx, table); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("failed to sync table %s: %w", table, err)
} else {
16 changes: 8 additions & 8 deletions upstream/reconcile_test.go
@@ -54,10 +54,10 @@ var _ = ginkgo.Describe("Push Mode reconcilation", ginkgo.Ordered, func() {
upstreamCtx := api.NewContext(upstreamDB, nil)

for _, table := range api.TablesToReconcile {
agentStatus, err := upstream.GetPrimaryKeysHash(ctx, table, "", 500)
agentStatus, err := upstream.GetPrimaryKeysHash(ctx, upstream.PaginateRequest{From: "", Table: table, Size: 500}, uuid.Nil)
Expect(err).To(BeNil())

upstreamStatus, err := upstream.GetPrimaryKeysHash(upstreamCtx, table, "", 500)
upstreamStatus, err := upstream.GetPrimaryKeysHash(upstreamCtx, upstream.PaginateRequest{From: "", Table: table, Size: 500}, uuid.Nil)
Expect(err).To(BeNil())

Expect(agentStatus).ToNot(Equal(upstreamStatus), fmt.Sprintf("table [%s] hash to not match", table))
@@ -67,9 +67,9 @@ var _ = ginkgo.Describe("Push Mode reconcilation", ginkgo.Ordered, func() {
ginkgo.It("should reconcile all the tables", func() {
ctx := api.NewContext(agentDB, nil)

syncer := upstream.NewUpstreamSyncer(api.UpstreamConf, 500)
reconciler := upstream.NewUpstreamReconciler(api.UpstreamConf, 500)
for _, table := range api.TablesToReconcile {
err := syncer.SyncTableWithUpstream(ctx, table)
err := reconciler.Sync(ctx, table)
Expect(err).To(BeNil(), fmt.Sprintf("should push table '%s' to upstream", table))
}
})
@@ -79,10 +79,10 @@ var _ = ginkgo.Describe("Push Mode reconcilation", ginkgo.Ordered, func() {
upstreamCtx := api.NewContext(upstreamDB, nil)

for _, table := range api.TablesToReconcile {
agentStatus, err := upstream.GetPrimaryKeysHash(ctx, table, "", 500)
agentStatus, err := upstream.GetPrimaryKeysHash(ctx, upstream.PaginateRequest{From: "", Table: table, Size: 500}, uuid.Nil)
Expect(err).To(BeNil())

upstreamStatus, err := upstream.GetPrimaryKeysHash(upstreamCtx, table, "", 500)
upstreamStatus, err := upstream.GetPrimaryKeysHash(upstreamCtx, upstream.PaginateRequest{From: "", Table: table, Size: 500}, uuid.Nil)
Expect(err).To(BeNil())

Expect(agentStatus).To(Equal(upstreamStatus), fmt.Sprintf("table [%s] hash to match", table))
@@ -132,8 +132,8 @@ var _ = ginkgo.Describe("Push Mode reconcilation", ginkgo.Ordered, func() {
ginkgo.It("should reconcile config items", func() {
ctx := api.NewContext(agentDB, nil)

syncer := upstream.NewUpstreamSyncer(api.UpstreamConf, 500)
err := syncer.SyncTableWithUpstream(ctx, "config_items")
reconciler := upstream.NewUpstreamReconciler(api.UpstreamConf, 500)
err := reconciler.Sync(ctx, "config_items")
Expect(err).To(BeNil(), "should push table 'config_items' upstream")
})

