diff --git a/env_dev.env b/env_dev.env index da6f931f..a195c856 100644 --- a/env_dev.env +++ b/env_dev.env @@ -75,3 +75,7 @@ PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switz MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss +# Zurich WFS Parcels (dynamic map layer). Default: Stadt Zürich OGD. Override for full canton if wfs.zh.ch resolves. +# Connector_ZhWfsParcels_WFS_URL = https://wfs.zh.ch/av +# Connector_ZhWfsParcels_TYPENAMES = av_li_liegenschaften_a + diff --git a/modules/connectors/connectorSwissTopoMapServer.py b/modules/connectors/connectorSwissTopoMapServer.py index f4ebb8ff..d7b7a91e 100644 --- a/modules/connectors/connectorSwissTopoMapServer.py +++ b/modules/connectors/connectorSwissTopoMapServer.py @@ -7,6 +7,7 @@ MapServer identify endpoint for parcel data retrieval. Endpoint: https://api3.geo.admin.ch/rest/services/api/MapServer/identify """ +import json import logging import asyncio import re @@ -29,7 +30,9 @@ class SwissTopoMapServerConnector: # API endpoints MAPSERVER_IDENTIFY_URL = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify" + MAPSERVER_FIND_URL = "https://api3.geo.admin.ch/rest/services/ech/MapServer/find" GEOCODING_URL = "https://api3.geo.admin.ch/rest/services/api/SearchServer" + LAYER_GEMEINDE = "ch.swisstopo.swissboundaries3d-gemeinde-flaeche.fill" # Swiss official survey layer LAYER_AMTLICHE_VERMESSUNG = "all:ch.swisstopo-vd.amtliche-vermessung" @@ -46,7 +49,8 @@ class SwissTopoMapServerConnector: self, timeout: int = 30, max_retries: int = 3, - retry_delay: float = 1.0 + retry_delay: float = 1.0, + oereb_connector: Optional[Any] = None, ): """ Initialize MapServer connector. 
@@ -55,10 +59,12 @@ class SwissTopoMapServerConnector: timeout: Request timeout in seconds max_retries: Maximum number of retry attempts retry_delay: Initial retry delay in seconds (exponential backoff) + oereb_connector: Optional OerebWfsConnector for zone queries (used by scraping) """ self.timeout = aiohttp.ClientTimeout(total=timeout) self.max_retries = max_retries self.retry_delay = retry_delay + self.oereb_connector = oereb_connector logger.info("Swiss Topo MapServer Connector initialized") @@ -128,6 +134,136 @@ class SwissTopoMapServerConnector: return municipality + async def get_gemeinde_by_name(self, gemeinde_name: str) -> Optional[Dict[str, Any]]: + """ + Fetch a single Gemeinde from Swiss Topo by name (e.g. "Zürich"). + Uses Find API with exact/contains search. Returns the best match. + + Returns: + Dict with keys: name, bfs_nummer, kanton, or None if not found + """ + if not gemeinde_name or not gemeinde_name.strip(): + return None + search_text = gemeinde_name.strip() + # Try exact match first (Zürich) + for q in [search_text, search_text.split("(")[0].strip()]: + try: + params = { + "layer": self.LAYER_GEMEINDE, + "searchText": q, + "searchField": "gemname", + "returnGeometry": "false", + "contains": "true", + } + data = await self._make_request(self.MAPSERVER_FIND_URL, params) + results = data.get("results", []) + if not results: + continue + # Pick best match: exact name first, then by highest jahr + target_lower = (gemeinde_name or "").strip().lower() + best = None + best_score = -1 + for feat in results: + attrs = feat.get("attributes", {}) + gemname = attrs.get("gemname") or attrs.get("label", "") + cleaned = self._clean_municipality_name(gemname) + gde_nr = attrs.get("gde_nr") + kanton = attrs.get("kanton") + jahr = attrs.get("jahr", 0) + objektart = attrs.get("objektart", attrs.get("objval")) + if objektart is not None and int(objektart) != 11: + continue + if not gde_nr or not kanton: + continue + # Score: exact match = 100, partial = 50, 
else by jahr + cleaned_lower = cleaned.strip().lower() + if cleaned_lower == target_lower: + score = 1000 + jahr + elif target_lower in cleaned_lower or cleaned_lower in target_lower: + score = 500 + jahr + else: + score = jahr + if score > best_score: + best_score = score + best = {"name": cleaned, "bfs_nummer": int(gde_nr), "kanton": str(kanton)} + if best: + logger.info(f"Found Gemeinde '{best['name']}' (BFS {best['bfs_nummer']}) for search '{gemeinde_name}'") + return best + except Exception as e: + logger.warning(f"Error fetching Gemeinde '{q}': {e}") + continue + return None + + async def get_all_gemeinden(self, only_current: bool = True) -> List[Dict[str, Any]]: + """ + Fetch all Swiss municipalities (Gemeinden) from the Swiss Topo MapServer. + + Uses the Find API to query the municipality layer. Iterates with search + strings to collect all municipalities, then deduplicates by BFS number. + + Note: layerDefs is not used - the MapServer Find API reports that + is_current_jahr/objektart are not queryable. Filtering is done client-side. + + Args: + only_current: If True, keep only the latest jahr per BFS number (current municipalities). 
+ + Returns: + List of dicts with keys: name, bfs_nummer, kanton + """ + # Search strings to achieve broad coverage (Swiss municipality names) + search_chars = [ + "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", + "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z", + "ä", "ö", "ü", "é", "è", "à", "â", "î", "ô", "ç", "-", " ", "'", + ] + + seen: Dict[Tuple[int, str], Dict[str, Any]] = {} + + for char in search_chars: + try: + params = { + "layer": self.LAYER_GEMEINDE, + "searchText": char, + "searchField": "gemname", + "returnGeometry": "false", + "contains": "true", + } + # Do not use layerDefs - is_current_jahr/objektart are not queryable on this layer + data = await self._make_request(self.MAPSERVER_FIND_URL, params) + results = data.get("results", []) + + for feat in results: + attrs = feat.get("attributes", {}) + gde_nr = attrs.get("gde_nr") + kanton = attrs.get("kanton") + gemname = attrs.get("gemname") or attrs.get("label", "") + jahr = attrs.get("jahr", 0) + objektart = attrs.get("objektart", attrs.get("objval")) + # Only include politische Gemeinden (objektart 11) when attribute present + if objektart is not None and only_current and int(objektart) != 11: + continue + if not gde_nr or not kanton: + continue + key = (int(gde_nr), str(kanton)) + if key not in seen or jahr > (seen[key].get("_jahr") or 0): + cleaned_name = self._clean_municipality_name(gemname) + seen[key] = { + "name": cleaned_name, + "bfs_nummer": int(gde_nr), + "kanton": str(kanton), + "_jahr": jahr, + } + except Exception as e: + logger.warning(f"Error fetching gemeinden for search '{char}': {e}") + continue + + result = [ + {"name": v["name"], "bfs_nummer": v["bfs_nummer"], "kanton": v["kanton"]} + for v in seen.values() + ] + logger.info(f"Fetched {len(result)} unique Gemeinden from Swiss Topo (only_current={only_current})") + return result + def _extract_address_from_building_attrs(self, attrs: Dict[str, Any]) -> Dict[str, Optional[str]]: """ Extract address 
components from building layer attributes. diff --git a/modules/connectors/connectorZhWfsParcels.py b/modules/connectors/connectorZhWfsParcels.py new file mode 100644 index 00000000..066c1727 --- /dev/null +++ b/modules/connectors/connectorZhWfsParcels.py @@ -0,0 +1,89 @@ +""" +Swiss Parcel (Liegenschaften) Connector + +Fetches parcel data from geodienste.ch OGC API Features (Amtliche Vermessung). +Covers all of Switzerland. Returns GeoJSON in WGS84. + +Uses: geodienste.ch OGC API - RESF collection (Liegenschaften) +No config override needed - this is the single working solution. +""" + +import logging +from typing import Dict, Any + +import requests + +logger = logging.getLogger(__name__) + +# geodienste.ch OGC API - RESF = Liegenschaften (parcels), all Switzerland +# API returns WGS84 directly when bbox-crs=EPSG:2056 is used +_OGC_API_BASE = "https://www.geodienste.ch/db/av_0/deu/ogcapi/collections/RESF/items" +_MAX_ITEMS = 2000 +_TIMEOUT = 30 + + +class ZhWfsParcelsConnector: + """ + Connector for Swiss parcel (Liegenschaften) data via geodienste.ch OGC API. + Returns GeoJSON FeatureCollection in WGS84. + """ + + def __init__(self, timeout: int = _TIMEOUT): + self.timeout = timeout + logger.info("ZhWfsParcelsConnector initialized (geodienste.ch OGC API)") + + def get_parcels_by_bbox(self, bbox: str) -> Dict[str, Any]: + """ + Fetch parcels within bounding box. + Returns GeoJSON FeatureCollection in WGS84 (EPSG:4326). 
+ + Args: + bbox: Bounding box as "minx,miny,maxx,maxy" in LV95 (EPSG:2056) + + Returns: + GeoJSON FeatureCollection with geometries in WGS84 + """ + try: + parts = [p.strip() for p in bbox.split(",")] + if len(parts) != 4: + raise ValueError(f"Invalid bbox: expected minx,miny,maxx,maxy, got {bbox}") + minx, miny, maxx, maxy = (float(p) for p in parts) + + params = { + "f": "json", + "limit": _MAX_ITEMS, + "bbox": f"{minx},{miny},{maxx},{maxy}", + "bbox-crs": "http://www.opengis.net/def/crs/EPSG/0/2056", + } + + logger.debug(f"Requesting parcels: bbox={bbox}") + resp = requests.get(_OGC_API_BASE, params=params, timeout=self.timeout) + + if resp.status_code != 200: + logger.error(f"Parcel API failed: status={resp.status_code}, body={resp.text[:500]}") + return {"type": "FeatureCollection", "features": []} + + data = resp.json() + + # OGC API returns FeatureCollection in WGS84 directly + features = data.get("features", []) + if not features: + return {"type": "FeatureCollection", "features": []} + + # Pass through - geodienste returns WGS84 GeoJSON + result = { + "type": "FeatureCollection", + "features": features, + } + logger.info(f"Returned {len(features)} parcels in WGS84") + return result + + except ValueError as e: + logger.warning(f"Invalid bbox: {e}") + raise + except requests.RequestException as e: + logger.error(f"Parcel API request error: {e}") + return {"type": "FeatureCollection", "features": []} + except Exception as e: + logger.error(f"Error fetching parcels: {e}", exc_info=True) + return {"type": "FeatureCollection", "features": []} diff --git a/modules/features/realEstate/bzoDocumentRetriever.py b/modules/features/realEstate/bzoDocumentRetriever.py index a86c763b..a9356301 100644 --- a/modules/features/realEstate/bzoDocumentRetriever.py +++ b/modules/features/realEstate/bzoDocumentRetriever.py @@ -5,9 +5,9 @@ Queries Dokument table and retrieves PDF content from ComponentObjects. 
import logging from typing import List, Dict, Any, Optional -from modules.datamodels.datamodelRealEstate import Dokument, DokumentTyp, Gemeinde -from modules.interfaces.interfaceDbRealEstateObjects import RealEstateObjects -from modules.interfaces.interfaceDbComponentObjects import ComponentObjects +from .datamodelFeatureRealEstate import Dokument, DokumentTyp, Gemeinde +from .interfaceFeatureRealEstate import RealEstateObjects +from modules.interfaces.interfaceDbManagement import ComponentObjects logger = logging.getLogger(__name__) @@ -128,8 +128,8 @@ class BZODocumentRetriever: logger.warning(f"Dokument {dokument.id} has no dokumentReferenz") return None - # Retrieve PDF bytes - pdf_bytes = self.componentInterface.getFileData(dokument.dokumentReferenz) + # Retrieve PDF bytes (unrestricted - BZO documents are public, accessible to all users) + pdf_bytes = self.componentInterface.getFileDataForPublicDocument(dokument.dokumentReferenz) if not pdf_bytes: logger.warning(f"Could not retrieve PDF content for file {dokument.dokumentReferenz}") diff --git a/modules/features/realEstate/bzoExtractionLangGraph.py b/modules/features/realEstate/bzoExtractionLangGraph.py index 0f5f909a..9bec66fe 100644 --- a/modules/features/realEstate/bzoExtractionLangGraph.py +++ b/modules/features/realEstate/bzoExtractionLangGraph.py @@ -14,6 +14,25 @@ from modules.features.realEstate.bzoRuleTaxonomy import RULE_TAXONOMY logger = logging.getLogger(__name__) +# ===== BZO Params Extraction State (LangGraph with LLM) ===== + +class BZOParamsExtractionState(TypedDict): + """State for BZO params extraction via LLM.""" + extracted_content: Dict[str, Any] + bauzone: str + total_area_m2: Optional[float] + relevant_rules: List[Dict[str, Any]] + relevant_articles: List[Dict[str, Any]] + zone_parameter_tables: List[Dict[str, Any]] + ai_service: Any + gemeinde: str + # Output + bauzone_params_list: List[str] + fakten: List[Dict[str, str]] + zusatzinformationen: List[Dict[str, Any]] + errors: List[str] 
+ + # ===== State Definition ===== @dataclass @@ -111,158 +130,82 @@ class BZOExtractionState(TypedDict): warnings: List[str] -# ===== Node Implementations ===== +# ===== Node Implementations (Simplified 4-node pipeline) ===== -def extract_pdf_text(state: BZOExtractionState) -> BZOExtractionState: - """Extract text blocks from PDF.""" - try: - # PDF bytes should be passed in state context - # This is handled in run_extraction function - # State already has text_blocks populated - return state - - except Exception as e: - logger.error(f"Error extracting PDF text: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"PDF extraction error: {str(e)}"] - return state - - -def classify_text_block(state: BZOExtractionState) -> BZOExtractionState: - """Classify text blocks into articles, headings, tables, etc.""" +def classify_and_assemble(state: BZOExtractionState) -> BZOExtractionState: + """Classify text blocks and assemble into articles (merged node).""" try: classified = [] - for block_dict in state["text_blocks"]: text = block_dict["text"].strip() if not text: continue - block_type = "other" article_label = None article_title = None - - # Check for article patterns article_match = re.search(r'Art\.?\s*(\d+[a-z]?)', text, re.IGNORECASE) if article_match: block_type = "article" article_label = f"Art. 
{article_match.group(1)}" - # Try to extract title (text after article label, before first period or newline) title_match = re.search(r'Art\.?\s*\d+[a-z]?\s+(.+?)(?:\.|$|\n)', text, re.IGNORECASE) if title_match: article_title = title_match.group(1).strip() - - # Check for heading patterns (Roman numerals, letters, numbers) elif re.match(r'^[A-Z]\.\s+[A-Z]', text) or re.match(r'^[IVX]+\.\s+[A-Z]', text) or re.match(r'^\d+\.\s+[A-Z]', text): block_type = "heading" - - # Check for table patterns (multiple tabs or aligned columns) elif '\t' in text or (len(text.split()) > 5 and text.count(' ') > 2): block_type = "table" - classified.append({ - "block": { - "page": block_dict["page"], - "text": block_dict["text"], - "block_id": block_dict["block_id"], - "bbox": block_dict.get("bbox") - }, - "block_type": block_type, - "article_label": article_label, - "article_title": article_title + "block": {"page": block_dict["page"], "text": block_dict["text"], "block_id": block_dict["block_id"], "bbox": block_dict.get("bbox")}, + "block_type": block_type, "article_label": article_label, "article_title": article_title }) - - # Update state with new classified blocks - existing_blocks = state.get("classified_blocks", []) - state["classified_blocks"] = existing_blocks + classified - return state - - except Exception as e: - logger.error(f"Error classifying text blocks: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Classification error: {str(e)}"] - return state + state["classified_blocks"] = classified - -def assemble_articles(state: BZOExtractionState) -> BZOExtractionState: - """Assemble classified blocks into articles with hierarchical structure.""" - try: articles = [] current_article = None - current_section_1 = None - current_section_2 = None - current_section_3 = None - - for classified_dict in state["classified_blocks"]: + current_section_1 = current_section_2 = current_section_3 = None + for classified_dict in classified: block_dict = 
classified_dict["block"] - block = TextBlock( - page=block_dict["page"], - text=block_dict["text"], - block_id=block_dict["block_id"], - bbox=block_dict.get("bbox") - ) - text = block.text.strip() + text = block_dict["text"].strip() block_type = classified_dict["block_type"] article_label = classified_dict.get("article_label") article_title = classified_dict.get("article_title") - - # Update section levels if block_type == "heading": - # Level 1: A., B., C. if re.match(r'^[A-Z]\.\s+', text): current_section_1 = text.split('.', 1)[0] + '.' - current_section_2 = None - current_section_3 = None - # Level 2: I., II., III. + current_section_2 = current_section_3 = None elif re.match(r'^[IVX]+\.\s+', text): current_section_2 = text.split('.', 1)[0] + '.' current_section_3 = None - # Level 3: 1., 2., 3. elif re.match(r'^\d+\.\s+', text): current_section_3 = text.split('.', 1)[0] + '.' - - # Start new article if article_label: - # Save previous article if exists if current_article: articles.append(current_article) - - # Start new article current_article = { - "article_label": article_label, - "article_title": article_title, - "text": text, - "page_start": block.page, - "page_end": block.page, - "section_level_1": current_section_1, - "section_level_2": current_section_2, - "section_level_3": current_section_3, - "zone_raw": None + "article_label": article_label, "article_title": article_title, "text": text, + "page_start": block_dict["page"], "page_end": block_dict["page"], + "section_level_1": current_section_1, "section_level_2": current_section_2, + "section_level_3": current_section_3, "zone_raw": None } - # Continue current article elif current_article: current_article["text"] += "\n" + text - current_article["page_end"] = block.page - - # Add last article + current_article["page_end"] = block_dict["page"] if current_article: articles.append(current_article) - - # Update state with new articles - existing_articles = state.get("articles", []) - state["articles"] = 
existing_articles + articles + state["articles"] = articles return state - except Exception as e: - logger.error(f"Error assembling articles: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Article assembly error: {str(e)}"] + logger.error(f"Error in classify_and_assemble: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Classify/assemble error: {str(e)}"] return state -def detect_zone_changes(state: BZOExtractionState) -> BZOExtractionState: - """Detect zone declarations and maintain zone scope.""" +def extract_zones_and_tables(state: BZOExtractionState) -> BZOExtractionState: + """Detect zones and extract zone-parameter tables (merged node).""" try: + # Part 1: Detect zone declarations zones = [] - current_zones = state.get("current_zones", {}) - + current_zones = {} for article_dict in state["articles"]: text = article_dict.get("text", "") article_label = article_dict.get("article_label", "") @@ -328,83 +271,184 @@ def detect_zone_changes(state: BZOExtractionState) -> BZOExtractionState: "page": page_start }) - # Update state with zones state["current_zones"] = current_zones - existing_zones = state.get("zones", []) - state["zones"] = existing_zones + zones + state["zones"] = zones + + # Part 2: Extract zone-parameter tables + _extract_zone_parameter_tables_impl(state) return state - except Exception as e: - logger.error(f"Error detecting zones: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Zone detection error: {str(e)}"] + logger.error(f"Error in extract_zones_and_tables: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Zones/tables error: {str(e)}"] return state -def detect_rule_candidates(state: BZOExtractionState) -> BZOExtractionState: - """Detect rule candidates using pattern matching.""" +def _extract_zone_parameter_tables_impl(state: BZOExtractionState) -> None: + """Extract zone-parameter tables from classified blocks. 
Mutates state in place.""" + tables = [] + table_blocks = [b for b in state.get("classified_blocks", []) if b.get("block_type") == "table"] + zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b' + parameter_keywords = [ + r'Ausnützungsziffer', r'Überbauungsziffer', r'Vollgeschosse', r'Dachgeschosse', r'Attikageschoss', r'Untergeschoss', + r'Gebäudelänge', r'Grenzabstand', r'Fassadenhöhen', r'Grundabstand', r'Mehrlängen', r'Höchstmass' + ] + parameter_row_patterns = [ + r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?', + r'^(Ausnützungsziffer|Überbauungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)', + ] + subparameter_patterns = [ + r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?', + r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)', + ] + numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?' 
+ for table_block in table_blocks: + block_dict = table_block.get("block", {}) + text = block_dict.get("text", "") + page = block_dict.get("page", 0) + if not text or len(text.strip()) < 20: + continue + lines = text.split('\n') + header_row_idx, zone_columns = None, [] + for idx, line in enumerate(lines): + zone_matches = re.findall(zone_pattern, line, re.IGNORECASE) + if len(zone_matches) >= 3: + header_row_idx, zone_columns = idx, zone_matches + break + if not zone_columns: + has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords) + has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3 + if has_parameters and has_zones: + zone_columns = list(dict.fromkeys(re.findall(zone_pattern, text, re.IGNORECASE))) + header_row_idx = 0 + if not zone_columns: + continue + article_context = None + for block in state.get("classified_blocks", []): + if block.get("block", {}).get("page") == page and block.get("article_label"): + article_context = block.get("article_label") + break + table_data = {"page": page, "zones": zone_columns, "parameters": [], "source_text": text[:500], "article": article_context} + start_idx = (header_row_idx + 1) if header_row_idx is not None else 0 + current_parameter = current_subparameter = None + parameter_values = subparameter_values = {} + for line_idx in range(start_idx, len(lines)): + line = lines[line_idx].strip() + if not line: + continue + is_parameter_row, parameter_name = False, None + for pat in parameter_row_patterns: + m = re.match(pat, line, re.IGNORECASE) + if m: + is_parameter_row, parameter_name = True, re.sub(r'\s+max\.?\s*$', '', re.sub(r'\s+min\.?\s*$', '', m.group(1).strip(), flags=re.I), flags=re.I) + break + is_subparameter, subparameter_name = False, None + if not is_parameter_row: + for pat in subparameter_patterns: + m = re.search(pat, line, re.IGNORECASE) + if m: + is_subparameter, subparameter_name = True, m.group(1).strip() + (f" {m.group(2).strip()}" if m.lastindex and 
m.lastindex >= 2 and m.group(2) else "") + break + target_values = subparameter_values if current_subparameter else parameter_values + if is_parameter_row and parameter_name: + if current_parameter and parameter_values: + table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context}) + current_parameter, current_subparameter, parameter_values, subparameter_values = parameter_name, None, {}, {} + continue + if is_subparameter and subparameter_name: + if current_subparameter and subparameter_values and current_parameter: + table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context}) + current_subparameter, subparameter_values = subparameter_name, {} + continue + if current_parameter or current_subparameter: + line_parts = re.split(r'\s{2,}|\t', line) + line_parts = [p.strip() for p in line_parts if p.strip()] + n = len(zone_columns) + value_parts = [] + # Column-based: extract trailing numeric/fraction parts that align with zone count + for p in reversed(line_parts): + if re.match(r'^\d+(?:\.\d+)?\s*(%|m)?$', p, re.I) or re.match(r'^\d+/\d+$', p): + val = re.sub(r'\s*(%|m)$', '', p, flags=re.I).strip() + unit = None + um = re.search(r'\s*(%|m)$', p, re.I) + if um: + unit = 'm' if um.group(1).lower() == 'm' else '%' + value_parts.insert(0, (val, unit)) + else: + break + if len(value_parts) == n: + for zi, zone in enumerate(zone_columns): + if zone not in target_values: + target_values[zone] = [] + val, unit = value_parts[zi] + target_values[zone].append({"value": val, "unit": unit, "raw_text": line[:200], "line_number": line_idx}) + else: + # Fallback: regex match by character position + all_matches = [(m.start(), m.group(0), m.group(1), m.group(2) if m.lastindex and m.lastindex > 1 else None) for m in re.finditer(numeric_pattern, line, re.I)] + all_matches += [(m.start(), m.group(0), 
m.group(0), None) for m in re.finditer(r'(\d+/\d+)', line, re.I)] + all_matches.sort(key=lambda x: x[0]) + if len(all_matches) == n: + for zi, zone in enumerate(zone_columns): + if zone not in target_values: + target_values[zone] = [] + _, _, val, unit = all_matches[zi] + target_values[zone].append({"value": val, "unit": unit.strip() if unit else None, "raw_text": line[:200], "line_number": line_idx}) + if current_subparameter and subparameter_values and current_parameter: + table_data["parameters"].append({"parameter": f"{current_parameter} - {current_subparameter}", "values_by_zone": subparameter_values.copy(), "article": article_context}) + if current_parameter and parameter_values: + table_data["parameters"].append({"parameter": current_parameter, "values_by_zone": parameter_values.copy(), "article": article_context}) + if table_data["parameters"]: + tables.append(table_data) + state["zone_parameter_tables"] = state.get("zone_parameter_tables", []) + tables + if tables: + logger.info(f"Extracted {len(tables)} zone-parameter tables") + + +# Zone code pattern: W5, W2/30, Z3, K3/4, W5G, W 5 (optional space) +_ZONE_CODE_PATTERN = re.compile(r'\b([WZIK]\s*\d+(?:\s*/\s*\d+)?(?:G)?)\b', re.IGNORECASE) + + +def _zones_in_text(text: str) -> List[str]: + """Extract zone codes (W5, W2/30, Z3, etc.) from text. Returns unique list, normalized (e.g. W5).""" + matches = _ZONE_CODE_PATTERN.findall(text) + seen = set() + result = [] + for m in matches: + # Normalize: remove spaces -> W5, W2/30 + n = re.sub(r'\s+', '', m).upper() + if n and n not in seen: + seen.add(n) + result.append(n) + return result + + +def extract_rules(state: BZOExtractionState) -> BZOExtractionState: + """Detect rule candidates and parse values. 
Associates each rule with zones from its source article.""" try: candidates = [] - for article_dict in state["articles"]: text = article_dict.get("text", "") - article_label = article_dict.get("article_label", "") page_start = article_dict.get("page_start", 0) - - # Check each rule type in taxonomy + # Zones mentioned in THIS article - rules from this article apply to these zones + article_zones = _zones_in_text(text) for rule_type, rule_config in RULE_TAXONOMY.items(): - patterns = rule_config.get("patterns", []) - - for pattern in patterns: - # Create regex pattern (case-insensitive) - regex_pattern = re.compile(pattern, re.IGNORECASE) - matches = regex_pattern.finditer(text) - - for match in matches: - # Extract context around match - start = max(0, match.start() - 100) - end = min(len(text), match.end() + 100) + for pattern in rule_config.get("patterns", []): + for match in re.finditer(pattern, text, re.IGNORECASE): + start, end = max(0, match.start() - 100), min(len(text), match.end() + 100) context = text[start:end] - - # Check for conditions (geographic, temporal, etc.) 
condition_text = None - condition_patterns = [ - r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+', - r'(?:für|bei|in)\s+[^,\.]+', - ] - for cond_pattern in condition_patterns: - cond_match = re.search(cond_pattern, context, re.IGNORECASE) - if cond_match: - condition_text = cond_match.group(0) + for cond_pat in [r'(?:nördlich|südlich|östlich|westlich|oberhalb|unterhalb)\s+[^,\.]+', r'(?:für|bei|in)\s+[^,\.]+']: + cm = re.search(cond_pat, context, re.IGNORECASE) + if cm: + condition_text = cm.group(0) break - - candidate = { - "rule_type": rule_type, - "matched_text": match.group(0), - "article_text": text, - "page": page_start, - "condition_text": condition_text, - "is_table_rule": False, - "table_zones": [] - } - candidates.append(candidate) - - # Update state with rule candidates - existing_candidates = state.get("rule_candidates", []) - state["rule_candidates"] = existing_candidates + candidates - return state - - except Exception as e: - logger.error(f"Error detecting rule candidates: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Rule candidate detection error: {str(e)}"] - return state - - -def parse_rule_values(state: BZOExtractionState) -> BZOExtractionState: - """Parse rule values using regex (LLM fallback can be added later).""" - try: + candidates.append({ + "rule_type": rule_type, "matched_text": match.group(0), "article_text": text, + "page": page_start, "article_label": article_dict.get("article_label"), + "condition_text": condition_text, "is_table_rule": False, + "table_zones": article_zones.copy(), + }) parsed_rules = [] - - for candidate_dict in state["rule_candidates"]: + for candidate_dict in candidates: rule_type = candidate_dict["rule_type"] rule_config = RULE_TAXONOMY.get(rule_type, {}) units = rule_config.get("units", []) @@ -456,16 +500,11 @@ def parse_rule_values(state: BZOExtractionState) -> BZOExtractionState: if unit: confidence = 0.9 - # Determine zone and scope - zone_raw = None - rule_scope 
= "general" - - # Check current zones context - if state.get("current_zones"): - # Use first zone as default (can be improved) - zone_raw = list(state["current_zones"].keys())[0] if state["current_zones"] else None - rule_scope = "zone" if zone_raw else "general" - + # Zone association from source article (zones mentioned in that article) + article_zones = candidate_dict.get("table_zones", []) + zone_raw = article_zones[0] if article_zones else None + rule_scope = "zone" if zone_raw else "general" + parsed_rule = { "rule_type": rule_type, "value_numeric": value_numeric, @@ -473,399 +512,794 @@ def parse_rule_values(state: BZOExtractionState) -> BZOExtractionState: "unit": unit, "condition_text": candidate_dict.get("condition_text"), "is_table_rule": candidate_dict.get("is_table_rule", False), - "table_zones": candidate_dict.get("table_zones", []), + "table_zones": article_zones, "page": candidate_dict["page"], + "article_label": candidate_dict.get("article_label"), "text_snippet": value_text, "zone_raw": zone_raw, "rule_scope": rule_scope, "confidence": confidence } parsed_rules.append(parsed_rule) - - # Update state with parsed rules - existing_rules = state.get("parsed_rules", []) - state["parsed_rules"] = existing_rules + parsed_rules + state["parsed_rules"] = parsed_rules return state - except Exception as e: - logger.error(f"Error parsing rule values: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Rule parsing error: {str(e)}"] + logger.error(f"Error in extract_rules: {e}", exc_info=True) + state["errors"] = state.get("errors", []) + [f"Extract rules error: {str(e)}"] return state -def assign_zone_and_scope(state: BZOExtractionState) -> BZOExtractionState: - """Assign zone and scope to parsed rules.""" - try: - # Rules already have zone and scope assigned in parse_rule_values - # This node can refine assignments if needed - return state - - except Exception as e: - logger.error(f"Error assigning zone and scope: {e}", exc_info=True) - 
state["errors"] = state.get("errors", []) + [f"Zone/scope assignment error: {str(e)}"] - return state +# ===== Wohnzone Parameter Extraction ===== + +# Canonical order for BZO parameters (Fakten) +BZO_PARAM_ORDER = [ + "vollgeschosse", "vollgeschoss", + "anrechenbares untergeschoss", "untergeschoss", + "anrechenbares dachgeschoss", "dachgeschoss", "attikageschoss", + "ausnützungsziffer", "ausnutzungsziffer", "az", + "überbauungsziffer", + "gebäudehöhe", "fassadenhöhen", + "grundabstand", "grenzabstand", + "gebäudelänge", + "mehrlängen", "höchstmass", + "baumassenziffer", "grünflächenziffer", "wohnflächenanteil", "gebäudebreite", +] + +RULE_TYPE_TO_PARAM: Dict[str, str] = { + "max_building_height": "Gebäudehöhe max.", + "max_floors": "Vollgeschosse max.", + "max_attachable_attics": "anrechenbares Dachgeschoss max.", + "max_attachable_basement": "anrechenbares Untergeschoss max.", + "density": "Ausnützungsziffer", + "building_coverage": "Überbauungsziffer", + "building_mass_index": "Baumassenziffer (BMZ)", + "green_space_index": "Grünflächenziffer (GFZ)", + "boundary_distance": "Grundabstand min.", + "boundary_distance_length_surcharge": "Mehrlängen-zuschlag (MLZ)", + "boundary_distance_max": "Höchstmass Grenzabstand max.", + "building_length": "Gebäudelänge max.", + "building_width": "Gebäudebreite max.", + "residential_area_share": "Wohnflächenanteil", +} + +RULE_TYPE_TO_DEFAULT_UNIT: Dict[str, str] = { + "max_building_height": "m", + "max_floors": "Stk.", + "max_attachable_attics": "Stk.", + "max_attachable_basement": "Stk.", + "density": "%", + "building_coverage": "%", + "building_mass_index": "", + "green_space_index": "%", + "boundary_distance": "m", + "boundary_distance_length_surcharge": "", + "boundary_distance_max": "m", + "building_length": "m", + "building_width": "m", + "residential_area_share": "%", +} + +_ARTIKEL_KEYWORDS = [ + r"herabsetzung", r"grenzabstand", r"nutzweise", r"wohnanteil", + r"besondere\s+gebäude", r"überbauungsziffer", 
r"sonderregel", + r"ausnahmen", r"abweichungen", r"erleichterungen", + r"mischung", r"gewerbe", r"dienstleistung", + r"kantonale", r"abstandsvorschriften", + r"vollgeschoss", r"reduziert", r"mindestmass", + r"störend", r"nicht\s+störend", r"mässig\s+störend", +] + +# Artikel that are parameter tables - EXCLUDE from Weiterführende Bestimmungen +_ZUSATZ_EXCLUDE_TITLES = ("zonen", "grundmasse", "mehrlängenzuschlag", "mehrlaengenzuschlag") + +# Artikel that are substantive provisions - INCLUDE in Weiterführende Bestimmungen +_ZUSATZ_INCLUDE_TITLES = ( + "herabsetzung", "nutzweise", "besondere", "besonderes", + "ausnahmen", "abweichungen", "erleichterungen", "sonderregel", + "wohnanteil", "nutzungsart", "abstandsvorschriften", + "mischung", "gewerbe", "dienstleistung", +) -def confidence_scoring(state: BZOExtractionState) -> BZOExtractionState: - """Calculate confidence scores for extracted data.""" - try: - # Confidence already calculated in parse_rule_values - # This node can refine scores if needed - return state - - except Exception as e: - logger.error(f"Error calculating confidence: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Confidence scoring error: {str(e)}"] - return state +def _format_article_text_readable(text: str, article_label: str = "", article_title: str = "") -> str: + """Format raw PDF-extracted text for readable display.""" + if not text or not text.strip(): + return "" + # Strip redundant article header at start (e.g. "Art. 16 Nutzweise" when already in summary) + if article_label or article_title: + prefix = f"{article_label} {article_title}".strip() + if prefix: + pat = re.escape(prefix) + text = re.sub(rf"^{pat}\s*", "", text.strip(), flags=re.I).lstrip() + lines = [] + for line in text.split("\n"): + line = line.strip() + if not line: + continue + lines.append(line) + if not lines: + return "" + # Join hyphenated word breaks (e.g. 
"Gewerbe-\nund" -> "Gewerbe und") + merged = [] + i = 0 + while i < len(lines): + line = lines[i] + while line.rstrip().endswith("-") and i + 1 < len(lines): + line = line.rstrip()[:-1] + lines[i + 1].strip() + i += 1 + if re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines): + next_line = lines[i + 1] + if not re.match(r"^Art\.\s", next_line) and len(next_line) > 3: + line = line + " " + next_line.strip() + i += 1 + elif re.match(r"^\d{1,2}\s*$", line) and i + 1 < len(lines) and re.match(r"^Art\.\s", lines[i + 1]): + i += 1 + continue + merged.append(line) + i += 1 + combined = " ".join(merged) + # Fix run-together paragraph numbers: "1In" -> "1. In", "2Ist" -> "2. Ist" + combined = re.sub(r"(\d)([A-ZÄÖÜ])", r"\1. \2", combined) + # Also fix "a)Something" -> "a) Something" for subparagraphs + combined = re.sub(r"([a-z]\))([A-ZÄÖÜ])", r"\1 \2", combined) + # Split into paragraphs: numbered (1. ..., 2. ...) or lettered (a) ..., b) ...) + parts = re.split(r"(?=\d+\.\s+[A-ZÄÖÜa-zäöü])|(?=[a-z]\)\s+[A-ZÄÖÜa-zäöü])", combined) + paragraphs = [] + for p in parts: + p = p.strip() + if not p or len(p) < 3: + continue + paragraphs.append(p) + return "\n\n".join(paragraphs) -def extract_zone_parameter_tables(state: BZOExtractionState) -> BZOExtractionState: - """ - Extract structured zone-parameter mappings from tables. - - Parses tables that map building zones to parameter values (e.g., Ausnützungsziffer, - Vollgeschosse, Gebäudelänge, Grenzabstand, Fassadenhöhen). 
- """ - try: - import re - tables = [] - - # Find all table blocks - table_blocks = [ - block for block in state.get("classified_blocks", []) - if block.get("block_type") == "table" - ] - - logger.info(f"Found {len(table_blocks)} table blocks to process") - - for table_block in table_blocks: - block_dict = table_block.get("block", {}) - text = block_dict.get("text", "") - page = block_dict.get("page", 0) - - if not text or len(text.strip()) < 20: # Skip very short blocks +def _is_zusatzinfo_article(title: str) -> bool: + """True if article should appear in Weiterführende Bestimmungen (provisions, not param tables).""" + t = (title or "").lower().strip() + for exc in _ZUSATZ_EXCLUDE_TITLES: + if exc in t: + return False + for inc in _ZUSATZ_INCLUDE_TITLES: + if inc in t: + return True + return False + + +def _bzo_build_source(page: Optional[int], article: Optional[str]) -> str: + """Build source string: Art. X, S. Y""" + parts = [] + if article: + parts.append(str(article)) + if page is not None and page > 0: + parts.append(f"S. 
{page}") + return ", ".join(parts) if parts else "" + + +def _bzo_zone_matches_table(bauzone: str, zone_col: str) -> bool: + """Check if table column zone matches target bauzone.""" + b = (bauzone or "").upper().strip() + z = (zone_col or "").upper().strip() + if not b or not z: + return False + return b in z or (len(z) >= 2 and z in b) + + +def _bzo_article_mentions_bauzone(article_text: str, bauzone: str) -> bool: + """Check if article text mentions the bauzone or applies to it.""" + if not bauzone or not article_text: + return False + b = bauzone.upper().strip() + t = article_text.upper() + if b in t: + return True + if len(b) >= 2 and b[0] in "WZIK" and re.search(rf"\b{b[0]}\s*\d+", t): + base = re.sub(r"\s+", "", b.split("/")[0].rstrip("G")) + if base in t or re.search(rf"\b{base}\b", t): + return True + return False + + +def _bzo_get_params_from_tables( + zone_parameter_tables: List[Dict[str, Any]], + bauzone: str +) -> List[Dict[str, Any]]: + """Extract parameter values for a Bauzone from zone-parameter tables.""" + result = [] + seen = set() + for table in zone_parameter_tables: + zones = table.get("zones", []) + if not any(_bzo_zone_matches_table(bauzone, str(z)) for z in zones): + continue + page = table.get("page") + art = table.get("article") + for param in table.get("parameters", []): + values_by_zone = param.get("values_by_zone", {}) + for zone, values in values_by_zone.items(): + if not _bzo_zone_matches_table(bauzone, str(zone)): + continue + if not isinstance(values, list) or len(values) == 0: + continue + val_entry = values[0] + value = val_entry.get("value", "") + unit = val_entry.get("unit") or "" + param_name = param.get("parameter", "") + key = f"{param_name}|{value}|{unit}" + if key not in seen: + seen.add(key) + source = _bzo_build_source(page, param.get("article") or art) + result.append({ + "parameter": param_name, + "value": str(value), + "unit": str(unit).strip() if unit else "", + "source": source or "Tabelle im Dokument", + 
"rule_type": None, + }) + return result + + +def _bzo_filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """Filter rules by Bauzone code.""" + bauzone_upper = (bauzone or "").upper() + out = [] + for r in rules: + if bauzone_upper in (r.get("zone_raw") or "").upper(): + out.append(r) + continue + for tz in (r.get("table_zones") or []): + if bauzone_upper in str(tz).upper(): + out.append(r) + break + else: + if bauzone_upper in (r.get("text_snippet") or "").upper(): + out.append(r) + return out + + +def _bzo_get_params_from_rules(rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert parsed rules to {parameter, value, unit, source, rule_type} format.""" + result = [] + seen = set() + for r in rules: + rule_type = r.get("rule_type", "") + param_name = RULE_TYPE_TO_PARAM.get(rule_type) or rule_type.replace("_", " ").title() + value_numeric = r.get("value_numeric") + value_text = r.get("value_text", "") + unit = r.get("unit") or "" + if value_numeric is not None: + val_str = str(int(value_numeric)) if isinstance(value_numeric, float) and value_numeric == int(value_numeric) else str(value_numeric) + else: + val_str = str(value_text).strip() if value_text else "" + if not val_str: + continue + val_lower = val_str.lower() + if val_lower in ("gebäudelänge", "gebäudebreite", "mehrlängenzuschlag", "mehrlängen", "grenzabstand", "fassadenhöhe"): + continue + unit_str = str(unit).strip() if unit else (RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "")) + page = r.get("page") + article = r.get("article_label") + source = _bzo_build_source(page, article) or "Artikeltxt" + key = f"{param_name}|{val_str}|{unit_str}" + if key not in seen: + seen.add(key) + result.append({ + "parameter": param_name, + "value": val_str, + "unit": unit_str, + "source": source, + "rule_type": rule_type, + }) + return result + + +def _bzo_param_to_rule_type(param_name: str) -> Optional[str]: + """Map parameter display name to rule_type.""" + p = 
(param_name or "").lower() + if "vollgeschoss" in p: + return "max_floors" + if "dachgeschoss" in p or "attika" in p: + return "max_attachable_attics" + if "untergeschoss" in p: + return "max_attachable_basement" + if "ausnützungsziffer" in p or "ausnutzungsziffer" in p or " az " in p: + return "density" + if "überbauungsziffer" in p or " uz " in p: + return "building_coverage" + if "baumassenziffer" in p or "bmz" in p: + return "building_mass_index" + if "grünflächen" in p or "gfz" in p: + return "green_space_index" + if "grenzabstand" in p or "grundabstand" in p: + return "boundary_distance" + if "mehrlängen" in p or "mlz" in p: + return "boundary_distance_length_surcharge" + if "höchstmass" in p: + return "boundary_distance_max" + if "gebäudelänge" in p: + return "building_length" + if "gebäudebreite" in p: + return "building_width" + if "fassadenhöhe" in p or "gebäudehöhe" in p: + return "max_building_height" + if "wohnflächenanteil" in p or "wohnanteil" in p: + return "residential_area_share" + return None + + +def _bzo_merge_rules( + from_tables: List[Dict[str, Any]], + from_rules: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Merge table params and rule params. 
Tables take precedence.""" + by_param_lower: Dict[str, Dict[str, Any]] = {} + for r in from_tables: + p = (r.get("parameter") or "").lower() + if p and p not in by_param_lower: + rr = r.copy() + if not rr.get("rule_type"): + rr["rule_type"] = _bzo_param_to_rule_type(rr.get("parameter", "")) + by_param_lower[p] = rr + for r in from_rules: + p = (r.get("parameter") or "").lower() + if p and p not in by_param_lower: + by_param_lower[p] = r.copy() + return list(by_param_lower.values()) + + +def _bzo_param_sort_key(param_name: str) -> int: + """Order parameters by BZO_PARAM_ORDER.""" + p = (param_name or "").lower() + for i, kw in enumerate(BZO_PARAM_ORDER): + if kw in p: + return i + return 99 + + +def _bzo_extract_zusatzinformationen( + articles: List[Dict[str, Any]], + bauzone: str = "", + zone_parameter_tables: Optional[List[Dict[str, Any]]] = None, +) -> List[Dict[str, Any]]: + """Extract article excerpts relevant to the bauzone.""" + result = [] + seen = set() + patterns = [re.compile(kw, re.IGNORECASE) for kw in _ARTIKEL_KEYWORDS] + table_articles = set() + if zone_parameter_tables and bauzone: + for t in zone_parameter_tables: + if not any(_bzo_zone_matches_table(bauzone, str(z)) for z in t.get("zones", [])): continue - - # Try to parse table structure - # Look for zone codes in header row (W2/30, W3/50, W4/70G*, etc.) 
- zone_pattern = r'\b([WLIZK]\d+(?:/\d+)?(?:G\*?)?)\b' - lines = text.split('\n') - - # Find header row (usually contains zone codes) - header_row_idx = None - zone_columns = [] - - for idx, line in enumerate(lines): - # Look for multiple zone codes in a line (header row) - zone_matches = re.findall(zone_pattern, line, re.IGNORECASE) - if len(zone_matches) >= 3: # At least 3 zones indicates header row - header_row_idx = idx - zone_columns = zone_matches - logger.debug(f"Found header row at line {idx} with zones: {zone_columns}") - break - - if not header_row_idx or not zone_columns: - # Try alternative: look for common table patterns - # Check if text contains parameter names and zone codes - parameter_keywords = [ - r'Ausnützungsziffer', - r'Vollgeschosse', - r'Dachgeschosse', - r'Attikageschoss', - r'Untergeschoss', - r'Gebäudelänge', - r'Grenzabstand', - r'Fassadenhöhen', - r'Grundabstand', - r'Mehrlängen', - r'Höchstmass' - ] - - has_parameters = any(re.search(kw, text, re.IGNORECASE) for kw in parameter_keywords) - has_zones = len(re.findall(zone_pattern, text, re.IGNORECASE)) >= 3 - - if has_parameters and has_zones: - # Extract all zones from entire text - all_zones = re.findall(zone_pattern, text, re.IGNORECASE) - zone_columns = list(dict.fromkeys(all_zones)) # Remove duplicates, preserve order - header_row_idx = 0 # Assume header is at start - logger.debug(f"Found zones in table text: {zone_columns}") - - if not zone_columns: + table_articles.add(t.get("article") or "") + for art in articles: + label = art.get("article_label") or "" + title = (art.get("article_title") or "").strip() + text = (art.get("text") or "").strip() + page = art.get("page_start") or art.get("page_end") or 0 + if not label or not text: + continue + key = f"{label}|{page}" + if key in seen: + continue + combined = f"{title} {text}" + if not any(p.search(combined) for p in patterns): + continue + if bauzone: + if not _bzo_article_mentions_bauzone(combined, bauzone) and label not in 
table_articles: continue - - # Parse parameter rows - table_data = { - "page": page, - "zones": zone_columns, - "parameters": [], - "source_text": text[:500], # Store first 500 chars for reference - "article": None # Will be set if found + seen.add(key) + source = _bzo_build_source(page, label) + result.append({ + "article_label": label, + "article_title": title, + "text": text[:3500].strip(), + "page": page, + "source": source or "BZO-Dokument", + }) + return sorted(result, key=lambda x: (x.get("page", 0), x.get("article_label", ""))) + + +def extract_wohnzone_params( + extracted_content: Dict[str, Any], + bauzone: str, + relevant_rules: Optional[List[Dict[str, Any]]] = None, + total_area_m2: Optional[float] = None, +) -> Dict[str, Any]: + """ + Extract BZO parameters for a Wohnzone from extracted content. + Returns ordered list of fakten (with sources) and zusatzinformationen. + """ + articles = extracted_content.get("articles", []) + zone_parameter_tables = extracted_content.get("zone_parameter_tables", []) + all_rules = extracted_content.get("rules", []) + rules_to_use = relevant_rules if relevant_rules is not None else _bzo_filter_rules_by_bauzone(all_rules, bauzone) + from_tables = _bzo_get_params_from_tables(zone_parameter_tables, bauzone) + from_rules = _bzo_get_params_from_rules(rules_to_use) + bauzone_rules = _bzo_merge_rules(from_tables, from_rules) + fakten = [] + if bauzone: + fakten.append({"item": "Auswertung für Bauzone", "value": bauzone, "source": ""}) + if total_area_m2 is not None and total_area_m2 > 0: + fakten.append({ + "item": "Grundstücksfläche", + "value": f"{total_area_m2:,.0f} m²".replace(",", "'"), + "source": "Parzellendaten", + }) + for r in sorted(bauzone_rules, key=lambda x: _bzo_param_sort_key(x.get("parameter", ""))): + param = r.get("parameter", "").strip() + val = r.get("value", "") + unit = (r.get("unit") or "").strip() + rule_type = r.get("rule_type") or _bzo_param_to_rule_type(param) + if not unit and rule_type: + unit = 
RULE_TYPE_TO_DEFAULT_UNIT.get(rule_type, "") + value_str = f"{val}{(' ' + unit) if unit else ''}".strip() + if param and value_str: + fakten.append({ + "item": param, + "value": value_str, + "source": r.get("source") or "BZO-Dokument", + }) + zusatzinformationen = _bzo_extract_zusatzinformationen( + articles, bauzone, zone_parameter_tables + ) + return { + "bauzone": bauzone, + "fakten": fakten, + "zusatzinformationen": zusatzinformationen, + } + + +# ===== LangGraph: LLM-based BZO Params Extraction ===== + +def _build_bauzone_context_for_llm(state: BZOParamsExtractionState) -> str: + """Build context string for LLM from extracted BZO content.""" + bauzone = (state.get("bauzone") or "").upper() + zone_parameter_tables = state.get("zone_parameter_tables", []) + relevant_articles = state.get("relevant_articles", []) + relevant_rules = state.get("relevant_rules", []) + total_area_m2 = state.get("total_area_m2") + parts = [] + + if total_area_m2 is not None and total_area_m2 > 0: + parts.append(f"Grundstücksfläche der Parzelle: {total_area_m2:,.0f} m²".replace(",", "'")) + parts.append("") + + # Full article texts - LLM can parse tables like Art. 14 (zones in rows, values in columns) + parts.append("=== ARTIKEL MIT VOLLEM TEXT (Tabellen genau lesen, richtige Spalte/Zeile für Bauzone wählen) ===") + for art in relevant_articles: + label = art.get("article_label", "") + title = (art.get("article_title") or "").strip() + text = art.get("text", "") + page = art.get("page_start") or art.get("page_end", 0) + parts.append(f"\n{label}: {title}") + parts.append(f"Seite: {page}") + parts.append(f"Inhalt:\n{text}") + parts.append("") + + # Zone-parameter tables (pre-parsed) + if zone_parameter_tables: + parts.append("=== VORSTRUKTURIERTE TABELLENWERTE FÜR BAUZONE ===") + for table in zone_parameter_tables: + page = table.get("page", 0) + art = table.get("article", "") + parts.append(f"\n{art} (S. 
{page}):") + for param in table.get("parameters", []): + pname = param.get("parameter", "") + for zone, values in (param.get("values_by_zone") or {}).items(): + if bauzone in (zone or "").upper(): + if isinstance(values, list) and values: + v = values[0].get("value", "") + u = values[0].get("unit") or "" + parts.append(f" {pname} [{zone}]: {v} {u}".strip()) + parts.append("") + + # Rules from text + if relevant_rules: + parts.append("=== REGELN AUS ARTIKELTEXT ===") + for r in relevant_rules[:20]: + rt = r.get("rule_type", "") + vn = r.get("value_numeric") + vt = r.get("value_text", "") + u = r.get("unit", "") + page = r.get("page", 0) + art = r.get("article_label", "") + val = str(int(vn)) if vn is not None and isinstance(vn, float) and vn == int(vn) else (str(vn) if vn is not None else vt) + parts.append(f" {rt}: {val} {u} ({art}, S. {page})".strip()) + + return "\n".join(parts) + + +def _parse_llm_bullet_list(text: str) -> List[Dict[str, str]]: + """Parse LLM response into fakten list. Expects lines like '- Param: value (Art. X, S. Y)'.""" + fakten = [] + for line in (text or "").strip().split("\n"): + line = line.strip() + if not line or not line.startswith("-"): + continue + line = line.lstrip("- ").strip() + # Match "Param: value (source)" or "Param: value" + match = re.match(r"^(.+?):\s*(.+?)(?:\s*\(([^)]+)\))?\s*$", line) + if match: + item = match.group(1).strip() + value = match.group(2).strip() + source = (match.group(3) or "").strip() + if item and value: + fakten.append({"item": item, "value": value, "source": source}) + elif ":" in line: + idx = line.find(":") + fakten.append({ + "item": line[:idx].strip(), + "value": line[idx + 1 :].strip(), + "source": "", + }) + return fakten + + +async def _llm_filter_relevant_provisions( + ai_service: Any, + bauzone: str, + fakten: List[Dict[str, str]], + provision_articles: List[Dict[str, Any]], +) -> Optional[set]: + """ + Use LLM to determine which provision articles are relevant for a parcel in this bauzone. 
+ Returns set of article labels (e.g. {"Art. 15", "Art. 16"}) or None to include all on error. + """ + if not provision_articles: + return set() + fakten_str = "\n".join( + f"- {f.get('item', '')}: {f.get('value', '')}" for f in fakten + if f.get("item") and "Auswertung" not in (f.get("item") or "") + ) + articles_str = "\n".join( + f"- {a.get('article_label', '')}: {a.get('article_title', '')}" + for a in provision_articles + if a.get("article_label") + ) + prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). + +Eine Parzelle liegt in der Bauzone {bauzone}. Folgende BZO-Parameter gelten für diese Zone: +{fakten_str} + +Folgende Bestimmungen (Weiterführende Artikel) könnten zutreffen: +{articles_str} + +AUFGABE: Welche dieser Artikel sind für eine Parzelle in Bauzone {bauzone} mit diesen Parametern TATSÄCHLICH RELEVANT? +- Nur Artikel angeben, die auf diese Zone/Parameter Bezug nehmen oder Bedingungen nennen, die hier greifen +- z.B. Art. 15 Herabsetzung: relevant wenn Vollgeschosse und Grenzabstand vorhanden (Reduktion bei weggelassenen Geschossen) +- z.B. Art. 16 Nutzweise: relevant für Wohnzonen mit Wohnanteil +- z.B. Art. 40 Wohnanteil: nur wenn dieser Artikel die Zone {bauzone} erwähnt oder für Wohnzonen gilt +- Artikel die andere Zonen betreffen (z.B. nur Z5, I) und {bauzone} ausschliessen: NICHT aufnehmen + +Antwort NUR mit den relevanten Artikelnummern, eine pro Zeile (z.B. "Art. 15", "Art. 16"). 
Keine anderen Zeichen.""" + + try: + response = await ai_service.callAiPlanning( + prompt=prompt, + debugType="bzo_relevant_provisions", + ) + labels = set() + for line in (response or "").strip().split("\n"): + m = re.search(r"(Art\.\s*\d+[a-z]?)", line.strip(), re.I) + if m: + lbl = re.sub(r"\s+", " ", m.group(1).strip()) + labels.add(lbl) + return labels if labels else None # None = include all (fallback on error or empty) + except Exception as e: + logger.warning(f"LLM provision filter failed: {e}") + return None + + +async def llm_extract_bauzone_params_node(state: BZOParamsExtractionState) -> BZOParamsExtractionState: + """LangGraph node: use LLM to extract BZO parameters for Bauzone as bullet list.""" + bauzone = state.get("bauzone", "") + gemeinde = state.get("gemeinde", "") + ai_service = state.get("ai_service") + errors = list(state.get("errors", [])) + + if not ai_service: + errors.append("AI service not provided") + return {**state, "fakten": [], "bauzone_params_list": [], "errors": errors} + + context = _build_bauzone_context_for_llm(state) + + prompt = f"""Du bist Experte für Schweizer Bau- und Zonenordnungen (BZO). Extrahiere alle relevanten BZO-Parameter für die Bauzone {bauzone} in {gemeinde}. + +BZO-INHALT: +{context} + +AUFGABE: Erstelle eine geordnete Bullet-Liste ALLER zutreffenden Parameter für Bauzone {bauzone}. +Priorität: Vollgeschosse, anrechenbares Untergeschoss, anrechenbares Dachgeschoss, Ausnützungsziffer, Überbauungsziffer, Gebäudehöhe, Grundabstand/Grenzabstand, Gebäudelänge, Mehrlängenzuschlag, Höchstmass, sowie alle anderen Bestimmungen die für diese Zone gelten. + +WICHTIG: +- Bei Tabellen: die richtige Spalte/Zeile für {bauzone} verwenden (z.B. Art. 14 Mehrlängenzuschlag: W5 = 13 m) +- Jede Zeile: "- Parametername: Wert (Art. X, S. Y)" +- Nur tatsächlich im Dokument vorhandene Werte angeben +- Einheit (m, %, Stk.) 
bei Zahlen mit angeben +- Keine leeren Zeilen oder Kommentare - nur die Liste + +Antwort NUR mit der Bullet-Liste, sonst nichts:""" + + try: + ai_response = await ai_service.callAiPlanning( + prompt=prompt, + debugType="bzo_params_extraction", + ) + response_text = (ai_response or "").strip() + # Parse into fakten + fakten = _parse_llm_bullet_list(response_text) + # Build bauzone_params_list (raw "- ..." strings) + bauzone_params_list = [f"- {f['item']}: {f['value']}" + (f" ({f['source']})" if f.get("source") else "") for f in fakten] + # Add header items if missing + if bauzone and not any("Auswertung" in (f.get("item") or "") for f in fakten): + fakten.insert(0, {"item": "Auswertung für Bauzone", "value": bauzone, "source": ""}) + total_area_m2 = state.get("total_area_m2") + if total_area_m2 is not None and total_area_m2 > 0 and not any("Grundstücksfläche" in (f.get("item") or "") for f in fakten): + fakten.insert(1, { + "item": "Grundstücksfläche", + "value": f"{total_area_m2:,.0f} m²".replace(",", "'"), + "source": "Parzellendaten", + }) + # Zusatzinformationen: only provisions RELEVANT for this parcel in this bauzone + all_articles = state.get("extracted_content", {}).get("articles", []) or state.get("relevant_articles", []) + provision_articles = [a for a in all_articles if _is_zusatzinfo_article((a.get("article_title") or "").strip())] + relevant_labels = await _llm_filter_relevant_provisions( + ai_service=ai_service, + bauzone=bauzone, + fakten=fakten, + provision_articles=provision_articles, + ) + def _norm_label(s: str) -> str: + return re.sub(r"\s+", " ", (s or "").strip()) + + zusatzinformationen = [] + for art in provision_articles: + label = art.get("article_label", "") + title = (art.get("article_title") or "").strip() + norm = _norm_label(label) + if relevant_labels is not None and norm and norm not in relevant_labels: + continue + raw_text = (art.get("text") or "")[:4000] + text = _format_article_text_readable( + raw_text, + article_label=label, + 
article_title=title, + ) + if not text: + continue + page = art.get("page_start") or art.get("page_end", 0) + source = f"{label}, S. {page}" if label else f"S. {page}" + zusatzinformationen.append({ + "article_label": label, + "article_title": title, + "text": text, + "source": source, + }) + return { + **state, + "fakten": fakten, + "bauzone_params_list": bauzone_params_list, + "zusatzinformationen": zusatzinformationen, + "errors": errors, + } + except Exception as e: + logger.error(f"LLM BZO params extraction failed: {e}", exc_info=True) + errors.append(str(e)) + return { + **state, + "fakten": [], + "bauzone_params_list": [], + "zusatzinformationen": [], + "errors": errors, + } + + +def create_bzo_params_extraction_graph(): + """Create LangGraph for LLM-based BZO params extraction.""" + workflow = StateGraph(BZOParamsExtractionState) + workflow.add_node("llm_extract", llm_extract_bauzone_params_node) + workflow.set_entry_point("llm_extract") + workflow.add_edge("llm_extract", END) + return workflow.compile() + + +def _filter_articles_by_bauzone(articles: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """Filter articles that mention the Bauzone.""" + bauzone_upper = (bauzone or "").upper() + return [ + a for a in articles + if bauzone_upper in (a.get("text") or "").upper() or bauzone_upper in (a.get("zone_raw") or "").upper() + ] + + +def _filter_tables_by_bauzone(tables: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: + """Filter zone-parameter tables to those containing the Bauzone.""" + bauzone_upper = (bauzone or "").upper() + relevant = [] + for table in tables: + zones = table.get("zones", []) + matching = [z for z in zones if bauzone_upper in str(z).upper()] + if matching: + filtered = { + "page": table.get("page"), + "article": table.get("article"), + "zones": matching, + "parameters": [ + {"parameter": p.get("parameter"), "values_by_zone": { + z: v for z, v in (p.get("values_by_zone") or {}).items() + if bauzone_upper in 
str(z).upper() + }} + for p in table.get("parameters", []) + if any(bauzone_upper in str(z).upper() for z in (p.get("values_by_zone") or {})) + ], } - - # Extract parameters and their values - # Look for parameter rows (a), b), c), etc. or parameter names - parameter_row_patterns = [ - r'^[a-g]\)\s+(.+?)(?:\s+max\.|min\.|:)?', # a) Parameter name - r'^(Ausnützungsziffer|Vollgeschosse|Dachgeschosse|Attikageschoss|Untergeschoss|Gebäudelänge|Grenzabstand|Fassadenhöhen|Grundabstand|Mehrlängen|Höchstmass|Höchstmaß)', - ] - - # Parse each line after header - start_idx = header_row_idx + 1 if header_row_idx is not None else 0 - current_parameter = None - current_subparameter = None - parameter_values = {} - subparameter_values = {} - - # Track which article/section this table belongs to - article_context = None - for block in state.get("classified_blocks", []): - if block.get("block", {}).get("page") == page: - article_label = block.get("article_label") - if article_label: - article_context = article_label - break - - for line_idx in range(start_idx, len(lines)): - line = lines[line_idx].strip() - if not line: - continue - - # Check if this is a parameter row (main parameter like a), b), c)) - is_parameter_row = False - parameter_name = None - - for pattern in parameter_row_patterns: - match = re.match(pattern, line, re.IGNORECASE) - if match: - is_parameter_row = True - parameter_name = match.group(1).strip() - # Clean up parameter name - parameter_name = re.sub(r'\s+max\.?\s*$', '', parameter_name, flags=re.IGNORECASE) - parameter_name = re.sub(r'\s+min\.?\s*$', '', parameter_name, flags=re.IGNORECASE) - break - - # Check for sub-parameters (like "Grundabstand min.", "Mehrlängen-zuschlag", "Höchstmass max.") - is_subparameter = False - subparameter_name = None - if not is_parameter_row: - subparameter_patterns = [ - r'^(Grundabstand|Mehrlängen|Höchstmass|Höchstmaß|Fassadenhöhen)\s*(min\.|max\.)?', - 
r'^(anrechenbare\s+Dachgeschosse|anrechenbares\s+Attikageschoss|anrechenbares\s+Untergeschoss)', - ] - for pattern in subparameter_patterns: - match = re.search(pattern, line, re.IGNORECASE) - if match: - is_subparameter = True - subparameter_name = match.group(1).strip() - if match.lastindex > 1 and match.group(2): - subparameter_name += f" {match.group(2).strip()}" - break - - if is_parameter_row and parameter_name: - # Save previous parameter if exists - if current_parameter and parameter_values: - param_entry = { - "parameter": current_parameter, - "values_by_zone": parameter_values.copy() - } - if article_context: - param_entry["article"] = article_context - table_data["parameters"].append(param_entry) - - # Start new parameter - current_parameter = parameter_name - current_subparameter = None - parameter_values = {} - subparameter_values = {} - continue - - if is_subparameter and subparameter_name: - # Save previous subparameter if exists - if current_subparameter and subparameter_values: - if current_parameter: - # Add subparameter as nested parameter - param_entry = { - "parameter": f"{current_parameter} - {current_subparameter}", - "values_by_zone": subparameter_values.copy() - } - if article_context: - param_entry["article"] = article_context - table_data["parameters"].append(param_entry) - - current_subparameter = subparameter_name - subparameter_values = {} - continue - - # Try to extract values for current parameter or subparameter - target_values = subparameter_values if current_subparameter else parameter_values - if current_parameter or current_subparameter: - # Improved parsing: try to align values with zone columns - # Split line by multiple spaces or tabs (table column separators) - line_parts = re.split(r'\s{2,}|\t', line) - line_parts = [p.strip() for p in line_parts if p.strip()] - - # Look for numeric values with units - numeric_pattern = r'(\d+(?:\.\d+)?)\s*(%|m|Geschoss|Geschosse|Geschosse\s+max\.?|Geschoss\s+max\.?)?' 
- all_matches = list(re.finditer(numeric_pattern, line, re.IGNORECASE)) - - # Also look for fractions (like 1/3) - fraction_pattern = r'(\d+/\d+)' - fraction_matches = list(re.finditer(fraction_pattern, line, re.IGNORECASE)) - - # Combine all matches, preserving position - all_value_matches = [] - for m in all_matches: - value = m.group(1) - unit = m.group(2) if m.lastindex > 1 else None - all_value_matches.append((m.start(), m.group(0), value, unit)) - - for m in fraction_matches: - all_value_matches.append((m.start(), m.group(0), m.group(0), None)) - - all_value_matches.sort(key=lambda x: x[0]) - - # Try to map values to zones - # Strategy: if we have roughly the same number of values as zones, map 1:1 - # Otherwise, try to distribute evenly - if len(all_value_matches) > 0 and len(zone_columns) > 0: - if len(all_value_matches) == len(zone_columns): - # Perfect 1:1 mapping - for zone_idx, zone in enumerate(zone_columns): - if zone_idx < len(all_value_matches): - _, full_match, value, unit = all_value_matches[zone_idx] - if zone not in target_values: - target_values[zone] = [] - target_values[zone].append({ - "value": value, - "unit": unit.strip() if unit else None, - "raw_text": line[:200], - "line_number": line_idx - }) - elif len(all_value_matches) >= len(zone_columns): - # More values than zones - try to group - values_per_zone = len(all_value_matches) / len(zone_columns) - for zone_idx, zone in enumerate(zone_columns): - start_idx = int(zone_idx * values_per_zone) - end_idx = int((zone_idx + 1) * values_per_zone) - zone_values = all_value_matches[start_idx:end_idx] - - if zone_values: - if zone not in target_values: - target_values[zone] = [] - # Take the first (or most relevant) value - _, full_match, value, unit = zone_values[0] - target_values[zone].append({ - "value": value, - "unit": unit.strip() if unit else None, - "raw_text": line[:200], - "line_number": line_idx - }) - else: - # Fewer values than zones - try to match by position - # Use line_parts if 
they align better - if len(line_parts) >= len(zone_columns) * 0.7: - # Try to extract values from line_parts - for zone_idx, zone in enumerate(zone_columns): - if zone_idx < len(line_parts): - part = line_parts[zone_idx] - # Extract numeric value from this part - num_match = re.search(r'(\d+(?:\.\d+)?)', part) - if num_match: - value = num_match.group(1) - unit_match = re.search(r'(%|m|Geschoss)', part, re.IGNORECASE) - unit = unit_match.group(0) if unit_match else None - - if zone not in target_values: - target_values[zone] = [] - target_values[zone].append({ - "value": value, - "unit": unit, - "raw_text": part[:100], - "line_number": line_idx - }) - else: - # Fallback: assign to first zone(s) - for idx, (_, full_match, value, unit) in enumerate(all_value_matches): - if idx < len(zone_columns): - zone = zone_columns[idx] - if zone not in target_values: - target_values[zone] = [] - target_values[zone].append({ - "value": value, - "unit": unit.strip() if unit else None, - "raw_text": line[:200], - "line_number": line_idx - }) - - # Save last parameter/subparameter - if current_subparameter and subparameter_values: - if current_parameter: - param_entry = { - "parameter": f"{current_parameter} - {current_subparameter}", - "values_by_zone": subparameter_values.copy() - } - if article_context: - param_entry["article"] = article_context - table_data["parameters"].append(param_entry) - - if current_parameter and parameter_values: - param_entry = { - "parameter": current_parameter, - "values_by_zone": parameter_values.copy() - } - if article_context: - param_entry["article"] = article_context - table_data["parameters"].append(param_entry) - - if table_data["parameters"]: - tables.append(table_data) - logger.info(f"Extracted table with {len(table_data['zones'])} zones and {len(table_data['parameters'])} parameters from page {page}") - - # Update state - existing_tables = state.get("zone_parameter_tables", []) - state["zone_parameter_tables"] = existing_tables + tables - - 
logger.info(f"Extracted {len(tables)} zone-parameter tables total") - return state - - except Exception as e: - logger.error(f"Error extracting zone-parameter tables: {e}", exc_info=True) - state["errors"] = state.get("errors", []) + [f"Table extraction error: {str(e)}"] - return state + filtered["parameters"] = [x for x in filtered["parameters"] if x["values_by_zone"]] + if filtered["parameters"]: + relevant.append(filtered) + return relevant +async def run_bzo_params_extraction( + extracted_content: Dict[str, Any], + bauzone: str, + ai_service: Any, + gemeinde: str, + relevant_rules: Optional[List[Dict[str, Any]]] = None, + relevant_articles: Optional[List[Dict[str, Any]]] = None, + total_area_m2: Optional[float] = None, +) -> Dict[str, Any]: + """ + Run LangGraph workflow to extract BZO parameters for a Bauzone via LLM. + Returns fakten (item/value/source), bauzone_params_list (bullet strings), zusatzinformationen. + """ + rules = relevant_rules if relevant_rules is not None else _bzo_filter_rules_by_bauzone( + extracted_content.get("rules", []), bauzone + ) + articles = relevant_articles if relevant_articles is not None else _filter_articles_by_bauzone( + extracted_content.get("articles", []), bauzone + ) + tables = _filter_tables_by_bauzone( + extracted_content.get("zone_parameter_tables", []), + bauzone + ) + + state: BZOParamsExtractionState = { + "extracted_content": extracted_content, + "bauzone": bauzone, + "total_area_m2": total_area_m2, + "relevant_rules": rules, + "relevant_articles": articles, + "zone_parameter_tables": tables, + "ai_service": ai_service, + "gemeinde": gemeinde, + "bauzone_params_list": [], + "fakten": [], + "zusatzinformationen": [], + "errors": [], + } + + graph = create_bzo_params_extraction_graph() + final_state = await graph.ainvoke(state) + + return { + "bauzone": bauzone, + "fakten": final_state.get("fakten", []), + "bauzone_params_list": final_state.get("bauzone_params_list", []), + "zusatzinformationen": 
final_state.get("zusatzinformationen", []), + "errors": final_state.get("errors", []), + } # ===== Graph Construction ===== def create_bzo_extraction_graph(): - """Create and compile the BZO extraction graph.""" + """Create and compile the BZO extraction graph (simplified 4-node pipeline).""" workflow = StateGraph(BZOExtractionState) - - # Add nodes - workflow.add_node("extract_pdf_text", extract_pdf_text) - workflow.add_node("classify_text_block", classify_text_block) - workflow.add_node("assemble_articles", assemble_articles) - workflow.add_node("detect_zone_changes", detect_zone_changes) - workflow.add_node("extract_zone_parameter_tables", extract_zone_parameter_tables) - workflow.add_node("detect_rule_candidates", detect_rule_candidates) - workflow.add_node("parse_rule_values", parse_rule_values) - workflow.add_node("assign_zone_and_scope", assign_zone_and_scope) - workflow.add_node("confidence_scoring", confidence_scoring) - - # Define edges - workflow.set_entry_point("extract_pdf_text") - workflow.add_edge("extract_pdf_text", "classify_text_block") - workflow.add_edge("classify_text_block", "assemble_articles") - workflow.add_edge("assemble_articles", "detect_zone_changes") - workflow.add_edge("detect_zone_changes", "extract_zone_parameter_tables") - workflow.add_edge("extract_zone_parameter_tables", "detect_rule_candidates") - workflow.add_edge("detect_rule_candidates", "parse_rule_values") - workflow.add_edge("parse_rule_values", "assign_zone_and_scope") - workflow.add_edge("assign_zone_and_scope", "confidence_scoring") - workflow.add_edge("confidence_scoring", END) - + workflow.add_node("classify_and_assemble", classify_and_assemble) + workflow.add_node("extract_zones_and_tables", extract_zones_and_tables) + workflow.add_node("extract_rules", extract_rules) + workflow.set_entry_point("classify_and_assemble") + workflow.add_edge("classify_and_assemble", "extract_zones_and_tables") + workflow.add_edge("extract_zones_and_tables", "extract_rules") + 
workflow.add_edge("extract_rules", END) return workflow.compile() diff --git a/modules/features/realEstate/bzoRuleTaxonomy.py b/modules/features/realEstate/bzoRuleTaxonomy.py index 1c810efe..dffd824d 100644 --- a/modules/features/realEstate/bzoRuleTaxonomy.py +++ b/modules/features/realEstate/bzoRuleTaxonomy.py @@ -34,6 +34,12 @@ RULE_TAXONOMY = { "value_type": "numeric", "keywords": ["max", "maximal"] }, + "building_coverage": { + "patterns": ["überbauungsziffer", "überbauungsziffer max", "uz"], + "units": ["%", "prozent"], + "value_type": "numeric", + "keywords": ["max", "maximal"] + }, "building_mass_index": { "patterns": ["baumassenziffer", "bmz"], "units": [], diff --git a/modules/features/realEstate/interfaceFeatureRealEstate.py b/modules/features/realEstate/interfaceFeatureRealEstate.py index 57e14f23..65601d9a 100644 --- a/modules/features/realEstate/interfaceFeatureRealEstate.py +++ b/modules/features/realEstate/interfaceFeatureRealEstate.py @@ -467,6 +467,8 @@ class RealEstateObjects: Dokument, self.currentUser, recordFilter={"id": dokumentId}, + mandateId=self.mandateId, + featureInstanceId=self.featureInstanceId, featureCode=self.FEATURE_CODE ) @@ -482,6 +484,8 @@ class RealEstateObjects: Dokument, self.currentUser, recordFilter=recordFilter or {}, + mandateId=self.mandateId, + featureInstanceId=self.featureInstanceId, featureCode=self.FEATURE_CODE ) return [Dokument(**r) for r in records] @@ -538,6 +542,8 @@ class RealEstateObjects: Gemeinde, self.currentUser, recordFilter={"id": gemeindeId}, + mandateId=self.mandateId, + featureInstanceId=self.featureInstanceId, featureCode=self.FEATURE_CODE ) @@ -553,6 +559,8 @@ class RealEstateObjects: Gemeinde, self.currentUser, recordFilter=recordFilter or {}, + mandateId=self.mandateId, + featureInstanceId=self.featureInstanceId, featureCode=self.FEATURE_CODE ) return [Gemeinde(**r) for r in records] @@ -609,6 +617,8 @@ class RealEstateObjects: Kanton, self.currentUser, recordFilter={"id": kantonId}, + 
mandateId=self.mandateId, + featureInstanceId=self.featureInstanceId, featureCode=self.FEATURE_CODE ) @@ -624,6 +634,8 @@ class RealEstateObjects: Kanton, self.currentUser, recordFilter=recordFilter or {}, + mandateId=self.mandateId, + featureInstanceId=self.featureInstanceId, featureCode=self.FEATURE_CODE ) return [Kanton(**r) for r in records] diff --git a/modules/features/realEstate/mainRealEstate.py b/modules/features/realEstate/mainRealEstate.py index 1b15d0b7..0425c94c 100644 --- a/modules/features/realEstate/mainRealEstate.py +++ b/modules/features/realEstate/mainRealEstate.py @@ -13,23 +13,13 @@ FEATURE_CODE = "realestate" FEATURE_LABEL = {"en": "Real Estate", "de": "Immobilien", "fr": "Immobilier"} FEATURE_ICON = "mdi-home-city" -# UI Objects for RBAC catalog +# UI Objects for RBAC catalog (only map view) UI_OBJECTS = [ { "objectKey": "ui.feature.realestate.dashboard", - "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"}, + "label": {"en": "Map", "de": "Karte", "fr": "Carte"}, "meta": {"area": "dashboard"} }, - { - "objectKey": "ui.feature.realestate.projects", - "label": {"en": "Projects", "de": "Projekte", "fr": "Projets"}, - "meta": {"area": "projects"} - }, - { - "objectKey": "ui.feature.realestate.parcels", - "label": {"en": "Parcels", "de": "Parzellen", "fr": "Parcelles"}, - "meta": {"area": "parcels"} - }, ] # Resource Objects for RBAC catalog @@ -74,10 +64,8 @@ TEMPLATE_ROLES = [ "fr": "Gestionnaire immobilier - Gérer les propriétés et locataires" }, "accessRules": [ - # UI access to main views - vollqualifizierte ObjectKeys + # UI access to map view {"context": "UI", "item": "ui.feature.realestate.dashboard", "view": True}, - {"context": "UI", "item": "ui.feature.realestate.projects", "view": True}, - {"context": "UI", "item": "ui.feature.realestate.parcels", "view": True}, # Group-level DATA access {"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "g"}, # Resource: create 
projects @@ -92,10 +80,8 @@ TEMPLATE_ROLES = [ "fr": "Visualiseur immobilier - Consulter les informations immobilières" }, "accessRules": [ - # UI access to view-only views - vollqualifizierte ObjectKeys + # UI access to map view (read-only) {"context": "UI", "item": "ui.feature.realestate.dashboard", "view": True}, - {"context": "UI", "item": "ui.feature.realestate.projects", "view": True}, - {"context": "UI", "item": "ui.feature.realestate.parcels", "view": True}, # Read-only DATA access (my records) {"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, ] @@ -299,11 +285,16 @@ from .datamodelFeatureRealEstate import ( DokumentTyp, ) from modules.services import getInterface as getServices -from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface -from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface +from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface +from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector from modules.features.realEstate.bzoDocumentRetriever import BZODocumentRetriever -from modules.features.realEstate.bzoExtractionLangGraph import run_extraction +from modules.features.realEstate.bzoExtractionLangGraph import run_extraction, run_bzo_params_extraction +from modules.features.realEstate.parcelSelectionService import compute_selection_summary +from modules.features.realEstate.realEstateGemeindeService import ( + ensure_single_gemeinde, + fetch_bzo_for_gemeinde, +) logger = logging.getLogger(__name__) @@ -2342,64 +2333,73 @@ async def extract_bzo_information( currentUser: User, gemeinde: str, bauzone: str, + mandateId: Optional[str] = None, + featureInstanceId: Optional[str] = None, + total_area_m2: Optional[float] = None, + parcels: Optional[List[Dict[str, Any]]] = 
None, ) -> Dict[str, Any]: """ Extract BZO information from PDF documents for a specific Bauzone in a Gemeinde. Retrieves BZO documents for the specified Gemeinde, extracts content using langgraph workflow, filters by Bauzone, and uses AI to find relevant information. + When total_area_m2 or parcels are provided, runs Machbarkeitsstudie for structured output. Args: currentUser: Current authenticated user gemeinde: Gemeinde name (e.g., "Zürich") or ID bauzone: Bauzone code (e.g., "W3", "W2/30") + mandateId: Optional mandate ID for instance-scoped data (defaults to currentUser.mandateId) + featureInstanceId: Optional feature instance ID for instance-scoped data + total_area_m2: Optional total parcel area (m²) for Machbarkeitsstudie + parcels: Optional list of parcel dicts; total area computed via compute_selection_summary if not total_area_m2 Returns: Dictionary containing: - - bauzone: Bauzone code - - gemeinde: Gemeinde information - - extracted_content: Extracted content from PDFs - - ai_summary: AI-generated summary - - relevant_rules: Rules filtered by Bauzone - - documents_processed: List of document IDs processed - - Raises: - HTTPException: If Gemeinde not found or no documents found + - bauzone, gemeinde, extracted_content, ai_summary, relevant_rules, documents_processed + - machbarkeitsstudie: Structured Machbarkeitsstudie output when total_area_m2/parcels provided """ try: - logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id})") + _mandateId = mandateId or (str(currentUser.mandateId) if currentUser.mandateId else None) + logger.info(f"Extracting BZO information for Gemeinde '{gemeinde}', Bauzone '{bauzone}' (user: {currentUser.id}, mandate: {_mandateId})") - # Get interfaces - realEstateInterface = getRealEstateInterface(currentUser) - componentInterface = getComponentInterface(currentUser) + # Get interfaces (instance-scoped when mandateId/featureInstanceId provided) + realEstateInterface = 
getRealEstateInterface( + currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId + ) + componentInterface = getComponentInterface( + currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId + ) # Get Gemeinde - try by ID first, then by label - logger.debug(f"Attempting to retrieve Gemeinde '{gemeinde}' for mandate {currentUser.mandateId}") + logger.debug(f"Attempting to retrieve Gemeinde '{gemeinde}' for mandate {_mandateId}") gemeinde_obj = realEstateInterface.getGemeinde(gemeinde) # If not found by ID, try searching by label if not gemeinde_obj: logger.debug(f"Gemeinde not found by ID, trying to search by label: {gemeinde}") + record_filter = {"label": gemeinde} + if _mandateId: + record_filter["mandateId"] = _mandateId gemeinden_by_label = realEstateInterface.getGemeinden( - recordFilter={"label": gemeinde} + recordFilter=record_filter ) if gemeinden_by_label and len(gemeinden_by_label) > 0: gemeinde_obj = gemeinden_by_label[0] logger.info(f"Found Gemeinde by label '{gemeinde}' with ID: {gemeinde_obj.id}") - else: - # Try to get all gemeinden to see what's available (for debugging) - all_gemeinden = realEstateInterface.getGemeinden(recordFilter=None) - logger.warning(f"Gemeinde '{gemeinde}' not found by ID or label. 
Total Gemeinden in database: {len(all_gemeinden)}") - if all_gemeinden: - sample_ids = [g.id for g in all_gemeinden[:5]] - sample_labels = [g.label for g in all_gemeinden[:5] if g.label] - logger.warning(f"Sample Gemeinde IDs: {sample_ids}") - if sample_labels: - logger.warning(f"Sample Gemeinde labels: {sample_labels}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Gemeinde '{gemeinde}' not found or not accessible" - ) + + # If still not found: fetch only this Gemeinde from Swiss Topo and create it + if not gemeinde_obj and _mandateId and featureInstanceId: + logger.info(f"Gemeinde '{gemeinde}' not in DB - fetching from Swiss Topo (this Gemeinde only)") + gemeinde_obj = await ensure_single_gemeinde( + realEstateInterface, _mandateId, featureInstanceId, gemeinde_name=gemeinde + ) + + if not gemeinde_obj: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Gemeinde '{gemeinde}' not found or not accessible" + ) gemeinde_id = gemeinde_obj.id @@ -2435,6 +2435,36 @@ async def extract_bzo_information( else: logger.warning(f"Document {doc_id} referenced in Gemeinde but not found in database") + # If no BZO documents: auto-fetch from Tavily, then retry + if not bzo_documents and _mandateId and featureInstanceId: + logger.info(f"No BZO documents for Gemeinde '{gemeinde_obj.label}' - fetching from web") + fetched = await fetch_bzo_for_gemeinde( + realEstateInterface, componentInterface, gemeinde_obj, _mandateId, featureInstanceId + ) + if fetched: + # Reload Gemeinde to get updated dokumente + gemeinde_obj = realEstateInterface.getGemeinde(gemeinde_obj.id) + bzo_documents = [] + if gemeinde_obj and gemeinde_obj.dokumente: + for doc in gemeinde_obj.dokumente: + if isinstance(doc, dict): + doc_id = doc.get("id") + doc_typ = doc.get("dokumentTyp") + else: + doc_id = doc.id if hasattr(doc, "id") else None + doc_typ = doc.dokumentTyp if hasattr(doc, "dokumentTyp") else None + if doc_typ: + if isinstance(doc_typ, DokumentTyp): + 
is_bzo = doc_typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION] + elif isinstance(doc_typ, str): + is_bzo = doc_typ in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] + else: + is_bzo = str(doc_typ) in ["gemeindeBzoAktuell", "gemeindeBzoRevision", "GEMEINDE_BZO_AKTUELL", "GEMEINDE_BZO_REVISION"] + if is_bzo and doc_id: + full_doc = realEstateInterface.getDokument(doc_id) + if full_doc: + bzo_documents.append(full_doc) + if not bzo_documents: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -2498,11 +2528,13 @@ async def extract_bzo_information( ) continue - # Filter rules by Bauzone + # Filter rules by Bauzone - only rules explicitly associated with this zone relevant_rules = filter_rules_by_bauzone( all_extracted_content["rules"], bauzone ) + logger.info(f"Extracting for Bauzone {bauzone}: {len(relevant_rules)} zone-specific rules, " + f"{len([t for t in all_extracted_content.get('zone_parameter_tables', []) if bauzone.upper() in str(t.get('zones', [])).upper()])} tables with zone data") # Filter zones by Bauzone relevant_zones = filter_zones_by_bauzone( @@ -2515,6 +2547,34 @@ async def extract_bzo_information( all_extracted_content.get("articles", []), bauzone ) + + # Compute total_area_m2 from parcels if not provided + _total_area_m2 = total_area_m2 + if _total_area_m2 is None and parcels: + selection_summary = compute_selection_summary(parcels) + _total_area_m2 = selection_summary.get("total_area_m2") or 0.0 + + # Extract BZO parameters for Wohnzone via LangGraph + LLM (bullet list with sources) + bzo_params_result = None + try: + services = getServices( + currentUser, workflow=None, mandateId=_mandateId, featureInstanceId=featureInstanceId + ) + ai_service = services.ai + bzo_params_result = await run_bzo_params_extraction( + extracted_content=all_extracted_content, + bauzone=bauzone, + ai_service=ai_service, + gemeinde=gemeinde_obj.label, + relevant_rules=relevant_rules, + 
relevant_articles=relevant_articles, + total_area_m2=_total_area_m2, + ) + except Exception as me: + logger.warning(f"BZO parameter extraction failed: {me}", exc_info=True) + all_extracted_content["warnings"] = all_extracted_content.get("warnings", []) + [ + f"BZO-Parameter konnten nicht extrahiert werden: {str(me)}" + ] # Use AI to generate summary and find additional information ai_summary = await generate_bauzone_ai_summary( @@ -2523,7 +2583,9 @@ async def extract_bzo_information( gemeinde=gemeinde_obj.label, extracted_content=all_extracted_content, relevant_rules=relevant_rules, - relevant_zones=relevant_zones + relevant_zones=relevant_zones, + mandateId=_mandateId, + featureInstanceId=featureInstanceId, ) # Build unified summary that includes zones and articles @@ -2602,7 +2664,8 @@ async def extract_bzo_information( "relevant_rules": relevant_rules, "documents_processed": documents_processed, "errors": all_extracted_content.get("errors", []), - "warnings": all_extracted_content.get("warnings", []) + "warnings": all_extracted_content.get("warnings", []), + "machbarkeitsstudie": bzo_params_result, # Same key for frontend compatibility } except HTTPException: @@ -2617,47 +2680,59 @@ async def extract_bzo_information( def filter_rules_by_bauzone(rules: List[Dict[str, Any]], bauzone: str) -> List[Dict[str, Any]]: """ - Filter rules by Bauzone code. - - Args: - rules: List of rule dictionaries from extraction - bauzone: Bauzone code to filter by (e.g., "W3", "W2/30") - - Returns: - Filtered list of rules that match the Bauzone + Filter rules by Bauzone code. Only keeps rules from SINGLE-zone articles to avoid + wrong values (e.g. article with W2,W3,W5 has different values per zone - we cannot + associate a rule value with a specific zone from article text alone). 
""" relevant_rules = [] bauzone_upper = bauzone.upper() - + + def _zone_matches(z: str) -> bool: + zu = (z or "").upper().strip() + if not zu: + return False + if bauzone_upper in zu: + return True + if zu in bauzone_upper and len(zu) >= 2: + return True + return False + for rule in rules: - # Check if rule has zone information + table_zones = rule.get("table_zones", []) or [] zone_raw = rule.get("zone_raw") - table_zones = rule.get("table_zones", []) - - # Check if rule matches Bauzone + + # Rule must be zone-associated + has_zone = bool(zone_raw) or bool(table_zones) + if not has_zone: + continue + + # CRITICAL: Only use rules from single-zone articles. Multi-zone articles + # (e.g. table with W2,W3,W5) have different values per zone - we cannot + # know which value applies to our zone from article text. + if len(table_zones) > 1: + # Check if ALL zones in article match our bauzone (e.g. W5, W5/50) - unlikely + matches_all = all(_zone_matches(str(z)) for z in table_zones) + if not matches_all: + continue # Ambiguous: exclude + + # Zone must match our bauzone matches = False - - # Direct zone match - if zone_raw and bauzone_upper in zone_raw.upper(): + if zone_raw and _zone_matches(zone_raw): matches = True - - # Table zone match if not matches and table_zones: - for table_zone in table_zones: - if bauzone_upper in str(table_zone).upper(): + for tz in table_zones: + if _zone_matches(str(tz)): matches = True break - - # Check text snippet for Bauzone mention if not matches: - text_snippet = rule.get("text_snippet", "") - if bauzone_upper in text_snippet.upper(): + ts = (rule.get("text_snippet") or "").upper() + if bauzone_upper in ts and len(table_zones) <= 1: matches = True - + if matches: relevant_rules.append(rule) - - logger.info(f"Filtered {len(relevant_rules)} rules for Bauzone {bauzone} from {len(rules)} total rules") + + logger.info(f"Filtered {len(relevant_rules)} rules for Bauzone {bauzone} from {len(rules)} total (multi-zone articles excluded)") return 
relevant_rules @@ -2768,7 +2843,9 @@ async def generate_bauzone_ai_summary( gemeinde: str, extracted_content: Dict[str, Any], relevant_rules: List[Dict[str, Any]], - relevant_zones: List[Dict[str, Any]] + relevant_zones: List[Dict[str, Any]], + mandateId: Optional[str] = None, + featureInstanceId: Optional[str] = None, ) -> str: """ Use AI to generate a summary of relevant information for a Bauzone. @@ -2785,8 +2862,10 @@ async def generate_bauzone_ai_summary( AI-generated summary string """ try: - # Initialize AI service - services = getServices(currentUser, workflow=None) + # Initialize AI service (mandateId required for billing) + services = getServices( + currentUser, workflow=None, mandateId=mandateId, featureInstanceId=featureInstanceId + ) aiService = services.ai # Build context from extracted content, prioritizing zone-parameter tables diff --git a/modules/features/realEstate/parcelSelectionService.py b/modules/features/realEstate/parcelSelectionService.py new file mode 100644 index 00000000..c83efbe3 --- /dev/null +++ b/modules/features/realEstate/parcelSelectionService.py @@ -0,0 +1,180 @@ +""" +Parcel selection service: compute combined outline, total area, and Bauzone grouping. +Used for multi-parcel selection in PEK map view. +""" + +import logging +from typing import Any, Dict, List, Optional + +from shapely.geometry import Polygon +from shapely.ops import unary_union +from shapely.geometry.base import BaseGeometry + +logger = logging.getLogger(__name__) + + +def _parcel_to_shapely_polygon(parcel: Dict[str, Any]) -> Optional[Polygon]: + """ + Convert a parcel dict (perimeter or geometry_geojson) to Shapely Polygon. + Returns None if conversion fails. 
+ """ + # Try geometry_geojson first + geo = parcel.get("geometry_geojson") or parcel.get("map_view", {}).get("geometry_geojson") + if geo and isinstance(geo, dict): + geom = geo.get("geometry") or geo + if geom and isinstance(geom, dict) and geom.get("type") == "Polygon": + coords = geom.get("coordinates") + if coords and len(coords) > 0: + ring = coords[0] if isinstance(coords[0][0], list) else coords + if ring and len(ring) >= 3: + try: + poly = Polygon(ring) + if not poly.is_empty: + return poly + except Exception as e: + logger.debug(f"GeoJSON to Polygon failed: {e}") + + # Try perimeter (punkte with x, y) + perimeter = parcel.get("perimeter") + if not perimeter and "parcel" in parcel: + perimeter = parcel.get("parcel", {}).get("perimeter") + if perimeter and isinstance(perimeter, dict): + punkte = perimeter.get("punkte", []) + if len(punkte) >= 3: + try: + coords = [(p.get("x"), p.get("y")) for p in punkte if "x" in p and "y" in p] + if len(coords) >= 3: + if coords[0] != coords[-1]: + coords.append(coords[0]) + return Polygon(coords) + except Exception as e: + logger.debug(f"Perimeter to Polygon failed: {e}") + + return None + + +def _shapely_to_geojson(geom: BaseGeometry) -> Dict[str, Any]: + """ + Convert Shapely geometry to GeoJSON dict (Polygon or MultiPolygon). + GeoJSON rings are closed (first point == last point). + """ + if geom is None or geom.is_empty: + return {"type": "Polygon", "coordinates": []} + + if hasattr(geom, "geoms"): + # MultiPolygon: coordinates = [ [[ring1]], [[ring2]], ... 
] per polygon + parts = [] + for g in geom.geoms: + if not g.is_empty and hasattr(g, "exterior"): + ring = [list(c) for c in g.exterior.coords] + parts.append([ring]) + return {"type": "MultiPolygon", "coordinates": parts} + elif hasattr(geom, "exterior"): + ring = [list(c) for c in geom.exterior.coords] + return {"type": "Polygon", "coordinates": [ring]} + return {"type": "Polygon", "coordinates": []} + + +def compute_selection_summary(parcels: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Compute combined outline, total area, and Bauzone grouping for selected parcels. + + Args: + parcels: List of parcel dicts with perimeter, geometry_geojson, bauzone, area_m2. + Can have structure { "parcel": {...}, "map_view": {...} } or flat. + + Returns: + { + "combined_outline_geojson": { "type": "Polygon"|"MultiPolygon", "coordinates": [...] }, + "total_area_m2": float, + "bauzonen": [ { "bauzone": str, "parcels": [...], "area_m2": float } ] + } + """ + if not parcels: + return { + "combined_outline_geojson": {"type": "Polygon", "coordinates": []}, + "total_area_m2": 0.0, + "bauzonen": [], + } + + # Normalize: extract parcel data for geometry and Bauzone + shapely_polygons = [] + parcel_records = [] + for p in parcels: + # Support both { parcel: {...}, map_view: {...} } and flat { id, perimeter, ... 
} + flat = dict(p.get("parcel", {}), **{k: v for k, v in p.items() if k != "parcel"}) + flat["map_view"] = p.get("map_view", {}) + flat["geometry_geojson"] = flat.get("geometry_geojson") or flat.get("map_view", {}).get("geometry_geojson") + flat["perimeter"] = flat.get("perimeter") or flat.get("map_view", {}).get("geometry_geojson") and None + poly = _parcel_to_shapely_polygon(flat) + if poly is not None and not poly.is_empty: + shapely_polygons.append(poly) + parcel_records.append(flat) + + if not shapely_polygons: + return { + "combined_outline_geojson": {"type": "Polygon", "coordinates": []}, + "total_area_m2": 0.0, + "bauzonen": [], + } + + # Union all polygons (keeps MultiPolygon if disconnected) + combined = unary_union(shapely_polygons) + total_area_m2 = float(combined.area) if combined and not combined.is_empty else 0.0 + combined_geojson = _shapely_to_geojson(combined) + + # Group parcels by Bauzone + bauzone_map: Dict[str, List[Dict]] = {} + for flat in parcel_records: + bz = flat.get("bauzone") or flat.get("parcel", {}).get("bauzone") or "Unbekannt" + if bz not in bauzone_map: + bauzone_map[bz] = [] + bauzone_map[bz].append(flat) + + bauzonen = [] + for bz, plist in bauzone_map.items(): + area_sum = sum( + float(p.get("area_m2") or p.get("parcel", {}).get("area_m2") or 0) + for p in plist + ) + bauzonen.append({ + "bauzone": bz, + "parcels": plist, + "area_m2": round(area_sum, 2), + }) + + return { + "combined_outline_geojson": combined_geojson, + "total_area_m2": round(total_area_m2, 2), + "bauzonen": bauzonen, + } + + +def is_parcel_adjacent_to_selection( + new_parcel: Dict[str, Any], + selected_parcels: List[Dict[str, Any]], + buffer_m: float = 0.01, +) -> bool: + """ + Check if a parcel touches (is adjacent to) any selected parcel. + Uses small buffer for floating-point tolerance. 
"""
Gemeinde and BZO document services for Real Estate feature.

Provides ensure/import logic used by both routes and extract_bzo_information:
- ensure_single_gemeinde: fetch exactly one Gemeinde from Swiss Topo and persist it
- fetch_bzo_for_gemeinde: locate and download the BZO PDF for one Gemeinde
"""

import asyncio
import hashlib
import json
import logging
import ssl
from typing import Any, Dict, List, Optional, Set

import aiohttp

from .datamodelFeatureRealEstate import Gemeinde, Kanton, Dokument, DokumentTyp, Kontext
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.aicore.aicorePluginTavily import AiTavily

logger = logging.getLogger(__name__)

# Canton abbreviation -> full (German) canton name, used when auto-creating Kanton records.
KANTON_NAMES = {
    "AG": "Aargau", "AI": "Appenzell Innerrhoden", "AR": "Appenzell Ausserrhoden",
    "BE": "Bern", "BL": "Basel-Landschaft", "BS": "Basel-Stadt",
    "FR": "Freiburg", "GE": "Genf", "GL": "Glarus", "GR": "Graubünden",
    "JU": "Jura", "LU": "Luzern", "NE": "Neuenburg", "NW": "Nidwalden",
    "OW": "Obwalden", "SG": "St. Gallen", "SH": "Schaffhausen", "SO": "Solothurn",
    "SZ": "Schwyz", "TG": "Thurgau", "TI": "Tessin", "UR": "Uri",
    "VD": "Waadt", "VS": "Wallis", "ZG": "Zug", "ZH": "Zürich",
}

# Known direct BZO PDF URLs for municipalities (by normalized name, lowercase).
# These are tried first to avoid SSL/HTML issues with Tavily search results.
KNOWN_BZO_PDF_URLS: Dict[str, str] = {
    "schlieren": "https://www.schlieren.ch/_docn/6239470/SKR_10.10_Bauordnung.pdf",
    "zürich": "https://www.stadt-zuerich.ch/content/dam/stzh/portal/Deutsch/AmtlicheSammlung/Erlasse/700/100/700.100%20Bau-%20und%20Zonenordnung%20V2.pdf",
    "zurich": "https://www.stadt-zuerich.ch/content/dam/stzh/portal/Deutsch/AmtlicheSammlung/Erlasse/700/100/700.100%20Bau-%20und%20Zonenordnung%20V2.pdf",
}


def _get_language_from_kanton(kanton_abk: Optional[str]) -> str:
    """Return the document language (de/fr/it) for a canton abbreviation; 'de' by default."""
    if not kanton_abk:
        return "de"
    if kanton_abk.upper() in {"VD", "GE", "NE", "JU"}:
        return "fr"
    if kanton_abk.upper() == "TI":
        return "it"
    return "de"


# Swiss news/media domains to exclude from BZO search (return HTML articles, not PDFs)
_EXCLUDE_BZO_DOMAINS = [
    "limmattalerzeitung.ch",
    "20min.ch",
    "tagesanzeiger.ch",
    "nzz.ch",
    "blick.ch",
    "watson.ch",
    "srf.ch",
    "swissinfo.ch",
    "zukunft-schlieren.ch",  # project/development site, not official BZO
]

# Keywords that indicate the actual BZO regulation document (at least one required in URL/title)
_BZO_ORDINANCE_KEYWORDS = (
    "bzo",
    "zonenordnung",
    "bauordnung",
    "bau-und-zonenordnung",
    "bau und zonenordnung",
    "plan d'aménagement",
    "règlement de construction",
    "piano di utilizzazione",
    "regolamento edilizio",
)

# Keywords that indicate articles or project docs (exclude if present in URL/title)
_BZO_ARTICLE_PROJECT_KEYWORDS = (
    "ld.",  # article ID (e.g. ld.2805321)
    "warum",  # "why" - typical in article headlines
    "ruft ",  # "calls [population to participate]"
    "artikel",  # article
    "news",
    "projektplanung",  # project planning
    "projekt/",  # URL path for project pages
    "/projekt",
    "entwicklungsplan",  # development plan (project doc)
)


def _normalize_gemeinde_for_match(name: str) -> str:
    """Normalize Gemeinde name for URL/title matching (lowercase, umlauts expanded, alnum only)."""
    if not name:
        return ""
    s = name.lower().strip()
    s = s.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss")
    return "".join(c for c in s if c.isalnum())


def _get_bzo_search_query(gemeinde_label: str, language: str) -> str:
    """Build search query targeting BZO PDF documents (not articles)."""
    if language == "fr":
        return f"Plan d'aménagement local {gemeinde_label} PDF"
    if language == "it":
        return f"Piano di utilizzazione {gemeinde_label} PDF"
    return f"Bau und Zonenordnung {gemeinde_label} PDF"


async def ensure_single_gemeinde(
    interface: Any,
    mandateId: str,
    instanceId: str,
    gemeinde_name: str,
) -> Optional[Any]:
    """
    Ensure the given Gemeinde exists in DB. Fetches ONLY that one Gemeinde from Swiss Topo
    and creates it (plus its Kanton if needed) when not found. No bulk import.

    Returns:
        The Gemeinde object if found/created, None otherwise.
    """
    if not gemeinde_name or not gemeinde_name.strip():
        return None
    try:
        connector = SwissTopoMapServerConnector()
        gd = await connector.get_gemeinde_by_name(gemeinde_name)
    except Exception as e:
        logger.error(f"Error fetching Gemeinde '{gemeinde_name}' from Swiss Topo: {e}", exc_info=True)
        return None
    if not gd:
        logger.warning(f"Gemeinde '{gemeinde_name}' not found in Swiss Topo")
        return None

    def find_gemeinde_by_bfs_nummer(bfs_nummer: str) -> Optional[Any]:
        """Scan stored Gemeinden for one whose Kontext JSON carries this BFS number."""
        try:
            gemeinden = interface.getGemeinden(recordFilter={"mandateId": mandateId})
            for g in gemeinden:
                for k in (g.kontextInformationen or []):
                    try:
                        data = json.loads(k.inhalt) if isinstance(k.inhalt, str) else k.inhalt
                        if isinstance(data, dict) and str(data.get("bfs_nummer")) == str(bfs_nummer):
                            return g
                    except (json.JSONDecodeError, AttributeError):
                        continue
        except Exception as ex:
            logger.error(f"Error finding Gemeinde by BFS {bfs_nummer}: {ex}", exc_info=True)
        return None

    existing = find_gemeinde_by_bfs_nummer(str(gd["bfs_nummer"]))
    if existing:
        logger.info(f"Gemeinde '{gd['name']}' already in DB")
        return existing

    # Resolve (or lazily create) the Kanton the Gemeinde belongs to.
    kanton_abk = gd.get("kanton")
    kanton_id = None
    if kanton_abk:
        kantone = interface.getKantone(recordFilter={"mandateId": mandateId, "abk": kanton_abk})
        if kantone:
            kanton_id = kantone[0].id
        else:
            try:
                kanton = Kanton(
                    mandateId=mandateId,
                    featureInstanceId=instanceId,
                    label=KANTON_NAMES.get(kanton_abk, kanton_abk),
                    abk=kanton_abk,
                )
                created_k = interface.createKanton(kanton)
                if created_k and created_k.id:
                    kanton_id = created_k.id
            except Exception as ex:
                logger.error(f"Error creating Kanton {kanton_abk}: {ex}")

    try:
        gemeinde = Gemeinde(
            mandateId=mandateId,
            featureInstanceId=instanceId,
            label=gd["name"],
            id_kanton=kanton_id,
            kontextInformationen=[
                Kontext(thema="BFS Nummer", inhalt=json.dumps({"bfs_nummer": gd["bfs_nummer"]}, ensure_ascii=False))
            ],
        )
        created = interface.createGemeinde(gemeinde)
        if created and created.id:
            logger.info(f"Created single Gemeinde '{gd['name']}' (BFS {gd['bfs_nummer']})")
            return created
    except Exception as ex:
        logger.error(f"Error creating Gemeinde '{gd['name']}': {ex}", exc_info=True)
    return None


def _extract_quelle(doc: Any) -> Optional[str]:
    """Extract quelle (source URL) from a document object or dict."""
    return getattr(doc, "quelle", None) or (doc.get("quelle") if isinstance(doc, dict) else None)


async def fetch_bzo_for_gemeinde(
    interface: Any,
    componentInterface: Any,
    gemeinde: Any,
    mandateId: str,
    instanceId: str,
) -> bool:
    """
    Search for and download BZO documents for a single Gemeinde.

    Returns True if a BZO document already exists or at least one was created.
    Deduplication: re-fetches the Gemeinde, skips if a BZO exists, skips URLs we
    already have, and creates at most 1 new document per call to avoid duplicates
    from multiple Tavily URLs.
    """
    # Re-fetch Gemeinde to get latest dokumente (avoid race with concurrent requests)
    fresh = interface.getGemeinde(gemeinde.id)
    if not fresh:
        return False
    gemeinde = fresh

    existing_bzo = False
    existing_quellen: Set[str] = set()
    if gemeinde.dokumente:
        for doc in gemeinde.dokumente:
            typ = getattr(doc, "dokumentTyp", None) or (doc.get("dokumentTyp") if isinstance(doc, dict) else None)
            label = getattr(doc, "label", None) or (doc.get("label") if isinstance(doc, dict) else None)
            q = _extract_quelle(doc)
            if q:
                existing_quellen.add(q)
            if typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]:
                existing_bzo = True
                break
            if label and any(x in (label or "").upper() for x in ("BZO", "BAU UND ZONENORDNUNG", "PLAN D'AMÉNAGEMENT", "RÈGLEMENT DE CONSTRUCTION", "PIANO DI", "REGOLAMENTO EDILIZIO")):
                existing_bzo = True
                break
    if existing_bzo:
        return True

    kanton_abk = None
    if gemeinde.id_kanton:
        k = interface.getKanton(gemeinde.id_kanton)
        if k:
            kanton_abk = k.abk
    language = _get_language_from_kanton(kanton_abk)
    search_query = _get_bzo_search_query(gemeinde.label, language)

    logger.info(f"Tavily BZO search for {gemeinde.label}: {search_query}")
    tavily = AiTavily()
    gemeinde_normalized = _normalize_gemeinde_for_match(gemeinde.label or "")

    search_results = await tavily._search(
        query=search_query,
        maxResults=10,
        country="switzerland",
        excludeDomains=_EXCLUDE_BZO_DOMAINS,
    )
    if search_results:
        logger.info(f"Tavily returned {len(search_results)} results for BZO of {gemeinde.label}")
    else:
        # BUGFIX: don't return early here -- a known direct PDF URL (below) may still apply.
        logger.warning(f"No Tavily search results for BZO of {gemeinde.label}")
        search_results = []

    # Filter: ONLY keep PDF URLs that are the actual BZO ordinance (not articles/project docs)
    def _is_valid_bzo_result(url: str, title: str) -> bool:
        combined = f"{url} {title}".lower()
        combined_norm = _normalize_gemeinde_for_match(combined)
        # 1. Gemeinde name MUST appear in URL or title
        if not gemeinde_normalized or gemeinde_normalized not in combined_norm:
            return False
        # 2. MUST contain BZO ordinance keyword (actual regulation, not just "about" it)
        if not any(kw in combined for kw in _BZO_ORDINANCE_KEYWORDS):
            return False
        # 3. EXCLUDE if it looks like an article or project planning doc
        if any(kw in combined for kw in _BZO_ARTICLE_PROJECT_KEYWORDS):
            return False
        return True

    pdf_urls = [
        r.url
        for r in search_results
        if (r.url.lower().endswith(".pdf") or "/pdf" in r.url.lower())
        and _is_valid_bzo_result(r.url, r.title or "")
    ]

    # Prepend known direct PDF URLs for this Gemeinde (avoids SSL/HTML issues with
    # Tavily results). BUGFIX: this must run BEFORE the emptiness check below --
    # previously the known URL was unreachable whenever Tavily found no valid PDFs.
    gemeinde_key = gemeinde.label.strip().lower() if gemeinde.label else ""
    if gemeinde_key and gemeinde_key in KNOWN_BZO_PDF_URLS:
        known_url = KNOWN_BZO_PDF_URLS[gemeinde_key]
        pdf_urls = [known_url] + [u for u in pdf_urls if u != known_url]
        logger.info(f"Using known BZO PDF URL for {gemeinde.label}")

    if not pdf_urls:
        logger.warning(
            f"No PDF URLs with matching Gemeinde name for {gemeinde.label} "
            f"(filtered {len(search_results)} results, requiring .pdf and name in URL/title)"
        )
        return False

    # SECURITY NOTE: certificate verification is intentionally disabled to cope with
    # CERTIFICATE_VERIFY_FAILED on Windows/corporate proxies (same approach as
    # routeRealEstate). Downloads are validated as real PDFs below; revisit if the
    # deployment environment gains a working trust store.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "application/pdf,*/*"}
    timeout = aiohttp.ClientTimeout(total=30)

    async def download_pdf(session: aiohttp.ClientSession, url: str) -> Optional[bytes]:
        """Fetch url (3 attempts, 2s backoff); return bytes only if it is a real PDF."""
        for attempt in range(3):
            try:
                async with session.get(url, allow_redirects=True) as resp:
                    if resp.status == 200:
                        data = await resp.read()
                        if data and len(data) >= 100 and data.startswith(b"%PDF"):
                            return data
                        if data.startswith(b"<"):
                            # HTML (error/landing page) instead of a PDF -- retrying won't help.
                            # NOTE(review): this span was garbled in the original source and has
                            # been reconstructed; confirm the intended HTML handling.
                            return None
            except Exception:
                if attempt >= 2:
                    raise
                await asyncio.sleep(2)
        return None

    created_dokumente: List[Any] = []
    current_dokumente = list(gemeinde.dokumente) if gemeinde.dokumente else []
    safe_name = "".join(c for c in gemeinde.label if c.isalnum() or c in (" ", "-", "_")).strip().replace(" ", "_") or "Gemeinde"
    base_label = f"BZO {gemeinde.label}" if language == "de" else (f"Plan d'aménagement local {gemeinde.label}" if language == "fr" else f"Piano di utilizzazione {gemeinde.label}")

    # Track content hashes (first 8 KiB suffices to distinguish PDFs) to avoid
    # storing the same document fetched from different URLs.
    seen_content_hashes: Set[str] = set()

    async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector) as session:
        for pdf_url in pdf_urls[:5]:
            # Skip URL we already have
            if pdf_url in existing_quellen:
                logger.debug(f"Skipping duplicate URL for {gemeinde.label}: {pdf_url[:60]}...")
                continue
            try:
                pdf_content = await download_pdf(session, pdf_url)
                if not pdf_content or len(pdf_content) < 100:
                    continue
                # Deduplicate by content hash (same PDF from different URLs)
                content_hash = hashlib.sha256(pdf_content[:8192]).hexdigest()
                if content_hash in seen_content_hashes:
                    logger.debug(f"Skipping duplicate content for {gemeinde.label} (hash match)")
                    continue
                seen_content_hashes.add(content_hash)

                file_name = f"BZO_{safe_name}.pdf"
                doc_label = base_label
                file_item = componentInterface.createFile(name=file_name, mimeType="application/pdf", content=pdf_content)
                componentInterface.createFileData(file_item.id, pdf_content)
                dokument = Dokument(
                    mandateId=mandateId,
                    featureInstanceId=instanceId,
                    label=doc_label,
                    versionsbezeichnung="Aktuell",
                    dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL,
                    dokumentReferenz=file_item.id,
                    quelle=pdf_url,
                    mimeType="application/pdf",
                    kategorienTags=["BZO", "Bauordnung", gemeinde.label],
                )
                created_dok = interface.createDokument(dokument)
                created_dokumente.append(created_dok)
                current_dokumente.append(created_dok)
                existing_quellen.add(pdf_url)
                # Create at most 1 BZO document per Gemeinde to prevent duplicates
                logger.info(f"Created BZO document for {gemeinde.label}, stopping (1 doc per Gemeinde)")
                break
            except Exception as ex:
                logger.warning(f"Error downloading BZO for {gemeinde.label} from {pdf_url}: {ex}")
                continue

    if created_dokumente:
        interface.updateGemeinde(gemeinde.id, {"dokumente": current_dokumente})
        logger.info(f"Created {len(created_dokumente)} BZO document(s) for {gemeinde.label}")
        return True
    return False
# ===== Helpers for Gemeinde/BZO routes =====

def _get_language_from_kanton(kanton_abk: Optional[str]) -> str:
    """Determine language (de/fr/it) based on Kanton abbreviation; 'de' by default."""
    if not kanton_abk:
        return "de"
    french_cantons = {"VD", "GE", "NE", "JU"}
    italian_cantons = {"TI"}
    kanton_upper = kanton_abk.upper()
    if kanton_upper in french_cantons:
        return "fr"
    if kanton_upper in italian_cantons:
        return "it"
    return "de"


def _get_bzo_search_query(gemeinde_label: str, language: str) -> str:
    """Generate language-specific BZO search query for a Gemeinde."""
    if language == "fr":
        return f"Plan d'aménagement local {gemeinde_label} OR Règlement de construction {gemeinde_label}"
    if language == "it":
        return f"Piano di utilizzazione {gemeinde_label} OR Regolamento edilizio {gemeinde_label}"
    return f"Bau und Zonenordnung {gemeinde_label}"


# ----- Instance-scoped Gemeinde and BZO routes -----

@router.get("/{instanceId}/gemeinden", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_instance_gemeinden(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    only_current: bool = Query(True, description="Only current municipalities (exclude historical)"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """
    Fetch all Gemeinden from Swiss Topo and save to DB for this instance.
    Creates Kantone as needed. Scoped to instance mandateId.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    try:
        oereb_connector = OerebWfsConnector()
        connector = SwissTopoMapServerConnector(oereb_connector=oereb_connector)
        gemeinden_data = await connector.get_all_gemeinden(only_current=only_current)
    except Exception as e:
        logger.error(f"Error fetching Gemeinden from Swiss Topo: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error fetching Gemeinden: {str(e)}")

    # Single source of truth for canton labels (shared with the service module)
    from modules.features.realEstate.realEstateGemeindeService import KANTON_NAMES

    gemeinden_created = 0
    gemeinden_skipped = 0
    kantone_created = 0
    errors: List[str] = []
    kanton_cache: Dict[str, str] = {}

    # PERF: build the BFS -> Gemeinde index ONCE. The previous implementation
    # re-read the entire Gemeinde table for every incoming entry (O(n^2) DB scans).
    existing_by_bfs: Dict[str, Any] = {}
    try:
        for g in interface.getGemeinden(recordFilter={"mandateId": mandateId}):
            for k in (g.kontextInformationen or []):
                try:
                    data = json.loads(k.inhalt) if isinstance(k.inhalt, str) else k.inhalt
                except (json.JSONDecodeError, AttributeError):
                    continue
                if isinstance(data, dict) and data.get("bfs_nummer") is not None:
                    existing_by_bfs[str(data["bfs_nummer"])] = g
    except Exception as ex:
        logger.error(f"Error indexing existing Gemeinden: {ex}", exc_info=True)

    def get_or_create_kanton(kanton_abk: str) -> Optional[str]:
        """Resolve a Kanton id from cache/DB, creating the Kanton on first sight."""
        nonlocal kantone_created, errors
        if not kanton_abk:
            return None
        if kanton_abk in kanton_cache:
            return kanton_cache[kanton_abk]
        kantone = interface.getKantone(recordFilter={"mandateId": mandateId, "abk": kanton_abk})
        if kantone:
            kanton_cache[kanton_abk] = kantone[0].id
            return kantone[0].id
        try:
            kanton = Kanton(
                mandateId=mandateId,
                featureInstanceId=instanceId,
                label=KANTON_NAMES.get(kanton_abk, kanton_abk),
                abk=kanton_abk,
            )
            created = interface.createKanton(kanton)
            if created and created.id:
                kanton_cache[kanton_abk] = created.id
                kantone_created += 1
                return created.id
        except Exception as ex:
            errors.append(f"Error creating Kanton {kanton_abk}: {ex}")
        return None

    saved_gemeinden: List[Dict[str, Any]] = []
    for gd in gemeinden_data:
        try:
            gemeinde_name = gd.get("name")
            bfs_nummer = gd.get("bfs_nummer")
            kanton_abk = gd.get("kanton")
            if not gemeinde_name or bfs_nummer is None:
                gemeinden_skipped += 1
                continue
            existing = existing_by_bfs.get(str(bfs_nummer))
            if existing:
                gemeinden_skipped += 1
                saved_gemeinden.append(existing.model_dump() if hasattr(existing, "model_dump") else existing)
                continue
            kanton_id = get_or_create_kanton(kanton_abk) if kanton_abk else None
            gemeinde = Gemeinde(
                mandateId=mandateId,
                featureInstanceId=instanceId,
                label=gemeinde_name,
                id_kanton=kanton_id,
                kontextInformationen=[
                    Kontext(thema="BFS Nummer", inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False))
                ],
            )
            created = interface.createGemeinde(gemeinde)
            if created and created.id:
                gemeinden_created += 1
                # Keep the index current so duplicates within the same payload are skipped
                existing_by_bfs[str(bfs_nummer)] = created
                saved_gemeinden.append(created.model_dump() if hasattr(created, "model_dump") else created)
            else:
                errors.append(f"Failed to create Gemeinde {gemeinde_name}")
                gemeinden_skipped += 1
        except Exception as ex:
            errors.append(f"Error processing {gd.get('name', 'Unknown')}: {str(ex)}")
            gemeinden_skipped += 1

    return {
        "gemeinden": saved_gemeinden,
        "count": len(saved_gemeinden),
        "stats": {
            "gemeinden_created": gemeinden_created,
            "gemeinden_skipped": gemeinden_skipped,
            "kantone_created": kantone_created,
            "error_count": len(errors),
            "errors": errors[:10],
        },
    }


@router.post("/{instanceId}/gemeinden/fetch-bzo-documents", response_model=Dict[str, Any])
@limiter.limit("10/hour")
async def fetch_instance_bzo_documents(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Search for and download BZO documents for all Gemeinden of this instance (1 doc per Gemeinde, no duplicates)."""
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    componentInterface = getComponentInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    from modules.features.realEstate.realEstateGemeindeService import fetch_bzo_for_gemeinde

    gemeinden = interface.getGemeinden(recordFilter={"mandateId": mandateId})
    stats = {"gemeinden_processed": 0, "documents_created": 0, "documents_skipped": 0, "errors": []}
    results: List[Dict[str, Any]] = []

    for gemeinde in gemeinden:
        gr = {"gemeinde_id": gemeinde.id, "gemeinde_label": gemeinde.label, "status": None, "dokument_ids": [], "error": None}
        try:
            stats["gemeinden_processed"] += 1
            fetched = await fetch_bzo_for_gemeinde(
                interface, componentInterface, gemeinde, mandateId, instanceId
            )
            if fetched:
                gr["status"] = "created"
                stats["documents_created"] += 1
                # Report the ids now attached to the Gemeinde (re-read for freshness)
                refreshed = interface.getGemeinde(gemeinde.id)
                if refreshed and refreshed.dokumente:
                    for doc in refreshed.dokumente:
                        doc_id = getattr(doc, "id", None) or (doc.get("id") if isinstance(doc, dict) else None)
                        if doc_id:
                            gr["dokument_ids"].append(doc_id)
            else:
                gr["status"] = "skipped"
                stats["documents_skipped"] += 1
        except Exception as ex:
            gr["status"] = "error"
            gr["error"] = str(ex)
            stats["errors"].append(f"{gemeinde.label}: {str(ex)}")
        results.append(gr)

    return {"success": True, "stats": stats, "results": results}


@router.get("/{instanceId}/parcel-documents", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_parcel_documents(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    gemeinde: str = Query(..., description="Gemeinde name (e.g. Zürich)"),
    bauzone: str = Query(..., description="Bauzone code (e.g. W5)"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """
    Ensure BZO document exists for Gemeinde, return documents for parcel info display.
    Creates Gemeinde (Swiss Topo) and BZO (Tavily) if not in DB.
    Returns documents for preview - does NOT run LangGraph.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = getRealEstateInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    componentInterface = getComponentInterface(
        context.user, mandateId=mandateId, featureInstanceId=instanceId
    )
    from modules.features.realEstate.realEstateGemeindeService import (
        ensure_single_gemeinde,
        fetch_bzo_for_gemeinde,
    )

    def _collect_bzo_docs(g: Any) -> List[Any]:
        """Collect fully-loaded BZO docs; one matcher for both pre- and post-fetch passes
        (the previous code duplicated this logic with inconsistent type matching)."""
        docs: List[Any] = []
        for doc in (g.dokumente or []):
            typ = getattr(doc, "dokumentTyp", None) or (doc.get("dokumentTyp") if isinstance(doc, dict) else None)
            if typ in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION] or str(typ) in ["gemeindeBzoAktuell", "gemeindeBzoRevision"]:
                doc_id = doc.id if hasattr(doc, "id") else doc.get("id")
                if doc_id:
                    full = interface.getDokument(doc_id)
                    if full and full.dokumentReferenz:
                        docs.append(full)
        return docs

    by_label = interface.getGemeinden(recordFilter={"label": gemeinde, "mandateId": mandateId})
    gemeinde_obj = by_label[0] if by_label else None
    if not gemeinde_obj:
        gemeinde_obj = await ensure_single_gemeinde(interface, mandateId, instanceId, gemeinde_name=gemeinde)
    if not gemeinde_obj:
        return {"documents": [], "error": f"Gemeinde '{gemeinde}' nicht gefunden"}

    bzo_docs = _collect_bzo_docs(gemeinde_obj)
    if not bzo_docs:
        fetched = await fetch_bzo_for_gemeinde(interface, componentInterface, gemeinde_obj, mandateId, instanceId)
        if fetched:
            gemeinde_obj = interface.getGemeinde(gemeinde_obj.id)
            if gemeinde_obj:
                bzo_docs = _collect_bzo_docs(gemeinde_obj)

    result = [
        {
            "id": d.id,
            "label": d.label,
            "fileId": d.dokumentReferenz,
            "fileName": (d.label or "BZO") + ".pdf",
            "mimeType": d.mimeType or "application/pdf",
        }
        for d in bzo_docs
    ]
    return {"documents": result, "gemeinde": gemeinde, "bauzone": bauzone}


@router.get("/{instanceId}/bzo-information", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def get_instance_bzo_information(
    request: Request,
    instanceId: str = Path(..., description="Feature Instance ID"),
    gemeinde: str = Query(..., description="Gemeinde name or ID"),
    bauzone: str = Query(..., description="Bauzone code (e.g., W3, W2/30)"),
    total_area_m2: Optional[float] = Query(None, description="Total parcel area (m²) for Machbarkeitsstudie"),
    parcel_ids: Optional[str] = Query(None, description="Comma-separated parcel IDs; total area computed from parcels"),
    context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """Extract BZO information for a Bauzone in a Gemeinde. Runs LangGraph workflow. With total_area_m2 or parcel_ids, includes Machbarkeitsstudie."""
    mandateId = _validateInstanceAccess(instanceId, context)
    parcels = None
    if parcel_ids:
        ids = [x.strip() for x in parcel_ids.split(",") if x.strip()]
        if ids:
            interface = getRealEstateInterface(
                context.user, mandateId=mandateId, featureInstanceId=instanceId
            )
            parcels = []
            for pid in ids:
                p = interface.getParzelle(pid)
                if not p:
                    continue
                # Prefer model_dump (consistent with the Gemeinde routes); fall back
                # for mappings and plain objects.
                if hasattr(p, "model_dump"):
                    flat = p.model_dump()
                elif hasattr(p, "keys"):
                    flat = dict(p)
                else:
                    flat = vars(p) if hasattr(p, "__dict__") else {}
                parcels.append({"parcel": flat, "map_view": flat.get("map_view", {})})
    return await extract_bzo_information(
        currentUser=context.user,
        gemeinde=gemeinde,
        bauzone=bauzone,
        mandateId=mandateId,
        featureInstanceId=instanceId,
        total_area_m2=total_area_m2,
        parcels=parcels,
    )


@router.get("/parcel/wfs")
@limiter.limit("60/minute")
def get_parcels_wfs(
    request: Request,
    bbox: str = Query(..., description="Bounding box as minx,miny,maxx,maxy in LV95 (EPSG:2056)"),
    context: RequestContext = Depends(getRequestContext)
) -> JSONResponse:
    """
    Fetch parcel geometries from geodienste.ch OGC API (Swiss Liegenschaften) within bounding box.
    Returns GeoJSON FeatureCollection in WGS84 for map display.
    """
    try:
        connector = ZhWfsParcelsConnector()
        geojson = connector.get_parcels_by_bbox(bbox)
        return JSONResponse(content=geojson)
    except ValueError as e:
        # Malformed bbox -> client error
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Error fetching WFS parcels: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to fetch parcel data from WFS"
        )
- Precise boundary geometry for map display - Administrative context (canton, municipality) + - Bauzone (zone code from ÖREB WFS when include_bauzone=True) - Link to official cadastral map - Optional: Adjacent parcels Query Parameters: - location: Either coordinates as "x,y" (LV95/EPSG:2056) or address string - include_adjacent: If true, fetches information about adjacent parcels (slower) + - include_bauzone: If true, queries ÖREB WFS for zone info (Bauzone/Wohnzone) Headers: - X-CSRF-Token: CSRF token (required for security) @@ -1073,6 +1422,9 @@ async def search_parcel( "y": sum_y / len(points) } + # Extract canton early (needed for bauzone query and municipality resolution) + canton = attributes.get("ak", "") + # Extract municipality name and address from Swiss Topo data municipality_name = None full_address = None @@ -1126,32 +1478,82 @@ async def search_parcel( full_address = location logger.debug(f"Using location as address: {full_address}") + # Try to extract municipality name from address string (e.g. 
"Forchstrasse 6c, 8610 Uster") + if not municipality_name and full_address: + plz_municipality_match = re.search(r"\b(\d{4})\s+([A-ZÄÖÜ][a-zäöüß\s-]+)", full_address) + if plz_municipality_match: + extracted_municipality = plz_municipality_match.group(2).strip() + extracted_municipality = re.sub(r"[,;\.]+$", "", extracted_municipality).strip() + if extracted_municipality: + municipality_name = extracted_municipality + if not plz: + plz = plz_municipality_match.group(1) + logger.debug(f"Extracted municipality from address: {municipality_name}") + # Try to extract municipality name from BFSNR if not found - if not municipality_name: - # Common Swiss municipalities lookup (you can expand this) - bfsnr = attributes.get("bfsnr") - canton = attributes.get("ak", "") - - # Basic municipality lookup for common codes + bfsnr = attributes.get("bfsnr") + if not municipality_name and bfsnr and canton and context.mandateId: + try: + interface = getRealEstateInterface( + context.user, mandateId=str(context.mandateId), featureInstanceId=None + ) + gemeinden = interface.getGemeinden(recordFilter={"mandateId": str(context.mandateId)}) + for g in gemeinden: + for k in (g.kontextInformationen or []): + try: + data = json.loads(k.inhalt) if isinstance(k.inhalt, str) else k.inhalt + if isinstance(data, dict): + bfs = data.get("bfs_nummer") or data.get("bfsnr") or data.get("municipality_code") + if str(bfs) == str(bfsnr): + municipality_name = g.label + logger.debug(f"Found Gemeinde by BFS {bfsnr} in DB: {municipality_name}") + break + except (json.JSONDecodeError, AttributeError): + continue + if municipality_name: + break + except Exception as e: + logger.debug(f"Error querying Gemeinde by BFS: {e}") + + # Swiss Topo geocoding to get municipality from coordinates + if not municipality_name and centroid and canton: + try: + geocode_url = "https://api3.geo.admin.ch/rest/services/api/MapServer/identify" + params = { + "geometry": f"{centroid['x']},{centroid['y']}", + "geometryType": 
"esriGeometryPoint", + "layers": "all:ch.swisstopo.swissboundaries3d-gemeinde-flaeche.fill", + "tolerance": "0", + "returnGeometry": "false", + "sr": "2056", + "f": "json", + } + async with aiohttp.ClientSession() as session: + async with session.get(geocode_url, params=params) as resp: + if resp.status == 200: + data = await resp.json() + results = data.get("results", []) + if results: + attrs = results[0].get("attributes", {}) + geo_name = attrs.get("name") or attrs.get("gemeindename") or attrs.get("label") + if geo_name: + municipality_name = connector._clean_municipality_name(str(geo_name)) + logger.debug(f"Found municipality via Swiss Topo geocoding: {municipality_name}") + except Exception as e: + logger.debug(f"Error querying Swiss Topo geocoding: {e}") + + # Expanded common municipalities fallback + if not municipality_name and bfsnr: common_municipalities = { - 351: "Bern", - 261: "Zuerich", - 6621: "Geneve", - 2701: "Basel", - 5586: "Lausanne", - 1061: "Luzern", - 3203: "Winterthur", - 230: "St. Gallen", - 5192: "Lugano", - 1367: "Schwyz" + 261: "Zürich", 198: "Pfäffikon", 191: "Uster", 3203: "Winterthur", + 351: "Bern", 2701: "Basel", 6621: "Genève", 5586: "Lausanne", + 1061: "Luzern", 230: "St. 
Gallen", 5192: "Lugano", 1367: "Schwyz", } - - if bfsnr and bfsnr in common_municipalities: + if bfsnr in common_municipalities: municipality_name = common_municipalities[bfsnr] - logger.debug(f"Looked up municipality: {municipality_name}") - else: - # Fallback: Use canton + code - municipality_name = f"{canton}-{bfsnr}" if canton and bfsnr else "Unknown" + logger.debug(f"Looked up municipality from common list: {municipality_name}") + elif canton and bfsnr: + municipality_name = f"{canton}-{bfsnr}" logger.debug(f"Using fallback municipality: {municipality_name}") # Final validation: Don't use EGRID as address @@ -1160,6 +1562,29 @@ async def search_parcel( full_address = None logger.debug("Removed EGRID from address field") + # Query Bauzone (wohnzone) from ÖREB WFS when requested + bauzone = None + has_geometry = geometry and (geometry.get("rings") or geometry.get("coordinates")) + if include_bauzone and canton and has_geometry and centroid: + try: + logger.debug(f"Querying zone information for parcel {attributes.get('label')} in canton {canton}") + oereb_connector = OerebWfsConnector() + zone_results = await oereb_connector.query_zone_layer( + egrid=attributes.get("egris_egrid", "") or "", + x=centroid["x"], + y=centroid["y"], + canton=canton, + geometry=geometry, + ) + if zone_results and len(zone_results) > 0: + zone_attrs = zone_results[0].get("attributes", {}) + typ_gde_abkuerzung = zone_attrs.get("typ_gde_abkuerzung") + if typ_gde_abkuerzung: + bauzone = typ_gde_abkuerzung + logger.debug(f"Found bauzone: {bauzone} for parcel {attributes.get('label')}") + except Exception as e: + logger.warning(f"Error querying zone information: {e}", exc_info=True) + # Build parcel info parcel_info = { "id": attributes.get("label") or attributes.get("number"), @@ -1176,7 +1601,8 @@ async def search_parcel( "area_m2": area_m2, "centroid": centroid, "geoportal_url": attributes.get("geoportal_url"), - "realestate_type": attributes.get("realestate_type") + "realestate_type": 
attributes.get("realestate_type"), + "bauzone": bauzone, } # Build map view info @@ -1310,6 +1736,172 @@ async def search_parcel( ) +@router.post("/parcel/selection-summary", response_model=Dict[str, Any]) +@limiter.limit("60/minute") +async def parcel_selection_summary( + request: Request, + body: Dict[str, Any] = Body(..., description="Parcel selection data"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """ + Compute combined outline, total area, and Bauzone grouping for selected parcels. + Request body: { "parcels": [ { parcel, map_view, perimeter, geometry_geojson, ... } ] } + """ + try: + csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token") + if not csrf_token: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="CSRF token missing. Please include X-CSRF-Token header." + ) + parcels = body.get("parcels", []) + if not parcels: + return { + "combined_outline_geojson": {"type": "Polygon", "coordinates": []}, + "total_area_m2": 0.0, + "bauzonen": [], + } + result = compute_selection_summary(parcels) + logger.info(f"Computed selection summary for {len(parcels)} parcels, total area {result['total_area_m2']} m²") + return result + except HTTPException: + raise + except Exception as e: + logger.error(f"Error computing selection summary: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error computing selection summary: {str(e)}" + ) + + +def _build_geometry_geojson(extracted: Dict[str, Any], parcel_info: Dict[str, Any]) -> Dict[str, Any]: + """Build geometry_geojson from extracted perimeter for add-adjacent response.""" + coords = [] + if extracted.get("perimeter", {}).get("punkte"): + coords = [[[p["x"], p["y"]] for p in extracted["perimeter"]["punkte"]]] + return { + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": coords}, + "properties": {"id": parcel_info["id"], "egrid": parcel_info["egrid"], 
"number": parcel_info["number"]}, + } + + +@router.post("/parcel/add-adjacent", response_model=Dict[str, Any]) +@limiter.limit("60/minute") +async def add_adjacent_parcel( + request: Request, + body: Dict[str, Any] = Body(..., description="Location and selected parcels"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """ + Add an adjacent parcel to the selection. Validates that the parcel at the given + location touches the current selection. + Request body: { "location": { "x": number, "y": number }, "selected_parcels": [...] } + """ + try: + csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token") + if not csrf_token: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="CSRF token missing. Please include X-CSRF-Token header." + ) + location = body.get("location") + selected_parcels = body.get("selected_parcels", []) + if not location or "x" not in location or "y" not in location: + raise HTTPException(status_code=400, detail="location with x,y required") + loc_str = f"{location['x']},{location['y']}" + connector = SwissTopoMapServerConnector() + parcel_data = await connector.search_parcel(loc_str) + if not parcel_data: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="No parcel found at this location" + ) + extracted = connector.extract_parcel_attributes(parcel_data) + attributes = parcel_data.get("attributes", {}) + geometry = parcel_data.get("geometry", {}) + area_m2 = None + centroid = None + if extracted.get("perimeter"): + perimeter = extracted["perimeter"] + points = perimeter.get("punkte", []) + if len(points) >= 3: + area = 0 + for i in range(len(points)): + j = (i + 1) % len(points) + area += points[i]["x"] * points[j]["y"] + area -= points[j]["x"] * points[i]["y"] + area_m2 = abs(area / 2) + sum_x = sum(p["x"] for p in points) + sum_y = sum(p["y"] for p in points) + centroid = {"x": sum_x / len(points), "y": sum_y / len(points)} + parcel_info = { 
+ "id": attributes.get("label") or attributes.get("number"), + "egrid": attributes.get("egris_egrid"), + "number": attributes.get("number"), + "name": attributes.get("name"), + "identnd": attributes.get("identnd"), + "canton": attributes.get("ak"), + "municipality_code": attributes.get("bfsnr"), + "municipality_name": None, + "address": None, + "plz": None, + "perimeter": extracted.get("perimeter"), + "area_m2": area_m2, + "centroid": centroid, + "geoportal_url": attributes.get("geoportal_url"), + "realestate_type": attributes.get("realestate_type"), + "bauzone": None, + } + map_view = { + "center": centroid, + "zoom_bounds": parcel_data.get("bbox", []) and { + "min_x": parcel_data["bbox"][0], + "min_y": parcel_data["bbox"][1], + "max_x": parcel_data["bbox"][2], + "max_y": parcel_data["bbox"][3], + } or None, + "geometry_geojson": _build_geometry_geojson(extracted, parcel_info), + } + new_parcel_response = {"parcel": parcel_info, "map_view": map_view} + if not is_parcel_adjacent_to_selection(new_parcel_response, selected_parcels): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Nur angrenzende Parzellen können hinzugefügt werden" + ) + bbox = parcel_data.get("bbox", []) + map_view["zoom_bounds"] = { + "min_x": bbox[0], "min_y": bbox[1], "max_x": bbox[2], "max_y": bbox[3] + } if len(bbox) >= 4 else None + geocoded_address = parcel_data.get("geocoded_address") + if geocoded_address: + parcel_info["municipality_name"] = geocoded_address.get("municipality") + parcel_info["address"] = geocoded_address.get("full_address") + parcel_info["plz"] = geocoded_address.get("plz") + if centroid and attributes.get("ak"): + try: + oereb = OerebWfsConnector() + zone_results = await oereb.query_zone_layer( + egrid=attributes.get("egris_egrid", "") or "", + x=centroid["x"], y=centroid["y"], + canton=attributes.get("ak"), + geometry=geometry, + ) + if zone_results and len(zone_results) > 0: + parcel_info["bauzone"] = zone_results[0].get("attributes", 
{}).get("typ_gde_abkuerzung") + except Exception as oe: + logger.debug(f"ÖREB zone query failed: {oe}") + return new_parcel_response + except HTTPException: + raise + except Exception as e: + logger.error(f"Error adding adjacent parcel: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error adding adjacent parcel: {str(e)}" + ) + + @router.post("/projekt/{projekt_id}/add-parcel", response_model=Dict[str, Any]) @limiter.limit("60/minute") async def add_parcel_to_project( diff --git a/modules/features/realEstate/scrapeSwissTopo.py b/modules/features/realEstate/scrapeSwissTopo.py index f761100c..7f7d54e7 100644 --- a/modules/features/realEstate/scrapeSwissTopo.py +++ b/modules/features/realEstate/scrapeSwissTopo.py @@ -15,7 +15,7 @@ from dataclasses import dataclass import json from modules.datamodels.datamodelUam import User -from modules.datamodels.datamodelRealEstate import ( +from .datamodelFeatureRealEstate import ( Parzelle, GeoPolylinie, GeoPunkt, @@ -23,7 +23,7 @@ from modules.datamodels.datamodelRealEstate import ( Gemeinde, Kanton, ) -from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface +from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector from modules.connectors.connectorOerebWfs import OerebWfsConnector diff --git a/modules/interfaces/interfaceDbManagement.py b/modules/interfaces/interfaceDbManagement.py index 6d4a912e..61e32886 100644 --- a/modules/interfaces/interfaceDbManagement.py +++ b/modules/interfaces/interfaceDbManagement.py @@ -1254,6 +1254,34 @@ class ComponentObjects: logger.error(f"Error processing file data for {fileId}: {str(e)}") return None + def getFileDataForPublicDocument(self, fileId: str) -> Optional[bytes]: + """ + Returns binary data for public documents (e.g. BZO) WITHOUT RBAC filtering. 
+ Use for official/mandate documents that must be accessible to all users. + Reads FileData directly from database. + """ + try: + fileDataEntries = self.db.getRecordset(FileData, recordFilter={"id": fileId}) + if not fileDataEntries: + logger.warning(f"No file data found for public document ID {fileId}") + return None + fileDataEntry = fileDataEntries[0] + if "data" not in fileDataEntry: + logger.warning(f"No data field in file data for ID {fileId}") + return None + data = fileDataEntry["data"] + base64Encoded = fileDataEntry.get("base64Encoded", False) + if base64Encoded: + return base64.b64decode(data) + # PDF/binary stored as text: try base64 decode (common for binary files) + try: + return base64.b64decode(data) + except Exception: + return data.encode("utf-8") if isinstance(data, str) else data + except Exception as e: + logger.error(f"Error retrieving public document {fileId}: {str(e)}", exc_info=True) + return None + def getFileContent(self, fileId: str) -> Optional[FilePreview]: """Returns the full file content if user has access.""" try: diff --git a/modules/routes/routeRealEstate.py b/modules/routes/routeRealEstate.py index 587209a2..cb1506c1 100644 --- a/modules/routes/routeRealEstate.py +++ b/modules/routes/routeRealEstate.py @@ -2254,6 +2254,7 @@ async def get_bzo_information( request: Request, gemeinde: str = Query(..., description="Gemeinde name or ID"), bauzone: str = Query(..., description="Bauzone code (e.g., W3, W2/30)"), + total_area_m2: Optional[float] = Query(None, description="Total parcel area (m²) for Machbarkeitsstudie"), currentUser: User = Depends(getCurrentUser) ) -> Dict[str, Any]: """ @@ -2348,7 +2349,8 @@ async def get_bzo_information( result = await extract_bzo_information( currentUser=currentUser, gemeinde=gemeinde, - bauzone=bauzone + bauzone=bauzone, + total_area_m2=total_area_m2, ) return result diff --git a/modules/routes/routeSystem.py b/modules/routes/routeSystem.py index 6b01ad21..1a2ad6bf 100644 --- 
a/modules/routes/routeSystem.py +++ b/modules/routes/routeSystem.py @@ -100,7 +100,7 @@ def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]: from modules.features.trustee.mainTrustee import UI_OBJECTS return UI_OBJECTS elif featureCode == "realestate": - from modules.features.realestate.mainRealEstate import UI_OBJECTS + from modules.features.realEstate.mainRealEstate import UI_OBJECTS return UI_OBJECTS elif featureCode == "chatplayground": from modules.features.chatplayground.mainChatplayground import UI_OBJECTS