From 76e34628b59ccb25c7db3b9c1b3c87f250b8ff76 Mon Sep 17 00:00:00 2001
From: Pascal Bourgault
Date: Fri, 10 Jan 2020 14:34:04 -0500
Subject: [PATCH] Improvement of ensemble_percentile | Fix of #298 (#339)

* Improvement of ensemble_percentile | Fix of #298

* Modification of ensemble perc. tests

* Update HISTORY.rst and fix create_ensemble docstring

* Smart suggestion of codefactor

* Fix test coverage

* fix the fix

* Suggested modifications - docstring and typo

* Smarter rechunking

* Final merging and rearrangements

* Typo fix

Co-authored-by: Trevor James Smith <10819524+Zeitsperre@users.noreply.github.com>
---
 HISTORY.rst             |   1 +
 tests/test_ensembles.py |   9 +-
 xclim/ensembles.py      | 206 ++++++++++++++++------------------
 3 files changed, 89 insertions(+), 127 deletions(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index 2da529847..50dd6f926 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -10,6 +10,7 @@ History
 * Fixed `ensembles.create_ensemble` errors for builds against xarray master branch.
 * Reformatted code to make better use of Python3.6 conventions (f-strings and object signatures).
 * Fixed randomly failing tests of `checks.missing_any`.
+* Improved `ensembles.ensemble_percentiles` and `ensembles.create_ensemble`.
 
 0.12.x-beta (2019-11-18)
 ------------------------
 
diff --git a/tests/test_ensembles.py b/tests/test_ensembles.py
index 60f94f58e..34aa4aa72 100644
--- a/tests/test_ensembles.py
+++ b/tests/test_ensembles.py
@@ -137,10 +137,13 @@ def test_calc_perc(self, transpose):
         out1 = ensembles.ensemble_percentiles(ens, values=(25, 75))
         assert np.all(out1["tg_mean_p75"] > out1["tg_mean_p25"])
 
-    def test_calc_perc_blocks(self):
+    @pytest.mark.parametrize("keep_chunk_size", [False, True, None])
+    def test_calc_perc_dask(self, keep_chunk_size):
         ens = ensembles.create_ensemble(self.nc_files_simple)
-        out1 = ensembles.ensemble_percentiles(ens)
-        out2 = ensembles.ensemble_percentiles(ens, values=(10, 50, 90), time_block=10)
+        out2 = ensembles.ensemble_percentiles(
+            ens.chunk({"time": 2}), values=(10, 50, 90), keep_chunk_size=keep_chunk_size
+        )
+        out1 = ensembles.ensemble_percentiles(ens.load())
         np.testing.assert_array_equal(out1["tg_mean_p10"], out2["tg_mean_p10"])
         np.testing.assert_array_equal(out1["tg_mean_p50"], out2["tg_mean_p50"])
         np.testing.assert_array_equal(out1["tg_mean_p90"], out2["tg_mean_p90"])
diff --git a/xclim/ensembles.py b/xclim/ensembles.py
index ddbccd9a6..f333facee 100644
--- a/xclim/ensembles.py
+++ b/xclim/ensembles.py
@@ -36,6 +36,7 @@ def create_ensemble(
     a new dimension (name:'realization'). In the case where input files have unequal time dimensions, the output
     ensemble Dataset is created for maximum time-step interval of all input files. Before concatenation, datasets not
     covering the entire time span have their data padded with NaN values.
+    Dataset and variable attributes of the first dataset are copied to the resulting dataset.
 
     Parameters
     ----------
@@ -48,7 +49,7 @@ def create_ensemble(
       Only applicable when "datasets" is a sequence of file paths.
 
     xr_kwargs :
-      Any keyword arguments to be given to xarray when opening the files.
+      Any keyword arguments to be given to `xr.open_dataset` when opening the files (or to `xr.open_mfdataset` if `mf_flag` is True).
 
     Returns
     -------
@@ -74,19 +75,16 @@ def create_ensemble(
     >>> datasets.append(glob.glob('/dir2/*.nc'))
     >>> ens = ensembles.create_ensemble(datasets, mf_flag=True)
     """
-
-    dim = "realization"
-
     time_flag, time_all = _ens_checktimes(datasets, mf_flag, **xr_kwargs)
 
-    ds1 = _ens_align_datasets(datasets, mf_flag, time_flag, time_all)
+    ds = _ens_align_datasets(datasets, mf_flag, time_flag, time_all, **xr_kwargs)
 
-    for v in list(ds1[0].data_vars):
-        list1 = [ds[v] for ds in ds1]
-        data = xr.concat(list1, dim=dim)
-        if v == list(ds1[0].data_vars)[0]:
-            ens = xr.Dataset(data_vars=None, coords=data.coords, attrs=ds1[0].attrs)
-        ens[v] = data
+    dim = xr.IndexVariable("realization", np.arange(len(ds)), attrs={"axis": "E"})
+
+    ens = xr.concat(ds, dim)
+    for vname, var in ds[0].variables.items():
+        ens[vname].attrs.update(**var.attrs)
+    ens.attrs.update(**ds[0].attrs)
 
     return ens
 
@@ -142,12 +140,11 @@ def ensemble_mean_std_max_min(ens: xr.Dataset) -> xr.Dataset:
 def ensemble_percentiles(
     ens: xr.Dataset,
     values: Tuple[int, int, int] = (10, 50, 90),
-    time_block: Optional[int] = None,
+    keep_chunk_size: Optional[bool] = None,
 ) -> xr.Dataset:
-    """Calculate ensemble statistics between a results from an ensemble of climate simulations.
+    """Calculate ensemble percentiles from an ensemble of climate simulations.
 
-    Returns a Dataset containing ensemble statistics for input climate simulations.
-    Alternatively calculate ensemble percentiles (default) or ensemble mean and standard deviation.
+    Returns a Dataset containing ensemble percentiles for input climate simulations.
 
     Parameters
     ----------
@@ -155,9 +152,11 @@ def ensemble_percentiles(
     ens: xr.Dataset
       Ensemble dataset (see xclim.ensembles.create_ensemble).
     values : Tuple[int, int, int]
      Percentile values to calculate. Default: (10, 50, 90).
-    time_block : Optional[int]
-      For large ensembles, iteratively calculate percentiles in time-step blocks (n==time_block).
-      If not defined, the function tries to estimate an appropriate value.
+    keep_chunk_size : Optional[bool]
+      For ensembles using dask arrays, all chunks along the 'realization' axis are merged.
+      If True, the dataset is rechunked along the dimension with the largest chunks, so that the chunks keep approximately the same size.
+      If False, no shrinking is performed, resulting in much larger chunks.
+      If None, the function decides which strategy is best.
 
     Returns
     -------
@@ -177,34 +176,58 @@ def ensemble_percentiles(
     Calculate non-default percentiles (25th and 75th)
     >>> ens_percs = ensembles.ensemble_percentiles(ens, values=(25, 50, 75))
     >>> print(ens_percs['tas_p25'])
-    Calculate by time blocks (n=10) if ensemble size is too large to load in memory
-    >>> ens_percs = ensembles.ensemble_percentiles(ens, time_block=10)
+    If the original array has many small chunks, it might be more efficient to do:
+    >>> ens_percs = ensembles.ensemble_percentiles(ens, keep_chunk_size=False)
     >>> print(ens_percs['tas_p25'])
     """
     ds_out = ens.drop_vars(names=set(ens.data_vars))
-    dims = list(ens.dims)
     for v in ens.data_vars:
-        # Percentile calculation requires load to memory : automate size for large ensemble objects
-        if not time_block:
-            time_block = round(
-                2e8 / (ens[v].size / ens[v].shape[dims.index("time")]), -1
-            )  # 2E8
-
-        if time_block > len(ens[v].time):
-            out = _calc_percentiles_simple(ens, v, values)
-
+        # Percentile calculation forbids any chunks along realization
+        if len(ens.chunks.get("realization", [])) > 1:
+            if keep_chunk_size is None:
+                # Enable smart rechunking if chunk size exceeds 2E8 elements after merging along realization
+                keep_chunk_size = (
+                    np.prod(ens[v].isel(realization=0).data.chunksize)
+                    * ens.realization.size
+                    > 2e8
+                )
+            if keep_chunk_size:
+                # Smart rechunk on dimension where chunks are the largest
+                chkDim, chks = max(
+                    ens.chunks.items(),
+                    key=lambda kv: 0 if kv[0] == "realization" else max(kv[1]),
+                )
+                var = ens[v].chunk(
+                    {"realization": -1, chkDim: len(chks) * ens.realization.size,}
+                )
+            else:
+                var = ens[v].chunk({"realization": -1})
         else:
-            # loop through blocks
-            warnings.warn(
-                f"Large ensemble size detected: statistics will be"
-                f" calculated in blocks of {int(time_block)} time-steps.",
-                UserWarning,
-                stacklevel=2,
+            var = ens[v]
+
+        for p in values:
+            perc = xr.apply_ufunc(
+                _calc_perc,
+                var,
+                input_core_dims=[["realization"]],
+                output_core_dims=[[]],
+                keep_attrs=True,
+                kwargs=dict(p=p),
+                dask="parallelized",
+                output_dtypes=[ens[v].dtype],
             )
-            out = _calc_percentiles_blocks(ens, v, values, time_block)
-        for vv in out.data_vars:
-            ds_out[vv] = out[vv]
+
+            perc.name = f"{v}_p{p:02d}"
+            ds_out[perc.name] = perc
+
+            if "description" in ds_out[perc.name].attrs:
+                ds_out[perc.name].attrs[
+                    "description"
+                ] = f"{ds_out[perc.name].attrs['description']} : {p}th percentile of ensemble"
+            else:
+                ds_out[perc.name].attrs["description"] = f"{p}th percentile of ensemble"
+
     return ds_out
 
@@ -341,98 +364,33 @@ def _ens_align_datasets(
     return ds_all
 
 
-def _calc_percentiles_simple(ens, v, values):
-    ds_out = ens.drop_vars(names=set(ens.data_vars))
-    dims = list(ens[v].dims)
-    outdims = [x for x in dims if "realization" not in x]
-
-    # print('loading ensemble data to memory')
-    arr = ens[v].load()  # percentile calc requires loading the array
-    coords = {}
-    for c in outdims:
-        coords[c] = arr[c]
-    for p in values:
-        outvar = v + "_p" + str(p)
-
-        out1 = _calc_perc(arr, p)
-
-        ds_out[outvar] = xr.DataArray(out1, dims=outdims, coords=coords)
-        ds_out[outvar].attrs = ens[v].attrs
-        if "description" in ds_out[outvar].attrs.keys():
-            ds_out[outvar].attrs[
-                "description"
-            ] = f"{ds_out[outvar].attrs['description']} : {p}th percentile of ensemble"
-        else:
-            ds_out[outvar].attrs["description"] = f"{p}th percentile of ensemble"
-    return ds_out
-
-
-def _calc_percentiles_blocks(ens, v, values, time_block):
-    ds_out = ens.drop_vars(names=set(ens.data_vars))
-    dims = list(ens[v].dims)
-    outdims = [x for x in dims if "realization" not in x]
-
-    blocks = list(range(0, len(ens.time) + 1, int(time_block)))
-    if blocks[-1] != len(ens[v].time):
-        blocks.append(len(ens[v].time))
-    arr_p_all = {}
-    for t in range(0, len(blocks) - 1):
-        # print('Calculating block ', t + 1, ' of ', len(blocks) - 1)
-        time_sel = slice(blocks[t], blocks[t + 1])
-        arr = (
-            ens[v].isel(time=time_sel).load()
-        )  # percentile calc requires loading the array
-        coords = {}
-        for c in outdims:
-            coords[c] = arr[c]
-        for p in values:
-
-            out1 = _calc_perc(arr, p)
-
-            if t == 0:
-                arr_p_all[str(p)] = xr.DataArray(out1, dims=outdims, coords=coords)
-            else:
-                arr_p_all[str(p)] = xr.concat(
-                    [
-                        arr_p_all[str(p)],
-                        xr.DataArray(out1, dims=outdims, coords=coords),
-                    ],
-                    dim="time",
-                )
-    for p in values:
-        outvar = v + "_p" + str(p)
-        ds_out[outvar] = arr_p_all[str(p)]
-        ds_out[outvar].attrs = ens[v].attrs
-        if "description" in ds_out[outvar].attrs.keys():
-            ds_out[outvar].attrs[
-                "description"
-            ] = f"{ds_out[outvar].attrs['description']} : {p}th percentile of ensemble"
+def _calc_perc(arr, p=50):
+    """Ufunc-like function computing a percentile over the last axis of the array.
+
-        else:
-            ds_out[outvar].attrs["description"] = f"{p}th percentile of ensemble"
+    Processes cases with invalid values separately, which makes it more efficient than np.nanpercentile for arrays with only a few invalid points.
+
-    return ds_out
-
-
-def _calc_perc(arr, p):
-    dims = arr.dims
-    # make sure realization is the first dimension
-    if dims.index("realization") != 0:
-        arr = arr.transpose(
-            "realization", *[dim for dim in dims if dim != "realization"]
-        )
+    Parameters
+    ----------
+    arr : np.ndarray
+        Percentile is computed over the last axis.
+    p : scalar
+        Percentile to compute, between 0 and 100 (default: 50).
+
-    nan_count = np.isnan(arr).sum(axis=0)
-    out = np.percentile(arr, p, axis=0)
-    if np.any((nan_count > 0) & (nan_count < arr.shape[0])):
-        arr1 = arr.values.reshape(arr.shape[0], int(arr.size / arr.shape[0]))
-        # only use nanpercentile where we need it (slow performace compared to standard) :
-        nan_index = np.where((nan_count > 0) & (nan_count < arr.shape[0]))
+    Returns
+    -------
+    np.ndarray
+    """
+    nan_count = np.isnan(arr).sum(axis=-1)
+    out = np.percentile(arr, p, axis=-1)
+    nans = (nan_count > 0) & (nan_count < arr.shape[-1])
+    if np.any(nans):
+        arr1 = arr.reshape(int(arr.size / arr.shape[-1]), arr.shape[-1])
+        # only use nanpercentile where we need it (slow performance compared to standard) :
+        nan_index = np.where(nans)
         t = np.ravel_multi_index(nan_index, nan_count.shape)
         out[np.unravel_index(t, nan_count.shape)] = np.nanpercentile(
-            arr1[:, t], p, axis=0
+            arr1[t, :], p, axis=-1
         )
-
     return out
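
A minimal usage sketch of the reworked `ensemble_percentiles` and its new `keep_chunk_size`
argument, in the doctest style used by the docstrings above. The file paths and the
`tg_mean` variable are illustrative assumptions; the argument only takes effect when the
ensemble is dask-backed with more than one chunk along 'realization':

>>> import glob
>>> import numpy as np
>>> from xclim import ensembles
>>> ens = ensembles.create_ensemble(glob.glob('/dir/*.nc'))  # hypothetical paths
>>> ens = ens.chunk({"realization": 1, "time": 10})  # dask-backed, one chunk per member
>>> # keep_chunk_size=None (default): the rechunking strategy is picked from the merged chunk size
>>> out = ensembles.ensemble_percentiles(ens, values=(10, 50, 90))
>>> # keep_chunk_size=False: merge along 'realization' only, giving fewer, larger chunks
>>> out = ensembles.ensemble_percentiles(ens, keep_chunk_size=False)
>>> sorted(out.data_vars)
['tg_mean_p10', 'tg_mean_p50', 'tg_mean_p90']

The private `_calc_perc` helper can also be exercised directly on a bare numpy array: it
computes the percentile over the last axis and falls back to `np.nanpercentile` only for
the slices that actually contain NaNs:

>>> arr = np.array([[1.0, 2.0, 3.0], [1.0, np.nan, 3.0]])
>>> ensembles._calc_perc(arr, p=50)
array([2., 2.])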