gateway/modules/features/neutralization/interfaceFeatureNeutralizer.py

276 lines
10 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Database interface for the Neutralizer feature.
Handles CRUD operations for neutralization configuration and attributes.
"""
import logging
from typing import Dict, List, Any, Optional
from modules.features.neutralization.datamodelFeatureNeutralizer import (
DataNeutraliserConfig,
DataNeutralizerAttributes,
)
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcTimestamp
from modules.datamodels.datamodelUam import User
logger = logging.getLogger(__name__)
# Singleton cache for interface instances
_neutralizerInterfaces = {}
class InterfaceFeatureNeutralizer:
"""Database interface for Neutralizer feature operations"""
# Feature code for RBAC objectKey construction
FEATURE_CODE = "neutralization"
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
"""
Initialize the interface with database connection and user context.
Args:
currentUser: Current user object for RBAC
mandateId: Current mandate ID
featureInstanceId: Current feature instance ID
"""
self.currentUser = currentUser
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.userId = currentUser.id if currentUser else None
self.db = None
# Initialize database
self._initializeDatabase()
def _initializeDatabase(self):
"""Initialize the database connection."""
try:
# Use same database config pattern as other feature interfaces
dbHost = APP_CONFIG.get("DB_HOST", "localhost")
dbDatabase = "poweron_neutralization"
dbUser = APP_CONFIG.get("DB_USER", "postgres")
dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
self.db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,
userId=self.userId,
)
logger.debug("Neutralizer database initialized successfully")
except Exception as e:
logger.error(f"Error initializing Neutralizer database: {str(e)}")
raise
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
"""Sets the user context for the interface."""
if not currentUser:
logger.info("Initializing interface without user context")
return
self.currentUser = currentUser
self.userId = currentUser.id
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
"""Get the data neutralization configuration for the current user's mandate and instance"""
try:
record_filter = {"mandateId": self.mandateId}
if self.featureInstanceId:
record_filter["featureInstanceId"] = self.featureInstanceId
filteredConfigs = getRecordsetWithRBAC(
self.db,
DataNeutraliserConfig,
self.currentUser,
recordFilter=record_filter,
mandateId=self.mandateId
)
if not filteredConfigs:
return None
# Filter out database-specific fields
configDict = filteredConfigs[0]
cleanedConfig = {k: v for k, v in configDict.items() if not k.startswith("_")}
return DataNeutraliserConfig(**cleanedConfig)
except Exception as e:
logger.error(f"Error getting neutralization config: {str(e)}")
return None
def createOrUpdateNeutralizationConfig(
self, configData: Dict[str, Any]
) -> DataNeutraliserConfig:
"""Create or update the data neutralization configuration"""
try:
# Check if config already exists
existingConfig = self.getNeutralizationConfig()
if existingConfig:
# Update existing config
updateData = existingConfig.model_dump()
updateData.update(configData)
updateData["updatedAt"] = getUtcTimestamp()
updatedConfig = DataNeutraliserConfig(**updateData)
self.db.recordModify(
DataNeutraliserConfig, existingConfig.id, updatedConfig
)
return updatedConfig
else:
# Create new config
configData["mandateId"] = self.mandateId
configData["userId"] = self.userId
if self.featureInstanceId:
configData["featureInstanceId"] = self.featureInstanceId
newConfig = DataNeutraliserConfig(**configData)
createdRecord = self.db.recordCreate(DataNeutraliserConfig, newConfig)
return DataNeutraliserConfig(**createdRecord)
except Exception as e:
logger.error(f"Error creating/updating neutralization config: {str(e)}")
raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
def getNeutralizationAttributes(
self, fileId: Optional[str] = None
) -> List[DataNeutralizerAttributes]:
"""Get neutralization attributes, optionally filtered by file ID"""
try:
filterDict = {"mandateId": self.mandateId}
if fileId:
filterDict["fileId"] = fileId
# Use RBAC filtering
filteredAttributes = getRecordsetWithRBAC(
self.db,
DataNeutralizerAttributes,
self.currentUser,
recordFilter=filterDict,
mandateId=self.mandateId
)
# Filter out database-specific fields
cleanedAttributes = []
for attr in filteredAttributes:
cleanedAttr = {k: v for k, v in attr.items() if not k.startswith("_")}
cleanedAttributes.append(cleanedAttr)
return [
DataNeutralizerAttributes(**attr)
for attr in cleanedAttributes
]
except Exception as e:
logger.error(f"Error getting neutralization attributes: {str(e)}")
return []
def deleteNeutralizationAttributes(self, fileId: str) -> bool:
"""Delete all neutralization attributes for a specific file"""
try:
attributes = self.db.getRecordset(
DataNeutralizerAttributes,
recordFilter={"mandateId": self.mandateId, "fileId": fileId},
)
for attribute in attributes:
self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
logger.info(
f"Deleted {len(attributes)} neutralization attributes for file {fileId}"
)
return True
except Exception as e:
logger.error(f"Error deleting neutralization attributes: {str(e)}")
return False
def getAttributeById(self, attributeId: str) -> Optional[Dict[str, Any]]:
"""Get a single neutralization attribute by ID"""
try:
attributes = self.db.getRecordset(
DataNeutralizerAttributes,
recordFilter={"mandateId": self.mandateId, "id": attributeId}
)
if not attributes:
return None
attr = attributes[0]
return {k: v for k, v in attr.items() if not k.startswith("_")}
except Exception as e:
logger.error(f"Error getting attribute by ID: {str(e)}")
return None
def createAttribute(
self,
attributeId: str,
originalText: str,
patternType: str,
fileId: Optional[str] = None
) -> Optional[DataNeutralizerAttributes]:
"""Create a neutralization attribute for placeholder resolution."""
try:
mandate_id = self.mandateId or ""
feature_instance_id = self.featureInstanceId or ""
if not self.userId:
logger.warning("Cannot create attribute: missing userId")
return None
attr = DataNeutralizerAttributes(
id=attributeId,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId,
userId=self.userId,
originalText=originalText,
fileId=fileId,
patternType=patternType,
)
created = self.db.recordCreate(DataNeutralizerAttributes, attr.model_dump())
return DataNeutralizerAttributes(**{k: v for k, v in created.items() if not k.startswith("_")})
except Exception as e:
logger.error(f"Error creating attribute: {str(e)}")
return None
def getInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> InterfaceFeatureNeutralizer:
"""
Factory function to get or create a Neutralizer interface instance.
Uses singleton pattern per user context.
Args:
currentUser: Current user for RBAC
mandateId: Current mandate ID
featureInstanceId: Current feature instance ID
Returns:
InterfaceFeatureNeutralizer instance
"""
global _neutralizerInterfaces
if not currentUser:
raise ValueError("Valid user context required")
effectiveMandateId = str(mandateId) if mandateId else None
effectiveFeatureInstanceId = str(featureInstanceId) if featureInstanceId else None
# Include featureInstanceId in cache key for proper isolation
cacheKey = f"{currentUser.id}_{effectiveMandateId}_{effectiveFeatureInstanceId}"
if cacheKey not in _neutralizerInterfaces:
_neutralizerInterfaces[cacheKey] = InterfaceFeatureNeutralizer(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
else:
# Update user context if needed
_neutralizerInterfaces[cacheKey].setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
return _neutralizerInterfaces[cacheKey]