""" ÖREB WFS Connector This connector handles interactions with ÖREB (Öffentlich-rechtliche Eigentumsbeschränkungen) WFS services for zone information retrieval. ÖREB provides zoning information (Bauzonen) through WFS services. """ import logging from typing import Dict, List, Any, Optional import aiohttp import xml.etree.ElementTree as ET from shapely.geometry import Polygon logger = logging.getLogger(__name__) class OerebWfsConnector: """ Connector for ÖREB WFS services. Provides methods for: - Querying zone information (Bauzonen) by parcel geometry - Retrieving zoning data from canton-specific WFS services """ def __init__( self, timeout: int = 10, max_retries: int = 3, retry_delay: float = 1.0 ): """ Initialize ÖREB WFS connector. Args: timeout: Request timeout in seconds max_retries: Maximum number of retry attempts retry_delay: Initial retry delay in seconds (exponential backoff) """ self.timeout = aiohttp.ClientTimeout(total=timeout) self.max_retries = max_retries self.retry_delay = retry_delay self._wfs_cache: Dict[str, List[Dict[str, Any]]] = {} # Cache for WFS queries by bbox logger.info("ÖREB WFS Connector initialized") def _get_oereb_wfs_url(self, canton: str) -> Optional[str]: """ Get ÖREB WFS service URL for a given canton. Args: canton: Canton abbreviation (e.g., "ZH", "BE") Returns: WFS service URL or None if canton not supported """ oereb_wfs_urls = { "ZH": "https://maps.zh.ch/wfs/OerebKatasterZHWFS", } return oereb_wfs_urls.get(canton.upper()) def _geometry_to_shapely_polygon(self, geometry: Dict[str, Any]) -> Optional[Polygon]: """ Convert parcel geometry (ESRI rings or GeoJSON coordinates) to Shapely Polygon. Args: geometry: Geometry dictionary (ESRI rings or GeoJSON coordinates) Returns: Shapely Polygon or None if invalid """ try: # Handle ESRI geometry format (rings) if "rings" in geometry: rings = geometry.get("rings", []) if not rings or not rings[0]: return None # Use the first ring (exterior) for the polygon exterior_ring = rings[0] if len(exterior_ring) < 3: return None # Ensure polygon is closed coords = list(exterior_ring) if coords[0] != coords[-1]: coords.append(coords[0]) return Polygon(coords) # Handle GeoJSON format (coordinates) elif "coordinates" in geometry: coords = geometry.get("coordinates", []) if not coords: return None # Handle Polygon coordinates: [[[x1,y1], [x2,y2], ...]] # Flatten to get the exterior ring def extract_exterior(coord_list, depth=0): if depth == 0 and isinstance(coord_list, list) and len(coord_list) > 0: # First level might be array of rings, take first one if isinstance(coord_list[0], list) and len(coord_list[0]) > 0: if isinstance(coord_list[0][0], list): # This is Polygon format: [[[x,y],...]] return extract_exterior(coord_list[0], depth + 1) elif isinstance(coord_list[0][0], (int, float)): # This is already a ring: [[x,y],...] return coord_list[0] elif depth == 1 and isinstance(coord_list, list) and len(coord_list) > 0: if isinstance(coord_list[0], (int, float)): return coord_list elif isinstance(coord_list[0], list): return coord_list return coord_list exterior_coords = extract_exterior(coords) if not exterior_coords or len(exterior_coords) < 3: return None # Ensure polygon is closed coords_list = list(exterior_coords) if coords_list[0] != coords_list[-1]: coords_list.append(coords_list[0]) return Polygon(coords_list) except Exception as e: logger.debug(f"Error converting geometry to Shapely Polygon: {e}") return None def _parse_gml_geometry(self, feature_elem: ET.Element) -> Optional[Polygon]: """ Parse GML geometry from WFS feature element and convert to Shapely Polygon. Args: feature_elem: XML element containing the feature Returns: Shapely Polygon or None if geometry not found or invalid """ try: # Common GML namespaces namespaces = { 'gml': 'http://www.opengis.net/gml', 'gml3': 'http://www.opengis.net/gml/3.2', 'gml32': 'http://www.opengis.net/gml/3.2' } # Try to find polygon geometry polygon_elem = None for ns_prefix, ns_url in namespaces.items(): # Try different GML polygon element names for tag_name in ['Polygon', 'polygon', 'PolygonProperty', 'geometryProperty']: polygon_elem = feature_elem.find(f'.//{{{ns_url}}}{tag_name}') if polygon_elem is not None: break # Also try without namespace prefix polygon_elem = feature_elem.find(f'.//{tag_name}') if polygon_elem is not None: break if polygon_elem is not None: break if polygon_elem is None: # Try to find any geometry element for ns_prefix, ns_url in namespaces.items(): polygon_elem = feature_elem.find(f'.//{{{ns_url}}}*') if polygon_elem is not None and 'polygon' in polygon_elem.tag.lower(): break if polygon_elem is None: return None # Extract coordinates from GML # GML Polygon typically has exterior ring with posList or pos elements coords = [] # Try posList (most common in GML 3.2) for ns_prefix, ns_url in namespaces.items(): pos_list = polygon_elem.find(f'.//{{{ns_url}}}posList') if pos_list is not None and pos_list.text: # posList format: "x1 y1 x2 y2 x3 y3 ..." coord_strings = pos_list.text.strip().split() for i in range(0, len(coord_strings) - 1, 2): if i + 1 < len(coord_strings): x = float(coord_strings[i]) y = float(coord_strings[i + 1]) coords.append((x, y)) break # If no posList, try pos elements if not coords: for ns_prefix, ns_url in namespaces.items(): pos_elems = polygon_elem.findall(f'.//{{{ns_url}}}pos') if pos_elems: for pos in pos_elems: if pos.text: parts = pos.text.strip().split() if len(parts) >= 2: x = float(parts[0]) y = float(parts[1]) coords.append((x, y)) break # If still no coords, try coordinates element (GML 2) if not coords: for ns_prefix, ns_url in namespaces.items(): coords_elem = polygon_elem.find(f'.//{{{ns_url}}}coordinates') if coords_elem is not None and coords_elem.text: # GML 2 coordinates format: "x1,y1 x2,y2 ..." or "x1,y1,z1 x2,y2,z2 ..." coord_strings = coords_elem.text.strip().split() for coord_str in coord_strings: parts = coord_str.split(',') if len(parts) >= 2: x = float(parts[0]) y = float(parts[1]) coords.append((x, y)) break if len(coords) < 3: return None # Ensure polygon is closed if coords[0] != coords[-1]: coords.append(coords[0]) return Polygon(coords) except Exception as e: logger.debug(f"Error parsing GML geometry: {e}") return None def _calculate_bbox_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]: """ Calculate bounding box from geometry for WFS queries. Args: geometry: Geometry dictionary (ESRI rings or GeoJSON coordinates) Returns: Bounding box string in format "min_x,min_y,max_x,max_y" or None if invalid """ try: # Handle ESRI geometry format (rings) if "rings" in geometry: rings = geometry.get("rings", []) if not rings or not rings[0]: return None # Flatten all coordinates from all rings all_coords = [] for ring in rings: all_coords.extend(ring) if not all_coords: return None # Calculate bbox x_coords = [coord[0] for coord in all_coords] y_coords = [coord[1] for coord in all_coords] min_x = min(x_coords) min_y = min(y_coords) max_x = max(x_coords) max_y = max(y_coords) return f"{min_x},{min_y},{max_x},{max_y}" # Handle GeoJSON format (coordinates) elif "coordinates" in geometry: coords = geometry.get("coordinates", []) if not coords: return None # Flatten coordinates based on geometry type def flatten_coords(coord_list, depth=0): if depth < 2: result = [] for item in coord_list: if isinstance(item, (int, float)): return coord_list result.extend(flatten_coords(item, depth + 1)) return result return coord_list flat_coords = flatten_coords(coords) if not flat_coords or len(flat_coords) < 2: return None x_coords = [flat_coords[i] for i in range(0, len(flat_coords), 2)] y_coords = [flat_coords[i+1] for i in range(0, len(flat_coords)-1, 2)] min_x = min(x_coords) min_y = min(y_coords) max_x = max(x_coords) max_y = max(y_coords) return f"{min_x},{min_y},{max_x},{max_y}" except Exception as e: logger.debug(f"Error calculating bbox from geometry: {e}") return None async def _query_wfs_get_feature( self, wfs_url: str, type_name: str, bbox: str, srs: str = "EPSG:2056" ) -> List[Dict[str, Any]]: """ Query WFS GetFeature to retrieve zone features within a bounding box. Args: wfs_url: WFS service URL type_name: Feature type name (e.g., "nutzungsplanung") bbox: Bounding box string "min_x,min_y,max_x,max_y" srs: Spatial reference system (default: EPSG:2056 for LV95) Returns: List of feature dictionaries with properties and attributes """ # Only use WFS 1.1.0 (we know this works) params = { "service": "WFS", "version": "1.1.0", "request": "GetFeature", "typeName": type_name, "bbox": bbox, "srsName": srs } logger.debug(f"Querying WFS GetFeature: {wfs_url} with typeName={type_name}, bbox={bbox}") try: async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.get(wfs_url, params=params) as response: if response.status != 200: logger.debug(f"WFS GetFeature returned status {response.status}") return [] # Parse XML/GML response xml_content = await response.text() try: root = ET.fromstring(xml_content) features = [] members = root.findall('.//{http://www.opengis.net/gml}featureMember') or \ root.findall('.//featureMember') for member in members: attrs = {} # Find feature element feature_elem = member for child in member: if child.tag and ('nutzung' in child.tag.lower() or 'plan' in child.tag.lower()): feature_elem = child break # Extract attributes for elem in feature_elem.iter(): if elem.tag and elem.text and elem.text.strip(): tag_lower = elem.tag.lower() if any(term in tag_lower for term in [ 'pos', 'coordinates', 'point', 'polygon', 'linestring', 'geometry', 'boundedby', 'envelope', 'gml' ]): continue tag_name = elem.tag.split('}')[-1] if '}' in elem.tag else elem.tag if ':' in tag_name: tag_name = tag_name.split(':')[-1] if tag_name not in attrs: attrs[tag_name] = elem.text.strip() # Parse geometry from GML geometry_polygon = self._parse_gml_geometry(feature_elem) if attrs: feature_dict = {"properties": attrs, "attributes": attrs} if geometry_polygon: feature_dict["geometry"] = geometry_polygon features.append(feature_dict) return features except ET.ParseError as e: logger.debug(f"Failed to parse WFS XML response: {e}") return [] except Exception as e: logger.debug(f"Error parsing WFS XML: {e}") return [] except Exception as e: logger.debug(f"WFS GetFeature query failed: {e}") return [] async def query_zone_layer( self, egrid: str, x: float, y: float, canton: Optional[str] = None, geometry: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: """ Query zone information using ÖREB WFS service. Returns only zones that contain the parcel based on the parcel geometry. Args: egrid: EGRID identifier (not currently used but kept for API compatibility) x: X coordinate (LV95) - not used but kept for compatibility y: Y coordinate (LV95) - not used but kept for compatibility canton: Canton abbreviation (e.g., "ZH", "BE") geometry: Parcel geometry dictionary (ESRI rings or GeoJSON coordinates) Returns: List of zone dictionaries with layerBodId and attributes, or empty list if not found """ if not canton or not geometry: return [] wfs_url = self._get_oereb_wfs_url(canton) if not wfs_url: return [] try: bbox = self._calculate_bbox_from_geometry(geometry) if not bbox: return [] # Check cache cache_key = f"{wfs_url}:{bbox}" if cache_key in self._wfs_cache: cached_features = self._wfs_cache[cache_key] else: cached_features = await self._query_wfs_get_feature(wfs_url, "nutzungsplanung", bbox) self._wfs_cache[cache_key] = cached_features if not cached_features: return [] # Convert parcel geometry to Shapely Polygon for spatial validation parcel_polygon = self._geometry_to_shapely_polygon(geometry) if not parcel_polygon: logger.debug("Could not convert parcel geometry to Shapely Polygon") # Fallback to first zone if geometry conversion fails for feature in cached_features: attrs = feature.get("properties", feature.get("attributes", {})) typ_gde_abkuerzung = attrs.get("typ_gde_abkuerzung") if typ_gde_abkuerzung: return [{ "layerBodId": "oereb_wfs", "attributes": {"typ_gde_abkuerzung": typ_gde_abkuerzung} }] return [] # Find the zone that actually contains or intersects the parcel # Since a parcel is always in exactly one zone, we check for containment first, # then find the zone with the largest intersection area if no perfect containment is found containing_zone = None best_intersecting_zone = None best_intersection_area = 0.0 for feature in cached_features: attrs = feature.get("properties", feature.get("attributes", {})) typ_gde_abkuerzung = attrs.get("typ_gde_abkuerzung") if not typ_gde_abkuerzung: continue zone_geometry = feature.get("geometry") if not zone_geometry: # If geometry not parsed, skip spatial check for this feature # But keep it as fallback if no geometry-based match is found if not best_intersecting_zone: best_intersecting_zone = feature continue try: # Check if zone contains the parcel (most precise) if zone_geometry.contains(parcel_polygon): containing_zone = feature break # Found perfect match, stop searching # Check if zone intersects the parcel (for border cases) if zone_geometry.intersects(parcel_polygon): # Calculate intersection area to find the best match intersection = zone_geometry.intersection(parcel_polygon) if not intersection.is_empty: intersection_area = intersection.area # Keep the zone with the largest intersection area if intersection_area > best_intersection_area: best_intersection_area = intersection_area best_intersecting_zone = feature except Exception as e: logger.debug(f"Error checking spatial relationship: {e}") # If spatial check fails, keep as fallback if not best_intersecting_zone: best_intersecting_zone = feature # Return the containing zone if found, otherwise the best intersecting zone selected_feature = containing_zone or best_intersecting_zone if selected_feature: attrs = selected_feature.get("properties", selected_feature.get("attributes", {})) typ_gde_abkuerzung = attrs.get("typ_gde_abkuerzung") if typ_gde_abkuerzung: return [{ "layerBodId": "oereb_wfs", "attributes": {"typ_gde_abkuerzung": typ_gde_abkuerzung} }] return [] except Exception: return []