Skip to content

Commit

Permalink
Issue #115 Initial implementation of "crossbackend" splitting through…
Browse files Browse the repository at this point in the history
… API
  • Loading branch information
soxofaan committed Sep 6, 2023
1 parent 0098c46 commit 41a07d2
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 57 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"requests",
"attrs",
"openeo>=0.17.0",
"openeo_driver>=0.57.1.dev",
"openeo_driver>=0.65.0.dev",
"flask~=2.0",
"gunicorn~=20.0",
"python-json-logger>=2.0.0",
Expand Down
82 changes: 70 additions & 12 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@
)
from openeo_aggregator.metadata.reporter import LoggerReporter
from openeo_aggregator.partitionedjobs import PartitionedJob
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendSplitter,
SubGraphId,
)
from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
from openeo_aggregator.partitionedjobs.tracking import (
PartitionedJobConnection,
PartitionedJobTracker,
)
from openeo_aggregator.utils import (
Clock,
FlatPG,
PGWithMetadata,
dict_merge,
Expand Down Expand Up @@ -620,18 +625,29 @@ def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
})

def create_job(
self, user_id: str, process: dict, api_version: str,
metadata: dict, job_options: dict = None
self,
user_id: str,
process: dict,
api_version: str,
metadata: dict,
job_options: Optional[dict] = None,
) -> BatchJobMetadata:
if "process_graph" not in process:
raise ProcessGraphMissingException()

# TODO: better, more generic/specific job_option(s)?
if job_options and (
job_options.get(JOB_OPTION_SPLIT_STRATEGY)
or job_options.get(JOB_OPTION_TILE_GRID)
):
return self._create_partitioned_job(
if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
# TODO this is temporary feature flag to trigger "crossbackend" splitting
return self._create_crossbackend_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
else:
return self._create_partitioned_job(
user_id=user_id,
process=process,
api_version=api_version,
Expand Down Expand Up @@ -690,8 +706,9 @@ def _create_job_standard(
raise OpenEOApiException(f"Failed to create job on backend {backend_id!r}: {e!r}")
return BatchJobMetadata(
id=JobIdMapping.get_aggregator_job_id(backend_job_id=job.job_id, backend_id=backend_id),
# Note: required, but unused metadata
status="dummy", created="dummy", process={"dummy": "dummy"}
# Note: additional required, but unused metadata
status="dummy",
created="dummy",
)

def _create_partitioned_job(
Expand Down Expand Up @@ -719,11 +736,52 @@ def _create_partitioned_job(
raise ValueError("Could not determine splitting strategy from job options")
pjob: PartitionedJob = splitter.split(process=process, metadata=metadata, job_options=job_options)

job_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
pjob_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)

return BatchJobMetadata(
id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
# Note: additional required, but unused metadata
status="dummy",
created="dummy",
)

def _create_crossbackend_job(
    self,
    user_id: str,
    process: PGWithMetadata,
    api_version: str,
    metadata: dict,
    job_options: Optional[dict] = None,
) -> BatchJobMetadata:
    """
    Advanced/handled batch job creation using "crossbackend" splitting:

    - split original job in (possibly) multiple sub-jobs,
      e.g. split the process graph based on `load_collection` availability
    - distribute sub-jobs across (possibly) multiple back-ends
    - keep track of them through a "parent job" in a `PartitionedJobTracker`.

    :param user_id: id of the user that owns the job
    :param process: process (with "process_graph") to split and run
    :param api_version: not used in this method (kept for signature parity with the other job creation methods)
    :param metadata: job metadata, passed on as-is to the partitioned job tracker
    :param job_options: optional job options, passed on as-is to the partitioned job tracker
    :return: metadata of the "parent" job, of which only the (aggregator-level) job id is meaningful
    :raises FeatureUnsupportedException: when no partitioned job tracker is configured
    """
    if not self.partitioned_job_tracker:
        raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")

    def backend_for_collection(collection_id) -> str:
        # Pick the first back-end the catalog lists for this collection.
        # NOTE(review): presumably the catalog never returns an empty list here
        # (that would raise IndexError) -- confirm against `get_backends_for_collection`.
        return self._catalog.get_backends_for_collection(cid=collection_id)[0]

    splitter = CrossBackendSplitter(
        backend_for_collection=backend_for_collection,
        # TODO: job option for `always_split` feature?
        always_split=True,
    )

    pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
        user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
    )

    return BatchJobMetadata(
        id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
        # Note: additional required, but unused metadata
        status="dummy",
        created="dummy",
    )

def _get_connection_and_backend_job_id(
Expand Down
1 change: 1 addition & 0 deletions src/openeo_aggregator/partitionedjobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def to_subjobs_dict(
"""Helper to convert a collection of SubJobs to a dictionary"""
# TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
if isinstance(subjobs, Sequence):
# TODO: eliminate this `Sequence` code path, and just always work with dict?
return {f"{i:04d}": j for i, j in enumerate(subjobs)}
elif isinstance(subjobs, dict):
return {str(k): v for k, v in subjobs.items()}
Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/partitionedjobs/crossbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def split_streaming(
(e.g. creating openEO batch jobs on the fly and injecting the corresponding batch job ids appropriately).
:return: tuple containing:
- subgraph id
- subgraph id, recommended to handle it as opaque id (but usually format '{backend_id}:{node_id}')
- SubJob
- dependencies as list of subgraph ids
"""
Expand Down
109 changes: 106 additions & 3 deletions src/openeo_aggregator/partitionedjobs/tracking.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import collections
import contextlib
import dataclasses
import datetime
import logging
import threading
from typing import List, Optional
from typing import Dict, List, Optional, Union

import flask
from openeo.api.logs import LogEntry
from openeo.rest.job import ResultAsset
from openeo.rest.job import BatchJob, ResultAsset
from openeo.util import TimingLogger, rfc3339
from openeo_driver.errors import JobNotFinishedException
from openeo_driver.users import User
Expand All @@ -21,10 +22,15 @@
STATUS_INSERTED,
STATUS_RUNNING,
PartitionedJob,
SubJob,
)
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendSplitter,
SubGraphId,
)
from openeo_aggregator.partitionedjobs.splitting import TileGridSplitter
from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
from openeo_aggregator.utils import _UNSET, timestamp_to_rfc3339
from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -57,6 +63,103 @@ def create(self, user_id: str, pjob: PartitionedJob, flask_request: flask.Reques
self.create_sjobs(user_id=user_id, pjob_id=pjob_id, flask_request=flask_request)
return pjob_id

def create_crossbackend_pjob(
    self,
    *,
    user_id: str,
    process: PGWithMetadata,
    metadata: dict,
    job_options: Optional[dict] = None,
    splitter: CrossBackendSplitter,
) -> str:
    """
    crossbackend partitioned job creation is different from original partitioned
    job creation due to dependencies between jobs.
    First the batch jobs have to be created in the right order on the respective backends
    before we have finalised sub-processgraphs, whose metadata can then be persisted in the ZooKeeperPartitionedJobDB

    :param user_id: id of the user that owns the partitioned job
    :param process: process (with a "process_graph" item) to split
    :param metadata: job metadata; its "plan" and "budget" items (if any) are passed to each created batch job
    :param job_options: optional job options, passed as `additional` to each created batch job
    :param splitter: splitter that streams the sub-jobs (and their dependencies) to create
    :return: id of the new partitioned job
    :raises Exception: anything raised while iterating the splitter is re-raised,
        after flagging the partitioned job with status STATUS_ERROR.
        Failures of individual batch job creation are not raised, but tracked as sub-job status.
    """
    # Start with reserving a new partitioned job id based on initial metadata
    pjob_node_value = self._db.serialize(
        user_id=user_id,
        created=Clock.time(),
        process=process,
        metadata=metadata,
        job_options=job_options,
    )
    pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
    self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)

    # Create batch jobs on respective backends, and build the PartitionedJob components along the way
    subjobs: Dict[str, SubJob] = {}
    dependencies: Dict[str, List[str]] = {}
    # Only successfully created batch jobs end up in this mapping.
    batch_jobs: Dict[SubGraphId, BatchJob] = {}
    create_stats = collections.Counter()

    def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
        # TODO: use `load_stac` iso `load_result`, and use canonical URL?
        # `batch_jobs` is only read here (never rebound), so no `nonlocal` declaration is needed.
        # Raises KeyError when the batch job for `subgraph_id` was not created (successfully).
        job_id = batch_jobs[subgraph_id].job_id
        return {
            node_id: {
                "process_id": "load_result",
                "arguments": {"id": job_id},
            }
        }

    try:
        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
            process_graph=process["process_graph"], get_replacement=get_replacement
        ):
            subjobs[sjob_id] = subjob
            dependencies[sjob_id] = subjob_dependencies
            try:
                # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
                con = self._backends.get_connection(subjob.backend_id)
                with con.authenticated_from_request(request=flask.request), con.override(
                    default_timeout=CONNECTION_TIMEOUT_JOB_START
                ):
                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
                        job = con.create_job(
                            process_graph=subjob.process_graph,
                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
                            plan=metadata.get("plan"),
                            budget=metadata.get("budget"),
                            additional=job_options,
                        )
                        _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
                        batch_jobs[sjob_id] = job
                        title = f"Partitioned job {pjob_id=} {sjob_id=}"
                        self._db.insert_sjob(
                            user_id=user_id,
                            pjob_id=pjob_id,
                            sjob_id=sjob_id,
                            subjob=subjob,
                            title=title,
                            status=STATUS_CREATED,
                        )
                        self._db.set_backend_job_id(
                            user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
                        )
                        create_stats[STATUS_CREATED] += 1
            except Exception as exc:
                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
                msg = f"Create failed: {exc}"
                # NOTE(review): when `create_job` itself failed, `insert_sjob` has not been called yet
                # for this sub-job -- confirm `set_sjob_status` can handle a not-yet-inserted sub-job.
                self._db.set_sjob_status(
                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
                )
                create_stats[STATUS_ERROR] += 1
    except Exception as exc:
        # Something failed outside of the per-sub-job handling above, e.g. in the splitter itself
        # or in `get_replacement` (a dependency without successfully created batch job).
        # Don't leave the partitioned job dangling in "inserted" state: flag it as failed
        # before passing the original exception on to the caller.
        _log.error(f"Creation of crossbackend partitioned job {pjob_id} aborted", exc_info=True)
        self._db.set_pjob_status(
            user_id=user_id, pjob_id=pjob_id, status=STATUS_ERROR, message=f"Create failed: {exc!r}", progress=0
        )
        raise

    # TODO: this is currently unused, don't bother building it at all?
    partitioned_job = PartitionedJob(
        process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
    )

    # The partitioned job counts as "created" as soon as at least one sub-job was created.
    pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
    self._db.set_pjob_status(
        user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
    )

    return pjob_id

def create_sjobs(self, user_id: str, pjob_id: str, flask_request: flask.Request):
"""Create all sub-jobs on remote back-end for given partitioned job"""
pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id)
Expand Down
Loading

0 comments on commit 41a07d2

Please sign in to comment.