diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index c29fd70e..4132027d 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -6,6 +6,7 @@ Uses the JSON connector for data access with added language support. import os import logging import uuid +import time from datetime import datetime, UTC from typing import Dict, Any, List, Optional, Union @@ -167,7 +168,12 @@ class ChatObjects: startedAt=workflow.get("startedAt", self._getCurrentTimestamp()), logs=[ChatLog(**log) for log in workflow.get("logs", [])], messages=[ChatMessage(**msg) for msg in workflow.get("messages", [])], - stats=ChatStat(**workflow.get("dataStats", {})) if workflow.get("dataStats") else None, + stats=ChatStat(**workflow.get("dataStats", {})) if workflow.get("dataStats") else ChatStat( + bytesSent=0, + bytesReceived=0, + tokenCount=0, + processingTime=0 + ), mandateId=workflow.get("mandateId", self.currentUser.mandateId) ) except Exception as e: @@ -230,7 +236,12 @@ class ChatObjects: startedAt=updated.get("startedAt", workflow.startedAt), logs=[ChatLog(**log) for log in updated.get("logs", workflow.logs)], messages=[ChatMessage(**msg) for msg in updated.get("messages", workflow.messages)], - stats=ChatStat(**updated.get("dataStats", workflow.stats.dict() if workflow.stats else {})) if updated.get("dataStats") or workflow.stats else None, + stats=ChatStat(**updated.get("dataStats", workflow.stats.dict() if workflow.stats else {})) if updated.get("dataStats") or workflow.stats else ChatStat( + bytesSent=0, + bytesReceived=0, + tokenCount=0, + processingTime=0 + ), mandateId=updated.get("mandateId", workflow.mandateId) ) @@ -327,11 +338,7 @@ class ChatObjects: publishedAt=createdMessage.get("publishedAt", self._getCurrentTimestamp()), stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None ) - - # Update workflow stats for message creation (estimate bytes for message) - message_size = len(createdMessage.get("message", "")) + sum(len(doc.get("filename", "")) for doc in createdMessage.get("documents", [])) - self.updateWorkflowStats(workflowId, bytesSent=0, bytesReceived=message_size) - + except Exception as e: logger.error(f"Error creating workflow message: {str(e)}") return None @@ -553,22 +560,31 @@ class ChatObjects: logger.error(f"No permission to update workflow {workflowId} stats") return False - # Get current stats - currentStats = workflow.stats.dict() if workflow.stats else { - "bytesSent": 0, - "bytesReceived": 0, - "tokenCount": 0, - "processingTime": 0 - } + # Get current stats - ensure we have proper defaults + if workflow.stats: + currentStats = workflow.stats.dict() + # Ensure all required fields exist + currentStats.setdefault("bytesSent", 0) + currentStats.setdefault("bytesReceived", 0) + currentStats.setdefault("tokenCount", 0) + currentStats.setdefault("processingTime", 0) + else: + currentStats = { + "bytesSent": 0, + "bytesReceived": 0, + "tokenCount": 0, + "processingTime": 0 + } - # Calculate processing time from workflow start - workflow_start = datetime.fromisoformat(workflow.startedAt.replace('Z', '+00:00')) - current_time = datetime.now(UTC) - processing_time = (current_time - workflow_start).total_seconds() + # Simple processing time - just use current time + processing_time = time.time() - # Update stats with incremental values - currentStats["bytesSent"] = currentStats.get("bytesSent", 0) + bytesSent - currentStats["bytesReceived"] = currentStats.get("bytesReceived", 0) + bytesReceived + # Update stats with incremental values - ensure no None values + current_bytes_sent = currentStats.get("bytesSent", 0) or 0 + current_bytes_received = currentStats.get("bytesReceived", 0) or 0 + + currentStats["bytesSent"] = current_bytes_sent + bytesSent + currentStats["bytesReceived"] = current_bytes_received + bytesReceived currentStats["tokenCount"] = currentStats["bytesSent"] + currentStats["bytesReceived"] currentStats["processingTime"] = processing_time @@ -840,6 +856,9 @@ class ChatObjects: # Create workflow workflow = self.createWorkflow(workflowData) + # Initialize stats for the new workflow + self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) + # Remove the 'Workflow started' log entry # Start workflow processing diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index e01cfeb7..6b14379d 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -71,7 +71,12 @@ async def get_workflows( startedAt=workflow_data.get("startedAt", appInterface._getCurrentTimestamp()), logs=[ChatLog(**log) for log in workflow_data.get("logs", [])], messages=[ChatMessage(**msg) for msg in workflow_data.get("messages", [])], - stats=ChatStat(**workflow_data.get("dataStats", {})) if workflow_data.get("dataStats") else None, + stats=ChatStat(**workflow_data.get("dataStats", {})) if workflow_data.get("dataStats") else ChatStat( + bytesSent=0, + bytesReceived=0, + tokenCount=0, + processingTime=0 + ), mandateId=workflow_data.get("mandateId", currentUser.mandateId or "") ) workflows.append(workflow) diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py index 470653d8..7b77aa6e 100644 --- a/modules/workflow/managerChat.py +++ b/modules/workflow/managerChat.py @@ -8,13 +8,221 @@ from datetime import datetime, UTC from modules.interfaces.interfaceAppModel import User from modules.interfaces.interfaceChatModel import ( - TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow + TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult, + ExtractedContent, ContentItem, ContentMetadata, DocumentExchange ) from modules.workflow.serviceCenter import ServiceCenter from modules.interfaces.interfaceChatObjects import ChatObjects logger = logging.getLogger(__name__) +# ===== STATE MANAGEMENT AND VALIDATION CLASSES ===== + +class TaskExecutionState: + """Manages state during task execution with retry logic""" + def __init__(self, task_step: dict): + self.task_step = task_step + self.successful_actions = [] # Preserved across retries + self.failed_actions = [] # For analysis + self.current_action_index = 0 + self.retry_count = 0 + self.improvements = [] + self.partial_results = {} # Store intermediate results + self.max_retries = 3 + def addSuccessfulAction(self, action_result: dict): + self.successful_actions.append(action_result) + if action_result.get('resultLabel'): + self.partial_results[action_result['resultLabel']] = action_result + def addFailedAction(self, action_result: dict): + self.failed_actions.append(action_result) + def getAvailableResults(self) -> list: + return [result.get('resultLabel', '') for result in self.successful_actions if result.get('resultLabel')] + def shouldRetryTask(self) -> bool: + return len(self.successful_actions) > 0 and len(self.failed_actions) > 0 + def canRetry(self) -> bool: + return self.retry_count < self.max_retries + def incrementRetryCount(self): + self.retry_count += 1 + def getFailurePatterns(self) -> list: + patterns = [] + for action in self.failed_actions: + error = action.get('error', '').lower() + if "timeout" in error: + patterns.append("timeout_issues") + elif "document_not_found" in error or "file not found" in error: + patterns.append("document_reference_issues") + elif "empty_result" in error or "no content" in error: + patterns.append("content_extraction_issues") + elif "invalid_format" in error or "wrong format" in error: + patterns.append("format_issues") + elif "permission" in error or "access denied" in error: + patterns.append("permission_issues") + return list(set(patterns)) + +class ActionValidator: + """Generic AI-based action result validation""" + def __init__(self, chat_manager): + self.chat_manager = chat_manager + + async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: dict) -> dict: + """Generic action validation using AI""" + try: + # Create generic validation prompt + prompt = self._createGenericValidationPrompt(action_result, action, context) + response = await self.chat_manager._callAIWithCircuitBreaker(prompt, "action_validation") + validation = self._parseValidationResponse(response) + + # Add action metadata + validation['action_id'] = action.id + validation['action_method'] = action.execMethod + validation['action_name'] = action.execAction + validation['result_label'] = action.execResultLabel + + return validation + except Exception as e: + logger.error(f"Error validating action result: {str(e)}") + return { + 'status': 'success', + 'reason': f'Validation failed: {str(e)}', + 'confidence': 0.5, + 'improvements': [], + 'action_id': action.id, + 'action_method': action.execMethod, + 'action_name': action.execAction, + 'result_label': action.execResultLabel + } + + def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: dict) -> str: + """Create a validation prompt focused on result file delivery""" + # Extract data from ActionResult model + success = action_result.success + result_data = action_result.data + error = action_result.error + validation_messages = action_result.validation + + # Extract result text from data + result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data) + + # Get documents from ActionResult data + documents = result_data.get("documents", []) if isinstance(result_data, dict) else [] + doc_count = len(documents) + + # Extract expected result format from action parameters + expected_result_label = action.execResultLabel + expected_format = action.execParameters.get('outputFormat', 'unknown') + + # Analyze delivered documents and content + delivered_files = [] + content_items = [] + + # Check for ChatDocument objects + for doc in documents: + if hasattr(doc, 'filename'): + delivered_files.append(doc.filename) + elif isinstance(doc, dict) and 'filename' in doc: + delivered_files.append(doc['filename']) + else: + delivered_files.append(f"document_{len(delivered_files)}") + + # Check for ExtractedContent in result data + if isinstance(result_data, dict): + if 'extractedContent' in result_data: + extracted_content = result_data['extractedContent'] + if hasattr(extracted_content, 'contents'): + content_items = extracted_content.contents + elif 'contents' in result_data: + content_items = result_data['contents'] + + # Analyze content items + content_summary = [] + for item in content_items: + if hasattr(item, 'label') and hasattr(item, 'metadata'): + content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}") + + return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format. + +ACTION DETAILS: +- Method: {action.execMethod} +- Action: {action.execAction} +- Expected Result Label: {expected_result_label} +- Expected Format: {expected_format} +- Parameters: {json.dumps(action.execParameters, indent=2)} + +RESULT TO VALIDATE: +- Success: {success} +- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''} +- Error: {error} +- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'} +- Documents Produced: {doc_count} +- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'} +- Content Items: {', '.join(content_summary) if content_summary else 'None'} + +CRITICAL VALIDATION CRITERIA: +1. **File Delivery**: Did the action deliver the promised result file(s)? +2. **Format Compliance**: Are the delivered files in the promised format? +3. **Result Label Match**: Does the result match the expected result label? +4. **Content Quality**: Is the content of the delivered files usable and complete? +5. **Content Processing**: If content extraction was expected, was it performed correctly? + +CONTEXT: +- Task Description: {context.get('task_description', 'Unknown')} +- Previous Results: {', '.join(context.get('previous_results', []))} + +VALIDATION INSTRUCTIONS: +1. Check if the expected result label "{expected_result_label}" is present in the result +2. Verify that files were delivered when expected +3. Validate that the delivered files match the expected format "{expected_format}" +4. Assess if the content is complete and usable +5. Check if content extraction was performed when expected +6. Determine if retry would improve file delivery or format compliance + +REQUIRED JSON RESPONSE: +{{ + "status": "success|retry|fail", + "reason": "Detailed explanation focusing on file delivery and format compliance", + "confidence": 0.0-1.0, + "improvements": ["specific file delivery improvements", "format compliance fixes"], + "quality_score": 1-10, + "missing_elements": ["missing files", "format issues"], + "suggested_retry_approach": "Specific approach for retry if status is retry" +}} + +NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" + + def _parseValidationResponse(self, response: str) -> dict: + """Parse the AI validation response""" + try: + json_start = response.find('{') + json_end = response.rfind('}') + 1 + if json_start == -1 or json_end == 0: + raise ValueError("No JSON found in validation response") + + json_str = response[json_start:json_end] + validation = json.loads(json_str) + + if 'status' not in validation: + raise ValueError("Validation response missing 'status' field") + + # Set defaults for optional fields + validation.setdefault('confidence', 0.5) + validation.setdefault('improvements', []) + validation.setdefault('quality_score', 5) + validation.setdefault('missing_elements', []) + validation.setdefault('suggested_retry_approach', '') + + return validation + except Exception as e: + logger.error(f"Error parsing validation response: {str(e)}") + return { + 'status': 'success', + 'reason': f'Parse error: {str(e)}', + 'confidence': 0.5, + 'improvements': [], + 'quality_score': 5, + 'missing_elements': [], + 'suggested_retry_approach': '' + } + class ChatManager: """Chat manager with improved AI integration and method handling""" @@ -62,21 +270,15 @@ class ChatManager: task_plan = self._parseTaskPlanResponse(response) if not self._validateTaskPlan(task_plan): - logger.warning("Generated task plan failed validation, using fallback") - task_plan = self._createFallbackTaskPlan({ - 'user_request': userInput, - 'available_documents': self._getAvailableDocuments(workflow) - }) + logger.error("Generated task plan failed validation") + raise Exception("AI-generated task plan failed validation - AI is required for task planning") logger.info(f"High-level task planning completed: {len(task_plan.get('tasks', []))} tasks") return task_plan except Exception as e: logger.error(f"Error in high-level task planning: {str(e)}") - return self._createFallbackTaskPlan({ - 'user_request': userInput, - 'available_documents': self._getAvailableDocuments(workflow) - }) + raise Exception(f"AI is required for task planning but failed: {str(e)}") # Phase 2: Task Definition and Action Generation async def defineTaskActions(self, task_step: Dict[str, Any], workflow: ChatWorkflow, previous_results: List[str] = None, @@ -135,7 +337,7 @@ class ChatManager: # Phase 3: Action Execution async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]: - """Phase 3: Execute all actions for a task""" + """Phase 3: Execute all actions for a task with retry mechanism""" try: logger.info(f"Executing {len(task_actions)} task actions") @@ -143,14 +345,15 @@ class ChatManager: for i, action in enumerate(task_actions): logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}") - # Execute single action + # Execute single action with retry mechanism result = await self._executeSingleAction(action, workflow) results.append(result) - # If action failed, stop execution + # If action failed after all retries, continue with next action instead of stopping if result.get('status') == 'failed': - logger.error(f"Action {i+1} failed, stopping task execution") - break + logger.error(f"Action {i+1} failed after retries, continuing with next action") + # Don't break - continue with remaining actions + continue logger.info(f"Task action execution completed: {len(results)} results") return results @@ -397,36 +600,7 @@ class ChatManager: logger.error(f"Error validating task plan: {str(e)}") return False - def _createFallbackTaskPlan(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Create a fallback task plan when AI generation fails""" - logger.warning("Creating fallback task plan due to AI generation failure") - - return { - "overview": "Fallback task plan - comprehensive document analysis and processing", - "tasks": [ - { - "id": "task_1", - "description": "Extract and analyze all provided documents comprehensively", - "dependencies": [], - "expected_outputs": ["comprehensive_document_analysis"], - "success_criteria": ["All documents processed and analyzed"], - "required_documents": context.get('available_documents', []), - "estimated_complexity": "medium", - "ai_prompt": "Extract and analyze all content from the provided documents. Identify key information, patterns, and insights that are relevant to the user's request. Provide a comprehensive analysis that can be used for further processing." - }, - { - "id": "task_2", - "description": "Generate comprehensive output based on analysis", - "dependencies": ["task_1"], - "expected_outputs": ["final_output"], - "success_criteria": ["Output generated and formatted appropriately"], - "required_documents": ["comprehensive_document_analysis"], - "estimated_complexity": "low", - "ai_prompt": "Based on the comprehensive document analysis, generate the final output that addresses the user's original request. Format the output appropriately and ensure it meets the user's requirements." - } - ] - } - + def _validateActions(self, actions: List[Dict[str, Any]], context: Dict[str, Any]) -> bool: """Validate generated actions""" try: @@ -473,40 +647,7 @@ class ChatManager: logger.error(f"Error validating actions: {str(e)}") return False - def _createFallbackActions(self, task_step: Dict[str, Any], context: Dict[str, Any]) -> List[Dict[str, Any]]: - """Create fallback actions when AI generation fails with retry context awareness""" - logger.warning("Creating fallback actions due to AI generation failure") - - # Get available documents - available_docs = context.get('available_documents', []) - retry_count = context.get('retry_count', 0) - previous_action_results = context.get('previous_action_results', []) - - if not available_docs: - logger.warning("No available documents for fallback actions") - return [] - - # Create fallback actions for document analysis - fallback_actions = [] - for i, doc in enumerate(available_docs): - # Enhanced AI prompt for retry scenarios - ai_prompt = "Fallback document analysis for " + doc - if retry_count > 0 and previous_action_results: - ai_prompt += f". Previous attempt failed - ensure comprehensive extraction with detailed analysis." - - fallback_actions.append({ - "method": "document", - "action": "analyze", - "parameters": { - "documentList": ["task1_previous_results"], - "aiPrompt": ai_prompt - }, - "resultLabel": f"task1_fallback_retry{retry_count}:" + doc + ":analysis", - "description": f"Fallback document analysis for {doc} (attempt {retry_count + 1})" - }) - - logger.info(f"Created {len(fallback_actions)} fallback actions") - return fallback_actions + # ===== Prompt Creation Methods ===== @@ -834,18 +975,18 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" # Validate actions if not self._validateActions(actions, context): - logger.warning("Generated actions failed validation, using fallback actions") - actions = self._createFallbackActions(context['task_step'], context) + logger.error("Generated actions failed validation") + raise Exception("AI-generated actions failed validation - AI is required for action generation") logger.info(f"Generated {len(actions)} actions for task step") return actions except Exception as e: logger.error(f"Error generating actions for task step: {str(e)}") - return self._createFallbackActions(context['task_step'], context) + raise Exception(f"AI is required for action generation but failed: {str(e)}") - async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]: - """Execute a single action and return result with enhanced document processing""" + async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> ActionResult: + """Execute a single action and return ActionResult with enhanced document processing""" try: # Execute the actual method action using the service center result = await self.service.executeAction( @@ -925,28 +1066,46 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" 'document': doc }) - return { - "status": "completed" if result.success else "failed", - "result": result.data.get("result", ""), - "error": result.error or "", - "resultLabel": result_label, - "documents": processed_documents, - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction - } + # Create ActionResult with processed data + return ActionResult( + success=result.success, + data={ + "result": result.data.get("result", ""), + "documents": processed_documents, + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "resultLabel": result_label + }, + metadata={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "resultLabel": result_label + }, + validation=[], + error=result.error or "" + ) except Exception as e: logger.error(f"Error executing single action: {str(e)}") action.setError(str(e)) - return { - "status": "failed", - "error": str(e), - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "documents": [] - } + return ActionResult( + success=False, + data={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documents": [] + }, + metadata={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction + }, + validation=[], + error=str(e) + ) async def _createActionMessage(self, action: TaskAction, result: Any, workflow: ChatWorkflow, result_label: str = None) -> None: """Create and store a message for the action result in the workflow with enhanced document processing""" @@ -1457,8 +1616,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" # ===== UNIFIED WORKFLOW EXECUTION ===== - async def executeUnifiedWorkflow(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]: - """Execute a unified workflow with all phases""" + async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> Dict[str, Any]: + """Execute a unified workflow with state management and action-level validation""" try: logger.info(f"Starting unified workflow execution for workflow {workflow.id}") start_time = time.time() @@ -1473,7 +1632,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" logger.info(f"Processed {len(documents)} documents") # Calculate and update user input stats - user_input_size = self.service.calculateUserInputSize(userInput) + user_input_size = self.service.calculateUserInputSize(userInput.prompt) self.service.updateWorkflowStats(eventLabel="userinput", bytesReceived=user_input_size) # Phase 1: High-Level Task Planning @@ -1515,10 +1674,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" task_plan_log['tasks'].append(task_log) logger.debug(f"TASK PLAN CREATED: {json.dumps(task_plan_log, indent=2, ensure_ascii=False)}") - # Execute each task step with retry logic + # Execute each task step with state management workflow_results = [] previous_results = [] - max_retries = 3 # Maximum retries per task for i, task_step in enumerate(task_plan['tasks']): task_description = task_step.get('description', 'Unknown') @@ -1535,435 +1693,89 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" "agentName": "System" }) - # Retry loop for each task - task_success = False - retry_count = 0 - task_actions = [] - action_results = [] - review_result = {} - handover_data = {} - previous_action_results = [] # Track previous action results for retry context - previous_review_feedback = "" # Track previous review feedback for retry context - - while not task_success and retry_count < max_retries: - if retry_count > 0: - logger.info(f"--- RETRY {retry_count}/{max_retries} FOR TASK {i+1} ---") - # Create user-friendly retry log - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": f"Retrying task {i+1} (attempt {retry_count}/{max_retries})", - "type": "warning", - "status": "running", - "progress": progress, - "agentName": "System" - }) - - try: - # Phase 2: Define Task Actions - logger.info(f"--- PHASE 2: DEFINING ACTIONS FOR TASK {i+1} ---") - - # Enhanced context for retries - include previous results and feedback - enhanced_previous_results = previous_results.copy() if previous_results else [] - if retry_count > 0 and previous_action_results: - # Add previous action results to context - for result in previous_action_results: - if result.get('resultLabel'): - enhanced_previous_results.append(result.get('resultLabel')) - - # Create enhanced context with retry information - context = { - 'task_step': task_step, + # Create context for task execution + task_context = { 'workflow': workflow, 'workflow_id': workflow.id, 'available_documents': self._getAvailableDocuments(workflow), - 'previous_results': enhanced_previous_results, - 'improvements': previous_review_feedback if retry_count > 0 else None, - 'retry_count': retry_count, - 'previous_action_results': previous_action_results if retry_count > 0 else [], - 'previous_review_result': review_result if retry_count > 0 else None - } - - task_actions = await self.defineTaskActions(task_step, workflow, enhanced_previous_results, context) - if not task_actions: - logger.warning(f"No actions defined for task {i+1}, skipping") - break - - # Log task actions (convert to serializable format with metadata only) - task_actions_serializable = [] - for action in task_actions: - # Extract only metadata from parameters, not document content - parameters_metadata = {} - if hasattr(action, 'execParameters') and action.execParameters: - for key, value in action.execParameters.items(): - if key == 'documentList': - # Log document list as count and labels only - if isinstance(value, list): - parameters_metadata[key] = { - 'count': len(value), - 'labels': [str(v).split(':')[-1] if ':' in str(v) else str(v) for v in value] - } - else: - parameters_metadata[key] = str(value) - elif key == 'aiPrompt': - # Truncate AI prompts to avoid logging large content - parameters_metadata[key] = str(value)[:100] + '...' if len(str(value)) > 100 else str(value) - else: - parameters_metadata[key] = str(value) - - action_dict = { - 'execMethod': action.execMethod, - 'execAction': action.execAction, - 'execParameters': parameters_metadata, - 'execResultLabel': action.execResultLabel, - 'status': action.status.value if hasattr(action.status, 'value') else str(action.status) - } - task_actions_serializable.append(action_dict) - logger.debug(f"TASK {i+1} ACTIONS CREATED: {json.dumps(task_actions_serializable, indent=2, ensure_ascii=False)}") - - # Phase 3: Execute Task Actions - logger.info(f"--- PHASE 3: EXECUTING TASK {i+1} ACTIONS ---") - action_results = await self.executeTaskActions(task_actions, workflow) - - # Update stats for action execution - # Action stats are already handled by the service center during AI calls - - # Create user-friendly action completion log with quality metrics - successful_actions = sum(1 for result in action_results if result.get('status') == 'completed') - total_actions = len(action_results) - - if total_actions > 0: - if successful_actions == total_actions: - log_type = "success" - elif successful_actions == 0: - log_type = "error" - else: - log_type = "warning" - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": f"Successful actions: {successful_actions}/{total_actions}", - "type": log_type, - "status": "running", - "progress": progress + 10 - }) - - # Log action results (with metadata only) - action_results_metadata = [] - for result in action_results: - # Extract document metadata only - documents_metadata = [] - for doc in result.get('documents', []): - if hasattr(doc, 'filename'): - documents_metadata.append({ - 'filename': doc.filename, - 'fileSize': getattr(doc, 'fileSize', 0), - 'mimeType': getattr(doc, 'mimeType', 'unknown') - }) - elif isinstance(doc, dict): - documents_metadata.append({ - 'filename': doc.get('filename', 'unknown'), - 'fileSize': doc.get('fileSize', 0), - 'mimeType': doc.get('mimeType', 'unknown') - }) - - result_metadata = { - 'status': result.get('status', ''), - 'result_summary': result.get('result', '')[:200] + '...' if len(result.get('result', '')) > 200 else result.get('result', ''), - 'error': result.get('error', ''), - 'resultLabel': result.get('resultLabel', ''), - 'documents_count': len(documents_metadata), - 'documents_metadata': documents_metadata, - 'actionId': result.get('actionId', ''), - 'actionMethod': result.get('actionMethod', ''), - 'actionName': result.get('actionName', '') - } - action_results_metadata.append(result_metadata) - logger.debug(f"TASK {i+1} ACTION RESULTS: {json.dumps(action_results_metadata, indent=2, ensure_ascii=False)}") - - # Phase 4: Review Task Completion - logger.info(f"--- PHASE 4: REVIEWING TASK {i+1} COMPLETION ---") - review_result = await self.reviewTaskCompletion(task_step, task_actions, action_results, workflow) - - # Update stats for task review - # Task review stats are already handled by the service center during AI calls - - # Create user-friendly review log with quality metrics - quality_metrics = review_result.get('quality_metrics', {}) - quality_score = quality_metrics.get('score', 0) - confidence = quality_metrics.get('confidence', 0) - - review_status = review_result.get('status', 'unknown') - if review_status == 'success': - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": f"🎯 Task completed successfully with quality score {quality_score} and confidence {confidence}", - "type": "success", - "status": "running", - "progress": progress + 20 - }) - elif review_status == 'retry': - # Extract improvement details - improvements = review_result.get('improvements', '') - reason = review_result.get('reason', '') - unmet_criteria = review_result.get('unmet_criteria', []) - - # Build detailed message - retry_details = [] - if reason: - retry_details.append(f"Reason: {reason}") - if improvements: - retry_details.append(f"Improvements: {improvements}") - if unmet_criteria: - retry_details.append(f"Missing criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}") - - retry_message = f"🔄 Task needs improvement" - if retry_details: - retry_message += f"\n{chr(10).join(retry_details)}" - - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": retry_message, - "type": "warning", - "status": "running", - "progress": progress + 15 - }) - else: - # Extract failure details - reason = review_result.get('reason', '') - unmet_criteria = review_result.get('unmet_criteria', []) - missing_outputs = review_result.get('missing_outputs', []) - - # Build detailed failure message - failure_details = [] - if reason: - failure_details.append(f"Reason: {reason}") - if unmet_criteria: - failure_details.append(f"Unmet criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}") - if missing_outputs: - failure_details.append(f"Missing outputs: {', '.join(missing_outputs[:3])}{'...' if len(missing_outputs) > 3 else ''}") - - failure_message = f"❌ Task failed" - if failure_details: - failure_message += f"\n{chr(10).join(failure_details)}" - - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": failure_message, - "type": "error", - "status": "running", - "progress": progress + 15 - }) - - # Log review result (with metadata only) - review_result_metadata = { - 'status': review_result.get('status', ''), - 'reason': review_result.get('reason', ''), - 'improvements': review_result.get('improvements', ''), - 'quality_score': review_result.get('quality_score', 0), - 'missing_outputs_count': len(review_result.get('missing_outputs', [])), - 'met_criteria_count': len(review_result.get('met_criteria', [])), - 'unmet_criteria_count': len(review_result.get('unmet_criteria', [])) - } - logger.debug(f"TASK {i+1} REVIEW RESULT: {json.dumps(review_result_metadata, indent=2, ensure_ascii=False)}") - - # Phase 5: Prepare Task Handover - logger.info(f"--- PHASE 5: PREPARING TASK {i+1} HANDOVER ---") - handover_data = await self.prepareTaskHandover(task_step, task_actions, review_result, workflow) - - # Log handover data (with metadata only) - handover_data_metadata = { - 'task_step_id': handover_data.get('task_step', {}).get('id', ''), - 'task_actions_count': len(handover_data.get('task_actions', [])), - 'review_status': handover_data.get('review_result', {}).get('status', ''), - 'next_task_ready': handover_data.get('next_task_ready', False), - 'available_results_count': len(handover_data.get('available_results', [])) - } - logger.debug(f"TASK {i+1} HANDOVER DATA: {json.dumps(handover_data_metadata, indent=2, ensure_ascii=False)}") - - # Check if task is successful or needs retry - review_status = review_result.get('status', 'unknown') - if review_status == 'success': - task_success = True - logger.info(f"Task {i+1} completed successfully") - elif review_status == 'retry': - # Store current results and feedback for next retry - previous_action_results = action_results.copy() - previous_review_feedback = review_result.get('improvements', '') - - retry_count += 1 - if retry_count > max_retries: - logger.error(f"Task {i+1} failed after {max_retries} retries") - task_success = False - else: - logger.info(f"Task {i+1} needs retry (attempt {retry_count}/{max_retries})") - logger.info(f"Previous feedback: {previous_review_feedback}") - # Add delay before retry - await asyncio.sleep(2) - continue - elif review_status == 'failed': - logger.error(f"Task {i+1} failed permanently") - task_success = False - break - else: - logger.warning(f"Unknown review status '{review_status}' for task {i+1}") - task_success = False - break - - except Exception as e: - logger.error(f"Error processing task {i+1} (attempt {retry_count + 1}): {str(e)}") - retry_count += 1 - if retry_count >= max_retries: - logger.error(f"Task {i+1} failed after {max_retries} retries due to exceptions") - task_success = False - break - else: - logger.info(f"Retrying task {i+1} after exception") - await asyncio.sleep(2) - continue + 'previous_results': previous_results, + 'task_description': task_description + } - # Collect results regardless of success/failure - workflow_results.append({ - 'task_step': task_step, - 'task_actions': task_actions, - 'action_results': action_results, - 'review_result': review_result, - 'handover_data': handover_data, - 'retry_count': retry_count, - 'task_success': task_success - }) + # Execute task with state management + task_result = await self.executeTaskWithStateManagement(task_step, workflow, task_context) - # Update previous results for next task if successful - if task_success and handover_data.get('next_task_ready', False): - previous_results = handover_data.get('available_results', []) + # Log task result + if task_result['status'] == 'success': + successful_actions = len(task_result.get('successful_actions', [])) + failed_actions = len(task_result.get('failed_actions', [])) + quality_metrics = task_result.get('quality_metrics', {}) + quality_score = quality_metrics.get('score', 0) + + self.chatInterface.createWorkflowLog({ + "workflowId": workflow.id, + "message": f"🎯 Task {i+1} completed successfully ({successful_actions} actions, quality score: {quality_score})", + "type": "success", + "status": "running", + "progress": progress + 20 + }) + + # Update previous results for next task + previous_results = task_result.get('successful_actions', []) + else: - # If task failed, stop workflow - logger.warning(f"Task {i+1} not successful, stopping workflow") - break - - # Final workflow summary - successful_tasks = sum(1 for result in workflow_results if result.get('task_success', False)) - total_tasks = len(task_plan['tasks']) - - # Final workflow stats are already handled by the service center during AI calls + # Task failed - provide detailed feedback and stop workflow + failed_actions = len(task_result.get('failed_actions', [])) + retry_count = task_result.get('retry_count', 0) + reason = task_result.get('reason', 'Unknown error') + + self.chatInterface.createWorkflowLog({ + "workflowId": workflow.id, + "message": f"❌ Task {i+1} failed after {retry_count} retries: {reason}", + "type": "error", + "status": "failed", + "progress": progress + 15 + }) + + # Generate detailed failure feedback + failure_feedback = self._generateTaskFailureFeedback(task_step, task_result, i+1, len(task_plan['tasks'])) + + # Stop workflow and return failure + return { + 'status': 'failed', + 'completed_tasks': i, + 'total_tasks': len(task_plan['tasks']), + 'failed_task': task_step, + 'failure_reason': reason, + 'feedback': failure_feedback, + 'execution_time': time.time() - start_time + } + + # Add task result to workflow results + workflow_results.append(task_result) # Calculate total processing time total_processing_time = time.time() - start_time - # Create final user-friendly completion log - if successful_tasks == total_tasks: - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": f"🎉 Workflow completed ({successful_tasks}/{total_tasks} tasks)", - "type": "success", - "status": "completed", - "progress": 100 - }) - elif successful_tasks > 0: - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": f"⚠️ Workflow partially completed ({successful_tasks}/{total_tasks} tasks)", - "type": "warning", - "status": "completed", - "progress": 100 - }) - else: - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": f"❌ Workflow failed ({successful_tasks}/{total_tasks} tasks)", - "type": "error", - "status": "failed", - "progress": 100 - }) - - # Create serializable workflow results (with metadata only) - workflow_results_serializable = [] - for result in workflow_results: - # Extract action results metadata - action_results_metadata = [] - for action_result in result.get('action_results', []): - documents_metadata = [] - for doc in action_result.get('documents', []): - if hasattr(doc, 'filename'): - documents_metadata.append({ - 'filename': doc.filename, - 'fileSize': getattr(doc, 'fileSize', 0), - 'mimeType': getattr(doc, 'mimeType', 'unknown') - }) - elif isinstance(doc, dict): - documents_metadata.append({ - 'filename': doc.get('filename', 'unknown'), - 'fileSize': doc.get('fileSize', 0), - 'mimeType': doc.get('mimeType', 'unknown') - }) - - action_result_metadata = { - 'status': action_result.get('status', ''), - 'result_summary': action_result.get('result', '')[:200] + '...' if len(action_result.get('result', '')) > 200 else action_result.get('result', ''), - 'error': action_result.get('error', ''), - 'resultLabel': action_result.get('resultLabel', ''), - 'documents_count': len(documents_metadata), - 'documents_metadata': documents_metadata, - 'actionId': action_result.get('actionId', ''), - 'actionMethod': action_result.get('actionMethod', ''), - 'actionName': action_result.get('actionName', ''), - 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none' - } - action_results_metadata.append(action_result_metadata) - - serializable_result = { - 'task_step': result['task_step'], - 'action_results': action_results_metadata, - 'review_result': result['review_result'], - 'handover_data': { - 'task_step_id': result['handover_data'].get('task_step', {}).get('id', ''), - 'task_actions_count': len(result['handover_data'].get('task_actions', [])), - 'review_status': result['handover_data'].get('review_result', {}).get('status', ''), - 'next_task_ready': result['handover_data'].get('next_task_ready', False), - 'available_results_count': len(result['handover_data'].get('available_results', [])) - }, - 'retry_count': result.get('retry_count', 0), - 'task_success': result.get('task_success', False) - } - # Convert task_actions to serializable format with metadata only - if 'task_actions' in result: - task_actions_serializable = [] - for action in result['task_actions']: - # Extract only metadata from parameters - parameters_metadata = {} - if hasattr(action, 'execParameters') and action.execParameters: - for key, value in action.execParameters.items(): - if key == 'documentList': - if isinstance(value, list): - parameters_metadata[key] = { - 'count': len(value), - 'labels': [str(v).split(':')[-1] if ':' in str(v) else str(v) for v in value] - } - else: - parameters_metadata[key] = str(value) - elif key == 'aiPrompt': - parameters_metadata[key] = str(value)[:100] + '...' if len(str(value)) > 100 else str(value) - else: - parameters_metadata[key] = str(value) - - action_dict = { - 'execMethod': action.execMethod, - 'execAction': action.execAction, - 'execParameters': parameters_metadata, - 'execResultLabel': action.execResultLabel, - 'status': action.status.value if hasattr(action.status, 'value') else str(action.status) - } - task_actions_serializable.append(action_dict) - serializable_result['task_actions'] = task_actions_serializable - workflow_results_serializable.append(serializable_result) + # Create final success log + self.chatInterface.createWorkflowLog({ + "workflowId": workflow.id, + "message": f"🎉 Workflow completed successfully ({len(workflow_results)}/{len(task_plan['tasks'])} tasks)", + "type": "success", + "status": "completed", + "progress": 100 + }) + # Create workflow summary workflow_summary = { - 'status': 'completed' if successful_tasks == total_tasks else 'partial', - 'successful_tasks': successful_tasks, - 'total_tasks': total_tasks, - 'workflow_results_count': len(workflow_results_serializable), + 'status': 'completed', + 'completed_tasks': len(workflow_results), + 'total_tasks': len(task_plan['tasks']), + 'execution_time': total_processing_time, 'final_results_count': len(previous_results) } - logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {successful_tasks}/{total_tasks} tasks successful ===") + logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {len(workflow_results)}/{len(task_plan['tasks'])} tasks successful ===") logger.debug(f"FINAL WORKFLOW SUMMARY: {json.dumps(workflow_summary, indent=2, ensure_ascii=False)}") return workflow_summary @@ -1983,3 +1795,534 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" 'error': str(e), 'phase': 'execution' } + + def _generateTaskFailureFeedback(self, task_step: Dict[str, Any], task_result: Dict[str, Any], task_number: int, total_tasks: int) -> str: + """Generate detailed feedback for task failure""" + + successful_actions = len(task_result.get('successful_actions', [])) + failed_actions = len(task_result.get('failed_actions', [])) + retry_count = task_result.get('retry_count', 0) + reason = task_result.get('reason', 'Unknown error') + + feedback = f""" +Workflow execution stopped due to task failure. + +PROGRESS: {task_number}/{total_tasks} tasks completed successfully +FAILED TASK: {task_step.get('description', 'Unknown')} + +FAILURE DETAILS: +- Retry attempts: {retry_count} +- Successful actions: {successful_actions} +- Failed actions: {failed_actions} +- Failure reason: {reason} + +The workflow cannot continue because this task is essential for subsequent steps. +Please review the task requirements and try again with different input or approach. +""" + + return feedback + + # ===== NEW STATE MANAGEMENT AND VALIDATION CLASSES ===== + + async def executeTaskWithStateManagement(self, task_step: Dict[str, Any], workflow: ChatWorkflow, context: Dict[str, Any]) -> Dict[str, Any]: + """Execute task with state management and action-level retry logic""" + try: + logger.info(f"Executing task with state management: {task_step.get('description', 'Unknown')}") + + # Initialize task state + state = TaskExecutionState(task_step) + + while state.canRetry(): + logger.info(f"Task execution attempt {state.retry_count + 1}/{state.max_retries + 1}") + + # Generate actions (first time or after regeneration) + if state.retry_count == 0: + actions = await self.defineTaskActions(task_step, workflow, context.get('previous_results', [])) + else: + # Regenerate actions with failure context + actions = await self._regenerateTaskActionsWithFailureContext(task_step, state, context) + + if not actions: + logger.warning(f"No actions defined for task, marking as failed") + return { + 'status': 'failed', + 'reason': 'No actions could be generated for task', + 'successful_actions': state.successful_actions, + 'failed_actions': state.failed_actions, + 'retry_count': state.retry_count + } + + # Execute actions with individual validation + for i, action in enumerate(actions): + logger.info(f"Executing action {i+1}/{len(actions)}: {action.execMethod}.{action.execAction}") + + # Execute action with validation + result = await self.executeActionWithValidation(action, workflow, context) + + if result['validation']['status'] == 'success': + state.addSuccessfulAction(result) + logger.info(f"Action {i+1} completed successfully") + + elif result['validation']['status'] == 'retry': + # Retry individual action + improvements = result['validation'].get('improvements', []) + retry_result = await self.retryActionWithImprovements(action, result, improvements) + + if retry_result['validation']['status'] == 'success': + state.addSuccessfulAction(retry_result) + logger.info(f"Action {i+1} retry successful") + else: + state.addFailedAction(retry_result) + logger.error(f"Action {i+1} retry failed") + # Action failed after retry - stop task execution and regenerate + break + + else: # fail + state.addFailedAction(result) + logger.error(f"Action {i+1} failed validation - stopping task execution") + # Action failed - stop task execution and regenerate + break + + # Validate task completion + task_validation = await self._validateTaskCompletion(state.successful_actions, task_step, workflow) + + if task_validation['status'] == 'success': + logger.info(f"Task completed successfully with {len(state.successful_actions)} successful actions") + return { + 'status': 'success', + 'successful_actions': state.successful_actions, + 'failed_actions': state.failed_actions, + 'retry_count': state.retry_count, + 'quality_metrics': task_validation.get('quality_metrics', {}) + } + + elif task_validation['status'] == 'retry': + state.improvements = task_validation.get('improvements', []) + state.incrementRetryCount() + logger.info(f"Task needs retry. Improvements: {state.improvements}") + # Preserve successful actions for next iteration + continue + + else: # fail + logger.error(f"Task failed permanently") + return { + 'status': 'failed', + 'reason': task_validation.get('reason', 'Task validation failed'), + 'successful_actions': state.successful_actions, + 'failed_actions': state.failed_actions, + 'retry_count': state.retry_count + } + + # Max retries reached + logger.error(f"Task failed after {state.max_retries} retries") + return { + 'status': 'failed', + 'reason': f'Task failed after {state.max_retries} retries', + 'successful_actions': state.successful_actions, + 'failed_actions': state.failed_actions, + 'retry_count': state.retry_count + } + + except Exception as e: + logger.error(f"Error in task execution with state management: {str(e)}") + return { + 'status': 'failed', + 'reason': f'Task execution error: {str(e)}', + 'successful_actions': [], + 'failed_actions': [], + 'retry_count': 0 + } + + async def _regenerateTaskActionsWithFailureContext(self, task_step: Dict[str, Any], state: TaskExecutionState, context: Dict[str, Any]) -> List[TaskAction]: + """Regenerate task actions with failure context and improvements""" + try: + logger.info(f"Regenerating actions for task with failure context") + + # Analyze failure patterns + failure_patterns = state.getFailurePatterns() + + # Create enhanced context with failure information + enhanced_context = { + 'task_step': task_step, + 'workflow': context.get('workflow'), + 'workflow_id': context.get('workflow_id'), + 'available_documents': context.get('available_documents', []), + 'previous_results': state.getAvailableResults(), + 'improvements': state.improvements, + 'retry_count': state.retry_count, + 'failure_patterns': failure_patterns, + 'failed_actions': state.failed_actions, + 'successful_actions': state.successful_actions, + 'is_regeneration': True + } + + # Generate new actions with failure avoidance + actions = await self.defineTaskActions(task_step, context.get('workflow'), state.getAvailableResults(), enhanced_context) + + logger.info(f"Regenerated {len(actions)} actions with failure context") + return actions + + except Exception as e: + logger.error(f"Error regenerating task actions: {str(e)}") + return [] + + async def _validateTaskCompletion(self, successful_actions: List[Dict[str, Any]], task_step: Dict[str, Any], workflow: ChatWorkflow) -> Dict[str, Any]: + """Validate if task is completed successfully""" + try: + logger.info(f"Validating task completion: {task_step.get('description', 'Unknown')}") + + # Create task result summary + task_result = { + 'task_step': task_step, + 'successful_actions': successful_actions, + 'successful_count': len(successful_actions), + 'expected_outputs': task_step.get('expected_outputs', []), + 'success_criteria': task_step.get('success_criteria', []) + } + + # Use AI to validate task completion + prompt = self._createTaskCompletionValidationPrompt(task_result, task_step) + response = await self._callAIWithCircuitBreaker(prompt, "task_completion_validation") + + # Parse validation result + validation = self._parseTaskValidationResponse(response) + + # Add quality metrics + validation['quality_metrics'] = self._calculateTaskQualityMetrics(task_step, successful_actions) + + logger.info(f"Task completion validation: {validation.get('status', 'unknown')}") + return validation + + except Exception as e: + logger.error(f"Error validating task completion: {str(e)}") + return { + 'status': 'success', # Default to success to avoid blocking + 'reason': f'Validation failed: {str(e)}', + 'quality_metrics': {'score': 5, 'confidence': 0.5} + } + + def _createTaskCompletionValidationPrompt(self, task_result: Dict[str, Any], task_step: Dict[str, Any]) -> str: + """Create prompt for task completion validation""" + + successful_actions = task_result['successful_actions'] + expected_outputs = task_result['expected_outputs'] + success_criteria = task_result['success_criteria'] + + # Summarize successful actions + action_summary = [] + for action in successful_actions: + action_summary.append({ + 'method': action.get('actionMethod', ''), + 'action': action.get('actionName', ''), + 'result_label': action.get('resultLabel', ''), + 'documents_count': len(action.get('documents', [])), + 'has_text_result': bool(action.get('result', '').strip()) + }) + + return f"""You are a task completion validator that evaluates if a task was successfully completed. + +TASK DETAILS: +- Description: {task_step.get('description', 'Unknown')} +- Expected Outputs: {', '.join(expected_outputs)} +- Success Criteria: {', '.join(success_criteria)} + +SUCCESSFUL ACTIONS ({len(successful_actions)}): +{json.dumps(action_summary, indent=2)} + +VALIDATION QUESTIONS: +1. Were all expected outputs produced? +2. Are the success criteria met? +3. Do the action results collectively accomplish the task goal? +4. Is the task ready for handover to the next task? + +REQUIRED JSON RESPONSE: +{{ + "status": "success|retry|fail", + "reason": "Detailed explanation of the validation decision", + "improvements": ["specific improvement 1", "specific improvement 2"], + "missing_outputs": ["output1", "output2"], + "met_criteria": ["criteria1", "criteria2"], + "unmet_criteria": ["criteria3", "criteria4"], + "quality_score": 1-10, + "confidence": 0.0-1.0 +}} + +NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" + + def _parseTaskValidationResponse(self, response: str) -> Dict[str, Any]: + """Parse task validation response""" + try: + # Extract JSON from response + json_start = response.find('{') + json_end = response.rfind('}') + 1 + if json_start == -1 or json_end == 0: + raise ValueError("No JSON found in task validation response") + + json_str = response[json_start:json_end] + validation = json.loads(json_str) + + # Validate structure + if 'status' not in validation: + raise ValueError("Task validation response missing 'status' field") + + # Set default values + validation.setdefault('reason', 'No reason provided') + validation.setdefault('improvements', []) + validation.setdefault('missing_outputs', []) + validation.setdefault('met_criteria', []) + validation.setdefault('unmet_criteria', []) + validation.setdefault('quality_score', 5) + validation.setdefault('confidence', 0.5) + + return validation + + except Exception as e: + logger.error(f"Error parsing task validation response: {str(e)}") + return { + 'status': 'success', + 'reason': f'Parse error: {str(e)}', + 'improvements': [], + 'missing_outputs': [], + 'met_criteria': [], + 'unmet_criteria': [], + 'quality_score': 5, + 'confidence': 0.5 + } + + # ===== FIX MISSING METHOD CALL ===== + + async def _executeSingleActionWithRetry(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]: + """Execute single action with retry mechanism - FIXED""" + try: + logger.info(f"Executing action with retry: {action.execMethod}.{action.execAction}") + + # Create context for validation + context = { + 'task_description': 'Action execution', + 'previous_results': [], + 'action_index': 0, + 'total_actions': 1 + } + + # Execute action with validation + result = await self.executeActionWithValidation(action, workflow, context) + + # If validation indicates retry, attempt retry + if result['validation']['status'] == 'retry': + improvements = result['validation'].get('improvements', []) + retry_result = await self.retryActionWithImprovements(action, result, improvements) + return retry_result + + return result + + except Exception as e: + logger.error(f"Error executing action with retry: {str(e)}") + action.setError(str(e)) + return { + "status": "failed", + "error": str(e), + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documents": [], + "validation": { + 'status': 'fail', + 'reason': f'Execution error: {str(e)}', + 'confidence': 0.0, + 'improvements': [], + 'quality_score': 0, + 'missing_elements': [], + 'suggested_retry_approach': '' + } + } + + # ===== REPLACE OLD executeTaskActions METHOD ===== + + async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]: + """Execute task actions with individual validation and retry logic""" + try: + logger.info(f"Executing {len(task_actions)} task actions with validation") + + results = [] + for i, action in enumerate(task_actions): + logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}") + + # Create context for validation + context = { + 'task_description': 'Action execution', + 'previous_results': [r.get('resultLabel', '') for r in results if r.get('resultLabel')], + 'action_index': i, + 'total_actions': len(task_actions) + } + + # Execute action with validation + result = await self.executeActionWithValidation(action, workflow, context) + results.append(result) + + # If action failed after validation, continue with next action + if result['validation']['status'] == 'fail': + logger.error(f"Action {i+1} failed validation, continuing with next action") + continue + + logger.info(f"Task action execution completed: {len(results)} results") + return results + + except Exception as e: + logger.error(f"Error executing task actions: {str(e)}") + return [] + + # ===== RESTRUCTURED ACTION EXECUTION WITH VALIDATION ===== + + async def executeActionWithValidation(self, action: TaskAction, workflow: ChatWorkflow, context: Dict[str, Any]) -> Dict[str, Any]: + """Execute single action with immediate validation""" + try: + logger.info(f"Executing action: {action.execMethod}.{action.execAction}") + + # Execute the action + result = await self._executeSingleAction(action, workflow) + + # Validate the result immediately + validator = ActionValidator(self) + validation = await validator.validateActionResult(result, action, context) + + # Add validation result to action result + result['validation'] = validation + + # Update action status based on validation + if validation['status'] == 'success': + action.setSuccess() + logger.info(f"Action {action.execMethod}.{action.execAction} validated successfully") + elif validation['status'] == 'retry': + action.status = TaskStatus.PENDING # Keep pending for retry + logger.warning(f"Action {action.execMethod}.{action.execAction} needs retry: {validation.get('reason', 'No reason')}") + else: # fail + action.setError(validation.get('reason', 'Action failed validation')) + logger.error(f"Action {action.execMethod}.{action.execAction} failed validation: {validation.get('reason', 'No reason')}") + + return result + + except Exception as e: + logger.error(f"Error executing action with validation: {str(e)}") + action.setError(str(e)) + return { + "status": "failed", + "error": str(e), + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documents": [], + "validation": { + 'status': 'fail', + 'reason': f'Execution error: {str(e)}', + 'confidence': 0.0, + 'improvements': [], + 'quality_score': 0, + 'missing_elements': [], + 'suggested_retry_approach': '' + } + } + + async def retryActionWithImprovements(self, action: TaskAction, previous_result: Dict[str, Any], improvements: List[str]) -> Dict[str, Any]: + """Retry action with improvements based on previous failure""" + try: + logger.info(f"Retrying action {action.execMethod}.{action.execAction} with improvements") + + # Apply improvements to action parameters + enhanced_action = self._enhanceActionWithImprovements(action, improvements, previous_result) + + # Execute enhanced action + result = await self._executeSingleAction(enhanced_action, self.workflow) + + # Validate the retry result + validator = ActionValidator(self) + context = { + 'task_description': 'Action retry', + 'previous_results': [], + 'retry_count': 1 + } + validation = await validator.validateActionResult(result, enhanced_action, context) + + # Add validation and retry metadata + result['validation'] = validation + result['is_retry'] = True + result['previous_error'] = previous_result.get('error', '') + result['applied_improvements'] = improvements + + # Update action status + if validation['status'] == 'success': + enhanced_action.setSuccess() + logger.info(f"Action retry successful: {enhanced_action.execMethod}.{enhanced_action.execAction}") + else: + enhanced_action.setError(validation.get('reason', 'Retry failed')) + logger.error(f"Action retry failed: {enhanced_action.execMethod}.{enhanced_action.execAction}") + + return result + + except Exception as e: + logger.error(f"Error retrying action: {str(e)}") + action.setError(str(e)) + return { + "status": "failed", + "error": str(e), + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documents": [], + "is_retry": True, + "validation": { + 'status': 'fail', + 'reason': f'Retry execution error: {str(e)}', + 'confidence': 0.0, + 'improvements': [], + 'quality_score': 0, + 'missing_elements': [], + 'suggested_retry_approach': '' + } + } + + def _enhanceActionWithImprovements(self, action: TaskAction, improvements: List[str], previous_result: Dict[str, Any]) -> TaskAction: + """Enhance action parameters based on improvements and previous failure""" + enhanced_action = TaskAction( + id=action.id, + execMethod=action.execMethod, + execAction=action.execAction, + execParameters=action.execParameters.copy(), # Copy to avoid modifying original + execResultLabel=action.execResultLabel, + status=action.status + ) + + # Apply improvements based on failure patterns + for improvement in improvements: + if "incomplete_analysis" in improvement.lower(): + # Enhance AI prompt for more comprehensive analysis + current_prompt = enhanced_action.execParameters.get('aiPrompt', '') + enhanced_prompt = current_prompt + "\n\nIMPORTANT: Provide comprehensive, detailed analysis covering all aspects. Be thorough and include all relevant information." + enhanced_action.execParameters['aiPrompt'] = enhanced_prompt + + elif "missing_content" in improvement.lower(): + # Add content validation to prompt + current_prompt = enhanced_action.execParameters.get('aiPrompt', '') + enhanced_prompt = current_prompt + "\n\nVALIDATION: Ensure all content is extracted and no information is missed. Provide complete output." + enhanced_action.execParameters['aiPrompt'] = enhanced_prompt + + elif "wrong_format" in improvement.lower(): + # Add format specification + current_prompt = enhanced_action.execParameters.get('aiPrompt', '') + enhanced_prompt = current_prompt + "\n\nFORMAT: Provide output in structured, well-organized format with clear sections and proper formatting." + enhanced_action.execParameters['aiPrompt'] = enhanced_prompt + + elif "timeout" in improvement.lower(): + # Add timeout handling + current_prompt = enhanced_action.execParameters.get('aiPrompt', '') + enhanced_prompt = current_prompt + "\n\nEFFICIENCY: Provide concise but complete analysis. Focus on key information and avoid unnecessary details." + enhanced_action.execParameters['aiPrompt'] = enhanced_prompt + + # Apply specific improvements from validation + validation = previous_result.get('validation', {}) + suggested_approach = validation.get('suggested_retry_approach', '') + if suggested_approach: + current_prompt = enhanced_action.execParameters.get('aiPrompt', '') + enhanced_prompt = current_prompt + f"\n\nRETRY APPROACH: {suggested_approach}" + enhanced_action.execParameters['aiPrompt'] = enhanced_prompt + + return enhanced_action diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py index ad06111e..e35f064a 100644 --- a/modules/workflow/managerWorkflow.py +++ b/modules/workflow/managerWorkflow.py @@ -42,7 +42,7 @@ class WorkflowManager: message = await self._sendFirstMessage(userInput, workflow) # Execute unified workflow - workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput.prompt, workflow) + workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput, workflow) # Process workflow results await self._processWorkflowResults(workflow, workflow_result, message)