Also support job_options_update to inject job options in synchronous processing requests

ref #135, eu-cdse/openeo-cdse-infra#114
soxofaan committed May 2, 2024
1 parent e783963 commit 7795215
Showing 5 changed files with 94 additions and 4 deletions.
CHANGELOG.md (3 changes: 3 additions & 0 deletions)
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
 
 The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
+## 0.34.0
+
+- Also support `job_options_update` to inject job options in synchronous processing requests ([#135](https://github.com/Open-EO/openeo-aggregator/issues/135), eu-cdse/openeo-cdse-infra#114)
 
 ## 0.33.0
 
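For context: a `job_options_update` hook is a callable on the aggregator backend config that receives the job options of an incoming request (if any) plus the id of the upstream backend, and returns the job options to actually forward. A minimal sketch of such a hook (not part of this commit; the option key "driver-memory" and backend id "b1" are purely illustrative):

```python
from typing import Optional


def job_options_update(job_options: Optional[dict], backend_id: str) -> dict:
    """Fine-tune job options before the request is forwarded upstream."""
    # Start from the user-provided options (None when the request had none).
    job_options = dict(job_options or {})
    # Illustrative only: give one particular upstream backend a default memory setting.
    if backend_id == "b1":
        job_options.setdefault("driver-memory", "2G")
    return job_options
```

With this commit, the hook also fires for synchronous `/result` requests (see the `backend.py` change below).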
setup.py (2 changes: 1 addition & 1 deletion)
@@ -36,7 +36,7 @@
         "requests",
         "attrs",
         "openeo>=0.27.0",
-        "openeo_driver>=0.88.0.dev",
+        "openeo_driver>=0.99.0.dev",
         "flask~=2.0",
         "gunicorn~=20.0",
         "python-json-logger>=2.0.0",
src/openeo_aggregator/about.py (2 changes: 1 addition & 1 deletion)
@@ -2,7 +2,7 @@
 import sys
 from typing import Optional
 
-__version__ = "0.33.0a1"
+__version__ = "0.34.0a1"
 
 
 def log_version_info(logger: Optional[logging.Logger] = None):
src/openeo_aggregator/backend.py (17 changes: 15 additions & 2 deletions)
@@ -583,13 +583,26 @@ def evaluate(self, process_graph: dict, env: EvalEnv = None):
 
         # Send process graph to backend
         con = self.backends.get_connection(backend_id=backend_id)
-        request_pg = {"process": {"process_graph": process_graph}}
+        post_data = {"process": {"process_graph": process_graph}}
+
+        # TODO: assumption here that job_options for sync requests are available through EvalEnv. Better options?
+        job_options = env.get("job_options")
+        if get_backend_config().job_options_update:
+            # Allow fine-tuning job options through config
+            job_options = get_backend_config().job_options_update(job_options=job_options, backend_id=backend_id)
+
+        if job_options:
+            # TODO: this (re)groups job options under "job_options" key, while original options might have been at root level.
+            #       How should this be handled?
+            post_data["job_options"] = job_options
+
+        # TODO: inject job options here as well
         timing_logger = TimingLogger(title=f"Evaluate process graph on backend {backend_id}", logger=_log.info)
         with con.authenticated_from_request(flask.request), timing_logger:
             try:
                 backend_response = con.post(
                     path="/result",
-                    json=request_pg,
+                    json=post_data,
                     stream=True,
                     timeout=CONNECTION_TIMEOUT_RESULT,
                     expected_status=200,
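The net effect of the `evaluate()` change, as a standalone sketch of the new request-building flow (the helper function and sample values are hypothetical; only the `post_data` shape and the keyword-argument call of the hook are taken from the code above):

```python
from typing import Callable, Optional


def build_post_data(
    process_graph: dict,
    job_options: Optional[dict],
    backend_id: str,
    job_options_update: Optional[Callable[..., Optional[dict]]] = None,
) -> dict:
    # Wrap the process graph as the synchronous processing request body requires ...
    post_data = {"process": {"process_graph": process_graph}}
    # ... optionally let the config hook rewrite the job options ...
    if job_options_update:
        job_options = job_options_update(job_options=job_options, backend_id=backend_id)
    # ... and only add a "job_options" field when there is something to send.
    if job_options:
        post_data["job_options"] = job_options
    return post_data


pg = {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}
hook = lambda job_options, backend_id: {**(job_options or {}), "memory": "4G"}
assert build_post_data(pg, None, "b1", hook) == {
    "process": {"process_graph": pg},
    "job_options": {"memory": "4G"},
}
```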
tests/test_views.py (74 changes: 74 additions & 0 deletions)
@@ -1416,6 +1416,80 @@ def test_validation_upstream_failure(self, api100, requests_mock, backend1, back
         }
         assert validation_mock.call_count == 1
 
+    @pytest.mark.parametrize(
+        ["orig_post_data", "job_options_update", "expected_post_data"],
+        [
+            # No job options in original and none added
+            (
+                {"process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}}},
+                None,
+                {"process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}}},
+            ),
+            # No job options in original, but some added
+            (
+                {"process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}}},
+                lambda job_options, backend_id: {**(job_options or {}), **{"beverage": f"fizzy{backend_id}"}},
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "job_options": {"beverage": "fizzyb1"},
+                },
+            ),
+            # Merge original and extra job options
+            (
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "job_options": {"side": "salad", "color": "green"},
+                },
+                lambda job_options, backend_id: {
+                    **(job_options or {}),
+                    **{"beverage": f"fizzy{backend_id}", "color": "blue"},
+                },
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "job_options": {"side": "salad", "beverage": "fizzyb1", "color": "blue"},
+                },
+            ),
+            # Handling of job options at top level
+            # TODO: keep at top level?
+            (
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "_x_preferred_speed": "slow",
+                },
+                None,
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "job_options": {"_x_preferred_speed": "slow"},
+                },
+            ),
+            (
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "_x_preferred_speed": "slow",
+                },
+                lambda job_options, backend_id: {**(job_options or {}), **{"beverage": f"fizzy{backend_id}"}},
+                {
+                    "process": {"process_graph": {"fo": {"process_id": "fo", "arguments": {}, "result": True}}},
+                    "job_options": {"_x_preferred_speed": "slow", "beverage": "fizzyb1"},
+                },
+            ),
+        ],
+    )
+    def test_sync_processing_with_job_options(
+        self, api100, requests_mock, backend1, backend2, orig_post_data, job_options_update, expected_post_data
+    ):
+        def post_result(request: requests.Request, context):
+            post_data = request.json()
+            assert post_data == expected_post_data
+            context.headers["Content-Type"] = "application/json"
+            return 123
+
+        requests_mock.post(backend1 + "/result", json=post_result)
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        with config_overrides(job_options_update=job_options_update):
+            res = api100.post("/result", json=orig_post_data).assert_status_code(200)
+        assert res.json == 123
+
 
 class TestBatchJobs:
     def test_list_jobs_no_auth(self, api100):
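Note that the merge semantics exercised above (e.g. "color" flipping from "green" to "blue" in the third case) come from the test lambdas themselves, not from the aggregator, which simply sends whatever the hook returns. In a plain dict merge, later entries win on key conflicts:

```python
job_options = {"side": "salad", "color": "green"}
update = {"beverage": "fizzyb1", "color": "blue"}
merged = {**job_options, **update}  # later entries win on duplicate keys
assert merged == {"side": "salad", "beverage": "fizzyb1", "color": "blue"}
```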
