fixup! fixup! Issue #150/#156 integrate DeepGraphSplitter in AggregatorBatchJobs._create_crossbackend_job
soxofaan committed Sep 19, 2024
1 parent b1c708e commit 5a087dc
Showing 2 changed files with 150 additions and 3 deletions.
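In short: the "deep" crossbackend split partitions a process graph over multiple backends. The subgraph handed to a secondary backend gets a save_result node appended, and the primary backend's graph swaps the handed-off node for a load_stac on the secondary job's (partial) results URL. A hand-rolled sketch of that rewrite, for orientation only (split_off_secondary is a made-up name, not the actual DeepGraphSplitter API, and it only handles the single-input chains used in the test below):

    def split_off_secondary(pg: dict, split_node: str, results_url: str) -> tuple[dict, dict]:
        # Collect the split node plus its upstream (single-input) dependencies.
        secondary = {}
        node = split_node
        while node is not None:
            secondary[node] = pg[node]
            data = pg[node]["arguments"].get("data")
            node = data["from_node"] if data else None
        # Secondary subjob: append a save_result so its output can be exported.
        secondary["_agg_crossbackend_save_result"] = {
            "process_id": "save_result",
            "arguments": {"data": {"from_node": split_node}, "format": "GTiff"},
            "result": True,
        }
        # Primary graph: keep the remaining nodes, replace the split node with load_stac.
        primary = {k: v for k, v in pg.items() if k not in secondary}
        primary[split_node] = {"process_id": "load_stac", "arguments": {"url": results_url}}
        return primary, secondary

Applied to the test graph below with split_node="temporal2", this produces exactly the "main" (backend b1) and "b2:temporal2" subgraphs that test_create_job_deep_basic asserts.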
4 changes: 2 additions & 2 deletions src/openeo_aggregator/testing.py
@@ -66,13 +66,13 @@ def exists(self, path):
     def get(self, path):
         self._assert_open()
         if path not in self.data:
-            raise kazoo.exceptions.NoNodeError()
+            raise kazoo.exceptions.NoNodeError(path)
         return self.data[path]
 
     def get_children(self, path):
         self._assert_open()
         if path not in self.data:
-            raise kazoo.exceptions.NoNodeError()
+            raise kazoo.exceptions.NoNodeError(path)
         parent = path.split("/")
         return [p.split("/")[-1] for p in self.data if p.split("/")[:-1] == parent]

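A side note on the testing.py change: kazoo's NoNodeError derives from Exception, so whatever is passed to the constructor shows up in the str()/repr() of the raised error. Passing the path makes a failing test point straight at the missing ZooKeeper node instead of raising a bare NoNodeError(). A quick illustration (the path here is made up):

    import kazoo.exceptions

    try:
        raise kazoo.exceptions.NoNodeError("/openeo/jobs/pj-123/status")
    except kazoo.exceptions.NoNodeError as exc:
        # Without the argument this would print just "NoNodeError()".
        print(repr(exc))  # -> NoNodeError('/openeo/jobs/pj-123/status')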
149 changes: 148 additions & 1 deletion tests/partitionedjobs/test_api.py
@@ -697,7 +697,10 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1, split_strategy
         """Handling of single "load_collection" process graph"""
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
 
-        pg = {"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}}
+        pg = {
+            # lc1 (that's it, that's the graph)
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}
+        }
 
         res = api100.post(
             "/jobs",
@@ -773,6 +776,9 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, dummy2, reques
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
 
         pg = {
+            # lc1  lc2
+            #    \ /
+            #   merge
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
             "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
             "merge": {
@@ -913,6 +919,9 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, dummy2, r
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
 
         pg = {
+            # lc1  lc2
+            #    \ /
+            #   merge
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
             "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
             "merge": {
@@ -999,6 +1008,9 @@ def test_failing_create(self, flask_app, api100, zk_db, dummy1, dummy2, split_st
         dummy2.fail_create_job = True
 
         pg = {
+            # lc1  lc2
+            #    \ /
+            #   merge
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
             "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
             "merge": {
@@ -1028,3 +1040,138 @@ def test_failing_create(self, flask_app, api100, zk_db, dummy1, dummy2, split_st
             "created": self.now.rfc3339,
             "progress": 0,
         }
+
+    @now.mock
+    def test_create_job_deep_basic(self, flask_app, api100, zk_db, dummy1, dummy2, requests_mock):
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+
+        pg = {
+            # lc1       lc2
+            #  |         |
+            # bands1  temporal2
+            #      \   /
+            #      merge
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "T22"}},
+            "bands1": {"process_id": "filter_bands", "arguments": {"data": {"from_node": "lc1"}}},
+            "temporal2": {"process_id": "filter_temporal", "arguments": {"data": {"from_node": "lc2"}}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "bands1"}, "cube2": {"from_node": "temporal2"}},
+                "result": True,
+            },
+        }
+
+        requests_mock.get(
+            "https://b2.test/v1/jobs/2-jb-0/results?partial=true",
+            json={"links": [{"rel": "canonical", "href": "https://data.b2.test/123abc"}]},
+        )
+
+        split_strategy = {"crossbackend": {"method": "deep", "primary_backend": "b1"}}
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": split_strategy},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["Location"] == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # Inspect stored parent job metadata
+        assert zk_db.get_pjob_metadata(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "user_id": TEST_USER,
+            "created": self.now.epoch,
+            "process": {"process_graph": pg},
+            "metadata": {"log_level": "info"},
+            "job_options": {"split_strategy": split_strategy},
+            "result_jobs": ["main"],
+        }
+
+        assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "status": "created",
+            "message": approx_str_contains("{'created': 2}"),
+            "timestamp": pytest.approx(self.now.epoch, abs=5),
+            "progress": 0,
+        }
+
+        # Inspect stored subjob metadata
+        subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
+        assert subjobs == {
+            "b2:temporal2": {
+                "backend_id": "b2",
+                "process_graph": {
+                    "lc2": {"arguments": {"id": "T22"}, "process_id": "load_collection"},
+                    "temporal2": {"arguments": {"data": {"from_node": "lc2"}}, "process_id": "filter_temporal"},
+                    "_agg_crossbackend_save_result": {
+                        "arguments": {"data": {"from_node": "temporal2"}, "format": "GTiff"},
+                        "process_id": "save_result",
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='b2:temporal2'",
+            },
+            "main": {
+                "backend_id": "b1",
+                "process_graph": {
+                    "lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection"},
+                    "bands1": {"arguments": {"data": {"from_node": "lc1"}}, "process_id": "filter_bands"},
+                    "temporal2": {"arguments": {"url": "https://data.b2.test/123abc"}, "process_id": "load_stac"},
+                    "merge": {
+                        "arguments": {"cube1": {"from_node": "bands1"}, "cube2": {"from_node": "temporal2"}},
+                        "process_id": "merge_cubes",
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='main'",
+            },
+        }
+        sjob_id = "main"
+        expected_job_id = "1-jb-0"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection"},
+            "bands1": {"arguments": {"data": {"from_node": "lc1"}}, "process_id": "filter_bands"},
+            "temporal2": {"arguments": {"url": "https://data.b2.test/123abc"}, "process_id": "load_stac"},
+            "merge": {
+                "arguments": {"cube1": {"from_node": "bands1"}, "cube2": {"from_node": "temporal2"}},
+                "process_id": "merge_cubes",
+                "result": True,
+            },
+        }
+        sjob_id = "b2:temporal2"
+        expected_job_id = "2-jb-0"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy2.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy2.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc2": {"arguments": {"id": "T22"}, "process_id": "load_collection"},
+            "temporal2": {"arguments": {"data": {"from_node": "lc2"}}, "process_id": "filter_temporal"},
+            "_agg_crossbackend_save_result": {
+                "arguments": {"data": {"from_node": "temporal2"}, "format": "GTiff"},
+                "process_id": "save_result",
+                "result": True,
+            },
+        }
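One detail worth calling out: the requests_mock stub at the top of the new test fakes the secondary backend's partial job results endpoint (GET .../jobs/2-jb-0/results?partial=true) and returns a canonical link. That href is what the aggregator plugs into the main graph's load_stac node, as the assertions above show. A minimal sketch of that resolution step, with a hypothetical helper name and plain requests (not the aggregator's own client code):

    import requests

    def resolve_canonical_results_url(backend_root: str, job_id: str) -> str:
        # Fetch (possibly still partial) job results metadata and pick the canonical link.
        resp = requests.get(f"{backend_root}/jobs/{job_id}/results", params={"partial": "true"})
        resp.raise_for_status()
        return next(link["href"] for link in resp.json()["links"] if link["rel"] == "canonical")

Against the mocked backend above, resolve_canonical_results_url("https://b2.test/v1", "2-jb-0") yields "https://data.b2.test/123abc", exactly the url asserted in the main subjob's temporal2 node.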
