diff --git a/Snakefile b/Snakefile
index ce3bd9f13..763ce2fd3 100644
--- a/Snakefile
+++ b/Snakefile
@@ -254,7 +254,7 @@ rule build_shapes:
         "benchmarks/" + RDIR + "build_shapes"
     threads: 1
     resources:
-        mem_mb=500,
+        mem_mb=3096,
     script:
         "scripts/build_shapes.py"
diff --git a/config.default.yaml b/config.default.yaml
index 934e4ba7b..bde33a35b 100644
--- a/config.default.yaml
+++ b/config.default.yaml
@@ -90,7 +90,6 @@ build_shape_options:
   out_logging: true # When true, logging is printed to console
   year: 2020 # reference year used to derive shapes, info on population and info on GDP
   nprocesses: 3 # number of processes to be used in build_shapes
-  nchunks: 3 # number of data chuncks for build_shapes. If not specified or smaller than nprocesses, specified as nprocesses
   worldpop_method: "standard" # "standard" pulls from web 1kmx1km raster, "api" pulls from API 100mx100m raster, false (not "false") no pop addition to shape which is useful when generating only cutout
   gdp_method: "standard" # "standard" pulls from web 1x1km raster, false (not "false") no gdp addition to shape which useful when generating only cutout
   contended_flag: "set_by_country" # "set_by_country" assigns the contended areas to the countries according to the GADM database, "drop" drops these contended areas from the model
diff --git a/config.tutorial.yaml b/config.tutorial.yaml
index 6aba3e46c..29d6e700a 100644
--- a/config.tutorial.yaml
+++ b/config.tutorial.yaml
@@ -104,7 +104,6 @@ build_shape_options:
   out_logging: true # When true, logging is printed to console
   year: 2020 # reference year used to derive shapes, info on population and info on GDP
   nprocesses: 2 # number of processes to be used in build_shapes
-  nchunks: 2 # number of data chuncks for build_shapes. If not specified or smaller than nprocesses, specified as nprocesses
   worldpop_method: "standard" # "standard" pulls from web 1kmx1km raster, "api" pulls from API 100mx100m raster, false (not "false") no pop addition to shape which is useful when generating only cutout
   gdp_method: "standard" # "standard" pulls from web 1x1km raster, false (not "false") no gdp addition to shape which useful when generating only cutout
   contended_flag: "set_by_country" # "set_by_country" assigns the contended areas to the countries according to the GADM database, "drop" drops these contended areas from the model
diff --git a/doc/configtables/build_shape_options.csv b/doc/configtables/build_shape_options.csv
index 446dab8bd..78df78713 100644
--- a/doc/configtables/build_shape_options.csv
+++ b/doc/configtables/build_shape_options.csv
@@ -4,7 +4,6 @@ update_file, bool, "{True, False}", "True: all input files are downloaded again
 out_logging, bool, "{True, False}", "True: Logging is printed in the console."
 year,, "past year; e.g. YYYY", "Reference year used to derive shapes, info on population and info on GDP."
 nprocesses, int,, "Number of processes to be used in build_shapes."
-nchunks, int,, "Number of data chuncks for build_shapes. If not specified or smaller than nprocesses, specified as nprocesses."
 worldpop_method,, "{""standard"", ""api"", false}", "Specifies how population is added to every shape: ""standard"" pulls from web 1kmx1km raster; ""api"" pulls from API 100mx100m raster; false (not ""false"") no population addition to shape. This is useful when generating only cutout."
 gdp_method,, "{""standard"", false}", "Specifies how GDP is added to every shape: ""standard"" pulls from web 1x1km raster; false (not ""false"") no gdp addition to shape. This is useful when generating only cutout."
 contended_flag,, "{""set_by_country"", ""drop""}", "Specifies what to do with contended countries: ""set_by_country"" assigns the contended areas to the countries according to the GADM database; ""drop"" drops the contended areas from the model."
diff --git a/envs/environment.yaml b/envs/environment.yaml
index 93c7df9f7..b65376cb7 100644
--- a/envs/environment.yaml
+++ b/envs/environment.yaml
@@ -23,7 +23,7 @@ dependencies:
 - xlrd
 - openpyxl
 - seaborn
-- snakemake-minimal
+- snakemake-minimal<8
 - memory_profiler
 - ruamel.yaml<=0.17.26
 - pytables
@@ -43,6 +43,8 @@ dependencies:
 - matplotlib<=3.5.2
 - reverse-geocode
 - country_converter
+- pyogrio
+- numba
 - py7zr
 
 # Keep in conda environment when calling ipython
diff --git a/scripts/build_shapes.py b/scripts/build_shapes.py
index 2125780a3..ccb303899 100644
--- a/scripts/build_shapes.py
+++ b/scripts/build_shapes.py
@@ -31,6 +31,7 @@
     two_digits_2_name_country,
 )
 from rasterio.mask import mask
+from rasterio.windows import Window
 from shapely.geometry import LineString, MultiPolygon, Point, Polygon
 from shapely.geometry.base import BaseGeometry
 from shapely.ops import unary_union
@@ -39,6 +40,10 @@
 
 sets_path_to_root("pypsa-earth")
 
+from numba import njit
+from numba.core import types
+from numba.typed import Dict
+
 logger = create_logger(__name__)
 
 
@@ -85,7 +90,6 @@ def download_GADM(country_code, update=False, out_logging=False):
     -------
     gpkg file per country
     """
-
     GADM_filename = get_GADM_filename(country_code)
     GADM_url = f"https://geodata.ucdavis.edu/gadm/gadm4.1/gpkg/{GADM_filename}.gpkg"
 
@@ -100,7 +104,7 @@
     if not os.path.exists(GADM_inputfile_gpkg) or update is True:
         if out_logging:
             logger.warning(
-                f"Stage 4/4: {GADM_filename} of country code {country_code} does not exist, downloading to {GADM_inputfile_gpkg}"
+                f"Stage 5 of 5: {GADM_filename} of country {two_digits_2_name_country(country_code)} does not exist, downloading to {GADM_inputfile_gpkg}"
             )
         # create data/osm directory
         os.makedirs(os.path.dirname(GADM_inputfile_gpkg), exist_ok=True)
@@ -210,7 +214,7 @@ def get_GADM_layer(
 
         # read gpkg file
         geodf_temp = gpd.read_file(
-            file_gpkg, layer="ADM_ADM_" + str(cur_layer_id)
+            file_gpkg, layer="ADM_ADM_" + str(cur_layer_id), engine="pyogrio"
         ).to_crs(geo_crs)
 
         geodf_temp = filter_gadm(
@@ -257,7 +261,7 @@ def countries(countries, geo_crs, contended_flag, update=False, out_logging=Fals
     "Create country shapes"
 
     if out_logging:
-        logger.info("Stage 1 of 4: Create country shapes")
+        logger.info("Stage 1 of 5: Create country shapes")
 
     # download data if needed and get the layer id 0, corresponding to the countries
     df_countries = get_GADM_layer(
@@ -284,7 +288,7 @@ def countries(countries, geo_crs, contended_flag, update=False, out_logging=Fals
 
 def country_cover(country_shapes, eez_shapes=None, out_logging=False, distance=0.02):
     if out_logging:
-        logger.info("Stage 3 of 4: Merge country shapes to create continent shape")
+        logger.info("Stage 3 of 5: Merge country shapes to create continent shape")
 
     shapes = country_shapes.apply(lambda x: x.buffer(distance))
     shapes_list = list(shapes)
@@ -325,7 +329,7 @@ def load_EEZ(countries_codes, geo_crs, EEZ_gpkg="./data/eez/eez_v11.gpkg"):
             f"File EEZ {EEZ_gpkg} not found, please download it from https://www.marineregions.org/download_file.php?name=World_EEZ_v11_20191118_gpkg.zip and copy it in {os.path.dirname(EEZ_gpkg)}"
         )
 
-    geodf_EEZ = gpd.read_file(EEZ_gpkg).to_crs(geo_crs)
+    geodf_EEZ = gpd.read_file(EEZ_gpkg, engine="pyogrio").to_crs(geo_crs)
     geodf_EEZ.dropna(axis=0, how="any", subset=["ISO_TER1"], inplace=True)
     # [["ISO_TER1", "TERRITORY1", "ISO_SOV1", "ISO_SOV2", "ISO_SOV3", "geometry"]]
     geodf_EEZ = geodf_EEZ[["ISO_TER1", "geometry"]]
@@ -364,7 +368,7 @@ def eez(
     """
 
     if out_logging:
-        logger.info("Stage 2 of 4: Create offshore shapes")
+        logger.info("Stage 2 of 5: Create offshore shapes")
 
     # load data
     df_eez = load_EEZ(countries, geo_crs, EEZ_gpkg)
@@ -467,14 +471,12 @@ def download_WorldPop_standard(
     WorldPop_filename : str
         Name of the file
     """
-    if out_logging:
-        logger.info("Stage 3/4: Download WorldPop datasets")
 
     if country_code == "XK":
-        WorldPop_filename = f"srb_ppp_{year}_UNadj_constrained.tif"
+        WorldPop_filename = f"kos_ppp_{year}_constrained.tif"
         WorldPop_urls = [
-            f"https://data.worldpop.org/GIS/Population/Global_2000_2020_Constrained/2020/BSGM/SRB/{WorldPop_filename}",
-            f"https://data.worldpop.org/GIS/Population/Global_2000_2020_Constrained/2020/maxar_v1/SRB/{WorldPop_filename}",
+            f"https://data.worldpop.org/GIS/Population/Global_2000_2020_Constrained/2020/BSGM/KOS/{WorldPop_filename}",
+            f"https://data.worldpop.org/GIS/Population/Global_2000_2020_Constrained/2020/maxar_v1/KOS/{WorldPop_filename}",
         ]
     else:
         WorldPop_filename = f"{two_2_three_digits_country(country_code).lower()}_ppp_{year}_UNadj_constrained.tif"
@@ -491,7 +493,7 @@
     if not os.path.exists(WorldPop_inputfile) or update is True:
         if out_logging:
             logger.warning(
-                f"Stage 4/4: {WorldPop_filename} does not exist, downloading to {WorldPop_inputfile}"
+                f"Stage 3 of 5: {WorldPop_filename} does not exist, downloading to {WorldPop_inputfile}"
            )
         # create data/osm directory
         os.makedirs(os.path.dirname(WorldPop_inputfile), exist_ok=True)
@@ -505,7 +507,7 @@
                 loaded = True
                 break
     if not loaded:
-        logger.error(f"Stage 4/4: Impossible to download {WorldPop_filename}")
+        logger.error(f"Stage 3 of 5: Impossible to download {WorldPop_filename}")
 
     return WorldPop_inputfile, WorldPop_filename
 
@@ -535,8 +537,6 @@ def download_WorldPop_API(
     WorldPop_filename : str
         Name of the file
     """
-    if out_logging:
-        logger.info("Stage 3/4: Download WorldPop datasets")
 
     WorldPop_filename = f"{two_2_three_digits_country(country_code).lower()}_ppp_{year}_UNadj_constrained.tif"
     # Request to get the file
@@ -560,7 +560,7 @@
                 loaded = True
                 break
     if not loaded:
-        logger.error(f"Stage 4/4: Impossible to download {WorldPop_filename}")
+        logger.error(f"Stage 3 of 5: Impossible to download {WorldPop_filename}")
 
     return WorldPop_inputfile, WorldPop_filename
 
@@ -572,7 +572,7 @@ def convert_GDP(name_file_nc, year=2015, out_logging=False):
     """
 
     if out_logging:
-        logger.info("Stage 4/4: Access to GDP raster data")
+        logger.info("Stage 5 of 5: Access to GDP raster data")
 
     # tif namefile
     name_file_tif = name_file_nc[:-2] + "tif"
@@ -599,7 +599,7 @@ def convert_GDP(name_file_nc, year=2015, out_logging=False):
     if year not in list_years:
         if out_logging:
             logger.warning(
-                f"Stage 3/4 GDP data of year {year} not found, selected the most recent data ({int(list_years[-1])})"
+                f"Stage 5 of 5: GDP data of year {year} not found, selected the most recent data ({int(list_years[-1])})"
             )
         year = float(list_years[-1])
 
@@ -623,7 +623,7 @@ def load_GDP(
     """
 
     if out_logging:
-        logger.info("Stage 4/4: Access to GDP raster data")
+        logger.info("Stage 5 of 5: Access to GDP raster data")
 
     # path of the nc file
     name_file_tif = name_file_nc[:-2] + "tif"
@@ -634,7 +634,7 @@ def load_GDP(
     if update | (not os.path.exists(GDP_tif)):
         if out_logging:
             logger.warning(
-                f"Stage 4/4: File {name_file_tif} not found, the file will be produced by processing {name_file_nc}"
+                f"Stage 5 of 5: File {name_file_tif} not found, the file will be produced by processing {name_file_nc}"
             )
         convert_GDP(name_file_nc, year, out_logging)
 
@@ -696,7 +696,7 @@ def add_gdp_data(
         - Includes a new column ["gdp"]
     """
     if out_logging:
-        logger.info("Stage 4/4: Add gdp data to GADM GeoDataFrame")
+        logger.info("Stage 5 of 5: Add gdp data to GADM GeoDataFrame")
 
     # initialize new gdp column
     df_gadm["gdp"] = 0.0
@@ -716,41 +716,395 @@
     return df_gadm
 
 
-def _init_process_pop(df_gadm_, year_, worldpop_method_):
-    global df_gadm, year, worldpop_method
-    df_gadm, year, worldpop_method = df_gadm_, year_, worldpop_method_
+def _init_process_pop(df_gadm_, df_tasks_, dict_worldpop_file_locations_):
+    global df_gadm, df_tasks
+    df_gadm, df_tasks = df_gadm_, df_tasks_
+
+    global dict_worldpop_file_locations
+    dict_worldpop_file_locations = dict_worldpop_file_locations_
+
+
+def process_function_population(row_id):
+    """
+    Function that reads the task from df_tasks and executes all the methods
+
+    to obtain population values for the specified region.
+    -------
+    Inputs:
+        row_id: integer which indicates a specific row of df_tasks
+    --------
+    Outputs:
+        windowed_pop_count: Dataframe containing "GADM_ID" and "pop" columns
+            It represents the amount of population per region (GADM_ID),
+            for the settings given by the row in df_tasks
+    """
+    # Get the current task values
+    current_row = df_tasks.iloc[row_id]
+
+    c_code = current_row["c_code"]
+    window_dimensions = current_row["window_dimensions"]
+    transform = current_row["affine_transform"]
+    latlong_topleft = current_row["latlong_coordinate_topleft"]
+    latlong_botright = current_row["latlong_coordinate_botright"]
 
-# Auxiliary function to calculate population data in a parallel way
-def _process_func_pop(gadm_idxs):
+    # Obtain the inputfile location from dict
+    WorldPop_inputfile = dict_worldpop_file_locations[c_code]
     # get subset by country code
-    df_gadm_subset = df_gadm.loc[gadm_idxs].copy()
+    country_rows = df_gadm.loc[df_gadm["country"] == c_code]
+
+    np_pop_val, np_pop_xy = get_worldpop_val_xy(WorldPop_inputfile, window_dimensions)
+
+    # If no values are present in the current window skip the remaining steps
+    if len(np_pop_val) == 0:
+        return []
+
+    # get the geomask with id mappings
+    region_geomask, id_mapping = compute_geomask_region(
+        country_rows, transform, window_dimensions, latlong_topleft, latlong_botright
+    )
+
+    # If no values are present in the id_mapping skip the remaining steps
+    if len(id_mapping) == 0:
+        return []
+
+    # Calculate the population for each region
+    windowed_pop_count = sum_values_using_geomask(
+        np_pop_val, np_pop_xy, region_geomask, id_mapping
+    )
+
+    return windowed_pop_count
+
+
+def get_worldpop_val_xy(WorldPop_inputfile, window_dimensions):
+    """
+    Function to extract data from .tif input file
+    -------
+    Inputs:
+        WorldPop_inputfile: file location of worldpop file
+        window_dimensions: dimensions of window used when reading file
+    --------
+    Outputs:
+        np_pop_valid: array filled with values for each nonzero pixel in the worldpop file
+        np_pop_xy: array with [x,y] coordinates of the corresponding nonzero values in np_pop_valid
+    """
+    col_offset, row_offset, width, height = window_dimensions
+
+    current_window = Window(col_offset, row_offset, width, height)
+
+    # Open the file using rasterio
+    with rasterio.open(WorldPop_inputfile) as src:
+        # --- Process the pixels in the image for population data ---
+
+        # Read the gray layer (1) to get an np.array of this band
+        # Rasterio doesn't support lower than float32 readout
+        # Hence np_pop_raster will have nbytes = 4 * width * height
+        np_pop_raster = src.read(1, window=current_window)
+
+        # Set 'nodata' values to 0
+        np_pop_raster[np_pop_raster == src.nodata] = 0
+
+        # Set np_pop_xy to pixel locations of non zero values
+        np_pop_xy = np_pop_raster.nonzero()
+
+        # Transform to get [ x, y ] array
+        # np_pop_xy as 'I' (uintc), see
+        # https://numpy.org/doc/stable/reference/arrays.scalars.html#numpy.uintc
+        np_pop_xy = np.array([np_pop_xy[0], np_pop_xy[1]]).T.astype("I")
+
+        # Extract the values from the locations of non zero pixels
+        np_pop_valid = np_pop_raster[np_pop_xy.T[0], np_pop_xy.T[1]]
 
-    country_sublist = df_gadm_subset["country"].unique()
+    return np_pop_valid, np_pop_xy
 
-    for c_code in country_sublist:
-        # get worldpop image
-        WorldPop_inputfile, WorldPop_filename = download_WorldPop(
-            c_code, worldpop_method, year, False, False
+
+def compute_geomask_region(
+    country_rows, affine_transform, window_dimensions, latlong_topleft, latlong_botright
+):
+    """
+    Function to mask geometries into np_map_ID using an incrementing counter
+    -------
+    Inputs:
+        country_rows: geoDataFrame filled with geometries and their GADM_ID
+        affine_transform: affine transform of current window
+        window_dimensions: dimensions of window used when reading file
+        latlong_topleft: [latitude, longitude] of top left corner of the window
+        latlong_botright: [latitude, longitude] of bottom right corner of the window
+    --------
+    Outputs:
+        np_map_ID.astype("H"): np_map_ID contains an ID for each location (undefined is 0)
+            dimensions are taken from window_dimensions, .astype("H") for memory savings
+        id_result:
+            DataFrame of the mapping from id (from counter) to GADM_ID
+    """
+    col_offset, row_offset, x_axis_len, y_axis_len = window_dimensions
+
+    # Set an empty numpy array with the dimensions of the country .tif file
+    # np_map_ID will contain an ID for each location (undefined is 0)
+    # ID corresponds to a specific geometry in country_rows
+    np_map_ID = np.zeros((y_axis_len, x_axis_len))
+
+    # List to contain the mappings of id to GADM_ID
+    id_to_GADM_ID = []
+
+    # Loop the country_rows geoDataFrame
+    for i in range(len(country_rows)):
+        # Set the current geometry
+        cur_geometry = country_rows.iloc[i]["geometry"]
+
+        latitude_min = cur_geometry.bounds[1]
+        latitude_max = cur_geometry.bounds[3]
+
+        # Check if bounds of geometry overlap the window
+        # In the following cases we can skip the rest of the loop
+        # If the geometry is above the window
+        if latitude_min > latlong_topleft[0]:
+            continue
+        # If the geometry is below the window
+        if latitude_max < latlong_botright[0]:
+            continue
+
+        # Generate a mask for the specific geometry
+        temp_mask = rasterio.features.geometry_mask(
+            [cur_geometry],
+            (y_axis_len, x_axis_len),
+            transform=affine_transform,
+            all_touched=True,
+            invert=True,
         )
 
-        idxs_country = df_gadm_subset[df_gadm_subset["country"] == c_code].index
+        # Map the values of counter value to np_map_ID
+        np_map_ID[temp_mask] = i + 1
+
+        # Store the id -> GADM_ID mapping
+        id_to_GADM_ID.append([i + 1, country_rows.iloc[i]["GADM_ID"]])
+
+    if len(id_to_GADM_ID) > 0:
+        id_result = pd.DataFrame(id_to_GADM_ID).set_index(0)
+    else:
+        id_result = pd.DataFrame()
+
+    # Return np_map_ID as type 'H' np.ushort
+    # 'H' -> https://numpy.org/doc/stable/reference/arrays.scalars.html#numpy.ushort
+    # This lowers memory usage, note: ID has to be within the range [0,65535]
+    return np_map_ID.astype("H"), id_result
+
+
+def sum_values_using_geomask(np_pop_val, np_pop_xy, region_geomask, id_mapping):
+    """
+    Function that sums all the population values in np_pop_val into the correct
+    GADM_ID. It uses np_pop_xy to access the key stored in region_geomask[x][y].
+
+    The relation of this key to GADM_ID is stored in id_mapping.
+    -------
+    Inputs:
+        np_pop_val: array filled with values for each nonzero pixel in the worldpop file
+        np_pop_xy: array with [x,y] coordinates of the corresponding nonzero values in np_pop_valid
+        region_geomask: array with dimensions of window, values are keys that map to GADM_ID using id_mapping
+        id_mapping: Dataframe that contains mappings of region_geomask values to GADM_IDs
+    --------
+    Outputs:
+        df_pop_count: Dataframe with columns
+            - "GADM_ID"
+            - "pop" containing population of GADM_ID region
+    """
+    # Initialize a dictionary
+    dict_id = Dict.empty(
+        key_type=types.int64,
+        value_type=types.int64,
+    )
+    dict_id[0] = 0
+    counter = 1
+    # Loop over id mapping and add indices to the dictionary
+    for ID_index in np.array(id_mapping.index):
+        dict_id[ID_index] = counter
+        counter += 1
+
+    # Declare an array to contain population counts
+    np_pop_count = np.zeros(len(id_mapping) + 1)
+
+    # Calculate population count of region using a numba njit compiled function
+    np_pop_count = loop_and_extract_val_x_y(
+        np_pop_count, np_pop_val, np_pop_xy, region_geomask, dict_id
+    )
+
+    df_pop_count = pd.DataFrame(np_pop_count, columns=["pop"])
+    df_pop_count["GADM_ID"] = np.append(np.array("NaN"), id_mapping.values)
+    df_pop_count = df_pop_count[["GADM_ID", "pop"]]
+
+    return df_pop_count
+
+
+@njit
+def loop_and_extract_val_x_y(
+    np_pop_count, np_pop_val, np_pop_xy, region_geomask, dict_id
+):
+    """
+    Function that will be compiled using @njit (numba). It takes all the
+    population values from np_pop_val and stores them in np_pop_count,
+
+    where each location in np_pop_count is mapped to a GADM_ID through dict_id (id_mapping by extension).
+    -------
+    Inputs:
+        np_pop_count: np.zeros array, which will store population counts
+        np_pop_val: array filled with values for each nonzero pixel in the worldpop file
+        np_pop_xy: array with [x,y] coordinates of the corresponding nonzero values in np_pop_valid
+        region_geomask: array with dimensions of window, values are keys that map to GADM_ID using id_mapping
+        dict_id: numba typed.dict containing id_mapping.index -> location in np_pop_count
+    --------
+    Outputs:
+        np_pop_count: np.array containing population counts
+    """
+    # Loop the population data
+    for i in range(len(np_pop_val)):
+        cur_value = np_pop_val[i]
+        cur_x, cur_y = np_pop_xy[i]
+
+        # Set the current id to the id at the same coordinate of the geomask
+        cur_id = region_geomask[int(cur_x)][int(cur_y)]
 
-        with rasterio.open(WorldPop_inputfile) as src:
-            for i in idxs_country:
-                df_gadm_subset.loc[i, "pop"] = _sum_raster_over_mask(
-                    df_gadm_subset.geometry.loc[i], src
-                )
+        # Add the current value to the population
+        np_pop_count[dict_id[cur_id]] += cur_value
 
-    return df_gadm_subset
+    return np_pop_count
 
 
-# Auxiliary function to download WorldPop data in a parallel way
-def _process_func_download_pop(c_code):
-    WorldPop_inputfile, WorldPop_filename = download_WorldPop(
-        c_code, worldpop_method, year, False, False
-    )
+def calculate_transform_and_coords_for_window(
+    current_transform, window_dimensions, original_window=False
+):
+    """
+    Function which calculates the [lat,long] corners of the window given
+    window_dimensions;
+
+    if not(original_window), it also changes the affine transform to match the window.
+    -------
+    Inputs:
+        current_transform: affine transform of source image
+        window_dimensions: dimensions of window used when reading file
+        original_window: boolean to track if window covers entire country
+    --------
+    Outputs:
+        A list of: [
+            adjusted_transform: affine transform adjusted to window
+            coordinate_topleft: [latitude, longitude] of top left corner of the window
+            coordinate_botright: [latitude, longitude] of bottom right corner of the window
+        ]
+    """
+    col_offset, row_offset, x_axis_len, y_axis_len = window_dimensions
+
+    # Declare an affine transformer with the given current_transform
+    transformer = rasterio.transform.AffineTransformer(current_transform)
+
+    # Obtain the coordinates of the upper left corner of window
+    window_topleft_longitude, window_topleft_latitude = transformer.xy(
+        row_offset, col_offset
+    )
+
+    # Obtain the coordinates of the bottom right corner of window
+    window_botright_longitude, window_botright_latitude = transformer.xy(
+        row_offset + y_axis_len, col_offset + x_axis_len
+    )
+
+    if original_window:
+        # If the window covers the entire country then use original affine transform
+        adjusted_transform = current_transform
+    else:
+        # Set the adjusted transform to the correct lat and long
+        adjusted_transform = rasterio.Affine(
+            current_transform[0],
+            current_transform[1],
+            window_topleft_longitude,
+            current_transform[3],
+            current_transform[4],
+            window_topleft_latitude,
+        )
+
+    coordinate_topleft = [window_topleft_latitude, window_topleft_longitude]
+    coordinate_botright = [window_botright_latitude, window_botright_longitude]
+
+    return [adjusted_transform, coordinate_topleft, coordinate_botright]
+
+
+def generate_df_tasks(c_code, mem_read_limit_per_process, WorldPop_inputfile):
+    """
+    Function to generate a list of tasks based on the memory constraints.
+
+    One task represents a single window of the image.
+    -------
+    Inputs:
+        c_code: country code
+        mem_read_limit_per_process: memory limit for src.read() operation
+        WorldPop_inputfile: file location of worldpop file
+    --------
+    Outputs:
+        Dataframe of task_list
+    """
+    task_list = []
+
+    # Read out dimensions and transform of image
+    with rasterio.open(WorldPop_inputfile) as src:
+        worldpop_y_dim, worldpop_x_dim = src.shape
+        transform = src.meta["transform"]
+        block_y_dim, block_x_dim = [int(i) for i in src.block_shapes[0]]
+
+    # Rasterio doesn't support lower than float32 readout
+    # Hence reading the file will take up: nbytes = 4 * y_dim * x_dim
+    expected_bytes_input_read = 4 * worldpop_y_dim * worldpop_x_dim
+
+    # Introduce a limit that avoids overfilling RAM during readout
+    # Ensure worldpop_byte_limit >= 883 * 10**6 (minimum memory for 'US')
+    worldpop_byte_limit = max(883, mem_read_limit_per_process) * 10**6
+
+    if expected_bytes_input_read < worldpop_byte_limit:
+        # If the rasterio read will be within byte limit
+        bool_original_window = True
+        # Set the window to entire image dimensions
+        current_window = [0, 0, worldpop_x_dim, worldpop_y_dim]
+        # Get latlong coordinates of window
+        transform_and_coords = calculate_transform_and_coords_for_window(
+            transform, current_window, bool_original_window
+        )
+        task_list.append(
+            [c_code, current_window, bool_original_window] + transform_and_coords
+        )
+    else:
+        # Reading operation has to be split up into multiple windows
+        bool_original_window = False
+        # Set the windows x dimension to the input x dimension (width)
+        window_x_dim = worldpop_x_dim
+        # From testing we can assume max x dimension will always fit in memory:
+        #   Largest memory requirement is 'US' at ~882.1 MB = 4 * 512 * window_x_dim
+        #   Hence worldpop_byte_limit has to be greater than 883 MB
+
+        # As the window spans the x dimension, set column offset to 0
+        window_col_offset = 0
+
+        # Calculate the bytes for reading the window using window_x_dim (readout is float32)
+        read_block_size = 4 * block_y_dim * window_x_dim
+
+        # Calculate the amount of blocks that fit into the memory budget
+        # Using the calculated x dimension
+        window_block_count = int(worldpop_byte_limit // read_block_size)
+
+        # Multiply the y_dimension by the amount of blocks in the window
+        # window_y_dim will be height of the window
+        window_y_dim = window_block_count * block_y_dim
+
+        # Calculate row offsets for all windows
+        window_row_offset = np.arange(0, worldpop_y_dim, window_y_dim)
+
+        # Loop the windows and add task for each one to task_list
+        for row_offset in window_row_offset:
+            current_window = [window_col_offset, row_offset, window_x_dim, window_y_dim]
+
+            transform_and_coords = calculate_transform_and_coords_for_window(
+                transform, current_window, bool_original_window
+            )
+
+            task_list.append(
+                [c_code, current_window, bool_original_window] + transform_and_coords
+            )
+
+    return pd.DataFrame(task_list)
+
 
 def add_population_data(
     df_gadm,
@@ -759,12 +1113,26 @@
     year=2020,
     update=False,
     out_logging=False,
+    mem_read_limit_per_process=1024,
     nprocesses=2,
-    nchunks=2,
    disable_progressbar=False,
 ):
     """
     Function to add population data to arbitrary number of shapes in a country.
+    It loads data from WorldPop raster files where each pixel represents the
+    population in that square region. Each square polygon (or pixel) is then
+    mapped into the corresponding GADM shape. Then the population in a GADM
+    shape is identified by summing over all pixels mapped to that region.
+
+    This is performed with an iterative approach:
+    1. All necessary WorldPop data tiff files are downloaded
+    2. The so-called windows are created to handle RAM limitations related to large WorldPop files.
+       Large WorldPop files require significant RAM to handle, which may not be available,
+       hence, the entire activity is decomposed into multiple windows (or tasks).
+       Each window represents a subset of a raster file on which the following algorithm is applied.
+       Note: when enough RAM is available only one window is created for efficiency purposes.
+    3. Execute all tasks by summing the values of the pixels mapped into each GADM shape.
+       Parallelization applies in this task.
 
     Inputs:
     -------
@@ -779,72 +1147,87 @@
         - Includes a new column ["pop"]
     """
-    if out_logging:
-        logger.info("Stage 4/4 POP: Add population data to GADM GeoDataFrame")
-
-    # initialize new population column
+    # Initialize new population column
     df_gadm["pop"] = 0.0
 
-    tqdm_kwargs = dict(
+    # Initialize new dict to hold worldpop_inputfile strings
+    dict_worldpop_file_locations = {}
+
+    # Initialize new dataframe to hold tasks for processing
+    df_tasks = pd.DataFrame()
+
+    if out_logging:
+        logger.info(
+            "Stage 3 of 5: Download WorldPop datasets, method: " + str(worldpop_method)
+        )
+
+    tqdm_kwargs_download = dict(
         ascii=False,
-        desc="Compute population ",
+        desc="Downloading worldpop file per country and assigning tasks",
     )
-    if (nprocesses is None) or (nprocesses == 1):
-        with tqdm(total=df_gadm.shape[0], **tqdm_kwargs) as pbar:
-            for c_code in country_codes:
-                # get subset by country code
-                country_rows = df_gadm.loc[df_gadm["country"] == c_code]
-
-                # get worldpop image
-                WorldPop_inputfile, WorldPop_filename = download_WorldPop(
-                    c_code, worldpop_method, year, update, out_logging
-                )
-
-                with rasterio.open(WorldPop_inputfile) as src:
-                    for i, row in country_rows.iterrows():
-                        df_gadm.loc[i, "pop"] = _sum_raster_over_mask(row.geometry, src)
-                        pbar.update(1)
+    with tqdm(total=len(country_codes), **tqdm_kwargs_download) as pbar:
+        for c_code in country_codes:
+            # Download worldpop image (if required) and get file location
+            WorldPop_inputfile, WorldPop_filename = download_WorldPop(
+                c_code, worldpop_method, year, update, out_logging
+            )
+            dict_worldpop_file_locations[c_code] = WorldPop_inputfile
 
-    else:
-        # generator function to split a list ll into n_objs lists
-        def divide_list(ll, n_objs):
-            for i in range(0, len(ll), n_objs):
-                yield ll[i : i + n_objs]
+            # Given the memory constraint, generate a task for all windows to cover the country (c_code)
+            df_new_tasks = generate_df_tasks(
+                c_code, mem_read_limit_per_process, WorldPop_inputfile
+            )
 
-        id_parallel_chunks = list(
-            divide_list(df_gadm.index, ceil(len(df_gadm) / nchunks))
-        )
+            df_tasks = pd.concat([df_tasks, df_new_tasks], ignore_index=True)
 
-        kwargs = {
-            "initializer": _init_process_pop,
-            "initargs": (df_gadm, year, worldpop_method),
-            "processes": nprocesses,
-        }
-        with mp.get_context("spawn").Pool(**kwargs) as pool:
-            if disable_progressbar:
-                list(pool.map(_process_func_download_pop, country_codes))
-                _ = list(pool.map(_process_func_pop, id_parallel_chunks))
-                for elem in _:
-                    df_gadm.loc[elem.index, "pop"] = elem["pop"]
-            else:
-                list(
-                    tqdm(
-                        pool.imap(_process_func_download_pop, country_codes),
-                        total=len(country_codes),
-                        ascii=False,
-                        desc="Download WorldPop ",
-                        unit=" countries",
-                    )
-                )
-                _ = list(
-                    tqdm(
-                        pool.imap(_process_func_pop, id_parallel_chunks),
-                        total=len(id_parallel_chunks),
-                        **tqdm_kwargs,
-                    )
-                )
-                for elem in _:
-                    df_gadm.loc[elem.index, "pop"] = elem["pop"]
+            pbar.update(1)
+
+    df_tasks.columns = [
+        "c_code",
+        "window_dimensions",
+        "is_original_window",
+        "affine_transform",
+        "latlong_coordinate_topleft",
+        "latlong_coordinate_botright",
+    ]
+
+    if out_logging:
+        logger.info("Stage 4 of 5: Add population data to GADM GeoDataFrame")
+
+    if out_logging:
+        logger.info("Stage 4 of 5: Starting multiprocessing")
+
+    tqdm_kwargs_compute = dict(
+        ascii=False, desc="Compute population per window", unit=" window"
+    )
+    lock = mp.Lock()
+    kwargs = {
+        "initializer": _init_process_pop,
+        "initargs": (
+            df_gadm,
+            df_tasks,
+            dict_worldpop_file_locations,
+        ),
+        "processes": nprocesses,
+    }
+    # Spawn processes with the parameters from kwargs
+    with mp.get_context("spawn").Pool(**kwargs) as pool:
+        # Create a progress bar
+        with tqdm(total=len(df_tasks), **tqdm_kwargs_compute) as pbar:
+            # Give the pool a workload
+            for df_pop_count in pool.imap_unordered(
+                process_function_population, range(len(df_tasks))
+            ):
+                # Acquire the lock before accessing df_gadm and pbar
+                with lock:
+                    # Loop the regions and write population to df_gadm
+                    for i in range(len(df_pop_count)):
+                        gadm_id, pop_count = df_pop_count.iloc[i]
+                        # Select the row with the same "GADM_ID" and set the population count
+                        df_gadm.loc[df_gadm["GADM_ID"] == gadm_id, "pop"] += pop_count
+
+                    # update bar
+                    pbar.update(1)
 
 
 def gadm(
@@ -853,15 +1236,15 @@ def gadm(
     countries,
     geo_crs,
     contended_flag,
+    mem_mb,
     layer_id=2,
     update=False,
     out_logging=False,
     year=2020,
     nprocesses=None,
-    nchunks=None,
 ):
     if out_logging:
-        logger.info("Stage 4/4: Creation GADM GeoDataFrame")
+        logger.info("Stage 3 of 5: Creation GADM GeoDataFrame")
 
     # download data if needed and get the desired layer_id
     df_gadm = get_GADM_layer(countries, layer_id, geo_crs, contended_flag, update)
@@ -878,6 +1261,7 @@ def gadm(
     )
 
     if worldpop_method != False:
+        mem_read_limit_per_process = mem_mb / nprocesses
         # add the population data to the dataset
         add_population_data(
             df_gadm,
@@ -886,8 +1270,8 @@ def gadm(
             year,
             update,
            out_logging,
+            mem_read_limit_per_process,
             nprocesses=nprocesses,
-            nchunks=nchunks,
         )
 
     if gdp_method != False:
@@ -901,11 +1285,9 @@ def gadm(
         )
 
     # renaming 3 letter to 2 letter ISO code before saving GADM file
-    # solves issue: https://github.com/pypsa-meets-earth/pypsa-earth/issues/671
-    df_gadm["GADM_ID"] = (
-        df_gadm["GADM_ID"]
-        .str.split(".")
-        .apply(lambda id: three_2_two_digits_country(id[0]) + "." + ".".join(id[1:]))
+    # In the case of a contested territory in the form 'Z00.00_0', save 'AA.Z00.00_0'
+    df_gadm["GADM_ID"] = df_gadm["country"] + df_gadm["GADM_ID"].apply(
+        lambda x: "." + x if x[0] == "Z" else x[3:]
     )
     df_gadm.set_index("GADM_ID", inplace=True)
     df_gadm["geometry"] = df_gadm["geometry"].map(_simplify_polys)
@@ -928,22 +1310,21 @@
     out = snakemake.output
 
+    EEZ_gpkg = snakemake.input["eez"]
+    mem_mb = snakemake.resources["mem_mb"]
+
+    countries_list = snakemake.params.countries
+    geo_crs = snakemake.params.crs["geo_crs"]
+    distance_crs = snakemake.params.crs["distance_crs"]
+
     layer_id = snakemake.params.build_shape_options["gadm_layer_id"]
     update = snakemake.params.build_shape_options["update_file"]
     out_logging = snakemake.params.build_shape_options["out_logging"]
     year = snakemake.params.build_shape_options["year"]
     nprocesses = snakemake.params.build_shape_options["nprocesses"]
     contended_flag = snakemake.params.build_shape_options["contended_flag"]
-    EEZ_gpkg = snakemake.input["eez"]
     worldpop_method = snakemake.params.build_shape_options["worldpop_method"]
     gdp_method = snakemake.params.build_shape_options["gdp_method"]
-    geo_crs = snakemake.params.crs["geo_crs"]
-    distance_crs = snakemake.params.crs["distance_crs"]
-    nchunks = snakemake.params.build_shape_options["nchunks"]
-    if nchunks < nprocesses:
-        logger.info(f"build_shapes data chunks set to nprocesses {nprocesses}")
-        nchunks = nprocesses
 
     country_shapes = countries(
         countries_list,
@@ -971,11 +1352,11 @@
         countries_list,
         geo_crs,
         contended_flag,
+        mem_mb,
        layer_id,
         update,
         out_logging,
         year,
         nprocesses=nprocesses,
-        nchunks=nchunks,
     )
 
     save_to_geojson(gadm_shapes, out.gadm_shapes)
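
A few standalone sketches of the techniques introduced above follow; none of this code is part of the patch. First, the RAM-bounded window sizing in generate_df_tasks reduces to a few lines of integer arithmetic: the byte budget is floored to a whole number of the file's internal block rows. The raster and block dimensions below are made up for illustration (in the diff, block_y_dim comes from src.block_shapes and the x dimension from src.shape):

import numpy as np

# Hypothetical dimensions: a 36,000 x 18,000 px raster with 512-px-high internal blocks
worldpop_x_dim, worldpop_y_dim = 36_000, 18_000
block_y_dim = 512
mem_read_limit_per_process = 1024  # MB, the default introduced in the diff

worldpop_byte_limit = max(883, mem_read_limit_per_process) * 10**6
read_block_size = 4 * block_y_dim * worldpop_x_dim  # bytes per block row (float32 readout)
window_block_count = int(worldpop_byte_limit // read_block_size)
window_y_dim = window_block_count * block_y_dim  # window height, a multiple of the block height
row_offsets = np.arange(0, worldpop_y_dim, window_y_dim)  # one task per row offset

print(window_y_dim, len(row_offsets))  # 6656 rows per window -> 3 windows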
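
get_worldpop_val_xy then reads one such window at a time. A minimal standalone version of that read, assuming only rasterio and a local GeoTIFF (the path and window in the usage comment are placeholders, not files this PR ships):

import numpy as np
import rasterio
from rasterio.windows import Window


def read_nonzero_pixels(tif_path, window_dimensions):
    """Read one window of band 1; return its nonzero values and their (row, col) indices."""
    col_offset, row_offset, width, height = window_dimensions
    with rasterio.open(tif_path) as src:
        # rasterio reads float32 at minimum, so this buffer costs 4 * width * height bytes
        raster = src.read(1, window=Window(col_offset, row_offset, width, height))
        if src.nodata is not None:
            raster[raster == src.nodata] = 0
    xy = np.argwhere(raster != 0)  # (row, col) pairs of populated pixels
    return raster[xy[:, 0], xy[:, 1]], xy


# values, coords = read_nonzero_pixels("nga_ppp_2020_UNadj_constrained.tif", (0, 0, 1024, 1024))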
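
compute_geomask_region relies on rasterio.features.geometry_mask to burn each region's incrementing ID into the window's pixel grid. A toy run on a 4x4 grid with a synthetic square polygon (not GADM data) shows the pattern:

import numpy as np
from rasterio.features import geometry_mask
from rasterio.transform import from_origin
from shapely.geometry import box

height = width = 4
# Affine transform for 1-degree pixels with the top-left corner at (lon=0, lat=4)
transform = from_origin(0, 4, 1, 1)

np_map_ID = np.zeros((height, width))
mask = geometry_mask(
    [box(1, 1, 3, 3)],  # toy "region" polygon spanning lon/lat 1..3
    (height, width),
    transform=transform,
    all_touched=True,
    invert=True,  # True for pixels touched by the geometry
)
np_map_ID[mask] = 1  # burn region ID 1, as compute_geomask_region does per geometry
print(np_map_ID.astype("H"))  # uint16 keeps the ID map small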
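
Finally, sum_values_using_geomask pairs a numba typed Dict with an @njit loop to accumulate pixel values per region. A self-contained sketch with two synthetic pixels (the geomask is int64 here for simplicity; the diff stores it as uint16 to save memory):

import numpy as np
from numba import njit
from numba.core import types
from numba.typed import Dict


@njit
def accumulate(np_pop_count, np_pop_val, np_pop_xy, region_geomask, dict_id):
    # Same loop shape as loop_and_extract_val_x_y: look up the region key under
    # each populated pixel and add the pixel's value to that region's slot.
    for i in range(len(np_pop_val)):
        cur_x = np_pop_xy[i, 0]
        cur_y = np_pop_xy[i, 1]
        cur_id = region_geomask[cur_x, cur_y]
        np_pop_count[dict_id[cur_id]] += np_pop_val[i]
    return np_pop_count


dict_id = Dict.empty(key_type=types.int64, value_type=types.int64)
dict_id[0] = 0  # key 0 = "no region"; its total lands in slot 0 and is discarded
dict_id[1] = 1  # region key 1 -> slot 1

geomask = np.array([[0, 1], [1, 1]], dtype=np.int64)  # one region in a 2x2 window
values = np.array([5.0, 7.0], dtype=np.float32)  # two populated pixels
coords = np.array([[0, 1], [1, 0]], dtype=np.int64)  # their (row, col) positions

print(accumulate(np.zeros(2), values, coords, geomask, dict_id))  # [0., 12.]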