From 02d34b914e182164680ff34baf9d1a5b6dce86bf Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Fri, 18 Jul 2025 14:20:11 +0200
Subject: [PATCH] refactored chat manager

---
 app.py                                     |    7 +
 modules/chat/BACKUP managerChat.py         | 3078 +++++++++++++++++
 modules/chat/documents/documentCreation.py |  124 +
 .../documentProcessing.py}                 |    0
 modules/chat/handling/executionState.py    |   53 +
 modules/chat/handling/handlingActions.py   |  432 +++
 modules/chat/handling/handlingTasks.py     |  291 ++
 modules/chat/handling/promptFactory.py     |  405 +++
 modules/chat/managerChat.py                | 3057 +---------------
 modules/chat/serviceCenter.py              |  129 +-
 modules/interfaces/interfaceAiCalls.py     |   22 +-
 modules/methods/methodDocument.py          |   29 +-
 modules/methods/methodWeb.py               |  847 +----
 modules/workflow/managerWorkflow.py        |   27 +-
 notes/changelog.txt                        |   53 +-
 web_search_20250717_140455.txt             |    0
 web_search_20250717_144557.txt             |    0
 17 files changed, 4711 insertions(+), 3843 deletions(-)
 create mode 100644 modules/chat/BACKUP managerChat.py
 create mode 100644 modules/chat/documents/documentCreation.py
 rename modules/chat/{processorDocument.py => documents/documentProcessing.py} (100%)
 create mode 100644 modules/chat/handling/executionState.py
 create mode 100644 modules/chat/handling/handlingActions.py
 create mode 100644 modules/chat/handling/handlingTasks.py
 create mode 100644 modules/chat/handling/promptFactory.py
 create mode 100644 web_search_20250717_140455.txt
 create mode 100644 web_search_20250717_144557.txt

diff --git a/app.py b/app.py
index fda05eaa..81b1c9af 100644
--- a/app.py
+++ b/app.py
@@ -37,6 +37,11 @@ def initLogging():
                     ('.well-known/appspecific/com.chrome.devtools.json' in record.msg or
                      'Request: /index.html' in record.msg))
 
+    # Add filter to exclude all httpcore loggers (including sub-loggers)
+    class HttpcoreStarFilter(logging.Filter):
+        def filter(self, record):
+            return not (record.name == 'httpcore' or record.name.startswith('httpcore.'))
+
     # Add filter to exclude HTTP debug messages
     class HTTPDebugFilter(logging.Filter):
         def filter(self, record):
@@ -61,6 +66,7 @@ def initLogging():
     consoleHandler = logging.StreamHandler()
     consoleHandler.setFormatter(consoleFormatter)
     consoleHandler.addFilter(ChromeDevToolsFilter())
+    consoleHandler.addFilter(HttpcoreStarFilter())
     consoleHandler.addFilter(HTTPDebugFilter())
     handlers.append(consoleHandler)
 
@@ -88,6 +94,7 @@ def initLogging():
     )
     fileHandler.setFormatter(fileFormatter)
     fileHandler.addFilter(ChromeDevToolsFilter())
+    fileHandler.addFilter(HttpcoreStarFilter())
     fileHandler.addFilter(HTTPDebugFilter())
     handlers.append(fileHandler)
 
diff --git a/modules/chat/BACKUP managerChat.py b/modules/chat/BACKUP managerChat.py
new file mode 100644
index 00000000..056efcbc
--- /dev/null
+++ b/modules/chat/BACKUP managerChat.py
@@ -0,0 +1,3078 @@
+import asyncio
+import logging
+import uuid
+import json
+import time
+from typing import Dict, Any, Optional, List, Union
+from datetime import datetime, UTC
+
+from modules.interfaces.interfaceAppModel import User
+from modules.interfaces.interfaceChatModel import (
+    TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult,
+    ExtractedContent, ContentItem, ContentMetadata, DocumentExchange, TaskStep, TaskContext, ActionExecutionResult, ReviewContext, ReviewResult, TaskPlan, WorkflowResult
+)
+from modules.chat.serviceCenter import ServiceCenter
+from modules.interfaces.interfaceChatObjects import ChatObjects
+
+logger = logging.getLogger(__name__)
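+
+# NOTE: Snapshot of the pre-refactor ChatManager, kept for reference (hence the "BACKUP" filename).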
+
+# ===== STATE MANAGEMENT AND VALIDATION CLASSES =====
+
+class TaskExecutionState:
+    """Manages state during task execution with retry logic"""
+    def __init__(self, task_step: TaskStep):
+        self.task_step = task_step
+        self.successful_actions: List[ActionExecutionResult] = []  # Preserved across retries
+        self.failed_actions: List[ActionExecutionResult] = []  # For analysis
+        self.current_action_index = 0
+        self.retry_count = 0
+        self.improvements = []
+        self.partial_results = {}  # Store intermediate results
+        self.max_retries = 3
+    def addSuccessfulAction(self, action_result: ActionExecutionResult):
+        self.successful_actions.append(action_result)
+        if action_result.data.get('resultLabel'):
+            self.partial_results[action_result.data['resultLabel']] = action_result
+    def addFailedAction(self, action_result: ActionExecutionResult):
+        self.failed_actions.append(action_result)
+    def getAvailableResults(self) -> list:
+        return [result.data.get('resultLabel', '') for result in self.successful_actions if result.data.get('resultLabel')]
+    def shouldRetryTask(self) -> bool:
+        return len(self.successful_actions) > 0 and len(self.failed_actions) > 0
+    def canRetry(self) -> bool:
+        return self.retry_count < self.max_retries
+    def incrementRetryCount(self):
+        self.retry_count += 1
+    def getFailurePatterns(self) -> list:
+        patterns = []
+        for action in self.failed_actions:
+            error = action.error.lower() if action.error else ''
+            if "timeout" in error:
+                patterns.append("timeout_issues")
+            elif "document_not_found" in error or "file not found" in error:
+                patterns.append("document_reference_issues")
+            elif "empty_result" in error or "no content" in error:
+                patterns.append("content_extraction_issues")
+            elif "invalid_format" in error or "wrong format" in error:
+                patterns.append("format_issues")
+            elif "permission" in error or "access denied" in error:
+                patterns.append("permission_issues")
+        return list(set(patterns))
+
+class ActionValidator:
+    """Generic AI-based action result validation"""
+    def __init__(self, chat_manager):
+        self.chat_manager = chat_manager
+
+    async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> dict:
+        """Generic action validation using AI"""
+        try:
+            # Create generic validation prompt
+            prompt = self._createGenericValidationPrompt(action_result, action, context)
+            response = await self.chat_manager._callAIWithCircuitBreaker(prompt, "action_validation")
+            validation = self._parseValidationResponse(response)
+
+            # Add action metadata
+            validation['action_id'] = action.id
+            validation['action_method'] = action.execMethod
+            validation['action_name'] = action.execAction
+            validation['result_label'] = action.execResultLabel
+
+            return validation
+        except Exception as e:
+            logger.error(f"Error validating action result: {str(e)}")
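+            # Fail open: if the validator itself errors, report success at low
+            # confidence so a validator fault does not block the whole pipeline.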
+            return {
+                'status': 'success',
+                'reason': f'Validation failed: {str(e)}',
+                'confidence': 0.5,
+                'improvements': [],
+                'action_id': action.id,
+                'action_method': action.execMethod,
+                'action_name': action.execAction,
+                'result_label': action.execResultLabel
+            }
+
+    def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> str:
+        """Create a validation prompt focused on result file delivery"""
+        # Extract data from ActionResult model
+        success = action_result.success
+        result_data = action_result.data
+        error = action_result.error
+        validation_messages = action_result.validation
+
+        # Extract result text from data
+        result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data)
+
+        # Get documents from ActionResult data
+        documents = result_data.get("documents", []) if isinstance(result_data, dict) else []
+        doc_count = len(documents)
+
+        # Extract expected result format from action parameters
+        expected_result_label = action.execResultLabel
+        expected_format = action.execParameters.get('outputFormat', 'unknown')
+
+        # Extract expected document formats from action
+        expected_document_formats = action.expectedDocumentFormats or []
+
+        # Check if the result label is present in the action result data
+        actual_result_label = result_data.get("resultLabel", "") if isinstance(result_data, dict) else ""
+        result_label_match = actual_result_label == expected_result_label
+
+        # Analyze delivered documents and content
+        delivered_files = []
+        delivered_formats = []
+        content_items = []
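+        # Delivered documents may arrive as ChatDocument objects, plain dicts, or
+        # opaque entries; normalize all three shapes into filename/format records.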
+        # Check for ChatDocument objects
+        for doc in documents:
+            if hasattr(doc, 'filename'):
+                delivered_files.append(doc.filename)
+                # Extract format information
+                file_extension = self._getFileExtension(doc.filename)
+                mime_type = getattr(doc, 'mimeType', 'application/octet-stream')
+                delivered_formats.append({
+                    'filename': doc.filename,
+                    'extension': file_extension,
+                    'mimeType': mime_type
+                })
+            elif isinstance(doc, dict) and 'filename' in doc:
+                delivered_files.append(doc['filename'])
+                file_extension = self._getFileExtension(doc['filename'])
+                mime_type = doc.get('mimeType', 'application/octet-stream')
+                delivered_formats.append({
+                    'filename': doc['filename'],
+                    'extension': file_extension,
+                    'mimeType': mime_type
+                })
+            else:
+                # Compute the placeholder name once so the entries in
+                # delivered_files and delivered_formats stay in sync.
+                placeholder_name = f"document_{len(delivered_files)}"
+                delivered_files.append(placeholder_name)
+                delivered_formats.append({
+                    'filename': placeholder_name,
+                    'extension': 'unknown',
+                    'mimeType': 'application/octet-stream'
+                })
+
+        # Check for ExtractedContent in result data
+        if isinstance(result_data, dict):
+            if 'extractedContent' in result_data:
+                extracted_content = result_data['extractedContent']
+                if hasattr(extracted_content, 'contents'):
+                    content_items = extracted_content.contents
+            elif 'contents' in result_data:
+                content_items = result_data['contents']
+
+        # If we have delivered files but no content items, consider it successful
+        # This handles the case where content is stored in files rather than result data
+        if delivered_files and not content_items:
+            content_items = [f"File content available in: {', '.join(delivered_files)}"]
+
+        # Analyze content items
+        content_summary = []
+        for item in content_items:
+            if hasattr(item, 'label') and hasattr(item, 'metadata'):
+                content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}")
+            elif isinstance(item, str):
+                content_summary.append(item)
+            else:
+                content_summary.append(str(item))
+
+        return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format.
+
+ACTION DETAILS:
+- Method: {action.execMethod}
+- Action: {action.execAction}
+- Expected Result Label: {expected_result_label}
+- Actual Result Label: {actual_result_label}
+- Result Label Match: {result_label_match}
+- Expected Format: {expected_format}
+- Expected Document Formats: {json.dumps(expected_document_formats, indent=2) if expected_document_formats else 'None specified'}
+- Parameters: {json.dumps(action.execParameters, indent=2)}
+
+RESULT TO VALIDATE:
+- Success: {success}
+- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''}
+- Error: {error}
+- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'}
+- Documents Produced: {doc_count}
+- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'}
+- Delivered Formats: {json.dumps(delivered_formats, indent=2) if delivered_formats else 'None'}
+- Content Items: {', '.join(content_summary) if content_summary else 'None'}
+
+CRITICAL VALIDATION CRITERIA:
+1. **Result Label Match**: Does the action result contain the expected result label?
+2. **File Delivery**: Did the action deliver the promised result file(s)?
+3. **Format Compliance**: If expected document formats were specified, do the delivered files match the expected formats?
+4. **Content Quality**: Is the content of the delivered files usable and complete?
+5. **Content Processing**: If content extraction was expected, was it performed correctly?
+
+CONTEXT:
+- Task Description: {context.task_step.description if context.task_step else 'Unknown'}
+- Previous Results: {', '.join(context.previous_results) if context.previous_results else 'None'}
+
+VALIDATION INSTRUCTIONS:
+1. **Result Label Check**: Verify that the expected result label "{expected_result_label}" is present in the action result data. This is the primary success criterion.
+2. **File Delivery**: Check if files were delivered when expected. The individual filenames don't need to match the result label - focus on whether content was actually produced.
+3. **Format Compliance**: If expected document formats were specified, check if delivered files match the expected extensions and MIME types. If no formats were specified, this criterion is satisfied.
+4. **Content Quality**: If files were delivered, consider the action successful. The presence of delivered files indicates content was processed and stored.
+5. **Content Processing**: If files were delivered, assume content extraction was performed correctly. The file delivery is evidence of successful processing.
+6. **Success Criteria**: The action is successful if the result label matches AND files were delivered. If expected formats were specified, they should also match.
+
+IMPORTANT NOTES:
+- The result label must be present in the action result data for success
+- Individual filenames can be different from the result label
+- If files were delivered, consider the action successful even if content details are not provided
+- Focus on whether the action accomplished its intended purpose (file delivery)
+- Empty files should be considered failures, but delivered files indicate success
+
+REQUIRED JSON RESPONSE:
+{{
+    "status": "success|retry|fail",
+    "reason": "Detailed explanation focusing on result label match and content quality",
+    "confidence": 0.0-1.0,
+    "improvements": ["specific improvements if needed"],
+    "quality_score": 1-10,
+    "missing_elements": ["missing result label", "missing files", "content issues"],
+    "suggested_retry_approach": "Specific approach for retry if status is retry"
+}}
+
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
+
+    def _parseValidationResponse(self, response: str) -> dict:
+        """Parse the AI validation response"""
+        try:
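+            # The model may wrap the JSON in prose, so extract the outermost
+            # brace pair instead of parsing the raw response directly.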
+            json_start = response.find('{')
+            json_end = response.rfind('}') + 1
+            if json_start == -1 or json_end == 0:
+                raise ValueError("No JSON found in validation response")
+
+            json_str = response[json_start:json_end]
+            validation = json.loads(json_str)
+
+            if 'status' not in validation:
+                raise ValueError("Validation response missing 'status' field")
+
+            # Set defaults for optional fields
+            validation.setdefault('confidence', 0.5)
+            validation.setdefault('improvements', [])
+            validation.setdefault('quality_score', 5)
+            validation.setdefault('missing_elements', [])
+            validation.setdefault('suggested_retry_approach', '')
+
+            return validation
+        except Exception as e:
+            logger.error(f"Error parsing validation response: {str(e)}")
+            return {
+                'status': 'success',
+                'reason': f'Parse error: {str(e)}',
+                'confidence': 0.5,
+                'improvements': [],
+                'quality_score': 5,
+                'missing_elements': [],
+                'suggested_retry_approach': ''
+            }
+
+    def _getFileExtension(self, filename: str) -> str:
+        """Extract file extension from filename"""
+        if '.' in filename:
+            return '.' + filename.split('.')[-1]
+        return ''
+
+class ChatManager:
+    """Chat manager with improved AI integration and method handling"""
+
+    def __init__(self, currentUser: User, chatInterface: ChatObjects):
+        self.currentUser = currentUser
+        self.chatInterface = chatInterface
+        self.service: ServiceCenter = None
+        self.workflow: ChatWorkflow = None
+
+        # Circuit breaker for AI calls
+        self.ai_failure_count = 0
+        self.ai_last_failure_time = None
+        self.ai_circuit_breaker_threshold = 5
+        self.ai_circuit_breaker_timeout = 300  # 5 minutes
+
+        # Timeout settings
+        self.ai_call_timeout = 120  # 2 minutes
+        self.task_execution_timeout = 600  # 10 minutes
+
+    # ===== Initialization and Setup =====
+    async def initialize(self, workflow: ChatWorkflow) -> None:
+        """Initialize chat manager with workflow"""
+        self.workflow = workflow
+        self.service = ServiceCenter(self.currentUser, self.workflow)
+
+    # ===== WORKFLOW PHASES =====
+
+    # Phase 1: High-Level Task Planning
+    async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
+        """Phase 1: Plan high-level tasks from user input"""
+        try:
+            logger.info(f"Planning high-level tasks for workflow {workflow.id}")
+
+            # Create planning prompt
+            prompt = self.createTaskPlanningPrompt({
+                'user_request': userInput,
+                'available_documents': self._getAvailableDocuments(workflow),
+                'workflow_id': workflow.id
+            })
+
+            # Get AI response with fallback mechanism
+            response = await self._callAIWithCircuitBreaker(prompt, "task_planning")
+
+            # Parse and validate task plan
+            task_plan_dict = self._parseTaskPlanResponse(response)
+
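+            # Reject structurally invalid plans early: _validateTaskPlan checks
+            # required fields, duplicate task IDs, and dependency references.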
+            if not self._validateTaskPlan(task_plan_dict):
+                logger.error("Generated task plan failed validation")
+                raise Exception("AI-generated task plan failed validation - AI is required for task planning")
+
+            # Convert to TaskPlan model
+            tasks = []
+            for task_dict in task_plan_dict.get('tasks', []):
+                task = TaskStep(
+                    id=task_dict.get('id', ''),
+                    description=task_dict.get('description', ''),
+                    dependencies=task_dict.get('dependencies', []),
+                    expected_outputs=task_dict.get('expected_outputs', []),
+                    success_criteria=task_dict.get('success_criteria', []),
+                    required_documents=task_dict.get('required_documents', []),
+                    estimated_complexity=task_dict.get('estimated_complexity'),
+                    ai_prompt=task_dict.get('ai_prompt')
+                )
+                tasks.append(task)
+
+            task_plan = TaskPlan(
+                overview=task_plan_dict.get('overview', ''),
+                tasks=tasks
+            )
+
+            # Log the task plan as JSON for debugging
+            logger.info(f"Task plan created for workflow {workflow.id}:")
+            task_plan_json = {
+                'overview': task_plan.overview,
+                'tasks_count': len(task_plan.tasks),
+                'tasks': []
+            }
+            for task in task_plan.tasks:
+                task_json = {
+                    'id': task.id,
+                    'description': task.description,
+                    'dependencies': task.dependencies or [],
+                    'expected_outputs': task.expected_outputs or [],
+                    'success_criteria': task.success_criteria or [],
+                    'required_documents': task.required_documents or [],
+                    'estimated_complexity': task.estimated_complexity or '',
+                    'ai_prompt': task.ai_prompt or ''
+                }
+                task_plan_json['tasks'].append(task_json)
+            logger.info(f"Task Plan: {json.dumps(task_plan_json, indent=2, ensure_ascii=False)}")
+
+            logger.info(f"High-level task planning completed: {len(task_plan.tasks)} tasks")
+            return task_plan
+
+        except Exception as e:
+            error_message = str(e)
+            logger.error(f"Error in high-level task planning: {error_message}")
+
+            # Provide more specific error messages based on the error type
+            if "overloaded" in error_message.lower() or "529" in error_message:
+                detailed_error = "AI service is currently overloaded. Please try again in a few minutes."
+            elif "rate limit" in error_message.lower() or "429" in error_message:
+                detailed_error = "Rate limit exceeded. Please wait before making another request."
+            elif "api key" in error_message.lower() or "401" in error_message:
+                detailed_error = "Invalid API key. Please check your AI service configuration."
+            elif "timeout" in error_message.lower():
+                detailed_error = "AI service request timed out. Please try again."
+            else:
+                detailed_error = f"AI service error: {error_message}"
+
+            raise Exception(detailed_error)
+
+    # Phase 2: Task Definition and Action Generation
+    async def defineTaskActions(self, task_step: TaskStep, workflow: ChatWorkflow, previous_results: List[str] = None,
+                                enhanced_context: TaskContext = None) -> List[TaskAction]:
+        """Phase 2: Define specific actions for a task step with enhanced retry context"""
+        try:
+            logger.info(f"Defining actions for task: {task_step.description if hasattr(task_step, 'description') else 'Unknown'}")
+
+            # Use enhanced context if provided (for retries), otherwise create basic context
+            if enhanced_context:
+                context = enhanced_context
+            else:
+                context = TaskContext(
+                    task_step=task_step,
+                    workflow=workflow,
+                    workflow_id=workflow.id,
+                    available_documents=self._getAvailableDocuments(workflow),
+                    previous_results=previous_results or [],
+                    improvements=[],
+                    retry_count=0,
+                    previous_action_results=[],
+                    previous_review_result=None,
+                    is_regeneration=False,
+                    failure_patterns=[],
+                    failed_actions=[],
+                    successful_actions=[]
+                )
+
+            # Generate actions using AI
+            actions = await self._generateActionsForTaskStep(context)
+
+            # Log the generated actions as JSON for debugging
+            logger.info(f"Generated {len(actions)} actions for task '{task_step.description}':")
+            for i, action in enumerate(actions):
+                logger.info(f"Action {i+1}: {json.dumps(action, indent=2, ensure_ascii=False)}")
+
+            # Convert to TaskAction objects
+            # Get available document labels for validation
+            available_document_labels = set(self._getAvailableDocuments(workflow))
+            task_actions = []
+            invalid_doc_ref_detected = False
+            # Collect resultLabels of actions defined so far in this step
+            result_labels_so_far = set()
+            for action_dict in actions:
+                # Validate document references in parameters
+                params = action_dict.get('parameters', {})
+                if 'documentList' in params and isinstance(params['documentList'], list):
+                    original_refs = params['documentList']
+                    # Allow references to available documents or to resultLabels of actions defined so far
+                    valid_refs = [ref for ref in original_refs if ref in available_document_labels or ref in result_labels_so_far]
+                    if len(valid_refs) < len(original_refs):
+                        logger.warning(f"Action {action_dict.get('method','?')}.{action_dict.get('action','?')} has invalid document references: {set(original_refs) - set(valid_refs)}. Only using valid: {valid_refs}")
+                        invalid_doc_ref_detected = True
+                        if not valid_refs:
+                            logger.warning(f"Skipping action {action_dict.get('method','?')}.{action_dict.get('action','?')} due to no valid document references.")
+                            continue
+                    params['documentList'] = valid_refs
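+                # Map the AI's action dict onto the TaskAction schema used by the chat interface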
+                action_data = {
+                    "execMethod": action_dict.get('method', 'unknown'),
+                    "execAction": action_dict.get('action', 'unknown'),
+                    "execParameters": params,
+                    "execResultLabel": action_dict.get('resultLabel', ''),
+                    "expectedDocumentFormats": action_dict.get('expectedDocumentFormats', None),
+                    "status": TaskStatus.PENDING
+                }
+                task_action = self.chatInterface.createTaskAction(action_data)
+                if task_action:
+                    # Log action definition: parameters, input documentLabels, output document label
+                    logger.debug(f"[ACTION DEFINITION] Method: {task_action.execMethod}, Action: {task_action.execAction}, Parameters: {json.dumps(task_action.execParameters, ensure_ascii=False)}, Input documentLabels: {task_action.execParameters.get('documentList', [])}, Output documentLabel: {task_action.execResultLabel}")
+                    task_actions.append(task_action)
+                    # Add this action's resultLabel to the running set for subsequent actions
+                    if action_data["execResultLabel"]:
+                        result_labels_so_far.add(action_data["execResultLabel"])
+                    logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}")
+            # If all actions were skipped due to invalid document references, add improvement and return []
+            if not task_actions and invalid_doc_ref_detected:
+                improvement_msg = ("Previous action(s) referenced invalid or unavailable document labels. "
+                                   "Only use document labels listed in AVAILABLE DOCUMENTS. Do not invent or copy message IDs.")
+                if enhanced_context:
+                    if hasattr(enhanced_context, 'improvements') and isinstance(enhanced_context.improvements, list):
+                        enhanced_context.improvements.append(improvement_msg)
+                else:
+                    if hasattr(context, 'improvements') and isinstance(context.improvements, list):
+                        context.improvements.append(improvement_msg)
+                logger.warning("All actions skipped due to invalid document references. Added improvement for retry.")
+                return []
+
+            # Update stats for task validation (estimate bytes for action validation)
+            if task_actions:
+                # Calculate actual action size for stats
+                action_size = self.service.calculateObjectSize(task_actions)
+                self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size)
+
+            # Log the final TaskAction objects as JSON
+            logger.info(f"Final TaskAction objects for task '{task_step.description}':")
+            for i, task_action in enumerate(task_actions):
+                action_json = {
+                    'id': task_action.id,
+                    'execMethod': task_action.execMethod,
+                    'execAction': task_action.execAction,
+                    'execParameters': task_action.execParameters,
+                    'execResultLabel': task_action.execResultLabel,
+                    'status': task_action.status.value if hasattr(task_action.status, 'value') else str(task_action.status)
+                }
+                logger.info(f"TaskAction {i+1}: {json.dumps(action_json, indent=2, ensure_ascii=False)}")
+
+            logger.info(f"Task action definition completed: {len(task_actions)} actions")
+            return task_actions
+
+        except Exception as e:
+            logger.error(f"Error defining task actions: {str(e)}")
+            return []
+
+    # Phase 3: Action Execution
+    async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]:
+        """Phase 3: Execute all actions for a task with retry mechanism"""
+        try:
+            logger.info(f"Executing {len(task_actions)} task actions")
+
+            results = []
+            for i, action in enumerate(task_actions):
+                logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")
+
+                # Execute single action with retry mechanism
+                result = await self._executeSingleAction(action, workflow)
+                results.append(result)
+
+                # If action failed after all retries, continue with next action instead of stopping
+                if not result.success:
+                    logger.error(f"Action {i+1} failed after retries, continuing with next action")
+                    # Don't break - continue with remaining actions
+                    continue
+
+            logger.info(f"Task action execution completed: {len(results)} results")
+            return results
+
+        except Exception as e:
+            logger.error(f"Error executing task actions: {str(e)}")
+            return []
+    # Phase 4: Task Review and Quality Assessment
+    async def reviewTaskCompletion(self, task_step: TaskStep, task_actions: List[TaskAction],
+                                   action_results: List[ActionExecutionResult], workflow: ChatWorkflow) -> ReviewResult:
+        """Phase 4: Review task completion and decide next steps"""
+        try:
+            logger.info(f"Reviewing task completion: {task_step.description}")
+
+            # Create step result summary from action results
+            step_result = {
+                'task_step': task_step,
+                'action_results': action_results,
+                'successful_actions': sum(1 for result in action_results if result.success),
+                'total_actions': len(action_results),
+                'results': [result.data.get('result', '') for result in action_results if result.success],
+                'errors': [result.error for result in action_results if not result.success]
+            }
+
+            # Prepare review context
+            review_context = ReviewContext(
+                task_step=task_step,
+                task_actions=task_actions,
+                action_results=action_results,
+                step_result=step_result,
+                workflow_id=workflow.id,
+                previous_results=self._getPreviousResultsFromActions(task_actions)
+            )
+
+            # Use AI to review the results
+            review = await self._performTaskReview(review_context)
+
+            # Add quality metrics
+            quality_metrics = self._calculateTaskQualityMetrics(task_step, action_results)
+
+            logger.info(f"Task review completed: {review.status}")
+            return ReviewResult(
+                status=review.status,
+                reason=review.reason,
+                improvements=review.improvements,
+                quality_score=review.quality_score,
+                missing_outputs=review.missing_outputs,
+                met_criteria=review.met_criteria,
+                unmet_criteria=review.unmet_criteria,
+                confidence=review.confidence
+            )
+
+        except Exception as e:
+            logger.error(f"Error reviewing task completion: {str(e)}")
+            return ReviewResult(
+                status='failed',
+                reason=f'Review failed: {str(e)}',
+                quality_score=0,
+                confidence=0
+            )
+    # Phase 5: Task Handover and State Management
+    async def prepareTaskHandover(self, task_step: TaskStep, task_actions: List[TaskAction],
+                                  review_result: ReviewResult, workflow: ChatWorkflow) -> Dict[str, Any]:
+        """Phase 5: Prepare results for next task or workflow completion"""
+        try:
+            logger.info(f"Preparing task handover: {task_step.description}")
+
+            # Update task actions with results
+            for action in task_actions:
+                if action.status == TaskStatus.PENDING:
+                    action.status = TaskStatus.COMPLETED if review_result.status == 'success' else TaskStatus.FAILED
+
+            # Create serializable task actions
+            task_actions_serializable = []
+            for action in task_actions:
+                action_dict = {
+                    'id': action.id,
+                    'execMethod': action.execMethod,
+                    'execAction': action.execAction,
+                    'execParameters': action.execParameters,
+                    'execResultLabel': action.execResultLabel,
+                    'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
+                }
+                task_actions_serializable.append(action_dict)
+
+            # Create handover data
+            handover_data = {
+                'task_step': task_step,
+                'task_actions': task_actions_serializable,
+                'review_result': review_result,
+                'next_task_ready': review_result.status == 'success',
+                'available_results': self._getPreviousResultsFromActions(task_actions)
+            }
+
+            logger.info(f"Task handover prepared: next_task_ready={handover_data['next_task_ready']}")
+            return handover_data
+
+        except Exception as e:
+            logger.error(f"Error preparing task handover: {str(e)}")
+            # Create serializable task actions for exception case
+            task_actions_serializable = []
+            for action in task_actions:
+                action_dict = {
+                    'id': action.id,
+                    'execMethod': action.execMethod,
+                    'execAction': action.execAction,
+                    'execParameters': action.execParameters,
+                    'execResultLabel': action.execResultLabel,
+                    'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
+                }
+                task_actions_serializable.append(action_dict)
+
+            return {
+                'task_step': task_step,
+                'task_actions': task_actions_serializable,
+                'review_result': review_result,
+                'next_task_ready': False,
+                'available_results': []
+            }
+
+    # ===== Utility Methods =====
+
+    async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]:
+        """Process file IDs and return ChatDocument objects"""
+        documents = []
+
+        for fileId in fileIds:
+            try:
+                # Ensure service is initialized
+                if not hasattr(self, 'service') or not self.service:
+                    logger.error(f"Service not initialized for file ID {fileId}")
+                    continue
+
+                # Get file info from service
+                fileInfo = self.service.getFileInfo(fileId)
+                if fileInfo:
+                    # Create document using interface
+                    documentData = {
+                        "fileId": fileId,
+                        "filename": fileInfo.get("filename", "unknown"),
+                        "fileSize": fileInfo.get("size", 0),
+                        "mimeType": fileInfo.get("mimeType", "application/octet-stream")
+                    }
+                    document = self.chatInterface.createChatDocument(documentData)
+                    if document:
+                        documents.append(document)
+                        logger.info(f"Processed file ID {fileId} -> {document.filename}")
+                else:
+                    logger.warning(f"No file info found for file ID {fileId}")
+            except Exception as e:
+                logger.error(f"Error processing file ID {fileId}: {str(e)}")
+
+        return documents
+
+    def setUserLanguage(self, language: str) -> None:
+        """Set user language for the chat manager"""
+        if hasattr(self, 'service') and self.service:
+            self.service.user.language = language
+
+    # ===== Enhanced Task Planning Methods =====
+
+    async def _callAIWithCircuitBreaker(self, prompt: str, context: str) -> str:
+        """Call AI with intelligent routing based on complexity and circuit breaker pattern"""
+        max_retries = 3
+        base_delay = 2  # Start with 2 seconds
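+
+        # Routing policy: complex prompts go to the advanced model and fall back
+        # to the basic one on failure; simple prompts use the basic model and only
+        # escalate to the advanced model when the task is critical. Failures feed
+        # the circuit breaker; retries use exponential backoff.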
+        for attempt in range(max_retries):
+            try:
+                # Check circuit breaker
+                if self._isCircuitBreakerOpen():
+                    raise Exception("AI circuit breaker is open - too many recent failures")
+
+                # Determine which AI service to use based on complexity
+                ai_choice = self._determineAIChoice(prompt, context)
+                logger.debug(f"AI choice for {context}: {ai_choice} (attempt {attempt + 1}/{max_retries})")
+
+                if ai_choice == "advanced":
+                    # Use advanced AI for complex tasks
+                    try:
+                        response = await asyncio.wait_for(
+                            self._callAdvancedAI(prompt, context),
+                            timeout=self.ai_call_timeout
+                        )
+
+                        # Reset failure count on success
+                        self.ai_failure_count = 0
+                        logger.info(f"Advanced AI call successful for {context}")
+                        return response
+
+                    except Exception as advanced_error:
+                        error_message = str(advanced_error)
+                        logger.warning(f"Advanced AI call failed for {context}: {error_message}")
+
+                        # Fall back to basic AI for complex tasks
+                        logger.info(f"Falling back to basic AI for complex task: {context}")
+                        try:
+                            response = await asyncio.wait_for(
+                                self._callStandardAI(prompt, context),
+                                timeout=self.ai_call_timeout
+                            )
+
+                            # Reset failure count on success
+                            self.ai_failure_count = 0
+                            logger.info(f"Basic AI fallback successful for complex task: {context}")
+                            return response
+
+                        except Exception as standard_error:
+                            # Both failed for complex task
+                            error_message = f"Advanced AI failed: {str(advanced_error)}. Basic AI failed: {str(standard_error)}"
+                            raise Exception(error_message)
+
+                else:  # basic
+                    # Use basic AI for simple tasks
+                    try:
+                        response = await asyncio.wait_for(
+                            self._callStandardAI(prompt, context),
+                            timeout=self.ai_call_timeout
+                        )
+
+                        # Reset failure count on success
+                        self.ai_failure_count = 0
+                        logger.info(f"Basic AI call successful for {context}")
+                        return response
+
+                    except Exception as basic_error:
+                        error_message = str(basic_error)
+                        logger.warning(f"Basic AI call failed for {context}: {error_message}")
+
+                        # Only upgrade to advanced AI for critical simple tasks
+                        if self._isCriticalTask(context):
+                            logger.info(f"Upgrading to advanced AI for critical simple task: {context}")
+                            try:
+                                response = await asyncio.wait_for(
+                                    self._callAdvancedAI(prompt, context),
+                                    timeout=self.ai_call_timeout
+                                )
+
+                                # Reset failure count on success
+                                self.ai_failure_count = 0
+                                logger.info(f"Advanced AI upgrade successful for critical task: {context}")
+                                return response
+
+                            except Exception as advanced_error:
+                                # Both failed for critical task
+                                error_message = f"Basic AI failed: {str(basic_error)}. Advanced AI failed: {str(advanced_error)}"
+                                raise Exception(error_message)
+                        else:
+                            # Non-critical simple task failed
+                            raise Exception(f"Basic AI failed for simple task: {error_message}")
+
+            except asyncio.TimeoutError:
+                self._recordAIFailure("Timeout")
+                if attempt < max_retries - 1:
+                    delay = base_delay * (2 ** attempt)  # Exponential backoff
+                    logger.warning(f"AI call timed out, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries})")
+                    await asyncio.sleep(delay)
+                    continue
+                else:
+                    raise Exception(f"AI call timed out after {self.ai_call_timeout} seconds")
+
+            except Exception as e:
+                error_message = str(e)
+
+                # Special handling for overloaded service (529 error)
+                if "overloaded" in error_message.lower() or "529" in error_message:
+                    if attempt < max_retries - 1:
+                        delay = base_delay * (2 ** attempt)  # Exponential backoff
+                        logger.warning(f"AI service overloaded, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries})")
+                        await asyncio.sleep(delay)
+                        continue
+                    else:
+                        # Don't record this as a circuit breaker failure since it's a service issue
+                        raise Exception("AI service is currently overloaded. Please try again in a few minutes.")
+
+                # For other errors, record failure and potentially retry
+                self._recordAIFailure(error_message)
+                if attempt < max_retries - 1:
+                    delay = base_delay * (2 ** attempt)  # Exponential backoff
+                    logger.warning(f"AI call failed, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries}): {error_message}")
+                    await asyncio.sleep(delay)
+                    continue
+                else:
+                    raise
+
+    def _isCircuitBreakerOpen(self) -> bool:
+        """Check if circuit breaker is open"""
+        if self.ai_failure_count >= self.ai_circuit_breaker_threshold:
+            if self.ai_last_failure_time:
+                time_since_failure = (datetime.now(UTC) - self.ai_last_failure_time).total_seconds()
+                if time_since_failure < self.ai_circuit_breaker_timeout:
+                    return True
+                else:
+                    # Reset circuit breaker after timeout
+                    self.ai_failure_count = 0
+                    self.ai_last_failure_time = None
+        return False
+
+    def _determineAIChoice(self, prompt: str, context: str) -> str:
+        """Determine whether to use advanced or basic AI based on task complexity"""
+
+        # Check for forced AI choice based on context
+        forced_choice = self._getForcedAIChoice(context)
+        if forced_choice:
+            logger.debug(f"Forced AI choice for {context}: {forced_choice}")
+            return forced_choice
+
+        # Define complex task patterns that require advanced AI
+        complex_patterns = [
+            # Task planning and workflow management
+            "task_planning", "action_generation", "result_review", "task_completion_validation",
+
+            # Complex document analysis
+            "document", "extract", "analysis", "comprehensive", "detailed analysis",
+
+            # Multi-step reasoning
+            "plan", "strategy", "evaluate", "assess", "compare", "analyze",
+
+            # Complex business logic
+            "workflow", "task", "action", "validation", "review", "assessment",
+
+            # Critical decision making
+            "decision", "recommendation", "evaluation", "quality", "success criteria",
+
+            # Complex prompts (lowercase so it can match the lowercased combined text)
+            "json", "structured", "format", "validation", "improvements", "quality_score"
+        ]
+
+        # Define simple task patterns that can use basic AI
+        simple_patterns = [
+            # Basic text processing
+            "summarize", "translate", "format", "convert", "extract text",
+
+            # Simple queries
+            "find", "search", "list", "get", "retrieve",
+
+            # Basic operations
+            "send", "upload", "download", "create", "delete",
+
+            # Simple responses
+            "confirm", "acknowledge", "status", "info"
+        ]
+
+        # Check prompt and context for complexity indicators
+        combined_text = f"{prompt} {context}".lower()
+
+        # Count complex indicators
+        complex_count = sum(1 for pattern in complex_patterns if pattern in combined_text)
+
+        # Count simple indicators
+        simple_count = sum(1 for pattern in simple_patterns if pattern in combined_text)
+
+        # Additional complexity factors
+        prompt_length = len(prompt)
+        has_json_requirement = "json" in combined_text and ("{" in prompt or "}" in prompt)
+        has_structured_output = any(word in combined_text for word in ["format", "structure", "template"])
+        has_validation = any(word in combined_text for word in ["validate", "check", "verify", "quality"])
+
+        # Calculate complexity score
+        complexity_score = 0
+        complexity_score += complex_count * 2  # Complex patterns worth more
+        complexity_score += simple_count * 1  # Simple patterns worth less
+        complexity_score += (prompt_length > 1000) * 3  # Long prompts are complex
+        complexity_score += has_json_requirement * 5  # JSON requirements are complex
+        complexity_score += has_structured_output * 3  # Structured output is complex
+        complexity_score += has_validation * 4  # Validation is complex
+
+        # Determine AI choice based on complexity score
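+        # Scores of 5 or more go to the advanced model; lower scores stay on the cheaper basic model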
+        if complexity_score >= 5:
+            logger.debug(f"Complex task detected (score: {complexity_score}) - using advanced AI for {context}")
+            return "advanced"
+        else:
+            logger.debug(f"Simple task detected (score: {complexity_score}) - using basic AI for {context}")
+            return "basic"
+
+    def _getForcedAIChoice(self, context: str) -> Optional[str]:
+        """Get forced AI choice for specific contexts (can be overridden); returns None if no choice is forced"""
+
+        # Define contexts that always use advanced AI
+        advanced_contexts = [
+            "task_planning",  # Always use advanced for task planning
+            "action_generation",  # Always use advanced for action generation
+            "result_review",  # Always use advanced for result review
+            "task_completion_validation"  # Always use advanced for validation
+        ]
+
+        # Define contexts that always use basic AI
+        basic_contexts = [
+            "summarize",  # Always use basic for summarization
+            "translate",  # Always use basic for translation
+            "format",  # Always use basic for formatting
+            "status",  # Always use basic for status updates
+            "info"  # Always use basic for info queries
+        ]
+
+        context_lower = context.lower()
+
+        # Check for forced advanced AI
+        for advanced_context in advanced_contexts:
+            if advanced_context in context_lower:
+                return "advanced"
+
+        # Check for forced basic AI
+        for basic_context in basic_contexts:
+            if basic_context in context_lower:
+                return "basic"
+
+        # No forced choice
+        return None
+    def _isCriticalTask(self, context: str) -> bool:
+        """Determine if a simple task is critical enough to warrant advanced AI upgrade"""
+
+        # Define critical task patterns
+        critical_patterns = [
+            # Workflow critical tasks
+            "task_planning", "workflow", "critical", "essential",
+
+            # User-facing decisions
+            "decision", "recommendation", "evaluation", "assessment",
+
+            # Quality-sensitive tasks
+            "quality", "validation", "review", "check",
+
+            # Business-critical operations
+            "business", "strategy", "planning", "analysis"
+        ]
+
+        context_lower = context.lower()
+
+        # Check if context contains critical patterns
+        is_critical = any(pattern in context_lower for pattern in critical_patterns)
+
+        if is_critical:
+            logger.debug(f"Critical task detected - {context}")
+
+        return is_critical
+
+    def _recordAIFailure(self, error: str):
+        """Record AI failure for circuit breaker"""
+        self.ai_failure_count += 1
+        self.ai_last_failure_time = datetime.now(UTC)
+        logger.warning(f"AI failure recorded ({self.ai_failure_count}/{self.ai_circuit_breaker_threshold}): {error}")
+
+    def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool:
+        """Validate task plan structure and dependencies"""
+        try:
+            if not isinstance(task_plan, dict):
+                return False
+
+            if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list):
+                return False
+
+            # Check each task
+            task_ids = set()
+            for task in task_plan['tasks']:
+                if not isinstance(task, dict):
+                    return False
+
+                required_fields = ['id', 'description', 'expected_outputs', 'success_criteria']
+                if not all(field in task for field in required_fields):
+                    return False
+
+                # Check for duplicate task IDs
+                if task['id'] in task_ids:
+                    return False
+                task_ids.add(task['id'])
+
+                # Validate dependencies
+                dependencies = task.get('dependencies', [])
+                if not isinstance(dependencies, list):
+                    return False
+
+                # Check that dependencies reference tasks defined earlier in the plan
+                # (task_ids only contains IDs seen so far, so forward references are rejected)
+                for dep in dependencies:
+                    if dep not in task_ids and dep != 'task_0':  # Allow task_0 as special case
+                        return False
+
+                # Validate ai_prompt if present (optional field)
+                if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str):
+                    return False
+
+            return True
+
+        except Exception as e:
+            logger.error(f"Error validating task plan: {str(e)}")
+            return False
+
+    def _validateActions(self, actions: List[Dict[str, Any]], context: TaskContext) -> bool:
+        """Validate generated actions"""
+        try:
+            if not isinstance(actions, list):
+                logger.error("Actions must be a list")
+                return False
+
+            if len(actions) == 0:
+                logger.warning("No actions generated")
+                return False
+
+            for i, action in enumerate(actions):
+                if not isinstance(action, dict):
+                    logger.error(f"Action {i} must be a dictionary")
+                    return False
+
+                # Check required fields
+                required_fields = ['method', 'action', 'parameters', 'resultLabel']
+                missing_fields = []
+                for field in required_fields:
+                    if field not in action or not action[field]:
+                        missing_fields.append(field)
+
+                if missing_fields:
+                    logger.error(f"Action {i} missing required fields: {missing_fields}")
+                    return False
+
+                # Validate result label format
+                result_label = action.get('resultLabel', '')
+                if not result_label.startswith('task'):
+                    logger.error(f"Action {i} result label must start with 'task': {result_label}")
+                    return False
+
+                # Validate parameters
+                parameters = action.get('parameters', {})
+                if not isinstance(parameters, dict):
+                    logger.error(f"Action {i} parameters must be a dictionary")
+                    return False
+
+            logger.info(f"Successfully validated {len(actions)} actions")
+            return True
+
+        except Exception as e:
+            logger.error(f"Error validating actions: {str(e)}")
+            return False
+    # ===== Prompt Creation Methods =====
+
+    def createTaskPlanningPrompt(self, context: Dict[str, Any]) -> str:
+        """Create prompt for task planning"""
+        return f"""You are a task planning AI that analyzes user requests and creates structured task plans.
+
+USER REQUEST: {context['user_request']}
+
+AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])}
+
+INSTRUCTIONS:
+1. Analyze the user request and available documents
+2. Break down the request into 2-4 meaningful high-level task steps
+3. Focus on business outcomes, not technical operations
+4. For document processing, create ONE task with a comprehensive AI prompt rather than multiple granular tasks
+5. Each task should produce meaningful, usable outputs
+6. Ensure proper handover between tasks using result labels
+7. Return a JSON object with the exact structure shown below
+
+TASK PLANNING PRINCIPLES:
+- Combine related operations into single tasks (e.g., "Extract and analyze all candidate profiles" instead of separate "read file" and "analyze content" tasks)
+- Use comprehensive AI prompts for document processing rather than multiple small tasks
+- Focus on business value and outcomes
+- Keep tasks at a meaningful level of abstraction
+- Each task should produce results that can be used by subsequent tasks
+
+REQUIRED JSON STRUCTURE:
+{{
+    "overview": "Brief description of the overall plan",
+    "tasks": [
+        {{
+            "id": "task_1",
+            "description": "Clear description of what this task accomplishes (business outcome)",
+            "dependencies": ["task_0"],  // IDs of tasks that must complete first
+            "expected_outputs": ["output1", "output2"],
+            "success_criteria": ["criteria1", "criteria2"],
+            "required_documents": ["doc1", "doc2"],
+            "estimated_complexity": "low|medium|high",
+            "ai_prompt": "Comprehensive AI prompt for document processing tasks (if applicable)"
+        }}
+    ]
+}}
+
+EXAMPLES OF GOOD TASK DESCRIPTIONS:
+- "Extract and analyze all candidate profiles to identify key qualifications and experience"
+- "Create evaluation matrix and rate candidates against product designer criteria"
+- "Generate comprehensive PowerPoint presentation for management decision"
+- "Store final presentation in SharePoint for specified account"
+
+EXAMPLES OF BAD TASK DESCRIPTIONS:
+- "Open and read the PDF file" (too granular)
+- "Identify table structure" (technical detail)
+- "Convert data to CSV format" (implementation detail)
+
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
+
+    async def createActionDefinitionPrompt(self, context: TaskContext) -> str:
+        """Create prompt for action generation with enhanced document extraction guidance and retry context"""
+        task_step = context.task_step
+        workflow = context.workflow
+        available_docs = context.available_documents or []
+        previous_results = context.previous_results or []
+        improvements = context.improvements or []
+        retry_count = context.retry_count or 0
+        previous_action_results = context.previous_action_results or []
+        previous_review_result = context.previous_review_result
+
+        # Get available methods and actions with signatures
+        methodList = self.service.getMethodsList()
+        method_actions = {}
+        for sig in methodList:
+            if '.' in sig:
+                method, rest = sig.split('.', 1)
+                action = rest.split('(')[0]
+                method_actions.setdefault(method, []).append((action, sig))
+
+        # Get workflow history
+        messageSummary = await self.service.summarizeChat(workflow.messages)
+
+        # Get available documents and connections
+        docRefs = self.service.getDocumentReferenceList()
+        connRefs = self.service.getConnectionReferenceList()
+        all_doc_refs = docRefs.get('chat', []) + docRefs.get('history', [])
+
+        # Build AVAILABLE METHODS section
+        available_methods_str = ''
+        for method, actions in method_actions.items():
+            available_methods_str += f"- {method}:\n"
+            for action, sig in actions:
+                available_methods_str += f"  - {action}: {sig}\n"
+
+        # Get AI prompt from task step if available
+        task_ai_prompt = task_step.ai_prompt or ''
+
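+        # On retries, surface the previous attempt's failures and the reviewer's
+        # feedback directly in the prompt so the model can adjust its actions.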
+        # Build retry context section
+        retry_context = ""
+        if retry_count > 0:
+            retry_context = f"""
+RETRY CONTEXT (Attempt {retry_count}):
+Previous action results that failed or were incomplete:
+"""
+            for i, result in enumerate(previous_action_results):
+                retry_context += f"- Action {i+1}: {result.actionMethod or 'unknown'}.{result.actionName or 'unknown'}\n"
+                retry_context += f"  Status: {'success' if result.success else 'failed'}\n"
+                retry_context += f"  Error: {result.error or 'None'}\n"
+                retry_context += f"  Result: {(result.data.get('result', '') if result.data else '')[:100]}...\n"
+
+            if previous_review_result:
+                retry_context += f"""
+Previous review feedback:
+- Status: {previous_review_result.status or 'unknown'}
+- Reason: {previous_review_result.reason or 'No reason provided'}
+- Quality Score: {previous_review_result.quality_score or 0}/10
+- Missing Outputs: {', '.join(previous_review_result.missing_outputs or [])}
+- Unmet Criteria: {', '.join(previous_review_result.unmet_criteria or [])}
+"""
+
+        # Precompute all complex string expressions to avoid f-string nesting issues
+        expected_outputs_str = ', '.join(task_step.expected_outputs or [])
+        success_criteria_str = ', '.join(task_step.success_criteria or [])
+        previous_results_str = ', '.join(previous_results) if previous_results else 'None'
+        improvements_str = str(improvements) if improvements else 'None'
+        available_connections_str = '\n'.join(f"- {conn}" for conn in connRefs)
+        available_documents_str = '\n'.join(f"- {doc.documentsLabel} contains {', '.join(doc.documents)}" for doc in all_doc_refs)
+        # Build the prompt using only precomputed variables
+        prompt = f"""
+You are an action generation AI that creates specific actions to accomplish a task step.
+
+DOCUMENT REFERENCE TYPES:
+- docItem: Reference to a single document. Format: "docItem::"
+- docList: Reference to a group of documents under a label. Format: