From 067e7f68df7c6ede6ec5abac05319cdd2cd628f7 Mon Sep 17 00:00:00 2001 From: hyx930 <55907933+hyx930@users.noreply.github.com> Date: Wed, 3 Jan 2024 17:32:46 +0800 Subject: [PATCH] feat(stat): add cardtime api (#1292) * cardtime api * card time fix,fix GetCardTimeInfo * card time fix for unit test * card time fix , delete minduration * card time fix minduration * card time fix unit test * card time fix, add created_at --- pkg/apiserver/common/utils.go | 6 +- .../controller/statistics/statistics.go | 278 ++++++++++++++- .../controller/statistics/statistics_test.go | 322 ++++++++++++++++++ pkg/apiserver/router/util/util.go | 1 + pkg/apiserver/router/v1/statistics.go | 34 ++ pkg/apiserver/router/v1/statistics_test.go | 295 ++++++++++++++++ pkg/storage/interface.go | 7 +- pkg/storage/job.go | 53 +++ 8 files changed, 986 insertions(+), 10 deletions(-) create mode 100644 pkg/apiserver/controller/statistics/statistics_test.go create mode 100644 pkg/apiserver/router/v1/statistics_test.go diff --git a/pkg/apiserver/common/utils.go b/pkg/apiserver/common/utils.go index affedccc0..b9618ff02 100644 --- a/pkg/apiserver/common/utils.go +++ b/pkg/apiserver/common/utils.go @@ -19,6 +19,7 @@ package common import ( "encoding/base64" "fmt" + "math" "math/rand" "regexp" "strings" @@ -223,7 +224,6 @@ func CheckFsNested(path1, path2 string) bool { if len(path1Arr) > len(path2Arr) { minIndex = len(path2Arr) } - for index := 0; index < minIndex; index++ { if path1Arr[index] != path2Arr[index] { return false @@ -270,3 +270,7 @@ func CheckPermission(requestUserName, ownerUserName, resourceType, resourceID st } return nil } + +func Floor2decimal(num float64) float64 { + return math.Floor(num*100) / 100.0 +} diff --git a/pkg/apiserver/controller/statistics/statistics.go b/pkg/apiserver/controller/statistics/statistics.go index e624382ae..cf6c15097 100644 --- a/pkg/apiserver/controller/statistics/statistics.go +++ b/pkg/apiserver/controller/statistics/statistics.go @@ -17,26 +17,34 @@ limitations under the License. 
package statistics

import (
+	"errors"
	"fmt"
+	"strings"
+	"time"

	prometheusModel "github.com/prometheus/common/model"

	"github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/common"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/consts"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/logger"
+	"github.com/PaddlePaddle/PaddleFlow/pkg/common/resources"
	"github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
	"github.com/PaddlePaddle/PaddleFlow/pkg/model"
	"github.com/PaddlePaddle/PaddleFlow/pkg/monitor"
	"github.com/PaddlePaddle/PaddleFlow/pkg/storage"
)

-var metricNameList = [...]string{
-	consts.MetricCpuUsageRate, consts.MetricMemoryUsageRate,
-	consts.MetricMemoryUsage, consts.MetricDiskUsage,
-	consts.MetricNetReceiveBytes, consts.MetricNetSendBytes,
-	consts.MetricDiskReadRate, consts.MetricDiskWriteRate,
-	consts.MetricGpuUtil, consts.MetricGpuMemoryUtil,
-	consts.MetricGpuMemoryUsage}
+// cardNameList holds all device resource types that are counted as cards
+var (
+	cardNameList   = []string{"nvidia.com/gpu", "_cgpu"}
+	metricNameList = [...]string{
+		consts.MetricCpuUsageRate, consts.MetricMemoryUsageRate,
+		consts.MetricMemoryUsage, consts.MetricDiskUsage,
+		consts.MetricNetReceiveBytes, consts.MetricNetSendBytes,
+		consts.MetricDiskReadRate, consts.MetricDiskWriteRate,
+		consts.MetricGpuUtil, consts.MetricGpuMemoryUtil,
+		consts.MetricGpuMemoryUsage}
+)

type JobStatisticsResponse struct {
	MetricsInfo map[string]string `json:"metricsInfo"`
@@ -58,6 +66,39 @@ type MetricInfo struct {
	Values [][2]float64 `json:"values"`
}

+type GetCardTimeBatchRequest struct {
+	StartTime  string   `json:"startTime"`
+	EndTime    string   `json:"endTime"`
+	QueueNames []string `json:"queueNames"`
+}
+
+type GetCardTimeResponse struct {
+	Data []*QueueCardTimeInfo `json:"data"`
+}
+
+type QueueCardTimeInfo struct {
+	QueueName  string      `json:"queueName"`
+	CardTime   float64     `json:"cardTime"`
+	DeviceType string      `json:"deviceType"`
+	Detail     []JobDetail `json:"detail"`
+}
+
+type JobCardTimeInfo struct {
+	JobID       string  `json:"jobId"`
+	CardTime    float64 `json:"cardTime"`
+	CreateTime  string  `json:"createTime"`
+	StartTime   string  `json:"startTime"`
+	FinishTime  string  `json:"finishTime"`
+	DeviceCount int     `json:"deviceCount"`
+}
+
+type JobDetail struct {
+	UserName      string            `json:"userName"`
+	JobInfoList   []JobCardTimeInfo `json:"jobInfoList"`
+	JobCount      int               `json:"jobCount"`
+	TotalCardTime float64           `json:"totalCardTime"`
+}
+
func GetJobStatistics(ctx *logger.RequestContext, jobID string) (*JobStatisticsResponse, error) {
	response := &JobStatisticsResponse{
		MetricsInfo: make(map[string]string),
@@ -221,3 +262,226 @@ func convertResultToResponse(response *JobStatisticsResponse, result float64, me
func checkJobPermission(ctx *logger.RequestContext, job *model.Job) bool {
	return common.IsRootUser(ctx.UserName) || ctx.UserName == job.UserName
}
+
+// GetCardTimeInfo returns card-time statistics for the given queues over the requested time range.
+// ctx: request context
+// queueNames: list of queue names
+// startTimeStr: start time string
+// endTimeStr: end time string
+// Returns:
+// *GetCardTimeResponse: card-time info per queue
+// error: error info
+func GetCardTimeInfo(ctx *logger.RequestContext, queueNames []string, startTimeStr string, endTimeStr string) (*GetCardTimeResponse, error) {
+	var cardTimeInfoList []*QueueCardTimeInfo
+	startTime, err := time.Parse(model.TimeFormat, startTimeStr)
+	if err != nil {
+		ctx.Logging().Errorf("[GetCardTimeInfo] startTime parse failed, err:%s", err.Error())
+		return nil, err
+	}
+	endTime, err := time.Parse(model.TimeFormat, endTimeStr)
+	if err != nil {
+		ctx.Logging().Errorf("[GetCardTimeInfo] endTime parse failed, err:%s", err.Error())
+		return nil, err
+	}
+	if startTime.After(endTime) {
+		return nil, errors.New("[GetCardTimeInfo] startTime must be before endTime")
+	}
+	minDuration := time.Second * 0
+	// return an error if the requested time range is shorter than the minimum job duration
+	logger.Logger().Infof("[GetCardTimeInfo] queueNames:%s, startDate:%v, endDate:%v", queueNames, startTime, endTime)
+	if endTime.Sub(startTime) < minDuration {
+		return nil, fmt.Errorf("time period less than minDuration")
+	}
+	for _, queueName := range queueNames {
+		queue, err := storage.Queue.GetQueueByName(queueName)
+		if err != nil {
+			ctx.ErrorMessage = err.Error()
+			ctx.Logging().Errorf("get queue by name failed. queueName:[%s]", queueName)
+			continue
+		}
+		logger.Logger().Infof("[GetCardTimeInfo] queueName:%s, queueID:%s", queueName, queue.ID)
+		detailInfo, cardTime, err := GetCardTimeByQueueID(startTime, endTime, queue.ID, minDuration)
+		if err != nil {
+			ctx.ErrorMessage = err.Error()
+			ctx.Logging().Errorf("get cardTime failed. queueName:[%s]", queue.Name)
+			continue
+		}
+		cardTimeInfo := &QueueCardTimeInfo{
+			QueueName: queue.Name,
+			CardTime:  cardTime,
+			Detail:    detailInfo,
+		}
+		cardTimeInfoList = append(cardTimeInfoList, cardTimeInfo)
+	}
+	return &GetCardTimeResponse{cardTimeInfoList}, nil
+}
+
+// GetCardTimeByQueueID returns the card time of jobs in the given queue.
+//
+// Parameters:
+// startDate: start time, in 20xx-xx-xx format
+// endDate: end time, in 20xx-xx-xx format
+// queueID: queue ID
+// minDuration: minimum job run time
+//
+// Returns:
+// []JobDetail: per-user job details
+// float64: total card time of the queue
+// error: error info, nil on success
+func GetCardTimeByQueueID(startDate time.Time, endDate time.Time,
+	queueID string, minDuration time.Duration) ([]JobDetail, float64, error) {
+	// endDate is originally of the form 20xx-xx-xx xx:59:59, so add one second
+	endDate = endDate.Add(time.Second)
+	logger.Logger().Infof("[GetCardTimeByQueueID] startDate:%s, endDate:%s, queueID: %s", startDate, endDate, queueID)
+	// detailInfoMap is keyed by userName; the value is that user's []JobCardTimeInfo
+	// TODO: optimize the query strategy for large result sets
+	limit, offset := 5000, 0
+	jobStats, err := storage.Job.ListJobStat(startDate, endDate, queueID, limit, offset)
+	logger.Logger().Debugf("[GetCardTimeByQueueID] job stats: %v", jobStats)
+	if err != nil {
+		logger.Logger().Errorf("[GetCardTimeByQueueID] list job status failed, error: %s", err.Error())
+		return nil, 0, err
+	}
+	detailInfoMap := make(map[string][]JobCardTimeInfo)
+	for caseType, jobStat := range jobStats {
+		detailInfoMap, err = FulfillDetailInfo(startDate, endDate, detailInfoMap, jobStat, caseType, minDuration)
+		if err != nil {
+			logger.Logger().Errorf("FulfillDetailInfo error: %s", err.Error())
+		}
+	}
+	logger.Logger().Infof("[GetCardTimeByQueueID] detail info map: %v", detailInfoMap)
+	cardTimeForGroup := float64(0)
+	var detailInfo []JobDetail
+	// walk detailInfoMap to compute each user's card time and the queue's total card time
+	for userName, jobInfoList := range detailInfoMap {
+		var totalCardTime float64 = 0
+		for _, jobStatusData := range jobInfoList {
+			totalCardTime += jobStatusData.CardTime
+		}
+		totalCardTime = common.Floor2decimal(totalCardTime)
+		detailInfo = append(detailInfo, JobDetail{
+			UserName:      userName,
+			JobInfoList:   jobInfoList,
+			JobCount:      len(jobInfoList),
+			TotalCardTime: totalCardTime,
+		})
+		cardTimeForGroup += totalCardTime
+	}
+	logger.Logger().Infof("[GetCardTimeByQueueID] detail info: %v", detailInfo)
+	return detailInfo, cardTimeForGroup, nil
+}
+
+// containsStr reports whether target ends with any of the strings in strSlice.
+// target: the string to check
+// strSlice: candidate suffixes
+// Returns true if target has any of the suffixes, false otherwise.
+func containsStr(target string, strSlice []string) bool {
+	for _, str := range strSlice {
+		if strings.HasSuffix(target, str) {
+			return true
+		}
+	}
+	return false
+}
+
+// GetFlavourCards returns the number of cards in the given flavour whose resource name matches one of deviceCardTypes.
+func GetFlavourCards(flavour schema.Flavour, deviceCardTypes []string) int {
+	res, err := resources.NewResourceFromMap(flavour.ToMap())
+	if err != nil {
+		logger.Logger().Errorf("[GetFlavourCards] NewResourceFromMap failed, error: %s", err.Error())
+		return 0
+	}
+	for rName, rValue := range res.ScalarResources("") {
+		if containsStr(rName, deviceCardTypes) && rValue > 0 {
+			return int(rValue)
+		}
+	}
+	return 0
+}
+
+// GetGpuCards returns the number of GPU cards requested by the given job,
+// summed over all members and weighted by each member's replica count.
+func GetGpuCards(jobStatus *model.Job) int {
+	members := jobStatus.Members
+	var gpuCards int = 0
+	for _, member := range members {
+		gpuCards += GetFlavourCards(member.Flavour, cardNameList) * member.Replicas
+	}
+	return gpuCards
+}
+
+// FulfillDetailInfo fills per-user job detail info for one overlap case.
+//
+// Parameters:
+// startTime: start of the query window
+// endTime: end of the query window
+// detailInfo: detail info accumulated so far
+// jobStatusCase: jobs belonging to this case
+// caseType: which overlap case the jobs belong to
+// minDuration: minimum job duration
+//
+// Returns:
+// map[string][]JobCardTimeInfo: the updated detail info
+// error: error info
+func FulfillDetailInfo(startTime time.Time, endTime time.Time, detailInfo map[string][]JobCardTimeInfo,
+	jobStatusCase []*model.Job, caseType string, minDuration time.Duration) (map[string][]JobCardTimeInfo, error) {
+	logger.Logger().Infof("[FulfillDetailInfo] case type:%s, job status cases:%v", caseType, jobStatusCase)
+	var cardTimeCalculation = func(jobStatus *model.Job, startDate, endDate time.Time, gpuCards int) (time.Duration, float64) {
+		var cardTime float64 = 0
+		var jobDuration time.Duration
+		switch caseType {
+		case "case1":
+			// the job started before start_date and finished after end_date, or is still running
+			jobDuration = endDate.Sub(startDate)
+			cardTime = float64(gpuCards) * endDate.Sub(startDate).Seconds()
+		case "case2":
+			// the job started before start_date and finished at or before end_date
+			jobDuration = jobStatus.FinishedAt.Time.Sub(startDate)
+			cardTime = jobDuration.Seconds() * float64(gpuCards)
+		case "case3":
+			// the job started at or after start_date and finished at or before end_date
+			jobDuration = jobStatus.FinishedAt.Time.Sub((*jobStatus).ActivatedAt.Time)
+			cardTime = jobDuration.Seconds() * float64(gpuCards)
+		case "case4":
+			// the job started at or after start_date and finished after end_date, or is still running
+			jobDuration = endDate.Sub((*jobStatus).ActivatedAt.Time)
+			cardTime = jobDuration.Seconds() * float64(gpuCards)
+		}
+		return jobDuration, cardTime
+	}
+
+	for _, jobStatus := range jobStatusCase {
+		gpuCards := GetGpuCards(jobStatus)
+		logger.Logger().Infof("[FulfillDetailInfo] jobID: %s, gpu cards:%v", jobStatus.ID, gpuCards)
+		jobDuration, cardTime := cardTimeCalculation(jobStatus, startTime, endTime, gpuCards)
+		if jobDuration < minDuration {
+			continue
+		}
+		cardTime = cardTime / 3600
+		cardTime = common.Floor2decimal(cardTime)
+		_, ok := detailInfo[jobStatus.UserName]
+		if ok {
+			detailInfo[jobStatus.UserName] = append(detailInfo[jobStatus.UserName], JobCardTimeInfo{
+				JobID:       jobStatus.ID,
+				CardTime:    cardTime,
+				CreateTime:  jobStatus.CreatedAt.Format(model.TimeFormat),
+				StartTime:   jobStatus.ActivatedAt.Time.Format(model.TimeFormat),
+				FinishTime:  jobStatus.FinishedAt.Time.Format(model.TimeFormat),
+				DeviceCount: gpuCards,
+			})
+		} else {
+			var jobStatusDataForCardTimeList []JobCardTimeInfo
+			jobStatusDataForCardTimeList = append(jobStatusDataForCardTimeList, JobCardTimeInfo{
+				JobID:       jobStatus.ID,
+				CardTime:    cardTime,
+				CreateTime:  jobStatus.CreatedAt.Format(model.TimeFormat),
+				StartTime:   jobStatus.ActivatedAt.Time.Format(model.TimeFormat),
+				FinishTime:
jobStatus.FinishedAt.Time.Format(model.TimeFormat), + DeviceCount: gpuCards, + }) + detailInfo[jobStatus.UserName] = jobStatusDataForCardTimeList + } + } + return detailInfo, nil +} diff --git a/pkg/apiserver/controller/statistics/statistics_test.go b/pkg/apiserver/controller/statistics/statistics_test.go new file mode 100644 index 000000000..e8b662e01 --- /dev/null +++ b/pkg/apiserver/controller/statistics/statistics_test.go @@ -0,0 +1,322 @@ +package statistics + +import ( + "database/sql" + "testing" + "time" + + "github.com/PaddlePaddle/PaddleFlow/pkg/common/config" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/resources" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema" + "github.com/PaddlePaddle/PaddleFlow/pkg/model" + "github.com/PaddlePaddle/PaddleFlow/pkg/storage" + "github.com/PaddlePaddle/PaddleFlow/pkg/storage/driver" + "github.com/stretchr/testify/assert" +) + +const ( + MockQueueName = "default-queue" + MockQueueID = "default-queue" + MockClusterName = "default-cluster" + MockClusterID = "default-cluster" + MockRootUser = "root" +) + +// test for func GetCardTimeByQueueName +func TestGetCardTimeByQueueName(t *testing.T) { + maxRes, err := resources.NewResourceFromMap(map[string]string{ + resources.ResCPU: "10", + resources.ResMemory: "20Gi", + "nvidia.com/gpu": "500", + }) + assert.Equal(t, nil, err) + + driver.InitMockDB() + mockCluster := model.ClusterInfo{ + Model: model.Model{ + ID: MockClusterID, + }, + Name: MockClusterName, + } + mockQueue1 := model.Queue{ + Name: MockQueueName, + Model: model.Model{ + ID: MockQueueID, + }, + Namespace: "paddleflow", + ClusterId: MockClusterID, + ClusterName: MockClusterName, + QuotaType: schema.TypeVolcanoCapabilityQuota, + MaxResources: maxRes, + SchedulingPolicy: []string{"s1", "s2"}, + Status: schema.StatusQueueOpen, + } + mockQueue2 := model.Queue{ + Name: MockQueueName + "2", + Model: model.Model{ + ID: MockQueueID + "2", + }, + Namespace: "paddleflow", + ClusterId: MockClusterID, + ClusterName: MockClusterName, + QuotaType: schema.TypeVolcanoCapabilityQuota, + MaxResources: maxRes, + SchedulingPolicy: []string{"s1", "s2"}, + Status: schema.StatusQueueOpen, + } + mockJob1 := model.Job{ + ID: "MockJobID", + Name: "MockJobName", + UserName: MockRootUser, + QueueID: MockQueueID, + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "nvidia.com/gpu": "1", + }, + }, + }, + }, + }, + }, + } + mockJob2 := model.Job{ + ID: "MockJobID2", + Name: "MockJobName2", + UserName: MockRootUser, + QueueID: MockQueueID + "2", + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 3, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "nvidia.com/gpu": "2", + }, + }, + }, + }, + }, + }, + } + mockJob3 := model.Job{ + ID: "MockJobID3", + Name: "MockJobName3", + 
UserName: MockRootUser, + QueueID: MockQueueID, + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 3, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "cpu": "2", + }, + }, + }, + }, + }, + }, + } + mockJob4 := model.Job{ + ID: "MockJobID4", + Name: "MockJobName4", + UserName: MockRootUser, + QueueID: MockQueueID, + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 1, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 7, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "nvidia.com/gpu": "2", + }, + }, + }, + }, + }, + }, + } + + type args struct { + ctx *logger.RequestContext + queueNames []string + startTimeStr string + endTimeStr string + } + testCases := []struct { + name string + args args + wantErr bool + responseCode int + }{ + { + name: "case1", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-02 06:00:00", + endTimeStr: "2023-03-02 08:00:00", + }, + wantErr: false, + responseCode: 200, + }, + { + name: "case2", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-02 06:00:00", + endTimeStr: "2023-03-02 16:00:00", + }, + wantErr: false, + responseCode: 200, + }, + { + name: "case3", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-01 00:00:00", + endTimeStr: "2023-03-05 00:00:00", + }, + wantErr: false, + responseCode: 200, + }, + { + name: "case4", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-01 12:00:00", + endTimeStr: "2023-03-02 16:00:00", + }, + wantErr: false, + responseCode: 200, + }, + { + name: "start time parse err", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-01 00-00-00", + endTimeStr: "2023-03-05 00:00:00", + }, + wantErr: true, + responseCode: 500, + }, + { + name: "end time parse err", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-01 00:00:00", + endTimeStr: "2023-03-05 00-00-00", + }, + wantErr: true, + responseCode: 500, + }, + { + name: "end time before start time", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{MockQueueName, MockQueueName + "2"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-05 00:00:00", + 
endTimeStr: "2023-03-01 00:00:00", + }, + wantErr: true, + responseCode: 500, + }, + { + name: "get queue by name failed", + args: args{ + ctx: &logger.RequestContext{UserName: MockRootUser}, + queueNames: []string{"000"}, + //queueNames: []string{MockQueueName}, + startTimeStr: "2023-03-01 00:00:00", + endTimeStr: "2023-03-05 00:00:00", + }, + wantErr: false, + responseCode: 500, + }, + } + + //ctx := &logger.RequestContext{UserName: MockRootUser} + config.GlobalServerConfig = &config.ServerConfig{} + config.GlobalServerConfig.Job.IsSingleCluster = true + storage.Cluster.CreateCluster(&mockCluster) + storage.Queue.CreateQueue(&mockQueue1) + storage.Queue.CreateQueue(&mockQueue2) + storage.Job.CreateJob(&mockJob1) + storage.Job.CreateJob(&mockJob2) + storage.Job.CreateJob(&mockJob3) + storage.Job.CreateJob(&mockJob4) + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + t.Logf("name=%s args=[%#v], wantError=%v", tt.name, tt.args, tt.wantErr) + res, err := GetCardTimeInfo(tt.args.ctx, tt.args.queueNames, tt.args.startTimeStr, tt.args.endTimeStr) + if tt.wantErr { + assert.Error(t, err) + t.Logf("name=%s err: %v", tt.name, err) + } else { + assert.Equal(t, nil, err) + cardTimeInfos := res.Data + for _, cardTimeInfo := range cardTimeInfos { + t.Logf("case[%s] result=%+v", tt.name, cardTimeInfo) + } + + } + }) + } +} diff --git a/pkg/apiserver/router/util/util.go b/pkg/apiserver/router/util/util.go index f5a9bf915..963fd00e8 100644 --- a/pkg/apiserver/router/util/util.go +++ b/pkg/apiserver/router/util/util.go @@ -73,6 +73,7 @@ const ( QueryKeyStatus = "status" QueryKeyTimestamp = "timestamp" QueryKeyStartTime = "startTime" + QueryKeyEndTime = "endTime" QueryKeyQueue = "queue" QueryKeyLabels = "labels" QueryKeyReadFromTail = "readFromTail" diff --git a/pkg/apiserver/router/v1/statistics.go b/pkg/apiserver/router/v1/statistics.go index 2039bad22..0aa0773dd 100644 --- a/pkg/apiserver/router/v1/statistics.go +++ b/pkg/apiserver/router/v1/statistics.go @@ -19,6 +19,7 @@ package v1 import ( "net/http" "strconv" + "strings" "github.com/go-chi/chi" log "github.com/sirupsen/logrus" @@ -26,6 +27,7 @@ import ( "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/common" "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/controller/statistics" "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/router/util" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" ) type StatisticsRouter struct{} @@ -39,6 +41,8 @@ func (sr *StatisticsRouter) AddRouter(r chi.Router) { r.Get("/statistics/job/{jobID}", sr.getJobStatistics) r.Get("/statistics/jobDetail/{jobID}", sr.getJobDetailStatistics) + r.Get("/statistics/cardTime/{queueName}", sr.getCardTimeDetail) + r.Post("/statistics/cardTime", sr.getCardTimeBatch) } @@ -122,3 +126,33 @@ func validateStatisticsParam(start, end, step int64) error { } return nil } + +func (sr *StatisticsRouter) getCardTimeDetail(w http.ResponseWriter, r *http.Request) { + ctx := common.GetRequestContext(r) + queueName := strings.TrimSpace(chi.URLParam(r, util.ParamKeyQueueName)) + startTime := r.URL.Query().Get(util.QueryKeyStartTime) + endTime := r.URL.Query().Get(util.QueryKeyEndTime) + response, err := statistics.GetCardTimeInfo(&ctx, []string{queueName}, startTime, endTime) + if err != nil { + common.RenderErrWithMessage(w, ctx.RequestID, ctx.ErrorCode, ctx.ErrorMessage) + return + } + common.Render(w, http.StatusOK, response) +} + +func (sr *StatisticsRouter) getCardTimeBatch(w http.ResponseWriter, r *http.Request) { + ctx := common.GetRequestContext(r) + var request 
statistics.GetCardTimeBatchRequest + if err := common.BindJSON(r, &request); err != nil { + logger.LoggerForRequest(&ctx).Errorf( + "get cardTime batch failed parsing request body:%+v. error:%s", r.Body, err.Error()) + common.RenderErrWithMessage(w, ctx.RequestID, ctx.ErrorCode, err.Error()) + return + } + cardTimeBatchData, err := statistics.GetCardTimeInfo(&ctx, request.QueueNames, request.StartTime, request.EndTime) + if err != nil { + common.RenderErrWithMessage(w, ctx.RequestID, ctx.ErrorCode, ctx.ErrorMessage) + return + } + common.Render(w, http.StatusOK, cardTimeBatchData) +} diff --git a/pkg/apiserver/router/v1/statistics_test.go b/pkg/apiserver/router/v1/statistics_test.go new file mode 100644 index 000000000..57dd7fe64 --- /dev/null +++ b/pkg/apiserver/router/v1/statistics_test.go @@ -0,0 +1,295 @@ +package v1 + +import ( + "database/sql" + "fmt" + "testing" + "time" + + "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/controller/statistics" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/config" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/resources" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema" + "github.com/PaddlePaddle/PaddleFlow/pkg/model" + "github.com/PaddlePaddle/PaddleFlow/pkg/storage" + "github.com/go-chi/chi" + "github.com/stretchr/testify/assert" +) + +// test for func GetCardTimeByQueueName +func TestGetCardTimeByQueueName(t *testing.T) { + + type args struct { + ctx *logger.RequestContext + queueName string + startTime string + endTime string + router *chi.Mux + } + + //router, _, baseURL := MockInitJob(t) + router, baseURL := prepareDBAndAPIForUser(t, MockRootUser) + ctx := &logger.RequestContext{UserName: mockUserName} + tests := []struct { + name string + args args + wantErr bool + responseCode int + }{ + { + name: "empty request", + args: args{ + ctx: ctx, + router: router, + queueName: MockQueueName, + startTime: "2023-03-02 06:00:00", + endTime: "2023-03-02 08:00:00", + }, + wantErr: false, + responseCode: 200, + }, + } + + MockDBForCardTime(t) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := PerformGetRequest(tt.args.router, fmt.Sprintf(baseURL+"/statistics/cardTime/%s?startTime=%s&endTime=%s", tt.args.queueName, tt.args.startTime, tt.args.endTime)) + assert.NoError(t, err) + t.Logf("res %v", res) + if tt.wantErr { + assert.NotEqual(t, res.Code, 200) + } else { + t.Logf("res: %v", res) + assert.Equal(t, tt.responseCode, res.Code) + } + }) + } +} +func TestGetCardTimeBatch(t *testing.T) { + MockDBForCardTime(t) + type args struct { + ctx *logger.RequestContext + req *statistics.GetCardTimeBatchRequest + router *chi.Mux + } + router, _, baseURL := MockInitJob(t) + ctx := &logger.RequestContext{UserName: "testusername"} + tests := []struct { + name string + args args + wantErr bool + responseCode int + }{ + { + name: "empty request", + args: args{ + ctx: ctx, + router: router, + req: &statistics.GetCardTimeBatchRequest{ + QueueNames: []string{MockQueueName}, + StartTime: "2023-03-02 06:00:00", + EndTime: "2023-03-02 08:00:00", + }, + }, + wantErr: false, + responseCode: 200, + }, + { + name: "empty request", + args: args{ + ctx: ctx, + router: router, + req: &statistics.GetCardTimeBatchRequest{ + QueueNames: []string{MockQueueName}, + StartTime: "2023-03-01 06:00:00", + EndTime: "2023-03-07 08:00:00", + }, + }, + wantErr: false, + responseCode: 200, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := 
PerformPostRequest(tt.args.router, baseURL+"/statistics/cardTime", tt.args.req) + assert.NoError(t, err) + t.Logf("get card time batch %v", res) + if tt.wantErr { + assert.NotEqual(t, res.Code, 200) + } else { + t.Logf("res: %v", res) + assert.Equal(t, tt.responseCode, res.Code) + } + }) + } +} + +func MockDBForCardTime(t *testing.T) { + maxRes, err := resources.NewResourceFromMap(map[string]string{ + resources.ResCPU: "10", + resources.ResMemory: "20Gi", + "nvidia.com/gpu": "500", + }) + assert.Equal(t, nil, err) + + mockCluster := model.ClusterInfo{ + Model: model.Model{ + ID: MockClusterID, + }, + Name: MockClusterName, + } + mockQueue1 := model.Queue{ + Name: MockQueueName, + Model: model.Model{ + ID: MockQueueID, + }, + Namespace: "paddleflow", + ClusterId: MockClusterID, + ClusterName: MockClusterName, + QuotaType: schema.TypeVolcanoCapabilityQuota, + MaxResources: maxRes, + SchedulingPolicy: []string{"s1", "s2"}, + Status: schema.StatusQueueOpen, + } + mockQueue2 := model.Queue{ + Name: MockQueueName + "2", + Model: model.Model{ + ID: MockQueueID + "2", + }, + Namespace: "paddleflow", + ClusterId: MockClusterID, + ClusterName: MockClusterName, + QuotaType: schema.TypeVolcanoCapabilityQuota, + MaxResources: maxRes, + SchedulingPolicy: []string{"s1", "s2"}, + Status: schema.StatusQueueOpen, + } + mockJob1 := model.Job{ + ID: "MockJobID", + Name: "MockJobName", + UserName: MockRootUser, + QueueID: MockQueueID, + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "nvidia.com/gpu": "1", + }, + }, + }, + }, + }, + }, + } + mockJob2 := model.Job{ + ID: "MockJobID2", + Name: "MockJobName2", + UserName: MockRootUser, + QueueID: MockQueueID + "2", + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 3, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "nvidia.com/gpu": "2", + }, + }, + }, + }, + }, + }, + } + mockJob3 := model.Job{ + ID: "MockJobID3", + Name: "MockJobName3", + UserName: MockRootUser, + QueueID: MockQueueID, + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 3, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "cpu": "2", + }, + }, + }, + }, + }, + }, + } + mockJob4 := model.Job{ + ID: "MockJobID4", + Name: "MockJobName4", + UserName: MockRootUser, + QueueID: MockQueueID, + ActivatedAt: sql.NullTime{ + Time: time.Date(2023, 3, 1, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + FinishedAt: sql.NullTime{ + Time: time.Date(2023, 3, 7, 0, 0, 0, 0, time.UTC), + Valid: true, + }, + UpdatedAt: time.Date(2023, 3, 2, 12, 0, 0, 0, 
time.UTC), + Members: []schema.Member{ + { + Replicas: 1, + Conf: schema.Conf{ + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{ + ScalarResources: map[schema.ResourceName]string{ + "nvidia.com/gpu": "2", + }, + }, + }, + }, + }, + }, + } + + //ctx := &logger.RequestContext{UserName: MockRootUser} + config.GlobalServerConfig = &config.ServerConfig{} + config.GlobalServerConfig.Job.IsSingleCluster = true + storage.Cluster.CreateCluster(&mockCluster) + storage.Queue.CreateQueue(&mockQueue1) + storage.Queue.CreateQueue(&mockQueue2) + storage.Job.CreateJob(&mockJob1) + storage.Job.CreateJob(&mockJob2) + storage.Job.CreateJob(&mockJob3) + storage.Job.CreateJob(&mockJob4) +} diff --git a/pkg/storage/interface.go b/pkg/storage/interface.go index e29debbe8..3b6d2e8f2 100644 --- a/pkg/storage/interface.go +++ b/pkg/storage/interface.go @@ -17,6 +17,8 @@ limitations under the License. package storage import ( + "time" + log "github.com/sirupsen/logrus" "gorm.io/gorm" @@ -195,8 +197,8 @@ type JobFilter struct { Labels map[string]string OrderBy string Order string - PK int64 - MaxKeys int + PK int64 // offset + MaxKeys int // limit } var ( @@ -216,6 +218,7 @@ type JobStoreInterface interface { Update(jobID string, job *model.Job) error // ListJob list job with filter ListJob(filter JobFilter) ([]model.Job, error) + ListJobStat(startDate, endDate time.Time, queueID string, limit, offset int) (map[string][]*model.Job, error) GetJobsByRunID(runID string, jobID string) ([]model.Job, error) // GetTaskByID get job task GetTaskByID(id string) (model.JobTask, error) diff --git a/pkg/storage/job.go b/pkg/storage/job.go index 943968664..42f50bb54 100644 --- a/pkg/storage/job.go +++ b/pkg/storage/job.go @@ -32,6 +32,10 @@ import ( "github.com/PaddlePaddle/PaddleFlow/pkg/model" ) +const ( + jobStatSelectColumn = "id,queue_id,user_name,created_at,activated_at,finished_at,members" +) + type JobStore struct { db *gorm.DB } @@ -331,3 +335,52 @@ func (js *JobStore) ListTaskByJobID(jobID string) ([]model.JobTask, error) { } return jobList, nil } + +func (js *JobStore) ListJobStat(startDate, endDate time.Time, queueID string, limit, offset int) (map[string][]*model.Job, error) { + JobStatMap := make(map[string][]*model.Job) + // case1 + jobStatusForCase1 := []*model.Job{} + result := js.db.Table("job").Select(jobStatSelectColumn). + Where(" queue_id = ?", queueID). + Where("activated_at <= ? and activated_at != '0000-00-00 00:00:00'", startDate). + Where("finished_at >= ? or finished_at = '0000-00-00 00:00:00'", endDate). + Limit(limit).Offset(offset).Find(&jobStatusForCase1) + if result.Error != nil { + return nil, result.Error + } + JobStatMap["case1"] = jobStatusForCase1 + // case2 + jobStatusForCase2 := []*model.Job{} + result = js.db.Table("job").Select(jobStatSelectColumn). + Where(" queue_id = ?", queueID). + Where("activated_at <= ? and activated_at != '0000-00-00 00:00:00'", startDate). + Where("finished_at <= ? and finished_at > ?", endDate, startDate). + Limit(limit).Offset(offset).Find(&jobStatusForCase2) + if result.Error != nil { + return nil, result.Error + } + JobStatMap["case2"] = jobStatusForCase2 + // case3 + jobStatusForCase3 := []*model.Job{} + result = js.db.Table("job").Select(jobStatSelectColumn). + Where(" queue_id = ?", queueID). + Where("activated_at >= ?", startDate). + Where("finished_at <= ? ", endDate). 
+ Limit(limit).Offset(offset).Find(&jobStatusForCase3) + if result.Error != nil { + return nil, result.Error + } + JobStatMap["case3"] = jobStatusForCase3 + // case4 + jobStatusForCase4 := []*model.Job{} + result = js.db.Table("job").Select(jobStatSelectColumn). + Where(" queue_id = ?", queueID). + Where("activated_at >= ?", startDate). + Where("finished_at >= ? or finished_at = '0000-00-00 00:00:00'", endDate). + Limit(limit).Offset(offset).Find(&jobStatusForCase4) + if result.Error != nil { + return nil, result.Error + } + JobStatMap["case4"] = jobStatusForCase4 + return JobStatMap, nil +}
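
Note (not part of the patch): ListJobStat splits jobs into four cases by how their run window overlaps the query window, and FulfillDetailInfo then computes a per-case duration. The standalone Go sketch below shows the single computation those branches add up to — clip the job interval to [startDate, endDate), multiply by the card count, convert to hours, floor to two decimals — which may be a convenient reference when reviewing the branch logic. The helper name and the zero-FinishedAt convention for still-running jobs are illustrative assumptions, not code from this patch.

package main

import (
	"fmt"
	"math"
	"time"
)

// clippedCardHours reproduces in one place what the four case branches in
// FulfillDetailInfo compute: the job's run interval clipped to the query
// window, multiplied by the number of cards, expressed in hours and floored
// to two decimals. A zero finish time stands for a job that is still running.
func clippedCardHours(activatedAt, finishedAt, windowStart, windowEnd time.Time, cards int) float64 {
	start := activatedAt
	if start.Before(windowStart) {
		start = windowStart // case1/case2: job started before the window
	}
	end := finishedAt
	if end.IsZero() || end.After(windowEnd) {
		end = windowEnd // case1/case4: job still running or finished after the window
	}
	if !end.After(start) {
		return 0
	}
	hours := end.Sub(start).Seconds() * float64(cards) / 3600
	return math.Floor(hours*100) / 100 // same rounding as common.Floor2decimal
}

func main() {
	winStart := time.Date(2023, 3, 2, 6, 0, 0, 0, time.UTC)
	winEnd := time.Date(2023, 3, 2, 8, 0, 0, 0, time.UTC)
	// Job ran 00:00-12:00 with 1 GPU: only the 2 hours inside the window count.
	fmt.Println(clippedCardHours(
		time.Date(2023, 3, 2, 0, 0, 0, 0, time.UTC),
		time.Date(2023, 3, 2, 12, 0, 0, 0, time.UTC),
		winStart, winEnd, 1)) // prints 2
}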
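Note (not part of the patch): the router now exposes GET /statistics/cardTime/{queueName}?startTime=...&endTime=... for a single queue and POST /statistics/cardTime with a GetCardTimeBatchRequest body for several queues. A minimal client sketch for the batch endpoint follows; the host, port, and /api/paddleflow/v1 prefix are assumptions about the deployment, authentication headers are omitted, and timestamps use the same layout as in the tests (e.g. "2023-03-02 06:00:00").

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// cardTimeBatchRequest mirrors statistics.GetCardTimeBatchRequest.
type cardTimeBatchRequest struct {
	StartTime  string   `json:"startTime"`
	EndTime    string   `json:"endTime"`
	QueueNames []string `json:"queueNames"`
}

func main() {
	reqBody, _ := json.Marshal(cardTimeBatchRequest{
		StartTime:  "2023-03-02 06:00:00",
		EndTime:    "2023-03-02 08:00:00",
		QueueNames: []string{"default-queue"},
	})
	// Host, port, and API prefix below are placeholders; adjust to the
	// deployment's actual server address and add the usual auth header.
	url := "http://127.0.0.1:8999/api/paddleflow/v1/statistics/cardTime"
	resp, err := http.Post(url, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}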