diff --git a/spp_gis_report/__init__.py b/spp_gis_report/__init__.py
index ada0b87be..fc45f5ddf 100644
--- a/spp_gis_report/__init__.py
+++ b/spp_gis_report/__init__.py
@@ -1,3 +1,84 @@
+import logging
+
+from psycopg2 import sql
+
from . import controllers
from . import models
from . import wizards
+
+_logger = logging.getLogger(__name__)
+
+# Mapping: old boolean field name -> dimension technical name
+_BOOL_TO_DIMENSION = {
+ "disaggregate_by_gender": "gender",
+ "disaggregate_by_age": "age_group",
+ "disaggregate_by_disability": "disability_status",
+}
+
+
+def _migrate_boolean_disaggregation(env):
+ """Post-init hook: migrate boolean disaggregation flags to dimension_ids.
+
+ Looks up reports that had boolean flags set and links them to the
+ corresponding spp.demographic.dimension records. Logs a warning
+ (does not crash) if a dimension record is not found.
+ """
+ cr = env.cr
+
+ # Check if the old boolean columns still exist
+ cr.execute(
+ """
+ SELECT column_name FROM information_schema.columns
+ WHERE table_name = 'spp_gis_report'
+ AND column_name IN ('disaggregate_by_gender', 'disaggregate_by_age', 'disaggregate_by_disability')
+ """
+ )
+ existing_columns = {row[0] for row in cr.fetchall()}
+
+ if not existing_columns:
+ _logger.info("No legacy boolean disaggregation columns found, skipping migration")
+ return
+
+ # Get the m2m relation table name
+ Dimension = env["spp.demographic.dimension"]
+
+ for bool_field, dim_name in _BOOL_TO_DIMENSION.items():
+ if bool_field not in existing_columns:
+ continue
+
+ # Find reports with this boolean set. bool_field is a trusted column
+ # name from _BOOL_TO_DIMENSION; quote it safely via psycopg2.sql.
+ query = sql.SQL("SELECT id FROM spp_gis_report WHERE {col} = true").format(col=sql.Identifier(bool_field))
+ cr.execute(query)
+ report_ids = [row[0] for row in cr.fetchall()]
+ if not report_ids:
+ continue
+
+ dimension = Dimension.search([("name", "=", dim_name)], limit=1)
+ if not dimension:
+ _logger.warning(
+ "Dimension '%s' not found, cannot migrate %d reports with %s=True",
+ dim_name,
+ len(report_ids),
+ bool_field,
+ )
+ continue
+
+ # Insert m2m links (skip duplicates)
+ for report_id in report_ids:
+ cr.execute(
+ """
+ INSERT INTO spp_gis_report_spp_demographic_dimension_rel
+ (spp_gis_report_id, spp_demographic_dimension_id)
+ VALUES (%s, %s)
+ ON CONFLICT DO NOTHING
+ """,
+ (report_id, dimension.id),
+ )
+
+ _logger.info(
+ "Migrated %d reports from %s=True to dimension '%s'",
+ len(report_ids),
+ bool_field,
+ dim_name,
+ )
diff --git a/spp_gis_report/__manifest__.py b/spp_gis_report/__manifest__.py
index 997f8197e..bbed6b55d 100644
--- a/spp_gis_report/__manifest__.py
+++ b/spp_gis_report/__manifest__.py
@@ -1,6 +1,6 @@
{
"name": "OpenSPP GIS Reports",
- "version": "19.0.2.0.1",
+ "version": "19.0.2.1.0",
"category": "OpenSPP",
"summary": "Geographic visualization and reporting for social protection data",
"author": "OpenSPP.org, OpenSPP",
@@ -10,6 +10,7 @@
"depends": [
"spp_area",
"spp_gis",
+ "spp_metric_service",
"spp_registry",
"spp_vocabulary",
"spp_cel_domain",
@@ -47,6 +48,7 @@
"spp_gis_report/static/src/css/gis_report.css",
],
},
+ "post_init_hook": "_migrate_boolean_disaggregation",
"installable": True,
"application": False,
"auto_install": False,
diff --git a/spp_gis_report/controllers/geojson.py b/spp_gis_report/controllers/geojson.py
index 20dde19e9..4a1cc0aa9 100644
--- a/spp_gis_report/controllers/geojson.py
+++ b/spp_gis_report/controllers/geojson.py
@@ -100,23 +100,19 @@ def list_reports(self):
for report in reports:
admin_levels = report.data_ids.mapped("area_level") if report.data_ids else []
- report_list.append(
- {
- "code": report.code,
- "name": report.name,
- "category": report.category_id.name if report.category_id else None,
- "last_refresh": (report.last_refresh.isoformat() if report.last_refresh else None),
- "admin_levels_available": sorted(set(admin_levels)),
- "has_disaggregation": any(
- [
- report.disaggregate_by_gender,
- report.disaggregate_by_age,
- report.disaggregate_by_disability,
- report.disaggregate_by_tag_ids,
- ]
- ),
- }
- )
+ report_data = {
+ "code": report.code,
+ "name": report.name,
+ "category": report.category_id.name if report.category_id else None,
+ "last_refresh": (report.last_refresh.isoformat() if report.last_refresh else None),
+ "admin_levels_available": sorted(set(admin_levels)),
+ "has_disaggregation": bool(report.dimension_ids) or bool(report.disaggregate_by_tag_ids),
+ }
+ if report.dimension_ids:
+ report_data["disaggregation_dimensions"] = [
+ {"name": d.name, "label": d.label} for d in report.dimension_ids
+ ]
+ report_list.append(report_data)
return self._json_response({"reports": report_list})
diff --git a/spp_gis_report/models/area_ext.py b/spp_gis_report/models/area_ext.py
index c6b720fda..43ae22045 100644
--- a/spp_gis_report/models/area_ext.py
+++ b/spp_gis_report/models/area_ext.py
@@ -153,7 +153,15 @@ def get_gis_layers(self, view_id=None, view_type="gis", **options):
# Build legend info from thresholds for display purposes only.
thresholds = report.threshold_ids.sorted("sequence")
if thresholds:
- layer_dict["report_legend"] = [{"color": t.color, "label": t.label} for t in thresholds]
+ layer_dict["report_legend"] = [
+ {
+ "color": t.color,
+ "label": t.label,
+ "min_value": t.min_value,
+ "max_value": t.max_value,
+ }
+ for t in thresholds
+ ]
layer_dict["report_legend_title"] = report.name
# Build pre-computed features for report layer
diff --git a/spp_gis_report/models/gis_report.py b/spp_gis_report/models/gis_report.py
index c2d070871..43aff4996 100644
--- a/spp_gis_report/models/gis_report.py
+++ b/spp_gis_report/models/gis_report.py
@@ -3,7 +3,9 @@
from ast import literal_eval
from odoo import _, api, fields, models
+from odoo.exceptions import ValidationError
from odoo.osv import expression
+from odoo.tools.sql import SQL
_logger = logging.getLogger(__name__)
@@ -294,18 +296,23 @@ def _get_source_model_domain(self):
)
# ===== Disaggregation Configuration =====
- disaggregate_by_gender = fields.Boolean(
- "Disaggregate by Gender",
- help="Show gender breakdown in popup",
+ dimension_ids = fields.Many2many(
+ "spp.demographic.dimension",
+ string="Disaggregation Dimensions",
+ help="Demographic dimensions to compute disaggregation for (e.g., gender, age group)",
)
- disaggregate_by_age = fields.Boolean(
- "Disaggregate by Age Group",
- help="Show age group breakdown in popup",
- )
- disaggregate_by_disability = fields.Boolean(
- "Disaggregate by Disability",
- help="Show disability status breakdown",
+ member_expansion = fields.Selection(
+ [("none", "No Expansion"), ("expand", "Expand to Members")],
+ "Member Expansion",
+ default="none",
+ help=(
+ "For group-filtered reports, expand to individual members "
+ "for demographic breakdown (e.g., gender, age). "
+ "Use 'Expand to Members' when dimensions should be evaluated "
+ "on individuals inside groups, not on the groups themselves."
+ ),
)
+
disaggregate_by_tag_ids = fields.Many2many(
"spp.vocabulary",
string="Disaggregate by Tags",
@@ -435,46 +442,50 @@ def _compute_is_stale(self):
else:
report.is_stale = False
- def _compute_base_aggregation(self):
- """Compute aggregated values at the base area level.
+ @api.constrains("member_expansion", "source_model")
+ def _check_member_expansion(self):
+ """Validate member_expansion is only used with res.partner source model."""
+ for report in self:
+ if report.member_expansion == "expand" and report.source_model != "res.partner":
+ raise ValidationError(
+ _("Member expansion is only supported for reports with 'Contact' (res.partner) source model.")
+ )
- Uses Odoo's read_group for efficient aggregation. For very large
- datasets (1M+ records), consider using PostGIS-optimized SQL queries.
+ def _prepare_area_context(self):
+ """Build shared area context for aggregation and disaggregation.
+
+ Computes the filter domain, area field, base areas, and the
+ child_to_base mapping that maps descendant areas to their
+ base-level ancestor. Both _compute_base_aggregation and
+ _compute_disaggregation use this context.
Returns:
- dict: {area_id: {'raw': float, 'count': int, 'weight': float}}
+ dict: Area context with keys:
+ - domain: list, the filter domain including area filter
+ - child_to_base: dict, mapping area_id -> base_area_id
+ - base_areas: recordset, areas at base_area_level
+ - area_field: str, first part of area_field_path
+ Returns None if no source model or no base areas exist.
"""
self.ensure_one()
- _logger.info("Computing base aggregation for report ID %s", self.id)
- # Get the source model
if not self.source_model:
_logger.warning("No source model configured for report ID %s", self.id)
- return {}
-
- Model = self.env[self.source_model]
+ return None
- # Build the filter domain
domain = self._build_filter_domain()
-
- # Get area field (first part of path)
area_field = self.area_field_path.split(".")[0]
- # Get areas at the base level
base_areas = self.env["spp.area"].search([("area_level", "=", self.base_area_level)])
-
if not base_areas:
_logger.info("No areas at level %s found", self.base_area_level)
- return {}
+ return None
# Build mapping from descendant areas to their base-level ancestor.
# Registrants may be assigned to areas more granular than base_area_level
# (e.g., barangays when base level is municipality). We include all
# descendant areas in the query and aggregate results back to the
# base-level parent.
- #
- # Fetch all descendants of all base areas in a single query, then build
- # the child_to_base mapping in memory to avoid an N+1 query pattern.
child_to_base = {base_area.id: base_area.id for base_area in base_areas}
all_descendants = self.env["spp.area"].search(
[
@@ -482,7 +493,6 @@ def _compute_base_aggregation(self):
("id", "not in", base_areas.ids),
]
)
- # For each descendant, walk up its parent chain to find the base-level ancestor
for desc in all_descendants:
ancestor = desc.parent_id
while ancestor and ancestor.id not in child_to_base:
@@ -490,9 +500,44 @@ def _compute_base_aggregation(self):
if ancestor:
child_to_base[desc.id] = child_to_base[ancestor.id]
- # Add area filter to domain (base areas + all descendants)
+ # Add area filter to domain
domain.append((area_field, "in", list(child_to_base.keys())))
+ return {
+ "domain": domain,
+ "child_to_base": child_to_base,
+ "base_areas": base_areas,
+ "area_field": area_field,
+ }
+
+ def _compute_base_aggregation(self, area_context=None):
+ """Compute aggregated values at the base area level.
+
+ Uses Odoo's read_group for efficient aggregation. For very large
+ datasets (1M+ records), consider using PostGIS-optimized SQL queries.
+
+ Args:
+ area_context: Optional pre-computed area context from _prepare_area_context().
+ If not provided, it will be computed internally.
+
+ Returns:
+ dict: {area_id: {'raw': float, 'count': int, 'weight': float}}
+ """
+ self.ensure_one()
+ _logger.info("Computing base aggregation for report ID %s", self.id)
+
+ if area_context is None:
+ area_context = self._prepare_area_context()
+
+ if area_context is None:
+ return {}
+
+ Model = self.env[self.source_model]
+ domain = list(area_context["domain"])
+ area_field = area_context["area_field"]
+ base_areas = area_context["base_areas"]
+ child_to_base = area_context["child_to_base"]
+
# Initialize results for all base areas with 0
# This ensures areas with no matching records get 0 instead of being missing
results = {area.id: {"raw": 0, "count": 0, "weight": 0} for area in base_areas}
@@ -572,6 +617,336 @@ def _compute_base_aggregation(self):
return results
+ def _compute_disaggregation(self, area_context):
+ """Compute disaggregation counts per area for configured dimensions.
+
+ Uses SQL for performance when possible, with Python fallback for
+ dimensions that cannot compile to SQL. When member_expansion is
+ "expand", drills into group members to evaluate individual-level
+ demographics (gender, age) with area inheritance from the group.
+
+ Args:
+ area_context: Area context from _prepare_area_context()
+
+ Returns:
+ dict: {area_id: {dim_name: {raw_value: count}}}
+ Empty dict if no dimensions or non-partner source model.
+ """
+ self.ensure_one()
+
+ if not self.dimension_ids:
+ return {}
+
+ if self.source_model != "res.partner":
+ return {}
+
+ if area_context is None:
+ return {}
+
+ child_to_base = area_context["child_to_base"]
+ area_field = area_context["area_field"]
+
+ # Compile dimensions to SQL, separating compilable from fallback
+ sql_dims = [] # [(dimension, SQLColumnResult)]
+ fallback_dims = [] # [dimension]
+ alias_counter = 0
+ partner_alias = "ind" if self.member_expansion == "expand" else "reg"
+
+ for dimension in self.dimension_ids:
+ result = dimension.to_sql_column(partner_alias, alias_counter)
+ if result is not None:
+ sql_dims.append((dimension, result))
+ alias_counter = result.alias_counter
+ else:
+ fallback_dims.append(dimension)
+
+ results = {}
+
+ if sql_dims:
+ results = self._disaggregation_sql(area_context, sql_dims, child_to_base, area_field, partner_alias)
+
+ if fallback_dims:
+ fallback_results = self._disaggregation_python(area_context, fallback_dims, child_to_base, area_field)
+ # Merge fallback results into main results
+ for area_id, dim_data in fallback_results.items():
+ results.setdefault(area_id, {}).update(dim_data)
+
+ return results
+
+ def _disaggregation_sql(self, area_context, sql_dims, child_to_base, area_field, partner_alias):
+ """Execute SQL-based disaggregation query.
+
+ Builds a single SQL query that:
+ 1. Maps descendant areas to base-level areas via CTE
+ 2. Optionally expands group membership to individuals (DISTINCT)
+ 3. Computes all dimension values as SQL expressions
+ 4. Unpivots dimensions via CROSS JOIN LATERAL VALUES
+ 5. Groups by area + dimension + value
+ """
+ domain = list(area_context["domain"])
+
+ # Get matching registrant IDs (respects record rules via search())
+ Partner = self.env["res.partner"]
+ registrant_ids = Partner.search(domain, order="id").ids
+ if not registrant_ids:
+ return {}
+
+ # Build area_mapping CTE from child_to_base dict
+ area_mapping_values = SQL(", ").join(
+ SQL("(%s, %s)", child_id, base_id) for child_id, base_id in child_to_base.items()
+ )
+ area_mapping_cte = SQL(
+ "area_mapping(child_id, base_id) AS (VALUES %s)",
+ area_mapping_values,
+ )
+
+ # Collect all JOINs from dimension compilations
+ all_joins = []
+ for _dim, col_result in sql_dims:
+ all_joins.extend(col_result.joins)
+
+ # Build the dimension SELECT columns and VALUES entries
+ dim_select_parts = []
+ dim_values_entries = []
+ for dim, col_result in sql_dims:
+ col_alias = f"dim_{dim.name}"
+ dim_select_parts.append(SQL("%s AS %s", col_result.expression, SQL.identifier(col_alias)))
+ dim_values_entries.append(SQL("(%s, %s)", dim.name, SQL.identifier(col_alias)))
+
+ dim_select_sql = SQL(", ").join(dim_select_parts)
+ dim_values_sql = SQL(", ").join(dim_values_entries)
+
+ if self.member_expansion == "expand":
+ query = self._build_expand_query(
+ registrant_ids, area_mapping_cte, area_field, all_joins, dim_select_sql, dim_values_sql
+ )
+ else:
+ query = self._build_direct_query(
+ registrant_ids,
+ area_mapping_cte,
+ area_field,
+ all_joins,
+ dim_select_sql,
+ dim_values_sql,
+ partner_alias,
+ )
+
+ self.env.cr.execute(query)
+ rows = self.env.cr.fetchall()
+
+ # Parse results: (base_area_id, dim_name, dim_value, count)
+ results = {}
+ for base_area_id, dim_name, dim_value, cnt in rows:
+ area_disagg = results.setdefault(base_area_id, {})
+ dim_counts = area_disagg.setdefault(dim_name, {})
+ dim_counts[dim_value] = cnt
+
+ return results
+
+ def _build_expand_query(
+ self, registrant_ids, area_mapping_cte, area_field, all_joins, dim_select_sql, dim_values_sql
+ ):
+ """Build SQL query with member expansion (groups -> individuals)."""
+ area_field_col = SQL.identifier(area_field)
+
+ # Build JOINs string
+ joins_sql = SQL(" ").join(all_joins) if all_joins else SQL("")
+
+ return SQL(
+ """
+ WITH %s,
+ expanded AS (
+ SELECT DISTINCT ON (ind.id)
+ am.base_id AS resolved_area_id,
+ ind.id AS individual_id,
+ %s
+ FROM res_partner grp
+ JOIN spp_group_membership gm ON gm.%s = grp.id AND NOT gm.%s
+ JOIN res_partner ind ON ind.id = gm.%s
+ %s
+ JOIN area_mapping am ON am.child_id = COALESCE(ind.%s, grp.%s)
+ WHERE grp.id IN %s
+ ORDER BY ind.id, grp.id
+ )
+ SELECT resolved_area_id, dim_name, dim_value, COUNT(*) AS cnt
+ FROM expanded
+ CROSS JOIN LATERAL (VALUES %s) AS dims(dim_name, dim_value)
+ GROUP BY resolved_area_id, dim_name, dim_value
+ """,
+ area_mapping_cte,
+ dim_select_sql,
+ SQL.identifier("group"),
+ SQL.identifier("is_ended"),
+ SQL.identifier("individual"),
+ joins_sql,
+ area_field_col,
+ area_field_col,
+ tuple(registrant_ids),
+ dim_values_sql,
+ )
+
+ def _build_direct_query(
+ self, registrant_ids, area_mapping_cte, area_field, all_joins, dim_select_sql, dim_values_sql, partner_alias
+ ):
+ """Build SQL query for direct registrant disaggregation (no expansion)."""
+ area_field_col = SQL.identifier(area_field)
+ alias_id = SQL.identifier(partner_alias)
+
+ # Build JOINs string
+ joins_sql = SQL(" ").join(all_joins) if all_joins else SQL("")
+
+ return SQL(
+ """
+ WITH %s,
+ registrants AS (
+ SELECT
+ am.base_id AS resolved_area_id,
+ %s.id AS registrant_id,
+ %s
+ FROM res_partner %s
+ %s
+ JOIN area_mapping am ON am.child_id = %s.%s
+ WHERE %s.id IN %s
+ )
+ SELECT resolved_area_id, dim_name, dim_value, COUNT(*) AS cnt
+ FROM registrants
+ CROSS JOIN LATERAL (VALUES %s) AS dims(dim_name, dim_value)
+ GROUP BY resolved_area_id, dim_name, dim_value
+ """,
+ area_mapping_cte,
+ alias_id,
+ dim_select_sql,
+ alias_id,
+ joins_sql,
+ alias_id,
+ area_field_col,
+ alias_id,
+ tuple(registrant_ids),
+ dim_values_sql,
+ )
+
+ def _disaggregation_python(self, area_context, fallback_dims, child_to_base, area_field):
+ """Python fallback for dimensions that couldn't compile to SQL.
+
+ Uses the existing ORM-based evaluation path via the dimension cache.
+ When member_expansion is "expand", fetches individual members first.
+ """
+ domain = list(area_context["domain"])
+ base_areas = area_context["base_areas"]
+
+ Partner = self.env["res.partner"]
+ registrants = Partner.search_read(domain, [area_field], order="id")
+ if not registrants:
+ return {}
+
+ if self.member_expansion == "expand":
+ return self._disaggregation_python_expanded(registrants, fallback_dims, child_to_base, area_field)
+
+ # Direct mode: evaluate on the matching registrants
+ area_registrant_ids = {}
+ all_registrant_ids = []
+ for reg in registrants:
+ area_val = reg[area_field]
+ if not area_val:
+ continue
+ area_id = area_val[0] if isinstance(area_val, (list, tuple)) else area_val
+ base_id = child_to_base.get(area_id)
+ if base_id is None:
+ continue
+ area_registrant_ids.setdefault(base_id, []).append(reg["id"])
+ all_registrant_ids.append(reg["id"])
+
+ if not all_registrant_ids:
+ return {}
+
+ cache_service = self.env["spp.metric.dimension.cache"]
+ dim_evaluations = {}
+ for dimension in fallback_dims:
+ dim_evaluations[dimension.name] = cache_service.evaluate_dimension_batch(dimension, all_registrant_ids)
+
+ results = {}
+ for area_id in base_areas.ids:
+ reg_ids = area_registrant_ids.get(area_id, [])
+ if not reg_ids:
+ continue
+ area_disagg = {}
+ for dimension in fallback_dims:
+ evals = dim_evaluations[dimension.name]
+ value_counts = {}
+ for rid in reg_ids:
+ value = evals.get(rid, dimension.default_value or "unknown")
+ value_counts[value] = value_counts.get(value, 0) + 1
+ area_disagg[dimension.name] = value_counts
+ results[area_id] = area_disagg
+
+ return results
+
+ def _disaggregation_python_expanded(self, group_registrants, fallback_dims, child_to_base, area_field):
+ """Python fallback with member expansion for group-filtered reports."""
+ group_ids = [r["id"] for r in group_registrants]
+ if not group_ids:
+ return {}
+
+ # Build group area mapping
+ group_area = {}
+ for reg in group_registrants:
+ area_val = reg[area_field]
+ if area_val:
+ area_id = area_val[0] if isinstance(area_val, (list, tuple)) else area_val
+ group_area[reg["id"]] = area_id
+
+ # Find individual members of these groups
+ Membership = self.env["spp.group.membership"]
+ memberships = Membership.search_read(
+ [("group", "in", group_ids), ("is_ended", "=", False)],
+ ["individual", "group"],
+ )
+
+ if not memberships:
+ return {}
+
+ # Build individual -> base_area mapping, inheriting from group
+ seen_individuals = set()
+ individual_area = {}
+ all_individual_ids = []
+
+ for mem in memberships:
+ ind_id = mem["individual"][0] if isinstance(mem["individual"], (list, tuple)) else mem["individual"]
+ if ind_id in seen_individuals:
+ continue
+ seen_individuals.add(ind_id)
+ all_individual_ids.append(ind_id)
+
+ grp_id = mem["group"][0] if isinstance(mem["group"], (list, tuple)) else mem["group"]
+ # Individual's area or inherited from group
+ grp_area_id = group_area.get(grp_id)
+ if grp_area_id:
+ base_id = child_to_base.get(grp_area_id)
+ if base_id:
+ individual_area[ind_id] = base_id
+
+ if not all_individual_ids:
+ return {}
+
+ cache_service = self.env["spp.metric.dimension.cache"]
+ dim_evaluations = {}
+ for dimension in fallback_dims:
+ dim_evaluations[dimension.name] = cache_service.evaluate_dimension_batch(dimension, all_individual_ids)
+
+ results = {}
+ for ind_id in all_individual_ids:
+ base_id = individual_area.get(ind_id)
+ if base_id is None:
+ continue
+ area_disagg = results.setdefault(base_id, {})
+ for dimension in fallback_dims:
+ evals = dim_evaluations[dimension.name]
+ value = evals.get(ind_id, dimension.default_value or "unknown")
+ dim_counts = area_disagg.setdefault(dimension.name, {})
+ dim_counts[value] = dim_counts.get(value, 0) + 1
+
+ return results
+
def _build_filter_domain(self):
"""Build the complete filter domain including context filters.
@@ -1190,8 +1565,11 @@ def _refresh_data(self):
self.ensure_one()
_logger.info("Starting full refresh for report ID %s", self.id)
+ # Step 0: Build shared area context
+ area_context = self._prepare_area_context()
+
# Step 1: Compute base aggregation
- base_results = self._compute_base_aggregation()
+ base_results = self._compute_base_aggregation(area_context)
if not base_results:
_logger.info("No data found for report ID %s", self.id)
@@ -1265,6 +1643,9 @@ def _refresh_data(self):
}
)
+ # Step 7b: Compute disaggregation (if dimensions configured)
+ disaggregation_by_area = self._compute_disaggregation(area_context)
+
# Step 8: Store results in spp.gis.report.data
now = fields.Datetime.now()
@@ -1291,6 +1672,7 @@ def _refresh_data(self):
"bucket_color": data.get("bucket_color", "#CCCCCC"),
"bucket_label": data.get("bucket_label", "No Data"),
"computed_at": now,
+ "disaggregation": disaggregation_by_area.get(area_id) or False,
}
if area_id in existing_data:
@@ -1473,6 +1855,10 @@ def _to_geojson(
# Get filtered data records
data_records = self.env["spp.gis.report.data"].search(domain)
+ # Build threshold lookup for enriching bucket info with ranges
+ sorted_thresholds = self.threshold_ids.sorted("sequence")
+ threshold_ranges = {i: (t.min_value, t.max_value) for i, t in enumerate(sorted_thresholds)}
+
# Build features
features = []
for data in data_records:
@@ -1496,6 +1882,8 @@ def _to_geojson(
"index": data.bucket_index, # null if no data
"color": data.bucket_color,
"label": data.bucket_label,
+ "min_value": threshold_ranges.get(data.bucket_index, (None, None))[0],
+ "max_value": threshold_ranges.get(data.bucket_index, (None, None))[1],
},
"weight": data.weight,
"record_count": data.record_count,
@@ -1511,9 +1899,11 @@ def _to_geojson(
"household_count": data.area_id.household_count,
}
- # Add disaggregation if requested
+ # Add disaggregation as flat disagg_* properties for QGIS compatibility
if include_disaggregation and data.disaggregation:
- properties["disaggregation"] = data.disaggregation
+ for dim_name, value_counts in data.disaggregation.items():
+ for raw_value, count in value_counts.items():
+ properties[f"disagg_{dim_name}_{raw_value}"] = count
# Build feature
feature = {
@@ -1588,6 +1978,28 @@ def _to_geojson(
)
metadata["thresholds"] = thresholds
+ # Add disaggregation dimension metadata (for interpreting disagg_* properties)
+ if include_disaggregation and self.dimension_ids:
+ disagg_metadata = []
+ for dim in self.dimension_ids:
+ dim_info = {
+ "name": dim.name,
+ "label": dim.label,
+ "property_prefix": f"disagg_{dim.name}_",
+ }
+ if dim.value_labels_json:
+ labels = dim.value_labels_json
+ if isinstance(labels, str):
+ import json
+
+ try:
+ labels = json.loads(labels)
+ except (json.JSONDecodeError, TypeError):
+ labels = {}
+ dim_info["value_labels"] = labels
+ disagg_metadata.append(dim_info)
+ metadata["disaggregation"] = disagg_metadata
+
# Add summary statistics if we have data
if data_records:
summary = self._get_summary(
diff --git a/spp_gis_report/readme/HISTORY.md b/spp_gis_report/readme/HISTORY.md
index d519fd50e..369cb3265 100644
--- a/spp_gis_report/readme/HISTORY.md
+++ b/spp_gis_report/readme/HISTORY.md
@@ -1,3 +1,7 @@
+### 19.0.2.1.0
+
+- feat: metric disaggregation in GIS reports (breakdown dimensions, report helpers, wizard support) (re-land from #76; uses the spp_metric_service breakdown API).
+
### 19.0.2.0.1
- fix(security): grant `group_gis_report_user` to spp_user_roles' Global Program Manager role so the OP#951 menu audit expectation (Program Manager sees GIS Reports) is preserved once the GIS Reports menu root is gated.
diff --git a/spp_gis_report/tests/__init__.py b/spp_gis_report/tests/__init__.py
index 1ad03b1a5..a5915facc 100644
--- a/spp_gis_report/tests/__init__.py
+++ b/spp_gis_report/tests/__init__.py
@@ -6,5 +6,7 @@
from . import test_gis_report
from . import test_gis_report_api
from . import test_gis_report_computation
+from . import test_gis_report_helpers
+from . import test_disaggregation
from . import test_gis_report_data
from . import test_gis_report_wizard
diff --git a/spp_gis_report/tests/test_area_ext.py b/spp_gis_report/tests/test_area_ext.py
index 65f72ff5f..fc1117268 100644
--- a/spp_gis_report/tests/test_area_ext.py
+++ b/spp_gis_report/tests/test_area_ext.py
@@ -817,8 +817,12 @@ def test_04_report_legend_from_thresholds(self):
self.assertEqual(len(report_layer["report_legend"]), 2)
self.assertEqual(report_layer["report_legend"][0]["color"], "#00ff00")
self.assertEqual(report_layer["report_legend"][0]["label"], "Low")
+ self.assertEqual(report_layer["report_legend"][0]["min_value"], 0)
+ self.assertEqual(report_layer["report_legend"][0]["max_value"], 50)
self.assertEqual(report_layer["report_legend"][1]["color"], "#ff0000")
self.assertEqual(report_layer["report_legend"][1]["label"], "High")
+ self.assertEqual(report_layer["report_legend"][1]["min_value"], 50)
+ self.assertEqual(report_layer["report_legend"][1]["max_value"], 100)
self.assertEqual(report_layer["report_legend_title"], "Report With Legend")
def test_05_shapely_geometry_converted_to_geojson(self):
diff --git a/spp_gis_report/tests/test_disaggregation.py b/spp_gis_report/tests/test_disaggregation.py
new file mode 100644
index 000000000..1d6ed27f3
--- /dev/null
+++ b/spp_gis_report/tests/test_disaggregation.py
@@ -0,0 +1,452 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+
+import logging
+
+from odoo.fields import Command
+from odoo.tests import tagged
+
+from .common import GISReportTestBase
+
+_logger = logging.getLogger(__name__)
+
+
+@tagged("post_install", "-at_install")
+class TestDisaggregation(GISReportTestBase):
+ """Test disaggregation computation via demographic dimensions."""
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ # Create gender dimension (field-based)
+ cls.gender_dimension = cls.env["spp.demographic.dimension"].search([("name", "=", "gender")], limit=1)
+ if not cls.gender_dimension:
+ cls.gender_dimension = cls.env["spp.demographic.dimension"].create(
+ {
+ "name": "gender",
+ "label": "Gender",
+ "dimension_type": "field",
+ "field_path": "gender_id.code",
+ "value_labels_json": {"1": "Male", "2": "Female", "0": "Not Known"},
+ "default_value": "unknown",
+ }
+ )
+
+ # Create age_group dimension (field-based for testing, uses a simple field)
+ cls.age_dimension = cls.env["spp.demographic.dimension"].search([("name", "=", "age_group")], limit=1)
+ if not cls.age_dimension:
+ cls.age_dimension = cls.env["spp.demographic.dimension"].create(
+ {
+ "name": "age_group",
+ "label": "Age Group",
+ "dimension_type": "field",
+ "field_path": "is_group",
+ "value_labels_json": {"true": "Group", "false": "Individual"},
+ "default_value": "unknown",
+ }
+ )
+
+ # Create individuals-only dimension
+ cls.individual_dimension = cls.env["spp.demographic.dimension"].create(
+ {
+ "name": "test_individual_dim",
+ "label": "Test Individual Dim",
+ "dimension_type": "field",
+ "field_path": "is_registrant",
+ "applies_to": "individuals",
+ "default_value": "n/a",
+ }
+ )
+
+ # =========================================================================
+ # Phase B: _compute_disaggregation() tests
+ # =========================================================================
+
+ def test_disaggregation_no_dimensions_returns_empty(self):
+ """Test _compute_disaggregation returns empty dict when no dimensions configured."""
+ report = self.create_test_report(name="No Dims Report")
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+ self.assertEqual(result, {})
+
+ def test_disaggregation_non_partner_model_returns_empty(self):
+ """Test _compute_disaggregation returns empty dict for non-partner source model."""
+ # Find a non-partner model
+ area_model = self.env["ir.model"].search([("model", "=", "spp.area")], limit=1)
+ if not area_model:
+ return # Skip if spp.area model not found
+
+ report = self.env["spp.gis.report"].create(
+ {
+ "name": "Non-Partner Report",
+ "code": "non_partner_test",
+ "category_id": self.category.id,
+ "source_model_id": area_model.id,
+ "area_field_path": "parent_id",
+ "aggregation_method": "count",
+ "normalization_method": "raw",
+ "base_area_level": 2,
+ "dimension_ids": [Command.set([self.gender_dimension.id])],
+ }
+ )
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+ self.assertEqual(result, {})
+
+ def test_disaggregation_with_gender_dimension(self):
+ """Test _compute_disaggregation with gender dimension returns per-area counts."""
+ report = self.create_test_report(
+ name="Gender Disagg Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # We have 3 registrants total:
+ # - individual_1 in district_1 (no gender set, should get default)
+ # - individual_2 in district_2
+ # - group in district_1
+ # All lack gender_id, so all should fall to default_value="unknown"
+
+ # We should have results for areas that have registrants
+ self.assertIsInstance(result, dict)
+ # Each area result should have a "gender" key
+ for _area_id, disagg in result.items():
+ self.assertIn("gender", disagg)
+
+ def test_disaggregation_with_multiple_dimensions(self):
+ """Test _compute_disaggregation with multiple dimensions."""
+ report = self.create_test_report(
+ name="Multi Dim Test",
+ dimension_ids=[Command.set([self.gender_dimension.id, self.age_dimension.id])],
+ )
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # Each area result should have both dimension keys
+ for _area_id, disagg in result.items():
+ self.assertIn("gender", disagg)
+ self.assertIn("age_group", disagg)
+
+ def test_disaggregation_applies_to_filtering(self):
+ """Test _compute_disaggregation respects applies_to on dimensions."""
+ report = self.create_test_report(
+ name="Applies To Test",
+ dimension_ids=[Command.set([self.individual_dimension.id])],
+ )
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # The dimension only applies to individuals. Groups should get "n/a".
+ self.assertIsInstance(result, dict)
+ # District 1 has both an individual and a group
+ if self.area_district_1.id in result:
+ dim_data = result[self.area_district_1.id]["test_individual_dim"]
+ # The group registrant should have "n/a" value
+ self.assertIn("n/a", dim_data)
+
+ def test_disaggregation_none_area_context_returns_empty(self):
+ """Test _compute_disaggregation returns empty dict when area_context is None."""
+ report = self.create_test_report(
+ name="None Context Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ result = report._compute_disaggregation(None)
+ self.assertEqual(result, {})
+
+ # =========================================================================
+ # Phase B: _refresh_data() stores disaggregation
+ # =========================================================================
+
+ def test_refresh_data_stores_disaggregation(self):
+ """Test _refresh_data populates disaggregation field on data records."""
+ report = self.create_test_report(
+ name="Refresh Disagg Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ report._refresh_data()
+
+ # Check that data records have disaggregation populated
+ data_with_disagg = report.data_ids.filtered(lambda d: d.disaggregation and d.area_level == 2)
+ # We should have base-level data records with disaggregation
+ # (only for areas that have registrants)
+ for data in data_with_disagg:
+ self.assertIn("gender", data.disaggregation)
+
+ def test_refresh_data_no_dimensions_no_disaggregation(self):
+ """Test _refresh_data leaves disaggregation empty when no dimensions configured."""
+ report = self.create_test_report(name="No Dims Refresh Test")
+ report._refresh_data()
+
+ for data in report.data_ids:
+ self.assertFalse(data.disaggregation)
+
+ # =========================================================================
+ # Phase C: GeoJSON output with flat disagg_* properties
+ # =========================================================================
+
+ def test_geojson_flat_disagg_properties(self):
+ """Test GeoJSON output includes flat disagg_* properties when requested."""
+ report = self.create_test_report(
+ name="GeoJSON Disagg Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ # Create data with known disaggregation
+ self.create_test_data(
+ report,
+ self.area_district_1,
+ raw_value=100,
+ disaggregation={"gender": {"1": 60, "2": 40}},
+ )
+
+ geojson = report._to_geojson(
+ include_disaggregation=True,
+ include_geometry=False,
+ )
+
+ features = geojson["features"]
+ self.assertTrue(len(features) > 0)
+
+ # Find the district_1 feature
+ feature = next(f for f in features if f["id"] == self.area_district_1.code)
+ props = feature["properties"]
+
+ # Should have flat disagg_* properties
+ self.assertEqual(props["disagg_gender_1"], 60)
+ self.assertEqual(props["disagg_gender_2"], 40)
+
+ def test_geojson_no_disagg_without_flag(self):
+ """Test GeoJSON without include_disaggregation has no disagg_* properties."""
+ report = self.create_test_report(
+ name="GeoJSON No Disagg Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ self.create_test_data(
+ report,
+ self.area_district_1,
+ raw_value=100,
+ disaggregation={"gender": {"1": 60, "2": 40}},
+ )
+
+ geojson = report._to_geojson(
+ include_disaggregation=False,
+ include_geometry=False,
+ )
+
+ feature = geojson["features"][0]
+ props = feature["properties"]
+
+ # Should NOT have disagg_* properties
+ disagg_keys = [k for k in props if k.startswith("disagg_")]
+ self.assertEqual(len(disagg_keys), 0)
+
+ def test_geojson_disagg_metadata(self):
+ """Test GeoJSON metadata includes disaggregation dimension info with labels."""
+ report = self.create_test_report(
+ name="GeoJSON Metadata Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ self.create_test_data(
+ report,
+ self.area_district_1,
+ raw_value=100,
+ disaggregation={"gender": {"1": 60, "2": 40}},
+ )
+
+ geojson = report._to_geojson(
+ include_disaggregation=True,
+ include_geometry=False,
+ )
+
+ metadata = geojson["metadata"]
+ self.assertIn("disaggregation", metadata)
+ self.assertEqual(len(metadata["disaggregation"]), 1)
+
+ dim_meta = metadata["disaggregation"][0]
+ self.assertEqual(dim_meta["name"], "gender")
+ self.assertEqual(dim_meta["label"], "Gender")
+ self.assertEqual(dim_meta["property_prefix"], "disagg_gender_")
+ self.assertIn("value_labels", dim_meta)
+ self.assertEqual(dim_meta["value_labels"]["1"], "Male")
+ self.assertEqual(dim_meta["value_labels"]["2"], "Female")
+
+ def test_geojson_no_disagg_metadata_without_flag(self):
+ """Test GeoJSON metadata excludes disaggregation when not requested."""
+ report = self.create_test_report(
+ name="GeoJSON No Meta Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ )
+ self.create_test_data(report, self.area_district_1, raw_value=100)
+
+ geojson = report._to_geojson(
+ include_disaggregation=False,
+ include_geometry=False,
+ )
+
+ metadata = geojson["metadata"]
+ self.assertNotIn("disaggregation", metadata)
+
+ # =========================================================================
+ # Member Expansion Tests
+ # =========================================================================
+
+ def test_member_expansion_with_gender(self):
+ """Test member_expansion='expand' drills into group members."""
+ # Create individual members of the group with gender
+ gender_male = self.env["spp.vocabulary.code"].search([("code", "=", "1")], limit=1)
+ gender_female = self.env["spp.vocabulary.code"].search([("code", "=", "2")], limit=1)
+
+ # Only run if gender codes exist
+ if not gender_male or not gender_female:
+ self.skipTest("Gender vocabulary codes not available")
+
+ member1 = self.env["res.partner"].create(
+ {
+ "name": "Member Male",
+ "is_registrant": True,
+ "is_group": False,
+ }
+ )
+ member1.gender_id = gender_male.id
+
+ member2 = self.env["res.partner"].create(
+ {
+ "name": "Member Female",
+ "is_registrant": True,
+ "is_group": False,
+ }
+ )
+ member2.gender_id = gender_female.id
+
+ # Add members to group
+ self.env["spp.group.membership"].create({"group": self.registrant_group.id, "individual": member1.id})
+ self.env["spp.group.membership"].create({"group": self.registrant_group.id, "individual": member2.id})
+
+ # Create report with member expansion filtering groups
+ report = self.create_test_report(
+ name="Expand Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ member_expansion="expand",
+ filter_domain="[('is_registrant', '=', True), ('is_group', '=', True)]",
+ filter_mode="domain",
+ )
+
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # Should have results for district_1 (the group's area)
+ self.assertIn(self.area_district_1.id, result)
+ gender_data = result[self.area_district_1.id]["gender"]
+ # Should have counted individuals, not the group
+ total = sum(gender_data.values())
+ self.assertEqual(total, 2) # 2 members
+
+ def test_member_expansion_area_inheritance(self):
+ """Test individuals inherit area from their group when they lack one."""
+ member = self.env["res.partner"].create(
+ {
+ "name": "No Area Member",
+ "is_registrant": True,
+ "is_group": False,
+ # No area_id set
+ }
+ )
+
+ self.env["spp.group.membership"].create({"group": self.registrant_group.id, "individual": member.id})
+
+ report = self.create_test_report(
+ name="Area Inherit Test",
+ dimension_ids=[Command.set([self.age_dimension.id])],
+ member_expansion="expand",
+ filter_domain="[('is_registrant', '=', True), ('is_group', '=', True)]",
+ filter_mode="domain",
+ )
+
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # The member should appear under district_1 (inherited from group)
+ self.assertIn(self.area_district_1.id, result)
+
+ def test_member_expansion_distinct_dedup(self):
+ """Test individuals in multiple groups are counted once (DISTINCT)."""
+ # Create a second group in district_1
+ group2 = self.env["res.partner"].create(
+ {
+ "name": "Test Group 2",
+ "is_registrant": True,
+ "is_group": True,
+ "area_id": self.area_district_1.id,
+ }
+ )
+
+ shared_member = self.env["res.partner"].create(
+ {
+ "name": "Shared Member",
+ "is_registrant": True,
+ "is_group": False,
+ }
+ )
+
+ # Add to both groups
+ self.env["spp.group.membership"].create({"group": self.registrant_group.id, "individual": shared_member.id})
+ self.env["spp.group.membership"].create({"group": group2.id, "individual": shared_member.id})
+
+ report = self.create_test_report(
+ name="Dedup Test",
+ dimension_ids=[Command.set([self.age_dimension.id])],
+ member_expansion="expand",
+ filter_domain="[('is_registrant', '=', True), ('is_group', '=', True)]",
+ filter_mode="domain",
+ )
+
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # Count the total across all dimension values for district_1
+ if self.area_district_1.id in result:
+ age_data = result[self.area_district_1.id].get("age_group", {})
+ total = sum(age_data.values())
+ # shared_member should only be counted once (not doubled across groups)
+ self.assertEqual(total, 1)
+
+ def test_backward_compat_no_expansion(self):
+ """Test member_expansion='none' produces same output format as before."""
+ report = self.create_test_report(
+ name="Backward Compat Test",
+ dimension_ids=[Command.set([self.gender_dimension.id])],
+ member_expansion="none",
+ )
+ area_context = report._prepare_area_context()
+ result = report._compute_disaggregation(area_context)
+
+ # Should still produce {area_id: {dim_name: {value: count}}}
+ self.assertIsInstance(result, dict)
+ for _area_id, disagg in result.items():
+ self.assertIn("gender", disagg)
+ for value, count in disagg["gender"].items():
+ self.assertIsInstance(value, str)
+ self.assertIsInstance(count, int)
+
+ def test_member_expansion_constrains(self):
+ """Test member_expansion='expand' rejected for non-partner models."""
+ area_model = self.env["ir.model"].search([("model", "=", "spp.area")], limit=1)
+ if not area_model:
+ self.skipTest("spp.area model not available")
+
+ from odoo.exceptions import ValidationError
+
+ with self.assertRaises(ValidationError):
+ self.env["spp.gis.report"].create(
+ {
+ "name": "Bad Expand Report",
+ "code": "bad_expand_test",
+ "category_id": self.category.id,
+ "source_model_id": area_model.id,
+ "area_field_path": "parent_id",
+ "aggregation_method": "count",
+ "normalization_method": "raw",
+ "base_area_level": 2,
+ "member_expansion": "expand",
+ }
+ )
diff --git a/spp_gis_report/tests/test_gis_report_api.py b/spp_gis_report/tests/test_gis_report_api.py
index 051c49c32..673fdc333 100644
--- a/spp_gis_report/tests/test_gis_report_api.py
+++ b/spp_gis_report/tests/test_gis_report_api.py
@@ -104,6 +104,19 @@ def setUpClass(cls):
}
)
+ # Get or create a gender dimension for test
+ cls.gender_dimension = cls.env["spp.demographic.dimension"].search([("name", "=", "gender")], limit=1)
+ if not cls.gender_dimension:
+ cls.gender_dimension = cls.env["spp.demographic.dimension"].create(
+ {
+ "name": "gender",
+ "label": "Gender",
+ "dimension_type": "field",
+ "field_path": "gender_id.code",
+ "value_labels_json": {"1": "Male", "2": "Female", "0": "Not Known"},
+ }
+ )
+
# Create report with disaggregation
cls.report_disagg = cls.env["spp.gis.report"].create(
{
@@ -115,7 +128,7 @@ def setUpClass(cls):
"aggregation_method": "count",
"normalization_method": "raw",
"base_area_level": 2,
- "disaggregate_by_gender": True,
+ "dimension_ids": [(6, 0, [cls.gender_dimension.id])],
}
)
cls.env["spp.gis.report.data"].create(
@@ -328,16 +341,24 @@ def test_10_geojson_simple_format(self):
self.assertNotIn("metadata", result)
def test_11_geojson_with_disaggregation(self):
- """Test GeoJSON includes disaggregation data when requested."""
+ """Test GeoJSON includes flat disagg_* properties when requested."""
self.authenticate("admin", "admin")
result = self._get_json(
"/api/v2/GISReport/api_disagg_test/geojson?include_disaggregation=true&include_geometry=false"
)
- # Verify disaggregation included
+ # Verify flat disagg_* properties are present
feature = result["features"][0]
- self.assertIn("disaggregation", feature["properties"])
- self.assertIn("gender", feature["properties"]["disaggregation"])
+ props = feature["properties"]
+ # The stored disaggregation has gender.male=60, gender.female=40
+ self.assertIn("disagg_gender_male", props)
+ self.assertEqual(props["disagg_gender_male"], 60)
+ self.assertIn("disagg_gender_female", props)
+ self.assertEqual(props["disagg_gender_female"], 40)
+
+ # Verify disaggregation metadata in the metadata block
+ metadata = result.get("metadata", {})
+ self.assertIn("disaggregation", metadata)
def test_12_geojson_report_not_found(self):
"""Test GeoJSON endpoint handles non-existent report."""
diff --git a/spp_gis_report/tests/test_gis_report_helpers.py b/spp_gis_report/tests/test_gis_report_helpers.py
new file mode 100644
index 000000000..7cf2d6928
--- /dev/null
+++ b/spp_gis_report/tests/test_gis_report_helpers.py
@@ -0,0 +1,72 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+"""Tests for spp.gis.report helper methods: threshold calculators, refresh
+compute, the member-expansion constraint, and view/refresh actions."""
+
+from odoo.exceptions import ValidationError
+from odoo.tests import tagged
+
+from .common import GISReportTestBase
+
+
+@tagged("post_install", "-at_install")
+class TestGisReportHelpers(GISReportTestBase):
+ """Cover quantile/jenks calculators, constraint, compute, and actions."""
+
+ def test_calculate_quantiles(self):
+ """Quantile breakpoints span the data and have num_buckets + 1 entries."""
+ report = self.create_test_report()
+ self.assertEqual(report._calculate_quantiles([], 4), [0, 1])
+ breaks = report._calculate_quantiles([1, 2, 3, 4, 5], 4)
+ self.assertEqual(len(breaks), 5)
+ self.assertEqual(breaks[0], 1)
+ self.assertEqual(breaks[-1], 5)
+
+ def test_calculate_jenks_breaks_small_dataset(self):
+ """When there are no more values than buckets, values are returned as-is."""
+ report = self.create_test_report()
+ self.assertEqual(report._calculate_jenks_breaks([10, 20], 5), [10, 20, 20])
+
+ def test_calculate_jenks_breaks_with_gaps(self):
+ """The largest natural gap becomes a break point."""
+ report = self.create_test_report()
+ breaks = report._calculate_jenks_breaks([1, 2, 3, 100, 101, 102], 2)
+ self.assertEqual(breaks[0], 1)
+ self.assertEqual(breaks[-1], 102)
+ self.assertIn(100, breaks)
+
+ def test_compute_next_refresh_without_cron(self):
+ """A report with no scheduling cron has no next refresh time."""
+ report = self.create_test_report()
+ self.assertFalse(report.next_refresh)
+
+ def test_check_member_expansion_rejects_non_partner_source(self):
+ """member_expansion='expand' is only valid for res.partner sources."""
+ area_model = self.env["ir.model"].search([("model", "=", "spp.area")], limit=1)
+ with self.assertRaises(ValidationError):
+ self.create_test_report(source_model_id=area_model.id, member_expansion="expand")
+
+ def test_check_member_expansion_allows_partner_source(self):
+ """member_expansion='expand' is accepted for a res.partner source."""
+ report = self.create_test_report(member_expansion="expand")
+ self.assertEqual(report.member_expansion, "expand")
+
+ def test_action_view_data(self):
+ """The data action targets this report's data records."""
+ report = self.create_test_report()
+ action = report.action_view_data()
+ self.assertEqual(action["type"], "ir.actions.act_window")
+ self.assertEqual(action["res_model"], "spp.gis.report.data")
+ self.assertEqual(action["domain"], [("report_id", "=", report.id)])
+
+ def test_action_view_map(self):
+ """The map action opens the GIS view, or falls back to the data view."""
+ report = self.create_test_report()
+ action = report.action_view_map()
+ self.assertEqual(action["type"], "ir.actions.act_window")
+ self.assertIn(action["res_model"], ("spp.area", "spp.gis.report.data"))
+
+ def test_action_refresh_now(self):
+ """Refreshing recomputes data and stamps last_refresh."""
+ report = self.create_test_report()
+ report.action_refresh_now()
+ self.assertTrue(report.last_refresh)
diff --git a/spp_gis_report/tests/test_gis_report_wizard.py b/spp_gis_report/tests/test_gis_report_wizard.py
index c66f04157..695f7a605 100644
--- a/spp_gis_report/tests/test_gis_report_wizard.py
+++ b/spp_gis_report/tests/test_gis_report_wizard.py
@@ -349,6 +349,18 @@ def test_14_wizard_code_uniqueness(self):
def test_15_wizard_prepare_report_vals(self):
"""Test preparation of report values from wizard."""
+ # Get or create a gender dimension for test
+ gender_dim = self.env["spp.demographic.dimension"].search([("name", "=", "gender")], limit=1)
+ if not gender_dim:
+ gender_dim = self.env["spp.demographic.dimension"].create(
+ {
+ "name": "gender",
+ "label": "Gender",
+ "dimension_type": "field",
+ "field_path": "gender_id.code",
+ }
+ )
+
wizard = self.env["spp.gis.report.wizard"].create(
{
"template_id": self.template.id,
@@ -357,7 +369,7 @@ def test_15_wizard_prepare_report_vals(self):
"base_area_level": 2,
"normalization_method": "per_area_sqkm",
"enable_rollup": True,
- "disaggregate_by_gender": True,
+ "dimension_ids": [(6, 0, [gender_dim.id])],
"threshold_mode": "auto_quartile",
"refresh_mode": "scheduled",
"refresh_interval": "hourly",
@@ -376,7 +388,8 @@ def test_15_wizard_prepare_report_vals(self):
self.assertEqual(vals["base_area_level"], 2)
self.assertEqual(vals["normalization_method"], "per_area_sqkm")
self.assertTrue(vals["enable_rollup"])
- self.assertTrue(vals["disaggregate_by_gender"])
+ # dimension_ids should be a Command tuple
+ self.assertIn("dimension_ids", vals)
# Check threshold fields
self.assertEqual(vals["threshold_mode"], "auto_quartile")
@@ -418,22 +431,44 @@ def test_17_wizard_no_template_error(self):
self.assertIn("template", str(cm.exception).lower())
def test_18_wizard_disaggregation_options(self):
- """Test disaggregation options are properly set."""
+ """Test disaggregation dimension_ids are properly set."""
+ # Get or create dimensions for test
+ gender_dim = self.env["spp.demographic.dimension"].search([("name", "=", "gender")], limit=1)
+ if not gender_dim:
+ gender_dim = self.env["spp.demographic.dimension"].create(
+ {
+ "name": "gender",
+ "label": "Gender",
+ "dimension_type": "field",
+ "field_path": "gender_id.code",
+ }
+ )
+ age_dim = self.env["spp.demographic.dimension"].search([("name", "=", "age_group")], limit=1)
+ if not age_dim:
+ age_dim = self.env["spp.demographic.dimension"].create(
+ {
+ "name": "age_group",
+ "label": "Age Group",
+ "dimension_type": "expression",
+ "cel_expression": "age_bucket(r.birthdate)",
+ }
+ )
+
wizard = self.env["spp.gis.report.wizard"].create(
{
"template_id": self.template.id,
"name": "Disaggregation Test",
"base_area_level": 2,
- "disaggregate_by_gender": True,
- "disaggregate_by_age": True,
+ "dimension_ids": [(6, 0, [gender_dim.id, age_dim.id])],
}
)
result = wizard.action_create_report()
report = self.env["spp.gis.report"].browse(result["res_id"])
- self.assertTrue(report.disaggregate_by_gender)
- self.assertTrue(report.disaggregate_by_age)
+ self.assertEqual(len(report.dimension_ids), 2)
+ self.assertIn(gender_dim, report.dimension_ids)
+ self.assertIn(age_dim, report.dimension_ids)
def test_19_wizard_color_schemes(self):
"""Test different color scheme options."""
diff --git a/spp_gis_report/views/gis_report_views.xml b/spp_gis_report/views/gis_report_views.xml
index b861a84d8..6ead4ccd5 100644
--- a/spp_gis_report/views/gis_report_views.xml
+++ b/spp_gis_report/views/gis_report_views.xml
@@ -327,16 +327,13 @@ Part of OpenSPP. See LICENSE file for full copyright and licensing details.
>
-
diff --git a/spp_gis_report/wizards/gis_report_wizard.py b/spp_gis_report/wizards/gis_report_wizard.py
index c0e809dd5..13ccca005 100644
--- a/spp_gis_report/wizards/gis_report_wizard.py
+++ b/spp_gis_report/wizards/gis_report_wizard.py
@@ -102,15 +102,10 @@ class GISReportWizard(models.TransientModel):
)
# Disaggregation options
- disaggregate_by_gender = fields.Boolean(
- string="Include Gender Disaggregation",
- default=False,
- help="Show gender breakdown in map popups",
- )
- disaggregate_by_age = fields.Boolean(
- string="Include Age Group Disaggregation",
- default=False,
- help="Show age group breakdown in map popups",
+ dimension_ids = fields.Many2many(
+ "spp.demographic.dimension",
+ string="Disaggregation Dimensions",
+ help="Demographic dimensions to compute disaggregation for",
)
# Rollup
@@ -360,8 +355,7 @@ def _prepare_report_vals(self):
"base_area_level": self.base_area_level,
"normalization_method": self.normalization_method,
"enable_rollup": self.enable_rollup,
- "disaggregate_by_gender": self.disaggregate_by_gender,
- "disaggregate_by_age": self.disaggregate_by_age,
+ "dimension_ids": [(6, 0, self.dimension_ids.ids)],
# Thresholds
"threshold_mode": self.threshold_mode,
"color_scheme_id": self.color_scheme_id.id,
diff --git a/spp_gis_report/wizards/gis_report_wizard_views.xml b/spp_gis_report/wizards/gis_report_wizard_views.xml
index 9d7cd4518..223aab16c 100644
--- a/spp_gis_report/wizards/gis_report_wizard_views.xml
+++ b/spp_gis_report/wizards/gis_report_wizard_views.xml
@@ -147,8 +147,11 @@
-
-
+