diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index a268c4ce73c4..4d84e834cd85 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -948,7 +948,7 @@ def _workflow_to_dict_run(self, trans, stored, workflow, history=None): inputs = step.module.get_runtime_inputs(connections=step.output_connections) step_model = {"inputs": [input.to_dict(trans) for input in inputs.values()]} step_model["when"] = step.when_expression - step_model["replacement_parameters"] = step.module.get_replacement_parameters(step) + step_model["replacement_parameters"] = step.module.get_informal_replacement_parameters(step) step_model["step_type"] = step.type step_model["step_label"] = step.label step_model["step_name"] = step.module.get_name() diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index c373d148da9f..02b6540e5da5 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -497,8 +497,11 @@ def recover_mapping(self, invocation_step, progress): progress.set_step_outputs(invocation_step, outputs, already_persisted=True) - def get_replacement_parameters(self, step): - """Return a list of replacement parameters.""" + def get_informal_replacement_parameters(self, step) -> List[str]: + """Return a list of informal replacement parameters. + + If replacement is handled via formal workflow inputs - do not include it in this list. + """ return [] @@ -832,14 +835,22 @@ def callback(input, prefixed_name, prefixed_label, value=None, **kwds): return inputs - def get_replacement_parameters(self, step): + def get_informal_replacement_parameters(self, step) -> List[str]: """Return a list of replacement parameters.""" replacement_parameters = set() + + formal_parameters = set() + for step in self.subworkflow.input_steps: + if step.label: + formal_parameters.add(step.label) + for subworkflow_step in self.subworkflow.steps: module = subworkflow_step.module assert module - for replacement_parameter in module.get_replacement_parameters(subworkflow_step): - replacement_parameters.add(replacement_parameter) + + for replacement_parameter in module.get_informal_replacement_parameters(subworkflow_step): + if replacement_parameter not in formal_parameters: + replacement_parameters.add(replacement_parameter) return list(replacement_parameters) @@ -2256,7 +2267,9 @@ def callback(input, prefixed_name, **kwargs): invocation_step=invocation_step, max_num_jobs=max_num_jobs, validate_outputs=validate_outputs, - job_callback=lambda job: self._handle_post_job_actions(step, job, progress.replacement_dict), + job_callback=lambda job: self._handle_post_job_actions( + step, job, progress.effective_replacement_dict() + ), completed_jobs=completed_jobs, workflow_resource_parameters=resource_parameters, ) @@ -2280,7 +2293,9 @@ def callback(input, prefixed_name, **kwargs): step_inputs = mapping_params.param_template step_inputs.update(collection_info.collections) - self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.replacement_dict) + self._handle_mapped_over_post_job_actions( + step, step_inputs, step_outputs, progress.effective_replacement_dict() + ) if execution_tracker.execution_errors: # TODO: formalize into InvocationFailure ? message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}" @@ -2342,7 +2357,7 @@ def __to_pja(self, key, value, step): action_arguments = None return PostJobAction(value["action_type"], step, output_name, action_arguments) - def get_replacement_parameters(self, step): + def get_informal_replacement_parameters(self, step) -> List[str]: """Return a list of replacement parameters.""" replacement_parameters = set() for pja in step.post_job_actions: diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index cf82b2a2c710..6ddc56427672 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -353,6 +353,7 @@ def __init__( self.copy_inputs_to_history = copy_inputs_to_history self.use_cached_job = use_cached_job self.replacement_dict = replacement_dict or {} + self.runtime_replacements: Dict[str, str] = {} self.subworkflow_collection_info = subworkflow_collection_info self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None self.when_values = when_values @@ -547,8 +548,19 @@ def set_outputs_for_input( elif step_id in self.inputs_by_step_id: outputs["output"] = self.inputs_by_step_id[step_id] + if step.label and step.type == "parameter_input" and "output" in outputs: + self.runtime_replacements[step.label] = str(outputs["output"]) self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted) + def effective_replacement_dict(self): + replacement_dict = {} + for key, value in self.replacement_dict.items(): + replacement_dict[key] = value + for key, value in self.runtime_replacements.items(): + if key not in replacement_dict: + replacement_dict[key] = value + return replacement_dict + def set_step_outputs( self, invocation_step: WorkflowInvocationStep, outputs: Dict[str, Any], already_persisted: bool = False ) -> None: diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 7cc9849dc947..a5e42bc562e4 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -2474,6 +2474,45 @@ def test_placements_from_text_inputs(self): details = self.dataset_populator.get_history_dataset_details(history_id) assert details["name"] == "moocow name 2", details["name"] + def test_placements_from_text_inputs_nested(self): + with self.dataset_populator.test_history() as history_id: + run_def = """ +class: GalaxyWorkflow +inputs: + replacemeouter: text +steps: + nested_workflow: + run: + class: GalaxyWorkflow + inputs: + replacemeinner: text + outputs: + workflow_output_1: + outputSource: create_2/out_file1 + workflow_output_2: + outputSource: create_2/out_file2 + steps: + create_2: + tool_id: create_2 + state: + sleep_time: 0 + outputs: + out_file1: + rename: "${replacemeinner} name" + out_file2: + rename: "${replacemeinner} name 2" + in: + replacemeinner: replacemeouter + +test_data: + replacemeouter: + value: moocow + type: raw +""" + self._run_jobs(run_def, history_id=history_id) + details = self.dataset_populator.get_history_dataset_details(history_id) + assert details["name"] == "moocow name 2", details["name"] + @skip_without_tool("random_lines1") def test_run_runtime_parameters_after_pause(self): with self.dataset_populator.test_history() as history_id: