Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Job dispatcher #360

Merged
merged 46 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
d1903e6
Update aiaccel/job
aramoto99 Mar 8, 2024
fdf8986
fix lint error
aramoto99 Mar 11, 2024
1d25a39
fix aiaccel/job/dispatcher.py
aramoto99 Mar 19, 2024
c88174a
Merge branch 'develop/v2' into feature/v2-jobexecutor
yoshipon Apr 7, 2024
2080e45
wip
aramoto99 Mar 27, 2024
da2b2bd
Fix issues pointed out in review
aramoto99 Apr 8, 2024
c6c547b
rename
aramoto99 Apr 8, 2024
a6c14d8
fix test
aramoto99 Apr 8, 2024
2bce208
fix for test
aramoto99 Apr 8, 2024
c331dbf
fix for test
aramoto99 Apr 8, 2024
af8ad24
fix for test
aramoto99 Apr 8, 2024
95efab8
fix for test
aramoto99 Apr 8, 2024
202fcea
Reflected code review feedback
aramoto99 Apr 18, 2024
8bea495
fix test
aramoto99 Apr 18, 2024
ef2cef3
fix test
aramoto99 Apr 18, 2024
725f098
refactoring
aramoto99 Apr 18, 2024
01d4b25
Run code formatter ruff
aramoto99 Apr 18, 2024
0664e1a
refactoring
aramoto99 Apr 18, 2024
be78145
Run code formatter ruff
aramoto99 Apr 18, 2024
c1ccb90
fix lint.yaml
aramoto99 Apr 19, 2024
3f1cf3f
fix pypoject.toml
aramoto99 Apr 19, 2024
4d2a620
fix mypy.ini
aramoto99 Apr 19, 2024
e123115
fix mypy.ini
aramoto99 Apr 19, 2024
c1b64f9
fix mypy.ini
aramoto99 Apr 19, 2024
92da65e
fix mypy.ini
aramoto99 Apr 19, 2024
89979c0
fix mypy.ini
aramoto99 Apr 19, 2024
ec39354
Merge branch 'develop/v2' into feature/v2-jobexecutor
yoshipon Apr 20, 2024
fd11dab
Refactor abci jobs
yoshipon Apr 21, 2024
c0cf4d1
Merge branch 'feature/v2-jobexecutor' of github.com:aistairc/aiaccel …
yoshipon Apr 21, 2024
4dea624
Update pyproject.toml
yoshipon Apr 21, 2024
257eeba
adjusted to work in the ABCI environment.
aramoto99 Apr 30, 2024
6377c83
fix lint error
aramoto99 Apr 30, 2024
16ca715
fix test
aramoto99 May 8, 2024
7a059f7
fix test
aramoto99 May 8, 2024
542bb13
Merge branch 'develop/v2' into feature/v2-jobexecutor
aramoto99 May 14, 2024
a4337e3
Changed from unittest to pytest
aramoto99 May 16, 2024
bead26c
Incorporate revisions based on review feedback
aramoto99 May 21, 2024
bbb8d1b
Add test items
aramoto99 May 24, 2024
515e9bf
Merge branch 'develop/v2' into feature/v2-jobexecutor
yoshipon Jun 10, 2024
e0fc243
fix test
aramoto99 Jun 19, 2024
fc82455
fix test
aramoto99 Jun 19, 2024
1809675
fix test
aramoto99 Jun 19, 2024
f966062
remove __future__.annotations
yoshipon Jun 19, 2024
e829c02
A bit simplify qstat_xml
yoshipon Jun 19, 2024
b7ecd51
Simplify tests
yoshipon Jun 19, 2024
6feff5d
Merge branch 'develop/v2' into feature/v2-jobexecutor
yoshipon Jun 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
pip install .[dev,github-actions]
- name: Perform ruff
run: |
ruff check
ruff check
ruff format --check
- name: Perform mypy
run: |
Expand Down
4 changes: 4 additions & 0 deletions aiaccel/job/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from aiaccel.job.abci_job import AbciJob, JobStatus
from aiaccel.job.abci_job_manager import AbciJobExecutor

__all__ = ["AbciJob", "JobStatus", "AbciJobExecutor"]
222 changes: 222 additions & 0 deletions aiaccel/job/abci_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
from __future__ import annotations

import re
import subprocess
import time
from enum import IntEnum, auto
from pathlib import Path
from typing import Any
from xml.etree import ElementTree


class JobStatus(IntEnum):
"""
Represents the status of a job.

Attributes:
UNSUBMITTED: The job has not been submitted.
WAITING: The job is waiting to be executed.
RUNNING: The job is currently running.
FINISHED: The job has finished successfully.
ERROR: The job encountered an error.

Methods:
from_qsub(status: str) -> JobStatus:
Converts a status string from the qsub command to a JobStatus enum value.

Raises:
ValueError: If the status string is not recognized.
"""

UNSUBMITTED = auto()
WAITING = auto()
RUNNING = auto()
FINISHED = auto()
ERROR = auto()

@classmethod
def from_qsub(cls, status: str) -> JobStatus:
"""
Converts a status string from the qsub command to a JobStatus enum value.

Args:
status (str): The status string from the qsub command.

Returns:
JobStatus: The corresponding JobStatus enum value.

Raises:
ValueError: If the status string is not recognized.
"""
match status:
case "r":
return JobStatus.RUNNING
case "qw" | "h" | "t" | "s" | "S" | "T" | "Rq":
yoshipon marked this conversation as resolved.
Show resolved Hide resolved
return JobStatus.WAITING
case "d" | "Rr":
return JobStatus.RUNNING
case "E":
return JobStatus.ERROR
case _:
raise ValueError(f"Unexpected status: {status}")


class AbciJob:
"""
Represents a job to be submitted and managed on the ABCI system.

Attributes:
job_filename (Path): The path to the job file.
job_group (str): The job group.
job_name (str): The name of the job.
cwd (Path): The current working directory.
stdout_filename (Path): The path to the standard output file.
stderr_filename (Path): The path to the standard error file.
tag (Any): A tag associated with the job.
status (JobStatus): The status of the job.
job_number (int | None): The job number assigned by the system.

Methods:
submit: Submits the job to the system.
update_status: Updates the status of the job.
wait: Waits for the job to finish.
update_status_batch: Updates the status of a batch of jobs.
"""

job_filename: Path
job_group: str

job_name: str

cwd: Path
stdout_filename: Path
stderr_filename: Path

tag: Any

status: JobStatus
job_number: int | None

def __init__(
self,
job_filename: Path | str,
job_group: str,
job_name: str | None = None,
cwd: Path | str | None = None,
stdout_filename: Path | str | None = None,
stderr_filename: Path | str | None = None,
qsub_args: list[str] | None = None,
args: list[str] | None = None,
tag: Any = None,
):
"""
Initializes a new instance of the AbciJob class.

Args:
job_filename (Path | str): The path to the job file.
job_group (str): The job group.
job_name (str | None, optional): The name of the job. If not provided, \
the name will be derived from the job filename.
cwd (Path | str | None, optional): The current working directory. If not provided, \
the current working directory will be used.
stdout_filename (Path | str | None, optional): The path to the standard output file. If not provided, \
a default filename will be used.
stderr_filename (Path | str | None, optional): The path to the standard error file. If not provided, \
a default filename will be used.
qsub_args (list[str] | None, optional): Additional arguments to pass to the qsub command. Defaults to None.
args (list[str] | None, optional): Additional arguments to pass to the job file. Defaults to None.
tag (Any, optional): A tag associated with the job. Defaults to None.
"""
self.job_filename = Path(job_filename)
self.job_group = job_group
self.job_name = job_name if job_name is not None else self.job_filename.name

self.cwd = Path(cwd) if cwd is not None else Path.cwd()
self.stdout_filename = Path(stdout_filename) if stdout_filename is not None else self.cwd / f"{self.job_name}.o"
self.stderr_filename = Path(stderr_filename) if stderr_filename is not None else self.cwd / f"{self.job_name}.o"

self.tag = tag

self.status = JobStatus.UNSUBMITTED
self.job_number = None

# generate qsub command
self.cmd = ["qsub", "-g", job_group, "-o", str(self.stdout_filename), "-e", str(self.stderr_filename)]
if self.job_name is not None:
self.cmd += ["-N", self.job_name]
if qsub_args is not None:
self.cmd += [arg.format(job=self) for arg in qsub_args]
self.cmd += [str(self.job_filename)]
if args is not None:
self.cmd += [arg.format(job=self) for arg in args]

def submit(self) -> AbciJob:
"""
Submits the job to the system.

Returns:
AbciJob: The submitted job.

Raises:
RuntimeError: If the job is already submitted.
RuntimeError: If the qsub result cannot be parsed.
"""
if self.status >= JobStatus.WAITING:
raise RuntimeError(f"This job is already submited as {self.job_name} (id: {self.job_number})")

p = subprocess.run(self.cmd, capture_output=True, text=True, check=True)

match = re.search(r"Your job (\d+)", p.stdout)
if match is None:
raise RuntimeError(f"The following qsub result cannot be parsed: {p.stdout}")

self.job_number = int(match.group(1))
self.status = JobStatus.WAITING

return self

def update_status(self) -> JobStatus:
"""
Updates the status of the job.

Returns:
JobStatus: The updated status of the job.
"""
self.update_status_batch([self])
return self.status

def wait(self, sleep_time: float = 10.0) -> AbciJob:
"""
Waits for the job to finish.

Args:
sleep_time (float, optional): The time to sleep between status updates. Defaults to 10.0.

Returns:
AbciJob: The finished job.
"""
while self.update_status() < JobStatus.FINISHED:
time.sleep(sleep_time)

return self

@classmethod
def update_status_batch(cls, job_list: list[AbciJob]) -> None:
"""
Updates the status of a batch of jobs.

Args:
job_list (list[AbciJob]): The list of jobs to update.
"""
job_dict = {j.job_number: j for j in job_list if j.status not in [JobStatus.UNSUBMITTED, JobStatus.FINISHED]}
p = subprocess.run(["qstat", "-xml"], capture_output=True, text=True, check=True)

status_dict: dict[int, str] = {}
for el in ElementTree.fromstring(p.stdout).iter("job_list"):
status_dict[int(el.findtext("JB_job_number", default=-1))] = el.findtext("state", "")

for job_number in set(job_dict.keys()) - set(status_dict.keys()):
job_dict[job_number].status = JobStatus.FINISHED

for job_number, status in status_dict.items():
job_dict[job_number].status = JobStatus.from_qsub(status)
112 changes: 112 additions & 0 deletions aiaccel/job/abci_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import time
from pathlib import Path
from typing import Any

from aiaccel.job.abci_job import AbciJob, JobStatus


class AbciJobExecutor:
"""
Executes and manages jobs on the ABCI system.

Attributes:
job_filename (Path): The path to the job file.
job_group (str): The group to which the job belongs.
job_name (str | None): The name of the job.
work_dir (Path): The working directory for the job.
n_max_jobs (int): The maximum number of jobs.
job_list (list[AbciJob]): The list of submitted jobs.

Methods:
submit: Submits a job to the job manager.
available_slots: Returns the number of available slots for new jobs.
collect_finished: Collects and removes all finished jobs from the job list.
"""

def __init__(
self,
job_filename: Path | str,
job_group: str,
job_name: str | None = None,
work_dir: Path | str | None = None,
n_max_jobs: int = 100,
):
"""
Initialize the AbciJobManager object.

Args:
job_filename (Path | str): The path or filename of the job.
job_group (str): The group to which the job belongs.
job_name (str | None, optional): The name of the job. Defaults to None.
work_dir (Path | str | None, optional): The working directory for the job. Defaults to None.
n_max_jobs (int, optional): The maximum number of jobs. Defaults to 100.
"""
self.job_filename = job_filename if isinstance(job_filename, Path) else Path(job_filename)
self.job_group = job_group
self.job_name = job_name

self.work_dir = Path(work_dir) if work_dir is not None else Path.cwd()
self.work_dir.mkdir(parents=True, exist_ok=True)

self.n_max_jobs = n_max_jobs

self.job_list: list[AbciJob] = []

def submit(
self,
args: list[str],
tag: Any = None,
sleep_time: float = 5.0,
) -> AbciJob:
"""
Submits a job to the job manager.

Args:
args (list[str]): The arguments for the job.
tag (Any, optional): A tag to associate with the job. Defaults to None.
sleep_time (float, optional): The sleep time between checking for available slots. Defaults to 5.0.

Returns:
AbciJob: The submitted job.
"""
while self.available_slots() == 0:
time.sleep(sleep_time)

job = AbciJob(
self.job_filename,
self.job_group,
job_name=self.job_name,
args=args,
tag=tag,
)

job.submit()
self.job_list.append(job)

return job

def available_slots(self) -> int:
"""
Returns the number of available slots for new jobs.

The available slots are calculated by subtracting the number of finished jobs
from the maximum number of jobs allowed.

Returns:
int: The number of available slots.
"""
AbciJob.update_status_batch(self.job_list)
return self.n_max_jobs - len([job for job in self.job_list if job.status < JobStatus.FINISHED])

def collect_finished(self) -> list[AbciJob]:
"""
Collects and removes all finished jobs from the job list.

Returns:
A list of finished AbciJob objects.
"""
finished_jobs = [job for job in self.job_list if job.status >= JobStatus.FINISHED]
for job in finished_jobs:
self.job_list.remove(job)

return finished_jobs
Loading