From d03e485b5f1eaba8d5cfac4727dfe80096dccad6 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Mon, 16 Dec 2024 18:05:10 -0800 Subject: [PATCH] More asyncification in src/lsst/cmservice/common/bash.py --- src/lsst/cmservice/common/bash.py | 136 +++++++++++++++++------------ src/lsst/cmservice/common/utils.py | 34 ++++++++ 2 files changed, 112 insertions(+), 58 deletions(-) diff --git a/src/lsst/cmservice/common/bash.py b/src/lsst/cmservice/common/bash.py index 1e7ceb19..299d6ed6 100644 --- a/src/lsst/cmservice/common/bash.py +++ b/src/lsst/cmservice/common/bash.py @@ -1,14 +1,12 @@ """Utility functions for working with bash scripts""" -import contextlib -import os import subprocess +from pathlib import Path from typing import Any -import yaml from fastapi.concurrency import run_in_threadpool -from ..common.utils import read_lines +from ..common.utils import check_file_exists, read_lines, write_string, yaml_dump, yaml_safe_load from .enums import StatusEnum from .errors import CMBashSubmitError @@ -28,7 +26,7 @@ async def get_diagnostic_message( ------- The last line of the log file, potentially containing a diagnostic message. """ - if not os.path.exists(log_url): + if not await run_in_threadpool(check_file_exists, log_url): return f"Log file {log_url} does not exist" try: lines = await run_in_threadpool(read_lines, log_url) @@ -59,7 +57,8 @@ async def parse_bps_stdout(url: str) -> dict[str, str]: def sync_parse_bps_stdout(url: str) -> dict[str, str]: """Parse the std from a bps submit job. Synchronous function using - standard readline. + standard readline. More work should be done to make this function work in + the async world. Parameters ---------- @@ -84,13 +83,15 @@ def sync_parse_bps_stdout(url: str) -> dict[str, str]: return out_dict -def run_bash_job( +async def run_bash_job( script_url: str, log_url: str, stamp_url: str, fake_status: StatusEnum | None = None, ) -> None: - """Run a bash job + """Run a bash job. Most of the work is done in subprocesses run in a + companion synchronous function, `sync_submit_file_to_run_in_bash`, which is + wrapped in `fastapi.concurrency.run_in_threadpool`. Parameters ---------- @@ -107,35 +108,53 @@ def run_bash_job( If set, don't actually submit the job """ if fake_status is not None: - with open(stamp_url, "w", encoding="utf-8") as fstamp: - fields = dict(status="reviewable") - yaml.dump(fields, fstamp) + fields = dict(status="reviewable") + await run_in_threadpool(yaml_dump, fields, stamp_url) return try: - with open(log_url, "w", encoding="utf-8") as fout: - os.system(f"chmod +x {script_url}") - with subprocess.Popen( - [os.path.abspath(script_url)], - stdout=fout, - stderr=fout, - ) as process: - process.wait() - if process.returncode != 0: # pragma: no cover - assert process.stderr - msg = process.stderr.read().decode() - raise CMBashSubmitError(f"Bad bash submit: {msg}") + await run_in_threadpool(sync_submit_file_to_run_in_bash, script_url, log_url) except Exception as msg: raise CMBashSubmitError(f"Bad bash submit: {msg}") from msg - with open(stamp_url, "w", encoding="utf-8") as fstamp: - fields = dict(status="accepted") - yaml.dump(fields, fstamp) + fields = dict(status="accepted") + await run_in_threadpool(yaml_dump, fields, stamp_url) -def check_stamp_file( +def sync_submit_file_to_run_in_bash(script_url: str, log_url: str) -> None: + """Make a script executable, then submit to run in bash. To be wrapped in + `fastapi.concurrency.run_in_threadpool` by the asynchronous function above, + `run_bash_job`. Just a quick attempt to wrap the process; more work should + be done here to make this function async-friendly. + + Parameters + ---------- + script_url : `str` + Path to the script to run. + log_url : `str` + Path to output the logs. + """ + with open(log_url, "w", encoding="utf-8") as fout: + script_path = Path(script_url) + if script_path.exists(): + script_path.chmod(0o755) + else: + raise CMBashSubmitError(f"No script at path {script_url}") + with subprocess.Popen( + [script_path.resolve()], + stdout=fout, + stderr=fout, + ) as process: + process.wait() + if process.returncode != 0: # pragma: no cover + assert process.stderr + msg = process.stderr.read().decode() + raise CMBashSubmitError(f"Bad bash submit: {msg}") + + +async def check_stamp_file( stamp_file: str | None, default_status: StatusEnum, ) -> StatusEnum: - """Check a 'stamp' file for a status code + """Check a 'stamp' file for a status code. Parameters ---------- @@ -152,52 +171,52 @@ def check_stamp_file( """ if stamp_file is None: return default_status - if not os.path.exists(stamp_file): + file_exists = await run_in_threadpool(check_file_exists, stamp_file) + if not file_exists: return default_status - with open(stamp_file, encoding="utf-8") as fin: - fields = yaml.safe_load(fin) - return StatusEnum[fields["status"]] + fields = await run_in_threadpool(yaml_safe_load, stamp_file) + return StatusEnum[fields["status"]] -def write_bash_script( +async def write_bash_script( script_url: str, command: str, **kwargs: Any, -) -> str: - """Utility function to write a bash script for later execution +) -> Path: + """Utility function to write a bash script for later execution. Parameters ---------- - script_url: str + script_url: `str` Location to write the script - command: str + command: `str` Main command line(s) in the script Keywords -------- - prepend: str | None + prepend: `str | None` Text to prepend before command - append: str | None + append: `str | None` Test to append after command - stamp: str | None + stamp: `str | None` Text to echo to stamp file when script completes - stamp_url: str | None + stamp_url: `str | None` Stamp file to write to when script completes - fake: str | None + fake: `str | None` Echo command instead of running it - rollback: str | None + rollback: `str | Path | None` Prefix to script_url used when rolling back processing Returns ------- - script_url : str + script_url : `str` The path to the newly written script """ prepend = kwargs.get("prepend") @@ -205,17 +224,18 @@ def write_bash_script( fake = kwargs.get("fake") rollback_prefix = kwargs.get("rollback", "") - script_url = f"{rollback_prefix}{script_url}" - with contextlib.suppress(OSError): - os.makedirs(os.path.dirname(script_url)) - - with open(script_url, "w", encoding="utf-8") as fout: - if prepend: - fout.write(f"{prepend}\n") - if fake: - command = f"echo '{command}'" - fout.write(command) - fout.write("\n") - if append: - fout.write(f"{append}\n") - return script_url + if isinstance(rollback_prefix, Path): + script_path = rollback_prefix / script_url + else: + script_path = Path(f"{rollback_prefix}{script_url}") + script_path.parent.mkdir(parents=True, exist_ok=True) + # I chose not to make the directory `with contextlib.suppress(OSError)` + # because cm-service under-reports a lot of the issues it might run into + # that we need to respond to better. I think most relevant situations will + # be handled by `parents=True, exist_ok=True`, but will revert to the above + # suppression if I am wrong. + if fake: + command = f"echo '{command}'" + contents = (prepend if prepend else "") + "\n" + command + "\n" + (append if append else "") + await run_in_threadpool(write_string, contents, script_path) + return script_path diff --git a/src/lsst/cmservice/common/utils.py b/src/lsst/cmservice/common/utils.py index f7a6c977..8b11cce1 100644 --- a/src/lsst/cmservice/common/utils.py +++ b/src/lsst/cmservice/common/utils.py @@ -4,6 +4,7 @@ import os import sys from collections.abc import Iterator, Mapping +from pathlib import Path from typing import Any import yaml @@ -101,3 +102,36 @@ def read_lines(filename: str) -> list: with open(filename, encoding="utf-8") as fin: lines = fin.readlines() return lines + + +def check_file_exists(file: str | Path) -> bool: + """`pathlib.Path.exists`, to be wrapped in + `fastapi.concurrency.run_in_threadpool` + + Parameters + ---------- + file : `str` | `pathlib.Path` + The path to the file. + + Returns + ------- + A boolean value: does the file exist? + """ + if isinstance(file, str): + file = Path(file) + return file.exists() + + +def write_string(contents: str, file: str | Path) -> None: + """Write string to a file. To be wrapped in + `fastapi.concurrency.run_in_threadpool` + + Parameters + ---------- + contents : `str` + The contents to be written. + file : `str | Path` + The path to write the file. + """ + with open(file, "w") as fout: + fout.write(contents)