diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index ab4cdc84a83..635a4bea261 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,10 +1,10 @@ [project] name = "armada_client" -version = "0.3.4" +version = "0.3.5" description = "Armada gRPC API python client" readme = "README.md" requires-python = ">=3.7" -dependencies = ["grpcio==1.58.0", "grpcio-tools==1.58.0", "mypy-protobuf>=3.2.0", "protobuf>=3.20,<5.0" ] +dependencies = ["grpcio==1.66.1", "grpcio-tools==1.66.1", "mypy-protobuf>=3.2.0", "protobuf>=5.26.1,<6.0dev" ] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] diff --git a/cmd/scheduler/cmd/root.go b/cmd/scheduler/cmd/root.go index 5b25c754d44..25f1aec73d0 100644 --- a/cmd/scheduler/cmd/root.go +++ b/cmd/scheduler/cmd/root.go @@ -42,7 +42,7 @@ func loadConfig() (schedulerconfig.Configuration, error) { common.LoadConfig(&config, "./config/scheduler", viper.GetStringSlice(CustomConfigLocation)) err := config.Validate() if err != nil { - commonconfig.LogValidationErrors(err) + return config, commonconfig.FormatValidationErrors(err) } return config, err } diff --git a/internal/binoculars/configuration/validation.go b/internal/binoculars/configuration/validation.go new file mode 100644 index 00000000000..914f40053dd --- /dev/null +++ b/internal/binoculars/configuration/validation.go @@ -0,0 +1,8 @@ +package configuration + +import "github.com/go-playground/validator/v10" + +func (c BinocularsConfig) Validate() error { + validate := validator.New() + return validate.Struct(c) +} diff --git a/internal/common/config/pulsar.go b/internal/common/config/pulsar.go index b2ec8eef984..478ca22d8c5 100644 --- a/internal/common/config/pulsar.go +++ b/internal/common/config/pulsar.go @@ -34,7 +34,7 @@ type PulsarConfig struct { // Maximum allowed message size in bytes MaxAllowedMessageSize uint // Timeout when sending messages asynchronously - SendTimeout time.Duration `validate:"required"` + SendTimeout time.Duration // Backoff from polling when Pulsar returns an error BackoffTime time.Duration // Number of pulsar messages that will be queued by the pulsar consumer. diff --git a/internal/common/config/validation.go b/internal/common/config/validation.go index 8b14cf77d48..55e8abde269 100644 --- a/internal/common/config/validation.go +++ b/internal/common/config/validation.go @@ -1,25 +1,30 @@ package config import ( + "errors" + "fmt" "strings" "github.com/go-playground/validator/v10" - log "github.com/sirupsen/logrus" ) -func LogValidationErrors(err error) { - if err != nil { - for _, err := range err.(validator.ValidationErrors) { - fieldName := stripPrefix(err.Namespace()) - tag := err.Tag() - switch tag { - case "required": - log.Errorf("ConfigError: Field %s is required but was not found", fieldName) - default: - log.Errorf("ConfigError: Field %s has invalid value %s: %s", fieldName, err.Value(), tag) - } +type Config interface { + Validate() error +} + +func FormatValidationErrors(err error) error { + var validationErrors error + for _, err := range err.(validator.ValidationErrors) { + fieldName := stripPrefix(err.Namespace()) + tag := err.Tag() + switch tag { + case "required": + validationErrors = errors.Join(validationErrors, fmt.Errorf("ConfigError: Field %s is required but was not found", fieldName)) + default: + validationErrors = errors.Join(validationErrors, fmt.Errorf("ConfigError: Field %s has invalid value %s: %s", fieldName, err.Value(), tag)) } } + return validationErrors } func stripPrefix(s string) string { diff --git a/internal/common/grpc/configuration/types.go b/internal/common/grpc/configuration/types.go index 967485f348a..2101e873f65 100644 --- a/internal/common/grpc/configuration/types.go +++ b/internal/common/grpc/configuration/types.go @@ -5,7 +5,7 @@ import ( ) type GrpcConfig struct { - Port int `validate:"required"` + Port int KeepaliveParams keepalive.ServerParameters KeepaliveEnforcementPolicy keepalive.EnforcementPolicy Tls TlsConfig diff --git a/internal/common/startup.go b/internal/common/startup.go index e8ee1825ab9..535fb628c5e 100644 --- a/internal/common/startup.go +++ b/internal/common/startup.go @@ -42,7 +42,7 @@ func BindCommandlineArguments() { } // TODO Move code relating to config out of common into a new package internal/serverconfig -func LoadConfig(config any, defaultPath string, overrideConfigs []string) *viper.Viper { +func LoadConfig(config commonconfig.Config, defaultPath string, overrideConfigs []string) *viper.Viper { v := viper.NewWithOptions(viper.KeyDelimiter("::")) v.SetConfigName(baseConfigFileName) v.AddConfigPath(defaultPath) @@ -89,6 +89,11 @@ func LoadConfig(config any, defaultPath string, overrideConfigs []string) *viper log.Debugf("Unset keys: %v", metadata.Unset) } + if err := config.Validate(); err != nil { + log.Error(commonconfig.FormatValidationErrors(err)) + os.Exit(-1) + } + return v } diff --git a/internal/eventingester/configuration/validation.go b/internal/eventingester/configuration/validation.go new file mode 100644 index 00000000000..cc797e3ad4a --- /dev/null +++ b/internal/eventingester/configuration/validation.go @@ -0,0 +1,10 @@ +package configuration + +import ( + "github.com/go-playground/validator/v10" +) + +func (c EventIngesterConfiguration) Validate() error { + validate := validator.New() + return validate.Struct(c) +} diff --git a/internal/executor/configuration/validation.go b/internal/executor/configuration/validation.go new file mode 100644 index 00000000000..186bb9cd6e1 --- /dev/null +++ b/internal/executor/configuration/validation.go @@ -0,0 +1,8 @@ +package configuration + +import "github.com/go-playground/validator/v10" + +func (c ExecutorConfiguration) Validate() error { + validate := validator.New() + return validate.Struct(c) +} diff --git a/internal/lookoutingesterv2/configuration/validation.go b/internal/lookoutingesterv2/configuration/validation.go new file mode 100644 index 00000000000..c78f1df1d19 --- /dev/null +++ b/internal/lookoutingesterv2/configuration/validation.go @@ -0,0 +1,8 @@ +package configuration + +import "github.com/go-playground/validator/v10" + +func (c LookoutIngesterV2Configuration) Validate() error { + validate := validator.New() + return validate.Struct(c) +} diff --git a/internal/lookoutv2/configuration/validation.go b/internal/lookoutv2/configuration/validation.go new file mode 100644 index 00000000000..e9228dbbef2 --- /dev/null +++ b/internal/lookoutv2/configuration/validation.go @@ -0,0 +1,8 @@ +package configuration + +import "github.com/go-playground/validator/v10" + +func (c LookoutV2Config) Validate() error { + validate := validator.New() + return validate.Struct(c) +} diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index ab82d0602c7..e5196d41ef5 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -128,6 +128,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns JobRunIdsToCancel: slices.Map(runsToCancel, func(x string) *armadaevents.Uuid { return armadaevents.MustProtoUuidFromUuidString(x) }), + JobRunIdsToCancelStr: runsToCancel, }, }, }); err != nil { diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index a2f93eb82fc..44ff21adfbe 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -203,7 +203,8 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { expectedMsgs: []*executorapi.LeaseStreamMessage{ { Event: &executorapi.LeaseStreamMessage_CancelRuns{CancelRuns: &executorapi.CancelRuns{ - JobRunIdsToCancel: []*armadaevents.Uuid{armadaevents.MustProtoUuidFromUuidString(runId2)}, + JobRunIdsToCancel: []*armadaevents.Uuid{armadaevents.MustProtoUuidFromUuidString(runId2)}, + JobRunIdsToCancelStr: []string{runId2}, }}, }, { diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index 36c1a0142c6..72f6f9648ec 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -1,10 +1,8 @@ package configuration import ( - "fmt" "time" - "github.com/go-playground/validator/v10" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -59,12 +57,6 @@ type Configuration struct { QueueRefreshPeriod time.Duration `validate:"required"` } -func (c Configuration) Validate() error { - validate := validator.New() - validate.RegisterStructValidation(SchedulingConfigValidation, SchedulingConfig{}) - return validate.Struct(c) -} - type LeaderConfig struct { // Valid modes are "standalone" or "kubernetes" Mode string `validate:"required"` @@ -251,33 +243,6 @@ const ( UnknownWellKnownNodeTypeErrorMessage = "priority class refers to unknown well-known node type" ) -func SchedulingConfigValidation(sl validator.StructLevel) { - c := sl.Current().Interface().(SchedulingConfig) - - wellKnownNodeTypes := make(map[string]bool) - for i, wellKnownNodeType := range c.WellKnownNodeTypes { - if wellKnownNodeTypes[wellKnownNodeType.Name] { - fieldName := fmt.Sprintf("WellKnownNodeTypes[%d].Name", i) - sl.ReportError(wellKnownNodeType.Name, fieldName, "", DuplicateWellKnownNodeTypeErrorMessage, "") - } - wellKnownNodeTypes[wellKnownNodeType.Name] = true - } - - for priorityClassName, priorityClass := range c.PriorityClasses { - if len(priorityClass.AwayNodeTypes) > 0 && !priorityClass.Preemptible { - fieldName := fmt.Sprintf("Preemption.PriorityClasses[%s].Preemptible", priorityClassName) - sl.ReportError(priorityClass.Preemptible, fieldName, "", AwayNodeTypesWithoutPreemptionErrorMessage, "") - } - - for i, awayNodeType := range priorityClass.AwayNodeTypes { - if !wellKnownNodeTypes[awayNodeType.WellKnownNodeTypeName] { - fieldName := fmt.Sprintf("Preemption.PriorityClasses[%s].AwayNodeTypes[%d].WellKnownNodeTypeName", priorityClassName, i) - sl.ReportError(awayNodeType.WellKnownNodeTypeName, fieldName, "", UnknownWellKnownNodeTypeErrorMessage, "") - } - } - } -} - // ResourceType represents a resource the scheduler indexes for efficient lookup. type ResourceType struct { // Resource name, e.g., "cpu", "memory", or "nvidia.com/gpu". diff --git a/internal/scheduler/configuration/validation.go b/internal/scheduler/configuration/validation.go new file mode 100644 index 00000000000..8d11541ae66 --- /dev/null +++ b/internal/scheduler/configuration/validation.go @@ -0,0 +1,40 @@ +package configuration + +import ( + "fmt" + + "github.com/go-playground/validator/v10" +) + +func (c Configuration) Validate() error { + validate := validator.New() + validate.RegisterStructValidation(SchedulingConfigValidation, SchedulingConfig{}) + return validate.Struct(c) +} + +func SchedulingConfigValidation(sl validator.StructLevel) { + c := sl.Current().Interface().(SchedulingConfig) + + wellKnownNodeTypes := make(map[string]bool) + for i, wellKnownNodeType := range c.WellKnownNodeTypes { + if wellKnownNodeTypes[wellKnownNodeType.Name] { + fieldName := fmt.Sprintf("WellKnownNodeTypes[%d].Name", i) + sl.ReportError(wellKnownNodeType.Name, fieldName, "", DuplicateWellKnownNodeTypeErrorMessage, "") + } + wellKnownNodeTypes[wellKnownNodeType.Name] = true + } + + for priorityClassName, priorityClass := range c.PriorityClasses { + if len(priorityClass.AwayNodeTypes) > 0 && !priorityClass.Preemptible { + fieldName := fmt.Sprintf("Preemption.PriorityClasses[%s].Preemptible", priorityClassName) + sl.ReportError(priorityClass.Preemptible, fieldName, "", AwayNodeTypesWithoutPreemptionErrorMessage, "") + } + + for i, awayNodeType := range priorityClass.AwayNodeTypes { + if !wellKnownNodeTypes[awayNodeType.WellKnownNodeTypeName] { + fieldName := fmt.Sprintf("Preemption.PriorityClasses[%s].AwayNodeTypes[%d].WellKnownNodeTypeName", priorityClassName, i) + sl.ReportError(awayNodeType.WellKnownNodeTypeName, fieldName, "", UnknownWellKnownNodeTypeErrorMessage, "") + } + } + } +} diff --git a/internal/scheduler/configuration/configuration_test.go b/internal/scheduler/configuration/validation_test.go similarity index 100% rename from internal/scheduler/configuration/configuration_test.go rename to internal/scheduler/configuration/validation_test.go diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 2e58291fb96..176b3ec733f 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -53,8 +53,6 @@ type FairSchedulingAlgo struct { limiterByQueue map[string]*rate.Limiter // Max amount of time each scheduling round is allowed to take. maxSchedulingDuration time.Duration - // Pools that need to be scheduled in sorted order - poolsToSchedule []string clock clock.Clock resourceListFactory *internaltypes.ResourceListFactory floatingResourceTypes *floatingresources.FloatingResourceTypes @@ -117,14 +115,11 @@ func (l *FairSchedulingAlgo) Schedule( return nil, err } - if len(l.poolsToSchedule) == 0 { - // Cycle over groups in a consistent order. - l.poolsToSchedule = maps.Keys(fsctx.nodesByPoolAndExecutor) - sortGroups(l.poolsToSchedule, l.schedulingConfig.PoolSchedulePriority, l.schedulingConfig.DefaultPoolSchedulePriority) - } + pools := maps.Keys(fsctx.nodesByPoolAndExecutor) + sortGroups(pools, l.schedulingConfig.PoolSchedulePriority, l.schedulingConfig.DefaultPoolSchedulePriority) - ctx.Infof("Looping over pools %s", strings.Join(l.poolsToSchedule, " ")) - for len(l.poolsToSchedule) > 0 { + ctx.Infof("Looping over pools %s", strings.Join(pools, " ")) + for _, pool := range pools { select { case <-ctx.Done(): // We've reached the scheduling time limit; exit gracefully. @@ -132,8 +127,6 @@ func (l *FairSchedulingAlgo) Schedule( return overallSchedulerResult, nil default: } - pool := armadaslices.Pop(&l.poolsToSchedule) - nodeCountForPool := 0 for _, executor := range fsctx.executors { nodeCountForPool += len(fsctx.nodesByPoolAndExecutor[pool][executor.Id]) @@ -162,11 +155,8 @@ func (l *FairSchedulingAlgo) Schedule( err, ) - if err == context.DeadlineExceeded { + if errors.Is(err, context.DeadlineExceeded) { // We've reached the scheduling time limit; - // add the executorGroupLabel back to l.poolsToSchedule such that we try it again next time, - // and exit gracefully. - l.poolsToSchedule = append(l.poolsToSchedule, pool) ctx.Info("stopped scheduling early as we have hit the maximum scheduling duration") break } else if err != nil { diff --git a/internal/scheduleringester/config.go b/internal/scheduleringester/config.go index fa38326e701..4ec84f5fb69 100644 --- a/internal/scheduleringester/config.go +++ b/internal/scheduleringester/config.go @@ -3,6 +3,8 @@ package scheduleringester import ( "time" + "github.com/go-playground/validator/v10" + commonconfig "github.com/armadaproject/armada/internal/common/config" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" "github.com/armadaproject/armada/internal/server/configuration" @@ -24,3 +26,8 @@ type Configuration struct { // If non-nil, configures pprof profiling Profiling *profilingconfig.ProfilingConfig } + +func (c Configuration) Validate() error { + validate := validator.New() + return validate.Struct(c) +} diff --git a/internal/server/configuration/validation.go b/internal/server/configuration/validation.go new file mode 100644 index 00000000000..cdfd46513c6 --- /dev/null +++ b/internal/server/configuration/validation.go @@ -0,0 +1,8 @@ +package configuration + +import "github.com/go-playground/validator/v10" + +func (c ArmadaConfig) Validate() error { + validate := validator.New() + return validate.Struct(c) +}