"""Workflow manager: runs a chat workflow and records its terminal state."""

from typing import Dict, Any, List, Optional
import logging
from datetime import datetime, UTC
import uuid
import asyncio

from modules.interfaces.interfaceAppObjects import User
from modules.interfaces.interfaceChatModel import (
    UserInputRequest,
    ChatMessage,
    ChatWorkflow,
    TaskItem,
    TaskStatus,
)
from modules.interfaces.interfaceChatObjects import ChatObjects
from modules.chat.managerChat import ChatManager
from modules.chat.handling.handlingTasks import WorkflowStoppedException
from modules.interfaces.interfaceChatModel import WorkflowResult
from modules.shared.timezoneUtils import get_utc_timestamp

logger = logging.getLogger(__name__)


class WorkflowManager:
    """Manager for workflow processing and coordination.

    Every terminal outcome (completed / stopped / failed) is recorded through
    the same three steps, implemented once in private helpers:

    * ``_postFinalMessage``  - create a workflow message and append it to
      ``workflow.messages``;
    * ``_setWorkflowStatus`` - update the in-memory workflow and persist the
      change via ``chatInterface.updateWorkflow``;
    * ``_logWorkflowEvent``  - write a log entry with ``progress`` 100.
    """

    # Text shown to the user whenever a stop is recorded.
    _STOPPED_TEXT = "🛑 Workflow stopped by user"

    def __init__(self, chatInterface: ChatObjects, currentUser: User):
        """Store the chat interface and user, and create the ChatManager.

        Args:
            chatInterface: Interface used to create messages/logs and to
                update the workflow.
            currentUser: User on whose behalf the workflow runs.
        """
        self.chatInterface = chatInterface
        self.chatManager = ChatManager(currentUser, chatInterface)
        self.currentUser = currentUser

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _buildMessageData(
        self,
        workflow: ChatWorkflow,
        *,
        role: str,
        text: str,
        status: str,
        sequenceNr: int,
        documentsLabel: str,
        progress: str,
    ) -> Dict[str, Any]:
        """Build the payload dict passed to ``createWorkflowMessage``.

        ``taskNumber`` and ``actionNumber`` are always 0 and ``documents``
        starts empty; ``progress`` is written to both ``taskProgress`` and
        ``actionProgress``.
        """
        return {
            "workflowId": workflow.id,
            "role": role,
            "message": text,
            "status": status,
            "sequenceNr": sequenceNr,
            "publishedAt": get_utc_timestamp(),
            "documentsLabel": documentsLabel,
            "documents": [],
            # Workflow context fields
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0,
            # Progress status
            "taskProgress": progress,
            "actionProgress": progress,
        }

    def _postFinalMessage(
        self,
        workflow: ChatWorkflow,
        text: str,
        documentsLabel: str,
        progress: str,
    ) -> Optional[ChatMessage]:
        """Create an assistant message with status "last" and append it.

        The sequence number is ``len(workflow.messages) + 1``. The created
        message is appended to ``workflow.messages`` only when the interface
        returns a truthy value.

        Returns:
            Whatever ``createWorkflowMessage`` returned.
        """
        messageData = self._buildMessageData(
            workflow,
            role="assistant",
            text=text,
            status="last",
            sequenceNr=len(workflow.messages) + 1,
            documentsLabel=documentsLabel,
            progress=progress,
        )
        message = self.chatInterface.createWorkflowMessage(messageData)
        if message:
            workflow.messages.append(message)
        return message

    def _setWorkflowStatus(
        self,
        workflow: ChatWorkflow,
        status: str,
        includeTotals: bool = True,
    ) -> None:
        """Set ``workflow.status``/``lastActivity`` and persist the change.

        Args:
            workflow: Workflow to update (mutated in place).
            status: New status value.
            includeTotals: When True, ``totalTasks`` and ``totalActions`` are
                included in the persisted update.
        """
        workflow.status = status
        workflow.lastActivity = get_utc_timestamp()
        update: Dict[str, Any] = {
            "status": status,
            "lastActivity": workflow.lastActivity,
        }
        if includeTotals:
            update["totalTasks"] = workflow.totalTasks
            update["totalActions"] = workflow.totalActions
        self.chatInterface.updateWorkflow(workflow.id, update)

    def _logWorkflowEvent(
        self,
        workflow: ChatWorkflow,
        text: str,
        logType: str,
        status: str,
    ) -> None:
        """Write a workflow log entry with ``progress`` set to 100."""
        self.chatInterface.createWorkflowLog({
            "workflowId": workflow.id,
            "message": text,
            "type": logType,
            "status": status,
            "progress": 100,
        })

    # ------------------------------------------------------------------
    # Workflow phases
    # ------------------------------------------------------------------

    async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
        """Process a workflow with user input using unified workflow phases.

        Phases: initialize the chat manager, set the user language, send the
        first message, execute the unified workflow, process its result and,
        when the result status is ``'success'``, send the last message.

        A ``WorkflowStoppedException`` is handled here (workflow marked
        "stopped") and not propagated. Any other exception marks the workflow
        "failed", records an error message and log entry, and is re-raised.
        """
        try:
            await self.chatManager.initialize(workflow)
            self.chatManager.service.setUserLanguage(userInput.userLanguage)

            message = await self._sendFirstMessage(userInput, workflow)
            workflow_result = await self.chatManager.executeUnifiedWorkflow(userInput, workflow)
            await self._processWorkflowResults(workflow, workflow_result, message)

            # Stopped/failed workflows get their final message in
            # _processWorkflowResults; only successful ones get the feedback.
            # NOTE(review): the success branch of _processWorkflowResults
            # also posts a "last" message, so a successful run ends with two
            # of them - confirm this is intended.
            if workflow_result.status == 'success':
                await self._sendLastMessage(workflow)

        except WorkflowStoppedException:
            logger.info("Workflow stopped by user")
            self._setWorkflowStatus(workflow, "stopped")
            # NOTE(review): progress is "pending" here but "stopped" on the
            # stop paths in _processWorkflowResults - confirm which is meant.
            self._postFinalMessage(workflow, self._STOPPED_TEXT, "workflow_stopped", "pending")
            self._logWorkflowEvent(workflow, "Workflow stopped by user", "warning", "stopped")

        except Exception as e:
            logger.error("Workflow processing error: %s", e)
            self._setWorkflowStatus(workflow, "failed")
            self._postFinalMessage(
                workflow, f"Workflow processing failed: {e}", "workflow_error", "fail"
            )
            self._logWorkflowEvent(workflow, f"Workflow failed: {e}", "error", "failed")
            raise

    async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage:
        """Send first message to start workflow.

        Creates a "user" message with status "first" carrying the prompt and,
        when ``userInput.listFileId`` is set, the processed documents.

        Returns:
            The created message (also appended to ``workflow.messages``).

        Raises:
            Exception: If the interface returns no message. Any exception
                raised inside (including a stop signal) is logged and
                re-raised.
        """
        try:
            self.chatManager.handlingTasks._checkWorkflowStopped()

            # The label must match what getDocumentReferenceList() will create.
            context_label = f"round{workflow.currentRound}_task0_action0_context"

            # NOTE(review): sequenceNr is fixed at 1 while every other message
            # uses len(workflow.messages) + 1 - confirm for later rounds.
            messageData = self._buildMessageData(
                workflow,
                role="user",
                text=userInput.prompt,
                status="first",
                sequenceNr=1,
                documentsLabel=context_label,
                progress="pending",
            )

            if userInput.listFileId:
                messageData["documents"] = await self.chatManager.service.processFileIds(
                    userInput.listFileId
                )

            message = self.chatInterface.createWorkflowMessage(messageData)
            if not message:
                raise Exception("Failed to create first message")

            workflow.messages.append(message)
            return message

        except Exception as e:
            logger.error("Error sending first message: %s", e)
            raise

    async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
        """Generate feedback message for workflow completion.

        Returns:
            A summary with the number of user and assistant messages and a
            status line, or the fallback "Workflow processing completed." if
            anything inside raises.
        """
        try:
            # NOTE(review): if WorkflowStoppedException derives from
            # Exception, the handler below swallows the stop signal and the
            # fallback text is returned - confirm this is intended.
            self.chatManager.handlingTasks._checkWorkflowStopped()

            user_count = sum(1 for msg in workflow.messages if msg.role == 'user')
            assistant_count = sum(1 for msg in workflow.messages if msg.role == 'assistant')

            if workflow.status == "completed":
                status_line = "All tasks completed successfully."
            elif workflow.status == "partial":
                status_line = "Some tasks completed with partial success."
            else:
                status_line = f"Workflow status: {workflow.status}"

            return (
                "Workflow completed.\n\n"
                f"Processed {user_count} user inputs and generated {assistant_count} responses.\n"
                f"{status_line}"
            )

        except Exception as e:
            logger.error("Error generating workflow feedback: %s", e)
            return "Workflow processing completed."

    async def _sendLastMessage(self, workflow: ChatWorkflow) -> None:
        """Send last message to complete workflow (only for successful workflows).

        Does nothing except log a warning when the workflow is already
        "stopped" or "failed". Otherwise posts the feedback message, marks
        the workflow "completed" (totals are not part of this update) and
        writes a completion log entry. Exceptions are logged and re-raised.
        """
        try:
            # Safety check: never announce completion for a stopped/failed run.
            if workflow.status in ['stopped', 'failed']:
                logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
                return

            feedback = await self._generateWorkflowFeedback(workflow)

            self._postFinalMessage(workflow, feedback, "workflow_feedback", "success")
            self._setWorkflowStatus(workflow, "completed", includeTotals=False)
            self._logWorkflowEvent(workflow, "Workflow completed", "success", "completed")

        except Exception as e:
            logger.error("Error sending last message: %s", e)
            raise

    async def _processWorkflowResults(self, workflow: ChatWorkflow, workflow_result: WorkflowResult, initial_message: ChatMessage) -> None:
        """Process workflow results and create appropriate messages.

        Records the terminal state for the result status: ``'stopped'``,
        ``'failed'``, or (any other value) completed. A stop detected by
        ``_checkWorkflowStopped`` before the result is inspected is recorded
        the same way as a ``'stopped'`` result, including totals and the log
        entry. Any other exception is recorded as a failure (message and
        status update) and is NOT re-raised.

        Args:
            workflow: Workflow being finalized (mutated in place).
            workflow_result: Result whose ``status`` and ``error`` are read.
            initial_message: First message of the run; not used here.
        """
        try:
            try:
                self.chatManager.handlingTasks._checkWorkflowStopped()
            except WorkflowStoppedException:
                logger.info(f"Workflow {workflow.id} was stopped during result processing")
                self._postFinalMessage(workflow, self._STOPPED_TEXT, "workflow_stopped", "stopped")
                # Persist totals and write the log entry here as well, so both
                # stop paths in this method leave the same record behind.
                self._setWorkflowStatus(workflow, "stopped")
                self._logWorkflowEvent(workflow, "Workflow stopped by user", "warning", "stopped")
                return

            if workflow_result.status == 'stopped':
                self._postFinalMessage(workflow, self._STOPPED_TEXT, "workflow_stopped", "stopped")
                self._setWorkflowStatus(workflow, "stopped")
                self._logWorkflowEvent(workflow, "Workflow stopped by user", "warning", "stopped")
                return

            if workflow_result.status == 'failed':
                failure_text = f"Workflow failed: {workflow_result.error or 'Unknown error'}"
                self._postFinalMessage(workflow, failure_text, "workflow_failure", "fail")
                self._setWorkflowStatus(workflow, "failed")
                self._logWorkflowEvent(workflow, failure_text, "error", "failed")
                return

            # Every other status is treated as a successful run.
            self._postFinalMessage(
                workflow, "Workflow completed successfully.", "workflow_completion", "success"
            )
            self._setWorkflowStatus(workflow, "completed")
            self._logWorkflowEvent(workflow, "Workflow completed successfully", "success", "completed")

        except Exception as e:
            logger.error("Error processing workflow results: %s", e)
            self._postFinalMessage(
                workflow, f"Error processing workflow results: {e}", "workflow_error", "fail"
            )
            self._setWorkflowStatus(workflow, "failed")