Skip to content

Commit

Permalink
Merge pull request #496 from pepkit/dev_492_refactor_pipestat
Browse files Browse the repository at this point in the history
Issue 492 refactor pipestat
  • Loading branch information
donaldcampbelljr authored Jun 4, 2024
2 parents a0e1167 + affd8d4 commit 7c95e6e
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 255 deletions.
23 changes: 13 additions & 10 deletions looper/cli_pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from divvy import select_divvy_config

from .const import PipelineLevel
from . import __version__

from .command_models.arguments import ArgumentEnum
Expand Down Expand Up @@ -241,6 +242,14 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None):
selector_flag=subcommand_args.sel_flag,
exclusion_flag=subcommand_args.exc_flag,
) as prj:

# Check at the beginning if user wants to use pipestat and pipestat is configurable
is_pipestat_configured = (
prj._check_if_pipestat_configured(pipeline_type=PipelineLevel.PROJECT.value)
if getattr(args, "project", None)
else prj._check_if_pipestat_configured()
)

if subcommand_name in ["run", "rerun"]:
rerun = subcommand_name == "rerun"
run = Runner(prj)
Expand Down Expand Up @@ -271,32 +280,26 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None):
if subcommand_name == "destroy":
return Destroyer(prj)(subcommand_args)

use_pipestat = (
prj.pipestat_configured_project
if getattr(subcommand_args, "project", None)
else prj.pipestat_configured
)

if subcommand_name == "table":
if use_pipestat:
if is_pipestat_configured:
return Tabulator(prj)(subcommand_args)
else:
raise PipestatConfigurationException("table")

if subcommand_name == "report":
if use_pipestat:
if is_pipestat_configured:
return Reporter(prj)(subcommand_args)
else:
raise PipestatConfigurationException("report")

if subcommand_name == "link":
if use_pipestat:
if is_pipestat_configured:
Linker(prj)(subcommand_args)
else:
raise PipestatConfigurationException("link")

if subcommand_name == "check":
if use_pipestat:
if is_pipestat_configured:
return Checker(prj)(subcommand_args)
else:
raise PipestatConfigurationException("check")
Expand Down
9 changes: 7 additions & 2 deletions looper/command_models/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,13 @@ class ArgumentEnum(enum.Enum):
name="sample_pipeline_interfaces",
alias="-S",
default=(List, []),
description="Paths to looper sample config files",
description="Paths to looper sample pipeline interfaces",
)
PROJECT_PIPELINE_INTERFACES = Argument(
name="project_pipeline_interfaces",
alias="-P",
default=(List, []),
description="Paths to looper project config files",
description="Paths to looper project pipeline interfaces",
)
AMEND = Argument(
name="amend", default=(List, []), description="List of amendments to activate"
Expand Down Expand Up @@ -276,3 +276,8 @@ class ArgumentEnum(enum.Enum):
default=(bool, False),
description="Makes html report portable.",
)
PROJECT_LEVEL = Argument(
name="project",
default=(bool, False),
description="Is this command executed for project-level?",
)
1 change: 1 addition & 0 deletions looper/command_models/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def create_model(self) -> Type[pydantic.BaseModel]:
ArgumentEnum.PIPESTAT.value,
ArgumentEnum.SETTINGS.value,
ArgumentEnum.AMEND.value,
ArgumentEnum.PROJECT_LEVEL.value,
]

RunParser = Command(
Expand Down
27 changes: 16 additions & 11 deletions looper/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .exceptions import JobSubmissionException, SampleFailedException
from .processed_project import populate_sample_paths
from .utils import fetch_sample_flags, jinja_render_template_strictly
from .const import PipelineLevel


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,6 +91,13 @@ def write_pipestat_config(looper_pipestat_config_path, pipestat_config_dict):
:param dict pipestat_config_dict: the dict containing key value pairs to be written to the pipestat configutation
return bool
"""

if not os.path.exists(os.path.dirname(looper_pipestat_config_path)):
try:
os.makedirs(os.path.dirname(looper_pipestat_config_path))
except FileExistsError:
pass

with open(looper_pipestat_config_path, "w") as f:
yaml.dump(pipestat_config_dict, f)
_LOGGER.debug(
Expand Down Expand Up @@ -266,8 +274,12 @@ def is_project_submittable(self, force=False):
:param bool frorce: whether to force the project submission (ignore status/flags)
"""
psms = {}
if self.prj.pipestat_configured_project:
psm = self.prj.get_pipestat_managers(project_level=True)[self.pl_name]
for piface in self.prj.project_pipeline_interfaces:
if piface.psm.pipeline_type == PipelineLevel.PROJECT.value:
psms[piface.psm.pipeline_name] = piface.psm
psm = psms[self.pl_name]
status = psm.get_status()
if not force and status is not None:
_LOGGER.info(f"> Skipping project. Determined status: {status}")
Expand All @@ -293,12 +305,11 @@ def add_sample(self, sample, rerun=False):
)
)
if self.prj.pipestat_configured:
psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name)
sample_statuses = psms[self.pl_name].get_status(
sample_statuses = self.pl_iface.psm.get_status(
record_identifier=sample.sample_name
)
if sample_statuses == "failed" and rerun is True:
psms[self.pl_name].set_status(
self.pl_iface.psm.set_status(
record_identifier=sample.sample_name, status_identifier="waiting"
)
sample_statuses = "waiting"
Expand Down Expand Up @@ -537,12 +548,7 @@ def _set_pipestat_namespace(
:return yacman.YAMLConfigManager: pipestat namespace
"""
try:
psms = (
self.prj.get_pipestat_managers(sample_name)
if sample_name
else self.prj.get_pipestat_managers(project_level=True)
)
psm = psms[self.pl_iface.pipeline_name]
psm = self.pl_iface.psm
except (PipestatError, AttributeError) as e:
# pipestat section faulty or not found in project.looper or sample
# or project is missing required pipestat attributes
Expand Down Expand Up @@ -630,7 +636,6 @@ def write_script(self, pool, size):
argstring = jinja_render_template_strictly(
template=templ, namespaces=namespaces
)
print(argstring)
except UndefinedError as jinja_exception:
_LOGGER.warning(NOT_SUB_MSG.format(str(jinja_exception)))
except KeyError as e:
Expand Down
8 changes: 8 additions & 0 deletions looper/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Shared project constants """

import os
from enum import Enum

__author__ = "Databio lab"
__email__ = "nathan@code.databio.org"
Expand Down Expand Up @@ -268,3 +269,10 @@ def _get_apperance_dict(type, templ=APPEARANCE_BY_FLAG):
"init-piface": "Initialize generic pipeline interface.",
"link": "Create directory of symlinks for reported results.",
}

# Add project/sample enum


class PipelineLevel(Enum):
SAMPLE = "sample"
PROJECT = "project"
Loading

0 comments on commit 7c95e6e

Please sign in to comment.