Skip to content

Commit

Permalink
Addressed initial ruff async errors, added helpful error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
eigerx committed Dec 17, 2024
1 parent 6fe06b7 commit c9af606
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 28 deletions.
56 changes: 51 additions & 5 deletions src/lsst/cmservice/common/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from typing import Any

import yaml
from fastapi.concurrency import run_in_threadpool

from ..common.utils import read_lines
from .enums import StatusEnum
from .errors import CMBashSubmitError

Expand All @@ -15,16 +17,60 @@ async def get_diagnostic_message(
log_url: str,
) -> str:
"""Read the last line of a log file, aspirational hoping
that it contains a diagnostic error message"""
with open(log_url, encoding="utf-8") as fin:
lines = fin.readlines()
that it contains a diagnostic error message
Parameters
----------
log_url : `str`
The url of the log which may contain a diagnostic message
Returns
-------
The last line of the log file, potentially containing a diagnostic message.
"""
if not os.path.exists(log_url):
return f"Log file {log_url} does not exist"
try:
lines = await run_in_threadpool(read_lines, log_url)
if lines:
return lines[-1].strip()
return "Empty log file"
except Exception as e:
return f"Error reading log file: {e}"


async def parse_bps_stdout(url: str) -> dict[str, str]:
"""Parse the std from a bps submit job. Wraps the synchronous function
and passes to `fastapi.concurrency.run_in_threadpool`
Parameters
----------
url : `str`
url for BPS submit stdout
def parse_bps_stdout(url: str) -> dict[str, str]:
"""Parse the std from a bps submit job"""
Returns
-------
out_dict `str`
a dictionary containing the stdout from BPS submit
"""
out_dict = await run_in_threadpool(sync_parse_bps_stdout, url)
return out_dict


def sync_parse_bps_stdout(url: str) -> dict[str, str]:
"""Parse the std from a bps submit job. Synchronous function using
standard readline.
Parameters
----------
url : `str`
url for BPS submit stdout
Returns
-------
out_dict `str`
a dictionary containing the stdout from BPS submit
"""
out_dict = {}
with open(url, encoding="utf8") as fin:
line = fin.readline()
Expand Down
4 changes: 2 additions & 2 deletions src/lsst/cmservice/common/daemon.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from datetime import datetime, timedelta
from time import sleep

from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.future import select
Expand Down Expand Up @@ -32,4 +32,4 @@ async def daemon_iteration(session: async_scoped_session) -> None:
async def daemon_loop(session: async_scoped_session) -> None: # pragma: no cover
while True:
await daemon_iteration(session)
sleep(15)
await asyncio.sleep(15)
2 changes: 1 addition & 1 deletion src/lsst/cmservice/common/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class CMButlerCallError(RuntimeError):


class CMSpecficiationError(KeyError):
"""Raised when Specification calls out an non-existing fragement"""
"""Raised when Specification calls out an non-existing fragment"""


class CMTooFewAcceptedJobsError(KeyError):
Expand Down
62 changes: 62 additions & 0 deletions src/lsst/cmservice/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from collections.abc import Iterator, Mapping
from typing import Any

import yaml


@contextlib.contextmanager
def add_sys_path(path: os.PathLike | str | None) -> Iterator[None]:
Expand Down Expand Up @@ -39,3 +41,63 @@ def update_include_dict(
orig_dict[key].update(val)
else:
orig_dict[key] = val


# Synchronous file I/O utility functions included mostly to pass to
# `fastapi.concurrency.run_in_threadpool`


def yaml_safe_load(filename: str) -> Any:
    """Parse a yaml file with `yaml.safe_load` and return its contents.

    Synchronous helper, meant to be handed to
    `fastapi.concurrency.run_in_threadpool` by async callers.

    Parameters
    ----------
    filename : `str`
        The path to the yaml file.

    Returns
    -------
    contents : `Any`
        Whatever `yaml.safe_load` produces for the document. This is not
        necessarily a dictionary: a top-level yaml sequence yields a list,
        a scalar yields a scalar, and an empty document yields `None`.

    Raises
    ------
    OSError
        Raised by `open` if the file cannot be opened for reading.
    yaml.YAMLError
        Raised by `yaml.safe_load` if the contents are not valid yaml.
    """
    # Exceptions are deliberately not caught here; callers wrap them in
    # their own domain-specific error types.
    with open(filename, encoding="utf-8") as file:
        return yaml.safe_load(file)


def yaml_dump(contents: str | dict | list, filename: str) -> None:
    """Write ``contents`` to a yaml file using `yaml.dump`.

    Synchronous helper, meant to be handed to
    `fastapi.concurrency.run_in_threadpool` by async callers.

    Parameters
    ----------
    contents : `str` | `dict` | `list`
        The contents to be dumped in a yaml file.
    filename : `str`
        The path where the yaml file will be written. The file is opened
        in write mode, so existing contents are replaced.

    Returns
    -------
    None
        Nothing is returned; errors from `open` or `yaml.dump` propagate
        to the caller as exceptions.
    """
    with open(filename, mode="w", encoding="utf-8") as stream:
        yaml.dump(contents, stream=stream)


def read_lines(filename: str) -> list:
    """Read a utf-8 text file and return its lines.

    Exists only to wrap the `with` block in a plain synchronous function
    that can be passed to `fastapi.concurrency.run_in_threadpool`.

    Parameters
    ----------
    filename : `str`
        The path to the text file to read.

    Returns
    -------
    lines : `list`
        The lines of the file as returned by the standard python
        `readlines`, i.e. each string keeps its trailing newline. An empty
        file yields an empty list.
    """
    with open(filename, encoding="utf-8") as handle:
        return handle.readlines()
3 changes: 1 addition & 2 deletions src/lsst/cmservice/daemon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
from datetime import datetime, timedelta
from time import sleep

import structlog
from safir.database import create_async_session, create_database_engine
Expand All @@ -26,7 +25,7 @@ async def main_loop() -> None:
logger.info(f"Daemon completed {iteration_count} iterations in {delta_seconds} seconds.")
last_log_time = datetime.now()
await daemon_iteration(session)
sleep(15)
await asyncio.sleep(15)


def main() -> None:
Expand Down
28 changes: 22 additions & 6 deletions src/lsst/cmservice/db/script_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
from typing import TYPE_CHECKING, Any

import yaml
from fastapi.concurrency import run_in_threadpool
from sqlalchemy import JSON
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import Mapped, mapped_column

from ..common.errors import CMYamlParseError
from ..common.utils import yaml_safe_load
from .base import Base
from .row import RowMixin

Expand Down Expand Up @@ -78,9 +81,14 @@ async def load( # pylint: disable=too-many-arguments
Newly created `ScriptTemplate`
"""
full_file_path = os.path.abspath(os.path.expandvars(file_path))
with open(full_file_path, encoding="utf-8") as fin:
data = yaml.safe_load(fin)

if not os.path.exists(full_file_path):
raise CMYamlParseError(f"Script template does not exist at path {file_path}")
try:
data = await run_in_threadpool(yaml_safe_load, full_file_path)
except yaml.YAMLError as yaml_error:
raise CMYamlParseError(f"Error parsing YAML file: {yaml_error}") from yaml_error
except Exception as e:
raise CMYamlParseError(f"{e}") from e
return await cls.create_row(session, name=name, data=data)

async def update_from_file(
Expand Down Expand Up @@ -108,7 +116,15 @@ async def update_from_file(
Newly updated `ScriptTemplate`
"""
full_file_path = os.path.abspath(os.path.expandvars(file_path))
with open(full_file_path, encoding="utf-8") as fin:
data = yaml.safe_load(fin)

if not os.path.exists(full_file_path):
raise CMYamlParseError(f"Script template does not exist at path {file_path}")

try:
data = await run_in_threadpool(yaml_safe_load, full_file_path)
except yaml.YAMLError as yaml_error:
raise CMYamlParseError(
f"Error parsing YAML file at {file_path}; throws {yaml_error}"
) from yaml_error
except Exception as e:
raise CMYamlParseError(f"{e}") from e
return await self.update_values(session, name=name, data=data)
39 changes: 30 additions & 9 deletions src/lsst/cmservice/handlers/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

import yaml
from fastapi.concurrency import run_in_threadpool
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_scoped_session

Expand All @@ -11,7 +12,7 @@

from ..common.enums import StatusEnum
from ..common.errors import CMMissingFullnameError, CMYamlParseError
from ..common.utils import update_include_dict
from ..common.utils import update_include_dict, yaml_safe_load
from ..db.campaign import Campaign
from ..db.job import Job
from ..db.pipetask_error import PipetaskError
Expand Down Expand Up @@ -237,8 +238,14 @@ async def load_specification(
loaded_specs = {}

specification = None
with open(yaml_file, encoding="utf-8") as fin:
spec_data = yaml.safe_load(fin)
if not os.path.exists(yaml_file):
raise CMYamlParseError(f"Specification does not exist at path {yaml_file}")
try:
spec_data = await run_in_threadpool(yaml_safe_load, yaml_file)
except yaml.YAMLError as yaml_error:
raise CMYamlParseError(f"Error parsing specification {yaml_file}; threw {yaml_error}") from yaml_error
except Exception as e:
raise CMYamlParseError(f"{e}") from e

for config_item in spec_data:
if "Imports" in config_item:
Expand Down Expand Up @@ -437,10 +444,16 @@ async def load_manifest_report(
job = await Job.get_row_by_fullname(session, job_name)
if fake_status is not None:
return job

with open(yaml_file, encoding="utf-8") as fin:
manifest_data = yaml.safe_load(fin)

if not os.path.exists(yaml_file):
raise CMYamlParseError(f"Manifest report yaml does not exist at path {yaml_file}")
try:
manifest_data = await run_in_threadpool(yaml_safe_load, yaml_file)
except yaml.YAMLError as yaml_error:
raise CMYamlParseError(
f"Error parsing manifest report yaml at path {yaml_file}; threw {yaml_error}"
) from yaml_error
except Exception as e:
raise CMYamlParseError(f"{e}") from e
for task_name_, task_data_ in manifest_data.items():
failed_quanta = task_data_.get("failed_quanta", {})
outputs = task_data_.get("outputs", {})
Expand Down Expand Up @@ -682,8 +695,16 @@ async def load_error_types(
error_types : list[PipetaskErrorType]
Newly loaded error types
"""
with open(yaml_file, encoding="utf-8") as fin:
error_types = yaml.safe_load(fin)
if not os.path.exists(yaml_file):
raise CMYamlParseError(f"Error type yaml does not exist at path {yaml_file}")
try:
error_types = await run_in_threadpool(yaml_safe_load, yaml_file)
except yaml.YAMLError as yaml_error:
raise CMYamlParseError(
f"Error parsing error type yaml at path {yaml_file}; threw {yaml_error}"
) from yaml_error
except Exception as e:
raise CMYamlParseError(f"{e}") from e

ret_list: list[PipetaskErrorType] = []
for error_type_ in error_types:
Expand Down
9 changes: 6 additions & 3 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
CMMissingScriptInputError,
test_type_and_raise,
)
from ..common.utils import yaml_dump
from ..db.element import ElementMixin
from ..db.job import Job
from ..db.script import Script
Expand Down Expand Up @@ -183,8 +184,10 @@ async def _write_script(
with contextlib.suppress(OSError):
await run_in_threadpool(os.makedirs, os.path.dirname(script_url), exist_ok=True)

with open(config_url, "w", encoding="utf-8") as fout:
yaml.dump(workflow_config, fout)
try:
await run_in_threadpool(yaml_dump, workflow_config, config_url)
except yaml.YAMLError as yaml_error:
raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {yaml_error}")
return StatusEnum.prepared

async def _check_slurm_job(
Expand Down Expand Up @@ -341,7 +344,7 @@ async def _load_wms_reports(
) -> StatusEnum | None:
"""Load the job processing info
Paramters
Parameters
---------
job: Job
Job in question
Expand Down

0 comments on commit c9af606

Please sign in to comment.