diff --git a/looper/conductor.py b/looper/conductor.py index ffbb1b547..1b08961d3 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -4,12 +4,15 @@ import logging import os import subprocess +import signal +import psutil +import sys import time import yaml from math import ceil from copy import copy, deepcopy from json import loads -from subprocess import check_output +from subprocess import check_output, PIPE from typing import * from eido import read_schema, get_input_files_size @@ -189,6 +192,7 @@ def __init__( the project level, rather that on the sample level) """ super(SubmissionConductor, self).__init__() + self.collate = collate self.section_key = PROJECT_PL_KEY if self.collate else SAMPLE_PL_KEY self.pl_iface = pipeline_interface @@ -210,6 +214,7 @@ def __init__( self._curr_size = 0 self._failed_sample_names = [] self._curr_skip_pool = [] + self.process_id = None # this is used for currently submitted subprocess if self.extra_pipe_args: _LOGGER.debug( @@ -392,6 +397,10 @@ def submit(self, force=False): not for dry run) """ submitted = False + + # Override signal handler so that Ctrl+C can be used to gracefully terminate child process + signal.signal(signal.SIGINT, self._signal_int_handler) + if not self._pool: _LOGGER.debug("No submission (no pooled samples): %s", self.pl_name) # submitted = False @@ -420,9 +429,11 @@ def submit(self, force=False): submission_command = "{} {}".format(sub_cmd, script) # Capture submission command return value so that we can # intercept and report basic submission failures; #167 - try: - subprocess.check_call(submission_command, shell=True) - except subprocess.CalledProcessError: + process = subprocess.Popen(submission_command, stderr=PIPE, shell=True) + self.process_id = process.pid + output, errors = process.communicate() + _LOGGER.debug(msg=errors) + if process.returncode != 0: fails = ( "" if self.collate else [s.sample_name for s in self._samples] ) @@ -489,6 +500,87 @@ def _sample_lump_name(self, pool): # name concordant with 1-based, not 0-based indexing. return "lump{}".format(self._num_total_job_submissions + 1) + def _signal_int_handler(self, signal, frame): + """ + For catching interrupt (Ctrl +C) signals. Fails gracefully. + """ + signal_type = "SIGINT" + self._generic_signal_handler(signal_type) + + def _generic_signal_handler(self, signal_type): + """ + Function for handling both SIGTERM and SIGINT + """ + message = "Received " + signal_type + ". Failing gracefully..." + _LOGGER.warning(msg=message) + + self._terminate_current_subprocess() + + sys.exit(1) + + def _terminate_current_subprocess(self): + """This terminates the current sub process associated with self.process_id""" + + def pskill(proc_pid, sig=signal.SIGINT): + parent_process = psutil.Process(proc_pid) + for child_proc in parent_process.children(recursive=True): + child_proc.send_signal(sig) + parent_process.send_signal(sig) + + if self.process_id is None: + return + + # Gently wait for the subprocess before attempting to kill it + sys.stdout.flush() + still_running = self._attend_process(psutil.Process(self.process_id), 0) + sleeptime = 0.25 + time_waiting = 0 + + while still_running and time_waiting < 3: + try: + if time_waiting > 2: + pskill(self.process_id, signal.SIGKILL) + elif time_waiting > 1: + pskill(self.process_id, signal.SIGTERM) + else: + pskill(self.process_id, signal.SIGINT) + + except OSError: + # This would happen if the child process ended between the check + # and the next kill step + still_running = False + time_waiting = time_waiting + sleeptime + + # Now see if it's still running + time_waiting = time_waiting + sleeptime + if not self._attend_process(psutil.Process(self.process_id), sleeptime): + still_running = False + + if still_running: + _LOGGER.warning(f"Unable to halt child process: {self.process_id}") + else: + if time_waiting > 0: + note = f"terminated after {time_waiting} sec" + else: + note = "was already terminated" + _LOGGER.warning(msg=f"Child process {self.process_id} {note}.") + + def _attend_process(self, proc, sleeptime): + """ + Waits on a process for a given time to see if it finishes, returns True + if it's still running after the given time or False as soon as it + returns. + + :param psutil.Process proc: Process object opened by psutil.Popen() + :param float sleeptime: Time to wait + :return bool: True if process is still running; otherwise false + """ + try: + proc.wait(timeout=int(sleeptime)) + except psutil.TimeoutExpired: + return True + return False + def _jobname(self, pool): """Create the name for a job submission.""" return "{}_{}".format(self.pl_iface.pipeline_name, self._sample_lump_name(pool)) @@ -563,6 +655,7 @@ def _set_pipestat_namespace( "results_file": psm.file, "record_identifier": psm.record_identifier, "config_file": psm.config_path, + "output_schema": psm.cfg["_schema_path"], } filtered_namespace = {k: v for k, v in full_namespace.items() if v} return YAMLConfigManager(filtered_namespace) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index a78b632dd..bd28baa86 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -11,4 +11,5 @@ pyyaml>=3.12 rich>=9.10.0 ubiquerg>=0.5.2 yacman==0.9.3 -pydantic2-argparse>=0.9.2 \ No newline at end of file +pydantic2-argparse>=0.9.2 +psutil \ No newline at end of file diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index 87d100866..f5579ebaf 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -4,4 +4,5 @@ pytest pytest-cov pytest-remotedata veracitools -GitPython \ No newline at end of file +GitPython +psutil \ No newline at end of file diff --git a/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline1.py b/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline1.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline2.py b/tests/data/hello_looper-dev/advanced/pipeline/col_pipeline2.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/hello_looper-dev/advanced/pipeline/other_pipeline2.py b/tests/data/hello_looper-dev/advanced/pipeline/other_pipeline2.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline1.py b/tests/data/hello_looper-dev/advanced/pipeline/pipeline1.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml index cddc14b76..2a23d3214 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_project.yaml @@ -2,10 +2,8 @@ pipeline_name: PIPELINE1 pipeline_type: project output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/col_pipeline1.py" + path: "{looper.piface_dir}/col_pipeline1.py" command_template: > - {pipeline.var_templates.path} --project-name {project.name} + python3 {pipeline.var_templates.path} --project-name {project.name} + -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml index 43638d923..8e79b7ae7 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface1_sample.yaml @@ -3,13 +3,11 @@ pipeline_type: sample input_schema: https://schema.databio.org/pep/2.0.0.yaml output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/pipeline1.py" + path: "{looper.piface_dir}/pipeline1.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml index 7c4a42238..824b7e09b 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_project.yaml @@ -2,12 +2,10 @@ pipeline_name: OTHER_PIPELINE2 pipeline_type: project output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/col_pipeline2.py" + path: "{looper.piface_dir}/col_pipeline2.py" command_template: > - {pipeline.var_templates.path} --project-name {project.name} + python3 {pipeline.var_templates.path} --project-name {project.name} compute: size_dependent_variables: resources-project.tsv -bioconductor: - readFunName: readData - readFunPath: readData.R + diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml index 987f7873d..589aef6dc 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipeline_interface2_sample.yaml @@ -2,15 +2,12 @@ pipeline_name: OTHER_PIPELINE2 pipeline_type: sample output_schema: output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/other_pipeline2.py" + path: "{looper.piface_dir}/other_pipeline2.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} compute: size_dependent_variables: resources-sample.tsv -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml index ff40c411a..e687ea0d3 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface1_sample.yaml @@ -3,13 +3,11 @@ pipeline_type: sample input_schema: https://schema.databio.org/pep/2.0.0.yaml output_schema: pipestat_output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/pipeline1.py" + path: "{looper.piface_dir}/pipeline1.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + -bioconductor: - readFunName: readData - readFunPath: readData.R diff --git a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml index 79dcf50f8..bac3ea3d4 100644 --- a/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml +++ b/tests/data/hello_looper-dev/advanced/pipeline/pipestat_pipeline_interface2_sample.yaml @@ -3,15 +3,13 @@ pipeline_type: sample input_schema: https://schema.databio.org/pep/2.0.0.yaml output_schema: pipestat_output_schema.yaml var_templates: - path: "{looper.piface_dir}/pipelines/other_pipeline2.py" + path: "{looper.piface_dir}/other_pipeline2.py" pre_submit: python_functions: - looper.write_sample_yaml command_template: > - {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} + python3 {pipeline.var_templates.path} --sample-name {sample.sample_name} --req-attr {sample.attr} compute: size_dependent_variables: resources-sample.tsv -bioconductor: - readFunName: readData - readFunPath: readData.R + diff --git a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py index 97e866ee4..6f6a4ab8f 100755 --- a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py +++ b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/count_lines.py @@ -1,3 +1,5 @@ +import os.path + import pipestat import sys @@ -8,14 +10,17 @@ ] # this is the sample we wish to process by reading the number of lines sample_name = sys.argv[2] results_file = sys.argv[3] +schema_path = sys.argv[4] # Create pipestat manager and then report values psm = pipestat.PipestatManager( - schema_path="pipeline_pipestat/pipestat_output_schema.yaml", + schema_path=schema_path, results_file_path=results_file, record_identifier=sample_name, ) + +text_file = os.path.abspath(text_file) # Read text file and count lines with open(text_file, "r") as f: result = {"number_of_lines": len(f.readlines())} diff --git a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml index 1d26ac435..e5a144027 100644 --- a/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml +++ b/tests/data/hello_looper-dev/pipestat/pipeline_pipestat/pipeline_interface.yaml @@ -2,4 +2,4 @@ pipeline_name: example_pipestat_pipeline pipeline_type: sample output_schema: pipestat_output_schema.yaml command_template: > - python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} \ No newline at end of file + python3 {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file} {pipestat.output_schema} \ No newline at end of file diff --git a/tests/smoketests/test_other.py b/tests/smoketests/test_other.py index 2527f4f25..b90e9b61b 100644 --- a/tests/smoketests/test_other.py +++ b/tests/smoketests/test_other.py @@ -95,6 +95,30 @@ def test_pipestat_rerun(self, prep_temp_pep_pipestat, pipeline_name, flags): """Verify that rerun works with either failed or waiting flags""" tp = prep_temp_pep_pipestat _make_flags_pipestat(tp, flags, pipeline_name) + path_to_looper_config = prep_temp_pep_pipestat + pipestat_dir = os.path.dirname(path_to_looper_config) + + # open up the project config and replace the derived attributes with the path to the data. In a way, this simulates using the environment variables. + pipestat_project_file = get_project_config_path(path_to_looper_config) + + pipestat_pipeline_interface_file = os.path.join( + pipestat_dir, "pipeline_pipestat/pipeline_interface.yaml" + ) + + with open(pipestat_project_file, "r") as f: + pipestat_project_data = safe_load(f) + + pipestat_project_data["sample_modifiers"]["derive"]["sources"]["source1"] = ( + os.path.join(pipestat_dir, "data/{sample_name}.txt") + ) + + with open(pipestat_pipeline_interface_file, "r") as f: + pipestat_piface_data = safe_load(f) + + pipeline_name = pipestat_piface_data["pipeline_name"] + + with open(pipestat_project_file, "w") as f: + dump(pipestat_project_data, f) x = ["rerun", "--looper-config", tp] try: diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py index cce74ca54..9b857f8f7 100644 --- a/tests/test_comprehensive.py +++ b/tests/test_comprehensive.py @@ -167,6 +167,7 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): tsv_list = [os.path.join(sd, f) for f in os.listdir(sd) if f.endswith(".tsv")] assert len(tsv_list) == 0 with pytest.raises(RecordNotFoundError): + psm = PipestatManager(config_file=path_to_pipestat_config) retrieved_result = psm.retrieve_one(record_identifier="frog_2")