450 lines
23 KiB
Python
450 lines
23 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
import logging
|
|
import asyncio
|
|
from typing import Any, Dict, List, Optional
|
|
from urllib.parse import urlparse, unquote
|
|
|
|
from modules.datamodels.datamodelUam import User
|
|
from .datamodelFeatureNeutralizer import DataNeutralizerAttributes, DataNeutraliserConfig
|
|
from modules.services import getInterface as getServices
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class NeutralizationPlayground:
    """Feature/UI wrapper around NeutralizationService for playground & routes.

    All neutralization work is delegated to ``services.neutralization``; this
    class adds upload handling, persistence of original/neutralized files to
    user (component) storage, batch processing and statistics aggregation.
    """

    # Office/PDF formats routed through the binary neutralization pipeline,
    # mapped to their MIME types. Anything else is treated as text.
    _BINARY_MIME_MAP = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.xlsm': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    }

    def __init__(self, currentUser: "User", mandateId: str, featureInstanceId: Optional[str] = None):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.services = getServices(currentUser, None, mandateId=mandateId, featureInstanceId=featureInstanceId)

    def processText(self, text: str) -> Dict[str, Any]:
        """Neutralize a plain-text string; returns the service result dict."""
        return self.services.neutralization.processText(text)

    @staticmethod
    def _attrField(attr: Any, field: str, default: Any = None) -> Any:
        """Read *field* from an attribute record that may be a dict or an object."""
        if isinstance(attr, dict):
            return attr.get(field, default)
        return getattr(attr, field, default)

    def _saveToUserFiles(self, data: bytes, name: str, what: str) -> Optional[str]:
        """Best-effort save of *data* to component storage.

        Returns the new file id as a string, or None when storage is
        unavailable or the save fails (failure is only logged — uploads must
        not break the neutralization response).
        """
        if not self.services.interfaceDbComponent:
            return None
        try:
            file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(data, name)
            return str(file_item.id)
        except Exception as e:
            logger.warning(f"Could not save {what} to user files: {e}")
            return None

    async def processUploadedFileAsync(self, file_bytes: bytes, filename: str) -> Dict[str, Any]:
        """Process an uploaded file (bytes + filename). Returns neutralized result for text or binary.

        Saves both original and neutralized files to user files (component storage) when available.
        """
        import base64

        name_lower = (filename or '').lower()
        mime = next((m for ext, m in self._BINARY_MIME_MAP.items() if name_lower.endswith(ext)), 'text/plain')
        is_binary = any(name_lower.endswith(ext) for ext in self._BINARY_MIME_MAP)

        # Persist the original upload first so it is kept even if processing fails.
        original_file_id = self._saveToUserFiles(file_bytes, filename, 'original file')
        neutralized_file_id = None

        if is_binary:
            result = await self.services.neutralization.processBinaryBytesAsync(file_bytes, filename, mime)
            neu_bytes = result.get('neutralized_bytes')
            logger.debug(f"Binary result: neu_bytes type={type(neu_bytes).__name__}, len={len(neu_bytes) if neu_bytes is not None else 0}")
            if neu_bytes:
                result['neutralized_file_base64'] = base64.b64encode(neu_bytes).decode('ascii')
                # BUGFIX: the fallback name was the literal 'neutralized_(unknown)'
                # (f-string without a placeholder); derive it from the upload name.
                result['neutralized_file_name'] = result.get('neutralized_file_name', f'neutralized_{filename}')
                result['mime_type'] = result.get('mime_type', mime)
                neutralized_file_id = self._saveToUserFiles(neu_bytes, result['neutralized_file_name'], 'neutralized file')
            # Remove raw bytes before JSON response (avoid serialization issues; use base64 only)
            result.pop('neutralized_bytes', None)
            result['original_file_id'] = original_file_id
            result['neutralized_file_id'] = neutralized_file_id
            return result

        # Text path: try UTF-8 first, then Latin-1 as a permissive fallback.
        text_content = None
        for encoding in ('utf-8', 'latin-1'):
            try:
                text_content = file_bytes.decode(encoding)
                break
            except UnicodeDecodeError:
                continue
        if text_content is None:
            return {
                'neutralized_text': None,
                'original_file_id': original_file_id,
                'neutralized_file_id': None,
                'processed_info': {'type': 'error', 'error': 'File could not be decoded as text. Supported: UTF-8, Latin-1. For PDF/Word/Excel, use supported binary formats.'},
            }

        result = self.services.neutralization.processText(text_content)
        # BUGFIX: same broken 'neutralized_(unknown)' literal as in the binary path.
        result['neutralized_file_name'] = f'neutralized_{filename}'
        if result.get('neutralized_text') is not None:
            neutralized_file_id = self._saveToUserFiles(
                result['neutralized_text'].encode('utf-8'),
                result['neutralized_file_name'],
                'neutralized text file',
            )
        result['original_file_id'] = original_file_id
        result['neutralized_file_id'] = neutralized_file_id
        return result

    def processUploadedFile(self, file_bytes: bytes, filename: str) -> Dict[str, Any]:
        """Sync wrapper for sync callers. Uses asyncio.run; do NOT call from async routes (use processUploadedFileAsync)."""
        return asyncio.run(self.processUploadedFileAsync(file_bytes, filename))

    def processFiles(self, fileIds: List[str]) -> Dict[str, Any]:
        """Neutralize a list of stored files by id; returns an aggregate report.

        Per-file failures are collected in 'errors' and do not abort the batch.
        """
        results: List[Dict[str, Any]] = []
        errors: List[str] = []
        for fileId in fileIds:
            try:
                res = self.services.neutralization.processFile(fileId)
                results.append({
                    'file_id': fileId,
                    'neutralized_file_name': res.get('neutralized_file_name'),
                    'attributes_count': len(res.get('attributes', [])),
                })
            except Exception as e:
                logger.error(f"Error processing file {fileId}: {str(e)}")
                errors.append(f"{fileId}: {str(e)}")
        return {
            'success': len(errors) == 0,
            'total_files': len(fileIds),
            'successful_files': len(results),
            'failed_files': len(errors),
            'results': results,
            'errors': errors,
        }

    # Cleanup attributes
    def cleanAttributes(self, fileId: str) -> bool:
        """Delete stored neutralization attributes for *fileId*."""
        return self.services.neutralization.deleteNeutralizationAttributes(fileId)

    # Stats
    def getStats(self) -> Dict[str, Any]:
        """Aggregate pattern counts and the number of distinct files touched.

        Attribute records may be dicts or objects (see ``_attrField``); both
        are counted in a single pass. On failure, returns zeroed stats with
        an 'error' key instead of raising.
        """
        try:
            allAttributes = self.services.neutralization.getAttributes()
            patternCounts: Dict[str, int] = {}
            uniqueFiles = set()
            for attr in allAttributes:
                patternType = self._attrField(attr, 'patternType', 'unknown')
                if patternType:
                    patternCounts[patternType] = patternCounts.get(patternType, 0) + 1
                fileId = self._attrField(attr, 'fileId')
                if fileId:
                    uniqueFiles.add(fileId)
            return {
                'total_attributes': len(allAttributes),
                'unique_files': len(uniqueFiles),
                'pattern_counts': patternCounts,
                'mandate_id': self.mandateId,
            }
        except Exception as e:
            logger.error(f"Error getting stats: {str(e)}")
            return {
                'total_attributes': 0,
                'unique_files': 0,
                'pattern_counts': {},
                'error': str(e),
            }

    # Additional methods needed by the route
    def getConfig(self) -> Optional["DataNeutraliserConfig"]:
        """Get neutralization configuration"""
        return self.services.neutralization.getConfig()

    def saveConfig(self, configData: Dict[str, Any]) -> "DataNeutraliserConfig":
        """Save neutralization configuration"""
        return self.services.neutralization.saveConfig(configData)

    def neutralizeText(self, text: str, fileId: Optional[str] = None) -> Dict[str, Any]:
        """Neutralize text content (fileId is accepted for route compatibility but unused)."""
        return self.services.neutralization.processText(text)

    def resolveText(self, text: str) -> str:
        """Resolve UIDs in neutralized text back to original text"""
        return self.services.neutralization.resolveText(text)

    def getAttributes(self, fileId: Optional[str] = None) -> List["DataNeutralizerAttributes"]:
        """Get neutralization attributes, optionally filtered by file ID.

        Uses ``_attrField`` so dict-shaped records filter correctly instead of
        raising (which previously collapsed the whole result to []).
        """
        try:
            allAttributes = self.services.neutralization.getAttributes()
            if fileId:
                return [attr for attr in allAttributes if self._attrField(attr, 'fileId') == fileId]
            return allAttributes
        except Exception as e:
            logger.error(f"Error getting attributes: {str(e)}")
            return []

    async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
        """Process files from SharePoint source path and store neutralized files in target path"""
        processor = SharepointProcessor(self.currentUser, self.services)
        return await processor.processSharepointFiles(sourcePath, targetPath)

    def batchNeutralizeFiles(self, filesData: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process multiple files for neutralization (entries without 'fileId' are skipped)."""
        fileIds = [fileData.get('fileId') for fileData in filesData if fileData.get('fileId')]
        return self.processFiles(fileIds)

    def getProcessingStats(self) -> Dict[str, Any]:
        """Get neutralization processing statistics"""
        return self.getStats()

    def cleanupFileAttributes(self, fileId: str) -> bool:
        """Clean up neutralization attributes for a specific file"""
        return self.cleanAttributes(fileId)
|
|
|
|
|
|
# Internal SharePoint helper module separated to keep feature logic tidy
class SharepointProcessor:
    """Downloads files from a SharePoint folder, neutralizes them and uploads
    the results to a target folder, using one of the user's Microsoft
    connections resolved from the source path's domain.
    """

    # Binary formats routed through the binary neutralization pipeline,
    # mapped to their MIME types. Anything else is treated as text.
    _MIME_MAP = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.xlsm': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.ppt': 'application/vnd.ms-powerpoint',
    }

    def __init__(self, currentUser: "User", services):
        self.currentUser = currentUser
        self.services = services

    @staticmethod
    def _failure(message: str, errors: Optional[List[str]] = None, **extra: Any) -> Dict[str, Any]:
        """Build the standard failure payload returned by the processing entry points."""
        payload: Dict[str, Any] = {
            'success': False,
            'message': message,
            'processed_files': 0,
            'errors': errors if errors is not None else [message],
        }
        payload.update(extra)
        return payload

    async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
        """Neutralize every file in *sourcePath* and upload results to *targetPath*.

        Resolves a usable Microsoft connection first; returns a report dict
        with 'success', 'message', 'processed_files' and 'errors' keys.
        """
        try:
            logger.info(f"Processing SharePoint files from {sourcePath} to {targetPath}")

            # Get SharePoint connection
            connection = await self._getSharepointConnection(sourcePath)
            if not connection:
                return self._failure('No SharePoint connection found for user', errors=['No SharePoint connection found'])

            # Set access token for SharePoint service
            if not self.services.sharepoint.setAccessTokenFromConnection(connection):
                return self._failure('Failed to set SharePoint access token')

            return await self._processSharepointFilesAsync(sourcePath, targetPath)
        except Exception as e:
            logger.error(f"Error processing SharePoint files: {str(e)}")
            return self._failure(f'Error processing SharePoint files: {str(e)}', errors=[str(e)])

    async def _getSharepointConnection(self, sharepointPath: Optional[str] = None):
        """Pick the user's Microsoft connection, preferring one that can reach *sharepointPath*.

        Returns None when the user has no Microsoft connection or lookup fails.
        """
        try:
            # Use interface method to get user connections
            connections = self.services.interfaceDbApp.getUserConnections(self.services.interfaceDbApp.userId)
            msftConnections = [c for c in connections if c.authority == 'msft']
            if not msftConnections:
                logger.warning('No Microsoft connections found for user')
                return None
            if len(msftConnections) == 1:
                logger.info(f"Found single Microsoft connection: {msftConnections[0].id}")
                return msftConnections[0]
            if sharepointPath:
                return await self._matchConnectionToPath(msftConnections, sharepointPath)
            logger.info(f"Multiple Microsoft connections found, using first one: {msftConnections[0].id}")
            return msftConnections[0]
        except Exception:
            logger.error('Error getting SharePoint connection')
            return None

    async def _matchConnectionToPath(self, connections: list, sharepointPath: str):
        """Probe each connection against the target site; fall back to the first one."""
        try:
            if not sharepointPath or not sharepointPath.startswith('https://'):
                logger.warning(f"Invalid sharepointPath for matching: {sharepointPath}")
                return connections[0] if connections else None

            targetDomain = urlparse(sharepointPath).netloc.lower()
            if not targetDomain:
                logger.warning(f"Could not extract domain from path: {sharepointPath}")
                return connections[0] if connections else None

            logger.info(f"Looking for connection matching domain: {targetDomain}")

            for connection in connections:
                try:
                    if not self.services.sharepoint.setAccessTokenFromConnection(connection):
                        continue
                    if await self._testSharepointAccess(sharepointPath):
                        # BUGFIX: connections are objects (cf. c.authority / .id above),
                        # not dicts — connection.get('id') raised AttributeError here and
                        # the except below silently skipped the matching connection.
                        logger.info(f"Found matching connection for domain {targetDomain}: {connection.id}")
                        return connection
                except Exception:
                    continue
            logger.warning(f"No specific connection match found for {targetDomain}, using first available")
            return connections[0]
        except Exception:
            logger.error('Error matching connection to path')
            return connections[0] if connections else None

    async def _testSharepointAccess(self, sharepointPath: str) -> bool:
        """Return True when the currently-set token can resolve the path's site."""
        try:
            siteUrl, _ = self._parseSharepointPath(sharepointPath)
            if not siteUrl:
                return False
            siteInfo = await self.services.sharepoint.findSiteByWebUrl(siteUrl)
            return siteInfo is not None
        except Exception:
            return False

    async def _processSharepointFilesAsync(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
        """Core pipeline: resolve sites, list source files, neutralize and upload concurrently."""
        try:
            sourceSite, sourceFolder = self._parseSharepointPath(sourcePath)
            targetSite, targetFolder = self._parseSharepointPath(targetPath)
            if not sourceSite or not targetSite:
                return self._failure('Invalid SharePoint path format')

            sourceSiteInfo = await self.services.sharepoint.findSiteByWebUrl(sourceSite)
            if not sourceSiteInfo:
                return self._failure(f'Source site not found: {sourceSite}')
            targetSiteInfo = await self.services.sharepoint.findSiteByWebUrl(targetSite)
            if not targetSiteInfo:
                return self._failure(f'Target site not found: {targetSite}')

            logger.info(f"Listing files in folder: {sourceFolder} for site: {sourceSiteInfo['id']}")
            files = await self.services.sharepoint.listFolderContents(sourceSiteInfo['id'], sourceFolder)
            if not files:
                # Folder may be wrong; list the root so we can report what exists.
                logger.warning(f"No files found in folder '{sourceFolder}', trying root folder")
                files = await self.services.sharepoint.listFolderContents(sourceSiteInfo['id'], '')
                if files:
                    folderNames = [f.get('name') for f in files if f.get('type') == 'folder']
                    logger.info(f"Available folders in root: {folderNames}")
                    folderList = ", ".join(folderNames) if folderNames else "None"
                    return self._failure(
                        f"Folder '{sourceFolder}' not found. Available folders in root: {folderList}",
                        errors=[f"Folder '{sourceFolder}' not found. Available folders: {folderList}"],
                        available_folders=folderNames,
                    )
                return self._failure(f'No files found in source folder: {sourceFolder}')

            sourceFiles = [f for f in files if f.get('type') == 'file']
            tasks = [self._processSingleFile(f, sourceSiteInfo, targetSiteInfo, targetFolder) for f in sourceFiles]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            processed: List[Dict[str, Any]] = []
            errors: List[str] = []
            for fileInfo, r in zip(sourceFiles, results):
                if isinstance(r, Exception):
                    errors.append(f"Exception processing file {fileInfo['name']}: {str(r)}")
                elif isinstance(r, dict) and 'error' in r:
                    errors.append(r['error'])
                elif isinstance(r, dict) and r.get('success'):
                    processed.append({
                        'original_name': r['original_name'],
                        'neutralized_name': r['neutralized_name'],
                        'attributes_count': r['attributes_count'],
                    })
                else:
                    errors.append(f"Unknown result processing file {fileInfo['name']}: {r}")
            return {
                'success': len(processed) > 0,
                'message': f"Processed {len(processed)} files successfully",
                'processed_files': len(processed),
                'files': processed,
                'errors': errors,
            }
        except Exception as e:
            logger.error(f"Error in async SharePoint processing: {str(e)}")
            return self._failure(f'Error in async SharePoint processing: {str(e)}', errors=[str(e)])

    async def _processSingleFile(self, fileInfo: Dict[str, Any], sourceSiteInfo: Dict[str, Any], targetSiteInfo: Dict[str, Any], targetFolder: str) -> Dict[str, Any]:
        """Download one file, neutralize it (binary or text path) and upload the result.

        Returns {'success': True, ...} on success or {'error': message} on any failure;
        never raises (exceptions are folded into the error payload).
        """
        try:
            fileContent = await self.services.sharepoint.downloadFile(sourceSiteInfo['id'], fileInfo['id'])
            if not fileContent:
                return {'error': f"Failed to download file: {fileInfo['name']}"}

            name_lower = (fileInfo.get('name') or '').lower()
            mime = next((m for ext, m in self._MIME_MAP.items() if name_lower.endswith(ext)), 'text/plain')
            is_binary = mime != 'text/plain'

            if is_binary:
                # NOTE(review): sync call, unlike the async variant used for uploads
                # in the playground — presumably intentional here; confirm.
                result = self.services.neutralization.processBinaryBytes(fileContent, fileInfo['name'], mime)
                content_to_upload = result.get('neutralized_bytes')
                if not content_to_upload:
                    return {'error': f"Failed to neutralize binary file {fileInfo['name']}: {result.get('processed_info', {}).get('error', 'Unknown error')}"}
            else:
                try:
                    textContent = fileContent.decode('utf-8')
                except UnicodeDecodeError:
                    textContent = fileContent.decode('latin-1')
                result = self.services.neutralization.processText(textContent)
                content_to_upload = (result.get('neutralized_text') or '').encode('utf-8')

            neutralizedFilename = f"neutralized_{fileInfo['name']}"
            uploadResult = await self.services.sharepoint.uploadFile(targetSiteInfo['id'], targetFolder, neutralizedFilename, content_to_upload)
            if 'error' in uploadResult:
                return {'error': f"Failed to upload neutralized file: {neutralizedFilename} - {uploadResult['error']}"}
            return {
                'success': True,
                'original_name': fileInfo['name'],
                'neutralized_name': neutralizedFilename,
                'attributes_count': len(result.get('attributes', [])),
            }
        except Exception as e:
            return {'error': f"Error processing file {fileInfo['name']}: {str(e)}"}

    def _parseSharepointPath(self, path: str) -> tuple[Optional[str], Optional[str]]:
        """Split a SharePoint URL into (site url, decoded folder path).

        Example: https://x.sharepoint.com/sites/Team/Docs/Sub ->
        ('https://x.sharepoint.com/sites/Team', 'Docs/Sub').
        Returns (None, None) for anything that is not an https /sites/ URL.
        """
        try:
            if not path.startswith('https://'):
                return None, None
            # Drop any query string (e.g. '?web=1' share links).
            path = path.split('?', 1)[0]
            if '/sites/' not in path:
                return None, None
            prefix, remainder = path.split('/sites/', 1)
            domain = prefix.replace('https://', '')
            segments = remainder.split('/')
            siteUrl = f"https://{domain}/sites/{segments[0]}"
            folderPath = unquote('/'.join(segments[1:]))
            return siteUrl, folderPath
        except Exception:
            logger.error(f"Error parsing SharePoint path '{path}'")
            return None, None
|