Skip to content

Commit

Permalink
infer pipeline_type directly from interface #465
Browse files Browse the repository at this point in the history
  • Loading branch information
donaldcampbelljr committed Jun 21, 2024
1 parent 4bba529 commit 6ba543c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
1 change: 1 addition & 0 deletions looper/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
"LOOPER_GENERIC_OUTPUT_SCHEMA",
"LOOPER_GENERIC_COUNT_LINES",
"CLI_MODS_KEY",
"PipelineLevel",
]

FLAGS = ["completed", "running", "failed", "waiting", "partial"]
Expand Down
64 changes: 60 additions & 4 deletions looper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ubiquerg import convert_value, expandpath, parse_registry_path, deep_update
from pephubclient.constants import RegistryPath
from pydantic import ValidationError
from yacman import load_yaml

from .const import *
from .command_models.commands import SUPPORTED_COMMANDS
Expand Down Expand Up @@ -534,6 +535,33 @@ def initiate_looper_config(
return True


def determine_pipeline_type(piface_path: str, looper_config_path: str):
"""
Read pipeline interface from disk and determine if pipeline type is sample or project-level
:param str piface_path: path to pipeline_interface
:param str looper_config_path: path to looper config file
:return Tuple[Union[str,None],Union[str,None]] : (pipeline type, resolved path) or (None, None)
"""

if piface_path is None:
return None, None
piface_path = expandpath(piface_path)
if not os.path.isabs(piface_path):
piface_path = os.path.realpath(
os.path.join(os.path.dirname(looper_config_path), piface_path)
)
try:
piface_dict = load_yaml(piface_path)
except FileNotFoundError:
return None, None

pipeline_type = piface_dict.get("pipeline_type", None)

return pipeline_type, piface_path


def read_looper_config_file(looper_config_path: str) -> dict:
"""
Read Looper config file which includes:
Expand Down Expand Up @@ -581,11 +609,39 @@ def read_looper_config_file(looper_config_path: str) -> dict:
return_dict[CLI_MODS_KEY] = dp_data[CLI_MODS_KEY]

if PIPELINE_INTERFACES_KEY in dp_data:

dp_data.setdefault(PIPELINE_INTERFACES_KEY, {})
return_dict[SAMPLE_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get("sample")
return_dict[PROJECT_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get(
"project"
)

if isinstance(dp_data.get(PIPELINE_INTERFACES_KEY), dict) and (
dp_data.get(PIPELINE_INTERFACES_KEY).get("sample")
or dp_data.get(PIPELINE_INTERFACES_KEY).get("project")
):
# Support original nesting of pipeline interfaces under "sample" and "project"
return_dict[SAMPLE_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get(
"sample"
)
return_dict[PROJECT_PL_ARG] = dp_data.get(PIPELINE_INTERFACES_KEY).get(
"project"
)
else:
# infer pipeline type based from interface instead of nested keys: https://github.com/pepkit/looper/issues/465
all_pipeline_interfaces = dp_data.get(PIPELINE_INTERFACES_KEY)
sample_pifaces = []
project_pifaces = []
if isinstance(all_pipeline_interfaces, str):
all_pipeline_interfaces = [all_pipeline_interfaces]
for piface in all_pipeline_interfaces:
pipeline_type, piface_path = determine_pipeline_type(
piface, looper_config_path
)
if pipeline_type == PipelineLevel.SAMPLE.value:
sample_pifaces.append(piface_path)
elif pipeline_type == PipelineLevel.PROJECT.value:
project_pifaces.append(piface_path)
if len(sample_pifaces) > 0:
return_dict[SAMPLE_PL_ARG] = sample_pifaces
if len(project_pifaces) > 0:
return_dict[PROJECT_PL_ARG] = project_pifaces

else:
_LOGGER.warning(
Expand Down

0 comments on commit 6ba543c

Please sign in to comment.