Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on_kill() operator added. #2461

Closed
wants to merge 36 commits into from
Closed
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
29e75af
on_kill() operator added.
sarthaksarthak9 May 12, 2023
6e66710
job_cancel added
sarthaksarthak9 May 13, 2023
e1c3476
removed openai and use cancel_job from client.py
sarthaksarthak9 May 13, 2023
7e8c1a9
Merge branch 'master' into airflow1
sarthaksarthak9 May 15, 2023
a795d98
add the cancelling of jobs in the on_kill function
sarthaksarthak9 May 20, 2023
32b6bcc
cancel the Jobset
sarthaksarthak9 May 21, 2023
c5fc912
Merge branch 'armadaproject:master' into airflow1
sarthaksarthak9 May 22, 2023
930c3c7
update the changes.
sarthaksarthak9 May 22, 2023
09989ed
Merge branch 'airflow1' of https://github.com/sarthaksarthak9/armada …
sarthaksarthak9 May 22, 2023
a04b5b7
remove newlines.
sarthaksarthak9 May 22, 2023
ec295f2
add self.queue
sarthaksarthak9 Jun 2, 2023
14ada00
add the docs of kill()
sarthaksarthak9 Jun 2, 2023
d533445
reformatted
sarthaksarthak9 Jun 3, 2023
7508265
Merge branch 'master' into airflow1
sarthaksarthak9 Jun 3, 2023
df23245
unit test added
sarthaksarthak9 Jul 17, 2023
59d72b7
Merge branch 'master' into airflow1
sarthaksarthak9 Jul 17, 2023
d02e84e
file formatted
sarthaksarthak9 Jul 18, 2023
6d936e2
undo
sarthaksarthak9 Jul 19, 2023
ea76ce4
redo
sarthaksarthak9 Jul 19, 2023
cbd0749
updated
sarthaksarthak9 Jul 24, 2023
6369ee7
update
sarthaksarthak9 Jul 25, 2023
d824296
Merge branch 'master' into airflow1
sarthaksarthak9 Jul 25, 2023
9c848d1
tox -e docs-check
sarthaksarthak9 Jul 27, 2023
7b9c6a5
tox -e docs-check
sarthaksarthak9 Jul 27, 2023
16afcc0
re-check
sarthaksarthak9 Jul 28, 2023
3943a6c
Merge branch 'master' into airflow1
kannon92 Jul 28, 2023
e669c6f
Merge branch 'master' into airflow1
sarthaksarthak9 Aug 23, 2023
b194bdc
Merge branch 'master' into airflow1
sarthaksarthak9 Aug 30, 2023
784bf28
updated
sarthaksarthak9 Sep 16, 2023
b46ae28
run tox -e formate-code to remove errors
sarthaksarthak9 Sep 16, 2023
954d65b
Merge branch 'master' into airflow1
Sharpz7 Sep 20, 2023
60fe261
Merge branch 'master' into airflow1
Sharpz7 Sep 20, 2023
4649b5f
Merge branch 'master' into airflow1
Sharpz7 Sep 26, 2023
ded1de8
Merge branch 'master' into airflow1
Sharpz7 Oct 3, 2023
7fb86c7
Signed-off-by: sarthaksarthak9 <sarthaknegi908@gmail.com>
sarthaksarthak9 Oct 5, 2023
7c4cd03
Signed-off-by: sarthaksarthak9 <sarthaknegi908@gmail.com>
sarthaksarthak9 Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 50 additions & 33 deletions third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

import logging
from typing import Optional, List

from airflow.models import BaseOperator
from airflow.exceptions import AirflowException

from armada_client.armada.submit_pb2 import JobSubmitRequestItem
from armada_client.client import ArmadaClient

from armada.operators.jobservice import JobServiceClient
from armada.operators.utils import (
airflow_error,
Expand All @@ -33,29 +30,25 @@
)
from armada.jobservice import jobservice_pb2


armada_logger = logging.getLogger("airflow.task")


class ArmadaOperator(BaseOperator):
"""
Implementation of an ArmadaOperator for airflow.

Implementation of an ArmadaOperator for Airflow.
Airflow operators inherit from BaseOperator.

:param name: The name of the airflow task
:param armada_client: The Armada Python GRPC client
that is used for interacting with Armada
:param job_service_client: The JobServiceClient that is used for polling
:param armada_queue: The queue name for Armada.
:param job_request_items: A PodSpec that is used by Armada for submitting a job
:param lookout_url_template: A URL template to be used to provide users
a valid link to the related lookout job in this operator's log.
The format should be:
"https://lookout.armada.domain/jobs?job_id=<job_id>" where <job_id> will
be replaced with the actual job ID.
:param poll_interval: How often to poll jobservice to get status.
:return: an armada operator instance
used for interacting with Armada
:param job_service_client: The JobServiceClient used for polling
:param armada_queue: The queue name for Armada
:param job_request_items: A list of JobSubmitRequestItem objects used by Armada for submitting a job
:param lookout_url_template: A URL template to provide users with a valid link to the related lookout job in this operator's log.
The format should be: "https://lookout.armada.domain/jobs?job_id=<job_id>"
where <job_id> will be replaced with the actual job ID.
:param poll_interval: How often to poll jobservice to get status
:return: An Armada operator instance
"""

def __init__(
Expand All @@ -77,57 +70,81 @@ def __init__(
self.job_request_items = job_request_items
self.lookout_url_template = lookout_url_template
self.poll_interval = poll_interval
self.job_id = None # Initialize job_id attribute
self.job_set_id = None # Initialize job_set_id attribute
sarthaksarthak9 marked this conversation as resolved.
Show resolved Hide resolved

def execute(self, context) -> None:
    """
    Executes the Armada Operator.

    Submits the configured job request items to Armada, then hands the
    submitted job over to search_for_job_complete together with the job
    service client and the poll interval.

    The job set id and job id are stored on the instance (self.job_set_id,
    self.job_id) so they remain available after submission, e.g. for
    cancellation.

    :param context: The Airflow context. Its "run_id" entry is used as the
        Armada job set id.
    :raises AirflowException: if the submit response does not contain a job id.
    :return: None
    """
    # Health check: an unhealthy job service is only logged, not fatal.
    health = self.job_service.health()
    if health.status != jobservice_pb2.HealthCheckResponse.SERVING:
        # Logger.warn is a deprecated alias; use Logger.warning.
        armada_logger.warning("Armada Job Service is not healthy")

    # This allows us to use a unique id from Airflow
    # and have all jobs in a DAG correspond to the same jobset
    self.job_set_id = context["run_id"]
    job = self.armada_client.submit_jobs(
        queue=self.armada_queue,
        job_set_id=self.job_set_id,
        job_request_items=annotate_job_request_items(
            context, self.job_request_items
        ),
    )

    try:
        self.job_id = job.job_response_items[0].job_id
    except Exception as err:
        # Keep the original failure attached as the cause for debugging.
        raise AirflowException(
            "Armada encountered issues submitting the job"
        ) from err

    armada_logger.info("Running Armada job %s with id %s", self.name, self.job_id)

    lookout_url = self._get_lookout_url(self.job_id)
    if lookout_url:
        armada_logger.info("Lookout URL: %s", lookout_url)

    job_state, job_message = search_for_job_complete(
        job_service_client=self.job_service,
        armada_queue=self.armada_queue,
        job_set_id=self.job_set_id,
        airflow_task_name=self.name,
        job_id=self.job_id,
        poll_interval=self.poll_interval,
    )

    armada_logger.info(
        "Armada Job finished with %s and message: %s", job_state, job_message
    )
    # NOTE(review): airflow_error presumably raises for unsuccessful final
    # states -- confirm against armada.operators.utils.
    airflow_error(job_state, self.name, self.job_id)

def _get_lookout_url(self, job_id: str) -> str:
    """
    Builds the lookout URL for a job from the configured template.

    :param job_id: The job id substituted for the "<job_id>" placeholder.
    :return: The template with "<job_id>" replaced, or an empty string
        when no lookout_url_template is configured.
    """
    template = self.lookout_url_template
    return "" if template is None else template.replace("<job_id>", job_id)

def on_kill(self) -> None:
    """
    Cancels the Armada jobs of this operator's job set.

    Does nothing when no job set id has been recorded yet or no queue is
    configured. Cancellation is best-effort: any error raised while
    cancelling is logged as a warning and not re-raised.

    :return: None
    """
    try:
        if self.job_set_id and self.armada_queue:
            # Cancel the jobs using the Armada client
            self.armada_client.cancel_jobs(
                queue=self.armada_queue, job_set_id=self.job_set_id
            )
            armada_logger.info(
                "Queue %s and JobSetId %s has been cancelled.",
                self.armada_queue,
                self.job_set_id,
            )
    except Exception as e:
        armada_logger.warning(
            "Error during job unsubscription and cancellation: %s", str(e)
        )