Skip to content

Commit

Permalink
feat(stat): add cardtime api (PaddlePaddle#1292)
Browse files Browse the repository at this point in the history
* cardtime api

* card time fix, fix GetCardTimeInfo

* card time fix for unit test

* card time fix, delete minDuration

* card time fix minduration

* card time fix unit test

* card time fix, add created_at
  • Loading branch information
hyx930 authored Jan 3, 2024
1 parent 75d125f commit 067e7f6
Show file tree
Hide file tree
Showing 8 changed files with 986 additions and 10 deletions.
6 changes: 5 additions & 1 deletion pkg/apiserver/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
"encoding/base64"
"fmt"
"math"
"math/rand"
"regexp"
"strings"
Expand Down Expand Up @@ -223,7 +224,6 @@ func CheckFsNested(path1, path2 string) bool {
if len(path1Arr) > len(path2Arr) {
minIndex = len(path2Arr)
}

for index := 0; index < minIndex; index++ {
if path1Arr[index] != path2Arr[index] {
return false
Expand Down Expand Up @@ -270,3 +270,7 @@ func CheckPermission(requestUserName, ownerUserName, resourceType, resourceID st
}
return nil
}

// Floor2decimal rounds num down (toward negative infinity) to two decimal
// places, e.g. 1.239 becomes 1.23 and -1.231 becomes -1.24.
func Floor2decimal(num float64) float64 {
	const scale = 100.0
	scaled := num * scale
	return math.Floor(scaled) / scale
}
278 changes: 271 additions & 7 deletions pkg/apiserver/controller/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,34 @@ limitations under the License.
package statistics

import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	prometheusModel "github.com/prometheus/common/model"

	"github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/common"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/consts"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/logger"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/resources"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
	"github.com/PaddlePaddle/PaddleFlow/pkg/model"
	"github.com/PaddlePaddle/PaddleFlow/pkg/monitor"
	"github.com/PaddlePaddle/PaddleFlow/pkg/storage"
)

// metricNameList is a fixed-size array of the metric name constants defined
// in the consts package: CPU, memory, disk, network and GPU metrics, going by
// the constant names.
var metricNameList = [...]string{
consts.MetricCpuUsageRate, consts.MetricMemoryUsageRate,
consts.MetricMemoryUsage, consts.MetricDiskUsage,
consts.MetricNetReceiveBytes, consts.MetricNetSendBytes,
consts.MetricDiskReadRate, consts.MetricDiskWriteRate,
consts.MetricGpuUtil, consts.MetricGpuMemoryUtil,
consts.MetricGpuMemoryUsage}
// cardNameList holds the resource names that count as accelerator cards.
// GetGpuCards passes it to GetFlavourCards, where each entry is compared
// against the END of a resource name (suffix match), so "_cgpu" matches any
// resource whose name ends in "_cgpu".
//
// metricNameList is a fixed-size array of the metric name constants defined
// in the consts package: CPU, memory, disk, network and GPU metrics, going by
// the constant names.
var (
cardNameList = []string{"nvidia.com/gpu", "_cgpu"}
metricNameList = [...]string{
consts.MetricCpuUsageRate, consts.MetricMemoryUsageRate,
consts.MetricMemoryUsage, consts.MetricDiskUsage,
consts.MetricNetReceiveBytes, consts.MetricNetSendBytes,
consts.MetricDiskReadRate, consts.MetricDiskWriteRate,
consts.MetricGpuUtil, consts.MetricGpuMemoryUtil,
consts.MetricGpuMemoryUsage}
)

type JobStatisticsResponse struct {
MetricsInfo map[string]string `json:"metricsInfo"`
Expand All @@ -58,6 +66,39 @@ type MetricInfo struct {
Values [][2]float64 `json:"values"`
}

// GetCardTimeBatchRequest is the JSON payload of a batch card-time query: a
// time window plus the queues to report on.
// NOTE(review): presumably StartTime/EndTime use the model.TimeFormat layout
// that GetCardTimeInfo parses its time strings with - confirm against the
// HTTP handler that decodes this struct.
type GetCardTimeBatchRequest struct {
// StartTime is the beginning of the query window, as a string.
StartTime string `json:"startTime"`
// EndTime is the end of the query window, as a string.
EndTime string `json:"endTime"`
// QueueNames lists the queues whose card time is requested.
QueueNames []string `json:"queueNames"`
}

// GetCardTimeResponse is the result returned by GetCardTimeInfo.
type GetCardTimeResponse struct {
// Data holds one entry per queue that was processed successfully; it is nil
// (JSON null) when no queue produced a result.
Data []*QueueCardTimeInfo `json:"data"`
}

// QueueCardTimeInfo summarizes the card time consumed in a single queue.
type QueueCardTimeInfo struct {
// QueueName is the name of the queue.
QueueName string `json:"queueName"`
// CardTime is the queue-wide total: the sum of every user's TotalCardTime as
// returned by GetCardTimeByQueueID.
CardTime float64 `json:"cardTime"`
// DeviceType is not populated by GetCardTimeInfo in this file.
// NOTE(review): confirm whether another caller fills it in.
DeviceType string `json:"deviceType"`
// Detail is the per-user breakdown of the queue's card time.
Detail []JobDetail `json:"detail"`
}

// JobCardTimeInfo describes the card time consumed by one job inside the
// queried window. Instances are built by FulfillDetailInfo.
type JobCardTimeInfo struct {
// JobID is the job's ID.
JobID string `json:"jobId"`
// CardTime is cards * seconds-in-window / 3600 (i.e. card-hours), floored
// to two decimal places.
CardTime float64 `json:"cardTime"`
// CreateTime is the job's CreatedAt formatted with model.TimeFormat.
CreateTime string `json:"createTime"`
// StartTime is the job's ActivatedAt formatted with model.TimeFormat.
StartTime string `json:"startTime"`
// FinishTime is the job's FinishedAt formatted with model.TimeFormat.
FinishTime string `json:"finishTime"`
// DeviceCount is the number of cards the job uses, as computed by GetGpuCards.
DeviceCount int `json:"deviceCount"`
}

// JobDetail groups the card-time records of a single user within one queue.
// Instances are built by GetCardTimeByQueueID.
type JobDetail struct {
// UserName is the owner of the jobs in JobInfoList.
UserName string `json:"userName"`
// JobInfoList holds one record per job of this user.
JobInfoList []JobCardTimeInfo `json:"jobInfoList"`
// JobCount is len(JobInfoList).
JobCount int `json:"jobCount"`
// TotalCardTime is the sum of CardTime over JobInfoList, floored to two
// decimal places.
TotalCardTime float64 `json:"totalCardTime"`
}

func GetJobStatistics(ctx *logger.RequestContext, jobID string) (*JobStatisticsResponse, error) {
response := &JobStatisticsResponse{
MetricsInfo: make(map[string]string),
Expand Down Expand Up @@ -221,3 +262,226 @@ func convertResultToResponse(response *JobStatisticsResponse, result float64, me
// checkJobPermission reports whether the requesting user may access job:
// either the requester is the root user, or the requester owns the job.
func checkJobPermission(ctx *logger.RequestContext, job *model.Job) bool {
	if common.IsRootUser(ctx.UserName) {
		return true
	}
	return ctx.UserName == job.UserName
}

// GetCardTimeInfo collects card-time usage for each of the given queues
// within the window [startTimeStr, endTimeStr].
//
// Parameters:
//
//	ctx:          request context; its ErrorMessage is set to the most recent
//	              per-queue failure (later failures overwrite earlier ones)
//	queueNames:   names of the queues to report on
//	startTimeStr: window start, in model.TimeFormat
//	endTimeStr:   window end, in model.TimeFormat
//
// Returns:
//
//	*GetCardTimeResponse: one QueueCardTimeInfo per queue that was processed
//	                      successfully, in the order of queueNames
//	error:                non-nil only when the time window is invalid
//
// Per-queue failures (unknown queue, card-time query error) are best-effort:
// the queue is logged and skipped, and the call still succeeds.
func GetCardTimeInfo(ctx *logger.RequestContext, queueNames []string, startTimeStr string, endTimeStr string) (*GetCardTimeResponse, error) {
	startTime, err := time.Parse(model.TimeFormat, startTimeStr)
	if err != nil {
		ctx.Logging().Errorf("[GetCardTimeInfo] startTime parse failed, err:%s", err.Error())
		return nil, err
	}
	endTime, err := time.Parse(model.TimeFormat, endTimeStr)
	if err != nil {
		ctx.Logging().Errorf("[GetCardTimeInfo] endTime parse failed, err:%s", err.Error())
		return nil, err
	}
	if startTime.After(endTime) {
		return nil, errors.New("[GetCardTimeInfo] startTime must be before than endTime")
	}

	// Minimum job run time. It is currently zero, so together with the
	// start/end ordering check above the guard below never fires; it is kept
	// so that a non-zero minimum can be introduced without touching the flow.
	minDuration := time.Second * 0
	logger.Logger().Infof("[GetCardTimeInfo] queueNames:%s, startDate:%v, endDate:%v", queueNames, startTime, endTime)
	// Reject a window that is shorter than the minimum job run time.
	if endTime.Sub(startTime) < minDuration {
		return nil, fmt.Errorf("time period less than minDuration")
	}

	var cardTimeInfoList []*QueueCardTimeInfo
	for _, queueName := range queueNames {
		queue, err := storage.Queue.GetQueueByName(queueName)
		if err != nil {
			ctx.ErrorMessage = err.Error()
			// Include the cause: without it a skipped queue cannot be diagnosed.
			ctx.Logging().Errorf("get queue by name failed. queueName:[%s], err:%s", queueName, err.Error())
			continue
		}
		logger.Logger().Infof("[GetCardTimeInfo] queueName:%s, queueID:%s", queueName, queue.ID)

		detailInfo, cardTime, err := GetCardTimeByQueueID(startTime, endTime, queue.ID, minDuration)
		if err != nil {
			ctx.ErrorMessage = err.Error()
			ctx.Logging().Errorf("get cardTime failed. queueName:[%s], err:%s", queue.Name, err.Error())
			continue
		}
		cardTimeInfoList = append(cardTimeInfoList, &QueueCardTimeInfo{
			QueueName: queue.Name,
			CardTime:  cardTime,
			Detail:    detailInfo,
		})
	}
	return &GetCardTimeResponse{Data: cardTimeInfoList}, nil
}

// GetCardTimeByQueueID computes the card time consumed by the jobs of one
// queue inside the window [startDate, endDate].
//
// Parameters:
//
//	startDate:   start of the window
//	endDate:     inclusive end of the window (e.g. xx:59:59); one second is
//	             added internally before querying and computing durations
//	queueID:     ID of the queue
//	minDuration: jobs whose in-window duration is shorter are ignored
//	             (forwarded to FulfillDetailInfo)
//
// Returns:
//
//	[]JobDetail: per-user breakdown, sorted by user name; nil when no job matched
//	float64:     sum of all users' TotalCardTime
//	error:       non-nil when listing the job statistics fails
//
// Both the case-type map returned by the storage layer and the per-user map
// are traversed in sorted key order, so the result (slice order and the
// floating-point sum) is identical across calls for identical data.
func GetCardTimeByQueueID(startDate time.Time, endDate time.Time,
	queueID string, minDuration time.Duration) ([]JobDetail, float64, error) {
	// The caller's endDate is an inclusive bound such as 20xx-xx-xx xx:59:59;
	// add one second to turn it into the exclusive end of the window.
	endDate = endDate.Add(time.Second)
	logger.Logger().Infof("[GetCardTimeFromQueueID] startDate:%s, endDate:%s,queueID: %s", startDate, endDate, queueID)

	// TODO: improve the query strategy for large data volumes; only the first
	// `limit` rows are requested and no further pages are fetched.
	limit, offset := 5000, 0
	jobStats, err := storage.Job.ListJobStat(startDate, endDate, queueID, limit, offset)
	if err != nil {
		logger.Logger().Errorf("[GetCardTimeFromQueueID] list job status failed, error: %s", err.Error())
		return nil, 0, err
	}
	logger.Logger().Debugf("[GetCardTimeFromQueueID] job stats: %v", jobStats)

	// Visit case types in sorted order so that every user's job list is built
	// in a stable order.
	caseTypes := make([]string, 0, len(jobStats))
	for caseType := range jobStats {
		caseTypes = append(caseTypes, caseType)
	}
	sort.Strings(caseTypes)

	// detailInfoMap maps userName -> card-time records of that user's jobs.
	detailInfoMap := make(map[string][]JobCardTimeInfo)
	for _, caseType := range caseTypes {
		updated, err := FulfillDetailInfo(startDate, endDate, detailInfoMap, jobStats[caseType], caseType, minDuration)
		if err != nil {
			// Keep what has been accumulated so far; never replace the map
			// with the result of a failed call.
			logger.Logger().Errorf("processCardTimeCase error: %s", err.Error())
			continue
		}
		detailInfoMap = updated
	}
	logger.Logger().Infof("[GetCardTimeFromQueueID] detail info map: %v", detailInfoMap)

	userNames := make([]string, 0, len(detailInfoMap))
	for userName := range detailInfoMap {
		userNames = append(userNames, userName)
	}
	sort.Strings(userNames)

	// Compute each user's card time and the total for the whole queue.
	cardTimeForGroup := float64(0)
	var detailInfo []JobDetail
	for _, userName := range userNames {
		jobInfoList := detailInfoMap[userName]
		var totalCardTime float64
		for _, jobInfo := range jobInfoList {
			totalCardTime += jobInfo.CardTime
		}
		totalCardTime = common.Floor2decimal(totalCardTime)
		detailInfo = append(detailInfo, JobDetail{
			UserName:      userName,
			JobInfoList:   jobInfoList,
			JobCount:      len(jobInfoList),
			TotalCardTime: totalCardTime,
		})
		cardTimeForGroup += totalCardTime
	}
	logger.Logger().Infof("[GetCardTimeFromQueueID] detail info: %v", detailInfo)
	return detailInfo, cardTimeForGroup, nil
}

// containsStr reports whether target ENDS WITH any of the strings in
// strSlice. Despite its name this is a suffix match, not a substring match.
//
// target:   the string to examine
// strSlice: candidate suffixes
//
// It returns false when strSlice is empty.
func containsStr(target string, strSlice []string) bool {
	matched := false
	for idx := 0; idx < len(strSlice) && !matched; idx++ {
		matched = strings.HasSuffix(target, strSlice[idx])
	}
	return matched
}

// GetFlavourCards returns the card count requested by flavour.
//
// The flavour is converted into a resource set, and the value of the first
// scalar resource encountered whose name ends with one of deviceCardTypes
// and whose quantity is positive is returned. It returns 0 when the flavour
// cannot be converted (the failure is logged) or when no resource matches.
func GetFlavourCards(flavour schema.Flavour, deviceCardTypes []string) int {
	flavourResource, err := resources.NewResourceFromMap(flavour.ToMap())
	if err != nil {
		logger.Logger().Errorf("[GetFlavourCards] NewResourceFromMap failed, error: %s", err.Error())
		return 0
	}

	scalars := flavourResource.ScalarResources("")
	for name, quantity := range scalars {
		if !containsStr(name, deviceCardTypes) {
			continue
		}
		if quantity > 0 {
			return int(quantity)
		}
	}
	return 0
}

// GetGpuCards returns the total number of cards used by a job: for every
// member of the job, the card count of the member's flavour (matched against
// cardNameList) multiplied by the member's replica count, summed over all
// members.
//
// jobStatus: the job to inspect
func GetGpuCards(jobStatus *model.Job) int {
	total := 0
	for i := range jobStatus.Members {
		member := jobStatus.Members[i]
		cardsPerReplica := GetFlavourCards(member.Flavour, cardNameList)
		total += cardsPerReplica * member.Replicas
	}
	return total
}

// FulfillDetailInfo appends one JobCardTimeInfo per job in jobStatusCase to
// detailInfo, keyed by the job's user name.
//
// Parameters:
//
//	startTime:     start of the query window
//	endTime:       end of the query window
//	detailInfo:    accumulator, userName -> job records; a nil map is
//	               replaced by a fresh one
//	jobStatusCase: jobs that all belong to the same case type
//	caseType:      how the jobs overlap the window ("case1".."case4", see below)
//	minDuration:   jobs whose in-window duration is shorter are skipped
//
// Returns:
//
//	map[string][]JobCardTimeInfo: the updated accumulator
//	error:                        always nil at present
//
// A job's card time is cards * in-window seconds / 3600 (card-hours), floored
// to two decimal places.
func FulfillDetailInfo(startTime time.Time, endTime time.Time, detailInfo map[string][]JobCardTimeInfo,
	jobStatusCase []*model.Job, caseType string, minDuration time.Duration) (map[string][]JobCardTimeInfo, error) {
	logger.Logger().Infof("[FulfillDetailInfo] case type:%s, job status cases:%v", caseType, jobStatusCase)

	// Writing to a nil map panics; start a fresh accumulator instead.
	if detailInfo == nil {
		detailInfo = make(map[string][]JobCardTimeInfo)
	}

	// durationInWindow returns how long the job ran inside [startTime, endTime]
	// for the current case type. An unrecognized case type yields zero.
	durationInWindow := func(job *model.Job) time.Duration {
		switch caseType {
		case "case1":
			// Job started before startTime and finished after endTime, or is
			// still running: it covers the whole window.
			return endTime.Sub(startTime)
		case "case2":
			// Job started before startTime and finished at or before endTime.
			return job.FinishedAt.Time.Sub(startTime)
		case "case3":
			// Job started at or after startTime and finished at or before endTime.
			return job.FinishedAt.Time.Sub(job.ActivatedAt.Time)
		case "case4":
			// Job started at or after startTime and finished after endTime, or
			// is still running.
			return endTime.Sub(job.ActivatedAt.Time)
		default:
			return 0
		}
	}

	for _, job := range jobStatusCase {
		gpuCards := GetGpuCards(job)
		logger.Logger().Infof("[FulfillDetailInfo] jobID: %s,gpu cards:%v", job.ID, gpuCards)

		jobDuration := durationInWindow(job)
		if jobDuration < minDuration {
			continue
		}

		cardSeconds := float64(gpuCards) * jobDuration.Seconds()
		cardTime := common.Floor2decimal(cardSeconds / 3600)

		// append on a missing key operates on a nil slice, so no existence
		// check is needed before adding the record.
		detailInfo[job.UserName] = append(detailInfo[job.UserName], JobCardTimeInfo{
			JobID:       job.ID,
			CardTime:    cardTime,
			CreateTime:  job.CreatedAt.Format(model.TimeFormat),
			StartTime:   job.ActivatedAt.Time.Format(model.TimeFormat),
			FinishTime:  job.FinishedAt.Time.Format(model.TimeFormat),
			DeviceCount: gpuCards,
		})
	}
	return detailInfo, nil
}
Loading

0 comments on commit 067e7f6

Please sign in to comment.