Skip to content

Commit

Permalink
Optimize functions mask_landsea(), mask_landseaice() and `calcula…
Browse files Browse the repository at this point in the history
…te_volume()` for lazy input (#2515)

Co-authored-by: Bouwe Andela <b.andela@esciencecenter.nl>
  • Loading branch information
schlunma and bouweandela authored Sep 4, 2024
1 parent a35d50d commit 2247a29
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 129 deletions.
153 changes: 91 additions & 62 deletions esmvalcore/preprocessor/_mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,33 @@

import logging
import os
from collections.abc import Iterable
from typing import Literal, Optional

import cartopy.io.shapereader as shpreader
import dask.array as da
import iris
import iris.util
import numpy as np
import shapely.vectorized as shp_vect
from iris.analysis import Aggregator
from iris.cube import Cube
from iris.util import rolling_window

from esmvalcore.preprocessor._shared import get_array_module

from ._supplementary_vars import register_supplementaries

logger = logging.getLogger(__name__)


def _get_fx_mask(fx_data, fx_option, mask_type):
def _get_fx_mask(
fx_data: np.ndarray | da.Array,
fx_option: Literal['land', 'sea', 'landsea', 'ice'],
mask_type: Literal['sftlf', 'sftof', 'sftgif'],
) -> np.ndarray | da.Array:
"""Build a percentage-thresholded mask from an fx file."""
inmask = da.zeros_like(fx_data, bool)
inmask = np.zeros_like(fx_data, bool) # respects dask through dispatch
if mask_type == 'sftlf':
if fx_option == 'land':
# Mask land out
Expand All @@ -50,22 +60,29 @@ def _get_fx_mask(fx_data, fx_option, mask_type):
return inmask


def _apply_fx_mask(fx_mask, var_data):
"""Apply the fx data extracted mask on the actual processed data."""
# Apply mask across
old_mask = da.ma.getmaskarray(var_data)
mask = old_mask | fx_mask
var_data = da.ma.masked_array(var_data, mask=mask)
# maybe fill_value=1e+20

return var_data
def _apply_mask(
mask: np.ndarray | da.Array,
array: np.ndarray | da.Array,
dim_map: Optional[Iterable[int]] = None,
) -> np.ndarray | da.Array:
"""Apply a (broadcasted) mask on an array."""
npx = get_array_module(mask, array)
if dim_map is not None:
if isinstance(array, da.Array):
chunks = array.chunks
else:
chunks = None
mask = iris.util.broadcast_to_shape(
mask, array.shape, dim_map, chunks=chunks
)
return npx.ma.masked_where(mask, array)


@register_supplementaries(
variables=['sftlf', 'sftof'],
required='prefer_at_least_one',
)
def mask_landsea(cube, mask_out):
def mask_landsea(cube: Cube, mask_out: Literal['land', 'sea']) -> Cube:
"""Mask out either land mass or sea (oceans, seas and lakes).
It uses dedicated ancillary variables (sftlf or sftof) or,
Expand All @@ -78,16 +95,15 @@ def mask_landsea(cube, mask_out):
Parameters
----------
cube: iris.cube.Cube
data cube to be masked. If the cube has an
cube:
Data cube to be masked. If the cube has an
:class:`iris.coords.AncillaryVariable` with standard name
``'land_area_fraction'`` or ``'sea_area_fraction'`` that will be used.
If both are present, only the 'land_area_fraction' will be used. If the
ancillary variable is not available, the mask will be calculated from
Natural Earth shapefiles.
mask_out: str
either "land" to mask out land mass or "sea" to mask out seas.
mask_out:
Either ``'land'`` to mask out land mass or ``'sea'`` to mask out seas.
Returns
-------
Expand All @@ -112,35 +128,40 @@ def mask_landsea(cube, mask_out):
}

# preserve importance order: try stflf first then sftof
fx_cube = None
ancillary_var = None
try:
fx_cube = cube.ancillary_variable('land_area_fraction')
ancillary_var = cube.ancillary_variable('land_area_fraction')
except iris.exceptions.AncillaryVariableNotFoundError:
try:
fx_cube = cube.ancillary_variable('sea_area_fraction')
ancillary_var = cube.ancillary_variable('sea_area_fraction')
except iris.exceptions.AncillaryVariableNotFoundError:
logger.debug('Ancillary variables land/sea area fraction not '
'found in cube. Check fx_file availability.')

if fx_cube:
fx_cube_data = da.broadcast_to(fx_cube.core_data(), cube.shape)
landsea_mask = _get_fx_mask(fx_cube_data, mask_out,
fx_cube.var_name)
cube.data = _apply_fx_mask(landsea_mask, cube.core_data())
logger.debug("Applying land-sea mask: %s", fx_cube.var_name)
logger.debug(
"Ancillary variables land/sea area fraction not found in "
"cube. Check fx_file availability."
)

if ancillary_var:
landsea_mask = _get_fx_mask(
ancillary_var.core_data(), mask_out, ancillary_var.var_name
)
cube.data = _apply_mask(
landsea_mask,
cube.core_data(),
cube.ancillary_variable_dims(ancillary_var),
)
logger.debug("Applying land-sea mask: %s", ancillary_var.var_name)
else:
if cube.coord('longitude').points.ndim < 2:
cube = _mask_with_shp(cube, shapefiles[mask_out], [
0,
])
cube = _mask_with_shp(cube, shapefiles[mask_out], [0])
logger.debug(
"Applying land-sea mask from Natural Earth shapefile: \n%s",
shapefiles[mask_out],
)
else:
msg = ("Use of shapefiles with irregular grids not yet "
"implemented, land-sea mask not applied.")
raise ValueError(msg)
raise ValueError(
"Use of shapefiles with irregular grids not yet implemented, "
"land-sea mask not applied."
)

return cube

Expand All @@ -149,7 +170,7 @@ def mask_landsea(cube, mask_out):
variables=['sftgif'],
required='require_at_least_one',
)
def mask_landseaice(cube, mask_out):
def mask_landseaice(cube: Cube, mask_out: Literal['landsea', 'ice']) -> Cube:
"""Mask out either landsea (combined) or ice.
Function that masks out either landsea (land and seas) or ice (Antarctica,
Expand All @@ -159,13 +180,13 @@ def mask_landseaice(cube, mask_out):
Parameters
----------
cube: iris.cube.Cube
data cube to be masked. It should have an
cube:
Data cube to be masked. It should have an
:class:`iris.coords.AncillaryVariable` with standard name
``'land_ice_area_fraction'``.
mask_out: str
either "landsea" to mask out landsea or "ice" to mask out ice.
Either ``'landsea'`` to mask out land and oceans or ``'ice'`` to mask
out ice.
Returns
-------
Expand All @@ -178,20 +199,26 @@ def mask_landseaice(cube, mask_out):
Error raised if landsea-ice mask not found as an ancillary variable.
"""
# sftgif is the only one so far but users can set others
fx_cube = None
ancillary_var = None
try:
fx_cube = cube.ancillary_variable('land_ice_area_fraction')
ancillary_var = cube.ancillary_variable('land_ice_area_fraction')
except iris.exceptions.AncillaryVariableNotFoundError:
logger.debug('Ancillary variable land ice area fraction '
'not found in cube. Check fx_file availability.')
if fx_cube:
fx_cube_data = da.broadcast_to(fx_cube.core_data(), cube.shape)
landice_mask = _get_fx_mask(fx_cube_data, mask_out, fx_cube.var_name)
cube.data = _apply_fx_mask(landice_mask, cube.core_data())
logger.debug(
"Ancillary variable land ice area fraction not found in cube. "
"Check fx_file availability."
)
if ancillary_var:
landseaice_mask = _get_fx_mask(
ancillary_var.core_data(), mask_out, ancillary_var.var_name
)
cube.data = _apply_mask(
landseaice_mask,
cube.core_data(),
cube.ancillary_variable_dims(ancillary_var),
)
logger.debug("Applying landsea-ice mask: sftgif")
else:
msg = "Landsea-ice mask could not be found. Stopping. "
raise ValueError(msg)
raise ValueError("Landsea-ice mask could not be found. Stopping.")

return cube

Expand Down Expand Up @@ -285,9 +312,10 @@ def _mask_with_shp(cube, shapefilename, region_indices=None):
# Create a set of x,y points from the cube
# 1D regular grids
if cube.coord('longitude').points.ndim < 2:
x_p, y_p = da.meshgrid(
x_p, y_p = np.meshgrid(
cube.coord(axis='X').points,
cube.coord(axis='Y').points)
cube.coord(axis='Y').points,
)
# 2D irregular grids; spit an error for now
else:
msg = ("No fx-files found (sftlf or sftof)!"
Expand All @@ -296,14 +324,14 @@ def _mask_with_shp(cube, shapefilename, region_indices=None):
raise ValueError(msg)

# Wrap around longitude coordinate to match data
x_p_180 = da.where(x_p >= 180., x_p - 360., x_p)
x_p_180 = np.where(x_p >= 180., x_p - 360., x_p)

# the NE mask has no points at x = -180 and y = +/-90
# so we will fool it and apply the mask at (-179, -89, 89) instead
x_p_180 = da.where(x_p_180 == -180., x_p_180 + 1., x_p_180)
x_p_180 = np.where(x_p_180 == -180., x_p_180 + 1., x_p_180)

y_p_0 = da.where(y_p == -90., y_p + 1., y_p)
y_p_90 = da.where(y_p_0 == 90., y_p_0 - 1., y_p_0)
y_p_0 = np.where(y_p == -90., y_p + 1., y_p)
y_p_90 = np.where(y_p_0 == 90., y_p_0 - 1., y_p_0)

mask = None
for region in regions:
Expand All @@ -313,13 +341,14 @@ def _mask_with_shp(cube, shapefilename, region_indices=None):
else:
mask |= shp_vect.contains(region, x_p_180, y_p_90)

mask = da.array(mask)
iris.util.broadcast_to_shape(mask, cube.shape, cube.coord_dims('latitude')
+ cube.coord_dims('longitude'))
if cube.has_lazy_data():
mask = da.array(mask)

old_mask = da.ma.getmaskarray(cube.core_data())
mask = old_mask | mask
cube.data = da.ma.masked_array(cube.core_data(), mask=mask)
cube.data = _apply_mask(
mask,
cube.core_data(),
cube.coord_dims('latitude') + cube.coord_dims('longitude'),
)

return cube

Expand Down
14 changes: 11 additions & 3 deletions esmvalcore/preprocessor/_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,11 @@ def get_weights(

# Time weights: lengths of time interval
if 'time' in coords:
weights *= broadcast_to_shape(
weights = weights * broadcast_to_shape(
npx.array(get_time_weights(cube)),
cube.shape,
cube.coord_dims('time'),
chunks=cube.lazy_data().chunks if cube.has_lazy_data() else None,
)

# Latitude weights: cell areas
Expand All @@ -350,10 +351,17 @@ def get_weights(
f"variable)"
)
try_adding_calculated_cell_area(cube)
weights *= broadcast_to_shape(
cube.cell_measure('cell_area').core_data(),
area_weights = cube.cell_measure('cell_area').core_data()
if cube.has_lazy_data():
area_weights = da.array(area_weights)
chunks = cube.lazy_data().chunks
else:
chunks = None
weights = weights * broadcast_to_shape(
area_weights,
cube.shape,
cube.cell_measure_dims('cell_area'),
chunks=chunks,
)

return weights
Expand Down
11 changes: 7 additions & 4 deletions esmvalcore/preprocessor/_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,22 @@ def calculate_volume(cube: Cube) -> da.core.Array:
try_adding_calculated_cell_area(cube)
area = cube.cell_measure('cell_area').copy()
area_dim = cube.cell_measure_dims(area)

# Ensure cell area is in square meters as the units
area.convert_units('m2')
area_array = area.core_data()
if cube.has_lazy_data():
area_array = da.array(area_array)

# Make sure input cube has not been modified
if not has_cell_measure:
cube.remove_cell_measure('cell_area')

chunks = cube.core_data().chunks if cube.has_lazy_data() else None
area_arr = broadcast_to_shape(
area.core_data(), cube.shape, area_dim, chunks=chunks)
area_array, cube.shape, area_dim, chunks=chunks
)
thickness_arr = broadcast_to_shape(
thickness, cube.shape, z_dim, chunks=chunks)
thickness, cube.shape, z_dim, chunks=chunks
)
grid_volume = area_arr * thickness_arr

return grid_volume
Expand Down
Loading

0 comments on commit 2247a29

Please sign in to comment.