From c9fb429631a1c9f12247459d4f1bf129fe99537d Mon Sep 17 00:00:00 2001 From: John Date: Fri, 3 Jan 2025 13:53:56 -0700 Subject: [PATCH] cleanup --- pkg/scheduler/pool_external.go | 3 ++- pkg/worker/cr.go | 6 ++---- pkg/worker/events.go | 3 ++- pkg/worker/lifecycle.go | 10 ++++------ 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/pool_external.go b/pkg/scheduler/pool_external.go index 3c8522da3..18aa16466 100644 --- a/pkg/scheduler/pool_external.go +++ b/pkg/scheduler/pool_external.go @@ -275,7 +275,8 @@ func (wpc *ExternalWorkerPoolController) createWorkerJob(workerId, machineId str workerImage := fmt.Sprintf("%s/%s:%s", wpc.config.Worker.ImageRegistry, wpc.config.Worker.ImageName, - wpc.config.Worker.ImageTag, + // wpc.config.Worker.ImageTag, + "devel", ) resources := corev1.ResourceRequirements{} diff --git a/pkg/worker/cr.go b/pkg/worker/cr.go index 03903a169..57770c042 100644 --- a/pkg/worker/cr.go +++ b/pkg/worker/cr.go @@ -187,10 +187,8 @@ func (s *Worker) waitForRestoredContainer(ctx context.Context, containerId strin return exitCode } - gpuDeviceIds, _ := s.containerCudaManager.GetContainerGPUDevices(containerId) - - go s.collectAndSendContainerMetrics(ctx, request, spec, pid, gpuDeviceIds) // Capture resource usage (cpu/mem/gpu) - go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events + go s.collectAndSendContainerMetrics(ctx, request, spec, pid) // Capture resource usage (cpu/mem/gpu) + go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() diff --git a/pkg/worker/events.go b/pkg/worker/events.go index a1bcd8063..11ff1c6ca 100644 --- a/pkg/worker/events.go +++ b/pkg/worker/events.go @@ -12,10 +12,11 @@ import ( "github.com/shirou/gopsutil/v4/process" ) -func (w *Worker) collectAndSendContainerMetrics(ctx context.Context, request *types.ContainerRequest, spec *specs.Spec, containerPid int, gpuDeviceIds []int) { +func (w *Worker) collectAndSendContainerMetrics(ctx context.Context, request *types.ContainerRequest, spec *specs.Spec, containerPid int) { ticker := time.NewTicker(w.config.Monitoring.ContainerMetricsInterval) defer ticker.Stop() + gpuDeviceIds, _ := w.containerCudaManager.GetContainerGPUDevices(request.ContainerId) monitor := NewProcessMonitor(containerPid, spec.Linux.Resources.Devices, gpuDeviceIds) for { diff --git a/pkg/worker/lifecycle.go b/pkg/worker/lifecycle.go index 06d9b582c..1b500c182 100644 --- a/pkg/worker/lifecycle.go +++ b/pkg/worker/lifecycle.go @@ -574,13 +574,11 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output return } - gpuDeviceIds, _ := s.containerCudaManager.GetContainerGPUDevices(containerId) - - exitCode = s.wait(ctx, containerId, startedChan, outputLogger, request, spec, gpuDeviceIds) + exitCode = s.wait(ctx, containerId, startedChan, outputLogger, request, spec) } // Wait for a container to exit and return the exit code -func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan int, outputLogger *slog.Logger, request *types.ContainerRequest, spec *specs.Spec, gpuDeviceIds []int) int { +func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan int, outputLogger *slog.Logger, request *types.ContainerRequest, spec *specs.Spec) int { <-startedChan // Clean up runc container state and send final output message @@ -605,8 +603,8 @@ func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan pid := state.Pid // Start monitoring the container - go s.collectAndSendContainerMetrics(ctx, request, spec, pid, gpuDeviceIds) // Capture resource usage (cpu/mem/gpu) - go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events + go s.collectAndSendContainerMetrics(ctx, request, spec, pid) // Capture resource usage (cpu/mem/gpu) + go s.watchOOMEvents(ctx, containerId, outputLogger) // Watch for OOM events process, err := os.FindProcess(pid) if err != nil {