Skip to content

Commit

Permalink
feat(job): start job controller when cluster has queues (PaddlePaddle…
Browse files Browse the repository at this point in the history
…#1312)

* feat(job): start job controller when cluster has queues

* add more ut

* add ut
  • Loading branch information
D0m021ng authored Jan 12, 2024
1 parent 268b57c commit a2e1f67
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 44 deletions.
5 changes: 0 additions & 5 deletions pkg/common/schema/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,12 @@ const (
EnvJobUserName = "PF_USER_NAME"
EnvJobMode = "PF_JOB_MODE"
EnvJobFramework = "PF_JOB_FRAMEWORK"
// EnvJobYamlPath Additional configuration for a specific job
EnvJobYamlPath = "PF_JOB_YAML_PATH"
EnvIsCustomYaml = "PF_IS_CUSTOM_YAML"
// EnvJobWorkDir The working directory of the job, `null` means command without a working directory
EnvJobWorkDir = "PF_WORK_DIR"
EnvMountPath = "PF_MOUNT_PATH"

EnvJobRestartPolicy = "PF_JOB_RESTART_POLICY"

EnvEnableJobQueueSync = "PF_JOB_QUEUE_SYNC"

// EnvJobModePS env
EnvJobModePS = "PS"
// EnvJobModeCollective env
Expand Down
28 changes: 13 additions & 15 deletions pkg/job/runtime_v2/k3s_runtime_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"os"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -43,6 +42,7 @@ import (
"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/client"
"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/controller"
"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/framework"
"github.com/PaddlePaddle/PaddleFlow/pkg/storage"
"github.com/PaddlePaddle/PaddleFlow/pkg/trace_logger"
)

Expand Down Expand Up @@ -150,12 +150,18 @@ func (k3srs *K3SRuntimeService) Client() framework.RuntimeClientInterface {
}

func (k3srs *K3SRuntimeService) SyncController(stopCh <-chan struct{}) {
log.Infof("start job/queue controller on %s", k3srs.String())
var err error
jobQueueSync := os.Getenv(pfschema.EnvEnableJobQueueSync)
if jobQueueSync == "false" {
log.Warnf("skip job and queue syn controller on %s", k3srs.String())
} else {
// 1. init node resource controller
nodeResourceController := controller.NewNodeResourceSync()
err := nodeResourceController.Initialize(k3srs.client)
if err != nil {
log.Errorf("init node resource controller on %s failed, err: %v", k3srs.String(), err)
return
}
go nodeResourceController.Run(stopCh)

// 2. start job controller if the cluster has queues
queues := storage.Queue.ListQueuesByCluster(k3srs.cluster.ID)
if len(queues) > 0 {
jobController := controller.NewJobSync()
err = jobController.Initialize(k3srs.client)
if err != nil {
Expand All @@ -164,14 +170,6 @@ func (k3srs *K3SRuntimeService) SyncController(stopCh <-chan struct{}) {
}
go jobController.Run(stopCh)
}

nodeResourceController := controller.NewNodeResourceSync()
err = nodeResourceController.Initialize(k3srs.client)
if err != nil {
log.Errorf("init node resource controller on %s failed, err: %v", k3srs.String(), err)
return
}
go nodeResourceController.Run(stopCh)
}

func (k3srs *K3SRuntimeService) getNodeName() (string, error) {
Expand Down
21 changes: 19 additions & 2 deletions pkg/job/runtime_v2/k3s_runtime_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,25 @@ func TestK3SRuntime_SyncController(t *testing.T) {
cluster: &schema.Cluster{Name: "test-cluster", Type: schema.K3SType},
client: kubeClient,
}
ch := make(chan struct{})
kubeRuntime.SyncController(ch)
driver.InitMockDB()
err := storage.Queue.CreateQueue(&model.Queue{
Name: "default-queue",
Status: schema.StatusQueueOpen,
})
assert.Equal(t, nil, err)

t.Run("sync controller", func(t *testing.T) {
ch := make(chan struct{})
defer close(ch)
kubeRuntime.SyncController(ch)
})

t.Run("sync controller failed", func(t *testing.T) {
ch := make(chan struct{})
defer close(ch)
kubeRuntime.client = nil
kubeRuntime.SyncController(ch)
})
}

func TestK3SRuntime_GetLog(t *testing.T) {
Expand Down
47 changes: 25 additions & 22 deletions pkg/job/runtime_v2/kubernetes_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"sort"
"strconv"

Expand Down Expand Up @@ -52,6 +51,7 @@ import (
"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/framework"
_ "github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/job"
_ "github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/queue"
"github.com/PaddlePaddle/PaddleFlow/pkg/model"
"github.com/PaddlePaddle/PaddleFlow/pkg/storage"
"github.com/PaddlePaddle/PaddleFlow/pkg/trace_logger"
)
Expand Down Expand Up @@ -251,35 +251,38 @@ func (kr *KubeRuntime) Queue(fwVersion pfschema.KindGroupVersion) framework.Queu
}

func (kr *KubeRuntime) SyncController(stopCh <-chan struct{}) {
log.Infof("start job/queue controller on %s", kr.String())
var err error
jobQueueSync := os.Getenv(pfschema.EnvEnableJobQueueSync)
if jobQueueSync == "false" {
log.Warnf("skip job and queue syn controller on %s", kr.String())
} else {
jobController := controller.NewJobSync()
err = jobController.Initialize(kr.kubeClient)
if err != nil {
log.Errorf("init job controller on %s failed, err: %v", kr.String(), err)
return
}
queueController := controller.NewQueueSync()
err = queueController.Initialize(kr.kubeClient)
if err != nil {
log.Errorf("init queue controller on %s failed, err: %v", kr.String(), err)
return
}
go jobController.Run(stopCh)
go queueController.Run(stopCh)
// 1. start queue controller
queueController := controller.NewQueueSync()
err := queueController.Initialize(kr.kubeClient)
if err != nil {
log.Errorf("init queue controller on %s failed, err: %v", kr.String(), err)
return
}
go queueController.Run(stopCh)
log.Infof("start queue controller on %s", kr.String())

// 2. start node resource controller
nodeResourceController := controller.NewNodeResourceSync()
err = nodeResourceController.Initialize(kr.kubeClient)
if err != nil {
log.Errorf("init node resource controller on %s failed, err: %v", kr.String(), err)
return
}
go nodeResourceController.Run(stopCh)
log.Infof("start nodeResources controller on %s", kr.String())

// 3. start job controller if the cluster has queues
queues := storage.Queue.ListQueuesByCluster(kr.cluster.ID)
if len(queues) > 0 {
jobController := controller.NewJobSync()
err = jobController.Initialize(kr.kubeClient)
if err != nil {
log.Errorf("init job controller on %s failed, err: %v", kr.String(), err)
return
}
go jobController.Run(stopCh)
log.Infof("start job controller on %s", kr.String())
}
}

func (kr *KubeRuntime) Client() framework.RuntimeClientInterface {
Expand Down Expand Up @@ -676,7 +679,7 @@ func formatAllEventLogs(events []corev1.Event, logPage utils.LogPage) []string {
for _, event := range events {
// Type-Reason-Timestamp-Message
str := fmt.Sprintf("type: %s\treason: %s\teventsTime: %s \tmessage: %s",
event.Type, event.Reason, event.CreationTimestamp.Format("2006-01-02 15:04:05"), event.Message)
event.Type, event.Reason, event.CreationTimestamp.Format(model.TimeFormat), event.Message)
formatedEvents = append(formatedEvents, str)
}
formatedEvents = logPage.SlicePaging(formatedEvents)
Expand Down
32 changes: 32 additions & 0 deletions pkg/job/runtime_v2/kubernetes_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,38 @@ func TestKubeRuntimeJob(t *testing.T) {
t.SkipNow()
}

// TestKubeRuntimeSyncController drives KubeRuntime.SyncController twice against
// a runtime backed by a fake kube client: once with the client in place and
// once after the client has been set to nil. The only explicit assertion is
// that seeding the mock database with a queue succeeds; the subtests simply
// invoke SyncController and rely on it returning without crashing the test.
func TestKubeRuntimeSyncController(t *testing.T) {
	// Seed the mock storage with a single open queue before the runtime is built.
	driver.InitMockDB()
	config.GlobalServerConfig = &config.ServerConfig{}
	defaultQueue := &model.Queue{
		Name:   "default-queue",
		Status: schema.StatusQueueOpen,
	}
	createErr := storage.Queue.CreateQueue(defaultQueue)
	assert.Equal(t, nil, createErr)

	discoverySrv := httptest.NewServer(k8s.DiscoveryHandlerFunc)
	defer discoverySrv.Close()

	kubeRuntime := &KubeRuntime{
		cluster:    schema.Cluster{Name: "test-cluster", Type: "Kubernetes"},
		kubeClient: client.NewFakeKubeRuntimeClient(discoverySrv),
	}

	// Subtests run sequentially in declaration order; the second one drops the
	// kube client before calling SyncController.
	scenarios := []struct {
		name       string
		dropClient bool
	}{
		{name: "test sync controller", dropClient: false},
		{name: "sync controller failed", dropClient: true},
	}

	for idx := range scenarios {
		scenario := scenarios[idx]
		t.Run(scenario.name, func(t *testing.T) {
			stop := make(chan struct{})
			// Closing the stop channel on exit signals any started controllers to stop.
			defer close(stop)
			if scenario.dropClient {
				kubeRuntime.kubeClient = nil
			}
			kubeRuntime.SyncController(stop)
		})
	}
}

func TestKubeRuntimePVAndPVC(t *testing.T) {
var server = httptest.NewServer(k8s.DiscoveryHandlerFunc)
defer server.Close()
Expand Down

0 comments on commit a2e1f67

Please sign in to comment.