Skip to content

Commit

Permalink
Merge pull request #497 from kartoza/aggregation
Browse files Browse the repository at this point in the history
Refactor aggreagtion workflow
  • Loading branch information
timlinux authored Oct 28, 2024
2 parents a3e03f1 + a8cc86d commit de1baa9
Show file tree
Hide file tree
Showing 20 changed files with 293 additions and 242 deletions.
9 changes: 6 additions & 3 deletions geest/core/json_tree_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ def getFactorAttributes(self):
attributes["Factor ID"] = self.data(0)
attributes["Indicators"] = [
{
"Indicator ID": i,
"Indicator No": i,
"Indicator ID": child.data(3).get("ID", ""),
"Indicator Name": child.data(0),
"Indicator Weighting": child.data(2),
"Indicator Result File": child.data(3).get(
Expand All @@ -304,7 +305,8 @@ def getDimensionAttributes(self):
attributes["Dimension ID"] = self.data(0)
attributes["Factors"] = [
{
"Factor ID": i,
"Factor No": i,
"Factor ID": child.data(3).get("ID", ""),
"Factor Name": child.data(0),
"Factor Weighting": child.data(2),
"Factor Result File": child.data(3).get(f"Factor Result File", ""),
Expand All @@ -325,7 +327,8 @@ def getAnalysisAttributes(self):

attributes["Dimensions"] = [
{
"Dimension ID": i,
"Dimension No": i,
"Dimension ID": child.data(3).get("id", ""),
"Dimension Name": child.data(0),
"Dimension Weighting": child.data(2),
"Dimension Result File": child.data(3).get(
Expand Down
11 changes: 11 additions & 0 deletions geest/core/workflows/acled_impact_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,14 @@ def _process_raster_for_area(
:return: Path to the reclassified raster.
"""
pass

def _process_aggregate_for_area(
self,
current_area: QgsGeometry,
current_bbox: QgsGeometry,
index: int,
):
"""
Executes the workflow, reporting progress through the feedback object and checking for cancellation.
"""
pass
185 changes: 51 additions & 134 deletions geest/core/workflows/aggregation_workflow_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def __init__(
super().__init__(
item, feedback, context
) # ⭐️ Item is a reference - whatever you change in this item will directly update the tree
self.attributes = item.data(3)
self.aggregation_attributes = None # This should be set by the child class e.g. item.getIndicatorAttributes()
self.analysis_mode = self.attributes.get("Analysis Mode", "")
self.id = None # This should be set by the child class
Expand All @@ -45,6 +44,8 @@ def __init__(
self.raster_path_key = (
None # This should be set by the child class e.g. "Indicator Result File"
)
self.aggregation = True
self.workflow_is_legacy = False

def get_weights(self) -> list:
"""
Expand All @@ -57,27 +58,20 @@ def get_weights(self) -> list:
weight = layer.get(self.weight_key, 1.0)
if weight == "" and len(self.layers) == 1:
weight = 1.0
# Ensure the weight is numeric, cast to float if necessary
try:
weight = float(weight)
except (ValueError, TypeError):
weight = 1.0 # Default fallback to 1.0 if weight is invalid
weights.append(weight)
return weights

def output_path(self, extension: str) -> str:
"""
Define output path for the aggregated raster based on the analysis mode.
Parameters:
extension (str): The file extension for the output file.
Returns:
str: Path to the aggregated raster file.
"""
pass

def aggregate(self, input_files: list) -> None:
def aggregate(self, input_files: list, index: int) -> str:
"""
Perform weighted raster aggregation on the found raster files.
:param input_files: List of raster file paths to aggregate.
:param index: The index of the area being processed.
:return: Path to the aggregated raster file.
"""
Expand Down Expand Up @@ -108,17 +102,21 @@ def aggregate(self, input_files: list) -> None:

# Create QgsRasterCalculatorEntries for each raster layer
entries = []
ref_names = []
for i, raster_layer in enumerate(raster_layers):
QgsMessageLog.logMessage(
f"Adding raster layer {i+1} to the raster calculator. {raster_layer.source()}",
tag="Geest",
level=Qgis.Info,
)
entry = QgsRasterCalculatorEntry()
entry.ref = f"layer_{i+1}@1" # layer_1@1, layer_2@1, etc.
ref_name = os.path.basename(raster_layer.source()).split(".")[0]
entry.ref = f"{ref_name}_{i+1}@1" # Reference the first band
# entry.ref = f"layer_{i+1}@1" # layer_1@1, layer_2@1, etc.
entry.raster = raster_layer
entry.bandNumber = 1
entries.append(entry)
ref_names.append(f"{ref_name}_{i+1}")

# Assign default weights (you can modify this as needed)
weights = self.get_weights()
Expand All @@ -131,13 +129,15 @@ def aggregate(self, input_files: list) -> None:

# Build the calculation expression for weighted average
expression = " + ".join(
[f"({weights[i]} * layer_{i+1}@1)" for i in range(layer_count)]
[f"({weights[i]} * {ref_names[i]}@1)" for i in range(layer_count)]
)

# Wrap the weighted sum and divide by the sum of weights
expression = f"({expression}) / {sum_weights}"
expression = f"({expression}) / {layer_count}"

aggregation_output = self.output_path("tif")
aggregation_output = os.path.join(
self.workflow_directory, f"{self.layer_id}_aggregated_{index}.tif"
)
QgsMessageLog.logMessage(
f"Aggregating {len(input_files)} raster layers to {aggregation_output}",
tag="Geest",
Expand Down Expand Up @@ -170,105 +170,56 @@ def aggregate(self, input_files: list) -> None:
)
return None

converter = RasterConverter()
aggregation_output_8bit = aggregation_output.replace(".tif", "_8bit.tif")
# Convert the aggregated raster to 8-bit
converter.convert_to_8bit(aggregation_output, aggregation_output_8bit)
if os.path.exists(aggregation_output_8bit):
# TODO We should check if developer mode is set and keep the 32-bit raster if it is
os.remove(aggregation_output)

QgsMessageLog.logMessage(
"Raster aggregation completed successfully.",
tag="Geest",
level=Qgis.Info,
)
# Add the aggregated raster to the map
aggregated_layer = QgsRasterLayer(
aggregation_output_8bit, f"aggregated_{self.id}.tif"
)
if not aggregated_layer.isValid():
QgsMessageLog.logMessage(
"Aggregate layer is not valid.",
tag="Geest",
level=Qgis.Critical,
)
return None
# WRite the output path to the attributes
# That will get passed back to the json model
self.attributes[self.result_file_tag] = aggregation_output_8bit

# Fallback sequence to copy QML style
# qml with same name as factor
# qml with generic name of factor.qml
qml_paths = []
qml_paths.append(
resources_path(
"resources",
"qml",
f"{self.id}.qml",
)
)
qml_paths.append(
resources_path(
"resources",
"qml",
f"{self.analysis_mode}.qml", # e.g. factor.qml
)
)
qml_dest_path = self.output_path("qml")
for qml_src_path in qml_paths:
if os.path.exists(qml_src_path):
qml_dest_path_8bit = qml_dest_path.replace(".qml", "_8bit.qml")
shutil.copy(qml_src_path, qml_dest_path_8bit)
QgsMessageLog.logMessage(
f"Copied QML style file to {qml_dest_path_8bit}",
tag="Geest",
level=Qgis.Info,
)
result = aggregated_layer.loadNamedStyle(qml_dest_path_8bit)
if result[0]: # Check if the style was successfully loaded
QgsMessageLog.logMessage(
"Successfully applied QML style.",
tag="Geest",
level=Qgis.Info,
)
break

self.context.project().addMapLayer(aggregated_layer)
QgsMessageLog.logMessage(
"Added raster layer to the map.", tag="Geest", level=Qgis.Info
)
return aggregation_output_8bit
self.attributes[self.result_file_tag] = aggregation_output

return aggregation_output

def get_raster_list(self) -> list:
def get_raster_list(self, index) -> list:
"""
Get the list of rasters from the attributes that will be aggregated.
(Factor Aggregation, Dimension Aggregation, Analysis).
Parameters:
index (int): The index of the area being processed.
Returns:
list: List of found raster file paths.
"""
raster_files = []

for layer in self.layers:
path = layer.get(self.raster_path_key, "")
raster_files.append(path)
QgsMessageLog.logMessage(
f"Adding raster: {path}", tag="Geest", level=Qgis.Info
id = layer.get("Indicator ID", "").lower()
layer_folder = os.path.dirname(layer.get("Indicator Result File", ""))
path = os.path.join(
self.workflow_directory, layer_folder, f"{id}_masked_{index}.tif"
)
if path:
raster_files.append(path)
QgsMessageLog.logMessage(
f"Adding raster: {path}", tag="Geest", level=Qgis.Info
)
QgsMessageLog.logMessage(
f"Total raster files found: {len(raster_files)}",
tag="Geest",
level=Qgis.Info,
)
return raster_files

def do_execute(self):
def _process_aggregate_for_area(
self,
current_area: QgsGeometry,
current_bbox: QgsGeometry,
index: int,
):
"""
Executes the workflow, reporting progress through the feedback object and checking for cancellation.
"""
_ = current_area # Unused in this analysis
_ = current_bbox # Unused in this analysis

# Log the execution
QgsMessageLog.logMessage(
f"Executing {self.analysis_mode} Aggregation Workflow",
Expand All @@ -282,7 +233,7 @@ def do_execute(self):
level=Qgis.Info,
)

raster_files = self.get_raster_list()
raster_files = self.get_raster_list(index)

if not raster_files or not isinstance(raster_files, list):
QgsMessageLog.logMessage(
Expand All @@ -302,49 +253,15 @@ def do_execute(self):
)

# Perform aggregation only if raster files are provided
result_file = self.aggregate(raster_files)
if result_file:
QgsMessageLog.logMessage(
"Aggregation Workflow completed successfully.",
tag="Geest",
level=Qgis.Info,
)
self.attributes[self.result_file_tag] = result_file
self.attributes["Result"] = (
f"{self.analysis_mode} Factor Aggregation Workflow Completed"
)
return True
else:
QgsMessageLog.logMessage(
"Aggregation failed due to missing or invalid raster files.",
tag="Geest",
level=Qgis.Warning,
)
self.attributes[self.result_file_tag] = None
self.attributes["Result"] = (
f"{self.analysis_mode} Aggregation Workflow Failed"
)
return False
result_file = self.aggregate(raster_files, index)

return result_file

def _process_features_for_area(self):
pass

# Default implementation of the abstract method - not used in this workflow
def _process_raster_for_area(
self,
current_area: QgsGeometry,
current_bbox: QgsGeometry,
area_raster: str,
index: int,
):
"""
Executes the actual workflow logic for a single area using a raster.
:current_area: Current polygon from our study area.
:current_bbox: Bounding box of the above area.
:area_raster: A raster layer of features to analyse that includes only bbox pixels in the study area.
:index: Index of the current area.
def _process_raster_for_area(self):
pass

:return: Path to the reclassified raster.
"""
def do_execute(self):
pass
24 changes: 0 additions & 24 deletions geest/core/workflows/analysis_aggregation_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,3 @@ def __init__(
self.weight_key = "Dimension Weighting"
self.result_file_tag = "Analysis Result File"
self.raster_path_key = "Dimension Result File"

def output_path(self, extension: str) -> str:
"""
Define output path for the aggregated raster based on the analysis mode.
Parameters:
extension (str): The file extension for the output file.
Returns:
str: Path to the aggregated raster file.
"""
directory = self.workflow_directory
# Create the directory if it doesn't exist
if not os.path.exists(directory):
os.makedirs(directory)

return os.path.join(
directory,
f"aggregate_{self.id}" + f".{extension}",
)

def _process_features_for_area(self):
pass
11 changes: 11 additions & 0 deletions geest/core/workflows/default_index_score_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,14 @@ def _process_raster_for_area(
:return: Path to the reclassified raster.
"""
pass

def _process_aggregate_for_area(
self,
current_area: QgsGeometry,
current_bbox: QgsGeometry,
index: int,
):
"""
Executes the workflow, reporting progress through the feedback object and checking for cancellation.
"""
pass
Loading

0 comments on commit de1baa9

Please sign in to comment.