# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

import logging
import asyncio
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, unquote

from modules.datamodels.datamodelUam import User
from .datamodelFeatureNeutralizer import DataNeutralizerAttributes, DataNeutraliserConfig
from .interfaceFeatureNeutralizer import getInterface as _getNeutralizerInterface
from modules.serviceHub import getInterface as getServices

logger = logging.getLogger(__name__)


class NeutralizationPlayground:
    """Feature/UI wrapper around NeutralizationService for playground & routes."""

    def __init__(self, currentUser: User, mandateId: str, featureInstanceId: Optional[str] = None):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.services = getServices(currentUser, None, mandateId=mandateId, featureInstanceId=featureInstanceId)

    def processText(self, text: str) -> Dict[str, Any]:
        """Neutralize a raw text string via the neutralization service."""
        return self.services.neutralization.processText(text)

    async def processUploadedFileAsync(self, file_bytes: bytes, filename: str) -> Dict[str, Any]:
        """Process an uploaded file (bytes + filename). Returns neutralized result for text or binary.

        Saves both original and neutralized files to user files (component storage) when available.
        Binary formats (PDF/Office) go through the async binary pipeline; anything else is decoded
        as UTF-8 (Latin-1 fallback) and processed as text.
        """
        import base64

        name_lower = (filename or '').lower()
        # MIME type is chosen by filename extension; unknown extensions fall back to text/plain.
        mime_map = {
            '.pdf': 'application/pdf',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            '.xlsm': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        }
        mime = next((mime_map[ext] for ext in mime_map if name_lower.endswith(ext)), 'text/plain')
        binary_exts = {'.pdf', '.docx', '.xlsx', '.xlsm', '.pptx'}
        is_binary = any(name_lower.endswith(ext) for ext in binary_exts)

        original_file_id = None
        neutralized_file_id = None

        # Save the original upload to user files (best-effort; failures only warn).
        if self.services.interfaceDbComponent:
            try:
                file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(file_bytes, filename)
                original_file_id = str(file_item.id)
            except Exception as e:
                logger.warning(f"Could not save original file to user files: {e}")

        if is_binary:
            result = await self.services.neutralization.processBinaryBytesAsync(file_bytes, filename, mime)
            neu_bytes = result.get('neutralized_bytes')
            logger.debug(f"Binary result: neu_bytes type={type(neu_bytes).__name__}, len={len(neu_bytes) if neu_bytes is not None else 0}")
            if neu_bytes is not None and len(neu_bytes) > 0:
                result['neutralized_file_base64'] = base64.b64encode(neu_bytes).decode('ascii')
                # BUGFIX: default was the literal 'neutralized_(unknown)'; include the real filename.
                result['neutralized_file_name'] = result.get('neutralized_file_name', f'neutralized_{filename}')
                result['mime_type'] = result.get('mime_type', mime)
                # Save neutralized binary to user files (best-effort).
                if self.services.interfaceDbComponent:
                    try:
                        neu_name = result['neutralized_file_name']
                        file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(neu_bytes, neu_name)
                        neutralized_file_id = str(file_item.id)
                    except Exception as e:
                        logger.warning(f"Could not save neutralized file to user files: {e}")
            # Remove raw bytes before JSON response (avoid serialization issues; use base64 only).
            result.pop('neutralized_bytes', None)
            result['original_file_id'] = original_file_id
            result['neutralized_file_id'] = neutralized_file_id
            return result

        # Text path: decode with UTF-8 first, then Latin-1; otherwise report an error payload.
        try:
            text_content = file_bytes.decode('utf-8')
        except UnicodeDecodeError:
            try:
                text_content = file_bytes.decode('latin-1')
            except UnicodeDecodeError:
                return {
                    'neutralized_text': None,
                    'original_file_id': original_file_id,
                    'neutralized_file_id': None,
                    'processed_info': {'type': 'error', 'error': 'File could not be decoded as text. Supported: UTF-8, Latin-1. For PDF/Word/Excel, use supported binary formats.'}
                }

        result = self.services.neutralization.processText(text_content)
        # BUGFIX: was the literal 'neutralized_(unknown)'; include the real filename.
        result['neutralized_file_name'] = f'neutralized_{filename}'
        # Save neutralized text as a file to user files (best-effort).
        if self.services.interfaceDbComponent and result.get('neutralized_text') is not None:
            try:
                neu_text = result['neutralized_text']
                neu_bytes = neu_text.encode('utf-8')
                neu_name = result['neutralized_file_name']
                file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(neu_bytes, neu_name)
                neutralized_file_id = str(file_item.id)
            except Exception as e:
                logger.warning(f"Could not save neutralized text file to user files: {e}")
        result['original_file_id'] = original_file_id
        result['neutralized_file_id'] = neutralized_file_id
        return result

    def processUploadedFile(self, file_bytes: bytes, filename: str) -> Dict[str, Any]:
        """Sync wrapper for sync callers.

        Uses asyncio.run; do NOT call from async routes (use processUploadedFileAsync).
        """
        return asyncio.run(self.processUploadedFileAsync(file_bytes, filename))

    def processFiles(self, fileIds: List[str]) -> Dict[str, Any]:
        """Neutralize a batch of stored files by ID; per-file failures are collected, not raised."""
        results: List[Dict[str, Any]] = []
        errors: List[str] = []
        for fileId in fileIds:
            try:
                res = self.services.neutralization.processFile(fileId)
                results.append({
                    'file_id': fileId,
                    'neutralized_file_name': res.get('neutralized_file_name'),
                    'attributes_count': len(res.get('attributes', [])),
                })
            except Exception as e:
                logger.error(f"Error processing file {fileId}: {str(e)}")
                errors.append(f"{fileId}: {str(e)}")
        return {
            'success': len(errors) == 0,
            'total_files': len(fileIds),
            'successful_files': len(results),
            'failed_files': len(errors),
            'results': results,
            'errors': errors,
        }

    # Delete a single attribute by ID
    def deleteAttribute(self, attributeId: str) -> bool:
        interface = _getNeutralizerInterface(self.currentUser, self.mandateId, self.featureInstanceId)
        return interface.deleteAttributeById(attributeId)

    # Cleanup attributes
    def cleanAttributes(self, fileId: str) -> bool:
        return self.services.neutralization.deleteNeutralizationAttributes(fileId)

    # Stats
    def getStats(self) -> Dict[str, Any]:
        """Aggregate attribute counts per pattern type and the number of distinct files."""
        try:
            allAttributes = self.services.neutralization.getAttributes()
            patternCounts: Dict[str, int] = {}
            for attr in allAttributes:
                # Attributes may be dicts or objects; handle both access patterns.
                if isinstance(attr, dict):
                    patternType = attr.get('patternType', 'unknown')
                else:
                    patternType = getattr(attr, 'patternType', 'unknown')
                if patternType:
                    patternCounts[patternType] = patternCounts.get(patternType, 0) + 1
            # Unique files — again handle both dict and object shapes.
            uniqueFiles = set()
            for attr in allAttributes:
                if isinstance(attr, dict):
                    fileId = attr.get('fileId')
                else:
                    fileId = getattr(attr, 'fileId', None)
                if fileId:
                    uniqueFiles.add(fileId)
            return {
                'total_attributes': len(allAttributes),
                'unique_files': len(uniqueFiles),
                'pattern_counts': patternCounts,
                'mandate_id': self.mandateId,
            }
        except Exception as e:
            logger.error(f"Error getting stats: {str(e)}")
            return {
                'total_attributes': 0,
                'unique_files': 0,
                'pattern_counts': {},
                'error': str(e),
            }

    # Additional methods needed by the route
    def getConfig(self) -> Optional[DataNeutraliserConfig]:
        """Get neutralization configuration"""
        return self.services.neutralization.getConfig()

    def saveConfig(self, configData: Dict[str, Any]) -> DataNeutraliserConfig:
        """Save neutralization configuration"""
        return self.services.neutralization.saveConfig(configData)

    def neutralizeText(self, text: str, fileId: str = None) -> Dict[str, Any]:
        """Neutralize text content (fileId kept for route-interface compatibility; unused)."""
        return self.services.neutralization.processText(text)

    def resolveText(self, text: str) -> str:
        """Resolve UIDs in neutralized text back to original text"""
        return self.services.neutralization.resolveText(text)

    def getAttributes(self, fileId: str = None) -> List[DataNeutralizerAttributes]:
        """Get neutralization attributes, optionally filtered by file ID"""
        try:
            allAttributes = self.services.neutralization.getAttributes()
            if fileId:
                return [attr for attr in allAttributes if attr.fileId == fileId]
            return allAttributes
        except Exception as e:
            logger.error(f"Error getting attributes: {str(e)}")
            return []

    async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
        """Process files from SharePoint source path and store neutralized files in target path."""
        # NOTE: a dead import of SharepointService was removed here; the work is delegated
        # to the SharepointProcessor helper defined in this module.
        processor = SharepointProcessor(self.currentUser, self.services)
        return await processor.processSharepointFiles(sourcePath, targetPath)

    def batchNeutralizeFiles(self, filesData: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process multiple files for neutralization"""
        fileIds = [fileData.get('fileId') for fileData in filesData if fileData.get('fileId')]
        return self.processFiles(fileIds)

    def getProcessingStats(self) -> Dict[str, Any]:
        """Get neutralization processing statistics"""
        return self.getStats()

    def cleanupFileAttributes(self, fileId: str) -> bool:
        """Clean up neutralization attributes for a specific file"""
        return self.cleanAttributes(fileId)


# Internal SharePoint helper module separated to keep feature logic tidy
class SharepointProcessor:
    def __init__(self, currentUser: User, services):
        self.currentUser = currentUser
        self.services = services

    async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
        """Entry point: resolve a SharePoint connection, authenticate, then process the folder."""
        try:
            logger.info(f"Processing SharePoint files from {sourcePath} to {targetPath}")
            # Get SharePoint connection
            connection = await self._getSharepointConnection(sourcePath)
            if not connection:
                return {
                    'success': False,
                    'message': 'No SharePoint connection found for user',
                    'processed_files': 0,
                    'errors': ['No SharePoint connection found'],
                }
            # Set access token for SharePoint service
            if not self.services.sharepoint.setAccessTokenFromConnection(connection):
                return {
                    'success': False,
                    'message': 'Failed to set SharePoint access token',
                    'processed_files': 0,
                    'errors': ['Failed to set SharePoint access token'],
                }
            return await self._processSharepointFilesAsync(sourcePath, targetPath)
        except Exception as e:
            logger.error(f"Error processing SharePoint files: {str(e)}")
            return {
                'success': False,
                'message': f'Error processing SharePoint files: {str(e)}',
                'processed_files': 0,
                'errors': [str(e)],
            }

    async def _getSharepointConnection(self, sharepointPath: str = None):
        """Pick the user's Microsoft connection; disambiguate by path domain when several exist."""
        try:
            # Use interface method to get user connections
            connections = self.services.interfaceDbApp.getUserConnections(self.services.interfaceDbApp.userId)

            def _is_msft_connection(c):
                av = c.authority.value if hasattr(c.authority, 'value') else str(getattr(c, 'authority', ''))
                return av and str(av).lower() == 'msft'

            msftConnections = [c for c in connections if _is_msft_connection(c)]
            if not msftConnections:
                logger.warning('No Microsoft connections found for user')
                return None
            if len(msftConnections) == 1:
                logger.info(f"Found single Microsoft connection: {msftConnections[0].id}")
                return msftConnections[0]
            if sharepointPath:
                return await self._matchConnectionToPath(msftConnections, sharepointPath)
            logger.info(f"Multiple Microsoft connections found, using first one: {msftConnections[0].id}")
            return msftConnections[0]
        except Exception:
            # exc_info via logger.exception so the failure cause is not lost.
            logger.exception('Error getting SharePoint connection')
            return None

    async def _matchConnectionToPath(self, connections: list, sharepointPath: str):
        """Try each connection against the path's domain; fall back to the first connection."""
        try:
            if not sharepointPath or not sharepointPath.startswith('https://'):
                logger.warning(f"Invalid sharepointPath for matching: {sharepointPath}")
                return connections[0] if connections else None
            targetDomain = urlparse(sharepointPath).netloc.lower()
            if not targetDomain:
                logger.warning(f"Could not extract domain from path: {sharepointPath}")
                return connections[0] if connections else None
            logger.info(f"Looking for connection matching domain: {targetDomain}")
            for connection in connections:
                try:
                    if not self.services.sharepoint.setAccessTokenFromConnection(connection):
                        continue
                    if await self._testSharepointAccess(sharepointPath):
                        # BUGFIX: was connection.get('id') — connections are objects (see
                        # msftConnections[0].id above), so .get raised AttributeError and the
                        # blanket `continue` silently skipped the matching connection.
                        logger.info(f"Found matching connection for domain {targetDomain}: {getattr(connection, 'id', None)}")
                        return connection
                except Exception:
                    continue
            logger.warning(f"No specific connection match found for {targetDomain}, using first available")
            return connections[0]
        except Exception:
            logger.exception('Error matching connection to path')
            return connections[0] if connections else None

    async def _testSharepointAccess(self, sharepointPath: str) -> bool:
        """True when the currently-authenticated connection can resolve the path's site."""
        try:
            siteUrl, _ = self._parseSharepointPath(sharepointPath)
            if not siteUrl:
                return False
            siteInfo = await self.services.sharepoint.findSiteByWebUrl(siteUrl)
            return siteInfo is not None
        except Exception:
            return False

    async def _processSharepointFilesAsync(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
        """Download, neutralize, and re-upload every file in the source folder concurrently."""
        try:
            sourceSite, sourceFolder = self._parseSharepointPath(sourcePath)
            targetSite, targetFolder = self._parseSharepointPath(targetPath)
            if not sourceSite or not targetSite:
                return {'success': False, 'message': 'Invalid SharePoint path format', 'processed_files': 0, 'errors': ['Invalid SharePoint path format']}
            sourceSiteInfo = await self.services.sharepoint.findSiteByWebUrl(sourceSite)
            if not sourceSiteInfo:
                return {'success': False, 'message': f'Source site not found: {sourceSite}', 'processed_files': 0, 'errors': [f'Source site not found: {sourceSite}']}
            targetSiteInfo = await self.services.sharepoint.findSiteByWebUrl(targetSite)
            if not targetSiteInfo:
                return {'success': False, 'message': f'Target site not found: {targetSite}', 'processed_files': 0, 'errors': [f'Target site not found: {targetSite}']}
            logger.info(f"Listing files in folder: {sourceFolder} for site: {sourceSiteInfo['id']}")
            files = await self.services.sharepoint.listFolderContents(sourceSiteInfo['id'], sourceFolder)
            if not files:
                # Folder may be mis-typed: list the root so the error can suggest alternatives.
                logger.warning(f"No files found in folder '{sourceFolder}', trying root folder")
                files = await self.services.sharepoint.listFolderContents(sourceSiteInfo['id'], '')
                if files:
                    folders = [f for f in files if f.get('type') == 'folder']
                    folderNames = [f.get('name') for f in folders]
                    logger.info(f"Available folders in root: {folderNames}")
                    folderList = ", ".join(folderNames) if folderNames else "None"
                    return {
                        'success': False,
                        'message': f"Folder '{sourceFolder}' not found. Available folders in root: {folderList}",
                        'processed_files': 0,
                        'errors': [f"Folder '{sourceFolder}' not found. Available folders: {folderList}"],
                        'available_folders': folderNames,
                    }
                else:
                    return {'success': False, 'message': f'No files found in source folder: {sourceFolder}', 'processed_files': 0, 'errors': [f'No files found in source folder: {sourceFolder}']}

            textFiles = [f for f in files if f.get('type') == 'file']
            processed: List[Dict[str, Any]] = []
            errors: List[str] = []
            BINARY_EXTS = {'.pdf', '.docx', '.doc', '.xlsx', '.xlsm', '.pptx', '.ppt'}

            async def _processSingle(fileInfo: Dict[str, Any]):
                # Returns {'success': True, ...} or {'error': str}; exceptions are mapped to errors.
                try:
                    fileContent = await self.services.sharepoint.downloadFile(sourceSiteInfo['id'], fileInfo['id'])
                    if not fileContent:
                        return {'error': f"Failed to download file: {fileInfo['name']}"}
                    name_lower = (fileInfo.get('name') or '').lower()
                    is_binary = any(name_lower.endswith(ext) for ext in BINARY_EXTS)
                    mime_map = {
                        '.pdf': 'application/pdf',
                        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                        '.doc': 'application/msword',
                        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                        '.xlsm': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
                        '.ppt': 'application/vnd.ms-powerpoint',
                    }
                    mime = next((mime_map[ext] for ext in BINARY_EXTS if name_lower.endswith(ext)), 'text/plain')
                    if is_binary:
                        result = self.services.neutralization.processBinaryBytes(fileContent, fileInfo['name'], mime)
                        if result.get('neutralized_bytes'):
                            content_to_upload = result['neutralized_bytes']
                        else:
                            return {'error': f"Failed to neutralize binary file {fileInfo['name']}: {result.get('processed_info', {}).get('error', 'Unknown error')}"}
                    else:
                        try:
                            textContent = fileContent.decode('utf-8')
                        except UnicodeDecodeError:
                            textContent = fileContent.decode('latin-1')
                        result = self.services.neutralization.processText(textContent)
                        content_to_upload = (result.get('neutralized_text') or '').encode('utf-8')
                    neutralizedFilename = f"neutralized_{fileInfo['name']}"
                    uploadResult = await self.services.sharepoint.uploadFile(targetSiteInfo['id'], targetFolder, neutralizedFilename, content_to_upload)
                    if 'error' in uploadResult:
                        return {'error': f"Failed to upload neutralized file: {neutralizedFilename} - {uploadResult['error']}"}
                    return {
                        'success': True,
                        'original_name': fileInfo['name'],
                        'neutralized_name': neutralizedFilename,
                        'attributes_count': len(result.get('attributes', [])),
                    }
                except Exception as e:
                    return {'error': f"Error processing file {fileInfo['name']}: {str(e)}"}

            tasks = [_processSingle(f) for f in textFiles]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for i, r in enumerate(results):
                if isinstance(r, Exception):
                    errors.append(f"Exception processing file {textFiles[i]['name']}: {str(r)}")
                elif isinstance(r, dict) and 'error' in r:
                    errors.append(r['error'])
                elif isinstance(r, dict) and r.get('success'):
                    processed.append({
                        'original_name': r['original_name'],
                        'neutralized_name': r['neutralized_name'],
                        'attributes_count': r['attributes_count'],
                    })
                else:
                    errors.append(f"Unknown result processing file {textFiles[i]['name']}: {r}")
            return {
                'success': len(processed) > 0,
                'message': f"Processed {len(processed)} files successfully",
                'processed_files': len(processed),
                'files': processed,
                'errors': errors,
            }
        except Exception as e:
            logger.error(f"Error in async SharePoint processing: {str(e)}")
            return {'success': False, 'message': f'Error in async SharePoint processing: {str(e)}', 'processed_files': 0, 'errors': [str(e)]}

    def _parseSharepointPath(self, path: str) -> tuple[str, str]:
        """Split an https://.../sites/<name>/<folder...> URL into (siteUrl, folderPath).

        Returns (None, None) for anything that is not an https SharePoint sites URL.
        Query strings are stripped; the folder path is URL-decoded.
        """
        try:
            if not path.startswith('https://'):
                return None, None
            if '?' in path:
                path = path.split('?')[0]
            if '/sites/' not in path:
                return None, None
            parts = path.split('/sites/', 1)
            if len(parts) != 2:
                return None, None
            domain = parts[0].replace('https://', '')
            siteName = parts[1].split('/')[0]
            siteUrl = f"https://{domain}/sites/{siteName}"
            folderParts = parts[1].split('/')[1:]
            folderPath = unquote('/'.join(folderParts) if folderParts else '')
            return siteUrl, folderPath
        except Exception:
            logger.exception(f"Error parsing SharePoint path '{path}'")
            return None, None