From 6b7ec051bc2890a122bae274d897ae2ec622558d Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 10 Sep 2024 12:29:20 -0400
Subject: [PATCH] Allow workflows to consume deferred {src: url} dicts.

---
 lib/galaxy/managers/hdas.py           | 15 +++++++++++++++
 lib/galaxy/managers/jobs.py           | 12 +++++-------
 lib/galaxy/workflow/run_request.py    | 21 +++++++++++++++++----
 lib/galaxy_test/api/test_workflows.py | 13 +++++++++++--
 lib/galaxy_test/base/populators.py    | 19 ++++++++++++++++---
 5 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py
index 9f8a6a34ec2f..d6fe40a26f50 100644
--- a/lib/galaxy/managers/hdas.py
+++ b/lib/galaxy/managers/hdas.py
@@ -44,6 +44,7 @@
     taggable,
     users,
 )
+from galaxy.managers.context import ProvidesHistoryContext
 from galaxy.model import (
     Job,
     JobStateHistory,
@@ -51,6 +52,7 @@
 )
 from galaxy.model.base import transaction
 from galaxy.model.deferred import materializer_factory
+from galaxy.model.dereference import dereference_to_model
 from galaxy.schema.schema import DatasetSourceType
 from galaxy.schema.storage_cleaner import (
     CleanableItemsSummary,
@@ -68,6 +70,7 @@
     MinimalManagerApp,
     StructuredApp,
 )
+from galaxy.tool_util.parameters import DataRequestUri
 from galaxy.util.compression_utils import get_fileobj
 
 log = logging.getLogger(__name__)
@@ -343,6 +346,18 @@ def _set_permissions(self, trans, hda, role_ids_dict):
             raise exceptions.RequestParameterInvalidException(error)
 
 
+def dereference_input(
+    trans: ProvidesHistoryContext, data_request: DataRequestUri, history: Optional[model.History] = None
+) -> model.HistoryDatasetAssociation:
+    target_history = history or trans.history
+    hda = dereference_to_model(trans.sa_session, trans.user, target_history, data_request)
+    permissions = trans.app.security_agent.history_get_default_permissions(target_history)
+    trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False)
+    with transaction(trans.sa_session):
+        trans.sa_session.commit()
+    return hda
+
+
 class HDAStorageCleanerManager(base.StorageCleanerManager):
     def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetManager):
         self.hda_manager = hda_manager
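
For orientation, a rough sketch of how the new dereference_input helper is meant to be used
(the caller name and the example dict below are illustrative, not part of the patch): a
{src: "url"} dict is validated into a DataRequestUri, turned into a deferred HDA in the
target history, given the history's default permissions, and committed.

    from galaxy.managers.hdas import dereference_input
    from galaxy.tool_util.parameters import DataRequestUri

    def attach_url_input(trans, history, url_input_dict):
        # url_input_dict is something like {"src": "url", "url": "https://example.org/1.txt", "ext": "txt"}
        data_request = DataRequestUri.model_validate(url_input_dict)
        # Creates a deferred HDA, applies default history permissions, and commits the session.
        return dereference_input(trans, data_request, history)
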
diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py
index 78213d4f3526..706caef3ef70 100644
--- a/lib/galaxy/managers/jobs.py
+++ b/lib/galaxy/managers/jobs.py
@@ -50,7 +50,10 @@
     ProvidesUserContext,
 )
 from galaxy.managers.datasets import DatasetManager
-from galaxy.managers.hdas import HDAManager
+from galaxy.managers.hdas import (
+    dereference_input,
+    HDAManager,
+)
 from galaxy.managers.histories import HistoryManager
 from galaxy.managers.lddas import LDDAManager
 from galaxy.managers.users import UserManager
@@ -67,7 +70,6 @@
     YIELD_PER_ROWS,
 )
 from galaxy.model.base import transaction
-from galaxy.model.dereference import dereference_to_model
 from galaxy.model.index_filter_util import (
     raw_text_column_filter,
     text_column_filter,
@@ -1219,12 +1221,8 @@ def dereference(
 
     def dereference_callback(data_request: DataRequestUri) -> DataRequestInternalHda:
         # a deferred dataset corresponding to request
-        hda = dereference_to_model(trans.sa_session, trans.user, trans.history, data_request)
+        hda = dereference_input(trans, data_request)
         new_hdas.append(DereferencedDatasetPair(hda, data_request))
-        permissions = trans.app.security_agent.history_get_default_permissions(trans.history)
-        trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False)
-        with transaction(trans.sa_session):
-            trans.sa_session.commit()
         return DataRequestInternalHda(id=hda.id)
 
     tool_state = RequestInternalToolState(tool_request.request)
diff --git a/lib/galaxy/workflow/run_request.py b/lib/galaxy/workflow/run_request.py
index 516c85d07eb1..10b5a38e4745 100644
--- a/lib/galaxy/workflow/run_request.py
+++ b/lib/galaxy/workflow/run_request.py
@@ -10,6 +10,7 @@
 )
 
 from galaxy import exceptions
+from galaxy.managers.hdas import dereference_input
 from galaxy.model import (
     EffectiveOutput,
     History,
@@ -25,6 +26,7 @@
     ensure_object_added_to_session,
     transaction,
 )
+from galaxy.tool_util.parameters import DataRequestUri
 from galaxy.tools.parameters.meta import expand_workflow_inputs
 from galaxy.workflow.resources import get_resource_mapper_function
 
@@ -368,16 +370,21 @@
                 raise exceptions.RequestParameterInvalidException(
                     f"Not input source type defined for input '{input_dict}'."
                 )
-            if "id" not in input_dict:
-                raise exceptions.RequestParameterInvalidException(f"Not input id defined for input '{input_dict}'.")
+            input_source = input_dict["src"]
+            if "id" not in input_dict and input_source != "url":
+                raise exceptions.RequestParameterInvalidException(f"No input id defined for input '{input_dict}'.")
+            elif input_source == "url" and not input_dict.get("url"):
+                raise exceptions.RequestParameterInvalidException(
+                    f"Supplied 'url' is empty or absent for input '{input_dict}'."
+                )
             if "content" in input_dict:
                 raise exceptions.RequestParameterInvalidException(
                     f"Input cannot specify explicit 'content' attribute {input_dict}'."
                 )
-            input_source = input_dict["src"]
-            input_id = input_dict["id"]
+            input_id = input_dict.get("id")
             try:
                 if input_source == "ldda":
+                    assert input_id
                     ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, trans.security.decode_id(input_id))
                     assert ldda
                     assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
@@ -385,6 +392,7 @@
                     )
                     content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
                 elif input_source == "ld":
+                    assert input_id
                     library_dataset = trans.sa_session.get(LibraryDataset, trans.security.decode_id(input_id))
                     assert library_dataset
                     ldda = library_dataset.library_dataset_dataset_association
@@ -394,6 +402,7 @@
                     )
                     content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
                 elif input_source == "hda":
+                    assert input_id
                     # Get dataset handle, add to dict and history if necessary
                     content = trans.sa_session.get(HistoryDatasetAssociation, trans.security.decode_id(input_id))
                     assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
@@ -401,6 +410,10 @@
                     )
                 elif input_source == "hdca":
                     content = app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", input_id)
+                elif input_source == "url":
+                    data_request = DataRequestUri.model_validate(input_dict)
+                    hda: HistoryDatasetAssociation = dereference_input(trans, data_request, history)
+                    content = hda
                 else:
                     raise exceptions.RequestParameterInvalidException(
                         f"Unknown workflow input source '{input_source}' specified."
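
With the run_request.py change above, a workflow invocation request can point an input at a
URL instead of an existing dataset id. A sketch of the request body a client might send
(the URLs and history id are placeholders, not values from the patch; the structure mirrors
the updated test populator below):

    import json

    workflow_request = {
        "history": "hist_id=<encoded-history-id>",
        "inputs_by": "name",
        "inputs": json.dumps(
            {
                "WorkflowInput1": {"src": "url", "url": "https://example.org/1.txt", "ext": "txt"},
                "WorkflowInput2": {"src": "url", "url": "https://example.org/2.txt", "ext": "txt"},
            }
        ),
    }
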
diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py
index 0b3b38de88fd..3a586255d21e 100644
--- a/lib/galaxy_test/api/test_workflows.py
+++ b/lib/galaxy_test/api/test_workflows.py
@@ -1496,11 +1496,20 @@ def test_run_workflow_by_name(self):
     def test_run_workflow(self):
         self.__run_cat_workflow(inputs_by="step_id")
 
-    def __run_cat_workflow(self, inputs_by):
+    @skip_without_tool("cat1")
+    def test_run_workflow_by_url(self):
+        with self.dataset_populator.test_history() as history_id:
+            self.__run_cat_workflow(inputs_by="url", history_id=history_id)
+            input_dataset_details = self.dataset_populator.get_history_dataset_details(history_id, hid=1)
+            assert input_dataset_details["state"] == "deferred"
+
+    def __run_cat_workflow(self, inputs_by, history_id: Optional[str] = None):
         workflow = self.workflow_populator.load_workflow(name="test_for_run")
         workflow["steps"]["0"]["uuid"] = str(uuid4())
         workflow["steps"]["1"]["uuid"] = str(uuid4())
-        workflow_request, _, workflow_id = self._setup_workflow_run(workflow, inputs_by=inputs_by)
+        workflow_request, _, workflow_id = self._setup_workflow_run(
+            workflow, inputs_by=inputs_by, history_id=history_id
+        )
         invocation_id = self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request).json()[
             "id"
         ]
diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py
index 8ccacd422a30..25f9354f1c30 100644
--- a/lib/galaxy_test/base/populators.py
+++ b/lib/galaxy_test/base/populators.py
@@ -2182,12 +2182,16 @@ def setup_workflow_run(
         workflow_id = self.create_workflow(workflow)
         if not history_id:
             history_id = self.dataset_populator.new_history()
-        hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True)
-        hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6", wait=True)
+        hda1: Optional[Dict[str, Any]] = None
+        hda2: Optional[Dict[str, Any]] = None
+        label_map: Optional[Dict[str, Any]] = None
+        if inputs_by != "url":
+            hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True)
+            hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6", wait=True)
+            label_map = {"WorkflowInput1": ds_entry(hda1), "WorkflowInput2": ds_entry(hda2)}
         workflow_request = dict(
             history=f"hist_id={history_id}",
         )
-        label_map = {"WorkflowInput1": ds_entry(hda1), "WorkflowInput2": ds_entry(hda2)}
         if inputs_by == "step_id":
             ds_map = self.build_ds_map(workflow_id, label_map)
             workflow_request["ds_map"] = ds_map
@@ -2207,6 +2211,15 @@ def setup_workflow_run(
             workflow_request["inputs"] = json.dumps(uuid_map)
         if inputs_by == "step_uuid":
             workflow_request["inputs_by"] = "step_uuid"
+        elif inputs_by == "url":
+            input_b64_1 = base64.b64encode("1 2 3".encode("utf-8")).decode("utf-8")
+            input_b64_2 = base64.b64encode("4 5 6".encode("utf-8")).decode("utf-8")
+            inputs = {
+                "WorkflowInput1": {"src": "url", "url": f"base64://{input_b64_1}", "ext": "txt"},
+                "WorkflowInput2": {"src": "url", "url": f"base64://{input_b64_2}", "ext": "txt"},
+            }
+            workflow_request["inputs"] = json.dumps(inputs)
+            workflow_request["inputs_by"] = "name"
         return workflow_request, history_id, workflow_id
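
A note on the test fixtures: rather than uploading datasets, the url-based path encodes the
same "1 2 3" / "4 5 6" content into base64:// URIs, so the inputs can remain deferred until a
job actually needs the bytes. A small illustration of that encoding (standard library only;
the resulting URI value is derived here, not quoted from the patch):

    import base64

    content = "1 2 3"
    url = "base64://" + base64.b64encode(content.encode("utf-8")).decode("utf-8")
    # url == "base64://MSAyIDM="
    # Passed to the workflow as {"src": "url", "url": url, "ext": "txt"}, which the new
    # test then expects to appear as a history dataset in the "deferred" state.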