"""
Swiss Topo Scraping Script

Scrapes Switzerland systematically using the Swiss Topo connector and saves
parcel data to the database.

This script divides Switzerland into a grid and queries parcels at each grid
point, then deduplicates and saves unique parcels to the database.
"""

import logging
import asyncio
from typing import Dict, Any, List, Set, Optional, Callable
from dataclasses import dataclass, field
import json

from modules.datamodels.datamodelUam import User
from .datamodelFeatureRealEstate import (
    Parzelle,
    GeoPolylinie,
    GeoPunkt,
    Kontext,
    Gemeinde,
    Kanton,
)
from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface
from modules.connectors.connectorSwissTopoMapServer import SwissTopoMapServerConnector
from modules.connectors.connectorOerebWfs import OerebWfsConnector

logger = logging.getLogger(__name__)


@dataclass
class ScrapingStats:
    """Statistics for scraping operation."""

    total_queries: int = 0
    successful_queries: int = 0
    failed_queries: int = 0
    unique_parcels_found: int = 0
    parcels_saved: int = 0
    parcels_skipped: int = 0
    # default_factory avoids the mutable-default pitfall (previously `= None`
    # patched up in __post_init__).
    errors: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Kept for backward compatibility with callers that explicitly pass
        # errors=None.
        if self.errors is None:
            self.errors = []


class SwissTopoScraper:
    """
    Scraper for Swiss Topo parcel data.

    Divides Kanton Zürich into a grid and queries parcels systematically,
    then saves unique parcels to the database with ÖREB bauzone information.
    """

    # Zürich canton bounds in LV95 coordinates
    ZURICH_BOUNDS = {
        "min_x": 2680000,
        "max_x": 2780000,
        "min_y": 1210000,
        "max_y": 1280000,
    }

    def __init__(
        self,
        current_user: User,
        grid_size: float = 500.0,  # Grid size in meters (500m = reasonable coverage)
        max_concurrent: int = 50,  # Maximum concurrent API requests
        batch_size: int = 100,  # Process parcels in batches
    ):
        """
        Initialize scraper.

        Args:
            current_user: User for database operations
            grid_size: Size of grid cells in meters (default: 500m)
            max_concurrent: Maximum concurrent API requests (default: 50)
            batch_size: Number of parcels to process before saving (default: 100)
        """
        self.current_user = current_user
        self.grid_size = grid_size
        self.max_concurrent = max_concurrent
        self.batch_size = batch_size
        oereb_connector = OerebWfsConnector()
        self.connector = SwissTopoMapServerConnector(oereb_connector=oereb_connector)
        self.realEstateInterface = getRealEstateInterface(current_user)
        # Track unique parcels by EGRID or label
        self.unique_parcels: Dict[str, Dict[str, Any]] = {}
        self.stats = ScrapingStats()
        # Cache for Gemeinde and Kanton UUIDs to avoid repeated database queries
        # Key: bfs_nummer (int or str), Value: UUID (str)
        self.gemeinde_cache: Dict[str, str] = {}
        # Key: kanton abbreviation (str, e.g., "ZH"), Value: UUID (str)
        self.kanton_cache: Dict[str, str] = {}

    def _generate_grid_points(self) -> List[tuple]:
        """
        Generate grid points covering Kanton Zürich.

        Returns:
            List of (x, y) coordinate tuples in LV95
        """
        bounds = self.ZURICH_BOUNDS
        min_x = bounds["min_x"]
        max_x = bounds["max_x"]
        min_y = bounds["min_y"]
        max_y = bounds["max_y"]

        grid_points = []
        x = min_x
        while x <= max_x:
            y = min_y
            while y <= max_y:
                grid_points.append((x, y))
                y += self.grid_size
            x += self.grid_size

        logger.info(f"Generated {len(grid_points)} grid points covering Kanton Zürich")
        return grid_points

    async def _query_parcel_at_point(
        self, x: float, y: float, semaphore: asyncio.Semaphore
    ) -> Optional[Dict[str, Any]]:
        """
        Query parcel at a specific coordinate point.

        Args:
            x: X coordinate (LV95)
            y: Y coordinate (LV95)
            semaphore: Semaphore for concurrency control

        Returns:
            Parcel data dictionary or None if not found
        """
        async with semaphore:
            try:
                self.stats.total_queries += 1
                location_str = f"{x},{y}"
                parcel_data = await self.connector.search_parcel(location_str, tolerance=5)
                if parcel_data:
                    self.stats.successful_queries += 1
                    return parcel_data
                else:
                    self.stats.failed_queries += 1
                    return None
            except Exception as e:
                self.stats.failed_queries += 1
                error_msg = f"Error querying parcel at ({x}, {y}): {str(e)}"
                logger.debug(error_msg)
                self.stats.errors.append(error_msg)
                return None

    def _find_gemeinde_by_bfs_nummer(self, bfs_nummer: str) -> Optional[Gemeinde]:
        """
        Find existing Gemeinde by BFS number (stored in kontextInformationen).

        Args:
            bfs_nummer: BFS municipality number

        Returns:
            Gemeinde instance if found, None otherwise
        """
        try:
            # Check cache first
            if bfs_nummer in self.gemeinde_cache:
                gemeinde_id = self.gemeinde_cache[bfs_nummer]
                gemeinde = self.realEstateInterface.getGemeinde(gemeinde_id)
                if gemeinde:
                    return gemeinde

            # Search all Gemeinden and check kontextInformationen
            gemeinden = self.realEstateInterface.getGemeinden(
                recordFilter={"mandateId": self.current_user.mandateId}
            )

            for gemeinde in gemeinden:
                # Check kontextInformationen for bfs_nummer
                for kontext in gemeinde.kontextInformationen:
                    try:
                        kontext_data = (
                            json.loads(kontext.inhalt)
                            if isinstance(kontext.inhalt, str)
                            else kontext.inhalt
                        )
                        if isinstance(kontext_data, dict):
                            if str(kontext_data.get("bfs_nummer")) == str(bfs_nummer):
                                # Cache the result
                                self.gemeinde_cache[bfs_nummer] = gemeinde.id
                                return gemeinde
                    except (json.JSONDecodeError, AttributeError):
                        continue

            return None
        except Exception as e:
            logger.error(f"Error finding Gemeinde by BFS number {bfs_nummer}: {e}", exc_info=True)
            return None

    def _find_kanton_by_abbreviation(self, abk: str) -> Optional[Kanton]:
        """
        Find existing Kanton by abbreviation.

        Args:
            abk: Canton abbreviation (e.g., "BE", "ZH")

        Returns:
            Kanton instance if found, None otherwise
        """
        try:
            # Check cache first
            if abk in self.kanton_cache:
                kanton_id = self.kanton_cache[abk]
                kanton = self.realEstateInterface.getKanton(kanton_id)
                if kanton:
                    return kanton

            # Search by abbreviation
            kantone = self.realEstateInterface.getKantone(
                recordFilter={
                    "mandateId": self.current_user.mandateId,
                    "abk": abk
                }
            )

            if kantone:
                kanton = kantone[0]
                # Cache the result
                self.kanton_cache[abk] = kanton.id
                return kanton

            return None
        except Exception as e:
            logger.error(f"Error finding Kanton by abbreviation {abk}: {e}", exc_info=True)
            return None

    def _get_or_create_kanton(self, kanton_abk: str) -> Optional[str]:
        """
        Get or create a Kanton by abbreviation.

        Args:
            kanton_abk: Canton abbreviation (e.g., "BE", "ZH")

        Returns:
            UUID of the Kanton, or None if creation failed
        """
        if not kanton_abk:
            return None

        # Check if exists
        existing_kanton = self._find_kanton_by_abbreviation(kanton_abk)
        if existing_kanton:
            return existing_kanton.id

        # Create new Kanton
        try:
            # Map common abbreviations to full names (fallback to abbreviation if not found)
            kanton_names = {
                "AG": "Aargau",
                "AI": "Appenzell Innerrhoden",
                "AR": "Appenzell Ausserrhoden",
                "BE": "Bern",
                "BL": "Basel-Landschaft",
                "BS": "Basel-Stadt",
                "FR": "Freiburg",
                "GE": "Genf",
                "GL": "Glarus",
                "GR": "Graubünden",
                "JU": "Jura",
                "LU": "Luzern",
                "NE": "Neuenburg",
                "NW": "Nidwalden",
                "OW": "Obwalden",
                "SG": "St. Gallen",
                "SH": "Schaffhausen",
                "SO": "Solothurn",
                "SZ": "Schwyz",
                "TG": "Thurgau",
                "TI": "Tessin",
                "UR": "Uri",
                "VD": "Waadt",
                "VS": "Wallis",
                "ZG": "Zug",
                "ZH": "Zürich"
            }
            kanton_label = kanton_names.get(kanton_abk, kanton_abk)

            kanton = Kanton(
                mandateId=self.current_user.mandateId,
                label=kanton_label,
                abk=kanton_abk
            )

            created_kanton = self.realEstateInterface.createKanton(kanton)
            if created_kanton and created_kanton.id:
                # Cache the result
                self.kanton_cache[kanton_abk] = created_kanton.id
                logger.info(f"Created new Kanton: {kanton_label} ({kanton_abk})")
                return created_kanton.id
        except Exception as e:
            logger.error(f"Error creating Kanton {kanton_abk}: {e}", exc_info=True)

        return None

    def _get_or_create_gemeinde(
        self, gemeinde_name: str, bfs_nummer: str, kanton_abk: str
    ) -> Optional[str]:
        """
        Get or create a Gemeinde by BFS number.

        Args:
            gemeinde_name: Municipality name
            bfs_nummer: BFS municipality number
            kanton_abk: Canton abbreviation

        Returns:
            UUID of the Gemeinde, or None if creation failed
        """
        if not gemeinde_name or not bfs_nummer:
            return None

        # Check if exists
        existing_gemeinde = self._find_gemeinde_by_bfs_nummer(bfs_nummer)
        if existing_gemeinde:
            return existing_gemeinde.id

        # Get or create Kanton first
        kanton_id = self._get_or_create_kanton(kanton_abk)

        # Create new Gemeinde
        try:
            gemeinde = Gemeinde(
                mandateId=self.current_user.mandateId,
                label=gemeinde_name,
                id_kanton=kanton_id,
                kontextInformationen=[
                    Kontext(
                        thema="BFS Nummer",
                        inhalt=json.dumps({"bfs_nummer": bfs_nummer}, ensure_ascii=False)
                    )
                ]
            )

            created_gemeinde = self.realEstateInterface.createGemeinde(gemeinde)
            if created_gemeinde and created_gemeinde.id:
                # Cache the result
                self.gemeinde_cache[bfs_nummer] = created_gemeinde.id
                logger.info(f"Created new Gemeinde: {gemeinde_name} (BFS: {bfs_nummer})")
                return created_gemeinde.id
        except Exception as e:
            logger.error(
                f"Error creating Gemeinde {gemeinde_name} (BFS: {bfs_nummer}): {e}",
                exc_info=True
            )

        return None

    def _extract_parcel_identifier(self, parcel_data: Dict[str, Any]) -> Optional[str]:
        """
        Extract unique identifier for a parcel.

        Args:
            parcel_data: Parcel data from Swiss Topo

        Returns:
            Unique identifier (EGRID or label) or None
        """
        attributes = parcel_data.get("attributes", {})

        # Prefer EGRID as it's globally unique
        egrid = attributes.get("egris_egrid")
        if egrid:
            return f"egrid:{egrid}"

        # Fallback to label + municipality code
        label = attributes.get("label") or attributes.get("number")
        bfsnr = attributes.get("bfsnr")
        if label and bfsnr:
            return f"label:{bfsnr}:{label}"

        # Last resort: just label
        if label:
            return f"label:{label}"

        return None

    def _convert_to_parzelle_model(
        self,
        parcel_data: Dict[str, Any],
        parcel_id: str,
        gemeinde_id: Optional[str] = None
    ) -> Optional[Parzelle]:
        """
        Convert Swiss Topo parcel data to Parzelle model.

        Args:
            parcel_data: Raw parcel data from Swiss Topo
            parcel_id: Unique parcel identifier
            gemeinde_id: UUID of the Gemeinde (if already resolved)

        Returns:
            Parzelle model instance or None if conversion fails
        """
        try:
            attributes = parcel_data.get("attributes", {})

            # Extract attributes
            extracted_attrs = self.connector.extract_parcel_attributes(parcel_data)

            # Get geocoded address if available
            geocoded_address = parcel_data.get('geocoded_address', {})

            # Get bauzone - prefer ÖREB bauzone if available (for Zürich),
            # otherwise use extracted_attrs
            bauzone = parcel_data.get("oereb_bauzone") or extracted_attrs.get("bauzone")

            # Build Parzelle data
            parzelle_data = {
                "mandateId": self.current_user.mandateId,
                "label": extracted_attrs.get("label") or attributes.get("number") or f"Parcel-{parcel_id}",
                "parzellenAliasTags": [attributes.get("egris_egrid")] if attributes.get("egris_egrid") else [],
                "strasseNr": geocoded_address.get("full_address") or extracted_attrs.get("strasseNr"),
                "plz": geocoded_address.get("plz") or extracted_attrs.get("plz"),
                "eigentuemerschaft": extracted_attrs.get("eigentuemerschaft"),
                "bauzone": bauzone,
                "perimeter": extracted_attrs.get("perimeter"),
                "baulinie": None,
                "kontextGemeinde": gemeinde_id,  # Use UUID reference instead of name
                "az": None,
                "bz": None,
                "vollgeschossZahl": None,
                "anrechenbarDachgeschoss": None,
                "anrechenbarUntergeschoss": None,
                "gebaeudehoeheMax": None,
                "regelnGrenzabstand": [],
                "regelnMehrlaengenzuschlag": [],
                "regelnMehrhoehenzuschlag": [],
                "parzelleBebaut": None,
                "parzelleErschlossen": None,
                "parzelleHanglage": None,
                "laermschutzzone": None,
                "hochwasserschutzzone": None,
                "grundwasserschutzzone": None,
                "parzellenNachbarschaft": [],
                "dokumente": [],
                "kontextInformationen": [
                    Kontext(
                        thema="Swiss Topo Scraping",
                        inhalt=json.dumps({
                            "egrid": attributes.get("egris_egrid"),
                            "identnd": attributes.get("identnd"),
                            "canton": attributes.get("ak"),
                            "municipality_code": attributes.get("bfsnr"),
                            "scraped_coordinates": parcel_data.get('query_coordinates', {}),
                            "source": "swiss_topo_scraping",
                            "bauzone_source": "oereb_wfs" if parcel_data.get("oereb_bauzone") else "swiss_topo"
                        }, ensure_ascii=False)
                    )
                ]
            }

            # Create Parzelle instance
            parzelle_instance = Parzelle(**parzelle_data)
            return parzelle_instance

        except Exception as e:
            logger.error(f"Error converting parcel {parcel_id} to Parzelle model: {e}", exc_info=True)
            return None

    async def _save_parcels_batch(self, parcels: List[Dict[str, Any]]) -> int:
        """
        Save a batch of parcels to the database.

        This method handles the sorting algorithm:
        - For each parcel, extracts gemeinde_information
        - Checks if Gemeinde exists (by bfs_nummer), creates if not
        - Checks if Kanton exists (by abbreviation), creates if not
        - Links Parzelle to Gemeinde via UUID reference

        Args:
            parcels: List of parcel data dictionaries

        Returns:
            Number of parcels successfully saved
        """
        saved_count = 0

        for parcel_data in parcels:
            try:
                parcel_id = self._extract_parcel_identifier(parcel_data)
                if not parcel_id:
                    logger.warning("Could not extract parcel identifier, skipping")
                    self.stats.parcels_skipped += 1
                    continue

                # Check if parcel already exists in our unique set
                if parcel_id in self.unique_parcels:
                    logger.debug(f"Parcel {parcel_id} already found, skipping duplicate")
                    self.stats.parcels_skipped += 1
                    continue

                # Extract gemeinde_information from parcel_data
                # The connector returns it as 'gemeinde_info' (not 'gemeinde_information')
                gemeinde_info = parcel_data.get('gemeinde_info') or parcel_data.get('gemeinde_information')
                gemeinde_id = None
                kanton_abk = None

                if gemeinde_info:
                    gemeinde_name = gemeinde_info.get('name')
                    bfs_nummer = gemeinde_info.get('bfs_nummer')
                    kanton_abk = gemeinde_info.get('kanton')

                    # Skip parcels not from Zürich (safety filter)
                    if kanton_abk and kanton_abk.upper() != "ZH":
                        logger.debug(f"Skipping parcel {parcel_id} from canton {kanton_abk} (only Zürich parcels are processed)")
                        self.stats.parcels_skipped += 1
                        continue

                    if gemeinde_name and bfs_nummer:
                        # Get or create Gemeinde (this also handles Kanton creation)
                        gemeinde_id = self._get_or_create_gemeinde(
                            gemeinde_name=gemeinde_name,
                            bfs_nummer=str(bfs_nummer),
                            kanton_abk=kanton_abk
                        )
                        if not gemeinde_id:
                            logger.warning(
                                f"Could not get or create Gemeinde for parcel {parcel_id}: "
                                f"name={gemeinde_name}, bfs_nummer={bfs_nummer}"
                            )
                    else:
                        logger.debug(
                            f"Missing Gemeinde info for parcel {parcel_id}: "
                            f"name={gemeinde_name}, bfs_nummer={bfs_nummer}"
                        )
                else:
                    logger.debug(f"No gemeinde_info found in parcel_data for {parcel_id}")
                    # Skip parcels without gemeinde_info (likely not from Zürich)
                    self.stats.parcels_skipped += 1
                    continue

                # Query ÖREB WFS for bauzone (all parcels are from Zürich)
                if kanton_abk and kanton_abk.upper() == "ZH":
                    try:
                        attributes = parcel_data.get("attributes", {})
                        geometry = parcel_data.get("geometry", {})
                        egrid = attributes.get("egris_egrid", "")

                        # Get coordinates for query (use centroid or first point if available)
                        x = None
                        y = None
                        if geometry:
                            if "rings" in geometry and geometry["rings"]:
                                # Use first point of first ring
                                first_ring = geometry["rings"][0]
                                if first_ring and len(first_ring) > 0:
                                    x = first_ring[0][0]
                                    y = first_ring[0][1]
                            elif "coordinates" in geometry:
                                # Try to extract coordinates from GeoJSON format
                                coords = geometry.get("coordinates", [])
                                if coords and len(coords) > 0:
                                    # Handle nested coordinate arrays.
                                    # depth < 4 (was < 3) so MultiPolygon nesting
                                    # ([[[ [x, y] ]]]) is also unwrapped, not
                                    # just Polygon nesting.
                                    def get_first_coord(coord_list, depth=0):
                                        if depth < 4 and isinstance(coord_list, list) and len(coord_list) > 0:
                                            if isinstance(coord_list[0], (int, float)):
                                                return coord_list
                                            return get_first_coord(coord_list[0], depth + 1)
                                        return None

                                    first_coord = get_first_coord(coords)
                                    if first_coord and len(first_coord) >= 2:
                                        x = first_coord[0]
                                        y = first_coord[1]

                        if geometry and self.connector.oereb_connector:
                            logger.debug(f"Querying ÖREB WFS for bauzone for parcel {parcel_id}")
                            zone_results = await self.connector.oereb_connector.query_zone_layer(
                                egrid=egrid,
                                x=x or 0.0,
                                y=y or 0.0,
                                canton="ZH",
                                geometry=geometry
                            )

                            if zone_results and len(zone_results) > 0:
                                # Extract typ_gde_abkuerzung from the first result
                                zone_attrs = zone_results[0].get("attributes", {})
                                oereb_bauzone = zone_attrs.get("typ_gde_abkuerzung")
                                if oereb_bauzone:
                                    # Add bauzone to parcel_data so it can be used in
                                    # _convert_to_parzelle_model
                                    parcel_data["oereb_bauzone"] = oereb_bauzone
                                    logger.debug(f"Found ÖREB bauzone '{oereb_bauzone}' for parcel {parcel_id}")
                                else:
                                    logger.debug(f"No typ_gde_abkuerzung found in ÖREB response for parcel {parcel_id}")
                            else:
                                logger.debug(f"No zone results from ÖREB WFS for parcel {parcel_id}")
                        else:
                            logger.debug(f"Cannot query ÖREB WFS for parcel {parcel_id}: missing geometry or connector")
                    except Exception as e:
                        logger.warning(f"Error querying ÖREB WFS for bauzone for parcel {parcel_id}: {e}", exc_info=True)
                        # Continue without ÖREB bauzone - will use default from extracted_attrs
                else:
                    # This should not happen since we filter for ZH above, but log if it does
                    logger.warning(f"Parcel {parcel_id} is not from Zürich (kanton: {kanton_abk}), skipping ÖREB query")

                # Convert to Parzelle model (with Gemeinde UUID reference)
                parzelle_instance = self._convert_to_parzelle_model(
                    parcel_data,
                    parcel_id,
                    gemeinde_id=gemeinde_id
                )
                if not parzelle_instance:
                    logger.warning(f"Could not convert parcel {parcel_id} to model, skipping")
                    self.stats.parcels_skipped += 1
                    continue

                # Check if parcel already exists in database (by label)
                # Note: We rely on in-memory deduplication by EGRID/label for uniqueness
                # Database check is mainly to avoid re-saving parcels from previous runs
                existing_parcels = self.realEstateInterface.getParzellen(
                    recordFilter={
                        "mandateId": self.current_user.mandateId,
                        "label": parzelle_instance.label
                    }
                )

                if existing_parcels:
                    logger.debug(f"Parcel {parzelle_instance.label} already exists in database, skipping")
                    self.unique_parcels[parcel_id] = parcel_data  # Mark as seen
                    self.stats.parcels_skipped += 1
                    continue

                # Save to database
                created_parzelle = self.realEstateInterface.createParzelle(parzelle_instance)
                if created_parzelle and created_parzelle.id:
                    self.unique_parcels[parcel_id] = parcel_data
                    saved_count += 1
                    self.stats.parcels_saved += 1
                    logger.debug(
                        f"Saved parcel {created_parzelle.label} (ID: {created_parzelle.id}) "
                        f"linked to Gemeinde {gemeinde_id if gemeinde_id else 'None'}"
                    )
                else:
                    logger.warning(f"Failed to save parcel {parzelle_instance.label}")
                    self.stats.parcels_skipped += 1

            except Exception as e:
                error_msg = f"Error saving parcel: {str(e)}"
                logger.error(error_msg, exc_info=True)
                self.stats.errors.append(error_msg)
                self.stats.parcels_skipped += 1

        return saved_count

    async def scrape(
        self,
        grid_points: Optional[List[tuple]] = None,
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None
    ) -> ScrapingStats:
        """
        Scrape Kanton Zürich for parcel data.

        Args:
            grid_points: Optional list of (x, y) coordinates to query.
                If None, generates grid automatically for Zürich.
            progress_callback: Optional callback function(status_dict) called periodically

        Returns:
            ScrapingStats object with scraping statistics
        """
        logger.info("Starting Swiss Topo scraping operation for Kanton Zürich")

        # Generate grid points if not provided
        if grid_points is None:
            grid_points = self._generate_grid_points()

        logger.info(f"Scraping {len(grid_points)} grid points in Kanton Zürich")

        # Create semaphore for concurrency control
        semaphore = asyncio.Semaphore(self.max_concurrent)

        # Query all grid points concurrently
        tasks = [
            self._query_parcel_at_point(x, y, semaphore)
            for x, y in grid_points
        ]

        # Process results in batches
        batch = []
        processed = 0

        for coro in asyncio.as_completed(tasks):
            try:
                parcel_data = await coro
                processed += 1

                if parcel_data:
                    batch.append(parcel_data)

                    # Save batch when it reaches batch_size
                    if len(batch) >= self.batch_size:
                        await self._save_parcels_batch(batch)
                        batch = []

                # Update stats
                self.stats.unique_parcels_found = len(self.unique_parcels)

                # Call progress callback if provided
                if progress_callback:
                    progress_callback({
                        "processed": processed,
                        "total": len(grid_points),
                        "unique_parcels": self.stats.unique_parcels_found,
                        "saved": self.stats.parcels_saved,
                        "skipped": self.stats.parcels_skipped
                    })

                # Log progress periodically
                if processed % 100 == 0:
                    logger.info(
                        f"Progress: {processed}/{len(grid_points)} queries completed, "
                        f"{self.stats.unique_parcels_found} unique parcels found, "
                        f"{self.stats.parcels_saved} saved"
                    )

            except Exception as e:
                logger.error(f"Error processing query result: {e}", exc_info=True)
                self.stats.errors.append(str(e))

        # Save remaining batch
        if batch:
            await self._save_parcels_batch(batch)

        # Final stats update
        self.stats.unique_parcels_found = len(self.unique_parcels)

        logger.info(
            f"Scraping completed: {self.stats.unique_parcels_found} unique parcels found, "
            f"{self.stats.parcels_saved} saved, {self.stats.parcels_skipped} skipped"
        )

        return self.stats


async def scrape_switzerland(
    current_user: User,
    grid_size: float = 500.0,
    max_concurrent: int = 50,
    batch_size: int = 100,
    grid_points: Optional[List[tuple]] = None
) -> Dict[str, Any]:
    """
    Main function to scrape Kanton Zürich for parcel data.

    Note: This function now only scrapes parcels from Kanton Zürich.
    All parcels are queried for ÖREB bauzone information.

    Args:
        current_user: User for database operations
        grid_size: Size of grid cells in meters (default: 500m)
        max_concurrent: Maximum concurrent API requests (default: 50)
        batch_size: Number of parcels to process before saving (default: 100)
        grid_points: Optional list of (x, y) coordinates to query (must be within Zürich bounds)

    Returns:
        Dictionary with scraping statistics and results
    """
    scraper = SwissTopoScraper(
        current_user=current_user,
        grid_size=grid_size,
        max_concurrent=max_concurrent,
        batch_size=batch_size
    )

    stats = await scraper.scrape(grid_points=grid_points)

    return {
        "success": True,
        "stats": {
            "total_queries": stats.total_queries,
            "successful_queries": stats.successful_queries,
            "failed_queries": stats.failed_queries,
            "unique_parcels_found": stats.unique_parcels_found,
            "parcels_saved": stats.parcels_saved,
            "parcels_skipped": stats.parcels_skipped,
            "error_count": len(stats.errors),
            "errors": stats.errors[:10]  # Return first 10 errors
        }
    }