gateway/modules/services/serviceNeutralization/mainServiceNeutralization.py
2025-12-15 21:55:26 +01:00

314 lines
14 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Data Neutralization Service
Handles file processing for data neutralization including SharePoint integration
GDPR-compliant data neutralizer for AI agent systems
Supports TXT, JSON, CSV, Excel and Word files
Multilingual: DE, EN, FR, IT
"""
import logging
import re
import json
from typing import Dict, List, Any, Optional
from modules.datamodels.datamodelNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
# Import all necessary classes and functions for neutralization
from modules.services.serviceNeutralization.subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute
from modules.services.serviceNeutralization.subProcessText import TextProcessor, PlainText
from modules.services.serviceNeutralization.subProcessList import ListProcessor, TableData
from modules.services.serviceNeutralization.subProcessBinary import BinaryProcessor
from modules.services.serviceNeutralization.subPatterns import HeaderPatterns, DataPatterns, TextTablePatterns
logger = logging.getLogger(__name__)
class NeutralizationService:
    """Service for handling data neutralization operations.

    Routes text and file content to the text, list (CSV/JSON/XML) or binary
    processor and exposes the neutralization configuration and the stored
    placeholder attributes reached through ``serviceCenter.interfaceDbApp``.
    """

    # Placeholder format "[<type>.<36-character id>]" searched for by resolveText.
    _PLACEHOLDER_PATTERN = re.compile(r'\[([a-z]+)\.([a-f0-9-]{36})\]')

    # MIME types that are always treated as text (checked before the binary rules).
    _TEXT_MIME_TYPES = frozenset({
        'text/plain', 'text/html', 'text/css', 'text/markdown', 'text/csv',
        'text/javascript', 'text/xml', 'text/json',
        'application/json', 'application/xml', 'application/javascript',
        'application/csv',
    })

    # MIME prefixes treated as binary. 'application/vnd.' already covers the
    # MS Office and OpenXML families, 'application/x-' covers rar/7z/tar.
    _BINARY_MIME_PREFIXES = (
        'image/', 'audio/', 'video/',
        'application/pdf', 'application/zip',
        'application/octet-stream', 'application/x-',
        'application/vnd.', 'application/msword',
    )

    # Exact binary MIME types that no prefix above matches.
    _BINARY_MIME_TYPES = frozenset({'application/gzip'})

    def __init__(self, serviceCenter=None, NamesToParse: List[str] = None):
        """Initialize the service with user context and anonymization processors.

        Args:
            serviceCenter: Service center instance for accessing other services.
                May be None; the database interfaces are then None as well and
                the methods that need them return their "no context" result.
            NamesToParse: List of names to parse and replace (case-insensitive)
        """
        self.services = serviceCenter
        # Guard the default: dereferencing None here made the documented
        # default argument unusable.
        if serviceCenter is None:
            self.interfaceDbApp = None
            self.interfaceDbComponent = None
        else:
            self.interfaceDbApp = serviceCenter.interfaceDbApp
            self.interfaceDbComponent = serviceCenter.interfaceDbComponent

        # Initialize anonymization processors with the normalized list so they
        # receive the same kind of value as in _reloadNamesFromConfig.
        self.NamesToParse = NamesToParse or []
        self.textProcessor = TextProcessor(self.NamesToParse)
        self.listProcessor = ListProcessor(self.NamesToParse)
        self.binaryProcessor = BinaryProcessor()
        self.commonUtils = CommonUtils()

    def getConfig(self) -> Optional["DataNeutraliserConfig"]:
        """Get the neutralization configuration for the current user's mandate.

        Returns:
            The configuration from the app database interface, or None when no
            app database interface is available.
        """
        if not self.interfaceDbApp:
            return None
        return self.interfaceDbApp.getNeutralizationConfig()

    def saveConfig(self, config_data: Dict[str, Any]) -> "DataNeutraliserConfig":
        """Save or update the neutralization configuration.

        Raises:
            ValueError: If no app database interface is available.
        """
        if not self.interfaceDbApp:
            raise ValueError("User context required for saving configuration")
        return self.interfaceDbApp.createOrUpdateNeutralizationConfig(config_data)

    # Public API: process text or file
    def processText(self, text: str) -> Dict[str, Any]:
        """Neutralize a raw text string and return a standard result dict."""
        return self._neutralizeText(text, 'text')

    def processFile(self, fileId: str) -> Dict[str, Any]:
        """Neutralize a file referenced by its fileId using the component interface.

        Binary files (judged by MIME type) are not neutralized; a result dict
        with ``is_binary=True`` and ``status='skipped'`` is returned instead.

        Returns:
            The result dict of _neutralizeText extended with ``file_id``,
            ``is_binary`` and, when the file name is known,
            ``neutralized_file_name``.

        Raises:
            ValueError: If the component interface is missing or no file data
                exists for ``fileId``.
        """
        if not self.interfaceDbComponent:
            raise ValueError("Component interface is required to process a file by fileId")

        # Metadata lookup is best-effort: without it the file is handled as
        # plain text with unknown name. The failure is logged, not hidden.
        fileInfo = None
        try:
            fileInfo = self.interfaceDbComponent.getFile(fileId)
        except Exception as e:
            logger.warning(f"Could not load metadata for file {fileId}: {str(e)}")

        # getFile returns an object; fall back to dict-like access.
        if isinstance(fileInfo, dict):
            fileName = fileInfo.get('fileName')
            mimeType = fileInfo.get('mimeType')
        else:
            fileName = getattr(fileInfo, 'fileName', None)
            mimeType = getattr(fileInfo, 'mimeType', None)

        # Check if file is binary and cannot be neutralized
        if self._isBinaryMimeType(mimeType or ''):
            return {
                'file_id': fileId,
                'is_binary': True,
                'mime_type': mimeType or 'unknown',
                'file_name': fileName or 'unknown',
                'neutralized_text': None,
                'processed_info': {
                    'type': 'binary',
                    'status': 'skipped',
                    'message': 'Binary file neutralization will be implemented in the future'
                }
            }

        fileData = self.interfaceDbComponent.getFileData(fileId)
        if not fileData:
            raise ValueError(f"No file data found for fileId: {fileId}")

        textType = self._getContentTypeFromMime(mimeType or '')
        textContent = self._decodeFileData(fileData)
        result = self._neutralizeText(textContent, textType)

        # Add a reasonable output filename if original known
        if fileName:
            result['neutralized_file_name'] = f"neutralized_{fileName}"
        result['file_id'] = fileId
        result['is_binary'] = False
        return result

    @staticmethod
    def _decodeFileData(fileData: bytes) -> str:
        """Decode raw file bytes to text.

        Tries UTF-8 first (``utf-8-sig`` also drops a leading byte-order mark),
        then Windows-1252, and finally Latin-1. Latin-1 maps every byte value,
        so it must come last: placed earlier it makes later fallbacks
        unreachable.
        """
        for enc in ('utf-8-sig', 'cp1252'):
            try:
                return fileData.decode(enc)
            except UnicodeDecodeError:
                continue
        return fileData.decode('latin-1')

    def resolveText(self, text: str) -> str:
        """Replace neutralization placeholders in text with their original values.

        Placeholders have the form ``[<type>.<id>]``. Each distinct placeholder
        is looked up once for the current mandate; placeholders without a
        stored attribute (or without an original text) are left unchanged.

        Returns:
            The resolved text. The input is returned unchanged when there is no
            app database interface, the text is empty, or a lookup fails.
        """
        if not self.interfaceDbApp or not text:
            return text
        try:
            resolved_text = text
            # dict.fromkeys drops duplicates (one lookup per placeholder) and
            # keeps the order of first appearance.
            found = dict.fromkeys(self._PLACEHOLDER_PATTERN.findall(text))
            for placeholder_type, uid in found:
                attributes = self.interfaceDbApp.db.getRecordset(
                    DataNeutralizerAttributes,
                    recordFilter={
                        "mandateId": self.interfaceDbApp.mandateId,
                        "id": uid
                    }
                )
                if not attributes:
                    continue
                original_text = attributes[0]["originalText"]
                if original_text is None:
                    # Nothing to substitute; keep the placeholder instead of
                    # failing the whole resolution.
                    continue
                placeholder = f"[{placeholder_type}.{uid}]"
                resolved_text = resolved_text.replace(placeholder, original_text)
            return resolved_text
        except Exception as e:
            logger.error(f"Error resolving placeholders: {str(e)}")
            return text

    def getAttributes(self) -> List["DataNeutralizerAttributes"]:
        """Get all neutralization attributes for the current user's mandate.

        Returns an empty list when no app database interface is available or
        the lookup fails (the failure is logged).
        """
        if not self.interfaceDbApp:
            return []
        try:
            # Use the interface method which properly converts dicts to objects
            return self.interfaceDbApp.getNeutralizationAttributes()
        except Exception as e:
            logger.error(f"Error getting neutralization attributes: {str(e)}")
            return []

    def deleteNeutralizationAttributes(self, fileId: str) -> bool:
        """Delete neutralization attributes for a specific file.

        Returns False when no app database interface is available.
        """
        if not self.interfaceDbApp:
            return False
        return self.interfaceDbApp.deleteNeutralizationAttributes(fileId)

    def _reloadNamesFromConfig(self) -> None:
        """Reload names from config and update processors.

        The config's ``namesToParse`` string holds one name per line. Without
        a config, or when reloading fails, the existing names and processors
        are kept.
        """
        try:
            config = self.getConfig()
            if not config:
                return
            # Parse namesToParse string into list, skipping blank lines
            names_list = []
            if config.namesToParse:
                names_list = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]
            self.NamesToParse = names_list
            # Recreate processors with updated names
            self.textProcessor = TextProcessor(names_list)
            self.listProcessor = ListProcessor(names_list)
            logger.debug(f"Reloaded {len(names_list)} names from config")
        except Exception as e:
            # Continue with existing names if reload fails
            logger.error(f"Error reloading names from config: {str(e)}")

    # Helper functions
    def _neutralizeText(self, text: str, textType: str = None) -> Dict[str, Any]:
        """Process text and return unified dict for API consumption.

        Args:
            text: Content to neutralize.
            textType: 'csv', 'json' or 'xml' select the list processor; any
                other value selects the text processor. None triggers
                auto-detection via CommonUtils.detectContentType.

        Returns:
            ``NeutralizationResult(...).model_dump()``. On any exception an
            error result with empty text and
            ``processed_info={'type': 'error', ...}`` is returned instead.
        """
        try:
            # Reload names from config before processing to ensure we have the latest names
            self._reloadNamesFromConfig()

            # Auto-detect content type if not provided
            if textType is None:
                textType = self.commonUtils.detectContentType(text)

            if self.binaryProcessor.isBinaryContent(text):
                data, mapping, _replaced_fields, processed_info = self.binaryProcessor.processBinaryContent(text)
                # Return what the processor produced; the previous code handed
                # back the unprocessed input whenever data was a string.
                neutralized_text = data if isinstance(data, str) else str(data)
            elif textType == 'csv':
                data, mapping, _replaced_fields, processed_info = self.listProcessor.processCsvContent(text)
                try:
                    neutralized_text = data.to_csv(index=False)
                except Exception:
                    # data is not table-like; fall back to its string form
                    neutralized_text = str(data)
            elif textType == 'json':
                data, mapping, _replaced_fields, processed_info = self.listProcessor.processJsonContent(text)
                neutralized_text = json.dumps(data, ensure_ascii=False)
            elif textType == 'xml':
                data, mapping, _replaced_fields, processed_info = self.listProcessor.processXmlContent(text)
                neutralized_text = str(data)
            else:
                data, mapping, _replaced_fields, processed_info = self.textProcessor.processTextContent(text)
                neutralized_text = str(data)

            attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
            return NeutralizationResult(
                neutralized_text=neutralized_text,
                mapping=mapping,
                attributes=attributes,
                processed_info=processed_info
            ).model_dump()
        except Exception as e:
            logger.error(f"Error processing content: {str(e)}")
            return NeutralizationResult(
                neutralized_text='',
                mapping={},
                attributes=[],
                processed_info={'type': 'error', 'error': str(e)}
            ).model_dump()

    @staticmethod
    def _normalizeMimeType(mime_type: Optional[str]) -> str:
        """Return the bare lower-case media type without parameters.

        ``'Text/CSV; charset=utf-8'`` becomes ``'text/csv'``; None or an empty
        value becomes ``''``.
        """
        if not mime_type:
            return ''
        return mime_type.split(';', 1)[0].strip().lower()

    def _isBinaryMimeType(self, mime_type: str) -> bool:
        """Check if a MIME type represents binary content that cannot be neutralized as text.

        Known text types and everything under ``text/`` are never binary.
        Unknown or empty types are treated as not binary, so they go through
        text processing.
        """
        normalized = self._normalizeMimeType(mime_type)
        if not normalized:
            return False
        # Text rules win over the binary rules below.
        if normalized in self._TEXT_MIME_TYPES or normalized.startswith('text/'):
            return False
        if normalized.startswith(self._BINARY_MIME_PREFIXES):
            return True
        return normalized in self._BINARY_MIME_TYPES

    def _getContentTypeFromMime(self, mime_type: str) -> str:
        """Determine content type from MIME type for neutralization processing.

        Returns 'csv', 'json' or 'xml' for the matching MIME types and 'text'
        for everything else. The specific types are tested before the generic
        fallback so that ``text/csv`` and ``text/xml`` are not swallowed by a
        ``text/`` prefix match.
        """
        normalized = self._normalizeMimeType(mime_type)
        if normalized in ('text/csv', 'application/csv'):
            return 'csv'
        if normalized in ('application/json', 'text/json'):
            return 'json'
        if normalized in ('application/xml', 'text/xml'):
            return 'xml'
        return 'text'  # Default to text processing