Skip to content

Commit

Permalink
Add Job dispatcher (#360)
Browse files Browse the repository at this point in the history
* Update aiaccel/job

* fix lint error

* fix aiaccel/job/dispatcher.py

* wip

* Fix issues pointed out in review

* rename

* fix test

* fix for test

* fix for test

* fix for test

* fix for test

* Reflected code review feedback

* fix test

* fix test

* refactoring

* Run code formatter ruff

* refactoring

* Run code formatter ruff

* fix lint.yaml

* fix pyproject.toml

* fix mypy.ini

* fix mypy.ini

* fix mypy.ini

* fix mypy.ini

* fix mypy.ini

* Refactor abci jobs

* Update pyproject.toml

* Adjust to work in the ABCI environment

* fix lint error

* fix test

* fix test

* Changed from unittest to pytest

* Incorporate revisions based on review feedback

* Add test items

* fix test

* fix test

* fix test

* remove __future__.annotations

* Simplify qstat_xml a bit

* Simplify tests

---------

Co-authored-by: Yoshiaki Bando <yoshipon@users.noreply.github.com>
  • Loading branch information
aramoto99 and yoshipon authored Jun 19, 2024
1 parent 4ac7e6e commit b8c0b1b
Show file tree
Hide file tree
Showing 16 changed files with 882 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
pip install .[dev,github-actions]
- name: Perform ruff
run: |
ruff check
ruff check
ruff format --check
- name: Perform mypy
run: |
Expand Down
4 changes: 4 additions & 0 deletions aiaccel/job/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from aiaccel.job.abci_job import AbciJob, JobStatus
from aiaccel.job.abci_job_manager import AbciJobExecutor

# Names re-exported from the submodules above as the public API of this package.
__all__ = ["AbciJob", "JobStatus", "AbciJobExecutor"]
222 changes: 222 additions & 0 deletions aiaccel/job/abci_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
from __future__ import annotations

import re
import subprocess
import time
from enum import IntEnum, auto
from pathlib import Path
from typing import Any
from xml.etree import ElementTree


class JobStatus(IntEnum):
    """
    Lifecycle state of a job.

    The members are integer-valued and declared in lifecycle order, so two
    states can be compared numerically (e.g. ``status >= JobStatus.FINISHED``).

    Attributes:
        UNSUBMITTED: The job has not been handed to the scheduler yet.
        WAITING: The job is queued (or held/suspended) and not running.
        RUNNING: The job is being executed.
        FINISHED: The job is no longer known to the scheduler.
        ERROR: The scheduler reports the job in an error state.
    """

    UNSUBMITTED = auto()
    WAITING = auto()
    RUNNING = auto()
    FINISHED = auto()
    ERROR = auto()

    @classmethod
    def from_qsub(cls, status: str) -> JobStatus:
        """
        Translate a scheduler state code into a ``JobStatus`` member.

        Args:
            status (str): The state code, e.g. the ``state`` field of a
                ``qstat -xml`` job entry.

        Returns:
            JobStatus: ``RUNNING`` for ``"r"``, ``"d"`` and ``"Rr"``;
            ``WAITING`` for ``"qw"``, ``"h"``, ``"t"``, ``"s"``, ``"S"``,
            ``"T"`` and ``"Rq"``; ``ERROR`` for ``"E"``.

        Raises:
            ValueError: If the state code is none of the above.
        """
        if status in ("r", "d", "Rr"):
            return JobStatus.RUNNING
        if status in ("qw", "h", "t", "s", "S", "T", "Rq"):
            return JobStatus.WAITING
        if status == "E":
            return JobStatus.ERROR
        raise ValueError(f"Unexpected status: {status}")


class AbciJob:
    """
    Represents a job to be submitted and managed on the ABCI system.

    Attributes:
        job_filename (Path): The path to the job file.
        job_group (str): The job group.
        job_name (str): The name of the job.
        cwd (Path): The current working directory.
        stdout_filename (Path): The path to the standard output file.
        stderr_filename (Path): The path to the standard error file.
        tag (Any): A tag associated with the job.
        status (JobStatus): The status of the job.
        job_number (int | None): The job number assigned by the system.
        cmd (list[str]): The ``qsub`` command line built in ``__init__``.

    Methods:
        submit: Submits the job to the system.
        update_status: Updates the status of the job.
        wait: Waits for the job to finish.
        update_status_batch: Updates the status of a batch of jobs.
    """

    job_filename: Path
    job_group: str

    job_name: str

    cwd: Path
    stdout_filename: Path
    stderr_filename: Path

    tag: Any

    status: JobStatus
    job_number: int | None

    cmd: list[str]

    def __init__(
        self,
        job_filename: Path | str,
        job_group: str,
        job_name: str | None = None,
        cwd: Path | str | None = None,
        stdout_filename: Path | str | None = None,
        stderr_filename: Path | str | None = None,
        qsub_args: list[str] | None = None,
        args: list[str] | None = None,
        tag: Any = None,
    ):
        """
        Initializes a new instance of the AbciJob class.

        Args:
            job_filename (Path | str): The path to the job file.
            job_group (str): The job group.
            job_name (str | None, optional): The name of the job. If not provided,
                the file name of ``job_filename`` is used.
            cwd (Path | str | None, optional): The current working directory. If not
                provided, ``Path.cwd()`` is used.
            stdout_filename (Path | str | None, optional): The path to the standard
                output file. Defaults to ``<cwd>/<job_name>.o``.
            stderr_filename (Path | str | None, optional): The path to the standard
                error file. Defaults to ``<cwd>/<job_name>.o``.
            qsub_args (list[str] | None, optional): Additional arguments to pass to the
                qsub command. Each entry is expanded with ``str.format(job=self)``.
                Defaults to None.
            args (list[str] | None, optional): Additional arguments to pass to the job
                file. Each entry is expanded with ``str.format(job=self)``.
                Defaults to None.
            tag (Any, optional): A tag associated with the job. Defaults to None.
        """
        self.job_filename = Path(job_filename)
        self.job_group = job_group
        self.job_name = job_name if job_name is not None else self.job_filename.name

        self.cwd = Path(cwd) if cwd is not None else Path.cwd()
        self.stdout_filename = Path(stdout_filename) if stdout_filename is not None else self.cwd / f"{self.job_name}.o"
        # NOTE(review): the stderr default reuses the ".o" path of stdout, so both
        # streams end up in one file. Presumably intentional -- confirm.
        self.stderr_filename = Path(stderr_filename) if stderr_filename is not None else self.cwd / f"{self.job_name}.o"

        self.tag = tag

        self.status = JobStatus.UNSUBMITTED
        self.job_number = None

        # generate qsub command
        self.cmd = ["qsub", "-g", job_group, "-o", str(self.stdout_filename), "-e", str(self.stderr_filename)]
        # job_name is always set at this point (it falls back to the file name).
        self.cmd += ["-N", self.job_name]
        if qsub_args is not None:
            self.cmd += [arg.format(job=self) for arg in qsub_args]
        self.cmd += [str(self.job_filename)]
        if args is not None:
            self.cmd += [arg.format(job=self) for arg in args]

    def submit(self) -> AbciJob:
        """
        Submits the job to the system.

        Returns:
            AbciJob: The submitted job (``self``).

        Raises:
            RuntimeError: If the job is already submitted.
            RuntimeError: If the qsub result cannot be parsed.
            subprocess.CalledProcessError: If ``qsub`` exits with a non-zero status.
        """
        if self.status >= JobStatus.WAITING:
            raise RuntimeError(f"This job is already submited as {self.job_name} (id: {self.job_number})")

        p = subprocess.run(self.cmd, capture_output=True, text=True, check=True)

        # The job number is taken from the "Your job <number>" part of qsub's stdout.
        match = re.search(r"Your job (\d+)", p.stdout)
        if match is None:
            raise RuntimeError(f"The following qsub result cannot be parsed: {p.stdout}")

        self.job_number = int(match.group(1))
        self.status = JobStatus.WAITING

        return self

    def update_status(self) -> JobStatus:
        """
        Updates the status of the job.

        Returns:
            JobStatus: The updated status of the job.
        """
        self.update_status_batch([self])
        return self.status

    def wait(self, sleep_time: float = 10.0) -> AbciJob:
        """
        Waits for the job to finish.

        Polling stops as soon as the status is ``FINISHED`` or ``ERROR``
        (both compare ``>= JobStatus.FINISHED``).

        Args:
            sleep_time (float, optional): The time to sleep between status updates.
                Defaults to 10.0.

        Returns:
            AbciJob: The finished job (``self``).
        """
        while self.update_status() < JobStatus.FINISHED:
            time.sleep(sleep_time)

        return self

    @classmethod
    def update_status_batch(cls, job_list: list[AbciJob]) -> None:
        """
        Updates the status of a batch of jobs with a single ``qstat -xml`` call.

        Jobs that are ``UNSUBMITTED`` or already ``FINISHED`` are left untouched.
        A tracked job that no longer appears in the qstat output is marked
        ``FINISHED``; otherwise its status is derived from the reported state.
        Jobs reported by qstat that are not in ``job_list`` are ignored.

        Args:
            job_list (list[AbciJob]): The list of jobs to update.

        Raises:
            subprocess.CalledProcessError: If ``qstat`` exits with a non-zero status.
            ValueError: If a tracked job reports an unrecognized state code.
        """
        job_dict = {j.job_number: j for j in job_list if j.status not in [JobStatus.UNSUBMITTED, JobStatus.FINISHED]}
        if not job_dict:
            # Nothing to refresh, so there is no need to query the scheduler.
            return

        p = subprocess.run(["qstat", "-xml"], capture_output=True, text=True, check=True)

        status_dict: dict[int, str] = {}
        for el in ElementTree.fromstring(p.stdout).iter("job_list"):
            status_dict[int(el.findtext("JB_job_number", default="-1"))] = el.findtext("state", "")

        # Iterate over the tracked jobs only: qstat may list jobs that are not in
        # job_list, and indexing job_dict with those would raise KeyError.
        for job_number, job in job_dict.items():
            if job_number in status_dict:
                job.status = JobStatus.from_qsub(status_dict[job_number])
            else:
                job.status = JobStatus.FINISHED
112 changes: 112 additions & 0 deletions aiaccel/job/abci_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import time
from pathlib import Path
from typing import Any

from aiaccel.job.abci_job import AbciJob, JobStatus


class AbciJobExecutor:
    """
    Executes and manages jobs on the ABCI system.

    Attributes:
        job_filename (Path): The path to the job file.
        job_group (str): The group to which the job belongs.
        job_name (str | None): The name of the job.
        work_dir (Path): The working directory for the job.
        n_max_jobs (int): The maximum number of jobs.
        job_list (list[AbciJob]): The list of submitted jobs.

    Methods:
        submit: Submits a job to the job manager.
        available_slots: Returns the number of available slots for new jobs.
        collect_finished: Collects and removes all finished jobs from the job list.
    """

    def __init__(
        self,
        job_filename: Path | str,
        job_group: str,
        job_name: str | None = None,
        work_dir: Path | str | None = None,
        n_max_jobs: int = 100,
    ):
        """
        Initialize the AbciJobExecutor object.

        Args:
            job_filename (Path | str): The path or filename of the job.
            job_group (str): The group to which the job belongs.
            job_name (str | None, optional): The name of the job. Defaults to None.
            work_dir (Path | str | None, optional): The working directory for the job.
                It is created if it does not exist. Defaults to the current
                working directory.
            n_max_jobs (int, optional): The maximum number of jobs. Defaults to 100.
        """
        self.job_filename = Path(job_filename)
        self.job_group = job_group
        self.job_name = job_name

        self.work_dir = Path(work_dir) if work_dir is not None else Path.cwd()
        self.work_dir.mkdir(parents=True, exist_ok=True)

        self.n_max_jobs = n_max_jobs

        self.job_list: list[AbciJob] = []

    def submit(
        self,
        args: list[str],
        tag: Any = None,
        sleep_time: float = 5.0,
    ) -> AbciJob:
        """
        Submits a job to the job manager.

        Blocks, polling every ``sleep_time`` seconds, until a slot is available.

        Args:
            args (list[str]): The arguments for the job.
            tag (Any, optional): A tag to associate with the job. Defaults to None.
            sleep_time (float, optional): The sleep time between checking for
                available slots. Defaults to 5.0.

        Returns:
            AbciJob: The submitted job.
        """
        # "<= 0" rather than "== 0": never submit while over the limit.
        while self.available_slots() <= 0:
            time.sleep(sleep_time)

        job = AbciJob(
            self.job_filename,
            self.job_group,
            job_name=self.job_name,
            # Run the job relative to the executor's working directory so that the
            # default stdout/stderr files are placed there.
            cwd=self.work_dir,
            args=args,
            tag=tag,
        )

        job.submit()
        self.job_list.append(job)

        return job

    def available_slots(self) -> int:
        """
        Returns the number of available slots for new jobs.

        The statuses of all tracked jobs are refreshed first. The available slots
        are then calculated by subtracting the number of jobs that have not
        finished yet from the maximum number of jobs allowed.

        Returns:
            int: The number of available slots.
        """
        AbciJob.update_status_batch(self.job_list)
        n_active = sum(1 for job in self.job_list if job.status < JobStatus.FINISHED)
        return self.n_max_jobs - n_active

    def collect_finished(self) -> list[AbciJob]:
        """
        Collects and removes all finished jobs from the job list.

        Job statuses are not refreshed here; the statuses recorded by the most
        recent update are used. Jobs in the ``ERROR`` state are collected as well
        (they compare ``>= JobStatus.FINISHED``).

        Returns:
            A list of finished AbciJob objects.
        """
        finished_jobs: list[AbciJob] = []
        remaining_jobs: list[AbciJob] = []
        for job in self.job_list:
            if job.status >= JobStatus.FINISHED:
                finished_jobs.append(job)
            else:
                remaining_jobs.append(job)

        # Slice assignment keeps the identity of job_list for external references.
        self.job_list[:] = remaining_jobs

        return finished_jobs
Loading

0 comments on commit b8c0b1b

Please sign in to comment.