From 8592cdd790404e0a592b07fa70f2a0fa8c5d2f15 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sat, 16 Aug 2025 23:32:36 +0200
Subject: [PATCH] stable workflow
---
modules/chat/OLD_BACKUP managerChat.py | 3078 ------------------
modules/chat/documents/documentExtraction.py | 1435 +++++++-
modules/chat/documents/documentGeneration.py | 47 +-
modules/chat/handling/handlingTasks.py | 410 ++-
modules/chat/handling/promptFactory.py | 139 +-
modules/chat/managerChat.py | 32 +-
modules/chat/serviceCenter.py | 116 +-
modules/interfaces/interfaceAiCalls.py | 53 +-
modules/interfaces/interfaceAppObjects.py | 15 +-
modules/interfaces/interfaceChatModel.py | 7 +-
modules/interfaces/interfaceChatObjects.py | 23 +
modules/methods/methodDocument.py | 461 ++-
modules/methods/methodOutlook.py | 325 +-
modules/methods/methodSharepoint.py | 4 +-
modules/workflow/managerWorkflow.py | 81 +-
requirements.txt | 12 +
test_documentExtraction.py | 855 +++++
test_excel_processing.py | 189 ++
web_search_20250717_140455.txt | 0
web_search_20250717_144557.txt | 0
20 files changed, 3701 insertions(+), 3581 deletions(-)
delete mode 100644 modules/chat/OLD_BACKUP managerChat.py
create mode 100644 test_documentExtraction.py
create mode 100644 test_excel_processing.py
delete mode 100644 web_search_20250717_140455.txt
delete mode 100644 web_search_20250717_144557.txt
diff --git a/modules/chat/OLD_BACKUP managerChat.py b/modules/chat/OLD_BACKUP managerChat.py
deleted file mode 100644
index 056efcbc..00000000
--- a/modules/chat/OLD_BACKUP managerChat.py
+++ /dev/null
@@ -1,3078 +0,0 @@
-import asyncio
-import logging
-import uuid
-import json
-import time
-from typing import Dict, Any, Optional, List, Union
-from datetime import datetime, UTC
-
-from modules.interfaces.interfaceAppModel import User
-from modules.interfaces.interfaceChatModel import (
- TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult,
- ExtractedContent, ContentItem, ContentMetadata, DocumentExchange, TaskStep, TaskContext, ActionExecutionResult, ReviewContext, ReviewResult, TaskPlan, WorkflowResult
-)
-from modules.chat.serviceCenter import ServiceCenter
-from modules.interfaces.interfaceChatObjects import ChatObjects
-
-logger = logging.getLogger(__name__)
-
-# ===== STATE MANAGEMENT AND VALIDATION CLASSES =====
-
-class TaskExecutionState:
- """Manages state during task execution with retry logic"""
- def __init__(self, task_step: TaskStep):
- self.task_step = task_step
- self.successful_actions: List[ActionExecutionResult] = [] # Preserved across retries
- self.failed_actions: List[ActionExecutionResult] = [] # For analysis
- self.current_action_index = 0
- self.retry_count = 0
- self.improvements = []
- self.partial_results = {} # Store intermediate results
- self.max_retries = 3
- def addSuccessfulAction(self, action_result: ActionExecutionResult):
- self.successful_actions.append(action_result)
- if action_result.data.get('resultLabel'):
- self.partial_results[action_result.data['resultLabel']] = action_result
- def addFailedAction(self, action_result: ActionExecutionResult):
- self.failed_actions.append(action_result)
- def getAvailableResults(self) -> list:
- return [result.data.get('resultLabel', '') for result in self.successful_actions if result.data.get('resultLabel')]
- def shouldRetryTask(self) -> bool:
- return len(self.successful_actions) > 0 and len(self.failed_actions) > 0
- def canRetry(self) -> bool:
- return self.retry_count < self.max_retries
- def incrementRetryCount(self):
- self.retry_count += 1
- def getFailurePatterns(self) -> list:
- patterns = []
- for action in self.failed_actions:
- error = action.error.lower() if action.error else ''
- if "timeout" in error:
- patterns.append("timeout_issues")
- elif "document_not_found" in error or "file not found" in error:
- patterns.append("document_reference_issues")
- elif "empty_result" in error or "no content" in error:
- patterns.append("content_extraction_issues")
- elif "invalid_format" in error or "wrong format" in error:
- patterns.append("format_issues")
- elif "permission" in error or "access denied" in error:
- patterns.append("permission_issues")
- return list(set(patterns))
-
-class ActionValidator:
- """Generic AI-based action result validation"""
- def __init__(self, chat_manager):
- self.chat_manager = chat_manager
-
- async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> dict:
- """Generic action validation using AI"""
- try:
- # Create generic validation prompt
- prompt = self._createGenericValidationPrompt(action_result, action, context)
- response = await self.chat_manager._callAIWithCircuitBreaker(prompt, "action_validation")
- validation = self._parseValidationResponse(response)
-
- # Add action metadata
- validation['action_id'] = action.id
- validation['action_method'] = action.execMethod
- validation['action_name'] = action.execAction
- validation['result_label'] = action.execResultLabel
-
- return validation
- except Exception as e:
- logger.error(f"Error validating action result: {str(e)}")
- return {
- 'status': 'success',
- 'reason': f'Validation failed: {str(e)}',
- 'confidence': 0.5,
- 'improvements': [],
- 'action_id': action.id,
- 'action_method': action.execMethod,
- 'action_name': action.execAction,
- 'result_label': action.execResultLabel
- }
-
- def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> str:
- """Create a validation prompt focused on result file delivery"""
- # Extract data from ActionResult model
- success = action_result.success
- result_data = action_result.data
- error = action_result.error
- validation_messages = action_result.validation
-
- # Extract result text from data
- result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data)
-
- # Get documents from ActionResult data
- documents = result_data.get("documents", []) if isinstance(result_data, dict) else []
- doc_count = len(documents)
-
- # Extract expected result format from action parameters
- expected_result_label = action.execResultLabel
- expected_format = action.execParameters.get('outputFormat', 'unknown')
-
- # Extract expected document formats from action
- expected_document_formats = action.expectedDocumentFormats or []
-
- # Check if the result label is present in the action result data
- actual_result_label = result_data.get("resultLabel", "") if isinstance(result_data, dict) else ""
- result_label_match = actual_result_label == expected_result_label
-
- # Analyze delivered documents and content
- delivered_files = []
- delivered_formats = []
- content_items = []
-
- # Check for ChatDocument objects
- for doc in documents:
- if hasattr(doc, 'filename'):
- delivered_files.append(doc.filename)
- # Extract format information
- file_extension = self._getFileExtension(doc.filename)
- mime_type = getattr(doc, 'mimeType', 'application/octet-stream')
- delivered_formats.append({
- 'filename': doc.filename,
- 'extension': file_extension,
- 'mimeType': mime_type
- })
- elif isinstance(doc, dict) and 'filename' in doc:
- delivered_files.append(doc['filename'])
- file_extension = self._getFileExtension(doc['filename'])
- mime_type = doc.get('mimeType', 'application/octet-stream')
- delivered_formats.append({
- 'filename': doc['filename'],
- 'extension': file_extension,
- 'mimeType': mime_type
- })
- else:
- delivered_files.append(f"document_{len(delivered_files)}")
- delivered_formats.append({
- 'filename': f"document_{len(delivered_files)}",
- 'extension': 'unknown',
- 'mimeType': 'application/octet-stream'
- })
-
- # Check for ExtractedContent in result data
- if isinstance(result_data, dict):
- if 'extractedContent' in result_data:
- extracted_content = result_data['extractedContent']
- if hasattr(extracted_content, 'contents'):
- content_items = extracted_content.contents
- elif 'contents' in result_data:
- content_items = result_data['contents']
-
- # If we have delivered files but no content items, consider it successful
- # This handles the case where content is stored in files rather than result data
- if delivered_files and not content_items:
- content_items = [f"File content available in: {', '.join(delivered_files)}"]
-
- # Analyze content items
- content_summary = []
- for item in content_items:
- if hasattr(item, 'label') and hasattr(item, 'metadata'):
- content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}")
- elif isinstance(item, str):
- content_summary.append(item)
- else:
- content_summary.append(str(item))
-
- return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format.
-
-ACTION DETAILS:
-- Method: {action.execMethod}
-- Action: {action.execAction}
-- Expected Result Label: {expected_result_label}
-- Actual Result Label: {actual_result_label}
-- Result Label Match: {result_label_match}
-- Expected Format: {expected_format}
-- Expected Document Formats: {json.dumps(expected_document_formats, indent=2) if expected_document_formats else 'None specified'}
-- Parameters: {json.dumps(action.execParameters, indent=2)}
-
-RESULT TO VALIDATE:
-- Success: {success}
-- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''}
-- Error: {error}
-- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'}
-- Documents Produced: {doc_count}
-- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'}
-- Delivered Formats: {json.dumps(delivered_formats, indent=2) if delivered_formats else 'None'}
-- Content Items: {', '.join(content_summary) if content_summary else 'None'}
-
-CRITICAL VALIDATION CRITERIA:
-1. **Result Label Match**: Does the action result contain the expected result label?
-2. **File Delivery**: Did the action deliver the promised result file(s)?
-3. **Format Compliance**: If expected document formats were specified, do the delivered files match the expected formats?
-4. **Content Quality**: Is the content of the delivered files usable and complete?
-5. **Content Processing**: If content extraction was expected, was it performed correctly?
-
-CONTEXT:
-- Task Description: {context.task_step.description if context.task_step else 'Unknown'}
-- Previous Results: {', '.join(context.previous_results) if context.previous_results else 'None'}
-
-VALIDATION INSTRUCTIONS:
-1. **Result Label Check**: Verify that the expected result label "{expected_result_label}" is present in the action result data. This is the primary success criterion.
-2. **File Delivery**: Check if files were delivered when expected. The individual filenames don't need to match the result label - focus on whether content was actually produced.
-3. **Format Compliance**: If expected document formats were specified, check if delivered files match the expected extensions and MIME types. If no formats were specified, this criterion is satisfied.
-4. **Content Quality**: If files were delivered, consider the action successful. The presence of delivered files indicates content was processed and stored.
-5. **Content Processing**: If files were delivered, assume content extraction was performed correctly. The file delivery is evidence of successful processing.
-6. **Success Criteria**: The action is successful if the result label matches AND files were delivered. If expected formats were specified, they should also match.
-
-IMPORTANT NOTES:
-- The result label must be present in the action result data for success
-- Individual filenames can be different from the result label
-- If files were delivered, consider the action successful even if content details are not provided
-- Focus on whether the action accomplished its intended purpose (file delivery)
-- Empty files should be considered failures, but delivered files indicate success
-
-REQUIRED JSON RESPONSE:
-{{
- "status": "success|retry|fail",
- "reason": "Detailed explanation focusing on result label match and content quality",
- "confidence": 0.0-1.0,
- "improvements": ["specific improvements if needed"],
- "quality_score": 1-10,
- "missing_elements": ["missing result label", "missing files", "content issues"],
- "suggested_retry_approach": "Specific approach for retry if status is retry"
-}}
-
-NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
-
- def _parseValidationResponse(self, response: str) -> dict:
- """Parse the AI validation response"""
- try:
- json_start = response.find('{')
- json_end = response.rfind('}') + 1
- if json_start == -1 or json_end == 0:
- raise ValueError("No JSON found in validation response")
-
- json_str = response[json_start:json_end]
- validation = json.loads(json_str)
-
- if 'status' not in validation:
- raise ValueError("Validation response missing 'status' field")
-
- # Set defaults for optional fields
- validation.setdefault('confidence', 0.5)
- validation.setdefault('improvements', [])
- validation.setdefault('quality_score', 5)
- validation.setdefault('missing_elements', [])
- validation.setdefault('suggested_retry_approach', '')
-
- return validation
- except Exception as e:
- logger.error(f"Error parsing validation response: {str(e)}")
- return {
- 'status': 'success',
- 'reason': f'Parse error: {str(e)}',
- 'confidence': 0.5,
- 'improvements': [],
- 'quality_score': 5,
- 'missing_elements': [],
- 'suggested_retry_approach': ''
- }
-
- def _getFileExtension(self, filename: str) -> str:
- """Extract file extension from filename"""
- if '.' in filename:
- return '.' + filename.split('.')[-1]
- return ''
-
-class ChatManager:
- """Chat manager with improved AI integration and method handling"""
-
- def __init__(self, currentUser: User, chatInterface: ChatObjects):
- self.currentUser = currentUser
- self.chatInterface = chatInterface
- self.service: ServiceCenter = None
- self.workflow: ChatWorkflow = None
-
- # Circuit breaker for AI calls
- self.ai_failure_count = 0
- self.ai_last_failure_time = None
- self.ai_circuit_breaker_threshold = 5
- self.ai_circuit_breaker_timeout = 300 # 5 minutes
-
- # Timeout settings
- self.ai_call_timeout = 120 # 2 minutes
- self.task_execution_timeout = 600 # 10 minutes
-
- # ===== Initialization and Setup =====
- async def initialize(self, workflow: ChatWorkflow) -> None:
- """Initialize chat manager with workflow"""
- self.workflow = workflow
- self.service = ServiceCenter(self.currentUser, self.workflow)
-
- # ===== WORKFLOW PHASES =====
-
- # Phase 1: High-Level Task Planning
- async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
- """Phase 1: Plan high-level tasks from user input"""
- try:
- logger.info(f"Planning high-level tasks for workflow {workflow.id}")
-
- # Create planning prompt
- prompt = self.createTaskPlanningPrompt({
- 'user_request': userInput,
- 'available_documents': self._getAvailableDocuments(workflow),
- 'workflow_id': workflow.id
- })
-
- # Get AI response with fallback mechanism
- response = await self._callAIWithCircuitBreaker(prompt, "task_planning")
-
- # Parse and validate task plan
- task_plan_dict = self._parseTaskPlanResponse(response)
-
- if not self._validateTaskPlan(task_plan_dict):
- logger.error("Generated task plan failed validation")
- raise Exception("AI-generated task plan failed validation - AI is required for task planning")
-
- # Convert to TaskPlan model
- tasks = []
- for task_dict in task_plan_dict.get('tasks', []):
- task = TaskStep(
- id=task_dict.get('id', ''),
- description=task_dict.get('description', ''),
- dependencies=task_dict.get('dependencies', []),
- expected_outputs=task_dict.get('expected_outputs', []),
- success_criteria=task_dict.get('success_criteria', []),
- required_documents=task_dict.get('required_documents', []),
- estimated_complexity=task_dict.get('estimated_complexity'),
- ai_prompt=task_dict.get('ai_prompt')
- )
- tasks.append(task)
-
- task_plan = TaskPlan(
- overview=task_plan_dict.get('overview', ''),
- tasks=tasks
- )
-
- # Log the task plan as JSON for debugging
- logger.info(f"Task plan created for workflow {workflow.id}:")
- task_plan_json = {
- 'overview': task_plan.overview,
- 'tasks_count': len(task_plan.tasks),
- 'tasks': []
- }
- for task in task_plan.tasks:
- task_json = {
- 'id': task.id,
- 'description': task.description,
- 'dependencies': task.dependencies or [],
- 'expected_outputs': task.expected_outputs or [],
- 'success_criteria': task.success_criteria or [],
- 'required_documents': task.required_documents or [],
- 'estimated_complexity': task.estimated_complexity or '',
- 'ai_prompt': task.ai_prompt or ''
- }
- task_plan_json['tasks'].append(task_json)
- logger.info(f"Task Plan: {json.dumps(task_plan_json, indent=2, ensure_ascii=False)}")
-
- logger.info(f"High-level task planning completed: {len(task_plan.tasks)} tasks")
- return task_plan
-
- except Exception as e:
- error_message = str(e)
- logger.error(f"Error in high-level task planning: {error_message}")
-
- # Provide more specific error messages based on the error type
- if "overloaded" in error_message.lower() or "529" in error_message:
- detailed_error = "AI service is currently overloaded. Please try again in a few minutes."
- elif "rate limit" in error_message.lower() or "429" in error_message:
- detailed_error = "Rate limit exceeded. Please wait before making another request."
- elif "api key" in error_message.lower() or "401" in error_message:
- detailed_error = "Invalid API key. Please check your AI service configuration."
- elif "timeout" in error_message.lower():
- detailed_error = "AI service request timed out. Please try again."
- else:
- detailed_error = f"AI service error: {error_message}"
-
- raise Exception(detailed_error)
-
- # Phase 2: Task Definition and Action Generation
- async def defineTaskActions(self, task_step: TaskStep, workflow: ChatWorkflow, previous_results: List[str] = None,
- enhanced_context: TaskContext = None) -> List[TaskAction]:
- """Phase 2: Define specific actions for a task step with enhanced retry context"""
- try:
- logger.info(f"Defining actions for task: {task_step.description if hasattr(task_step, 'description') else 'Unknown'}")
-
- # Use enhanced context if provided (for retries), otherwise create basic context
- if enhanced_context:
- context = enhanced_context
- else:
- context = TaskContext(
- task_step=task_step,
- workflow=workflow,
- workflow_id=workflow.id,
- available_documents=self._getAvailableDocuments(workflow),
- previous_results=previous_results or [],
- improvements=[],
- retry_count=0,
- previous_action_results=[],
- previous_review_result=None,
- is_regeneration=False,
- failure_patterns=[],
- failed_actions=[],
- successful_actions=[]
- )
-
- # Generate actions using AI
- actions = await self._generateActionsForTaskStep(context)
-
- # Log the generated actions as JSON for debugging
- logger.info(f"Generated {len(actions)} actions for task '{task_step.description}':")
- for i, action in enumerate(actions):
- logger.info(f"Action {i+1}: {json.dumps(action, indent=2, ensure_ascii=False)}")
-
- # Convert to TaskAction objects
- # Get available document labels for validation
- available_document_labels = set(self._getAvailableDocuments(workflow))
- task_actions = []
- invalid_doc_ref_detected = False
- # Collect resultLabels of actions defined so far in this step
- result_labels_so_far = set()
- for action_dict in actions:
- # Validate document references in parameters
- params = action_dict.get('parameters', {})
- if 'documentList' in params and isinstance(params['documentList'], list):
- original_refs = params['documentList']
- # Allow references to available documents or to resultLabels of actions defined so far
- valid_refs = [ref for ref in original_refs if ref in available_document_labels or ref in result_labels_so_far]
- if len(valid_refs) < len(original_refs):
- logger.warning(f"Action {action_dict.get('method','?')}.{action_dict.get('action','?')} has invalid document references: {set(original_refs) - set(valid_refs)}. Only using valid: {valid_refs}")
- invalid_doc_ref_detected = True
- if not valid_refs:
- logger.warning(f"Skipping action {action_dict.get('method','?')}.{action_dict.get('action','?')} due to no valid document references.")
- continue
- params['documentList'] = valid_refs
- action_data = {
- "execMethod": action_dict.get('method', 'unknown'),
- "execAction": action_dict.get('action', 'unknown'),
- "execParameters": params,
- "execResultLabel": action_dict.get('resultLabel', ''),
- "expectedDocumentFormats": action_dict.get('expectedDocumentFormats', None),
- "status": TaskStatus.PENDING
- }
- task_action = self.chatInterface.createTaskAction(action_data)
- if task_action:
- # Log action definition: parameters, input documentLabels, output document label
- logger.debug(f"[ACTION DEFINITION] Method: {task_action.execMethod}, Action: {task_action.execAction}, Parameters: {json.dumps(task_action.execParameters, ensure_ascii=False)}, Input documentLabels: {task_action.execParameters.get('documentList', [])}, Output documentLabel: {task_action.execResultLabel}")
- task_actions.append(task_action)
- # Add this action's resultLabel to the running set for subsequent actions
- if action_data["execResultLabel"]:
- result_labels_so_far.add(action_data["execResultLabel"])
- logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}")
- # If all actions were skipped due to invalid document references, add improvement and return []
- if not task_actions and invalid_doc_ref_detected:
- improvement_msg = ("Previous action(s) referenced invalid or unavailable document labels. "
- "Only use document labels listed in AVAILABLE DOCUMENTS. Do not invent or copy message IDs.")
- if enhanced_context:
- if hasattr(enhanced_context, 'improvements') and isinstance(enhanced_context.improvements, list):
- enhanced_context.improvements.append(improvement_msg)
- else:
- if hasattr(context, 'improvements') and isinstance(context.improvements, list):
- context.improvements.append(improvement_msg)
- logger.warning("All actions skipped due to invalid document references. Added improvement for retry.")
- return []
-
- # Update stats for task validation (estimate bytes for action validation)
- if task_actions:
- # Calculate actual action size for stats
- action_size = self.service.calculateObjectSize(task_actions)
- self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size)
-
- # Log the final TaskAction objects as JSON
- logger.info(f"Final TaskAction objects for task '{task_step.description}':")
- for i, task_action in enumerate(task_actions):
- action_json = {
- 'id': task_action.id,
- 'execMethod': task_action.execMethod,
- 'execAction': task_action.execAction,
- 'execParameters': task_action.execParameters,
- 'execResultLabel': task_action.execResultLabel,
- 'status': task_action.status.value if hasattr(task_action.status, 'value') else str(task_action.status)
- }
- logger.info(f"TaskAction {i+1}: {json.dumps(action_json, indent=2, ensure_ascii=False)}")
-
- logger.info(f"Task action definition completed: {len(task_actions)} actions")
- return task_actions
-
- except Exception as e:
- logger.error(f"Error defining task actions: {str(e)}")
- return []
-
- # Phase 3: Action Execution
- async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]:
- """Phase 3: Execute all actions for a task with retry mechanism"""
- try:
- logger.info(f"Executing {len(task_actions)} task actions")
-
- results = []
- for i, action in enumerate(task_actions):
- logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")
-
- # Execute single action with retry mechanism
- result = await self._executeSingleAction(action, workflow)
- results.append(result)
-
- # If action failed after all retries, continue with next action instead of stopping
- if not result.success:
- logger.error(f"Action {i+1} failed after retries, continuing with next action")
- # Don't break - continue with remaining actions
- continue
-
- logger.info(f"Task action execution completed: {len(results)} results")
- return results
-
- except Exception as e:
- logger.error(f"Error executing task actions: {str(e)}")
- return []
-
- # Phase 4: Task Review and Quality Assessment
- async def reviewTaskCompletion(self, task_step: TaskStep, task_actions: List[TaskAction],
- action_results: List[ActionExecutionResult], workflow: ChatWorkflow) -> ReviewResult:
- """Phase 4: Review task completion and decide next steps"""
- try:
- logger.info(f"Reviewing task completion: {task_step.description}")
-
- # Create step result summary from action results
- step_result = {
- 'task_step': task_step,
- 'action_results': action_results,
- 'successful_actions': sum(1 for result in action_results if result.success),
- 'total_actions': len(action_results),
- 'results': [result.data.get('result', '') for result in action_results if result.success],
- 'errors': [result.error for result in action_results if not result.success]
- }
-
- # Prepare review context
- review_context = ReviewContext(
- task_step=task_step,
- task_actions=task_actions,
- action_results=action_results,
- step_result=step_result,
- workflow_id=workflow.id,
- previous_results=self._getPreviousResultsFromActions(task_actions)
- )
-
- # Use AI to review the results
- review = await self._performTaskReview(review_context)
-
- # Add quality metrics
- quality_metrics = self._calculateTaskQualityMetrics(task_step, action_results)
-
- logger.info(f"Task review completed: {review.status}")
- return ReviewResult(
- status=review.status,
- reason=review.reason,
- improvements=review.improvements,
- quality_score=review.quality_score,
- missing_outputs=review.missing_outputs,
- met_criteria=review.met_criteria,
- unmet_criteria=review.unmet_criteria,
- confidence=review.confidence
- )
-
- except Exception as e:
- logger.error(f"Error reviewing task completion: {str(e)}")
- return ReviewResult(
- status='failed',
- reason=f'Review failed: {str(e)}',
- quality_score=0,
- confidence=0
- )
-
- # Phase 5: Task Handover and State Management
- async def prepareTaskHandover(self, task_step: TaskStep, task_actions: List[TaskAction],
- review_result: ReviewResult, workflow: ChatWorkflow) -> Dict[str, Any]:
- """Phase 5: Prepare results for next task or workflow completion"""
- try:
- logger.info(f"Preparing task handover: {task_step.description}")
-
- # Update task actions with results
- for action in task_actions:
- if action.status == TaskStatus.PENDING:
- action.status = TaskStatus.COMPLETED if review_result.status == 'success' else TaskStatus.FAILED
-
- # Create serializable task actions
- task_actions_serializable = []
- for action in task_actions:
- action_dict = {
- 'id': action.id,
- 'execMethod': action.execMethod,
- 'execAction': action.execAction,
- 'execParameters': action.execParameters,
- 'execResultLabel': action.execResultLabel,
- 'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
- }
- task_actions_serializable.append(action_dict)
-
- # Create handover data
- handover_data = {
- 'task_step': task_step,
- 'task_actions': task_actions_serializable,
- 'review_result': review_result,
- 'next_task_ready': review_result.status == 'success',
- 'available_results': self._getPreviousResultsFromActions(task_actions)
- }
-
- logger.info(f"Task handover prepared: next_task_ready={handover_data['next_task_ready']}")
- return handover_data
-
- except Exception as e:
- logger.error(f"Error preparing task handover: {str(e)}")
- # Create serializable task actions for exception case
- task_actions_serializable = []
- for action in task_actions:
- action_dict = {
- 'id': action.id,
- 'execMethod': action.execMethod,
- 'execAction': action.execAction,
- 'execParameters': action.execParameters,
- 'execResultLabel': action.execResultLabel,
- 'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
- }
- task_actions_serializable.append(action_dict)
-
- return {
- 'task_step': task_step,
- 'task_actions': task_actions_serializable,
- 'review_result': review_result,
- 'next_task_ready': False,
- 'available_results': []
- }
-
-
-
-
-
- # ===== Utility Methods =====
-
- async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]:
- """Process file IDs and return ChatDocument objects"""
- documents = []
-
- for fileId in fileIds:
- try:
- # Ensure service is initialized
- if not hasattr(self, 'service') or not self.service:
- logger.error(f"Service not initialized for file ID {fileId}")
- continue
-
- # Get file info from service
- fileInfo = self.service.getFileInfo(fileId)
- if fileInfo:
- # Create document using interface
- documentData = {
- "fileId": fileId,
- "filename": fileInfo.get("filename", "unknown"),
- "fileSize": fileInfo.get("size", 0),
- "mimeType": fileInfo.get("mimeType", "application/octet-stream")
- }
- document = self.chatInterface.createChatDocument(documentData)
- if document:
- documents.append(document)
- logger.info(f"Processed file ID {fileId} -> {document.filename}")
- else:
- logger.warning(f"No file info found for file ID {fileId}")
- except Exception as e:
- logger.error(f"Error processing file ID {fileId}: {str(e)}")
-
-
- return documents
-
- def setUserLanguage(self, language: str) -> None:
- """Set user language for the chat manager"""
- if hasattr(self, 'service') and self.service:
- self.service.user.language = language
-
- # ===== Enhanced Task Planning Methods =====
-
- async def _callAIWithCircuitBreaker(self, prompt: str, context: str) -> str:
- """Call AI with intelligent routing based on complexity and circuit breaker pattern"""
- max_retries = 3
- base_delay = 2 # Start with 2 seconds
-
- for attempt in range(max_retries):
- try:
- # Check circuit breaker
- if self._isCircuitBreakerOpen():
- raise Exception("AI circuit breaker is open - too many recent failures")
-
- # Determine which AI service to use based on complexity
- ai_choice = self._determineAIChoice(prompt, context)
- logger.debug(f"AI choice for {context}: {ai_choice} (attempt {attempt + 1}/{max_retries})")
-
- if ai_choice == "advanced":
- # Use advanced AI for complex tasks
- try:
- response = await asyncio.wait_for(
- self._callAdvancedAI(prompt, context),
- timeout=self.ai_call_timeout
- )
-
- # Reset failure count on success
- self.ai_failure_count = 0
- logger.info(f"Advanced AI call successful for {context}")
- return response
-
- except Exception as advanced_error:
- error_message = str(advanced_error)
- logger.warning(f"Advanced AI call failed for {context}: {error_message}")
-
- # Fall back to basic AI for complex tasks
- logger.info(f"Falling back to basic AI for complex task: {context}")
- try:
- response = await asyncio.wait_for(
- self._callStandardAI(prompt, context),
- timeout=self.ai_call_timeout
- )
-
- # Reset failure count on success
- self.ai_failure_count = 0
- logger.info(f"Basic AI fallback successful for complex task: {context}")
- return response
-
- except Exception as standard_error:
- # Both failed for complex task
- error_message = f"Advanced AI failed: {str(advanced_error)}. Basic AI failed: {str(standard_error)}"
- raise Exception(error_message)
-
- else: # basic
- # Use basic AI for simple tasks
- try:
- response = await asyncio.wait_for(
- self._callStandardAI(prompt, context),
- timeout=self.ai_call_timeout
- )
-
- # Reset failure count on success
- self.ai_failure_count = 0
- logger.info(f"Basic AI call successful for {context}")
- return response
-
- except Exception as basic_error:
- error_message = str(basic_error)
- logger.warning(f"Basic AI call failed for {context}: {error_message}")
-
- # Only upgrade to advanced AI for critical simple tasks
- if self._isCriticalTask(context):
- logger.info(f"Upgrading to advanced AI for critical simple task: {context}")
- try:
- response = await asyncio.wait_for(
- self._callAdvancedAI(prompt, context),
- timeout=self.ai_call_timeout
- )
-
- # Reset failure count on success
- self.ai_failure_count = 0
- logger.info(f"Advanced AI upgrade successful for critical task: {context}")
- return response
-
- except Exception as advanced_error:
- # Both failed for critical task
- error_message = f"Basic AI failed: {str(basic_error)}. Advanced AI failed: {str(advanced_error)}"
- raise Exception(error_message)
- else:
- # Non-critical simple task failed
- raise Exception(f"Basic AI failed for simple task: {error_message}")
-
- except asyncio.TimeoutError:
- self._recordAIFailure("Timeout")
- if attempt < max_retries - 1:
- delay = base_delay * (2 ** attempt) # Exponential backoff
- logger.warning(f"AI call timed out, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries})")
- await asyncio.sleep(delay)
- continue
- else:
- raise Exception(f"AI call timed out after {self.ai_call_timeout} seconds")
-
- except Exception as e:
- error_message = str(e)
-
- # Special handling for overloaded service (529 error)
- if "overloaded" in error_message.lower() or "529" in error_message:
- if attempt < max_retries - 1:
- delay = base_delay * (2 ** attempt) # Exponential backoff
- logger.warning(f"AI service overloaded, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries})")
- await asyncio.sleep(delay)
- continue
- else:
- # Don't record this as a circuit breaker failure since it's a service issue
- raise Exception("AI service is currently overloaded. Please try again in a few minutes.")
-
- # For other errors, record failure and potentially retry
- self._recordAIFailure(error_message)
- if attempt < max_retries - 1:
- delay = base_delay * (2 ** attempt) # Exponential backoff
- logger.warning(f"AI call failed, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries}): {error_message}")
- await asyncio.sleep(delay)
- continue
- else:
- raise
-
- def _isCircuitBreakerOpen(self) -> bool:
- """Check if circuit breaker is open"""
- if self.ai_failure_count >= self.ai_circuit_breaker_threshold:
- if self.ai_last_failure_time:
- time_since_failure = (datetime.now(UTC) - self.ai_last_failure_time).total_seconds()
- if time_since_failure < self.ai_circuit_breaker_timeout:
- return True
- else:
- # Reset circuit breaker after timeout
- self.ai_failure_count = 0
- self.ai_last_failure_time = None
- return False
-
- def _determineAIChoice(self, prompt: str, context: str) -> str:
- """Determine whether to use advanced or basic AI based on task complexity"""
-
- # Check for forced AI choice based on context
- forced_choice = self._getForcedAIChoice(context)
- if forced_choice:
- logger.debug(f"Forced AI choice for {context}: {forced_choice}")
- return forced_choice
-
- # Define complex task patterns that require advanced AI
- complex_patterns = [
- # Task planning and workflow management
- "task_planning", "action_generation", "result_review", "task_completion_validation",
-
- # Complex document analysis
- "document", "extract", "analysis", "comprehensive", "detailed analysis",
-
- # Multi-step reasoning
- "plan", "strategy", "evaluate", "assess", "compare", "analyze",
-
- # Complex business logic
- "workflow", "task", "action", "validation", "review", "assessment",
-
- # Critical decision making
- "decision", "recommendation", "evaluation", "quality", "success criteria",
-
- # Complex prompts
- "JSON", "structured", "format", "validation", "improvements", "quality_score"
- ]
-
- # Define simple task patterns that can use basic AI
- simple_patterns = [
- # Basic text processing
- "summarize", "translate", "format", "convert", "extract text",
-
- # Simple queries
- "find", "search", "list", "get", "retrieve",
-
- # Basic operations
- "send", "upload", "download", "create", "delete",
-
- # Simple responses
- "confirm", "acknowledge", "status", "info"
- ]
-
- # Check prompt and context for complexity indicators
- combined_text = f"{prompt} {context}".lower()
-
- # Count complex indicators
- complex_count = sum(1 for pattern in complex_patterns if pattern in combined_text)
-
- # Count simple indicators
- simple_count = sum(1 for pattern in simple_patterns if pattern in combined_text)
-
- # Additional complexity factors
- prompt_length = len(prompt)
- has_json_requirement = "json" in combined_text and ("{" in prompt or "}" in prompt)
- has_structured_output = any(word in combined_text for word in ["format", "structure", "template"])
- has_validation = any(word in combined_text for word in ["validate", "check", "verify", "quality"])
-
- # Calculate complexity score
- complexity_score = 0
- complexity_score += complex_count * 2 # Complex patterns worth more
- complexity_score += simple_count * 1 # Simple patterns worth less
- complexity_score += (prompt_length > 1000) * 3 # Long prompts are complex
- complexity_score += has_json_requirement * 5 # JSON requirements are complex
- complexity_score += has_structured_output * 3 # Structured output is complex
- complexity_score += has_validation * 4 # Validation is complex
-
- # Determine AI choice based on complexity score
- if complexity_score >= 5:
- logger.debug(f"Complex task detected (score: {complexity_score}) - using advanced AI for {context}")
- return "advanced"
- else:
- logger.debug(f"Simple task detected (score: {complexity_score}) - using basic AI for {context}")
- return "basic"
-
- def _getForcedAIChoice(self, context: str) -> str:
- """Get forced AI choice for specific contexts (can be overridden)"""
-
- # Define contexts that always use advanced AI
- advanced_contexts = [
- "task_planning", # Always use advanced for task planning
- "action_generation", # Always use advanced for action generation
- "result_review", # Always use advanced for result review
- "task_completion_validation" # Always use advanced for validation
- ]
-
- # Define contexts that always use basic AI
- basic_contexts = [
- "summarize", # Always use basic for summarization
- "translate", # Always use basic for translation
- "format", # Always use basic for formatting
- "status", # Always use basic for status updates
- "info" # Always use basic for info queries
- ]
-
- context_lower = context.lower()
-
- # Check for forced advanced AI
- for advanced_context in advanced_contexts:
- if advanced_context in context_lower:
- return "advanced"
-
- # Check for forced basic AI
- for basic_context in basic_contexts:
- if basic_context in context_lower:
- return "basic"
-
- # No forced choice
- return None
-
- def _isCriticalTask(self, context: str) -> bool:
- """Determine if a simple task is critical enough to warrant advanced AI upgrade"""
-
- # Define critical task patterns
- critical_patterns = [
- # Workflow critical tasks
- "task_planning", "workflow", "critical", "essential",
-
- # User-facing decisions
- "decision", "recommendation", "evaluation", "assessment",
-
- # Quality-sensitive tasks
- "quality", "validation", "review", "check",
-
- # Business-critical operations
- "business", "strategy", "planning", "analysis"
- ]
-
- context_lower = context.lower()
-
- # Check if context contains critical patterns
- is_critical = any(pattern in context_lower for pattern in critical_patterns)
-
- if is_critical:
- logger.debug(f"Critical task detected - {context}")
-
- return is_critical
-
- def _recordAIFailure(self, error: str):
- """Record AI failure for circuit breaker"""
- self.ai_failure_count += 1
- self.ai_last_failure_time = datetime.now(UTC)
- logger.warning(f"AI failure recorded ({self.ai_failure_count}/{self.ai_circuit_breaker_threshold}): {error}")
-
- def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool:
- """Validate task plan structure and dependencies"""
- try:
- if not isinstance(task_plan, dict):
- return False
-
- if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list):
- return False
-
- # Check each task
- task_ids = set()
- for task in task_plan['tasks']:
- if not isinstance(task, dict):
- return False
-
- required_fields = ['id', 'description', 'expected_outputs', 'success_criteria']
- if not all(field in task for field in required_fields):
- return False
-
- # Check for duplicate task IDs
- if task['id'] in task_ids:
- return False
- task_ids.add(task['id'])
-
- # Validate dependencies
- dependencies = task.get('dependencies', [])
- if not isinstance(dependencies, list):
- return False
-
- # Check that dependencies reference existing tasks
- for dep in dependencies:
- if dep not in task_ids and dep != 'task_0': # Allow task_0 as special case
- return False
-
- # Validate ai_prompt if present (optional field)
- if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str):
- return False
-
- return True
-
- except Exception as e:
- logger.error(f"Error validating task plan: {str(e)}")
- return False
-
-
- def _validateActions(self, actions: List[Dict[str, Any]], context: TaskContext) -> bool:
- """Validate generated actions"""
- try:
- if not isinstance(actions, list):
- logger.error("Actions must be a list")
- return False
-
- if len(actions) == 0:
- logger.warning("No actions generated")
- return False
-
- for i, action in enumerate(actions):
- if not isinstance(action, dict):
- logger.error(f"Action {i} must be a dictionary")
- return False
-
- # Check required fields
- required_fields = ['method', 'action', 'parameters', 'resultLabel']
- missing_fields = []
- for field in required_fields:
- if field not in action or not action[field]:
- missing_fields.append(field)
-
- if missing_fields:
- logger.error(f"Action {i} missing required fields: {missing_fields}")
- return False
-
- # Validate result label format
- result_label = action.get('resultLabel', '')
- if not result_label.startswith('task'):
- logger.error(f"Action {i} result label must start with 'task': {result_label}")
- return False
-
- # Validate parameters
- parameters = action.get('parameters', {})
- if not isinstance(parameters, dict):
- logger.error(f"Action {i} parameters must be a dictionary")
- return False
-
- logger.info(f"Successfully validated {len(actions)} actions")
- return True
-
- except Exception as e:
- logger.error(f"Error validating actions: {str(e)}")
- return False
-
-
-
- # ===== Prompt Creation Methods =====
-
- def createTaskPlanningPrompt(self, context: Dict[str, Any]) -> str:
- """Create prompt for task planning"""
- return f"""You are a task planning AI that analyzes user requests and creates structured task plans.
-
-USER REQUEST: {context['user_request']}
-
-AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])}
-
-INSTRUCTIONS:
-1. Analyze the user request and available documents
-2. Break down the request into 2-4 meaningful high-level task steps
-3. Focus on business outcomes, not technical operations
-4. For document processing, create ONE task with a comprehensive AI prompt rather than multiple granular tasks
-5. Each task should produce meaningful, usable outputs
-6. Ensure proper handover between tasks using result labels
-7. Return a JSON object with the exact structure shown below
-
-TASK PLANNING PRINCIPLES:
-- Combine related operations into single tasks (e.g., "Extract and analyze all candidate profiles" instead of separate "read file" and "analyze content" tasks)
-- Use comprehensive AI prompts for document processing rather than multiple small tasks
-- Focus on business value and outcomes
-- Keep tasks at a meaningful level of abstraction
-- Each task should produce results that can be used by subsequent tasks
-
-REQUIRED JSON STRUCTURE:
-{{
- "overview": "Brief description of the overall plan",
- "tasks": [
- {{
- "id": "task_1",
- "description": "Clear description of what this task accomplishes (business outcome)",
- "dependencies": ["task_0"], // IDs of tasks that must complete first
- "expected_outputs": ["output1", "output2"],
- "success_criteria": ["criteria1", "criteria2"],
- "required_documents": ["doc1", "doc2"],
- "estimated_complexity": "low|medium|high",
- "ai_prompt": "Comprehensive AI prompt for document processing tasks (if applicable)"
- }}
- ]
-}}
-
-EXAMPLES OF GOOD TASK DESCRIPTIONS:
-- "Extract and analyze all candidate profiles to identify key qualifications and experience"
-- "Create evaluation matrix and rate candidates against product designer criteria"
-- "Generate comprehensive PowerPoint presentation for management decision"
-- "Store final presentation in SharePoint for specified account"
-
-EXAMPLES OF BAD TASK DESCRIPTIONS:
-- "Open and read the PDF file" (too granular)
-- "Identify table structure" (technical detail)
-- "Convert data to CSV format" (implementation detail)
-
-NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
-
- async def createActionDefinitionPrompt(self, context: TaskContext) -> str:
- """Create prompt for action generation with enhanced document extraction guidance and retry context"""
- task_step = context.task_step
- workflow = context.workflow
- available_docs = context.available_documents or []
- previous_results = context.previous_results or []
- improvements = context.improvements or []
- retry_count = context.retry_count or 0
- previous_action_results = context.previous_action_results or []
- previous_review_result = context.previous_review_result
-
- # Get available methods and actions with signatures
- methodList = self.service.getMethodsList()
- method_actions = {}
- for sig in methodList:
- if '.' in sig:
- method, rest = sig.split('.', 1)
- action = rest.split('(')[0]
- method_actions.setdefault(method, []).append((action, sig))
-
- # Get workflow history
- messageSummary = await self.service.summarizeChat(workflow.messages)
-
- # Get available documents and connections
- docRefs = self.service.getDocumentReferenceList()
- connRefs = self.service.getConnectionReferenceList()
- all_doc_refs = docRefs.get('chat', []) + docRefs.get('history', [])
-
- # Build AVAILABLE METHODS section
- available_methods_str = ''
- for method, actions in method_actions.items():
- available_methods_str += f"- {method}:\n"
- for action, sig in actions:
- available_methods_str += f" - {action}: {sig}\n"
-
- # Get AI prompt from task step if available
- task_ai_prompt = task_step.ai_prompt or ''
-
- # Build retry context section
- retry_context = ""
- if retry_count > 0:
- retry_context = f"""
-RETRY CONTEXT (Attempt {retry_count}):
-Previous action results that failed or were incomplete:
-"""
- for i, result in enumerate(previous_action_results):
- retry_context += f"- Action {i+1}: {result.actionMethod or 'unknown'}.{result.actionName or 'unknown'}\n"
- retry_context += f" Status: {result.success and 'success' or 'failed'}\n"
- retry_context += f" Error: {result.error or 'None'}\n"
- retry_context += f" Result: {(result.data.get('result', '') if result.data else '')[:100]}...\n"
-
- if previous_review_result:
- retry_context += f"""
-Previous review feedback:
-- Status: {previous_review_result.status or 'unknown'}
-- Reason: {previous_review_result.reason or 'No reason provided'}
-- Quality Score: {previous_review_result.quality_score or 0}/10
-- Missing Outputs: {', '.join(previous_review_result.missing_outputs or [])}
-- Unmet Criteria: {', '.join(previous_review_result.unmet_criteria or [])}
-"""
-
- # Precompute all complex string expressions to avoid f-string nesting issues
- expected_outputs_str = ', '.join(task_step.expected_outputs or [])
- success_criteria_str = ', '.join(task_step.success_criteria or [])
- previous_results_str = ', '.join(previous_results) if previous_results else 'None'
- improvements_str = str(improvements) if improvements else 'None'
- available_connections_str = '\n'.join(f"- {conn}" for conn in connRefs)
- available_documents_str = '\n'.join(f"- {doc.documentsLabel} contains {', '.join(doc.documents)}" for doc in all_doc_refs)
- # Build the prompt using only precomputed variables
- prompt = f"""
-You are an action generation AI that creates specific actions to accomplish a task step.
-
-DOCUMENT REFERENCE TYPES:
-- docItem: Reference to a single document. Format: "docItem::"
-- docList: Reference to a group of documents under a label. Format: