# gateway/modules/routes/routeRealEstateScraping.py

"""
Real Estate scraping routes for the backend API.
Implements endpoints for scraping real estate data from external sources.
"""
import logging
import json
import aiohttp
import asyncio
from typing import Optional, Dict, Any
from fastapi import APIRouter, HTTPException, Depends, Body, Request, Query, status
# Import auth modules
from modules.auth import limiter, getCurrentUser
# Import models
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelRealEstate import (
Gemeinde,
Kanton,
Dokument,
Kontext,
DokumentTyp,
)
# Import interfaces
from modules.interfaces.interfaceDbRealEstateObjects import getInterface as getRealEstateInterface
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
# Import scraping script
from modules.features.realEstate.scrapeSwissTopo import scrape_switzerland
# Import Swiss Topo MapServer connector
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.connectors.connectorOerebWfs import OerebWfsConnector
# Import Tavily connector for BZO document search
from modules.aicore.aicorePluginTavily import AiTavily
# Configure logger
logger = logging.getLogger(__name__)
# Create router for real estate scraping endpoints
router = APIRouter(
prefix="/api/realestate",
tags=["Real Estate Scraping"],
responses={
404: {"description": "Not found"},
400: {"description": "Bad request"},
401: {"description": "Unauthorized"},
403: {"description": "Forbidden"},
500: {"description": "Internal server error"}
}
)
@router.post("/scrape-switzerland", response_model=Dict[str, Any])
@limiter.limit("5/hour") # Limit to 5 requests per hour (scraping is resource-intensive)
async def scrape_switzerland_route(
request: Request,
body: Dict[str, Any] = Body(..., description="Scraping parameters"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
    Scrape Kanton Zürich systematically using the Swiss Topo connector and save parcel data to the database.
This endpoint divides Kanton Zürich into a grid and queries parcels at each grid point,
then deduplicates and saves unique parcels to the database. For each parcel, it also
queries the ÖREB WFS service to retrieve bauzone information.
**WARNING**: This is a resource-intensive operation that may take a long time
and make many API requests. Use with caution.
Request Body:
{
"grid_size": 500.0, // Grid cell size in meters (default: 500m)
"max_concurrent": 50, // Maximum concurrent API requests (default: 50)
"batch_size": 100 // Number of parcels to process before saving (default: 100)
}
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"success": true,
"stats": {
"total_queries": 1234,
"successful_queries": 1200,
"failed_queries": 34,
"unique_parcels_found": 500,
"parcels_saved": 450,
"parcels_skipped": 50,
"error_count": 5,
"errors": [...]
}
}
Example:
- POST /api/realestate/scrape-switzerland
Body: {"grid_size": 1000.0, "max_concurrent": 5, "batch_size": 50}
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for POST /api/realestate/scrape-switzerland from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for POST /api/realestate/scrape-switzerland from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/scrape-switzerland from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Extract parameters from body with defaults
grid_size = body.get("grid_size", 500.0)
max_concurrent = body.get("max_concurrent", 50)
batch_size = body.get("batch_size", 100)
# Validate parameters
if grid_size <= 0 or grid_size > 10000:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="grid_size must be between 0 and 10000 meters"
)
if max_concurrent <= 0 or max_concurrent > 200:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="max_concurrent must be between 1 and 200"
)
if batch_size <= 0 or batch_size > 1000:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="batch_size must be between 1 and 1000"
)
logger.info(
f"Starting Switzerland scraping for user {currentUser.id} (mandate: {currentUser.mandateId}) "
f"with grid_size={grid_size}, max_concurrent={max_concurrent}, batch_size={batch_size}"
)
# Run scraping operation
result = await scrape_switzerland(
current_user=currentUser,
grid_size=grid_size,
max_concurrent=max_concurrent,
batch_size=batch_size
)
logger.info(
f"Scraping completed for user {currentUser.id}: "
f"{result['stats']['parcels_saved']} parcels saved"
)
return result
except HTTPException:
raise
except ValueError as e:
logger.error(f"Validation error in scrape_switzerland_route: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Validation error: {str(e)}"
)
except Exception as e:
logger.error(f"Error scraping Switzerland: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error scraping Switzerland: {str(e)}"
)
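# Illustrative client sketch (assumption, not part of this module): how a caller might
# invoke the scrape endpoint above. Assumes the API is served locally, the optional
# `httpx` package is installed, and placeholder auth/CSRF header values; adapt to the
# actual deployment.
#
#   import asyncio
#   import httpx
#
#   async def example_scrape_switzerland() -> dict:
#       async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
#           response = await client.post(
#               "/api/realestate/scrape-switzerland",
#               json={"grid_size": 1000.0, "max_concurrent": 5, "batch_size": 50},
#               headers={
#                   "X-CSRF-Token": "<hex token>",
#                   "Authorization": "Bearer <jwt>",
#               },
#               timeout=None,  # the scrape can run for a long time
#           )
#           response.raise_for_status()
#           return response.json()
#
#   # asyncio.run(example_scrape_switzerland())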
@router.get("/gemeinden", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def get_all_gemeinden(
request: Request,
only_current: bool = Query(True, description="Only return current municipalities (exclude historical)"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Fetch all Gemeinden (municipalities) from the Swiss Topo MapServer connector
and save them to the database.
This endpoint:
1. Fetches all Swiss municipalities from the Swiss Federal Office of Topography
2. Saves them to the database (skipping duplicates based on BFS number)
3. Creates Kantone (cantons) as needed
4. Returns statistics about the import operation
Query Parameters:
- only_current: If True, only return current municipalities (default: True).
If False, return all municipalities including historical ones.
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"gemeinden": [
{
"id": "uuid",
"mandateId": "uuid",
"label": "Bern",
"id_kanton": "uuid",
"kontextInformationen": [...],
...
},
...
],
"count": 2162,
"stats": {
"gemeinden_created": 2100,
"gemeinden_skipped": 62,
"kantone_created": 26,
"error_count": 0,
"errors": []
}
}
Example:
- GET /api/realestate/gemeinden
- GET /api/realestate/gemeinden?only_current=false
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for GET /api/realestate/gemeinden from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for GET /api/realestate/gemeinden from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for GET /api/realestate/gemeinden from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Fetching all Gemeinden for user {currentUser.id} (mandate: {currentUser.mandateId}), only_current={only_current}")
# Initialize connectors and fetch all gemeinden
oereb_connector = OerebWfsConnector()
connector = SwissTopoMapServerConnector(oereb_connector=oereb_connector)
gemeinden_data = await connector.get_all_gemeinden(only_current=only_current)
# Get interface for database operations
realEstateInterface = getRealEstateInterface(currentUser)
# Statistics
gemeinden_created = 0
gemeinden_skipped = 0
kantone_created = 0
errors = []
# Cache for Kanton UUIDs
kanton_cache: Dict[str, str] = {}
# Helper function to find Gemeinde by BFS number
def find_gemeinde_by_bfs_nummer(bfs_nummer: str) -> Optional[Gemeinde]:
"""Find existing Gemeinde by BFS number (stored in kontextInformationen)."""
try:
gemeinden = realEstateInterface.getGemeinden(
recordFilter={"mandateId": currentUser.mandateId}
)
for gemeinde in gemeinden:
# Check kontextInformationen for bfs_nummer
for kontext in gemeinde.kontextInformationen:
try:
kontext_data = json.loads(kontext.inhalt) if isinstance(kontext.inhalt, str) else kontext.inhalt
if isinstance(kontext_data, dict):
if str(kontext_data.get("bfs_nummer")) == str(bfs_nummer):
return gemeinde
except (json.JSONDecodeError, AttributeError):
continue
return None
except Exception as e:
logger.error(f"Error finding Gemeinde by BFS number {bfs_nummer}: {e}", exc_info=True)
return None
# Helper function to get or create Kanton
def get_or_create_kanton(kanton_abk: str) -> Optional[str]:
"""Get or create a Kanton by abbreviation."""
nonlocal kantone_created, errors
if not kanton_abk:
return None
# Check cache first
if kanton_abk in kanton_cache:
return kanton_cache[kanton_abk]
# Check if exists
kantone = realEstateInterface.getKantone(
recordFilter={
"mandateId": currentUser.mandateId,
"abk": kanton_abk
}
)
if kantone:
kanton_cache[kanton_abk] = kantone[0].id
return kantone[0].id
# Create new Kanton
try:
                # Map canton abbreviations to their full German names
kanton_names = {
"AG": "Aargau", "AI": "Appenzell Innerrhoden", "AR": "Appenzell Ausserrhoden",
"BE": "Bern", "BL": "Basel-Landschaft", "BS": "Basel-Stadt",
"FR": "Freiburg", "GE": "Genf", "GL": "Glarus", "GR": "Graubünden",
"JU": "Jura", "LU": "Luzern", "NE": "Neuenburg", "NW": "Nidwalden",
"OW": "Obwalden", "SG": "St. Gallen", "SH": "Schaffhausen", "SO": "Solothurn",
"SZ": "Schwyz", "TG": "Thurgau", "TI": "Tessin", "UR": "Uri",
"VD": "Waadt", "VS": "Wallis", "ZG": "Zug", "ZH": "Zürich"
}
kanton_label = kanton_names.get(kanton_abk, kanton_abk)
kanton = Kanton(
mandateId=currentUser.mandateId,
label=kanton_label,
abk=kanton_abk
)
                created_kanton = realEstateInterface.createKanton(kanton)
                if created_kanton and created_kanton.id:
                    kanton_cache[kanton_abk] = created_kanton.id
                    kantone_created += 1
                    logger.info(f"Created new Kanton: {kanton_label} ({kanton_abk})")
                    return created_kanton.id
                # Creation returned no usable record; report it instead of failing silently
                error_msg = f"Failed to create Kanton {kanton_abk}"
                logger.error(error_msg)
                errors.append(error_msg)
                return None
except Exception as e:
error_msg = f"Error creating Kanton {kanton_abk}: {e}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None
# Process each gemeinde and save to database
saved_gemeinden = []
for gemeinde_data in gemeinden_data:
try:
gemeinde_name = gemeinde_data.get("name")
bfs_nummer = gemeinde_data.get("bfs_nummer")
kanton_abk = gemeinde_data.get("kanton")
if not gemeinde_name or not bfs_nummer:
logger.warning(f"Skipping Gemeinde with missing data: {gemeinde_data}")
gemeinden_skipped += 1
continue
# Check if Gemeinde already exists
existing_gemeinde = find_gemeinde_by_bfs_nummer(str(bfs_nummer))
if existing_gemeinde:
logger.debug(f"Gemeinde {gemeinde_name} (BFS: {bfs_nummer}) already exists, skipping")
gemeinden_skipped += 1
saved_gemeinden.append(existing_gemeinde.model_dump() if hasattr(existing_gemeinde, 'model_dump') else existing_gemeinde)
continue
# Get or create Kanton
kanton_id = get_or_create_kanton(kanton_abk) if kanton_abk else None
# Create new Gemeinde
gemeinde = Gemeinde(
mandateId=currentUser.mandateId,
label=gemeinde_name,
id_kanton=kanton_id,
kontextInformationen=[
Kontext(
thema="BFS Nummer",
inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False)
)
]
)
created_gemeinde = realEstateInterface.createGemeinde(gemeinde)
if created_gemeinde and created_gemeinde.id:
gemeinden_created += 1
logger.info(f"Created new Gemeinde: {gemeinde_name} (BFS: {bfs_nummer})")
saved_gemeinden.append(created_gemeinde.model_dump() if hasattr(created_gemeinde, 'model_dump') else created_gemeinde)
else:
error_msg = f"Failed to create Gemeinde {gemeinde_name} (BFS: {bfs_nummer})"
logger.error(error_msg)
errors.append(error_msg)
gemeinden_skipped += 1
except Exception as e:
error_msg = f"Error processing Gemeinde {gemeinde_data.get('name', 'Unknown')}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
gemeinden_skipped += 1
logger.info(
f"Gemeinden import completed: {gemeinden_created} created, "
f"{gemeinden_skipped} skipped, {kantone_created} Kantone created"
)
return {
"gemeinden": saved_gemeinden,
"count": len(saved_gemeinden),
"stats": {
"gemeinden_created": gemeinden_created,
"gemeinden_skipped": gemeinden_skipped,
"kantone_created": kantone_created,
"error_count": len(errors),
"errors": errors[:10] # Return first 10 errors
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching all Gemeinden: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error fetching Gemeinden: {str(e)}"
)
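# Illustrative client sketch (assumption, not part of this module): triggering the
# Gemeinden import above with the optional `httpx` package against a locally served
# API, with placeholder auth/CSRF header values. Each created Gemeinde stores its BFS
# number as JSON in kontextInformationen (thema "BFS Nummer", inhalt like
# {"bfs_nummer": ...}), which is how duplicates are detected on re-runs.
#
#   import httpx
#
#   def example_import_gemeinden(only_current: bool = True) -> dict:
#       response = httpx.get(
#           "http://localhost:8000/api/realestate/gemeinden",
#           params={"only_current": str(only_current).lower()},
#           headers={"X-CSRF-Token": "<hex token>", "Authorization": "Bearer <jwt>"},
#           timeout=600.0,  # the import can take several minutes
#       )
#       response.raise_for_status()
#       return response.json()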
def _get_language_from_kanton(kanton_abk: Optional[str]) -> str:
"""
Determine language (German/French/Italian) based on Kanton abbreviation.
Args:
kanton_abk: Kanton abbreviation (e.g., 'ZH', 'VD', 'TI')
Returns:
Language code: 'de' (German), 'fr' (French), or 'it' (Italian)
"""
if not kanton_abk:
return 'de' # Default to German
# French-speaking cantons
french_cantons = {'VD', 'GE', 'NE', 'JU'}
# Italian-speaking canton
italian_cantons = {'TI'}
kanton_upper = kanton_abk.upper()
if kanton_upper in french_cantons:
return 'fr'
elif kanton_upper in italian_cantons:
return 'it'
else:
return 'de' # Default to German
def _get_bzo_search_query(gemeinde_label: str, language: str) -> str:
"""
Generate language-specific BZO search query for a Gemeinde.
Args:
gemeinde_label: Name of the Gemeinde
language: Language code ('de', 'fr', 'it')
Returns:
Search query string
"""
if language == 'fr':
# French: Plan d'aménagement local or Règlement de construction
return f"Plan d'aménagement local {gemeinde_label} OR Règlement de construction {gemeinde_label}"
elif language == 'it':
# Italian: Piano di utilizzazione or Regolamento edilizio
return f"Piano di utilizzazione {gemeinde_label} OR Regolamento edilizio {gemeinde_label}"
else:
        # German: Bau- und Zonenordnung
        return f"Bau- und Zonenordnung {gemeinde_label}"
@router.post("/gemeinden/fetch-bzo-documents", response_model=Dict[str, Any])
@limiter.limit("10/hour") # Resource-intensive operation
async def fetch_bzo_documents(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
    Search for and download Bau- und Zonenordnung (BZO) documents for all Gemeinden.
This endpoint:
1. Fetches all Gemeinden from the database
2. For each Gemeinde, determines language based on Kanton
3. Uses Tavily search to find BZO documents (up to 5 results)
4. Downloads all PDF files found and stores them with content
5. Creates Dokument records for each PDF and links them to Gemeinde's dokumente field
6. Skips Gemeinden that already have BZO documents
Note: If Tavily returns multiple PDF results, all of them will be downloaded
and saved as separate Dokument records.
Headers:
- X-CSRF-Token: CSRF token (required for security)
Returns:
{
"success": true,
"stats": {
"gemeinden_processed": 100,
"documents_created": 85,
"documents_skipped": 15,
"errors": []
},
"results": [
{
"gemeinde_id": "...",
"gemeinde_label": "Zürich",
"status": "created|skipped|error",
"dokument_ids": ["...", "..."], // List of created document IDs (can be multiple)
"error": null
}
]
}
"""
try:
# Validate CSRF token
csrf_token = request.headers.get("X-CSRF-Token") or request.headers.get("x-csrf-token")
if not csrf_token:
logger.warning(f"CSRF token missing for POST /api/realestate/gemeinden/fetch-bzo-documents from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="CSRF token missing. Please include X-CSRF-Token header."
)
# Basic CSRF token format validation
if not isinstance(csrf_token, str) or len(csrf_token) < 16 or len(csrf_token) > 64:
logger.warning(f"Invalid CSRF token format for POST /api/realestate/gemeinden/fetch-bzo-documents from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
# Validate token is hex string
try:
int(csrf_token, 16)
except ValueError:
logger.warning(f"CSRF token is not a valid hex string for POST /api/realestate/gemeinden/fetch-bzo-documents from user {currentUser.id}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid CSRF token format"
)
logger.info(f"Starting BZO document fetch for user {currentUser.id} (mandate: {currentUser.mandateId})")
# Get interfaces
realEstateInterface = getRealEstateInterface(currentUser)
componentInterface = getComponentInterface(currentUser)
# Initialize Tavily connector
tavily = AiTavily()
# Get all Gemeinden
gemeinden = realEstateInterface.getGemeinden(
recordFilter={"mandateId": currentUser.mandateId}
)
logger.info(f"Found {len(gemeinden)} Gemeinden to process")
# Statistics
stats = {
"gemeinden_processed": 0,
"documents_created": 0,
"documents_skipped": 0,
"errors": []
}
results = []
# Process each Gemeinde
for gemeinde in gemeinden:
gemeinde_result = {
"gemeinde_id": gemeinde.id,
"gemeinde_label": gemeinde.label,
"status": None,
"dokument_ids": [], # Changed to list to support multiple documents
"error": None
}
try:
stats["gemeinden_processed"] += 1
# Check if Gemeinde already has a BZO document
existing_bzo = False
if gemeinde.dokumente:
for doc in gemeinde.dokumente:
# Check if it's a BZO document by label or dokumentTyp
if (doc.label and ("BZO" in doc.label.upper() or "BAU UND ZONENORDNUNG" in doc.label.upper() or
"PLAN D'AMÉNAGEMENT" in doc.label.upper() or "RÈGLEMENT DE CONSTRUCTION" in doc.label.upper() or
"PIANO DI UTILIZZAZIONE" in doc.label.upper() or "REGOLAMENTO EDILIZIO" in doc.label.upper())) or \
(doc.dokumentTyp and doc.dokumentTyp in [DokumentTyp.GEMEINDE_BZO_AKTUELL, DokumentTyp.GEMEINDE_BZO_REVISION]):
existing_bzo = True
break
if existing_bzo:
logger.debug(f"Gemeinde {gemeinde.label} already has BZO document, skipping")
gemeinde_result["status"] = "skipped"
stats["documents_skipped"] += 1
results.append(gemeinde_result)
continue
# Get Kanton to determine language
kanton_abk = None
if gemeinde.id_kanton:
kanton = realEstateInterface.getKanton(gemeinde.id_kanton)
if kanton:
kanton_abk = kanton.abk
# Determine language
language = _get_language_from_kanton(kanton_abk)
# Generate search query
search_query = _get_bzo_search_query(gemeinde.label, language)
logger.info(f"Searching for BZO document for {gemeinde.label} (language: {language}) with query: {search_query}")
# Search with Tavily using the private _search method
search_results = await tavily._search(
query=search_query,
maxResults=5,
country="switzerland"
)
if not search_results:
logger.warning(f"No search results found for {gemeinde.label}")
gemeinde_result["status"] = "error"
gemeinde_result["error"] = "No search results found"
stats["errors"].append(f"{gemeinde.label}: No search results found")
results.append(gemeinde_result)
continue
                # Collect candidate PDF URLs from the search results
                pdf_urls = []
                for result in search_results:
                    if 'pdf' in result.url.lower():
                        pdf_urls.append(result.url)
# If no PDF URLs found, try to use all results (they might be PDFs even without .pdf extension)
if not pdf_urls:
pdf_urls = [result.url for result in search_results]
logger.info(f"No explicit PDF URLs found for {gemeinde.label}, trying all {len(pdf_urls)} results")
logger.info(f"Found {len(pdf_urls)} potential PDF documents for {gemeinde.label}")
# Helper function to download a single PDF
async def download_pdf(pdf_url: str) -> Optional[bytes]:
"""Download a PDF from a URL with retry logic."""
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
# Create headers - use minimal headers on retry after 406 error
if attempt > 0:
# Minimal headers for retry
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': '*/*'
}
else:
# Full headers for first attempt
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'application/pdf,application/octet-stream,*/*',
'Accept-Language': 'de-DE,de;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
timeout = aiohttp.ClientTimeout(total=30, connect=10)
async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
async with session.get(pdf_url, allow_redirects=True) as response:
if response.status == 200:
pdf_content = await response.read()
if not pdf_content or len(pdf_content) < 100: # Minimum size check
raise Exception("Downloaded file is too small or empty")
# Verify it's actually a PDF (check PDF magic bytes)
if not pdf_content.startswith(b'%PDF'):
# Check if it's HTML (common error page)
if pdf_content.startswith(b'<') or pdf_content.startswith(b'<!DOCTYPE'):
raise Exception("Server returned HTML instead of PDF")
logger.warning(f"Downloaded file from {pdf_url} doesn't appear to be a PDF, but continuing anyway")
# Success - return content
return pdf_content
elif response.status == 406:
# Not Acceptable - try with minimal headers on next attempt
logger.warning(f"HTTP 406 for {pdf_url}, will retry with minimal headers (attempt {attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
else:
raise Exception(f"HTTP {response.status} (Not Acceptable) - server rejected request after {max_retries} attempts")
                                    elif response.status in [301, 302, 303, 307, 308]:
                                        # Unresolved redirect despite allow_redirects=True (e.g. redirect loop);
                                        # the response body is not a PDF, so treat it as a failure
                                        raise Exception(f"HTTP {response.status} redirect could not be followed")
else:
raise Exception(f"HTTP {response.status} when downloading PDF")
except asyncio.TimeoutError:
logger.warning(f"Timeout downloading PDF from {pdf_url} (attempt {attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
else:
raise Exception("Connection timeout after retries")
except aiohttp.ClientError as e:
logger.warning(f"Connection error downloading PDF from {pdf_url} (attempt {attempt + 1}/{max_retries}): {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
else:
raise Exception(f"Connection error: {str(e)}")
except Exception as e:
# For other errors, don't retry
raise
return None
# Process all PDF URLs
created_dokumente = []
current_dokumente = list(gemeinde.dokumente) if gemeinde.dokumente else []
# Sanitize Gemeinde name for filename
safe_name = "".join(c for c in gemeinde.label if c.isalnum() or c in (' ', '-', '_')).strip()
safe_name = safe_name.replace(' ', '_')
if not safe_name:
safe_name = "Gemeinde"
# Determine base label based on language
if language == 'fr':
base_doc_label = f"Plan d'aménagement local {gemeinde.label}"
elif language == 'it':
base_doc_label = f"Piano di utilizzazione {gemeinde.label}"
else:
base_doc_label = f"BZO {gemeinde.label}"
# Process each PDF URL
for idx, pdf_url in enumerate(pdf_urls):
try:
logger.info(f"Downloading PDF {idx + 1}/{len(pdf_urls)} from {pdf_url} for {gemeinde.label}")
pdf_content = await download_pdf(pdf_url)
if not pdf_content:
logger.warning(f"Failed to download PDF from {pdf_url} for {gemeinde.label}")
continue
# Additional validation
if len(pdf_content) < 100:
logger.warning(f"Downloaded file from {pdf_url} is too small for {gemeinde.label}")
continue
# Create unique file name (add index if multiple documents)
if len(pdf_urls) > 1:
file_name = f"BZO_{safe_name}_{idx + 1}.pdf"
doc_label = f"{base_doc_label} ({idx + 1})"
else:
file_name = f"BZO_{safe_name}.pdf"
doc_label = base_doc_label
# Store file using ComponentObjects
try:
file_item = componentInterface.createFile(
name=file_name,
mimeType="application/pdf",
content=pdf_content
)
# Store file data
componentInterface.createFileData(file_item.id, pdf_content)
logger.info(f"Stored file {file_name} with ID {file_item.id} for {gemeinde.label}")
except Exception as e:
logger.error(f"Error storing file {file_name} for {gemeinde.label}: {str(e)}", exc_info=True)
stats["errors"].append(f"{gemeinde.label}: File storage failed for {pdf_url} - {str(e)}")
continue
# Create Dokument record
dokument = Dokument(
mandateId=currentUser.mandateId,
label=doc_label,
versionsbezeichnung="Aktuell",
dokumentTyp=DokumentTyp.GEMEINDE_BZO_AKTUELL,
dokumentReferenz=file_item.id, # FileId from ComponentObjects
quelle=pdf_url, # Original URL
mimeType="application/pdf",
kategorienTags=["BZO", "Bauordnung", gemeinde.label]
)
# Create Dokument record in the Dokument table
created_dokument = realEstateInterface.createDokument(dokument)
logger.info(f"Created Dokument record with ID {created_dokument.id} for {gemeinde.label} (from {pdf_url})")
created_dokumente.append(created_dokument)
current_dokumente.append(created_dokument)
gemeinde_result["dokument_ids"].append(created_dokument.id)
except Exception as e:
logger.error(f"Error processing PDF {pdf_url} for {gemeinde.label}: {str(e)}", exc_info=True)
stats["errors"].append(f"{gemeinde.label}: Error processing PDF {pdf_url} - {str(e)}")
continue
# Update Gemeinde with all new dokumente
if created_dokumente:
updated_gemeinde = realEstateInterface.updateGemeinde(
gemeinde.id,
{"dokumente": current_dokumente}
)
if updated_gemeinde:
logger.info(f"Successfully created {len(created_dokumente)} BZO document(s) for {gemeinde.label}")
gemeinde_result["status"] = "created"
stats["documents_created"] += len(created_dokumente)
else:
raise Exception("Failed to update Gemeinde")
else:
# No documents were successfully created
gemeinde_result["status"] = "error"
gemeinde_result["error"] = "No PDFs could be downloaded or processed"
stats["errors"].append(f"{gemeinde.label}: No PDFs could be downloaded or processed")
except Exception as e:
logger.error(f"Error processing Gemeinde {gemeinde.label}: {str(e)}", exc_info=True)
gemeinde_result["status"] = "error"
gemeinde_result["error"] = str(e)
stats["errors"].append(f"{gemeinde.label}: {str(e)}")
results.append(gemeinde_result)
logger.info(
f"BZO document fetch completed: {stats['documents_created']} created, "
f"{stats['documents_skipped']} skipped, {len(stats['errors'])} errors"
)
return {
"success": True,
"stats": stats,
"results": results
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching BZO documents: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error fetching BZO documents: {str(e)}"
)
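# Illustrative client sketch (assumption, not part of this module): triggering the
# BZO document fetch above with the optional `httpx` package and placeholder auth/CSRF
# header values. The endpoint iterates over every Gemeinde and downloads PDFs, so
# allow a generous (or unlimited) timeout.
#
#   import httpx
#
#   def example_fetch_bzo_documents() -> dict:
#       response = httpx.post(
#           "http://localhost:8000/api/realestate/gemeinden/fetch-bzo-documents",
#           headers={"X-CSRF-Token": "<hex token>", "Authorization": "Bearer <jwt>"},
#           timeout=None,
#       )
#       response.raise_for_status()
#       return response.json()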