feat: extract bzo information

This commit is contained in:
Ida Dittrich 2026-01-27 14:21:19 +01:00
parent 238ff61740
commit c2f2ed3b14
4 changed files with 1811 additions and 37 deletions

13
app.py
View file

@ -286,6 +286,7 @@ instanceLabel = APP_CONFIG.get("APP_ENV_LABEL")
async def lifespan(app: FastAPI):
logger.info("Application is starting up")
<<<<<<< HEAD
# --- Register RBAC catalog for features (moved here from loadFeatureRouters for single-pass loading) ---
try:
from modules.security.rbacCatalog import getCatalogService
@ -294,6 +295,18 @@ async def lifespan(app: FastAPI):
registerAllFeaturesInCatalog(catalogService)
except Exception as e:
logger.warning(f"Could not register feature RBAC catalog: {e}")
=======
# Bootstrap database if needed (creates initial users, mandates, roles, etc.)
# This must happen before getting root interface
from modules.security.rootAccess import getRootDbAppConnector
from modules.interfaces.interfaceBootstrap import initBootstrap
rootDb = getRootDbAppConnector()
try:
initBootstrap(rootDb)
logger.info("Bootstrap check completed")
except Exception as e:
logger.warning(f"Bootstrap check failed (may already be initialized): {str(e)}")
>>>>>>> f6f41e2 (feat: extract bzo information)
# Get event user for feature lifecycle (system-level user for background operations)
rootInterface = getRootInterface()

View file

@ -103,6 +103,9 @@ class BZOExtractionState(TypedDict):
rule_candidates: List[Dict[str, Any]]
parsed_rules: List[Dict[str, Any]]
# Zone-parameter tables (structured table data mapping zones to parameters)
zone_parameter_tables: List[Dict[str, Any]]
# Processing metadata
errors: List[str]
warnings: List[str]
@ -516,6 +519,322 @@ def confidence_scoring(state: BZOExtractionState) -> BZOExtractionState:
return state
def extract_zone_parameter_tables(state: BZOExtractionState) -> BZOExtractionState:
    """
    Extract structured zone-parameter mappings from tables.

    Parses tables that map building zones to parameter values (e.g., Ausnützungsziffer,
    Vollgeschosse, Gebäudelänge, Grenzabstand, Fassadenhöhen).

    Args:
        state: Extraction state; reads "classified_blocks" and appends parsed
            tables to "zone_parameter_tables".

    Returns:
        The mutated state. On unexpected failure the error message is appended
        to state["errors"] and the state is returned as-is.
    """
    try:
        import re
        tables = []
        # Find all table blocks
        table_blocks = [
            block for block in state.get("classified_blocks", [])
            if block.get("block_type") == "table"
        ]
        logger.info(f"Found {len(table_blocks)} table blocks to process")
        for table_block in table_blocks:
            block_dict = table_block.get("block", {})
            text = block_dict.get("text", "")
            page = block_dict.get("page", 0)
            if not text or len(text.strip()) < 20:  # Skip very short blocks
                continue
            # Try to parse table structure
            # Look for zone codes in header row (W2/30, W3/50, W4/70G*, etc.)
            zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b'
            lines = text.split('\n')
            # Find header row (usually contains zone codes)
            header_row_idx = None
            zone_columns = []
            for idx, line in enumerate(lines):
                # Look for multiple zone codes in a line (header row)
                zone_matches = re.findall(zone_pattern, line, re.IGNORECASE)
                if len(zone_matches) >= 3:  # At least 3 zones indicates header row
                    header_row_idx = idx
                    zone_columns = zone_matches
                    logger.debug(f"Found header row at line {idx} with zones: {zone_columns}")
                    break
            # BUGFIX: explicit None check. Index 0 is falsy, so a header row found
            # on the first line previously fell through into the fallback path.
            if header_row_idx is None or not zone_columns:
                # Try alternative: look for common table patterns
                # Check if text contains parameter names and zone codes
                parameter_keywords = [
                    r'Ausnützungsziffer',
                    r'Vollgeschosse',
                    r'Dachgeschosse',
                    r'Attikageschoss',
                    r'Untergeschoss',
                    r'Gebäudelänge',
                    r'Grenzabstand',
                    r'Fassadenhöhen',
                    r'Grundabstand',
                    r'Mehrlängen',
                    r'Höchstmass'
                ]
                has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords)
                has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3
                if has_parameters and has_zones:
                    # Extract all zones from entire text
                    all_zones = re.findall(zone_pattern, text, re.IGNORECASE)
                    zone_columns = list(dict.fromkeys(all_zones))  # Remove duplicates, preserve order
                    header_row_idx = 0  # Assume header is at start
                    logger.debug(f"Found zones in table text: {zone_columns}")
            if not zone_columns:
                continue
            # Parse parameter rows
            table_data = {
                "page": page,
                "zones": zone_columns,
                "parameters": [],
                "source_text": text[:500],  # Store first 500 chars for reference
                "article": None  # Will be set if found
            }
            # Extract parameters and their values
            # Look for parameter rows (a), b), c), etc. or parameter names
            parameter_row_patterns = [
                r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?',  # a) Parameter name
                r'^(Ausnützungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)',
            ]
            # Parse each line after header
            start_idx = header_row_idx + 1 if header_row_idx is not None else 0
            current_parameter = None
            current_subparameter = None
            parameter_values = {}
            subparameter_values = {}
            # Track which article/section this table belongs to (first labelled
            # block on the same page wins).
            article_context = None
            for block in state.get("classified_blocks", []):
                if block.get("block", {}).get("page") == page:
                    article_label = block.get("article_label")
                    if article_label:
                        article_context = article_label
                        break
            for line_idx in range(start_idx, len(lines)):
                line = lines[line_idx].strip()
                if not line:
                    continue
                # Check if this is a parameter row (main parameter like a), b), c))
                is_parameter_row = False
                parameter_name = None
                for pattern in parameter_row_patterns:
                    match = re.match(pattern, line, re.IGNORECASE)
                    if match:
                        is_parameter_row = True
                        parameter_name = match.group(1).strip()
                        # Clean up parameter name: drop trailing "max."/"min." qualifiers
                        parameter_name = re.sub(r'\s+max\.?\s*$', '', parameter_name, flags=re.IGNORECASE)
                        parameter_name = re.sub(r'\s+min\.?\s*$', '', parameter_name, flags=re.IGNORECASE)
                        break
                # Check for sub-parameters (like "Grundabstand min.", "Mehrlängen-zuschlag", "Höchstmass max.")
                is_subparameter = False
                subparameter_name = None
                if not is_parameter_row:
                    subparameter_patterns = [
                        r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?',
                        r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)',
                    ]
                    for pattern in subparameter_patterns:
                        match = re.search(pattern, line, re.IGNORECASE)
                        if match:
                            is_subparameter = True
                            subparameter_name = match.group(1).strip()
                            if match.lastindex > 1 and match.group(2):
                                subparameter_name += f" {match.group(2).strip()}"
                            break
                if is_parameter_row and parameter_name:
                    # Save previous parameter if exists
                    if current_parameter and parameter_values:
                        param_entry = {
                            "parameter": current_parameter,
                            "values_by_zone": parameter_values.copy()
                        }
                        if article_context:
                            param_entry["article"] = article_context
                        table_data["parameters"].append(param_entry)
                    # Start new parameter
                    current_parameter = parameter_name
                    current_subparameter = None
                    parameter_values = {}
                    subparameter_values = {}
                    continue
                if is_subparameter and subparameter_name:
                    # Save previous subparameter if exists
                    if current_subparameter and subparameter_values:
                        if current_parameter:
                            # Add subparameter as nested parameter
                            param_entry = {
                                "parameter": f"{current_parameter} - {current_subparameter}",
                                "values_by_zone": subparameter_values.copy()
                            }
                            if article_context:
                                param_entry["article"] = article_context
                            table_data["parameters"].append(param_entry)
                    current_subparameter = subparameter_name
                    subparameter_values = {}
                    continue
                # Try to extract values for current parameter or subparameter
                target_values = subparameter_values if current_subparameter else parameter_values
                if current_parameter or current_subparameter:
                    # Improved parsing: try to align values with zone columns.
                    # Split line by multiple spaces or tabs (table column separators)
                    line_parts = re.split(r'\s{2,}|\t', line)
                    line_parts = [p.strip() for p in line_parts if p.strip()]
                    # Look for numeric values with units
                    numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?'
                    all_matches = list(re.finditer(numeric_pattern, line, re.IGNORECASE))
                    # Also look for fractions (like 1/3)
                    fraction_pattern = r'(\d+/\d+)'
                    fraction_matches = list(re.finditer(fraction_pattern, line, re.IGNORECASE))
                    # Combine all matches, preserving position
                    all_value_matches = []
                    for m in all_matches:
                        value = m.group(1)
                        unit = m.group(2) if m.lastindex > 1 else None
                        all_value_matches.append((m.start(), m.group(0), value, unit))
                    for m in fraction_matches:
                        all_value_matches.append((m.start(), m.group(0), m.group(0), None))
                    all_value_matches.sort(key=lambda x: x[0])
                    # Try to map values to zones.
                    # Strategy: if we have roughly the same number of values as zones,
                    # map 1:1; otherwise distribute or fall back by position.
                    if len(all_value_matches) > 0 and len(zone_columns) > 0:
                        if len(all_value_matches) == len(zone_columns):
                            # Perfect 1:1 mapping
                            for zone_idx, zone in enumerate(zone_columns):
                                if zone_idx < len(all_value_matches):
                                    _, full_match, value, unit = all_value_matches[zone_idx]
                                    if zone not in target_values:
                                        target_values[zone] = []
                                    target_values[zone].append({
                                        "value": value,
                                        "unit": unit.strip() if unit else None,
                                        "raw_text": line[:200],
                                        "line_number": line_idx
                                    })
                        elif len(all_value_matches) >= len(zone_columns):
                            # More values than zones - try to group
                            values_per_zone = len(all_value_matches) / len(zone_columns)
                            for zone_idx, zone in enumerate(zone_columns):
                                # Renamed from start_idx/end_idx so the outer
                                # line-scan start_idx is no longer shadowed.
                                seg_start = int(zone_idx * values_per_zone)
                                seg_end = int((zone_idx + 1) * values_per_zone)
                                zone_values = all_value_matches[seg_start:seg_end]
                                if zone_values:
                                    if zone not in target_values:
                                        target_values[zone] = []
                                    # Take the first (or most relevant) value
                                    _, full_match, value, unit = zone_values[0]
                                    target_values[zone].append({
                                        "value": value,
                                        "unit": unit.strip() if unit else None,
                                        "raw_text": line[:200],
                                        "line_number": line_idx
                                    })
                        else:
                            # Fewer values than zones - try to match by position.
                            # Use line_parts if they align better
                            if len(line_parts) >= len(zone_columns) * 0.7:
                                # Try to extract values from line_parts
                                for zone_idx, zone in enumerate(zone_columns):
                                    if zone_idx < len(line_parts):
                                        part = line_parts[zone_idx]
                                        # Extract numeric value from this part
                                        num_match = re.search(r'(\d+(?:\.\d+)?)', part)
                                        if num_match:
                                            value = num_match.group(1)
                                            unit_match = re.search(r'(%|m|Geschoss)', part, re.IGNORECASE)
                                            unit = unit_match.group(0) if unit_match else None
                                            if zone not in target_values:
                                                target_values[zone] = []
                                            target_values[zone].append({
                                                "value": value,
                                                "unit": unit,
                                                "raw_text": part[:100],
                                                "line_number": line_idx
                                            })
                            else:
                                # Fallback: assign to first zone(s)
                                for idx, (_, full_match, value, unit) in enumerate(all_value_matches):
                                    if idx < len(zone_columns):
                                        zone = zone_columns[idx]
                                        if zone not in target_values:
                                            target_values[zone] = []
                                        target_values[zone].append({
                                            "value": value,
                                            "unit": unit.strip() if unit else None,
                                            "raw_text": line[:200],
                                            "line_number": line_idx
                                        })
            # Save last parameter/subparameter
            if current_subparameter and subparameter_values:
                if current_parameter:
                    param_entry = {
                        "parameter": f"{current_parameter} - {current_subparameter}",
                        "values_by_zone": subparameter_values.copy()
                    }
                    if article_context:
                        param_entry["article"] = article_context
                    table_data["parameters"].append(param_entry)
            if current_parameter and parameter_values:
                param_entry = {
                    "parameter": current_parameter,
                    "values_by_zone": parameter_values.copy()
                }
                if article_context:
                    param_entry["article"] = article_context
                table_data["parameters"].append(param_entry)
            if table_data["parameters"]:
                tables.append(table_data)
                logger.info(f"Extracted table with {len(table_data['zones'])} zones and {len(table_data['parameters'])} parameters from page {page}")
        # Update state
        existing_tables = state.get("zone_parameter_tables", [])
        state["zone_parameter_tables"] = existing_tables + tables
        logger.info(f"Extracted {len(tables)} zone-parameter tables total")
        return state
    except Exception as e:
        logger.error(f"Error extracting zone-parameter tables: {e}", exc_info=True)
        state["errors"] = state.get("errors", []) + [f"Table extraction error: {str(e)}"]
        return state
# ===== Graph Construction =====
@ -529,6 +848,7 @@ def create_bzo_extraction_graph():
workflow.add_node("classify_text_block", classify_text_block)
workflow.add_node("assemble_articles", assemble_articles)
workflow.add_node("detect_zone_changes", detect_zone_changes)
workflow.add_node("extract_zone_parameter_tables", extract_zone_parameter_tables)
workflow.add_node("detect_rule_candidates", detect_rule_candidates)
workflow.add_node("parse_rule_values", parse_rule_values)
workflow.add_node("assign_zone_and_scope", assign_zone_and_scope)
@ -539,7 +859,8 @@ def create_bzo_extraction_graph():
workflow.add_edge("extract_pdf_text", "classify_text_block")
workflow.add_edge("classify_text_block", "assemble_articles")
workflow.add_edge("assemble_articles", "detect_zone_changes")
workflow.add_edge("detect_zone_changes", "detect_rule_candidates")
workflow.add_edge("detect_zone_changes", "extract_zone_parameter_tables")
workflow.add_edge("extract_zone_parameter_tables", "detect_rule_candidates")
workflow.add_edge("detect_rule_candidates", "parse_rule_values")
workflow.add_edge("parse_rule_values", "assign_zone_and_scope")
workflow.add_edge("assign_zone_and_scope", "confidence_scoring")
@ -583,6 +904,7 @@ def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None
"zones": [],
"rule_candidates": [],
"parsed_rules": [],
"zone_parameter_tables": [],
"errors": [],
"warnings": []
}
@ -621,10 +943,13 @@ def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None
key=lambda x: (x.get("rule_type", ""), x.get("page", 0))
)
zone_parameter_tables = final_state.get("zone_parameter_tables", [])
return {
"articles": articles,
"zones": zones,
"rules": rules,
"zone_parameter_tables": zone_parameter_tables,
"errors": final_state.get("errors", []),
"warnings": final_state.get("warnings", [])
}

View file

@ -294,10 +294,14 @@ from .datamodelFeatureRealEstate import (
Gemeinde,
Kanton,
Land,
DokumentTyp,
)
from modules.services import getInterface as getServices
from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.features.realEstate.bzoDocumentRetriever import BZODocumentRetriever
from modules.features.realEstate.bzoExtractionLangGraph import run_extraction
logger = logging.getLogger(__name__)
@ -2329,3 +2333,671 @@ async def create_project_with_parcel_data(
logger.error(f"Error creating project with parcel data: {str(e)}", exc_info=True)
raise
# ===== BZO Information Extraction for Parcels =====
async def extract_bzo_information(
    currentUser: User,
    gemeinde: str,
    bauzone: str,
) -> Dict[str, Any]:
    """
    Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde.

    Retrieves BZO documents for the specified Gemeinde, extracts content using
    langgraph workflow, filters by Bauzone, and uses AI to find relevant information.

    Args:
        currentUser: Current authenticated user
        gemeinde: Gemeinde name (e.g., "Zürich") or ID
        bauzone: Bauzone code (e.g., "W3", "W2/30")

    Returns:
        Dictionary containing:
        - bauzone: Bauzone code
        - gemeinde: Gemeinde information
        - extracted_content: Extracted content from PDFs
        - ai_summary: AI-generated summary
        - relevant_rules: Rules filtered by Bauzone
        - documents_processed: List of document IDs processed

    Raises:
        HTTPException: If Gemeinde not found or no documents found
    """
    try:
        logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id})")
        # Get interfaces
        realEstateInterface = getRealEstateInterface(currentUser)
        componentInterface = getComponentInterface(currentUser)
        # Get Gemeinde - try by ID first, then by label
        logger.debug(f"Attempting to retrieve Gemeinde '{gemeinde}' for mandate {currentUser.mandateId}")
        gemeinde_obj = realEstateInterface.getGemeinde(gemeinde)
        # If not found by ID, try searching by label
        if not gemeinde_obj:
            logger.debug(f"Gemeinde not found by ID, trying to search by label: {gemeinde}")
            gemeinden_by_label = realEstateInterface.getGemeinden(
                recordFilter={"label": gemeinde}
            )
            if gemeinden_by_label and len(gemeinden_by_label) > 0:
                gemeinde_obj = gemeinden_by_label[0]
                logger.info(f"Found Gemeinde by label '{gemeinde}' with ID: {gemeinde_obj.id}")
            else:
                # Try to get all gemeinden to see what's available (for debugging)
                all_gemeinden = realEstateInterface.getGemeinden(recordFilter=None)
                logger.warning(f"Gemeinde '{gemeinde}' not found by ID or label. Total Gemeinden in database: {len(all_gemeinden)}")
                if all_gemeinden:
                    sample_ids = [g.id for g in all_gemeinden[:5]]
                    sample_labels = [g.label for g in all_gemeinden[:5] if g.label]
                    logger.warning(f"Sample Gemeinde IDs: {sample_ids}")
                    if sample_labels:
                        logger.warning(f"Sample Gemeinde labels: {sample_labels}")
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Gemeinde '{gemeinde}' not found or not accessible"
                )
        # Get BZO documents directly from Gemeinde's dokumente field
        bzo_documents = []
        if gemeinde_obj.dokumente:
            for doc in gemeinde_obj.dokumente:
                # Handle both dict and object formats
                if isinstance(doc, dict):
                    doc_id = doc.get("id")
                    doc_typ = doc.get("dokumentTyp")
                else:
                    doc_id = doc.id if hasattr(doc, "id") else None
                    doc_typ = doc.dokumentTyp if hasattr(doc, "dokumentTyp") else None
                # Check if it's a BZO document type
                if doc_typ:
                    # Handle enum, string, or dict formats
                    if isinstance(doc_typ, DokumentTyp):
                        is_bzo = doc_typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]
                    elif isinstance(doc_typ, str):
                        is_bzo = doc_typ in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"]
                    else:
                        doc_typ_str = str(doc_typ)
                        is_bzo = doc_typ_str in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"]
                    if is_bzo:
                        # Get full document object
                        if doc_id:
                            full_doc = realEstateInterface.getDokument(doc_id)
                            if full_doc:
                                bzo_documents.append(full_doc)
                            else:
                                logger.warning(f"Document {doc_id} referenced in Gemeinde but not found in database")
        if not bzo_documents:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No BZO documents found for Gemeinde '{gemeinde_obj.label}'"
            )
        logger.info(f"Found {len(bzo_documents)} BZO document(s) for Gemeinde '{gemeinde_obj.label}'")
        # Initialize document retriever
        document_retriever = BZODocumentRetriever(realEstateInterface, componentInterface)
        # Extract content from all documents
        all_extracted_content = {
            "articles": [],
            "zones": [],
            "rules": [],
            "zone_parameter_tables": [],
            "errors": [],
            "warnings": []
        }
        documents_processed = []
        for dokument in bzo_documents:
            try:
                logger.info(f"Processing document {dokument.id}: {dokument.label}")
                # Retrieve PDF content
                pdf_bytes = document_retriever.retrieve_pdf_content(dokument)
                if not pdf_bytes:
                    logger.warning(f"Could not retrieve PDF content for dokument {dokument.id}")
                    all_extracted_content["warnings"].append(
                        f"Could not retrieve PDF content for document '{dokument.label}'"
                    )
                    continue
                # Run extraction using langgraph workflow
                extraction_result = run_extraction(
                    pdf_bytes=pdf_bytes,
                    pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}",
                    dokument_id=dokument.id
                )
                # Combine results
                all_extracted_content["articles"].extend(extraction_result.get("articles", []))
                all_extracted_content["zones"].extend(extraction_result.get("zones", []))
                all_extracted_content["rules"].extend(extraction_result.get("rules", []))
                all_extracted_content["zone_parameter_tables"].extend(extraction_result.get("zone_parameter_tables", []))
                all_extracted_content["errors"].extend(extraction_result.get("errors", []))
                all_extracted_content["warnings"].extend(extraction_result.get("warnings", []))
                documents_processed.append({
                    "id": dokument.id,
                    "label": dokument.label,
                    "dokumentTyp": dokument.dokumentTyp.value if dokument.dokumentTyp else None
                })
            except Exception as e:
                # Best-effort: a single failing document must not abort the whole run
                logger.error(f"Error processing document {dokument.id}: {str(e)}", exc_info=True)
                all_extracted_content["errors"].append(
                    f"Error processing document '{dokument.label}': {str(e)}"
                )
                continue
        # Filter rules by Bauzone
        relevant_rules = filter_rules_by_bauzone(
            all_extracted_content["rules"],
            bauzone
        )
        # Filter zones by Bauzone
        relevant_zones = filter_zones_by_bauzone(
            all_extracted_content["zones"],
            bauzone
        )
        # Filter articles that mention the Bauzone
        relevant_articles = filter_articles_by_bauzone(
            all_extracted_content.get("articles", []),
            bauzone
        )
        # Use AI to generate summary and find additional information
        ai_summary = await generate_bauzone_ai_summary(
            currentUser=currentUser,
            bauzone=bauzone,
            gemeinde=gemeinde_obj.label,
            extracted_content=all_extracted_content,
            relevant_rules=relevant_rules,
            relevant_zones=relevant_zones
        )
        # Build unified summary that includes zones and articles
        unified_summary = ai_summary
        # Append zone and article information to the summary if not already included
        # The AI should have integrated this, but we add it as backup if needed
        summary_lower = unified_summary.lower()
        # BUGFIX: compare case-insensitively on BOTH sides. Previously upper-cased
        # codes/labels were searched in a lower-cased summary, which could never
        # match for alphabetic codes, so the backup sections were always appended.
        # Empty codes/labels are skipped to avoid a vacuous "" in s match.
        zones_mentioned = any(
            zone.get("zone_code", "").lower() in summary_lower
            for zone in relevant_zones
            if zone.get("zone_code")
        )
        if not zones_mentioned and relevant_zones:
            unified_summary += "\n\n=== ZONENDEFINITIONEN ===\n"
            for zone in relevant_zones:
                zone_code = zone.get("zone_code", "")
                zone_name = zone.get("zone_name", "")
                zone_category = zone.get("zone_category", "")
                geschosszahl = zone.get("geschosszahl")
                gewerbeerleichterung = zone.get("gewerbeerleichterung", False)
                page_num = zone.get("page", 0)
                source_article = zone.get("source_article", "")
                zone_info = f"{zone_code}: {zone_name}"
                if zone_category:
                    zone_info += f"\nKategorie: {zone_category}"
                if geschosszahl:
                    zone_info += f"\nGeschosszahl: {geschosszahl}"
                if gewerbeerleichterung:
                    zone_info += "\nGewerbeerleichterung: Ja"
                if source_article:
                    zone_info += f"\nQuelle: {source_article} (Seite {page_num})"
                unified_summary += zone_info + "\n\n"
        # Check if articles are mentioned in summary (same case-insensitive fix)
        articles_mentioned = any(
            article.get("article_label", "").lower() in summary_lower
            for article in relevant_articles
            if article.get("article_label")
        )
        if not articles_mentioned and relevant_articles:
            unified_summary += "\n\n=== RELEVANTE ARTIKEL ===\n"
            for article in relevant_articles:
                article_label = article.get("article_label", "")
                article_title = article.get("article_title", "")
                article_text = article.get("text", "")
                page_start = article.get("page_start", 0)
                page_end = article.get("page_end", 0)
                page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}"
                unified_summary += f"{article_label}"
                if article_title:
                    unified_summary += f": {article_title}"
                unified_summary += f" ({page_range})\n"
                # Include first 500 chars of article text
                if article_text:
                    preview = article_text[:500] + "..." if len(article_text) > 500 else article_text
                    unified_summary += f"{preview}\n\n"
        return {
            "bauzone": bauzone,
            "gemeinde": {
                "id": gemeinde_obj.id,
                "label": gemeinde_obj.label,
                "plz": gemeinde_obj.plz
            },
            "extracted_content": {
                "zones": relevant_zones,
                "rules": relevant_rules,
                "articles": relevant_articles,
                "zone_parameter_tables": _filter_tables_by_bauzone(
                    all_extracted_content.get("zone_parameter_tables", []),
                    bauzone
                ),
                "total_zones": len(all_extracted_content.get("zones", [])),
                "total_rules": len(all_extracted_content.get("rules", [])),
                "total_articles": len(all_extracted_content.get("articles", [])),
                "total_tables": len(all_extracted_content.get("zone_parameter_tables", []))
            },
            "ai_summary": unified_summary,
            "relevant_rules": relevant_rules,
            "documents_processed": documents_processed,
            "errors": all_extracted_content.get("errors", []),
            "warnings": all_extracted_content.get("warnings", [])
        }
    except HTTPException:
        # Re-raise API errors untouched so FastAPI returns the intended status
        raise
    except Exception as e:
        logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error extracting BZO information: {str(e)}"
        )
def filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Filter rules by Bauzone code.

    A rule matches when the Bauzone appears (case-insensitively) in its raw zone
    string, in any of its table zones, or in its text snippet.

    Args:
        rules: List of rule dictionaries from extraction
        bauzone: Bauzone code to filter by (e.g., "W3", "W2/30")

    Returns:
        Filtered list of rules that match the Bauzone
    """
    needle = bauzone.upper()

    def _rule_matches(rule: Dict[str, Any]) -> bool:
        # Direct zone match
        zone_raw = rule.get("zone_raw")
        if zone_raw and needle in zone_raw.upper():
            return True
        # Table zone match
        if any(needle in str(tz).upper() for tz in rule.get("table_zones", []) or []):
            return True
        # Fallback: Bauzone mentioned anywhere in the text snippet
        return needle in rule.get("text_snippet", "").upper()

    relevant_rules = [rule for rule in rules if _rule_matches(rule)]
    logger.info(f"Filtered {len(relevant_rules)} rules for Bauzone {bauzone} from {len(rules)} total rules")
    return relevant_rules
def filter_zones_by_bauzone(zones: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Filter zones by Bauzone code.

    Args:
        zones: List of zone dictionaries from extraction
        bauzone: Bauzone code to filter by

    Returns:
        Filtered list of zones whose "zone_code" contains the Bauzone
        (case-insensitive substring match)
    """
    needle = bauzone.upper()
    relevant_zones = [
        zone for zone in zones
        if needle in zone.get("zone_code", "").upper()
    ]
    logger.info(f"Filtered {len(relevant_zones)} zones for Bauzone {bauzone} from {len(zones)} total zones")
    return relevant_zones
def filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Filter articles that mention the Bauzone.

    Args:
        articles: List of article dictionaries from extraction
        bauzone: Bauzone code to filter by

    Returns:
        Filtered list of articles whose body text or raw zone string mentions
        the Bauzone (case-insensitive)
    """
    needle = bauzone.upper()
    relevant_articles = []
    for entry in articles:
        body = entry.get("text", "")
        raw_zone = entry.get("zone_raw")
        # Match either in the article body or in its zone annotation
        mentioned = (body and needle in body.upper()) or (raw_zone and needle in raw_zone.upper())
        if mentioned:
            relevant_articles.append(entry)
    logger.info(f"Filtered {len(relevant_articles)} articles for Bauzone {bauzone} from {len(articles)} total articles")
    return relevant_articles
def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]:
    """
    Filter zone-parameter tables to include only those containing the specified Bauzone.

    Each returned table keeps only the zone columns matching the Bauzone, and each
    parameter keeps only the values for those zones. Provenance fields ("article"
    on parameter entries, table-level "article"/"source_text") are carried over
    when present so the filtered data remains traceable to its source document.

    Args:
        tables: List of zone-parameter table dictionaries
        bauzone: Bauzone code to filter by

    Returns:
        Filtered list of tables containing the Bauzone
    """
    relevant_tables = []
    bauzone_upper = bauzone.upper()
    for table in tables:
        zones = table.get("zones", [])
        # Check if any zone in the table matches the Bauzone
        matching_zones = [z for z in zones if bauzone_upper in str(z).upper()]
        if not matching_zones:
            continue
        # Create filtered version with only relevant zone columns
        filtered_table = {
            "page": table.get("page"),
            "zones": matching_zones,
            "parameters": []
        }
        # FIX: previously provenance was dropped during filtering; preserve it
        # when available (additive keys only, backward-compatible).
        if table.get("article") is not None:
            filtered_table["article"] = table.get("article")
        if table.get("source_text"):
            filtered_table["source_text"] = table.get("source_text")
        # Filter parameters to only include values for matching zones
        for param in table.get("parameters", []):
            values_by_zone = param.get("values_by_zone", {})
            filtered_values = {
                zone: values_by_zone[zone]
                for zone in matching_zones
                if zone in values_by_zone
            }
            if filtered_values:
                param_entry = {
                    "parameter": param.get("parameter"),
                    "values_by_zone": filtered_values
                }
                # Keep the per-parameter article reference if the extractor set one
                if "article" in param:
                    param_entry["article"] = param["article"]
                filtered_table["parameters"].append(param_entry)
        if filtered_table["parameters"]:
            relevant_tables.append(filtered_table)
    logger.info(f"Filtered {len(relevant_tables)} tables for Bauzone {bauzone} from {len(tables)} total tables")
    return relevant_tables
def _format_parameter_name(param_name: str) -> str:
    """
    Map a raw table parameter name to its canonical German display label.

    Matching is case-insensitive substring matching. The Grenzabstand
    combinations must be tested before the single-keyword checks because
    several keywords can co-occur in one parameter name; order matters.

    Args:
        param_name: Raw parameter name as extracted from the table

    Returns:
        Canonical display label, or the unchanged input if nothing matches
    """
    lowered = param_name.lower()
    if "ausnützungsziffer" in lowered:
        return "Ausnützungsziffer max."
    if "vollgeschosse" in lowered:
        return "Vollgeschosse max."
    if "gebäudelänge" in lowered:
        return "Gebäudelänge max."
    if "grenzabstand" in lowered and "grundabstand" in lowered:
        return "Grenzabstand - Grundabstand min."
    if "grenzabstand" in lowered and "mehrlängen" in lowered:
        return "Grenzabstand - Mehrlängen-zuschlag"
    # Accept both "Höchstmass" and the ß spelling "Höchstmaß"
    if "grenzabstand" in lowered and ("höchstmass" in lowered or "höchstmaß" in lowered):
        return "Grenzabstand - Höchstmass max."
    if "fassadenhöhen" in lowered:
        return "Fassadenhöhen max."
    if "dachgeschosse" in lowered:
        return "anrechenbare Dachgeschosse max."
    if "attikageschoss" in lowered:
        return "anrechenbares Attikageschoss max."
    if "untergeschoss" in lowered:
        return "anrechenbares Untergeschoss max."
    return param_name


async def generate_bauzone_ai_summary(
    currentUser: User,
    bauzone: str,
    gemeinde: str,
    extracted_content: Dict[str, Any],
    relevant_rules: List[Dict[str, Any]],
    relevant_zones: List[Dict[str, Any]]
) -> str:
    """
    Use AI to generate a summary of relevant information for a Bauzone.

    Builds a textual context from the extracted zone-parameter tables, zone
    definitions, relevant articles and residual text rules, then asks the AI
    planning service for one unified German summary.

    Args:
        currentUser: Current authenticated user
        bauzone: Bauzone code
        gemeinde: Gemeinde name
        extracted_content: All extracted content from PDFs
        relevant_rules: Rules filtered by Bauzone
        relevant_zones: Zones filtered by Bauzone

    Returns:
        AI-generated summary string, or a fallback message if the AI call fails
    """
    try:
        # Initialize AI service (no workflow context needed for a one-shot call)
        services = getServices(currentUser, workflow=None)
        aiService = services.ai

        # Build context from extracted content, prioritizing zone-parameter tables
        context_parts = []
        bauzone_upper = bauzone.upper()  # hoisted: reused in every zone comparison below

        zone_parameter_tables = extracted_content.get("zone_parameter_tables", [])
        if zone_parameter_tables:
            context_parts.append("=== BUILDING REGULATIONS TABLE VALUES FOR BAUZONE (INCLUDE THESE EXACT VALUES IN YOUR SUMMARY) ===")
            for table in zone_parameter_tables:
                page_num = table.get("page", 0)
                article_ref = table.get("article", "Unknown article")
                zones_in_table = table.get("zones", [])
                # Skip tables that do not contain the requested Bauzone
                matching_zones = [z for z in zones_in_table if bauzone_upper in str(z).upper()]
                if not matching_zones:
                    continue
                context_parts.append(f"\nTabelle aus {article_ref} (Seite {page_num}):")
                for param in table.get("parameters", []):
                    param_name = param.get("parameter", "")
                    values_by_zone = param.get("values_by_zone", {})
                    # Extract values for the requested Bauzone
                    for zone, values in values_by_zone.items():
                        if bauzone_upper not in zone.upper():
                            continue
                        if not (isinstance(values, list) and len(values) > 0):
                            continue
                        # First entry is the primary (most relevant) value
                        val_entry = values[0]
                        value = val_entry.get("value", "")
                        unit = val_entry.get("unit", "")
                        unit_str = f" {unit}" if unit else ""
                        formatted_param = _format_parameter_name(param_name)
                        context_parts.append(f"{formatted_param}: {value}{unit_str} (Quelle: {article_ref}, Seite {page_num})")
                        # Additional values (e.g. Fassadenhöhen footnote values)
                        for extra_entry in values[1:]:
                            value_extra = extra_entry.get("value", "")
                            unit_extra = extra_entry.get("unit", "")
                            unit_str_extra = f" {unit_extra}" if unit_extra else ""
                            context_parts.append(f" (Alternative: {value_extra}{unit_str_extra})")

        # Add zone information with all details
        if relevant_zones:
            context_parts.append("\n=== ZONE DEFINITIONS ===")
            for zone in relevant_zones:
                zone_code = zone.get("zone_code", "")
                zone_name = zone.get("zone_name", "")
                zone_category = zone.get("zone_category", "")
                geschosszahl = zone.get("geschosszahl")
                gewerbeerleichterung = zone.get("gewerbeerleichterung", False)
                page_num = zone.get("page", 0)
                source_article = zone.get("source_article", "")
                zone_info = f"- {zone_code}: {zone_name}"
                if zone_category:
                    zone_info += f" (Kategorie: {zone_category})"
                if geschosszahl:
                    zone_info += f", Geschosszahl: {geschosszahl}"
                if gewerbeerleichterung:
                    zone_info += ", Gewerbeerleichterung: Ja"
                if source_article:
                    zone_info += f" - Quelle: {source_article} (Seite {page_num})"
                context_parts.append(zone_info)

        # Add article information with full text previews
        relevant_articles = filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone)
        if relevant_articles:
            context_parts.append("\n=== RELEVANT ARTICLES (full content) ===")
            for article in relevant_articles:
                article_label = article.get("article_label", "")
                article_title = article.get("article_title", "")
                article_text = article.get("text", "")
                page_start = article.get("page_start", 0)
                page_end = article.get("page_end", 0)
                page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}"
                context_parts.append(f"\n{article_label}: {article_title or 'Kein Titel'}")
                context_parts.append(f"Lage: {page_range}")
                # Truncate long article bodies to keep the prompt bounded
                if len(article_text) > 1000:
                    context_parts.append(f"Inhalt: {article_text[:1000]}...")
                else:
                    context_parts.append(f"Inhalt: {article_text}")

        # Add relevant rules (only if not already covered in tables)
        if relevant_rules:
            # Collect the parameter names already present in tables so that
            # rules duplicating table values are filtered out
            table_parameter_names = set()
            for table in zone_parameter_tables:
                for param in table.get("parameters", []):
                    table_parameter_names.add(param.get("parameter", "").lower())
            unique_rules = []
            for rule in relevant_rules[:15]:
                rule_type = rule.get("rule_type", "").lower()
                # Skip if this rule type is likely in tables
                if not any(tp in rule_type for tp in table_parameter_names):
                    unique_rules.append(rule)
            if unique_rules:
                context_parts.append("\n=== ADDITIONAL BUILDING REGULATIONS (from text) ===")
                for rule in unique_rules[:8]:
                    rule_type = rule.get("rule_type", "")
                    value_numeric = rule.get("value_numeric")
                    value_text = rule.get("value_text", "")
                    unit = rule.get("unit", "")
                    page_num = rule.get("page", 0)
                    rule_desc = f"- {rule_type}: "
                    if value_numeric is not None:
                        rule_desc += f"{value_numeric}"
                        if unit:
                            rule_desc += f" {unit}"
                    else:
                        rule_desc += value_text
                    rule_desc += f" (Seite {page_num})"
                    context_parts.append(rule_desc)

        context = "\n".join(context_parts)
        # Create AI prompt with explicit instructions to include all table values
        prompt = f"""
Analyze the following building zone (Bauzone) information extracted from BZO (Bau- und Zonenordnung) documents for {gemeinde}, specifically for Bauzone {bauzone}.
Extracted Content:
{context}
CRITICAL INSTRUCTIONS:
1. You MUST include ALL actual values from the tables in your summary - do NOT just say "see tables on page X"
2. List ALL parameters with their actual values: Ausnützungsziffer, Vollgeschosse, Gebäudelänge, Grenzabstand (Grundabstand, Mehrlängen-zuschlag, Höchstmass), Fassadenhöhen, etc.
3. Integrate zone definitions and article information INTO the summary text - do NOT create separate sections
4. Always cite WHERE each piece of information was found (article number and page number)
5. Combine everything into ONE unified, flowing summary - no separate sections for zones/articles
6. Be comprehensive - include all relevant details from zones, articles, and tables
7. Format as a single, well-structured German text document
Please provide a comprehensive, unified summary that includes:
1. General description of Bauzone {bauzone}:
- Zone category (Wohnzonen, Zentrumszonen, etc.)
- Geschosszahl (number of full storeys)
- Gewerbeerleichterung status (Ja/Nein)
- Where defined (article and page number)
2. ALL building regulations with ACTUAL VALUES from tables (you MUST include the exact values):
- Ausnützungsziffer max.: [ACTUAL PERCENTAGE VALUE]% (from article, page)
- Vollgeschosse max.: [ACTUAL NUMBER] (from article, page)
- anrechenbare Dachgeschosse max.: [ACTUAL NUMBER] (from article, page)
- anrechenbares Attikageschoss max.: [ACTUAL NUMBER] (from article, page)
- anrechenbares Untergeschoss max.: [ACTUAL NUMBER] (from article, page)
- Gebäudelänge max.: [ACTUAL VALUE] m (from article, page)
- Grenzabstand - Grundabstand min.: [ACTUAL VALUE] m (from article, page)
- Grenzabstand - Mehrlängen-zuschlag: [ACTUAL FRACTION] (from article, page)
- Grenzabstand - Höchstmass max.: [ACTUAL VALUE] m (from article, page)
- Fassadenhöhen max.: [ACTUAL VALUE] m (from article, page, include footnote values if present)
3. Zone definitions: Integrate information about where this zone is defined (which articles mention it, with page numbers)
4. Relevant articles: Integrate key content from relevant articles naturally into the summary, citing article numbers and page numbers
5. Special conditions: Any special requirements or exceptions mentioned in articles
CRITICAL: You MUST include the actual numeric values from the tables in your summary. Do NOT say "see tables" - list the actual values. Format everything as ONE unified, flowing German text document without separate sections. Integrate zones and articles naturally into the narrative.
"""
        # Call AI service
        logger.info(f"Generating AI summary for Bauzone {bauzone} in {gemeinde}")
        ai_response = await aiService.callAiPlanning(
            prompt=prompt,
            debugType="bzo_summary"
        )
        return ai_response.strip()
    except Exception as e:
        logger.error(f"Error generating AI summary: {str(e)}", exc_info=True)
        # Return a basic summary if AI fails
        return f"Summary generation failed: {str(e)}. Found {len(relevant_rules)} relevant rules and {len(relevant_zones)} zones for Bauzone {bauzone}."

View file

@ -5,7 +5,12 @@ Implements stateless endpoints for real estate database operations with AI-power
import logging
import json
import re
import requests
import aiohttp
import asyncio
import ssl
from urllib.parse import urljoin, urlparse
from typing import Optional, Dict, Any, List, Union
from fastapi import APIRouter, HTTPException, Depends, Body, Request, Query, Path, status
@ -36,21 +41,33 @@ from .datamodelFeatureRealEstate import (
Land,
Kontext,
StatusProzess,
DokumentTyp,
)
# Import interfaces
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
# Import feature logic for AI-powered commands
from modules.features.realEstate.mainRealEstate import (
processNaturalLanguageCommand,
create_project_with_parcel_data,
extract_bzo_information,
)
# Import Swiss Topo MapServer connector for testing
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.connectors.connectorOerebWfs import OerebWfsConnector
# Import Tavily connector for BZO document search
from modules.aicore.aicorePluginTavily import AiTavily
# Import helper functions from scraping route
from modules.routes.routeRealEstateScraping import (
_get_language_from_kanton,
_get_bzo_search_query,
)
# Import attribute utilities for model schema
from modules.shared.attributeUtils import getModelAttributeDefinitions
@ -1006,6 +1023,7 @@ async def search_parcel(
request: Request,
location: str = Query(..., description="Either coordinates as 'x,y' (LV95) or address string"),
include_adjacent: bool = Query(False, description="Include adjacent parcels information"),
fetch_documents: bool = Query(True, description="If true, fetch BZO documents for the Gemeinde (default: true)"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
@ -1017,10 +1035,12 @@ async def search_parcel(
- Administrative context (canton, municipality)
- Link to official cadastral map
- Optional: Adjacent parcels
- Optional: Gemeinde information and BZO documents (if fetch_documents=true)
Query Parameters:
- location: Either coordinates as "x,y" (LV95/EPSG:2056) or address string
- include_adjacent: If true, fetches information about adjacent parcels (slower)
- fetch_documents: If true, checks for and fetches Bauzonenverordnung (BZO) documents for the Gemeinde (default: true, slower)
Headers:
- X-CSRF-Token: CSRF token (required for security)
@ -1029,6 +1049,7 @@ async def search_parcel(
- GET /api/realestate/parcel/search?location=2600000,1200000
- GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern
- GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern&include_adjacent=true
- GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern&fetch_documents=true
Returns:
{
@ -1047,14 +1068,30 @@ async def search_parcel(
"area_m2": 1234.56,
"centroid": {"x": 2600000, "y": 1200000},
"geoportal_url": "https://...",
"realestate_type": null
"realestate_type": null,
"bauzone": "W3"
},
"map_view": {
"center": {"x": 2600000, "y": 1200000},
"zoom_bounds": {"min_x": ..., "max_x": ..., "min_y": ..., "max_y": ...},
"geometry_geojson": {...}
},
"adjacent_parcels": [...] // Optional (only if include_adjacent=true)
"adjacent_parcels": [...], // Optional (only if include_adjacent=true)
"gemeinde": { // Optional (only if fetch_documents=true)
"id": "...",
"label": "Bern",
"plz": "3011"
},
"documents": [ // Optional (only if fetch_documents=true and documents found/created)
{
"id": "...",
"label": "BZO Bern",
"dokumentTyp": "gemeindeBzoAktuell",
"dokumentReferenz": "...",
"quelle": "https://...",
"mimeType": "application/pdf"
}
]
}
"""
try:
@ -1114,14 +1151,48 @@ async def search_parcel(
municipality_name = None
full_address = None
plz = None
canton = attributes.get("ak") # Extract canton early so it's always available
# First, try to use geocoded address info if available (more accurate than centroid query)
# Debug: Log all available attributes to understand what we have
logger.debug(f"Parcel attributes keys: {list(attributes.keys())}")
logger.debug(f"Sample parcel attributes: {dict(list(attributes.items())[:10])}") # First 10 items
# First, check if municipality is directly in parcel attributes (ggdename or dplzname)
# These fields are often present in the parcel data itself from Swiss Topo
municipality_from_attrs = attributes.get("ggdename") or attributes.get("dplzname") or attributes.get("gemeinde") or attributes.get("gemeindename")
if municipality_from_attrs:
# Use connector's cleaning method to remove canton suffix
municipality_name = connector._clean_municipality_name(str(municipality_from_attrs))
logger.info(f"Found municipality '{municipality_name}' in parcel attributes (from {municipality_from_attrs})")
# Also check extracted_attributes for municipality
if not municipality_name:
municipality_from_extracted = extracted_attributes.get("kontextGemeinde")
if municipality_from_extracted:
municipality_name = str(municipality_from_extracted)
logger.info(f"Found municipality '{municipality_name}' in extracted attributes")
# Also check for PLZ in parcel attributes
if not plz:
plz_from_attrs = attributes.get("dplz4") or attributes.get("plz")
if plz_from_attrs:
plz = str(plz_from_attrs).strip()
logger.debug(f"Found PLZ '{plz}' in parcel attributes")
# Try to use geocoded address info if available (more accurate than centroid query)
geocoded_address = parcel_data.get('geocoded_address')
if geocoded_address:
full_address = geocoded_address.get('full_address')
plz = geocoded_address.get('plz')
municipality_name = geocoded_address.get('municipality')
logger.debug(f"Using geocoded address: {full_address}")
if not full_address:
full_address = geocoded_address.get('full_address')
if not plz:
plz = geocoded_address.get('plz')
if not municipality_name:
geocoded_municipality = geocoded_address.get('municipality')
if geocoded_municipality:
municipality_name = connector._clean_municipality_name(geocoded_municipality)
logger.debug(f"Found municipality '{municipality_name}' from geocoded address")
if full_address:
logger.debug(f"Using geocoded address: {full_address}")
# If geocoded address not available, try to get address by querying the address layer
# Use query coordinates (where user clicked/geocoded) instead of parcel centroid
@ -1148,9 +1219,14 @@ async def search_parcel(
# Extract address using connector's helper method
address_info = connector._extract_address_from_building_attrs(addr_attrs)
full_address = address_info.get('full_address')
plz = address_info.get('plz')
municipality_name = address_info.get('municipality')
if not full_address:
full_address = address_info.get('full_address')
if not plz:
plz = address_info.get('plz')
if not municipality_name:
municipality_name = address_info.get('municipality')
if municipality_name:
logger.debug(f"Found municipality '{municipality_name}' from building layer")
if full_address:
logger.debug(f"Constructed address: {full_address}")
@ -1163,34 +1239,128 @@ async def search_parcel(
full_address = location
logger.debug(f"Using location as address: {full_address}")
# Try to extract municipality name from BFSNR if not found
# Try to extract municipality name from address string if not found yet
if not municipality_name and full_address:
# Parse address string to extract municipality name
# Format is usually: "Street Number, PLZ Municipality" or "Street Number PLZ Municipality"
# Examples: "Forchstrasse 6c, 8610 Uster" or "Bundesplatz 3 3011 Bern"
# Try to match PLZ followed by municipality name
# PLZ is typically 4 digits, municipality name follows
plz_municipality_match = re.search(r'\b(\d{4})\s+([A-ZÄÖÜ][a-zäöüß\s-]+)', full_address)
if plz_municipality_match:
extracted_plz = plz_municipality_match.group(1)
extracted_municipality = plz_municipality_match.group(2).strip()
# Remove trailing commas or other punctuation
extracted_municipality = re.sub(r'[,;\.]+$', '', extracted_municipality).strip()
if extracted_municipality:
municipality_name = extracted_municipality
if not plz:
plz = extracted_plz
logger.debug(f"Extracted municipality '{municipality_name}' and PLZ '{plz}' from address string")
# Try to extract municipality name from BFSNR if still not found
if not municipality_name:
# Common Swiss municipalities lookup (you can expand this)
bfsnr = attributes.get("bfsnr")
canton = attributes.get("ak", "")
# Basic municipality lookup for common codes
common_municipalities = {
351: "Bern",
261: "Zürich",
6621: "Genève",
2701: "Basel",
5586: "Lausanne",
1061: "Luzern",
3203: "Winterthur",
230: "St. Gallen",
5192: "Lugano",
351: "Bern",
1367: "Schwyz"
}
logger.info(f"Attempting to resolve municipality name for BFS number {bfsnr} in canton {canton}")
if bfsnr and bfsnr in common_municipalities:
municipality_name = common_municipalities[bfsnr]
logger.debug(f"Looked up municipality: {municipality_name}")
else:
# Fallback: Use canton + code
municipality_name = f"{canton}-{bfsnr}" if canton and bfsnr else "Unknown"
logger.debug(f"Using fallback municipality: {municipality_name}")
# Try to query database for Gemeinde by BFS number
if bfsnr and canton:
try:
realEstateInterface = getRealEstateInterface(currentUser)
# Query Gemeinde by BFS number (stored in kontextInformationen)
gemeinden = realEstateInterface.getGemeinden(
recordFilter={"mandateId": currentUser.mandateId}
)
logger.debug(f"Found {len(gemeinden)} Gemeinden in database, searching for BFS {bfsnr}")
for gemeinde in gemeinden:
# Check kontextInformationen for BFS number
for kontext in gemeinde.kontextInformationen:
try:
kontext_data = json.loads(kontext.inhalt) if isinstance(kontext.inhalt, str) else kontext.inhalt
if isinstance(kontext_data, dict):
kontext_bfsnr = kontext_data.get("bfs_nummer") or kontext_data.get("bfsnr") or kontext_data.get("municipality_code")
if str(kontext_bfsnr) == str(bfsnr):
municipality_name = gemeinde.label
logger.info(f"Found Gemeinde '{municipality_name}' by BFS number {bfsnr} in database")
break
except (json.JSONDecodeError, AttributeError) as e:
logger.debug(f"Error parsing kontext: {e}")
continue
if municipality_name:
break
except Exception as e:
logger.warning(f"Error querying Gemeinde by BFS number: {e}", exc_info=True)
# If still not found, try to use Swiss Topo geocoding API to get municipality name from coordinates
# This is more reliable than BFS number lookup since coordinates are exact
if not municipality_name and centroid:
try:
# Use Swiss Topo geocoding to get municipality name from coordinates
geocode_url = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify"
params = {
"geometry": f"{centroid['x']},{centroid['y']}",
"geometryType": "esriGeometryPoint",
"layers": "all:ch.swisstopo.swissboundaries3d-gemeinde-flaeche.fill",
"tolerance": "0",
"returnGeometry": "false",
"sr": "2056"
}
import aiohttp
import ssl
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector_aiohttp = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector_aiohttp) as session:
async with session.get(geocode_url, params=params) as resp:
if resp.status == 200:
data = await resp.json()
results = data.get("results", [])
if results:
result_attrs = results[0].get("attributes", {})
geocoded_municipality = result_attrs.get("name") or result_attrs.get("gemeindename") or result_attrs.get("label")
if geocoded_municipality:
municipality_name = connector._clean_municipality_name(str(geocoded_municipality))
logger.info(f"Found municipality '{municipality_name}' via Swiss Topo geocoding API (from {geocoded_municipality})")
except Exception as e:
logger.debug(f"Error querying Swiss Topo geocoding API: {e}", exc_info=True)
# If still not found, try expanded Swiss municipalities lookup
if not municipality_name and bfsnr:
# Expanded Swiss municipalities lookup by BFS number
# Source: https://www.bfs.admin.ch/bfs/de/home/grundlagen/agvch.html
common_municipalities = {
# Zürich (ZH)
261: "Zürich",
198: "Pfäffikon", # ZH-198 is Pfäffikon
191: "Uster", # Uster is ZH-191
3203: "Winterthur",
# Bern (BE)
351: "Bern",
# Basel (BS)
2701: "Basel",
# Genève (GE)
6621: "Genève",
# Vaud (VD)
5586: "Lausanne",
# Luzern (LU)
1061: "Luzern",
# St. Gallen (SG)
230: "St. Gallen",
# Ticino (TI)
5192: "Lugano",
# Schwyz (SZ)
1367: "Schwyz",
}
if bfsnr in common_municipalities:
municipality_name = common_municipalities[bfsnr]
logger.info(f"Looked up municipality '{municipality_name}' from common list for BFS {bfsnr}")
# If still not found, log warning
if not municipality_name:
logger.warning(f"Could not determine municipality name for BFS number {bfsnr} in canton {canton}. Municipality name will be None.")
# Final validation: Don't use EGRID as address
if full_address and full_address.startswith("CH") and len(full_address) == 14 and full_address[2:].isdigit():
@ -1200,7 +1370,6 @@ async def search_parcel(
# Query zone information (wohnzone/bauzone) from ÖREB WFS
bauzone = None
canton = attributes.get("ak")
# Check if geometry has actual data (either rings or coordinates)
has_geometry = geometry and (geometry.get("rings") or geometry.get("coordinates"))
if canton and has_geometry:
@ -1373,6 +1542,486 @@ async def search_parcel(
logger.warning(f"Error fetching adjacent parcels: {e}", exc_info=True)
response_data["adjacent_parcels"] = []
# Fetch BZO documents if requested
gemeinde_info = None
bzo_documents = []
logger.debug(f"Document fetch check: fetch_documents={fetch_documents}, municipality_name={municipality_name}, canton={canton}")
if fetch_documents and municipality_name and canton:
logger.info(f"Fetching BZO documents for Gemeinde '{municipality_name}' in canton '{canton}'")
try:
# Get interfaces
realEstateInterface = getRealEstateInterface(currentUser)
componentInterface = getComponentInterface(currentUser)
logger.debug(f"Interfaces initialized for document fetching")
# Resolve or create Gemeinde
gemeinde = None
# First, ensure Land "Schweiz" exists
laender = realEstateInterface.getLaender(recordFilter={"label": "Schweiz"})
if not laender:
land = Land(
mandateId=currentUser.mandateId,
label="Schweiz",
abk="CH"
)
land = realEstateInterface.createLand(land)
logger.debug(f"Created Land 'Schweiz' with ID: {land.id}")
else:
land = laender[0]
# Map canton abbreviations to full names
canton_names = {
"ZH": "Zürich", "BE": "Bern", "LU": "Luzern", "UR": "Uri", "SZ": "Schwyz",
"OW": "Obwalden", "NW": "Nidwalden", "GL": "Glarus", "ZG": "Zug", "FR": "Freiburg",
"SO": "Solothurn", "BS": "Basel-Stadt", "BL": "Basel-Landschaft", "SH": "Schaffhausen",
"AR": "Appenzell Ausserrhoden", "AI": "Appenzell Innerrhoden", "SG": "St. Gallen",
"GR": "Graubünden", "AG": "Aargau", "TG": "Thurgau", "TI": "Tessin",
"VD": "Waadt", "VS": "Wallis", "NE": "Neuenburg", "GE": "Genf", "JU": "Jura"
}
# Get or create Kanton
kantone = realEstateInterface.getKantone(recordFilter={"abk": canton})
if not kantone:
kanton_label = canton_names.get(canton, canton)
kanton_obj = Kanton(
mandateId=currentUser.mandateId,
label=kanton_label,
abk=canton,
id_land=land.id
)
kanton_obj = realEstateInterface.createKanton(kanton_obj)
logger.debug(f"Created Kanton '{kanton_label}' ({canton})")
else:
kanton_obj = kantone[0]
# Get or create Gemeinde
gemeinden = realEstateInterface.getGemeinden(
recordFilter={"label": municipality_name, "id_kanton": kanton_obj.id}
)
if not gemeinden:
gemeinde = Gemeinde(
mandateId=currentUser.mandateId,
label=municipality_name,
id_kanton=kanton_obj.id,
plz=plz
)
gemeinde = realEstateInterface.createGemeinde(gemeinde)
logger.info(f"Created Gemeinde '{municipality_name}'")
else:
gemeinde = gemeinden[0]
logger.debug(f"Found existing Gemeinde '{municipality_name}'")
gemeinde_info = {
"id": gemeinde.id,
"label": gemeinde.label,
"plz": gemeinde.plz
}
# Check if Gemeinde already has BZO documents
existing_bzo = False
logger.debug(f"Checking for existing BZO documents in Gemeinde '{gemeinde.label}' (has {len(gemeinde.dokumente) if gemeinde.dokumente else 0} documents)")
if gemeinde.dokumente:
for doc in gemeinde.dokumente:
if (doc.label and ("BZO" in doc.label.upper() or "BAU UND ZONENORDNUNG" in doc.label.upper() or
"PLAN D'AMÉNAGEMENT" in doc.label.upper() or "RÈGLEMENT DE CONSTRUCTION" in doc.label.upper() or
"PIANO DI UTILIZZAZIONE" in doc.label.upper() or "REGOLAMENTO EDILIZIO" in doc.label.upper())) or \
(doc.dokumentTyp and doc.dokumentTyp in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]):
existing_bzo = True
logger.info(f"Found existing BZO document: {doc.label} (ID: {doc.id})")
bzo_documents.append({
"id": doc.id,
"label": doc.label,
"dokumentTyp": doc.dokumentTyp.value if doc.dokumentTyp else None,
"dokumentReferenz": doc.dokumentReferenz,
"quelle": doc.quelle,
"mimeType": doc.mimeType
})
if existing_bzo:
logger.info(f"Gemeinde '{municipality_name}' already has {len(bzo_documents)} BZO document(s), skipping search")
# If no BZO documents found, search and download
if not existing_bzo:
logger.info(f"No BZO documents found for {municipality_name}, searching with Tavily...")
# Determine language
language = _get_language_from_kanton(canton)
# Generate search query
search_query = _get_bzo_search_query(municipality_name, language)
logger.debug(f"Tavily search query: {search_query}")
# Initialize Tavily connector
tavily = AiTavily()
# Search with Tavily
search_results = await tavily._search(
query=search_query,
maxResults=5,
country="switzerland"
)
if search_results:
# First, check for direct PDF URLs in search results
pdf_urls = []
html_urls = []
for result in search_results:
url = result.url.lower()
# Check if it's a direct PDF link
if url.endswith('.pdf') or '/pdf/' in url or url.endswith('/pdf'):
if not any(skip in url for skip in ['.html', '.htm', '/page/', '/article/', '/news/']):
pdf_urls.append(result.url)
else:
# It's an HTML page - we'll crawl it to find PDF links
html_urls.append(result.url)
# If no direct PDFs found, scrape HTML pages directly to find PDF links
if not pdf_urls and html_urls:
logger.info(f"No direct PDF links found, scraping {len(html_urls)} HTML pages to find PDF documents...")
# Helper function to scrape HTML and find PDF links
async def scrape_html_for_pdfs(url: str) -> List[str]:
    """
    Scrape an HTML page and collect links to PDF documents.

    The URL is fetched once. If the response itself is a PDF (magic bytes,
    Content-Type header, or .pdf extension) the URL itself is returned.
    Otherwise the body is decoded as text and scanned for absolute,
    href-relative and data-attribute PDF links, which are resolved to
    absolute URLs, cleaned of tracking parameters and de-duplicated.

    Args:
        url: Page URL to fetch and scan

    Returns:
        List of absolute PDF URLs found (possibly empty; never raises)
    """
    found_pdfs = []
    try:
        # NOTE(review): certificate verification is disabled here — confirm
        # this is acceptable outside development environments
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        connector_aiohttp = aiohttp.TCPConnector(ssl=ssl_context)
        timeout = aiohttp.ClientTimeout(total=15, connect=5)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8'
        }
        async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector_aiohttp) as session:
            async with session.get(url, allow_redirects=True) as response:
                if response.status == 200:
                    # Check Content-Type header first
                    content_type = response.headers.get('Content-Type', '').lower()
                    raw_bytes = await response.read()
                    # The page may itself be a PDF: detect via magic bytes,
                    # Content-Type, or URL extension (in that order)
                    if raw_bytes.startswith(b'%PDF'):
                        found_pdfs.append(url)
                        logger.info(f"Found direct PDF link (detected by magic bytes): {url}")
                        return found_pdfs
                    if 'application/pdf' in content_type:
                        found_pdfs.append(url)
                        logger.info(f"Found direct PDF link (Content-Type): {url}")
                        return found_pdfs
                    if url.lower().endswith('.pdf'):
                        found_pdfs.append(url)
                        logger.info(f"Found direct PDF link (URL extension): {url}")
                        return found_pdfs
                    # Decode body as text; Swiss municipal sites commonly use
                    # ISO-8859-1 / Windows-1252 alongside UTF-8
                    html_content = None
                    for encoding in ('utf-8', 'iso-8859-1', 'windows-1252'):
                        try:
                            html_content = raw_bytes.decode(encoding)
                            break
                        except UnicodeDecodeError:
                            continue
                    if html_content is None:
                        logger.warning(f"Could not decode content from {url} (not UTF-8, ISO-8859-1, or Windows-1252), skipping HTML parsing")
                        return found_pdfs
                    # Pattern 1: absolute PDF URLs anywhere in the markup
                    pdf_pattern = r'https?://[^\s<>"\'\)]+\.pdf(?:\?[^\s<>"\'\)]*)?'
                    found = re.findall(pdf_pattern, html_content, re.IGNORECASE)
                    # Pattern 2: relative PDF links in href attributes
                    relative_pattern = r'href=["\']([^"\']+\.pdf[^"\']*)["\']'
                    relative_found = re.findall(relative_pattern, html_content, re.IGNORECASE)
                    for rel_url in relative_found:
                        # Drop query params and fragments for cleaner URLs
                        clean_url = rel_url.split('?')[0].split('#')[0]
                        if clean_url.endswith('.pdf'):
                            # Bug fix: resolve against the full page URL, not just
                            # scheme://netloc, so path-relative links such as
                            # "docs/file.pdf" resolve correctly (RFC 3986 semantics
                            # of urljoin); absolute-path links behave as before
                            abs_url = urljoin(url, clean_url)
                            if abs_url not in found:
                                found.append(abs_url)
                    # Pattern 3: PDF links carried in data-* attributes
                    data_pattern = r'data-[^=]*=["\']([^"\']+\.pdf[^"\']*)["\']'
                    for data_url in re.findall(data_pattern, html_content, re.IGNORECASE):
                        clean_url = data_url.split('?')[0].split('#')[0]
                        if clean_url.endswith('.pdf'):
                            # urljoin leaves already-absolute URLs untouched,
                            # so no startswith('http') special case is needed
                            abs_url = urljoin(url, clean_url)
                            if abs_url not in found:
                                found.append(abs_url)
                    # Clean and de-duplicate the collected URLs
                    for pdf_link in found:
                        pdf_link = pdf_link.rstrip('.,;:!?)').strip()
                        if '?' in pdf_link:
                            base, params = pdf_link.split('?', 1)
                            # Keep only meaningful query params; drop tracking ones
                            important_params = [
                                param for param in params.split('&')
                                if param.split('=')[0].lower() not in ['utm_source', 'utm_medium', 'utm_campaign', 'ref', 'fbclid', 'gclid']
                            ]
                            pdf_link = f"{base}?{'&'.join(important_params)}" if important_params else base
                        if pdf_link not in found_pdfs and pdf_link.startswith('http'):
                            found_pdfs.append(pdf_link)
                            logger.debug(f"Found PDF link on {url}: {pdf_link}")
        logger.info(f"Found {len(found_pdfs)} PDF links on {url}")
    except Exception as e:
        # Best-effort scraping: log and return whatever was collected so far
        logger.debug(f"Error scraping {url} for PDFs: {e}", exc_info=True)
    return found_pdfs
# Scrape HTML pages to find PDF links
for html_url in html_urls[:5]: # Limit to first 5 URLs
try:
logger.debug(f"Scraping {html_url} to find PDF links...")
found_pdfs = await scrape_html_for_pdfs(html_url)
pdf_urls.extend(found_pdfs)
except Exception as e:
logger.warning(f"Error scraping {html_url} to find PDFs: {e}", exc_info=True)
continue
# Also check rawContent from search results for PDF links
for result in search_results:
if result.rawContent:
pdf_pattern = r'https?://[^\s<>"\'\)]+\.pdf(?:\?[^\s<>"\'\)]*)?'
found_pdfs = re.findall(pdf_pattern, result.rawContent, re.IGNORECASE)
for pdf_link in found_pdfs:
pdf_link = pdf_link.rstrip('.,;:!?)').strip()
if pdf_link not in pdf_urls and pdf_link.startswith('http'):
pdf_urls.append(pdf_link)
logger.debug(f"Found PDF link in rawContent: {pdf_link}")
if not pdf_urls:
logger.warning(f"No PDF URLs found in Tavily results for {municipality_name}. Results were HTML pages, not direct PDF links.")
logger.debug(f"Tavily returned URLs: {[r.url for r in search_results]}")
logger.info(f"Found {len(pdf_urls)} potential PDF documents for {municipality_name}")
# Helper function to download a single PDF
async def download_pdf(pdf_url: str) -> Optional[bytes]:
    """Download a PDF from *pdf_url*, retrying transient failures.

    Retry policy:
      - Up to ``max_retries`` attempts with a fixed delay between them.
      - The first attempt sends full browser-like headers; later attempts
        fall back to minimal headers, because some servers answer the
        richer header set with HTTP 406.
      - Only timeouts, connection errors, and HTTP 406 are retried.
        Content-validation failures (HTML returned instead of a PDF,
        truncated/empty body) abort immediately.

    Args:
        pdf_url: Absolute HTTP(S) URL of the PDF to fetch.

    Returns:
        The raw PDF bytes on success.

    Raises:
        Exception: if all attempts fail, or the server returns content
            that is not a valid PDF (wrong Content-Type, HTML markers,
            missing ``%PDF`` magic bytes, or a body that is too small).
    """
    max_retries = 3
    retry_delay = 2  # seconds between attempts
    min_pdf_size = 100  # bytes; anything smaller is assumed truncated/empty

    # SECURITY NOTE: certificate verification is intentionally disabled
    # here (development convenience). This permits man-in-the-middle
    # attacks and should be made configurable before production use.
    # The context is loop-invariant, so it is built once, not per attempt.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    for attempt in range(max_retries):
        try:
            if attempt > 0:
                # Minimal headers for retries: some servers reject the
                # full browser header set with HTTP 406.
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    'Accept': '*/*'
                }
            else:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                    'Accept': 'application/pdf,application/octet-stream,*/*',
                    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
            # A TCPConnector is consumed when its session closes, so a
            # fresh connector is required for every attempt.
            connector = aiohttp.TCPConnector(ssl=ssl_context)
            timeout = aiohttp.ClientTimeout(total=30, connect=10)
            async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector) as session:
                async with session.get(pdf_url, allow_redirects=True) as response:
                    if response.status == 200:
                        # Reject obvious non-PDF responses before reading the body.
                        content_type = response.headers.get('Content-Type', '').lower()
                        if 'text/html' in content_type or 'text/xml' in content_type:
                            logger.warning(f"URL {pdf_url} returned HTML content (Content-Type: {content_type}), skipping")
                            raise Exception("Server returned HTML content instead of PDF")
                        pdf_content = await response.read()
                        if not pdf_content or len(pdf_content) < min_pdf_size:
                            raise Exception("Downloaded file is too small or empty")
                        # Verify the PDF magic bytes; some servers serve an
                        # HTML error page with a 200 status.
                        if not pdf_content.startswith(b'%PDF'):
                            if pdf_content.startswith(b'<') or pdf_content.startswith(b'<!DOCTYPE'):
                                logger.warning(f"URL {pdf_url} returned HTML content (detected HTML markers), skipping")
                                raise Exception("Server returned HTML instead of PDF")
                            logger.warning(f"Downloaded file from {pdf_url} doesn't appear to be a PDF (no PDF magic bytes), skipping")
                            raise Exception("File doesn't appear to be a valid PDF")
                        return pdf_content
                    elif response.status == 406:
                        logger.warning(f"HTTP 406 for {pdf_url}, will retry with minimal headers (attempt {attempt + 1}/{max_retries})")
                        if attempt < max_retries - 1:
                            await asyncio.sleep(retry_delay)
                            continue
                        raise Exception(f"HTTP {response.status} (Not Acceptable) - server rejected request after {max_retries} attempts")
                    else:
                        raise Exception(f"HTTP {response.status} when downloading PDF")
        except asyncio.TimeoutError:
            logger.warning(f"Timeout downloading PDF from {pdf_url} (attempt {attempt + 1}/{max_retries})")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)
                continue
            raise Exception("Connection timeout after retries")
        except aiohttp.ClientError as e:
            logger.warning(f"Connection error downloading PDF from {pdf_url} (attempt {attempt + 1}/{max_retries}): {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)
                continue
            raise Exception(f"Connection error: {str(e)}")
        # NOTE: the original code had a trailing `except Exception as e:
        # raise` clause here — a no-op re-raise; removed. Generic
        # exceptions (e.g. content-validation failures) still propagate
        # unchanged and are deliberately not retried.
    # Unreachable in practice (every path above returns, raises, or
    # continues to the next attempt); kept as a defensive fallback.
    return None
# Process PDF URLs
current_dokumente = list(gemeinde.dokumente) if gemeinde.dokumente else []
# Sanitize Gemeinde name for filename
safe_name = "".join(c for c in municipality_name if c.isalnum() or c in (' ', '-', '_')).strip()
safe_name = safe_name.replace(' ', '_')
if not safe_name:
safe_name = "Gemeinde"
# Determine base label based on language
if language == 'fr':
base_doc_label = f"Plan d'aménagement local {municipality_name}"
elif language == 'it':
base_doc_label = f"Piano di utilizzazione {municipality_name}"
else:
base_doc_label = f"BZO {municipality_name}"
# Process each PDF URL
for idx, pdf_url in enumerate(pdf_urls):
try:
logger.info(f"Downloading PDF {idx + 1}/{len(pdf_urls)} from {pdf_url} for {municipality_name}")
pdf_content = await download_pdf(pdf_url)
if not pdf_content or len(pdf_content) < 100:
logger.warning(f"Failed to download PDF from {pdf_url} for {municipality_name}")
continue
# Create unique file name
if len(pdf_urls) > 1:
file_name = f"BZO_{safe_name}_{idx + 1}.pdf"
doc_label = f"{base_doc_label} ({idx + 1})"
else:
file_name = f"BZO_{safe_name}.pdf"
doc_label = base_doc_label
# Store file using ComponentObjects
try:
file_item = componentInterface.createFile(
name=file_name,
mimeType="application/pdf",
content=pdf_content
)
componentInterface.createFileData(file_item.id, pdf_content)
logger.info(f"Stored file {file_name} with ID {file_item.id}")
except Exception as e:
logger.error(f"Error storing file {file_name}: {str(e)}", exc_info=True)
continue
# Create Dokument record
dokument = Dokument(
mandateId=currentUser.mandateId,
label=doc_label,
versionsbezeichnung="Aktuell",
dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL,
dokumentReferenz=file_item.id,
quelle=pdf_url,
mimeType="application/pdf",
kategorienTags=["BZO", "Bauordnung", municipality_name]
)
# Create Dokument record
created_dokument = realEstateInterface.createDokument(dokument)
logger.info(f"Created Dokument record with ID {created_dokument.id}")
current_dokumente.append(created_dokument)
# Add to response
bzo_documents.append({
"id": created_dokument.id,
"label": created_dokument.label,
"dokumentTyp": created_dokument.dokumentTyp.value if created_dokument.dokumentTyp else None,
"dokumentReferenz": created_dokument.dokumentReferenz,
"quelle": created_dokument.quelle,
"mimeType": created_dokument.mimeType
})
except Exception as e:
logger.error(f"Error processing PDF {pdf_url}: {str(e)}", exc_info=True)
continue
# Update Gemeinde with new dokumente
if bzo_documents:
updated_gemeinde = realEstateInterface.updateGemeinde(
gemeinde.id,
{"dokumente": current_dokumente}
)
if updated_gemeinde:
logger.info(f"Successfully created {len(bzo_documents)} BZO document(s) for {municipality_name}")
else:
logger.warning(f"No search results found for {municipality_name}")
except Exception as e:
logger.error(f"Error fetching BZO documents for {municipality_name}: {e}", exc_info=True)
# Continue without documents - don't fail the request
elif fetch_documents:
if not municipality_name:
logger.warning("fetch_documents=true but municipality_name is not available, skipping document fetch")
elif not canton:
logger.warning("fetch_documents=true but canton is not available, skipping document fetch")
# Add Gemeinde and documents to response if available
logger.debug(f"Adding to response: gemeinde_info={gemeinde_info is not None}, bzo_documents count={len(bzo_documents)}")
if gemeinde_info:
response_data["gemeinde"] = gemeinde_info
logger.debug(f"Added gemeinde_info to response: {gemeinde_info}")
if bzo_documents:
response_data["documents"] = bzo_documents
logger.info(f"Added {len(bzo_documents)} BZO documents to response")
else:
logger.debug("No BZO documents to add to response")
return response_data
except HTTPException:
@ -1598,3 +2247,118 @@ async def add_parcel_to_project(
detail=f"Error adding parcel to project: {str(e)}"
)
@router.get("/bzo-information", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def get_bzo_information(
    request: Request,
    gemeinde: str = Query(..., description="Gemeinde name or ID"),
    bauzone: str = Query(..., description="Bauzone code (e.g., W3, W2/30)"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde.

    Uses a langgraph workflow to extract content from BZO PDF documents for the
    specified Gemeinde, then uses AI to search for relevant information specific
    to the specified Bauzone.

    The workflow:
    1. Finds BZO documents for the Gemeinde (by name or ID)
    2. Extracts content from PDFs using langgraph workflow
    3. Filters rules, zones, and articles by Bauzone
    4. Uses AI to generate a summary and find relevant information

    Query Parameters:
    - gemeinde: Gemeinde name (e.g., "Zürich") or ID
    - bauzone: Bauzone code (e.g., "W3", "W2/30", "Z3")

    Headers:
    - X-CSRF-Token: CSRF token (required for security)

    Returns:
        {
            "bauzone": "W3",
            "gemeinde": {
                "id": "...",
                "label": "...",
                "plz": "..."
            },
            "extracted_content": {
                "zones": [...],      // Zone information filtered by Bauzone
                "rules": [...],      // Rules filtered by Bauzone
                "articles": [...],   // Articles filtered by Bauzone
                "total_zones": N,
                "total_rules": N,
                "total_articles": N
            },
            "ai_summary": "...",     // AI-generated summary
            "relevant_rules": [...], // Rules specifically for this Bauzone
            "documents_processed": [ // List of document IDs processed
                {
                    "id": "...",
                    "label": "...",
                    "dokumentTyp": "..."
                }
            ],
            "errors": [...],
            "warnings": [...]
        }

    Examples:
    - GET /api/realestate/bzo-information?gemeinde=Zürich&bauzone=W3
    - GET /api/realestate/bzo-information?gemeinde=Uster&bauzone=W2/30

    Raises:
    - 403: CSRF token missing or malformed
    - 404: Gemeinde not found
    - 404: No BZO documents found for Gemeinde
    - 500: Error during extraction or processing
    """
    try:
        # Validate CSRF token (required even for GET as defense-in-depth).
        csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
        if not csrf_token:
            logger.warning(f"CSRF token missing for GET /api/realestate/bzo-information from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing. Please include X-CSRF-Token header."
            )
        # Basic CSRF token format validation: a 16-64 character string.
        if not isinstance(csrf_token, str) or not (16 <= len(csrf_token) <= 64):
            logger.warning(f"Invalid CSRF token format for GET /api/realestate/bzo-information from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        # Validate the token is pure hex. Deliberately NOT int(token, 16):
        # that also accepts "0x" prefixes, signs, surrounding whitespace,
        # and underscore separators, which would let malformed tokens pass.
        if not re.fullmatch(r"[0-9a-fA-F]+", csrf_token):
            logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/bzo-information from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )
        logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id}, mandate: {currentUser.mandateId})")
        # Delegate the actual extraction workflow to the feature function.
        result = await extract_bzo_information(
            currentUser=currentUser,
            gemeinde=gemeinde,
            bauzone=bauzone
        )
        return result
    except HTTPException:
        # Re-raise framework errors unchanged (403/404 carry their own detail).
        raise
    except Exception as e:
        logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error extracting BZO information: {str(e)}"
        )