243 lines
10 KiB
Python
243 lines
10 KiB
Python
"""
|
|
Data Neutralization Service
|
|
Handles file processing for data neutralization including SharePoint integration
|
|
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
|
|
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
|
|
Mehrsprachig: DE, EN, FR, IT
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import json
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
from modules.datamodels.datamodelNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
|
|
|
|
# Import all necessary classes and functions for neutralization
|
|
from modules.services.serviceNeutralization.subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute
|
|
from modules.services.serviceNeutralization.subProcessText import TextProcessor, PlainText
|
|
from modules.services.serviceNeutralization.subProcessList import ListProcessor, TableData
|
|
from modules.services.serviceNeutralization.subProcessBinary import BinaryProcessor
|
|
from modules.services.serviceNeutralization.subPatterns import HeaderPatterns, DataPatterns, TextTablePatterns
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class NeutralizationService:
    """Service for handling data neutralization operations.

    Bundles the text, list (CSV/JSON/XML) and binary processors behind one
    API and uses the service center's database interfaces to read the
    configuration, stored attributes and file content.
    """

    # Placeholder format "[<type>.<36-character id>]" recognised by resolveText.
    _PLACEHOLDER_PATTERN = re.compile(r'\[([a-z]+)\.([a-f0-9-]{36})\]')

    # Encodings tried in order when decoding file bytes. cp1252 must come
    # before latin-1: latin-1 accepts every byte sequence, so anything listed
    # after it would never be reached.
    _FILE_ENCODINGS = ('utf-8', 'cp1252', 'latin-1')

    def __init__(self, serviceCenter=None, NamesToParse: Optional[List[str]] = None):
        """Initialize the service with user context and anonymization processors.

        Args:
            serviceCenter: Service center instance for accessing other services.
                May be None; the database interfaces are then None as well and
                the methods that need them return their "no interface" result
                or raise ValueError.
            NamesToParse: List of names to parse and replace (case-insensitive).
        """
        self.services = serviceCenter
        # Only a missing service center is tolerated; a service center object
        # without these attributes still raises AttributeError as before.
        self.interfaceDbApp = serviceCenter.interfaceDbApp if serviceCenter is not None else None
        self.interfaceDbComponent = serviceCenter.interfaceDbComponent if serviceCenter is not None else None

        # Initialize anonymization processors with the normalized (never None)
        # name list, the same shape _reloadNamesFromConfig passes later on.
        self.NamesToParse = NamesToParse or []
        self.textProcessor = TextProcessor(self.NamesToParse)
        self.listProcessor = ListProcessor(self.NamesToParse)
        self.binaryProcessor = BinaryProcessor()
        self.commonUtils = CommonUtils()

    def getConfig(self) -> Optional[DataNeutraliserConfig]:
        """Return the neutralization configuration, or None without an app interface."""
        if not self.interfaceDbApp:
            return None
        return self.interfaceDbApp.getNeutralizationConfig()

    def saveConfig(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
        """Save or update the neutralization configuration.

        Raises:
            ValueError: If no app database interface is available.
        """
        if not self.interfaceDbApp:
            raise ValueError("User context required for saving configuration")
        return self.interfaceDbApp.createOrUpdateNeutralizationConfig(config_data)

    # Public API: process text or file

    def processText(self, text: str) -> Dict[str, Any]:
        """Neutralize a raw text string and return a standard result dict."""
        return self._neutralizeText(text, 'text')

    def processFile(self, fileId: str) -> Dict[str, Any]:
        """Neutralize a file referenced by its fileId using the component interface.

        Args:
            fileId: Identifier passed to the component interface's getFile and
                getFileData.

        Returns:
            The result dict of _neutralizeText, extended with 'file_id' and,
            when the original file name is known, 'neutralized_file_name'.

        Raises:
            ValueError: If the component interface is missing, no file data is
                found, or the content cannot be decoded.
        """
        if not self.interfaceDbComponent:
            raise ValueError("Component interface is required to process a file by fileId")

        # Metadata is optional: without it the content is processed as plain
        # text and no output file name is produced.
        try:
            fileInfo = self.interfaceDbComponent.getFile(fileId)
        except Exception as e:
            logger.warning("Could not load metadata for file %s: %s", fileId, e)
            fileInfo = None
        fileName = self._readFileField(fileInfo, 'fileName')
        mimeType = self._readFileField(fileInfo, 'mimeType')

        fileData = self.interfaceDbComponent.getFileData(fileId)
        if not fileData:
            raise ValueError(f"No file data found for fileId: {fileId}")

        textType = self._getContentTypeFromMime(mimeType or '')
        textContent = self._decodeFileData(fileData)

        result = self._neutralizeText(textContent, textType)
        # Add a reasonable output filename if the original is known
        if fileName:
            result['neutralized_file_name'] = f"neutralized_{fileName}"
        result['file_id'] = fileId
        return result

    def resolveText(self, text: str) -> str:
        """Replace neutralization placeholders in text with their stored originals.

        Placeholders have the form "[<type>.<id>]". Each distinct id is looked
        up once among the DataNeutralizerAttributes records of the current
        mandate. Placeholders without a stored original are left unchanged.

        Args:
            text: Text that may contain placeholders.

        Returns:
            The text with resolvable placeholders replaced. The input is
            returned unchanged when no app interface is available or when any
            lookup fails.
        """
        if not self.interfaceDbApp:
            return text
        try:
            originals: Dict[str, Any] = {}  # id -> original text (None if unknown)

            def _restore(match):
                uid = match.group(2)
                if uid not in originals:
                    records = self.interfaceDbApp.db.getRecordset(
                        DataNeutralizerAttributes,
                        recordFilter={
                            "mandateId": self.interfaceDbApp.mandateId,
                            "id": uid
                        }
                    )
                    originals[uid] = records[0]["originalText"] if records else None
                original = originals[uid]
                return match.group(0) if original is None else original

            # Single pass: restored text is never scanned for placeholders again.
            return self._PLACEHOLDER_PATTERN.sub(_restore, text)
        except Exception as e:
            # The text itself is deliberately not logged; it may hold personal data.
            logger.error("Error resolving placeholders: %s", e)
            return text

    def getAttributes(self) -> List[DataNeutralizerAttributes]:
        """Return the stored neutralization attributes, or [] if unavailable."""
        if not self.interfaceDbApp:
            return []
        try:
            # Use the interface method which properly converts dicts to objects
            return self.interfaceDbApp.getNeutralizationAttributes()
        except Exception as e:
            logger.error("Error getting neutralization attributes: %s", e)
            return []

    def deleteNeutralizationAttributes(self, fileId: str) -> bool:
        """Delete neutralization attributes for a specific file.

        Returns False when no app database interface is available.
        """
        if not self.interfaceDbApp:
            return False
        return self.interfaceDbApp.deleteNeutralizationAttributes(fileId)

    def _reloadNamesFromConfig(self) -> None:
        """Reload names from config and rebuild the text and list processors.

        Does nothing when no configuration is available. On any error the
        existing names and processors are kept.
        """
        try:
            config = self.getConfig()
            if not config:
                return

            # namesToParse is a newline-separated string; blank lines are dropped
            names_list = []
            if config.namesToParse:
                names_list = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]

            self.NamesToParse = names_list

            # Recreate processors with updated names
            self.textProcessor = TextProcessor(names_list)
            self.listProcessor = ListProcessor(names_list)

            logger.debug("Reloaded %d names from config", len(names_list))
        except Exception as e:
            # Continue with existing names if reload fails
            logger.error("Error reloading names from config: %s", e)

    # Helper functions

    @staticmethod
    def _readFileField(fileInfo, fieldName: str):
        """Read a metadata field from an object or a dict-like file record."""
        if not fileInfo:
            return None
        if isinstance(fileInfo, dict):
            return fileInfo.get(fieldName)
        return getattr(fileInfo, fieldName, None)

    @staticmethod
    def _decodeFileData(fileData) -> str:
        """Decode raw file content to text.

        Tries UTF-8, then cp1252, then latin-1. A str is returned unchanged.

        Raises:
            ValueError: If no encoding succeeds. This is a safeguard only:
                latin-1 accepts any byte sequence.
        """
        if isinstance(fileData, str):
            return fileData
        for encoding in NeutralizationService._FILE_ENCODINGS:
            try:
                return fileData.decode(encoding)
            except UnicodeDecodeError:
                continue
        raise ValueError("Unable to decode file content")

    @staticmethod
    def _buildResult(neutralized_text: str, mapping: Dict[str, Any], processed_info: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap processor output in a NeutralizationResult and dump it to a dict."""
        attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
        return NeutralizationResult(
            neutralized_text=neutralized_text,
            mapping=mapping,
            attributes=attributes,
            processed_info=processed_info
        ).model_dump()

    def _neutralizeText(self, text: str, textType: Optional[str] = None) -> Dict[str, Any]:
        """Process text and return unified dict for API consumption.

        Args:
            text: Content to neutralize.
            textType: 'csv', 'json' or 'xml' route to the list processor; any
                other value routes to the text processor. None triggers
                auto-detection through CommonUtils.detectContentType.

        Returns:
            The dumped NeutralizationResult. On any error the result has an
            empty neutralized_text and processed_info of type 'error'.
        """
        try:
            # Reload names from config before processing to ensure we have the latest names
            self._reloadNamesFromConfig()

            if textType is None:
                textType = self.commonUtils.detectContentType(text)

            # Binary content bypasses the type-based routing below
            if self.binaryProcessor.isBinaryContent(text):
                data, mapping, _replaced, processed_info = self.binaryProcessor.processBinaryContent(text)
                # Return the processor's output, not the raw input, so that
                # un-neutralized content is never handed back to the caller.
                neutralized_text = data if isinstance(data, str) else str(data)
                return self._buildResult(neutralized_text, mapping, processed_info)

            if textType == 'csv':
                data, mapping, _replaced, processed_info = self.listProcessor.processCsvContent(text)
            elif textType == 'json':
                data, mapping, _replaced, processed_info = self.listProcessor.processJsonContent(text)
            elif textType == 'xml':
                data, mapping, _replaced, processed_info = self.listProcessor.processXmlContent(text)
            else:
                data, mapping, _replaced, processed_info = self.textProcessor.processTextContent(text)

            # Stringify data consistently
            if textType == 'csv':
                try:
                    neutralized_text = data.to_csv(index=False)
                except Exception:
                    # Processor output without to_csv falls back to its str form
                    neutralized_text = str(data)
            elif textType == 'json':
                neutralized_text = json.dumps(data, ensure_ascii=False)
            else:
                neutralized_text = str(data)

            return self._buildResult(neutralized_text, mapping, processed_info)

        except Exception as e:
            logger.error("Error processing content: %s", e)
            return self._buildResult('', {}, {'type': 'error', 'error': str(e)})

    def _getContentTypeFromMime(self, mime_type: str) -> str:
        """Determine content type from MIME type for neutralization processing.

        Specific types are checked before the generic text fallback, so
        'text/csv' and 'text/xml' map to 'csv' and 'xml' rather than 'text'.
        MIME parameters (e.g. '; charset=utf-8') and letter case are ignored.

        Returns:
            One of 'csv', 'json', 'xml' or 'text' (the default).
        """
        essence = (mime_type or '').split(';', 1)[0].strip().lower()
        if essence in ('text/csv', 'application/csv'):
            return 'csv'
        if essence == 'application/json':
            return 'json'
        if essence in ('application/xml', 'text/xml'):
            return 'xml'
        return 'text'  # Default to text processing
|