diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 9558b9f7..f6e6e8d6 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -24,7 +24,7 @@ jobs: pip install .[dev,github-actions] - name: Perform ruff run: | - ruff check + ruff check ruff format --check - name: Perform mypy run: | diff --git a/aiaccel/job/__init__.py b/aiaccel/job/__init__.py new file mode 100644 index 00000000..ddd0af90 --- /dev/null +++ b/aiaccel/job/__init__.py @@ -0,0 +1,4 @@ +from aiaccel.job.abci_job import AbciJob, JobStatus +from aiaccel.job.abci_job_manager import AbciJobExecutor + +__all__ = ["AbciJob", "JobStatus", "AbciJobExecutor"] diff --git a/aiaccel/job/abci_job.py b/aiaccel/job/abci_job.py new file mode 100644 index 00000000..55317a6b --- /dev/null +++ b/aiaccel/job/abci_job.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import re +import subprocess +import time +from enum import IntEnum, auto +from pathlib import Path +from typing import Any +from xml.etree import ElementTree + + +class JobStatus(IntEnum): + """ + Represents the status of a job. + + Attributes: + UNSUBMITTED: The job has not been submitted. + WAITING: The job is waiting to be executed. + RUNNING: The job is currently running. + FINISHED: The job has finished successfully. + ERROR: The job encountered an error. + + Methods: + from_qsub(status: str) -> JobStatus: + Converts a status string from the qsub command to a JobStatus enum value. + + Raises: + ValueError: If the status string is not recognized. + """ + + UNSUBMITTED = auto() + WAITING = auto() + RUNNING = auto() + FINISHED = auto() + ERROR = auto() + + @classmethod + def from_qsub(cls, status: str) -> JobStatus: + """ + Converts a status string from the qsub command to a JobStatus enum value. + + Args: + status (str): The status string from the qsub command. + + Returns: + JobStatus: The corresponding JobStatus enum value. + + Raises: + ValueError: If the status string is not recognized. + """ + match status: + case "r": + return JobStatus.RUNNING + case "qw" | "h" | "t" | "s" | "S" | "T" | "Rq": + return JobStatus.WAITING + case "d" | "Rr": + return JobStatus.RUNNING + case "E": + return JobStatus.ERROR + case _: + raise ValueError(f"Unexpected status: {status}") + + +class AbciJob: + """ + Represents a job to be submitted and managed on the ABCI system. + + Attributes: + job_filename (Path): The path to the job file. + job_group (str): The job group. + job_name (str): The name of the job. + cwd (Path): The current working directory. + stdout_filename (Path): The path to the standard output file. + stderr_filename (Path): The path to the standard error file. + tag (Any): A tag associated with the job. + status (JobStatus): The status of the job. + job_number (int | None): The job number assigned by the system. + + Methods: + submit: Submits the job to the system. + update_status: Updates the status of the job. + wait: Waits for the job to finish. + update_status_batch: Updates the status of a batch of jobs. + """ + + job_filename: Path + job_group: str + + job_name: str + + cwd: Path + stdout_filename: Path + stderr_filename: Path + + tag: Any + + status: JobStatus + job_number: int | None + + def __init__( + self, + job_filename: Path | str, + job_group: str, + job_name: str | None = None, + cwd: Path | str | None = None, + stdout_filename: Path | str | None = None, + stderr_filename: Path | str | None = None, + qsub_args: list[str] | None = None, + args: list[str] | None = None, + tag: Any = None, + ): + """ + Initializes a new instance of the AbciJob class. + + Args: + job_filename (Path | str): The path to the job file. + job_group (str): The job group. + job_name (str | None, optional): The name of the job. If not provided, \ + the name will be derived from the job filename. + cwd (Path | str | None, optional): The current working directory. If not provided, \ + the current working directory will be used. + stdout_filename (Path | str | None, optional): The path to the standard output file. If not provided, \ + a default filename will be used. + stderr_filename (Path | str | None, optional): The path to the standard error file. If not provided, \ + a default filename will be used. + qsub_args (list[str] | None, optional): Additional arguments to pass to the qsub command. Defaults to None. + args (list[str] | None, optional): Additional arguments to pass to the job file. Defaults to None. + tag (Any, optional): A tag associated with the job. Defaults to None. + """ + self.job_filename = Path(job_filename) + self.job_group = job_group + self.job_name = job_name if job_name is not None else self.job_filename.name + + self.cwd = Path(cwd) if cwd is not None else Path.cwd() + self.stdout_filename = Path(stdout_filename) if stdout_filename is not None else self.cwd / f"{self.job_name}.o" + self.stderr_filename = Path(stderr_filename) if stderr_filename is not None else self.cwd / f"{self.job_name}.o" + + self.tag = tag + + self.status = JobStatus.UNSUBMITTED + self.job_number = None + + # generate qsub command + self.cmd = ["qsub", "-g", job_group, "-o", str(self.stdout_filename), "-e", str(self.stderr_filename)] + if self.job_name is not None: + self.cmd += ["-N", self.job_name] + if qsub_args is not None: + self.cmd += [arg.format(job=self) for arg in qsub_args] + self.cmd += [str(self.job_filename)] + if args is not None: + self.cmd += [arg.format(job=self) for arg in args] + + def submit(self) -> AbciJob: + """ + Submits the job to the system. + + Returns: + AbciJob: The submitted job. + + Raises: + RuntimeError: If the job is already submitted. + RuntimeError: If the qsub result cannot be parsed. + """ + if self.status >= JobStatus.WAITING: + raise RuntimeError(f"This job is already submited as {self.job_name} (id: {self.job_number})") + + p = subprocess.run(self.cmd, capture_output=True, text=True, check=True) + + match = re.search(r"Your job (\d+)", p.stdout) + if match is None: + raise RuntimeError(f"The following qsub result cannot be parsed: {p.stdout}") + + self.job_number = int(match.group(1)) + self.status = JobStatus.WAITING + + return self + + def update_status(self) -> JobStatus: + """ + Updates the status of the job. + + Returns: + JobStatus: The updated status of the job. + """ + self.update_status_batch([self]) + return self.status + + def wait(self, sleep_time: float = 10.0) -> AbciJob: + """ + Waits for the job to finish. + + Args: + sleep_time (float, optional): The time to sleep between status updates. Defaults to 10.0. + + Returns: + AbciJob: The finished job. + """ + while self.update_status() < JobStatus.FINISHED: + time.sleep(sleep_time) + + return self + + @classmethod + def update_status_batch(cls, job_list: list[AbciJob]) -> None: + """ + Updates the status of a batch of jobs. + + Args: + job_list (list[AbciJob]): The list of jobs to update. + """ + job_dict = {j.job_number: j for j in job_list if j.status not in [JobStatus.UNSUBMITTED, JobStatus.FINISHED]} + p = subprocess.run(["qstat", "-xml"], capture_output=True, text=True, check=True) + + status_dict: dict[int, str] = {} + for el in ElementTree.fromstring(p.stdout).iter("job_list"): + status_dict[int(el.findtext("JB_job_number", default=-1))] = el.findtext("state", "") + + for job_number in set(job_dict.keys()) - set(status_dict.keys()): + job_dict[job_number].status = JobStatus.FINISHED + + for job_number, status in status_dict.items(): + job_dict[job_number].status = JobStatus.from_qsub(status) diff --git a/aiaccel/job/abci_job_manager.py b/aiaccel/job/abci_job_manager.py new file mode 100644 index 00000000..9b23dcff --- /dev/null +++ b/aiaccel/job/abci_job_manager.py @@ -0,0 +1,112 @@ +import time +from pathlib import Path +from typing import Any + +from aiaccel.job.abci_job import AbciJob, JobStatus + + +class AbciJobExecutor: + """ + Executes and manages jobs on the ABCI system. + + Attributes: + job_filename (Path): The path to the job file. + job_group (str): The group to which the job belongs. + job_name (str | None): The name of the job. + work_dir (Path): The working directory for the job. + n_max_jobs (int): The maximum number of jobs. + job_list (list[AbciJob]): The list of submitted jobs. + + Methods: + submit: Submits a job to the job manager. + available_slots: Returns the number of available slots for new jobs. + collect_finished: Collects and removes all finished jobs from the job list. + """ + + def __init__( + self, + job_filename: Path | str, + job_group: str, + job_name: str | None = None, + work_dir: Path | str | None = None, + n_max_jobs: int = 100, + ): + """ + Initialize the AbciJobManager object. + + Args: + job_filename (Path | str): The path or filename of the job. + job_group (str): The group to which the job belongs. + job_name (str | None, optional): The name of the job. Defaults to None. + work_dir (Path | str | None, optional): The working directory for the job. Defaults to None. + n_max_jobs (int, optional): The maximum number of jobs. Defaults to 100. + """ + self.job_filename = job_filename if isinstance(job_filename, Path) else Path(job_filename) + self.job_group = job_group + self.job_name = job_name + + self.work_dir = Path(work_dir) if work_dir is not None else Path.cwd() + self.work_dir.mkdir(parents=True, exist_ok=True) + + self.n_max_jobs = n_max_jobs + + self.job_list: list[AbciJob] = [] + + def submit( + self, + args: list[str], + tag: Any = None, + sleep_time: float = 5.0, + ) -> AbciJob: + """ + Submits a job to the job manager. + + Args: + args (list[str]): The arguments for the job. + tag (Any, optional): A tag to associate with the job. Defaults to None. + sleep_time (float, optional): The sleep time between checking for available slots. Defaults to 5.0. + + Returns: + AbciJob: The submitted job. + """ + while self.available_slots() == 0: + time.sleep(sleep_time) + + job = AbciJob( + self.job_filename, + self.job_group, + job_name=self.job_name, + args=args, + tag=tag, + ) + + job.submit() + self.job_list.append(job) + + return job + + def available_slots(self) -> int: + """ + Returns the number of available slots for new jobs. + + The available slots are calculated by subtracting the number of finished jobs + from the maximum number of jobs allowed. + + Returns: + int: The number of available slots. + """ + AbciJob.update_status_batch(self.job_list) + return self.n_max_jobs - len([job for job in self.job_list if job.status < JobStatus.FINISHED]) + + def collect_finished(self) -> list[AbciJob]: + """ + Collects and removes all finished jobs from the job list. + + Returns: + A list of finished AbciJob objects. + """ + finished_jobs = [job for job in self.job_list if job.status >= JobStatus.FINISHED] + for job in finished_jobs: + self.job_list.remove(job) + + return finished_jobs diff --git a/examples/job/optuna/main_parallel.py b/examples/job/optuna/main_parallel.py new file mode 100644 index 00000000..ebf6f271 --- /dev/null +++ b/examples/job/optuna/main_parallel.py @@ -0,0 +1,50 @@ +import pickle as pkl +from pathlib import Path + +import optuna + +from aiaccel.job import AbciJobExecutor + + +def main() -> None: + n_trials = 50 + + job_filename = Path("objective.sh") + job_group = "gaa50000" + + result_filename_template = "{job.cwd}/{job.job_name}_result.pkl" + + jobs = AbciJobExecutor(job_filename, job_group) + + study = optuna.create_study(direction="minimize") + + finished_job_count = 0 + + while finished_job_count <= n_trials: + for _ in range(jobs.available_slots()): + trial = study.ask() + hparams = { + "x1": trial.suggest_float("x1", 0, 10), + "x2": trial.suggest_float("x2", 0, 10), + } + + jobs.job_name = str(jobs.job_filename) + f"_{trial.number}" + + job = jobs.submit( + args=[result_filename_template] + sum([[f"--{k}", f"{v:.5f}"] for k, v in hparams.items()], []), + tag=trial, + ) + + for job in jobs.collect_finished(): + trial = job.tag + + with open(result_filename_template.format(job=job), "rb") as f: + y = pkl.load(f) + + study.tell(trial, y) + + finished_job_count += 1 + + +if __name__ == "__main__": + main() diff --git a/examples/job/optuna/main_serial.py b/examples/job/optuna/main_serial.py new file mode 100644 index 00000000..215c65ec --- /dev/null +++ b/examples/job/optuna/main_serial.py @@ -0,0 +1,42 @@ +import pickle as pkl +from pathlib import Path + +import optuna + +from aiaccel.job import AbciJob + + +def main() -> None: + n_trials = 50 + + job_filename = Path("objective.sh") + job_group = "gaa50000" + + result_filename_template = "{job.cwd}/{job.job_name}_result.pkl" + + study = optuna.create_study(direction="minimize") + + for _ in range(n_trials): + trial = study.ask() + hparams = { + "x1": trial.suggest_float("x1", 0, 10), + "x2": trial.suggest_float("x2", 0, 10), + } + + job = AbciJob( + job_filename, + job_group, + args=[result_filename_template] + sum([[f"--{k}", f"{v:.5f}"] for k, v in hparams.items()], []), + ) + + job.submit() + job.wait() + + with open(result_filename_template.format(job=job), "rb") as f: + y = pkl.load(f) + + study.tell(trial, y) + + +if __name__ == "__main__": + main() diff --git a/examples/job/optuna/objective.py b/examples/job/optuna/objective.py new file mode 100644 index 00000000..bddd398e --- /dev/null +++ b/examples/job/optuna/objective.py @@ -0,0 +1,22 @@ +import pickle as pkl +from argparse import ArgumentParser +from pathlib import Path + + +def main() -> None: + parser = ArgumentParser() + parser.add_argument("dst_filename", type=Path) + parser.add_argument("--x1", type=float) + parser.add_argument("--x2", type=float) + args = parser.parse_args() + + x1, x2 = args.x1, args.x2 + + y = (x1**2) - (4.0 * x1) + (x2**2) - x2 - (x1 * x2) + + with open(args.dst_filename, "wb") as f: + pkl.dump(y, f) + + +if __name__ == "__main__": + main() diff --git a/examples/job/optuna/objective.sh b/examples/job/optuna/objective.sh new file mode 100644 index 00000000..f7cca0bd --- /dev/null +++ b/examples/job/optuna/objective.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +#$-l rt_C.small=1 +#$-cwd + +source /etc/profile.d/modules.sh +module load gcc/13.2.0 +module load python/3.10/3.10.14 + +python objective.py $@ diff --git a/pyproject.toml b/pyproject.toml index 0f8d3c71..fa7f6000 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ classifiers = [ ] dependencies = [ "numpy", + "optuna", "optuna>=3.4.0" ] diff --git a/tests/job/qstat_dat 2r_1qw.txt b/tests/job/qstat_dat 2r_1qw.txt new file mode 100644 index 00000000..4a67368a --- /dev/null +++ b/tests/job/qstat_dat 2r_1qw.txt @@ -0,0 +1,40 @@ + + + + + 42340793 + 0.25586 + objective.sh_3 + group_name + r + 2024-05-08T09:32:24.097 + gpu@g1025 + + 10 + + + 42340795 + 0.25586 + objective.sh_4 + group_name + r + 2024-05-08T09:32:41.323 + gpu@g1022 + + 10 + + + + + 42340797 + 0.00000 + objective.sh_5 + group_name + qw + 2024-05-08T09:32:42.537 + + + 10 + + + diff --git a/tests/job/qstat_dat.txt b/tests/job/qstat_dat.txt new file mode 100644 index 00000000..47f0833e --- /dev/null +++ b/tests/job/qstat_dat.txt @@ -0,0 +1,51 @@ + + + + + 42340793 + 0.25586 + objective.sh_0 + group_name + r + 2024-05-07T15:17:56.963 + gpu@g1026 + + 10 + + + 42340795 + 0.25586 + objective.sh_1 + group_name + r + 2024-05-07T15:17:57.269 + gpu@g1030 + + 10 + + + 42340797 + 0.25586 + objective.sh_2 + group_name + r + 2024-05-07T15:17:59.078 + gpu@g1030 + + 10 + + + 42340799 + 0.25586 + objective.sh_3 + group_name + r + 2024-05-07T15:17:59.504 + gpu@g1030 + + 10 + + + + + diff --git a/tests/job/qstat_dat_1r.txt b/tests/job/qstat_dat_1r.txt new file mode 100644 index 00000000..e9a88b37 --- /dev/null +++ b/tests/job/qstat_dat_1r.txt @@ -0,0 +1,18 @@ + + + + + 42340793 + 0.25586 + objective.sh_0 + group_name + r + 2024-05-07T15:17:56.963 + gpu@g1026 + + 10 + + + + + diff --git a/tests/job/qstat_dat_empty.txt b/tests/job/qstat_dat_empty.txt new file mode 100644 index 00000000..d0f7f274 --- /dev/null +++ b/tests/job/qstat_dat_empty.txt @@ -0,0 +1,7 @@ + + + + + + + diff --git a/tests/job/test_abci_job.py b/tests/job/test_abci_job.py new file mode 100644 index 00000000..bfec8b97 --- /dev/null +++ b/tests/job/test_abci_job.py @@ -0,0 +1,169 @@ +import re +import shutil +from collections.abc import Generator +from pathlib import Path +from unittest.mock import patch + +import pytest +from utils import qstat_xml + +from aiaccel.job import AbciJob, JobStatus + +## JobStatus + + +def test_from_qsub_running() -> None: + for status in ["r", "d", "Rr"]: + assert JobStatus.from_qsub(status) == JobStatus.RUNNING + + +def test_from_qsub_waiting() -> None: + for status in ["qw", "h", "t", "s", "S", "T", "Rq"]: + assert JobStatus.from_qsub(status) == JobStatus.WAITING + + +def test_from_qsub_error() -> None: + assert JobStatus.from_qsub("E") == JobStatus.ERROR + + +def test_from_qsub_unexpected_status() -> None: + with pytest.raises(ValueError, match="Unexpected status: unexpected"): + JobStatus.from_qsub("unexpected") + + +## AbciJob + + +@pytest.fixture +def job_instance(tmpdir: Path) -> Generator[AbciJob, None, None]: + cwd = tmpdir / "cwd" + cwd.mkdir() + + job_name = "job" + + yield AbciJob( + job_filename="job.sh", + job_group="group", + job_name=job_name, + cwd=str(cwd), + stdout_filename=cwd / f"{job_name}.o", + stderr_filename=cwd / f"{job_name}.e", + qsub_args=["-l"], + args=["arg1", "arg2"], + tag=None, + ) + + shutil.rmtree(str(cwd)) + + +def test_init(job_instance: AbciJob) -> None: + job = job_instance + assert isinstance(job.job_filename, Path) + assert isinstance(job.cwd, Path) + assert isinstance(job.stdout_filename, Path) + assert isinstance(job.stderr_filename, Path) + assert job.status == JobStatus.UNSUBMITTED + assert job.job_number is None + assert job.cmd == ( + ["qsub"] + + ["-g", job.job_group] + + ["-o", str(job.stdout_filename)] + + ["-e", str(job.stderr_filename)] + + ["-N", job.job_name] + + ["-l", str(job.job_filename)] + + ["arg1", "arg2"] + ) + + +def test_submit(job_instance: AbciJob) -> None: + job = job_instance + job_number = 123456 + + assert job.status == JobStatus.UNSUBMITTED + + with patch("subprocess.run") as mock_run: + mock_run.return_value.stdout = "Your job 123456" + result = job.submit() + + assert result == job + assert result.job_number == job_number + assert result.status == JobStatus.WAITING + mock_run.assert_called_once_with(job.cmd, capture_output=True, text=True, check=True) + + +def test_submit_already_submitted(job_instance: AbciJob) -> None: + job = job_instance + job.job_number = 123456 + error_message = f"This job is already submited as {job.job_name} (id: {job.job_number})" + + with pytest.raises(RuntimeError, match=re.escape(error_message)): + job.status = JobStatus.WAITING + job.submit() + + +def test_submit_qsub_result_cannot_be_parsed(job_instance: AbciJob) -> None: + job = job_instance + error_message = "The following qsub result cannot be parsed: Invalid qsub result" + + assert job.status == JobStatus.UNSUBMITTED + + with patch("subprocess.run") as mock_run: + mock_run.return_value.stdout = "Invalid qsub result" + + with pytest.raises(RuntimeError, match=re.escape(error_message)): + job.submit() + + +def test_update_status(job_instance: AbciJob) -> None: + job = job_instance + job.job_number = 42340793 + + with patch("subprocess.run", return_value=qstat_xml("tests/job/qstat_dat_1r.txt")): + job.status = JobStatus.WAITING + result = job.update_status() + + assert result == JobStatus.RUNNING + + +def test_wait(job_instance: AbciJob) -> None: + job = job_instance + + with patch.object(job, "update_status") as mock_update_status: + mock_update_status.side_effect = [ + JobStatus.RUNNING, + JobStatus.RUNNING, + JobStatus.FINISHED, + ] + result = job.wait(sleep_time=0.1) + + assert result == job + assert mock_update_status.call_count == 3 + mock_update_status.assert_called_with() + + +def test_update_status_batch() -> None: + """ + - Job status: + - WAITING: 4 + - RUNNING: 0 + + ↓↓↓↓↓ + + - Job status: + - WAITING: 0 + - RUNNING: 4 + """ + + job_list: list[AbciJob] = [] + for ii in range(4): + job = AbciJob(job_filename="job.sh", job_group="group", job_name="job") + job.status = JobStatus.WAITING + job.job_number = 42340793 + 2 * ii + + job_list.append(job) + + with patch("subprocess.run", return_value=qstat_xml("tests/job/qstat_dat.txt")): + AbciJob.update_status_batch(job_list) + + for job in job_list: + assert job.status == JobStatus.RUNNING diff --git a/tests/job/test_abci_job_manager.py b/tests/job/test_abci_job_manager.py new file mode 100644 index 00000000..0c926788 --- /dev/null +++ b/tests/job/test_abci_job_manager.py @@ -0,0 +1,120 @@ +import shutil +from collections.abc import Generator +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from utils import qstat_xml + +from aiaccel.job import AbciJob, AbciJobExecutor, JobStatus + + +@pytest.fixture +def executor(tmpdir: Path) -> Generator[AbciJobExecutor, None, None]: + work_dir = tmpdir / "workdir" + work_dir.mkdir() + + n_max_jobs = 4 + + yield AbciJobExecutor( + job_filename="main.sh", + job_group="group", + job_name="job", + work_dir=str(work_dir), + n_max_jobs=n_max_jobs, + ) + + shutil.rmtree(str(work_dir)) + + +def test_available_slots_full(executor: AbciJobExecutor) -> None: + """ + - Job status: + - RUNNING: 4 + - available: 0 + """ + + for ii in range(4): + job = AbciJob(executor.job_filename, executor.job_group, job_name=executor.job_name) + job.status = JobStatus.RUNNING + job.job_number = 42340793 + 2 * ii + + executor.job_list.append(job) + + with patch("subprocess.run", return_value=qstat_xml("tests/job/qstat_dat.txt")): + slots = executor.available_slots() + assert slots == executor.n_max_jobs - len([j for j in executor.job_list if j.status <= JobStatus.RUNNING]) + + +def test_available_slots_pending(executor: AbciJobExecutor) -> None: + """ + - Job status: + - RUNNING: 2 + - WAITING: 1 + - available: 1 + """ + + for ii, status in enumerate([JobStatus.RUNNING, JobStatus.RUNNING, JobStatus.WAITING]): + job = AbciJob(executor.job_filename, executor.job_group, job_name=executor.job_name) + job.status = status + job.job_number = 42340793 + 2 * ii + + executor.job_list.append(job) + + with patch("subprocess.run", return_value=qstat_xml("tests/job/qstat_dat 2r_1qw.txt")): + slots = executor.available_slots() + assert slots == 1 + + +def test_available_slots_full_running(executor: AbciJobExecutor) -> None: + """ + - Job status: + - RUNNING: 4 + - WAITING: 0 + - available: 0 + """ + + for ii in range(4): + job = AbciJob(executor.job_filename, executor.job_group, job_name=executor.job_name) + job.status = JobStatus.RUNNING + job.job_number = 42340793 + 2 * ii + + executor.job_list.append(job) + + with patch("subprocess.run", return_value=qstat_xml("tests/job/qstat_dat.txt")): + result = executor.available_slots() + assert result == 0 + + +def test_available_slots_empty(executor: AbciJobExecutor) -> None: + """ + - Job status: + - RUNNING: 0 + - WAITING: 0 + - available: 4 + """ + with patch("subprocess.run", return_value=qstat_xml("tests/job/qstat_dat_empty.txt")): + result = executor.available_slots() + assert result == 4 + + +def test_collect_finished(executor: AbciJobExecutor) -> None: + job_list: list[AbciJob] = [] + for status in [JobStatus.FINISHED, JobStatus.RUNNING, JobStatus.FINISHED]: + job = MagicMock(spec=AbciJob) + job.status = status + + job_list.append(job) + executor.job_list.append(job) + + result = executor.collect_finished() + assert result == [job_list[0], job_list[2]] + assert executor.job_list == [job_list[1]] + + +def test_collect_finished_empty(executor: AbciJobExecutor) -> None: + executor.job_list = [] + + result = executor.collect_finished() + assert result == [] + assert executor.job_list == [] diff --git a/tests/job/utils.py b/tests/job/utils.py new file mode 100644 index 00000000..8da86e29 --- /dev/null +++ b/tests/job/utils.py @@ -0,0 +1,13 @@ +import subprocess +from typing import Any + + +def qstat_xml(txt_data_path: str = "tests/job/qstat_dat.txt") -> Any: + with open(txt_data_path) as f: + p = subprocess.CompletedProcess( + [], + returncode=0, + stdout=f.read().encode(), + stderr=b"", + ) + return p