# handlingTasks.py # Refactored for clarity and consolidation import asyncio import logging import json import time from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC from modules.interfaces.interfaceChatModel import ( TaskStatus, TaskStep, TaskContext, TaskAction, ReviewResult, TaskPlan, WorkflowResult, TaskResult, ReviewContext, ActionResult ) from .executionState import TaskExecutionState from .promptFactory import createTaskPlanningPrompt, createActionDefinitionPrompt, createResultReviewPrompt from modules.chat.documents.documentGeneration import DocumentGenerator logger = logging.getLogger(__name__) class WorkflowStoppedException(Exception): """Exception raised when a workflow is stopped by the user.""" pass class HandlingTasks: def __init__(self, chatInterface, service, workflow=None): self.chatInterface = chatInterface self.service = service self.workflow = workflow self.documentGenerator = DocumentGenerator(service) def _checkWorkflowStopped(self): """ Check if workflow has been stopped by user and raise exception if so. This function centralizes all workflow stop checking logic to avoid code duplication. """ try: # Get the current workflow status from the database to avoid stale data current_workflow = self.chatInterface.getWorkflow(self.service.workflow.id) if current_workflow and current_workflow.status == "stopped": logger.info("Workflow stopped by user, aborting execution") raise WorkflowStoppedException("Workflow was stopped by user") except WorkflowStoppedException: # Re-raise the WorkflowStoppedException immediately raise except Exception as e: # If we can't get the current status due to other database issues, fall back to the in-memory object logger.warning(f"Could not check current workflow status from database: {str(e)}") if self.service.workflow.status == "stopped": logger.info("Workflow stopped by user (from in-memory object), aborting execution") raise WorkflowStoppedException("Workflow was stopped by user") async def generateTaskPlan(self, userInput: str, workflow) -> TaskPlan: """Generate a high-level task plan for the workflow.""" try: # Check workflow status before generating task plan self._checkWorkflowStopped() logger.info(f"Generating task plan for workflow {workflow.id}") available_docs = self.service.getAvailableDocuments(workflow) # Check workflow status before calling AI service self._checkWorkflowStopped() prompt = await self.service.callAiTextAdvanced( createTaskPlanningPrompt({ 'user_request': userInput, 'available_documents': available_docs, 'workflow_id': workflow.id }) ) # Inline _parseTaskPlanResponse logic try: json_start = prompt.find('{') json_end = prompt.rfind('}') + 1 if json_start == -1 or json_end == 0: raise ValueError("No JSON found in response") json_str = prompt[json_start:json_end] task_plan_dict = json.loads(json_str) if 'tasks' not in task_plan_dict: raise ValueError("Task plan missing 'tasks' field") except Exception as e: logger.error(f"Error parsing task plan response: {str(e)}") task_plan_dict = {'tasks': []} if not self._validateTaskPlan(task_plan_dict): logger.error("Generated task plan failed validation") logger.error(f"AI Response: {prompt}") logger.error(f"Parsed Task Plan: {json.dumps(task_plan_dict, indent=2)}") raise Exception("AI-generated task plan failed validation - AI is required for task planning") tasks = [] for task_dict in task_plan_dict.get('tasks', []): # Map old 'description' field to new 'objective' field if 'description' in task_dict and 'objective' not in task_dict: task_dict['objective'] = task_dict.pop('description') tasks.append(TaskStep(**task_dict)) task_plan = TaskPlan( overview=task_plan_dict.get('overview', ''), tasks=tasks ) logger.info(f"Task plan generated successfully with {len(tasks)} tasks") # Log the generated tasks for i, task in enumerate(tasks): logger.info(f" Task {i+1}: {task.objective}") if hasattr(task, 'success_criteria') and task.success_criteria: logger.info(f" Success criteria: {task.success_criteria}") # Log the complete task plan logger.info("=== GENERATED TASK PLAN ===") logger.info(f"Overview: {task_plan.overview}") logger.info(f"Total tasks: {len(tasks)}") # Log the RAW AI-generated task plan JSON for debugging logger.info("=== RAW AI TASK PLAN JSON ===") logger.info(f"AI Response with task plan: {prompt}") logger.info("=== END RAW AI TASK PLAN JSON ===") return task_plan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") raise async def generateTaskActions(self, task_step, workflow, previous_results=None, enhanced_context=None) -> List[TaskAction]: """Generate actions for a given task step.""" try: # Check workflow status before generating actions self._checkWorkflowStopped() logger.info(f"Generating actions for task: {task_step.objective}") available_docs = self.service.getAvailableDocuments(workflow) available_connections = self.service.getConnectionReferenceList() context = enhanced_context or TaskContext( task_step=task_step, workflow=workflow, workflow_id=workflow.id, available_documents=available_docs, previous_results=previous_results or [], improvements=[], retry_count=0, previous_action_results=[], previous_review_result=None, is_regeneration=False, failure_patterns=[], failed_actions=[], successful_actions=[] ) # Check workflow status before calling AI service self._checkWorkflowStopped() prompt = await self.service.callAiTextAdvanced( await createActionDefinitionPrompt(context, self.service) ) # Inline parseActionResponse logic here json_start = prompt.find('{') json_end = prompt.rfind('}') + 1 if json_start == -1 or json_end == 0: raise ValueError("No JSON found in response") json_str = prompt[json_start:json_end] try: action_data = json.loads(json_str) except Exception as e: logger.error(f"Error parsing action response JSON: {str(e)}") action_data = {} if 'actions' not in action_data: raise ValueError("Action response missing 'actions' field") actions = action_data['actions'] if not self._validateActions(actions, context): logger.error("Generated actions failed validation") raise Exception("AI-generated actions failed validation - AI is required for action generation") # Convert to TaskAction objects task_actions = [self.chatInterface.createTaskAction({ "execMethod": a.get('method', 'unknown'), "execAction": a.get('action', 'unknown'), "execParameters": a.get('parameters', {}), "execResultLabel": a.get('resultLabel', ''), "expectedDocumentFormats": a.get('expectedDocumentFormats', None), "status": TaskStatus.PENDING }) for a in actions] valid_actions = [ta for ta in task_actions if ta] logger.info(f"Generated {len(valid_actions)} actions for task: {task_step.objective}") # Log the generated actions for i, action in enumerate(valid_actions): logger.info(f" Action {i+1}: {action.execMethod}.{action.execAction}") if action.expectedDocumentFormats: logger.info(f" Expected formats: {action.expectedDocumentFormats}") if action.execParameters.get('documentList'): logger.info(f" Input documents: {action.execParameters['documentList']}") # Log the complete action plan logger.info("=== GENERATED ACTION PLAN ===") logger.info(f"Task: {task_step.objective}") logger.info(f"Total actions: {len(valid_actions)}") # Log the RAW AI-generated action plan JSON for debugging logger.info("=== RAW AI ACTION PLAN JSON ===") logger.info(f"AI Response with parsed actions: {prompt}") logger.info("=== END RAW AI ACTION PLAN JSON ===") return valid_actions except Exception as e: logger.error(f"Error in generateTaskActions: {str(e)}") return [] async def executeTask(self, task_step, workflow, context, task_index=None, total_tasks=None) -> TaskResult: """Execute all actions for a task step, with state management and retries.""" logger.info(f"=== STARTING TASK {task_index or '?'}: {task_step.objective} ===") # Create database log entry for task start in format expected by frontend if task_index is not None: if total_tasks is not None: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"Executing task {task_index}/{total_tasks}", "type": "info" }) else: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"Executing task {task_index}/?", "type": "info" }) # Create a task start message for the user task_progress = f"{task_index}/{total_tasks}" if total_tasks is not None else str(task_index) task_start_message = { "workflowId": workflow.id, "role": "assistant", "message": f"🚀 Starting Task {task_progress}\n\nObjective: {task_step.objective}", "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), "documentsLabel": f"task_{task_index}_start", "documents": [] } message = self.chatInterface.createWorkflowMessage(task_start_message) if message: workflow.messages.append(message) logger.info(f"Task start message created for task {task_index}") state = TaskExecutionState(task_step) retry_context = context max_retries = state.max_retries for attempt in range(max_retries): logger.info(f"Task execution attempt {attempt+1}/{max_retries}") # Check workflow status before starting task execution self._checkWorkflowStopped() actions = await self.generateTaskActions(task_step, workflow, previous_results=retry_context.previous_results, enhanced_context=retry_context) if not actions: logger.error("No actions defined for task step, aborting task execution") break # Log total actions count for this task total_actions = len(actions) logger.info(f"Task {task_index or '?'} has {total_actions} actions") action_results = [] for action_idx, action in enumerate(actions): # Check workflow status before each action execution self._checkWorkflowStopped() # Log action start in format expected by frontend action_number = action_idx + 1 logger.info(f"Task {task_index} - Starting action {action_number}/{total_actions}") # Create database log entry for action start self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"Task {task_index} - Starting action {action_number}/{total_actions}", "type": "info" }) # Create an action start message for the user action_start_message = { "workflowId": workflow.id, "role": "assistant", "message": f"⚡ Task {task_index} - Action {action_number}/{total_actions}\n\nMethod: {action.execMethod}.{action.execAction}", "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), "documentsLabel": f"action_{action_number}_start", "documents": [] } message = self.chatInterface.createWorkflowMessage(action_start_message) if message: workflow.messages.append(message) logger.info(f"Action start message created for action {action_number}") # Pass action index to executeSingleAction with task context result = await self.executeSingleAction(action, workflow, task_step, task_index, action_number, total_actions) action_results.append(result) if result.success: state.addSuccessfulAction(result) else: state.addFailedAction(result) # Check workflow status before review self._checkWorkflowStopped() review_result = await self.reviewTaskCompletion(task_step, actions, action_results, workflow) success = review_result.status == 'success' feedback = review_result.reason error = None if success else review_result.reason if success: logger.info(f"=== TASK {task_index or '?'} COMPLETED SUCCESSFULLY: {task_step.objective} ===") # Create database log entry for task completion if total_tasks is not None: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"🎯 Task {task_index}/{total_tasks} completed", "type": "success" }) else: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"🎯 Task {task_index}/? completed", "type": "success" }) # Create a task completion message for the user task_progress = f"{task_index}/{total_tasks}" if total_tasks is not None else str(task_index) task_completion_message = { "workflowId": workflow.id, "role": "assistant", "message": f"🎯 Task {task_progress} Completed Successfully!\n\nObjective: {task_step.objective}\n\nFeedback: {feedback or 'Task completed successfully'}", "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), "documentsLabel": f"task_{task_index}_completion", "documents": [] } message = self.chatInterface.createWorkflowMessage(task_completion_message) if message: workflow.messages.append(message) logger.info(f"Task completion message created for task {task_index}") return TaskResult( taskId=task_step.id, status=TaskStatus.COMPLETED, success=True, feedback=feedback, error=None ) elif review_result.status == 'retry' and state.canRetry(): logger.warning(f"Task step '{task_step.objective}' requires retry: {review_result.improvements}") state.incrementRetryCount() retry_context.retry_count = state.retry_count retry_context.improvements = review_result.improvements retry_context.previous_action_results = action_results retry_context.previous_review_result = review_result retry_context.is_regeneration = True retry_context.failure_patterns = state.getFailurePatterns() retry_context.failed_actions = state.failed_actions retry_context.successful_actions = state.successful_actions continue else: logger.error(f"=== TASK {task_index or '?'} FAILED: {task_step.objective} after {attempt+1} attempts ===") # Create user-facing error message for task failure error_message = f"❌ Task {task_index or '?'} - '{task_step.objective}' failed after {attempt+1} attempts\n\n" error_message += f"Objective: {task_step.objective}\n\n" # Add specific error details if available if error: error_message += f"Error: {error}\n\n" # Add retry information error_message += f"Attempts: {attempt+1}\n" error_message += f"Status: Will retry automatically\n\n" error_message += "The system will attempt to retry this task. Please wait..." # Create workflow message for user message_data = { "workflowId": workflow.id, "role": "assistant", "message": error_message, "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), "actionId": None, "actionMethod": "task", "actionName": "task_retry", "documentsLabel": None, "documents": [] } try: message = self.chatInterface.createWorkflowMessage(message_data) if message: workflow.messages.append(message) logger.info(f"Created user-facing retry message for failed task: {task_step.objective}") else: logger.error(f"Failed to create user-facing retry message for failed task: {task_step.objective}") except Exception as e: logger.error(f"Error creating user-facing retry message: {str(e)}") return TaskResult( taskId=task_step.id, status=TaskStatus.FAILED, success=False, feedback=feedback, error=error ) logger.error(f"=== TASK {task_index or '?'} FAILED AFTER ALL RETRIES: {task_step.objective} ===") # Create user-facing error message for task failure error_message = f"❌ Task {task_index or '?'} - '{task_step.objective}' failed after all retries\n\n" error_message += f"Objective: {task_step.objective}\n\n" # Add specific error details if available if error and error != "Task failed after all retries.": error_message += f"Error: {error}\n\n" # Add retry information error_message += f"Retries attempted: {retry_context.retry_count if retry_context else 'Unknown'}\n" error_message += f"Status: Task failed permanently\n\n" error_message += "Please check the connection and try again, or contact support if the issue persists." # Create workflow message for user message_data = { "workflowId": workflow.id, "role": "assistant", "message": error_message, "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), "actionId": None, "actionMethod": "task", "actionName": "task_failure", "documentsLabel": None, "documents": [] } try: message = self.chatInterface.createWorkflowMessage(message_data) if message: workflow.messages.append(message) logger.info(f"Created user-facing error message for failed task: {task_step.objective}") else: logger.error(f"Failed to create user-facing error message for failed task: {task_step.objective}") except Exception as e: logger.error(f"Error creating user-facing error message: {str(e)}") return TaskResult( taskId=task_step.id, status=TaskStatus.FAILED, success=False, feedback="Task failed after all retries.", error="Task failed after all retries." ) async def reviewTaskCompletion(self, task_step, task_actions, action_results, workflow): try: # Check workflow status before reviewing task completion self._checkWorkflowStopped() review_context = ReviewContext( task_step=task_step, action_results=action_results, workflow=workflow, step_result={ 'successful_actions': sum(1 for result in action_results if result.success), 'total_actions': len(action_results), 'results': [result.data.get('result', '') for result in action_results if result.success], 'errors': [result.error for result in action_results if not result.success] } ) # Check workflow status before calling AI service self._checkWorkflowStopped() # Use promptFactory for review prompt prompt = await createResultReviewPrompt(review_context) response = await self.service.callAiTextAdvanced(prompt) # Inline parseReviewResponse logic here json_start = response.find('{') json_end = response.rfind('}') + 1 if json_start == -1 or json_end == 0: raise ValueError("No JSON found in review response") json_str = response[json_start:json_end] try: review = json.loads(json_str) except Exception as e: logger.error(f"Error parsing review response JSON: {str(e)}") review = {} if 'status' not in review: raise ValueError("Review response missing 'status' field") review.setdefault('status', 'unknown') review.setdefault('reason', 'No reason provided') review.setdefault('quality_score', 5) # Ensure improvements is a list improvements = review.get('improvements', []) if isinstance(improvements, str): # Split string into list if it's a single improvement improvements = [improvements.strip()] if improvements.strip() else [] elif not isinstance(improvements, list): improvements = [] # Ensure all list fields are properly typed met_criteria = review.get('met_criteria', []) if not isinstance(met_criteria, list): met_criteria = [] unmet_criteria = review.get('unmet_criteria', []) if not isinstance(unmet_criteria, list): unmet_criteria = [] review_result = ReviewResult( status=review.get('status', 'unknown'), reason=review.get('reason', 'No reason provided'), improvements=improvements, quality_score=review.get('quality_score', 5), missing_outputs=[], met_criteria=met_criteria, unmet_criteria=unmet_criteria, confidence=review.get('confidence', 0.5) ) # Enhanced validation logging logger.info(f"VALIDATION RESULT - Task: '{task_step.objective}' - Status: {review_result.status.upper()}, Quality: {review_result.quality_score}/10") if review_result.status == 'success': logger.info(f"VALIDATION SUCCESS - Task completed successfully") if review_result.met_criteria: logger.info(f"Met criteria: {', '.join(review_result.met_criteria)}") elif review_result.status == 'retry': logger.warning(f"VALIDATION RETRY - Task requires retry: {review_result.improvements}") if review_result.unmet_criteria: logger.warning(f"Unmet criteria: {', '.join(review_result.unmet_criteria)}") else: logger.error(f"VALIDATION FAILED - Task failed: {review_result.reason}") return review_result except Exception as e: logger.error(f"Error in reviewTaskCompletion: {str(e)}") return ReviewResult( status='failed', reason=str(e), quality_score=0 ) async def prepareTaskHandover(self, task_step, task_actions, review_result, workflow): try: # Check workflow status before preparing task handover self._checkWorkflowStopped() # Log handover status summary if hasattr(review_result, 'status'): status = review_result.status if hasattr(review_result, 'met_criteria'): met = review_result.met_criteria else: met = [] handover_data = { 'task_id': task_step.id, 'task_description': task_step.objective, 'actions': [action.to_dict() for action in task_actions], 'review_result': review_result.to_dict() if hasattr(review_result, 'to_dict') else review_result, 'workflow_id': workflow.id, 'handover_time': datetime.now(UTC).isoformat() } logger.info(f"Prepared handover for task {task_step.id} in workflow {workflow.id}") return handover_data except Exception as e: logger.error(f"Error in prepareTaskHandover: {str(e)}") return {'error': str(e)} # --- Helper action handling methods --- async def executeSingleAction(self, action, workflow, task_step, task_index=None, action_index=None, total_actions=None): """Execute a single action and return ActionResult with enhanced document processing""" try: # Check workflow status before executing action self._checkWorkflowStopped() # Use passed indices or fallback to '?' task_num = task_index if task_index is not None else '?' action_num = action_index if action_index is not None else '?' logger.info(f"=== TASK {task_num} ACTION {action_num}: {action.execMethod}.{action.execAction} ===") # Log input parameters input_docs = action.execParameters.get('documentList', []) input_connections = action.execParameters.get('connections', []) logger.info(f"Input documents: {input_docs} (type: {type(input_docs)})") if input_connections: logger.info(f"Input connections: {input_connections}") # Log all action parameters for debugging logger.info(f"All action parameters: {action.execParameters}") enhanced_parameters = action.execParameters.copy() if action.expectedDocumentFormats: enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats logger.info(f"Expected formats: {action.expectedDocumentFormats}") # Check workflow status before executing the action self._checkWorkflowStopped() result = await self.service.executeAction( methodName=action.execMethod, actionName=action.execAction, parameters=enhanced_parameters ) result_label = action.execResultLabel # Process documents from the action result created_documents = [] if result.success: created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow) action.setSuccess() action.result = result.data.get("result", "") action.execResultLabel = result_label await self.createActionMessage(action, result, workflow, result_label, created_documents, task_step, task_index) # Log action results logger.info(f"✓ Action completed successfully") # Create database log entry for action completion if total_actions is not None: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"✅ Task {task_num} - Action {action_num}/{total_actions} completed", "type": "success" }) else: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"✅ Task {task_num} - Action {action_num}/? completed", "type": "success" }) if created_documents: logger.info(f"Output documents ({len(created_documents)}):") for i, doc in enumerate(created_documents): if hasattr(doc, 'filename'): logger.info(f" {i+1}. {doc.filename}") elif isinstance(doc, dict) and 'filename' in doc: logger.info(f" {i+1}. {doc['filename']}") else: logger.info(f" {i+1}. {type(doc).__name__}") # Log document details for debugging logger.info("Document details:") for i, doc in enumerate(created_documents): if hasattr(doc, 'filename'): logger.info(f" Doc {i+1}: filename={doc.filename}, type={type(doc)}") if hasattr(doc, 'id'): logger.info(f" ID: {doc.id}") if hasattr(doc, 'fileId'): logger.info(f" File ID: {doc.fileId}") elif isinstance(doc, dict): logger.info(f" Doc {i+1}: dict with keys: {list(doc.keys())}") else: logger.info("Output: No documents created") else: action.setError(result.error or "Action execution failed") logger.error(f"✗ Action failed: {result.error}") # ⚠️ IMPORTANT: Create error message for failed actions so user can see what went wrong await self.createActionMessage(action, result, workflow, result_label, [], task_step, task_index) # Create database log entry for action failure if total_actions is not None: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"❌ Task {task_num} - Action {action_num}/{total_actions} failed: {result.error}", "type": "error" }) else: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, "message": f"❌ Task {task_num} - Action {action_num}/? failed: {result.error}", "type": "error" }) # Extract document filenames for the ActionResult document_filenames = [] for doc in created_documents: if hasattr(doc, 'filename'): document_filenames.append(doc.filename) elif isinstance(doc, dict) and 'filename' in doc: document_filenames.append(doc['filename']) # Also include the original documents from the service result for validation original_documents = result.data.get("documents", []) # Log action summary logger.info(f"=== TASK {task_num} ACTION {action_num} COMPLETED ===") return ActionResult( success=result.success, data={ "result": result.data.get("result", ""), "documents": created_documents, # Include actual document objects in data "actionId": action.id, "actionMethod": action.execMethod, "actionName": action.execAction, "resultLabel": result_label }, documents=document_filenames, # Keep as filenames for the documents field metadata={ "actionId": action.id, "actionMethod": action.execMethod, "actionName": action.execAction, "resultLabel": result_label }, validation={}, error=result.error or "" ) except Exception as e: logger.error(f"Error executing single action: {str(e)}") action.setError(str(e)) return ActionResult( success=False, data={ "actionId": action.id, "actionMethod": action.execMethod, "actionName": action.execAction, "documents": [] }, metadata={ "actionId": action.id, "actionMethod": action.execMethod, "actionName": action.execAction }, validation={}, error=str(e) ) async def createActionMessage(self, action, result, workflow, result_label=None, created_documents=None, task_step=None, task_index=None): """Create and store a message for the action result in the workflow with enhanced document processing""" try: # Check workflow status before creating action message self._checkWorkflowStopped() if result_label is None: result_label = action.execResultLabel # Use provided documents or process them if not provided if created_documents is None: created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow) # Log delivered documents if created_documents: logger.info(f"Result label: {result_label} - {len(created_documents)} documents") else: logger.info(f"Result label: {result_label} - No documents") # Create a more meaningful message that includes task context task_objective = task_step.objective if task_step else 'Unknown task' # Build a user-friendly message based on success/failure if result.success: if created_documents and len(created_documents) > 0: doc_names = [doc.filename if hasattr(doc, 'filename') else str(doc) for doc in created_documents[:3]] if len(created_documents) > 3: doc_names.append(f"... and {len(created_documents) - 3} more") message_text = f"✅ Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} completed\n\nObjective: {task_objective}\n\nGenerated {len(created_documents)} document(s): {', '.join(doc_names)}" else: message_text = f"✅ Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} completed\n\nObjective: {task_objective}\n\nAction executed successfully" else: # ⚠️ FAILURE MESSAGE - Show error details to user error_details = result.error if result.error else "Unknown error occurred" message_text = f"❌ Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} failed\n\nObjective: {task_objective}\n\nError: {error_details}\n\nPlease check the connection and try again." message_data = { "workflowId": workflow.id, "role": "assistant", "message": message_text, "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), "actionId": action.id, "actionMethod": action.execMethod, "actionName": action.execAction, "documentsLabel": result_label, "documents": created_documents } # Add debugging for error messages if not result.success: logger.info(f"Creating ERROR message: {message_text}") logger.info(f"Message data: {message_data}") message = self.chatInterface.createWorkflowMessage(message_data) if message: workflow.messages.append(message) logger.info(f"Message created: {action.execMethod}.{action.execAction}") else: logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}") except Exception as e: logger.error(f"Error creating action message: {str(e)}") # --- Helper validation methods --- def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool: try: if not isinstance(task_plan, dict): logger.error("Task plan is not a dictionary") return False if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list): logger.error(f"Task plan missing 'tasks' field or not a list. Found: {type(task_plan.get('tasks', 'MISSING'))}") return False # First pass: collect all task IDs to validate dependencies task_ids = set() for task in task_plan['tasks']: if not isinstance(task, dict): logger.error(f"Task is not a dictionary: {type(task)}") return False if 'id' not in task: logger.error(f"Task missing 'id' field: {task}") return False task_ids.add(task['id']) # Second pass: validate each task for i, task in enumerate(task_plan['tasks']): if not isinstance(task, dict): logger.error(f"Task {i} is not a dictionary: {type(task)}") return False required_fields = ['id', 'objective', 'success_criteria'] missing_fields = [field for field in required_fields if field not in task] if missing_fields: logger.error(f"Task {i} missing required fields: {missing_fields}") return False # Check for duplicate IDs (shouldn't happen after first pass, but safety check) if task['id'] in task_ids and list(task_plan['tasks']).count(task['id']) > 1: logger.error(f"Task {i} has duplicate ID: {task['id']}") return False dependencies = task.get('dependencies', []) if not isinstance(dependencies, list): logger.error(f"Task {i} dependencies is not a list: {type(dependencies)}") return False for dep in dependencies: if dep not in task_ids and dep != 'task_0': logger.error(f"Task {i} has invalid dependency: {dep} (available: {list(task_ids) + ['task_0']})") return False logger.info(f"Task plan validation successful with {len(task_ids)} tasks") return True except Exception as e: logger.error(f"Error validating task plan: {str(e)}") return False def _validateActions(self, actions: List[Dict[str, Any]], context) -> bool: try: if not isinstance(actions, list): logger.error("Actions must be a list") return False if len(actions) == 0: logger.warning("No actions generated") return False for i, action in enumerate(actions): if not isinstance(action, dict): logger.error(f"Action {i} must be a dictionary") return False required_fields = ['method', 'action', 'parameters', 'resultLabel'] missing_fields = [] for field in required_fields: if field not in action or not action[field]: missing_fields.append(field) if missing_fields: logger.error(f"Action {i} missing required fields: {missing_fields}") return False result_label = action.get('resultLabel', '') if not result_label.startswith('task'): logger.error(f"Action {i} result label must start with 'task': {result_label}") return False parameters = action.get('parameters', {}) if not isinstance(parameters, dict): logger.error(f"Action {i} parameters must be a dictionary") return False logger.info(f"Successfully validated {len(actions)} actions") return True except Exception as e: logger.error(f"Error validating actions: {str(e)}") return False