From 95c0950ba632cdfc94dea402c464df388704a77c Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Thu, 7 Nov 2024 15:07:06 +0000 Subject: [PATCH] Add type annotations to ``JobRunnerMapper`` and related code --- lib/galaxy/jobs/__init__.py | 12 +++++------- lib/galaxy/jobs/handler.py | 7 ++++--- lib/galaxy/jobs/mapper.py | 20 ++++++++++++++++++-- test/unit/app/jobs/test_mapper.py | 9 ++++++--- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 3f8e33321ac2..7809d94871de 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -328,7 +328,7 @@ def __init__(self, app: MinimalManagerApp): """Parse the job configuration XML.""" self.app = app self.runner_plugins = [] - self.dynamic_params = None + self.dynamic_params: Optional[Dict[str, Any]] = None self.handlers = {} self.handler_runner_plugins = {} self.default_handler_id = None @@ -432,7 +432,7 @@ def _configure_from_dict(self, job_config_dict): continue self.runner_plugins.append(runner_info) if "dynamic" in job_config_dict: - self.dynamic_params = job_config_dict.get("dynamic", None) + self.dynamic_params = job_config_dict["dynamic"] # Parse handlers handling_config_dict = job_config_dict.get("handling", {}) @@ -830,12 +830,12 @@ def get_destinations(self, id_or_tag) -> Iterable[JobDestination]: """ return self.destinations.get(id_or_tag, []) - def get_job_runner_plugins(self, handler_id): + def get_job_runner_plugins(self, handler_id: str): """Load all configured job runner plugins :returns: list of job runner plugins """ - rval = {} + rval: Dict[str, BaseJobRunner] = {} if handler_id in self.handler_runner_plugins: plugins_to_load = [rp for rp in self.runner_plugins if rp["id"] in self.handler_runner_plugins[handler_id]] log.info( @@ -871,11 +871,9 @@ def get_job_runner_plugins(self, handler_id): # If the name included a '.' or loading from the static runners path failed, try the original name module = __import__(load) module_name = load - if module is None: - # Module couldn't be loaded, error should have already been displayed - continue for comp in module_name.split(".")[1:]: module = getattr(module, comp) + assert module # make mypy happy if not class_names: # If there's not a ':', we check .__all__ for class names try: diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 2a455c8c425f..3cae6bf35750 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -231,7 +231,7 @@ class StopSignalException(Exception): class BaseJobHandlerQueue(Monitors): STOP_SIGNAL = object() - def __init__(self, app: MinimalManagerApp, dispatcher): + def __init__(self, app: MinimalManagerApp, dispatcher: "DefaultJobDispatcher"): """ Initializes the Queue, creates (unstarted) monitoring thread. """ @@ -309,7 +309,8 @@ def __check_jobs_at_startup(self): with transaction(session): session.commit() - def _check_job_at_startup(self, job): + def _check_job_at_startup(self, job: model.Job): + assert job.tool_id is not None if not self.app.toolbox.has_tool(job.tool_id, job.tool_version, exact=True): log.warning(f"({job.id}) Tool '{job.tool_id}' removed from tool config, unable to recover job") self.job_wrapper(job).fail( @@ -1207,7 +1208,7 @@ def start(self): for runner in self.job_runners.values(): runner.start() - def url_to_destination(self, url): + def url_to_destination(self, url: str): """This is used by the runner mapper (a.k.a. dynamic runner) and recovery methods to have runners convert URLs to destinations. diff --git a/lib/galaxy/jobs/mapper.py b/lib/galaxy/jobs/mapper.py index f5bac4e68407..66edbc10867e 100644 --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -2,6 +2,10 @@ import logging from inspect import getfullargspec from types import ModuleType +from typing import ( + Callable, + TYPE_CHECKING, +) import galaxy.jobs.rules from galaxy.jobs import stock_rules @@ -9,6 +13,13 @@ from galaxy.util.submodules import import_submodules from .rule_helper import RuleHelper +if TYPE_CHECKING: + from galaxy.jobs import ( + JobConfiguration, + JobDestination, + JobWrapper, + ) + log = logging.getLogger(__name__) DYNAMIC_RUNNER_NAME = "dynamic" @@ -52,7 +63,12 @@ class JobRunnerMapper: rules_module: ModuleType - def __init__(self, job_wrapper, url_to_destination, job_config): + def __init__( + self, + job_wrapper: "JobWrapper", + url_to_destination: Callable[[str], "JobDestination"], + job_config: "JobConfiguration", + ): self.job_wrapper = job_wrapper self.url_to_destination = url_to_destination self.job_config = job_config @@ -129,7 +145,7 @@ def __job_params(self, job): param_values = job.get_param_values(app, ignore_errors=True) return param_values - def __convert_url_to_destination(self, url): + def __convert_url_to_destination(self, url: str): """ Job runner URLs are deprecated, but dynamic mapper functions may still be returning them. Runners are expected to be able to convert these to diff --git a/test/unit/app/jobs/test_mapper.py b/test/unit/app/jobs/test_mapper.py index f2ec4cd1b87b..effd1fe71cd8 100644 --- a/test/unit/app/jobs/test_mapper.py +++ b/test/unit/app/jobs/test_mapper.py @@ -1,8 +1,11 @@ import uuid +from typing import cast from galaxy.jobs import ( HasResourceParameters, + JobConfiguration, JobDestination, + JobWrapper, ) from galaxy.jobs.mapper import ( ERROR_MESSAGE_NO_RULE_FUNCTION, @@ -134,10 +137,10 @@ def __assert_mapper_errors_with_message(mapper, message): def __mapper(tool_job_destination=TOOL_JOB_DESTINATION): - job_wrapper = MockJobWrapper(tool_job_destination) - job_config = MockJobConfig() + job_wrapper = cast(JobWrapper, MockJobWrapper(tool_job_destination)) + job_config = cast(JobConfiguration, MockJobConfig()) - mapper = JobRunnerMapper(job_wrapper, {}, job_config) + mapper = JobRunnerMapper(job_wrapper, lambda url: JobDestination(), job_config) mapper.rules_module = test_rules return mapper