Skip to content

Commit

Permalink
More asyncification in src/lsst/cmservice/common/bash.py
Browse files Browse the repository at this point in the history
  • Loading branch information
eigerx committed Dec 17, 2024
1 parent c9af606 commit dd3c145
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 85 deletions.
136 changes: 78 additions & 58 deletions src/lsst/cmservice/common/bash.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
"""Utility functions for working with bash scripts"""

import contextlib
import os
import subprocess
from pathlib import Path
from typing import Any

import yaml
from fastapi.concurrency import run_in_threadpool

from ..common.utils import read_lines
from ..common.utils import check_file_exists, read_lines, write_string, yaml_dump, yaml_safe_load
from .enums import StatusEnum
from .errors import CMBashSubmitError

Expand All @@ -28,7 +26,7 @@ async def get_diagnostic_message(
-------
The last line of the log file, potentially containing a diagnostic message.
"""
if not os.path.exists(log_url):
if not await run_in_threadpool(check_file_exists, log_url):
return f"Log file {log_url} does not exist"
try:
lines = await run_in_threadpool(read_lines, log_url)
Expand Down Expand Up @@ -59,7 +57,8 @@ async def parse_bps_stdout(url: str) -> dict[str, str]:

def sync_parse_bps_stdout(url: str) -> dict[str, str]:
"""Parse the std from a bps submit job. Synchronous function using
standard readline.
standard readline. More work should be done to make this function work in
the async world.
Parameters
----------
Expand All @@ -84,13 +83,15 @@ def sync_parse_bps_stdout(url: str) -> dict[str, str]:
return out_dict


def run_bash_job(
async def run_bash_job(
script_url: str,
log_url: str,
stamp_url: str,
fake_status: StatusEnum | None = None,
) -> None:
"""Run a bash job
"""Run a bash job. Most of the work is done in subprocesses run in a
companion synchronous function, `sync_submit_file_to_run_in_bash`, which is
wrapped in `fastapi.concurrency.run_in_threadpool`.
Parameters
----------
Expand All @@ -107,35 +108,53 @@ def run_bash_job(
If set, don't actually submit the job
"""
if fake_status is not None:
with open(stamp_url, "w", encoding="utf-8") as fstamp:
fields = dict(status="reviewable")
yaml.dump(fields, fstamp)
fields = dict(status="reviewable")
await run_in_threadpool(yaml_dump, fields, stamp_url)
return
try:
with open(log_url, "w", encoding="utf-8") as fout:
os.system(f"chmod +x {script_url}")
with subprocess.Popen(
[os.path.abspath(script_url)],
stdout=fout,
stderr=fout,
) as process:
process.wait()
if process.returncode != 0: # pragma: no cover
assert process.stderr
msg = process.stderr.read().decode()
raise CMBashSubmitError(f"Bad bash submit: {msg}")
await run_in_threadpool(sync_submit_file_to_run_in_bash, script_url, log_url)
except Exception as msg:
raise CMBashSubmitError(f"Bad bash submit: {msg}") from msg
with open(stamp_url, "w", encoding="utf-8") as fstamp:
fields = dict(status="accepted")
yaml.dump(fields, fstamp)
fields = dict(status="accepted")
await run_in_threadpool(yaml_dump, fields, stamp_url)


def check_stamp_file(
def sync_submit_file_to_run_in_bash(script_url: str, log_url: str) -> None:
"""Make a script executable, then submit to run in bash. To be wrapped in
`fastapi.concurrency.run_in_threadpool` by the asynchronous function above,
`run_bash_job`. Just a quick attempt to wrap the process; more work should
be done here to make this function async-friendly.
Parameters
----------
script_url : `str`
Path to the script to run.
log_url : `str`
Path to output the logs.
"""
with open(log_url, "w", encoding="utf-8") as fout:
script_path = Path(script_url)
if script_path.exists():
script_path.chmod(0o755)
else:
raise CMBashSubmitError(f"No script at path {script_url}")
with subprocess.Popen(
[script_path.resolve()],
stdout=fout,
stderr=fout,
) as process:
process.wait()
if process.returncode != 0: # pragma: no cover
assert process.stderr
msg = process.stderr.read().decode()
raise CMBashSubmitError(f"Bad bash submit: {msg}")


async def check_stamp_file(
stamp_file: str | None,
default_status: StatusEnum,
) -> StatusEnum:
"""Check a 'stamp' file for a status code
"""Check a 'stamp' file for a status code.
Parameters
----------
Expand All @@ -152,70 +171,71 @@ def check_stamp_file(
"""
if stamp_file is None:
return default_status
if not os.path.exists(stamp_file):
file_exists = await run_in_threadpool(check_file_exists, stamp_file)
if not file_exists:
return default_status
with open(stamp_file, encoding="utf-8") as fin:
fields = yaml.safe_load(fin)
return StatusEnum[fields["status"]]
fields = await run_in_threadpool(yaml_safe_load, stamp_file)
return StatusEnum[fields["status"]]


def write_bash_script(
async def write_bash_script(
script_url: str,
command: str,
**kwargs: Any,
) -> str:
"""Utility function to write a bash script for later execution
) -> Path:
"""Utility function to write a bash script for later execution.
Parameters
----------
script_url: str
script_url: `str`
Location to write the script
command: str
command: `str`
Main command line(s) in the script
Keywords
--------
prepend: str | None
prepend: `str | None`
Text to prepend before command
append: str | None
append: `str | None`
Test to append after command
stamp: str | None
stamp: `str | None`
Text to echo to stamp file when script completes
stamp_url: str | None
stamp_url: `str | None`
Stamp file to write to when script completes
fake: str | None
fake: `str | None`
Echo command instead of running it
rollback: str | None
rollback: `str | Path | None`
Prefix to script_url used when rolling back
processing
Returns
-------
script_url : str
script_url : `str`
The path to the newly written script
"""
prepend = kwargs.get("prepend")
append = kwargs.get("append")
fake = kwargs.get("fake")
rollback_prefix = kwargs.get("rollback", "")

script_url = f"{rollback_prefix}{script_url}"
with contextlib.suppress(OSError):
os.makedirs(os.path.dirname(script_url))

with open(script_url, "w", encoding="utf-8") as fout:
if prepend:
fout.write(f"{prepend}\n")
if fake:
command = f"echo '{command}'"
fout.write(command)
fout.write("\n")
if append:
fout.write(f"{append}\n")
return script_url
if isinstance(rollback_prefix, Path):
script_path = rollback_prefix / script_url
else:
script_path = Path(f"{rollback_prefix}{script_url}")
script_path.parent.mkdir(parents=True, exist_ok=True)
# I chose not to make the directory `with contextlib.suppress(OSError)`
# because cm-service under-reports a lot of the issues it might run into
# that we need to respond to better. I think most relevant situations will
# be handled by `parents=True, exist_ok=True`, but will revert to the above
# suppression if I am wrong.
if fake:
command = f"echo '{command}'"
contents = (prepend if prepend else "") + "\n" + command + "\n" + (append if append else "")
await run_in_threadpool(write_string, contents, script_path)
return script_path
50 changes: 50 additions & 0 deletions src/lsst/cmservice/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
from collections.abc import Iterator, Mapping
from pathlib import Path
from typing import Any

import yaml
Expand Down Expand Up @@ -101,3 +102,52 @@ def read_lines(filename: str) -> list:
with open(filename, encoding="utf-8") as fin:
lines = fin.readlines()
return lines


def check_file_exists(file: str | Path) -> bool:
"""`pathlib.Path.exists`, to be wrapped in
`fastapi.concurrency.run_in_threadpool`
Parameters
----------
file : `str` | `pathlib.Path`
The path to the file.
Returns
-------
A boolean value: does the file exist?
"""
if isinstance(file, str):
file = Path(file)
return file.exists()


def write_string(contents: str, file: str | Path) -> None:
"""Write string to a file. To be wrapped in
`fastapi.concurrency.run_in_threadpool`
Parameters
----------
contents : `str`
The contents to be written.
file : `str | Path`
The path to write the file.
"""
with open(file, "w") as fout:
fout.write(contents)


def unlink_path(file: str | Path) -> None:
"""Check that a file exists, then delete the file using
`pathlib.Path.unlink`. To be wrapped in
`fastapi.concurrency.run_in_threadpool`.
Parameters
----------
file : `str | Path`
The path to the file to be deleted.
"""
if isinstance(file, str):
file = Path(file)
if file.exists():
file.unlink()
8 changes: 4 additions & 4 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async def _write_script(
prepend += f"\n{custom_lsst_setup}\n"
prepend += bps_wms_script_template_.data["text"] # type: ignore

await run_in_threadpool(write_bash_script, script_url, command, prepend=prepend)
await write_bash_script(script_url, command, prepend=prepend)

workflow_config = bps_core_yaml_template_.data.copy() # type: ignore

Expand Down Expand Up @@ -212,7 +212,7 @@ async def _check_slurm_job(
if fake_status is not None:
wms_job_id = "fake_job"
else: # pragma: no cover
bps_dict = parse_bps_stdout(script.log_url) # type: ignore
bps_dict = await parse_bps_stdout(script.log_url) # type: ignore
wms_job_id = self.get_job_id(bps_dict)
await parent.update_values(session, wms_job_id=wms_job_id)
return slurm_status
Expand All @@ -238,7 +238,7 @@ async def _check_htcondor_job(
if fake_status is not None:
wms_job_id = "fake_job"
else: # pragma: no cover
bps_dict = parse_bps_stdout(script.log_url) # type: ignore
bps_dict = await parse_bps_stdout(script.log_url) # type: ignore
wms_job_id = self.get_job_id(bps_dict)
await parent.update_values(session, wms_job_id=wms_job_id)
return htcondor_status
Expand Down Expand Up @@ -492,7 +492,7 @@ async def _write_script(
prepend = "\n".join([line.strip() for line in prepend.splitlines()])

command = f"pipetask report --full-output-filename {report_url} {butler_repo} {graph_url}"
write_bash_script(script_url, command, prepend=prepend)
await write_bash_script(script_url, command, prepend=prepend)

return StatusEnum.prepared

Expand Down
4 changes: 2 additions & 2 deletions src/lsst/cmservice/handlers/script_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ async def _check_stamp_file( # pylint: disable=unused-argument
The status of the processing
"""
default_status = script.status if fake_status is None else fake_status
status = check_stamp_file(stamp_file, default_status)
status = await check_stamp_file(stamp_file, default_status)
await script.update_values(session, status=status)
return status

Expand Down Expand Up @@ -473,7 +473,7 @@ async def launch(
if script_method == ScriptMethodEnum.bash:
if not script.stamp_url: # pragma: no cover
raise CMMissingNodeUrlError(f"log_url is not set for {script}")
run_bash_job(script.script_url, script.log_url, script.stamp_url, **kwargs)
await run_bash_job(script.script_url, script.log_url, script.stamp_url, **kwargs)
status = StatusEnum.running
await script.update_values(session, status=status)
elif script_method == ScriptMethodEnum.slurm:
Expand Down
Loading

0 comments on commit dd3c145

Please sign in to comment.