diff --git a/lib/galaxy/tool_util/parameters/__init__.py b/lib/galaxy/tool_util/parameters/__init__.py
index ffd307f98e78..423d4e950bf5 100644
--- a/lib/galaxy/tool_util/parameters/__init__.py
+++ b/lib/galaxy/tool_util/parameters/__init__.py
@@ -1,3 +1,4 @@
+from .case import test_case_state
from .convert import (
decode,
encode,
@@ -7,6 +8,7 @@
input_models_for_pages,
input_models_for_tool_source,
input_models_from_json,
+ ParameterDefinitionError,
tool_parameter_bundle_from_json,
)
from .json import to_json_schema_string
@@ -68,6 +70,7 @@
"input_models_for_tool_source",
"tool_parameter_bundle_from_json",
"input_models_from_json",
+ "ParameterDefinitionError",
"JobInternalToolState",
"ToolParameterBundle",
"ToolParameterBundleModel",
@@ -106,6 +109,7 @@
"TestCaseToolState",
"ToolParameterT",
"to_json_schema_string",
+ "test_case_state",
"RequestToolState",
"RequestInternalToolState",
"flat_state_path",
diff --git a/lib/galaxy/tool_util/parameters/case.py b/lib/galaxy/tool_util/parameters/case.py
new file mode 100644
index 000000000000..15c029fc68ff
--- /dev/null
+++ b/lib/galaxy/tool_util/parameters/case.py
@@ -0,0 +1,304 @@
+from dataclasses import dataclass
+from re import compile
+from typing import (
+ Any,
+ cast,
+ Dict,
+ List,
+ Optional,
+ Set,
+)
+
+from packaging.version import Version
+
+from galaxy.tool_util.parser.interface import (
+ ToolSource,
+ ToolSourceTest,
+ ToolSourceTestInput,
+ ToolSourceTestInputs,
+)
+from galaxy.util import asbool
+from .factory import input_models_for_tool_source
+from .models import (
+ BooleanParameterModel,
+ ConditionalParameterModel,
+ ConditionalWhen,
+ DataCollectionParameterModel,
+ DataColumnParameterModel,
+ FloatParameterModel,
+ IntegerParameterModel,
+ RepeatParameterModel,
+ SectionParameterModel,
+ ToolParameterT,
+)
+from .state import TestCaseToolState
+from .visitor import (
+ flat_state_path,
+ repeat_inputs_to_array,
+ validate_explicit_conditional_test_value,
+)
+
+INTEGER_STR_PATTERN = compile(r"^(\d+)$")
+COLUMN_NAME_STR_PATTERN = compile(r"^c(\d+): .*$")
+# In an effort to squeeze all the ambiguity out of test cases - at some point Marius and John
+# agreed tools should be using value_json for typed inputs to parameters but John has come around on
+# this now that we're validating the parameters as a whole on load. The models are ensuring only
+# unambiguous test cases are being loaded.
+WARN_ON_UNTYPED_XML_STRINGS = False
+
+
+@dataclass
+class TestCaseStateAndWarnings:
+ tool_state: TestCaseToolState
+ warnings: List[str]
+ unhandled_inputs: List[str]
+
+
+@dataclass
+class TestCaseStateValidationResult:
+ tool_state: TestCaseToolState
+ warnings: List[str]
+ validation_error: Optional[Exception]
+ tool_parameter_bundle: List[ToolParameterT]
+ profile: str
+
+ def to_dict(self):
+ tool_state_json = self.tool_state.input_state
+ warnings = self.warnings
+ validation_error = str(self.validation_error) if self.validation_error else None
+ return {
+ "tool_state": tool_state_json,
+ "warnings": warnings,
+ "validation_error": validation_error,
+ "validated_with_profile": self.profile,
+ }
+
+
+def legacy_from_string(parameter: ToolParameterT, value: Optional[Any], warnings: List[str], profile: str) -> Any:
+ """Convert string values in XML test cases into typed variants.
+
+ This should only be used when parsing XML test cases into a TestCaseToolState object.
+ We have to maintain backward compatibility on these for older Galaxy tool profile versions.
+ """
+ result_value: Any = value
+ if isinstance(value, str):
+ value_str = cast(str, value)
+ if isinstance(parameter, (IntegerParameterModel,)):
+ if WARN_ON_UNTYPED_XML_STRINGS:
+ warnings.append(
+ f"Implicitly converted {parameter.name} to an integer from a string value, please use 'value_json' to define this test input parameter value instead."
+ )
+ result_value = int(value_str)
+ elif isinstance(parameter, (FloatParameterModel,)):
+ if WARN_ON_UNTYPED_XML_STRINGS:
+ warnings.append(
+ f"Implicitly converted {parameter.name} to a floating point number from a string value, please use 'value_json' to define this test input parameter value instead."
+ )
+ result_value = float(value_str)
+ elif isinstance(parameter, (BooleanParameterModel,)):
+ if WARN_ON_UNTYPED_XML_STRINGS:
+ warnings.append(
+ f"Implicitly converted {parameter.name} to a boolean from a string value, please use 'value_json' to define this test input parameter value instead."
+ )
+ try:
+ result_value = asbool(value_str)
+ except ValueError:
+ warnings.append(
+                    "Likely using deprecated truevalue/falsevalue in tool parameter - switch to 'true' or 'false'"
+ )
+ elif isinstance(parameter, (DataColumnParameterModel,)):
+ integer_match = INTEGER_STR_PATTERN.match(value_str)
+ if integer_match:
+ if WARN_ON_UNTYPED_XML_STRINGS:
+ warnings.append(
+ f"Implicitly converted {parameter.name} to a column index integer from a string value, please use 'value_json' to define this test input parameter value instead."
+ )
+ result_value = int(value_str)
+ elif Version(profile) < Version("24.2"):
+ # allow this for older tools but new tools will just require the integer index
+ warnings.append(
+ f"Using column names as test case values is deprecated, please adjust {parameter.name} to just use an integer column index."
+ )
+ column_name_value_match = COLUMN_NAME_STR_PATTERN.match(value_str)
+ if column_name_value_match:
+ column_part = column_name_value_match.group(1)
+ result_value = int(column_part)
+ return result_value
+
+
+def test_case_state(
+ test_dict: ToolSourceTest, tool_parameter_bundle: List[ToolParameterT], profile: str, validate: bool = True
+) -> TestCaseStateAndWarnings:
+ warnings: List[str] = []
+ inputs: ToolSourceTestInputs = test_dict["inputs"]
+ unhandled_inputs = []
+ state: Dict[str, Any] = {}
+
+ handled_inputs = _merge_level_into_state(tool_parameter_bundle, inputs, state, profile, warnings, None)
+
+ for test_input in inputs:
+ input_name = test_input["name"]
+ if input_name not in handled_inputs:
+ unhandled_inputs.append(input_name)
+
+ tool_state = TestCaseToolState(state)
+ if validate:
+ tool_state.validate(tool_parameter_bundle)
+ for input_name in unhandled_inputs:
+ raise Exception(f"Invalid parameter name found {input_name}")
+ return TestCaseStateAndWarnings(tool_state, warnings, unhandled_inputs)
+
+
+def test_case_validation(
+ test_dict: ToolSourceTest, tool_parameter_bundle: List[ToolParameterT], profile: str
+) -> TestCaseStateValidationResult:
+ test_case_state_and_warnings = test_case_state(test_dict, tool_parameter_bundle, profile, validate=False)
+ exception: Optional[Exception] = None
+ try:
+ test_case_state_and_warnings.tool_state.validate(tool_parameter_bundle)
+ for input_name in test_case_state_and_warnings.unhandled_inputs:
+ raise Exception(f"Invalid parameter name found {input_name}")
+ except Exception as e:
+ exception = e
+ return TestCaseStateValidationResult(
+ test_case_state_and_warnings.tool_state,
+ test_case_state_and_warnings.warnings,
+ exception,
+ tool_parameter_bundle,
+ profile,
+ )
+
+
+def _merge_level_into_state(
+ tool_inputs: List[ToolParameterT],
+ inputs: ToolSourceTestInputs,
+ state_at_level: dict,
+ profile: str,
+ warnings: List[str],
+ prefix: Optional[str],
+) -> Set[str]:
+ handled_inputs: Set[str] = set()
+ for tool_input in tool_inputs:
+ handled_inputs.update(_merge_into_state(tool_input, inputs, state_at_level, profile, warnings, prefix))
+
+ return handled_inputs
+
+
+def _inputs_as_dict(inputs: ToolSourceTestInputs) -> Dict[str, ToolSourceTestInput]:
+ as_dict: Dict[str, ToolSourceTestInput] = {}
+ for input in inputs:
+ as_dict[input["name"]] = input
+
+ return as_dict
+
+
+def _merge_into_state(
+ tool_input: ToolParameterT,
+ inputs: ToolSourceTestInputs,
+ state_at_level: dict,
+ profile: str,
+ warnings: List[str],
+ prefix: Optional[str],
+) -> Set[str]:
+ handled_inputs = set()
+
+ input_name = tool_input.name
+ state_path = flat_state_path(input_name, prefix)
+ handled_inputs.add(state_path)
+
+ if isinstance(tool_input, (ConditionalParameterModel,)):
+ conditional_state = state_at_level.get(input_name, {})
+ if input_name not in state_at_level:
+ state_at_level[input_name] = conditional_state
+
+ conditional = cast(ConditionalParameterModel, tool_input)
+ when: ConditionalWhen = _select_which_when(conditional, conditional_state, inputs, state_path)
+ test_parameter = conditional.test_parameter
+ handled_inputs.update(
+ _merge_into_state(test_parameter, inputs, conditional_state, profile, warnings, state_path)
+ )
+ handled_inputs.update(
+ _merge_level_into_state(when.parameters, inputs, conditional_state, profile, warnings, state_path)
+ )
+ elif isinstance(tool_input, (RepeatParameterModel,)):
+ repeat_state_array = state_at_level.get(input_name, [])
+ if input_name not in state_at_level:
+ state_at_level[input_name] = repeat_state_array
+
+ repeat = cast(RepeatParameterModel, tool_input)
+ repeat_instance_inputs = repeat_inputs_to_array(state_path, _inputs_as_dict(inputs))
+ for i, _ in enumerate(repeat_instance_inputs):
+ while len(repeat_state_array) <= i:
+ repeat_state_array.append({})
+
+ repeat_instance_prefix = f"{state_path}_{i}"
+ handled_inputs.update(
+ _merge_level_into_state(
+ repeat.parameters, inputs, repeat_state_array[i], profile, warnings, repeat_instance_prefix
+ )
+ )
+ elif isinstance(tool_input, (SectionParameterModel,)):
+ section_state = state_at_level.get(input_name, {})
+ if input_name not in state_at_level:
+ state_at_level[input_name] = section_state
+
+ section = cast(SectionParameterModel, tool_input)
+ handled_inputs.update(
+ _merge_level_into_state(section.parameters, inputs, section_state, profile, warnings, state_path)
+ )
+ else:
+ test_input = _input_for(state_path, inputs)
+ if test_input is not None:
+ if isinstance(tool_input, (DataCollectionParameterModel,)):
+ input_value = test_input.get("attributes", {}).get("collection")
+ else:
+ input_value = test_input["value"]
+ input_value = legacy_from_string(tool_input, input_value, warnings, profile)
+
+ state_at_level[input_name] = input_value
+
+ return handled_inputs
+
+
+def _select_which_when(
+ conditional: ConditionalParameterModel, state: dict, inputs: ToolSourceTestInputs, prefix: str
+) -> ConditionalWhen:
+ test_parameter = conditional.test_parameter
+ test_parameter_name = test_parameter.name
+ test_parameter_flat_path = flat_state_path(test_parameter_name, prefix)
+
+ test_input = _input_for(test_parameter_flat_path, inputs)
+ explicit_test_value = test_input["value"] if test_input else None
+ test_value = validate_explicit_conditional_test_value(test_parameter_name, explicit_test_value)
+ for when in conditional.whens:
+ if test_value is None and when.is_default_when:
+ return when
+ elif test_value == when.discriminator:
+ return when
+ else:
+ raise Exception(f"Invalid conditional test value ({explicit_test_value}) for parameter ({test_parameter_name})")
+
+
+def _input_for(flat_state_path: str, inputs: ToolSourceTestInputs) -> Optional[ToolSourceTestInput]:
+ for input in inputs:
+ if input["name"] == flat_state_path:
+ return input
+ else:
+ return None
+
+
+def validate_test_cases_for_tool_source(
+ tool_source: ToolSource, use_latest_profile: bool = False
+) -> List[TestCaseStateValidationResult]:
+ tool_parameter_bundle = input_models_for_tool_source(tool_source)
+ if use_latest_profile:
+ # this might get old but it is fine, just needs to be updated when test case changes are made
+ profile = "24.2"
+ else:
+ profile = tool_source.parse_profile()
+ test_cases: List[ToolSourceTest] = tool_source.parse_tests_to_dict()["tests"]
+ results_by_test: List[TestCaseStateValidationResult] = []
+ for test_case in test_cases:
+ validation_result = test_case_validation(test_case, tool_parameter_bundle.input_models, profile)
+ results_by_test.append(validation_result)
+ return results_by_test
diff --git a/lib/galaxy/tool_util/parameters/factory.py b/lib/galaxy/tool_util/parameters/factory.py
index f2145ef2e571..987d61580ed0 100644
--- a/lib/galaxy/tool_util/parameters/factory.py
+++ b/lib/galaxy/tool_util/parameters/factory.py
@@ -56,6 +56,10 @@ class ParameterDefinitionError(Exception):
pass
+class UnknownParameterTypeError(ParameterDefinitionError):
+ pass
+
+
def get_color_value(input_source: InputSource) -> str:
return input_source.get("value", "#000000")
@@ -204,14 +208,14 @@ def _from_input_source_galaxy(input_source: InputSource) -> ToolParameterT:
name=input_source.parse_name(),
)
else:
- raise Exception(f"Unknown Galaxy parameter type {param_type}")
+ raise UnknownParameterTypeError(f"Unknown Galaxy parameter type {param_type}")
elif input_type == "conditional":
test_param_input_source = input_source.parse_test_input_source()
test_parameter = cast(
Union[BooleanParameterModel, SelectParameterModel], _from_input_source_galaxy(test_param_input_source)
)
whens = []
- default_value = cond_test_parameter_default_value(test_parameter)
+ default_test_value = cond_test_parameter_default_value(test_parameter)
for value, case_inputs_sources in input_source.parse_when_input_sources():
if isinstance(test_parameter, BooleanParameterModel):
# TODO: investigate truevalue/falsevalue when...
@@ -221,7 +225,7 @@ def _from_input_source_galaxy(input_source: InputSource) -> ToolParameterT:
tool_parameter_models = input_models_for_page(case_inputs_sources)
is_default_when = False
- if typed_value == default_value:
+ if typed_value == default_test_value:
is_default_when = True
whens.append(
ConditionalWhen(discriminator=value, parameters=tool_parameter_models, is_default_when=is_default_when)
diff --git a/lib/galaxy/tool_util/parameters/models.py b/lib/galaxy/tool_util/parameters/models.py
index 83d4a3a688d7..f83b0bd0d9e0 100644
--- a/lib/galaxy/tool_util/parameters/models.py
+++ b/lib/galaxy/tool_util/parameters/models.py
@@ -39,7 +39,10 @@
)
from galaxy.exceptions import RequestParameterInvalidException
-from galaxy.tool_util.parser.interface import DrillDownOptionsDict
+from galaxy.tool_util.parser.interface import (
+ DrillDownOptionsDict,
+ TestCollectionDefDict,
+)
from ._types import (
cast_as_type,
is_optional,
@@ -57,7 +60,7 @@
# + request_internal: This is a pydantic model to validate what Galaxy expects to find in the database,
# in particular dataset and collection references should be decoded integers.
StateRepresentationT = Literal[
- "request", "request_internal", "job_internal", "test_case", "workflow_step", "workflow_step_linked"
+ "request", "request_internal", "job_internal", "test_case_xml", "workflow_step", "workflow_step_linked"
]
@@ -119,17 +122,16 @@ def request_requires_value(self) -> bool:
def dynamic_model_information_from_py_type(
- param_model: ParamModel, py_type: Type, requires_value: Optional[bool] = None
+ param_model: ParamModel, py_type: Type, requires_value: Optional[bool] = None, validators=None
):
name = param_model.name
if requires_value is None:
requires_value = param_model.request_requires_value
initialize = ... if requires_value else None
py_type_is_optional = is_optional(py_type)
+ validators = validators or {}
if not py_type_is_optional and not requires_value:
- validators = {"not_null": field_validator(name)(Validators.validate_not_none)}
- else:
- validators = {}
+ validators["not_null"] = field_validator(name)(Validators.validate_not_none)
return DynamicModelInformation(
name,
@@ -310,9 +312,9 @@ def py_type_internal(self) -> Type:
def py_type_test_case(self) -> Type:
base_model: Type
if self.multiple:
- base_model = MultiDataRequestInternal
+ base_model = str
else:
- base_model = DataTestCaseValue
+ base_model = str
return optional_if_needed(base_model, self.optional)
def pydantic_template(self, state_representation: StateRepresentationT) -> DynamicModelInformation:
@@ -324,7 +326,7 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam
)
elif state_representation == "job_internal":
return dynamic_model_information_from_py_type(self, self.py_type_internal)
- elif state_representation == "test_case":
+ elif state_representation == "test_case_xml":
return dynamic_model_information_from_py_type(self, self.py_type_test_case)
elif state_representation == "workflow_step":
return dynamic_model_information_from_py_type(self, type(None), requires_value=False)
@@ -369,6 +371,8 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam
return dynamic_model_information_from_py_type(self, type(None), requires_value=False)
elif state_representation == "workflow_step_linked":
return dynamic_model_information_from_py_type(self, ConnectedValue)
+ elif state_representation == "test_case_xml":
+ return dynamic_model_information_from_py_type(self, TestCollectionDefDict)
else:
raise NotImplementedError(
f"Have not implemented data collection parameter models for state representation {state_representation}"
@@ -536,7 +540,14 @@ class SelectParameterModel(BaseGalaxyToolParameterModelDefinition):
options: Optional[List[LabelValue]] = None
multiple: bool
- def py_type_if_required(self, allow_connections=False) -> Type:
+ @staticmethod
+ def split_str(cls, data: Any) -> Any:
+ if isinstance(data, str):
+ return [x.strip() for x in data.split(",")]
+
+ return data
+
+ def py_type_if_required(self, allow_connections: bool = False) -> Type:
if self.options is not None:
if len(self.options) > 0:
literal_options: List[Type] = [cast_as_type(Literal[o.value]) for o in self.options]
@@ -569,6 +580,16 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam
elif state_representation == "workflow_step_linked":
py_type = self.py_type_if_required(allow_connections=True)
return dynamic_model_information_from_py_type(self, optional_if_needed(py_type, self.optional))
+ elif state_representation == "test_case_xml":
+ # in a YAML test case representation this can be string, in XML we are still expecting a comma separated string
+ py_type = self.py_type_if_required(allow_connections=False)
+ if self.multiple:
+ validators = {"from_string": field_validator(self.name, mode="before")(SelectParameterModel.split_str)}
+ else:
+ validators = {}
+ return dynamic_model_information_from_py_type(
+ self, optional_if_needed(py_type, self.optional), validators=validators
+ )
else:
return dynamic_model_information_from_py_type(self, self.py_type)
@@ -662,8 +683,16 @@ def py_type(self) -> Type:
return py_type
+ @property
+ def py_type_test_case_xml(self) -> Type:
+ base_model = str
+ return optional_if_needed(base_model, not self.request_requires_value)
+
def pydantic_template(self, state_representation: StateRepresentationT) -> DynamicModelInformation:
- return dynamic_model_information_from_py_type(self, self.py_type)
+ if state_representation == "test_case_xml":
+ return dynamic_model_information_from_py_type(self, self.py_type_test_case_xml)
+ else:
+ return dynamic_model_information_from_py_type(self, self.py_type)
@property
def request_requires_value(self) -> bool:
@@ -693,6 +722,15 @@ class DataColumnParameterModel(BaseGalaxyToolParameterModelDefinition):
parameter_type: Literal["gx_data_column"] = "gx_data_column"
multiple: bool
+ @staticmethod
+ def split_str(cls, data: Any) -> Any:
+ if isinstance(data, str):
+ return [int(x.strip()) for x in data.split(",")]
+ elif isinstance(data, int):
+ return [data]
+
+ return data
+
@property
def py_type(self) -> Type:
py_type: Type = StrictInt
@@ -701,7 +739,16 @@ def py_type(self) -> Type:
return optional_if_needed(py_type, self.optional)
def pydantic_template(self, state_representation: StateRepresentationT) -> DynamicModelInformation:
- return dynamic_model_information_from_py_type(self, self.py_type)
+ if state_representation == "test_case_xml":
+ if self.multiple:
+ validators = {
+ "from_string": field_validator(self.name, mode="before")(DataColumnParameterModel.split_str)
+ }
+ else:
+ validators = {}
+ return dynamic_model_information_from_py_type(self, self.py_type, validators=validators)
+ else:
+ return dynamic_model_information_from_py_type(self, self.py_type)
@property
def request_requires_value(self) -> bool:
@@ -1125,13 +1172,6 @@ class ToolParameterBundleModel(BaseModel):
input_models: List[ToolParameterT]
-def parameters_by_name(tool_parameter_bundle: ToolParameterBundle) -> Dict[str, ToolParameterT]:
- as_dict = {}
- for input_model in simple_input_models(tool_parameter_bundle.input_models):
- as_dict[input_model.name] = input_model
- return as_dict
-
-
def to_simple_model(input_parameter: Union[ToolParameterModel, ToolParameterT]) -> ToolParameterT:
if input_parameter.__class__ == ToolParameterModel:
assert isinstance(input_parameter, ToolParameterModel)
@@ -1166,7 +1206,7 @@ def create_job_internal_model(tool: ToolParameterBundle, name: str = "DynamicMod
def create_test_case_model(tool: ToolParameterBundle, name: str = "DynamicModelForTool") -> Type[BaseModel]:
- return create_field_model(tool.input_models, name, "test_case")
+ return create_field_model(tool.input_models, name, "test_case_xml")
def create_workflow_step_model(tool: ToolParameterBundle, name: str = "DynamicModelForTool") -> Type[BaseModel]:
diff --git a/lib/galaxy/tool_util/parameters/scripts/__init__.py b/lib/galaxy/tool_util/parameters/scripts/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/lib/galaxy/tool_util/parameters/scripts/validate_test_cases.py b/lib/galaxy/tool_util/parameters/scripts/validate_test_cases.py
new file mode 100644
index 000000000000..8c920a225cc9
--- /dev/null
+++ b/lib/galaxy/tool_util/parameters/scripts/validate_test_cases.py
@@ -0,0 +1,152 @@
+import argparse
+import json
+import os
+import sys
+from dataclasses import dataclass
+from pathlib import Path
+from typing import (
+ List,
+ Optional,
+)
+
+from galaxy.tool_util.parameters.case import (
+ TestCaseStateValidationResult,
+ validate_test_cases_for_tool_source,
+)
+from galaxy.tool_util.parser import get_tool_source
+
+DESCRIPTION = """
+A small utility to load a Galaxy tool file and run tool test validation code on it.
+
+This is not meant to be an end-user application or part of the typical tool developer tool
+chain. This functionality will be integrated into Planemo with the typical polish that
+entails. This purpose of this utility is for Galaxy developers and people wishing to
+make informed decisions about the tool test schema and best practices to apply the tool
+state validation code in isolation. This script can also be used to integrate this functionality
+in non-Python tool chains.
+"""
+
+
+def arg_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(description=DESCRIPTION)
+ parser.add_argument("tool_source")
+ parser.add_argument(
+ "-l",
+ "--latest",
+ dest="latest",
+ default=False,
+ action="store_true",
+ help="Validate test against latest tool profile version regardless of explicit tool profile version specified in tool source file.",
+ )
+ parser.add_argument(
+ "-j",
+ "--json",
+ default=False,
+ action="store_true",
+ help="Output validation results as JSON.",
+ )
+ return parser
+
+
+@dataclass
+class ToolTestValidationResults:
+ tool_id: str
+ tool_version: Optional[str]
+ tool_profile: str
+ tool_path: str
+ results: List[TestCaseStateValidationResult]
+ load_error: Optional[Exception]
+
+ def to_dict(self):
+ return {
+ "tool_id": self.tool_id,
+ "tool_version": self.tool_version,
+ "tool_path": str(self.tool_path),
+ "results": [r.to_dict() for r in self.results],
+ "load_error": str(self.load_error) if self.load_error else None,
+ }
+
+
+def report_results(results_for_tool: ToolTestValidationResults) -> None:
+ test_results = results_for_tool.results
+ print(
+ f"Found {len(test_results)} test cases to validate for tool {results_for_tool.tool_id} / {results_for_tool.tool_version} (@ {results_for_tool.tool_path})"
+ )
+ for i, result in enumerate(test_results):
+ if result.validation_error is not None:
+ print(f"Test Case {i+1}: validation failed - {str(result.validation_error)}")
+ elif len(result.warnings) == 0:
+ print(f"Test Case {i+1}: validated")
+ else:
+ print(f"Test case {i+1}: test case validated with warnings:")
+ for test_warning in result.warnings:
+ print(f"- {test_warning}")
+
+
+class NotValidToolException(Exception):
+ pass
+
+
+def validate_tool(tool_path, latest) -> ToolTestValidationResults:
+ try:
+ tool_source = get_tool_source(tool_path)
+ except Exception:
+ # probably not a Galaxy tool... just skip
+ raise NotValidToolException()
+ tool_id = tool_source.parse_id()
+ if tool_id is None:
+ raise NotValidToolException()
+ load_error: Optional[Exception] = None
+ try:
+ results = validate_test_cases_for_tool_source(tool_source, use_latest_profile=latest)
+ except Exception as e:
+ load_error = e
+ results = []
+ return ToolTestValidationResults(
+ tool_id,
+ tool_source.parse_version(),
+ tool_source.parse_profile(),
+ tool_path,
+ results,
+ load_error,
+ )
+
+
+def validate_test_cases(args):
+ path = args.tool_source
+ latest = args.latest
+ if not os.path.isdir(path):
+ results = validate_tool(path, latest)
+ if args.json:
+ print(json.dumps(results.to_dict()))
+ else:
+ report_results(results)
+ else:
+ xml_files = Path(path).glob("**/*.xml")
+ all_results = []
+ for xml_file in xml_files:
+ if "test-data" in str(xml_file):
+ continue
+ try:
+ results = validate_tool(xml_file, latest)
+ all_results.append(results)
+ except NotValidToolException:
+ continue
+
+ if args.json:
+ print(json.dumps([r.to_dict() for r in all_results], indent=4))
+ else:
+ for results in all_results:
+ report_results(results)
+
+
+def main(argv=None) -> None:
+ if argv is None:
+ argv = sys.argv[1:]
+
+ args = arg_parser().parse_args(argv)
+ validate_test_cases(args)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/lib/galaxy/tool_util/parameters/state.py b/lib/galaxy/tool_util/parameters/state.py
index 3c5389c9c230..c21163d1e786 100644
--- a/lib/galaxy/tool_util/parameters/state.py
+++ b/lib/galaxy/tool_util/parameters/state.py
@@ -17,6 +17,7 @@
create_job_internal_model,
create_request_internal_model,
create_request_model,
+ create_test_case_model,
create_workflow_step_linked_model,
create_workflow_step_model,
StateRepresentationT,
@@ -91,12 +92,12 @@ def _parameter_model_for(cls, input_models: ToolParameterBundle) -> Type[BaseMod
class TestCaseToolState(ToolState):
- state_representation: Literal["test_case"] = "test_case"
+ state_representation: Literal["test_case_xml"] = "test_case_xml"
@classmethod
def _parameter_model_for(cls, input_models: ToolParameterBundle) -> Type[BaseModel]:
# implement a test case model...
- return create_request_internal_model(input_models)
+ return create_test_case_model(input_models)
class WorkflowStepToolState(ToolState):
diff --git a/lib/galaxy/tool_util/unittest_utils/__init__.py b/lib/galaxy/tool_util/unittest_utils/__init__.py
index d179aff465bc..05849a7bedd8 100644
--- a/lib/galaxy/tool_util/unittest_utils/__init__.py
+++ b/lib/galaxy/tool_util/unittest_utils/__init__.py
@@ -31,5 +31,9 @@ def get_content(filename: Optional[str]) -> bytes:
return get_content
+def functional_test_tool_directory() -> str:
+ return os.path.join(galaxy_directory(), "test/functional/tools")
+
+
def functional_test_tool_path(test_path: str) -> str:
- return os.path.join(galaxy_directory(), "test/functional/tools", test_path)
+ return os.path.join(functional_test_tool_directory(), test_path)
diff --git a/lib/galaxy/tool_util/verify/parse.py b/lib/galaxy/tool_util/verify/parse.py
index aa32b5328ffe..ffbf71d38e1a 100644
--- a/lib/galaxy/tool_util/verify/parse.py
+++ b/lib/galaxy/tool_util/verify/parse.py
@@ -5,11 +5,17 @@
Iterable,
List,
Optional,
+ Tuple,
Union,
)
from packaging.version import Version
+from galaxy.tool_util.parameters import (
+ input_models_for_tool_source,
+ test_case_state as case_state,
+ ParameterDefinitionError,
+)
from galaxy.tool_util.parser.interface import (
InputSource,
TestCollectionDef,
@@ -52,10 +58,38 @@ def parse_tool_test_descriptions(
"""
Build ToolTestDescription objects for each test description.
"""
+ validate_on_load = Version(tool_source.parse_profile()) >= Version("24.2")
raw_tests_dict: ToolSourceTests = tool_source.parse_tests_to_dict()
tests: List[ToolTestDescription] = []
+
+ profile = tool_source.parse_profile()
for i, raw_test_dict in enumerate(raw_tests_dict.get("tests", [])):
- test = _description_from_tool_source(tool_source, raw_test_dict, i, tool_guid)
+ validation_exception: Optional[Exception] = None
+ if validate_on_load:
+ tool_parameter_bundle = input_models_for_tool_source(tool_source)
+ try:
+ case_state(raw_test_dict, tool_parameter_bundle.input_models, profile, validate=True)
+ except Exception as e:
+            # TODO: restrict types of validation exceptions a bit probably?
+ validation_exception = e
+
+ if validation_exception:
+ tool_id, tool_version = _tool_id_and_version(tool_source, tool_guid)
+ test = ToolTestDescription.from_tool_source_dict(
+ InvalidToolTestDict(
+ {
+ "tool_id": tool_id,
+ "tool_version": tool_version,
+ "test_index": i,
+ "inputs": {},
+ "error": True,
+ "exception": unicodify(validation_exception),
+ "maxseconds": None,
+ }
+ )
+ )
+ else:
+ test = _description_from_tool_source(tool_source, raw_test_dict, i, tool_guid)
tests.append(test)
return tests
@@ -74,10 +108,7 @@ def _description_from_tool_source(
if maxseconds is not None:
maxseconds = int(maxseconds)
- tool_id = tool_guid or tool_source.parse_id()
- assert tool_id
- tool_version = parse_tool_version_with_defaults(tool_id, tool_source)
-
+ tool_id, tool_version = _tool_id_and_version(tool_source, tool_guid)
processed_test_dict: Union[ValidToolTestDict, InvalidToolTestDict]
try:
processed_inputs = _process_raw_inputs(
@@ -127,6 +158,13 @@ def _description_from_tool_source(
return ToolTestDescription.from_tool_source_dict(processed_test_dict)
+def _tool_id_and_version(tool_source: ToolSource, tool_guid: Optional[str]) -> Tuple[str, str]:
+ tool_id = tool_guid or tool_source.parse_id()
+ assert tool_id
+ tool_version = parse_tool_version_with_defaults(tool_id, tool_source)
+ return tool_id, tool_version
+
+
def _process_raw_inputs(
tool_source: ToolSource,
input_sources: List[InputSource],
diff --git a/packages/tool_util/setup.cfg b/packages/tool_util/setup.cfg
index 7c8fd75feec1..5b4f5619cfab 100644
--- a/packages/tool_util/setup.cfg
+++ b/packages/tool_util/setup.cfg
@@ -49,6 +49,7 @@ python_requires = >=3.7
[options.entry_points]
console_scripts =
galaxy-tool-test = galaxy.tool_util.verify.script:main
+ galaxy-tool-test-case-validation = galaxy.tool_util.parameters.scripts.validate_test_cases:main
mulled-build = galaxy.tool_util.deps.mulled.mulled_build:main
mulled-build-channel = galaxy.tool_util.deps.mulled.mulled_build_channel:main
mulled-build-files = galaxy.tool_util.deps.mulled.mulled_build_files:main
diff --git a/test/functional/tools/section_24_2.xml b/test/functional/tools/section_24_2.xml
new file mode 100644
index 000000000000..ec62f0fd8ee9
--- /dev/null
+++ b/test/functional/tools/section_24_2.xml
@@ -0,0 +1,43 @@
+
+ '$out_file1' &&
+echo $float.floattest >> '$out_file1'
+ ]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/unit/tool_util/parameter_specification.yml b/test/unit/tool_util/parameter_specification.yml
index 6bc266597af4..066588295138 100644
--- a/test/unit/tool_util/parameter_specification.yml
+++ b/test/unit/tool_util/parameter_specification.yml
@@ -27,10 +27,10 @@ gx_int:
- parameter: "None"
- parameter: { 5 }
- parameter: {__class__: 'ConnectedValue'}
- test_case_valid:
+ test_case_xml_valid:
- parameter: 5
- {}
- test_case_invalid:
+ test_case_xml_invalid:
- parameter: null
- parameter: "5"
workflow_step_valid:
@@ -204,11 +204,11 @@ gx_select:
- parameter: "ex2"
request_internal_invalid:
- parameter: {}
- test_case_valid:
+ test_case_xml_valid:
- parameter: 'ex2'
- parameter: '--ex1'
- {}
- test_case_invalid:
+ test_case_xml_invalid:
- parameter: {}
- parameter: null
workflow_step_valid:
@@ -412,11 +412,11 @@ gx_float:
- parameter: "5"
- parameter: "5.0"
- parameter: { "moo": "cow" }
- test_case_valid:
+ test_case_xml_valid:
- parameter: 5
- parameter: 5.0
- {}
- test_case_invalid:
+ test_case_xml_invalid:
- parameter: null
- parameter: "5.0"
- parameter: "5.1"
@@ -449,12 +449,12 @@ gx_float_optional:
- parameter: "5.0"
- parameter: {}
- parameter: { "moo": "cow" }
- test_case_valid:
+ test_case_xml_valid:
- parameter: 5
- parameter: 5.0
- {}
- parameter: null
- test_case_invalid:
+ test_case_xml_invalid:
- parameter: "5.0"
- parameter: "5.1"
workflow_step_valid:
diff --git a/test/unit/tool_util/test_parameter_specification.py b/test/unit/tool_util/test_parameter_specification.py
index d3f43955631e..6a54d78cfabf 100644
--- a/test/unit/tool_util/test_parameter_specification.py
+++ b/test/unit/tool_util/test_parameter_specification.py
@@ -93,8 +93,8 @@ def _test_file(file: str, specification=None, parameter_bundle: Optional[ToolPar
"request_internal_invalid": _assert_internal_requests_invalid,
"job_internal_valid": _assert_internal_jobs_validate,
"job_internal_invalid": _assert_internal_jobs_invalid,
- "test_case_valid": _assert_test_cases_validate,
- "test_case_invalid": _assert_test_cases_invalid,
+ "test_case_xml_valid": _assert_test_cases_validate,
+ "test_case_xml_invalid": _assert_test_cases_invalid,
"workflow_step_valid": _assert_workflow_steps_validate,
"workflow_step_invalid": _assert_workflow_steps_invalid,
"workflow_step_linked_valid": _assert_workflow_steps_linked_validate,
diff --git a/test/unit/tool_util/test_parameter_test_cases.py b/test/unit/tool_util/test_parameter_test_cases.py
new file mode 100644
index 000000000000..92884c8891b7
--- /dev/null
+++ b/test/unit/tool_util/test_parameter_test_cases.py
@@ -0,0 +1,127 @@
+import os
+import re
+from typing import List
+
+from galaxy.tool_util.models import parse_tool
+from galaxy.tool_util.parameters.case import (
+ test_case_state as case_state,
+ validate_test_cases_for_tool_source,
+ TestCaseStateValidationResult,
+)
+from galaxy.tool_util.parser.factory import get_tool_source
+from galaxy.tool_util.parser.interface import ToolSourceTest
+from galaxy.tool_util.unittest_utils import functional_test_tool_directory
+from galaxy.tool_util.verify.parse import parse_tool_test_descriptions
+
+# Legacy tools allow specifying parameters and repeat parameters without
+# qualification. This was problematic and could result in ambiguous specifications.
+TOOLS_THAT_USE_UNQUALIFIED_PARAMETER_ACCESS = [
+ "boolean_conditional.xml",
+ "simple_constructs.xml",
+ "section.xml",
+ "collection_paired_conditional_structured_like.xml",
+ "output_action_change_format.xml",
+ "top_level_data.xml",
+ "disambiguate_cond.xml",
+ "multi_repeats.xml",
+ "implicit_default_conds.xml",
+ "min_repeat.xml",
+]
+
+# Tools that use truevalue/falsevalue in parameter settings; we will likely
+# forbid this in a future tool profile version since it could result in ambiguity.
+TOOLS_THAT_USE_TRUE_FALSE_VALUE_BOOLEAN_SPECIFICATION = [
+ "inputs_as_json_profile.xml",
+ "inputs_as_json_with_paths.xml",
+ "inputs_as_json.xml",
+]
+
+TOOLS_THAT_USE_SELECT_BY_VALUE = [
+ "multi_select.xml",
+]
+
+
+TEST_TOOL_THAT_DO_NOT_VALIDATE = (
+ TOOLS_THAT_USE_UNQUALIFIED_PARAMETER_ACCESS
+ + TOOLS_THAT_USE_TRUE_FALSE_VALUE_BOOLEAN_SPECIFICATION
+ + TOOLS_THAT_USE_SELECT_BY_VALUE
+ + [
+ # will never handle upload_dataset
+ "upload.xml",
+ ]
+)
+
+
+def test_parameter_test_cases_validate():
+    # Validate the three test cases of the column_param tool under the tool's
+    # own (older) profile: the first two are clean, the third produces exactly
+    # one warning (presumably its legacy column specification — confirm against
+    # the tool XML).
+    validation_result = validate_test_cases_for("column_param")
+    assert len(validation_result[0].warnings) == 0
+    assert len(validation_result[1].warnings) == 0
+    assert len(validation_result[2].warnings) == 1
+
+    # Re-validating against the latest profile promotes that same warning-level
+    # issue on the third test case to a hard validation error.
+    validation_result = validate_test_cases_for("column_param", use_latest_profile=True)
+    assert validation_result[2].validation_error
+
+
+def test_legacy_features_fail_validation_with_24_2(tmp_path):
+    # For each tool relying on a legacy test idiom (unqualified parameter
+    # access or truevalue/falsevalue booleans), check that its test cases
+    # still parse under the tool's declared profile but fail once the profile
+    # is rewritten to 24.2 (see _assert_tool_test_parsing_only_fails_with_newer_profile).
+    for filename in TOOLS_THAT_USE_UNQUALIFIED_PARAMETER_ACCESS + TOOLS_THAT_USE_TRUE_FALSE_VALUE_BOOLEAN_SPECIFICATION:
+        _assert_tool_test_parsing_only_fails_with_newer_profile(tmp_path, filename)
+
+    # column parameters need to be indexes
+    _assert_tool_test_parsing_only_fails_with_newer_profile(tmp_path, "column_param.xml")
+
+    # selection by value only
+    _assert_tool_test_parsing_only_fails_with_newer_profile(tmp_path, "multi_select.xml")
+
+
+def _assert_tool_test_parsing_only_fails_with_newer_profile(tmp_path, filename: str):
+ test_tool_directory = functional_test_tool_directory()
+ original_path = os.path.join(test_tool_directory, filename)
+ new_path = tmp_path / filename
+ with open(original_path) as rf:
+ tool_contents = rf.read()
+ tool_contents = re.sub(r'profile="[\d\.]*"', r"", tool_contents)
+    new_profile_contents = tool_contents.replace("<tool ", '<tool profile="24.2" ')
+    # NOTE(review): the lines between here and the helper below (writing
+    # new_profile_contents to new_path and asserting that the original tool
+    # parses while the 24.2 version reports a parse error) were lost in
+    # extraction — restore them from the upstream changeset before applying.
+
+
+def validate_test_cases_for(tool_name: str, **kwd) -> List[TestCaseStateValidationResult]:
+ test_tool_directory = functional_test_tool_directory()
+ tool_path = os.path.join(test_tool_directory, f"{tool_name}.xml")
+ tool_source = get_tool_source(tool_path)
+ return validate_test_cases_for_tool_source(tool_source, **kwd)
diff --git a/test/unit/tool_util/test_test_definition_parsing.py b/test/unit/tool_util/test_test_definition_parsing.py
index 032bd69d4c76..f7c2c3bd28b9 100644
--- a/test/unit/tool_util/test_test_definition_parsing.py
+++ b/test/unit/tool_util/test_test_definition_parsing.py
@@ -101,7 +101,7 @@ def test_unqualified_access_disabled_in_24_2(self):
self._init_tool_for_path(functional_test_tool_path("deprecated/simple_constructs_24_2.xml"))
test_dicts = self._parse_tests()
test_0 = test_dicts[0].to_dict()
- assert "p1|p1use" not in test_0["inputs"]
+ assert test_0["error"] is True
def test_bigwigtowig_converter(self):
# defines