Skip to content

Commit

Permalink
Allow job files to consume TUS uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 15, 2024
1 parent fb5199b commit 460052a
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/galaxy/dependencies/pinned-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ promise==2.3 ; python_version >= "3.8" and python_version < "3.12"
prompt-toolkit==3.0.43 ; python_version >= "3.8" and python_version < "3.12"
prov==1.5.1 ; python_version >= "3.8" and python_version < "3.12"
psutil==5.9.7 ; python_version >= "3.8" and python_version < "3.12"
pulsar-galaxy-lib==0.15.5 ; python_version >= "3.8" and python_version < "3.12"
pulsar-galaxy-lib==0.15.6 ; python_version >= "3.8" and python_version < "3.12"
pyasn1==0.5.1 ; python_version >= "3.8" and python_version < "3.12"
pycparser==2.21 ; python_version >= "3.8" and python_version < "3.12"
pycryptodome==3.19.1 ; python_version >= "3.8" and python_version < "3.12"
Expand Down
51 changes: 51 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import logging
import os
import re
import shutil

from galaxy import (
Expand Down Expand Up @@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
upload_store
), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
input_file = open(file_path)
elif "session_id" in payload:
# code stolen from basic.py
session_id = payload["session_id"]
upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
if re.match(r"^[\w-]+$", session_id) is None:
raise ValueError("Invalid session id format.")
local_filename = os.path.abspath(os.path.join(upload_store, session_id))
input_file = open(local_filename)
else:
input_file = payload.get("file", payload.get("__file", None)).file
target_dir = os.path.dirname(path)
Expand All @@ -114,6 +123,48 @@ def create(self, trans, job_id, payload, **kwargs):
pass
return {"message": "ok"}

@expose_api_raw_anonymous_and_sessionless
def tus_patch(self, trans, **kwds):
"""
Exposed as PATCH /api/job_files/resumable_upload.
I think based on the docs, a separate tusd server is needed for job files if
also hosting one for use facing uploads.
Setting up tusd for job files should just look like (I think):
tusd -host localhost -port 1080 -upload-dir=<galaxy_root>/database/tmp
Shouldn't need the API key and session stuff the upload tusd server uses.
I would love to check the job state with __authorize_job_access on the first
POST but it seems like TusMiddleware doesn't default to coming in here for that
initial POST the way it does for the subsequent PATCHes. Ultimately, the upload
is still authorized before the write done with POST /api/jobs/<job_id>/files
so I think there is no route here to mess with user data - the worst of the security
issues that can be caused is filling up the sever with needless files that aren't
acted on. Since this endpoint is not meant for public consumption - all the job
files stuff and the TUS server should be blocked to public IPs anyway and restricted
to your Pulsar servers and similar targeting could be accomplished with a user account
and the user facing upload endpoints.
"""
return None

@expose_api_raw_anonymous_and_sessionless
def tus_hook(self, trans, **kwds):
"""No-op but if hook specified the way we do for user upload it would hit this action."""
pass

@expose_api_raw_anonymous_and_sessionless
def tus_post(self, trans, **kwds):
"""The middleware prevents reaching this endpoint.
In here mirroring the routes if not the actions setup for the
user upload endpoint. Can we just drop this from the buildapp
route definition for both endpoints?
"""
pass

def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
for key in ["path", "job_key"]:
if key not in kwargs:
Expand Down
23 changes: 23 additions & 0 deletions lib/galaxy/webapps/galaxy/buildapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ def populate_api_routes(webapp, app):
)
webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks")
webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"]))

webapp.mapper.connect(
"/api/job_files/resumable_upload/{session_id}",
controller="job_files",
action="tus_patch",
conditions=dict(method=["PATCH"]),
)
webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_post")
webapp.mapper.connect(
"/api/job_files/hooks", controller="job_files", action="tus_hook", conditions=dict(method=["POST"])
)

webapp.mapper.resource(
"revision",
"revisions",
Expand Down Expand Up @@ -1103,6 +1115,17 @@ def wrap_in_middleware(app, global_conf, application_stack, **local_conf):
"max_size": application_stack.config.maximum_upload_file_size,
},
)
# TUS upload middleware for job files....
app = wrap_if_allowed(
app,
stack,
TusMiddleware,
kwargs={
"upload_path": urljoin(f"{application_stack.config.galaxy_url_prefix}/", "api/job_files/resumable_upload"),
"tmp_dir": application_stack.config.tus_upload_store or application_stack.config.new_file_path,
"max_size": application_stack.config.maximum_upload_file_size,
},
)
# X-Forwarded-Host handling
app = wrap_if_allowed(app, stack, XForwardedHostMiddleware)
# Request ID middleware
Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io
import os
import tempfile
from typing import Dict

import requests
from sqlalchemy import select
from tusclient import client

from galaxy import model
from galaxy.model.base import (
Expand All @@ -35,6 +37,7 @@

TEST_INPUT_TEXT = "test input content\n"
TEST_FILE_IO = io.StringIO("some initial text data")
TEST_TUS_CHUNK_SIZE = 1024


class TestJobFilesIntegration(integration_util.IntegrationTestCase):
Expand Down Expand Up @@ -110,6 +113,38 @@ def files():
response = requests.post(post_url, data=data, files=files())
_assert_insufficient_permissions(response)

def test_write_with_tus(self):
# shared setup with above test
job, output_hda, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
path = self._app.object_store.get_filename(output_hda.dataset)
assert path

upload_url = self._api_url(f"job_files/resumable_upload?job_key={job_key}", use_key=False)
headers: Dict[str, str] = {}
my_client = client.TusClient(upload_url, headers=headers)

storage = None
metadata: Dict[str, str] = {}
t_file = tempfile.NamedTemporaryFile("w")
t_file.write("some initial text data")
t_file.flush()

input_path = t_file.name

uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
uploader.chunk_size = TEST_TUS_CHUNK_SIZE
uploader.upload()
upload_session_url = uploader.url
assert upload_session_url
tus_session_id = upload_session_url.rsplit("/", 1)[1]

data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
response = requests.post(post_url, data=data)
api_asserts.assert_status_code_is_ok(response)
assert open(path).read() == "some initial text data"

def test_write_protection(self):
job, _, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
Expand Down
53 changes: 53 additions & 0 deletions test/integration/test_job_files_tus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Integration tests for the Pulsar embedded runner with outputs written back to Galaxy via TUS."""

import os
import tempfile

from galaxy.util import safe_makedirs
from galaxy_test.driver import integration_util

JOB_CONF_TEMPLATE = """
runners:
local:
load: galaxy.jobs.runners.local:LocalJobRunner
pulsar_embed:
load: galaxy.jobs.runners.pulsar:PulsarEmbeddedJobRunner
pulsar_app_config:
tool_dependency_dir: none
conda_auto_init: false
conda_auto_install: false
execution:
default: pulsar_embed
environments:
local:
runner: local
pulsar_embed:
runner: pulsar_embed
default_file_action: remote_transfer_tus
tools:
- class: local
environment: local
"""


class EmbeddedPulsarTargetingTusIntegrationInstance(integration_util.IntegrationInstance):
"""Describe a Galaxy test instance with embedded pulsar configured to target job files tus."""

framework_tool_and_types = True

@classmethod
def handle_galaxy_config_kwds(cls, config):
jobs_directory = os.path.join(cls._test_driver.mkdtemp(), "pulsar_staging")
safe_makedirs(jobs_directory)
with tempfile.NamedTemporaryFile(suffix="_tus_job_conf.yml", mode="w", delete=False) as job_conf:
job_conf.write(JOB_CONF_TEMPLATE)
config["job_config_file"] = job_conf.name
infrastructure_url = "http://localhost:$GALAXY_WEB_PORT"
config["galaxy_infrastructure_url"] = infrastructure_url


instance = integration_util.integration_module_instance(EmbeddedPulsarTargetingTusIntegrationInstance)

test_tools = integration_util.integration_tool_runner(["simple_constructs", "composite_output_tests"])

0 comments on commit 460052a

Please sign in to comment.