"""
Real Estate feature main logic.

Handles database operations with AI-powered natural language processing.
Stateless implementation without session management.
"""

import logging
import json
from typing import Optional, Dict, Any

from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelRealEstate import (
    Projekt,
    Parzelle,
    StatusProzess,
)
from modules.services import getInterface as getServices
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface

logger = logging.getLogger(__name__)


# Names of the Real Estate interface methods used for each supported entity.
# Keeping them in one table replaces the per-entity if/elif branches.
_ENTITY_METHODS: Dict[str, Dict[str, str]] = {
    "Projekt": {
        "create": "createProjekt",
        "get": "getProjekt",
        "list": "getProjekte",
        "update": "updateProjekt",
        "delete": "deleteProjekt",
    },
    "Parzelle": {
        "create": "createParzelle",
        "get": "getParzelle",
        "list": "getParzellen",
        "update": "updateParzelle",
        "delete": "deleteParzelle",
    },
    "Gemeinde": {
        "create": "createGemeinde",
        "get": "getGemeinde",
        "list": "getGemeinden",
        "update": "updateGemeinde",
        "delete": "deleteGemeinde",
    },
    "Kanton": {
        "create": "createKanton",
        "get": "getKanton",
        "list": "getKantone",
        "update": "updateKanton",
        "delete": "deleteKanton",
    },
    "Land": {
        "create": "createLand",
        "get": "getLand",
        "list": "getLaender",
        "update": "updateLand",
        "delete": "deleteLand",
    },
    "Dokument": {
        "create": "createDokument",
        "get": "getDokument",
        "list": "getDokumente",
        "update": "updateDokument",
        "delete": "deleteDokument",
    },
}

# Fields accepted as list filters. Entities without an entry here accept every
# parameter except "id" as a filter.
# Parzelle links to Gemeinde only; Kanton and Land are reached through
# Gemeinde, so they are not direct Parzelle filter fields.
_FILTERABLE_FIELDS: Dict[str, frozenset] = {
    "Projekt": frozenset({"id", "mandateId", "label", "statusProzess"}),
    "Parzelle": frozenset({
        "id",
        "mandateId",
        "label",
        "strasseNr",
        "plz",
        "kontextGemeinde",
        "bauzone",
        "az",
        "bz",
        "vollgeschossZahl",
        "gebaeudehoeheMax",
        "laermschutzzone",
        "hochwasserschutzzone",
        "grundwasserschutzzone",
        "parzelleBebaut",
        "parzelleErschlossen",
        "parzelleHanglage",
    }),
}


# ===== Direct query execution (stateless) =====

async def executeDirectQuery(
    currentUser: User,
    queryText: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Execute a database query directly without session management.

    Args:
        currentUser: Current authenticated user
        queryText: SQL query text
        parameters: Optional parameters for parameterized queries

    Returns:
        Dictionary with the keys "status", "rows", "columns", "rowCount"
        and "executionTime" (0 when the interface reports none).

    Raises:
        Exception: Any error raised by the interface is logged and re-raised.

    Note:
        - No session or query history is saved
        - The query is passed to the Real Estate interface's executeQuery
          as-is; this function does not validate or sanitize it
    """
    try:
        logger.info(f"Executing direct query for user {currentUser.id} (mandate: {currentUser.mandateId})")
        logger.debug(f"Query text: {queryText}")
        if parameters:
            logger.debug(f"Query parameters: {parameters}")

        # SECURITY(review): queryText is forwarded verbatim. Callers must make
        # sure it is trusted or validated before reaching this point.
        realEstateInterface = getRealEstateInterface(currentUser)
        result = realEstateInterface.executeQuery(queryText, parameters)

        # A missing or None execution time must not break the log formatting.
        executionTime = result.get("executionTime", 0)
        if executionTime is None:
            executionTime = 0

        logger.info(
            f"Query executed successfully: {result['rowCount']} rows in {executionTime:.3f}s"
        )

        return {
            "status": "success",
            "rows": result["rows"],
            "columns": result["columns"],
            "rowCount": result["rowCount"],
            "executionTime": executionTime,
        }

    except Exception as e:
        logger.error(f"Error executing query: {str(e)}", exc_info=True)
        raise


# ===== AI-based intent recognition and CRUD operations =====

async def processNaturalLanguageCommand(
    currentUser: User,
    userInput: str,
) -> Dict[str, Any]:
    """
    Process natural language user input and execute corresponding CRUD operations.

    Uses AI to analyze user intent and extract parameters, then executes the
    appropriate CRUD operation through the interface.
    Works stateless without session management.

    Args:
        currentUser: Current authenticated user
        userInput: Natural language command from user

    Returns:
        Dictionary with the keys "success", "intent", "entity" and "result"

    Raises:
        Exception: Any error from intent analysis or execution is logged
            and re-raised.

    Example user inputs:
        - "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
        - "Zeige mir alle Projekte in Zürich"
        - "Aktualisiere Projekt XYZ mit Status 'Planung'"
        - "Lösche Parzelle ABC"
        - "SELECT * FROM Projekt WHERE plz = '8000'"
    """
    try:
        logger.info(f"Processing natural language command for user {currentUser.id} (mandate: {currentUser.mandateId})")
        logger.debug(f"User input: {userInput}")

        # Initialize services for AI access
        services = getServices(currentUser, workflow=None)
        aiService = services.ai

        # Step 1: Analyze user intent with AI
        intentAnalysis = await analyzeUserIntent(aiService, userInput)

        logger.info(f"Intent analysis result: intent={intentAnalysis.get('intent')}, entity={intentAnalysis.get('entity')}")

        # Step 2: Execute CRUD operation based on intent
        result = await executeIntentBasedOperation(
            currentUser=currentUser,
            intent=intentAnalysis["intent"],
            entity=intentAnalysis.get("entity"),
            parameters=intentAnalysis.get("parameters", {}),
        )

        return {
            "success": True,
            "intent": intentAnalysis["intent"],
            "entity": intentAnalysis.get("entity"),
            "result": result,
        }

    except Exception as e:
        logger.error(f"Error processing natural language command: {str(e)}", exc_info=True)
        raise


def _buildIntentPrompt(userInput: str) -> str:
    """Return the intent-analysis prompt with the user command embedded."""
    # NOTE(review): the entity list in the prompt does not mention "Land",
    # although executeIntentBasedOperation supports it - confirm whether the
    # model should be allowed to return it.
    return f"""
Analyze the following user command and extract the intent, entity, and parameters.

User Command: "{userInput}"

Available intents:
- CREATE: User wants to create a new entity
- READ: User wants to read/query entities
- UPDATE: User wants to update an existing entity
- DELETE: User wants to delete an entity
- QUERY: User wants to execute a database query (SQL statements)

Available entities and their fields:

**Projekt** (Real estate project):
- id: string (primary key)
- mandateId: string (mandate ID)
- label: string (project designation/name)
- statusProzess: string enum (Eingang, Analyse, Studie, Planung, Baurechtsverfahren, Umsetzung, Archiv)
- perimeter: GeoPolylinie (geographic boundary, JSONB)
- baulinie: GeoPolylinie (building line, JSONB)
- parzellen: List[Parzelle] (plots belonging to project, JSONB)
- dokumente: List[Dokument] (documents, JSONB)
- kontextInformationen: List[Kontext] (context info, JSONB)

**Parzelle** (Plot/parcel):
- id: string (primary key)
- mandateId: string (mandate ID)
- label: string (plot designation)
- strasseNr: string (street and house number)
- plz: string (postal code)
- kontextGemeinde: string (municipality ID, Foreign Key to Gemeinde table)
- bauzone: string (building zone, e.g. W3, WG2)
- az: float (Ausnützungsziffer)
- bz: float (Bebauungsziffer)
- vollgeschossZahl: int (number of allowed full floors)
- gebaeudehoeheMax: float (maximum building height in meters)
- laermschutzzone: string (noise protection zone)
- hochwasserschutzzone: string (flood protection zone)
- grundwasserschutzzone: string (groundwater protection zone)
- parzelleBebaut: JaNein enum (is plot built)
- parzelleErschlossen: JaNein enum (is plot developed)
- parzelleHanglage: JaNein enum (is plot on slope)

**Important relationships:**
- Projekte contain Parzellen (projects have plots)
- Parzelle links to Gemeinde (via kontextGemeinde)
- Gemeinde links to Kanton (via id_kanton)
- Kanton links to Land (via id_land)
- Location queries (city, postal code) should use Parzelle.kontextGemeinde (municipality name will be resolved to ID)
- Projekt does NOT have location fields directly - location is stored in associated Parzellen

Return a JSON object with the following structure:
{{
    "intent": "CREATE|READ|UPDATE|DELETE|QUERY",
    "entity": "Projekt|Parzelle|Dokument|Kanton|Gemeinde|null",
    "parameters": {{
        // Extracted parameters from user input
        // For CREATE/UPDATE: include all relevant fields using EXACT field names from above
        // For READ: include filter criteria using EXACT field names (id, label, plz, kontextGemeinde, etc.)
        // For DELETE: include entity ID if mentioned
        // For QUERY: include queryText if SQL is detected
        // IMPORTANT: Use only field names that exist in the entity definition above
    }},
    "confidence": 0.0-1.0  // Confidence score for the analysis
}}

Examples:
- Input: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
  Output: {{"intent": "CREATE", "entity": "Projekt", "parameters": {{"label": "Hauptstrasse 42"}}, "confidence": 0.95}}

- Input: "Zeige mir alle Projekte"
  Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{}}, "confidence": 0.9}}

- Input: "Zeige mir Projekte in Zürich"
  Output: {{"intent": "READ", "entity": "Parzelle", "parameters": {{"kontextGemeinde": "Zürich"}}, "confidence": 0.9}}
  Note: Location queries should query Parzelle, not Projekt directly

- Input: "Zeige mir Parzellen mit PLZ 8000"
  Output: {{"intent": "READ", "entity": "Parzelle", "parameters": {{"plz": "8000"}}, "confidence": 0.95}}

- Input: "Aktualisiere Projekt XYZ mit Status 'Planung'"
  Output: {{"intent": "UPDATE", "entity": "Projekt", "parameters": {{"id": "XYZ", "statusProzess": "Planung"}}, "confidence": 0.85}}

- Input: "SELECT * FROM Projekt WHERE label = 'Test'"
  Output: {{"intent": "QUERY", "entity": null, "parameters": {{"queryText": "SELECT * FROM Projekt WHERE label = 'Test'", "queryType": "sql"}}, "confidence": 1.0}}

- Input: "Lösche Parzelle ABC"
  Output: {{"intent": "DELETE", "entity": "Parzelle", "parameters": {{"id": "ABC"}}, "confidence": 0.9}}
"""


async def analyzeUserIntent(
    aiService,
    userInput: str
) -> Dict[str, Any]:
    """
    Use AI to analyze user input and extract intent, entity, and parameters.

    Args:
        aiService: AI service instance; its awaitable callAiPlanning(prompt=...,
            debugType=...) is called and its result is searched for a JSON object
        userInput: Natural language user input

    Returns:
        The parsed JSON object from the AI response. It always contains
        "intent" and a dict under "parameters"; other keys (for example
        "entity", "confidence") are passed through as returned by the AI.

    Raises:
        ValueError: If the response contains no JSON object, the JSON is
            invalid, or the "intent" field is missing.
    """
    # Create a structured prompt for intent analysis with accurate field information
    intentPrompt = _buildIntentPrompt(userInput)

    try:
        # Use AI planning call for structured JSON response
        response = await aiService.callAiPlanning(
            prompt=intentPrompt,
            debugType="intentanalysis"
        )

        # Extract the outermost {...} span; this also strips markdown code
        # fences or explanatory text the model may wrap around the JSON.
        jsonStart = response.find('{')
        jsonEnd = response.rfind('}') + 1

        if jsonStart == -1 or jsonEnd == 0:
            raise ValueError("No JSON found in AI response")

        jsonStr = response[jsonStart:jsonEnd]

        # Parse JSON response
        intentData = json.loads(jsonStr)

        # Validate response structure
        if "intent" not in intentData:
            raise ValueError("Invalid intent analysis response: missing 'intent' field")

        # Callers use dict methods on "parameters"; a missing key, JSON null or
        # any other non-object value is therefore replaced with an empty dict.
        if not isinstance(intentData.get("parameters"), dict):
            intentData["parameters"] = {}

        logger.debug(f"Parsed intent analysis: {intentData}")

        return intentData

    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse AI intent analysis response: {e}")
        logger.error(f"Raw response: {response}")
        raise ValueError(f"AI returned invalid JSON: {str(e)}") from e
    except Exception as e:
        logger.error(f"Error analyzing user intent: {str(e)}", exc_info=True)
        raise


def _lookupEntityMethods(operation: str, entity: Optional[str]) -> Dict[str, str]:
    """Return the interface method names for entity or raise ValueError if unsupported."""
    # The isinstance guard keeps unhashable AI output (e.g. a list) from
    # raising TypeError in the dict lookup.
    methods = _ENTITY_METHODS.get(entity) if isinstance(entity, str) else None
    if methods is None:
        raise ValueError(f"{operation} operation not supported for entity: {entity}")
    return methods


def _buildModel(entity: str, currentUser: User, parameters: Dict[str, Any]) -> Any:
    """Construct the datamodel instance for a CREATE request from the AI parameters."""
    mandateId = currentUser.mandateId
    label = parameters.get("label", "")

    if entity == "Projekt":
        statusValue = parameters.get("statusProzess")
        return Projekt(
            mandateId=mandateId,
            label=label,
            statusProzess=StatusProzess(statusValue) if statusValue else None,
        )
    if entity == "Parzelle":
        return Parzelle(
            mandateId=mandateId,
            label=label,
            strasseNr=parameters.get("strasseNr"),
            plz=parameters.get("plz"),
            bauzone=parameters.get("bauzone"),
            kontextGemeinde=parameters.get("kontextGemeinde"),
        )
    if entity == "Gemeinde":
        from modules.datamodels.datamodelRealEstate import Gemeinde
        return Gemeinde(
            mandateId=mandateId,
            label=label,
            id_kanton=parameters.get("id_kanton"),
            plz=parameters.get("plz"),
        )
    if entity == "Kanton":
        from modules.datamodels.datamodelRealEstate import Kanton
        return Kanton(
            mandateId=mandateId,
            label=label,
            id_land=parameters.get("id_land"),
            abk=parameters.get("abk"),
        )
    if entity == "Land":
        from modules.datamodels.datamodelRealEstate import Land
        return Land(
            mandateId=mandateId,
            label=label,
            abk=parameters.get("abk"),
        )
    if entity == "Dokument":
        from modules.datamodels.datamodelRealEstate import Dokument
        return Dokument(
            mandateId=mandateId,
            label=label,
            dokumentReferenz=parameters.get("dokumentReferenz", ""),
            versionsbezeichnung=parameters.get("versionsbezeichnung"),
            dokumentTyp=parameters.get("dokumentTyp"),
            quelle=parameters.get("quelle"),
            mimeType=parameters.get("mimeType"),
        )
    raise ValueError(f"CREATE operation not supported for entity: {entity}")


def _buildRecordFilter(entity: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Return the list filter for entity, dropping "id" and any fields not allowed for it."""
    validFields = _FILTERABLE_FIELDS.get(entity)
    if validFields is None:
        return {k: v for k, v in parameters.items() if k != "id"}

    ignoredFields = [k for k in parameters if k != "id" and k not in validFields]
    if ignoredFields:
        logger.warning(f"Invalid filter fields for {entity} ignored: {ignoredFields}")
        if entity == "Projekt":
            logger.info("Note: Location queries should use Parzelle entity, not Projekt")

    return {k: v for k, v in parameters.items() if k != "id" and k in validFields}


def _createEntity(
    realEstateInterface: Any,
    currentUser: User,
    entity: Optional[str],
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    """Create one entity through the interface and return the CREATE result dict."""
    methods = _lookupEntityMethods("CREATE", entity)
    model = _buildModel(entity, currentUser, parameters)
    created = getattr(realEstateInterface, methods["create"])(model)
    return {
        "operation": "CREATE",
        "entity": entity,
        "result": created.model_dump(),
    }


def _readEntity(
    realEstateInterface: Any,
    currentUser: User,
    entity: Optional[str],
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    """Read one entity by id, or list entities matching the remaining parameters."""
    methods = _lookupEntityMethods("READ", entity)
    entityId = parameters.get("id")

    if entityId:
        # Get a single record by ID
        record = getattr(realEstateInterface, methods["get"])(entityId)
        if not record:
            raise ValueError(f"{entity} {entityId} not found")
        return {
            "operation": "READ",
            "entity": entity,
            "result": record.model_dump(),
        }

    # List records; an empty filter is passed as None
    recordFilter = _buildRecordFilter(entity, parameters)
    records = getattr(realEstateInterface, methods["list"])(recordFilter=recordFilter or None)
    return {
        "operation": "READ",
        "entity": entity,
        "result": [record.model_dump() for record in records],
    }


def _updateEntity(
    realEstateInterface: Any,
    currentUser: User,
    entity: Optional[str],
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    """Update an existing entity with every parameter except "id"."""
    methods = _lookupEntityMethods("UPDATE", entity)
    entityId = parameters.get("id")
    if not entityId:
        raise ValueError("UPDATE operation requires entity ID")

    # Verify the record exists before attempting the update
    existing = getattr(realEstateInterface, methods["get"])(entityId)
    if not existing:
        raise ValueError(f"{entity} {entityId} not found")

    updateData = {k: v for k, v in parameters.items() if k != "id"}
    updated = getattr(realEstateInterface, methods["update"])(entityId, updateData)
    return {
        "operation": "UPDATE",
        "entity": entity,
        "result": updated.model_dump(),
    }


def _deleteEntity(
    realEstateInterface: Any,
    currentUser: User,
    entity: Optional[str],
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    """Delete an entity by id and return the interface's success value."""
    methods = _lookupEntityMethods("DELETE", entity)
    entityId = parameters.get("id")
    if not entityId:
        raise ValueError("DELETE operation requires entity ID")

    success = getattr(realEstateInterface, methods["delete"])(entityId)
    return {
        "operation": "DELETE",
        "entity": entity,
        "success": success,
    }


# CRUD intent -> handler; every handler takes
# (realEstateInterface, currentUser, entity, parameters).
_CRUD_HANDLERS = {
    "CREATE": _createEntity,
    "READ": _readEntity,
    "UPDATE": _updateEntity,
    "DELETE": _deleteEntity,
}


async def executeIntentBasedOperation(
    currentUser: User,
    intent: str,
    entity: Optional[str],
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Execute CRUD operation based on analyzed intent.

    Args:
        currentUser: Current authenticated user
        intent: Intent from AI analysis (CREATE, READ, UPDATE, DELETE, QUERY);
            matched case-insensitively, surrounding whitespace is ignored
        entity: Entity type from AI analysis (Projekt, Parzelle, Gemeinde,
            Kanton, Land or Dokument); not used for QUERY
        parameters: Extracted parameters from AI analysis

    Returns:
        For QUERY the result of executeDirectQuery. For CREATE/READ/UPDATE a
        dict with "operation", "entity" and "result" (model_dump() output, or
        a list of them when listing). For DELETE a dict with "operation",
        "entity" and "success".

    Raises:
        ValueError: Unknown intent, unsupported entity, missing queryText,
            missing id for UPDATE/DELETE, or record not found.
        Exception: Any interface error is logged and re-raised.
    """
    try:
        logger.info(f"Executing intent-based operation: intent={intent}, entity={entity}")
        logger.debug(f"Parameters: {parameters}")

        # The model does not always honour the requested casing.
        normalizedIntent = intent.strip().upper() if isinstance(intent, str) else None

        if normalizedIntent == "QUERY":
            # Execute database query directly (stateless)
            queryText = parameters.get("queryText", "")
            if not queryText:
                raise ValueError("QUERY intent requires queryText in parameters")

            # SECURITY(review): queryText comes from user input / model output
            # and is executed without validation - confirm this is intended.
            return await executeDirectQuery(
                currentUser=currentUser,
                queryText=queryText,
                parameters=parameters.get("queryParameters"),
            )

        handler = _CRUD_HANDLERS.get(normalizedIntent)
        if handler is None:
            raise ValueError(f"Unknown intent: {intent}")

        realEstateInterface = getRealEstateInterface(currentUser)
        return handler(realEstateInterface, currentUser, entity, parameters)

    except Exception as e:
        logger.error(f"Error executing intent-based operation: {str(e)}", exc_info=True)
        raise