From 3ca832200c686b18d6e845be9b20d6e82b764a4a Mon Sep 17 00:00:00 2001 From: Grigoriy Yurgin Date: Fri, 15 Nov 2024 16:42:48 +0300 Subject: [PATCH] [Disk Manager] Run system tasks in control plane only; fix lister metrics (#2496) * [Disk Manager] add RunSystemTasks to tasks config, run system tasks in controlplane only, clean up lister metrics when collectListerMetricsTask finishes * minor fix * [Disk Manager] do not cleanup in collectListerMetricsTask; fixes * [Disk Manager] fixes * fixes --- cloud/tasks/config/config.proto | 8 ++ cloud/tasks/registry.go | 12 ++ cloud/tasks/scheduler_impl.go | 154 +++++++++++++++----------- cloud/tasks/tasks_tests/tasks_test.go | 43 +++++++ 4 files changed, 153 insertions(+), 64 deletions(-) diff --git a/cloud/tasks/config/config.proto b/cloud/tasks/config/config.proto index 719c90899aa..445066d41b4 100644 --- a/cloud/tasks/config/config.proto +++ b/cloud/tasks/config/config.proto @@ -56,4 +56,12 @@ message TasksConfig { // Needed for tracing. // Spans of tasks with greater generation id will not be sampled. optional uint64 MaxSampledTaskGeneration = 32 [default = 100]; + + // System tasks are tasks implemented by `tasks` library, like, + // tasks.ClearEndedTasks, tasks.CollectListerMetricsTask. If this flag is + // true, then regular system tasks will be automatically registered for + // execution and scheduled. + // Warning: if this flag is false, then information about ended tasks + // will not be cleared from the database. + optional bool RegularSystemTasksEnabled = 33 [default = true]; } diff --git a/cloud/tasks/registry.go b/cloud/tasks/registry.go index 1227b799678..ecb69b0625a 100644 --- a/cloud/tasks/registry.go +++ b/cloud/tasks/registry.go @@ -18,6 +18,18 @@ type Registry struct { taskFactoriesMutex sync.RWMutex } +func (r *Registry) TaskTypes() []string { + r.taskFactoriesMutex.RLock() + defer r.taskFactoriesMutex.RUnlock() + + var taskTypes []string + for taskType := range r.taskFactories { + taskTypes = append(taskTypes, taskType) + } + + return taskTypes +} + func (r *Registry) TaskTypesForExecution() []string { r.taskFactoriesMutex.RLock() defer r.taskFactoriesMutex.RUnlock() diff --git a/cloud/tasks/scheduler_impl.go b/cloud/tasks/scheduler_impl.go index 1bbb84bcf03..17205b63470 100644 --- a/cloud/tasks/scheduler_impl.go +++ b/cloud/tasks/scheduler_impl.go @@ -529,72 +529,37 @@ func (s *scheduler) ScheduleBlankTask(ctx context.Context) (string, error) { //////////////////////////////////////////////////////////////////////////////// -func NewScheduler( +func (s *scheduler) registerAndScheduleRegularSystemTasks( ctx context.Context, - registry *Registry, - storage tasks_storage.Storage, config *tasks_config.TasksConfig, metricsRegistry metrics.Registry, -) (Scheduler, error) { - - pollForTaskUpdatesPeriod, err := time.ParseDuration( - config.GetPollForTaskUpdatesPeriod()) - if err != nil { - return nil, err - } - - taskWaitingTimeout, err := time.ParseDuration(config.GetTaskWaitingTimeout()) - if err != nil { - return nil, err - } - - scheduleRegularTasksPeriodMin, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMin()) - if err != nil { - return nil, err - } - - scheduleRegularTasksPeriodMax, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMax()) - if err != nil { - return nil, err - } +) error { - endedTaskExpirationTimeout, err := time.ParseDuration(config.GetEndedTaskExpirationTimeout()) + endedTaskExpirationTimeout, err := time.ParseDuration( + config.GetEndedTaskExpirationTimeout(), + ) if err != nil { - return nil, err + return err } clearEndedTasksTaskScheduleInterval, err := time.ParseDuration( config.GetClearEndedTasksTaskScheduleInterval(), ) if err != nil { - return nil, err + return err } - s := &scheduler{ - registry: registry, - storage: storage, - pollForTaskUpdatesPeriod: pollForTaskUpdatesPeriod, - taskWaitingTimeout: taskWaitingTimeout, - scheduleRegularTasksPeriodMin: scheduleRegularTasksPeriodMin, - scheduleRegularTasksPeriodMax: scheduleRegularTasksPeriodMax, - } - - err = registry.RegisterForExecution("tasks.Blank", func() Task { - return &blankTask{} - }) - if err != nil { - return nil, err - } - - err = registry.RegisterForExecution("tasks.ClearEndedTasks", func() Task { - return &clearEndedTasksTask{ - storage: storage, - expirationTimeout: endedTaskExpirationTimeout, - limit: int(config.GetClearEndedTasksLimit()), - } - }) + err = s.registry.RegisterForExecution( + "tasks.ClearEndedTasks", func() Task { + return &clearEndedTasksTask{ + storage: s.storage, + expirationTimeout: endedTaskExpirationTimeout, + limit: int(config.GetClearEndedTasksLimit()), + } + }, + ) if err != nil { - return nil, err + return err } s.ScheduleRegularTasks( @@ -610,31 +575,31 @@ func NewScheduler( config.GetListerMetricsCollectionInterval(), ) if err != nil { - return nil, err + return err } - err = registry.RegisterForExecution( + collectListerMetricsTaskScheduleInterval, err := time.ParseDuration( + config.GetCollectListerMetricsTaskScheduleInterval(), + ) + if err != nil { + return err + } + + err = s.registry.RegisterForExecution( "tasks.CollectListerMetrics", func() Task { return &collectListerMetricsTask{ registry: metricsRegistry, - storage: storage, + storage: s.storage, metricsCollectionInterval: listerMetricsCollectionInterval, - taskTypes: registry.TaskTypesForExecution(), + taskTypes: s.registry.TaskTypes(), hangingTaskGaugesByID: make(map[string]metrics.Gauge), maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(), } }, ) if err != nil { - return nil, err - } - - collectListerMetricsTaskScheduleInterval, err := time.ParseDuration( - config.GetCollectListerMetricsTaskScheduleInterval(), - ) - if err != nil { - return nil, err + return err } s.ScheduleRegularTasks( @@ -646,5 +611,66 @@ func NewScheduler( }, ) + return nil +} + +//////////////////////////////////////////////////////////////////////////////// + +func NewScheduler( + ctx context.Context, + registry *Registry, + storage tasks_storage.Storage, + config *tasks_config.TasksConfig, + metricsRegistry metrics.Registry, +) (Scheduler, error) { + + pollForTaskUpdatesPeriod, err := time.ParseDuration( + config.GetPollForTaskUpdatesPeriod()) + if err != nil { + return nil, err + } + + taskWaitingTimeout, err := time.ParseDuration(config.GetTaskWaitingTimeout()) + if err != nil { + return nil, err + } + + scheduleRegularTasksPeriodMin, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMin()) + if err != nil { + return nil, err + } + + scheduleRegularTasksPeriodMax, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMax()) + if err != nil { + return nil, err + } + + s := &scheduler{ + registry: registry, + storage: storage, + pollForTaskUpdatesPeriod: pollForTaskUpdatesPeriod, + taskWaitingTimeout: taskWaitingTimeout, + scheduleRegularTasksPeriodMin: scheduleRegularTasksPeriodMin, + scheduleRegularTasksPeriodMax: scheduleRegularTasksPeriodMax, + } + + err = registry.RegisterForExecution("tasks.Blank", func() Task { + return &blankTask{} + }) + if err != nil { + return nil, err + } + + if config.GetRegularSystemTasksEnabled() { + err = s.registerAndScheduleRegularSystemTasks( + ctx, + config, + metricsRegistry, + ) + if err != nil { + return nil, err + } + } + return s, nil } diff --git a/cloud/tasks/tasks_tests/tasks_test.go b/cloud/tasks/tasks_tests/tasks_test.go index a384a992d5b..f721e91e5e8 100644 --- a/cloud/tasks/tasks_tests/tasks_test.go +++ b/cloud/tasks/tasks_tests/tasks_test.go @@ -312,6 +312,12 @@ func registerLongTask(registry *tasks.Registry) error { }) } +func registerLongTaskNotForExecution(registry *tasks.Registry) error { + return registry.Register("long", func() tasks.Task { + return &longTask{} + }) +} + func scheduleLongTask( ctx context.Context, scheduler tasks.Scheduler, @@ -1307,3 +1313,40 @@ func TestHangingTasksMetrics(t *testing.T) { gaugeUnsetWg.Wait() registry.AssertAllExpectations(t) } + +func TestHangingTasksMetricsAreSetEvenForTasksNotRegisteredForExecution(t *testing.T) { + ctx, cancel := context.WithCancel(newContext()) + defer cancel() + + db, err := newYDB(ctx) + require.NoError(t, err) + defer db.Close(ctx) + + registry := mocks.NewIgnoreUnknownCallsRegistryMock() + + config := newHangingTaskTestConfig() + + s := createServicesWithConfig(t, ctx, db, config, registry) + err = registerLongTaskNotForExecution(s.registry) + require.NoError(t, err) + + err = s.startRunners(ctx) + require.NoError(t, err) + + gaugeSetChannel := make(chan int) + + registry.GetGauge( + "totalHangingTaskCount", + map[string]string{"type": "long"}, + ).On("Set", float64(0)).Return(mock.Anything).Run( + func(args mock.Arguments) { + select { + case gaugeSetChannel <- 0: + default: + } + }, + ) + + _ = <-gaugeSetChannel + registry.AssertAllExpectations(t) +}