diff --git a/looper/const.py b/looper/const.py index e25f911f..017aceb4 100644 --- a/looper/const.py +++ b/looper/const.py @@ -95,6 +95,7 @@ "LOOPER_GENERIC_OUTPUT_SCHEMA", "LOOPER_GENERIC_COUNT_LINES", "CLI_MODS_KEY", + "PipelineLevel", ] FLAGS = ["completed", "running", "failed", "waiting", "partial"] diff --git a/looper/utils.py b/looper/utils.py index 9cc8eeaa..6e052f96 100644 --- a/looper/utils.py +++ b/looper/utils.py @@ -17,6 +17,7 @@ from ubiquerg import convert_value, expandpath, parse_registry_path, deep_update from pephubclient.constants import RegistryPath from pydantic import ValidationError +from yacman import load_yaml from .const import * from .command_models.commands import SUPPORTED_COMMANDS @@ -534,6 +535,33 @@ def initiate_looper_config( return True +def determine_pipeline_type(piface_path: str, looper_config_path: str): + """ + Read pipeline interface from disk and determine if pipeline type is sample or project-level + + + :param str piface_path: path to pipeline_interface + :param str looper_config_path: path to looper config file + :return Tuple[Union[str,None],Union[str,None]] : (pipeline type, resolved path) or (None, None) + """ + + if piface_path is None: + return None, None + piface_path = expandpath(piface_path) + if not os.path.isabs(piface_path): + piface_path = os.path.realpath( + os.path.join(os.path.dirname(looper_config_path), piface_path) + ) + try: + piface_dict = load_yaml(piface_path) + except FileNotFoundError: + return None, None + + pipeline_type = piface_dict.get("pipeline_type", None) + + return pipeline_type, piface_path + + def read_looper_config_file(looper_config_path: str) -> dict: """ Read Looper config file which includes: @@ -581,11 +609,39 @@ def read_looper_config_file(looper_config_path: str) -> dict: return_dict[CLI_MODS_KEY] = dp_data[CLI_MODS_KEY] if PIPELINE_INTERFACES_KEY in dp_data: + dp_data.setdefault(PIPELINE_INTERFACES_KEY, {}) - return_dict[SAMPLE_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get("sample") - return_dict[PROJECT_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get( - "project" - ) + + if isinstance(dp_data.get(PIPELINE_INTERFACES_KEY), dict) and ( + dp_data.get(PIPELINE_INTERFACES_KEY).get("sample") + or dp_data.get(PIPELINE_INTERFACES_KEY).get("project") + ): + # Support original nesting of pipeline interfaces under "sample" and "project" + return_dict[SAMPLE_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get( + "sample" + ) + return_dict[PROJECT_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get( + "project" + ) + else: + # infer pipeline type based from interface instead of nested keys: https://github.com/pepkit/looper/issues/465 + all_pipeline_interfaces = dp_data.get(PIPELINE_INTERFACES_KEY) + sample_pifaces = [] + project_pifaces = [] + if isinstance(all_pipeline_interfaces, str): + all_pipeline_interfaces = [all_pipeline_interfaces] + for piface in all_pipeline_interfaces: + pipeline_type, piface_path = determine_pipeline_type( + piface, looper_config_path + ) + if pipeline_type == PipelineLevel.SAMPLE.value: + sample_pifaces.append(piface_path) + elif pipeline_type == PipelineLevel.PROJECT.value: + project_pifaces.append(piface_path) + if len(sample_pifaces) > 0: + return_dict[SAMPLE_PL_ARG] = sample_pifaces + if len(project_pifaces) > 0: + return_dict[PROJECT_PL_ARG] = project_pifaces else: _LOGGER.warning(