# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Database interface for the Neutralizer feature. Handles CRUD operations for neutralization configuration and attributes. """ import logging from typing import Dict, List, Any, Optional from modules.features.neutralization.datamodelFeatureNeutralizer import ( DataNeutraliserConfig, DataNeutralizerAttributes, ) from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.interfaces.interfaceRbac import getRecordsetWithRBAC from modules.shared.configuration import APP_CONFIG from modules.shared.timeUtils import getUtcTimestamp from modules.datamodels.datamodelUam import User logger = logging.getLogger(__name__) # Singleton cache for interface instances _neutralizerInterfaces = {} class InterfaceFeatureNeutralizer: """Database interface for Neutralizer feature operations""" # Feature code for RBAC objectKey construction FEATURE_CODE = "neutralization" def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None): """ Initialize the interface with database connection and user context. Args: currentUser: Current user object for RBAC mandateId: Current mandate ID featureInstanceId: Current feature instance ID """ self.currentUser = currentUser self.mandateId = mandateId self.featureInstanceId = featureInstanceId self.userId = currentUser.id if currentUser else None self.db = None # Initialize database self._initializeDatabase() def _initializeDatabase(self): """Initialize the database connection.""" try: # Use same database config pattern as other feature interfaces dbHost = APP_CONFIG.get("DB_HOST", "localhost") dbDatabase = "poweron_neutralization" dbUser = APP_CONFIG.get("DB_USER", "postgres") dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET") dbPort = int(APP_CONFIG.get("DB_PORT", 5432)) self.db = DatabaseConnector( dbHost=dbHost, dbDatabase=dbDatabase, dbUser=dbUser, dbPassword=dbPassword, dbPort=dbPort, userId=self.userId, ) logger.debug("Neutralizer database initialized successfully") except Exception as e: logger.error(f"Error initializing Neutralizer database: {str(e)}") raise def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None): """Sets the user context for the interface.""" if not currentUser: logger.info("Initializing interface without user context") return self.currentUser = currentUser self.userId = currentUser.id self.mandateId = mandateId self.featureInstanceId = featureInstanceId def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]: """Get the data neutralization configuration for the current user's mandate""" try: # Use RBAC filtering filteredConfigs = getRecordsetWithRBAC( self.db, DataNeutraliserConfig, self.currentUser, recordFilter={"mandateId": self.mandateId}, mandateId=self.mandateId ) if not filteredConfigs: return None # Filter out database-specific fields configDict = filteredConfigs[0] cleanedConfig = {k: v for k, v in configDict.items() if not k.startswith("_")} return DataNeutraliserConfig(**cleanedConfig) except Exception as e: logger.error(f"Error getting neutralization config: {str(e)}") return None def createOrUpdateNeutralizationConfig( self, configData: Dict[str, Any] ) -> DataNeutraliserConfig: """Create or update the data neutralization configuration""" try: # Check if config already exists existingConfig = self.getNeutralizationConfig() if existingConfig: # Update existing config updateData = existingConfig.model_dump() updateData.update(configData) updateData["updatedAt"] = getUtcTimestamp() updatedConfig = DataNeutraliserConfig(**updateData) self.db.recordModify( DataNeutraliserConfig, existingConfig.id, updatedConfig ) return updatedConfig else: # Create new config configData["mandateId"] = self.mandateId configData["userId"] = self.userId newConfig = DataNeutraliserConfig(**configData) createdRecord = self.db.recordCreate(DataNeutraliserConfig, newConfig) return DataNeutraliserConfig(**createdRecord) except Exception as e: logger.error(f"Error creating/updating neutralization config: {str(e)}") raise ValueError(f"Failed to create/update neutralization config: {str(e)}") def getNeutralizationAttributes( self, fileId: Optional[str] = None ) -> List[DataNeutralizerAttributes]: """Get neutralization attributes, optionally filtered by file ID""" try: filterDict = {"mandateId": self.mandateId} if fileId: filterDict["fileId"] = fileId # Use RBAC filtering filteredAttributes = getRecordsetWithRBAC( self.db, DataNeutralizerAttributes, self.currentUser, recordFilter=filterDict, mandateId=self.mandateId ) # Filter out database-specific fields cleanedAttributes = [] for attr in filteredAttributes: cleanedAttr = {k: v for k, v in attr.items() if not k.startswith("_")} cleanedAttributes.append(cleanedAttr) return [ DataNeutralizerAttributes(**attr) for attr in cleanedAttributes ] except Exception as e: logger.error(f"Error getting neutralization attributes: {str(e)}") return [] def deleteNeutralizationAttributes(self, fileId: str) -> bool: """Delete all neutralization attributes for a specific file""" try: attributes = self.db.getRecordset( DataNeutralizerAttributes, recordFilter={"mandateId": self.mandateId, "fileId": fileId}, ) for attribute in attributes: self.db.recordDelete(DataNeutralizerAttributes, attribute["id"]) logger.info( f"Deleted {len(attributes)} neutralization attributes for file {fileId}" ) return True except Exception as e: logger.error(f"Error deleting neutralization attributes: {str(e)}") return False def getAttributeById(self, attributeId: str) -> Optional[Dict[str, Any]]: """Get a single neutralization attribute by ID""" try: attributes = self.db.getRecordset( DataNeutralizerAttributes, recordFilter={"mandateId": self.mandateId, "id": attributeId} ) if attributes: return attributes[0] return None except Exception as e: logger.error(f"Error getting attribute by ID: {str(e)}") return None def getInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> InterfaceFeatureNeutralizer: """ Factory function to get or create a Neutralizer interface instance. Uses singleton pattern per user context. Args: currentUser: Current user for RBAC mandateId: Current mandate ID featureInstanceId: Current feature instance ID Returns: InterfaceFeatureNeutralizer instance """ global _neutralizerInterfaces if not currentUser: raise ValueError("Valid user context required") effectiveMandateId = str(mandateId) if mandateId else None effectiveFeatureInstanceId = str(featureInstanceId) if featureInstanceId else None # Include featureInstanceId in cache key for proper isolation cacheKey = f"{currentUser.id}_{effectiveMandateId}_{effectiveFeatureInstanceId}" if cacheKey not in _neutralizerInterfaces: _neutralizerInterfaces[cacheKey] = InterfaceFeatureNeutralizer(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId) else: # Update user context if needed _neutralizerInterfaces[cacheKey].setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId) return _neutralizerInterfaces[cacheKey]