Skip to content

Commit

Permalink
Fix mapping parent workflow formal inputs to subworkflow informal rep…
Browse files Browse the repository at this point in the history
…lacement parameters.
  • Loading branch information
jmchilton committed Aug 24, 2023
1 parent 3af41be commit eea5fb1
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ def _workflow_to_dict_run(self, trans, stored, workflow, history=None):
inputs = step.module.get_runtime_inputs(connections=step.output_connections)
step_model = {"inputs": [input.to_dict(trans) for input in inputs.values()]}
step_model["when"] = step.when_expression
step_model["replacement_parameters"] = step.module.get_replacement_parameters(step)
step_model["replacement_parameters"] = step.module.get_informal_replacement_parameters(step)
step_model["step_type"] = step.type
step_model["step_label"] = step.label
step_model["step_name"] = step.module.get_name()
Expand Down
31 changes: 23 additions & 8 deletions lib/galaxy/workflow/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,11 @@ def recover_mapping(self, invocation_step, progress):

progress.set_step_outputs(invocation_step, outputs, already_persisted=True)

def get_replacement_parameters(self, step):
"""Return a list of replacement parameters."""
def get_informal_replacement_parameters(self, step) -> List[str]:
"""Return a list of informal replacement parameters.
If replacement is handled via formal workflow inputs - do not include it in this list.
"""

return []

Expand Down Expand Up @@ -832,14 +835,22 @@ def callback(input, prefixed_name, prefixed_label, value=None, **kwds):

return inputs

def get_replacement_parameters(self, step):
def get_informal_replacement_parameters(self, step) -> List[str]:
"""Return a list of replacement parameters."""
replacement_parameters = set()

formal_parameters = set()
for step in self.subworkflow.input_steps:
if step.label:
formal_parameters.add(step.label)

for subworkflow_step in self.subworkflow.steps:
module = subworkflow_step.module
assert module
for replacement_parameter in module.get_replacement_parameters(subworkflow_step):
replacement_parameters.add(replacement_parameter)

for replacement_parameter in module.get_informal_replacement_parameters(subworkflow_step):
if replacement_parameter not in formal_parameters:
replacement_parameters.add(replacement_parameter)

return list(replacement_parameters)

Expand Down Expand Up @@ -2256,7 +2267,9 @@ def callback(input, prefixed_name, **kwargs):
invocation_step=invocation_step,
max_num_jobs=max_num_jobs,
validate_outputs=validate_outputs,
job_callback=lambda job: self._handle_post_job_actions(step, job, progress.replacement_dict),
job_callback=lambda job: self._handle_post_job_actions(
step, job, progress.effective_replacement_dict()
),
completed_jobs=completed_jobs,
workflow_resource_parameters=resource_parameters,
)
Expand All @@ -2280,7 +2293,9 @@ def callback(input, prefixed_name, **kwargs):
step_inputs = mapping_params.param_template
step_inputs.update(collection_info.collections)

self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.replacement_dict)
self._handle_mapped_over_post_job_actions(
step, step_inputs, step_outputs, progress.effective_replacement_dict()
)
if execution_tracker.execution_errors:
# TODO: formalize into InvocationFailure ?
message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
Expand Down Expand Up @@ -2342,7 +2357,7 @@ def __to_pja(self, key, value, step):
action_arguments = None
return PostJobAction(value["action_type"], step, output_name, action_arguments)

def get_replacement_parameters(self, step):
def get_informal_replacement_parameters(self, step) -> List[str]:
"""Return a list of replacement parameters."""
replacement_parameters = set()
for pja in step.post_job_actions:
Expand Down
12 changes: 12 additions & 0 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def __init__(
self.copy_inputs_to_history = copy_inputs_to_history
self.use_cached_job = use_cached_job
self.replacement_dict = replacement_dict or {}
self.runtime_replacements: Dict[str, str] = {}
self.subworkflow_collection_info = subworkflow_collection_info
self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None
self.when_values = when_values
Expand Down Expand Up @@ -547,8 +548,19 @@ def set_outputs_for_input(
elif step_id in self.inputs_by_step_id:
outputs["output"] = self.inputs_by_step_id[step_id]

if step.label and step.type == "parameter_input" and "output" in outputs:
self.runtime_replacements[step.label] = str(outputs["output"])
self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)

def effective_replacement_dict(self):
replacement_dict = {}
for key, value in self.replacement_dict.items():
replacement_dict[key] = value
for key, value in self.runtime_replacements.items():
if key not in replacement_dict:
replacement_dict[key] = value
return replacement_dict

def set_step_outputs(
self, invocation_step: WorkflowInvocationStep, outputs: Dict[str, Any], already_persisted: bool = False
) -> None:
Expand Down
39 changes: 39 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,45 @@ def test_placements_from_text_inputs(self):
details = self.dataset_populator.get_history_dataset_details(history_id)
assert details["name"] == "moocow name 2", details["name"]

def test_placements_from_text_inputs_nested(self):
with self.dataset_populator.test_history() as history_id:
run_def = """
class: GalaxyWorkflow
inputs:
replacemeouter: text
steps:
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
replacemeinner: text
outputs:
workflow_output_1:
outputSource: create_2/out_file1
workflow_output_2:
outputSource: create_2/out_file2
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "${replacemeinner} name"
out_file2:
rename: "${replacemeinner} name 2"
in:
replacemeinner: replacemeouter
test_data:
replacemeouter:
value: moocow
type: raw
"""
self._run_jobs(run_def, history_id=history_id)
details = self.dataset_populator.get_history_dataset_details(history_id)
assert details["name"] == "moocow name 2", details["name"]

@skip_without_tool("random_lines1")
def test_run_runtime_parameters_after_pause(self):
with self.dataset_populator.test_history() as history_id:
Expand Down

0 comments on commit eea5fb1

Please sign in to comment.