Add keyboard interrupt for subprocesses #499

Merged: 7 commits, merged Jun 11, 2024
101 changes: 97 additions & 4 deletions looper/conductor.py
@@ -4,12 +4,15 @@
import logging
import os
import subprocess
import signal
import psutil
import sys
import time
import yaml
from math import ceil
from copy import copy, deepcopy
from json import loads
from subprocess import check_output
from subprocess import check_output, PIPE
from typing import *

from eido import read_schema, get_input_files_size
@@ -189,6 +192,7 @@ def __init__(
the project level, rather than on the sample level)
"""
super(SubmissionConductor, self).__init__()

self.collate = collate
self.section_key = PROJECT_PL_KEY if self.collate else SAMPLE_PL_KEY
self.pl_iface = pipeline_interface
@@ -210,6 +214,7 @@ def __init__(
self._curr_size = 0
self._failed_sample_names = []
self._curr_skip_pool = []
self.process_id = None # this is used for the currently submitted subprocess

if self.extra_pipe_args:
_LOGGER.debug(
@@ -392,6 +397,10 @@ def submit(self, force=False):
not for dry run)
"""
submitted = False

# Override signal handler so that Ctrl+C can be used to gracefully terminate child process
signal.signal(signal.SIGINT, self._signal_int_handler)

if not self._pool:
_LOGGER.debug("No submission (no pooled samples): %s", self.pl_name)
# submitted = False
@@ -420,9 +429,11 @@ def submit(self, force=False):
submission_command = "{} {}".format(sub_cmd, script)
# Capture submission command return value so that we can
# intercept and report basic submission failures; #167
try:
subprocess.check_call(submission_command, shell=True)
except subprocess.CalledProcessError:
process = subprocess.Popen(submission_command, stderr=PIPE, shell=True)
self.process_id = process.pid
output, errors = process.communicate()
_LOGGER.debug(msg=errors)
if process.returncode != 0:
fails = (
"" if self.collate else [s.sample_name for s in self._samples]
)
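The key change in this hunk is replacing `subprocess.check_call` with `subprocess.Popen`: `Popen` exposes the child's PID (stored as `self.process_id`) so the interrupt handler added below can reach the submission process, and `communicate()` captures stderr for logging before the return code is checked. A minimal sketch of the same pattern, using a placeholder command rather than a real submission script:

```python
import subprocess
from subprocess import PIPE

# Popen exposes the child's PID so a signal handler registered elsewhere
# can later terminate the submission's process tree; check_call does not.
process = subprocess.Popen("echo hello", stderr=PIPE, shell=True)
pid_for_handler = process.pid      # kept as self.process_id in the diff
_, errors = process.communicate()  # wait for completion, capture stderr
if process.returncode != 0:
    raise RuntimeError(errors.decode() if errors else "submission failed")
```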
@@ -489,6 +500,87 @@ def _sample_lump_name(self, pool):
# name concordant with 1-based, not 0-based indexing.
return "lump{}".format(self._num_total_job_submissions + 1)

def _signal_int_handler(self, signal, frame):
"""
For catching interrupt (Ctrl+C) signals. Fails gracefully.
"""
signal_type = "SIGINT"
self._generic_signal_handler(signal_type)

def _generic_signal_handler(self, signal_type):
"""
Function for handling both SIGTERM and SIGINT
"""
message = "Received " + signal_type + ". Failing gracefully..."
_LOGGER.warning(msg=message)

self._terminate_current_subprocess()

sys.exit(1)

def _terminate_current_subprocess(self):
"""This terminates the current sub process associated with self.process_id"""

def pskill(proc_pid, sig=signal.SIGINT):
parent_process = psutil.Process(proc_pid)
for child_proc in parent_process.children(recursive=True):
child_proc.send_signal(sig)
parent_process.send_signal(sig)

if self.process_id is None:
return

# Gently wait for the subprocess before attempting to kill it
sys.stdout.flush()
still_running = self._attend_process(psutil.Process(self.process_id), 0)
sleeptime = 0.25
time_waiting = 0

while still_running and time_waiting < 3:
try:
if time_waiting > 2:
pskill(self.process_id, signal.SIGKILL)
elif time_waiting > 1:
pskill(self.process_id, signal.SIGTERM)
else:
pskill(self.process_id, signal.SIGINT)

except OSError:
# This would happen if the child process ended between the check
# and the next kill step
still_running = False
time_waiting = time_waiting + sleeptime

# Now see if it's still running
time_waiting = time_waiting + sleeptime
if not self._attend_process(psutil.Process(self.process_id), sleeptime):
still_running = False

if still_running:
_LOGGER.warning(f"Unable to halt child process: {self.process_id}")
else:
if time_waiting > 0:
note = f"terminated after {time_waiting} sec"
else:
note = "was already terminated"
_LOGGER.warning(msg=f"Child process {self.process_id} {note}.")

def _attend_process(self, proc, sleeptime):
"""
Waits on a process for a given time to see if it finishes, returns True
if it's still running after the given time or False as soon as it
returns.

:param psutil.Process proc: Process object opened by psutil.Popen()
:param float sleeptime: Time to wait
:return bool: True if process is still running; otherwise false
"""
try:
proc.wait(timeout=int(sleeptime))
except psutil.TimeoutExpired:
return True
return False

def _jobname(self, pool):
"""Create the name for a job submission."""
return "{}_{}".format(self.pl_iface.pipeline_name, self._sample_lump_name(pool))
@@ -563,6 +655,7 @@ def _set_pipestat_namespace(
"results_file": psm.file,
"record_identifier": psm.record_identifier,
"config_file": psm.config_path,
"output_schema": psm.cfg["_schema_path"],
}
filtered_namespace = {k: v for k, v in full_namespace.items() if v}
return YAMLConfigManager(filtered_namespace)
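Taken together, the conductor changes boil down to: register a SIGINT handler at submission time, remember the PID of the submitted process, and on Ctrl+C signal the whole child process tree, escalating from SIGINT to SIGTERM to SIGKILL if it does not exit. A self-contained, POSIX-only sketch of that pattern (function names here are illustrative, not looper's actual API):

```python
import signal
import subprocess
import sys

import psutil


def kill_tree(pid, sig=signal.SIGINT):
    """Send a signal to a process and all of its children."""
    parent = psutil.Process(pid)
    for child in parent.children(recursive=True):
        child.send_signal(sig)
    parent.send_signal(sig)


def run_with_interrupt(cmd):
    """Run a shell command; on Ctrl+C, escalate SIGINT -> SIGTERM -> SIGKILL."""
    proc = subprocess.Popen(cmd, shell=True)

    def on_sigint(signum, frame):
        # Try increasingly forceful signals, giving the child a moment each time.
        for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGKILL):
            try:
                kill_tree(proc.pid, sig)
                psutil.Process(proc.pid).wait(timeout=1)
                break
            except psutil.TimeoutExpired:
                continue  # still running; escalate to the next signal
            except (psutil.NoSuchProcess, OSError):
                break  # child already exited
        sys.exit(1)

    signal.signal(signal.SIGINT, on_sigint)
    return proc.wait()


if __name__ == "__main__":
    run_with_interrupt("sleep 30")
```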
3 changes: 2 additions & 1 deletion requirements/requirements-all.txt
@@ -11,4 +11,5 @@ pyyaml>=3.12
rich>=9.10.0
ubiquerg>=0.5.2
yacman==0.9.3
pydantic2-argparse>=0.9.2
pydantic2-argparse>=0.9.2
psutil
3 changes: 2 additions & 1 deletion requirements/requirements-test.txt
@@ -4,4 +4,5 @@ pytest
pytest-cov
pytest-remotedata
veracitools
GitPython
GitPython
psutil
Empty file.
Empty file.
Empty file.
Empty file.
@@ -2,10 +2,8 @@ pipeline_name: PIPELINE1
pipeline_type: project
output_schema: output_schema.yaml
var_templates:
path: "{looper.piface_dir}/pipelines/col_pipeline1.py"
path: "{looper.piface_dir}/col_pipeline1.py"
command_template: >
{pipeline.var_templates.path} --project-name {project.name}
python3 {pipeline.var_templates.path} --project-name {project.name}


bioconductor:
readFunName: readData
readFunPath: readData.R
@@ -3,13 +3,11 @@ pipeline_type: sample
input_schema: https://schema.databio.org/pep/2.0.0.yaml
output_schema: output_schema.yaml
var_templates:
path: "{looper.piface_dir}/pipelines/pipeline1.py"
path: "{looper.piface_dir}/pipeline1.py"
pre_submit:
python_functions:
- looper.write_sample_yaml
command_template: >
{pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}
python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}


bioconductor:
readFunName: readData
readFunPath: readData.R
@@ -2,12 +2,10 @@ pipeline_name: OTHER_PIPELINE2
pipeline_type: project
output_schema: output_schema.yaml
var_templates:
path: "{looper.piface_dir}/pipelines/col_pipeline2.py"
path: "{looper.piface_dir}/col_pipeline2.py"
command_template: >
{pipeline.var_templates.path} --project-name {project.name}
python3 {pipeline.var_templates.path} --project-name {project.name}
compute:
size_dependent_variables: resources-project.tsv

bioconductor:
readFunName: readData
readFunPath: readData.R

@@ -2,15 +2,12 @@ pipeline_name: OTHER_PIPELINE2
pipeline_type: sample
output_schema: output_schema.yaml
var_templates:
path: "{looper.piface_dir}/pipelines/other_pipeline2.py"
path: "{looper.piface_dir}/other_pipeline2.py"
pre_submit:
python_functions:
- looper.write_sample_yaml
command_template: >
{pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}
python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}
compute:
size_dependent_variables: resources-sample.tsv

bioconductor:
readFunName: readData
readFunPath: readData.R
@@ -3,13 +3,11 @@ pipeline_type: sample
input_schema: https://schema.databio.org/pep/2.0.0.yaml
output_schema: pipestat_output_schema.yaml
var_templates:
path: "{looper.piface_dir}/pipelines/pipeline1.py"
path: "{looper.piface_dir}/pipeline1.py"
pre_submit:
python_functions:
- looper.write_sample_yaml
command_template: >
{pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}
python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}


bioconductor:
readFunName: readData
readFunPath: readData.R
@@ -3,15 +3,13 @@ pipeline_type: sample
input_schema: https://schema.databio.org/pep/2.0.0.yaml
output_schema: pipestat_output_schema.yaml
var_templates:
path: "{looper.piface_dir}/pipelines/other_pipeline2.py"
path: "{looper.piface_dir}/other_pipeline2.py"
pre_submit:
python_functions:
- looper.write_sample_yaml
command_template: >
{pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}
python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr}
compute:
size_dependent_variables: resources-sample.tsv

bioconductor:
readFunName: readData
readFunPath: readData.R

@@ -1,3 +1,5 @@
import os.path

import pipestat
import sys

@@ -8,14 +10,17 @@
] # this is the sample we wish to process by reading the number of lines
sample_name = sys.argv[2]
results_file = sys.argv[3]
schema_path = sys.argv[4]

# Create pipestat manager and then report values
psm = pipestat.PipestatManager(
schema_path="pipeline_pipestat/pipestat_output_schema.yaml",
schema_path=schema_path,
results_file_path=results_file,
record_identifier=sample_name,
)


text_file = os.path.abspath(text_file)
# Read text file and count lines
with open(text_file, "r") as f:
result = {"number_of_lines": len(f.readlines())}
@@ -2,4 +2,4 @@ pipeline_name: example_pipestat_pipeline
pipeline_type: sample
output_schema: pipestat_output_schema.yaml
command_template: >
python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file}
python3 {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} {pipestat.output_schema}
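The second thread of this PR passes the pipestat output schema path from the conductor namespace (`psm.cfg["_schema_path"]`, exposed to templates as `{pipestat.output_schema}`) through the command template, so the example pipeline no longer hard-codes its schema. A rough sketch of the consuming side, mirroring the updated `count_lines.py`; the final `report` call is assumed from the truncated tail of that file:

```python
import sys

import pipestat

# Expected argv layout, matching the updated command_template:
#   count_lines.py <text_file> <sample_name> <results_file> <output_schema>
text_file, sample_name, results_file, schema_path = sys.argv[1:5]

psm = pipestat.PipestatManager(
    schema_path=schema_path,        # now supplied by looper, not hard-coded
    results_file_path=results_file,
    record_identifier=sample_name,
)

# Count lines in the input and report the result through pipestat.
with open(text_file) as f:
    psm.report(values={"number_of_lines": len(f.readlines())})
```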
24 changes: 24 additions & 0 deletions tests/smoketests/test_other.py
@@ -95,6 +95,30 @@ def test_pipestat_rerun(self, prep_temp_pep_pipestat, pipeline_name, flags):
"""Verify that rerun works with either failed or waiting flags"""
tp = prep_temp_pep_pipestat
_make_flags_pipestat(tp, flags, pipeline_name)
path_to_looper_config = prep_temp_pep_pipestat
pipestat_dir = os.path.dirname(path_to_looper_config)

# Open the project config and replace the derived attributes with the path to the data; this simulates using environment variables.
pipestat_project_file = get_project_config_path(path_to_looper_config)

pipestat_pipeline_interface_file = os.path.join(
pipestat_dir, "pipeline_pipestat/pipeline_interface.yaml"
)

with open(pipestat_project_file, "r") as f:
pipestat_project_data = safe_load(f)

pipestat_project_data["sample_modifiers"]["derive"]["sources"]["source1"] = (
os.path.join(pipestat_dir, "data/{sample_name}.txt")
)

with open(pipestat_pipeline_interface_file, "r") as f:
pipestat_piface_data = safe_load(f)

pipeline_name = pipestat_piface_data["pipeline_name"]

with open(pipestat_project_file, "w") as f:
dump(pipestat_project_data, f)

x = ["rerun", "--looper-config", tp]
try:
1 change: 1 addition & 0 deletions tests/test_comprehensive.py
@@ -167,6 +167,7 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat)
tsv_list = [os.path.join(sd, f) for f in os.listdir(sd) if f.endswith(".tsv")]
assert len(tsv_list) == 0
with pytest.raises(RecordNotFoundError):
psm = PipestatManager(config_file=path_to_pipestat_config)
retrieved_result = psm.retrieve_one(record_identifier="frog_2")

