gateway/docs/real-estate-feature-integration-guide/04-feature-logic.md

780 lines
25 KiB
Markdown

# Schritt 3: Feature-Logik implementieren
[← Zurück: Interface erstellen](03-interfaces.md) | [Weiter: Routen erstellen →](05-routes.md)
**Datei:** `modules/features/realEstate/mainRealEstate.py`
Die Feature-Logik enthält die Geschäftslogik für das Feature. Sie wird von den Routen aufgerufen und arbeitet **stateless** ohne Session-Management.
## Übersicht: Stateless Feature-Logik mit AI-Integration
Die Feature-Logik verwendet **AI**, um natürliche Sprache direkt in CRUD-Operationen zu übersetzen - ohne Session-Management:
```
User Input (natürliche Sprache)
AI-Analyse (Intent-Erkennung)
CRUD-Operation identifizieren
Parameter extrahieren
Interface CRUD-Methode aufrufen
Datenbank-Operation ausführen
Ergebnis zurückgeben (keine Session-Speicherung)
```
**Beispiel:**
- User: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
- AI analysiert → Intent: CREATE, Entity: Projekt, Parameter: {label: "Hauptstrasse 42"}
- Feature-Logik ruft auf → `interface.createProjekt(Projekt(label="Hauptstrasse 42"))`
- Ergebnis wird direkt zurückgegeben (keine Session, keine History)
## AI-Integration: Services initialisieren
Um AI zu verwenden, müssen Sie die **Services** initialisieren. Services sind eine zentrale Schnittstelle zu verschiedenen Systemkomponenten (AI, Chat, Database, etc.).
### Services-Initialisierung
```python
from modules.services import getInterface as getServices
# Services für einen User erhalten
services = getServices(currentUser, workflow=None)
# AI-Service verfügbar über:
aiService = services.ai # Für AI-Aufrufe
```
**Wichtig:** Services werden normalerweise im Feature-Logik-Modul initialisiert und an Funktionen weitergegeben.
---
## AI-basierte Intent-Erkennung und CRUD-Operationen
### Schritt 1: Intent-Analyse mit AI
Die AI analysiert User-Input und identifiziert:
- **Intent**: CREATE, READ, UPDATE, DELETE, QUERY
- **Entity**: Projekt, Parzelle, Dokument, etc.
- **Parameter**: Extrahierte Werte aus dem User-Input
### Schritt 2: CRUD-Operation ausführen
Basierend auf der AI-Analyse wird die entsprechende Interface-Methode aufgerufen.
---
## Beispiel-Implementierung:
```python
"""
Real Estate feature main logic.
Handles chat interface for database queries with AI-powered natural language processing.
"""
import logging
import json
from typing import Optional, Dict, Any, List
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelRealEstate import (
Projekt,
Parzelle,
StatusProzess,
)
from modules.interfaces.interfaceDbRealEstateChatObjects import getInterface as getChatInterface
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface
from modules.services import getInterface as getServices
logger = logging.getLogger(__name__)
# ===== Direkte Query-Ausführung (stateless) =====
async def executeDirectQuery(
currentUser: User,
queryText: str,
parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Execute a database query directly without session management.
Args:
currentUser: Current authenticated user
queryText: SQL query text
parameters: Optional parameters for parameterized queries
Returns:
Dictionary containing query result (rows, columns, rowCount)
Note:
- No session or query history is saved
- Query is executed directly and result is returned
- For production, validate and sanitize queries before execution
"""
try:
chatInterface = getChatInterface(currentUser)
# Execute query directly (no session tracking)
result = chatInterface.executeQuery(queryText, parameters)
logger.info(
f"Query executed successfully: {result['rowCount']} rows in {result.get('executionTime', 0):.3f}s"
)
return {
"status": "success",
"rows": result["rows"],
"columns": result["columns"],
"rowCount": result["rowCount"],
"executionTime": result.get("executionTime", 0),
}
except Exception as e:
logger.error(f"Error executing query: {str(e)}")
raise
# ===== AI-basierte Intent-Erkennung und CRUD-Operationen =====
async def processNaturalLanguageCommand(
currentUser: User,
userInput: str,
) -> Dict[str, Any]:
"""
Process natural language user input and execute corresponding CRUD operations.
Uses AI to analyze user intent and extract parameters, then executes the appropriate
CRUD operation through the interface. Works stateless without session management.
Args:
currentUser: Current authenticated user
userInput: Natural language command from user
Returns:
Dictionary containing operation result and metadata
Example user inputs:
- "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
- "Zeige mir alle Projekte in Zürich"
- "Aktualisiere Projekt XYZ mit Status 'Planung'"
- "Lösche Parzelle ABC"
- "SELECT * FROM Projekt WHERE plz = '8000'"
"""
try:
# Initialize services for AI access
services = getServices(currentUser, workflow=None)
aiService = services.ai
# Step 1: Analyze user intent with AI
intentAnalysis = await analyzeUserIntent(aiService, userInput)
logger.info(f"Intent analysis result: {intentAnalysis}")
# Step 2: Execute CRUD operation based on intent
result = await executeIntentBasedOperation(
currentUser=currentUser,
intent=intentAnalysis["intent"],
entity=intentAnalysis["entity"],
parameters=intentAnalysis["parameters"],
)
return {
"success": True,
"intent": intentAnalysis["intent"],
"entity": intentAnalysis["entity"],
"result": result,
}
except Exception as e:
logger.error(f"Error processing natural language command: {str(e)}")
raise
async def analyzeUserIntent(
aiService,
userInput: str
) -> Dict[str, Any]:
"""
Use AI to analyze user input and extract intent, entity, and parameters.
Args:
aiService: AI service instance
userInput: Natural language user input
Returns:
Dictionary with 'intent', 'entity', and 'parameters'
"""
# Create a structured prompt for intent analysis
intentPrompt = f"""
Analyze the following user command and extract the intent, entity, and parameters.
User Command: "{userInput}"
Available intents:
- CREATE: User wants to create a new entity
- READ: User wants to read/query entities
- UPDATE: User wants to update an existing entity
- DELETE: User wants to delete an entity
- QUERY: User wants to execute a database query
Available entities:
- Projekt: Real estate project
- Parzelle: Plot/parcel
- Dokument: Document
- Kanton: Canton
- Gemeinde: Municipality
Return a JSON object with the following structure:
{{
"intent": "CREATE|READ|UPDATE|DELETE|QUERY",
"entity": "Projekt|Parzelle|Dokument|Kanton|Gemeinde|null",
"parameters": {{
// Extracted parameters from user input
// For CREATE/UPDATE: include all relevant fields
// For READ: include filter criteria
// For DELETE: include entity ID if mentioned
// For QUERY: include query text or natural language query
}},
"confidence": 0.0-1.0 // Confidence score for the analysis
}}
Examples:
- Input: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
Output: {{"intent": "CREATE", "entity": "Projekt", "parameters": {{"label": "Hauptstrasse 42"}}, "confidence": 0.95}}
- Input: "Zeige mir alle Projekte"
Output: {{"intent": "READ", "entity": "Projekt", "parameters": {{}}, "confidence": 0.9}}
- Input: "SELECT * FROM Projekt WHERE plz = '8000'"
Output: {{"intent": "QUERY", "entity": null, "parameters": {{"queryText": "SELECT * FROM Projekt WHERE plz = '8000'", "queryType": "sql"}}, "confidence": 1.0}}
"""
try:
# Use AI planning call for structured JSON response
response = await aiService.callAiPlanning(
prompt=intentPrompt,
debugType="intentanalysis"
)
# Parse JSON response
intentData = json.loads(response)
# Validate response structure
if "intent" not in intentData or "entity" not in intentData:
raise ValueError("Invalid intent analysis response structure")
return intentData
except json.JSONDecodeError as e:
logger.error(f"Failed to parse AI intent analysis response: {e}")
logger.error(f"Raw response: {response}")
raise ValueError(f"AI returned invalid JSON: {str(e)}")
except Exception as e:
logger.error(f"Error analyzing user intent: {str(e)}")
raise
async def executeIntentBasedOperation(
currentUser: User,
intent: str,
entity: Optional[str],
parameters: Dict[str, Any],
) -> Dict[str, Any]:
"""
Execute CRUD operation based on analyzed intent.
Args:
currentUser: Current authenticated user
intent: Intent from AI analysis (CREATE, READ, UPDATE, DELETE, QUERY)
entity: Entity type from AI analysis
parameters: Extracted parameters from AI analysis
Returns:
Operation result
"""
try:
if intent == "QUERY":
# Execute database query directly (stateless)
queryText = parameters.get("queryText", "")
result = await executeDirectQuery(
currentUser=currentUser,
queryText=queryText,
parameters=parameters.get("queryParameters"),
)
return result
elif intent == "CREATE":
# Create new entity
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
projekt = Projekt(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
statusProzess=StatusProzess(parameters.get("statusProzess", "EINGANG")) if parameters.get("statusProzess") else None,
)
created = realEstateInterface.createProjekt(projekt)
return {"operation": "CREATE", "entity": "Projekt", "result": created.model_dump()}
elif entity == "Parzelle":
parzelle = Parzelle(
mandateId=currentUser.mandateId,
label=parameters.get("label", ""),
# ... weitere Parameter
)
created = realEstateInterface.createParzelle(parzelle)
return {"operation": "CREATE", "entity": "Parzelle", "result": created.model_dump()}
else:
raise ValueError(f"CREATE operation not supported for entity: {entity}")
elif intent == "READ":
# Read entities
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
# Apply filters from parameters
projektId = parameters.get("id")
if projektId:
projekt = realEstateInterface.getProjekt(projektId)
return {"operation": "READ", "entity": "Projekt", "result": projekt.model_dump() if projekt else None}
else:
# List all projects (with optional filters)
# Note: You may need to implement getProjekte() method
raise NotImplementedError("List operation needs to be implemented")
else:
raise ValueError(f"READ operation not supported for entity: {entity}")
elif intent == "UPDATE":
# Update existing entity
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
projektId = parameters.get("id")
if not projektId:
raise ValueError("UPDATE operation requires entity ID")
# Get existing projekt
projekt = realEstateInterface.getProjekt(projektId)
if not projekt:
raise ValueError(f"Projekt {projektId} not found")
# Update fields
updateData = {k: v for k, v in parameters.items() if k != "id"}
updated = realEstateInterface.updateProjekt(projektId, updateData)
return {"operation": "UPDATE", "entity": "Projekt", "result": updated.model_dump()}
else:
raise ValueError(f"UPDATE operation not supported for entity: {entity}")
elif intent == "DELETE":
# Delete entity
realEstateInterface = getRealEstateInterface(currentUser)
if entity == "Projekt":
projektId = parameters.get("id")
if not projektId:
raise ValueError("DELETE operation requires entity ID")
success = realEstateInterface.deleteProjekt(projektId)
return {"operation": "DELETE", "entity": "Projekt", "success": success}
else:
raise ValueError(f"DELETE operation not supported for entity: {entity}")
else:
raise ValueError(f"Unknown intent: {intent}")
except Exception as e:
logger.error(f"Error executing intent-based operation: {str(e)}")
raise
# ===== Erweiterte Query-Funktion mit AI-Unterstützung =====
async def executeNaturalLanguageQuery(
currentUser: User,
naturalLanguageQuery: str,
) -> Dict[str, Any]:
"""
Execute a natural language query by translating it to SQL using AI.
Args:
currentUser: Current authenticated user
naturalLanguageQuery: Natural language query (e.g., "Zeige mir alle Projekte in Zürich")
Returns:
Query result with metadata (stateless, no session)
"""
try:
services = getServices(currentUser, workflow=None)
aiService = services.ai
# Step 1: Translate natural language to SQL using AI
sqlQuery = await translateNaturalLanguageToSQL(aiService, naturalLanguageQuery, currentUser.mandateId)
logger.info(f"Translated '{naturalLanguageQuery}' to SQL: {sqlQuery}")
# Step 2: Execute the SQL query directly (stateless)
result = await executeDirectQuery(
currentUser=currentUser,
queryText=sqlQuery,
)
return result
except Exception as e:
logger.error(f"Error executing natural language query: {str(e)}")
raise
async def translateNaturalLanguageToSQL(
aiService,
naturalLanguageQuery: str,
mandateId: str
) -> str:
"""
Use AI to translate natural language query to SQL.
Args:
aiService: AI service instance
naturalLanguageQuery: Natural language query
mandateId: User's mandate ID for filtering
Returns:
SQL query string with mandateId filter applied
"""
translationPrompt = f"""
Translate the following natural language query into a valid PostgreSQL SQL SELECT statement.
Natural Language Query: "{naturalLanguageQuery}"
Available tables and their fields:
- Projekt: id, mandateId, label, statusProzess, perimeter, baulinie, parzellen (JSONB), dokumente (JSONB)
- Parzelle: id, mandateId, label, strasseNr, plz, bauzone, az, bz, kontextKanton, kontextGemeinde
- Dokument: id, mandateId, label, dokumentTyp, dokumentReferenz, mimeType
- Kanton: id, mandateId, label, abk
- Gemeinde: id, mandateId, label, plz
Rules:
1. Always include 'mandateId' filter based on user context (use placeholder {{mandateId}})
2. Only use SELECT statements (no INSERT, UPDATE, DELETE)
3. Return ONLY the SQL query, no explanations
4. Use proper PostgreSQL syntax
5. For text searches, use ILIKE for case-insensitive matching
Examples:
- Input: "Zeige mir alle Projekte"
Output: SELECT * FROM Projekt WHERE mandateId = '{{mandateId}}'
- Input: "Zeige mir alle Parzellen in Zürich"
Output: SELECT p.* FROM Parzelle p JOIN Gemeinde g ON p.kontextGemeinde = g.id WHERE g.label ILIKE '%Zürich%' AND p.mandateId = '{{mandateId}}'
- Input: "Wie viele Projekte haben Status 'Planung'?"
Output: SELECT COUNT(*) as count FROM Projekt WHERE statusProzess = 'Planung' AND mandateId = '{{mandateId}}'
Now translate this query:
"""
try:
# Use AI planning call for SQL generation
response = await aiService.callAiPlanning(
prompt=translationPrompt,
debugType="sqltranslation"
)
# Clean response (remove markdown code blocks if present)
sqlQuery = response.strip()
if sqlQuery.startswith("```sql"):
sqlQuery = sqlQuery[6:]
if sqlQuery.startswith("```"):
sqlQuery = sqlQuery[3:]
if sqlQuery.endswith("```"):
sqlQuery = sqlQuery[:-3]
sqlQuery = sqlQuery.strip()
# Replace placeholder with actual mandateId
sqlQuery = sqlQuery.replace("{{mandateId}}", mandateId)
return sqlQuery
except Exception as e:
logger.error(f"Error translating natural language to SQL: {str(e)}")
raise ValueError(f"Failed to translate query: {str(e)}")
```
## Wichtige Punkte:
### 1. Services-Initialisierung
- **`getServices(currentUser, workflow=None)`** - Initialisiert Services für AI-Zugriff
- **`services.ai`** - Zugriff auf AI-Service für AI-Aufrufe
### 2. AI-Aufrufe
- **`callAiPlanning()`** - Für strukturierte JSON-Antworten (Intent-Analyse, SQL-Übersetzung)
- **`callAiText()`** - Für einfache Text-Generierung
- **`callAiDocuments()`** - Für Dokumenten-Verarbeitung
### 3. Intent-Analyse
Die AI analysiert User-Input und gibt zurück:
- **Intent**: CREATE, READ, UPDATE, DELETE, QUERY
- **Entity**: Projekt, Parzelle, Dokument, etc.
- **Parameters**: Extrahierte Werte aus dem Input
### 4. CRUD-Operationen
Basierend auf der Intent-Analyse:
- **CREATE** → `interface.createProjekt()`, `interface.createParzelle()`, etc.
- **READ** → `interface.getProjekt()`, `interface.getParzelle()`, etc.
- **UPDATE** → `interface.updateProjekt()`, etc.
- **DELETE** → `interface.deleteProjekt()`, etc.
- **QUERY** → `interface.executeQuery()` oder `executeDatabaseQuery()`
### 5. Natural Language to SQL
- AI übersetzt natürliche Sprache in SQL-Queries
- Automatische Validierung und Sanitization empfohlen
- MandateId-Filter wird automatisch hinzugefügt
### 6. Error Handling
- Umfassendes Error Handling für AI-Aufrufe
- JSON-Parsing mit Fallback
- Logging für Debugging
---
## Beispiel-Verwendung:
```python
# In einer Route (stateless):
@router.post("/command")
async def process_command(
userInput: str = Body(...),
currentUser: User = Depends(getCurrentUser)
):
result = await processNaturalLanguageCommand(
currentUser=currentUser,
userInput=userInput
)
return result
# Direkte Query (stateless):
@router.post("/query")
async def execute_query(
queryText: str = Body(...),
currentUser: User = Depends(getCurrentUser)
):
result = await executeDirectQuery(
currentUser=currentUser,
queryText=queryText
)
return result
```
**User-Input-Beispiele:**
- `"Erstelle ein neues Projekt namens 'Hauptstrasse 42'"`
- `"Zeige mir alle Projekte in Zürich"`
- `"Aktualisiere Projekt XYZ mit Status 'Planung'"`
- `"Wie viele Parzellen haben Bauzone W3?"`
---
## Vollständiger Flow: User-Input → CRUD-Operation (stateless)
### Beispiel: "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
```
1. User sendet HTTP POST Request
POST /api/realestate/command
Body: {"userInput": "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"}
2. Route ruft Feature-Logik auf
→ processNaturalLanguageCommand(currentUser, userInput)
# Keine Session-ID notwendig!
3. Feature-Logik initialisiert Services
→ services = getServices(currentUser, workflow=None)
→ aiService = services.ai
4. AI analysiert User-Input
→ analyzeUserIntent(aiService, userInput)
→ AI gibt zurück:
{
"intent": "CREATE",
"entity": "Projekt",
"parameters": {"label": "Hauptstrasse 42"},
"confidence": 0.95
}
5. Feature-Logik führt CRUD-Operation aus
→ executeIntentBasedOperation(intent="CREATE", entity="Projekt", ...)
→ realEstateInterface = getRealEstateInterface(currentUser)
→ projekt = Projekt(mandateId=..., label="Hauptstrasse 42")
→ created = realEstateInterface.createProjekt(projekt)
6. Interface speichert in Datenbank
→ DatabaseConnector.recordCreate(Projekt, projekt.model_dump())
→ PostgreSQL INSERT INTO Projekt ...
7. Ergebnis wird direkt zurückgegeben
→ Route gibt HTTP Response zurück
→ Keine Session-Speicherung, keine History
→ Frontend zeigt Erfolg
```
---
## AI-Service Methoden im Detail
### `callAiPlanning()` - Für strukturierte Antworten
**Verwendung:** Intent-Analyse, SQL-Übersetzung, strukturierte Daten-Extraktion
```python
response = await aiService.callAiPlanning(
prompt=intentPrompt,
debugType="intentanalysis" # Optional: für Debug-Dateien
)
# Response ist JSON-String, muss geparst werden
intentData = json.loads(response)
```
**Vorteile:**
- Optimiert für strukturierte JSON-Antworten
- Verwendet beste Modelle für Planungs-Aufgaben
- Automatisches Debug-File-Writing
### `callAiText()` - Für einfache Text-Generierung
**Verwendung:** Text-Generierung, Zusammenfassungen, Erklärungen
```python
response = await aiService.callAiText(
prompt="Erkläre mir...",
documents=None, # Optional: Dokumente für Kontext
options=AiCallOptions(...)
)
# Response ist direkt Text-String
```
### `callAiDocuments()` - Für Dokumenten-Verarbeitung
**Verwendung:** Dokumenten-Analyse, Extraktion, Generierung mit Dokumenten-Kontext
```python
response = await aiService.callAiDocuments(
prompt="Analysiere diese Dokumente...",
documents=[ChatDocument(...), ...],
options=AiCallOptions(...),
outputFormat="json" # Optional: Format für Output
)
```
---
## Best Practices für AI-Integration
### 1. Prompt-Engineering
- **Klare Struktur**: Definieren Sie genau, welche Antwort Sie erwarten
- **Beispiele**: Geben Sie Beispiele für bessere Ergebnisse
- **Format**: Spezifizieren Sie das erwartete Format (JSON, SQL, etc.)
### 2. Error Handling
- **JSON-Parsing**: Immer try/except für JSON-Parsing
- **Fallback**: Planen Sie Fallback-Strategien bei AI-Fehlern
- **Validierung**: Validieren Sie AI-Antworten vor Verwendung
### 3. Sicherheit
- **Query-Validierung**: Validieren Sie SQL-Queries vor Ausführung
- **Parameter-Sanitization**: Sanitizen Sie alle Parameter
- **MandateId-Filter**: Stellen Sie sicher, dass MandateId immer gefiltert wird
### 4. Performance
- **Caching**: Cache häufige AI-Antworten wenn möglich
- **Model-Auswahl**: Lassen Sie das System automatisch das beste Modell wählen
- **Async**: Nutzen Sie async/await für nicht-blockierende Operationen
### 5. Debugging
- **Debug-Files**: Nutzen Sie `debugType` Parameter für Debug-Dateien
- **Logging**: Loggen Sie alle AI-Aufrufe und Antworten
- **Confidence-Scores**: Nutzen Sie Confidence-Scores für Fehlerbehandlung
---
## Erweiterte Features
### Schema-Aware Prompting
Sie können das Datenbank-Schema in Prompts einbinden:
```python
# Lade Schema-Informationen
schemaInfo = getDatabaseSchema() # Ihre Funktion
prompt = f"""
Available database schema:
{schemaInfo}
User query: "{userInput}"
...
"""
```
### Context-Aware Operations (Optional)
Falls Sie später Kontext zwischen Queries benötigen, können Sie optional eine Session verwenden:
```python
# Optional: Session für Kontext (nur wenn nötig)
# Für stateless Operationen nicht notwendig
# Falls Session gewünscht:
sessionId = parameters.get("sessionId") # Optional
if sessionId:
previousQueries = interface.getQueries(sessionId=sessionId)
context = "\n".join([q.queryText for q in previousQueries[-5:]])
else:
context = "" # Kein Kontext bei stateless Operationen
prompt = f"""
{context if context else ""}
User query: "{userInput}"
...
"""
```
### Multi-Step Operations
Für komplexe Operationen können Sie mehrere AI-Calls machen:
```python
# Schritt 1: Intent-Analyse
intent = await analyzeUserIntent(aiService, userInput)
# Schritt 2: Parameter-Validierung
if intent["intent"] == "CREATE":
validatedParams = await validateParameters(aiService, intent["parameters"])
# Schritt 3: CRUD-Operation
result = await executeIntentBasedOperation(...)
```
---
[← Zurück: Interface erstellen](03-interfaces.md) | [Weiter: Routen erstellen →](05-routes.md)