Skip to content

Commit

Permalink
Fix mapping parent workflow formal inputs to subworkflow informal rep…
Browse files Browse the repository at this point in the history
…lacement parameters.
  • Loading branch information
jmchilton committed Aug 24, 2023
1 parent 3af41be commit 098c9a5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
26 changes: 19 additions & 7 deletions lib/galaxy/workflow/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,12 @@ def recover_mapping(self, invocation_step, progress):

progress.set_step_outputs(invocation_step, outputs, already_persisted=True)

def get_replacement_parameters(self, step):
"""Return a list of replacement parameters."""
def get_replacement_parameters(self, step) -> List[str]:
"""Return a list of informal replacement parameters.
If replacement is handled via formal workflow inputs - do not include it in this list.
TODO: if this works, rename this method to get_informal_replacement_parameters.
"""

return []

Expand Down Expand Up @@ -832,14 +836,22 @@ def callback(input, prefixed_name, prefixed_label, value=None, **kwds):

return inputs

def get_replacement_parameters(self, step):
def get_replacement_parameters(self, step) -> List[str]:
"""Return a list of replacement parameters."""
replacement_parameters = set()

formal_parameters = set()
for step in self.subworkflow.input_steps:
if step.label:
formal_parameters.add(step.label)

for subworkflow_step in self.subworkflow.steps:
module = subworkflow_step.module
assert module

for replacement_parameter in module.get_replacement_parameters(subworkflow_step):
replacement_parameters.add(replacement_parameter)
if replacement_parameter not in formal_parameters:
replacement_parameters.add(replacement_parameter)

return list(replacement_parameters)

Expand Down Expand Up @@ -2256,7 +2268,7 @@ def callback(input, prefixed_name, **kwargs):
invocation_step=invocation_step,
max_num_jobs=max_num_jobs,
validate_outputs=validate_outputs,
job_callback=lambda job: self._handle_post_job_actions(step, job, progress.replacement_dict),
job_callback=lambda job: self._handle_post_job_actions(step, job, progress.effective_replacement_dict()),
completed_jobs=completed_jobs,
workflow_resource_parameters=resource_parameters,
)
Expand All @@ -2280,7 +2292,7 @@ def callback(input, prefixed_name, **kwargs):
step_inputs = mapping_params.param_template
step_inputs.update(collection_info.collections)

self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.replacement_dict)
self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.effective_replacement_dict())
if execution_tracker.execution_errors:
# TODO: formalize into InvocationFailure ?
message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
Expand Down Expand Up @@ -2342,7 +2354,7 @@ def __to_pja(self, key, value, step):
action_arguments = None
return PostJobAction(value["action_type"], step, output_name, action_arguments)

def get_replacement_parameters(self, step):
def get_replacement_parameters(self, step) -> List[str]:
"""Return a list of replacement parameters."""
replacement_parameters = set()
for pja in step.post_job_actions:
Expand Down
12 changes: 12 additions & 0 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def __init__(
self.copy_inputs_to_history = copy_inputs_to_history
self.use_cached_job = use_cached_job
self.replacement_dict = replacement_dict or {}
self.runtime_replacements = {}
self.subworkflow_collection_info = subworkflow_collection_info
self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None
self.when_values = when_values
Expand Down Expand Up @@ -547,8 +548,19 @@ def set_outputs_for_input(
elif step_id in self.inputs_by_step_id:
outputs["output"] = self.inputs_by_step_id[step_id]

if step.label and step.type == "parameter_input" and "output" in outputs:
self.runtime_replacements[step.label] = str(outputs["output"])
self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)

def effective_replacement_dict(self):
replacement_dict = {}
for key, value in self.replacement_dict.items():
replacement_dict[key] = value
for key, value in self.runtime_replacements.items():
if key not in replacement_dict:
replacement_dict[key] = value
return replacement_dict

def set_step_outputs(
self, invocation_step: WorkflowInvocationStep, outputs: Dict[str, Any], already_persisted: bool = False
) -> None:
Expand Down

0 comments on commit 098c9a5

Please sign in to comment.