From c189af754f68a34f3f00c2b2b7ab608df87b0c89 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Thu, 31 Oct 2024 10:28:30 -0400 Subject: [PATCH] Adapt workflow parameter validator work to validator models. --- lib/galaxy/tool_util/parser/factory.py | 9 +++-- .../tool_util/parser/parameter_validators.py | 22 ++++++++++++ lib/galaxy/tool_util/parser/yaml.py | 23 +++++------- lib/galaxy/workflow/modules.py | 35 ++++++++++++++++--- 4 files changed, 67 insertions(+), 22 deletions(-) diff --git a/lib/galaxy/tool_util/parser/factory.py b/lib/galaxy/tool_util/parser/factory.py index f36647dfd40d..ce403638f7f0 100644 --- a/lib/galaxy/tool_util/parser/factory.py +++ b/lib/galaxy/tool_util/parser/factory.py @@ -119,17 +119,22 @@ def get_tool_source_from_representation(tool_format, tool_representation): raise Exception(f"Unknown tool representation format [{tool_format}].") -def get_input_source(content): +def get_input_source(content, trusted: bool = True): """Wrap dicts or XML elements as InputSource if needed. If the supplied content is already an InputSource object, it is simply returned. This allow Galaxy to uniformly consume using the tool input source interface. + + Setting trusted to false indicates that no dynamic code should be + executed - no eval. This should be used for user-defined tools (in + the future) and for workflow inputs. """ if not isinstance(content, InputSource): if isinstance(content, dict): - content = YamlInputSource(content) + content = YamlInputSource(content, trusted=trusted) else: + assert trusted # trust is not implemented for XML inputs content = XmlInputSource(content) return content diff --git a/lib/galaxy/tool_util/parser/parameter_validators.py b/lib/galaxy/tool_util/parser/parameter_validators.py index 6525b7d90fb8..b8117c63764b 100644 --- a/lib/galaxy/tool_util/parser/parameter_validators.py +++ b/lib/galaxy/tool_util/parser/parameter_validators.py @@ -2,6 +2,7 @@ from typing import ( Any, cast, + Dict, List, Optional, Sequence, @@ -14,6 +15,7 @@ Field, model_validator, PrivateAttr, + TypeAdapter, ) from typing_extensions import ( Annotated, @@ -96,6 +98,9 @@ class ParameterValidatorModel(StrictModel): implicit: bool = False _static: bool = PrivateAttr(False) _deprecated: bool = PrivateAttr(False) + # validators must be explicitly set as 'safe' to operate as user-defined workflow parameters or to be used + # within future user-defined tool parameters + _safe: bool = PrivateAttr(False) @model_validator(mode="after") def set_default_message(self) -> Self: @@ -163,6 +168,7 @@ class RegexParameterValidatorModel(StaticValidatorModel): type: Literal["regex"] = "regex" negate: Negate = NEGATE_DEFAULT expression: Annotated[str, ValidationArgument("Regular expression to validate against.", xml_body=True)] + _safe: bool = PrivateAttr(True) @property def default_message(self) -> str: @@ -189,6 +195,7 @@ class InRangeParameterValidatorModel(StaticValidatorModel): exclude_min: bool = False exclude_max: bool = False negate: Negate = NEGATE_DEFAULT + _safe: bool = PrivateAttr(True) def statically_validate(self, value: Any): if isinstance(value, (int, float)): @@ -220,6 +227,7 @@ class LengthParameterValidatorModel(StaticValidatorModel): min: Optional[int] = None max: Optional[int] = None negate: Negate = NEGATE_DEFAULT + _safe: bool = PrivateAttr(True) def statically_validate(self, value: Any): if isinstance(value, str): @@ -458,6 +466,20 @@ def default_message(self) -> str: ] +DiscriminatedAnyValidatorModel = TypeAdapter(AnyValidatorModel) + + +def parse_dict_validators(validator_dicts: List[Dict[str, Any]], trusted: bool) -> List[AnyValidatorModel]: + validator_models = [] + for validator_dict in validator_dicts: + validator = DiscriminatedAnyValidatorModel.validate_python(validator_dict) + if not trusted: + # Don't risk instantiating unsafe validators for user-defined code + assert validator._safe + validator_models.append(validator) + return validator_models + + def parse_xml_validators(input_elem: Element) -> List[AnyValidatorModel]: validator_els: List[Element] = input_elem.findall("validator") or [] models = [] diff --git a/lib/galaxy/tool_util/parser/yaml.py b/lib/galaxy/tool_util/parser/yaml.py index 5150f0374c17..88be9c72846a 100644 --- a/lib/galaxy/tool_util/parser/yaml.py +++ b/lib/galaxy/tool_util/parser/yaml.py @@ -34,6 +34,10 @@ ToolOutputCollection, ToolOutputCollectionStructure, ) +from .parameter_validators import ( + AnyValidatorModel, + parse_dict_validators, +) from .stdio import error_on_exit_code from .util import is_dict @@ -329,8 +333,9 @@ def parse_input_sources(self): class YamlInputSource(InputSource): - def __init__(self, input_dict): + def __init__(self, input_dict, trusted: bool = True): self.input_dict = input_dict + self.trusted = trusted def get(self, key, default=None): return self.input_dict.get(key, default) @@ -378,20 +383,8 @@ def parse_when_input_sources(self): sources.append((value, case_page_source)) return sources - def parse_validator_elems(self): - elements = [] - if "validators" in self.input_dict: - for elem in self.input_dict["validators"]: - if "regex_match" in elem: - elements.append( - { - "message": elem.get("regex_doc"), - "content": elem["regex_match"], - "negate": elem.get("negate", False), - "type": "regex", - } - ) - return elements + def parse_validators(self) -> List[AnyValidatorModel]: + return parse_dict_validators(self.input_dict.get("validators", []), trusted=self.trusted) def parse_static_options(self) -> List[Tuple[str, str, bool]]: static_options = [] diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 99eee94bb1dc..c4fe206fa2e7 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -54,6 +54,7 @@ InvocationFailureWorkflowParameterInvalid, ) from galaxy.tool_util.cwl.util import set_basename_and_derived_properties +from galaxy.tool_util.parser import get_input_source from galaxy.tool_util.parser.output_objects import ToolExpressionOutput from galaxy.tools import ( DatabaseOperationTool, @@ -1523,9 +1524,10 @@ def _parameter_def_list_to_options(parameter_value): suggestion_values = parameter_def.get("suggestions") parameter_kwds["options"] = _parameter_def_list_to_options(suggestion_values) - input_source = dict( + input_source_dict = dict( name="input", label=self.label, type=client_parameter_type, optional=optional, **parameter_kwds ) + input_source = get_input_source(input_source_dict, trusted=False) input = parameter_class(None, input_source) return dict(input=input) @@ -1587,7 +1589,7 @@ def step_state_to_tool_state(self, state): default_set = True default_value = state["default"] multiple = state.get("multiple") - validators = state.get("validators") + source_validators = state.get("validators") restrictions = state.get("restrictions") restrictOnConnections = state.get("restrictOnConnections") suggestions = state.get("suggestions") @@ -1607,8 +1609,19 @@ def step_state_to_tool_state(self, state): } if multiple is not None: state["parameter_definition"]["multiple"] = multiple - if validators is not None: - state["parameter_definition"]["validators"] = validators + if source_validators is not None: + form_validators = [] + # the form definition can change from Galaxy to Galaxy fairly freely, but the source validators are persisted + # and need to be consistent - here we convert the persisted/YAML tool definition version to the "tool form" version. + for i, source_validator in enumerate(source_validators): + form_validators.append( + { + "__index__": i, + "regex_doc": source_validator.get("message"), + "regex_match": source_validator.get("expression"), + } + ) + state["parameter_definition"]["validators"] = form_validators state["parameter_definition"]["restrictions"] = {} state["parameter_definition"]["restrictions"]["how"] = restrictions_how @@ -1653,7 +1666,19 @@ def _parse_state_into_dict(self): if "multiple" in parameters_def: rval["multiple"] = parameters_def["multiple"] if "validators" in parameters_def: - rval["validators"] = parameters_def["validators"] + form_validators = parameters_def["validators"] + source_validators = [] + # convert the current tool form structure to the persisted YAML-definition style + for form_validator in form_validators: + source_validators.append( + { + "message": form_validator["regex_doc"], + "expression": form_validator["regex_match"], + "negate": False, + "type": "regex", + } + ) + rval["validators"] = source_validators restrictions_cond_values = parameters_def.get("restrictions") if restrictions_cond_values: