""" Swiss Topo MapServer Connector (Simplified) This connector handles interactions with the Swiss Federal Office of Topography MapServer identify endpoint for parcel data retrieval. Endpoint: https://api3.geo.admin.ch/rest/services/api/MapServer/identify """ import logging import asyncio import re from typing import Dict, List, Any, Optional, Tuple, Union import aiohttp from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) class SwissTopoMapServerConnector: """ Simplified connector for Swiss Topo MapServer identify endpoint. Provides methods for: - Searching parcels by coordinates (LV95) - Geocoding addresses to coordinates - Extracting parcel information """ # API endpoints MAPSERVER_IDENTIFY_URL = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify" GEOCODING_URL = "https://api3.geo.admin.ch/rest/services/api/SearchServer" # Swiss official survey layer LAYER_AMTLICHE_VERMESSUNG = "all:ch.swisstopo-vd.amtliche-vermessung" # Switzerland bounds in LV95 (EPSG:2056) SWITZERLAND_BOUNDS = { "min_x": 2480000, "max_x": 2840000, "min_y": 1070000, "max_y": 1300000 } def __init__( self, timeout: int = 30, max_retries: int = 3, retry_delay: float = 1.0 ): """ Initialize MapServer connector. Args: timeout: Request timeout in seconds max_retries: Maximum number of retry attempts retry_delay: Initial retry delay in seconds (exponential backoff) """ self.timeout = aiohttp.ClientTimeout(total=timeout) self.max_retries = max_retries self.retry_delay = retry_delay logger.info("Swiss Topo MapServer Connector initialized") def _point_in_polygon(self, point_x: float, point_y: float, polygon_rings: List[List[List[float]]]) -> bool: """ Check if a point is inside a polygon using ray casting algorithm. Args: point_x: X coordinate of the point point_y: Y coordinate of the point polygon_rings: List of rings (each ring is a list of [x, y] coordinates) First ring is outer boundary, subsequent rings are holes Returns: True if point is inside polygon, False otherwise """ if not polygon_rings or not polygon_rings[0]: return False # Use outer ring (first ring) ring = polygon_rings[0] if len(ring) < 3: return False # Ray casting algorithm: count intersections of horizontal ray with polygon edges inside = False j = len(ring) - 1 for i in range(len(ring)): xi, yi = ring[i][0], ring[i][1] xj, yj = ring[j][0], ring[j][1] # Check if ray crosses edge if ((yi > point_y) != (yj > point_y)) and (point_x < (xj - xi) * (point_y - yi) / (yj - yi) + xi): inside = not inside j = i # If point is inside outer ring, check if it's in any hole if inside and len(polygon_rings) > 1: for hole_ring in polygon_rings[1:]: if self._point_in_polygon(point_x, point_y, [hole_ring]): inside = False # Point is in a hole, so not in polygon break return inside def _clean_municipality_name(self, municipality: str) -> str: """ Clean municipality name by removing canton suffix. Args: municipality: Municipality name (e.g., "Zürich ZH" or "Lohn (SH)") Returns: Cleaned municipality name (e.g., "Zürich" or "Lohn") """ if not municipality: return municipality # Remove parentheses content like "Lohn (SH)" -> "Lohn" municipality = municipality.split("(")[0].strip() # Remove canton code at the end if present (e.g., "Wald ZH" -> "Wald") parts = municipality.split() if len(parts) > 1 and len(parts[-1]) == 2 and parts[-1].isupper(): municipality = " ".join(parts[:-1]) return municipality def _extract_address_from_building_attrs(self, attrs: Dict[str, Any]) -> Dict[str, Optional[str]]: """ Extract address components from building layer attributes. Args: attrs: Attributes dictionary from building layer query Returns: Dictionary with street, house_number, plz, municipality, and full_address """ # Extract address components (note: strname is a list!) street_list = attrs.get("strname", []) street = street_list[0] if isinstance(street_list, list) and street_list else None house_number = attrs.get("deinr") plz = attrs.get("dplz4") municipality = attrs.get("dplzname") or attrs.get("ggdename") # Clean municipality name if municipality: municipality = self._clean_municipality_name(municipality) # Construct full address full_address = None if street and house_number and plz and municipality: full_address = f"{street} {house_number}, {plz} {municipality}" elif street and house_number: full_address = f"{street} {house_number}" if plz: full_address += f", {plz}" if municipality: full_address += f" {municipality}" return { 'street': street, 'house_number': str(house_number) if house_number else None, 'plz': str(plz) if plz else None, 'municipality': municipality, 'full_address': full_address } async def _query_building_layer( self, x: float, y: float, tolerance: int = 1, # Default to 1 pixel (minimum) for maximum precision buffer: int = 25 # Reduced buffer for more precise queries ) -> Optional[Dict[str, Any]]: """ Query the building/address layer at given coordinates. Args: x: X coordinate (LV95) y: Y coordinate (LV95) tolerance: Tolerance in pixels for identify operation buffer: Buffer in meters for map extent Returns: First building result dictionary, or None if not found """ try: building_extent = f"{x - buffer},{y - buffer},{x + buffer},{y + buffer}" building_params = { "geometry": f"{x},{y}", "geometryType": "esriGeometryPoint", "sr": "2056", "layers": "all:ch.bfs.gebaeude_wohnungs_register", "tolerance": tolerance, "mapExtent": building_extent, "imageDisplay": "800,600,96", "returnGeometry": "false", "f": "json" } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: async with session.get(self.MAPSERVER_IDENTIFY_URL, params=building_params) as response: if response.status == 200: building_data = await response.json() building_results = building_data.get("results", []) if building_results: return building_results[0] return None except Exception as e: logger.debug(f"Could not query building layer: {e}") return None def _validate_coordinates(self, x: float, y: float) -> None: """ Validate coordinates are within Switzerland bounds (LV95). Args: x: East coordinate (LV95) y: North coordinate (LV95) Raises: ValueError: If coordinates are out of bounds """ if not (self.SWITZERLAND_BOUNDS["min_x"] <= x <= self.SWITZERLAND_BOUNDS["max_x"]): raise ValueError( f"X coordinate {x} out of bounds " f"({self.SWITZERLAND_BOUNDS['min_x']} - {self.SWITZERLAND_BOUNDS['max_x']})" ) if not (self.SWITZERLAND_BOUNDS["min_y"] <= y <= self.SWITZERLAND_BOUNDS["max_y"]): raise ValueError( f"Y coordinate {y} out of bounds " f"({self.SWITZERLAND_BOUNDS['min_y']} - {self.SWITZERLAND_BOUNDS['max_y']})" ) def _extract_coordinates_from_search_result(self, result: Dict[str, Any]) -> Optional[Tuple[float, float]]: """ Extract LV95 coordinates from SearchServer API result. Handles different response formats and coordinate swapping. Args: result: Search result from SearchServer API Returns: Tuple of (x, y) in LV95 coordinates, or None if not found """ attrs = result.get("attrs", {}) # Try multiple extraction patterns x = None y = None # Pattern 1: From 'geom_st_box2d' (most reliable - has correct X,Y order) if 'geom_st_box2d' in attrs: bbox_str = attrs['geom_st_box2d'] match = re.search(r'BOX\(([0-9.]+) ([0-9.]+),([0-9.]+) ([0-9.]+)\)', bbox_str) if match: xmin, ymin, xmax, ymax = map(float, match.groups()) x = (xmin + xmax) / 2 # Easting y = (ymin + ymax) / 2 # Northing logger.debug(f"Extracted coordinates from geom_st_box2d: X={x}, Y={y}") # Pattern 2: From attrs with 'x' and 'y' keys (WARNING: these are SWAPPED!) # The Swiss Geo Admin API returns x=northing, y=easting (opposite of standard) elif 'x' in attrs and 'y' in attrs: # SWAP: their 'x' is our Y, their 'y' is our X x = attrs.get('y') # Their Y is easting (our X) y = attrs.get('x') # Their X is northing (our Y) logger.debug(f"Extracted swapped coordinates from attrs: X={x} (from 'y'), Y={y} (from 'x')") # Pattern 3: From attrs with 'easting' and 'northing' keys elif 'easting' in attrs and 'northing' in attrs: x = attrs.get('easting') y = attrs.get('northing') logger.debug(f"Extracted coordinates from easting/northing: X={x}, Y={y}") # Pattern 4: From top-level result (also likely swapped) elif 'x' in result and 'y' in result: x = result.get('y') # Swap y = result.get('x') # Swap logger.debug(f"Extracted swapped coordinates from result: X={x}, Y={y}") if x is None or y is None: return None # Convert to float x = float(x) y = float(y) # Validate that coordinates are in LV95 range if not (self.SWITZERLAND_BOUNDS["min_x"] <= x <= self.SWITZERLAND_BOUNDS["max_x"]): logger.warning(f"Extracted X coordinate {x} is out of LV95 bounds") return None if not (self.SWITZERLAND_BOUNDS["min_y"] <= y <= self.SWITZERLAND_BOUNDS["max_y"]): logger.warning(f"Extracted Y coordinate {y} is out of LV95 bounds") return None return (x, y) async def _make_request( self, url: str, params: Dict[str, Any] ) -> Dict[str, Any]: """ Make HTTP GET request with retry logic and exponential backoff. Args: url: API endpoint URL params: Query parameters Returns: Response JSON data Raises: aiohttp.ClientError: If request fails after all retries """ for attempt in range(self.max_retries + 1): try: async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.get(url, params=params) as response: # Don't retry on 4xx errors (client errors) if 400 <= response.status < 500: error_text = await response.text() logger.error(f"API client error: {response.status} - {error_text}") response.raise_for_status() # Retry on 5xx errors (server errors) if response.status >= 500: if attempt < self.max_retries: delay = self.retry_delay * (2 ** attempt) logger.warning( f"API server error {response.status}, " f"retrying in {delay}s (attempt {attempt + 1}/{self.max_retries + 1})" ) await asyncio.sleep(delay) continue else: error_text = await response.text() logger.error(f"API request failed after {self.max_retries + 1} attempts: {response.status} - {error_text}") response.raise_for_status() return await response.json() except aiohttp.ClientError as e: if attempt < self.max_retries: delay = self.retry_delay * (2 ** attempt) logger.warning( f"API network error, retrying in {delay}s " f"(attempt {attempt + 1}/{self.max_retries + 1}): {e}" ) await asyncio.sleep(delay) continue else: logger.error(f"API request failed after {self.max_retries + 1} attempts: {e}") raise except Exception as e: logger.error(f"Unexpected error in API request: {e}") raise raise Exception(f"API request failed after {self.max_retries + 1} attempts") async def geocode_address(self, address: str, return_full_result: bool = False) -> Optional[Union[Tuple[float, float], Dict[str, Any]]]: """ Geocode an address to LV95 coordinates. Args: address: Address string (e.g. "Bundesplatz 3, 3003 Bern") return_full_result: If True, returns dict with coords and full result. If False, returns just coords tuple. Returns: If return_full_result=False: Tuple of (x, y) in LV95 coordinates, or None if not found If return_full_result=True: Dict with 'coords' (x, y tuple) and 'result' (full geocoding result), or None if not found """ try: # Normalize address: ensure postal code is properly formatted # Swiss postal codes are 4 digits, often separated from street normalized_address = address.strip() # Try to detect and format postal code pattern: "Street Number POSTALCODE" # Pattern: ends with 4 digits (postal code) postal_code_pattern = r'\b(\d{4})\b' postal_code_match = re.search(postal_code_pattern, normalized_address) if postal_code_match: postal_code = postal_code_match.group(1) # Format as "Street Number, POSTALCODE" for better geocoding # This helps the API understand the structure better parts = normalized_address.split() if len(parts) >= 2 and parts[-1] == postal_code: # Already ends with postal code, format with comma street_part = " ".join(parts[:-1]) normalized_address = f"{street_part}, {postal_code}" logger.debug(f"Normalized address format: '{normalized_address}'") # Request multiple results to find the best match # Use "address" origin to prioritize address results over parcels params = { "searchText": normalized_address, "type": "locations", "origins": "address", # Prioritize address results "sr": "2056", # Request coordinates in LV95 (EPSG:2056) "limit": 10 # Get more results to find best match } logger.info(f"Geocoding address: {address} (normalized: {normalized_address})") response = await self._make_request(self.GEOCODING_URL, params) results = response.get("results", []) if not results: logger.warning(f"No geocoding results for address: {address}") return None # Normalize input address for comparison input_normalized = address.lower().strip() input_parts = [p.strip() for p in input_normalized.split() if p.strip()] # Try to find the best matching result best_match = None best_score = 0 for result in results: # Extract address label from result (remove HTML tags) result_label_raw = result.get("label", "") # Remove HTML tags like ... result_label = re.sub(r'<[^>]+>', '', result_label_raw).lower().strip() attrs = result.get("attrs", {}) # Extract address components street = attrs.get("strname", "") if isinstance(street, list): street = street[0] if street else "" house_number = str(attrs.get("deinr", "")).strip() plz = str(attrs.get("dplz4", "")).strip() municipality = attrs.get("dplzname", "") or attrs.get("ggdename", "") # Build address components for comparison result_parts = [] if street: result_parts.append(street.lower()) if house_number: result_parts.append(house_number) if plz: result_parts.append(plz) if municipality: # Remove canton suffix municipality_clean = municipality.split("(")[0].strip().split() if len(municipality_clean) > 1 and len(municipality_clean[-1]) == 2: municipality_clean = municipality_clean[:-1] result_parts.append(" ".join(municipality_clean).lower()) # Also use the label for matching (it often has the full address) if result_label: label_parts = [p.strip() for p in result_label.split() if len(p.strip()) > 2] result_parts.extend(label_parts) # Calculate match score score = 0 matched_parts = 0 # Check if input parts match result parts for input_part in input_parts: for result_part in result_parts: if input_part in result_part or result_part in input_part: score += 1 matched_parts += 1 break # Prefer results with higher match score if matched_parts > 0: score = matched_parts / max(len(input_parts), len(result_parts)) logger.debug(f"Geocoding result match: label='{result_label}', score={score:.2f}, matched={matched_parts}/{len(input_parts)}") if score > best_score: best_score = score best_match = result # Use best match or fall back to first result result = best_match if best_match else results[0] if best_match: logger.info(f"Selected best match with score {best_score:.2f}: {result.get('label', 'Unknown')}") else: logger.info(f"Using first result: {result.get('label', 'Unknown')}") logger.debug(f"Geocoding result: {result}") coords = self._extract_coordinates_from_search_result(result) if coords: x, y = coords logger.info(f"Geocoded address '{address}' to LV95 coordinates: ({x}, {y})") if return_full_result: return { 'coords': coords, 'result': result } else: return coords else: logger.warning(f"No coordinates found in geocoding result. Result: {result}") return None except Exception as e: logger.error(f"Error geocoding address '{address}': {e}", exc_info=True) return None async def search_parcel( self, location: str, tolerance: int = 5 # Reduced default tolerance for more precise parcel matching ) -> Optional[Dict[str, Any]]: """ Search for parcel by address or coordinates using MapServer identify. Args: location: Either coordinates as "x,y" (LV95) or address string tolerance: Tolerance in pixels for identify operation Returns: Parcel information dictionary with geocoded address info, or None if not found """ try: original_location = location geocoded_address_info = None geocoded_coords = None # Try to parse as coordinates first parts = location.split(",") if len(parts) == 2: try: x = float(parts[0].strip()) y = float(parts[1].strip()) logger.info(f"Parsed location as coordinates: ({x}, {y})") # Validate coordinates self._validate_coordinates(x, y) except ValueError: # Not coordinates, try geocoding - get full result geocode_result = await self.geocode_address(location, return_full_result=True) if geocode_result is None: logger.warning(f"Could not geocode location: {location}") return None x, y = geocode_result['coords'] geocoded_address_info = geocode_result['result'] else: # Treat as address and geocode - get full result including address info geocode_result = await self.geocode_address(location, return_full_result=True) if geocode_result is None: logger.warning(f"Could not geocode location: {location}") return None x, y = geocode_result['coords'] geocoded_address_info = geocode_result['result'] # Log the geocoded address info if geocoded_address_info: label_raw = geocoded_address_info.get('label', '') label_clean = re.sub(r'<[^>]+>', '', label_raw).strip() logger.info(f"Geocoded address info: {label_clean}") # First, query the building layer with minimal tolerance to get the exact building # This is critical for precise map clicks - we want the exact building, not nearby ones is_coordinate_search = len(parts) == 2 and not geocoded_address_info # Use 1 pixel tolerance (minimum) for coordinate clicks to ensure we get results building_tolerance = 1 if is_coordinate_search else 10 building_result = await self._query_building_layer(x, y, tolerance=building_tolerance, buffer=25) building_parcel_id = None if building_result: building_attrs = building_result.get("attributes", {}) building_parcel_id = building_attrs.get("lparz") logger.debug(f"Found building with parcel ID: {building_parcel_id}") # Use MapServer identify to get parcel info # For coordinate searches, use minimal tolerance (1 pixel) for spatial containment # Note: tolerance=0 might be too strict, use 1 pixel minimum parcel_tolerance = 1 if is_coordinate_search else tolerance extent_buffer = 1000 # 1km buffer map_extent = f"{x - extent_buffer},{y - extent_buffer},{x + extent_buffer},{y + extent_buffer}" params = { "geometry": f"{x},{y}", "geometryType": "esriGeometryPoint", "sr": "2056", # LV95 "layers": self.LAYER_AMTLICHE_VERMESSUNG, "tolerance": parcel_tolerance, "mapExtent": map_extent, "imageDisplay": "800,600,96", "returnGeometry": "true", "f": "json" } logger.info(f"Querying parcel at coordinates: ({x}, {y}) with tolerance: {parcel_tolerance}") response = await self._make_request(self.MAPSERVER_IDENTIFY_URL, params) results = response.get("results", []) if not results: logger.warning(f"No parcel found at coordinates: ({x}, {y})") return None # Strategy 1: If we have a building parcel ID, find the matching parcel parcel_data = None if building_parcel_id: building_parcel_id_normalized = str(building_parcel_id).strip().upper() for result in results: result_label = str(result.get('attributes', {}).get('label', '')).strip().upper() result_number = str(result.get('attributes', {}).get('number', '')).strip().upper() if (result_label == building_parcel_id_normalized or result_number == building_parcel_id_normalized or result_label.endswith(building_parcel_id_normalized) or building_parcel_id_normalized in result_label): parcel_data = result logger.info(f"Found matching parcel by building parcel ID: {result_label} (building had: {building_parcel_id})") break # Strategy 2: For coordinate searches, always check spatial containment (point-in-polygon) # This ensures we get the parcel that actually contains the point, not just the nearest one # We MUST use spatial containment for coordinate searches - no legacy fallback if is_coordinate_search: logger.debug("Checking spatial containment for coordinate search") containing_parcels = [] for result in results: geometry = result.get('geometry', {}) if not geometry: continue # Extract polygon rings from geometry rings = geometry.get('rings', []) if rings: try: # Check if point is inside this parcel's polygon if self._point_in_polygon(x, y, rings): containing_parcels.append(result) parcel_label = result.get('attributes', {}).get('label', 'Unknown') logger.debug(f"Point is inside parcel: {parcel_label}") except Exception as e: logger.debug(f"Error checking point-in-polygon for parcel {result.get('attributes', {}).get('label', 'Unknown')}: {e}") continue if not containing_parcels: # No parcels contain the point - this should not happen for valid coordinates logger.warning(f"No parcels contain point ({x}, {y}) - this may indicate a data issue") return None # If we already have a parcel from building match, verify it's in containing parcels if parcel_data: parcel_label = parcel_data.get('attributes', {}).get('label', 'Unknown') parcel_in_containing = any( p.get('attributes', {}).get('label') == parcel_label for p in containing_parcels ) if not parcel_in_containing: logger.warning(f"Building-matched parcel {parcel_label} does not contain point, using spatial containment instead") parcel_data = None # If no parcel yet or building match was invalid, use spatial containment if not parcel_data: if building_parcel_id: # Prefer parcel matching building parcel ID among containing parcels building_parcel_id_normalized = str(building_parcel_id).strip().upper() for parcel in containing_parcels: result_label = str(parcel.get('attributes', {}).get('label', '')).strip().upper() if (result_label == building_parcel_id_normalized or result_label.endswith(building_parcel_id_normalized) or building_parcel_id_normalized in result_label): parcel_data = parcel logger.info(f"Found matching containing parcel by building parcel ID: {result_label}") break # If no building match, use first containing parcel if not parcel_data: parcel_data = containing_parcels[0] parcel_label = parcel_data.get('attributes', {}).get('label', 'Unknown') logger.info(f"Found parcel by spatial containment: {parcel_label} (from {len(containing_parcels)} containing parcels)") # Fallback for address searches only (not coordinate searches) if not parcel_data and not is_coordinate_search: parcel_data = results[0] if building_parcel_id: logger.debug(f"Could not find parcel matching building parcel ID {building_parcel_id}, using first result: {parcel_data.get('attributes', {}).get('label', 'Unknown')}") parcel_label = parcel_data.get('attributes', {}).get('label', 'Unknown') logger.info(f"Found parcel: {parcel_label}") # Store the query coordinates (used for address lookup, not parcel centroid) parcel_data['query_coordinates'] = {'x': x, 'y': y} # Attach geocoded address info to parcel_data for use by the route if geocoded_address_info: # Extract address components from geocoded result attrs = geocoded_address_info.get('attrs', {}) # Get label and clean HTML tags first label_raw = geocoded_address_info.get('label', '') label_clean = re.sub(r'<[^>]+>', '', label_raw).strip() # Extract address components - try multiple sources street_list = attrs.get("strname", []) street = street_list[0] if isinstance(street_list, list) and street_list else None # House number can be in attrs.deinr or top-level num house_number = attrs.get("deinr") or geocoded_address_info.get("num") if house_number: house_number = str(house_number).strip() # Postal code plz = attrs.get("dplz4") if plz: plz = str(plz).strip() # Municipality municipality = attrs.get("dplzname") or attrs.get("ggdename") # If street not found in attrs, try to parse from label if not street and label_clean: # Parse "Street Number" from label (e.g., "Ueberlandstrasse 11 8050 Zürich") label_parts = label_clean.split() if len(label_parts) >= 2: # Try to find street name (everything before the number) for i in range(len(label_parts) - 1, 0, -1): if label_parts[i].isdigit(): street = " ".join(label_parts[:i]) break # If postal code not found, try to extract from label if not plz and label_clean: # Look for 4-digit postal code in label postal_match = re.search(r'\b(\d{4})\b', label_clean) if postal_match: plz = postal_match.group(1) # If municipality not found, try to extract from label if not municipality and label_clean: # Municipality is usually after postal code # Format: "Street Number POSTALCODE Municipality" label_parts = label_clean.split() if len(label_parts) >= 2: # Find postal code position and take what comes after for i, part in enumerate(label_parts): if len(part) == 4 and part.isdigit(): if i + 1 < len(label_parts): municipality = " ".join(label_parts[i+1:]) break # Clean municipality name if municipality: municipality = self._clean_municipality_name(municipality) # Construct full address - prefer label_clean, but build from components if needed if label_clean: full_address = label_clean elif street and house_number and plz and municipality: full_address = f"{street} {house_number}, {plz} {municipality}" elif street and house_number: full_address = f"{street} {house_number}" if plz: full_address += f", {plz}" if municipality: full_address += f" {municipality}" else: full_address = label_clean or original_location parcel_data['geocoded_address'] = { 'label': label_clean, 'street': street, 'house_number': house_number, 'plz': plz, 'municipality': municipality, 'full_address': full_address } logger.debug(f"Attached geocoded address to parcel: {full_address}") logger.debug(f"Address components - street: {street}, house_number: {house_number}, plz: {plz}, municipality: {municipality}") # Log warning if geocoded address doesn't match input (for debugging) if geocoded_address_info and original_location and any(c.isalpha() for c in original_location): label_raw = geocoded_address_info.get('label', '') geocoded_label = re.sub(r'<[^>]+>', '', label_raw).lower().strip() original_lower = original_location.lower() # Check if key parts match original_parts = set(p.strip() for p in original_lower.split() if len(p.strip()) > 2) geocoded_parts = set(p.strip() for p in geocoded_label.split() if len(p.strip()) > 2) # Check if street name and number are similar match_score = len(original_parts.intersection(geocoded_parts)) / max(len(original_parts), 1) if match_score < 0.5: logger.warning( f"Geocoded address may not match input! " f"Input: '{original_location}' -> Geocoded: '{geocoded_label}' " f"(match score: {match_score:.2f})" ) return parcel_data except Exception as e: logger.error(f"Error searching parcel for location '{location}': {e}") raise def extract_parcel_attributes(self, parcel_data: Dict[str, Any]) -> Dict[str, Any]: """ Extract and normalize parcel attributes from MapServer response. Args: parcel_data: Raw parcel data from MapServer identify Returns: Dictionary with normalized parcel attributes compatible with Parzelle model """ attributes = parcel_data.get("attributes", {}) geometry = parcel_data.get("geometry") # Extract common parcel attributes # Note: Attribute names depend on the actual data structure from the API # These are common fields, adjust based on actual API response extracted = { "label": attributes.get("label") or attributes.get("parzellennummer") or attributes.get("nummer") or "Unknown", "parzellenAliasTags": [], "strasseNr": attributes.get("adresse") or attributes.get("strasse"), "plz": attributes.get("plz"), "eigentuemerschaft": attributes.get("eigentuemer") or attributes.get("eigentuemerschaft"), "bauzone": attributes.get("bauzone") or attributes.get("zonierung"), "perimeter": self._convert_geometry_to_geopolylinie(geometry) if geometry else None, "kontextGemeinde": attributes.get("gemeinde") or attributes.get("gemeindename"), # Additional metadata "raw_attributes": attributes # Keep raw data for reference } return extracted async def get_parcel_polygon( self, gemeinde: str, parzellen_nr: str, sr: int = 2056 ) -> Optional[Dict[str, Any]]: """ Holt die vollständige Polygon-Geometrie einer Parzelle nach Gemeinde und Parzellennummer. Args: gemeinde: Name der Gemeinde (z.B. "Bern") parzellen_nr: Parzellennummer (z.B. "1234") sr: Koordinatensystem (2056=LV95, 4326=WGS84) Returns: GeoJSON-Feature mit Polygon-Koordinaten oder None wenn nicht gefunden """ try: # Schritt 1: Parzelle suchen search_params = { "searchText": f"{gemeinde} {parzellen_nr}", "type": "locations", "origins": "parcel", "sr": str(sr) } logger.info(f"Searching for parcel: {gemeinde} {parzellen_nr}") response = await self._make_request(self.GEOCODING_URL, search_params) search_results = response.get("results", []) if not search_results: logger.warning(f"Parzelle {gemeinde} {parzellen_nr} nicht gefunden") return None # Extract coordinates using shared helper method search_result = search_results[0] coords = self._extract_coordinates_from_search_result(search_result) if coords is None: logger.warning(f"Could not extract coordinates from search result") return None x, y = coords logger.info(f"Parzelle gefunden: {search_result.get('label', 'Unknown')}, Zentrum: E={x}, N={y}") # Schritt 2: Polygon-Geometrie abrufen identify_params = { "geometry": f"{x},{y}", "geometryType": "esriGeometryPoint", "layers": self.LAYER_AMTLICHE_VERMESSUNG, "tolerance": "0", "returnGeometry": "true", "geometryFormat": "geojson", "sr": str(sr), "imageDisplay": "0,0,0", "mapExtent": "0,0,0,0" } logger.info(f"Fetching polygon geometry for parcel at ({x}, {y})") identify_response = await self._make_request(self.MAPSERVER_IDENTIFY_URL, identify_params) identify_results = identify_response.get("results", []) if not identify_results: logger.warning("Keine Geometrie gefunden") return None # Return the GeoJSON feature feature = identify_results[0] logger.info(f"Successfully retrieved polygon geometry with {len(feature.get('geometry', {}).get('coordinates', [[]])[0])} points") return feature except Exception as e: logger.error(f"Error getting parcel polygon for {gemeinde} {parzellen_nr}: {e}", exc_info=True) return None def extract_boundary_points(self, feature: Dict[str, Any]) -> List[List[float]]: """ Extrahiert alle Grenzpunkte aus dem GeoJSON-Feature. Args: feature: GeoJSON Feature mit Polygon-Geometrie Returns: Liste von Koordinatenpaaren [[x, y], ...] in LV95 """ try: geometry = feature.get("geometry", {}) if geometry.get("type") == "Polygon": # Äusserer Ring = erste Liste von Koordinaten return geometry["coordinates"][0] elif geometry.get("type") == "MultiPolygon": # Alle Ringe aller Polygone all_points = [] for polygon in geometry["coordinates"]: all_points.extend(polygon[0]) return all_points return [] except Exception as e: logger.error(f"Error extracting boundary points: {e}") return [] async def find_neighboring_parcels( self, parcel_data: Dict[str, Any], selected_parcel_id: str, sample_distance: float = 20.0, # Balanced distance for good coverage max_sample_points: int = 30, # Increased to ensure all vertices + some intermediate points max_neighbors: int = 15, # Increased to find more neighbors max_concurrent: int = 50 # Process up to 50 queries concurrently (maximum parallelization) ) -> List[Dict[str, Any]]: """ Find all parcels that touch the boundary of the selected parcel. This method samples points along the parcel boundary and queries parcels at each point to find all neighboring parcels that actually touch the boundary. Optimized for speed with intelligent sampling and early stopping. Args: parcel_data: The selected parcel's data dictionary (must contain geometry) selected_parcel_id: The ID of the selected parcel (to exclude from results) sample_distance: Distance in meters between sample points along the boundary (default: 15m) max_sample_points: Maximum number of sample points to query (default: 30) max_neighbors: Stop early if we find this many unique neighbors (default: 15) Returns: List of neighboring parcel dictionaries with id, egrid, and number """ try: geometry = parcel_data.get("geometry", {}) if not geometry: logger.warning("No geometry found in parcel_data for finding neighbors") return [] # Extract boundary points from geometry boundary_points = [] # Handle ESRI format (rings) if "rings" in geometry and geometry["rings"]: ring = geometry["rings"][0] # Outer ring for coord in ring: if len(coord) >= 2: boundary_points.append((coord[0], coord[1])) # Handle GeoJSON format elif geometry.get("type") == "Polygon": coordinates = geometry.get("coordinates", []) if coordinates and len(coordinates) > 0: ring = coordinates[0] # Outer ring for coord in ring: if len(coord) >= 2: boundary_points.append((coord[0], coord[1])) if not boundary_points: logger.warning("No boundary points found in parcel geometry") return [] # Sample points along the boundary, offset outward # Increased offset to ensure we're outside the selected parcel offset_distance = 8.0 # meters - increased from 2.0 to ensure we're outside sampled_points = [] # Calculate centroid for determining outward direction centroid_x = sum(p[0] for p in boundary_points) / len(boundary_points) centroid_y = sum(p[1] for p in boundary_points) / len(boundary_points) # First pass: Always include ALL vertices (critical for finding all neighbors) # Don't limit vertices - they're essential for coverage edge_info = [] # Store edge information for intermediate sampling for i in range(len(boundary_points)): p1 = boundary_points[i] p2 = boundary_points[(i + 1) % len(boundary_points)] # Calculate edge vector edge_dx = p2[0] - p1[0] edge_dy = p2[1] - p1[1] edge_length = (edge_dx**2 + edge_dy**2)**0.5 if edge_length > 0: # Calculate perpendicular vector (pointing outward) perp_dx = -edge_dy / edge_length perp_dy = edge_dx / edge_length # Check if this points outward (away from centroid) to_point_dx = p1[0] - centroid_x to_point_dy = p1[1] - centroid_y if (perp_dx * to_point_dx + perp_dy * to_point_dy) < 0: perp_dx = -perp_dx perp_dy = -perp_dy # Offset vertex point outward offset_p1 = (p1[0] + perp_dx * offset_distance, p1[1] + perp_dy * offset_distance) sampled_points.append(offset_p1) # Store edge info for intermediate sampling edge_info.append({ 'p1': p1, 'edge_length': edge_length, 'perp_dx': perp_dx, 'perp_dy': perp_dy, 'edge_dx': edge_dx, 'edge_dy': edge_dy }) # Second pass: Add intermediate points for edges longer than sample_distance # This ensures we don't miss neighbors that touch in the middle of long edges for edge in edge_info: if len(sampled_points) >= max_sample_points: break # Add intermediate points for edges longer than sample_distance if edge['edge_length'] > sample_distance: # Calculate how many intermediate points we need num_samples = int(edge['edge_length'] / sample_distance) # Limit to reasonable number per edge (max 3) to avoid too many points num_samples = min(num_samples, 3) for j in range(1, num_samples + 1): if len(sampled_points) >= max_sample_points: break t = j / (num_samples + 1) # Interpolate along edge interp_x = edge['p1'][0] + t * edge['edge_dx'] interp_y = edge['p1'][1] + t * edge['edge_dy'] # Offset outward offset_x = interp_x + edge['perp_dx'] * offset_distance offset_y = interp_y + edge['perp_dy'] * offset_distance sampled_points.append((offset_x, offset_y)) # Don't limit vertices, but limit total if we have too many intermediate points # This ensures we always have all vertices for complete coverage if len(sampled_points) > max_sample_points: # Keep all vertices, limit intermediate points vertex_count = len(boundary_points) if vertex_count <= max_sample_points: # Keep vertices + some intermediate points sampled_points = sampled_points[:max_sample_points] else: # Too many vertices - keep all vertices anyway (they're critical) sampled_points = sampled_points[:len(sampled_points)] # Query parcels in parallel batches for much faster performance neighboring_parcels = {} tolerance = 2 # Small tolerance for boundary queries async def query_point(point: Tuple[float, float]) -> Optional[Dict[str, Any]]: """Query a single point and return parcel data if found.""" try: x, y = point location_str = f"{x},{y}" adj_data = await self.search_parcel(location_str, tolerance=tolerance) if adj_data: adj_attrs = adj_data.get("attributes", {}) adj_id = adj_attrs.get("label") or adj_attrs.get("number") # Exclude the selected parcel itself if adj_id and adj_id != selected_parcel_id: # Extract geometry information adj_geometry = adj_data.get("geometry", {}) extracted_attrs = self.extract_parcel_attributes(adj_data) return { "id": adj_id, "egrid": adj_attrs.get("egris_egrid"), "number": adj_attrs.get("number"), "perimeter": extracted_attrs.get("perimeter"), "geometry": adj_geometry } except Exception: # Silently skip errors for speed pass return None # Process ALL queries concurrently for maximum speed # Use semaphore to limit concurrent connections to avoid overwhelming the API semaphore = asyncio.Semaphore(max_concurrent) async def query_point_with_semaphore(point: Tuple[float, float]) -> Optional[Dict[str, Any]]: """Query a point with semaphore-controlled concurrency.""" async with semaphore: return await query_point(point) # Launch all queries concurrently (semaphore will limit actual concurrency) results = await asyncio.gather(*[query_point_with_semaphore(point) for point in sampled_points], return_exceptions=True) # Process all results (minimal logging for speed) for result in results: if isinstance(result, Exception): continue if result and result["id"] not in neighboring_parcels: neighboring_parcels[result["id"]] = result # Early stop if we've found enough neighbors if len(neighboring_parcels) >= max_neighbors: break result_list = list(neighboring_parcels.values()) logger.info(f"Found {len(result_list)} neighboring parcels for parcel {selected_parcel_id} (queried {len(sampled_points)} points)") return result_list except Exception as e: logger.error(f"Error finding neighboring parcels: {e}", exc_info=True) return [] def _convert_geometry_to_geopolylinie(self, geometry: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Convert ESRI geometry or GeoJSON to GeoPolylinie format. Args: geometry: ESRI geometry (rings) or GeoJSON geometry from MapServer response Returns: GeoPolylinie-compatible dictionary or None """ try: # Handle GeoJSON format (from get_parcel_polygon) if geometry.get("type") == "Polygon": coordinates = geometry.get("coordinates", []) if coordinates and len(coordinates) > 0: ring = coordinates[0] # Outer ring punkte = [] for coord in ring: if len(coord) >= 2: punkt = { "koordinatensystem": "LV95", "x": coord[0], # GeoJSON: [x, y] = [easting, northing] "y": coord[1], "z": coord[2] if len(coord) > 2 else None } punkte.append(punkt) return { "closed": True, "punkte": punkte } # Handle ESRI format (rings) - existing code elif "rings" in geometry and geometry["rings"]: ring = geometry["rings"][0] # Take first ring (outer boundary) punkte = [] for coord in ring: if len(coord) >= 2: punkt = { "koordinatensystem": "LV95", "x": coord[0], "y": coord[1], "z": coord[2] if len(coord) > 2 else None } punkte.append(punkt) return { "closed": True, "punkte": punkte } return None except Exception as e: logger.error(f"Error converting geometry to GeoPolylinie: {e}") return None