"""
Gemeinde and BZO document services for the Real Estate feature.

Provides the ensure/import logic used by both the routes and extract_bzo_information.
"""
|
|
|
|
import asyncio
import hashlib
import json
import logging
import re
import ssl
import unicodedata
from typing import Any, Dict, List, Optional, Set

import aiohttp

from .datamodelFeatureRealEstate import Gemeinde, Kanton, Dokument, DokumentTyp, Kontext
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.aicore.aicorePluginTavily import AiTavily
|
|
|
|
# Module-level logger shared by all services in this file.
logger = logging.getLogger(__name__)

# Maps the two-letter canton abbreviation to the German canton name.
# Used as the label when a Kanton record has to be created on the fly.
KANTON_NAMES: Dict[str, str] = {
    "AG": "Aargau", "AI": "Appenzell Innerrhoden", "AR": "Appenzell Ausserrhoden",
    "BE": "Bern", "BL": "Basel-Landschaft", "BS": "Basel-Stadt",
    "FR": "Freiburg", "GE": "Genf", "GL": "Glarus", "GR": "Graubünden",
    "JU": "Jura", "LU": "Luzern", "NE": "Neuenburg", "NW": "Nidwalden",
    "OW": "Obwalden", "SG": "St. Gallen", "SH": "Schaffhausen", "SO": "Solothurn",
    "SZ": "Schwyz", "TG": "Thurgau", "TI": "Tessin", "UR": "Uri",
    "VD": "Waadt", "VS": "Wallis", "ZG": "Zug", "ZH": "Zürich",
}
|
|
|
|
# Direct BZO PDF URL of the city of Zürich, shared by both spellings below.
_ZURICH_BZO_PDF_URL = (
    "https://www.stadt-zuerich.ch/content/dam/stzh/portal/Deutsch/AmtlicheSammlung/"
    "Erlasse/700/100/700.100%20Bau-%20und%20Zonenordnung%20V2.pdf"
)

# Known direct BZO PDF URLs for municipalities, keyed by the stripped, lowercased
# Gemeinde label (umlauts are NOT transliterated, hence both Zürich spellings).
# These are tried first to avoid SSL/HTML issues with Tavily search results.
KNOWN_BZO_PDF_URLS: Dict[str, str] = {
    "schlieren": "https://www.schlieren.ch/_docn/6239470/SKR_10.10_Bauordnung.pdf",
    "zürich": _ZURICH_BZO_PDF_URL,
    "zurich": _ZURICH_BZO_PDF_URL,
}
|
|
|
|
|
|
def _get_language_from_kanton(kanton_abk: Optional[str]) -> str:
    """Return the search language ("de", "fr" or "it") for a canton abbreviation.

    Args:
        kanton_abk: Two-letter canton abbreviation in any case, optionally
            padded with whitespace; may be None or empty.

    Returns:
        "fr" for VD, GE, NE and JU, "it" for TI, and "de" for everything else
        (including a missing abbreviation).
    """
    if not kanton_abk:
        return "de"
    # Normalize once so padded or lowercase input resolves like the canonical form.
    abk = kanton_abk.strip().upper()
    # NOTE(review): the bilingual cantons FR and VS currently fall through to "de".
    if abk in {"VD", "GE", "NE", "JU"}:
        return "fr"
    if abk == "TI":
        return "it"
    return "de"
|
|
|
|
|
|
# Swiss news/media domains to exclude from BZO search (return HTML articles, not PDFs)
_EXCLUDE_BZO_DOMAINS: List[str] = [
    "limmattalerzeitung.ch",
    "20min.ch",
    "tagesanzeiger.ch",
    "nzz.ch",
    "blick.ch",
    "watson.ch",
    "srf.ch",
    "swissinfo.ch",
    "zukunft-schlieren.ch",  # project/development site, not official BZO
]
|
|
|
|
# Keywords that indicate the actual BZO regulation document (at least one required in URL/title).
# All entries are lowercase because they are matched against lowercased text.
_BZO_ORDINANCE_KEYWORDS = (
    "bzo",
    "zonenordnung",
    "bauordnung",
    "bau-und-zonenordnung",
    "bau und zonenordnung",
    "plan d'aménagement",
    "règlement de construction",
    "piano di utilizzazione",
    "regolamento edilizio",
)
|
|
|
|
# Keywords that indicate articles or project docs (exclude if present in URL/title).
# All entries are lowercase because they are matched against lowercased text.
# NOTE(review): as a plain substring "ld." also occurs in hostnames such as
# "frauenfeld.ch" or "wald.ch"; it is meant as an article-ID prefix followed by digits.
_BZO_ARTICLE_PROJECT_KEYWORDS = (
    "ld.",  # article ID (e.g. ld.2805321)
    "warum",  # "why" - typical in article headlines
    "ruft ",  # "calls [population to participate]"
    "artikel",  # article
    "news",
    "projektplanung",  # project planning
    "projekt/",  # URL path for project pages
    "/projekt",
    "entwicklungsplan",  # development plan (project doc)
)
|
|
|
|
|
|
def _normalize_gemeinde_for_match(name: str) -> str:
    """Normalize a Gemeinde name (or URL/title text) for substring matching.

    The result is lowercase and contains only alphanumeric characters:
    German umlauts are transliterated (ä→ae, ö→oe, ü→ue, ß→ss), all other
    accents are stripped (è→e, â→a), and whitespace/punctuation is removed.

    Args:
        name: Raw text; may be empty or None.

    Returns:
        The normalized string, or "" for empty input.
    """
    if not name:
        return ""
    # Compose first so decomposed umlauts ("u" + combining diaeresis) are
    # transliterated like their precomposed form.
    s = unicodedata.normalize("NFC", name).lower().strip()
    s = s.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss")
    # Decompose the remaining accented letters; the combining marks are not
    # alphanumeric and are dropped by the filter below, leaving the base letter.
    s = unicodedata.normalize("NFKD", s)
    return "".join(c for c in s if c.isalnum())
|
|
|
|
|
|
def _get_bzo_search_query(gemeinde_label: str, language: str) -> str:
    """Build the web search query targeting BZO PDF documents (not articles).

    Args:
        gemeinde_label: Display name of the Gemeinde, inserted verbatim.
        language: "fr" or "it" select the French/Italian phrasing; any other
            value yields the German query.

    Returns:
        The query string, always ending in "PDF" to bias results towards PDFs.
    """
    if language == "fr":
        return f"Plan d'aménagement local {gemeinde_label} PDF"
    if language == "it":
        return f"Piano di utilizzazione {gemeinde_label} PDF"
    return f"Bau und Zonenordnung {gemeinde_label} PDF"
|
|
|
|
|
|
async def ensure_single_gemeinde(
    interface: Any,
    mandateId: str,
    instanceId: str,
    gemeinde_name: str,
) -> Optional[Any]:
    """
    Ensure the given Gemeinde exists in DB. Fetches ONLY that one Gemeinde from Swiss Topo
    and creates it if not found. No bulk import.

    Args:
        interface: Data-access object providing getGemeinden, getKantone,
            createKanton and createGemeinde.
        mandateId: Mandate the records are looked up in and created for.
        instanceId: Feature instance id stored on newly created records.
        gemeinde_name: Name to look up in Swiss Topo; blank names are rejected.

    Returns:
        The existing or newly created Gemeinde object, or None when the name is
        blank, the Swiss Topo lookup fails or finds nothing, the result carries
        no BFS number, or the Gemeinde could not be created.
    """
    if not gemeinde_name or not gemeinde_name.strip():
        return None

    try:
        connector = SwissTopoMapServerConnector()
        gd = await connector.get_gemeinde_by_name(gemeinde_name)
    except Exception as e:
        logger.error(f"Error fetching Gemeinde '{gemeinde_name}' from Swiss Topo: {e}", exc_info=True)
        return None
    if not gd:
        logger.warning(f"Gemeinde '{gemeinde_name}' not found in Swiss Topo")
        return None

    # Without a BFS number the lookup below would compare "None" == "None" and
    # wrongly match any Gemeinde whose context JSON lacks the key.
    bfs_nummer = gd.get("bfs_nummer")
    if bfs_nummer is None:
        logger.warning(f"Swiss Topo result for Gemeinde '{gemeinde_name}' has no BFS number")
        return None

    def find_gemeinde_by_bfs_nummer(bfs_nummer: str) -> Optional[Any]:
        """Return the stored Gemeinde whose context JSON carries this BFS number."""
        wanted = str(bfs_nummer)
        try:
            gemeinden = interface.getGemeinden(recordFilter={"mandateId": mandateId})
            for g in gemeinden:
                for k in (g.kontextInformationen or []):
                    try:
                        # inhalt is either a JSON string or an already parsed object.
                        data = json.loads(k.inhalt) if isinstance(k.inhalt, str) else k.inhalt
                        if isinstance(data, dict) and str(data.get("bfs_nummer")) == wanted:
                            return g
                    except (json.JSONDecodeError, AttributeError):
                        # Unrelated or malformed context entry: skip it.
                        continue
        except Exception as ex:
            logger.error(f"Error finding Gemeinde by BFS {bfs_nummer}: {ex}", exc_info=True)
        return None

    existing = find_gemeinde_by_bfs_nummer(str(bfs_nummer))
    if existing:
        logger.info(f"Gemeinde '{gd['name']}' already in DB")
        return existing

    # Resolve the Kanton, creating it on demand; a Gemeinde without a Kanton
    # reference is still created if this step fails.
    kanton_abk = gd.get("kanton")
    kanton_id = None
    if kanton_abk:
        kantone = interface.getKantone(recordFilter={"mandateId": mandateId, "abk": kanton_abk})
        if kantone:
            kanton_id = kantone[0].id
        else:
            try:
                kanton_label = KANTON_NAMES.get(kanton_abk, kanton_abk)
                kanton = Kanton(
                    mandateId=mandateId,
                    featureInstanceId=instanceId,
                    label=kanton_label,
                    abk=kanton_abk,
                )
                created_k = interface.createKanton(kanton)
                if created_k and created_k.id:
                    kanton_id = created_k.id
            except Exception as ex:
                logger.error(f"Error creating Kanton {kanton_abk}: {ex}", exc_info=True)

    try:
        gemeinde = Gemeinde(
            mandateId=mandateId,
            featureInstanceId=instanceId,
            label=gd["name"],
            id_kanton=kanton_id,
            kontextInformationen=[
                Kontext(thema="BFS Nummer", inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False))
            ],
        )
        created = interface.createGemeinde(gemeinde)
        if created and created.id:
            logger.info(f"Created single Gemeinde '{gd['name']}' (BFS {bfs_nummer})")
            return created
    except Exception as ex:
        logger.error(f"Error creating Gemeinde '{gd['name']}': {ex}", exc_info=True)
    return None
|
|
|
|
|
|
def _extract_quelle(doc: Any) -> Optional[str]:
    """Extract quelle (source URL) from a document.

    Args:
        doc: Either an object exposing a ``quelle`` attribute or a plain dict
            with a "quelle" key.

    Returns:
        The source URL, or None when it is missing or empty.
    """
    return getattr(doc, "quelle", None) or (doc.get("quelle") if isinstance(doc, dict) else None)
|
|
|
|
|
|
# News-article IDs such as "ld.2805321". Anchored so that hostnames like
# "frauenfeld.ch" or "wald.zh.ch" (which merely contain "ld.") do not match.
_BZO_ARTICLE_ID_PATTERN = re.compile(r"(?<![a-z0-9])ld\.\d")

# Upper-cased label fragments that mark an already stored document as a BZO.
_BZO_LABEL_MARKERS = (
    "BZO",
    "BAU UND ZONENORDNUNG",
    "PLAN D'AMÉNAGEMENT",
    "RÈGLEMENT DE CONSTRUCTION",
    "PIANO DI",
    "REGOLAMENTO EDILIZIO",
)

_PDF_DOWNLOAD_ATTEMPTS = 3
_PDF_RETRY_DELAY_SECONDS = 2
_MIN_PDF_SIZE_BYTES = 100
_MAX_BZO_DOWNLOAD_CANDIDATES = 5


def _doc_field(doc: Any, field: str) -> Any:
    """Read a field from a document that is either a model object or a plain dict."""
    return getattr(doc, field, None) or (doc.get(field) if isinstance(doc, dict) else None)


def _has_existing_bzo(dokumente: List[Any]) -> bool:
    """Return True if any stored document is a BZO, judged by its type or its label."""
    bzo_types = (DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION)
    for doc in dokumente:
        if _doc_field(doc, "dokumentTyp") in bzo_types:
            return True
        label = _doc_field(doc, "label")
        if label and any(marker in label.upper() for marker in _BZO_LABEL_MARKERS):
            return True
    return False


def _is_bzo_ordinance_result(url: str, title: str, gemeinde_normalized: str) -> bool:
    """Return True if a search hit looks like the BZO ordinance of the given Gemeinde."""
    combined = f"{url} {title}".lower()
    # 1. Gemeinde name MUST appear in URL or title
    if not gemeinde_normalized or gemeinde_normalized not in _normalize_gemeinde_for_match(combined):
        return False
    # 2. MUST contain BZO ordinance keyword (actual regulation, not just "about" it)
    if not any(kw in combined for kw in _BZO_ORDINANCE_KEYWORDS):
        return False
    # 3. EXCLUDE if it looks like an article or project planning doc. The "ld."
    #    article-ID keyword is matched by pattern instead of as a substring,
    #    otherwise official sites such as frauenfeld.ch would always be rejected.
    if _BZO_ARTICLE_ID_PATTERN.search(combined):
        return False
    return not any(kw in combined for kw in _BZO_ARTICLE_PROJECT_KEYWORDS if kw != "ld.")


async def _download_bzo_pdf(session: "aiohttp.ClientSession", url: str) -> Optional[bytes]:
    """Download url and return its bytes if it is a PDF, else None.

    Transport errors and non-200 responses are retried with a fixed delay; the
    error of the final attempt is raised. A 200 response whose payload is not a
    PDF (e.g. an HTML page) is final and is not retried.
    """
    for attempt in range(_PDF_DOWNLOAD_ATTEMPTS):
        is_last_attempt = attempt == _PDF_DOWNLOAD_ATTEMPTS - 1
        try:
            async with session.get(url, allow_redirects=True) as resp:
                if resp.status == 200:
                    data = await resp.read()
                    if len(data) >= _MIN_PDF_SIZE_BYTES and data.startswith(b"%PDF"):
                        return data
                    logger.warning(f"Server did not return a PDF for {url}")
                    return None
                # 406 is retried silently; any other status is reported as an error.
                if resp.status != 406 or is_last_attempt:
                    raise RuntimeError(f"HTTP {resp.status}")
        except Exception:
            if is_last_attempt:
                raise
        await asyncio.sleep(_PDF_RETRY_DELAY_SECONDS)
    return None


async def fetch_bzo_for_gemeinde(
    interface: Any,
    componentInterface: Any,
    gemeinde: Any,
    mandateId: str,
    instanceId: str,
) -> bool:
    """
    Search for and download BZO documents for a single Gemeinde.

    Candidate URLs are the known direct PDF URL for the Gemeinde (if any,
    tried first and used even when the web search yields nothing) followed by
    the filtered Tavily search hits.

    Deduplication: re-fetches Gemeinde, skips if BZO exists, skips URLs we already have,
    creates at most 1 new document per call to avoid duplicates from multiple Tavily URLs.

    Args:
        interface: Data-access object providing getGemeinde, getKanton,
            createDokument and updateGemeinde.
        componentInterface: File storage object providing createFile and createFileData.
        gemeinde: Gemeinde object; only its id is used before it is re-fetched.
        mandateId: Mandate id stored on the created Dokument.
        instanceId: Feature instance id stored on the created Dokument.

    Returns:
        True if a BZO document already exists or was created, False otherwise.

    Raises:
        Exception: Whatever the Tavily search raises, unless a known direct
            URL is available as a fallback.
    """
    # Re-fetch Gemeinde to get latest dokumente (avoid race with concurrent requests)
    fresh = interface.getGemeinde(gemeinde.id)
    if not fresh:
        return False
    gemeinde = fresh

    dokumente = list(gemeinde.dokumente) if gemeinde.dokumente else []
    if _has_existing_bzo(dokumente):
        return True
    existing_quellen: Set[str] = {q for q in map(_extract_quelle, dokumente) if q}

    label = (gemeinde.label or "").strip()
    if not label:
        # Without a name no search hit can be attributed to this Gemeinde.
        logger.warning(f"Gemeinde {gemeinde.id} has no label, cannot search for BZO")
        return False

    kanton_abk = None
    if gemeinde.id_kanton:
        kanton = interface.getKanton(gemeinde.id_kanton)
        if kanton:
            kanton_abk = kanton.abk
    language = _get_language_from_kanton(kanton_abk)
    search_query = _get_bzo_search_query(gemeinde.label, language)

    # Known direct PDF URL for this Gemeinde (avoids SSL/HTML issues with Tavily results)
    known_url = KNOWN_BZO_PDF_URLS.get(label.lower())

    logger.info(f"Tavily BZO search for {gemeinde.label}: {search_query}")
    tavily = AiTavily()
    gemeinde_normalized = _normalize_gemeinde_for_match(label)
    try:
        search_results = await tavily._search(
            query=search_query,
            maxResults=10,
            country="switzerland",
            excludeDomains=_EXCLUDE_BZO_DOMAINS,
        )
    except Exception as ex:
        if not known_url:
            raise
        logger.warning(f"Tavily BZO search failed for {gemeinde.label}, using known URL only: {ex}")
        search_results = []

    if search_results:
        logger.info(f"Tavily returned {len(search_results)} results for BZO of {gemeinde.label}")
    else:
        logger.warning(f"No Tavily search results for BZO of {gemeinde.label}")
        if not known_url:
            return False

    # Filter: ONLY keep PDF URLs that are the actual BZO ordinance (not articles/project docs)
    candidate_urls: List[str] = []
    for result in search_results or []:
        url_lower = result.url.lower()
        if not (url_lower.endswith(".pdf") or "/pdf" in url_lower):
            continue
        if _is_bzo_ordinance_result(result.url, result.title or "", gemeinde_normalized):
            candidate_urls.append(result.url)
    if search_results and not candidate_urls:
        logger.warning(
            f"No PDF URLs with matching Gemeinde name for {gemeinde.label} "
            f"(filtered {len(search_results)} results, requiring .pdf and name in URL/title)"
        )
        if not known_url:
            return False
    if known_url:
        candidate_urls.insert(0, known_url)
        logger.info(f"Using known BZO PDF URL for {gemeinde.label}")
    # Drop repeated URLs while keeping order, so the candidate limit is not wasted.
    pdf_urls = list(dict.fromkeys(candidate_urls))

    # Use ssl.CERT_NONE to avoid CERTIFICATE_VERIFY_FAILED on Windows/corporate environments
    # (same approach as routeRealEstate for external HTTP requests)
    # NOTE(security): this disables TLS verification, so downloads can be tampered
    # with in transit; only the "%PDF" magic bytes are checked. Review before
    # relying on the stored documents.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "application/pdf,*/*"}
    timeout = aiohttp.ClientTimeout(total=30)

    created_dokumente: List[Any] = []
    current_dokumente = list(dokumente)
    safe_name = "".join(c for c in gemeinde.label if c.isalnum() or c in (" ", "-", "_")).strip().replace(" ", "_") or "Gemeinde"
    if language == "de":
        base_label = f"BZO {gemeinde.label}"
    elif language == "fr":
        base_label = f"Plan d'aménagement local {gemeinde.label}"
    else:
        base_label = f"Piano di utilizzazione {gemeinde.label}"

    # Track content hashes to avoid duplicate PDFs from different URLs
    seen_content_hashes: Set[str] = set()

    async with aiohttp.ClientSession(timeout=timeout, headers=headers, connector=connector) as session:
        for pdf_url in pdf_urls[:_MAX_BZO_DOWNLOAD_CANDIDATES]:
            # Skip URL we already have
            if pdf_url in existing_quellen:
                logger.debug(f"Skipping duplicate URL for {gemeinde.label}: {pdf_url[:60]}...")
                continue
            try:
                pdf_content = await _download_bzo_pdf(session, pdf_url)
                if not pdf_content:
                    continue
                # Deduplicate by content hash (same PDF from different URLs)
                content_hash = hashlib.sha256(pdf_content).hexdigest()
                if content_hash in seen_content_hashes:
                    logger.debug(f"Skipping duplicate content for {gemeinde.label} (hash match)")
                    continue
                seen_content_hashes.add(content_hash)

                file_item = componentInterface.createFile(
                    name=f"BZO_{safe_name}.pdf", mimeType="application/pdf", content=pdf_content
                )
                componentInterface.createFileData(file_item.id, pdf_content)
                dokument = Dokument(
                    mandateId=mandateId,
                    featureInstanceId=instanceId,
                    label=base_label,
                    versionsbezeichnung="Aktuell",
                    dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL,
                    dokumentReferenz=file_item.id,
                    quelle=pdf_url,
                    mimeType="application/pdf",
                    kategorienTags=["BZO", "Bauordnung", gemeinde.label],
                )
                created_dok = interface.createDokument(dokument)
                created_dokumente.append(created_dok)
                current_dokumente.append(created_dok)
                existing_quellen.add(pdf_url)
                # Create at most 1 BZO document per Gemeinde to prevent duplicates
                logger.info(f"Created BZO document for {gemeinde.label}, stopping (1 doc per Gemeinde)")
                break
            except Exception as ex:
                # Best effort: a failing candidate must not abort the remaining ones.
                logger.warning(f"Error downloading BZO for {gemeinde.label} from {pdf_url}: {ex}")
                continue

    if created_dokumente:
        interface.updateGemeinde(gemeinde.id, {"dokumente": current_dokumente})
        logger.info(f"Created {len(created_dokumente)} BZO document(s) for {gemeinde.label}")
        return True
    return False
|