Make Dask configurable in our configuration (#2616)
Co-authored-by: Bouwe Andela <b.andela@esciencecenter.nl>
schlunma and bouweandela authored Dec 20, 2024
1 parent 90d11a2 commit 23400b1
Showing 22 changed files with 890 additions and 255 deletions.
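
For context, the new top-level `dask` configuration option introduced by this commit groups Dask settings into named profiles plus a `use` key that selects one of them. The sketch below shows the shape that `validate_dask_config` (added in esmvalcore/config/_dask.py further down) expects; the profile names and cluster settings are illustrative and not taken from the commit itself.

    # Illustrative sketch only: profile names and cluster options are made up,
    # but the structure matches what validate_dask_config() checks below.
    dask_config = {
        # "use" must name one of the entries under "profiles".
        "use": "local_cluster",
        "profiles": {
            "local_cluster": {
                # A "cluster" mapping must at least provide a "type".
                "cluster": {
                    "type": "distributed.LocalCluster",
                    "n_workers": 2,
                },
            },
            "local_threaded": {
                # Plain Dask scheduler settings, no managed cluster.
                "scheduler": "threads",
            },
        },
    }
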
2 changes: 1 addition & 1 deletion doc/conf.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# ESMValTool documentation build configuration file, created by
# sphinx-quickstart on Tue Jun 2 11:34:13 2015.
# sphinx-quickstart on Tue Jun 2 11:34:13 2015.
#
# This file is execfile()d with the current directory set to its
# containing dir.
2 changes: 1 addition & 1 deletion doc/contributing.rst
@@ -158,7 +158,7 @@ These include in particular:
branch. If a strong objection is raised the backward-incompatible
change should not be merged until the objection is resolved.
- 🛠 Information required for the “*backward-incompatible changes*”
section in the PR that introduces the *backward-incompatible change*
section in the PR that introduces the *backward-incompatible change*
available.

.. _scientific_relevance:
426 changes: 254 additions & 172 deletions doc/quickstart/configure.rst

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion doc/quickstart/output.rst
@@ -150,6 +150,6 @@ Here is an example metadata.yml file:
As you can see, this is effectively a dictionary with several items including
data paths, metadata and other information.

There are several tools available in python which are built to read and parse
There are several tools available in python which are built to read and parse
these files. The tools are available in the shared directory in the diagnostics
directory.
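
As a minimal sketch of what such a parsing step can look like (the file path is hypothetical, and this is not one of the shared diagnostic tools mentioned above):

    # Read a metadata.yml file into a plain dictionary with PyYAML.
    import yaml

    with open("run/diagnostic/script/metadata.yml", encoding="utf-8") as file:
        metadata = yaml.safe_load(file)
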
8 changes: 4 additions & 4 deletions esmvalcore/_main.py
Expand Up @@ -84,7 +84,6 @@ def process_recipe(recipe_file: Path, session):
import shutil

from esmvalcore._recipe.recipe import read_recipe_file
from esmvalcore.config._dask import check_distributed_config

if not recipe_file.is_file():
import errno
@@ -121,8 +120,6 @@ def process_recipe(recipe_file: Path, session):
"'max_parallel_tasks' in your configuration."
)

check_distributed_config()

if session["compress_netcdf"]:
logger.warning(
"You have enabled NetCDF compression. Accessing .nc files can be "
@@ -399,6 +396,7 @@ def run(self, recipe, **kwargs):
"""
from .config import CFG
from .config._dask import warn_if_old_dask_config_exists
from .exceptions import InvalidConfigParameter

cli_config_dir = kwargs.pop("config_dir", None)
@@ -439,7 +437,7 @@

recipe = self._get_recipe(recipe)

CFG.update(kwargs)
CFG.nested_update(kwargs)
CFG["resume_from"] = parse_resume(CFG["resume_from"], recipe)
session = CFG.start_session(recipe.stem)

@@ -455,6 +453,8 @@
if cli_config_dir is not None:
CFG.update_from_dirs([cli_config_dir])

warn_if_old_dask_config_exists()

@staticmethod
def _create_session_dir(session):
"""Create `session.session_dir` or an alternative if it exists."""
3 changes: 3 additions & 0 deletions esmvalcore/_recipe/recipe.py
@@ -18,6 +18,7 @@
from esmvalcore._provenance import get_recipe_provenance
from esmvalcore._task import DiagnosticTask, ResumeTask, TaskSet
from esmvalcore.config._config import TASKSEP
from esmvalcore.config._dask import validate_dask_config
from esmvalcore.config._diagnostics import TAGS
from esmvalcore.dataset import Dataset
from esmvalcore.exceptions import InputFilesNotFound, RecipeError
@@ -786,6 +787,8 @@ class Recipe:

def __init__(self, raw_recipe, session, recipe_file: Path):
"""Parse a recipe file into an object."""
validate_dask_config(session["dask"])

# Clear the global variable containing the set of files to download
DOWNLOAD_FILES.clear()
USED_DATASETS.clear()
26 changes: 23 additions & 3 deletions esmvalcore/config/_config_object.py
@@ -5,7 +5,7 @@
import os
import sys
import warnings
from collections.abc import Iterable
from collections.abc import Iterable, Mapping
from datetime import datetime
from pathlib import Path
from typing import Optional
@@ -464,9 +464,29 @@ def update_from_dirs(self, dirs: Iterable[str | Path]) -> None:
"""
new_config_dict = self._get_config_dict_from_dirs(dirs)
merged_config_dict = dask.config.merge(self, new_config_dict)
self.update(merged_config_dict)
self.nested_update(new_config_dict)

def nested_update(self, new_options: Mapping) -> None:
"""Nested update of configuration object with another mapping.

Merge the existing configuration object with a new mapping using
:func:`dask.config.merge` (new values are preferred over old values).
Nested objects are properly considered; see :func:`dask.config.update`
for details.

Parameters
----------
new_options:
    New configuration options.

Raises
------
esmvalcore.exceptions.InvalidConfigParameter
    Invalid configuration option given.
"""
merged_config_dict = dask.config.merge(self, new_options)
self.update(merged_config_dict)
self.check_missing()
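
As a small illustration of the nested-merge behaviour that `nested_update` relies on (the values below are made up):

    # dask.config.merge recurses into nested mappings and prefers values from
    # later arguments, so sibling keys survive the update.
    import dask.config

    old = {"dask": {"use": "local", "profiles": {"local": {"scheduler": "threads"}}}}
    new = {"dask": {"use": "distributed"}}
    merged = dask.config.merge(old, new)
    # merged["dask"]["use"] == "distributed"
    # merged["dask"]["profiles"] is still present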


1 change: 1 addition & 0 deletions esmvalcore/config/_config_validators.py
@@ -326,6 +326,7 @@ def validate_extra_facets_dir(value):
"check_level": validate_check_level,
"compress_netcdf": validate_bool,
"config_developer_file": validate_config_developer,
"dask": validate_dict,
"diagnostics": validate_diagnostics,
"download_dir": validate_path,
"drs": validate_drs,
234 changes: 185 additions & 49 deletions esmvalcore/config/_dask.py
@@ -3,76 +3,212 @@
import contextlib
import importlib
import logging
import os
import warnings
from collections.abc import Generator, Mapping
from copy import deepcopy
from pathlib import Path
from typing import Any

import dask.config
import yaml
from distributed import Client
from distributed.deploy import Cluster

from esmvalcore.config import CFG
from esmvalcore.exceptions import (
ESMValCoreDeprecationWarning,
InvalidConfigParameter,
)

logger = logging.getLogger(__name__)

# TODO: Remove in v2.14.0
CONFIG_FILE = Path.home() / ".esmvaltool" / "dask.yml"


def check_distributed_config():
"""Check the Dask distributed configuration."""
if not CONFIG_FILE.exists():
logger.warning(
"Using the Dask basic scheduler. This may lead to slow "
"computations and out-of-memory errors. "
"Note that the basic scheduler may still be the best choice for "
"preprocessor functions that are not lazy. "
"In that case, you can safely ignore this warning. "
"See https://docs.esmvaltool.org/projects/ESMValCore/en/latest/"
"quickstart/configure.html#dask-distributed-configuration for "
"more information. "
# TODO: Remove in v2.14.0
def warn_if_old_dask_config_exists() -> None:
"""Warn user if deprecated dask configuration file exists."""
if CONFIG_FILE.exists() and not os.environ.get(
"ESMVALTOOL_USE_NEW_DASK_CONFIG"
):
deprecation_msg = (
"Usage of Dask configuration file ~/.esmvaltool/dask.yml "
"has been deprecated in ESMValCore version 2.12.0 and is "
"scheduled for removal in version 2.14.0. Please use the "
"configuration option `dask` instead (see "
"https://docs.esmvaltool.org/projects/ESMValCore/en/latest/"
"quickstart/configure.html#dask-configuration for details). "
"Ignoring all existing `dask` configuration options for this run. "
"To enable the new `dask` configuration options, delete or move "
"the file ~/.esmvaltool/dask.yml or set the environment variable "
"ESMVALTOOL_USE_NEW_DASK_CONFIG=1."
)
warnings.warn(
deprecation_msg, ESMValCoreDeprecationWarning, stacklevel=2
)


def validate_dask_config(dask_config: Mapping) -> None:
"""Validate dask configuration options."""
for option in ("profiles", "use"):
if option not in dask_config:
raise InvalidConfigParameter(
f"Key '{option}' needs to be defined for 'dask' configuration"
)
profiles = dask_config["profiles"]
use = dask_config["use"]
if not isinstance(profiles, Mapping):
raise InvalidConfigParameter(
f"Key 'dask.profiles' needs to be a mapping, got "
f"{type(profiles)}"
)
for profile, profile_cfg in profiles.items():
has_scheduler_address = any(
[
"scheduler_address" in profile_cfg,
"scheduler-address" in profile_cfg,
]
)
if "cluster" in profile_cfg and has_scheduler_address:
raise InvalidConfigParameter(
f"Key 'dask.profiles.{profile}' uses 'cluster' and "
f"'scheduler_address', can only have one of those"
)
if "cluster" in profile_cfg:
cluster = profile_cfg["cluster"]
if not isinstance(cluster, Mapping):
raise InvalidConfigParameter(
f"Key 'dask.profiles.{profile}.cluster' needs to be a "
f"mapping, got {type(cluster)}"
)
if "type" not in cluster:
raise InvalidConfigParameter(
f"Key 'dask.profiles.{profile}.cluster' does not have a "
f"'type'"
)
if use not in profiles:
raise InvalidConfigParameter(
f"Key 'dask.use' needs to point to an element of 'dask.profiles'; "
f"got '{use}', expected one of {list(profiles.keys())}"
)
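
A usage sketch of the validator above, assuming ESMValCore with this change applied (the profile contents are made up):

    # A profile that defines both an inline "cluster" and a "scheduler_address"
    # is rejected by validate_dask_config().
    from esmvalcore.config._dask import validate_dask_config
    from esmvalcore.exceptions import InvalidConfigParameter

    bad_config = {
        "use": "conflicting",
        "profiles": {
            "conflicting": {
                "cluster": {"type": "distributed.LocalCluster"},
                "scheduler_address": "tcp://127.0.0.1:8786",
            },
        },
    }
    try:
        validate_dask_config(bad_config)
    except InvalidConfigParameter as exc:
        print(exc)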


# TODO: Remove in v2.14.0
def _get_old_dask_config() -> dict:
"""Get dask configuration dict from old dask configuration file."""
dask_config: dict[str, Any] = {
"use": "local_threaded",
"profiles": {"local_threaded": {"scheduler": "threads"}},
}
config = yaml.safe_load(CONFIG_FILE.read_text(encoding="utf-8"))

# Use settings from file if this is not empty
if config is not None:
client_kwargs = config.get("client", {})
cluster_kwargs = config.get("cluster", {})

# Externally managed cluster
if "address" in client_kwargs:
if cluster_kwargs:
logger.warning(
"Not using Dask 'cluster' settings from %s because a "
"cluster 'address' is already provided in 'client'.",
CONFIG_FILE,
)
dask_config = {
"use": "external",
"profiles": {
"external": {
"scheduler_address": client_kwargs.pop("address"),
},
},
}

# Dask distributed cluster
elif cluster_kwargs:
cluster_kwargs.setdefault("type", "distributed.LocalCluster")
dask_config = {
"use": "cluster_from_file",
"profiles": {
"cluster_from_file": {
"cluster": cluster_kwargs,
},
},
}

dask_config["client"] = client_kwargs

return dask_config
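
For reference, a sketch of the translation performed by `_get_old_dask_config` above: an old ~/.esmvaltool/dask.yml that only configures a cluster becomes a single "cluster_from_file" profile (the worker settings are illustrative):

    # Hypothetical contents of the old file after yaml.safe_load:
    old_file = {"cluster": {"n_workers": 4, "memory_limit": "8GiB"}}
    # Equivalent new-style configuration returned by _get_old_dask_config(),
    # with the cluster type defaulting to distributed.LocalCluster:
    new_style = {
        "use": "cluster_from_file",
        "profiles": {
            "cluster_from_file": {
                "cluster": {
                    "n_workers": 4,
                    "memory_limit": "8GiB",
                    "type": "distributed.LocalCluster",
                },
            },
        },
        "client": {},  # empty because the old file had no "client" section
    }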


# TODO: Remove in v2.14.0; use deepcopy(CFG["dask"]) instead
def _get_dask_config() -> dict:
"""Get Dask configuration dictionary."""
if CONFIG_FILE.exists() and not os.environ.get(
"ESMVALTOOL_USE_NEW_DASK_CONFIG"
):
dask_config = _get_old_dask_config()
else:
dask_config = deepcopy(CFG["dask"])
return dask_config


@contextlib.contextmanager
def get_distributed_client():
def get_distributed_client() -> Generator[None | Client]:
"""Get a Dask distributed client."""
dask_args = {}
if CONFIG_FILE.exists():
config = yaml.safe_load(CONFIG_FILE.read_text(encoding="utf-8"))
if config is not None:
dask_args = config

client_args = dask_args.get("client") or {}
cluster_args = dask_args.get("cluster") or {}

# Start a cluster, if requested
if "address" in client_args:
# Use an externally managed cluster.
warn_if_old_dask_config_exists()
dask_config = _get_dask_config()
validate_dask_config(dask_config)

# TODO: Remove in v2.14.0
client_kwargs = dask_config.get("client", {})

# Set up cluster and client according to the selected profile
# Note: we already ensured earlier that the selected profile (via `use`)
# actually exists in `profiles`, so we don't have to check that again here
logger.debug("Using Dask profile '%s'", dask_config["use"])
profile = dask_config["profiles"][dask_config["use"]]
cluster_kwargs = profile.pop("cluster", None)

logger.debug("Using additional Dask settings %s", profile)
dask.config.set(profile)

cluster: None | Cluster
client: None | Client

# Threaded scheduler
if cluster_kwargs is None:
cluster = None
if cluster_args:
logger.warning(
"Not using Dask 'cluster' settings from %s because a cluster "
"'address' is already provided in 'client'.",
CONFIG_FILE,
)
elif cluster_args:
# Start cluster.
cluster_type = cluster_args.pop(
"type",
"distributed.LocalCluster",
)

# Distributed scheduler
else:
cluster_type = cluster_kwargs.pop("type")
cluster_module_name, cluster_cls_name = cluster_type.rsplit(".", 1)
cluster_module = importlib.import_module(cluster_module_name)
cluster_cls = getattr(cluster_module, cluster_cls_name)
cluster = cluster_cls(**cluster_args)
client_args["address"] = cluster.scheduler_address
else:
# No cluster configured, use Dask basic scheduler, or a LocalCluster
# managed through Client.
cluster = None
cluster = cluster_cls(**cluster_kwargs)
dask.config.set({"scheduler_address": cluster.scheduler_address})
logger.debug("Using Dask cluster %s", cluster)

# Start a client, if requested
if dask_args:
client = Client(**client_args)
logger.info("Dask dashboard: %s", client.dashboard_link)
else:
logger.info("Using the Dask basic scheduler.")
if dask.config.get("scheduler_address", None) is None:
client = None
logger.info(
"Using Dask threaded scheduler. The distributed scheduler is "
"recommended, please read https://docs.esmvaltool.org/projects/"
"ESMValCore/en/latest/quickstart/"
"configure.html#dask-configuration how to use a distributed "
"scheduler."
)
else:
client = Client(**client_kwargs)
logger.info(
"Using Dask distributed scheduler (address: %s, dashboard link: "
"%s)",
dask.config.get("scheduler_address"),
client.dashboard_link,
)

try:
yield client

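Finally, a usage sketch of the reworked context manager: depending on the selected profile it yields either None (threaded scheduler) or a distributed.Client connected to the configured cluster.

    # Sketch of how callers use the internal helper; the yielded client is
    # None for the threaded scheduler and a distributed.Client otherwise.
    from esmvalcore.config._dask import get_distributed_client

    with get_distributed_client() as client:
        if client is None:
            print("Running with the Dask threaded scheduler")
        else:
            print("Dask dashboard:", client.dashboard_link)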