From 5687b1aec77b641740473ccdb88a4ffbb4ae34a2 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 10 Jul 2025 23:58:27 +0200
Subject: [PATCH] system scope 3
---
modules/interfaces/interfaceChatObjects.py | 61 +-
modules/routes/routeWorkflows.py | 7 +-
modules/workflow/managerChat.py | 1385 ++++++++++++--------
modules/workflow/managerWorkflow.py | 2 +-
4 files changed, 911 insertions(+), 544 deletions(-)
diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py
index c29fd70e..4132027d 100644
--- a/modules/interfaces/interfaceChatObjects.py
+++ b/modules/interfaces/interfaceChatObjects.py
@@ -6,6 +6,7 @@ Uses the JSON connector for data access with added language support.
import os
import logging
import uuid
+import time
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional, Union
@@ -167,7 +168,12 @@ class ChatObjects:
startedAt=workflow.get("startedAt", self._getCurrentTimestamp()),
logs=[ChatLog(**log) for log in workflow.get("logs", [])],
messages=[ChatMessage(**msg) for msg in workflow.get("messages", [])],
- stats=ChatStat(**workflow.get("dataStats", {})) if workflow.get("dataStats") else None,
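+                # Default to a zeroed ChatStat instead of None so downstream stat
+                # updates never need a null check. The same default is repeated below
+                # and in routeWorkflows.py; a small shared helper (for example a
+                # hypothetical _defaultChatStat()) would keep the three copies in sync.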
+ stats=ChatStat(**workflow.get("dataStats", {})) if workflow.get("dataStats") else ChatStat(
+ bytesSent=0,
+ bytesReceived=0,
+ tokenCount=0,
+ processingTime=0
+ ),
mandateId=workflow.get("mandateId", self.currentUser.mandateId)
)
except Exception as e:
@@ -230,7 +236,12 @@ class ChatObjects:
startedAt=updated.get("startedAt", workflow.startedAt),
logs=[ChatLog(**log) for log in updated.get("logs", workflow.logs)],
messages=[ChatMessage(**msg) for msg in updated.get("messages", workflow.messages)],
- stats=ChatStat(**updated.get("dataStats", workflow.stats.dict() if workflow.stats else {})) if updated.get("dataStats") or workflow.stats else None,
+ stats=ChatStat(**updated.get("dataStats", workflow.stats.dict() if workflow.stats else {})) if updated.get("dataStats") or workflow.stats else ChatStat(
+ bytesSent=0,
+ bytesReceived=0,
+ tokenCount=0,
+ processingTime=0
+ ),
mandateId=updated.get("mandateId", workflow.mandateId)
)
@@ -327,11 +338,7 @@ class ChatObjects:
publishedAt=createdMessage.get("publishedAt", self._getCurrentTimestamp()),
stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None
)
-
- # Update workflow stats for message creation (estimate bytes for message)
- message_size = len(createdMessage.get("message", "")) + sum(len(doc.get("filename", "")) for doc in createdMessage.get("documents", []))
- self.updateWorkflowStats(workflowId, bytesSent=0, bytesReceived=message_size)
-
+
except Exception as e:
logger.error(f"Error creating workflow message: {str(e)}")
return None
@@ -553,22 +560,31 @@ class ChatObjects:
logger.error(f"No permission to update workflow {workflowId} stats")
return False
- # Get current stats
- currentStats = workflow.stats.dict() if workflow.stats else {
- "bytesSent": 0,
- "bytesReceived": 0,
- "tokenCount": 0,
- "processingTime": 0
- }
+ # Get current stats - ensure we have proper defaults
+ if workflow.stats:
+ currentStats = workflow.stats.dict()
+ # Ensure all required fields exist
+ currentStats.setdefault("bytesSent", 0)
+ currentStats.setdefault("bytesReceived", 0)
+ currentStats.setdefault("tokenCount", 0)
+ currentStats.setdefault("processingTime", 0)
+ else:
+ currentStats = {
+ "bytesSent": 0,
+ "bytesReceived": 0,
+ "tokenCount": 0,
+ "processingTime": 0
+ }
             # Calculate processing time from workflow start
             workflow_start = datetime.fromisoformat(workflow.startedAt.replace('Z', '+00:00'))
             current_time = datetime.now(UTC)
             processing_time = (current_time - workflow_start).total_seconds()
- # Update stats with incremental values
- currentStats["bytesSent"] = currentStats.get("bytesSent", 0) + bytesSent
- currentStats["bytesReceived"] = currentStats.get("bytesReceived", 0) + bytesReceived
+ # Update stats with incremental values - ensure no None values
+ current_bytes_sent = currentStats.get("bytesSent", 0) or 0
+ current_bytes_received = currentStats.get("bytesReceived", 0) or 0
+
+ currentStats["bytesSent"] = current_bytes_sent + bytesSent
+ currentStats["bytesReceived"] = current_bytes_received + bytesReceived
currentStats["tokenCount"] = currentStats["bytesSent"] + currentStats["bytesReceived"]
currentStats["processingTime"] = processing_time
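+
+            # Worked example (illustrative): starting from bytesSent=10 and
+            # bytesReceived=20, a call with bytesSent=5 yields bytesSent=15,
+            # bytesReceived=20 and tokenCount=35. Note that tokenCount is
+            # currently a byte-sum proxy, not a real LLM token count.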
@@ -840,6 +856,9 @@ class ChatObjects:
# Create workflow
workflow = self.createWorkflow(workflowData)
+        # Initialize stats for the new workflow (guard against createWorkflow returning None)
+        if workflow:
+            self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)
+
# Remove the 'Workflow started' log entry
# Start workflow processing
diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py
index e01cfeb7..6b14379d 100644
--- a/modules/routes/routeWorkflows.py
+++ b/modules/routes/routeWorkflows.py
@@ -71,7 +71,12 @@ async def get_workflows(
startedAt=workflow_data.get("startedAt", appInterface._getCurrentTimestamp()),
logs=[ChatLog(**log) for log in workflow_data.get("logs", [])],
messages=[ChatMessage(**msg) for msg in workflow_data.get("messages", [])],
- stats=ChatStat(**workflow_data.get("dataStats", {})) if workflow_data.get("dataStats") else None,
+ stats=ChatStat(**workflow_data.get("dataStats", {})) if workflow_data.get("dataStats") else ChatStat(
+ bytesSent=0,
+ bytesReceived=0,
+ tokenCount=0,
+ processingTime=0
+ ),
mandateId=workflow_data.get("mandateId", currentUser.mandateId or "")
)
workflows.append(workflow)
diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py
index 470653d8..7b77aa6e 100644
--- a/modules/workflow/managerChat.py
+++ b/modules/workflow/managerChat.py
@@ -8,13 +8,221 @@ from datetime import datetime, UTC
from modules.interfaces.interfaceAppModel import User
from modules.interfaces.interfaceChatModel import (
- TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow
+ TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult,
+ ExtractedContent, ContentItem, ContentMetadata, DocumentExchange
)
from modules.workflow.serviceCenter import ServiceCenter
from modules.interfaces.interfaceChatObjects import ChatObjects
logger = logging.getLogger(__name__)
+# ===== STATE MANAGEMENT AND VALIDATION CLASSES =====
+
+class TaskExecutionState:
+ """Manages state during task execution with retry logic"""
+ def __init__(self, task_step: dict):
+ self.task_step = task_step
+ self.successful_actions = [] # Preserved across retries
+ self.failed_actions = [] # For analysis
+ self.current_action_index = 0
+ self.retry_count = 0
+ self.improvements = []
+ self.partial_results = {} # Store intermediate results
+        self.max_retries = 3
+
+    def addSuccessfulAction(self, action_result: dict):
+        self.successful_actions.append(action_result)
+        if action_result.get('resultLabel'):
+            self.partial_results[action_result['resultLabel']] = action_result
+
+    def addFailedAction(self, action_result: dict):
+        self.failed_actions.append(action_result)
+
+    def getAvailableResults(self) -> list:
+        return [result.get('resultLabel', '') for result in self.successful_actions if result.get('resultLabel')]
+
+    def shouldRetryTask(self) -> bool:
+        return len(self.successful_actions) > 0 and len(self.failed_actions) > 0
+
+    def canRetry(self) -> bool:
+        return self.retry_count < self.max_retries
+
+    def incrementRetryCount(self):
+        self.retry_count += 1
+
+    def getFailurePatterns(self) -> list:
+ patterns = []
+ for action in self.failed_actions:
+ error = action.get('error', '').lower()
+ if "timeout" in error:
+ patterns.append("timeout_issues")
+ elif "document_not_found" in error or "file not found" in error:
+ patterns.append("document_reference_issues")
+ elif "empty_result" in error or "no content" in error:
+ patterns.append("content_extraction_issues")
+ elif "invalid_format" in error or "wrong format" in error:
+ patterns.append("format_issues")
+ elif "permission" in error or "access denied" in error:
+ patterns.append("permission_issues")
+ return list(set(patterns))
+
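+# Illustrative use of TaskExecutionState (hypothetical values, documentation only):
+#
+#     state = TaskExecutionState({"id": "task_1"})
+#     state.addSuccessfulAction({"resultLabel": "task1:report:analysis"})
+#     state.addFailedAction({"error": "timeout while extracting content"})
+#     state.getAvailableResults()   # -> ["task1:report:analysis"]
+#     state.getFailurePatterns()    # -> ["timeout_issues"]
+#     state.canRetry()              # -> True (0 of 3 retries used)
+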
+class ActionValidator:
+ """Generic AI-based action result validation"""
+ def __init__(self, chat_manager):
+ self.chat_manager = chat_manager
+
+ async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: dict) -> dict:
+ """Generic action validation using AI"""
+ try:
+ # Create generic validation prompt
+ prompt = self._createGenericValidationPrompt(action_result, action, context)
+ response = await self.chat_manager._callAIWithCircuitBreaker(prompt, "action_validation")
+ validation = self._parseValidationResponse(response)
+
+ # Add action metadata
+ validation['action_id'] = action.id
+ validation['action_method'] = action.execMethod
+ validation['action_name'] = action.execAction
+ validation['result_label'] = action.execResultLabel
+
+ return validation
+ except Exception as e:
+ logger.error(f"Error validating action result: {str(e)}")
+ return {
+                'status': 'success',  # default to success so a validator failure does not block execution
+ 'reason': f'Validation failed: {str(e)}',
+ 'confidence': 0.5,
+ 'improvements': [],
+ 'action_id': action.id,
+ 'action_method': action.execMethod,
+ 'action_name': action.execAction,
+ 'result_label': action.execResultLabel
+ }
+
+ def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: dict) -> str:
+ """Create a validation prompt focused on result file delivery"""
+ # Extract data from ActionResult model
+ success = action_result.success
+ result_data = action_result.data
+ error = action_result.error
+ validation_messages = action_result.validation
+
+ # Extract result text from data
+ result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data)
+
+ # Get documents from ActionResult data
+ documents = result_data.get("documents", []) if isinstance(result_data, dict) else []
+ doc_count = len(documents)
+
+ # Extract expected result format from action parameters
+ expected_result_label = action.execResultLabel
+ expected_format = action.execParameters.get('outputFormat', 'unknown')
+
+ # Analyze delivered documents and content
+ delivered_files = []
+ content_items = []
+
+ # Check for ChatDocument objects
+ for doc in documents:
+ if hasattr(doc, 'filename'):
+ delivered_files.append(doc.filename)
+ elif isinstance(doc, dict) and 'filename' in doc:
+ delivered_files.append(doc['filename'])
+ else:
+ delivered_files.append(f"document_{len(delivered_files)}")
+
+ # Check for ExtractedContent in result data
+ if isinstance(result_data, dict):
+ if 'extractedContent' in result_data:
+ extracted_content = result_data['extractedContent']
+ if hasattr(extracted_content, 'contents'):
+ content_items = extracted_content.contents
+ elif 'contents' in result_data:
+ content_items = result_data['contents']
+
+ # Analyze content items
+ content_summary = []
+ for item in content_items:
+ if hasattr(item, 'label') and hasattr(item, 'metadata'):
+ content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}")
+
+ return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format.
+
+ACTION DETAILS:
+- Method: {action.execMethod}
+- Action: {action.execAction}
+- Expected Result Label: {expected_result_label}
+- Expected Format: {expected_format}
+- Parameters: {json.dumps(action.execParameters, indent=2)}
+
+RESULT TO VALIDATE:
+- Success: {success}
+- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''}
+- Error: {error}
+- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'}
+- Documents Produced: {doc_count}
+- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'}
+- Content Items: {', '.join(content_summary) if content_summary else 'None'}
+
+CRITICAL VALIDATION CRITERIA:
+1. **File Delivery**: Did the action deliver the promised result file(s)?
+2. **Format Compliance**: Are the delivered files in the promised format?
+3. **Result Label Match**: Does the result match the expected result label?
+4. **Content Quality**: Is the content of the delivered files usable and complete?
+5. **Content Processing**: If content extraction was expected, was it performed correctly?
+
+CONTEXT:
+- Task Description: {context.get('task_description', 'Unknown')}
+- Previous Results: {', '.join(context.get('previous_results', []))}
+
+VALIDATION INSTRUCTIONS:
+1. Check if the expected result label "{expected_result_label}" is present in the result
+2. Verify that files were delivered when expected
+3. Validate that the delivered files match the expected format "{expected_format}"
+4. Assess if the content is complete and usable
+5. Check if content extraction was performed when expected
+6. Determine if retry would improve file delivery or format compliance
+
+REQUIRED JSON RESPONSE:
+{{
+ "status": "success|retry|fail",
+ "reason": "Detailed explanation focusing on file delivery and format compliance",
+ "confidence": 0.0-1.0,
+ "improvements": ["specific file delivery improvements", "format compliance fixes"],
+ "quality_score": 1-10,
+ "missing_elements": ["missing files", "format issues"],
+ "suggested_retry_approach": "Specific approach for retry if status is retry"
+}}
+
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
+
+ def _parseValidationResponse(self, response: str) -> dict:
+ """Parse the AI validation response"""
+ try:
+ json_start = response.find('{')
+ json_end = response.rfind('}') + 1
+ if json_start == -1 or json_end == 0:
+ raise ValueError("No JSON found in validation response")
+
+ json_str = response[json_start:json_end]
+ validation = json.loads(json_str)
+
+ if 'status' not in validation:
+ raise ValueError("Validation response missing 'status' field")
+
+ # Set defaults for optional fields
+ validation.setdefault('confidence', 0.5)
+ validation.setdefault('improvements', [])
+ validation.setdefault('quality_score', 5)
+ validation.setdefault('missing_elements', [])
+ validation.setdefault('suggested_retry_approach', '')
+
+ return validation
+ except Exception as e:
+ logger.error(f"Error parsing validation response: {str(e)}")
+ return {
+ 'status': 'success',
+ 'reason': f'Parse error: {str(e)}',
+ 'confidence': 0.5,
+ 'improvements': [],
+ 'quality_score': 5,
+ 'missing_elements': [],
+ 'suggested_retry_approach': ''
+ }
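+
+    # A successfully parsed validation payload looks like this (illustrative values):
+    #   {"status": "retry", "reason": "Expected PDF, got plain text",
+    #    "confidence": 0.8, "improvements": ["set outputFormat to pdf"],
+    #    "quality_score": 4, "missing_elements": ["report.pdf"],
+    #    "suggested_retry_approach": "re-run with an explicit outputFormat"}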
+
class ChatManager:
"""Chat manager with improved AI integration and method handling"""
@@ -62,21 +270,15 @@ class ChatManager:
task_plan = self._parseTaskPlanResponse(response)
if not self._validateTaskPlan(task_plan):
- logger.warning("Generated task plan failed validation, using fallback")
- task_plan = self._createFallbackTaskPlan({
- 'user_request': userInput,
- 'available_documents': self._getAvailableDocuments(workflow)
- })
+ logger.error("Generated task plan failed validation")
+ raise Exception("AI-generated task plan failed validation - AI is required for task planning")
logger.info(f"High-level task planning completed: {len(task_plan.get('tasks', []))} tasks")
return task_plan
except Exception as e:
logger.error(f"Error in high-level task planning: {str(e)}")
- return self._createFallbackTaskPlan({
- 'user_request': userInput,
- 'available_documents': self._getAvailableDocuments(workflow)
- })
+ raise Exception(f"AI is required for task planning but failed: {str(e)}")
# Phase 2: Task Definition and Action Generation
async def defineTaskActions(self, task_step: Dict[str, Any], workflow: ChatWorkflow, previous_results: List[str] = None,
@@ -135,7 +337,7 @@ class ChatManager:
# Phase 3: Action Execution
async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
- """Phase 3: Execute all actions for a task"""
+ """Phase 3: Execute all actions for a task with retry mechanism"""
try:
logger.info(f"Executing {len(task_actions)} task actions")
@@ -143,14 +345,15 @@ class ChatManager:
for i, action in enumerate(task_actions):
logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")
- # Execute single action
+ # Execute single action with retry mechanism
result = await self._executeSingleAction(action, workflow)
results.append(result)
- # If action failed, stop execution
+ # If action failed after all retries, continue with next action instead of stopping
if result.get('status') == 'failed':
- logger.error(f"Action {i+1} failed, stopping task execution")
- break
+ logger.error(f"Action {i+1} failed after retries, continuing with next action")
+ # Don't break - continue with remaining actions
+ continue
logger.info(f"Task action execution completed: {len(results)} results")
return results
@@ -397,36 +600,7 @@ class ChatManager:
logger.error(f"Error validating task plan: {str(e)}")
return False
- def _createFallbackTaskPlan(self, context: Dict[str, Any]) -> Dict[str, Any]:
- """Create a fallback task plan when AI generation fails"""
- logger.warning("Creating fallback task plan due to AI generation failure")
-
- return {
- "overview": "Fallback task plan - comprehensive document analysis and processing",
- "tasks": [
- {
- "id": "task_1",
- "description": "Extract and analyze all provided documents comprehensively",
- "dependencies": [],
- "expected_outputs": ["comprehensive_document_analysis"],
- "success_criteria": ["All documents processed and analyzed"],
- "required_documents": context.get('available_documents', []),
- "estimated_complexity": "medium",
- "ai_prompt": "Extract and analyze all content from the provided documents. Identify key information, patterns, and insights that are relevant to the user's request. Provide a comprehensive analysis that can be used for further processing."
- },
- {
- "id": "task_2",
- "description": "Generate comprehensive output based on analysis",
- "dependencies": ["task_1"],
- "expected_outputs": ["final_output"],
- "success_criteria": ["Output generated and formatted appropriately"],
- "required_documents": ["comprehensive_document_analysis"],
- "estimated_complexity": "low",
- "ai_prompt": "Based on the comprehensive document analysis, generate the final output that addresses the user's original request. Format the output appropriately and ensure it meets the user's requirements."
- }
- ]
- }
-
+
def _validateActions(self, actions: List[Dict[str, Any]], context: Dict[str, Any]) -> bool:
"""Validate generated actions"""
try:
@@ -473,40 +647,7 @@ class ChatManager:
logger.error(f"Error validating actions: {str(e)}")
return False
- def _createFallbackActions(self, task_step: Dict[str, Any], context: Dict[str, Any]) -> List[Dict[str, Any]]:
- """Create fallback actions when AI generation fails with retry context awareness"""
- logger.warning("Creating fallback actions due to AI generation failure")
-
- # Get available documents
- available_docs = context.get('available_documents', [])
- retry_count = context.get('retry_count', 0)
- previous_action_results = context.get('previous_action_results', [])
-
- if not available_docs:
- logger.warning("No available documents for fallback actions")
- return []
-
- # Create fallback actions for document analysis
- fallback_actions = []
- for i, doc in enumerate(available_docs):
- # Enhanced AI prompt for retry scenarios
- ai_prompt = "Fallback document analysis for " + doc
- if retry_count > 0 and previous_action_results:
- ai_prompt += f". Previous attempt failed - ensure comprehensive extraction with detailed analysis."
-
- fallback_actions.append({
- "method": "document",
- "action": "analyze",
- "parameters": {
- "documentList": ["task1_previous_results"],
- "aiPrompt": ai_prompt
- },
- "resultLabel": f"task1_fallback_retry{retry_count}:" + doc + ":analysis",
- "description": f"Fallback document analysis for {doc} (attempt {retry_count + 1})"
- })
-
- logger.info(f"Created {len(fallback_actions)} fallback actions")
- return fallback_actions
+
# ===== Prompt Creation Methods =====
@@ -834,18 +975,18 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# Validate actions
if not self._validateActions(actions, context):
- logger.warning("Generated actions failed validation, using fallback actions")
- actions = self._createFallbackActions(context['task_step'], context)
+ logger.error("Generated actions failed validation")
+ raise Exception("AI-generated actions failed validation - AI is required for action generation")
logger.info(f"Generated {len(actions)} actions for task step")
return actions
except Exception as e:
logger.error(f"Error generating actions for task step: {str(e)}")
- return self._createFallbackActions(context['task_step'], context)
+ raise Exception(f"AI is required for action generation but failed: {str(e)}")
- async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]:
- """Execute a single action and return result with enhanced document processing"""
+ async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> ActionResult:
+ """Execute a single action and return ActionResult with enhanced document processing"""
try:
# Execute the actual method action using the service center
result = await self.service.executeAction(
@@ -925,28 +1066,46 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'document': doc
})
- return {
- "status": "completed" if result.success else "failed",
- "result": result.data.get("result", ""),
- "error": result.error or "",
- "resultLabel": result_label,
- "documents": processed_documents,
- "actionId": action.id,
- "actionMethod": action.execMethod,
- "actionName": action.execAction
- }
+ # Create ActionResult with processed data
+ return ActionResult(
+ success=result.success,
+ data={
+ "result": result.data.get("result", ""),
+ "documents": processed_documents,
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction,
+ "resultLabel": result_label
+ },
+ metadata={
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction,
+ "resultLabel": result_label
+ },
+ validation=[],
+ error=result.error or ""
+ )
except Exception as e:
logger.error(f"Error executing single action: {str(e)}")
action.setError(str(e))
- return {
- "status": "failed",
- "error": str(e),
- "actionId": action.id,
- "actionMethod": action.execMethod,
- "actionName": action.execAction,
- "documents": []
- }
+ return ActionResult(
+ success=False,
+ data={
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction,
+ "documents": []
+ },
+ metadata={
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction
+ },
+ validation=[],
+ error=str(e)
+ )
async def _createActionMessage(self, action: TaskAction, result: Any, workflow: ChatWorkflow, result_label: str = None) -> None:
"""Create and store a message for the action result in the workflow with enhanced document processing"""
@@ -1457,8 +1616,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== UNIFIED WORKFLOW EXECUTION =====
- async def executeUnifiedWorkflow(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]:
- """Execute a unified workflow with all phases"""
+ async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> Dict[str, Any]:
+ """Execute a unified workflow with state management and action-level validation"""
try:
logger.info(f"Starting unified workflow execution for workflow {workflow.id}")
start_time = time.time()
@@ -1473,7 +1632,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
logger.info(f"Processed {len(documents)} documents")
# Calculate and update user input stats
- user_input_size = self.service.calculateUserInputSize(userInput)
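+        # userInput is now a UserInputRequest model; its .prompt field is
+        # assumed to carry the raw user text used for size accounting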
+ user_input_size = self.service.calculateUserInputSize(userInput.prompt)
self.service.updateWorkflowStats(eventLabel="userinput", bytesReceived=user_input_size)
# Phase 1: High-Level Task Planning
@@ -1515,10 +1674,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
task_plan_log['tasks'].append(task_log)
logger.debug(f"TASK PLAN CREATED: {json.dumps(task_plan_log, indent=2, ensure_ascii=False)}")
- # Execute each task step with retry logic
+ # Execute each task step with state management
workflow_results = []
previous_results = []
- max_retries = 3 # Maximum retries per task
for i, task_step in enumerate(task_plan['tasks']):
task_description = task_step.get('description', 'Unknown')
@@ -1535,435 +1693,89 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
"agentName": "System"
})
- # Retry loop for each task
- task_success = False
- retry_count = 0
- task_actions = []
- action_results = []
- review_result = {}
- handover_data = {}
- previous_action_results = [] # Track previous action results for retry context
- previous_review_feedback = "" # Track previous review feedback for retry context
-
- while not task_success and retry_count < max_retries:
- if retry_count > 0:
- logger.info(f"--- RETRY {retry_count}/{max_retries} FOR TASK {i+1} ---")
- # Create user-friendly retry log
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": f"Retrying task {i+1} (attempt {retry_count}/{max_retries})",
- "type": "warning",
- "status": "running",
- "progress": progress,
- "agentName": "System"
- })
-
- try:
- # Phase 2: Define Task Actions
- logger.info(f"--- PHASE 2: DEFINING ACTIONS FOR TASK {i+1} ---")
-
- # Enhanced context for retries - include previous results and feedback
- enhanced_previous_results = previous_results.copy() if previous_results else []
- if retry_count > 0 and previous_action_results:
- # Add previous action results to context
- for result in previous_action_results:
- if result.get('resultLabel'):
- enhanced_previous_results.append(result.get('resultLabel'))
-
- # Create enhanced context with retry information
- context = {
- 'task_step': task_step,
+ # Create context for task execution
+ task_context = {
'workflow': workflow,
'workflow_id': workflow.id,
'available_documents': self._getAvailableDocuments(workflow),
- 'previous_results': enhanced_previous_results,
- 'improvements': previous_review_feedback if retry_count > 0 else None,
- 'retry_count': retry_count,
- 'previous_action_results': previous_action_results if retry_count > 0 else [],
- 'previous_review_result': review_result if retry_count > 0 else None
- }
-
- task_actions = await self.defineTaskActions(task_step, workflow, enhanced_previous_results, context)
- if not task_actions:
- logger.warning(f"No actions defined for task {i+1}, skipping")
- break
-
- # Log task actions (convert to serializable format with metadata only)
- task_actions_serializable = []
- for action in task_actions:
- # Extract only metadata from parameters, not document content
- parameters_metadata = {}
- if hasattr(action, 'execParameters') and action.execParameters:
- for key, value in action.execParameters.items():
- if key == 'documentList':
- # Log document list as count and labels only
- if isinstance(value, list):
- parameters_metadata[key] = {
- 'count': len(value),
- 'labels': [str(v).split(':')[-1] if ':' in str(v) else str(v) for v in value]
- }
- else:
- parameters_metadata[key] = str(value)
- elif key == 'aiPrompt':
- # Truncate AI prompts to avoid logging large content
- parameters_metadata[key] = str(value)[:100] + '...' if len(str(value)) > 100 else str(value)
- else:
- parameters_metadata[key] = str(value)
-
- action_dict = {
- 'execMethod': action.execMethod,
- 'execAction': action.execAction,
- 'execParameters': parameters_metadata,
- 'execResultLabel': action.execResultLabel,
- 'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
- }
- task_actions_serializable.append(action_dict)
- logger.debug(f"TASK {i+1} ACTIONS CREATED: {json.dumps(task_actions_serializable, indent=2, ensure_ascii=False)}")
-
- # Phase 3: Execute Task Actions
- logger.info(f"--- PHASE 3: EXECUTING TASK {i+1} ACTIONS ---")
- action_results = await self.executeTaskActions(task_actions, workflow)
-
- # Update stats for action execution
- # Action stats are already handled by the service center during AI calls
-
- # Create user-friendly action completion log with quality metrics
- successful_actions = sum(1 for result in action_results if result.get('status') == 'completed')
- total_actions = len(action_results)
-
- if total_actions > 0:
- if successful_actions == total_actions:
- log_type = "success"
- elif successful_actions == 0:
- log_type = "error"
- else:
- log_type = "warning"
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": f"Successful actions: {successful_actions}/{total_actions}",
- "type": log_type,
- "status": "running",
- "progress": progress + 10
- })
-
- # Log action results (with metadata only)
- action_results_metadata = []
- for result in action_results:
- # Extract document metadata only
- documents_metadata = []
- for doc in result.get('documents', []):
- if hasattr(doc, 'filename'):
- documents_metadata.append({
- 'filename': doc.filename,
- 'fileSize': getattr(doc, 'fileSize', 0),
- 'mimeType': getattr(doc, 'mimeType', 'unknown')
- })
- elif isinstance(doc, dict):
- documents_metadata.append({
- 'filename': doc.get('filename', 'unknown'),
- 'fileSize': doc.get('fileSize', 0),
- 'mimeType': doc.get('mimeType', 'unknown')
- })
-
- result_metadata = {
- 'status': result.get('status', ''),
- 'result_summary': result.get('result', '')[:200] + '...' if len(result.get('result', '')) > 200 else result.get('result', ''),
- 'error': result.get('error', ''),
- 'resultLabel': result.get('resultLabel', ''),
- 'documents_count': len(documents_metadata),
- 'documents_metadata': documents_metadata,
- 'actionId': result.get('actionId', ''),
- 'actionMethod': result.get('actionMethod', ''),
- 'actionName': result.get('actionName', '')
- }
- action_results_metadata.append(result_metadata)
- logger.debug(f"TASK {i+1} ACTION RESULTS: {json.dumps(action_results_metadata, indent=2, ensure_ascii=False)}")
-
- # Phase 4: Review Task Completion
- logger.info(f"--- PHASE 4: REVIEWING TASK {i+1} COMPLETION ---")
- review_result = await self.reviewTaskCompletion(task_step, task_actions, action_results, workflow)
-
- # Update stats for task review
- # Task review stats are already handled by the service center during AI calls
-
- # Create user-friendly review log with quality metrics
- quality_metrics = review_result.get('quality_metrics', {})
- quality_score = quality_metrics.get('score', 0)
- confidence = quality_metrics.get('confidence', 0)
-
- review_status = review_result.get('status', 'unknown')
- if review_status == 'success':
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": f"🎯 Task completed successfully with quality score {quality_score} and confidence {confidence}",
- "type": "success",
- "status": "running",
- "progress": progress + 20
- })
- elif review_status == 'retry':
- # Extract improvement details
- improvements = review_result.get('improvements', '')
- reason = review_result.get('reason', '')
- unmet_criteria = review_result.get('unmet_criteria', [])
-
- # Build detailed message
- retry_details = []
- if reason:
- retry_details.append(f"Reason: {reason}")
- if improvements:
- retry_details.append(f"Improvements: {improvements}")
- if unmet_criteria:
- retry_details.append(f"Missing criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}")
-
- retry_message = f"🔄 Task needs improvement"
- if retry_details:
- retry_message += f"\n{chr(10).join(retry_details)}"
-
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": retry_message,
- "type": "warning",
- "status": "running",
- "progress": progress + 15
- })
- else:
- # Extract failure details
- reason = review_result.get('reason', '')
- unmet_criteria = review_result.get('unmet_criteria', [])
- missing_outputs = review_result.get('missing_outputs', [])
-
- # Build detailed failure message
- failure_details = []
- if reason:
- failure_details.append(f"Reason: {reason}")
- if unmet_criteria:
- failure_details.append(f"Unmet criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}")
- if missing_outputs:
- failure_details.append(f"Missing outputs: {', '.join(missing_outputs[:3])}{'...' if len(missing_outputs) > 3 else ''}")
-
- failure_message = f"❌ Task failed"
- if failure_details:
- failure_message += f"\n{chr(10).join(failure_details)}"
-
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": failure_message,
- "type": "error",
- "status": "running",
- "progress": progress + 15
- })
-
- # Log review result (with metadata only)
- review_result_metadata = {
- 'status': review_result.get('status', ''),
- 'reason': review_result.get('reason', ''),
- 'improvements': review_result.get('improvements', ''),
- 'quality_score': review_result.get('quality_score', 0),
- 'missing_outputs_count': len(review_result.get('missing_outputs', [])),
- 'met_criteria_count': len(review_result.get('met_criteria', [])),
- 'unmet_criteria_count': len(review_result.get('unmet_criteria', []))
- }
- logger.debug(f"TASK {i+1} REVIEW RESULT: {json.dumps(review_result_metadata, indent=2, ensure_ascii=False)}")
-
- # Phase 5: Prepare Task Handover
- logger.info(f"--- PHASE 5: PREPARING TASK {i+1} HANDOVER ---")
- handover_data = await self.prepareTaskHandover(task_step, task_actions, review_result, workflow)
-
- # Log handover data (with metadata only)
- handover_data_metadata = {
- 'task_step_id': handover_data.get('task_step', {}).get('id', ''),
- 'task_actions_count': len(handover_data.get('task_actions', [])),
- 'review_status': handover_data.get('review_result', {}).get('status', ''),
- 'next_task_ready': handover_data.get('next_task_ready', False),
- 'available_results_count': len(handover_data.get('available_results', []))
- }
- logger.debug(f"TASK {i+1} HANDOVER DATA: {json.dumps(handover_data_metadata, indent=2, ensure_ascii=False)}")
-
- # Check if task is successful or needs retry
- review_status = review_result.get('status', 'unknown')
- if review_status == 'success':
- task_success = True
- logger.info(f"Task {i+1} completed successfully")
- elif review_status == 'retry':
- # Store current results and feedback for next retry
- previous_action_results = action_results.copy()
- previous_review_feedback = review_result.get('improvements', '')
-
- retry_count += 1
- if retry_count > max_retries:
- logger.error(f"Task {i+1} failed after {max_retries} retries")
- task_success = False
- else:
- logger.info(f"Task {i+1} needs retry (attempt {retry_count}/{max_retries})")
- logger.info(f"Previous feedback: {previous_review_feedback}")
- # Add delay before retry
- await asyncio.sleep(2)
- continue
- elif review_status == 'failed':
- logger.error(f"Task {i+1} failed permanently")
- task_success = False
- break
- else:
- logger.warning(f"Unknown review status '{review_status}' for task {i+1}")
- task_success = False
- break
-
- except Exception as e:
- logger.error(f"Error processing task {i+1} (attempt {retry_count + 1}): {str(e)}")
- retry_count += 1
- if retry_count >= max_retries:
- logger.error(f"Task {i+1} failed after {max_retries} retries due to exceptions")
- task_success = False
- break
- else:
- logger.info(f"Retrying task {i+1} after exception")
- await asyncio.sleep(2)
- continue
+ 'previous_results': previous_results,
+ 'task_description': task_description
+ }
- # Collect results regardless of success/failure
- workflow_results.append({
- 'task_step': task_step,
- 'task_actions': task_actions,
- 'action_results': action_results,
- 'review_result': review_result,
- 'handover_data': handover_data,
- 'retry_count': retry_count,
- 'task_success': task_success
- })
+ # Execute task with state management
+ task_result = await self.executeTaskWithStateManagement(task_step, workflow, task_context)
- # Update previous results for next task if successful
- if task_success and handover_data.get('next_task_ready', False):
- previous_results = handover_data.get('available_results', [])
+ # Log task result
+ if task_result['status'] == 'success':
+ successful_actions = len(task_result.get('successful_actions', []))
+ failed_actions = len(task_result.get('failed_actions', []))
+ quality_metrics = task_result.get('quality_metrics', {})
+ quality_score = quality_metrics.get('score', 0)
+
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"🎯 Task {i+1} completed successfully ({successful_actions} actions, quality score: {quality_score})",
+ "type": "success",
+ "status": "running",
+ "progress": progress + 20
+ })
+
+                    # Update previous results for next task (result labels only,
+                    # matching what defineTaskActions expects)
+                    previous_results = [
+                        a.get('resultLabel', '') for a in task_result.get('successful_actions', [])
+                        if a.get('resultLabel')
+                    ]
+
else:
- # If task failed, stop workflow
- logger.warning(f"Task {i+1} not successful, stopping workflow")
- break
-
- # Final workflow summary
- successful_tasks = sum(1 for result in workflow_results if result.get('task_success', False))
- total_tasks = len(task_plan['tasks'])
-
- # Final workflow stats are already handled by the service center during AI calls
+ # Task failed - provide detailed feedback and stop workflow
+ failed_actions = len(task_result.get('failed_actions', []))
+ retry_count = task_result.get('retry_count', 0)
+ reason = task_result.get('reason', 'Unknown error')
+
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"❌ Task {i+1} failed after {retry_count} retries: {reason}",
+ "type": "error",
+ "status": "failed",
+ "progress": progress + 15
+ })
+
+ # Generate detailed failure feedback
+ failure_feedback = self._generateTaskFailureFeedback(task_step, task_result, i+1, len(task_plan['tasks']))
+
+ # Stop workflow and return failure
+ return {
+ 'status': 'failed',
+ 'completed_tasks': i,
+ 'total_tasks': len(task_plan['tasks']),
+ 'failed_task': task_step,
+ 'failure_reason': reason,
+ 'feedback': failure_feedback,
+ 'execution_time': time.time() - start_time
+ }
+
+ # Add task result to workflow results
+ workflow_results.append(task_result)
# Calculate total processing time
total_processing_time = time.time() - start_time
- # Create final user-friendly completion log
- if successful_tasks == total_tasks:
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": f"🎉 Workflow completed ({successful_tasks}/{total_tasks} tasks)",
- "type": "success",
- "status": "completed",
- "progress": 100
- })
- elif successful_tasks > 0:
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": f"⚠️ Workflow partially completed ({successful_tasks}/{total_tasks} tasks)",
- "type": "warning",
- "status": "completed",
- "progress": 100
- })
- else:
- self.chatInterface.createWorkflowLog({
- "workflowId": workflow.id,
- "message": f"❌ Workflow failed ({successful_tasks}/{total_tasks} tasks)",
- "type": "error",
- "status": "failed",
- "progress": 100
- })
-
- # Create serializable workflow results (with metadata only)
- workflow_results_serializable = []
- for result in workflow_results:
- # Extract action results metadata
- action_results_metadata = []
- for action_result in result.get('action_results', []):
- documents_metadata = []
- for doc in action_result.get('documents', []):
- if hasattr(doc, 'filename'):
- documents_metadata.append({
- 'filename': doc.filename,
- 'fileSize': getattr(doc, 'fileSize', 0),
- 'mimeType': getattr(doc, 'mimeType', 'unknown')
- })
- elif isinstance(doc, dict):
- documents_metadata.append({
- 'filename': doc.get('filename', 'unknown'),
- 'fileSize': doc.get('fileSize', 0),
- 'mimeType': doc.get('mimeType', 'unknown')
- })
-
- action_result_metadata = {
- 'status': action_result.get('status', ''),
- 'result_summary': action_result.get('result', '')[:200] + '...' if len(action_result.get('result', '')) > 200 else action_result.get('result', ''),
- 'error': action_result.get('error', ''),
- 'resultLabel': action_result.get('resultLabel', ''),
- 'documents_count': len(documents_metadata),
- 'documents_metadata': documents_metadata,
- 'actionId': action_result.get('actionId', ''),
- 'actionMethod': action_result.get('actionMethod', ''),
- 'actionName': action_result.get('actionName', ''),
- 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none'
- }
- action_results_metadata.append(action_result_metadata)
-
- serializable_result = {
- 'task_step': result['task_step'],
- 'action_results': action_results_metadata,
- 'review_result': result['review_result'],
- 'handover_data': {
- 'task_step_id': result['handover_data'].get('task_step', {}).get('id', ''),
- 'task_actions_count': len(result['handover_data'].get('task_actions', [])),
- 'review_status': result['handover_data'].get('review_result', {}).get('status', ''),
- 'next_task_ready': result['handover_data'].get('next_task_ready', False),
- 'available_results_count': len(result['handover_data'].get('available_results', []))
- },
- 'retry_count': result.get('retry_count', 0),
- 'task_success': result.get('task_success', False)
- }
- # Convert task_actions to serializable format with metadata only
- if 'task_actions' in result:
- task_actions_serializable = []
- for action in result['task_actions']:
- # Extract only metadata from parameters
- parameters_metadata = {}
- if hasattr(action, 'execParameters') and action.execParameters:
- for key, value in action.execParameters.items():
- if key == 'documentList':
- if isinstance(value, list):
- parameters_metadata[key] = {
- 'count': len(value),
- 'labels': [str(v).split(':')[-1] if ':' in str(v) else str(v) for v in value]
- }
- else:
- parameters_metadata[key] = str(value)
- elif key == 'aiPrompt':
- parameters_metadata[key] = str(value)[:100] + '...' if len(str(value)) > 100 else str(value)
- else:
- parameters_metadata[key] = str(value)
-
- action_dict = {
- 'execMethod': action.execMethod,
- 'execAction': action.execAction,
- 'execParameters': parameters_metadata,
- 'execResultLabel': action.execResultLabel,
- 'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
- }
- task_actions_serializable.append(action_dict)
- serializable_result['task_actions'] = task_actions_serializable
- workflow_results_serializable.append(serializable_result)
+ # Create final success log
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"🎉 Workflow completed successfully ({len(workflow_results)}/{len(task_plan['tasks'])} tasks)",
+ "type": "success",
+ "status": "completed",
+ "progress": 100
+ })
+ # Create workflow summary
workflow_summary = {
- 'status': 'completed' if successful_tasks == total_tasks else 'partial',
- 'successful_tasks': successful_tasks,
- 'total_tasks': total_tasks,
- 'workflow_results_count': len(workflow_results_serializable),
+ 'status': 'completed',
+ 'completed_tasks': len(workflow_results),
+ 'total_tasks': len(task_plan['tasks']),
+ 'execution_time': total_processing_time,
'final_results_count': len(previous_results)
}
- logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {successful_tasks}/{total_tasks} tasks successful ===")
+ logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {len(workflow_results)}/{len(task_plan['tasks'])} tasks successful ===")
logger.debug(f"FINAL WORKFLOW SUMMARY: {json.dumps(workflow_summary, indent=2, ensure_ascii=False)}")
return workflow_summary
@@ -1983,3 +1795,534 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'error': str(e),
'phase': 'execution'
}
+
+ def _generateTaskFailureFeedback(self, task_step: Dict[str, Any], task_result: Dict[str, Any], task_number: int, total_tasks: int) -> str:
+ """Generate detailed feedback for task failure"""
+
+ successful_actions = len(task_result.get('successful_actions', []))
+ failed_actions = len(task_result.get('failed_actions', []))
+ retry_count = task_result.get('retry_count', 0)
+ reason = task_result.get('reason', 'Unknown error')
+
+ feedback = f"""
+Workflow execution stopped due to task failure.
+
+PROGRESS: {task_number - 1}/{total_tasks} tasks completed successfully
+FAILED TASK: {task_step.get('description', 'Unknown')}
+
+FAILURE DETAILS:
+- Retry attempts: {retry_count}
+- Successful actions: {successful_actions}
+- Failed actions: {failed_actions}
+- Failure reason: {reason}
+
+The workflow cannot continue because this task is essential for subsequent steps.
+Please review the task requirements and try again with different input or approach.
+"""
+
+ return feedback
+
+    # ===== STATE-MANAGED TASK EXECUTION AND VALIDATION =====
+
+ async def executeTaskWithStateManagement(self, task_step: Dict[str, Any], workflow: ChatWorkflow, context: Dict[str, Any]) -> Dict[str, Any]:
+ """Execute task with state management and action-level retry logic"""
+ try:
+ logger.info(f"Executing task with state management: {task_step.get('description', 'Unknown')}")
+
+ # Initialize task state
+ state = TaskExecutionState(task_step)
+
+ while state.canRetry():
+                logger.info(f"Task execution attempt {state.retry_count + 1}/{state.max_retries}")
+
+ # Generate actions (first time or after regeneration)
+ if state.retry_count == 0:
+ actions = await self.defineTaskActions(task_step, workflow, context.get('previous_results', []))
+ else:
+ # Regenerate actions with failure context
+ actions = await self._regenerateTaskActionsWithFailureContext(task_step, state, context)
+
+ if not actions:
+ logger.warning(f"No actions defined for task, marking as failed")
+ return {
+ 'status': 'failed',
+ 'reason': 'No actions could be generated for task',
+ 'successful_actions': state.successful_actions,
+ 'failed_actions': state.failed_actions,
+ 'retry_count': state.retry_count
+ }
+
+ # Execute actions with individual validation
+ for i, action in enumerate(actions):
+ logger.info(f"Executing action {i+1}/{len(actions)}: {action.execMethod}.{action.execAction}")
+
+ # Execute action with validation
+ result = await self.executeActionWithValidation(action, workflow, context)
+
+ if result['validation']['status'] == 'success':
+ state.addSuccessfulAction(result)
+ logger.info(f"Action {i+1} completed successfully")
+
+ elif result['validation']['status'] == 'retry':
+ # Retry individual action
+ improvements = result['validation'].get('improvements', [])
+                    retry_result = await self.retryActionWithImprovements(action, result, improvements, workflow)
+
+ if retry_result['validation']['status'] == 'success':
+ state.addSuccessfulAction(retry_result)
+ logger.info(f"Action {i+1} retry successful")
+ else:
+ state.addFailedAction(retry_result)
+ logger.error(f"Action {i+1} retry failed")
+ # Action failed after retry - stop task execution and regenerate
+ break
+
+ else: # fail
+ state.addFailedAction(result)
+ logger.error(f"Action {i+1} failed validation - stopping task execution")
+ # Action failed - stop task execution and regenerate
+ break
+
+ # Validate task completion
+ task_validation = await self._validateTaskCompletion(state.successful_actions, task_step, workflow)
+
+ if task_validation['status'] == 'success':
+ logger.info(f"Task completed successfully with {len(state.successful_actions)} successful actions")
+ return {
+ 'status': 'success',
+ 'successful_actions': state.successful_actions,
+ 'failed_actions': state.failed_actions,
+ 'retry_count': state.retry_count,
+ 'quality_metrics': task_validation.get('quality_metrics', {})
+ }
+
+ elif task_validation['status'] == 'retry':
+ state.improvements = task_validation.get('improvements', [])
+ state.incrementRetryCount()
+ logger.info(f"Task needs retry. Improvements: {state.improvements}")
+ # Preserve successful actions for next iteration
+ continue
+
+ else: # fail
+ logger.error(f"Task failed permanently")
+ return {
+ 'status': 'failed',
+ 'reason': task_validation.get('reason', 'Task validation failed'),
+ 'successful_actions': state.successful_actions,
+ 'failed_actions': state.failed_actions,
+ 'retry_count': state.retry_count
+ }
+
+ # Max retries reached
+ logger.error(f"Task failed after {state.max_retries} retries")
+ return {
+ 'status': 'failed',
+ 'reason': f'Task failed after {state.max_retries} retries',
+ 'successful_actions': state.successful_actions,
+ 'failed_actions': state.failed_actions,
+ 'retry_count': state.retry_count
+ }
+
+ except Exception as e:
+ logger.error(f"Error in task execution with state management: {str(e)}")
+ return {
+ 'status': 'failed',
+ 'reason': f'Task execution error: {str(e)}',
+ 'successful_actions': [],
+ 'failed_actions': [],
+ 'retry_count': 0
+ }
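+
+    # Shape of the dict returned above (illustrative):
+    #   success: {'status': 'success', 'successful_actions': [...],
+    #             'failed_actions': [...], 'retry_count': int,
+    #             'quality_metrics': {'score': int, 'confidence': float}}
+    #   failure: {'status': 'failed', 'reason': str, 'successful_actions': [...],
+    #             'failed_actions': [...], 'retry_count': int}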
+
+ async def _regenerateTaskActionsWithFailureContext(self, task_step: Dict[str, Any], state: TaskExecutionState, context: Dict[str, Any]) -> List[TaskAction]:
+ """Regenerate task actions with failure context and improvements"""
+ try:
+ logger.info(f"Regenerating actions for task with failure context")
+
+ # Analyze failure patterns
+ failure_patterns = state.getFailurePatterns()
+
+ # Create enhanced context with failure information
+ enhanced_context = {
+ 'task_step': task_step,
+ 'workflow': context.get('workflow'),
+ 'workflow_id': context.get('workflow_id'),
+ 'available_documents': context.get('available_documents', []),
+ 'previous_results': state.getAvailableResults(),
+ 'improvements': state.improvements,
+ 'retry_count': state.retry_count,
+ 'failure_patterns': failure_patterns,
+ 'failed_actions': state.failed_actions,
+ 'successful_actions': state.successful_actions,
+ 'is_regeneration': True
+ }
+
+ # Generate new actions with failure avoidance
+ actions = await self.defineTaskActions(task_step, context.get('workflow'), state.getAvailableResults(), enhanced_context)
+
+ logger.info(f"Regenerated {len(actions)} actions with failure context")
+ return actions
+
+ except Exception as e:
+ logger.error(f"Error regenerating task actions: {str(e)}")
+ return []
+
+ async def _validateTaskCompletion(self, successful_actions: List[Dict[str, Any]], task_step: Dict[str, Any], workflow: ChatWorkflow) -> Dict[str, Any]:
+ """Validate if task is completed successfully"""
+ try:
+ logger.info(f"Validating task completion: {task_step.get('description', 'Unknown')}")
+
+ # Create task result summary
+ task_result = {
+ 'task_step': task_step,
+ 'successful_actions': successful_actions,
+ 'successful_count': len(successful_actions),
+ 'expected_outputs': task_step.get('expected_outputs', []),
+ 'success_criteria': task_step.get('success_criteria', [])
+ }
+
+ # Use AI to validate task completion
+ prompt = self._createTaskCompletionValidationPrompt(task_result, task_step)
+ response = await self._callAIWithCircuitBreaker(prompt, "task_completion_validation")
+
+ # Parse validation result
+ validation = self._parseTaskValidationResponse(response)
+
+ # Add quality metrics
+ validation['quality_metrics'] = self._calculateTaskQualityMetrics(task_step, successful_actions)
+
+ logger.info(f"Task completion validation: {validation.get('status', 'unknown')}")
+ return validation
+
+ except Exception as e:
+ logger.error(f"Error validating task completion: {str(e)}")
+ return {
+ 'status': 'success', # Default to success to avoid blocking
+ 'reason': f'Validation failed: {str(e)}',
+ 'quality_metrics': {'score': 5, 'confidence': 0.5}
+ }
+
+ def _createTaskCompletionValidationPrompt(self, task_result: Dict[str, Any], task_step: Dict[str, Any]) -> str:
+ """Create prompt for task completion validation"""
+
+ successful_actions = task_result['successful_actions']
+ expected_outputs = task_result['expected_outputs']
+ success_criteria = task_result['success_criteria']
+
+ # Summarize successful actions
+ action_summary = []
+ for action in successful_actions:
+ action_summary.append({
+ 'method': action.get('actionMethod', ''),
+ 'action': action.get('actionName', ''),
+ 'result_label': action.get('resultLabel', ''),
+ 'documents_count': len(action.get('documents', [])),
+ 'has_text_result': bool(action.get('result', '').strip())
+ })
+
+ return f"""You are a task completion validator that evaluates if a task was successfully completed.
+
+TASK DETAILS:
+- Description: {task_step.get('description', 'Unknown')}
+- Expected Outputs: {', '.join(expected_outputs)}
+- Success Criteria: {', '.join(success_criteria)}
+
+SUCCESSFUL ACTIONS ({len(successful_actions)}):
+{json.dumps(action_summary, indent=2)}
+
+VALIDATION QUESTIONS:
+1. Were all expected outputs produced?
+2. Are the success criteria met?
+3. Do the action results collectively accomplish the task goal?
+4. Is the task ready for handover to the next task?
+
+REQUIRED JSON RESPONSE:
+{{
+ "status": "success|retry|fail",
+ "reason": "Detailed explanation of the validation decision",
+ "improvements": ["specific improvement 1", "specific improvement 2"],
+ "missing_outputs": ["output1", "output2"],
+ "met_criteria": ["criteria1", "criteria2"],
+ "unmet_criteria": ["criteria3", "criteria4"],
+ "quality_score": 1-10,
+ "confidence": 0.0-1.0
+}}
+
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
+
+ def _parseTaskValidationResponse(self, response: str) -> Dict[str, Any]:
+ """Parse task validation response"""
+ try:
+ # Extract JSON from response
+ json_start = response.find('{')
+ json_end = response.rfind('}') + 1
+ if json_start == -1 or json_end == 0:
+ raise ValueError("No JSON found in task validation response")
+
+ json_str = response[json_start:json_end]
+ validation = json.loads(json_str)
+
+ # Validate structure
+ if 'status' not in validation:
+ raise ValueError("Task validation response missing 'status' field")
+
+ # Set default values
+ validation.setdefault('reason', 'No reason provided')
+ validation.setdefault('improvements', [])
+ validation.setdefault('missing_outputs', [])
+ validation.setdefault('met_criteria', [])
+ validation.setdefault('unmet_criteria', [])
+ validation.setdefault('quality_score', 5)
+ validation.setdefault('confidence', 0.5)
+
+ return validation
+
+ except Exception as e:
+ logger.error(f"Error parsing task validation response: {str(e)}")
+ return {
+ 'status': 'success',
+ 'reason': f'Parse error: {str(e)}',
+ 'improvements': [],
+ 'missing_outputs': [],
+ 'met_criteria': [],
+ 'unmet_criteria': [],
+ 'quality_score': 5,
+ 'confidence': 0.5
+ }
+
+    # ===== SINGLE-ACTION EXECUTION WITH RETRY =====
+
+    async def _executeSingleActionWithRetry(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]:
+        """Execute a single action, retrying once with improvements when validation requests it"""
+ try:
+ logger.info(f"Executing action with retry: {action.execMethod}.{action.execAction}")
+
+ # Create context for validation
+ context = {
+ 'task_description': 'Action execution',
+ 'previous_results': [],
+ 'action_index': 0,
+ 'total_actions': 1
+ }
+
+ # Execute action with validation
+ result = await self.executeActionWithValidation(action, workflow, context)
+
+ # If validation indicates retry, attempt retry
+ if result['validation']['status'] == 'retry':
+ improvements = result['validation'].get('improvements', [])
+                retry_result = await self.retryActionWithImprovements(action, result, improvements, workflow)
+ return retry_result
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error executing action with retry: {str(e)}")
+ action.setError(str(e))
+ return {
+ "status": "failed",
+ "error": str(e),
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction,
+ "documents": [],
+ "validation": {
+ 'status': 'fail',
+ 'reason': f'Execution error: {str(e)}',
+ 'confidence': 0.0,
+ 'improvements': [],
+ 'quality_score': 0,
+ 'missing_elements': [],
+ 'suggested_retry_approach': ''
+ }
+ }
+
+    # NOTE: this validation-aware executeTaskActions overrides the earlier
+    # definition in this class (Python keeps the later binding), so the earlier
+    # body above is effectively dead code and can be removed in a cleanup pass.
+
+ async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
+ """Execute task actions with individual validation and retry logic"""
+ try:
+ logger.info(f"Executing {len(task_actions)} task actions with validation")
+
+ results = []
+ for i, action in enumerate(task_actions):
+ logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")
+
+ # Create context for validation
+ context = {
+ 'task_description': 'Action execution',
+ 'previous_results': [r.get('resultLabel', '') for r in results if r.get('resultLabel')],
+ 'action_index': i,
+ 'total_actions': len(task_actions)
+ }
+
+ # Execute action with validation
+                result = await self.executeActionWithValidation(action, workflow, context)
+
+                # Retry once with the suggested improvements when validation requests it
+                if result['validation']['status'] == 'retry':
+                    improvements = result['validation'].get('improvements', [])
+                    result = await self.retryActionWithImprovements(action, result, improvements, workflow)
+
+                results.append(result)
+
+                # If the action still failed validation, log it and continue with the next one
+                if result['validation']['status'] == 'fail':
+                    logger.error(f"Action {i+1} failed validation, continuing with next action")
+
+ logger.info(f"Task action execution completed: {len(results)} results")
+ return results
+
+ except Exception as e:
+ logger.error(f"Error executing task actions: {str(e)}")
+ return []
+
+ # ===== RESTRUCTURED ACTION EXECUTION WITH VALIDATION =====
+
+ async def executeActionWithValidation(self, action: TaskAction, workflow: ChatWorkflow, context: Dict[str, Any]) -> Dict[str, Any]:
+ """Execute single action with immediate validation"""
+ try:
+ logger.info(f"Executing action: {action.execMethod}.{action.execAction}")
+
+ # Execute the action
+ result = await self._executeSingleAction(action, workflow)
+
+ # Validate the result immediately
+ validator = ActionValidator(self)
+ validation = await validator.validateActionResult(result, action, context)
+
+ # Add validation result to action result
+ result['validation'] = validation
+
+ # Update action status based on validation
+ if validation['status'] == 'success':
+ action.setSuccess()
+ logger.info(f"Action {action.execMethod}.{action.execAction} validated successfully")
+ elif validation['status'] == 'retry':
+ action.status = TaskStatus.PENDING # Keep pending for retry
+ logger.warning(f"Action {action.execMethod}.{action.execAction} needs retry: {validation.get('reason', 'No reason')}")
+ else: # fail
+ action.setError(validation.get('reason', 'Action failed validation'))
+ logger.error(f"Action {action.execMethod}.{action.execAction} failed validation: {validation.get('reason', 'No reason')}")
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error executing action with validation: {str(e)}")
+ action.setError(str(e))
+ return {
+ "status": "failed",
+ "error": str(e),
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction,
+ "documents": [],
+ "validation": {
+ 'status': 'fail',
+ 'reason': f'Execution error: {str(e)}',
+ 'confidence': 0.0,
+ 'improvements': [],
+ 'quality_score': 0,
+ 'missing_elements': [],
+ 'suggested_retry_approach': ''
+ }
+ }
+
+    async def retryActionWithImprovements(self, action: TaskAction, previous_result: Dict[str, Any], improvements: List[str], workflow: ChatWorkflow) -> Dict[str, Any]:
+ """Retry action with improvements based on previous failure"""
+ try:
+ logger.info(f"Retrying action {action.execMethod}.{action.execAction} with improvements")
+
+ # Apply improvements to action parameters
+ enhanced_action = self._enhanceActionWithImprovements(action, improvements, previous_result)
+
+ # Execute enhanced action
+            result = await self._executeSingleAction(enhanced_action, workflow)
+
+ # Validate the retry result
+ validator = ActionValidator(self)
+ context = {
+ 'task_description': 'Action retry',
+ 'previous_results': [],
+ 'retry_count': 1
+ }
+ validation = await validator.validateActionResult(result, enhanced_action, context)
+
+ # Add validation and retry metadata
+ result['validation'] = validation
+ result['is_retry'] = True
+ result['previous_error'] = previous_result.get('error', '')
+ result['applied_improvements'] = improvements
+
+ # Update action status
+ if validation['status'] == 'success':
+ enhanced_action.setSuccess()
+ logger.info(f"Action retry successful: {enhanced_action.execMethod}.{enhanced_action.execAction}")
+ else:
+ enhanced_action.setError(validation.get('reason', 'Retry failed'))
+ logger.error(f"Action retry failed: {enhanced_action.execMethod}.{enhanced_action.execAction}")
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error retrying action: {str(e)}")
+ action.setError(str(e))
+ return {
+ "status": "failed",
+ "error": str(e),
+ "actionId": action.id,
+ "actionMethod": action.execMethod,
+ "actionName": action.execAction,
+ "documents": [],
+ "is_retry": True,
+ "validation": {
+ 'status': 'fail',
+ 'reason': f'Retry execution error: {str(e)}',
+ 'confidence': 0.0,
+ 'improvements': [],
+ 'quality_score': 0,
+ 'missing_elements': [],
+ 'suggested_retry_approach': ''
+ }
+ }
+
+ def _enhanceActionWithImprovements(self, action: TaskAction, improvements: List[str], previous_result: Dict[str, Any]) -> TaskAction:
+ """Enhance action parameters based on improvements and previous failure"""
+ enhanced_action = TaskAction(
+ id=action.id,
+ execMethod=action.execMethod,
+ execAction=action.execAction,
+ execParameters=action.execParameters.copy(), # Copy to avoid modifying original
+ execResultLabel=action.execResultLabel,
+ status=action.status
+ )
+
+        # Map failure-pattern keywords to prompt addenda
+        prompt_addenda = {
+            "incomplete_analysis": "\n\nIMPORTANT: Provide comprehensive, detailed analysis covering all aspects. Be thorough and include all relevant information.",
+            "missing_content": "\n\nVALIDATION: Ensure all content is extracted and no information is missed. Provide complete output.",
+            "wrong_format": "\n\nFORMAT: Provide output in structured, well-organized format with clear sections and proper formatting.",
+            "timeout": "\n\nEFFICIENCY: Provide concise but complete analysis. Focus on key information and avoid unnecessary details.",
+        }
+
+        # Apply improvements based on failure patterns; only the first matching keyword per improvement applies
+        for improvement in improvements:
+            for keyword, addendum in prompt_addenda.items():
+                if keyword in improvement.lower():
+                    current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
+                    enhanced_action.execParameters['aiPrompt'] = current_prompt + addendum
+                    break
+
+ # Apply specific improvements from validation
+ validation = previous_result.get('validation', {})
+ suggested_approach = validation.get('suggested_retry_approach', '')
+ if suggested_approach:
+ current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
+ enhanced_prompt = current_prompt + f"\n\nRETRY APPROACH: {suggested_approach}"
+ enhanced_action.execParameters['aiPrompt'] = enhanced_prompt
+
+ return enhanced_action
diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py
index ad06111e..e35f064a 100644
--- a/modules/workflow/managerWorkflow.py
+++ b/modules/workflow/managerWorkflow.py
@@ -42,7 +42,7 @@ class WorkflowManager:
message = await self._sendFirstMessage(userInput, workflow)
# Execute unified workflow
- workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput.prompt, workflow)
+ workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput, workflow)
# Process workflow results
await self._processWorkflowResults(workflow, workflow_result, message)