Skip to content

Commit

Permalink
[Disk Manager] Run system tasks in control plane only; fix lister met…
Browse files Browse the repository at this point in the history
…rics (#2496)

* [Disk Manager] add RunSystemTasks to tasks config, run system tasks in controlplane only, clean up lister metrics when collectListerMetricsTask finishes

* minor fix

* [Disk Manager] do not cleanup in collectListerMetricsTask; fixes

* [Disk Manager] fixes

* fixes
  • Loading branch information
gy2411 authored Nov 15, 2024
1 parent f82fa2c commit 3ca8322
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 64 deletions.
8 changes: 8 additions & 0 deletions cloud/tasks/config/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ message TasksConfig {
// Needed for tracing.
// Spans of tasks with greater generation id will not be sampled.
optional uint64 MaxSampledTaskGeneration = 32 [default = 100];

// System tasks are tasks implemented by `tasks` library, like,
// tasks.ClearEndedTasks, tasks.CollectListerMetricsTask. If this flag is
// true, then regular system tasks will be automatically registered for
// execution and scheduled.
// Warning: if this flag is false, then information about ended tasks
// will not be cleared from the database.
optional bool RegularSystemTasksEnabled = 33 [default = true];
}
12 changes: 12 additions & 0 deletions cloud/tasks/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ type Registry struct {
taskFactoriesMutex sync.RWMutex
}

func (r *Registry) TaskTypes() []string {
r.taskFactoriesMutex.RLock()
defer r.taskFactoriesMutex.RUnlock()

var taskTypes []string
for taskType := range r.taskFactories {
taskTypes = append(taskTypes, taskType)
}

return taskTypes
}

func (r *Registry) TaskTypesForExecution() []string {
r.taskFactoriesMutex.RLock()
defer r.taskFactoriesMutex.RUnlock()
Expand Down
154 changes: 90 additions & 64 deletions cloud/tasks/scheduler_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,72 +529,37 @@ func (s *scheduler) ScheduleBlankTask(ctx context.Context) (string, error) {

////////////////////////////////////////////////////////////////////////////////

func NewScheduler(
func (s *scheduler) registerAndScheduleRegularSystemTasks(
ctx context.Context,
registry *Registry,
storage tasks_storage.Storage,
config *tasks_config.TasksConfig,
metricsRegistry metrics.Registry,
) (Scheduler, error) {

pollForTaskUpdatesPeriod, err := time.ParseDuration(
config.GetPollForTaskUpdatesPeriod())
if err != nil {
return nil, err
}

taskWaitingTimeout, err := time.ParseDuration(config.GetTaskWaitingTimeout())
if err != nil {
return nil, err
}

scheduleRegularTasksPeriodMin, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMin())
if err != nil {
return nil, err
}

scheduleRegularTasksPeriodMax, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMax())
if err != nil {
return nil, err
}
) error {

endedTaskExpirationTimeout, err := time.ParseDuration(config.GetEndedTaskExpirationTimeout())
endedTaskExpirationTimeout, err := time.ParseDuration(
config.GetEndedTaskExpirationTimeout(),
)
if err != nil {
return nil, err
return err
}

clearEndedTasksTaskScheduleInterval, err := time.ParseDuration(
config.GetClearEndedTasksTaskScheduleInterval(),
)
if err != nil {
return nil, err
return err
}

s := &scheduler{
registry: registry,
storage: storage,
pollForTaskUpdatesPeriod: pollForTaskUpdatesPeriod,
taskWaitingTimeout: taskWaitingTimeout,
scheduleRegularTasksPeriodMin: scheduleRegularTasksPeriodMin,
scheduleRegularTasksPeriodMax: scheduleRegularTasksPeriodMax,
}

err = registry.RegisterForExecution("tasks.Blank", func() Task {
return &blankTask{}
})
if err != nil {
return nil, err
}

err = registry.RegisterForExecution("tasks.ClearEndedTasks", func() Task {
return &clearEndedTasksTask{
storage: storage,
expirationTimeout: endedTaskExpirationTimeout,
limit: int(config.GetClearEndedTasksLimit()),
}
})
err = s.registry.RegisterForExecution(
"tasks.ClearEndedTasks", func() Task {
return &clearEndedTasksTask{
storage: s.storage,
expirationTimeout: endedTaskExpirationTimeout,
limit: int(config.GetClearEndedTasksLimit()),
}
},
)
if err != nil {
return nil, err
return err
}

s.ScheduleRegularTasks(
Expand All @@ -610,31 +575,31 @@ func NewScheduler(
config.GetListerMetricsCollectionInterval(),
)
if err != nil {
return nil, err
return err
}

err = registry.RegisterForExecution(
collectListerMetricsTaskScheduleInterval, err := time.ParseDuration(
config.GetCollectListerMetricsTaskScheduleInterval(),
)
if err != nil {
return err
}

err = s.registry.RegisterForExecution(
"tasks.CollectListerMetrics", func() Task {
return &collectListerMetricsTask{
registry: metricsRegistry,
storage: storage,
storage: s.storage,
metricsCollectionInterval: listerMetricsCollectionInterval,

taskTypes: registry.TaskTypesForExecution(),
taskTypes: s.registry.TaskTypes(),
hangingTaskGaugesByID: make(map[string]metrics.Gauge),
maxHangingTaskIDsToReport: config.GetMaxHangingTaskIDsToReport(),
}
},
)
if err != nil {
return nil, err
}

collectListerMetricsTaskScheduleInterval, err := time.ParseDuration(
config.GetCollectListerMetricsTaskScheduleInterval(),
)
if err != nil {
return nil, err
return err
}

s.ScheduleRegularTasks(
Expand All @@ -646,5 +611,66 @@ func NewScheduler(
},
)

return nil
}

////////////////////////////////////////////////////////////////////////////////

func NewScheduler(
ctx context.Context,
registry *Registry,
storage tasks_storage.Storage,
config *tasks_config.TasksConfig,
metricsRegistry metrics.Registry,
) (Scheduler, error) {

pollForTaskUpdatesPeriod, err := time.ParseDuration(
config.GetPollForTaskUpdatesPeriod())
if err != nil {
return nil, err
}

taskWaitingTimeout, err := time.ParseDuration(config.GetTaskWaitingTimeout())
if err != nil {
return nil, err
}

scheduleRegularTasksPeriodMin, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMin())
if err != nil {
return nil, err
}

scheduleRegularTasksPeriodMax, err := time.ParseDuration(config.GetScheduleRegularTasksPeriodMax())
if err != nil {
return nil, err
}

s := &scheduler{
registry: registry,
storage: storage,
pollForTaskUpdatesPeriod: pollForTaskUpdatesPeriod,
taskWaitingTimeout: taskWaitingTimeout,
scheduleRegularTasksPeriodMin: scheduleRegularTasksPeriodMin,
scheduleRegularTasksPeriodMax: scheduleRegularTasksPeriodMax,
}

err = registry.RegisterForExecution("tasks.Blank", func() Task {
return &blankTask{}
})
if err != nil {
return nil, err
}

if config.GetRegularSystemTasksEnabled() {
err = s.registerAndScheduleRegularSystemTasks(
ctx,
config,
metricsRegistry,
)
if err != nil {
return nil, err
}
}

return s, nil
}
43 changes: 43 additions & 0 deletions cloud/tasks/tasks_tests/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ func registerLongTask(registry *tasks.Registry) error {
})
}

func registerLongTaskNotForExecution(registry *tasks.Registry) error {
return registry.Register("long", func() tasks.Task {
return &longTask{}
})
}

func scheduleLongTask(
ctx context.Context,
scheduler tasks.Scheduler,
Expand Down Expand Up @@ -1307,3 +1313,40 @@ func TestHangingTasksMetrics(t *testing.T) {
gaugeUnsetWg.Wait()
registry.AssertAllExpectations(t)
}

func TestHangingTasksMetricsAreSetEvenForTasksNotRegisteredForExecution(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
require.NoError(t, err)
defer db.Close(ctx)

registry := mocks.NewIgnoreUnknownCallsRegistryMock()

config := newHangingTaskTestConfig()

s := createServicesWithConfig(t, ctx, db, config, registry)
err = registerLongTaskNotForExecution(s.registry)
require.NoError(t, err)

err = s.startRunners(ctx)
require.NoError(t, err)

gaugeSetChannel := make(chan int)

registry.GetGauge(
"totalHangingTaskCount",
map[string]string{"type": "long"},
).On("Set", float64(0)).Return(mock.Anything).Run(
func(args mock.Arguments) {
select {
case gaugeSetChannel <- 0:
default:
}
},
)

_ = <-gaugeSetChannel
registry.AssertAllExpectations(t)
}

0 comments on commit 3ca8322

Please sign in to comment.