From 64e64bead210743e49590e213c4fccd79ddddd2b Mon Sep 17 00:00:00 2001 From: John Chilton Date: Sun, 8 Sep 2024 19:32:34 -0400 Subject: [PATCH] Implement {src: "url"} on the new request API... --- .../dev/tool_state_state_classes.plantuml.svg | 78 +++++++------ .../dev/tool_state_state_classes.plantuml.txt | 23 ++-- doc/source/dev/tool_state_task.plantuml.svg | 62 ++++++++++ doc/source/dev/tool_state_task.plantuml.txt | 25 +++++ lib/galaxy/managers/hdas.py | 7 +- lib/galaxy/managers/jobs.py | 95 ++++++++++++++-- lib/galaxy/model/deferred.py | 13 ++- lib/galaxy/model/dereference.py | 30 +++++ lib/galaxy/schema/tasks.py | 4 +- lib/galaxy/tool_util/parameters/__init__.py | 10 ++ lib/galaxy/tool_util/parameters/convert.py | 61 ++++++++-- lib/galaxy/tool_util/parameters/models.py | 106 +++++++++++++++--- lib/galaxy/tool_util/parameters/state.py | 9 ++ lib/galaxy/tools/__init__.py | 14 ++- lib/galaxy_test/base/populators.py | 2 +- test/unit/data/test_dereference.py | 13 +++ .../tool_util/parameter_specification.yml | 55 ++++++++- test/unit/tool_util/test_parameter_covert.py | 35 +++++- .../tool_util/test_parameter_specification.py | 25 +++++ 19 files changed, 571 insertions(+), 96 deletions(-) create mode 100644 doc/source/dev/tool_state_task.plantuml.svg create mode 100644 doc/source/dev/tool_state_task.plantuml.txt create mode 100644 lib/galaxy/model/dereference.py create mode 100644 test/unit/data/test_dereference.py diff --git a/doc/source/dev/tool_state_state_classes.plantuml.svg b/doc/source/dev/tool_state_state_classes.plantuml.svg index de86dbcd3787..28c7da1c9092 100644 --- a/doc/source/dev/tool_state_state_classes.plantuml.svg +++ b/doc/source/dev/tool_state_state_classes.plantuml.svg @@ -1,21 +1,17 @@ -galaxy.tool_util.parameters.stateToolStatestate_representation: strinput_state: Dict[str, Any]validate(input_models: ToolParameterBundle)_to_base_model(input_models: ToolParameterBundle): Optional[Type[BaseModel]]RequestToolStatestate_representation = "request"_to_base_model(input_models: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <encoded_id>}.Allow mapping/reduce constructs.RequestInternalToolStatestate_representation = "request_internal"_to_base_model(input_models: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <decoded_id>}.Allow mapping/reduce constructs.JobInternalToolStatestate_representation = "job_internal"_to_base_model(input_models: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <decoded_id>}.Mapping constructs expanded out.(Defaults are inserted?)TestCaseToolStatestate_representation = "test_case"_to_base_model(input_models: ToolParameterBundle): Type[BaseModel]Object references of the form file name and URIs.Mapping constructs not allowed. WorkflowStepToolStatestate_representation = "workflow_step"_to_base_model(input_models: ToolParameterBundle): Type[BaseModel]Nearly everything optional except conditional discriminators. WorkflowStepLinkedToolStatestate_representation = "workflow_step_linked"_to_base_model(input_models: ToolParameterBundle): Type[BaseModel]Expect pre-process ``in`` dictionaries and bring in representationof links and defaults and validate them in model. decodeexpandpreprocess_links_and_defaultsgalaxy.tool_util.parameters.stateToolStatestate_representation: strinput_state: Dict[str, Any]validate(parameters: ToolParameterBundle)_to_base_model(parameters: ToolParameterBundle): Optional[Type[BaseModel]]RequestToolStatestate_representation = "request"_to_base_model(parameters: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <encoded_id>}.Allow mapping/reduce constructs.RequestInternalToolStatestate_representation = "request_internal"_to_base_model(parameters: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <decoded_id>}.Allow mapping/reduce constructs. Allows URI src dicts.RequestInternalDereferencedToolStatestate_representation = "request_internal"_to_base_model(parameters: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <decoded_id>}.Allow mapping/reduce constructs. No URI src dicts - all converted to HDAs.JobInternalToolStatestate_representation = "job_internal"_to_base_model(parameters: ToolParameterBundle): Type[BaseModel]Object references of the form{src: "hda", id: <decoded_id>}.Mapping constructs expanded out.(Defaults are inserted?)decodedereferenceexpand \ No newline at end of file diff --git a/doc/source/dev/tool_state_task.plantuml.txt b/doc/source/dev/tool_state_task.plantuml.txt new file mode 100644 index 000000000000..64286f2f5ed5 --- /dev/null +++ b/doc/source/dev/tool_state_task.plantuml.txt @@ -0,0 +1,25 @@ +@startuml +'!include plantuml_options.txt +queue TaskQueue as queue +participant "queue_jobs Task" as task +participant "JobSubmitter.queue_jobs" as queue_jobs +participant "JobSubmitter.dereference" as dereference +participant "materialize Task" as materialize_task +participant "Tool.handle_input_async" as handle_input +participant "expand_meta_parameters_async" as expand +participant "ToolAction.execute" as tool_action + +queue -> task : +task -> queue_jobs : QueueJobs pydantic model +queue_jobs -> dereference : RequestInternalToolState +dereference -> queue_jobs : RequestInternalDereferencedToolState +queue_jobs -> materialize_task : HDA (with state deferred) +materialize_task -> queue_jobs : return when state is okay +queue_jobs -> handle_input : RequestInternalDereferencedToolState +handle_input -> expand : RequestInternalDereferencedToolState +expand -> handle_input : JobInternalToolState[] +loop over expanded job tool states + handle_input -> tool_action : + tool_action -> handle_input : A Galaxy Job +end +@enduml diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py index 08ffca33fbb6..9f8a6a34ec2f 100644 --- a/lib/galaxy/managers/hdas.py +++ b/lib/galaxy/managers/hdas.py @@ -173,7 +173,7 @@ def create( session.commit() return hda - def materialize(self, request: MaterializeDatasetInstanceTaskRequest) -> None: + def materialize(self, request: MaterializeDatasetInstanceTaskRequest, in_place: bool = False) -> None: request_user: RequestUser = request.user materializer = materializer_factory( True, # attached... @@ -187,8 +187,9 @@ def materialize(self, request: MaterializeDatasetInstanceTaskRequest) -> None: else: dataset_instance = self.ldda_manager.get_accessible(request.content, user) history = self.app.history_manager.by_id(request.history_id) - new_hda = materializer.ensure_materialized(dataset_instance, target_history=history) - history.add_dataset(new_hda, set_hid=True) + new_hda = materializer.ensure_materialized(dataset_instance, target_history=history, in_place=in_place) + if not in_place: + history.add_dataset(new_hda, set_hid=True) session = self.session() with transaction(session): session.commit() diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 893a3bcaf1ac..78213d4f3526 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -1,5 +1,6 @@ import json import logging +from dataclasses import dataclass from datetime import ( date, datetime, @@ -10,6 +11,7 @@ Dict, List, Optional, + Tuple, Union, ) @@ -65,6 +67,7 @@ YIELD_PER_ROWS, ) from galaxy.model.base import transaction +from galaxy.model.dereference import dereference_to_model from galaxy.model.index_filter_util import ( raw_text_column_filter, text_column_filter, @@ -74,12 +77,22 @@ JobIndexQueryPayload, JobIndexSortByEnum, ) -from galaxy.schema.tasks import QueueJobs +from galaxy.schema.tasks import ( + MaterializeDatasetInstanceTaskRequest, + QueueJobs, +) from galaxy.security.idencoding import IdEncodingHelper from galaxy.structured_app import ( MinimalManagerApp, StructuredApp, ) +from galaxy.tool_util.parameters import ( + DataRequestInternalHda, + DataRequestUri, + dereference, + RequestInternalDereferencedToolState, + RequestInternalToolState, +) from galaxy.tools import Tool from galaxy.tools._types import ( ToolStateDumpedToJsonInternalT, @@ -1170,35 +1183,75 @@ def get_job(session, *where_clauses): return session.scalars(stmt).first() +@dataclass +class DereferencedDatasetPair: + hda: model.HistoryDatasetAssociation + request: DataRequestUri + + class JobSubmitter: def __init__( self, history_manager: HistoryManager, user_manager: UserManager, + hda_manager: HDAManager, app: MinimalManagerApp, ): self.history_manager = history_manager self.user_manager = user_manager + self.hda_manager = hda_manager self.app = app + def materialize_request_for( + self, trans: WorkRequestContext, hda: model.HistoryDatasetAssociation + ) -> MaterializeDatasetInstanceTaskRequest: + return MaterializeDatasetInstanceTaskRequest( + user=trans.async_request_user, + history_id=trans.history.id, + source="hda", + content=hda.id, + ) + + def dereference( + self, trans: WorkRequestContext, tool: Tool, request: QueueJobs, tool_request: ToolRequest + ) -> Tuple[RequestInternalDereferencedToolState, List[DereferencedDatasetPair]]: + new_hdas: List[DereferencedDatasetPair] = [] + + def dereference_callback(data_request: DataRequestUri) -> DataRequestInternalHda: + # a deferred dataset corresponding to request + hda = dereference_to_model(trans.sa_session, trans.user, trans.history, data_request) + new_hdas.append(DereferencedDatasetPair(hda, data_request)) + permissions = trans.app.security_agent.history_get_default_permissions(trans.history) + trans.app.security_agent.set_all_dataset_permissions(hda.dataset, permissions, new=True, flush=False) + with transaction(trans.sa_session): + trans.sa_session.commit() + return DataRequestInternalHda(id=hda.id) + + tool_state = RequestInternalToolState(tool_request.request) + return dereference(tool_state, tool, dereference_callback), new_hdas + def queue_jobs(self, tool: Tool, request: QueueJobs) -> None: - user = self.user_manager.by_id(request.user.user_id) + tool_request: ToolRequest = self._tool_request(request.tool_request_id) sa_session = self.app.model.context - tool_request: ToolRequest = cast(ToolRequest, sa_session.query(ToolRequest).get(request.tool_request_id)) - if tool_request is None: - raise Exception(f"Problem fetching request with ID {request.tool_request_id}") try: - target_history = tool_request.history + request_context = self._context(tool_request, request) + target_history = request_context.history use_cached_jobs = request.use_cached_jobs rerun_remap_job_id = request.rerun_remap_job_id - trans = WorkRequestContext( - self.app, - user, - history=target_history, - ) + tool_state: RequestInternalDereferencedToolState + new_hdas: List[DereferencedDatasetPair] + tool_state, new_hdas = self.dereference(request_context, tool, request, tool_request) + to_materialize_list: List[DereferencedDatasetPair] = [p for p in new_hdas if not p.request.deferred] + for to_materialize in to_materialize_list: + materialize_request = self.materialize_request_for(request_context, to_materialize.hda) + # API dataset materialization is immutable and produces new datasets + # here we just created the datasets - lets just materialize them in place + # and avoid extra and confusing input copies + self.hda_manager.materialize(materialize_request, in_place=True) tool.handle_input_async( - trans, + request_context, tool_request, + tool_state, history=target_history, use_cached_job=use_cached_jobs, rerun_remap_job_id=rerun_remap_job_id, @@ -1208,8 +1261,26 @@ def queue_jobs(self, tool: Tool, request: QueueJobs) -> None: with transaction(sa_session): sa_session.commit() except Exception as e: + log.exception("Problem here....") tool_request.state = ToolRequest.states.FAILED tool_request.state_message = str(e) sa_session.add(tool_request) with transaction(sa_session): sa_session.commit() + + def _context(self, tool_request: ToolRequest, request: QueueJobs) -> WorkRequestContext: + user = self.user_manager.by_id(request.user.user_id) + target_history = tool_request.history + trans = WorkRequestContext( + self.app, + user, + history=target_history, + ) + return trans + + def _tool_request(self, tool_request_id: int) -> ToolRequest: + sa_session = self.app.model.context + tool_request: ToolRequest = cast(ToolRequest, sa_session.query(ToolRequest).get(tool_request_id)) + if tool_request is None: + raise Exception(f"Problem fetching request with ID {tool_request_id}") + return tool_request diff --git a/lib/galaxy/model/deferred.py b/lib/galaxy/model/deferred.py index a241d1418c1c..ef4eeb801b60 100644 --- a/lib/galaxy/model/deferred.py +++ b/lib/galaxy/model/deferred.py @@ -95,6 +95,7 @@ def ensure_materialized( self, dataset_instance: Union[HistoryDatasetAssociation, LibraryDatasetDatasetAssociation], target_history: Optional[History] = None, + in_place: bool = False, ) -> HistoryDatasetAssociation: """Create a new detached dataset instance from the supplied instance. @@ -148,10 +149,14 @@ def ensure_materialized( history = dataset_instance.history except DetachedInstanceError: history = None - materialized_dataset_instance = HistoryDatasetAssociation( - create_dataset=False, # is the default but lets make this really clear... - history=history, - ) + + if not in_place: + materialized_dataset_instance = HistoryDatasetAssociation( + create_dataset=False, # is the default but lets make this really clear... + history=history, + ) + else: + materialized_dataset_instance = dataset_instance if attached: sa_session = self._sa_session if sa_session is None: diff --git a/lib/galaxy/model/dereference.py b/lib/galaxy/model/dereference.py new file mode 100644 index 000000000000..dd960e183c2b --- /dev/null +++ b/lib/galaxy/model/dereference.py @@ -0,0 +1,30 @@ +import os.path + +from galaxy.model import ( + DatasetSource, + HistoryDatasetAssociation, +) +from galaxy.tool_util.parameters import DataRequestUri + + +def dereference_to_model(sa_session, user, history, data_request_uri: DataRequestUri) -> HistoryDatasetAssociation: + # based on code from upload_common + name = os.path.basename(data_request_uri.url) + dbkey = "?" + hda = HistoryDatasetAssociation( + name=name, + extension=data_request_uri.ext, + dbkey=dbkey, # TODO + history=history, + create_dataset=True, + sa_session=sa_session, + ) + hda.state = hda.states.DEFERRED + dataset_source = DatasetSource() + dataset_source.source_uri = data_request_uri.url + hda.dataset.sources = [dataset_source] + + sa_session.add(hda) + sa_session.add(dataset_source) + history.add_dataset(hda, genome_build=dbkey, quota=False) + return hda diff --git a/lib/galaxy/schema/tasks.py b/lib/galaxy/schema/tasks.py index ad81ff1b7324..31255d968268 100644 --- a/lib/galaxy/schema/tasks.py +++ b/lib/galaxy/schema/tasks.py @@ -104,8 +104,8 @@ class MaterializeDatasetInstanceTaskRequest(Model): title="Content", description=( "Depending on the `source` it can be:\n" - "- The encoded id of the source library dataset\n" - "- The encoded id of the HDA\n" + "- The decoded id of the source library dataset\n" + "- The decoded id of the HDA\n" ), ) diff --git a/lib/galaxy/tool_util/parameters/__init__.py b/lib/galaxy/tool_util/parameters/__init__.py index ac0caf72e3a5..a1b222b9b692 100644 --- a/lib/galaxy/tool_util/parameters/__init__.py +++ b/lib/galaxy/tool_util/parameters/__init__.py @@ -1,6 +1,7 @@ from .case import test_case_state from .convert import ( decode, + dereference, encode, encode_test, ) @@ -30,6 +31,8 @@ DataCollectionRequest, DataParameterModel, DataRequest, + DataRequestInternalHda, + DataRequestUri, FloatParameterModel, HiddenParameterModel, IntegerParameterModel, @@ -45,6 +48,7 @@ validate_against_model, validate_internal_job, validate_internal_request, + validate_internal_request_dereferenced, validate_request, validate_test_case, validate_workflow_step, @@ -52,6 +56,7 @@ ) from .state import ( JobInternalToolState, + RequestInternalDereferencedToolState, RequestInternalToolState, RequestToolState, TestCaseToolState, @@ -79,6 +84,8 @@ "ToolParameterBundle", "ToolParameterBundleModel", "DataRequest", + "DataRequestInternalHda", + "DataRequestUri", "DataCollectionRequest", "ToolParameterModel", "IntegerParameterModel", @@ -106,6 +113,7 @@ "validate_against_model", "validate_internal_job", "validate_internal_request", + "validate_internal_request_dereferenced", "validate_request", "validate_test_case", "validate_workflow_step", @@ -118,6 +126,7 @@ "test_case_state", "RequestToolState", "RequestInternalToolState", + "RequestInternalDereferencedToolState", "flat_state_path", "keys_starting_with", "visit_input_values", @@ -126,6 +135,7 @@ "decode", "encode", "encode_test", + "dereference", "WorkflowStepToolState", "WorkflowStepLinkedToolState", ) diff --git a/lib/galaxy/tool_util/parameters/convert.py b/lib/galaxy/tool_util/parameters/convert.py index 77423034d82e..601b4d29da0f 100644 --- a/lib/galaxy/tool_util/parameters/convert.py +++ b/lib/galaxy/tool_util/parameters/convert.py @@ -17,11 +17,14 @@ DataCollectionRequest, DataParameterModel, DataRequest, + DataRequestInternalHda, + DataRequestUri, SelectParameterModel, ToolParameterBundle, ToolParameterT, ) from .state import ( + RequestInternalDereferencedToolState, RequestInternalToolState, RequestToolState, TestCaseToolState, @@ -42,10 +45,12 @@ def decode( external_state.validate(input_models) def decode_src_dict(src_dict: dict): - assert "id" in src_dict - decoded_dict = src_dict.copy() - decoded_dict["id"] = decode_id(src_dict["id"]) - return decoded_dict + if "id" in src_dict: + decoded_dict = src_dict.copy() + decoded_dict["id"] = decode_id(src_dict["id"]) + return decoded_dict + else: + return src_dict def decode_callback(parameter: ToolParameterT, value: Any): if parameter.parameter_type == "gx_data": @@ -79,10 +84,12 @@ def encode( """Prepare an external representation of tool state (request) for storing in the database (request_internal).""" def encode_src_dict(src_dict: dict): - assert "id" in src_dict - encoded_dict = src_dict.copy() - encoded_dict["id"] = encode_id(src_dict["id"]) - return encoded_dict + if "id" in src_dict: + encoded_dict = src_dict.copy() + encoded_dict["id"] = encode_id(src_dict["id"]) + return encoded_dict + else: + return src_dict def encode_callback(parameter: ToolParameterT, value: Any): if parameter.parameter_type == "gx_data": @@ -109,6 +116,44 @@ def encode_callback(parameter: ToolParameterT, value: Any): return request_state +DereferenceCallable = Callable[[DataRequestUri], DataRequestInternalHda] + + +def dereference( + internal_state: RequestInternalToolState, input_models: ToolParameterBundle, dereference: DereferenceCallable +) -> RequestInternalDereferencedToolState: + + def derefrence_dict(src_dict: dict): + src = src_dict.get("src") + if src == "url": + data_request_uri: DataRequestUri = DataRequestUri.model_validate(src_dict) + data_request_hda: DataRequestInternalHda = dereference(data_request_uri) + return data_request_hda.model_dump() + else: + return src_dict + + def dereference_callback(parameter: ToolParameterT, value: Any): + if parameter.parameter_type == "gx_data": + data_parameter = cast(DataParameterModel, parameter) + if data_parameter.multiple: + assert isinstance(value, list), str(value) + return list(map(derefrence_dict, value)) + else: + assert isinstance(value, dict), str(value) + return derefrence_dict(value) + else: + return VISITOR_NO_REPLACEMENT + + request_state_dict = visit_input_values( + input_models, + internal_state, + dereference_callback, + ) + request_state = RequestInternalDereferencedToolState(request_state_dict) + request_state.validate(input_models) + return request_state + + # interfaces for adapting test data dictionaries to tool request dictionaries # e.g. {class: File, path: foo.bed} => {src: hda, id: ab1235cdfea3} AdaptDatasets = Callable[[JsonTestDatasetDefDict], DataRequest] diff --git a/lib/galaxy/tool_util/parameters/models.py b/lib/galaxy/tool_util/parameters/models.py index d8f4e3816cd7..dd3684c82f0b 100644 --- a/lib/galaxy/tool_util/parameters/models.py +++ b/lib/galaxy/tool_util/parameters/models.py @@ -61,7 +61,13 @@ # + request_internal: This is a pydantic model to validate what Galaxy expects to find in the database, # in particular dataset and collection references should be decoded integers. StateRepresentationT = Literal[ - "request", "request_internal", "job_internal", "test_case_xml", "workflow_step", "workflow_step_linked" + "request", + "request_internal", + "request_internal_dereferenced", + "job_internal", + "test_case_xml", + "workflow_step", + "workflow_step_linked", ] @@ -239,40 +245,85 @@ def request_requires_value(self) -> bool: TestCaseDataSrcT = Literal["File"] -class DataRequest(StrictModel): - src: DataSrcT +class DataRequestHda(StrictModel): + src: Literal["hda"] = "hda" id: StrictStr -class BatchDataInstance(StrictModel): - src: MultiDataSrcT +class DataRequestLdda(StrictModel): + src: Literal["ldda"] = "ldda" + id: StrictStr + + +class DataRequestHdca(StrictModel): + src: Literal["hdca"] = "hdca" id: StrictStr -class MultiDataInstance(StrictModel): +class DataRequestUri(StrictModel): + # calling it url instead of uri to match data fetch schema... + src: Literal["url"] = "url" + url: StrictStr + ext: StrictStr + dbkey: StrictStr = "?" + deferred: StrictBool = False + + +DataRequest = Annotated[Union[DataRequestHda, DataRequestLdda, DataRequestUri], Field(discriminator="src")] + + +class BatchDataInstance(StrictModel): src: MultiDataSrcT id: StrictStr +MultiDataInstance = Annotated[ + Union[DataRequestHda, DataRequestLdda, DataRequestHdca, DataRequestUri], Field(discriminator="src") +] MultiDataRequest: Type = union_type([MultiDataInstance, List[MultiDataInstance]]) -class DataRequestInternal(StrictModel): - src: DataSrcT +class DataRequestInternalHda(StrictModel): + src: Literal["hda"] = "hda" id: StrictInt -class BatchDataInstanceInternal(StrictModel): - src: MultiDataSrcT +class DataRequestInternalLdda(StrictModel): + src: Literal["ldda"] = "ldda" + id: StrictInt + + +class DataRequestInternalHdca(StrictModel): + src: Literal["hdca"] = "hdca" id: StrictInt -class MultiDataInstanceInternal(StrictModel): +DataRequestInternal = Annotated[ + Union[DataRequestInternalHda, DataRequestInternalLdda, DataRequestUri], Field(discriminator="src") +] +DataRequestInternalDereferenced = Annotated[ + Union[DataRequestInternalHda, DataRequestInternalLdda], Field(discriminator="src") +] +DataJobInternal = DataRequestInternalDereferenced + + +class BatchDataInstanceInternal(StrictModel): src: MultiDataSrcT id: StrictInt +MultiDataInstanceInternal = Annotated[ + Union[DataRequestInternalHda, DataRequestInternalLdda, DataRequestInternalHdca, DataRequestUri], + Field(discriminator="src"), +] +MultiDataInstanceInternalDereferenced = Annotated[ + Union[DataRequestInternalHda, DataRequestInternalLdda, DataRequestInternalHdca], Field(discriminator="src") +] + MultiDataRequestInternal: Type = union_type([MultiDataInstanceInternal, List[MultiDataInstanceInternal]]) +MultiDataRequestInternalDereferenced: Type = union_type( + [MultiDataInstanceInternalDereferenced, List[MultiDataInstanceInternalDereferenced]] +) class DataParameterModel(BaseGalaxyToolParameterModelDefinition): @@ -300,6 +351,15 @@ def py_type_internal(self) -> Type: base_model = DataRequestInternal return optional_if_needed(base_model, self.optional) + @property + def py_type_internal_dereferenced(self) -> Type: + base_model: Type + if self.multiple: + base_model = MultiDataRequestInternalDereferenced + else: + base_model = DataRequestInternalDereferenced + return optional_if_needed(base_model, self.optional) + @property def py_type_test_case(self) -> Type: base_model: Type @@ -316,8 +376,13 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam return allow_batching( dynamic_model_information_from_py_type(self, self.py_type_internal), BatchDataInstanceInternal ) + elif state_representation == "request_internal_dereferenced": + return allow_batching( + dynamic_model_information_from_py_type(self, self.py_type_internal_dereferenced), + BatchDataInstanceInternal, + ) elif state_representation == "job_internal": - return dynamic_model_information_from_py_type(self, self.py_type_internal) + return dynamic_model_information_from_py_type(self, self.py_type_internal_dereferenced) elif state_representation == "test_case_xml": return dynamic_model_information_from_py_type(self, self.py_type_test_case) elif state_representation == "workflow_step": @@ -357,7 +422,7 @@ def py_type_internal(self) -> Type: def pydantic_template(self, state_representation: StateRepresentationT) -> DynamicModelInformation: if state_representation == "request": return allow_batching(dynamic_model_information_from_py_type(self, self.py_type)) - elif state_representation == "request_internal": + elif state_representation in ["request_internal", "request_internal_dereferenced"]: return allow_batching(dynamic_model_information_from_py_type(self, self.py_type_internal)) elif state_representation == "job_internal": return dynamic_model_information_from_py_type(self, self.py_type_internal) @@ -1177,9 +1242,7 @@ def to_simple_model(input_parameter: Union[ToolParameterModel, ToolParameterT]) return cast(ToolParameterT, input_parameter) -def simple_input_models( - parameters: Union[List[ToolParameterModel], List[ToolParameterT]] -) -> Iterable[ToolParameterT]: +def simple_input_models(parameters: Union[List[ToolParameterModel], List[ToolParameterT]]) -> Iterable[ToolParameterT]: return [to_simple_model(m) for m in parameters] @@ -1198,6 +1261,12 @@ def create_request_internal_model(tool: ToolParameterBundle, name: str = "Dynami return create_field_model(tool.parameters, name, "request_internal") +def create_request_internal_dereferenced_model( + tool: ToolParameterBundle, name: str = "DynamicModelForTool" +) -> Type[BaseModel]: + return create_field_model(tool.parameters, name, "request_internal_dereferenced") + + def create_job_internal_model(tool: ToolParameterBundle, name: str = "DynamicModelForTool") -> Type[BaseModel]: return create_field_model(tool.parameters, name, "job_internal") @@ -1258,6 +1327,11 @@ def validate_internal_request(tool: ToolParameterBundle, request: Dict[str, Any] validate_against_model(pydantic_model, request) +def validate_internal_request_dereferenced(tool: ToolParameterBundle, request: Dict[str, Any]) -> None: + pydantic_model = create_request_internal_dereferenced_model(tool) + validate_against_model(pydantic_model, request) + + def validate_internal_job(tool: ToolParameterBundle, request: Dict[str, Any]) -> None: pydantic_model = create_job_internal_model(tool) validate_against_model(pydantic_model, request) diff --git a/lib/galaxy/tool_util/parameters/state.py b/lib/galaxy/tool_util/parameters/state.py index 94eb48b5de52..af15cf23ac7b 100644 --- a/lib/galaxy/tool_util/parameters/state.py +++ b/lib/galaxy/tool_util/parameters/state.py @@ -15,6 +15,7 @@ from .models import ( create_job_internal_model, + create_request_internal_dereferenced_model, create_request_internal_model, create_request_model, create_test_case_model, @@ -83,6 +84,14 @@ def _parameter_model_for(cls, parameters: ToolParameterBundle) -> Type[BaseModel return create_request_internal_model(parameters) +class RequestInternalDereferencedToolState(ToolState): + state_representation: Literal["request_internal_dereferenced"] = "request_internal_dereferenced" + + @classmethod + def _parameter_model_for(cls, parameters: ToolParameterBundle) -> Type[BaseModel]: + return create_request_internal_dereferenced_model(parameters) + + class JobInternalToolState(ToolState): state_representation: Literal["job_internal"] = "job_internal" diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 7c9238d6b737..2f9f7978e9e4 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -75,6 +75,7 @@ from galaxy.tool_util.parameters import ( input_models_for_pages, JobInternalToolState, + RequestInternalDereferencedToolState, RequestInternalToolState, ToolParameterBundle, ) @@ -1998,8 +1999,9 @@ def completed_jobs( def handle_input_async( self, - trans, + request_context: WorkRequestContext, tool_request: ToolRequest, + tool_state: RequestInternalDereferencedToolState, history: Optional[model.History] = None, use_cached_job: bool = DEFAULT_USE_CACHED_JOB, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, @@ -2007,15 +2009,15 @@ def handle_input_async( input_format: str = "legacy", ): """The tool request API+tasks version of handle_input.""" - request_context = proxy_work_context_for_history(trans, history=history) - tool_request_state = RequestInternalToolState(tool_request.request) all_params, all_errors, collection_info, job_tool_states = self.expand_incoming_async( - request_context, tool_request_state, rerun_remap_job_id + request_context, tool_state, rerun_remap_job_id ) self.handle_incoming_errors(all_errors) - mapping_params = MappingParameters(tool_request.request, all_params, tool_request_state, job_tool_states) - completed_jobs: Dict[int, Optional[model.Job]] = self.completed_jobs(trans, use_cached_job, all_params) + mapping_params = MappingParameters(tool_request.request, all_params, tool_state, job_tool_states) + completed_jobs: Dict[int, Optional[model.Job]] = self.completed_jobs( + request_context, use_cached_job, all_params + ) execute_async( request_context, self, diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 33515c9ec071..8ccacd422a30 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -1451,7 +1451,7 @@ def is_ready(): return state() == "submitted" def get_tool_request(self, tool_request_id: str) -> Dict[str, Any]: - response = self._get(f"tool_requests/{tool_request_id}/state") + response = self._get(f"tool_requests/{tool_request_id}") api_asserts.assert_status_code_is_ok(response) return response.json() diff --git a/test/unit/data/test_dereference.py b/test/unit/data/test_dereference.py new file mode 100644 index 000000000000..e7df3084cb73 --- /dev/null +++ b/test/unit/data/test_dereference.py @@ -0,0 +1,13 @@ +from galaxy.model.dereference import dereference_to_model +from galaxy.tool_util.parameters import DataRequestUri +from .model.test_model_store import setup_fixture_context_with_history + +TEST_URI = "gxfiles://test/1.bed" + + +def test_dereference(): + app, sa_session, user, history = setup_fixture_context_with_history() + uri_request = DataRequestUri(url=TEST_URI, ext="bed") + hda = dereference_to_model(sa_session, user, history, uri_request) + assert hda.name == "1.bed" + assert hda.dataset.sources[0].source_uri == TEST_URI diff --git a/test/unit/tool_util/parameter_specification.yml b/test/unit/tool_util/parameter_specification.yml index d4d3e07c0218..ed6b1abbb319 100644 --- a/test/unit/tool_util/parameter_specification.yml +++ b/test/unit/tool_util/parameter_specification.yml @@ -118,17 +118,25 @@ gx_int_required_via_empty_string: <<: *gx_int_required gx_text: - request_valid: + request_valid: &gx_text_request_valid - parameter: moocow - parameter: 'some spaces' - parameter: '' - {} - request_invalid: + request_invalid: &gx_text_request_invalid - parameter: 5 - parameter: null - parameter: {} - parameter: { "moo": "cow" } - parameter: {__class__: 'ConnectedValue'} + request_internal_valid: + *gx_text_request_valid + request_internal_invalid: + *gx_text_request_invalid + request_internal_dereferenced_valid: + *gx_text_request_valid + request_internal_dereferenced_invalid: + *gx_text_request_invalid workflow_step_valid: - parameter: moocow - parameter: 'some spaces' @@ -199,11 +207,15 @@ gx_select: - parameter: null - parameter: {} - parameter: 5 - request_internal_valid: + request_internal_valid: &gx_select_request_valid - parameter: "--ex1" - parameter: "ex2" - request_internal_invalid: + request_internal_invalid: &gx_select_request_invalid - parameter: {} + request_internal_dereferenced_valid: + *gx_select_request_valid + request_internal_dereferenced_invalid: + *gx_select_request_invalid test_case_xml_valid: - parameter: 'ex2' - parameter: '--ex1' @@ -501,6 +513,7 @@ gx_color: gx_data: request_valid: - parameter: {src: hda, id: abcdabcd} + - parameter: {src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", "ext": "txt"} - parameter: {__class__: "Batch", values: [{src: hdca, id: abcdabcd}]} request_invalid: - parameter: {__class__: "Batch", values: [{src: hdca, id: 5}]} @@ -518,6 +531,7 @@ gx_data: - parameter: {__class__: "Batch", values: [{src: hdca, id: 5}]} - parameter: {src: hda, id: 5} - parameter: {src: hda, id: 0} + - parameter: {src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"} request_internal_invalid: - parameter: {__class__: "Batch", values: [{src: hdca, id: abcdabcd}]} - parameter: {src: hda, id: abcdabcd} @@ -526,6 +540,14 @@ gx_data: - parameter: true - parameter: 5 - parameter: "5" + request_internal_dereferenced_valid: + - parameter: {__class__: "Batch", values: [{src: hdca, id: 5}]} + - parameter: {src: hda, id: 5} + - parameter: {src: hda, id: 0} + request_internal_dereferenced_invalid: + # the difference between request internal and request internal dereferenced is that these have been converted + # to datasets. + - parameter: {src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"} job_internal_valid: - parameter: {src: hda, id: 7} job_internal_invalid: @@ -533,6 +555,8 @@ gx_data: # expanded out. - parameter: {__class__: "Batch", values: [{src: hdca, id: 5}]} - parameter: {src: hda, id: abcdabcd} + # url parameters should be dereferrenced into datasets by this point... + - parameter: {src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", "ext": "txt"} test_case_xml_valid: - parameter: {class: File, path: foo.bed} - parameter: {class: File, location: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt"} @@ -627,6 +651,8 @@ gx_data_multiple: - parameter: [{src: hda, id: 5}] - parameter: [{src: hdca, id: 5}] - parameter: [{src: hdca, id: 5}, {src: hda, id: 5}] + - parameter: [{src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"}] + - parameter: {__class__: "Batch", values: [{src: hdca, id: 5}]} request_internal_invalid: - parameter: {src: hda, id: abcdabcd} - parameter: [{src: hdca, id: abcdabcd}, {src: hda, id: abcdabcd}] @@ -636,6 +662,14 @@ gx_data_multiple: - parameter: true - parameter: 5 - parameter: "5" + request_internal_dereferenced_valid: + - parameter: {__class__: "Batch", values: [{src: hdca, id: 5}]} + - parameter: [{src: hda, id: 5}] + - parameter: [{src: hda, id: 0}] + request_internal_dereferenced_invalid: + # the difference between request internal and request internal dereferenced is that these have been converted + # to datasets. + - parameter: [{src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"}] gx_data_multiple_optional: request_valid: @@ -646,6 +680,8 @@ gx_data_multiple_optional: - parameter: [{src: hdca, id: abcdabcd}, {src: hda, id: abcdabcd}] - parameter: null - {} + - parameter: {src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"} + - parameter: [{src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"}] request_invalid: - parameter: {src: hda, id: 5} - parameter: {} @@ -660,12 +696,19 @@ gx_data_multiple_optional: - parameter: [{src: hdca, id: 5}, {src: hda, id: 5}] - parameter: null - {} + - parameter: [{src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"}] request_internal_invalid: - parameter: {src: hda, id: abcdabcd} - parameter: {} - parameter: true - parameter: 5 - parameter: "5" + request_internal_dereferenced_valid: + - parameter: {src: hda, id: 5} + request_internal_dereferenced_invalid: + - parameter: {src: hda, id: abcdabcd} + - parameter: [{src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"}] + - parameter: {src: url, url: "https://raw.githubusercontent.com/galaxyproject/planemo/7be1bf5b3971a43eaa73f483125bfb8cabf1c440/tests/data/hello.txt", ext: "txt"} gx_data_collection: request_valid: @@ -692,6 +735,10 @@ gx_data_collection: - parameter: true - parameter: 5 - parameter: "5" + request_internal_dereferenced_valid: + - parameter: {src: hdca, id: 5} + request_internal_dereferenced_invalid: + - parameter: {src: hdca, id: abcdabcd} workflow_step_valid: - {} workflow_step_invalid: diff --git a/test/unit/tool_util/test_parameter_covert.py b/test/unit/tool_util/test_parameter_covert.py index 434032c1c1bc..1b2891fb97df 100644 --- a/test/unit/tool_util/test_parameter_covert.py +++ b/test/unit/tool_util/test_parameter_covert.py @@ -1,9 +1,17 @@ -from typing import Dict +from typing import ( + Dict, + Optional, +) from galaxy.tool_util.parameters import ( + DataRequestInternalHda, + DataRequestUri, decode, + dereference, encode, input_models_for_tool_source, + RequestInternalDereferencedToolState, + RequestInternalToolState, RequestToolState, ) from .test_parameter_test_cases import tool_source_for @@ -91,6 +99,31 @@ def test_multi_data(): assert encoded_state.input_state["parameter"][1]["id"] == EXAMPLE_ID_2_ENCODED +def test_dereference(): + tool_source = tool_source_for("parameters/gx_data") + bundle = input_models_for_tool_source(tool_source) + raw_request_state = {"parameter": {"src": "url", "url": "gxfiles://mystorage/1.bed", "ext": "bed"}} + request_state = RequestInternalToolState(raw_request_state) + request_state.validate(bundle) + + exception: Optional[Exception] = None + try: + # quickly verify this request needs to be dereferenced + bad_state = RequestInternalDereferencedToolState(raw_request_state) + bad_state.validate(bundle) + except Exception as e: + exception = e + assert exception is not None + + dereferenced_state = dereference(request_state, bundle, _fake_dereference) + assert isinstance(dereferenced_state, RequestInternalDereferencedToolState) + dereferenced_state.validate(bundle) + + +def _fake_dereference(input: DataRequestUri) -> DataRequestInternalHda: + return DataRequestInternalHda(id=EXAMPLE_ID_1) + + def _fake_decode(input: str) -> int: return next(key for key, value in ID_MAP.items() if value == input) diff --git a/test/unit/tool_util/test_parameter_specification.py b/test/unit/tool_util/test_parameter_specification.py index 6a54d78cfabf..01afc9b37a40 100644 --- a/test/unit/tool_util/test_parameter_specification.py +++ b/test/unit/tool_util/test_parameter_specification.py @@ -18,6 +18,7 @@ ToolParameterBundleModel, validate_internal_job, validate_internal_request, + validate_internal_request_dereferenced, validate_request, validate_test_case, validate_workflow_step, @@ -91,6 +92,8 @@ def _test_file(file: str, specification=None, parameter_bundle: Optional[ToolPar "request_invalid": _assert_requests_invalid, "request_internal_valid": _assert_internal_requests_validate, "request_internal_invalid": _assert_internal_requests_invalid, + "request_internal_dereferenced_valid": _assert_internal_requests_dereferenced_validate, + "request_internal_dereferenced_invalid": _assert_internal_requests_dereferenced_invalid, "job_internal_valid": _assert_internal_jobs_validate, "job_internal_invalid": _assert_internal_jobs_invalid, "test_case_xml_valid": _assert_test_cases_validate, @@ -153,6 +156,26 @@ def _assert_internal_request_invalid(parameters: ToolParameterBundleModel, reque ), f"Parameters {parameters} didn't result in validation error on internal request {request} as expected." +def _assert_internal_request_dereferenced_validates( + parameters: ToolParameterBundleModel, request: RawStateDict +) -> None: + try: + validate_internal_request_dereferenced(parameters, request) + except RequestParameterInvalidException as e: + raise AssertionError(f"Parameters {parameters} failed to validate dereferenced internal request {request}. {e}") + + +def _assert_internal_request_dereferenced_invalid(parameters: ToolParameterBundleModel, request: RawStateDict) -> None: + exc = None + try: + validate_internal_request_dereferenced(parameters, request) + except RequestParameterInvalidException as e: + exc = e + assert ( + exc is not None + ), f"Parameters {parameters} didn't result in validation error on dereferenced internal request {request} as expected." + + def _assert_internal_job_validates(parameters: ToolParameterBundleModel, request: RawStateDict) -> None: try: validate_internal_job(parameters, request) @@ -235,6 +258,8 @@ def _assert_workflow_step_linked_invalid( _assert_requests_invalid = partial(_for_each, _assert_request_invalid) _assert_internal_requests_validate = partial(_for_each, _assert_internal_request_validates) _assert_internal_requests_invalid = partial(_for_each, _assert_internal_request_invalid) +_assert_internal_requests_dereferenced_validate = partial(_for_each, _assert_internal_request_dereferenced_validates) +_assert_internal_requests_dereferenced_invalid = partial(_for_each, _assert_internal_request_dereferenced_invalid) _assert_internal_jobs_validate = partial(_for_each, _assert_internal_job_validates) _assert_internal_jobs_invalid = partial(_for_each, _assert_internal_job_invalid) _assert_test_cases_validate = partial(_for_each, _assert_test_case_validates)