From 0f17d916a3a64ada947b5c33e73d671db458c8e0 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Thu, 24 Aug 2023 13:44:11 -0400 Subject: [PATCH] Fix mapping parent workflow formal inputs to subworkflow informal replacement parameters. --- lib/galaxy/managers/workflows.py | 2 +- lib/galaxy/workflow/modules.py | 27 +++++++++++++++++++-------- lib/galaxy/workflow/run.py | 12 ++++++++++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index a268c4ce73c4..4d84e834cd85 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -948,7 +948,7 @@ def _workflow_to_dict_run(self, trans, stored, workflow, history=None): inputs = step.module.get_runtime_inputs(connections=step.output_connections) step_model = {"inputs": [input.to_dict(trans) for input in inputs.values()]} step_model["when"] = step.when_expression - step_model["replacement_parameters"] = step.module.get_replacement_parameters(step) + step_model["replacement_parameters"] = step.module.get_informal_replacement_parameters(step) step_model["step_type"] = step.type step_model["step_label"] = step.label step_model["step_name"] = step.module.get_name() diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index c373d148da9f..cd72d2af9cb2 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -497,8 +497,11 @@ def recover_mapping(self, invocation_step, progress): progress.set_step_outputs(invocation_step, outputs, already_persisted=True) - def get_replacement_parameters(self, step): - """Return a list of replacement parameters.""" + def get_informal_replacement_parameters(self, step) -> List[str]: + """Return a list of informal replacement parameters. + + If replacement is handled via formal workflow inputs - do not include it in this list. + """ return [] @@ -832,14 +835,22 @@ def callback(input, prefixed_name, prefixed_label, value=None, **kwds): return inputs - def get_replacement_parameters(self, step): + def get_informal_replacement_parameters(self, step) -> List[str]: """Return a list of replacement parameters.""" replacement_parameters = set() + + formal_parameters = set() + for step in self.subworkflow.input_steps: + if step.label: + formal_parameters.add(step.label) + for subworkflow_step in self.subworkflow.steps: module = subworkflow_step.module assert module - for replacement_parameter in module.get_replacement_parameters(subworkflow_step): - replacement_parameters.add(replacement_parameter) + + for replacement_parameter in module.get_informal_replacement_parameters(subworkflow_step): + if replacement_parameter not in formal_parameters: + replacement_parameters.add(replacement_parameter) return list(replacement_parameters) @@ -2256,7 +2267,7 @@ def callback(input, prefixed_name, **kwargs): invocation_step=invocation_step, max_num_jobs=max_num_jobs, validate_outputs=validate_outputs, - job_callback=lambda job: self._handle_post_job_actions(step, job, progress.replacement_dict), + job_callback=lambda job: self._handle_post_job_actions(step, job, progress.effective_replacement_dict()), completed_jobs=completed_jobs, workflow_resource_parameters=resource_parameters, ) @@ -2280,7 +2291,7 @@ def callback(input, prefixed_name, **kwargs): step_inputs = mapping_params.param_template step_inputs.update(collection_info.collections) - self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.replacement_dict) + self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.effective_replacement_dict()) if execution_tracker.execution_errors: # TODO: formalize into InvocationFailure ? message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}" @@ -2342,7 +2353,7 @@ def __to_pja(self, key, value, step): action_arguments = None return PostJobAction(value["action_type"], step, output_name, action_arguments) - def get_replacement_parameters(self, step): + def get_informal_replacement_parameters(self, step) -> List[str]: """Return a list of replacement parameters.""" replacement_parameters = set() for pja in step.post_job_actions: diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index cf82b2a2c710..6ddc56427672 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -353,6 +353,7 @@ def __init__( self.copy_inputs_to_history = copy_inputs_to_history self.use_cached_job = use_cached_job self.replacement_dict = replacement_dict or {} + self.runtime_replacements: Dict[str, str] = {} self.subworkflow_collection_info = subworkflow_collection_info self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None self.when_values = when_values @@ -547,8 +548,19 @@ def set_outputs_for_input( elif step_id in self.inputs_by_step_id: outputs["output"] = self.inputs_by_step_id[step_id] + if step.label and step.type == "parameter_input" and "output" in outputs: + self.runtime_replacements[step.label] = str(outputs["output"]) self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted) + def effective_replacement_dict(self): + replacement_dict = {} + for key, value in self.replacement_dict.items(): + replacement_dict[key] = value + for key, value in self.runtime_replacements.items(): + if key not in replacement_dict: + replacement_dict[key] = value + return replacement_dict + def set_step_outputs( self, invocation_step: WorkflowInvocationStep, outputs: Dict[str, Any], already_persisted: bool = False ) -> None: