# workflowProcessor.py # Main workflow processor with delegation pattern import logging from typing import Dict, Any, Optional, List from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskPlan, TaskResult from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum from modules.workflows.processing.modes.modeBase import BaseMode from modules.workflows.processing.modes.modeActionplan import ActionplanMode from modules.workflows.processing.modes.modeDynamic import DynamicMode from modules.workflows.processing.modes.modeAutomation import AutomationMode from modules.workflows.processing.shared.stateTools import checkWorkflowStopped logger = logging.getLogger(__name__) class WorkflowProcessor: """Main workflow processor that delegates to appropriate mode implementations""" def __init__(self, services): self.services = services self.mode = self._createMode(services.workflow.workflowMode) def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode: """Create the appropriate mode implementation based on workflow mode""" if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC: return DynamicMode(self.services) elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN: return ActionplanMode(self.services) elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION: return AutomationMode(self.services) else: raise ValueError(f"Invalid workflow mode: {workflowMode}") async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan: """Generate a high-level task plan for the workflow""" import time # Init progress logger operationId = f"taskPlan_{workflow.id}_{int(time.time())}" try: # Check workflow status before generating task plan checkWorkflowStopped(self.services) # Start progress tracking self.services.chat.progressLogStart( operationId, "Workflow Planning", "Task Plan Generation", f"Mode: {workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode}" ) # Initialize currentUserLanguage to empty at workflow start self.services.currentUserLanguage = "" logger.info(f"=== STARTING TASK PLAN GENERATION ===") logger.info(f"Workflow ID: {workflow.id}") logger.info(f"User Input: {userInput}") modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode logger.info(f"Workflow Mode: {modeValue}") # Update progress - generating task plan self.services.chat.progressLogUpdate(operationId, 0.3, "Analyzing input") # Delegate to the appropriate mode taskPlan = await self.mode.generateTaskPlan(userInput, workflow) # Update progress - creating task plan message self.services.chat.progressLogUpdate(operationId, 0.8, "Creating plan") # Create task plan message await self.mode.createTaskPlanMessage(taskPlan, workflow) # Complete progress tracking self.services.chat.progressLogFinish(operationId, True) return taskPlan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") # Complete progress tracking with failure self.services.chat.progressLogFinish(operationId, False) raise async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, taskIndex: int = None, totalTasks: int = None) -> TaskResult: """Execute a task step using the appropriate mode""" import time # Init progress logger operationId = f"taskExec_{workflow.id}_{taskIndex}_{int(time.time())}" try: # Check workflow status before executing task checkWorkflowStopped(self.services) # Start progress tracking self.services.chat.progressLogStart( operationId, "Workflow Execution", "Task Execution", f"Task {taskIndex}/{totalTasks}" ) logger.info(f"=== STARTING TASK EXECUTION ===") logger.info(f"Task: {taskStep.objective}") modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode logger.info(f"Mode: {modeValue}") # Update progress - executing task self.services.chat.progressLogUpdate(operationId, 0.2, "Executing") # Delegate to the appropriate mode result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks) # Complete progress tracking self.services.chat.progressLogFinish(operationId, True) return result except Exception as e: logger.error(f"Error in executeTask: {str(e)}") # Complete progress tracking with failure self.services.chat.progressLogFinish(operationId, False) raise async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow, previousResults: List = None, enhancedContext: TaskContext = None) -> List: """Generate actions for a task step using the appropriate mode""" try: # Check workflow status before generating actions checkWorkflowStopped(self.services) logger.info(f"=== STARTING ACTION GENERATION ===") logger.info(f"Task: {taskStep.objective}") modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode logger.info(f"Mode: {modeValue}") # Delegate to the appropriate mode return await self.mode.generateActionItems(taskStep, workflow, previousResults, enhancedContext) except Exception as e: logger.error(f"Error in generateActionItems: {str(e)}") raise def updateWorkflowAfterTaskPlanCreated(self, totalTasks: int): """Update workflow object after task plan creation""" try: updateData = { "totalTasks": totalTasks, "currentTask": 0, "currentAction": 0, "totalActions": 0 } # Update workflow object self.workflow.totalTasks = totalTasks self.workflow.currentTask = 0 self.workflow.currentAction = 0 self.workflow.totalActions = 0 # Update in database self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) logger.info(f"Updated workflow {self.workflow.id} after task plan creation: {updateData}") except Exception as e: logger.error(f"Error updating workflow after task plan creation: {str(e)}") def updateWorkflowBeforeExecutingTask(self, taskNumber: int): """Update workflow object before executing a task""" try: updateData = { "currentTask": taskNumber, "currentAction": 0, "totalActions": 0 } # Update workflow object self.workflow.currentTask = taskNumber self.workflow.currentAction = 0 self.workflow.totalActions = 0 # Update in database self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}: {updateData}") except Exception as e: logger.error(f"Error updating workflow before executing task: {str(e)}") def updateWorkflowAfterActionPlanning(self, totalActions: int): """Update workflow object after action planning for current task""" try: updateData = { "totalActions": totalActions } # Update workflow object self.workflow.totalActions = totalActions # Update in database self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) logger.info(f"Updated workflow {self.workflow.id} after action planning: {updateData}") except Exception as e: logger.error(f"Error updating workflow after action planning: {str(e)}") def updateWorkflowBeforeExecutingAction(self, actionNumber: int): """Update workflow object before executing an action""" try: updateData = { "currentAction": actionNumber } # Update workflow object self.workflow.currentAction = actionNumber # Update in database self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}: {updateData}") except Exception as e: logger.error(f"Error updating workflow before executing action: {str(e)}") def setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None): """Set total counts for workflow progress tracking and update database""" try: updateData = {} if totalTasks is not None: self.workflow.totalTasks = totalTasks updateData["totalTasks"] = totalTasks if totalActions is not None: self.workflow.totalActions = totalActions updateData["totalActions"] = totalActions # Update workflow object in database if we have changes if updateData: self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) logger.info(f"Updated workflow {self.workflow.id} totals in database: {updateData}") logger.debug(f"Updated workflow totals: Tasks {self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 'N/A'}, Actions {self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 'N/A'}") except Exception as e: logger.error(f"Error setting workflow totals: {str(e)}") def resetWorkflowForNewSession(self): """Reset workflow object for a new session""" try: updateData = { "currentTask": 0, "currentAction": 0, "totalTasks": 0, "totalActions": 0 } # Update workflow object self.workflow.currentTask = 0 self.workflow.currentAction = 0 self.workflow.totalTasks = 0 self.workflow.totalActions = 0 # Update in database self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData) logger.info(f"Reset workflow {self.workflow.id} for new session: {updateData}") except Exception as e: logger.error(f"Error resetting workflow for new session: {str(e)}") async def prepareTaskHandover(self, taskStep, taskActions, taskResult, workflow): """Prepare task handover data for workflow coordination""" try: # Check workflow status before preparing task handover checkWorkflowStopped(self.services) # Log handover status summary status = taskResult.status if taskResult else 'unknown' # Handle both TaskResult and ReviewResult objects if hasattr(taskResult, 'metCriteria'): # This is a ReviewResult object met = taskResult.metCriteria if taskResult.metCriteria else [] reviewResult = taskResult.model_dump() else: # This is a TaskResult object met = [] reviewResult = { 'status': taskResult.status if taskResult else 'unknown', 'reason': taskResult.error if taskResult and hasattr(taskResult, 'error') else None, 'success': taskResult.success if taskResult else False } handoverData = { 'task_id': taskStep.id, 'task_description': taskStep.objective, 'actions': [action.model_dump() for action in taskActions] if taskActions else [], 'review_result': reviewResult, 'workflow_id': workflow.id, 'handover_time': self.services.utils.timestampGetUtc() } logger.info(f"Prepared handover for task {taskStep.id} in workflow {workflow.id}") return handoverData except Exception as e: logger.error(f"Error in prepareTaskHandover: {str(e)}") return {'error': str(e)}