gateway/modules/services/serviceNeutralization/mainNeutralization.py
2025-09-23 22:47:54 +02:00

206 lines
9 KiB
Python

"""
Data Neutralization Service
Handles file processing for data neutralization including SharePoint integration
DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme
Unterstützt TXT, JSON, CSV, Excel und Word-Dateien
Mehrsprachig: DE, EN, FR, IT
"""
import logging
import re
import os
import uuid
import json
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime
from pathlib import Path
import mimetypes
from modules.interfaces.interfaceAppObjects import getInterface
from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
from modules.shared.timezoneUtils import get_utc_timestamp
# Import all necessary classes and functions for neutralization
from modules.services.serviceNeutralization.subProcessCommon import ProcessResult, CommonUtils, NeutralizationResult, NeutralizationAttribute
from modules.services.serviceNeutralization.subProcessText import TextProcessor, PlainText
from modules.services.serviceNeutralization.subProcessList import ListProcessor, TableData
from modules.services.serviceNeutralization.subProcessBinary import BinaryProcessor, BinaryData
from modules.services.serviceNeutralization.subParseString import StringParser
from modules.services.serviceNeutralization.subPatterns import Pattern, HeaderPatterns, DataPatterns, TextTablePatterns
logger = logging.getLogger(__name__)
class NeutralizationService:
"""Service for handling data neutralization operations"""
def __init__(self, current_user: User = None, names_to_parse: List[str] = None):
"""Initialize the service with user context and anonymization processors
Args:
current_user: User object for context (optional for basic neutralization)
names_to_parse: List of names to parse and replace (case-insensitive)
"""
self.current_user = current_user
self.app_interface = getInterface(current_user) if current_user else None
# Initialize anonymization processors
self.names_to_parse = names_to_parse or []
self.textProcessor = TextProcessor(names_to_parse)
self.listProcessor = ListProcessor(names_to_parse)
self.binaryProcessor = BinaryProcessor()
self.commonUtils = CommonUtils()
def getConfig(self) -> Optional[DataNeutraliserConfig]:
"""Get the neutralization configuration for the current user's mandate"""
if not self.app_interface:
return None
return self.app_interface.getNeutralizationConfig()
def saveConfig(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
"""Save or update the neutralization configuration"""
if not self.app_interface:
raise ValueError("User context required for saving configuration")
return self.app_interface.createOrUpdateNeutralizationConfig(config_data)
# Public API: process text or file
def processText(self, text: str) -> Dict[str, Any]:
"""Neutralize a raw text string and return a standard result dict."""
return self._neutralizeText(text, 'text')
def processFile(self, fileId: str) -> Dict[str, Any]:
"""Neutralize a file referenced by its fileId using app interface."""
if not self.app_interface:
raise ValueError("User context is required to process a file by fileId")
# Fetch file data and metadata
fileInfo = None
try:
# getFile returns an object; fallback to dict-like
fileInfo = self.app_interface.getFile(fileId)
except Exception:
fileInfo = None
fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None
mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None
fileData = self.app_interface.getFileData(fileId)
if not fileData:
raise ValueError(f"No file data found for fileId: {fileId}")
# Determine textType from mime
textType = self._getContentTypeFromMime(mimeType or '')
# Decode to text
try:
textContent = fileData.decode('utf-8')
except UnicodeDecodeError:
decoded = None
for enc in ['latin-1', 'cp1252', 'iso-8859-1']:
try:
decoded = fileData.decode(enc)
break
except UnicodeDecodeError:
continue
if decoded is None:
raise ValueError("Unable to decode file content")
textContent = decoded
result = self._neutralizeText(textContent, textType)
# Add a reasonable output filename if original known
if fileName:
result['neutralized_file_name'] = f"neutralized_{fileName}"
result['file_id'] = fileId
return result
def resolveText(self, text: str) -> str:
if not self.app_interface:
return text
try:
placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
matches = re.findall(placeholder_pattern, text)
resolved_text = text
for placeholder_type, uid in matches:
attributes = self.app_interface.db.getRecordset(
DataNeutralizerAttributes,
recordFilter={
"mandateId": self.app_interface.mandateId,
"id": uid
}
)
if attributes:
attribute = attributes[0]
placeholder = f"[{placeholder_type}.{uid}]"
resolved_text = resolved_text.replace(placeholder, attribute["originalText"])
return resolved_text
except Exception:
return text
# Helper functions
def _neutralizeText(self, text: str, textType: str = None) -> Dict[str, Any]:
"""Process text and return unified dict for API consumption."""
try:
# Auto-detect content type if not provided
if textType is None:
textType = self.commonUtils.detect_content_type(text)
# Check if content is binary data
if self.binaryProcessor.is_binary_content(text):
data, mapping, replaced_fields, processed_info = self.binaryProcessor.process_binary_content(text)
neutralized_text = text if isinstance(data, str) else str(data)
attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
return NeutralizationResult(
neutralized_text=neutralized_text,
mapping=mapping,
attributes=attributes,
processed_info=processed_info
).model_dump()
# Inline former _processData routing
if textType in ['csv', 'json', 'xml']:
if textType == 'csv':
data, mapping, replaced_fields, processed_info = self.listProcessor.process_csv_content(text)
elif textType == 'json':
data, mapping, replaced_fields, processed_info = self.listProcessor.process_json_content(text)
else: # xml
data, mapping, replaced_fields, processed_info = self.listProcessor.process_xml_content(text)
else:
data, mapping, replaced_fields, processed_info = self.textProcessor.process_text_content(text)
# Stringify data consistently
if textType == 'csv':
try:
neutralized_text = data.to_csv(index=False)
except Exception:
neutralized_text = str(data)
elif textType == 'json':
neutralized_text = json.dumps(data, ensure_ascii=False)
elif textType == 'xml':
neutralized_text = str(data)
else:
neutralized_text = str(data)
attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
return NeutralizationResult(
neutralized_text=neutralized_text,
mapping=mapping,
attributes=attributes,
processed_info=processed_info
).model_dump()
except Exception as e:
logger.error(f"Error processing content: {str(e)}")
return NeutralizationResult(
neutralized_text='',
mapping={},
attributes=[],
processed_info={'type': 'error', 'error': str(e)}
).model_dump()
def _getContentTypeFromMime(self, mime_type: str) -> str:
"""Determine content type from MIME type for neutralization processing"""
if mime_type.startswith('text/'):
return 'text'
elif mime_type in ['application/json', 'application/xml', 'text/xml']:
return 'json' if 'json' in mime_type else 'xml'
elif mime_type in ['text/csv', 'application/csv']:
return 'csv'
else:
return 'text' # Default to text processing