Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on_kill() operator added. #2461

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
29e75af
on_kill() operator added.
sarthaksarthak9 May 12, 2023
6e66710
job_cancel added
sarthaksarthak9 May 13, 2023
e1c3476
removed openai and use cancel_job from client.py
sarthaksarthak9 May 13, 2023
7e8c1a9
Merge branch 'master' into airflow1
sarthaksarthak9 May 15, 2023
a795d98
add the cancelling of jobs in the on_kill function
sarthaksarthak9 May 20, 2023
32b6bcc
cancel the Jobset
sarthaksarthak9 May 21, 2023
c5fc912
Merge branch 'armadaproject:master' into airflow1
sarthaksarthak9 May 22, 2023
930c3c7
update the changes.
sarthaksarthak9 May 22, 2023
09989ed
Merge branch 'airflow1' of https://github.com/sarthaksarthak9/armada …
sarthaksarthak9 May 22, 2023
a04b5b7
remove newlines.
sarthaksarthak9 May 22, 2023
ec295f2
add self.queue
sarthaksarthak9 Jun 2, 2023
14ada00
add the docs of kill()
sarthaksarthak9 Jun 2, 2023
d533445
reformatted
sarthaksarthak9 Jun 3, 2023
7508265
Merge branch 'master' into airflow1
sarthaksarthak9 Jun 3, 2023
df23245
unit test added
sarthaksarthak9 Jul 17, 2023
59d72b7
Merge branch 'master' into airflow1
sarthaksarthak9 Jul 17, 2023
d02e84e
file formatted
sarthaksarthak9 Jul 18, 2023
6d936e2
undo
sarthaksarthak9 Jul 19, 2023
ea76ce4
redo
sarthaksarthak9 Jul 19, 2023
cbd0749
updated
sarthaksarthak9 Jul 24, 2023
6369ee7
update
sarthaksarthak9 Jul 25, 2023
d824296
Merge branch 'master' into airflow1
sarthaksarthak9 Jul 25, 2023
9c848d1
tox -e docs-check
sarthaksarthak9 Jul 27, 2023
7b9c6a5
tox -e docs-check
sarthaksarthak9 Jul 27, 2023
16afcc0
re-check
sarthaksarthak9 Jul 28, 2023
3943a6c
Merge branch 'master' into airflow1
kannon92 Jul 28, 2023
e669c6f
Merge branch 'master' into airflow1
sarthaksarthak9 Aug 23, 2023
b194bdc
Merge branch 'master' into airflow1
sarthaksarthak9 Aug 30, 2023
784bf28
updated
sarthaksarthak9 Sep 16, 2023
b46ae28
run tox -e formate-code to remove errors
sarthaksarthak9 Sep 16, 2023
954d65b
Merge branch 'master' into airflow1
Sharpz7 Sep 20, 2023
60fe261
Merge branch 'master' into airflow1
Sharpz7 Sep 20, 2023
4649b5f
Merge branch 'master' into airflow1
Sharpz7 Sep 26, 2023
ded1de8
Merge branch 'master' into airflow1
Sharpz7 Oct 3, 2023
7fb86c7
Signed-off-by: sarthaksarthak9 <sarthaknegi908@gmail.com>
sarthaksarthak9 Oct 5, 2023
7c4cd03
Signed-off-by: sarthaksarthak9 <sarthaknegi908@gmail.com>
sarthaksarthak9 Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ Runs an Armada job and calls the job_service_client for polling.



#### on_kill()
Stops the JobService from listening to the JobSet and cancels the jobs.


* **Returns**

None



* **Return type**

None



#### render_template_fields(context, jinja_env=None)
Template all attributes listed in *self.template_fields*.

Expand Down
20 changes: 20 additions & 0 deletions third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,26 @@ def _get_lookout_url(self, job_id: str) -> str:
return ""
return self.lookout_url_template.replace("<job_id>", job_id)

def on_kill(self) -> None:
sarthaksarthak9 marked this conversation as resolved.
Show resolved Hide resolved
"""
Stops the JobService from listening to the JobSet and cancels the jobs.

:return: None
"""
try:
if self.job_set_id and self.queue:
sarthaksarthak9 marked this conversation as resolved.
Show resolved Hide resolved
# Cancel the jobs using the Armada client
self.armada_client.cancel_job(
job_set_id=self.job_set_id, queue=self.queue
)
armada_logger.info(
"Queue %s and JobSetId %s has been cancelled.",
self.queue,
self.job_set_id,
)
except Exception as e:
armada_logger.warning("Error during job cancellation: %s", str(e))

def render_template_fields(
self,
context: Context,
Expand Down
38 changes: 38 additions & 0 deletions third_party/airflow/tests/unit/test_armada_operator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from armada.operators.armada import ArmadaOperator

from armada_client import ArmadaClient
from armada.operators.armada import ArmadaOperator
from unittest.mock import patch, Mock

import copy
from unittest.mock import patch, Mock

import grpc
import pytest
import unittest



from armada.jobservice import jobservice_pb2
from armada.operators.armada import ArmadaOperator
Expand Down Expand Up @@ -45,6 +54,35 @@ def test_get_lookout_url(lookout_url_template, job_id, expected_url):
assert operator._get_lookout_url(job_id) == expected_url


class TestJobService(unittest.TestCase):
@patch.object(JobService, "cancel_jobs")
def test_on_kill(self, mock_cancel_jobs):

operator = ArmadaOperator(
name="test_operator",
armada_channel_args={},
job_service_channel_args={},
armada_queue="test_queue",
job_request_items=[],
)

operator.job_set_id = "test_job_set_id"
operator.queue = "test_queue"

armada_client_instance = mock_armada_client.return_value
armada_client_instance.cancel_job.return_value = None

operator.on_kill()

armada_client_instance.cancel_job.assert_called_once_with(
job_set_id="test_job_set_id", queue="test_queue"
)


if __name__ == "__main__":
unittest.main()


def test_deepcopy_operator():
armada_channel_args = {"target": "127.0.0.1:50051"}
job_service_channel_args = {"target": "127.0.0.1:60003"}
Expand Down
Loading