diff --git a/internal/controller/monitor/monitor.go b/internal/controller/monitor/monitor.go
index 3190c4b3..a748b2c1 100644
--- a/internal/controller/monitor/monitor.go
+++ b/internal/controller/monitor/monitor.go
@@ -167,57 +167,56 @@ func (m *Monitor) Start(ctx context.Context, handler JobHandler) <-chan error {
 				return
 			}
 
-			jobs := resp.CommandJobs()
-
-			// Wrapped so staleCancel is called before the next loop iteration.
-			func() {
-				// A sneaky way to create a channel that is closed after a duration.
-				// Why not pass directly to handler.Create? Because that might
-				// interrupt scheduling a pod, when all we want is to bound the
-				// time spent waiting for the limiter.
-				staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.PollInterval)
-				defer staleCancel()
-
-				// TODO: sort by ScheduledAt in the API
-				sort.Slice(jobs, func(i, j int) bool {
-					return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
-				})
-
-				for _, j := range jobs {
-					if staleCtx.Err() != nil {
-						// Results already stale; try again later.
-						return
-					}
-
-					jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
-
-					// The api returns jobs that match ANY agent tags (the agent query rules)
-					// However, we can only acquire jobs that match ALL agent tags
-					if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
-						logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
-						continue
-					}
-
-					job := Job{
-						CommandJob: &j.CommandJob,
-						StaleCh:    staleCtx.Done(),
-					}
-
-					logger.Debug("creating job", zap.String("uuid", j.Uuid))
-					if err := handler.Create(ctx, job); err != nil {
-						if ctx.Err() != nil {
-							return
-						}
-						logger.Error("failed to create job", zap.Error(err))
-					}
-				}
-			}()
+			m.createJobs(ctx, logger, handler, agentTags, resp.CommandJobs())
 		}
 	}()
 
 	return errs
 }
 
+func (m *Monitor) createJobs(ctx context.Context, logger *zap.Logger, handler JobHandler, agentTags map[string]string, jobs []*api.JobJobTypeCommand) {
+	// A sneaky way to create a channel that is closed after a duration.
+	// Why not pass directly to handler.Create? Because that might
+	// interrupt scheduling a pod, when all we want is to bound the
+	// time spent waiting for the limiter.
+	staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.PollInterval)
+	defer staleCancel()
+
+	// TODO: sort by ScheduledAt in the API
+	sort.Slice(jobs, func(i, j int) bool {
+		return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
+	})
+
+	for _, j := range jobs {
+		if staleCtx.Err() != nil {
+			// Results already stale; try again later.
+			return
+		}
+
+		jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
+
+		// The api returns jobs that match ANY agent tags (the agent query rules)
+		// However, we can only acquire jobs that match ALL agent tags
+		if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
+			logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
+			continue
+		}
+
+		job := Job{
+			CommandJob: &j.CommandJob,
+			StaleCh:    staleCtx.Done(),
+		}
+
+		logger.Debug("creating job", zap.String("uuid", j.Uuid))
+		if err := handler.Create(ctx, job); err != nil {
+			if ctx.Err() != nil {
+				return
+			}
+			logger.Error("failed to create job", zap.Error(err))
+		}
+	}
+}
+
 func encodeClusterGraphQLID(clusterUUID string) string {
 	return base64.StdEncoding.EncodeToString([]byte("Cluster---" + clusterUUID))
 }