176 lines
6.1 KiB
Python
176 lines
6.1 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Database interface for the Neutralizer feature.
|
|
Handles CRUD operations for neutralization configuration and attributes.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
from modules.features.neutralizer.datamodelFeatureNeutralizer import (
|
|
DataNeutraliserConfig,
|
|
DataNeutralizerAttributes,
|
|
)
|
|
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class InterfaceFeatureNeutralizer:
|
|
"""Database interface for Neutralizer feature operations"""
|
|
|
|
def __init__(self, db, currentUser, mandateId: str, userId: str):
|
|
"""
|
|
Initialize the interface with database connection and user context.
|
|
|
|
Args:
|
|
db: Database connection instance
|
|
currentUser: Current user object for RBAC
|
|
mandateId: Current mandate ID
|
|
userId: Current user ID
|
|
"""
|
|
self.db = db
|
|
self.currentUser = currentUser
|
|
self.mandateId = mandateId
|
|
self.userId = userId
|
|
|
|
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
|
|
"""Get the data neutralization configuration for the current user's mandate"""
|
|
try:
|
|
# Use RBAC filtering
|
|
filteredConfigs = getRecordsetWithRBAC(
|
|
self.db,
|
|
DataNeutraliserConfig,
|
|
self.currentUser,
|
|
recordFilter={"mandateId": self.mandateId}
|
|
)
|
|
|
|
if not filteredConfigs:
|
|
return None
|
|
|
|
# Filter out database-specific fields
|
|
configDict = filteredConfigs[0]
|
|
cleanedConfig = {k: v for k, v in configDict.items() if not k.startswith("_")}
|
|
return DataNeutraliserConfig(**cleanedConfig)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting neutralization config: {str(e)}")
|
|
return None
|
|
|
|
def createOrUpdateNeutralizationConfig(
|
|
self, configData: Dict[str, Any]
|
|
) -> DataNeutraliserConfig:
|
|
"""Create or update the data neutralization configuration"""
|
|
try:
|
|
# Check if config already exists
|
|
existingConfig = self.getNeutralizationConfig()
|
|
|
|
if existingConfig:
|
|
# Update existing config
|
|
updateData = existingConfig.model_dump()
|
|
updateData.update(configData)
|
|
updateData["updatedAt"] = getUtcTimestamp()
|
|
|
|
updatedConfig = DataNeutraliserConfig(**updateData)
|
|
self.db.recordModify(
|
|
DataNeutraliserConfig, existingConfig.id, updatedConfig
|
|
)
|
|
|
|
return updatedConfig
|
|
else:
|
|
# Create new config
|
|
configData["mandateId"] = self.mandateId
|
|
configData["userId"] = self.userId
|
|
|
|
newConfig = DataNeutraliserConfig(**configData)
|
|
createdRecord = self.db.recordCreate(DataNeutraliserConfig, newConfig)
|
|
|
|
return DataNeutraliserConfig(**createdRecord)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating/updating neutralization config: {str(e)}")
|
|
raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
|
|
|
|
def getNeutralizationAttributes(
|
|
self, fileId: Optional[str] = None
|
|
) -> List[DataNeutralizerAttributes]:
|
|
"""Get neutralization attributes, optionally filtered by file ID"""
|
|
try:
|
|
filterDict = {"mandateId": self.mandateId}
|
|
if fileId:
|
|
filterDict["fileId"] = fileId
|
|
|
|
# Use RBAC filtering
|
|
filteredAttributes = getRecordsetWithRBAC(
|
|
self.db,
|
|
DataNeutralizerAttributes,
|
|
self.currentUser,
|
|
recordFilter=filterDict
|
|
)
|
|
|
|
# Filter out database-specific fields
|
|
cleanedAttributes = []
|
|
for attr in filteredAttributes:
|
|
cleanedAttr = {k: v for k, v in attr.items() if not k.startswith("_")}
|
|
cleanedAttributes.append(cleanedAttr)
|
|
|
|
return [
|
|
DataNeutralizerAttributes(**attr)
|
|
for attr in cleanedAttributes
|
|
]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting neutralization attributes: {str(e)}")
|
|
return []
|
|
|
|
def deleteNeutralizationAttributes(self, fileId: str) -> bool:
|
|
"""Delete all neutralization attributes for a specific file"""
|
|
try:
|
|
attributes = self.db.getRecordset(
|
|
DataNeutralizerAttributes,
|
|
recordFilter={"mandateId": self.mandateId, "fileId": fileId},
|
|
)
|
|
|
|
for attribute in attributes:
|
|
self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
|
|
|
|
logger.info(
|
|
f"Deleted {len(attributes)} neutralization attributes for file {fileId}"
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting neutralization attributes: {str(e)}")
|
|
return False
|
|
|
|
def getAttributeById(self, attributeId: str) -> Optional[Dict[str, Any]]:
|
|
"""Get a single neutralization attribute by ID"""
|
|
try:
|
|
attributes = self.db.getRecordset(
|
|
DataNeutralizerAttributes,
|
|
recordFilter={"mandateId": self.mandateId, "id": attributeId}
|
|
)
|
|
if attributes:
|
|
return attributes[0]
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting attribute by ID: {str(e)}")
|
|
return None
|
|
|
|
|
|
def getInterface(db, currentUser, mandateId: str, userId: str) -> InterfaceFeatureNeutralizer:
|
|
"""
|
|
Factory function to create a Neutralizer interface instance.
|
|
|
|
Args:
|
|
db: Database connection
|
|
currentUser: Current user for RBAC
|
|
mandateId: Current mandate ID
|
|
userId: Current user ID
|
|
|
|
Returns:
|
|
InterfaceFeatureNeutralizer instance
|
|
"""
|
|
return InterfaceFeatureNeutralizer(db, currentUser, mandateId, userId)
|