From ef3cf5c08c925c1d5fa153ca5fccc53293a56c2e Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Thu, 29 Jun 2023 15:20:42 -0500 Subject: [PATCH 01/18] Update CreateJobs --- internal/armada/server/submit.go | 53 +++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index 0acd6969582..ab863f409fa 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -253,10 +253,14 @@ func (server *SubmitServer) DeleteQueue(ctx context.Context, request *api.QueueD func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { principal := authorization.GetPrincipal(ctx) - jobs, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames()) + jobs, responseItems, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames()) if e != nil { + result := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } + reqJson, _ := json.Marshal(req) - return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) + return result, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) } if err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil { return nil, err @@ -746,16 +750,16 @@ func (server *SubmitServer) getQueueOrCreate(ctx context.Context, queueName stri // createJobs returns a list of objects representing the jobs in a JobSubmitRequest. // This function validates the jobs in the request and the pod specs. in each job. // If any job or pod in invalid, an error is returned. -func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, error) { +func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, []*api.JobSubmitResponseItem, error) { return server.createJobsObjects(request, owner, ownershipGroups, time.Now, util.NewULID) } func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, owner string, ownershipGroups []string, getTime func() time.Time, getUlid func() string, -) ([]*api.Job, error) { +) ([]*api.Job, []*api.JobSubmitResponseItem, error) { compressor, err := server.compressorPool.BorrowObject(context.Background()) if err != nil { - return nil, err + return nil, nil, err } defer func(compressorPool *pool.ObjectPool, ctx context.Context, object interface{}) { err := compressorPool.ReturnObject(ctx, object) @@ -765,29 +769,44 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own }(server.compressorPool, context.Background(), compressor) compressedOwnershipGroups, err := compress.CompressStringArray(ownershipGroups, compressor.(compress.Compressor)) if err != nil { - return nil, err + return nil, nil, err } jobs := make([]*api.Job, 0, len(request.JobRequestItems)) if request.JobSetId == "" { - return nil, errors.Errorf("[createJobs] job set not specified") + return nil, nil, errors.Errorf("[createJobs] job set not specified") } if request.Queue == "" { - return nil, errors.Errorf("[createJobs] queue not specified") + return nil, nil, errors.Errorf("[createJobs] queue not specified") } + responseItems := make([]*api.JobSubmitResponseItem, 0, len(request.JobRequestItems)) for i, item := range request.JobRequestItems { + jobId := getUlid() + if item.PodSpec != nil && len(item.PodSpecs) > 0 { - return nil, errors.Errorf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId), + } + responseItems = append(responseItems, response) } podSpec := item.GetMainPodSpec() if podSpec == nil { - return nil, errors.Errorf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId), + } + responseItems = append(responseItems, response) } if err := validation.ValidateJobSubmitRequestItem(item); err != nil { - return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err), + } + responseItems = append(responseItems, response) } namespace := item.Namespace if namespace == "" { @@ -796,7 +815,11 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own fillContainerRequestsAndLimits(podSpec.Containers) applyDefaultsToPodSpec(podSpec, *server.schedulingConfig) if err := validation.ValidatePodSpec(podSpec, server.schedulingConfig); err != nil { - return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err) + response := &api.JobSubmitResponseItem{ + JobId: jobId, + Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err), + } + responseItems = append(responseItems, response) } // TODO: remove, RequiredNodeLabels is deprecated and will be removed in future versions @@ -807,7 +830,6 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own podSpec.NodeSelector[k] = v } - jobId := getUlid() enrichText(item.Labels, jobId) enrichText(item.Annotations, jobId) j := &api.Job{ @@ -837,7 +859,10 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own jobs = append(jobs, j) } - return jobs, nil + if len(responseItems) > 0 { + return nil, responseItems, errors.Errorf("[createJobs] error creating jobs, check JobSubmitResponse for details") + } + return jobs, nil, nil } func enrichText(labels map[string]string, jobId string) { From c2a479012ef6338796b1cbc6c17a7f7ed3b6769c Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Thu, 29 Jun 2023 15:37:36 -0500 Subject: [PATCH 02/18] Update CreateJobs to return a SubmitResponse with error details --- internal/armada/server/submit_to_log.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/armada/server/submit_to_log.go b/internal/armada/server/submit_to_log.go index 08f2002b99e..c54b151564c 100644 --- a/internal/armada/server/submit_to_log.go +++ b/internal/armada/server/submit_to_log.go @@ -92,9 +92,12 @@ func (srv *PulsarSubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmi // Create legacy API jobs from the requests. // We use the legacy code for the conversion to ensure that behaviour doesn't change. - apiJobs, err := srv.SubmitServer.createJobs(req, userId, groups) + apiJobs, responseItems, err := srv.SubmitServer.createJobs(req, userId, groups) if err != nil { - return nil, err + result := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } + return result, err } if err := commonvalidation.ValidateApiJobs(apiJobs, *srv.SubmitServer.schedulingConfig); err != nil { return nil, err From 5657c937dde17993e915debe8c945e80bf91cd73 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Thu, 29 Jun 2023 17:55:05 -0500 Subject: [PATCH 03/18] Update createJobs sub functions to check jobs individually --- internal/armada/server/job_validation.go | 23 ++++++-- internal/armada/server/submit.go | 19 +++++-- internal/armada/server/submit_to_log.go | 7 ++- internal/common/validation/job.go | 71 +++++++++++++++++------- 4 files changed, 89 insertions(+), 31 deletions(-) diff --git a/internal/armada/server/job_validation.go b/internal/armada/server/job_validation.go index a584a5c1fd4..869015fd49c 100644 --- a/internal/armada/server/job_validation.go +++ b/internal/armada/server/job_validation.go @@ -1,6 +1,7 @@ package server import ( + "fmt" "github.com/pkg/errors" "github.com/armadaproject/armada/internal/armada/scheduling" @@ -13,16 +14,30 @@ import ( func validateJobsCanBeScheduled( jobs []*api.Job, allClusterSchedulingInfo map[string]*api.ClusterSchedulingInfoReport, -) (bool, error) { +) (bool, []*api.JobSubmitResponseItem, error) { activeClusterSchedulingInfo := scheduling.FilterActiveClusterSchedulingInfoReports(allClusterSchedulingInfo) + responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs)) for i, job := range jobs { if ok, err := scheduling.MatchSchedulingRequirementsOnAnyCluster(job, activeClusterSchedulingInfo); !ok { if err != nil { - return false, errors.WithMessagef(err, "%d-th job can't be scheduled", i) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: errors.WithMessagef(err, "%d-th job can't be scheduled", i).Error(), + } + responseItems = append(responseItems, response) } else { - return false, errors.Errorf("%d-th job can't be scheduled", i) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: fmt.Sprintf("%d-th job can't be scheduled", i), + } + responseItems = append(responseItems, response) } } } - return true, nil + + if len(responseItems) > 0 { + return false, responseItems, errors.New("[createJobs] Failed to validate jobs can be scheduled") + } + + return true, nil, nil } diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index ab863f409fa..ad2ca413707 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -262,8 +262,12 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe reqJson, _ := json.Marshal(req) return result, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) } - if err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil { - return nil, err + if responseItems, err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil { + result := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } + + return result, err } q, err := server.getQueueOrCreate(ctx, req.Queue) @@ -300,11 +304,14 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe return nil, status.Errorf(codes.InvalidArgument, "error getting scheduling info: %s", err) } - if ok, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok { + if ok, responseItems, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok { + result := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } if err != nil { - return nil, errors.WithMessagef(err, "can't schedule job for user %s", principal.GetName()) + return result, errors.WithMessagef(err, "can't schedule job for user %s", principal.GetName()) } - return nil, errors.Errorf("can't schedule job for user %s", principal.GetName()) + return result, errors.Errorf("can't schedule job for user %s", principal.GetName()) } // Create events marking the jobs as submitted @@ -860,7 +867,7 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own } if len(responseItems) > 0 { - return nil, responseItems, errors.Errorf("[createJobs] error creating jobs, check JobSubmitResponse for details") + return nil, responseItems, errors.New("[createJobs] error creating jobs, check JobSubmitResponse for details") } return jobs, nil, nil } diff --git a/internal/armada/server/submit_to_log.go b/internal/armada/server/submit_to_log.go index c54b151564c..70220ec0abc 100644 --- a/internal/armada/server/submit_to_log.go +++ b/internal/armada/server/submit_to_log.go @@ -99,8 +99,11 @@ func (srv *PulsarSubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmi } return result, err } - if err := commonvalidation.ValidateApiJobs(apiJobs, *srv.SubmitServer.schedulingConfig); err != nil { - return nil, err + if responseItems, err := commonvalidation.ValidateApiJobs(apiJobs, *srv.SubmitServer.schedulingConfig); err != nil { + result := &api.JobSubmitResponse{ + JobResponseItems: responseItems, + } + return result, err } schedulersByJobId, err := srv.assignScheduler(apiJobs) diff --git a/internal/common/validation/job.go b/internal/common/validation/job.go index fcc25f1b825..8b04f057994 100644 --- a/internal/common/validation/job.go +++ b/internal/common/validation/job.go @@ -1,6 +1,7 @@ package validation import ( + "fmt" "github.com/pkg/errors" "github.com/armadaproject/armada/internal/scheduler" @@ -11,50 +12,77 @@ import ( "github.com/armadaproject/armada/pkg/api" ) -func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) error { - err := validateGangs(jobs) - if err != nil { - return err +func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) ([]*api.JobSubmitResponseItem, error) { + if responseItems, err := validateGangs(jobs); err != nil { + return responseItems, err } + + responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs)) for _, job := range jobs { if err := ValidateApiJob(job, config); err != nil { - return err + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: err.Error(), + } + responseItems = append(responseItems, response) } } - return nil + + if len(responseItems) > 0 { + return responseItems, errors.New("[createJobs] Failed to validate jobs") + } + return nil, nil } -func validateGangs(jobs []*api.Job) error { +func validateGangs(jobs []*api.Job) ([]*api.JobSubmitResponseItem, error) { gangDetailsByGangId := make(map[string]struct { actualCardinality int expectedCardinality int expectedPriorityClassName string }) + + responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs)) for i, job := range jobs { annotations := job.Annotations gangId, gangCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations) if err != nil { - return errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId).Error(), + } + responseItems = append(responseItems, response) } if !isGangJob { continue } if gangId == "" { - return errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: fmt.Sprintf("empty gang id for %d-th job with id %s", i, job.Id), + } + responseItems = append(responseItems, response) } podSpec := util.PodSpecFromJob(job) if details, ok := gangDetailsByGangId[gangId]; ok { if details.expectedCardinality != gangCardinality { - return errors.Errorf( - "inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d", - i, job.Id, gangId, details.expectedCardinality, gangCardinality, - ) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: fmt.Sprintf( + "inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d", + i, job.Id, gangId, details.expectedCardinality, gangCardinality, + ), + } + responseItems = append(responseItems, response) } if podSpec != nil && details.expectedPriorityClassName != podSpec.PriorityClassName { - return errors.Errorf( - "inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s", - i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName, - ) + response := &api.JobSubmitResponseItem{ + JobId: job.Id, + Error: fmt.Sprintf( + "inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s", + i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName, + ), + } + responseItems = append(responseItems, response) } details.actualCardinality++ gangDetailsByGangId[gangId] = details @@ -67,15 +95,20 @@ func validateGangs(jobs []*api.Job) error { gangDetailsByGangId[gangId] = details } } + + if len(responseItems) > 0 { + return responseItems, errors.New("[createJobs] Failed to validate gang jobs") + } + for gangId, details := range gangDetailsByGangId { if details.expectedCardinality != details.actualCardinality { - return errors.Errorf( + return nil, errors.Errorf( "unexpected number of jobs for gang %s: expected %d jobs but got %d", gangId, details.expectedCardinality, details.actualCardinality, ) } } - return nil + return nil, nil } func ValidateApiJob(job *api.Job, config configuration.SchedulingConfig) error { From ce201b27917a34637769ef67559c27e1c437c015 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Thu, 29 Jun 2023 18:10:32 -0500 Subject: [PATCH 04/18] Update function usage to count errored jobs --- internal/armada/server/lease.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 3d8224cf4c4..0a84d36f31e 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -913,11 +913,11 @@ func (q *AggregatedQueueServer) addAvoidNodeAffinity( } changed := addAvoidNodeAffinity(jobs[0], labels, func(jobsToValidate []*api.Job) error { - if ok, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok { + if ok, responseItems, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok { if err != nil { - return errors.WithMessage(err, "can't schedule at least 1 job") + return errors.WithMessagef(err, "can't schedule %d job", len(responseItems)) } else { - return errors.Errorf("can't schedule at least 1 job") + return errors.Errorf("can't schedule %d job", len(responseItems)) } } return nil From 3d638cc3e4a1dfc470d5c2a5815d96bd3cbdb5f6 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Mon, 3 Jul 2023 10:44:03 -0500 Subject: [PATCH 05/18] Fix grammar --- internal/armada/server/lease.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 0a84d36f31e..fbc13ffffbb 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -915,9 +915,9 @@ func (q *AggregatedQueueServer) addAvoidNodeAffinity( changed := addAvoidNodeAffinity(jobs[0], labels, func(jobsToValidate []*api.Job) error { if ok, responseItems, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok { if err != nil { - return errors.WithMessagef(err, "can't schedule %d job", len(responseItems)) + return errors.WithMessagef(err, "can't schedule %d job(s)", len(responseItems)) } else { - return errors.Errorf("can't schedule %d job", len(responseItems)) + return errors.Errorf("can't schedule %d job(s)", len(responseItems)) } } return nil From 4693ad0e193e091d040895f44c2883e5f520281d Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Mon, 3 Jul 2023 18:14:09 -0500 Subject: [PATCH 06/18] Added updated test cases --- internal/armada/server/submit_test.go | 87 ++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/internal/armada/server/submit_test.go b/internal/armada/server/submit_test.go index 151181a54ed..a9ab1b9001b 100644 --- a/internal/armada/server/submit_test.go +++ b/internal/armada/server/submit_test.go @@ -1738,6 +1738,9 @@ func TestSubmitServer_CreateJobs_WithJobIdReplacement(t *testing.T) { }, } + // Empty - no response items expected as no jobs were submitted without errors + var expectedResponseItems []*api.JobSubmitResponseItem + request := &api.JobSubmitRequest{ Queue: "test", JobSetId: "test-jobsetid", @@ -1777,8 +1780,90 @@ func TestSubmitServer_CreateJobs_WithJobIdReplacement(t *testing.T) { } ownershipGroups := make([]string, 0) withSubmitServer(func(s *SubmitServer, events *repository.TestEventStore) { - output, err := s.createJobsObjects(request, "test", ownershipGroups, mockNow, mockNewULID) + output, responseItems, err := s.createJobsObjects(request, "test", ownershipGroups, mockNow, mockNewULID) assert.NoError(t, err) + assert.Equal(t, expectedResponseItems, responseItems) assert.Equal(t, expected, output) }) } + +func TestSubmitServer_CreateJobs_WithDuplicatePodSpec(t *testing.T) { + timeNow := time.Now() + mockNow := func() time.Time { + return timeNow + } + mockNewULID := func() string { + return "test-ulid" + } + + expectedResponseItems := []*api.JobSubmitResponseItem{ + { + JobId: "test-ulid", + Error: "[createJobs] job 0 in job set test-jobsetid contains both podSpec and podSpecs, but may only contain either", + }, + } + expectedError := "[createJobs] error creating jobs, check JobSubmitResponse for details" + + request := &api.JobSubmitRequest{ + Queue: "test", + JobSetId: "test-jobsetid", + JobRequestItems: []*api.JobSubmitRequestItem{ + { + Priority: 1, + Namespace: "test", + ClientId: "0", + Labels: map[string]string{ + "a.label": "job-id-is-{JobId}", + }, + Annotations: map[string]string{ + "a.nnotation": "job-id-is-{JobId}", + }, + PodSpecs: []*v1.PodSpec{ + { + Containers: []v1.Container{ + { + Name: "app", + Image: "test:latest", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + }, + }, + }, + }, + }, + PodSpec: &v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "app", + Image: "test:latest", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("100Mi"), + }, + }, + }, + }, + }, + }, + }, + } + ownershipGroups := make([]string, 0) + withSubmitServer(func(s *SubmitServer, events *repository.TestEventStore) { + output, responseItems, err := s.createJobsObjects(request, "test", ownershipGroups, mockNow, mockNewULID) + assert.Equal(t, expectedError, err.Error()) + assert.Equal(t, expectedResponseItems, responseItems) + assert.Nil(t, output) + }) +} From cf43d96f8268e9c2504f4eb185d67ed6856bfeed Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Mon, 3 Jul 2023 20:01:05 -0500 Subject: [PATCH 07/18] Update gang job testing --- internal/common/validation/job_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/common/validation/job_test.go b/internal/common/validation/job_test.go index 960b74ba2bb..b448382baa3 100644 --- a/internal/common/validation/job_test.go +++ b/internal/common/validation/job_test.go @@ -337,8 +337,9 @@ func TestValidateGangs(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - err := validateGangs(tc.Jobs) + responseItems, err := validateGangs(tc.Jobs) if tc.ExpectSuccess { + assert.Nil(t, responseItems) assert.NoError(t, err) } else { assert.Error(t, err) From ccb824d4b73990371ce3506a466ec5805f8f9917 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Wed, 5 Jul 2023 11:45:18 -0500 Subject: [PATCH 08/18] Merge branch 'master' into feat/create_job_error --- internal/common/validation/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/common/validation/job.go b/internal/common/validation/job.go index c41d6451026..d2b57cc14ad 100644 --- a/internal/common/validation/job.go +++ b/internal/common/validation/job.go @@ -87,7 +87,7 @@ func validateGangs(jobs []*api.Job) ([]*api.JobSubmitResponseItem, error) { responseItems = append(responseItems, response) } if nodeUniformityLabel != details.expectedNodeUniformityLabel { - return errors.Errorf( + return nil, errors.Errorf( "inconsistent nodeUniformityLabel for %d-th job with id %s in gang %s: expected %s but got %s", i, job.Id, gangId, details.expectedNodeUniformityLabel, nodeUniformityLabel, ) From d0b2d3f441b818ed718e2ae6e73a51eba8931322 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Thu, 6 Jul 2023 09:36:45 -0500 Subject: [PATCH 09/18] Lint fix --- internal/armada/server/job_validation.go | 1 + internal/common/validation/job.go | 1 + 2 files changed, 2 insertions(+) diff --git a/internal/armada/server/job_validation.go b/internal/armada/server/job_validation.go index 869015fd49c..dedf89afec2 100644 --- a/internal/armada/server/job_validation.go +++ b/internal/armada/server/job_validation.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "github.com/pkg/errors" "github.com/armadaproject/armada/internal/armada/scheduling" diff --git a/internal/common/validation/job.go b/internal/common/validation/job.go index d2b57cc14ad..29ac347d030 100644 --- a/internal/common/validation/job.go +++ b/internal/common/validation/job.go @@ -2,6 +2,7 @@ package validation import ( "fmt" + "github.com/pkg/errors" "github.com/armadaproject/armada/internal/scheduler" From a36727f528eca56b101dbdf3ba564328841a8998 Mon Sep 17 00:00:00 2001 From: Raaj Patel <41763998+Raajheer1@users.noreply.github.com> Date: Sat, 5 Aug 2023 18:30:03 -0500 Subject: [PATCH 10/18] Rework gRPC to send JobSubmitResponse over status.details --- go.mod | 2 ++ go.sum | 6 ++++ internal/armada/server/submit.go | 44 ++++++++++++++++++++----- internal/armada/server/submit_to_log.go | 21 +++++++++--- internal/armadactl/submit.go | 5 +++ pkg/api/submit.pb.go | 10 +++++- 6 files changed, 74 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 0f03be2aadc..73c93eda03d 100644 --- a/go.mod +++ b/go.mod @@ -126,6 +126,8 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a // indirect + github.com/gogo/status v1.1.1 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/snappy v0.0.3 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect diff --git a/go.sum b/go.sum index 3b3ef71afe9..fd01ce6d1d9 100644 --- a/go.sum +++ b/go.sum @@ -305,10 +305,14 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a h1:dR8+Q0uO5S2ZBcs2IH6VBKYwSxPo2vYCYq0ot0mu7xA= +github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg= +github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= @@ -1312,6 +1316,7 @@ google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCID google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1359,6 +1364,7 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef h1:uQ2vjV/sHTsWSqdKeLqmwitzgvjMl7o4IdtHwUDXSJY= google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index b647ff4ea6a..a26d5401abf 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -9,11 +9,11 @@ import ( "time" "github.com/gogo/protobuf/types" + "github.com/gogo/status" pool "github.com/jolestar/go-commons-pool" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "k8s.io/utils/strings/slices" "github.com/armadaproject/armada/internal/armada/configuration" @@ -41,6 +41,21 @@ type SubmitServer struct { compressorPool *pool.ObjectPool } +type JobSubmitError struct { + JobErrorsDetails []*api.JobSubmitResponseItem + Err error +} + +func (e *JobSubmitError) Error() string { + output := "" + for _, jobError := range e.JobErrorsDetails { + output += fmt.Sprintf("Error - Job %s: %s\n", jobError.JobId, jobError.Error) + } + + output += fmt.Sprintf("\nError - %s", e.Err.Error()) + return output +} + func NewSubmitServer( permissions authorization.PermissionChecker, jobRepository repository.JobRepository, @@ -255,19 +270,28 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe jobs, responseItems, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames()) if e != nil { - result := &api.JobSubmitResponse{ + details := &api.JobSubmitResponse{ JobResponseItems: responseItems, } reqJson, _ := json.Marshal(req) - return result, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) + st, err := status.Newf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e).WithDetails(details) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) + } + return nil, st.Err() } if responseItems, err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil { - result := &api.JobSubmitResponse{ + details := &api.JobSubmitResponse{ JobResponseItems: responseItems, } - return result, err + reqJson, _ := json.Marshal(req) + st, err := status.Newf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e).WithDetails(details) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e) + } + return nil, st.Err() } q, err := server.getQueueOrCreate(ctx, req.Queue) @@ -305,13 +329,17 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe } if ok, responseItems, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok { - result := &api.JobSubmitResponse{ + details := &api.JobSubmitResponse{ JobResponseItems: responseItems, } if err != nil { - return result, errors.WithMessagef(err, "can't schedule job for user %s", principal.GetName()) + st, e := status.Newf(codes.InvalidArgument, "[SubmitJobs] error validating jobs: %s", err).WithDetails(details) + if e != nil { + return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] error validating jobs: %s", err) + } + return nil, st.Err() } - return result, errors.Errorf("can't schedule job for user %s", principal.GetName()) + return nil, errors.Errorf("can't schedule job for user %s", principal.GetName()) } // Create events marking the jobs as submitted diff --git a/internal/armada/server/submit_to_log.go b/internal/armada/server/submit_to_log.go index 70220ec0abc..5b83f7a610b 100644 --- a/internal/armada/server/submit_to_log.go +++ b/internal/armada/server/submit_to_log.go @@ -10,11 +10,11 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/gogo/protobuf/types" + "github.com/gogo/status" "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/armadaproject/armada/internal/armada/permissions" "github.com/armadaproject/armada/internal/armada/repository" @@ -94,16 +94,27 @@ func (srv *PulsarSubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmi // We use the legacy code for the conversion to ensure that behaviour doesn't change. apiJobs, responseItems, err := srv.SubmitServer.createJobs(req, userId, groups) if err != nil { - result := &api.JobSubmitResponse{ + details := &api.JobSubmitResponse{ JobResponseItems: responseItems, } - return result, err + + st, e := status.Newf(codes.InvalidArgument, "[SubmitJobs] Failed to parse job request: %s", err.Error()).WithDetails(details) + if e != nil { + return nil, status.Newf(codes.Internal, "[SubmitJobs] Failed to parse job request: %s", e.Error()).Err() + } + + return nil, st.Err() } if responseItems, err := commonvalidation.ValidateApiJobs(apiJobs, *srv.SubmitServer.schedulingConfig); err != nil { - result := &api.JobSubmitResponse{ + details := &api.JobSubmitResponse{ JobResponseItems: responseItems, } - return result, err + + st, e := status.Newf(codes.InvalidArgument, "[SubmitJobs] Failed to parse job request: %s", err.Error()).WithDetails(details) + if e != nil { + return nil, status.Newf(codes.Internal, "[SubmitJobs] Failed to parse job request: %s", e.Error()).Err() + } + return nil, st.Err() } schedulersByJobId, err := srv.assignScheduler(apiJobs) diff --git a/internal/armadactl/submit.go b/internal/armadactl/submit.go index e779b7198fd..8382c6badee 100644 --- a/internal/armadactl/submit.go +++ b/internal/armadactl/submit.go @@ -35,6 +35,11 @@ func (a *App) Submit(path string, dryRun bool) error { for _, request := range requests { response, err := client.SubmitJobs(c, request) if err != nil { + fmt.Fprintln(a.Out, "[JobSubmitResponse]") + for _, jobResponseItem := range response.JobResponseItems { + fmt.Fprintf(a.Out, "Error submitting job with id %s, details: %s\n", jobResponseItem.JobId, jobResponseItem.Error) + } + fmt.Fprintln(a.Out, "[Error]") return errors.WithMessagef(err, "error submitting request %#v", request) } diff --git a/pkg/api/submit.pb.go b/pkg/api/submit.pb.go index ae36a9a2837..e7fb0542f79 100644 --- a/pkg/api/submit.pb.go +++ b/pkg/api/submit.pb.go @@ -17,10 +17,10 @@ import ( proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" types "github.com/gogo/protobuf/types" + "github.com/gogo/status" _ "google.golang.org/genproto/googleapis/api/annotations" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" ) @@ -1760,6 +1760,13 @@ func (c *submitClient) SubmitJobs(ctx context.Context, in *JobSubmitRequest, opt out := new(JobSubmitResponse) err := c.cc.Invoke(ctx, "/api.Submit/SubmitJobs", in, out, opts...) if err != nil { + st := status.Convert(err) + for _, detail := range st.Details() { + switch t := detail.(type) { + case *JobSubmitResponse: + return t, err + } + } return nil, err } return out, nil @@ -1817,6 +1824,7 @@ func (c *submitClient) UpdateQueue(ctx context.Context, in *Queue, opts ...grpc. return nil, err } return out, nil + } func (c *submitClient) UpdateQueues(ctx context.Context, in *QueueList, opts ...grpc.CallOption) (*BatchQueueUpdateResponse, error) { From 0bcf984ee118d8dceb74df50ff1380a882723f9d Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Mon, 7 Aug 2023 18:59:05 -0500 Subject: [PATCH 11/18] Add better nil checking --- internal/armadactl/submit.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/armadactl/submit.go b/internal/armadactl/submit.go index 8382c6badee..5d9bed650ed 100644 --- a/internal/armadactl/submit.go +++ b/internal/armadactl/submit.go @@ -35,9 +35,11 @@ func (a *App) Submit(path string, dryRun bool) error { for _, request := range requests { response, err := client.SubmitJobs(c, request) if err != nil { - fmt.Fprintln(a.Out, "[JobSubmitResponse]") - for _, jobResponseItem := range response.JobResponseItems { - fmt.Fprintf(a.Out, "Error submitting job with id %s, details: %s\n", jobResponseItem.JobId, jobResponseItem.Error) + if response == nil { + fmt.Fprintln(a.Out, "[JobSubmitResponse]") + for _, jobResponseItem := range response.JobResponseItems { + fmt.Fprintf(a.Out, "Error submitting job with id %s, details: %s\n", jobResponseItem.JobId, jobResponseItem.Error) + } } fmt.Fprintln(a.Out, "[Error]") return errors.WithMessagef(err, "error submitting request %#v", request) From 0d9eab4ebd31a98b54e53d365d0db426377bd3ca Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Mon, 7 Aug 2023 19:08:39 -0500 Subject: [PATCH 12/18] Typo == instead of != --- internal/armadactl/submit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/armadactl/submit.go b/internal/armadactl/submit.go index 5d9bed650ed..305bd751008 100644 --- a/internal/armadactl/submit.go +++ b/internal/armadactl/submit.go @@ -35,7 +35,7 @@ func (a *App) Submit(path string, dryRun bool) error { for _, request := range requests { response, err := client.SubmitJobs(c, request) if err != nil { - if response == nil { + if response != nil { fmt.Fprintln(a.Out, "[JobSubmitResponse]") for _, jobResponseItem := range response.JobResponseItems { fmt.Fprintf(a.Out, "Error submitting job with id %s, details: %s\n", jobResponseItem.JobId, jobResponseItem.Error) From 2c022b6f38a3684e0a353930bec349cfa14e076f Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Mon, 7 Aug 2023 22:46:05 -0500 Subject: [PATCH 13/18] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 73c93eda03d..2ec60d5747b 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,7 @@ require ( github.com/go-openapi/swag v0.22.3 github.com/go-openapi/validate v0.22.1 github.com/go-playground/validator/v10 v10.11.1 + github.com/gogo/status v1.1.1 github.com/golang/mock v1.6.0 github.com/goreleaser/goreleaser v1.15.2 github.com/jackc/pgx/v5 v5.3.1 @@ -127,7 +128,6 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a // indirect - github.com/gogo/status v1.1.1 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/snappy v0.0.3 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect From 003e7dd77698922193e8a4be84e6d05081f72d0a Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Fri, 11 Aug 2023 09:53:02 -0500 Subject: [PATCH 14/18] Wrap gRPC SubmitJob function --- internal/armadactl/submit.go | 4 +++- pkg/api/submit.go | 26 ++++++++++++++++++++++++++ pkg/api/submit.pb.go | 9 +-------- pkg/client/submit.go | 2 +- 4 files changed, 31 insertions(+), 10 deletions(-) create mode 100644 pkg/api/submit.go diff --git a/internal/armadactl/submit.go b/internal/armadactl/submit.go index 305bd751008..9ad6001246b 100644 --- a/internal/armadactl/submit.go +++ b/internal/armadactl/submit.go @@ -31,7 +31,9 @@ func (a *App) Submit(path string, dryRun bool) error { } requests := client.CreateChunkedSubmitRequests(submitFile.Queue, submitFile.JobSetId, submitFile.Jobs) - return client.WithSubmitClient(a.Params.ApiConnectionDetails, func(c api.SubmitClient) error { + return client.WithSubmitClient(a.Params.ApiConnectionDetails, func(originalClient api.SubmitClient) error { + c := api.CustomSubmitClient{Inner: originalClient} + for _, request := range requests { response, err := client.SubmitJobs(c, request) if err != nil { diff --git a/pkg/api/submit.go b/pkg/api/submit.go new file mode 100644 index 00000000000..6ed5492a3c7 --- /dev/null +++ b/pkg/api/submit.go @@ -0,0 +1,26 @@ +package api + +import ( + "context" + "github.com/gogo/status" + "google.golang.org/grpc" +) + +type CustomSubmitClient struct { + Inner SubmitClient +} + +func (c *CustomSubmitClient) SubmitJobs(ctx context.Context, in *JobSubmitRequest, opts ...grpc.CallOption) (*JobSubmitResponse, error) { + out, err := c.Inner.SubmitJobs(ctx, in, opts...) + if err != nil { + st := status.Convert(err) + for _, detail := range st.Details() { + switch t := detail.(type) { + case *JobSubmitResponse: + return t, err + } + } + return nil, err + } + return out, nil +} diff --git a/pkg/api/submit.pb.go b/pkg/api/submit.pb.go index e7fb0542f79..61c5c289848 100644 --- a/pkg/api/submit.pb.go +++ b/pkg/api/submit.pb.go @@ -7,6 +7,7 @@ import ( context "context" encoding_binary "encoding/binary" fmt "fmt" + "google.golang.org/grpc/status" io "io" math "math" math_bits "math/bits" @@ -17,7 +18,6 @@ import ( proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" types "github.com/gogo/protobuf/types" - "github.com/gogo/status" _ "google.golang.org/genproto/googleapis/api/annotations" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" @@ -1760,13 +1760,6 @@ func (c *submitClient) SubmitJobs(ctx context.Context, in *JobSubmitRequest, opt out := new(JobSubmitResponse) err := c.cc.Invoke(ctx, "/api.Submit/SubmitJobs", in, out, opts...) if err != nil { - st := status.Convert(err) - for _, detail := range st.Details() { - switch t := detail.(type) { - case *JobSubmitResponse: - return t, err - } - } return nil, err } return out, nil diff --git a/pkg/client/submit.go b/pkg/client/submit.go index 234dad8f49e..e838f8c7c8a 100644 --- a/pkg/client/submit.go +++ b/pkg/client/submit.go @@ -31,7 +31,7 @@ func DeleteQueue(submitClient api.SubmitClient, name string) error { return e } -func SubmitJobs(submitClient api.SubmitClient, request *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { +func SubmitJobs(submitClient api.CustomSubmitClient, request *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { AddClientIds(request.JobRequestItems) ctx, cancel := common.ContextWithDefaultTimeout() defer cancel() From ab8451f2eba6d6f0b6dba48e3eba22edc68398f8 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Fri, 11 Aug 2023 09:56:42 -0500 Subject: [PATCH 15/18] Create new client function instead of sharing --- internal/armadactl/submit.go | 2 +- pkg/client/submit.go | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/armadactl/submit.go b/internal/armadactl/submit.go index 9ad6001246b..26514c2d66c 100644 --- a/internal/armadactl/submit.go +++ b/internal/armadactl/submit.go @@ -35,7 +35,7 @@ func (a *App) Submit(path string, dryRun bool) error { c := api.CustomSubmitClient{Inner: originalClient} for _, request := range requests { - response, err := client.SubmitJobs(c, request) + response, err := client.CustomClientSubmitJobs(c, request) if err != nil { if response != nil { fmt.Fprintln(a.Out, "[JobSubmitResponse]") diff --git a/pkg/client/submit.go b/pkg/client/submit.go index e838f8c7c8a..4214ca7ca71 100644 --- a/pkg/client/submit.go +++ b/pkg/client/submit.go @@ -31,7 +31,14 @@ func DeleteQueue(submitClient api.SubmitClient, name string) error { return e } -func SubmitJobs(submitClient api.CustomSubmitClient, request *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { +func SubmitJobs(submitClient api.SubmitClient, request *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { + AddClientIds(request.JobRequestItems) + ctx, cancel := common.ContextWithDefaultTimeout() + defer cancel() + return submitClient.SubmitJobs(ctx, request) +} + +func CustomClientSubmitJobs(submitClient api.CustomSubmitClient, request *api.JobSubmitRequest) (*api.JobSubmitResponse, error) { AddClientIds(request.JobRequestItems) ctx, cancel := common.ContextWithDefaultTimeout() defer cancel() From 354517d6c89b0c424aa002557a37c421dc25c3aa Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Fri, 11 Aug 2023 10:09:51 -0500 Subject: [PATCH 16/18] Change import order --- pkg/api/submit.pb.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/api/submit.pb.go b/pkg/api/submit.pb.go index 61c5c289848..ae36a9a2837 100644 --- a/pkg/api/submit.pb.go +++ b/pkg/api/submit.pb.go @@ -7,7 +7,6 @@ import ( context "context" encoding_binary "encoding/binary" fmt "fmt" - "google.golang.org/grpc/status" io "io" math "math" math_bits "math/bits" @@ -21,6 +20,7 @@ import ( _ "google.golang.org/genproto/googleapis/api/annotations" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" ) @@ -1817,7 +1817,6 @@ func (c *submitClient) UpdateQueue(ctx context.Context, in *Queue, opts ...grpc. return nil, err } return out, nil - } func (c *submitClient) UpdateQueues(ctx context.Context, in *QueueList, opts ...grpc.CallOption) (*BatchQueueUpdateResponse, error) { From fabb8324c05d485d44d8e05b69608ae5e9924dd1 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Fri, 11 Aug 2023 10:22:57 -0500 Subject: [PATCH 17/18] Add a space between imports --- pkg/api/submit.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/api/submit.go b/pkg/api/submit.go index 6ed5492a3c7..0a913f1215b 100644 --- a/pkg/api/submit.go +++ b/pkg/api/submit.go @@ -2,6 +2,7 @@ package api import ( "context" + "github.com/gogo/status" "google.golang.org/grpc" ) From c3facda69633fa3dae06fb9e19ba6ac7de01afa4 Mon Sep 17 00:00:00 2001 From: Raajheer1 Date: Fri, 11 Aug 2023 10:54:15 -0500 Subject: [PATCH 18/18] Avoid nil pointer deference --- internal/armada/server/submit.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/armada/server/submit.go b/internal/armada/server/submit.go index a26d5401abf..189cd2b62f6 100644 --- a/internal/armada/server/submit.go +++ b/internal/armada/server/submit.go @@ -835,6 +835,7 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own Error: fmt.Sprintf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId), } responseItems = append(responseItems, response) + continue // Safety check, to avoid possible nil pointer dereference below } if err := validation.ValidateJobSubmitRequestItem(item); err != nil { response := &api.JobSubmitResponseItem{