gateway/modules/features/realEstate/routeFeatureRealEstate.py

2118 lines
89 KiB
Python

"""
Real Estate routes for the backend API.
Implements stateless endpoints for real estate database operations with AI-powered natural language processing.
"""
import asyncio
import json
import logging
import re
import aiohttp
import requests
from typing import Optional, Dict, Any, List, Union
from fastapi import APIRouter, HTTPException, Depends, Body, Request, Query, Path, status
from fastapi.responses import JSONResponse
# Import auth modules
from modules.auth import limiter, getRequestContext, RequestContext
# Import models
from modules.datamodels.datamodelPagination import (
PaginationParams,
PaginatedResponse,
PaginationMetadata,
normalize_pagination_dict,
)
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from .datamodelFeatureRealEstate import (
Projekt,
Parzelle,
Dokument,
DokumentTyp,
Gemeinde,
Kanton,
Land,
Kontext,
StatusProzess,
)
# Import interfaces
from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface
# Import feature logic for AI-powered commands
from .mainRealEstate import (
processNaturalLanguageCommand,
create_project_with_parcel_data,
extract_bzo_information,
)
from .parcelSelectionService import compute_selection_summary, is_parcel_adjacent_to_selection
# Import Swiss Topo MapServer, ÖREB and Zurich WFS connectors
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.connectors.connectorOerebWfs import OerebWfsConnector
from modules.connectors.connectorZhWfsParcels import ZhWfsParcelsConnector
# Import ComponentObjects and Tavily for BZO document fetch
from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface
from modules.aicore.aicorePluginTavily import AiTavily
# Import attribute utilities for model schema
from modules.shared.attributeUtils import getModelAttributeDefinitions
# Configure logger
logger = logging.getLogger(__name__)
# Create router for real estate endpoints
router = APIRouter(
prefix="/api/realestate",
tags=["Real Estate"],
responses={
404: {"description": "Not found"},
400: {"description": "Bad request"},
401: {"description": "Unauthorized"},
403: {"description": "Forbidden"},
500: {"description": "Internal server error"}
}
)
# ===== Helper Functions (instanceId-based routes, backend-driven like Trustee) =====
def _parsePagination(pagination: Optional[str]) -> Optional[PaginationParams]:
"""Parse pagination parameter from JSON string."""
if not pagination:
return None
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
return PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
return None
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
"""
Validate that the user has access to the feature instance.
Returns the mandateId for the instance.
"""
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
instance = featureInterface.getFeatureInstance(instanceId)
if not instance:
raise HTTPException(
status_code=404,
detail=f"Feature instance '{instanceId}' not found"
)
if instance.featureCode != "realestate":
raise HTTPException(
status_code=400,
detail=f"Instance '{instanceId}' is not a realestate instance"
)
if not context.hasSysAdminRole:
featureAccesses = rootInterface.getFeatureAccessesForUser(str(context.user.id))
hasAccess = any(
str(fa.featureInstanceId) == instanceId and fa.enabled
for fa in featureAccesses
)
if not hasAccess:
raise HTTPException(
status_code=403,
detail=f"Access denied to feature instance '{instanceId}'"
)
return str(instance.mandateId)
# Mapping of entity names to Pydantic model classes (for attributes endpoint)
_REALESTATE_ENTITY_MODELS = {
"Projekt": Projekt,
"Parzelle": Parzelle,
"Dokument": Dokument,
"Gemeinde": Gemeinde,
"Kanton": Kanton,
"Land": Land,
}
# ============================================================================
# INSTANCE-ID ROUTES (backend-driven, analog to Trustee)
# ============================================================================
@router.get("/{instanceId}/attributes/{entityType}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def get_entity_attributes(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
entityType: str = Path(..., description="Entity type (e.g., Projekt, Parzelle)"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get attribute definitions for a Real Estate entity. Used by FormGeneratorTable."""
_validateInstanceAccess(instanceId, context)
if entityType not in _REALESTATE_ENTITY_MODELS:
raise HTTPException(
status_code=404,
detail=f"Unknown entity type: {entityType}. Valid types: {list(_REALESTATE_ENTITY_MODELS.keys())}"
)
modelClass = _REALESTATE_ENTITY_MODELS[entityType]
try:
attrDefs = getModelAttributeDefinitions(modelClass)
visibleAttrs = [
attr for attr in attrDefs.get("attributes", [])
if isinstance(attr, dict) and attr.get("visible", True)
]
return {"attributes": visibleAttrs}
except Exception as e:
logger.error(f"Error getting attributes for {entityType}: {e}")
raise HTTPException(
status_code=500,
detail=f"Error getting attributes for {entityType}: {str(e)}"
)
@router.get("/{instanceId}/projects/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_project_options(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""Get project options for select dropdowns. Returns: [{ value, label }]"""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
items = interface.getProjekte(recordFilter={"featureInstanceId": instanceId})
return [{"value": p.id, "label": getattr(p, "label", None) or p.id} for p in items]
@router.get("/{instanceId}/parcels/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_parcel_options(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""Get parcel options for select dropdowns. Returns: [{ value, label }]"""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
items = interface.getParzellen(recordFilter={"featureInstanceId": instanceId})
return [{"value": p.id, "label": getattr(p, "label", None) or p.id} for p in items]
# ----- Projects CRUD -----
@router.get("/{instanceId}/projects", response_model=PaginatedResponse[Projekt])
@limiter.limit("30/minute")
def get_projects(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Projekt]:
"""Get all projects for a feature instance with optional pagination."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
recordFilter = {"featureInstanceId": instanceId}
items = interface.getProjekte(recordFilter=recordFilter)
paginationParams = _parsePagination(pagination)
if paginationParams:
if paginationParams.sort:
for sort_field in reversed(paginationParams.sort):
field_name = sort_field.field
direction = sort_field.direction.lower()
items.sort(
key=lambda x: getattr(x, field_name, None),
reverse=(direction == "desc")
)
total_items = len(items)
total_pages = (total_items + paginationParams.pageSize - 1) // paginationParams.pageSize
start_idx = (paginationParams.page - 1) * paginationParams.pageSize
end_idx = start_idx + paginationParams.pageSize
paginated_items = items[start_idx:end_idx]
return PaginatedResponse(
items=paginated_items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=total_items,
totalPages=total_pages,
sort=paginationParams.sort or [],
filters=paginationParams.filters
)
)
return PaginatedResponse(items=items, pagination=None)
@router.get("/{instanceId}/projects/{projectId}", response_model=Projekt)
@limiter.limit("30/minute")
def get_project_by_id(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
projectId: str = Path(..., description="Project ID"),
context: RequestContext = Depends(getRequestContext)
) -> Projekt:
"""Get a single project by ID."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
projekt = interface.getProjekt(projectId)
if not projekt or str(getattr(projekt, "featureInstanceId", None)) != instanceId:
raise HTTPException(status_code=404, detail=f"Project '{projectId}' not found")
return projekt
@router.post("/{instanceId}/projects", response_model=Projekt)
@limiter.limit("30/minute")
def create_project(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
data: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Projekt:
"""Create a new project."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
if "mandateId" not in data:
data["mandateId"] = mandateId
if "featureInstanceId" not in data:
data["featureInstanceId"] = instanceId
try:
projekt = Projekt(**data)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid data: {str(e)}")
return interface.createProjekt(projekt)
@router.put("/{instanceId}/projects/{projectId}", response_model=Projekt)
@limiter.limit("30/minute")
def update_project(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
projectId: str = Path(..., description="Project ID"),
data: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Projekt:
"""Update a project."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
projekt = interface.getProjekt(projectId)
if not projekt or str(getattr(projekt, "featureInstanceId", None)) != instanceId:
raise HTTPException(status_code=404, detail=f"Project '{projectId}' not found")
updated = interface.updateProjekt(projectId, data)
if not updated:
raise HTTPException(status_code=500, detail="Update failed")
return updated
@router.delete("/{instanceId}/projects/{projectId}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("30/minute")
def delete_project(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
projectId: str = Path(..., description="Project ID"),
context: RequestContext = Depends(getRequestContext)
) -> None:
"""Delete a project."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
projekt = interface.getProjekt(projectId)
if not projekt or str(getattr(projekt, "featureInstanceId", None)) != instanceId:
raise HTTPException(status_code=404, detail=f"Project '{projectId}' not found")
if not interface.deleteProjekt(projectId):
raise HTTPException(status_code=500, detail="Delete failed")
# ----- Parcels CRUD -----
@router.get("/{instanceId}/parcels", response_model=PaginatedResponse[Parzelle])
@limiter.limit("30/minute")
def get_parcels(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Parzelle]:
"""Get all parcels for a feature instance with optional pagination."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
recordFilter = {"featureInstanceId": instanceId}
items = interface.getParzellen(recordFilter=recordFilter)
paginationParams = _parsePagination(pagination)
if paginationParams:
if paginationParams.sort:
for sort_field in reversed(paginationParams.sort):
field_name = sort_field.field
direction = sort_field.direction.lower()
items.sort(
key=lambda x: getattr(x, field_name, None),
reverse=(direction == "desc")
)
total_items = len(items)
total_pages = (total_items + paginationParams.pageSize - 1) // paginationParams.pageSize
start_idx = (paginationParams.page - 1) * paginationParams.pageSize
end_idx = start_idx + paginationParams.pageSize
paginated_items = items[start_idx:end_idx]
return PaginatedResponse(
items=paginated_items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=total_items,
totalPages=total_pages,
sort=paginationParams.sort or [],
filters=paginationParams.filters
)
)
return PaginatedResponse(items=items, pagination=None)
@router.get("/{instanceId}/parcels/{parcelId}", response_model=Parzelle)
@limiter.limit("30/minute")
def get_parcel_by_id(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
parcelId: str = Path(..., description="Parcel ID"),
context: RequestContext = Depends(getRequestContext)
) -> Parzelle:
"""Get a single parcel by ID."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
parzelle = interface.getParzelle(parcelId)
if not parzelle or str(getattr(parzelle, "featureInstanceId", None)) != instanceId:
raise HTTPException(status_code=404, detail=f"Parcel '{parcelId}' not found")
return parzelle
@router.post("/{instanceId}/parcels", response_model=Parzelle)
@limiter.limit("30/minute")
def create_parcel(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
data: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Parzelle:
"""Create a new parcel."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
if "mandateId" not in data:
data["mandateId"] = mandateId
if "featureInstanceId" not in data:
data["featureInstanceId"] = instanceId
try:
parzelle = Parzelle(**data)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid data: {str(e)}")
return interface.createParzelle(parzelle)
@router.put("/{instanceId}/parcels/{parcelId}", response_model=Parzelle)
@limiter.limit("30/minute")
def update_parcel(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
parcelId: str = Path(..., description="Parcel ID"),
data: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Parzelle:
"""Update a parcel."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
parzelle = interface.getParzelle(parcelId)
if not parzelle or str(getattr(parzelle, "featureInstanceId", None)) != instanceId:
raise HTTPException(status_code=404, detail=f"Parcel '{parcelId}' not found")
updated = interface.updateParzelle(parcelId, data)
if not updated:
raise HTTPException(status_code=500, detail="Update failed")
return updated
@router.delete("/{instanceId}/parcels/{parcelId}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("30/minute")
def delete_parcel(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
parcelId: str = Path(..., description="Parcel ID"),
context: RequestContext = Depends(getRequestContext)
) -> None:
"""Delete a parcel."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
parzelle = interface.getParzelle(parcelId)
if not parzelle or str(getattr(parzelle, "featureInstanceId", None)) != instanceId:
raise HTTPException(status_code=404, detail=f"Parcel '{parcelId}' not found")
if not interface.deleteParzelle(parcelId):
raise HTTPException(status_code=500, detail="Delete failed")
# ===== Helpers for Gemeinde/BZO routes =====
def _get_language_from_kanton(kanton_abk: Optional[str]) -> str:
"""Determine language (de/fr/it) based on Kanton abbreviation."""
if not kanton_abk:
return "de"
french_cantons = {"VD", "GE", "NE", "JU"}
italian_cantons = {"TI"}
kanton_upper = kanton_abk.upper()
if kanton_upper in french_cantons:
return "fr"
if kanton_upper in italian_cantons:
return "it"
return "de"
def _get_bzo_search_query(gemeinde_label: str, language: str) -> str:
"""Generate language-specific BZO search query for a Gemeinde."""
if language == "fr":
return f"Plan d'aménagement local {gemeinde_label} OR Règlement de construction {gemeinde_label}"
if language == "it":
return f"Piano di utilizzazione {gemeinde_label} OR Regolamento edilizio {gemeinde_label}"
return f"Bau und Zonenordnung {gemeinde_label}"
# ----- Instance-scoped Gemeinde and BZO routes -----
@router.get("/{instanceId}/gemeinden", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_instance_gemeinden(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
only_current: bool = Query(True, description="Only current municipalities (exclude historical)"),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
"""
Fetch all Gemeinden from Swiss Topo and save to DB for this instance.
Creates Kantone as needed. Scoped to instance mandateId.
"""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
try:
oereb_connector = OerebWfsConnector()
connector = SwissTopoMapServerConnector(oereb_connector=oereb_connector)
gemeinden_data = await connector.get_all_gemeinden(only_current=only_current)
except Exception as e:
logger.error(f"Error fetching Gemeinden from Swiss Topo: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error fetching Gemeinden: {str(e)}")
gemeinden_created = 0
gemeinden_skipped = 0
kantone_created = 0
errors: List[str] = []
kanton_cache: Dict[str, str] = {}
def find_gemeinde_by_bfs_nummer(bfs_nummer: str) -> Optional[Any]:
try:
gemeinden = interface.getGemeinden(recordFilter={"mandateId": mandateId})
for g in gemeinden:
for k in (g.kontextInformationen or []):
try:
data = json.loads(k.inhalt) if isinstance(k.inhalt, str) else k.inhalt
if isinstance(data, dict) and str(data.get("bfs_nummer")) == str(bfs_nummer):
return g
except (json.JSONDecodeError, AttributeError):
continue
except Exception as ex:
logger.error(f"Error finding Gemeinde by BFS {bfs_nummer}: {ex}", exc_info=True)
return None
def get_or_create_kanton(kanton_abk: str) -> Optional[str]:
nonlocal kantone_created, errors
if not kanton_abk:
return None
if kanton_abk in kanton_cache:
return kanton_cache[kanton_abk]
kantone = interface.getKantone(recordFilter={"mandateId": mandateId, "abk": kanton_abk})
if kantone:
kanton_cache[kanton_abk] = kantone[0].id
return kantone[0].id
kanton_names = {
"AG": "Aargau", "AI": "Appenzell Innerrhoden", "AR": "Appenzell Ausserrhoden",
"BE": "Bern", "BL": "Basel-Landschaft", "BS": "Basel-Stadt",
"FR": "Freiburg", "GE": "Genf", "GL": "Glarus", "GR": "Graubünden",
"JU": "Jura", "LU": "Luzern", "NE": "Neuenburg", "NW": "Nidwalden",
"OW": "Obwalden", "SG": "St. Gallen", "SH": "Schaffhausen", "SO": "Solothurn",
"SZ": "Schwyz", "TG": "Thurgau", "TI": "Tessin", "UR": "Uri",
"VD": "Waadt", "VS": "Wallis", "ZG": "Zug", "ZH": "Zürich",
}
try:
kanton_label = kanton_names.get(kanton_abk, kanton_abk)
kanton = Kanton(
mandateId=mandateId,
featureInstanceId=instanceId,
label=kanton_label,
abk=kanton_abk,
)
created = interface.createKanton(kanton)
if created and created.id:
kanton_cache[kanton_abk] = created.id
kantone_created += 1
return created.id
except Exception as ex:
errors.append(f"Error creating Kanton {kanton_abk}: {ex}")
return None
saved_gemeinden: List[Dict[str, Any]] = []
for gd in gemeinden_data:
try:
gemeinde_name = gd.get("name")
bfs_nummer = gd.get("bfs_nummer")
kanton_abk = gd.get("kanton")
if not gemeinde_name or bfs_nummer is None:
gemeinden_skipped += 1
continue
existing = find_gemeinde_by_bfs_nummer(str(bfs_nummer))
if existing:
gemeinden_skipped += 1
saved_gemeinden.append(existing.model_dump() if hasattr(existing, "model_dump") else existing)
continue
kanton_id = get_or_create_kanton(kanton_abk) if kanton_abk else None
gemeinde = Gemeinde(
mandateId=mandateId,
featureInstanceId=instanceId,
label=gemeinde_name,
id_kanton=kanton_id,
kontextInformationen=[
Kontext(thema="BFS Nummer", inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False))
],
)
created = interface.createGemeinde(gemeinde)
if created and created.id:
gemeinden_created += 1
saved_gemeinden.append(created.model_dump() if hasattr(created, "model_dump") else created)
else:
errors.append(f"Failed to create Gemeinde {gemeinde_name}")
gemeinden_skipped += 1
except Exception as ex:
errors.append(f"Error processing {gd.get('name', 'Unknown')}: {str(ex)}")
gemeinden_skipped += 1
return {
"gemeinden": saved_gemeinden,
"count": len(saved_gemeinden),
"stats": {
"gemeinden_created": gemeinden_created,
"gemeinden_skipped": gemeinden_skipped,
"kantone_created": kantone_created,
"error_count": len(errors),
"errors": errors[:10],
},
}
@router.post("/{instanceId}/gemeinden/fetch-bzo-documents", response_model=Dict[str, Any])
@limiter.limit("10/hour")
async def fetch_instance_bzo_documents(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
"""Search for and download BZO documents for all Gemeinden of this instance (1 doc per Gemeinde, no duplicates)."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
componentInterface = getComponentInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
from modules.features.realEstate.realEstateGemeindeService import fetch_bzo_for_gemeinde
gemeinden = interface.getGemeinden(recordFilter={"mandateId": mandateId})
stats = {"gemeinden_processed": 0, "documents_created": 0, "documents_skipped": 0, "errors": []}
results: List[Dict[str, Any]] = []
for gemeinde in gemeinden:
gr = {"gemeinde_id": gemeinde.id, "gemeinde_label": gemeinde.label, "status": None, "dokument_ids": [], "error": None}
try:
stats["gemeinden_processed"] += 1
fetched = await fetch_bzo_for_gemeinde(
interface, componentInterface, gemeinde, mandateId, instanceId
)
if fetched:
gr["status"] = "created"
stats["documents_created"] += 1
refreshed = interface.getGemeinde(gemeinde.id)
if refreshed and refreshed.dokumente:
for doc in refreshed.dokumente:
doc_id = getattr(doc, "id", None) or (doc.get("id") if isinstance(doc, dict) else None)
if doc_id:
gr["dokument_ids"].append(doc_id)
else:
gr["status"] = "skipped"
stats["documents_skipped"] += 1
except Exception as ex:
gr["status"] = "error"
gr["error"] = str(ex)
stats["errors"].append(f"{gemeinde.label}: {str(ex)}")
results.append(gr)
return {"success": True, "stats": stats, "results": results}
@router.get("/{instanceId}/parcel-documents", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_parcel_documents(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
gemeinde: str = Query(..., description="Gemeinde name (e.g. Zürich)"),
bauzone: str = Query(..., description="Bauzone code (e.g. W5)"),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
"""
Ensure BZO document exists for Gemeinde, return documents for parcel info display.
Creates Gemeinde (Swiss Topo) and BZO (Tavily) if not in DB.
Returns documents for preview - does NOT run LangGraph.
"""
mandateId = _validateInstanceAccess(instanceId, context)
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
componentInterface = getComponentInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
from modules.features.realEstate.realEstateGemeindeService import (
ensure_single_gemeinde,
fetch_bzo_for_gemeinde,
)
gemeinde_obj = None
by_label = interface.getGemeinden(recordFilter={"label": gemeinde, "mandateId": mandateId})
gemeinde_obj = by_label[0] if by_label else None
if not gemeinde_obj:
gemeinde_obj = await ensure_single_gemeinde(interface, mandateId, instanceId, gemeinde_name=gemeinde)
if not gemeinde_obj:
return {"documents": [], "error": f"Gemeinde '{gemeinde}' nicht gefunden"}
bzo_docs = []
if gemeinde_obj.dokumente:
for doc in gemeinde_obj.dokumente:
typ = getattr(doc, "dokumentTyp", None) or (doc.get("dokumentTyp") if isinstance(doc, dict) else None)
if typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION] or str(typ) in ["gemeindeBzoAktuell", "gemeindeBzoRevision"]:
doc_id = doc.id if hasattr(doc, "id") else doc.get("id")
if doc_id:
full = interface.getDokument(doc_id)
if full and full.dokumentReferenz:
bzo_docs.append(full)
if not bzo_docs:
fetched = await fetch_bzo_for_gemeinde(interface, componentInterface, gemeinde_obj, mandateId, instanceId)
if fetched:
gemeinde_obj = interface.getGemeinde(gemeinde_obj.id)
if gemeinde_obj and gemeinde_obj.dokumente:
for doc in gemeinde_obj.dokumente:
typ = getattr(doc, "dokumentTyp", None) or (doc.get("dokumentTyp") if isinstance(doc, dict) else None)
if typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]:
doc_id = doc.id if hasattr(doc, "id") else doc.get("id")
if doc_id:
full = interface.getDokument(doc_id)
if full and full.dokumentReferenz:
bzo_docs.append(full)
result = []
for d in bzo_docs:
result.append({
"id": d.id,
"label": d.label,
"fileId": d.dokumentReferenz,
"fileName": (d.label or "BZO") + ".pdf",
"mimeType": d.mimeType or "application/pdf",
})
return {"documents": result, "gemeinde": gemeinde, "bauzone": bauzone}
@router.get("/{instanceId}/bzo-information", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def get_instance_bzo_information(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
gemeinde: str = Query(..., description="Gemeinde name or ID"),
bauzone: str = Query(..., description="Bauzone code (e.g., W3, W2/30)"),
total_area_m2: Optional[float] = Query(None, description="Total parcel area (m²) for Machbarkeitsstudie"),
parcel_ids: Optional[str] = Query(None, description="Comma-separated parcel IDs; total area computed from parcels"),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
"""Extract BZO information for a Bauzone in a Gemeinde. Runs LangGraph workflow. With total_area_m2 or parcel_ids, includes Machbarkeitsstudie."""
mandateId = _validateInstanceAccess(instanceId, context)
parcels = None
if parcel_ids:
ids = [x.strip() for x in parcel_ids.split(",") if x.strip()]
if ids:
interface = getRealEstateInterface(
context.user, mandateId=mandateId, featureInstanceId=instanceId
)
parcels = []
for pid in ids:
p = interface.getParzelle(pid)
if p:
flat = dict(p) if hasattr(p, "keys") else (vars(p) if hasattr(p, "__dict__") else {})
parcels.append({"parcel": flat, "map_view": flat.get("map_view", {})})
return await extract_bzo_information(
currentUser=context.user,
gemeinde=gemeinde,
bauzone=bauzone,
mandateId=mandateId,
featureInstanceId=instanceId,
total_area_m2=total_area_m2,
parcels=parcels,
)
# ============================================================================
# LEGACY / STATELESS ROUTES (unchanged)
# ============================================================================
@router.post("/command", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def process_command(
request: Request,
userInput: str = Body(..., embed=True, description="Natural language command"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Process natural language command and execute corresponding CRUD operation.
Uses AI to analyze user intent and extract parameters, then executes the appropriate
CRUD operation. Works stateless without session management.
Example user inputs:
- "Erstelle ein neues Projekt namens 'Hauptstrasse 42'"
- "Zeige mir alle Projekte in Zuerich"
- "Aktualisiere Projekt XYZ mit Status 'Planung'"
- "Loesche Parzelle ABC"
- "SELECT * FROM Projekt WHERE plz = '8000'"
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"success": true,
"intent": "CREATE|READ|UPDATE|DELETE|QUERY",
"entity": "Projekt|Parzelle|...|null",
"result": {...}
}
"""
try:
# Validate CSRF token (middleware also checks, but explicit validation for better error messages)
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for POST /api/realestate/command from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for POST /api/realestate/command from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/command from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Processing command request from user {context.user.id} (mandate: {context.mandateId})")
logger.debug(f"User input: {userInput}")
# Process natural language command with AI
result = await processNaturalLanguageCommand(
currentUser=context.user,
mandateId=str(context.mandateId),
userInput=userInput
)
return result
except ValueError as e:
logger.error(f"Validation error in process_command: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Validation error: {str(e)}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error processing command: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error processing command: {str(e)}"
)
@router.get("/tables", response_model=Dict[str, Any])
@limiter.limit("120/minute")
def get_available_tables(
request: Request,
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get all available real estate tables.
Returns a list of available table names with their descriptions.
Headers:
- X-CSRF-Token: CSRF token (required for security)
Example:
- GET /api/realestate/tables
"""
try:
# Validate CSRF token if provided
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for GET /api/realestate/tables from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for GET /api/realestate/tables from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/tables from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Getting available tables for user {context.user.id} (mandate: {context.mandateId})")
# Define available tables with descriptions
tables = [
{
"name": "Projekt",
"description": "Real estate projects",
"model": "Projekt"
},
{
"name": "Parzelle",
"description": "Plots/parcels",
"model": "Parzelle"
},
{
"name": "Dokument",
"description": "Documents",
"model": "Dokument"
},
{
"name": "Gemeinde",
"description": "Municipalities",
"model": "Gemeinde"
},
{
"name": "Kanton",
"description": "Cantons",
"model": "Kanton"
},
{
"name": "Land",
"description": "Countries",
"model": "Land"
},
]
return {
"tables": tables,
"count": len(tables)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting available tables: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting available tables: {str(e)}"
)
@router.get("/table/{table}", response_model=PaginatedResponse[Any])
@limiter.limit("120/minute")
def get_table_data(
request: Request,
table: str = Path(..., description="Table name (Projekt, Parzelle, Dokument, Gemeinde, Kanton, Land)"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Dict[str, Any]]:
"""
Get all data from a specific real estate table with optional pagination.
Available tables:
- Projekt: Real estate projects
- Parzelle: Plots/parcels
- Dokument: Documents
- Gemeinde: Municipalities
- Kanton: Cantons
- Land: Countries
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Headers:
- X-CSRF-Token: CSRF token (required for security)
Examples:
- GET /api/realestate/table/Projekt (no pagination - returns all items)
- GET /api/realestate/table/Parzelle?pagination={"page":1,"pageSize":10,"sort":[]}
- GET /api/realestate/table/Gemeinde?pagination={"page":2,"pageSize":20,"sort":[{"field":"label","direction":"asc"}]}
"""
try:
# Validate CSRF token if provided
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for GET /api/realestate/table/{table} from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for GET /api/realestate/table/{table} from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/table/{table} from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Getting table data for '{table}' from user {context.user.id} (mandate: {context.mandateId})")
# Map table names to model classes and getter methods
table_mapping = {
"Projekt": (Projekt, "getProjekte"),
"Parzelle": (Parzelle, "getParzellen"),
"Dokument": (Dokument, "getDokumente"),
"Gemeinde": (Gemeinde, "getGemeinden"),
"Kanton": (Kanton, "getKantone"),
"Land": (Land, "getLaender"),
}
# Validate table name
if table not in table_mapping:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid table name '{table}'. Available tables: {', '.join(table_mapping.keys())}"
)
# Get interface and fetch data
realEstateInterface = getRealEstateInterface(context.user, mandateId=str(context.mandateId) if context.mandateId else None)
model_class, method_name = table_mapping[table]
getter_method = getattr(realEstateInterface, method_name)
# Fetch all records (no filter for now)
records = getter_method(recordFilter=None)
# Keep records as model instances (like routeDataFiles does with FileItem)
# FastAPI will automatically serialize Pydantic models to JSON
items = records
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
paginationParams = PaginationParams(**paginationDict) if paginationDict else None
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Apply pagination if requested
if paginationParams:
# Apply sorting if specified
if paginationParams.sort:
for sort_field in reversed(paginationParams.sort): # Reverse to apply in priority order
field_name = sort_field.field
direction = sort_field.direction.lower()
def sort_key(item):
# Access attribute from model instance
value = getattr(item, field_name, None)
# Handle None values - put them at the end for asc, at the start for desc
if value is None:
return (1, None) # Use tuple to ensure None values sort consistently
return (0, value)
items.sort(key=sort_key, reverse=(direction == "desc"))
# Apply pagination
total_items = len(items)
total_pages = (total_items + paginationParams.pageSize - 1) // paginationParams.pageSize # Ceiling division
start_idx = (paginationParams.page - 1) * paginationParams.pageSize
end_idx = start_idx + paginationParams.pageSize
paginated_items = items[start_idx:end_idx]
return PaginatedResponse(
items=paginated_items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=total_items,
totalPages=total_pages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
# No pagination - return all items (as model instances, like routeDataFiles)
return PaginatedResponse(
items=items,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting table data for '{table}': {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting table data: {str(e)}"
)
@router.post("/table/{table}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def create_table_record(
request: Request,
table: str = Path(..., description="Table name (Projekt, Parzelle, Dokument, Gemeinde, Kanton, Land)"),
data: Dict[str, Any] = Body(..., description="Record data to create"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Create a new record in a specific real estate table.
Available tables:
- Projekt: Real estate projects (with parcel data support)
- Parzelle: Plots/parcels
- Dokument: Documents
- Gemeinde: Municipalities
- Kanton: Cantons
- Land: Countries
Request Body:
For Projekt:
{
"label": "Projekt Bezeichnung",
"statusProzess": "Eingang", // Optional
"parzelle": {
"id": "OE5913",
"egrid": "CH252699779137",
"perimeter": {...},
"geometry": {...}, // Used for baulinie
...
}
}
For other tables:
- JSON object with fields matching the table's data model
Headers:
- X-CSRF-Token: CSRF token (required for security)
Examples:
- POST /api/realestate/table/Projekt
Body: {"label": "Hauptstrasse 42", "parzelle": {...}}
- POST /api/realestate/table/Parzelle
Body: {"label": "Parzelle 1", "strasseNr": "Hauptstrasse 42", "plz": "8000", "bauzone": "W3"}
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for POST /api/realestate/table/{table} from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for POST /api/realestate/table/{table} from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/table/{table} from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Special handling for Projekt with parcel data
if table == "Projekt" and ("parzelle" in data or "parzellen" in data):
logger.info(f"Creating Projekt with parcel data for user {context.user.id} (mandate: {context.mandateId})")
# Extract fields
label = data.get("label")
if not label:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="label is required"
)
status_prozess = data.get("statusProzess", "Eingang")
# Support both single parzelle and multiple parzellen
parzellen_data = []
if "parzellen" in data:
# Multiple parcels
parzellen_data = data.get("parzellen", [])
if not isinstance(parzellen_data, list):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="parzellen must be an array"
)
elif "parzelle" in data:
# Single parcel
parzelle_data = data.get("parzelle")
if parzelle_data:
parzellen_data = [parzelle_data]
if not parzellen_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="parzelle or parzellen data is required"
)
# Use helper function to create project with parcel data
try:
result = await create_project_with_parcel_data(
currentUser=context.user,
mandateId=str(context.mandateId),
projekt_label=label,
parzellen_data=parzellen_data,
status_prozess=status_prozess,
)
# Return in format expected by frontend (single record, not nested)
return result.get("projekt", {})
except HTTPException:
# Re-raise HTTPExceptions directly
raise
except Exception as e:
logger.error(f"Error creating Projekt with parcel data: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error creating Projekt: {str(e)}"
)
# Standard handling for other tables or Projekt without parcel data
logger.info(f"Creating record in table '{table}' for user {context.user.id} (mandate: {context.mandateId})")
logger.debug(f"Record data: {data}")
# Map table names to model classes and create methods
table_mapping = {
"Projekt": (Projekt, "createProjekt"),
"Parzelle": (Parzelle, "createParzelle"),
"Dokument": (Dokument, "createDokument"),
"Gemeinde": (Gemeinde, "createGemeinde"),
"Kanton": (Kanton, "createKanton"),
"Land": (Land, "createLand"),
}
# Validate table name
if table not in table_mapping:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid table name '{table}'. Available tables: {', '.join(table_mapping.keys())}"
)
# Get interface
realEstateInterface = getRealEstateInterface(context.user, mandateId=str(context.mandateId) if context.mandateId else None)
model_class, method_name = table_mapping[table]
create_method = getattr(realEstateInterface, method_name)
# Ensure mandateId is set from context
if "mandateId" not in data:
data["mandateId"] = str(context.mandateId) if context.mandateId else None
# Create model instance from data
try:
model_instance = model_class(**data)
except Exception as e:
logger.error(f"Error creating {table} model instance: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid data for {table}: {str(e)}"
)
# Create record
try:
created_record = create_method(model_instance)
# Convert to dictionary for response
if hasattr(created_record, 'model_dump'):
return created_record.model_dump()
else:
return created_record
except Exception as e:
logger.error(f"Error creating {table} record: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error creating {table} record: {str(e)}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating record in table '{table}': {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error creating record: {str(e)}"
)
@router.get("/parcel/wfs")
@limiter.limit("60/minute")
def get_parcels_wfs(
request: Request,
bbox: str = Query(..., description="Bounding box as minx,miny,maxx,maxy in LV95 (EPSG:2056)"),
context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
"""
Fetch parcel geometries from geodienste.ch OGC API (Swiss Liegenschaften) within bounding box.
Returns GeoJSON FeatureCollection in WGS84 for map display.
"""
try:
connector = ZhWfsParcelsConnector()
geojson = connector.get_parcels_by_bbox(bbox)
return JSONResponse(content=geojson)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error fetching WFS parcels: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to fetch parcel data from WFS"
)
@router.get("/parcel/search", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def search_parcel(
request: Request,
location: str = Query(..., description="Either coordinates as 'x,y' (LV95) or address string"),
include_adjacent: bool = Query(False, description="Include adjacent parcels information"),
include_bauzone: bool = Query(True, description="Include Bauzone from ÖREB WFS (zone information)"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Search for parcel information by address or coordinates.
Returns comprehensive parcel information including:
- Parcel identification (number, EGRID, etc.)
- Precise boundary geometry for map display
- Administrative context (canton, municipality)
- Bauzone (zone code from ÖREB WFS when include_bauzone=True)
- Link to official cadastral map
- Optional: Adjacent parcels
Query Parameters:
- location: Either coordinates as "x,y" (LV95/EPSG:2056) or address string
- include_adjacent: If true, fetches information about adjacent parcels (slower)
- include_bauzone: If true, queries ÖREB WFS for zone info (Bauzone/Wohnzone)
Headers:
- X-CSRF-Token: CSRF token (required for security)
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for GET /api/realestate/parcel/search from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
logger.info(f"Searching parcel for user {context.user.id} (mandate: {context.mandateId}) with location: {location}")
# Initialize connector
connector = SwissTopoMapServerConnector()
# Search for parcel
parcel_data = await connector.search_parcel(location)
if not parcel_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No parcel found for location: {location}"
)
# Extract and normalize attributes
extracted_attributes = connector.extract_parcel_attributes(parcel_data)
attributes = parcel_data.get("attributes", {})
geometry = parcel_data.get("geometry", {})
# Calculate parcel area from perimeter
area_m2 = None
centroid = None
if extracted_attributes.get("perimeter"):
perimeter = extracted_attributes["perimeter"]
points = perimeter.get("punkte", [])
# Calculate area using shoelace formula
if len(points) >= 3:
area = 0
for i in range(len(points)):
j = (i + 1) % len(points)
area += points[i]["x"] * points[j]["y"]
area -= points[j]["x"] * points[i]["y"]
area_m2 = abs(area / 2)
# Calculate centroid
sum_x = sum(p["x"] for p in points)
sum_y = sum(p["y"] for p in points)
centroid = {
"x": sum_x / len(points),
"y": sum_y / len(points)
}
# Extract canton early (needed for bauzone query and municipality resolution)
canton = attributes.get("ak", "")
# Extract municipality name and address from Swiss Topo data
municipality_name = None
full_address = None
plz = None
# First, try to use geocoded address info if available (more accurate than centroid query)
geocoded_address = parcel_data.get('geocoded_address')
if geocoded_address:
full_address = geocoded_address.get('full_address')
plz = geocoded_address.get('plz')
municipality_name = geocoded_address.get('municipality')
logger.debug(f"Using geocoded address: {full_address}")
# If geocoded address not available, try to get address by querying the address layer
# Use query coordinates (where user clicked/geocoded) instead of parcel centroid
# This ensures we get the address at the exact location, not at the parcel center
query_coords = parcel_data.get('query_coordinates')
address_query_coords = query_coords if query_coords else centroid
if not full_address and address_query_coords:
query_x = address_query_coords['x']
query_y = address_query_coords['y']
logger.debug(f"Querying address layer at query coordinates: ({query_x}, {query_y})")
# Check if this was a coordinate search (not geocoded address)
is_coordinate_search = ',' in location and not any(c.isalpha() for c in location.split(',')[0])
# Use connector's helper method to query building layer
# Use tolerance=1 (minimum) for coordinate searches to get exact building
building_tolerance = 1 if is_coordinate_search else 10
building_result = await connector._query_building_layer(query_x, query_y, tolerance=building_tolerance, buffer=25)
if building_result:
addr_attrs = building_result.get("attributes", {})
logger.debug(f"Address layer attributes: {addr_attrs}")
# Extract address using connector's helper method
address_info = connector._extract_address_from_building_attrs(addr_attrs)
full_address = address_info.get('full_address')
plz = address_info.get('plz')
municipality_name = address_info.get('municipality')
if full_address:
logger.debug(f"Constructed address: {full_address}")
# If address not found via building layer, try to construct from available data
if not full_address:
# Check if location was provided as an address string
if location and any(c.isalpha() for c in location) and "CH" not in location:
# Location looks like an address (not an EGRID)
full_address = location
logger.debug(f"Using location as address: {full_address}")
# Try to extract municipality name from address string (e.g. "Forchstrasse 6c, 8610 Uster")
if not municipality_name and full_address:
plz_municipality_match = re.search(r"\b(\d{4})\s+([A-ZÄÖÜ][a-zäöüß\s-]+)", full_address)
if plz_municipality_match:
extracted_municipality = plz_municipality_match.group(2).strip()
extracted_municipality = re.sub(r"[,;\.]+$", "", extracted_municipality).strip()
if extracted_municipality:
municipality_name = extracted_municipality
if not plz:
plz = plz_municipality_match.group(1)
logger.debug(f"Extracted municipality from address: {municipality_name}")
# Try to extract municipality name from BFSNR if not found
bfsnr = attributes.get("bfsnr")
if not municipality_name and bfsnr and canton and context.mandateId:
try:
interface = getRealEstateInterface(
context.user, mandateId=str(context.mandateId), featureInstanceId=None
)
gemeinden = interface.getGemeinden(recordFilter={"mandateId": str(context.mandateId)})
for g in gemeinden:
for k in (g.kontextInformationen or []):
try:
data = json.loads(k.inhalt) if isinstance(k.inhalt, str) else k.inhalt
if isinstance(data, dict):
bfs = data.get("bfs_nummer") or data.get("bfsnr") or data.get("municipality_code")
if str(bfs) == str(bfsnr):
municipality_name = g.label
logger.debug(f"Found Gemeinde by BFS {bfsnr} in DB: {municipality_name}")
break
except (json.JSONDecodeError, AttributeError):
continue
if municipality_name:
break
except Exception as e:
logger.debug(f"Error querying Gemeinde by BFS: {e}")
# Swiss Topo geocoding to get municipality from coordinates
if not municipality_name and centroid and canton:
try:
geocode_url = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify"
params = {
"geometry": f"{centroid['x']},{centroid['y']}",
"geometryType": "esriGeometryPoint",
"layers": "all:ch.swisstopo.swissboundaries3d-gemeinde-flaeche.fill",
"tolerance": "0",
"returnGeometry": "false",
"sr": "2056",
"f": "json",
}
async with aiohttp.ClientSession() as session:
async with session.get(geocode_url, params=params) as resp:
if resp.status == 200:
data = await resp.json()
results = data.get("results", [])
if results:
attrs = results[0].get("attributes", {})
geo_name = attrs.get("name") or attrs.get("gemeindename") or attrs.get("label")
if geo_name:
municipality_name = connector._clean_municipality_name(str(geo_name))
logger.debug(f"Found municipality via Swiss Topo geocoding: {municipality_name}")
except Exception as e:
logger.debug(f"Error querying Swiss Topo geocoding: {e}")
# Expanded common municipalities fallback
if not municipality_name and bfsnr:
common_municipalities = {
261: "Zürich", 198: "Pfäffikon", 191: "Uster", 3203: "Winterthur",
351: "Bern", 2701: "Basel", 6621: "Genève", 5586: "Lausanne",
1061: "Luzern", 230: "St. Gallen", 5192: "Lugano", 1367: "Schwyz",
}
if bfsnr in common_municipalities:
municipality_name = common_municipalities[bfsnr]
logger.debug(f"Looked up municipality from common list: {municipality_name}")
elif canton and bfsnr:
municipality_name = f"{canton}-{bfsnr}"
logger.debug(f"Using fallback municipality: {municipality_name}")
# Final validation: Don't use EGRID as address
if full_address and full_address.startswith("CH") and len(full_address) == 14 and full_address[2:].isdigit():
# This is an EGRID, not an address
full_address = None
logger.debug("Removed EGRID from address field")
# Query Bauzone (wohnzone) from ÖREB WFS when requested
bauzone = None
has_geometry = geometry and (geometry.get("rings") or geometry.get("coordinates"))
if include_bauzone and canton and has_geometry and centroid:
try:
logger.debug(f"Querying zone information for parcel {attributes.get('label')} in canton {canton}")
oereb_connector = OerebWfsConnector()
zone_results = await oereb_connector.query_zone_layer(
egrid=attributes.get("egris_egrid", "") or "",
x=centroid["x"],
y=centroid["y"],
canton=canton,
geometry=geometry,
)
if zone_results and len(zone_results) > 0:
zone_attrs = zone_results[0].get("attributes", {})
typ_gde_abkuerzung = zone_attrs.get("typ_gde_abkuerzung")
if typ_gde_abkuerzung:
bauzone = typ_gde_abkuerzung
logger.debug(f"Found bauzone: {bauzone} for parcel {attributes.get('label')}")
except Exception as e:
logger.warning(f"Error querying zone information: {e}", exc_info=True)
# Build parcel info
parcel_info = {
"id": attributes.get("label") or attributes.get("number"),
"egrid": attributes.get("egris_egrid"),
"number": attributes.get("number"),
"name": attributes.get("name"),
"identnd": attributes.get("identnd"),
"canton": attributes.get("ak"),
"municipality_code": attributes.get("bfsnr"),
"municipality_name": municipality_name,
"address": full_address,
"plz": plz,
"perimeter": extracted_attributes.get("perimeter"),
"area_m2": area_m2,
"centroid": centroid,
"geoportal_url": attributes.get("geoportal_url"),
"realestate_type": attributes.get("realestate_type"),
"bauzone": bauzone,
}
# Build map view info
bbox = parcel_data.get("bbox", [])
map_view = {
"center": centroid,
"zoom_bounds": {
"min_x": bbox[0] if len(bbox) >= 4 else None,
"min_y": bbox[1] if len(bbox) >= 4 else None,
"max_x": bbox[2] if len(bbox) >= 4 else None,
"max_y": bbox[3] if len(bbox) >= 4 else None
},
"geometry_geojson": {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[[p["x"], p["y"]] for p in extracted_attributes["perimeter"]["punkte"]]
] if extracted_attributes.get("perimeter") else []
},
"properties": {
"id": parcel_info["id"],
"egrid": parcel_info["egrid"],
"number": parcel_info["number"]
}
}
}
# Build response
response_data = {
"parcel": parcel_info,
"map_view": map_view
}
# Fetch adjacent parcels if requested
if include_adjacent and parcel_data and parcel_data.get("geometry"):
try:
# Use the connector's method to find neighboring parcels by sampling along the boundary
# This ensures we find all parcels that actually touch the selected parcel
selected_parcel_id = parcel_info["id"]
adjacent_parcels_raw = await connector.find_neighboring_parcels(
parcel_data=parcel_data,
selected_parcel_id=selected_parcel_id,
sample_distance=20.0, # Sample every 20 meters (balanced for coverage and speed)
max_sample_points=30, # Allow up to 30 points to ensure all vertices are covered
max_neighbors=15, # Find up to 15 neighbors
max_concurrent=50 # Process up to 50 queries concurrently (maximum parallelization)
)
# Convert adjacent parcels to include GeoJSON geometry (optimized, minimal logging)
def convert_parcel_geometry(adj_parcel: Dict[str, Any]) -> Dict[str, Any]:
"""Convert a single adjacent parcel to include GeoJSON geometry."""
adj_parcel_with_geo = {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number"),
"perimeter": adj_parcel.get("perimeter")
}
# Convert geometry to GeoJSON format if available
adj_geometry = adj_parcel.get("geometry")
adj_perimeter = adj_parcel.get("perimeter")
if adj_geometry:
# Handle ESRI format (rings)
if "rings" in adj_geometry and adj_geometry["rings"]:
ring = adj_geometry["rings"][0] # Outer ring
coordinates = [[[p[0], p[1]] for p in ring]]
adj_parcel_with_geo["geometry_geojson"] = {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": coordinates
},
"properties": {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number")
}
}
# Handle GeoJSON format
elif adj_geometry.get("type") == "Polygon":
adj_parcel_with_geo["geometry_geojson"] = {
"type": "Feature",
"geometry": adj_geometry,
"properties": {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number")
}
}
# If no geometry_geojson was created but we have perimeter, create it from perimeter
if "geometry_geojson" not in adj_parcel_with_geo and adj_perimeter and adj_perimeter.get("punkte"):
punkte = adj_perimeter["punkte"]
coordinates = [[[p["x"], p["y"]] for p in punkte]]
adj_parcel_with_geo["geometry_geojson"] = {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": coordinates
},
"properties": {
"id": adj_parcel["id"],
"egrid": adj_parcel.get("egrid"),
"number": adj_parcel.get("number")
}
}
return adj_parcel_with_geo
# Convert all parcels in parallel (using list comprehension for speed)
adjacent_parcels = [convert_parcel_geometry(adj_parcel) for adj_parcel in adjacent_parcels_raw]
response_data["adjacent_parcels"] = adjacent_parcels
logger.info(f"Found {len(adjacent_parcels)} neighboring parcels for parcel {selected_parcel_id}")
except Exception as e:
logger.warning(f"Error fetching adjacent parcels: {e}", exc_info=True)
response_data["adjacent_parcels"] = []
return response_data
except HTTPException:
raise
except Exception as e:
logger.error(f"Error searching parcel: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error searching parcel: {str(e)}"
)
@router.post("/parcel/selection-summary", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def parcel_selection_summary(
request: Request,
body: Dict[str, Any] = Body(..., description="Parcel selection data"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Compute combined outline, total area, and Bauzone grouping for selected parcels.
Request body: { "parcels": [ { parcel, map_view, perimeter, geometry_geojson, ... } ] }
"""
try:
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
parcels = body.get("parcels", [])
if not parcels:
return {
"combined_outline_geojson": {"type": "Polygon", "coordinates": []},
"total_area_m2": 0.0,
"bauzonen": [],
}
result = compute_selection_summary(parcels)
logger.info(f"Computed selection summary for {len(parcels)} parcels, total area {result['total_area_m2']}")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error computing selection summary: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error computing selection summary: {str(e)}"
)
def _build_geometry_geojson(extracted: Dict[str, Any], parcel_info: Dict[str, Any]) -> Dict[str, Any]:
"""Build geometry_geojson from extracted perimeter for add-adjacent response."""
coords = []
if extracted.get("perimeter", {}).get("punkte"):
coords = [[[p["x"], p["y"]] for p in extracted["perimeter"]["punkte"]]]
return {
"type": "Feature",
"geometry": {"type": "Polygon", "coordinates": coords},
"properties": {"id": parcel_info["id"], "egrid": parcel_info["egrid"], "number": parcel_info["number"]},
}
@router.post("/parcel/add-adjacent", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def add_adjacent_parcel(
request: Request,
body: Dict[str, Any] = Body(..., description="Location and selected parcels"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Add an adjacent parcel to the selection. Validates that the parcel at the given
location touches the current selection.
Request body: { "location": { "x": number, "y": number }, "selected_parcels": [...] }
"""
try:
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
location = body.get("location")
selected_parcels = body.get("selected_parcels", [])
if not location or "x" not in location or "y" not in location:
raise HTTPException(status_code=400, detail="location with x,y required")
loc_str = f"{location['x']},{location['y']}"
connector = SwissTopoMapServerConnector()
parcel_data = await connector.search_parcel(loc_str)
if not parcel_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No parcel found at this location"
)
extracted = connector.extract_parcel_attributes(parcel_data)
attributes = parcel_data.get("attributes", {})
geometry = parcel_data.get("geometry", {})
area_m2 = None
centroid = None
if extracted.get("perimeter"):
perimeter = extracted["perimeter"]
points = perimeter.get("punkte", [])
if len(points) >= 3:
area = 0
for i in range(len(points)):
j = (i + 1) % len(points)
area += points[i]["x"] * points[j]["y"]
area -= points[j]["x"] * points[i]["y"]
area_m2 = abs(area / 2)
sum_x = sum(p["x"] for p in points)
sum_y = sum(p["y"] for p in points)
centroid = {"x": sum_x / len(points), "y": sum_y / len(points)}
parcel_info = {
"id": attributes.get("label") or attributes.get("number"),
"egrid": attributes.get("egris_egrid"),
"number": attributes.get("number"),
"name": attributes.get("name"),
"identnd": attributes.get("identnd"),
"canton": attributes.get("ak"),
"municipality_code": attributes.get("bfsnr"),
"municipality_name": None,
"address": None,
"plz": None,
"perimeter": extracted.get("perimeter"),
"area_m2": area_m2,
"centroid": centroid,
"geoportal_url": attributes.get("geoportal_url"),
"realestate_type": attributes.get("realestate_type"),
"bauzone": None,
}
map_view = {
"center": centroid,
"zoom_bounds": parcel_data.get("bbox", []) and {
"min_x": parcel_data["bbox"][0],
"min_y": parcel_data["bbox"][1],
"max_x": parcel_data["bbox"][2],
"max_y": parcel_data["bbox"][3],
} or None,
"geometry_geojson": _build_geometry_geojson(extracted, parcel_info),
}
new_parcel_response = {"parcel": parcel_info, "map_view": map_view}
if not is_parcel_adjacent_to_selection(new_parcel_response, selected_parcels):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Nur angrenzende Parzellen können hinzugefügt werden"
)
bbox = parcel_data.get("bbox", [])
map_view["zoom_bounds"] = {
"min_x": bbox[0], "min_y": bbox[1], "max_x": bbox[2], "max_y": bbox[3]
} if len(bbox) >= 4 else None
geocoded_address = parcel_data.get("geocoded_address")
if geocoded_address:
parcel_info["municipality_name"] = geocoded_address.get("municipality")
parcel_info["address"] = geocoded_address.get("full_address")
parcel_info["plz"] = geocoded_address.get("plz")
if centroid and attributes.get("ak"):
try:
oereb = OerebWfsConnector()
zone_results = await oereb.query_zone_layer(
egrid=attributes.get("egris_egrid", "") or "",
x=centroid["x"], y=centroid["y"],
canton=attributes.get("ak"),
geometry=geometry,
)
if zone_results and len(zone_results) > 0:
parcel_info["bauzone"] = zone_results[0].get("attributes", {}).get("typ_gde_abkuerzung")
except Exception as oe:
logger.debug(f"ÖREB zone query failed: {oe}")
return new_parcel_response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error adding adjacent parcel: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error adding adjacent parcel: {str(e)}"
)
@router.post("/projekt/{projekt_id}/add-parcel", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def add_parcel_to_project(
request: Request,
projekt_id: str = Path(..., description="Projekt ID"),
body: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Add a parcel to an existing project.
This endpoint can either:
1. Link an existing Parzelle to the Projekt
2. Create a new Parzelle from location data and link it
Request Body:
Option 1 - Link existing parcel:
{
"parcelId": "existing-parcel-id"
}
Option 2 - Create new parcel from location:
{
"location": "Hauptstrasse 42, 8000 Zuerich"
}
Option 3 - Create new parcel with custom data:
{
"parcelData": {
"label": "Parzelle 123",
"strasseNr": "Hauptstrasse 42",
"plz": "8000",
"bauzone": "W3",
...
}
}
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"projekt": {...}, // Updated Projekt
"parzelle": {...} // Parcel that was added
}
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for POST /api/realestate/projekt/{projekt_id}/add-parcel from user {context.user.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Validate CSRF token format
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
try:
int(csrf_token, 16)
except ValueError:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Adding parcel to project {projekt_id} for user {context.user.id} (mandate: {context.mandateId})")
# Get interface
realEstateInterface = getRealEstateInterface(context.user, mandateId=str(context.mandateId) if context.mandateId else None)
# Fetch existing Projekt - use mandateId from context
recordFilter = {"id": projekt_id}
if context.mandateId:
recordFilter["mandateId"] = str(context.mandateId)
projekte = realEstateInterface.getProjekte(recordFilter=recordFilter)
if not projekte:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Projekt {projekt_id} not found"
)
projekt = projekte[0]
# Determine which option was used
parcel_id = body.get("parcelId")
location = body.get("location")
parcel_data_dict = body.get("parcelData")
parzelle = None
# Option 1: Link existing parcel
if parcel_id:
logger.info(f"Linking existing parcel {parcel_id}")
parcelFilter = {"id": parcel_id}
if context.mandateId:
parcelFilter["mandateId"] = str(context.mandateId)
parcels = realEstateInterface.getParzellen(recordFilter=parcelFilter)
if not parcels:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Parzelle {parcel_id} not found"
)
parzelle = parcels[0]
# Option 2: Create from location
elif location:
logger.info(f"Creating parcel from location: {location}")
# Initialize connector and search for parcel
connector = SwissTopoMapServerConnector()
parcel_data = await connector.search_parcel(location)
if not parcel_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No parcel found at location: {location}"
)
# Extract attributes
extracted_attributes = connector.extract_parcel_attributes(parcel_data)
attributes = parcel_data.get("attributes", {})
# Create Parzelle with mandateId from context
parzelle_create_data = {
"mandateId": str(context.mandateId) if context.mandateId else None,
"label": extracted_attributes.get("label") or attributes.get("number") or "Unknown",
"parzellenAliasTags": [attributes.get("egris_egrid")] if attributes.get("egris_egrid") else [],
"eigentuemerschaft": None,
"strasseNr": location if not location.replace(",", "").replace(".", "").replace(" ", "").isdigit() else None,
"plz": None,
"perimeter": extracted_attributes.get("perimeter"),
"baulinie": None,
"kontextGemeinde": None,
"bauzone": None,
"az": None,
"bz": None,
"vollgeschossZahl": None,
"anrechenbarDachgeschoss": None,
"anrechenbarUntergeschoss": None,
"gebaeudehoeheMax": None,
"regelnGrenzabstand": [],
"regelnMehrlaengenzuschlag": [],
"regelnMehrhoehenzuschlag": [],
"parzelleBebaut": None,
"parzelleErschlossen": None,
"parzelleHanglage": None,
"laermschutzzone": None,
"hochwasserschutzzone": None,
"grundwasserschutzzone": None,
"parzellenNachbarschaft": [],
"dokumente": [],
"kontextInformationen": [
Kontext(
thema="Swiss Topo Data",
inhalt=json.dumps({
"egrid": attributes.get("egris_egrid"),
"identnd": attributes.get("identnd"),
"canton": attributes.get("ak"),
"municipality_code": attributes.get("bfsnr"),
"geoportal_url": attributes.get("geoportal_url")
}, ensure_ascii=False)
)
]
}
parzelle_instance = Parzelle(**parzelle_create_data)
parzelle = realEstateInterface.createParzelle(parzelle_instance)
# Option 3: Create from custom data
elif parcel_data_dict:
logger.info(f"Creating parcel from custom data")
parcel_data_dict["mandateId"] = str(context.mandateId) if context.mandateId else None
parzelle_instance = Parzelle(**parcel_data_dict)
parzelle = realEstateInterface.createParzelle(parzelle_instance)
else:
raise ValueError("One of 'parcelId', 'location', or 'parcelData' is required")
# Add parcel to project
if parzelle not in projekt.parzellen:
projekt.parzellen.append(parzelle)
# Update projekt perimeter if needed (use first parcel's perimeter)
if not projekt.perimeter and parzelle.perimeter:
projekt.perimeter = parzelle.perimeter
# Update Projekt
updated_projekt = realEstateInterface.updateProjekt(projekt)
logger.info(f"Added Parzelle {parzelle.id} to Projekt {projekt_id}")
return {
"projekt": updated_projekt.model_dump(),
"parzelle": parzelle.model_dump()
}
except ValueError as e:
logger.error(f"Validation error in add_parcel_to_project: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Validation error: {str(e)}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error adding parcel to project: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error adding parcel to project: {str(e)}"
)