""" Real Estate feature main logic. Handles database operations with AI-powered natural language processing. Stateless implementation without session management. This module also handles feature initialization and RBAC catalog registration. """ import logging # Feature metadata for RBAC catalog FEATURE_CODE = "realestate" FEATURE_LABEL = {"en": "Real Estate", "de": "Immobilien", "fr": "Immobilier"} FEATURE_ICON = "mdi-home-city" # UI Objects for RBAC catalog (only map view) UI_OBJECTS = [ { "objectKey": "ui.feature.realestate.dashboard", "label": {"en": "Map", "de": "Karte", "fr": "Carte"}, "meta": {"area": "dashboard"} }, ] # Resource Objects for RBAC catalog RESOURCE_OBJECTS = [ { "objectKey": "resource.feature.realestate.project.create", "label": {"en": "Create Project", "de": "Projekt erstellen", "fr": "Créer projet"}, "meta": {"endpoint": "/api/realestate/project", "method": "POST"} }, { "objectKey": "resource.feature.realestate.project.delete", "label": {"en": "Delete Project", "de": "Projekt löschen", "fr": "Supprimer projet"}, "meta": {"endpoint": "/api/realestate/project/{projectId}", "method": "DELETE"} }, ] # Template roles for this feature with AccessRules # IMPORTANT: item uses vollqualifizierte ObjectKeys (gemäss Navigation-API-Konzept) TEMPLATE_ROLES = [ { "roleLabel": "realestate-admin", "description": { "en": "Real Estate Administrator - Full access to all property data and settings", "de": "Immobilien-Administrator - Vollzugriff auf alle Immobiliendaten und Einstellungen", "fr": "Administrateur immobilier - Accès complet aux données et paramètres" }, "accessRules": [ # Full UI access (all views including admin views) {"context": "UI", "item": None, "view": True}, # Full DATA access {"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"}, # Admin resources {"context": "RESOURCE", "item": "resource.feature.realestate.project.create", "view": True}, {"context": "RESOURCE", "item": "resource.feature.realestate.project.delete", "view": True}, ] }, { "roleLabel": "realestate-manager", "description": { "en": "Real Estate Manager - Manage properties and tenants", "de": "Immobilien-Verwalter - Immobilien und Mieter verwalten", "fr": "Gestionnaire immobilier - Gérer les propriétés et locataires" }, "accessRules": [ # UI access to map view {"context": "UI", "item": "ui.feature.realestate.dashboard", "view": True}, # Group-level DATA access {"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "g"}, # Resource: create projects {"context": "RESOURCE", "item": "resource.feature.realestate.project.create", "view": True}, ] }, { "roleLabel": "realestate-viewer", "description": { "en": "Real Estate Viewer - View property information", "de": "Immobilien-Betrachter - Immobilien-Informationen einsehen", "fr": "Visualiseur immobilier - Consulter les informations immobilières" }, "accessRules": [ # UI access to map view (read-only) {"context": "UI", "item": "ui.feature.realestate.dashboard", "view": True}, # Read-only DATA access (my records) {"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, ] }, ] def getFeatureDefinition(): """Return the feature definition for registration.""" return { "code": FEATURE_CODE, "label": FEATURE_LABEL, "icon": FEATURE_ICON } def getUiObjects(): """Return UI objects for RBAC catalog registration.""" return UI_OBJECTS def getResourceObjects(): """Return resource objects for RBAC catalog registration.""" return RESOURCE_OBJECTS def getTemplateRoles(): """Return template roles for this feature.""" return TEMPLATE_ROLES def registerFeature(catalogService) -> bool: """Register this feature's RBAC objects in the catalog.""" try: for uiObj in UI_OBJECTS: catalogService.registerUiObject( featureCode=FEATURE_CODE, objectKey=uiObj["objectKey"], label=uiObj["label"], meta=uiObj.get("meta") ) for resObj in RESOURCE_OBJECTS: catalogService.registerResourceObject( featureCode=FEATURE_CODE, objectKey=resObj["objectKey"], label=resObj["label"], meta=resObj.get("meta") ) # Sync template roles to database (with AccessRules) _syncTemplateRolesToDb() return True except Exception as e: logging.getLogger(__name__).error(f"Failed to register feature '{FEATURE_CODE}': {e}") return False def _syncTemplateRolesToDb() -> int: """ Sync template roles and their AccessRules to the database. Creates global template roles (mandateId=None) if they don't exist. """ try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext rootInterface = getRootInterface() db = rootInterface.db existingRoles = db.getRecordset( Role, recordFilter={"featureCode": FEATURE_CODE, "mandateId": None} ) existingRoleLabels = {r.get("roleLabel"): r.get("id") for r in existingRoles} createdCount = 0 for roleTemplate in TEMPLATE_ROLES: roleLabel = roleTemplate["roleLabel"] if roleLabel in existingRoleLabels: roleId = existingRoleLabels[roleLabel] _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) else: newRole = Role( roleLabel=roleLabel, description=roleTemplate.get("description", {}), featureCode=FEATURE_CODE, mandateId=None, featureInstanceId=None, isSystemRole=False ) createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump()) roleId = createdRole.get("id") existingRoleLabels[roleLabel] = roleId _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) logging.getLogger(__name__).info(f"Created template role '{roleLabel}' with ID {roleId}") createdCount += 1 if createdCount > 0: logging.getLogger(__name__).info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles") _repairInstanceRolesAccessRules(rootInterface, existingRoleLabels) return createdCount except Exception as e: logging.getLogger(__name__).error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}") return 0 def _repairInstanceRolesAccessRules(rootInterface, templateRoleLabels: dict) -> int: """Repair instance-specific roles by copying AccessRules from their template roles.""" from modules.datamodels.datamodelRbac import Role, AccessRule repairedCount = 0 allRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) instanceRoles = [r for r in allRoles if r.mandateId is not None] for instanceRole in instanceRoles: roleLabel = instanceRole.roleLabel instanceRoleId = str(instanceRole.id) templateRoleId = templateRoleLabels.get(roleLabel) if not templateRoleId: continue existingRules = rootInterface.getAccessRulesByRole(instanceRoleId) if existingRules: continue templateRules = rootInterface.getAccessRulesByRole(templateRoleId) if not templateRules: continue for rule in templateRules: newRule = AccessRule( roleId=instanceRoleId, context=rule.context, item=rule.item, view=rule.view if rule.view else False, read=rule.read, create=rule.create, update=rule.update, delete=rule.delete, ) rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) repairedCount += 1 return repairedCount def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: list) -> int: """Ensure AccessRules exist for a role based on templates.""" from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext existingRules = rootInterface.getAccessRulesByRole(roleId) # IMPORTANT: Use .value for enum comparison, not str() which gives "AccessRuleContext.DATA" in Python 3.11+ existingSignatures = {(r.context.value if r.context else None, r.item) for r in existingRules} createdCount = 0 for template in ruleTemplates or []: context = template.get("context", "UI") item = template.get("item") if (context, item) in existingSignatures: continue if context == "UI": contextEnum = AccessRuleContext.UI elif context == "DATA": contextEnum = AccessRuleContext.DATA elif context == "RESOURCE": contextEnum = AccessRuleContext.RESOURCE else: contextEnum = context newRule = AccessRule( roleId=roleId, context=contextEnum, item=item, view=template.get("view", False), read=template.get("read"), create=template.get("create"), update=template.get("update"), delete=template.get("delete"), ) rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) createdCount += 1 existingSignatures.add((context, item)) return createdCount import json from typing import Optional, Dict, Any, List from fastapi import HTTPException, status from shapely.geometry import Polygon from shapely.ops import unary_union from modules.datamodels.datamodelUam import User from .datamodelFeatureRealEstate import ( Projekt, Parzelle, StatusProzess, GeoPolylinie, GeoPunkt, Kontext, Gemeinde, Kanton, Land, DokumentTyp, ) from modules.services import getInterface as getServices from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector from modules.features.realEstate.bzoDocumentRetriever import BZODocumentRetriever from modules.features.realEstate.bzoExtractionLangGraph import run_extraction, run_bzo_params_extraction from modules.features.realEstate.parcelSelectionService import compute_selection_summary from modules.features.realEstate.realEstateGemeindeService import ( ensure_single_gemeinde, fetch_bzo_for_gemeinde, ) logger = logging.getLogger(__name__) # ===== Geometry Utilities ===== def geopolylinie_to_shapely_polygon(geopolylinie: GeoPolylinie) -> Polygon: """ Convert GeoPolylinie to Shapely Polygon. Args: geopolylinie: GeoPolylinie instance with punkte list Returns: Shapely Polygon object """ if not geopolylinie or not geopolylinie.punkte: raise ValueError("GeoPolylinie must have at least one point") # Extract coordinates from punkte coordinates = [] for punkt in geopolylinie.punkte: coordinates.append((punkt.x, punkt.y)) # Ensure polygon is closed (first point == last point) if len(coordinates) < 3: raise ValueError("Polygon must have at least 3 points") # Close polygon if not already closed if coordinates[0] != coordinates[-1]: coordinates.append(coordinates[0]) return Polygon(coordinates) def shapely_polygon_to_geopolylinie(polygon: Polygon) -> GeoPolylinie: """ Convert Shapely Polygon to GeoPolylinie. Args: polygon: Shapely Polygon object Returns: GeoPolylinie instance with LV95 coordinate system """ if not polygon or polygon.is_empty: raise ValueError("Polygon must not be empty") # Extract exterior coordinates exterior_coords = list(polygon.exterior.coords) # Remove duplicate last point if present (Shapely includes it) if len(exterior_coords) > 1 and exterior_coords[0] == exterior_coords[-1]: exterior_coords = exterior_coords[:-1] # Convert to GeoPunkt list punkte = [] for coord in exterior_coords: punkt = GeoPunkt( koordinatensystem="LV95", x=float(coord[0]), y=float(coord[1]), z=None ) punkte.append(punkt) return GeoPolylinie( closed=True, punkte=punkte ) def combine_parcel_geometries(geometries: List[GeoPolylinie]) -> GeoPolylinie: """ Combine multiple parcel geometries into a single outer outline. Uses Shapely union operation to merge polygons and automatically removes internal edges. The result is a clean outer boundary. Args: geometries: List of GeoPolylinie instances to combine Returns: Combined GeoPolylinie representing the outer outline Raises: ValueError: If geometries list is empty or invalid """ if not geometries or len(geometries) == 0: raise ValueError("At least one geometry is required") if len(geometries) == 1: # Single geometry - return as-is return geometries[0] # Convert all geometries to Shapely Polygons shapely_polygons = [] for geo in geometries: try: polygon = geopolylinie_to_shapely_polygon(geo) if not polygon.is_empty: shapely_polygons.append(polygon) except Exception as e: logger.warning(f"Error converting geometry to Shapely Polygon: {e}") continue if not shapely_polygons: raise ValueError("No valid geometries to combine") if len(shapely_polygons) == 1: # Only one valid polygon - convert back return shapely_polygon_to_geopolylinie(shapely_polygons[0]) # Perform union operation - automatically removes internal edges try: combined = unary_union(shapely_polygons) # Handle MultiPolygon case (disconnected parcels) if hasattr(combined, 'geoms'): # Multiple separate polygons - combine their exteriors # For now, take the largest polygon or combine all exteriors # In practice, we might want to keep them separate or combine differently largest = max(combined.geoms, key=lambda p: p.area) combined = largest # Extract outer boundary if combined.is_empty: raise ValueError("Union resulted in empty geometry") # Convert back to GeoPolylinie result = shapely_polygon_to_geopolylinie(combined) logger.info(f"Combined {len(geometries)} geometries into single outline with {len(result.punkte)} points") return result except Exception as e: logger.error(f"Error combining geometries: {e}", exc_info=True) raise ValueError(f"Failed to combine geometries: {str(e)}") def filter_neighbor_parcels( neighbors: List[Dict[str, Any]], selected_geometries: List[GeoPolylinie] ) -> List[Dict[str, Any]]: """ Filter neighbor parcels to exclude those that are part of the selected parcels. Uses geometric comparison to check if neighbor parcels intersect or touch any of the selected parcel geometries. Args: neighbors: List of neighbor parcel dictionaries (must have 'perimeter' or 'geometry_geojson') selected_geometries: List of GeoPolylinie instances representing selected parcels Returns: Filtered list of neighbor parcels (excluding selected ones) """ if not neighbors or not selected_geometries: return neighbors # Convert selected geometries to Shapely Polygons for comparison selected_polygons = [] for geo in selected_geometries: try: polygon = geopolylinie_to_shapely_polygon(geo) if not polygon.is_empty: selected_polygons.append(polygon) except Exception as e: logger.warning(f"Error converting selected geometry for filtering: {e}") continue if not selected_polygons: # No valid selected geometries - return all neighbors return neighbors # Filter neighbors filtered_neighbors = [] for neighbor in neighbors: try: # Try to get geometry from neighbor neighbor_geometry = None # Check for perimeter (GeoPolylinie format) if neighbor.get("perimeter"): perimeter = neighbor["perimeter"] if isinstance(perimeter, dict) and perimeter.get("punkte"): # Convert to GeoPolylinie punkte = [] for p in perimeter["punkte"]: punkt = GeoPunkt( koordinatensystem=p.get("koordinatensystem", "LV95"), x=float(p.get("x", 0)), y=float(p.get("y", 0)), z=p.get("z") ) punkte.append(punkt) neighbor_geometry = GeoPolylinie(closed=True, punkte=punkte) # Check for geometry_geojson elif neighbor.get("geometry_geojson"): geo_json = neighbor["geometry_geojson"] geometry = geo_json.get("geometry") if isinstance(geo_json, dict) else geo_json if geometry and geometry.get("type") == "Polygon": coordinates = geometry.get("coordinates", []) if coordinates and len(coordinates) > 0: ring = coordinates[0] # Outer ring punkte = [] for coord in ring: if len(coord) >= 2: punkt = GeoPunkt( koordinatensystem="LV95", x=float(coord[0]), y=float(coord[1]), z=float(coord[2]) if len(coord) > 2 else None ) punkte.append(punkt) neighbor_geometry = GeoPolylinie(closed=True, punkte=punkte) if not neighbor_geometry: # No geometry available - include neighbor (can't filter without geometry) filtered_neighbors.append(neighbor) continue # Convert neighbor geometry to Shapely Polygon neighbor_polygon = geopolylinie_to_shapely_polygon(neighbor_geometry) # Check if neighbor intersects or touches any selected parcel is_selected = False for selected_polygon in selected_polygons: if neighbor_polygon.intersects(selected_polygon) or neighbor_polygon.touches(selected_polygon): # Check if they're actually the same (within tolerance) # If areas are very similar, it's likely the same parcel area_diff = abs(neighbor_polygon.area - selected_polygon.area) if area_diff < 1.0: # Less than 1 m² difference is_selected = True break # Also check if one contains the other (shouldn't happen for neighbors, but check anyway) if neighbor_polygon.contains(selected_polygon) or selected_polygon.contains(neighbor_polygon): is_selected = True break if not is_selected: filtered_neighbors.append(neighbor) else: logger.debug(f"Filtered out neighbor parcel {neighbor.get('id')} - part of selected parcels") except Exception as e: logger.warning(f"Error filtering neighbor parcel {neighbor.get('id')}: {e}") # On error, include neighbor (better to show too many than too few) filtered_neighbors.append(neighbor) logger.info(f"Filtered {len(neighbors)} neighbors to {len(filtered_neighbors)} (removed {len(neighbors) - len(filtered_neighbors)} selected parcels)") return filtered_neighbors # ===== Swisstopo Integration ===== async def fetch_parcel_polygon_from_swisstopo( gemeinde: str, parzellen_nr: str, sr: int = 2056 ) -> Optional[Dict[str, Any]]: """ Holt die vollständige Polygon-Geometrie einer Parzelle von Swisstopo API. Args: gemeinde: Name der Gemeinde (z.B. "Bern") parzellen_nr: Parzellennummer (z.B. "1234") sr: Koordinatensystem (2056=LV95, 4326=WGS84) Returns: Dictionary mit GeoPolylinie-Format für perimeter-Feld, oder None wenn nicht gefunden Format: {"closed": True, "punkte": [{"koordinatensystem": "LV95", "x": ..., "y": ..., "z": None}, ...]} """ try: connector = SwissTopoMapServerConnector() # Get GeoJSON feature from Swisstopo feature = await connector.get_parcel_polygon(gemeinde, parzellen_nr, sr) if not feature: logger.warning(f"Parzelle {gemeinde} {parzellen_nr} nicht gefunden in Swisstopo") return None # Convert GeoJSON to GeoPolylinie format geometry = feature.get("geometry", {}) if geometry.get("type") == "Polygon": coordinates = geometry.get("coordinates", []) if coordinates and len(coordinates) > 0: ring = coordinates[0] # Outer ring punkte = [] for coord in ring: if len(coord) >= 2: punkt = { "koordinatensystem": "LV95" if sr == 2056 else "WGS84", "x": coord[0], # GeoJSON: [x, y] = [easting, northing] "y": coord[1], "z": coord[2] if len(coord) > 2 else None } punkte.append(punkt) logger.info(f"Successfully fetched polygon with {len(punkte)} points for {gemeinde} {parzellen_nr}") return { "closed": True, "punkte": punkte } logger.warning(f"Unexpected geometry type in Swisstopo response: {geometry.get('type')}") return None except Exception as e: logger.error(f"Error fetching parcel polygon from Swisstopo: {e}", exc_info=True) return None # ===== Direkte Query-Ausführung (stateless) ===== async def executeDirectQuery( currentUser: User, mandateId: str, queryText: str, parameters: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Execute a database query directly without session management. Args: currentUser: Current authenticated user mandateId: Mandate context (from RequestContext / X-Mandate-Id header) queryText: SQL query text parameters: Optional parameters for parameterized queries Returns: Dictionary containing query result (rows, columns, rowCount) Note: - No session or query history is saved - Query is executed directly and result is returned - For production, validate and sanitize queries before execution """ try: logger.info(f"Executing direct query for user {currentUser.id} (mandate: {mandateId})") logger.debug(f"Query text: {queryText}") if parameters: logger.debug(f"Query parameters: {parameters}") # Execute query via Real Estate interface (stateless) realEstateInterface = getRealEstateInterface(currentUser, mandateId=mandateId) result = realEstateInterface.executeQuery(queryText, parameters) logger.info( f"Query executed successfully: {result['rowCount']} rows in {result.get('executionTime', 0):.3f}s" ) return { "status": "success", "rows": result["rows"], "columns": result["columns"], "rowCount": result["rowCount"], "executionTime": result.get("executionTime", 0), } except Exception as e: logger.error(f"Error executing query: {str(e)}", exc_info=True) raise # ===== AI-basierte Intent-Erkennung und CRUD-Operationen ===== def _formatEntitySummary(entity_type: str, items: List[Dict[str, Any]], filters: Dict[str, Any]) -> str: """ Format a human-readable summary of query results. Args: entity_type: Type of entity (Projekt, Parzelle, etc.) items: List of entity data dictionaries filters: Filter parameters used in the query Returns: Human-readable summary string """ if not items: return f"Keine {entity_type} gefunden" count = len(items) filter_desc = "" if filters: # Format filter description if "kontextGemeinde" in filters: filter_desc = f" in {filters['kontextGemeinde']}" elif "plz" in filters: filter_desc = f" mit PLZ {filters['plz']}" elif "location_filter" in filters: filter_desc = f" in {filters['location_filter']}" # Start with count summary = f"Gefunden: {count} {entity_type}{filter_desc}" # Add details based on entity type if entity_type == "Parzelle": summary += "\n\nDetails:" for i, item in enumerate(items[:10], 1): # Limit to first 10 parts = [] # Add label or ID if item.get("label"): parts.append(f"Parzelle '{item['label']}'") elif item.get("id"): parts.append(f"Parzelle {item['id'][:8]}...") # Add address if item.get("strasseNr"): parts.append(item["strasseNr"]) # Add PLZ and municipality location_parts = [] if item.get("plz"): location_parts.append(item["plz"]) if item.get("kontextGemeinde"): location_parts.append(item["kontextGemeinde"]) if location_parts: parts.append(" ".join(location_parts)) # Add building zone if item.get("bauzone"): parts.append(f"Bauzone: {item['bauzone']}") summary += f"\n{i}. {', '.join(parts)}" if count > 10: summary += f"\n... und {count - 10} weitere" elif entity_type == "Projekt": summary += "\n\nDetails:" for i, item in enumerate(items[:10], 1): parts = [] # Add label if item.get("label"): parts.append(f"'{item['label']}'") # Add status if item.get("statusProzess"): parts.append(f"Status: {item['statusProzess']}") # Add parcel count parzellen = item.get("parzellen", []) if parzellen: parts.append(f"{len(parzellen)} Parzelle(n)") summary += f"\n{i}. {' - '.join(parts)}" if count > 10: summary += f"\n... und {count - 10} weitere" elif entity_type == "Gemeinde": summary += "\n\nDetails:" for i, item in enumerate(items[:10], 1): parts = [] if item.get("label"): parts.append(item["label"]) if item.get("plz"): parts.append(f"PLZ: {item['plz']}") if item.get("abk"): parts.append(f"Abk: {item['abk']}") summary += f"\n{i}. {', '.join(parts)}" if count > 10: summary += f"\n... und {count - 10} weitere" elif entity_type == "Dokument": summary += "\n\nDetails:" for i, item in enumerate(items[:10], 1): parts = [] if item.get("label"): parts.append(item["label"]) if item.get("dokumentTyp"): parts.append(f"Typ: {item['dokumentTyp']}") if item.get("quelle"): parts.append(f"Quelle: {item['quelle']}") summary += f"\n{i}. {', '.join(parts)}" if count > 10: summary += f"\n... und {count - 10} weitere" else: # Generic format for other entity types if count <= 5: summary += "\n\nDetails:" for i, item in enumerate(items, 1): label = item.get("label") or item.get("id", "") if label: summary += f"\n{i}. {label}" return summary async def processNaturalLanguageCommand( currentUser: User, mandateId: str, userInput: str, ) -> Dict[str, Any]: """ Process natural language user input and execute corresponding CRUD operations. Uses AI to analyze user intent and extract parameters, then executes the appropriate CRUD operation through the interface. Works stateless without session management. Args: currentUser: Current authenticated user mandateId: Mandate context (from RequestContext / X-Mandate-Id header) userInput: Natural language command from user Returns: Dictionary containing operation result and metadata Example user inputs: - "Erstelle ein neues Projekt namens 'Hauptstrasse 42'" - "Zeige mir alle Projekte in Zürich" - "Aktualisiere Projekt XYZ mit Status 'Planung'" - "Lösche Parzelle ABC" - "SELECT * FROM Projekt WHERE plz = '8000'" """ try: logger.info(f"Processing natural language command for user {currentUser.id} (mandate: {mandateId})") logger.debug(f"User input: {userInput}") # Initialize services for AI access services = getServices(currentUser, workflow=None, mandateId=mandateId) aiService = services.ai # Step 1: Analyze user intent with AI intentAnalysis = await analyzeUserIntent(aiService, userInput) logger.info(f"Intent analysis result: intent={intentAnalysis.get('intent')}, entity={intentAnalysis.get('entity')}") # Step 2: Execute CRUD operation based on intent result = await executeIntentBasedOperation( currentUser=currentUser, mandateId=mandateId, intent=intentAnalysis["intent"], entity=intentAnalysis.get("entity"), parameters=intentAnalysis.get("parameters", {}), ) # Build user-friendly response response = { "success": True, "intent": intentAnalysis["intent"], "entity": intentAnalysis.get("entity"), "result": result, } # Add human-readable summary for operations if intentAnalysis["intent"] == "CREATE" and isinstance(result, dict): # Add confirmation message for CREATE operations operation_result = result.get("result") if isinstance(operation_result, dict): entity_name = intentAnalysis.get('entity', 'Eintrag') label = operation_result.get("label", operation_result.get("id", "")) # Build detailed message msg_parts = [f"✅ {entity_name} '{label}' erfolgreich erstellt"] if entity_name == "Parzelle": if operation_result.get("plz"): msg_parts.append(f"PLZ: {operation_result['plz']}") if operation_result.get("kontextGemeinde"): msg_parts.append(f"Gemeinde: {operation_result['kontextGemeinde']}") if operation_result.get("bauzone"): msg_parts.append(f"Bauzone: {operation_result['bauzone']}") kontext_items = operation_result.get("kontextInformationen", []) if kontext_items: msg_parts.append(f"\n📋 {len(kontext_items)} Kontextinformationen gespeichert:") for kontext in kontext_items[:5]: # Show first 5 thema = kontext.get("thema", "") inhalt = kontext.get("inhalt", "") if thema and inhalt: msg_parts.append(f" • {thema}: {inhalt}") if len(kontext_items) > 5: msg_parts.append(f" • ... und {len(kontext_items) - 5} weitere") elif entity_name == "Projekt": if operation_result.get("statusProzess"): msg_parts.append(f"Status: {operation_result['statusProzess']}") parzellen = operation_result.get("parzellen", []) if parzellen: msg_parts.append(f"{len(parzellen)} Parzelle(n)") response["message"] = "\n".join(msg_parts) elif intentAnalysis["intent"] == "READ" and isinstance(result, dict): operation_result = result.get("result") if isinstance(operation_result, list): response["count"] = len(operation_result) entity_name = intentAnalysis.get('entity', 'Einträge') if len(operation_result) == 0: # Provide helpful message for empty results filter_info = intentAnalysis.get('parameters', {}) if filter_info: filter_desc = ", ".join([f"{k}={v}" for k, v in filter_info.items()]) response["message"] = f"Keine {entity_name} gefunden mit Filter: {filter_desc}. Möglicherweise sind noch keine Daten vorhanden oder der Filter ist zu spezifisch." else: response["message"] = f"Keine {entity_name} vorhanden. Erstellen Sie zuerst neue Einträge." else: # Create detailed summary based on entity type response["message"] = _formatEntitySummary( entity_name, operation_result, intentAnalysis.get('parameters', {}) ) elif isinstance(operation_result, dict): response["count"] = 1 # Format single entity entity_name = intentAnalysis.get('entity', 'Eintrag') response["message"] = _formatEntitySummary(entity_name, [operation_result], {}) return response except Exception as e: logger.error(f"Error processing natural language command: {str(e)}", exc_info=True) raise async def analyzeUserIntent( aiService, userInput: str ) -> Dict[str, Any]: """ Use AI to analyze user input and extract intent, entity, and parameters. Args: aiService: AI service instance userInput: Natural language user input Returns: Dictionary with 'intent', 'entity', and 'parameters' """ # Create a structured prompt for intent analysis with accurate field information intentPrompt = f""" Analyze the following user command and extract the intent, entity, and parameters. User Command: "{userInput}" Available intents: - CREATE: User wants to create a new entity - READ: User wants to read/query entities - UPDATE: User wants to update an existing entity - DELETE: User wants to delete an entity - QUERY: User wants to execute a database query (SQL statements) Available entities and their fields: **Projekt** (Real estate project): - id: string (primary key) - mandateId: string (mandate ID) - label: string (project designation/name) - statusProzess: string enum (Eingang, Analyse, Studie, Planung, Baurechtsverfahren, Umsetzung, Archiv) - perimeter: GeoPolylinie (geographic boundary, JSONB) - baulinie: GeoPolylinie (building line, JSONB) - parzellen: List[Parzelle] (plots belonging to project, JSONB) - dokumente: List[Dokument] (documents, JSONB) - kontextInformationen: List[Kontext] (context info, JSONB) **Parzelle** (Plot/parcel): - id: string (primary key) - mandateId: string (mandate ID) - label: string (plot designation) - strasseNr: string (street and house number) - plz: string (postal code) - kontextGemeinde: string (municipality ID, Foreign Key to Gemeinde table) - bauzone: string (building zone, e.g. W3, WG2) - az: float (Ausnützungsziffer) - bz: float (Bebauungsziffer) - vollgeschossZahl: int (number of allowed full floors) - gebaeudehoeheMax: float (maximum building height in meters) - laermschutzzone: string (noise protection zone) - hochwasserschutzzone: string (flood protection zone) - grundwasserschutzzone: string (groundwater protection zone) - parzelleBebaut: JaNein enum (is plot built) - parzelleErschlossen: JaNein enum (is plot developed) - parzelleHanglage: JaNein enum (is plot on slope) - kontextInformationen: List[Kontext] (metadata - each item has 'thema' and 'inhalt' fields only) **Kontext** (Context information for metadata): - thema: string (topic/subject, e.g. "EGRID", "Fläche", "Zentrum") - inhalt: string (content as text, e.g. "CH887199917793", "6514.99 m²", "X: 123, Y: 456") **Important relationships:** - Projekte contain Parzellen (projects have plots) - Parzelle links to Gemeinde (via kontextGemeinde) - Gemeinde links to Kanton (via id_kanton) - Kanton links to Land (via id_land) - Location queries (city, postal code) should use Parzelle.kontextGemeinde (municipality name will be resolved to ID) - Projekt does NOT have location fields directly - location is stored in associated Parzellen Return a JSON object with the following structure: {{ "intent": "CREATE|READ|UPDATE|DELETE|QUERY", "entity": "Projekt|Parzelle|Dokument|Kanton|Gemeinde|null", "parameters": {{ // Extracted parameters from user input // For CREATE/UPDATE: include all relevant fields using EXACT field names from above // For READ: include filter criteria using EXACT field names (id, label, plz, kontextGemeinde, etc.) // For DELETE: include entity ID if mentioned // For QUERY: include queryText if SQL is detected // IMPORTANT: Use only field names that exist in the entity definition above }}, "confidence": 0.0-1.0 // Confidence score for the analysis }} Examples: - Input: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'" Output: {{"intent": "CREATE", "entity": "Projekt", "parameters": {{"label": "Hauptstrasse 42"}}, "confidence": 0.95}} - Input: "Erstelle eine Parzelle mit Label 123, PLZ 8000, Gemeinde Zürich, Bauzone W3" Output: {{"intent": "CREATE", "entity": "Parzelle", "parameters": {{"label": "123", "plz": "8000", "kontextGemeinde": "Zürich", "bauzone": "W3"}}, "confidence": 0.95}} - Input: "Parzellen-Informationen: ID:AA1704, Nummer:AA1704, EGRID:CH887199917793, Kanton:ZH, Gemeinde:Zürich, Gemeinde-Code:261, Fläche:6514.99 m², Zentrum:2682951.44,1247622.91" Output: {{ "intent": "CREATE", "entity": "Parzelle", "parameters": {{ "label": "AA1704", "parzellenAliasTags": ["AA1704"], "kontextGemeinde": "Zürich", "kontextInformationen": [ {{"thema": "EGRID", "inhalt": "CH887199917793"}}, {{"thema": "Kanton", "inhalt": "ZH"}}, {{"thema": "BFS-Nummer", "inhalt": "261"}}, {{"thema": "Fläche", "inhalt": "6514.99 m²"}}, {{"thema": "Zentrum (LV95)", "inhalt": "X: 2682951.44 m, Y: 1247622.91 m (EPSG:2056)"}} ] }}, "confidence": 0.9 }} Note: Extract structured data from detailed input. Use kontextInformationen for metadata. Each item has 'thema' (topic) and 'inhalt' (content as text). - Input: "Zeige mir alle Projekte" Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{}}, "confidence": 0.9}} - Input: "Zeige mir Projekte in Zürich" or "Wie viele Projekte in Zürich" Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{"location_filter": "Zürich"}}, "confidence": 0.9}} Note: For project location queries, use Projekt entity with location_filter parameter - Input: "Zeige mir Parzellen mit PLZ 8000" Output: {{"intent": "READ", "entity": "Parzelle", "parameters": {{"plz": "8000"}}, "confidence": 0.95}} - Input: "Aktualisiere Projekt XYZ mit Status 'Planung'" Output: {{"intent": "UPDATE", "entity": "Projekt", "parameters": {{"id": "XYZ", "statusProzess": "Planung"}}, "confidence": 0.85}} - Input: "SELECT * FROM Projekt WHERE label = 'Test'" Output: {{"intent": "QUERY", "entity": null, "parameters": {{"queryText": "SELECT * FROM Projekt WHERE label = 'Test'", "queryType": "sql"}}, "confidence": 1.0}} - Input: "Lösche Parzelle ABC" Output: {{"intent": "DELETE", "entity": "Parzelle", "parameters": {{"id": "ABC"}}, "confidence": 0.9}} IMPORTANT EXTRACTION RULES: 1. For CREATE operations, extract ALL mentioned data fields from the user input 2. Use kontextInformationen array for metadata that doesn't have dedicated fields (EGRID, BFS numbers, area, coordinates, etc.) 3. Each kontextInformationen item MUST have exactly two fields: 'thema' (topic/subject) and 'inhalt' (content as text string) 4. Format kontextInformationen values as readable text strings, including units (e.g., "6514.99 m²", "X: 123, Y: 456") 5. Match field names EXACTLY to the entity definition above 6. Convert data types correctly (strings for text, numbers for numeric values) 7. Extract coordinates, areas, and other numeric values from text 8. When multiple values are mentioned for the same concept (ID, Nummer, Name), use the most relevant one for 'label' and put alternatives in parzellenAliasTags """ try: # Use AI planning call for structured JSON response response = await aiService.callAiPlanning( prompt=intentPrompt, debugType="intentanalysis" ) # Extract JSON from response (handles markdown code blocks) jsonStart = response.find('{') jsonEnd = response.rfind('}') + 1 if jsonStart == -1 or jsonEnd == 0: raise ValueError("No JSON found in AI response") jsonStr = response[jsonStart:jsonEnd] # Parse JSON response intentData = json.loads(jsonStr) # Validate response structure if "intent" not in intentData: raise ValueError("Invalid intent analysis response: missing 'intent' field") # Ensure parameters exists if "parameters" not in intentData: intentData["parameters"] = {} logger.debug(f"Parsed intent analysis: {intentData}") return intentData except json.JSONDecodeError as e: logger.error(f"Failed to parse AI intent analysis response: {e}") logger.error(f"Raw response: {response}") raise ValueError(f"AI returned invalid JSON: {str(e)}") except Exception as e: logger.error(f"Error analyzing user intent: {str(e)}", exc_info=True) raise async def executeIntentBasedOperation( currentUser: User, mandateId: str, intent: str, entity: Optional[str], parameters: Dict[str, Any], ) -> Dict[str, Any]: """ Execute CRUD operation based on analyzed intent. Args: currentUser: Current authenticated user mandateId: Mandate context (from RequestContext / X-Mandate-Id header) intent: Intent from AI analysis (CREATE, READ, UPDATE, DELETE, QUERY) entity: Entity type from AI analysis parameters: Extracted parameters from AI analysis Returns: Operation result Note: - Supports CREATE, READ, UPDATE, DELETE, QUERY intents - Entity types: Projekt, Parzelle, Gemeinde, Kanton, Land, Dokument """ try: logger.info(f"Executing intent-based operation: intent={intent}, entity={entity}") logger.debug(f"Parameters: {parameters}") if intent == "QUERY": # Execute database query directly (stateless) queryText = parameters.get("queryText", "") if not queryText: raise ValueError("QUERY intent requires queryText in parameters") result = await executeDirectQuery( currentUser=currentUser, mandateId=mandateId, queryText=queryText, parameters=parameters.get("queryParameters"), ) return result elif intent == "CREATE": # Create new entity realEstateInterface = getRealEstateInterface(currentUser, mandateId=mandateId) if entity == "Projekt": # Create Projekt from parameters projekt = Projekt( mandateId=mandateId, label=parameters.get("label", ""), statusProzess=StatusProzess(parameters.get("statusProzess", "EINGANG")) if parameters.get("statusProzess") else None, ) created = realEstateInterface.createProjekt(projekt) return { "operation": "CREATE", "entity": "Projekt", "result": created.model_dump() } elif entity == "Parzelle": # Create Parzelle from parameters # Import Kontext for kontextInformationen from modules.features.realestate.datamodelFeatureRealEstate import Kontext, GeoPolylinie # Build parzelle data with all extracted parameters parzelle_data = { "mandateId": mandateId, "label": parameters.get("label", ""), } # Add optional fields if present optional_fields = [ "parzellenAliasTags", "eigentuemerschaft", "strasseNr", "plz", "bauzone", "az", "bz", "vollgeschossZahl", "anrechenbarDachgeschoss", "anrechenbarUntergeschoss", "gebaeudehoeheMax", "kontextGemeinde", "regelnGrenzabstand", "regelnMehrlaengenzuschlag", "regelnMehrhoehenzuschlag", "parzelleBebaut", "parzelleErschlossen", "parzelleHanglage", "laermschutzzone", "hochwasserschutzzone", "grundwasserschutzzone" ] for field in optional_fields: if field in parameters and parameters[field] is not None: parzelle_data[field] = parameters[field] # Handle complex objects if "perimeter" in parameters and parameters["perimeter"]: parzelle_data["perimeter"] = GeoPolylinie(**parameters["perimeter"]) elif "kontextGemeinde" in parameters and parameters.get("kontextGemeinde"): # Try to fetch polygon from Swisstopo if gemeinde and parzellen_nr are available gemeinde = parameters.get("kontextGemeinde") parzellen_nr = parameters.get("label") or parameters.get("parzellen_nr") or parameters.get("parzellennummer") if gemeinde and parzellen_nr: logger.info(f"Attempting to fetch polygon from Swisstopo for {gemeinde} {parzellen_nr}") try: # Try to resolve gemeinde name if it's an ID gemeinde_name = gemeinde if len(gemeinde) == 36: # UUID format # Try to get gemeinde name from interface (realEstateInterface already initialized above) gemeinde_obj = realEstateInterface.getGemeinde(gemeinde) if gemeinde_obj: gemeinde_name = gemeinde_obj.label polygon_data = await fetch_parcel_polygon_from_swisstopo( gemeinde=gemeinde_name, parzellen_nr=str(parzellen_nr), sr=2056 ) if polygon_data: parzelle_data["perimeter"] = GeoPolylinie(**polygon_data) logger.info(f"Successfully fetched and set perimeter from Swisstopo") else: logger.warning(f"Could not fetch polygon from Swisstopo for {gemeinde_name} {parzellen_nr}") except Exception as e: logger.warning(f"Error fetching polygon from Swisstopo (continuing without): {e}") if "baulinie" in parameters and parameters["baulinie"]: parzelle_data["baulinie"] = GeoPolylinie(**parameters["baulinie"]) # Handle kontextInformationen (convert dicts to Kontext objects) if "kontextInformationen" in parameters and parameters["kontextInformationen"]: kontext_list = [] for kontext_data in parameters["kontextInformationen"]: if isinstance(kontext_data, dict): # Ensure only thema and inhalt are passed (Kontext model only has these fields) kontext_obj = Kontext( thema=kontext_data.get("thema", ""), inhalt=kontext_data.get("inhalt", "") ) kontext_list.append(kontext_obj) else: kontext_list.append(kontext_data) parzelle_data["kontextInformationen"] = kontext_list parzelle = Parzelle(**parzelle_data) created = realEstateInterface.createParzelle(parzelle) logger.info(f"Created Parzelle '{created.label}' with {len(created.kontextInformationen)} context items") return { "operation": "CREATE", "entity": "Parzelle", "result": created.model_dump() } elif entity == "Gemeinde": # Create Gemeinde from parameters from modules.features.realestate.datamodelFeatureRealEstate import Gemeinde gemeinde = Gemeinde( mandateId=mandateId, label=parameters.get("label", ""), id_kanton=parameters.get("id_kanton"), plz=parameters.get("plz"), ) created = realEstateInterface.createGemeinde(gemeinde) return { "operation": "CREATE", "entity": "Gemeinde", "result": created.model_dump() } elif entity == "Kanton": # Create Kanton from parameters from modules.features.realestate.datamodelFeatureRealEstate import Kanton kanton = Kanton( mandateId=mandateId, label=parameters.get("label", ""), id_land=parameters.get("id_land"), abk=parameters.get("abk"), ) created = realEstateInterface.createKanton(kanton) return { "operation": "CREATE", "entity": "Kanton", "result": created.model_dump() } elif entity == "Land": # Create Land from parameters from modules.features.realestate.datamodelFeatureRealEstate import Land land = Land( mandateId=mandateId, label=parameters.get("label", ""), abk=parameters.get("abk"), ) created = realEstateInterface.createLand(land) return { "operation": "CREATE", "entity": "Land", "result": created.model_dump() } elif entity == "Dokument": # Create Dokument from parameters from modules.features.realestate.datamodelFeatureRealEstate import Dokument dokument = Dokument( mandateId=mandateId, label=parameters.get("label", ""), dokumentReferenz=parameters.get("dokumentReferenz", ""), versionsbezeichnung=parameters.get("versionsbezeichnung"), dokumentTyp=parameters.get("dokumentTyp"), quelle=parameters.get("quelle"), mimeType=parameters.get("mimeType"), ) created = realEstateInterface.createDokument(dokument) return { "operation": "CREATE", "entity": "Dokument", "result": created.model_dump() } else: raise ValueError(f"CREATE operation not supported for entity: {entity}") elif intent == "READ": # Read entities realEstateInterface = getRealEstateInterface(currentUser) if entity == "Projekt": projektId = parameters.get("id") if projektId: # Get single Projekt by ID projekt = realEstateInterface.getProjekt(projektId) if not projekt: raise ValueError(f"Projekt {projektId} not found") return { "operation": "READ", "entity": "Projekt", "result": projekt.model_dump() } else: # List all Projekte (with optional filters) # Validate filter fields against Projekt model validProjektFields = {"id", "mandateId", "label", "statusProzess"} recordFilter = { k: v for k, v in parameters.items() if k != "id" and k in validProjektFields } # Handle location_filter specially (filter projects by parcel location) location_filter = parameters.get("location_filter") # Get all projects first projekte = realEstateInterface.getProjekte(recordFilter=recordFilter if recordFilter else None) # If location filter is present, filter by parcels in that location if location_filter: logger.info(f"Filtering projects by location: {location_filter}") # Try to resolve location to Gemeinde ID for UUID comparison location_id = None try: # Check if it's already a UUID import re uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE) if not uuid_pattern.match(location_filter): # Try to resolve name to ID gemeinde_records = realEstateInterface.getGemeinden(recordFilter={"label": location_filter}) if gemeinde_records: location_id = gemeinde_records[0].id logger.debug(f"Resolved location '{location_filter}' to ID '{location_id}'") except Exception as e: logger.debug(f"Could not resolve location filter: {e}") filtered_projekte = [] for projekt in projekte: # Check if any parcel in the project matches the location for parzelle in projekt.parzellen: # Check kontextGemeinde (both UUID and string), plz, or strasseNr for location match location_lower = location_filter.lower() matches = False # Check if kontextGemeinde matches (as UUID or string) if parzelle.kontextGemeinde: if (parzelle.kontextGemeinde == location_id or # UUID match parzelle.kontextGemeinde == location_filter or # Exact match location_lower in parzelle.kontextGemeinde.lower()): # Partial string match matches = True # Check PLZ or address if not matches and ( (parzelle.plz and location_lower in parzelle.plz) or (parzelle.strasseNr and location_lower in parzelle.strasseNr.lower()) ): matches = True if matches: filtered_projekte.append(projekt) break # Found a matching parcel, no need to check more projekte = filtered_projekte logger.info(f"Found {len(projekte)} projects in location '{location_filter}'") return { "operation": "READ", "entity": "Projekt", "result": [p.model_dump() for p in projekte], "count": len(projekte) } elif entity == "Parzelle": parzelleId = parameters.get("id") if parzelleId: # Get single Parzelle by ID parzelle = realEstateInterface.getParzelle(parzelleId) if not parzelle: raise ValueError(f"Parzelle {parzelleId} not found") return { "operation": "READ", "entity": "Parzelle", "result": parzelle.model_dump() } else: # List all Parzellen (with optional filters) # Validate filter fields against Parzelle model # Note: kontextKanton and kontextLand are NOT direct fields on Parzelle # Parzelle links to Gemeinde, Gemeinde links to Kanton, Kanton links to Land validParzelleFields = { "id", "mandateId", "label", "strasseNr", "plz", "kontextGemeinde", # Only direct link - Gemeinde → Kanton → Land "bauzone", "az", "bz", "vollgeschossZahl", "gebaeudehoeheMax", "laermschutzzone", "hochwasserschutzzone", "grundwasserschutzzone", "parzelleBebaut", "parzelleErschlossen", "parzelleHanglage" } recordFilter = { k: v for k, v in parameters.items() if k != "id" and k in validParzelleFields } # Warn about invalid fields invalidFields = {k: v for k, v in parameters.items() if k not in validParzelleFields and k != "id"} if invalidFields: logger.warning(f"Invalid filter fields for Parzelle ignored: {list(invalidFields.keys())}") parzellen = realEstateInterface.getParzellen(recordFilter=recordFilter if recordFilter else None) # Debug logging for empty results if not parzellen and recordFilter: logger.info(f"No Parzellen found matching filter: {recordFilter}") # Get total count to help debug all_parzellen = realEstateInterface.getParzellen(recordFilter=None) logger.info(f"Total Parzellen in database: {len(all_parzellen)}") if all_parzellen: # Show some sample kontextGemeinde values sample_gemeinden = set() for p in all_parzellen[:10]: if p.kontextGemeinde: sample_gemeinden.add(p.kontextGemeinde) logger.info(f"Sample kontextGemeinde values in database: {sample_gemeinden}") return { "operation": "READ", "entity": "Parzelle", "result": [p.model_dump() for p in parzellen], "count": len(parzellen) } elif entity == "Gemeinde": from modules.features.realestate.datamodelFeatureRealEstate import Gemeinde gemeindeId = parameters.get("id") if gemeindeId: gemeinde = realEstateInterface.getGemeinde(gemeindeId) if not gemeinde: raise ValueError(f"Gemeinde {gemeindeId} not found") return { "operation": "READ", "entity": "Gemeinde", "result": gemeinde.model_dump() } else: recordFilter = {k: v for k, v in parameters.items() if k != "id"} gemeinden = realEstateInterface.getGemeinden(recordFilter=recordFilter if recordFilter else None) return { "operation": "READ", "entity": "Gemeinde", "result": [g.model_dump() for g in gemeinden], "count": len(gemeinden) } elif entity == "Kanton": from modules.features.realestate.datamodelFeatureRealEstate import Kanton kantonId = parameters.get("id") if kantonId: kanton = realEstateInterface.getKanton(kantonId) if not kanton: raise ValueError(f"Kanton {kantonId} not found") return { "operation": "READ", "entity": "Kanton", "result": kanton.model_dump() } else: recordFilter = {k: v for k, v in parameters.items() if k != "id"} kantone = realEstateInterface.getKantone(recordFilter=recordFilter if recordFilter else None) return { "operation": "READ", "entity": "Kanton", "result": [k.model_dump() for k in kantone], "count": len(kantone) } elif entity == "Land": from modules.features.realestate.datamodelFeatureRealEstate import Land landId = parameters.get("id") if landId: land = realEstateInterface.getLand(landId) if not land: raise ValueError(f"Land {landId} not found") return { "operation": "READ", "entity": "Land", "result": land.model_dump() } else: recordFilter = {k: v for k, v in parameters.items() if k != "id"} laender = realEstateInterface.getLaender(recordFilter=recordFilter if recordFilter else None) return { "operation": "READ", "entity": "Land", "result": [l.model_dump() for l in laender], "count": len(laender) } elif entity == "Dokument": from modules.features.realestate.datamodelFeatureRealEstate import Dokument dokumentId = parameters.get("id") if dokumentId: dokument = realEstateInterface.getDokument(dokumentId) if not dokument: raise ValueError(f"Dokument {dokumentId} not found") return { "operation": "READ", "entity": "Dokument", "result": dokument.model_dump() } else: recordFilter = {k: v for k, v in parameters.items() if k != "id"} dokumente = realEstateInterface.getDokumente(recordFilter=recordFilter if recordFilter else None) return { "operation": "READ", "entity": "Dokument", "result": [d.model_dump() for d in dokumente], "count": len(dokumente) } else: raise ValueError(f"READ operation not supported for entity: {entity}") elif intent == "UPDATE": # Update existing entity realEstateInterface = getRealEstateInterface(currentUser) if entity == "Projekt": projektId = parameters.get("id") if not projektId: raise ValueError("UPDATE operation requires entity ID") # Get existing projekt projekt = realEstateInterface.getProjekt(projektId) if not projekt: raise ValueError(f"Projekt {projektId} not found") # Update fields updateData = {k: v for k, v in parameters.items() if k != "id"} updated = realEstateInterface.updateProjekt(projektId, updateData) return { "operation": "UPDATE", "entity": "Projekt", "result": updated.model_dump() } elif entity == "Parzelle": parzelleId = parameters.get("id") if not parzelleId: raise ValueError("UPDATE operation requires entity ID") # Get existing parzelle parzelle = realEstateInterface.getParzelle(parzelleId) if not parzelle: raise ValueError(f"Parzelle {parzelleId} not found") # Update fields updateData = {k: v for k, v in parameters.items() if k != "id"} updated = realEstateInterface.updateParzelle(parzelleId, updateData) return { "operation": "UPDATE", "entity": "Parzelle", "result": updated.model_dump() } elif entity == "Gemeinde": from modules.features.realestate.datamodelFeatureRealEstate import Gemeinde gemeindeId = parameters.get("id") if not gemeindeId: raise ValueError("UPDATE operation requires entity ID") gemeinde = realEstateInterface.getGemeinde(gemeindeId) if not gemeinde: raise ValueError(f"Gemeinde {gemeindeId} not found") updateData = {k: v for k, v in parameters.items() if k != "id"} updated = realEstateInterface.updateGemeinde(gemeindeId, updateData) return { "operation": "UPDATE", "entity": "Gemeinde", "result": updated.model_dump() } elif entity == "Kanton": from modules.features.realestate.datamodelFeatureRealEstate import Kanton kantonId = parameters.get("id") if not kantonId: raise ValueError("UPDATE operation requires entity ID") kanton = realEstateInterface.getKanton(kantonId) if not kanton: raise ValueError(f"Kanton {kantonId} not found") updateData = {k: v for k, v in parameters.items() if k != "id"} updated = realEstateInterface.updateKanton(kantonId, updateData) return { "operation": "UPDATE", "entity": "Kanton", "result": updated.model_dump() } elif entity == "Land": from modules.features.realestate.datamodelFeatureRealEstate import Land landId = parameters.get("id") if not landId: raise ValueError("UPDATE operation requires entity ID") land = realEstateInterface.getLand(landId) if not land: raise ValueError(f"Land {landId} not found") updateData = {k: v for k, v in parameters.items() if k != "id"} updated = realEstateInterface.updateLand(landId, updateData) return { "operation": "UPDATE", "entity": "Land", "result": updated.model_dump() } elif entity == "Dokument": from modules.features.realestate.datamodelFeatureRealEstate import Dokument dokumentId = parameters.get("id") if not dokumentId: raise ValueError("UPDATE operation requires entity ID") dokument = realEstateInterface.getDokument(dokumentId) if not dokument: raise ValueError(f"Dokument {dokumentId} not found") updateData = {k: v for k, v in parameters.items() if k != "id"} updated = realEstateInterface.updateDokument(dokumentId, updateData) return { "operation": "UPDATE", "entity": "Dokument", "result": updated.model_dump() } else: raise ValueError(f"UPDATE operation not supported for entity: {entity}") elif intent == "DELETE": # Delete entity realEstateInterface = getRealEstateInterface(currentUser) if entity == "Projekt": projektId = parameters.get("id") if not projektId: raise ValueError("DELETE operation requires entity ID") success = realEstateInterface.deleteProjekt(projektId) return { "operation": "DELETE", "entity": "Projekt", "success": success } elif entity == "Parzelle": parzelleId = parameters.get("id") if not parzelleId: raise ValueError("DELETE operation requires entity ID") success = realEstateInterface.deleteParzelle(parzelleId) return { "operation": "DELETE", "entity": "Parzelle", "success": success } elif entity == "Gemeinde": from modules.features.realestate.datamodelFeatureRealEstate import Gemeinde gemeindeId = parameters.get("id") if not gemeindeId: raise ValueError("DELETE operation requires entity ID") success = realEstateInterface.deleteGemeinde(gemeindeId) return { "operation": "DELETE", "entity": "Gemeinde", "success": success } elif entity == "Kanton": from modules.features.realestate.datamodelFeatureRealEstate import Kanton kantonId = parameters.get("id") if not kantonId: raise ValueError("DELETE operation requires entity ID") success = realEstateInterface.deleteKanton(kantonId) return { "operation": "DELETE", "entity": "Kanton", "success": success } elif entity == "Land": from modules.features.realestate.datamodelFeatureRealEstate import Land landId = parameters.get("id") if not landId: raise ValueError("DELETE operation requires entity ID") success = realEstateInterface.deleteLand(landId) return { "operation": "DELETE", "entity": "Land", "success": success } elif entity == "Dokument": from modules.features.realestate.datamodelFeatureRealEstate import Dokument dokumentId = parameters.get("id") if not dokumentId: raise ValueError("DELETE operation requires entity ID") success = realEstateInterface.deleteDokument(dokumentId) return { "operation": "DELETE", "entity": "Dokument", "success": success } else: raise ValueError(f"DELETE operation not supported for entity: {entity}") else: raise ValueError(f"Unknown intent: {intent}") except Exception as e: logger.error(f"Error executing intent-based operation: {str(e)}", exc_info=True) raise # ===== Project Creation with Parcel Data ===== async def create_project_with_parcel_data( currentUser: User, mandateId: str, projekt_label: str, parzellen_data: List[Dict[str, Any]], status_prozess: Optional[str] = None, ) -> Dict[str, Any]: """ Create a Projekt with one or more Parzellen from provided parcel data. Args: currentUser: Current authenticated user mandateId: Mandate context (from RequestContext / X-Mandate-Id header) projekt_label: Label for the Projekt parzellen_data: List of dictionaries containing parcel information from request status_prozess: Optional project status (defaults to "Eingang") Returns: Dictionary containing created Projekt and list of Parzellen Raises: HTTPException: If Gemeinde or Kanton not found, or validation fails """ try: logger.info(f"Creating project '{projekt_label}' with {len(parzellen_data)} parcel(s) for user {currentUser.id}") # Get interface with mandate context realEstateInterface = getRealEstateInterface(currentUser, mandateId=mandateId) # Validate required fields if not projekt_label: raise ValueError("Projekt label is required") if not parzellen_data or len(parzellen_data) == 0: raise ValueError("At least one Parzelle data is required") # Validate all parcels have required fields for idx, parzelle_data in enumerate(parzellen_data): if not parzelle_data.get("perimeter"): raise ValueError(f"Parzelle {idx + 1} perimeter is required") # Helper function to convert GeoJSON geometry to GeoPolylinie (defined early for use in geometry collection) def convert_geojson_to_geopolylinie(geometry_data: Dict[str, Any]) -> Optional[GeoPolylinie]: """Convert GeoJSON geometry to GeoPolylinie format.""" if not geometry_data: return None # Handle nested geometry structure (geometry.geometry.coordinates) if "geometry" in geometry_data: geometry_data = geometry_data["geometry"] geometry_type = geometry_data.get("type") coordinates = geometry_data.get("coordinates") if not coordinates or geometry_type != "Polygon": return None # Extract outer ring (first array of coordinates) if not coordinates or len(coordinates) == 0: return None ring = coordinates[0] # Outer ring # Convert coordinates to GeoPunkt list punkte = [] for coord in ring: if len(coord) >= 2: punkt = GeoPunkt( koordinatensystem="LV95", x=float(coord[0]), y=float(coord[1]), z=float(coord[2]) if len(coord) > 2 else None ) punkte.append(punkt) if not punkte: return None return GeoPolylinie( closed=True, punkte=punkte ) # First pass: Collect all parcel geometries for neighbor filtering # Convert all perimeters to GeoPolylinie format all_parcel_geometries = [] for parzelle_data in parzellen_data: perimeter = parzelle_data.get("perimeter") if perimeter: # Convert to GeoPolylinie if needed if isinstance(perimeter, dict): if "punkte" in perimeter and "closed" in perimeter: try: geo_perimeter = GeoPolylinie(**perimeter) all_parcel_geometries.append(geo_perimeter) except Exception as e: logger.warning(f"Error converting perimeter to GeoPolylinie: {e}") else: # Try GeoJSON conversion converted = convert_geojson_to_geopolylinie(perimeter) if converted: all_parcel_geometries.append(converted) elif isinstance(perimeter, GeoPolylinie): all_parcel_geometries.append(perimeter) # Process all parcels - create each one or use existing created_parzellen = [] parcel_perimeters = [] # Collect all parcel perimeters for baulinie calculation for idx, parzelle_data in enumerate(parzellen_data): logger.info(f"Processing Parzelle {idx + 1}/{len(parzellen_data)}") # Determine parcel label for uniqueness check parcel_label = parzelle_data.get("id") or parzelle_data.get("number") or parzelle_data.get("label") or "Unknown" # Check if Parzelle with this label already exists existing_parzellen = realEstateInterface.getParzellen( recordFilter={"label": parcel_label, "mandateId": mandateId} ) if existing_parzellen and len(existing_parzellen) > 0: # Parzelle already exists - use existing one existing_parzelle = existing_parzellen[0] logger.info(f"Parzelle with label '{parcel_label}' already exists (ID: {existing_parzelle.id}), reusing it") # Collect perimeter for baulinie calculation if existing_parzelle.perimeter: parcel_perimeters.append(existing_parzelle.perimeter) # Add to list of created parcels (actually existing) created_parzellen.append(existing_parzelle) continue # Skip creation, use existing # Parzelle does not exist - create new one logger.info(f"Parzelle with label '{parcel_label}' does not exist, creating new one") # Resolve Gemeinde and Kanton for this parcel (create if not found) gemeinde_id = None canton_abk = parzelle_data.get("canton") municipality_name = parzelle_data.get("municipality_name") logger.debug(f"Resolving Gemeinde/Kanton: canton='{canton_abk}', municipality='{municipality_name}'") if municipality_name and canton_abk: # Mapping of canton abbreviations to full names canton_names = { "ZH": "Zürich", "BE": "Bern", "LU": "Luzern", "UR": "Uri", "SZ": "Schwyz", "OW": "Obwalden", "NW": "Nidwalden", "GL": "Glarus", "ZG": "Zug", "FR": "Freiburg", "SO": "Solothurn", "BS": "Basel-Stadt", "BL": "Basel-Landschaft", "SH": "Schaffhausen", "AR": "Appenzell Ausserrhoden", "AI": "Appenzell Innerrhoden", "SG": "St. Gallen", "GR": "Graubünden", "AG": "Aargau", "TG": "Thurgau", "TI": "Tessin", "VD": "Waadt", "VS": "Wallis", "NE": "Neuenburg", "GE": "Genf", "JU": "Jura" } # First, ensure Land "Schweiz" exists logger.debug("Ensuring Land 'Schweiz' exists") laender = realEstateInterface.getLaender(recordFilter={"label": "Schweiz"}) if not laender: logger.info("Creating Land 'Schweiz'") land = Land( mandateId=mandateId, label="Schweiz", abk="CH" ) land = realEstateInterface.createLand(land) logger.info(f"Created Land 'Schweiz' with ID: {land.id}") else: land = laender[0] logger.debug(f"Found Land 'Schweiz' with ID: {land.id}") # Then, lookup or create Kanton logger.debug(f"Looking up Kanton with abk='{canton_abk}'") kantone = realEstateInterface.getKantone(recordFilter={"abk": canton_abk}) logger.debug(f"Found {len(kantone)} Kanton(e) with abk='{canton_abk}'") if not kantone: logger.info(f"Kanton '{canton_abk}' not found, creating it") kanton_label = canton_names.get(canton_abk, canton_abk) # Use mapping or fallback to abk kanton = Kanton( mandateId=mandateId, label=kanton_label, abk=canton_abk, id_land=land.id ) kanton = realEstateInterface.createKanton(kanton) logger.info(f"Created Kanton '{kanton_label}' ({canton_abk}) with ID: {kanton.id}") else: kanton = kantone[0] logger.debug(f"Found Kanton: ID={kanton.id}, Label={kanton.label}, abk={kanton.abk}") # Then, lookup or create Gemeinde logger.debug(f"Looking up Gemeinde with label='{municipality_name}' and id_kanton='{kanton.id}'") gemeinden = realEstateInterface.getGemeinden( recordFilter={"label": municipality_name, "id_kanton": kanton.id} ) logger.debug(f"Found {len(gemeinden)} Gemeinde(n) with label='{municipality_name}' and id_kanton='{kanton.id}'") if not gemeinden: logger.info(f"Gemeinde '{municipality_name}' in Kanton '{canton_abk}' not found, creating it") gemeinde = Gemeinde( mandateId=mandateId, label=municipality_name, id_kanton=kanton.id, plz=parzelle_data.get("plz") # Use PLZ directly from Swiss Topo API ) gemeinde = realEstateInterface.createGemeinde(gemeinde) logger.info(f"Created Gemeinde '{municipality_name}' with ID: {gemeinde.id}") else: gemeinde = gemeinden[0] logger.debug(f"Found Gemeinde: ID={gemeinde.id}, Label={gemeinde.label}") gemeinde_id = gemeinde.id logger.info(f"Resolved Gemeinde '{municipality_name}' to ID '{gemeinde_id}'") else: logger.warning(f"Missing Gemeinde/Kanton data: municipality_name={municipality_name}, canton={canton_abk}") # Build parzellenAliasTags alias_tags = [] if parzelle_data.get("egrid"): alias_tags.append(parzelle_data["egrid"]) if parzelle_data.get("number") and parzelle_data["number"] != parzelle_data.get("id"): alias_tags.append(parzelle_data["number"]) # Extract address information from Swiss Topo API data # Each parcel should have its own address data from Swiss Topo API # The address comes from the parcel search API response for THIS specific parcel strasse_nr = None plz = None # Use address from Swiss Topo API - this is specific to THIS parcel # The address field contains the full address string from Swiss Topo address = parzelle_data.get("address") if address: # Swiss Topo provides full address string like "Street Number, PLZ City" # Parse to extract street and number (before comma) parts = address.split(",") if len(parts) >= 1: strasse_nr = parts[0].strip() # PLZ is provided separately by Swiss Topo API plz = parzelle_data.get("plz") # Log address info for debugging logger.debug(f"Parzelle {idx + 1} address data: strasse_nr='{strasse_nr}', plz='{plz}', full_address='{address}'") # If no address found, log warning but continue if not strasse_nr and not plz: logger.warning(f"No address data found for Parzelle {idx + 1} (label: {parcel_label})") # Build kontextInformationen kontext_items = [] if parzelle_data.get("egrid"): kontext_items.append(Kontext( thema="EGRID", inhalt=parzelle_data["egrid"] )) if parzelle_data.get("identnd"): kontext_items.append(Kontext( thema="IdentND", inhalt=parzelle_data["identnd"] )) if parzelle_data.get("area_m2"): kontext_items.append(Kontext( thema="Fläche", inhalt=f"{parzelle_data['area_m2']} m²" )) if parzelle_data.get("centroid"): centroid = parzelle_data["centroid"] kontext_items.append(Kontext( thema="Zentrum (LV95)", inhalt=f"X: {centroid.get('x')} m, Y: {centroid.get('y')} m (EPSG:2056)" )) if parzelle_data.get("geoportal_url"): kontext_items.append(Kontext( thema="Geoportal URL", inhalt=parzelle_data["geoportal_url"] )) if parzelle_data.get("municipality_code"): kontext_items.append(Kontext( thema="BFS-Nummer", inhalt=str(parzelle_data["municipality_code"]) )) # Handle adjacent parcels - filter out selected parcels geometrically adjacent_parcel_refs = [] if parzelle_data.get("adjacent_parcels"): # Filter neighbors to exclude selected parcels neighbors_to_filter = [] for adj_parcel in parzelle_data["adjacent_parcels"]: if isinstance(adj_parcel, dict): neighbors_to_filter.append(adj_parcel) elif isinstance(adj_parcel, str): neighbors_to_filter.append({"id": adj_parcel}) # Filter using geometry comparison if we have geometries if all_parcel_geometries and neighbors_to_filter: try: filtered_neighbors = filter_neighbor_parcels( neighbors_to_filter, all_parcel_geometries ) # Extract IDs from filtered neighbors for filtered_neighbor in filtered_neighbors: adj_id = filtered_neighbor.get("id") if adj_id: adjacent_parcel_refs.append({"id": adj_id}) except Exception as e: logger.warning(f"Error filtering neighbor parcels: {e}, including all neighbors") # Fallback: include all neighbors if filtering fails for adj_parcel in parzelle_data["adjacent_parcels"]: if isinstance(adj_parcel, dict): adj_id = adj_parcel.get("id") if adj_id: adjacent_parcel_refs.append({"id": adj_id}) elif isinstance(adj_parcel, str): adjacent_parcel_refs.append({"id": adj_parcel}) else: # No geometries available - include all neighbors for adj_parcel in parzelle_data["adjacent_parcels"]: if isinstance(adj_parcel, dict): adj_id = adj_parcel.get("id") if adj_id: adjacent_parcel_refs.append({"id": adj_id}) elif isinstance(adj_parcel, str): adjacent_parcel_refs.append({"id": adj_parcel}) # Convert perimeter to GeoPolylinie if needed perimeter = parzelle_data.get("perimeter") if isinstance(perimeter, dict): # Check if it's already in GeoPolylinie format (has punkte and closed) if "punkte" in perimeter and "closed" in perimeter: try: perimeter = GeoPolylinie(**perimeter) except Exception as e: raise ValueError(f"Invalid perimeter format: {str(e)}") else: # Try to convert from GeoJSON format converted = convert_geojson_to_geopolylinie(perimeter) if converted: perimeter = converted else: raise ValueError("Invalid perimeter format: cannot convert to GeoPolylinie") elif isinstance(perimeter, GeoPolylinie): # Already a GeoPolylinie instance, use as-is pass else: raise ValueError("Invalid perimeter type: must be dict or GeoPolylinie") # Extract baulinie from geometry if provided baulinie = None geometry = parzelle_data.get("geometry") logger.debug(f"Geometry present: {geometry is not None}") if geometry: logger.debug(f"Geometry type: {type(geometry)}, keys: {list(geometry.keys()) if isinstance(geometry, dict) else 'not a dict'}") baulinie = convert_geojson_to_geopolylinie(geometry) if baulinie: logger.info(f"Extracted baulinie from geometry with {len(baulinie.punkte)} points") else: logger.warning("Failed to extract baulinie from geometry") else: logger.warning("No geometry found in parzelle_data") # Build Parzelle data parzelle_create_data = { "mandateId": mandateId, "label": parcel_label, # Use the label we determined earlier for uniqueness check "parzellenAliasTags": alias_tags, "eigentuemerschaft": None, "strasseNr": strasse_nr, "plz": plz, "perimeter": perimeter, "baulinie": baulinie, "kontextGemeinde": gemeinde_id, "bauzone": None, "az": None, "bz": None, "vollgeschossZahl": None, "anrechenbarDachgeschoss": None, "anrechenbarUntergeschoss": None, "gebaeudehoeheMax": None, "regelnGrenzabstand": [], "regelnMehrlaengenzuschlag": [], "regelnMehrhoehenzuschlag": [], "parzelleBebaut": None, "parzelleErschlossen": None, "parzelleHanglage": None, "laermschutzzone": None, "hochwasserschutzzone": None, "grundwasserschutzzone": None, "parzellenNachbarschaft": adjacent_parcel_refs, "dokumente": [], "kontextInformationen": kontext_items, } # Create Parzelle instance logger.debug(f"Creating Parzelle with label: {parzelle_create_data.get('label')}") logger.debug(f"Parzelle mandateId: {parzelle_create_data.get('mandateId')}") logger.debug(f"Parzelle perimeter present: {parzelle_create_data.get('perimeter') is not None}") try: parzelle_instance = Parzelle(**parzelle_create_data) logger.debug(f"Parzelle instance created successfully with ID: {parzelle_instance.id}") except Exception as e: logger.error(f"Error creating Parzelle instance: {str(e)}", exc_info=True) raise # Create Parzelle in database try: logger.info(f"Calling createParzelle for Parzelle '{parzelle_instance.label}' (ID: {parzelle_instance.id})") logger.debug(f"Parzelle instance before createParzelle: {parzelle_instance.model_dump(mode='json', exclude={'perimeter', 'baulinie', 'kontextInformationen'})}") # Use model_dump with mode='json' to ensure nested Pydantic models are serialized parzelle_dict = parzelle_instance.model_dump(mode='json') logger.debug(f"Parzelle dict keys: {list(parzelle_dict.keys())}") # Create Parzelle using the interface, which will handle serialization created_parzelle = realEstateInterface.createParzelle(parzelle_instance) logger.info(f"createParzelle returned: ID={created_parzelle.id if created_parzelle else 'None'}, Label={created_parzelle.label if created_parzelle else 'None'}") # Verify Parzelle was created successfully if not created_parzelle: raise ValueError("Failed to create Parzelle - createParzelle returned None") if not created_parzelle.id: raise ValueError("Failed to create Parzelle - no ID returned") logger.info(f"Parzelle created with ID: {created_parzelle.id}") # Verify Parzelle exists in database by fetching it logger.debug(f"Verifying Parzelle {created_parzelle.id} exists in database...") verify_parzelle = realEstateInterface.getParzelle(created_parzelle.id) if not verify_parzelle: logger.error(f"Parzelle {created_parzelle.id} was not found in database after creation") # Try to get all Parzellen to see what's in the database all_parzellen = realEstateInterface.getParzellen(recordFilter=None) logger.error(f"Total Parzellen in database: {len(all_parzellen)}") if all_parzellen: logger.error(f"Sample Parzelle IDs: {[p.id for p in all_parzellen[:5]]}") raise ValueError(f"Parzelle {created_parzelle.id} was not found in database after creation") logger.info(f"Verified Parzelle {created_parzelle.id} exists in database") # Use the verified Parzelle from database to ensure it has all fields created_parzelle = verify_parzelle # Collect perimeter for baulinie calculation if created_parzelle.perimeter: parcel_perimeters.append(created_parzelle.perimeter) # Add to list of created parcels created_parzellen.append(created_parzelle) except Exception as e: logger.error(f"Error creating Parzelle {idx + 1}: {str(e)}", exc_info=True) raise if not created_parzellen: raise ValueError("No Parzellen were successfully created") logger.info(f"Successfully created {len(created_parzellen)} Parzelle(n)") # Calculate combined baulinie from all parcel perimeters project_baulinie = None if len(parcel_perimeters) > 0: try: if len(parcel_perimeters) == 1: # Single parcel - use its perimeter as baulinie project_baulinie = parcel_perimeters[0] logger.info("Using single parcel perimeter as baulinie") else: # Multiple parcels - combine geometries to create outer outline logger.info(f"Combining {len(parcel_perimeters)} parcel geometries to create baulinie") project_baulinie = combine_parcel_geometries(parcel_perimeters) logger.info(f"Created combined baulinie with {len(project_baulinie.punkte)} points") except Exception as e: logger.error(f"Error combining parcel geometries for baulinie: {e}", exc_info=True) # Fallback: use first parcel's perimeter if parcel_perimeters: project_baulinie = parcel_perimeters[0] logger.warning("Using first parcel perimeter as fallback baulinie") # Convert status_prozess to enum status_prozess_enum = None if status_prozess: try: # Try to convert string to enum if isinstance(status_prozess, str): status_prozess_enum = StatusProzess(status_prozess) elif isinstance(status_prozess, StatusProzess): status_prozess_enum = status_prozess except (ValueError, KeyError): logger.warning(f"Invalid statusProzess '{status_prozess}', using default 'Eingang'") status_prozess_enum = StatusProzess.EINGANG else: status_prozess_enum = StatusProzess.EINGANG # Create Projekt with combined baulinie # Use the verified Parzelle instance (from database) to ensure it has all fields properly set logger.debug(f"Preparing Projekt creation with baulinie: {project_baulinie is not None}") if project_baulinie: logger.debug(f"Baulinie has {len(project_baulinie.punkte)} points") # Use first parcel's perimeter for project perimeter (or combine if needed) project_perimeter = created_parzellen[0].perimeter if created_parzellen else None projekt_create_data = { "mandateId": mandateId, "label": projekt_label, "statusProzess": status_prozess_enum, "perimeter": project_perimeter, # Use first parcel perimeter as project perimeter "baulinie": project_baulinie, # Set baulinie from first parcel geometry "parzellen": created_parzellen, # Link all created Parzelle instances "dokumente": [], "kontextInformationen": [], } logger.debug(f"Projekt data prepared: label={projekt_label}, parzellen_count={len(projekt_create_data['parzellen'])}, baulinie={'present' if project_baulinie else 'None'}") try: projekt_instance = Projekt(**projekt_create_data) logger.debug(f"Projekt instance created successfully with ID: {projekt_instance.id}") except Exception as e: logger.error(f"Error creating Projekt instance: {str(e)}", exc_info=True) raise # Log before creation for debugging logger.debug(f"Creating Projekt with {len(projekt_instance.parzellen)} Parzelle(n)") if projekt_instance.parzellen: for idx, p in enumerate(projekt_instance.parzellen): logger.debug(f" Parzelle {idx}: ID={p.id}, Label={p.label}") logger.debug(f"Projekt baulinie before save: {projekt_instance.baulinie is not None}") if projekt_instance.baulinie: logger.debug(f"Projekt baulinie has {len(projekt_instance.baulinie.punkte)} points") try: created_projekt = realEstateInterface.createProjekt(projekt_instance) logger.info(f"Created Projekt '{created_projekt.label}' (ID: {created_projekt.id})") logger.debug(f"Created Projekt baulinie: {created_projekt.baulinie is not None}") except Exception as e: logger.error(f"Error calling createProjekt: {str(e)}", exc_info=True) raise # Verify Projekt was created if not created_projekt or not created_projekt.id: raise ValueError("Failed to create Projekt - no ID returned") # Verify Parzelle is linked in the created Projekt if not created_projekt.parzellen or len(created_projekt.parzellen) == 0: logger.warning(f"Projekt {created_projekt.id} created but no Parzellen linked") # Try to fetch the Projekt from database to see if Parzellen are there verify_projekt = realEstateInterface.getProjekt(created_projekt.id) if verify_projekt and verify_projekt.parzellen: logger.info(f"Parzellen found when fetching Projekt from database: {len(verify_projekt.parzellen)}") created_projekt = verify_projekt else: raise ValueError(f"Projekt {created_projekt.id} has no Parzellen linked after creation") else: logger.info(f"Projekt {created_projekt.id} successfully linked to {len(created_projekt.parzellen)} Parzelle(n)") # Log Parzelle details for idx, p in enumerate(created_projekt.parzellen): logger.debug(f" Linked Parzelle {idx}: ID={p.id if hasattr(p, 'id') else 'NO ID'}, Label={p.label if hasattr(p, 'label') else 'NO LABEL'}") return { "projekt": created_projekt.model_dump(), "parzellen": [p.model_dump() for p in created_parzellen], } except HTTPException: raise except Exception as e: logger.error(f"Error creating project with parcel data: {str(e)}", exc_info=True) raise # ===== BZO Information Extraction for Parcels ===== async def extract_bzo_information( currentUser: User, gemeinde: str, bauzone: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, total_area_m2: Optional[float] = None, parcels: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: """ Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde. Retrieves BZO documents for the specified Gemeinde, extracts content using langgraph workflow, filters by Bauzone, and uses AI to find relevant information. When total_area_m2 or parcels are provided, runs Machbarkeitsstudie for structured output. Args: currentUser: Current authenticated user gemeinde: Gemeinde name (e.g., "Zürich") or ID bauzone: Bauzone code (e.g., "W3", "W2/30") mandateId: Optional mandate ID for instance-scoped data (defaults to currentUser.mandateId) featureInstanceId: Optional feature instance ID for instance-scoped data total_area_m2: Optional total parcel area (m²) for Machbarkeitsstudie parcels: Optional list of parcel dicts; total area computed via compute_selection_summary if not total_area_m2 Returns: Dictionary containing: - bauzone, gemeinde, extracted_content, ai_summary, relevant_rules, documents_processed - machbarkeitsstudie: Structured Machbarkeitsstudie output when total_area_m2/parcels provided """ try: _mandateId = mandateId or (str(currentUser.mandateId) if currentUser.mandateId else None) logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id}, mandate: {_mandateId})") # Get interfaces (instance-scoped when mandateId/featureInstanceId provided) realEstateInterface = getRealEstateInterface( currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId ) componentInterface = getComponentInterface( currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId ) # Get Gemeinde - try by ID first, then by label logger.debug(f"Attempting to retrieve Gemeinde '{gemeinde}' for mandate {_mandateId}") gemeinde_obj = realEstateInterface.getGemeinde(gemeinde) # If not found by ID, try searching by label if not gemeinde_obj: logger.debug(f"Gemeinde not found by ID, trying to search by label: {gemeinde}") record_filter = {"label": gemeinde} if _mandateId: record_filter["mandateId"] = _mandateId gemeinden_by_label = realEstateInterface.getGemeinden( recordFilter=record_filter ) if gemeinden_by_label and len(gemeinden_by_label) > 0: gemeinde_obj = gemeinden_by_label[0] logger.info(f"Found Gemeinde by label '{gemeinde}' with ID: {gemeinde_obj.id}") # If still not found: fetch only this Gemeinde from Swiss Topo and create it if not gemeinde_obj and _mandateId and featureInstanceId: logger.info(f"Gemeinde '{gemeinde}' not in DB - fetching from Swiss Topo (this Gemeinde only)") gemeinde_obj = await ensure_single_gemeinde( realEstateInterface, _mandateId, featureInstanceId, gemeinde_name=gemeinde ) if not gemeinde_obj: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Gemeinde '{gemeinde}' not found or not accessible" ) gemeinde_id = gemeinde_obj.id # Get BZO documents directly from Gemeinde's dokumente field bzo_documents = [] if gemeinde_obj.dokumente: for doc in gemeinde_obj.dokumente: # Handle both dict and object formats if isinstance(doc, dict): doc_id = doc.get("id") doc_typ = doc.get("dokumentTyp") else: doc_id = doc.id if hasattr(doc, "id") else None doc_typ = doc.dokumentTyp if hasattr(doc, "dokumentTyp") else None # Check if it's a BZO document type if doc_typ: # Handle enum, string, or dict formats if isinstance(doc_typ, DokumentTyp): is_bzo = doc_typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION] elif isinstance(doc_typ, str): is_bzo = doc_typ in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] else: doc_typ_str = str(doc_typ) is_bzo = doc_typ_str in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] if is_bzo: # Get full document object if doc_id: full_doc = realEstateInterface.getDokument(doc_id) if full_doc: bzo_documents.append(full_doc) else: logger.warning(f"Document {doc_id} referenced in Gemeinde but not found in database") # If no BZO documents: auto-fetch from Tavily, then retry if not bzo_documents and _mandateId and featureInstanceId: logger.info(f"No BZO documents for Gemeinde '{gemeinde_obj.label}' - fetching from web") fetched = await fetch_bzo_for_gemeinde( realEstateInterface, componentInterface, gemeinde_obj, _mandateId, featureInstanceId ) if fetched: # Reload Gemeinde to get updated dokumente gemeinde_obj = realEstateInterface.getGemeinde(gemeinde_obj.id) bzo_documents = [] if gemeinde_obj and gemeinde_obj.dokumente: for doc in gemeinde_obj.dokumente: if isinstance(doc, dict): doc_id = doc.get("id") doc_typ = doc.get("dokumentTyp") else: doc_id = doc.id if hasattr(doc, "id") else None doc_typ = doc.dokumentTyp if hasattr(doc, "dokumentTyp") else None if doc_typ: if isinstance(doc_typ, DokumentTyp): is_bzo = doc_typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION] elif isinstance(doc_typ, str): is_bzo = doc_typ in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] else: is_bzo = str(doc_typ) in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] if is_bzo and doc_id: full_doc = realEstateInterface.getDokument(doc_id) if full_doc: bzo_documents.append(full_doc) if not bzo_documents: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"No BZO documents found for Gemeinde '{gemeinde_obj.label}'" ) logger.info(f"Found {len(bzo_documents)} BZO document(s) for Gemeinde '{gemeinde_obj.label}'") # Initialize document retriever document_retriever = BZODocumentRetriever(realEstateInterface, componentInterface) # Extract content from all documents all_extracted_content = { "articles": [], "zones": [], "rules": [], "zone_parameter_tables": [], "errors": [], "warnings": [] } documents_processed = [] for dokument in bzo_documents: try: logger.info(f"Processing document {dokument.id}: {dokument.label}") # Retrieve PDF content pdf_bytes = document_retriever.retrieve_pdf_content(dokument) if not pdf_bytes: logger.warning(f"Could not retrieve PDF content for dokument {dokument.id}") all_extracted_content["warnings"].append( f"Could not retrieve PDF content for document '{dokument.label}'" ) continue # Run extraction using langgraph workflow extraction_result = run_extraction( pdf_bytes=pdf_bytes, pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}", dokument_id=dokument.id ) # Combine results all_extracted_content["articles"].extend(extraction_result.get("articles", [])) all_extracted_content["zones"].extend(extraction_result.get("zones", [])) all_extracted_content["rules"].extend(extraction_result.get("rules", [])) all_extracted_content["zone_parameter_tables"].extend(extraction_result.get("zone_parameter_tables", [])) all_extracted_content["errors"].extend(extraction_result.get("errors", [])) all_extracted_content["warnings"].extend(extraction_result.get("warnings", [])) documents_processed.append({ "id": dokument.id, "label": dokument.label, "dokumentTyp": dokument.dokumentTyp.value if dokument.dokumentTyp else None }) except Exception as e: logger.error(f"Error processing document {dokument.id}: {str(e)}", exc_info=True) all_extracted_content["errors"].append( f"Error processing document '{dokument.label}': {str(e)}" ) continue # Filter rules by Bauzone - only rules explicitly associated with this zone relevant_rules = filter_rules_by_bauzone( all_extracted_content["rules"], bauzone ) logger.info(f"Extracting for Bauzone {bauzone}: {len(relevant_rules)} zone-specific rules, " f"{len([t for t in all_extracted_content.get('zone_parameter_tables', []) if bauzone.upper() in str(t.get('zones', [])).upper()])} tables with zone data") # Filter zones by Bauzone relevant_zones = filter_zones_by_bauzone( all_extracted_content["zones"], bauzone ) # Filter articles that mention the Bauzone relevant_articles = filter_articles_by_bauzone( all_extracted_content.get("articles", []), bauzone ) # Compute total_area_m2 from parcels if not provided _total_area_m2 = total_area_m2 if _total_area_m2 is None and parcels: selection_summary = compute_selection_summary(parcels) _total_area_m2 = selection_summary.get("total_area_m2") or 0.0 # Extract BZO parameters for Wohnzone via LangGraph + LLM (bullet list with sources) bzo_params_result = None try: services = getServices( currentUser, workflow=None, mandateId=_mandateId, featureInstanceId=featureInstanceId ) ai_service = services.ai bzo_params_result = await run_bzo_params_extraction( extracted_content=all_extracted_content, bauzone=bauzone, ai_service=ai_service, gemeinde=gemeinde_obj.label, relevant_rules=relevant_rules, relevant_articles=relevant_articles, total_area_m2=_total_area_m2, ) except Exception as me: logger.warning(f"BZO parameter extraction failed: {me}", exc_info=True) all_extracted_content["warnings"] = all_extracted_content.get("warnings", []) + [ f"BZO-Parameter konnten nicht extrahiert werden: {str(me)}" ] # Use AI to generate summary and find additional information ai_summary = await generate_bauzone_ai_summary( currentUser=currentUser, bauzone=bauzone, gemeinde=gemeinde_obj.label, extracted_content=all_extracted_content, relevant_rules=relevant_rules, relevant_zones=relevant_zones, mandateId=_mandateId, featureInstanceId=featureInstanceId, ) # Build unified summary that includes zones and articles unified_summary = ai_summary # Append zone and article information to the summary if not already included # The AI should have integrated this, but we add it as backup if needed summary_lower = unified_summary.lower() # Check if zones are mentioned in summary zones_mentioned = any(zone.get("zone_code", "").upper() in summary_lower for zone in relevant_zones) if not zones_mentioned and relevant_zones: unified_summary += "\n\n=== ZONENDEFINITIONEN ===\n" for zone in relevant_zones: zone_code = zone.get("zone_code", "") zone_name = zone.get("zone_name", "") zone_category = zone.get("zone_category", "") geschosszahl = zone.get("geschosszahl") gewerbeerleichterung = zone.get("gewerbeerleichterung", False) page_num = zone.get("page", 0) source_article = zone.get("source_article", "") zone_info = f"{zone_code}: {zone_name}" if zone_category: zone_info += f"\nKategorie: {zone_category}" if geschosszahl: zone_info += f"\nGeschosszahl: {geschosszahl}" if gewerbeerleichterung: zone_info += "\nGewerbeerleichterung: Ja" if source_article: zone_info += f"\nQuelle: {source_article} (Seite {page_num})" unified_summary += zone_info + "\n\n" # Check if articles are mentioned in summary articles_mentioned = any(article.get("article_label", "") in summary_lower for article in relevant_articles) if not articles_mentioned and relevant_articles: unified_summary += "\n\n=== RELEVANTE ARTIKEL ===\n" for article in relevant_articles: article_label = article.get("article_label", "") article_title = article.get("article_title", "") article_text = article.get("text", "") page_start = article.get("page_start", 0) page_end = article.get("page_end", 0) page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}" unified_summary += f"{article_label}" if article_title: unified_summary += f": {article_title}" unified_summary += f" ({page_range})\n" # Include first 500 chars of article text if article_text: preview = article_text[:500] + "..." if len(article_text) > 500 else article_text unified_summary += f"{preview}\n\n" return { "bauzone": bauzone, "gemeinde": { "id": gemeinde_obj.id, "label": gemeinde_obj.label, "plz": gemeinde_obj.plz }, "extracted_content": { "zones": relevant_zones, "rules": relevant_rules, "articles": relevant_articles, "zone_parameter_tables": _filter_tables_by_bauzone( all_extracted_content.get("zone_parameter_tables", []), bauzone ), "total_zones": len(all_extracted_content.get("zones", [])), "total_rules": len(all_extracted_content.get("rules", [])), "total_articles": len(all_extracted_content.get("articles", [])), "total_tables": len(all_extracted_content.get("zone_parameter_tables", [])) }, "ai_summary": unified_summary, "relevant_rules": relevant_rules, "documents_processed": documents_processed, "errors": all_extracted_content.get("errors", []), "warnings": all_extracted_content.get("warnings", []), "machbarkeitsstudie": bzo_params_result, # Same key for frontend compatibility } except HTTPException: raise except Exception as e: logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error extracting BZO information: {str(e)}" ) def filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """ Filter rules by Bauzone code. Only keeps rules from SINGLE-zone articles to avoid wrong values (e.g. article with W2,W3,W5 has different values per zone - we cannot associate a rule value with a specific zone from article text alone). """ relevant_rules = [] bauzone_upper = bauzone.upper() def _zone_matches(z: str) -> bool: zu = (z or "").upper().strip() if not zu: return False if bauzone_upper in zu: return True if zu in bauzone_upper and len(zu) >= 2: return True return False for rule in rules: table_zones = rule.get("table_zones", []) or [] zone_raw = rule.get("zone_raw") # Rule must be zone-associated has_zone = bool(zone_raw) or bool(table_zones) if not has_zone: continue # CRITICAL: Only use rules from single-zone articles. Multi-zone articles # (e.g. table with W2,W3,W5) have different values per zone - we cannot # know which value applies to our zone from article text. if len(table_zones) > 1: # Check if ALL zones in article match our bauzone (e.g. W5, W5/50) - unlikely matches_all = all(_zone_matches(str(z)) for z in table_zones) if not matches_all: continue # Ambiguous: exclude # Zone must match our bauzone matches = False if zone_raw and _zone_matches(zone_raw): matches = True if not matches and table_zones: for tz in table_zones: if _zone_matches(str(tz)): matches = True break if not matches: ts = (rule.get("text_snippet") or "").upper() if bauzone_upper in ts and len(table_zones) <= 1: matches = True if matches: relevant_rules.append(rule) logger.info(f"Filtered {len(relevant_rules)} rules for Bauzone {bauzone} from {len(rules)} total (multi-zone articles excluded)") return relevant_rules def filter_zones_by_bauzone(zones: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """ Filter zones by Bauzone code. Args: zones: List of zone dictionaries from extraction bauzone: Bauzone code to filter by Returns: Filtered list of zones that match the Bauzone """ relevant_zones = [] bauzone_upper = bauzone.upper() for zone in zones: zone_code = zone.get("zone_code", "") if bauzone_upper in zone_code.upper(): relevant_zones.append(zone) logger.info(f"Filtered {len(relevant_zones)} zones for Bauzone {bauzone} from {len(zones)} total zones") return relevant_zones def filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """ Filter articles that mention the Bauzone. Args: articles: List of article dictionaries from extraction bauzone: Bauzone code to filter by Returns: Filtered list of articles that mention the Bauzone """ relevant_articles = [] bauzone_upper = bauzone.upper() for article in articles: text = article.get("text", "") zone_raw = article.get("zone_raw") # Check if article mentions the Bauzone text_matches = bauzone_upper in text.upper() if text else False zone_matches = bauzone_upper in zone_raw.upper() if zone_raw else False if text_matches or zone_matches: relevant_articles.append(article) logger.info(f"Filtered {len(relevant_articles)} articles for Bauzone {bauzone} from {len(articles)} total articles") return relevant_articles def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """ Filter zone-parameter tables to include only those containing the specified Bauzone. Args: tables: List of zone-parameter table dictionaries bauzone: Bauzone code to filter by Returns: Filtered list of tables containing the Bauzone """ relevant_tables = [] bauzone_upper = bauzone.upper() for table in tables: zones = table.get("zones", []) # Check if any zone in the table matches the Bauzone matching_zones = [z for z in zones if bauzone_upper in str(z).upper()] if matching_zones: # Create filtered version with only relevant zone columns filtered_table = { "page": table.get("page"), "zones": matching_zones, "parameters": [] } # Filter parameters to only include values for matching zones for param in table.get("parameters", []): values_by_zone = param.get("values_by_zone", {}) filtered_values = { zone: values_by_zone[zone] for zone in matching_zones if zone in values_by_zone } if filtered_values: filtered_table["parameters"].append({ "parameter": param.get("parameter"), "values_by_zone": filtered_values }) if filtered_table["parameters"]: relevant_tables.append(filtered_table) logger.info(f"Filtered {len(relevant_tables)} tables for Bauzone {bauzone} from {len(tables)} total tables") return relevant_tables async def generate_bauzone_ai_summary( currentUser: User, bauzone: str, gemeinde: str, extracted_content: Dict[str, Any], relevant_rules: List[Dict[str, Any]], relevant_zones: List[Dict[str, Any]], mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, ) -> str: """ Use AI to generate a summary of relevant information for a Bauzone. Args: currentUser: Current authenticated user bauzone: Bauzone code gemeinde: Gemeinde name extracted_content: All extracted content from PDFs relevant_rules: Rules filtered by Bauzone relevant_zones: Zones filtered by Bauzone Returns: AI-generated summary string """ try: # Initialize AI service (mandateId required for billing) services = getServices( currentUser, workflow=None, mandateId=mandateId, featureInstanceId=featureInstanceId ) aiService = services.ai # Build context from extracted content, prioritizing zone-parameter tables context_parts = [] # Extract and format zone-parameter table values for the specific Bauzone zone_parameter_tables = extracted_content.get("zone_parameter_tables", []) table_values_for_bauzone = [] if zone_parameter_tables: context_parts.append("=== BUILDING REGULATIONS TABLE VALUES FOR BAUZONE (INCLUDE THESE EXACT VALUES IN YOUR SUMMARY) ===") for table in zone_parameter_tables: page_num = table.get("page", 0) article_ref = table.get("article", "Unknown article") zones_in_table = table.get("zones", []) # Check if this table contains the requested Bauzone matching_zones = [z for z in zones_in_table if bauzone.upper() in str(z).upper()] if matching_zones: context_parts.append(f"\nTabelle aus {article_ref} (Seite {page_num}):") for param in table.get("parameters", []): param_name = param.get("parameter", "") values_by_zone = param.get("values_by_zone", {}) # Extract values for the requested Bauzone for zone, values in values_by_zone.items(): if bauzone.upper() in zone.upper(): if isinstance(values, list) and len(values) > 0: # Take the first value (most relevant) val_entry = values[0] value = val_entry.get("value", "") unit = val_entry.get("unit", "") unit_str = f" {unit}" if unit else "" # Format parameter name nicely formatted_param = param_name if "Ausnützungsziffer" in param_name or "ausnützungsziffer" in param_name.lower(): formatted_param = "Ausnützungsziffer max." elif "Vollgeschosse" in param_name or "vollgeschosse" in param_name.lower(): formatted_param = "Vollgeschosse max." elif "Gebäudelänge" in param_name or "gebäudelänge" in param_name.lower(): formatted_param = "Gebäudelänge max." elif ("Grenzabstand" in param_name or "grenzabstand" in param_name.lower()) and ("Grundabstand" in param_name or "grundabstand" in param_name.lower()): formatted_param = "Grenzabstand - Grundabstand min." elif ("Grenzabstand" in param_name or "grenzabstand" in param_name.lower()) and ("Mehrlängen" in param_name or "mehrlängen" in param_name.lower()): formatted_param = "Grenzabstand - Mehrlängen-zuschlag" elif ("Grenzabstand" in param_name or "grenzabstand" in param_name.lower()) and ("Höchstmass" in param_name or "höchstmass" in param_name.lower() or "Höchstmaß" in param_name): formatted_param = "Grenzabstand - Höchstmass max." elif "Fassadenhöhen" in param_name or "fassadenhöhen" in param_name.lower(): formatted_param = "Fassadenhöhen max." elif "Dachgeschosse" in param_name or "dachgeschosse" in param_name.lower(): formatted_param = "anrechenbare Dachgeschosse max." elif "Attikageschoss" in param_name or "attikageschoss" in param_name.lower(): formatted_param = "anrechenbares Attikageschoss max." elif "Untergeschoss" in param_name or "untergeschoss" in param_name.lower(): formatted_param = "anrechenbares Untergeschoss max." table_values_for_bauzone.append({ "parameter": formatted_param, "value": value, "unit": unit_str, "article": article_ref, "page": page_num }) context_parts.append(f" • {formatted_param}: {value}{unit_str} (Quelle: {article_ref}, Seite {page_num})") # Also check for multiple values (e.g., Fassadenhöhen with footnote values) if len(values) > 1: for idx, val_entry in enumerate(values[1:], 1): value_extra = val_entry.get("value", "") unit_extra = val_entry.get("unit", "") unit_str_extra = f" {unit_extra}" if unit_extra else "" context_parts.append(f" (Alternative: {value_extra}{unit_str_extra})") # Add zone information with all details if relevant_zones: context_parts.append("\n=== ZONE DEFINITIONS ===") for zone in relevant_zones: zone_code = zone.get("zone_code", "") zone_name = zone.get("zone_name", "") zone_category = zone.get("zone_category", "") geschosszahl = zone.get("geschosszahl") gewerbeerleichterung = zone.get("gewerbeerleichterung", False) page_num = zone.get("page", 0) source_article = zone.get("source_article", "") zone_info = f"- {zone_code}: {zone_name}" if zone_category: zone_info += f" (Kategorie: {zone_category})" if geschosszahl: zone_info += f", Geschosszahl: {geschosszahl}" if gewerbeerleichterung: zone_info += ", Gewerbeerleichterung: Ja" if source_article: zone_info += f" - Quelle: {source_article} (Seite {page_num})" context_parts.append(zone_info) # Add article information with full text previews relevant_articles = filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone) if relevant_articles: context_parts.append("\n=== RELEVANT ARTICLES (full content) ===") for article in relevant_articles: article_label = article.get("article_label", "") article_title = article.get("article_title", "") article_text = article.get("text", "") page_start = article.get("page_start", 0) page_end = article.get("page_end", 0) page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}" context_parts.append(f"\n{article_label}: {article_title or 'Kein Titel'}") context_parts.append(f"Lage: {page_range}") # Include full article text (truncated if too long) if len(article_text) > 1000: context_parts.append(f"Inhalt: {article_text[:1000]}...") else: context_parts.append(f"Inhalt: {article_text}") # Add relevant rules (only if not already covered in tables) if relevant_rules: # Filter out rules that are likely already in tables table_parameter_names = set() for table in zone_parameter_tables: for param in table.get("parameters", []): param_name = param.get("parameter", "").lower() table_parameter_names.add(param_name) unique_rules = [] for rule in relevant_rules[:15]: rule_type = rule.get("rule_type", "").lower() # Skip if this rule type is likely in tables if not any(tp in rule_type for tp in table_parameter_names): unique_rules.append(rule) if unique_rules: context_parts.append("\n=== ADDITIONAL BUILDING REGULATIONS (from text) ===") for rule in unique_rules[:8]: rule_type = rule.get("rule_type", "") value_numeric = rule.get("value_numeric") value_text = rule.get("value_text", "") unit = rule.get("unit", "") page_num = rule.get("page", 0) rule_desc = f"- {rule_type}: " if value_numeric is not None: rule_desc += f"{value_numeric}" if unit: rule_desc += f" {unit}" else: rule_desc += value_text rule_desc += f" (Seite {page_num})" context_parts.append(rule_desc) context = "\n".join(context_parts) # Create AI prompt with explicit instructions to include all table values prompt = f""" Analyze the following building zone (Bauzone) information extracted from BZO (Bau- und Zonenordnung) documents for {gemeinde}, specifically for Bauzone {bauzone}. Extracted Content: {context} CRITICAL INSTRUCTIONS: 1. You MUST include ALL actual values from the tables in your summary - do NOT just say "see tables on page X" 2. List ALL parameters with their actual values: Ausnützungsziffer, Vollgeschosse, Gebäudelänge, Grenzabstand (Grundabstand, Mehrlängen-zuschlag, Höchstmass), Fassadenhöhen, etc. 3. Integrate zone definitions and article information INTO the summary text - do NOT create separate sections 4. Always cite WHERE each piece of information was found (article number and page number) 5. Combine everything into ONE unified, flowing summary - no separate sections for zones/articles 6. Be comprehensive - include all relevant details from zones, articles, and tables 7. Format as a single, well-structured German text document Please provide a comprehensive, unified summary that includes: 1. General description of Bauzone {bauzone}: - Zone category (Wohnzonen, Zentrumszonen, etc.) - Geschosszahl (number of full storeys) - Gewerbeerleichterung status (Ja/Nein) - Where defined (article and page number) 2. ALL building regulations with ACTUAL VALUES from tables (you MUST include the exact values): - Ausnützungsziffer max.: [ACTUAL PERCENTAGE VALUE]% (from article, page) - Vollgeschosse max.: [ACTUAL NUMBER] (from article, page) - anrechenbare Dachgeschosse max.: [ACTUAL NUMBER] (from article, page) - anrechenbares Attikageschoss max.: [ACTUAL NUMBER] (from article, page) - anrechenbares Untergeschoss max.: [ACTUAL NUMBER] (from article, page) - Gebäudelänge max.: [ACTUAL VALUE] m (from article, page) - Grenzabstand - Grundabstand min.: [ACTUAL VALUE] m (from article, page) - Grenzabstand - Mehrlängen-zuschlag: [ACTUAL FRACTION] (from article, page) - Grenzabstand - Höchstmass max.: [ACTUAL VALUE] m (from article, page) - Fassadenhöhen max.: [ACTUAL VALUE] m (from article, page, include footnote values if present) 3. Zone definitions: Integrate information about where this zone is defined (which articles mention it, with page numbers) 4. Relevant articles: Integrate key content from relevant articles naturally into the summary, citing article numbers and page numbers 5. Special conditions: Any special requirements or exceptions mentioned in articles CRITICAL: You MUST include the actual numeric values from the tables in your summary. Do NOT say "see tables" - list the actual values. Format everything as ONE unified, flowing German text document without separate sections. Integrate zones and articles naturally into the narrative. """ # Call AI service logger.info(f"Generating AI summary for Bauzone {bauzone} in {gemeinde}") ai_response = await aiService.callAiPlanning( prompt=prompt, debugType="bzo_summary" ) return ai_response.strip() except Exception as e: logger.error(f"Error generating AI summary: {str(e)}", exc_info=True) # Return a basic summary if AI fails return f"Summary generation failed: {str(e)}. Found {len(relevant_rules)} relevant rules and {len(relevant_zones)} zones for Bauzone {bauzone}."