""" Data Neutralization Service Handles file processing for data neutralization including SharePoint integration DSGVO-konformer Daten-Neutralisierer für KI-Agentensysteme Unterstützt TXT, JSON, CSV, Excel und Word-Dateien Mehrsprachig: DE, EN, FR, IT """ import logging import re import json from typing import Dict, List, Any, Optional from modules.interfaces.interfaceAppModel import DataNeutraliserConfig, DataNeutralizerAttributes # Import all necessary classes and functions for neutralization from modules.services.serviceNeutralization.subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute from modules.services.serviceNeutralization.subProcessText import TextProcessor, PlainText from modules.services.serviceNeutralization.subProcessList import ListProcessor, TableData from modules.services.serviceNeutralization.subProcessBinary import BinaryProcessor from modules.services.serviceNeutralization.subPatterns import HeaderPatterns, DataPatterns, TextTablePatterns logger = logging.getLogger(__name__) class NeutralizationService: """Service for handling data neutralization operations""" def __init__(self, serviceCenter=None, NamesToParse: List[str] = None): """Initialize the service with user context and anonymization processors Args: serviceCenter: Service center instance for accessing other services NamesToParse: List of names to parse and replace (case-insensitive) """ self.serviceCenter = serviceCenter self.interfaceApp = serviceCenter.interfaceApp # Initialize anonymization processors self.NamesToParse = NamesToParse or [] self.textProcessor = TextProcessor(NamesToParse) self.listProcessor = ListProcessor(NamesToParse) self.binaryProcessor = BinaryProcessor() self.commonUtils = CommonUtils() def getConfig(self) -> Optional[DataNeutraliserConfig]: """Get the neutralization configuration for the current user's mandate""" if not self.interfaceApp: return None return self.interfaceApp.getNeutralizationConfig() def saveConfig(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig: """Save or update the neutralization configuration""" if not self.interfaceApp: raise ValueError("User context required for saving configuration") return self.interfaceApp.createOrUpdateNeutralizationConfig(config_data) # Public API: process text or file def processText(self, text: str) -> Dict[str, Any]: """Neutralize a raw text string and return a standard result dict.""" return self._neutralizeText(text, 'text') def processFile(self, fileId: str) -> Dict[str, Any]: """Neutralize a file referenced by its fileId using app interface.""" if not self.interfaceApp: raise ValueError("User context is required to process a file by fileId") # Fetch file data and metadata fileInfo = None try: # getFile returns an object; fallback to dict-like fileInfo = self.interfaceApp.getFile(fileId) except Exception: fileInfo = None fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None fileData = self.interfaceApp.getFileData(fileId) if not fileData: raise ValueError(f"No file data found for fileId: {fileId}") # Determine textType from mime textType = self._getContentTypeFromMime(mimeType or '') # Decode to text try: textContent = fileData.decode('utf-8') except UnicodeDecodeError: decoded = None for enc in ['latin-1', 'cp1252', 'iso-8859-1']: try: decoded = fileData.decode(enc) break except UnicodeDecodeError: continue if decoded is None: raise ValueError("Unable to decode file content") textContent = decoded result = self._neutralizeText(textContent, textType) # Add a reasonable output filename if original known if fileName: result['neutralized_file_name'] = f"neutralized_{fileName}" result['file_id'] = fileId return result def resolveText(self, text: str) -> str: if not self.interfaceApp: return text try: placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]' matches = re.findall(placeholder_pattern, text) resolved_text = text for placeholder_type, uid in matches: attributes = self.interfaceApp.db.getRecordset( DataNeutralizerAttributes, recordFilter={ "mandateId": self.interfaceApp.mandateId, "id": uid } ) if attributes: attribute = attributes[0] placeholder = f"[{placeholder_type}.{uid}]" resolved_text = resolved_text.replace(placeholder, attribute["originalText"]) return resolved_text except Exception: return text # Helper functions def _neutralizeText(self, text: str, textType: str = None) -> Dict[str, Any]: """Process text and return unified dict for API consumption.""" try: # Auto-detect content type if not provided if textType is None: textType = self.commonUtils.detect_content_type(text) # Check if content is binary data if self.binaryProcessor.is_binary_content(text): data, mapping, replaced_fields, processed_info = self.binaryProcessor.process_binary_content(text) neutralized_text = text if isinstance(data, str) else str(data) attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()] return NeutralizationResult( neutralized_text=neutralized_text, mapping=mapping, attributes=attributes, processed_info=processed_info ).model_dump() # Inline former _processData routing if textType in ['csv', 'json', 'xml']: if textType == 'csv': data, mapping, replaced_fields, processed_info = self.listProcessor.process_csv_content(text) elif textType == 'json': data, mapping, replaced_fields, processed_info = self.listProcessor.process_json_content(text) else: # xml data, mapping, replaced_fields, processed_info = self.listProcessor.process_xml_content(text) else: data, mapping, replaced_fields, processed_info = self.textProcessor.process_text_content(text) # Stringify data consistently if textType == 'csv': try: neutralized_text = data.to_csv(index=False) except Exception: neutralized_text = str(data) elif textType == 'json': neutralized_text = json.dumps(data, ensure_ascii=False) elif textType == 'xml': neutralized_text = str(data) else: neutralized_text = str(data) attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()] return NeutralizationResult( neutralized_text=neutralized_text, mapping=mapping, attributes=attributes, processed_info=processed_info ).model_dump() except Exception as e: logger.error(f"Error processing content: {str(e)}") return NeutralizationResult( neutralized_text='', mapping={}, attributes=[], processed_info={'type': 'error', 'error': str(e)} ).model_dump() def _getAttributes(self) -> List[DataNeutralizerAttributes]: """Get all neutralization attributes for the current user's mandate""" if not self.interfaceApp: return [] try: return self.interfaceApp.db.getRecordset( DataNeutralizerAttributes, recordFilter={"mandateId": self.interfaceApp.mandateId} ) except Exception as e: logger.error(f"Error getting neutralization attributes: {str(e)}") return [] def _getContentTypeFromMime(self, mime_type: str) -> str: """Determine content type from MIME type for neutralization processing""" if mime_type.startswith('text/'): return 'text' elif mime_type in ['application/json', 'application/xml', 'text/xml']: return 'json' if 'json' in mime_type else 'xml' elif mime_type in ['text/csv', 'application/csv']: return 'csv' else: return 'text' # Default to text processing