Skip to content

Commit

Permalink
feat: new approach for longpoll
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 12, 2024
1 parent 927a408 commit 91e26a3
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 78 deletions.
10 changes: 4 additions & 6 deletions config/crds/mission-control.flanksource.com_incidentrules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,15 @@ spec:
properties:
timeout:
description: How long after the health checks have been passing
before, autoclosing the incident.
format: int64
type: integer
before autoclosing the incident (accepts goduration format)
type: string
type: object
autoResolve:
properties:
timeout:
description: How long after the health checks have been passing
before, autoclosing the incident.
format: int64
type: integer
before autoclosing the incident (accepts goduration format)
type: string
type: object
breakOnMatch:
description: stop processing other incident rules, when matched
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/samber/oops v1.13.1
github.com/samber/slog-echo v1.14.4
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/sethvargo/go-retry v0.3.0
github.com/slack-go/slack v0.14.0
github.com/tg123/go-htpasswd v1.2.2
github.com/timberio/go-datemath v0.1.0
Expand Down Expand Up @@ -207,7 +208,6 @@ require (
github.com/rodaine/table v1.3.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shurcooL/githubv4 v0.0.0-20190718010115-4ba037080260 // indirect
Expand Down Expand Up @@ -359,6 +359,6 @@ require (

// replace github.com/flanksource/commons => /Users/moshe/go/src/github.com/flanksource/commons

// replace github.com/flanksource/duty => ../du31
// replace github.com/flanksource/duty => ../duty

// replace github.com/flanksource/gomplate/v3 => /Users/moshe/go/src/github.com/flanksource/gomplate
3 changes: 1 addition & 2 deletions playbook/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ func StartPlaybookConsumers(ctx context.Context) error {
go pg.Listen(ctx, pgNotifyPlaybookActionUpdates, actionAgentUpdatesPGNotifyChannel)
go actionAgentEventConsumer.Listen(ctx, actionAgentUpdatesPGNotifyChannel)

go pg.Listen(ctx, pgNotifyPlaybookActionUpdates, runner.ActionMgr.Chan())
go runner.ActionMgr.Listen()
go runner.ActionNotifyRouter.Run(ctx, pgNotifyPlaybookActionUpdates)

return nil
}
Expand Down
69 changes: 48 additions & 21 deletions playbook/runner/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,63 @@ type ActionForAgent struct {
TemplateEnv actions.TemplateEnv `json:"template_env"`
}

func GetActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
select {
case <-time.After(LongpollTimeout):
return &ActionForAgent{}, nil

case actionID := <-ActionMgr.Register(agent.ID.String()):
tx := ctx.DB().Begin()
if tx.Error != nil {
return nil, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()
func GetActionForAgentWithWait(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
action, err := getActionForAgent(ctx, agent)
if err != nil {
return nil, err
}

ctx = ctx.WithDB(tx, ctx.Pool())
ctx = ctx.WithObject(agent)
if action != nil {
return action, err
}

var action models.PlaybookRunAction
if err := ctx.DB().Where("id = ?", actionID).First(&action).Error; err != nil {
return nil, err
}
// Go into waiting state
select {
case <-time.After(ctx.Properties().Duration("playbook.runner.longpoll.timeout", DefaultLongpollTimeout)):
return &ActionForAgent{}, nil

actionForAgent, err := getAgentAction(ctx, agent, &action)
case <-ActionNotifyRouter.RegisterRoutes(agent.ID.String()):
action, err := getActionForAgent(ctx, agent)
if err != nil {
return nil, err
}

return actionForAgent, ctx.Oops().Wrap(tx.Commit().Error)
return action, err
}
}

func getAgentAction(ctx context.Context, agent *models.Agent, step *models.PlaybookRunAction) (*ActionForAgent, error) {
func getActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return nil, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

ctx = ctx.WithDB(tx, ctx.Pool())
ctx = ctx.WithObject(agent)

query := `
SELECT playbook_run_actions.*
FROM playbook_run_actions
INNER JOIN playbook_runs ON playbook_runs.id = playbook_run_actions.playbook_run_id
INNER JOIN playbooks ON playbooks.id = playbook_runs.playbook_id
WHERE playbook_run_actions.status = ?
AND (playbook_run_actions.scheduled_time IS NULL or playbook_run_actions.scheduled_time <= NOW())
AND playbook_run_actions.agent_id = ?
ORDER BY scheduled_time
FOR UPDATE SKIP LOCKED
LIMIT 1
`

var steps []models.PlaybookRunAction
if err := ctx.DB().Raw(query, models.PlaybookRunStatusWaiting, agent.ID).Find(&steps).Error; err != nil {
return nil, ctx.Oops("db").Wrap(err)
}

if len(steps) == 0 {
return nil, nil
}
step := &steps[0]
ctx = ctx.WithObject(agent, step)

run, err := step.GetRun(ctx.DB())
Expand Down Expand Up @@ -100,5 +127,5 @@ func getAgentAction(ctx context.Context, agent *models.Agent, step *models.Playb
return nil, ctx.Oops().Wrap(err)
}

return &output, nil
return &output, ctx.Oops().Wrap(tx.Commit().Error)
}
58 changes: 12 additions & 46 deletions playbook/runner/longpoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,29 @@ package runner

import (
"encoding/json"
"sync"
"time"
)

// Global instance
var ActionMgr = NewActionNotifyManager(make(chan string))

var LongpollTimeout = time.Minute
"github.com/flanksource/duty/postq/pg"
)

type actionNotifyManager struct {
ch chan string
mu *sync.RWMutex
subscriptions map[string]chan string
}
var DefaultLongpollTimeout = time.Minute

func NewActionNotifyManager(ch chan string) *actionNotifyManager {
return &actionNotifyManager{
ch: ch,
mu: &sync.RWMutex{},
subscriptions: make(map[string]chan string),
}
}
// Global instance
var ActionNotifyRouter = pg.NewNotifyRouter().WithRouteExtractor(playbookActionNotifyRouteExtractor)

// playbookActionNotifyPayload is the JSON shape that playbook action
// notification payload strings are decoded into (via json.Unmarshal) in this
// file.
type playbookActionNotifyPayload struct {
// ID is decoded from the "id" key. The route extractor in this file hands it
// on as the extracted payload.
// NOTE(review): presumably the playbook run action's ID — confirm against
// the sender of the notification.
ID string `json:"id"`
// AgentID is decoded from the "agent_id" key. The route extractor in this
// file uses it as the route, and waiters register under agent.ID.String().
AgentID string `json:"agent_id"`
}

func (d *actionNotifyManager) Chan() chan<- string {
return d.ch
}

func (d *actionNotifyManager) Listen() {
for payload := range d.ch {
var action playbookActionNotifyPayload
if err := json.Unmarshal([]byte(payload), &action); err != nil {
continue
}

d.mu.RLock()
if e, ok := d.subscriptions[action.AgentID]; ok {
e <- action.ID
}
d.mu.RUnlock()
}
}

func (d *actionNotifyManager) Register(agentID string) chan string {
d.mu.Lock()
defer d.mu.Unlock()

if e, ok := d.subscriptions[agentID]; ok {
return e
func playbookActionNotifyRouteExtractor(payload string) (string, string, error) {
var p playbookActionNotifyPayload
if err := json.Unmarshal([]byte(payload), &p); err != nil {
return "", "", err
}

ch := make(chan string)
d.subscriptions[agentID] = ch
route := p.AgentID
extractedPayload := p.ID

return ch
return route, extractedPayload, nil
}
2 changes: 1 addition & 1 deletion upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func handlePlaybookActionRequest(c echo.Context) error {
ctx := c.Request().Context().(context.Context)

agent := ctx.Agent()
response, err := runner.GetActionForAgent(ctx, agent)
response, err := runner.GetActionForAgentWithWait(ctx, agent)
if err != nil {
return dutyAPI.WriteError(c, err)
}
Expand Down

0 comments on commit 91e26a3

Please sign in to comment.