diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..8412d5e
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,13 @@
+services:
+  geocube:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    environment:
+      TOOL_RUN: geocube
+    command: echo "run this tool as docker compose run --rm geocube python run.py"
+    volumes:
+      - ./in:/in
+      - ./out:/out
+      - ./src:/src
+
\ No newline at end of file
diff --git a/in/inputs.json b/in/inputs.json
index 651bf91..dfdccb9 100644
--- a/in/inputs.json
+++ b/in/inputs.json
@@ -4,7 +4,8 @@
       "integration": "spatiotemporal",
       "precision": "7d",
       "resolution": 5000,
-      "aggregates": ["mean", "min", "max"]
+      "aggregates": ["mean", "min", "max"],
+      "target_epsg": 25832
     }
   }
 }
\ No newline at end of file
diff --git a/src/gridding.py b/src/gridding.py
new file mode 100644
index 0000000..32f9f38
--- /dev/null
+++ b/src/gridding.py
@@ -0,0 +1,123 @@
+from typing import List, Optional, Literal
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+from rioxarray.crs import CRS
+
+# define the possible integration types
+INTEGRATIONS = Literal['spatiotemporal', 'spatial', 'temporal']
+
+
+def create_grid(arrays: List[xr.DataArray], integration: INTEGRATIONS, resolution: int, precision: str, target_epsg: Optional[int] = None, buffer_edge: Optional[float] = 0.01, **kwargs) -> xr.Dataset:
+    # switch the integration
+    if integration == 'spatiotemporal':
+        return create_spatiotemporal_grid(arrays, resolution, precision, target_epsg, buffer_edge, **kwargs)
+    elif integration == 'spatial':
+        return create_spatial_grid(arrays, resolution, target_epsg, buffer_edge, **kwargs)
+    elif integration == 'temporal':
+        return create_temporal_grid(arrays, precision, **kwargs)
+    else:
+        raise ValueError(f"Integration type {integration} not supported.")
+
+
+def create_spatiotemporal_grid(arrays: List[xr.DataArray], resolution: int, precision: str, target_epsg: Optional[int] = None, buffer_edge: Optional[float] = 0.01, **kwargs) -> xr.Dataset:
+    # figure out the bounding box of all coordinate axes
+    try:
+        minx = min([arr.x.values.min() for arr in arrays if 'x' in arr.indexes])
+        maxx = max([arr.x.values.max() for arr in arrays if 'x' in arr.indexes])
+        miny = min([arr.y.values.min() for arr in arrays if 'y' in arr.indexes])
+        maxy = max([arr.y.values.max() for arr in arrays if 'y' in arr.indexes])
+    except ValueError:
+        raise ValueError("No x/y coordinates found in any of the input arrays")
+
+    # do the same for the time axis
+    try:
+        mint = min([arr.time.values.min() for arr in arrays if 'time' in arr.indexes])
+        maxt = max([arr.time.values.max() for arr in arrays if 'time' in arr.indexes])
+    except ValueError:
+        raise ValueError("No time coordinates found in any of the input arrays")
+
+    # buffer the axes if needed, so that the binning spans a bit wider than the actual extremes
+    if buffer_edge is not None:
+        minx = np.round(minx - buffer_edge * resolution)
+        maxx = np.round(maxx + buffer_edge * resolution)
+        miny = np.round(miny - buffer_edge * resolution)
+        maxy = np.round(maxy + buffer_edge * resolution)
+
+    # build the axes
+    xaxis = np.arange(minx, maxx, resolution)
+    yaxis = np.arange(miny, maxy, resolution)
+
+    # if we have a time axis, we need to build a 3D grid
+    taxis = pd.date_range(mint, maxt, freq=precision)
+    coords = {'time': ('time', taxis), 'y': ('y', yaxis), 'x': ('x', xaxis)}
+
+    # build a master grid
+    grid = xr.Dataset(coords=coords)
+
+    # determine the CRS
+    if target_epsg is None:
+        crs = [a.rio.crs for a in arrays if a.rio.crs is not None][0]
+    else:
+        crs = CRS.from_epsg(target_epsg)
+
+    # set the CRS
+    grid.rio.write_crs(crs, inplace=True)
+
+    return grid
+
+
+def create_spatial_grid(arrays: List[xr.DataArray], resolution: int, target_epsg: Optional[int] = None, buffer_edge: Optional[float] = 0.01, **kwargs) -> xr.Dataset:
+    # figure out the bounding box of all coordinate axes
+    try:
+        minx = min([arr.x.values.min() for arr in arrays if 'x' in arr.indexes])
+        maxx = max([arr.x.values.max() for arr in arrays if 'x' in arr.indexes])
+        miny = min([arr.y.values.min() for arr in arrays if 'y' in arr.indexes])
+        maxy = max([arr.y.values.max() for arr in arrays if 'y' in arr.indexes])
+    except ValueError:
+        raise ValueError("No x/y coordinates found in any of the input arrays")
+
+    # buffer the axes if needed, so that the binning spans a bit wider than the actual extremes
+    if buffer_edge is not None:
+        minx = np.round(minx - buffer_edge * resolution)
+        maxx = np.round(maxx + buffer_edge * resolution)
+        miny = np.round(miny - buffer_edge * resolution)
+        maxy = np.round(maxy + buffer_edge * resolution)
+
+    # build the axes
+    xaxis = np.arange(minx, maxx, resolution)
+    yaxis = np.arange(miny, maxy, resolution)
+    coords = {'y': ('y', yaxis), 'x': ('x', xaxis)}
+
+    # build a master grid
+    grid = xr.Dataset(coords=coords)
+
+    # determine the CRS
+    if target_epsg is None:
+        crs = [a.rio.crs for a in arrays if a.rio.crs is not None][0]
+    else:
+        crs = CRS.from_epsg(target_epsg)
+
+    # set the CRS
+    grid.rio.write_crs(crs, inplace=True)
+
+    return grid
+
+
+def create_temporal_grid(arrays: List[xr.DataArray], precision: str, **kwargs) -> xr.Dataset:
+    # figure out the extent of the time axis
+    try:
+        mint = min([arr.time.values.min() for arr in arrays if 'time' in arr.indexes])
+        maxt = max([arr.time.values.max() for arr in arrays if 'time' in arr.indexes])
+    except ValueError:
+        raise ValueError("No time coordinates found in any of the input arrays")
+
+    # build the time coordinates
+    taxis = pd.date_range(mint, maxt, freq=precision)
+    coords = {'time': ('time', taxis)}
+
+    # build a master grid
+    grid = xr.Dataset(coords=coords)
+
+    return grid
diff --git a/src/ingestor.py b/src/ingestor.py
new file mode 100644
index 0000000..542b99a
--- /dev/null
+++ b/src/ingestor.py
@@ -0,0 +1,214 @@
+from typing import List, Dict, Tuple, Optional
+from pathlib import Path
+import warnings
+
+from metacatalog.models import Entry
+from tqdm import tqdm
+import rioxarray
+from rioxarray.crs import CRS
+import xarray as xr
+import numpy as np
+from json2args.logger import logger
+from pydantic import BaseModel
+
+from utils import FileMapping
+
+# these are the parameters coming from json2args
+class Params(BaseModel):
+    precision: str
+    resolution: int
+    integration: str
+    aggregates: List[str]
+    target_epsg: Optional[int] = None  # optional, used by load_raster below
+
+
+def load_files(file_mapping: List[FileMapping], params: Params | Dict) -> List[xr.DataArray]:
+    # handle the parameters
+    if isinstance(params, dict):
+        params = Params(**params)
+
+    # create a container for the DataArrays
+    arrays = []
+
+    # iterate over the mapping and load every dataset as a xarray DataArray
+    for mapping in tqdm(file_mapping):
+        # unpack the mapping
+        entry = mapping['entry']
+        data_path = Path(mapping['data_path'])
+        #logger.debug(f"Loading dataset from {data_path}")
+
+        # check which loader to use
+        if data_path.is_dir():
+            # load all files in the directory
+            #files = glob.glob(str(data_path / '**' / '*'), recursive=True)
+            files = list(data_path.rglob('*'))
+        else:
+            files = [str(data_path)]
+
+        # check if this is a netCDF file
+        if str(files[0]).lower().endswith('.nc'):
+            arr = load_raster(files, entry, target_epsg=params.target_epsg)
+        elif str(files[0]).lower().endswith('.tif') or str(files[0]).lower().endswith('.tiff'):
+            arr = load_raster(files, entry, target_epsg=params.target_epsg)
+        else:
+            logger.error(f"File type of dataset not yet supported: {files[0]}")
+            continue
+
+        # write a CRS if none is given and warn in that case
+        if not arr.rio.crs:
+            logger.warning("Dataset has no CRS. This might lead to unexpected results.")
+            arr = arr.rio.write_crs(CRS.from_epsg(4326))
+
+        # append the array to the container
+        arrays.append(arr)
+    return arrays
+    # # next step is to aggregate to the target resolution and precision
+    # aggregates = aggregate_xarray(arr, entry, **params.model_dump())
+
+    # # add the arrays to the list
+    # for agg in aggregates:
+    #     arrays.append(agg)
+
+    # return arrays
+
+
+def merge_arrays(arrays: List[xr.DataArray]) -> xr.Dataset:
+    # check if all were mapped to the same suggested UTM CRS
+    crs = set([arr.rio.crs for arr in arrays])
+    if len(crs) > 1:
+        logger.warning(f"The aggregated dataset chunks could not be reprojected into a common CRS and now use different CRS: [{crs}]. This might be caused by missing CRS information in the original datasets.")
+
+    # now overwrite the CRS
+
+    # merge all arrays
+    merged = xr.merge(arrays, combine_attrs='drop_conflicts', join='outer', compat='no_conflicts')
+
+    return merged
+
+
+def _binned_spatial_index(arr: xr.DataArray, grid: xr.Dataset) -> Dict[str, Tuple[str, np.ndarray]]:
+    # extract the original coordinates
+    original_x = arr.x.values if 'x' in arr.indexes else []
+    original_y = arr.y.values if 'y' in arr.indexes else []
+
+    # digitize the coordinates to the grid
+    x_indices = np.digitize(original_x, grid.x.values) - 1 if 'x' in arr.indexes else []
+    y_indices = np.digitize(original_y, grid.y.values) - 1 if 'y' in arr.indexes else []
+
+    # create the binned coordinates
+    binned_x = grid.x.values[x_indices]
+    binned_y = grid.y.values[y_indices]
+
+    # bin the array by the binned coords
+    return {'y': ('y', binned_y), 'x': ('x', binned_x)}
+
+
+def _binned_temporal_index(arr: xr.DataArray, grid: xr.Dataset) -> Dict[str, Tuple[str, np.ndarray]]:
+    # extract the original coordinates
+    original_t = arr.time.values.astype(int) if 'time' in arr.indexes else []
+
+    # digitize the coordinates to the grid
+    t_indices = np.digitize(original_t, grid.time.values.astype(int)) - 1
+
+    # create the binned coordinates
+    binned_t = grid.time.values[t_indices]
+
+    # bin the array by the binned coords
+    return {'time': ('time', binned_t)}
+
+
+def bin_coordinate_axes(arr: xr.DataArray, grid: xr.Dataset) -> xr.DataArray:
+    coords_def = {}
+
+    # get the time index if there is a time axis in the grid
+    if 'time' in grid.indexes:
+        coords_def.update(_binned_temporal_index(arr, grid))
+
+    # get the spatial indices if the grid has spatial axes
+    if 'y' in grid.indexes:
+        coords_def.update(_binned_spatial_index(arr, grid))
+
+    # replace the DataArray coordinates with the binned version
+    arr_binned = arr.assign_coords(coords_def)
+
+    return arr_binned
+
+
+def aggregate_xarray(arrays: List[xr.DataArray], grid: xr.Dataset, aggregates: List[str]) -> xr.Dataset:
+    # make a deep copy of the grid
+    cube = grid.copy(deep=True)
+
+    for arr in tqdm(arrays):
+        arr_binned = bin_coordinate_axes(arr, grid)
+
+        # groupby each of the passed aggregates
+        for aggregate in aggregates:
+            # groupby this aggregate over all axes
+
+            # use only the axes that are in the binned array AND in the grid
+            axes = [ax for ax in grid.indexes if ax in arr_binned.indexes]
+
+            agg = arr_binned.to_dataframe()[[v for v in arr_binned.data_vars]].groupby(axes).aggregate(aggregate).to_xarray()
+            #agg = agg.groupby('y').reduce(getattr(np, aggregate)).groupby('x').reduce(getattr(np, aggregate))
+            #agg = arr_binned[[v for v in arr_binned.data_vars]].groupby(**{ax: xr.groupers.UniqueGrouper() for ax in axes}).reduce(getattr(np, aggregate))
+            # add all data_variables to the cube
+            for data_name in agg.data_vars:
+                cube[f"{data_name}_{aggregate}"] = agg[data_name]
+
+    # return
+    return cube
+
+
+def load_raster(files: List[str], entry: Entry, target_epsg: Optional[int] = None) -> xr.DataArray:
+    # load the variable names
+    var_names = entry.datasource.variable_names
+
+    # load the data
+    with warnings.catch_warnings():
+        warnings.simplefilter('ignore')
+        if str(files[0]).lower().endswith('.tif') or str(files[0]).lower().endswith('.tiff'):
+            xarr = xr.open_mfdataset(files, decode_coords='all')[var_names]
+        else:
+            xarr = xr.open_mfdataset(files, decode_coords='all', engine='h5netcdf')[var_names]
+
+    # check that each chunk has a CRS
+    crs = xarr.rio.crs
+    if crs is None:
+        # TODO: in the future we can either remove the dataset here or just assume it is WGS84
+        logger.warning("Dataset has no CRS. This might lead to unexpected results.")
+        crs = CRS.from_epsg(4326)
+
+    # load the used indexes
+    indices = [i for i in xarr.indexes]
+    to_squeeze = []
+    for idx in indices:
+        if idx not in ['time', 'x', 'y']:
+            logger.warning(f"Dataset in file <{files[0]}> has an index <{idx}> that is not in ['time', 'x', 'y']. This might lead to unexpected results, as we will drop it.")
+            to_squeeze.append(idx)
+    if len(to_squeeze) > 0:
+        xarr = xarr.squeeze(to_squeeze, drop=True)
+
+    # drop all that is not an indexed coordinate and not a variable
+    names_set = set([*[c for c in xarr.coords], *[c for c in xarr.dims]])
+    xarr = xarr.drop_vars([c for c in names_set if c not in ['time', 'x', 'y', *var_names]])
+
+    # write back the CRS
+    xarr = xarr.rio.write_crs(crs)
+
+    # handle the target CRS system
+    if target_epsg is None:
+        try:
+            target_crs = xarr.rio.estimate_utm_crs()
+        except RuntimeError as e:
+            logger.error(f"No CRS found for dataset in file <{files[0]}>. An UTM CRS could not be inferred. Error: {str(e)}")
+            # fall back to the CRS that is already written to the dataset
+            target_crs = crs
+    else:
+        target_crs = CRS.from_epsg(target_epsg)
+
+    # reproject and copy
+    xarr = xarr.rio.reproject(target_crs)
+    out = xarr.copy()
+    xarr.close()
+
+    return out
+
+
+def load_parquet(files: List[str], entry: Entry) -> xr.DataArray:
+    raise NotImplementedError("Parquet files are not yet supported.")
diff --git a/src/run.py b/src/run.py
index abf4230..dd13f03 100644
--- a/src/run.py
+++ b/src/run.py
@@ -1,30 +1,67 @@
 import os
-from datetime import datetime as dt
-from pprint import pprint
+import sys
+import warnings
+import time
 
 from json2args import get_parameter
-from json2args.data import get_data
+from json2args.logger import logger
+
+from utils import get_file_mapping
+import ingestor
+import gridding
 
 # parse parameters
-kwargs = get_parameter()
-data = get_data(as_dict=True)
+param = get_parameter(typed=True)
+
+# TODO: the dataset path inside the container is hardcoded here. We might want to make this a parameter
+datasets_path = os.getenv('DATASETS_PATH', '/in/datasets')
 
 # check if a toolname was set in env
-toolname = os.environ.get('TOOL_RUN', 'foobar').lower()
-
-# switch the tool
-if toolname == 'foobar':
-    # RUN the tool here and create the output in /out
-    print('This toolbox does not include any tool. Did you run the template?\n')
-
-    # write parameters to STDOUT.log
-    pprint(kwargs)
-
-    for name, ds in data.items():
-        print(f"\n### {name}")
-        print(ds)
-
-
-# In any other case, it was not clear which tool to run
-else:
-    raise AttributeError(f"[{dt.now().isocalendar()}] Either no TOOL_RUN environment variable available, or '{toolname}' is not valid.\n")
+toolname = os.environ.get('TOOL_RUN', 'geocube').lower()
+
+# check if the tool is valid
+if toolname != 'geocube':
+    logger.error(f"The entrypoint '{toolname}' is not supported. Check the tool.yml for valid endpoints.")
+    sys.exit(1)
+
+# run the main tool - start a timer
+start = time.time()
+
+# info that the GeoCube tool is starting
+logger.info("##TOOL START - Geocube")
+logger.debug(f"Parameters passed to the tool: params = {repr(param)}")
+
+# create a file mapping
+with warnings.catch_warnings(record=True) as w:
+    file_mapping = get_file_mapping(datasets_path, metadata_files='*.metadata.json')
+    logger.debug(f"Found {len(file_mapping)} dataset folders with metadata.")
+
+    # log the warnings if any
+    for warn in w:
+        logger.warning(warn.message)
+
+# load all datasets
+arrays = ingestor.load_files(file_mapping=file_mapping, params=param)
+logger.debug(f"arrays = ingestor.load_files(file_mapping={[{'data_path': m['data_path'], 'entry': 'Metadata(id=%d)' % m['entry'].id} for m in file_mapping]}, params=param)")
+
+# create a common Grid
+grid = gridding.create_grid(arrays=arrays, **param.model_dump())
+logger.debug(f"grid = gridding.create_grid(arrays=arrays, **{param})")
+
+# aggregate to the grid
+cube = ingestor.aggregate_xarray(arrays, grid, param.aggregates)
+logger.debug(f"cube = ingestor.aggregate_xarray(arrays=arrays, grid=grid, aggregates={param.aggregates})")
+
+# save the cube
+# TODO this has to be implemented in a more helpful way
+# start saving
+start_save = time.time()
+cube.to_netcdf('/out/cube.nc', engine='h5netcdf')
+end_save = time.time()
+logger.debug("cube.to_netcdf('/out/cube.nc', engine='h5netcdf')")
+logger.info(f"Saving the cube took {end_save - start_save:.2f} seconds")
+
+# inform that the tool has finished
+end = time.time()
+logger.info(f"Total runtime: {end - start:.2f} seconds")
+logger.info("##TOOL FINISH - Geocube")
\ No newline at end of file
diff --git a/src/tool.yml b/src/tool.yml
index 23ff90c..bc0564e 100644
--- a/src/tool.yml
+++ b/src/tool.yml
@@ -8,25 +8,51 @@ tools:
       The data is aggregated to a target precision (temporal) and spatial resolution
       and then ingested into a geocube that is stored as a netCDF file.
     parameters:
-      foo_int:
-        type: integer
-      foo_float:
-        type: float
-      foo_string:
-        type: string
-      foo_enum:
+      integration:
         type: enum
         values:
-          - foo
-          - bar
-          - baz
-      foo_array:
+          - spatial
+          - temporal
+          - spatiotemporal
+        description: |
+          This is the integration mode to be used for ingesting the data into a common data cube.
+          If spatial or temporal is used, the data is aggregated over the other dimension(s), and
+          will only have the requested dimension left (i.e. 'spatial' will no longer have a datetime axis).
+          'spatiotemporal' will keep both dimensions.
+      precision:
+        type: string
+        description: |
+          This is the temporal precision to be used for temporal or spatiotemporal aggregations. It is either
+          an ISO8601 duration string or a shorthand frequency supported by pandas using m,d,H,M,Y prefixed by a
+          multiplier like 6H or 2Y.
+      resolution:
         type: integer
+        description: |
+          This is the spatial resolution to be used for spatial or spatiotemporal aggregations. It is an integer
+          that is used to resample the data to a grid with the given resolution. The resolution is applied to both
+          axes of the data and is measured in the axis units. If the input data is generated by the V-FOR-WaTer
+          data loader, the input data has been re-projected to EPSG:3857 and the resolution is then in meters.
+      aggregates:
+        type: enum
         array: true
-    data:
-      foo_matrix:
-        extension: dat
-        description: A matrix file that can be read by numpy
-      foo_csv:
-        extension: csv
-        description: A standard formatted CSV file, for autoloading using pandas
+        values:
+          - mean
+          - median
+          - sum
+          - min
+          - max
+          - std
+        default: [mean, min, max, std, sum]
+        description: |
+          These are the aggregations to be applied to the data. The data is aggregated over the grid axes using each
+          of the given aggregation functions. The aggregation is applied to all data variables in the input data. That means
+          if the input folder contains 5 variables and the default aggregations are used, the output data cube will
+          contain 5 * 5 = 25 variables.
+          Note that it is currently not possible to assign different aggregations to different variables, but that
+          is planned for the future.
+      target_epsg:
+        type: integer
+        description: |
+          The target CRS EPSG number to use for the data cube. This is an optional parameter. If it is not given,
+          the tool tries to guess the best UTM CRS based on the bounding box of the data.
+        optional: true
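
As an aside, a minimal sketch of how the cube written by run.py could be inspected after a run. The /out/cube.nc path and the h5netcdf engine are taken from the diff above; the variable names in the comments (e.g. temperature_mean) are only placeholders for whatever variables the ingested datasets actually contain:

import xarray as xr

# open the cube that run.py wrote to /out/cube.nc
cube = xr.open_dataset('/out/cube.nc', engine='h5netcdf')

# aggregate_xarray names the output variables '<variable>_<aggregate>',
# e.g. 'temperature_mean', 'temperature_min', 'temperature_max'
print(list(cube.data_vars))

# the coordinate axes follow the requested integration, e.g. time/y/x for 'spatiotemporal'
print(cube.coords)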