Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(launcher): allow local launcher to work with xpress #2251

Merged
merged 31 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
28cd574
feat(launcher): allow local launcher to work with xpress
MartinBelthle Nov 27, 2024
f086e46
continue work
MartinBelthle Nov 27, 2024
dd6f9e3
remove line
MartinBelthle Nov 27, 2024
d27b569
continue work
MartinBelthle Nov 29, 2024
671686d
add doc
MartinBelthle Nov 29, 2024
4287e68
fix linting
MartinBelthle Nov 29, 2024
970b4bb
add unit test for local launcher
MartinBelthle Nov 29, 2024
3d78e2e
get binary for Alexander
MartinBelthle Dec 6, 2024
460e021
use solver 8.8.11 for desktop version
MartinBelthle Dec 10, 2024
9cfe01f
fix xpress presolve
MartinBelthle Dec 11, 2024
885b087
prepare work for logs
MartinBelthle Dec 11, 2024
1819e9b
continue work preparation
MartinBelthle Dec 11, 2024
7dcd434
little refactoring
MartinBelthle Dec 11, 2024
889014b
continue work
MartinBelthle Dec 11, 2024
521b9b8
remove useless function
MartinBelthle Dec 11, 2024
e579ba5
use .log instead of .txt
MartinBelthle Dec 11, 2024
a1067ef
import output only if the simulation succeeds
MartinBelthle Dec 11, 2024
c3ea9f2
add std err file
MartinBelthle Dec 11, 2024
4c6a741
add logs inside debug view
MartinBelthle Dec 11, 2024
c72bd08
see logs folder inside the front-end
MartinBelthle Dec 11, 2024
92de6b9
put back the workflows
MartinBelthle Dec 11, 2024
bb88b69
fix test
MartinBelthle Dec 11, 2024
549572b
fix test
MartinBelthle Dec 11, 2024
54e4c8b
add little condition for stderr
MartinBelthle Dec 12, 2024
e569513
resolve comments by doing the same as slurm inside local
MartinBelthle Dec 18, 2024
4483ca3
little refactoring
MartinBelthle Dec 18, 2024
6098d99
fix test
MartinBelthle Dec 18, 2024
93cb23a
add new folder inside resources directory
MartinBelthle Dec 19, 2024
ce7e305
merge with dev
MartinBelthle Jan 3, 2025
7e371e7
Merge branch 'dev' into feat/allow-xpress-and-presolve-in-local
sylvlecl Jan 15, 2025
47418a0
Merge branch 'dev' into feat/allow-xpress-and-presolve-in-local
sylvlecl Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions antarest/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ class LocalConfig:
enable_nb_cores_detection: bool = True
nb_cores: NbCoresConfig = NbCoresConfig()
time_limit: TimeLimitConfig = TimeLimitConfig()
xpress_dir: Optional[str] = None
local_workspace: Path = Path("./local_workspace")

@classmethod
def from_dict(cls, data: JSON) -> "LocalConfig":
Expand All @@ -278,10 +280,14 @@ def from_dict(cls, data: JSON) -> "LocalConfig":
nb_cores = data.get("nb_cores", asdict(defaults.nb_cores))
if enable_nb_cores_detection:
nb_cores.update(cls._autodetect_nb_cores())
xpress_dir = data.get("xpress_dir", defaults.xpress_dir)
local_workspace = Path(data["local_workspace"]) if "local_workspace" in data else defaults.local_workspace
return cls(
binaries={str(v): Path(p) for v, p in binaries.items()},
enable_nb_cores_detection=enable_nb_cores_detection,
nb_cores=NbCoresConfig(**nb_cores),
xpress_dir=xpress_dir,
local_workspace=local_workspace,
)

@classmethod
Expand Down
8 changes: 1 addition & 7 deletions antarest/launcher/adapters/abstractlauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import Event, EventChannelDirectory, EventType, IEventBus
from antarest.core.model import PermissionInfo, PublicMode
from antarest.core.requests import RequestParameters
from antarest.launcher.adapters.log_parser import LaunchProgressDTO
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType

Expand Down Expand Up @@ -70,12 +69,7 @@ def __init__(

@abstractmethod
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
params: RequestParameters,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
raise NotImplementedError()

Expand Down
161 changes: 77 additions & 84 deletions antarest/launcher/adapters/local_launcher/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,24 @@
#
# This file is part of the Antares project.

import io
import logging
import os
import shutil
import signal
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import Callable, Dict, Optional, Tuple, cast
from uuid import UUID
from typing import Any, Callable, Dict, List, Optional, Tuple

from antares.study.version import SolverVersion
from typing_extensions import override

from antarest.core.config import Config
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import IEventBus
from antarest.core.requests import RequestParameters
from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException
from antarest.launcher.adapters.log_manager import follow
from antarest.launcher.adapters.log_manager import LogTailManager
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType

logger = logging.getLogger(__name__)
Expand All @@ -51,7 +48,11 @@ def __init__(
super().__init__(config, callbacks, event_bus, cache)
if self.config.launcher.local is None:
raise LauncherInitException("Missing parameter 'launcher.local'")
self.tmpdir = config.storage.tmp_dir
self.local_workspace = self.config.launcher.local.local_workspace
logs_path = self.local_workspace / "LOGS"
logs_path.mkdir(parents=True, exist_ok=True)
self.log_directory = logs_path
self.log_tail_manager = LogTailManager(self.local_workspace)
self.job_id_to_study_id: Dict[str, Tuple[str, Path, subprocess.Popen]] = {} # type: ignore
self.logs: Dict[str, str] = {}

Expand All @@ -76,12 +77,7 @@ def _select_best_binary(self, version: str) -> Path:

@override
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
params: RequestParameters,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
antares_solver_path = self._select_best_binary(f"{version:ddd}")

Expand All @@ -98,68 +94,37 @@ def run_study(
)
job.start()

def _get_job_final_output_path(self, job_id: str) -> Path:
return self.config.storage.tmp_dir / f"antares_solver-{job_id}.log"

def _compute(
self,
antares_solver_path: Path,
study_uuid: str,
uuid: UUID,
job_id: str,
launcher_parameters: LauncherParametersDTO,
) -> None:
end = False

def stop_reading_output() -> bool:
if end and str(uuid) in self.logs:
with open(
self._get_job_final_output_path(str(uuid)),
"w",
) as log_file:
log_file.write(self.logs[str(uuid)])
del self.logs[str(uuid)]
return end

tmp_path = tempfile.mkdtemp(prefix="local_launch_", dir=str(self.tmpdir))
export_path = Path(tmp_path) / "export"
export_path = self.local_workspace / job_id
logs_path = self.log_directory / job_id
logs_path.mkdir()
try:
self.callbacks.export_study(str(uuid), study_uuid, export_path, launcher_parameters)

args = [
str(antares_solver_path),
f"--force-parallel={launcher_parameters.nb_cpu}",
str(export_path),
]
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
encoding="utf-8",
)
self.job_id_to_study_id[str(uuid)] = (
study_uuid,
export_path,
process,
)
self.callbacks.update_status(
str(uuid),
JobStatus.RUNNING,
None,
None,
)
self.callbacks.export_study(job_id, study_uuid, export_path, launcher_parameters)

simulator_args, environment_variables = self._parse_launcher_options(launcher_parameters)
new_args = [str(antares_solver_path)] + simulator_args + [str(export_path)]

std_err_file = logs_path / f"{job_id}-err.log"
std_out_file = logs_path / f"{job_id}-out.log"
with open(std_err_file, "w") as err_file, open(std_out_file, "w") as out_file:
process = subprocess.Popen(
new_args,
env=environment_variables,
stdout=out_file,
stderr=err_file,
universal_newlines=True,
encoding="utf-8",
)
self.job_id_to_study_id[job_id] = (study_uuid, export_path, process)
self.callbacks.update_status(job_id, JobStatus.RUNNING, None, None)

thread = threading.Thread(
target=lambda: follow(
cast(io.StringIO, process.stdout),
self.create_update_log(str(uuid)),
stop_reading_output,
None,
),
name=f"{self.__class__.__name__}-LogsWatcher",
daemon=True,
)
thread.start()
self.log_tail_manager.track(std_out_file, self.create_update_log(job_id))

while process.poll() is None:
time.sleep(1)
Expand All @@ -170,32 +135,59 @@ def stop_reading_output() -> bool:
subprocess.run(["Rscript", "post-processing.R"], cwd=export_path)

output_id: Optional[str] = None
try:
output_id = self.callbacks.import_output(str(uuid), export_path / "output", {})
except Exception as e:
logger.error(
f"Failed to import output for study {study_uuid} located at {export_path}",
exc_info=e,
)
del self.job_id_to_study_id[str(uuid)]
if process.returncode == 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In slurm laucher we still call the import output when it has failed, we need to understand if it makes sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, if the simulator fails, the output is not retrieved from calin, therefore the _import_output fails. But with the local launcher the output exists and therefore it can be retrieved. I think we should keep the code as it is, else i would have to touch code shared with the slurm part

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Indeed if we still wanted to import the output, we should have some kind of status to say that it's a "failed output".

I wonder if we should not also delete the import output in slurm implementation, to clarify the behaviour ?

# The job succeed we need to import the output
try:
launcher_logs = self._import_launcher_logs(job_id)
output_id = self.callbacks.import_output(job_id, export_path / "output", launcher_logs)
except Exception as e:
logger.error(
f"Failed to import output for study {study_uuid} located at {export_path}",
exc_info=e,
)
del self.job_id_to_study_id[job_id]
self.callbacks.update_status(
str(uuid),
job_id,
JobStatus.FAILED if process.returncode != 0 or not output_id else JobStatus.SUCCESS,
None,
output_id,
)
except Exception as e:
logger.error(f"Unexpected error happened during launch {uuid}", exc_info=e)
logger.error(f"Unexpected error happened during launch {job_id}", exc_info=e)
self.callbacks.update_status(
str(uuid),
job_id,
JobStatus.FAILED,
str(e),
None,
)
finally:
logger.info(f"Removing launch {uuid} export path at {tmp_path}")
end = True
shutil.rmtree(tmp_path)
logger.info(f"Removing launch {job_id} export path at {export_path}")
shutil.rmtree(export_path, ignore_errors=True)

def _import_launcher_logs(self, job_id: str) -> Dict[str, List[Path]]:
logs_path = self.log_directory / job_id
return {
"antares-out.log": [logs_path / f"{job_id}-out.log"],
"antares-err.log": [logs_path / f"{job_id}-err.log"],
}

def _parse_launcher_options(self, launcher_parameters: LauncherParametersDTO) -> Tuple[List[str], Dict[str, Any]]:
simulator_args = [f"--force-parallel={launcher_parameters.nb_cpu}"]
environment_variables = os.environ.copy()
if launcher_parameters.other_options:
solver = []
if "xpress" in launcher_parameters.other_options:
solver = ["--use-ortools", "--ortools-solver=xpress"]
if xpress_dir_path := self.config.launcher.local.xpress_dir: # type: ignore
environment_variables["XPRESSDIR"] = xpress_dir_path
environment_variables["XPRESS"] = environment_variables["XPRESSDIR"] + os.sep + "bin"
elif "coin" in launcher_parameters.other_options:
solver = ["--use-ortools", "--ortools-solver=coin"]
if solver:
simulator_args += solver
if "presolve" in launcher_parameters.other_options:
simulator_args += ["--solver-parameters", "PRESOLVE 1"]
return simulator_args, environment_variables

@override
def create_update_log(self, job_id: str) -> Callable[[str], None]:
Expand All @@ -210,10 +202,11 @@ def append_to_log(log_line: str) -> None:

@override
def get_log(self, job_id: str, log_type: LogType) -> Optional[str]:
if job_id in self.job_id_to_study_id and job_id in self.logs:
if job_id in self.job_id_to_study_id and job_id in self.logs and log_type == LogType.STDOUT:
return self.logs[job_id]
elif self._get_job_final_output_path(job_id).exists():
return self._get_job_final_output_path(job_id).read_text()
job_path = self.log_directory / job_id / f"{job_id}-{log_type.to_suffix()}"
if job_path.exists():
return job_path.read_text()
return None

@override
Expand Down
8 changes: 1 addition & 7 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import Event, EventType, IEventBus
from antarest.core.model import PermissionInfo, PublicMode
from antarest.core.requests import RequestParameters
from antarest.core.utils.archives import unzip
from antarest.core.utils.utils import assert_this
from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException
Expand Down Expand Up @@ -592,12 +591,7 @@ def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Name

@override
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
params: RequestParameters,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
thread = threading.Thread(
target=self._run_study,
Expand Down
8 changes: 1 addition & 7 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,7 @@ def run_study(
)
self.job_result_repository.save(job_status)

self.launchers[launcher].run_study(
study_uuid,
job_uuid,
solver_version,
launcher_parameters,
params,
)
self.launchers[launcher].run_study(study_uuid, job_uuid, solver_version, launcher_parameters)

self.event_bus.push(
Event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

from typing_extensions import override

from antarest.study.storage.rawstudy.model.filesystem.bucket_node import BucketNode
from antarest.study.storage.rawstudy.model.filesystem.folder_node import FolderNode
from antarest.study.storage.rawstudy.model.filesystem.inode import TREE
from antarest.study.storage.rawstudy.model.filesystem.root.desktop import Desktop
from antarest.study.storage.rawstudy.model.filesystem.root.input.input import Input
from antarest.study.storage.rawstudy.model.filesystem.root.layers.layers import Layers
from antarest.study.storage.rawstudy.model.filesystem.root.logs import Logs
from antarest.study.storage.rawstudy.model.filesystem.root.output.output import Output
from antarest.study.storage.rawstudy.model.filesystem.root.settings.settings import Settings
from antarest.study.storage.rawstudy.model.filesystem.root.study_antares import StudyAntares
Expand All @@ -40,11 +40,13 @@ def build(self) -> TREE:
"study": StudyAntares(self.context, self.config.next_file("study.antares")),
"settings": Settings(self.context, self.config.next_file("settings")),
"layers": Layers(self.context, self.config.next_file("layers")),
"logs": Logs(self.context, self.config.next_file("logs")),
"input": Input(self.context, self.config.next_file("input")),
"user": User(self.context, self.config.next_file("user")),
}

if (self.config.path / "logs").exists():
children["logs"] = BucketNode(self.context, self.config.next_file("logs"))

if self.config.outputs:
output_config = self.config.next_file("output")
output_config.path = self.config.output_path or output_config.path
Expand Down
17 changes: 0 additions & 17 deletions antarest/study/storage/rawstudy/model/filesystem/root/logs.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ def build(self) -> TREE:
for i, s in self.config.outputs.items()
}

children["logs"] = BucketNode(self.context, self.config.next_file("logs"))
if (self.config.path / "logs").exists():
children["logs"] = BucketNode(self.context, self.config.next_file("logs"))
return children
13 changes: 13 additions & 0 deletions docs/developer-guide/install/1-CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,19 @@ it is instantiated on shared servers using `slurm`.

> NOTE: As you can see, you can use newer solver for older study version thanks to the solver retro-compatibility

### **xpress_dir**

- **Type:** str
- **Default value:** None
- **Description:** Path towards your xpress_dir. Needed if you want to launch a study with xpress. If the environment
variables "XPRESS_DIR" and "XPRESS" are set on your local environment it should work without setting them.

### **local_workspace**

- **Type:** Path
- **Default value:** `./local_workspace`
- **Description:** Antares Web uses this directory to run the simulations.

## **slurm**

SLURM (Simple Linux Utility for Resource Management) is used to interact with a remote environment (for Antares it's
Expand Down
Loading
Loading