diff --git a/cplus_core/analysis/__init__.py b/cplus_core/analysis/__init__.py index aca904d..504115d 100644 --- a/cplus_core/analysis/__init__.py +++ b/cplus_core/analysis/__init__.py @@ -1 +1,2 @@ from .analysis import ScenarioAnalysisTask +from .task_config import TaskConfig diff --git a/cplus_core/analysis/analysis.py b/cplus_core/analysis/analysis.py index e42f1d1..64c7a3e 100644 --- a/cplus_core/analysis/analysis.py +++ b/cplus_core/analysis/analysis.py @@ -3,7 +3,6 @@ Plugin tasks related to the scenario analysis """ -import datetime import os import uuid from pathlib import Path @@ -24,13 +23,14 @@ QgsTask, ) -from ..utils.conf import settings_manager, Settings +from ..utils.conf import Settings from ..definitions.defaults import ( SCENARIO_OUTPUT_FILE_NAME, ) from ..models.base import ScenarioResult from ..models.helpers import clone_activity -from ..utils.helper import align_rasters, clean_filename, tr, log, BaseFileUtils +from ..utils.helper import align_rasters, clean_filename, tr, BaseFileUtils +from .task_config import TaskConfig class ScenarioAnalysisTask(QgsTask): @@ -40,28 +40,22 @@ class ScenarioAnalysisTask(QgsTask): info_message_changed = QtCore.pyqtSignal(str, int) custom_progress_changed = QtCore.pyqtSignal(float) + log_received = QtCore.pyqtSignal(str, str, bool, bool) + task_cancelled = QtCore.pyqtSignal(bool) - def __init__( - self, - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, - scenario, - ): + def __init__(self, task_config: TaskConfig): super().__init__() - self.analysis_scenario_name = analysis_scenario_name - self.analysis_scenario_description = analysis_scenario_description + self.task_config = task_config + self.analysis_scenario_name = task_config.scenario.name + self.analysis_scenario_description = task_config.scenario.description - self.analysis_activities = analysis_activities - self.analysis_priority_layers_groups = analysis_priority_layers_groups - self.analysis_extent = analysis_extent + self.analysis_activities = task_config.analysis_activities + self.analysis_priority_layers_groups = task_config.priority_layer_groups + self.analysis_extent = task_config.scenario.extent self.analysis_extent_string = None self.analysis_weighted_activities = [] self.scenario_result = None - self.scenario_directory = None self.success = True self.output = None @@ -74,28 +68,62 @@ def __init__( self.feedback = QgsProcessingFeedback() self.processing_context = QgsProcessingContext() - self.scenario = scenario + self.scenario = task_config.scenario + self.scenario_directory = task_config.base_dir def get_settings_value(self, name: str, default=None, setting_type=None): - return settings_manager.get_value(name, default, setting_type) + """Get attribute value by attribute name. - def get_scenario_directory(self): - base_dir = self.get_settings_value(Settings.BASE_DIR) - return os.path.join( - f"{base_dir}", - "scenario_" f'{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}', - ) + :param name: Attribute name/config key + :type name: Settings + + :param default: Default value if not found, defaults to None + :type default: any, optional + + :param setting_type: type of attribute, defaults to None + :type setting_type: any, optional + + :return: Attribute value + :rtype: any + """ + return self.task_config.get_value(name, default) def get_priority_layer(self, identifier): - return settings_manager.get_priority_layer(identifier) + """Get priority layer dict by its UUID. + + :param identifier: Priority Layer UUID + :type identifier: str + + :return: Priority Layer dict + :rtype: typing.Dict + """ + return self.task_config.get_priority_layer(identifier) def get_activity(self, activity_uuid): - return settings_manager.get_activity(activity_uuid) + """Get activity object by its UUID. + + :param activity_uuid: activity UUID + :type activity_uuid: str + + :return: Activity object or None if not found + :rtype: typing.Union[Activity, None] + """ + return self.task_config.get_activity(activity_uuid) def get_priority_layers(self): - return settings_manager.get_priority_layers() + """Get all priority layers. + + :return: List of priority layer dictionary + :rtype: typing.List + """ + return self.task_config.get_priority_layers() def get_masking_layers(self): + """Get masking layers + + :return: list of mask layer file path + :rtype: List + """ masking_layers_paths = self.get_settings_value( Settings.MASK_LAYERS_PATHS, default=None ) @@ -105,8 +133,18 @@ def get_masking_layers(self): return masking_layers def cancel_task(self, exception=None): + """Cancel current task. + + :param exception: exception if any, defaults to None + :type exception: Exception, optional + """ self.error = exception - self.cancel() + try: + self.cancel() + except Exception: + pass + finally: + self.task_cancelled.emit(exception is not None) def log_message( self, @@ -115,7 +153,21 @@ def log_message( info: bool = True, notify: bool = True, ): - log(message, name=name, info=info, notify=notify) + """Handle when log is received from running task. + + :param message: Message log + :type message: str + + :param name: log name, defaults to "qgis_cplus" + :type name: str, optional + + :param info: True if it is information log, defaults to True + :type info: bool, optional + + :param notify: Not used in API, defaults to True + :type notify: bool, optional + """ + self.log_received.emit(message, name, info, notify) def on_terminated(self): """Called when the task is terminated.""" @@ -128,8 +180,6 @@ def on_terminated(self): def run(self): """Runs the main scenario analysis task operations""" - self.scenario_directory = self.get_scenario_directory() - BaseFileUtils.create_new_dir(self.scenario_directory) selected_pathway = None @@ -296,14 +346,31 @@ def finished(self, result: bool): self.log_message(f"Error from task scenario task {self.error}") def set_status_message(self, message): + """Handle when status message is updated. + + :param message: status message + :type message: str + """ self.status_message = message self.status_message_changed.emit(self.status_message) def set_info_message(self, message, level=Qgis.Info): + """Handle when info message is updated. + + :param message: message + :type message: str + :param level: severity level, defaults to Qgis.Info + :type level: int, optional + """ self.info_message = message self.info_message_changed.emit(self.info_message, level) def set_custom_progress(self, value): + """Handle when progress value is updated. + + :param value: progress value + :type value: float + """ self.custom_progress = value self.custom_progress_changed.emit(self.custom_progress) @@ -431,7 +498,9 @@ def replace_nodata(self, layer_path, output_path, nodata_value): return outputs is not None except Exception as e: - log(f"Problem replacing no data value from a snapping output, {e}") + self.log_message( + f"Problem replacing no data value from a snapping output, {e}" + ) return False @@ -824,7 +893,7 @@ def snap_layer( """ - input_result_path, reference_result_path = align_rasters( + input_result_path, logs = align_rasters( input_path, reference_path, extent, @@ -832,6 +901,8 @@ def snap_layer( rescale_values, resampling_method, ) + for log in logs: + self.log_message(log, info=("Problem" not in log)) if input_result_path is not None: result_path = Path(input_result_path) @@ -1373,7 +1444,7 @@ def run_activities_sieve(self, models, temporary_output=False): threshold size (in pixels) and replaces them with the pixel value of the largest neighbour polygon. - :param models: List of the analyzed implementation models + :param models: List of the analyzed activities :type models: typing.List[ImplementationModel] :param extent: Selected area of interest extent @@ -1390,9 +1461,7 @@ def run_activities_sieve(self, models, temporary_output=False): # Will not proceed if processing has been cancelled by the user return False - self.set_status_message( - tr("Applying sieve function to the implementation models") - ) + self.set_status_message(tr("Applying sieve function to the activities")) try: for model in models: @@ -1442,12 +1511,14 @@ def run_activities_sieve(self, models, temporary_output=False): ) # Actual processing calculation - # alg_params = { - # "INPUT": model.path, - # "THRESHOLD": threshold_value, - # "MASK_LAYER": mask_layer, - # "OUTPUT": output, - # } + alg_params = { + "INPUT": model.path, + "THRESHOLD": threshold_value, + "MASK_LAYER": mask_layer, + "OUTPUT": output, + } + + self.log_message(f"Used parameters for sieving: {alg_params} \n") input_name = os.path.splitext(os.path.basename(model.path))[0] diff --git a/cplus_core/analysis/task_config.py b/cplus_core/analysis/task_config.py new file mode 100644 index 0000000..2476ce4 --- /dev/null +++ b/cplus_core/analysis/task_config.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +""" + TaskConfig +""" +import typing +import enum + +from ..models.base import Scenario, Activity +from ..definitions.defaults import DEFAULT_VALUES +from ..utils.conf import Settings + + +class TaskConfig(object): + """Config class for Scenario Analysis.""" + + # scenario data + scenario: Scenario = None + priority_layers: typing.List = [] + priority_layer_groups: typing.List = [] + analysis_activities: typing.List[Activity] = [] + all_activities: typing.List[Activity] = [] + + # config + snapping_enabled: bool = DEFAULT_VALUES.snapping_enabled + snap_layer = "" + snap_rescale = DEFAULT_VALUES.snap_rescale + snap_method = DEFAULT_VALUES.snap_method + pathway_suitability_index = DEFAULT_VALUES.pathway_suitability_index + carbon_coefficient = DEFAULT_VALUES.carbon_coefficient + sieve_enabled = DEFAULT_VALUES.sieve_enabled + sieve_threshold = DEFAULT_VALUES.sieve_threshold + mask_path = "" + mask_layers_paths = "" + + # output selections + ncs_with_carbon = DEFAULT_VALUES.ncs_with_carbon + landuse_project = DEFAULT_VALUES.landuse_project + landuse_normalized = DEFAULT_VALUES.landuse_normalized + landuse_weighted = DEFAULT_VALUES.landuse_weighted + highest_position = DEFAULT_VALUES.highest_position + base_dir = "" + + def __init__( + self, + scenario, + priority_layers, + priority_layer_groups, + analysis_activities, + all_activities, + snapping_enabled=False, + snap_rescale=DEFAULT_VALUES.snap_rescale, + snap_method=DEFAULT_VALUES.snap_method, + pathway_suitability_index=DEFAULT_VALUES.pathway_suitability_index, # noqa + carbon_coefficient=DEFAULT_VALUES.carbon_coefficient, + sieve_enabled=DEFAULT_VALUES.sieve_enabled, + sieve_threshold=DEFAULT_VALUES.sieve_threshold, + ncs_with_carbon=DEFAULT_VALUES.ncs_with_carbon, + landuse_project=DEFAULT_VALUES.landuse_project, + landuse_normalized=DEFAULT_VALUES.landuse_normalized, + landuse_weighted=DEFAULT_VALUES.landuse_weighted, + highest_position=DEFAULT_VALUES.highest_position, + base_dir="", + ) -> None: + """Initialize analysis task configuration. + + :param scenario: scenario object + :type scenario: Scenario + + :param priority_layers: list of priority layer dict + :type priority_layers: List + + :param priority_layer_groups: List of priority layer group dict + :type priority_layer_groups: List + + :param analysis_activities: scenario activities + :type analysis_activities: List[Activity] + + :param all_activities: every activities from main config + :type all_activities: List[Activity] + + :param snapping_enabled: enable snapping, defaults to False + :type snapping_enabled: bool, optional + + :param snap_rescale: Enable snap rescale, + defaults to DEFAULT_VALUES.snap_rescale + :type snap_rescale: bool, optional + + :param snap_method: Snap method, + defaults to DEFAULT_VALUES.snap_method + :type snap_method: int, optional + + :param pathway_suitability_index: Pathway suitability index, + defaults to DEFAULT_VALUES.pathway_suitability_index + :type pathway_suitability_index: int, optional + + :param sieve_enabled: Enable sieve function, + defaults to DEFAULT_VALUES.sieve_enabled + :type sieve_enabled: bool, optional + + :param sieve_threshold: Sieve function threshold, + defaults to DEFAULT_VALUES.sieve_threshold + :type sieve_threshold: float, optional + + :param ncs_with_carbon: Enable output ncs with carbon, + defaults to DEFAULT_VALUES.ncs_with_carbon + :type ncs_with_carbon: bool, optional + + :param landuse_project: Enable output landuse project, + defaults to DEFAULT_VALUES.landuse_project + :type landuse_project: bool, optional + + :param landuse_normalized: Enable output landuse normalized, + defaults to DEFAULT_VALUES.landuse_normalized + :type landuse_normalized: bool, optional + + :param landuse_weighted: Enable output landuse weighted, + defaults to DEFAULT_VALUES.landuse_weighted + :type landuse_weighted: bool, optional + + :param highest_position: Enable output highest position, + defaults to DEFAULT_VALUES.highest_position + :type highest_position: bool, optional + + :param base_dir: base scenario directory, defaults to "" + :type base_dir: str, optional + """ + self.scenario = scenario + self.priority_layers = priority_layers + self.priority_layer_groups = priority_layer_groups + self.analysis_activities = analysis_activities + self.all_activities = all_activities + + self.snapping_enabled = snapping_enabled + self.pathway_suitability_index = pathway_suitability_index + self.carbon_coefficient = carbon_coefficient + self.snap_rescale = snap_rescale + self.snap_method = snap_method + self.sieve_enabled = sieve_enabled + self.sieve_threshold = sieve_threshold + + # output selections + self.ncs_with_carbon = ncs_with_carbon + self.landuse_project = landuse_project + self.landuse_normalized = landuse_normalized + self.landuse_weighted = landuse_weighted + self.highest_position = highest_position + + self.base_dir = base_dir + + def get_activity( + self, activity_uuid: str) -> typing.Union[Activity, None]: + """Retrieve activity by uuid. + + :param activity_uuid: Activity UUID + :type activity_uuid: str + + :return: Activity + :rtype: typing.Union[Activity, None] + """ + activity = None + filtered = [ + act for act in self.all_activities if + str(act.uuid) == activity_uuid + ] + if filtered: + activity = filtered[0] + return activity + + def get_priority_layers(self) -> typing.List: + """Retrieve priority layer list. + + :return: Priority Layers + :rtype: typing.List + """ + return self.priority_layers + + def get_priority_layer(self, identifier) -> typing.Dict: + """Retrieve priority layer by identifier. + + :param identifier: Priority layer ID + :type identifier: str + + :return: Dictionary of priority layer + :rtype: typing.Dict + """ + priority_layer = None + filtered = [f for f in self.priority_layers if f["uuid"] == str(identifier)] + if filtered: + priority_layer = filtered[0] + return priority_layer + + def get_value(self, attr_name: enum.Enum, default=None): + """Get attribute value by name. + + :param attr_name: Settings enum + :type attr_name: enum.Enum + + :param default: Default value if not found, defaults to None + :type default: any, optional + + :return: Attribute value + :rtype: any + """ + if attr_name == Settings.BASE_DIR: + return self.base_dir + return getattr(self, attr_name.value, default) + + def to_dict(self) -> dict: + """Generate dictionary of TaskConfig. + + :return: Dictionary of task config + :rtype: dict + """ + input_dict = { + "scenario_name": self.scenario.name, + "scenario_desc": self.scenario.description, + "extent": self.scenario.extent.bbox, + "snapping_enabled": self.snapping_enabled, + "snap_layer": self.snap_layer, + "snap_rescale": self.snap_rescale, + "snap_method": self.snap_method, + "pathway_suitability_index": self.pathway_suitability_index, + "carbon_coefficient": self.carbon_coefficient, + "sieve_enabled": self.sieve_enabled, + "sieve_threshold": self.sieve_threshold, + "mask_path": self.mask_path, + "mask_layers_paths": self.mask_layers_paths, + "priority_layers": self.priority_layers, + "priority_layer_groups": self.priority_layer_groups, + "activities": [], + "ncs_with_carbon": self.ncs_with_carbon, + "landuse_project": self.landuse_project, + "landuse_normalized": self.landuse_normalized, + "landuse_weighted": self.landuse_weighted, + "highest_position": self.highest_position, + "base_dir": self.base_dir, + } + for activity in self.scenario.activities: + activity_dict = { + "uuid": str(activity.uuid), + "name": activity.name, + "description": activity.description, + "path": activity.path, + "layer_type": activity.layer_type, + "user_defined": activity.user_defined, + "pathways": [], + "priority_layers": activity.priority_layers, + "layer_styles": activity.layer_styles, + } + for pathway in activity.pathways: + activity_dict["pathways"].append( + { + "uuid": str(pathway.uuid), + "name": pathway.name, + "description": pathway.description, + "path": pathway.path, + "layer_type": pathway.layer_type, + "carbon_paths": pathway.carbon_paths, + } + ) + input_dict["activities"].append(activity_dict) + return input_dict diff --git a/cplus_core/data/default/activities.json b/cplus_core/data/default/activities.json deleted file mode 100644 index 439dd8d..0000000 --- a/cplus_core/data/default/activities.json +++ /dev/null @@ -1,334 +0,0 @@ -{ - "activities": [ - { - "uuid": "a0b8fd2d-1259-4141-9ad6-d4369cf0dfd4", - "name": "Agroforestry", - "description": " Agroforestry is an integrated land use system that combines the cultivation of trees with agricultural crops and/or livestock. It promotes sustainable land management, biodiversity conservation, soil health improvement, and diversified income sources for farmers.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#d80007", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "Reds"} - } - }, - { - "uuid": "1c8db48b-717b-451b-a644-3af1bee984ea", - "name": "Alien Plant Removal", - "description": "This model involves the removal of invasive alien plant species that negatively impact native ecosystems. By eradicating these plants, natural habitats can be restored, allowing native flora and fauna to thrive.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20", - "3c155210-ccd8-404b-bbe8-b1433d6158a2", - "9f6c8b8f-0648-44ca-b943-58fab043f559", - "9291a5d9-d1cd-44c2-8fc3-2b3b20f80572" - ], - "style": { - "scenario_layer": { - "color": "#6f6f6f", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "Greys"} - } - }, - { - "uuid": "de9597b2-f082-4299-9620-1da3bad8ab62", - "name": "Applied Nucleation", - "description": " Applied nucleation is a technique that jump-starts the restoration process by creating focal points of vegetation growth within degraded areas. These 'nuclei' serve as centers for biodiversity recovery, attracting seeds, dispersers, and other ecological processes, ultimately leading to the regeneration of the surrounding landscape.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#81c4ff", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "PuBu"} - } - }, - { - "uuid": "40f04ea6-1f91-4695-830a-7d46f821f5db", - "name": "Assisted Natural Regeneration", - "description": " This model focuses on facilitating the natural regeneration of forests and degraded lands by removing barriers (such as alien plants or hard crusted soils) and providing support for native plant species to regrow. It involves activities such as removing competing vegetation, protecting young seedlings, and restoring ecosystem functions.", - "pwls_ids": [ - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20", - "85cd441e-fa3d-46e4-add9-973ba58f8bd4", - "5e41f4fa-3d7f-41aa-bee7-b9e9d08b56db", - "86c3dfc5-58d7-4ebd-a851-3b65a6bf5edd", - "620d5d7d-c452-498f-b848-b206a76891cd" - ], - "style": { - "scenario_layer": { - "color": "#e8ec18", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "YlOrRd"} - } - }, - { - "uuid": "43f96ed8-cd2f-4b91-b6c8-330d3b93bcc1", - "name": "Avoided Deforestation and Degradation", - "description": " This model focuses on preventing the conversion of forested areas into other land uses and minimizing degradation of existing forests. It involves implementing measures to protect and sustainably manage forests, preserving their biodiversity, carbon sequestration potential, and ecosystem services.", - "pwls_ids": [ - "f5687ced-af18-4cfc-9bc3-8006e40420b6", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#ff4c84", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "RdPu"} - } - }, - { - "uuid": "c3c5a381-2b9f-4ddc-8a77-708239314fb6", - "name": "Avoided Wetland Conversion/Restoration", - "description": " This model aims to prevent the conversion of wetland ecosystems into other land uses and, where possible, restore degraded wetlands. It involves implementing conservation measures, such as land-use planning, regulatory frameworks, and restoration efforts, to safeguard the ecological functions and biodiversity of wetland habitats", - "pwls_ids": [ - "f5687ced-af18-4cfc-9bc3-8006e40420b6", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#1f31d3", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "Blues"} - } - }, - { - "uuid": "3defbd0e-2b12-4ab2-a7d4-a035152396a7", - "name": "Bioproducts", - "description": " The bioproducts model focuses on utilizing natural resources sustainably to create value-added products. It involves the development and production of renewable and biodegradable materials, such as biofuels, bio-based chemicals, and bio-based materials, to reduce reliance on fossil fuels and promote a more sustainable economy.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "fef3c7e4-0cdf-477f-823b-a99da42f931e", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20", - "fb92cac1-7744-4b11-8238-4e1da97650e0", - "9e5cff3f-73e7-4734-b76a-2a9f0536fa27", - "c5b1b81e-e1ae-41ec-adeb-7388f7597156", - "3872be6d-f791-41f7-b031-b85173e41d5e" - ], - "style": { - "scenario_layer": { - "color": "#67593f", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "BrBG"} - } - }, - { - "uuid": "22f9e555-0356-4b18-b292-c2d516dcdba5", - "name": "Bush Thinning", - "description": "Bush thinning refers to the controlled removal of excess woody vegetation in certain ecosystems and using that biomass to brush pack bare soil areas to promote regrowth of grass. This practice helps restore natural balance, prevent overgrowth, and enhance biodiversity.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "fef3c7e4-0cdf-477f-823b-a99da42f931e", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20", - "e1a801c5-7f77-4746-be34-0138b62ff25c", - "478b0729-a507-4729-b1e4-b2bea7e161fd", - "5f329f53-31ff-4039-b0ec-a8d174a50866", - "5bcebbe2-7035-4d81-9817-0b4db8aa63e2" - ], - "style": { - "scenario_layer": { - "color": "#30ff01", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "BuGn"} - } - }, - { - "uuid": "177f1f27-cace-4f3e-9c3c-ef2cf54fc283", - "name": "Direct Tree Seeding", - "description": " This model involves planting tree seeds directly into the ground, allowing them to grow and establish without the need for nursery cultivation. It is a cost-effective and environmentally friendly approach to reforestation and afforestation efforts, promoting forest restoration and carbon sequestration.", - "pwls_ids": [ - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#bd6b70", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "PuRd"} - } - }, - { - "uuid": "d9d00a77-3db1-4390-944e-09b27bcbb981", - "name": "Livestock Rangeland Management", - "description": "This model focuses on sustainable management practices for livestock grazing on rangelands. It includes rotational grazing, monitoring of vegetation health, and implementing grazing strategies that promote biodiversity, soil health, and sustainable land use.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20", - "fee0b421-805b-4bd9-a629-06586a760405", - "38a33633-9198-4b55-a424-135a4d522973", - "88dc8ff3-e61f-4a48-8f9b-5791efb6603f", - "a1bfff8e-fb87-4bca-97fa-a984d9bde712" - ], - "style": { - "scenario_layer": { - "color": "#ffa500", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "YlOrBr"} - } - }, - { - "uuid": "4fbfcb1c-bfd7-4305-b216-7a1077a2ccf7", - "name": "Livestock Market Access", - "description": " This model aims to improve market access for livestock producers practicing sustainable and regenerative farming methods. It involves creating networks, certifications, and partnerships that support the sale of sustainably produced livestock products, promoting economic viability and incentivizing environmentally friendly practices.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#6c0009", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "PRGn"} - } - }, - { - "uuid": "20491092-e665-4ee7-b92f-b0ed864c7312", - "name": "Natural Woodland Livestock Management", - "description": " This model emphasizes the sustainable management of livestock within natural woodland environments. It involves implementing practices that balance livestock grazing with the protection and regeneration of native woodlands, ensuring ecological integrity while meeting livestock production goals.", - "pwls_ids": [ - "c931282f-db2d-4644-9786-6720b3ab206a", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#007018", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "Greens"} - } - }, - { - "uuid": "1334cc3b-cb7b-46a3-923a-45d9b18d9d56", - "name": "Protected Area Expansion", - "description": "This model involves expanding existing protected areas to protect more grassland, savanna, and forest from converting to an anthropogenic land cover class.", - "pwls_ids": [ - "f5687ced-af18-4cfc-9bc3-8006e40420b6", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#c27ba0", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "PiYG"} - } - }, - { - "uuid": "1334cc3b-cb7b-46a3-923a-45d9b18d9d56", - "name": "Protected Area Expansion", - "description": "This model involves expanding existing protected areas to protect more grassland, savanna, and forest from converting to an anthropogenic land cover class.", - "pwls_ids": [ - "f5687ced-af18-4cfc-9bc3-8006e40420b6", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20" - ], - "style": { - "scenario_layer": { - "color": "#c27ba0", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "PiYG"} - } - }, - { - "uuid": "92054916-e8ea-45a0-992c-b6273d1b75a7", - "name": "Sustainable Crop Farming & Aquaponics", - "description": " This model combines sustainable crop farming practices such as agroecology, Permaculture and aquaponics, a system that integrates aquaculture (fish farming) with hydroponics (soil-less crop cultivation). It enables the production of crops with sustainable practices in a mutually beneficial and resource-efficient manner, reducing water usage and chemical inputs while maximizing productivity.", - "pwls_ids": [ - "f5687ced-af18-4cfc-9bc3-8006e40420b6", - "fce41934-5196-45d5-80bd-96423ff0e74e", - "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "2f76304a-bb73-442c-9c02-ff9945389a20", - "6f7c1494-f73e-4e5e-8411-59676f9fa6e1", - "151668e7-8ffb-4766-9534-09949ab0356b", - "ed1ee71b-e7db-4599-97a9-a97c941a615f", - "307df1f4-206b-4f70-8db4-6505948e2a4e" - ], - "style": { - "scenario_layer": { - "color": "#781a8b", - "style": "solid", - "outline_width": "0", - "outline_color": "35,35,35,0" - }, - "activity_layer": {"color_ramp": "Purples"} - } - } - ] - } - \ No newline at end of file diff --git a/cplus_core/data/default/ncs_pathways.json b/cplus_core/data/default/ncs_pathways.json deleted file mode 100644 index e804d52..0000000 --- a/cplus_core/data/default/ncs_pathways.json +++ /dev/null @@ -1,125 +0,0 @@ -{ - "pathways": [ - { - "uuid": "b187f92f-b85b-45c4-9179-447f7ea114e3", - "name": "Agroforestry", - "description": "Provides additional carbon sequestration in agricultural systems by strategically planting trees in croplands.", - "path": "Final_Agroforestry_Priority_norm.tif", - "layer_type": 0, - "carbon_paths": ["bou_SOC_carbonsum_norm_inverse_null_con_clip.tif"] - }, - { - "uuid": "5fe775ba-0e80-4b70-a53a-1ed874b72da3", - "name": "Alien Plant Removal", - "description": "Alien Plant Class.", - "path": "Final_Alien_Invasive_Plant_priority_norm.tif", - "layer_type": 0, - "carbon_paths": [] - }, - { - "uuid": "bd381140-64f0-43d0-be6c-50120dd6c174", - "name": "Animal Management", - "description": "Provides additional soil carbon sequestration, reduces methane emissions from ruminants, and improves feed efficiency.", - "path": "Final_Animal_Management_Priority_norm.tif", - "layer_type": 0, - "carbon_paths": [ - "bou_SOC_carbonsum_norm_null_con_clip.tif", - "SOC_trend_30m_4_scaled_clip_norm_inverse_null_con_clip.tif" - ] - }, - { - "uuid": "fc36dd06-aea3-4067-9626-2d73916d79b0", - "name": "Avoided Deforestation", - "description": "Avoids carbon emissions by preventing forest conversion in areas with a high risk of deforestation. Forest is defined as indigenous forest regions with tree density exceeding 75% with a canopy over 6m.", - "path": "Final_Avoided_Indigenous_Forest_priority_norm.tif", - "layer_type": 0, - "carbon_paths": ["bou_SOC_carbonsum_norm_null_con_clip.tif"] - }, - { - "uuid": "f7084946-6617-4c5d-97e8-de21059ca0d2", - "name": "Avoided Grassland Conversion", - "description": "Avoids carbon emissions by preventing the conversion of grasslands in areas with a high risk of grassland loss. Grassland is defined as regions with vegetation density less than 10%.", - "path": "Final_Avoided_Grassland_priority_norm.tif", - "layer_type": 0, - "carbon_paths": ["bou_SOC_carbonsum_norm_null_con_clip.tif"] - }, - { - "uuid": "00db44cf-a2e7-428a-86bb-0afedb9719ec", - "name": "Avoided Savanna Woodland Conversion", - "description": "Avoids carbon emissions by preventing the conversion of open woodland in areas with a high risk of open woodland loss. Savanna woodland is defined as savanna regions with open woodlands (vegetation density less than 35% and a tree canopy greater than 2.5m) and natural wooded lands (vegetation density greater than 35% and a tree canopy between 2.5m and 6m).", - "path": "Final_Avoided_OpenWoodland_NaturalWoodedland_priority_norm.tif", - "layer_type": 0, - "carbon_paths": ["bou_SOC_carbonsum_norm_null_con_clip.tif"] - }, - { - "uuid": "7228ecae-8759-448d-b7ea-19366f74ee02", - "name": "Avoided Wetland Conversion", - "description": "Avoids carbon emissions by preventing the conversion of wetlands in areas with a high risk of wetland loss. Wetlands are defined as natural or semi-natural wetlands covered in permanent or seasonal herbaceous vegetation.", - "path": "Final_Avoided_Wetland_priority_norm.tif", - "layer_type": 0, - "carbon_paths": ["bou_SOC_carbonsum_norm_null_con_clip.tif"] - }, - { - "uuid": "5475dd4a-5efc-4fb4-ae90-68ff4102591e", - "name": "Fire Management", - "description": "Provides additional sequestration and avoids carbon emissions by increasing resilience to catastrophic fire.", - "path": "Final_Fire_Management_Priority_norm.tif", - "layer_type": 0, - "carbon_paths": [ - "bou_SOC_carbonsum_norm_null_con_clip.tif", - "SOC_trend_30m_4_scaled_clip_norm_inverse_null_con_clip.tif" - ] - }, - { - "uuid": "bede344c-9317-4c3f-801c-3117cc76be2c", - "name": "Restoration - Forest", - "description": "Provides additional carbon sequestration by converting non-forest into forest in areas where forests are the native cover type. This pathway excludes afforestation, where native non-forest areas are converted to forest. Forest is defined as indigenous forest regions with tree density exceeding 75% with a canopy over 6m.", - "path": "Final_Forest_Restoration_priority_norm.tif", - "layer_type": 0, - "carbon_paths": [ - "bou_SOC_carbonsum_norm_inverse_null_con_clip.tif", - "SOC_trend_30m_4_scaled_clip_norm_inverse_null_con_clip.tif" - ] - }, - { - "uuid": "384863e3-08d1-453b-ac5f-94ad6a6aa1fd", - "name": "Restoration - Savanna", - "description": "Sequesters carbon through the restoration of native grassland and open woodland habitat. This pathway excludes the opportunity to convert non-native savanna regions to savannas. Savanna in this context contains grasslands (vegetation density less than 10%), open woodlands (vegetation density less than 35% and a tree canopy greater than 2.5m), and natural wooded lands (vegetation density greater than 75% and a tree canopy between 2.5m and 6m).", - "path": "Final_Sananna_Restoration_priority_norm.tif", - "layer_type": 0, - "carbon_paths": [ - "bou_SOC_carbonsum_norm_inverse_null_con_clip.tif", - "SOC_trend_30m_4_scaled_clip_norm_inverse_null_con_clip.tif" - ] - }, - { - "uuid": "540470c7-0ed8-48af-8d91-63c15e6d69d7", - "name": "Restoration - Wetland", - "description": "Sequesters carbon through the restoration of wetland habitat. This pathway excludes the opportunity to convert non-native wetland regions to wetlands. Wetlands are defined as natural or semi-natural wetlands covered in permanent or seasonal herbaceous vegetation.", - "path": "Final_Wetland_Restoration_priority_norm.tif", - "layer_type": 0, - "carbon_paths": [ - "bou_SOC_carbonsum_norm_inverse_null_con_clip.tif", - "SOC_trend_30m_4_scaled_clip_norm_inverse_null_con_clip.tif" - ] - }, - { - "uuid": "e6d7d4cd-dd6b-4ad5-b8a6-eab5436a89f1", - "name": "Sustainable Agriculture Crop Farming", - "description": "Change from natural grassland, woodland, forest in 1990 into cropland.", - "path": "Final_Sustinable_Ag_Crop_Farming_priority_norm.tif", - "layer_type": 0, - "carbon_paths": [ - "bou_SOC_carbonsum_norm_null_con_clip.tif" - ] - }, - { - "uuid": "71de0448-46c4-4163-a124-3d88cdcbba42", - "name": "Woody Encroachment Control", - "description": "Gradual woody plant encroachment into non-forest biomes has important negative consequences for ecosystem functioning, carbon balances, and economies.", - "path": "Final_woody_encroachment_norm.tif", - "layer_type": 0 - } - ] - } - \ No newline at end of file diff --git a/cplus_core/data/default/priority_weighting_layers.json b/cplus_core/data/default/priority_weighting_layers.json deleted file mode 100644 index 2c739d8..0000000 --- a/cplus_core/data/default/priority_weighting_layers.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "layers" :[ - { - "uuid": "c931282f-db2d-4644-9786-6720b3ab206a", - "name": "Social norm", - "description": "Placeholder text for social norm ", - "selected": true, - "path": "social_int_clip_norm.tif" - }, - { - "uuid": "f5687ced-af18-4cfc-9bc3-8006e40420b6", - "name": "Social norm inverse", - "description": "Placeholder text for social norm inverse", - "selected": false, - "path": "social_int_clip_norm_inverse.tif" - }, - { - "uuid": "fef3c7e4-0cdf-477f-823b-a99da42f931e", - "name": "Climate Resilience norm inverse", - "description": "Placeholder text for climate resilience", - "selected": false, - "path": "cccombo_clip_norm_inverse.tif" - }, - { - "uuid": "fce41934-5196-45d5-80bd-96423ff0e74e", - "name": "Climate Resilience norm", - "description": "Placeholder text for climate resilience norm", - "selected": false, - "path": "cccombo_clip_norm.tif" - }, - { - "uuid": "88c1c7dd-c5d1-420c-a71c-a5c595c1c5be", - "name": "Ecological Infrastructure", - "description": "Placeholder text for ecological infrastructure", - "selected": false, - "path": "ei_all_gknp_clip_norm.tif" - }, - { - "uuid": "3e0c7dff-51f2-48c5-a316-15d9ca2407cb", - "name": "Ecological Infrastructure inverse", - "description": "Placeholder text for ecological infrastructure inverse", - "selected": false, - "path": "ei_all_gknp_clip_norm.tif" - }, - { - "uuid": "9ab8c67a-5642-4a09-a777-bd94acfae9d1", - "name": "Biodiversity norm", - "description": "Placeholder text for biodiversity norm", - "selected": false, - "path": "biocombine_clip_norm.tif" - }, - { - "uuid": "c2dddd0f-a430-444a-811c-72b987b5e8ce", - "name": "Biodiversity norm inverse", - "description": "Placeholder text for biodiversity norm inverse", - "selected": false, - "path": "biocombine_clip_norm_inverse.tif" - } - ] -} diff --git a/cplus_core/data/layers/null_raster.tif b/cplus_core/data/layers/null_raster.tif deleted file mode 100644 index 50437df..0000000 Binary files a/cplus_core/data/layers/null_raster.tif and /dev/null differ diff --git a/cplus_core/definitions/defaults.py b/cplus_core/definitions/defaults.py index 0ff9496..9a0410b 100644 --- a/cplus_core/definitions/defaults.py +++ b/cplus_core/definitions/defaults.py @@ -3,16 +3,6 @@ Definitions for all defaults settings """ -import os -import json - -from pathlib import Path - -PILOT_AREA_EXTENT = { - "type": "Polygon", - "coordinates": [30.743498637, 32.069186664, -25.201606226, -23.960197335], -} - DEFAULT_CRS_ID = 4326 SCENARIO_OUTPUT_FILE_NAME = "cplus_scenario_output" @@ -20,83 +10,19 @@ QGIS_GDAL_PROVIDER = "gdal" -# Initiliazing the plugin default data as found in the data directory -priority_layer_path = ( - Path(__file__).parent.parent.resolve() - / "data" - / "default" - / "priority_weighting_layers.json" -) - -with priority_layer_path.open("r") as fh: - priority_layers_dict = json.load(fh) -PRIORITY_LAYERS = priority_layers_dict["layers"] - - -pathways_path = ( - Path(__file__).parent.parent.resolve() / "data" / "default" / "ncs_pathways.json" -) - -with pathways_path.open("r") as fh: - pathways_dict = json.load(fh) -# Path just contains the file name and is relative to {download_folder}/ncs_pathways -DEFAULT_NCS_PATHWAYS = pathways_dict["pathways"] - - -activities_path = ( - Path(__file__).parent.parent.resolve() / "data" / "default" / "activities.json" -) - -with activities_path.open("r") as fh: - models_dict = json.load(fh) - -DEFAULT_ACTIVITIES = models_dict["activities"] - -PRIORITY_GROUPS = [ - { - "uuid": "dcfb3214-4877-441c-b3ef-8228ab6dfad3", - "name": "Biodiversity", - "description": "Placeholder text for bio diversity", - }, - { - "uuid": "8b9fb419-b6b8-40e8-9438-c82901d18cd9", - "name": "Livelihood", - "description": "Placeholder text for livelihood", - }, - { - "uuid": "21a30a80-eb49-4c5e-aff6-558123688e09", - "name": "Climate Resilience", - "description": "Placeholder text for climate resilience ", - }, - { - "uuid": "ae1791c3-93fd-4e8a-8bdf-8f5fced11ade", - "name": "Ecological infrastructure", - "description": "Placeholder text for ecological infrastructure", - }, - { - "uuid": "8cac9e25-98a8-4eae-a257-14a4ef8995d0", - "name": "Policy", - "description": "Placeholder text for policy", - }, - { - "uuid": "3a66c845-2f9b-482c-b9a9-bcfca8395ad5", - "name": "Finance - Years Experience", - "description": "Placeholder text for years of experience", - }, - { - "uuid": "c6dbfe09-b05c-4cfc-8fc0-fb63cfe0ceee", - "name": "Finance - Market Trends", - "description": "Placeholder text for market trends", - }, - { - "uuid": "3038cce0-3470-4b09-bb2a-f82071fe57fd", - "name": "Finance - Net Present value", - "description": "Placeholder text for net present value", - }, - { - "uuid": "3b2c7421-f879-48ef-a973-2aa3b1390694", - "name": "Finance - Carbon", - "description": "Placeholder text for finance carbon", - }, -] +class DEFAULT_VALUES(object): + """Default values for analysis.""" + + snapping_enabled = False + pathway_suitability_index = 0 + carbon_coefficient = 0.0 + snap_rescale = False + snap_method = 0 + sieve_enabled = False + sieve_threshold = 10.0 + ncs_with_carbon = False + landuse_project = True + landuse_normalized = True + landuse_weighted = True + highest_position = True diff --git a/cplus_core/models/base.py b/cplus_core/models/base.py index 2a1dfd6..e915d7a 100644 --- a/cplus_core/models/base.py +++ b/cplus_core/models/base.py @@ -139,8 +139,22 @@ class LayerModelComponent(BaseModelComponent): def __post_init__(self): """Try to set the layer and layer type properties.""" + if self.layer_uuid: + return self.update_layer_type() + @property + def layer_uuid(self): + """Return Layer UUID for default layer. + + Default layer's path will start with 'cplus://'. + :return: Server Layer UUID + :rtype: str + """ + if self.path.startswith("cplus://"): + return self.path.replace("cplus://", "") + return None + def update_layer_type(self): """Update the layer type if either the layer or path properties have been set. @@ -188,6 +202,8 @@ def is_valid(self) -> bool: invalid or of None type. :rtype: bool """ + if self.layer_uuid: + return True layer = self.to_map_layer() if layer is None: return False @@ -198,8 +214,18 @@ def __eq__(self, other) -> bool: """Uses BaseModelComponent equality test rather than what the dataclass default implementation will provide. """ + if self.layer_uuid: + return self.layer_uuid == other.layer_uuid return super().__eq__(other) + def is_default_layer(self) -> bool: + """Check if layer is a default layer + + :return: True if layer comes from server API + :rtype: bool + """ + return self.layer_uuid is not None + LayerModelComponentType = typing.TypeVar( "LayerModelComponentType", bound=LayerModelComponent @@ -222,6 +248,34 @@ class PriorityLayer(BaseModelComponent): path: str = "" type: PriorityLayerType = PriorityLayerType.DEFAULT + @property + def layer_uuid(self): + """Return Layer UUID for default layer. + + Default layer's path will start with 'cplus://'. + :return: Server Layer UUID + :rtype: str + """ + if self.path.startswith("cplus://"): + return self.path.replace("cplus://", "") + return None + + def __eq__(self, other) -> bool: + """Uses BaseModelComponent equality test rather than + what the dataclass default implementation will provide. + """ + if self.layer_uuid: + return self.layer_uuid == other.layer_uuid + return super().__eq__(other) + + def is_default_layer(self) -> bool: + """Check if layer is a default layer + + :return: True if layer comes from server API + :rtype: bool + """ + return self.layer_uuid is not None + @dataclasses.dataclass class NcsPathway(LayerModelComponent): @@ -256,7 +310,7 @@ def __eq__(self, other: "NcsPathway") -> bool: return True - def add_carbon_path(self, carbon_path: str) -> bool: + def add_carbon_path(self, carbon_path: str, is_default_layer: bool = False) -> bool: """Add a carbon layer path. Checks if the path has already been defined or if it exists @@ -270,8 +324,9 @@ def add_carbon_path(self, carbon_path: str) -> bool: if carbon_path in self.carbon_paths: return False - if not os.path.exists(carbon_path): - return False + if not is_default_layer: + if not os.path.exists(carbon_path): + return False self.carbon_paths.append(carbon_path) @@ -288,7 +343,11 @@ def carbon_layers(self) -> typing.List[QgsRasterLayer]: if the path is not defined. :rtype: list """ - return [QgsRasterLayer(carbon_path) for carbon_path in self.carbon_paths] + return [ + QgsRasterLayer(carbon_path) + for carbon_path in self.carbon_paths + if not carbon_path.startswith("cplus://") + ] def is_carbon_valid(self) -> bool: """Checks if the carbon layers are valid. @@ -333,6 +392,17 @@ class Activity(LayerModelComponent): layer_styles: dict = dataclasses.field(default_factory=dict) style_pixel_value: int = -1 + @classmethod + def from_dict(cls, activity_dict: typing.Dict): + """Create an Activity object from Activity dict.""" + pathways = [] + for pathway in activity_dict["pathways"]: + del pathway["layer_uuid"] + del pathway["carbon_uuids"] + pathways.append(NcsPathway(**pathway)) + activity_dict["pathways"] = pathways + return Activity(**activity_dict) + def __post_init__(self): """Pre-checks on initialization.""" super().__post_init__() @@ -433,7 +503,11 @@ def pw_layers(self) -> typing.List[QgsRasterLayer]: if the path is not defined. :rtype: list """ - return [QgsRasterLayer(layer.get("path")) for layer in self.priority_layers] + return [ + QgsRasterLayer(layer.get("path")) + for layer in self.priority_layers + if layer.get("path") + ] def is_pwls_valid(self) -> bool: """Checks if the priority layers are valid. @@ -574,6 +648,7 @@ class Scenario(BaseModelComponent): weighted_activities: typing.List[Activity] priority_layer_groups: typing.List state: ScenarioState = ScenarioState.IDLE + server_uuid: UUID = None @dataclasses.dataclass diff --git a/cplus_core/models/financial.py b/cplus_core/models/financial.py deleted file mode 100644 index ad1ddf1..0000000 --- a/cplus_core/models/financial.py +++ /dev/null @@ -1,167 +0,0 @@ -# -*- coding: utf-8 -*- - -""" Data models for the financial elements of the tool.""" - -import dataclasses -from enum import IntEnum -import typing - -from .base import Activity - - -@dataclasses.dataclass -class NpvParameters: - """Parameters for computing an activity's NPV.""" - - years: int - discount: float - absolute_npv: float = 0.0 - normalized_npv: float = 0.0 - # Each tuple contains 3 elements i.e. revenue, costs and discount rates - yearly_rates: typing.List[tuple] = dataclasses.field(default_factory=list) - - -@dataclasses.dataclass -class ActivityNpv: - """Mapping of the NPV parameters to the corresponding Activity model.""" - - params: NpvParameters - enabled: bool - activity: typing.Optional[Activity] - - @property - def activity_id(self) -> str: - """Gets the identifier of the activity model. - - :returns: The unique identifier of the activity model else an - empty string if no activity has been set. - """ - if not self.activity: - return "" - - return str(self.activity.uuid) - - @property - def base_name(self) -> str: - """Returns a proposed name for the activity NPV. - - An empty string will be return id the `activity` attribute - is not set. - - :returns: Proposed base name for the activity NPV. - :rtype: str - """ - if self.activity is None: - return "" - - return f"{self.activity.name} NPV Norm" - - -@dataclasses.dataclass -class ActivityNpvCollection: - """Collection for all ActivityNpvMapping configurations that have been - specified by the user. - """ - - minimum_value: float - maximum_value: float - use_computed: bool = True - remove_existing: bool = False - mappings: typing.List[ActivityNpv] = dataclasses.field(default_factory=list) - - def activity_npv(self, activity_identifier: str) -> typing.Optional[ActivityNpv]: - """Gets the mapping of an activity's NPV mapping if defined. - - :param activity_identifier: Unique identifier of an activity whose - NPV mapping is to be retrieved. - :type activity_identifier: str - - :returns: The activity's NPV mapping else None if not found. - :rtype: ActivityNpv - """ - matching_mapping = [ - activity_npv - for activity_npv in self.mappings - if activity_npv.activity_id == activity_identifier - ] - - return None if len(matching_mapping) == 0 else matching_mapping[0] - - def update_computed_normalization_range(self) -> bool: - """Update the minimum and maximum normalization values - based on the absolute values of the existing ActivityNpv - objects. - - Values for disabled activity NPVs will be excluded from - the computation. - - :returns: True if the min/max values were updated else False if - there are no mappings or valid absolute NPV values defined. - """ - if len(self.mappings) == 0: - return False - - valid_npv_values = [ - activity_npv.params.absolute_npv - for activity_npv in self.mappings - if activity_npv.params.absolute_npv is not None and activity_npv.enabled - ] - - if len(valid_npv_values) == 0: - return False - - self.minimum_value = min(valid_npv_values) - self.maximum_value = max(valid_npv_values) - - return True - - def normalize_npvs(self) -> bool: - """Normalize the NPV values of the activities using the specified - normalization range. - - If the absolute NPV values are less than or greater than the - normalization range, then they will be truncated to 0.0 and 1.0 - respectively. To avoid such a situation from occurring, it is recommended - to make sure that the ranges are synchronized using the latest absolute - NPV values hence call `update_computed_normalization_range` before - normalizing the NPVs. - - :returns: True if the NPVs were successfully normalized else False due - to various reasons such as if the minimum value is greater than the - maximum value or if the min/max values are the same. - """ - if self.minimum_value > self.maximum_value: - return False - - norm_range = float(self.maximum_value - self.minimum_value) - - if norm_range == 0.0: - return False - - for activity_npv in self.mappings: - absolute_npv = activity_npv.params.absolute_npv - if not absolute_npv: - continue - - if absolute_npv <= self.minimum_value: - normalized_npv = 0.0 - elif absolute_npv >= self.maximum_value: - normalized_npv = 1.0 - else: - normalized_npv = (absolute_npv - self.minimum_value) / norm_range - - activity_npv.params.normalized_npv = normalized_npv - - return True - - -@dataclasses.dataclass -class ActivityNpvPwl: - """Convenience class that contains parameters for creating - a PWL raster layer. - """ - - npv: ActivityNpv - extent: typing.List[float] - crs: str - pixel_size: float diff --git a/cplus_core/models/helpers.py b/cplus_core/models/helpers.py index dd3e761..455509e 100644 --- a/cplus_core/models/helpers.py +++ b/cplus_core/models/helpers.py @@ -7,259 +7,11 @@ import uuid from .base import ( - BaseModelComponent, - BaseModelComponentType, Activity, LayerModelComponent, LayerModelComponentType, - LayerType, NcsPathway, - SpatialExtent, ) -from ..definitions.constants import ( - ACTIVITY_IDENTIFIER_PROPERTY, - ABSOLUTE_NPV_ATTRIBUTE, - CARBON_PATHS_ATTRIBUTE, - COMPUTED_ATTRIBUTE, - DISCOUNT_ATTRIBUTE, - ENABLED_ATTRIBUTE, - STYLE_ATTRIBUTE, - NAME_ATTRIBUTE, - DESCRIPTION_ATTRIBUTE, - LAYER_TYPE_ATTRIBUTE, - NPV_MAPPINGS_ATTRIBUTE, - MAX_VALUE_ATTRIBUTE, - MIN_VALUE_ATTRIBUTE, - NORMALIZED_NPV_ATTRIBUTE, - PATH_ATTRIBUTE, - PIXEL_VALUE_ATTRIBUTE, - PRIORITY_LAYERS_SEGMENT, - REMOVE_EXISTING_ATTRIBUTE, - USER_DEFINED_ATTRIBUTE, - UUID_ATTRIBUTE, - YEARS_ATTRIBUTE, - YEARLY_RATES_ATTRIBUTE, -) -from ..definitions.defaults import DEFAULT_CRS_ID -from .financial import ActivityNpv, ActivityNpvCollection, NpvParameters - -from ..utils.helper import log - -from qgis.core import ( - QgsCoordinateReferenceSystem, - QgsCoordinateTransform, - QgsProject, - QgsRectangle, -) - - -def model_component_to_dict( - model_component: BaseModelComponentType, uuid_to_str=True -) -> dict: - """Creates a dictionary containing the base attribute - name-value pairs from a model component object. - - :param model_component: Source model component object whose - values are to be mapped to the corresponding - attribute names. - :type model_component: BaseModelComponent - - :param uuid_to_str: Set True to convert the UUID to a - string equivalent, else False. Some serialization engines - such as JSON are unable to handle UUID objects hence the need - to convert to string. - :type uuid_to_str: bool - - :returns: Returns a dictionary item containing attribute - name-value pairs. - :rtype: dict - """ - model_uuid = model_component.uuid - if uuid_to_str: - model_uuid = str(model_uuid) - - return { - UUID_ATTRIBUTE: model_uuid, - NAME_ATTRIBUTE: model_component.name, - DESCRIPTION_ATTRIBUTE: model_component.description, - } - - -def create_model_component( - source_dict: dict, - model_cls: typing.Callable[[uuid.UUID, str, str], BaseModelComponentType], -) -> typing.Union[BaseModelComponentType, None]: - """Factory method for creating and setting attribute values - for a base model component object. - - :param source_dict: Dictionary containing attribute values. - :type source_dict: dict - - :param model_cls: Callable class that will be created based on the - input argument values from the dictionary. - :type model_cls: BaseModelComponent - - :returns: Base model component object with property values - derived from the dictionary. - :rtype: BaseModelComponent - """ - if not issubclass(model_cls, BaseModelComponent): - return None - - return model_cls( - uuid.UUID(source_dict[UUID_ATTRIBUTE]), - source_dict[NAME_ATTRIBUTE], - source_dict[DESCRIPTION_ATTRIBUTE], - ) - - -def create_layer_component( - source_dict, - model_cls: typing.Callable[ - [uuid.UUID, str, str, str, LayerType, bool], LayerModelComponentType - ], -) -> typing.Union[LayerModelComponent, None]: - """Factory method for creating a layer model component using - attribute values defined in a dictionary. - - :param source_dict: Dictionary containing property values. - :type source_dict: dict - - :param model_cls: Callable class that will be created based on the - input argument values from the dictionary. - :type model_cls: LayerModelComponent - - :returns: Layer model component object with property values set - from the dictionary. - :rtype: LayerModelComponent - """ - if UUID_ATTRIBUTE not in source_dict: - return None - - source_uuid = source_dict[UUID_ATTRIBUTE] - if isinstance(source_uuid, str): - source_uuid = uuid.UUID(source_uuid) - - kwargs = {} - if PATH_ATTRIBUTE in source_dict: - kwargs[PATH_ATTRIBUTE] = source_dict[PATH_ATTRIBUTE] - - if LAYER_TYPE_ATTRIBUTE in source_dict: - kwargs[LAYER_TYPE_ATTRIBUTE] = LayerType(int(source_dict[LAYER_TYPE_ATTRIBUTE])) - - if USER_DEFINED_ATTRIBUTE in source_dict: - kwargs[USER_DEFINED_ATTRIBUTE] = bool(source_dict[USER_DEFINED_ATTRIBUTE]) - - return model_cls( - source_uuid, - source_dict[NAME_ATTRIBUTE], - source_dict[DESCRIPTION_ATTRIBUTE], - **kwargs, - ) - - -def create_ncs_pathway(source_dict) -> typing.Union[NcsPathway, None]: - """Factory method for creating an NcsPathway object using - attribute values defined in a dictionary. - - :param source_dict: Dictionary containing property values. - :type source_dict: dict - - :returns: NCS pathway object with property values set - from the dictionary. - :rtype: NcsPathway - """ - ncs = create_layer_component(source_dict, NcsPathway) - - # We are checking because of the various iterations of the attributes - # in the NcsPathway class where some of these attributes might - # be missing. - if CARBON_PATHS_ATTRIBUTE in source_dict: - ncs.carbon_paths = source_dict[CARBON_PATHS_ATTRIBUTE] - - return ncs - - -def create_activity(source_dict) -> typing.Union[Activity, None]: - """Factory method for creating an activity using - attribute values defined in a dictionary. - - :param source_dict: Dictionary containing property values. - :type source_dict: dict - - :returns: activity with property values set - from the dictionary. - :rtype: Activity - """ - activity = create_layer_component(source_dict, Activity) - if PRIORITY_LAYERS_SEGMENT in source_dict.keys(): - activity.priority_layers = source_dict[PRIORITY_LAYERS_SEGMENT] - - # Set style - if STYLE_ATTRIBUTE in source_dict.keys(): - activity.layer_styles = source_dict[STYLE_ATTRIBUTE] - - # Set styling pixel value - if PIXEL_VALUE_ATTRIBUTE in source_dict.keys(): - activity.style_pixel_value = source_dict[PIXEL_VALUE_ATTRIBUTE] - - return activity - - -def layer_component_to_dict( - layer_component: LayerModelComponentType, uuid_to_str=True -) -> dict: - """Creates a dictionary containing attribute - name-value pairs from a layer model component object. - - :param layer_component: Source layer model component object whose - values are to be mapped to the corresponding - attribute names. - :type layer_component: LayerModelComponent - - :param uuid_to_str: Set True to convert the UUID to a - string equivalent, else False. Some serialization engines - such as JSON are unable to handle UUID objects hence the need - to convert to string. - :type uuid_to_str: bool - - :returns: Returns a dictionary item containing attribute - name-value pairs. - :rtype: dict - """ - base_attrs = model_component_to_dict(layer_component, uuid_to_str) - base_attrs[PATH_ATTRIBUTE] = layer_component.path - base_attrs[LAYER_TYPE_ATTRIBUTE] = int(layer_component.layer_type) - base_attrs[USER_DEFINED_ATTRIBUTE] = layer_component.user_defined - - return base_attrs - - -def ncs_pathway_to_dict(ncs_pathway: NcsPathway, uuid_to_str=True) -> dict: - """Creates a dictionary containing attribute - name-value pairs from an NCS pathway object. - - This function has been retained for legacy support. - - :param ncs_pathway: Source NCS pathway object whose - values are to be mapped to the corresponding - attribute names. - :type ncs_pathway: NcsPathway - - :param uuid_to_str: Set True to convert the UUID to a - string equivalent, else False. Some serialization engines - such as JSON are unable to handle UUID objects hence the need - to convert to string. - :type uuid_to_str: bool - - :returns: Returns a dictionary item containing attribute - name-value pairs. - :rtype: dict - """ - base_ncs_dict = layer_component_to_dict(ncs_pathway, uuid_to_str) - base_ncs_dict[CARBON_PATHS_ATTRIBUTE] = ncs_pathway.carbon_paths - - return base_ncs_dict def clone_layer_component( @@ -331,264 +83,3 @@ def clone_activity( activity.pathways = cloned_pathways return activity - - -def copy_layer_component_attributes( - target: LayerModelComponent, source: LayerModelComponent -) -> LayerModelComponent: - """Copies the attribute values of source to target. The uuid - attribute value is not copied as well as the layer attribute. - However, for the latter, the path is copied. - - :param target: Target object whose attribute values will be updated. - :type target: LayerModelComponent - - :param source: Source object whose attribute values will be copied to - the target. - :type source: LayerModelComponent - - :returns: Target object containing the updated attribute values apart - for the uuid whose value will not change. - :rtype: LayerModelComponent - """ - if not isinstance(target, LayerModelComponent) or not isinstance( - source, LayerModelComponent - ): - raise TypeError( - "Source or target objects are not of type 'LayerModelComponent'" - ) - - for f in fields(source): - # Exclude uuid - if f.name == UUID_ATTRIBUTE: - continue - attr_val = getattr(source, f.name) - setattr(target, f.name, attr_val) - - # Force layer to be set/updated - target.update_layer_type() - - return target - - -def extent_to_qgs_rectangle( - spatial_extent: SpatialExtent, -) -> typing.Union[QgsRectangle, None]: - """Returns a QgsRectangle object from the SpatialExtent object. - - If the SpatialExtent is invalid (i.e. less than four items) then it - will return None. - - :param spatial_extent: Spatial extent data model that defines the - scenario bounds. - :type spatial_extent: SpatialExtent - - :returns: QGIS rectangle defining the bounds for the scenario. - :rtype: QgsRectangle - """ - if len(spatial_extent.bbox) < 4: - return None - - return QgsRectangle( - spatial_extent.bbox[0], - spatial_extent.bbox[2], - spatial_extent.bbox[1], - spatial_extent.bbox[3], - ) - - -def extent_to_project_crs_extent( - spatial_extent: SpatialExtent, project: QgsProject = None -) -> typing.Union[QgsRectangle, None]: - """Transforms SpatialExtent model to an QGIS extent based - on the CRS of the given project. - - :param spatial_extent: Spatial extent data model that defines the - scenario bounds. - :type spatial_extent: SpatialExtent - - :param project: Project whose CRS will be used to determine - the values of the output extent. - :type project: QgsProject - - :returns: Output extent in the project's CRS. If the input extent - is invalid, this function will return None. - :rtype: QgsRectangle - """ - input_rect = extent_to_qgs_rectangle(spatial_extent) - if input_rect is None: - return None - - default_crs = QgsCoordinateReferenceSystem.fromEpsgId(DEFAULT_CRS_ID) - if not default_crs.isValid(): - return None - - if project is None: - project = QgsProject.instance() - - target_crs = project.crs() - if default_crs == target_crs: - # No need for transformation - return input_rect - - try: - coordinate_xform = QgsCoordinateTransform(default_crs, project.crs(), project) - return coordinate_xform.transformBoundingBox(input_rect) - except Exception as e: - log(f"{e}, using the default input extent.") - - return input_rect - - -def activity_npv_to_dict(activity_npv: ActivityNpv) -> dict: - """Converts an ActivityNpv object to a dictionary representation. - - :returns: A dictionary containing attribute name-value pairs. - :rtype: dict - """ - return { - YEARS_ATTRIBUTE: activity_npv.params.years, - DISCOUNT_ATTRIBUTE: activity_npv.params.discount, - ABSOLUTE_NPV_ATTRIBUTE: activity_npv.params.absolute_npv, - NORMALIZED_NPV_ATTRIBUTE: activity_npv.params.normalized_npv, - YEARLY_RATES_ATTRIBUTE: activity_npv.params.yearly_rates, - ENABLED_ATTRIBUTE: activity_npv.enabled, - ACTIVITY_IDENTIFIER_PROPERTY: activity_npv.activity_id, - } - - -def create_activity_npv(activity_npv_dict: dict) -> typing.Optional[ActivityNpv]: - """Creates an ActivityNpv object from the equivalent dictionary - representation. - - Please note that the `activity` attribute of the `ActivityNpv` object will be - `None` hence, will have to be set manually by extracting the corresponding `Activity` - from the activity UUID. - - :param activity_npv_dict: Dictionary containing information for deserializing - to the ActivityNpv object. - :type activity_npv_dict: dict - - :returns: ActivityNpv deserialized from the dictionary representation. - :rtype: ActivityNpv - """ - args = [] - if YEARS_ATTRIBUTE in activity_npv_dict: - args.append(activity_npv_dict[YEARS_ATTRIBUTE]) - - if DISCOUNT_ATTRIBUTE in activity_npv_dict: - args.append(activity_npv_dict[DISCOUNT_ATTRIBUTE]) - - if ABSOLUTE_NPV_ATTRIBUTE in activity_npv_dict: - args.append(activity_npv_dict[ABSOLUTE_NPV_ATTRIBUTE]) - - if NORMALIZED_NPV_ATTRIBUTE in activity_npv_dict: - args.append(activity_npv_dict[NORMALIZED_NPV_ATTRIBUTE]) - - if len(args) < 4: - return None - - yearly_rates = [] - if YEARLY_RATES_ATTRIBUTE in activity_npv_dict: - yearly_rates = activity_npv_dict[YEARLY_RATES_ATTRIBUTE] - - npv_params = NpvParameters(*args) - npv_params.yearly_rates = yearly_rates - - npv_enabled = False - if ENABLED_ATTRIBUTE in activity_npv_dict: - npv_enabled = activity_npv_dict[ENABLED_ATTRIBUTE] - - return ActivityNpv(npv_params, npv_enabled, None) - - -def activity_npv_collection_to_dict(activity_collection: ActivityNpvCollection) -> dict: - """Converts the activity NPV collection object to the - dictionary representation. - - :returns: A dictionary containing the attribute name-value pairs - of an activity NPV collection object - :rtype: dict - """ - npv_collection_dict = { - MIN_VALUE_ATTRIBUTE: activity_collection.minimum_value, - MAX_VALUE_ATTRIBUTE: activity_collection.maximum_value, - COMPUTED_ATTRIBUTE: activity_collection.use_computed, - REMOVE_EXISTING_ATTRIBUTE: activity_collection.remove_existing, - } - - mapping_dict = list(map(activity_npv_to_dict, activity_collection.mappings)) - npv_collection_dict[NPV_MAPPINGS_ATTRIBUTE] = mapping_dict - - return npv_collection_dict - - -def create_activity_npv_collection( - activity_collection_dict: dict, reference_activities: typing.List[Activity] = None -) -> typing.Optional[ActivityNpvCollection]: - """Creates an activity NPV collection object from the corresponding - dictionary representation. - - :param activity_collection_dict: Dictionary representation containing - information of an activity NPV collection object. - :type activity_collection_dict: dict - - :param reference_activities: Optional list of activities that will be - used to lookup when deserializing the ActivityNpv objects. - :type reference_activities: list - - :returns: Activity NPV collection object from the dictionary representation - or None if the source dictionary is invalid. - :rtype: ActivityNpvCollection - """ - if len(activity_collection_dict) == 0: - return None - - ref_activities_by_uuid = { - str(activity.uuid): activity for activity in reference_activities - } - - args = [] - - # Minimum value - if MIN_VALUE_ATTRIBUTE in activity_collection_dict: - args.append(activity_collection_dict[MIN_VALUE_ATTRIBUTE]) - - # Maximum value - if MAX_VALUE_ATTRIBUTE in activity_collection_dict: - args.append(activity_collection_dict[MAX_VALUE_ATTRIBUTE]) - - if len(args) < 2: - return None - - activity_npv_collection = ActivityNpvCollection(*args) - - # Use computed - if COMPUTED_ATTRIBUTE in activity_collection_dict: - use_computed = activity_collection_dict[COMPUTED_ATTRIBUTE] - activity_npv_collection.use_computed = use_computed - - # Remove existing - if REMOVE_EXISTING_ATTRIBUTE in activity_collection_dict: - remove_existing = activity_collection_dict[REMOVE_EXISTING_ATTRIBUTE] - activity_npv_collection.remove_existing = remove_existing - - if NPV_MAPPINGS_ATTRIBUTE in activity_collection_dict: - mappings_dict = activity_collection_dict[NPV_MAPPINGS_ATTRIBUTE] - npv_mappings = [] - for md in mappings_dict: - activity_npv = create_activity_npv(md) - if activity_npv is None: - continue - - # Get the corresponding activity from the unique identifier - if ACTIVITY_IDENTIFIER_PROPERTY in md: - activity_id = md[ACTIVITY_IDENTIFIER_PROPERTY] - if activity_id in ref_activities_by_uuid: - ref_activity = ref_activities_by_uuid[activity_id] - activity_npv.activity = ref_activity - npv_mappings.append(activity_npv) - - activity_npv_collection.mappings = npv_mappings - - return activity_npv_collection diff --git a/cplus_core/utils/conf.py b/cplus_core/utils/conf.py index 1e3d7ea..92becdb 100644 --- a/cplus_core/utils/conf.py +++ b/cplus_core/utils/conf.py @@ -1,154 +1,14 @@ # -*- coding: utf-8 -*- """ - Handles storage and retrieval of the plugin QgsSettings. + Handles retrieval of the task config. """ -import contextlib -import dataclasses -import datetime import enum -import json -import os.path -from pathlib import Path -import typing -import uuid - -from qgis.PyQt import QtCore -from qgis.core import QgsSettings - -from ..definitions.defaults import PRIORITY_LAYERS - -from ..definitions.constants import ( - STYLE_ATTRIBUTE, - NCS_CARBON_SEGMENT, - NCS_PATHWAY_SEGMENT, - NPV_COLLECTION_PROPERTY, - PATH_ATTRIBUTE, - PATHWAYS_ATTRIBUTE, - PIXEL_VALUE_ATTRIBUTE, - PRIORITY_LAYERS_SEGMENT, - UUID_ATTRIBUTE, -) - -from ..models.base import ( - Activity, - NcsPathway, - PriorityLayerType, - Scenario, - ScenarioResult, - SpatialExtent, -) -from ..models.financial import ActivityNpvCollection -from ..models.helpers import ( - activity_npv_collection_to_dict, - create_activity, - create_activity_npv_collection, - create_ncs_pathway, - layer_component_to_dict, - ncs_pathway_to_dict, -) - -from ..utils.helper import log, todict, CustomJsonEncoder - - -@contextlib.contextmanager -def qgis_settings(group_root: str, settings=None): - """Context manager to help defining groups when creating QgsSettings. - - :param group_root: Name of the root group for the settings - :type group_root: str - - :param settings: QGIS settings to use - :type settings: QgsSettings - - :yields: Instance of the created settings - :ytype: QgsSettings - """ - if settings is None: - settings = QgsSettings() - settings.beginGroup(group_root) - try: - yield settings - finally: - settings.endGroup() - - -@dataclasses.dataclass -class ScenarioSettings(Scenario): - """Plugin Scenario settings.""" - - @classmethod - def from_qgs_settings(cls, identifier: str, settings: QgsSettings): - """Reads QGIS settings and parses them into a scenario - settings instance with the respective settings values as properties. - - :param identifier: Scenario identifier - :type identifier: str - - :param settings: Scenario identifier - :type settings: QgsSettings - - :returns: Scenario settings object - :rtype: ScenarioSettings - """ - - return cls( - uuid=uuid.UUID(identifier), - name=settings.value("name", None), - description=settings.value("description", None), - extent=[], - activities=[], - weighted_activities=[], - priority_layer_groups=[], - ) - - @classmethod - def get_scenario_extent(cls, identifier): - """Fetches Scenario extent from - the passed scenario settings. - - - :returns: Spatial extent instance extent - :rtype: SpatialExtent - """ - spatial_key = ( - f"{settings_manager._get_scenario_settings_base(identifier)}/extent/spatial" - ) - - with qgis_settings(spatial_key) as settings: - bbox = settings.value("bbox", None) - spatial_extent = SpatialExtent(bbox=bbox) - - return spatial_extent class Settings(enum.Enum): """Plugin settings names""" - DOWNLOAD_FOLDER = "download_folder" - REFRESH_FREQUENCY = "refresh/period" - REFRESH_FREQUENCY_UNIT = "refresh/unit" - REFRESH_LAST_UPDATE = "refresh/last_update" - REFRESH_STATE = "refresh/state" - - # Report settings - REPORT_ORGANIZATION = "report/organization" - REPORT_CONTACT_EMAIL = "report/email" - REPORT_WEBSITE = "report/website" - REPORT_CUSTOM_LOGO = "report/custom_logo" - REPORT_CPLUS_LOGO = "report/cplus_logo" - REPORT_CI_LOGO = "report/ci_logo" - REPORT_LOGO_DIR = "report/logo_dir" - REPORT_FOOTER = "report/footer" - REPORT_DISCLAIMER = "report/disclaimer" - REPORT_LICENSE = "report/license" - REPORT_STAKEHOLDERS = "report/stakeholders" - REPORT_CULTURE_POLICIES = "report/culture_policies" - - # Last selected data directory - LAST_DATA_DIR = "last_data_dir" - LAST_MASK_DIR = "last_mask_dir" - # Advanced settings BASE_DIR = "advanced/base_dir" @@ -185,1135 +45,3 @@ class Settings(enum.Enum): LANDUSE_NORMALIZED = "landuse_normalized" LANDUSE_WEIGHTED = "landuse_weighted" HIGHEST_POSITION = "highest_position" - - # Processing option - PROCESSING_TYPE = "processing_type" - - # DEBUG - DEBUG = "debug" - DEV_MODE = "dev_mode" - BASE_API_URL = "base_api_url" - - -class SettingsManager(QtCore.QObject): - """Manages saving/loading settings for the plugin in QgsSettings.""" - - BASE_GROUP_NAME: str = "cplus_plugin" - SCENARIO_GROUP_NAME: str = "scenarios" - SCENARIO_RESULTS_GROUP_NAME: str = "scenarios_results" - PRIORITY_GROUP_NAME: str = "priority_groups" - PRIORITY_LAYERS_GROUP_NAME: str = "priority_layers" - NCS_PATHWAY_BASE: str = "ncs_pathways" - LAYER_MAPPING_BASE: str = "layer_mapping" - - ACTIVITY_BASE: str = "activities" - - settings = QgsSettings() - - scenarios_settings_updated = QtCore.pyqtSignal() - priority_layers_changed = QtCore.pyqtSignal() - settings_updated = QtCore.pyqtSignal([str, object], [Settings, object]) - - def set_value(self, name: str, value): - """Adds a new setting key and value on the plugin specific settings. - - :param name: Name of setting key - :type name: str - - :param value: Value of the setting - :type value: Any - """ - self.settings.setValue(f"{self.BASE_GROUP_NAME}/{name}", value) - if isinstance(name, Settings): - name = name.value - - self.settings_updated.emit(name, value) - - def get_value(self, name: str, default=None, setting_type=None): - """Gets value of the setting with the passed name. - - :param name: Name of setting key - :type name: str - - :param default: Default value returned when the setting key does not exist - :type default: Any - - :param setting_type: Type of the store setting - :type setting_type: Any - - :returns: Value of the setting - :rtype: Any - """ - if setting_type: - return self.settings.value( - f"{self.BASE_GROUP_NAME}/{name}", default, setting_type - ) - return self.settings.value(f"{self.BASE_GROUP_NAME}/{name}", default) - - def find_settings(self, name): - """Returns the plugin setting keys from the - plugin root group that matches the passed name - - :param name: Setting name to search for - :type name: str - - :returns result: List of the matching settings names - :rtype result: list - """ - - result = [] - with qgis_settings(f"{self.BASE_GROUP_NAME}") as settings: - for settings_name in settings.childKeys(): - if name in settings_name: - result.append(settings_name) - return result - - def remove(self, name): - """Remove the setting with the specified name. - - :param name: Name of the setting key - :type name: str - """ - self.settings.remove(f"{self.BASE_GROUP_NAME}/{name}") - - def delete_settings(self): - """Deletes the all the plugin settings.""" - self.settings.remove(f"{self.BASE_GROUP_NAME}") - - def _get_scenario_settings_base(self, identifier): - """Gets the scenario settings base url. - - :param identifier: Scenario settings identifier - :type identifier: uuid.UUID - - :returns: Scenario settings base group - :rtype: str - """ - return ( - f"{self.BASE_GROUP_NAME}/" - f"{self.SCENARIO_GROUP_NAME}/" - f"{str(identifier)}" - ) - - def _get_scenario_results_settings_base(self, identifier): - """Gets the scenario results settings base url. - - :param identifier: Scenario identifier - :type identifier: uuid.UUID - - :returns: Scenario settings base group - :rtype: str - """ - return ( - f"{self.BASE_GROUP_NAME}/" - f"{self.SCENARIO_RESULTS_GROUP_NAME}" - f"{str(identifier)}" - ) - - def save_scenario(self, scenario_settings): - """Save the passed scenario settings into the plugin settings - - :param scenario_settings: Scenario settings - :type scenario_settings: ScenarioSettings - """ - settings_key = self._get_scenario_settings_base(scenario_settings.uuid) - - self.save_scenario_extent(settings_key, scenario_settings.extent) - - with qgis_settings(settings_key) as settings: - settings.setValue("name", scenario_settings.name) - settings.setValue("description", scenario_settings.description) - settings.setValue("uuid", scenario_settings.uuid) - - def save_scenario_extent(self, key, extent): - """Saves the scenario extent into plugin settings - using the provided settings group key. - - :param key: Scenario extent - :type key: SpatialExtent - - :param extent: QgsSettings group key - :type extent: str - - Args: - extent (SpatialExtent): Scenario extent - key (str): QgsSettings group key - """ - spatial_extent = extent.bbox - - spatial_key = f"{key}/extent/spatial/" - with qgis_settings(spatial_key) as settings: - settings.setValue("bbox", spatial_extent) - - # def get_scenario(self, identifier): - # """Retrieves the scenario that matches the passed identifier. - # - # :param identifier: Scenario identifier - # :type identifier: str - # - # :returns: Scenario settings instance - # :rtype: ScenarioSettings - # """ - # - # settings_key = self._get_scenario_settings_base(identifier) - # with qgis_settings(settings_key) as settings: - # scenario_settings = ScenarioSettings.from_qgs_settings( - # str(identifier), settings - # ) - # return scenario_settings - - def get_scenario(self, scenario_id): - """Retrieves the first scenario that matched the passed scenario id. - - :param scenario_id: Scenario id - :type scenario_id: str - - :returns: Scenario settings instance - :rtype: ScenarioSettings - """ - - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" - ) as settings: - for scenario_uuid in settings.childGroups(): - scenario_settings_key = self._get_scenario_settings_base(scenario_uuid) - with qgis_settings(scenario_settings_key) as scenario_settings: - if scenario_uuid == scenario_id: - scenario = ScenarioSettings.from_qgs_settings( - scenario_uuid, scenario_settings - ) - - scenario.extent = scenario.get_scenario_extent(scenario_uuid) - return scenario - return None - - def get_scenarios(self): - """Gets all the available scenarios settings in the plugin. - - :returns: List of the scenario settings instances - :rtype: list - """ - result = [] - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" - ) as settings: - for scenario_uuid in settings.childGroups(): - scenario_settings_key = self._get_scenario_settings_base(scenario_uuid) - with qgis_settings(scenario_settings_key) as scenario_settings: - scenario = ScenarioSettings.from_qgs_settings( - scenario_uuid, scenario_settings - ) - scenario.extent = scenario.get_scenario_extent(scenario_uuid) - result.append(scenario) - return result - - def delete_scenario(self, scenario_id): - """Delete the scenario with the passed scenarion id. - - :param scenario_id: Scenario identifier - :type scenario_id: str - """ - - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" - ) as settings: - for scenario_identifier in settings.childGroups(): - if str(scenario_identifier) == str(scenario_id): - settings.remove(scenario_identifier) - - def delete_all_scenarios(self): - """Deletes all the plugin scenarios settings.""" - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" - ) as settings: - for scenario_name in settings.childGroups(): - settings.remove(scenario_name) - - def save_scenario_result(self, scenario_result, scenario_id): - """Save the scenario results plugin settings - - :param scenario_settings: Scenario settings - :type scenario_settings: ScenarioSettings - """ - settings_key = self._get_scenario_results_settings_base(scenario_id) - - analysis_output = json.dumps(scenario_result.analysis_output) - - with qgis_settings(settings_key) as settings: - settings.setValue("scenario_id", scenario_id) - settings.setValue( - "created_date", - scenario_result.created_date.strftime("%Y_%m_%d_%H_%M_%S"), - ) - settings.setValue("analysis_output", analysis_output) - settings.setValue("output_layer_name", scenario_result.output_layer_name) - settings.setValue("scenario_directory", scenario_result.scenario_directory) - - def get_scenario_result(self, scenario_id): - """Retrieves the scenario result that matched the passed scenario id. - - :param scenario_id: Scenario id - :type scenario_id: str - - :returns: Scenario result - :rtype: ScenarioSettings - """ - with qgis_settings( - f"{self.BASE_GROUP_NAME}/{self.SCENARIO_RESULTS_GROUP_NAME}" - ) as settings: - for result_uuid in settings.childGroups(): - if scenario_id != result_uuid: - continue - scenario_settings_key = self._get_scenario_results_settings_base( - result_uuid - ) - with qgis_settings(scenario_settings_key) as scenario_settings: - created_date = scenario_settings.value("created_date") - analysis_output = scenario_settings.value("analysis_output") - output_layer_name = scenario_settings.value("output_layer_name") - scenario_directory = scenario_settings.value("scenario_directory") - - try: - created_date = datetime.datetime.strptime( - created_date, "%Y_%m_%d_%H_%M_%S" - ) - analysis_output = json.loads(analysis_output) - except Exception as e: - log(f"Problem fetching scenario result, {e}") - return None - - return ScenarioResult( - scenario=None, - created_date=created_date, - analysis_output=analysis_output, - output_layer_name=output_layer_name, - scenario_directory=scenario_directory, - ) - return None - - def get_scenarios_results(self): - """Gets all the saved scenarios results. - - :returns: List of the scenario results - :rtype: list - """ - result = [] - with qgis_settings( - f"{self.BASE_GROUP_NAME}/{self.SCENARIO_RESULTS_GROUP_NAME}" - ) as settings: - for uuid in settings.childGroups(): - scenario_settings_key = self._get_scenario_results_settings_base(uuid) - with qgis_settings(scenario_settings_key) as scenario_settings: - created_date = scenario_settings.value("created_date") - analysis_output = scenario_settings.value("analysis_output") - output_layer_name = scenario_settings.value("output_layer_name") - scenario_directory = scenario_settings.value("scenario_directory") - - try: - created_date = datetime.datetime.strptime( - created_date, "%Y_%m_%d_%H_%M_%S" - ) - analysis_output = json.loads(analysis_output) - except Exception as e: - log(f"Problem fetching scenario result, {e}") - return None - - result.append( - ScenarioResult( - scenario=None, - created_date=created_date, - analysis_output=analysis_output, - output_layer_name=output_layer_name, - scenario_directory=scenario_directory, - ) - ) - return result - - def delete_scenario_result(self, scenario_id): - """Delete the scenario result that contains the scenario id. - - :param scenario_id: Scenario identifier - :type scenario_id: str - """ - - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_RESULTS_GROUP_NAME}" - ) as settings: - for scenario_identifier in settings.childGroups(): - if str(scenario_identifier) == str(scenario_id): - settings.remove(scenario_identifier) - - def delete_all_scenarios_results(self): - """Deletes all the plugin scenarios results settings.""" - with qgis_settings( - f"{self.BASE_GROUP_NAME}/{self.SCENARIO_GROUP_NAME}/" - f"{self.SCENARIO_RESULTS_GROUP_NAME}" - ) as settings: - for scenario_result in settings.childGroups(): - settings.remove(scenario_result) - - def _get_priority_layers_settings_base(self, identifier) -> str: - """Gets the priority layers settings base url. - - :param identifier: Priority layers settings identifier - :type identifier: uuid.UUID - - :returns: Priority layers settings base group - :rtype: str - """ - return ( - f"{self.BASE_GROUP_NAME}/" - f"{self.PRIORITY_LAYERS_GROUP_NAME}/" - f"{str(identifier)}" - ) - - def get_priority_layer(self, identifier) -> typing.Dict: - """Retrieves the priority layer that matches the passed identifier. - - :param identifier: Priority layers identifier - :type identifier: uuid.UUID - - :returns: Priority layer dict - :rtype: dict - """ - priority_layer = None - - settings_key = self._get_priority_layers_settings_base(identifier) - with qgis_settings(settings_key) as settings: - groups_key = f"{settings_key}/groups" - groups = [] - - if len(settings.childKeys()) <= 0: - return priority_layer - - with qgis_settings(groups_key) as groups_settings: - for name in groups_settings.childGroups(): - group_settings_key = f"{groups_key}/{name}" - with qgis_settings(group_settings_key) as group_settings: - stored_group = {} - stored_group["uuid"] = group_settings.value("uuid") - stored_group["name"] = group_settings.value("name") - stored_group["value"] = group_settings.value("value") - groups.append(stored_group) - - priority_layer = {"uuid": str(identifier)} - priority_layer["name"] = settings.value("name") - priority_layer["description"] = settings.value("description") - priority_layer["path"] = settings.value("path") - priority_layer["selected"] = settings.value("selected", type=bool) - priority_layer["user_defined"] = settings.value( - "user_defined", defaultValue=True, type=bool - ) - priority_layer["type"] = settings.value("type", defaultValue=0, type=int) - priority_layer["groups"] = groups - return priority_layer - - def get_priority_layers(self) -> typing.List: - """Gets all the available priority layers in the plugin. - - :returns: Priority layers list - :rtype: list - """ - priority_layer_list = [] - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_LAYERS_GROUP_NAME}" - ) as settings: - for uuid in settings.childGroups(): - priority_layer_settings = self._get_priority_layers_settings_base(uuid) - with qgis_settings(priority_layer_settings) as priority_settings: - groups_key = f"{priority_layer_settings}/groups" - groups = [] - - with qgis_settings(groups_key) as groups_settings: - for name in groups_settings.childGroups(): - group_settings_key = f"{groups_key}/{name}" - with qgis_settings(group_settings_key) as group_settings: - stored_group = {} - stored_group["uuid"] = group_settings.value("uuid") - stored_group["name"] = group_settings.value("name") - stored_group["value"] = group_settings.value("value") - groups.append(stored_group) - layer = { - "uuid": uuid, - "name": priority_settings.value("name"), - "description": priority_settings.value("description"), - "path": priority_settings.value("path"), - "selected": priority_settings.value("selected", type=bool), - "user_defined": priority_settings.value( - "user_defined", defaultValue=True, type=bool - ), - "type": priority_settings.value( - "type", defaultValue=0, type=int - ), - "groups": groups, - } - priority_layer_list.append(layer) - return priority_layer_list - - def find_layer_by_name(self, name) -> typing.Dict: - """Finds a priority layer setting inside - the plugin QgsSettings by name. - - :param name: Priority layers identifier - :type name: str - - :returns: Priority layers dict - :rtype: dict - """ - found_id = None - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_LAYERS_GROUP_NAME}" - ) as settings: - for layer_id in settings.childGroups(): - layer_settings_key = self._get_priority_layers_settings_base(layer_id) - with qgis_settings(layer_settings_key) as layer_settings: - layer_name = layer_settings.value("name") - if layer_name == name: - found_id = uuid.UUID(layer_id) - break - - return self.get_priority_layer(found_id) if found_id is not None else None - - def find_layers_by_group(self, group) -> typing.List: - """Finds priority layers inside the plugin QgsSettings - that contain the passed group. - - :param group: Priority group name - :type group: str - - :returns: Priority layers list - :rtype: list - """ - layers = [] - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_LAYERS_GROUP_NAME}" - ) as settings: - for layer_id in settings.childGroups(): - priority_layer_settings = self._get_priority_layers_settings_base( - layer_id - ) - with qgis_settings(priority_layer_settings) as priority_settings: - groups_key = f"{priority_layer_settings}/groups" - - with qgis_settings(groups_key) as groups_settings: - for name in groups_settings.childGroups(): - group_settings_key = f"{groups_key}/{name}" - with qgis_settings(group_settings_key) as group_settings: - if group == group_settings.value("name"): - layers.append(self.get_priority_layer(layer_id)) - return layers - - def save_priority_layer(self, priority_layer): - """Save the priority layer into the plugin settings. - Updates the layer with new priority groups. - - Note: Emits priority_layers_changed signal - - :param priority_layer: Priority layer - :type priority_layer: dict - """ - settings_key = self._get_priority_layers_settings_base(priority_layer["uuid"]) - - with qgis_settings(settings_key) as settings: - groups = priority_layer.get("groups", []) - settings.setValue("name", priority_layer["name"]) - settings.setValue("description", priority_layer["description"]) - settings.setValue("path", priority_layer["path"]) - settings.setValue("selected", priority_layer.get("selected", False)) - settings.setValue("user_defined", priority_layer.get("user_defined", True)) - settings.setValue("type", priority_layer.get("type", 0)) - groups_key = f"{settings_key}/groups" - with qgis_settings(groups_key) as groups_settings: - for group_id in groups_settings.childGroups(): - groups_settings.remove(group_id) - for group in groups: - group_key = f"{groups_key}/{group['name']}" - with qgis_settings(group_key) as group_settings: - group_settings.setValue("uuid", group.get("uuid")) - group_settings.setValue("name", group["name"]) - group_settings.setValue("value", group["value"]) - - self.priority_layers_changed.emit() - - def set_current_priority_layer(self, identifier): - """Set current priority layer - - :param identifier: Priority layer identifier - :type identifier: str - """ - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_LAYERS_GROUP_NAME}/" - ) as settings: - for priority_layer in settings.childGroups(): - settings_key = self._get_priority_layers_settings_base(identifier) - with qgis_settings(settings_key) as layer_settings: - layer_settings.setValue( - "selected", str(priority_layer) == str(identifier) - ) - - def delete_priority_layers(self): - """Deletes all the plugin priority weighting layers settings.""" - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_LAYERS_GROUP_NAME}" - ) as settings: - for priority_layer in settings.childGroups(): - settings.remove(priority_layer) - - def delete_priority_layer(self, identifier): - """Removes priority layer that match the passed identifier - - :param identifier: Priority layer identifier - :type identifier: str - """ - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_LAYERS_GROUP_NAME}/" - ) as settings: - for priority_layer in settings.childGroups(): - if str(priority_layer) == str(identifier): - settings.remove(priority_layer) - - def _get_priority_groups_settings_base(self, identifier) -> str: - """Gets the priority group settings base url. - - :param identifier: Priority group settings identifier - :type identifier: str - - :returns: Priority groups settings base group - :rtype: str - - """ - return ( - f"{self.BASE_GROUP_NAME}/" - f"{self.PRIORITY_GROUP_NAME}/" - f"{str(identifier)}" - ) - - def find_group_by_name(self, name) -> typing.Dict: - """Finds a priority group setting inside the plugin QgsSettings by name. - - :param name: Name of the group - :type name: str - - :returns: Priority group - :rtype: typing.Dict - """ - - found_id = None - - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_GROUP_NAME}" - ) as settings: - for group_id in settings.childGroups(): - group_settings_key = self._get_priority_groups_settings_base(group_id) - with qgis_settings(group_settings_key) as group_settings_key: - group_name = group_settings_key.value("name") - if group_name == name: - found_id = uuid.UUID(group_id) - break - - return self.get_priority_group(found_id) - - def get_priority_group(self, identifier) -> typing.Dict: - """Retrieves the priority group that matches the passed identifier. - - :param identifier: Priority group identifier - :type identifier: str - - :returns: Priority group - :rtype: typing.Dict - """ - - if identifier is None: - return None - - settings_key = self._get_priority_groups_settings_base(identifier) - with qgis_settings(settings_key) as settings: - priority_group = {"uuid": identifier} - priority_group["name"] = settings.value("name") - priority_group["value"] = settings.value("value") - priority_group["description"] = settings.value("description") - return priority_group - - def get_priority_groups(self) -> typing.List[typing.Dict]: - """Gets all the available priority groups in the plugin. - - :returns: List of the priority groups instances - :rtype: list - """ - priority_groups = [] - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_GROUP_NAME}" - ) as settings: - for uuid in settings.childGroups(): - priority_layer_settings = self._get_priority_groups_settings_base(uuid) - with qgis_settings(priority_layer_settings) as priority_settings: - group = { - "uuid": uuid, - "name": priority_settings.value("name"), - "value": priority_settings.value("value"), - "description": priority_settings.value("description"), - } - priority_groups.append(group) - return priority_groups - - def save_priority_group(self, priority_group): - """Save the priority group into the plugin settings - - :param priority_group: Priority group - :type priority_group: str - """ - - settings_key = self._get_priority_groups_settings_base(priority_group["uuid"]) - - with qgis_settings(settings_key) as settings: - settings.setValue("name", priority_group["name"]) - settings.setValue("value", priority_group["value"]) - settings.setValue("description", priority_group.get("description")) - - def delete_priority_group(self, identifier): - """Removes priority group that match the passed identifier - - :param identifier: Priority group identifier - :type identifier: str - """ - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_GROUP_NAME}/" - ) as settings: - for priority_group in settings.childGroups(): - if str(priority_group) == str(identifier): - settings.remove(priority_group) - - def delete_priority_groups(self): - """Deletes all the plugin priority groups settings.""" - with qgis_settings( - f"{self.BASE_GROUP_NAME}/" f"{self.PRIORITY_GROUP_NAME}" - ) as settings: - for priority_group in settings.childGroups(): - settings.remove(priority_group) - - def _get_layer_mappings_settings_base(self) -> str: - """Returns the path for Layer Mapping settings. - - :returns: Base path to Layer Mapping group. - :rtype: str - """ - return f"{self.BASE_GROUP_NAME}/{self.LAYER_MAPPING_BASE}" - - def get_all_layer_mapping(self) -> typing.Dict: - """Return all layer mapping.""" - layer_mapping = {} - - layer_mapping_root = self._get_layer_mappings_settings_base() - with qgis_settings(layer_mapping_root) as settings: - keys = settings.childKeys() - for k in keys: - layer_raw = settings.value(k, dict()) - if len(layer_raw) > 0: - try: - layer = json.loads(layer_raw) - layer_mapping[k] = layer - except json.JSONDecodeError: - log("Layer Mapping JSON is invalid") - return layer_mapping - - def get_layer_mapping(self, identifier) -> typing.Dict: - """Retrieves the layer mapping that matches the passed identifier. - - :param identifier: Layer mapping identifier - :type identifier: str path - - :returns: Layer mapping - :rtype: typing.Dict - """ - - layer_mapping = {} - - layer_mapping_root = self._get_layer_mappings_settings_base() - - with qgis_settings(layer_mapping_root) as settings: - layer = settings.value(identifier, dict()) - if len(layer) > 0: - try: - layer_mapping = json.loads(layer) - except json.JSONDecodeError: - log("Layer Mapping JSON is invalid") - return layer_mapping - - def save_layer_mapping(self, input_layer, identifier=None): - """Save the layer mapping into the plugin settings - - :param input_layer: Layer mapping - :type input_layer: dict - :param identifier: file identifier using path - :type identifier: str - """ - - if not identifier: - identifier = input_layer["path"].replace(os.sep, "--") - settings_key = self._get_layer_mappings_settings_base() - - with qgis_settings(settings_key) as settings: - settings.setValue(identifier, json.dumps(input_layer)) - - def remove_layer_mapping(self, identifier): - """Remove layer mapping from settings.""" - self.remove(f"{self.LAYER_MAPPING_BASE}/{identifier}") - - def _get_ncs_pathway_settings_base(self) -> str: - """Returns the path for NCS pathway settings. - - :returns: Base path to NCS pathway group. - :rtype: str - """ - return f"{self.BASE_GROUP_NAME}/" f"{NCS_PATHWAY_SEGMENT}" - - def save_ncs_pathway(self, ncs_pathway: typing.Union[NcsPathway, dict]): - """Saves an NCS pathway object serialized to a json string - indexed by the UUID. - - :param ncs_pathway: NCS pathway object or attribute values - in a dictionary which are then serialized to a JSON string. - :type ncs_pathway: NcsPathway, dict - """ - if isinstance(ncs_pathway, NcsPathway): - ncs_pathway = ncs_pathway_to_dict(ncs_pathway) - - ncs_str = json.dumps(ncs_pathway) - - ncs_uuid = ncs_pathway[UUID_ATTRIBUTE] - ncs_root = self._get_ncs_pathway_settings_base() - - with qgis_settings(ncs_root) as settings: - settings.setValue(ncs_uuid, ncs_str) - - def get_ncs_pathway(self, ncs_uuid: str) -> typing.Union[NcsPathway, None]: - """Gets an NCS pathway object matching the given unique identified. - - :param ncs_uuid: Unique identifier for the NCS pathway object. - :type ncs_uuid: str - - :returns: Returns the NCS pathway object matching the given - identifier else None if not found. - :rtype: NcsPathway - """ - ncs_pathway = None - - ncs_dict = self.get_ncs_pathway_dict(ncs_uuid) - if len(ncs_dict) == 0: - return None - - ncs_pathway = create_ncs_pathway(ncs_dict) - - return ncs_pathway - - def get_ncs_pathway_dict(self, ncs_uuid: str) -> dict: - """Gets an NCS pathway attribute values as a dictionary. - - :param ncs_uuid: Unique identifier for the NCS pathway object. - :type ncs_uuid: str - - :returns: Returns the NCS pathway attribute values matching the given - identifier else an empty dictionary if not found. - :rtype: dict - """ - ncs_pathway_dict = {} - - ncs_root = self._get_ncs_pathway_settings_base() - - with qgis_settings(ncs_root) as settings: - ncs_model = settings.value(ncs_uuid, dict()) - if len(ncs_model) > 0: - try: - ncs_pathway_dict = json.loads(ncs_model) - except json.JSONDecodeError: - log("NCS pathway JSON is invalid") - - return ncs_pathway_dict - - def get_all_ncs_pathways(self) -> typing.List[NcsPathway]: - """Get all the NCS pathway objects stored in settings. - - :returns: Returns all the NCS pathway objects. - :rtype: list - """ - ncs_pathways = [] - - ncs_root = self._get_ncs_pathway_settings_base() - - with qgis_settings(ncs_root) as settings: - keys = settings.childKeys() - for k in keys: - ncs_pathway = self.get_ncs_pathway(k) - if ncs_pathway is not None: - ncs_pathways.append(ncs_pathway) - - return sorted(ncs_pathways, key=lambda ncs: ncs.name) - - def update_ncs_pathways(self): - """Updates the path attribute of all NCS pathway settings - based on the BASE_DIR settings to reflect the absolute path - of each NCS pathway layer. - If BASE_DIR is empty then the NCS pathway settings will not - be updated. - """ - ncs_pathways = self.get_all_ncs_pathways() - for ncs in ncs_pathways: - self.update_ncs_pathway(ncs) - - def update_ncs_pathway(self, ncs_pathway: NcsPathway): - """Updates the attributes of the NCS pathway object - in settings. On the path, the BASE_DIR in settings - is used to reflect the absolute path of each NCS - pathway layer. If BASE_DIR is empty then the NCS - pathway setting will not be updated, this only applies - for default pathways. - - :param ncs_pathway: NCS pathway object to be updated. - :type ncs_pathway: NcsPathway - """ - base_dir = self.get_value(Settings.BASE_DIR) - if not base_dir: - return - - # Pathway location for default pathway - if not ncs_pathway.user_defined: - p = Path(ncs_pathway.path) - # Only update if path does not exist otherwise - # fallback to check under base directory. - if not p.exists(): - abs_path = f"{base_dir}/{NCS_PATHWAY_SEGMENT}/" f"{p.name}" - abs_path = str(os.path.normpath(abs_path)) - ncs_pathway.path = abs_path - - # Carbon location - abs_carbon_paths = [] - for cb_path in ncs_pathway.carbon_paths: - cp = Path(cb_path) - # Similarly, if the given carbon path does not exist then try - # to use the default one in the ncs_carbon directory. - if not cp.exists(): - abs_carbon_path = f"{base_dir}/{NCS_CARBON_SEGMENT}/" f"{cp.name}" - abs_carbon_path = str(os.path.normpath(abs_carbon_path)) - abs_carbon_paths.append(abs_carbon_path) - else: - abs_carbon_paths.append(cb_path) - - ncs_pathway.carbon_paths = abs_carbon_paths - - # Remove then re-insert - self.remove_ncs_pathway(str(ncs_pathway.uuid)) - self.save_ncs_pathway(ncs_pathway) - - def remove_ncs_pathway(self, ncs_uuid: str): - """Removes an NCS pathway settings entry using the UUID. - - :param ncs_uuid: Unique identifier of the NCS pathway entry - to removed. - :type ncs_uuid: str - """ - if self.get_ncs_pathway(ncs_uuid) is not None: - self.remove(f"{self.NCS_PATHWAY_BASE}/{ncs_uuid}") - - def _get_activity_settings_base(self) -> str: - """Returns the path for activity settings. - - :returns: Base path to activity group. - :rtype: str - """ - return f"{self.BASE_GROUP_NAME}/" f"{self.ACTIVITY_BASE}" - - def save_activity(self, activity: typing.Union[Activity, dict]): - """Saves an activity object serialized to a json string - indexed by the UUID. - - :param activity: Activity object or attribute - values in a dictionary which are then serialized to a JSON string. - :type activity: Activity, dict - """ - if isinstance(activity, Activity): - priority_layers = activity.priority_layers - layer_styles = activity.layer_styles - style_pixel_value = activity.style_pixel_value - - ncs_pathways = [] - for ncs in activity.pathways: - ncs_pathways.append(str(ncs.uuid)) - - activity = layer_component_to_dict(activity) - activity[PRIORITY_LAYERS_SEGMENT] = priority_layers - activity[PATHWAYS_ATTRIBUTE] = ncs_pathways - activity[STYLE_ATTRIBUTE] = layer_styles - activity[PIXEL_VALUE_ATTRIBUTE] = style_pixel_value - - if isinstance(activity, dict): - priority_layers = [] - if activity.get("pwls_ids") is not None: - for layer_id in activity.get("pwls_ids", []): - layer = self.get_priority_layer(layer_id) - priority_layers.append(layer) - if len(priority_layers) > 0: - activity[PRIORITY_LAYERS_SEGMENT] = priority_layers - - activity_str = json.dumps(todict(activity), cls=CustomJsonEncoder) - - activity_uuid = activity[UUID_ATTRIBUTE] - activity_root = self._get_activity_settings_base() - - with qgis_settings(activity_root) as settings: - settings.setValue(activity_uuid, activity_str) - - def get_activity(self, activity_uuid: str) -> typing.Union[Activity, None]: - """Gets an activity object matching the given unique - identifier. - - :param activity_uuid: Unique identifier of the - activity object. - :type activity_uuid: str - - :returns: Returns the activity object matching the given - identifier else None if not found. - :rtype: Activity - """ - activity = None - - activity_root = self._get_activity_settings_base() - - with qgis_settings(activity_root) as settings: - activity = settings.value(activity_uuid, None) - ncs_uuids = [] - if activity is not None: - activity_dict = {} - try: - activity_dict = json.loads(activity) - except json.JSONDecodeError: - log("Activity JSON is invalid.") - - if PATHWAYS_ATTRIBUTE in activity_dict: - ncs_uuids = activity_dict[PATHWAYS_ATTRIBUTE] - - activity = create_activity(activity_dict) - if activity is not None: - for ncs_uuid in ncs_uuids: - ncs = self.get_ncs_pathway(ncs_uuid) - if ncs is not None: - activity.add_ncs_pathway(ncs) - - return activity - - def find_activity_by_name(self, name) -> typing.Dict: - """Finds an activity setting inside - the plugin QgsSettings that equals or matches the name. - - :param name: Activity name. - :type name: str - - :returns: Activity object. - :rtype: Activity - """ - for activity in self.get_all_activities(): - model_name = activity.name - trimmed_name = model_name.replace(" ", "_") - if model_name == name or model_name in name or trimmed_name in name: - return activity - - return None - - def get_all_activities(self) -> typing.List[Activity]: - """Get all the activity objects stored in settings. - - :returns: Returns all the activity objects. - :rtype: list - """ - activities = [] - - activity_root = self._get_activity_settings_base() - - with qgis_settings(activity_root) as settings: - keys = settings.childKeys() - for k in keys: - activity = self.get_activity(k) - if activity is not None: - activities.append(activity) - - return sorted(activities, key=lambda activity: activity.name) - - def update_activity(self, activity: Activity): - """Updates the attributes of the activity object - in settings. On the path, the BASE_DIR in settings - is used to reflect the absolute path of each NCS - pathway layer. If BASE_DIR is empty then the NCS - pathway setting will not be updated. - - :param activity: Activity object to be updated. - :type activity: Activity - """ - base_dir = self.get_value(Settings.BASE_DIR) - - if base_dir: - # PWLs path update - for layer in activity.priority_layers: - if layer in PRIORITY_LAYERS and base_dir not in layer.get( - PATH_ATTRIBUTE - ): - abs_pwl_path = ( - f"{base_dir}/{PRIORITY_LAYERS_SEGMENT}/" - f"{layer.get(PATH_ATTRIBUTE)}" - ) - abs_pwl_path = str(os.path.normpath(abs_pwl_path)) - layer[PATH_ATTRIBUTE] = abs_pwl_path - - # Remove then re-insert - self.remove_activity(str(activity.uuid)) - self.save_activity(activity) - - def update_activities(self): - """Updates the attributes of the existing activities.""" - activities = self.get_all_activities() - - for activity in activities: - self.update_activity(activity) - - def remove_activity(self, activity_uuid: str): - """Removes an activity settings entry using the UUID. - - :param activity_uuid: Unique identifier of the activity - to be removed. - :type activity_uuid: str - """ - if self.get_activity(activity_uuid) is not None: - self.remove(f"{self.ACTIVITY_BASE}/{activity_uuid}") - - def get_npv_collection(self) -> typing.Optional[ActivityNpvCollection]: - """Gets the collection of NPV mappings of activities. - - :returns: The collection of activity NPV mappings or None - if not defined. - :rtype: ActivityNpvCollection - """ - npv_collection_str = self.get_value(NPV_COLLECTION_PROPERTY, None) - if not npv_collection_str: - return None - - npv_collection_dict = {} - try: - npv_collection_dict = json.loads(npv_collection_str) - except json.JSONDecodeError: - log("ActivityNPVCollection JSON is invalid.") - - return create_activity_npv_collection( - npv_collection_dict, self.get_all_activities() - ) - - def save_npv_collection(self, npv_collection: ActivityNpvCollection): - """Saves the activity NPV collection in the settings as a serialized - JSON string. - - :param npv_collection: Activity NPV collection serialized to a JSON string. - :type npv_collection: ActivityNpvCollection - """ - npv_collection_dict = activity_npv_collection_to_dict(npv_collection) - npv_collection_str = json.dumps(npv_collection_dict) - self.set_value(NPV_COLLECTION_PROPERTY, npv_collection_str) - - -settings_manager = SettingsManager() diff --git a/cplus_core/utils/helper.py b/cplus_core/utils/helper.py index 279e3d1..138308b 100644 --- a/cplus_core/utils/helper.py +++ b/cplus_core/utils/helper.py @@ -10,26 +10,18 @@ import datetime from pathlib import Path from uuid import UUID +from enum import Enum from qgis.PyQt import QtCore from qgis.core import ( - Qgis, QgsCoordinateReferenceSystem, QgsCoordinateTransform, - QgsCoordinateTransformContext, - QgsDistanceArea, - QgsMessageLog, - QgsProcessingFeedback, QgsProject, - QgsProcessing, QgsRasterLayer, - QgsUnitTypes, ) from qgis.analysis import QgsAlignRaster -from qgis import processing - def tr(message): """Get the translation for a string using Qt translation API. @@ -45,38 +37,6 @@ def tr(message): return QtCore.QCoreApplication.translate("QgisCplus", message) -def log( - message: str, - name: str = "qgis_cplus", - info: bool = True, - notify: bool = True, -): - """Logs the message into QGIS logs using qgis_cplus as the default - log instance. - If notify_user is True, user will be notified about the log. - - :param message: The log message - :type message: str - - :param name: Name of te log instance, qgis_cplus is the default - :type message: str - - :param info: Whether the message is about info or a - warning - :type info: bool - - :param notify: Whether to notify user about the log - :type notify: bool - """ - level = Qgis.Info if info else Qgis.Warning - QgsMessageLog.logMessage( - message, - name, - level=level, - notifyUser=notify, - ) - - def clean_filename(filename): """Creates a safe filename by removing operating system invalid filename characters. @@ -96,79 +56,6 @@ def clean_filename(filename): return filename -def calculate_raster_value_area( - layer: QgsRasterLayer, band_number: int = 1, feedback: QgsProcessingFeedback = None -) -> dict: - """Calculates the area of value pixels for the given band in a raster layer. - - Please note that this function will run in the main application thread hence - for best results, it is recommended to execute it in a background process - if part of a bigger workflow. - - :param layer: Input layer whose area for value pixels is to be calculated. - :type layer: QgsRasterLayer - - :param band_number: Band number to compute area, default is band one. - :type band_number: int - - :param feedback: Feedback object for progress during area calculation. - :type feedback: QgsProcessingFeedback - - :returns: A dictionary containing the pixel value as - the key and the corresponding area in hectares as the value for all the pixels - in the raster otherwise returns a empty dictionary if the raster is invalid - or if it is empty. - :rtype: float - """ - if not layer.isValid(): - log("Invalid layer for raster area calculation.", info=False) - return {} - - algorithm_name = "native:rasterlayeruniquevaluesreport" - params = { - "INPUT": layer, - "BAND": band_number, - "OUTPUT_TABLE": "TEMPORARY_OUTPUT", - "OUTPUT_HTML_FILE": QgsProcessing.TEMPORARY_OUTPUT, - } - - algorithm_result = processing.run(algorithm_name, params, feedback=feedback) - - # Get number of pixels with values - total_pixel_count = algorithm_result["TOTAL_PIXEL_COUNT"] - if total_pixel_count == 0: - log("Input layer for raster area calculation is empty.", info=False) - return {} - - output_table = algorithm_result["OUTPUT_TABLE"] - if output_table is None: - log("Unique values raster table could not be retrieved.", info=False) - return {} - - area_calc = QgsDistanceArea() - crs = layer.crs() - area_calc.setSourceCrs(crs, QgsCoordinateTransformContext()) - if crs is not None: - # Use ellipsoid calculation if available - area_calc.setEllipsoid(crs.ellipsoidAcronym()) - - version = Qgis.versionInt() - if version < 33000: - unit_type = QgsUnitTypes.AreaUnit.AreaHectares - else: - unit_type = Qgis.AreaUnit.Hectares - - pixel_areas = {} - features = output_table.getFeatures() - for f in features: - pixel_value = f.attribute(0) - area = f.attribute(2) - pixel_value_area = area_calc.convertAreaMeasurement(area, unit_type) - pixel_areas[pixel_value] = pixel_value_area - - return pixel_areas - - def transform_extent(extent, source_crs, dest_crs): """Transforms the passed extent into the destination crs @@ -182,113 +69,13 @@ def transform_extent(extent, source_crs, dest_crs): :type dest_crs: QgsCoordinateReferenceSystem """ - transform = QgsCoordinateTransform(source_crs, dest_crs, QgsProject.instance()) + transform = QgsCoordinateTransform( + source_crs, dest_crs, QgsProject.instance()) transformed_extent = transform.transformBoundingBox(extent) return transformed_extent -def align_rasters( - input_raster_source, - reference_raster_source, - extent=None, - output_dir=None, - rescale_values=False, - resample_method=0, -): - """ - Based from work on https://github.com/inasafe/inasafe/pull/2070 - Aligns the passed raster files source and save the results into new files. - - :param input_raster_source: Input layer source - :type input_raster_source: str - - :param reference_raster_source: Reference layer source - :type reference_raster_source: str - - :param extent: Clip extent - :type extent: list - - :param output_dir: Absolute path of the output directory for the snapped - layers - :type output_dir: str - - :param rescale_values: Whether to rescale pixel values - :type rescale_values: bool - - :param resample_method: Method to use when resampling - :type resample_method: QgsAlignRaster.ResampleAlg - - """ - - try: - snap_directory = os.path.join(output_dir, "snap_layers") - - BaseFileUtils.create_new_dir(snap_directory) - - input_path = Path(input_raster_source) - - input_layer_output = os.path.join( - f"{snap_directory}", f"{input_path.stem}_{str(uuid.uuid4())[:4]}.tif" - ) - - BaseFileUtils.create_new_file(input_layer_output) - - align = QgsAlignRaster() - lst = [ - QgsAlignRaster.Item(input_raster_source, input_layer_output), - ] - - resample_method_value = QgsAlignRaster.ResampleAlg.RA_NearestNeighbour - - try: - resample_method_value = QgsAlignRaster.ResampleAlg(int(resample_method)) - except Exception as e: - log(f"Problem creating a resample value when snapping, {e}") - - if rescale_values: - lst[0].rescaleValues = rescale_values - - lst[0].resample_method = resample_method_value - - align.setRasters(lst) - align.setParametersFromRaster(reference_raster_source) - - layer = QgsRasterLayer(input_raster_source, "input_layer") - - extent = transform_extent( - layer.extent(), - QgsCoordinateReferenceSystem(layer.crs()), - QgsCoordinateReferenceSystem(align.destinationCrs()), - ) - - align.setClipExtent(extent) - - log(f"Snapping clip extent {layer.extent().asWktPolygon()} \n") - - if not align.run(): - log( - f"Problem during snapping for {input_raster_source} and " - f"{reference_raster_source}, {align.errorMessage()}" - ) - raise Exception(align.errorMessage()) - except Exception as e: - log( - f"Problem occured when snapping, {str(e)}." - f" Update snap settings and re-run the analysis" - ) - - return None, None - - log( - f"Finished snapping" - f" original layer - {input_raster_source}," - f"snapped output - {input_layer_output} \n" - ) - - return input_layer_output, None - - class BaseFileUtils: """ Provides functionality for commonly used file-related operations. @@ -299,10 +86,7 @@ def create_new_dir(directory: str, log_message: str = ""): """Creates new file directory if it doesn't exist""" p = Path(directory) if not p.exists(): - try: - p.mkdir() - except (FileNotFoundError, OSError): - log(log_message) + p.mkdir() @staticmethod def create_new_file(file_path: str, log_message: str = ""): @@ -310,10 +94,7 @@ def create_new_file(file_path: str, log_message: str = ""): p = Path(file_path) if not p.exists(): - try: - p.touch(exist_ok=True) - except FileNotFoundError: - log(log_message) + p.touch(exist_ok=True) def align_rasters( @@ -348,7 +129,7 @@ def align_rasters( :type resample_method: QgsAlignRaster.ResampleAlg """ - + logs = [] try: snap_directory = os.path.join(output_dir, "snap_layers") @@ -357,7 +138,8 @@ def align_rasters( input_path = Path(input_raster_source) input_layer_output = os.path.join( - f"{snap_directory}", f"{input_path.stem}_{str(uuid.uuid4())[:4]}.tif" + f"{snap_directory}", + f"{input_path.stem}_{str(uuid.uuid4())[:4]}.tif" ) BaseFileUtils.create_new_file(input_layer_output) @@ -370,9 +152,11 @@ def align_rasters( resample_method_value = QgsAlignRaster.ResampleAlg.RA_NearestNeighbour try: - resample_method_value = QgsAlignRaster.ResampleAlg(int(resample_method)) + resample_method_value = QgsAlignRaster.ResampleAlg( + int(resample_method)) except Exception as e: - log(f"Problem creating a resample value when snapping, {e}") + logs.append( + f"Problem creating a resample value when snapping, {e}") if rescale_values: lst[0].rescaleValues = rescale_values @@ -392,29 +176,29 @@ def align_rasters( align.setClipExtent(extent) - log(f"Snapping clip extent {layer.extent().asWktPolygon()} \n") + logs.append(f"Snapping clip extent {layer.extent().asWktPolygon()} \n") if not align.run(): - log( + logs.append( f"Problem during snapping for {input_raster_source} and " f"{reference_raster_source}, {align.errorMessage()}" ) raise Exception(align.errorMessage()) except Exception as e: - log( + logs.append( f"Problem occured when snapping, {str(e)}." f" Update snap settings and re-run the analysis" ) - return None, None + return None, logs - log( + logs.append( f"Finished snapping" f" original layer - {input_raster_source}," f"snapped output - {input_layer_output} \n" ) - return input_layer_output, None + return input_layer_output, logs def get_layer_type(file_path: str): @@ -450,7 +234,9 @@ def todict(obj, classkey=None): Convert any object to dictionary """ - if isinstance(obj, dict): + if isinstance(obj, Enum): + return obj.value + elif isinstance(obj, dict): data = {} for k, v in obj.items(): data[k] = todict(v, classkey)