From c9af6065293e33f82966d7a736e4d596806de842 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 10 Dec 2024 17:59:32 -0800 Subject: [PATCH] Addressed initial ruff async errors, added helpful error messages --- src/lsst/cmservice/common/bash.py | 56 +++++++++++++++++++-- src/lsst/cmservice/common/daemon.py | 4 +- src/lsst/cmservice/common/errors.py | 2 +- src/lsst/cmservice/common/utils.py | 62 ++++++++++++++++++++++++ src/lsst/cmservice/daemon.py | 3 +- src/lsst/cmservice/db/script_template.py | 28 ++++++++--- src/lsst/cmservice/handlers/functions.py | 39 +++++++++++---- src/lsst/cmservice/handlers/jobs.py | 9 ++-- 8 files changed, 175 insertions(+), 28 deletions(-) diff --git a/src/lsst/cmservice/common/bash.py b/src/lsst/cmservice/common/bash.py index 3c1096647..1e7ceb199 100644 --- a/src/lsst/cmservice/common/bash.py +++ b/src/lsst/cmservice/common/bash.py @@ -6,7 +6,9 @@ from typing import Any import yaml +from fastapi.concurrency import run_in_threadpool +from ..common.utils import read_lines from .enums import StatusEnum from .errors import CMBashSubmitError @@ -15,16 +17,60 @@ async def get_diagnostic_message( log_url: str, ) -> str: """Read the last line of a log file, aspirational hoping - that it contains a diagnostic error message""" - with open(log_url, encoding="utf-8") as fin: - lines = fin.readlines() + that it contains a diagnostic error message + + Parameters + ---------- + log_url : `str` + The url of the log which may contain a diagnostic message + + Returns + ------- + The last line of the log file, potentially containing a diagnostic message. + """ + if not os.path.exists(log_url): + return f"Log file {log_url} does not exist" + try: + lines = await run_in_threadpool(read_lines, log_url) if lines: return lines[-1].strip() return "Empty log file" + except Exception as e: + return f"Error reading log file: {e}" + + +async def parse_bps_stdout(url: str) -> dict[str, str]: + """Parse the std from a bps submit job. Wraps the synchronous function + and passes to `fastapi.concurrency.run_in_threadpool` + Parameters + ---------- + url : `str` + url for BPS submit stdout -def parse_bps_stdout(url: str) -> dict[str, str]: - """Parse the std from a bps submit job""" + Returns + ------- + out_dict `str` + a dictionary containing the stdout from BPS submit + """ + out_dict = await run_in_threadpool(sync_parse_bps_stdout, url) + return out_dict + + +def sync_parse_bps_stdout(url: str) -> dict[str, str]: + """Parse the std from a bps submit job. Synchronous function using + standard readline. + + Parameters + ---------- + url : `str` + url for BPS submit stdout + + Returns + ------- + out_dict `str` + a dictionary containing the stdout from BPS submit + """ out_dict = {} with open(url, encoding="utf8") as fin: line = fin.readline() diff --git a/src/lsst/cmservice/common/daemon.py b/src/lsst/cmservice/common/daemon.py index 146275622..5d9b0526f 100644 --- a/src/lsst/cmservice/common/daemon.py +++ b/src/lsst/cmservice/common/daemon.py @@ -1,5 +1,5 @@ +import asyncio from datetime import datetime, timedelta -from time import sleep from sqlalchemy.ext.asyncio import async_scoped_session from sqlalchemy.future import select @@ -32,4 +32,4 @@ async def daemon_iteration(session: async_scoped_session) -> None: async def daemon_loop(session: async_scoped_session) -> None: # pragma: no cover while True: await daemon_iteration(session) - sleep(15) + await asyncio.sleep(15) diff --git a/src/lsst/cmservice/common/errors.py b/src/lsst/cmservice/common/errors.py index 6f0ff5c91..ab96fdedc 100644 --- a/src/lsst/cmservice/common/errors.py +++ b/src/lsst/cmservice/common/errors.py @@ -96,7 +96,7 @@ class CMButlerCallError(RuntimeError): class CMSpecficiationError(KeyError): - """Raised when Specification calls out an non-existing fragement""" + """Raised when Specification calls out an non-existing fragment""" class CMTooFewAcceptedJobsError(KeyError): diff --git a/src/lsst/cmservice/common/utils.py b/src/lsst/cmservice/common/utils.py index 605312057..f7a6c9777 100644 --- a/src/lsst/cmservice/common/utils.py +++ b/src/lsst/cmservice/common/utils.py @@ -6,6 +6,8 @@ from collections.abc import Iterator, Mapping from typing import Any +import yaml + @contextlib.contextmanager def add_sys_path(path: os.PathLike | str | None) -> Iterator[None]: @@ -39,3 +41,63 @@ def update_include_dict( orig_dict[key].update(val) else: orig_dict[key] = val + + +# Synchronous file I/O utility functions included mostly to pass to +# `fastapi.concurrency.run_in_threadpool` + + +def yaml_safe_load(filename: str) -> dict: + """Helper function to open yaml files using `yaml.safe_load`. To be used + with `fastapi.concurrency.run_in_threadpool` + + Parameters + ---------- + filename : `str` + The path to the yaml file. + + Returns + ------- + A dictionary returned by `yaml.safe_load` + """ + with open(filename, encoding="utf-8") as file: + return yaml.safe_load(file) + + +def yaml_dump(contents: str | dict | list, filename: str) -> None: + """Helper function to open yaml files using `yaml.dump`. To be used + with `fastapi.concurrency.run_in_threadpool` + + Parameters + ---------- + contents: `str` | `Dict` | `list` + The contents to be dumped in a yaml file. + filename : `str` + The path where the yaml file will be written. + + Returns + ------- + A string, if the `dump` fails. + """ + with open(filename, "w", encoding="utf-8") as fout: + yaml.dump(contents, fout) + + +def read_lines(filename: str) -> list: + """Helper function to open a file and return each line, string-by-string. + Just here to wrap the `with` and pass function to + `fastapi.concurrency.run_in_threadpool` + + Parameters + ---------- + filename : `str` + The path to the yaml file. + + Returns + ------- + A list of full of the lines comprising the file, returned by standard + python `readlines`. + """ + with open(filename, encoding="utf-8") as fin: + lines = fin.readlines() + return lines diff --git a/src/lsst/cmservice/daemon.py b/src/lsst/cmservice/daemon.py index deb237199..384dbe47f 100644 --- a/src/lsst/cmservice/daemon.py +++ b/src/lsst/cmservice/daemon.py @@ -1,6 +1,5 @@ import asyncio from datetime import datetime, timedelta -from time import sleep import structlog from safir.database import create_async_session, create_database_engine @@ -26,7 +25,7 @@ async def main_loop() -> None: logger.info(f"Daemon completed {iteration_count} iterations in {delta_seconds} seconds.") last_log_time = datetime.now() await daemon_iteration(session) - sleep(15) + await asyncio.sleep(15) def main() -> None: diff --git a/src/lsst/cmservice/db/script_template.py b/src/lsst/cmservice/db/script_template.py index c5079af45..0b8e6d27c 100644 --- a/src/lsst/cmservice/db/script_template.py +++ b/src/lsst/cmservice/db/script_template.py @@ -4,11 +4,14 @@ from typing import TYPE_CHECKING, Any import yaml +from fastapi.concurrency import run_in_threadpool from sqlalchemy import JSON from sqlalchemy.ext.asyncio import async_scoped_session from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import Mapped, mapped_column +from ..common.errors import CMYamlParseError +from ..common.utils import yaml_safe_load from .base import Base from .row import RowMixin @@ -78,9 +81,14 @@ async def load( # pylint: disable=too-many-arguments Newly created `ScriptTemplate` """ full_file_path = os.path.abspath(os.path.expandvars(file_path)) - with open(full_file_path, encoding="utf-8") as fin: - data = yaml.safe_load(fin) - + if not os.path.exists(full_file_path): + raise CMYamlParseError(f"Script template does not exist at path {file_path}") + try: + data = await run_in_threadpool(yaml_safe_load, full_file_path) + except yaml.YAMLError as yaml_error: + raise CMYamlParseError(f"Error parsing YAML file: {yaml_error}") from yaml_error + except Exception as e: + raise CMYamlParseError(f"{e}") from e return await cls.create_row(session, name=name, data=data) async def update_from_file( @@ -108,7 +116,15 @@ async def update_from_file( Newly updated `ScriptTemplate` """ full_file_path = os.path.abspath(os.path.expandvars(file_path)) - with open(full_file_path, encoding="utf-8") as fin: - data = yaml.safe_load(fin) - + if not os.path.exists(full_file_path): + raise CMYamlParseError(f"Script template does not exist at path {file_path}") + + try: + data = await run_in_threadpool(yaml_safe_load, full_file_path) + except yaml.YAMLError as yaml_error: + raise CMYamlParseError( + f"Error parsing YAML file at {file_path}; throws {yaml_error}" + ) from yaml_error + except Exception as e: + raise CMYamlParseError(f"{e}") from e return await self.update_values(session, name=name, data=data) diff --git a/src/lsst/cmservice/handlers/functions.py b/src/lsst/cmservice/handlers/functions.py index 1771260f5..0de61670c 100644 --- a/src/lsst/cmservice/handlers/functions.py +++ b/src/lsst/cmservice/handlers/functions.py @@ -3,6 +3,7 @@ from typing import Any import yaml +from fastapi.concurrency import run_in_threadpool from sqlalchemy import select from sqlalchemy.ext.asyncio import async_scoped_session @@ -11,7 +12,7 @@ from ..common.enums import StatusEnum from ..common.errors import CMMissingFullnameError, CMYamlParseError -from ..common.utils import update_include_dict +from ..common.utils import update_include_dict, yaml_safe_load from ..db.campaign import Campaign from ..db.job import Job from ..db.pipetask_error import PipetaskError @@ -237,8 +238,14 @@ async def load_specification( loaded_specs = {} specification = None - with open(yaml_file, encoding="utf-8") as fin: - spec_data = yaml.safe_load(fin) + if not os.path.exists(yaml_file): + raise CMYamlParseError(f"Specification does not exist at path {yaml_file}") + try: + spec_data = await run_in_threadpool(yaml_safe_load, yaml_file) + except yaml.YAMLError as yaml_error: + raise CMYamlParseError(f"Error parsing specification {yaml_file}; threw {yaml_error}") from yaml_error + except Exception as e: + raise CMYamlParseError(f"{e}") from e for config_item in spec_data: if "Imports" in config_item: @@ -437,10 +444,16 @@ async def load_manifest_report( job = await Job.get_row_by_fullname(session, job_name) if fake_status is not None: return job - - with open(yaml_file, encoding="utf-8") as fin: - manifest_data = yaml.safe_load(fin) - + if not os.path.exists(yaml_file): + raise CMYamlParseError(f"Manifest report yaml does not exist at path {yaml_file}") + try: + manifest_data = await run_in_threadpool(yaml_safe_load, yaml_file) + except yaml.YAMLError as yaml_error: + raise CMYamlParseError( + f"Error parsing manifest report yaml at path {yaml_file}; threw {yaml_error}" + ) from yaml_error + except Exception as e: + raise CMYamlParseError(f"{e}") from e for task_name_, task_data_ in manifest_data.items(): failed_quanta = task_data_.get("failed_quanta", {}) outputs = task_data_.get("outputs", {}) @@ -682,8 +695,16 @@ async def load_error_types( error_types : list[PipetaskErrorType] Newly loaded error types """ - with open(yaml_file, encoding="utf-8") as fin: - error_types = yaml.safe_load(fin) + if not os.path.exists(yaml_file): + raise CMYamlParseError(f"Error type yaml does not exist at path {yaml_file}") + try: + error_types = await run_in_threadpool(yaml_safe_load, yaml_file) + except yaml.YAMLError as yaml_error: + raise CMYamlParseError( + f"Error parsing error type yaml at path {yaml_file}; threw {yaml_error}" + ) from yaml_error + except Exception as e: + raise CMYamlParseError(f"{e}") from e ret_list: list[PipetaskErrorType] = [] for error_type_ in error_types: diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py index 42a9cd277..d173e71ec 100644 --- a/src/lsst/cmservice/handlers/jobs.py +++ b/src/lsst/cmservice/handlers/jobs.py @@ -23,6 +23,7 @@ CMMissingScriptInputError, test_type_and_raise, ) +from ..common.utils import yaml_dump from ..db.element import ElementMixin from ..db.job import Job from ..db.script import Script @@ -183,8 +184,10 @@ async def _write_script( with contextlib.suppress(OSError): await run_in_threadpool(os.makedirs, os.path.dirname(script_url), exist_ok=True) - with open(config_url, "w", encoding="utf-8") as fout: - yaml.dump(workflow_config, fout) + try: + await run_in_threadpool(yaml_dump, workflow_config, config_url) + except yaml.YAMLError as yaml_error: + raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {yaml_error}") return StatusEnum.prepared async def _check_slurm_job( @@ -341,7 +344,7 @@ async def _load_wms_reports( ) -> StatusEnum | None: """Load the job processing info - Paramters + Parameters --------- job: Job Job in question