898 lines
35 KiB
Python
898 lines
35 KiB
Python
"""
|
|
Interface to Real Estate database objects.
|
|
Uses PostgreSQL connector for data access with user/mandate filtering.
|
|
Handles CRUD operations on Real Estate entities (Projekt, Parzelle, etc.).
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from .datamodelFeatureRealEstate import (
|
|
Projekt,
|
|
Parzelle,
|
|
Dokument,
|
|
Kanton,
|
|
Gemeinde,
|
|
Land,
|
|
GeoPolylinie,
|
|
GeoPunkt,
|
|
Kontext,
|
|
StatusProzess,
|
|
)
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.dbRegistry import registerDatabase
|
|
from modules.security.rbac import RbacClass
|
|
from modules.datamodels.datamodelRbac import AccessRuleContext
|
|
from modules.datamodels.datamodelUam import AccessLevel
|
|
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC, getRecordsetPaginatedWithRBAC, getDistinctColumnValuesWithRBAC
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
realEstateDatabase = "poweron_realestate"
|
|
registerDatabase(realEstateDatabase)
|
|
|
|
# Singleton factory for Real Estate interfaces
|
|
_realEstateInterfaces = {}
|
|
|
|
|
|
class RealEstateObjects:
|
|
"""
|
|
Interface to Real Estate database objects.
|
|
Uses PostgreSQL connector for data access with user/mandate filtering.
|
|
Handles CRUD operations on Real Estate entities.
|
|
"""
|
|
|
|
# Feature code for RBAC objectKey construction
|
|
# Used to build: data.feature.realestate.{TableName}
|
|
FEATURE_CODE = "realestate"
|
|
|
|
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
|
|
"""Initializes the Real Estate Interface.
|
|
|
|
Args:
|
|
currentUser: The authenticated user
|
|
mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
|
|
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
|
|
"""
|
|
self.currentUser = currentUser
|
|
self.userId = currentUser.id if currentUser else None
|
|
# Use mandateId from parameter (Request-Context), not from user object
|
|
self.mandateId = mandateId
|
|
self.featureInstanceId = featureInstanceId
|
|
self.rbac = None # RBAC interface
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
# Set user context if provided
|
|
if currentUser:
|
|
self.setUserContext(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
|
|
|
|
def _initializeDatabase(self):
|
|
"""Initialize PostgreSQL database connection."""
|
|
try:
|
|
# Get database configuration from environment
|
|
dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
|
|
dbDatabase = realEstateDatabase
|
|
dbUser = APP_CONFIG.get("DB_USER")
|
|
dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
|
|
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
|
|
|
|
# Initialize database connector
|
|
self.db = DatabaseConnector(
|
|
dbHost=dbHost,
|
|
dbDatabase=dbDatabase,
|
|
dbUser=dbUser,
|
|
dbPassword=dbPassword,
|
|
dbPort=dbPort,
|
|
userId=self.userId if self.userId else None,
|
|
)
|
|
|
|
# Ensure all supporting tables are created (Land, Kanton, Gemeinde, Dokument)
|
|
# These tables are needed for foreign key relationships
|
|
self._ensureSupportingTablesExist()
|
|
|
|
logger.info(f"Real Estate database connector initialized for database: {dbDatabase}")
|
|
except Exception as e:
|
|
logger.error(f"Error initializing Real Estate database: {e}")
|
|
raise
|
|
|
|
def _ensureSupportingTablesExist(self):
|
|
"""Ensure all supporting tables (Land, Kanton, Gemeinde, Dokument) are created."""
|
|
try:
|
|
# These tables are created on-demand when first accessed, but we ensure they exist here
|
|
# to avoid errors when resolving location names to IDs
|
|
self.db._ensureTableExists(Land)
|
|
self.db._ensureTableExists(Kanton)
|
|
self.db._ensureTableExists(Gemeinde)
|
|
self.db._ensureTableExists(Dokument)
|
|
logger.debug("Supporting tables (Land, Kanton, Gemeinde, Dokument) verified/created")
|
|
except Exception as e:
|
|
logger.warning(f"Error ensuring supporting tables exist: {e}")
|
|
# Don't raise - tables will be created on-demand anyway
|
|
|
|
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
|
|
"""Sets the user context for the interface.
|
|
|
|
Args:
|
|
currentUser: The authenticated user
|
|
mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
|
|
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
|
|
"""
|
|
self.currentUser = currentUser
|
|
self.userId = currentUser.id
|
|
# Use mandateId from parameter (Request-Context), not from user object
|
|
self.mandateId = mandateId
|
|
self.featureInstanceId = featureInstanceId
|
|
|
|
if not self.userId:
|
|
raise ValueError("Invalid user context: id is required")
|
|
|
|
# Note: mandateId is ALWAYS optional here - it comes from Request-Context, not from User.
|
|
# Users are NOT assigned to mandates by design - they get mandate context from the request.
|
|
# sysAdmin users can additionally perform cross-mandate operations.
|
|
# Without mandateId, operations will be filtered to accessible mandates via RBAC.
|
|
|
|
# Initialize RBAC interface
|
|
if not self.currentUser:
|
|
raise ValueError("User context is required for RBAC")
|
|
# Get DbApp connection for RBAC AccessRule queries
|
|
from modules.security.rootAccess import getRootDbAppConnector
|
|
dbApp = getRootDbAppConnector()
|
|
self.rbac = RbacClass(self.db, dbApp=dbApp)
|
|
|
|
# Update database context
|
|
self.db.updateContext(self.userId)
|
|
|
|
# ===== Projekt Methods =====
|
|
|
|
def createProjekt(self, projekt: Projekt) -> Projekt:
|
|
"""Create a new project."""
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Projekt, "create"):
|
|
raise PermissionError(f"User {self.userId} cannot create projects")
|
|
|
|
# Ensure mandateId and featureInstanceId are set for proper data isolation
|
|
if not projekt.mandateId:
|
|
projekt.mandateId = self.mandateId
|
|
if not projekt.featureInstanceId:
|
|
projekt.featureInstanceId = self.featureInstanceId
|
|
|
|
# Save to database - use mode='json' to ensure nested Pydantic models are serialized
|
|
self.db.recordCreate(Projekt, projekt.model_dump(mode='json'))
|
|
|
|
return projekt
|
|
|
|
def getProjekt(self, projektId: str) -> Optional[Projekt]:
|
|
"""Get a project by ID."""
|
|
records = getRecordsetWithRBAC(
|
|
self.db,
|
|
Projekt,
|
|
self.currentUser,
|
|
recordFilter={"id": projektId},
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if not records:
|
|
return None
|
|
|
|
return Projekt(**records[0])
|
|
|
|
def getProjekte(self, recordFilter: Optional[Dict[str, Any]] = None, pagination: Optional[PaginationParams] = None) -> Union[List[Projekt], PaginatedResult]:
|
|
"""Get all projects matching the filter with optional DB-level pagination."""
|
|
result = getRecordsetPaginatedWithRBAC(
|
|
self.db,
|
|
Projekt,
|
|
self.currentUser,
|
|
pagination=pagination,
|
|
recordFilter=recordFilter or {},
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if isinstance(result, PaginatedResult):
|
|
result.items = [Projekt(**r) for r in result.items]
|
|
return result
|
|
return [Projekt(**r) for r in result]
|
|
|
|
def updateProjekt(self, projektId_or_projekt: Union[str, Projekt], updateData: Optional[Dict[str, Any]] = None) -> Optional[Projekt]:
|
|
"""Update a project.
|
|
|
|
Args:
|
|
projektId_or_projekt: Either a project ID (str) or a Projekt object
|
|
updateData: Optional dict of fields to update (only used when projektId_or_projekt is a string)
|
|
"""
|
|
# Handle both Projekt object and projektId string
|
|
if isinstance(projektId_or_projekt, Projekt):
|
|
projekt = projektId_or_projekt
|
|
projektId = projekt.id
|
|
else:
|
|
projektId = projektId_or_projekt
|
|
projekt = self.getProjekt(projektId)
|
|
if not projekt:
|
|
return None
|
|
|
|
# Update fields from updateData if provided
|
|
if updateData:
|
|
for key, value in updateData.items():
|
|
if hasattr(projekt, key):
|
|
setattr(projekt, key, value)
|
|
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Projekt, "update", projektId):
|
|
raise PermissionError(f"User {self.userId} cannot modify project {projektId}")
|
|
|
|
# Save to database
|
|
self.db.recordModify(Projekt, projektId, projekt.model_dump())
|
|
|
|
return projekt
|
|
|
|
def deleteProjekt(self, projektId: str) -> bool:
|
|
"""Delete a project."""
|
|
projekt = self.getProjekt(projektId)
|
|
if not projekt:
|
|
return False
|
|
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Projekt, "delete", projektId):
|
|
raise PermissionError(f"User {self.userId} cannot delete project {projektId}")
|
|
|
|
return self.db.recordDelete(Projekt, projektId)
|
|
|
|
# ===== Parzelle Methods =====
|
|
|
|
def createParzelle(self, parzelle: Parzelle) -> Parzelle:
|
|
"""Create a new plot."""
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Parzelle, "create"):
|
|
raise PermissionError(f"User {self.userId} cannot create plots")
|
|
|
|
# Ensure mandateId and featureInstanceId are set for proper data isolation
|
|
if not parzelle.mandateId:
|
|
parzelle.mandateId = self.mandateId
|
|
if not parzelle.featureInstanceId:
|
|
parzelle.featureInstanceId = self.featureInstanceId
|
|
|
|
# Use mode='json' to ensure nested Pydantic models (like GeoPolylinie) are serialized
|
|
self.db.recordCreate(Parzelle, parzelle.model_dump(mode='json'))
|
|
|
|
return parzelle
|
|
|
|
def getParzelle(self, parzelleId: str) -> Optional[Parzelle]:
|
|
"""Get a plot by ID."""
|
|
records = getRecordsetWithRBAC(
|
|
self.db,
|
|
Parzelle,
|
|
self.currentUser,
|
|
recordFilter={"id": parzelleId},
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if not records:
|
|
return None
|
|
|
|
return Parzelle(**records[0])
|
|
|
|
def getParzellen(self, recordFilter: Optional[Dict[str, Any]] = None, pagination: Optional[PaginationParams] = None) -> Union[List[Parzelle], PaginatedResult]:
|
|
"""Get all plots matching the filter with optional DB-level pagination."""
|
|
if recordFilter:
|
|
recordFilter = self._resolveLocationFilters(recordFilter)
|
|
|
|
result = getRecordsetPaginatedWithRBAC(
|
|
self.db,
|
|
Parzelle,
|
|
self.currentUser,
|
|
pagination=pagination,
|
|
recordFilter=recordFilter or {}
|
|
)
|
|
|
|
if isinstance(result, PaginatedResult):
|
|
result.items = [Parzelle(**r) for r in result.items]
|
|
return result
|
|
return [Parzelle(**r) for r in result]
|
|
|
|
def _resolveLocationFilters(self, recordFilter: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Resolve location names to IDs for foreign key fields.
|
|
Only handles kontextGemeinde (Parzelle → Gemeinde).
|
|
Note: Parzelle does NOT have direct links to Kanton or Land.
|
|
The relationship is: Parzelle → Gemeinde → Kanton → Land
|
|
"""
|
|
resolvedFilter = recordFilter.copy()
|
|
|
|
# Resolve Gemeinde name to ID
|
|
# This is the only direct location link on Parzelle
|
|
if "kontextGemeinde" in resolvedFilter:
|
|
gemeindeValue = resolvedFilter["kontextGemeinde"]
|
|
# Check if it's a name (not a UUID-like string)
|
|
if not self._isUUID(gemeindeValue):
|
|
gemeindeId = self._resolveGemeindeByName(gemeindeValue)
|
|
if gemeindeId:
|
|
resolvedFilter["kontextGemeinde"] = gemeindeId
|
|
logger.debug(f"Resolved Gemeinde name '{gemeindeValue}' to ID '{gemeindeId}'")
|
|
else:
|
|
logger.warning(f"Gemeinde '{gemeindeValue}' not found, filter may return no results")
|
|
# Keep the original value - query will return empty if not found
|
|
|
|
# Note: kontextKanton and kontextLand are NOT fields on Parzelle
|
|
# If they appear in the filter, they will be filtered out by the validation in mainRealEstate.py
|
|
|
|
return resolvedFilter
|
|
|
|
def _isUUID(self, value: str) -> bool:
|
|
"""Check if a string looks like a UUID."""
|
|
import re
|
|
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE)
|
|
return bool(uuid_pattern.match(value))
|
|
|
|
def _resolveGemeindeByName(self, name: str) -> Optional[str]:
|
|
"""Resolve Gemeinde name to ID by looking up in Gemeinde table."""
|
|
try:
|
|
# First try exact match
|
|
gemeinden = self.db.getRecordset(
|
|
Gemeinde,
|
|
recordFilter={"label": name}
|
|
)
|
|
if gemeinden:
|
|
gemeindeId = gemeinden[0].get("id")
|
|
logger.debug(f"Found Gemeinde '{name}' with ID '{gemeindeId}'")
|
|
return gemeindeId
|
|
|
|
# If no exact match, try case-insensitive search via SQL query
|
|
# This handles cases where the name might have different casing
|
|
self.db._ensure_connection()
|
|
with self.db.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
'SELECT "id" FROM "Gemeinde" WHERE LOWER("label") = LOWER(%s) LIMIT 1',
|
|
(name,)
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
# psycopg2 returns tuples, so result[0] is the id
|
|
gemeindeId = result[0]
|
|
logger.debug(f"Found Gemeinde '{name}' (case-insensitive) with ID '{gemeindeId}'")
|
|
return gemeindeId
|
|
|
|
logger.warning(f"Gemeinde '{name}' not found in database")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error resolving Gemeinde by name '{name}': {e}", exc_info=True)
|
|
return None
|
|
|
|
def _resolveKantonByName(self, name: str) -> Optional[str]:
|
|
"""Resolve Kanton name to ID by looking up in Kanton table."""
|
|
try:
|
|
# First try exact match
|
|
kantone = self.db.getRecordset(
|
|
Kanton,
|
|
recordFilter={"label": name}
|
|
)
|
|
if kantone:
|
|
kantonId = kantone[0].get("id")
|
|
logger.debug(f"Found Kanton '{name}' with ID '{kantonId}'")
|
|
return kantonId
|
|
|
|
# Try case-insensitive search
|
|
self.db._ensure_connection()
|
|
with self.db.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
'SELECT "id" FROM "Kanton" WHERE LOWER("label") = LOWER(%s) LIMIT 1',
|
|
(name,)
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
# psycopg2 returns tuples, so result[0] is the id
|
|
kantonId = result[0]
|
|
logger.debug(f"Found Kanton '{name}' (case-insensitive) with ID '{kantonId}'")
|
|
return kantonId
|
|
|
|
logger.warning(f"Kanton '{name}' not found in database")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error resolving Kanton by name '{name}': {e}", exc_info=True)
|
|
return None
|
|
|
|
def _resolveLandByName(self, name: str) -> Optional[str]:
|
|
"""Resolve Land name to ID by looking up in Land table."""
|
|
try:
|
|
# First try exact match
|
|
laender = self.db.getRecordset(
|
|
Land,
|
|
recordFilter={"label": name}
|
|
)
|
|
if laender:
|
|
landId = laender[0].get("id")
|
|
logger.debug(f"Found Land '{name}' with ID '{landId}'")
|
|
return landId
|
|
|
|
# Try case-insensitive search
|
|
self.db._ensure_connection()
|
|
with self.db.connection.cursor() as cursor:
|
|
cursor.execute(
|
|
'SELECT "id" FROM "Land" WHERE LOWER("label") = LOWER(%s) LIMIT 1',
|
|
(name,)
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
# psycopg2 returns tuples, so result[0] is the id
|
|
landId = result[0]
|
|
logger.debug(f"Found Land '{name}' (case-insensitive) with ID '{landId}'")
|
|
return landId
|
|
|
|
logger.warning(f"Land '{name}' not found in database")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error resolving Land by name '{name}': {e}", exc_info=True)
|
|
return None
|
|
|
|
def updateParzelle(self, parzelleId: str, updateData: Dict[str, Any]) -> Optional[Parzelle]:
|
|
"""Update a plot."""
|
|
parzelle = self.getParzelle(parzelleId)
|
|
if not parzelle:
|
|
return None
|
|
|
|
if not self.checkRbacPermission(Parzelle, "update", parzelleId):
|
|
raise PermissionError(f"User {self.userId} cannot modify plot {parzelleId}")
|
|
|
|
for key, value in updateData.items():
|
|
if hasattr(parzelle, key):
|
|
setattr(parzelle, key, value)
|
|
|
|
self.db.recordModify(Parzelle, parzelleId, parzelle.model_dump())
|
|
|
|
return parzelle
|
|
|
|
def deleteParzelle(self, parzelleId: str) -> bool:
|
|
"""Delete a plot."""
|
|
parzelle = self.getParzelle(parzelleId)
|
|
if not parzelle:
|
|
return False
|
|
|
|
if not self.checkRbacPermission(Parzelle, "delete", parzelleId):
|
|
raise PermissionError(f"User {self.userId} cannot delete plot {parzelleId}")
|
|
|
|
return self.db.recordDelete(Parzelle, parzelleId)
|
|
|
|
# ===== Dokument Methods =====
|
|
|
|
def createDokument(self, dokument: Dokument) -> Dokument:
|
|
"""Create a new document."""
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Dokument, "create"):
|
|
raise PermissionError(f"User {self.userId} cannot create documents")
|
|
|
|
# Ensure mandateId and featureInstanceId are set for proper data isolation
|
|
if not dokument.mandateId:
|
|
dokument.mandateId = self.mandateId
|
|
if not dokument.featureInstanceId:
|
|
dokument.featureInstanceId = self.featureInstanceId
|
|
|
|
self.db.recordCreate(Dokument, dokument.model_dump())
|
|
|
|
return dokument
|
|
|
|
def getDokument(self, dokumentId: str) -> Optional[Dokument]:
|
|
"""Get a document by ID."""
|
|
records = getRecordsetWithRBAC(
|
|
self.db,
|
|
Dokument,
|
|
self.currentUser,
|
|
recordFilter={"id": dokumentId},
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if not records:
|
|
return None
|
|
|
|
return Dokument(**records[0])
|
|
|
|
def getDokumente(self, recordFilter: Optional[Dict[str, Any]] = None, pagination: Optional[PaginationParams] = None) -> Union[List[Dokument], PaginatedResult]:
|
|
"""Get all documents matching the filter with optional DB-level pagination."""
|
|
result = getRecordsetPaginatedWithRBAC(
|
|
self.db,
|
|
Dokument,
|
|
self.currentUser,
|
|
pagination=pagination,
|
|
recordFilter=recordFilter or {},
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if isinstance(result, PaginatedResult):
|
|
result.items = [Dokument(**r) for r in result.items]
|
|
return result
|
|
return [Dokument(**r) for r in result]
|
|
|
|
def updateDokument(self, dokumentId: str, updateData: Dict[str, Any]) -> Optional[Dokument]:
|
|
"""Update a document."""
|
|
dokument = self.getDokument(dokumentId)
|
|
if not dokument:
|
|
return None
|
|
|
|
if not self.checkRbacPermission(Dokument, "update", dokumentId):
|
|
raise PermissionError(f"User {self.userId} cannot modify document {dokumentId}")
|
|
|
|
for key, value in updateData.items():
|
|
if hasattr(dokument, key):
|
|
setattr(dokument, key, value)
|
|
|
|
self.db.recordModify(Dokument, dokumentId, dokument.model_dump())
|
|
return dokument
|
|
|
|
def deleteDokument(self, dokumentId: str) -> bool:
|
|
"""Delete a document."""
|
|
dokument = self.getDokument(dokumentId)
|
|
if not dokument:
|
|
return False
|
|
|
|
if not self.checkRbacPermission(Dokument, "delete", dokumentId):
|
|
raise PermissionError(f"User {self.userId} cannot delete document {dokumentId}")
|
|
|
|
return self.db.recordDelete(Dokument, dokumentId)
|
|
|
|
# ===== Gemeinde Methods =====
|
|
|
|
def createGemeinde(self, gemeinde: Gemeinde) -> Gemeinde:
|
|
"""Create a new municipality."""
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Gemeinde, "create"):
|
|
raise PermissionError(f"User {self.userId} cannot create municipalities")
|
|
|
|
# Ensure mandateId and featureInstanceId are set for proper data isolation
|
|
if not gemeinde.mandateId:
|
|
gemeinde.mandateId = self.mandateId
|
|
if not gemeinde.featureInstanceId:
|
|
gemeinde.featureInstanceId = self.featureInstanceId
|
|
|
|
self.db.recordCreate(Gemeinde, gemeinde.model_dump())
|
|
|
|
return gemeinde
|
|
|
|
def getGemeinde(self, gemeindeId: str) -> Optional[Gemeinde]:
|
|
"""Get a municipality by ID."""
|
|
records = getRecordsetWithRBAC(
|
|
self.db,
|
|
Gemeinde,
|
|
self.currentUser,
|
|
recordFilter={"id": gemeindeId},
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if not records:
|
|
return None
|
|
|
|
return Gemeinde(**records[0])
|
|
|
|
def getGemeinden(self, recordFilter: Optional[Dict[str, Any]] = None, pagination: Optional[PaginationParams] = None) -> Union[List[Gemeinde], PaginatedResult]:
|
|
"""Get all municipalities matching the filter with optional DB-level pagination."""
|
|
result = getRecordsetPaginatedWithRBAC(
|
|
self.db,
|
|
Gemeinde,
|
|
self.currentUser,
|
|
pagination=pagination,
|
|
recordFilter=recordFilter or {},
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if isinstance(result, PaginatedResult):
|
|
result.items = [Gemeinde(**r) for r in result.items]
|
|
return result
|
|
return [Gemeinde(**r) for r in result]
|
|
|
|
def updateGemeinde(self, gemeindeId: str, updateData: Dict[str, Any]) -> Optional[Gemeinde]:
|
|
"""Update a municipality."""
|
|
gemeinde = self.getGemeinde(gemeindeId)
|
|
if not gemeinde:
|
|
return None
|
|
|
|
if not self.checkRbacPermission(Gemeinde, "update", gemeindeId):
|
|
raise PermissionError(f"User {self.userId} cannot modify municipality {gemeindeId}")
|
|
|
|
for key, value in updateData.items():
|
|
if hasattr(gemeinde, key):
|
|
setattr(gemeinde, key, value)
|
|
|
|
self.db.recordModify(Gemeinde, gemeindeId, gemeinde.model_dump())
|
|
return gemeinde
|
|
|
|
def deleteGemeinde(self, gemeindeId: str) -> bool:
|
|
"""Delete a municipality."""
|
|
gemeinde = self.getGemeinde(gemeindeId)
|
|
if not gemeinde:
|
|
return False
|
|
|
|
if not self.checkRbacPermission(Gemeinde, "delete", gemeindeId):
|
|
raise PermissionError(f"User {self.userId} cannot delete municipality {gemeindeId}")
|
|
|
|
return self.db.recordDelete(Gemeinde, gemeindeId)
|
|
|
|
# ===== Kanton Methods =====
|
|
|
|
def createKanton(self, kanton: Kanton) -> Kanton:
|
|
"""Create a new canton."""
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Kanton, "create"):
|
|
raise PermissionError(f"User {self.userId} cannot create cantons")
|
|
|
|
# Ensure mandateId and featureInstanceId are set for proper data isolation
|
|
if not kanton.mandateId:
|
|
kanton.mandateId = self.mandateId
|
|
if not kanton.featureInstanceId:
|
|
kanton.featureInstanceId = self.featureInstanceId
|
|
|
|
self.db.recordCreate(Kanton, kanton.model_dump())
|
|
|
|
return kanton
|
|
|
|
def getKanton(self, kantonId: str) -> Optional[Kanton]:
|
|
"""Get a canton by ID."""
|
|
records = getRecordsetWithRBAC(
|
|
self.db,
|
|
Kanton,
|
|
self.currentUser,
|
|
recordFilter={"id": kantonId},
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if not records:
|
|
return None
|
|
|
|
return Kanton(**records[0])
|
|
|
|
def getKantone(self, recordFilter: Optional[Dict[str, Any]] = None, pagination: Optional[PaginationParams] = None) -> Union[List[Kanton], PaginatedResult]:
|
|
"""Get all cantons matching the filter with optional DB-level pagination."""
|
|
result = getRecordsetPaginatedWithRBAC(
|
|
self.db,
|
|
Kanton,
|
|
self.currentUser,
|
|
pagination=pagination,
|
|
recordFilter=recordFilter or {},
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if isinstance(result, PaginatedResult):
|
|
result.items = [Kanton(**r) for r in result.items]
|
|
return result
|
|
return [Kanton(**r) for r in result]
|
|
|
|
def updateKanton(self, kantonId: str, updateData: Dict[str, Any]) -> Optional[Kanton]:
|
|
"""Update a canton."""
|
|
kanton = self.getKanton(kantonId)
|
|
if not kanton:
|
|
return None
|
|
|
|
if not self.checkRbacPermission(Kanton, "update", kantonId):
|
|
raise PermissionError(f"User {self.userId} cannot modify canton {kantonId}")
|
|
|
|
for key, value in updateData.items():
|
|
if hasattr(kanton, key):
|
|
setattr(kanton, key, value)
|
|
|
|
self.db.recordModify(Kanton, kantonId, kanton.model_dump())
|
|
return kanton
|
|
|
|
def deleteKanton(self, kantonId: str) -> bool:
|
|
"""Delete a canton."""
|
|
kanton = self.getKanton(kantonId)
|
|
if not kanton:
|
|
return False
|
|
|
|
if not self.checkRbacPermission(Kanton, "delete", kantonId):
|
|
raise PermissionError(f"User {self.userId} cannot delete canton {kantonId}")
|
|
|
|
return self.db.recordDelete(Kanton, kantonId)
|
|
|
|
# ===== Land Methods =====
|
|
|
|
def createLand(self, land: Land) -> Land:
|
|
"""Create a new country."""
|
|
# Check RBAC permission
|
|
if not self.checkRbacPermission(Land, "create"):
|
|
raise PermissionError(f"User {self.userId} cannot create countries")
|
|
|
|
# Ensure mandateId and featureInstanceId are set for proper data isolation
|
|
if not land.mandateId:
|
|
land.mandateId = self.mandateId
|
|
if not land.featureInstanceId:
|
|
land.featureInstanceId = self.featureInstanceId
|
|
|
|
self.db.recordCreate(Land, land.model_dump())
|
|
|
|
return land
|
|
|
|
def getLand(self, landId: str) -> Optional[Land]:
|
|
"""Get a country by ID."""
|
|
records = getRecordsetWithRBAC(
|
|
self.db,
|
|
Land,
|
|
self.currentUser,
|
|
recordFilter={"id": landId},
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if not records:
|
|
return None
|
|
|
|
return Land(**records[0])
|
|
|
|
def getLaender(self, recordFilter: Optional[Dict[str, Any]] = None, pagination: Optional[PaginationParams] = None) -> Union[List[Land], PaginatedResult]:
|
|
"""Get all countries matching the filter with optional DB-level pagination."""
|
|
result = getRecordsetPaginatedWithRBAC(
|
|
self.db,
|
|
Land,
|
|
self.currentUser,
|
|
pagination=pagination,
|
|
recordFilter=recordFilter or {},
|
|
featureCode=self.FEATURE_CODE
|
|
)
|
|
|
|
if isinstance(result, PaginatedResult):
|
|
result.items = [Land(**r) for r in result.items]
|
|
return result
|
|
return [Land(**r) for r in result]
|
|
|
|
def updateLand(self, landId: str, updateData: Dict[str, Any]) -> Optional[Land]:
|
|
"""Update a country."""
|
|
land = self.getLand(landId)
|
|
if not land:
|
|
return None
|
|
|
|
if not self.checkRbacPermission(Land, "update", landId):
|
|
raise PermissionError(f"User {self.userId} cannot modify country {landId}")
|
|
|
|
for key, value in updateData.items():
|
|
if hasattr(land, key):
|
|
setattr(land, key, value)
|
|
|
|
self.db.recordModify(Land, landId, land.model_dump())
|
|
return land
|
|
|
|
def deleteLand(self, landId: str) -> bool:
|
|
"""Delete a country."""
|
|
land = self.getLand(landId)
|
|
if not land:
|
|
return False
|
|
|
|
if not self.checkRbacPermission(Land, "delete", landId):
|
|
raise PermissionError(f"User {self.userId} cannot delete country {landId}")
|
|
|
|
return self.db.recordDelete(Land, landId)
|
|
|
|
# ===== RBAC Permission Checks =====
|
|
|
|
def checkRbacPermission(
|
|
self,
|
|
modelClass: type,
|
|
operation: str,
|
|
recordId: Optional[str] = None
|
|
) -> bool:
|
|
"""
|
|
Check RBAC permission for a specific operation on a table.
|
|
|
|
Args:
|
|
modelClass: Pydantic model class for the table
|
|
operation: Operation to check ('create', 'update', 'delete', 'read')
|
|
recordId: Optional record ID for specific record check
|
|
|
|
Returns:
|
|
Boolean indicating permission
|
|
"""
|
|
if not self.rbac or not self.currentUser:
|
|
return False
|
|
|
|
tableName = modelClass.__name__
|
|
from modules.interfaces.interfaceRbac import buildDataObjectKey
|
|
objectKey = buildDataObjectKey(tableName, featureCode=self.featureCode if hasattr(self, 'featureCode') else None)
|
|
permissions = self.rbac.getUserPermissions(
|
|
self.currentUser,
|
|
AccessRuleContext.DATA,
|
|
objectKey,
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId
|
|
)
|
|
|
|
if operation == "create":
|
|
return permissions.create != AccessLevel.NONE
|
|
elif operation == "update":
|
|
return permissions.update != AccessLevel.NONE
|
|
elif operation == "delete":
|
|
return permissions.delete != AccessLevel.NONE
|
|
elif operation == "read":
|
|
return permissions.read != AccessLevel.NONE
|
|
else:
|
|
return False
|
|
|
|
# ===== Direct Query Execution (stateless) =====
|
|
|
|
def executeQuery(self, queryText: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
"""
|
|
Execute a SQL query directly on the database (stateless).
|
|
|
|
WARNING: This method executes raw SQL. Ensure proper validation and sanitization
|
|
before calling this method. Consider implementing query whitelisting or
|
|
only allowing SELECT statements for production use.
|
|
|
|
Args:
|
|
queryText: SQL query string (preferably SELECT only)
|
|
parameters: Optional parameters for parameterized queries
|
|
|
|
Returns:
|
|
Dictionary with 'rows' (list of dicts), 'columns' (list of column names),
|
|
'rowCount' (int), and 'executionTime' (float)
|
|
"""
|
|
import time
|
|
|
|
try:
|
|
start_time = time.time()
|
|
|
|
# Ensure connection is alive
|
|
self.db._ensure_connection()
|
|
|
|
with self.db.connection.cursor() as cursor:
|
|
# Execute query
|
|
if parameters:
|
|
# Use parameterized query for safety
|
|
cursor.execute(queryText, parameters)
|
|
else:
|
|
cursor.execute(queryText)
|
|
|
|
# Fetch results
|
|
rows = cursor.fetchall()
|
|
|
|
# Convert to list of dictionaries
|
|
result_rows = []
|
|
if rows:
|
|
columns = [desc[0] for desc in cursor.description] if cursor.description else []
|
|
result_rows = [dict(zip(columns, row)) for row in rows]
|
|
else:
|
|
columns = []
|
|
|
|
execution_time = time.time() - start_time
|
|
|
|
return {
|
|
"rows": result_rows,
|
|
"columns": columns,
|
|
"rowCount": len(result_rows),
|
|
"executionTime": execution_time,
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error executing query: {e}", exc_info=True)
|
|
raise
|
|
|
|
|
|
def getInterface(currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> RealEstateObjects:
|
|
"""
|
|
Factory function to get or create a Real Estate interface instance for a user.
|
|
Uses singleton pattern per user.
|
|
|
|
Args:
|
|
currentUser: The authenticated user
|
|
mandateId: The mandate ID from RequestContext (X-Mandate-Id header). Required.
|
|
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header).
|
|
"""
|
|
effectiveMandateId = str(mandateId) if mandateId else None
|
|
effectiveFeatureInstanceId = str(featureInstanceId) if featureInstanceId else None
|
|
|
|
# Include featureInstanceId in key for proper isolation
|
|
userKey = f"{currentUser.id}_{effectiveMandateId}_{effectiveFeatureInstanceId}"
|
|
|
|
if userKey not in _realEstateInterfaces:
|
|
_realEstateInterfaces[userKey] = RealEstateObjects(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
|
|
else:
|
|
# Update user context if needed
|
|
_realEstateInterfaces[userKey].setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
|
|
|
|
return _realEstateInterfaces[userKey]
|
|
|