Skip to content

Commit

Permalink
Merge remote-tracking branch 'raaj/feat/create_job_error' into raaj-p…
Browse files Browse the repository at this point in the history
…atel/create_job_error
  • Loading branch information
richscott committed Aug 30, 2023
2 parents 73a5017 + a1e2f62 commit 5af3f97
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 54 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ require (
github.com/go-openapi/swag v0.22.3
github.com/go-openapi/validate v0.22.1
github.com/go-playground/validator/v10 v10.14.1
github.com/gogo/status v1.1.1
github.com/golang/mock v1.6.0
github.com/goreleaser/goreleaser v1.15.2
github.com/jackc/pgx/v5 v5.3.1
Expand Down Expand Up @@ -127,6 +128,7 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,14 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a h1:dR8+Q0uO5S2ZBcs2IH6VBKYwSxPo2vYCYq0ot0mu7xA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gogo/status v1.1.1 h1:DuHXlSFHNKqTQ+/ACf5Vs6r4X/dH2EgIzR9Vr+H65kg=
github.com/gogo/status v1.1.1/go.mod h1:jpG3dM5QPcqu19Hg8lkUhBFBa3TcLs1DG7+2Jqci7oU=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
Expand Down Expand Up @@ -1256,6 +1260,7 @@ google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCID
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
Expand Down Expand Up @@ -1302,6 +1307,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down
24 changes: 20 additions & 4 deletions internal/armada/server/job_validation.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"fmt"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/armada/scheduling"
Expand All @@ -13,16 +15,30 @@ import (
func validateJobsCanBeScheduled(
jobs []*api.Job,
allClusterSchedulingInfo map[string]*api.ClusterSchedulingInfoReport,
) (bool, error) {
) (bool, []*api.JobSubmitResponseItem, error) {
activeClusterSchedulingInfo := scheduling.FilterActiveClusterSchedulingInfoReports(allClusterSchedulingInfo)
responseItems := make([]*api.JobSubmitResponseItem, 0, len(jobs))
for i, job := range jobs {
if ok, err := scheduling.MatchSchedulingRequirementsOnAnyCluster(job, activeClusterSchedulingInfo); !ok {
if err != nil {
return false, errors.WithMessagef(err, "%d-th job can't be scheduled", i)
response := &api.JobSubmitResponseItem{
JobId: job.Id,
Error: errors.WithMessagef(err, "%d-th job can't be scheduled", i).Error(),
}
responseItems = append(responseItems, response)
} else {
return false, errors.Errorf("%d-th job can't be scheduled", i)
response := &api.JobSubmitResponseItem{
JobId: job.Id,
Error: fmt.Sprintf("%d-th job can't be scheduled", i),
}
responseItems = append(responseItems, response)
}
}
}
return true, nil

if len(responseItems) > 0 {
return false, responseItems, errors.New("[createJobs] Failed to validate jobs can be scheduled")
}

return true, nil, nil
}
6 changes: 3 additions & 3 deletions internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,11 +954,11 @@ func (q *AggregatedQueueServer) addAvoidNodeAffinity(
}

changed := addAvoidNodeAffinity(jobs[0], labels, func(jobsToValidate []*api.Job) error {
if ok, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok {
if ok, responseItems, err := validateJobsCanBeScheduled(jobsToValidate, allClusterSchedulingInfo); !ok {
if err != nil {
return errors.WithMessage(err, "can't schedule at least 1 job")
return errors.WithMessagef(err, "can't schedule %d job(s)", len(responseItems))
} else {
return errors.Errorf("can't schedule at least 1 job")
return errors.Errorf("can't schedule %d job(s)", len(responseItems))
}
}
return nil
Expand Down
99 changes: 80 additions & 19 deletions internal/armada/server/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"time"

"github.com/gogo/protobuf/types"
"github.com/gogo/status"
pool "github.com/jolestar/go-commons-pool"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/utils/strings/slices"

"github.com/armadaproject/armada/internal/armada/configuration"
Expand Down Expand Up @@ -41,6 +41,21 @@ type SubmitServer struct {
compressorPool *pool.ObjectPool
}

type JobSubmitError struct {
JobErrorsDetails []*api.JobSubmitResponseItem
Err error
}

func (e *JobSubmitError) Error() string {
output := ""
for _, jobError := range e.JobErrorsDetails {
output += fmt.Sprintf("Error - Job %s: %s\n", jobError.JobId, jobError.Error)
}

output += fmt.Sprintf("\nError - %s", e.Err.Error())
return output
}

func NewSubmitServer(
permissions authorization.PermissionChecker,
jobRepository repository.JobRepository,
Expand Down Expand Up @@ -253,13 +268,30 @@ func (server *SubmitServer) DeleteQueue(ctx context.Context, request *api.QueueD
func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRequest) (*api.JobSubmitResponse, error) {
principal := authorization.GetPrincipal(ctx)

jobs, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames())
jobs, responseItems, e := server.createJobs(req, principal.GetName(), principal.GetGroupNames())
if e != nil {
details := &api.JobSubmitResponse{
JobResponseItems: responseItems,
}

reqJson, _ := json.Marshal(req)
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e)
st, err := status.Newf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e).WithDetails(details)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e)
}
return nil, st.Err()
}
if err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil {
return nil, err
if responseItems, err := validation.ValidateApiJobs(jobs, *server.schedulingConfig); err != nil {
details := &api.JobSubmitResponse{
JobResponseItems: responseItems,
}

reqJson, _ := json.Marshal(req)
st, err := status.Newf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e).WithDetails(details)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] Error submitting job %s for user %s: %v", reqJson, principal.GetName(), e)
}
return nil, st.Err()
}

q, err := server.getQueueOrCreate(ctx, req.Queue)
Expand Down Expand Up @@ -296,9 +328,16 @@ func (server *SubmitServer) SubmitJobs(ctx context.Context, req *api.JobSubmitRe
return nil, status.Errorf(codes.InvalidArgument, "error getting scheduling info: %s", err)
}

if ok, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok {
if ok, responseItems, err := validateJobsCanBeScheduled(jobs, allClusterSchedulingInfo); !ok {
details := &api.JobSubmitResponse{
JobResponseItems: responseItems,
}
if err != nil {
return nil, errors.WithMessagef(err, "can't schedule job for user %s", principal.GetName())
st, e := status.Newf(codes.InvalidArgument, "[SubmitJobs] error validating jobs: %s", err).WithDetails(details)
if e != nil {
return nil, status.Errorf(codes.InvalidArgument, "[SubmitJobs] error validating jobs: %s", err)
}
return nil, st.Err()
}
return nil, errors.Errorf("can't schedule job for user %s", principal.GetName())
}
Expand Down Expand Up @@ -746,16 +785,16 @@ func (server *SubmitServer) getQueueOrCreate(ctx context.Context, queueName stri
// createJobs returns a list of objects representing the jobs in a JobSubmitRequest.
// This function validates the jobs in the request and the pod specs. in each job.
// If any job or pod in invalid, an error is returned.
func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, error) {
func (server *SubmitServer) createJobs(request *api.JobSubmitRequest, owner string, ownershipGroups []string) ([]*api.Job, []*api.JobSubmitResponseItem, error) {
return server.createJobsObjects(request, owner, ownershipGroups, time.Now, util.NewULID)
}

func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, owner string, ownershipGroups []string,
getTime func() time.Time, getUlid func() string,
) ([]*api.Job, error) {
) ([]*api.Job, []*api.JobSubmitResponseItem, error) {
compressor, err := server.compressorPool.BorrowObject(context.Background())
if err != nil {
return nil, err
return nil, nil, err
}
defer func(compressorPool *pool.ObjectPool, ctx context.Context, object interface{}) {
err := compressorPool.ReturnObject(ctx, object)
Expand All @@ -765,29 +804,45 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
}(server.compressorPool, context.Background(), compressor)
compressedOwnershipGroups, err := compress.CompressStringArray(ownershipGroups, compressor.(compress.Compressor))
if err != nil {
return nil, err
return nil, nil, err
}

jobs := make([]*api.Job, 0, len(request.JobRequestItems))

if request.JobSetId == "" {
return nil, errors.Errorf("[createJobs] job set not specified")
return nil, nil, errors.Errorf("[createJobs] job set not specified")
}

if request.Queue == "" {
return nil, errors.Errorf("[createJobs] queue not specified")
return nil, nil, errors.Errorf("[createJobs] queue not specified")
}

responseItems := make([]*api.JobSubmitResponseItem, 0, len(request.JobRequestItems))
for i, item := range request.JobRequestItems {
jobId := getUlid()

if item.PodSpec != nil && len(item.PodSpecs) > 0 {
return nil, errors.Errorf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] job %d in job set %s contains both podSpec and podSpecs, but may only contain either", i, request.JobSetId),
}
responseItems = append(responseItems, response)
}
podSpec := item.GetMainPodSpec()
if podSpec == nil {
return nil, errors.Errorf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] job %d in job set %s contains no podSpec", i, request.JobSetId),
}
responseItems = append(responseItems, response)
continue // Safety check, to avoid possible nil pointer dereference below
}
if err := validation.ValidateJobSubmitRequestItem(item); err != nil {
return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err),
}
responseItems = append(responseItems, response)
}
namespace := item.Namespace
if namespace == "" {
Expand All @@ -797,7 +852,11 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
applyDefaultsToAnnotations(item.Annotations, *server.schedulingConfig)
applyDefaultsToPodSpec(podSpec, *server.schedulingConfig)
if err := validation.ValidatePodSpec(podSpec, server.schedulingConfig); err != nil {
return nil, errors.Errorf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err)
response := &api.JobSubmitResponseItem{
JobId: jobId,
Error: fmt.Sprintf("[createJobs] error validating the %d-th job of job set %s: %v", i, request.JobSetId, err),
}
responseItems = append(responseItems, response)
}

// TODO: remove, RequiredNodeLabels is deprecated and will be removed in future versions
Expand All @@ -808,7 +867,6 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
podSpec.NodeSelector[k] = v
}

jobId := getUlid()
enrichText(item.Labels, jobId)
enrichText(item.Annotations, jobId)
j := &api.Job{
Expand Down Expand Up @@ -838,7 +896,10 @@ func (server *SubmitServer) createJobsObjects(request *api.JobSubmitRequest, own
jobs = append(jobs, j)
}

return jobs, nil
if len(responseItems) > 0 {
return nil, responseItems, errors.New("[createJobs] error creating jobs, check JobSubmitResponse for details")
}
return jobs, nil, nil
}

func enrichText(labels map[string]string, jobId string) {
Expand Down
Loading

0 comments on commit 5af3f97

Please sign in to comment.