From 339bdfd1465cada3a31cce9ca59bd1a46240f156 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 10 Sep 2024 12:29:20 -0400
Subject: [PATCH] Allow workflows to consume deferred {src: url} dicts.

---
 lib/galaxy/config/schemas/config_schema.yml | 13 +++++++
 lib/galaxy/jobs/handler.py                  |  8 ++++-
 lib/galaxy/managers/hdas.py                 | 15 ++++++++
 lib/galaxy/managers/jobs.py                 | 12 +++---
 lib/galaxy/model/__init__.py                | 28 ++++++++++++++-
 lib/galaxy/schema/invocation.py             |  1 +
 lib/galaxy/workflow/run.py                  |  7 +++-
 lib/galaxy/workflow/run_request.py          | 39 +++++++++++++++++----
 lib/galaxy/workflow/scheduling_manager.py   | 29 +++++++++++++--
 lib/galaxy_test/api/test_workflows.py       | 20 +++++++++--
 lib/galaxy_test/base/populators.py          | 27 ++++++++++++--
 11 files changed, 175 insertions(+), 24 deletions(-)

diff --git a/lib/galaxy/config/schemas/config_schema.yml b/lib/galaxy/config/schemas/config_schema.yml
index 5098d923d8ca..5659b80b5997 100644
--- a/lib/galaxy/config/schemas/config_schema.yml
+++ b/lib/galaxy/config/schemas/config_schema.yml
@@ -3738,6 +3738,19 @@ mapping:
       Optional configuration file similar to `job_config_file` to specify
       which Galaxy processes should schedule workflows.

+  workflow_scheduling_separate_materialization_iteration:
+    type: bool
+    default: true
+    required: false
+    desc: |
+      Workflows launched with URI/URL inputs that are not marked as 'deferred'
+      are "materialized" (undeferred) by the workflow scheduler, which can be a
+      lengthy process. Setting this to 'True' places the invocation back in the
+      queue after materialization and before scheduling, so a long materialization
+      is less likely to starve other workflow scheduling. Ideally, Galaxy would
+      allow finer-grained control of handlers, but until then this provides a way
+      to tip the balance between "doing more work" and "being more fair".
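For illustration, a minimal sketch of the kind of {src: url} input dict this option concerns (values are illustrative only; the dict shape mirrors the populator changes at the end of this patch, and the base64:// URLs are just a convenient test fixture):

    # Illustrative only: two {src: url} inputs keyed by workflow input label,
    # supplied as the "inputs" of a workflow invocation request with inputs_by="name".
    inputs = {
        "WorkflowInput1": {"src": "url", "url": "base64://MSAyIDM=", "ext": "txt", "deferred": False},
        "WorkflowInput2": {"src": "url", "url": "base64://NCA1IDY=", "ext": "txt", "deferred": True},
    }
    # deferred=False: the invocation is queued as REQUIRES_MATERIALIZATION and the
    # scheduler materializes the dataset before scheduling proceeds.
    # deferred=True: the dereferenced HDA stays deferred and no materialization pass is needed.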
+ cache_user_job_count: type: bool default: false diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 0213a797aab4..e66065aab2e4 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -130,7 +130,13 @@ def setup_query(self): if self.grab_model is model.Job: grab_condition = self.grab_model.state == self.grab_model.states.NEW elif self.grab_model is model.WorkflowInvocation: - grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING)) + grab_condition = self.grab_model.state.in_( + ( + self.grab_model.states.NEW, + self.grab_model.states.REQUIRES_MATERIALIZATION, + self.grab_model.states.CANCELLING, + ) + ) else: raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented") subq = ( diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py index 9f8a6a34ec2f..d6fe40a26f50 100644 --- a/lib/galaxy/managers/hdas.py +++ b/lib/galaxy/managers/hdas.py @@ -44,6 +44,7 @@ taggable, users, ) +from galaxy.managers.context import ProvidesHistoryContext from galaxy.model import ( Job, JobStateHistory, @@ -51,6 +52,7 @@ ) from galaxy.model.base import transaction from galaxy.model.deferred import materializer_factory +from galaxy.model.dereference import dereference_to_model from galaxy.schema.schema import DatasetSourceType from galaxy.schema.storage_cleaner import ( CleanableItemsSummary, @@ -68,6 +70,7 @@ MinimalManagerApp, StructuredApp, ) +from galaxy.tool_util.parameters import DataRequestUri from galaxy.util.compression_utils import get_fileobj log = logging.getLogger(__name__) @@ -343,6 +346,18 @@ def _set_permissions(self, trans, hda, role_ids_dict): raise exceptions.RequestParameterInvalidException(error) +def dereference_input( + trans: ProvidesHistoryContext, data_request: DataRequestUri, history: Optional[model.History] = None +) -> model.HistoryDatasetAssociation: + target_history = history or trans.history + hda = dereference_to_model(trans.sa_session, trans.user, target_history, data_request) + permissions = trans.app.security_agent.history_get_default_permissions(target_history) + trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False) + with transaction(trans.sa_session): + trans.sa_session.commit() + return hda + + class HDAStorageCleanerManager(base.StorageCleanerManager): def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetManager): self.hda_manager = hda_manager diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 78213d4f3526..706caef3ef70 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -50,7 +50,10 @@ ProvidesUserContext, ) from galaxy.managers.datasets import DatasetManager -from galaxy.managers.hdas import HDAManager +from galaxy.managers.hdas import ( + dereference_input, + HDAManager, +) from galaxy.managers.histories import HistoryManager from galaxy.managers.lddas import LDDAManager from galaxy.managers.users import UserManager @@ -67,7 +70,6 @@ YIELD_PER_ROWS, ) from galaxy.model.base import transaction -from galaxy.model.dereference import dereference_to_model from galaxy.model.index_filter_util import ( raw_text_column_filter, text_column_filter, @@ -1219,12 +1221,8 @@ def dereference( def dereference_callback(data_request: DataRequestUri) -> DataRequestInternalHda: # a deferred dataset corresponding to request - hda = dereference_to_model(trans.sa_session, trans.user, trans.history, data_request) + hda = 
dereference_input(trans, data_request) new_hdas.append(DereferencedDatasetPair(hda, data_request)) - permissions = trans.app.security_agent.history_get_default_permissions(trans.history) - trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False) - with transaction(trans.sa_session): - trans.sa_session.commit() return DataRequestInternalHda(id=hda.id) tool_state = RequestInternalToolState(tool_request.request) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index db2f57f7e433..ffc4763298cb 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -18,6 +18,7 @@ import string from collections import defaultdict from collections.abc import Callable +from dataclasses import dataclass from datetime import ( datetime, timedelta, @@ -8547,6 +8548,12 @@ class StoredWorkflowMenuEntry(Base, RepresentById): ) +@dataclass +class InputWithRequest: + input: Any + request: Dict[str, Any] + + class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializable): __tablename__ = "workflow_invocation" @@ -8778,6 +8785,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None): and_conditions = [ or_( WorkflowInvocation.state == WorkflowInvocation.states.NEW, + WorkflowInvocation.state == WorkflowInvocation.states.REQUIRES_MATERIALIZATION, WorkflowInvocation.state == WorkflowInvocation.states.READY, WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING, ), @@ -8869,6 +8877,14 @@ def input_associations(self): inputs.append(input_dataset_collection_assoc) return inputs + def inputs_requiring_materialization(self): + hdas_to_materialize = [] + for input_dataset_assoc in self.input_datasets: + request = input_dataset_assoc.request + if request and not request.get("deferred", False): + hdas_to_materialize.append(input_dataset_assoc.dataset) + return hdas_to_materialize + def _serialize(self, id_encoder, serialization_options): invocation_attrs = dict_for(self) invocation_attrs["state"] = self.state @@ -9031,20 +9047,28 @@ def attach_step(request_to_content): else: request_to_content.workflow_step = step + request: Optional[Dict[str, Any]] = None + if isinstance(content, InputWithRequest): + request = content.request + content = content.input + history_content_type = getattr(content, "history_content_type", None) if history_content_type == "dataset": request_to_content = WorkflowRequestToInputDatasetAssociation() request_to_content.dataset = content + request_to_content.request = request attach_step(request_to_content) self.input_datasets.append(request_to_content) elif history_content_type == "dataset_collection": request_to_content = WorkflowRequestToInputDatasetCollectionAssociation() request_to_content.dataset_collection = content + request_to_content.request = request attach_step(request_to_content) self.input_dataset_collections.append(request_to_content) else: request_to_content = WorkflowRequestInputStepParameter() request_to_content.parameter_value = content + request_to_content.request = request attach_step(request_to_content) self.input_step_parameters.append(request_to_content) @@ -9468,6 +9492,7 @@ class WorkflowRequestToInputDatasetAssociation(Base, Dictifiable, Serializable): workflow_invocation_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_invocation.id"), index=True) workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id")) dataset_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history_dataset_association.id"), 
index=True) + request: Mapped[Optional[Dict]] = mapped_column(JSONType) workflow_step: Mapped[Optional["WorkflowStep"]] = relationship() dataset: Mapped[Optional["HistoryDatasetAssociation"]] = relationship() @@ -9503,6 +9528,7 @@ class WorkflowRequestToInputDatasetCollectionAssociation(Base, Dictifiable, Seri workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship( back_populates="input_dataset_collections" ) + request: Mapped[Optional[Dict]] = mapped_column(JSONType) history_content_type = "dataset_collection" dict_collection_visible_keys = ["id", "workflow_invocation_id", "workflow_step_id", "dataset_collection_id", "name"] @@ -9526,6 +9552,7 @@ class WorkflowRequestInputStepParameter(Base, Dictifiable, Serializable): workflow_invocation_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_invocation.id"), index=True) workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id")) parameter_value: Mapped[Optional[bytes]] = mapped_column(MutableJSONType) + request: Mapped[Optional[Dict]] = mapped_column(JSONType) workflow_step: Mapped[Optional["WorkflowStep"]] = relationship() workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="input_step_parameters") @@ -9549,7 +9576,6 @@ class WorkflowInvocationOutputDatasetAssociation(Base, Dictifiable, Serializable workflow_step_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_step.id"), index=True) dataset_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history_dataset_association.id"), index=True) workflow_output_id: Mapped[Optional[int]] = mapped_column(ForeignKey("workflow_output.id"), index=True) - workflow_invocation: Mapped[Optional["WorkflowInvocation"]] = relationship(back_populates="output_datasets") workflow_step: Mapped[Optional["WorkflowStep"]] = relationship() dataset: Mapped[Optional["HistoryDatasetAssociation"]] = relationship() diff --git a/lib/galaxy/schema/invocation.py b/lib/galaxy/schema/invocation.py index 4d5ce80548e8..1040aac4d1b4 100644 --- a/lib/galaxy/schema/invocation.py +++ b/lib/galaxy/schema/invocation.py @@ -281,6 +281,7 @@ class InvocationMessageResponseModel(RootModel): class InvocationState(str, Enum): NEW = "new" # Brand new workflow invocation... maybe this should be same as READY + REQUIRES_MATERIALIZATION = "requires_materialization" # an otherwise NEW or READY workflow that requires inputs to be materialized (undeferred) READY = "ready" # Workflow ready for another iteration of scheduling. SCHEDULED = "scheduled" # Workflow has been scheduled. 
CANCELLED = "cancelled" diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 5ebb586fd56f..5619a4547b8f 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -144,7 +144,12 @@ def queue_invoke( ) workflow_invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow) workflow_invocation.workflow = workflow - return trans.app.workflow_scheduling_manager.queue(workflow_invocation, request_params, flush=flush) + initial_state = model.WorkflowInvocation.states.NEW + if workflow_run_config.requires_materialization: + initial_state = model.WorkflowInvocation.states.REQUIRES_MATERIALIZATION + return trans.app.workflow_scheduling_manager.queue( + workflow_invocation, request_params, flush=flush, initial_state=initial_state + ) class WorkflowInvoker: diff --git a/lib/galaxy/workflow/run_request.py b/lib/galaxy/workflow/run_request.py index 516c85d07eb1..24b81fff849d 100644 --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -10,11 +10,13 @@ ) from galaxy import exceptions +from galaxy.managers.hdas import dereference_input from galaxy.model import ( EffectiveOutput, History, HistoryDatasetAssociation, HistoryDatasetCollectionAssociation, + InputWithRequest, LibraryDataset, LibraryDatasetDatasetAssociation, WorkflowInvocation, @@ -25,6 +27,7 @@ ensure_object_added_to_session, transaction, ) +from galaxy.tool_util.parameters import DataRequestUri from galaxy.tools.parameters.meta import expand_workflow_inputs from galaxy.workflow.resources import get_resource_mapper_function @@ -57,6 +60,9 @@ class WorkflowRunConfig: :param inputs: Map from step ids to dict's containing HDA for these steps. :type inputs: dict + :param requires_materialization: True if an input requires materialization before + the workflow is scheduled. + :param inputs_by: How inputs maps to inputs (datasets/collections) to workflows steps - by unencoded database id ('step_id'), index in workflow 'step_index' (independent of database), or by input name for @@ -78,6 +84,7 @@ def __init__( copy_inputs_to_history: bool = False, use_cached_job: bool = False, resource_params: Optional[Dict[int, Any]] = None, + requires_materialization: bool = False, preferred_object_store_id: Optional[str] = None, preferred_outputs_object_store_id: Optional[str] = None, preferred_intermediate_object_store_id: Optional[str] = None, @@ -91,6 +98,7 @@ def __init__( self.resource_params = resource_params or {} self.allow_tool_state_corrections = allow_tool_state_corrections self.use_cached_job = use_cached_job + self.requires_materialization = requires_materialization self.preferred_object_store_id = preferred_object_store_id self.preferred_outputs_object_store_id = preferred_outputs_object_store_id self.preferred_intermediate_object_store_id = preferred_intermediate_object_store_id @@ -310,7 +318,7 @@ def build_workflow_run_configs( legacy = payload.get("legacy", False) already_normalized = payload.get("parameters_normalized", False) raw_parameters = payload.get("parameters", {}) - + requires_materialization: bool = False run_configs = [] unexpanded_param_map = _normalize_step_parameters( workflow.steps, raw_parameters, legacy=legacy, already_normalized=already_normalized @@ -368,16 +376,22 @@ def build_workflow_run_configs( raise exceptions.RequestParameterInvalidException( f"Not input source type defined for input '{input_dict}'." 
) - if "id" not in input_dict: - raise exceptions.RequestParameterInvalidException(f"Not input id defined for input '{input_dict}'.") + input_source = input_dict["src"] + if "id" not in input_dict and input_source != "url": + raise exceptions.RequestParameterInvalidException(f"No input id defined for input '{input_dict}'.") + elif input_source == "url" and not input_dict.get("url"): + raise exceptions.RequestParameterInvalidException( + f"Supplied 'url' is empty or absent for input '{input_dict}'." + ) if "content" in input_dict: raise exceptions.RequestParameterInvalidException( f"Input cannot specify explicit 'content' attribute {input_dict}'." ) - input_source = input_dict["src"] - input_id = input_dict["id"] + input_id = input_dict.get("id") try: + added_to_history = False if input_source == "ldda": + assert input_id ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, trans.security.decode_id(input_id)) assert ldda assert trans.user_is_admin or trans.app.security_agent.can_access_dataset( @@ -385,6 +399,7 @@ def build_workflow_run_configs( ) content = ldda.to_history_dataset_association(history, add_to_history=add_to_history) elif input_source == "ld": + assert input_id library_dataset = trans.sa_session.get(LibraryDataset, trans.security.decode_id(input_id)) assert library_dataset ldda = library_dataset.library_dataset_dataset_association @@ -394,6 +409,7 @@ def build_workflow_run_configs( ) content = ldda.to_history_dataset_association(history, add_to_history=add_to_history) elif input_source == "hda": + assert input_id # Get dataset handle, add to dict and history if necessary content = trans.sa_session.get(HistoryDatasetAssociation, trans.security.decode_id(input_id)) assert trans.user_is_admin or trans.app.security_agent.can_access_dataset( @@ -401,11 +417,21 @@ def build_workflow_run_configs( ) elif input_source == "hdca": content = app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", input_id) + elif input_source == "url": + data_request = DataRequestUri.model_validate(input_dict) + hda: HistoryDatasetAssociation = dereference_input(trans, data_request, history) + added_to_history = True + content = InputWithRequest( + input=hda, + request=data_request.model_dump(mode="json"), + ) + if not data_request.deferred: + requires_materialization = True else: raise exceptions.RequestParameterInvalidException( f"Unknown workflow input source '{input_source}' specified." 
) - if add_to_history and content.history != history: + if not added_to_history and add_to_history and content.history != history: if isinstance(content, HistoryDatasetCollectionAssociation): content = content.copy(element_destination=history, flush=False) else: @@ -474,6 +500,7 @@ def build_workflow_run_configs( allow_tool_state_corrections=allow_tool_state_corrections, use_cached_job=use_cached_job, resource_params=resource_params, + requires_materialization=requires_materialization, preferred_object_store_id=preferred_object_store_id, preferred_outputs_object_store_id=preferred_outputs_object_store_id, preferred_intermediate_object_store_id=preferred_intermediate_object_store_id, diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index 3868e24c13a9..d01c85d5d6bc 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -1,11 +1,17 @@ import os from functools import partial +from typing import Optional import galaxy.workflow.schedulers from galaxy import model from galaxy.exceptions import HandlerAssignmentError from galaxy.jobs.handler import InvocationGrabber from galaxy.model.base import transaction +from galaxy.schema.invocation import InvocationState +from galaxy.schema.tasks import ( + MaterializeDatasetInstanceTaskRequest, + RequestUser, +) from galaxy.util import plugin_config from galaxy.util.custom_logging import get_logger from galaxy.util.monitors import Monitors @@ -154,8 +160,9 @@ def shutdown(self): if exception: raise exception - def queue(self, workflow_invocation, request_params, flush=True): - workflow_invocation.set_state(model.WorkflowInvocation.states.NEW) + def queue(self, workflow_invocation, request_params, flush=True, initial_state: Optional[InvocationState] = None): + initial_state = initial_state or model.WorkflowInvocation.states.NEW + workflow_invocation.set_state(initial_state) workflow_invocation.scheduler = request_params.get("scheduler", None) or self.default_scheduler_id sa_session = self.app.model.context sa_session.add(workflow_invocation) @@ -329,7 +336,23 @@ def __schedule(self, workflow_scheduler_id, workflow_scheduler): def __attempt_schedule(self, invocation_id, workflow_scheduler): with self.app.model.context() as session: workflow_invocation = session.get(model.WorkflowInvocation, invocation_id) - + if workflow_invocation.state == workflow_invocation.states.REQUIRES_MATERIALIZATION: + hdas_to_materialize = workflow_invocation.inputs_requiring_materialization() + for hda in hdas_to_materialize: + user = RequestUser(user_id=workflow_invocation.history.user_id) + task_request = MaterializeDatasetInstanceTaskRequest( + user=user, + history_id=workflow_invocation.history.id, + source="hda", + content=hda.id, + ) + self.app.hda_manager.materialize(task_request, in_place=True) + # place back into ready and let it proceed normally on next iteration? 
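+                # Inputs whose stored request is marked "deferred" are skipped by
+                # inputs_requiring_materialization(), so only undeferred URL inputs are
+                # materialized (synchronously) in the loop above. With
+                # workflow_scheduling_separate_materialization_iteration enabled, the
+                # scheduler returns after the state flips to READY and the invocation is
+                # picked up again on a later scheduling pass.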
+ workflow_invocation.set_state(model.WorkflowInvocation.states.READY) + session.add(workflow_invocation) + session.commit() + if self.app.config.workflow_scheduling_separate_materialization_iteration: + return None try: if workflow_invocation.state == workflow_invocation.states.CANCELLING: workflow_invocation.cancel_invocation_steps() diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 0b3b38de88fd..23919b907945 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -1496,11 +1496,27 @@ def test_run_workflow_by_name(self): def test_run_workflow(self): self.__run_cat_workflow(inputs_by="step_id") - def __run_cat_workflow(self, inputs_by): + @skip_without_tool("cat1") + def test_run_workflow_by_deferred_url(self): + with self.dataset_populator.test_history() as history_id: + self.__run_cat_workflow(inputs_by="deferred_url", history_id=history_id) + input_dataset_details = self.dataset_populator.get_history_dataset_details(history_id, hid=1) + assert input_dataset_details["state"] == "deferred" + + @skip_without_tool("cat1") + def test_run_workflow_by_url(self): + with self.dataset_populator.test_history() as history_id: + self.__run_cat_workflow(inputs_by="url", history_id=history_id) + input_dataset_details = self.dataset_populator.get_history_dataset_details(history_id, hid=1) + assert input_dataset_details["state"] == "ok" + + def __run_cat_workflow(self, inputs_by, history_id: Optional[str] = None): workflow = self.workflow_populator.load_workflow(name="test_for_run") workflow["steps"]["0"]["uuid"] = str(uuid4()) workflow["steps"]["1"]["uuid"] = str(uuid4()) - workflow_request, _, workflow_id = self._setup_workflow_run(workflow, inputs_by=inputs_by) + workflow_request, _, workflow_id = self._setup_workflow_run( + workflow, inputs_by=inputs_by, history_id=history_id + ) invocation_id = self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request).json()[ "id" ] diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 8ccacd422a30..452355dd9571 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -2182,23 +2182,33 @@ def setup_workflow_run( workflow_id = self.create_workflow(workflow) if not history_id: history_id = self.dataset_populator.new_history() - hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True) - hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6", wait=True) + hda1: Optional[Dict[str, Any]] = None + hda2: Optional[Dict[str, Any]] = None + label_map: Optional[Dict[str, Any]] = None + if inputs_by != "url": + hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True) + hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6", wait=True) + label_map = {"WorkflowInput1": ds_entry(hda1), "WorkflowInput2": ds_entry(hda2)} workflow_request = dict( history=f"hist_id={history_id}", ) - label_map = {"WorkflowInput1": ds_entry(hda1), "WorkflowInput2": ds_entry(hda2)} if inputs_by == "step_id": + assert label_map ds_map = self.build_ds_map(workflow_id, label_map) workflow_request["ds_map"] = ds_map elif inputs_by == "step_index": + assert hda1 + assert hda2 index_map = {"0": ds_entry(hda1), "1": ds_entry(hda2)} workflow_request["inputs"] = json.dumps(index_map) workflow_request["inputs_by"] = "step_index" elif inputs_by == "name": + assert label_map workflow_request["inputs"] = json.dumps(label_map) 
workflow_request["inputs_by"] = "name" elif inputs_by in ["step_uuid", "uuid_implicitly"]: + assert hda1 + assert hda2 assert workflow, f"Must specify workflow for this inputs_by {inputs_by} parameter value" uuid_map = { workflow["steps"]["0"]["uuid"]: ds_entry(hda1), @@ -2207,6 +2217,16 @@ def setup_workflow_run( workflow_request["inputs"] = json.dumps(uuid_map) if inputs_by == "step_uuid": workflow_request["inputs_by"] = "step_uuid" + elif inputs_by in ["url", "deferred_url"]: + input_b64_1 = base64.b64encode(b"1 2 3").decode("utf-8") + input_b64_2 = base64.b64encode(b"4 5 6").decode("utf-8") + deferred = inputs_by == "deferred_url" + inputs = { + "WorkflowInput1": {"src": "url", "url": f"base64://{input_b64_1}", "ext": "txt", "deferred": deferred}, + "WorkflowInput2": {"src": "url", "url": f"base64://{input_b64_2}", "ext": "txt", "deferred": deferred}, + } + workflow_request["inputs"] = json.dumps(inputs) + workflow_request["inputs_by"] = "name" return workflow_request, history_id, workflow_id @@ -3262,6 +3282,7 @@ def get_state(): "queued", "new", "ready", + "requires_materialization", "stop", "stopped", "setting_metadata",