Skip to content

Commit

Permalink
Typing around tool state.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Aug 29, 2024
1 parent 7227bd2 commit 6437858
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 76 deletions.
135 changes: 85 additions & 50 deletions lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,17 @@
get_tool_shed_url_from_tool_shed_registry,
)
from galaxy.version import VERSION_MAJOR
from galaxy.work.context import proxy_work_context_for_history
from galaxy.work.context import (
proxy_work_context_for_history,
WorkRequestContext,
)
from ._types import (
InputFormatT,
ParameterValidationErrorsT,
ToolRequestT,
ToolStateJobInstancePopulatedT,
ToolStateJobInstanceT,
)
from .execute import (
DatasetCollectionElementsSliceT,
DEFAULT_JOB_CALLBACK,
Expand All @@ -195,8 +205,6 @@
ExecutionSlice,
JobCallbackT,
MappingParameters,
ToolParameterRequestInstanceT,
ToolParameterRequestT,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1796,21 +1804,16 @@ def visit_inputs(self, values, callback):
if self.check_values:
visit_input_values(self.inputs, values, callback)

def expand_incoming(self, trans, incoming, request_context, input_format="legacy"):
rerun_remap_job_id = None
if "rerun_remap_job_id" in incoming:
try:
rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"])
except Exception as exception:
log.error(str(exception))
raise exceptions.MessageException(
"Failure executing tool with id '%s' (attempting to rerun invalid job).", self.id
)

def expand_incoming(
self, request_context: WorkRequestContext, incoming: ToolRequestT, input_format: InputFormatT = "legacy"
):
rerun_remap_job_id = _rerun_remap_job_id(request_context, incoming)
set_dataset_matcher_factory(request_context, self)

# Fixed set of input parameters may correspond to any number of jobs.
# Expand these out to individual parameters for given jobs (tool executions).
expanded_incomings: List[ToolStateJobInstanceT]
collection_info: Optional[MatchingCollections]
expanded_incomings, collection_info = expand_meta_parameters(trans, self, incoming)

# Remapping a single job to many jobs doesn't make sense, so disable
Expand All @@ -1832,49 +1835,60 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy
"Validated and populated state for tool request",
)
all_errors = []
all_params = []
all_params: List[ToolStateJobInstancePopulatedT] = []

for expanded_incoming in expanded_incomings:
params = {}
errors: Dict[str, str] = {}
if self.input_translator:
self.input_translator.translate(expanded_incoming)
if not self.check_values:
# If `self.check_values` is false we don't do any checking or
# processing on input This is used to pass raw values
# through to/from external sites.
params = expanded_incoming
else:
# Update state for all inputs on the current page taking new
# values from `incoming`.
populate_state(
request_context,
self.inputs,
expanded_incoming,
params,
errors,
simple_errors=False,
input_format=input_format,
)
# If the tool provides a `validate_input` hook, call it.
validate_input = self.get_hook("validate_input")
if validate_input:
# hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py
legacy_non_dce_params = {
k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v
for k, v in params.items()
}
validate_input(request_context, errors, legacy_non_dce_params, self.inputs)
params, errors = self._populate(request_context, expanded_incoming, input_format)
all_errors.append(errors)
all_params.append(params)
unset_dataset_matcher_factory(request_context)

log.info(validation_timer)
return all_params, all_errors, rerun_remap_job_id, collection_info

def _populate(
request_context, expanded_incoming: ToolStateJobInstanceT, input_format: InputFormatT
) -> Tuple[ToolStateJobInstancePopulatedT, ParameterValidationErrorsT]:
"""Validate expanded parameters for a job to replace references with model objects.
So convert a ToolStateJobInstanceT to a ToolStateJobInstancePopulatedT.
"""
params: ToolStateJobInstancePopulatedT = {}
errors: ParameterValidationErrorsT = {}
if self.input_translator:
self.input_translator.translate(expanded_incoming)
if not self.check_values:
# If `self.check_values` is false we don't do any checking or
# processing on input This is used to pass raw values
# through to/from external sites.
params = cast(ToolStateJobInstancePopulatedT, expanded_incoming)
else:
# Update state for all inputs on the current page taking new
# values from `incoming`.
populate_state(
request_context,
self.inputs,
expanded_incoming,
params,
errors,
simple_errors=False,
input_format=input_format,
)
# If the tool provides a `validate_input` hook, call it.
validate_input = self.get_hook("validate_input")
if validate_input:
# hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py
legacy_non_dce_params = {
k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v
for k, v in params.items()
}
validate_input(request_context, errors, legacy_non_dce_params, self.inputs)
return params, errors

def handle_input(
self,
trans,
incoming: ToolParameterRequestT,
incoming: ToolRequestT,
history: Optional[model.History] = None,
use_cached_job: bool = DEFAULT_USE_CACHED_JOB,
preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
Expand All @@ -1887,9 +1901,15 @@ def handle_input(
there were no errors).
"""
request_context = proxy_work_context_for_history(trans, history=history)
all_params, all_errors, rerun_remap_job_id, collection_info = self.expand_incoming(

expanded = self.expand_incoming(
trans=trans, incoming=incoming, request_context=request_context, input_format=input_format
)
all_params: List[ToolStateJobInstancePopulatedT] = expanded[0]
all_errors: List[ParameterValidationErrorsT] = expanded[1]
rerun_remap_job_id: Optional[int] = expanded[2]
collection_info: Optional[MatchingCollections] = expanded[3]

# If there were errors, we stay on the same page and display them
self.handle_incoming_errors(all_errors)

Expand Down Expand Up @@ -1935,7 +1955,7 @@ def handle_input(
implicit_collections=execution_tracker.implicit_collections,
)

def handle_incoming_errors(self, all_errors):
def handle_incoming_errors(self, all_errors: List[ParameterValidationErrorsT]):
if any(all_errors):
# simple param_key -> message string for tool form.
err_data = {key: unicodify(value) for d in all_errors for (key, value) in d.items()}
Expand Down Expand Up @@ -2060,7 +2080,7 @@ def get_static_param_values(self, trans):
def execute(
self,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[model.History] = None,
set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
flush_job: bool = True,
Expand All @@ -2086,7 +2106,7 @@ def execute(
def _execute(
self,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[model.History] = None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
execution_cache: Optional[ToolExecutionCache] = None,
Expand Down Expand Up @@ -4110,6 +4130,21 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history


# ---- Utility classes to be factored out -----------------------------------


def _rerun_remap_job_id(trans, incoming) -> Optional[int]:
rerun_remap_job_id = None
if "rerun_remap_job_id" in incoming:
try:
rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"])
except Exception as exception:
log.error(str(exception))
raise exceptions.MessageException(
"Failure executing tool with id '%s' (attempting to rerun invalid job).", self.id
)
return rerun_remap_job_id


class TracksterConfig:
"""Trackster configuration encapsulation."""

Expand Down
30 changes: 30 additions & 0 deletions lib/galaxy/tools/_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import (
Any,
Dict,
)

from typing_extensions import Literal

# Tool state goes through several different walk throughs that are difficult to follow I think,
# hopefully datatypes can be used as markers to describe what has been done - even if they don't
# provide strong traditional typing semantics.

# Input dictionary from the API, may include map/reduce instructions. Objects are referenced by "src"
# dictionaries and encoded IDS.
ToolRequestT = Dict[str, Any]

# Input dictionary extracted from a tool request for running a tool individually as a single job. Objects are referenced
# by "src" dictionaries with encoded IDs still but batch instructions have been pulled out. Parameters have not
# been "checked" (check_param has not been called).
ToolStateJobInstanceT = Dict[str, Any]

# Input dictionary for an individual job where objects are their model objects and parameters have been
# "checked" (check_param has been called).
ToolStateJobInstancePopulatedT = Dict[str, Any]

# A dictionary of error messages that occur while attempting to validate a ToolStateJobInstanceT and transform it
# into a ToolStateJobInstancePopulatedT with model objects populated. Tool errors indicate the job should not be
# further processed.
ParameterValidationErrorsT = Dict[str, str]

InputFormatT = Literal["legacy", "21.01"]
6 changes: 3 additions & 3 deletions lib/galaxy/tools/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.model.none_like import NoneDataset
from galaxy.objectstore import ObjectStorePopulator
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.execute import (
DatasetCollectionElementsSliceT,
DEFAULT_DATASET_COLLECTION_ELEMENTS,
Expand All @@ -45,7 +46,6 @@
DEFAULT_RERUN_REMAP_JOB_ID,
DEFAULT_SET_OUTPUT_HID,
JobCallbackT,
ToolParameterRequestInstanceT,
)
from galaxy.tools.execution_helpers import (
filter_output,
Expand Down Expand Up @@ -88,7 +88,7 @@ def execute(
self,
tool,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
Expand Down Expand Up @@ -401,7 +401,7 @@ def execute(
self,
tool,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/tools/actions/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.execute import (
DatasetCollectionElementsSliceT,
DEFAULT_DATASET_COLLECTION_ELEMENTS,
Expand All @@ -15,7 +16,6 @@
DEFAULT_RERUN_REMAP_JOB_ID,
DEFAULT_SET_OUTPUT_HID,
JobCallbackT,
ToolParameterRequestInstanceT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache
from . import (
Expand All @@ -33,7 +33,7 @@ def execute(
self,
tool,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/tools/actions/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.execute import (
DatasetCollectionElementsSliceT,
DEFAULT_DATASET_COLLECTION_ELEMENTS,
Expand All @@ -24,7 +25,6 @@
DEFAULT_RERUN_REMAP_JOB_ID,
DEFAULT_SET_OUTPUT_HID,
JobCallbackT,
ToolParameterRequestInstanceT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache
from galaxy.util import asbool
Expand All @@ -43,7 +43,7 @@ def execute(
self,
tool,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/tools/actions/model_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.objectstore import ObjectStorePopulator
from galaxy.tools._types import ToolStateJobInstancePopulatedT
from galaxy.tools.actions import (
DefaultToolAction,
OutputCollections,
Expand All @@ -24,7 +25,6 @@
DEFAULT_RERUN_REMAP_JOB_ID,
DEFAULT_SET_OUTPUT_HID,
JobCallbackT,
ToolParameterRequestInstanceT,
)
from galaxy.tools.execution_helpers import ToolExecutionCache

Expand Down Expand Up @@ -52,7 +52,7 @@ def execute(
self,
tool,
trans,
incoming: Optional[ToolParameterRequestInstanceT] = None,
incoming: Optional[ToolStateJobInstancePopulatedT] = None,
history: Optional[History] = None,
job_params=None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
Expand Down
Loading

0 comments on commit 6437858

Please sign in to comment.