diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 186cbb57..6fb52d64 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -9,20 +9,22 @@ "service": "test", // "shutdownAction": "stopCompose", "workspaceFolder": "/workspace", - // Features to add to the dev container. More info: https://containers.dev/features. // "features": {}, - // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], - // Use 'postCreateCommand' to run commands after the container is created. // "postCreateCommand": "uname -a", "postCreateCommand": "pip install -e ." - // Configure tool-specific properties. - // "customizations": {}, - + , + "customizations": { + "vscode": { + "extensions": [ + "ms-python.python" + ] + } + } // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. // "remoteUser": "root" } diff --git a/docs/configuration.md b/docs/configuration.md index d5bd1732..c250693c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -381,6 +381,28 @@ The config.yaml in current working directory is used. +```yaml +destinations: + grid: + scheduler: + type: dirac + storage_element: StorageElementOne + proxy: + # Passphrase for ~/.globus/userkey.pem + password_file: /path/to/passwordfile + filesystem: + type: dirac + lfn_root: /tutoVO/user/c/ciuser/bartenderjobs + storage_element: StorageElementOne + proxy: + password_file: /path/to/passwordfile +``` + +### Example of running jobs on a DIRAC grid with myproxy + +Requires [diracos](https://github.com/DIRACGrid/DIRACOS2) and dirac.cfg +to be installed and configured. 
+ ```yaml destinations: grid: @@ -389,12 +411,22 @@ destinations: storage_element: StorageElementOne proxy: log_level: DEBUG + myproxy: + username: myusername + password_file: /path/to/passwordfile + proxy_rfc: /tmp/x509up_u1000-rfc + proxy: /tmp/x509up_u1000 filesystem: type: dirac lfn_root: /tutoVO/user/c/ciuser/bartenderjobs storage_element: StorageElementOne proxy: log_level: DEBUG + myproxy: + username: myusername + password_file: /path/to/passwordfile + proxy_rfc: /tmp/x509up_u1000-rfc + proxy: /tmp/x509up_u1000 ``` ### Example of running jobs direct on submission diff --git a/docs/develop.md b/docs/develop.md index 3c11f349..b8ae6b24 100644 --- a/docs/develop.md +++ b/docs/develop.md @@ -203,7 +203,7 @@ the `WorkloadManagement_SiteDirector` service starting a pilot. To look around inside the DIRAC server use ```shell -docker-compose -f tests_dirac/docker-compose.yml exec dirac-tuto bash +docker compose -f tests_dirac/docker-compose.yml exec dirac-tuto bash ``` Sometimes the DIRAC server needs clearing of its state, diff --git a/src/bartender/filesystems/dirac.py b/src/bartender/filesystems/dirac.py index 03f50e9f..888389e0 100644 --- a/src/bartender/filesystems/dirac.py +++ b/src/bartender/filesystems/dirac.py @@ -71,7 +71,7 @@ async def upload(self, src: JobDescription, target: JobDescription) -> None: input_tar_on_grid = target.job_dir / archive_fn.name logger.warning( f"Uploading {archive_fn} to {input_tar_on_grid}" - f"on {self.storage_element}", + f" on {self.storage_element}", ) result = await put( lfn=str(input_tar_on_grid), diff --git a/src/bartender/filesystems/dirac_config.py b/src/bartender/filesystems/dirac_config.py index 3dd6a278..c2c3089f 100644 --- a/src/bartender/filesystems/dirac_config.py +++ b/src/bartender/filesystems/dirac_config.py @@ -29,7 +29,7 @@ def _validate_lfn_root( cls, # noqa: N805 signature of validator v: str, # noqa: WPS111 signature of validator ) -> str: - pattern = r"^\/\w+\/user\/([a-zA-Z])\/\1\w+\/.*$" + pattern = 
def _lfn_user_home(description: JobDescription) -> Path:
    """Derive the user's home directory on grid storage from the job directory.

    The home directory is taken to be the first five components of
    ``description.job_dir``, where the root ``/`` counts as one component.
    For ``/tutoVO/user/c/ciuser/bartenderjobs/job1``
    this gives ``/tutoVO/user/c/ciuser``.

    Args:
        description: Description of job. Only its ``job_dir`` is read.

    Returns:
        Path built from the leading five components of the job directory.
        A shorter job directory is returned unchanged.
    """
    # Root plus four directory names; kept as a named value instead of
    # an inline magic number.
    home_depth = 5
    leading_parts = description.job_dir.parts[:home_depth]
    return Path(*leading_parts)
diff --git a/src/bartender/schedulers/dirac_config.py b/src/bartender/schedulers/dirac_config.py index 0774d1aa..f1cb00de 100644 --- a/src/bartender/schedulers/dirac_config.py +++ b/src/bartender/schedulers/dirac_config.py @@ -10,8 +10,10 @@ class DiracSchedulerConfig(BaseModel): """Configuration for DIRAC scheduler. Args: - apptainer_image: Path on cvmfs to apptainer image. - Will run application command inside apptainer image. + apptainer_image: Path on cvmfs or grid storage to apptainer image. + When set will run application command inside apptainer image. + Image can also be on grid storage, + it will then be downloaded to current directory first. storage_element: Storage element to upload output files to. proxy: Proxy configuration. """ diff --git a/src/bartender/shared/dirac.py b/src/bartender/shared/dirac.py index d485f318..0a8dcb84 100644 --- a/src/bartender/shared/dirac.py +++ b/src/bartender/shared/dirac.py @@ -1,5 +1,7 @@ import asyncio import logging +import shutil +from pathlib import Path from subprocess import ( # noqa: S404 security implications OK PIPE, CalledProcessError, @@ -11,7 +13,7 @@ from DIRAC.Core.Security.ProxyInfo import getProxyInfo from DIRAC.Core.Utilities.exceptions import DIRACInitError -from bartender.shared.dirac_config import ProxyConfig +from bartender.shared.dirac_config import MyProxyConfig, ProxyConfig logger = logging.getLogger(__file__) @@ -40,7 +42,12 @@ async def proxy_init(config: ProxyConfig) -> None: Raises: CalledProcessError: If failed to create proxy. 
async def myproxy_init(config: ProxyConfig) -> None:
    """Create or renew proxy using MyProxy server.

    Performs, in order:

    1. ``myproxy-logon`` writes a proxy to ``config.myproxy.proxy_rfc``.
    2. That file is copied to ``config.myproxy.proxy``.
    3. The copy is uploaded with ``dirac-admin-proxy-upload``.
    4. ``get_time_left_on_proxy()`` is called on the resulting proxy.

    Args:
        config: The proxy configuration. Its ``myproxy`` section must be set.

    Raises:
        ValueError: If no MyProxy configuration is provided.
        CalledProcessError: Propagated from ``myproxy_logon`` or
            ``proxy_upload`` when the underlying command exits non-zero.
    """
    myproxy = config.myproxy
    if myproxy is None:
        raise ValueError("No myproxy configuration")

    # myproxy-logon \
    #   --pshost config.pshost \
    #   --proxy_lifetime config.proxy_lifetime \
    #   --username config.username \
    #   --out proxy_rfc \
    #   --stdin_pass \
    #   < config.password_file
    await myproxy_logon(myproxy)
    # cp proxy_rfc proxy
    # shutil.copy is a synchronous function that returns the destination path,
    # so awaiting its result raised TypeError after the copy had been made.
    # Run it in a worker thread to keep the event loop free while copying.
    await asyncio.to_thread(shutil.copy, myproxy.proxy_rfc, myproxy.proxy)
    # dirac-admin-proxy-upload -d -P proxy
    await proxy_upload(myproxy.proxy)
    # Check the proxy after upload.
    # NOTE(review): the return value is ignored, so this only helps if
    # get_time_left_on_proxy raises on a missing/invalid proxy -- confirm.
    get_time_left_on_proxy()
class MyProxyConfig(BaseModel):
    """Configuration for obtaining a proxy from a MyProxy server.

    Args:
        pshost: The hostname of the MyProxy server.
        username: Username for the delegated proxy.
        proxy_lifetime: Lifetime of proxies delegated by the server.
        password_file: The path to the file containing the password for the proxy.
            The file must exist when the configuration is loaded.
        proxy_rfc: The path to the generated RFC proxy file.
        proxy: The path to the generated proxy file.
            This proxy file should be used to submit and manage jobs.
    """

    # Passed to `myproxy-logon --pshost`.
    pshost: str = "px.grid.sara.nl"
    # Passed to `myproxy-logon --username`.
    username: str
    # Content is read as bytes and written to stdin of `myproxy-logon --stdin_pass`.
    password_file: FilePath
    # Passed unchanged to `myproxy-logon --proxy_lifetime`.
    # NOTE(review): the myproxy-logon man page describes this option as a
    # number of hours; confirm that an "HH:MM" value is accepted as intended.
    proxy_lifetime: str = "167:59"  # 7 days
    # Output file of `myproxy-logon --out`.
    proxy_rfc: Path
    # Copy of proxy_rfc; this is the file given to `dirac-admin-proxy-upload -P`.
    proxy: Path
class TestDiracFileSystemConfig:
    """Checks which ``lfn_root`` values DiracFileSystemConfig accepts."""

    @staticmethod
    def _config_for(lfn_root: str) -> "DiracFileSystemConfig":
        """Build a config with the given lfn_root and a fixed storage element."""
        return DiracFileSystemConfig(
            lfn_root=lfn_root,
            storage_element="StorageElementOne",
        )

    def test_lfn_root_nodots(self) -> None:
        """Names without dots are accepted."""
        built = self._config_for("/tutoVO/user/c/ciuser/bartenderjobs")
        assert isinstance(built, DiracFileSystemConfig)

    def test_lfn_root_withdots(self) -> None:
        """Dots in the VO name and in the trailing directory are accepted."""
        built = self._config_for("/tuto.VO/user/c/ciuser/bartender.jobs")
        assert isinstance(built, DiracFileSystemConfig)

    def test_lfn_root_initial_not_first(self) -> None:
        """Initial directory may differ from first letter of the user name."""
        built = self._config_for("/tuto.VO/user/o/someone/bartenderjobs")
        assert isinstance(built, DiracFileSystemConfig)

    def test_lfn_root_bad(self) -> None:
        """A bare root path is rejected."""
        with pytest.raises(ValueError):
            self._config_for("/")
@pytest.mark.anyio
async def test_apptainer_image_on_lfn() -> None:
    """Image stored below the user's LFN home is downloaded before the command."""
    image_lfn = Path("/tutoVO/user/c/ciuser/alpine.sif")
    job_dir = Path("/tutoVO/user/c/ciuser/bartenderjobs/job1")
    scheduler = DiracScheduler(
        DiracSchedulerConfig(
            storage_element="StorageElementOne",
            apptainer_image=image_lfn,
        ),
    )
    description = JobDescription(command="echo hello", job_dir=job_dir)

    script = scheduler._command_script(description)  # noqa: WPS437

    expected_lines = [
        "# Run command",
        "dirac-dms-get-file /tutoVO/user/c/ciuser/alpine.sif && echo alpine.sif >> .input_files.txt",  # noqa: E501
        f"echo 'Running command for {description.job_dir}'",
        "(apptainer run alpine.sif echo hello) > stdout.txt 2> stderr.txt",
        "echo -n $? > returncode",
        "cat stdout.txt",
        "cat stderr.txt >&2",
    ]
    # Every line of the generated script, including the last, ends with a newline.
    expected = "".join(f"{line}\n" for line in expected_lines)
    assert script == expected