Add min gang cardinality #2984

Open · wants to merge 1 commit into base: master
2 changes: 1 addition & 1 deletion .github/workflows/autoupdate.yml
@@ -15,7 +15,7 @@ jobs:
- uses: docker://chinthakagodawita/autoupdate-action:v1
env:
GITHUB_TOKEN: '${{ secrets.GITHUB_TOKEN }}'
PR_LABELS: "auto-update"
PR_LABELS: "auto-update"
MERGE_MSG: "Branch was auto-updated."
RETRY_COUNT: "5"
RETRY_SLEEP: "300"
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -2,10 +2,10 @@ name: CI

on:
push:
branches:
- master
tags:
- v*
branches-ignore:
- gh-pages
pull_request:
branches-ignore:
- gh-pages
47 changes: 0 additions & 47 deletions .github/workflows/not-airflow-operator.yml

This file was deleted.

42 changes: 0 additions & 42 deletions .github/workflows/not-python-client.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -214,7 +214,7 @@ jobs:

echo -e "### Git status" >> $GITHUB_STEP_SUMMARY
if [[ "$changed" -gt 0 ]]; then
echo -e "Generated proto files are out of date. Please run 'make proto' and commit the changes." >> $GITHUB_STEP_SUMMARY
echo -e "Generated proto files are out of date. Please run 'mage proto' and commit the changes." >> $GITHUB_STEP_SUMMARY

git status -s -uno >> $GITHUB_STEP_SUMMARY

2 changes: 1 addition & 1 deletion client/python/CONTRIBUTING.md
@@ -26,7 +26,7 @@ workflow for contributing. First time contributors can follow the guide below to
Unlike most python projects, the Armada python client contains a large quantity of generated code. This code must be
generated in order to compile and develop against the client.

From the root of the repository, run `make python`. This will generate python code needed to build
From the root of the repository, run `mage buildPython`. This will generate python code needed to build
and use the client. This command needs to be re-run anytime an API change is committed (e.g. a change to a `*.proto`
file).

2 changes: 1 addition & 1 deletion client/python/README.md
@@ -26,5 +26,5 @@ Before beginning, ensure you have:
- Network access to fetch docker images and go dependencies.

To generate all needed code, and install the python client:
1) From the root of the repository, run `make python`
1) From the root of the repository, run `mage buildPython`
2) Install the client using `pip install client/python`. It's strongly recommended you do this inside a virtualenv.
4 changes: 2 additions & 2 deletions client/python/docs/README.md
@@ -9,13 +9,13 @@ Usage

Easy way:
- Ensure all protobuf files needed for the client are generated by running
`make python` from the repository root.
`mage buildPython` from the repository root.
- `tox -e docs` will create a valid virtual environment and use it to generate
documentation. The generated files will be placed under `build/jekyll/*.md`.

Manual way:
- Ensure all protobuf files needed for the client are generated by running
`make python` from the repository root.
`mage buildPython` from the repository root.
- Create a virtual environment containing all the deps listed in `tox.ini`
under `[testenv:docs]`.
- Run `poetry install -v` from inside `client/python` to install the client
4 changes: 2 additions & 2 deletions cmd/scheduler/cmd/prune_database.go
@@ -1,13 +1,13 @@
package cmd

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)
@@ -57,7 +57,7 @@ func pruneDatabase(cmd *cobra.Command, _ []string) error {
return errors.WithMessagef(err, "Failed to connect to database")
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), timeout)
defer cancel()
return schedulerdb.PruneDb(ctx, db, batchSize, expireAfter, clock.RealClock{})
}
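
The prune command hunk above swaps the standard library's `context.WithTimeout` for Armada's `armadacontext` wrapper. A minimal sketch of the resulting pattern, assuming `armadacontext.WithTimeout` mirrors the standard `context.WithTimeout` shape and returns a `*armadacontext.Context` plus a cancel function (the `defer cancel()` in the diff suggests as much); the helper name `pruneWithTimeout` is hypothetical:

```go
package cmd

import (
	"time"

	"github.com/armadaproject/armada/internal/common/armadacontext"
)

// pruneWithTimeout is a hypothetical helper showing the updated pattern:
// build an Armada context with a deadline instead of a plain context.Context,
// then hand it to whatever performs the work (schedulerdb.PruneDb in the real code).
func pruneWithTimeout(timeout time.Duration, prune func(ctx *armadacontext.Context) error) error {
	ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), timeout)
	defer cancel()
	return prune(ctx)
}
```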
2 changes: 1 addition & 1 deletion docs/developer/manual-localdev.md
@@ -28,7 +28,7 @@ mage BootstrapTools
# Compile .pb.go files from .proto files
# (only necessary after changing a .proto file).
mage proto
make dotnet
mage dotnet

# Build the Docker images containing all Armada components.
# Only the main "bundle" is needed for quickly testing Armada.
30 changes: 27 additions & 3 deletions docs/python_airflow_operator.md
@@ -239,9 +239,27 @@ Reports the result of the job and returns.



#### serialize()
Get a serialized version of this object.


* **Returns**

A dict of keyword arguments used when instantiating
this object.


* **Return type**

dict


#### template_fields: Sequence[str] = ('job_request_items', )

### _class_ armada.operators.armada_deferrable.ArmadaJobCompleteTrigger(job_id, job_service_channel_args, armada_queue, job_set_id, airflow_task_name)
### _class_ armada.operators.armada_deferrable.ArmadaJobCompleteTrigger(job_id, job_service_channel_args, armada_queue, job_set_id, airflow_task_name, poll_interval=30)
Bases: `BaseTrigger`

An airflow trigger that monitors the job state of an armada job.
@@ -269,6 +287,9 @@ Triggers when the job is complete.
belongs.


* **poll_interval** (*int*) – How often to poll jobservice to get status.



* **Returns**

@@ -281,7 +302,7 @@ Runs the trigger. Meant to be called by an airflow triggerer process.


#### serialize()
Returns the information needed to reconstruct this Trigger.
Return the information needed to reconstruct this Trigger.


* **Returns**
@@ -664,7 +685,7 @@ A terminated event is SUCCEEDED, FAILED or CANCELLED



### _async_ armada.operators.utils.search_for_job_complete_async(armada_queue, job_set_id, airflow_task_name, job_id, job_service_client, log, time_out_for_failure=7200)
### _async_ armada.operators.utils.search_for_job_complete_async(armada_queue, job_set_id, airflow_task_name, job_id, job_service_client, log, poll_interval, time_out_for_failure=7200)
Poll JobService cache asynchronously until you get a terminated event.

A terminated event is SUCCEEDED, FAILED or CANCELLED
@@ -689,6 +710,9 @@ A terminated event is SUCCEEDED, FAILED or CANCELLED
It is optional only for testing


* **poll_interval** (*int*) – How often to poll jobservice to get status.


* **time_out_for_failure** (*int*) – The amount of time a job
can be in job_id_not_found
before we decide it was an invalid job
12 changes: 6 additions & 6 deletions internal/armada/server/lease.go
@@ -344,7 +344,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
lastSeen,
)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf(
logging.WithStacktrace(ctx, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetClusterId(),
)
continue
@@ -566,7 +566,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
if q.SchedulingContextRepository != nil {
sctx.ClearJobSpecs()
if err := q.SchedulingContextRepository.AddSchedulingContext(sctx); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to store scheduling context")
logging.WithStacktrace(ctx, err).Error("failed to store scheduling context")
}
}

@@ -641,7 +641,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
jobIdsToDelete := util.Map(jobsToDelete, func(job *api.Job) string { return job.Id })
log.Infof("deleting preempted jobs: %v", jobIdsToDelete)
if deletionResult, err := q.jobRepository.DeleteJobs(jobsToDelete); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to delete preempted jobs from Redis")
logging.WithStacktrace(ctx, err).Error("failed to delete preempted jobs from Redis")
} else {
deleteErrorByJobId := armadamaps.MapKeys(deletionResult, func(job *api.Job) string { return job.Id })
for jobId := range preemptedApiJobsById {
@@ -704,7 +704,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
}
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
logging.WithStacktrace(ctx.Log, err).Errorf("failed to update cluster usage")
logging.WithStacktrace(ctx, err).Errorf("failed to update cluster usage")
}

allocatedByQueueAndPriorityClassForPool = q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
@@ -728,7 +728,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
v := node.Labels[q.schedulingConfig.Preemption.NodeIdLabel]
@@ -764,7 +764,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
podSpec.NodeName = node.Name
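
The lease.go hunks above all make the same mechanical change: `logging.WithStacktrace` now receives the `armadacontext.Context` itself rather than its `ctx.Log` field. A minimal sketch of the new call shape, assuming the `logging` package lives under `internal/common/logging` (its import path is not visible in this diff) and with `warnSkippedNode` as a hypothetical wrapper:

```go
package server

import (
	"github.com/armadaproject/armada/internal/common/armadacontext"
	// Assumed import path for the logging helpers; it is not shown in this diff.
	"github.com/armadaproject/armada/internal/common/logging"
)

// warnSkippedNode is a hypothetical wrapper around the call shape used in the hunks above:
// the context itself is passed to WithStacktrace, and the returned logger's
// Warnf/Error/Errorf methods are then used exactly as before.
func warnSkippedNode(ctx *armadacontext.Context, err error, nodeName, clusterId string) {
	logging.WithStacktrace(ctx, err).Warnf(
		"skipping node %s from executor %s", nodeName, clusterId,
	)
}
```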
8 changes: 4 additions & 4 deletions internal/armada/server/submit_from_log.go
@@ -125,12 +125,12 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {
sequence, err := eventutil.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
if err != nil {
srv.ack(ctx, msg)
logging.WithStacktrace(ctxWithLogger.Log, err).Warnf("processing message failed; ignoring")
logging.WithStacktrace(ctxWithLogger, err).Warnf("processing message failed; ignoring")
numErrored++
break
}

ctxWithLogger.Log.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
// TODO: Improve retry logic.
srv.ProcessSequence(ctxWithLogger, sequence)
srv.ack(ctx, msg)
@@ -155,11 +155,11 @@ func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *
for i < len(sequence.Events) && time.Since(lastProgress) < timeout {
j, err := srv.ProcessSubSequence(ctx, i, sequence)
if err != nil {
logging.WithStacktrace(ctx.Log, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
logging.WithStacktrace(ctx, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
}

if j == i {
ctx.Log.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")
ctx.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")

// We should only get here if a transient error occurs.
// Sleep for a bit before retrying.
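
Similarly, submit_from_log.go now calls the structured-logging helpers on the context directly (`ctx.WithField(...)`, `ctx.WithFields(...)`) instead of going through `ctx.Log`. A small sketch of that usage; `logProgress` and its parameters are hypothetical:

```go
package server

import (
	"github.com/sirupsen/logrus"

	"github.com/armadaproject/armada/internal/common/armadacontext"
)

// logProgress is a hypothetical helper illustrating the updated logging style:
// WithField/WithFields are invoked on the armadacontext.Context itself, matching
// the calls shown in the hunks above.
func logProgress(ctx *armadacontext.Context, numEvents, lowerIndex, upperIndex int) {
	ctx.WithField("numEvents", numEvents).Info("processing sequence")
	ctx.WithFields(logrus.Fields{"lowerIndex": lowerIndex, "upperIndex": upperIndex}).Info("made no progress")
}
```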