diff --git a/README.md b/README.md index d97dc05..34d0a15 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,26 @@ $ python -m ipykernel install --user --name xaj --display-name "xaj" ### Prepare data -To use your own data to run the model, we set a data interface, here is the convention: +To use your own data to run the model, you can prepare the data in the required format: + +For one basin (We only support one basin now), the data is put in one csv/txt file. +There are three necessary columns: "time", "prcp", "pet", and "flow". "time" is the time series, "prcp" is the precipitation, "pet" is the potential evapotranspiration, and "flow" is the observed streamflow. +The time series should be continuous (NaN values are allowed), and the time step should be the same for all columns. The time format should be "YYYY-MM-DD HH:MM:SS". The data should be sorted by time. + +You can run a checker function to see if the data is in the right format: + +```Shell +$ cd hydromodel/scripts +$ python check_data_format.py --data_file +``` + +Then, you can use the data_preprocess module to transform the data to the required format: + +```Shell +$ python datapreprocess4calibrate.py --data --exp +``` + +The data will be transformed in data interface, here is the convention: - All input data for models are three-dimensional NumPy array: [time, basin, variable], which means "time" series data for "variables" in "basins" @@ -90,36 +109,6 @@ More details about the analysis could be seen in show_results.ipynb file. It is Now we only provide some simple statistics calculations. -### How to make the sample data - -In this part, we simply introduce how we prepare the sample data. - -Here We provide an example for some basins in [the CAMELS dataset](https://ral.ucar.edu/solutions/products/camels), a very common used dataset for hydrological model evaluation. - -You can download CAMELS according to this [instruction](https://github.com/OuyangWenyu/hydrodataset). - -Check if you have successfully downloaded and put it in the right place. - -```Shell -$ conda activate xaj -$ python ->>> import os ->>> from hydrodataset.camels import Camels ->>> camels = Camels(data_path=os.path.join("camels", "camels_us"), download=False, region="US") -``` - -if any error is raised, please see this [instruction](https://github.com/OuyangWenyu/hydrodataset) again. - -Then, we provide a script to transform data organized like CAMELS to the required format, you can use it like this: - -```Shell -$ cd hydromodel/app -$ python datapreprocess4calibrate.py --camels_dir --exp --calibrate_period --test_period --basin_id -# such as: python datapreprocess4calibrate.py --camels_name camels_us --exp xxx --calibrate_period 1990-10-01 2000-10-01 --test_period 2000-10-01 2010-10-01 --basin_id 01439500 06885500 08104900 09510200 -``` - -Then you can see some files in hydromodel/example/xxx directory. - ## Why does hydro-model-xaj exist When we want to learn about the rainfall-runoff process and make forecasts for floods, etc. We often use classic hydrological diff --git a/env-dev.yml b/env-dev.yml index 2da99b2..2fea46a 100644 --- a/env-dev.yml +++ b/env-dev.yml @@ -26,4 +26,4 @@ dependencies: - twine - bump2version - muskingumcunge - - hydrodataset + - hydrodata diff --git a/hydromodel/datasets/__init__.py b/hydromodel/datasets/__init__.py index e69de29..f513940 100644 --- a/hydromodel/datasets/__init__.py +++ b/hydromodel/datasets/__init__.py @@ -0,0 +1,44 @@ +PRCP_NAME = "prcp(mm/day)" +PET_NAME = "pet(mm/day)" +ET_NAME = "et(mm/day)" +FLOW_NAME = "flow(m^3/s)" +NODE_FLOW_NAME = "node1_flow(m^3/s)" +AREA_NAME = "area(km^2)" +TIME_NAME = "time" +TIME_FORMAT = "%Y-%m-%d %H:%M:%S" +ID_NAME = "id" +NAME_NAME = "name" + + +def remove_unit_from_name(name_with_unit): + """ + Remove the unit from a variable name. + + Parameters + ---------- + name_with_unit : str + The name of the variable including its unit, e.g., "prcp(mm/day)". + + Returns + ------- + str + The name of the variable without the unit, e.g., "prcp". + """ + return name_with_unit.split("(")[0] + + +def get_unit_from_name(name_with_unit): + """ + Extract the unit from a variable name. + + Parameters + ---------- + name_with_unit : str + The name of the variable including its unit, e.g., "prcp(mm/day)". + + Returns + ------- + str + The unit of the variable, e.g., "mm/day". + """ + return name_with_unit.split("(")[1].strip(")") if "(" in name_with_unit else "" diff --git a/hydromodel/datasets/camels_format_data.py b/hydromodel/datasets/camels_format_data.py deleted file mode 100644 index 9b92714..0000000 --- a/hydromodel/datasets/camels_format_data.py +++ /dev/null @@ -1,392 +0,0 @@ -import collections -import os -from typing import Union -import pandas as pd -import numpy as np -from pandas.core.dtypes.common import is_string_dtype, is_numeric_dtype -from tqdm import tqdm - -from hydroutils import hydro_time -import hydrodataset - - -class MyCamels(hydrodataset.Camels): - def __init__(self, data_path, download=False, region: str = "CC"): - """ - Initialization for my own CAMELS format dataset - - Parameters - ---------- - data_path - where we put the dataset - download - if true, download - region - remember the name of your own region - """ - hydrodataset.camels.CAMELS_REGIONS = hydrodataset.camels.CAMELS_REGIONS + [ - region - ] - super().__init__(data_path, download, region) - - def set_data_source_describe(self) -> collections.OrderedDict: - """ - Introduce the files in the dataset and list their location in the file system - - Returns - ------- - collections.OrderedDict - the description for a CAMELS dataset - """ - camels_db = self.data_source_dir - # shp files of basins - camels_shp_files_dir = os.path.join(camels_db, "basin_boudaries") - # attr, flow and forcing data are all in the same dir. each basin has one dir. - flow_dir = os.path.join(camels_db, "streamflow") - sm_dir = os.path.join(camels_db, "soil_moisture") - et_dir = os.path.join(camels_db, "evapotranspiration") - forcing_dir = os.path.join(camels_db, "basin_mean_forcing") - attr_dir = os.path.join(camels_db, "attribute") - # no gauge id file for CAMELS_CC, just read from any attribute file - gauge_id_file = os.path.join(camels_db, "gage_points.csv") - attr_key_lst = [ - "climate", - "geology", - "land_cover", - "permeability_porosity", - "root_depth", - "soil", - "topo_elev_slope", - "topo_shape_factors", - ] - return collections.OrderedDict( - CAMELS_DIR=camels_db, - CAMELS_FLOW_DIR=flow_dir, - CAMELS_SM_DIR=sm_dir, - CAMELS_ET_DIR=et_dir, - CAMELS_FORCING_DIR=forcing_dir, - CAMELS_ATTR_DIR=attr_dir, - CAMELS_ATTR_KEY_LST=attr_key_lst, - CAMELS_GAUGE_FILE=gauge_id_file, - CAMELS_BASINS_SHP_DIR=camels_shp_files_dir, - ) - - def read_site_info(self) -> pd.DataFrame: - """ - Read the basic information of gages in a CAMELS dataset - - Returns - ------- - pd.DataFrame - basic info of gages - """ - camels_file = self.data_source_description["CAMELS_GAUGE_FILE"] - data = pd.read_csv(camels_file, sep=",", dtype={"gage_id": str}) - return data - - def get_constant_cols(self) -> np.array: - """ - all readable attrs in CAMELS - - Returns - ------- - np.array - attribute types - """ - data_folder = self.data_source_description["CAMELS_ATTR_DIR"] - files = np.sort(os.listdir(data_folder)) - attr_types = [] - for file_ in files: - file = os.path.join(data_folder, file_) - attr_tmp = pd.read_csv(file, sep=",", dtype={"gage_id": str}) - attr_types = attr_types + attr_tmp.columns[1:].values.tolist() - return np.array(attr_types) - - def get_relevant_cols(self) -> np.array: - """ - all readable forcing types - - Returns - ------- - np.array - forcing types - """ - forcing_dir = self.data_source_description["CAMELS_FORCING_DIR"] - forcing_file = os.path.join(forcing_dir, os.listdir(forcing_dir)[0]) - forcing_tmp = pd.read_csv(forcing_file, sep="\s+", dtype={"gage_id": str}) - return forcing_tmp.columns.values - - def get_target_cols(self) -> np.array: - """ - For CAMELS, the target vars are streamflows - - Returns - ------- - np.array - streamflow types - """ - # ssm is the surface soil moisture - return np.array(["Q", "ssm", "ET"]) - - def read_object_ids(self, **kwargs) -> np.array: - """ - read station ids - - Parameters - ---------- - **kwargs - optional params if needed - - Returns - ------- - np.array - gage/station ids - """ - return self.camels_sites["gage_id"].values - - def read_target_cols( - self, - gage_id_lst: Union[list, np.array] = None, - t_range: list = None, - target_cols: Union[list, np.array] = None, - **kwargs - ) -> np.array: - """ - read target values; for all CAMELS, they are streamflows except for CAMELS-CC (inlcude soil moisture) - - default target_cols is an one-value list - Notice: the unit of target outputs in different regions are not totally same - - Parameters - ---------- - gage_id_lst - station ids - t_range - the time range, for example, ["1990-01-01", "2000-01-01"] - target_cols - the default is None, but we neea at least one default target. - kwargs - some other params if needed - - Returns - ------- - np.array - streamflow data, 3-dim [station, time, streamflow], unit is m3/s - """ - if target_cols is None: - return np.array([]) - else: - nf = len(target_cols) - t_range_list = hydro_time.t_range_days(t_range) - nt = t_range_list.shape[0] - y = np.full([len(gage_id_lst), nt, nf], np.nan) - for j in tqdm(range(len(target_cols)), desc="Read Q/SSM/ET data of CAMELS-CC"): - for k in tqdm(range(len(gage_id_lst))): - if target_cols[j] == "ssm": - sm_file = os.path.join( - self.data_source_description["CAMELS_SM_DIR"], - gage_id_lst[k] + "_lump_nasa_usda_smap.txt", - ) - sm_data = pd.read_csv(sm_file, sep=",") - df_date = sm_data[["Year", "Mnth", "Day"]] - df_date.columns = ["year", "month", "day"] - date = pd.to_datetime(df_date).values.astype("datetime64[D]") - [c, ind1, ind2] = np.intersect1d( - date, t_range_list, return_indices=True - ) - y[k, ind2, j] = sm_data["ssm(mm)"].values[ind1] - elif target_cols[j] == "ET": - et_file = os.path.join( - self.data_source_description["CAMELS_ET_DIR"], - gage_id_lst[k] + "_lump_modis16a2v006_et.txt", - ) - et_data = pd.read_csv(et_file, sep=",") - df_date = et_data[["Year", "Mnth", "Day"]] - df_date.columns = ["year", "month", "day"] - # all dates in a table - date = pd.to_datetime(df_date).values.astype("datetime64[D]") - if ( - np.datetime64(str(date[-1].astype(object).year) + "-12-31") - > date[-1] - > np.datetime64(str(date[-1].astype(object).year) + "-12-24") - ): - # the final date in all dates, if it is a date in the end of a year, its internal is 5 or 6 - final_date = np.datetime64( - str(date[-1].astype(object).year + 1) + "-01-01" - ) - else: - final_date = date[-1] + np.timedelta64(8, "D") - date_all = hydro_time.t_range_days( - hydro_time.t_days_lst2range([date[0], final_date]) - ) - t_range_final = np.intersect1d(date_all, t_range_list) - [_, ind3, ind4] = np.intersect1d( - date, t_range_final, return_indices=True - ) - - days_interval = [y - x for x, y in zip(ind4, ind4[1:])] - # get the final range - if ( - t_range_final[-1].item().month == 12 - and t_range_final[-1].item().day == 31 - ): - final_timedelta = ( - t_range_final[-1].item() - t_range_final[ind4[-1]].item() - ) - final_day_interval = [final_timedelta.days] - else: - final_day_interval = [8] - days_interval = np.array(days_interval + final_day_interval) - # there may be some missing data, so that some interval will be larger than 8 - days_interval[np.where(days_interval > 8)] = 8 - # we use mean value rather than sum, because less error when predicting for every day - # for example, mean: [1, x, x, 2, x, x, 3] is obs, [1, 1, 1, 2, 2, 2, 3] is pred, - # sum: [3, x, x, 6, x, x, 9] is obs, [1, 1, 1, 2, 2, 2, 3] is pred - # the final day's error is significant when using sum - # although a better way is to extend [1, 1, 1, 2, 2, 2, 3] to [1, 1, 1, 2, 2, 2, 3, 3, 3] - y[k, ind4, j] = ( - et_data["ET(kg/m^2/8day)"][ind3] / days_interval - ) - # More notice: it is only for unified process to divide by 35.314666721489 - # notice the value's unit is kg/m2/8d and has a scale factor 0.1 - # more information can be seen here: https://www.ntsg.umt.edu/project/modis/mod16.php - # it says: "The users should multiply 0.1 to get the real ET/PET values in mm/8day or mm/month" - else: - # only one streamflow type: Q - flow_file = os.path.join( - self.data_source_description["CAMELS_FLOW_DIR"], - gage_id_lst[k] + ".csv", - ) - flow_data = pd.read_csv(flow_file, sep=",") - date = pd.to_datetime(flow_data["DATE"]).values.astype( - "datetime64[D]" - ) - [c, ind1, ind2] = np.intersect1d( - date, t_range_list, return_indices=True - ) - y[k, ind2, j] = flow_data["Q"].values[ind1] - return y - - def read_relevant_cols( - self, - gage_id_lst: list = None, - t_range: list = None, - var_lst: list = None, - forcing_type="daymet", - ) -> np.array: - """ - Read forcing data - - Parameters - ---------- - gage_id_lst - station ids - t_range - the time range, for example, ["1990-01-01", "2000-01-01"] - var_lst - forcing variable types - forcing_type - only for CAMELS-US, don't care it - Returns - ------- - np.array - forcing data - """ - t_range_list = hydro_time.t_range_days(t_range) - nt = t_range_list.shape[0] - x = np.full([len(gage_id_lst), nt, len(var_lst)], np.nan) - for k in tqdm(range(len(gage_id_lst)), desc="Read forcing data of CAMELS-CC"): - forcing_file = os.path.join( - self.data_source_description["CAMELS_FORCING_DIR"], - gage_id_lst[k] + "_lump_era5_land_forcing.txt", - ) - forcing_data = pd.read_csv(forcing_file, sep=" ") - df_date = forcing_data[["Year", "Mnth", "Day"]] - df_date.columns = ["year", "month", "day"] - date = pd.to_datetime(df_date).values.astype("datetime64[D]") - - [c, ind1, ind2] = np.intersect1d(date, t_range_list, return_indices=True) - for j in range(len(var_lst)): - if "evaporation" in var_lst[j]: - # evaporation value are all negative (maybe upward flux is marked as negative) - x[k, ind2, j] = forcing_data[var_lst[j]].values[ind1] * -1 * 1e3 - # unit of prep and pet is m, tran them to mm - elif "precipitation" in var_lst[j]: - prcp = forcing_data[var_lst[j]].values - # there are a few negative values for prcp, set them 0 - prcp[prcp < 0] = 0.0 - x[k, ind2, j] = prcp[ind1] * 1e3 - else: - x[k, ind2, j] = forcing_data[var_lst[j]].values[ind1] - return x - - def read_attr_all(self): - data_folder = self.data_source_description["CAMELS_ATTR_DIR"] - key_lst = self.data_source_description["CAMELS_ATTR_KEY_LST"] - f_dict = dict() # factorize dict - var_dict = dict() - var_lst = list() - out_lst = list() - gage_dict = self.camels_sites - camels_str = "" - sep_ = "," - for key in key_lst: - data_file = os.path.join(data_folder, key + ".csv") - data_temp = pd.read_csv(data_file, sep=sep_) - var_lst_temp = list(data_temp.columns[1:]) - var_dict[key] = var_lst_temp - var_lst.extend(var_lst_temp) - k = 0 - gage_id_key = "gage_id" - n_gage = len(gage_dict[gage_id_key].values) - out_temp = np.full([n_gage, len(var_lst_temp)], np.nan) - for field in var_lst_temp: - if is_string_dtype(data_temp[field]): - value, ref = pd.factorize(data_temp[field], sort=True) - out_temp[:, k] = value - f_dict[field] = ref.tolist() - elif is_numeric_dtype(data_temp[field]): - out_temp[:, k] = data_temp[field].values - k = k + 1 - out_lst.append(out_temp) - out = np.concatenate(out_lst, 1) - return out, var_lst, var_dict, f_dict - - def read_constant_cols( - self, gage_id_lst=None, var_lst=None, is_return_dict=False - ) -> Union[tuple, np.array]: - """ - Read Attributes data - - Parameters - ---------- - gage_id_lst - station ids - var_lst - attribute variable types - is_return_dict - if true, return var_dict and f_dict for CAMELS_US - Returns - ------- - Union[tuple, np.array] - if attr var type is str, return factorized data. - When we need to know what a factorized value represents, we need return a tuple; - otherwise just return an array - """ - attr_all, var_lst_all, var_dict, f_dict = self.read_attr_all() - ind_var = [var_lst_all.index(var) for var in var_lst] - id_lst_all = self.read_object_ids() - # Notice the sequence of station ids ! Some id_lst_all are not sorted, so don't use np.intersect1d - ind_grid = [id_lst_all.tolist().index(tmp) for tmp in gage_id_lst] - temp = attr_all[ind_grid, :] - out = temp[:, ind_var] - if is_return_dict: - return out, var_dict, f_dict - else: - return out - - def read_area(self, object_ids) -> np.array: - return self.read_constant_cols(object_ids, ["Area"], is_return_dict=False) - - def read_mean_prep(self, object_ids) -> np.array: - return self.read_constant_cols(object_ids, ["p_mean"], is_return_dict=False) diff --git a/hydromodel/datasets/data_preprocess.py b/hydromodel/datasets/data_preprocess.py index 0b46221..e29ec4b 100644 --- a/hydromodel/datasets/data_preprocess.py +++ b/hydromodel/datasets/data_preprocess.py @@ -1,110 +1,257 @@ """ Author: Wenyu Ouyang Date: 2022-10-25 21:16:22 -LastEditTime: 2024-03-21 18:36:25 +LastEditTime: 2024-03-25 14:50:32 LastEditors: Wenyu Ouyang Description: preprocess data for models in hydro-model-xaj -FilePath: \hydro-model-xaj\hydromodel\data\data_preprocess.py +FilePath: \hydro-model-xaj\hydromodel\datasets\data_preprocess.py Copyright (c) 2021-2022 Wenyu Ouyang. All rights reserved. """ +import os +import re import numpy as np import pandas as pd from sklearn.model_selection import KFold -import sys -import os -from pathlib import Path from collections import OrderedDict +import xarray as xr -import hydrodataset from hydroutils import hydro_time, hydro_file -sys.path.append(os.path.dirname(Path(os.path.abspath(__file__)).parent.parent)) -from hydromodel.datasets import camels_format_data +from hydromodel import CACHE_DIR +from hydromodel.datasets import * -def trans_camels_format_to_xaj_format( - camels_data_dir, basin_ids: list, t_range: list, json_file, npy_file -): - """tranform data with camels format to hydro-model-xaj format +def check_tsdata_format(file_path): + """ + Checks the time-series data for required and optional columns + used in hydrological modeling. + + Parameters + ---------- + file_path : str + Path to the hydrological data file. + + Returns + ------- + bool + True if the data file format is correct, False otherwise. + """ + # prcp means precipitation, pet means potential evapotranspiration, flow means streamflow + required_columns = [ + TIME_NAME, + PRCP_NAME, + PET_NAME, + FLOW_NAME, + ] + # et means evapotranspiration, node_flow means upstream streamflow + # node1 means the first upstream node, node2 means the second upstream node, etc. + # these nodes are the nearest upstream nodes of the target node + # meaning: if node1_flow, node2_flow, and more upstream nodes are parellel. + # No serial relationship + optional_columns = [ET_NAME, NODE_FLOW_NAME] + + try: + data = pd.read_csv(file_path) + + # Check required columns + if any(column not in data.columns for column in required_columns): + print(f"Missing required columns in file: {file_path}") + return False + + # Check optional columns + for column in optional_columns: + if column not in data.columns: + print( + f"Optional column '{column}' not found in file: {file_path}, but it's okay." + ) + # Check node_flow columns (flexible number of nodes) + node_flow_columns = [ + col for col in data.columns if re.match(r"node\d+_flow", col) + ] + if not node_flow_columns: + print(f"No 'node_flow' columns found in file: {file_path}, but it's okay.") + + # Check time format and sorting + try: + data["time"] = pd.to_datetime(data["time"], format=TIME_FORMAT) + except ValueError: + print(f"Time format is incorrect in file: {file_path}") + return False + + if not data["time"].is_monotonic_increasing: + print(f"Data is not sorted by time in file: {file_path}") + return False + + # Check for consistent time intervals + time_differences = ( + data["time"].diff().dropna() + ) # Calculate differences and remove NaN + if not all(time_differences == time_differences.iloc[0]): + print(f"Time series is not at consistent intervals in file: {file_path}") + return False + + return True - CAMELS format could be seen here: https://gdex.ucar.edu/dataset/camels/file.html - download basin_timeseries_v1p2_metForcing_obsFlow.zip and unzip it, you will see the format of data + except Exception as e: + print(f"Error reading file {file_path}: {e}") + return False - hydro-model-xaj format: see README.md file -- https://github.com/OuyangWenyu/hydro-model-xaj + +def check_basin_attr_format(file_path): + """ + Checks the basin attributes data for required columns. Parameters ---------- - camels_data_dir : str - the directory of your CAMELS format data - basin_ids : list - a list of basins' ids which you choose for modeling - t_range: list - for example, ["2014-10-01", "2021-10-01"] - json_file: str - where to save the json file - npy_file: str - where to save the npy file + file_path : str + Path to the basin attributes data file. + + Returns + ------- + bool + True if the basin attributes file format is correct, False otherwise. """ - if camels_data_dir.stem == "camels_cc": - # this is for the author's own data format, for camels we don't need this - camels = camels_format_data.MyCamels(camels_data_dir) - p_pet = camels.read_relevant_cols( - gage_id_lst=basin_ids, - t_range=t_range, - var_lst=["total_precipitation", "potential_evaporation"], - ) - q = camels.read_target_cols( - gage_id_lst=basin_ids, t_range=t_range, target_cols=["Q"] - ) - else: - region = camels_data_dir.stem.split("_")[-1].upper() - camels = hydrodataset.Camels(camels_data_dir, region=region) - flow_tag = camels.get_target_cols() - ft3persec2m3persec = 1 / 35.314666721489 - if region == "US": - pet = camels.read_camels_us_model_output_data( - basin_ids, t_range, var_lst=["PET"] - ) - p = camels.read_relevant_cols( - gage_id_lst=basin_ids, - t_range=t_range, - var_lst=["prcp"], + required_columns = [ID_NAME, NAME_NAME, AREA_NAME] + + try: + data = pd.read_csv(file_path) + + if missing_required_columns := [ + col for col in required_columns if col not in data.columns + ]: + print( + f"Missing required columns in basin attributes file: {file_path}: {missing_required_columns}" ) - p_pet = np.concatenate([p, pet], axis=2) - else: - raise NotImplementedError("Only CAMELS-US is supported now.") - q = camels.read_target_cols( - gage_id_lst=basin_ids, t_range=t_range, target_cols=flow_tag - ) - # TODO: camels's streamflow data is in ft3/s, need refactor to unify the unit - q = q * ft3persec2m3persec - # generally streamflow's unit is m3/s, we transform it to mm/day - # basin areas also should be saved, - # we will use it to transform streamflow's unit to m3/s after we finished predicting - basin_area = camels.read_area(basin_ids) - # 1 km2 = 10^6 m2 - km2tom2 = 1e6 - # 1 m = 1000 mm - mtomm = 1000 - # 1 day = 24 * 3600 s - daytos = 24 * 3600 - temparea = np.tile(basin_area, (1, q.shape[1])) - q = np.expand_dims(q[:, :, 0] / (temparea * km2tom2) * mtomm * daytos, axis=2) - - date_lst = [str(t)[:10] for t in hydro_time.t_range_days(t_range)] - data_info = OrderedDict( - { - "time": date_lst, - "basin": basin_ids, - "variable": ["prcp(mm/day)", "pet(mm/day)", "streamflow(mm/day)"], - "area": basin_area.flatten().tolist(), - } - ) - hydro_file.serialize_json(data_info, json_file) - hydro_file.serialize_numpy( - np.swapaxes(np.concatenate((p_pet, q), axis=2), 0, 1), npy_file - ) + return False + + # Additional checks (e.g., datatype checks, non-empty rows) can be added here + + return True + + except Exception as e: + print(f"Error reading basin attributes file {file_path}: {e}") + return False + + +def check_folder_contents(folder_path, basin_attr_file="basin_attributes.csv"): + """ + Checks all time series data files in a folder and a single basin attributes file. + + Parameters + ---------- + folder_path : str + Path to the folder containing the time series data files. + basin_attr_file : str + Filename of the basin attributes file, default is "basin_attributes.csv". + + Returns + ------- + bool + True if all files in the folder and the basin attributes file are correct, False otherwise. + """ + # 检查流域属性文件 + if not check_basin_attr_format(os.path.join(folder_path, basin_attr_file)): + return False + + # 获取流域ID列表 + basin_ids = pd.read_csv(os.path.join(folder_path, basin_attr_file))["id"].tolist() + + # 检查每个流域的时序文件 + for basin_id in basin_ids: + file_name = f"basin_{basin_id}.csv" + file_path = os.path.join(folder_path, file_name) + + if not os.path.exists(file_path): + print(f"Missing time series data file for basin {basin_id}: {file_path}") + return False + + if not check_tsdata_format(file_path): + print(f"Time series data format check failed for file: {file_path}") + return False + + return True + + +def process_and_save_data_as_nc( + folder_path, + save_folder=CACHE_DIR, + nc_attrs_file="attributes.nc", + nc_ts_file="timeseries.nc", +): + # 验证文件夹内容 + if not check_folder_contents(folder_path): + print("Folder contents validation failed.") + return False + + # 读取流域属性 + basin_attr_file = os.path.join(folder_path, "basin_attributes.csv") + basin_attrs = pd.read_csv(basin_attr_file) + + # 创建属性数据集 + ds_attrs = xr.Dataset.from_dataframe(basin_attrs.set_index(ID_NAME)) + new_column_names = {} + units = {} + + for col in basin_attrs.columns: + new_name = remove_unit_from_name(col) + unit = get_unit_from_name(col) + new_column_names[col] = new_name + if unit: + units[new_name] = unit + + basin_attrs.rename(columns=new_column_names, inplace=True) + + # 创建不带单位的数据集 + ds_attrs = xr.Dataset.from_dataframe(basin_attrs.set_index(ID_NAME)) + + # 为有单位的变量添加单位属性 + for var_name, unit in units.items(): + ds_attrs[var_name].attrs["units"] = unit + # 初始化时序数据集 + ds_ts = xr.Dataset() + + # 初始化用于保存单位的字典 + units = {} + + # 获取流域ID列表 + basin_ids = basin_attrs[ID_NAME].tolist() + + # 为每个流域读取并处理时序数据 + for i, basin_id in enumerate(basin_ids): + file_name = f"basin_{basin_id}.csv" + file_path = os.path.join(folder_path, file_name) + data = pd.read_csv(file_path) + data[TIME_NAME] = pd.to_datetime(data[TIME_NAME]) + + # 在处理第一个流域时构建单位字典 + if i == 0: + for col in data.columns: + new_name = remove_unit_from_name(col) + if unit := get_unit_from_name(col): + units[new_name] = unit + + # 修改列名以移除单位 + renamed_columns = {col: remove_unit_from_name(col) for col in data.columns} + data.rename(columns=renamed_columns, inplace=True) + + # 将 DataFrame 转换为 xarray Dataset + ds_basin = xr.Dataset.from_dataframe(data.set_index(TIME_NAME)) + + # 为每个变量设置单位属性 + for var in ds_basin.data_vars: + if var in units: + ds_basin[var].attrs["units"] = units[var] + # 添加 basin 坐标 + ds_basin = ds_basin.expand_dims({"basin": [basin_id]}) + # 合并到主数据集 + ds_ts = xr.merge([ds_ts, ds_basin], compat="no_conflicts") + + # 保存为 NetCDF 文件 + ds_attrs.to_netcdf(os.path.join(save_folder, nc_attrs_file)) + ds_ts.to_netcdf(os.path.join(save_folder, nc_ts_file)) + + return True def split_train_test(json_file, npy_file, train_period, test_period): diff --git a/requirements.txt b/requirements.txt index f0de31a..c031e65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ requests muskingumcunge -hydrodataset +hydrodata diff --git a/requirements_dev.txt b/requirements_dev.txt index 522f725..d9c0900 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -26,4 +26,4 @@ requests muskingumcunge -hydrodataset +hydrodata diff --git a/scripts/check_data_format.py b/scripts/check_data_format.py new file mode 100644 index 0000000..4827ac4 --- /dev/null +++ b/scripts/check_data_format.py @@ -0,0 +1,27 @@ +import argparse + +from hydromodel.datasets.data_preprocess import check_tsdata_format + + +def main(): + parser = argparse.ArgumentParser( + description="Check the format of hydrological data." + ) + parser.add_argument( + "--data_file", + type=str, + required=True, + help="Path to the hydrological data file", + ) + + args = parser.parse_args() + file_path = args.data_file + + if check_tsdata_format(file_path): + print("Data format is correct.") + else: + print("Data format is incorrect.") + + +if __name__ == "__main__": + main() diff --git a/test/test_data.py b/test/test_data.py index c4c8342..b6c6efc 100644 --- a/test/test_data.py +++ b/test/test_data.py @@ -1,7 +1,7 @@ """ Author: Wenyu Ouyang Date: 2022-10-25 21:16:22 -LastEditTime: 2024-03-22 09:54:30 +LastEditTime: 2024-03-25 11:29:10 LastEditors: Wenyu Ouyang Description: Test for data preprocess FilePath: \hydro-model-xaj\test\test_data.py @@ -9,6 +9,7 @@ """ import os + from hydrodataset import Camels from hydromodel import SETTING @@ -21,7 +22,3 @@ def test_load_dataset(): ["01013500"], ["2010-01-01", "2014-01-01"], ["streamflow"] ) print(data) - - -def test_read_your_own_data(): - pass diff --git a/test/test_data_preprocess.py b/test/test_data_preprocess.py new file mode 100644 index 0000000..99becc0 --- /dev/null +++ b/test/test_data_preprocess.py @@ -0,0 +1,234 @@ +import pytest +import os +import pandas as pd +import xarray as xr + +from hydromodel.datasets import * +from hydromodel.datasets.data_preprocess import process_and_save_data_as_nc +from hydromodel.datasets.data_preprocess import check_tsdata_format +from hydromodel.datasets.data_preprocess import check_basin_attr_format +from hydromodel.datasets.data_preprocess import check_folder_contents + + +@pytest.fixture() +def basin_attrs_file(tmp_path): + # Create a temporary CSV file with required columns + file_path = tmp_path / "basin_attributes.csv" + data = pd.DataFrame( + { + ID_NAME: [1, 2, 3], + NAME_NAME: ["Basin A", "Basin B", "Basin C"], + AREA_NAME: [100, 200, 300], + } + ) + data.to_csv(file_path, index=False) + return str(file_path) + + +@pytest.fixture() +def all_data_dir(tmp_path): + # Create time series data files for basins 1, 2, and 3 + for basin_id in [1, 2, 3]: + file_name = f"basin_{basin_id}.csv" + file_path = tmp_path / file_name + + data = pd.DataFrame( + { + TIME_NAME: [ + "2022-01-01 00:00:00", + "2022-01-02 00:00:00", + "2022-01-03 00:00:00", + ], + PET_NAME: [1, 2, 3], + PRCP_NAME: [4, 5, 6], + FLOW_NAME: [7, 8, 9], + ET_NAME: [10, 11, 12], + NODE_FLOW_NAME: [13, 14, 15], + } + ) + data.to_csv(file_path, index=False) + + return str(tmp_path) + + +def test_check_basin_attributes_format_with_valid_file(basin_attrs_file): + assert check_basin_attr_format(basin_attrs_file) == True + + +def test_check_basin_attributes_format_with_missing_columns(basin_attrs_file): + # Remove the 'name' column from the file + data = pd.read_csv(basin_attrs_file) + data.drop(columns=["name"], inplace=True) + data.to_csv(basin_attrs_file, index=False) + + assert check_basin_attr_format(basin_attrs_file) == False + + +def test_check_basin_attributes_format_with_invalid_file(basin_attrs_file): + # Write invalid data to the file + with open(basin_attrs_file, "w") as f: + f.write("Invalid data") + + assert check_basin_attr_format(basin_attrs_file) == False + + +def test_check_your_own_data(all_data_dir): + """ + Test to check the format of hydrological modeling data. + """ + # Define a sample file path + file_path = os.path.join(all_data_dir, "basin_1.csv") + + # Check the format of hydrological modeling data + result = check_tsdata_format(file_path) + + # Assert that the result is True + assert result + + # Clean up the sample file + os.remove(file_path) + + +def test_check_your_own_data_missing_required_columns(tmpdir): + """ + Test to check the format of hydrological modeling data with missing required columns. + """ + # Define a sample file path + file_path = os.path.join(str(tmpdir), "hydro_data.csv") + + # Create a sample file with missing required columns + sample_data = pd.DataFrame( + { + PRCP_NAME: [4, 5, 6], + FLOW_NAME: [7, 8, 9], + ET_NAME: [10, 11, 12], + NODE_FLOW_NAME: [13, 14, 15], + } + ) + sample_data.to_csv(file_path, index=False) + + # Check the format of hydrological modeling data + result = check_tsdata_format(file_path) + + # Assert that the result is False + assert not result + + # Clean up the sample file + os.remove(file_path) + + +def test_check_your_own_data_missing_optional_columns(tmpdir): + """ + Test to check the format of hydrological modeling data with missing optional columns. + """ + # Define a sample file path + file_path = os.path.join(str(tmpdir), "hydro_data.csv") + + # Create a sample file with missing optional columns + sample_data = pd.DataFrame( + { + TIME_NAME: [ + "2022-01-01 00:00:00", + "2022-01-02 00:00:00", + "2022-01-03 00:00:00", + ], + PET_NAME: [1, 2, 3], + PRCP_NAME: [4, 5, 6], + FLOW_NAME: [7, 8, 9], + } + ) + sample_data.to_csv(file_path, index=False) + + # Check the format of hydrological modeling data + result = check_tsdata_format(file_path) + + # Assert that the result is True + assert result + + # Clean up the sample file + os.remove(file_path) + + +def test_check_your_own_data_invalid_file(tmpdir): + """ + Test to check the format of an invalid hydrological modeling data file. + """ + # Define a sample file path + file_path = os.path.join(str(tmpdir), "hydro_data.csv") + + # Create an invalid file (not a CSV) + with open(file_path, "w") as f: + f.write("This is not a valid CSV file.") + + # Check the format of hydrological modeling data + result = check_tsdata_format(file_path) + + # Assert that the result is False + assert not result + + # Clean up the sample file + os.remove(file_path) + + +def test_check_folder_contents_with_valid_files(basin_attrs_file, all_data_dir): + assert check_folder_contents(all_data_dir, basin_attrs_file) == True + + +def test_check_folder_contents_with_missing_time_series_data_file( + basin_attrs_file, tmp_path +): + # 创建一个模拟的时序数据文件,然后删除它,模拟缺失的文件场景 + file_path = tmp_path / "basin_2.csv" + with open(file_path, "w") as f: + f.write("Dummy data") + os.remove(file_path) + + # 调用检查函数,确保它正确地返回 False + assert check_folder_contents(tmp_path, basin_attrs_file) == False + + +def test_check_folder_contents_with_invalid_time_series_data_file( + basin_attrs_file, tmp_path +): + # Create an invalid time series data file for basin 1 + file_path = tmp_path / "basin_1.csv" + with open(file_path, "w") as f: + f.write("Invalid data") + + assert check_folder_contents(tmp_path, basin_attrs_file) == False + + +def test_check_folder_contents_with_missing_basin_attributes_file( + tmp_path, basin_attrs_file +): + # Remove basin attributes file + os.remove(basin_attrs_file) + + assert check_folder_contents(tmp_path) == False + + +def test_check_folder_contents_with_missing_basin_attributes_column( + basin_attrs_file, all_data_dir +): + # Remove 'name' column from basin attributes file + data = pd.read_csv(basin_attrs_file) + data.drop(columns=["name"], inplace=True) + data.to_csv(basin_attrs_file, index=False) + + assert check_folder_contents(all_data_dir, basin_attrs_file) == False + + +def test_process_and_save_data_as_nc_with_valid_data(all_data_dir, basin_attrs_file): + # Create a temporary folder for testing + folder_path = os.path.join(all_data_dir, "test_folder") + os.makedirs(folder_path) + + # Call the function to process and save the data as NetCDF files + result = process_and_save_data_as_nc(all_data_dir, folder_path) + + # Assert that the function returns True + assert result + + # Assert that the NetCDF files are created + assert os.path.exists(os.path.join(folder_path, "attributes.nc")) + assert os.path.exists(os.path.join(folder_path, "timeseries.nc"))