Improvement of ensemble_percentile | Fix of #298 (#339)
* Improvement of ensemble_percentile | Fix of #298

* Modification of ensemble perc. tests

* Update HISTORY.rst and fix create_ensemble docstring

* Smart suggestion of codefactor

* Fix test coverage

* fix the fix

* Suggested modifications - docstring and typo

* Smarter rechunking

* Final merging and rearrangements

* Typo fix

Co-authored-by: Trevor James Smith <10819524+Zeitsperre@users.noreply.github.com>
aulemahal and Zeitsperre authored Jan 10, 2020
1 parent 0790e8c commit 76e3462
Showing 3 changed files with 89 additions and 127 deletions.
1 change: 1 addition & 0 deletions HISTORY.rst
@@ -10,6 +10,7 @@ History
* Fixed `ensembles.create_ensemble` errors for builds against xarray master branch.
* Reformatted code to make better use of Python3.6 conventions (f-strings and object signatures).
* Fixed randomly failing tests of `checks.missing_any`.
* Improvement of `ensembles.ensemble_percentiles` and `ensembles.create_ensemble`.

0.12.x-beta (2019-11-18)
------------------------
9 changes: 6 additions & 3 deletions tests/test_ensembles.py
@@ -137,10 +137,13 @@ def test_calc_perc(self, transpose):
out1 = ensembles.ensemble_percentiles(ens, values=(25, 75))
assert np.all(out1["tg_mean_p75"] > out1["tg_mean_p25"])

def test_calc_perc_blocks(self):
@pytest.mark.parametrize("keep_chunk_size", [False, True, None])
def test_calc_perc_dask(self, keep_chunk_size):
ens = ensembles.create_ensemble(self.nc_files_simple)
out1 = ensembles.ensemble_percentiles(ens)
out2 = ensembles.ensemble_percentiles(ens, values=(10, 50, 90), time_block=10)
out2 = ensembles.ensemble_percentiles(
ens.chunk({"time": 2}), values=(10, 50, 90), keep_chunk_size=keep_chunk_size
)
out1 = ensembles.ensemble_percentiles(ens.load())
np.testing.assert_array_equal(out1["tg_mean_p10"], out2["tg_mean_p10"])
np.testing.assert_array_equal(out1["tg_mean_p50"], out2["tg_mean_p50"])
np.testing.assert_array_equal(out1["tg_mean_p90"], out2["tg_mean_p90"])
206 changes: 82 additions & 124 deletions xclim/ensembles.py
@@ -36,6 +36,7 @@ def create_ensemble(
a new dimension (name: 'realization'). In the case where input files have unequal time dimensions, the output
ensemble Dataset is created for the maximum time-step interval of all input files. Before concatenation, datasets not
covering the entire time span have their data padded with NaN values.
Dataset and variable attributes of the first dataset are copied to the resulting dataset.
Parameters
----------
@@ -48,7 +48,7 @@
Only applicable when "datasets" is a sequence of file paths.
xr_kwargs :
Any keyword arguments to be given to xarray when opening the files.
Any keyword arguments to be given to `xr.open_dataset` when opening the files (or to `xr.open_mfdataset` if `mf_flag` is True).
Returns
-------
@@ -74,19 +74,16 @@
>>> datasets.append(glob.glob('/dir2/*.nc'))
>>> ens = ensembles.create_ensemble(datasets, mf_flag=True)
"""

dim = "realization"

time_flag, time_all = _ens_checktimes(datasets, mf_flag, **xr_kwargs)

ds1 = _ens_align_datasets(datasets, mf_flag, time_flag, time_all)
ds = _ens_align_datasets(datasets, mf_flag, time_flag, time_all, **xr_kwargs)

for v in list(ds1[0].data_vars):
list1 = [ds[v] for ds in ds1]
data = xr.concat(list1, dim=dim)
if v == list(ds1[0].data_vars)[0]:
ens = xr.Dataset(data_vars=None, coords=data.coords, attrs=ds1[0].attrs)
ens[v] = data
dim = xr.IndexVariable("realization", np.arange(len(ds)), attrs={"axis": "E"})

ens = xr.concat(ds, dim)
for vname, var in ds[0].variables.items():
ens[vname].attrs.update(**var.attrs)
ens.attrs.update(**ds[0].attrs)

return ens
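The rewritten body above replaces the per-variable loop with a single `xr.concat` along a labelled `realization` coordinate, then copies the attributes of the first dataset onto the result. A minimal sketch of that concatenation pattern on toy data (dataset contents hypothetical):

import numpy as np
import xarray as xr

# Three toy ensemble members sharing the same variable and time axis.
members = [xr.Dataset({"tas": (("time",), np.random.rand(5))}) for _ in range(3)]

# Label the new dimension; attrs={"axis": "E"} marks the ensemble axis,
# as in the code above.
dim = xr.IndexVariable("realization", np.arange(len(members)), attrs={"axis": "E"})
ens = xr.concat(members, dim)

assert ens.tas.dims == ("realization", "time")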

@@ -142,22 +140,23 @@ def ensemble_mean_std_max_min(ens: xr.Dataset) -> xr.Dataset:
def ensemble_percentiles(
ens: xr.Dataset,
values: Tuple[int, int, int] = (10, 50, 90),
time_block: Optional[int] = None,
keep_chunk_size: Optional[bool] = None,
) -> xr.Dataset:
"""Calculate ensemble statistics between a results from an ensemble of climate simulations.
Returns a Dataset containing ensemble statistics for input climate simulations.
Alternatively calculate ensemble percentiles (default) or ensemble mean and standard deviation.
Returns a Dataset containing ensemble percentiles for input climate simulations.
Parameters
----------
ens: xr.Dataset
Ensemble dataset (see xclim.ensembles.create_ensemble).
values : Tuple[int, int, int]
Percentile values to calculate. Default: (10, 50, 90).
time_block : Optional[int]
For large ensembles, iteratively calculate percentiles in time-step blocks (n==time_block).
If not defined, the function tries to estimate an appropriate value.
keep_chunk_size : Optional[bool]
For ensembles using dask arrays, all chunks along the 'realization' axis are merged.
If True, the dataset is rechunked along the dimension with the largest chunks, so that the chunks keep approximately the same size.
If False, no shrinking is performed, resulting in much larger chunks.
If not defined, the function decides which is best.
Returns
-------
@@ -177,34 +176,58 @@
Calculate non-default percentiles (25th and 75th)
>>> ens_percs = ensembles.ensemble_percentiles(ens, values=(25, 50, 75))
>>> print(ens_percs['tas_p25'])
Calculate by time blocks (n=10) if ensemble size is too large to load in memory
>>> ens_percs = ensembles.ensemble_percentiles(ens, time_block=10)
If the original array has many small chunks, it might be more efficient to do:
>>> ens_percs = ensembles.ensemble_percentiles(ens, keep_chunk_size=False)
>>> print(ens_percs['tas_p25'])
"""

ds_out = ens.drop_vars(names=set(ens.data_vars))
dims = list(ens.dims)
for v in ens.data_vars:
# Percentile calculation requires load to memory : automate size for large ensemble objects
if not time_block:
time_block = round(
2e8 / (ens[v].size / ens[v].shape[dims.index("time")]), -1
) # 2E8

if time_block > len(ens[v].time):
out = _calc_percentiles_simple(ens, v, values)

# Percentile calculation forbids any chunks along realization
if len(ens.chunks.get("realization", [])) > 1:
if keep_chunk_size is None:
# Enable smart rechunking if the chunk size exceeds 2e8 elements after merging along realization
keep_chunk_size = (
np.prod(ens[v].isel(realization=0).data.chunksize)
* ens.realization.size
> 2e8
)
if keep_chunk_size:
# Smart rechunk on dimension where chunks are the largest
chkDim, chks = max(
ens.chunks.items(),
key=lambda kv: 0 if kv[0] == "realization" else max(kv[1]),
)
var = ens[v].chunk(
{"realization": -1, chkDim: len(chks) * ens.realization.size,}
)
else:
var = ens[v].chunk({"realization": -1})
else:
# loop through blocks
warnings.warn(
f"Large ensemble size detected: statistics will be"
f" calculated in blocks of {int(time_block)} time-steps.",
UserWarning,
stacklevel=2,
var = ens[v]

for p in values:
perc = xr.apply_ufunc(
_calc_perc,
var,
input_core_dims=[["realization"]],
output_core_dims=[[]],
keep_attrs=True,
kwargs=dict(p=p),
dask="parallelized",
output_dtypes=[ens[v].dtype],
)
out = _calc_percentiles_blocks(ens, v, values, time_block)
for vv in out.data_vars:
ds_out[vv] = out[vv]

perc.name = f"{v}_p{p:02d}"
ds_out[perc.name] = perc

if "description" in ds_out[perc.name].attrs:
ds_out[perc.name].attrs[
"description"
] = f"{ds_out[perc.name].attrs['description']} : {p}th percentile of ensemble"
else:
ds_out[perc.name].attrs["description"] = f"{p}th percentile of ensemble"

return ds_out
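The loop above hands `_calc_perc` to `xr.apply_ufunc`, which applies it chunk-wise in parallel once `realization` is a single chunk (the rechunking branch above guarantees this). A stripped-down sketch of the same reduction pattern, with `np.median` standing in for `_calc_perc`:

import dask.array as da
import numpy as np
import xarray as xr

arr = xr.DataArray(
    da.random.random((4, 100), chunks=(1, 10)),
    dims=("realization", "time"),
)
# dask="parallelized" forbids chunking along the core dimension,
# hence the merge into a single chunk along 'realization'.
arr = arr.chunk({"realization": -1})

med = xr.apply_ufunc(
    np.median,                          # stand-in for _calc_perc
    arr,
    input_core_dims=[["realization"]],
    dask="parallelized",
    output_dtypes=[arr.dtype],
    kwargs={"axis": -1},                # core dims are moved to the last axis
)
assert med.dims == ("time",)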


@@ -341,98 +364,33 @@ def _ens_align_datasets(
return ds_all


def _calc_percentiles_simple(ens, v, values):
ds_out = ens.drop_vars(names=set(ens.data_vars))
dims = list(ens[v].dims)
outdims = [x for x in dims if "realization" not in x]

# print('loading ensemble data to memory')
arr = ens[v].load() # percentile calc requires loading the array
coords = {}
for c in outdims:
coords[c] = arr[c]
for p in values:
outvar = v + "_p" + str(p)

out1 = _calc_perc(arr, p)

ds_out[outvar] = xr.DataArray(out1, dims=outdims, coords=coords)
ds_out[outvar].attrs = ens[v].attrs
if "description" in ds_out[outvar].attrs.keys():
ds_out[outvar].attrs[
"description"
] = f"{ds_out[outvar].attrs['description']} : {p}th percentile of ensemble"
else:
ds_out[outvar].attrs["description"] = f"{p}th percentile of ensemble"
return ds_out


def _calc_percentiles_blocks(ens, v, values, time_block):
ds_out = ens.drop_vars(names=set(ens.data_vars))
dims = list(ens[v].dims)
outdims = [x for x in dims if "realization" not in x]

blocks = list(range(0, len(ens.time) + 1, int(time_block)))
if blocks[-1] != len(ens[v].time):
blocks.append(len(ens[v].time))
arr_p_all = {}
for t in range(0, len(blocks) - 1):
# print('Calculating block ', t + 1, ' of ', len(blocks) - 1)
time_sel = slice(blocks[t], blocks[t + 1])
arr = (
ens[v].isel(time=time_sel).load()
) # percentile calc requires loading the array
coords = {}
for c in outdims:
coords[c] = arr[c]
for p in values:

out1 = _calc_perc(arr, p)

if t == 0:
arr_p_all[str(p)] = xr.DataArray(out1, dims=outdims, coords=coords)
else:
arr_p_all[str(p)] = xr.concat(
[
arr_p_all[str(p)],
xr.DataArray(out1, dims=outdims, coords=coords),
],
dim="time",
)
for p in values:
outvar = v + "_p" + str(p)
ds_out[outvar] = arr_p_all[str(p)]
ds_out[outvar].attrs = ens[v].attrs
if "description" in ds_out[outvar].attrs.keys():
ds_out[outvar].attrs[
"description"
] = f"{ds_out[outvar].attrs['description']} : {p}th percentile of ensemble"
def _calc_perc(arr, p=50):
"""Ufunc-like computing a percentile over the last axis of the array.
else:
ds_out[outvar].attrs["description"] = f"{p}th percentile of ensemble"
Processes cases with invalid values separately, which makes it more efficient than np.nanpercentile for arrays with only a few invalid points.
return ds_out


def _calc_perc(arr, p):
dims = arr.dims
# make sure realization is the first dimension
if dims.index("realization") != 0:
arr = arr.transpose(
"realization", *[dim for dim in dims if dim != "realization"]
)
Parameters
----------
arr : np.ndarray
Percentile is computed over the last axis.
p : scalar
Percentile to compute, between 0 and 100 (default: 50).
nan_count = np.isnan(arr).sum(axis=0)
out = np.percentile(arr, p, axis=0)
if np.any((nan_count > 0) & (nan_count < arr.shape[0])):
arr1 = arr.values.reshape(arr.shape[0], int(arr.size / arr.shape[0]))
# only use nanpercentile where we need it (slow performace compared to standard) :
nan_index = np.where((nan_count > 0) & (nan_count < arr.shape[0]))
Returns
-------
np.ndarray
"""
nan_count = np.isnan(arr).sum(axis=-1)
out = np.percentile(arr, p, axis=-1)
nans = (nan_count > 0) & (nan_count < arr.shape[-1])
if np.any(nans):
arr1 = arr.reshape(int(arr.size / arr.shape[-1]), arr.shape[-1])
# only use nanpercentile where we need it (slow performance compared to standard) :
nan_index = np.where(nans)
t = np.ravel_multi_index(nan_index, nan_count.shape)
out[np.unravel_index(t, nan_count.shape)] = np.nanpercentile(
arr1[:, t], p, axis=0
arr1[t, :], p, axis=-1
)

return out
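The branch above keeps the common all-valid case on the fast `np.percentile` path and falls back to `np.nanpercentile` only for rows that mix valid and invalid points. A tiny worked illustration with hypothetical values:

import numpy as np

arr = np.array(
    [
        [1.0, 2.0, 3.0],             # fully valid: fast-path result kept
        [1.0, np.nan, 3.0],          # mixed: recomputed with nanpercentile
        [np.nan, np.nan, np.nan],    # all-NaN: stays NaN
    ]
)

out = np.percentile(arr, 50, axis=-1)        # -> [2., nan, nan]
nan_count = np.isnan(arr).sum(axis=-1)       # -> [0, 1, 3]
mixed = (nan_count > 0) & (nan_count < arr.shape[-1])
out[mixed] = np.nanpercentile(arr[mixed], 50, axis=-1)
print(out)                                   # -> [2. 2. nan]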


