From 1f55f974d2c31d29a7e5c5161e8d233a76692923 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Thu, 7 Sep 2023 18:16:44 +0200
Subject: [PATCH] Issue #115 Improve handling of failed sub batch job creation

---
 .../partitionedjobs/tracking.py   | 102 +++++++++---------
 tests/partitionedjobs/conftest.py |   5 +-
 tests/partitionedjobs/test_api.py |  37 +++++++
 3 files changed, 94 insertions(+), 50 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index 129fae2f..dc07dfd4 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -112,57 +112,61 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
                 }
             }

-        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
-            process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id
-        ):
-            subjobs[sjob_id] = subjob
-            dependencies[sjob_id] = subjob_dependencies
-            try:
-                # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
-                con = self._backends.get_connection(subjob.backend_id)
-                with con.authenticated_from_request(request=flask.request), con.override(
-                    default_timeout=CONNECTION_TIMEOUT_JOB_START
-                ):
-                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
-                        job = con.create_job(
-                            process_graph=subjob.process_graph,
-                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
-                            plan=metadata.get("plan"),
-                            budget=metadata.get("budget"),
-                            additional=job_options,
-                        )
-                _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
-                batch_jobs[sjob_id] = job
-                title = f"Partitioned job {pjob_id=} {sjob_id=}"
-                self._db.insert_sjob(
-                    user_id=user_id,
-                    pjob_id=pjob_id,
-                    sjob_id=sjob_id,
-                    subjob=subjob,
-                    title=title,
-                    status=STATUS_CREATED,
-                )
-                self._db.set_backend_job_id(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
-                )
-                create_stats[STATUS_CREATED] += 1
-            except Exception as exc:
-                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
-                msg = f"Create failed: {exc}"
-                self._db.set_sjob_status(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
-                )
-                create_stats[STATUS_ERROR] += 1
+        try:
+            for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+                process_graph=process["process_graph"],
+                get_replacement=get_replacement,
+                main_subgraph_id=main_subgraph_id,
+            ):
+                subjobs[sjob_id] = subjob
+                dependencies[sjob_id] = subjob_dependencies
+                try:
+                    title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                    self._db.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob, title=title)
+
+                    # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
+                    con = self._backends.get_connection(subjob.backend_id)
+                    with con.authenticated_from_request(request=flask.request), con.override(
+                        default_timeout=CONNECTION_TIMEOUT_JOB_START
+                    ):
+                        with TimingLogger(
+                            title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info
+                        ):
+                            job = con.create_job(
+                                process_graph=subjob.process_graph,
+                                title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                                plan=metadata.get("plan"),
+                                budget=metadata.get("budget"),
+                                additional=job_options,
+                            )
+                    _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                    batch_jobs[sjob_id] = job
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_CREATED
+                    )
+                    self._db.set_backend_job_id(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                    )
+                    create_stats[STATUS_CREATED] += 1
+                except Exception as exc:
+                    _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                    msg = f"Create failed: {exc}"
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                    )
+                    create_stats[STATUS_ERROR] += 1

-        # TODO: this is currently unused, don't bother building it at all?
-        partitioned_job = PartitionedJob(
-            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
-        )
+            # TODO: this is currently unused, don't bother building it at all?
+            partitioned_job = PartitionedJob(
+                process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+            )

-        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
-        self._db.set_pjob_status(
-            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
-        )
+            pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+            self._db.set_pjob_status(
+                user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+            )
+        except Exception as exc:
+            self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_ERROR, message=str(exc))

         return pjob_id

diff --git a/tests/partitionedjobs/conftest.py b/tests/partitionedjobs/conftest.py
index c986ab25..3911ce96 100644
--- a/tests/partitionedjobs/conftest.py
+++ b/tests/partitionedjobs/conftest.py
@@ -65,6 +65,7 @@ def __init__(self, requests_mock, backend_url: str, job_id_template: str = "job{
         self.job_id_template = job_id_template
         self.jobs: Dict[Tuple[str, str], DummyBatchJobData] = {}
         self.users: Dict[str, str] = {}
+        self.fail_create_job = False

     def register_user(self, bearer_token: str, user_id: str):
         self.users[bearer_token] = user_id
@@ -77,7 +78,7 @@ def get_user_id(self, request: requests.Request):

     def get_job_data(self, user_id, job_id) -> DummyBatchJobData:
         if (user_id, job_id) not in self.jobs:
-            raise JobNotFoundException
+            raise JobNotFoundException(job_id=job_id)
         return self.jobs[user_id, job_id]

     def setup_basic_requests_mocks(self):
@@ -127,6 +128,8 @@ def _handle_get_jobs(self, request: requests.Request, context):

     def _handle_post_jobs(self, request: requests.Request, context):
         """`POST /jobs` handler (create job)"""
+        if self.fail_create_job:
+            raise RuntimeError("nope!")
         user_id = self.get_user_id(request)
         job_id = self.job_id_template.format(i=len(self.jobs))
         assert (user_id, job_id) not in self.jobs
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 4a98f595..37b334d4 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -873,3 +873,40 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
                 },
             }
         )
+
+    @now.mock
+    def test_failing_create(self, flask_app, api100, zk_db, dummy1):
+        """Check what happens when creation of a sub batch job fails on the upstream backend"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        dummy1.fail_create_job = True
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "error",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
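
For reference, the error handling introduced in tracking.py boils down to the pattern sketched below. This is an illustrative sketch only, not the actual PartitionedJobTracker code: `create_subjobs`, `create_batch_job` and `db` are hypothetical placeholders for the real splitter, connection and job database internals; only the STATUS_CREATED/STATUS_ERROR bookkeeping mirrors the patch.

```python
# Illustrative sketch of the error-handling pattern from this patch.
# `db` and `create_batch_job` are placeholders, not the real aggregator internals.
from typing import Callable, Iterable, Tuple

STATUS_CREATED = "created"
STATUS_ERROR = "error"


def create_subjobs(
    subjob_specs: Iterable[Tuple[str, dict]],
    create_batch_job: Callable[[dict], str],
    db,
    user_id: str,
    pjob_id: str,
) -> None:
    create_stats = {STATUS_CREATED: 0, STATUS_ERROR: 0}
    try:
        for sjob_id, subjob in subjob_specs:
            # Register the sub-job up front, so a later creation failure
            # can still be recorded against an existing database entry.
            db.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob)
            try:
                job_id = create_batch_job(subjob)  # may raise on upstream failure
                db.set_backend_job_id(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job_id)
                db.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_CREATED)
                create_stats[STATUS_CREATED] += 1
            except Exception as exc:
                # One failing sub batch job is marked "error" but does not abort the others.
                db.set_sjob_status(
                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id,
                    status=STATUS_ERROR, message=f"Create failed: {exc}",
                )
                create_stats[STATUS_ERROR] += 1
        # The partitioned job counts as created if at least one sub-job was created.
        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
        db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats))
    except Exception as exc:
        # Failures outside the per-sub-job handling (e.g. while splitting the
        # process graph) mark the whole partitioned job as "error".
        db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_ERROR, message=str(exc))
```

The key design choice is that a single failing sub batch job only marks that sub-job as "error" while the rest proceed, whereas an unexpected failure around the splitting loop marks the whole partitioned job as "error" instead of leaving it without a status.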