# gateway/modules/routes/routeRealEstate.py
"""
Real Estate routes for the backend API.
Implements stateless endpoints for real estate database operations with AI-powered natural language processing.
"""
import logging
import json
import re
import requests
import aiohttp
import asyncio
import ssl
from urllib.parse import urljoin, urlparse
from typing import Optional, Dict, Any, List, Union
from fastapi import APIRouter, HTTPException, Depends, Body, Request, Query, Path, status
# Import auth modules
from modules.auth import limiter, getCurrentUser
# Import models
from modules.datamodels.datamodelPagination import (
PaginationParams,
PaginatedResponse,
PaginationMetadata,
normalize_pagination_dict,
)
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.features.realEstate.datamodelFeatureRealEstate import (
Projekt,
Parzelle,
Dokument,
Gemeinde,
Kanton,
Land,
Kontext,
StatusProzess,
DokumentTyp,
)
# Import interfaces
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
# Import feature logic for AI-powered commands
from modules.features.realEstate.mainRealEstate import (
processNaturalLanguageCommand,
create_project_with_parcel_data,
extract_bzo_information,
)
# Import Swiss Topo MapServer connector for testing
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.connectors.connectorOerebWfs import OerebWfsConnector
# Import Tavily connector for BZO document search
from modules.aicore.aicorePluginTavily import AiTavily
# Import helper functions from scraping route
from modules.routes.routeRealEstateScraping import (
_get_language_from_kanton,
_get_bzo_search_query,
)
# Import attribute utilities for model schema
from modules.shared.attributeUtils import getModelAttributeDefinitions
# Configure module-level logger (stdlib convention: one logger per module)
logger = logging.getLogger(__name__)
# Create router for real estate endpoints; the declared responses document
# the common error shapes in the generated OpenAPI schema.
router = APIRouter(
    prefix="/api/realestate",
    tags=["Real Estate"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"}
    }
)
# ===== Helper Functions (instanceId-based routes, backend-driven like Trustee) =====
def _parsePagination(pagination: Optional[str]) -> Optional[PaginationParams]:
    """Decode a JSON-encoded pagination query parameter.

    Returns None when the parameter is absent, empty, or decodes to a falsy
    value. Raises HTTP 400 when the JSON is malformed or the decoded fields
    do not validate against PaginationParams.
    """
    if not pagination:
        return None
    try:
        decoded = json.loads(pagination)
        if not decoded:
            return None
        # Normalize legacy/alias field names before model validation.
        return PaginationParams(**normalize_pagination_dict(decoded))
    except (json.JSONDecodeError, ValueError) as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pagination parameter: {str(exc)}"
        )
async def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
    """
    Validate that the user has access to the feature instance.

    Raises 404 when the instance does not exist, 400 when the instance
    belongs to a different feature, and 403 when the user has no enabled
    access grant for it. Returns the mandateId for the instance.

    NOTE(review): RequestContext/getRequestContext/User are not among the
    imports visible at the top of this file — confirm they are imported
    elsewhere or add the import.
    """
    rootInterface = getRootInterface()
    featureInterface = getFeatureInterface(rootInterface.db)
    instance = featureInterface.getFeatureInstance(instanceId)
    if not instance:
        raise HTTPException(
            status_code=404,
            detail=f"Feature instance '{instanceId}' not found"
        )
    # Guard against cross-feature access via a valid but foreign instance id.
    if instance.featureCode != "realestate":
        raise HTTPException(
            status_code=400,
            detail=f"Instance '{instanceId}' is not a realestate instance"
        )
    # Sys-admins bypass the per-user feature-access check.
    if not context.hasSysAdminRole:
        featureAccesses = rootInterface.getFeatureAccessesForUser(str(context.user.id))
        hasAccess = any(
            str(fa.featureInstanceId) == instanceId and fa.enabled
            for fa in featureAccesses
        )
        if not hasAccess:
            raise HTTPException(
                status_code=403,
                detail=f"Access denied to feature instance '{instanceId}'"
            )
    return str(instance.mandateId)
# Mapping of entity names to Pydantic model classes (for attributes endpoint).
# Keys are the entityType path-parameter values accepted by
# get_entity_attributes; values are the corresponding data models.
_REALESTATE_ENTITY_MODELS = {
    "Projekt": Projekt,
    "Parzelle": Parzelle,
    "Dokument": Dokument,
    "Gemeinde": Gemeinde,
    "Kanton": Kanton,
    "Land": Land,
}
# ============================================================================
# INSTANCE-ID ROUTES (backend-driven, analog to Trustee)
# ============================================================================
@router.get("/{instanceId}/attributes/{entityType}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def get_entity_attributes(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    entityType: str = Path(..., description="Entity type (e.g., Projekt, Parzelle)"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """Return the visible attribute definitions for a Real Estate entity.

    Consumed by FormGeneratorTable on the frontend.
    """
    await _validateInstanceAccess(instanceId, context)
    modelClass = _REALESTATE_ENTITY_MODELS.get(entityType)
    if modelClass is None:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown entity type: {entityType}. Valid types: {list(_REALESTATE_ENTITY_MODELS.keys())}"
        )
    try:
        definitions = getModelAttributeDefinitions(modelClass)
        # Only dict-shaped attributes that are not explicitly hidden are exposed.
        visible = []
        for attr in definitions.get("attributes", []):
            if isinstance(attr, dict) and attr.get("visible", True):
                visible.append(attr)
        return {"attributes": visible}
    except Exception as e:
        logger.error(f"Error getting attributes for {entityType}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Error getting attributes for {entityType}: {str(e)}"
        )
@router.get("/{instanceId}/projects/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def get_project_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """Get project options for select dropdowns. Returns: [{ value, label }]"""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    projects = interface.getProjekte(recordFilter={"featureInstanceId": instanceId})
    options = []
    for projekt in projects:
        # Fall back to the id when no label is set.
        display = getattr(projekt, "label", None) or projekt.id
        options.append({"value": projekt.id, "label": display})
    return options
@router.get("/{instanceId}/parcels/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def get_parcel_options(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """Get parcel options for select dropdowns. Returns: [{ value, label }]"""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    parcels = interface.getParzellen(recordFilter={"featureInstanceId": instanceId})
    options = []
    for parzelle in parcels:
        # Fall back to the id when no label is set.
        display = getattr(parzelle, "label", None) or parzelle.id
        options.append({"value": parzelle.id, "label": display})
    return options
# ----- Projects CRUD -----
@router.get("/{instanceId}/projects", response_model=PaginatedResponse[Projekt])
@limiter.limit("30/minute")
async def get_projects(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Projekt]:
    """Get all projects for a feature instance with optional pagination.

    Sorting and paging happen in-memory on the full result set. None values
    in a sort field sort after real values (ascending), matching the legacy
    /table/{table} route.
    """
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    items = interface.getProjekte(recordFilter={"featureInstanceId": instanceId})
    paginationParams = _parsePagination(pagination)
    if not paginationParams:
        return PaginatedResponse(items=items, pagination=None)
    if paginationParams.sort:
        # Apply sort fields in reverse so the first field has highest priority
        # (list.sort is stable). The tuple key keeps None values comparable:
        # a bare getattr(...) key raises TypeError as soon as any record has
        # None in the sort field (fixed; consistent with the legacy route).
        for sort_field in reversed(paginationParams.sort):
            descending = sort_field.direction.lower() == "desc"

            def sort_key(item, _field=sort_field.field):
                value = getattr(item, _field, None)
                if value is None:
                    return (1, None)
                return (0, value)

            items.sort(key=sort_key, reverse=descending)
    total_items = len(items)
    # Ceiling division for the page count.
    total_pages = (total_items + paginationParams.pageSize - 1) // paginationParams.pageSize
    start_idx = (paginationParams.page - 1) * paginationParams.pageSize
    paginated_items = items[start_idx:start_idx + paginationParams.pageSize]
    return PaginatedResponse(
        items=paginated_items,
        pagination=PaginationMetadata(
            currentPage=paginationParams.page,
            pageSize=paginationParams.pageSize,
            totalItems=total_items,
            totalPages=total_pages,
            sort=paginationParams.sort or [],
            filters=paginationParams.filters
        )
    )
@router.get("/{instanceId}/projects/{projectId}", response_model=Projekt)
@limiter.limit("30/minute")
async def get_project_by_id(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    projectId: str = Path(..., description="Project ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Projekt:
    """Get a single project by ID."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    projekt = interface.getProjekt(projectId)
    # 404 both when missing and when the record belongs to another instance.
    if not projekt or str(getattr(projekt, "featureInstanceId", None)) != instanceId:
        raise HTTPException(status_code=404, detail=f"Project '{projectId}' not found")
    return projekt
@router.post("/{instanceId}/projects", response_model=Projekt)
@limiter.limit("30/minute")
async def create_project(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Projekt:
    """Create a new project in the given feature instance."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    # Stamp ownership fields only when the caller did not supply them.
    data.setdefault("mandateId", mandateId)
    data.setdefault("featureInstanceId", instanceId)
    try:
        projekt = Projekt(**data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid data: {str(e)}")
    return interface.createProjekt(projekt)
@router.put("/{instanceId}/projects/{projectId}", response_model=Projekt)
@limiter.limit("30/minute")
async def update_project(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    projectId: str = Path(..., description="Project ID"),
    data: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Projekt:
    """Update an existing project with the supplied field values."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    existing = interface.getProjekt(projectId)
    # 404 both when missing and when the record belongs to another instance.
    if not existing or str(getattr(existing, "featureInstanceId", None)) != instanceId:
        raise HTTPException(status_code=404, detail=f"Project '{projectId}' not found")
    updated = interface.updateProjekt(projectId, data)
    if updated:
        return updated
    raise HTTPException(status_code=500, detail="Update failed")
@router.delete("/{instanceId}/projects/{projectId}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("30/minute")
async def delete_project(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    projectId: str = Path(..., description="Project ID"),
    context: RequestContext = Depends(getRequestContext)
) -> None:
    """Delete a project after verifying it belongs to this instance."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    target = interface.getProjekt(projectId)
    if not target or str(getattr(target, "featureInstanceId", None)) != instanceId:
        raise HTTPException(status_code=404, detail=f"Project '{projectId}' not found")
    deleted = interface.deleteProjekt(projectId)
    if not deleted:
        raise HTTPException(status_code=500, detail="Delete failed")
# ----- Parcels CRUD -----
@router.get("/{instanceId}/parcels", response_model=PaginatedResponse[Parzelle])
@limiter.limit("30/minute")
async def get_parcels(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Parzelle]:
    """Get all parcels for a feature instance with optional pagination.

    Sorting and paging happen in-memory on the full result set. None values
    in a sort field sort after real values (ascending), matching the legacy
    /table/{table} route.
    """
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    items = interface.getParzellen(recordFilter={"featureInstanceId": instanceId})
    paginationParams = _parsePagination(pagination)
    if not paginationParams:
        return PaginatedResponse(items=items, pagination=None)
    if paginationParams.sort:
        # Apply sort fields in reverse so the first field has highest priority
        # (list.sort is stable). The tuple key keeps None values comparable:
        # a bare getattr(...) key raises TypeError as soon as any record has
        # None in the sort field (fixed; consistent with the legacy route).
        for sort_field in reversed(paginationParams.sort):
            descending = sort_field.direction.lower() == "desc"

            def sort_key(item, _field=sort_field.field):
                value = getattr(item, _field, None)
                if value is None:
                    return (1, None)
                return (0, value)

            items.sort(key=sort_key, reverse=descending)
    total_items = len(items)
    # Ceiling division for the page count.
    total_pages = (total_items + paginationParams.pageSize - 1) // paginationParams.pageSize
    start_idx = (paginationParams.page - 1) * paginationParams.pageSize
    paginated_items = items[start_idx:start_idx + paginationParams.pageSize]
    return PaginatedResponse(
        items=paginated_items,
        pagination=PaginationMetadata(
            currentPage=paginationParams.page,
            pageSize=paginationParams.pageSize,
            totalItems=total_items,
            totalPages=total_pages,
            sort=paginationParams.sort or [],
            filters=paginationParams.filters
        )
    )
@router.get("/{instanceId}/parcels/{parcelId}", response_model=Parzelle)
@limiter.limit("30/minute")
async def get_parcel_by_id(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    parcelId: str = Path(..., description="Parcel ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Parzelle:
    """Get a single parcel by ID."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    parzelle = interface.getParzelle(parcelId)
    # 404 both when missing and when the record belongs to another instance.
    if not parzelle or str(getattr(parzelle, "featureInstanceId", None)) != instanceId:
        raise HTTPException(status_code=404, detail=f"Parcel '{parcelId}' not found")
    return parzelle
@router.post("/{instanceId}/parcels", response_model=Parzelle)
@limiter.limit("30/minute")
async def create_parcel(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    data: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Parzelle:
    """Create a new parcel in the given feature instance."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    # Stamp ownership fields only when the caller did not supply them.
    data.setdefault("mandateId", mandateId)
    data.setdefault("featureInstanceId", instanceId)
    try:
        parzelle = Parzelle(**data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid data: {str(e)}")
    return interface.createParzelle(parzelle)
@router.put("/{instanceId}/parcels/{parcelId}", response_model=Parzelle)
@limiter.limit("30/minute")
async def update_parcel(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    parcelId: str = Path(..., description="Parcel ID"),
    data: Dict[str, Any] = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Parzelle:
    """Update an existing parcel with the supplied field values."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    existing = interface.getParzelle(parcelId)
    # 404 both when missing and when the record belongs to another instance.
    if not existing or str(getattr(existing, "featureInstanceId", None)) != instanceId:
        raise HTTPException(status_code=404, detail=f"Parcel '{parcelId}' not found")
    updated = interface.updateParzelle(parcelId, data)
    if updated:
        return updated
    raise HTTPException(status_code=500, detail="Update failed")
@router.delete("/{instanceId}/parcels/{parcelId}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("30/minute")
async def delete_parcel(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    parcelId: str = Path(..., description="Parcel ID"),
    context: RequestContext = Depends(getRequestContext)
) -> None:
    """Delete a parcel after verifying it belongs to this instance."""
    mandateId = await _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    target = interface.getParzelle(parcelId)
    if not target or str(getattr(target, "featureInstanceId", None)) != instanceId:
        raise HTTPException(status_code=404, detail=f"Parcel '{parcelId}' not found")
    deleted = interface.deleteParzelle(parcelId)
    if not deleted:
        raise HTTPException(status_code=500, detail="Delete failed")
# ============================================================================
# LEGACY / STATELESS ROUTES (unchanged)
# ============================================================================
@router.post("/command", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def process_command(
    request: Request,
    userInput: str = Body(..., embed=True, description="Natural language command"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Process natural language command and execute corresponding CRUD operation.
    Uses AI to analyze user intent and extract parameters, then executes the appropriate
    CRUD operation. Works stateless without session management.
    Example user inputs:
    - "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
    - "Zeige mir alle Projekte in Zürich"
    - "Aktualisiere Projekt XYZ mit Status 'Planung'"
    - "Lösche Parzelle ABC"
    - "SELECT * FROM Projekt WHERE plz = '8000'"
    Headers:
    - X-CSRF-Token: CSRF token (required for security)
    Returns:
    {
        "success": true,
        "intent": "CREATE|READ|UPDATE|DELETE|QUERY",
        "entity": "Projekt|Parzelle|...|null",
        "result": {...}
    }
    Raises:
    - 403 when the CSRF token is missing or malformed
    - 400 on validation errors from command processing
    - 500 on unexpected processing errors
    """
    try:
        # Validate CSRF token (middleware also checks, but explicit validation for better error messages)
        csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
        if not csrf_token:
            logger.warning(f"CSRF token missing for POST /api/realestate/command from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing. Please include X-CSRF-Token header."
            )
        # Basic CSRF token format validation
        if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
            logger.warning(f"Invalid CSRF token format for POST /api/realestate/command from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        # Strict hex validation: int(token, 16) would also accept sign
        # characters, surrounding whitespace, '0x' prefixes and digit
        # underscores, so a regex match is used instead.
        if not re.fullmatch(r"[0-9a-fA-F]+", csrf_token):
            logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/command from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        logger.info(f"Processing command request from user {currentUser.id} (mandate: {currentUser.mandateId})")
        logger.debug(f"User input: {userInput}")
        # Process natural language command with AI
        result = await processNaturalLanguageCommand(
            currentUser=currentUser,
            userInput=userInput
        )
        return result
    except ValueError as e:
        logger.error(f"Validation error in process_command: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Validation error: {str(e)}"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error processing command: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error processing command: {str(e)}"
        )
@router.get("/tables", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def get_available_tables(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Get all available real estate tables.
    Returns a list of available table names with their descriptions.
    Headers:
    - X-CSRF-Token: CSRF token (required for security)
    Example:
    - GET /api/realestate/tables
    Raises:
    - 403 when the CSRF token is missing or malformed
    """
    try:
        # Validate CSRF token if provided
        csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
        if not csrf_token:
            logger.warning(f"CSRF token missing for GET /api/realestate/tables from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing. Please include X-CSRF-Token header."
            )
        # Basic CSRF token format validation
        if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
            logger.warning(f"Invalid CSRF token format for GET /api/realestate/tables from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        # Strict hex validation: int(token, 16) would also accept sign
        # characters, surrounding whitespace, '0x' prefixes and digit
        # underscores, so a regex match is used instead.
        if not re.fullmatch(r"[0-9a-fA-F]+", csrf_token):
            logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/tables from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        logger.info(f"Getting available tables for user {currentUser.id} (mandate: {currentUser.mandateId})")
        # Static catalogue of the tables exposed by the /table/{table} routes.
        tables = [
            {
                "name": "Projekt",
                "description": "Real estate projects",
                "model": "Projekt"
            },
            {
                "name": "Parzelle",
                "description": "Plots/parcels",
                "model": "Parzelle"
            },
            {
                "name": "Dokument",
                "description": "Documents",
                "model": "Dokument"
            },
            {
                "name": "Gemeinde",
                "description": "Municipalities",
                "model": "Gemeinde"
            },
            {
                "name": "Kanton",
                "description": "Cantons",
                "model": "Kanton"
            },
            {
                "name": "Land",
                "description": "Countries",
                "model": "Land"
            },
        ]
        return {
            "tables": tables,
            "count": len(tables)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting available tables: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting available tables: {str(e)}"
        )
@router.get("/table/{table}", response_model=PaginatedResponse[Any])
@limiter.limit("120/minute")
async def get_table_data(
    request: Request,
    table: str = Path(..., description="Table name (Projekt, Parzelle, Dokument, Gemeinde, Kanton, Land)"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    currentUser: User = Depends(getCurrentUser)
) -> PaginatedResponse[Dict[str, Any]]:
    """
    Get all data from a specific real estate table with optional pagination.
    Available tables:
    - Projekt: Real estate projects
    - Parzelle: Plots/parcels
    - Dokument: Documents
    - Gemeinde: Municipalities
    - Kanton: Cantons
    - Land: Countries
    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination
    Headers:
    - X-CSRF-Token: CSRF token (required for security)
    Examples:
    - GET /api/realestate/table/Projekt (no pagination - returns all items)
    - GET /api/realestate/table/Parzelle?pagination={"page":1,"pageSize":10,"sort":[]}
    - GET /api/realestate/table/Gemeinde?pagination={"page":2,"pageSize":20,"sort":[{"field":"label","direction":"asc"}]}
    Raises:
    - 403 when the CSRF token is missing or malformed
    - 400 on an unknown table name or invalid pagination JSON
    """
    try:
        # Validate CSRF token if provided
        csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
        if not csrf_token:
            logger.warning(f"CSRF token missing for GET /api/realestate/table/{table} from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing. Please include X-CSRF-Token header."
            )
        # Basic CSRF token format validation
        if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
            logger.warning(f"Invalid CSRF token format for GET /api/realestate/table/{table} from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        # Strict hex validation: int(token, 16) would also accept sign
        # characters, surrounding whitespace, '0x' prefixes and digit
        # underscores, so a regex match is used instead.
        if not re.fullmatch(r"[0-9a-fA-F]+", csrf_token):
            logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/table/{table} from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        logger.info(f"Getting table data for '{table}' from user {currentUser.id} (mandate: {currentUser.mandateId})")
        # Map table names to model classes and getter methods
        table_mapping = {
            "Projekt": (Projekt, "getProjekte"),
            "Parzelle": (Parzelle, "getParzellen"),
            "Dokument": (Dokument, "getDokumente"),
            "Gemeinde": (Gemeinde, "getGemeinden"),
            "Kanton": (Kanton, "getKantone"),
            "Land": (Land, "getLaender"),
        }
        # Validate table name
        if table not in table_mapping:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid table name '{table}'. Available tables: {', '.join(table_mapping.keys())}"
            )
        # Get interface and fetch data
        realEstateInterface = getRealEstateInterface(currentUser)
        model_class, method_name = table_mapping[table]
        getter_method = getattr(realEstateInterface, method_name)
        # Fetch all records (no filter for now)
        records = getter_method(recordFilter=None)
        # Keep records as model instances (like routeDataFiles does with FileItem)
        # FastAPI will automatically serialize Pydantic models to JSON
        items = records
        # Parse pagination parameter
        # NOTE(review): unlike _parsePagination, this legacy route does not
        # run normalize_pagination_dict on the decoded dict — confirm whether
        # legacy clients rely on the raw field names before unifying.
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                paginationParams = PaginationParams(**paginationDict) if paginationDict else None
            except (json.JSONDecodeError, ValueError) as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )
        # Apply pagination if requested
        if paginationParams:
            # Apply sorting if specified
            if paginationParams.sort:
                for sort_field in reversed(paginationParams.sort):  # Reverse to apply in priority order
                    field_name = sort_field.field
                    direction = sort_field.direction.lower()

                    def sort_key(item, _field=field_name):
                        # Access attribute from model instance
                        value = getattr(item, _field, None)
                        # Handle None values - put them at the end for asc, at the start for desc
                        if value is None:
                            return (1, None)  # Use tuple to ensure None values sort consistently
                        return (0, value)

                    items.sort(key=sort_key, reverse=(direction == "desc"))
            # Apply pagination
            total_items = len(items)
            total_pages = (total_items + paginationParams.pageSize - 1) // paginationParams.pageSize  # Ceiling division
            start_idx = (paginationParams.page - 1) * paginationParams.pageSize
            end_idx = start_idx + paginationParams.pageSize
            paginated_items = items[start_idx:end_idx]
            return PaginatedResponse(
                items=paginated_items,
                pagination=PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=total_items,
                    totalPages=total_pages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters
                )
            )
        else:
            # No pagination - return all items (as model instances, like routeDataFiles)
            return PaginatedResponse(
                items=items,
                pagination=None
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting table data for '{table}': {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting table data: {str(e)}"
        )
@router.post("/table/{table}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def create_table_record(
    request: Request,
    table: str = Path(..., description="Table name (Projekt, Parzelle, Dokument, Gemeinde, Kanton, Land)"),
    data: Dict[str, Any] = Body(..., description="Record data to create"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Create a new record in a specific real estate table.
    Available tables:
    - Projekt: Real estate projects (with parcel data support)
    - Parzelle: Plots/parcels
    - Dokument: Documents
    - Gemeinde: Municipalities
    - Kanton: Cantons
    - Land: Countries
    Request Body:
    For Projekt:
    {
        "label": "Projekt Bezeichnung",
        "statusProzess": "Eingang", // Optional
        "parzelle": {
            "id": "OE5913",
            "egrid": "CH252699779137",
            "perimeter": {...},
            "geometry": {...}, // Used for baulinie
            ...
        }
    }
    For other tables:
    - JSON object with fields matching the table's data model
    Headers:
    - X-CSRF-Token: CSRF token (required for security)
    Examples:
    - POST /api/realestate/table/Projekt
      Body: {"label": "Hauptstrasse 42", "parzelle": {...}}
    - POST /api/realestate/table/Parzelle
      Body: {"label": "Parzelle 1", "strasseNr": "Hauptstrasse 42", "plz": "8000", "bauzone": "W3"}
    Raises:
    - 403 when the CSRF token is missing or malformed
    - 400 on an unknown table name or invalid record data
    """
    try:
        # Validate CSRF token
        csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
        if not csrf_token:
            logger.warning(f"CSRF token missing for POST /api/realestate/table/{table} from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing. Please include X-CSRF-Token header."
            )
        # Basic CSRF token format validation
        if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
            logger.warning(f"Invalid CSRF token format for POST /api/realestate/table/{table} from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        # Strict hex validation: int(token, 16) would also accept sign
        # characters, surrounding whitespace, '0x' prefixes and digit
        # underscores, so a regex match is used instead.
        if not re.fullmatch(r"[0-9a-fA-F]+", csrf_token):
            logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/table/{table} from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        # Special handling for Projekt with parcel data
        if table == "Projekt" and ("parzelle" in data or "parzellen" in data):
            logger.info(f"Creating Projekt with parcel data for user {currentUser.id} (mandate: {currentUser.mandateId})")
            # Extract fields
            label = data.get("label")
            if not label:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="label is required"
                )
            status_prozess = data.get("statusProzess", "Eingang")
            # Support both single parzelle and multiple parzellen
            parzellen_data = []
            if "parzellen" in data:
                # Multiple parcels
                parzellen_data = data.get("parzellen", [])
                if not isinstance(parzellen_data, list):
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail="parzellen must be an array"
                    )
            elif "parzelle" in data:
                # Single parcel (backward compatibility)
                parzelle_data = data.get("parzelle")
                if parzelle_data:
                    parzellen_data = [parzelle_data]
            if not parzellen_data:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="parzelle or parzellen data is required"
                )
            # Use helper function to create project with parcel data
            try:
                result = await create_project_with_parcel_data(
                    currentUser=currentUser,
                    projekt_label=label,
                    parzellen_data=parzellen_data,
                    status_prozess=status_prozess,
                )
                # Return in format expected by frontend (single record, not nested)
                return result.get("projekt", {})
            except HTTPException:
                # Re-raise HTTPExceptions directly
                raise
            except Exception as e:
                logger.error(f"Error creating Projekt with parcel data: {str(e)}", exc_info=True)
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Error creating Projekt: {str(e)}"
                )
        # Standard handling for other tables or Projekt without parcel data
        logger.info(f"Creating record in table '{table}' for user {currentUser.id} (mandate: {currentUser.mandateId})")
        logger.debug(f"Record data: {data}")
        # Map table names to model classes and create methods
        table_mapping = {
            "Projekt": (Projekt, "createProjekt"),
            "Parzelle": (Parzelle, "createParzelle"),
            "Dokument": (Dokument, "createDokument"),
            "Gemeinde": (Gemeinde, "createGemeinde"),
            "Kanton": (Kanton, "createKanton"),
            "Land": (Land, "createLand"),
        }
        # Validate table name
        if table not in table_mapping:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid table name '{table}'. Available tables: {', '.join(table_mapping.keys())}"
            )
        # Get interface
        realEstateInterface = getRealEstateInterface(currentUser)
        model_class, method_name = table_mapping[table]
        create_method = getattr(realEstateInterface, method_name)
        # Ensure mandateId is set (will be set by interface if missing)
        if "mandateId" not in data:
            data["mandateId"] = currentUser.mandateId
        # Create model instance from data
        try:
            model_instance = model_class(**data)
        except Exception as e:
            logger.error(f"Error creating {table} model instance: {str(e)}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid data for {table}: {str(e)}"
            )
        # Create record
        try:
            created_record = create_method(model_instance)
            # Convert to dictionary for response
            if hasattr(created_record, 'model_dump'):
                return created_record.model_dump()
            else:
                return created_record
        except Exception as e:
            logger.error(f"Error creating {table} record: {str(e)}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error creating {table} record: {str(e)}"
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating record in table '{table}': {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error creating record: {str(e)}"
        )
@router.get("/parcel/search", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def search_parcel(
request: Request,
location: str = Query(..., description="Either coordinates as 'x,y' (LV95) or address string"),
include_adjacent: bool = Query(False, description="Include adjacent parcels information"),
fetch_documents: bool = Query(True, description="If true, fetch BZO documents for the Gemeinde (default: true)"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Search for parcel information by address or coordinates.
Returns comprehensive parcel information including:
- Parcel identification (number, EGRID, etc.)
- Precise boundary geometry for map display
- Administrative context (canton, municipality)
- Link to official cadastral map
- Optional: Adjacent parcels
- Optional: Gemeinde information and BZO documents (if fetch_documents=true)
Query Parameters:
- location: Either coordinates as "x,y" (LV95/EPSG:2056) or address string
- include_adjacent: If true, fetches information about adjacent parcels (slower)
- fetch_documents: If true, checks for and fetches Bauzonenverordnung (BZO) documents for the Gemeinde (default: true, slower)
Headers:
- X-CSRF-Token: CSRF token (required for security)
Examples:
- GET /api/realestate/parcel/search?location=2600000,1200000
- GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern
- GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern&include_adjacent=true
- GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern&fetch_documents=true
Returns:
{
"parcel": {
"id": "823",
"egrid": "CH294676423526",
"number": "823",
"name": "823",
"identnd": "BE0200000042",
"canton": "BE",
"municipality_code": 351,
"municipality_name": "Bern",
"address": "Bundesplatz 3 3011 Bern",
"plz": "3011",
"perimeter": {...},
"area_m2": 1234.56,
"centroid": {"x": 2600000, "y": 1200000},
"geoportal_url": "https://...",
"realestate_type": null,
"bauzone": "W3"
},
"map_view": {
"center": {"x": 2600000, "y": 1200000},
"zoom_bounds": {"min_x": ..., "max_x": ..., "min_y": ..., "max_y": ...},
"geometry_geojson": {...}
},
"adjacent_parcels": [...], // Optional (only if include_adjacent=true)
"gemeinde": { // Optional (only if fetch_documents=true)
"id": "...",
"label": "Bern",
"plz": "3011"
},
"documents": [ // Optional (only if fetch_documents=true and documents found/created)
{
"id": "...",
"label": "BZO Bern",
"dokumentTyp": "gemeindeBzoAktuell",
"dokumentReferenz": "...",
"quelle": "https://...",
"mimeType": "application/pdf"
}
]
}
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for GET /api/realestate/parcel/search from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
logger.info(f"Searching parcel for user {currentUser.id} (mandate: {currentUser.mandateId}) with location: {location}")
# Initialize connector
connector = SwissTopoMapServerConnector()
# Search for parcel
parcel_data = await connector.search_parcel(location)
if not parcel_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No parcel found for location: {location}"
)
# Extract and normalize attributes
extracted_attributes = connector.extract_parcel_attributes(parcel_data)
attributes = parcel_data.get("attributes", {})
geometry = parcel_data.get("geometry", {})
# Calculate parcel area from perimeter
area_m2 = None
centroid = None
if extracted_attributes.get("perimeter"):
perimeter = extracted_attributes["perimeter"]
points = perimeter.get("punkte", [])
# Calculate area using shoelace formula
if len(points) >= 3:
area = 0
for i in range(len(points)):
j = (i + 1) % len(points)
area += points[i]["x"] * points[j]["y"]
area -= points[j]["x"] * points[i]["y"]
area_m2 = abs(area / 2)
# Calculate centroid
sum_x = sum(p["x"] for p in points)
sum_y = sum(p["y"] for p in points)
centroid = {
"x": sum_x / len(points),
"y": sum_y / len(points)
}
# Extract municipality name and address from Swiss Topo data
municipality_name = None
full_address = None
plz = None
canton = attributes.get("ak") # Extract canton early so it's always available
# Debug: Log all available attributes to understand what we have
logger.debug(f"Parcel attributes keys: {list(attributes.keys())}")
logger.debug(f"Sample parcel attributes: {dict(list(attributes.items())[:10])}") # First 10 items
# First, check if municipality is directly in parcel attributes (ggdename or dplzname)
# These fields are often present in the parcel data itself from Swiss Topo
municipality_from_attrs = attributes.get("ggdename") or attributes.get("dplzname") or attributes.get("gemeinde") or attributes.get("gemeindename")
if municipality_from_attrs:
# Use connector's cleaning method to remove canton suffix
municipality_name = connector._clean_municipality_name(str(municipality_from_attrs))
logger.info(f"Found municipality '{municipality_name}' in parcel attributes (from {municipality_from_attrs})")
# Also check extracted_attributes for municipality
if not municipality_name:
municipality_from_extracted = extracted_attributes.get("kontextGemeinde")
if municipality_from_extracted:
municipality_name = str(municipality_from_extracted)
logger.info(f"Found municipality '{municipality_name}' in extracted attributes")
# Also check for PLZ in parcel attributes
if not plz:
plz_from_attrs = attributes.get("dplz4") or attributes.get("plz")
if plz_from_attrs:
plz = str(plz_from_attrs).strip()
logger.debug(f"Found PLZ '{plz}' in parcel attributes")
# Try to use geocoded address info if available (more accurate than centroid query)
geocoded_address = parcel_data.get('geocoded_address')
if geocoded_address:
if not full_address:
full_address = geocoded_address.get('full_address')
if not plz:
plz = geocoded_address.get('plz')
if not municipality_name:
geocoded_municipality = geocoded_address.get('municipality')
if geocoded_municipality:
municipality_name = connector._clean_municipality_name(geocoded_municipality)
logger.debug(f"Found municipality '{municipality_name}' from geocoded address")
if full_address:
logger.debug(f"Using geocoded address: {full_address}")
# If geocoded address not available, try to get address by querying the address layer
# Use query coordinates (where user clicked/geocoded) instead of parcel centroid
# This ensures we get the address at the exact location, not at the parcel center
query_coords = parcel_data.get('query_coordinates')
address_query_coords = query_coords if query_coords else centroid
if not full_address and address_query_coords:
query_x = address_query_coords['x']
query_y = address_query_coords['y']
logger.debug(f"Querying address layer at query coordinates: ({query_x}, {query_y})")
# Check if this was a coordinate search (not geocoded address)
is_coordinate_search = ',' in location and not any(c.isalpha() for c in location.split(',')[0])
# Use connector's helper method to query building layer
# Use tolerance=1 (minimum) for coordinate searches to get exact building
building_tolerance = 1 if is_coordinate_search else 10
building_result = await connector._query_building_layer(query_x, query_y, tolerance=building_tolerance, buffer=25)
if building_result:
addr_attrs = building_result.get("attributes", {})
logger.debug(f"Address layer attributes: {addr_attrs}")
# Extract address using connector's helper method
address_info = connector._extract_address_from_building_attrs(addr_attrs)
if not full_address:
full_address = address_info.get('full_address')
if not plz:
plz = address_info.get('plz')
if not municipality_name:
municipality_name = address_info.get('municipality')
if municipality_name:
logger.debug(f"Found municipality '{municipality_name}' from building layer")
if full_address:
logger.debug(f"Constructed address: {full_address}")
# If address not found via building layer, try to construct from available data
if not full_address:
# Check if location was provided as an address string
if location and any(c.isalpha() for c in location) and "CH" not in location:
# Location looks like an address (not an EGRID)
full_address = location
logger.debug(f"Using location as address: {full_address}")
# Try to extract municipality name from address string if not found yet
if not municipality_name and full_address:
# Parse address string to extract municipality name
# Format is usually: "Street Number, PLZ Municipality" or "Street Number PLZ Municipality"
# Examples: "Forchstrasse 6c, 8610 Uster" or "Bundesplatz 3 3011 Bern"
# Try to match PLZ followed by municipality name
# PLZ is typically 4 digits, municipality name follows
plz_municipality_match = re.search(r'\b(\d{4})\s+([A-ZÄÖÜ][a-zäöüß\s-]+)', full_address)
if plz_municipality_match:
extracted_plz = plz_municipality_match.group(1)
extracted_municipality = plz_municipality_match.group(2).strip()
# Remove trailing commas or other punctuation
extracted_municipality = re.sub(r'[,;\.]+$', '', extracted_municipality).strip()
if extracted_municipality:
municipality_name = extracted_municipality
if not plz:
plz = extracted_plz
logger.debug(f"Extracted municipality '{municipality_name}' and PLZ '{plz}' from address string")
# Try to extract municipality name from BFSNR if still not found
if not municipality_name:
bfsnr = attributes.get("bfsnr")
logger.info(f"Attempting to resolve municipality name for BFS number {bfsnr} in canton {canton}")
# Try to query database for Gemeinde by BFS number
if bfsnr and canton:
try:
realEstateInterface = getRealEstateInterface(currentUser)
# Query Gemeinde by BFS number (stored in kontextInformationen)
gemeinden = realEstateInterface.getGemeinden(
recordFilter={"mandateId": currentUser.mandateId}
)
logger.debug(f"Found {len(gemeinden)} Gemeinden in database, searching for BFS {bfsnr}")
for gemeinde in gemeinden:
# Check kontextInformationen for BFS number
for kontext in gemeinde.kontextInformationen:
try:
kontext_data = json.loads(kontext.inhalt) if isinstance(kontext.inhalt, str) else kontext.inhalt
if isinstance(kontext_data, dict):
kontext_bfsnr = kontext_data.get("bfs_nummer") or kontext_data.get("bfsnr") or kontext_data.get("municipality_code")
if str(kontext_bfsnr) == str(bfsnr):
municipality_name = gemeinde.label
logger.info(f"Found Gemeinde '{municipality_name}' by BFS number {bfsnr} in database")
break
except (json.JSONDecodeError, AttributeError) as e:
logger.debug(f"Error parsing kontext: {e}")
continue
if municipality_name:
break
except Exception as e:
logger.warning(f"Error querying Gemeinde by BFS number: {e}", exc_info=True)
# If still not found, try to use Swiss Topo geocoding API to get municipality name from coordinates
# This is more reliable than BFS number lookup since coordinates are exact
if not municipality_name and centroid:
try:
# Use Swiss Topo geocoding to get municipality name from coordinates
geocode_url = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify"
params = {
"geometry": f"{centroid['x']},{centroid['y']}",
"geometryType": "esriGeometryPoint",
"layers": "all:ch.swisstopo.swissboundaries3d-gemeinde-flaeche.fill",
"tolerance": "0",
"returnGeometry": "false",
"sr": "2056"
}
import aiohttp
import ssl
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector_aiohttp = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector_aiohttp) as session:
async with session.get(geocode_url, params=params) as resp:
if resp.status == 200:
data = await resp.json()
results = data.get("results", [])
if results:
result_attrs = results[0].get("attributes", {})
geocoded_municipality = result_attrs.get("name") or result_attrs.get("gemeindename") or result_attrs.get("label")
if geocoded_municipality:
municipality_name = connector._clean_municipality_name(str(geocoded_municipality))
logger.info(f"Found municipality '{municipality_name}' via Swiss Topo geocoding API (from {geocoded_municipality})")
except Exception as e:
logger.debug(f"Error querying Swiss Topo geocoding API: {e}", exc_info=True)
# If still not found, try expanded Swiss municipalities lookup
if not municipality_name and bfsnr:
# Expanded Swiss municipalities lookup by BFS number
# Source: https://www.bfs.admin.ch/bfs/de/home/grundlagen/agvch.html
common_municipalities = {
# Zürich (ZH)
261: "Zürich",
198: "Pfäffikon", # ZH-198 is Pfäffikon
191: "Uster", # Uster is ZH-191
3203: "Winterthur",
# Bern (BE)
351: "Bern",
# Basel (BS)
2701: "Basel",
# Genève (GE)
6621: "Genève",
# Vaud (VD)
5586: "Lausanne",
# Luzern (LU)
1061: "Luzern",
# St. Gallen (SG)
230: "St. Gallen",
# Ticino (TI)
5192: "Lugano",
# Schwyz (SZ)
1367: "Schwyz",
}
if bfsnr in common_municipalities:
municipality_name = common_municipalities[bfsnr]
logger.info(f"Looked up municipality '{municipality_name}' from common list for BFS {bfsnr}")
# If still not found, log warning
if not municipality_name:
logger.warning(f"Could not determine municipality name for BFS number {bfsnr} in canton {canton}. Municipality name will be None.")
# Final validation: Don't use EGRID as address
if full_address and full_address.startswith("CH") and len(full_address) == 14 and full_address[2:].isdigit():
# This is an EGRID, not an address
full_address = None
logger.debug("Removed EGRID from address field")
# Query zone information (wohnzone/bauzone) from ÖREB WFS
bauzone = None
# Check if geometry has actual data (either rings or coordinates)
has_geometry = geometry and (geometry.get("rings") or geometry.get("coordinates"))
if canton and has_geometry:
try:
logger.debug(f"Querying zone information for parcel {attributes.get('label')} in canton {canton}")
oereb_connector = OerebWfsConnector()
egrid = attributes.get("egris_egrid", "")
x = centroid["x"] if centroid else None
y = centroid["y"] if centroid else None
# Query zone layer using parcel geometry
zone_results = await oereb_connector.query_zone_layer(
egrid=egrid,
x=x or 0.0,
y=y or 0.0,
canton=canton,
geometry=geometry
)
if zone_results and len(zone_results) > 0:
zone_attrs = zone_results[0].get("attributes", {})
typ_gde_abkuerzung = zone_attrs.get("typ_gde_abkuerzung")
if typ_gde_abkuerzung:
bauzone = typ_gde_abkuerzung
logger.info(f"Found bauzone/wohnzone: {bauzone} for parcel {attributes.get('label')}")
else:
logger.debug(f"No typ_gde_abkuerzung found in zone results for parcel {attributes.get('label')}")
else:
logger.debug(f"No zone results found for parcel {attributes.get('label')}")
except Exception as e:
logger.warning(f"Error querying zone information: {e}", exc_info=True)
# Continue without zone information if query fails
# Build parcel info
parcel_info = {
"id": attributes.get("label") or attributes.get("number"),
"egrid": attributes.get("egris_egrid"),
"number": attributes.get("number"),
"name": attributes.get("name"),
"identnd": attributes.get("identnd"),
"canton": attributes.get("ak"),
"municipality_code": attributes.get("bfsnr"),
"municipality_name": municipality_name,
"address": full_address,
"plz": plz,
"perimeter": extracted_attributes.get("perimeter"),
"area_m2": area_m2,
"centroid": centroid,
"geoportal_url": attributes.get("geoportal_url"),
"realestate_type": attributes.get("realestate_type"),
"bauzone": bauzone
}
# Build map view info
bbox = parcel_data.get("bbox", [])
map_view = {
"center": centroid,
"zoom_bounds": {
"min_x": bbox[0] if len(bbox) >= 4 else None,
"min_y": bbox[1] if len(bbox) >= 4 else None,
"max_x": bbox[2] if len(bbox) >= 4 else None,
"max_y": bbox[3] if len(bbox) >= 4 else None
},
"geometry_geojson": {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[[p["x"], p["y"]] for p in extracted_attributes["perimeter"]["punkte"]]
] if extracted_attributes.get("perimeter") else []
},
"properties": {
"id": parcel_info["id"],
"egrid": parcel_info["egrid"],
"number": parcel_info["number"]
}
}
}
# Build response
response_data = {
"parcel": parcel_info,
"map_view": map_view
}
# Fetch adjacent parcels if requested
if include_adjacent and parcel_data and parcel_data.get("geometry"):
try:
# Use the connector's method to find neighboring parcels by sampling along the boundary
# This ensures we find all parcels that actually touch the selected parcel
selected_parcel_id = parcel_info["id"]
adjacent_parcels_raw = await connector.find_neighboring_parcels(
parcel_data=parcel_data,
selected_parcel_id=selected_parcel_id,
sample_distance=20.0, # Sample every 20 meters (balanced for coverage and speed)
max_sample_points=30, # Allow up to 30 points to ensure all vertices are covered
max_neighbors=15, # Find up to 15 neighbors
max_concurrent=50 # Process up to 50 queries concurrently (maximum parallelization)
)
# Convert adjacent parcels to include GeoJSON geometry (optimized, minimal logging)
def convert_parcel_geometry(adj_parcel: Dict[str, Any]) -> Dict[str, Any]:
"""Convert a single adjacent parcel to include GeoJSON geometry."""
adj_parcel_with_geo = {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number"),
"perimeter": adj_parcel.get("perimeter")
}
# Convert geometry to GeoJSON format if available
adj_geometry = adj_parcel.get("geometry")
adj_perimeter = adj_parcel.get("perimeter")
if adj_geometry:
# Handle ESRI format (rings)
if "rings" in adj_geometry and adj_geometry["rings"]:
ring = adj_geometry["rings"][0] # Outer ring
coordinates = [[[p[0], p[1]] for p in ring]]
adj_parcel_with_geo["geometry_geojson"] = {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": coordinates
},
"properties": {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number")
}
}
# Handle GeoJSON format
elif adj_geometry.get("type") == "Polygon":
adj_parcel_with_geo["geometry_geojson"] = {
"type": "Feature",
"geometry": adj_geometry,
"properties": {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number")
}
}
# If no geometry_geojson was created but we have perimeter, create it from perimeter
if "geometry_geojson" not in adj_parcel_with_geo and adj_perimeter and adj_perimeter.get("punkte"):
punkte = adj_perimeter["punkte"]
coordinates = [[[p["x"], p["y"]] for p in punkte]]
adj_parcel_with_geo["geometry_geojson"] = {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": coordinates
},
"properties": {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number")
}
}
return adj_parcel_with_geo
# Convert all parcels in parallel (using list comprehension for speed)
adjacent_parcels = [convert_parcel_geometry(adj_parcel) for adj_parcel in adjacent_parcels_raw]
response_data["adjacent_parcels"] = adjacent_parcels
logger.info(f"Found {len(adjacent_parcels)} neighboring parcels for parcel {selected_parcel_id}")
except Exception as e:
logger.warning(f"Error fetching adjacent parcels: {e}", exc_info=True)
response_data["adjacent_parcels"] = []
# Fetch BZO documents if requested
gemeinde_info = None
bzo_documents = []
logger.debug(f"Document fetch check: fetch_documents={fetch_documents}, municipality_name={municipality_name}, canton={canton}")
if fetch_documents and municipality_name and canton:
logger.info(f"Fetching BZO documents for Gemeinde '{municipality_name}' in canton '{canton}'")
try:
# Get interfaces
realEstateInterface = getRealEstateInterface(currentUser)
componentInterface = getComponentInterface(currentUser)
logger.debug(f"Interfaces initialized for document fetching")
# Resolve or create Gemeinde
gemeinde = None
# First, ensure Land "Schweiz" exists
laender = realEstateInterface.getLaender(recordFilter={"label": "Schweiz"})
if not laender:
land = Land(
mandateId=currentUser.mandateId,
label="Schweiz",
abk="CH"
)
land = realEstateInterface.createLand(land)
logger.debug(f"Created Land 'Schweiz' with ID: {land.id}")
else:
land = laender[0]
# Map canton abbreviations to full names
canton_names = {
"ZH": "Zürich", "BE": "Bern", "LU": "Luzern", "UR": "Uri", "SZ": "Schwyz",
"OW": "Obwalden", "NW": "Nidwalden", "GL": "Glarus", "ZG": "Zug", "FR": "Freiburg",
"SO": "Solothurn", "BS": "Basel-Stadt", "BL": "Basel-Landschaft", "SH": "Schaffhausen",
"AR": "Appenzell Ausserrhoden", "AI": "Appenzell Innerrhoden", "SG": "St. Gallen",
"GR": "Graubünden", "AG": "Aargau", "TG": "Thurgau", "TI": "Tessin",
"VD": "Waadt", "VS": "Wallis", "NE": "Neuenburg", "GE": "Genf", "JU": "Jura"
}
# Get or create Kanton
kantone = realEstateInterface.getKantone(recordFilter={"abk": canton})
if not kantone:
kanton_label = canton_names.get(canton, canton)
kanton_obj = Kanton(
mandateId=currentUser.mandateId,
label=kanton_label,
abk=canton,
id_land=land.id
)
kanton_obj = realEstateInterface.createKanton(kanton_obj)
logger.debug(f"Created Kanton '{kanton_label}' ({canton})")
else:
kanton_obj = kantone[0]
# Get or create Gemeinde
gemeinden = realEstateInterface.getGemeinden(
recordFilter={"label": municipality_name, "id_kanton": kanton_obj.id}
)
if not gemeinden:
gemeinde = Gemeinde(
mandateId=currentUser.mandateId,
label=municipality_name,
id_kanton=kanton_obj.id,
plz=plz
)
gemeinde = realEstateInterface.createGemeinde(gemeinde)
logger.info(f"Created Gemeinde '{municipality_name}'")
else:
gemeinde = gemeinden[0]
logger.debug(f"Found existing Gemeinde '{municipality_name}'")
gemeinde_info = {
"id": gemeinde.id,
"label": gemeinde.label,
"plz": gemeinde.plz
}
# Check if Gemeinde already has BZO documents
existing_bzo = False
logger.debug(f"Checking for existing BZO documents in Gemeinde '{gemeinde.label}' (has {len(gemeinde.dokumente) if gemeinde.dokumente else 0} documents)")
if gemeinde.dokumente:
for doc in gemeinde.dokumente:
if (doc.label and ("BZO" in doc.label.upper() or "BAU UND ZONENORDNUNG" in doc.label.upper() or
"PLAN D'AMÉNAGEMENT" in doc.label.upper() or "RÈGLEMENT DE CONSTRUCTION" in doc.label.upper() or
"PIANO DI UTILIZZAZIONE" in doc.label.upper() or "REGOLAMENTO EDILIZIO" in doc.label.upper())) or \
(doc.dokumentTyp and doc.dokumentTyp in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]):
existing_bzo = True
logger.info(f"Found existing BZO document: {doc.label} (ID: {doc.id})")
bzo_documents.append({
"id": doc.id,
"label": doc.label,
"dokumentTyp": doc.dokumentTyp.value if doc.dokumentTyp else None,
"dokumentReferenz": doc.dokumentReferenz,
"quelle": doc.quelle,
"mimeType": doc.mimeType
})
if existing_bzo:
logger.info(f"Gemeinde '{municipality_name}' already has {len(bzo_documents)} BZO document(s), skipping search")
# If no BZO documents found, search and download
if not existing_bzo:
logger.info(f"No BZO documents found for {municipality_name}, searching with Tavily...")
# Determine language
language = _get_language_from_kanton(canton)
# Generate search query
search_query = _get_bzo_search_query(municipality_name, language)
logger.debug(f"Tavily search query: {search_query}")
# Initialize Tavily connector
tavily = AiTavily()
# Search with Tavily
search_results = await tavily._search(
query=search_query,
maxResults=5,
country="switzerland"
)
if search_results:
# First, check for direct PDF URLs in search results
pdf_urls = []
html_urls = []
for result in search_results:
url = result.url.lower()
# Check if it's a direct PDF link
if url.endswith('.pdf') or '/pdf/' in url or url.endswith('/pdf'):
if not any(skip in url for skip in ['.html', '.htm', '/page/', '/article/', '/news/']):
pdf_urls.append(result.url)
else:
# It's an HTML page - we'll crawl it to find PDF links
html_urls.append(result.url)
# If no direct PDFs found, scrape HTML pages directly to find PDF links
if not pdf_urls and html_urls:
logger.info(f"No direct PDF links found, scraping {len(html_urls)} HTML pages to find PDF documents...")
# Helper function to scrape HTML and find PDF links
async def scrape_html_for_pdfs(url: str) -> List[str]:
"""Scrape an HTML page to find PDF links."""
found_pdfs = []
try:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector_aiohttp = aiohttp.TCPConnector(ssl=ssl_context)
timeout = aiohttp.ClientTimeout(total=15, connect=5)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8'
}
async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector_aiohttp) as session:
async with session.get(url, allow_redirects=True) as response:
if response.status == 200:
# Check Content-Type header first
content_type = response.headers.get('Content-Type', '').lower()
# Read first few bytes to check if it's a PDF
raw_bytes = await response.read()
# Check if it's actually a PDF by magic bytes
if raw_bytes.startswith(b'%PDF'):
found_pdfs.append(url)
logger.info(f"Found direct PDF link (detected by magic bytes): {url}")
return found_pdfs
# If Content-Type says it's a PDF, add it
if 'application/pdf' in content_type:
found_pdfs.append(url)
logger.info(f"Found direct PDF link (Content-Type): {url}")
return found_pdfs
# If URL ends with .pdf, it's likely a PDF
if url.lower().endswith('.pdf'):
found_pdfs.append(url)
logger.info(f"Found direct PDF link (URL extension): {url}")
return found_pdfs
# Try to decode as text for HTML parsing
try:
# Try UTF-8 first
html_content = raw_bytes.decode('utf-8')
except UnicodeDecodeError:
try:
# Try ISO-8859-1 (common for German sites)
html_content = raw_bytes.decode('iso-8859-1')
except UnicodeDecodeError:
try:
# Try Windows-1252
html_content = raw_bytes.decode('windows-1252')
except UnicodeDecodeError:
# If all else fails, skip this URL
logger.warning(f"Could not decode content from {url} (not UTF-8, ISO-8859-1, or Windows-1252), skipping HTML parsing")
return found_pdfs
# Look for PDF links in various formats
# Pattern 1: Direct PDF URLs
pdf_pattern = r'https?://[^\s<>"\'\)]+\.pdf(?:\?[^\s<>"\'\)]*)?'
found = re.findall(pdf_pattern, html_content, re.IGNORECASE)
# Pattern 2: Relative PDF links (convert to absolute)
relative_pattern = r'href=["\']([^"\']+\.pdf[^"\']*)["\']'
relative_found = re.findall(relative_pattern, html_content, re.IGNORECASE)
# Convert relative URLs to absolute
base_url = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
for rel_url in relative_found:
# Remove query params and fragments for cleaner URLs
clean_url = rel_url.split('?')[0].split('#')[0]
if clean_url.endswith('.pdf'):
abs_url = urljoin(base_url, clean_url)
if abs_url not in found:
found.append(abs_url)
# Pattern 3: Look in data attributes and other places
data_pattern = r'data-[^=]*=["\']([^"\']+\.pdf[^"\']*)["\']'
data_found = re.findall(data_pattern, html_content, re.IGNORECASE)
for data_url in data_found:
clean_url = data_url.split('?')[0].split('#')[0]
if clean_url.endswith('.pdf'):
abs_url = urljoin(base_url, clean_url) if not clean_url.startswith('http') else clean_url
if abs_url not in found:
found.append(abs_url)
# Clean and deduplicate URLs
for pdf_link in found:
pdf_link = pdf_link.rstrip('.,;:!?)').strip()
# Remove common tracking parameters
if '?' in pdf_link:
base, params = pdf_link.split('?', 1)
# Keep only important params, remove tracking
important_params = []
for param in params.split('&'):
if param.split('=')[0].lower() not in ['utm_source', 'utm_medium', 'utm_campaign', 'ref', 'fbclid', 'gclid']:
important_params.append(param)
if important_params:
pdf_link = f"{base}?{'&'.join(important_params)}"
else:
pdf_link = base
if pdf_link not in found_pdfs and pdf_link.startswith('http'):
found_pdfs.append(pdf_link)
logger.debug(f"Found PDF link on {url}: {pdf_link}")
logger.info(f"Found {len(found_pdfs)} PDF links on {url}")
except Exception as e:
logger.debug(f"Error scraping {url} for PDFs: {e}", exc_info=True)
return found_pdfs
# Scrape HTML pages to find PDF links
for html_url in html_urls[:5]: # Limit to first 5 URLs
try:
logger.debug(f"Scraping {html_url} to find PDF links...")
found_pdfs = await scrape_html_for_pdfs(html_url)
pdf_urls.extend(found_pdfs)
except Exception as e:
logger.warning(f"Error scraping {html_url} to find PDFs: {e}", exc_info=True)
continue
# Also check rawContent from search results for PDF links
for result in search_results:
if result.rawContent:
pdf_pattern = r'https?://[^\s<>"\'\)]+\.pdf(?:\?[^\s<>"\'\)]*)?'
found_pdfs = re.findall(pdf_pattern, result.rawContent, re.IGNORECASE)
for pdf_link in found_pdfs:
pdf_link = pdf_link.rstrip('.,;:!?)').strip()
if pdf_link not in pdf_urls and pdf_link.startswith('http'):
pdf_urls.append(pdf_link)
logger.debug(f"Found PDF link in rawContent: {pdf_link}")
if not pdf_urls:
logger.warning(f"No PDF URLs found in Tavily results for {municipality_name}. Results were HTML pages, not direct PDF links.")
logger.debug(f"Tavily returned URLs: {[r.url for r in search_results]}")
logger.info(f"Found {len(pdf_urls)} potential PDF documents for {municipality_name}")
# Helper function to download a single PDF
async def download_pdf(pdf_url: str) -> Optional[bytes]:
    """Download a PDF from a URL with retry logic.

    Makes up to ``max_retries`` attempts. The first attempt sends a full
    browser-like header set; later attempts fall back to minimal headers,
    because some servers answer the richer header set with HTTP 406.
    The response is validated to actually be a PDF (Content-Type and
    ``%PDF`` magic-byte checks) before it is returned.

    Args:
        pdf_url: Absolute URL of the PDF to fetch.

    Returns:
        The raw PDF bytes on success.

    Raises:
        Exception: When the server returns HTML instead of a PDF, the
            payload is too small / not a PDF, or the download still fails
            (timeout, connection error, non-200 status) after all retries.
    """
    max_retries = 3
    retry_delay = 2  # seconds between retry attempts
    for attempt in range(max_retries):
        try:
            if attempt > 0:
                # Minimal header set for retries: servers that rejected the
                # richer headers with 406 often accept this one.
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    'Accept': '*/*'
                }
            else:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                    'Accept': 'application/pdf,application/octet-stream,*/*',
                    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
            # SECURITY: certificate verification is disabled here (stated to
            # be for development). This permits man-in-the-middle attacks on
            # the download; re-enable verification before production use.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            connector = aiohttp.TCPConnector(ssl=ssl_context)
            timeout = aiohttp.ClientTimeout(total=30, connect=10)
            async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector) as session:
                async with session.get(pdf_url, allow_redirects=True) as response:
                    if response.status == 200:
                        # Reject obvious non-PDF responses via Content-Type first.
                        content_type = response.headers.get('Content-Type', '').lower()
                        if 'text/html' in content_type or 'text/xml' in content_type:
                            logger.warning(f"URL {pdf_url} returned HTML content (Content-Type: {content_type}), skipping")
                            raise Exception("Server returned HTML content instead of PDF")
                        pdf_content = await response.read()
                        if not pdf_content or len(pdf_content) < 100:
                            raise Exception("Downloaded file is too small or empty")
                        # Verify the PDF magic bytes; some servers return an
                        # HTML error page with a 200 status.
                        if not pdf_content.startswith(b'%PDF'):
                            if pdf_content.startswith(b'<') or pdf_content.startswith(b'<!DOCTYPE'):
                                logger.warning(f"URL {pdf_url} returned HTML content (detected HTML markers), skipping")
                                raise Exception("Server returned HTML instead of PDF")
                            logger.warning(f"Downloaded file from {pdf_url} doesn't appear to be a PDF (no PDF magic bytes), skipping")
                            raise Exception("File doesn't appear to be a valid PDF")
                        return pdf_content
                    elif response.status == 406:
                        # 406 Not Acceptable: retry with the minimal header set.
                        logger.warning(f"HTTP 406 for {pdf_url}, will retry with minimal headers (attempt {attempt + 1}/{max_retries})")
                        if attempt < max_retries - 1:
                            await asyncio.sleep(retry_delay)
                            continue
                        else:
                            raise Exception(f"HTTP {response.status} (Not Acceptable) - server rejected request after {max_retries} attempts")
                    else:
                        raise Exception(f"HTTP {response.status} when downloading PDF")
        except asyncio.TimeoutError:
            logger.warning(f"Timeout downloading PDF from {pdf_url} (attempt {attempt + 1}/{max_retries})")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)
                continue
            else:
                raise Exception("Connection timeout after retries")
        except aiohttp.ClientError as e:
            logger.warning(f"Connection error downloading PDF from {pdf_url} (attempt {attempt + 1}/{max_retries}): {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)
                continue
            else:
                raise Exception(f"Connection error: {str(e)}")
        # NOTE: the original trailing `except Exception as e: raise` was a
        # no-op (bare re-raise with an unused binding) and has been removed;
        # content-validation exceptions still propagate unchanged.
    # Unreachable in practice: the final attempt always returns or raises.
    return None
# Process PDF URLs
current_dokumente = list(gemeinde.dokumente) if gemeinde.dokumente else []
# Sanitize Gemeinde name for filename
safe_name = "".join(c for c in municipality_name if c.isalnum() or c in (' ', '-', '_')).strip()
safe_name = safe_name.replace(' ', '_')
if not safe_name:
safe_name = "Gemeinde"
# Determine base label based on language
if language == 'fr':
base_doc_label = f"Plan d'aménagement local {municipality_name}"
elif language == 'it':
base_doc_label = f"Piano di utilizzazione {municipality_name}"
else:
base_doc_label = f"BZO {municipality_name}"
# Process each PDF URL
for idx, pdf_url in enumerate(pdf_urls):
try:
logger.info(f"Downloading PDF {idx + 1}/{len(pdf_urls)} from {pdf_url} for {municipality_name}")
pdf_content = await download_pdf(pdf_url)
if not pdf_content or len(pdf_content) < 100:
logger.warning(f"Failed to download PDF from {pdf_url} for {municipality_name}")
continue
# Create unique file name
if len(pdf_urls) > 1:
file_name = f"BZO_{safe_name}_{idx + 1}.pdf"
doc_label = f"{base_doc_label} ({idx + 1})"
else:
file_name = f"BZO_{safe_name}.pdf"
doc_label = base_doc_label
# Store file using ComponentObjects
try:
file_item = componentInterface.createFile(
name=file_name,
mimeType="application/pdf",
content=pdf_content
)
componentInterface.createFileData(file_item.id, pdf_content)
logger.info(f"Stored file {file_name} with ID {file_item.id}")
except Exception as e:
logger.error(f"Error storing file {file_name}: {str(e)}", exc_info=True)
continue
# Create Dokument record
dokument = Dokument(
mandateId=currentUser.mandateId,
label=doc_label,
versionsbezeichnung="Aktuell",
dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL,
dokumentReferenz=file_item.id,
quelle=pdf_url,
mimeType="application/pdf",
kategorienTags=["BZO", "Bauordnung", municipality_name]
)
# Create Dokument record
created_dokument = realEstateInterface.createDokument(dokument)
logger.info(f"Created Dokument record with ID {created_dokument.id}")
current_dokumente.append(created_dokument)
# Add to response
bzo_documents.append({
"id": created_dokument.id,
"label": created_dokument.label,
"dokumentTyp": created_dokument.dokumentTyp.value if created_dokument.dokumentTyp else None,
"dokumentReferenz": created_dokument.dokumentReferenz,
"quelle": created_dokument.quelle,
"mimeType": created_dokument.mimeType
})
except Exception as e:
logger.error(f"Error processing PDF {pdf_url}: {str(e)}", exc_info=True)
continue
# Update Gemeinde with new dokumente
if bzo_documents:
updated_gemeinde = realEstateInterface.updateGemeinde(
gemeinde.id,
{"dokumente": current_dokumente}
)
if updated_gemeinde:
logger.info(f"Successfully created {len(bzo_documents)} BZO document(s) for {municipality_name}")
else:
logger.warning(f"No search results found for {municipality_name}")
except Exception as e:
logger.error(f"Error fetching BZO documents for {municipality_name}: {e}", exc_info=True)
# Continue without documents - don't fail the request
elif fetch_documents:
if not municipality_name:
logger.warning("fetch_documents=true but municipality_name is not available, skipping document fetch")
elif not canton:
logger.warning("fetch_documents=true but canton is not available, skipping document fetch")
# Add Gemeinde and documents to response if available
logger.debug(f"Adding to response: gemeinde_info={gemeinde_info is not None}, bzo_documents count={len(bzo_documents)}")
if gemeinde_info:
response_data["gemeinde"] = gemeinde_info
logger.debug(f"Added gemeinde_info to response: {gemeinde_info}")
if bzo_documents:
response_data["documents"] = bzo_documents
logger.info(f"Added {len(bzo_documents)} BZO documents to response")
else:
logger.debug("No BZO documents to add to response")
return response_data
except HTTPException:
raise
except Exception as e:
logger.error(f"Error searching parcel: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error searching parcel: {str(e)}"
)
@router.post("/projekt/{projekt_id}/add-parcel", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def add_parcel_to_project(
request: Request,
projekt_id: str = Path(..., description="Projekt ID"),
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Add a parcel to an existing project.
This endpoint can either:
1. Link an existing Parzelle to the Projekt
2. Create a new Parzelle from location data and link it
Request Body:
Option 1 - Link existing parcel:
{
"parcelId": "existing-parcel-id"
}
Option 2 - Create new parcel from location:
{
"location": "Hauptstrasse 42, 8000 Zürich"
}
Option 3 - Create new parcel with custom data:
{
"parcelData": {
"label": "Parzelle 123",
"strasseNr": "Hauptstrasse 42",
"plz": "8000",
"bauzone": "W3",
...
}
}
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"projekt": {...}, // Updated Projekt
"parzelle": {...} // Parcel that was added
}
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for POST /api/realestate/projekt/{projekt_id}/add-parcel from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Validate CSRF token format
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
try:
int(csrf_token, 16)
except ValueError:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Adding parcel to project {projekt_id} for user {currentUser.id} (mandate: {currentUser.mandateId})")
# Get interface
realEstateInterface = getRealEstateInterface(currentUser)
# Fetch existing Projekt
projekte = realEstateInterface.getProjekte(
recordFilter={"id": projekt_id, "mandateId": currentUser.mandateId}
)
if not projekte:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Projekt {projekt_id} not found"
)
projekt = projekte[0]
# Determine which option was used
parcel_id = body.get("parcelId")
location = body.get("location")
parcel_data_dict = body.get("parcelData")
parzelle = None
# Option 1: Link existing parcel
if parcel_id:
logger.info(f"Linking existing parcel {parcel_id}")
parcels = realEstateInterface.getParzellen(
recordFilter={"id": parcel_id, "mandateId": currentUser.mandateId}
)
if not parcels:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Parzelle {parcel_id} not found"
)
parzelle = parcels[0]
# Option 2: Create from location
elif location:
logger.info(f"Creating parcel from location: {location}")
# Initialize connector and search for parcel
connector = SwissTopoMapServerConnector()
parcel_data = await connector.search_parcel(location)
if not parcel_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No parcel found at location: {location}"
)
# Extract attributes
extracted_attributes = connector.extract_parcel_attributes(parcel_data)
attributes = parcel_data.get("attributes", {})
# Create Parzelle
parzelle_create_data = {
"mandateId": currentUser.mandateId,
"label": extracted_attributes.get("label") or attributes.get("number") or "Unknown",
"parzellenAliasTags": [attributes.get("egris_egrid")] if attributes.get("egris_egrid") else [],
"eigentuemerschaft": None,
"strasseNr": location if not location.replace(",", "").replace(".", "").replace(" ", "").isdigit() else None,
"plz": None,
"perimeter": extracted_attributes.get("perimeter"),
"baulinie": None,
"kontextGemeinde": None,
"bauzone": None,
"az": None,
"bz": None,
"vollgeschossZahl": None,
"anrechenbarDachgeschoss": None,
"anrechenbarUntergeschoss": None,
"gebaeudehoeheMax": None,
"regelnGrenzabstand": [],
"regelnMehrlaengenzuschlag": [],
"regelnMehrhoehenzuschlag": [],
"parzelleBebaut": None,
"parzelleErschlossen": None,
"parzelleHanglage": None,
"laermschutzzone": None,
"hochwasserschutzzone": None,
"grundwasserschutzzone": None,
"parzellenNachbarschaft": [],
"dokumente": [],
"kontextInformationen": [
Kontext(
thema="Swiss Topo Data",
inhalt=json.dumps({
"egrid": attributes.get("egris_egrid"),
"identnd": attributes.get("identnd"),
"canton": attributes.get("ak"),
"municipality_code": attributes.get("bfsnr"),
"geoportal_url": attributes.get("geoportal_url")
}, ensure_ascii=False)
)
]
}
parzelle_instance = Parzelle(**parzelle_create_data)
parzelle = realEstateInterface.createParzelle(parzelle_instance)
# Option 3: Create from custom data
elif parcel_data_dict:
logger.info(f"Creating parcel from custom data")
parcel_data_dict["mandateId"] = currentUser.mandateId
parzelle_instance = Parzelle(**parcel_data_dict)
parzelle = realEstateInterface.createParzelle(parzelle_instance)
else:
raise ValueError("One of 'parcelId', 'location', or 'parcelData' is required")
# Add parcel to project
if parzelle not in projekt.parzellen:
projekt.parzellen.append(parzelle)
# Update projekt perimeter if needed (use first parcel's perimeter)
if not projekt.perimeter and parzelle.perimeter:
projekt.perimeter = parzelle.perimeter
# Update Projekt
updated_projekt = realEstateInterface.updateProjekt(projekt)
logger.info(f"Added Parzelle {parzelle.id} to Projekt {projekt_id}")
return {
"projekt": updated_projekt.model_dump(),
"parzelle": parzelle.model_dump()
}
except ValueError as e:
logger.error(f"Validation error in add_parcel_to_project: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Validation error: {str(e)}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error adding parcel to project: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error adding parcel to project: {str(e)}"
)
@router.get("/bzo-information", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def get_bzo_information(
request: Request,
gemeinde: str = Query(..., description="Gemeinde name or ID"),
bauzone: str = Query(..., description="Bauzone code (e.g., W3, W2/30)"),
total_area_m2: Optional[float] = Query(None, description="Total parcel area (m²) for Machbarkeitsstudie"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde.
Uses a langgraph workflow to extract content from BZO PDF documents for the
specified Gemeinde, then uses AI to search for relevant information specific
to the specified Bauzone.
The workflow:
1. Finds BZO documents for the Gemeinde (by name or ID)
2. Extracts content from PDFs using langgraph workflow
3. Filters rules, zones, and articles by Bauzone
4. Uses AI to generate a summary and find relevant information
Query Parameters:
- gemeinde: Gemeinde name (e.g., "Zürich") or ID
- bauzone: Bauzone code (e.g., "W3", "W2/30", "Z3")
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"bauzone": "W3",
"gemeinde": {
"id": "...",
"label": "...",
"plz": "..."
},
"extracted_content": {
"zones": [...], // Zone information filtered by Bauzone
"rules": [...], // Rules filtered by Bauzone
"articles": [...], // Articles filtered by Bauzone
"total_zones": N,
"total_rules": N,
"total_articles": N
},
"ai_summary": "...", // AI-generated summary
"relevant_rules": [...], // Rules specifically for this Bauzone
"documents_processed": [ // List of document IDs processed
{
"id": "...",
"label": "...",
"dokumentTyp": "..."
}
],
"errors": [...],
"warnings": [...]
}
Examples:
- GET /api/realestate/bzo-information?gemeinde=Zürich&bauzone=W3
- GET /api/realestate/bzo-information?gemeinde=Uster&bauzone=W2/30
Raises:
- 404: Gemeinde not found
- 404: No BZO documents found for Gemeinde
- 500: Error during extraction or processing
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for GET /api/realestate/bzo-information from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for GET /api/realestate/bzo-information from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/bzo-information from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id}, mandate: {currentUser.mandateId})")
# Call the feature function
result = await extract_bzo_information(
currentUser=currentUser,
gemeinde=gemeinde,
bauzone=bauzone,
total_area_m2=total_area_m2,
)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error extracting BZO information: {str(e)}"
)