From 6437858e2494ba94f63d4555e9c9e741733c175e Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Thu, 29 Aug 2024 13:54:12 -0400
Subject: [PATCH] Typing around tool state.

---
 lib/galaxy/tools/__init__.py                 | 135 ++++++++++++-------
 lib/galaxy/tools/_types.py                   |  30 +++++
 lib/galaxy/tools/actions/__init__.py         |   6 +-
 lib/galaxy/tools/actions/data_manager.py     |   4 +-
 lib/galaxy/tools/actions/metadata.py         |   4 +-
 lib/galaxy/tools/actions/model_operations.py |   4 +-
 lib/galaxy/tools/execute.py                  |  16 +--
 lib/galaxy/tools/parameters/__init__.py      |   3 +-
 lib/galaxy/tools/parameters/meta.py          |   8 +-
 lib/galaxy/webapps/galaxy/api/jobs.py        |   8 +-
 lib/galaxy/work/context.py                   |   2 +-
 11 files changed, 144 insertions(+), 76 deletions(-)
 create mode 100644 lib/galaxy/tools/_types.py

diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py
index 7193fd6ea618..d44f9ec28c30 100644
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -183,7 +183,17 @@
     get_tool_shed_url_from_tool_shed_registry,
 )
 from galaxy.version import VERSION_MAJOR
-from galaxy.work.context import proxy_work_context_for_history
+from galaxy.work.context import (
+    proxy_work_context_for_history,
+    WorkRequestContext,
+)
+from ._types import (
+    InputFormatT,
+    ParameterValidationErrorsT,
+    ToolRequestT,
+    ToolStateJobInstancePopulatedT,
+    ToolStateJobInstanceT,
+)
 from .execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_JOB_CALLBACK,
@@ -195,8 +205,6 @@
     ExecutionSlice,
     JobCallbackT,
     MappingParameters,
-    ToolParameterRequestInstanceT,
-    ToolParameterRequestT,
 )

 if TYPE_CHECKING:
@@ -1796,21 +1804,16 @@ def visit_inputs(self, values, callback):
         if self.check_values:
             visit_input_values(self.inputs, values, callback)

-    def expand_incoming(self, trans, incoming, request_context, input_format="legacy"):
-        rerun_remap_job_id = None
-        if "rerun_remap_job_id" in incoming:
-            try:
-                rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"])
-            except Exception as exception:
-                log.error(str(exception))
-                raise exceptions.MessageException(
-                    "Failure executing tool with id '%s' (attempting to rerun invalid job).", self.id
-                )
-
+    def expand_incoming(
+        self, request_context: WorkRequestContext, incoming: ToolRequestT, input_format: InputFormatT = "legacy"
+    ):
+        rerun_remap_job_id = _rerun_remap_job_id(request_context, incoming, self.id)
         set_dataset_matcher_factory(request_context, self)
         # Fixed set of input parameters may correspond to any number of jobs.
         # Expand these out to individual parameters for given jobs (tool executions).
-        expanded_incomings, collection_info = expand_meta_parameters(trans, self, incoming)
+        expanded_incomings: List[ToolStateJobInstanceT]
+        collection_info: Optional[MatchingCollections]
+        expanded_incomings, collection_info = expand_meta_parameters(request_context, self, incoming)

         # Remapping a single job to many jobs doesn't make sense, so disable
@@ -1832,38 +1835,10 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy
                 "Validated and populated state for tool request",
             )
         all_errors = []
-        all_params = []
+        all_params: List[ToolStateJobInstancePopulatedT] = []
+
         for expanded_incoming in expanded_incomings:
-            params = {}
-            errors: Dict[str, str] = {}
-            if self.input_translator:
-                self.input_translator.translate(expanded_incoming)
-            if not self.check_values:
-                # If `self.check_values` is false we don't do any checking or
-                # processing on input This is used to pass raw values
-                # through to/from external sites.
-                params = expanded_incoming
-            else:
-                # Update state for all inputs on the current page taking new
-                # values from `incoming`.
-                populate_state(
-                    request_context,
-                    self.inputs,
-                    expanded_incoming,
-                    params,
-                    errors,
-                    simple_errors=False,
-                    input_format=input_format,
-                )
-                # If the tool provides a `validate_input` hook, call it.
-                validate_input = self.get_hook("validate_input")
-                if validate_input:
-                    # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py
-                    legacy_non_dce_params = {
-                        k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v
-                        for k, v in params.items()
-                    }
-                    validate_input(request_context, errors, legacy_non_dce_params, self.inputs)
+            params, errors = self._populate(request_context, expanded_incoming, input_format)
             all_errors.append(errors)
             all_params.append(params)
         unset_dataset_matcher_factory(request_context)
@@ -1871,10 +1846,49 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy
         log.info(validation_timer)
         return all_params, all_errors, rerun_remap_job_id, collection_info

+    def _populate(
+        self, request_context, expanded_incoming: ToolStateJobInstanceT, input_format: InputFormatT
+    ) -> Tuple[ToolStateJobInstancePopulatedT, ParameterValidationErrorsT]:
+        """Validate expanded parameters for a job and replace references with model objects.
+
+        So convert a ToolStateJobInstanceT to a ToolStateJobInstancePopulatedT.
+        """
+        params: ToolStateJobInstancePopulatedT = {}
+        errors: ParameterValidationErrorsT = {}
+        if self.input_translator:
+            self.input_translator.translate(expanded_incoming)
+        if not self.check_values:
+            # If `self.check_values` is false we don't do any checking or
+            # processing on input. This is used to pass raw values
+            # through to/from external sites.
+            params = cast(ToolStateJobInstancePopulatedT, expanded_incoming)
+        else:
+            # Update state for all inputs on the current page taking new
+            # values from `incoming`.
+            populate_state(
+                request_context,
+                self.inputs,
+                expanded_incoming,
+                params,
+                errors,
+                simple_errors=False,
+                input_format=input_format,
+            )
+            # If the tool provides a `validate_input` hook, call it.
+            validate_input = self.get_hook("validate_input")
+            if validate_input:
+                # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py
+                legacy_non_dce_params = {
+                    k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v
+                    for k, v in params.items()
+                }
+                validate_input(request_context, errors, legacy_non_dce_params, self.inputs)
+        return params, errors
+
     def handle_input(
         self,
         trans,
-        incoming: ToolParameterRequestT,
+        incoming: ToolRequestT,
         history: Optional[model.History] = None,
         use_cached_job: bool = DEFAULT_USE_CACHED_JOB,
         preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
@@ -1887,9 +1901,15 @@ def handle_input(
             there were no errors).
""" request_context = proxy_work_context_for_history(trans, history=history) - all_params, all_errors, rerun_remap_job_id, collection_info = self.expand_incoming( + + expanded = self.expand_incoming( trans=trans, incoming=incoming, request_context=request_context, input_format=input_format ) + all_params: List[ToolStateJobInstancePopulatedT] = expanded[0] + all_errors: List[ParameterValidationErrorsT] = expanded[1] + rerun_remap_job_id: Optional[int] = expanded[2] + collection_info: Optional[MatchingCollections] = expanded[3] + # If there were errors, we stay on the same page and display them self.handle_incoming_errors(all_errors) @@ -1935,7 +1955,7 @@ def handle_input( implicit_collections=execution_tracker.implicit_collections, ) - def handle_incoming_errors(self, all_errors): + def handle_incoming_errors(self, all_errors: List[ParameterValidationErrorsT]): if any(all_errors): # simple param_key -> message string for tool form. err_data = {key: unicodify(value) for d in all_errors for (key, value) in d.items()} @@ -2060,7 +2080,7 @@ def get_static_param_values(self, trans): def execute( self, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[model.History] = None, set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, flush_job: bool = True, @@ -2086,7 +2106,7 @@ def execute( def _execute( self, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[model.History] = None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, execution_cache: Optional[ToolExecutionCache] = None, @@ -4110,6 +4130,21 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history # ---- Utility classes to be factored out ----------------------------------- + + +def _rerun_remap_job_id(trans, incoming) -> Optional[int]: + rerun_remap_job_id = None + if "rerun_remap_job_id" in incoming: + try: + rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"]) + except Exception as exception: + log.error(str(exception)) + raise exceptions.MessageException( + "Failure executing tool with id '%s' (attempting to rerun invalid job).", self.id + ) + return rerun_remap_job_id + + class TracksterConfig: """Trackster configuration encapsulation.""" diff --git a/lib/galaxy/tools/_types.py b/lib/galaxy/tools/_types.py new file mode 100644 index 000000000000..61a9f4eb4e7d --- /dev/null +++ b/lib/galaxy/tools/_types.py @@ -0,0 +1,30 @@ +from typing import ( + Any, + Dict, +) + +from typing_extensions import Literal + +# Tool state goes through several different walk throughs that are difficult to follow I think, +# hopefully datatypes can be used as markers to describe what has been done - even if they don't +# provide strong traditional typing semantics. + +# Input dictionary from the API, may include map/reduce instructions. Objects are referenced by "src" +# dictionaries and encoded IDS. +ToolRequestT = Dict[str, Any] + +# Input dictionary extracted from a tool request for running a tool individually as a single job. Objects are referenced +# by "src" dictionaries with encoded IDs still but batch instructions have been pulled out. Parameters have not +# been "checked" (check_param has not been called). +ToolStateJobInstanceT = Dict[str, Any] + +# Input dictionary for an individual job where objects are their model objects and parameters have been +# "checked" (check_param has been called). 
+
+# A dictionary of error messages that occur while attempting to validate a ToolStateJobInstanceT and transform it
+# into a ToolStateJobInstancePopulatedT with model objects populated. Tool errors indicate the job should not be
+# further processed.
+ParameterValidationErrorsT = Dict[str, str]
+
+InputFormatT = Literal["legacy", "21.01"]
diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py
index 989ab5a6ad45..1c3969236f13 100644
--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -37,6 +37,7 @@
 from galaxy.model.dataset_collections.matching import MatchingCollections
 from galaxy.model.none_like import NoneDataset
 from galaxy.objectstore import ObjectStorePopulator
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_DATASET_COLLECTION_ELEMENTS,
@@ -45,7 +46,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import (
     filter_output,
@@ -88,7 +88,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
@@ -401,7 +401,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/actions/data_manager.py b/lib/galaxy/tools/actions/data_manager.py
index c24e86fd0afb..d8786ad5a921 100644
--- a/lib/galaxy/tools/actions/data_manager.py
+++ b/lib/galaxy/tools/actions/data_manager.py
@@ -7,6 +7,7 @@
 )
 from galaxy.model.base import transaction
 from galaxy.model.dataset_collections.matching import MatchingCollections
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_DATASET_COLLECTION_ELEMENTS,
@@ -15,7 +16,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import ToolExecutionCache
 from . import (
@@ -33,7 +33,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/actions/metadata.py b/lib/galaxy/tools/actions/metadata.py
index f7d6ce844a9d..2b46c6060ccd 100644
--- a/lib/galaxy/tools/actions/metadata.py
+++ b/lib/galaxy/tools/actions/metadata.py
@@ -16,6 +16,7 @@
 )
 from galaxy.model.base import transaction
 from galaxy.model.dataset_collections.matching import MatchingCollections
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_DATASET_COLLECTION_ELEMENTS,
@@ -24,7 +25,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import ToolExecutionCache
 from galaxy.util import asbool
@@ -43,7 +43,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py
index 1b18adcf39f6..fdfecfb9c65e 100644
--- a/lib/galaxy/tools/actions/model_operations.py
+++ b/lib/galaxy/tools/actions/model_operations.py
@@ -10,6 +10,7 @@
 )
 from galaxy.model.dataset_collections.matching import MatchingCollections
 from galaxy.objectstore import ObjectStorePopulator
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.actions import (
     DefaultToolAction,
     OutputCollections,
@@ -24,7 +25,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import ToolExecutionCache

@@ -52,7 +52,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py
index aef512f61f37..f886a55f08f1 100644
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -35,6 +35,10 @@
     ToolExecutionCache,
 )
 from galaxy.tools.parameters.workflow_utils import is_runtime_value
+from ._types import (
+    ToolRequestT,
+    ToolStateJobInstancePopulatedT,
+)

 if typing.TYPE_CHECKING:
     from galaxy.tools import Tool
@@ -48,10 +52,6 @@
 CompletedJobsT = Dict[int, Optional[model.Job]]
 JobCallbackT = Callable
 WorkflowResourceParametersT = Dict[str, Any]
-# Input dictionary from the API, may include map/reduce instructions
-ToolParameterRequestT = Dict[str, Any]
-# Input dictionary extracted from a tool request for running a tool individually
-ToolParameterRequestInstanceT = Dict[str, Any]
 DatasetCollectionElementsSliceT = Dict[str, model.DatasetCollectionElement]
 DEFAULT_USE_CACHED_JOB = False
 DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None
@@ -67,8 +67,8 @@ def __init__(self, execution_tracker: "ExecutionTracker"):


 class MappingParameters(NamedTuple):
-    param_template: ToolParameterRequestT
-    param_combinations: List[ToolParameterRequestInstanceT]
+    param_template: ToolRequestT
+    param_combinations: List[ToolStateJobInstancePopulatedT]


 def execute(
@@ -242,14 +242,14 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona

 class ExecutionSlice:
     job_index: int
-    param_combination: ToolParameterRequestInstanceT
+    param_combination: ToolStateJobInstancePopulatedT
     dataset_collection_elements: Optional[DatasetCollectionElementsSliceT]
     history: Optional[model.History]

     def __init__(
         self,
         job_index: int,
-        param_combination: ToolParameterRequestInstanceT,
+        param_combination: ToolStateJobInstancePopulatedT,
         dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS,
     ):
         self.job_index = job_index
diff --git a/lib/galaxy/tools/parameters/__init__.py b/lib/galaxy/tools/parameters/__init__.py
index 7e72bf3a9eb1..ed5f3e472d95 100644
--- a/lib/galaxy/tools/parameters/__init__.py
+++ b/lib/galaxy/tools/parameters/__init__.py
@@ -32,6 +32,7 @@
     runtime_to_json,
 )
 from .wrapped import flat_to_nested_state
+from .._types import InputFormatT

 REPLACE_ON_TRUTHY = object()

@@ -351,7 +352,7 @@ def populate_state(
     context=None,
     check=True,
     simple_errors=True,
-    input_format="legacy",
+    input_format: InputFormatT = "legacy",
 ):
     """
     Populates nested state dict from incoming parameter values.
diff --git a/lib/galaxy/tools/parameters/meta.py b/lib/galaxy/tools/parameters/meta.py
index fee9f6d6079e..e275a44e03ce 100644
--- a/lib/galaxy/tools/parameters/meta.py
+++ b/lib/galaxy/tools/parameters/meta.py
@@ -21,6 +21,10 @@
 )
 from galaxy.util import permutations
 from . import visit_input_values
+from ._types import (
+    ToolRequestT,
+    ToolStateJobInstanceT,
+)
 from .wrapped import process_key

 log = logging.getLogger(__name__)
@@ -154,10 +158,10 @@ def is_batch(value):
     return WorkflowParameterExpansion(param_combinations, params_keys, input_combinations)


-ExpandedT = Tuple[List[Dict[str, Any]], Optional[matching.MatchingCollections]]
+ExpandedT = Tuple[List[ToolStateJobInstanceT], Optional[matching.MatchingCollections]]


-def expand_meta_parameters(trans, tool, incoming) -> ExpandedT:
+def expand_meta_parameters(trans, tool, incoming: ToolRequestT) -> ExpandedT:
     """
     Take in a dictionary of raw incoming parameters and expand to a list of
     expanded incoming parameters (one set of parameters per tool
diff --git a/lib/galaxy/webapps/galaxy/api/jobs.py b/lib/galaxy/webapps/galaxy/api/jobs.py
index 6aebebe5ec3c..9eb5efb40938 100644
--- a/lib/galaxy/webapps/galaxy/api/jobs.py
+++ b/lib/galaxy/webapps/galaxy/api/jobs.py
@@ -71,7 +71,7 @@
     JobIndexViewEnum,
     JobsService,
 )
-from galaxy.work.context import WorkRequestContext
+from galaxy.work.context import proxy_work_context_for_history

 log = logging.getLogger(__name__)

@@ -478,10 +478,8 @@ def search(
         for k, v in payload.__annotations__.items():
             if k.startswith("files_") or k.startswith("__files_"):
                 inputs[k] = v
-        request_context = WorkRequestContext(app=trans.app, user=trans.user, history=trans.history)
-        all_params, all_errors, _, _ = tool.expand_incoming(
-            trans=trans, incoming=inputs, request_context=request_context
-        )
+        request_context = proxy_work_context_for_history(trans)
+        all_params, all_errors, _, _ = tool.expand_incoming(request_context, incoming=inputs)
         if any(all_errors):
             return []
         params_dump = [tool.params_to_strings(param, trans.app, nested=True) for param in all_params]
diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py
index 025fb7aac920..81db0f7e1c6c 100644
--- a/lib/galaxy/work/context.py
+++ b/lib/galaxy/work/context.py
@@ -175,7 +175,7 @@ def set_history(self, history):

 def proxy_work_context_for_history(
     trans: ProvidesHistoryContext, history: Optional[History] = None, workflow_building_mode=False
-):
+) -> WorkRequestContext:
     """Create a WorkContext for supplied context with potentially different history.

     This provides semi-structured access to a transaction/work context with a supplied target
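
As a rough, self-contained sketch of the relationship the new aliases describe (the expand_batch helper and the parameter names below are hypothetical and not Galaxy code; the real expansion logic lives in expand_meta_parameters in lib/galaxy/tools/parameters/meta.py): a ToolRequestT that uses the API's batch syntax expands into one ToolStateJobInstanceT per job, which expand_incoming() then validates into ToolStateJobInstancePopulatedT dictionaries.

from typing import Any, Dict, List

# Aliases mirroring lib/galaxy/tools/_types.py from the patch above.
ToolRequestT = Dict[str, Any]
ToolStateJobInstanceT = Dict[str, Any]


def expand_batch(request: ToolRequestT) -> List[ToolStateJobInstanceT]:
    """Toy stand-in for expand_meta_parameters: split one batch directive into per-job states."""
    batch_keys = [k for k, v in request.items() if isinstance(v, dict) and v.get("batch")]
    if not batch_keys:
        # No map/reduce instructions - the request describes a single job.
        return [dict(request)]
    key = batch_keys[0]  # assume a single batched parameter; Galaxy also handles linked batches
    jobs: List[ToolStateJobInstanceT] = []
    for value in request[key]["values"]:
        state = dict(request)
        state[key] = value  # still a "src"/encoded-id reference, not yet a model object
        jobs.append(state)
    return jobs


if __name__ == "__main__":
    request: ToolRequestT = {
        "input1": {"batch": True, "values": [{"src": "hda", "id": "abc123"}, {"src": "hda", "id": "def456"}]},
        "threshold": 5,
    }
    for job_state in expand_batch(request):
        print(job_state)  # one ToolStateJobInstanceT per job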