724 lines
34 KiB
Python
724 lines
34 KiB
Python
# Copyright (c) 2026 PowerOn AG
|
|
# All rights reserved.
|
|
"""
|
|
Real Estate feature — BZO (Bau- und Zonenordnung) information extraction.
|
|
|
|
Handles extraction of BZO information from PDF documents, filtering rules/zones/articles
|
|
by Bauzone, and generating AI summaries for building zone regulations.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from fastapi import HTTPException, status
|
|
|
|
from modules.datamodels.datamodelUam import User
|
|
from .datamodelFeatureRealEstate import DokumentTyp
|
|
from modules.serviceCenter import getService
|
|
from modules.serviceCenter.context import ServiceCenterContext
|
|
from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface
|
|
from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface
|
|
from modules.features.realEstate.bzoDocumentRetriever import BZODocumentRetriever
|
|
from modules.features.realEstate.bzoExtraction import run_extraction, run_bzo_params_extraction
|
|
from modules.features.realEstate.parcelSelectionService import compute_selection_summary
|
|
from modules.features.realEstate.realEstateGemeindeService import (
|
|
ensure_single_gemeinde,
|
|
fetch_bzo_for_gemeinde,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# String spellings of the BZO document types accepted when a document's type
# is not a DokumentTyp enum member (both value-style and name-style).
_BZO_DOKUMENT_TYP_NAMES = frozenset({
    "gemeindeBzoAktuell",
    "gemeindeBzoRevision",
    "GEMEINDE_BZO_AKTUELL",
    "GEMEINDE_BZO_REVISION",
})

# Keys of an extraction result that are merged across all processed documents.
_BZO_EXTRACTION_KEYS = (
    "articles",
    "zones",
    "rules",
    "zone_parameter_tables",
    "errors",
    "warnings",
)


def _is_bzo_dokument_typ(doc_typ: Any) -> bool:
    """Return True if doc_typ denotes a current or revised BZO document."""
    if not doc_typ:
        return False
    if isinstance(doc_typ, DokumentTyp):
        return doc_typ in (
            DokumentTyp.GEMEINDE_BZO_AKTUELL,
            DokumentTyp.GEMEINDE_BZO_REVISION,
        )
    return str(doc_typ) in _BZO_DOKUMENT_TYP_NAMES


def _collect_bzo_documents(realEstateInterface: Any, gemeinde_obj: Any) -> List[Any]:
    """Load the full BZO documents referenced by a Gemeinde's ``dokumente`` list."""
    documents = []
    for doc in (gemeinde_obj.dokumente or []):
        # References may be plain dicts or model objects.
        if isinstance(doc, dict):
            doc_id = doc.get("id")
            doc_typ = doc.get("dokumentTyp")
        else:
            doc_id = getattr(doc, "id", None)
            doc_typ = getattr(doc, "dokumentTyp", None)

        if not doc_id or not _is_bzo_dokument_typ(doc_typ):
            continue

        full_doc = realEstateInterface.getDokument(doc_id)
        if full_doc:
            documents.append(full_doc)
        else:
            logger.warning(f"Document {doc_id} referenced in Gemeinde but not found in database")
    return documents


def _append_missing_sections(
    summary: str,
    relevant_zones: List[Dict[str, Any]],
    relevant_articles: List[Dict[str, Any]],
) -> str:
    """Append zone/article sections to the summary when the AI text omits them."""
    # Both sides of the containment checks are lower-cased; empty codes/labels
    # are ignored because "" is contained in every string.
    summary_lower = summary.lower()
    parts = [summary]

    zone_codes = [str(zone.get("zone_code") or "") for zone in relevant_zones]
    zones_mentioned = any(code and code.lower() in summary_lower for code in zone_codes)
    if not zones_mentioned and relevant_zones:
        parts.append("\n\n=== ZONENDEFINITIONEN ===\n")
        for zone in relevant_zones:
            zone_code = zone.get("zone_code", "")
            zone_name = zone.get("zone_name", "")
            zone_category = zone.get("zone_category", "")
            geschosszahl = zone.get("geschosszahl")
            gewerbeerleichterung = zone.get("gewerbeerleichterung", False)
            page_num = zone.get("page", 0)
            source_article = zone.get("source_article", "")

            lines = [f"{zone_code}: {zone_name}"]
            if zone_category:
                lines.append(f"Kategorie: {zone_category}")
            if geschosszahl:
                lines.append(f"Geschosszahl: {geschosszahl}")
            if gewerbeerleichterung:
                lines.append("Gewerbeerleichterung: Ja")
            if source_article:
                lines.append(f"Quelle: {source_article} (Seite {page_num})")
            parts.append("\n".join(lines) + "\n\n")

    article_labels = [str(article.get("article_label") or "") for article in relevant_articles]
    articles_mentioned = any(label and label.lower() in summary_lower for label in article_labels)
    if not articles_mentioned and relevant_articles:
        parts.append("\n\n=== RELEVANTE ARTIKEL ===\n")
        for article in relevant_articles:
            article_label = article.get("article_label", "")
            article_title = article.get("article_title", "")
            article_text = article.get("text", "")
            page_start = article.get("page_start", 0)
            page_end = article.get("page_end", 0)
            page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}"

            entry = f"{article_label}"
            if article_title:
                entry += f": {article_title}"
            entry += f" ({page_range})\n"
            if article_text:
                # Only a preview of long articles is appended.
                preview = article_text[:500] + "..." if len(article_text) > 500 else article_text
                entry += f"{preview}\n\n"
            parts.append(entry)

    return "".join(parts)


async def extract_bzo_information(
    currentUser: User,
    gemeinde: str,
    bauzone: str,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
    total_area_m2: Optional[float] = None,
    parcels: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde.

    Steps:
      1. Resolve the Gemeinde by ID, then by label; if still missing and both a
         mandate and feature instance are known, try ensure_single_gemeinde.
      2. Collect the Gemeinde's BZO documents; if none exist and both a mandate
         and feature instance are known, try fetch_bzo_for_gemeinde and re-read.
      3. Run run_extraction on every retrievable PDF and merge the results.
      4. Filter rules, zones and articles by Bauzone.
      5. Run run_bzo_params_extraction (failures become a warning, not an error).
      6. Generate the AI summary and append zone/article sections that the
         summary text does not already mention.

    Args:
        currentUser: Current authenticated user
        gemeinde: Gemeinde name (e.g., "Zürich") or ID
        bauzone: Bauzone code (e.g., "W3", "W2/30")
        mandateId: Optional mandate ID for instance-scoped data (defaults to currentUser.mandateId)
        featureInstanceId: Optional feature instance ID for instance-scoped data
        total_area_m2: Optional total parcel area (m²) passed to the parameter extraction
        parcels: Optional list of parcel dicts; total area is computed via
            compute_selection_summary when total_area_m2 is not given

    Returns:
        Dictionary containing:
            - bauzone, gemeinde, extracted_content, ai_summary, relevant_rules,
              documents_processed, errors, warnings
            - machbarkeitsstudie: result of run_bzo_params_extraction, or None if it failed

    Raises:
        HTTPException: 404 if the Gemeinde or its BZO documents cannot be found,
            500 for any other failure.
    """
    try:
        _mandateId = mandateId or (str(currentUser.mandateId) if currentUser.mandateId else None)
        logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id}, mandate: {_mandateId})")

        realEstateInterface = getRealEstateInterface(
            currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId
        )
        componentInterface = getComponentInterface(
            currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId
        )

        # --- 1. Resolve the Gemeinde -------------------------------------
        logger.debug(f"Attempting to retrieve Gemeinde '{gemeinde}' for mandate {_mandateId}")
        gemeinde_obj = realEstateInterface.getGemeinde(gemeinde)

        if not gemeinde_obj:
            logger.debug(f"Gemeinde not found by ID, trying to search by label: {gemeinde}")
            record_filter = {"label": gemeinde}
            if _mandateId:
                record_filter["mandateId"] = _mandateId
            gemeinden_by_label = realEstateInterface.getGemeinden(
                recordFilter=record_filter
            )
            if gemeinden_by_label:
                gemeinde_obj = gemeinden_by_label[0]
                logger.info(f"Found Gemeinde by label '{gemeinde}' with ID: {gemeinde_obj.id}")

        if not gemeinde_obj and _mandateId and featureInstanceId:
            logger.info(f"Gemeinde '{gemeinde}' not in DB - fetching from Swiss Topo (this Gemeinde only)")
            gemeinde_obj = await ensure_single_gemeinde(
                realEstateInterface, _mandateId, featureInstanceId, gemeinde_name=gemeinde
            )

        if not gemeinde_obj:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Gemeinde '{gemeinde}' not found or not accessible"
            )

        # --- 2. Collect BZO documents -------------------------------------
        bzo_documents = _collect_bzo_documents(realEstateInterface, gemeinde_obj)

        if not bzo_documents and _mandateId and featureInstanceId:
            logger.info(f"No BZO documents for Gemeinde '{gemeinde_obj.label}' - fetching from web")
            fetched = await fetch_bzo_for_gemeinde(
                realEstateInterface, componentInterface, gemeinde_obj, _mandateId, featureInstanceId
            )
            if fetched:
                refreshed = realEstateInterface.getGemeinde(gemeinde_obj.id)
                # Keep the previous object if the re-read fails so the 404
                # below can still report the Gemeinde label.
                if refreshed:
                    gemeinde_obj = refreshed
                    bzo_documents = _collect_bzo_documents(realEstateInterface, gemeinde_obj)

        if not bzo_documents:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No BZO documents found for Gemeinde '{gemeinde_obj.label}'"
            )

        logger.info(f"Found {len(bzo_documents)} BZO document(s) for Gemeinde '{gemeinde_obj.label}'")

        # --- 3. Extract content from every document -----------------------
        document_retriever = BZODocumentRetriever(realEstateInterface, componentInterface)

        all_extracted_content: Dict[str, Any] = {key: [] for key in _BZO_EXTRACTION_KEYS}
        documents_processed = []

        for dokument in bzo_documents:
            # A failure in one document must not abort the others.
            try:
                logger.info(f"Processing document {dokument.id}: {dokument.label}")

                pdf_bytes = document_retriever.retrieve_pdf_content(dokument)
                if not pdf_bytes:
                    logger.warning(f"Could not retrieve PDF content for dokument {dokument.id}")
                    all_extracted_content["warnings"].append(
                        f"Could not retrieve PDF content for document '{dokument.label}'"
                    )
                    continue

                extraction_result = run_extraction(
                    pdf_bytes=pdf_bytes,
                    pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}",
                    dokument_id=dokument.id
                )

                for key in _BZO_EXTRACTION_KEYS:
                    all_extracted_content[key].extend(extraction_result.get(key, []))

                dokument_typ = dokument.dokumentTyp
                documents_processed.append({
                    "id": dokument.id,
                    "label": dokument.label,
                    # Tolerate a plain-string type as well as an enum member.
                    "dokumentTyp": getattr(dokument_typ, "value", dokument_typ) if dokument_typ else None
                })

            except Exception as e:
                logger.error(f"Error processing document {dokument.id}: {str(e)}", exc_info=True)
                all_extracted_content["errors"].append(
                    f"Error processing document '{dokument.label}': {str(e)}"
                )
                continue

        # --- 4. Filter by Bauzone -----------------------------------------
        relevant_rules = filter_rules_by_bauzone(
            all_extracted_content["rules"],
            bauzone
        )
        tables_with_zone = sum(
            1
            for t in all_extracted_content.get("zone_parameter_tables", [])
            if bauzone.upper() in str(t.get("zones", [])).upper()
        )
        logger.info(f"Extracting for Bauzone {bauzone}: {len(relevant_rules)} zone-specific rules, "
                    f"{tables_with_zone} tables with zone data")

        relevant_zones = filter_zones_by_bauzone(
            all_extracted_content["zones"],
            bauzone
        )

        relevant_articles = filter_articles_by_bauzone(
            all_extracted_content.get("articles", []),
            bauzone
        )

        # --- 5. Structured parameter extraction ---------------------------
        _total_area_m2 = total_area_m2
        if _total_area_m2 is None and parcels:
            selection_summary = compute_selection_summary(parcels)
            _total_area_m2 = selection_summary.get("total_area_m2") or 0.0

        bzo_params_result = None
        try:
            ctx = ServiceCenterContext(user=currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId)
            ai_service = getService("ai", ctx)
            bzo_params_result = await run_bzo_params_extraction(
                extracted_content=all_extracted_content,
                bauzone=bauzone,
                ai_service=ai_service,
                gemeinde=gemeinde_obj.label,
                relevant_rules=relevant_rules,
                relevant_articles=relevant_articles,
                total_area_m2=_total_area_m2,
            )
        except Exception as me:
            # Best effort: the rest of the response is still useful without it.
            logger.warning(f"BZO parameter extraction failed: {me}", exc_info=True)
            all_extracted_content["warnings"] = all_extracted_content.get("warnings", []) + [
                f"BZO-Parameter konnten nicht extrahiert werden: {str(me)}"
            ]

        # --- 6. AI summary ------------------------------------------------
        ai_summary = await generate_bauzone_ai_summary(
            currentUser=currentUser,
            bauzone=bauzone,
            gemeinde=gemeinde_obj.label,
            extracted_content=all_extracted_content,
            relevant_rules=relevant_rules,
            relevant_zones=relevant_zones,
            mandateId=_mandateId,
            featureInstanceId=featureInstanceId,
        )

        unified_summary = _append_missing_sections(ai_summary, relevant_zones, relevant_articles)

        return {
            "bauzone": bauzone,
            "gemeinde": {
                "id": gemeinde_obj.id,
                "label": gemeinde_obj.label,
                "plz": gemeinde_obj.plz
            },
            "extracted_content": {
                "zones": relevant_zones,
                "rules": relevant_rules,
                "articles": relevant_articles,
                "zone_parameter_tables": _filter_tables_by_bauzone(
                    all_extracted_content.get("zone_parameter_tables", []),
                    bauzone
                ),
                "total_zones": len(all_extracted_content.get("zones", [])),
                "total_rules": len(all_extracted_content.get("rules", [])),
                "total_articles": len(all_extracted_content.get("articles", [])),
                "total_tables": len(all_extracted_content.get("zone_parameter_tables", []))
            },
            "ai_summary": unified_summary,
            "relevant_rules": relevant_rules,
            "documents_processed": documents_processed,
            "errors": all_extracted_content.get("errors", []),
            "warnings": all_extracted_content.get("warnings", []),
            "machbarkeitsstudie": bzo_params_result,
        }

    except HTTPException:
        # Deliberate HTTP errors (404s above) pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error extracting BZO information: {str(e)}"
        ) from e
|
|
|
|
|
|
def filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Select the rules that can be attributed to the given Bauzone.

    A rule is considered only if it carries zone information ("zone_raw" or a
    non-empty "table_zones"). A rule listing several table zones is kept only
    when every listed zone matches the Bauzone, because values in a multi-zone
    article cannot be tied to one zone from the text alone. The rule is then
    accepted if "zone_raw" matches, any table zone matches, or (for rules with
    at most one table zone) the Bauzone code occurs in "text_snippet".

    Zone comparison is case-insensitive substring matching in either
    direction; the reverse direction requires a candidate of at least two
    characters.

    Args:
        rules: List of rule dictionaries from extraction
        bauzone: Bauzone code to filter by

    Returns:
        Rules attributed to the Bauzone, in their original order
    """
    wanted = bauzone.upper()

    def _is_match(candidate: str) -> bool:
        normalized = (candidate or "").upper().strip()
        if not normalized:
            return False
        return wanted in normalized or (normalized in wanted and len(normalized) >= 2)

    selected = []
    for candidate_rule in rules:
        zone_list = candidate_rule.get("table_zones", []) or []
        raw_zone = candidate_rule.get("zone_raw")

        # Rules without any zone information cannot be attributed.
        if not raw_zone and not zone_list:
            continue

        # Multi-zone rules must match on every listed zone.
        if len(zone_list) > 1 and not all(_is_match(str(entry)) for entry in zone_list):
            continue

        is_relevant = bool(raw_zone) and _is_match(raw_zone)
        if not is_relevant and zone_list:
            is_relevant = any(_is_match(str(entry)) for entry in zone_list)
        if not is_relevant:
            # Last resort: the Bauzone code appears in the rule's text snippet.
            snippet = (candidate_rule.get("text_snippet") or "").upper()
            is_relevant = wanted in snippet and len(zone_list) <= 1

        if is_relevant:
            selected.append(candidate_rule)

    logger.info(f"Filtered {len(selected)} rules for Bauzone {bauzone} from {len(rules)} total (multi-zone articles excluded)")
    return selected
|
|
|
|
|
|
def filter_zones_by_bauzone(zones: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Filter zones by Bauzone code.

    A zone matches when the Bauzone code occurs (case-insensitively) as a
    substring of its "zone_code". A missing, None or non-string "zone_code"
    is treated as text instead of raising.

    Args:
        zones: List of zone dictionaries from extraction
        bauzone: Bauzone code to filter by

    Returns:
        Filtered list of zones that match the Bauzone
    """
    bauzone_upper = bauzone.upper()

    relevant_zones = [
        zone
        for zone in zones
        # `or ""` covers a key that is present with a None value.
        if bauzone_upper in str(zone.get("zone_code") or "").upper()
    ]

    logger.info(f"Filtered {len(relevant_zones)} zones for Bauzone {bauzone} from {len(zones)} total zones")
    return relevant_zones
|
|
|
|
|
|
def filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Return the articles that mention the Bauzone.

    An article is kept when the Bauzone code occurs (case-insensitively) in
    its "text" or in its "zone_raw" value.

    Args:
        articles: List of article dictionaries from extraction
        bauzone: Bauzone code to filter by

    Returns:
        Filtered list of articles that mention the Bauzone
    """
    needle = bauzone.upper()

    def _mentions(value: Any) -> bool:
        # Empty or missing values never match.
        if not value:
            return False
        return needle in value.upper()

    matched = []
    for entry in articles:
        # Both fields are evaluated before combining, as in a plain two-step check.
        found_in_text = _mentions(entry.get("text", ""))
        found_in_zone = _mentions(entry.get("zone_raw"))
        if found_in_text or found_in_zone:
            matched.append(entry)

    logger.info(f"Filtered {len(matched)} articles for Bauzone {bauzone} from {len(articles)} total articles")
    return matched
|
|
|
|
|
|
def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Reduce zone-parameter tables to the columns belonging to the Bauzone.

    For every table whose "zones" contain a case-insensitive substring match,
    a new table dict with "page", the matching "zones" and the "parameters"
    narrowed to those zones is built. Parameters without a value for any
    matching zone, and tables left without parameters, are dropped.

    Args:
        tables: List of zone-parameter table dictionaries
        bauzone: Bauzone code to filter by

    Returns:
        Filtered list of tables containing the Bauzone
    """
    needle = bauzone.upper()
    kept = []

    for source_table in tables:
        hits = [name for name in source_table.get("zones", []) if needle in str(name).upper()]
        if not hits:
            continue

        narrowed_params = []
        for entry in source_table.get("parameters", []):
            per_zone = entry.get("values_by_zone", {})
            subset = {name: per_zone[name] for name in hits if name in per_zone}
            if subset:
                narrowed_params.append({
                    "parameter": entry.get("parameter"),
                    "values_by_zone": subset
                })

        if narrowed_params:
            kept.append({
                "page": source_table.get("page"),
                "zones": hits,
                "parameters": narrowed_params
            })

    logger.info(f"Filtered {len(kept)} tables for Bauzone {bauzone} from {len(tables)} total tables")
    return kept
|
|
|
|
|
|
# Rules for normalising table parameter names. Each rule is
# (keyword_groups, label): the label applies when, for every group, at least
# one of its keywords occurs in the lower-cased parameter name. First match wins.
_BZO_PARAMETER_LABEL_RULES = (
    ((("ausnützungsziffer",),), "Ausnützungsziffer max."),
    ((("vollgeschosse",),), "Vollgeschosse max."),
    ((("gebäudelänge",),), "Gebäudelänge max."),
    ((("grenzabstand",), ("grundabstand",)), "Grenzabstand - Grundabstand min."),
    ((("grenzabstand",), ("mehrlängen",)), "Grenzabstand - Mehrlängen-zuschlag"),
    ((("grenzabstand",), ("höchstmass", "höchstmaß")), "Grenzabstand - Höchstmass max."),
    ((("fassadenhöhen",),), "Fassadenhöhen max."),
    ((("dachgeschosse",),), "anrechenbare Dachgeschosse max."),
    ((("attikageschoss",),), "anrechenbares Attikageschoss max."),
    ((("untergeschoss",),), "anrechenbares Untergeschoss max."),
)


def _normalize_bzo_parameter_label(param_name: str) -> str:
    """Map a raw table parameter name to its canonical label, if one applies."""
    lowered = param_name.lower()
    for keyword_groups, label in _BZO_PARAMETER_LABEL_RULES:
        if all(any(keyword in lowered for keyword in group) for group in keyword_groups):
            return label
    return param_name


def _format_bzo_table_value(val_entry: Dict[str, Any]) -> str:
    """Render one table value entry as '<value>' or '<value> <unit>'."""
    value = val_entry.get("value", "")
    unit = val_entry.get("unit", "")
    unit_str = f" {unit}" if unit else ""
    return f"{value}{unit_str}"


def _build_bzo_summary_context(
    bauzone: str,
    extracted_content: Dict[str, Any],
    relevant_rules: List[Dict[str, Any]],
    relevant_zones: List[Dict[str, Any]],
) -> str:
    """Assemble the text context (tables, zones, articles, rules) for the AI prompt."""
    context_parts: List[str] = []
    bauzone_upper = bauzone.upper()
    zone_parameter_tables = extracted_content.get("zone_parameter_tables", []) or []

    # --- Table values for the Bauzone -------------------------------------
    if zone_parameter_tables:
        context_parts.append("=== BUILDING REGULATIONS TABLE VALUES FOR BAUZONE (INCLUDE THESE EXACT VALUES IN YOUR SUMMARY) ===")
        for table in zone_parameter_tables:
            page_num = table.get("page", 0)
            article_ref = table.get("article", "Unknown article")
            zones_in_table = table.get("zones", []) or []

            if not any(bauzone_upper in str(z).upper() for z in zones_in_table):
                continue

            context_parts.append(f"\nTabelle aus {article_ref} (Seite {page_num}):")

            for param in table.get("parameters", []) or []:
                formatted_param = _normalize_bzo_parameter_label(str(param.get("parameter") or ""))
                values_by_zone = param.get("values_by_zone") or {}

                for zone, values in values_by_zone.items():
                    # str() keeps non-string zone keys from raising.
                    if bauzone_upper not in str(zone).upper():
                        continue
                    if not isinstance(values, list) or not values:
                        continue

                    primary, *alternatives = values
                    context_parts.append(
                        f" • {formatted_param}: {_format_bzo_table_value(primary)} (Quelle: {article_ref}, Seite {page_num})"
                    )
                    for val_entry in alternatives:
                        context_parts.append(f" (Alternative: {_format_bzo_table_value(val_entry)})")

    # --- Zone definitions ---------------------------------------------------
    if relevant_zones:
        context_parts.append("\n=== ZONE DEFINITIONS ===")
        for zone in relevant_zones:
            zone_code = zone.get("zone_code", "")
            zone_name = zone.get("zone_name", "")
            zone_category = zone.get("zone_category", "")
            geschosszahl = zone.get("geschosszahl")
            gewerbeerleichterung = zone.get("gewerbeerleichterung", False)
            page_num = zone.get("page", 0)
            source_article = zone.get("source_article", "")

            zone_info = f"- {zone_code}: {zone_name}"
            if zone_category:
                zone_info += f" (Kategorie: {zone_category})"
            if geschosszahl:
                zone_info += f", Geschosszahl: {geschosszahl}"
            if gewerbeerleichterung:
                zone_info += ", Gewerbeerleichterung: Ja"
            if source_article:
                zone_info += f" - Quelle: {source_article} (Seite {page_num})"
            context_parts.append(zone_info)

    # --- Articles mentioning the Bauzone -------------------------------------
    relevant_articles = filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone)
    if relevant_articles:
        context_parts.append("\n=== RELEVANT ARTICLES (full content) ===")
        for article in relevant_articles:
            article_label = article.get("article_label", "")
            article_title = article.get("article_title", "")
            # An article matched only via "zone_raw" may have no text at all.
            article_text = article.get("text") or ""
            page_start = article.get("page_start", 0)
            page_end = article.get("page_end", 0)
            page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}"

            context_parts.append(f"\n{article_label}: {article_title or 'Kein Titel'}")
            context_parts.append(f"Lage: {page_range}")
            if len(article_text) > 1000:
                # Long articles are truncated to keep the prompt bounded.
                context_parts.append(f"Inhalt: {article_text[:1000]}...")
            else:
                context_parts.append(f"Inhalt: {article_text}")

    # --- Text rules not already covered by a table parameter -----------------
    if relevant_rules:
        # Empty names are skipped: "" is a substring of every rule type and
        # would otherwise mark every rule as a duplicate.
        table_parameter_names = {
            name.lower()
            for table in zone_parameter_tables
            for name in (str(param.get("parameter") or "") for param in table.get("parameters", []) or [])
            if name
        }

        unique_rules = []
        for rule in relevant_rules[:15]:
            rule_type = str(rule.get("rule_type") or "").lower()
            if not any(tp in rule_type for tp in table_parameter_names):
                unique_rules.append(rule)

        if unique_rules:
            context_parts.append("\n=== ADDITIONAL BUILDING REGULATIONS (from text) ===")
            for rule in unique_rules[:8]:
                rule_type = rule.get("rule_type", "")
                value_numeric = rule.get("value_numeric")
                unit = rule.get("unit", "")
                page_num = rule.get("page", 0)

                rule_desc = f"- {rule_type}: "
                if value_numeric is not None:
                    rule_desc += f"{value_numeric}"
                    if unit:
                        rule_desc += f" {unit}"
                else:
                    # value_text may be present but None.
                    rule_desc += str(rule.get("value_text") or "")
                rule_desc += f" (Seite {page_num})"

                context_parts.append(rule_desc)

    return "\n".join(context_parts)


def _build_bzo_summary_prompt(gemeinde: str, bauzone: str, context: str) -> str:
    """Build the instruction prompt sent to the AI service."""
    return f"""
Analyze the following building zone (Bauzone) information extracted from BZO (Bau- und Zonenordnung) documents for {gemeinde}, specifically for Bauzone {bauzone}.

Extracted Content:
{context}

CRITICAL INSTRUCTIONS:
1. You MUST include ALL actual values from the tables in your summary - do NOT just say "see tables on page X"
2. List ALL parameters with their actual values: Ausnützungsziffer, Vollgeschosse, Gebäudelänge, Grenzabstand (Grundabstand, Mehrlängen-zuschlag, Höchstmass), Fassadenhöhen, etc.
3. Integrate zone definitions and article information INTO the summary text - do NOT create separate sections
4. Always cite WHERE each piece of information was found (article number and page number)
5. Combine everything into ONE unified, flowing summary - no separate sections for zones/articles
6. Be comprehensive - include all relevant details from zones, articles, and tables
7. Format as a single, well-structured German text document

Please provide a comprehensive, unified summary that includes:

1. General description of Bauzone {bauzone}:
- Zone category (Wohnzonen, Zentrumszonen, etc.)
- Geschosszahl (number of full storeys)
- Gewerbeerleichterung status (Ja/Nein)
- Where defined (article and page number)

2. ALL building regulations with ACTUAL VALUES from tables (you MUST include the exact values):
- Ausnützungsziffer max.: [ACTUAL PERCENTAGE VALUE]% (from article, page)
- Vollgeschosse max.: [ACTUAL NUMBER] (from article, page)
- anrechenbare Dachgeschosse max.: [ACTUAL NUMBER] (from article, page)
- anrechenbares Attikageschoss max.: [ACTUAL NUMBER] (from article, page)
- anrechenbares Untergeschoss max.: [ACTUAL NUMBER] (from article, page)
- Gebäudelänge max.: [ACTUAL VALUE] m (from article, page)
- Grenzabstand - Grundabstand min.: [ACTUAL VALUE] m (from article, page)
- Grenzabstand - Mehrlängen-zuschlag: [ACTUAL FRACTION] (from article, page)
- Grenzabstand - Höchstmass max.: [ACTUAL VALUE] m (from article, page)
- Fassadenhöhen max.: [ACTUAL VALUE] m (from article, page, include footnote values if present)

3. Zone definitions: Integrate information about where this zone is defined (which articles mention it, with page numbers)

4. Relevant articles: Integrate key content from relevant articles naturally into the summary, citing article numbers and page numbers

5. Special conditions: Any special requirements or exceptions mentioned in articles

CRITICAL: You MUST include the actual numeric values from the tables in your summary. Do NOT say "see tables" - list the actual values. Format everything as ONE unified, flowing German text document without separate sections. Integrate zones and articles naturally into the narrative.
"""


async def generate_bauzone_ai_summary(
    currentUser: User,
    bauzone: str,
    gemeinde: str,
    extracted_content: Dict[str, Any],
    relevant_rules: List[Dict[str, Any]],
    relevant_zones: List[Dict[str, Any]],
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
) -> str:
    """
    Use AI to generate a summary of relevant information for a Bauzone.

    Builds a text context from the zone-parameter tables, zone definitions,
    articles mentioning the Bauzone and up to eight text rules that are not
    already covered by a table parameter, then sends it with a fixed
    instruction prompt to the "ai" service (callAiPlanning).

    Args:
        currentUser: Current authenticated user
        bauzone: Bauzone code
        gemeinde: Gemeinde name
        extracted_content: All extracted content from PDFs
        relevant_rules: Rules filtered by Bauzone
        relevant_zones: Zones filtered by Bauzone
        mandateId: Optional mandate ID used for the service context
        featureInstanceId: Optional feature instance ID used for the service context

    Returns:
        The stripped AI response. On any failure, a fallback message
        ("Summary generation failed: ...") is returned instead of raising.
    """
    try:
        ctx = ServiceCenterContext(user=currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
        aiService = getService("ai", ctx)

        context = _build_bzo_summary_context(bauzone, extracted_content, relevant_rules, relevant_zones)
        prompt = _build_bzo_summary_prompt(gemeinde, bauzone, context)

        logger.info(f"Generating AI summary for Bauzone {bauzone} in {gemeinde}")
        # NOTE(review): assumes callAiPlanning returns a str - confirm against the AI service.
        ai_response = await aiService.callAiPlanning(
            prompt=prompt,
            debugType="bzo_summary"
        )

        return ai_response.strip()

    except Exception as e:
        # Deliberate best effort: callers always receive a string.
        logger.error(f"Error generating AI summary: {str(e)}", exc_info=True)
        return f"Summary generation failed: {str(e)}. Found {len(relevant_rules)} relevant rules and {len(relevant_zones)} zones for Bauzone {bauzone}."
|