Skip to content

Commit

Permalink
Feat refactor conf (#4)
Browse files Browse the repository at this point in the history
* refactor task config; removed unused functions

* fix task cancelled and scenario directory

* fix get scenario directory

* fix task config get value

* removed unused defaults

* use scenario_directory from task_config

* remove uuids from taskconfig

* update sieve function

* fix basefileutils

* fix task_config params

* fix analysis init param

* update docstrings

* update base models

* fix task activities
  • Loading branch information
danangmassandy authored Sep 17, 2024
1 parent d2b8ed0 commit 8054fff
Show file tree
Hide file tree
Showing 13 changed files with 496 additions and 2,842 deletions.
1 change: 1 addition & 0 deletions cplus_core/analysis/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .analysis import ScenarioAnalysisTask
from .task_config import TaskConfig
161 changes: 116 additions & 45 deletions cplus_core/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
Plugin tasks related to the scenario analysis
"""
import datetime
import os
import uuid
from pathlib import Path
Expand All @@ -24,13 +23,14 @@
QgsTask,
)

from ..utils.conf import settings_manager, Settings
from ..utils.conf import Settings
from ..definitions.defaults import (
SCENARIO_OUTPUT_FILE_NAME,
)
from ..models.base import ScenarioResult
from ..models.helpers import clone_activity
from ..utils.helper import align_rasters, clean_filename, tr, log, BaseFileUtils
from ..utils.helper import align_rasters, clean_filename, tr, BaseFileUtils
from .task_config import TaskConfig


class ScenarioAnalysisTask(QgsTask):
Expand All @@ -40,28 +40,22 @@ class ScenarioAnalysisTask(QgsTask):
info_message_changed = QtCore.pyqtSignal(str, int)

custom_progress_changed = QtCore.pyqtSignal(float)
log_received = QtCore.pyqtSignal(str, str, bool, bool)
task_cancelled = QtCore.pyqtSignal(bool)

def __init__(
self,
analysis_scenario_name,
analysis_scenario_description,
analysis_activities,
analysis_priority_layers_groups,
analysis_extent,
scenario,
):
def __init__(self, task_config: TaskConfig):
super().__init__()
self.analysis_scenario_name = analysis_scenario_name
self.analysis_scenario_description = analysis_scenario_description
self.task_config = task_config
self.analysis_scenario_name = task_config.scenario.name
self.analysis_scenario_description = task_config.scenario.description

self.analysis_activities = analysis_activities
self.analysis_priority_layers_groups = analysis_priority_layers_groups
self.analysis_extent = analysis_extent
self.analysis_activities = task_config.analysis_activities
self.analysis_priority_layers_groups = task_config.priority_layer_groups
self.analysis_extent = task_config.scenario.extent
self.analysis_extent_string = None

self.analysis_weighted_activities = []
self.scenario_result = None
self.scenario_directory = None

self.success = True
self.output = None
Expand All @@ -74,28 +68,62 @@ def __init__(
self.feedback = QgsProcessingFeedback()
self.processing_context = QgsProcessingContext()

self.scenario = scenario
self.scenario = task_config.scenario
self.scenario_directory = task_config.base_dir

def get_settings_value(self, name: str, default=None, setting_type=None):
return settings_manager.get_value(name, default, setting_type)
"""Get attribute value by attribute name.
def get_scenario_directory(self):
base_dir = self.get_settings_value(Settings.BASE_DIR)
return os.path.join(
f"{base_dir}",
"scenario_" f'{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}',
)
:param name: Attribute name/config key
:type name: Settings
:param default: Default value if not found, defaults to None
:type default: any, optional
:param setting_type: type of attribute, defaults to None
:type setting_type: any, optional
:return: Attribute value
:rtype: any
"""
return self.task_config.get_value(name, default)

def get_priority_layer(self, identifier):
return settings_manager.get_priority_layer(identifier)
"""Get priority layer dict by its UUID.
:param identifier: Priority Layer UUID
:type identifier: str
:return: Priority Layer dict
:rtype: typing.Dict
"""
return self.task_config.get_priority_layer(identifier)

def get_activity(self, activity_uuid):
return settings_manager.get_activity(activity_uuid)
"""Get activity object by its UUID.
:param activity_uuid: activity UUID
:type activity_uuid: str
:return: Activity object or None if not found
:rtype: typing.Union[Activity, None]
"""
return self.task_config.get_activity(activity_uuid)

def get_priority_layers(self):
return settings_manager.get_priority_layers()
"""Get all priority layers.
:return: List of priority layer dictionary
:rtype: typing.List
"""
return self.task_config.get_priority_layers()

def get_masking_layers(self):
"""Get masking layers
:return: list of mask layer file path
:rtype: List
"""
masking_layers_paths = self.get_settings_value(
Settings.MASK_LAYERS_PATHS, default=None
)
Expand All @@ -105,8 +133,18 @@ def get_masking_layers(self):
return masking_layers

def cancel_task(self, exception=None):
"""Cancel current task.
:param exception: exception if any, defaults to None
:type exception: Exception, optional
"""
self.error = exception
self.cancel()
try:
self.cancel()
except Exception:
pass
finally:
self.task_cancelled.emit(exception is not None)

def log_message(
self,
Expand All @@ -115,7 +153,21 @@ def log_message(
info: bool = True,
notify: bool = True,
):
log(message, name=name, info=info, notify=notify)
"""Handle when log is received from running task.
:param message: Message log
:type message: str
:param name: log name, defaults to "qgis_cplus"
:type name: str, optional
:param info: True if it is information log, defaults to True
:type info: bool, optional
:param notify: Not used in API, defaults to True
:type notify: bool, optional
"""
self.log_received.emit(message, name, info, notify)

def on_terminated(self):
"""Called when the task is terminated."""
Expand All @@ -128,8 +180,6 @@ def on_terminated(self):
def run(self):
"""Runs the main scenario analysis task operations"""

self.scenario_directory = self.get_scenario_directory()

BaseFileUtils.create_new_dir(self.scenario_directory)

selected_pathway = None
Expand Down Expand Up @@ -296,14 +346,31 @@ def finished(self, result: bool):
self.log_message(f"Error from task scenario task {self.error}")

def set_status_message(self, message):
"""Handle when status message is updated.
:param message: status message
:type message: str
"""
self.status_message = message
self.status_message_changed.emit(self.status_message)

def set_info_message(self, message, level=Qgis.Info):
"""Handle when info message is updated.
:param message: message
:type message: str
:param level: severity level, defaults to Qgis.Info
:type level: int, optional
"""
self.info_message = message
self.info_message_changed.emit(self.info_message, level)

def set_custom_progress(self, value):
"""Handle when progress value is updated.
:param value: progress value
:type value: float
"""
self.custom_progress = value
self.custom_progress_changed.emit(self.custom_progress)

Expand Down Expand Up @@ -431,7 +498,9 @@ def replace_nodata(self, layer_path, output_path, nodata_value):

return outputs is not None
except Exception as e:
log(f"Problem replacing no data value from a snapping output, {e}")
self.log_message(
f"Problem replacing no data value from a snapping output, {e}"
)

return False

Expand Down Expand Up @@ -824,14 +893,16 @@ def snap_layer(
"""

input_result_path, reference_result_path = align_rasters(
input_result_path, logs = align_rasters(
input_path,
reference_path,
extent,
directory,
rescale_values,
resampling_method,
)
for log in logs:
self.log_message(log, info=("Problem" not in log))

if input_result_path is not None:
result_path = Path(input_result_path)
Expand Down Expand Up @@ -1373,7 +1444,7 @@ def run_activities_sieve(self, models, temporary_output=False):
threshold size (in pixels) and replaces them with the pixel value of
the largest neighbour polygon.
:param models: List of the analyzed implementation models
:param models: List of the analyzed activities
:type models: typing.List[ImplementationModel]
:param extent: Selected area of interest extent
Expand All @@ -1390,9 +1461,7 @@ def run_activities_sieve(self, models, temporary_output=False):
# Will not proceed if processing has been cancelled by the user
return False

self.set_status_message(
tr("Applying sieve function to the implementation models")
)
self.set_status_message(tr("Applying sieve function to the activities"))

try:
for model in models:
Expand Down Expand Up @@ -1442,12 +1511,14 @@ def run_activities_sieve(self, models, temporary_output=False):
)

# Actual processing calculation
# alg_params = {
# "INPUT": model.path,
# "THRESHOLD": threshold_value,
# "MASK_LAYER": mask_layer,
# "OUTPUT": output,
# }
alg_params = {
"INPUT": model.path,
"THRESHOLD": threshold_value,
"MASK_LAYER": mask_layer,
"OUTPUT": output,
}

self.log_message(f"Used parameters for sieving: {alg_params} \n")

input_name = os.path.splitext(os.path.basename(model.path))[0]

Expand Down
Loading

0 comments on commit 8054fff

Please sign in to comment.