import asyncio import logging import uuid import json import time from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC from modules.interfaces.interfaceAppModel import User from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) from modules.workflow.serviceCenter import ServiceCenter from modules.interfaces.interfaceChatObjects import ChatObjects logger = logging.getLogger(__name__) class ChatManager: """Chat manager with improved AI integration and method handling""" def __init__(self, currentUser: User, chatInterface: ChatObjects): self.currentUser = currentUser self.chatInterface = chatInterface self.service: ServiceCenter = None self.workflow: ChatWorkflow = None # Circuit breaker for AI calls self.ai_failure_count = 0 self.ai_last_failure_time = None self.ai_circuit_breaker_threshold = 5 self.ai_circuit_breaker_timeout = 300 # 5 minutes # Timeout settings self.ai_call_timeout = 120 # 2 minutes self.task_execution_timeout = 600 # 10 minutes # ===== Initialization and Setup ===== async def initialize(self, workflow: ChatWorkflow) -> None: """Initialize chat manager with workflow""" self.workflow = workflow self.service = ServiceCenter(self.currentUser, self.workflow) # ===== WORKFLOW PHASES ===== # Phase 1: High-Level Task Planning async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]: """Phase 1: Plan high-level tasks from user input""" try: logger.info(f"Planning high-level tasks for workflow {workflow.id}") # Create planning prompt prompt = self._createTaskPlanningPrompt({ 'user_request': userInput, 'available_documents': self._getAvailableDocuments(workflow), 'workflow_id': workflow.id }) # Get AI response response = await self.service.callAiTextAdvanced(prompt) # Parse and validate task plan task_plan = self._parseTaskPlanResponse(response) if not self._validateTaskPlan(task_plan): logger.warning("Generated task plan failed validation, using fallback") task_plan = self._createFallbackTaskPlan({ 'user_request': userInput, 'available_documents': self._getAvailableDocuments(workflow) }) logger.info(f"High-level task planning completed: {len(task_plan.get('tasks', []))} tasks") return task_plan except Exception as e: logger.error(f"Error in high-level task planning: {str(e)}") return self._createFallbackTaskPlan({ 'user_request': userInput, 'available_documents': self._getAvailableDocuments(workflow) }) # Phase 2: Task Definition and Action Generation async def defineTaskActions(self, task_step: Dict[str, Any], workflow: ChatWorkflow, previous_results: List[str] = None, enhanced_context: Dict[str, Any] = None) -> List[TaskAction]: """Phase 2: Define specific actions for a task step with enhanced retry context""" try: logger.info(f"Defining actions for task: {task_step.get('description', 'Unknown')}") # Use enhanced context if provided (for retries), otherwise create basic context if enhanced_context: context = enhanced_context else: context = { 'task_step': task_step, 'workflow': workflow, 'workflow_id': workflow.id, 'available_documents': self._getAvailableDocuments(workflow), 'previous_results': previous_results or [], 'improvements': None, 'retry_count': 0, 'previous_action_results': [], 'previous_review_result': None } # Generate actions using AI actions = await self._generateActionsForTaskStep(context) # Convert to TaskAction objects task_actions = [] for action_dict in actions: action_data = { "execMethod": action_dict.get('method', 'unknown'), "execAction": action_dict.get('action', 'unknown'), "execParameters": action_dict.get('parameters', {}), "execResultLabel": action_dict.get('resultLabel', ''), "status": TaskStatus.PENDING } task_action = self.chatInterface.createTaskAction(action_data) if task_action: task_actions.append(task_action) logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}") # Update stats for task validation (estimate bytes for action validation) if task_actions: # Calculate actual action size for stats action_size = self.service.calculateObjectSize(task_actions) self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size) logger.info(f"Task action definition completed: {len(task_actions)} actions") return task_actions except Exception as e: logger.error(f"Error defining task actions: {str(e)}") return [] # Phase 3: Action Execution async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]: """Phase 3: Execute all actions for a task""" try: logger.info(f"Executing {len(task_actions)} task actions") results = [] for i, action in enumerate(task_actions): logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}") # Execute single action result = await self._executeSingleAction(action, workflow) results.append(result) # If action failed, stop execution if result.get('status') == 'failed': logger.error(f"Action {i+1} failed, stopping task execution") break logger.info(f"Task action execution completed: {len(results)} results") return results except Exception as e: logger.error(f"Error executing task actions: {str(e)}") return [] # Phase 4: Task Review and Quality Assessment async def reviewTaskCompletion(self, task_step: Dict[str, Any], task_actions: List[TaskAction], action_results: List[Dict[str, Any]], workflow: ChatWorkflow) -> Dict[str, Any]: """Phase 4: Review task completion and decide next steps""" try: logger.info(f"Reviewing task completion: {task_step.get('description', 'Unknown')}") # Create step result summary from action results step_result = { 'task_step': task_step, 'action_results': action_results, 'successful_actions': sum(1 for result in action_results if result.get('status') == 'completed'), 'total_actions': len(action_results), 'results': [result.get('result', '') for result in action_results if result.get('status') == 'completed'], 'errors': [result.get('error', '') for result in action_results if result.get('status') == 'failed'] } # Prepare review context review_context = { 'task_step': task_step, 'task_actions': task_actions, 'action_results': action_results, 'step_result': step_result, # Add the missing step_result 'workflow_id': workflow.id, 'previous_results': self._getPreviousResultsFromActions(task_actions) } # Use AI to review the results review = await self._performTaskReview(review_context) # Add quality metrics review['quality_metrics'] = self._calculateTaskQualityMetrics(task_step, action_results) logger.info(f"Task review completed: {review.get('status', 'unknown')}") return review except Exception as e: logger.error(f"Error reviewing task completion: {str(e)}") return { 'status': 'failed', 'reason': f'Review failed: {str(e)}', 'quality_metrics': {'score': 0, 'confidence': 0} } # Phase 5: Task Handover and State Management async def prepareTaskHandover(self, task_step: Dict[str, Any], task_actions: List[TaskAction], review_result: Dict[str, Any], workflow: ChatWorkflow) -> Dict[str, Any]: """Phase 5: Prepare results for next task or workflow completion""" try: logger.info(f"Preparing task handover: {task_step.get('description', 'Unknown')}") # Update task actions with results for action in task_actions: if action.status == TaskStatus.PENDING: action.status = TaskStatus.COMPLETED if review_result.get('status') == 'success' else TaskStatus.FAILED # Create serializable task actions task_actions_serializable = [] for action in task_actions: action_dict = { 'id': action.id, 'execMethod': action.execMethod, 'execAction': action.execAction, 'execParameters': action.execParameters, 'execResultLabel': action.execResultLabel, 'status': action.status.value if hasattr(action.status, 'value') else str(action.status) } task_actions_serializable.append(action_dict) # Create handover data handover_data = { 'task_step': task_step, 'task_actions': task_actions_serializable, 'review_result': review_result, 'next_task_ready': review_result.get('status') == 'success', 'available_results': self._getPreviousResultsFromActions(task_actions) } logger.info(f"Task handover prepared: next_task_ready={handover_data['next_task_ready']}") return handover_data except Exception as e: logger.error(f"Error preparing task handover: {str(e)}") # Create serializable task actions for exception case task_actions_serializable = [] for action in task_actions: action_dict = { 'id': action.id, 'execMethod': action.execMethod, 'execAction': action.execAction, 'execParameters': action.execParameters, 'execResultLabel': action.execResultLabel, 'status': action.status.value if hasattr(action.status, 'value') else str(action.status) } task_actions_serializable.append(action_dict) return { 'task_step': task_step, 'task_actions': task_actions_serializable, 'review_result': review_result, 'next_task_ready': False, 'available_results': [] } # ===== Utility Methods ===== async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: """Process file IDs and return ChatDocument objects""" documents = [] for fileId in fileIds: try: # Ensure service is initialized if not hasattr(self, 'service') or not self.service: logger.error(f"Service not initialized for file ID {fileId}") continue # Get file info from service fileInfo = self.service.getFileInfo(fileId) if fileInfo: # Create document using interface documentData = { "fileId": fileId, "filename": fileInfo.get("filename", "unknown"), "fileSize": fileInfo.get("size", 0), "mimeType": fileInfo.get("mimeType", "application/octet-stream") } document = self.chatInterface.createChatDocument(documentData) if document: documents.append(document) logger.info(f"Processed file ID {fileId} -> {document.filename}") else: logger.warning(f"No file info found for file ID {fileId}") except Exception as e: logger.error(f"Error processing file ID {fileId}: {str(e)}") return documents def setUserLanguage(self, language: str) -> None: """Set user language for the chat manager""" if hasattr(self, 'service') and self.service: self.service.user.language = language # ===== Enhanced Task Planning Methods ===== async def _callAIWithCircuitBreaker(self, prompt: str, context: str) -> str: """Call AI with circuit breaker pattern for fault tolerance""" try: # Check circuit breaker if self._isCircuitBreakerOpen(): raise Exception("AI circuit breaker is open - too many recent failures") # Call AI with timeout logger.debug(f"ACTION GENERATION PROMPT: {prompt}") response = await asyncio.wait_for( self._callAI(prompt, context), timeout=self.ai_call_timeout ) # Reset failure count on success self.ai_failure_count = 0 return response except asyncio.TimeoutError: self._recordAIFailure("Timeout") raise Exception(f"AI call timed out after {self.ai_call_timeout} seconds") except Exception as e: self._recordAIFailure(str(e)) raise def _isCircuitBreakerOpen(self) -> bool: """Check if circuit breaker is open""" if self.ai_failure_count >= self.ai_circuit_breaker_threshold: if self.ai_last_failure_time: time_since_failure = (datetime.now(UTC) - self.ai_last_failure_time).total_seconds() if time_since_failure < self.ai_circuit_breaker_timeout: return True else: # Reset circuit breaker after timeout self.ai_failure_count = 0 self.ai_last_failure_time = None return False def _recordAIFailure(self, error: str): """Record AI failure for circuit breaker""" self.ai_failure_count += 1 self.ai_last_failure_time = datetime.now(UTC) logger.warning(f"AI failure recorded ({self.ai_failure_count}/{self.ai_circuit_breaker_threshold}): {error}") def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool: """Validate task plan structure and dependencies""" try: if not isinstance(task_plan, dict): return False if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list): return False # Check each task task_ids = set() for task in task_plan['tasks']: if not isinstance(task, dict): return False required_fields = ['id', 'description', 'expected_outputs', 'success_criteria'] if not all(field in task for field in required_fields): return False # Check for duplicate task IDs if task['id'] in task_ids: return False task_ids.add(task['id']) # Validate dependencies dependencies = task.get('dependencies', []) if not isinstance(dependencies, list): return False # Check that dependencies reference existing tasks for dep in dependencies: if dep not in task_ids and dep != 'task_0': # Allow task_0 as special case return False # Validate ai_prompt if present (optional field) if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str): return False return True except Exception as e: logger.error(f"Error validating task plan: {str(e)}") return False def _createFallbackTaskPlan(self, context: Dict[str, Any]) -> Dict[str, Any]: """Create a fallback task plan when AI generation fails""" logger.warning("Creating fallback task plan due to AI generation failure") return { "overview": "Fallback task plan - comprehensive document analysis and processing", "tasks": [ { "id": "task_1", "description": "Extract and analyze all provided documents comprehensively", "dependencies": [], "expected_outputs": ["comprehensive_document_analysis"], "success_criteria": ["All documents processed and analyzed"], "required_documents": context.get('available_documents', []), "estimated_complexity": "medium", "ai_prompt": "Extract and analyze all content from the provided documents. Identify key information, patterns, and insights that are relevant to the user's request. Provide a comprehensive analysis that can be used for further processing." }, { "id": "task_2", "description": "Generate comprehensive output based on analysis", "dependencies": ["task_1"], "expected_outputs": ["final_output"], "success_criteria": ["Output generated and formatted appropriately"], "required_documents": ["comprehensive_document_analysis"], "estimated_complexity": "low", "ai_prompt": "Based on the comprehensive document analysis, generate the final output that addresses the user's original request. Format the output appropriately and ensure it meets the user's requirements." } ] } def _validateActions(self, actions: List[Dict[str, Any]], context: Dict[str, Any]) -> bool: """Validate generated actions""" try: if not isinstance(actions, list): logger.error("Actions must be a list") return False if len(actions) == 0: logger.warning("No actions generated") return False for i, action in enumerate(actions): if not isinstance(action, dict): logger.error(f"Action {i} must be a dictionary") return False # Check required fields required_fields = ['method', 'action', 'parameters', 'resultLabel'] missing_fields = [] for field in required_fields: if field not in action or not action[field]: missing_fields.append(field) if missing_fields: logger.error(f"Action {i} missing required fields: {missing_fields}") return False # Validate result label format result_label = action.get('resultLabel', '') if not result_label.startswith('task'): logger.error(f"Action {i} result label must start with 'task': {result_label}") return False # Validate parameters parameters = action.get('parameters', {}) if not isinstance(parameters, dict): logger.error(f"Action {i} parameters must be a dictionary") return False logger.info(f"Successfully validated {len(actions)} actions") return True except Exception as e: logger.error(f"Error validating actions: {str(e)}") return False def _createFallbackActions(self, task_step: Dict[str, Any], context: Dict[str, Any]) -> List[Dict[str, Any]]: """Create fallback actions when AI generation fails with retry context awareness""" logger.warning("Creating fallback actions due to AI generation failure") # Get available documents available_docs = context.get('available_documents', []) retry_count = context.get('retry_count', 0) previous_action_results = context.get('previous_action_results', []) if not available_docs: logger.warning("No available documents for fallback actions") return [] # Create fallback actions for document analysis fallback_actions = [] for i, doc in enumerate(available_docs): # Enhanced AI prompt for retry scenarios ai_prompt = "Fallback document analysis for " + doc if retry_count > 0 and previous_action_results: ai_prompt += f". Previous attempt failed - ensure comprehensive extraction with detailed analysis." fallback_actions.append({ "method": "document", "action": "analyze", "parameters": { "documentList": ["task1_previous_results"], "aiPrompt": ai_prompt }, "resultLabel": f"task1_fallback_retry{retry_count}:" + doc + ":analysis", "description": f"Fallback document analysis for {doc} (attempt {retry_count + 1})" }) logger.info(f"Created {len(fallback_actions)} fallback actions") return fallback_actions # ===== Prompt Creation Methods ===== def _createTaskPlanningPrompt(self, context: Dict[str, Any]) -> str: """Create prompt for task planning""" return f"""You are a task planning AI that analyzes user requests and creates structured task plans. USER REQUEST: {context['user_request']} AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])} INSTRUCTIONS: 1. Analyze the user request and available documents 2. Break down the request into 2-4 meaningful high-level task steps 3. Focus on business outcomes, not technical operations 4. For document processing, create ONE task with a comprehensive AI prompt rather than multiple granular tasks 5. Each task should produce meaningful, usable outputs 6. Ensure proper handover between tasks using result labels 7. Return a JSON object with the exact structure shown below TASK PLANNING PRINCIPLES: - Combine related operations into single tasks (e.g., "Extract and analyze all candidate profiles" instead of separate "read file" and "analyze content" tasks) - Use comprehensive AI prompts for document processing rather than multiple small tasks - Focus on business value and outcomes - Keep tasks at a meaningful level of abstraction - Each task should produce results that can be used by subsequent tasks REQUIRED JSON STRUCTURE: {{ "overview": "Brief description of the overall plan", "tasks": [ {{ "id": "task_1", "description": "Clear description of what this task accomplishes (business outcome)", "dependencies": ["task_0"], // IDs of tasks that must complete first "expected_outputs": ["output1", "output2"], "success_criteria": ["criteria1", "criteria2"], "required_documents": ["doc1", "doc2"], "estimated_complexity": "low|medium|high", "ai_prompt": "Comprehensive AI prompt for document processing tasks (if applicable)" }} ] }} EXAMPLES OF GOOD TASK DESCRIPTIONS: - "Extract and analyze all candidate profiles to identify key qualifications and experience" - "Create evaluation matrix and rate candidates against product designer criteria" - "Generate comprehensive PowerPoint presentation for management decision" - "Store final presentation in SharePoint for specified account" EXAMPLES OF BAD TASK DESCRIPTIONS: - "Open and read the PDF file" (too granular) - "Identify table structure" (technical detail) - "Convert data to CSV format" (implementation detail) NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" async def _createActionDefinitionPrompt(self, context: Dict[str, Any]) -> str: """Create prompt for action generation with enhanced document extraction guidance and retry context""" task_step = context['task_step'] workflow = context.get('workflow') available_docs = context['available_documents'] previous_results = context['previous_results'] improvements = context.get('improvements', '') retry_count = context.get('retry_count', 0) previous_action_results = context.get('previous_action_results', []) previous_review_result = context.get('previous_review_result') # Get available methods and actions with signatures methodList = self.service.getMethodsList() method_actions = {} for sig in methodList: if '.' in sig: method, rest = sig.split('.', 1) action = rest.split('(')[0] method_actions.setdefault(method, []).append((action, sig)) # Get workflow history messageSummary = await self.service.summarizeChat(workflow.messages) # Get available documents and connections docRefs = self.service.getDocumentReferenceList() connRefs = self.service.getConnectionReferenceList() all_doc_refs = docRefs.get('chat', []) + docRefs.get('history', []) # Build AVAILABLE METHODS section available_methods_str = '' for method, actions in method_actions.items(): available_methods_str += f"- {method}:\n" for action, sig in actions: available_methods_str += f" - {action}: {sig}\n" # Get AI prompt from task step if available task_ai_prompt = task_step.get('ai_prompt', '') # Build retry context section retry_context = "" if retry_count > 0: retry_context = f""" RETRY CONTEXT (Attempt {retry_count}): Previous action results that failed or were incomplete: """ for i, result in enumerate(previous_action_results): retry_context += f"- Action {i+1}: {result.get('actionMethod', 'unknown')}.{result.get('actionName', 'unknown')}\n" retry_context += f" Status: {result.get('status', 'unknown')}\n" retry_context += f" Error: {result.get('error', 'None')}\n" retry_context += f" Result: {result.get('result', '')[:100]}...\n" if previous_review_result: retry_context += f""" Previous review feedback: - Status: {previous_review_result.get('status', 'unknown')} - Reason: {previous_review_result.get('reason', 'No reason provided')} - Quality Score: {previous_review_result.get('quality_score', 0)}/10 - Missing Outputs: {', '.join(previous_review_result.get('missing_outputs', []))} - Unmet Criteria: {', '.join(previous_review_result.get('unmet_criteria', []))} """ return f""" You are an action generation AI that creates specific actions to accomplish a task step. DOCUMENT REFERENCE TYPES: - docItem: Reference to a single document. Format: "docItem::" - docList: Reference to a group of documents under a label. Format: