From fb6bac7d9229875bbd45de84b0e77472d036ae66 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 26 Jan 2026 07:14:56 +0100 Subject: [PATCH] feat: fetch oereb kataster and return Wohnzohne --- modules/connectors/connectorOerebWfs.py | 525 +++++++++++ .../realEstate/bzoDocumentRetriever.py | 199 ++++ .../realEstate/bzoExtractionLangGraph.py | 738 +++++++++++++++ .../features/realEstate/bzoPdfExtractor.py | 117 +++ .../features/realEstate/bzoRuleTaxonomy.py | 105 +++ .../features/realEstate/scrapeSwissTopo.py | 780 ++++++++++++++++ modules/routes/routeRealEstateScraping.py | 879 ++++++++++++++++++ 7 files changed, 3343 insertions(+) create mode 100644 modules/connectors/connectorOerebWfs.py create mode 100644 modules/features/realEstate/bzoDocumentRetriever.py create mode 100644 modules/features/realEstate/bzoExtractionLangGraph.py create mode 100644 modules/features/realEstate/bzoPdfExtractor.py create mode 100644 modules/features/realEstate/bzoRuleTaxonomy.py create mode 100644 modules/features/realEstate/scrapeSwissTopo.py create mode 100644 modules/routes/routeRealEstateScraping.py diff --git a/modules/connectors/connectorOerebWfs.py b/modules/connectors/connectorOerebWfs.py new file mode 100644 index 00000000..62b0ee18 --- /dev/null +++ b/modules/connectors/connectorOerebWfs.py @@ -0,0 +1,525 @@ +""" +ÖREB WFS Connector + +This connector handles interactions with ÖREB (Öffentlich-rechtliche Eigentumsbeschränkungen) +WFS services for zone information retrieval. + +ÖREB provides zoning information (Bauzonen) through WFS services. +""" + +import logging +from typing import Dict, List, Any, Optional +import aiohttp +import xml.etree.ElementTree as ET +from shapely.geometry import Polygon + +logger = logging.getLogger(__name__) + + +class OerebWfsConnector: + """ + Connector for ÖREB WFS services. + + Provides methods for: + - Querying zone information (Bauzonen) by parcel geometry + - Retrieving zoning data from canton-specific WFS services + """ + + def __init__( + self, + timeout: int = 10, + max_retries: int = 3, + retry_delay: float = 1.0 + ): + """ + Initialize ÖREB WFS connector. + + Args: + timeout: Request timeout in seconds + max_retries: Maximum number of retry attempts + retry_delay: Initial retry delay in seconds (exponential backoff) + """ + self.timeout = aiohttp.ClientTimeout(total=timeout) + self.max_retries = max_retries + self.retry_delay = retry_delay + self._wfs_cache: Dict[str, List[Dict[str, Any]]] = {} # Cache for WFS queries by bbox + + logger.info("ÖREB WFS Connector initialized") + + def _get_oereb_wfs_url(self, canton: str) -> Optional[str]: + """ + Get ÖREB WFS service URL for a given canton. + + Args: + canton: Canton abbreviation (e.g., "ZH", "BE") + + Returns: + WFS service URL or None if canton not supported + """ + oereb_wfs_urls = { + "ZH": "https://maps.zh.ch/wfs/OerebKatasterZHWFS", + } + return oereb_wfs_urls.get(canton.upper()) + + def _geometry_to_shapely_polygon(self, geometry: Dict[str, Any]) -> Optional[Polygon]: + """ + Convert parcel geometry (ESRI rings or GeoJSON coordinates) to Shapely Polygon. 
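+
+        Example (illustrative LV95 coordinates): an ESRI input such as
+        {"rings": [[[2683000, 1247000], [2683100, 1247000], [2683050, 1247100]]]}
+        is closed automatically and returned as a four-vertex Shapely Polygon.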
+ + Args: + geometry: Geometry dictionary (ESRI rings or GeoJSON coordinates) + + Returns: + Shapely Polygon or None if invalid + """ + try: + # Handle ESRI geometry format (rings) + if "rings" in geometry: + rings = geometry.get("rings", []) + if not rings or not rings[0]: + return None + + # Use the first ring (exterior) for the polygon + exterior_ring = rings[0] + if len(exterior_ring) < 3: + return None + + # Ensure polygon is closed + coords = list(exterior_ring) + if coords[0] != coords[-1]: + coords.append(coords[0]) + + return Polygon(coords) + + # Handle GeoJSON format (coordinates) + elif "coordinates" in geometry: + coords = geometry.get("coordinates", []) + if not coords: + return None + + # Handle Polygon coordinates: [[[x1,y1], [x2,y2], ...]] + # Flatten to get the exterior ring + def extract_exterior(coord_list, depth=0): + if depth == 0 and isinstance(coord_list, list) and len(coord_list) > 0: + # First level might be array of rings, take first one + if isinstance(coord_list[0], list) and len(coord_list[0]) > 0: + if isinstance(coord_list[0][0], list): + # This is Polygon format: [[[x,y],...]] + return extract_exterior(coord_list[0], depth + 1) + elif isinstance(coord_list[0][0], (int, float)): + # This is already a ring: [[x,y],...] + return coord_list[0] + elif depth == 1 and isinstance(coord_list, list) and len(coord_list) > 0: + if isinstance(coord_list[0], (int, float)): + return coord_list + elif isinstance(coord_list[0], list): + return coord_list + return coord_list + + exterior_coords = extract_exterior(coords) + if not exterior_coords or len(exterior_coords) < 3: + return None + + # Ensure polygon is closed + coords_list = list(exterior_coords) + if coords_list[0] != coords_list[-1]: + coords_list.append(coords_list[0]) + + return Polygon(coords_list) + + except Exception as e: + logger.debug(f"Error converting geometry to Shapely Polygon: {e}") + + return None + + def _parse_gml_geometry(self, feature_elem: ET.Element) -> Optional[Polygon]: + """ + Parse GML geometry from WFS feature element and convert to Shapely Polygon. 
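+
+        Handles GML 3.2 posList/pos encodings as well as GML 2 coordinates.
+        A minimal fragment this parser accepts (coordinates illustrative):
+        <gml:posList>2683000 1247000 2683100 1247000 2683050 1247100</gml:posList>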
+ + Args: + feature_elem: XML element containing the feature + + Returns: + Shapely Polygon or None if geometry not found or invalid + """ + try: + # Common GML namespaces + namespaces = { + 'gml': 'http://www.opengis.net/gml', + 'gml3': 'http://www.opengis.net/gml/3.2', + 'gml32': 'http://www.opengis.net/gml/3.2' + } + + # Try to find polygon geometry + polygon_elem = None + for ns_prefix, ns_url in namespaces.items(): + # Try different GML polygon element names + for tag_name in ['Polygon', 'polygon', 'PolygonProperty', 'geometryProperty']: + polygon_elem = feature_elem.find(f'.//{{{ns_url}}}{tag_name}') + if polygon_elem is not None: + break + # Also try without namespace prefix + polygon_elem = feature_elem.find(f'.//{tag_name}') + if polygon_elem is not None: + break + if polygon_elem is not None: + break + + if polygon_elem is None: + # Try to find any geometry element + for ns_prefix, ns_url in namespaces.items(): + polygon_elem = feature_elem.find(f'.//{{{ns_url}}}*') + if polygon_elem is not None and 'polygon' in polygon_elem.tag.lower(): + break + + if polygon_elem is None: + return None + + # Extract coordinates from GML + # GML Polygon typically has exterior ring with posList or pos elements + coords = [] + + # Try posList (most common in GML 3.2) + for ns_prefix, ns_url in namespaces.items(): + pos_list = polygon_elem.find(f'.//{{{ns_url}}}posList') + if pos_list is not None and pos_list.text: + # posList format: "x1 y1 x2 y2 x3 y3 ..." + coord_strings = pos_list.text.strip().split() + for i in range(0, len(coord_strings) - 1, 2): + if i + 1 < len(coord_strings): + x = float(coord_strings[i]) + y = float(coord_strings[i + 1]) + coords.append((x, y)) + break + + # If no posList, try pos elements + if not coords: + for ns_prefix, ns_url in namespaces.items(): + pos_elems = polygon_elem.findall(f'.//{{{ns_url}}}pos') + if pos_elems: + for pos in pos_elems: + if pos.text: + parts = pos.text.strip().split() + if len(parts) >= 2: + x = float(parts[0]) + y = float(parts[1]) + coords.append((x, y)) + break + + # If still no coords, try coordinates element (GML 2) + if not coords: + for ns_prefix, ns_url in namespaces.items(): + coords_elem = polygon_elem.find(f'.//{{{ns_url}}}coordinates') + if coords_elem is not None and coords_elem.text: + # GML 2 coordinates format: "x1,y1 x2,y2 ..." or "x1,y1,z1 x2,y2,z2 ..." + coord_strings = coords_elem.text.strip().split() + for coord_str in coord_strings: + parts = coord_str.split(',') + if len(parts) >= 2: + x = float(parts[0]) + y = float(parts[1]) + coords.append((x, y)) + break + + if len(coords) < 3: + return None + + # Ensure polygon is closed + if coords[0] != coords[-1]: + coords.append(coords[0]) + + return Polygon(coords) + + except Exception as e: + logger.debug(f"Error parsing GML geometry: {e}") + return None + + def _calculate_bbox_from_geometry(self, geometry: Dict[str, Any]) -> Optional[str]: + """ + Calculate bounding box from geometry for WFS queries. 
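+
+        Example (illustrative): {"rings": [[[2683000, 1247000], [2683100, 1247000],
+        [2683050, 1247100]]]} yields "2683000,1247000,2683100,1247100".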
+ + Args: + geometry: Geometry dictionary (ESRI rings or GeoJSON coordinates) + + Returns: + Bounding box string in format "min_x,min_y,max_x,max_y" or None if invalid + """ + try: + # Handle ESRI geometry format (rings) + if "rings" in geometry: + rings = geometry.get("rings", []) + if not rings or not rings[0]: + return None + + # Flatten all coordinates from all rings + all_coords = [] + for ring in rings: + all_coords.extend(ring) + + if not all_coords: + return None + + # Calculate bbox + x_coords = [coord[0] for coord in all_coords] + y_coords = [coord[1] for coord in all_coords] + + min_x = min(x_coords) + min_y = min(y_coords) + max_x = max(x_coords) + max_y = max(y_coords) + + return f"{min_x},{min_y},{max_x},{max_y}" + + # Handle GeoJSON format (coordinates) + elif "coordinates" in geometry: + coords = geometry.get("coordinates", []) + if not coords: + return None + + # Flatten coordinates based on geometry type + def flatten_coords(coord_list, depth=0): + if depth < 2: + result = [] + for item in coord_list: + if isinstance(item, (int, float)): + return coord_list + result.extend(flatten_coords(item, depth + 1)) + return result + return coord_list + + flat_coords = flatten_coords(coords) + if not flat_coords or len(flat_coords) < 2: + return None + + x_coords = [flat_coords[i] for i in range(0, len(flat_coords), 2)] + y_coords = [flat_coords[i+1] for i in range(0, len(flat_coords)-1, 2)] + + min_x = min(x_coords) + min_y = min(y_coords) + max_x = max(x_coords) + max_y = max(y_coords) + + return f"{min_x},{min_y},{max_x},{max_y}" + + except Exception as e: + logger.debug(f"Error calculating bbox from geometry: {e}") + + return None + + async def _query_wfs_get_feature( + self, + wfs_url: str, + type_name: str, + bbox: str, + srs: str = "EPSG:2056" + ) -> List[Dict[str, Any]]: + """ + Query WFS GetFeature to retrieve zone features within a bounding box. 
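+
+        The request is a plain WFS 1.1.0 GetFeature call, i.e. (illustrative):
+        {wfs_url}?service=WFS&version=1.1.0&request=GetFeature
+            &typeName=nutzungsplanung&bbox=<min_x>,<min_y>,<max_x>,<max_y>&srsName=EPSG:2056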
+ + Args: + wfs_url: WFS service URL + type_name: Feature type name (e.g., "nutzungsplanung") + bbox: Bounding box string "min_x,min_y,max_x,max_y" + srs: Spatial reference system (default: EPSG:2056 for LV95) + + Returns: + List of feature dictionaries with properties and attributes + """ + # Only use WFS 1.1.0 (we know this works) + params = { + "service": "WFS", + "version": "1.1.0", + "request": "GetFeature", + "typeName": type_name, + "bbox": bbox, + "srsName": srs + } + + logger.debug(f"Querying WFS GetFeature: {wfs_url} with typeName={type_name}, bbox={bbox}") + + try: + async with aiohttp.ClientSession(timeout=self.timeout) as session: + async with session.get(wfs_url, params=params) as response: + if response.status != 200: + logger.debug(f"WFS GetFeature returned status {response.status}") + return [] + + # Parse XML/GML response + xml_content = await response.text() + + try: + root = ET.fromstring(xml_content) + + features = [] + members = root.findall('.//{http://www.opengis.net/gml}featureMember') or \ + root.findall('.//featureMember') + + for member in members: + attrs = {} + + # Find feature element + feature_elem = member + for child in member: + if child.tag and ('nutzung' in child.tag.lower() or 'plan' in child.tag.lower()): + feature_elem = child + break + + # Extract attributes + for elem in feature_elem.iter(): + if elem.tag and elem.text and elem.text.strip(): + tag_lower = elem.tag.lower() + if any(term in tag_lower for term in [ + 'pos', 'coordinates', 'point', 'polygon', 'linestring', + 'geometry', 'boundedby', 'envelope', 'gml' + ]): + continue + + tag_name = elem.tag.split('}')[-1] if '}' in elem.tag else elem.tag + if ':' in tag_name: + tag_name = tag_name.split(':')[-1] + + if tag_name not in attrs: + attrs[tag_name] = elem.text.strip() + + # Parse geometry from GML + geometry_polygon = self._parse_gml_geometry(feature_elem) + + if attrs: + feature_dict = {"properties": attrs, "attributes": attrs} + if geometry_polygon: + feature_dict["geometry"] = geometry_polygon + features.append(feature_dict) + + return features + + except ET.ParseError as e: + logger.debug(f"Failed to parse WFS XML response: {e}") + return [] + except Exception as e: + logger.debug(f"Error parsing WFS XML: {e}") + return [] + + except Exception as e: + logger.debug(f"WFS GetFeature query failed: {e}") + return [] + + async def query_zone_layer( + self, + egrid: str, + x: float, + y: float, + canton: Optional[str] = None, + geometry: Optional[Dict[str, Any]] = None + ) -> List[Dict[str, Any]]: + """ + Query zone information using ÖREB WFS service. + + Returns only zones that contain the parcel based on the parcel geometry. 
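+
+        A typical (illustrative) return value:
+        [{"layerBodId": "oereb_wfs", "attributes": {"typ_gde_abkuerzung": "W2"}}]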
+ + Args: + egrid: EGRID identifier (not currently used but kept for API compatibility) + x: X coordinate (LV95) - not used but kept for compatibility + y: Y coordinate (LV95) - not used but kept for compatibility + canton: Canton abbreviation (e.g., "ZH", "BE") + geometry: Parcel geometry dictionary (ESRI rings or GeoJSON coordinates) + + Returns: + List of zone dictionaries with layerBodId and attributes, or empty list if not found + """ + if not canton or not geometry: + return [] + + wfs_url = self._get_oereb_wfs_url(canton) + if not wfs_url: + return [] + + try: + bbox = self._calculate_bbox_from_geometry(geometry) + if not bbox: + return [] + + # Check cache + cache_key = f"{wfs_url}:{bbox}" + if cache_key in self._wfs_cache: + cached_features = self._wfs_cache[cache_key] + else: + cached_features = await self._query_wfs_get_feature(wfs_url, "nutzungsplanung", bbox) + self._wfs_cache[cache_key] = cached_features + + if not cached_features: + return [] + + # Convert parcel geometry to Shapely Polygon for spatial validation + parcel_polygon = self._geometry_to_shapely_polygon(geometry) + if not parcel_polygon: + logger.debug("Could not convert parcel geometry to Shapely Polygon") + # Fallback to first zone if geometry conversion fails + for feature in cached_features: + attrs = feature.get("properties", feature.get("attributes", {})) + typ_gde_abkuerzung = attrs.get("typ_gde_abkuerzung") + if typ_gde_abkuerzung: + return [{ + "layerBodId": "oereb_wfs", + "attributes": {"typ_gde_abkuerzung": typ_gde_abkuerzung} + }] + return [] + + # Find the zone that actually contains or intersects the parcel + # Since a parcel is always in exactly one zone, we check for containment first, + # then find the zone with the largest intersection area if no perfect containment is found + containing_zone = None + best_intersecting_zone = None + best_intersection_area = 0.0 + + for feature in cached_features: + attrs = feature.get("properties", feature.get("attributes", {})) + typ_gde_abkuerzung = attrs.get("typ_gde_abkuerzung") + + if not typ_gde_abkuerzung: + continue + + zone_geometry = feature.get("geometry") + if not zone_geometry: + # If geometry not parsed, skip spatial check for this feature + # But keep it as fallback if no geometry-based match is found + if not best_intersecting_zone: + best_intersecting_zone = feature + continue + + try: + # Check if zone contains the parcel (most precise) + if zone_geometry.contains(parcel_polygon): + containing_zone = feature + break # Found perfect match, stop searching + + # Check if zone intersects the parcel (for border cases) + if zone_geometry.intersects(parcel_polygon): + # Calculate intersection area to find the best match + intersection = zone_geometry.intersection(parcel_polygon) + if not intersection.is_empty: + intersection_area = intersection.area + # Keep the zone with the largest intersection area + if intersection_area > best_intersection_area: + best_intersection_area = intersection_area + best_intersecting_zone = feature + + except Exception as e: + logger.debug(f"Error checking spatial relationship: {e}") + # If spatial check fails, keep as fallback + if not best_intersecting_zone: + best_intersecting_zone = feature + + # Return the containing zone if found, otherwise the best intersecting zone + selected_feature = containing_zone or best_intersecting_zone + if selected_feature: + attrs = selected_feature.get("properties", selected_feature.get("attributes", {})) + typ_gde_abkuerzung = attrs.get("typ_gde_abkuerzung") + if typ_gde_abkuerzung: + 
return [{ + "layerBodId": "oereb_wfs", + "attributes": {"typ_gde_abkuerzung": typ_gde_abkuerzung} + }] + + return [] + + except Exception: + return [] diff --git a/modules/features/realEstate/bzoDocumentRetriever.py b/modules/features/realEstate/bzoDocumentRetriever.py new file mode 100644 index 00000000..a86c763b --- /dev/null +++ b/modules/features/realEstate/bzoDocumentRetriever.py @@ -0,0 +1,199 @@ +""" +Document retriever for BZO extraction pipeline. +Queries Dokument table and retrieves PDF content from ComponentObjects. +""" + +import logging +from typing import List, Dict, Any, Optional +from modules.datamodels.datamodelRealEstate import Dokument, DokumentTyp, Gemeinde +from modules.interfaces.interfaceDbRealEstateObjects import RealEstateObjects +from modules.interfaces.interfaceDbComponentObjects import ComponentObjects + +logger = logging.getLogger(__name__) + + +class BZODocumentRetriever: + """Retrieves BZO documents from database and ComponentObjects.""" + + def __init__(self, realEstateInterface: RealEstateObjects, componentInterface: ComponentObjects): + """ + Initialize document retriever. + + Args: + realEstateInterface: Real Estate database interface + componentInterface: ComponentObjects interface for file retrieval + """ + self.realEstateInterface = realEstateInterface + self.componentInterface = componentInterface + + def get_documents_by_ids(self, dokument_ids: List[str]) -> List[Dokument]: + """ + Retrieve specific documents by their IDs. + + Args: + dokument_ids: List of dokument IDs to retrieve + + Returns: + List of Dokument records + """ + try: + dokumente = [] + for dokument_id in dokument_ids: + dokument = self.realEstateInterface.getDokument(dokument_id) + if dokument: + dokumente.append(dokument) + else: + logger.warning(f"Dokument {dokument_id} not found") + + logger.info(f"Retrieved {len(dokumente)} documents out of {len(dokument_ids)} requested") + return dokumente + + except Exception as e: + logger.error(f"Error retrieving documents by IDs: {str(e)}", exc_info=True) + raise + + def get_bzo_documents(self, gemeinde_id: Optional[str] = None) -> List[Dokument]: + """ + Query Dokument table for BZO documents. 
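+
+        Both DokumentTyp.GEMEINDE_BZO_AKTUELL and DokumentTyp.GEMEINDE_BZO_REVISION
+        are queried separately (the database connector does not support array
+        filters) and the combined results are deduplicated by document ID.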
+ + Args: + gemeinde_id: Optional filter by specific Gemeinde + + Returns: + List of Dokument records with BZO document type + """ + try: + # Query each document type separately (database connector doesn't support array filters) + dokumente_aktuell = self.realEstateInterface.getDokumente( + recordFilter={ + "mandateId": self.realEstateInterface.mandateId, + "dokumentTyp": DokumentTyp.GEMEINDE_BZO_AKTUELL + } + ) + + dokumente_revision = self.realEstateInterface.getDokumente( + recordFilter={ + "mandateId": self.realEstateInterface.mandateId, + "dokumentTyp": DokumentTyp.GEMEINDE_BZO_REVISION + } + ) + + # Combine results and deduplicate by ID + dokumente_dict = {} + for dokument in dokumente_aktuell + dokumente_revision: + dokumente_dict[dokument.id] = dokument + dokumente = list(dokumente_dict.values()) + + # If gemeinde_id provided, filter by checking Gemeinde.dokumente relationship + if gemeinde_id: + filtered_dokumente = [] + for dokument in dokumente: + # Check if this dokument is linked to the specified Gemeinde + gemeinden = self.realEstateInterface.getGemeinden( + recordFilter={"mandateId": self.realEstateInterface.mandateId} + ) + for gemeinde in gemeinden: + if gemeinde.id == gemeinde_id: + # Check if dokument.id is in gemeinde.dokumente + if gemeinde.dokumente: + for doc in gemeinde.dokumente: + if isinstance(doc, dict) and doc.get("id") == dokument.id: + filtered_dokumente.append(dokument) + break + elif hasattr(doc, "id") and doc.id == dokument.id: + filtered_dokumente.append(dokument) + break + break + dokumente = filtered_dokumente + + logger.info(f"Found {len(dokumente)} BZO documents" + (f" for Gemeinde {gemeinde_id}" if gemeinde_id else "")) + return dokumente + + except Exception as e: + logger.error(f"Error querying BZO documents: {str(e)}", exc_info=True) + raise + + def retrieve_pdf_content(self, dokument: Dokument) -> Optional[bytes]: + """ + Retrieve PDF bytes from ComponentObjects using dokumentReferenz. + + Args: + dokument: Dokument record with dokumentReferenz field + + Returns: + PDF bytes or None if retrieval fails + """ + try: + if not dokument.dokumentReferenz: + logger.warning(f"Dokument {dokument.id} has no dokumentReferenz") + return None + + # Retrieve PDF bytes + pdf_bytes = self.componentInterface.getFileData(dokument.dokumentReferenz) + + if not pdf_bytes: + logger.warning(f"Could not retrieve PDF content for file {dokument.dokumentReferenz}") + return None + + logger.debug(f"Retrieved PDF content for dokument {dokument.id} ({len(pdf_bytes)} bytes)") + return pdf_bytes + + except Exception as e: + logger.error(f"Error retrieving PDF content for dokument {dokument.id}: {str(e)}", exc_info=True) + return None + + def resolve_gemeinde_for_dokument(self, dokument: Dokument) -> Optional[str]: + """ + Resolve gemeinde_id for a Dokument by checking Gemeinde.dokumente relationships. 
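+
+        Entries in Gemeinde.dokumente may be plain dicts ({"id": ...}) or
+        objects exposing an "id" attribute; both shapes are handled.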
+ + Args: + dokument: Dokument record + + Returns: + gemeinde_id if found, None otherwise + """ + try: + gemeinden = self.realEstateInterface.getGemeinden( + recordFilter={"mandateId": self.realEstateInterface.mandateId} + ) + + for gemeinde in gemeinden: + if gemeinde.dokumente: + for doc in gemeinde.dokumente: + doc_id = doc.id if hasattr(doc, "id") else doc.get("id") if isinstance(doc, dict) else None + if doc_id == dokument.id: + logger.debug(f"Resolved gemeinde_id {gemeinde.id} for dokument {dokument.id}") + return gemeinde.id + + logger.warning(f"Could not resolve gemeinde_id for dokument {dokument.id}") + return None + + except Exception as e: + logger.error(f"Error resolving gemeinde for dokument {dokument.id}: {str(e)}", exc_info=True) + return None + + def extract_year_from_dokument(self, dokument: Dokument) -> Optional[int]: + """ + Extract year from Dokument label or versionsbezeichnung. + + Args: + dokument: Dokument record + + Returns: + Year as integer if found, None otherwise + """ + import re + + # Try to extract year from label + if dokument.label: + year_match = re.search(r'\b(19|20)\d{2}\b', dokument.label) + if year_match: + return int(year_match.group()) + + # Try to extract year from versionsbezeichnung + if dokument.versionsbezeichnung: + year_match = re.search(r'\b(19|20)\d{2}\b', dokument.versionsbezeichnung) + if year_match: + return int(year_match.group()) + + return None diff --git a/modules/features/realEstate/bzoExtractionLangGraph.py b/modules/features/realEstate/bzoExtractionLangGraph.py new file mode 100644 index 00000000..10c56244 --- /dev/null +++ b/modules/features/realEstate/bzoExtractionLangGraph.py @@ -0,0 +1,738 @@ +""" +LangGraph-based pipeline for extracting structured content from BZO PDFs. +""" + +import logging +import re +from typing import TypedDict, List, Dict, Any, Optional +from dataclasses import dataclass +from langgraph.graph import StateGraph, START, END + +from modules.features.realEstate.bzoPdfExtractor import BZOPdfExtractor, TextBlock +from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY + +logger = logging.getLogger(__name__) + + +# ===== State Definition ===== + +@dataclass +class ClassifiedBlock: + """Classified text block.""" + block: TextBlock + block_type: str # "article", "heading", "table", "other" + article_label: Optional[str] = None + article_title: Optional[str] = None + + +@dataclass +class Article: + """Assembled article.""" + article_label: str + article_title: Optional[str] + text: str + page_start: int + page_end: int + section_level_1: Optional[str] = None + section_level_2: Optional[str] = None + section_level_3: Optional[str] = None + zone_raw: Optional[str] = None + + +@dataclass +class ZoneInfo: + """Zone information.""" + zone_code: str + zone_name: str + zone_category: Optional[str] = None + zone_subcategory: Optional[str] = None + empfindlichkeitsstufe: Optional[str] = None + geschosszahl: Optional[int] = None + gewerbeerleichterung: bool = False + + +@dataclass +class RuleCandidate: + """Rule candidate from pattern matching.""" + rule_type: str + matched_text: str + article_text: str + page: int + is_table_rule: bool = False + table_zones: List[str] = None + condition_text: Optional[str] = None + + +@dataclass +class ParsedRule: + """Parsed rule with structured values.""" + rule_type: str + value_numeric: Optional[float] + value_text: str + unit: Optional[str] + condition_text: Optional[str] + is_table_rule: bool + table_zones: List[str] + page: int + text_snippet: str + zone_raw: 
Optional[str] = None + rule_scope: str = "general" + confidence: float = 0.5 + + +class BZOExtractionState(TypedDict): + """State for BZO extraction pipeline.""" + # Input metadata + dokument_id: Optional[str] + pdf_id: str + + # Extracted text blocks (stored as dicts for serialization) + text_blocks: List[Dict[str, Any]] + + # Classified blocks (stored as dicts for serialization) + classified_blocks: List[Dict[str, Any]] + + # Assembled articles (stored as dicts for serialization) + articles: List[Dict[str, Any]] + + # Zone tracking + current_zones: Dict[str, Dict[str, Any]] + zones: List[Dict[str, Any]] + + # Rule extraction (stored as dicts for serialization) + rule_candidates: List[Dict[str, Any]] + parsed_rules: List[Dict[str, Any]] + + # Processing metadata + errors: List[str] + warnings: List[str] + + +# ===== Node Implementations ===== + +def extract_pdf_text(state: BZOExtractionState) -> BZOExtractionState: + """Extract text blocks from PDF.""" + try: + # PDF bytes should be passed in state context + # This is handled in run_extraction function + # State already has text_blocks populated + return state + + except Exception as e: + logger.error(f"Error extracting PDF text: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"PDF extraction error: {str(e)}"] + return state + + +def classify_text_block(state: BZOExtractionState) -> BZOExtractionState: + """Classify text blocks into articles, headings, tables, etc.""" + try: + classified = [] + + for block_dict in state["text_blocks"]: + text = block_dict["text"].strip() + if not text: + continue + + block_type = "other" + article_label = None + article_title = None + + # Check for article patterns + article_match = re.search(r'Art\.?\s*(\d+[a-z]?)', text, re.IGNORECASE) + if article_match: + block_type = "article" + article_label = f"Art. 
{article_match.group(1)}" + # Try to extract title (text after article label, before first period or newline) + title_match = re.search(r'Art\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', text, re.IGNORECASE) + if title_match: + article_title = title_match.group(1).strip() + + # Check for heading patterns (Roman numerals, letters, numbers) + elif re.match(r'^[A-Z]\.\s+[A-Z]', text) or re.match(r'^[IVX]+\.\s+[A-Z]', text) or re.match(r'^\d+\.\s+[A-Z]', text): + block_type = "heading" + + # Check for table patterns (multiple tabs or aligned columns) + elif '\t' in text or (len(text.split()) > 5 and text.count(' ') > 2): + block_type = "table" + + classified.append({ + "block": { + "page": block_dict["page"], + "text": block_dict["text"], + "block_id": block_dict["block_id"], + "bbox": block_dict.get("bbox") + }, + "block_type": block_type, + "article_label": article_label, + "article_title": article_title + }) + + # Update state with new classified blocks + existing_blocks = state.get("classified_blocks", []) + state["classified_blocks"] = existing_blocks + classified + return state + + except Exception as e: + logger.error(f"Error classifying text blocks: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Classification error: {str(e)}"] + return state + + +def assemble_articles(state: BZOExtractionState) -> BZOExtractionState: + """Assemble classified blocks into articles with hierarchical structure.""" + try: + articles = [] + current_article = None + current_section_1 = None + current_section_2 = None + current_section_3 = None + + for classified_dict in state["classified_blocks"]: + block_dict = classified_dict["block"] + block = TextBlock( + page=block_dict["page"], + text=block_dict["text"], + block_id=block_dict["block_id"], + bbox=block_dict.get("bbox") + ) + text = block.text.strip() + block_type = classified_dict["block_type"] + article_label = classified_dict.get("article_label") + article_title = classified_dict.get("article_title") + + # Update section levels + if block_type == "heading": + # Level 1: A., B., C. + if re.match(r'^[A-Z]\.\s+', text): + current_section_1 = text.split('.', 1)[0] + '.' + current_section_2 = None + current_section_3 = None + # Level 2: I., II., III. + elif re.match(r'^[IVX]+\.\s+', text): + current_section_2 = text.split('.', 1)[0] + '.' + current_section_3 = None + # Level 3: 1., 2., 3. + elif re.match(r'^\d+\.\s+', text): + current_section_3 = text.split('.', 1)[0] + '.' 
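+                # e.g. a heading "B. Bauvorschriften" sets section level 1 to "B."
+                # and resets the deeper levels; "II. ..." and "3. ..." behave analogously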
+ + # Start new article + if article_label: + # Save previous article if exists + if current_article: + articles.append(current_article) + + # Start new article + current_article = { + "article_label": article_label, + "article_title": article_title, + "text": text, + "page_start": block.page, + "page_end": block.page, + "section_level_1": current_section_1, + "section_level_2": current_section_2, + "section_level_3": current_section_3, + "zone_raw": None + } + # Continue current article + elif current_article: + current_article["text"] += "\n" + text + current_article["page_end"] = block.page + + # Add last article + if current_article: + articles.append(current_article) + + # Update state with new articles + existing_articles = state.get("articles", []) + state["articles"] = existing_articles + articles + return state + + except Exception as e: + logger.error(f"Error assembling articles: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Article assembly error: {str(e)}"] + return state + + +def detect_zone_changes(state: BZOExtractionState) -> BZOExtractionState: + """Detect zone declarations and maintain zone scope.""" + try: + zones = [] + current_zones = state.get("current_zones", {}) + + for article_dict in state["articles"]: + text = article_dict.get("text", "") + article_label = article_dict.get("article_label", "") + page_start = article_dict.get("page_start", 0) + + # Pattern: "Wohnzone W2", "Zone W3", "Gewerbezone G1" + zone_patterns = [ + r'(?:Wohnzone|Zone|Gewerbezone|Industriezone|Zentrumszone|Ortsbildschutzzone|Erholungszone)\s+([A-Z0-9/]+)', + r'([A-Z]\d+(?:/\d+)?(?:G)?)', # W2/30, W2/30G, Z3, K3/4 + ] + + for pattern in zone_patterns: + matches = re.finditer(pattern, text, re.IGNORECASE) + for match in matches: + zone_code = match.group(1).upper() + + # Parse zone code + gewerbeerleichterung = zone_code.endswith('G') + if gewerbeerleichterung: + zone_code_base = zone_code[:-1] + else: + zone_code_base = zone_code + + # Extract geschosszahl from code (e.g., W2 -> 2, W3/50 -> 3) + geschosszahl = None + if '/' in zone_code_base: + parts = zone_code_base.split('/') + geschosszahl_match = re.search(r'(\d+)', parts[0]) + if geschosszahl_match: + geschosszahl = int(geschosszahl_match.group(1)) + else: + geschosszahl_match = re.search(r'(\d+)', zone_code_base) + if geschosszahl_match: + geschosszahl = int(geschosszahl_match.group(1)) + + # Determine zone category from context + zone_category = None + if 'Wohnzone' in text or zone_code.startswith('W'): + zone_category = "Wohnzonen" + elif 'Zentrumszone' in text or zone_code.startswith('Z'): + zone_category = "Zentrumszonen" + elif 'Gewerbezone' in text or zone_code.startswith('G'): + zone_category = "Arbeitsplatzzonen" + elif 'Industriezone' in text or zone_code.startswith('I'): + zone_category = "Arbeitsplatzzonen" + + zone_info = ZoneInfo( + zone_code=zone_code, + zone_name=f"Zone {zone_code}", + zone_category=zone_category, + geschosszahl=geschosszahl, + gewerbeerleichterung=gewerbeerleichterung + ) + + current_zones[zone_code] = zone_info + zones.append({ + "zone_code": zone_code, + "zone_name": zone_info.zone_name, + "zone_category": zone_category, + "geschosszahl": geschosszahl, + "gewerbeerleichterung": gewerbeerleichterung, + "source_article": article_label, + "page": page_start + }) + + # Update state with zones + state["current_zones"] = current_zones + existing_zones = state.get("zones", []) + state["zones"] = existing_zones + zones + return state + + except Exception as e: + logger.error(f"Error 
detecting zones: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Zone detection error: {str(e)}"] + return state + + +def detect_rule_candidates(state: BZOExtractionState) -> BZOExtractionState: + """Detect rule candidates using pattern matching.""" + try: + candidates = [] + + for article_dict in state["articles"]: + text = article_dict.get("text", "") + article_label = article_dict.get("article_label", "") + page_start = article_dict.get("page_start", 0) + + # Check each rule type in taxonomy + for rule_type, rule_config in RULE_TAXONOMY.items(): + patterns = rule_config.get("patterns", []) + + for pattern in patterns: + # Create regex pattern (case-insensitive) + regex_pattern = re.compile(pattern, re.IGNORECASE) + matches = regex_pattern.finditer(text) + + for match in matches: + # Extract context around match + start = max(0, match.start() - 100) + end = min(len(text), match.end() + 100) + context = text[start:end] + + # Check for conditions (geographic, temporal, etc.) + condition_text = None + condition_patterns = [ + r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+', + r'(?:für|bei|in)\s+[^,\.]+', + ] + for cond_pattern in condition_patterns: + cond_match = re.search(cond_pattern, context, re.IGNORECASE) + if cond_match: + condition_text = cond_match.group(0) + break + + candidate = { + "rule_type": rule_type, + "matched_text": match.group(0), + "article_text": text, + "page": page_start, + "condition_text": condition_text, + "is_table_rule": False, + "table_zones": [] + } + candidates.append(candidate) + + # Update state with rule candidates + existing_candidates = state.get("rule_candidates", []) + state["rule_candidates"] = existing_candidates + candidates + return state + + except Exception as e: + logger.error(f"Error detecting rule candidates: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Rule candidate detection error: {str(e)}"] + return state + + +def parse_rule_values(state: BZOExtractionState) -> BZOExtractionState: + """Parse rule values using regex (LLM fallback can be added later).""" + try: + parsed_rules = [] + + for candidate_dict in state["rule_candidates"]: + rule_type = candidate_dict["rule_type"] + rule_config = RULE_TAXONOMY.get(rule_type, {}) + units = rule_config.get("units", []) + value_type = rule_config.get("value_type", "numeric") + + # Extract value using regex + matched_text = candidate_dict["matched_text"] + article_text = candidate_dict["article_text"] + text = matched_text + " " + article_text[article_text.find(matched_text):article_text.find(matched_text) + 200] + + value_numeric = None + value_text = matched_text + unit = None + + # Try to extract numeric value + if value_type in ["numeric", "integer"]: + # Pattern: "max. 4", "30 %", "min. 
3.5 m" + value_patterns = [ + r'(?:max|maximal|min|mindestens|höchstens)\s*\.?\s*(\d+(?:\.\d+)?)', + r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', + r'(\d+(?:\.\d+)?)', + ] + + for pattern in value_patterns: + match = re.search(pattern, text, re.IGNORECASE) + if match: + try: + value_numeric = float(match.group(1)) + if value_type == "integer": + value_numeric = int(value_numeric) + + # Check for unit + unit_match = re.search(r'(\d+(?:\.\d+)?)\s*(%|m|meter|metern|prozent)', text, re.IGNORECASE) + if unit_match: + unit = unit_match.group(2).lower() + if unit in ["meter", "metern"]: + unit = "m" + elif unit == "prozent": + unit = "%" + + break + except ValueError: + continue + + # Calculate confidence + confidence = 0.5 + if value_numeric is not None: + confidence = 0.8 + if unit: + confidence = 0.9 + + # Determine zone and scope + zone_raw = None + rule_scope = "general" + + # Check current zones context + if state.get("current_zones"): + # Use first zone as default (can be improved) + zone_raw = list(state["current_zones"].keys())[0] if state["current_zones"] else None + rule_scope = "zone" if zone_raw else "general" + + parsed_rule = { + "rule_type": rule_type, + "value_numeric": value_numeric, + "value_text": value_text, + "unit": unit, + "condition_text": candidate_dict.get("condition_text"), + "is_table_rule": candidate_dict.get("is_table_rule", False), + "table_zones": candidate_dict.get("table_zones", []), + "page": candidate_dict["page"], + "text_snippet": value_text, + "zone_raw": zone_raw, + "rule_scope": rule_scope, + "confidence": confidence + } + parsed_rules.append(parsed_rule) + + # Update state with parsed rules + existing_rules = state.get("parsed_rules", []) + state["parsed_rules"] = existing_rules + parsed_rules + return state + + except Exception as e: + logger.error(f"Error parsing rule values: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Rule parsing error: {str(e)}"] + return state + + +def assign_zone_and_scope(state: BZOExtractionState) -> BZOExtractionState: + """Assign zone and scope to parsed rules.""" + try: + # Rules already have zone and scope assigned in parse_rule_values + # This node can refine assignments if needed + return state + + except Exception as e: + logger.error(f"Error assigning zone and scope: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Zone/scope assignment error: {str(e)}"] + return state + + +def confidence_scoring(state: BZOExtractionState) -> BZOExtractionState: + """Calculate confidence scores for extracted data.""" + try: + # Confidence already calculated in parse_rule_values + # This node can refine scores if needed + return state + + except Exception as e: + logger.error(f"Error calculating confidence: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Confidence scoring error: {str(e)}"] + return state + + + + +# ===== Graph Construction ===== + +def create_bzo_extraction_graph(): + """Create and compile the BZO extraction graph.""" + workflow = StateGraph(BZOExtractionState) + + # Add nodes + workflow.add_node("extract_pdf_text", extract_pdf_text) + workflow.add_node("classify_text_block", classify_text_block) + workflow.add_node("assemble_articles", assemble_articles) + workflow.add_node("detect_zone_changes", detect_zone_changes) + workflow.add_node("detect_rule_candidates", detect_rule_candidates) + workflow.add_node("parse_rule_values", parse_rule_values) + workflow.add_node("assign_zone_and_scope", assign_zone_and_scope) + 
workflow.add_node("confidence_scoring", confidence_scoring) + + # Define edges + workflow.set_entry_point("extract_pdf_text") + workflow.add_edge("extract_pdf_text", "classify_text_block") + workflow.add_edge("classify_text_block", "assemble_articles") + workflow.add_edge("assemble_articles", "detect_zone_changes") + workflow.add_edge("detect_zone_changes", "detect_rule_candidates") + workflow.add_edge("detect_rule_candidates", "parse_rule_values") + workflow.add_edge("parse_rule_values", "assign_zone_and_scope") + workflow.add_edge("assign_zone_and_scope", "confidence_scoring") + workflow.add_edge("confidence_scoring", END) + + return workflow.compile() + + +def run_extraction(pdf_bytes: bytes, pdf_id: str = None, dokument_id: str = None) -> Dict[str, Any]: + """ + Run the extraction pipeline on a PDF and return structured, sorted results. + + Args: + pdf_bytes: PDF file content as bytes + pdf_id: Optional identifier for the PDF (defaults to generated ID) + dokument_id: Optional dokument ID for reference + + Returns: + Dictionary with extracted and sorted content: + { + "articles": [...], # Sorted by page_start, then article_label + "zones": [...], # Sorted by zone_code + "rules": [...], # Sorted by rule_type, then page + "errors": [...], + "warnings": [...] + } + """ + import uuid + + if not pdf_id: + pdf_id = f"pdf_{uuid.uuid4().hex[:8]}" + + # Initialize state + state: BZOExtractionState = { + "dokument_id": dokument_id, + "pdf_id": pdf_id, + "text_blocks": [], + "classified_blocks": [], + "articles": [], + "current_zones": {}, + "zones": [], + "rule_candidates": [], + "parsed_rules": [], + "errors": [], + "warnings": [] + } + + # Extract PDF text first + pdf_extractor = BZOPdfExtractor() + text_blocks_objects = pdf_extractor.extract_text_blocks(pdf_bytes, state["pdf_id"]) + # Convert TextBlock objects to dicts for state + state["text_blocks"] = [ + { + "page": tb.page, + "text": tb.text, + "block_id": tb.block_id, + "bbox": tb.bbox + } + for tb in text_blocks_objects + ] + + # Create and run graph + graph = create_bzo_extraction_graph() + final_state = graph.invoke(state) + + # Sort and structure results + articles = sorted( + final_state.get("articles", []), + key=lambda x: (x.get("page_start", 0), x.get("article_label", "")) + ) + + zones = sorted( + final_state.get("zones", []), + key=lambda x: x.get("zone_code", "") + ) + + rules = sorted( + final_state.get("parsed_rules", []), + key=lambda x: (x.get("rule_type", ""), x.get("page", 0)) + ) + + return { + "articles": articles, + "zones": zones, + "rules": rules, + "errors": final_state.get("errors", []), + "warnings": final_state.get("warnings", []) + } + + +def extract_from_documents( + document_retriever, + dokument_ids: List[str] +) -> Dict[str, Any]: + """ + Extract BZO content from one or more documents. + + Args: + document_retriever: BZODocumentRetriever instance + dokument_ids: List of dokument IDs to process + + Returns: + Dictionary with results per document: + { + "results": [ + { + "dokument_id": "...", + "articles": [...], + "zones": [...], + "rules": [...], + "errors": [...], + "warnings": [...] + }, + ... 
+ ], + "summary": { + "total_documents": N, + "successful": M, + "failed": K, + "total_articles": X, + "total_zones": Y, + "total_rules": Z + } + } + """ + results = [] + total_articles = 0 + total_zones = 0 + total_rules = 0 + successful = 0 + failed = 0 + + # Retrieve documents + dokumente = document_retriever.get_documents_by_ids(dokument_ids) + + for dokument in dokumente: + try: + # Retrieve PDF content + pdf_bytes = document_retriever.retrieve_pdf_content(dokument) + if not pdf_bytes: + logger.warning(f"Could not retrieve PDF for dokument {dokument.id}") + results.append({ + "dokument_id": dokument.id, + "articles": [], + "zones": [], + "rules": [], + "errors": [f"Could not retrieve PDF content"], + "warnings": [] + }) + failed += 1 + continue + + # Run extraction + extraction_result = run_extraction( + pdf_bytes=pdf_bytes, + pdf_id=dokument.dokumentReferenz or f"dok_{dokument.id}", + dokument_id=dokument.id + ) + + # Add dokument_id to result + extraction_result["dokument_id"] = dokument.id + results.append(extraction_result) + + # Update counters + total_articles += len(extraction_result.get("articles", [])) + total_zones += len(extraction_result.get("zones", [])) + total_rules += len(extraction_result.get("rules", [])) + + if extraction_result.get("errors"): + failed += 1 + else: + successful += 1 + + except Exception as e: + logger.error(f"Error processing dokument {dokument.id}: {str(e)}", exc_info=True) + results.append({ + "dokument_id": dokument.id, + "articles": [], + "zones": [], + "rules": [], + "errors": [f"Processing error: {str(e)}"], + "warnings": [] + }) + failed += 1 + + return { + "results": results, + "summary": { + "total_documents": len(dokument_ids), + "successful": successful, + "failed": failed, + "total_articles": total_articles, + "total_zones": total_zones, + "total_rules": total_rules + } + } diff --git a/modules/features/realEstate/bzoPdfExtractor.py b/modules/features/realEstate/bzoPdfExtractor.py new file mode 100644 index 00000000..155f5406 --- /dev/null +++ b/modules/features/realEstate/bzoPdfExtractor.py @@ -0,0 +1,117 @@ +""" +PDF extraction module for BZO documents. +Extracts page-aware text blocks from PDF files. +""" + +import logging +from typing import List, Dict, Any +from dataclasses import dataclass +import fitz # PyMuPDF + +logger = logging.getLogger(__name__) + + +@dataclass +class TextBlock: + """Represents a text block from a PDF page.""" + page: int + text: str + block_id: str + bbox: tuple = None # (x0, y0, x1, y1) bounding box + + +class BZOPdfExtractor: + """Extracts text blocks from PDF files with page awareness.""" + + def __init__(self): + """Initialize the PDF extractor.""" + pass + + def extract_text_blocks(self, pdf_bytes: bytes, pdf_id: str) -> List[TextBlock]: + """ + Extract page-aware text blocks from PDF. 
+ + Args: + pdf_bytes: PDF file content as bytes + pdf_id: Identifier for the PDF (for logging) + + Returns: + List of TextBlock objects with page numbers + """ + text_blocks = [] + + try: + # Open PDF from bytes + pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf") + + # Extract text from each page + for page_num in range(len(pdf_document)): + page = pdf_document[page_num] + + # Extract text blocks from page + blocks = page.get_text("blocks") + + for block_idx, block in enumerate(blocks): + # block format: (x0, y0, x1, y1, "text", block_no, block_type) + if len(block) >= 5: + bbox = (block[0], block[1], block[2], block[3]) + text = block[4].strip() + + # Skip empty blocks + if not text: + continue + + # Create TextBlock + block_id = f"{pdf_id}_p{page_num + 1}_b{block_idx}" + text_block = TextBlock( + page=page_num + 1, # 1-indexed pages + text=text, + block_id=block_id, + bbox=bbox + ) + text_blocks.append(text_block) + + # Store page count before closing + page_count = len(pdf_document) + pdf_document.close() + + logger.info(f"Extracted {len(text_blocks)} text blocks from PDF {pdf_id} ({page_count} pages)") + + except Exception as e: + logger.error(f"Error extracting text from PDF {pdf_id}: {str(e)}", exc_info=True) + raise + + return text_blocks + + def extract_text_by_page(self, pdf_bytes: bytes, pdf_id: str) -> Dict[int, str]: + """ + Extract full text per page (alternative method). + + Args: + pdf_bytes: PDF file content as bytes + pdf_id: Identifier for the PDF + + Returns: + Dictionary mapping page number to full page text + """ + page_texts = {} + + try: + pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf") + + for page_num in range(len(pdf_document)): + page = pdf_document[page_num] + text = page.get_text() + page_texts[page_num + 1] = text # 1-indexed + + # Store page count before closing + page_count = len(pdf_document) + pdf_document.close() + + logger.debug(f"Extracted text from {page_count} pages for PDF {pdf_id}") + + except Exception as e: + logger.error(f"Error extracting page text from PDF {pdf_id}: {str(e)}", exc_info=True) + raise + + return page_texts diff --git a/modules/features/realEstate/bzoRuleTaxonomy.py b/modules/features/realEstate/bzoRuleTaxonomy.py new file mode 100644 index 00000000..1c810efe --- /dev/null +++ b/modules/features/realEstate/bzoRuleTaxonomy.py @@ -0,0 +1,105 @@ +""" +Rule taxonomy for BZO extraction. +Defines fixed rule types and their patterns for deterministic rule detection. 
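+
+Each entry maps a rule type to its detection metadata, e.g. (abridged):
+
+    "max_floors": {
+        "patterns": ["vollgeschosse", "geschosszahl", ...],
+        "units": [],
+        "value_type": "integer",
+        "keywords": ["max", "maximal", "höchstens"],
+    }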
+""" + +RULE_TAXONOMY = { + "max_building_height": { + "patterns": ["fassadenhöhe", "fassadenhöhen", "gebäudehöhe", "firsthöhe", "traufhöhe", "höchsthöhe", "gesamt höhe", "gesamt höhen"], + "units": ["m", "meter", "metern"], + "value_type": "numeric", + "keywords": ["max", "maximal", "höchstens"] + }, + "max_floors": { + "patterns": ["vollgeschosse", "vollgeschoss", "geschosse", "geschosszahl"], + "units": [], + "value_type": "integer", + "keywords": ["max", "maximal", "höchstens"] + }, + "max_attachable_attics": { + "patterns": ["anrechenbare dachgeschosse", "anrechenbares attikageschoss", "dachgeschosse", "attikageschoss"], + "units": [], + "value_type": "integer", + "keywords": ["max", "maximal"] + }, + "max_attachable_basement": { + "patterns": ["anrechenbares untergeschoss", "untergeschoss"], + "units": [], + "value_type": "integer", + "keywords": ["max", "maximal"] + }, + "density": { + "patterns": ["ausnützungsziffer", "az", "ausnützung"], + "units": ["%", "prozent"], + "value_type": "numeric", + "keywords": ["max", "maximal"] + }, + "building_mass_index": { + "patterns": ["baumassenziffer", "bmz"], + "units": [], + "value_type": "numeric", + "keywords": ["max", "maximal"] + }, + "green_space_index": { + "patterns": ["grünflächenziffer", "gfz"], + "units": ["%", "prozent"], + "value_type": "numeric", + "keywords": ["min", "mindestens"] + }, + "boundary_distance": { + "patterns": ["grenzabstand", "grundabstand", "abstand"], + "units": ["m", "meter", "metern"], + "value_type": "numeric", + "keywords": ["min", "mindestens", "max", "maximal"] + }, + "boundary_distance_length_surcharge": { + "patterns": ["mehrlängenzuschlag", "längenzuschlag"], + "units": [], + "value_type": "fraction", # e.g., "1/3", "1/5" + "keywords": [] + }, + "boundary_distance_max": { + "patterns": ["höchstmass", "höchstmass grenzabstand", "höchstmass abstand"], + "units": ["m", "meter"], + "value_type": "numeric", + "keywords": ["max", "maximal"] + }, + "building_length": { + "patterns": ["gebäudelänge"], + "units": ["m", "meter"], + "value_type": "numeric", + "keywords": ["min", "mindestens", "max", "maximal"] + }, + "building_width": { + "patterns": ["gebäudebreite"], + "units": ["m", "meter"], + "value_type": "numeric", + "keywords": ["min", "mindestens", "max", "maximal"] + }, + "residential_area_share": { + "patterns": ["wohnflächenanteil", "wohnanteil"], + "units": ["%", "prozent"], + "value_type": "numeric", + "keywords": ["min", "mindestens", "max", "maximal"] + } +} + + +def get_rule_taxonomy() -> dict: + """Get the rule taxonomy dictionary.""" + return RULE_TAXONOMY + + +def get_rule_types() -> list: + """Get list of all rule types.""" + return list(RULE_TAXONOMY.keys()) + + +def get_rule_patterns(rule_type: str) -> list: + """Get patterns for a specific rule type.""" + return RULE_TAXONOMY.get(rule_type, {}).get("patterns", []) + + +def get_rule_units(rule_type: str) -> list: + """Get units for a specific rule type.""" + return RULE_TAXONOMY.get(rule_type, {}).get("units", []) diff --git a/modules/features/realEstate/scrapeSwissTopo.py b/modules/features/realEstate/scrapeSwissTopo.py new file mode 100644 index 00000000..f761100c --- /dev/null +++ b/modules/features/realEstate/scrapeSwissTopo.py @@ -0,0 +1,780 @@ +""" +Swiss Topo Scraping Script + +Scrapes Switzerland systematically using the Swiss Topo connector +and saves parcel data to the database. + +This script divides Switzerland into a grid and queries parcels at each grid point, +then deduplicates and saves unique parcels to the database. 
+""" + +import logging +import asyncio +from typing import Dict, Any, List, Set, Optional +from dataclasses import dataclass +import json + +from modules.datamodels.datamodelUam import User +from modules.datamodels.datamodelRealEstate import ( + Parzelle, + GeoPolylinie, + GeoPunkt, + Kontext, + Gemeinde, + Kanton, +) +from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface +from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector +from modules.connectors.connectorOerebWfs import OerebWfsConnector + +logger = logging.getLogger(__name__) + + +@dataclass +class ScrapingStats: + """Statistics for scraping operation.""" + total_queries: int = 0 + successful_queries: int = 0 + failed_queries: int = 0 + unique_parcels_found: int = 0 + parcels_saved: int = 0 + parcels_skipped: int = 0 + errors: List[str] = None + + def __post_init__(self): + if self.errors is None: + self.errors = [] + + +class SwissTopoScraper: + """ + Scraper for Swiss Topo parcel data. + + Divides Kanton Zürich into a grid and queries parcels systematically, + then saves unique parcels to the database with ÖREB bauzone information. + """ + + # Zürich canton bounds in LV95 coordinates + ZURICH_BOUNDS = { + "min_x": 2680000, + "max_x": 2780000, + "min_y": 1210000, + "max_y": 1280000 + } + + def __init__( + self, + current_user: User, + grid_size: float = 500.0, # Grid size in meters (500m = reasonable coverage) + max_concurrent: int = 50, # Maximum concurrent API requests + batch_size: int = 100, # Process parcels in batches + ): + """ + Initialize scraper. + + Args: + current_user: User for database operations + grid_size: Size of grid cells in meters (default: 500m) + max_concurrent: Maximum concurrent API requests (default: 50) + batch_size: Number of parcels to process before saving (default: 100) + """ + self.current_user = current_user + self.grid_size = grid_size + self.max_concurrent = max_concurrent + self.batch_size = batch_size + + oereb_connector = OerebWfsConnector() + self.connector = SwissTopoMapServerConnector(oereb_connector=oereb_connector) + self.realEstateInterface = getRealEstateInterface(current_user) + + # Track unique parcels by EGRID or label + self.unique_parcels: Dict[str, Dict[str, Any]] = {} + self.stats = ScrapingStats() + + # Cache for Gemeinde and Kanton UUIDs to avoid repeated database queries + # Key: bfs_nummer (int or str), Value: UUID (str) + self.gemeinde_cache: Dict[str, str] = {} + # Key: kanton abbreviation (str, e.g., "ZH"), Value: UUID (str) + self.kanton_cache: Dict[str, str] = {} + + def _generate_grid_points(self) -> List[tuple]: + """ + Generate grid points covering Kanton Zürich. + + Returns: + List of (x, y) coordinate tuples in LV95 + """ + bounds = self.ZURICH_BOUNDS + min_x = bounds["min_x"] + max_x = bounds["max_x"] + min_y = bounds["min_y"] + max_y = bounds["max_y"] + + grid_points = [] + x = min_x + while x <= max_x: + y = min_y + while y <= max_y: + grid_points.append((x, y)) + y += self.grid_size + x += self.grid_size + + logger.info(f"Generated {len(grid_points)} grid points covering Kanton Zürich") + return grid_points + + async def _query_parcel_at_point( + self, + x: float, + y: float, + semaphore: asyncio.Semaphore + ) -> Optional[Dict[str, Any]]: + """ + Query parcel at a specific coordinate point. 
+ + Args: + x: X coordinate (LV95) + y: Y coordinate (LV95) + semaphore: Semaphore for concurrency control + + Returns: + Parcel data dictionary or None if not found + """ + async with semaphore: + try: + self.stats.total_queries += 1 + location_str = f"{x},{y}" + + parcel_data = await self.connector.search_parcel(location_str, tolerance=5) + + if parcel_data: + self.stats.successful_queries += 1 + return parcel_data + else: + self.stats.failed_queries += 1 + return None + + except Exception as e: + self.stats.failed_queries += 1 + error_msg = f"Error querying parcel at ({x}, {y}): {str(e)}" + logger.debug(error_msg) + self.stats.errors.append(error_msg) + return None + + def _find_gemeinde_by_bfs_nummer(self, bfs_nummer: str) -> Optional[Gemeinde]: + """ + Find existing Gemeinde by BFS number (stored in kontextInformationen). + + Args: + bfs_nummer: BFS municipality number + + Returns: + Gemeinde instance if found, None otherwise + """ + try: + # Check cache first + if bfs_nummer in self.gemeinde_cache: + gemeinde_id = self.gemeinde_cache[bfs_nummer] + gemeinde = self.realEstateInterface.getGemeinde(gemeinde_id) + if gemeinde: + return gemeinde + + # Search all Gemeinden and check kontextInformationen + gemeinden = self.realEstateInterface.getGemeinden( + recordFilter={"mandateId": self.current_user.mandateId} + ) + + for gemeinde in gemeinden: + # Check kontextInformationen for bfs_nummer + for kontext in gemeinde.kontextInformationen: + try: + kontext_data = json.loads(kontext.inhalt) if isinstance(kontext.inhalt, str) else kontext.inhalt + if isinstance(kontext_data, dict): + if str(kontext_data.get("bfs_nummer")) == str(bfs_nummer): + # Cache the result + self.gemeinde_cache[bfs_nummer] = gemeinde.id + return gemeinde + except (json.JSONDecodeError, AttributeError): + continue + + return None + except Exception as e: + logger.error(f"Error finding Gemeinde by BFS number {bfs_nummer}: {e}", exc_info=True) + return None + + def _find_kanton_by_abbreviation(self, abk: str) -> Optional[Kanton]: + """ + Find existing Kanton by abbreviation. + + Args: + abk: Canton abbreviation (e.g., "BE", "ZH") + + Returns: + Kanton instance if found, None otherwise + """ + try: + # Check cache first + if abk in self.kanton_cache: + kanton_id = self.kanton_cache[abk] + kanton = self.realEstateInterface.getKanton(kanton_id) + if kanton: + return kanton + + # Search by abbreviation + kantone = self.realEstateInterface.getKantone( + recordFilter={ + "mandateId": self.current_user.mandateId, + "abk": abk + } + ) + + if kantone: + kanton = kantone[0] + # Cache the result + self.kanton_cache[abk] = kanton.id + return kanton + + return None + except Exception as e: + logger.error(f"Error finding Kanton by abbreviation {abk}: {e}", exc_info=True) + return None + + def _get_or_create_kanton(self, kanton_abk: str) -> Optional[str]: + """ + Get or create a Kanton by abbreviation. 
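+
+        Example (illustrative): _get_or_create_kanton("ZH") returns the UUID of
+        the existing "Zürich" record if present, creates it otherwise, and
+        caches the result so repeated calls skip the database lookup.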
+ + Args: + kanton_abk: Canton abbreviation (e.g., "BE", "ZH") + + Returns: + UUID of the Kanton, or None if creation failed + """ + if not kanton_abk: + return None + + # Check if exists + existing_kanton = self._find_kanton_by_abbreviation(kanton_abk) + if existing_kanton: + return existing_kanton.id + + # Create new Kanton + try: + # Map common abbreviations to full names (fallback to abbreviation if not found) + kanton_names = { + "AG": "Aargau", "AI": "Appenzell Innerrhoden", "AR": "Appenzell Ausserrhoden", + "BE": "Bern", "BL": "Basel-Landschaft", "BS": "Basel-Stadt", + "FR": "Freiburg", "GE": "Genf", "GL": "Glarus", "GR": "Graubünden", + "JU": "Jura", "LU": "Luzern", "NE": "Neuenburg", "NW": "Nidwalden", + "OW": "Obwalden", "SG": "St. Gallen", "SH": "Schaffhausen", "SO": "Solothurn", + "SZ": "Schwyz", "TG": "Thurgau", "TI": "Tessin", "UR": "Uri", + "VD": "Waadt", "VS": "Wallis", "ZG": "Zug", "ZH": "Zürich" + } + + kanton_label = kanton_names.get(kanton_abk, kanton_abk) + + kanton = Kanton( + mandateId=self.current_user.mandateId, + label=kanton_label, + abk=kanton_abk + ) + + created_kanton = self.realEstateInterface.createKanton(kanton) + if created_kanton and created_kanton.id: + # Cache the result + self.kanton_cache[kanton_abk] = created_kanton.id + logger.info(f"Created new Kanton: {kanton_label} ({kanton_abk})") + return created_kanton.id + except Exception as e: + logger.error(f"Error creating Kanton {kanton_abk}: {e}", exc_info=True) + + return None + + def _get_or_create_gemeinde( + self, + gemeinde_name: str, + bfs_nummer: str, + kanton_abk: str + ) -> Optional[str]: + """ + Get or create a Gemeinde by BFS number. + + Args: + gemeinde_name: Municipality name + bfs_nummer: BFS municipality number + kanton_abk: Canton abbreviation + + Returns: + UUID of the Gemeinde, or None if creation failed + """ + if not gemeinde_name or not bfs_nummer: + return None + + # Check if exists + existing_gemeinde = self._find_gemeinde_by_bfs_nummer(bfs_nummer) + if existing_gemeinde: + return existing_gemeinde.id + + # Get or create Kanton first + kanton_id = self._get_or_create_kanton(kanton_abk) + + # Create new Gemeinde + try: + gemeinde = Gemeinde( + mandateId=self.current_user.mandateId, + label=gemeinde_name, + id_kanton=kanton_id, + kontextInformationen=[ + Kontext( + thema="BFS Nummer", + inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False) + ) + ] + ) + + created_gemeinde = self.realEstateInterface.createGemeinde(gemeinde) + if created_gemeinde and created_gemeinde.id: + # Cache the result + self.gemeinde_cache[bfs_nummer] = created_gemeinde.id + logger.info(f"Created new Gemeinde: {gemeinde_name} (BFS: {bfs_nummer})") + return created_gemeinde.id + except Exception as e: + logger.error(f"Error creating Gemeinde {gemeinde_name} (BFS: {bfs_nummer}): {e}", exc_info=True) + + return None + + def _extract_parcel_identifier(self, parcel_data: Dict[str, Any]) -> Optional[str]: + """ + Extract unique identifier for a parcel. 
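+
+        Example (illustrative EGRID value):
+            self._extract_parcel_identifier({"attributes": {"egris_egrid": "CH123456789012"}})
+            # -> "egrid:CH123456789012"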
+ + Args: + parcel_data: Parcel data from Swiss Topo + + Returns: + Unique identifier (EGRID or label) or None + """ + attributes = parcel_data.get("attributes", {}) + + # Prefer EGRID as it's globally unique + egrid = attributes.get("egris_egrid") + if egrid: + return f"egrid:{egrid}" + + # Fallback to label + municipality code + label = attributes.get("label") or attributes.get("number") + bfsnr = attributes.get("bfsnr") + if label and bfsnr: + return f"label:{bfsnr}:{label}" + + # Last resort: just label + if label: + return f"label:{label}" + + return None + + def _convert_to_parzelle_model( + self, + parcel_data: Dict[str, Any], + parcel_id: str, + gemeinde_id: Optional[str] = None + ) -> Optional[Parzelle]: + """ + Convert Swiss Topo parcel data to Parzelle model. + + Args: + parcel_data: Raw parcel data from Swiss Topo + parcel_id: Unique parcel identifier + gemeinde_id: UUID of the Gemeinde (if already resolved) + + Returns: + Parzelle model instance or None if conversion fails + """ + try: + attributes = parcel_data.get("attributes", {}) + geometry = parcel_data.get("geometry", {}) + + # Extract attributes + extracted_attrs = self.connector.extract_parcel_attributes(parcel_data) + + # Get geocoded address if available + geocoded_address = parcel_data.get('geocoded_address', {}) + + # Get bauzone - prefer ÖREB bauzone if available (for Zürich), otherwise use extracted_attrs + bauzone = parcel_data.get("oereb_bauzone") or extracted_attrs.get("bauzone") + + # Build Parzelle data + parzelle_data = { + "mandateId": self.current_user.mandateId, + "label": extracted_attrs.get("label") or attributes.get("number") or f"Parcel-{parcel_id}", + "parzellenAliasTags": [attributes.get("egris_egrid")] if attributes.get("egris_egrid") else [], + "strasseNr": geocoded_address.get("full_address") or extracted_attrs.get("strasseNr"), + "plz": geocoded_address.get("plz") or extracted_attrs.get("plz"), + "eigentuemerschaft": extracted_attrs.get("eigentuemerschaft"), + "bauzone": bauzone, + "perimeter": extracted_attrs.get("perimeter"), + "baulinie": None, + "kontextGemeinde": gemeinde_id, # Use UUID reference instead of name + "az": None, + "bz": None, + "vollgeschossZahl": None, + "anrechenbarDachgeschoss": None, + "anrechenbarUntergeschoss": None, + "gebaeudehoeheMax": None, + "regelnGrenzabstand": [], + "regelnMehrlaengenzuschlag": [], + "regelnMehrhoehenzuschlag": [], + "parzelleBebaut": None, + "parzelleErschlossen": None, + "parzelleHanglage": None, + "laermschutzzone": None, + "hochwasserschutzzone": None, + "grundwasserschutzzone": None, + "parzellenNachbarschaft": [], + "dokumente": [], + "kontextInformationen": [ + Kontext( + thema="Swiss Topo Scraping", + inhalt=json.dumps({ + "egrid": attributes.get("egris_egrid"), + "identnd": attributes.get("identnd"), + "canton": attributes.get("ak"), + "municipality_code": attributes.get("bfsnr"), + "scraped_coordinates": parcel_data.get('query_coordinates', {}), + "source": "swiss_topo_scraping", + "bauzone_source": "oereb_wfs" if parcel_data.get("oereb_bauzone") else "swiss_topo" + }, ensure_ascii=False) + ) + ] + } + + # Create Parzelle instance + parzelle_instance = Parzelle(**parzelle_data) + return parzelle_instance + + except Exception as e: + logger.error(f"Error converting parcel {parcel_id} to Parzelle model: {e}", exc_info=True) + return None + + async def _save_parcels_batch( + self, + parcels: List[Dict[str, Any]] + ) -> int: + """ + Save a batch of parcels to the database. 
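+
+        Example (sketch; ``pending_batch`` is a hypothetical list of raw parcel dicts):
+            saved_count = await self._save_parcels_batch(pending_batch)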
+ + This method handles the sorting algorithm: + - For each parcel, extracts gemeinde_information + - Checks if Gemeinde exists (by bfs_nummer), creates if not + - Checks if Kanton exists (by abbreviation), creates if not + - Links Parzelle to Gemeinde via UUID reference + + Args: + parcels: List of parcel data dictionaries + + Returns: + Number of parcels successfully saved + """ + saved_count = 0 + + for parcel_data in parcels: + try: + parcel_id = self._extract_parcel_identifier(parcel_data) + if not parcel_id: + logger.warning("Could not extract parcel identifier, skipping") + self.stats.parcels_skipped += 1 + continue + + # Check if parcel already exists in our unique set + if parcel_id in self.unique_parcels: + logger.debug(f"Parcel {parcel_id} already found, skipping duplicate") + self.stats.parcels_skipped += 1 + continue + + # Extract gemeinde_information from parcel_data + # The connector returns it as 'gemeinde_info' (not 'gemeinde_information') + gemeinde_info = parcel_data.get('gemeinde_info') or parcel_data.get('gemeinde_information') + gemeinde_id = None + kanton_abk = None + + if gemeinde_info: + gemeinde_name = gemeinde_info.get('name') + bfs_nummer = gemeinde_info.get('bfs_nummer') + kanton_abk = gemeinde_info.get('kanton') + + # Skip parcels not from Zürich (safety filter) + if kanton_abk and kanton_abk.upper() != "ZH": + logger.debug(f"Skipping parcel {parcel_id} from canton {kanton_abk} (only Zürich parcels are processed)") + self.stats.parcels_skipped += 1 + continue + + if gemeinde_name and bfs_nummer: + # Get or create Gemeinde (this also handles Kanton creation) + gemeinde_id = self._get_or_create_gemeinde( + gemeinde_name=gemeinde_name, + bfs_nummer=str(bfs_nummer), + kanton_abk=kanton_abk + ) + + if not gemeinde_id: + logger.warning( + f"Could not get or create Gemeinde for parcel {parcel_id}: " + f"name={gemeinde_name}, bfs_nummer={bfs_nummer}" + ) + else: + logger.debug( + f"Missing Gemeinde info for parcel {parcel_id}: " + f"name={gemeinde_name}, bfs_nummer={bfs_nummer}" + ) + else: + logger.debug(f"No gemeinde_info found in parcel_data for {parcel_id}") + # Skip parcels without gemeinde_info (likely not from Zürich) + self.stats.parcels_skipped += 1 + continue + + # Query ÖREB WFS for bauzone (all parcels are from Zürich) + if kanton_abk and kanton_abk.upper() == "ZH": + try: + attributes = parcel_data.get("attributes", {}) + geometry = parcel_data.get("geometry", {}) + egrid = attributes.get("egris_egrid", "") + + # Get coordinates for query (use centroid or first point if available) + x = None + y = None + if geometry: + if "rings" in geometry and geometry["rings"]: + # Use first point of first ring + first_ring = geometry["rings"][0] + if first_ring and len(first_ring) > 0: + x = first_ring[0][0] + y = first_ring[0][1] + elif "coordinates" in geometry: + # Try to extract coordinates from GeoJSON format + coords = geometry.get("coordinates", []) + if coords and len(coords) > 0: + # Handle nested coordinate arrays + def get_first_coord(coord_list, depth=0): + if depth < 3 and isinstance(coord_list, list) and len(coord_list) > 0: + if isinstance(coord_list[0], (int, float)): + return coord_list + return get_first_coord(coord_list[0], depth + 1) + return None + first_coord = get_first_coord(coords) + if first_coord and len(first_coord) >= 2: + x = first_coord[0] + y = first_coord[1] + + if geometry and self.connector.oereb_connector: + logger.debug(f"Querying ÖREB WFS for bauzone for parcel {parcel_id}") + zone_results = await 
self.connector.oereb_connector.query_zone_layer( + egrid=egrid, + x=x or 0.0, + y=y or 0.0, + canton="ZH", + geometry=geometry + ) + + if zone_results and len(zone_results) > 0: + # Extract typ_gde_abkuerzung from the first result + zone_attrs = zone_results[0].get("attributes", {}) + oereb_bauzone = zone_attrs.get("typ_gde_abkuerzung") + if oereb_bauzone: + # Add bauzone to parcel_data so it can be used in _convert_to_parzelle_model + parcel_data["oereb_bauzone"] = oereb_bauzone + logger.debug(f"Found ÖREB bauzone '{oereb_bauzone}' for parcel {parcel_id}") + else: + logger.debug(f"No typ_gde_abkuerzung found in ÖREB response for parcel {parcel_id}") + else: + logger.debug(f"No zone results from ÖREB WFS for parcel {parcel_id}") + else: + logger.debug(f"Cannot query ÖREB WFS for parcel {parcel_id}: missing geometry or connector") + except Exception as e: + logger.warning(f"Error querying ÖREB WFS for bauzone for parcel {parcel_id}: {e}", exc_info=True) + # Continue without ÖREB bauzone - will use default from extracted_attrs + else: + # This should not happen since we filter for ZH above, but log if it does + logger.warning(f"Parcel {parcel_id} is not from Zürich (kanton: {kanton_abk}), skipping ÖREB query") + + # Convert to Parzelle model (with Gemeinde UUID reference) + parzelle_instance = self._convert_to_parzelle_model( + parcel_data, + parcel_id, + gemeinde_id=gemeinde_id + ) + if not parzelle_instance: + logger.warning(f"Could not convert parcel {parcel_id} to model, skipping") + self.stats.parcels_skipped += 1 + continue + + # Check if parcel already exists in database (by label) + # Note: We rely on in-memory deduplication by EGRID/label for uniqueness + # Database check is mainly to avoid re-saving parcels from previous runs + existing_parcels = self.realEstateInterface.getParzellen( + recordFilter={ + "mandateId": self.current_user.mandateId, + "label": parzelle_instance.label + } + ) + + if existing_parcels: + logger.debug(f"Parcel {parzelle_instance.label} already exists in database, skipping") + self.unique_parcels[parcel_id] = parcel_data # Mark as seen + self.stats.parcels_skipped += 1 + continue + + # Save to database + created_parzelle = self.realEstateInterface.createParzelle(parzelle_instance) + + if created_parzelle and created_parzelle.id: + self.unique_parcels[parcel_id] = parcel_data + saved_count += 1 + self.stats.parcels_saved += 1 + logger.debug( + f"Saved parcel {created_parzelle.label} (ID: {created_parzelle.id}) " + f"linked to Gemeinde {gemeinde_id if gemeinde_id else 'None'}" + ) + else: + logger.warning(f"Failed to save parcel {parzelle_instance.label}") + self.stats.parcels_skipped += 1 + + except Exception as e: + error_msg = f"Error saving parcel: {str(e)}" + logger.error(error_msg, exc_info=True) + self.stats.errors.append(error_msg) + self.stats.parcels_skipped += 1 + + return saved_count + + async def scrape( + self, + grid_points: Optional[List[tuple]] = None, + progress_callback: Optional[callable] = None + ) -> ScrapingStats: + """ + Scrape Kanton Zürich for parcel data. + + Args: + grid_points: Optional list of (x, y) coordinates to query. + If None, generates grid automatically for Zürich. 
+ progress_callback: Optional callback function(status_dict) called periodically + + Returns: + ScrapingStats object with scraping statistics + """ + logger.info("Starting Swiss Topo scraping operation for Kanton Zürich") + + # Generate grid points if not provided + if grid_points is None: + grid_points = self._generate_grid_points() + + logger.info(f"Scraping {len(grid_points)} grid points in Kanton Zürich") + + # Create semaphore for concurrency control + semaphore = asyncio.Semaphore(self.max_concurrent) + + # Query all grid points concurrently + tasks = [ + self._query_parcel_at_point(x, y, semaphore) + for x, y in grid_points + ] + + # Process results in batches + batch = [] + processed = 0 + + for coro in asyncio.as_completed(tasks): + try: + parcel_data = await coro + processed += 1 + + if parcel_data: + batch.append(parcel_data) + + # Save batch when it reaches batch_size + if len(batch) >= self.batch_size: + await self._save_parcels_batch(batch) + batch = [] + + # Update stats + self.stats.unique_parcels_found = len(self.unique_parcels) + + # Call progress callback if provided + if progress_callback: + progress_callback({ + "processed": processed, + "total": len(grid_points), + "unique_parcels": self.stats.unique_parcels_found, + "saved": self.stats.parcels_saved, + "skipped": self.stats.parcels_skipped + }) + + # Log progress periodically + if processed % 100 == 0: + logger.info( + f"Progress: {processed}/{len(grid_points)} queries completed, " + f"{self.stats.unique_parcels_found} unique parcels found, " + f"{self.stats.parcels_saved} saved" + ) + + except Exception as e: + logger.error(f"Error processing query result: {e}", exc_info=True) + self.stats.errors.append(str(e)) + + # Save remaining batch + if batch: + await self._save_parcels_batch(batch) + + # Final stats update + self.stats.unique_parcels_found = len(self.unique_parcels) + + logger.info( + f"Scraping completed: {self.stats.unique_parcels_found} unique parcels found, " + f"{self.stats.parcels_saved} saved, {self.stats.parcels_skipped} skipped" + ) + + return self.stats + + +async def scrape_switzerland( + current_user: User, + grid_size: float = 500.0, + max_concurrent: int = 50, + batch_size: int = 100, + grid_points: Optional[List[tuple]] = None +) -> Dict[str, Any]: + """ + Main function to scrape Kanton Zürich for parcel data. + + Note: This function now only scrapes parcels from Kanton Zürich. + All parcels are queried for ÖREB bauzone information. 
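+
+    Example (sketch; ``user`` stands in for an authenticated User instance):
+        result = await scrape_switzerland(current_user=user, grid_size=1000.0)
+        print(result["stats"]["parcels_saved"])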
+ + Args: + current_user: User for database operations + grid_size: Size of grid cells in meters (default: 500m) + max_concurrent: Maximum concurrent API requests (default: 50) + batch_size: Number of parcels to process before saving (default: 100) + grid_points: Optional list of (x, y) coordinates to query (must be within Zürich bounds) + + Returns: + Dictionary with scraping statistics and results + """ + scraper = SwissTopoScraper( + current_user=current_user, + grid_size=grid_size, + max_concurrent=max_concurrent, + batch_size=batch_size + ) + + stats = await scraper.scrape(grid_points=grid_points) + + return { + "success": True, + "stats": { + "total_queries": stats.total_queries, + "successful_queries": stats.successful_queries, + "failed_queries": stats.failed_queries, + "unique_parcels_found": stats.unique_parcels_found, + "parcels_saved": stats.parcels_saved, + "parcels_skipped": stats.parcels_skipped, + "error_count": len(stats.errors), + "errors": stats.errors[:10] # Return first 10 errors + } + } diff --git a/modules/routes/routeRealEstateScraping.py b/modules/routes/routeRealEstateScraping.py new file mode 100644 index 00000000..4b8d2d0d --- /dev/null +++ b/modules/routes/routeRealEstateScraping.py @@ -0,0 +1,879 @@ +""" +Real Estate scraping routes for the backend API. +Implements endpoints for scraping real estate data from external sources. +""" + +import logging +import json +import aiohttp +import asyncio +from typing import Optional, Dict, Any +from fastapi import APIRouter, HTTPException, Depends, Body, Request, Query, status + +# Import auth modules +from modules.auth import limiter, getCurrentUser + +# Import models +from modules.datamodels.datamodelUam import User +from modules.datamodels.datamodelRealEstate import ( + Gemeinde, + Kanton, + Dokument, + Kontext, + DokumentTyp, +) + +# Import interfaces +from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface +from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface + +# Import scraping script +from modules.features.realEstate.scrapeSwissTopo import scrape_switzerland + +# Import Swiss Topo MapServer connector +from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector +from modules.connectors.connectorOerebWfs import OerebWfsConnector + +# Import Tavily connector for BZO document search +from modules.aicore.aicorePluginTavily import AiTavily + +# Configure logger +logger = logging.getLogger(__name__) + +# Create router for real estate scraping endpoints +router = APIRouter( + prefix="/api/realestate", + tags=["Real Estate Scraping"], + responses={ + 404: {"description": "Not found"}, + 400: {"description": "Bad request"}, + 401: {"description": "Unauthorized"}, + 403: {"description": "Forbidden"}, + 500: {"description": "Internal server error"} + } +) + + +@router.post("/scrape-switzerland", response_model=Dict[str, Any]) +@limiter.limit("5/hour") # Limit to 5 requests per hour (scraping is resource-intensive) +async def scrape_switzerland_route( + request: Request, + body: Dict[str, Any] = Body(..., description="Scraping parameters"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Scrape Kanton Zürich systematically using Swiss Topo connector and save parcel data to database. + + This endpoint divides Kanton Zürich into a grid and queries parcels at each grid point, + then deduplicates and saves unique parcels to the database. 
For each parcel, it also + queries the ÖREB WFS service to retrieve bauzone information. + + **WARNING**: This is a resource-intensive operation that may take a long time + and make many API requests. Use with caution. + + Request Body: + { + "grid_size": 500.0, // Grid cell size in meters (default: 500m) + "max_concurrent": 50, // Maximum concurrent API requests (default: 50) + "batch_size": 100 // Number of parcels to process before saving (default: 100) + } + + Headers: + - X-CSRF-Token: CSRF token (required for security) + + Returns: + { + "success": true, + "stats": { + "total_queries": 1234, + "successful_queries": 1200, + "failed_queries": 34, + "unique_parcels_found": 500, + "parcels_saved": 450, + "parcels_skipped": 50, + "error_count": 5, + "errors": [...] + } + } + + Example: + - POST /api/realestate/scrape-switzerland + Body: {"grid_size": 1000.0, "max_concurrent": 5, "batch_size": 50} + """ + try: + # Validate CSRF token + csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token") + if not csrf_token: + logger.warning(f"CSRF token missing for POST /api/realestate/scrape-switzerland from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="CSRF token missing. Please include X-CSRF-Token header." + ) + + # Basic CSRF token format validation + if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64: + logger.warning(f"Invalid CSRF token format for POST /api/realestate/scrape-switzerland from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid CSRF token format" + ) + + # Validate token is hex string + try: + int(csrf_token, 16) + except ValueError: + logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/scrape-switzerland from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid CSRF token format" + ) + + # Extract parameters from body with defaults + grid_size = body.get("grid_size", 500.0) + max_concurrent = body.get("max_concurrent", 50) + batch_size = body.get("batch_size", 100) + + # Validate parameters + if grid_size <= 0 or grid_size > 10000: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="grid_size must be between 0 and 10000 meters" + ) + + if max_concurrent <= 0 or max_concurrent > 200: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="max_concurrent must be between 1 and 200" + ) + + if batch_size <= 0 or batch_size > 1000: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="batch_size must be between 1 and 1000" + ) + + logger.info( + f"Starting Switzerland scraping for user {currentUser.id} (mandate: {currentUser.mandateId}) " + f"with grid_size={grid_size}, max_concurrent={max_concurrent}, batch_size={batch_size}" + ) + + # Run scraping operation + result = await scrape_switzerland( + current_user=currentUser, + grid_size=grid_size, + max_concurrent=max_concurrent, + batch_size=batch_size + ) + + logger.info( + f"Scraping completed for user {currentUser.id}: " + f"{result['stats']['parcels_saved']} parcels saved" + ) + + return result + + except HTTPException: + raise + except ValueError as e: + logger.error(f"Validation error in scrape_switzerland_route: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Validation error: {str(e)}" + ) + except Exception as e: + logger.error(f"Error scraping Switzerland: 
{str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error scraping Switzerland: {str(e)}" + ) + + +@router.get("/gemeinden", response_model=Dict[str, Any]) +@limiter.limit("60/minute") +async def get_all_gemeinden( + request: Request, + only_current: bool = Query(True, description="Only return current municipalities (exclude historical)"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Fetch all Gemeinden (municipalities) from the Swiss Topo MapServer connector + and save them to the database. + + This endpoint: + 1. Fetches all Swiss municipalities from the Swiss Federal Office of Topography + 2. Saves them to the database (skipping duplicates based on BFS number) + 3. Creates Kantone (cantons) as needed + 4. Returns statistics about the import operation + + Query Parameters: + - only_current: If True, only return current municipalities (default: True). + If False, return all municipalities including historical ones. + + Headers: + - X-CSRF-Token: CSRF token (required for security) + + Returns: + { + "gemeinden": [ + { + "id": "uuid", + "mandateId": "uuid", + "label": "Bern", + "id_kanton": "uuid", + "kontextInformationen": [...], + ... + }, + ... + ], + "count": 2162, + "stats": { + "gemeinden_created": 2100, + "gemeinden_skipped": 62, + "kantone_created": 26, + "error_count": 0, + "errors": [] + } + } + + Example: + - GET /api/realestate/gemeinden + - GET /api/realestate/gemeinden?only_current=false + """ + try: + # Validate CSRF token + csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token") + if not csrf_token: + logger.warning(f"CSRF token missing for GET /api/realestate/gemeinden from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="CSRF token missing. Please include X-CSRF-Token header." 
+ ) + + # Basic CSRF token format validation + if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64: + logger.warning(f"Invalid CSRF token format for GET /api/realestate/gemeinden from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid CSRF token format" + ) + + # Validate token is hex string + try: + int(csrf_token, 16) + except ValueError: + logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/gemeinden from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid CSRF token format" + ) + + logger.info(f"Fetching all Gemeinden for user {currentUser.id} (mandate: {currentUser.mandateId}), only_current={only_current}") + + # Initialize connectors and fetch all gemeinden + oereb_connector = OerebWfsConnector() + connector = SwissTopoMapServerConnector(oereb_connector=oereb_connector) + gemeinden_data = await connector.get_all_gemeinden(only_current=only_current) + + # Get interface for database operations + realEstateInterface = getRealEstateInterface(currentUser) + + # Statistics + gemeinden_created = 0 + gemeinden_skipped = 0 + kantone_created = 0 + errors = [] + + # Cache for Kanton UUIDs + kanton_cache: Dict[str, str] = {} + + # Helper function to find Gemeinde by BFS number + def find_gemeinde_by_bfs_nummer(bfs_nummer: str) -> Optional[Gemeinde]: + """Find existing Gemeinde by BFS number (stored in kontextInformationen).""" + try: + gemeinden = realEstateInterface.getGemeinden( + recordFilter={"mandateId": currentUser.mandateId} + ) + + for gemeinde in gemeinden: + # Check kontextInformationen for bfs_nummer + for kontext in gemeinde.kontextInformationen: + try: + kontext_data = json.loads(kontext.inhalt) if isinstance(kontext.inhalt, str) else kontext.inhalt + if isinstance(kontext_data, dict): + if str(kontext_data.get("bfs_nummer")) == str(bfs_nummer): + return gemeinde + except (json.JSONDecodeError, AttributeError): + continue + + return None + except Exception as e: + logger.error(f"Error finding Gemeinde by BFS number {bfs_nummer}: {e}", exc_info=True) + return None + + # Helper function to get or create Kanton + def get_or_create_kanton(kanton_abk: str) -> Optional[str]: + """Get or create a Kanton by abbreviation.""" + nonlocal kantone_created, errors + + if not kanton_abk: + return None + + # Check cache first + if kanton_abk in kanton_cache: + return kanton_cache[kanton_abk] + + # Check if exists + kantone = realEstateInterface.getKantone( + recordFilter={ + "mandateId": currentUser.mandateId, + "abk": kanton_abk + } + ) + + if kantone: + kanton_cache[kanton_abk] = kantone[0].id + return kantone[0].id + + # Create new Kanton + try: + # Map common abbreviations to full names + kanton_names = { + "AG": "Aargau", "AI": "Appenzell Innerrhoden", "AR": "Appenzell Ausserrhoden", + "BE": "Bern", "BL": "Basel-Landschaft", "BS": "Basel-Stadt", + "FR": "Freiburg", "GE": "Genf", "GL": "Glarus", "GR": "Graubünden", + "JU": "Jura", "LU": "Luzern", "NE": "Neuenburg", "NW": "Nidwalden", + "OW": "Obwalden", "SG": "St. 
Gallen", "SH": "Schaffhausen", "SO": "Solothurn", + "SZ": "Schwyz", "TG": "Thurgau", "TI": "Tessin", "UR": "Uri", + "VD": "Waadt", "VS": "Wallis", "ZG": "Zug", "ZH": "Zürich" + } + + kanton_label = kanton_names.get(kanton_abk, kanton_abk) + + kanton = Kanton( + mandateId=currentUser.mandateId, + label=kanton_label, + abk=kanton_abk + ) + + created_kanton = realEstateInterface.createKanton(kanton) + if created_kanton and created_kanton.id: + kanton_cache[kanton_abk] = created_kanton.id + kantone_created += 1 + logger.info(f"Created new Kanton: {kanton_label} ({kanton_abk})") + return created_kanton.id + except Exception as e: + error_msg = f"Error creating Kanton {kanton_abk}: {e}" + logger.error(error_msg, exc_info=True) + errors.append(error_msg) + + return None + + # Process each gemeinde and save to database + saved_gemeinden = [] + for gemeinde_data in gemeinden_data: + try: + gemeinde_name = gemeinde_data.get("name") + bfs_nummer = gemeinde_data.get("bfs_nummer") + kanton_abk = gemeinde_data.get("kanton") + + if not gemeinde_name or not bfs_nummer: + logger.warning(f"Skipping Gemeinde with missing data: {gemeinde_data}") + gemeinden_skipped += 1 + continue + + # Check if Gemeinde already exists + existing_gemeinde = find_gemeinde_by_bfs_nummer(str(bfs_nummer)) + if existing_gemeinde: + logger.debug(f"Gemeinde {gemeinde_name} (BFS: {bfs_nummer}) already exists, skipping") + gemeinden_skipped += 1 + saved_gemeinden.append(existing_gemeinde.model_dump() if hasattr(existing_gemeinde, 'model_dump') else existing_gemeinde) + continue + + # Get or create Kanton + kanton_id = get_or_create_kanton(kanton_abk) if kanton_abk else None + + # Create new Gemeinde + gemeinde = Gemeinde( + mandateId=currentUser.mandateId, + label=gemeinde_name, + id_kanton=kanton_id, + kontextInformationen=[ + Kontext( + thema="BFS Nummer", + inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False) + ) + ] + ) + + created_gemeinde = realEstateInterface.createGemeinde(gemeinde) + if created_gemeinde and created_gemeinde.id: + gemeinden_created += 1 + logger.info(f"Created new Gemeinde: {gemeinde_name} (BFS: {bfs_nummer})") + saved_gemeinden.append(created_gemeinde.model_dump() if hasattr(created_gemeinde, 'model_dump') else created_gemeinde) + else: + error_msg = f"Failed to create Gemeinde {gemeinde_name} (BFS: {bfs_nummer})" + logger.error(error_msg) + errors.append(error_msg) + gemeinden_skipped += 1 + + except Exception as e: + error_msg = f"Error processing Gemeinde {gemeinde_data.get('name', 'Unknown')}: {str(e)}" + logger.error(error_msg, exc_info=True) + errors.append(error_msg) + gemeinden_skipped += 1 + + logger.info( + f"Gemeinden import completed: {gemeinden_created} created, " + f"{gemeinden_skipped} skipped, {kantone_created} Kantone created" + ) + + return { + "gemeinden": saved_gemeinden, + "count": len(saved_gemeinden), + "stats": { + "gemeinden_created": gemeinden_created, + "gemeinden_skipped": gemeinden_skipped, + "kantone_created": kantone_created, + "error_count": len(errors), + "errors": errors[:10] # Return first 10 errors + } + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error fetching all Gemeinden: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error fetching Gemeinden: {str(e)}" + ) + + +def _get_language_from_kanton(kanton_abk: Optional[str]) -> str: + """ + Determine language (German/French/Italian) based on Kanton abbreviation. 
+ + Args: + kanton_abk: Kanton abbreviation (e.g., 'ZH', 'VD', 'TI') + + Returns: + Language code: 'de' (German), 'fr' (French), or 'it' (Italian) + """ + if not kanton_abk: + return 'de' # Default to German + + # French-speaking cantons + french_cantons = {'VD', 'GE', 'NE', 'JU'} + # Italian-speaking canton + italian_cantons = {'TI'} + + kanton_upper = kanton_abk.upper() + if kanton_upper in french_cantons: + return 'fr' + elif kanton_upper in italian_cantons: + return 'it' + else: + return 'de' # Default to German + + +def _get_bzo_search_query(gemeinde_label: str, language: str) -> str: + """ + Generate language-specific BZO search query for a Gemeinde. + + Args: + gemeinde_label: Name of the Gemeinde + language: Language code ('de', 'fr', 'it') + + Returns: + Search query string + """ + if language == 'fr': + # French: Plan d'aménagement local or Règlement de construction + return f"Plan d'aménagement local {gemeinde_label} OR Règlement de construction {gemeinde_label}" + elif language == 'it': + # Italian: Piano di utilizzazione or Regolamento edilizio + return f"Piano di utilizzazione {gemeinde_label} OR Regolamento edilizio {gemeinde_label}" + else: + # German: Bau und Zonenordnung + return f"Bau und Zonenordnung {gemeinde_label}" + + +@router.post("/gemeinden/fetch-bzo-documents", response_model=Dict[str, Any]) +@limiter.limit("10/hour") # Resource-intensive operation +async def fetch_bzo_documents( + request: Request, + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Search for and download Bau und Zonenordnung (BZO) documents for all Gemeinden. + + This endpoint: + 1. Fetches all Gemeinden from the database + 2. For each Gemeinde, determines language based on Kanton + 3. Uses Tavily search to find BZO documents (up to 5 results) + 4. Downloads all PDF files found and stores them with content + 5. Creates Dokument records for each PDF and links them to Gemeinde's dokumente field + 6. Skips Gemeinden that already have BZO documents + + Note: If Tavily returns multiple PDF results, all of them will be downloaded + and saved as separate Dokument records. + + Headers: + - X-CSRF-Token: CSRF token (required for security) + + Returns: + { + "success": true, + "stats": { + "gemeinden_processed": 100, + "documents_created": 85, + "documents_skipped": 15, + "errors": [] + }, + "results": [ + { + "gemeinde_id": "...", + "gemeinde_label": "Zürich", + "status": "created|skipped|error", + "dokument_ids": ["...", "..."], // List of created document IDs (can be multiple) + "error": null + } + ] + } + """ + try: + # Validate CSRF token + csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token") + if not csrf_token: + logger.warning(f"CSRF token missing for POST /api/realestate/gemeinden/fetch-bzo-documents from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="CSRF token missing. Please include X-CSRF-Token header." 
+ ) + + # Basic CSRF token format validation + if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64: + logger.warning(f"Invalid CSRF token format for POST /api/realestate/gemeinden/fetch-bzo-documents from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid CSRF token format" + ) + + # Validate token is hex string + try: + int(csrf_token, 16) + except ValueError: + logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/gemeinden/fetch-bzo-documents from user {currentUser.id}") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid CSRF token format" + ) + + logger.info(f"Starting BZO document fetch for user {currentUser.id} (mandate: {currentUser.mandateId})") + + # Get interfaces + realEstateInterface = getRealEstateInterface(currentUser) + componentInterface = getComponentInterface(currentUser) + + # Initialize Tavily connector + tavily = AiTavily() + + # Get all Gemeinden + gemeinden = realEstateInterface.getGemeinden( + recordFilter={"mandateId": currentUser.mandateId} + ) + + logger.info(f"Found {len(gemeinden)} Gemeinden to process") + + # Statistics + stats = { + "gemeinden_processed": 0, + "documents_created": 0, + "documents_skipped": 0, + "errors": [] + } + results = [] + + # Process each Gemeinde + for gemeinde in gemeinden: + gemeinde_result = { + "gemeinde_id": gemeinde.id, + "gemeinde_label": gemeinde.label, + "status": None, + "dokument_ids": [], # Changed to list to support multiple documents + "error": None + } + + try: + stats["gemeinden_processed"] += 1 + + # Check if Gemeinde already has a BZO document + existing_bzo = False + if gemeinde.dokumente: + for doc in gemeinde.dokumente: + # Check if it's a BZO document by label or dokumentTyp + if (doc.label and ("BZO" in doc.label.upper() or "BAU UND ZONENORDNUNG" in doc.label.upper() or + "PLAN D'AMÉNAGEMENT" in doc.label.upper() or "RÈGLEMENT DE CONSTRUCTION" in doc.label.upper() or + "PIANO DI UTILIZZAZIONE" in doc.label.upper() or "REGOLAMENTO EDILIZIO" in doc.label.upper())) or \ + (doc.dokumentTyp and doc.dokumentTyp in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]): + existing_bzo = True + break + + if existing_bzo: + logger.debug(f"Gemeinde {gemeinde.label} already has BZO document, skipping") + gemeinde_result["status"] = "skipped" + stats["documents_skipped"] += 1 + results.append(gemeinde_result) + continue + + # Get Kanton to determine language + kanton_abk = None + if gemeinde.id_kanton: + kanton = realEstateInterface.getKanton(gemeinde.id_kanton) + if kanton: + kanton_abk = kanton.abk + + # Determine language + language = _get_language_from_kanton(kanton_abk) + + # Generate search query + search_query = _get_bzo_search_query(gemeinde.label, language) + logger.info(f"Searching for BZO document for {gemeinde.label} (language: {language}) with query: {search_query}") + + # Search with Tavily using the private _search method + search_results = await tavily._search( + query=search_query, + maxResults=5, + country="switzerland" + ) + + if not search_results: + logger.warning(f"No search results found for {gemeinde.label}") + gemeinde_result["status"] = "error" + gemeinde_result["error"] = "No search results found" + stats["errors"].append(f"{gemeinde.label}: No search results found") + results.append(gemeinde_result) + continue + + # Find all PDF URLs from search results + pdf_urls = [] + for result in search_results: + url = result.url.lower() + if 
url.endswith('.pdf') or 'pdf' in url:
+                        pdf_urls.append(result.url)
+
+                # If no PDF URLs found, try to use all results (they might be PDFs even without .pdf extension)
+                if not pdf_urls:
+                    pdf_urls = [result.url for result in search_results]
+                    logger.info(f"No explicit PDF URLs found for {gemeinde.label}, trying all {len(pdf_urls)} results")
+
+                logger.info(f"Found {len(pdf_urls)} potential PDF documents for {gemeinde.label}")
+
+                # Helper function to download a single PDF
+                async def download_pdf(pdf_url: str) -> Optional[bytes]:
+                    """Download a PDF from a URL with retry logic."""
+                    max_retries = 3
+                    retry_delay = 2
+
+                    for attempt in range(max_retries):
+                        try:
+                            # Create headers - use minimal headers on retry after 406 error
+                            if attempt > 0:
+                                # Minimal headers for retry
+                                headers = {
+                                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
+                                    'Accept': '*/*'
+                                }
+                            else:
+                                # Full headers for first attempt
+                                headers = {
+                                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
+                                    'Accept': 'application/pdf,application/octet-stream,*/*',
+                                    'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
+                                    'Accept-Encoding': 'gzip, deflate, br',
+                                    'Connection': 'keep-alive',
+                                    'Upgrade-Insecure-Requests': '1'
+                                }
+
+                            timeout = aiohttp.ClientTimeout(total=30, connect=10)
+                            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
+                                async with session.get(pdf_url, allow_redirects=True) as response:
+                                    if response.status == 200:
+                                        pdf_content = await response.read()
+
+                                        if not pdf_content or len(pdf_content) < 100:  # Minimum size check
+                                            raise Exception("Downloaded file is too small or empty")
+
+                                        # Verify it's actually a PDF (check PDF magic bytes)
+                                        if not pdf_content.startswith(b'%PDF'):
+                                            # Check if it's HTML (common error page)
+                                            if pdf_content.startswith(b'<') or b'<html' in pdf_content[:500].lower():
+                                                raise Exception("Downloaded file is an HTML page, not a PDF")
+                                            raise Exception("Downloaded file is not a valid PDF")
+
+                                        return pdf_content
+
+                                    # Non-200 responses (e.g. 406) trigger a retry with minimal headers
+                                    raise Exception(f"HTTP status {response.status}")
+
+                        except Exception as e:
+                            if attempt < max_retries - 1:
+                                logger.debug(f"Download attempt {attempt + 1} for {pdf_url} failed: {e}, retrying")
+                                await asyncio.sleep(retry_delay)
+                                continue
+                            logger.warning(f"Failed to download PDF from {pdf_url} after {max_retries} attempts: {e}")
+
+                    return None
+
+                # Download each candidate PDF and create a Dokument record for it
+                created_dokumente = []
+                current_dokumente = list(gemeinde.dokumente) if gemeinde.dokumente else []
+                safe_name = "".join(c if c.isalnum() else "_" for c in gemeinde.label)
+                base_doc_label = f"BZO {gemeinde.label}"
+
+                for idx, pdf_url in enumerate(pdf_urls):
+                    try:
+                        pdf_content = await download_pdf(pdf_url)
+                        if not pdf_content:
+                            stats["errors"].append(f"{gemeinde.label}: Download failed for {pdf_url}")
+                            continue
+
+                        # Derive a unique file name and label per document
+                        if len(pdf_urls) > 1:
+                            file_name = f"BZO_{safe_name}_{idx + 1}.pdf"
+                            doc_label = f"{base_doc_label} ({idx + 1})"
+                        else:
+                            file_name = f"BZO_{safe_name}.pdf"
+                            doc_label = base_doc_label
+
+                        # Store file using ComponentObjects
+                        try:
+                            file_item = componentInterface.createFile(
+                                name=file_name,
+                                mimeType="application/pdf",
+                                content=pdf_content
+                            )
+
+                            # Store file data
+                            componentInterface.createFileData(file_item.id, pdf_content)
+
+                            logger.info(f"Stored file {file_name} with ID {file_item.id} for {gemeinde.label}")
+                        except Exception as e:
+                            logger.error(f"Error storing file {file_name} for {gemeinde.label}: {str(e)}", exc_info=True)
+                            stats["errors"].append(f"{gemeinde.label}: File storage failed for {pdf_url} - {str(e)}")
+                            continue
+
+                        # Create Dokument record
+                        dokument = Dokument(
+                            mandateId=currentUser.mandateId,
+                            label=doc_label,
+                            versionsbezeichnung="Aktuell",
+                            dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL,
+                            dokumentReferenz=file_item.id,  # FileId from ComponentObjects
+                            quelle=pdf_url,  # Original URL
+                            mimeType="application/pdf",
+                            kategorienTags=["BZO", "Bauordnung", gemeinde.label]
+                        )
+
+                        # Create Dokument record in the Dokument table
+                        created_dokument = realEstateInterface.createDokument(dokument)
+                        logger.info(f"Created Dokument record with ID {created_dokument.id} for {gemeinde.label} (from {pdf_url})")
+
+                        created_dokumente.append(created_dokument)
+                        current_dokumente.append(created_dokument)
+                        gemeinde_result["dokument_ids"].append(created_dokument.id)
+
+                    except Exception as e:
+                        logger.error(f"Error processing PDF {pdf_url} for {gemeinde.label}: {str(e)}", exc_info=True)
+                        stats["errors"].append(f"{gemeinde.label}: Error processing PDF {pdf_url} - {str(e)}")
+                        continue
+
+                # Update Gemeinde with all new dokumente
+                if created_dokumente:
+                    updated_gemeinde = realEstateInterface.updateGemeinde(
+                        gemeinde.id,
+                        {"dokumente": current_dokumente}
+                    )
+
+                    if updated_gemeinde:
+                        logger.info(f"Successfully created {len(created_dokumente)} BZO document(s) for {gemeinde.label}")
+                        gemeinde_result["status"] = "created"
+                        stats["documents_created"] += len(created_dokumente)
+                    else:
+                        raise Exception("Failed to update Gemeinde")
+                else:
+                    # No documents were successfully created
+                    gemeinde_result["status"] = "error"
+                    gemeinde_result["error"] = "No PDFs could be downloaded or processed"
+                    stats["errors"].append(f"{gemeinde.label}: No PDFs could be downloaded or processed")
+
+            except Exception as e:
+                logger.error(f"Error processing Gemeinde {gemeinde.label}: {str(e)}", exc_info=True)
+                gemeinde_result["status"] = "error"
+                gemeinde_result["error"] = str(e)
+                stats["errors"].append(f"{gemeinde.label}: {str(e)}")
+
+            results.append(gemeinde_result)
+
+        logger.info(
+            f"BZO document fetch completed: {stats['documents_created']} created, "
+            f"{stats['documents_skipped']} skipped, {len(stats['errors'])} errors"
+        )
+
+        return {
+            "success": True,
+            "stats": stats,
+            "results": results
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error fetching BZO documents: {str(e)}", exc_info=True)
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Error fetching BZO documents: {str(e)}"
+        )