From 46e83e178d1b3d7a30815d63a1c946b23b6dce23 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Wed, 28 Feb 2024 10:18:43 -0500
Subject: [PATCH 1/3] Implement cross product collection operation tools.

These are available in a list(n) x list(m) -> list(n*m) version (a cross
product that produces two flat lists) and a list(n) x list(m) ->
list(n):list(m) version (a cross product that produces two nested lists).
After two lists have been run through one of these tools, the result is two
new lists that can be passed into another tool to perform all-against-all
operations using Galaxy's normal collection mapping semantics. The choice of
which to use will depend on how you want to continue to process the
all-against-all results after the next step in an analysis. My sense is that
the flat version is "easier" to think about and pick through manually, while
the nested version preserves more structure if additional collection
operation tools will be used to filter or aggregate the results.

Some considerations:

Apply Rules? I do not believe the Apply Rules tool semantics would allow
these operations, but the Apply Rules tool could certainly be used to convert
the result of the flat version to the nested version or vice versa - so no
metadata is really lost per se between the two versions. I think it is still
worth including both versions though - they both have utility (both, for
instance, are baked into CWL's workflow semantics -
https://docs.sevenbridges.com/docs/about-parallelizing-tool-executions#nested-cross-product)
and avoiding requiring complex Apply Rules programs for simple workflows is
probably ideal.

One Tool vs Two? Marius and I agree that a few simpler tools are better for
these kinds of operations. The tool help can be more focused, and avoiding
the conditional and conditional outputs makes the static analysis done, for
instance, by the workflow editor simpler.
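
To make the two output shapes concrete, here is a minimal Python sketch of
the element identifier structure each tool produces (illustrative only -
plain dicts stand in for Galaxy collections, and join_identifier mirrors the
flat tool's parameter):

    # Not Galaxy code - just the identifier structure for list(2) x list(2).
    from itertools import product

    list_a = ["A", "B"]    # element identifiers of input_a
    list_b = ["C", "D"]    # element identifiers of input_b
    join_identifier = "_"

    # Flat version: two parallel list(4) collections; matching positions pair up.
    flat_a = {f"{a}{join_identifier}{b}": a for a, b in product(list_a, list_b)}
    flat_b = {f"{a}{join_identifier}{b}": b for a, b in product(list_a, list_b)}
    assert list(flat_a) == ["A_C", "A_D", "B_C", "B_D"]

    # Nested version: two list:list collections; the outer identifier comes
    # from input_a, the inner identifier from input_b.
    nested_a = {a: {b: a for b in list_b} for a in list_a}
    nested_b = {a: {b: b for b in list_b} for a in list_a}
    assert nested_a == {"A": {"C": "A", "D": "A"}, "B": {"C": "B", "D": "B"}}

Pairing output_a with output_b element-wise in either shape enumerates all
n*m combinations; the flat shape is what downstream tools map over directly,
while the nested shape keeps the input_a grouping around for later filtering
or aggregation.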
---
 lib/galaxy/config/sample/tool_conf.xml.sample |  2 +
 lib/galaxy/tools/__init__.py                  | 89 ++++++++++++++++++
 lib/galaxy/tools/cross_product_flat.xml       | 89 ++++++++++++++++++
 lib/galaxy/tools/cross_product_nested.xml     | 93 +++++++++++++++++++
 lib/galaxy/tools/flatten_collection.xml       | 17 ++--
 lib/galaxy/tools/model_operation_macros.xml   | 40 ++++++++
 run_tests.sh                                  | 13 +++
 test/functional/tools/sample_tool_conf.xml    |  2 +
 8 files changed, 334 insertions(+), 11 deletions(-)
 create mode 100644 lib/galaxy/tools/cross_product_flat.xml
 create mode 100644 lib/galaxy/tools/cross_product_nested.xml
 create mode 100644 lib/galaxy/tools/model_operation_macros.xml

diff --git a/lib/galaxy/config/sample/tool_conf.xml.sample b/lib/galaxy/config/sample/tool_conf.xml.sample
index 218898880631..153b5ae314d4 100644
--- a/lib/galaxy/config/sample/tool_conf.xml.sample
+++ b/lib/galaxy/config/sample/tool_conf.xml.sample
@@ -38,6 +38,8 @@
+
+
diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py
index 917464e5b45d..8186b9611404 100644
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -13,6 +13,7 @@
 from collections.abc import MutableMapping
 from pathlib import Path
 from typing import (
+    Any,
     cast,
     Dict,
     List,
@@ -3305,6 +3306,94 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
         )
 
 
+class CrossProductFlatCollectionTool(DatabaseOperationTool):
+    tool_type = "cross_product_flat"
+    require_terminal_states = False
+    require_dataset_ok = False
+
+    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
+        input_a = incoming["input_a"]
+        input_b = incoming["input_b"]
+        join_identifier = incoming["join_identifier"]
+
+        output_a = {}
+        output_b = {}
+        all_copied_hdas = []
+
+        for input_a_dce in input_a.collection.elements:
+            element_identifier_a = input_a_dce.element_identifier
+            for input_b_dce in input_b.collection.elements:
+                element_identifier_b = input_b_dce.element_identifier
+                identifier = f"{element_identifier_a}{join_identifier}{element_identifier_b}"
+
+                hda_a_copy = input_a_dce.element_object.copy(copy_tags=input_a_dce.element_object.tags, flush=False)
+                hda_b_copy = input_b_dce.element_object.copy(copy_tags=input_b_dce.element_object.tags, flush=False)
+                all_copied_hdas.append(hda_a_copy)
+                all_copied_hdas.append(hda_b_copy)
+                output_a[identifier] = hda_a_copy
+                output_b[identifier] = hda_b_copy
+
+        self._add_datasets_to_history(history, all_copied_hdas)
+        output_collections.create_collection(
+            self.outputs["output_a"], "output_a", elements=output_a, propagate_hda_tags=False
+        )
+        output_collections.create_collection(
+            self.outputs["output_b"], "output_b", elements=output_b, propagate_hda_tags=False
+        )
+
+
+class CrossProductNestedCollectionTool(DatabaseOperationTool):
+    tool_type = "cross_product_nested"
+    require_terminal_states = False
+    require_dataset_ok = False
+
+    def produce_outputs(self, trans, out_data, output_collections, incoming, history, **kwds):
+        input_a = incoming["input_a"]
+        input_b = incoming["input_b"]
+
+        output_a = {}
+        output_b = {}
+        all_copied_hdas = []
+
+        for input_a_dce in input_a.collection.elements:
+            element_identifier_a = input_a_dce.element_identifier
+
+            iter_elements_a = {}
+            iter_elements_b = {}
+
+            for input_b_dce in input_b.collection.elements:
+                element_identifier_b = input_b_dce.element_identifier
+
+                hda_a_copy = input_a_dce.element_object.copy(copy_tags=input_a_dce.element_object.tags, flush=False)
+                hda_b_copy = input_b_dce.element_object.copy(copy_tags=input_b_dce.element_object.tags, flush=False)
+                all_copied_hdas.append(hda_a_copy)
+                all_copied_hdas.append(hda_b_copy)
+                iter_elements_a[element_identifier_b] = hda_a_copy
+                iter_elements_b[element_identifier_b] = hda_b_copy
+
+            sub_collection_a: Dict[str, Any] = {}
+            sub_collection_a["src"] = "new_collection"
+            sub_collection_a["collection_type"] = "list"
+            sub_collection_a["elements"] = iter_elements_a
+
+            output_a[element_identifier_a] = sub_collection_a
+
+            sub_collection_b: Dict[str, Any] = {}
+            sub_collection_b["src"] = "new_collection"
+            sub_collection_b["collection_type"] = "list"
+            sub_collection_b["elements"] = iter_elements_b
+
+            output_b[element_identifier_a] = sub_collection_b
+
+        self._add_datasets_to_history(history, all_copied_hdas)
+        output_collections.create_collection(
+            self.outputs["output_a"], "output_a", elements=output_a, propagate_hda_tags=False
+        )
+        output_collections.create_collection(
+            self.outputs["output_b"], "output_b", elements=output_b, propagate_hda_tags=False
+        )
+
+
 class BuildListCollectionTool(DatabaseOperationTool):
     tool_type = "build_list"
     require_terminal_states = False
diff --git a/lib/galaxy/tools/cross_product_flat.xml b/lib/galaxy/tools/cross_product_flat.xml
new file mode 100644
index 000000000000..891772c78a6c
--- /dev/null
+++ b/lib/galaxy/tools/cross_product_flat.xml
@@ -0,0 +1,89 @@
+    model_operation_macros.xml
+    operation_3436
diff --git a/lib/galaxy/tools/cross_product_nested.xml b/lib/galaxy/tools/cross_product_nested.xml
new file mode 100644
index 000000000000..b4ba4d596de5
--- /dev/null
+++ b/lib/galaxy/tools/cross_product_nested.xml
@@ -0,0 +1,93 @@
+    model_operation_macros.xml
diff --git a/lib/galaxy/tools/flatten_collection.xml b/lib/galaxy/tools/flatten_collection.xml
index 999d9b887bbd..67a96252dfeb 100644
--- a/lib/galaxy/tools/flatten_collection.xml
+++ b/lib/galaxy/tools/flatten_collection.xml
@@ -9,13 +9,12 @@
     operation_2409
+
+    model_operation_macros.xml
+
-
-
-
-
-
+
@@ -35,14 +34,10 @@
-
-
-
+
-
-
-
+
diff --git a/lib/galaxy/tools/model_operation_macros.xml b/lib/galaxy/tools/model_operation_macros.xml
new file mode 100644
index 000000000000..a3f16c5398e3
--- /dev/null
+++ b/lib/galaxy/tools/model_operation_macros.xml
@@ -0,0 +1,40 @@
+    This tool will create new history datasets copied from your input collections but your quota usage will not increase.
+    operation_3436
diff --git a/run_tests.sh b/run_tests.sh
index 7fbd26bc813c..2e04eab5eee9 100755
--- a/run_tests.sh
+++ b/run_tests.sh
@@ -82,6 +82,19 @@ Run a selenium test against a running server while watching client (fastest iter
 . .venv/bin/activate # source the virtualenv so can skip run_tests.sh.
 pytest lib/galaxy_test/selenium/test_workflow_editor.py::TestWorkflowEditor::test_data_input
 
+To run the tool tests for a specific framework test tool
+listed in test/functional/tools/sample_tool_conf.xml:
+
+    ./run_tests.sh -framework -id <tool_id>
+
+If you'd like to skip this script and run the tests with pytest
+directly, a command like the following can be used. Note that
+the framework tools run with conda installation on, but 99%
+of the tools do not require this, so this example includes
+disabling that.
+
+    GALAXY_TEST_TOOL_CONF="test/functional/tools/sample_tool_conf.xml" GALAXY_CONFIG_OVERRIDE_CONDA_AUTO_INIT=false pytest test/functional/test_toolbox_pytest.py -k <tool_id> -m tool
+
 Note About Selenium Tests:
 
 If using a local selenium driver such as a Chrome or Firefox based one
diff --git a/test/functional/tools/sample_tool_conf.xml b/test/functional/tools/sample_tool_conf.xml
index d63a21c73c4f..10a776989593 100644
--- a/test/functional/tools/sample_tool_conf.xml
+++ b/test/functional/tools/sample_tool_conf.xml
@@ -302,6 +302,8 @@
+
+

From bbfc6041b393e58c4d9f69d70bbeb830b0f683e2 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Mon, 25 Mar 2024 14:58:15 -0400
Subject: [PATCH 2/3] Add API test that fails because outputs aren't skipped

---
 lib/galaxy_test/api/test_workflows.py     | 91 +++++++++++++++++++++--
 lib/galaxy_test/base/workflow_fixtures.py | 21 ++++++
 2 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py
index 36549c820015..f0af906260e7 100644
--- a/lib/galaxy_test/api/test_workflows.py
+++ b/lib/galaxy_test/api/test_workflows.py
@@ -38,6 +38,7 @@
 )
 from galaxy_test.base.workflow_fixtures import (
     NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE,
+    WORKFLOW_FLAT_CROSS_PRODUCT,
     WORKFLOW_INPUTS_AS_OUTPUTS,
     WORKFLOW_NESTED_REPLACEMENT_PARAMETER,
     WORKFLOW_NESTED_RUNTIME_PARAMETER,
@@ -2079,15 +2080,12 @@ def test_run_workflow_pick_value_bam_pja(self):
                 change_datatype: bam
           tool_state:
             style_cond:
-              __current_case__: 2
               pick_style: first_or_error
               type_cond:
-                __current_case__: 4
                 param_type: data
                 pick_from:
-                - __index__: 0
-                  value:
-                    __class__: RuntimeValue
+                - value:
+                    __class__: RuntimeValue
         outputs:
           pick_out:
             outputSource: pick_value/data_param
@@ -5114,6 +5112,89 @@ def test_run_with_default_file_in_step_inline(self):
         content = self.dataset_populator.get_history_dataset_content(history_id)
         assert "chr1" in content
 
+    def test_conditional_flat_crossproduct_subworkflow(self):
+        parent = yaml.safe_load(
+            """
+class: GalaxyWorkflow
+inputs:
+  collection_a: collection
+  collection_b: collection
+  collection_c: collection
+steps:
+  subworkflow_step:
+    run: null
+    in:
+      collection_a: collection_a
+      collection_b: collection_b
+    when: $(false)
+  pick_value:
+    tool_id: pick_value
+    in:
+      style_cond|type_cond|pick_from_0|value:
+        source: subworkflow_step/output_a
+      style_cond|type_cond|pick_from_1|value:
+        # we need a collection of the same length as a fallback,
+        # which makes this less intuitive than it could be.
+        source: collection_c
+    tool_state:
+      style_cond:
+        pick_style: first
+        type_cond:
+          param_type: data
+          pick_from:
+          - value:
+              __class__: RuntimeValue
+          - value:
+              __class__: RuntimeValue
+outputs:
+  the_output:
+    outputSource: pick_value/data_param
+test_data:
+  collection_a:
+    collection_type: list
+    elements:
+      - identifier: A
+        content: A
+      - identifier: B
+        content: B
+  collection_b:
+    collection_type: list
+    elements:
+      - identifier: C
+        content: C
+      - identifier: D
+        content: D
+  collection_c:
+    collection_type: list
+    elements:
+      - identifier: fallbackA
+        content: fallbackA
+      - identifier: fallbackB
+        content: fallbackB
+      - identifier: fallbackC
+        content: fallbackC
+      - identifier: fallbackD
+        content: fallbackD
+"""
+        )
+        parent["steps"]["subworkflow_step"]["run"] = yaml.safe_load(WORKFLOW_FLAT_CROSS_PRODUCT)
+        with self.dataset_populator.test_history() as history_id:
+            summary = self._run_workflow(
+                parent,
+                history_id=history_id,
+                wait=True,
+                assert_ok=True,
+            )
+            invocation = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
+            hdca_id = invocation["output_collections"]["the_output"]["id"]
+            hdca = self.dataset_populator.get_history_collection_details(
+                history_id=history_id,
+                content_id=hdca_id,
+            )
+            # The following assert is what a user would expect, but the heuristic currently picks the first input element as the identifier source
+            # assert hdca["elements"][0]["element_identifier"] == "fallbackA"
+            assert "fallbackA" in hdca["elements"][0]["object"]["peek"]
+
     def test_run_with_validated_parameter_connection_invalid(self):
         with self.dataset_populator.test_history() as history_id:
             self._run_jobs(
diff --git a/lib/galaxy_test/base/workflow_fixtures.py b/lib/galaxy_test/base/workflow_fixtures.py
index 912adecb32f9..4efb06f8ba92 100644
--- a/lib/galaxy_test/base/workflow_fixtures.py
+++ b/lib/galaxy_test/base/workflow_fixtures.py
@@ -1177,3 +1177,24 @@
     format: txt
     location: https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed
 """
+
+
+WORKFLOW_FLAT_CROSS_PRODUCT = """
+class: GalaxyWorkflow
+inputs:
+  collection_a: collection
+  collection_b: collection
+steps:
+  cross_product:
+    tool_id: __CROSS_PRODUCT_FLAT__
+    in:
+      input_a:
+        collection_a
+      input_b:
+        collection_b
+outputs:
+  output_a:
+    outputSource: cross_product/output_a
+  output_b:
+    outputSource: cross_product/output_b
+"""

From 8807db4ca61509b6bfc67188f4f5ee4fa0d8bb0d Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Mon, 26 Feb 2024 13:40:17 +0000
Subject: [PATCH 3/3] Mark outputs skipped if necessary

---
 lib/galaxy/model/__init__.py                 | 12 ++++++++++++
 lib/galaxy/tools/actions/__init__.py         |  9 +--------
 lib/galaxy/tools/actions/model_operations.py |  7 +++++--
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index 7e6bbc9a40dc..f426fb72e6e6 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -152,6 +152,7 @@
 )
 from galaxy.model.orm.now import now
 from galaxy.model.orm.util import add_object_to_object_session
+from galaxy.objectstore import ObjectStorePopulator
 from galaxy.schema.invocation import (
     InvocationCancellationUserRequest,
     InvocationState,
@@ -4581,6 +4582,17 @@ def get_quota_source_label(self):
 
     quota_source_label = property(get_quota_source_label)
 
+    def set_skipped(self, object_store_populator: ObjectStorePopulator):
+        assert self.dataset
+        object_store_populator.set_object_store_id(self)
+        self.extension = "expression.json"
+        self.state = self.states.OK
+        self.blurb = "skipped"
"skipped" + self.visible = False + with open(self.dataset.get_file_name(), "w") as out: + out.write(json.dumps(None)) + self.set_total_size() + def get_file_name(self, sync_cache=True) -> str: if self.dataset.purged: return "" diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index 8403e0137bbe..2c8f0fb354b9 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -675,14 +675,7 @@ def handle_output(name, output, hidden=None): hdca.visible = False object_store_populator = ObjectStorePopulator(trans.app, trans.user) for data in out_data.values(): - object_store_populator.set_object_store_id(data) - data.extension = "expression.json" - data.state = "ok" - data.blurb = "skipped" - data.visible = False - with open(data.dataset.get_file_name(), "w") as out: - out.write(json.dumps(None)) - data.set_total_size() + data.set_skipped(object_store_populator) job.preferred_object_store_id = preferred_object_store_id self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections) self._record_outputs(job, out_data, output_collections) diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py index a1e7230b9eeb..bedba42d4aa3 100644 --- a/lib/galaxy/tools/actions/model_operations.py +++ b/lib/galaxy/tools/actions/model_operations.py @@ -1,6 +1,7 @@ import logging from typing import TYPE_CHECKING +from galaxy.objectstore import ObjectStorePopulator from galaxy.tools.actions import ( DefaultToolAction, OutputCollections, @@ -137,8 +138,10 @@ def _produce_outputs( if skip: for output_collection in output_collections.out_collections.values(): output_collection.mark_as_populated() + object_store_populator = ObjectStorePopulator(trans.app, trans.user) for hdca in output_collections.out_collection_instances.values(): hdca.visible = False - # Would we also need to replace the datasets with skipped datasets? - + # Would we also need to replace the datasets with skipped datasets? + for data in hdca.dataset_instances: + data.set_skipped(object_store_populator) trans.sa_session.add_all(out_data.values())