Skip to content

Commit

Permalink
[Disk Manager] Speeding up takeBaseDisksToSchedule (#2221)
Browse files Browse the repository at this point in the history
* fix sql query for getPoolConfigs

* add getPoolConfigsForActiveImages

* add more duration buckets

* update parallelism config

* rename method, edit config and make exponentioal duration buckets

* rename method
  • Loading branch information
Rattysed authored Oct 7, 2024
1 parent cf3b141 commit 4a90895
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ message PoolsConfig {
// Maximum number of base disks simultaneously creating from one image.
optional uint32 MaxBaseDisksInflight = 2 [default = 5];
optional uint32 MaxBaseDiskUnits = 3 [default = 640];
optional uint32 TakeBaseDisksToScheduleParallelism = 4 [default = 20];
optional uint32 TakeBaseDisksToScheduleParallelism = 4 [default = 100];
optional string ScheduleBaseDisksTaskScheduleInterval = 5 [default = "1m"];
optional string DeleteBaseDisksTaskScheduleInterval = 6 [default = "1m"];
optional string StorageFolder = 7 [default = "pools"];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func (s *callStats) onError() {
////////////////////////////////////////////////////////////////////////////////

func callDurationBuckets() common_metrics.DurationBuckets {
return common_metrics.NewDurationBuckets(
10*time.Millisecond, 20*time.Millisecond, 50*time.Millisecond,
100*time.Millisecond, 200*time.Millisecond, 500*time.Millisecond,
1*time.Second, 2*time.Second, 5*time.Second,
return common_metrics.NewExponentialDurationBuckets(
10*time.Millisecond, 2, 14,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,44 @@ func (s *storageYDB) getPoolConfigs(
return configs, nil
}

func (s *storageYDB) getPoolConfigsWithNonZeroCapacity(
ctx context.Context,
session *persistence.Session,
) (configs []poolConfig, err error) {

res, err := session.StreamExecuteRO(ctx, fmt.Sprintf(`
--!syntax_v1
pragma TablePathPrefix = "%v";
select *
from configs
where capacity > 0
`, s.tablesPath,
))
if err != nil {
return nil, err
}
defer res.Close()

for res.NextResultSet(ctx) {
for res.NextRow() {
config, err := scanPoolConfig(res)
if err != nil {
return nil, err
}
configs = append(configs, config)
}
}

// NOTE: always check stream query result after iteration.
err = res.Err()
if err != nil {
return nil, errors.NewRetriableError(err)
}

return configs, nil
}

func (s *storageYDB) getBaseDisksScheduling(
ctx context.Context,
session *persistence.Session,
Expand Down Expand Up @@ -2181,7 +2219,7 @@ func (s *storageYDB) takeBaseDisksToSchedule(

defer s.metrics.StatCall("takeBaseDisksToSchedule")(&err)

configs, err := s.getPoolConfigs(ctx, session)
configs, err := s.getPoolConfigsWithNonZeroCapacity(ctx, session)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4a90895

Please sign in to comment.