Merge branch 'release_23.1' into dev
mvdbeek committed Aug 26, 2023
2 parents a709456 + 8d5feb4 commit b9ae7a1
Showing 13 changed files with 141 additions and 20 deletions.
2 changes: 1 addition & 1 deletion lib/galaxy/dependencies/conditional-requirements.txt
@@ -13,7 +13,7 @@ ldap3==2.9.1
 python-pam
 galaxycloudrunner
 pkce
-total-perspective-vortex>=2.2.4,<3
+total-perspective-vortex>=2.3.2,<3
 
 # For file sources plugins
 fs.webdavfs>=0.4.2 # type: webdav
7 changes: 4 additions & 3 deletions lib/galaxy/job_execution/output_collect.py
@@ -732,18 +732,19 @@ def collect_extra_files(object_store, dataset, job_working_directory):
         # Fall back to working dir, remove in 23.2
         output_location = "working"
     temp_file_path = os.path.join(job_working_directory, output_location, file_name)
-    extra_dir = None
+    if not os.path.exists(temp_file_path):
+        # no outputs to working directory, but may still need to push from cache to backend
+        temp_file_path = dataset.extra_files_path
     try:
         # This skips creation of directories - object store
         # automatically creates them. However, empty directories will
         # not be created in the object store at all, which might be a
         # problem.
         for root, _dirs, files in os.walk(temp_file_path):
-            extra_dir = root.replace(os.path.join(job_working_directory, output_location), "", 1).lstrip(os.path.sep)
             for f in files:
                 object_store.update_from_file(
                     dataset.dataset,
-                    extra_dir=extra_dir,
+                    extra_dir=os.path.normpath(os.path.join(file_name, os.path.relpath(root, temp_file_path))),
                     alt_name=f,
                     file_name=os.path.join(root, f),
                     create=True,
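The rewritten extra_dir derives the object-store subdirectory from the dataset's own extra-files directory name (file_name) plus the walked path relative to temp_file_path, so it works whether the files sit in the job working directory or in the object-store cache. A standalone sketch of the path arithmetic (paths hypothetical, not part of the diff):

import os

file_name = "dataset_123_files"                  # hypothetical extra-files dir name
temp_file_path = "/cache/000/dataset_123_files"  # working dir or object-store cache
root = "/cache/000/dataset_123_files/sub/dir"    # a directory yielded by os.walk

extra_dir = os.path.normpath(os.path.join(file_name, os.path.relpath(root, temp_file_path)))
assert extra_dir == "dataset_123_files/sub/dir"

# At the top level, relpath() returns "." and normpath() collapses it away:
assert os.path.normpath(os.path.join(file_name, ".")) == "dataset_123_files"

The old root.replace(...) form assumed the walk was rooted in the job working directory, which no longer holds once temp_file_path falls back to dataset.extra_files_path.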
@@ -39,6 +39,24 @@ GALAXY_LIB="$galaxy_lib"
 _galaxy_setup_environment "$PRESERVE_GALAXY_ENVIRONMENT"
 export _GALAXY_JOB_HOME_DIR
 export _GALAXY_JOB_TMP_DIR
+
+TEMP="${TEMP:-$TMP}"
+TMPDIR="${TMPDIR:-$TMP}"
+
+TMP="${TMP:-$TEMP}"
+TMPDIR="${TMPDIR:-$TEMP}"
+
+TMP="${TMP:-$TMPDIR}"
+TEMP="${TEMP:-$TMPDIR}"
+
+TMP="${TMP:-$_GALAXY_JOB_TMP_DIR}"
+TEMP="${TEMP:-$_GALAXY_JOB_TMP_DIR}"
+TMPDIR="${TMPDIR:-$_GALAXY_JOB_TMP_DIR}"
+
+export TMP
+export TEMP
+export TMPDIR
+
 GALAXY_PYTHON=`command -v python`
 cd $working_directory
 $memory_statement
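The new block cross-seeds TMP, TEMP, and TMPDIR so tools that consult any one of them agree on a location: each unset (or empty) variable is filled from the others in a fixed preference order, and whatever is still empty falls back to the job's private _GALAXY_JOB_TMP_DIR. A standalone Python model of the cascade's net effect (not part of the diff):

def resolve_tmp_vars(env, galaxy_job_tmp_dir):
    # Mirrors the shell ":-" cascade: already-set values win; unset or empty
    # values are filled from the first non-empty alternative, else the job tmp dir.
    def first(*names):
        for name in names:
            if env.get(name):
                return env[name]
        return galaxy_job_tmp_dir

    return {
        "TMP": env.get("TMP") or first("TEMP", "TMPDIR"),
        "TEMP": env.get("TEMP") or first("TMP", "TMPDIR"),
        "TMPDIR": env.get("TMPDIR") or first("TMP", "TEMP"),
    }

assert resolve_tmp_vars({}, "/jobs/42/tmp") == {"TMP": "/jobs/42/tmp", "TEMP": "/jobs/42/tmp", "TMPDIR": "/jobs/42/tmp"}
assert resolve_tmp_vars({"TMPDIR": "/scratch"}, "/jobs/42/tmp")["TMP"] == "/scratch"

Because ${VAR:-fallback} treats an empty string like unset, an empty intermediate assignment in the shell version does not block a later fallback.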
2 changes: 1 addition & 1 deletion lib/galaxy/managers/workflows.py
@@ -948,7 +948,7 @@ def _workflow_to_dict_run(self, trans, stored, workflow, history=None):
             inputs = step.module.get_runtime_inputs(connections=step.output_connections)
             step_model = {"inputs": [input.to_dict(trans) for input in inputs.values()]}
             step_model["when"] = step.when_expression
-            step_model["replacement_parameters"] = step.module.get_replacement_parameters(step)
+            step_model["replacement_parameters"] = step.module.get_informal_replacement_parameters(step)
             step_model["step_type"] = step.type
             step_model["step_label"] = step.label
             step_model["step_name"] = step.module.get_name()
2 changes: 1 addition & 1 deletion lib/galaxy/metadata/set_metadata.py
@@ -527,7 +527,7 @@ def write_job_metadata(tool_job_working_directory, job_metadata, set_meta, tool_
         new_dataset = Dataset(id=-i, external_filename=new_dataset_filename)
         extra_files = file_dict.get("extra_files", None)
         if extra_files is not None:
-            new_dataset._extra_files_path = os.path.join(tool_job_working_directory, "working", extra_files)
+            new_dataset._extra_files_path = os.path.join(tool_job_working_directory, "outputs", extra_files)
         new_dataset.state = new_dataset.states.OK
         new_dataset_instance = HistoryDatasetAssociation(
             id=-i, dataset=new_dataset, extension=file_dict.get("ext", "data")
18 changes: 17 additions & 1 deletion lib/galaxy/objectstore/s3.py
@@ -49,6 +49,21 @@
 logging.getLogger("boto").setLevel(logging.INFO)  # Otherwise boto is quite noisy
 
 
+def download_directory(bucket, remote_folder, local_path):
+    # List objects in the specified S3 folder
+    objects = bucket.list(prefix=remote_folder)
+
+    for obj in objects:
+        remote_file_path = obj.key
+        local_file_path = os.path.join(local_path, os.path.relpath(remote_file_path, remote_folder))
+
+        # Create directories if they don't exist
+        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
+
+        # Download the file
+        obj.get_contents_to_filename(local_file_path)
+
+
 def parse_config_xml(config_xml):
     try:
         a_xml = config_xml.findall("auth")[0]
@@ -652,7 +667,8 @@ def _get_filename(self, obj, **kwargs):
             return cache_path
         # Check if the file exists in persistent storage and, if it does, pull it into cache
         elif self._exists(obj, **kwargs):
-            if dir_only:  # Directories do not get pulled into cache
+            if dir_only:
+                download_directory(self._bucket, rel_path, cache_path)
                 return cache_path
             else:
                 if self._pull_into_cache(rel_path):
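download_directory fills the gap noted in the removed comment: dir_only lookups used to return a cache path without materializing the directory's contents. It uses the legacy boto API this module already depends on (Bucket.list and Key.get_contents_to_filename). A hedged usage sketch (connection and names hypothetical; inside the store the bucket is self._bucket and the paths come from the dataset being resolved):

import boto

conn = boto.connect_s3()  # legacy boto, as used by lib/galaxy/objectstore/s3.py
bucket = conn.get_bucket("galaxy-objectstore")  # hypothetical bucket name
download_directory(
    bucket,
    "0/7/dataset_7_files/",                                 # rel_path of the extra-files dir
    "/srv/galaxy/object-store-cache/0/7/dataset_7_files",   # cache_path to populate
)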
6 changes: 5 additions & 1 deletion lib/galaxy/tool_util/data/__init__.py
@@ -1174,8 +1174,10 @@ def write_bundle(
         out_data: Dict[str, OutputDataset],
         bundle_description: DataTableBundleProcessorDescription,
         repo_info: Optional[RepoInfo],
-    ) -> None:
+    ) -> Dict[str, OutputDataset]:
+        """Write the bundle and return its index path(s) mapped to the bundle dataset(s)."""
         data_manager_dict = _data_manager_dict(out_data, ensure_single_output=True)
+        bundle_datasets: Dict[str, OutputDataset] = {}
         for output_name, dataset in out_data.items():
             if dataset.ext != "data_manager_json":
                 continue
@@ -1190,6 +1192,8 @@ def write_bundle(
             bundle_path = os.path.join(extra_files_path, BUNDLE_INDEX_FILE_NAME)
             with open(bundle_path, "w") as fw:
                 json.dump(bundle.dict(), fw)
+            bundle_datasets[bundle_path] = dataset
+        return bundle_datasets
 
 
 SUPPORTED_DATA_TABLE_TYPES = TabularToolDataTable
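write_bundle now reports what it wrote so the caller can push each bundle into the object store (see the lib/galaxy/tools/__init__.py hunk below). The returned dict is keyed by the on-disk path of each bundle index file, with the producing dataset as the value; an illustrative shape only (path layout and stand-in value hypothetical):

bundle_datasets = {
    # os.path.join(extra_files_path, BUNDLE_INDEX_FILE_NAME) -> the data_manager_json OutputDataset
    "/jobs/000/42/outputs/dataset_42_files/<BUNDLE_INDEX_FILE_NAME>": "<OutputDataset>",
}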
12 changes: 11 additions & 1 deletion lib/galaxy/tools/__init__.py
@@ -3090,7 +3090,17 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, fina
         elif data_manager_mode == "dry_run":
             pass
         elif data_manager_mode == "bundle":
-            data_manager.write_bundle(out_data)
+            for bundle_path, dataset in data_manager.write_bundle(out_data).items():
+                dataset = cast(model.HistoryDatasetAssociation, dataset)
+                dataset.dataset.object_store.update_from_file(
+                    dataset.dataset,
+                    extra_dir=dataset.dataset.extra_files_path_name,
+                    file_name=bundle_path,
+                    alt_name=os.path.basename(bundle_path),
+                    create=True,
+                    preserve_symlinks=True,
+                )
+
         else:
             raise Exception("Unknown data manager mode encountered type...")
4 changes: 2 additions & 2 deletions lib/galaxy/tools/data_manager/manager.py
@@ -236,9 +236,9 @@ def process_result(self, out_data: Dict[str, OutputDataset]) -> None:
     def write_bundle(
         self,
         out_data: Dict[str, OutputDataset],
-    ) -> None:
+    ):
         tool_data_tables = self.data_managers.app.tool_data_tables
-        tool_data_tables.write_bundle(
+        return tool_data_tables.write_bundle(
             out_data,
             self.processor_description,
             self.repo_info,
31 changes: 23 additions & 8 deletions lib/galaxy/workflow/modules.py
@@ -497,8 +497,11 @@ def recover_mapping(self, invocation_step, progress):
 
         progress.set_step_outputs(invocation_step, outputs, already_persisted=True)
 
-    def get_replacement_parameters(self, step):
-        """Return a list of replacement parameters."""
+    def get_informal_replacement_parameters(self, step) -> List[str]:
+        """Return a list of informal replacement parameters.
+
+        If a replacement is handled via formal workflow inputs, it is not included in this list.
+        """
+
         return []
 
@@ -832,14 +835,22 @@ def callback(input, prefixed_name, prefixed_label, value=None, **kwds):
 
         return inputs
 
-    def get_replacement_parameters(self, step):
+    def get_informal_replacement_parameters(self, step) -> List[str]:
         """Return a list of replacement parameters."""
         replacement_parameters = set()
 
+        formal_parameters = set()
+        for step in self.subworkflow.input_steps:
+            if step.label:
+                formal_parameters.add(step.label)
+
         for subworkflow_step in self.subworkflow.steps:
             module = subworkflow_step.module
             assert module
-            for replacement_parameter in module.get_replacement_parameters(subworkflow_step):
-                replacement_parameters.add(replacement_parameter)
+            for replacement_parameter in module.get_informal_replacement_parameters(subworkflow_step):
+                if replacement_parameter not in formal_parameters:
+                    replacement_parameters.add(replacement_parameter)
 
         return list(replacement_parameters)
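The subworkflow module now treats labeled input steps as "formal" parameters and filters them out of the informal set, so a ${label} that is satisfied by a wired-up workflow input no longer surfaces as a free-text runtime replacement. A standalone sketch of the filtering (names hypothetical):

informal_from_pjas = {"replacemeinner", "run_id"}  # collected from post-job actions
formal_input_labels = {"replacemeinner"}           # labeled subworkflow input steps

surfaced = sorted(informal_from_pjas - formal_input_labels)
assert surfaced == ["run_id"]  # only truly informal parameters are reported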

@@ -2255,7 +2266,9 @@ def callback(input, prefixed_name, **kwargs):
             invocation_step=invocation_step,
             max_num_jobs=max_num_jobs,
             validate_outputs=validate_outputs,
-            job_callback=lambda job: self._handle_post_job_actions(step, job, progress.replacement_dict),
+            job_callback=lambda job: self._handle_post_job_actions(
+                step, job, progress.effective_replacement_dict()
+            ),
             completed_jobs=completed_jobs,
             workflow_resource_parameters=resource_parameters,
         )
@@ -2279,7 +2292,9 @@ def callback(input, prefixed_name, **kwargs):
         step_inputs = mapping_params.param_template
         step_inputs.update(collection_info.collections)
 
-        self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.replacement_dict)
+        self._handle_mapped_over_post_job_actions(
+            step, step_inputs, step_outputs, progress.effective_replacement_dict()
+        )
         if execution_tracker.execution_errors:
             # TODO: formalize into InvocationFailure ?
             message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
@@ -2341,7 +2356,7 @@ def __to_pja(self, key, value, step):
             action_arguments = None
         return PostJobAction(value["action_type"], step, output_name, action_arguments)
 
-    def get_replacement_parameters(self, step):
+    def get_informal_replacement_parameters(self, step) -> List[str]:
         """Return a list of replacement parameters."""
         replacement_parameters = set()
         for pja in step.post_job_actions:
12 changes: 12 additions & 0 deletions lib/galaxy/workflow/run.py
@@ -353,6 +353,7 @@ def __init__(
         self.copy_inputs_to_history = copy_inputs_to_history
         self.use_cached_job = use_cached_job
         self.replacement_dict = replacement_dict or {}
+        self.runtime_replacements: Dict[str, str] = {}
         self.subworkflow_collection_info = subworkflow_collection_info
         self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None
         self.when_values = when_values
@@ -547,8 +548,19 @@ def set_outputs_for_input(
         elif step_id in self.inputs_by_step_id:
             outputs["output"] = self.inputs_by_step_id[step_id]
 
+        if step.label and step.type == "parameter_input" and "output" in outputs:
+            self.runtime_replacements[step.label] = str(outputs["output"])
         self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)
 
+    def effective_replacement_dict(self):
+        replacement_dict = {}
+        for key, value in self.replacement_dict.items():
+            replacement_dict[key] = value
+        for key, value in self.runtime_replacements.items():
+            if key not in replacement_dict:
+                replacement_dict[key] = value
+        return replacement_dict
+
     def set_step_outputs(
         self, invocation_step: WorkflowInvocationStep, outputs: Dict[str, Any], already_persisted: bool = False
     ) -> None:
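effective_replacement_dict merges the two sources with a fixed precedence: values supplied explicitly at invocation time always win, while runtime_replacements (captured from labeled parameter_input steps as their outputs are set) only fill keys the caller did not supply. A standalone sketch of the merge semantics (values hypothetical):

replacement_dict = {"suffix": "from-request"}                   # supplied with the invocation
runtime_replacements = {"suffix": "from-step", "sample": "s1"}  # captured from parameter_input steps

effective = {**runtime_replacements, **replacement_dict}        # explicit values take precedence
assert effective == {"suffix": "from-request", "sample": "s1"}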
39 changes: 39 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
@@ -2474,6 +2474,45 @@ def test_placements_from_text_inputs(self):
             details = self.dataset_populator.get_history_dataset_details(history_id)
             assert details["name"] == "moocow name 2", details["name"]
 
+    def test_placements_from_text_inputs_nested(self):
+        with self.dataset_populator.test_history() as history_id:
+            run_def = """
+class: GalaxyWorkflow
+inputs:
+  replacemeouter: text
+steps:
+  nested_workflow:
+    run:
+      class: GalaxyWorkflow
+      inputs:
+        replacemeinner: text
+      outputs:
+        workflow_output_1:
+          outputSource: create_2/out_file1
+        workflow_output_2:
+          outputSource: create_2/out_file2
+      steps:
+        create_2:
+          tool_id: create_2
+          state:
+            sleep_time: 0
+          outputs:
+            out_file1:
+              rename: "${replacemeinner} name"
+            out_file2:
+              rename: "${replacemeinner} name 2"
+    in:
+      replacemeinner: replacemeouter
+
+test_data:
+  replacemeouter:
+    value: moocow
+    type: raw
+"""
+            self._run_jobs(run_def, history_id=history_id)
+            details = self.dataset_populator.get_history_dataset_details(history_id)
+            assert details["name"] == "moocow name 2", details["name"]
+
     @skip_without_tool("random_lines1")
     def test_run_runtime_parameters_after_pause(self):
         with self.dataset_populator.test_history() as history_id:
8 changes: 7 additions & 1 deletion test/integration/test_tool_data_bundles.py
@@ -1,10 +1,12 @@
 import os
+import shutil
 
 from galaxy.util.compression_utils import decompress_bytes_to_directory
+from .objectstore._base import BaseSwiftObjectStoreIntegrationTestCase
 from .test_tool_data_delete import DataManagerIntegrationTestCase
 
 
-class TestDataBundlesIntegration(DataManagerIntegrationTestCase):
+class TestDataBundlesIntegration(BaseSwiftObjectStoreIntegrationTestCase, DataManagerIntegrationTestCase):
     def test_admin_build_data_bundle_by_uri(self):
         original_count = self._testbeta_field_count()
 
@@ -24,10 +26,14 @@ def test_admin_build_data_bundle_by_uri(self):
         post_job_count = self._testbeta_field_count()
         assert original_count == post_job_count
 
+        shutil.rmtree(self.object_store_cache_path)
+        os.makedirs(self.object_store_cache_path)
+
         content = self.dataset_populator.get_history_dataset_content(
             history_id, to_ext="data_manager_json", type="bytes"
         )
         temp_directory = decompress_bytes_to_directory(content)
+        assert os.path.exists(os.path.join(temp_directory, "newvalue.txt"))
         uri = f"file://{os.path.normpath(temp_directory)}"
         data = {
             "source": {
Expand Down
