gateway/modules/features/neutralization/interfaceFeatureNeutralizer.py
2026-01-25 03:01:01 +01:00

176 lines
6.1 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Database interface for the Neutralizer feature.
Handles CRUD operations for neutralization configuration and attributes.
"""
import logging
from typing import Dict, List, Any, Optional
from modules.features.neutralization.datamodelFeatureNeutralizer import (
DataNeutraliserConfig,
DataNeutralizerAttributes,
)
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
from modules.shared.timeUtils import getUtcTimestamp
logger = logging.getLogger(__name__)
class InterfaceFeatureNeutralizer:
"""Database interface for Neutralizer feature operations"""
def __init__(self, db, currentUser, mandateId: str, userId: str):
"""
Initialize the interface with database connection and user context.
Args:
db: Database connection instance
currentUser: Current user object for RBAC
mandateId: Current mandate ID
userId: Current user ID
"""
self.db = db
self.currentUser = currentUser
self.mandateId = mandateId
self.userId = userId
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
"""Get the data neutralization configuration for the current user's mandate"""
try:
# Use RBAC filtering
filteredConfigs = getRecordsetWithRBAC(
self.db,
DataNeutraliserConfig,
self.currentUser,
recordFilter={"mandateId": self.mandateId}
)
if not filteredConfigs:
return None
# Filter out database-specific fields
configDict = filteredConfigs[0]
cleanedConfig = {k: v for k, v in configDict.items() if not k.startswith("_")}
return DataNeutraliserConfig(**cleanedConfig)
except Exception as e:
logger.error(f"Error getting neutralization config: {str(e)}")
return None
def createOrUpdateNeutralizationConfig(
self, configData: Dict[str, Any]
) -> DataNeutraliserConfig:
"""Create or update the data neutralization configuration"""
try:
# Check if config already exists
existingConfig = self.getNeutralizationConfig()
if existingConfig:
# Update existing config
updateData = existingConfig.model_dump()
updateData.update(configData)
updateData["updatedAt"] = getUtcTimestamp()
updatedConfig = DataNeutraliserConfig(**updateData)
self.db.recordModify(
DataNeutraliserConfig, existingConfig.id, updatedConfig
)
return updatedConfig
else:
# Create new config
configData["mandateId"] = self.mandateId
configData["userId"] = self.userId
newConfig = DataNeutraliserConfig(**configData)
createdRecord = self.db.recordCreate(DataNeutraliserConfig, newConfig)
return DataNeutraliserConfig(**createdRecord)
except Exception as e:
logger.error(f"Error creating/updating neutralization config: {str(e)}")
raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
def getNeutralizationAttributes(
self, fileId: Optional[str] = None
) -> List[DataNeutralizerAttributes]:
"""Get neutralization attributes, optionally filtered by file ID"""
try:
filterDict = {"mandateId": self.mandateId}
if fileId:
filterDict["fileId"] = fileId
# Use RBAC filtering
filteredAttributes = getRecordsetWithRBAC(
self.db,
DataNeutralizerAttributes,
self.currentUser,
recordFilter=filterDict
)
# Filter out database-specific fields
cleanedAttributes = []
for attr in filteredAttributes:
cleanedAttr = {k: v for k, v in attr.items() if not k.startswith("_")}
cleanedAttributes.append(cleanedAttr)
return [
DataNeutralizerAttributes(**attr)
for attr in cleanedAttributes
]
except Exception as e:
logger.error(f"Error getting neutralization attributes: {str(e)}")
return []
def deleteNeutralizationAttributes(self, fileId: str) -> bool:
"""Delete all neutralization attributes for a specific file"""
try:
attributes = self.db.getRecordset(
DataNeutralizerAttributes,
recordFilter={"mandateId": self.mandateId, "fileId": fileId},
)
for attribute in attributes:
self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
logger.info(
f"Deleted {len(attributes)} neutralization attributes for file {fileId}"
)
return True
except Exception as e:
logger.error(f"Error deleting neutralization attributes: {str(e)}")
return False
def getAttributeById(self, attributeId: str) -> Optional[Dict[str, Any]]:
"""Get a single neutralization attribute by ID"""
try:
attributes = self.db.getRecordset(
DataNeutralizerAttributes,
recordFilter={"mandateId": self.mandateId, "id": attributeId}
)
if attributes:
return attributes[0]
return None
except Exception as e:
logger.error(f"Error getting attribute by ID: {str(e)}")
return None
def getInterface(db, currentUser, mandateId: str, userId: str) -> InterfaceFeatureNeutralizer:
"""
Factory function to create a Neutralizer interface instance.
Args:
db: Database connection
currentUser: Current user for RBAC
mandateId: Current mandate ID
userId: Current user ID
Returns:
InterfaceFeatureNeutralizer instance
"""
return InterfaceFeatureNeutralizer(db, currentUser, mandateId, userId)