1206 lines
55 KiB
Python
1206 lines
55 KiB
Python
"""
|
|
Swiss Topo MapServer Connector (Simplified)
|
|
|
|
This connector handles interactions with the Swiss Federal Office of Topography
|
|
MapServer identify endpoint for parcel data retrieval.
|
|
|
|
Endpoint: https://api3.geo.admin.ch/rest/services/api/MapServer/identify
|
|
"""
|
|
|
|
import logging
|
|
import asyncio
|
|
import re
|
|
from typing import Dict, List, Any, Optional, Tuple, Union
|
|
import aiohttp
|
|
from modules.shared.configuration import APP_CONFIG
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SwissTopoMapServerConnector:
|
|
"""
|
|
Simplified connector for Swiss Topo MapServer identify endpoint.
|
|
|
|
Provides methods for:
|
|
- Searching parcels by coordinates (LV95)
|
|
- Geocoding addresses to coordinates
|
|
- Extracting parcel information
|
|
"""
|
|
|
|
# API endpoints
|
|
MAPSERVER_IDENTIFY_URL = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify"
|
|
GEOCODING_URL = "https://api3.geo.admin.ch/rest/services/api/SearchServer"
|
|
|
|
# Swiss official survey layer
|
|
LAYER_AMTLICHE_VERMESSUNG = "all:ch.swisstopo-vd.amtliche-vermessung"
|
|
|
|
# Switzerland bounds in LV95 (EPSG:2056)
|
|
SWITZERLAND_BOUNDS = {
|
|
"min_x": 2480000,
|
|
"max_x": 2840000,
|
|
"min_y": 1070000,
|
|
"max_y": 1300000
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
timeout: int = 30,
|
|
max_retries: int = 3,
|
|
retry_delay: float = 1.0
|
|
):
|
|
"""
|
|
Initialize MapServer connector.
|
|
|
|
Args:
|
|
timeout: Request timeout in seconds
|
|
max_retries: Maximum number of retry attempts
|
|
retry_delay: Initial retry delay in seconds (exponential backoff)
|
|
"""
|
|
self.timeout = aiohttp.ClientTimeout(total=timeout)
|
|
self.max_retries = max_retries
|
|
self.retry_delay = retry_delay
|
|
|
|
logger.info("Swiss Topo MapServer Connector initialized")
|
|
|
|
def _point_in_polygon(self, point_x: float, point_y: float, polygon_rings: List[List[List[float]]]) -> bool:
|
|
"""
|
|
Check if a point is inside a polygon using ray casting algorithm.
|
|
|
|
Args:
|
|
point_x: X coordinate of the point
|
|
point_y: Y coordinate of the point
|
|
polygon_rings: List of rings (each ring is a list of [x, y] coordinates)
|
|
First ring is outer boundary, subsequent rings are holes
|
|
|
|
Returns:
|
|
True if point is inside polygon, False otherwise
|
|
"""
|
|
if not polygon_rings or not polygon_rings[0]:
|
|
return False
|
|
|
|
# Use outer ring (first ring)
|
|
ring = polygon_rings[0]
|
|
if len(ring) < 3:
|
|
return False
|
|
|
|
# Ray casting algorithm: count intersections of horizontal ray with polygon edges
|
|
inside = False
|
|
j = len(ring) - 1
|
|
|
|
for i in range(len(ring)):
|
|
xi, yi = ring[i][0], ring[i][1]
|
|
xj, yj = ring[j][0], ring[j][1]
|
|
|
|
# Check if ray crosses edge
|
|
if ((yi > point_y) != (yj > point_y)) and (point_x < (xj - xi) * (point_y - yi) / (yj - yi) + xi):
|
|
inside = not inside
|
|
j = i
|
|
|
|
# If point is inside outer ring, check if it's in any hole
|
|
if inside and len(polygon_rings) > 1:
|
|
for hole_ring in polygon_rings[1:]:
|
|
if self._point_in_polygon(point_x, point_y, [hole_ring]):
|
|
inside = False # Point is in a hole, so not in polygon
|
|
break
|
|
|
|
return inside
|
|
|
|
def _clean_municipality_name(self, municipality: str) -> str:
|
|
"""
|
|
Clean municipality name by removing canton suffix.
|
|
|
|
Args:
|
|
municipality: Municipality name (e.g., "Zürich ZH" or "Lohn (SH)")
|
|
|
|
Returns:
|
|
Cleaned municipality name (e.g., "Zürich" or "Lohn")
|
|
"""
|
|
if not municipality:
|
|
return municipality
|
|
|
|
# Remove parentheses content like "Lohn (SH)" -> "Lohn"
|
|
municipality = municipality.split("(")[0].strip()
|
|
|
|
# Remove canton code at the end if present (e.g., "Wald ZH" -> "Wald")
|
|
parts = municipality.split()
|
|
if len(parts) > 1 and len(parts[-1]) == 2 and parts[-1].isupper():
|
|
municipality = " ".join(parts[:-1])
|
|
|
|
return municipality
|
|
|
|
def _extract_address_from_building_attrs(self, attrs: Dict[str, Any]) -> Dict[str, Optional[str]]:
|
|
"""
|
|
Extract address components from building layer attributes.
|
|
|
|
Args:
|
|
attrs: Attributes dictionary from building layer query
|
|
|
|
Returns:
|
|
Dictionary with street, house_number, plz, municipality, and full_address
|
|
"""
|
|
# Extract address components (note: strname is a list!)
|
|
street_list = attrs.get("strname", [])
|
|
street = street_list[0] if isinstance(street_list, list) and street_list else None
|
|
house_number = attrs.get("deinr")
|
|
plz = attrs.get("dplz4")
|
|
municipality = attrs.get("dplzname") or attrs.get("ggdename")
|
|
|
|
# Clean municipality name
|
|
if municipality:
|
|
municipality = self._clean_municipality_name(municipality)
|
|
|
|
# Construct full address
|
|
full_address = None
|
|
if street and house_number and plz and municipality:
|
|
full_address = f"{street} {house_number}, {plz} {municipality}"
|
|
elif street and house_number:
|
|
full_address = f"{street} {house_number}"
|
|
if plz:
|
|
full_address += f", {plz}"
|
|
if municipality:
|
|
full_address += f" {municipality}"
|
|
|
|
return {
|
|
'street': street,
|
|
'house_number': str(house_number) if house_number else None,
|
|
'plz': str(plz) if plz else None,
|
|
'municipality': municipality,
|
|
'full_address': full_address
|
|
}
|
|
|
|
async def _query_building_layer(
|
|
self,
|
|
x: float,
|
|
y: float,
|
|
tolerance: int = 1, # Default to 1 pixel (minimum) for maximum precision
|
|
buffer: int = 25 # Reduced buffer for more precise queries
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Query the building/address layer at given coordinates.
|
|
|
|
Args:
|
|
x: X coordinate (LV95)
|
|
y: Y coordinate (LV95)
|
|
tolerance: Tolerance in pixels for identify operation
|
|
buffer: Buffer in meters for map extent
|
|
|
|
Returns:
|
|
First building result dictionary, or None if not found
|
|
"""
|
|
try:
|
|
building_extent = f"{x - buffer},{y - buffer},{x + buffer},{y + buffer}"
|
|
|
|
building_params = {
|
|
"geometry": f"{x},{y}",
|
|
"geometryType": "esriGeometryPoint",
|
|
"sr": "2056",
|
|
"layers": "all:ch.bfs.gebaeude_wohnungs_register",
|
|
"tolerance": tolerance,
|
|
"mapExtent": building_extent,
|
|
"imageDisplay": "800,600,96",
|
|
"returnGeometry": "false",
|
|
"f": "json"
|
|
}
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
|
|
async with session.get(self.MAPSERVER_IDENTIFY_URL, params=building_params) as response:
|
|
if response.status == 200:
|
|
building_data = await response.json()
|
|
building_results = building_data.get("results", [])
|
|
if building_results:
|
|
return building_results[0]
|
|
return None
|
|
except Exception as e:
|
|
logger.debug(f"Could not query building layer: {e}")
|
|
return None
|
|
|
|
def _validate_coordinates(self, x: float, y: float) -> None:
|
|
"""
|
|
Validate coordinates are within Switzerland bounds (LV95).
|
|
|
|
Args:
|
|
x: East coordinate (LV95)
|
|
y: North coordinate (LV95)
|
|
|
|
Raises:
|
|
ValueError: If coordinates are out of bounds
|
|
"""
|
|
if not (self.SWITZERLAND_BOUNDS["min_x"] <= x <= self.SWITZERLAND_BOUNDS["max_x"]):
|
|
raise ValueError(
|
|
f"X coordinate {x} out of bounds "
|
|
f"({self.SWITZERLAND_BOUNDS['min_x']} - {self.SWITZERLAND_BOUNDS['max_x']})"
|
|
)
|
|
if not (self.SWITZERLAND_BOUNDS["min_y"] <= y <= self.SWITZERLAND_BOUNDS["max_y"]):
|
|
raise ValueError(
|
|
f"Y coordinate {y} out of bounds "
|
|
f"({self.SWITZERLAND_BOUNDS['min_y']} - {self.SWITZERLAND_BOUNDS['max_y']})"
|
|
)
|
|
|
|
def _extract_coordinates_from_search_result(self, result: Dict[str, Any]) -> Optional[Tuple[float, float]]:
|
|
"""
|
|
Extract LV95 coordinates from SearchServer API result.
|
|
Handles different response formats and coordinate swapping.
|
|
|
|
Args:
|
|
result: Search result from SearchServer API
|
|
|
|
Returns:
|
|
Tuple of (x, y) in LV95 coordinates, or None if not found
|
|
"""
|
|
attrs = result.get("attrs", {})
|
|
|
|
# Try multiple extraction patterns
|
|
x = None
|
|
y = None
|
|
|
|
# Pattern 1: From 'geom_st_box2d' (most reliable - has correct X,Y order)
|
|
if 'geom_st_box2d' in attrs:
|
|
bbox_str = attrs['geom_st_box2d']
|
|
match = re.search(r'BOX\(([0-9.]+) ([0-9.]+),([0-9.]+) ([0-9.]+)\)', bbox_str)
|
|
if match:
|
|
xmin, ymin, xmax, ymax = map(float, match.groups())
|
|
x = (xmin + xmax) / 2 # Easting
|
|
y = (ymin + ymax) / 2 # Northing
|
|
logger.debug(f"Extracted coordinates from geom_st_box2d: X={x}, Y={y}")
|
|
# Pattern 2: From attrs with 'x' and 'y' keys (WARNING: these are SWAPPED!)
|
|
# The Swiss Geo Admin API returns x=northing, y=easting (opposite of standard)
|
|
elif 'x' in attrs and 'y' in attrs:
|
|
# SWAP: their 'x' is our Y, their 'y' is our X
|
|
x = attrs.get('y') # Their Y is easting (our X)
|
|
y = attrs.get('x') # Their X is northing (our Y)
|
|
logger.debug(f"Extracted swapped coordinates from attrs: X={x} (from 'y'), Y={y} (from 'x')")
|
|
# Pattern 3: From attrs with 'easting' and 'northing' keys
|
|
elif 'easting' in attrs and 'northing' in attrs:
|
|
x = attrs.get('easting')
|
|
y = attrs.get('northing')
|
|
logger.debug(f"Extracted coordinates from easting/northing: X={x}, Y={y}")
|
|
# Pattern 4: From top-level result (also likely swapped)
|
|
elif 'x' in result and 'y' in result:
|
|
x = result.get('y') # Swap
|
|
y = result.get('x') # Swap
|
|
logger.debug(f"Extracted swapped coordinates from result: X={x}, Y={y}")
|
|
|
|
if x is None or y is None:
|
|
return None
|
|
|
|
# Convert to float
|
|
x = float(x)
|
|
y = float(y)
|
|
|
|
# Validate that coordinates are in LV95 range
|
|
if not (self.SWITZERLAND_BOUNDS["min_x"] <= x <= self.SWITZERLAND_BOUNDS["max_x"]):
|
|
logger.warning(f"Extracted X coordinate {x} is out of LV95 bounds")
|
|
return None
|
|
|
|
if not (self.SWITZERLAND_BOUNDS["min_y"] <= y <= self.SWITZERLAND_BOUNDS["max_y"]):
|
|
logger.warning(f"Extracted Y coordinate {y} is out of LV95 bounds")
|
|
return None
|
|
|
|
return (x, y)
|
|
|
|
async def _make_request(
|
|
self,
|
|
url: str,
|
|
params: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Make HTTP GET request with retry logic and exponential backoff.
|
|
|
|
Args:
|
|
url: API endpoint URL
|
|
params: Query parameters
|
|
|
|
Returns:
|
|
Response JSON data
|
|
|
|
Raises:
|
|
aiohttp.ClientError: If request fails after all retries
|
|
"""
|
|
for attempt in range(self.max_retries + 1):
|
|
try:
|
|
async with aiohttp.ClientSession(timeout=self.timeout) as session:
|
|
async with session.get(url, params=params) as response:
|
|
# Don't retry on 4xx errors (client errors)
|
|
if 400 <= response.status < 500:
|
|
error_text = await response.text()
|
|
logger.error(f"API client error: {response.status} - {error_text}")
|
|
response.raise_for_status()
|
|
|
|
# Retry on 5xx errors (server errors)
|
|
if response.status >= 500:
|
|
if attempt < self.max_retries:
|
|
delay = self.retry_delay * (2 ** attempt)
|
|
logger.warning(
|
|
f"API server error {response.status}, "
|
|
f"retrying in {delay}s (attempt {attempt + 1}/{self.max_retries + 1})"
|
|
)
|
|
await asyncio.sleep(delay)
|
|
continue
|
|
else:
|
|
error_text = await response.text()
|
|
logger.error(f"API request failed after {self.max_retries + 1} attempts: {response.status} - {error_text}")
|
|
response.raise_for_status()
|
|
|
|
return await response.json()
|
|
|
|
except aiohttp.ClientError as e:
|
|
if attempt < self.max_retries:
|
|
delay = self.retry_delay * (2 ** attempt)
|
|
logger.warning(
|
|
f"API network error, retrying in {delay}s "
|
|
f"(attempt {attempt + 1}/{self.max_retries + 1}): {e}"
|
|
)
|
|
await asyncio.sleep(delay)
|
|
continue
|
|
else:
|
|
logger.error(f"API request failed after {self.max_retries + 1} attempts: {e}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in API request: {e}")
|
|
raise
|
|
|
|
raise Exception(f"API request failed after {self.max_retries + 1} attempts")
|
|
|
|
async def geocode_address(self, address: str, return_full_result: bool = False) -> Optional[Union[Tuple[float, float], Dict[str, Any]]]:
|
|
"""
|
|
Geocode an address to LV95 coordinates.
|
|
|
|
Args:
|
|
address: Address string (e.g. "Bundesplatz 3, 3003 Bern")
|
|
return_full_result: If True, returns dict with coords and full result. If False, returns just coords tuple.
|
|
|
|
Returns:
|
|
If return_full_result=False: Tuple of (x, y) in LV95 coordinates, or None if not found
|
|
If return_full_result=True: Dict with 'coords' (x, y tuple) and 'result' (full geocoding result), or None if not found
|
|
"""
|
|
try:
|
|
# Normalize address: ensure postal code is properly formatted
|
|
# Swiss postal codes are 4 digits, often separated from street
|
|
normalized_address = address.strip()
|
|
|
|
# Try to detect and format postal code pattern: "Street Number POSTALCODE"
|
|
# Pattern: ends with 4 digits (postal code)
|
|
postal_code_pattern = r'\b(\d{4})\b'
|
|
postal_code_match = re.search(postal_code_pattern, normalized_address)
|
|
|
|
if postal_code_match:
|
|
postal_code = postal_code_match.group(1)
|
|
# Format as "Street Number, POSTALCODE" for better geocoding
|
|
# This helps the API understand the structure better
|
|
parts = normalized_address.split()
|
|
if len(parts) >= 2 and parts[-1] == postal_code:
|
|
# Already ends with postal code, format with comma
|
|
street_part = " ".join(parts[:-1])
|
|
normalized_address = f"{street_part}, {postal_code}"
|
|
logger.debug(f"Normalized address format: '{normalized_address}'")
|
|
|
|
# Request multiple results to find the best match
|
|
# Use "address" origin to prioritize address results over parcels
|
|
params = {
|
|
"searchText": normalized_address,
|
|
"type": "locations",
|
|
"origins": "address", # Prioritize address results
|
|
"sr": "2056", # Request coordinates in LV95 (EPSG:2056)
|
|
"limit": 10 # Get more results to find best match
|
|
}
|
|
|
|
logger.info(f"Geocoding address: {address} (normalized: {normalized_address})")
|
|
response = await self._make_request(self.GEOCODING_URL, params)
|
|
|
|
results = response.get("results", [])
|
|
if not results:
|
|
logger.warning(f"No geocoding results for address: {address}")
|
|
return None
|
|
|
|
# Normalize input address for comparison
|
|
input_normalized = address.lower().strip()
|
|
input_parts = [p.strip() for p in input_normalized.split() if p.strip()]
|
|
|
|
# Try to find the best matching result
|
|
best_match = None
|
|
best_score = 0
|
|
|
|
for result in results:
|
|
# Extract address label from result (remove HTML tags)
|
|
result_label_raw = result.get("label", "")
|
|
# Remove HTML tags like <b>...</b>
|
|
result_label = re.sub(r'<[^>]+>', '', result_label_raw).lower().strip()
|
|
attrs = result.get("attrs", {})
|
|
|
|
# Extract address components
|
|
street = attrs.get("strname", "")
|
|
if isinstance(street, list):
|
|
street = street[0] if street else ""
|
|
house_number = str(attrs.get("deinr", "")).strip()
|
|
plz = str(attrs.get("dplz4", "")).strip()
|
|
municipality = attrs.get("dplzname", "") or attrs.get("ggdename", "")
|
|
|
|
# Build address components for comparison
|
|
result_parts = []
|
|
if street:
|
|
result_parts.append(street.lower())
|
|
if house_number:
|
|
result_parts.append(house_number)
|
|
if plz:
|
|
result_parts.append(plz)
|
|
if municipality:
|
|
# Remove canton suffix
|
|
municipality_clean = municipality.split("(")[0].strip().split()
|
|
if len(municipality_clean) > 1 and len(municipality_clean[-1]) == 2:
|
|
municipality_clean = municipality_clean[:-1]
|
|
result_parts.append(" ".join(municipality_clean).lower())
|
|
|
|
# Also use the label for matching (it often has the full address)
|
|
if result_label:
|
|
label_parts = [p.strip() for p in result_label.split() if len(p.strip()) > 2]
|
|
result_parts.extend(label_parts)
|
|
|
|
# Calculate match score
|
|
score = 0
|
|
matched_parts = 0
|
|
|
|
# Check if input parts match result parts
|
|
for input_part in input_parts:
|
|
for result_part in result_parts:
|
|
if input_part in result_part or result_part in input_part:
|
|
score += 1
|
|
matched_parts += 1
|
|
break
|
|
|
|
# Prefer results with higher match score
|
|
if matched_parts > 0:
|
|
score = matched_parts / max(len(input_parts), len(result_parts))
|
|
|
|
logger.debug(f"Geocoding result match: label='{result_label}', score={score:.2f}, matched={matched_parts}/{len(input_parts)}")
|
|
|
|
if score > best_score:
|
|
best_score = score
|
|
best_match = result
|
|
|
|
# Use best match or fall back to first result
|
|
result = best_match if best_match else results[0]
|
|
|
|
if best_match:
|
|
logger.info(f"Selected best match with score {best_score:.2f}: {result.get('label', 'Unknown')}")
|
|
else:
|
|
logger.info(f"Using first result: {result.get('label', 'Unknown')}")
|
|
|
|
logger.debug(f"Geocoding result: {result}")
|
|
|
|
coords = self._extract_coordinates_from_search_result(result)
|
|
if coords:
|
|
x, y = coords
|
|
logger.info(f"Geocoded address '{address}' to LV95 coordinates: ({x}, {y})")
|
|
|
|
if return_full_result:
|
|
return {
|
|
'coords': coords,
|
|
'result': result
|
|
}
|
|
else:
|
|
return coords
|
|
else:
|
|
logger.warning(f"No coordinates found in geocoding result. Result: {result}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error geocoding address '{address}': {e}", exc_info=True)
|
|
return None
|
|
|
|
async def search_parcel(
|
|
self,
|
|
location: str,
|
|
tolerance: int = 5 # Reduced default tolerance for more precise parcel matching
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Search for parcel by address or coordinates using MapServer identify.
|
|
|
|
Args:
|
|
location: Either coordinates as "x,y" (LV95) or address string
|
|
tolerance: Tolerance in pixels for identify operation
|
|
|
|
Returns:
|
|
Parcel information dictionary with geocoded address info, or None if not found
|
|
"""
|
|
try:
|
|
original_location = location
|
|
geocoded_address_info = None
|
|
geocoded_coords = None
|
|
|
|
# Try to parse as coordinates first
|
|
parts = location.split(",")
|
|
if len(parts) == 2:
|
|
try:
|
|
x = float(parts[0].strip())
|
|
y = float(parts[1].strip())
|
|
logger.info(f"Parsed location as coordinates: ({x}, {y})")
|
|
# Validate coordinates
|
|
self._validate_coordinates(x, y)
|
|
except ValueError:
|
|
# Not coordinates, try geocoding - get full result
|
|
geocode_result = await self.geocode_address(location, return_full_result=True)
|
|
if geocode_result is None:
|
|
logger.warning(f"Could not geocode location: {location}")
|
|
return None
|
|
x, y = geocode_result['coords']
|
|
geocoded_address_info = geocode_result['result']
|
|
else:
|
|
# Treat as address and geocode - get full result including address info
|
|
geocode_result = await self.geocode_address(location, return_full_result=True)
|
|
if geocode_result is None:
|
|
logger.warning(f"Could not geocode location: {location}")
|
|
return None
|
|
|
|
x, y = geocode_result['coords']
|
|
geocoded_address_info = geocode_result['result']
|
|
|
|
# Log the geocoded address info
|
|
if geocoded_address_info:
|
|
label_raw = geocoded_address_info.get('label', '')
|
|
label_clean = re.sub(r'<[^>]+>', '', label_raw).strip()
|
|
logger.info(f"Geocoded address info: {label_clean}")
|
|
|
|
# First, query the building layer with minimal tolerance to get the exact building
|
|
# This is critical for precise map clicks - we want the exact building, not nearby ones
|
|
is_coordinate_search = len(parts) == 2 and not geocoded_address_info
|
|
# Use 1 pixel tolerance (minimum) for coordinate clicks to ensure we get results
|
|
building_tolerance = 1 if is_coordinate_search else 10
|
|
building_result = await self._query_building_layer(x, y, tolerance=building_tolerance, buffer=25)
|
|
|
|
building_parcel_id = None
|
|
if building_result:
|
|
building_attrs = building_result.get("attributes", {})
|
|
building_parcel_id = building_attrs.get("lparz")
|
|
logger.debug(f"Found building with parcel ID: {building_parcel_id}")
|
|
|
|
# Use MapServer identify to get parcel info
|
|
# For coordinate searches, use minimal tolerance (1 pixel) for spatial containment
|
|
# Note: tolerance=0 might be too strict, use 1 pixel minimum
|
|
parcel_tolerance = 1 if is_coordinate_search else tolerance
|
|
extent_buffer = 1000 # 1km buffer
|
|
map_extent = f"{x - extent_buffer},{y - extent_buffer},{x + extent_buffer},{y + extent_buffer}"
|
|
|
|
params = {
|
|
"geometry": f"{x},{y}",
|
|
"geometryType": "esriGeometryPoint",
|
|
"sr": "2056", # LV95
|
|
"layers": self.LAYER_AMTLICHE_VERMESSUNG,
|
|
"tolerance": parcel_tolerance,
|
|
"mapExtent": map_extent,
|
|
"imageDisplay": "800,600,96",
|
|
"returnGeometry": "true",
|
|
"f": "json"
|
|
}
|
|
|
|
logger.info(f"Querying parcel at coordinates: ({x}, {y}) with tolerance: {parcel_tolerance}")
|
|
response = await self._make_request(self.MAPSERVER_IDENTIFY_URL, params)
|
|
|
|
results = response.get("results", [])
|
|
if not results:
|
|
logger.warning(f"No parcel found at coordinates: ({x}, {y})")
|
|
return None
|
|
|
|
# Strategy 1: If we have a building parcel ID, find the matching parcel
|
|
parcel_data = None
|
|
if building_parcel_id:
|
|
building_parcel_id_normalized = str(building_parcel_id).strip().upper()
|
|
|
|
for result in results:
|
|
result_label = str(result.get('attributes', {}).get('label', '')).strip().upper()
|
|
result_number = str(result.get('attributes', {}).get('number', '')).strip().upper()
|
|
|
|
if (result_label == building_parcel_id_normalized or
|
|
result_number == building_parcel_id_normalized or
|
|
result_label.endswith(building_parcel_id_normalized) or
|
|
building_parcel_id_normalized in result_label):
|
|
parcel_data = result
|
|
logger.info(f"Found matching parcel by building parcel ID: {result_label} (building had: {building_parcel_id})")
|
|
break
|
|
|
|
# Strategy 2: For coordinate searches, always check spatial containment (point-in-polygon)
|
|
# This ensures we get the parcel that actually contains the point, not just the nearest one
|
|
# We MUST use spatial containment for coordinate searches - no legacy fallback
|
|
if is_coordinate_search:
|
|
logger.debug("Checking spatial containment for coordinate search")
|
|
|
|
containing_parcels = []
|
|
|
|
for result in results:
|
|
geometry = result.get('geometry', {})
|
|
if not geometry:
|
|
continue
|
|
|
|
# Extract polygon rings from geometry
|
|
rings = geometry.get('rings', [])
|
|
if rings:
|
|
try:
|
|
# Check if point is inside this parcel's polygon
|
|
if self._point_in_polygon(x, y, rings):
|
|
containing_parcels.append(result)
|
|
parcel_label = result.get('attributes', {}).get('label', 'Unknown')
|
|
logger.debug(f"Point is inside parcel: {parcel_label}")
|
|
except Exception as e:
|
|
logger.debug(f"Error checking point-in-polygon for parcel {result.get('attributes', {}).get('label', 'Unknown')}: {e}")
|
|
continue
|
|
|
|
if not containing_parcels:
|
|
# No parcels contain the point - this should not happen for valid coordinates
|
|
logger.warning(f"No parcels contain point ({x}, {y}) - this may indicate a data issue")
|
|
return None
|
|
|
|
# If we already have a parcel from building match, verify it's in containing parcels
|
|
if parcel_data:
|
|
parcel_label = parcel_data.get('attributes', {}).get('label', 'Unknown')
|
|
parcel_in_containing = any(
|
|
p.get('attributes', {}).get('label') == parcel_label
|
|
for p in containing_parcels
|
|
)
|
|
if not parcel_in_containing:
|
|
logger.warning(f"Building-matched parcel {parcel_label} does not contain point, using spatial containment instead")
|
|
parcel_data = None
|
|
|
|
# If no parcel yet or building match was invalid, use spatial containment
|
|
if not parcel_data:
|
|
if building_parcel_id:
|
|
# Prefer parcel matching building parcel ID among containing parcels
|
|
building_parcel_id_normalized = str(building_parcel_id).strip().upper()
|
|
for parcel in containing_parcels:
|
|
result_label = str(parcel.get('attributes', {}).get('label', '')).strip().upper()
|
|
if (result_label == building_parcel_id_normalized or
|
|
result_label.endswith(building_parcel_id_normalized) or
|
|
building_parcel_id_normalized in result_label):
|
|
parcel_data = parcel
|
|
logger.info(f"Found matching containing parcel by building parcel ID: {result_label}")
|
|
break
|
|
|
|
# If no building match, use first containing parcel
|
|
if not parcel_data:
|
|
parcel_data = containing_parcels[0]
|
|
parcel_label = parcel_data.get('attributes', {}).get('label', 'Unknown')
|
|
logger.info(f"Found parcel by spatial containment: {parcel_label} (from {len(containing_parcels)} containing parcels)")
|
|
|
|
# Fallback for address searches only (not coordinate searches)
|
|
if not parcel_data and not is_coordinate_search:
|
|
parcel_data = results[0]
|
|
if building_parcel_id:
|
|
logger.debug(f"Could not find parcel matching building parcel ID {building_parcel_id}, using first result: {parcel_data.get('attributes', {}).get('label', 'Unknown')}")
|
|
|
|
parcel_label = parcel_data.get('attributes', {}).get('label', 'Unknown')
|
|
logger.info(f"Found parcel: {parcel_label}")
|
|
|
|
# Store the query coordinates (used for address lookup, not parcel centroid)
|
|
parcel_data['query_coordinates'] = {'x': x, 'y': y}
|
|
|
|
# Attach geocoded address info to parcel_data for use by the route
|
|
if geocoded_address_info:
|
|
# Extract address components from geocoded result
|
|
attrs = geocoded_address_info.get('attrs', {})
|
|
|
|
# Get label and clean HTML tags first
|
|
label_raw = geocoded_address_info.get('label', '')
|
|
label_clean = re.sub(r'<[^>]+>', '', label_raw).strip()
|
|
|
|
# Extract address components - try multiple sources
|
|
street_list = attrs.get("strname", [])
|
|
street = street_list[0] if isinstance(street_list, list) and street_list else None
|
|
|
|
# House number can be in attrs.deinr or top-level num
|
|
house_number = attrs.get("deinr") or geocoded_address_info.get("num")
|
|
if house_number:
|
|
house_number = str(house_number).strip()
|
|
|
|
# Postal code
|
|
plz = attrs.get("dplz4")
|
|
if plz:
|
|
plz = str(plz).strip()
|
|
|
|
# Municipality
|
|
municipality = attrs.get("dplzname") or attrs.get("ggdename")
|
|
|
|
# If street not found in attrs, try to parse from label
|
|
if not street and label_clean:
|
|
# Parse "Street Number" from label (e.g., "Ueberlandstrasse 11 8050 Zürich")
|
|
label_parts = label_clean.split()
|
|
if len(label_parts) >= 2:
|
|
# Try to find street name (everything before the number)
|
|
for i in range(len(label_parts) - 1, 0, -1):
|
|
if label_parts[i].isdigit():
|
|
street = " ".join(label_parts[:i])
|
|
break
|
|
|
|
# If postal code not found, try to extract from label
|
|
if not plz and label_clean:
|
|
# Look for 4-digit postal code in label
|
|
postal_match = re.search(r'\b(\d{4})\b', label_clean)
|
|
if postal_match:
|
|
plz = postal_match.group(1)
|
|
|
|
# If municipality not found, try to extract from label
|
|
if not municipality and label_clean:
|
|
# Municipality is usually after postal code
|
|
# Format: "Street Number POSTALCODE Municipality"
|
|
label_parts = label_clean.split()
|
|
if len(label_parts) >= 2:
|
|
# Find postal code position and take what comes after
|
|
for i, part in enumerate(label_parts):
|
|
if len(part) == 4 and part.isdigit():
|
|
if i + 1 < len(label_parts):
|
|
municipality = " ".join(label_parts[i+1:])
|
|
break
|
|
|
|
# Clean municipality name
|
|
if municipality:
|
|
municipality = self._clean_municipality_name(municipality)
|
|
|
|
# Construct full address - prefer label_clean, but build from components if needed
|
|
if label_clean:
|
|
full_address = label_clean
|
|
elif street and house_number and plz and municipality:
|
|
full_address = f"{street} {house_number}, {plz} {municipality}"
|
|
elif street and house_number:
|
|
full_address = f"{street} {house_number}"
|
|
if plz:
|
|
full_address += f", {plz}"
|
|
if municipality:
|
|
full_address += f" {municipality}"
|
|
else:
|
|
full_address = label_clean or original_location
|
|
|
|
parcel_data['geocoded_address'] = {
|
|
'label': label_clean,
|
|
'street': street,
|
|
'house_number': house_number,
|
|
'plz': plz,
|
|
'municipality': municipality,
|
|
'full_address': full_address
|
|
}
|
|
|
|
logger.debug(f"Attached geocoded address to parcel: {full_address}")
|
|
logger.debug(f"Address components - street: {street}, house_number: {house_number}, plz: {plz}, municipality: {municipality}")
|
|
|
|
# Log warning if geocoded address doesn't match input (for debugging)
|
|
if geocoded_address_info and original_location and any(c.isalpha() for c in original_location):
|
|
label_raw = geocoded_address_info.get('label', '')
|
|
geocoded_label = re.sub(r'<[^>]+>', '', label_raw).lower().strip()
|
|
original_lower = original_location.lower()
|
|
|
|
# Check if key parts match
|
|
original_parts = set(p.strip() for p in original_lower.split() if len(p.strip()) > 2)
|
|
geocoded_parts = set(p.strip() for p in geocoded_label.split() if len(p.strip()) > 2)
|
|
|
|
# Check if street name and number are similar
|
|
match_score = len(original_parts.intersection(geocoded_parts)) / max(len(original_parts), 1)
|
|
|
|
if match_score < 0.5:
|
|
logger.warning(
|
|
f"Geocoded address may not match input! "
|
|
f"Input: '{original_location}' -> Geocoded: '{geocoded_label}' "
|
|
f"(match score: {match_score:.2f})"
|
|
)
|
|
|
|
return parcel_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching parcel for location '{location}': {e}")
|
|
raise
|
|
|
|
def extract_parcel_attributes(self, parcel_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Extract and normalize parcel attributes from MapServer response.
|
|
|
|
Args:
|
|
parcel_data: Raw parcel data from MapServer identify
|
|
|
|
Returns:
|
|
Dictionary with normalized parcel attributes compatible with Parzelle model
|
|
"""
|
|
attributes = parcel_data.get("attributes", {})
|
|
geometry = parcel_data.get("geometry")
|
|
|
|
# Extract common parcel attributes
|
|
# Note: Attribute names depend on the actual data structure from the API
|
|
# These are common fields, adjust based on actual API response
|
|
extracted = {
|
|
"label": attributes.get("label") or attributes.get("parzellennummer") or attributes.get("nummer") or "Unknown",
|
|
"parzellenAliasTags": [],
|
|
"strasseNr": attributes.get("adresse") or attributes.get("strasse"),
|
|
"plz": attributes.get("plz"),
|
|
"eigentuemerschaft": attributes.get("eigentuemer") or attributes.get("eigentuemerschaft"),
|
|
"bauzone": attributes.get("bauzone") or attributes.get("zonierung"),
|
|
"perimeter": self._convert_geometry_to_geopolylinie(geometry) if geometry else None,
|
|
"kontextGemeinde": attributes.get("gemeinde") or attributes.get("gemeindename"),
|
|
# Additional metadata
|
|
"raw_attributes": attributes # Keep raw data for reference
|
|
}
|
|
|
|
return extracted
|
|
|
|
async def get_parcel_polygon(
|
|
self,
|
|
gemeinde: str,
|
|
parzellen_nr: str,
|
|
sr: int = 2056
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Holt die vollständige Polygon-Geometrie einer Parzelle nach Gemeinde und Parzellennummer.
|
|
|
|
Args:
|
|
gemeinde: Name der Gemeinde (z.B. "Bern")
|
|
parzellen_nr: Parzellennummer (z.B. "1234")
|
|
sr: Koordinatensystem (2056=LV95, 4326=WGS84)
|
|
|
|
Returns:
|
|
GeoJSON-Feature mit Polygon-Koordinaten oder None wenn nicht gefunden
|
|
"""
|
|
try:
|
|
# Schritt 1: Parzelle suchen
|
|
search_params = {
|
|
"searchText": f"{gemeinde} {parzellen_nr}",
|
|
"type": "locations",
|
|
"origins": "parcel",
|
|
"sr": str(sr)
|
|
}
|
|
|
|
logger.info(f"Searching for parcel: {gemeinde} {parzellen_nr}")
|
|
response = await self._make_request(self.GEOCODING_URL, search_params)
|
|
|
|
search_results = response.get("results", [])
|
|
if not search_results:
|
|
logger.warning(f"Parzelle {gemeinde} {parzellen_nr} nicht gefunden")
|
|
return None
|
|
|
|
# Extract coordinates using shared helper method
|
|
search_result = search_results[0]
|
|
coords = self._extract_coordinates_from_search_result(search_result)
|
|
|
|
if coords is None:
|
|
logger.warning(f"Could not extract coordinates from search result")
|
|
return None
|
|
|
|
x, y = coords
|
|
logger.info(f"Parzelle gefunden: {search_result.get('label', 'Unknown')}, Zentrum: E={x}, N={y}")
|
|
|
|
# Schritt 2: Polygon-Geometrie abrufen
|
|
identify_params = {
|
|
"geometry": f"{x},{y}",
|
|
"geometryType": "esriGeometryPoint",
|
|
"layers": self.LAYER_AMTLICHE_VERMESSUNG,
|
|
"tolerance": "0",
|
|
"returnGeometry": "true",
|
|
"geometryFormat": "geojson",
|
|
"sr": str(sr),
|
|
"imageDisplay": "0,0,0",
|
|
"mapExtent": "0,0,0,0"
|
|
}
|
|
|
|
logger.info(f"Fetching polygon geometry for parcel at ({x}, {y})")
|
|
identify_response = await self._make_request(self.MAPSERVER_IDENTIFY_URL, identify_params)
|
|
|
|
identify_results = identify_response.get("results", [])
|
|
if not identify_results:
|
|
logger.warning("Keine Geometrie gefunden")
|
|
return None
|
|
|
|
# Return the GeoJSON feature
|
|
feature = identify_results[0]
|
|
logger.info(f"Successfully retrieved polygon geometry with {len(feature.get('geometry', {}).get('coordinates', [[]])[0])} points")
|
|
|
|
return feature
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting parcel polygon for {gemeinde} {parzellen_nr}: {e}", exc_info=True)
|
|
return None
|
|
|
|
def extract_boundary_points(self, feature: Dict[str, Any]) -> List[List[float]]:
|
|
"""
|
|
Extrahiert alle Grenzpunkte aus dem GeoJSON-Feature.
|
|
|
|
Args:
|
|
feature: GeoJSON Feature mit Polygon-Geometrie
|
|
|
|
Returns:
|
|
Liste von Koordinatenpaaren [[x, y], ...] in LV95
|
|
"""
|
|
try:
|
|
geometry = feature.get("geometry", {})
|
|
|
|
if geometry.get("type") == "Polygon":
|
|
# Äusserer Ring = erste Liste von Koordinaten
|
|
return geometry["coordinates"][0]
|
|
elif geometry.get("type") == "MultiPolygon":
|
|
# Alle Ringe aller Polygone
|
|
all_points = []
|
|
for polygon in geometry["coordinates"]:
|
|
all_points.extend(polygon[0])
|
|
return all_points
|
|
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting boundary points: {e}")
|
|
return []
|
|
|
|
async def find_neighboring_parcels(
|
|
self,
|
|
parcel_data: Dict[str, Any],
|
|
selected_parcel_id: str,
|
|
sample_distance: float = 20.0, # Balanced distance for good coverage
|
|
max_sample_points: int = 30, # Increased to ensure all vertices + some intermediate points
|
|
max_neighbors: int = 15, # Increased to find more neighbors
|
|
max_concurrent: int = 50 # Process up to 50 queries concurrently (maximum parallelization)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Find all parcels that touch the boundary of the selected parcel.
|
|
|
|
This method samples points along the parcel boundary and queries parcels
|
|
at each point to find all neighboring parcels that actually touch the boundary.
|
|
Optimized for speed with intelligent sampling and early stopping.
|
|
|
|
Args:
|
|
parcel_data: The selected parcel's data dictionary (must contain geometry)
|
|
selected_parcel_id: The ID of the selected parcel (to exclude from results)
|
|
sample_distance: Distance in meters between sample points along the boundary (default: 15m)
|
|
max_sample_points: Maximum number of sample points to query (default: 30)
|
|
max_neighbors: Stop early if we find this many unique neighbors (default: 15)
|
|
|
|
Returns:
|
|
List of neighboring parcel dictionaries with id, egrid, and number
|
|
"""
|
|
try:
|
|
geometry = parcel_data.get("geometry", {})
|
|
if not geometry:
|
|
logger.warning("No geometry found in parcel_data for finding neighbors")
|
|
return []
|
|
|
|
# Extract boundary points from geometry
|
|
boundary_points = []
|
|
|
|
# Handle ESRI format (rings)
|
|
if "rings" in geometry and geometry["rings"]:
|
|
ring = geometry["rings"][0] # Outer ring
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
boundary_points.append((coord[0], coord[1]))
|
|
|
|
# Handle GeoJSON format
|
|
elif geometry.get("type") == "Polygon":
|
|
coordinates = geometry.get("coordinates", [])
|
|
if coordinates and len(coordinates) > 0:
|
|
ring = coordinates[0] # Outer ring
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
boundary_points.append((coord[0], coord[1]))
|
|
|
|
if not boundary_points:
|
|
logger.warning("No boundary points found in parcel geometry")
|
|
return []
|
|
|
|
# Sample points along the boundary, offset outward
|
|
# Increased offset to ensure we're outside the selected parcel
|
|
offset_distance = 8.0 # meters - increased from 2.0 to ensure we're outside
|
|
sampled_points = []
|
|
|
|
# Calculate centroid for determining outward direction
|
|
centroid_x = sum(p[0] for p in boundary_points) / len(boundary_points)
|
|
centroid_y = sum(p[1] for p in boundary_points) / len(boundary_points)
|
|
|
|
# First pass: Always include ALL vertices (critical for finding all neighbors)
|
|
# Don't limit vertices - they're essential for coverage
|
|
edge_info = [] # Store edge information for intermediate sampling
|
|
for i in range(len(boundary_points)):
|
|
p1 = boundary_points[i]
|
|
p2 = boundary_points[(i + 1) % len(boundary_points)]
|
|
|
|
# Calculate edge vector
|
|
edge_dx = p2[0] - p1[0]
|
|
edge_dy = p2[1] - p1[1]
|
|
edge_length = (edge_dx**2 + edge_dy**2)**0.5
|
|
|
|
if edge_length > 0:
|
|
# Calculate perpendicular vector (pointing outward)
|
|
perp_dx = -edge_dy / edge_length
|
|
perp_dy = edge_dx / edge_length
|
|
|
|
# Check if this points outward (away from centroid)
|
|
to_point_dx = p1[0] - centroid_x
|
|
to_point_dy = p1[1] - centroid_y
|
|
if (perp_dx * to_point_dx + perp_dy * to_point_dy) < 0:
|
|
perp_dx = -perp_dx
|
|
perp_dy = -perp_dy
|
|
|
|
# Offset vertex point outward
|
|
offset_p1 = (p1[0] + perp_dx * offset_distance, p1[1] + perp_dy * offset_distance)
|
|
sampled_points.append(offset_p1)
|
|
|
|
# Store edge info for intermediate sampling
|
|
edge_info.append({
|
|
'p1': p1,
|
|
'edge_length': edge_length,
|
|
'perp_dx': perp_dx,
|
|
'perp_dy': perp_dy,
|
|
'edge_dx': edge_dx,
|
|
'edge_dy': edge_dy
|
|
})
|
|
|
|
# Second pass: Add intermediate points for edges longer than sample_distance
|
|
# This ensures we don't miss neighbors that touch in the middle of long edges
|
|
for edge in edge_info:
|
|
if len(sampled_points) >= max_sample_points:
|
|
break
|
|
|
|
# Add intermediate points for edges longer than sample_distance
|
|
if edge['edge_length'] > sample_distance:
|
|
# Calculate how many intermediate points we need
|
|
num_samples = int(edge['edge_length'] / sample_distance)
|
|
# Limit to reasonable number per edge (max 3) to avoid too many points
|
|
num_samples = min(num_samples, 3)
|
|
|
|
for j in range(1, num_samples + 1):
|
|
if len(sampled_points) >= max_sample_points:
|
|
break
|
|
t = j / (num_samples + 1)
|
|
# Interpolate along edge
|
|
interp_x = edge['p1'][0] + t * edge['edge_dx']
|
|
interp_y = edge['p1'][1] + t * edge['edge_dy']
|
|
# Offset outward
|
|
offset_x = interp_x + edge['perp_dx'] * offset_distance
|
|
offset_y = interp_y + edge['perp_dy'] * offset_distance
|
|
sampled_points.append((offset_x, offset_y))
|
|
|
|
# Don't limit vertices, but limit total if we have too many intermediate points
|
|
# This ensures we always have all vertices for complete coverage
|
|
if len(sampled_points) > max_sample_points:
|
|
# Keep all vertices, limit intermediate points
|
|
vertex_count = len(boundary_points)
|
|
if vertex_count <= max_sample_points:
|
|
# Keep vertices + some intermediate points
|
|
sampled_points = sampled_points[:max_sample_points]
|
|
else:
|
|
# Too many vertices - keep all vertices anyway (they're critical)
|
|
sampled_points = sampled_points[:len(sampled_points)]
|
|
|
|
# Query parcels in parallel batches for much faster performance
|
|
neighboring_parcels = {}
|
|
tolerance = 2 # Small tolerance for boundary queries
|
|
|
|
async def query_point(point: Tuple[float, float]) -> Optional[Dict[str, Any]]:
|
|
"""Query a single point and return parcel data if found."""
|
|
try:
|
|
x, y = point
|
|
location_str = f"{x},{y}"
|
|
adj_data = await self.search_parcel(location_str, tolerance=tolerance)
|
|
|
|
if adj_data:
|
|
adj_attrs = adj_data.get("attributes", {})
|
|
adj_id = adj_attrs.get("label") or adj_attrs.get("number")
|
|
|
|
# Exclude the selected parcel itself
|
|
if adj_id and adj_id != selected_parcel_id:
|
|
# Extract geometry information
|
|
adj_geometry = adj_data.get("geometry", {})
|
|
extracted_attrs = self.extract_parcel_attributes(adj_data)
|
|
|
|
return {
|
|
"id": adj_id,
|
|
"egrid": adj_attrs.get("egris_egrid"),
|
|
"number": adj_attrs.get("number"),
|
|
"perimeter": extracted_attrs.get("perimeter"),
|
|
"geometry": adj_geometry
|
|
}
|
|
except Exception:
|
|
# Silently skip errors for speed
|
|
pass
|
|
return None
|
|
|
|
# Process ALL queries concurrently for maximum speed
|
|
# Use semaphore to limit concurrent connections to avoid overwhelming the API
|
|
semaphore = asyncio.Semaphore(max_concurrent)
|
|
|
|
async def query_point_with_semaphore(point: Tuple[float, float]) -> Optional[Dict[str, Any]]:
|
|
"""Query a point with semaphore-controlled concurrency."""
|
|
async with semaphore:
|
|
return await query_point(point)
|
|
|
|
# Launch all queries concurrently (semaphore will limit actual concurrency)
|
|
results = await asyncio.gather(*[query_point_with_semaphore(point) for point in sampled_points], return_exceptions=True)
|
|
|
|
# Process all results (minimal logging for speed)
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
continue
|
|
|
|
if result and result["id"] not in neighboring_parcels:
|
|
neighboring_parcels[result["id"]] = result
|
|
|
|
# Early stop if we've found enough neighbors
|
|
if len(neighboring_parcels) >= max_neighbors:
|
|
break
|
|
|
|
result_list = list(neighboring_parcels.values())
|
|
logger.info(f"Found {len(result_list)} neighboring parcels for parcel {selected_parcel_id} (queried {len(sampled_points)} points)")
|
|
|
|
return result_list
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error finding neighboring parcels: {e}", exc_info=True)
|
|
return []
|
|
|
|
def _convert_geometry_to_geopolylinie(self, geometry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Convert ESRI geometry or GeoJSON to GeoPolylinie format.
|
|
|
|
Args:
|
|
geometry: ESRI geometry (rings) or GeoJSON geometry from MapServer response
|
|
|
|
Returns:
|
|
GeoPolylinie-compatible dictionary or None
|
|
"""
|
|
try:
|
|
# Handle GeoJSON format (from get_parcel_polygon)
|
|
if geometry.get("type") == "Polygon":
|
|
coordinates = geometry.get("coordinates", [])
|
|
if coordinates and len(coordinates) > 0:
|
|
ring = coordinates[0] # Outer ring
|
|
|
|
punkte = []
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
punkt = {
|
|
"koordinatensystem": "LV95",
|
|
"x": coord[0], # GeoJSON: [x, y] = [easting, northing]
|
|
"y": coord[1],
|
|
"z": coord[2] if len(coord) > 2 else None
|
|
}
|
|
punkte.append(punkt)
|
|
|
|
return {
|
|
"closed": True,
|
|
"punkte": punkte
|
|
}
|
|
|
|
# Handle ESRI format (rings) - existing code
|
|
elif "rings" in geometry and geometry["rings"]:
|
|
ring = geometry["rings"][0] # Take first ring (outer boundary)
|
|
|
|
punkte = []
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
punkt = {
|
|
"koordinatensystem": "LV95",
|
|
"x": coord[0],
|
|
"y": coord[1],
|
|
"z": coord[2] if len(coord) > 2 else None
|
|
}
|
|
punkte.append(punkt)
|
|
|
|
return {
|
|
"closed": True,
|
|
"punkte": punkte
|
|
}
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error converting geometry to GeoPolylinie: {e}")
|
|
return None
|
|
|