diff --git a/config/README.md b/config/README.md
new file mode 100644
index 00000000..af524a81
--- /dev/null
+++ b/config/README.md
@@ -0,0 +1,94 @@
+# Configuration autodetection for new systems
+
+The ReFrame configuration file can be automatically generated for a new system using ```generate.py```.
+
+## Features
+
+- Detection of system name
+- Detection of hostname
+- Detection of module system
+- Detection of scheduler
+- Detection of parallel launcher
+- Detection of partitions based on node types (node features from Slurm) [only when the scheduler is **Slurm**]
+- Detection of partitions based on reservations [only when the scheduler is **Slurm**]
+- Detection of available container platforms in remote partitions (and of the required modules when the modules system is ```lmod``` or ```tmod```) [only when the scheduler is **Slurm**]
+- Detection of devices and their architectures in the nodes (GRes from Slurm) [only when the scheduler is **Slurm**]
+
+## Usage
+
+### Install the Jinja2 and autopep8 Python packages
+
+```sh
+pip install jinja2
+pip install autopep8
+```
+
+### Basic usage
+
+```sh
+python3 generate.py
+```
+
+By default the script runs in **interactive** mode: user input steers the detection and the final configuration of the system. User input can be suppressed by passing the ```--auto``` option.
+
+## Available Arguments
+
+| Argument | Description |
+|----------|-------------|
+| `--auto` | Disables user input. |
+| `--exclude=[list_of_features]` | List of features to be excluded from the detection of node types. |
+| `--no-remote-containers` | Disables the detection of container platforms in the remote partitions. |
+| `--no-remote-devices` | Disables the detection of devices (Slurm GRes) in the remote partitions. |
+| `--reservations=[list_reservations]` | Specifies the reservations in the system for which partitions should be created. |
+| `--prefix` | Shared directory where the jobs for remote detection are created and submitted. |
+| `--access` | Additional access options that must be included for the sbatch submission. |
+| `-v` | Raises the verbosity level to debug in ```auto``` mode. Only effective if combined with ```--auto```. |
+
+```sh
+python3 generate.py --auto
+```
+
+With this option, no user input is required to generate the configuration file. In ```auto``` mode the following partitions are created automatically:
+
+- Login partition
+- One partition per node type (based on Slurm AvailableFeatures)
+
+If additional partitions for specific reservations are required, the ```--auto``` option can be combined with ```--reservations=reserv_1,reserv_2``` in order to create partitions for ```reserv_1``` and ```reserv_2``` respectively.
+
+```sh
+python3 generate.py --auto --reservations=reserv_1,reserv_2
+```
+
+In ```auto``` mode the detection of container platforms and devices is enabled by default. It requires the submission of one job per partition, and the script waits until each job completes. These job submissions can be disabled through the options ```--no-remote-containers``` and ```--no-remote-devices``` respectively. Note that, by default, if no GRes is detected in a node, no device-detection job is submitted.
+
+The options ```--no-remote-containers``` and ```--reservations=[list_reservations]``` are only used in ```auto``` mode. The option ```--no-remote-devices``` is valid in both interactive and ```auto``` modes.
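+
+For instance, to run fully unattended while skipping the device-detection jobs and staging the remote detection jobs in a shared directory (the path below is only a placeholder):
+
+```sh
+python3 generate.py --auto --no-remote-devices --prefix=/path/to/shared/dir
+```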
+
+**Excluding node features from the node types filtering**
+
+In order to exclude some features from the detection of the different node types, these can be passed to the script on the command line using the option ```--exclude=[list_of_features]```. Patterns can also be specified in this option using ```*```.
+
+*Usage example:*
+
+```sh
+python3 generate.py --exclude=group*,c*,r*
+```
+
+Running this will ignore the node features that match those patterns: node A with features ```(gpu,group2,column43,row9)``` and node B with ```(gpu,group8,column1,row75)``` will be identified as the same node type and included in the same partition.
+
+**Specifying additional access options**
+
+Additional access options can be passed to the script through the ```--access``` command line option.
+
+*Usage example:*
+
+```sh
+python3 generate.py --access=-Cgpu
+```
+
+This option will add ```-Cgpu``` to the access options of the remote partitions in the configuration file and use it to submit the remote detection jobs for container platforms and devices.
+
+## Generated configuration files
+
+The script generates a Python file with the system configuration, named after the detected system:
+
+- ```<system_name>_config.py```
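+
+As a rough illustration (all values below are placeholders; the exact fields depend on what is detected), the generated file follows the shape rendered by ```reframe_config_template.j2```:
+
+```python
+site_configuration = {
+    'systems': [
+        {
+            'name': 'cluster',                          # detected system name
+            'descr': 'System description for cluster',
+            'hostnames': ['cluster'],                   # detected hostname
+            'modules_system': 'lmod',                   # detected modules system
+            'partitions': [
+                {
+                    'name': 'login',
+                    'scheduler': 'local',
+                    'time_limit': '2m',
+                    'environs': ['builtin'],
+                    'max_jobs': 4,
+                    'launcher': 'local',
+                },
+                # ...one partition per detected node type or reservation
+            ],
+        },
+    ],
+    # Environments cannot be autodetected and must be filled in manually
+    'environments': []
+}
+```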
diff --git a/config/generate.py b/config/generate.py
new file mode 100644
index 00000000..f56c335e
--- /dev/null
+++ b/config/generate.py
@@ -0,0 +1,137 @@
+# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import argparse
+import autopep8
+import os
+from jinja2 import Environment, FileSystemLoader
+from utilities.config import SystemConfig
+from utilities.io import getlogger, set_logger_level
+
+JINJA2_TEMPLATE = 'reframe_config_template.j2'
+
+
+def main(user_input, containers_search, devices_search, reservs,
+         exclude_feats, access_opt, tmp_dir):
+
+    # Initialize system configuration
+    system_info = SystemConfig()
+    # Build the configuration with the right options
+    system_info.build_config(
+        user_input=user_input, detect_containers=containers_search,
+        detect_devices=devices_search, exclude_feats=exclude_feats,
+        reservs=reservs, access_opt=access_opt, tmp_dir=tmp_dir
+    )
+
+    # Set up the Jinja2 environment and load the template
+    template_loader = FileSystemLoader(searchpath='.')
+    env = Environment(loader=template_loader,
+                      trim_blocks=True, lstrip_blocks=True)
+    rfm_config_template = env.get_template(JINJA2_TEMPLATE)
+
+    system_info_jinja = system_info.format_for_jinja()
+    # Render the template with the gathered information
+    organized_config = rfm_config_template.render(system_info_jinja)
+
+    # Output filename for the generated configuration
+    output_filename = f'{system_info.systemname}_config.py'
+
+    # Format the content
+    formatted = autopep8.fix_code(organized_config)
+
+    # Overwrite the file with formatted content
+    with open(output_filename, "w") as output_file:
+        output_file.write(formatted)
+
+    getlogger().info(
+        f'\nThe following configuration file was created:\n'
+        f'PYTHON: {system_info.systemname}_config.py', color=False
+    )
+
+
+if __name__ == '__main__':
+
+    # Create an ArgumentParser object
+    parser = argparse.ArgumentParser()
+
+    # Define the '--auto' flag
+    parser.add_argument('--auto', action='store_true',
+                        help='Turn off interactive mode')
+    # Define the '--no-remote-containers' flag
+    parser.add_argument(
+        '--no-remote-containers', action='store_true',
+        help='Disable container platform detection in remote partitions'
+    )
+    # Define the '--no-remote-devices' flag
+    parser.add_argument('--no-remote-devices', action='store_true',
+                        help='Disable device detection in remote partitions')
+    # Define the '--reservations' flag
+    parser.add_argument(
+        '--reservations', nargs='?',
+        help='Specify the reservations that you want to create partitions for.'
+    )
+    # Define the '--exclude' flag
+    parser.add_argument(
+        '--exclude', nargs='?',
+        help='Exclude certain node features from the detection ' +
+             'of node types'
+    )
+    # Define the '--prefix' flag
+    parser.add_argument(
+        '--prefix', action='store',
+        help='Shared directory for remote detection jobs'
+    )
+    # Define the '--access' flag
+    parser.add_argument(
+        '--access', action='store',
+        help='Compulsory options for accessing remote nodes with sbatch'
+    )
+    # Define the '-v' flag
+    parser.add_argument(
+        '-v', action='store_true',
+        help='Set the verbosity to debug. Only effective if combined with --auto.'
+    )
+
+    args = parser.parse_args()
+
+    user_input = not args.auto
+
+    containers_search = not args.no_remote_containers
+    devices_search = not args.no_remote_devices
+
+    if args.reservations:
+        reservs = args.reservations.split(',')
+    else:
+        reservs = []
+
+    if args.exclude:
+        exclude_feats = args.exclude.split(',')
+    else:
+        exclude_feats = []
+
+    if args.prefix:
+        if os.path.exists(args.prefix):
+            tmp_dir = args.prefix
+        else:
+            raise ValueError('The specified --prefix directory was not found')
+    else:
+        tmp_dir = None
+
+    if args.access:
+        access_opt = args.access.split(',')
+    else:
+        access_opt = []
+
+    set_logger_level(args.v or user_input)
+
+    main(user_input, containers_search, devices_search,
+         reservs, exclude_feats, access_opt, tmp_dir)
diff --git a/config/reframe_config_template.j2 b/config/reframe_config_template.j2
new file mode 100644
index 00000000..5c08d42a
--- /dev/null
+++ b/config/reframe_config_template.j2
@@ -0,0 +1,96 @@
+# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+# +# SPDX-License-Identifier: BSD-3-Clause + +# This is a generated ReFrame configuration file +# The values in this file are dynamically filled in using the system's current configuration + +site_configuration = { + 'systems': [ + { + 'name': '{{ name }}', # Name of the system + 'descr': 'System description for {{ name }}', # Description of the system + 'hostnames': {{hostnames}}, # Hostname used by this system + 'modules_system': '{{modules_system}}', + {% if modules %} + # Specify the modules to be loaded in the system when running reframe (if any) + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.modules + 'modules': {{ modules }}, + {% endif %} + {% if resourcesdir %} + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.resourcesdir + 'resourcesdir': '{{ resourcesdir }}', # Directory path for system resources + {% endif %} + # Define the partitions of the system (based on node type or reservations) + # !!Partition autodetection is only available for the slurm scheduler + 'partitions': [ + {% for partition in partitions %} + { + 'name': '{{partition.name}}', + 'descr': '{{partition.descr}}', + 'launcher': '{{partition.launcher}}', # Launcher for parallel jobs + 'environs': {{partition.environs}}, # Check 'environments' config below + 'scheduler': '{{partition.scheduler}}', + 'time_limit': '{{partition.time_limit}}', + 'max_jobs': {{partition.max_jobs}}, + {% if partition.features | length > 1 %} + # Resources for testing this partition (https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.partitions.resources) + 'resources': [{'name': 'switches', + 'options': ['--switches={num_switches}']}, + {'name': 'gres', + 'options': ['--gres={gres}']}, + {'name': 'memory', + 'options': ['--mem={mem_per_node}']}], + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.partitions.extras + 'extras': {{partition.extras}}, + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#config.systems.partitions.env_vars + 'env_vars': {{partition.env_vars}}, + {% if partition.devices %} + # Check if any specific devices were found in this node type + # The gpus found in slurm GRes will be specified here + 'devices': [ + {% for dev in partition.devices %} + { 'type': '{{dev.type}}', + 'model': '{{dev.model}}', + {% if dev.arch %} + 'arch': '{{dev.arch}}', + {% endif %} + 'num_devices': {{dev.num_devices}} + }, + {% endfor %} + ], + {% endif %} + {% if partition.container_platforms %} + # Check if any container platforms are available in these nodes and add them + # https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#container-platform-configuration + 'container_platforms': [ + {% for c_p in partition.container_platforms %} + { 'type': '{{c_p.type}}', # Type of container platform + {% if c_p.modules %} + # Specify here the modules required to run the container platforms (if any) + 'modules': {{c_p.modules}} + {% endif %} + }, + {% endfor %} + ], + {% endif %} + {% endif %} + {% if partition.access %} + # Options passed to the job scheduler in order to submit a job to the specific nodes in this partition + 'access': {{partition.access}}, + {% endif %} + {% if partition.features %} + # Node features detected in slurm + 'features': {{partition.features}}, + {% endif %} + }, + {% endfor %} + ], + }, + ], + # The environments cannot be automatically detected, check the following links for reference + # 
'https://github.com/eth-cscs/cscs-reframe-tests/tree/alps/config/systems': CSCS github repo
+    # 'https://reframe-hpc.readthedocs.io/en/stable/config_reference.html#environment-configuration': ReFrame documentation
+    'environments': []
+}
diff --git a/config/utilities/config.py b/config/utilities/config.py
new file mode 100644
index 00000000..57e5d5aa
--- /dev/null
+++ b/config/utilities/config.py
@@ -0,0 +1,222 @@
+# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import asyncio
+import os
+import re
+import socket
+from typing import Union
+from utilities.io import (getlogger, request_modules,
+                          user_descr, user_selection)
+from utilities.job_util import Launcher, Scheduler, SlurmContext
+from utilities.modules import ModulesSystem, modules_impl
+
+
+class SystemConfig:
+    '''Describes the system configuration'''
+
+    def __init__(self):
+        self._systemname = ''
+        self._hostnames = []
+        self._resourcesdir = ''
+        self._modules_system = 'nomod'
+        self._modules = []
+        self._partitions = {}
+        self._remote_launcher = ''
+        self._remote_scheduler = ''
+
+    @property
+    def systemname(self):
+        return self._systemname
+
+    @property
+    def hostnames(self):
+        return self._hostnames
+
+    @property
+    def resourcesdir(self):
+        return self._resourcesdir
+
+    @property
+    def modules_system(self):
+        return self._modules_system
+
+    @property
+    def modules(self):
+        return self._modules
+
+    @property
+    def partitions(self):
+        return self._partitions
+
+    def find_systemname(self):
+        '''Try to get the system name from the
+        environment variable CLUSTER_NAME
+        '''
+        systemname = os.getenv('CLUSTER_NAME')
+        if systemname:
+            self._systemname = systemname
+            getlogger().info(f'System name is {systemname}')
+        else:
+            self._systemname = 'cluster'
+            getlogger().warning(
+                f'System name not found; set to "{self._systemname}"'
+            )
+
+    def find_hostname(self):
+        '''Try to get the hostname'''
+        try:
+            hostname = socket.gethostname()
+        except Exception as e:
+            self._hostnames.append('')
+            getlogger().error('Hostname not found')
+            getlogger().error(f'Trying to retrieve the hostname raised:\n{e}')
+        else:
+            # Keep only the leading alphabetic part of the hostname
+            hostname = hostname.strip()
+            hostname = re.search(r'^[A-Za-z]+', hostname)
+            self._hostnames.append(hostname.group(0))
+            getlogger().info(f'Hostname is {hostname.group(0)}')
+
+    def find_modules_system(self) -> Union[ModulesSystem, None]:
+        '''Detect the modules system and return it'''
+        for mod in modules_impl:
+            modules_system = modules_impl[mod]
+            if modules_system().found:
+                self._modules_system = modules_system().name
+                getlogger().info(
+                    f'Found a sane {self._modules_system} '
+                    'installation in the system')
+                return modules_system()
+
+        # No modules system was detected; keep the 'nomod' default
+        return None
+
+    def _get_resourcesdir(self):
+        '''Ask about a possible resources dir'''
+        res_dir = user_descr(('Directory prefix where external test resources '
+                              '(e.g., large input files) are stored.'),
+                             cancel_n=True)
+        while res_dir and not os.path.exists(res_dir):
+            getlogger().warning('The specified directory does not exist.')
+            res_dir = user_descr(('Directory prefix where external test '
+                                  'resources (e.g., large input files) '
+                                  'are stored.'), cancel_n=True)
+        if res_dir:
+            self._resourcesdir = res_dir
+
+    def find_scheduler(self, user_input: bool, detect_containers: bool,
+                       detect_devices: bool, wait: bool, access_opt: list,
+                       tmp_dir: Union[str, None]) -> Union[SlurmContext, None]:
'''Detect the remote scheduler''' + scheduler = Scheduler() + scheduler.detect_scheduler(user_input) + self._remote_scheduler = scheduler.name + if self._remote_scheduler == 'slurm' or \ + self._remote_scheduler == 'squeue': + return SlurmContext(self._modules_system, + detect_containers=detect_containers, + detect_devices=detect_devices, + access_opt=access_opt, + wait=wait, tmp_dir=tmp_dir) + else: + return None + + def find_launcher(self, user_input: bool): + '''Detect the parallel launcher''' + launcher = Launcher() + launcher.detect_launcher(user_input) + self._remote_launcher = launcher.name + + def build_config(self, user_input: bool = True, + detect_containers: bool = True, + detect_devices: bool = True, + wait: bool = True, exclude_feats: list = [], + reservs: list = [], access_opt: list = [], + tmp_dir: Union[str, None] = None): + '''Build the configuration with all the information''' + # System name + self.find_systemname() + # Hostname + self.find_hostname() + # Modules system + modules_system = self.find_modules_system() + # TODO: not available for spack yet + if modules_system and user_input: + getlogger().debug('You can require some modules to be loaded ' + 'every time reframe is run on this system') + self._modules = request_modules(modules_system) + + if user_input: + self._get_resourcesdir() + # Scheduler + self._slurm_schd = self.find_scheduler( + user_input, + detect_containers=detect_containers, + detect_devices=detect_devices, + access_opt=access_opt, + wait=wait, tmp_dir=tmp_dir + ) + # Launcher + self.find_launcher(user_input) + # Partition detection only available with Slurm + if self._slurm_schd: + # Search node types + self._slurm_schd.search_node_types(exclude_feats) + # Start creation of the partitions if slurm is the scheduler + self._slurm_schd.create_login_partition(user_input=user_input) + # Initialize the asynchronous loop + loop = asyncio.get_event_loop() + loop.run_until_complete(self._slurm_schd.create_partitions( + launcher=self._remote_launcher, + scheduler=self._remote_scheduler, + user_input=user_input)) + loop.close() + # Assign the partitions to SystemConfig object + self._partitions = self._slurm_schd.partitions + # Create / search for the partitions based on the reservations + if user_input or reservs: + self._slurm_schd.search_reservations() + if not user_input: + for reserv in reservs: + if reserv not in self._slurm_schd.reservations: + getlogger().warning( + f'Reservation {reserv} not found, ' + 'skipping...\n') + else: + self._slurm_schd.create_reserv_partition( + launcher=self._remote_launcher, + scheduler=self._remote_scheduler, + user_input=False, + reserv=reserv + ) + else: + reserv = True + while reserv: + getlogger().debug( + 'Do you want to create a partition for ' + 'a reservation?') + getlogger().debug(f'{self._slurm_schd.reservations}\n') + reserv = user_selection( + self._slurm_schd.reservations, cancel_n=True) + if reserv: + self._slurm_schd.create_reserv_partition( + launcher=self._remote_launcher, + scheduler=self._remote_scheduler, + user_input=True, + reserv=reserv + ) + + def format_for_jinja(self) -> dict: + '''Generate an iterable for the jinja template''' + system_dict = {} + system_dict.update({'name': self.systemname}) + system_dict.update({'hostnames': self.hostnames}) + system_dict.update({'modules_system': self._modules_system}) + system_dict.update({'modules': self.modules}) + system_dict.update({'resourcesdir': self.resourcesdir}) + system_dict.update({'partitions': self.partitions}) + return system_dict diff --git 
a/config/utilities/constants.py b/config/utilities/constants.py new file mode 100644 index 00000000..8f3a1765 --- /dev/null +++ b/config/utilities/constants.py @@ -0,0 +1,336 @@ +# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +CONTAINERS = [{'name': 'Sarus', 'cmd': 'sarus'}, + {'name': 'Apptainer', 'cmd': 'apptainer'}, + {'name': 'Docker', 'cmd': 'docker'}, + {'name': 'Singularity', 'cmd': 'singularity'}, + {'name': 'Shifter', 'cmd': 'shifter'}] + + +class bcolors: + HEADER = '\033[95m' + BLUE = '\033[94m' + CYAN = '\033[96m' + GREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + +rfm_url = 'https://reframe-hpc.readthedocs.io/en/stable/config_reference.html' + + +RFM_DOCUMENTATION = { + 'modules': f'{rfm_url}#config.systems.modules', + 'resourcesdir': f'{rfm_url}#config.systems.resourcesdir', + 'schedulers': f'{rfm_url}#config.systems.partitions.scheduler', + 'devices': f'{rfm_url}#config.systems.partitions.devices', + 'sched_resources': f'{rfm_url}#custom-job-scheduler-resources', + 'extras': f'{rfm_url}#config.systems.partitions.extras', + 'partition_resources': f'{rfm_url}#config.systems.partitions.resources', + 'partition_envvars': f'{rfm_url}#config.systems.partitions.env_vars', + 'container_platforms': f'{rfm_url}#container-platform-configuration', + 'container_platformsm': (f'{rfm_url}#config.systems.partitions.' + 'container_platforms.modules'), + 'environments': f'{rfm_url}#environments', + 'modes': f'{rfm_url}#execution-mode-configuration' +} + +resources = [{'name': 'switches', + 'options': ['--switches={num_switches}']}, + {'name': 'gres', + 'options': ['--gres={gres}']}, + {'name': 'memory', + 'options': ['--mem={mem_per_node}']}] + +nvidia_gpu_architecture = { + "Tesla K20": "sm_35", + "Tesla K40": "sm_35", + "Tesla P100": "sm_60", + "Tesla V100": "sm_70", + "Tesla T4": "sm_75", + "Tesla A100": "sm_80", + "Quadro RTX 8000": "sm_75", + "Quadro RTX 6000": "sm_75", + "Quadro P6000": "sm_61", + "Quadro GV100": "sm_70", + "GeForce GTX 1080": "sm_61", + "GeForce GTX 1080 Ti": "sm_61", + "GeForce GTX 1070": "sm_61", + "GeForce GTX 1060": "sm_61", + "GeForce GTX 1050": "sm_61", + "GeForce RTX 2060": "sm_75", + "GeForce RTX 2070": "sm_75", + "GeForce RTX 2080": "sm_75", + "GeForce RTX 2080 Ti": "sm_75", + "GeForce RTX 3060": "sm_86", + "GeForce RTX 3070": "sm_86", + "GeForce RTX 3080": "sm_86", + "GeForce RTX 3090": "sm_86", + "GeForce RTX 4060": "sm_89", + "GeForce RTX 4070": "sm_89", + "GeForce RTX 4080": "sm_89", + "GeForce RTX 4090": "sm_89", + "A100": "sm_80", + "H100": "sm_90", + "H200": "sm_90", + "H100 PCIe": "sm_90", + "H100 SXM": "sm_90", + "Titan V": "sm_70", + "Titan RTX": "sm_75" +} + +amd_gpu_architecture = { + # RDNA 3 Series + "RX 7900 XTX": "RDNA 3", + "RX 7900 XT": "RDNA 3", + "RX 7800 XT": "RDNA 3", + "RX 7700 XT": "RDNA 3", + "RX 7600": "RDNA 3", + + # RDNA 2 Series + "RX 6950 XT": "RDNA 2", + "RX 6900 XT": "RDNA 2", + "RX 6800 XT": "RDNA 2", + "RX 6800": "RDNA 2", + "RX 6750 XT": "RDNA 2", + "RX 6700 XT": "RDNA 2", + "RX 6700": "RDNA 2", + "RX 6650 XT": "RDNA 2", + "RX 6600 XT": "RDNA 2", + "RX 6600": "RDNA 2", + "RX 6500 XT": "RDNA 2", + "RX 6400": "RDNA 2", + + # RDNA Series + "RX 5700 XT": "RDNA", + "RX 5700": "RDNA", + "RX 5600 XT": "RDNA", + "RX 5600": "RDNA", + "RX 5500 XT": "RDNA", + "RX 5500": "RDNA", + "RX 5300": "RDNA", + "RX 5300M": 
"RDNA", + + # Vega Series + "RX Vega 64": "Vega", + "RX Vega 56": "Vega", + "Radeon Vega Frontier Edition": "Vega", + "Radeon Pro Vega 64": "Vega", + "Radeon Pro Vega 56": "Vega", + + # Vega 20 Series + "Radeon Pro VII": "Vega 20", + "Instinct MI50": "Vega 20", + "Instinct MI60": "Vega 20", + + # Polaris (RX 500 Series) + "RX 590": "Polaris (GCN 4)", + "RX 580": "Polaris (GCN 4)", + "RX 570": "Polaris (GCN 4)", + "RX 560": "Polaris (GCN 4)", + "RX 550": "Polaris (GCN 4)", + + # Polaris (RX 400 Series) + "RX 480": "Polaris (GCN 4)", + "RX 470": "Polaris (GCN 4)", + "RX 460": "Polaris (GCN 4)", + + # Fury and Nano (Fiji, GCN 3) + "R9 Fury X": "Fiji (GCN 3)", + "R9 Fury": "Fiji (GCN 3)", + "R9 Nano": "Fiji (GCN 3)", + "R9 Fury Nano": "Fiji (GCN 3)", + + # R9 300 Series (GCN 2 and GCN 3) + "R9 390X": "Hawaii (GCN 2)", + "R9 390": "Hawaii (GCN 2)", + "R9 380X": "Antigua (GCN 3)", + "R9 380": "Antigua (GCN 3)", + "R9 370X": "Trinidad (GCN 1)", + "R9 370": "Trinidad (GCN 1)", + + # R7 300 Series (GCN 1) + "R7 370": "Trinidad (GCN 1)", + "R7 360": "Bonaire (GCN 1)", + + # R9 200 Series (GCN 1 and GCN 2) + "R9 290X": "Hawaii (GCN 2)", + "R9 290": "Hawaii (GCN 2)", + "R9 280X": "Tahiti (GCN 1)", + "R9 280": "Tahiti (GCN 1)", + "R9 270X": "Curacao (GCN 1)", + "R9 270": "Curacao (GCN 1)", + "R9 260X": "Bonaire (GCN 1)", + "R9 260": "Bonaire (GCN 1)", + + # R7 200 Series (GCN 1) + "R7 265": "Pitcairn (GCN 1)", + "R7 260X": "Bonaire (GCN 1)", + "R7 260": "Bonaire (GCN 1)", + "R7 250X": "Oland (GCN 1)", + "R7 250": "Oland (GCN 1)", + "R7 240": "Oland (GCN 1)", + + # HD 7000 Series (GCN 1) + "HD 7970": "Tahiti (GCN 1)", + "HD 7950": "Tahiti (GCN 1)", + "HD 7870": "Pitcairn (GCN 1)", + "HD 7850": "Pitcairn (GCN 1)", + "HD 7790": "Bonaire (GCN 1)", + "HD 7770": "Cape Verde (GCN 1)", + "HD 7750": "Cape Verde (GCN 1)" +} + +containers_detect_bash = ''' +# List of containers to check +CONTAINERS=( + "Sarus:sarus" + "Apptainer:apptainer" + "Docker:docker" + "Singularity:singularity" + "Shifter:shifter" +) + +# Array to hold installed containers +installed=() + +# Function to check for module existence (with lmod) +check_module_spider() { + output=$(module spider "$1" 2>&1) + if echo $output | grep -q "error"; then + return 1 + else + return 0 + fi +} + +# Function to check for module existence (with tmod) +check_module_avail() { + output=$(module avail "$1" 2>&1) + if echo $output | grep -q "$1"; then + return 0 + else + return 1 + fi +} + +check_lmod() { + if [[ -n "$LMOD_CMD" ]]; then + return 0 + else + return 1 + fi +} + +check_tmod() { + if [[ -n "modulecmd -V" ]]; then + return 0 + else + return 1 + fi +} + +# Check each container command +for container in "${CONTAINERS[@]}"; do + IFS=":" read -r name cmd <<< "$container" + + # Check if the command exists via 'which' + found_via_command=false + found_via_module=false + + if which "$cmd" > /dev/null 2>&1; then + found_via_command=true + fi + + if check_lmod; then + # Check if it is available as a module, regardless of 'which' result + if check_module_spider "$cmd"; then + output=$(module spider "$cmd" 2>&1) + modules_load=$(echo $output | grep -oP '(?<=available to load.).*?(?= Help)') + found_via_module=true + fi + fi + + if check_tmod; then + # Check if it is available as a module, regardless of 'which' result + if check_module_avail "$cmd"; then + output=$(module avail "$cmd" 2>&1) + modules_load="" + found_via_module=true + fi + fi + + # Determine the status of the container + if $found_via_command && $found_via_module; then + installed+=("$name 
modules: $modules_load") + elif $found_via_command; then + installed+=("$name") + elif $found_via_module; then + installed+=("$name modules: $modules_load") + else + echo "$name is not installed." + fi +done + +# Output installed containers +echo "Installed containers:" +for name in "${installed[@]}"; do + echo "$name" +done +''' + +devices_detect_bash = ''' +# Check for NVIDIA GPUs +if command -v nvidia-smi > /dev/null 2>&1; then + echo "Checking for NVIDIA GPUs..." + gpu_info=$(nvidia-smi --query-gpu=name --format=csv,noheader) + + if [ -z "$gpu_info" ]; then + echo "No NVIDIA GPU found." + else + echo "NVIDIA GPUs installed:" + echo "$gpu_info" + fi +else + echo "No NVIDIA GPU found or nvidia-smi command is not available." +fi + +# Check for AMD GPUs (if applicable) +if command -v lspci > /dev/null 2>&1; then + echo -e "\nChecking for AMD GPUs:" + if lspci | grep -i 'radeon'; then + lspci | grep -i 'radeon' + else + echo "No AMD GPU found." + fi +else + echo "lspci command is not available." +fi +''' + +cn_memory_bash = ''' +#!/bin/bash + +# Execute the free -h command and capture the output +output=$(free -h) + +# Extract the total memory value from the output (second column of the "Mem:" row) +total_memory=$(echo "$output" | grep -i "mem:" | awk '{print $2}') + +# Output the total memory +echo "Total memory: $total_memory" +''' + +slurm_submission_header = '''#!/bin/bash +#SBATCH --job-name="Config_autodetection" +#SBATCH --ntasks=1 +#SBATCH --output=config_autodetection_{partition_name}.out +#SBATCH --error=config_autodetection_{partition_name}.out +#SBATCH --time=0:2:0 +''' diff --git a/config/utilities/io.py b/config/utilities/io.py new file mode 100644 index 00000000..c63a6e2b --- /dev/null +++ b/config/utilities/io.py @@ -0,0 +1,215 @@ +# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +import asyncio +import itertools +import logging +import sys +from typing import Union +from utilities.modules import ModulesSystem + + +async def status_bar(): + """ + Asynchronous function that displays a rotating status bar. + """ + sys.stdout.write('\n') + for symbol in itertools.cycle(["|", "/", "-", "\\"]): + sys.stdout.write(f"\rWaiting for the submitted jobs... 
{symbol}") + sys.stdout.flush() + await asyncio.sleep(0.2) # Adjust for desired speed of the status bar + + +class CustomFormatter(logging.Formatter): + # Define ANSI escape codes for colors + RESET = "\033[0m" + COLORS = { + logging.DEBUG: "", # No color for DEBUG + logging.INFO: "\033[32m", # Green for INFO + logging.WARNING: "\033[33m", # Yellow for WARNING + logging.ERROR: "\033[31m", # Red for ERROR + logging.CRITICAL: "\033[31;1m" # Bright Red for CRITICAL + } + + def format(self, record): + # Check if the log has a 'color' attribute set for this message + color = self.COLORS.get(record.levelno, self.RESET) if getattr( + record, 'use_color', True) else '' + message = super().format(record) + return f"{color}{message}{self.RESET}" + + +class CustomLogger(logging.Logger): + def __init__(self): + super().__init__("a") + + # Create console handler and formatter + self.console_handler = logging.StreamHandler() + formatter = CustomFormatter() + self.console_handler.setFormatter(formatter) + + self.addHandler(self.console_handler) + self.setLevel(logging.DEBUG) + + def _log_with_color(self, level, msg, *args, **kwargs): + """Internal method to log with color control.""" + # Check if the user has passed the 'color' argument + use_color = kwargs.pop('color', True) + extra = kwargs.setdefault('extra', {}) + extra['use_color'] = use_color + super()._log(level, msg, args, **kwargs) + + def debug(self, msg, *args, **kwargs): + self._log_with_color(logging.DEBUG, msg, *args, **kwargs) + + def info(self, msg, *args, **kwargs): + self._log_with_color(logging.INFO, msg, *args, **kwargs) + + def warning(self, msg, *args, **kwargs): + self._log_with_color(logging.WARNING, msg, *args, **kwargs) + + def error(self, msg, *args, **kwargs): + self._log_with_color(logging.ERROR, msg, *args, **kwargs) + + def critical(self, msg, *args, **kwargs): + self._log_with_color(logging.CRITICAL, msg, *args, **kwargs) + + +_logger = CustomLogger() + + +def set_logger_level(debug_info: bool = True): + global _logger + if not debug_info: + _logger.console_handler.setLevel(logging.INFO) + + +def getlogger(): + return _logger + + +def user_yn(prompt: str) -> bool: + ''' Request user yes or no''' + + prompt = prompt + ' (y/n): ' + user_response = '' + while user_response.lower() != 'y' and user_response.lower() != 'n': + user_response = input(prompt) + if user_response.lower() == 'y': + return True + else: + return False + + +def user_integer(prompt: str, default_value: Union[int, None] = None) -> int: + ''' Request user integer value''' + + if default_value: + prompt = prompt + f' (default is {default_value}): ' + is_integer = False + user_value = input(prompt) + while not is_integer: + if not user_value and default_value: + return default_value + try: + user_value = int(user_value) + is_integer = True + except Exception: + user_value = input('It must be an integer value:') + + return user_value + + +def user_selection(options: str, cancel_n: bool = False) -> Union[str, bool]: + '''Request user selection from a list of options''' + + if cancel_n: + msg = ('The selected option was not recognized. ' + 'Please check the syntax (or press n to remove): ') + else: + msg = ('The selected option was not recognized. 
' + 'Please check the syntax: ') + + user_option = input('Please select one from the list above: ') + while user_option not in options: + user_option = input(msg) + if cancel_n and user_option.lower() == 'n': + return False + + return user_option + + +def user_descr(prompt: str, default_value: Union[str, None] = None, + cancel_n: bool = False) -> Union[str, bool]: + ''' Request user name / decription''' + + if default_value: + prompt = prompt + f' (default is {default_value}): ' + elif cancel_n: + prompt = prompt + ' (enter n to skip): ' + + user_response = input(prompt) + while not user_response: + if default_value: + return default_value + user_response = input(prompt) + if cancel_n and user_response.lower() == 'n': + return False + + return user_response + + +def request_modules(modules_system: ModulesSystem) -> list: + '''Request modules to be loaded''' + + modules_to_load = user_descr( + 'Do you require any modules to be loaded?\n' + 'Please write the modules names separated by commas', + cancel_n=True + ) + + if modules_to_load: + modules_to_load = [mod.strip() for mod in modules_to_load.split(',')] + # Check that all the modules are available in the system + check_modules = False + while not check_modules: + modules_ok = 0 + index_remove = [] + for m_i, module in enumerate(modules_to_load): + new_module = '' + # Check if the module system finds the requested module + mod_options = modules_system.available_modules(module) + if not mod_options: + new_module = user_descr(f'Module {module} not available.\n' + 'Specify the right one', + cancel_n=True) + elif len(mod_options) > 1: + # Several versions were detected + getlogger().debug( + 'There are multiple versions of the ' + f'module {module}: \n{mod_options}.\n' + ) + new_module = user_selection(mod_options, cancel_n=True) + if new_module: + modules_ok += 1 + else: + modules_ok += 1 + + if new_module: + modules_to_load[m_i] = new_module + elif new_module != '': + index_remove.append(m_i) + + if len(modules_to_load) == modules_ok: + check_modules = True + elif len(modules_to_load) - len(index_remove) == modules_ok: + for i in index_remove[::-1]: + modules_to_load.pop(i) + check_modules = True + else: + for i in index_remove[::-1]: + modules_to_load.pop(i) + + return modules_to_load diff --git a/config/utilities/job_util.py b/config/utilities/job_util.py new file mode 100644 index 00000000..d99b2295 --- /dev/null +++ b/config/utilities/job_util.py @@ -0,0 +1,930 @@ +# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. 
+# +# SPDX-License-Identifier: BSD-3-Clause + +import asyncio +import fnmatch +import grp +import os +import re +import shutil +import subprocess +import sys +import tempfile +from contextlib import contextmanager +from typing import Union +from utilities.constants import (amd_gpu_architecture, + containers_detect_bash, + devices_detect_bash, + nvidia_gpu_architecture, + resources) +from utilities.io import (getlogger, status_bar, user_descr, + user_integer, user_selection, user_yn) + +WDIR = os.getcwd() +TIME_OUT_POLICY = 200 + + +@contextmanager +def change_dir(destination: str): + try: + os.chdir(destination) # Change to the new directory + yield + finally: + os.chdir(WDIR) # Change back to the original directory + +# TODO: create a common base class for Scheduler and Launcher + + +class Scheduler: + '''Scheduler detector''' + + def __init__(self): + self._scheduler_dic = [{'name': 'flux', 'cmd': 'flux'}, + {'name': 'lsf', 'cmd': 'bsub'}, + {'name': 'oar', 'cmd': 'oarsub'}, + {'name': 'pbs', 'cmd': 'pbsnodes'}, + {'name': 'sge', 'cmd': 'qconf'}, + {'name': 'squeue', 'cmd': 'squeue'}, + {'name': 'slurm', 'cmd': 'sacct'}] + self._name = None + + def detect_scheduler(self, user_input: bool = True): + + schedulers_found = [] + for schd in self._scheduler_dic: + try: + subprocess.run( + ['which', f'{schd["cmd"]}'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True + ) + except subprocess.CalledProcessError: + pass + else: + schedulers_found.append(schd['name']) + + if not schedulers_found: + self._scheduler = 'local' + getlogger().warning( + 'No remote scheduler was detected in the system' + ) + elif len(schedulers_found) > 1: + getlogger().debug( + 'The following schedulers were found: ' + f'{", ".join(schedulers_found)}' + ) + if user_input: + self._name = user_selection(schedulers_found) + else: + self._name = schedulers_found[-1] + else: + self._name = schedulers_found[0] + + getlogger().info(f'The scheduler is set to {self._name}\n') + + @property + def name(self): + return self._name + + +class Launcher: + '''Launcher detector''' + + def __init__(self): + self._launcher_dic = [{'name': 'alps', 'cmd': 'aprun'}, + {'name': 'clush', 'cmd': 'clush'}, + {'name': 'ibrun', 'cmd': 'ibrun'}, + {'name': 'lrun', 'cmd': 'lrun'}, + {'name': 'mpirun', 'cmd': 'mpirun'}, + {'name': 'mpiexec', 'cmd': 'mpiexec'}, + {'name': 'pdsh', 'cmd': 'pdsh'}, + {'name': 'srun', 'cmd': 'srun'}] + self._name = None + + def detect_launcher(self, user_input: bool = True): + + launchers_found = [] + for lnchr in self._launcher_dic: + try: + subprocess.run( + ['which', f'{lnchr["cmd"]}'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True + ) + except subprocess.CalledProcessError: + pass + else: + launchers_found.append(lnchr['name']) + + if not launchers_found: + self._name = 'local' + getlogger().warning( + 'No parallel launcher was detected in the system' + ) + elif len(launchers_found) > 1: + getlogger().debug('The following launchers were found: ' + f'{", ".join(launchers_found)}') + if user_input: + self._name = user_selection(launchers_found) + else: + self._name = launchers_found[-1] + else: + self._name = launchers_found[0] + + getlogger().info(f'The launcher is set to {self._name}\n') + + @property + def name(self): + return self._name + + +class SlurmContext: + + def __init__(self, modules_system: str, detect_containers: bool = True, + detect_devices: bool = True, wait: bool = True, + access_opt: list = [], tmp_dir: str = None): + 
self.node_types = [] + self.default_nodes = [] + self.reservations = [] + self.partitions = [] + self._account = grp.getgrgid(os.getgid()).gr_name + self._modules_system = modules_system + self._detect_containers = detect_containers + self._detect_devices = detect_devices + self._wait = wait + self._access = access_opt + self._job_poll = [] # Job id's to poll + self._p_n = 0 # Number of partitions created + self._keep_tmp_dir = False + if not tmp_dir: + self.TMP_DIR = tempfile.mkdtemp( + prefix='reframe_config_detection_', dir=os.getenv('SCRATCH')) + else: + self.TMP_DIR = tempfile.mkdtemp( + prefix='reframe_config_detection_', dir=tmp_dir) + + def search_node_types(self, exclude_feats: list = []): + + getlogger().debug('Filtering nodes based on ActiveFeatures...') + try: + nodes_info = subprocess.run( + 'scontrol show nodes -o | grep "ActiveFeatures"', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True, shell=True + ) + nodes_info = nodes_info.stdout + raw_node_types = re.findall( + r'ActiveFeatures=([^ ]+) .*? Partitions=([^ ]+)', nodes_info) + # Unique combinations of features and partitions + raw_node_types = set(raw_node_types) + # List of [[features, partition]...] + raw_node_types = [[tuple(n[0].split(',')), tuple( + n[1].split(','))] for n in raw_node_types] + except Exception: + getlogger().error( + 'Node types could not be retrieved from scontrol' + ) + return + + default_partition = subprocess.run( + 'scontrol show partitions -o | grep "Default=YES"', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True, shell=True + ) + default_partition = re.findall( + r'PartitionName=([\w]+)', default_partition.stdout)[0] + getlogger().debug(f'Detected default partition: {default_partition}') + if not default_partition: + default_partition = None + getlogger().warning('Default partition could not be detected') + + self._set_nodes_types(exclude_feats, raw_node_types, default_partition) + + def _set_nodes_types(self, exclude_feats: list, raw_node_types: list, + default_partition: Union[str, None]): + + default_nodes = [] # Initialize the list of node types in the default + # Initialize the list of node types (with filtered features) + node_types = [] + + for node in raw_node_types: + node_feats_raw = list(node[0]) # Before filtering features + node_feats = node_feats_raw + node_partition = node[1] + if exclude_feats: # Filter features + node_feats = self._filter_node_feats( + exclude_feats, node_feats_raw) + if node_feats: # If all features were removed, empty list + node_types.append(tuple(node_feats)) + # The nodes in the default partition based on their raw feats + if default_partition in node_partition: + # default_nodes.append([tuple(node_feats_raw),tuple(node_feats)]) + default_nodes.append(tuple(node_feats)) + + default_nodes = set(default_nodes) + if len(default_nodes) > 1: + # Then all node types require the features in the access options + self.default_nodes = set() + else: + # self.default_nodes = default_nodes[0][1] # Get the filtered feats + self.default_nodes = default_nodes # Get the filtered features + + getlogger().debug( + f'\nThe following {len(set(node_types))} ' + 'node types were detected:') + for node_t in set(node_types): + getlogger().debug(node_t) + getlogger().info('') + + self.node_types = set(node_types) # Get the unique combinations + + @staticmethod + def _filter_node_feats(exclude_feats: list, node_feats: list) -> list: + '''Filter the node types excluding the specified fixtures''' + 
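+        # Illustrative example (values hypothetical): with
+        # exclude_feats=['group*', 'c*', 'r*'], a node with features
+        # ('gpu', 'group2', 'column43', 'row9') keeps only ('gpu',), so it
+        # collapses into the same node type as other gpu nodes.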
node_valid_feats = [] + for feat in node_feats: # loop around the features + feat_valid = not any([fnmatch.fnmatch(feat, pattern) + for pattern in exclude_feats]) + if feat_valid: + node_valid_feats.append(feat) + return node_valid_feats + + def _find_devices(self, node_feats: list) -> Union[dict, None]: + + getlogger().debug( + f'Detecting devices for node with features {node_feats}...') + devices = subprocess.run( + 'scontrol show nodes -o | grep ' + f'"ActiveFeatures=.*{".*,.*".join(node_feats)}.*"', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True, shell=True) + try: + devices_raw = re.findall(r'Gres=([\w,:()]+)', devices.stdout)[0] + except Exception: + getlogger().warning('Unable to detect the devices in the node') + return None + + devices = [item.rsplit(':', 1)[0] + for item in devices_raw.split(',')] # Remove the number + devices = [','.join(devices)] + if len(devices) > 1: + # This means that the nodes with this set of features + # do not all have the same devices installed. If the + # nodes have all the same model of GPUs but different + # number, it is considered as the same devices type + # so we don't raise this msg + getlogger().warning('Detected different devices in nodes ' + 'with the same set of features.\n' + 'Please check the devices option in ' + 'the configuration file.') + return None + elif '(null)' in list(devices) or 'gpu' not in next(iter(devices)): + # Detects if the nodes have no devices installed at + # all or if not GPUs are installed + getlogger().debug('No devices were found for this node type.') + return None + else: + getlogger().debug('Detected GPUs.') + # We only reach here if the devices installation + # is homogeneous accross the nodes + return self._count_gpus(devices_raw) + + @staticmethod + def _get_access_partition(node_feats: list) -> Union[str, None]: + + nd_partitions = subprocess.run( + 'scontrol show nodes -o | grep ' + f'"ActiveFeatures=.*{".*,.*".join(node_feats)}.*"', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True, shell=True) + nd_partitions = re.findall( + r'Partitions=([\w,:()]+)', nd_partitions.stdout) + nd_partitions = set(nd_partitions) + if len(nd_partitions) > 1: + return None + else: + nd_partitions = nd_partitions.pop().split(",") + for n_f in node_feats: + if n_f in nd_partitions: + return f'-p{n_f}' + if len(nd_partitions) == 1: + return f'-p{nd_partitions[0]}' + + @staticmethod + def _count_gpus(node_devices_raw: str) -> dict: + + # This method receives as input a string with the + # devices in the nodes + + # If more than one device is installed, we get the list + # Example: node_devices = 'gpu:2,craynetwork:6' + node_devices = node_devices_raw.split(",") + devices_dic = {} + for dvc in node_devices: + # Check if the device is a GPU + # There will be at least 1 GPU + if 'gpu' in dvc: + # Get the device model gpu or gpu:a100 + device_type = dvc.rsplit(":", 1)[0] + # Get the number of devices + devices_n = int(dvc.rsplit(":", 1)[1]) + # Save the minimum number found in all nodes + if device_type in devices_dic: + dvc_n = devices_dic[device_type] + if devices_n < dvc_n: + devices_dic[device_type] = devices_n + else: + devices_dic.update({device_type: devices_n}) + + return devices_dic + + def search_reservations(self): + + getlogger().debug('Searching for reservations...') + reservations_info = subprocess.run( + 'scontrol show res | grep "ReservationName"', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=True, check=True, 
shell=True + ) + reservations_info = reservations_info.stdout + # Detecting the different types of nodes in the system + reservations = None + reservations = re.findall( + r'ReservationName=([\w-]+)', reservations_info) + self.reservations = reservations + if not reservations: + getlogger().warning('Unable to retrieve reservations') + getlogger().info('') + + @staticmethod + def _check_gpus_count(node_devices_slurm: dict, + node_devices_job: dict) -> list: + + gpus_slurm_count = 0 # Number of GPUs from Slurm Gres + gpus_job_count = 0 # Number of GPUs from remote job detection + devices = [] + + # Check that the same number of GPU models are the same + if len(node_devices_job) != len(node_devices_slurm): + getlogger().warning( + 'WARNING: discrepancy between the ' + 'number of GPU models\n' + f'GPU models from Gres ({len(node_devices_slurm)}) ' + f'GPU models from job ({len(node_devices_job)}) ' + ) + + # Get the total number of GPUs (independently of the model) + for gpu_slurm in node_devices_slurm: + gpus_slurm_count += node_devices_slurm[gpu_slurm] + + # Format the dictionary of the devices for the configuration file + # and get the total number of GPUs found + for gpu_job in node_devices_job: + devices.append({'type': 'gpu', + 'model': gpu_job, + 'arch': nvidia_gpu_architecture.get(gpu_job) or + amd_gpu_architecture.get(gpu_job), + 'num_devices': node_devices_job[gpu_job]}) + gpus_job_count += node_devices_job[gpu_job] + + if gpus_job_count != gpus_slurm_count: + getlogger().warning('The total number of detected GPUs ' + f'({gpus_job_count}) ' + 'differs from the (minimum) in GRes ' + f'from slurm({gpus_slurm_count}).') + if gpus_job_count > gpus_slurm_count: + getlogger().debug('It might be that nodes in this partition ' + 'have different number of GPUs ' + 'of the same model.\nIn the config, the ' + 'minimum number of GPUs that will ' + 'be found in the nodes of this partition ' + 'is set.\n') + elif gpus_job_count < gpus_slurm_count: + getlogger().error( + 'Lower number of GPUs were detected in the node.\n') + + return devices + + def create_login_partition(self, user_input: bool = True): + create_p = True + if user_input: + create_p = user_yn('Do you want to create a partition ' + 'for the login nodes?') + if create_p: + max_jobs = 4 + time_limit = '2m' + if user_input: + max_jobs = user_integer('Maximum number of forced local build ' + 'or run jobs allowed?', + default_value=max_jobs) + time_limit = user_descr('Time limit for the jobs submitted ' + 'on this partition?\nEnter "null" for' + ' no time limit', + default_value=time_limit) + self.partitions.append( + {'name': 'login', + 'scheduler': 'local', + 'time_limit': time_limit, + 'environs': ['builtin'], + 'max_jobs': max_jobs, + 'launcher': 'local'}) + + async def create_remote_partition(self, node_feats: tuple, launcher: str, + scheduler: str, user_input: bool = True): + + node_features = list(node_feats) + _detect_devices = self._detect_devices + _detect_containers = self._detect_containers + create_p = True + if user_input: + create_p = user_yn('Do you want to create a partition ' + f'for the node with features: {node_feats}?') + if create_p: + self._p_n += 1 + access_options = [f'--account={self._account}'] + if self._access: + access_options = access_options + self._access + access_node = [] + if node_feats not in self.default_nodes: + access_node = '&'.join(node_features) + name = f'partition_{self._p_n}' + if not user_input: + getlogger().info(f'{name} : {node_feats}', color=False) + max_jobs = 100 + time_limit = '10m' + 
container_platforms = [] + devices = [] + + # If user_input requested, these values will be changed according + if user_input: + name = user_descr('How do you want to name the partition?', + default_value=name) + + max_jobs = user_integer('Maximum number of forced local build ' + 'or run jobs allowed?', + default_value=max_jobs) + + time_limit = user_descr('Time limit for the jobs submitted ' + 'on this partition?\nEnter "null" for ' + 'no time limit', + default_value=time_limit) + + getlogger().debug( + f'The associated group "{self._account}" was added to the ' + 'slurm access options -A' + ) + if access_node: + getlogger().debug( + 'This node type is not the node type by ' + 'default, I added the required constraints:' + f' --constraint="{access_node}".' + ) + access_user = user_descr( + 'Do you need any additional access options?', + cancel_n=True + ) + if access_user: + access_options.append(access_user) + + _detect_containers = user_yn( + 'Do you require remote containers detection?') + + if _detect_devices: + # Retrieve a dictionary with the devices info + # If GRes for these nodes is 'gpu:a100:*' + # The returned dict will be: + # {'gpu:a100' : min(*)} + _detect_devices = self._find_devices(node_features) + + getlogger().info('') + + # Handle the job submission only if required + if _detect_devices or _detect_containers: + self._keep_tmp_dir = True + # All this must be inside a function + remote_job = JobRemoteDetect( + self.TMP_DIR, _detect_containers, _detect_devices) + access_partition = self._get_access_partition(node_features) + access_options = await remote_job.job_submission( + name, access_options, access_node, + access_partition, wait=self._wait + ) + if not self._wait and remote_job.job_id: + self._job_poll.append(remote_job.job_id) + # Here, the job failed or the output was already read + else: + if remote_job.container_platforms: + container_platforms = remote_job.container_platforms + if 'tmod' not in self._modules_system and \ + 'lmod' not in self._modules_system: + getlogger().warning( + '\nContainer platforms were ' + 'detected but the automatic detection ' + 'of required modules is not possible ' + f'with {self._modules_system}.' 
+ ) + else: + getlogger().info('') + # Add the container platforms in the features + for cp_i, cp in enumerate(container_platforms): + getlogger().debug( + f'Detected container platform {cp["type"]} ' + 'in partition "{name}"' + ) + node_features.append(cp['type'].lower()) + else: + getlogger().debug( + '\n\nNo container platforms were detected in ' + f'partition "{name}"' + ) + + if remote_job.devices: + # Issue any warning regarding missconfigurations + # between Gres and the detected devices + getlogger().info(f"\nGPUs found in partition {name}") + devices = self._check_gpus_count( + _detect_devices, remote_job.devices) + + elif access_node: + # No jobs were launched so we cannot check the access options + access_options.append(access_node) + + # Create the partition + self.partitions.append( + {'name': name, + 'scheduler': scheduler, + 'time_limit': time_limit, + 'environs': ['builtin'], + 'max_jobs': max_jobs, + 'resources': resources, + 'extras': {}, + 'env_vars': [], + 'launcher': launcher, + 'access': access_options, + 'features': node_features+['remote'], + 'devices': devices, + 'container_platforms': container_platforms} + ) + else: + getlogger().info('') + + def create_reserv_partition(self, reserv: str, launcher: str, + scheduler: str, user_input: bool = True): + + self._p_n += 1 + access_options = [f'--account={self._account}'] + access_options.append(f'--reservation={reserv}') + name = f'{reserv}' + if not user_input: + getlogger().info( + f'Creating partition "{name}" for reservation: {reserv}', + color=False + ) + max_jobs = 100 + time_limit = '10m' + max_jobs = 100 + time_limit = '10m' + container_platforms = [] + devices = [] + + # If user_input requested, these values will be changed according + if user_input: + name = user_descr('How do you want to name the partition?', + default_value=name) + + max_jobs = user_integer('Maximum number of forced local build ' + 'or run jobs allowed?', + default_value=max_jobs) + + time_limit = user_descr('Time limit for the jobs submitted ' + 'on this partition?\nEnter "null" for no ' + 'time limit', default_value=time_limit) + + getlogger().debug( + f'The associated group "{self._account}" was added to the ' + 'slurm access options - A' + ) + getlogger().debug( + 'The reservation was added to the slurm access options ' + f'--reservation{reserv}.') + access_user = user_descr( + 'Do you need any additional access options?', cancel_n=True) + if access_user: + access_options.append(access_user) + + # Create the partition + self.partitions.append( + {'name': name, + 'scheduler': scheduler, + 'time_limit': time_limit, + 'environs': ['builtin'], + 'max_jobs': max_jobs, + 'resources': resources, + 'extras': {}, + 'env_vars': [], + 'launcher': launcher, + 'access': access_options, + 'features': [reserv, 'remote'], + 'devices': devices, + 'container_platforms': container_platforms} + ) + getlogger().info('') + + async def create_partitions(self, launcher: str, scheduler: str, + user_input: bool = True): + + # With no status bar + # await asyncio.gather(*(self.create_remote_partition(node,launcher, + # scheduler, user_input) for node in self.node_types)) + + all_partitions = asyncio.ensure_future(asyncio.gather( + *(self.create_remote_partition( + node, launcher, scheduler, user_input + ) for node in self.node_types))) + + status_task = None + try: + # 5 seconds delay until the bar appears + done, pending = await asyncio.wait( + [all_partitions, asyncio.ensure_future( + asyncio.sleep(10))], + return_when=asyncio.FIRST_COMPLETED + ) + # If the 
tasks are still running after 5 seconds, + # start the status bar + if not all_partitions.done(): + status_task = asyncio.ensure_future(status_bar()) + else: + # If no jobs were submitted it is possible that + # the sleep 5 secs is still pending + for task in pending: + task.cancel() + # Wait for all tasks to complete + await all_partitions + finally: + # Print warning if no partitions were created + if not self.partitions: + getlogger().error( + '\nNo partitions were created, ReFrame ' + 'requires at least one.\n' + ) + # Remove unused temp dir or print it + if self._keep_tmp_dir: + getlogger().info( + '\nYou can check the job submissions ' + f'in {self.TMP_DIR}.\n', color=False + ) + else: + shutil.rmtree(self.TMP_DIR) + # Cancel the status bar if it was started + if status_task: + status_task.cancel() + try: + await status_task # Ensure the status bar is canceled + except asyncio.CancelledError: + # Handle the cancellation gracefully + pass + sys.stdout.flush() + + +class JobRemoteDetect: + '''Job to detect information about the remote nodes''' + + _SBATCH_HEADER = ( + '#!/bin/bash\n' + '#SBATCH --job-name="Config_autodetection"\n' + '#SBATCH --ntasks=1\n' + '#SBATCH --output=config_autodetection_{partition_name}.out\n' + '#SBATCH --error=config_autodetection_{partition_name}.out\n' + '#SBATCH --time=0:2:0\n' + ) + _SBATCH_FILE = 'autodetection_{partition_name}.sh' + _OUTPUT_FILE = 'config_autodetection_{partition_name}.out' + + def __init__(self, tmp_dir: str, detect_containers: bool = True, + detect_devices: bool = True): + self._detect_containers = detect_containers + self._detect_devices = detect_devices + self.container_platforms = [] + self.devices = {} + self.job_id = None + self.TMP_DIR = tmp_dir + + def _prepare_job(self, partition_name: str, access_options: list): + with change_dir(self.TMP_DIR): + with open(self._SBATCH_FILE.format(partition_name=partition_name), + "w") as file: + file.write(self._SBATCH_HEADER.format( + partition_name=partition_name)) + for access in access_options: + file.write(f"#SBATCH {access}\n") + if self._detect_containers: + file.write(containers_detect_bash) + file.write("\n") + file.write("\n") + if self._detect_devices: + file.write(devices_detect_bash) + + async def _submit_job(self, partition_name: str, + wait: bool) -> Union[bool, None, str]: + + with change_dir(self.TMP_DIR): + cmd_parts = ['sbatch'] + if wait: + cmd_parts.append('-W') + cmd_parts += [f'autodetection_{partition_name}.sh'] + cmd = ' '.join(cmd_parts) + completed = await asyncio.create_subprocess_shell( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + cmd_out = await completed.stdout.readline() + cmd_out = cmd_out.decode().strip() + job_id = re.search(r'Submitted batch job (?P\d+)', cmd_out) + if job_id: + job_id = job_id.group('jobid') + getlogger().info( + f'\nJob submitted to partition {partition_name}: {job_id}' + ) + elif 'error' in cmd_out or not cmd_out: + return False + + try: + stdout, stderr = await asyncio.wait_for( + completed.communicate(), timeout=TIME_OUT_POLICY + ) + except asyncio.TimeoutError: + getlogger().warning( + f'\nJob submitted to {partition_name} took too long...' 
+ async def _submit_job(self, partition_name: str,
+ wait: bool) -> Union[bool, None, str]:
+
+ with change_dir(self.TMP_DIR):
+ cmd_parts = ['sbatch']
+ if wait:
+ # Block until the submitted job terminates
+ cmd_parts.append('-W')
+ cmd_parts += [f'autodetection_{partition_name}.sh']
+ cmd = ' '.join(cmd_parts)
+ completed = await asyncio.create_subprocess_shell(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+ cmd_out = await completed.stdout.readline()
+ cmd_out = cmd_out.decode().strip()
+ job_id = re.search(r'Submitted batch job (?P<jobid>\d+)', cmd_out)
+ if job_id:
+ job_id = job_id.group('jobid')
+ getlogger().info(
+ f'\nJob submitted to partition {partition_name}: {job_id}'
+ )
+ elif 'error' in cmd_out or not cmd_out:
+ return False
+
+ try:
+ stdout, stderr = await asyncio.wait_for(
+ completed.communicate(), timeout=TIME_OUT_POLICY
+ )
+ except asyncio.TimeoutError:
+ getlogger().warning(
+ f'\nJob submitted to {partition_name} took too long...'
+ )
+ getlogger().info('Cancelling...', color=False)
+ if job_id:
+ subprocess.run(
+ f'scancel {job_id}', universal_newlines=True,
+ check=True, shell=True
+ )
+ return 'cancelled'
+
+ if completed.returncode != 0:
+ # Do not print an error, a second attempt with -p follows
+ return False
+
+ stdout = stdout.decode('utf-8')
+ stdout = stdout.strip()
+ if not wait:
+ jobid = re.search(r'Submitted batch job (?P<jobid>\d+)',
+ stdout)
+ if not jobid:
+ return None
+ else:
+ return jobid.group('jobid')
+ else:
+ return True
+
+ async def job_submission(self, partition_name: str,
+ access_options: list,
+ access_node: str,
+ access_partition: Union[str, None],
+ wait: bool = False):
+
+ if access_node:
+ self._prepare_job(partition_name, access_options +
+ [f'--constraint="{access_node}"'])
+ else:
+ self._prepare_job(partition_name, access_options)
+ job_exec = await self._submit_job(partition_name, wait)
+ cancelled = False
+ if job_exec == 'cancelled':
+ access_partition = ''  # Avoid a resubmission
+ job_exec = False
+ cancelled = True
+ access_options.append(access_partition)
+ if job_exec and access_node:
+ access_options.append(f'--constraint="{access_node}"')
+ elif access_partition:
+ self._prepare_job(partition_name, [access_partition])
+ job_exec = await self._submit_job(partition_name, wait)
+ if job_exec == 'cancelled':
+ job_exec = False
+ cancelled = True
+ access_options.append(access_partition)
+ if job_exec:
+ access_options.append(access_partition)
+
+ if job_exec and wait:
+ self._extract_info(partition_name)
+ elif not job_exec and not cancelled:
+ if access_node:
+ access_options.append(f'--constraint="{access_node}"')
+ getlogger().error(
+ f'The autodetection script for "{partition_name}" '
+ 'could not be submitted\n'
+ 'Please check the sbatch options ("access" field '
+ 'in the partition description).'
+ )
+ else:
+ # TODO: check here that job_exec is a valid job id
+ self.job_id = job_exec
+
+ return access_options  # Return the access options that worked
+
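+ # Submission strategy, for illustration: the first attempt uses the
+ # partition's access options (plus '--constraint' when a node feature
+ # is known); if sbatch rejects it, a second attempt is made with only
+ # the '-p/--partition' option, and the options that worked are returned
+ # so they can be stored in the partition's 'access' field. A minimal
+ # driving sketch (names and options are hypothetical):
+ #
+ #   remote_job = JobRemoteDetect('/tmp/rfm_autodetect')
+ #   access = asyncio.run(remote_job.job_submission(
+ #       'gpu', ['--account=csstaff'], 'gpu', '-pgpu', wait=True))
+ #   print(remote_job.container_platforms, remote_job.devices)
+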
+ @staticmethod
+ def _parse_devices(file_path: str) -> dict:
+ '''Extract the information about the GPUs from the job output'''
+ gpu_info = {}  # Initialize the dict for GPU info
+ nvidia_gpus_found = False
+ amd_gpus_found = False
+
+ with open(file_path, 'r') as file:
+ lines = file.readlines()
+
+ for line in lines:
+ # Check for NVIDIA GPUs
+ if "NVIDIA GPUs installed" in line:
+ nvidia_gpus_found = True
+ elif line == '\n':
+ nvidia_gpus_found = False
+ elif not line or "Batch Job Summary" in line:
+ break
+ elif nvidia_gpus_found:
+ model = [
+ gpu_m for gpu_m in nvidia_gpu_architecture
+ if gpu_m in line
+ ]
+ if len(model) > 1:
+ # Ambiguous match, ignore the line
+ model = []
+ if model:
+ if model[0] not in gpu_info:
+ gpu_info[model[0]] = 1
+ else:
+ gpu_info[model[0]] += 1
+
+ # Check for AMD GPUs
+ if "AMD GPUs" in line:
+ amd_gpus_found = True
+ amd_lines = []
+ elif line == '\n' or "lspci" in line:
+ amd_gpus_found = False
+ elif not line or "Batch Job Summary" in line:
+ break
+ elif amd_gpus_found:
+ # Count each distinct lspci line only once
+ if line not in amd_lines:
+ amd_lines.append(line)
+ model = [
+ gpu_m for gpu_m in amd_gpu_architecture
+ if gpu_m in line
+ ]
+ if len(model) > 1:
+ # Ambiguous match, ignore the line
+ model = []
+ if model:
+ if model[0] not in gpu_info:
+ gpu_info[model[0]] = 1
+ else:
+ gpu_info[model[0]] += 1
+
+ return gpu_info
+
+ @staticmethod
+ def _parse_containers(file_path: str) -> list:
+ '''Extract the information about the containers from the job output'''
+ containers_info = []
+ containers_found = False
+
+ with open(file_path, 'r') as file:
+ lines = file.readlines()
+
+ for line in lines:
+ if "Installed containers" in line:
+ containers_found = True
+ elif "GPU" in line or line == "\n" or "Batch Job Summary" in line:
+ containers_found = False
+ break
+ elif containers_found:
+ cp_type = line.split(' modules: ')[0].strip()
+ try:
+ modules = line.split(' modules: ')[1].split(', ')
+ modules = [m.strip() for m in modules]
+ if modules[0] != '':
+ modules.append(cp_type.lower())
+ else:
+ modules = [cp_type.lower()]
+ except IndexError:
+ modules = []
+ containers_info.append({'type': cp_type, 'modules': modules})
+
+ return containers_info
+
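+ # The two parsers above assume the detection snippets print sections
+ # shaped roughly as follows (reconstructed from the parsing logic, not
+ # verbatim output):
+ #
+ #   Installed containers
+ #   Sarus modules: sarus
+ #   Singularity modules:
+ #
+ #   NVIDIA GPUs installed
+ #   <one line per GPU naming its model, e.g. matching 'A100'>
+ #
+ #   AMD GPUs
+ #   <lspci lines naming the GPU model, deduplicated before counting>
+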
+ def _extract_info(self, partition_name: str):
+ '''Parse the job output and store the detected information.'''
+
+ file_path = os.path.join(
+ self.TMP_DIR, self._OUTPUT_FILE.format(
+ partition_name=partition_name
+ )
+ )
+ if self._detect_containers:
+ self.container_platforms = self._parse_containers(file_path)
+
+ if self._detect_devices:
+ self.devices = self._parse_devices(file_path)
diff --git a/config/utilities/modules.py b/config/utilities/modules.py
new file mode 100644
index 00000000..c7a69334
--- /dev/null
+++ b/config/utilities/modules.py
@@ -0,0 +1,323 @@
+# Copyright 2024 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import abc
+import os
+import re
+import subprocess
+
+
+class ModulesSystem(abc.ABC):
+ '''Abstract base class for module systems.
+
+ :meta private:
+ '''
+
+ @abc.abstractmethod
+ def _execute(self, cmd: str, *args):
+ '''Execute an arbitrary command of the module system.'''
+
+ @abc.abstractmethod
+ def available_modules(self, substr: str):
+ '''Return a list of available modules, whose name contains ``substr``.
+
+ This method returns a list of module names.
+ '''
+
+ @property
+ @abc.abstractmethod
+ def name(self):
+ '''Return the name of this module system.'''
+
+ @property
+ @abc.abstractmethod
+ def version(self):
+ '''Return the version of this module system.'''
+
+ @abc.abstractmethod
+ def modulecmd(self, *args):
+ '''The low-level command to use for issuing module commands'''
+
+
+class TMod32Impl(ModulesSystem):
+ '''Base class for the TMod module system (Tcl).'''
+
+ MIN_VERSION = (3, 2)
+
+ def __init__(self):
+ self._version = None
+ self.found = self._do_validate()
+
+ def _do_validate(self) -> bool:
+ # Try to figure out if we are indeed using the TCL version
+ try:
+ completed = subprocess.run(
+ ['modulecmd', '-V'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True, check=True
+ )
+ except Exception:
+ return False
+
+ version_match = re.search(r'^VERSION=(\S+)', completed.stdout,
+ re.MULTILINE)
+ tcl_version_match = re.search(r'^TCL_VERSION=(\S+)', completed.stdout,
+ re.MULTILINE)
+
+ if version_match is None or tcl_version_match is None:
+ return False
+
+ version = version_match.group(1)
+ try:
+ ver_major, ver_minor = [int(v) for v in version.split('.')[:2]]
+ except ValueError:
+ return False
+
+ if (ver_major, ver_minor) < self.MIN_VERSION:
+ return False
+
+ self._version = version
+
+ return True
+
+ @property
+ def name(self) -> str:
+ return 'tmod32'
+
+ @property
+ def version(self) -> str:
+ return self._version
+
+ def modulecmd(self, *args) -> list:
+ return ['modulecmd', 'python', *args]
+
+ def _execute(self, cmd: str, *args) -> str:
+
+ modulecmd = self.modulecmd(cmd, *args)
+ completed = subprocess.run(
+ modulecmd,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True, check=True
+ )
+ if re.search(r'\bERROR\b', completed.stderr) is not None:
+ return ''
+
+ return completed.stderr
+
+ def available_modules(self, substr: str) -> list:
+ output = self._execute('avail', '-t', substr)
+ ret = []
+ for line in output.split('\n'):
+ if not line or line[-1] == ':':
+ # Ignore empty lines and path entries
+ continue
+
+ module = re.sub(r'\(default\)', '', line)
+ ret.append(module)
+
+ return ret
+
+
+class TMod31Impl(TMod32Impl):
+ '''Module system for TMod 3.1 (Tcl).'''
+
+ MIN_VERSION = (3, 1)
+
+ def __init__(self):
+ self._version = None
+ self._command = None
+ self.found = self._do_validate()
+
+ def _do_validate(self) -> bool:
+ # Try to figure out if we are indeed using the TCL version
+ try:
+ modulecmd = os.getenv('MODULESHOME')
+ modulecmd = os.path.join(modulecmd, 'modulecmd.tcl')
+ completed = subprocess.run(
+ [modulecmd],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True, check=True)
+ except (OSError, TypeError):
+ # TypeError is raised when MODULESHOME is not set
+ return False
+
+ version_match = re.search(r'Release Tcl (\S+)', completed.stderr,
+ re.MULTILINE)
+ if version_match is None:
+ return False
+
+ version = version_match.group(1)
+ try:
+ ver_major, ver_minor = [int(v) for v in version.split('.')[:2]]
+ except ValueError:
+ return False
+
+ if (ver_major, ver_minor) < self.MIN_VERSION:
+ return False
+
+ self._version = version
+ # Store the command as a list of arguments for subprocess.run()
+ self._command = [modulecmd, 'python']
+
+ return True
+
+ @property
+ def name(self) -> str:
+ return 'tmod31'
+
+ def modulecmd(self, *args) -> list:
+ return [*self._command, *args]
+
+ def _execute(self, cmd: str, *args) -> str:
+
+ modulecmd = self.modulecmd(cmd, *args)
+ completed = subprocess.run(
+ modulecmd,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True, check=True
+ )
+ if re.search(r'\bERROR\b', completed.stderr) is not None:
+ return ''
+
+ exec_match = re.search(r"^exec\s'(\S+)'", completed.stdout,
+ re.MULTILINE)
+ if exec_match is None:
+ # Could not locate the file with the module commands
+ return ''
+
+ with open(exec_match.group(1), 'r') as content_file:
+ cmd = content_file.read()
+
+ # Run the dumped module commands, as upstream ReFrame does
+ exec(cmd)
+
+ return completed.stderr
+
) + if re.search(r'\bERROR\b', completed.stderr) is not None: + return '' + + exec_match = re.search(r"^exec\s'(\S+)'", completed.stdout, + re.MULTILINE) + + with open(exec_match.group(1), 'r') as content_file: + cmd = content_file.read() + + return completed.stderr + + +class TMod4Impl(TMod32Impl): + '''Module system for TMod 4.''' + + MIN_VERSION = (4, 1) + + def __init__(self): + self._version = None + self._extra_module_paths = [] + self.found = self._do_validate() + + def _do_validate(self): + try: + completed = subprocess.run( + self.modulecmd('-V'), + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, check=True + ) + except Exception: + return False + + version_match = re.match(r'^Modules Release (\S+)\s+', + completed.stderr) + if not version_match: + return False + + version = version_match.group(1) + try: + ver_major, ver_minor = [int(v) for v in version.split('.')[:2]] + except ValueError: + return False + + if (ver_major, ver_minor) < self.MIN_VERSION: + return False + + self._version = version + return True + + @property + def name(self) -> str: + return 'tmod4' + + def modulecmd(self, *args) -> list: + return ['modulecmd', 'python', *args] + + def _execute(self, cmd: str, *args) -> str: + + modulecmd = self.modulecmd(cmd, *args) + completed = subprocess.run( + modulecmd, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, check=True + ) + namespace = {} + + # _mlstatus is set by the TMod4 only if the command was unsuccessful, + # but Lmod sets it always + if not namespace.get('_mlstatus', True): + return '' + + return completed.stderr + + +class LModImpl(TMod4Impl): + '''Module system for Lmod (Tcl/Lua).''' + + def __init__(self): + self._extra_module_paths = [] + self._version = None + self.found = self._do_validate() + + def _do_validate(self) -> bool: + # Try to figure out if we are indeed using LMOD + self._lmod_cmd = os.getenv('LMOD_CMD') + if self._lmod_cmd is None: + return False + + try: + completed = subprocess.run( + [f'{self._lmod_cmd}', '--version'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, check=True + ) + except Exception: + return False + + version_match = re.search(r'.*Version\s*(\S+)', completed.stderr, + re.MULTILINE) + if version_match is None: + return False + + self._version = version_match.group(1) + # Try the Python bindings now + completed = subprocess.run( + self.modulecmd(), + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, check=False + ) + if '_mlstatus' not in completed.stdout: + return False + + if re.search(r'Unknown shell type', completed.stderr): + return False + + return True + + @property + def name(self) -> str: + return 'lmod' + + def modulecmd(self, *args) -> list: + return [self._lmod_cmd, 'python', *args] + + def available_modules(self, substr: str) -> list: + output = self._execute('-t', 'avail', substr) + ret = [] + for line in output.split('\n'): + if not line or line[-1] == ':': + # Ignore empty lines and path entries + continue + + module = re.sub(r'\(\S+\)', '', line) + ret.append(module) + + return ret + + +modules_impl = { + 'tmod31': TMod31Impl, + 'tmod32': TMod32Impl, + 'tmod4': TMod4Impl, + 'lmod': LModImpl, +}