"""
Interface to Real Estate database objects.
Uses PostgreSQL connector for data access with user/mandate filtering.
Handles CRUD operations on Real Estate entities (Projekt, Parzelle, etc.).
"""

import logging
import re
import time
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelRealEstate import (
    Projekt,
    Parzelle,
    Dokument,
    Kanton,
    Gemeinde,
    Land,
    GeoPolylinie,
    GeoPunkt,
    Kontext,
    StatusProzess,
)
from modules.datamodels.datamodelUam import User
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG

# Import the access-control class from its own module
from modules.interfaces.interfaceDbRealEstateAccess import RealEstateAccess

logger = logging.getLogger(__name__)

# Singleton factory for Real Estate interfaces.
# Keyed by "<userId>_<mandateId>" (see getInterface).
_realEstateInterfaces: Dict[str, "RealEstateObjects"] = {}

# Canonical 8-4-4-4-12 hexadecimal UUID text form; compiled once at import time
# instead of on every _isUUID call.
_UUID_PATTERN = re.compile(
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
    re.IGNORECASE,
)


class RealEstateObjects:
    """
    Interface to Real Estate database objects.
    Uses PostgreSQL connector for data access with user/mandate filtering.
    Handles CRUD operations on Real Estate entities.

    The public per-entity methods (createProjekt, getParzelle, ...) are thin
    wrappers around the private generic helpers (_createRecord, _getRecord,
    _getRecords, _updateRecord, _deleteRecord), which hold the shared logic.
    """

    def __init__(self, currentUser: Optional[User] = None):
        """
        Initializes the Real Estate Interface.

        Args:
            currentUser: User whose id/mandateId scope all operations. When
                omitted, the database connection is still opened but no access
                control object exists until setUserContext() is called.
        """
        self.currentUser = currentUser
        self.userId = currentUser.id if currentUser else None
        self.mandateId = currentUser.mandateId if currentUser else None
        self.access: Optional[RealEstateAccess] = None

        # Initialize database
        self._initializeDatabase()

        # Set user context if provided
        if currentUser:
            self.setUserContext(currentUser)

    def _initializeDatabase(self):
        """
        Initialize PostgreSQL database connection.

        Reads the DB_REALESTATE_* settings from APP_CONFIG, creates the
        connector as self.db and makes sure the supporting tables exist.

        Raises:
            Exception: Any error raised while configuring or connecting is
                logged and re-raised unchanged.
        """
        try:
            # Get database configuration from environment
            dbHost = APP_CONFIG.get("DB_REALESTATE_HOST", "localhost")
            dbDatabase = APP_CONFIG.get("DB_REALESTATE_DATABASE", "poweron_realestate")
            dbUser = APP_CONFIG.get("DB_REALESTATE_USER")
            dbPassword = APP_CONFIG.get("DB_REALESTATE_PASSWORD_SECRET")
            dbPort = int(APP_CONFIG.get("DB_REALESTATE_PORT", 5432))

            # Initialize database connector
            self.db = DatabaseConnector(
                dbHost=dbHost,
                dbDatabase=dbDatabase,
                dbUser=dbUser,
                dbPassword=dbPassword,
                dbPort=dbPort,
                userId=self.userId or None,
            )

            # Initialize database system (creates database and system table if needed)
            # Note: This is also called in DatabaseConnector.__init__, but we call it explicitly
            # for consistency with other interfaces and to ensure proper initialization
            self.db.initDbSystem()

            # Ensure all supporting tables are created (Land, Kanton, Gemeinde, Dokument)
            # These tables are needed for foreign key relationships
            self._ensureSupportingTablesExist()

            logger.info(f"Real Estate database connector initialized for database: {dbDatabase}")
        except Exception as e:
            logger.error(f"Error initializing Real Estate database: {e}")
            raise

    def _ensureSupportingTablesExist(self):
        """
        Ensure all supporting tables (Land, Kanton, Gemeinde, Dokument) are created.

        Best effort: a failure is logged as a warning and not propagated.
        """
        try:
            # These tables are created on-demand when first accessed, but we ensure they exist here
            # to avoid errors when resolving location names to IDs
            for modelClass in (Land, Kanton, Gemeinde, Dokument):
                self.db._ensureTableExists(modelClass)
            logger.debug("Supporting tables (Land, Kanton, Gemeinde, Dokument) verified/created")
        except Exception as e:
            logger.warning(f"Error ensuring supporting tables exist: {e}")
            # Don't raise - tables will be created on-demand anyway

    def setUserContext(self, currentUser: User):
        """
        Sets the user context for the interface.

        Args:
            currentUser: User providing the id and mandateId for all operations.

        Raises:
            ValueError: If the user has no id or no mandateId. The previously
                active context is left untouched in that case.
        """
        userId = currentUser.id
        mandateId = currentUser.mandateId

        # Validate before mutating, so a rejected user never leaves the
        # interface with new ids combined with the old access object.
        if not userId or not mandateId:
            raise ValueError("Invalid user context: id and mandateId are required")

        self.currentUser = currentUser
        self.userId = userId
        self.mandateId = mandateId

        # Initialize access control
        self.access = RealEstateAccess(self.currentUser, self.db)

        # Update database context
        self.db.updateContext(self.userId)

    # ===== Generic helpers shared by all entity methods =====

    def _requireAccess(self) -> RealEstateAccess:
        """
        Return the access-control object.

        Raises:
            RuntimeError: If no user context has been set yet (self.access is
                None), instead of failing later with an AttributeError on None.
        """
        if self.access is None:
            raise RuntimeError(
                "Real Estate interface has no user context; call setUserContext() first"
            )
        return self.access

    def _createRecord(self, modelClass, record):
        """Default the record's mandateId, run the access check and store the record."""
        # Ensure mandateId is set
        if not record.mandateId:
            record.mandateId = self.mandateId

        # Apply access control
        self._requireAccess().uam(modelClass, [])

        # Save to database
        self.db.recordCreate(modelClass, record.model_dump())
        return record

    def _getRecord(self, modelClass, recordId: str):
        """Load one record by ID; None if it is missing or removed by the access filter."""
        records = self.db.getRecordset(modelClass, recordFilter={"id": recordId})
        if not records:
            return None

        # Apply access control
        filtered = self._requireAccess().uam(modelClass, records)
        if not filtered:
            return None

        return modelClass(**filtered[0])

    def _getRecords(self, modelClass, recordFilter: Optional[Dict[str, Any]]) -> list:
        """Load all records matching the filter that pass the access filter."""
        records = self.db.getRecordset(modelClass, recordFilter=recordFilter or {})

        # Apply access control
        filtered = self._requireAccess().uam(modelClass, records)
        return [modelClass(**r) for r in filtered]

    def _updateRecord(self, modelClass, recordId: str, updateData: Dict[str, Any], noun: str):
        """
        Apply updateData to an existing record and persist it.

        Keys that are not attributes of the model are ignored. `noun` is only
        used in the PermissionError message. Returns None if the record is not
        visible to the user.
        """
        record = self._getRecord(modelClass, recordId)
        if not record:
            return None

        # Check if user can modify
        if not self._requireAccess().canModify(modelClass, recordId):
            raise PermissionError(f"User {self.userId} cannot modify {noun} {recordId}")

        # Update fields
        for key, value in updateData.items():
            if hasattr(record, key):
                setattr(record, key, value)

        # Save to database
        self.db.recordModify(modelClass, recordId, record.model_dump())
        return record

    def _deleteRecord(self, modelClass, recordId: str, noun: str) -> bool:
        """
        Delete a record after the visibility and modify checks.

        Returns False if the record is not visible to the user, otherwise the
        result of the connector's recordDelete call.
        """
        record = self._getRecord(modelClass, recordId)
        if not record:
            return False

        # Check if user can modify
        if not self._requireAccess().canModify(modelClass, recordId):
            raise PermissionError(f"User {self.userId} cannot delete {noun} {recordId}")

        return self.db.recordDelete(modelClass, recordId)

    # ===== Projekt Methods =====

    def createProjekt(self, projekt: Projekt) -> Projekt:
        """Create a new project."""
        return self._createRecord(Projekt, projekt)

    def getProjekt(self, projektId: str) -> Optional[Projekt]:
        """Get a project by ID."""
        return self._getRecord(Projekt, projektId)

    def getProjekte(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Projekt]:
        """Get all projects matching the filter."""
        return self._getRecords(Projekt, recordFilter)

    def updateProjekt(self, projektId: str, updateData: Dict[str, Any]) -> Optional[Projekt]:
        """Update a project. Raises PermissionError if the user may not modify it."""
        return self._updateRecord(Projekt, projektId, updateData, "project")

    def deleteProjekt(self, projektId: str) -> bool:
        """Delete a project. Raises PermissionError if the user may not modify it."""
        return self._deleteRecord(Projekt, projektId, "project")

    # ===== Parzelle Methods =====

    def createParzelle(self, parzelle: Parzelle) -> Parzelle:
        """Create a new plot."""
        return self._createRecord(Parzelle, parzelle)

    def getParzelle(self, parzelleId: str) -> Optional[Parzelle]:
        """Get a plot by ID."""
        return self._getRecord(Parzelle, parzelleId)

    def getParzellen(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Parzelle]:
        """Get all plots matching the filter."""
        # Resolve location names to IDs if needed
        if recordFilter:
            recordFilter = self._resolveLocationFilters(recordFilter)
        return self._getRecords(Parzelle, recordFilter)

    def _resolveLocationFilters(self, recordFilter: Dict[str, Any]) -> Dict[str, Any]:
        """
        Resolve location names to IDs for foreign key fields.
        Only handles kontextGemeinde (Parzelle → Gemeinde).

        Note: Parzelle does NOT have direct links to Kanton or Land.
        The relationship is: Parzelle → Gemeinde → Kanton → Land

        Args:
            recordFilter: Filter as passed by the caller; it is not modified.

        Returns:
            A shallow copy of the filter with a Gemeinde name replaced by its ID
            when the name could be resolved.
        """
        resolvedFilter = recordFilter.copy()

        # Resolve Gemeinde name to ID
        # This is the only direct location link on Parzelle
        gemeindeValue = resolvedFilter.get("kontextGemeinde")

        # Only plain strings can be names. Non-string values (None, numbers, ...)
        # are passed through unchanged instead of crashing the UUID check.
        if isinstance(gemeindeValue, str) and not self._isUUID(gemeindeValue):
            gemeindeId = self._resolveGemeindeByName(gemeindeValue)
            if gemeindeId:
                resolvedFilter["kontextGemeinde"] = gemeindeId
                logger.debug(f"Resolved Gemeinde name '{gemeindeValue}' to ID '{gemeindeId}'")
            else:
                logger.warning(f"Gemeinde '{gemeindeValue}' not found, filter may return no results")
                # Keep the original value - query will return empty if not found

        # Note: kontextKanton and kontextLand are NOT fields on Parzelle
        # If they appear in the filter, they will be filtered out by the validation in mainRealEstate.py
        return resolvedFilter

    def _isUUID(self, value: str) -> bool:
        """Check if a string looks like a UUID. Non-string values are never UUIDs."""
        return isinstance(value, str) and _UUID_PATTERN.fullmatch(value) is not None

    def _resolveLabelToId(self, modelClass, tableName: str, name: str) -> Optional[str]:
        """
        Resolve a record label to its ID.

        Tries an exact match through the connector first, then a
        case-insensitive SQL lookup. Any exception is logged and turned into
        None.

        Args:
            modelClass: Model class used for the exact-match lookup.
            tableName: Table name for the SQL fallback; also used in log
                messages. Must be an internal constant, never user input,
                because it is interpolated into the SQL text.
            name: Label to look up.

        Returns:
            The ID of the first matching record, or None if nothing matched or
            the lookup failed.
        """
        try:
            # First try exact match
            matches = self.db.getRecordset(modelClass, recordFilter={"label": name})
            if matches:
                recordId = matches[0].get("id")
                logger.debug(f"Found {tableName} '{name}' with ID '{recordId}'")
                return recordId

            # If no exact match, try case-insensitive search via SQL query
            # This handles cases where the name might have different casing
            self.db._ensure_connection()
            with self.db.connection.cursor() as cursor:
                cursor.execute(
                    f'SELECT "id" FROM "{tableName}" WHERE LOWER("label") = LOWER(%s) LIMIT 1',
                    (name,),
                )
                result = cursor.fetchone()
                if result:
                    # psycopg2 returns tuples, so result[0] is the id
                    recordId = result[0]
                    logger.debug(
                        f"Found {tableName} '{name}' (case-insensitive) with ID '{recordId}'"
                    )
                    return recordId

            logger.warning(f"{tableName} '{name}' not found in database")
            return None
        except Exception as e:
            logger.error(f"Error resolving {tableName} by name '{name}': {e}", exc_info=True)
            return None

    def _resolveGemeindeByName(self, name: str) -> Optional[str]:
        """Resolve Gemeinde name to ID by looking up in Gemeinde table."""
        return self._resolveLabelToId(Gemeinde, "Gemeinde", name)

    def _resolveKantonByName(self, name: str) -> Optional[str]:
        """Resolve Kanton name to ID by looking up in Kanton table."""
        return self._resolveLabelToId(Kanton, "Kanton", name)

    def _resolveLandByName(self, name: str) -> Optional[str]:
        """Resolve Land name to ID by looking up in Land table."""
        return self._resolveLabelToId(Land, "Land", name)

    def updateParzelle(self, parzelleId: str, updateData: Dict[str, Any]) -> Optional[Parzelle]:
        """Update a plot. Raises PermissionError if the user may not modify it."""
        return self._updateRecord(Parzelle, parzelleId, updateData, "plot")

    def deleteParzelle(self, parzelleId: str) -> bool:
        """Delete a plot. Raises PermissionError if the user may not modify it."""
        return self._deleteRecord(Parzelle, parzelleId, "plot")

    # ===== Dokument Methods =====

    def createDokument(self, dokument: Dokument) -> Dokument:
        """Create a new document."""
        return self._createRecord(Dokument, dokument)

    def getDokument(self, dokumentId: str) -> Optional[Dokument]:
        """Get a document by ID."""
        return self._getRecord(Dokument, dokumentId)

    def getDokumente(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Dokument]:
        """Get all documents matching the filter."""
        return self._getRecords(Dokument, recordFilter)

    def updateDokument(self, dokumentId: str, updateData: Dict[str, Any]) -> Optional[Dokument]:
        """Update a document. Raises PermissionError if the user may not modify it."""
        return self._updateRecord(Dokument, dokumentId, updateData, "document")

    def deleteDokument(self, dokumentId: str) -> bool:
        """Delete a document. Raises PermissionError if the user may not modify it."""
        return self._deleteRecord(Dokument, dokumentId, "document")

    # ===== Gemeinde Methods =====

    def createGemeinde(self, gemeinde: Gemeinde) -> Gemeinde:
        """Create a new municipality."""
        return self._createRecord(Gemeinde, gemeinde)

    def getGemeinde(self, gemeindeId: str) -> Optional[Gemeinde]:
        """Get a municipality by ID."""
        return self._getRecord(Gemeinde, gemeindeId)

    def getGemeinden(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Gemeinde]:
        """Get all municipalities matching the filter."""
        return self._getRecords(Gemeinde, recordFilter)

    def updateGemeinde(self, gemeindeId: str, updateData: Dict[str, Any]) -> Optional[Gemeinde]:
        """Update a municipality. Raises PermissionError if the user may not modify it."""
        return self._updateRecord(Gemeinde, gemeindeId, updateData, "municipality")

    def deleteGemeinde(self, gemeindeId: str) -> bool:
        """Delete a municipality. Raises PermissionError if the user may not modify it."""
        return self._deleteRecord(Gemeinde, gemeindeId, "municipality")

    # ===== Kanton Methods =====

    def createKanton(self, kanton: Kanton) -> Kanton:
        """Create a new canton."""
        return self._createRecord(Kanton, kanton)

    def getKanton(self, kantonId: str) -> Optional[Kanton]:
        """Get a canton by ID."""
        return self._getRecord(Kanton, kantonId)

    def getKantone(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Kanton]:
        """Get all cantons matching the filter."""
        return self._getRecords(Kanton, recordFilter)

    def updateKanton(self, kantonId: str, updateData: Dict[str, Any]) -> Optional[Kanton]:
        """Update a canton. Raises PermissionError if the user may not modify it."""
        return self._updateRecord(Kanton, kantonId, updateData, "canton")

    def deleteKanton(self, kantonId: str) -> bool:
        """Delete a canton. Raises PermissionError if the user may not modify it."""
        return self._deleteRecord(Kanton, kantonId, "canton")

    # ===== Land Methods =====

    def createLand(self, land: Land) -> Land:
        """Create a new country."""
        return self._createRecord(Land, land)

    def getLand(self, landId: str) -> Optional[Land]:
        """Get a country by ID."""
        return self._getRecord(Land, landId)

    def getLaender(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Land]:
        """Get all countries matching the filter."""
        return self._getRecords(Land, recordFilter)

    def updateLand(self, landId: str, updateData: Dict[str, Any]) -> Optional[Land]:
        """Update a country. Raises PermissionError if the user may not modify it."""
        return self._updateRecord(Land, landId, updateData, "country")

    def deleteLand(self, landId: str) -> bool:
        """Delete a country. Raises PermissionError if the user may not modify it."""
        return self._deleteRecord(Land, landId, "country")

    # ===== Direct Query Execution (stateless) =====

    def executeQuery(self, queryText: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Execute a SQL query directly on the database (stateless).

        WARNING: This method executes raw SQL. Ensure proper validation and
        sanitization before calling this method. Consider implementing query
        whitelisting or only allowing SELECT statements for production use.
        No access-control filtering is applied to the result.

        Args:
            queryText: SQL query string (preferably SELECT only)
            parameters: Optional parameters for parameterized queries

        Returns:
            Dictionary with 'rows' (list of dicts), 'columns' (list of column names),
            'rowCount' (int), and 'executionTime' (float, seconds). For a
            statement that produces no result set, 'rows' and 'columns' are
            empty. For a query that matches no rows, 'columns' still lists the
            column names.

        Raises:
            Exception: Any error from the connector or driver is logged and
                re-raised unchanged.
        """
        try:
            startTime = time.perf_counter()

            # Ensure connection is alive
            self.db._ensure_connection()

            with self.db.connection.cursor() as cursor:
                if parameters:
                    # Use parameterized query for safety
                    cursor.execute(queryText, parameters)
                else:
                    cursor.execute(queryText)

                # cursor.description is None when the statement returned no
                # result set; calling fetchall() would raise in that case.
                # NOTE(review): this method never commits; whether a data-changing
                # statement is persisted depends on the connection's settings - confirm.
                if cursor.description:
                    columns = [desc[0] for desc in cursor.description]
                    resultRows = [dict(zip(columns, row)) for row in cursor.fetchall()]
                else:
                    columns = []
                    resultRows = []

            executionTime = time.perf_counter() - startTime

            return {
                "rows": resultRows,
                "columns": columns,
                "rowCount": len(resultRows),
                "executionTime": executionTime,
            }
        except Exception as e:
            logger.error(f"Error executing query: {e}", exc_info=True)
            raise


def getInterface(currentUser: User) -> RealEstateObjects:
    """
    Factory function to get or create a Real Estate interface instance for a user.
    Uses singleton pattern per user.

    Instances are cached in the module-level _realEstateInterfaces dict under
    the key "<user id>_<mandate id>"; a cached instance is returned as-is.

    Args:
        currentUser: User for whom the interface is requested.

    Returns:
        The cached or newly created RealEstateObjects instance.
    """
    userKey = f"{currentUser.id}_{currentUser.mandateId}"

    if userKey not in _realEstateInterfaces:
        _realEstateInterfaces[userKey] = RealEstateObjects(currentUser)

    return _realEstateInterfaces[userKey]