# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Data Neutralization Service
Handles file processing for data neutralization including SharePoint integration
GDPR-compliant data neutralizer for AI agent systems
Supports TXT, JSON, CSV, PDF, DOCX, XLSX, PPTX (extract -> neutralize -> generate)
Multilingual: DE, EN, FR, IT
"""

import asyncio
import logging
import re
import json
from typing import Dict, List, Any, Optional

from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
from modules.features.neutralization.interfaceFeatureNeutralizer import InterfaceFeatureNeutralizer, getInterface as getNeutralizerInterface

# Import all necessary classes and functions for neutralization
from .subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute
from .subProcessText import TextProcessor, PlainText
from .subProcessList import ListProcessor, TableData
from .subProcessBinary import BinaryProcessor
from .subProcessPdfInPlace import neutralize_pdf_in_place
from .subPatterns import HeaderPatterns, DataPatterns, TextTablePatterns
from .subContentPartAdapter import content_parts_to_renderer_schema

logger = logging.getLogger(__name__)

# MIME types that can be processed via extract -> neutralize -> generate
EXTRACTABLE_BINARY_MIME_TYPES = frozenset({
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
})

# Placeholder syntax: "[<type>.<36-char id>]". Compiled once at import time.
_PLACEHOLDER_FIND_RE = re.compile(r'\[([a-z]+)\.([a-f0-9-]{36})\]')
_PLACEHOLDER_FULL_RE = re.compile(r'^\[([a-z]+)\.([a-f0-9-]{36})\]$')

# Non-"text/" MIME types that are nevertheless treated as text
_TEXT_APPLICATION_MIME_TYPES = frozenset({
    'application/json',
    'application/xml',
    'application/javascript',
    'application/csv',
})

# MIME prefixes treated as binary. 'application/vnd.' also covers the
# 'application/vnd.ms-' and 'application/vnd.openxmlformats-' families.
_BINARY_MIME_PREFIXES = (
    'image/',
    'audio/',
    'video/',
    'application/pdf',
    'application/zip',
    'application/octet-stream',
    'application/x-',
    'application/vnd.',
    'application/msword',
)

# Binary MIME types that are not matched by any prefix above
_BINARY_MIME_TYPES = frozenset({'application/gzip'})


class NeutralizationService:
    """Service for handling data neutralization operations.

    Neutralizes raw text, text files and extractable binary documents
    (PDF, DOCX, XLSX, PPTX), persists the original -> placeholder mapping
    through the neutralizer interface, and resolves placeholders back to
    their original text.
    """

    def __init__(self, serviceCenter=None, NamesToParse: Optional[List[str]] = None):
        """Initialize the service with user context and anonymization processors

        Args:
            serviceCenter: Service center instance for accessing other services
            NamesToParse: List of names to parse and replace (case-insensitive)
        """
        self.services = serviceCenter
        # serviceCenter is optional (defaults to None), so read the component
        # interface defensively instead of raising AttributeError.
        self.interfaceDbComponent = getattr(serviceCenter, 'interfaceDbComponent', None)

        # Create feature-specific interface for neutralizer DB operations
        self.interfaceNeutralizer: Optional[InterfaceFeatureNeutralizer] = None
        dbApp = serviceCenter.interfaceDbApp if serviceCenter else None
        if dbApp:
            self.interfaceNeutralizer = getNeutralizerInterface(
                currentUser=dbApp.currentUser,
                mandateId=serviceCenter.mandateId or dbApp.mandateId,
                featureInstanceId=getattr(serviceCenter, 'featureInstanceId', None)
                or getattr(dbApp, 'featureInstanceId', None)
            )

        # Initialize anonymization processors
        self.NamesToParse = NamesToParse or []
        self.textProcessor = TextProcessor(NamesToParse)
        self.listProcessor = ListProcessor(NamesToParse)
        self.binaryProcessor = BinaryProcessor()
        self.commonUtils = CommonUtils()

    def getConfig(self) -> Optional[DataNeutraliserConfig]:
        """Get the neutralization configuration for the current user's mandate.

        Returns None when no neutralizer interface is available.
        """
        if not self.interfaceNeutralizer:
            return None
        return self.interfaceNeutralizer.getNeutralizationConfig()

    def saveConfig(self, configData: Dict[str, Any]) -> DataNeutraliserConfig:
        """Save or update the neutralization configuration.

        Raises:
            ValueError: If no neutralizer interface (user context) is available.
        """
        if not self.interfaceNeutralizer:
            raise ValueError("User context required for saving configuration")
        return self.interfaceNeutralizer.createOrUpdateNeutralizationConfig(configData)

    # Public API: process text or file

    def processText(self, text: str) -> Dict[str, Any]:
        """Neutralize a raw text string and return a standard result dict."""
        result = self._neutralizeText(text, 'text')
        self._persistAttributes(result.get('mapping', {}), None)
        return result

    def processFile(self, fileId: str) -> Dict[str, Any]:
        """Neutralize a file referenced by its fileId using component interface.

        Supports text files directly; PDF/DOCX/XLSX/PPTX via
        extract -> neutralize -> generate.

        Args:
            fileId: Identifier passed to the component interface.

        Returns:
            Result dict. Binary files that are not extractable are reported
            with status 'skipped'; failures of the binary pipeline are
            reported with status 'error' instead of raising.

        Raises:
            ValueError: If no component interface is configured or the file
                has no data.
        """
        if not self.interfaceDbComponent:
            raise ValueError("Component interface is required to process a file by fileId")

        # Metadata is best-effort: without it the file is handled as plain text.
        fileInfo = None
        try:
            fileInfo = self.interfaceDbComponent.getFile(fileId)
        except Exception as e:
            logger.warning(f"Could not load file metadata for fileId {fileId}: {str(e)}")
        fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None
        mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None

        fileData = self.interfaceDbComponent.getFileData(fileId)
        if not fileData:
            raise ValueError(f"No file data found for fileId: {fileId}")

        mime_lower = (mimeType or '').lower()

        # Binary but extractable: PDF, DOCX, XLSX, PPTX
        if mime_lower in EXTRACTABLE_BINARY_MIME_TYPES:
            try:
                result = asyncio.run(
                    self._processBinaryFile(fileData, fileName or "document", mime_lower, fileId)
                )
                if result:
                    result['file_id'] = fileId
                    result['neutralized_file_name'] = (
                        f"neutralized_{fileName}" if fileName else "neutralized_document"
                    )
                    return result
            except Exception as e:
                logger.error(f"Binary file neutralization failed: {str(e)}")
                return {
                    'file_id': fileId,
                    'is_binary': True,
                    'mime_type': mimeType or 'unknown',
                    'file_name': fileName or 'unknown',
                    'neutralized_text': None,
                    'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)}
                }

        # Binary but not extractable
        if self._isBinaryMimeType(mimeType or ''):
            return {
                'file_id': fileId,
                'is_binary': True,
                'mime_type': mimeType or 'unknown',
                'file_name': fileName or 'unknown',
                'neutralized_text': None,
                'processed_info': {
                    'type': 'binary',
                    'status': 'skipped',
                    'message': 'File type not supported for neutralization'
                }
            }

        # Text-based file
        textType = self._getContentTypeFromMime(mimeType or '')
        textContent = self._decodeFileData(fileData)
        result = self._neutralizeText(textContent, textType)
        self._persistAttributes(result.get('mapping', {}), fileId)
        if fileName:
            result['neutralized_file_name'] = f"neutralized_{fileName}"
        result['file_id'] = fileId
        result['is_binary'] = False
        return result

    def processBinaryBytes(self, fileBytes: bytes, fileName: str, mimeType: str) -> Dict[str, Any]:
        """Neutralize binary file bytes (sync - use from sync callers).

        Runs the async pipeline through asyncio.run; any exception, including
        the one asyncio.run raises when an event loop is already running, is
        reported as an 'error' result rather than propagated.
        """
        mime_lower = (mimeType or '').lower()
        if mime_lower not in EXTRACTABLE_BINARY_MIME_TYPES:
            return self._binarySkippedResult('File type not supported')
        try:
            return asyncio.run(self._processBinaryFile(fileBytes, fileName, mime_lower, None))
        except Exception as e:
            logger.error(f"Binary neutralization failed: {str(e)}")
            return self._binaryErrorResult(str(e))

    async def processBinaryBytesAsync(self, fileBytes: bytes, fileName: str, mimeType: str) -> Dict[str, Any]:
        """Neutralize binary file bytes (async - use from async routes to avoid event loop conflict)."""
        mime_lower = (mimeType or '').lower()
        if mime_lower not in EXTRACTABLE_BINARY_MIME_TYPES:
            return self._binarySkippedResult('File type not supported')
        try:
            return await self._processBinaryFile(fileBytes, fileName, mime_lower, None)
        except Exception as e:
            logger.error(f"Binary neutralization failed: {str(e)}")
            return self._binaryErrorResult(str(e))

    def resolveText(self, text: str) -> str:
        """Replace '[type.id]' placeholders in text with their stored original text.

        Placeholders without a stored attribute are left untouched. If no
        neutralizer interface is available, or resolving fails, the input
        text is returned unchanged.
        """
        if not self.interfaceNeutralizer:
            return text
        try:
            resolvedText = text
            # dict.fromkeys de-duplicates while keeping first-seen order, so a
            # placeholder that occurs many times is looked up only once.
            for placeholderType, uid in dict.fromkeys(_PLACEHOLDER_FIND_RE.findall(text)):
                attribute = self.interfaceNeutralizer.getAttributeById(uid)
                if attribute:
                    placeholder = f"[{placeholderType}.{uid}]"
                    resolvedText = resolvedText.replace(placeholder, attribute["originalText"])
            return resolvedText
        except Exception as e:
            logger.warning(f"Could not resolve placeholders: {str(e)}")
            return text

    def getAttributes(self) -> List[DataNeutralizerAttributes]:
        """Get all neutralization attributes for the current user's mandate.

        Returns an empty list when no neutralizer interface is available or
        the lookup fails.
        """
        if not self.interfaceNeutralizer:
            return []
        try:
            # Use the interface method which properly converts dicts to objects
            return self.interfaceNeutralizer.getNeutralizationAttributes()
        except Exception as e:
            logger.error(f"Error getting neutralization attributes: {str(e)}")
            return []

    def deleteNeutralizationAttributes(self, fileId: str) -> bool:
        """Delete neutralization attributes for a specific file.

        Returns False when no neutralizer interface is available.
        """
        if not self.interfaceNeutralizer:
            return False
        return self.interfaceNeutralizer.deleteNeutralizationAttributes(fileId)

    def _persistAttributes(self, mapping: Dict[str, str], fileId: Optional[str]) -> None:
        """Persist mapping to DB for resolve to work.

        Args:
            mapping: originalText -> placeholder, e.g. '[email.uuid]'. Entries
                whose placeholder does not match that syntax are skipped.
            fileId: File the attributes belong to, or None.
        """
        if not self.interfaceNeutralizer or not mapping:
            return
        for original_text, placeholder in mapping.items():
            m = _PLACEHOLDER_FULL_RE.match(placeholder)
            if not m:
                continue
            pattern_type, uid = m.group(1), m.group(2)
            try:
                self.interfaceNeutralizer.createAttribute(
                    attributeId=uid,
                    originalText=original_text,
                    patternType=pattern_type,
                    fileId=fileId
                )
            except Exception as e:
                # Best effort: one failing attribute must not stop the others.
                logger.debug(f"Could not persist attribute {uid}: {e}")

    @staticmethod
    def _binaryErrorResult(error: str) -> Dict[str, Any]:
        """Build the standard result dict for a failed binary neutralization."""
        return {
            'neutralized_text': None,
            'neutralized_bytes': None,
            'is_binary': True,
            'processed_info': {'type': 'binary', 'status': 'error', 'error': error}
        }

    @staticmethod
    def _binarySkippedResult(message: str) -> Dict[str, Any]:
        """Build the standard result dict for a binary file that was not processed."""
        return {
            'neutralized_text': None,
            'neutralized_bytes': None,
            'is_binary': True,
            'processed_info': {'type': 'binary', 'status': 'skipped', 'message': message}
        }

    @staticmethod
    def _decodeFileData(fileData: bytes) -> str:
        """Decode file bytes as text: UTF-8 first, then cp1252, then latin-1.

        cp1252 must be tried before latin-1: latin-1 maps every byte value and
        therefore never fails, so any encoding listed after it is never reached.
        """
        for enc in ('utf-8', 'cp1252'):
            try:
                return fileData.decode(enc)
            except UnicodeDecodeError:
                continue
        return fileData.decode('latin-1')

    async def _processBinaryFile(
        self,
        fileBytes: bytes,
        fileName: str,
        mimeType: str,
        fileId: Optional[str]
    ) -> Dict[str, Any]:
        """Extract -> neutralize -> adapt -> generate for PDF/DOCX/XLSX/PPTX.

        Args:
            fileBytes: Raw document bytes.
            fileName: Name used for extraction, rendering and the output name.
            mimeType: Lower-cased MIME type of the document.
            fileId: File the persisted attributes belong to, or None.

        Returns:
            Result dict with 'neutralized_bytes' on success, or an error
            result built by _binaryErrorResult.

        Raises:
            Exception: Whatever the renderer raises is logged and re-raised.
        """
        from modules.serviceCenter.services.serviceExtraction.mainServiceExtraction import ExtractionService
        from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
        from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy

        # Ensure registries exist (presumably populated as a side effect of
        # constructing ExtractionService - verify against that class)
        if ExtractionService._sharedExtractorRegistry is None:
            ExtractionService(self.services)
        registry = ExtractionService._sharedExtractorRegistry
        chunker = ExtractionService._sharedChunkerRegistry

        opts = ExtractionOptions(prompt="neutralize", mergeStrategy=MergeStrategy(preserveChunks=True))

        # 1. Extract
        extracted = runExtraction(registry, chunker, fileBytes, fileName, mimeType, opts)
        parts = extracted.parts if hasattr(extracted, 'parts') else []
        if not parts:
            return self._binaryErrorResult('No content extracted')

        # 2. Neutralize each text/table part
        all_mapping: Dict[str, str] = {}
        neutralized_parts: List[Any] = []
        neutralization_error: Optional[str] = None
        for part in parts:
            if isinstance(part, dict) or not hasattr(part, 'model_dump'):
                p = part
            else:
                p = part.model_dump()
            type_group = p.get('typeGroup', '')
            data = p.get('data', '')
            # Binary/image parts and empty parts are passed through untouched
            if type_group in ('binary', 'image') or not (data and str(data).strip()):
                neutralized_parts.append(part)
                continue
            nr = self._neutralizeText(str(data), 'csv' if type_group == 'table' else 'text')
            proc = nr.get('processed_info', {}) or {}
            if isinstance(proc, dict) and proc.get('type') == 'error':
                neutralization_error = proc.get('error', 'Neutralization failed')
            all_mapping.update(nr.get('mapping', {}))
            neutralized_parts.append({**p, 'data': nr.get('neutralized_text', str(data))})

        self._persistAttributes(all_mapping, fileId)
        attributes = [{'original': k, 'placeholder': v} for k, v in all_mapping.items()]

        # 3. PDF: Use in-place only; no fallback to render
        if mimeType == "application/pdf":
            if neutralization_error:
                logger.error(f"PDF neutralization aborted: {neutralization_error}")
                return self._binaryErrorResult(neutralization_error)
            in_place_bytes = neutralize_pdf_in_place(fileBytes, all_mapping)
            if in_place_bytes is None:
                logger.error("PDF in-place neutralization failed")
                return self._binaryErrorResult('PDF in-place neutralization failed')
            logger.info("PDF neutralization completed via in-place redaction (layout preserved)")
            return {
                'neutralized_text': None,
                'neutralized_bytes': in_place_bytes,
                'neutralized_file_name': f"neutralized_{fileName}",
                'is_binary': True,
                'mime_type': 'application/pdf',
                'attributes': attributes,
                'processed_info': {'type': 'binary', 'status': 'success', 'format': 'pdf', 'method': 'in-place'}
            }

        if neutralization_error:
            # Rendering continues for non-PDF formats; make the failure visible.
            logger.warning(f"Neutralization error in at least one part of {mimeType}: {neutralization_error}")

        # 4. Adapter: ContentPart list -> renderer schema (non-PDF only)
        schema = content_parts_to_renderer_schema(neutralized_parts, title=fileName or "Neutralized")

        # 5. Render to format
        renderer, output_mime = self._getRendererForMime(mimeType)
        if not renderer:
            return self._binaryErrorResult(f'No renderer for {mimeType}')

        try:
            logger.info(f"Calling renderer.render for mime={mimeType}, renderer={type(renderer).__name__}")
            rendered = await renderer.render(schema, fileName or "document", None, None)
            logger.info(f"Renderer returned: type={type(rendered).__name__}, len={len(rendered) if rendered else 0}")
            if not rendered:
                logger.error("Renderer returned empty list")
                return self._binaryErrorResult('Render produced no output')

            doc = rendered[0]
            logger.info(f"First doc: type={type(doc).__name__}, isinstance(dict)={isinstance(doc, dict)}, has documentData attr={hasattr(doc, 'documentData')}")
            # Extract documentData: Pydantic v2 models may need model_dump() for reliable access
            if isinstance(doc, dict):
                doc_data = doc.get('documentData')
            elif hasattr(doc, 'model_dump'):
                doc_data = doc.model_dump(mode='python').get('documentData')
            else:
                doc_data = getattr(doc, 'documentData', None)
            logger.info(f"doc_data: type={type(doc_data).__name__ if doc_data is not None else 'None'}, len={len(doc_data) if doc_data else 0}")
            if doc_data is None:
                logger.error("Renderer returned document with no documentData")
                return self._binaryErrorResult('Renderer returned no data')
            if isinstance(doc_data, str):
                doc_data = doc_data.encode('utf-8')
            return {
                'neutralized_text': None,
                'neutralized_bytes': doc_data,
                'neutralized_file_name': f"neutralized_{fileName}",
                'is_binary': True,
                'mime_type': output_mime,
                'attributes': attributes,
                'processed_info': {'type': 'binary', 'status': 'success', 'format': mimeType}
            }
        except Exception as e:
            logger.error(f"Render failed for {mimeType}: {str(e)}", exc_info=True)
            raise

    def _getRendererForMime(self, mimeType: str):
        """Get renderer instance and output mime for the given input MIME type.

        Returns:
            (renderer, output_mime), or (None, None) for an unmapped MIME type.
        """
        from modules.serviceCenter.services.serviceGeneration.renderers.rendererPdf import RendererPdf
        from modules.serviceCenter.services.serviceGeneration.renderers.rendererDocx import RendererDocx
        from modules.serviceCenter.services.serviceGeneration.renderers.rendererXlsx import RendererXlsx
        from modules.serviceCenter.services.serviceGeneration.renderers.rendererPptx import RendererPptx

        mime_map = {
            "application/pdf": (RendererPdf, "application/pdf"),
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document": (
                RendererDocx,
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            ),
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": (
                RendererXlsx,
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            ),
            "application/vnd.openxmlformats-officedocument.presentationml.presentation": (
                RendererPptx,
                "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            ),
        }
        pair = mime_map.get(mimeType)
        if not pair:
            return None, None
        cls, out_mime = pair
        return cls(self.services), out_mime

    def _reloadNamesFromConfig(self) -> None:
        """Reload names from config and update processors.

        Does nothing when no config is available. Errors are logged and the
        existing names and processors are kept.
        """
        try:
            config = self.getConfig()
            if not config:
                return

            # Parse namesToParse string (one name per line) into list
            names_list = []
            if config.namesToParse:
                names_list = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]

            # Update internal list
            self.NamesToParse = names_list

            # Recreate processors with updated names
            self.textProcessor = TextProcessor(names_list)
            self.listProcessor = ListProcessor(names_list)

            logger.debug(f"Reloaded {len(names_list)} names from config")
        except Exception as e:
            logger.error(f"Error reloading names from config: {str(e)}")
            # Continue with existing names if reload fails

    # Helper functions

    @staticmethod
    def _buildResult(neutralized_text: str, mapping: Dict[str, str], processed_info: Any) -> Dict[str, Any]:
        """Wrap neutralization output in a NeutralizationResult and dump it to a dict."""
        attributes = [NeutralizationAttribute(original=k, placeholder=v) for k, v in mapping.items()]
        return NeutralizationResult(
            neutralized_text=neutralized_text,
            mapping=mapping,
            attributes=attributes,
            processed_info=processed_info
        ).model_dump()

    def _neutralizeText(self, text: str, textType: Optional[str] = None) -> Dict[str, Any]:
        """Process text and return unified dict for API consumption.

        Args:
            text: Content to neutralize.
            textType: 'csv', 'json', 'xml' or anything else for plain text.
                Auto-detected when None.

        Returns:
            Dumped NeutralizationResult. On any exception the result has an
            empty 'neutralized_text' and processed_info type 'error'.
        """
        try:
            # Reload names from config before processing to ensure we have the latest names
            self._reloadNamesFromConfig()

            # Auto-detect content type if not provided
            if textType is None:
                textType = self.commonUtils.detectContentType(text)

            # Check if content is binary data
            if self.binaryProcessor.isBinaryContent(text):
                data, mapping, _, processed_info = self.binaryProcessor.processBinaryContent(text)
                # NOTE(review): when the processor returns a str, the ORIGINAL
                # text (not data) is used here - confirm this is intended.
                neutralized_text = text if isinstance(data, str) else str(data)
                return self._buildResult(neutralized_text, mapping, processed_info)

            # Route by content type and stringify the processed data
            if textType == 'csv':
                data, mapping, _, processed_info = self.listProcessor.processCsvContent(text)
                try:
                    neutralized_text = data.to_csv(index=False)
                except Exception:
                    neutralized_text = str(data)
            elif textType == 'json':
                data, mapping, _, processed_info = self.listProcessor.processJsonContent(text)
                neutralized_text = json.dumps(data, ensure_ascii=False)
            elif textType == 'xml':
                data, mapping, _, processed_info = self.listProcessor.processXmlContent(text)
                neutralized_text = str(data)
            else:
                data, mapping, _, processed_info = self.textProcessor.processTextContent(text)
                neutralized_text = str(data)

            return self._buildResult(neutralized_text, mapping, processed_info)
        except Exception as e:
            logger.error(f"Error processing content: {str(e)}")
            return self._buildResult('', {}, {'type': 'error', 'error': str(e)})

    def _isBinaryMimeType(self, mime_type: str) -> bool:
        """Check if a MIME type represents binary content that cannot be neutralized as text.

        Empty input and unknown types are treated as non-binary.
        """
        if not mime_type:
            return False

        mime_type_lower = mime_type.lower()

        # Text-based MIME types that CAN be neutralized
        if mime_type_lower.startswith('text/') or mime_type_lower in _TEXT_APPLICATION_MIME_TYPES:
            return False

        # Binary MIME types that CANNOT be neutralized
        # (str.startswith accepts a tuple of prefixes)
        return mime_type_lower.startswith(_BINARY_MIME_PREFIXES) or mime_type_lower in _BINARY_MIME_TYPES

    def _getContentTypeFromMime(self, mime_type: str) -> str:
        """Determine content type from MIME type for neutralization processing.

        Specific types are checked before the generic text fallback, so that
        'text/csv' and 'text/xml' map to 'csv' and 'xml' instead of 'text'.
        Matching ignores case and any ';'-separated parameters.

        Returns:
            'csv', 'json', 'xml' or 'text' (the default).
        """
        normalized = (mime_type or '').split(';', 1)[0].strip().lower()
        if normalized in ('text/csv', 'application/csv'):
            return 'csv'
        if normalized == 'application/json':
            return 'json'
        if normalized in ('application/xml', 'text/xml'):
            return 'xml'
        return 'text'  # Default to text processing