diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py
index 546085b26a2e..f8ac543ca954 100644
--- a/lib/galaxy/managers/jobs.py
+++ b/lib/galaxy/managers/jobs.py
@@ -9,6 +9,7 @@
     Dict,
     List,
     Optional,
+    Union,
 )
 
 import sqlalchemy
@@ -41,7 +42,10 @@
     Safety,
 )
 from galaxy.managers.collections import DatasetCollectionManager
-from galaxy.managers.context import ProvidesUserContext
+from galaxy.managers.context import (
+    ProvidesHistoryContext,
+    ProvidesUserContext,
+)
 from galaxy.managers.datasets import DatasetManager
 from galaxy.managers.hdas import HDAManager
 from galaxy.managers.lddas import LDDAManager
@@ -73,6 +77,10 @@
     ExecutionTimer,
     listify,
 )
+from galaxy.tools._types import (
+    ToolStateDumpedToJsonInternalT,
+    ToolStateJobInstancePopulatedT,
+)
 from galaxy.util.search import (
     FilteredTerm,
     parse_filters_structured,
@@ -81,6 +89,8 @@
 
 log = logging.getLogger(__name__)
 
+JobStateT = str
+JobStatesT = Union[JobStateT, List[JobStateT]]
 
 
 class JobLock(BaseModel):
     active: bool = Field(title="Job lock status", description="If active, jobs will not dispatch")
@@ -311,7 +321,7 @@ def __init__(
         self.ldda_manager = ldda_manager
         self.decode_id = id_encoding_helper.decode_id
 
-    def by_tool_input(self, trans, tool_id, tool_version, param=None, param_dump=None, job_state="ok"):
+    def by_tool_input(self, trans: ProvidesHistoryContext, tool_id: str, tool_version: Optional[str], param: ToolStateJobInstancePopulatedT, param_dump: ToolStateDumpedToJsonInternalT, job_state: Optional[JobStatesT] = "ok"):
         """Search for jobs producing same results using the 'inputs' part of a tool POST."""
         user = trans.user
         input_data = defaultdict(list)
@@ -353,7 +363,7 @@ def populate_input_data_input_id(path, key, value):
         )
 
     def __search(
-        self, tool_id, tool_version, user, input_data, job_state=None, param_dump=None, wildcard_param_dump=None
+        self, tool_id: str, tool_version: Optional[str], user: model.User, input_data, job_state=None, param_dump=None, wildcard_param_dump=None
     ):
         search_timer = ExecutionTimer()
@@ -462,7 +472,7 @@ def replace_dataset_ids(path, key, value):
         log.info("No equivalent jobs found %s", search_timer)
         return None
 
-    def _build_job_subquery(self, tool_id, user_id, tool_version, job_state, wildcard_param_dump):
+    def _build_job_subquery(self, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump):
         """Build subquery that selects a job with correct job parameters."""
         stmt = select(model.Job.id).where(
             and_(
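# Editor's note: a minimal, illustrative sketch (not part of the patch) of how the
# new JobStatesT alias above is meant to be read -- callers may constrain the
# cached-job search to a single state or to a list of acceptable states.
from typing import List, Union

JobStateT = str
JobStatesT = Union[JobStateT, List[JobStateT]]

only_ok: JobStatesT = "ok"
ok_or_error: JobStatesT = ["ok", "error"]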
diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py
index 7193fd6ea618..afff2cfe30f3 100644
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -120,6 +120,8 @@
     check_param,
     params_from_strings,
     params_to_incoming,
+    params_to_json,
+    params_to_json_internal,
     params_to_strings,
     populate_state,
     visit_input_values,
@@ -183,7 +185,19 @@
     get_tool_shed_url_from_tool_shed_registry,
 )
 from galaxy.version import VERSION_MAJOR
-from galaxy.work.context import proxy_work_context_for_history
+from galaxy.work.context import (
+    proxy_work_context_for_history,
+    WorkRequestContext,
+)
+from ._types import (
+    InputFormatT,
+    ParameterValidationErrorsT,
+    ToolRequestT,
+    ToolStateDumpedToJsonInternalT,
+    ToolStateDumpedToJsonT,
+    ToolStateJobInstancePopulatedT,
+    ToolStateJobInstanceT,
+)
 from .execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_JOB_CALLBACK,
@@ -195,8 +209,6 @@
     ExecutionSlice,
     JobCallbackT,
     MappingParameters,
-    ToolParameterRequestInstanceT,
-    ToolParameterRequestT,
 )
 
 if TYPE_CHECKING:
@@ -1796,22 +1808,17 @@ def visit_inputs(self, values, callback):
         if self.check_values:
             visit_input_values(self.inputs, values, callback)
 
-    def expand_incoming(self, trans, incoming, request_context, input_format="legacy"):
-        rerun_remap_job_id = None
-        if "rerun_remap_job_id" in incoming:
-            try:
-                rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"])
-            except Exception as exception:
-                log.error(str(exception))
-                raise exceptions.MessageException(
-                    "Failure executing tool with id '%s' (attempting to rerun invalid job).", self.id
-                )
-
+    def expand_incoming(
+        self, request_context: WorkRequestContext, incoming: ToolRequestT, input_format: InputFormatT = "legacy"
+    ):
+        rerun_remap_job_id = _rerun_remap_job_id(request_context, incoming, self.id)
         set_dataset_matcher_factory(request_context, self)
         # Fixed set of input parameters may correspond to any number of jobs.
         # Expand these out to individual parameters for given jobs (tool executions).
-        expanded_incomings, collection_info = expand_meta_parameters(trans, self, incoming)
+        expanded_incomings: List[ToolStateJobInstanceT]
+        collection_info: Optional[MatchingCollections]
+        expanded_incomings, collection_info = expand_meta_parameters(request_context, self, incoming)
 
         # Remapping a single job to many jobs doesn't make sense, so disable
         # remap if multi-runs of tools are being used.
@@ -1832,38 +1839,10 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy
                 "Validated and populated state for tool request",
             )
         all_errors = []
-        all_params = []
+        all_params: List[ToolStateJobInstancePopulatedT] = []
+
         for expanded_incoming in expanded_incomings:
-            params = {}
-            errors: Dict[str, str] = {}
-            if self.input_translator:
-                self.input_translator.translate(expanded_incoming)
-            if not self.check_values:
-                # If `self.check_values` is false we don't do any checking or
-                # processing on input This is used to pass raw values
-                # through to/from external sites.
-                params = expanded_incoming
-            else:
-                # Update state for all inputs on the current page taking new
-                # values from `incoming`.
-                populate_state(
-                    request_context,
-                    self.inputs,
-                    expanded_incoming,
-                    params,
-                    errors,
-                    simple_errors=False,
-                    input_format=input_format,
-                )
-                # If the tool provides a `validate_input` hook, call it.
-                validate_input = self.get_hook("validate_input")
-                if validate_input:
-                    # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py
-                    legacy_non_dce_params = {
-                        k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v
-                        for k, v in params.items()
-                    }
-                    validate_input(request_context, errors, legacy_non_dce_params, self.inputs)
+            params, errors = self._populate(request_context, expanded_incoming, input_format)
             all_errors.append(errors)
             all_params.append(params)
         unset_dataset_matcher_factory(request_context)
@@ -1871,14 +1850,70 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy
         log.info(validation_timer)
         return all_params, all_errors, rerun_remap_job_id, collection_info
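# Editor's note: an illustrative model (plain dicts, made-up ids) of the expansion
# step expand_incoming relies on above -- one ToolRequestT fans out into one
# ToolStateJobInstanceT per implied job. The "batch"/"values" shape follows the
# Galaxy API's map-over convention.
from typing import Any, Dict, List

request: Dict[str, Any] = {  # ToolRequestT, ids still encoded
    "input1": {"batch": True, "values": [{"src": "hda", "id": "abc123"}, {"src": "hda", "id": "def456"}]},
}
job_instances: List[Dict[str, Any]] = [  # one ToolStateJobInstanceT per job
    {"input1": {"src": "hda", "id": "abc123"}},
    {"input1": {"src": "hda", "id": "def456"}},
]
assert len(job_instances) == 2  # two jobs from one request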
+ """ + params: ToolStateJobInstancePopulatedT = {} + errors: ParameterValidationErrorsT = {} + if self.input_translator: + self.input_translator.translate(expanded_incoming) + if not self.check_values: + # If `self.check_values` is false we don't do any checking or + # processing on input This is used to pass raw values + # through to/from external sites. + params = cast(ToolStateJobInstancePopulatedT, expanded_incoming) + else: + # Update state for all inputs on the current page taking new + # values from `incoming`. + populate_state( + request_context, + self.inputs, + expanded_incoming, + params, + errors, + simple_errors=False, + input_format=input_format, + ) + # If the tool provides a `validate_input` hook, call it. + validate_input = self.get_hook("validate_input") + if validate_input: + # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py + legacy_non_dce_params = { + k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v + for k, v in params.items() + } + validate_input(request_context, errors, legacy_non_dce_params, self.inputs) + return params, errors + + def completed_jobs(self, trans, use_cached_job: bool, all_params: List[ToolStateJobInstancePopulatedT]) -> Dict[int, Optional[model.Job]]: + completed_jobs: Dict[int, Optional[model.Job]] = {} + for i, param in enumerate(all_params): + if use_cached_job: + param_dump: ToolStateDumpedToJsonInternalT = params_to_json_internal(self.inputs, param, self.app) + completed_jobs[i] = self.job_search.by_tool_input( + trans=trans, + tool_id=self.id, + tool_version=self.version, + param=param, + param_dump=param_dump, + job_state=None, + ) + else: + completed_jobs[i] = None + return completed_jobs + def handle_input( self, trans, - incoming: ToolParameterRequestT, + incoming: ToolRequestT, history: Optional[model.History] = None, use_cached_job: bool = DEFAULT_USE_CACHED_JOB, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, - input_format: str = "legacy", + input_format: InputFormatT = "legacy", ): """ Process incoming parameters for this tool from the dict `incoming`, @@ -1887,26 +1922,20 @@ def handle_input( there were no errors). 
""" request_context = proxy_work_context_for_history(trans, history=history) - all_params, all_errors, rerun_remap_job_id, collection_info = self.expand_incoming( + + expanded = self.expand_incoming( trans=trans, incoming=incoming, request_context=request_context, input_format=input_format ) + all_params: List[ToolStateJobInstancePopulatedT] = expanded[0] + all_errors: List[ParameterValidationErrorsT] = expanded[1] + rerun_remap_job_id: Optional[int] = expanded[2] + collection_info: Optional[MatchingCollections] = expanded[3] + # If there were errors, we stay on the same page and display them self.handle_incoming_errors(all_errors) mapping_params = MappingParameters(incoming, all_params) - completed_jobs: Dict[int, Optional[model.Job]] = {} - for i, param in enumerate(all_params): - if use_cached_job: - completed_jobs[i] = self.job_search.by_tool_input( - trans=trans, - tool_id=self.id, - tool_version=self.version, - param=param, - param_dump=self.params_to_strings(param, self.app, nested=True), - job_state=None, - ) - else: - completed_jobs[i] = None + completed_jobs: Dict[int, Optional[model.Job]] = self.completed_jobs(trans, use_cached_job, all_params) execution_tracker = execute_job( trans, self, @@ -1935,7 +1964,7 @@ def handle_input( implicit_collections=execution_tracker.implicit_collections, ) - def handle_incoming_errors(self, all_errors): + def handle_incoming_errors(self, all_errors: List[ParameterValidationErrorsT]): if any(all_errors): # simple param_key -> message string for tool form. err_data = {key: unicodify(value) for d in all_errors for (key, value) in d.items()} @@ -2060,7 +2089,7 @@ def get_static_param_values(self, trans): def execute( self, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[model.History] = None, set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, flush_job: bool = True, @@ -2086,7 +2115,7 @@ def execute( def _execute( self, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[model.History] = None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, execution_cache: Optional[ToolExecutionCache] = None, @@ -2128,7 +2157,7 @@ def _execute( log.error("Tool execution failed for job: %s", job_id) raise - def params_to_strings(self, params, app, nested=False): + def params_to_strings(self, params: ToolStateJobInstancePopulatedT, app, nested=False): return params_to_strings(self.inputs, params, app, nested) def params_from_strings(self, params, app, ignore_errors=False): @@ -2581,6 +2610,8 @@ def to_json(self, trans, kwd=None, job=None, workflow_building_mode=False, histo else: action = self.app.url_for(self.action) + state_inputs: ToolStateDumpedToJsonT = params_to_json(self.inputs, state_inputs, self.app) + # update tool model tool_model.update( { @@ -2594,7 +2625,7 @@ def to_json(self, trans, kwd=None, job=None, workflow_building_mode=False, histo "requirements": [{"name": r.name, "version": r.version} for r in self.requirements], "errors": state_errors, "tool_errors": self.tool_errors, - "state_inputs": params_to_strings(self.inputs, state_inputs, self.app, use_security=True, nested=True), + "state_inputs": state_inputs, "job_id": trans.security.encode_id(job.id) if job else None, "job_remap": job.remappable() if job else None, "history_id": trans.security.encode_id(history.id) if history else None, @@ -4110,6 +4141,21 @@ def produce_outputs(self, trans, 
@@ -4110,6 +4141,21 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
 
 
 # ---- Utility classes to be factored out -----------------------------------
+
+
+def _rerun_remap_job_id(trans, incoming, tool_id: str) -> Optional[int]:
+    rerun_remap_job_id = None
+    if "rerun_remap_job_id" in incoming:
+        try:
+            rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"])
+        except Exception as exception:
+            log.error(str(exception))
+            raise exceptions.MessageException(
+                "Failure executing tool with id '%s' (attempting to rerun invalid job).", tool_id
+            )
+    return rerun_remap_job_id
+
+
 class TracksterConfig:
     """Trackster configuration encapsulation."""
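# Editor's note: an illustrative model of the helper extracted above; int() stands
# in for trans.app.security.decode_id, and the real helper raises MessageException
# rather than ValueError on an undecodable id.
from typing import Callable, Optional

def rerun_remap_job_id_sketch(incoming: dict, decode_id: Callable[[str], int]) -> Optional[int]:
    if "rerun_remap_job_id" not in incoming:
        return None  # absent key means no remapping was requested
    return decode_id(incoming["rerun_remap_job_id"])  # may raise on a bad id

assert rerun_remap_job_id_sketch({}, int) is None
assert rerun_remap_job_id_sketch({"rerun_remap_job_id": "42"}, int) == 42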
diff --git a/lib/galaxy/tools/_types.py b/lib/galaxy/tools/_types.py
new file mode 100644
index 000000000000..b18ed5263a28
--- /dev/null
+++ b/lib/galaxy/tools/_types.py
@@ -0,0 +1,62 @@
+"""
+Tool state goes through several different iterations that can be difficult to follow;
+hopefully these datatypes can be used as markers to describe what has been done - even
+if they don't provide strong traditional typing semantics.
+
++--------------------------------+------------+---------------------------------+-------------+------+
+| Python Type                    | State for? | Object References               | Validated?  | xref |
++================================+============+=================================+=============+======+
+| ToolRequestT                   | request    | src dicts of encoded ids        | nope        |      |
+| ToolStateJobInstanceT          | a job      | src dicts of encoded ids        | nope        |      |
+| ToolStateJobInstancePopulatedT | a job      | model objs loaded from db       | check_param |      |
+| ToolStateDumpedToJsonT         | a job      | src dicts of encoded ids        | "           |      |
+|                                |            | (normalized into values attr)   |             |      |
+| ToolStateDumpedToJsonInternalT | a job      | src dicts of decoded ids        | "           |      |
+|                                |            | (normalized into values attr)   |             |      |
+| ToolStateDumpedToStringsT      | a job      | src dicts dumped to strs        | "           |      |
+|                                |            | (normalized into values attr)   |             |      |
++--------------------------------+------------+---------------------------------+-------------+------+
+"""
+
+from typing import (
+    Any,
+    Dict,
+)
+
+from typing_extensions import Literal
+
+# Input dictionary from the API, may include map/reduce instructions. Objects are referenced by "src"
+# dictionaries and encoded IDs.
+ToolRequestT = Dict[str, Any]
+
+# Input dictionary extracted from a tool request for running a tool individually as a single job. Objects
+# are referenced by "src" dictionaries with encoded IDs still, but batch instructions have been pulled out.
+# Parameters have not been "checked" (check_param has not been called).
+ToolStateJobInstanceT = Dict[str, Any]
+
+# Input dictionary for an individual job where objects are their model objects and parameters have been
+# "checked" (check_param has been called).
+ToolStateJobInstancePopulatedT = Dict[str, Any]
+
+# Input dictionary for an individual job where the state has been validated and populated but then converted
+# back down to JSON. Object references are unified in the format of {"values": List["src" dictionary]} where
+# the src dictionaries contain decoded ids (ints).
+# See comments on galaxy.tools.parameters.params_to_strings for more information.
+ToolStateDumpedToJsonInternalT = Dict[str, Any]
+
+# Input dictionary for an individual job where the state has been validated and populated but then converted
+# back down to JSON. Object references are unified in the format of {"values": List["src" dictionary]} where
+# the src dictionaries contain encoded ids.
+# See comments on galaxy.tools.parameters.params_to_strings for more information.
+ToolStateDumpedToJsonT = Dict[str, Any]
+
+# Input dictionary for an individual job where the state has been validated and populated but then converted
+# back down to JSON and dumped into strings. Object references are unified in the format of
+# {"values": List["src" dictionary]}. See comments on galaxy.tools.parameters.params_to_strings for more
+# information.
+ToolStateDumpedToStringsT = Dict[str, str]
+
+# A dictionary of error messages that occur while attempting to validate a ToolStateJobInstanceT and transform it
+# into a ToolStateJobInstancePopulatedT with model objects populated. Tool errors indicate the job should not be
+# further processed.
+ParameterValidationErrorsT = Dict[str, str]
+
+InputFormatT = Literal["legacy", "21.01"]
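# Editor's note: an illustrative walk (plain dicts, made-up ids) through the dumped
# shapes documented in _types.py above. After validation, object references are
# normalized under a "values" key; whether ids are encoded strings or decoded ints
# distinguishes the two JSON variants.
dumped_to_json = {"input1": {"values": [{"src": "hda", "id": "abc123"}]}}  # ToolStateDumpedToJsonT (encoded)
dumped_to_json_internal = {"input1": {"values": [{"src": "hda", "id": 7}]}}  # ...JsonInternalT (decoded)
assert isinstance(dumped_to_json_internal["input1"]["values"][0]["id"], int)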
diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py
index 989ab5a6ad45..1c3969236f13 100644
--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -37,6 +37,7 @@
 from galaxy.model.dataset_collections.matching import MatchingCollections
 from galaxy.model.none_like import NoneDataset
 from galaxy.objectstore import ObjectStorePopulator
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_DATASET_COLLECTION_ELEMENTS,
@@ -45,7 +46,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import (
     filter_output,
@@ -88,7 +88,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
@@ -401,7 +401,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/actions/data_manager.py b/lib/galaxy/tools/actions/data_manager.py
index c24e86fd0afb..d8786ad5a921 100644
--- a/lib/galaxy/tools/actions/data_manager.py
+++ b/lib/galaxy/tools/actions/data_manager.py
@@ -7,6 +7,7 @@
 )
 from galaxy.model.base import transaction
 from galaxy.model.dataset_collections.matching import MatchingCollections
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_DATASET_COLLECTION_ELEMENTS,
@@ -15,7 +16,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import ToolExecutionCache
 from . import (
@@ -33,7 +33,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/actions/metadata.py b/lib/galaxy/tools/actions/metadata.py
index f7d6ce844a9d..2b46c6060ccd 100644
--- a/lib/galaxy/tools/actions/metadata.py
+++ b/lib/galaxy/tools/actions/metadata.py
@@ -16,6 +16,7 @@
 )
 from galaxy.model.base import transaction
 from galaxy.model.dataset_collections.matching import MatchingCollections
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.execute import (
     DatasetCollectionElementsSliceT,
     DEFAULT_DATASET_COLLECTION_ELEMENTS,
@@ -24,7 +25,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import ToolExecutionCache
 from galaxy.util import asbool
@@ -43,7 +43,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py
index 1b18adcf39f6..fdfecfb9c65e 100644
--- a/lib/galaxy/tools/actions/model_operations.py
+++ b/lib/galaxy/tools/actions/model_operations.py
@@ -10,6 +10,7 @@
 )
 from galaxy.model.dataset_collections.matching import MatchingCollections
 from galaxy.objectstore import ObjectStorePopulator
+from galaxy.tools._types import ToolStateJobInstancePopulatedT
 from galaxy.tools.actions import (
     DefaultToolAction,
     OutputCollections,
@@ -24,7 +25,6 @@
     DEFAULT_RERUN_REMAP_JOB_ID,
     DEFAULT_SET_OUTPUT_HID,
     JobCallbackT,
-    ToolParameterRequestInstanceT,
 )
 from galaxy.tools.execution_helpers import ToolExecutionCache
 
@@ -52,7 +52,7 @@ def execute(
         self,
         tool,
         trans,
-        incoming: Optional[ToolParameterRequestInstanceT] = None,
+        incoming: Optional[ToolStateJobInstancePopulatedT] = None,
         history: Optional[History] = None,
         job_params=None,
         rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py
index aef512f61f37..f886a55f08f1 100644
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -35,6 +35,10 @@
     ToolExecutionCache,
 )
 from galaxy.tools.parameters.workflow_utils import is_runtime_value
+from ._types import (
+    ToolRequestT,
+    ToolStateJobInstancePopulatedT,
+)
 
 if typing.TYPE_CHECKING:
     from galaxy.tools import Tool
@@ -48,10 +52,6 @@
 CompletedJobsT = Dict[int, Optional[model.Job]]
 JobCallbackT = Callable
 WorkflowResourceParametersT = Dict[str, Any]
-# Input dictionary from the API, may include map/reduce instructions
-ToolParameterRequestT = Dict[str, Any]
-# Input dictionary extracted from a tool request for running a tool individually
-ToolParameterRequestInstanceT = Dict[str, Any]
 DatasetCollectionElementsSliceT = Dict[str, model.DatasetCollectionElement]
 DEFAULT_USE_CACHED_JOB = False
 DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None
@@ -67,8 +67,8 @@ def __init__(self, execution_tracker: "ExecutionTracker"):
 
 
 class MappingParameters(NamedTuple):
-    param_template: ToolParameterRequestT
-    param_combinations: List[ToolParameterRequestInstanceT]
+    param_template: ToolRequestT
+    param_combinations: List[ToolStateJobInstancePopulatedT]
 
 
 def execute(
@@ -242,14 +242,14 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona
 
 class ExecutionSlice:
     job_index: int
-    param_combination: ToolParameterRequestInstanceT
+    param_combination: ToolStateJobInstancePopulatedT
     dataset_collection_elements: Optional[DatasetCollectionElementsSliceT]
     history: Optional[model.History]
 
     def __init__(
         self,
         job_index: int,
-        param_combination: ToolParameterRequestInstanceT,
+        param_combination: ToolStateJobInstancePopulatedT,
         dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS,
     ):
         self.job_index = job_index
diff --git a/lib/galaxy/tools/parameters/__init__.py b/lib/galaxy/tools/parameters/__init__.py
index 7e72bf3a9eb1..e4cf75f668d3 100644
--- a/lib/galaxy/tools/parameters/__init__.py
+++ b/lib/galaxy/tools/parameters/__init__.py
@@ -4,6 +4,7 @@
 
 from json import dumps
 from typing import (
+    cast,
     Dict,
     Union,
 )
@@ -32,6 +33,13 @@
     runtime_to_json,
 )
 from .wrapped import flat_to_nested_state
+from .._types import (
+    InputFormatT,
+    ToolStateDumpedToJsonInternalT,
+    ToolStateDumpedToJsonT,
+    ToolStateDumpedToStringsT,
+    ToolStateJobInstancePopulatedT,
+)
 
 REPLACE_ON_TRUTHY = object()
 
@@ -253,15 +261,43 @@ def check_param(trans, param, incoming_value, param_values, simple_errors=True):
     return value, error
 
 
+def params_to_json_internal(
+    params: Dict[str, Union[Group, ToolParameter]],
+    param_values: ToolStateJobInstancePopulatedT,
+    app,
+) -> ToolStateDumpedToJsonInternalT:
+    """Return ToolStateDumpedToJsonInternalT for supplied validated and populated parameters."""
+    return cast(
+        ToolStateDumpedToJsonInternalT, params_to_strings(params, param_values, app, nested=True, use_security=False)
+    )
+
+
+def params_to_json(
+    params: Dict[str, Union[Group, ToolParameter]],
+    param_values: ToolStateJobInstancePopulatedT,
+    app,
+) -> ToolStateDumpedToJsonT:
+    """Return ToolStateDumpedToJsonT for supplied validated and populated parameters."""
+    return cast(
+        ToolStateDumpedToJsonT, params_to_strings(params, param_values, app, nested=True, use_security=True)
+    )
+
+
 def params_to_strings(
-    params: Dict[str, Union[Group, ToolParameter]], param_values: Dict, app, nested=False, use_security=False
-) -> Dict:
+    params: Dict[str, Union[Group, ToolParameter]],
+    param_values: ToolStateJobInstancePopulatedT,
+    app,
+    nested=False,
+    use_security=False,
+) -> Union[ToolStateDumpedToJsonT, ToolStateDumpedToJsonInternalT, ToolStateDumpedToStringsT]:
     """
     Convert a dictionary of parameter values to a dictionary of strings
     suitable for persisting. The `value_to_basic` method of each parameter
     is called to convert its value to basic types, the result of which
     is then json encoded (this allowing complex nested parameters and
-    such).
+    such). If `nested` is set, the result remains a JSON-ifiable dictionary
+    (ToolStateDumpedToJsonT); otherwise the values are dumped into JSON
+    strings (ToolStateDumpedToStringsT).
     """
     rval = {}
     for key, value in param_values.items():
@@ -351,7 +387,7 @@ def populate_state(
     context=None,
     check=True,
     simple_errors=True,
-    input_format="legacy",
+    input_format: InputFormatT = "legacy",
 ):
     """
     Populates nested state dict from incoming parameter values.
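# Editor's note: an illustrative contrast (plain data, not Galaxy objects) of the
# nested=True versus nested=False behavior the amended params_to_strings docstring
# above describes.
from json import dumps

basic = {"threshold": 5, "input1": {"values": [{"src": "hda", "id": "abc123"}]}}
nested_dump = basic                                    # nested=True: stays a JSON-ifiable dict
string_dump = {k: dumps(v) for k, v in basic.items()}  # nested=False: each value a JSON string
assert string_dump["threshold"] == "5"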
diff --git a/lib/galaxy/tools/parameters/meta.py b/lib/galaxy/tools/parameters/meta.py
index fee9f6d6079e..e275a44e03ce 100644
--- a/lib/galaxy/tools/parameters/meta.py
+++ b/lib/galaxy/tools/parameters/meta.py
@@ -21,6 +21,10 @@
 )
 from galaxy.util import permutations
 from . import visit_input_values
+from .._types import (
+    ToolRequestT,
+    ToolStateJobInstanceT,
+)
 from .wrapped import process_key
 
 log = logging.getLogger(__name__)
@@ -154,10 +158,10 @@ def is_batch(value):
     return WorkflowParameterExpansion(param_combinations, params_keys, input_combinations)
 
 
-ExpandedT = Tuple[List[Dict[str, Any]], Optional[matching.MatchingCollections]]
+ExpandedT = Tuple[List[ToolStateJobInstanceT], Optional[matching.MatchingCollections]]
 
 
-def expand_meta_parameters(trans, tool, incoming) -> ExpandedT:
+def expand_meta_parameters(trans, tool, incoming: ToolRequestT) -> ExpandedT:
     """
     Take in a dictionary of raw incoming parameters and expand to a list
     of expanded incoming parameters (one set of parameters per tool
diff --git a/lib/galaxy/webapps/galaxy/api/jobs.py b/lib/galaxy/webapps/galaxy/api/jobs.py
index 6aebebe5ec3c..9eb5efb40938 100644
--- a/lib/galaxy/webapps/galaxy/api/jobs.py
+++ b/lib/galaxy/webapps/galaxy/api/jobs.py
@@ -71,7 +71,7 @@
     JobIndexViewEnum,
     JobsService,
 )
-from galaxy.work.context import WorkRequestContext
+from galaxy.work.context import proxy_work_context_for_history
 
 log = logging.getLogger(__name__)
 
@@ -478,10 +478,8 @@ def search(
         for k, v in payload.__annotations__.items():
             if k.startswith("files_") or k.startswith("__files_"):
                 inputs[k] = v
-        request_context = WorkRequestContext(app=trans.app, user=trans.user, history=trans.history)
-        all_params, all_errors, _, _ = tool.expand_incoming(
-            trans=trans, incoming=inputs, request_context=request_context
-        )
+        request_context = proxy_work_context_for_history(trans)
+        all_params, all_errors, _, _ = tool.expand_incoming(request_context, incoming=inputs)
         if any(all_errors):
             return []
         params_dump = [tool.params_to_strings(param, trans.app, nested=True) for param in all_params]
diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py
index 025fb7aac920..81db0f7e1c6c 100644
--- a/lib/galaxy/work/context.py
+++ b/lib/galaxy/work/context.py
@@ -175,7 +175,7 @@ def set_history(self, history):
 
 def proxy_work_context_for_history(
     trans: ProvidesHistoryContext, history: Optional[History] = None, workflow_building_mode=False
-):
+) -> WorkRequestContext:
     """Create a WorkContext for supplied context with potentially different history.
 
     This provides semi-structured access to a transaction/work context with a supplied target
diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py
index 9a35e4f9b07b..5ecded6a0ec1 100644
--- a/lib/galaxy/workflow/modules.py
+++ b/lib/galaxy/workflow/modules.py
@@ -30,6 +30,7 @@
 )
 from galaxy.job_execution.actions.post import ActionBox
 from galaxy.model import (
+    Job,
     PostJobAction,
     Workflow,
     WorkflowInvocationStep,
@@ -2290,19 +2291,7 @@ def callback(input, prefixed_name: str, **kwargs):
             param_combinations.append(execution_state.inputs)
 
         complete = False
-        completed_jobs = {}
-        for i, param in enumerate(param_combinations):
-            if use_cached_job:
-                completed_jobs[i] = tool.job_search.by_tool_input(
-                    trans=trans,
-                    tool_id=tool.id,
-                    tool_version=tool.version,
-                    param=param,
-                    param_dump=tool.params_to_strings(param, trans.app, nested=True),
-                    job_state=None,
-                )
-            else:
-                completed_jobs[i] = None
+        completed_jobs: Dict[int, Optional[Job]] = tool.completed_jobs(trans, use_cached_job, param_combinations)
         try:
             mapping_params = MappingParameters(tool_state.inputs, param_combinations)
             max_num_jobs = progress.maximum_jobs_to_schedule_or_none