Skip to content

Commit

Permalink
Merge branch 'master' into f/chrisma/metrics-new
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Aug 24, 2024
2 parents 6a35699 + b006b7b commit 5a36f7b
Show file tree
Hide file tree
Showing 37 changed files with 1,970 additions and 907 deletions.
14 changes: 12 additions & 2 deletions cmd/armadactl/cmd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
return fmt.Errorf("error reading queue labels: %s", err)
}

labelsAsMap, err := labelSliceAsMap(labels)
if err != nil {
return fmt.Errorf("error converting queue labels to map: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
Cordoned: cordoned,
Labels: labels,
Labels: labelsAsMap,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
Expand Down Expand Up @@ -220,13 +225,18 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
return fmt.Errorf("error reading queue labels: %s", err)
}

labelsAsMap, err := labelSliceAsMap(labels)
if err != nil {
return fmt.Errorf("error converting queue labels to map: %s", err)
}

newQueue, err := queue.NewQueue(&api.Queue{
Name: name,
PriorityFactor: priorityFactor,
UserOwners: owners,
GroupOwners: groups,
Cordoned: cordoned,
Labels: labels,
Labels: labelsAsMap,
})
if err != nil {
return fmt.Errorf("invalid queue data: %s", err)
Expand Down
17 changes: 16 additions & 1 deletion cmd/armadactl/cmd/utils.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
package cmd

import "fmt"
import (
"fmt"
"strings"
)

func queueNameValidation(queueName string) error {
if queueName == "" {
return fmt.Errorf("cannot provide empty queue name")
}
return nil
}

func labelSliceAsMap(labels []string) (map[string]string, error) {
mapToReturn := make(map[string]string)
for _, label := range labels {
splitLabel := strings.Split(label, "=")
if len(splitLabel) != 2 {
return nil, fmt.Errorf("invalid label: %s", label)
}
mapToReturn[splitLabel[0]] = splitLabel[1]
}
return mapToReturn, nil
}
124 changes: 100 additions & 24 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
Expand All @@ -33,7 +33,7 @@ and handles job cancellation if the Airflow task is killed.
* **armada_queue** (*str*) –


* **job_request** (*JobSubmitRequestItem*) –
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) –


* **job_set_prefix** (*Optional**[**str**]*) –
Expand All @@ -57,8 +57,9 @@ and handles job cancellation if the Airflow task is killed.
* **job_acknowledgement_timeout** (*int*) –


* **dry_run** (*bool*) –


#### _property_ client(_: ArmadaClien_ )

#### execute(context)
Submits the job to Armada and polls for completion.
Expand All @@ -76,6 +77,10 @@ Submits the job to Armada and polls for completion.



#### _property_ hook(_: ArmadaHoo_ )

#### lookout_url(job_id)

#### on_kill()
Override this method to clean up subprocesses when a task instance gets killed.

Expand All @@ -89,6 +94,8 @@ operator needs to be cleaned up, or it will leave ghost processes behind.



#### operator_extra_links(_: Collection[BaseOperatorLink_ _ = (LookoutLink(),_ )

#### _property_ pod_manager(_: KubernetesPodLogManage_ )

#### render_template_fields(context, jinja_env=None)
Expand Down Expand Up @@ -117,6 +124,8 @@ Args:


#### template_fields(_: Sequence[str_ _ = ('job_request', 'job_set_prefix'_ )

#### template_fields_renderers(_: Dict[str, str_ _ = {'job_request': 'py'_ )
Initializes a new ArmadaOperator.


Expand All @@ -132,7 +141,7 @@ Initializes a new ArmadaOperator.
* **armada_queue** (*str*) – The name of the Armada queue to which the job will be submitted.


* **job_request** (*JobSubmitRequestItem*) – The job to be submitted to Armada.
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) – The job to be submitted to Armada.


* **job_set_prefix** (*Optional**[**str**]*) – A string to prepend to the jobSet name.
Expand All @@ -156,10 +165,39 @@ for asynchronous execution.
:param job_acknowledgement_timeout: The timeout in seconds to wait for a job to be
acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


### armada.operators.armada.log_exceptions(method)
### _class_ armada.operators.armada.LookoutLink()
Bases: `BaseOperatorLink`


#### get_link(operator, \*, ti_key)
Link to external system.

Note: The old signature of this function was `(self, operator, dttm: datetime)`. That is still
supported at runtime but is deprecated.


* **Parameters**


* **operator** (*BaseOperator*) – The Airflow operator object this link is associated to.


* **ti_key** (*TaskInstanceKey*) – TaskInstance ID to return link for.



* **Returns**

link to external system



#### name(_ = 'Lookout_ )
## armada.triggers.armada module

## armada.auth module
Expand All @@ -176,18 +214,10 @@ Bases: `Protocol`
str



#### serialize()

* **Return type**

*Tuple*[str, *Dict*[str, *Any*]]


## armada.model module


### _class_ armada.model.GrpcChannelArgs(target, options=None, compression=None, auth=None, auth_details=None)
### _class_ armada.model.GrpcChannelArgs(target, options=None, compression=None, auth=None)
Bases: `object`


Expand All @@ -197,32 +227,31 @@ Bases: `object`
* **target** (*str*) –


* **options** (*Sequence**[**Tuple**[**str**, **Any**]**] **| **None*) –
* **options** (*Optional**[**Sequence**[**Tuple**[**str**, **Any**]**]**]*) –


* **compression** (*Compression** | **None*) –
* **compression** (*Optional**[**grpc.Compression**]*) –


* **auth** (*AuthMetadataPlugin** | **None*) –
* **auth** (*Optional**[**grpc.AuthMetadataPlugin**]*) –


* **auth_details** (*Dict**[**str**, **Any**] **| **None*) –

#### _static_ deserialize(data, version)

* **Parameters**

#### aio_channel()

* **Return type**

* **data** (*dict**[**str**, **Any**]*) –

*Channel*

* **version** (*int*) –


#### channel()

* **Return type**

*Channel*
*GrpcChannelArgs*



Expand All @@ -231,3 +260,50 @@ Bases: `object`
* **Return type**

*Dict*[str, *Any*]



### _class_ armada.model.RunningJobContext(armada_queue: 'str', job_id: 'str', job_set_id: 'str', submit_time: 'DateTime', cluster: 'Optional[str]' = None, last_log_time: 'Optional[DateTime]' = None, job_state: 'str' = 'UNKNOWN')
Bases: `object`


* **Parameters**


* **armada_queue** (*str*) –


* **job_id** (*str*) –


* **job_set_id** (*str*) –


* **submit_time** (*DateTime*) –


* **cluster** (*str** | **None*) –


* **last_log_time** (*DateTime** | **None*) –


* **job_state** (*str*) –



#### armada_queue(_: st_ )

#### cluster(_: str | Non_ _ = Non_ )

#### job_id(_: st_ )

#### job_set_id(_: st_ )

#### job_state(_: st_ _ = 'UNKNOWN_ )

#### last_log_time(_: DateTime | Non_ _ = Non_ )

#### _property_ state(_: JobStat_ )

#### submit_time(_: DateTim_ )
29 changes: 19 additions & 10 deletions internal/armadactl/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ import (
"github.com/armadaproject/armada/pkg/client/util"
)

// QueueQueryArgs is used for retrieving queues or for cordoning/uncordoning
type QueueQueryArgs struct {
InQueueNames []string
// Filter for queues where the InQueueNames slice contains the queue name
InQueueNames []string
// Filter for queues where the queue contains all labels specified in the ContainsAllLabels slice
ContainsAllLabels []string
InvertResult bool
OnlyCordoned bool
// Filter for cordoned queues only
OnlyCordoned bool
// Applies the above filters and inverts the result
InvertResult bool
}

// CreateQueue calls app.QueueAPI.Create with the provided parameters.
Expand Down Expand Up @@ -83,18 +88,22 @@ func (a *App) GetQueue(name string) error {
func (a *App) getAllQueuesAsAPIQueue(args *QueueQueryArgs) ([]*api.Queue, error) {
queueFilters := func(q *api.Queue) bool {
containsAllLabels := slices.AllFunc(args.ContainsAllLabels, func(label string) bool {
// If the label is a key, map the labels slice to only keys
labelsToCompare := q.Labels
if len(strings.Split(label, "=")) == 1 {
labelsToCompare = slices.Map(q.Labels, func(queueLabel string) string { return strings.Split(queueLabel, "=")[0] })
splitLabel := strings.Split(label, "=")
if len(splitLabel) >= 2 {
queueLabelValue, ok := q.Labels[splitLabel[0]]
return ok && queueLabelValue == strings.Join(splitLabel[1:], "")
} else if len(splitLabel) == 1 {
// If the label is a key, we compare on keys
_, ok := q.Labels[splitLabel[0]]
return ok
}

return goslices.Contains(labelsToCompare, label)
return false
})
inQueues := len(args.InQueueNames) == 0 || goslices.Contains(args.InQueueNames, q.Name)
invertedResult := args.InvertResult != (containsAllLabels && inQueues)
matchesLabelsAndQueues := containsAllLabels && inQueues
onlyCordonedCheck := (args.OnlyCordoned && q.Cordoned) || !args.OnlyCordoned
return invertedResult && onlyCordonedCheck
return args.InvertResult != (matchesLabelsAndQueues && onlyCordonedCheck)
}
queuesToReturn, err := a.Params.QueueAPI.GetAll()
if err != nil {
Expand Down
Loading

0 comments on commit 5a36f7b

Please sign in to comment.