import asyncio
import logging
import uuid
import json
import time
from typing import Dict, Any, Optional, List, Union
from datetime import datetime, UTC

from modules.interfaces.interfaceAppModel import User
from modules.interfaces.interfaceChatModel import (
    TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult,
    ExtractedContent, ContentItem, ContentMetadata, DocumentExchange, TaskStep, TaskContext, ActionExecutionResult, ReviewContext, ReviewResult, TaskPlan, WorkflowResult
)
from modules.chat.serviceCenter import ServiceCenter
from modules.interfaces.interfaceChatObjects import ChatObjects

logger = logging.getLogger(__name__)


# ===== STATE MANAGEMENT AND VALIDATION CLASSES =====

class TaskExecutionState:
    """Manages state during task execution with retry logic"""

    def __init__(self, task_step: TaskStep):
        self.task_step = task_step
        self.successful_actions: List[ActionExecutionResult] = []  # Preserved across retries
        self.failed_actions: List[ActionExecutionResult] = []  # For analysis
        self.current_action_index = 0
        self.retry_count = 0
        self.improvements = []
        self.partial_results = {}  # Store intermediate results
        self.max_retries = 3

    def addSuccessfulAction(self, action_result: ActionExecutionResult):
        self.successful_actions.append(action_result)
        if action_result.data.get('resultLabel'):
            self.partial_results[action_result.data['resultLabel']] = action_result

    def addFailedAction(self, action_result: ActionExecutionResult):
        self.failed_actions.append(action_result)

    def getAvailableResults(self) -> list:
        return [result.data.get('resultLabel', '') for result in self.successful_actions if result.data.get('resultLabel')]

    def shouldRetryTask(self) -> bool:
        return len(self.successful_actions) > 0 and len(self.failed_actions) > 0

    def canRetry(self) -> bool:
        return self.retry_count < self.max_retries

    def incrementRetryCount(self):
        self.retry_count += 1

    def getFailurePatterns(self) -> list:
        patterns = []
        for action in self.failed_actions:
            error = action.error.lower() if action.error else ''
            if "timeout" in error:
                patterns.append("timeout_issues")
            elif "document_not_found" in error or "file not found" in error:
                patterns.append("document_reference_issues")
            elif "empty_result" in error or "no content" in error:
                patterns.append("content_extraction_issues")
            elif "invalid_format" in error or "wrong format" in error:
                patterns.append("format_issues")
            elif "permission" in error or "access denied" in error:
                patterns.append("permission_issues")
        return list(set(patterns))

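# Illustrative sketch (not part of the production flow): how a caller might drive
# TaskExecutionState through a retry loop. `run_actions` is a hypothetical executor
# yielding ActionExecutionResult objects; only the state-management calls are real.
def _exampleRetryLoop(task_step: TaskStep, run_actions) -> TaskExecutionState:
    state = TaskExecutionState(task_step)
    while True:
        for result in run_actions(task_step, state.improvements):
            if result.success:
                state.addSuccessfulAction(result)
            else:
                state.addFailedAction(result)
        # Retry only when there was partial progress and attempts remain
        if not (state.shouldRetryTask() and state.canRetry()):
            return state
        state.improvements = state.getFailurePatterns()  # feed failure patterns into the next attempt
        state.incrementRetryCount()
        state.failed_actions = []  # successes are preserved across retries; failures are re-attempted
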
class ActionValidator:
    """Generic AI-based action result validation"""

    def __init__(self, chat_manager):
        self.chat_manager = chat_manager

    async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> dict:
        """Generic action validation using AI"""
        try:
            # Create generic validation prompt
            prompt = self._createGenericValidationPrompt(action_result, action, context)
            response = await self.chat_manager._callAIWithCircuitBreaker(prompt, "action_validation")
            validation = self._parseValidationResponse(response)

            # Add action metadata
            validation['action_id'] = action.id
            validation['action_method'] = action.execMethod
            validation['action_name'] = action.execAction
            validation['result_label'] = action.execResultLabel

            return validation
        except Exception as e:
            logger.error(f"Error validating action result: {str(e)}")
            # Fail open: if validation itself errors, report success with low
            # confidence rather than blocking the workflow on a validator failure
            return {
                'status': 'success',
                'reason': f'Validation failed: {str(e)}',
                'confidence': 0.5,
                'improvements': [],
                'action_id': action.id,
                'action_method': action.execMethod,
                'action_name': action.execAction,
                'result_label': action.execResultLabel
            }

    def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> str:
        """Create a validation prompt focused on result file delivery"""
        # Extract data from ActionResult model
        success = action_result.success
        result_data = action_result.data
        error = action_result.error
        validation_messages = action_result.validation

        # Extract result text from data
        result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data)

        # Get documents from ActionResult data
        documents = result_data.get("documents", []) if isinstance(result_data, dict) else []
        doc_count = len(documents)

        # Extract expected result format from action parameters
        expected_result_label = action.execResultLabel
        expected_format = action.execParameters.get('outputFormat', 'unknown')

        # Extract expected document formats from action
        expected_document_formats = action.expectedDocumentFormats or []

        # Check if the result label is present in the action result data
        actual_result_label = result_data.get("resultLabel", "") if isinstance(result_data, dict) else ""
        result_label_match = actual_result_label == expected_result_label

        # Analyze delivered documents and content
        delivered_files = []
        delivered_formats = []
        content_items = []

        # Check for ChatDocument objects
        for doc in documents:
            if hasattr(doc, 'filename'):
                delivered_files.append(doc.filename)
                # Extract format information
                file_extension = self._getFileExtension(doc.filename)
                mime_type = getattr(doc, 'mimeType', 'application/octet-stream')
                delivered_formats.append({
                    'filename': doc.filename,
                    'extension': file_extension,
                    'mimeType': mime_type
                })
            elif isinstance(doc, dict) and 'filename' in doc:
                delivered_files.append(doc['filename'])
                file_extension = self._getFileExtension(doc['filename'])
                mime_type = doc.get('mimeType', 'application/octet-stream')
                delivered_formats.append({
                    'filename': doc['filename'],
                    'extension': file_extension,
                    'mimeType': mime_type
                })
            else:
                # Use one placeholder name for both lists so they stay in sync
                placeholder_name = f"document_{len(delivered_files)}"
                delivered_files.append(placeholder_name)
                delivered_formats.append({
                    'filename': placeholder_name,
                    'extension': 'unknown',
                    'mimeType': 'application/octet-stream'
                })

        # Check for ExtractedContent in result data
        if isinstance(result_data, dict):
            if 'extractedContent' in result_data:
                extracted_content = result_data['extractedContent']
                if hasattr(extracted_content, 'contents'):
                    content_items = extracted_content.contents
            elif 'contents' in result_data:
                content_items = result_data['contents']

        # If we have delivered files but no content items, consider it successful.
        # This handles the case where content is stored in files rather than result data.
        if delivered_files and not content_items:
            content_items = [f"File content available in: {', '.join(delivered_files)}"]

        # Analyze content items
        content_summary = []
        for item in content_items:
            if hasattr(item, 'label') and hasattr(item, 'metadata'):
                content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}")
            elif isinstance(item, str):
                content_summary.append(item)
            else:
                content_summary.append(str(item))

        return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format.

ACTION DETAILS:
- Method: {action.execMethod}
- Action: {action.execAction}
- Expected Result Label: {expected_result_label}
- Actual Result Label: {actual_result_label}
- Result Label Match: {result_label_match}
- Expected Format: {expected_format}
- Expected Document Formats: {json.dumps(expected_document_formats, indent=2) if expected_document_formats else 'None specified'}
- Parameters: {json.dumps(action.execParameters, indent=2)}

RESULT TO VALIDATE:
- Success: {success}
- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''}
- Error: {error}
- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'}
- Documents Produced: {doc_count}
- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'}
- Delivered Formats: {json.dumps(delivered_formats, indent=2) if delivered_formats else 'None'}
- Content Items: {', '.join(content_summary) if content_summary else 'None'}

CRITICAL VALIDATION CRITERIA:
1. **Result Label Match**: Does the action result contain the expected result label?
2. **File Delivery**: Did the action deliver the promised result file(s)?
3. **Format Compliance**: If expected document formats were specified, do the delivered files match the expected formats?
4. **Content Quality**: Is the content of the delivered files usable and complete?
5. **Content Processing**: If content extraction was expected, was it performed correctly?

CONTEXT:
- Task Description: {context.task_step.description if context.task_step else 'Unknown'}
- Previous Results: {', '.join(context.previous_results) if context.previous_results else 'None'}

VALIDATION INSTRUCTIONS:
1. **Result Label Check**: Verify that the expected result label "{expected_result_label}" is present in the action result data. This is the primary success criterion.
2. **File Delivery**: Check if files were delivered when expected. The individual filenames don't need to match the result label - focus on whether content was actually produced.
3. **Format Compliance**: If expected document formats were specified, check if delivered files match the expected extensions and MIME types. If no formats were specified, this criterion is satisfied.
4. **Content Quality**: If files were delivered, consider the action successful. The presence of delivered files indicates content was processed and stored.
5. **Content Processing**: If files were delivered, assume content extraction was performed correctly. The file delivery is evidence of successful processing.
6. **Success Criteria**: The action is successful if the result label matches AND files were delivered. If expected formats were specified, they should also match.

IMPORTANT NOTES:
- The result label must be present in the action result data for success
- Individual filenames can be different from the result label
- If files were delivered, consider the action successful even if content details are not provided
- Focus on whether the action accomplished its intended purpose (file delivery)
- Empty files should be considered failures, but delivered files indicate success

REQUIRED JSON RESPONSE:
{{
    "status": "success|retry|fail",
    "reason": "Detailed explanation focusing on result label match and content quality",
    "confidence": 0.0-1.0,
    "improvements": ["specific improvements if needed"],
    "quality_score": 1-10,
    "missing_elements": ["missing result label", "missing files", "content issues"],
    "suggested_retry_approach": "Specific approach for retry if status is retry"
}}

NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""

    def _parseValidationResponse(self, response: str) -> dict:
        """Parse the AI validation response"""
        try:
            json_start = response.find('{')
            json_end = response.rfind('}') + 1
            if json_start == -1 or json_end == 0:
                raise ValueError("No JSON found in validation response")

            json_str = response[json_start:json_end]
            validation = json.loads(json_str)

            if 'status' not in validation:
                raise ValueError("Validation response missing 'status' field")

            # Set defaults for optional fields
            validation.setdefault('confidence', 0.5)
            validation.setdefault('improvements', [])
            validation.setdefault('quality_score', 5)
            validation.setdefault('missing_elements', [])
            validation.setdefault('suggested_retry_approach', '')

            return validation
        except Exception as e:
            logger.error(f"Error parsing validation response: {str(e)}")
            # Fail open: an unparseable response is reported as success with low
            # confidence so a validator glitch does not block the workflow
            return {
                'status': 'success',
                'reason': f'Parse error: {str(e)}',
                'confidence': 0.5,
                'improvements': [],
                'quality_score': 5,
                'missing_elements': [],
                'suggested_retry_approach': ''
            }

    def _getFileExtension(self, filename: str) -> str:
        """Extract file extension from filename"""
        if '.' in filename:
            return '.' + filename.split('.')[-1]
        return ''

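# Illustrative sketch (hypothetical AI reply): _parseValidationResponse tolerates
# prose around the JSON by slicing from the first '{' to the last '}':
#
#     raw = 'Here is my assessment:\n{"status": "retry", "reason": "missing files"}\nHope that helps.'
#     # find('{') / rfind('}') isolate the object, json.loads parses it, and the
#     # optional fields (confidence, improvements, ...) receive their defaults.
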
class ChatManager:
    """Chat manager with improved AI integration and method handling"""

    def __init__(self, currentUser: User, chatInterface: ChatObjects):
        self.currentUser = currentUser
        self.chatInterface = chatInterface
        self.service: Optional[ServiceCenter] = None
        self.workflow: Optional[ChatWorkflow] = None

        # Circuit breaker for AI calls
        self.ai_failure_count = 0
        self.ai_last_failure_time = None
        self.ai_circuit_breaker_threshold = 5
        self.ai_circuit_breaker_timeout = 300  # 5 minutes

        # Timeout settings
        self.ai_call_timeout = 120  # 2 minutes
        self.task_execution_timeout = 600  # 10 minutes

    # ===== Initialization and Setup =====
    async def initialize(self, workflow: ChatWorkflow) -> None:
        """Initialize chat manager with workflow"""
        self.workflow = workflow
        self.service = ServiceCenter(self.currentUser, self.workflow)

    # ===== WORKFLOW PHASES =====

    # Phase 1: High-Level Task Planning
    async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
        """Phase 1: Plan high-level tasks from user input"""
        try:
            logger.info(f"Planning high-level tasks for workflow {workflow.id}")

            # Create planning prompt
            prompt = self._createTaskPlanningPrompt({
                'user_request': userInput,
                'available_documents': self._getAvailableDocuments(workflow),
                'workflow_id': workflow.id
            })

            # Get AI response
            response = await self.service.callAiTextAdvanced(prompt)

            # Parse and validate task plan
            task_plan_dict = self._parseTaskPlanResponse(response)

            if not self._validateTaskPlan(task_plan_dict):
                logger.error("Generated task plan failed validation")
                raise Exception("AI-generated task plan failed validation - AI is required for task planning")

            # Convert to TaskPlan model
            tasks = []
            for task_dict in task_plan_dict.get('tasks', []):
                task = TaskStep(
                    id=task_dict.get('id', ''),
                    description=task_dict.get('description', ''),
                    dependencies=task_dict.get('dependencies', []),
                    expected_outputs=task_dict.get('expected_outputs', []),
                    success_criteria=task_dict.get('success_criteria', []),
                    required_documents=task_dict.get('required_documents', []),
                    estimated_complexity=task_dict.get('estimated_complexity'),
                    ai_prompt=task_dict.get('ai_prompt')
                )
                tasks.append(task)

            task_plan = TaskPlan(
                overview=task_plan_dict.get('overview', ''),
                tasks=tasks
            )

            # Log the task plan as JSON for debugging
            logger.info(f"Task plan created for workflow {workflow.id}:")
            task_plan_json = {
                'overview': task_plan.overview,
                'tasks_count': len(task_plan.tasks),
                'tasks': []
            }
            for task in task_plan.tasks:
                task_json = {
                    'id': task.id,
                    'description': task.description,
                    'dependencies': task.dependencies or [],
                    'expected_outputs': task.expected_outputs or [],
                    'success_criteria': task.success_criteria or [],
                    'required_documents': task.required_documents or [],
                    'estimated_complexity': task.estimated_complexity or '',
                    'ai_prompt': task.ai_prompt or ''
                }
                task_plan_json['tasks'].append(task_json)
            logger.info(f"Task Plan: {json.dumps(task_plan_json, indent=2, ensure_ascii=False)}")

            logger.info(f"High-level task planning completed: {len(task_plan.tasks)} tasks")
            return task_plan

        except Exception as e:
            logger.error(f"Error in high-level task planning: {str(e)}")
            raise Exception(f"AI is required for task planning but failed: {str(e)}")

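    # Illustrative sketch (hypothetical values): the plan JSON that
    # _parseTaskPlanResponse is expected to yield, matching the REQUIRED JSON
    # STRUCTURE in _createTaskPlanningPrompt below:
    #
    #     {
    #         "overview": "Evaluate candidates and prepare a decision deck",
    #         "tasks": [
    #             {
    #                 "id": "task_1",
    #                 "description": "Extract and analyze all candidate profiles",
    #                 "dependencies": [],
    #                 "expected_outputs": ["candidate_analysis"],
    #                 "success_criteria": ["every profile analyzed"],
    #                 "required_documents": ["candidate_1_profile.txt"],
    #                 "estimated_complexity": "medium",
    #                 "ai_prompt": "Extract qualifications, experience and skills ..."
    #             }
    #         ]
    #     }
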
    # Phase 2: Task Definition and Action Generation
    async def defineTaskActions(self, task_step: TaskStep, workflow: ChatWorkflow, previous_results: List[str] = None,
                                enhanced_context: TaskContext = None) -> List[TaskAction]:
        """Phase 2: Define specific actions for a task step with enhanced retry context"""
        try:
            logger.info(f"Defining actions for task: {task_step.description if hasattr(task_step, 'description') else 'Unknown'}")

            # Use enhanced context if provided (for retries), otherwise create basic context
            if enhanced_context:
                context = enhanced_context
            else:
                context = TaskContext(
                    task_step=task_step,
                    workflow=workflow,
                    workflow_id=workflow.id,
                    available_documents=self._getAvailableDocuments(workflow),
                    previous_results=previous_results or [],
                    improvements=[],
                    retry_count=0,
                    previous_action_results=[],
                    previous_review_result=None,
                    is_regeneration=False,
                    failure_patterns=[],
                    failed_actions=[],
                    successful_actions=[]
                )

            # Generate actions using AI
            actions = await self._generateActionsForTaskStep(context)

            # Log the generated actions as JSON for debugging
            logger.info(f"Generated {len(actions)} actions for task '{task_step.description}':")
            for i, action in enumerate(actions):
                logger.info(f"Action {i+1}: {json.dumps(action, indent=2, ensure_ascii=False)}")

            # Convert to TaskAction objects
            task_actions = []
            for action_dict in actions:
                action_data = {
                    "execMethod": action_dict.get('method', 'unknown'),
                    "execAction": action_dict.get('action', 'unknown'),
                    "execParameters": action_dict.get('parameters', {}),
                    "execResultLabel": action_dict.get('resultLabel', ''),
                    "expectedDocumentFormats": action_dict.get('expectedDocumentFormats', None),
                    "status": TaskStatus.PENDING
                }

                task_action = self.chatInterface.createTaskAction(action_data)
                if task_action:
                    task_actions.append(task_action)
                    logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}")

            # Update stats for task validation (estimate bytes for action validation)
            if task_actions:
                # Calculate actual action size for stats
                action_size = self.service.calculateObjectSize(task_actions)
                self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size)

            # Log the final TaskAction objects as JSON
            logger.info(f"Final TaskAction objects for task '{task_step.description}':")
            for i, task_action in enumerate(task_actions):
                action_json = {
                    'id': task_action.id,
                    'execMethod': task_action.execMethod,
                    'execAction': task_action.execAction,
                    'execParameters': task_action.execParameters,
                    'execResultLabel': task_action.execResultLabel,
                    'status': task_action.status.value if hasattr(task_action.status, 'value') else str(task_action.status)
                }
                logger.info(f"TaskAction {i+1}: {json.dumps(action_json, indent=2, ensure_ascii=False)}")

            logger.info(f"Task action definition completed: {len(task_actions)} actions")
            return task_actions

        except Exception as e:
            logger.error(f"Error defining task actions: {str(e)}")
            return []

    # Phase 3: Action Execution
    async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]:
        """Phase 3: Execute all actions for a task, continuing past failed actions"""
        try:
            logger.info(f"Executing {len(task_actions)} task actions")

            results = []
            for i, action in enumerate(task_actions):
                logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")

                # Execute a single action
                result = await self._executeSingleAction(action, workflow)
                results.append(result)

                # If the action failed, continue with the next action instead of stopping
                if not result.success:
                    logger.error(f"Action {i+1} failed, continuing with next action")

            logger.info(f"Task action execution completed: {len(results)} results")
            return results

        except Exception as e:
            logger.error(f"Error executing task actions: {str(e)}")
            return []

    # Phase 4: Task Review and Quality Assessment
    async def reviewTaskCompletion(self, task_step: TaskStep, task_actions: List[TaskAction],
                                   action_results: List[ActionExecutionResult], workflow: ChatWorkflow) -> ReviewResult:
        """Phase 4: Review task completion and decide next steps"""
        try:
            logger.info(f"Reviewing task completion: {task_step.description}")

            # Create step result summary from action results
            step_result = {
                'task_step': task_step,
                'action_results': action_results,
                'successful_actions': sum(1 for result in action_results if result.success),
                'total_actions': len(action_results),
                'results': [result.data.get('result', '') for result in action_results if result.success],
                'errors': [result.error for result in action_results if not result.success]
            }

            # Prepare review context
            review_context = ReviewContext(
                task_step=task_step,
                task_actions=task_actions,
                action_results=action_results,
                step_result=step_result,
                workflow_id=workflow.id,
                previous_results=self._getPreviousResultsFromActions(task_actions)
            )

            # Use AI to review the results
            review = await self._performTaskReview(review_context)

            # Compute quality metrics for diagnostics
            quality_metrics = self._calculateTaskQualityMetrics(task_step, action_results)
            logger.debug(f"Task quality metrics: {quality_metrics}")

            logger.info(f"Task review completed: {review.status}")
            return ReviewResult(
                status=review.status,
                reason=review.reason,
                improvements=review.improvements,
                quality_score=review.quality_score,
                missing_outputs=review.missing_outputs,
                met_criteria=review.met_criteria,
                unmet_criteria=review.unmet_criteria,
                confidence=review.confidence
            )

        except Exception as e:
            logger.error(f"Error reviewing task completion: {str(e)}")
            return ReviewResult(
                status='failed',
                reason=f'Review failed: {str(e)}',
                quality_score=0,
                confidence=0
            )

    # Phase 5: Task Handover and State Management
    async def prepareTaskHandover(self, task_step: TaskStep, task_actions: List[TaskAction],
                                  review_result: ReviewResult, workflow: ChatWorkflow) -> Dict[str, Any]:
        """Phase 5: Prepare results for next task or workflow completion"""

        def serializeActions(actions: List[TaskAction]) -> List[Dict[str, Any]]:
            """Convert task actions to a JSON-serializable form"""
            serialized = []
            for action in actions:
                serialized.append({
                    'id': action.id,
                    'execMethod': action.execMethod,
                    'execAction': action.execAction,
                    'execParameters': action.execParameters,
                    'execResultLabel': action.execResultLabel,
                    'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
                })
            return serialized

        try:
            logger.info(f"Preparing task handover: {task_step.description}")

            # Update task actions with results
            for action in task_actions:
                if action.status == TaskStatus.PENDING:
                    action.status = TaskStatus.COMPLETED if review_result.status == 'success' else TaskStatus.FAILED

            # Create handover data
            handover_data = {
                'task_step': task_step,
                'task_actions': serializeActions(task_actions),
                'review_result': review_result,
                'next_task_ready': review_result.status == 'success',
                'available_results': self._getPreviousResultsFromActions(task_actions)
            }

            logger.info(f"Task handover prepared: next_task_ready={handover_data['next_task_ready']}")
            return handover_data

        except Exception as e:
            logger.error(f"Error preparing task handover: {str(e)}")
            return {
                'task_step': task_step,
                'task_actions': serializeActions(task_actions),
                'review_result': review_result,
                'next_task_ready': False,
                'available_results': []
            }

    # ===== Utility Methods =====
    async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]:
        """Process file IDs and return ChatDocument objects"""
        documents = []

        for fileId in fileIds:
            try:
                # Ensure service is initialized
                if not hasattr(self, 'service') or not self.service:
                    logger.error(f"Service not initialized for file ID {fileId}")
                    continue

                # Get file info from service
                fileInfo = self.service.getFileInfo(fileId)
                if fileInfo:
                    # Create document using interface
                    documentData = {
                        "fileId": fileId,
                        "filename": fileInfo.get("filename", "unknown"),
                        "fileSize": fileInfo.get("size", 0),
                        "mimeType": fileInfo.get("mimeType", "application/octet-stream")
                    }
                    document = self.chatInterface.createChatDocument(documentData)
                    if document:
                        documents.append(document)
                        logger.info(f"Processed file ID {fileId} -> {document.filename}")
                else:
                    logger.warning(f"No file info found for file ID {fileId}")
            except Exception as e:
                logger.error(f"Error processing file ID {fileId}: {str(e)}")

        return documents

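    # Illustrative usage sketch (hypothetical IDs): resolve uploaded file IDs into
    # ChatDocument objects before planning; unknown IDs are skipped with a warning:
    #
    #     docs = await manager.processFileIds(["file_123", "file_456"])
    #     # -> [ChatDocument(filename="report.pdf", ...), ...]
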
    def setUserLanguage(self, language: str) -> None:
        """Set user language for the chat manager"""
        if hasattr(self, 'service') and self.service:
            self.service.user.language = language

    # ===== Enhanced Task Planning Methods =====

    async def _callAIWithCircuitBreaker(self, prompt: str, context: str) -> str:
        """Call AI with circuit breaker pattern for fault tolerance"""
        try:
            # Check circuit breaker
            if self._isCircuitBreakerOpen():
                raise Exception("AI circuit breaker is open - too many recent failures")

            # Call AI with timeout
            logger.debug(f"AI PROMPT ({context}): {prompt}")
            response = await asyncio.wait_for(
                self._callAI(prompt, context),
                timeout=self.ai_call_timeout
            )

            # Reset failure count on success
            self.ai_failure_count = 0
            return response

        except asyncio.TimeoutError:
            self._recordAIFailure("Timeout")
            raise Exception(f"AI call timed out after {self.ai_call_timeout} seconds")
        except Exception as e:
            self._recordAIFailure(str(e))
            raise

    def _isCircuitBreakerOpen(self) -> bool:
        """Check if circuit breaker is open"""
        if self.ai_failure_count >= self.ai_circuit_breaker_threshold:
            if self.ai_last_failure_time:
                time_since_failure = (datetime.now(UTC) - self.ai_last_failure_time).total_seconds()
                if time_since_failure < self.ai_circuit_breaker_timeout:
                    return True
                else:
                    # Reset circuit breaker after timeout
                    self.ai_failure_count = 0
                    self.ai_last_failure_time = None
        return False

    def _recordAIFailure(self, error: str):
        """Record AI failure for circuit breaker"""
        self.ai_failure_count += 1
        self.ai_last_failure_time = datetime.now(UTC)
        logger.warning(f"AI failure recorded ({self.ai_failure_count}/{self.ai_circuit_breaker_threshold}): {error}")

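    # Illustrative note on the breaker's behavior (values per the __init__ defaults):
    # after ai_circuit_breaker_threshold (5) recorded failures the breaker opens and
    # _callAIWithCircuitBreaker fails fast; once ai_circuit_breaker_timeout (300 s)
    # has passed since the last failure, _isCircuitBreakerOpen resets the counters
    # and calls are attempted again, and any success sets ai_failure_count back to 0.
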
    def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool:
        """Validate task plan structure and dependencies"""
        try:
            if not isinstance(task_plan, dict):
                return False

            if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list):
                return False

            # First pass: check structure and collect task IDs
            task_ids = set()
            for task in task_plan['tasks']:
                if not isinstance(task, dict):
                    return False

                required_fields = ['id', 'description', 'expected_outputs', 'success_criteria']
                if not all(field in task for field in required_fields):
                    return False

                # Check for duplicate task IDs
                if task['id'] in task_ids:
                    return False
                task_ids.add(task['id'])

                # Validate ai_prompt if present (optional field)
                if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str):
                    return False

            # Second pass: validate dependencies against the full set of task IDs,
            # so forward references to later tasks are accepted
            for task in task_plan['tasks']:
                dependencies = task.get('dependencies', [])
                if not isinstance(dependencies, list):
                    return False

                # Check that dependencies reference existing tasks
                for dep in dependencies:
                    if dep not in task_ids and dep != 'task_0':  # Allow task_0 as special case
                        return False

            return True

        except Exception as e:
            logger.error(f"Error validating task plan: {str(e)}")
            return False

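    # Illustrative sketch (hypothetical plan): a minimal dict that passes
    # _validateTaskPlan. Dependencies may reference any task in the plan,
    # including later ones, and "task_0" is accepted as a special case:
    #
    #     plan = {
    #         "overview": "two-step example",
    #         "tasks": [
    #             {"id": "task_1", "description": "analyze", "expected_outputs": ["a"],
    #              "success_criteria": ["done"], "dependencies": ["task_0"]},
    #             {"id": "task_2", "description": "summarize", "expected_outputs": ["b"],
    #              "success_criteria": ["done"], "dependencies": ["task_1"]},
    #         ],
    #     }
    #     # _validateTaskPlan(plan) -> True
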
    def _validateActions(self, actions: List[Dict[str, Any]], context: TaskContext) -> bool:
        """Validate generated actions"""
        try:
            if not isinstance(actions, list):
                logger.error("Actions must be a list")
                return False

            if len(actions) == 0:
                logger.warning("No actions generated")
                return False

            for i, action in enumerate(actions):
                if not isinstance(action, dict):
                    logger.error(f"Action {i} must be a dictionary")
                    return False

                # Check required fields
                required_fields = ['method', 'action', 'parameters', 'resultLabel']
                missing_fields = []
                for field in required_fields:
                    if field not in action or not action[field]:
                        missing_fields.append(field)

                if missing_fields:
                    logger.error(f"Action {i} missing required fields: {missing_fields}")
                    return False

                # Validate result label format
                result_label = action.get('resultLabel', '')
                if not result_label.startswith('task'):
                    logger.error(f"Action {i} result label must start with 'task': {result_label}")
                    return False

                # Validate parameters
                parameters = action.get('parameters', {})
                if not isinstance(parameters, dict):
                    logger.error(f"Action {i} parameters must be a dictionary")
                    return False

            logger.info(f"Successfully validated {len(actions)} actions")
            return True

        except Exception as e:
            logger.error(f"Error validating actions: {str(e)}")
            return False

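    # Illustrative sketch (hypothetical action): a minimal dict that passes
    # _validateActions - all four required fields present and non-empty, and a
    # resultLabel that starts with "task":
    #
    #     action = {
    #         "method": "document",
    #         "action": "extract",
    #         "parameters": {"documentList": ["docItem:doc_abc:file1.txt"], "aiPrompt": "..."},
    #         "resultLabel": "task1_action1_analysis",
    #     }
    #     # _validateActions([action], context) -> True
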
    # ===== Prompt Creation Methods =====

    def _createTaskPlanningPrompt(self, context: Dict[str, Any]) -> str:
        """Create prompt for task planning"""
        return f"""You are a task planning AI that analyzes user requests and creates structured task plans.

USER REQUEST: {context['user_request']}

AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])}

INSTRUCTIONS:
1. Analyze the user request and available documents
2. Break down the request into 2-4 meaningful high-level task steps
3. Focus on business outcomes, not technical operations
4. For document processing, create ONE task with a comprehensive AI prompt rather than multiple granular tasks
5. Each task should produce meaningful, usable outputs
6. Ensure proper handover between tasks using result labels
7. Return a JSON object with the exact structure shown below

TASK PLANNING PRINCIPLES:
- Combine related operations into single tasks (e.g., "Extract and analyze all candidate profiles" instead of separate "read file" and "analyze content" tasks)
- Use comprehensive AI prompts for document processing rather than multiple small tasks
- Focus on business value and outcomes
- Keep tasks at a meaningful level of abstraction
- Each task should produce results that can be used by subsequent tasks

REQUIRED JSON STRUCTURE:
{{
    "overview": "Brief description of the overall plan",
    "tasks": [
        {{
            "id": "task_1",
            "description": "Clear description of what this task accomplishes (business outcome)",
            "dependencies": ["task_0"], // IDs of tasks that must complete first
            "expected_outputs": ["output1", "output2"],
            "success_criteria": ["criteria1", "criteria2"],
            "required_documents": ["doc1", "doc2"],
            "estimated_complexity": "low|medium|high",
            "ai_prompt": "Comprehensive AI prompt for document processing tasks (if applicable)"
        }}
    ]
}}

EXAMPLES OF GOOD TASK DESCRIPTIONS:
- "Extract and analyze all candidate profiles to identify key qualifications and experience"
- "Create evaluation matrix and rate candidates against product designer criteria"
- "Generate comprehensive PowerPoint presentation for management decision"
- "Store final presentation in SharePoint for specified account"

EXAMPLES OF BAD TASK DESCRIPTIONS:
- "Open and read the PDF file" (too granular)
- "Identify table structure" (technical detail)
- "Convert data to CSV format" (implementation detail)

NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""

    async def _createActionDefinitionPrompt(self, context: TaskContext) -> str:
        """Create prompt for action generation with enhanced document extraction guidance and retry context"""
        task_step = context.task_step
        workflow = context.workflow
        available_docs = context.available_documents or []
        previous_results = context.previous_results or []
        improvements = context.improvements or []
        retry_count = context.retry_count or 0
        previous_action_results = context.previous_action_results or []
        previous_review_result = context.previous_review_result

        # Get available methods and actions with signatures
        methodList = self.service.getMethodsList()
        method_actions = {}
        for sig in methodList:
            if '.' in sig:
                method, rest = sig.split('.', 1)
                action = rest.split('(')[0]
                method_actions.setdefault(method, []).append((action, sig))

        # Get workflow history
        messageSummary = await self.service.summarizeChat(workflow.messages)

        # Get available documents and connections
        docRefs = self.service.getDocumentReferenceList()
        connRefs = self.service.getConnectionReferenceList()
        all_doc_refs = docRefs.get('chat', []) + docRefs.get('history', [])

        # Build AVAILABLE METHODS section
        available_methods_str = ''
        for method, actions in method_actions.items():
            available_methods_str += f"- {method}:\n"
            for action, sig in actions:
                available_methods_str += f"  - {action}: {sig}\n"

        # Get AI prompt from task step if available
        task_ai_prompt = task_step.ai_prompt or ''

        # Build retry context section
        retry_context = ""
        if retry_count > 0:
            retry_context = f"""
RETRY CONTEXT (Attempt {retry_count}):
Previous action results that failed or were incomplete:
"""
            for i, result in enumerate(previous_action_results):
                retry_context += f"- Action {i+1}: {result.actionMethod or 'unknown'}.{result.actionName or 'unknown'}\n"
                retry_context += f"  Status: {'success' if result.success else 'failed'}\n"
                retry_context += f"  Error: {result.error or 'None'}\n"
                retry_context += f"  Result: {(result.data.get('result', '') if result.data else '')[:100]}...\n"

            if previous_review_result:
                retry_context += f"""
Previous review feedback:
- Status: {previous_review_result.status or 'unknown'}
- Reason: {previous_review_result.reason or 'No reason provided'}
- Quality Score: {previous_review_result.quality_score or 0}/10
- Missing Outputs: {', '.join(previous_review_result.missing_outputs or [])}
- Unmet Criteria: {', '.join(previous_review_result.unmet_criteria or [])}
"""

        return f"""
You are an action generation AI that creates specific actions to accomplish a task step.

DOCUMENT REFERENCE TYPES:
- docItem: Reference to a single document. Format: "docItem:<id>:<filename>"
- docList: Reference to a group of documents under a label. Format: <label> (e.g., "task1_action2_results" or "docList:msg123:user_uploads").
- Each docList label maps to a list of docItem references (see AVAILABLE DOCUMENTS).
- A label like "task1_action2_results" refers to the output of action 2 in task 1.

TASK STEP: {task_step.description} (ID: {task_step.id})
EXPECTED OUTPUTS: {', '.join(task_step.expected_outputs or [])}
SUCCESS CRITERIA: {', '.join(task_step.success_criteria or [])}
TASK AI PROMPT: {task_ai_prompt if task_ai_prompt else 'None provided'}

CONTEXT - Chat History:
{messageSummary}

AVAILABLE METHODS AND ACTIONS (with signatures):
{available_methods_str}

AVAILABLE CONNECTIONS:
{chr(10).join(f"- {conn}" for conn in connRefs)}

AVAILABLE DOCUMENTS:
{chr(10).join(f"- {doc.documentsLabel} contains {', '.join(doc.documents)}" for doc in all_doc_refs)}

(Use the label as a value in documentList to refer to the group)

PREVIOUS RESULTS: {', '.join(previous_results) if previous_results else 'None'}
IMPROVEMENTS NEEDED: {improvements if improvements else 'None'}{retry_context}

ACTION GENERATION PRINCIPLES:
- Create meaningful actions per task step
- Use comprehensive AI prompts for document processing
- Focus on business outcomes, not technical operations
- Combine related operations into single actions when possible
- Use the task's AI prompt if provided, or create a comprehensive one
- Each action should produce meaningful, usable outputs
- For document extraction, ensure prompts are specific and detailed
- Include validation steps in extraction prompts
- If this is a retry, learn from previous failures and improve the approach
- Address specific issues mentioned in previous review feedback
- When specifying expectedDocumentFormats, ensure AI prompts explicitly request pure data without markdown formatting

INSTRUCTIONS:
- Generate actions to accomplish this task step using available documents, connections, and previous results
- Use docItem for single documents and docList labels for groups of documents as shown in AVAILABLE DOCUMENTS
- Always pass documentList as a LIST of references (docItem and/or docList)
- For resultLabel, use the format: "task{{task_id}}_action{{action_number}}_{{short_label}}" where:
  - {{task_id}} = the current task's id (e.g., 1)
  - {{action_number}} = the sequence number of the action within the task (e.g., 2)
  - {{short_label}} = a short, descriptive label for the output (e.g., "analysis_results")
  Example: "task1_action2_analysis_results"
- If this is a retry, ensure the new actions address the specific issues from previous attempts
- Follow the JSON structure below. All fields are required.

REQUIRED JSON STRUCTURE:
{{
    "actions": [
        {{
            "method": "method_name", // Use only the method name (e.g., "document")
            "action": "action_name", // Use only the action name (e.g., "extract")
            "parameters": {{
                "documentList": ["docItem:doc_abc:file1.txt", "task1_action2_results"],
                "aiPrompt": "Comprehensive AI prompt describing what to accomplish"
            }},
            "resultLabel": "task1_action3_analysis_results",
            "expectedDocumentFormats": [ // OPTIONAL: Specify expected document formats when needed
                {{
                    "extension": ".csv",
                    "mimeType": "text/csv",
                    "description": "Structured data output"
                }}
            ],
            "description": "What this action accomplishes (business outcome)"
        }}
    ]
}}

FIELD REQUIREMENTS:
- "method": Must be from AVAILABLE METHODS
- "action": Must be valid for the method
- "parameters": Method-specific, must include documentList as a list if required by the signature
- "resultLabel": Must follow the format above (e.g., "task1_action3_analysis_results")
- "expectedDocumentFormats": OPTIONAL - Only specify when you need to control output format
  - Use when you need specific file types (e.g., CSV for data, JSON for structured output)
  - Omit when format is flexible (e.g., folder queries with mixed file types)
  - Each format should specify: extension, mimeType, description
  - When using expectedDocumentFormats, ensure the aiPrompt explicitly requests pure data without markdown formatting
- "description": Clear summary of the business outcome

EXAMPLES OF GOOD ACTIONS:

1. Document analysis with specific output format (use expectedDocumentFormats):
{{
    "method": "document",
    "action": "extract",
    "parameters": {{
        "documentList": ["docItem:doc_57520394-6b6d-41c2-b641-bab3fc6d7f4b:candidate_1_profile.txt"],
        "aiPrompt": "Extract and analyze the candidate's qualifications, experience, skills, and suitability for the product designer position. Identify key strengths, relevant experience, technical skills, and any areas of concern. Provide a comprehensive assessment that can be used for evaluation."
    }},
    "resultLabel": "task1_action1_candidate_analysis",
    "expectedDocumentFormats": [
        {{
            "extension": ".json",
            "mimeType": "application/json",
            "description": "Structured candidate analysis data"
        }}
    ],
    "description": "Comprehensive analysis of candidate profile for evaluation"
}}

2. Multi-document processing with flexible output (omit expectedDocumentFormats):
{{
    "method": "document",
    "action": "extract",
    "parameters": {{
        "documentList": ["task1_action1_candidate_analysis", "task1_action2_candidate_analysis", "task1_action3_candidate_analysis"],
        "aiPrompt": "Compare all three candidate profiles and create an evaluation matrix. Rate each candidate on technical skills, experience level, cultural fit, portfolio quality, and communication skills. Provide clear rankings and recommendations for the product designer position."
    }},
    "resultLabel": "task1_action4_evaluation_matrix",
    "description": "Create comprehensive evaluation matrix comparing all candidates"
}}

3. Data extraction with specific CSV format:
{{
    "method": "document",
    "action": "extract",
    "parameters": {{
        "documentList": ["docItem:doc_abc:table_data.pdf"],
        "aiPrompt": "Extract all table data and convert to structured CSV format with proper headers and data types. IMPORTANT: Deliver pure CSV data without any markdown formatting, code blocks, or additional text. Output only the CSV content with proper headers and data rows."
    }},
    "resultLabel": "task1_action2_structured_data",
    "expectedDocumentFormats": [
        {{
            "extension": ".csv",
            "mimeType": "text/csv",
            "description": "Structured table data in CSV format"
        }}
    ],
    "description": "Extract and structure table data for analysis"
}}

NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""

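    # Illustrative sketch: how the signature strings from getMethodsList() are split
    # into (method, action) pairs above. For a hypothetical signature:
    #
    #     sig = "document.extract(documentList, aiPrompt)"
    #     method, rest = sig.split('.', 1)   # -> "document", "extract(documentList, aiPrompt)"
    #     action = rest.split('(')[0]        # -> "extract"
    #     # method_actions becomes {"document": [("extract", sig)]}
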
    def _createResultReviewPrompt(self, review_context: ReviewContext) -> str:
        """Create prompt for result review"""
        task_step = review_context.task_step
        step_result = review_context.step_result or {}

        # Create serializable version of step_result with only metadata (no document content)
        step_result_serializable = {
            'task_step': {
                'id': task_step.id,
                'description': task_step.description,
                'expected_outputs': task_step.expected_outputs or [],
                'success_criteria': task_step.success_criteria or []
            },
            'action_results': [],
            'successful_actions': step_result.get('successful_actions', 0),
            'total_actions': step_result.get('total_actions', 0),
            'results_count': len(step_result.get('results', [])),
            'errors_count': len(step_result.get('errors', []))
        }

        # Convert action_results to serializable format with only metadata (no document content)
        for action_result in review_context.action_results or []:
            # Extract only document metadata, not content
            documents_metadata = []
            for doc in action_result.documents or []:
                if hasattr(doc, 'filename'):
                    documents_metadata.append({
                        'filename': doc.filename,
                        'fileSize': getattr(doc, 'fileSize', 0),
                        'mimeType': getattr(doc, 'mimeType', 'unknown')
                    })
                elif isinstance(doc, dict):
                    documents_metadata.append({
                        'filename': doc.get('filename', 'unknown'),
                        'fileSize': doc.get('fileSize', 0),
                        'mimeType': doc.get('mimeType', 'unknown')
                    })

            result_text = action_result.data.get('result', '')
            serializable_action_result = {
                'status': 'completed' if action_result.success else 'failed',
                'result_summary': result_text[:200] + '...' if len(result_text) > 200 else result_text,
                'error': action_result.error,
                'resultLabel': action_result.data.get('resultLabel', ''),
                'documents_count': len(documents_metadata),
                'documents_metadata': documents_metadata,
                'actionId': action_result.actionId,
                'actionMethod': action_result.actionMethod,
                'actionName': action_result.actionName,
                'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if result_text.strip() else 'none'
            }
            step_result_serializable['action_results'].append(serializable_action_result)

        return f"""You are a result review AI that evaluates task step completion and decides on next actions.

TASK STEP: {task_step.description}
EXPECTED OUTPUTS: {', '.join(task_step.expected_outputs or [])}
SUCCESS CRITERIA: {', '.join(task_step.success_criteria or [])}

STEP RESULT: {json.dumps(step_result_serializable, indent=2, ensure_ascii=False)}

INSTRUCTIONS:
1. Evaluate if the task step was completed successfully
2. Check if all expected outputs were produced
3. Verify if success criteria were met
4. Decide on next action: continue, retry, or fail
5. If retry, provide specific improvements needed

IMPORTANT NOTES:
- Actions can produce either text results OR documents (or both)
- Empty result_summary is acceptable if documents were produced (documents_count > 0)
- Focus on whether the action achieved its intended purpose, not just text output
- Document-based actions (like file extractions) often have empty text results but successful document outputs
- Check the 'success_indicator' field: 'documents' means success via document output, 'text_result' means success via text, 'none' means no output

REQUIRED JSON STRUCTURE:
{{
    "status": "success|retry|failed",
    "reason": "Explanation of the decision",
    "improvements": "Specific improvements for retry (if status is retry)",
    "quality_score": 1-10,
    "missing_outputs": ["output1", "output2"],
    "met_criteria": ["criteria1", "criteria2"],
    "unmet_criteria": ["criteria3", "criteria4"]
}}

NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""

    # ===== HELPER METHODS FOR WORKFLOW PHASES =====

    async def _generateActionsForTaskStep(self, context: TaskContext) -> List[Dict[str, Any]]:
        """Generate actions for a specific task step with enhanced retry context"""
        try:
            # Prepare prompt for action generation
            prompt = await self._createActionDefinitionPrompt(context)

            # Call AI with circuit breaker
            response = await self._callAIWithCircuitBreaker(prompt, "action_generation")

            # Parse and validate actions
            actions = self._parseActionResponse(response)

            # Validate actions
            if not self._validateActions(actions, context):
                logger.error("Generated actions failed validation")
                raise Exception("AI-generated actions failed validation - AI is required for action generation")

            logger.info(f"Generated {len(actions)} actions for task step")
            return actions

        except Exception as e:
            logger.error(f"Error generating actions for task step: {str(e)}")
            raise Exception(f"AI is required for action generation but failed: {str(e)}")

    async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> ActionResult:
        """Execute a single action and return ActionResult with enhanced document processing"""
        try:
            # Enhance parameters with expected document formats if specified
            enhanced_parameters = action.execParameters.copy()
            if action.expectedDocumentFormats:
                enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats
                logger.info(f"Action {action.execMethod}.{action.execAction} expects formats: {action.expectedDocumentFormats}")

            # Execute the actual method action using the service center
            result = await self.service.executeAction(
                methodName=action.execMethod,
                actionName=action.execAction,
                parameters=enhanced_parameters
            )

            # Always use the execResultLabel from the action definition
            result_label = action.execResultLabel

            # Update action based on result
            if result.success:
                action.setSuccess()
                action.result = result.data.get("result", "")
                action.execResultLabel = result_label

                # Create and store message in workflow for successful action
                await self._createActionMessage(action, result, workflow, result_label)

            else:
                action.setError(result.error or "Action execution failed")

            # Enhanced result processing with better document handling
            documents = result.data.get("documents", [])
            processed_documents = []

            # Process documents with better metadata extraction
            for doc in documents:
                if hasattr(doc, 'filename') and doc.filename:
                    # Document object with proper metadata
                    mime_type = getattr(doc, 'mimeType', 'application/octet-stream')

                    # Enhanced MIME type detection for document objects
                    if mime_type == "application/octet-stream":
                        mime_type = self._detectMimeTypeFromDocument(doc, doc.filename)

                    processed_documents.append({
                        'filename': doc.filename,
                        'fileSize': getattr(doc, 'fileSize', 0),
                        'mimeType': mime_type,
                        'content': getattr(doc, 'content', ''),
                        'document': doc
                    })
                elif isinstance(doc, dict):
                    # Dictionary document with metadata
                    filename = doc.get('documentName', doc.get('filename', f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"))
                    fileSize = doc.get('fileSize', len(str(doc.get('documentData', ''))))
                    mimeType = doc.get('mimeType', 'application/octet-stream')

                    # Enhanced MIME type detection for dictionary documents
                    if mimeType == "application/octet-stream":
                        document_data = doc.get('documentData', '')
                        mimeType = self._detectMimeTypeFromContent(document_data, filename)

                    processed_documents.append({
                        'filename': filename,
                        'fileSize': fileSize,
                        'mimeType': mimeType,
                        'content': doc.get('documentData', ''),
                        'document': doc
                    })
                else:
                    # Fallback for unknown document types
                    logger.warning(f"Unknown document type for action {action.execMethod}.{action.execAction}: {type(doc)}")
                    filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"

                    # Try to detect MIME type for unknown document types
                    mimeType = self._detectMimeTypeFromContent(doc, filename)

                    processed_documents.append({
                        'filename': filename,
                        'fileSize': 0,
                        'mimeType': mimeType,
                        'content': str(doc),
                        'document': doc
                    })

            # Create ActionResult with processed data
            return ActionResult(
                success=result.success,
                data={
                    "result": result.data.get("result", ""),
                    "documents": processed_documents,
                    "actionId": action.id,
                    "actionMethod": action.execMethod,
                    "actionName": action.execAction,
                    "resultLabel": result_label
                },
                metadata={
                    "actionId": action.id,
                    "actionMethod": action.execMethod,
                    "actionName": action.execAction,
                    "resultLabel": result_label
                },
                validation=[],
                error=result.error or ""
            )

        except Exception as e:
            logger.error(f"Error executing single action: {str(e)}")
            action.setError(str(e))
            # Keep the data envelope shape consistent with the success path
            return ActionResult(
                success=False,
                data={
                    "result": "",
                    "documents": [],
                    "actionId": action.id,
                    "actionMethod": action.execMethod,
                    "actionName": action.execAction,
                    "resultLabel": action.execResultLabel
                },
                metadata={
                    "actionId": action.id,
                    "actionMethod": action.execMethod,
                    "actionName": action.execAction
                },
                validation=[],
                error=str(e)
            )

    async def _createActionMessage(self, action: TaskAction, result: Any, workflow: ChatWorkflow, result_label: str = None) -> None:
        """Create and store a message for the action result in the workflow with enhanced document processing"""
        try:
            # Get result data
            result_data = result.data if hasattr(result, 'data') else {}
            documents_data = result_data.get("documents", [])
            if result_label is None:
                result_label = action.execResultLabel

            # Create message data
            message_data = {
                "workflowId": workflow.id,
                "role": "assistant",
                "message": f"Executed action {action.execMethod}.{action.execAction}",
                "status": "step",
                "sequenceNr": len(workflow.messages) + 1,
                "publishedAt": datetime.now(UTC).isoformat(),
                "actionId": action.id,
                "actionMethod": action.execMethod,
                "actionName": action.execAction,
                "documentsLabel": result_label,  # Use intent label from action definition
                "documents": []
            }

            # Process documents if any
            if documents_data:
                processed_documents = []
                for doc_data in documents_data:
                    try:
                        # Handle different document data formats
                        if isinstance(doc_data, dict):
                            # Enhanced document processing for dictionary format
                            document_name = doc_data.get("documentName", doc_data.get("filename", f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"))
                            document_data = doc_data.get("documentData", {})
                            file_size = doc_data.get("fileSize", 0)
                            mime_type = doc_data.get("mimeType", "application/octet-stream")
                        elif hasattr(doc_data, 'filename'):
                            # Document object format
                            document_name = doc_data.filename
                            document_data = getattr(doc_data, 'content', {})
                            file_size = getattr(doc_data, 'fileSize', 0)
                            mime_type = getattr(doc_data, 'mimeType', "application/octet-stream")
                        else:
                            # Fallback for unknown formats
                            document_name = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
                            document_data = doc_data
                            file_size = len(str(doc_data))
                            mime_type = "application/octet-stream"

                        # Enhanced MIME type detection using service center
                        if mime_type == "application/octet-stream":
                            mime_type = self._detectMimeTypeFromContent(document_data, document_name)

                        # Convert document data to string content
                        content = self._convertDocumentDataToString(document_data, self._getFileExtension(document_name))

                        # Validate content before creating file
                        if not content or content.strip() == "":
                            logger.warning(f"Empty content for document {document_name}, skipping")
                            continue

                        # Create file in database
                        file_id = self.service.createFile(
                            fileName=document_name,
                            mimeType=mime_type,
                            content=content,
                            base64encoded=False
                        )

                        if not file_id:
                            logger.error(f"Failed to create file for document {document_name}")
                            continue

                        # Create ChatDocument object
                        document = self.service.createDocument(
                            fileName=document_name,
                            mimeType=mime_type,
                            content=content,
                            base64encoded=False
                        )

                        if document:
                            processed_documents.append(document)
                            logger.info(f"Created document: {document_name} with file ID: {file_id} and MIME type: {mime_type}")
                        else:
                            logger.error(f"Failed to create ChatDocument object for {document_name}")

                    except Exception as e:
                        # Guard against non-dict doc_data when reporting the failure
                        doc_name = doc_data.get('documentName', 'unknown') if isinstance(doc_data, dict) else 'unknown'
                        logger.error(f"Error processing document {doc_name}: {str(e)}")
                        continue

                # Update message with processed documents
                message_data["documents"] = processed_documents

            # Create message using interface
            message = self.chatInterface.createWorkflowMessage(message_data)
            if message:
                workflow.messages.append(message)
                logger.info(f"Created action message for {action.execMethod}.{action.execAction} with {len(message_data.get('documents', []))} documents")
            else:
                logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}")

        except Exception as e:
            logger.error(f"Error creating action message: {str(e)}")

    def _getFileExtension(self, filename: str) -> str:
        """Extract file extension from filename"""
        return self.service.getFileExtension(filename)

    def _getMimeType(self, extension: str) -> str:
        """Get MIME type based on file extension"""
        return self.service.getMimeTypeFromExtension(extension)

    def _detectMimeTypeFromContent(self, content: Any, filename: str) -> str:
        """
        Detect MIME type from content and filename using service center.
        Only returns a detected MIME type if it's better than application/octet-stream.

        Args:
            content: Content data (string, dict, or other)
            filename: Name of the file

        Returns:
            str: Detected MIME type, or application/octet-stream if detection fails
        """
        try:
            # Convert content to bytes for MIME type detection
            if isinstance(content, str):
                file_bytes = content.encode('utf-8')
            elif isinstance(content, dict):
                file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8')
            else:
                file_bytes = str(content).encode('utf-8')

            # Use service center's MIME type detection
            detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename)
            if detected_mime_type != "application/octet-stream":
                return detected_mime_type
            return "application/octet-stream"
        except Exception as e:
            logger.warning(f"Error in MIME type detection for {filename}: {str(e)}")
            return 'application/octet-stream'

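    # Illustrative example of the helper above (the concrete detection rules live in
    # ServiceCenter.detectContentTypeFromData, so the detected value is an assumption):
    #
    #   mime = self._detectMimeTypeFromContent('{"a": 1}', "result.json")
    #   # -> e.g. "application/json" when detection succeeds,
    #   #    otherwise the "application/octet-stream" fallback
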
    def _detectMimeTypeFromDocument(self, document: Any, filename: str) -> str:
        """
        Detect MIME type from document object using service center.
        Only returns a detected MIME type if it's better than application/octet-stream.

        Args:
            document: Document object with content attribute
            filename: Name of the file

        Returns:
            str: Detected MIME type, or application/octet-stream if detection fails
        """
        try:
            # Get document content as bytes for MIME type detection
            content = getattr(document, 'content', '')
            if isinstance(content, str):
                file_bytes = content.encode('utf-8')
            else:
                file_bytes = str(content).encode('utf-8')

            # Use service center's MIME type detection
            detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename)
            if detected_mime_type != "application/octet-stream":
                return detected_mime_type
            return "application/octet-stream"
        except Exception as e:
            logger.warning(f"Error in MIME type detection for document {filename}: {str(e)}")
            return 'application/octet-stream'

    def _convertDocumentDataToString(self, document_data: Any, file_extension: str) -> str:
        """Convert document data to string content based on file type with enhanced processing"""
        try:
            # Handle None or empty data
            if document_data is None:
                return ""

            # Handle string data directly
            if isinstance(document_data, str):
                return document_data

            # Handle dictionary data
            if isinstance(document_data, dict):
                # For JSON files, return formatted JSON
                if file_extension == 'json':
                    return json.dumps(document_data, indent=2, ensure_ascii=False)

                # For text files, try to extract text content
                elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']:
                    # Look for common text content fields
                    text_fields = ['content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data']
                    for field in text_fields:
                        if field in document_data:
                            content = document_data[field]
                            if isinstance(content, str):
                                return content
                            elif isinstance(content, (dict, list)):
                                return json.dumps(content, indent=2, ensure_ascii=False)

                    # If no text field found, convert entire dict to JSON
                    return json.dumps(document_data, indent=2, ensure_ascii=False)

                # For CSV files, try to extract table data
                elif file_extension == 'csv':
                    # Look for CSV-specific fields first, then general content fields
                    csv_fields = ['table_data', 'csv_data', 'rows', 'data', 'content', 'text']
                    for field in csv_fields:
                        if field in document_data:
                            content = document_data[field]
                            if isinstance(content, str):
                                return content
                            elif isinstance(content, list):
                                # Convert list of rows to CSV format
                                if content and isinstance(content[0], (list, dict)):
                                    import csv
                                    import io
                                    output = io.StringIO()
                                    if isinstance(content[0], dict):
                                        # List of dictionaries
                                        fieldnames = content[0].keys()
                                        writer = csv.DictWriter(output, fieldnames=fieldnames)
                                        writer.writeheader()
                                        writer.writerows(content)
                                    else:
                                        # List of lists
                                        writer = csv.writer(output)
                                        writer.writerows(content)
                                    return output.getvalue()

                    # Fallback to JSON if no CSV data found
                    return json.dumps(document_data, indent=2, ensure_ascii=False)

                # For other file types, convert to JSON
                else:
                    return json.dumps(document_data, indent=2, ensure_ascii=False)

            # Handle list data
            elif isinstance(document_data, list):
                if file_extension == 'csv':
                    # Convert list to CSV format
                    import csv
                    import io
                    output = io.StringIO()
                    if document_data and isinstance(document_data[0], dict):
                        # List of dictionaries
                        fieldnames = document_data[0].keys()
                        writer = csv.DictWriter(output, fieldnames=fieldnames)
                        writer.writeheader()
                        writer.writerows(document_data)
                    else:
                        # List of lists
                        writer = csv.writer(output)
                        writer.writerows(document_data)
                    return output.getvalue()
                else:
                    return json.dumps(document_data, indent=2, ensure_ascii=False)

            # Handle other data types
            else:
                return str(document_data)

        except Exception as e:
            logger.error(f"Error converting document data to string: {str(e)}")
            return str(document_data)

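    # Illustrative examples of the conversion rules above (csv.DictWriter emits
    # '\r\n' line endings by default):
    #
    #   self._convertDocumentDataToString({"rows": [{"a": 1, "b": 2}]}, "csv")
    #   # -> 'a,b\r\n1,2\r\n'
    #   self._convertDocumentDataToString({"note": "hi"}, "json")
    #   # -> '{\n  "note": "hi"\n}'
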
    async def _performTaskReview(self, review_context: ReviewContext) -> ReviewResult:
        """Perform AI-based task review with enhanced retry logic"""
        try:
            # Prepare prompt for result review
            prompt = self._createResultReviewPrompt(review_context)

            # Call AI with circuit breaker
            response = await self._callAIWithCircuitBreaker(prompt, "result_review")

            # Parse review result
            review_dict = self._parseReviewResponse(response)

            # Add default values for missing fields
            review_dict.setdefault('status', 'unknown')
            review_dict.setdefault('reason', 'No reason provided')
            review_dict.setdefault('quality_score', 5)

            # Enhanced retry logic based on result quality
            if review_dict.get('status') == 'retry':
                # Analyze the specific issues for better retry guidance
                action_results = review_context.action_results or []
                if action_results:
                    # Only treat empty results as a problem when no documents were produced either
                    has_empty_results = any(
                        not result.data.get('result', '').strip() and
                        not result.documents
                        for result in action_results
                        if result.success
                    )

                    has_incomplete_metadata = any(
                        any(doc.get('filename') == 'unknown' for doc in result.documents or [])
                        for result in action_results
                        if result.success
                    )

                    # Collect additional improvement hints; normalize to a list so the
                    # value matches the ReviewResult.improvements field
                    extra_improvements = []
                    if has_empty_results:
                        extra_improvements.append(
                            "Ensure the document extraction returns actual content, not empty results. "
                            "Check if the AI prompt is specific enough to extract meaningful data."
                        )
                    if has_incomplete_metadata:
                        extra_improvements.append(
                            "Ensure proper document metadata is extracted including filename, size, and mime type. "
                            "The document processing should provide complete file information."
                        )
                    if extra_improvements:
                        improvements = review_dict.get('improvements') or []
                        if isinstance(improvements, str):
                            improvements = [improvements]
                        review_dict['improvements'] = improvements + extra_improvements

                    # If we have specific issues, adjust quality score
                    if has_empty_results or has_incomplete_metadata:
                        review_dict['quality_score'] = max(1, review_dict.get('quality_score', 5) - 2)

            # Create ReviewResult model
            return ReviewResult(
                status=review_dict.get('status', 'unknown'),
                reason=review_dict.get('reason', 'No reason provided'),
                improvements=review_dict.get('improvements', []),
                quality_score=review_dict.get('quality_score', 5),
                missing_outputs=review_dict.get('missing_outputs', []),
                met_criteria=review_dict.get('met_criteria', []),
                unmet_criteria=review_dict.get('unmet_criteria', []),
                confidence=review_dict.get('confidence', 0.5)
            )

        except Exception as e:
            logger.error(f"Error performing task review: {str(e)}")
            return ReviewResult(
                status='success',  # Default to success to avoid blocking workflow
                reason=f'Review failed: {str(e)}',
                quality_score=5,
                confidence=0.5
            )

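    # Worked example of the quality-score adjustment above: a review that comes back
    # as "retry" with quality_score 6, where at least one successful action produced
    # neither text nor documents, is downgraded to max(1, 6 - 2) = 4, and a concrete
    # improvement hint is appended for the regeneration pass.
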
    def _getPreviousResultsFromActions(self, task_actions: List[TaskAction]) -> List[str]:
        """Get list of previous results from completed actions and workflow messages"""
        results = []

        # Get results from action objects
        for action in task_actions:
            if action.execResultLabel and action.isSuccessful():
                results.append(action.execResultLabel)

        # Get results from workflow messages (for actions that have been executed)
        if hasattr(self, 'workflow') and self.workflow and self.workflow.messages:
            for message in self.workflow.messages:
                if (message.role == 'assistant' and
                        message.status == 'step' and
                        message.documentsLabel and
                        message.documentsLabel not in results):
                    results.append(message.documentsLabel)

        return results

    def _calculateTaskQualityMetrics(self, task_step: TaskStep, action_results: List[ActionExecutionResult]) -> Dict[str, Any]:
        """Calculate quality metrics for task step results"""
        try:
            quality_score = 0
            confidence = 0

            # Count successful actions
            successful_actions = sum(1 for result in action_results if result.success)
            total_actions = len(action_results)

            if total_actions > 0:
                success_rate = successful_actions / total_actions
                quality_score = int(success_rate * 10)  # Scale to 0-10
                confidence = min(success_rate, 1.0)

            return {
                'score': quality_score,
                'confidence': confidence,
                'successful_actions': successful_actions,
                'total_actions': total_actions
            }

        except Exception as e:
            logger.error(f"Error calculating task quality metrics: {str(e)}")
            return {'score': 0, 'confidence': 0, 'successful_actions': 0, 'total_actions': 0}

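    # Worked example: with 3 successful actions out of 4, success_rate is 0.75, so the
    # metrics come out as {'score': 7, 'confidence': 0.75, 'successful_actions': 3,
    # 'total_actions': 4} (int(0.75 * 10) truncates to 7).
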
    # ===== BASIC HELPER METHODS =====

    def _getAvailableDocuments(self, workflow: ChatWorkflow) -> List[str]:
        """Get list of available documents in the workflow"""
        documents = []
        for message in workflow.messages:
            for doc in message.documents:
                documents.append(doc.filename)
        return documents

    def _getPreviousResults(self, task: TaskItem) -> List[str]:
        """Get list of previous results from completed actions"""
        results = []
        for action in task.actionList:
            if action.execResultLabel:
                results.append(action.execResultLabel)
        return results

    def _parseTaskPlanResponse(self, response: str) -> Dict[str, Any]:
        """Parse AI response into task plan structure"""
        try:
            # Extract JSON from response
            json_start = response.find('{')
            json_end = response.rfind('}') + 1
            if json_start == -1 or json_end == 0:
                raise ValueError("No JSON found in response")

            json_str = response[json_start:json_end]
            task_plan = json.loads(json_str)

            # Validate structure
            if 'tasks' not in task_plan:
                raise ValueError("Task plan missing 'tasks' field")

            return task_plan

        except Exception as e:
            logger.error(f"Error parsing task plan response: {str(e)}")
            return {'tasks': []}

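    # Illustrative example of the brace-slicing extraction used by the parsers above:
    #
    #   response = 'Here is the plan: {"tasks": [{"id": "t1"}]} Hope that helps!'
    #   # find('{') / rfind('}') isolate the outermost JSON object, so parsing yields
    #   # {'tasks': [{'id': 't1'}]}; a response without braces falls back to {'tasks': []}
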
    def _parseActionResponse(self, response: str) -> List[Dict[str, Any]]:
        """Parse AI response into action list"""
        try:
            # Extract JSON from response
            json_start = response.find('{')
            json_end = response.rfind('}') + 1
            if json_start == -1 or json_end == 0:
                raise ValueError("No JSON found in response")

            json_str = response[json_start:json_end]
            action_data = json.loads(json_str)

            # Validate structure
            if 'actions' not in action_data:
                raise ValueError("Action response missing 'actions' field")

            return action_data['actions']

        except Exception as e:
            logger.error(f"Error parsing action response: {str(e)}")
            return []

    def _parseReviewResponse(self, response: str) -> Dict[str, Any]:
        """Parse AI response into review result"""
        try:
            # Extract JSON from response
            json_start = response.find('{')
            json_end = response.rfind('}') + 1
            if json_start == -1 or json_end == 0:
                raise ValueError("No JSON found in response")

            json_str = response[json_start:json_end]
            review = json.loads(json_str)

            # Validate structure
            if 'status' not in review:
                raise ValueError("Review response missing 'status' field")

            return review

        except Exception as e:
            logger.error(f"Error parsing review response: {str(e)}")
            return {'status': 'failed', 'reason': f'Parse error: {str(e)}'}

    async def _callAI(self, prompt: str, context: str) -> str:
        """Call AI service with prompt"""
        try:
            # Use the existing AI call mechanism through service
            if hasattr(self, 'service') and self.service:
                # Ensure service is properly initialized
                if hasattr(self.service, 'callAiTextBasic'):
                    response = await self.service.callAiTextBasic(prompt)
                    return response
                else:
                    raise Exception("Service does not have callAiTextBasic method")
            else:
                raise Exception("No service available for AI calls")

        except Exception as e:
            logger.error(f"Error calling AI for {context}: {str(e)}")
            raise

    # ===== WORKFLOW FEEDBACK GENERATION =====

    async def generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
        """Generate feedback message for workflow completion"""
        try:
            # Count messages by role
            user_messages = [msg for msg in workflow.messages if msg.role == 'user']
            assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant']

            # Generate summary feedback
            feedback = "Workflow completed.\n\n"
            feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n"

            # Add final status
            if workflow.status == "completed":
                feedback += "All tasks completed successfully."
            elif workflow.status == "partial":
                feedback += "Some tasks completed with partial success."
            else:
                feedback += f"Workflow status: {workflow.status}"

            return feedback

        except Exception as e:
            logger.error(f"Error generating workflow feedback: {str(e)}")
            return "Workflow processing completed."

    # ===== UNIFIED WORKFLOW EXECUTION =====

    async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> WorkflowResult:
        """Execute a unified workflow with state management and action-level validation"""
        try:
            logger.info(f"Starting unified workflow execution for workflow {workflow.id}")
            start_time = time.time()

            # Initialize chat manager with workflow
            await self.initialize(workflow)

            # Process file IDs if provided
            documents = []
            if hasattr(userInput, 'listFileId') and userInput.listFileId:
                documents = await self.processFileIds(userInput.listFileId)
                logger.info(f"Processed {len(documents)} documents")

            # Calculate and update user input stats
            user_input_size = self.service.calculateUserInputSize(userInput.prompt)
            self.service.updateWorkflowStats(eventLabel="userinput", bytesReceived=user_input_size)

            # Phase 1: High-Level Task Planning
            logger.info("--- PHASE 1: HIGH-LEVEL TASK PLANNING ---")
            task_plan = await self.planHighLevelTasks(userInput.prompt, workflow)

            # Update stats for task planning
            task_plan_size = self.service.calculateObjectSize(task_plan)
            self.service.updateWorkflowStats(eventLabel="taskplan", bytesSent=task_plan_size)

            # Create user-friendly task plan log
            tasks_count = len(task_plan.tasks)
            task_descriptions = "\n".join([f"- {task.description}" for task in task_plan.tasks])
            self.chatInterface.createWorkflowLog({
                "workflowId": workflow.id,
                "message": f"Planning completed: {tasks_count} tasks identified\n{task_descriptions}",
                "type": "info",
                "status": "running",
                "progress": 15,
                "agentName": "System"
            })

            # Log task plan details (without document content)
            task_plan_log = {
                'overview': task_plan.overview,
                'tasks_count': len(task_plan.tasks),
                'tasks': []
            }
            for task in task_plan.tasks:
                task_log = {
                    'id': task.id,
                    'description': task.description,
                    'dependencies': task.dependencies or [],
                    'expected_outputs': task.expected_outputs or [],
                    'success_criteria': task.success_criteria or [],
                    'required_documents_count': len(task.required_documents or []),
                    'estimated_complexity': task.estimated_complexity or ''
                }
                task_plan_log['tasks'].append(task_log)
            logger.debug(f"TASK PLAN CREATED: {json.dumps(task_plan_log, indent=2, ensure_ascii=False)}")

            # Execute each task step with state management
            workflow_results = []
            previous_results = []

            for i, task_step in enumerate(task_plan.tasks):
                task_description = task_step.description
                logger.info(f"=== PROCESSING TASK {i+1}/{len(task_plan.tasks)}: {task_description} ===")

                # Create user-friendly task start log
                progress = 20 + (i * 60 // len(task_plan.tasks))
                self.chatInterface.createWorkflowLog({
                    "workflowId": workflow.id,
                    "message": f"Executing task {i+1}/{len(task_plan.tasks)}: {task_description}",
                    "type": "info",
                    "status": "running",
                    "progress": progress,
                    "agentName": "System"
                })

                # Create context for task execution
                task_context = TaskContext(
                    task_step=task_step,
                    workflow=workflow,
                    workflow_id=workflow.id,
                    available_documents=self._getAvailableDocuments(workflow),
                    previous_results=previous_results
                )

                # Execute task with state management
                task_result = await self.executeTaskWithStateManagement(task_step, workflow, task_context)

                # Log task result
                if task_result.success:
                    self.chatInterface.createWorkflowLog({
                        "workflowId": workflow.id,
                        "message": f"🎯 Task {i+1} completed successfully",
                        "type": "success",
                        "status": "running",
                        "progress": progress + 20
                    })

                    # Update previous results for next task
                    # Note: TaskResult doesn't have a successful_actions field, so use a placeholder label
                    previous_results = [f"task_{i+1}_results"]

                else:
                    # Task failed - provide detailed feedback and stop workflow
                    reason = task_result.error or 'Unknown error'

                    self.chatInterface.createWorkflowLog({
                        "workflowId": workflow.id,
                        "message": f"❌ Task {i+1} failed: {reason}",
                        "type": "error",
                        "status": "failed",
                        "progress": progress + 15
                    })

                    # Generate detailed failure feedback and surface it in the log
                    failure_feedback = self._generateTaskFailureFeedback(task_step, task_result, i+1, len(task_plan.tasks))
                    logger.info(failure_feedback)

                    # Stop workflow and return failure
                    return WorkflowResult(
                        status='failed',
                        error=reason,
                        phase='execution',
                        completed_tasks=i,
                        total_tasks=len(task_plan.tasks),
                        execution_time=time.time() - start_time,
                        final_results_count=0
                    )

                # Add task result to workflow results
                workflow_results.append(task_result)

            # Calculate total processing time
            total_processing_time = time.time() - start_time

            # Create final success log
            self.chatInterface.createWorkflowLog({
                "workflowId": workflow.id,
                "message": f"🎉 Workflow completed ({len(workflow_results)}/{len(task_plan.tasks)} tasks)",
                "type": "success",
                "status": "completed",
                "progress": 100
            })

            # Create workflow summary
            workflow_summary = WorkflowResult(
                status='completed',
                completed_tasks=len(workflow_results),
                total_tasks=len(task_plan.tasks),
                execution_time=total_processing_time,
                final_results_count=len(previous_results)
            )

            logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {len(workflow_results)}/{len(task_plan.tasks)} tasks successful ===")
            logger.debug(f"FINAL WORKFLOW SUMMARY: {json.dumps(workflow_summary.dict(), indent=2, ensure_ascii=False)}")
            return workflow_summary

        except Exception as e:
            logger.error(f"Error in unified workflow execution: {str(e)}")
            # Create error log for user
            self.chatInterface.createWorkflowLog({
                "workflowId": workflow.id,
                "message": f"Workflow execution failed: {str(e)}",
                "type": "error",
                "status": "failed",
                "progress": 100,
                "agentName": "System"
            })
            return WorkflowResult(
                status='failed',
                error=str(e),
                phase='execution',
                completed_tasks=0,
                total_tasks=0,
                execution_time=0,
                final_results_count=0
            )

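    # Illustrative call site (assumes an initialized manager instance `manager` and a
    # loaded ChatWorkflow `workflow`; both names are placeholders, not part of this module):
    #
    #   result = await manager.executeUnifiedWorkflow(user_input, workflow)
    #   if result.status == 'completed':
    #       print(f"{result.completed_tasks}/{result.total_tasks} tasks in {result.execution_time:.1f}s")
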
    def _generateTaskFailureFeedback(self, task_step: TaskStep, task_result: TaskResult, task_number: int, total_tasks: int) -> str:
        """Generate detailed feedback for task failure"""

        # task_number is the 1-based index of the failing task, so the number of
        # tasks that completed successfully before it is task_number - 1
        feedback = f"""
Workflow execution stopped due to task failure.

PROGRESS: {task_number - 1}/{total_tasks} tasks completed successfully
FAILED TASK ({task_number}/{total_tasks}): {task_step.description}

FAILURE DETAILS:
- Task ID: {task_result.taskId}
- Status: {task_result.status}
- Success: {task_result.success}
- Error: {task_result.error or 'Unknown error'}

The workflow cannot continue because this task is essential for subsequent steps.
Please review the task requirements and try again with a different input or approach.
"""

        return feedback

    # ===== STATE-MANAGED TASK EXECUTION AND VALIDATION =====

    async def executeTaskWithStateManagement(self, task_step: TaskStep, workflow: ChatWorkflow, context: TaskContext) -> TaskResult:
        """Execute task with state management and action-level retry logic"""
        try:
            logger.info(f"Executing task with state management: {task_step.description}")

            # Initialize task state
            state = TaskExecutionState(task_step)

            while state.canRetry():
                logger.info(f"Task execution attempt {state.retry_count + 1}/{state.max_retries}")

                # Generate actions (first time or after regeneration)
                if state.retry_count == 0:
                    actions = await self.defineTaskActions(task_step, workflow, context.previous_results)
                else:
                    # Regenerate actions with failure context
                    actions = await self._regenerateTaskActionsWithFailureContext(task_step, state, context)

                if not actions:
                    logger.warning("No actions defined for task, marking as failed")
                    return TaskResult(
                        taskId=task_step.id,
                        status=TaskStatus.FAILED,
                        success=False,
                        error='No actions could be generated for task'
                    )

                # Execute actions with individual validation
                for i, action in enumerate(actions):
                    logger.info(f"Executing action {i+1}/{len(actions)}: {action.execMethod}.{action.execAction}")

                    # Execute action with validation
                    result = await self.executeActionWithValidation(action, workflow, context)

                    if result.validation.get('status') == 'success':
                        state.addSuccessfulAction(result)
                        logger.info(f"Action {i+1} completed successfully")

                    elif result.validation.get('status') == 'retry':
                        # Retry individual action
                        improvements = result.validation.get('improvements', [])
                        retry_result = await self.retryActionWithImprovements(action, result, improvements)

                        if retry_result.validation.get('status') == 'success':
                            state.addSuccessfulAction(retry_result)
                            logger.info(f"Action {i+1} retry successful")
                        else:
                            state.addFailedAction(retry_result)
                            logger.error(f"Action {i+1} retry failed")
                            # Action failed after retry - stop task execution and regenerate
                            break

                    else:  # fail
                        state.addFailedAction(result)
                        logger.error(f"Action {i+1} failed validation - stopping task execution")
                        # Action failed - stop task execution and regenerate
                        break

                # Validate task completion
                task_validation = await self._validateTaskCompletion(state.successful_actions, task_step, workflow)

                if task_validation.status == 'success':
                    logger.info(f"Task completed successfully with {len(state.successful_actions)} successful actions")
                    return TaskResult(
                        taskId=task_step.id,
                        status=TaskStatus.COMPLETED,
                        success=True,
                        feedback=f"Task completed successfully with {len(state.successful_actions)} successful actions"
                    )

                elif task_validation.status == 'retry':
                    state.improvements = task_validation.improvements or []
                    state.incrementRetryCount()
                    logger.info(f"Task needs retry. Improvements: {state.improvements}")
                    # Preserve successful actions for next iteration
                    continue

                else:  # fail
                    logger.error("Task failed permanently")
                    return TaskResult(
                        taskId=task_step.id,
                        status=TaskStatus.FAILED,
                        success=False,
                        error=task_validation.reason or 'Task validation failed'
                    )

            # Max retries reached
            logger.error(f"Task failed after {state.max_retries} attempts")
            return TaskResult(
                taskId=task_step.id,
                status=TaskStatus.FAILED,
                success=False,
                error=f'Task failed after {state.max_retries} attempts'
            )

        except Exception as e:
            logger.error(f"Error in task execution with state management: {str(e)}")
            return TaskResult(
                taskId=task_step.id,
                status=TaskStatus.FAILED,
                success=False,
                error=f'Task execution error: {str(e)}'
            )

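    # Retry-flow walkthrough (describing the loop above): canRetry() gates on
    # retry_count < max_retries (3), so the loop makes at most three passes — the
    # initial attempt at retry_count 0 plus two regeneration passes. A "retry" verdict
    # from task validation stores the improvements on the state, increments the
    # counter, and regenerates the actions with failure context while preserving every
    # already-successful action and its partial results.
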
    async def _regenerateTaskActionsWithFailureContext(self, task_step: TaskStep, state: TaskExecutionState, context: TaskContext) -> List[TaskAction]:
        """Regenerate task actions with failure context and improvements"""
        try:
            # Analyze failure patterns
            failure_patterns = state.getFailurePatterns()

            # Create enhanced context with failure information
            enhanced_context = TaskContext(
                task_step=task_step,
                workflow=context.workflow,
                workflow_id=context.workflow_id,
                available_documents=context.available_documents,
                previous_results=state.getAvailableResults(),
                improvements=state.improvements,
                retry_count=state.retry_count,
                failure_patterns=failure_patterns,
                failed_actions=state.failed_actions,
                successful_actions=state.successful_actions,
                is_regeneration=True
            )

            # Generate new actions with failure avoidance
            logger.info(f"Regenerating actions for task '{task_step.description}' with failure context (retry {state.retry_count})")
            actions = await self.defineTaskActions(task_step, context.workflow, state.getAvailableResults(), enhanced_context)

            logger.info(f"Regenerated {len(actions)} actions with failure context")
            return actions

        except Exception as e:
            logger.error(f"Error regenerating task actions: {str(e)}")
            return []

    async def _validateTaskCompletion(self, successful_actions: List[ActionExecutionResult], task_step: TaskStep, workflow: ChatWorkflow) -> ReviewResult:
        """Validate if task is completed successfully"""
        try:
            logger.info(f"Validating task completion: {task_step.description}")

            # Create task result summary
            task_result = {
                'task_step': task_step,
                'successful_actions': successful_actions,
                'successful_count': len(successful_actions),
                'expected_outputs': task_step.expected_outputs or [],
                'success_criteria': task_step.success_criteria or []
            }

            # Use AI to validate task completion
            prompt = self._createTaskCompletionValidationPrompt(task_result, task_step)
            response = await self._callAIWithCircuitBreaker(prompt, "task_completion_validation")

            # Log the validation response for debugging
            logger.debug(f"Task validation AI response: {response}")

            # Parse validation result
            validation = self._parseTaskValidationResponse(response)

            # Add quality metrics
            validation['quality_metrics'] = self._calculateTaskQualityMetrics(task_step, successful_actions)

            logger.info(f"Task completion validation: {validation.get('status', 'unknown')} - Reason: {validation.get('reason', 'No reason')}")
            logger.debug(f"Parsed validation result: {json.dumps(validation, indent=2)}")
            return ReviewResult(
                status=validation.get('status', 'unknown'),
                reason=validation.get('reason', 'No reason provided'),
                improvements=validation.get('improvements', []),
                quality_score=validation.get('quality_score', 5),
                missing_outputs=validation.get('missing_outputs', []),
                met_criteria=validation.get('met_criteria', []),
                unmet_criteria=validation.get('unmet_criteria', []),
                confidence=validation.get('confidence', 0.5)
            )

        except Exception as e:
            logger.error(f"Error validating task completion: {str(e)}")
            return ReviewResult(
                status='success',  # Default to success to avoid blocking
                reason=f'Validation failed: {str(e)}',
                quality_score=5,
                confidence=0.5
            )

    def _createTaskCompletionValidationPrompt(self, task_result: Dict[str, Any], task_step: TaskStep) -> str:
        """Create prompt for task completion validation"""

        successful_actions = task_result['successful_actions']
        expected_outputs = task_result['expected_outputs']
        success_criteria = task_result['success_criteria']

        # Summarize successful actions
        action_summary = []
        for action in successful_actions:
            action_summary.append({
                'method': action.actionMethod or '',
                'action': action.actionName or '',
                'result_label': action.data.get('resultLabel', ''),
                'documents_count': len(action.documents or []),
                'has_text_result': bool(action.data.get('result', '').strip())
            })

        return f"""You are a task completion validator that evaluates whether a task was successfully completed by its executed actions.

TASK DETAILS:
- Description: {task_step.description}
- Expected outputs: {json.dumps(expected_outputs)}
- Success criteria: {json.dumps(success_criteria)}

SUCCESSFUL ACTION DETAILS:
{json.dumps(action_summary, indent=2)}

VALIDATION CRITERIA:
1. Check if each action's result_label matches what was delivered
2. If documents were delivered and result_label is present → SUCCESS
3. If no documents and the text result's result_label does not match the expected label → RETRY
4. If no result_label and no delivery → FAIL

VALIDATION RULES:
- Focus on result_label matching
- Check if the actions delivered the expected result type
- Document delivery with correct result_label = SUCCESS
- Text result with correct result_label = SUCCESS

VALIDATION QUESTIONS:
1. Does each result_label match what the action was supposed to deliver?
2. Were documents or text results delivered with the correct label?
3. Does the delivery match the task's objective?

REQUIRED JSON RESPONSE:
{{
    "status": "success|retry|fail",
    "reason": "Detailed explanation of the validation decision",
    "improvements": ["specific improvement 1", "specific improvement 2"],
    "missing_outputs": ["output1", "output2"],
    "met_criteria": ["criteria1", "criteria2"],
    "unmet_criteria": ["criteria3", "criteria4"],
    "quality_score": 1-10,
    "confidence": 0.0-1.0
}}

NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""

    def _parseTaskValidationResponse(self, response: str) -> Dict[str, Any]:
        """Parse task validation response"""
        try:
            # Extract JSON from response
            json_start = response.find('{')
            json_end = response.rfind('}') + 1
            if json_start == -1 or json_end == 0:
                raise ValueError("No JSON found in task validation response")

            json_str = response[json_start:json_end]
            validation = json.loads(json_str)

            # Validate structure
            if 'status' not in validation:
                raise ValueError("Task validation response missing 'status' field")

            # Set default values
            validation.setdefault('reason', 'No reason provided')
            validation.setdefault('improvements', [])
            validation.setdefault('missing_outputs', [])
            validation.setdefault('met_criteria', [])
            validation.setdefault('unmet_criteria', [])
            validation.setdefault('quality_score', 5)
            validation.setdefault('confidence', 0.5)

            return validation

        except Exception as e:
            logger.error(f"Error parsing task validation response: {str(e)}")
            return {
                'status': 'success',
                'reason': f'Parse error: {str(e)}',
                'improvements': [],
                'missing_outputs': [],
                'met_criteria': [],
                'unmet_criteria': [],
                'quality_score': 5,
                'confidence': 0.5
            }

    # ===== SINGLE-ACTION EXECUTION WITH RETRY =====

    async def _executeSingleActionWithRetry(self, action: TaskAction, workflow: ChatWorkflow) -> ActionExecutionResult:
        """Execute a single action, retrying once with improvements if validation requests it"""
        try:
            logger.info(f"Executing action with retry: {action.execMethod}.{action.execAction}")

            # Create context for validation
            context = TaskContext(
                task_step=TaskStep(id="action_retry", description="Action execution"),
                workflow_id=workflow.id,
                available_documents=[],
                previous_results=[]
            )

            # Execute action with validation
            result = await self.executeActionWithValidation(action, workflow, context)

            # If validation indicates retry, attempt retry
            if result.validation.get('status') == 'retry':
                improvements = result.validation.get('improvements', [])
                retry_result = await self.retryActionWithImprovements(action, result, improvements)
                return retry_result

            return result

        except Exception as e:
            logger.error(f"Error executing action with retry: {str(e)}")
            action.setError(str(e))
            return ActionExecutionResult(
                success=False,
                data={},
                metadata={},
                error=str(e),
                actionId=action.id,
                actionMethod=action.execMethod,
                actionName=action.execAction,
                documents=[],
                validation={
                    'status': 'fail',
                    'reason': f'Execution error: {str(e)}',
                    'confidence': 0.0,
                    'improvements': [],
                    'quality_score': 0,
                    'missing_elements': [],
                    'suggested_retry_approach': ''
                }
            )

    # ===== TASK ACTION EXECUTION =====

    async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]:
        """Execute task actions with individual validation and retry logic"""
        try:
            logger.info(f"Executing {len(task_actions)} task actions with validation")

            results = []
            for i, action in enumerate(task_actions):
                logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")

                # Create context for validation
                context = TaskContext(
                    task_step=TaskStep(id=f"action_{i}", description="Action execution"),
                    workflow_id=workflow.id,
                    available_documents=[],
                    previous_results=[r.data.get('resultLabel', '') for r in results if r.data.get('resultLabel')]
                )

                # Execute action with validation
                result = await self.executeActionWithValidation(action, workflow, context)
                results.append(result)

                # If the action failed validation, log it and continue with the next action
                if isinstance(result.validation, dict) and result.validation.get('status') == 'fail':
                    logger.error(f"Action {i+1} failed validation, continuing with next action")
                    continue

            logger.info(f"Task action execution completed: {len(results)} results")
            return results

        except Exception as e:
            logger.error(f"Error executing task actions: {str(e)}")
            return []

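    # Illustrative note on the context built above: previous_results accumulates the
    # resultLabel of every earlier action in the batch. If action 1 produced the
    # (hypothetical) label "invoice_summary", the context for action 2 carries
    # previous_results=["invoice_summary"], letting later actions reference it.
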
    # ===== ACTION EXECUTION WITH VALIDATION =====

    async def executeActionWithValidation(self, action: TaskAction, workflow: ChatWorkflow, context: TaskContext) -> ActionExecutionResult:
        """Execute single action with immediate validation"""
        try:
            logger.info(f"Executing action: {action.execMethod}.{action.execAction}")

            # Execute the action
            result = await self._executeSingleAction(action, workflow)

            # Validate the result immediately
            validator = ActionValidator(self)
            validation = await validator.validateActionResult(result, action, context)

            # Create ActionExecutionResult model
            action_result = ActionExecutionResult(
                success=result.success,
                data=result.data,
                metadata=result.metadata,
                validation=validation,
                error=result.error,
                actionId=action.id,
                actionMethod=action.execMethod,
                actionName=action.execAction,
                documents=result.data.get("documents", [])
            )

            # Log the action execution result as JSON (without document content)
            action_result_json = {
                'success': action_result.success,
                'actionId': action_result.actionId,
                'actionMethod': action_result.actionMethod,
                'actionName': action_result.actionName,
                'validation': action_result.validation,
                'error': action_result.error,
                'documents_count': len(action_result.documents),
                'document_names': [doc.filename if hasattr(doc, 'filename') else str(doc) for doc in action_result.documents],
                'data_keys': list(action_result.data.keys()) if isinstance(action_result.data, dict) else [],
                'metadata_keys': list(action_result.metadata.keys()) if isinstance(action_result.metadata, dict) else []
            }
            logger.info(f"Action execution result for {action.execMethod}.{action.execAction}: {json.dumps(action_result_json, indent=2, ensure_ascii=False)}")

            # Update action status based on validation
            if validation['status'] == 'success':
                action.setSuccess()
                logger.info(f"Action {action.execMethod}.{action.execAction} validated successfully")
            elif validation['status'] == 'retry':
                action.status = TaskStatus.PENDING  # Keep pending for retry
                logger.warning(f"Action {action.execMethod}.{action.execAction} needs retry: {validation.get('reason', 'No reason')}")
            else:  # fail
                action.setError(validation.get('reason', 'Action failed validation'))
                logger.error(f"Action {action.execMethod}.{action.execAction} failed validation: {validation.get('reason', 'No reason')}")

            return action_result

        except Exception as e:
            logger.error(f"Error executing action with validation: {str(e)}")
            action.setError(str(e))
            return ActionExecutionResult(
                success=False,
                data={},
                metadata={},
                error=str(e),
                actionId=action.id,
                actionMethod=action.execMethod,
                actionName=action.execAction,
                documents=[],
                validation={
                    'status': 'fail',
                    'reason': f'Execution error: {str(e)}',
                    'confidence': 0.0,
                    'improvements': [],
                    'quality_score': 0,
                    'missing_elements': [],
                    'suggested_retry_approach': ''
                }
            )

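    # Shape of the validation dict consumed above (illustrative values; the keys mirror
    # the fallback payload built in the except branch):
    #
    #   {
    #       'status': 'success',           # or 'retry' / 'fail'
    #       'reason': 'Result label matches the delivered document',
    #       'confidence': 0.9,
    #       'improvements': [],
    #       'quality_score': 8,
    #       'missing_elements': [],
    #       'suggested_retry_approach': ''
    #   }
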
    async def retryActionWithImprovements(self, action: TaskAction, previous_result: ActionExecutionResult, improvements: List[str]) -> ActionExecutionResult:
        """Retry action with improvements based on previous failure"""
        try:
            logger.info(f"Retrying action {action.execMethod}.{action.execAction} with improvements")

            # Apply improvements to action parameters
            enhanced_action = self._enhanceActionWithImprovements(action, improvements, previous_result)

            # Execute enhanced action
            result = await self._executeSingleAction(enhanced_action, self.workflow)

            # Validate the retry result
            validator = ActionValidator(self)
            # Create a proper TaskContext for retry validation
            retry_context = TaskContext(
                task_step=TaskStep(
                    id="retry_context",
                    description="Action retry",
                    dependencies=[],
                    expected_outputs=[],
                    success_criteria=[],
                    required_documents=[],
                    estimated_complexity="low",
                    ai_prompt=""
                ),
                workflow=None,
                workflow_id=None,
                available_documents=[],
                previous_results=[],
                improvements=[],
                retry_count=1,
                previous_action_results=[],
                previous_review_result=None,
                is_regeneration=False,
                failure_patterns=[],
                failed_actions=[],
                successful_actions=[]
            )
            validation = await validator.validateActionResult(result, enhanced_action, retry_context)

            # Create ActionExecutionResult model with retry metadata
            action_result = ActionExecutionResult(
                success=result.success,
                data=result.data,
                metadata=result.metadata,
                validation=validation,
                error=result.error,
                actionId=action.id,
                actionMethod=action.execMethod,
                actionName=action.execAction,
                documents=result.data.get("documents", []),
                is_retry=True,
                previous_error=previous_result.error,
                applied_improvements=improvements
            )

            # Log the retry action execution result as JSON (without document content)
            retry_result_json = {
                'success': action_result.success,
                'actionId': action_result.actionId,
                'actionMethod': action_result.actionMethod,
                'actionName': action_result.actionName,
                'validation': action_result.validation,
                'error': action_result.error,
                'is_retry': action_result.is_retry,
                'previous_error': action_result.previous_error,
                'applied_improvements': action_result.applied_improvements,
                'documents_count': len(action_result.documents),
                'document_names': [doc.filename if hasattr(doc, 'filename') else str(doc) for doc in action_result.documents],
                'data_keys': list(action_result.data.keys()) if isinstance(action_result.data, dict) else [],
                'metadata_keys': list(action_result.metadata.keys()) if isinstance(action_result.metadata, dict) else []
            }
            logger.info(f"Retry action execution result for {action.execMethod}.{action.execAction}: {json.dumps(retry_result_json, indent=2, ensure_ascii=False)}")

            # Update action status
            if validation['status'] == 'success':
                enhanced_action.setSuccess()
                logger.info(f"Action retry successful: {enhanced_action.execMethod}.{enhanced_action.execAction}")
            else:
                enhanced_action.setError(validation.get('reason', 'Retry failed'))
                logger.error(f"Action retry failed: {enhanced_action.execMethod}.{enhanced_action.execAction}")

            return action_result

        except Exception as e:
            logger.error(f"Error retrying action: {str(e)}")
            action.setError(str(e))
            return ActionExecutionResult(
                success=False,
                data={},
                metadata={},
                error=str(e),
                actionId=action.id,
                actionMethod=action.execMethod,
                actionName=action.execAction,
                documents=[],
                is_retry=True,
                validation={
                    'status': 'fail',
                    'reason': f'Retry execution error: {str(e)}',
                    'confidence': 0.0,
                    'improvements': [],
                    'quality_score': 0,
                    'missing_elements': [],
                    'suggested_retry_approach': ''
                }
            )

    def _enhanceActionWithImprovements(self, action: TaskAction, improvements: List[str], previous_result: ActionExecutionResult) -> TaskAction:
        """Enhance action parameters based on improvements and previous failure"""
        enhanced_action = TaskAction(
            id=action.id,
            execMethod=action.execMethod,
            execAction=action.execAction,
            execParameters=action.execParameters.copy(),  # Copy to avoid modifying original
            execResultLabel=action.execResultLabel,
            status=action.status
        )

        # Apply improvements based on failure patterns
        for improvement in improvements:
            if "incomplete_analysis" in improvement.lower():
                # Enhance AI prompt for more comprehensive analysis
                current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
                enhanced_prompt = current_prompt + "\n\nIMPORTANT: Provide comprehensive, detailed analysis covering all aspects. Be thorough and include all relevant information."
                enhanced_action.execParameters['aiPrompt'] = enhanced_prompt

            elif "missing_content" in improvement.lower():
                # Add content validation to prompt
                current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
                enhanced_prompt = current_prompt + "\n\nVALIDATION: Ensure all content is extracted and no information is missed. Provide complete output."
                enhanced_action.execParameters['aiPrompt'] = enhanced_prompt

            elif "wrong_format" in improvement.lower():
                # Add format specification
                current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
                enhanced_prompt = current_prompt + "\n\nFORMAT: Provide output in structured, well-organized format with clear sections and proper formatting."
                enhanced_action.execParameters['aiPrompt'] = enhanced_prompt

            elif "timeout" in improvement.lower():
                # Steer the prompt toward a faster, more focused response
                current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
                enhanced_prompt = current_prompt + "\n\nEFFICIENCY: Provide concise but complete analysis. Focus on key information and avoid unnecessary details."
                enhanced_action.execParameters['aiPrompt'] = enhanced_prompt

        # Apply specific improvements from validation
        validation = previous_result.validation if hasattr(previous_result, 'validation') else {}
        suggested_approach = validation.get('suggested_retry_approach', '') if isinstance(validation, dict) else ''
        if suggested_approach:
            current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
            enhanced_prompt = current_prompt + f"\n\nRETRY APPROACH: {suggested_approach}"
            enhanced_action.execParameters['aiPrompt'] = enhanced_prompt

        return enhanced_action
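
    # Illustrative example of the enhancement above (the prompt and improvement text
    # are hypothetical): given an action whose execParameters contain
    # aiPrompt="Summarize the report" and improvements=["wrong_format detected in
    # output"], the retried action's aiPrompt becomes the original text followed by
    # the "\n\nFORMAT: ..." instruction block appended in the matching branch.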