gateway/modules/features/realEstate/mainRealEstate.py

1212 lines
54 KiB
Python

"""
Real Estate feature main logic.
Handles database operations with AI-powered natural language processing.
Stateless implementation without session management.
"""
import logging
import json
from typing import Optional, Dict, Any, List
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelRealEstate import (
Projekt,
Parzelle,
StatusProzess,
GeoPolylinie,
)
from modules.services import getInterface as getServices
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
logger = logging.getLogger(__name__)
# ===== Swisstopo Integration =====
async def fetch_parcel_polygon_from_swisstopo(
gemeinde: str,
parzellen_nr: str,
sr: int = 2056
) -> Optional[Dict[str, Any]]:
"""
Holt die vollständige Polygon-Geometrie einer Parzelle von Swisstopo API.
Args:
gemeinde: Name der Gemeinde (z.B. "Bern")
parzellen_nr: Parzellennummer (z.B. "1234")
sr: Koordinatensystem (2056=LV95, 4326=WGS84)
Returns:
Dictionary mit GeoPolylinie-Format für perimeter-Feld, oder None wenn nicht gefunden
Format: {"closed": True, "punkte": [{"koordinatensystem": "LV95", "x": ..., "y": ..., "z": None}, ...]}
"""
try:
connector = SwissTopoMapServerConnector()
# Get GeoJSON feature from Swisstopo
feature = await connector.get_parcel_polygon(gemeinde, parzellen_nr, sr)
if not feature:
logger.warning(f"Parzelle {gemeinde} {parzellen_nr} nicht gefunden in Swisstopo")
return None
# Convert GeoJSON to GeoPolylinie format
geometry = feature.get("geometry", {})
if geometry.get("type") == "Polygon":
coordinates = geometry.get("coordinates", [])
if coordinates and len(coordinates) > 0:
ring = coordinates[0] # Outer ring
punkte = []
for coord in ring:
if len(coord) >= 2:
punkt = {
"koordinatensystem": "LV95" if sr == 2056 else "WGS84",
"x": coord[0], # GeoJSON: [x, y] = [easting, northing]
"y": coord[1],
"z": coord[2] if len(coord) > 2 else None
}
punkte.append(punkt)
logger.info(f"Successfully fetched polygon with {len(punkte)} points for {gemeinde} {parzellen_nr}")
return {
"closed": True,
"punkte": punkte
}
logger.warning(f"Unexpected geometry type in Swisstopo response: {geometry.get('type')}")
return None
except Exception as e:
logger.error(f"Error fetching parcel polygon from Swisstopo: {e}", exc_info=True)
return None
# ===== Direkte Query-Ausführung (stateless) =====
async def executeDirectQuery(
currentUser: User,
queryText: str,
parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Execute a database query directly without session management.
Args:
currentUser: Current authenticated user
queryText: SQL query text
parameters: Optional parameters for parameterized queries
Returns:
Dictionary containing query result (rows, columns, rowCount)
Note:
- No session or query history is saved
- Query is executed directly and result is returned
- For production, validate and sanitize queries before execution
- TODO: Implement actual database query execution via interface
"""
try:
logger.info(f"Executing direct query for user {currentUser.id} (mandate: {currentUser.mandateId})")
logger.debug(f"Query text: {queryText}")
if parameters:
logger.debug(f"Query parameters: {parameters}")
# Execute query via Real Estate interface (stateless)
realEstateInterface = getRealEstateInterface(currentUser)
result = realEstateInterface.executeQuery(queryText, parameters)
logger.info(
f"Query executed successfully: {result['rowCount']} rows in {result.get('executionTime', 0):.3f}s"
)
return {
"status": "success",
"rows": result["rows"],
"columns": result["columns"],
"rowCount": result["rowCount"],
"executionTime": result.get("executionTime", 0),
}
except Exception as e:
logger.error(f"Error executing query: {str(e)}", exc_info=True)
raise
# ===== AI-basierte Intent-Erkennung und CRUD-Operationen =====
def _formatEntitySummary(entity_type: str, items: List[Dict[str, Any]], filters: Dict[str, Any]) -> str:
"""
Format a human-readable summary of query results.
Args:
entity_type: Type of entity (Projekt, Parzelle, etc.)
items: List of entity data dictionaries
filters: Filter parameters used in the query
Returns:
Human-readable summary string
"""
if not items:
return f"Keine {entity_type} gefunden"
count = len(items)
filter_desc = ""
if filters:
# Format filter description
if "kontextGemeinde" in filters:
filter_desc = f" in {filters['kontextGemeinde']}"
elif "plz" in filters:
filter_desc = f" mit PLZ {filters['plz']}"
elif "location_filter" in filters:
filter_desc = f" in {filters['location_filter']}"
# Start with count
summary = f"Gefunden: {count} {entity_type}{filter_desc}"
# Add details based on entity type
if entity_type == "Parzelle":
summary += "\n\nDetails:"
for i, item in enumerate(items[:10], 1): # Limit to first 10
parts = []
# Add label or ID
if item.get("label"):
parts.append(f"Parzelle '{item['label']}'")
elif item.get("id"):
parts.append(f"Parzelle {item['id'][:8]}...")
# Add address
if item.get("strasseNr"):
parts.append(item["strasseNr"])
# Add PLZ and municipality
location_parts = []
if item.get("plz"):
location_parts.append(item["plz"])
if item.get("kontextGemeinde"):
location_parts.append(item["kontextGemeinde"])
if location_parts:
parts.append(" ".join(location_parts))
# Add building zone
if item.get("bauzone"):
parts.append(f"Bauzone: {item['bauzone']}")
summary += f"\n{i}. {', '.join(parts)}"
if count > 10:
summary += f"\n... und {count - 10} weitere"
elif entity_type == "Projekt":
summary += "\n\nDetails:"
for i, item in enumerate(items[:10], 1):
parts = []
# Add label
if item.get("label"):
parts.append(f"'{item['label']}'")
# Add status
if item.get("statusProzess"):
parts.append(f"Status: {item['statusProzess']}")
# Add parcel count
parzellen = item.get("parzellen", [])
if parzellen:
parts.append(f"{len(parzellen)} Parzelle(n)")
summary += f"\n{i}. {' - '.join(parts)}"
if count > 10:
summary += f"\n... und {count - 10} weitere"
elif entity_type == "Gemeinde":
summary += "\n\nDetails:"
for i, item in enumerate(items[:10], 1):
parts = []
if item.get("label"):
parts.append(item["label"])
if item.get("plz"):
parts.append(f"PLZ: {item['plz']}")
if item.get("abk"):
parts.append(f"Abk: {item['abk']}")
summary += f"\n{i}. {', '.join(parts)}"
if count > 10:
summary += f"\n... und {count - 10} weitere"
elif entity_type == "Dokument":
summary += "\n\nDetails:"
for i, item in enumerate(items[:10], 1):
parts = []
if item.get("label"):
parts.append(item["label"])
if item.get("dokumentTyp"):
parts.append(f"Typ: {item['dokumentTyp']}")
if item.get("quelle"):
parts.append(f"Quelle: {item['quelle']}")
summary += f"\n{i}. {', '.join(parts)}"
if count > 10:
summary += f"\n... und {count - 10} weitere"
else:
# Generic format for other entity types
if count <= 5:
summary += "\n\nDetails:"
for i, item in enumerate(items, 1):
label = item.get("label") or item.get("id", "")
if label:
summary += f"\n{i}. {label}"
return summary
async def processNaturalLanguageCommand(
currentUser: User,
userInput: str,
) -> Dict[str, Any]:
"""
Process natural language user input and execute corresponding CRUD operations.
Uses AI to analyze user intent and extract parameters, then executes the appropriate
CRUD operation through the interface. Works stateless without session management.
Args:
currentUser: Current authenticated user
userInput: Natural language command from user
Returns:
Dictionary containing operation result and metadata
Example user inputs:
- "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
- "Zeige mir alle Projekte in Zürich"
- "Aktualisiere Projekt XYZ mit Status 'Planung'"
- "Lösche Parzelle ABC"
- "SELECT * FROM Projekt WHERE plz = '8000'"
"""
try:
logger.info(f"Processing natural language command for user {currentUser.id} (mandate: {currentUser.mandateId})")
logger.debug(f"User input: {userInput}")
# Initialize services for AI access
services = getServices(currentUser, workflow=None)
aiService = services.ai
# Step 1: Analyze user intent with AI
intentAnalysis = await analyzeUserIntent(aiService, userInput)
logger.info(f"Intent analysis result: intent={intentAnalysis.get('intent')}, entity={intentAnalysis.get('entity')}")
# Step 2: Execute CRUD operation based on intent
result = await executeIntentBasedOperation(
currentUser=currentUser,
intent=intentAnalysis["intent"],
entity=intentAnalysis.get("entity"),
parameters=intentAnalysis.get("parameters", {}),
)
# Build user-friendly response
response = {
"success": True,
"intent": intentAnalysis["intent"],
"entity": intentAnalysis.get("entity"),
"result": result,
}
# Add human-readable summary for operations
if intentAnalysis["intent"] == "CREATE" and isinstance(result, dict):
# Add confirmation message for CREATE operations
operation_result = result.get("result")
if isinstance(operation_result, dict):
entity_name = intentAnalysis.get('entity', 'Eintrag')
label = operation_result.get("label", operation_result.get("id", ""))
# Build detailed message
msg_parts = [f"{entity_name} '{label}' erfolgreich erstellt"]
if entity_name == "Parzelle":
if operation_result.get("plz"):
msg_parts.append(f"PLZ: {operation_result['plz']}")
if operation_result.get("kontextGemeinde"):
msg_parts.append(f"Gemeinde: {operation_result['kontextGemeinde']}")
if operation_result.get("bauzone"):
msg_parts.append(f"Bauzone: {operation_result['bauzone']}")
kontext_items = operation_result.get("kontextInformationen", [])
if kontext_items:
msg_parts.append(f"\n📋 {len(kontext_items)} Kontextinformationen gespeichert:")
for kontext in kontext_items[:5]: # Show first 5
thema = kontext.get("thema", "")
inhalt = kontext.get("inhalt", "")
if thema and inhalt:
msg_parts.append(f"{thema}: {inhalt}")
if len(kontext_items) > 5:
msg_parts.append(f" • ... und {len(kontext_items) - 5} weitere")
elif entity_name == "Projekt":
if operation_result.get("statusProzess"):
msg_parts.append(f"Status: {operation_result['statusProzess']}")
parzellen = operation_result.get("parzellen", [])
if parzellen:
msg_parts.append(f"{len(parzellen)} Parzelle(n)")
response["message"] = "\n".join(msg_parts)
elif intentAnalysis["intent"] == "READ" and isinstance(result, dict):
operation_result = result.get("result")
if isinstance(operation_result, list):
response["count"] = len(operation_result)
entity_name = intentAnalysis.get('entity', 'Einträge')
if len(operation_result) == 0:
# Provide helpful message for empty results
filter_info = intentAnalysis.get('parameters', {})
if filter_info:
filter_desc = ", ".join([f"{k}={v}" for k, v in filter_info.items()])
response["message"] = f"Keine {entity_name} gefunden mit Filter: {filter_desc}. Möglicherweise sind noch keine Daten vorhanden oder der Filter ist zu spezifisch."
else:
response["message"] = f"Keine {entity_name} vorhanden. Erstellen Sie zuerst neue Einträge."
else:
# Create detailed summary based on entity type
response["message"] = _formatEntitySummary(
entity_name,
operation_result,
intentAnalysis.get('parameters', {})
)
elif isinstance(operation_result, dict):
response["count"] = 1
# Format single entity
entity_name = intentAnalysis.get('entity', 'Eintrag')
response["message"] = _formatEntitySummary(entity_name, [operation_result], {})
return response
except Exception as e:
logger.error(f"Error processing natural language command: {str(e)}", exc_info=True)
raise
async def analyzeUserIntent(
aiService,
userInput: str
) -> Dict[str, Any]:
"""
Use AI to analyze user input and extract intent, entity, and parameters.
Args:
aiService: AI service instance
userInput: Natural language user input
Returns:
Dictionary with 'intent', 'entity', and 'parameters'
"""
# Create a structured prompt for intent analysis with accurate field information
intentPrompt = f"""
Analyze the following user command and extract the intent, entity, and parameters.
User Command: "{userInput}"
Available intents:
- CREATE: User wants to create a new entity
- READ: User wants to read/query entities
- UPDATE: User wants to update an existing entity
- DELETE: User wants to delete an entity
- QUERY: User wants to execute a database query (SQL statements)
Available entities and their fields:
**Projekt** (Real estate project):
- id: string (primary key)
- mandateId: string (mandate ID)
- label: string (project designation/name)
- statusProzess: string enum (Eingang, Analyse, Studie, Planung, Baurechtsverfahren, Umsetzung, Archiv)
- perimeter: GeoPolylinie (geographic boundary, JSONB)
- baulinie: GeoPolylinie (building line, JSONB)
- parzellen: List[Parzelle] (plots belonging to project, JSONB)
- dokumente: List[Dokument] (documents, JSONB)
- kontextInformationen: List[Kontext] (context info, JSONB)
**Parzelle** (Plot/parcel):
- id: string (primary key)
- mandateId: string (mandate ID)
- label: string (plot designation)
- strasseNr: string (street and house number)
- plz: string (postal code)
- kontextGemeinde: string (municipality ID, Foreign Key to Gemeinde table)
- bauzone: string (building zone, e.g. W3, WG2)
- az: float (Ausnützungsziffer)
- bz: float (Bebauungsziffer)
- vollgeschossZahl: int (number of allowed full floors)
- gebaeudehoeheMax: float (maximum building height in meters)
- laermschutzzone: string (noise protection zone)
- hochwasserschutzzone: string (flood protection zone)
- grundwasserschutzzone: string (groundwater protection zone)
- parzelleBebaut: JaNein enum (is plot built)
- parzelleErschlossen: JaNein enum (is plot developed)
- parzelleHanglage: JaNein enum (is plot on slope)
- kontextInformationen: List[Kontext] (metadata - each item has 'thema' and 'inhalt' fields only)
**Kontext** (Context information for metadata):
- thema: string (topic/subject, e.g. "EGRID", "Fläche", "Zentrum")
- inhalt: string (content as text, e.g. "CH887199917793", "6514.99 m²", "X: 123, Y: 456")
**Important relationships:**
- Projekte contain Parzellen (projects have plots)
- Parzelle links to Gemeinde (via kontextGemeinde)
- Gemeinde links to Kanton (via id_kanton)
- Kanton links to Land (via id_land)
- Location queries (city, postal code) should use Parzelle.kontextGemeinde (municipality name will be resolved to ID)
- Projekt does NOT have location fields directly - location is stored in associated Parzellen
Return a JSON object with the following structure:
{{
"intent": "CREATE|READ|UPDATE|DELETE|QUERY",
"entity": "Projekt|Parzelle|Dokument|Kanton|Gemeinde|null",
"parameters": {{
// Extracted parameters from user input
// For CREATE/UPDATE: include all relevant fields using EXACT field names from above
// For READ: include filter criteria using EXACT field names (id, label, plz, kontextGemeinde, etc.)
// For DELETE: include entity ID if mentioned
// For QUERY: include queryText if SQL is detected
// IMPORTANT: Use only field names that exist in the entity definition above
}},
"confidence": 0.0-1.0 // Confidence score for the analysis
}}
Examples:
- Input: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
Output: {{"intent": "CREATE", "entity": "Projekt", "parameters": {{"label": "Hauptstrasse 42"}}, "confidence": 0.95}}
- Input: "Erstelle eine Parzelle mit Label 123, PLZ 8000, Gemeinde Zürich, Bauzone W3"
Output: {{"intent": "CREATE", "entity": "Parzelle", "parameters": {{"label": "123", "plz": "8000", "kontextGemeinde": "Zürich", "bauzone": "W3"}}, "confidence": 0.95}}
- Input: "Parzellen-Informationen: ID:AA1704, Nummer:AA1704, EGRID:CH887199917793, Kanton:ZH, Gemeinde:Zürich, Gemeinde-Code:261, Fläche:6514.99 m², Zentrum:2682951.44,1247622.91"
Output: {{
"intent": "CREATE",
"entity": "Parzelle",
"parameters": {{
"label": "AA1704",
"parzellenAliasTags": ["AA1704"],
"kontextGemeinde": "Zürich",
"kontextInformationen": [
{{"thema": "EGRID", "inhalt": "CH887199917793"}},
{{"thema": "Kanton", "inhalt": "ZH"}},
{{"thema": "BFS-Nummer", "inhalt": "261"}},
{{"thema": "Fläche", "inhalt": "6514.99 m²"}},
{{"thema": "Zentrum (LV95)", "inhalt": "X: 2682951.44 m, Y: 1247622.91 m (EPSG:2056)"}}
]
}},
"confidence": 0.9
}}
Note: Extract structured data from detailed input. Use kontextInformationen for metadata. Each item has 'thema' (topic) and 'inhalt' (content as text).
- Input: "Zeige mir alle Projekte"
Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{}}, "confidence": 0.9}}
- Input: "Zeige mir Projekte in Zürich" or "Wie viele Projekte in Zürich"
Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{"location_filter": "Zürich"}}, "confidence": 0.9}}
Note: For project location queries, use Projekt entity with location_filter parameter
- Input: "Zeige mir Parzellen mit PLZ 8000"
Output: {{"intent": "READ", "entity": "Parzelle", "parameters": {{"plz": "8000"}}, "confidence": 0.95}}
- Input: "Aktualisiere Projekt XYZ mit Status 'Planung'"
Output: {{"intent": "UPDATE", "entity": "Projekt", "parameters": {{"id": "XYZ", "statusProzess": "Planung"}}, "confidence": 0.85}}
- Input: "SELECT * FROM Projekt WHERE label = 'Test'"
Output: {{"intent": "QUERY", "entity": null, "parameters": {{"queryText": "SELECT * FROM Projekt WHERE label = 'Test'", "queryType": "sql"}}, "confidence": 1.0}}
- Input: "Lösche Parzelle ABC"
Output: {{"intent": "DELETE", "entity": "Parzelle", "parameters": {{"id": "ABC"}}, "confidence": 0.9}}
IMPORTANT EXTRACTION RULES:
1. For CREATE operations, extract ALL mentioned data fields from the user input
2. Use kontextInformationen array for metadata that doesn't have dedicated fields (EGRID, BFS numbers, area, coordinates, etc.)
3. Each kontextInformationen item MUST have exactly two fields: 'thema' (topic/subject) and 'inhalt' (content as text string)
4. Format kontextInformationen values as readable text strings, including units (e.g., "6514.99 m²", "X: 123, Y: 456")
5. Match field names EXACTLY to the entity definition above
6. Convert data types correctly (strings for text, numbers for numeric values)
7. Extract coordinates, areas, and other numeric values from text
8. When multiple values are mentioned for the same concept (ID, Nummer, Name), use the most relevant one for 'label' and put alternatives in parzellenAliasTags
"""
try:
# Use AI planning call for structured JSON response
response = await aiService.callAiPlanning(
prompt=intentPrompt,
debugType="intentanalysis"
)
# Extract JSON from response (handles markdown code blocks)
jsonStart = response.find('{')
jsonEnd = response.rfind('}') + 1
if jsonStart == -1 or jsonEnd == 0:
raise ValueError("No JSON found in AI response")
jsonStr = response[jsonStart:jsonEnd]
# Parse JSON response
intentData = json.loads(jsonStr)
# Validate response structure
if "intent" not in intentData:
raise ValueError("Invalid intent analysis response: missing 'intent' field")
# Ensure parameters exists
if "parameters" not in intentData:
intentData["parameters"] = {}
logger.debug(f"Parsed intent analysis: {intentData}")
return intentData
except json.JSONDecodeError as e:
logger.error(f"Failed to parse AI intent analysis response: {e}")
logger.error(f"Raw response: {response}")
raise ValueError(f"AI returned invalid JSON: {str(e)}")
except Exception as e:
logger.error(f"Error analyzing user intent: {str(e)}", exc_info=True)
raise
async def executeIntentBasedOperation(
currentUser: User,
intent: str,
entity: Optional[str],
parameters: Dict[str, Any],
) -> Dict[str, Any]:
"""
Execute CRUD operation based on analyzed intent.
Args:
currentUser: Current authenticated user
intent: Intent from AI analysis (CREATE, READ, UPDATE, DELETE, QUERY)
entity: Entity type from AI analysis
parameters: Extracted parameters from AI analysis
Returns:
Operation result
Note:
- TODO: Implement actual interface calls once datamodels are ready
- Currently returns test responses showing what would be executed
"""
try:
logger.info(f"Executing intent-based operation: intent={intent}, entity={entity}")
logger.debug(f"Parameters: {parameters}")
if intent == "QUERY":
# Execute database query directly (stateless)
queryText = parameters.get("queryText", "")
if not queryText:
raise ValueError("QUERY intent requires queryText in parameters")
result = await executeDirectQuery(
currentUser=currentUser,
queryText=queryText,
parameters=parameters.get("queryParameters"),
)
return result
elif intent == "CREATE":
# Create new entity
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
# Create Projekt from parameters
projekt = Projekt(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
statusProzess=StatusProzess(parameters.get("statusProzess", "EINGANG")) if parameters.get("statusProzess") else None,
)
created = realEstateInterface.createProjekt(projekt)
return {
"operation": "CREATE",
"entity": "Projekt",
"result": created.model_dump()
}
elif entity == "Parzelle":
# Create Parzelle from parameters
# Import Kontext for kontextInformationen
from modules.datamodels.datamodelRealEstate import Kontext, GeoPolylinie
# Build parzelle data with all extracted parameters
parzelle_data = {
"mandateId": currentUser.mandateId,
"label": parameters.get("label", ""),
}
# Add optional fields if present
optional_fields = [
"parzellenAliasTags", "eigentuemerschaft", "strasseNr", "plz",
"bauzone", "az", "bz", "vollgeschossZahl", "anrechenbarDachgeschoss",
"anrechenbarUntergeschoss", "gebaeudehoeheMax", "kontextGemeinde",
"regelnGrenzabstand", "regelnMehrlaengenzuschlag", "regelnMehrhoehenzuschlag",
"parzelleBebaut", "parzelleErschlossen", "parzelleHanglage",
"laermschutzzone", "hochwasserschutzzone", "grundwasserschutzzone"
]
for field in optional_fields:
if field in parameters and parameters[field] is not None:
parzelle_data[field] = parameters[field]
# Handle complex objects
if "perimeter" in parameters and parameters["perimeter"]:
parzelle_data["perimeter"] = GeoPolylinie(**parameters["perimeter"])
elif "kontextGemeinde" in parameters and parameters.get("kontextGemeinde"):
# Try to fetch polygon from Swisstopo if gemeinde and parzellen_nr are available
gemeinde = parameters.get("kontextGemeinde")
parzellen_nr = parameters.get("label") or parameters.get("parzellen_nr") or parameters.get("parzellennummer")
if gemeinde and parzellen_nr:
logger.info(f"Attempting to fetch polygon from Swisstopo for {gemeinde} {parzellen_nr}")
try:
# Try to resolve gemeinde name if it's an ID
gemeinde_name = gemeinde
if len(gemeinde) == 36: # UUID format
# Try to get gemeinde name from interface (realEstateInterface already initialized above)
gemeinde_obj = realEstateInterface.getGemeinde(gemeinde)
if gemeinde_obj:
gemeinde_name = gemeinde_obj.label
polygon_data = await fetch_parcel_polygon_from_swisstopo(
gemeinde=gemeinde_name,
parzellen_nr=str(parzellen_nr),
sr=2056
)
if polygon_data:
parzelle_data["perimeter"] = GeoPolylinie(**polygon_data)
logger.info(f"Successfully fetched and set perimeter from Swisstopo")
else:
logger.warning(f"Could not fetch polygon from Swisstopo for {gemeinde_name} {parzellen_nr}")
except Exception as e:
logger.warning(f"Error fetching polygon from Swisstopo (continuing without): {e}")
if "baulinie" in parameters and parameters["baulinie"]:
parzelle_data["baulinie"] = GeoPolylinie(**parameters["baulinie"])
# Handle kontextInformationen (convert dicts to Kontext objects)
if "kontextInformationen" in parameters and parameters["kontextInformationen"]:
kontext_list = []
for kontext_data in parameters["kontextInformationen"]:
if isinstance(kontext_data, dict):
# Ensure only thema and inhalt are passed (Kontext model only has these fields)
kontext_obj = Kontext(
thema=kontext_data.get("thema", ""),
inhalt=kontext_data.get("inhalt", "")
)
kontext_list.append(kontext_obj)
else:
kontext_list.append(kontext_data)
parzelle_data["kontextInformationen"] = kontext_list
parzelle = Parzelle(**parzelle_data)
created = realEstateInterface.createParzelle(parzelle)
logger.info(f"Created Parzelle '{created.label}' with {len(created.kontextInformationen)} context items")
return {
"operation": "CREATE",
"entity": "Parzelle",
"result": created.model_dump()
}
elif entity == "Gemeinde":
# Create Gemeinde from parameters
from modules.datamodels.datamodelRealEstate import Gemeinde
gemeinde = Gemeinde(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
id_kanton=parameters.get("id_kanton"),
plz=parameters.get("plz"),
)
created = realEstateInterface.createGemeinde(gemeinde)
return {
"operation": "CREATE",
"entity": "Gemeinde",
"result": created.model_dump()
}
elif entity == "Kanton":
# Create Kanton from parameters
from modules.datamodels.datamodelRealEstate import Kanton
kanton = Kanton(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
id_land=parameters.get("id_land"),
abk=parameters.get("abk"),
)
created = realEstateInterface.createKanton(kanton)
return {
"operation": "CREATE",
"entity": "Kanton",
"result": created.model_dump()
}
elif entity == "Land":
# Create Land from parameters
from modules.datamodels.datamodelRealEstate import Land
land = Land(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
abk=parameters.get("abk"),
)
created = realEstateInterface.createLand(land)
return {
"operation": "CREATE",
"entity": "Land",
"result": created.model_dump()
}
elif entity == "Dokument":
# Create Dokument from parameters
from modules.datamodels.datamodelRealEstate import Dokument
dokument = Dokument(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
dokumentReferenz=parameters.get("dokumentReferenz", ""),
versionsbezeichnung=parameters.get("versionsbezeichnung"),
dokumentTyp=parameters.get("dokumentTyp"),
quelle=parameters.get("quelle"),
mimeType=parameters.get("mimeType"),
)
created = realEstateInterface.createDokument(dokument)
return {
"operation": "CREATE",
"entity": "Dokument",
"result": created.model_dump()
}
else:
raise ValueError(f"CREATE operation not supported for entity: {entity}")
elif intent == "READ":
# Read entities
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
projektId = parameters.get("id")
if projektId:
# Get single Projekt by ID
projekt = realEstateInterface.getProjekt(projektId)
if not projekt:
raise ValueError(f"Projekt {projektId} not found")
return {
"operation": "READ",
"entity": "Projekt",
"result": projekt.model_dump()
}
else:
# List all Projekte (with optional filters)
# Validate filter fields against Projekt model
validProjektFields = {"id", "mandateId", "label", "statusProzess"}
recordFilter = {
k: v for k, v in parameters.items()
if k != "id" and k in validProjektFields
}
# Handle location_filter specially (filter projects by parcel location)
location_filter = parameters.get("location_filter")
# Get all projects first
projekte = realEstateInterface.getProjekte(recordFilter=recordFilter if recordFilter else None)
# If location filter is present, filter by parcels in that location
if location_filter:
logger.info(f"Filtering projects by location: {location_filter}")
# Try to resolve location to Gemeinde ID for UUID comparison
location_id = None
try:
# Check if it's already a UUID
import re
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE)
if not uuid_pattern.match(location_filter):
# Try to resolve name to ID
gemeinde_records = realEstateInterface.getGemeinden(recordFilter={"label": location_filter})
if gemeinde_records:
location_id = gemeinde_records[0].id
logger.debug(f"Resolved location '{location_filter}' to ID '{location_id}'")
except Exception as e:
logger.debug(f"Could not resolve location filter: {e}")
filtered_projekte = []
for projekt in projekte:
# Check if any parcel in the project matches the location
for parzelle in projekt.parzellen:
# Check kontextGemeinde (both UUID and string), plz, or strasseNr for location match
location_lower = location_filter.lower()
matches = False
# Check if kontextGemeinde matches (as UUID or string)
if parzelle.kontextGemeinde:
if (parzelle.kontextGemeinde == location_id or # UUID match
parzelle.kontextGemeinde == location_filter or # Exact match
location_lower in parzelle.kontextGemeinde.lower()): # Partial string match
matches = True
# Check PLZ or address
if not matches and (
(parzelle.plz and location_lower in parzelle.plz) or
(parzelle.strasseNr and location_lower in parzelle.strasseNr.lower())
):
matches = True
if matches:
filtered_projekte.append(projekt)
break # Found a matching parcel, no need to check more
projekte = filtered_projekte
logger.info(f"Found {len(projekte)} projects in location '{location_filter}'")
return {
"operation": "READ",
"entity": "Projekt",
"result": [p.model_dump() for p in projekte],
"count": len(projekte)
}
elif entity == "Parzelle":
parzelleId = parameters.get("id")
if parzelleId:
# Get single Parzelle by ID
parzelle = realEstateInterface.getParzelle(parzelleId)
if not parzelle:
raise ValueError(f"Parzelle {parzelleId} not found")
return {
"operation": "READ",
"entity": "Parzelle",
"result": parzelle.model_dump()
}
else:
# List all Parzellen (with optional filters)
# Validate filter fields against Parzelle model
# Note: kontextKanton and kontextLand are NOT direct fields on Parzelle
# Parzelle links to Gemeinde, Gemeinde links to Kanton, Kanton links to Land
validParzelleFields = {
"id", "mandateId", "label", "strasseNr", "plz",
"kontextGemeinde", # Only direct link - Gemeinde → Kanton → Land
"bauzone", "az", "bz", "vollgeschossZahl", "gebaeudehoeheMax",
"laermschutzzone", "hochwasserschutzzone", "grundwasserschutzzone",
"parzelleBebaut", "parzelleErschlossen", "parzelleHanglage"
}
recordFilter = {
k: v for k, v in parameters.items()
if k != "id" and k in validParzelleFields
}
# Warn about invalid fields
invalidFields = {k: v for k, v in parameters.items() if k not in validParzelleFields and k != "id"}
if invalidFields:
logger.warning(f"Invalid filter fields for Parzelle ignored: {list(invalidFields.keys())}")
parzellen = realEstateInterface.getParzellen(recordFilter=recordFilter if recordFilter else None)
# Debug logging for empty results
if not parzellen and recordFilter:
logger.info(f"No Parzellen found matching filter: {recordFilter}")
# Get total count to help debug
all_parzellen = realEstateInterface.getParzellen(recordFilter=None)
logger.info(f"Total Parzellen in database: {len(all_parzellen)}")
if all_parzellen:
# Show some sample kontextGemeinde values
sample_gemeinden = set()
for p in all_parzellen[:10]:
if p.kontextGemeinde:
sample_gemeinden.add(p.kontextGemeinde)
logger.info(f"Sample kontextGemeinde values in database: {sample_gemeinden}")
return {
"operation": "READ",
"entity": "Parzelle",
"result": [p.model_dump() for p in parzellen],
"count": len(parzellen)
}
elif entity == "Gemeinde":
from modules.datamodels.datamodelRealEstate import Gemeinde
gemeindeId = parameters.get("id")
if gemeindeId:
gemeinde = realEstateInterface.getGemeinde(gemeindeId)
if not gemeinde:
raise ValueError(f"Gemeinde {gemeindeId} not found")
return {
"operation": "READ",
"entity": "Gemeinde",
"result": gemeinde.model_dump()
}
else:
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
gemeinden = realEstateInterface.getGemeinden(recordFilter=recordFilter if recordFilter else None)
return {
"operation": "READ",
"entity": "Gemeinde",
"result": [g.model_dump() for g in gemeinden],
"count": len(gemeinden)
}
elif entity == "Kanton":
from modules.datamodels.datamodelRealEstate import Kanton
kantonId = parameters.get("id")
if kantonId:
kanton = realEstateInterface.getKanton(kantonId)
if not kanton:
raise ValueError(f"Kanton {kantonId} not found")
return {
"operation": "READ",
"entity": "Kanton",
"result": kanton.model_dump()
}
else:
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
kantone = realEstateInterface.getKantone(recordFilter=recordFilter if recordFilter else None)
return {
"operation": "READ",
"entity": "Kanton",
"result": [k.model_dump() for k in kantone],
"count": len(kantone)
}
elif entity == "Land":
from modules.datamodels.datamodelRealEstate import Land
landId = parameters.get("id")
if landId:
land = realEstateInterface.getLand(landId)
if not land:
raise ValueError(f"Land {landId} not found")
return {
"operation": "READ",
"entity": "Land",
"result": land.model_dump()
}
else:
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
laender = realEstateInterface.getLaender(recordFilter=recordFilter if recordFilter else None)
return {
"operation": "READ",
"entity": "Land",
"result": [l.model_dump() for l in laender],
"count": len(laender)
}
elif entity == "Dokument":
from modules.datamodels.datamodelRealEstate import Dokument
dokumentId = parameters.get("id")
if dokumentId:
dokument = realEstateInterface.getDokument(dokumentId)
if not dokument:
raise ValueError(f"Dokument {dokumentId} not found")
return {
"operation": "READ",
"entity": "Dokument",
"result": dokument.model_dump()
}
else:
recordFilter = {k: v for k, v in parameters.items() if k != "id"}
dokumente = realEstateInterface.getDokumente(recordFilter=recordFilter if recordFilter else None)
return {
"operation": "READ",
"entity": "Dokument",
"result": [d.model_dump() for d in dokumente],
"count": len(dokumente)
}
else:
raise ValueError(f"READ operation not supported for entity: {entity}")
elif intent == "UPDATE":
# Update existing entity
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
projektId = parameters.get("id")
if not projektId:
raise ValueError("UPDATE operation requires entity ID")
# Get existing projekt
projekt = realEstateInterface.getProjekt(projektId)
if not projekt:
raise ValueError(f"Projekt {projektId} not found")
# Update fields
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateProjekt(projektId, updateData)
return {
"operation": "UPDATE",
"entity": "Projekt",
"result": updated.model_dump()
}
elif entity == "Parzelle":
parzelleId = parameters.get("id")
if not parzelleId:
raise ValueError("UPDATE operation requires entity ID")
# Get existing parzelle
parzelle = realEstateInterface.getParzelle(parzelleId)
if not parzelle:
raise ValueError(f"Parzelle {parzelleId} not found")
# Update fields
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateParzelle(parzelleId, updateData)
return {
"operation": "UPDATE",
"entity": "Parzelle",
"result": updated.model_dump()
}
elif entity == "Gemeinde":
from modules.datamodels.datamodelRealEstate import Gemeinde
gemeindeId = parameters.get("id")
if not gemeindeId:
raise ValueError("UPDATE operation requires entity ID")
gemeinde = realEstateInterface.getGemeinde(gemeindeId)
if not gemeinde:
raise ValueError(f"Gemeinde {gemeindeId} not found")
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateGemeinde(gemeindeId, updateData)
return {
"operation": "UPDATE",
"entity": "Gemeinde",
"result": updated.model_dump()
}
elif entity == "Kanton":
from modules.datamodels.datamodelRealEstate import Kanton
kantonId = parameters.get("id")
if not kantonId:
raise ValueError("UPDATE operation requires entity ID")
kanton = realEstateInterface.getKanton(kantonId)
if not kanton:
raise ValueError(f"Kanton {kantonId} not found")
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateKanton(kantonId, updateData)
return {
"operation": "UPDATE",
"entity": "Kanton",
"result": updated.model_dump()
}
elif entity == "Land":
from modules.datamodels.datamodelRealEstate import Land
landId = parameters.get("id")
if not landId:
raise ValueError("UPDATE operation requires entity ID")
land = realEstateInterface.getLand(landId)
if not land:
raise ValueError(f"Land {landId} not found")
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateLand(landId, updateData)
return {
"operation": "UPDATE",
"entity": "Land",
"result": updated.model_dump()
}
elif entity == "Dokument":
from modules.datamodels.datamodelRealEstate import Dokument
dokumentId = parameters.get("id")
if not dokumentId:
raise ValueError("UPDATE operation requires entity ID")
dokument = realEstateInterface.getDokument(dokumentId)
if not dokument:
raise ValueError(f"Dokument {dokumentId} not found")
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateDokument(dokumentId, updateData)
return {
"operation": "UPDATE",
"entity": "Dokument",
"result": updated.model_dump()
}
else:
raise ValueError(f"UPDATE operation not supported for entity: {entity}")
elif intent == "DELETE":
# Delete entity
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
projektId = parameters.get("id")
if not projektId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteProjekt(projektId)
return {
"operation": "DELETE",
"entity": "Projekt",
"success": success
}
elif entity == "Parzelle":
parzelleId = parameters.get("id")
if not parzelleId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteParzelle(parzelleId)
return {
"operation": "DELETE",
"entity": "Parzelle",
"success": success
}
elif entity == "Gemeinde":
from modules.datamodels.datamodelRealEstate import Gemeinde
gemeindeId = parameters.get("id")
if not gemeindeId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteGemeinde(gemeindeId)
return {
"operation": "DELETE",
"entity": "Gemeinde",
"success": success
}
elif entity == "Kanton":
from modules.datamodels.datamodelRealEstate import Kanton
kantonId = parameters.get("id")
if not kantonId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteKanton(kantonId)
return {
"operation": "DELETE",
"entity": "Kanton",
"success": success
}
elif entity == "Land":
from modules.datamodels.datamodelRealEstate import Land
landId = parameters.get("id")
if not landId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteLand(landId)
return {
"operation": "DELETE",
"entity": "Land",
"success": success
}
elif entity == "Dokument":
from modules.datamodels.datamodelRealEstate import Dokument
dokumentId = parameters.get("id")
if not dokumentId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteDokument(dokumentId)
return {
"operation": "DELETE",
"entity": "Dokument",
"success": success
}
else:
raise ValueError(f"DELETE operation not supported for entity: {entity}")
else:
raise ValueError(f"Unknown intent: {intent}")
except Exception as e:
logger.error(f"Error executing intent-based operation: {str(e)}", exc_info=True)
raise