import logging import importlib import pkgutil import inspect import os from typing import Dict, Any, List, Optional from modules.interfaces.interfaceAppModel import User, UserConnection from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange, ExtractedContent ) from modules.interfaces.interfaceAiCalls import AiCalls from modules.interfaces.interfaceChatObjects import getInterface as getChatObjects from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects from modules.chat.documents.documentExtraction import DocumentExtraction from modules.chat.methodBase import MethodBase from modules.shared.timezoneUtils import get_utc_timestamp import uuid import asyncio logger = logging.getLogger(__name__) class ServiceCenter: """Service center that provides access to all services and their functions""" def __init__(self, currentUser: User, workflow: ChatWorkflow): # Core services self.user = currentUser self.workflow = workflow self.tasks = workflow.tasks self.statusEnums = TaskStatus self.currentTask = None # Initialize current task as None # Initialize managers self.interfaceChat = getChatObjects(currentUser) self.interfaceComponent = getComponentObjects(currentUser) self.interfaceApp = getAppObjects(currentUser) self.interfaceAiCalls = AiCalls() self.documentProcessor = DocumentExtraction(self) # Initialize methods catalog self.methods = {} # Discover additional methods self._discoverMethods() def _discoverMethods(self): """Dynamically discover all method classes and their actions in modules.methods package""" try: # Import the methods package methodsPackage = importlib.import_module('modules.methods') # Discover all modules in the package for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): if not isPkg and name.startswith('method'): try: # Import the module module = importlib.import_module(f'modules.methods.{name}') # Find all classes in the module that inherit from MethodBase for itemName, item in inspect.getmembers(module): if (inspect.isclass(item) and issubclass(item, MethodBase) and item != MethodBase): # Instantiate the method methodInstance = item(self) # Discover actions from public methods actions = {} for methodName, method in inspect.getmembers(type(methodInstance), predicate=inspect.iscoroutinefunction): if not methodName.startswith('_'): # Bind the method to the instance bound_method = method.__get__(methodInstance, type(methodInstance)) sig = inspect.signature(method) params = {} for paramName, param in sig.parameters.items(): if paramName not in ['self']: # Get parameter type paramType = param.annotation if param.annotation != param.empty else Any # Get parameter description from docstring or default paramDesc = None if param.default != param.empty and hasattr(param.default, '__doc__'): paramDesc = param.default.__doc__ params[paramName] = { 'type': paramType, 'required': param.default == param.empty, 'description': paramDesc, 'default': param.default if param.default != param.empty else None } actions[methodName] = { 'description': method.__doc__ or '', 'parameters': params, 'method': bound_method } # Add method instance with discovered actions self.methods[methodInstance.name] = { 'instance': methodInstance, 'description': methodInstance.description, 'actions': actions } logger.info(f"Discovered method: {methodInstance.name} with {len(actions)} actions") except Exception as e: logger.error(f"Error loading method module {name}: {str(e)}", exc_info=True) except Exception as e: logger.error(f"Error discovering methods: {str(e)}") def detectContentTypeFromData(self, fileData: bytes, fileName: str) -> str: """ Detect content type from file data and fileName. This method makes the MIME type detection function accessible through the service center. Args: fileData: Raw file data as bytes fileName: Name of the file Returns: str: Detected MIME type """ try: # Check file extension first ext = os.path.splitext(fileName)[1].lower() if ext: # Map common extensions to MIME types extToMime = { '.txt': 'text/plain', '.md': 'text/markdown', '.csv': 'text/csv', '.json': 'application/json', '.xml': 'application/xml', '.js': 'application/javascript', '.py': 'application/x-python', '.svg': 'image/svg+xml', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp', '.pdf': 'application/pdf', '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', '.doc': 'application/msword', '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', '.xls': 'application/vnd.ms-excel', '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', '.ppt': 'application/vnd.ms-powerpoint', '.html': 'text/html', '.htm': 'text/html', '.css': 'text/css', '.zip': 'application/zip', '.rar': 'application/x-rar-compressed', '.7z': 'application/x-7z-compressed', '.tar': 'application/x-tar', '.gz': 'application/gzip' } if ext in extToMime: return extToMime[ext] # Try to detect from content if fileData.startswith(b'%PDF'): return 'application/pdf' elif fileData.startswith(b'PK\x03\x04'): # ZIP-based formats (docx, xlsx, pptx) return 'application/zip' elif fileData.startswith(b'<'): # XML-based formats try: text = fileData.decode('utf-8', errors='ignore') if ' str: """ Get MIME type based on file extension. This method consolidates MIME type detection from extension. Args: extension: File extension (with or without dot) Returns: str: MIME type for the extension """ # Normalize extension (remove dot if present) if extension.startswith('.'): extension = extension[1:] # Map extensions to MIME types mime_types = { 'txt': 'text/plain', 'json': 'application/json', 'xml': 'application/xml', 'csv': 'text/csv', 'html': 'text/html', 'htm': 'text/html', 'md': 'text/markdown', 'py': 'text/x-python', 'js': 'application/javascript', 'css': 'text/css', 'pdf': 'application/pdf', 'doc': 'application/msword', 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'xls': 'application/vnd.ms-excel', 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'ppt': 'application/vnd.ms-powerpoint', 'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'svg': 'image/svg+xml', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'gif': 'image/gif', 'bmp': 'image/bmp', 'webp': 'image/webp', 'zip': 'application/zip', 'rar': 'application/x-rar-compressed', '7z': 'application/x-7z-compressed', 'tar': 'application/x-tar', 'gz': 'application/gzip' } return mime_types.get(extension.lower(), 'application/octet-stream') def getFileExtension(self, fileName: str) -> str: """ Extract file extension from fileName. Args: fileName: Name of the file Returns: str: File extension (without dot) """ if '.' in fileName: return fileName.split('.')[-1].lower() return "txt" # Default to text def getFileExtension(self, fileName): """ Extract file extension from fileName (without dot, lowercased). Returns empty string if no extension is found. """ if '.' in fileName: return fileName.rsplit('.', 1)[-1].lower() return '' # ===== Functions ===== def getMethodsList(self) -> List[str]: """Get list of available methods with their signatures in the required format""" methodList = [] for methodName, method in self.methods.items(): methodInstance = method['instance'] for actionName, action in method['actions'].items(): # Use the new signature format from MethodBase signature = methodInstance.getActionSignature(actionName) if signature: methodList.append(signature) return methodList def generateDocumentLabel(self, document: ChatDocument, message: ChatMessage) -> str: """Generate new document label: round+task+action+filename.extension""" try: # Get workflow context from message round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1 task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0 action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0 # Get file extension from document's fileName property try: file_extension = self.getFileExtension(document.fileName) filename = document.fileName except Exception as e: # Try to diagnose and recover the issue diagnosis = self.diagnoseDocumentAccess(document) logger.error(f"Critical error: Cannot access document fileName for document {document.id}. Diagnosis: {diagnosis}") # Attempt recovery if self.recoverDocumentAccess(document): try: file_extension = self.getFileExtension(document.fileName) filename = document.fileName logger.info(f"Document access recovered for {document.id}") except Exception as recovery_error: logger.error(f"Recovery failed for document {document.id}: {str(recovery_error)}") raise RuntimeError(f"Document {document.id} is permanently inaccessible after recovery attempt: {str(recovery_error)}") else: # Recovery failed - don't continue with invalid data raise RuntimeError(f"Document {document.id} is inaccessible and recovery failed. Diagnosis: {diagnosis}") # Construct label: round1_task2_action3_filename.ext if file_extension: label = f"round{round_num}_task{task_num}_action{action_num}_{filename}" else: label = f"round{round_num}_task{task_num}_action{action_num}_{filename}" return label except Exception as e: logger.error(f"Critical error generating document label for document {document.id}: {str(e)}") # Re-raise the error to prevent workflow from continuing with invalid data raise def getDocumentReferenceList(self) -> Dict[str, List[DocumentExchange]]: """Get list of document exchanges with new labeling format, sorted by recency""" # Collect all documents first and refresh their attributes all_documents = [] for message in self.workflow.messages: if message.documents: all_documents.extend(message.documents) # Refresh file attributes for all documents if all_documents: self.refreshDocumentFileAttributes(all_documents) chat_exchanges = [] history_exchanges = [] # Process messages in reverse order; "first" marks boundary in_current_round = True for message in reversed(self.workflow.messages): is_first = message.status == "first" if hasattr(message, 'status') else False # Build a DocumentExchange if message has documents doc_exchange = None if message.documents: if message.actionId and message.documentsLabel: # Use new document label format doc_refs = [] for doc in message.documents: doc_ref = self.getDocumentReferenceFromChatDocument(doc, message) doc_refs.append(doc_ref) doc_exchange = DocumentExchange( documentsLabel=message.documentsLabel, documents=doc_refs ) else: # Generate new labels for documents without explicit labels doc_refs = [] for doc in message.documents: doc_ref = self.getDocumentReferenceFromChatDocument(doc, message) doc_refs.append(doc_ref) if doc_refs: # Create a label based on message context round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1 task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0 action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0 context_label = f"round{round_num}_task{task_num}_action{action_num}_context" doc_exchange = DocumentExchange( documentsLabel=context_label, documents=doc_refs ) # Append to appropriate container based on boundary if doc_exchange: if in_current_round: chat_exchanges.append(doc_exchange) else: history_exchanges.append(doc_exchange) # Flip boundary after including the "first" message in chat if in_current_round and is_first: in_current_round = False # Sort by recency: most recent first, then current round, then earlier rounds # Sort chat exchanges by message sequence number (most recent first) chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x), reverse=True) # Sort history exchanges by message sequence number (most recent first) history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x), reverse=True) return { "chat": chat_exchanges, "history": history_exchanges } def _getMessageSequenceForExchange(self, exchange: DocumentExchange) -> int: """Get message sequence number for sorting exchanges by recency""" try: # Extract message ID from the first document reference if exchange.documents and len(exchange.documents) > 0: first_doc_ref = exchange.documents[0] if first_doc_ref.startswith("docItem:"): # docItem::