291 lines
11 KiB
Python
291 lines
11 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Database interface for the Neutralizer feature.
|
|
Handles CRUD operations for neutralization configuration and attributes.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
from modules.features.neutralization.datamodelFeatureNeutralizer import (
|
|
DataNeutraliserConfig,
|
|
DataNeutralizerAttributes,
|
|
)
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.datamodels.datamodelUam import User
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton cache for interface instances
|
|
_neutralizerInterfaces = {}
|
|
|
|
|
|
class InterfaceFeatureNeutralizer:
|
|
"""Database interface for Neutralizer feature operations"""
|
|
|
|
# Feature code for RBAC objectKey construction
|
|
FEATURE_CODE = "neutralization"
|
|
|
|
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
|
|
"""
|
|
Initialize the interface with database connection and user context.
|
|
|
|
Args:
|
|
currentUser: Current user object for RBAC
|
|
mandateId: Current mandate ID
|
|
featureInstanceId: Current feature instance ID
|
|
"""
|
|
self.currentUser = currentUser
|
|
self.mandateId = mandateId
|
|
self.featureInstanceId = featureInstanceId
|
|
self.userId = currentUser.id if currentUser else None
|
|
self.db = None
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
def _initializeDatabase(self):
|
|
"""Initialize the database connection."""
|
|
try:
|
|
# Use same database config pattern as other feature interfaces
|
|
dbHost = APP_CONFIG.get("DB_HOST", "localhost")
|
|
dbDatabase = "poweron_neutralization"
|
|
dbUser = APP_CONFIG.get("DB_USER", "postgres")
|
|
dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
|
|
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
|
|
|
|
self.db = DatabaseConnector(
|
|
dbHost=dbHost,
|
|
dbDatabase=dbDatabase,
|
|
dbUser=dbUser,
|
|
dbPassword=dbPassword,
|
|
dbPort=dbPort,
|
|
userId=self.userId,
|
|
)
|
|
logger.debug("Neutralizer database initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Error initializing Neutralizer database: {str(e)}")
|
|
raise
|
|
|
|
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
|
|
"""Sets the user context for the interface."""
|
|
if not currentUser:
|
|
logger.info("Initializing interface without user context")
|
|
return
|
|
|
|
self.currentUser = currentUser
|
|
self.userId = currentUser.id
|
|
self.mandateId = mandateId
|
|
self.featureInstanceId = featureInstanceId
|
|
|
|
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
|
|
"""Get the data neutralization configuration for the current user's mandate and instance"""
|
|
try:
|
|
record_filter = {"mandateId": self.mandateId}
|
|
if self.featureInstanceId:
|
|
record_filter["featureInstanceId"] = self.featureInstanceId
|
|
filteredConfigs = getRecordsetWithRBAC(
|
|
self.db,
|
|
DataNeutraliserConfig,
|
|
self.currentUser,
|
|
recordFilter=record_filter,
|
|
mandateId=self.mandateId
|
|
)
|
|
|
|
if not filteredConfigs:
|
|
return None
|
|
|
|
# Filter out database-specific fields
|
|
configDict = filteredConfigs[0]
|
|
cleanedConfig = {k: v for k, v in configDict.items() if not k.startswith("_")}
|
|
return DataNeutraliserConfig(**cleanedConfig)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting neutralization config: {str(e)}")
|
|
return None
|
|
|
|
def createOrUpdateNeutralizationConfig(
|
|
self, configData: Dict[str, Any]
|
|
) -> DataNeutraliserConfig:
|
|
"""Create or update the data neutralization configuration"""
|
|
try:
|
|
# Check if config already exists
|
|
existingConfig = self.getNeutralizationConfig()
|
|
|
|
if existingConfig:
|
|
# Update existing config
|
|
updateData = existingConfig.model_dump()
|
|
updateData.update(configData)
|
|
updateData["updatedAt"] = getUtcTimestamp()
|
|
|
|
updatedConfig = DataNeutraliserConfig(**updateData)
|
|
self.db.recordModify(
|
|
DataNeutraliserConfig, existingConfig.id, updatedConfig
|
|
)
|
|
|
|
return updatedConfig
|
|
else:
|
|
# Create new config
|
|
configData["mandateId"] = self.mandateId
|
|
configData["userId"] = self.userId
|
|
if self.featureInstanceId:
|
|
configData["featureInstanceId"] = self.featureInstanceId
|
|
|
|
newConfig = DataNeutraliserConfig(**configData)
|
|
createdRecord = self.db.recordCreate(DataNeutraliserConfig, newConfig)
|
|
|
|
return DataNeutraliserConfig(**createdRecord)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating/updating neutralization config: {str(e)}")
|
|
raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
|
|
|
|
def getNeutralizationAttributes(
|
|
self, fileId: Optional[str] = None
|
|
) -> List[DataNeutralizerAttributes]:
|
|
"""Get neutralization attributes, optionally filtered by file ID"""
|
|
try:
|
|
filterDict = {"mandateId": self.mandateId}
|
|
if fileId:
|
|
filterDict["fileId"] = fileId
|
|
|
|
# Use RBAC filtering
|
|
filteredAttributes = getRecordsetWithRBAC(
|
|
self.db,
|
|
DataNeutralizerAttributes,
|
|
self.currentUser,
|
|
recordFilter=filterDict,
|
|
mandateId=self.mandateId
|
|
)
|
|
|
|
# Filter out database-specific fields
|
|
cleanedAttributes = []
|
|
for attr in filteredAttributes:
|
|
cleanedAttr = {k: v for k, v in attr.items() if not k.startswith("_")}
|
|
cleanedAttributes.append(cleanedAttr)
|
|
|
|
return [
|
|
DataNeutralizerAttributes(**attr)
|
|
for attr in cleanedAttributes
|
|
]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting neutralization attributes: {str(e)}")
|
|
return []
|
|
|
|
def deleteNeutralizationAttributes(self, fileId: str) -> bool:
|
|
"""Delete all neutralization attributes for a specific file"""
|
|
try:
|
|
attributes = self.db.getRecordset(
|
|
DataNeutralizerAttributes,
|
|
recordFilter={"mandateId": self.mandateId, "fileId": fileId},
|
|
)
|
|
|
|
for attribute in attributes:
|
|
self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
|
|
|
|
logger.info(
|
|
f"Deleted {len(attributes)} neutralization attributes for file {fileId}"
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting neutralization attributes: {str(e)}")
|
|
return False
|
|
|
|
def getAttributeById(self, attributeId: str) -> Optional[Dict[str, Any]]:
|
|
"""Get a single neutralization attribute by ID"""
|
|
try:
|
|
attributes = self.db.getRecordset(
|
|
DataNeutralizerAttributes,
|
|
recordFilter={"mandateId": self.mandateId, "id": attributeId}
|
|
)
|
|
if not attributes:
|
|
return None
|
|
attr = attributes[0]
|
|
return {k: v for k, v in attr.items() if not k.startswith("_")}
|
|
except Exception as e:
|
|
logger.error(f"Error getting attribute by ID: {str(e)}")
|
|
return None
|
|
|
|
def deleteAttributeById(self, attributeId: str) -> bool:
|
|
"""Delete a single neutralization attribute by its ID"""
|
|
try:
|
|
attribute = self.getAttributeById(attributeId)
|
|
if not attribute:
|
|
logger.warning(f"Attribute {attributeId} not found for deletion")
|
|
return False
|
|
|
|
self.db.recordDelete(DataNeutralizerAttributes, attributeId)
|
|
logger.info(f"Deleted neutralization attribute {attributeId}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error deleting attribute by ID: {str(e)}")
|
|
return False
|
|
|
|
def createAttribute(
|
|
self,
|
|
attributeId: str,
|
|
originalText: str,
|
|
patternType: str,
|
|
fileId: Optional[str] = None
|
|
) -> Optional[DataNeutralizerAttributes]:
|
|
"""Create a neutralization attribute for placeholder resolution."""
|
|
try:
|
|
mandate_id = self.mandateId or ""
|
|
feature_instance_id = self.featureInstanceId or ""
|
|
if not self.userId:
|
|
logger.warning("Cannot create attribute: missing userId")
|
|
return None
|
|
attr = DataNeutralizerAttributes(
|
|
id=attributeId,
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId,
|
|
userId=self.userId,
|
|
originalText=originalText,
|
|
fileId=fileId,
|
|
patternType=patternType,
|
|
)
|
|
|
|
created = self.db.recordCreate(DataNeutralizerAttributes, attr.model_dump())
|
|
return DataNeutralizerAttributes(**{k: v for k, v in created.items() if not k.startswith("_")})
|
|
except Exception as e:
|
|
logger.error(f"Error creating attribute: {str(e)}")
|
|
return None
|
|
|
|
|
|
def getInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> InterfaceFeatureNeutralizer:
|
|
"""
|
|
Factory function to get or create a Neutralizer interface instance.
|
|
Uses singleton pattern per user context.
|
|
|
|
Args:
|
|
currentUser: Current user for RBAC
|
|
mandateId: Current mandate ID
|
|
featureInstanceId: Current feature instance ID
|
|
|
|
Returns:
|
|
InterfaceFeatureNeutralizer instance
|
|
"""
|
|
global _neutralizerInterfaces
|
|
|
|
if not currentUser:
|
|
raise ValueError("Valid user context required")
|
|
|
|
effectiveMandateId = str(mandateId) if mandateId else None
|
|
effectiveFeatureInstanceId = str(featureInstanceId) if featureInstanceId else None
|
|
|
|
# Include featureInstanceId in cache key for proper isolation
|
|
cacheKey = f"{currentUser.id}_{effectiveMandateId}_{effectiveFeatureInstanceId}"
|
|
|
|
if cacheKey not in _neutralizerInterfaces:
|
|
_neutralizerInterfaces[cacheKey] = InterfaceFeatureNeutralizer(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
|
|
else:
|
|
# Update user context if needed
|
|
_neutralizerInterfaces[cacheKey].setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
|
|
|
|
return _neutralizerInterfaces[cacheKey]
|