feat(job): update kubeflow build (PaddlePaddle#1303)
* feat(job): update kubeflow build

* update PodTemplate build

* update error handling

* remove BuildPodSpec

* set limit with request
D0m021ng authored Jan 4, 2024
1 parent 067e7f6 commit b3f7efd
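
In short, the commit swaps the error-returning kuberuntime helpers (KubeflowReplicaSpec, KubeflowRunPolicy, BuildPodTemplateSpec, BuildTaskMetadata, BuildPodSpec) for builder types, so the per-framework job builders no longer propagate errors from these calls. The sketch below only summarizes the call-site pattern as it appears in the diffs that follow; the variable names are taken from those diffs and this is not a self-contained program.

// Before (removed): free helpers that returned an error.
//   err := kuberuntime.KubeflowReplicaSpec(replicaSpec, job.ID, &task)
//   err := kuberuntime.KubeflowRunPolicy(&jobSpec.RunPolicy, &resourceList, queue, priority)
//   err := kuberuntime.BuildPodTemplateSpec(&rs.Template, jobID, &task)
// After (added): builders; errors are handled inside them, so these call sites now return nil.
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, replicaSpec).ReplicaSpec(task)
kuberuntime.NewKubeflowJobBuilder(job.ID, &jobSpec.RunPolicy, nil).
	RunPolicy(&resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewPodTemplateSpecBuilder(&rs.Template, jobID).Build(task)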
Showing 13 changed files with 578 additions and 377 deletions.
9 changes: 9 additions & 0 deletions pkg/job/api/job.go
@@ -134,6 +134,15 @@ func (pfj *PFJob) GetID() string {
return pfj.ID
}

func (pfj *PFJob) GetMember(roleName schema.MemberRole) schema.Member {
for _, member := range pfj.Tasks {
if member.Role == roleName {
return member
}
}
return schema.Member{}
}

type JobSyncInfo struct {
ID string
Namespace string
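For context on how the new accessor is used by the custom-spec paths in the files below (job.GetMember(pfschema.RoleMaster), RoleWorker, RolePServer), here is a minimal illustrative example; the job literal is made up for this note and is not part of the change:

job := &api.PFJob{
	ID: "job-xxxxx", // illustrative ID
	Tasks: []pfschema.Member{
		{Replicas: 1, Role: pfschema.RoleMaster},
		{Replicas: 2, Role: pfschema.RoleWorker},
	},
}
master := job.GetMember(pfschema.RoleMaster) // first member whose Role matches
ps := job.GetMember(pfschema.RolePServer)    // no match: zero-value schema.Member{}
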
3 changes: 2 additions & 1 deletion pkg/job/runtime_v2/job/aitraining/kube_aitraining_job.go
@@ -138,7 +138,8 @@ func (pj *KubeAITrainingJob) patchReplicaSpec(rs *v1.ReplicaSpec, task pfschema.
rs.Replicas = &replicas
}
// patch fs
return kuberuntime.BuildPodTemplateSpec(&rs.Template, jobID, &task)
kuberuntime.NewPodTemplateSpecBuilder(&rs.Template, jobID).Build(task)
return nil
}

func (pj *KubeAITrainingJob) builtinAITrainingJob(pdj *v1.TrainingJobSpec, job *api.PFJob) error {
22 changes: 12 additions & 10 deletions pkg/job/runtime_v2/job/mpi/kube_mpi_job.go
@@ -92,36 +92,38 @@ func (mj *KubeMPIJob) builtinMPIJobSpec(mpiJobSpec *mpiv1.MPIJobSpec, job *api.P
replicaType = mpiv1.MPIReplicaTypeWorker
}
replicaSpec := mpiJobSpec.MPIReplicaSpecs[replicaType]
if err := kuberuntime.KubeflowReplicaSpec(replicaSpec, job.ID, &task); err != nil {
log.Errorf("build %s RepilcaSpec for %s failed, err: %v", replicaType, mj.String(jobName), err)
return err
}
// build kubeflowReplicaSpec
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, replicaSpec).ReplicaSpec(task)
// calculate job minResources
taskResources, _ := resources.NewResourceFromMap(task.Flavour.ToMap())
taskResources.Multi(task.Replicas)
minResources.Add(taskResources)
}
// set RunPolicy
resourceList := k8s.NewResourceList(minResources)
return kuberuntime.KubeflowRunPolicy(&mpiJobSpec.RunPolicy, &resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &mpiJobSpec.RunPolicy, nil).
RunPolicy(&resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

// customMPIJobSpec set custom MPIJob Spec
func (mj *KubeMPIJob) customMPIJobSpec(mpiJobSpec *mpiv1.MPIJobSpec, job *api.PFJob) error {
jobName := job.NamespacedName()
log.Debugf("patch %s spec:%#v", mj.String(jobName), mpiJobSpec)
// patch metadata
ps, find := mpiJobSpec.MPIReplicaSpecs[mpiv1.MPIReplicaTypeLauncher]
if find && ps != nil {
kuberuntime.BuildTaskMetadata(&ps.Template.ObjectMeta, job.ID, &pfschema.Conf{})
master, find := mpiJobSpec.MPIReplicaSpecs[mpiv1.MPIReplicaTypeLauncher]
if find && master != nil {
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, master).ReplicaSpec(job.GetMember(pfschema.RoleMaster))
}
worker, find := mpiJobSpec.MPIReplicaSpecs[mpiv1.MPIReplicaTypeWorker]
if find && worker != nil {
kuberuntime.BuildTaskMetadata(&worker.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, worker).ReplicaSpec(job.GetMember(pfschema.RoleWorker))
}
// TODO: patch mpi job from user
// check RunPolicy
return kuberuntime.KubeflowRunPolicy(&mpiJobSpec.RunPolicy, nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &mpiJobSpec.RunPolicy, nil).
RunPolicy(nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

func (mj *KubeMPIJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
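As an aside, the minResources accumulation in builtinMPIJobSpec above (and in the paddle, pytorch, and tensorflow builders below) sums each task's flavour multiplied by its replica count, then hands the total to the RunPolicy as a resource list. A hedged sketch of that loop, assuming it ranges over job.Tasks and that minResources starts from an empty resource (its initialization sits outside the shown hunks):

for _, task := range job.Tasks {
	taskResources, _ := resources.NewResourceFromMap(task.Flavour.ToMap()) // e.g. {"cpu": "2", "mem": "4Gi"}
	taskResources.Multi(task.Replicas)                                     // scale by the task's replica count
	minResources.Add(taskResources)                                        // accumulate across all roles
}
resourceList := k8s.NewResourceList(minResources) // attached to the job's RunPolicy as its scheduling minimum
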
29 changes: 1 addition & 28 deletions pkg/job/runtime_v2/job/mpi/kube_mpi_job_test.go
@@ -269,33 +269,6 @@ func TestMPIJob_CreateJob(t *testing.T) {
expectErr: "err",
wantErr: false,
},
{
caseName: "Member absent",
jobObj: &api.PFJob{
Name: "test-mpi-job",
ID: uuid.GenerateIDWithLength("job", 5),
Namespace: "default",
JobType: pfschema.TypeDistributed,
JobMode: pfschema.EnvJobModePS,
Framework: pfschema.FrameworkMPI,
Conf: pfschema.Conf{
Name: "normal",
Command: "sleep 200",
Image: "mockImage",
},
Tasks: []pfschema.Member{
{
Replicas: 1,
Role: pfschema.RoleMaster,
Conf: pfschema.Conf{
Flavour: pfschema.Flavour{Name: "", ResourceInfo: pfschema.ResourceInfo{CPU: "-1", Mem: "4Gi"}},
},
},
},
},
expectErr: "negative resources not permitted: map[cpu:-1 memory:4Gi]",
wantErr: true,
},
{
caseName: "flavour wrong",
jobObj: &api.PFJob{
@@ -321,7 +294,7 @@
},
},
expectErr: "quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'",
wantErr: true,
wantErr: false,
},
{
caseName: "ExtensionTemplate",
12 changes: 6 additions & 6 deletions pkg/job/runtime_v2/job/paddle/kube_paddle_job.go
@@ -123,9 +123,9 @@ func patchCustomPaddleTask(rSpec *paddlejobv1.ResourceSpec, task pfschema.Member
if rSpec.Replicas <= 0 {
rSpec.Replicas = kuberuntime.DefaultReplicas
}
kuberuntime.BuildTaskMetadata(&rSpec.Template.ObjectMeta, jobID, &task.Conf)
// build pod spec
return kuberuntime.BuildPodSpec(&rSpec.Template.Spec, task)
// build pod template
kuberuntime.NewPodTemplateSpecBuilder(&rSpec.Template, jobID).Build(task)
return nil
}

func (pj *KubePaddleJob) buildSchedulingPolicy(pdjSpec *paddlejobv1.PaddleJobSpec, jobConf pfschema.PFJobConf) error {
@@ -229,9 +229,9 @@ func (pj *KubePaddleJob) patchPaddleTask(resourceSpec *paddlejobv1.ResourceSpec,
if task.Name == "" {
task.Name = uuid.GenerateIDWithLength(jobID, 3)
}
kuberuntime.BuildTaskMetadata(&resourceSpec.Template.ObjectMeta, jobID, &task.Conf)
// build pod spec
return kuberuntime.BuildPodSpec(&resourceSpec.Template.Spec, task)
// build pod template spec
kuberuntime.NewPodTemplateSpecBuilder(&resourceSpec.Template, jobID).Build(task)
return nil
}

func (pj *KubePaddleJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
18 changes: 10 additions & 8 deletions pkg/job/runtime_v2/job/paddle/kubeflow_paddle_job.go
@@ -100,10 +100,8 @@ func (pj *KubeKFPaddleJob) builtinPaddleJobSpec(jobSpec *paddlev1.PaddleJobSpec,
if !ok {
return fmt.Errorf("replica type %s for %s is not supported", replicaType, pj.String(jobName))
}
if err := kuberuntime.KubeflowReplicaSpec(replicaSpec, job.ID, &task); err != nil {
log.Errorf("build %s RepilcaSpec for %s failed, err: %v", replicaType, pj.String(jobName), err)
return err
}
// build kubeflowReplicaSpec
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, replicaSpec).ReplicaSpec(task)
// calculate job minResources
taskResources, err := resources.NewResourceFromMap(task.Flavour.ToMap())
if err != nil {
@@ -115,7 +113,9 @@ }
}
// set RunPolicy
resourceList := k8s.NewResourceList(minResources)
return kuberuntime.KubeflowRunPolicy(&jobSpec.RunPolicy, &resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &jobSpec.RunPolicy, nil).
RunPolicy(&resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

func (pj *KubeKFPaddleJob) customPaddleJobSpec(jobSpec *paddlev1.PaddleJobSpec, job *api.PFJob) error {
@@ -127,15 +127,17 @@ func (pj *KubeKFPaddleJob) customPaddleJobSpec(jobSpec *paddlev1.PaddleJobSpec,
// patch metadata
ps, find := jobSpec.PaddleReplicaSpecs[paddlev1.PaddleJobReplicaTypeMaster]
if find && ps != nil {
kuberuntime.BuildTaskMetadata(&ps.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, ps).ReplicaSpec(job.GetMember(pfschema.RoleMaster))
}
worker, find := jobSpec.PaddleReplicaSpecs[paddlev1.PaddleJobReplicaTypeWorker]
if find && worker != nil {
kuberuntime.BuildTaskMetadata(&worker.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, worker).ReplicaSpec(job.GetMember(pfschema.RoleWorker))
}
// TODO: patch paddle job from user
// check RunPolicy
return kuberuntime.KubeflowRunPolicy(&jobSpec.RunPolicy, nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &jobSpec.RunPolicy, nil).
RunPolicy(nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

func (pj *KubeKFPaddleJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
22 changes: 12 additions & 10 deletions pkg/job/runtime_v2/job/pytorch/kube_pytorch_job.go
@@ -101,10 +101,8 @@ func (pj *KubePyTorchJob) builtinPyTorchJobSpec(torchJobSpec *pytorchv1.PyTorchJ
if !ok {
return fmt.Errorf("replica type %s for %s is not supported", replicaType, pj.String(jobName))
}
if err := kuberuntime.KubeflowReplicaSpec(replicaSpec, job.ID, &task); err != nil {
log.Errorf("build %s RepilcaSpec for %s failed, err: %v", replicaType, pj.String(jobName), err)
return err
}
// build kubeflowReplicaSpec
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, replicaSpec).ReplicaSpec(task)
// calculate job minResources
taskResources, err := resources.NewResourceFromMap(task.Flavour.ToMap())
if err != nil {
@@ -116,7 +114,9 @@ func (pj *KubePyTorchJob) builtinPyTorchJobSpec(torchJobSpec *pytorchv1.PyTorchJ
}
// set RunPolicy
resourceList := k8s.NewResourceList(minResources)
return kuberuntime.KubeflowRunPolicy(&torchJobSpec.RunPolicy, &resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &torchJobSpec.RunPolicy, nil).
RunPolicy(&resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

// customPyTorchJobSpec set custom PyTorchJob Spec
@@ -127,17 +127,19 @@ func (pj *KubePyTorchJob) customPyTorchJobSpec(torchJobSpec *pytorchv1.PyTorchJo
jobName := job.NamespacedName()
log.Debugf("patch %s spec:%#v", pj.String(jobName), torchJobSpec)
// patch metadata
ps, find := torchJobSpec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeMaster]
if find && ps != nil {
kuberuntime.BuildTaskMetadata(&ps.Template.ObjectMeta, job.ID, &pfschema.Conf{})
master, find := torchJobSpec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeMaster]
if find && master != nil {
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, master).ReplicaSpec(job.GetMember(pfschema.RoleMaster))
}
worker, find := torchJobSpec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeWorker]
if find && worker != nil {
kuberuntime.BuildTaskMetadata(&worker.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, worker).ReplicaSpec(job.GetMember(pfschema.RoleWorker))
}
// TODO: patch pytorch job from user
// check RunPolicy
return kuberuntime.KubeflowRunPolicy(&torchJobSpec.RunPolicy, nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &torchJobSpec.RunPolicy, nil).
RunPolicy(nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

func (pj *KubePyTorchJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
Expand Down
16 changes: 6 additions & 10 deletions pkg/job/runtime_v2/job/ray/kube_ray_job.go
@@ -143,10 +143,7 @@ func (rj *KubeRayJob) buildHeadPod(rayJobSpec *rayV1alpha1.RayJobSpec, jobID str
task.Command = ""
task.Args = []string{}
// Template
if err := kuberuntime.BuildPodTemplateSpec(&headGroupSpec.Template, jobID, &task); err != nil {
log.Errorf("build head pod spec failed, err:%v", err)
return err
}
kuberuntime.NewPodTemplateSpecBuilder(&headGroupSpec.Template, jobID).Build(task)
// patch queue name
headGroupSpec.Template.Labels[pfschema.QueueLabelKey] = task.QueueName
headGroupSpec.Template.Annotations[pfschema.QueueLabelKey] = task.QueueName
@@ -217,10 +214,7 @@ func (rj *KubeRayJob) buildWorkerPod(rayJobSpec *rayV1alpha1.RayJobSpec, jobID s
task.Command = ""
task.Args = []string{}
// Template
if err := kuberuntime.BuildPodTemplateSpec(&worker.Template, jobID, &task); err != nil {
log.Errorf("build head pod spec failed, err: %v", err)
return err
}
kuberuntime.NewPodTemplateSpecBuilder(&worker.Template, jobID).Build(task)
// patch queue name
worker.Template.Labels[pfschema.QueueLabelKey] = task.QueueName
worker.Template.Annotations[pfschema.QueueLabelKey] = task.QueueName
@@ -266,9 +260,11 @@ func (rj *KubeRayJob) customRayJobSpec(rayJobSpec *rayV1alpha1.RayJobSpec, job *
jobName := job.NamespacedName()
log.Debugf("patch %s spec:%#v", rj.String(jobName), rayJobSpec)
// patch metadata
kuberuntime.BuildTaskMetadata(&rayJobSpec.RayClusterSpec.HeadGroupSpec.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewPodTemplateSpecBuilder(&rayJobSpec.RayClusterSpec.HeadGroupSpec.Template, job.ID).
Build(job.GetMember(pfschema.RoleMaster))
for i := range rayJobSpec.RayClusterSpec.WorkerGroupSpecs {
kuberuntime.BuildTaskMetadata(&rayJobSpec.RayClusterSpec.WorkerGroupSpecs[i].Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewPodTemplateSpecBuilder(&rayJobSpec.RayClusterSpec.WorkerGroupSpecs[i].Template, job.ID).
Build(job.GetMember(pfschema.RoleWorker))
}
// TODO: patch ray job from user
return nil
18 changes: 10 additions & 8 deletions pkg/job/runtime_v2/job/tensorflow/kube_tensorflow_job.go
@@ -103,10 +103,8 @@ func (pj *KubeTFJob) builtinTFJobSpec(tfJobSpec *tfv1.TFJobSpec, job *api.PFJob)
if !ok {
return fmt.Errorf("replica type %s for %s is not supported", replicaType, pj.String(jobName))
}
if err := kuberuntime.KubeflowReplicaSpec(replicaSpec, job.ID, &task); err != nil {
log.Errorf("build %s RepilcaSpec for %s failed, err: %v", replicaType, pj.String(jobName), err)
return err
}
// build kubeflowReplicaSpec
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, replicaSpec).ReplicaSpec(task)
// calculate job minResources
taskResources, err := resources.NewResourceFromMap(task.Flavour.ToMap())
if err != nil {
@@ -118,7 +116,9 @@ func (pj *KubeTFJob) builtinTFJobSpec(tfJobSpec *tfv1.TFJobSpec, job *api.PFJob)
}
// set RunPolicy
resourceList := k8s.NewResourceList(minResources)
return kuberuntime.KubeflowRunPolicy(&tfJobSpec.RunPolicy, &resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &tfJobSpec.RunPolicy, nil).
RunPolicy(&resourceList, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

// customTFJobSpec set custom TFJob Spec
@@ -131,15 +131,17 @@ func (pj *KubeTFJob) customTFJobSpec(tfJobSpec *tfv1.TFJobSpec, job *api.PFJob)
// patch metadata
ps, find := tfJobSpec.TFReplicaSpecs[tfv1.TFReplicaTypePS]
if find && ps != nil {
kuberuntime.BuildTaskMetadata(&ps.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, ps).ReplicaSpec(job.GetMember(pfschema.RolePServer))
}
worker, find := tfJobSpec.TFReplicaSpecs[tfv1.TFReplicaTypeWorker]
if find && worker != nil {
kuberuntime.BuildTaskMetadata(&worker.Template.ObjectMeta, job.ID, &pfschema.Conf{})
kuberuntime.NewKubeflowJobBuilder(job.ID, nil, worker).ReplicaSpec(job.GetMember(pfschema.RoleWorker))
}
// TODO: patch pytorch job from user
// check RunPolicy
return kuberuntime.KubeflowRunPolicy(&tfJobSpec.RunPolicy, nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
kuberuntime.NewKubeflowJobBuilder(job.ID, &tfJobSpec.RunPolicy, nil).
RunPolicy(nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
return nil
}

func (pj *KubeTFJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {