Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jsun-m committed Jan 3, 2025
1 parent ca454cc commit c9fb429
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 12 deletions.
3 changes: 2 additions & 1 deletion pkg/scheduler/pool_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ func (wpc *ExternalWorkerPoolController) createWorkerJob(workerId, machineId str
workerImage := fmt.Sprintf("%s/%s:%s",
wpc.config.Worker.ImageRegistry,
wpc.config.Worker.ImageName,
wpc.config.Worker.ImageTag,
// wpc.config.Worker.ImageTag,
"devel",
)

resources := corev1.ResourceRequirements{}
Expand Down
6 changes: 2 additions & 4 deletions pkg/worker/cr.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,8 @@ func (s *Worker) waitForRestoredContainer(ctx context.Context, containerId strin
return exitCode
}

gpuDeviceIds, _ := s.containerCudaManager.GetContainerGPUDevices(containerId)

go s.collectAndSendContainerMetrics(ctx, request, spec, pid, gpuDeviceIds) // Capture resource usage (cpu/mem/gpu)
go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events
go s.collectAndSendContainerMetrics(ctx, request, spec, pid) // Capture resource usage (cpu/mem/gpu)
go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand Down
3 changes: 2 additions & 1 deletion pkg/worker/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"github.com/shirou/gopsutil/v4/process"
)

func (w *Worker) collectAndSendContainerMetrics(ctx context.Context, request *types.ContainerRequest, spec *specs.Spec, containerPid int, gpuDeviceIds []int) {
func (w *Worker) collectAndSendContainerMetrics(ctx context.Context, request *types.ContainerRequest, spec *specs.Spec, containerPid int) {
ticker := time.NewTicker(w.config.Monitoring.ContainerMetricsInterval)
defer ticker.Stop()

gpuDeviceIds, _ := w.containerCudaManager.GetContainerGPUDevices(request.ContainerId)
monitor := NewProcessMonitor(containerPid, spec.Linux.Resources.Devices, gpuDeviceIds)

for {
Expand Down
10 changes: 4 additions & 6 deletions pkg/worker/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,11 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
return
}

gpuDeviceIds, _ := s.containerCudaManager.GetContainerGPUDevices(containerId)

exitCode = s.wait(ctx, containerId, startedChan, outputLogger, request, spec, gpuDeviceIds)
exitCode = s.wait(ctx, containerId, startedChan, outputLogger, request, spec)
}

// Wait for a container to exit and return the exit code
func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan int, outputLogger *slog.Logger, request *types.ContainerRequest, spec *specs.Spec, gpuDeviceIds []int) int {
func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan int, outputLogger *slog.Logger, request *types.ContainerRequest, spec *specs.Spec) int {
<-startedChan

// Clean up runc container state and send final output message
Expand All @@ -605,8 +603,8 @@ func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan
pid := state.Pid

// Start monitoring the container
go s.collectAndSendContainerMetrics(ctx, request, spec, pid, gpuDeviceIds) // Capture resource usage (cpu/mem/gpu)
go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events
go s.collectAndSendContainerMetrics(ctx, request, spec, pid) // Capture resource usage (cpu/mem/gpu)
go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events

process, err := os.FindProcess(pid)
if err != nil {
Expand Down

0 comments on commit c9fb429

Please sign in to comment.