gateway/modules/features/realEstate/parcelSelectionService.py

180 lines
6.7 KiB
Python

"""
Parcel selection service: compute combined outline, total area, and Bauzone grouping.
Used for multi-parcel selection in PEK map view.
"""
import logging
from typing import Any, Dict, List, Optional
from shapely.geometry import Polygon
from shapely.ops import unary_union
from shapely.geometry.base import BaseGeometry
logger = logging.getLogger(__name__)
def _parcel_to_shapely_polygon(parcel: Dict[str, Any]) -> Optional[Polygon]:
    """
    Build a Shapely Polygon from a parcel dict, or return None.

    Two geometry sources are tried in order:

    1. GeoJSON under ``geometry_geojson`` (top level, or inside ``map_view``).
       The value may be a bare geometry or a dict wrapping one under
       ``geometry``. Only type ``Polygon`` is handled and only its first
       (exterior) ring is used; holes are ignored.
    2. ``perimeter`` (top level, or inside ``parcel``) holding a ``punkte``
       list of points with ``x`` and ``y`` keys. The ring is closed
       automatically when the first and last point differ.

    Malformed geometry payloads (empty rings, ``None`` where a list is
    expected, non-dict ``map_view`` / ``parcel`` values, ...) are logged at
    debug level and treated as a failed conversion instead of raising.

    Args:
        parcel: Parcel dict, flat or with nested ``parcel`` / ``map_view``.

    Returns:
        A non-empty Polygon, or None if no usable geometry was found.
    """
    # --- Source 1: GeoJSON -------------------------------------------------
    geo = parcel.get("geometry_geojson")
    if not geo:
        map_view = parcel.get("map_view")
        if isinstance(map_view, dict):
            geo = map_view.get("geometry_geojson")

    if geo and isinstance(geo, dict):
        # Accept both a Feature-like wrapper and a bare geometry.
        geom = geo.get("geometry") or geo
        if isinstance(geom, dict) and geom.get("type") == "Polygon":
            try:
                coords = geom.get("coordinates")
                if coords:
                    # Standard GeoJSON nests rings ([[pt, pt, ...]]); tolerate a
                    # flat ring ([pt, pt, ...]) as well. Points may be lists or
                    # tuples depending on how the dict was produced.
                    first = coords[0]
                    nested = bool(first) and isinstance(first[0], (list, tuple))
                    ring = first if nested else coords
                    if ring and len(ring) >= 3:
                        poly = Polygon(ring)
                        if not poly.is_empty:
                            return poly
            except Exception as e:  # best effort: fall through to perimeter
                logger.debug("GeoJSON to Polygon failed: %s", e)

    # --- Source 2: perimeter (punkte with x, y) ----------------------------
    perimeter = parcel.get("perimeter")
    if not perimeter:
        nested_parcel = parcel.get("parcel")
        if isinstance(nested_parcel, dict):
            perimeter = nested_parcel.get("perimeter")

    if perimeter and isinstance(perimeter, dict):
        try:
            punkte = perimeter.get("punkte") or []
            coords = [(p.get("x"), p.get("y")) for p in punkte if "x" in p and "y" in p]
            if len(coords) >= 3:
                if coords[0] != coords[-1]:
                    coords.append(coords[0])
                poly = Polygon(coords)
                if not poly.is_empty:
                    return poly
        except Exception as e:  # best effort: report as "no geometry"
            logger.debug("Perimeter to Polygon failed: %s", e)

    return None
def _shapely_to_geojson(geom: BaseGeometry) -> Dict[str, Any]:
    """
    Serialize a Shapely geometry into a GeoJSON-style geometry dict.

    - ``None`` or an empty geometry yields an empty ``Polygon``.
    - Anything exposing ``geoms`` (a multi-part geometry) yields a
      ``MultiPolygon``; empty members and members without an ``exterior``
      are skipped.
    - Anything exposing ``exterior`` yields a ``Polygon``.
    - Every other geometry yields an empty ``Polygon``.

    Only exterior rings are written; interior rings (holes) are not part of
    the output. Ring points are emitted as lists taken from the exterior's
    coordinate sequence.
    """

    def _exterior_ring(shape: Any) -> List[List[float]]:
        # One GeoJSON linear ring built from the shape's outer boundary.
        return [list(point) for point in shape.exterior.coords]

    if geom is None or geom.is_empty:
        return {"type": "Polygon", "coordinates": []}

    if hasattr(geom, "geoms"):
        # MultiPolygon coordinates: one [exterior_ring] entry per polygon.
        polygons = [
            [_exterior_ring(member)]
            for member in geom.geoms
            if not member.is_empty and hasattr(member, "exterior")
        ]
        return {"type": "MultiPolygon", "coordinates": polygons}

    if hasattr(geom, "exterior"):
        return {"type": "Polygon", "coordinates": [_exterior_ring(geom)]}

    return {"type": "Polygon", "coordinates": []}
def compute_selection_summary(parcels: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Compute combined outline, total area, and Bauzone grouping for selected parcels.

    Each input is flattened into a single record (nested ``parcel`` keys
    first, top-level keys overriding them), converted to a polygon, and all
    polygons are unioned. Parcels for which no polygon can be built are left
    out of the result entirely, including the Bauzone grouping.

    Args:
        parcels: List of parcel dicts with perimeter, geometry_geojson, bauzone, area_m2.
            Can have structure { "parcel": {...}, "map_view": {...} } or flat.
            ``parcel`` / ``map_view`` values that are missing, ``None`` or not
            dicts are treated as empty.

    Returns:
        {
            "combined_outline_geojson": { "type": "Polygon"|"MultiPolygon", "coordinates": [...] },
            "total_area_m2": float,   # area of the unioned geometry, rounded to 2 decimals
            "bauzonen": [ { "bauzone": str, "parcels": [...], "area_m2": float } ]
        }
        ``bauzonen[].area_m2`` is the sum of the parcels' declared ``area_m2``
        values, not a geometric area. Parcels without a ``bauzone`` are grouped
        under "Unbekannt".
    """
    # Built per call so callers may mutate the returned structure freely.
    empty_summary: Dict[str, Any] = {
        "combined_outline_geojson": {"type": "Polygon", "coordinates": []},
        "total_area_m2": 0.0,
        "bauzonen": [],
    }
    if not parcels:
        return empty_summary

    # Normalize: extract parcel data for geometry and Bauzone
    shapely_polygons = []
    parcel_records = []
    for p in parcels:
        # Support both { parcel: {...}, map_view: {...} } and flat { id, perimeter, ... }
        nested = p.get("parcel")
        if not isinstance(nested, dict):
            nested = {}
        map_view = p.get("map_view")
        if not isinstance(map_view, dict):
            map_view = {}

        flat = {**nested, **{k: v for k, v in p.items() if k != "parcel"}}
        flat["map_view"] = map_view
        flat["geometry_geojson"] = flat.get("geometry_geojson") or map_view.get("geometry_geojson")
        # Normalize a missing/empty perimeter to None.
        flat["perimeter"] = flat.get("perimeter") or None

        poly = _parcel_to_shapely_polygon(flat)
        if poly is not None and not poly.is_empty:
            shapely_polygons.append(poly)
            parcel_records.append(flat)

    if not shapely_polygons:
        return empty_summary

    # Union all polygons (keeps MultiPolygon if disconnected)
    combined = unary_union(shapely_polygons)
    # NOTE(review): the area is expressed in squared coordinate units of the
    # input geometry; this presumably assumes a metric CRS - confirm, since
    # GeoJSON in degrees would not yield square metres.
    total_area_m2 = 0.0 if combined.is_empty else float(combined.area)
    combined_geojson = _shapely_to_geojson(combined)

    # Group parcels by Bauzone (insertion order = first appearance)
    bauzone_map: Dict[str, List[Dict[str, Any]]] = {}
    for flat in parcel_records:
        bauzone_map.setdefault(flat.get("bauzone") or "Unbekannt", []).append(flat)

    bauzonen = [
        {
            "bauzone": bz,
            "parcels": plist,
            "area_m2": round(sum(float(rec.get("area_m2") or 0) for rec in plist), 2),
        }
        for bz, plist in bauzone_map.items()
    ]

    return {
        "combined_outline_geojson": combined_geojson,
        "total_area_m2": round(total_area_m2, 2),
        "bauzonen": bauzonen,
    }
def is_parcel_adjacent_to_selection(
    new_parcel: Dict[str, Any],
    selected_parcels: List[Dict[str, Any]],
    buffer_m: float = 0.01,
) -> bool:
    """
    Check if a parcel touches (is adjacent to) any selected parcel.

    "Adjacent" here means the candidate polygon intersects a selected
    polygon after that polygon has been grown by ``buffer_m``; overlapping
    or identical parcels therefore also count as adjacent.

    Args:
        new_parcel: Candidate parcel dict (flat or with nested
            ``parcel`` / ``map_view``).
        selected_parcels: Already selected parcel dicts, same shapes as above.
        buffer_m: Tolerance in coordinate units used to absorb floating-point
            gaps between neighbouring boundaries. Values <= 0 disable the
            buffer and the raw polygons are compared.

    Returns:
        True if the candidate intersects at least one (buffered) selected
        parcel; False otherwise, including when the selection is empty or the
        candidate has no usable geometry.
    """
    # Nothing selected: nothing to be adjacent to, skip geometry parsing.
    if not selected_parcels:
        return False

    new_poly = _parcel_to_shapely_polygon(new_parcel)
    if new_poly is None or new_poly.is_empty:
        return False

    for sel in selected_parcels:
        # Flatten { parcel: {...}, map_view: {...} } into one record; treat
        # missing / None / non-dict containers as empty.
        nested = sel.get("parcel")
        if not isinstance(nested, dict):
            nested = {}
        map_view = sel.get("map_view")
        if not isinstance(map_view, dict):
            map_view = {}

        flat = {**nested, **{k: v for k, v in sel.items() if k != "parcel"}}
        flat["map_view"] = map_view
        flat["geometry_geojson"] = flat.get("geometry_geojson") or map_view.get("geometry_geojson")

        sel_poly = _parcel_to_shapely_polygon(flat)
        if sel_poly is None or sel_poly.is_empty:
            continue

        target = sel_poly.buffer(buffer_m) if buffer_m > 0 else sel_poly
        # intersects() already covers the boundary-only contact that
        # touches() reports, so a single predicate is sufficient.
        if new_poly.intersects(target):
            return True

    return False