diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index d12d1ce..677c502 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -1,6 +1,6 @@ name: Unit Tests -on: [push] +on: [push, pull_request] jobs: build: @@ -23,4 +23,4 @@ jobs: - name: Test with nosetests run: | nosetests -w janis_assistant --with-coverage --cover-package=janis_assistant - - uses: codecov/codecov-action@v1 \ No newline at end of file + - uses: codecov/codecov-action@v1 diff --git a/janis_assistant/cli.py b/janis_assistant/cli.py index a091c24..01846b6 100644 --- a/janis_assistant/cli.py +++ b/janis_assistant/cli.py @@ -94,7 +94,7 @@ def process_args(sysargs=None): ) add_translate_args( subparsers.add_parser( - "translate", help="Translate a janis workflow to CWL or WDL" + "translate", help="Translate a janis workflow to CWL, WDL, or Nextflow" ) ) add_inputs_args( diff --git a/janis_assistant/data/models/preparedjob.py b/janis_assistant/data/models/preparedjob.py index c59af7e..dab67f1 100644 --- a/janis_assistant/data/models/preparedjob.py +++ b/janis_assistant/data/models/preparedjob.py @@ -9,6 +9,7 @@ from janis_assistant.management.configuration import ( parse_if_dict, JanisConfigurationCromwell, + JanisConfigurationNextflow, JanisConfigurationTemplate, JanisConfigurationNotifications, JanisConfigurationEnvironment, @@ -66,6 +67,7 @@ def __init__( container_type: str = None, workflow_reference: str = None, post_run_script: str = None, + nextflow: JanisConfigurationNextflow = None ): """ @@ -116,6 +118,10 @@ def __init__( "cromwell", skip_if_empty=not requires_cromwell_config, ) + requires_nextflow_config = self.engine == EngineType.nextflow + self.nextflow: JanisConfigurationNextflow = parse_if_dict( + JanisConfigurationNextflow, nextflow or {}, "nextflow", skip_if_empty=not requires_nextflow_config + ) self.template: JanisConfigurationTemplate = parse_if_dict( JanisConfigurationTemplate, template or {}, "template", skip_if_empty=False ) diff --git a/janis_assistant/data/models/run.py b/janis_assistant/data/models/run.py index e7e1e40..8d1cc58 100644 --- a/janis_assistant/data/models/run.py +++ b/janis_assistant/data/models/run.py @@ -268,7 +268,7 @@ def format(self, **kwargs): errors.extend(r.error for r in self.runs if r.error) rstatuses = ", ".join( - str(r.status.to_string()) for r in self.runs if r.status + str(r.status) for r in self.runs if r.status ) statuses = self.status.to_string() if self.status else rstatuses diff --git a/janis_assistant/engines/__init__.py b/janis_assistant/engines/__init__.py index 1767d08..a3f45d6 100644 --- a/janis_assistant/engines/__init__.py +++ b/janis_assistant/engines/__init__.py @@ -4,6 +4,7 @@ from .cromwell import Cromwell, CromwellConfiguration from .enginetypes import EngineType from .cwltool.main import CWLTool +from .nextflow.main import Nextflow from .engine import Engine from janis_core import SupportedTranslation @@ -15,6 +16,9 @@ def get_ideal_specification_for_engine(eng: Engine): elif isinstance(eng, Toil): return SupportedTranslation.CWL + elif isinstance(eng, Nextflow): + return SupportedTranslation.Nextflow + return SupportedTranslation.CWL @@ -26,5 +30,7 @@ def get_engine_type(engtype: Union[str, EngineType]): return CWLTool elif engid == EngineType.toil.value: return Toil + elif engid == EngineType.nextflow.value: + return Nextflow raise Exception("Couldn't recognise engine type ") diff --git a/janis_assistant/engines/engine.py b/janis_assistant/engines/engine.py index 36ad451..c0eee36 100644 --- a/janis_assistant/engines/engine.py +++ b/janis_assistant/engines/engine.py @@ -64,7 +64,16 @@ def terminate_task(self, identifier) -> TaskStatus: @abstractmethod def metadata(self, identifier) -> RunModel: - pass + return RunModel( + id_=identifier, + engine_id=identifier, + execution_dir=None, + submission_id=None, + name=identifier, + status=self.taskmeta.get("status"), + jobs=list(self.taskmeta.get("jobs", {}).values()), + error=self.taskmeta.get("error"), + ) keys_to_ignore = {"progress_callbacks"} diff --git a/janis_assistant/engines/enginetypes.py b/janis_assistant/engines/enginetypes.py index 9ac17c1..795c4ec 100644 --- a/janis_assistant/engines/enginetypes.py +++ b/janis_assistant/engines/enginetypes.py @@ -5,10 +5,11 @@ class EngineType(Enum): cromwell = "cromwell" cwltool = "cwltool" toil = "toil" + nextflow = "nextflow" def __str__(self): return self.value @staticmethod def engines(): - return [EngineType.cwltool.value, EngineType.cromwell.value] + return [EngineType.cwltool.value, EngineType.cromwell.value, EngineType.nextflow.value] diff --git a/janis_assistant/engines/nextflow/__init__.py b/janis_assistant/engines/nextflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/janis_assistant/engines/nextflow/main.py b/janis_assistant/engines/nextflow/main.py new file mode 100644 index 0000000..7e48077 --- /dev/null +++ b/janis_assistant/engines/nextflow/main.py @@ -0,0 +1,538 @@ +import json +import os +import glob +import time +import re +import subprocess +import socketserver +import socket +import random +from typing import Dict, Any, Optional +from datetime import datetime +from http.server import BaseHTTPRequestHandler + +from janis_core import LogLevel +from janis_core.types.data_types import is_python_primitive +from janis_core.utils.logger import Logger +from janis_core.translations import nfgen, NextflowTranslator +from janis_assistant.data.models.outputs import WorkflowOutputModel +from janis_assistant.data.models.run import RunModel +from janis_assistant.data.models.workflowjob import RunJobModel +from janis_assistant.engines.nextflow.nextflowconfiguration import NextflowConfiguration +from janis_assistant.engines.engine import Engine, TaskStatus +from janis_assistant.engines.enginetypes import EngineType +from janis_assistant.utils import ProcessLogger +from janis_assistant.utils.dateutils import DateUtil +from janis_assistant.management.configuration import JanisConfigurationNextflow +from janis_assistant.data.models.preparedjob import PreparedJob + + +def make_request_handler(nextflow_logger): + class NextflowRequestHandler(BaseHTTPRequestHandler): + def do_POST(self): + content_len = int(self.headers.get('Content-Length')) + post_body = self.rfile.read(content_len) + body_as_str = post_body.decode("utf-8") + body_as_json = json.loads(body_as_str) + + # Logger.debug(body_as_json) + + event = body_as_json.get("event") + + if event == "completed" or event == "error": + Logger.debug("shutting down server") + self.server.shutdown() + Logger.debug("server shut down") + + self.read_cached_process(body_as_json) + + nextflow_logger.exit_function(nextflow_logger) + nextflow_logger.terminate() + elif event.startswith("process_"): + trace = body_as_json.get("trace", {}) + name = trace.get("name") + process = trace.get("process") + task_id = trace.get("task_id") + work_dir = trace.get("workdir") + exit_code = trace.get("exit") + process_id = trace.get("native_id") + container = trace.get("container") + err_message = self.read_error_message(work_dir) + + janis_status = self.read_process_status(body_as_json) + start, finish = self.set_start_finish_time(janis_status) + + if process == NextflowTranslator.FINAL_STEP_NAME: + nextflow_logger.nf_monitor = NextFlowTaskMonitor(id=task_id, name=name, status=janis_status, + exit=exit_code, work_dir=work_dir) + else: + job = RunJobModel( + submission_id=None, + run_id=nextflow_logger.sid, + id_=name, + parent=None, + name=name, + status=janis_status, + start=start, + finish=finish, + workdir=work_dir, + stderr=os.path.join(work_dir, ".command.err"), + stdout=os.path.join(work_dir, ".command.out"), + script=os.path.join(work_dir, ".command.run"), + batchid=process_id, + container=container, + returncode=exit_code, + error=err_message + ) + + nextflow_logger.metadata_callback(nextflow_logger, job) + elif event == "started": + pass + else: + raise Exception(f"Unknown weblog request event {event}") + + @classmethod + def read_error_message(cls, work_dir: str) -> Optional[str]: + err_message = None + + file_path = os.path.join(work_dir, ".err") + if os.path.exists(file_path): + with open(file_path, "r") as f: + err_message = f.read() + + return err_message + + def read_cached_process(self, data: dict): + processes = data.get("metadata", {}).get("workflow", {}).get("workflowStats", {}).get("processes", []) + + for p in processes: + if p["cached"] == 1: + name = p["name"] + janis_status = self.read_cached_process_status(p) + start, finish = self.set_start_finish_time(janis_status) + + job = RunJobModel( + submission_id=None, + run_id=nextflow_logger.sid, + id_=name, + parent=None, + name=name, + status=janis_status, + start=start, + finish=finish, + workdir=None, + cached=True + ) + + nextflow_logger.metadata_callback(nextflow_logger, job) + + @classmethod + def read_cached_process_status(cls, process_json: dict): + if process_json.get("errored") == False: + return TaskStatus.COMPLETED + else: + return TaskStatus.FAILED + + def read_process_status(self, data: dict): + status = data["trace"]["status"] + + janis_status = TaskStatus.QUEUED + if status == "COMPLETED": + janis_status = TaskStatus.COMPLETED + elif status == "RUNNING": + janis_status = TaskStatus.RUNNING + elif status == "SUBMITTED": + janis_status = TaskStatus.QUEUED + elif status == "FAILED": + janis_status = TaskStatus.FAILED + + return janis_status + + def set_start_finish_time(self, janis_status: TaskStatus): + start = DateUtil.now() if janis_status == TaskStatus.RUNNING else None + finish = DateUtil.now() if janis_status == TaskStatus.COMPLETED else None + + return start, finish + + return NextflowRequestHandler + + +class NextflowLogger(ProcessLogger): + + def __init__(self, sid: str, process, nextflow_log_filename, logfp, metadata_callback, execution_directory, + listener_host, listener_port, listener_socket, exit_function=None): + self.sid = sid + + self.error = None + self.metadata_callback = metadata_callback + self.outputs = None + self.workflow_scope = [] + self.execution_directory = execution_directory + self.work_directory = None + self.nextflow_log_file = os.path.join(self.execution_directory, nextflow_log_filename) + self.nf_monitor = None + self.executor = None + self.current_nf_monitor = None + self.current_nf_submitter = None + self.listener_host = listener_host + self.listener_port = listener_port + self.listener_socket = listener_socket + super().__init__( + process=process, prefix="nextflow: ", logfp=logfp, exit_function=exit_function + ) + + def run(self): + try: + httpd = None + self.listener_socket.close() + socketserver.ThreadingTCPServer.allow_reuse_address = True + httpd = socketserver.ThreadingTCPServer((self.listener_host, self.listener_port), + make_request_handler(self)) + print("serving at port", self.listener_port) + + if httpd is not None: + httpd.serve_forever() + else: + raise Exception("Failed to start http server to listen to Nextflow status update") + + except KeyboardInterrupt: + self.should_terminate = True + print("Detected keyboard interrupt") + self.shutdown_listener(httpd) + + except Exception as e: + print("Detected another error") + if not self.shutdown_listener(httpd): + self.exit_function(self) + self.process.kill() + self.terminate() + raise + + def shutdown_listener(self, httpd): + if httpd is not None: + httpd.shutdown() + + return True + + return False + + +class NextFlowTaskMonitor: + task_monitor_regex = r"TaskHandler\[(.+)\]" + + def __init__(self, id: Optional[str] = None, + name: Optional[str] = None, + status: Optional[str] = None, + exit: Optional[str] = None, + error: Optional[str] = None, + work_dir: Optional[str] = None, + stdout_path: Optional[str] = None, + stderr_path: Optional[str] = None, + ): + + self.id = id + self.name = name + self.status = status + self.exit = exit + self.error = error + self.workDir = work_dir + self.stdout_path = stdout_path + self.stderr_path = stderr_path + + @classmethod + def from_task_monitor(cls, task_monitor_log_entry: str): + + Logger.info("[Task monitor]") + Logger.info(task_monitor_log_entry) + match = re.search(cls.task_monitor_regex, task_monitor_log_entry) + monitor = cls() + + if match and len(match.groups()) >= 1: + task_monitor_str = match[1] + task_monitor = task_monitor_str.split("; ") + + Logger.debug("task_monitor") + Logger.debug(task_monitor) + + for att in task_monitor: + if ": " in att: + parts = att.split(": ") + key = parts[0] + val = parts[1] + setattr(monitor, key.strip(), val.strip()) + + else: + Logger.warn(f"unknown task_monitor entry {att}") + + # TODO: why is nextflow returning + # 'workDir: /data/gpfs/projects/punim0755/workspace/juny/nextflow/fastqcNf/janis/execution/work/f1/da15bd54d649281b1f60e93047128f started: 1618904643195' + if monitor.workDir is not None: + unexpected_str = " started" + if monitor.workDir.endswith(unexpected_str): + monitor.workDir = monitor.workDir[:- len(unexpected_str)] + + if monitor: + monitor.read_task_status() + monitor.read_stdout_path() + monitor.read_stderr_path() + + return monitor + + def read_task_status(self): + if self.status == 'COMPLETED' and self.exit == '0': + self.status = TaskStatus.COMPLETED + elif self.status == 'COMPLETED' and self.exit == '1': + self.status = TaskStatus.FAILED + else: + self.status = TaskStatus.PROCESSING + + def read_stdout_path(self): + self.stdout_path = os.path.join(self.workDir, ".command.out") + + def read_stderr_path(self): + self.stderr_path = os.path.join(self.workDir, ".command.err") + + +class Nextflow(Engine): + def __init__( + self, + execution_dir: str, + configuration_dir: str, + logfile=None, + identifier: str = "nextflow", + config: NextflowConfiguration = None, + ): + super().__init__( + identifier, EngineType.nextflow, logfile=logfile, execution_dir=execution_dir + ) + self.configuration_dir = configuration_dir + + self.process = None + self._logger = None + self.nextflow_log_filename = f"nextflow-{int(time.time())}.log" + self.listener_host = "localhost" + self.listener_port = None + self.listener_socket = None + + self.taskmeta = {} + + self.find_or_generate_config(config) + + def find_or_generate_config(self, config: NextflowConfiguration): + """ + Nextflow run configuration + + :param config: + :type config: + :param job: + :type job: + :return: + :rtype: + """ + job = PreparedJob.instance() + + if config: + self.config = config + else: + self.config = ( + job.template.template.engine_config(EngineType.nextflow, job) + or NextflowConfiguration(job) + ) + + def start_engine(self): + return self + + def stop_engine(self): + return self + + def terminate_task(self, identifier) -> TaskStatus: + self.stop_engine() + self.taskmeta["status"] = TaskStatus.ABORTED + return TaskStatus.ABORTED + + def find_available_port(self): + a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + range_min = 8000 + range_max = 8100 + available_port = None + num_tries = 10 + while True and num_tries > 0: + port = random.randint(range_min, range_max) + num_tries -= 1 + + location = (self.listener_host, port) + result_of_check = a_socket.connect_ex(location) + + if result_of_check == 0: + # Port is open + continue + else: + # Port is not open + available_port = port + break + + if available_port is None: + raise Exception(f"Cannot find available port between {range_min} and {range_max}") + + return port, a_socket + + def start_from_paths(self, wid, source_path: str, input_path: str, deps_path: str): + Logger.debug(f"source_path: {source_path}") + Logger.debug(f"input_path: {input_path}") + Logger.debug(f"deps_path: {deps_path}") + + self.taskmeta = { + "start": DateUtil.now(), + "status": TaskStatus.PROCESSING, + "jobs": {}, + } + + self.listener_port, self.listener_socket = self.find_available_port() + Logger.info(f"Port {self.listener_port} is available") + cmd = self.config.build_command_line(source_path=source_path, input_path=input_path, + nextflow_log_filename=self.nextflow_log_filename, + configuration_dir=self.configuration_dir, + host=self.listener_host, port=self.listener_port) + + Logger.info(f"Running command: {cmd}") + + self.process = subprocess.Popen( + cmd, preexec_fn=os.setsid, + ) + + Logger.info("Nextflow has started with pid=" + str(self.process.pid)) + self.process_id = self.process.pid + + self._logger = NextflowLogger( + wid, + self.process, + nextflow_log_filename=self.nextflow_log_filename, + logfp=open(self.logfile, "a+"), + metadata_callback=self.task_did_update, + execution_directory=self.execution_dir, + listener_host=self.listener_host, + listener_port=self.listener_port, + listener_socket=self.listener_socket, + exit_function=self.task_did_exit + ) + + return wid + + def metadata(self, identifier) -> RunModel: + return RunModel( + id_=identifier, + engine_id=identifier, + execution_dir=self.execution_dir, + submission_id=None, + name=identifier, + status=self.taskmeta.get("status"), + jobs=list(self.taskmeta.get("jobs", {}).values()), + error=self.taskmeta.get("error"), + ) + + def outputs_task(self, identifier) -> Dict[str, Any]: + outs = self.taskmeta.get("outputs") + + if not outs: + return {} + + retval: Dict[str, WorkflowOutputModel] = {} + for k, o in outs.items(): + retval.update(self.process_potential_out(identifier, k, o)) + + return retval + + @classmethod + def process_potential_out(cls, run_id, key, out): + if isinstance(out, list): + outs = [cls.process_potential_out(run_id, key, o) for o in out] + ups = {} + for o in outs: + for k, v in o.items(): + if k not in ups: + ups[k] = [] + ups[k].append(v) + return ups + + if out is None: + return {} + + return { + key: WorkflowOutputModel( + submission_id=None, + run_id=run_id, + id_=key, + original_path=None, + is_copyable=False, + timestamp=DateUtil.now(), + value=out, + new_path=None, + output_folder=None, + output_name=None, + secondaries=None, + extension=None, + ) + } + + def poll_task(self, identifier) -> TaskStatus: + return self.taskmeta.get("status", TaskStatus.PROCESSING) + + def read_janis_output(self): + outputs = {} + + if self.taskmeta["work_directory"] is None: + return outputs + + output_meta_regex = os.path.join(self.taskmeta["work_directory"], NextflowTranslator.OUTPUT_METADATA_FILENAME) + Logger.debug(output_meta_regex) + found = glob.glob(output_meta_regex) + Logger.debug(found) + if len(found) == 1: + output_meta_path = found[0] + else: + raise Exception(f"Cannot find file that matches {output_meta_regex}") + + if output_meta_path is not None: + with open(output_meta_path, "r") as f: + for line in f: + try: + key, val = line.split("=") + key = key.strip() + val = val.strip() + + # Note: cannot use json.load or ast.literal_eval because strings are not quoted + if val == "[]": + val = [] + elif val.startswith("["): + val = val.strip("][").split(", ") + + outputs[key] = val + + except Exception as e: + raise Exception(f"Failed to parse line {line} in {NextflowTranslator.OUTPUT_METADATA_FILENAME}") + + return outputs + + def task_did_exit(self, logger: NextflowLogger): + Logger.debug("Shell fired 'did exit'") + + if logger.nf_monitor is not None: + self.taskmeta["status"] = logger.nf_monitor.status + self.taskmeta["finish"] = DateUtil.now() + self.taskmeta["work_directory"] = logger.nf_monitor.workDir + self.taskmeta["stdout_path"] = logger.nf_monitor.stdout_path + self.taskmeta["outputs"] = self.read_janis_output() + else: + self.taskmeta["status"] = TaskStatus.FAILED + self.taskmeta["outputs"] = None + + for callback in self.progress_callbacks.get(self._logger.sid, []): + callback(self.metadata(self._logger.sid)) + + def task_did_update(self, logger: NextflowLogger, job: RunJobModel): + Logger.debug(f"Updated task {job.id_} with status={job.status}") + self.taskmeta["jobs"][job.id_] = job + + for callback in self.progress_callbacks.get(logger.sid, []): + callback(self.metadata(logger.sid)) diff --git a/janis_assistant/engines/nextflow/nextflowconfiguration.py b/janis_assistant/engines/nextflow/nextflowconfiguration.py new file mode 100644 index 0000000..9cf7d15 --- /dev/null +++ b/janis_assistant/engines/nextflow/nextflowconfiguration.py @@ -0,0 +1,146 @@ +import os +import stat +import shutil +import subprocess +from typing import Optional, Dict, List, Any +from janis_assistant.management.configuration import JanisConfigurationNextflow +from janis_assistant.data.models.preparedjob import PreparedJob +from janis_core.utils.logger import Logger + + +class NextflowConfiguration: + EXECUTABLE = "nextflow" + DOWNLOAD_EXEC_URL = "https://get.nextflow.io" + + def __init__(self, + job: PreparedJob, + process_executor: Optional[str] = None, + docker: Optional[bool] = True, + singularity: Optional[bool] = False, + queue: Optional[str] = None, + singularity_container_dir: Optional[str] = None, + job_email: Optional[str] = None): + + self.job = job + self.process_executor = process_executor + self.singularity = singularity + self.docker = docker + self.queue = queue + self.executable_path = self.resolve_executable(job.nextflow, job.config_dir) + self.singularity_container_dir = singularity_container_dir + self.job_email = job_email + + def build_command_line(self, source_path: str, input_path: str, configuration_dir: str, nextflow_log_filename: str, host: str, port: int): + + config_path = self.build_config_file(configuration_dir) + + cmd = [ + self.executable_path, + "-C", config_path, + "-log", nextflow_log_filename, + "run", source_path, + "-params-file", input_path, + "-ansi-log", 'false', + "-with-weblog", f"http://{host}:{port}", + "-resume" + ] + + return cmd + + def build_config_file(self, configuration_dir: str): + config_path = os.path.join(configuration_dir, "nextflow.config") + config_values = {} + + # we only want one or the other and we want to prioritise singularity + if self.singularity: + config_values["singularity.enabled"] = self._to_nexflow_string(self.singularity) + config_values["singularity.autoMounts"] = self._to_nexflow_string(True) + + if self.singularity_container_dir is not None: + config_values["singularity.cacheDir"] = self._to_nexflow_string(self.singularity_container_dir) + else: + config_values["docker.enabled"] = self._to_nexflow_string(self.docker) + + if self.process_executor is not None: + config_values["process.executor"] = self._to_nexflow_string(self.process_executor) + + if self.queue is not None: + config_values["process.queue"] = self._to_nexflow_string(self.queue) + + if self.job_email is not None: + config_values["process.clusterOptions"] = self._to_nexflow_string( + f"--mail-user {self.job_email} --mail-type END") + + config_lines = [f"{key} = {value}" for key, value in config_values.items()] + with open(config_path, "w") as f: + f.write("\n".join(config_lines)) + + return config_path + + def _to_nexflow_string(self, val: Any): + if type(val) == bool: + return str(val).lower() + if type(val) == str: + return f"'{val}'" + + return val + + @classmethod + def executable_exists(cls): + return shutil.which(cls.EXECUTABLE) + + @classmethod + def resolve_executable(cls, janis_nextflow_config: JanisConfigurationNextflow, janis_config_dir: str) -> str: + """ + Resolve the path to the Nextflow executable file + + :param janis_nextflow_config: + :type janis_nextflow_config: + :param janis_config_dir: + :type janis_config_dir: + :return: + :rtype: + """ + # Order of checks: + # 1. Check if Nextflow executable path is provided in config + # 2. Check if Nextflow executable path is found in `.janis` configuration directory + # 3. Check if Nextflow executable can be found in $PATH env variable + # 4. Attempt to download Nextflow executable + + # Check if Nextflow executable path is provided in config + path = janis_nextflow_config.executable + + if path is not None and not os.path.exists(path): + raise Exception(f"Nextflow executable file provided in Janis configuration not found: {path}") + + # Check if Nextflow executable path is found in `.janis` configuration directory + if path is None: + path = os.path.join(janis_config_dir, cls.EXECUTABLE) + if not os.path.exists(path): + path = None + + # Check if Nextflow executable can be found in $PATH env variable + if path is None: + path = cls.executable_exists() + + # Attempt to download Nextflow executable + # Now, try to download online + # follow instructions from https://www.nextflow.io/docs/latest/getstarted.html + if path is None: + try: + Logger.info("Downloading Nextflow executable") + path = os.path.join(janis_config_dir, cls.EXECUTABLE) + process = subprocess.Popen(f"cd {janis_config_dir} && curl -s {cls.DOWNLOAD_EXEC_URL} | bash", + shell=True) + process.wait() + if os.path.exists(path): + st = os.stat(path) + os.chmod(path, st.st_mode | stat.S_IEXEC) + else: + raise + except Exception as e: + raise Exception("Failed to download Nextflow executable") + + Logger.info(f"Nextflow executable path: {path}") + + return path diff --git a/janis_assistant/main.py b/janis_assistant/main.py index 77af442..55bf6f0 100644 --- a/janis_assistant/main.py +++ b/janis_assistant/main.py @@ -102,7 +102,7 @@ def run_with_outputs( validation_reqs=None, engine=engine, hints={}, - keep_intermediate_files=False, + keep_intermediate_files=True, max_cores=None, max_memory=None, max_duration=None, @@ -715,6 +715,7 @@ def prepare_job( call_caching_enabled=jc.call_caching_enabled, container_type=jc.container.get_container_type(), post_run_script=post_run_script, + nextflow=jc.nextflow ) if db_type: @@ -769,6 +770,8 @@ def get_engine_from_eng( cromwelljar=cromwell_jar, execution_dir=execdir, ) + elif engid == EngineType.nextflow.value: + return get_engine_type(eng)(logfile=logfile, execution_dir=execdir, configuration_dir=confdir) return get_engine_type(eng)(logfile=logfile, execution_dir=execdir) diff --git a/janis_assistant/management/configuration.py b/janis_assistant/management/configuration.py index cd8511e..2ac6f28 100644 --- a/janis_assistant/management/configuration.py +++ b/janis_assistant/management/configuration.py @@ -166,6 +166,11 @@ def get_database_config_helper(self): ) +class JanisConfigurationNextflow(Serializable): + def __init__(self, executable: str = None): + self.executable = executable + + class JanisConfigurationRecipes(Serializable): VALID_YAML_EXTENSIONS = { "yaml", @@ -390,6 +395,7 @@ def __init__( digest_cache_location: str = None, container: Union[str, Container] = None, search_paths: List[str] = None, + nextflow: Union[JanisConfigurationNextflow, dict] = None, ): """ :param engine: Default engine to use @@ -441,6 +447,9 @@ def __init__( self.cromwell: JanisConfigurationCromwell = parse_if_dict( JanisConfigurationCromwell, cromwell or {}, "cromwell", skip_if_empty=False ) + self.nextflow: JanisConfigurationNextflow = parse_if_dict( + JanisConfigurationNextflow, nextflow or {}, "nextflow", skip_if_empty=False + ) self.notifications: JanisConfigurationNotifications = parse_if_dict( JanisConfigurationNotifications, notifications or {}, diff --git a/janis_assistant/management/workflowmanager.py b/janis_assistant/management/workflowmanager.py index fa3a55e..8db7d91 100644 --- a/janis_assistant/management/workflowmanager.py +++ b/janis_assistant/management/workflowmanager.py @@ -53,6 +53,7 @@ get_ideal_specification_for_engine, Cromwell, CWLTool, + Nextflow, CromwellConfiguration, Engine, EngineType, @@ -706,6 +707,7 @@ def callb(meta: RunModel): # add extra check for engine on resume meta = self.engine.metadata(self.get_engine_id()) if meta and meta.status in TaskStatus.final_states(): + meta.submission_id = self.submission_id self.save_metadata(meta) return self.process_completed_task() @@ -733,6 +735,7 @@ def callb(meta: RunModel): self.suspend_workflow() break + # # TODO: make this interval be a config option # if ( # DateUtil.now() - last_health_check @@ -1372,7 +1375,7 @@ def save_metadata_if_required(self): with open(os.path.join(metadir, "metadata.json"), "w+") as fp: json.dump(meta.meta, fp) - elif isinstance(engine, CWLTool): + elif isinstance(engine, CWLTool) or isinstance(engine, Nextflow): import json meta = engine.metadata(self.submission_id) @@ -1662,6 +1665,13 @@ def explode_at_index(iterable, index_to_explode, index_to_select): frompath = apply_secondary_file_format_to_filename(original_filepath, sec) tofn = apply_secondary_file_format_to_filename(outfn, sec) topath = os.path.join(outdir, tofn) + + if not os.path.exists(frompath): + if sec.startswith("."): + sec = f"^{sec}" + + frompath = apply_secondary_file_format_to_filename(original_filepath, sec) + fs.cp_from(frompath, topath, force=True) return [original_filepath, newoutputfilepath] diff --git a/janis_assistant/templates/base.py b/janis_assistant/templates/base.py index 542bca7..40ad216 100644 --- a/janis_assistant/templates/base.py +++ b/janis_assistant/templates/base.py @@ -281,7 +281,7 @@ def process_container_dir(container_dir): from os import getenv - envs_to_search = ["CWL_SINGULARITY_CACHE", "SINGULARITY_TMPDIR"] + envs_to_search = ["CWL_SINGULARITY_CACHE", "SINGULARITY_TMPDIR", "NXF_SINGULARITY_CACHEDIR"] for env in envs_to_search: e = getenv(env) if e: diff --git a/janis_assistant/templates/slurm.py b/janis_assistant/templates/slurm.py index 3f0dfc0..3f055db 100644 --- a/janis_assistant/templates/slurm.py +++ b/janis_assistant/templates/slurm.py @@ -1,6 +1,7 @@ from typing import Union, List, Optional from janis_assistant.engines.cwltool.cwltoolconfiguation import CWLToolConfiguration +from janis_assistant.engines.nextflow.nextflowconfiguration import NextflowConfiguration from janis_assistant.data.models.preparedjob import PreparedJob from janis_assistant.engines.cromwell.cromwellconfiguration import CromwellConfiguration @@ -144,6 +145,18 @@ def cwltool(self, job): return config + def nextflow(self, job): + + config = NextflowConfiguration(job) + config.singularity = True + config.process_executor = "slurm" + config.singularity_container_dir = self.singularity_container_dir + + if self.send_job_emails: + config.job_email = job.notifications.email + + return config + def engine_config(self, engine: EngineType, job): if engine == EngineType.cromwell: return self.cromwell(job) @@ -151,6 +164,9 @@ def engine_config(self, engine: EngineType, job): elif engine == EngineType.cwltool: return self.cwltool(job) + elif engine == EngineType.nextflow: + return self.nextflow(job) + raise NotImplementedError( f"The {self.__class__.__name__} template does not have a configuration for {engine.value}" )