Skip to content

Commit

Permalink
Merge pull request #506 from pepkit/dev
Browse files Browse the repository at this point in the history
v1.9.0 release
  • Loading branch information
donaldcampbelljr authored Jun 26, 2024
2 parents 1c17c88 + 497ffdc commit 51a8ea7
Show file tree
Hide file tree
Showing 27 changed files with 322 additions and 139 deletions.
8 changes: 8 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format.


## [1.9.0] -- 2024-06-26

### Added
- Users can now add CLI modifiers to the looper config instead of the PEP project [#270](https://github.com/pepkit/looper/issues/270)
- Pipeline interfaces no longer must be nested under sample and project keys within the looper config file [#465](https://github.com/pepkit/looper/issues/465)
- `var_templates` can now be hierarchical [#334](https://github.com/pepkit/looper/issues/334)
- Looper can now gracefully halt spawned subprocesses when the user sends a keyboard interrupt [#37](https://github.com/pepkit/looper/issues/37)

## [1.8.1] -- 2024-06-06

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion looper/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = "1.8.1"
__version__ = "1.9.0"
# You must change the version in parser = pydantic_argparse.ArgumentParser in cli_pydantic.py!!!
22 changes: 14 additions & 8 deletions looper/cli_pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
# with types.
from __future__ import annotations

import os
import sys

import logmuse
import pydantic2_argparse
import pydantic_argparse
import yaml
from eido import inspect_project
from pephubclient import PEPHubClient
from pydantic2_argparse.argparse.parser import ArgumentParser
from pydantic_argparse.argparse.parser import ArgumentParser

from divvy import select_divvy_config

from .const import PipelineLevel
from . import __version__

from .command_models.arguments import ArgumentEnum
Expand Down Expand Up @@ -151,8 +149,12 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None):
looper_config_dict = read_looper_dotfile()
_LOGGER.info(f"Using looper config ({looper_cfg_path}).")

cli_modifiers_dict = None
for looper_config_key, looper_config_item in looper_config_dict.items():
setattr(subcommand_args, looper_config_key, looper_config_item)
if looper_config_key == CLI_KEY:
cli_modifiers_dict = looper_config_item
else:
setattr(subcommand_args, looper_config_key, looper_config_item)

except OSError:
parser.print_help(sys.stderr)
Expand All @@ -168,7 +170,11 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None):
)

subcommand_args = enrich_args_via_cfg(
subcommand_name, subcommand_args, parser, test_args=test_args
subcommand_name,
subcommand_args,
parser,
test_args=test_args,
cli_modifiers=cli_modifiers_dict,
)

# If project pipeline interface defined in the cli, change name to: "pipeline_interface"
Expand Down Expand Up @@ -325,12 +331,12 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None):


def main(test_args=None) -> None:
parser = pydantic2_argparse.ArgumentParser(
parser = pydantic_argparse.ArgumentParser(
model=TopLevelParser,
prog="looper",
description="Looper: A job submitter for Portable Encapsulated Projects",
add_help=True,
version="1.8.1",
version="1.9.0",
)

parser = add_short_arguments(parser, ArgumentEnum)
Expand Down
2 changes: 1 addition & 1 deletion looper/command_models/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from ..const import MESSAGE_BY_SUBCOMMAND
from .arguments import Argument, ArgumentEnum
from pydantic2_argparse import ArgumentParser
from pydantic_argparse import ArgumentParser


@dataclass
Expand Down
115 changes: 106 additions & 9 deletions looper/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import logging
import os
import subprocess
import signal
import psutil
import sys
import time
import yaml
from math import ceil
from copy import copy, deepcopy
from json import loads
from subprocess import check_output
from typing import *
Expand All @@ -19,14 +21,18 @@
from peppy.const import CONFIG_KEY, SAMPLE_NAME_ATTR, SAMPLE_YAML_EXT
from peppy.exceptions import RemoteYAMLError
from pipestat import PipestatError
from ubiquerg import expandpath, is_command_callable
from ubiquerg import expandpath
from yaml import dump
from yacman import FutureYAMLConfigManager as YAMLConfigManager

from .const import *
from .exceptions import JobSubmissionException, SampleFailedException
from .exceptions import JobSubmissionException
from .processed_project import populate_sample_paths
from .utils import fetch_sample_flags, jinja_render_template_strictly
from .utils import (
fetch_sample_flags,
jinja_render_template_strictly,
expand_nested_var_templates,
)
from .const import PipelineLevel


Expand Down Expand Up @@ -189,6 +195,7 @@ def __init__(
the project level, rather than on the sample level)
"""
super(SubmissionConductor, self).__init__()

self.collate = collate
self.section_key = PROJECT_PL_KEY if self.collate else SAMPLE_PL_KEY
self.pl_iface = pipeline_interface
Expand All @@ -210,6 +217,7 @@ def __init__(
self._curr_size = 0
self._failed_sample_names = []
self._curr_skip_pool = []
self.process_id = None # this is used for currently submitted subprocess

if self.extra_pipe_args:
_LOGGER.debug(
Expand Down Expand Up @@ -392,6 +400,10 @@ def submit(self, force=False):
not for dry run)
"""
submitted = False

# Override signal handler so that Ctrl+C can be used to gracefully terminate child process
signal.signal(signal.SIGINT, self._signal_int_handler)

if not self._pool:
_LOGGER.debug("No submission (no pooled samples): %s", self.pl_name)
# submitted = False
Expand Down Expand Up @@ -420,9 +432,10 @@ def submit(self, force=False):
submission_command = "{} {}".format(sub_cmd, script)
# Capture submission command return value so that we can
# intercept and report basic submission failures; #167
try:
subprocess.check_call(submission_command, shell=True)
except subprocess.CalledProcessError:
process = subprocess.Popen(submission_command, shell=True)
self.process_id = process.pid
process.wait()
if process.returncode != 0:
fails = (
"" if self.collate else [s.sample_name for s in self._samples]
)
Expand Down Expand Up @@ -489,6 +502,87 @@ def _sample_lump_name(self, pool):
# name concordant with 1-based, not 0-based indexing.
return "lump{}".format(self._num_total_job_submissions + 1)

def _signal_int_handler(self, signal, frame):
"""
For catching interrupt (Ctrl +C) signals. Fails gracefully.
"""
signal_type = "SIGINT"
self._generic_signal_handler(signal_type)

def _generic_signal_handler(self, signal_type):
"""
Function for handling both SIGTERM and SIGINT
"""
message = "Received " + signal_type + ". Failing gracefully..."
_LOGGER.warning(msg=message)

self._terminate_current_subprocess()

sys.exit(1)

    def _terminate_current_subprocess(self):
        """
        Terminate the subprocess tracked by ``self.process_id``, if any.

        Escalates SIGINT -> SIGTERM -> SIGKILL over roughly three seconds of
        polling, then logs whether the child process was actually halted.
        """

        def pskill(proc_pid, sig=signal.SIGINT):
            # Send the signal to the process and every descendant (children
            # first) so the whole submitted process tree is asked to stop.
            parent_process = psutil.Process(proc_pid)
            for child_proc in parent_process.children(recursive=True):
                child_proc.send_signal(sig)
            parent_process.send_signal(sig)

        # Nothing tracked: no subprocess has been submitted (or it was cleared).
        if self.process_id is None:
            return

        # Gently wait for the subprocess before attempting to kill it
        sys.stdout.flush()
        still_running = self._attend_process(psutil.Process(self.process_id), 0)
        sleeptime = 0.25
        time_waiting = 0

        while still_running and time_waiting < 3:
            try:
                # Escalate signal severity the longer the child survives.
                if time_waiting > 2:
                    pskill(self.process_id, signal.SIGKILL)
                elif time_waiting > 1:
                    pskill(self.process_id, signal.SIGTERM)
                else:
                    pskill(self.process_id, signal.SIGINT)

            except OSError:
                # This would happen if the child process ended between the check
                # and the next kill step
                still_running = False
                time_waiting = time_waiting + sleeptime

            # Now see if it's still running
            # NOTE(review): time_waiting was already incremented in the OSError
            # branch above, so that path advances the clock twice in one pass —
            # harmless since the loop then exits, but confirm it is intended.
            time_waiting = time_waiting + sleeptime
            if not self._attend_process(psutil.Process(self.process_id), sleeptime):
                still_running = False

        if still_running:
            _LOGGER.warning(f"Unable to halt child process: {self.process_id}")
        else:
            if time_waiting > 0:
                note = f"terminated after {time_waiting} sec"
            else:
                note = "was already terminated"
            _LOGGER.warning(msg=f"Child process {self.process_id} {note}.")

def _attend_process(self, proc, sleeptime):
"""
Waits on a process for a given time to see if it finishes, returns True
if it's still running after the given time or False as soon as it
returns.
:param psutil.Process proc: Process object opened by psutil.Popen()
:param float sleeptime: Time to wait
:return bool: True if process is still running; otherwise false
"""
try:
proc.wait(timeout=int(sleeptime))
except psutil.TimeoutExpired:
return True
return False

def _jobname(self, pool):
"""Create the name for a job submission."""
return "{}_{}".format(self.pl_iface.pipeline_name, self._sample_lump_name(pool))
Expand Down Expand Up @@ -563,6 +657,7 @@ def _set_pipestat_namespace(
"results_file": psm.file,
"record_identifier": psm.record_identifier,
"config_file": psm.config_path,
"output_schema": psm.cfg["_schema_path"],
}
filtered_namespace = {k: v for k, v in full_namespace.items() if v}
return YAMLConfigManager(filtered_namespace)
Expand Down Expand Up @@ -626,8 +721,10 @@ def write_script(self, pool, size):
_LOGGER.debug(f"namespace pipelines: { pl_iface }")

namespaces["pipeline"]["var_templates"] = pl_iface[VAR_TEMPL_KEY] or {}
for k, v in namespaces["pipeline"]["var_templates"].items():
namespaces["pipeline"]["var_templates"][k] = expandpath(v)

namespaces["pipeline"]["var_templates"] = expand_nested_var_templates(
namespaces["pipeline"]["var_templates"], namespaces
)

# pre_submit hook namespace updates
namespaces = _exec_pre_submit(pl_iface, namespaces)
Expand Down
1 change: 1 addition & 0 deletions looper/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
"DEBUG_EIDO_VALIDATION",
"LOOPER_GENERIC_OUTPUT_SCHEMA",
"LOOPER_GENERIC_COUNT_LINES",
"PipelineLevel",
]

FLAGS = ["completed", "running", "failed", "waiting", "partial"]
Expand Down
6 changes: 0 additions & 6 deletions looper/divvy.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
""" Computing configuration representation """

import logging
import logmuse
import os
import sys
import shutil
import yaml


from shutil import copytree
from yacman import FutureYAMLConfigManager as YAMLConfigManager
from yacman import write_lock, FILEPATH_KEY, load_yaml, select_config
from yaml import SafeLoader
from ubiquerg import is_writable, VersionInHelpParser


from .const import (
Expand All @@ -24,7 +19,6 @@
)
from .utils import write_submit_script

# from . import __version__

_LOGGER = logging.getLogger(__name__)

Expand Down
2 changes: 0 additions & 2 deletions looper/looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@
from rich.console import Console
from rich.table import Table
from ubiquerg.cli_tools import query_yes_no
from ubiquerg.collection import uniqify


from .conductor import SubmissionConductor

from .exceptions import *
from .const import *
from .pipeline_interface import PipelineInterface
from .project import Project
from .utils import (
desired_samples_range_skipped,
Expand Down
5 changes: 2 additions & 3 deletions looper/pipeline_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
InvalidResourceSpecificationException,
PipelineInterfaceConfigError,
)
from .utils import jinja_render_template_strictly
from .utils import render_nested_var_templates

__author__ = "Michal Stolarczyk"
__email__ = "michal@virginia.edu"
Expand Down Expand Up @@ -89,8 +89,7 @@ def render_var_templates(self, namespaces):
var_templates = {}
if curr_data:
var_templates.update(curr_data)
for k, v in var_templates.items():
var_templates[k] = jinja_render_template_strictly(v, namespaces)
var_templates = render_nested_var_templates(var_templates, namespaces)
return var_templates

def get_pipeline_schemas(self, schema_key=INPUT_SCHEMA_KEY):
Expand Down
Loading

0 comments on commit 51a8ea7

Please sign in to comment.