From 53a4a39214b9a09d1a026e2c3a4038462a2a737a Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Fri, 11 Jul 2025 00:45:57 +0200
Subject: [PATCH] cleaned chatmanager with pydantic models
---
modules/{workflow => chat}/managerChat.py | 780 ++++++++++--------
modules/{workflow => chat}/methodBase.py | 0
.../{workflow => chat}/processorDocument.py | 0
modules/{workflow => chat}/serviceCenter.py | 4 +-
modules/interfaces/interfaceChatModel.py | 72 ++
modules/methods/EXCLUDED_methodCoder.py | 2 +-
modules/methods/methodDocument.py | 2 +-
modules/methods/methodOutlook.py | 2 +-
modules/methods/methodSharepoint.py | 2 +-
modules/methods/methodWeb.py | 2 +-
modules/workflow/managerWorkflow.py | 43 +-
11 files changed, 520 insertions(+), 389 deletions(-)
rename modules/{workflow => chat}/managerChat.py (80%)
rename modules/{workflow => chat}/methodBase.py (100%)
rename modules/{workflow => chat}/processorDocument.py (100%)
rename modules/{workflow => chat}/serviceCenter.py (99%)
diff --git a/modules/workflow/managerChat.py b/modules/chat/managerChat.py
similarity index 80%
rename from modules/workflow/managerChat.py
rename to modules/chat/managerChat.py
index 7b77aa6e..68bc643d 100644
--- a/modules/workflow/managerChat.py
+++ b/modules/chat/managerChat.py
@@ -9,9 +9,9 @@ from datetime import datetime, UTC
from modules.interfaces.interfaceAppModel import User
from modules.interfaces.interfaceChatModel import (
TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult,
- ExtractedContent, ContentItem, ContentMetadata, DocumentExchange
+ ExtractedContent, ContentItem, ContentMetadata, DocumentExchange, TaskStep, TaskContext, ActionExecutionResult, ReviewContext, ReviewResult, TaskPlan, WorkflowResult
)
-from modules.workflow.serviceCenter import ServiceCenter
+from modules.chat.serviceCenter import ServiceCenter
from modules.interfaces.interfaceChatObjects import ChatObjects
logger = logging.getLogger(__name__)
@@ -20,23 +20,23 @@ logger = logging.getLogger(__name__)
class TaskExecutionState:
"""Manages state during task execution with retry logic"""
- def __init__(self, task_step: dict):
+ def __init__(self, task_step: TaskStep):
self.task_step = task_step
- self.successful_actions = [] # Preserved across retries
- self.failed_actions = [] # For analysis
+ self.successful_actions: List[ActionExecutionResult] = [] # Preserved across retries
+ self.failed_actions: List[ActionExecutionResult] = [] # For analysis
self.current_action_index = 0
self.retry_count = 0
self.improvements = []
self.partial_results = {} # Store intermediate results
self.max_retries = 3
- def addSuccessfulAction(self, action_result: dict):
+ def addSuccessfulAction(self, action_result: ActionExecutionResult):
self.successful_actions.append(action_result)
- if action_result.get('resultLabel'):
- self.partial_results[action_result['resultLabel']] = action_result
- def addFailedAction(self, action_result: dict):
+ if action_result.data.get('resultLabel'):
+ self.partial_results[action_result.data['resultLabel']] = action_result
+ def addFailedAction(self, action_result: ActionExecutionResult):
self.failed_actions.append(action_result)
def getAvailableResults(self) -> list:
- return [result.get('resultLabel', '') for result in self.successful_actions if result.get('resultLabel')]
+ return [result.data.get('resultLabel', '') for result in self.successful_actions if result.data.get('resultLabel')]
def shouldRetryTask(self) -> bool:
return len(self.successful_actions) > 0 and len(self.failed_actions) > 0
def canRetry(self) -> bool:
@@ -46,7 +46,7 @@ class TaskExecutionState:
def getFailurePatterns(self) -> list:
patterns = []
for action in self.failed_actions:
- error = action.get('error', '').lower()
+ error = action.error.lower() if action.error else ''
if "timeout" in error:
patterns.append("timeout_issues")
elif "document_not_found" in error or "file not found" in error:
@@ -64,7 +64,7 @@ class ActionValidator:
def __init__(self, chat_manager):
self.chat_manager = chat_manager
- async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: dict) -> dict:
+ async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> dict:
"""Generic action validation using AI"""
try:
# Create generic validation prompt
@@ -92,7 +92,7 @@ class ActionValidator:
'result_label': action.execResultLabel
}
- def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: dict) -> str:
+ def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> str:
"""Create a validation prompt focused on result file delivery"""
# Extract data from ActionResult model
success = action_result.success
@@ -165,8 +165,8 @@ CRITICAL VALIDATION CRITERIA:
5. **Content Processing**: If content extraction was expected, was it performed correctly?
CONTEXT:
-- Task Description: {context.get('task_description', 'Unknown')}
-- Previous Results: {', '.join(context.get('previous_results', []))}
+- Task Description: {context.task_step.description if context.task_step else 'Unknown'}
+- Previous Results: {', '.join(context.previous_results) if context.previous_results else 'None'}
VALIDATION INSTRUCTIONS:
1. Check if the expected result label "{expected_result_label}" is present in the result
@@ -251,7 +251,7 @@ class ChatManager:
# ===== WORKFLOW PHASES =====
# Phase 1: High-Level Task Planning
- async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]:
+ async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
"""Phase 1: Plan high-level tasks from user input"""
try:
logger.info(f"Planning high-level tasks for workflow {workflow.id}")
@@ -267,13 +267,33 @@ class ChatManager:
response = await self.service.callAiTextAdvanced(prompt)
# Parse and validate task plan
- task_plan = self._parseTaskPlanResponse(response)
+ task_plan_dict = self._parseTaskPlanResponse(response)
- if not self._validateTaskPlan(task_plan):
+ if not self._validateTaskPlan(task_plan_dict):
logger.error("Generated task plan failed validation")
raise Exception("AI-generated task plan failed validation - AI is required for task planning")
- logger.info(f"High-level task planning completed: {len(task_plan.get('tasks', []))} tasks")
+ # Convert to TaskPlan model
+ tasks = []
+ for task_dict in task_plan_dict.get('tasks', []):
+ task = TaskStep(
+ id=task_dict.get('id', ''),
+ description=task_dict.get('description', ''),
+ dependencies=task_dict.get('dependencies', []),
+ expected_outputs=task_dict.get('expected_outputs', []),
+ success_criteria=task_dict.get('success_criteria', []),
+ required_documents=task_dict.get('required_documents', []),
+ estimated_complexity=task_dict.get('estimated_complexity'),
+ ai_prompt=task_dict.get('ai_prompt')
+ )
+ tasks.append(task)
+
+ task_plan = TaskPlan(
+ overview=task_plan_dict.get('overview', ''),
+ tasks=tasks
+ )
+
+ logger.info(f"High-level task planning completed: {len(task_plan.tasks)} tasks")
return task_plan
except Exception as e:
@@ -281,27 +301,31 @@ class ChatManager:
raise Exception(f"AI is required for task planning but failed: {str(e)}")
# Phase 2: Task Definition and Action Generation
- async def defineTaskActions(self, task_step: Dict[str, Any], workflow: ChatWorkflow, previous_results: List[str] = None,
- enhanced_context: Dict[str, Any] = None) -> List[TaskAction]:
+ async def defineTaskActions(self, task_step: TaskStep, workflow: ChatWorkflow, previous_results: List[str] = None,
+ enhanced_context: TaskContext = None) -> List[TaskAction]:
"""Phase 2: Define specific actions for a task step with enhanced retry context"""
try:
- logger.info(f"Defining actions for task: {task_step.get('description', 'Unknown')}")
+ logger.info(f"Defining actions for task: {task_step.description if hasattr(task_step, 'description') else 'Unknown'}")
# Use enhanced context if provided (for retries), otherwise create basic context
if enhanced_context:
context = enhanced_context
else:
- context = {
- 'task_step': task_step,
- 'workflow': workflow,
- 'workflow_id': workflow.id,
- 'available_documents': self._getAvailableDocuments(workflow),
- 'previous_results': previous_results or [],
- 'improvements': None,
- 'retry_count': 0,
- 'previous_action_results': [],
- 'previous_review_result': None
- }
+ context = TaskContext(
+ task_step=task_step,
+ workflow=workflow,
+ workflow_id=workflow.id,
+ available_documents=self._getAvailableDocuments(workflow),
+ previous_results=previous_results or [],
+ improvements=[],
+ retry_count=0,
+ previous_action_results=[],
+ previous_review_result=None,
+ is_regeneration=False,
+ failure_patterns=[],
+ failed_actions=[],
+ successful_actions=[]
+ )
# Generate actions using AI
actions = await self._generateActionsForTaskStep(context)
@@ -336,7 +360,7 @@ class ChatManager:
return []
# Phase 3: Action Execution
- async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
+ async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]:
"""Phase 3: Execute all actions for a task with retry mechanism"""
try:
logger.info(f"Executing {len(task_actions)} task actions")
@@ -350,7 +374,7 @@ class ChatManager:
results.append(result)
# If action failed after all retries, continue with next action instead of stopping
- if result.get('status') == 'failed':
+ if not result.success:
logger.error(f"Action {i+1} failed after retries, continuing with next action")
# Don't break - continue with remaining actions
continue
@@ -363,60 +387,70 @@ class ChatManager:
return []
# Phase 4: Task Review and Quality Assessment
- async def reviewTaskCompletion(self, task_step: Dict[str, Any], task_actions: List[TaskAction],
- action_results: List[Dict[str, Any]], workflow: ChatWorkflow) -> Dict[str, Any]:
+ async def reviewTaskCompletion(self, task_step: TaskStep, task_actions: List[TaskAction],
+ action_results: List[ActionExecutionResult], workflow: ChatWorkflow) -> ReviewResult:
"""Phase 4: Review task completion and decide next steps"""
try:
- logger.info(f"Reviewing task completion: {task_step.get('description', 'Unknown')}")
+ logger.info(f"Reviewing task completion: {task_step.description}")
# Create step result summary from action results
step_result = {
'task_step': task_step,
'action_results': action_results,
- 'successful_actions': sum(1 for result in action_results if result.get('status') == 'completed'),
+ 'successful_actions': sum(1 for result in action_results if result.success),
'total_actions': len(action_results),
- 'results': [result.get('result', '') for result in action_results if result.get('status') == 'completed'],
- 'errors': [result.get('error', '') for result in action_results if result.get('status') == 'failed']
+ 'results': [result.data.get('result', '') for result in action_results if result.success],
+ 'errors': [result.error for result in action_results if not result.success]
}
# Prepare review context
- review_context = {
- 'task_step': task_step,
- 'task_actions': task_actions,
- 'action_results': action_results,
- 'step_result': step_result, # Add the missing step_result
- 'workflow_id': workflow.id,
- 'previous_results': self._getPreviousResultsFromActions(task_actions)
- }
+ review_context = ReviewContext(
+ task_step=task_step,
+ task_actions=task_actions,
+ action_results=action_results,
+ step_result=step_result,
+ workflow_id=workflow.id,
+ previous_results=self._getPreviousResultsFromActions(task_actions)
+ )
# Use AI to review the results
review = await self._performTaskReview(review_context)
# Add quality metrics
- review['quality_metrics'] = self._calculateTaskQualityMetrics(task_step, action_results)
+ quality_metrics = self._calculateTaskQualityMetrics(task_step, action_results)
- logger.info(f"Task review completed: {review.get('status', 'unknown')}")
- return review
+ logger.info(f"Task review completed: {review.status}")
+ return ReviewResult(
+ status=review.status,
+ reason=review.reason,
+ improvements=review.improvements,
+ quality_score=review.quality_score,
+ missing_outputs=review.missing_outputs,
+ met_criteria=review.met_criteria,
+ unmet_criteria=review.unmet_criteria,
+ confidence=review.confidence
+ )
except Exception as e:
logger.error(f"Error reviewing task completion: {str(e)}")
- return {
- 'status': 'failed',
- 'reason': f'Review failed: {str(e)}',
- 'quality_metrics': {'score': 0, 'confidence': 0}
- }
+ return ReviewResult(
+ status='failed',
+ reason=f'Review failed: {str(e)}',
+ quality_score=0,
+ confidence=0
+ )
# Phase 5: Task Handover and State Management
- async def prepareTaskHandover(self, task_step: Dict[str, Any], task_actions: List[TaskAction],
- review_result: Dict[str, Any], workflow: ChatWorkflow) -> Dict[str, Any]:
+ async def prepareTaskHandover(self, task_step: TaskStep, task_actions: List[TaskAction],
+ review_result: ReviewResult, workflow: ChatWorkflow) -> Dict[str, Any]:
"""Phase 5: Prepare results for next task or workflow completion"""
try:
- logger.info(f"Preparing task handover: {task_step.get('description', 'Unknown')}")
+ logger.info(f"Preparing task handover: {task_step.description}")
# Update task actions with results
for action in task_actions:
if action.status == TaskStatus.PENDING:
- action.status = TaskStatus.COMPLETED if review_result.get('status') == 'success' else TaskStatus.FAILED
+ action.status = TaskStatus.COMPLETED if review_result.status == 'success' else TaskStatus.FAILED
# Create serializable task actions
task_actions_serializable = []
@@ -436,7 +470,7 @@ class ChatManager:
'task_step': task_step,
'task_actions': task_actions_serializable,
'review_result': review_result,
- 'next_task_ready': review_result.get('status') == 'success',
+ 'next_task_ready': review_result.status == 'success',
'available_results': self._getPreviousResultsFromActions(task_actions)
}
@@ -601,7 +635,7 @@ class ChatManager:
return False
- def _validateActions(self, actions: List[Dict[str, Any]], context: Dict[str, Any]) -> bool:
+ def _validateActions(self, actions: List[Dict[str, Any]], context: TaskContext) -> bool:
"""Validate generated actions"""
try:
if not isinstance(actions, list):
@@ -705,16 +739,16 @@ EXAMPLES OF BAD TASK DESCRIPTIONS:
NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
- async def _createActionDefinitionPrompt(self, context: Dict[str, Any]) -> str:
+ async def _createActionDefinitionPrompt(self, context: TaskContext) -> str:
"""Create prompt for action generation with enhanced document extraction guidance and retry context"""
- task_step = context['task_step']
- workflow = context.get('workflow')
- available_docs = context['available_documents']
- previous_results = context['previous_results']
- improvements = context.get('improvements', '')
- retry_count = context.get('retry_count', 0)
- previous_action_results = context.get('previous_action_results', [])
- previous_review_result = context.get('previous_review_result')
+ task_step = context.task_step
+ workflow = context.workflow
+ available_docs = context.available_documents or []
+ previous_results = context.previous_results or []
+ improvements = context.improvements or []
+ retry_count = context.retry_count or 0
+ previous_action_results = context.previous_action_results or []
+ previous_review_result = context.previous_review_result
# Get available methods and actions with signatures
methodList = self.service.getMethodsList()
@@ -741,7 +775,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
available_methods_str += f" - {action}: {sig}\n"
# Get AI prompt from task step if available
- task_ai_prompt = task_step.get('ai_prompt', '')
+ task_ai_prompt = task_step.ai_prompt or ''
# Build retry context section
retry_context = ""
@@ -751,19 +785,19 @@ RETRY CONTEXT (Attempt {retry_count}):
Previous action results that failed or were incomplete:
"""
for i, result in enumerate(previous_action_results):
- retry_context += f"- Action {i+1}: {result.get('actionMethod', 'unknown')}.{result.get('actionName', 'unknown')}\n"
- retry_context += f" Status: {result.get('status', 'unknown')}\n"
- retry_context += f" Error: {result.get('error', 'None')}\n"
- retry_context += f" Result: {result.get('result', '')[:100]}...\n"
+ retry_context += f"- Action {i+1}: {result.actionMethod or 'unknown'}.{result.actionName or 'unknown'}\n"
+ retry_context += f" Status: {result.success and 'success' or 'failed'}\n"
+ retry_context += f" Error: {result.error or 'None'}\n"
+ retry_context += f" Result: {(result.data.get('result', '') if result.data else '')[:100]}...\n"
if previous_review_result:
retry_context += f"""
Previous review feedback:
-- Status: {previous_review_result.get('status', 'unknown')}
-- Reason: {previous_review_result.get('reason', 'No reason provided')}
-- Quality Score: {previous_review_result.get('quality_score', 0)}/10
-- Missing Outputs: {', '.join(previous_review_result.get('missing_outputs', []))}
-- Unmet Criteria: {', '.join(previous_review_result.get('unmet_criteria', []))}
+- Status: {previous_review_result.status or 'unknown'}
+- Reason: {previous_review_result.reason or 'No reason provided'}
+- Quality Score: {previous_review_result.quality_score or 0}/10
+- Missing Outputs: {', '.join(previous_review_result.missing_outputs or [])}
+- Unmet Criteria: {', '.join(previous_review_result.unmet_criteria or [])}
"""
return f"""
@@ -775,9 +809,9 @@ DOCUMENT REFERENCE TYPES:
- Each docList label maps to a list of docItem references (see AVAILABLE DOCUMENTS).
- A label like "task1_action2_results" refers to the output of action 2 in task 1.
-TASK STEP: {task_step.get('description', 'Unknown')} (ID: {task_step.get('id', 'Unknown')})
-EXPECTED OUTPUTS: {', '.join(task_step.get('expected_outputs', []))}
-SUCCESS CRITERIA: {', '.join(task_step.get('success_criteria', []))}
+TASK STEP: {task_step.description} (ID: {task_step.id})
+EXPECTED OUTPUTS: {', '.join(task_step.expected_outputs or [])}
+SUCCESS CRITERIA: {', '.join(task_step.success_criteria or [])}
TASK AI PROMPT: {task_ai_prompt if task_ai_prompt else 'None provided'}
CONTEXT - Chat History:
@@ -872,18 +906,18 @@ EXAMPLES OF GOOD ACTIONS:
NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
- def _createResultReviewPrompt(self, review_context: Dict[str, Any]) -> str:
+ def _createResultReviewPrompt(self, review_context: ReviewContext) -> str:
"""Create prompt for result review"""
- task_step = review_context['task_step']
- step_result = review_context['step_result']
+ task_step = review_context.task_step
+ step_result = review_context.step_result or {}
# Create serializable version of step_result with only metadata (no document content)
step_result_serializable = {
'task_step': {
- 'id': task_step.get('id', ''),
- 'description': task_step.get('description', ''),
- 'expected_outputs': task_step.get('expected_outputs', []),
- 'success_criteria': task_step.get('success_criteria', [])
+ 'id': task_step.id,
+ 'description': task_step.description,
+ 'expected_outputs': task_step.expected_outputs or [],
+ 'success_criteria': task_step.success_criteria or []
},
'action_results': [],
'successful_actions': step_result.get('successful_actions', 0),
@@ -893,10 +927,10 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
}
# Convert action_results to serializable format with only metadata (no document content)
- for action_result in step_result.get('action_results', []):
+ for action_result in review_context.action_results or []:
# Extract only document metadata, not content
documents_metadata = []
- for doc in action_result.get('documents', []):
+ for doc in action_result.documents or []:
if hasattr(doc, 'filename'):
documents_metadata.append({
'filename': doc.filename,
@@ -911,24 +945,24 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
})
serializable_action_result = {
- 'status': action_result.get('status', ''),
- 'result_summary': action_result.get('result', '')[:200] + '...' if len(action_result.get('result', '')) > 200 else action_result.get('result', ''),
- 'error': action_result.get('error', ''),
- 'resultLabel': action_result.get('resultLabel', ''),
+ 'status': 'completed' if action_result.success else 'failed',
+ 'result_summary': action_result.data.get('result', '')[:200] + '...' if len(action_result.data.get('result', '')) > 200 else action_result.data.get('result', ''),
+ 'error': action_result.error,
+ 'resultLabel': action_result.data.get('resultLabel', ''),
'documents_count': len(documents_metadata),
'documents_metadata': documents_metadata,
- 'actionId': action_result.get('actionId', ''),
- 'actionMethod': action_result.get('actionMethod', ''),
- 'actionName': action_result.get('actionName', ''),
- 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none'
+ 'actionId': action_result.actionId,
+ 'actionMethod': action_result.actionMethod,
+ 'actionName': action_result.actionName,
+ 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.data.get('result', '').strip() else 'none'
}
step_result_serializable['action_results'].append(serializable_action_result)
return f"""You are a result review AI that evaluates task step completion and decides on next actions.
-TASK STEP: {task_step.get('description', 'Unknown')}
-EXPECTED OUTPUTS: {', '.join(task_step.get('expected_outputs', []))}
-SUCCESS CRITERIA: {', '.join(task_step.get('success_criteria', []))}
+TASK STEP: {task_step.description}
+EXPECTED OUTPUTS: {', '.join(task_step.expected_outputs or [])}
+SUCCESS CRITERIA: {', '.join(task_step.success_criteria or [])}
STEP RESULT: {json.dumps(step_result_serializable, indent=2, ensure_ascii=False)}
@@ -961,7 +995,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== HELPER METHODS FOR WORKFLOW PHASES =====
- async def _generateActionsForTaskStep(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
+ async def _generateActionsForTaskStep(self, context: TaskContext) -> List[Dict[str, Any]]:
"""Generate actions for a specific task step with enhanced retry context"""
try:
# Prepare prompt for action generation
@@ -1376,7 +1410,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
logger.error(f"Error converting document data to string: {str(e)}")
return str(document_data)
- async def _performTaskReview(self, review_context: Dict[str, Any]) -> Dict[str, Any]:
+ async def _performTaskReview(self, review_context: ReviewContext) -> ReviewResult:
"""Perform AI-based task review with enhanced retry logic"""
try:
# Prepare prompt for result review
@@ -1386,58 +1420,68 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
response = await self._callAIWithCircuitBreaker(prompt, "result_review")
# Parse review result
- review = self._parseReviewResponse(response)
+ review_dict = self._parseReviewResponse(response)
# Add default values for missing fields
- review.setdefault('status', 'unknown')
- review.setdefault('reason', 'No reason provided')
- review.setdefault('quality_score', 5)
+ review_dict.setdefault('status', 'unknown')
+ review_dict.setdefault('reason', 'No reason provided')
+ review_dict.setdefault('quality_score', 5)
# Enhanced retry logic based on result quality
- if review.get('status') == 'retry':
+ if review_dict.get('status') == 'retry':
# Analyze the specific issues for better retry guidance
- action_results = review_context.get('action_results', [])
+ action_results = review_context.action_results or []
if action_results:
# Check for common issues that warrant retry
# Only consider empty results a problem if there are no documents produced
has_empty_results = any(
- not result.get('result', '').strip() and
- not result.get('documents', []) and
- not result.get('documents_metadata', [])
+ not result.data.get('result', '').strip() and
+ not result.documents and
+ not result.documents
for result in action_results
- if result.get('status') == 'completed'
+ if result.success
)
has_incomplete_metadata = any(
- any(doc.get('filename') == 'unknown' for doc in result.get('documents_metadata', []))
+ any(doc.get('filename') == 'unknown' for doc in result.documents or [])
for result in action_results
- if result.get('status') == 'completed'
+ if result.success
)
if has_empty_results:
- review['improvements'] = (review.get('improvements', '') +
+ review_dict['improvements'] = (review_dict.get('improvements', '') +
" Ensure the document extraction returns actual content, not empty results. " +
"Check if the AI prompt is specific enough to extract meaningful data.")
if has_incomplete_metadata:
- review['improvements'] = (review.get('improvements', '') +
+ review_dict['improvements'] = (review_dict.get('improvements', '') +
" Ensure proper document metadata is extracted including filename, size, and mime type. " +
"The document processing should provide complete file information.")
# If we have specific issues, adjust quality score
if has_empty_results or has_incomplete_metadata:
- review['quality_score'] = max(1, review.get('quality_score', 5) - 2)
+ review_dict['quality_score'] = max(1, review_dict.get('quality_score', 5) - 2)
- return review
+ # Create ReviewResult model
+ return ReviewResult(
+ status=review_dict.get('status', 'unknown'),
+ reason=review_dict.get('reason', 'No reason provided'),
+ improvements=review_dict.get('improvements', []),
+ quality_score=review_dict.get('quality_score', 5),
+ missing_outputs=review_dict.get('missing_outputs', []),
+ met_criteria=review_dict.get('met_criteria', []),
+ unmet_criteria=review_dict.get('unmet_criteria', []),
+ confidence=review_dict.get('confidence', 0.5)
+ )
except Exception as e:
logger.error(f"Error performing task review: {str(e)}")
- return {
- 'status': 'success', # Default to success to avoid blocking workflow
- 'reason': f'Review failed: {str(e)}',
- 'quality_score': 5,
- 'confidence': 0.5
- }
+ return ReviewResult(
+ status='success', # Default to success to avoid blocking workflow
+ reason=f'Review failed: {str(e)}',
+ quality_score=5,
+ confidence=0.5
+ )
def _getPreviousResultsFromActions(self, task_actions: List[TaskAction]) -> List[str]:
"""Get list of previous results from completed actions and workflow messages"""
@@ -1459,14 +1503,14 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
return results
- def _calculateTaskQualityMetrics(self, task_step: Dict[str, Any], action_results: List[Dict[str, Any]]) -> Dict[str, Any]:
+ def _calculateTaskQualityMetrics(self, task_step: TaskStep, action_results: List[ActionExecutionResult]) -> Dict[str, Any]:
"""Calculate quality metrics for task step results"""
try:
quality_score = 0
confidence = 0
# Count successful actions
- successful_actions = sum(1 for result in action_results if result.get('status') == 'completed')
+ successful_actions = sum(1 for result in action_results if result.success)
total_actions = len(action_results)
if total_actions > 0:
@@ -1616,7 +1660,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== UNIFIED WORKFLOW EXECUTION =====
- async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> Dict[str, Any]:
+ async def executeUnifiedWorkflow(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> WorkflowResult:
"""Execute a unified workflow with state management and action-level validation"""
try:
logger.info(f"Starting unified workflow execution for workflow {workflow.id}")
@@ -1644,8 +1688,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
self.service.updateWorkflowStats(eventLabel="taskplan", bytesSent=task_plan_size)
# Create user-friendly task plan log
- tasks_count = len(task_plan.get('tasks', []))
- task_descriptions = "\n".join([f"- {task.get('description', 'No description')}" for task in task_plan.get('tasks', [])])
+ tasks_count = len(task_plan.tasks)
+ task_descriptions = "\n".join([f"- {task.description}" for task in task_plan.tasks])
self.chatInterface.createWorkflowLog({
"workflowId": workflow.id,
"message": f"Planning completed: {tasks_count} tasks identified\n{task_descriptions}",
@@ -1657,19 +1701,19 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# Log task plan details (without document content)
task_plan_log = {
- 'overview': task_plan.get('overview', ''),
- 'tasks_count': len(task_plan.get('tasks', [])),
+ 'overview': task_plan.overview,
+ 'tasks_count': len(task_plan.tasks),
'tasks': []
}
- for task in task_plan.get('tasks', []):
+ for task in task_plan.tasks:
task_log = {
- 'id': task.get('id', ''),
- 'description': task.get('description', ''),
- 'dependencies': task.get('dependencies', []),
- 'expected_outputs': task.get('expected_outputs', []),
- 'success_criteria': task.get('success_criteria', []),
- 'required_documents_count': len(task.get('required_documents', [])),
- 'estimated_complexity': task.get('estimated_complexity', '')
+ 'id': task.id,
+ 'description': task.description,
+ 'dependencies': task.dependencies or [],
+ 'expected_outputs': task.expected_outputs or [],
+ 'success_criteria': task.success_criteria or [],
+ 'required_documents_count': len(task.required_documents or []),
+ 'estimated_complexity': task.estimated_complexity or ''
}
task_plan_log['tasks'].append(task_log)
logger.debug(f"TASK PLAN CREATED: {json.dumps(task_plan_log, indent=2, ensure_ascii=False)}")
@@ -1678,15 +1722,15 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
workflow_results = []
previous_results = []
- for i, task_step in enumerate(task_plan['tasks']):
- task_description = task_step.get('description', 'Unknown')
- logger.info(f"=== PROCESSING TASK {i+1}/{len(task_plan['tasks'])}: {task_description} ===")
+ for i, task_step in enumerate(task_plan.tasks):
+ task_description = task_step.description
+ logger.info(f"=== PROCESSING TASK {i+1}/{len(task_plan.tasks)}: {task_description} ===")
# Create user-friendly task start log
- progress = 20 + (i * 60 // len(task_plan['tasks']))
+ progress = 20 + (i * 60 // len(task_plan.tasks))
self.chatInterface.createWorkflowLog({
"workflowId": workflow.id,
- "message": f"Executing task {i+1}/{len(task_plan['tasks'])}: {task_description}",
+ "message": f"Executing task {i+1}/{len(task_plan.tasks)}: {task_description}",
"type": "info",
"status": "running",
"progress": progress,
@@ -1694,62 +1738,56 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
})
# Create context for task execution
- task_context = {
- 'workflow': workflow,
- 'workflow_id': workflow.id,
- 'available_documents': self._getAvailableDocuments(workflow),
- 'previous_results': previous_results,
- 'task_description': task_description
- }
+ task_context = TaskContext(
+ task_step=task_step,
+ workflow=workflow,
+ workflow_id=workflow.id,
+ available_documents=self._getAvailableDocuments(workflow),
+ previous_results=previous_results
+ )
# Execute task with state management
task_result = await self.executeTaskWithStateManagement(task_step, workflow, task_context)
# Log task result
- if task_result['status'] == 'success':
- successful_actions = len(task_result.get('successful_actions', []))
- failed_actions = len(task_result.get('failed_actions', []))
- quality_metrics = task_result.get('quality_metrics', {})
- quality_score = quality_metrics.get('score', 0)
-
+ if task_result.success:
self.chatInterface.createWorkflowLog({
"workflowId": workflow.id,
- "message": f"π― Task {i+1} completed successfully ({successful_actions} actions, quality score: {quality_score})",
+ "message": f"π― Task {i+1} completed successfully",
"type": "success",
"status": "running",
"progress": progress + 20
})
# Update previous results for next task
- previous_results = task_result.get('successful_actions', [])
+ # Note: TaskResult doesn't have successful_actions field, so we'll use a placeholder
+ previous_results = [f"task_{i+1}_results"]
else:
# Task failed - provide detailed feedback and stop workflow
- failed_actions = len(task_result.get('failed_actions', []))
- retry_count = task_result.get('retry_count', 0)
- reason = task_result.get('reason', 'Unknown error')
+ reason = task_result.error or 'Unknown error'
self.chatInterface.createWorkflowLog({
"workflowId": workflow.id,
- "message": f"β Task {i+1} failed after {retry_count} retries: {reason}",
+ "message": f"β Task {i+1} failed: {reason}",
"type": "error",
"status": "failed",
"progress": progress + 15
})
# Generate detailed failure feedback
- failure_feedback = self._generateTaskFailureFeedback(task_step, task_result, i+1, len(task_plan['tasks']))
+ failure_feedback = self._generateTaskFailureFeedback(task_step, task_result, i+1, len(task_plan.tasks))
# Stop workflow and return failure
- return {
- 'status': 'failed',
- 'completed_tasks': i,
- 'total_tasks': len(task_plan['tasks']),
- 'failed_task': task_step,
- 'failure_reason': reason,
- 'feedback': failure_feedback,
- 'execution_time': time.time() - start_time
- }
+ return WorkflowResult(
+ status='failed',
+ error=reason,
+ phase='execution',
+ completed_tasks=i,
+ total_tasks=len(task_plan.tasks),
+ execution_time=time.time() - start_time,
+ final_results_count=0
+ )
# Add task result to workflow results
workflow_results.append(task_result)
@@ -1760,23 +1798,23 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# Create final success log
self.chatInterface.createWorkflowLog({
"workflowId": workflow.id,
- "message": f"π Workflow completed successfully ({len(workflow_results)}/{len(task_plan['tasks'])} tasks)",
+ "message": f"π Workflow completed successfully ({len(workflow_results)}/{len(task_plan.tasks)} tasks)",
"type": "success",
"status": "completed",
"progress": 100
})
# Create workflow summary
- workflow_summary = {
- 'status': 'completed',
- 'completed_tasks': len(workflow_results),
- 'total_tasks': len(task_plan['tasks']),
- 'execution_time': total_processing_time,
- 'final_results_count': len(previous_results)
- }
+ workflow_summary = WorkflowResult(
+ status='completed',
+ completed_tasks=len(workflow_results),
+ total_tasks=len(task_plan.tasks),
+ execution_time=total_processing_time,
+ final_results_count=len(previous_results)
+ )
- logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {len(workflow_results)}/{len(task_plan['tasks'])} tasks successful ===")
- logger.debug(f"FINAL WORKFLOW SUMMARY: {json.dumps(workflow_summary, indent=2, ensure_ascii=False)}")
+ logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {len(workflow_results)}/{len(task_plan.tasks)} tasks successful ===")
+ logger.debug(f"FINAL WORKFLOW SUMMARY: {json.dumps(workflow_summary.model_dump(), indent=2, ensure_ascii=False)}")
return workflow_summary
except Exception as e:
@@ -1790,31 +1828,30 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
"progress": 100,
"agentName": "System"
})
- return {
- 'status': 'failed',
- 'error': str(e),
- 'phase': 'execution'
- }
+ return WorkflowResult(
+ status='failed',
+ error=str(e),
+ phase='execution',
+ completed_tasks=0,
+ total_tasks=0,
+ execution_time=0,
+ final_results_count=0
+ )
- def _generateTaskFailureFeedback(self, task_step: Dict[str, Any], task_result: Dict[str, Any], task_number: int, total_tasks: int) -> str:
+ def _generateTaskFailureFeedback(self, task_step: TaskStep, task_result: TaskResult, task_number: int, total_tasks: int) -> str:
"""Generate detailed feedback for task failure"""
- successful_actions = len(task_result.get('successful_actions', []))
- failed_actions = len(task_result.get('failed_actions', []))
- retry_count = task_result.get('retry_count', 0)
- reason = task_result.get('reason', 'Unknown error')
-
feedback = f"""
Workflow execution stopped due to task failure.
PROGRESS: {task_number}/{total_tasks} tasks completed successfully
-FAILED TASK: {task_step.get('description', 'Unknown')}
+FAILED TASK: {task_step.description}
FAILURE DETAILS:
-- Retry attempts: {retry_count}
-- Successful actions: {successful_actions}
-- Failed actions: {failed_actions}
-- Failure reason: {reason}
+- Task ID: {task_result.taskId}
+- Status: {task_result.status}
+- Success: {task_result.success}
+- Error: {task_result.error or 'Unknown error'}
The workflow cannot continue because this task is essential for subsequent steps.
Please review the task requirements and try again with different input or approach.
@@ -1824,10 +1861,10 @@ Please review the task requirements and try again with different input or approa
# ===== NEW STATE MANAGEMENT AND VALIDATION CLASSES =====
- async def executeTaskWithStateManagement(self, task_step: Dict[str, Any], workflow: ChatWorkflow, context: Dict[str, Any]) -> Dict[str, Any]:
+ async def executeTaskWithStateManagement(self, task_step: TaskStep, workflow: ChatWorkflow, context: TaskContext) -> TaskResult:
"""Execute task with state management and action-level retry logic"""
try:
- logger.info(f"Executing task with state management: {task_step.get('description', 'Unknown')}")
+ logger.info(f"Executing task with state management: {task_step.description}")
# Initialize task state
state = TaskExecutionState(task_step)
@@ -1837,20 +1874,19 @@ Please review the task requirements and try again with different input or approa
# Generate actions (first time or after regeneration)
if state.retry_count == 0:
- actions = await self.defineTaskActions(task_step, workflow, context.get('previous_results', []))
+ actions = await self.defineTaskActions(task_step, workflow, context.previous_results)
else:
# Regenerate actions with failure context
actions = await self._regenerateTaskActionsWithFailureContext(task_step, state, context)
if not actions:
logger.warning(f"No actions defined for task, marking as failed")
- return {
- 'status': 'failed',
- 'reason': 'No actions could be generated for task',
- 'successful_actions': state.successful_actions,
- 'failed_actions': state.failed_actions,
- 'retry_count': state.retry_count
- }
+ return TaskResult(
+ taskId=task_step.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ error='No actions could be generated for task'
+ )
# Execute actions with individual validation
for i, action in enumerate(actions):
@@ -1859,16 +1895,16 @@ Please review the task requirements and try again with different input or approa
# Execute action with validation
result = await self.executeActionWithValidation(action, workflow, context)
- if result['validation']['status'] == 'success':
+ if result.validation.get('status') == 'success':
state.addSuccessfulAction(result)
logger.info(f"Action {i+1} completed successfully")
- elif result['validation']['status'] == 'retry':
+ elif result.validation.get('status') == 'retry':
# Retry individual action
- improvements = result['validation'].get('improvements', [])
+ improvements = result.validation.get('improvements', [])
retry_result = await self.retryActionWithImprovements(action, result, improvements)
- if retry_result['validation']['status'] == 'success':
+ if retry_result.validation.get('status') == 'success':
state.addSuccessfulAction(retry_result)
logger.info(f"Action {i+1} retry successful")
else:
@@ -1886,18 +1922,17 @@ Please review the task requirements and try again with different input or approa
# Validate task completion
task_validation = await self._validateTaskCompletion(state.successful_actions, task_step, workflow)
- if task_validation['status'] == 'success':
+ if task_validation.status == 'success':
logger.info(f"Task completed successfully with {len(state.successful_actions)} successful actions")
- return {
- 'status': 'success',
- 'successful_actions': state.successful_actions,
- 'failed_actions': state.failed_actions,
- 'retry_count': state.retry_count,
- 'quality_metrics': task_validation.get('quality_metrics', {})
- }
+ return TaskResult(
+ taskId=task_step.id,
+ status=TaskStatus.COMPLETED,
+ success=True,
+ feedback=f"Task completed successfully with {len(state.successful_actions)} successful actions"
+ )
- elif task_validation['status'] == 'retry':
- state.improvements = task_validation.get('improvements', [])
+ elif task_validation.status == 'retry':
+ state.improvements = task_validation.improvements or []
state.incrementRetryCount()
logger.info(f"Task needs retry. Improvements: {state.improvements}")
# Preserve successful actions for next iteration
@@ -1905,35 +1940,32 @@ Please review the task requirements and try again with different input or approa
else: # fail
logger.error(f"Task failed permanently")
- return {
- 'status': 'failed',
- 'reason': task_validation.get('reason', 'Task validation failed'),
- 'successful_actions': state.successful_actions,
- 'failed_actions': state.failed_actions,
- 'retry_count': state.retry_count
- }
+ return TaskResult(
+ taskId=task_step.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ error=task_validation.reason or 'Task validation failed'
+ )
# Max retries reached
logger.error(f"Task failed after {state.max_retries} retries")
- return {
- 'status': 'failed',
- 'reason': f'Task failed after {state.max_retries} retries',
- 'successful_actions': state.successful_actions,
- 'failed_actions': state.failed_actions,
- 'retry_count': state.retry_count
- }
+ return TaskResult(
+ taskId=task_step.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ error=f'Task failed after {state.max_retries} retries'
+ )
except Exception as e:
logger.error(f"Error in task execution with state management: {str(e)}")
- return {
- 'status': 'failed',
- 'reason': f'Task execution error: {str(e)}',
- 'successful_actions': [],
- 'failed_actions': [],
- 'retry_count': 0
- }
+ return TaskResult(
+ taskId=task_step.id,
+ status=TaskStatus.FAILED,
+ success=False,
+ error=f'Task execution error: {str(e)}'
+ )
- async def _regenerateTaskActionsWithFailureContext(self, task_step: Dict[str, Any], state: TaskExecutionState, context: Dict[str, Any]) -> List[TaskAction]:
+ async def _regenerateTaskActionsWithFailureContext(self, task_step: TaskStep, state: TaskExecutionState, context: TaskContext) -> List[TaskAction]:
"""Regenerate task actions with failure context and improvements"""
try:
logger.info(f"Regenerating actions for task with failure context")
@@ -1942,22 +1974,22 @@ Please review the task requirements and try again with different input or approa
failure_patterns = state.getFailurePatterns()
# Create enhanced context with failure information
- enhanced_context = {
- 'task_step': task_step,
- 'workflow': context.get('workflow'),
- 'workflow_id': context.get('workflow_id'),
- 'available_documents': context.get('available_documents', []),
- 'previous_results': state.getAvailableResults(),
- 'improvements': state.improvements,
- 'retry_count': state.retry_count,
- 'failure_patterns': failure_patterns,
- 'failed_actions': state.failed_actions,
- 'successful_actions': state.successful_actions,
- 'is_regeneration': True
- }
+ enhanced_context = TaskContext(
+ task_step=task_step,
+ workflow=context.workflow,
+ workflow_id=context.workflow_id,
+ available_documents=context.available_documents,
+ previous_results=state.getAvailableResults(),
+ improvements=state.improvements,
+ retry_count=state.retry_count,
+ failure_patterns=failure_patterns,
+ failed_actions=state.failed_actions,
+ successful_actions=state.successful_actions,
+ is_regeneration=True
+ )
# Generate new actions with failure avoidance
- actions = await self.defineTaskActions(task_step, context.get('workflow'), state.getAvailableResults(), enhanced_context)
+ actions = await self.defineTaskActions(task_step, context.workflow, state.getAvailableResults(), enhanced_context)
logger.info(f"Regenerated {len(actions)} actions with failure context")
return actions
@@ -1966,18 +1998,18 @@ Please review the task requirements and try again with different input or approa
logger.error(f"Error regenerating task actions: {str(e)}")
return []
- async def _validateTaskCompletion(self, successful_actions: List[Dict[str, Any]], task_step: Dict[str, Any], workflow: ChatWorkflow) -> Dict[str, Any]:
+ async def _validateTaskCompletion(self, successful_actions: List[ActionExecutionResult], task_step: TaskStep, workflow: ChatWorkflow) -> ReviewResult:
"""Validate if task is completed successfully"""
try:
- logger.info(f"Validating task completion: {task_step.get('description', 'Unknown')}")
+ logger.info(f"Validating task completion: {task_step.description}")
# Create task result summary
task_result = {
'task_step': task_step,
'successful_actions': successful_actions,
'successful_count': len(successful_actions),
- 'expected_outputs': task_step.get('expected_outputs', []),
- 'success_criteria': task_step.get('success_criteria', [])
+ 'expected_outputs': task_step.expected_outputs or [],
+ 'success_criteria': task_step.success_criteria or []
}
# Use AI to validate task completion
@@ -1991,17 +2023,27 @@ Please review the task requirements and try again with different input or approa
validation['quality_metrics'] = self._calculateTaskQualityMetrics(task_step, successful_actions)
logger.info(f"Task completion validation: {validation.get('status', 'unknown')}")
- return validation
+ return ReviewResult(
+ status=validation.get('status', 'unknown'),
+ reason=validation.get('reason', 'No reason provided'),
+ improvements=validation.get('improvements', []),
+ quality_score=validation.get('quality_score', 5),
+ missing_outputs=validation.get('missing_outputs', []),
+ met_criteria=validation.get('met_criteria', []),
+ unmet_criteria=validation.get('unmet_criteria', []),
+ confidence=validation.get('confidence', 0.5)
+ )
except Exception as e:
logger.error(f"Error validating task completion: {str(e)}")
- return {
- 'status': 'success', # Default to success to avoid blocking
- 'reason': f'Validation failed: {str(e)}',
- 'quality_metrics': {'score': 5, 'confidence': 0.5}
- }
+ return ReviewResult(
+ status='success', # Default to success to avoid blocking
+ reason=f'Validation failed: {str(e)}',
+ quality_score=5,
+ confidence=0.5
+ )
- def _createTaskCompletionValidationPrompt(self, task_result: Dict[str, Any], task_step: Dict[str, Any]) -> str:
+ def _createTaskCompletionValidationPrompt(self, task_result: Dict[str, Any], task_step: TaskStep) -> str:
"""Create prompt for task completion validation"""
successful_actions = task_result['successful_actions']
@@ -2012,17 +2054,17 @@ Please review the task requirements and try again with different input or approa
action_summary = []
for action in successful_actions:
action_summary.append({
- 'method': action.get('actionMethod', ''),
- 'action': action.get('actionName', ''),
- 'result_label': action.get('resultLabel', ''),
- 'documents_count': len(action.get('documents', [])),
- 'has_text_result': bool(action.get('result', '').strip())
+ 'method': action.actionMethod or '',
+ 'action': action.actionName or '',
+ 'result_label': action.data.get('resultLabel', ''),
+ 'documents_count': len(action.documents or []),
+ 'has_text_result': bool(action.data.get('result', '').strip())
})
return f"""You are a task completion validator that evaluates if a task was successfully completed.
TASK DETAILS:
-- Description: {task_step.get('description', 'Unknown')}
+- Description: {task_step.description}
- Expected Outputs: {', '.join(expected_outputs)}
- Success Criteria: {', '.join(success_criteria)}
@@ -2091,25 +2133,25 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== FIX MISSING METHOD CALL =====
- async def _executeSingleActionWithRetry(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]:
+ async def _executeSingleActionWithRetry(self, action: TaskAction, workflow: ChatWorkflow) -> ActionExecutionResult:
"""Execute single action with retry mechanism - FIXED"""
try:
logger.info(f"Executing action with retry: {action.execMethod}.{action.execAction}")
# Create context for validation
- context = {
- 'task_description': 'Action execution',
- 'previous_results': [],
- 'action_index': 0,
- 'total_actions': 1
- }
+ context = TaskContext(
+ task_step=TaskStep(id="action_retry", description="Action execution"),
+ workflow_id=workflow.id,
+ available_documents=[],
+ previous_results=[]
+ )
# Execute action with validation
result = await self.executeActionWithValidation(action, workflow, context)
# If validation indicates retry, attempt retry
- if result['validation']['status'] == 'retry':
- improvements = result['validation'].get('improvements', [])
+ if result.validation.get('status') == 'retry':
+ improvements = result.validation.get('improvements', [])
retry_result = await self.retryActionWithImprovements(action, result, improvements)
return retry_result
@@ -2118,14 +2160,16 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
except Exception as e:
logger.error(f"Error executing action with retry: {str(e)}")
action.setError(str(e))
- return {
- "status": "failed",
- "error": str(e),
- "actionId": action.id,
- "actionMethod": action.execMethod,
- "actionName": action.execAction,
- "documents": [],
- "validation": {
+ return ActionExecutionResult(
+ success=False,
+ data={},
+ metadata={},
+ error=str(e),
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction,
+ documents=[],
+ validation={
'status': 'fail',
'reason': f'Execution error: {str(e)}',
'confidence': 0.0,
@@ -2134,11 +2178,11 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'missing_elements': [],
'suggested_retry_approach': ''
}
- }
+ )
# ===== REPLACE OLD executeTaskActions METHOD =====
- async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
+ async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]:
"""Execute task actions with individual validation and retry logic"""
try:
logger.info(f"Executing {len(task_actions)} task actions with validation")
@@ -2148,19 +2192,19 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}")
# Create context for validation
- context = {
- 'task_description': 'Action execution',
- 'previous_results': [r.get('resultLabel', '') for r in results if r.get('resultLabel')],
- 'action_index': i,
- 'total_actions': len(task_actions)
- }
+ context = TaskContext(
+ task_step=TaskStep(id=f"action_{i}", description="Action execution"),
+ workflow_id=workflow.id,
+ available_documents=[],
+ previous_results=[r.data.get('resultLabel', '') for r in results if r.data.get('resultLabel')]
+ )
# Execute action with validation
result = await self.executeActionWithValidation(action, workflow, context)
results.append(result)
# If action failed after validation, continue with next action
- if result['validation']['status'] == 'fail':
+ if isinstance(result.validation, dict) and result.validation.get('status') == 'fail':
logger.error(f"Action {i+1} failed validation, continuing with next action")
continue
@@ -2173,7 +2217,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== RESTRUCTURED ACTION EXECUTION WITH VALIDATION =====
- async def executeActionWithValidation(self, action: TaskAction, workflow: ChatWorkflow, context: Dict[str, Any]) -> Dict[str, Any]:
+ async def executeActionWithValidation(self, action: TaskAction, workflow: ChatWorkflow, context: TaskContext) -> ActionExecutionResult:
"""Execute single action with immediate validation"""
try:
logger.info(f"Executing action: {action.execMethod}.{action.execAction}")
@@ -2185,8 +2229,18 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
validator = ActionValidator(self)
validation = await validator.validateActionResult(result, action, context)
- # Add validation result to action result
- result['validation'] = validation
+ # Create ActionExecutionResult model
+ action_result = ActionExecutionResult(
+ success=result.success,
+ data=result.data,
+ metadata=result.metadata,
+ validation=validation,
+ error=result.error,
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction,
+ documents=result.data.get("documents", [])
+ )
# Update action status based on validation
if validation['status'] == 'success':
@@ -2199,19 +2253,21 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
action.setError(validation.get('reason', 'Action failed validation'))
logger.error(f"Action {action.execMethod}.{action.execAction} failed validation: {validation.get('reason', 'No reason')}")
- return result
+ return action_result
except Exception as e:
logger.error(f"Error executing action with validation: {str(e)}")
action.setError(str(e))
- return {
- "status": "failed",
- "error": str(e),
- "actionId": action.id,
- "actionMethod": action.execMethod,
- "actionName": action.execAction,
- "documents": [],
- "validation": {
+ return ActionExecutionResult(
+ success=False,
+ data={},
+ metadata={},
+ error=str(e),
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction,
+ documents=[],
+ validation={
'status': 'fail',
'reason': f'Execution error: {str(e)}',
'confidence': 0.0,
@@ -2220,9 +2276,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'missing_elements': [],
'suggested_retry_approach': ''
}
- }
+ )
- async def retryActionWithImprovements(self, action: TaskAction, previous_result: Dict[str, Any], improvements: List[str]) -> Dict[str, Any]:
+ async def retryActionWithImprovements(self, action: TaskAction, previous_result: ActionExecutionResult, improvements: List[str]) -> ActionExecutionResult:
"""Retry action with improvements based on previous failure"""
try:
logger.info(f"Retrying action {action.execMethod}.{action.execAction} with improvements")
@@ -2235,18 +2291,48 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# Validate the retry result
validator = ActionValidator(self)
- context = {
- 'task_description': 'Action retry',
- 'previous_results': [],
- 'retry_count': 1
- }
- validation = await validator.validateActionResult(result, enhanced_action, context)
+ # Create a proper TaskContext for retry validation
+ retry_context = TaskContext(
+ task_step=TaskStep(
+ id="retry_context",
+ description="Action retry",
+ dependencies=[],
+ expected_outputs=[],
+ success_criteria=[],
+ required_documents=[],
+ estimated_complexity="low",
+ ai_prompt=""
+ ),
+ workflow=None,
+ workflow_id=None,
+ available_documents=[],
+ previous_results=[],
+ improvements=[],
+ retry_count=1,
+ previous_action_results=[],
+ previous_review_result=None,
+ is_regeneration=False,
+ failure_patterns=[],
+ failed_actions=[],
+ successful_actions=[]
+ )
+ validation = await validator.validateActionResult(result, enhanced_action, retry_context)
- # Add validation and retry metadata
- result['validation'] = validation
- result['is_retry'] = True
- result['previous_error'] = previous_result.get('error', '')
- result['applied_improvements'] = improvements
+ # Create ActionExecutionResult model with retry metadata
+ action_result = ActionExecutionResult(
+ success=result.success,
+ data=result.data,
+ metadata=result.metadata,
+ validation=validation,
+ error=result.error,
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction,
+ documents=result.data.get("documents", []),
+ is_retry=True,
+ previous_error=previous_result.error,
+ applied_improvements=improvements
+ )
# Update action status
if validation['status'] == 'success':
@@ -2256,20 +2342,22 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
enhanced_action.setError(validation.get('reason', 'Retry failed'))
logger.error(f"Action retry failed: {enhanced_action.execMethod}.{enhanced_action.execAction}")
- return result
+ return action_result
except Exception as e:
logger.error(f"Error retrying action: {str(e)}")
action.setError(str(e))
- return {
- "status": "failed",
- "error": str(e),
- "actionId": action.id,
- "actionMethod": action.execMethod,
- "actionName": action.execAction,
- "documents": [],
- "is_retry": True,
- "validation": {
+ return ActionExecutionResult(
+ success=False,
+ data={},
+ metadata={},
+ error=str(e),
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction,
+ documents=[],
+ is_retry=True,
+ validation={
'status': 'fail',
'reason': f'Retry execution error: {str(e)}',
'confidence': 0.0,
@@ -2278,9 +2366,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'missing_elements': [],
'suggested_retry_approach': ''
}
- }
+ )
- def _enhanceActionWithImprovements(self, action: TaskAction, improvements: List[str], previous_result: Dict[str, Any]) -> TaskAction:
+ def _enhanceActionWithImprovements(self, action: TaskAction, improvements: List[str], previous_result: ActionExecutionResult) -> TaskAction:
"""Enhance action parameters based on improvements and previous failure"""
enhanced_action = TaskAction(
id=action.id,
@@ -2318,8 +2406,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
enhanced_action.execParameters['aiPrompt'] = enhanced_prompt
# Apply specific improvements from validation
- validation = previous_result.get('validation', {})
- suggested_approach = validation.get('suggested_retry_approach', '')
+ validation = previous_result.validation if hasattr(previous_result, 'validation') else {}
+ suggested_approach = validation.get('suggested_retry_approach', '') if isinstance(validation, dict) else ''
if suggested_approach:
current_prompt = enhanced_action.execParameters.get('aiPrompt', '')
enhanced_prompt = current_prompt + f"\n\nRETRY APPROACH: {suggested_approach}"
diff --git a/modules/workflow/methodBase.py b/modules/chat/methodBase.py
similarity index 100%
rename from modules/workflow/methodBase.py
rename to modules/chat/methodBase.py
diff --git a/modules/workflow/processorDocument.py b/modules/chat/processorDocument.py
similarity index 100%
rename from modules/workflow/processorDocument.py
rename to modules/chat/processorDocument.py
diff --git a/modules/workflow/serviceCenter.py b/modules/chat/serviceCenter.py
similarity index 99%
rename from modules/workflow/serviceCenter.py
rename to modules/chat/serviceCenter.py
index d403aca9..3f48cbce 100644
--- a/modules/workflow/serviceCenter.py
+++ b/modules/chat/serviceCenter.py
@@ -15,8 +15,8 @@ from modules.interfaces.interfaceChatObjects import getInterface as getChatObjec
from modules.interfaces.interfaceChatModel import ActionResult
from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects
from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects
-from modules.workflow.processorDocument import DocumentProcessor
-from modules.workflow.methodBase import MethodBase
+from modules.chat.processorDocument import DocumentProcessor
+from modules.chat.methodBase import MethodBase
import uuid
import base64
import hashlib
diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py
index 410f676f..28559d1f 100644
--- a/modules/interfaces/interfaceChatModel.py
+++ b/modules/interfaces/interfaceChatModel.py
@@ -454,3 +454,75 @@ register_model_labels(
"tasks": {"en": "Tasks", "fr": "TΓ’ches"}
}
)
+
+# ====== WORKFLOW SUPPORT MODELS (for managerChat.py compatibility) ======
+
+class TaskStep(BaseModel, ModelMixin):
+ id: str
+ description: str
+ dependencies: Optional[list[str]] = []
+ expected_outputs: Optional[list[str]] = []
+ success_criteria: Optional[list[str]] = []
+ required_documents: Optional[list[str]] = []
+ estimated_complexity: Optional[str] = None
+ ai_prompt: Optional[str] = None
+
+class TaskContext(BaseModel, ModelMixin):
+ task_step: TaskStep
+ workflow: Optional['ChatWorkflow'] = None
+ workflow_id: Optional[str] = None
+ available_documents: Optional[list[str]] = []
+ previous_results: Optional[list[str]] = []
+ improvements: Optional[list[str]] = []
+ retry_count: Optional[int] = 0
+ previous_action_results: Optional[list] = []
+ previous_review_result: Optional[dict] = None
+ is_regeneration: Optional[bool] = False
+ failure_patterns: Optional[list[str]] = []
+ failed_actions: Optional[list] = []
+ successful_actions: Optional[list] = []
+
+class ActionExecutionResult(BaseModel, ModelMixin):
+ success: bool
+ data: dict
+ metadata: dict = {}
+ error: Optional[str] = None
+ actionId: Optional[str] = None
+ actionMethod: Optional[str] = None
+ actionName: Optional[str] = None
+ documents: Optional[list] = []
+ validation: Optional[dict] = {}
+ is_retry: Optional[bool] = False
+ previous_error: Optional[str] = None
+ applied_improvements: Optional[list[str]] = []
+
+class ReviewContext(BaseModel, ModelMixin):
+ task_step: TaskStep
+ task_actions: Optional[list] = []
+ action_results: Optional[list] = []
+ step_result: Optional[dict] = {}
+ workflow_id: Optional[str] = None
+ previous_results: Optional[list[str]] = []
+
+class ReviewResult(BaseModel, ModelMixin):
+ status: str
+ reason: Optional[str] = None
+ improvements: Optional[list[str]] = []
+ quality_score: Optional[int] = 5
+ missing_outputs: Optional[list[str]] = []
+ met_criteria: Optional[list[str]] = []
+ unmet_criteria: Optional[list[str]] = []
+ confidence: Optional[float] = 0.5
+
+class TaskPlan(BaseModel, ModelMixin):
+ overview: str
+ tasks: list[TaskStep]
+
+class WorkflowResult(BaseModel, ModelMixin):
+ status: str
+ completed_tasks: int
+ total_tasks: int
+ execution_time: float
+ final_results_count: int
+ error: Optional[str] = None
+ phase: Optional[str] = None
diff --git a/modules/methods/EXCLUDED_methodCoder.py b/modules/methods/EXCLUDED_methodCoder.py
index 33d285a0..c935b1a1 100644
--- a/modules/methods/EXCLUDED_methodCoder.py
+++ b/modules/methods/EXCLUDED_methodCoder.py
@@ -3,7 +3,7 @@ import logging
import uuid
from datetime import datetime, UTC
-from modules.workflow.methodBase import MethodBase, ActionResult, action
+from modules.chat.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py
index 208f736d..456bac95 100644
--- a/modules/methods/methodDocument.py
+++ b/modules/methods/methodDocument.py
@@ -8,7 +8,7 @@ from typing import Dict, Any, List, Optional
import uuid
from datetime import datetime, UTC
-from modules.workflow.methodBase import MethodBase, ActionResult, action
+from modules.chat.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py
index fb226731..a2e91896 100644
--- a/modules/methods/methodOutlook.py
+++ b/modules/methods/methodOutlook.py
@@ -9,7 +9,7 @@ from datetime import datetime, UTC
import json
import uuid
-from modules.workflow.methodBase import MethodBase, ActionResult, action
+from modules.chat.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py
index dbfc4c1f..8288119d 100644
--- a/modules/methods/methodSharepoint.py
+++ b/modules/methods/methodSharepoint.py
@@ -9,7 +9,7 @@ from datetime import datetime, UTC
import json
import uuid
-from modules.workflow.methodBase import MethodBase, ActionResult, action
+from modules.chat.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py
index 4602a5a0..a03549e5 100644
--- a/modules/methods/methodWeb.py
+++ b/modules/methods/methodWeb.py
@@ -11,7 +11,7 @@ from bs4 import BeautifulSoup
import time
import uuid
-from modules.workflow.methodBase import MethodBase, ActionResult, action
+from modules.chat.methodBase import MethodBase, ActionResult, action
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py
index e35f064a..cb4294d2 100644
--- a/modules/workflow/managerWorkflow.py
+++ b/modules/workflow/managerWorkflow.py
@@ -8,7 +8,8 @@ from modules.interfaces.interfaceAppObjects import User
from modules.interfaces.interfaceChatModel import (UserInputRequest, ChatMessage, ChatWorkflow, TaskItem, TaskStatus)
from modules.interfaces.interfaceChatObjects import ChatObjects
-from modules.workflow.managerChat import ChatManager
+from modules.chat.managerChat import ChatManager
+from modules.interfaces.interfaceChatModel import WorkflowResult
logger = logging.getLogger(__name__)
@@ -179,15 +180,15 @@ class WorkflowManager:
logger.error(f"Error sending last message: {str(e)}")
raise
- async def _processWorkflowResults(self, workflow: ChatWorkflow, workflow_result: Dict[str, Any], initial_message: ChatMessage) -> None:
+ async def _processWorkflowResults(self, workflow: ChatWorkflow, workflow_result: WorkflowResult, initial_message: ChatMessage) -> None:
"""Process workflow results and create appropriate messages"""
try:
- if workflow_result.get('status') == 'failed':
+ if workflow_result.status == 'failed':
# Create error message
error_message = {
"workflowId": workflow.id,
"role": "assistant",
- "message": f"Workflow failed: {workflow_result.get('error', 'Unknown error')}",
+ "message": f"Workflow failed: {workflow_result.error or 'Unknown error'}",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": datetime.now(UTC).isoformat()
@@ -205,41 +206,11 @@ class WorkflowManager:
})
return
- # Process successful workflow results
- workflow_results = workflow_result.get('workflow_results', [])
-
- for i, result in enumerate(workflow_results):
- task_step = result['task_step']
- action_results = result['action_results']
- review_result = result['review_result']
-
- # Create message for task step
- step_message = {
- "workflowId": workflow.id,
- "role": "assistant",
- "message": f"Completed task: {task_step.get('description', 'Unknown')}",
- "status": "step",
- "sequenceNr": len(workflow.messages) + 1,
- "publishedAt": datetime.now(UTC).isoformat()
- }
-
- # Add action details if available
- if action_results:
- successful_actions = [r for r in action_results if r.get('status') == 'completed']
- step_message["message"] += f"\nExecuted {len(successful_actions)}/{len(action_results)} actions successfully."
-
- message = self.chatInterface.createWorkflowMessage(step_message)
- if message:
- workflow.messages.append(message)
-
- # Create final summary message
- successful_tasks = workflow_result.get('successful_tasks', 0)
- total_tasks = workflow_result.get('total_tasks', 0)
-
+ # For successful workflows, create a simple completion message
summary_message = {
"workflowId": workflow.id,
"role": "assistant",
- "message": f"Workflow completed successfully. Completed {successful_tasks}/{total_tasks} tasks.",
+ "message": f"Workflow completed successfully. Completed {workflow_result.completed_tasks}/{workflow_result.total_tasks} tasks in {workflow_result.execution_time:.2f} seconds.",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": datetime.now(UTC).isoformat()