diff --git a/playbook/run_consumer.go b/playbook/run_consumer.go index 8483c230d..ac7189462 100644 --- a/playbook/run_consumer.go +++ b/playbook/run_consumer.go @@ -299,12 +299,13 @@ func RunConsumer(ctx context.Context) (int, error) { FROM playbook_runs INNER JOIN playbooks ON playbooks.id = playbook_runs.playbook_id WHERE status IN (?, ?) AND scheduled_time <= NOW() + AND (agent_id IS NULL OR agent_id = ?) ORDER BY scheduled_time FOR UPDATE SKIP LOCKED LIMIT 1 ` var run models.PlaybookRun - if err := tx.Raw(query, models.PlaybookRunStatusScheduled, models.PlaybookRunStatusSleeping).First(&run).Error; err != nil { + if err := tx.Raw(query, models.PlaybookRunStatusScheduled, models.PlaybookRunStatusSleeping, uuid.Nil).First(&run).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil } diff --git a/playbook/runner/agent.go b/playbook/runner/agent.go index 973801e79..afc5be585 100644 --- a/playbook/runner/agent.go +++ b/playbook/runner/agent.go @@ -95,7 +95,7 @@ func getActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgen return nil, ctx.Oops().Wrapf(err, "failed to template env") } - spec, err := getActionSpec(ctx, playbook, step.Name) + spec, err := getActionSpec(playbook, step.Name) if err != nil { return nil, ctx.Oops().Wrap(err) } @@ -117,7 +117,7 @@ func getActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgen TemplateEnv: templateEnv, } - if skip, err := filterAction(ctx, run.ID, spec.Filter); err != nil { + if skip, err := filterAction(ctx, spec.Filter); err != nil { return nil, ctx.Oops().Wrap(err) } else { // We run the filter on the upstream and simply send the filter result to the agent. diff --git a/playbook/runner/exec.go b/playbook/runner/exec.go index edd2fc1ad..8e090f136 100644 --- a/playbook/runner/exec.go +++ b/playbook/runner/exec.go @@ -41,7 +41,7 @@ func executeAction(ctx context.Context, playbookID any, runID uuid.UUID, runActi } if actionSpec.Filter != "" { - if skipped, err := filterAction(ctx, runID, actionSpec.Filter); err != nil { + if skipped, err := filterAction(ctx, actionSpec.Filter); err != nil { return executeActionResult{}, err } else if skipped { ctx.Debugf("skipping %s", actionSpec.Name) diff --git a/playbook/runner/runner.go b/playbook/runner/runner.go index 1f7ac689e..83a5c10b9 100644 --- a/playbook/runner/runner.go +++ b/playbook/runner/runner.go @@ -16,7 +16,6 @@ import ( v1 "github.com/flanksource/incident-commander/api/v1" "github.com/flanksource/incident-commander/db" "github.com/flanksource/incident-commander/playbook/actions" - "github.com/google/uuid" "github.com/samber/lo" "github.com/samber/oops" "gorm.io/gorm" @@ -87,7 +86,7 @@ func findNextActionWithFilter(actions []v1.PlaybookAction) *v1.PlaybookAction { return nil } -func getActionSpec(ctx context.Context, playbook *models.Playbook, name string) (*v1.PlaybookAction, error) { +func getActionSpec(playbook *models.Playbook, name string) (*v1.PlaybookAction, error) { var spec v1.PlaybookSpec if err := json.Unmarshal(playbook.Spec, &spec); err != nil { return nil, err @@ -326,7 +325,7 @@ func TemplateAndExecuteAction(ctx context.Context, spec v1.Playbook, playbook *m return oops.Wrap(ExecuteAndSaveAction(ctx, run.PlaybookID, action, step)) } -func filterAction(ctx context.Context, runID uuid.UUID, filter string) (bool, error) { +func filterAction(ctx context.Context, filter string) (bool, error) { if strings.TrimSpace(filter) == "" { return false, nil }