Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 committed Aug 24, 2024
2 parents 61c7372 + 523771f commit 7f18720
Show file tree
Hide file tree
Showing 45 changed files with 2,162 additions and 1,804 deletions.
17 changes: 1 addition & 16 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,13 @@ queueRefreshPeriod: 10s
disableSubmitCheck: false
metrics:
port: 9000
jobStateMetricsResetInterval: 12h
refreshInterval: 30s
metrics:
scheduleCycleTimeHistogramSettings:
start: 10.0
factor: 1.1
count: 110
reconcileCycleTimeHistogramSettings:
start: 10.0
factor: 1.1
count: 110
schedulerMetrics:
trackedResourceNames:
- "cpu"
- "memory"
- "ephemeral-storage"
- "nvidia.com/gpu"
resourceRenaming:
nvidia.com/gpu: "gpu"
amd.com/gpu: "gpu"
ephemeral-storage: "ephemeralStorage"
matchedRegexIndexByErrorMessageCacheSize: 100
resetInterval: "1h"
pulsar:
URL: "pulsar://pulsar:6650"
jobsetEventsTopic: "events"
Expand Down
124 changes: 100 additions & 24 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
Expand All @@ -33,7 +33,7 @@ and handles job cancellation if the Airflow task is killed.
* **armada_queue** (*str*) –


* **job_request** (*JobSubmitRequestItem*) –
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) –


* **job_set_prefix** (*Optional**[**str**]*) –
Expand All @@ -57,8 +57,9 @@ and handles job cancellation if the Airflow task is killed.
* **job_acknowledgement_timeout** (*int*) –


* **dry_run** (*bool*) –


#### _property_ client(_: ArmadaClien_ )

#### execute(context)
Submits the job to Armada and polls for completion.
Expand All @@ -76,6 +77,10 @@ Submits the job to Armada and polls for completion.



#### _property_ hook(_: ArmadaHoo_ )

#### lookout_url(job_id)

#### on_kill()
Override this method to clean up subprocesses when a task instance gets killed.

Expand All @@ -89,6 +94,8 @@ operator needs to be cleaned up, or it will leave ghost processes behind.



#### operator_extra_links(_: Collection[BaseOperatorLink_ _ = (LookoutLink(),_ )

#### _property_ pod_manager(_: KubernetesPodLogManage_ )

#### render_template_fields(context, jinja_env=None)
Expand Down Expand Up @@ -117,6 +124,8 @@ Args:


#### template_fields(_: Sequence[str_ _ = ('job_request', 'job_set_prefix'_ )

#### template_fields_renderers(_: Dict[str, str_ _ = {'job_request': 'py'_ )
Initializes a new ArmadaOperator.


Expand All @@ -132,7 +141,7 @@ Initializes a new ArmadaOperator.
* **armada_queue** (*str*) – The name of the Armada queue to which the job will be submitted.


* **job_request** (*JobSubmitRequestItem*) – The job to be submitted to Armada.
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) – The job to be submitted to Armada.


* **job_set_prefix** (*Optional**[**str**]*) – A string to prepend to the jobSet name.
Expand All @@ -156,10 +165,39 @@ for asynchronous execution.
:param job_acknowledgement_timeout: The timeout in seconds to wait for a job to be
acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


### armada.operators.armada.log_exceptions(method)
### _class_ armada.operators.armada.LookoutLink()
Bases: `BaseOperatorLink`


#### get_link(operator, \*, ti_key)
Link to external system.

Note: The old signature of this function was `(self, operator, dttm: datetime)`. That is still
supported at runtime but is deprecated.


* **Parameters**


* **operator** (*BaseOperator*) – The Airflow operator object this link is associated to.


* **ti_key** (*TaskInstanceKey*) – TaskInstance ID to return link for.



* **Returns**

link to external system



#### name(_ = 'Lookout_ )
## armada.triggers.armada module

## armada.auth module
Expand All @@ -176,18 +214,10 @@ Bases: `Protocol`
str



#### serialize()

* **Return type**

*Tuple*[str, *Dict*[str, *Any*]]


## armada.model module


### _class_ armada.model.GrpcChannelArgs(target, options=None, compression=None, auth=None, auth_details=None)
### _class_ armada.model.GrpcChannelArgs(target, options=None, compression=None, auth=None)
Bases: `object`


Expand All @@ -197,32 +227,31 @@ Bases: `object`
* **target** (*str*) –


* **options** (*Sequence**[**Tuple**[**str**, **Any**]**] **| **None*) –
* **options** (*Optional**[**Sequence**[**Tuple**[**str**, **Any**]**]**]*) –


* **compression** (*Compression** | **None*) –
* **compression** (*Optional**[**grpc.Compression**]*) –


* **auth** (*AuthMetadataPlugin** | **None*) –
* **auth** (*Optional**[**grpc.AuthMetadataPlugin**]*) –


* **auth_details** (*Dict**[**str**, **Any**] **| **None*) –

#### _static_ deserialize(data, version)

* **Parameters**

#### aio_channel()

* **Return type**

* **data** (*dict**[**str**, **Any**]*) –

*Channel*

* **version** (*int*) –


#### channel()

* **Return type**

*Channel*
*GrpcChannelArgs*



Expand All @@ -231,3 +260,50 @@ Bases: `object`
* **Return type**

*Dict*[str, *Any*]



### _class_ armada.model.RunningJobContext(armada_queue: 'str', job_id: 'str', job_set_id: 'str', submit_time: 'DateTime', cluster: 'Optional[str]' = None, last_log_time: 'Optional[DateTime]' = None, job_state: 'str' = 'UNKNOWN')
Bases: `object`


* **Parameters**


* **armada_queue** (*str*) –


* **job_id** (*str*) –


* **job_set_id** (*str*) –


* **submit_time** (*DateTime*) –


* **cluster** (*str** | **None*) –


* **last_log_time** (*DateTime** | **None*) –


* **job_state** (*str*) –



#### armada_queue(_: st_ )

#### cluster(_: str | Non_ _ = Non_ )

#### job_id(_: st_ )

#### job_set_id(_: st_ )

#### job_state(_: st_ _ = 'UNKNOWN_ )

#### last_log_time(_: DateTime | Non_ _ = Non_ )

#### _property_ state(_: JobStat_ )

#### submit_time(_: DateTim_ )
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/go-openapi/spec v0.20.14
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand All @@ -35,7 +35,7 @@ require (
github.com/oklog/ulid v1.3.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_golang v1.19.1
github.com/renstrom/shortuuid v3.0.0+incompatible
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -78,7 +78,7 @@ require (
github.com/magefile/mage v1.14.0
github.com/minio/highwayhash v1.0.2
github.com/openconfig/goyang v1.2.0
github.com/prometheus/common v0.45.0
github.com/prometheus/common v0.48.0
github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5
github.com/redis/go-redis/v9 v9.5.1
github.com/segmentio/fasthash v1.0.3
Expand Down Expand Up @@ -161,7 +161,6 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/microcosm-cc/bluemonday v1.0.25 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -372,8 +373,6 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-zglob v0.0.4 h1:LQi2iOm0/fGgu80AioIJ/1j9w9Oh+9DZ39J4VAGzHQM=
github.com/mattn/go-zglob v0.0.4/go.mod h1:MxxjyoXXnMxfIpxTK2GAkw1w8glPsQILx3N5wrKakiY=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/microcosm-cc/bluemonday v1.0.25 h1:4NEwSfiJ+Wva0VxN5B8OwMicaJvD8r9tlJWm9rtloEg=
github.com/microcosm-cc/bluemonday v1.0.25/go.mod h1:ZIOjCQp1OrzBBPIJmfX4qDYFuhU02nx4bn030ixfHLE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down Expand Up @@ -437,13 +436,13 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pquerna/cachecontrol v0.1.0 h1:yJMy84ti9h/+OEWa752kBTKv4XC30OtVVHYv/8cTqKc=
github.com/pquerna/cachecontrol v0.1.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQnrHV5K9mBcUI=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5 h1:kvl0LOTQD23VR1R7A9vDti9msfV6mOE2+j6ngYkFsfg=
Expand Down
47 changes: 11 additions & 36 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ type Configuration struct {
// Configuration controlling leader election
Leader LeaderConfig
// Configuration controlling metrics
Metrics LegacyMetricsConfig
// Configuration for new scheduler metrics.
// Due to replace metrics configured via the above entry.
SchedulerMetrics MetricsConfig
Metrics MetricsConfig
// Scheduler configuration (this is shared with the old scheduler)
Scheduling SchedulingConfig
Auth authconfig.AuthConfig
Expand Down Expand Up @@ -69,28 +66,6 @@ func (c Configuration) Validate() error {
return validate.Struct(c)
}

type MetricsConfig struct {
// If true, disable metric collection and publishing.
Disabled bool
// Regexes used for job error categorisation.
// Specifically, the subCategory label for job failure counters is the first regex that matches the job error.
// If no regex matches, the subCategory label is the empty string.
TrackedErrorRegexes []string
// Metrics are exported for these resources.
TrackedResourceNames []v1.ResourceName
// Optionally rename resources in exported metrics.
// E.g., if ResourceRenaming["nvidia.com/gpu"] = "gpu", then metrics for resource "nvidia.com/gpu" use resource name "gpu" instead.
// This can be used to avoid illegal Prometheus metric names (e.g., for "nvidia.com/gpu" as "/" is not allowed).
// Allowed characters in resource names are [a-zA-Z_:][a-zA-Z0-9_:]*
// It can also be used to track multiple resources within the same metric, e.g., "nvidia.com/gpu" and "amd.com/gpu".
ResourceRenaming map[v1.ResourceName]string
// The first matching regex of each error message is cached in an LRU cache.
// This setting controls the cache size.
MatchedRegexIndexByErrorMessageCacheSize uint64
// Reset metrics this often. Resetting periodically ensures inactive time series are garbage-collected.
ResetInterval time.Duration
}

type LeaderConfig struct {
// Valid modes are "standalone" or "kubernetes"
Mode string `validate:"required"`
Expand Down Expand Up @@ -128,16 +103,16 @@ type HttpConfig struct {
Port int `validate:"required"`
}

// TODO: ALl this needs to be unified with MetricsConfig
type LegacyMetricsConfig struct {
Port uint16
RefreshInterval time.Duration
Metrics SchedulerMetricsConfig
}

type SchedulerMetricsConfig struct {
ScheduleCycleTimeHistogramSettings HistogramConfig
ReconcileCycleTimeHistogramSettings HistogramConfig
type MetricsConfig struct {
Port uint16
RefreshInterval time.Duration
JobStateMetricsResetInterval time.Duration
// Regexes used for job error categorisation.
// Specifically, the subCategory label for job failure counters is the first regex that matches the job error.
// If no regex matches, the subCategory label is the empty string.
TrackedErrorRegexes []string
// Metrics are exported for these resources.
TrackedResourceNames []v1.ResourceName
}

type HistogramConfig struct {
Expand Down
Loading

0 comments on commit 7f18720

Please sign in to comment.