""" Parcel selection service: compute combined outline, total area, and Bauzone grouping. Used for multi-parcel selection in PEK map view. """ import logging from typing import Any, Dict, List, Optional from shapely.geometry import Polygon from shapely.ops import unary_union from shapely.geometry.base import BaseGeometry logger = logging.getLogger(__name__) def _parcel_to_shapely_polygon(parcel: Dict[str, Any]) -> Optional[Polygon]: """ Convert a parcel dict (perimeter or geometry_geojson) to Shapely Polygon. Returns None if conversion fails. """ # Try geometry_geojson first geo = parcel.get("geometry_geojson") or parcel.get("map_view", {}).get("geometry_geojson") if geo and isinstance(geo, dict): geom = geo.get("geometry") or geo if geom and isinstance(geom, dict) and geom.get("type") == "Polygon": coords = geom.get("coordinates") if coords and len(coords) > 0: ring = coords[0] if isinstance(coords[0][0], list) else coords if ring and len(ring) >= 3: try: poly = Polygon(ring) if not poly.is_empty: return poly except Exception as e: logger.debug(f"GeoJSON to Polygon failed: {e}") # Try perimeter (punkte with x, y) perimeter = parcel.get("perimeter") if not perimeter and "parcel" in parcel: perimeter = parcel.get("parcel", {}).get("perimeter") if perimeter and isinstance(perimeter, dict): punkte = perimeter.get("punkte", []) if len(punkte) >= 3: try: coords = [(p.get("x"), p.get("y")) for p in punkte if "x" in p and "y" in p] if len(coords) >= 3: if coords[0] != coords[-1]: coords.append(coords[0]) return Polygon(coords) except Exception as e: logger.debug(f"Perimeter to Polygon failed: {e}") return None def _shapely_to_geojson(geom: BaseGeometry) -> Dict[str, Any]: """ Convert Shapely geometry to GeoJSON dict (Polygon or MultiPolygon). GeoJSON rings are closed (first point == last point). """ if geom is None or geom.is_empty: return {"type": "Polygon", "coordinates": []} if hasattr(geom, "geoms"): # MultiPolygon: coordinates = [ [[ring1]], [[ring2]], ... ] per polygon parts = [] for g in geom.geoms: if not g.is_empty and hasattr(g, "exterior"): ring = [list(c) for c in g.exterior.coords] parts.append([ring]) return {"type": "MultiPolygon", "coordinates": parts} elif hasattr(geom, "exterior"): ring = [list(c) for c in geom.exterior.coords] return {"type": "Polygon", "coordinates": [ring]} return {"type": "Polygon", "coordinates": []} def compute_selection_summary(parcels: List[Dict[str, Any]]) -> Dict[str, Any]: """ Compute combined outline, total area, and Bauzone grouping for selected parcels. Args: parcels: List of parcel dicts with perimeter, geometry_geojson, bauzone, area_m2. Can have structure { "parcel": {...}, "map_view": {...} } or flat. Returns: { "combined_outline_geojson": { "type": "Polygon"|"MultiPolygon", "coordinates": [...] }, "total_area_m2": float, "bauzonen": [ { "bauzone": str, "parcels": [...], "area_m2": float } ] } """ if not parcels: return { "combined_outline_geojson": {"type": "Polygon", "coordinates": []}, "total_area_m2": 0.0, "bauzonen": [], } # Normalize: extract parcel data for geometry and Bauzone shapely_polygons = [] parcel_records = [] for p in parcels: # Support both { parcel: {...}, map_view: {...} } and flat { id, perimeter, ... } flat = dict(p.get("parcel", {}), **{k: v for k, v in p.items() if k != "parcel"}) flat["map_view"] = p.get("map_view", {}) flat["geometry_geojson"] = flat.get("geometry_geojson") or flat.get("map_view", {}).get("geometry_geojson") flat["perimeter"] = flat.get("perimeter") or flat.get("map_view", {}).get("geometry_geojson") and None poly = _parcel_to_shapely_polygon(flat) if poly is not None and not poly.is_empty: shapely_polygons.append(poly) parcel_records.append(flat) if not shapely_polygons: return { "combined_outline_geojson": {"type": "Polygon", "coordinates": []}, "total_area_m2": 0.0, "bauzonen": [], } # Union all polygons (keeps MultiPolygon if disconnected) combined = unary_union(shapely_polygons) total_area_m2 = float(combined.area) if combined and not combined.is_empty else 0.0 combined_geojson = _shapely_to_geojson(combined) # Group parcels by Bauzone bauzone_map: Dict[str, List[Dict]] = {} for flat in parcel_records: bz = flat.get("bauzone") or flat.get("parcel", {}).get("bauzone") or "Unbekannt" if bz not in bauzone_map: bauzone_map[bz] = [] bauzone_map[bz].append(flat) bauzonen = [] for bz, plist in bauzone_map.items(): area_sum = sum( float(p.get("area_m2") or p.get("parcel", {}).get("area_m2") or 0) for p in plist ) bauzonen.append({ "bauzone": bz, "parcels": plist, "area_m2": round(area_sum, 2), }) return { "combined_outline_geojson": combined_geojson, "total_area_m2": round(total_area_m2, 2), "bauzonen": bauzonen, } def is_parcel_adjacent_to_selection( new_parcel: Dict[str, Any], selected_parcels: List[Dict[str, Any]], buffer_m: float = 0.01, ) -> bool: """ Check if a parcel touches (is adjacent to) any selected parcel. Uses small buffer for floating-point tolerance. """ new_poly = _parcel_to_shapely_polygon(new_parcel) if new_poly is None or new_poly.is_empty: return False for sel in selected_parcels: flat = dict(sel.get("parcel", {}), **{k: v for k, v in sel.items() if k != "parcel"}) flat["map_view"] = sel.get("map_view", {}) flat["geometry_geojson"] = flat.get("geometry_geojson") or flat.get("map_view", {}).get("geometry_geojson") sel_poly = _parcel_to_shapely_polygon(flat) if sel_poly is None or sel_poly.is_empty: continue if buffer_m > 0: sel_buf = sel_poly.buffer(buffer_m) if new_poly.intersects(sel_buf) or new_poly.touches(sel_buf): return True else: if new_poly.touches(sel_poly) or new_poly.intersects(sel_poly): return True return False