2058 lines
93 KiB
Python
2058 lines
93 KiB
Python
"""
|
|
Real Estate feature main logic.
|
|
Handles database operations with AI-powered natural language processing.
|
|
Stateless implementation without session management.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
from typing import Optional, Dict, Any, List
|
|
from fastapi import HTTPException, status
|
|
from shapely.geometry import Polygon
|
|
from shapely.ops import unary_union
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelRealEstate import (
|
|
Projekt,
|
|
Parzelle,
|
|
StatusProzess,
|
|
GeoPolylinie,
|
|
GeoPunkt,
|
|
Kontext,
|
|
Gemeinde,
|
|
Kanton,
|
|
Land,
|
|
)
|
|
from modules.services import getInterface as getServices
|
|
from modules.interfaces.interfaceDbRealEstate import getInterface as getRealEstateInterface
|
|
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ===== Geometry Utilities =====
|
|
|
|
def geopolylinie_to_shapely_polygon(geopolylinie: GeoPolylinie) -> Polygon:
|
|
"""
|
|
Convert GeoPolylinie to Shapely Polygon.
|
|
|
|
Args:
|
|
geopolylinie: GeoPolylinie instance with punkte list
|
|
|
|
Returns:
|
|
Shapely Polygon object
|
|
"""
|
|
if not geopolylinie or not geopolylinie.punkte:
|
|
raise ValueError("GeoPolylinie must have at least one point")
|
|
|
|
# Extract coordinates from punkte
|
|
coordinates = []
|
|
for punkt in geopolylinie.punkte:
|
|
coordinates.append((punkt.x, punkt.y))
|
|
|
|
# Ensure polygon is closed (first point == last point)
|
|
if len(coordinates) < 3:
|
|
raise ValueError("Polygon must have at least 3 points")
|
|
|
|
# Close polygon if not already closed
|
|
if coordinates[0] != coordinates[-1]:
|
|
coordinates.append(coordinates[0])
|
|
|
|
return Polygon(coordinates)
|
|
|
|
|
|
def shapely_polygon_to_geopolylinie(polygon: Polygon) -> GeoPolylinie:
|
|
"""
|
|
Convert Shapely Polygon to GeoPolylinie.
|
|
|
|
Args:
|
|
polygon: Shapely Polygon object
|
|
|
|
Returns:
|
|
GeoPolylinie instance with LV95 coordinate system
|
|
"""
|
|
if not polygon or polygon.is_empty:
|
|
raise ValueError("Polygon must not be empty")
|
|
|
|
# Extract exterior coordinates
|
|
exterior_coords = list(polygon.exterior.coords)
|
|
|
|
# Remove duplicate last point if present (Shapely includes it)
|
|
if len(exterior_coords) > 1 and exterior_coords[0] == exterior_coords[-1]:
|
|
exterior_coords = exterior_coords[:-1]
|
|
|
|
# Convert to GeoPunkt list
|
|
punkte = []
|
|
for coord in exterior_coords:
|
|
punkt = GeoPunkt(
|
|
koordinatensystem="LV95",
|
|
x=float(coord[0]),
|
|
y=float(coord[1]),
|
|
z=None
|
|
)
|
|
punkte.append(punkt)
|
|
|
|
return GeoPolylinie(
|
|
closed=True,
|
|
punkte=punkte
|
|
)
|
|
|
|
|
|
def combine_parcel_geometries(geometries: List[GeoPolylinie]) -> GeoPolylinie:
|
|
"""
|
|
Combine multiple parcel geometries into a single outer outline.
|
|
|
|
Uses Shapely union operation to merge polygons and automatically
|
|
removes internal edges. The result is a clean outer boundary.
|
|
|
|
Args:
|
|
geometries: List of GeoPolylinie instances to combine
|
|
|
|
Returns:
|
|
Combined GeoPolylinie representing the outer outline
|
|
|
|
Raises:
|
|
ValueError: If geometries list is empty or invalid
|
|
"""
|
|
if not geometries or len(geometries) == 0:
|
|
raise ValueError("At least one geometry is required")
|
|
|
|
if len(geometries) == 1:
|
|
# Single geometry - return as-is
|
|
return geometries[0]
|
|
|
|
# Convert all geometries to Shapely Polygons
|
|
shapely_polygons = []
|
|
for geo in geometries:
|
|
try:
|
|
polygon = geopolylinie_to_shapely_polygon(geo)
|
|
if not polygon.is_empty:
|
|
shapely_polygons.append(polygon)
|
|
except Exception as e:
|
|
logger.warning(f"Error converting geometry to Shapely Polygon: {e}")
|
|
continue
|
|
|
|
if not shapely_polygons:
|
|
raise ValueError("No valid geometries to combine")
|
|
|
|
if len(shapely_polygons) == 1:
|
|
# Only one valid polygon - convert back
|
|
return shapely_polygon_to_geopolylinie(shapely_polygons[0])
|
|
|
|
# Perform union operation - automatically removes internal edges
|
|
try:
|
|
combined = unary_union(shapely_polygons)
|
|
|
|
# Handle MultiPolygon case (disconnected parcels)
|
|
if hasattr(combined, 'geoms'):
|
|
# Multiple separate polygons - combine their exteriors
|
|
# For now, take the largest polygon or combine all exteriors
|
|
# In practice, we might want to keep them separate or combine differently
|
|
largest = max(combined.geoms, key=lambda p: p.area)
|
|
combined = largest
|
|
|
|
# Extract outer boundary
|
|
if combined.is_empty:
|
|
raise ValueError("Union resulted in empty geometry")
|
|
|
|
# Convert back to GeoPolylinie
|
|
result = shapely_polygon_to_geopolylinie(combined)
|
|
logger.info(f"Combined {len(geometries)} geometries into single outline with {len(result.punkte)} points")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error combining geometries: {e}", exc_info=True)
|
|
raise ValueError(f"Failed to combine geometries: {str(e)}")
|
|
|
|
|
|
def filter_neighbor_parcels(
|
|
neighbors: List[Dict[str, Any]],
|
|
selected_geometries: List[GeoPolylinie]
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Filter neighbor parcels to exclude those that are part of the selected parcels.
|
|
|
|
Uses geometric comparison to check if neighbor parcels intersect or touch
|
|
any of the selected parcel geometries.
|
|
|
|
Args:
|
|
neighbors: List of neighbor parcel dictionaries (must have 'perimeter' or 'geometry_geojson')
|
|
selected_geometries: List of GeoPolylinie instances representing selected parcels
|
|
|
|
Returns:
|
|
Filtered list of neighbor parcels (excluding selected ones)
|
|
"""
|
|
if not neighbors or not selected_geometries:
|
|
return neighbors
|
|
|
|
# Convert selected geometries to Shapely Polygons for comparison
|
|
selected_polygons = []
|
|
for geo in selected_geometries:
|
|
try:
|
|
polygon = geopolylinie_to_shapely_polygon(geo)
|
|
if not polygon.is_empty:
|
|
selected_polygons.append(polygon)
|
|
except Exception as e:
|
|
logger.warning(f"Error converting selected geometry for filtering: {e}")
|
|
continue
|
|
|
|
if not selected_polygons:
|
|
# No valid selected geometries - return all neighbors
|
|
return neighbors
|
|
|
|
# Filter neighbors
|
|
filtered_neighbors = []
|
|
for neighbor in neighbors:
|
|
try:
|
|
# Try to get geometry from neighbor
|
|
neighbor_geometry = None
|
|
|
|
# Check for perimeter (GeoPolylinie format)
|
|
if neighbor.get("perimeter"):
|
|
perimeter = neighbor["perimeter"]
|
|
if isinstance(perimeter, dict) and perimeter.get("punkte"):
|
|
# Convert to GeoPolylinie
|
|
punkte = []
|
|
for p in perimeter["punkte"]:
|
|
punkt = GeoPunkt(
|
|
koordinatensystem=p.get("koordinatensystem", "LV95"),
|
|
x=float(p.get("x", 0)),
|
|
y=float(p.get("y", 0)),
|
|
z=p.get("z")
|
|
)
|
|
punkte.append(punkt)
|
|
neighbor_geometry = GeoPolylinie(closed=True, punkte=punkte)
|
|
|
|
# Check for geometry_geojson
|
|
elif neighbor.get("geometry_geojson"):
|
|
geo_json = neighbor["geometry_geojson"]
|
|
geometry = geo_json.get("geometry") if isinstance(geo_json, dict) else geo_json
|
|
|
|
if geometry and geometry.get("type") == "Polygon":
|
|
coordinates = geometry.get("coordinates", [])
|
|
if coordinates and len(coordinates) > 0:
|
|
ring = coordinates[0] # Outer ring
|
|
punkte = []
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
punkt = GeoPunkt(
|
|
koordinatensystem="LV95",
|
|
x=float(coord[0]),
|
|
y=float(coord[1]),
|
|
z=float(coord[2]) if len(coord) > 2 else None
|
|
)
|
|
punkte.append(punkt)
|
|
neighbor_geometry = GeoPolylinie(closed=True, punkte=punkte)
|
|
|
|
if not neighbor_geometry:
|
|
# No geometry available - include neighbor (can't filter without geometry)
|
|
filtered_neighbors.append(neighbor)
|
|
continue
|
|
|
|
# Convert neighbor geometry to Shapely Polygon
|
|
neighbor_polygon = geopolylinie_to_shapely_polygon(neighbor_geometry)
|
|
|
|
# Check if neighbor intersects or touches any selected parcel
|
|
is_selected = False
|
|
for selected_polygon in selected_polygons:
|
|
if neighbor_polygon.intersects(selected_polygon) or neighbor_polygon.touches(selected_polygon):
|
|
# Check if they're actually the same (within tolerance)
|
|
# If areas are very similar, it's likely the same parcel
|
|
area_diff = abs(neighbor_polygon.area - selected_polygon.area)
|
|
if area_diff < 1.0: # Less than 1 m² difference
|
|
is_selected = True
|
|
break
|
|
# Also check if one contains the other (shouldn't happen for neighbors, but check anyway)
|
|
if neighbor_polygon.contains(selected_polygon) or selected_polygon.contains(neighbor_polygon):
|
|
is_selected = True
|
|
break
|
|
|
|
if not is_selected:
|
|
filtered_neighbors.append(neighbor)
|
|
else:
|
|
logger.debug(f"Filtered out neighbor parcel {neighbor.get('id')} - part of selected parcels")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error filtering neighbor parcel {neighbor.get('id')}: {e}")
|
|
# On error, include neighbor (better to show too many than too few)
|
|
filtered_neighbors.append(neighbor)
|
|
|
|
logger.info(f"Filtered {len(neighbors)} neighbors to {len(filtered_neighbors)} (removed {len(neighbors) - len(filtered_neighbors)} selected parcels)")
|
|
return filtered_neighbors
|
|
|
|
|
|
# ===== Swisstopo Integration =====
|
|
|
|
async def fetch_parcel_polygon_from_swisstopo(
|
|
gemeinde: str,
|
|
parzellen_nr: str,
|
|
sr: int = 2056
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Holt die vollständige Polygon-Geometrie einer Parzelle von Swisstopo API.
|
|
|
|
Args:
|
|
gemeinde: Name der Gemeinde (z.B. "Bern")
|
|
parzellen_nr: Parzellennummer (z.B. "1234")
|
|
sr: Koordinatensystem (2056=LV95, 4326=WGS84)
|
|
|
|
Returns:
|
|
Dictionary mit GeoPolylinie-Format für perimeter-Feld, oder None wenn nicht gefunden
|
|
Format: {"closed": True, "punkte": [{"koordinatensystem": "LV95", "x": ..., "y": ..., "z": None}, ...]}
|
|
"""
|
|
try:
|
|
connector = SwissTopoMapServerConnector()
|
|
|
|
# Get GeoJSON feature from Swisstopo
|
|
feature = await connector.get_parcel_polygon(gemeinde, parzellen_nr, sr)
|
|
|
|
if not feature:
|
|
logger.warning(f"Parzelle {gemeinde} {parzellen_nr} nicht gefunden in Swisstopo")
|
|
return None
|
|
|
|
# Convert GeoJSON to GeoPolylinie format
|
|
geometry = feature.get("geometry", {})
|
|
if geometry.get("type") == "Polygon":
|
|
coordinates = geometry.get("coordinates", [])
|
|
if coordinates and len(coordinates) > 0:
|
|
ring = coordinates[0] # Outer ring
|
|
|
|
punkte = []
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
punkt = {
|
|
"koordinatensystem": "LV95" if sr == 2056 else "WGS84",
|
|
"x": coord[0], # GeoJSON: [x, y] = [easting, northing]
|
|
"y": coord[1],
|
|
"z": coord[2] if len(coord) > 2 else None
|
|
}
|
|
punkte.append(punkt)
|
|
|
|
logger.info(f"Successfully fetched polygon with {len(punkte)} points for {gemeinde} {parzellen_nr}")
|
|
|
|
return {
|
|
"closed": True,
|
|
"punkte": punkte
|
|
}
|
|
|
|
logger.warning(f"Unexpected geometry type in Swisstopo response: {geometry.get('type')}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching parcel polygon from Swisstopo: {e}", exc_info=True)
|
|
return None
|
|
|
|
|
|
# ===== Direkte Query-Ausführung (stateless) =====
|
|
|
|
async def executeDirectQuery(
|
|
currentUser: User,
|
|
mandateId: str,
|
|
queryText: str,
|
|
parameters: Optional[Dict[str, Any]] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Execute a database query directly without session management.
|
|
|
|
Args:
|
|
currentUser: Current authenticated user
|
|
mandateId: Mandate context (from RequestContext / X-Mandate-Id header)
|
|
queryText: SQL query text
|
|
parameters: Optional parameters for parameterized queries
|
|
|
|
Returns:
|
|
Dictionary containing query result (rows, columns, rowCount)
|
|
|
|
Note:
|
|
- No session or query history is saved
|
|
- Query is executed directly and result is returned
|
|
- For production, validate and sanitize queries before execution
|
|
"""
|
|
try:
|
|
logger.info(f"Executing direct query for user {currentUser.id} (mandate: {mandateId})")
|
|
logger.debug(f"Query text: {queryText}")
|
|
if parameters:
|
|
logger.debug(f"Query parameters: {parameters}")
|
|
|
|
# Execute query via Real Estate interface (stateless)
|
|
realEstateInterface = getRealEstateInterface(currentUser, mandateId=mandateId)
|
|
result = realEstateInterface.executeQuery(queryText, parameters)
|
|
|
|
logger.info(
|
|
f"Query executed successfully: {result['rowCount']} rows in {result.get('executionTime', 0):.3f}s"
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"rows": result["rows"],
|
|
"columns": result["columns"],
|
|
"rowCount": result["rowCount"],
|
|
"executionTime": result.get("executionTime", 0),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error executing query: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
|
|
# ===== AI-basierte Intent-Erkennung und CRUD-Operationen =====
|
|
|
|
def _formatEntitySummary(entity_type: str, items: List[Dict[str, Any]], filters: Dict[str, Any]) -> str:
|
|
"""
|
|
Format a human-readable summary of query results.
|
|
|
|
Args:
|
|
entity_type: Type of entity (Projekt, Parzelle, etc.)
|
|
items: List of entity data dictionaries
|
|
filters: Filter parameters used in the query
|
|
|
|
Returns:
|
|
Human-readable summary string
|
|
"""
|
|
if not items:
|
|
return f"Keine {entity_type} gefunden"
|
|
|
|
count = len(items)
|
|
filter_desc = ""
|
|
if filters:
|
|
# Format filter description
|
|
if "kontextGemeinde" in filters:
|
|
filter_desc = f" in {filters['kontextGemeinde']}"
|
|
elif "plz" in filters:
|
|
filter_desc = f" mit PLZ {filters['plz']}"
|
|
elif "location_filter" in filters:
|
|
filter_desc = f" in {filters['location_filter']}"
|
|
|
|
# Start with count
|
|
summary = f"Gefunden: {count} {entity_type}{filter_desc}"
|
|
|
|
# Add details based on entity type
|
|
if entity_type == "Parzelle":
|
|
summary += "\n\nDetails:"
|
|
for i, item in enumerate(items[:10], 1): # Limit to first 10
|
|
parts = []
|
|
|
|
# Add label or ID
|
|
if item.get("label"):
|
|
parts.append(f"Parzelle '{item['label']}'")
|
|
elif item.get("id"):
|
|
parts.append(f"Parzelle {item['id'][:8]}...")
|
|
|
|
# Add address
|
|
if item.get("strasseNr"):
|
|
parts.append(item["strasseNr"])
|
|
|
|
# Add PLZ and municipality
|
|
location_parts = []
|
|
if item.get("plz"):
|
|
location_parts.append(item["plz"])
|
|
if item.get("kontextGemeinde"):
|
|
location_parts.append(item["kontextGemeinde"])
|
|
if location_parts:
|
|
parts.append(" ".join(location_parts))
|
|
|
|
# Add building zone
|
|
if item.get("bauzone"):
|
|
parts.append(f"Bauzone: {item['bauzone']}")
|
|
|
|
summary += f"\n{i}. {', '.join(parts)}"
|
|
|
|
if count > 10:
|
|
summary += f"\n... und {count - 10} weitere"
|
|
|
|
elif entity_type == "Projekt":
|
|
summary += "\n\nDetails:"
|
|
for i, item in enumerate(items[:10], 1):
|
|
parts = []
|
|
|
|
# Add label
|
|
if item.get("label"):
|
|
parts.append(f"'{item['label']}'")
|
|
|
|
# Add status
|
|
if item.get("statusProzess"):
|
|
parts.append(f"Status: {item['statusProzess']}")
|
|
|
|
# Add parcel count
|
|
parzellen = item.get("parzellen", [])
|
|
if parzellen:
|
|
parts.append(f"{len(parzellen)} Parzelle(n)")
|
|
|
|
summary += f"\n{i}. {' - '.join(parts)}"
|
|
|
|
if count > 10:
|
|
summary += f"\n... und {count - 10} weitere"
|
|
|
|
elif entity_type == "Gemeinde":
|
|
summary += "\n\nDetails:"
|
|
for i, item in enumerate(items[:10], 1):
|
|
parts = []
|
|
|
|
if item.get("label"):
|
|
parts.append(item["label"])
|
|
if item.get("plz"):
|
|
parts.append(f"PLZ: {item['plz']}")
|
|
if item.get("abk"):
|
|
parts.append(f"Abk: {item['abk']}")
|
|
|
|
summary += f"\n{i}. {', '.join(parts)}"
|
|
|
|
if count > 10:
|
|
summary += f"\n... und {count - 10} weitere"
|
|
|
|
elif entity_type == "Dokument":
|
|
summary += "\n\nDetails:"
|
|
for i, item in enumerate(items[:10], 1):
|
|
parts = []
|
|
|
|
if item.get("label"):
|
|
parts.append(item["label"])
|
|
if item.get("dokumentTyp"):
|
|
parts.append(f"Typ: {item['dokumentTyp']}")
|
|
if item.get("quelle"):
|
|
parts.append(f"Quelle: {item['quelle']}")
|
|
|
|
summary += f"\n{i}. {', '.join(parts)}"
|
|
|
|
if count > 10:
|
|
summary += f"\n... und {count - 10} weitere"
|
|
|
|
else:
|
|
# Generic format for other entity types
|
|
if count <= 5:
|
|
summary += "\n\nDetails:"
|
|
for i, item in enumerate(items, 1):
|
|
label = item.get("label") or item.get("id", "")
|
|
if label:
|
|
summary += f"\n{i}. {label}"
|
|
|
|
return summary
|
|
|
|
|
|
async def processNaturalLanguageCommand(
|
|
currentUser: User,
|
|
mandateId: str,
|
|
userInput: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Process natural language user input and execute corresponding CRUD operations.
|
|
|
|
Uses AI to analyze user intent and extract parameters, then executes the appropriate
|
|
CRUD operation through the interface. Works stateless without session management.
|
|
|
|
Args:
|
|
currentUser: Current authenticated user
|
|
mandateId: Mandate context (from RequestContext / X-Mandate-Id header)
|
|
userInput: Natural language command from user
|
|
|
|
Returns:
|
|
Dictionary containing operation result and metadata
|
|
|
|
Example user inputs:
|
|
- "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
|
|
- "Zeige mir alle Projekte in Zürich"
|
|
- "Aktualisiere Projekt XYZ mit Status 'Planung'"
|
|
- "Lösche Parzelle ABC"
|
|
- "SELECT * FROM Projekt WHERE plz = '8000'"
|
|
"""
|
|
try:
|
|
logger.info(f"Processing natural language command for user {currentUser.id} (mandate: {mandateId})")
|
|
logger.debug(f"User input: {userInput}")
|
|
|
|
# Initialize services for AI access
|
|
services = getServices(currentUser, workflow=None, mandateId=mandateId)
|
|
aiService = services.ai
|
|
|
|
# Step 1: Analyze user intent with AI
|
|
intentAnalysis = await analyzeUserIntent(aiService, userInput)
|
|
|
|
logger.info(f"Intent analysis result: intent={intentAnalysis.get('intent')}, entity={intentAnalysis.get('entity')}")
|
|
|
|
# Step 2: Execute CRUD operation based on intent
|
|
result = await executeIntentBasedOperation(
|
|
currentUser=currentUser,
|
|
mandateId=mandateId,
|
|
intent=intentAnalysis["intent"],
|
|
entity=intentAnalysis.get("entity"),
|
|
parameters=intentAnalysis.get("parameters", {}),
|
|
)
|
|
|
|
# Build user-friendly response
|
|
response = {
|
|
"success": True,
|
|
"intent": intentAnalysis["intent"],
|
|
"entity": intentAnalysis.get("entity"),
|
|
"result": result,
|
|
}
|
|
|
|
# Add human-readable summary for operations
|
|
if intentAnalysis["intent"] == "CREATE" and isinstance(result, dict):
|
|
# Add confirmation message for CREATE operations
|
|
operation_result = result.get("result")
|
|
if isinstance(operation_result, dict):
|
|
entity_name = intentAnalysis.get('entity', 'Eintrag')
|
|
label = operation_result.get("label", operation_result.get("id", ""))
|
|
|
|
# Build detailed message
|
|
msg_parts = [f"✅ {entity_name} '{label}' erfolgreich erstellt"]
|
|
|
|
if entity_name == "Parzelle":
|
|
if operation_result.get("plz"):
|
|
msg_parts.append(f"PLZ: {operation_result['plz']}")
|
|
if operation_result.get("kontextGemeinde"):
|
|
msg_parts.append(f"Gemeinde: {operation_result['kontextGemeinde']}")
|
|
if operation_result.get("bauzone"):
|
|
msg_parts.append(f"Bauzone: {operation_result['bauzone']}")
|
|
|
|
kontext_items = operation_result.get("kontextInformationen", [])
|
|
if kontext_items:
|
|
msg_parts.append(f"\n📋 {len(kontext_items)} Kontextinformationen gespeichert:")
|
|
for kontext in kontext_items[:5]: # Show first 5
|
|
thema = kontext.get("thema", "")
|
|
inhalt = kontext.get("inhalt", "")
|
|
if thema and inhalt:
|
|
msg_parts.append(f" • {thema}: {inhalt}")
|
|
if len(kontext_items) > 5:
|
|
msg_parts.append(f" • ... und {len(kontext_items) - 5} weitere")
|
|
|
|
elif entity_name == "Projekt":
|
|
if operation_result.get("statusProzess"):
|
|
msg_parts.append(f"Status: {operation_result['statusProzess']}")
|
|
parzellen = operation_result.get("parzellen", [])
|
|
if parzellen:
|
|
msg_parts.append(f"{len(parzellen)} Parzelle(n)")
|
|
|
|
response["message"] = "\n".join(msg_parts)
|
|
|
|
elif intentAnalysis["intent"] == "READ" and isinstance(result, dict):
|
|
operation_result = result.get("result")
|
|
if isinstance(operation_result, list):
|
|
response["count"] = len(operation_result)
|
|
entity_name = intentAnalysis.get('entity', 'Einträge')
|
|
|
|
if len(operation_result) == 0:
|
|
# Provide helpful message for empty results
|
|
filter_info = intentAnalysis.get('parameters', {})
|
|
if filter_info:
|
|
filter_desc = ", ".join([f"{k}={v}" for k, v in filter_info.items()])
|
|
response["message"] = f"Keine {entity_name} gefunden mit Filter: {filter_desc}. Möglicherweise sind noch keine Daten vorhanden oder der Filter ist zu spezifisch."
|
|
else:
|
|
response["message"] = f"Keine {entity_name} vorhanden. Erstellen Sie zuerst neue Einträge."
|
|
else:
|
|
# Create detailed summary based on entity type
|
|
response["message"] = _formatEntitySummary(
|
|
entity_name,
|
|
operation_result,
|
|
intentAnalysis.get('parameters', {})
|
|
)
|
|
elif isinstance(operation_result, dict):
|
|
response["count"] = 1
|
|
# Format single entity
|
|
entity_name = intentAnalysis.get('entity', 'Eintrag')
|
|
response["message"] = _formatEntitySummary(entity_name, [operation_result], {})
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing natural language command: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
|
|
async def analyzeUserIntent(
|
|
aiService,
|
|
userInput: str
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Use AI to analyze user input and extract intent, entity, and parameters.
|
|
|
|
Args:
|
|
aiService: AI service instance
|
|
userInput: Natural language user input
|
|
|
|
Returns:
|
|
Dictionary with 'intent', 'entity', and 'parameters'
|
|
"""
|
|
# Create a structured prompt for intent analysis with accurate field information
|
|
intentPrompt = f"""
|
|
Analyze the following user command and extract the intent, entity, and parameters.
|
|
|
|
User Command: "{userInput}"
|
|
|
|
Available intents:
|
|
- CREATE: User wants to create a new entity
|
|
- READ: User wants to read/query entities
|
|
- UPDATE: User wants to update an existing entity
|
|
- DELETE: User wants to delete an entity
|
|
- QUERY: User wants to execute a database query (SQL statements)
|
|
|
|
Available entities and their fields:
|
|
|
|
**Projekt** (Real estate project):
|
|
- id: string (primary key)
|
|
- mandateId: string (mandate ID)
|
|
- label: string (project designation/name)
|
|
- statusProzess: string enum (Eingang, Analyse, Studie, Planung, Baurechtsverfahren, Umsetzung, Archiv)
|
|
- perimeter: GeoPolylinie (geographic boundary, JSONB)
|
|
- baulinie: GeoPolylinie (building line, JSONB)
|
|
- parzellen: List[Parzelle] (plots belonging to project, JSONB)
|
|
- dokumente: List[Dokument] (documents, JSONB)
|
|
- kontextInformationen: List[Kontext] (context info, JSONB)
|
|
|
|
**Parzelle** (Plot/parcel):
|
|
- id: string (primary key)
|
|
- mandateId: string (mandate ID)
|
|
- label: string (plot designation)
|
|
- strasseNr: string (street and house number)
|
|
- plz: string (postal code)
|
|
- kontextGemeinde: string (municipality ID, Foreign Key to Gemeinde table)
|
|
- bauzone: string (building zone, e.g. W3, WG2)
|
|
- az: float (Ausnützungsziffer)
|
|
- bz: float (Bebauungsziffer)
|
|
- vollgeschossZahl: int (number of allowed full floors)
|
|
- gebaeudehoeheMax: float (maximum building height in meters)
|
|
- laermschutzzone: string (noise protection zone)
|
|
- hochwasserschutzzone: string (flood protection zone)
|
|
- grundwasserschutzzone: string (groundwater protection zone)
|
|
- parzelleBebaut: JaNein enum (is plot built)
|
|
- parzelleErschlossen: JaNein enum (is plot developed)
|
|
- parzelleHanglage: JaNein enum (is plot on slope)
|
|
- kontextInformationen: List[Kontext] (metadata - each item has 'thema' and 'inhalt' fields only)
|
|
|
|
**Kontext** (Context information for metadata):
|
|
- thema: string (topic/subject, e.g. "EGRID", "Fläche", "Zentrum")
|
|
- inhalt: string (content as text, e.g. "CH887199917793", "6514.99 m²", "X: 123, Y: 456")
|
|
|
|
**Important relationships:**
|
|
- Projekte contain Parzellen (projects have plots)
|
|
- Parzelle links to Gemeinde (via kontextGemeinde)
|
|
- Gemeinde links to Kanton (via id_kanton)
|
|
- Kanton links to Land (via id_land)
|
|
- Location queries (city, postal code) should use Parzelle.kontextGemeinde (municipality name will be resolved to ID)
|
|
- Projekt does NOT have location fields directly - location is stored in associated Parzellen
|
|
|
|
Return a JSON object with the following structure:
|
|
{{
|
|
"intent": "CREATE|READ|UPDATE|DELETE|QUERY",
|
|
"entity": "Projekt|Parzelle|Dokument|Kanton|Gemeinde|null",
|
|
"parameters": {{
|
|
// Extracted parameters from user input
|
|
// For CREATE/UPDATE: include all relevant fields using EXACT field names from above
|
|
// For READ: include filter criteria using EXACT field names (id, label, plz, kontextGemeinde, etc.)
|
|
// For DELETE: include entity ID if mentioned
|
|
// For QUERY: include queryText if SQL is detected
|
|
// IMPORTANT: Use only field names that exist in the entity definition above
|
|
}},
|
|
"confidence": 0.0-1.0 // Confidence score for the analysis
|
|
}}
|
|
|
|
Examples:
|
|
- Input: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
|
|
Output: {{"intent": "CREATE", "entity": "Projekt", "parameters": {{"label": "Hauptstrasse 42"}}, "confidence": 0.95}}
|
|
|
|
- Input: "Erstelle eine Parzelle mit Label 123, PLZ 8000, Gemeinde Zürich, Bauzone W3"
|
|
Output: {{"intent": "CREATE", "entity": "Parzelle", "parameters": {{"label": "123", "plz": "8000", "kontextGemeinde": "Zürich", "bauzone": "W3"}}, "confidence": 0.95}}
|
|
|
|
- Input: "Parzellen-Informationen: ID:AA1704, Nummer:AA1704, EGRID:CH887199917793, Kanton:ZH, Gemeinde:Zürich, Gemeinde-Code:261, Fläche:6514.99 m², Zentrum:2682951.44,1247622.91"
|
|
Output: {{
|
|
"intent": "CREATE",
|
|
"entity": "Parzelle",
|
|
"parameters": {{
|
|
"label": "AA1704",
|
|
"parzellenAliasTags": ["AA1704"],
|
|
"kontextGemeinde": "Zürich",
|
|
"kontextInformationen": [
|
|
{{"thema": "EGRID", "inhalt": "CH887199917793"}},
|
|
{{"thema": "Kanton", "inhalt": "ZH"}},
|
|
{{"thema": "BFS-Nummer", "inhalt": "261"}},
|
|
{{"thema": "Fläche", "inhalt": "6514.99 m²"}},
|
|
{{"thema": "Zentrum (LV95)", "inhalt": "X: 2682951.44 m, Y: 1247622.91 m (EPSG:2056)"}}
|
|
]
|
|
}},
|
|
"confidence": 0.9
|
|
}}
|
|
Note: Extract structured data from detailed input. Use kontextInformationen for metadata. Each item has 'thema' (topic) and 'inhalt' (content as text).
|
|
|
|
- Input: "Zeige mir alle Projekte"
|
|
Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{}}, "confidence": 0.9}}
|
|
|
|
- Input: "Zeige mir Projekte in Zürich" or "Wie viele Projekte in Zürich"
|
|
Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{"location_filter": "Zürich"}}, "confidence": 0.9}}
|
|
Note: For project location queries, use Projekt entity with location_filter parameter
|
|
|
|
- Input: "Zeige mir Parzellen mit PLZ 8000"
|
|
Output: {{"intent": "READ", "entity": "Parzelle", "parameters": {{"plz": "8000"}}, "confidence": 0.95}}
|
|
|
|
- Input: "Aktualisiere Projekt XYZ mit Status 'Planung'"
|
|
Output: {{"intent": "UPDATE", "entity": "Projekt", "parameters": {{"id": "XYZ", "statusProzess": "Planung"}}, "confidence": 0.85}}
|
|
|
|
- Input: "SELECT * FROM Projekt WHERE label = 'Test'"
|
|
Output: {{"intent": "QUERY", "entity": null, "parameters": {{"queryText": "SELECT * FROM Projekt WHERE label = 'Test'", "queryType": "sql"}}, "confidence": 1.0}}
|
|
|
|
- Input: "Lösche Parzelle ABC"
|
|
Output: {{"intent": "DELETE", "entity": "Parzelle", "parameters": {{"id": "ABC"}}, "confidence": 0.9}}
|
|
|
|
IMPORTANT EXTRACTION RULES:
|
|
1. For CREATE operations, extract ALL mentioned data fields from the user input
|
|
2. Use kontextInformationen array for metadata that doesn't have dedicated fields (EGRID, BFS numbers, area, coordinates, etc.)
|
|
3. Each kontextInformationen item MUST have exactly two fields: 'thema' (topic/subject) and 'inhalt' (content as text string)
|
|
4. Format kontextInformationen values as readable text strings, including units (e.g., "6514.99 m²", "X: 123, Y: 456")
|
|
5. Match field names EXACTLY to the entity definition above
|
|
6. Convert data types correctly (strings for text, numbers for numeric values)
|
|
7. Extract coordinates, areas, and other numeric values from text
|
|
8. When multiple values are mentioned for the same concept (ID, Nummer, Name), use the most relevant one for 'label' and put alternatives in parzellenAliasTags
|
|
"""
|
|
|
|
try:
|
|
# Use AI planning call for structured JSON response
|
|
response = await aiService.callAiPlanning(
|
|
prompt=intentPrompt,
|
|
debugType="intentanalysis"
|
|
)
|
|
|
|
# Extract JSON from response (handles markdown code blocks)
|
|
jsonStart = response.find('{')
|
|
jsonEnd = response.rfind('}') + 1
|
|
|
|
if jsonStart == -1 or jsonEnd == 0:
|
|
raise ValueError("No JSON found in AI response")
|
|
|
|
jsonStr = response[jsonStart:jsonEnd]
|
|
|
|
# Parse JSON response
|
|
intentData = json.loads(jsonStr)
|
|
|
|
# Validate response structure
|
|
if "intent" not in intentData:
|
|
raise ValueError("Invalid intent analysis response: missing 'intent' field")
|
|
|
|
# Ensure parameters exists
|
|
if "parameters" not in intentData:
|
|
intentData["parameters"] = {}
|
|
|
|
logger.debug(f"Parsed intent analysis: {intentData}")
|
|
|
|
return intentData
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Failed to parse AI intent analysis response: {e}")
|
|
logger.error(f"Raw response: {response}")
|
|
raise ValueError(f"AI returned invalid JSON: {str(e)}")
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing user intent: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
|
|
async def executeIntentBasedOperation(
|
|
currentUser: User,
|
|
mandateId: str,
|
|
intent: str,
|
|
entity: Optional[str],
|
|
parameters: Dict[str, Any],
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Execute CRUD operation based on analyzed intent.
|
|
|
|
Args:
|
|
currentUser: Current authenticated user
|
|
mandateId: Mandate context (from RequestContext / X-Mandate-Id header)
|
|
intent: Intent from AI analysis (CREATE, READ, UPDATE, DELETE, QUERY)
|
|
entity: Entity type from AI analysis
|
|
parameters: Extracted parameters from AI analysis
|
|
|
|
Returns:
|
|
Operation result
|
|
|
|
Note:
|
|
- Supports CREATE, READ, UPDATE, DELETE, QUERY intents
|
|
- Entity types: Projekt, Parzelle, Gemeinde, Kanton, Land, Dokument
|
|
"""
|
|
try:
|
|
logger.info(f"Executing intent-based operation: intent={intent}, entity={entity}")
|
|
logger.debug(f"Parameters: {parameters}")
|
|
|
|
if intent == "QUERY":
|
|
# Execute database query directly (stateless)
|
|
queryText = parameters.get("queryText", "")
|
|
|
|
if not queryText:
|
|
raise ValueError("QUERY intent requires queryText in parameters")
|
|
|
|
result = await executeDirectQuery(
|
|
currentUser=currentUser,
|
|
mandateId=mandateId,
|
|
queryText=queryText,
|
|
parameters=parameters.get("queryParameters"),
|
|
)
|
|
return result
|
|
|
|
elif intent == "CREATE":
|
|
# Create new entity
|
|
realEstateInterface = getRealEstateInterface(currentUser, mandateId=mandateId)
|
|
|
|
if entity == "Projekt":
|
|
# Create Projekt from parameters
|
|
projekt = Projekt(
|
|
mandateId=mandateId,
|
|
label=parameters.get("label", ""),
|
|
statusProzess=StatusProzess(parameters.get("statusProzess", "EINGANG")) if parameters.get("statusProzess") else None,
|
|
)
|
|
created = realEstateInterface.createProjekt(projekt)
|
|
return {
|
|
"operation": "CREATE",
|
|
"entity": "Projekt",
|
|
"result": created.model_dump()
|
|
}
|
|
|
|
elif entity == "Parzelle":
|
|
# Create Parzelle from parameters
|
|
# Import Kontext for kontextInformationen
|
|
from modules.datamodels.datamodelRealEstate import Kontext, GeoPolylinie
|
|
|
|
# Build parzelle data with all extracted parameters
|
|
parzelle_data = {
|
|
"mandateId": mandateId,
|
|
"label": parameters.get("label", ""),
|
|
}
|
|
|
|
# Add optional fields if present
|
|
optional_fields = [
|
|
"parzellenAliasTags", "eigentuemerschaft", "strasseNr", "plz",
|
|
"bauzone", "az", "bz", "vollgeschossZahl", "anrechenbarDachgeschoss",
|
|
"anrechenbarUntergeschoss", "gebaeudehoeheMax", "kontextGemeinde",
|
|
"regelnGrenzabstand", "regelnMehrlaengenzuschlag", "regelnMehrhoehenzuschlag",
|
|
"parzelleBebaut", "parzelleErschlossen", "parzelleHanglage",
|
|
"laermschutzzone", "hochwasserschutzzone", "grundwasserschutzzone"
|
|
]
|
|
|
|
for field in optional_fields:
|
|
if field in parameters and parameters[field] is not None:
|
|
parzelle_data[field] = parameters[field]
|
|
|
|
# Handle complex objects
|
|
if "perimeter" in parameters and parameters["perimeter"]:
|
|
parzelle_data["perimeter"] = GeoPolylinie(**parameters["perimeter"])
|
|
elif "kontextGemeinde" in parameters and parameters.get("kontextGemeinde"):
|
|
# Try to fetch polygon from Swisstopo if gemeinde and parzellen_nr are available
|
|
gemeinde = parameters.get("kontextGemeinde")
|
|
parzellen_nr = parameters.get("label") or parameters.get("parzellen_nr") or parameters.get("parzellennummer")
|
|
|
|
if gemeinde and parzellen_nr:
|
|
logger.info(f"Attempting to fetch polygon from Swisstopo for {gemeinde} {parzellen_nr}")
|
|
try:
|
|
# Try to resolve gemeinde name if it's an ID
|
|
gemeinde_name = gemeinde
|
|
if len(gemeinde) == 36: # UUID format
|
|
# Try to get gemeinde name from interface (realEstateInterface already initialized above)
|
|
gemeinde_obj = realEstateInterface.getGemeinde(gemeinde)
|
|
if gemeinde_obj:
|
|
gemeinde_name = gemeinde_obj.label
|
|
|
|
polygon_data = await fetch_parcel_polygon_from_swisstopo(
|
|
gemeinde=gemeinde_name,
|
|
parzellen_nr=str(parzellen_nr),
|
|
sr=2056
|
|
)
|
|
|
|
if polygon_data:
|
|
parzelle_data["perimeter"] = GeoPolylinie(**polygon_data)
|
|
logger.info(f"Successfully fetched and set perimeter from Swisstopo")
|
|
else:
|
|
logger.warning(f"Could not fetch polygon from Swisstopo for {gemeinde_name} {parzellen_nr}")
|
|
except Exception as e:
|
|
logger.warning(f"Error fetching polygon from Swisstopo (continuing without): {e}")
|
|
|
|
if "baulinie" in parameters and parameters["baulinie"]:
|
|
parzelle_data["baulinie"] = GeoPolylinie(**parameters["baulinie"])
|
|
|
|
# Handle kontextInformationen (convert dicts to Kontext objects)
|
|
if "kontextInformationen" in parameters and parameters["kontextInformationen"]:
|
|
kontext_list = []
|
|
for kontext_data in parameters["kontextInformationen"]:
|
|
if isinstance(kontext_data, dict):
|
|
# Ensure only thema and inhalt are passed (Kontext model only has these fields)
|
|
kontext_obj = Kontext(
|
|
thema=kontext_data.get("thema", ""),
|
|
inhalt=kontext_data.get("inhalt", "")
|
|
)
|
|
kontext_list.append(kontext_obj)
|
|
else:
|
|
kontext_list.append(kontext_data)
|
|
parzelle_data["kontextInformationen"] = kontext_list
|
|
|
|
parzelle = Parzelle(**parzelle_data)
|
|
created = realEstateInterface.createParzelle(parzelle)
|
|
|
|
logger.info(f"Created Parzelle '{created.label}' with {len(created.kontextInformationen)} context items")
|
|
|
|
return {
|
|
"operation": "CREATE",
|
|
"entity": "Parzelle",
|
|
"result": created.model_dump()
|
|
}
|
|
elif entity == "Gemeinde":
|
|
# Create Gemeinde from parameters
|
|
from modules.datamodels.datamodelRealEstate import Gemeinde
|
|
gemeinde = Gemeinde(
|
|
mandateId=mandateId,
|
|
label=parameters.get("label", ""),
|
|
id_kanton=parameters.get("id_kanton"),
|
|
plz=parameters.get("plz"),
|
|
)
|
|
created = realEstateInterface.createGemeinde(gemeinde)
|
|
return {
|
|
"operation": "CREATE",
|
|
"entity": "Gemeinde",
|
|
"result": created.model_dump()
|
|
}
|
|
elif entity == "Kanton":
|
|
# Create Kanton from parameters
|
|
from modules.datamodels.datamodelRealEstate import Kanton
|
|
kanton = Kanton(
|
|
mandateId=mandateId,
|
|
label=parameters.get("label", ""),
|
|
id_land=parameters.get("id_land"),
|
|
abk=parameters.get("abk"),
|
|
)
|
|
created = realEstateInterface.createKanton(kanton)
|
|
return {
|
|
"operation": "CREATE",
|
|
"entity": "Kanton",
|
|
"result": created.model_dump()
|
|
}
|
|
elif entity == "Land":
|
|
# Create Land from parameters
|
|
from modules.datamodels.datamodelRealEstate import Land
|
|
land = Land(
|
|
mandateId=mandateId,
|
|
label=parameters.get("label", ""),
|
|
abk=parameters.get("abk"),
|
|
)
|
|
created = realEstateInterface.createLand(land)
|
|
return {
|
|
"operation": "CREATE",
|
|
"entity": "Land",
|
|
"result": created.model_dump()
|
|
}
|
|
elif entity == "Dokument":
|
|
# Create Dokument from parameters
|
|
from modules.datamodels.datamodelRealEstate import Dokument
|
|
dokument = Dokument(
|
|
mandateId=mandateId,
|
|
label=parameters.get("label", ""),
|
|
dokumentReferenz=parameters.get("dokumentReferenz", ""),
|
|
versionsbezeichnung=parameters.get("versionsbezeichnung"),
|
|
dokumentTyp=parameters.get("dokumentTyp"),
|
|
quelle=parameters.get("quelle"),
|
|
mimeType=parameters.get("mimeType"),
|
|
)
|
|
created = realEstateInterface.createDokument(dokument)
|
|
return {
|
|
"operation": "CREATE",
|
|
"entity": "Dokument",
|
|
"result": created.model_dump()
|
|
}
|
|
else:
|
|
raise ValueError(f"CREATE operation not supported for entity: {entity}")
|
|
|
|
elif intent == "READ":
|
|
# Read entities
|
|
realEstateInterface = getRealEstateInterface(currentUser)
|
|
|
|
if entity == "Projekt":
|
|
projektId = parameters.get("id")
|
|
if projektId:
|
|
# Get single Projekt by ID
|
|
projekt = realEstateInterface.getProjekt(projektId)
|
|
if not projekt:
|
|
raise ValueError(f"Projekt {projektId} not found")
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Projekt",
|
|
"result": projekt.model_dump()
|
|
}
|
|
else:
|
|
# List all Projekte (with optional filters)
|
|
# Validate filter fields against Projekt model
|
|
validProjektFields = {"id", "mandateId", "label", "statusProzess"}
|
|
recordFilter = {
|
|
k: v for k, v in parameters.items()
|
|
if k != "id" and k in validProjektFields
|
|
}
|
|
|
|
# Handle location_filter specially (filter projects by parcel location)
|
|
location_filter = parameters.get("location_filter")
|
|
|
|
# Get all projects first
|
|
projekte = realEstateInterface.getProjekte(recordFilter=recordFilter if recordFilter else None)
|
|
|
|
# If location filter is present, filter by parcels in that location
|
|
if location_filter:
|
|
logger.info(f"Filtering projects by location: {location_filter}")
|
|
|
|
# Try to resolve location to Gemeinde ID for UUID comparison
|
|
location_id = None
|
|
try:
|
|
# Check if it's already a UUID
|
|
import re
|
|
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE)
|
|
if not uuid_pattern.match(location_filter):
|
|
# Try to resolve name to ID
|
|
gemeinde_records = realEstateInterface.getGemeinden(recordFilter={"label": location_filter})
|
|
if gemeinde_records:
|
|
location_id = gemeinde_records[0].id
|
|
logger.debug(f"Resolved location '{location_filter}' to ID '{location_id}'")
|
|
except Exception as e:
|
|
logger.debug(f"Could not resolve location filter: {e}")
|
|
|
|
filtered_projekte = []
|
|
|
|
for projekt in projekte:
|
|
# Check if any parcel in the project matches the location
|
|
for parzelle in projekt.parzellen:
|
|
# Check kontextGemeinde (both UUID and string), plz, or strasseNr for location match
|
|
location_lower = location_filter.lower()
|
|
matches = False
|
|
|
|
# Check if kontextGemeinde matches (as UUID or string)
|
|
if parzelle.kontextGemeinde:
|
|
if (parzelle.kontextGemeinde == location_id or # UUID match
|
|
parzelle.kontextGemeinde == location_filter or # Exact match
|
|
location_lower in parzelle.kontextGemeinde.lower()): # Partial string match
|
|
matches = True
|
|
|
|
# Check PLZ or address
|
|
if not matches and (
|
|
(parzelle.plz and location_lower in parzelle.plz) or
|
|
(parzelle.strasseNr and location_lower in parzelle.strasseNr.lower())
|
|
):
|
|
matches = True
|
|
|
|
if matches:
|
|
filtered_projekte.append(projekt)
|
|
break # Found a matching parcel, no need to check more
|
|
|
|
projekte = filtered_projekte
|
|
logger.info(f"Found {len(projekte)} projects in location '{location_filter}'")
|
|
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Projekt",
|
|
"result": [p.model_dump() for p in projekte],
|
|
"count": len(projekte)
|
|
}
|
|
elif entity == "Parzelle":
|
|
parzelleId = parameters.get("id")
|
|
if parzelleId:
|
|
# Get single Parzelle by ID
|
|
parzelle = realEstateInterface.getParzelle(parzelleId)
|
|
if not parzelle:
|
|
raise ValueError(f"Parzelle {parzelleId} not found")
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Parzelle",
|
|
"result": parzelle.model_dump()
|
|
}
|
|
else:
|
|
# List all Parzellen (with optional filters)
|
|
# Validate filter fields against Parzelle model
|
|
# Note: kontextKanton and kontextLand are NOT direct fields on Parzelle
|
|
# Parzelle links to Gemeinde, Gemeinde links to Kanton, Kanton links to Land
|
|
validParzelleFields = {
|
|
"id", "mandateId", "label", "strasseNr", "plz",
|
|
"kontextGemeinde", # Only direct link - Gemeinde → Kanton → Land
|
|
"bauzone", "az", "bz", "vollgeschossZahl", "gebaeudehoeheMax",
|
|
"laermschutzzone", "hochwasserschutzzone", "grundwasserschutzzone",
|
|
"parzelleBebaut", "parzelleErschlossen", "parzelleHanglage"
|
|
}
|
|
recordFilter = {
|
|
k: v for k, v in parameters.items()
|
|
if k != "id" and k in validParzelleFields
|
|
}
|
|
# Warn about invalid fields
|
|
invalidFields = {k: v for k, v in parameters.items() if k not in validParzelleFields and k != "id"}
|
|
if invalidFields:
|
|
logger.warning(f"Invalid filter fields for Parzelle ignored: {list(invalidFields.keys())}")
|
|
|
|
parzellen = realEstateInterface.getParzellen(recordFilter=recordFilter if recordFilter else None)
|
|
|
|
# Debug logging for empty results
|
|
if not parzellen and recordFilter:
|
|
logger.info(f"No Parzellen found matching filter: {recordFilter}")
|
|
# Get total count to help debug
|
|
all_parzellen = realEstateInterface.getParzellen(recordFilter=None)
|
|
logger.info(f"Total Parzellen in database: {len(all_parzellen)}")
|
|
if all_parzellen:
|
|
# Show some sample kontextGemeinde values
|
|
sample_gemeinden = set()
|
|
for p in all_parzellen[:10]:
|
|
if p.kontextGemeinde:
|
|
sample_gemeinden.add(p.kontextGemeinde)
|
|
logger.info(f"Sample kontextGemeinde values in database: {sample_gemeinden}")
|
|
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Parzelle",
|
|
"result": [p.model_dump() for p in parzellen],
|
|
"count": len(parzellen)
|
|
}
|
|
elif entity == "Gemeinde":
|
|
from modules.datamodels.datamodelRealEstate import Gemeinde
|
|
gemeindeId = parameters.get("id")
|
|
if gemeindeId:
|
|
gemeinde = realEstateInterface.getGemeinde(gemeindeId)
|
|
if not gemeinde:
|
|
raise ValueError(f"Gemeinde {gemeindeId} not found")
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Gemeinde",
|
|
"result": gemeinde.model_dump()
|
|
}
|
|
else:
|
|
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
|
|
gemeinden = realEstateInterface.getGemeinden(recordFilter=recordFilter if recordFilter else None)
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Gemeinde",
|
|
"result": [g.model_dump() for g in gemeinden],
|
|
"count": len(gemeinden)
|
|
}
|
|
elif entity == "Kanton":
|
|
from modules.datamodels.datamodelRealEstate import Kanton
|
|
kantonId = parameters.get("id")
|
|
if kantonId:
|
|
kanton = realEstateInterface.getKanton(kantonId)
|
|
if not kanton:
|
|
raise ValueError(f"Kanton {kantonId} not found")
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Kanton",
|
|
"result": kanton.model_dump()
|
|
}
|
|
else:
|
|
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
|
|
kantone = realEstateInterface.getKantone(recordFilter=recordFilter if recordFilter else None)
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Kanton",
|
|
"result": [k.model_dump() for k in kantone],
|
|
"count": len(kantone)
|
|
}
|
|
elif entity == "Land":
|
|
from modules.datamodels.datamodelRealEstate import Land
|
|
landId = parameters.get("id")
|
|
if landId:
|
|
land = realEstateInterface.getLand(landId)
|
|
if not land:
|
|
raise ValueError(f"Land {landId} not found")
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Land",
|
|
"result": land.model_dump()
|
|
}
|
|
else:
|
|
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
|
|
laender = realEstateInterface.getLaender(recordFilter=recordFilter if recordFilter else None)
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Land",
|
|
"result": [l.model_dump() for l in laender],
|
|
"count": len(laender)
|
|
}
|
|
elif entity == "Dokument":
|
|
from modules.datamodels.datamodelRealEstate import Dokument
|
|
dokumentId = parameters.get("id")
|
|
if dokumentId:
|
|
dokument = realEstateInterface.getDokument(dokumentId)
|
|
if not dokument:
|
|
raise ValueError(f"Dokument {dokumentId} not found")
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Dokument",
|
|
"result": dokument.model_dump()
|
|
}
|
|
else:
|
|
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
|
|
dokumente = realEstateInterface.getDokumente(recordFilter=recordFilter if recordFilter else None)
|
|
return {
|
|
"operation": "READ",
|
|
"entity": "Dokument",
|
|
"result": [d.model_dump() for d in dokumente],
|
|
"count": len(dokumente)
|
|
}
|
|
else:
|
|
raise ValueError(f"READ operation not supported for entity: {entity}")
|
|
|
|
elif intent == "UPDATE":
|
|
# Update existing entity
|
|
realEstateInterface = getRealEstateInterface(currentUser)
|
|
|
|
if entity == "Projekt":
|
|
projektId = parameters.get("id")
|
|
if not projektId:
|
|
raise ValueError("UPDATE operation requires entity ID")
|
|
|
|
# Get existing projekt
|
|
projekt = realEstateInterface.getProjekt(projektId)
|
|
if not projekt:
|
|
raise ValueError(f"Projekt {projektId} not found")
|
|
|
|
# Update fields
|
|
updateData = {k: v for k, v in parameters.items() if k != "id"}
|
|
updated = realEstateInterface.updateProjekt(projektId, updateData)
|
|
return {
|
|
"operation": "UPDATE",
|
|
"entity": "Projekt",
|
|
"result": updated.model_dump()
|
|
}
|
|
elif entity == "Parzelle":
|
|
parzelleId = parameters.get("id")
|
|
if not parzelleId:
|
|
raise ValueError("UPDATE operation requires entity ID")
|
|
|
|
# Get existing parzelle
|
|
parzelle = realEstateInterface.getParzelle(parzelleId)
|
|
if not parzelle:
|
|
raise ValueError(f"Parzelle {parzelleId} not found")
|
|
|
|
# Update fields
|
|
updateData = {k: v for k, v in parameters.items() if k != "id"}
|
|
updated = realEstateInterface.updateParzelle(parzelleId, updateData)
|
|
return {
|
|
"operation": "UPDATE",
|
|
"entity": "Parzelle",
|
|
"result": updated.model_dump()
|
|
}
|
|
elif entity == "Gemeinde":
|
|
from modules.datamodels.datamodelRealEstate import Gemeinde
|
|
gemeindeId = parameters.get("id")
|
|
if not gemeindeId:
|
|
raise ValueError("UPDATE operation requires entity ID")
|
|
|
|
gemeinde = realEstateInterface.getGemeinde(gemeindeId)
|
|
if not gemeinde:
|
|
raise ValueError(f"Gemeinde {gemeindeId} not found")
|
|
|
|
updateData = {k: v for k, v in parameters.items() if k != "id"}
|
|
updated = realEstateInterface.updateGemeinde(gemeindeId, updateData)
|
|
return {
|
|
"operation": "UPDATE",
|
|
"entity": "Gemeinde",
|
|
"result": updated.model_dump()
|
|
}
|
|
elif entity == "Kanton":
|
|
from modules.datamodels.datamodelRealEstate import Kanton
|
|
kantonId = parameters.get("id")
|
|
if not kantonId:
|
|
raise ValueError("UPDATE operation requires entity ID")
|
|
|
|
kanton = realEstateInterface.getKanton(kantonId)
|
|
if not kanton:
|
|
raise ValueError(f"Kanton {kantonId} not found")
|
|
|
|
updateData = {k: v for k, v in parameters.items() if k != "id"}
|
|
updated = realEstateInterface.updateKanton(kantonId, updateData)
|
|
return {
|
|
"operation": "UPDATE",
|
|
"entity": "Kanton",
|
|
"result": updated.model_dump()
|
|
}
|
|
elif entity == "Land":
|
|
from modules.datamodels.datamodelRealEstate import Land
|
|
landId = parameters.get("id")
|
|
if not landId:
|
|
raise ValueError("UPDATE operation requires entity ID")
|
|
|
|
land = realEstateInterface.getLand(landId)
|
|
if not land:
|
|
raise ValueError(f"Land {landId} not found")
|
|
|
|
updateData = {k: v for k, v in parameters.items() if k != "id"}
|
|
updated = realEstateInterface.updateLand(landId, updateData)
|
|
return {
|
|
"operation": "UPDATE",
|
|
"entity": "Land",
|
|
"result": updated.model_dump()
|
|
}
|
|
elif entity == "Dokument":
|
|
from modules.datamodels.datamodelRealEstate import Dokument
|
|
dokumentId = parameters.get("id")
|
|
if not dokumentId:
|
|
raise ValueError("UPDATE operation requires entity ID")
|
|
|
|
dokument = realEstateInterface.getDokument(dokumentId)
|
|
if not dokument:
|
|
raise ValueError(f"Dokument {dokumentId} not found")
|
|
|
|
updateData = {k: v for k, v in parameters.items() if k != "id"}
|
|
updated = realEstateInterface.updateDokument(dokumentId, updateData)
|
|
return {
|
|
"operation": "UPDATE",
|
|
"entity": "Dokument",
|
|
"result": updated.model_dump()
|
|
}
|
|
else:
|
|
raise ValueError(f"UPDATE operation not supported for entity: {entity}")
|
|
|
|
elif intent == "DELETE":
|
|
# Delete entity
|
|
realEstateInterface = getRealEstateInterface(currentUser)
|
|
|
|
if entity == "Projekt":
|
|
projektId = parameters.get("id")
|
|
if not projektId:
|
|
raise ValueError("DELETE operation requires entity ID")
|
|
|
|
success = realEstateInterface.deleteProjekt(projektId)
|
|
return {
|
|
"operation": "DELETE",
|
|
"entity": "Projekt",
|
|
"success": success
|
|
}
|
|
elif entity == "Parzelle":
|
|
parzelleId = parameters.get("id")
|
|
if not parzelleId:
|
|
raise ValueError("DELETE operation requires entity ID")
|
|
|
|
success = realEstateInterface.deleteParzelle(parzelleId)
|
|
return {
|
|
"operation": "DELETE",
|
|
"entity": "Parzelle",
|
|
"success": success
|
|
}
|
|
elif entity == "Gemeinde":
|
|
from modules.datamodels.datamodelRealEstate import Gemeinde
|
|
gemeindeId = parameters.get("id")
|
|
if not gemeindeId:
|
|
raise ValueError("DELETE operation requires entity ID")
|
|
|
|
success = realEstateInterface.deleteGemeinde(gemeindeId)
|
|
return {
|
|
"operation": "DELETE",
|
|
"entity": "Gemeinde",
|
|
"success": success
|
|
}
|
|
elif entity == "Kanton":
|
|
from modules.datamodels.datamodelRealEstate import Kanton
|
|
kantonId = parameters.get("id")
|
|
if not kantonId:
|
|
raise ValueError("DELETE operation requires entity ID")
|
|
|
|
success = realEstateInterface.deleteKanton(kantonId)
|
|
return {
|
|
"operation": "DELETE",
|
|
"entity": "Kanton",
|
|
"success": success
|
|
}
|
|
elif entity == "Land":
|
|
from modules.datamodels.datamodelRealEstate import Land
|
|
landId = parameters.get("id")
|
|
if not landId:
|
|
raise ValueError("DELETE operation requires entity ID")
|
|
|
|
success = realEstateInterface.deleteLand(landId)
|
|
return {
|
|
"operation": "DELETE",
|
|
"entity": "Land",
|
|
"success": success
|
|
}
|
|
elif entity == "Dokument":
|
|
from modules.datamodels.datamodelRealEstate import Dokument
|
|
dokumentId = parameters.get("id")
|
|
if not dokumentId:
|
|
raise ValueError("DELETE operation requires entity ID")
|
|
|
|
success = realEstateInterface.deleteDokument(dokumentId)
|
|
return {
|
|
"operation": "DELETE",
|
|
"entity": "Dokument",
|
|
"success": success
|
|
}
|
|
else:
|
|
raise ValueError(f"DELETE operation not supported for entity: {entity}")
|
|
|
|
else:
|
|
raise ValueError(f"Unknown intent: {intent}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error executing intent-based operation: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
|
|
# ===== Project Creation with Parcel Data =====
|
|
|
|
async def create_project_with_parcel_data(
|
|
currentUser: User,
|
|
mandateId: str,
|
|
projekt_label: str,
|
|
parzellen_data: List[Dict[str, Any]],
|
|
status_prozess: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a Projekt with one or more Parzellen from provided parcel data.
|
|
|
|
Args:
|
|
currentUser: Current authenticated user
|
|
mandateId: Mandate context (from RequestContext / X-Mandate-Id header)
|
|
projekt_label: Label for the Projekt
|
|
parzellen_data: List of dictionaries containing parcel information from request
|
|
status_prozess: Optional project status (defaults to "Eingang")
|
|
|
|
Returns:
|
|
Dictionary containing created Projekt and list of Parzellen
|
|
|
|
Raises:
|
|
HTTPException: If Gemeinde or Kanton not found, or validation fails
|
|
"""
|
|
try:
|
|
logger.info(f"Creating project '{projekt_label}' with {len(parzellen_data)} parcel(s) for user {currentUser.id}")
|
|
|
|
# Get interface with mandate context
|
|
realEstateInterface = getRealEstateInterface(currentUser, mandateId=mandateId)
|
|
|
|
# Validate required fields
|
|
if not projekt_label:
|
|
raise ValueError("Projekt label is required")
|
|
|
|
if not parzellen_data or len(parzellen_data) == 0:
|
|
raise ValueError("At least one Parzelle data is required")
|
|
|
|
# Validate all parcels have required fields
|
|
for idx, parzelle_data in enumerate(parzellen_data):
|
|
if not parzelle_data.get("perimeter"):
|
|
raise ValueError(f"Parzelle {idx + 1} perimeter is required")
|
|
|
|
# Helper function to convert GeoJSON geometry to GeoPolylinie (defined early for use in geometry collection)
|
|
def convert_geojson_to_geopolylinie(geometry_data: Dict[str, Any]) -> Optional[GeoPolylinie]:
|
|
"""Convert GeoJSON geometry to GeoPolylinie format."""
|
|
if not geometry_data:
|
|
return None
|
|
|
|
# Handle nested geometry structure (geometry.geometry.coordinates)
|
|
if "geometry" in geometry_data:
|
|
geometry_data = geometry_data["geometry"]
|
|
|
|
geometry_type = geometry_data.get("type")
|
|
coordinates = geometry_data.get("coordinates")
|
|
|
|
if not coordinates or geometry_type != "Polygon":
|
|
return None
|
|
|
|
# Extract outer ring (first array of coordinates)
|
|
if not coordinates or len(coordinates) == 0:
|
|
return None
|
|
|
|
ring = coordinates[0] # Outer ring
|
|
|
|
# Convert coordinates to GeoPunkt list
|
|
punkte = []
|
|
for coord in ring:
|
|
if len(coord) >= 2:
|
|
punkt = GeoPunkt(
|
|
koordinatensystem="LV95",
|
|
x=float(coord[0]),
|
|
y=float(coord[1]),
|
|
z=float(coord[2]) if len(coord) > 2 else None
|
|
)
|
|
punkte.append(punkt)
|
|
|
|
if not punkte:
|
|
return None
|
|
|
|
return GeoPolylinie(
|
|
closed=True,
|
|
punkte=punkte
|
|
)
|
|
|
|
# First pass: Collect all parcel geometries for neighbor filtering
|
|
# Convert all perimeters to GeoPolylinie format
|
|
all_parcel_geometries = []
|
|
for parzelle_data in parzellen_data:
|
|
perimeter = parzelle_data.get("perimeter")
|
|
if perimeter:
|
|
# Convert to GeoPolylinie if needed
|
|
if isinstance(perimeter, dict):
|
|
if "punkte" in perimeter and "closed" in perimeter:
|
|
try:
|
|
geo_perimeter = GeoPolylinie(**perimeter)
|
|
all_parcel_geometries.append(geo_perimeter)
|
|
except Exception as e:
|
|
logger.warning(f"Error converting perimeter to GeoPolylinie: {e}")
|
|
else:
|
|
# Try GeoJSON conversion
|
|
converted = convert_geojson_to_geopolylinie(perimeter)
|
|
if converted:
|
|
all_parcel_geometries.append(converted)
|
|
elif isinstance(perimeter, GeoPolylinie):
|
|
all_parcel_geometries.append(perimeter)
|
|
|
|
# Process all parcels - create each one or use existing
|
|
created_parzellen = []
|
|
parcel_perimeters = [] # Collect all parcel perimeters for baulinie calculation
|
|
|
|
for idx, parzelle_data in enumerate(parzellen_data):
|
|
logger.info(f"Processing Parzelle {idx + 1}/{len(parzellen_data)}")
|
|
|
|
# Determine parcel label for uniqueness check
|
|
parcel_label = parzelle_data.get("id") or parzelle_data.get("number") or parzelle_data.get("label") or "Unknown"
|
|
|
|
# Check if Parzelle with this label already exists
|
|
existing_parzellen = realEstateInterface.getParzellen(
|
|
recordFilter={"label": parcel_label, "mandateId": mandateId}
|
|
)
|
|
|
|
if existing_parzellen and len(existing_parzellen) > 0:
|
|
# Parzelle already exists - use existing one
|
|
existing_parzelle = existing_parzellen[0]
|
|
logger.info(f"Parzelle with label '{parcel_label}' already exists (ID: {existing_parzelle.id}), reusing it")
|
|
|
|
# Collect perimeter for baulinie calculation
|
|
if existing_parzelle.perimeter:
|
|
parcel_perimeters.append(existing_parzelle.perimeter)
|
|
|
|
# Add to list of created parcels (actually existing)
|
|
created_parzellen.append(existing_parzelle)
|
|
continue # Skip creation, use existing
|
|
|
|
# Parzelle does not exist - create new one
|
|
logger.info(f"Parzelle with label '{parcel_label}' does not exist, creating new one")
|
|
|
|
# Resolve Gemeinde and Kanton for this parcel (create if not found)
|
|
gemeinde_id = None
|
|
canton_abk = parzelle_data.get("canton")
|
|
municipality_name = parzelle_data.get("municipality_name")
|
|
|
|
logger.debug(f"Resolving Gemeinde/Kanton: canton='{canton_abk}', municipality='{municipality_name}'")
|
|
|
|
if municipality_name and canton_abk:
|
|
# Mapping of canton abbreviations to full names
|
|
canton_names = {
|
|
"ZH": "Zürich", "BE": "Bern", "LU": "Luzern", "UR": "Uri", "SZ": "Schwyz",
|
|
"OW": "Obwalden", "NW": "Nidwalden", "GL": "Glarus", "ZG": "Zug", "FR": "Freiburg",
|
|
"SO": "Solothurn", "BS": "Basel-Stadt", "BL": "Basel-Landschaft", "SH": "Schaffhausen",
|
|
"AR": "Appenzell Ausserrhoden", "AI": "Appenzell Innerrhoden", "SG": "St. Gallen",
|
|
"GR": "Graubünden", "AG": "Aargau", "TG": "Thurgau", "TI": "Tessin",
|
|
"VD": "Waadt", "VS": "Wallis", "NE": "Neuenburg", "GE": "Genf", "JU": "Jura"
|
|
}
|
|
|
|
# First, ensure Land "Schweiz" exists
|
|
logger.debug("Ensuring Land 'Schweiz' exists")
|
|
laender = realEstateInterface.getLaender(recordFilter={"label": "Schweiz"})
|
|
if not laender:
|
|
logger.info("Creating Land 'Schweiz'")
|
|
land = Land(
|
|
mandateId=mandateId,
|
|
label="Schweiz",
|
|
abk="CH"
|
|
)
|
|
land = realEstateInterface.createLand(land)
|
|
logger.info(f"Created Land 'Schweiz' with ID: {land.id}")
|
|
else:
|
|
land = laender[0]
|
|
logger.debug(f"Found Land 'Schweiz' with ID: {land.id}")
|
|
|
|
# Then, lookup or create Kanton
|
|
logger.debug(f"Looking up Kanton with abk='{canton_abk}'")
|
|
kantone = realEstateInterface.getKantone(recordFilter={"abk": canton_abk})
|
|
logger.debug(f"Found {len(kantone)} Kanton(e) with abk='{canton_abk}'")
|
|
if not kantone:
|
|
logger.info(f"Kanton '{canton_abk}' not found, creating it")
|
|
kanton_label = canton_names.get(canton_abk, canton_abk) # Use mapping or fallback to abk
|
|
kanton = Kanton(
|
|
mandateId=mandateId,
|
|
label=kanton_label,
|
|
abk=canton_abk,
|
|
id_land=land.id
|
|
)
|
|
kanton = realEstateInterface.createKanton(kanton)
|
|
logger.info(f"Created Kanton '{kanton_label}' ({canton_abk}) with ID: {kanton.id}")
|
|
else:
|
|
kanton = kantone[0]
|
|
logger.debug(f"Found Kanton: ID={kanton.id}, Label={kanton.label}, abk={kanton.abk}")
|
|
|
|
# Then, lookup or create Gemeinde
|
|
logger.debug(f"Looking up Gemeinde with label='{municipality_name}' and id_kanton='{kanton.id}'")
|
|
gemeinden = realEstateInterface.getGemeinden(
|
|
recordFilter={"label": municipality_name, "id_kanton": kanton.id}
|
|
)
|
|
logger.debug(f"Found {len(gemeinden)} Gemeinde(n) with label='{municipality_name}' and id_kanton='{kanton.id}'")
|
|
if not gemeinden:
|
|
logger.info(f"Gemeinde '{municipality_name}' in Kanton '{canton_abk}' not found, creating it")
|
|
gemeinde = Gemeinde(
|
|
mandateId=mandateId,
|
|
label=municipality_name,
|
|
id_kanton=kanton.id,
|
|
plz=parzelle_data.get("plz") # Use PLZ directly from Swiss Topo API
|
|
)
|
|
gemeinde = realEstateInterface.createGemeinde(gemeinde)
|
|
logger.info(f"Created Gemeinde '{municipality_name}' with ID: {gemeinde.id}")
|
|
else:
|
|
gemeinde = gemeinden[0]
|
|
logger.debug(f"Found Gemeinde: ID={gemeinde.id}, Label={gemeinde.label}")
|
|
|
|
gemeinde_id = gemeinde.id
|
|
logger.info(f"Resolved Gemeinde '{municipality_name}' to ID '{gemeinde_id}'")
|
|
else:
|
|
logger.warning(f"Missing Gemeinde/Kanton data: municipality_name={municipality_name}, canton={canton_abk}")
|
|
|
|
# Build parzellenAliasTags
|
|
alias_tags = []
|
|
if parzelle_data.get("egrid"):
|
|
alias_tags.append(parzelle_data["egrid"])
|
|
if parzelle_data.get("number") and parzelle_data["number"] != parzelle_data.get("id"):
|
|
alias_tags.append(parzelle_data["number"])
|
|
|
|
# Extract address information from Swiss Topo API data
|
|
# Each parcel should have its own address data from Swiss Topo API
|
|
# The address comes from the parcel search API response for THIS specific parcel
|
|
strasse_nr = None
|
|
plz = None
|
|
|
|
# Use address from Swiss Topo API - this is specific to THIS parcel
|
|
# The address field contains the full address string from Swiss Topo
|
|
address = parzelle_data.get("address")
|
|
if address:
|
|
# Swiss Topo provides full address string like "Street Number, PLZ City"
|
|
# Parse to extract street and number (before comma)
|
|
parts = address.split(",")
|
|
if len(parts) >= 1:
|
|
strasse_nr = parts[0].strip()
|
|
# PLZ is provided separately by Swiss Topo API
|
|
plz = parzelle_data.get("plz")
|
|
|
|
# Log address info for debugging
|
|
logger.debug(f"Parzelle {idx + 1} address data: strasse_nr='{strasse_nr}', plz='{plz}', full_address='{address}'")
|
|
|
|
# If no address found, log warning but continue
|
|
if not strasse_nr and not plz:
|
|
logger.warning(f"No address data found for Parzelle {idx + 1} (label: {parcel_label})")
|
|
|
|
# Build kontextInformationen
|
|
kontext_items = []
|
|
|
|
if parzelle_data.get("egrid"):
|
|
kontext_items.append(Kontext(
|
|
thema="EGRID",
|
|
inhalt=parzelle_data["egrid"]
|
|
))
|
|
|
|
if parzelle_data.get("identnd"):
|
|
kontext_items.append(Kontext(
|
|
thema="IdentND",
|
|
inhalt=parzelle_data["identnd"]
|
|
))
|
|
|
|
if parzelle_data.get("area_m2"):
|
|
kontext_items.append(Kontext(
|
|
thema="Fläche",
|
|
inhalt=f"{parzelle_data['area_m2']} m²"
|
|
))
|
|
|
|
if parzelle_data.get("centroid"):
|
|
centroid = parzelle_data["centroid"]
|
|
kontext_items.append(Kontext(
|
|
thema="Zentrum (LV95)",
|
|
inhalt=f"X: {centroid.get('x')} m, Y: {centroid.get('y')} m (EPSG:2056)"
|
|
))
|
|
|
|
if parzelle_data.get("geoportal_url"):
|
|
kontext_items.append(Kontext(
|
|
thema="Geoportal URL",
|
|
inhalt=parzelle_data["geoportal_url"]
|
|
))
|
|
|
|
if parzelle_data.get("municipality_code"):
|
|
kontext_items.append(Kontext(
|
|
thema="BFS-Nummer",
|
|
inhalt=str(parzelle_data["municipality_code"])
|
|
))
|
|
|
|
# Handle adjacent parcels - filter out selected parcels geometrically
|
|
adjacent_parcel_refs = []
|
|
if parzelle_data.get("adjacent_parcels"):
|
|
# Filter neighbors to exclude selected parcels
|
|
neighbors_to_filter = []
|
|
for adj_parcel in parzelle_data["adjacent_parcels"]:
|
|
if isinstance(adj_parcel, dict):
|
|
neighbors_to_filter.append(adj_parcel)
|
|
elif isinstance(adj_parcel, str):
|
|
neighbors_to_filter.append({"id": adj_parcel})
|
|
|
|
# Filter using geometry comparison if we have geometries
|
|
if all_parcel_geometries and neighbors_to_filter:
|
|
try:
|
|
filtered_neighbors = filter_neighbor_parcels(
|
|
neighbors_to_filter,
|
|
all_parcel_geometries
|
|
)
|
|
# Extract IDs from filtered neighbors
|
|
for filtered_neighbor in filtered_neighbors:
|
|
adj_id = filtered_neighbor.get("id")
|
|
if adj_id:
|
|
adjacent_parcel_refs.append({"id": adj_id})
|
|
except Exception as e:
|
|
logger.warning(f"Error filtering neighbor parcels: {e}, including all neighbors")
|
|
# Fallback: include all neighbors if filtering fails
|
|
for adj_parcel in parzelle_data["adjacent_parcels"]:
|
|
if isinstance(adj_parcel, dict):
|
|
adj_id = adj_parcel.get("id")
|
|
if adj_id:
|
|
adjacent_parcel_refs.append({"id": adj_id})
|
|
elif isinstance(adj_parcel, str):
|
|
adjacent_parcel_refs.append({"id": adj_parcel})
|
|
else:
|
|
# No geometries available - include all neighbors
|
|
for adj_parcel in parzelle_data["adjacent_parcels"]:
|
|
if isinstance(adj_parcel, dict):
|
|
adj_id = adj_parcel.get("id")
|
|
if adj_id:
|
|
adjacent_parcel_refs.append({"id": adj_id})
|
|
elif isinstance(adj_parcel, str):
|
|
adjacent_parcel_refs.append({"id": adj_parcel})
|
|
|
|
# Convert perimeter to GeoPolylinie if needed
|
|
perimeter = parzelle_data.get("perimeter")
|
|
if isinstance(perimeter, dict):
|
|
# Check if it's already in GeoPolylinie format (has punkte and closed)
|
|
if "punkte" in perimeter and "closed" in perimeter:
|
|
try:
|
|
perimeter = GeoPolylinie(**perimeter)
|
|
except Exception as e:
|
|
raise ValueError(f"Invalid perimeter format: {str(e)}")
|
|
else:
|
|
# Try to convert from GeoJSON format
|
|
converted = convert_geojson_to_geopolylinie(perimeter)
|
|
if converted:
|
|
perimeter = converted
|
|
else:
|
|
raise ValueError("Invalid perimeter format: cannot convert to GeoPolylinie")
|
|
elif isinstance(perimeter, GeoPolylinie):
|
|
# Already a GeoPolylinie instance, use as-is
|
|
pass
|
|
else:
|
|
raise ValueError("Invalid perimeter type: must be dict or GeoPolylinie")
|
|
|
|
# Extract baulinie from geometry if provided
|
|
baulinie = None
|
|
geometry = parzelle_data.get("geometry")
|
|
logger.debug(f"Geometry present: {geometry is not None}")
|
|
if geometry:
|
|
logger.debug(f"Geometry type: {type(geometry)}, keys: {list(geometry.keys()) if isinstance(geometry, dict) else 'not a dict'}")
|
|
baulinie = convert_geojson_to_geopolylinie(geometry)
|
|
if baulinie:
|
|
logger.info(f"Extracted baulinie from geometry with {len(baulinie.punkte)} points")
|
|
else:
|
|
logger.warning("Failed to extract baulinie from geometry")
|
|
else:
|
|
logger.warning("No geometry found in parzelle_data")
|
|
|
|
# Build Parzelle data
|
|
parzelle_create_data = {
|
|
"mandateId": mandateId,
|
|
"label": parcel_label, # Use the label we determined earlier for uniqueness check
|
|
"parzellenAliasTags": alias_tags,
|
|
"eigentuemerschaft": None,
|
|
"strasseNr": strasse_nr,
|
|
"plz": plz,
|
|
"perimeter": perimeter,
|
|
"baulinie": baulinie,
|
|
"kontextGemeinde": gemeinde_id,
|
|
"bauzone": None,
|
|
"az": None,
|
|
"bz": None,
|
|
"vollgeschossZahl": None,
|
|
"anrechenbarDachgeschoss": None,
|
|
"anrechenbarUntergeschoss": None,
|
|
"gebaeudehoeheMax": None,
|
|
"regelnGrenzabstand": [],
|
|
"regelnMehrlaengenzuschlag": [],
|
|
"regelnMehrhoehenzuschlag": [],
|
|
"parzelleBebaut": None,
|
|
"parzelleErschlossen": None,
|
|
"parzelleHanglage": None,
|
|
"laermschutzzone": None,
|
|
"hochwasserschutzzone": None,
|
|
"grundwasserschutzzone": None,
|
|
"parzellenNachbarschaft": adjacent_parcel_refs,
|
|
"dokumente": [],
|
|
"kontextInformationen": kontext_items,
|
|
}
|
|
|
|
# Create Parzelle instance
|
|
logger.debug(f"Creating Parzelle with label: {parzelle_create_data.get('label')}")
|
|
logger.debug(f"Parzelle mandateId: {parzelle_create_data.get('mandateId')}")
|
|
logger.debug(f"Parzelle perimeter present: {parzelle_create_data.get('perimeter') is not None}")
|
|
|
|
try:
|
|
parzelle_instance = Parzelle(**parzelle_create_data)
|
|
logger.debug(f"Parzelle instance created successfully with ID: {parzelle_instance.id}")
|
|
except Exception as e:
|
|
logger.error(f"Error creating Parzelle instance: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
# Create Parzelle in database
|
|
try:
|
|
logger.info(f"Calling createParzelle for Parzelle '{parzelle_instance.label}' (ID: {parzelle_instance.id})")
|
|
logger.debug(f"Parzelle instance before createParzelle: {parzelle_instance.model_dump(mode='json', exclude={'perimeter', 'baulinie', 'kontextInformationen'})}")
|
|
|
|
# Use model_dump with mode='json' to ensure nested Pydantic models are serialized
|
|
parzelle_dict = parzelle_instance.model_dump(mode='json')
|
|
logger.debug(f"Parzelle dict keys: {list(parzelle_dict.keys())}")
|
|
|
|
# Create Parzelle using the interface, which will handle serialization
|
|
created_parzelle = realEstateInterface.createParzelle(parzelle_instance)
|
|
|
|
logger.info(f"createParzelle returned: ID={created_parzelle.id if created_parzelle else 'None'}, Label={created_parzelle.label if created_parzelle else 'None'}")
|
|
|
|
# Verify Parzelle was created successfully
|
|
if not created_parzelle:
|
|
raise ValueError("Failed to create Parzelle - createParzelle returned None")
|
|
|
|
if not created_parzelle.id:
|
|
raise ValueError("Failed to create Parzelle - no ID returned")
|
|
|
|
logger.info(f"Parzelle created with ID: {created_parzelle.id}")
|
|
|
|
# Verify Parzelle exists in database by fetching it
|
|
logger.debug(f"Verifying Parzelle {created_parzelle.id} exists in database...")
|
|
verify_parzelle = realEstateInterface.getParzelle(created_parzelle.id)
|
|
if not verify_parzelle:
|
|
logger.error(f"Parzelle {created_parzelle.id} was not found in database after creation")
|
|
# Try to get all Parzellen to see what's in the database
|
|
all_parzellen = realEstateInterface.getParzellen(recordFilter=None)
|
|
logger.error(f"Total Parzellen in database: {len(all_parzellen)}")
|
|
if all_parzellen:
|
|
logger.error(f"Sample Parzelle IDs: {[p.id for p in all_parzellen[:5]]}")
|
|
raise ValueError(f"Parzelle {created_parzelle.id} was not found in database after creation")
|
|
|
|
logger.info(f"Verified Parzelle {created_parzelle.id} exists in database")
|
|
# Use the verified Parzelle from database to ensure it has all fields
|
|
created_parzelle = verify_parzelle
|
|
|
|
# Collect perimeter for baulinie calculation
|
|
if created_parzelle.perimeter:
|
|
parcel_perimeters.append(created_parzelle.perimeter)
|
|
|
|
# Add to list of created parcels
|
|
created_parzellen.append(created_parzelle)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating Parzelle {idx + 1}: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
if not created_parzellen:
|
|
raise ValueError("No Parzellen were successfully created")
|
|
|
|
logger.info(f"Successfully created {len(created_parzellen)} Parzelle(n)")
|
|
|
|
# Calculate combined baulinie from all parcel perimeters
|
|
project_baulinie = None
|
|
if len(parcel_perimeters) > 0:
|
|
try:
|
|
if len(parcel_perimeters) == 1:
|
|
# Single parcel - use its perimeter as baulinie
|
|
project_baulinie = parcel_perimeters[0]
|
|
logger.info("Using single parcel perimeter as baulinie")
|
|
else:
|
|
# Multiple parcels - combine geometries to create outer outline
|
|
logger.info(f"Combining {len(parcel_perimeters)} parcel geometries to create baulinie")
|
|
project_baulinie = combine_parcel_geometries(parcel_perimeters)
|
|
logger.info(f"Created combined baulinie with {len(project_baulinie.punkte)} points")
|
|
except Exception as e:
|
|
logger.error(f"Error combining parcel geometries for baulinie: {e}", exc_info=True)
|
|
# Fallback: use first parcel's perimeter
|
|
if parcel_perimeters:
|
|
project_baulinie = parcel_perimeters[0]
|
|
logger.warning("Using first parcel perimeter as fallback baulinie")
|
|
|
|
# Convert status_prozess to enum
|
|
status_prozess_enum = None
|
|
if status_prozess:
|
|
try:
|
|
# Try to convert string to enum
|
|
if isinstance(status_prozess, str):
|
|
status_prozess_enum = StatusProzess(status_prozess)
|
|
elif isinstance(status_prozess, StatusProzess):
|
|
status_prozess_enum = status_prozess
|
|
except (ValueError, KeyError):
|
|
logger.warning(f"Invalid statusProzess '{status_prozess}', using default 'Eingang'")
|
|
status_prozess_enum = StatusProzess.EINGANG
|
|
else:
|
|
status_prozess_enum = StatusProzess.EINGANG
|
|
|
|
# Create Projekt with combined baulinie
|
|
# Use the verified Parzelle instance (from database) to ensure it has all fields properly set
|
|
logger.debug(f"Preparing Projekt creation with baulinie: {project_baulinie is not None}")
|
|
if project_baulinie:
|
|
logger.debug(f"Baulinie has {len(project_baulinie.punkte)} points")
|
|
|
|
# Use first parcel's perimeter for project perimeter (or combine if needed)
|
|
project_perimeter = created_parzellen[0].perimeter if created_parzellen else None
|
|
|
|
projekt_create_data = {
|
|
"mandateId": mandateId,
|
|
"label": projekt_label,
|
|
"statusProzess": status_prozess_enum,
|
|
"perimeter": project_perimeter, # Use first parcel perimeter as project perimeter
|
|
"baulinie": project_baulinie, # Set baulinie from first parcel geometry
|
|
"parzellen": created_parzellen, # Link all created Parzelle instances
|
|
"dokumente": [],
|
|
"kontextInformationen": [],
|
|
}
|
|
|
|
logger.debug(f"Projekt data prepared: label={projekt_label}, parzellen_count={len(projekt_create_data['parzellen'])}, baulinie={'present' if project_baulinie else 'None'}")
|
|
|
|
try:
|
|
projekt_instance = Projekt(**projekt_create_data)
|
|
logger.debug(f"Projekt instance created successfully with ID: {projekt_instance.id}")
|
|
except Exception as e:
|
|
logger.error(f"Error creating Projekt instance: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
# Log before creation for debugging
|
|
logger.debug(f"Creating Projekt with {len(projekt_instance.parzellen)} Parzelle(n)")
|
|
if projekt_instance.parzellen:
|
|
for idx, p in enumerate(projekt_instance.parzellen):
|
|
logger.debug(f" Parzelle {idx}: ID={p.id}, Label={p.label}")
|
|
|
|
logger.debug(f"Projekt baulinie before save: {projekt_instance.baulinie is not None}")
|
|
if projekt_instance.baulinie:
|
|
logger.debug(f"Projekt baulinie has {len(projekt_instance.baulinie.punkte)} points")
|
|
|
|
try:
|
|
created_projekt = realEstateInterface.createProjekt(projekt_instance)
|
|
logger.info(f"Created Projekt '{created_projekt.label}' (ID: {created_projekt.id})")
|
|
logger.debug(f"Created Projekt baulinie: {created_projekt.baulinie is not None}")
|
|
except Exception as e:
|
|
logger.error(f"Error calling createProjekt: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
# Verify Projekt was created
|
|
if not created_projekt or not created_projekt.id:
|
|
raise ValueError("Failed to create Projekt - no ID returned")
|
|
|
|
# Verify Parzelle is linked in the created Projekt
|
|
if not created_projekt.parzellen or len(created_projekt.parzellen) == 0:
|
|
logger.warning(f"Projekt {created_projekt.id} created but no Parzellen linked")
|
|
# Try to fetch the Projekt from database to see if Parzellen are there
|
|
verify_projekt = realEstateInterface.getProjekt(created_projekt.id)
|
|
if verify_projekt and verify_projekt.parzellen:
|
|
logger.info(f"Parzellen found when fetching Projekt from database: {len(verify_projekt.parzellen)}")
|
|
created_projekt = verify_projekt
|
|
else:
|
|
raise ValueError(f"Projekt {created_projekt.id} has no Parzellen linked after creation")
|
|
else:
|
|
logger.info(f"Projekt {created_projekt.id} successfully linked to {len(created_projekt.parzellen)} Parzelle(n)")
|
|
# Log Parzelle details
|
|
for idx, p in enumerate(created_projekt.parzellen):
|
|
logger.debug(f" Linked Parzelle {idx}: ID={p.id if hasattr(p, 'id') else 'NO ID'}, Label={p.label if hasattr(p, 'label') else 'NO LABEL'}")
|
|
|
|
return {
|
|
"projekt": created_projekt.model_dump(),
|
|
"parzellen": [p.model_dump() for p in created_parzellen],
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating project with parcel data: {str(e)}", exc_info=True)
|
|
raise
|
|
|