diff --git a/doc/conf.py b/doc/conf.py index 7e0b4b988d..3bf62cee4e 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # ESMValTool documentation build configuration file, created by -# sphinx-quickstart on Tue Jun 2 11:34:13 2015. +# sphinx-quickstart on Tue Jun 2 11:34:13 2015. # # This file is execfile()d with the current directory set to its # containing dir. diff --git a/doc/contributing.rst b/doc/contributing.rst index de7d319cc3..1381e45158 100644 --- a/doc/contributing.rst +++ b/doc/contributing.rst @@ -158,7 +158,7 @@ These include in particular: branch. If a strong objection is raised the backward-incompatible change should not be merged until the objection is resolved. - 🛠 Information required for the “*backward-incompatible changes*” - section in the PR that introduces the *backward-incompatible change* + section in the PR that introduces the *backward-incompatible change* available. .. _scientific_relevance: diff --git a/doc/quickstart/configure.rst b/doc/quickstart/configure.rst index 78ce5dcea2..5fee34db5b 100644 --- a/doc/quickstart/configure.rst +++ b/doc/quickstart/configure.rst @@ -140,79 +140,82 @@ For example, Python's ``None`` is YAML's ``null``, Python's ``True`` is YAML's | Option | Description | Type | Default value | +===============================+========================================+=============================+========================================+ | ``auxiliary_data_dir`` | Directory where auxiliary data is | :obj:`str` | ``~/auxiliary_data`` | -| | stored [#f1]_ | | | +| | stored. [#f1]_ | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``check_level`` | Sensitivity of the CMOR check | :obj:`str` | ``default`` | | | (``debug``, ``strict``, ``default`` | | | | | ``relaxed``, ``ignore``), see | | | -| | :ref:`cmor_check_strictness` | | | +| | :ref:`cmor_check_strictness`. 
| | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ -| ``compress_netcdf`` | Use netCDF compression | :obj:`bool` | ``False`` | +| ``compress_netcdf`` | Use netCDF compression. | :obj:`bool` | ``False`` | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``config_developer_file`` | Path to custom | :obj:`str` | ``None`` (default file) | -| | :ref:`config-developer` | | | +| | :ref:`config-developer`. | | | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| ``dask`` | :ref:`config-dask`. | :obj:`dict` | See :ref:`config-dask-defaults` | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``diagnostics`` | Only run the selected diagnostics from | :obj:`list` or :obj:`str` | ``None`` (all diagnostics) | -| | the recipe, see :ref:`running` | | | +| | the recipe, see :ref:`running`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``download_dir`` | Directory where downloaded data will | :obj:`str` | ``~/climate_data`` | -| | be stored [#f4]_ | | | +| | be stored. [#f4]_ | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ -| ``drs`` | Directory structure for input data | :obj:`dict` | ``{CMIP3: ESGF, CMIP5: ESGF, CMIP6: | +| ``drs`` | Directory structure for input data. 
| :obj:`dict` | ``{CMIP3: ESGF, CMIP5: ESGF, CMIP6: | | | [#f2]_ | | ESGF, CORDEX: ESGF, obs4MIPs: ESGF}`` | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``exit_on_warning`` | Exit on warning (only used in NCL | :obj:`bool` | ``False`` | -| | diagnostic scripts) | | | +| | diagnostic scripts). | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``extra_facets_dir`` | Additional custom directory for | :obj:`list` of :obj:`str` | ``[]`` | -| | :ref:`extra_facets` | | | +| | :ref:`extra_facets`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``log_level`` | Log level of the console (``debug``, | :obj:`str` | ``info`` | -| | ``info``, ``warning``, ``error``) | | | +| | ``info``, ``warning``, ``error``). | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ -| ``logging`` | :ref:`config-logging` | :obj:`dict` | | +| ``logging`` | :ref:`config-logging`. | :obj:`dict` | See :ref:`config-logging` | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``max_datasets`` | Maximum number of datasets to use, see | :obj:`int` | ``None`` (all datasets from recipe) | -| | :ref:`running` | | | +| | :ref:`running`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``max_parallel_tasks`` | Maximum number of parallel processes, | :obj:`int` | ``None`` (number of available CPUs) | -| | see also :ref:`task_priority` | | | +| | see also :ref:`task_priority`. 
| | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``max_years`` | Maximum number of years to use, see | :obj:`int` | ``None`` (all years from recipe) | -| | :ref:`running` | | | +| | :ref:`running`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``output_dir`` | Directory where all output will be | :obj:`str` | ``~/esmvaltool_output`` | -| | written, see :ref:`outputdata` | | | +| | written, see :ref:`outputdata`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ -| ``output_file_type`` | Plot file type | :obj:`str` | ``png`` | +| ``output_file_type`` | Plot file type. | :obj:`str` | ``png`` | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``profile_diagnostic`` | Use a profiling tool for the | :obj:`bool` | ``False`` | -| | diagnostic run [#f3]_ | | | +| | diagnostic run. [#f3]_ | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``remove_preproc_dir`` | Remove the ``preproc`` directory if | :obj:`bool` | ``True`` | | | the run was successful, see also | | | -| | :ref:`preprocessed_datasets` | | | +| | :ref:`preprocessed_datasets`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``resume_from`` | Resume previous run(s) by using | :obj:`list` of :obj:`str` | ``[]`` | | | preprocessor output files from these | | | -| | output directories, see :ref:`running` | | | +| | output directories, see | | | +| | :ref:`running`. 
| | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``rootpath`` | Rootpaths to the data from different | :obj:`dict` | ``{default: ~/climate_data}`` | -| | projects [#f2]_ | | | +| | projects. [#f2]_ | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``run_diagnostic`` | Run diagnostic scripts, see | :obj:`bool` | ``True`` | -| | :ref:`running` | | | +| | :ref:`running`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``save_intermediary_cubes`` | Save intermediary cubes from the | :obj:`bool` | ``False`` | | | preprocessor, see also | | | -| | :ref:`preprocessed_datasets` | | | +| | :ref:`preprocessed_datasets`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``search_esgf`` | Automatic data download from ESGF | :obj:`str` | ``never`` | | | (``never``, ``when_missing``, | | | -| | ``always``) [#f4]_ | | | +| | ``always``). [#f4]_ | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ | ``skip_nonexistent`` | Skip non-existent datasets, see | :obj:`bool` | ``False`` | -| | :ref:`running` | | | +| | :ref:`running`. | | | +-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ .. [#f1] The ``auxiliary_data_dir`` setting is the path to place any required @@ -271,157 +274,144 @@ For example, Python's ``None`` is YAML's ``null``, Python's ``True`` is YAML's will be downloaded; otherwise, local data will be used. -.. 
_config-logging: - -Logging configuration -===================== - -Configure what information is logged and how it is presented in the ``logging`` -section. - -.. note:: - - Not all logging configuration is available here yet, see :issue:`2596`. - -Configuration file example: - -.. code:: yaml - - logging: - log_progress_interval: 10s - -will log progress of Dask computations every 10 seconds instead of showing a -progress bar. - -Command line example: - -.. code:: bash - - esmvaltool run --logging='{"log_progress_interval": "1m"}' recipe_example.yml - - -will log progress of Dask computations every minute instead of showing a -progress bar. - -Available options: - -+-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ -| Option | Description | Type | Default value | -+===============================+========================================+=============================+========================================+ -| ``log_progress_interval`` | When running computations with Dask, | :obj:`str` or :obj:`float` | 0 | -| | log progress every | | | -| | ``log_progress_interval`` instead of | | | -| | showing a progress bar. The value can | | | -| | be specified in the format accepted by | | | -| | :func:`dask.utils.parse_timedelta`. A | | | -| | negative value disables any progress | | | -| | reporting. A progress bar is only | | | -| | shown if ``max_parallel_tasks: 1``. | | | -+-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ - .. _config-dask: Dask configuration ================== +Configure Dask in the ``dask`` section. + The :ref:`preprocessor functions ` and many of the :ref:`Python diagnostics in ESMValTool ` make use of the :ref:`Iris ` library to work with the data. In Iris, data can be either :ref:`real or lazy `. -Lazy data is represented by `dask arrays `_. 
+Lazy data is represented by `dask arrays `__. Dask arrays consist of many small -`numpy arrays `_ +`numpy arrays `__ (called chunks) and if possible, computations are run on those small arrays in parallel. In order to figure out what needs to be computed when, Dask makes use of a -'`scheduler `_'. -The default scheduler in Dask is rather basic, so it can only run on a single -computer and it may not always find the optimal task scheduling solution, -resulting in excessive memory use when using e.g. the +'`scheduler `__'. +The default (thread-based) scheduler in Dask is rather basic, so it can only +run on a single computer and it may not always find the optimal task scheduling +solution, resulting in excessive memory use when using e.g. the :func:`esmvalcore.preprocessor.multi_model_statistics` preprocessor function. Therefore it is recommended that you take a moment to configure the -`Dask distributed `_ scheduler. +`Dask distributed `__ scheduler. A Dask scheduler and the 'workers' running the actual computations, are collectively called a 'Dask cluster'. -Dask distributed configuration ------------------------------- +Dask profiles +------------- -In ESMValCore, the Dask Distributed cluster can configured by creating a file called -``~/.esmvaltool/dask.yml``, where ``~`` is short for your home directory. -In this file, under the ``client`` keyword, the arguments to -:obj:`distributed.Client` can be provided. -Under the ``cluster`` keyword, the type of cluster (e.g. -:obj:`distributed.LocalCluster`), as well as any arguments required to start -the cluster can be provided. -Extensive documentation on setting up Dask Clusters is available -`here `__. +Because some recipes require more computational resources than others, +ESMValCore provides the option to define "Dask profiles". +These profiles can be used to update the `Dask user configuration +`__ per recipe run. +The Dask profile can be selected in a YAML configuration file via -.. warning:: +.. 
code:: yaml + + dask: + use: + +or alternatively in the command line via + +.. code:: bash - The format of the ``~/.esmvaltool/dask.yml`` configuration file is not yet - fixed and may change in the next release of ESMValCore. + esmvaltool run --dask='{"use": ""}' recipe_example.yml + +Available predefined Dask profiles: + +- ``local_threaded`` (selected by default): use `threaded scheduler + `__ without + any further options. +- ``local_distributed``: use `local distributed scheduler + `__ + without any further options. +- ``debug``: use `synchronous Dask scheduler + `__ for + debugging purposes. + Best used with ``max_parallel_tasks: 1``. + +Dask distributed scheduler configuration +---------------------------------------- + +Here, some examples are provided on how to use a custom Dask distributed +scheduler. +Extensive documentation on setting up Dask Clusters is available `here +`__. .. note:: If not all preprocessor functions support lazy data, computational - performance may be best with the :ref:`default scheduler `. + performance may be best with the :ref:`threaded scheduler + `. See :issue:`674` for progress on making all preprocessor functions lazy. -**Example configurations** - *Personal computer* -Create a Dask distributed cluster on the computer running ESMValCore using -all available resources: +Create a :class:`distributed.LocalCluster` on the computer running ESMValCore +using all available resources: .. code:: yaml - cluster: - type: distributed.LocalCluster + dask: + use: local_cluster # use "local_cluster" defined below + profiles: + local_cluster: + cluster: + type: distributed.LocalCluster -this should work well for most personal computers. +This should work well for most personal computers. .. note:: - Note that, if running this configuration on a shared node of an HPC cluster, - Dask will try and use as many resources it can find available, and this may - lead to overcrowding the node by a single user (you)! 
+ If running this configuration on a shared node of an HPC cluster, Dask will + try and use as many resources it can find available, and this may lead to + overcrowding the node by a single user (you)! *Shared computer* -Create a Dask distributed cluster on the computer running ESMValCore, with -2 workers with 4 threads/4 GiB of memory each (8 GiB in total): +Create a :class:`distributed.LocalCluster` on the computer running ESMValCore, +with 2 workers with 2 threads/4 GiB of memory each (8 GiB in total): .. code:: yaml - cluster: - type: distributed.LocalCluster - n_workers: 2 - threads_per_worker: 4 - memory_limit: 4 GiB + dask: + use: local_cluster # use "local_cluster" defined below + profiles: + local_cluster: + cluster: + type: distributed.LocalCluster + n_workers: 2 + threads_per_worker: 2 + memory_limit: 4GiB this should work well for shared computers. *Computer cluster* -Create a Dask distributed cluster on the -`Levante `_ -supercomputer using the `Dask-Jobqueue `_ -package: +Create a Dask distributed cluster on the `Levante +`__ supercomputer +using the `Dask-Jobqueue `__ package: .. code:: yaml - cluster: - type: dask_jobqueue.SLURMCluster - queue: shared - account: bk1088 - cores: 8 - memory: 7680MiB - processes: 2 - interface: ib0 - local_directory: "/scratch/b/b381141/dask-tmp" - n_workers: 24 + dask: + use: slurm_cluster # use "slurm_cluster" defined below + profiles: + slurm_cluster: + cluster: + type: dask_jobqueue.SLURMCluster + queue: shared + account: + cores: 8 + memory: 7680MiB + processes: 2 + interface: ib0 + local_directory: "/scratch/b//dask-tmp" + n_workers: 24 This will start 24 workers with ``cores / processes = 4`` threads each, resulting in ``n_workers / processes = 12`` Slurm jobs, where each Slurm job @@ -429,34 +419,38 @@ will request 8 CPU cores and 7680 MiB of memory and start ``processes = 2`` workers. 
This example will use the fast infiniband network connection (called ``ib0`` on Levante) for communication between workers running on different nodes. -It is -`important to set the right location for temporary storage `__, -in this case the ``/scratch`` space is used. +It is `important to set the right location for temporary storage +`__, in this +case the ``/scratch`` space is used. It is also possible to use environmental variables to configure the temporary storage location, if you cluster provides these. A configuration like this should work well for larger computations where it is advantageous to use multiple nodes in a compute cluster. -See -`Deploying Dask Clusters on High Performance Computers `_ -for more information. +See `Deploying Dask Clusters on High Performance Computers +`__ for more information. *Externally managed Dask cluster* -Use an externally managed cluster, e.g. a cluster that you started using the -`Dask Jupyterlab extension `_: +To use an externally managed cluster, specify a ``scheduler_address`` for the +selected profile. +Such a cluster can e.g. be started using the `Dask Jupyterlab extension +`__: .. code:: yaml - client: - address: '127.0.0.1:8786' + dask: + use: external # Use the `external` profile defined below + profiles: + external: + scheduler_address: "tcp://127.0.0.1:43605" -See `here `_ +See `here `__ for an example of how to configure this on a remote system. For debugging purposes, it can be useful to start the cluster outside of ESMValCore because then -`Dask dashboard `_ remains +`Dask dashboard `__ remains available after ESMValCore has finished running. **Advice on choosing performant configurations** @@ -477,60 +471,148 @@ Therefore, it may be beneficial to use fewer threads per worker if the computation is very simple and the runtime is determined by the speed with which the data can be read from and/or written to disk. -.. 
_config-dask-threaded-scheduler: -Dask default scheduler configuration ------------------------------------- +Custom Dask threaded scheduler configuration +-------------------------------------------- -The Dask default scheduler can be a good choice for recipes using a small +The Dask threaded scheduler can be a good choice for recipes using a small amount of data or when running a recipe where not all preprocessor functions -are lazy yet (see :issue:`674` for the current status). To use the the Dask -default scheduler, comment out or remove all content of ``~/.esmvaltool/dask.yml``. +are lazy yet (see :issue:`674` for the current status). To avoid running out of memory, it is important to set the number of workers -(threads) used by Dask to run its computations to a reasonable number. By -default the number of CPU cores in the machine will be used, but this may be -too many on shared machines or laptops with a large number of CPU cores +(threads) used by Dask to run its computations to a reasonable number. +By default, the number of CPU cores in the machine will be used, but this may +be too many on shared machines or laptops with a large number of CPU cores compared to the amount of memory they have available. -Typically, Dask requires about 2GB of RAM per worker, but this may be more +Typically, Dask requires about 2 GiB of RAM per worker, but this may be more depending on the computation. -To set the number of workers used by the Dask default scheduler, create a file -called ``~/.config/dask/dask.yml`` and add the following -content: +To set the number of workers used by the Dask threaded scheduler, use the +following configuration: .. code:: yaml - scheduler: threads - num_workers: 4 # this example sets the number of workers to 4 + dask: + use: local_threaded # This can be omitted + profiles: + local_threaded: + num_workers: 4 + +.. 
_config-dask-defaults: +Default options +--------------- -Note that the file name is arbitrary, only the directory it is in matters, as -explained in more detail -`here `__. -See the `Dask documentation `__ -for more information. +By default, the following Dask configuration is used: -Configuring Dask for debugging ------------------------------- +.. code:: yaml -For debugging purposes, it can be useful to disable all parallelism, as this -will often result in more clear error messages. This can be achieved by -setting ``max_parallel_tasks: 1`` in the configuration, -commenting out or removing all content of ``~/.esmvaltool/dask.yml``, and -creating a file called ``~/.config/dask/dask.yml`` with the following -content: + dask: + use: local_threaded # use the `local_threaded` profile defined below + profiles: + local_threaded: + scheduler: threads + local_distributed: + cluster: + type: distributed.LocalCluster + debug: + scheduler: synchronous + +All available options +--------------------- + ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| Option | Description | Type | Default value | ++===============================+========================================+=============================+========================================+ +| ``profiles`` | Different Dask profiles that can be | :obj:`dict` | See :ref:`config-dask-defaults` | +| | selected via the ``use`` option. Each | | | +| | profile has a name (:obj:`dict` keys) | | | +| | and corresponding options (:obj:`dict` | | | +| | values). See | | | +| | :ref:`config-dask-profiles` for | | | +| | details. | | | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| ``use`` | Dask profile that is used; must be | :obj:`str` | ``local_threaded`` | +| | defined in the option ``profiles``. 
| | | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ + +.. _config-dask-profiles: + +Options for Dask profiles +------------------------- + ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| Option | Description | Type | Default value | ++===============================+========================================+=============================+========================================+ +| ``cluster`` | Keyword arguments to initialize a Dask | :obj:`dict` | If omitted, use externally managed | +| | distributed cluster. Needs the option | | cluster if ``scheduler_address`` is | +| | ``type``, which specifies the class of | | given or a :ref:`Dask threaded | +| | the cluster. The remaining options are | | scheduler | +| | passed as keyword arguments to | | ` | +| | initialize that class. Cannot be used | | otherwise. | +| | in combination with | | | +| | ``scheduler_address``. | | | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| ``scheduler_address`` | Scheduler address of an externally | :obj:`str` | If omitted, use a Dask distributed | +| | managed cluster. Will be passed to | | cluster if ``cluster`` is given or a | +| | :class:`distributed.Client`. Cannot be | | :ref:`Dask threaded scheduler | +| | used in combination with ``cluster``. | | ` | +| | | | otherwise. | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| All other options | Passed as keyword arguments to | Any | No defaults. | +| | :func:`dask.config.set`. | | | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ + + +.. 
_config-logging: + +Logging configuration +===================== + +Configure what information is logged and how it is presented in the ``logging`` +section. + +.. note:: + + Not all logging configuration is available here yet, see :issue:`2596`. + +Configuration file example: .. code:: yaml - scheduler: synchronous + logging: + log_progress_interval: 10s + +will log progress of Dask computations every 10 seconds instead of showing a +progress bar. + +Command line example: + +.. code:: bash + + esmvaltool run --logging='{"log_progress_interval": "1m"}' recipe_example.yml + + +will log progress of Dask computations every minute instead of showing a +progress bar. -Note that the file name is arbitrary, only the directory it is in matters, as -explained in more detail -`here `__. -See the `Dask documentation `__ -for more information. +Available options: + ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ +| Option | Description | Type | Default value | ++===============================+========================================+=============================+========================================+ +| ``log_progress_interval`` | When running computations with Dask, | :obj:`str` or :obj:`float` | 0 | +| | log progress every | | | +| | ``log_progress_interval`` instead of | | | +| | showing a progress bar. The value can | | | +| | be specified in the format accepted by | | | +| | :func:`dask.utils.parse_timedelta`. A | | | +| | negative value disables any progress | | | +| | reporting. A progress bar is only | | | +| | shown if ``max_parallel_tasks: 1``. | | | ++-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+ .. 
_config-esgf: @@ -713,7 +795,7 @@ The resulting directory path would look something like this: CMIP/MOHC/HadGEM3-GC31-LL/historical/r1i1p1f3/Omon/tos/gn/latest -Please, bear in mind that ``input_dirs`` can also be a list for those cases in +Please, bear in mind that ``input_dirs`` can also be a list for those cases in which may be needed: .. code-block:: yaml diff --git a/doc/quickstart/output.rst b/doc/quickstart/output.rst index 2698456c6b..71d35a00e5 100644 --- a/doc/quickstart/output.rst +++ b/doc/quickstart/output.rst @@ -150,6 +150,6 @@ Here is an example metadata.yml file: As you can see, this is effectively a dictionary with several items including data paths, metadata and other information. -There are several tools available in python which are built to read and parse +There are several tools available in python which are built to read and parse these files. The tools are available in the shared directory in the diagnostics directory. diff --git a/esmvalcore/_main.py b/esmvalcore/_main.py index 42fbc16092..422908e464 100755 --- a/esmvalcore/_main.py +++ b/esmvalcore/_main.py @@ -84,7 +84,6 @@ def process_recipe(recipe_file: Path, session): import shutil from esmvalcore._recipe.recipe import read_recipe_file - from esmvalcore.config._dask import check_distributed_config if not recipe_file.is_file(): import errno @@ -121,8 +120,6 @@ def process_recipe(recipe_file: Path, session): "'max_parallel_tasks' in your configuration." ) - check_distributed_config() - if session["compress_netcdf"]: logger.warning( "You have enabled NetCDF compression. 
Accessing .nc files can be " @@ -399,6 +396,7 @@ def run(self, recipe, **kwargs): """ from .config import CFG + from .config._dask import warn_if_old_dask_config_exists from .exceptions import InvalidConfigParameter cli_config_dir = kwargs.pop("config_dir", None) @@ -439,7 +437,7 @@ def run(self, recipe, **kwargs): recipe = self._get_recipe(recipe) - CFG.update(kwargs) + CFG.nested_update(kwargs) CFG["resume_from"] = parse_resume(CFG["resume_from"], recipe) session = CFG.start_session(recipe.stem) @@ -455,6 +453,8 @@ def run(self, recipe, **kwargs): if cli_config_dir is not None: CFG.update_from_dirs([cli_config_dir]) + warn_if_old_dask_config_exists() + @staticmethod def _create_session_dir(session): """Create `session.session_dir` or an alternative if it exists.""" diff --git a/esmvalcore/_recipe/recipe.py b/esmvalcore/_recipe/recipe.py index 9c5aa74553..f42463f5a7 100644 --- a/esmvalcore/_recipe/recipe.py +++ b/esmvalcore/_recipe/recipe.py @@ -18,6 +18,7 @@ from esmvalcore._provenance import get_recipe_provenance from esmvalcore._task import DiagnosticTask, ResumeTask, TaskSet from esmvalcore.config._config import TASKSEP +from esmvalcore.config._dask import validate_dask_config from esmvalcore.config._diagnostics import TAGS from esmvalcore.dataset import Dataset from esmvalcore.exceptions import InputFilesNotFound, RecipeError @@ -786,6 +787,8 @@ class Recipe: def __init__(self, raw_recipe, session, recipe_file: Path): """Parse a recipe file into an object.""" + validate_dask_config(session["dask"]) + # Clear the global variable containing the set of files to download DOWNLOAD_FILES.clear() USED_DATASETS.clear() diff --git a/esmvalcore/config/_config_object.py b/esmvalcore/config/_config_object.py index 489e2301b2..29aed379ce 100644 --- a/esmvalcore/config/_config_object.py +++ b/esmvalcore/config/_config_object.py @@ -5,7 +5,7 @@ import os import sys import warnings -from collections.abc import Iterable +from collections.abc import Iterable, Mapping from 
datetime import datetime from pathlib import Path from typing import Optional @@ -464,9 +464,29 @@ def update_from_dirs(self, dirs: Iterable[str | Path]) -> None: """ new_config_dict = self._get_config_dict_from_dirs(dirs) - merged_config_dict = dask.config.merge(self, new_config_dict) - self.update(merged_config_dict) + self.nested_update(new_config_dict) + + def nested_update(self, new_options: Mapping) -> None: + """Nested update of configuration object with another mapping. + Merge the existing configuration object with a new mapping using + :func:`dask.config.merge` (new values are preferred over old values). + Nested objects are properly considered; see :func:`dask.config.update` + for details. + + Parameters + ---------- + new_options: + New configuration options. + + Raises + ------ + esmvalcore.exceptions.InvalidConfigParameter + Invalid configuration option given. + + """ + merged_config_dict = dask.config.merge(self, new_options) + self.update(merged_config_dict) self.check_missing() diff --git a/esmvalcore/config/_config_validators.py b/esmvalcore/config/_config_validators.py index b12ed08204..1f7edbc390 100644 --- a/esmvalcore/config/_config_validators.py +++ b/esmvalcore/config/_config_validators.py @@ -326,6 +326,7 @@ def validate_extra_facets_dir(value): "check_level": validate_check_level, "compress_netcdf": validate_bool, "config_developer_file": validate_config_developer, + "dask": validate_dict, "diagnostics": validate_diagnostics, "download_dir": validate_path, "drs": validate_drs, diff --git a/esmvalcore/config/_dask.py b/esmvalcore/config/_dask.py index 4de51e4aef..f9562d161b 100644 --- a/esmvalcore/config/_dask.py +++ b/esmvalcore/config/_dask.py @@ -3,76 +3,212 @@ import contextlib import importlib import logging +import os +import warnings +from collections.abc import Generator, Mapping +from copy import deepcopy from pathlib import Path +from typing import Any +import dask.config import yaml from distributed import Client +from 
distributed.deploy import Cluster + +from esmvalcore.config import CFG +from esmvalcore.exceptions import ( + ESMValCoreDeprecationWarning, + InvalidConfigParameter, +) logger = logging.getLogger(__name__) +# TODO: Remove in v2.14.0 CONFIG_FILE = Path.home() / ".esmvaltool" / "dask.yml" -def check_distributed_config(): - """Check the Dask distributed configuration.""" - if not CONFIG_FILE.exists(): - logger.warning( - "Using the Dask basic scheduler. This may lead to slow " - "computations and out-of-memory errors. " - "Note that the basic scheduler may still be the best choice for " - "preprocessor functions that are not lazy. " - "In that case, you can safely ignore this warning. " - "See https://docs.esmvaltool.org/projects/ESMValCore/en/latest/" - "quickstart/configure.html#dask-distributed-configuration for " - "more information. " +# TODO: Remove in v2.14.0 +def warn_if_old_dask_config_exists() -> None: + """Warn user if deprecated dask configuration file exists.""" + if CONFIG_FILE.exists() and not os.environ.get( + "ESMVALTOOL_USE_NEW_DASK_CONFIG" + ): + deprecation_msg = ( + "Usage of Dask configuration file ~/.esmvaltool/dask.yml " + "has been deprecated in ESMValCore version 2.12.0 and is " + "scheduled for removal in version 2.14.0. Please use the " + "configuration option `dask` instead (see " + "https://docs.esmvaltool.org/projects/ESMValCore/en/latest/" + "quickstart/configure.html#dask-configuration for details). " + "Ignoring all existing `dask` configuration options for this run. " + "To enable the new `dask` configuration options, delete or move " + "the file ~/.esmvaltool/dask.yml or set the environment variable " + "ESMVALTOOL_USE_NEW_DASK_CONFIG=1." 
+ ) + warnings.warn( + deprecation_msg, ESMValCoreDeprecationWarning, stacklevel=2 + ) + + +def validate_dask_config(dask_config: Mapping) -> None: + """Validate dask configuration options.""" + for option in ("profiles", "use"): + if option not in dask_config: + raise InvalidConfigParameter( + f"Key '{option}' needs to be defined for 'dask' configuration" + ) + profiles = dask_config["profiles"] + use = dask_config["use"] + if not isinstance(profiles, Mapping): + raise InvalidConfigParameter( + f"Key 'dask.profiles' needs to be a mapping, got " + f"{type(profiles)}" + ) + for profile, profile_cfg in profiles.items(): + has_scheduler_address = any( + [ + "scheduler_address" in profile_cfg, + "scheduler-address" in profile_cfg, + ] ) + if "cluster" in profile_cfg and has_scheduler_address: + raise InvalidConfigParameter( + f"Key 'dask.profiles.{profile}' uses 'cluster' and " + f"'scheduler_address', can only have one of those" + ) + if "cluster" in profile_cfg: + cluster = profile_cfg["cluster"] + if not isinstance(cluster, Mapping): + raise InvalidConfigParameter( + f"Key 'dask.profiles.{profile}.cluster' needs to be a " + f"mapping, got {type(cluster)}" + ) + if "type" not in cluster: + raise InvalidConfigParameter( + f"Key 'dask.profiles.{profile}.cluster' does not have a " + f"'type'" + ) + if use not in profiles: + raise InvalidConfigParameter( + f"Key 'dask.use' needs to point to an element of 'dask.profiles'; " + f"got '{use}', expected one of {list(profiles.keys())}" + ) + + +# TODO: Remove in v2.14.0 +def _get_old_dask_config() -> dict: + """Get dask configuration dict from old dask configuration file.""" + dask_config: dict[str, Any] = { + "use": "local_threaded", + "profiles": {"local_threaded": {"scheduler": "threads"}}, + } + config = yaml.safe_load(CONFIG_FILE.read_text(encoding="utf-8")) + + # Use settings from file if this is not empty + if config is not None: + client_kwargs = config.get("client", {}) + cluster_kwargs = config.get("cluster", {}) + + 
# Externally managed cluster + if "address" in client_kwargs: + if cluster_kwargs: + logger.warning( + "Not using Dask 'cluster' settings from %s because a " + "cluster 'address' is already provided in 'client'.", + CONFIG_FILE, + ) + dask_config = { + "use": "external", + "profiles": { + "external": { + "scheduler_address": client_kwargs.pop("address"), + }, + }, + } + + # Dask distributed cluster + elif cluster_kwargs: + cluster_kwargs.setdefault("type", "distributed.LocalCluster") + dask_config = { + "use": "cluster_from_file", + "profiles": { + "cluster_from_file": { + "cluster": cluster_kwargs, + }, + }, + } + + dask_config["client"] = client_kwargs + + return dask_config + + +# TODO: Remove in v2.14.0; used deepcopy(CFG["dask"]) instead +def _get_dask_config() -> dict: + """Get Dask configuration dictionary.""" + if CONFIG_FILE.exists() and not os.environ.get( + "ESMVALTOOL_USE_NEW_DASK_CONFIG" + ): + dask_config = _get_old_dask_config() + else: + dask_config = deepcopy(CFG["dask"]) + return dask_config @contextlib.contextmanager -def get_distributed_client(): +def get_distributed_client() -> Generator[None | Client]: """Get a Dask distributed client.""" - dask_args = {} - if CONFIG_FILE.exists(): - config = yaml.safe_load(CONFIG_FILE.read_text(encoding="utf-8")) - if config is not None: - dask_args = config - - client_args = dask_args.get("client") or {} - cluster_args = dask_args.get("cluster") or {} - - # Start a cluster, if requested - if "address" in client_args: - # Use an externally managed cluster. 
+ warn_if_old_dask_config_exists() + dask_config = _get_dask_config() + validate_dask_config(dask_config) + + # TODO: Remove in v2.14.0 + client_kwargs = dask_config.get("client", {}) + + # Set up cluster and client according to the selected profile + # Note: we already ensured earlier that the selected profile (via `use`) + # actually exists in `profiles`, so we don't have to check that again here + logger.debug("Using Dask profile '%s'", dask_config["use"]) + profile = dask_config["profiles"][dask_config["use"]] + cluster_kwargs = profile.pop("cluster", None) + + logger.debug("Using additional Dask settings %s", profile) + dask.config.set(profile) + + cluster: None | Cluster + client: None | Client + + # Threaded scheduler + if cluster_kwargs is None: cluster = None - if cluster_args: - logger.warning( - "Not using Dask 'cluster' settings from %s because a cluster " - "'address' is already provided in 'client'.", - CONFIG_FILE, - ) - elif cluster_args: - # Start cluster. - cluster_type = cluster_args.pop( - "type", - "distributed.LocalCluster", - ) + + # Distributed scheduler + else: + cluster_type = cluster_kwargs.pop("type") cluster_module_name, cluster_cls_name = cluster_type.rsplit(".", 1) cluster_module = importlib.import_module(cluster_module_name) cluster_cls = getattr(cluster_module, cluster_cls_name) - cluster = cluster_cls(**cluster_args) - client_args["address"] = cluster.scheduler_address - else: - # No cluster configured, use Dask basic scheduler, or a LocalCluster - # managed through Client. 
- cluster = None + cluster = cluster_cls(**cluster_kwargs) + dask.config.set({"scheduler_address": cluster.scheduler_address}) + logger.debug("Using Dask cluster %s", cluster) - # Start a client, if requested - if dask_args: - client = Client(**client_args) - logger.info("Dask dashboard: %s", client.dashboard_link) - else: - logger.info("Using the Dask basic scheduler.") + if dask.config.get("scheduler_address", None) is None: client = None + logger.info( + "Using Dask threaded scheduler. The distributed scheduler is " + "recommended, please read https://docs.esmvaltool.org/projects/" + "ESMValCore/en/latest/quickstart/" + "configure.html#dask-configuration how to use a distributed " + "scheduler." + ) + else: + client = Client(**client_kwargs) + logger.info( + "Using Dask distributed scheduler (address: %s, dashboard link: " + "%s)", + dask.config.get("scheduler_address"), + client.dashboard_link, + ) try: yield client diff --git a/esmvalcore/config/configurations/defaults/dask.yml b/esmvalcore/config/configurations/defaults/dask.yml new file mode 100644 index 0000000000..33f5579532 --- /dev/null +++ b/esmvalcore/config/configurations/defaults/dask.yml @@ -0,0 +1,10 @@ +dask: + use: local_threaded # use the `local_threaded` profile defined below + profiles: + local_threaded: + scheduler: threads + local_distributed: + cluster: + type: distributed.LocalCluster + debug: + scheduler: synchronous diff --git a/esmvalcore/config/configurations/defaults/more_options.yml b/esmvalcore/config/configurations/defaults/more_top_level_options.yml similarity index 76% rename from esmvalcore/config/configurations/defaults/more_options.yml rename to esmvalcore/config/configurations/defaults/more_top_level_options.yml index c61a70a493..2e0dc8a49c 100644 --- a/esmvalcore/config/configurations/defaults/more_options.yml +++ b/esmvalcore/config/configurations/defaults/more_top_level_options.yml @@ -1,4 +1,3 @@ -# Other options not included in config-user.yml check_level: default 
diagnostics: null extra_facets_dir: [] diff --git a/esmvalcore/experimental/recipe.py b/esmvalcore/experimental/recipe.py index f199ef719f..ce54b7c792 100644 --- a/esmvalcore/experimental/recipe.py +++ b/esmvalcore/experimental/recipe.py @@ -10,7 +10,7 @@ import yaml from esmvalcore._recipe.recipe import Recipe as RecipeEngine -from esmvalcore.config import CFG, Session, _dask +from esmvalcore.config import CFG, Session from ._logging import log_to_dir from .recipe_info import RecipeInfo @@ -133,7 +133,6 @@ def run( session["diagnostics"] = task with log_to_dir(session.run_dir): - _dask.check_distributed_config() self._engine = self._load(session=session) self._engine.run() diff --git a/esmvalcore/local.py b/esmvalcore/local.py index 6483ee7a8f..41cf424476 100644 --- a/esmvalcore/local.py +++ b/esmvalcore/local.py @@ -183,7 +183,7 @@ def _dates_to_timerange(start_date, end_date): Note ---- This function ensures that dates in years format follow the pattern YYYY - (i.e., that they have at least 4 digits). Other formats, such as wildcards + (i.e., that they have at least 4 digits). Other formats, such as wildcards (``'*'``) and relative time ranges (e.g., ``'P6Y'``) are used unchanged. Parameters diff --git a/esmvalcore/preprocessor/_dask_progress.py b/esmvalcore/preprocessor/_dask_progress.py index bcfc3380d5..55594c40da 100644 --- a/esmvalcore/preprocessor/_dask_progress.py +++ b/esmvalcore/preprocessor/_dask_progress.py @@ -20,7 +20,7 @@ class RichProgressBar(dask.diagnostics.Callback): - """Progress bar using `rich` for the Dask default scheduler.""" + """Progress bar using `rich` for the Dask threaded scheduler.""" # Disable warnings about design choices that have been made in the base class. 
# pylint: disable=method-hidden,super-init-not-called,too-few-public-methods,unused-argument,useless-suppression @@ -109,7 +109,7 @@ def _draw_stop(self, **kwargs): class ProgressLogger(dask.diagnostics.ProgressBar): - """Progress logger for the Dask default scheduler.""" + """Progress logger for the Dask threaded scheduler.""" # Disable warnings about design choices that have been made in the base class. # pylint: disable=too-few-public-methods,unused-argument,useless-suppression diff --git a/esmvalcore/preprocessor/_volume.py b/esmvalcore/preprocessor/_volume.py index 8d56d7b51a..4c2f7574d0 100644 --- a/esmvalcore/preprocessor/_volume.py +++ b/esmvalcore/preprocessor/_volume.py @@ -457,7 +457,7 @@ def extract_transect( transect along 28 West. Also, `'extract_transect(cube, longitude=-28, latitude=[-50, 50])'` will - produce a transect along 28 West between 50 south and 50 North. + produce a transect along 28 West between 50 south and 50 North. This function is not yet implemented for irregular arrays - instead try the extract_trajectory function, but note that it is currently diff --git a/tests/conftest.py b/tests/conftest.py index d973b76695..d16442d302 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ import pytest +import esmvalcore.config._dask from esmvalcore.config import CFG, Config @@ -32,3 +33,13 @@ def ignore_old_config_user(tmp_path, monkeypatch): monkeypatch.setattr( Config, "_DEFAULT_USER_CONFIG_DIR", nonexistent_config_dir ) + + +# TODO: remove in v2.14.0 +@pytest.fixture(autouse=True) +def ignore_old_dask_config_file(tmp_path, monkeypatch): + """Ignore potentially existing old dask.yml file in all tests.""" + nonexistent_file = tmp_path / "nonexistent_file.yml" + monkeypatch.setattr( + esmvalcore.config._dask, "CONFIG_FILE", nonexistent_file + ) diff --git a/tests/integration/cmor/test_fix.py b/tests/integration/cmor/test_fix.py index 43b9419f64..4ce1b01e60 100644 --- a/tests/integration/cmor/test_fix.py +++ 
b/tests/integration/cmor/test_fix.py @@ -526,7 +526,7 @@ def test_fix_metadata_cfmon_ta_alternative(self): assert self.mock_warning.call_count == 9 def test_fix_metadata_cfmon_ta_no_alternative(self, mocker): - """Test ``fix_metadata`` with no alternative coordinate.""" + """Test ``fix_metadata`` with no alternative coordinate.""" short_name = "ta" project = "CMIP6" dataset = "__MODEL_WITH_NO_EXPLICIT_FIX__" diff --git a/tests/parse_pymon.py b/tests/parse_pymon.py index 3f05929703..b3b87be6e6 100644 --- a/tests/parse_pymon.py +++ b/tests/parse_pymon.py @@ -72,7 +72,7 @@ def _parse_pymon_database(): # Be sure to close the connection con.close() - # Throw a sys exit so test fails if we have >4GB tests + # Throw a sys exit so test fails if we have >4GB tests if big_mem_tests: print("Some tests exceed 4GB of RES memory, look into them!") print(big_mem_tests) diff --git a/tests/unit/config/test_config.py b/tests/unit/config/test_config.py index 44b8d6ee3e..04d10e2459 100644 --- a/tests/unit/config/test_config.py +++ b/tests/unit/config/test_config.py @@ -179,6 +179,22 @@ def test_load_default_config(cfg_default, monkeypatch): "check_level": CheckLevels.DEFAULT, "compress_netcdf": False, "config_developer_file": default_dev_file, + "dask": { + "profiles": { + "local_threaded": { + "scheduler": "threads", + }, + "local_distributed": { + "cluster": { + "type": "distributed.LocalCluster", + }, + }, + "debug": { + "scheduler": "synchronous", + }, + }, + "use": "local_threaded", + }, "diagnostics": None, "download_dir": Path.home() / "climate_data", "drs": { diff --git a/tests/unit/config/test_config_object.py b/tests/unit/config/test_config_object.py index 09fc93ed3a..8c9f4e1913 100644 --- a/tests/unit/config/test_config_object.py +++ b/tests/unit/config/test_config_object.py @@ -588,3 +588,19 @@ def test_update_from_dirs(dirs, output_file_type, rootpath, tmp_path): assert cfg["output_file_type"] == output_file_type assert cfg["rootpath"] == rootpath assert 
cfg["search_esgf"] == "when_missing" + + +def test_nested_update(): + """Test `Config.update_from_dirs`.""" + cfg = Config() + assert not cfg + + cfg["drs"] = {"X": "x", "Z": "z"} + cfg["search_esgf"] = "when_missing" + + cfg.nested_update({"drs": {"Y": "y", "X": "xx"}, "max_years": 1}) + + assert len(cfg) == 3 + assert cfg["drs"] == {"Y": "y", "X": "xx", "Z": "z"} + assert cfg["search_esgf"] == "when_missing" + assert cfg["max_years"] == 1 diff --git a/tests/unit/config/test_dask.py b/tests/unit/config/test_dask.py index 8efc305023..3baf9d5c2f 100644 --- a/tests/unit/config/test_dask.py +++ b/tests/unit/config/test_dask.py @@ -1,21 +1,165 @@ import pytest import yaml -from esmvalcore.config import _dask +from esmvalcore.config import CFG, _dask +from esmvalcore.exceptions import ( + ESMValCoreDeprecationWarning, + InvalidConfigParameter, +) -def test_get_no_distributed_client(mocker, tmp_path): - mocker.patch.object(_dask, "CONFIG_FILE", tmp_path / "nonexistent.yml") +@pytest.fixture +def mock_dask_config_set(mocker): + dask_config_dict = {} + mock_dask_set = mocker.patch("dask.config.set", autospec=True) + mock_dask_set.side_effect = dask_config_dict.update + mock_dask_get = mocker.patch("dask.config.get", autospec=True) + mock_dask_get.side_effect = dask_config_dict.get + return mock_dask_set + + +def test_get_no_distributed_client(): with _dask.get_distributed_client() as client: assert client is None +# TODO: Remove in v2.14.0 +def test_get_distributed_client_empty_dask_file(mocker, tmp_path): + # Create mock client configuration. 
+ cfg_file = tmp_path / "dask.yml" + with cfg_file.open("w", encoding="utf-8") as file: + file.write("") + mocker.patch.object(_dask, "CONFIG_FILE", cfg_file) + + # Create mock distributed.Client + with pytest.warns(ESMValCoreDeprecationWarning): + with _dask.get_distributed_client() as client: + assert client is None + + +# TODO: Remove in v2.14.0 +@pytest.mark.parametrize("use_new_dask_config", ["", "1"]) +def test_force_new_dask_config( + monkeypatch, mocker, tmp_path, mock_dask_config_set, use_new_dask_config +): + # Old config -> threaded scheduler + cfg_file = tmp_path / "dask.yml" + with cfg_file.open("w", encoding="utf-8") as file: + file.write("") + mocker.patch.object(_dask, "CONFIG_FILE", cfg_file) + + # New config -> distributed scheduler + slurm_cluster = { + "type": "dask_jobqueue.SLURMCluster", + "queue": "interactive", + "cores": "8", + "memory": "16GiB", + } + monkeypatch.setitem( + CFG, + "dask", + { + "use": "slurm_cluster", + "profiles": {"slurm_cluster": {"cluster": slurm_cluster}}, + }, + ) + + # Create mock distributed.Client + mock_client = mocker.Mock() + mocker.patch.object( + _dask, "Client", create_autospec=True, return_value=mock_client + ) + + mock_module = mocker.Mock() + mock_cluster_cls = mocker.Mock() + mock_module.SLURMCluster = mock_cluster_cls + mocker.patch.object( + _dask.importlib, + "import_module", + create_autospec=True, + return_value=mock_module, + ) + + monkeypatch.setenv("ESMVALTOOL_USE_NEW_DASK_CONFIG", use_new_dask_config) + + with _dask.get_distributed_client() as client: + if use_new_dask_config: + assert client is not None + else: + assert client is None + + +# TODO: Remove in v2.14.0 +def test_get_old_dask_config(mocker, tmp_path): + # Create mock client configuration. 
+ cfg = {"cluster": {"n_workers": 2}} + cfg_file = tmp_path / "dask.yml" + with cfg_file.open("w", encoding="utf-8") as file: + yaml.safe_dump(cfg, file) + mocker.patch.object(_dask, "CONFIG_FILE", cfg_file) + + dask_cfg = _dask._get_old_dask_config() + + expected_cfg = { + "use": "cluster_from_file", + "profiles": { + "cluster_from_file": { + "cluster": { + "n_workers": 2, + "type": "distributed.LocalCluster", + }, + }, + }, + "client": {}, + } + assert dask_cfg == expected_cfg + + +def test_get_distributed_client_external( + monkeypatch, mocker, mock_dask_config_set +): + monkeypatch.setitem( + CFG, + "dask", + { + "use": "external", + "profiles": { + "external": { + "scheduler_address": "tcp://127.0.0.1:42021", + }, + }, + }, + ) + + # Create mock distributed.Client + mock_client = mocker.Mock() + mocker.patch.object( + _dask, "Client", create_autospec=True, return_value=mock_client + ) + + with _dask.get_distributed_client() as client: + assert client is mock_client + _dask.Client.assert_called_once_with() + mock_client.close.assert_called_once_with() + assert ( + mocker.call({"scheduler_address": "tcp://127.0.0.1:42021"}) + in mock_dask_config_set.mock_calls + ) + + +# TODO: Remove in v2.14.0 @pytest.mark.parametrize("warn_unused_args", [False, True]) -def test_get_distributed_client_external(mocker, tmp_path, warn_unused_args): +def test_get_distributed_client_external_old( + mocker, + tmp_path, + mock_dask_config_set, + warn_unused_args, +): # Create mock client configuration. 
cfg = { "client": { "address": "tcp://127.0.0.1:42021", + "other_client_options": 1, }, } if warn_unused_args: @@ -31,14 +175,78 @@ def test_get_distributed_client_external(mocker, tmp_path, warn_unused_args): _dask, "Client", create_autospec=True, return_value=mock_client ) + with pytest.warns(ESMValCoreDeprecationWarning): + with _dask.get_distributed_client() as client: + assert client is mock_client + _dask.Client.assert_called_once_with(other_client_options=1) + mock_client.close.assert_called_once_with() + assert ( + mocker.call({"scheduler_address": "tcp://127.0.0.1:42021"}) + in mock_dask_config_set.mock_calls + ) + + +@pytest.mark.parametrize("shutdown_timeout", [False, True]) +def test_get_distributed_client_slurm( + monkeypatch, mocker, mock_dask_config_set, shutdown_timeout +): + slurm_cluster = { + "type": "dask_jobqueue.SLURMCluster", + "queue": "interactive", + "cores": "8", + "memory": "16GiB", + } + monkeypatch.setitem( + CFG, + "dask", + { + "use": "slurm_cluster", + "profiles": { + "slurm_cluster": { + "cluster": slurm_cluster, + "num_workers": 42, + }, + }, + }, + ) + + # Create mock distributed.Client + mock_client = mocker.Mock() + mocker.patch.object( + _dask, "Client", create_autospec=True, return_value=mock_client + ) + + mock_module = mocker.Mock() + mock_cluster_cls = mocker.Mock() + mock_module.SLURMCluster = mock_cluster_cls + mocker.patch.object( + _dask.importlib, + "import_module", + create_autospec=True, + return_value=mock_module, + ) + mock_cluster = mock_cluster_cls.return_value + if shutdown_timeout: + mock_cluster.close.side_effect = TimeoutError with _dask.get_distributed_client() as client: assert client is mock_client - _dask.Client.assert_called_with(**cfg["client"]) - mock_client.close.assert_called() + mock_client.close.assert_called_once_with() + _dask.Client.assert_called_once_with() + args = {k: v for k, v in slurm_cluster.items() if k != "type"} + mock_cluster_cls.assert_called_once_with(**args) + 
mock_cluster.close.assert_called() + assert mocker.call({"num_workers": 42}) in mock_dask_config_set.mock_calls + assert ( + mocker.call({"scheduler_address": mock_cluster.scheduler_address}) + in mock_dask_config_set.mock_calls + ) +# TODO: Remove in v2.14.0 @pytest.mark.parametrize("shutdown_timeout", [False, True]) -def test_get_distributed_client_slurm(mocker, tmp_path, shutdown_timeout): +def test_get_distributed_client_slurm_old( + mocker, tmp_path, mock_dask_config_set, shutdown_timeout +): cfg = { "cluster": { "type": "dask_jobqueue.SLURMCluster", @@ -70,10 +278,138 @@ def test_get_distributed_client_slurm(mocker, tmp_path, shutdown_timeout): mock_cluster = mock_cluster_cls.return_value if shutdown_timeout: mock_cluster.close.side_effect = TimeoutError - with _dask.get_distributed_client() as client: - assert client is mock_client - mock_client.close.assert_called() - _dask.Client.assert_called_with(address=mock_cluster.scheduler_address) + with pytest.warns(ESMValCoreDeprecationWarning): + with _dask.get_distributed_client() as client: + assert client is mock_client + mock_client.close.assert_called_once_with() + _dask.Client.assert_called_once_with() args = {k: v for k, v in cfg["cluster"].items() if k != "type"} - mock_cluster_cls.assert_called_with(**args) + mock_cluster_cls.assert_called_once_with(**args) mock_cluster.close.assert_called() + assert ( + mocker.call({"scheduler_address": mock_cluster.scheduler_address}) + in mock_dask_config_set.mock_calls + ) + + +def test_custom_default_scheduler(monkeypatch, mock_dask_config_set): + default_scheduler = {"num_workers": 42, "scheduler": "processes"} + monkeypatch.setitem( + CFG, + "dask", + { + "use": "process_scheduler", + "profiles": {"process_scheduler": default_scheduler}, + }, + ) + + with _dask.get_distributed_client() as client: + assert client is None + + mock_dask_config_set.assert_called_once_with( + {"num_workers": 42, "scheduler": "processes"} + ) + + +def 
test_invalid_dask_config_no_profiles(monkeypatch): + monkeypatch.setitem(CFG, "dask", {}) + + msg = "Key 'profiles' needs to be defined for 'dask' configuration" + with pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass + + +def test_invalid_dask_config_no_use(monkeypatch): + monkeypatch.setitem(CFG, "dask", {"profiles": {}}) + + msg = "Key 'use' needs to be defined for 'dask' configuration" + with pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass + + +def test_invalid_dask_config_invalid_profiles(monkeypatch): + monkeypatch.setitem(CFG, "dask", {"use": "test", "profiles": 1}) + + msg = "Key 'dask.profiles' needs to be a mapping, got" + with pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass + + +@pytest.mark.parametrize( + "address_name", ["scheduler_address", "scheduler-address"] +) +def test_invalid_dask_config_profile_with_cluster_and_address( + monkeypatch, address_name +): + monkeypatch.setitem( + CFG, + "dask", + { + "use": "test", + "profiles": { + "test": {"cluster": {}, address_name: "8786"}, + }, + }, + ) + + msg = "Key 'dask.profiles.test' uses 'cluster' and 'scheduler_address'" + with pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass + + +def test_invalid_dask_config_profile_invalid_cluster(monkeypatch): + monkeypatch.setitem( + CFG, + "dask", + { + "use": "test", + "profiles": { + "test": {"cluster": 1}, + }, + }, + ) + + msg = "Key 'dask.profiles.test.cluster' needs to be a mapping" + with pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass + + +def test_invalid_dask_config_cluster_no_type(monkeypatch): + monkeypatch.setitem( + CFG, + "dask", + { + "use": "test", + "profiles": { + "test": {"cluster": {}}, + }, + }, + ) + + msg = "Key 'dask.profiles.test.cluster' does not have a 'type'" + with 
pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass + + +def test_invalid_dask_config_invalid_use(monkeypatch): + monkeypatch.setitem( + CFG, + "dask", + { + "use": "not_in_profiles", + "profiles": { + "test": {}, + }, + }, + ) + + msg = "Key 'dask.use' needs to point to an element of 'dask.profiles'" + with pytest.raises(InvalidConfigParameter, match=msg): + with _dask.get_distributed_client(): + pass diff --git a/tests/unit/main/test_esmvaltool.py b/tests/unit/main/test_esmvaltool.py index 1e03bbe5b1..03985363d7 100644 --- a/tests/unit/main/test_esmvaltool.py +++ b/tests/unit/main/test_esmvaltool.py @@ -21,17 +21,23 @@ @pytest.fixture def cfg(mocker, tmp_path): """Mock `esmvalcore.config.CFG`.""" - cfg_dict = {"resume_from": []} + cfg_dict = { + "dask": { + "profiles": {"local_threaded": {"scheduler": "threads"}}, + "use": "local_threaded", + }, + "resume_from": [], + } cfg = mocker.MagicMock() cfg.__getitem__.side_effect = cfg_dict.__getitem__ cfg.__setitem__.side_effect = cfg_dict.__setitem__ - cfg.update.side_effect = cfg_dict.update + cfg.nested_update.side_effect = cfg_dict.update session = mocker.MagicMock() session.__getitem__.side_effect = cfg.__getitem__ session.__setitem__.side_effect = cfg.__setitem__ - session.update.side_effect = cfg.update + session.nested_update.side_effect = cfg.nested_update output_dir = tmp_path / "esmvaltool_output" session.session_dir = output_dir / "recipe_test"