Skip to content

Commit

Permalink
Move TRS import into WorkflowContentManager
Browse files Browse the repository at this point in the history
which fixes missing source metadata when importing a workflow through a
landing page.
  • Loading branch information
mvdbeek committed Oct 28, 2024
1 parent 6f770f8 commit c28b591
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 48 deletions.
2 changes: 1 addition & 1 deletion lib/galaxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def __init__(self, configure_logging=True, use_converters=True, use_display_appl
InstalledRepositoryManager, InstalledRepositoryManager(self)
)
self.dynamic_tool_manager = self._register_singleton(DynamicToolManager)
self.trs_proxy = self._register_singleton(TrsProxy, TrsProxy(self.config))
self._configure_datatypes_registry(
use_converters=use_converters,
use_display_applications=use_display_applications,
Expand Down Expand Up @@ -843,7 +844,6 @@ def __init__(self, **kwargs) -> None:
# Must be initialized after job_config.
self.workflow_scheduling_manager = scheduling_manager.WorkflowSchedulingManager(self)

self.trs_proxy = self._register_singleton(TrsProxy, TrsProxy(self.config))
# We need InteractiveToolManager before the job handler starts
self.interactivetool_manager = InteractiveToolManager(self)
# Start the job manager
Expand Down
21 changes: 3 additions & 18 deletions lib/galaxy/managers/landing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
ObjectNotFound,
RequestParameterMissingException,
)
from galaxy.managers.workflows import (
WorkflowContentsManager,
WorkflowCreateOptions,
)
from galaxy.managers.workflows import WorkflowContentsManager
from galaxy.model import (
ToolLandingRequest as ToolLandingRequestModel,
WorkflowLandingRequest as WorkflowLandingRequestModel,
Expand Down Expand Up @@ -106,21 +103,9 @@ def _ensure_workflow(self, trans: ProvidesUserContext, request: WorkflowLandingR
if request.workflow_source_type == "trs_url" and isinstance(trans.app, StructuredApp):
# trans is always structured app except for unit test
assert request.workflow_source
trs_id, trs_version = request.workflow_source.rsplit("/", 1)
_, trs_id, trs_version = trans.app.trs_proxy.get_trs_id_and_version_from_trs_url(request.workflow_source)
workflow = self.workflow_contents_manager.get_workflow_by_trs_id_and_version(
self.sa_session, trs_id=trs_id, trs_version=trs_version, user_id=trans.user and trans.user.id
workflow = self.workflow_contents_manager.get_or_create_workflow_from_trs(
trans, trs_url=request.workflow_source
)
if not workflow:
data = trans.app.trs_proxy.get_version_from_trs_url(request.workflow_source)
as_dict = yaml.safe_load(data)
raw_workflow_description = self.workflow_contents_manager.normalize_workflow_format(trans, as_dict)
created_workflow = self.workflow_contents_manager.build_workflow_from_raw_description(
trans,
raw_workflow_description,
WorkflowCreateOptions(),
)
workflow = created_workflow.workflow
request.workflow_id = workflow.id

def get_tool_landing_request(self, trans: ProvidesUserContext, uuid: UUID4) -> ToolLandingRequest:
Expand Down
56 changes: 53 additions & 3 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
text_column_filter,
)
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.schema.invocation import InvocationCancellationUserRequest
from galaxy.schema.schema import WorkflowIndexQueryPayload
from galaxy.structured_app import MinimalManagerApp
Expand Down Expand Up @@ -139,6 +138,7 @@
attach_ordered_steps,
has_cycles,
)
from galaxy.workflow.trs_proxy import TrsProxy

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -598,8 +598,10 @@ def add_serializers(self):


class WorkflowContentsManager(UsesAnnotations):
def __init__(self, app: MinimalManagerApp):

def __init__(self, app: MinimalManagerApp, trs_proxy: TrsProxy):
self.app = app
self.trs_proxy = trs_proxy
self._resource_mapper_function = get_resource_mapper_function(app)

def ensure_raw_description(self, dict_or_raw_description):
Expand Down Expand Up @@ -2016,9 +2018,57 @@ def get_all_tools(self, workflow):
tools.extend(self.get_all_tools(step.subworkflow))
return tools

def get_or_create_workflow_from_trs(
self,
trans: ProvidesUserContext,
trs_url: Optional[str],
trs_id: Optional[str] = None,
trs_version: Optional[str] = None,
trs_server: Optional[str] = None,
):
user_id = trans.user and trans.user.id
assert user_id, "Cannot create workflow for anonymous user"
if not trs_url:
assert trs_server and trs_id and trs_version, "trs_url or trs_server, trs_version and trs_id must be passed"
server = self.trs_proxy.get_server(trs_server)
trs_url = self.trs_proxy.get_trs_url(server._trs_url, trs_id, trs_version)
else:
_, trs_id, trs_version = self.trs_proxy.get_trs_id_and_version_from_trs_url(trs_url=trs_url)
assert trs_id and trs_version

workflow = self.get_workflow_by_trs_id_and_version(trs_id=trs_id, trs_version=trs_version, user_id=user_id)
if not workflow:
workflow = self.create_workflow_from_trs_url(trans, trs_url)
return workflow

def create_workflow_from_trs_url(
self,
trans: ProvidesUserContext,
trs_url: str,
) -> Workflow:
server, trs_tool_id, trs_version_id = self.trs_proxy.get_trs_id_and_version_from_trs_url(trs_url=trs_url)
data = self.trs_proxy.get_version_from_trs_url(trs_url)
as_dict = yaml.safe_load(data)
raw_workflow_description = self.normalize_workflow_format(trans, as_dict)
created_workflow = self.build_workflow_from_raw_description(
trans,
raw_workflow_description,
WorkflowCreateOptions(),
)
workflow = created_workflow.workflow
workflow.source_metadata = {
"trs_tool_id": trs_tool_id,
"trs_version_id": trs_version_id,
"trs_url": trs_url,
"trs_server": server._trs_url,
}
return workflow

def get_workflow_by_trs_id_and_version(
self, sa_session: galaxy_scoped_session, trs_id: str, trs_version: str, user_id: Optional[int] = None
self, trs_id: str, trs_version: str, user_id: Optional[int] = None
) -> Optional[model.Workflow]:
sa_session = self.app.model.session

def to_json(column, keys: List[str]):
assert sa_session.bind
if sa_session.bind.dialect.name == "postgresql":
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7817,7 +7817,7 @@ class Workflow(Base, Dictifiable, RepresentById):
reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType)
creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType)
license: Mapped[Optional[str]] = mapped_column(TEXT)
source_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType)
source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType)
uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType)

steps = relationship(
Expand Down
36 changes: 11 additions & 25 deletions lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,29 +245,14 @@ def create(self, trans: GalaxyWebTransaction, payload=None, **kwd):
payload["workflow"] = workflow_src
return self.__api_import_new_workflow(trans, payload, **kwd)
elif archive_source == "trs_tool":
server = None
trs_tool_id = None
trs_version_id = None
import_source = None
if "trs_url" in payload:
parts = self.app.trs_proxy.match_url(
payload["trs_url"], trans.app.config.fetch_url_allowlist_ips
)
if parts:
server = self.app.trs_proxy.server_from_url(parts["trs_base_url"])
trs_tool_id = parts["tool_id"]
trs_version_id = parts["version_id"]
payload["trs_tool_id"] = trs_tool_id
payload["trs_version_id"] = trs_version_id
else:
raise exceptions.RequestParameterInvalidException(f"Invalid TRS URL {payload['trs_url']}.")
else:
trs_server = payload.get("trs_server")
server = self.app.trs_proxy.get_server(trs_server)
trs_tool_id = payload.get("trs_tool_id")
trs_version_id = payload.get("trs_version_id")

archive_data = server.get_version_descriptor(trs_tool_id, trs_version_id)
workflow = self.workflow_contents_manager.get_or_create_workflow_from_trs(
trans,
trs_url=payload.get("trs_url"),
trs_id=payload.get("trs_tool_id"),
trs_version=payload.get("trs_version_id"),
trs_server=payload.get("trs_server"),
)
return self.__api_import_response(workflow)
else:
try:
archive_data = stream_url_to_str(
Expand Down Expand Up @@ -602,13 +587,14 @@ def __api_import_from_archive(self, trans: GalaxyWebTransaction, archive_data, s
workflow, missing_tool_tups = self._workflow_from_dict(
trans, raw_workflow_description, workflow_create_options, source=source
)
workflow_id = workflow.id
workflow = workflow.latest_workflow
return self.__api_import_response(workflow)

def __api_import_response(self, workflow: model.Workflow):
response = {
"message": f"Workflow '{workflow.name}' imported successfully.",
"status": "success",
"id": trans.security.encode_id(workflow_id),
"id": self.app.security.encode_id(workflow.id),
}
if workflow.has_errors:
response["message"] = "Imported, but some steps in this workflow have validation errors."
Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/workflow/trs_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def get_server(self, trs_server):
def server_from_url(self, trs_url):
return TrsServer(trs_url)

def get_trs_url(self, trs_base_url: str, tool_id: str, version_id: str):
return urllib.parse.urljoin(trs_base_url, f"{tool_id}/{version_id}")

def get_trs_id_and_version_from_trs_url(self, trs_url):
parts = self.match_url(trs_url, self.fetch_url_allowlist_ips)
if parts:
Expand Down

0 comments on commit c28b591

Please sign in to comment.