import logging import importlib import pkgutil import inspect import os from typing import Dict, Any, List, Optional from modules.interfaces.interfaceAppModel import User, UserConnection from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange, ExtractedContent ) from modules.interfaces.interfaceAiCalls import AiCalls from modules.interfaces.interfaceChatObjects import getInterface as getChatObjects from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects from modules.chat.documents.documentExtraction import DocumentExtraction from modules.chat.documents.documentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData from modules.chat.methodBase import MethodBase from modules.shared.timezoneUtils import get_utc_timestamp import uuid import asyncio logger = logging.getLogger(__name__) class ServiceCenter: """Service center that provides access to all services and their functions""" def __init__(self, currentUser: User, workflow: ChatWorkflow): # Core services self.user = currentUser self.workflow = workflow self.tasks = workflow.tasks self.statusEnums = TaskStatus self.currentTask = None # Initialize current task as None # Initialize managers self.interfaceChat = getChatObjects(currentUser) self.interfaceComponent = getComponentObjects(currentUser) self.interfaceApp = getAppObjects(currentUser) self.interfaceAiCalls = AiCalls() self.documentProcessor = DocumentExtraction(self) # Initialize methods catalog self.methods = {} # Discover additional methods self._discoverMethods() def _discoverMethods(self): """Dynamically discover all method classes and their actions in modules.methods package""" try: # Import the methods package methodsPackage = importlib.import_module('modules.methods') # Discover all modules in the package for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): if not isPkg and name.startswith('method'): try: # Import the module module = importlib.import_module(f'modules.methods.{name}') # Find all classes in the module that inherit from MethodBase for itemName, item in inspect.getmembers(module): if (inspect.isclass(item) and issubclass(item, MethodBase) and item != MethodBase): # Instantiate the method methodInstance = item(self) # Discover actions from public methods actions = {} for methodName, method in inspect.getmembers(type(methodInstance), predicate=inspect.iscoroutinefunction): if not methodName.startswith('_'): # Bind the method to the instance bound_method = method.__get__(methodInstance, type(methodInstance)) sig = inspect.signature(method) params = {} for paramName, param in sig.parameters.items(): if paramName not in ['self']: # Get parameter type paramType = param.annotation if param.annotation != param.empty else Any # Get parameter description from docstring or default paramDesc = None if param.default != param.empty and hasattr(param.default, '__doc__'): paramDesc = param.default.__doc__ params[paramName] = { 'type': paramType, 'required': param.default == param.empty, 'description': paramDesc, 'default': param.default if param.default != param.empty else None } actions[methodName] = { 'description': method.__doc__ or '', 'parameters': params, 'method': bound_method } # Add method instance with discovered actions self.methods[methodInstance.name] = { 'instance': methodInstance, 'description': methodInstance.description, 'actions': actions } logger.info(f"Discovered method: {methodInstance.name} with {len(actions)} actions") except Exception as e: logger.error(f"Error loading method module {name}: {str(e)}", exc_info=True) except Exception as e: logger.error(f"Error discovering methods: {str(e)}") # ===== Functions for Prompts: Context ===== def getMethodsList(self) -> List[str]: """Get list of available methods with their signatures in the required format""" methodList = [] for methodName, method in self.methods.items(): methodInstance = method['instance'] for actionName, action in method['actions'].items(): # Use the new signature format from MethodBase signature = methodInstance.getActionSignature(actionName) if signature: methodList.append(signature) return methodList async def summarizeChat(self, messages: List[ChatMessage]) -> str: """ Summarize chat messages from last to first message with status="first" Args: messages: List of chat messages to summarize Returns: str: Summary of the chat in user's language """ try: # Get messages from last to first, stopping at first message with status="first" relevantMessages = [] for msg in reversed(messages): relevantMessages.append(msg) if msg.status == "first": break # Create prompt for AI prompt = f"""You are an AI assistant providing a summary of a chat conversation. Please respond in '{self.user.language}' language. Chat History: {chr(10).join(f"- {msg.message}" for msg in reversed(relevantMessages))} Instructions: 1. Summarize the conversation's key points and outcomes 2. Be concise but informative 3. Use a professional but friendly tone 4. Focus on important decisions and next steps if any Please provide a comprehensive summary of this conversation.""" # Get summary using AI return await self.callAiTextBasic(prompt) except Exception as e: logger.error(f"Error summarizing chat: {str(e)}") return f"Error summarizing chat: {str(e)}" # ===== Functions for Prompts + Actions: Document References generation and resolution ===== def getEnhancedDocumentContext(self) -> str: """Get enhanced document context formatted for action planning prompts with proper docList and docItem references""" try: document_list = self.getDocumentReferenceList() # Build technical context string for AI action planning context = "AVAILABLE DOCUMENTS:\n\n" # Process chat exchanges (current round) if document_list["chat"]: context += "CURRENT ROUND DOCUMENTS:\n" for exchange in document_list["chat"]: # Generate docList reference for the exchange (using message ID and label) # Find the message that corresponds to this exchange message_id = None for message in self.workflow.messages: if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange.documentsLabel: message_id = message.id break if message_id: doc_list_ref = f"docList:{message_id}:{exchange.documentsLabel}" else: # Fallback to label-only format if message ID not found doc_list_ref = f"docList:{exchange.documentsLabel}" logger.debug(f"Using document label for action planning: {exchange.documentsLabel} (message_id: {message_id})") context += f"- {doc_list_ref} contains:\n" # Generate docItem references for each document in the list for doc_ref in exchange.documents: if doc_ref.startswith("docItem:"): context += f" - {doc_ref}\n" else: # Convert to proper docItem format if needed context += f" - docItem:{doc_ref}\n" context += "\n" # Process history exchanges (previous rounds) if document_list["history"]: context += "WORKFLOW HISTORY DOCUMENTS:\n" for exchange in document_list["history"]: # Generate docList reference for the exchange (using message ID and label) # Find the message that corresponds to this exchange message_id = None for message in self.workflow.messages: if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange.documentsLabel: message_id = message.id break if message_id: doc_list_ref = f"docList:{message_id}:{exchange.documentsLabel}" else: # Fallback to label-only format if message ID not found doc_list_ref = f"docList:{exchange.documentsLabel}" logger.debug(f"Using history document label for action planning: {exchange.documentsLabel} (message_id: {message_id})") context += f"- {doc_list_ref} contains:\n" # Generate docItem references for each document in the list for doc_ref in exchange.documents: if doc_ref.startswith("docItem:"): context += f" - {doc_ref}\n" else: # Convert to proper docItem format if needed context += f" - docItem:{doc_ref}\n" context += "\n" if not document_list["chat"] and not document_list["history"]: context += "NO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n" return context except Exception as e: logger.error(f"Error generating enhanced document context: {str(e)}") return "NO DOCUMENTS AVAILABLE - Error generating document context." def getDocumentReferenceList(self) -> Dict[str, List[DocumentExchange]]: """Get list of document exchanges with new labeling format, sorted by recency""" # Collect all documents first and refresh their attributes all_documents = [] for message in self.workflow.messages: if message.documents: all_documents.extend(message.documents) # Refresh file attributes for all documents if all_documents: self._refreshDocumentFileAttributes(all_documents) chat_exchanges = [] history_exchanges = [] # Process messages in reverse order; "first" marks boundary in_current_round = True for message in reversed(self.workflow.messages): is_first = message.status == "first" if hasattr(message, 'status') else False # Build a DocumentExchange if message has documents doc_exchange = None if message.documents: if message.actionId and message.documentsLabel: # Validate that we use the same label as in the message validated_label = self._validateDocumentLabelConsistency(message) # Use the message's actual documentsLabel doc_refs = [] for doc in message.documents: doc_ref = self._getDocumentReferenceFromChatDocument(doc, message) doc_refs.append(doc_ref) doc_exchange = DocumentExchange( documentsLabel=validated_label, documents=doc_refs ) else: # Generate new labels for documents without explicit labels doc_refs = [] for doc in message.documents: doc_ref = self._getDocumentReferenceFromChatDocument(doc, message) doc_refs.append(doc_ref) if doc_refs: # Create a label based on message context context_prefix = self._generateWorkflowContextPrefix(message) context_label = f"{context_prefix}_context" doc_exchange = DocumentExchange( documentsLabel=context_label, documents=doc_refs ) # Append to appropriate container based on boundary if doc_exchange: if in_current_round: chat_exchanges.append(doc_exchange) else: history_exchanges.append(doc_exchange) # Flip boundary after including the "first" message in chat if in_current_round and is_first: in_current_round = False # Sort by recency: most recent first, then current round, then earlier rounds # Sort chat exchanges by message sequence number (most recent first) chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x), reverse=True) # Sort history exchanges by message sequence number (most recent first) history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x), reverse=True) return { "chat": chat_exchanges, "history": history_exchanges } def _refreshDocumentFileAttributes(self, documents: List[ChatDocument]) -> None: """Update file attributes (fileName, fileSize, mimeType) for documents""" for doc in documents: try: file_item = self.interfaceComponent.getFile(doc.fileId) if file_item: doc.fileName = file_item.fileName doc.fileSize = file_item.fileSize doc.mimeType = file_item.mimeType else: logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}") except Exception as e: logger.error(f"Error refreshing file attributes for document {doc.id}: {e}") def _generateWorkflowContextPrefix(self, message: ChatMessage) -> str: """Generate workflow context prefix: round{num}_task{num}_action{num}""" round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1 task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0 action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0 return f"round{round_num}_task{task_num}_action{action_num}" def _getDocumentReferenceFromChatDocument(self, document: ChatDocument, message: ChatMessage) -> str: """Get document reference using document ID and filename.""" try: # Use document ID and filename for simple reference return f"docItem:{document.id}:{document.fileName}" except Exception as e: logger.error(f"Critical error creating document reference for document {document.id}: {str(e)}") # Re-raise the error to prevent workflow from continuing with invalid data raise def _getMessageSequenceForExchange(self, exchange: DocumentExchange) -> int: """Get message sequence number for sorting exchanges by recency""" try: # Extract message ID from the first document reference if exchange.documents and len(exchange.documents) > 0: first_doc_ref = exchange.documents[0] if first_doc_ref.startswith("docItem:"): # docItem::