From c2f2ed3b143aa772ccd6b438cf49e09b6b1346fe Mon Sep 17 00:00:00 2001
From: Ida Dittrich
Date: Tue, 27 Jan 2026 14:21:19 +0100
Subject: [PATCH] feat: extract bzo information

---
 app.py                                        |  10 +
 .../realEstate/bzoExtractionLangGraph.py      | 327 ++++++-
 modules/features/realEstate/mainRealEstate.py | 674 +++++++++++++-
 modules/routes/routeRealEstate.py             | 834 +++++++++++++++++-
 4 files changed, 1808 insertions(+), 37 deletions(-)

diff --git a/app.py b/app.py
index 06ee8c2d..cfcc3c62 100644
--- a/app.py
+++ b/app.py
@@ -294,6 +294,16 @@ async def lifespan(app: FastAPI):
         registerAllFeaturesInCatalog(catalogService)
     except Exception as e:
         logger.warning(f"Could not register feature RBAC catalog: {e}")
+    # Bootstrap database if needed (creates initial users, mandates, roles, etc.)
+    # This must happen before getting root interface
+    from modules.security.rootAccess import getRootDbAppConnector
+    from modules.interfaces.interfaceBootstrap import initBootstrap
+    rootDb = getRootDbAppConnector()
+    try:
+        initBootstrap(rootDb)
+        logger.info("Bootstrap check completed")
+    except Exception as e:
+        logger.warning(f"Bootstrap check failed (may already be initialized): {str(e)}")
 
     # Get event user for feature lifecycle (system-level user for background operations)
     rootInterface = getRootInterface()
diff --git a/modules/features/realEstate/bzoExtractionLangGraph.py b/modules/features/realEstate/bzoExtractionLangGraph.py
index 10c56244..0f5f909a 100644
--- a/modules/features/realEstate/bzoExtractionLangGraph.py
+++ b/modules/features/realEstate/bzoExtractionLangGraph.py
@@ -103,6 +103,9 @@ class BZOExtractionState(TypedDict):
     rule_candidates: List[Dict[str, Any]]
     parsed_rules: List[Dict[str, Any]]
 
+    # Zone-parameter tables (structured table data mapping zones to parameters)
+    zone_parameter_tables: List[Dict[str, Any]]
+
     # Processing metadata
     errors: List[str]
     warnings: List[str]
@@ -516,6 +519,322 @@ def confidence_scoring(state: BZOExtractionState) -> BZOExtractionState:
     return state
 
 
+def extract_zone_parameter_tables(state: BZOExtractionState) -> BZOExtractionState:
+    """
+    Extract structured zone-parameter mappings from tables.
+
+    Parses tables that map building zones to parameter values (e.g., Ausnützungsziffer,
+    Vollgeschosse, Gebäudelänge, Grenzabstand, Fassadenhöhen). 
+    """
+    try:
+        import re
+        tables = []
+
+        # Find all table blocks
+        table_blocks = [
+            block for block in state.get("classified_blocks", [])
+            if block.get("block_type") == "table"
+        ]
+
+        logger.info(f"Found {len(table_blocks)} table blocks to process")
+
+        for table_block in table_blocks:
+            block_dict = table_block.get("block", {})
+            text = block_dict.get("text", "")
+            page = block_dict.get("page", 0)
+
+            if not text or len(text.strip()) < 20:  # Skip very short blocks
+                continue
+
+            # Try to parse table structure
+            # Look for zone codes in header row (W2/30, W3/50, W4/70G*, etc.)
+            zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b'
+            lines = text.split('\n')
+
+            # Find header row (usually contains zone codes)
+            header_row_idx = None
+            zone_columns = []
+
+            for idx, line in enumerate(lines):
+                # Look for multiple zone codes in a line (header row)
+                zone_matches = re.findall(zone_pattern, line, re.IGNORECASE)
+                if len(zone_matches) >= 3:  # At least 3 zones indicates header row
+                    header_row_idx = idx
+                    zone_columns = zone_matches
+                    logger.debug(f"Found header row at line {idx} with zones: {zone_columns}")
+                    break
+
+            if header_row_idx is None or not zone_columns:
+                # Try alternative: look for common table patterns
+                # Check if text contains parameter names and zone codes
+                parameter_keywords = [
+                    r'Ausnützungsziffer',
+                    r'Vollgeschosse',
+                    r'Dachgeschosse',
+                    r'Attikageschoss',
+                    r'Untergeschoss',
+                    r'Gebäudelänge',
+                    r'Grenzabstand',
+                    r'Fassadenhöhen',
+                    r'Grundabstand',
+                    r'Mehrlängen',
+                    r'Höchstmass'
+                ]
+
+                has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords)
+                has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3
+
+                if has_parameters and has_zones:
+                    # Extract all zones from entire text
+                    all_zones = re.findall(zone_pattern, text, re.IGNORECASE)
+                    zone_columns = list(dict.fromkeys(all_zones))  # Remove duplicates, preserve order
+                    header_row_idx = 0  # Assume header is at start
+                    logger.debug(f"Found zones in 
table text: {zone_columns}") + + if not zone_columns: + continue + + # Parse parameter rows + table_data = { + "page": page, + "zones": zone_columns, + "parameters": [], + "source_text": text[:500], # Store first 500 chars for reference + "article": None # Will be set if found + } + + # Extract parameters and their values + # Look for parameter rows (a), b), c), etc. or parameter names + parameter_row_patterns = [ + r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?', # a) Parameter name + r'^(Ausnützungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)', + ] + + # Parse each line after header + start_idx = header_row_idx + 1 if header_row_idx is not None else 0 + current_parameter = None + current_subparameter = None + parameter_values = {} + subparameter_values = {} + + # Track which article/section this table belongs to + article_context = None + for block in state.get("classified_blocks", []): + if block.get("block", {}).get("page") == page: + article_label = block.get("article_label") + if article_label: + article_context = article_label + break + + for line_idx in range(start_idx, len(lines)): + line = lines[line_idx].strip() + if not line: + continue + + # Check if this is a parameter row (main parameter like a), b), c)) + is_parameter_row = False + parameter_name = None + + for pattern in parameter_row_patterns: + match = re.match(pattern, line, re.IGNORECASE) + if match: + is_parameter_row = True + parameter_name = match.group(1).strip() + # Clean up parameter name + parameter_name = re.sub(r'\s+max\.?\s*$', '', parameter_name, flags=re.IGNORECASE) + parameter_name = re.sub(r'\s+min\.?\s*$', '', parameter_name, flags=re.IGNORECASE) + break + + # Check for sub-parameters (like "Grundabstand min.", "Mehrlängen-zuschlag", "Höchstmass max.") + is_subparameter = False + subparameter_name = None + if not is_parameter_row: + subparameter_patterns = [ + 
r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?', + r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)', + ] + for pattern in subparameter_patterns: + match = re.search(pattern, line, re.IGNORECASE) + if match: + is_subparameter = True + subparameter_name = match.group(1).strip() + if match.lastindex > 1 and match.group(2): + subparameter_name += f" {match.group(2).strip()}" + break + + if is_parameter_row and parameter_name: + # Save previous parameter if exists + if current_parameter and parameter_values: + param_entry = { + "parameter": current_parameter, + "values_by_zone": parameter_values.copy() + } + if article_context: + param_entry["article"] = article_context + table_data["parameters"].append(param_entry) + + # Start new parameter + current_parameter = parameter_name + current_subparameter = None + parameter_values = {} + subparameter_values = {} + continue + + if is_subparameter and subparameter_name: + # Save previous subparameter if exists + if current_subparameter and subparameter_values: + if current_parameter: + # Add subparameter as nested parameter + param_entry = { + "parameter": f"{current_parameter} - {current_subparameter}", + "values_by_zone": subparameter_values.copy() + } + if article_context: + param_entry["article"] = article_context + table_data["parameters"].append(param_entry) + + current_subparameter = subparameter_name + subparameter_values = {} + continue + + # Try to extract values for current parameter or subparameter + target_values = subparameter_values if current_subparameter else parameter_values + if current_parameter or current_subparameter: + # Improved parsing: try to align values with zone columns + # Split line by multiple spaces or tabs (table column separators) + line_parts = re.split(r'\s{2,}|\t', line) + line_parts = [p.strip() for p in line_parts if p.strip()] + + # Look for numeric values with units + numeric_pattern = 
r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?' + all_matches = list(re.finditer(numeric_pattern, line, re.IGNORECASE)) + + # Also look for fractions (like 1/3) + fraction_pattern = r'(\d+/\d+)' + fraction_matches = list(re.finditer(fraction_pattern, line, re.IGNORECASE)) + + # Combine all matches, preserving position + all_value_matches = [] + for m in all_matches: + value = m.group(1) + unit = m.group(2) if m.lastindex > 1 else None + all_value_matches.append((m.start(), m.group(0), value, unit)) + + for m in fraction_matches: + all_value_matches.append((m.start(), m.group(0), m.group(0), None)) + + all_value_matches.sort(key=lambda x: x[0]) + + # Try to map values to zones + # Strategy: if we have roughly the same number of values as zones, map 1:1 + # Otherwise, try to distribute evenly + if len(all_value_matches) > 0 and len(zone_columns) > 0: + if len(all_value_matches) == len(zone_columns): + # Perfect 1:1 mapping + for zone_idx, zone in enumerate(zone_columns): + if zone_idx < len(all_value_matches): + _, full_match, value, unit = all_value_matches[zone_idx] + if zone not in target_values: + target_values[zone] = [] + target_values[zone].append({ + "value": value, + "unit": unit.strip() if unit else None, + "raw_text": line[:200], + "line_number": line_idx + }) + elif len(all_value_matches) >= len(zone_columns): + # More values than zones - try to group + values_per_zone = len(all_value_matches) / len(zone_columns) + for zone_idx, zone in enumerate(zone_columns): + start_idx = int(zone_idx * values_per_zone) + end_idx = int((zone_idx + 1) * values_per_zone) + zone_values = all_value_matches[start_idx:end_idx] + + if zone_values: + if zone not in target_values: + target_values[zone] = [] + # Take the first (or most relevant) value + _, full_match, value, unit = zone_values[0] + target_values[zone].append({ + "value": value, + "unit": unit.strip() if unit else None, + "raw_text": line[:200], + "line_number": line_idx + }) + 
else: + # Fewer values than zones - try to match by position + # Use line_parts if they align better + if len(line_parts) >= len(zone_columns) * 0.7: + # Try to extract values from line_parts + for zone_idx, zone in enumerate(zone_columns): + if zone_idx < len(line_parts): + part = line_parts[zone_idx] + # Extract numeric value from this part + num_match = re.search(r'(\d+(?:\.\d+)?)', part) + if num_match: + value = num_match.group(1) + unit_match = re.search(r'(%|m|Geschoss)', part, re.IGNORECASE) + unit = unit_match.group(0) if unit_match else None + + if zone not in target_values: + target_values[zone] = [] + target_values[zone].append({ + "value": value, + "unit": unit, + "raw_text": part[:100], + "line_number": line_idx + }) + else: + # Fallback: assign to first zone(s) + for idx, (_, full_match, value, unit) in enumerate(all_value_matches): + if idx < len(zone_columns): + zone = zone_columns[idx] + if zone not in target_values: + target_values[zone] = [] + target_values[zone].append({ + "value": value, + "unit": unit.strip() if unit else None, + "raw_text": line[:200], + "line_number": line_idx + }) + + # Save last parameter/subparameter + if current_subparameter and subparameter_values: + if current_parameter: + param_entry = { + "parameter": f"{current_parameter} - {current_subparameter}", + "values_by_zone": subparameter_values.copy() + } + if article_context: + param_entry["article"] = article_context + table_data["parameters"].append(param_entry) + + if current_parameter and parameter_values: + param_entry = { + "parameter": current_parameter, + "values_by_zone": parameter_values.copy() + } + if article_context: + param_entry["article"] = article_context + table_data["parameters"].append(param_entry) + + if table_data["parameters"]: + tables.append(table_data) + logger.info(f"Extracted table with {len(table_data['zones'])} zones and {len(table_data['parameters'])} parameters from page {page}") + + # Update state + existing_tables = 
state.get("zone_parameter_tables", []) + state["zone_parameter_tables"] = existing_tables + tables + + logger.info(f"Extracted {len(tables)} zone-parameter tables total") + return state + + except Exception as e: + logger.error(f"Error extracting zone-parameter tables: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Table extraction error: {str(e)}"] + return state + + # ===== Graph Construction ===== @@ -529,6 +848,7 @@ def create_bzo_extraction_graph(): workflow.add_node("classify_text_block", classify_text_block) workflow.add_node("assemble_articles", assemble_articles) workflow.add_node("detect_zone_changes", detect_zone_changes) + workflow.add_node("extract_zone_parameter_tables", extract_zone_parameter_tables) workflow.add_node("detect_rule_candidates", detect_rule_candidates) workflow.add_node("parse_rule_values", parse_rule_values) workflow.add_node("assign_zone_and_scope", assign_zone_and_scope) @@ -539,7 +859,8 @@ def create_bzo_extraction_graph(): workflow.add_edge("extract_pdf_text", "classify_text_block") workflow.add_edge("classify_text_block", "assemble_articles") workflow.add_edge("assemble_articles", "detect_zone_changes") - workflow.add_edge("detect_zone_changes", "detect_rule_candidates") + workflow.add_edge("detect_zone_changes", "extract_zone_parameter_tables") + workflow.add_edge("extract_zone_parameter_tables", "detect_rule_candidates") workflow.add_edge("detect_rule_candidates", "parse_rule_values") workflow.add_edge("parse_rule_values", "assign_zone_and_scope") workflow.add_edge("assign_zone_and_scope", "confidence_scoring") @@ -583,6 +904,7 @@ def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None "zones": [], "rule_candidates": [], "parsed_rules": [], + "zone_parameter_tables": [], "errors": [], "warnings": [] } @@ -621,10 +943,13 @@ def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None key=lambda x: (x.get("rule_type", ""), x.get("page", 0)) ) + 
zone_parameter_tables = final_state.get("zone_parameter_tables", []) + return { "articles": articles, "zones": zones, "rules": rules, + "zone_parameter_tables": zone_parameter_tables, "errors": final_state.get("errors", []), "warnings": final_state.get("warnings", []) } diff --git a/modules/features/realEstate/mainRealEstate.py b/modules/features/realEstate/mainRealEstate.py index 0483218d..37b34ee9 100644 --- a/modules/features/realEstate/mainRealEstate.py +++ b/modules/features/realEstate/mainRealEstate.py @@ -294,10 +294,14 @@ from .datamodelFeatureRealEstate import ( Gemeinde, Kanton, Land, + DokumentTyp, ) from modules.services import getInterface as getServices -from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface +from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface +from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector +from modules.features.realEstate.bzoDocumentRetriever import BZODocumentRetriever +from modules.features.realEstate.bzoExtractionLangGraph import run_extraction logger = logging.getLogger(__name__) @@ -2329,3 +2333,671 @@ async def create_project_with_parcel_data( logger.error(f"Error creating project with parcel data: {str(e)}", exc_info=True) raise + +# ===== BZO Information Extraction for Parcels ===== + +async def extract_bzo_information( + currentUser: User, + gemeinde: str, + bauzone: str, +) -> Dict[str, Any]: + """ + Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde. + + Retrieves BZO documents for the specified Gemeinde, extracts content using + langgraph workflow, filters by Bauzone, and uses AI to find relevant information. 
+ + Args: + currentUser: Current authenticated user + gemeinde: Gemeinde name (e.g., "Zürich") or ID + bauzone: Bauzone code (e.g., "W3", "W2/30") + + Returns: + Dictionary containing: + - bauzone: Bauzone code + - gemeinde: Gemeinde information + - extracted_content: Extracted content from PDFs + - ai_summary: AI-generated summary + - relevant_rules: Rules filtered by Bauzone + - documents_processed: List of document IDs processed + + Raises: + HTTPException: If Gemeinde not found or no documents found + """ + try: + logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id})") + + # Get interfaces + realEstateInterface = getRealEstateInterface(currentUser) + componentInterface = getComponentInterface(currentUser) + + # Get Gemeinde - try by ID first, then by label + logger.debug(f"Attempting to retrieve Gemeinde '{gemeinde}' for mandate {currentUser.mandateId}") + gemeinde_obj = realEstateInterface.getGemeinde(gemeinde) + + # If not found by ID, try searching by label + if not gemeinde_obj: + logger.debug(f"Gemeinde not found by ID, trying to search by label: {gemeinde}") + gemeinden_by_label = realEstateInterface.getGemeinden( + recordFilter={"label": gemeinde} + ) + if gemeinden_by_label and len(gemeinden_by_label) > 0: + gemeinde_obj = gemeinden_by_label[0] + logger.info(f"Found Gemeinde by label '{gemeinde}' with ID: {gemeinde_obj.id}") + else: + # Try to get all gemeinden to see what's available (for debugging) + all_gemeinden = realEstateInterface.getGemeinden(recordFilter=None) + logger.warning(f"Gemeinde '{gemeinde}' not found by ID or label. 
Total Gemeinden in database: {len(all_gemeinden)}") + if all_gemeinden: + sample_ids = [g.id for g in all_gemeinden[:5]] + sample_labels = [g.label for g in all_gemeinden[:5] if g.label] + logger.warning(f"Sample Gemeinde IDs: {sample_ids}") + if sample_labels: + logger.warning(f"Sample Gemeinde labels: {sample_labels}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Gemeinde '{gemeinde}' not found or not accessible" + ) + + gemeinde_id = gemeinde_obj.id + + # Get BZO documents directly from Gemeinde's dokumente field + bzo_documents = [] + if gemeinde_obj.dokumente: + for doc in gemeinde_obj.dokumente: + # Handle both dict and object formats + if isinstance(doc, dict): + doc_id = doc.get("id") + doc_typ = doc.get("dokumentTyp") + else: + doc_id = doc.id if hasattr(doc, "id") else None + doc_typ = doc.dokumentTyp if hasattr(doc, "dokumentTyp") else None + + # Check if it's a BZO document type + if doc_typ: + # Handle enum, string, or dict formats + if isinstance(doc_typ, DokumentTyp): + is_bzo = doc_typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION] + elif isinstance(doc_typ, str): + is_bzo = doc_typ in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] + else: + doc_typ_str = str(doc_typ) + is_bzo = doc_typ_str in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] + + if is_bzo: + # Get full document object + if doc_id: + full_doc = realEstateInterface.getDokument(doc_id) + if full_doc: + bzo_documents.append(full_doc) + else: + logger.warning(f"Document {doc_id} referenced in Gemeinde but not found in database") + + if not bzo_documents: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"No BZO documents found for Gemeinde '{gemeinde_obj.label}'" + ) + + logger.info(f"Found {len(bzo_documents)} BZO document(s) for Gemeinde '{gemeinde_obj.label}'") + + # Initialize document retriever + 
document_retriever = BZODocumentRetriever(realEstateInterface, componentInterface) + + # Extract content from all documents + all_extracted_content = { + "articles": [], + "zones": [], + "rules": [], + "zone_parameter_tables": [], + "errors": [], + "warnings": [] + } + documents_processed = [] + + for dokument in bzo_documents: + try: + logger.info(f"Processing document {dokument.id}: {dokument.label}") + + # Retrieve PDF content + pdf_bytes = document_retriever.retrieve_pdf_content(dokument) + if not pdf_bytes: + logger.warning(f"Could not retrieve PDF content for dokument {dokument.id}") + all_extracted_content["warnings"].append( + f"Could not retrieve PDF content for document '{dokument.label}'" + ) + continue + + # Run extraction using langgraph workflow + extraction_result = run_extraction( + pdf_bytes=pdf_bytes, + pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}", + dokument_id=dokument.id + ) + + # Combine results + all_extracted_content["articles"].extend(extraction_result.get("articles", [])) + all_extracted_content["zones"].extend(extraction_result.get("zones", [])) + all_extracted_content["rules"].extend(extraction_result.get("rules", [])) + all_extracted_content["zone_parameter_tables"].extend(extraction_result.get("zone_parameter_tables", [])) + all_extracted_content["errors"].extend(extraction_result.get("errors", [])) + all_extracted_content["warnings"].extend(extraction_result.get("warnings", [])) + + documents_processed.append({ + "id": dokument.id, + "label": dokument.label, + "dokumentTyp": dokument.dokumentTyp.value if dokument.dokumentTyp else None + }) + + except Exception as e: + logger.error(f"Error processing document {dokument.id}: {str(e)}", exc_info=True) + all_extracted_content["errors"].append( + f"Error processing document '{dokument.label}': {str(e)}" + ) + continue + + # Filter rules by Bauzone + relevant_rules = filter_rules_by_bauzone( + all_extracted_content["rules"], + bauzone + ) + + # Filter zones by Bauzone + 
relevant_zones = filter_zones_by_bauzone(
+            all_extracted_content["zones"],
+            bauzone
+        )
+
+        # Filter articles that mention the Bauzone
+        relevant_articles = filter_articles_by_bauzone(
+            all_extracted_content.get("articles", []),
+            bauzone
+        )
+
+        # Use AI to generate summary and find additional information
+        ai_summary = await generate_bauzone_ai_summary(
+            currentUser=currentUser,
+            bauzone=bauzone,
+            gemeinde=gemeinde_obj.label,
+            extracted_content=all_extracted_content,
+            relevant_rules=relevant_rules,
+            relevant_zones=relevant_zones
+        )
+
+        # Build unified summary that includes zones and articles
+        unified_summary = ai_summary
+
+        # Append zone and article information to the summary if not already included
+        # The AI should have integrated this, but we add it as backup if needed
+        summary_lower = unified_summary.lower()
+
+        # Check if zones are mentioned in summary
+        zones_mentioned = any(zone.get("zone_code", "").lower() in summary_lower for zone in relevant_zones)
+        if not zones_mentioned and relevant_zones:
+            unified_summary += "\n\n=== ZONENDEFINITIONEN ===\n"
+            for zone in relevant_zones:
+                zone_code = zone.get("zone_code", "")
+                zone_name = zone.get("zone_name", "")
+                zone_category = zone.get("zone_category", "")
+                geschosszahl = zone.get("geschosszahl")
+                gewerbeerleichterung = zone.get("gewerbeerleichterung", False)
+                page_num = zone.get("page", 0)
+                source_article = zone.get("source_article", "")
+
+                zone_info = f"{zone_code}: {zone_name}"
+                if zone_category:
+                    zone_info += f"\nKategorie: {zone_category}"
+                if geschosszahl:
+                    zone_info += f"\nGeschosszahl: {geschosszahl}"
+                if gewerbeerleichterung:
+                    zone_info += "\nGewerbeerleichterung: Ja"
+                if source_article:
+                    zone_info += f"\nQuelle: {source_article} (Seite {page_num})"
+                unified_summary += zone_info + "\n\n"
+
+        # Check if articles are mentioned in summary
+        articles_mentioned = any(article.get("article_label", "").lower() in summary_lower for article in relevant_articles)
+        if not articles_mentioned 
and relevant_articles: + unified_summary += "\n\n=== RELEVANTE ARTIKEL ===\n" + for article in relevant_articles: + article_label = article.get("article_label", "") + article_title = article.get("article_title", "") + article_text = article.get("text", "") + page_start = article.get("page_start", 0) + page_end = article.get("page_end", 0) + page_range = f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}" + + unified_summary += f"{article_label}" + if article_title: + unified_summary += f": {article_title}" + unified_summary += f" ({page_range})\n" + # Include first 500 chars of article text + if article_text: + preview = article_text[:500] + "..." if len(article_text) > 500 else article_text + unified_summary += f"{preview}\n\n" + + return { + "bauzone": bauzone, + "gemeinde": { + "id": gemeinde_obj.id, + "label": gemeinde_obj.label, + "plz": gemeinde_obj.plz + }, + "extracted_content": { + "zones": relevant_zones, + "rules": relevant_rules, + "articles": relevant_articles, + "zone_parameter_tables": _filter_tables_by_bauzone( + all_extracted_content.get("zone_parameter_tables", []), + bauzone + ), + "total_zones": len(all_extracted_content.get("zones", [])), + "total_rules": len(all_extracted_content.get("rules", [])), + "total_articles": len(all_extracted_content.get("articles", [])), + "total_tables": len(all_extracted_content.get("zone_parameter_tables", [])) + }, + "ai_summary": unified_summary, + "relevant_rules": relevant_rules, + "documents_processed": documents_processed, + "errors": all_extracted_content.get("errors", []), + "warnings": all_extracted_content.get("warnings", []) + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error extracting BZO information: {str(e)}" + ) + + +def 
filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """ + Filter rules by Bauzone code. + + Args: + rules: List of rule dictionaries from extraction + bauzone: Bauzone code to filter by (e.g., "W3", "W2/30") + + Returns: + Filtered list of rules that match the Bauzone + """ + relevant_rules = [] + bauzone_upper = bauzone.upper() + + for rule in rules: + # Check if rule has zone information + zone_raw = rule.get("zone_raw") + table_zones = rule.get("table_zones", []) + + # Check if rule matches Bauzone + matches = False + + # Direct zone match + if zone_raw and bauzone_upper in zone_raw.upper(): + matches = True + + # Table zone match + if not matches and table_zones: + for table_zone in table_zones: + if bauzone_upper in str(table_zone).upper(): + matches = True + break + + # Check text snippet for Bauzone mention + if not matches: + text_snippet = rule.get("text_snippet", "") + if bauzone_upper in text_snippet.upper(): + matches = True + + if matches: + relevant_rules.append(rule) + + logger.info(f"Filtered {len(relevant_rules)} rules for Bauzone {bauzone} from {len(rules)} total rules") + return relevant_rules + + +def filter_zones_by_bauzone(zones: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """ + Filter zones by Bauzone code. + + Args: + zones: List of zone dictionaries from extraction + bauzone: Bauzone code to filter by + + Returns: + Filtered list of zones that match the Bauzone + """ + relevant_zones = [] + bauzone_upper = bauzone.upper() + + for zone in zones: + zone_code = zone.get("zone_code", "") + if bauzone_upper in zone_code.upper(): + relevant_zones.append(zone) + + logger.info(f"Filtered {len(relevant_zones)} zones for Bauzone {bauzone} from {len(zones)} total zones") + return relevant_zones + + +def filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """ + Filter articles that mention the Bauzone. 
+ + Args: + articles: List of article dictionaries from extraction + bauzone: Bauzone code to filter by + + Returns: + Filtered list of articles that mention the Bauzone + """ + relevant_articles = [] + bauzone_upper = bauzone.upper() + + for article in articles: + text = article.get("text", "") + zone_raw = article.get("zone_raw") + + # Check if article mentions the Bauzone + text_matches = bauzone_upper in text.upper() if text else False + zone_matches = bauzone_upper in zone_raw.upper() if zone_raw else False + + if text_matches or zone_matches: + relevant_articles.append(article) + + logger.info(f"Filtered {len(relevant_articles)} articles for Bauzone {bauzone} from {len(articles)} total articles") + return relevant_articles + + +def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """ + Filter zone-parameter tables to include only those containing the specified Bauzone. + + Args: + tables: List of zone-parameter table dictionaries + bauzone: Bauzone code to filter by + + Returns: + Filtered list of tables containing the Bauzone + """ + relevant_tables = [] + bauzone_upper = bauzone.upper() + + for table in tables: + zones = table.get("zones", []) + # Check if any zone in the table matches the Bauzone + matching_zones = [z for z in zones if bauzone_upper in str(z).upper()] + + if matching_zones: + # Create filtered version with only relevant zone columns + filtered_table = { + "page": table.get("page"), + "zones": matching_zones, + "parameters": [] + } + + # Filter parameters to only include values for matching zones + for param in table.get("parameters", []): + values_by_zone = param.get("values_by_zone", {}) + filtered_values = { + zone: values_by_zone[zone] + for zone in matching_zones + if zone in values_by_zone + } + + if filtered_values: + filtered_table["parameters"].append({ + "parameter": param.get("parameter"), + "values_by_zone": filtered_values + }) + + if filtered_table["parameters"]: + 
relevant_tables.append(filtered_table) + + logger.info(f"Filtered {len(relevant_tables)} tables for Bauzone {bauzone} from {len(tables)} total tables") + return relevant_tables + + +async def generate_bauzone_ai_summary( + currentUser: User, + bauzone: str, + gemeinde: str, + extracted_content: Dict[str, Any], + relevant_rules: List[Dict[str, Any]], + relevant_zones: List[Dict[str, Any]] +) -> str: + """ + Use AI to generate a summary of relevant information for a Bauzone. + + Args: + currentUser: Current authenticated user + bauzone: Bauzone code + gemeinde: Gemeinde name + extracted_content: All extracted content from PDFs + relevant_rules: Rules filtered by Bauzone + relevant_zones: Zones filtered by Bauzone + + Returns: + AI-generated summary string + """ + try: + # Initialize AI service + services = getServices(currentUser, workflow=None) + aiService = services.ai + + # Build context from extracted content, prioritizing zone-parameter tables + context_parts = [] + + # Extract and format zone-parameter table values for the specific Bauzone + zone_parameter_tables = extracted_content.get("zone_parameter_tables", []) + table_values_for_bauzone = [] + + if zone_parameter_tables: + context_parts.append("=== BUILDING REGULATIONS TABLE VALUES FOR BAUZONE (INCLUDE THESE EXACT VALUES IN YOUR SUMMARY) ===") + for table in zone_parameter_tables: + page_num = table.get("page", 0) + article_ref = table.get("article", "Unknown article") + zones_in_table = table.get("zones", []) + + # Check if this table contains the requested Bauzone + matching_zones = [z for z in zones_in_table if bauzone.upper() in str(z).upper()] + + if matching_zones: + context_parts.append(f"\nTabelle aus {article_ref} (Seite {page_num}):") + + for param in table.get("parameters", []): + param_name = param.get("parameter", "") + values_by_zone = param.get("values_by_zone", {}) + + # Extract values for the requested Bauzone + for zone, values in values_by_zone.items(): + if bauzone.upper() in 
zone.upper(): + if isinstance(values, list) and len(values) > 0: + # Take the first value (most relevant) + val_entry = values[0] + value = val_entry.get("value", "") + unit = val_entry.get("unit", "") + unit_str = f" {unit}" if unit else "" + + # Format parameter name nicely + formatted_param = param_name + if "Ausnützungsziffer" in param_name or "ausnützungsziffer" in param_name.lower(): + formatted_param = "Ausnützungsziffer max." + elif "Vollgeschosse" in param_name or "vollgeschosse" in param_name.lower(): + formatted_param = "Vollgeschosse max." + elif "Gebäudelänge" in param_name or "gebäudelänge" in param_name.lower(): + formatted_param = "Gebäudelänge max." + elif ("Grenzabstand" in param_name or "grenzabstand" in param_name.lower()) and ("Grundabstand" in param_name or "grundabstand" in param_name.lower()): + formatted_param = "Grenzabstand - Grundabstand min." + elif ("Grenzabstand" in param_name or "grenzabstand" in param_name.lower()) and ("Mehrlängen" in param_name or "mehrlängen" in param_name.lower()): + formatted_param = "Grenzabstand - Mehrlängen-zuschlag" + elif ("Grenzabstand" in param_name or "grenzabstand" in param_name.lower()) and ("Höchstmass" in param_name or "höchstmass" in param_name.lower() or "Höchstmaß" in param_name): + formatted_param = "Grenzabstand - Höchstmass max." + elif "Fassadenhöhen" in param_name or "fassadenhöhen" in param_name.lower(): + formatted_param = "Fassadenhöhen max." + elif "Dachgeschosse" in param_name or "dachgeschosse" in param_name.lower(): + formatted_param = "anrechenbare Dachgeschosse max." + elif "Attikageschoss" in param_name or "attikageschoss" in param_name.lower(): + formatted_param = "anrechenbares Attikageschoss max." + elif "Untergeschoss" in param_name or "untergeschoss" in param_name.lower(): + formatted_param = "anrechenbares Untergeschoss max." 
+ + table_values_for_bauzone.append({ + "parameter": formatted_param, + "value": value, + "unit": unit_str, + "article": article_ref, + "page": page_num + }) + context_parts.append(f" • {formatted_param}: {value}{unit_str} (Quelle: {article_ref}, Seite {page_num})") + + # Also check for multiple values (e.g., Fassadenhöhen with footnote values) + if len(values) > 1: + for idx, val_entry in enumerate(values[1:], 1): + value_extra = val_entry.get("value", "") + unit_extra = val_entry.get("unit", "") + unit_str_extra = f" {unit_extra}" if unit_extra else "" + context_parts.append(f" (Alternative: {value_extra}{unit_str_extra})") + + # Add zone information with all details + if relevant_zones: + context_parts.append("\n=== ZONE DEFINITIONS ===") + for zone in relevant_zones: + zone_code = zone.get("zone_code", "") + zone_name = zone.get("zone_name", "") + zone_category = zone.get("zone_category", "") + geschosszahl = zone.get("geschosszahl") + gewerbeerleichterung = zone.get("gewerbeerleichterung", False) + page_num = zone.get("page", 0) + source_article = zone.get("source_article", "") + + zone_info = f"- {zone_code}: {zone_name}" + if zone_category: + zone_info += f" (Kategorie: {zone_category})" + if geschosszahl: + zone_info += f", Geschosszahl: {geschosszahl}" + if gewerbeerleichterung: + zone_info += ", Gewerbeerleichterung: Ja" + if source_article: + zone_info += f" - Quelle: {source_article} (Seite {page_num})" + context_parts.append(zone_info) + + # Add article information with full text previews + relevant_articles = filter_articles_by_bauzone(extracted_content.get("articles", []), bauzone) + if relevant_articles: + context_parts.append("\n=== RELEVANT ARTICLES (full content) ===") + for article in relevant_articles: + article_label = article.get("article_label", "") + article_title = article.get("article_title", "") + article_text = article.get("text", "") + page_start = article.get("page_start", 0) + page_end = article.get("page_end", 0) + page_range = 
f"Seite {page_start}" if page_start == page_end else f"Seiten {page_start}-{page_end}" + + context_parts.append(f"\n{article_label}: {article_title or 'Kein Titel'}") + context_parts.append(f"Lage: {page_range}") + # Include full article text (truncated if too long) + if len(article_text) > 1000: + context_parts.append(f"Inhalt: {article_text[:1000]}...") + else: + context_parts.append(f"Inhalt: {article_text}") + + # Add relevant rules (only if not already covered in tables) + if relevant_rules: + # Filter out rules that are likely already in tables + table_parameter_names = set() + for table in zone_parameter_tables: + for param in table.get("parameters", []): + param_name = param.get("parameter", "").lower() + table_parameter_names.add(param_name) + + unique_rules = [] + for rule in relevant_rules[:15]: + rule_type = rule.get("rule_type", "").lower() + # Skip if this rule type is likely in tables + if not any(tp in rule_type for tp in table_parameter_names): + unique_rules.append(rule) + + if unique_rules: + context_parts.append("\n=== ADDITIONAL BUILDING REGULATIONS (from text) ===") + for rule in unique_rules[:8]: + rule_type = rule.get("rule_type", "") + value_numeric = rule.get("value_numeric") + value_text = rule.get("value_text", "") + unit = rule.get("unit", "") + page_num = rule.get("page", 0) + + rule_desc = f"- {rule_type}: " + if value_numeric is not None: + rule_desc += f"{value_numeric}" + if unit: + rule_desc += f" {unit}" + else: + rule_desc += value_text + rule_desc += f" (Seite {page_num})" + + context_parts.append(rule_desc) + + context = "\n".join(context_parts) + + # Create AI prompt with explicit instructions to include all table values + prompt = f""" +Analyze the following building zone (Bauzone) information extracted from BZO (Bau- und Zonenordnung) documents for {gemeinde}, specifically for Bauzone {bauzone}. + +Extracted Content: +{context} + +CRITICAL INSTRUCTIONS: +1. 
You MUST include ALL actual values from the tables in your summary - do NOT just say "see tables on page X" +2. List ALL parameters with their actual values: Ausnützungsziffer, Vollgeschosse, Gebäudelänge, Grenzabstand (Grundabstand, Mehrlängen-zuschlag, Höchstmass), Fassadenhöhen, etc. +3. Integrate zone definitions and article information INTO the summary text - do NOT create separate sections +4. Always cite WHERE each piece of information was found (article number and page number) +5. Combine everything into ONE unified, flowing summary - no separate sections for zones/articles +6. Be comprehensive - include all relevant details from zones, articles, and tables +7. Format as a single, well-structured German text document + +Please provide a comprehensive, unified summary that includes: + +1. General description of Bauzone {bauzone}: + - Zone category (Wohnzonen, Zentrumszonen, etc.) + - Geschosszahl (number of full storeys) + - Gewerbeerleichterung status (Ja/Nein) + - Where defined (article and page number) + +2. ALL building regulations with ACTUAL VALUES from tables (you MUST include the exact values): + - Ausnützungsziffer max.: [ACTUAL PERCENTAGE VALUE]% (from article, page) + - Vollgeschosse max.: [ACTUAL NUMBER] (from article, page) + - anrechenbare Dachgeschosse max.: [ACTUAL NUMBER] (from article, page) + - anrechenbares Attikageschoss max.: [ACTUAL NUMBER] (from article, page) + - anrechenbares Untergeschoss max.: [ACTUAL NUMBER] (from article, page) + - Gebäudelänge max.: [ACTUAL VALUE] m (from article, page) + - Grenzabstand - Grundabstand min.: [ACTUAL VALUE] m (from article, page) + - Grenzabstand - Mehrlängen-zuschlag: [ACTUAL FRACTION] (from article, page) + - Grenzabstand - Höchstmass max.: [ACTUAL VALUE] m (from article, page) + - Fassadenhöhen max.: [ACTUAL VALUE] m (from article, page, include footnote values if present) + +3. 
Zone definitions: Integrate information about where this zone is defined (which articles mention it, with page numbers) + +4. Relevant articles: Integrate key content from relevant articles naturally into the summary, citing article numbers and page numbers + +5. Special conditions: Any special requirements or exceptions mentioned in articles + +CRITICAL: You MUST include the actual numeric values from the tables in your summary. Do NOT say "see tables" - list the actual values. Format everything as ONE unified, flowing German text document without separate sections. Integrate zones and articles naturally into the narrative. +""" + + # Call AI service + logger.info(f"Generating AI summary for Bauzone {bauzone} in {gemeinde}") + ai_response = await aiService.callAiPlanning( + prompt=prompt, + debugType="bzo_summary" + ) + + return ai_response.strip() + + except Exception as e: + logger.error(f"Error generating AI summary: {str(e)}", exc_info=True) + # Return a basic summary if AI fails + return f"Summary generation failed: {str(e)}. Found {len(relevant_rules)} relevant rules and {len(relevant_zones)} zones for Bauzone {bauzone}." 
+ diff --git a/modules/routes/routeRealEstate.py b/modules/routes/routeRealEstate.py index 935f665d..587209a2 100644 --- a/modules/routes/routeRealEstate.py +++ b/modules/routes/routeRealEstate.py @@ -5,7 +5,12 @@ Implements stateless endpoints for real estate database operations with AI-power import logging import json +import re import requests +import aiohttp +import asyncio +import ssl +from urllib.parse import urljoin, urlparse from typing import Optional, Dict, Any, List, Union from fastapi import APIRouter, HTTPException, Depends, Body, Request, Query, Path, status @@ -36,21 +41,33 @@ from .datamodelFeatureRealEstate import ( Land, Kontext, StatusProzess, + DokumentTyp, ) # Import interfaces from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface +from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface # Import feature logic for AI-powered commands from modules.features.realEstate.mainRealEstate import ( processNaturalLanguageCommand, create_project_with_parcel_data, + extract_bzo_information, ) # Import Swiss Topo MapServer connector for testing from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector from modules.connectors.connectorOerebWfs import OerebWfsConnector +# Import Tavily connector for BZO document search +from modules.aicore.aicorePluginTavily import AiTavily + +# Import helper functions from scraping route +from modules.routes.routeRealEstateScraping import ( + _get_language_from_kanton, + _get_bzo_search_query, +) + # Import attribute utilities for model schema from modules.shared.attributeUtils import getModelAttributeDefinitions @@ -1006,6 +1023,7 @@ async def search_parcel( request: Request, location: str = Query(..., description="Either coordinates as 'x,y' (LV95) or address string"), include_adjacent: bool = Query(False, description="Include adjacent parcels information"), + fetch_documents: bool = Query(True, description="If 
true, fetch BZO documents for the Gemeinde (default: true)"), currentUser: User = Depends(getCurrentUser) ) -> Dict[str, Any]: """ @@ -1017,10 +1035,12 @@ async def search_parcel( - Administrative context (canton, municipality) - Link to official cadastral map - Optional: Adjacent parcels + - Optional: Gemeinde information and BZO documents (if fetch_documents=true) Query Parameters: - location: Either coordinates as "x,y" (LV95/EPSG:2056) or address string - include_adjacent: If true, fetches information about adjacent parcels (slower) + - fetch_documents: If true, checks for and fetches Bauzonenverordnung (BZO) documents for the Gemeinde (default: true, slower) Headers: - X-CSRF-Token: CSRF token (required for security) @@ -1029,6 +1049,7 @@ async def search_parcel( - GET /api/realestate/parcel/search?location=2600000,1200000 - GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern - GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern&include_adjacent=true + - GET /api/realestate/parcel/search?location=Bundesplatz 3, 3003 Bern&fetch_documents=true Returns: { @@ -1047,14 +1068,30 @@ async def search_parcel( "area_m2": 1234.56, "centroid": {"x": 2600000, "y": 1200000}, "geoportal_url": "https://...", - "realestate_type": null + "realestate_type": null, + "bauzone": "W3" }, "map_view": { "center": {"x": 2600000, "y": 1200000}, "zoom_bounds": {"min_x": ..., "max_x": ..., "min_y": ..., "max_y": ...}, "geometry_geojson": {...} }, - "adjacent_parcels": [...] 
// Optional (only if include_adjacent=true) + "adjacent_parcels": [...], // Optional (only if include_adjacent=true) + "gemeinde": { // Optional (only if fetch_documents=true) + "id": "...", + "label": "Bern", + "plz": "3011" + }, + "documents": [ // Optional (only if fetch_documents=true and documents found/created) + { + "id": "...", + "label": "BZO Bern", + "dokumentTyp": "gemeindeBzoAktuell", + "dokumentReferenz": "...", + "quelle": "https://...", + "mimeType": "application/pdf" + } + ] } """ try: @@ -1114,14 +1151,48 @@ async def search_parcel( municipality_name = None full_address = None plz = None + canton = attributes.get("ak") # Extract canton early so it's always available - # First, try to use geocoded address info if available (more accurate than centroid query) + # Debug: Log all available attributes to understand what we have + logger.debug(f"Parcel attributes keys: {list(attributes.keys())}") + logger.debug(f"Sample parcel attributes: {dict(list(attributes.items())[:10])}") # First 10 items + + # First, check if municipality is directly in parcel attributes (ggdename or dplzname) + # These fields are often present in the parcel data itself from Swiss Topo + municipality_from_attrs = attributes.get("ggdename") or attributes.get("dplzname") or attributes.get("gemeinde") or attributes.get("gemeindename") + if municipality_from_attrs: + # Use connector's cleaning method to remove canton suffix + municipality_name = connector._clean_municipality_name(str(municipality_from_attrs)) + logger.info(f"Found municipality '{municipality_name}' in parcel attributes (from {municipality_from_attrs})") + + # Also check extracted_attributes for municipality + if not municipality_name: + municipality_from_extracted = extracted_attributes.get("kontextGemeinde") + if municipality_from_extracted: + municipality_name = str(municipality_from_extracted) + logger.info(f"Found municipality '{municipality_name}' in extracted attributes") + + # Also check for PLZ in parcel 
attributes + if not plz: + plz_from_attrs = attributes.get("dplz4") or attributes.get("plz") + if plz_from_attrs: + plz = str(plz_from_attrs).strip() + logger.debug(f"Found PLZ '{plz}' in parcel attributes") + + # Try to use geocoded address info if available (more accurate than centroid query) geocoded_address = parcel_data.get('geocoded_address') if geocoded_address: - full_address = geocoded_address.get('full_address') - plz = geocoded_address.get('plz') - municipality_name = geocoded_address.get('municipality') - logger.debug(f"Using geocoded address: {full_address}") + if not full_address: + full_address = geocoded_address.get('full_address') + if not plz: + plz = geocoded_address.get('plz') + if not municipality_name: + geocoded_municipality = geocoded_address.get('municipality') + if geocoded_municipality: + municipality_name = connector._clean_municipality_name(geocoded_municipality) + logger.debug(f"Found municipality '{municipality_name}' from geocoded address") + if full_address: + logger.debug(f"Using geocoded address: {full_address}") # If geocoded address not available, try to get address by querying the address layer # Use query coordinates (where user clicked/geocoded) instead of parcel centroid @@ -1148,9 +1219,14 @@ async def search_parcel( # Extract address using connector's helper method address_info = connector._extract_address_from_building_attrs(addr_attrs) - full_address = address_info.get('full_address') - plz = address_info.get('plz') - municipality_name = address_info.get('municipality') + if not full_address: + full_address = address_info.get('full_address') + if not plz: + plz = address_info.get('plz') + if not municipality_name: + municipality_name = address_info.get('municipality') + if municipality_name: + logger.debug(f"Found municipality '{municipality_name}' from building layer") if full_address: logger.debug(f"Constructed address: {full_address}") @@ -1163,34 +1239,128 @@ async def search_parcel( full_address = location 
logger.debug(f"Using location as address: {full_address}") - # Try to extract municipality name from BFSNR if not found + # Try to extract municipality name from address string if not found yet + if not municipality_name and full_address: + # Parse address string to extract municipality name + # Format is usually: "Street Number, PLZ Municipality" or "Street Number PLZ Municipality" + # Examples: "Forchstrasse 6c, 8610 Uster" or "Bundesplatz 3 3011 Bern" + # Try to match PLZ followed by municipality name + # PLZ is typically 4 digits, municipality name follows + plz_municipality_match = re.search(r'\b(\d{4})\s+([A-ZÄÖÜ][a-zäöüß\s-]+)', full_address) + if plz_municipality_match: + extracted_plz = plz_municipality_match.group(1) + extracted_municipality = plz_municipality_match.group(2).strip() + # Remove trailing commas or other punctuation + extracted_municipality = re.sub(r'[,;\.]+$', '', extracted_municipality).strip() + if extracted_municipality: + municipality_name = extracted_municipality + if not plz: + plz = extracted_plz + logger.debug(f"Extracted municipality '{municipality_name}' and PLZ '{plz}' from address string") + + # Try to extract municipality name from BFSNR if still not found if not municipality_name: - # Common Swiss municipalities lookup (you can expand this) bfsnr = attributes.get("bfsnr") - canton = attributes.get("ak", "") - # Basic municipality lookup for common codes - common_municipalities = { - 351: "Bern", - 261: "Zürich", - 6621: "Genève", - 2701: "Basel", - 5586: "Lausanne", - 1061: "Luzern", - 3203: "Winterthur", - 230: "St. 
Gallen", - 5192: "Lugano", - 351: "Bern", - 1367: "Schwyz" - } + logger.info(f"Attempting to resolve municipality name for BFS number {bfsnr} in canton {canton}") - if bfsnr and bfsnr in common_municipalities: - municipality_name = common_municipalities[bfsnr] - logger.debug(f"Looked up municipality: {municipality_name}") - else: - # Fallback: Use canton + code - municipality_name = f"{canton}-{bfsnr}" if canton and bfsnr else "Unknown" - logger.debug(f"Using fallback municipality: {municipality_name}") + # Try to query database for Gemeinde by BFS number + if bfsnr and canton: + try: + realEstateInterface = getRealEstateInterface(currentUser) + # Query Gemeinde by BFS number (stored in kontextInformationen) + gemeinden = realEstateInterface.getGemeinden( + recordFilter={"mandateId": currentUser.mandateId} + ) + logger.debug(f"Found {len(gemeinden)} Gemeinden in database, searching for BFS {bfsnr}") + for gemeinde in gemeinden: + # Check kontextInformationen for BFS number + for kontext in gemeinde.kontextInformationen: + try: + kontext_data = json.loads(kontext.inhalt) if isinstance(kontext.inhalt, str) else kontext.inhalt + if isinstance(kontext_data, dict): + kontext_bfsnr = kontext_data.get("bfs_nummer") or kontext_data.get("bfsnr") or kontext_data.get("municipality_code") + if str(kontext_bfsnr) == str(bfsnr): + municipality_name = gemeinde.label + logger.info(f"Found Gemeinde '{municipality_name}' by BFS number {bfsnr} in database") + break + except (json.JSONDecodeError, AttributeError) as e: + logger.debug(f"Error parsing kontext: {e}") + continue + if municipality_name: + break + except Exception as e: + logger.warning(f"Error querying Gemeinde by BFS number: {e}", exc_info=True) + + # If still not found, try to use Swiss Topo geocoding API to get municipality name from coordinates + # This is more reliable than BFS number lookup since coordinates are exact + if not municipality_name and centroid: + try: + # Use Swiss Topo geocoding to get municipality 
name from coordinates + geocode_url = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify" + params = { + "geometry": f"{centroid['x']},{centroid['y']}", + "geometryType": "esriGeometryPoint", + "layers": "all:ch.swisstopo.swissboundaries3d-gemeinde-flaeche.fill", + "tolerance": "0", + "returnGeometry": "false", + "sr": "2056" + } + import aiohttp + import ssl + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + connector_aiohttp = aiohttp.TCPConnector(ssl=ssl_context) + async with aiohttp.ClientSession(connector=connector_aiohttp) as session: + async with session.get(geocode_url, params=params) as resp: + if resp.status == 200: + data = await resp.json() + results = data.get("results", []) + if results: + result_attrs = results[0].get("attributes", {}) + geocoded_municipality = result_attrs.get("name") or result_attrs.get("gemeindename") or result_attrs.get("label") + if geocoded_municipality: + municipality_name = connector._clean_municipality_name(str(geocoded_municipality)) + logger.info(f"Found municipality '{municipality_name}' via Swiss Topo geocoding API (from {geocoded_municipality})") + except Exception as e: + logger.debug(f"Error querying Swiss Topo geocoding API: {e}", exc_info=True) + + # If still not found, try expanded Swiss municipalities lookup + if not municipality_name and bfsnr: + # Expanded Swiss municipalities lookup by BFS number + # Source: https://www.bfs.admin.ch/bfs/de/home/grundlagen/agvch.html + common_municipalities = { + # Zürich (ZH) + 261: "Zürich", + 198: "Pfäffikon", # ZH-198 is Pfäffikon + 191: "Uster", # Uster is ZH-191 + 3203: "Winterthur", + # Bern (BE) + 351: "Bern", + # Basel (BS) + 2701: "Basel", + # Genève (GE) + 6621: "Genève", + # Vaud (VD) + 5586: "Lausanne", + # Luzern (LU) + 1061: "Luzern", + # St. Gallen (SG) + 230: "St. 
Gallen", + # Ticino (TI) + 5192: "Lugano", + # Schwyz (SZ) + 1367: "Schwyz", + } + + if bfsnr in common_municipalities: + municipality_name = common_municipalities[bfsnr] + logger.info(f"Looked up municipality '{municipality_name}' from common list for BFS {bfsnr}") + + # If still not found, log warning + if not municipality_name: + logger.warning(f"Could not determine municipality name for BFS number {bfsnr} in canton {canton}. Municipality name will be None.") # Final validation: Don't use EGRID as address if full_address and full_address.startswith("CH") and len(full_address) == 14 and full_address[2:].isdigit(): @@ -1200,7 +1370,6 @@ async def search_parcel( # Query zone information (wohnzone/bauzone) from ÖREB WFS bauzone = None - canton = attributes.get("ak") # Check if geometry has actual data (either rings or coordinates) has_geometry = geometry and (geometry.get("rings") or geometry.get("coordinates")) if canton and has_geometry: @@ -1373,6 +1542,486 @@ async def search_parcel( logger.warning(f"Error fetching adjacent parcels: {e}", exc_info=True) response_data["adjacent_parcels"] = [] + # Fetch BZO documents if requested + gemeinde_info = None + bzo_documents = [] + + logger.debug(f"Document fetch check: fetch_documents={fetch_documents}, municipality_name={municipality_name}, canton={canton}") + + if fetch_documents and municipality_name and canton: + logger.info(f"Fetching BZO documents for Gemeinde '{municipality_name}' in canton '{canton}'") + try: + # Get interfaces + realEstateInterface = getRealEstateInterface(currentUser) + componentInterface = getComponentInterface(currentUser) + logger.debug(f"Interfaces initialized for document fetching") + + # Resolve or create Gemeinde + gemeinde = None + # First, ensure Land "Schweiz" exists + laender = realEstateInterface.getLaender(recordFilter={"label": "Schweiz"}) + if not laender: + land = Land( + mandateId=currentUser.mandateId, + label="Schweiz", + abk="CH" + ) + land = 
realEstateInterface.createLand(land) + logger.debug(f"Created Land 'Schweiz' with ID: {land.id}") + else: + land = laender[0] + + # Map canton abbreviations to full names + canton_names = { + "ZH": "Zürich", "BE": "Bern", "LU": "Luzern", "UR": "Uri", "SZ": "Schwyz", + "OW": "Obwalden", "NW": "Nidwalden", "GL": "Glarus", "ZG": "Zug", "FR": "Freiburg", + "SO": "Solothurn", "BS": "Basel-Stadt", "BL": "Basel-Landschaft", "SH": "Schaffhausen", + "AR": "Appenzell Ausserrhoden", "AI": "Appenzell Innerrhoden", "SG": "St. Gallen", + "GR": "Graubünden", "AG": "Aargau", "TG": "Thurgau", "TI": "Tessin", + "VD": "Waadt", "VS": "Wallis", "NE": "Neuenburg", "GE": "Genf", "JU": "Jura" + } + + # Get or create Kanton + kantone = realEstateInterface.getKantone(recordFilter={"abk": canton}) + if not kantone: + kanton_label = canton_names.get(canton, canton) + kanton_obj = Kanton( + mandateId=currentUser.mandateId, + label=kanton_label, + abk=canton, + id_land=land.id + ) + kanton_obj = realEstateInterface.createKanton(kanton_obj) + logger.debug(f"Created Kanton '{kanton_label}' ({canton})") + else: + kanton_obj = kantone[0] + + # Get or create Gemeinde + gemeinden = realEstateInterface.getGemeinden( + recordFilter={"label": municipality_name, "id_kanton": kanton_obj.id} + ) + if not gemeinden: + gemeinde = Gemeinde( + mandateId=currentUser.mandateId, + label=municipality_name, + id_kanton=kanton_obj.id, + plz=plz + ) + gemeinde = realEstateInterface.createGemeinde(gemeinde) + logger.info(f"Created Gemeinde '{municipality_name}'") + else: + gemeinde = gemeinden[0] + logger.debug(f"Found existing Gemeinde '{municipality_name}'") + + gemeinde_info = { + "id": gemeinde.id, + "label": gemeinde.label, + "plz": gemeinde.plz + } + + # Check if Gemeinde already has BZO documents + existing_bzo = False + logger.debug(f"Checking for existing BZO documents in Gemeinde '{gemeinde.label}' (has {len(gemeinde.dokumente) if gemeinde.dokumente else 0} documents)") + if gemeinde.dokumente: + for doc in 
gemeinde.dokumente: + if (doc.label and ("BZO" in doc.label.upper() or "BAU UND ZONENORDNUNG" in doc.label.upper() or + "PLAN D'AMÉNAGEMENT" in doc.label.upper() or "RÈGLEMENT DE CONSTRUCTION" in doc.label.upper() or + "PIANO DI UTILIZZAZIONE" in doc.label.upper() or "REGOLAMENTO EDILIZIO" in doc.label.upper())) or \ + (doc.dokumentTyp and doc.dokumentTyp in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]): + existing_bzo = True + logger.info(f"Found existing BZO document: {doc.label} (ID: {doc.id})") + bzo_documents.append({ + "id": doc.id, + "label": doc.label, + "dokumentTyp": doc.dokumentTyp.value if doc.dokumentTyp else None, + "dokumentReferenz": doc.dokumentReferenz, + "quelle": doc.quelle, + "mimeType": doc.mimeType + }) + + if existing_bzo: + logger.info(f"Gemeinde '{municipality_name}' already has {len(bzo_documents)} BZO document(s), skipping search") + + # If no BZO documents found, search and download + if not existing_bzo: + logger.info(f"No BZO documents found for {municipality_name}, searching with Tavily...") + + # Determine language + language = _get_language_from_kanton(canton) + + # Generate search query + search_query = _get_bzo_search_query(municipality_name, language) + logger.debug(f"Tavily search query: {search_query}") + + # Initialize Tavily connector + tavily = AiTavily() + + # Search with Tavily + search_results = await tavily._search( + query=search_query, + maxResults=5, + country="switzerland" + ) + + if search_results: + # First, check for direct PDF URLs in search results + pdf_urls = [] + html_urls = [] + + for result in search_results: + url = result.url.lower() + # Check if it's a direct PDF link + if url.endswith('.pdf') or '/pdf/' in url or url.endswith('/pdf'): + if not any(skip in url for skip in ['.html', '.htm', '/page/', '/article/', '/news/']): + pdf_urls.append(result.url) + else: + # It's an HTML page - we'll crawl it to find PDF links + html_urls.append(result.url) + + # If no direct PDFs found, 
scrape HTML pages directly to find PDF links + if not pdf_urls and html_urls: + logger.info(f"No direct PDF links found, scraping {len(html_urls)} HTML pages to find PDF documents...") + + # Helper function to scrape HTML and find PDF links + async def scrape_html_for_pdfs(url: str) -> List[str]: + """Scrape an HTML page to find PDF links.""" + found_pdfs = [] + try: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + connector_aiohttp = aiohttp.TCPConnector(ssl=ssl_context) + + timeout = aiohttp.ClientTimeout(total=15, connect=5) + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8' + } + + async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector_aiohttp) as session: + async with session.get(url, allow_redirects=True) as response: + if response.status == 200: + # Check Content-Type header first + content_type = response.headers.get('Content-Type', '').lower() + + # Read first few bytes to check if it's a PDF + raw_bytes = await response.read() + + # Check if it's actually a PDF by magic bytes + if raw_bytes.startswith(b'%PDF'): + found_pdfs.append(url) + logger.info(f"Found direct PDF link (detected by magic bytes): {url}") + return found_pdfs + + # If Content-Type says it's a PDF, add it + if 'application/pdf' in content_type: + found_pdfs.append(url) + logger.info(f"Found direct PDF link (Content-Type): {url}") + return found_pdfs + + # If URL ends with .pdf, it's likely a PDF + if url.lower().endswith('.pdf'): + found_pdfs.append(url) + logger.info(f"Found direct PDF link (URL extension): {url}") + return found_pdfs + + # Try to decode as text for HTML parsing + try: + # Try UTF-8 first + html_content = raw_bytes.decode('utf-8') + except 
UnicodeDecodeError: + try: + # Try ISO-8859-1 (common for German sites) + html_content = raw_bytes.decode('iso-8859-1') + except UnicodeDecodeError: + try: + # Try Windows-1252 + html_content = raw_bytes.decode('windows-1252') + except UnicodeDecodeError: + # If all else fails, skip this URL + logger.warning(f"Could not decode content from {url} (not UTF-8, ISO-8859-1, or Windows-1252), skipping HTML parsing") + return found_pdfs + + # Look for PDF links in various formats + # Pattern 1: Direct PDF URLs + pdf_pattern = r'https?://[^\s<>"\'\)]+\.pdf(?:\?[^\s<>"\'\)]*)?' + found = re.findall(pdf_pattern, html_content, re.IGNORECASE) + + # Pattern 2: Relative PDF links (convert to absolute) + relative_pattern = r'href=["\']([^"\']+\.pdf[^"\']*)["\']' + relative_found = re.findall(relative_pattern, html_content, re.IGNORECASE) + + # Convert relative URLs to absolute + base_url = f"{urlparse(url).scheme}://{urlparse(url).netloc}" + + for rel_url in relative_found: + # Remove query params and fragments for cleaner URLs + clean_url = rel_url.split('?')[0].split('#')[0] + if clean_url.endswith('.pdf'): + abs_url = urljoin(base_url, clean_url) + if abs_url not in found: + found.append(abs_url) + + # Pattern 3: Look in data attributes and other places + data_pattern = r'data-[^=]*=["\']([^"\']+\.pdf[^"\']*)["\']' + data_found = re.findall(data_pattern, html_content, re.IGNORECASE) + for data_url in data_found: + clean_url = data_url.split('?')[0].split('#')[0] + if clean_url.endswith('.pdf'): + abs_url = urljoin(base_url, clean_url) if not clean_url.startswith('http') else clean_url + if abs_url not in found: + found.append(abs_url) + + # Clean and deduplicate URLs + for pdf_link in found: + pdf_link = pdf_link.rstrip('.,;:!?)').strip() + # Remove common tracking parameters + if '?' 
in pdf_link: + base, params = pdf_link.split('?', 1) + # Keep only important params, remove tracking + important_params = [] + for param in params.split('&'): + if param.split('=')[0].lower() not in ['utm_source', 'utm_medium', 'utm_campaign', 'ref', 'fbclid', 'gclid']: + important_params.append(param) + if important_params: + pdf_link = f"{base}?{'&'.join(important_params)}" + else: + pdf_link = base + + if pdf_link not in found_pdfs and pdf_link.startswith('http'): + found_pdfs.append(pdf_link) + logger.debug(f"Found PDF link on {url}: {pdf_link}") + + logger.info(f"Found {len(found_pdfs)} PDF links on {url}") + + except Exception as e: + logger.debug(f"Error scraping {url} for PDFs: {e}", exc_info=True) + + return found_pdfs + + # Scrape HTML pages to find PDF links + for html_url in html_urls[:5]: # Limit to first 5 URLs + try: + logger.debug(f"Scraping {html_url} to find PDF links...") + found_pdfs = await scrape_html_for_pdfs(html_url) + pdf_urls.extend(found_pdfs) + except Exception as e: + logger.warning(f"Error scraping {html_url} to find PDFs: {e}", exc_info=True) + continue + + # Also check rawContent from search results for PDF links + for result in search_results: + if result.rawContent: + pdf_pattern = r'https?://[^\s<>"\'\)]+\.pdf(?:\?[^\s<>"\'\)]*)?' + found_pdfs = re.findall(pdf_pattern, result.rawContent, re.IGNORECASE) + for pdf_link in found_pdfs: + pdf_link = pdf_link.rstrip('.,;:!?)').strip() + if pdf_link not in pdf_urls and pdf_link.startswith('http'): + pdf_urls.append(pdf_link) + logger.debug(f"Found PDF link in rawContent: {pdf_link}") + + if not pdf_urls: + logger.warning(f"No PDF URLs found in Tavily results for {municipality_name}. 
Results were HTML pages, not direct PDF links.") + logger.debug(f"Tavily returned URLs: {[r.url for r in search_results]}") + + logger.info(f"Found {len(pdf_urls)} potential PDF documents for {municipality_name}") + + # Helper function to download a single PDF + async def download_pdf(pdf_url: str) -> Optional[bytes]: + """Download a PDF from a URL with retry logic.""" + max_retries = 3 + retry_delay = 2 + + for attempt in range(max_retries): + try: + if attempt > 0: + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', + 'Accept': '*/*' + } + else: + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', + 'Accept': 'application/pdf,application/octet-stream,*/*', + 'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Connection': 'keep-alive', + 'Upgrade-Insecure-Requests': '1' + } + + # Create SSL context that doesn't verify certificates (for development) + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + # Create connector with SSL context + connector = aiohttp.TCPConnector(ssl=ssl_context) + + timeout = aiohttp.ClientTimeout(total=30, connect=10) + async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector) as session: + async with session.get(pdf_url, allow_redirects=True) as response: + if response.status == 200: + # Check content-type header first + content_type = response.headers.get('Content-Type', '').lower() + if 'text/html' in content_type or 'text/xml' in content_type: + logger.warning(f"URL {pdf_url} returned HTML content (Content-Type: {content_type}), skipping") + raise Exception("Server returned HTML content instead of PDF") + + pdf_content = await response.read() + + if not pdf_content or len(pdf_content) < 100: + raise Exception("Downloaded file is too small or empty") + + # 
Verify it's actually a PDF + if not pdf_content.startswith(b'%PDF'): + if pdf_content.startswith(b'<') or pdf_content.startswith(b' 1: + file_name = f"BZO_{safe_name}_{idx + 1}.pdf" + doc_label = f"{base_doc_label} ({idx + 1})" + else: + file_name = f"BZO_{safe_name}.pdf" + doc_label = base_doc_label + + # Store file using ComponentObjects + try: + file_item = componentInterface.createFile( + name=file_name, + mimeType="application/pdf", + content=pdf_content + ) + + componentInterface.createFileData(file_item.id, pdf_content) + logger.info(f"Stored file {file_name} with ID {file_item.id}") + except Exception as e: + logger.error(f"Error storing file {file_name}: {str(e)}", exc_info=True) + continue + + # Create Dokument record + dokument = Dokument( + mandateId=currentUser.mandateId, + label=doc_label, + versionsbezeichnung="Aktuell", + dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL, + dokumentReferenz=file_item.id, + quelle=pdf_url, + mimeType="application/pdf", + kategorienTags=["BZO", "Bauordnung", municipality_name] + ) + + # Create Dokument record + created_dokument = realEstateInterface.createDokument(dokument) + logger.info(f"Created Dokument record with ID {created_dokument.id}") + + current_dokumente.append(created_dokument) + + # Add to response + bzo_documents.append({ + "id": created_dokument.id, + "label": created_dokument.label, + "dokumentTyp": created_dokument.dokumentTyp.value if created_dokument.dokumentTyp else None, + "dokumentReferenz": created_dokument.dokumentReferenz, + "quelle": created_dokument.quelle, + "mimeType": created_dokument.mimeType + }) + + except Exception as e: + logger.error(f"Error processing PDF {pdf_url}: {str(e)}", exc_info=True) + continue + + # Update Gemeinde with new dokumente + if bzo_documents: + updated_gemeinde = realEstateInterface.updateGemeinde( + gemeinde.id, + {"dokumente": current_dokumente} + ) + if updated_gemeinde: + logger.info(f"Successfully created {len(bzo_documents)} BZO document(s) for 
@router.get("/bzo-information", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def get_bzo_information(
    request: Request,
    gemeinde: str = Query(..., description="Gemeinde name or ID"),
    bauzone: str = Query(..., description="Bauzone code (e.g., W3, W2/30)"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde.

    The heavy lifting is delegated to ``extract_bzo_information``, which:
      1. Finds BZO documents for the Gemeinde (by name or ID)
      2. Extracts content from the PDFs using the langgraph workflow
      3. Filters rules, zones, and articles by Bauzone
      4. Uses AI to generate a summary and find relevant information

    Query Parameters:
        gemeinde: Gemeinde name (e.g., "Zürich") or ID
        bauzone: Bauzone code (e.g., "W3", "W2/30", "Z3")

    Headers:
        X-CSRF-Token: CSRF token (required; 16-64 hexadecimal characters)

    Returns:
        Dict with keys: bauzone, gemeinde, extracted_content, ai_summary,
        relevant_rules, documents_processed, errors, warnings.

    Examples:
        GET /api/realestate/bzo-information?gemeinde=Zürich&bauzone=W3
        GET /api/realestate/bzo-information?gemeinde=Uster&bauzone=W2/30

    Raises:
        HTTPException 403: CSRF token missing or malformed
        HTTPException 404: Gemeinde or BZO documents not found (raised by the feature)
        HTTPException 500: unexpected error during extraction or processing
    """
    try:
        # Starlette header lookup is case-insensitive, so one get() covers both
        # "X-CSRF-Token" and "x-csrf-token" (the previous double lookup was redundant).
        csrf_token = request.headers.get("X-CSRF-Token")
        if not csrf_token:
            logger.warning(f"CSRF token missing for GET /api/realestate/bzo-information from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="CSRF token missing. Please include X-CSRF-Token header."
            )

        # Strict format validation: 16-64 hex digits only.
        # Bug fix: the previous check used int(csrf_token, 16), which also accepts
        # leading/trailing whitespace, an explicit "+"/"-" sign, and a "0x" prefix;
        # an explicit per-character check rejects those malformed tokens.
        if (
            not isinstance(csrf_token, str)
            or not 16 <= len(csrf_token) <= 64
            or not all(c in "0123456789abcdefABCDEF" for c in csrf_token)
        ):
            logger.warning(f"Invalid CSRF token format for GET /api/realestate/bzo-information from user {currentUser.id}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid CSRF token format"
            )

        # NOTE(review): only the token *format* is validated here; the value is never
        # compared against a server-side/session token. Confirm an actual CSRF
        # comparison happens elsewhere (e.g. middleware), otherwise this check is
        # cosmetic and does not prevent cross-site request forgery.

        logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id}, mandate: {currentUser.mandateId})")

        # Delegate document lookup, PDF extraction, filtering and AI summarisation
        # to the feature layer.
        result = await extract_bzo_information(
            currentUser=currentUser,
            gemeinde=gemeinde,
            bauzone=bauzone
        )

        return result

    except HTTPException:
        # Re-raise deliberate HTTP errors (403/404/...) unchanged so FastAPI
        # returns the intended status code instead of a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}': {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error extracting BZO information: {str(e)}"
        )