Skip to content

Commit

Permalink
WIP Refactoring opportunities mask into a processor
Browse files Browse the repository at this point in the history
  • Loading branch information
timlinux committed Jan 4, 2025
1 parent f71b1e4 commit 483ca31
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 234 deletions.
8 changes: 8 additions & 0 deletions geest/core/algorithms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,11 @@
from .population_processor import PopulationRasterProcessingTask
from .wee_by_population_score_processor import WEEByPopulationScoreProcessingTask
from .subnational_aggregation_processor import SubnationalAggregationProcessingTask
from .opportunities_mask_processor import OpportunitiesMaskProcessor
from .utilities import (
assign_crs_to_raster_layer,
assign_crs_to_vector_layer,
subset_vector_layer,
geometry_to_memory_layer,
check_and_reproject_layer,
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import shutil
import traceback
from typing import Optional
from qgis.core import (
QgsRasterLayer,
Qgis,
Expand All @@ -10,16 +12,36 @@
QgsVectorLayer,
QgsGeometry,
QgsProcessingFeedback,
QgsTask,
)
from qgis.PyQt.QtCore import QVariant
import processing
from .workflow_base import WorkflowBase
from geest.core import JsonTreeItem
from geest.utilities import log_message, resources_path
from .utilities import (
subset_vector_layer,
geometry_to_memory_layer,
check_and_reproject_layer,
)
from .area_iterator import AreaIterator


class OpportunitiesPolygonMaskWorkflow(WorkflowBase):
class OpportunitiesMaskProcessor(QgsTask):
"""
A QgsTask subclass for generating job opportunity mask layers.
It iterates over bounding boxes and study areas, selects the intersecting features
(if inputs are points or polygons) or in the case of a raster maske, clips the raster
data to match the study area bbox.
Args:
item (JSONTreeItem): Analysis item containing the needed parameters.
study_area_gpkg_path (str): Path to the GeoPackage containing study area masks.
output_dir (str): Directory to save the output rasters.
cell_size_m (float): Cell size for the output rasters.
context (Optional[QgsProcessingContext]): QGIS processing context.
feedback (Optional[QgsFeedback]): QGIS feedback object.
force_clear (bool): Flag to force clearing of all outputs before processing.
Concrete implementation of a geest insight for masking by job opportunities.
It will create a raster layer where all cells outside the masked areas (defined
Expand All @@ -44,33 +66,40 @@ class OpportunitiesPolygonMaskWorkflow(WorkflowBase):
This workflow expects that the user has configured the root analysis node dialog with
the polygon mask settings configured.
Input can be any of:
* a point layer (with a buffer distance)
* a polygon layer (attributes are ignored)
* a raster layer (any non-null pixel will be set to 1)
"""

def __init__(
self,
item: JsonTreeItem,
study_area_gpkg_path: str,
working_directory: str,
cell_size_m: float,
feedback: QgsFeedback,
context: QgsProcessingContext,
working_directory: str = None,
context: Optional[QgsProcessingContext] = None,
feedback: Optional[QgsFeedback] = None,
force_clear: bool = False,
):
"""
Initialize the workflow with attributes and feedback.
:param attributes: Item containing workflow parameters (should be node type: analysis).
:param feedback: QgsFeedback object for progress reporting and cancellation.
:context: QgsProcessingContext object for processing. This can be used to pass objects to the thread. e.g. the QgsProject Instance
:working_directory: Folder containing study_area.gpkg and where the outputs will be placed. If not set will be taken from QSettings.
"""
log_message(f"Working_directory: {working_directory}")
super().__init__(
item=item,
cell_size_m=cell_size_m,
feedback=feedback,
context=context,
working_directory=working_directory,
) # ⭐️ Item is a reference - whatever you change in this item will directly update the tree

self.mask_mode = self.attributes.get(
super().__init__("Opportunities Mask Processor", QgsTask.CanCancel)
self.study_area_gpkg_path = study_area_gpkg_path
self.cell_size_m = cell_size_m
self.working_directory = working_directory
layer: QgsVectorLayer = QgsVectorLayer(
f"{self.study_area_gpkg_path}|layername=study_area_clip_polygons",
"study_area_clip_polygons",
"ogr",
)
self.target_crs = layer.crs()
del layer
self.context = context
self.feedback = feedback
self.clipped_rasters = []
self.item = item
self.mask_mode = self.item.attribute(
"mask_mode", None
) # if set, will be "point", "polygon" or "raster"
if not self.mask_mode:
Expand All @@ -81,19 +110,19 @@ def __init__(
# so we set it manually.
self.layer_id = "Opportunities_Mask"
if self.mask_mode == "point":
self.buffer_distance_m = self.attributes.get("buffer_distance_m", 1000)
self.buffer_distance_m = self.item.attribute("buffer_distance_m", 1000)
if self.mask_mode in ["point", "polygon"]:
# There are two ways a user can specify the polygon mask layer
# either as a shapefile path added in a line edit or as a layer source
# using a QgsMapLayerComboBox. We prioritize the shapefile path, so check that first.
layer_source = self.attributes.get(f"{self.mask_mode}_mask_shapefile", None)
layer_source = self.item.attribute(f"{self.mask_mode}_mask_shapefile", None)
provider_type = "ogr"
if not layer_source:
# Fall back to the QgsMapLayerComboBox source
layer_source = self.attributes.get(
layer_source = self.item.attribute(
f"{self.mask_mode}_mask_layer_source", None
)
provider_type = self.attributes.get(
provider_type = self.item.attribute(
f"{self.mask_mode}_mask_layer_provider_type", "ogr"
)
if not layer_source:
Expand All @@ -112,9 +141,15 @@ def __init__(
)
log_message(f"Layer Source: {layer_source}", level=Qgis.Critical)
return False

# Check the geometries and reproject if necessary
self.features_layer = check_and_reproject_layer(
self.features_layer, self.target_crs
)

elif self.mask_mode == "raster":
# The base class has all the logic for clipping the raster layer
# we just need to assign it to self.raster_layer
# Check the input raster is ok. The raster itself does not need to be a mask
# (i.e. with 1 and nodata values) - we will take care of that in this class.
# Then the _process_raster_for_area method is where we turn it into a mask
log_message("Loading source raster mask layer")
# First try the one defined in the line edit
Expand All @@ -137,34 +172,72 @@ def __init__(
)
return False
# Workflow directory is the subdir under working_directory
## This is usually set in the base class but we override that behaviour for this workflow
self.workflow_directory = os.path.join(working_directory, "opportunity_masks")
os.makedirs(self.workflow_directory, exist_ok=True)
# Again, normally auto-set in the base class but we override it here:

self.output_filename = "Opportunities_Mask"
# And customise which key we will write the result file to (see base class for notes):
# And customise which key we will write the result file to:
self.result_file_key = "opportunities_mask_result_file"
self.result_key = "opportunities_mask_result"

# Section below to be removed

# These folders should already exist from the aggregation analysis and population raster processing
self.wee_by_population_folder = os.path.join(
working_directory, "wee_by_population_score"
)

if not os.path.exists(self.wee_by_population_folder):
raise Exception(
f"WEE folder not found.\n{self.wee_by_population_folder}\nPlease run WEE raster processing first."
)

# TODO make user configurable
self.force_clear = False
if self.force_clear and os.path.exists(self.workflow_directory):
for file in os.listdir(self.workflow_directory):
os.remove(os.path.join(self.workflow_directory, file))

log_message("Initialized WEE Opportunities Polygon Mask Workflow")
log_message(f"---------------------------------------------")
log_message(f"Initialized WEE Opportunities Mask Workflow")
log_message(f"---------------------------------------------")
log_message(f"Item: {self.item.name}")
log_message(f"Study area GeoPackage path: {self.study_area_gpkg_path}")
log_message(f"Working_directory: {self.working_directory}")
log_message(f"Workflow directory: {self.workflow_directory}")
log_message(f"Cell size: {self.cell_size_m}")
log_message(f"CRS: {self.target_crs.authid() if self.target_crs else 'None'}")
log_message(f"Force clear: {self.force_clear}")
log_message(f"---------------------------------------------")

def run(self) -> bool:
"""
Executes the task to process mask for each are.
"""
try:
area_iterator = AreaIterator(self.study_area_gpkg_path)
for index, (current_area, clip_area, current_bbox, progress) in enumerate(
area_iterator
):
if self.feedback and self.feedback.isCanceled():
return False
if self.mask_mode == "raster":
area_raster = self._subset_raster_layer(current_bbox, index)
mask_layer = self._process_raster_for_area(
current_area, clip_area, current_bbox, area_raster, index
)
else:
vector_layer = subset_vector_layer(
self.workflow_directory,
self.features_layer,
current_area,
str(index),
)
mask_layer = self._process_features_for_area(
current_area, clip_area, current_bbox, vector_layer, index
)

except Exception as e:
log_message(f"Task failed: {e}")
log_message(traceback.format_exc())
return False

def finished(self, result: bool) -> None:
"""
Called when the task completes.
"""
if result:
log_message("Population raster processing completed successfully.")
else:
log_message("Population raster processing failed.")

def _process_features_for_area(
self,
Expand Down Expand Up @@ -208,12 +281,6 @@ def _process_features_for_area(
log_message(f"Mask layer saved to {mask_layer}")
return mask_layer
else: # Raster
# The raster workflow is handled by the base class
# We just need to set the self.raster_layer attribute in the
# ctor for it to be initiated.
#
# Override the implementations in this class's _process_raster_for_area
# for the actual processing logic.
pass

def _buffer_features(self, layer: QgsVectorLayer, index: int) -> QgsVectorLayer:
Expand Down Expand Up @@ -259,7 +326,7 @@ def _clip_features(
QgsVectorLayer: The mask features layer clipped to the clip area.
"""
output_name = f"opportunites_polygons_clipped_{index}"
clip_layer = self.geometry_to_memory_layer(clip_area, "clip_area")
clip_layer = geometry_to_memory_layer(clip_area, self.target_crs, "clip_area")
output_path = os.path.join(self.workflow_directory, f"{output_name}.shp")
params = {"INPUT": layer, "OVERLAY": clip_layer, "OUTPUT": output_path}
output = processing.run("native:clip", params)["OUTPUT"]
Expand Down Expand Up @@ -307,74 +374,6 @@ def generate_mask_layer(
output = processing.run("gdal:rasterize", params)["OUTPUT"]
return rasterized_polygons_path

def process_wee_score(self, mask_path, index):
"""
TODO MOVE TO ITS OWN WORKFLOW, CURRENTLY NOT USED
Apply the work opportunities mask to the WEE Score raster layer.
"""

# Load your raster layer
wee_path = os.path.join(
self.wee_by_population_folder, "wee_by_population_score.vrt"
)
wee_by_population_layer = QgsRasterLayer(wee_path, "WEE by Population Score")

if not wee_by_population_layer.isValid():
log_message(f"The raster layer is invalid!\n{wee_path}\nTrying WEE score")
wee_by_population_path = os.path.join(
os.pardir(self.wee_by_population_folder), "wee_by_population_score.vrt"
)
wee_by_population_layer = QgsRasterLayer(
wee_path, "WEE By Population Score"
)
if not wee_by_population_layer.isValid():
raise Exception(
f"Neither WEE x Population nor WEE Score layers are valid.\n{wee_path}\n"
)
else:
# Get the extent of the raster layer
extent = wee_by_population_layer.extent()

# Get the data provider for the raster layer
provider = wee_by_population_layer.dataProvider()

# Get the raster's width, height, and size of cells
width = provider.xSize()
height = provider.ySize()

cell_width = extent.width() / width
cell_height = extent.height() / height
log_message(f"Raster layer loaded: {wee_path}")
log_message(f"Raster extent: {extent}")
log_message(f"Raster cell size: {cell_width} x {cell_height}")

log_message(f"Masked WEE Score raster saved to {output}")
opportunities_mask = os.path.join(
self.workflow_directory, "oppotunities_mask.tif"
)
params = {
"INPUT_A": wee_by_population_layer,
"BAND_A": 1,
"INPUT_B": rasterized_polygons_path,
"BAND_B": 1,
"FORMULA": "A*B",
"NO_DATA": None,
"EXTENT_OPT": 3,
"PROJWIN": None,
"RTYPE": 0,
"OPTIONS": "",
"EXTRA": "",
"OUTPUT": opportunities_mask,
}

processing.run("gdal:rastercalculator", params)
self.output_rasters.append(opportunities_mask)

log_message(f"WEE SCORE raster saved to {opportunities_mask}")

def apply_qml_style(self, source_qml: str, qml_path: str) -> None:

log_message(f"Copying QML style from {source_qml} to {qml_path}")
Expand Down Expand Up @@ -444,7 +443,6 @@ def _subset_raster_layer(self, bbox: QgsGeometry, index: int):
)["OUTPUT"]
return reprojected_raster_path

# Default implementation of the abstract method - not used in this workflow
def _process_raster_for_area(
self,
current_area: QgsGeometry,
Expand Down Expand Up @@ -493,15 +491,3 @@ def _process_raster_for_area(

processing.run("gdal:rastercalculator", params)
return opportunities_mask_path

def _process_aggregate_for_area(
self,
current_area: QgsGeometry,
clip_area: QgsGeometry,
current_bbox: QgsGeometry,
index: int,
):
"""
Executes the workflow, reporting progress through the feedback object and checking for cancellation.
"""
pass
3 changes: 1 addition & 2 deletions geest/core/algorithms/population_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import traceback
import shutil
from typing import Optional, Tuple
from typing import Optional
import subprocess
import platform

Expand All @@ -10,7 +10,6 @@
QgsTask,
QgsProcessingContext,
QgsFeedback,
QgsCoordinateReferenceSystem,
QgsRasterLayer,
QgsRasterDataProvider,
QgsVectorLayer,
Expand Down
Loading

0 comments on commit 483ca31

Please sign in to comment.