gateway/modules/workflows/processing/workflowProcessor.py

303 lines
13 KiB
Python

# workflowProcessor.py
# Main workflow processor with delegation pattern
import logging
from typing import Dict, Any, Optional, List
from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskPlan, TaskResult
from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum
from modules.workflows.processing.modes.modeBase import BaseMode
from modules.workflows.processing.modes.modeActionplan import ActionplanMode
from modules.workflows.processing.modes.modeDynamic import DynamicMode
from modules.workflows.processing.modes.modeAutomation import AutomationMode
from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
logger = logging.getLogger(__name__)
class WorkflowProcessor:
"""Main workflow processor that delegates to appropriate mode implementations"""
def __init__(self, services):
self.services = services
self.mode = self._createMode(services.workflow.workflowMode)
def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
"""Create the appropriate mode implementation based on workflow mode"""
if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC:
return DynamicMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN:
return ActionplanMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION:
return AutomationMode(self.services)
else:
raise ValueError(f"Invalid workflow mode: {workflowMode}")
async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
"""Generate a high-level task plan for the workflow"""
import time
# Init progress logger
operationId = f"taskPlan_{workflow.id}_{int(time.time())}"
try:
# Check workflow status before generating task plan
checkWorkflowStopped(self.services)
# Start progress tracking
self.services.chat.progressLogStart(
operationId,
"Workflow Planning",
"Task Plan Generation",
f"Mode: {workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode}"
)
# Initialize currentUserLanguage to empty at workflow start
self.services.currentUserLanguage = ""
logger.info(f"=== STARTING TASK PLAN GENERATION ===")
logger.info(f"Workflow ID: {workflow.id}")
logger.info(f"User Input: {userInput}")
modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode
logger.info(f"Workflow Mode: {modeValue}")
# Update progress - generating task plan
self.services.chat.progressLogUpdate(operationId, 0.3, "Analyzing input")
# Delegate to the appropriate mode
taskPlan = await self.mode.generateTaskPlan(userInput, workflow)
# Update progress - creating task plan message
self.services.chat.progressLogUpdate(operationId, 0.8, "Creating plan")
# Create task plan message
await self.mode.createTaskPlanMessage(taskPlan, workflow)
# Complete progress tracking
self.services.chat.progressLogFinish(operationId, True)
return taskPlan
except Exception as e:
logger.error(f"Error in generateTaskPlan: {str(e)}")
# Complete progress tracking with failure
self.services.chat.progressLogFinish(operationId, False)
raise
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
taskIndex: int = None, totalTasks: int = None) -> TaskResult:
"""Execute a task step using the appropriate mode"""
import time
# Init progress logger
operationId = f"taskExec_{workflow.id}_{taskIndex}_{int(time.time())}"
try:
# Check workflow status before executing task
checkWorkflowStopped(self.services)
# Start progress tracking
self.services.chat.progressLogStart(
operationId,
"Workflow Execution",
"Task Execution",
f"Task {taskIndex}/{totalTasks}"
)
logger.info(f"=== STARTING TASK EXECUTION ===")
logger.info(f"Task: {taskStep.objective}")
modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode
logger.info(f"Mode: {modeValue}")
# Update progress - executing task
self.services.chat.progressLogUpdate(operationId, 0.2, "Executing")
# Delegate to the appropriate mode
result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks)
# Complete progress tracking
self.services.chat.progressLogFinish(operationId, True)
return result
except Exception as e:
logger.error(f"Error in executeTask: {str(e)}")
# Complete progress tracking with failure
self.services.chat.progressLogFinish(operationId, False)
raise
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
previousResults: List = None, enhancedContext: TaskContext = None) -> List:
"""Generate actions for a task step using the appropriate mode"""
try:
# Check workflow status before generating actions
checkWorkflowStopped(self.services)
logger.info(f"=== STARTING ACTION GENERATION ===")
logger.info(f"Task: {taskStep.objective}")
modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode
logger.info(f"Mode: {modeValue}")
# Delegate to the appropriate mode
return await self.mode.generateActionItems(taskStep, workflow, previousResults, enhancedContext)
except Exception as e:
logger.error(f"Error in generateActionItems: {str(e)}")
raise
def updateWorkflowAfterTaskPlanCreated(self, totalTasks: int):
"""Update workflow object after task plan creation"""
try:
updateData = {
"totalTasks": totalTasks,
"currentTask": 0,
"currentAction": 0,
"totalActions": 0
}
# Update workflow object
self.workflow.totalTasks = totalTasks
self.workflow.currentTask = 0
self.workflow.currentAction = 0
self.workflow.totalActions = 0
# Update in database
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} after task plan creation: {updateData}")
except Exception as e:
logger.error(f"Error updating workflow after task plan creation: {str(e)}")
def updateWorkflowBeforeExecutingTask(self, taskNumber: int):
"""Update workflow object before executing a task"""
try:
updateData = {
"currentTask": taskNumber,
"currentAction": 0,
"totalActions": 0
}
# Update workflow object
self.workflow.currentTask = taskNumber
self.workflow.currentAction = 0
self.workflow.totalActions = 0
# Update in database
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}: {updateData}")
except Exception as e:
logger.error(f"Error updating workflow before executing task: {str(e)}")
def updateWorkflowAfterActionPlanning(self, totalActions: int):
"""Update workflow object after action planning for current task"""
try:
updateData = {
"totalActions": totalActions
}
# Update workflow object
self.workflow.totalActions = totalActions
# Update in database
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} after action planning: {updateData}")
except Exception as e:
logger.error(f"Error updating workflow after action planning: {str(e)}")
def updateWorkflowBeforeExecutingAction(self, actionNumber: int):
"""Update workflow object before executing an action"""
try:
updateData = {
"currentAction": actionNumber
}
# Update workflow object
self.workflow.currentAction = actionNumber
# Update in database
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}: {updateData}")
except Exception as e:
logger.error(f"Error updating workflow before executing action: {str(e)}")
def setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None):
"""Set total counts for workflow progress tracking and update database"""
try:
updateData = {}
if totalTasks is not None:
self.workflow.totalTasks = totalTasks
updateData["totalTasks"] = totalTasks
if totalActions is not None:
self.workflow.totalActions = totalActions
updateData["totalActions"] = totalActions
# Update workflow object in database if we have changes
if updateData:
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} totals in database: {updateData}")
logger.debug(f"Updated workflow totals: Tasks {self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 'N/A'}, Actions {self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 'N/A'}")
except Exception as e:
logger.error(f"Error setting workflow totals: {str(e)}")
def resetWorkflowForNewSession(self):
"""Reset workflow object for a new session"""
try:
updateData = {
"currentTask": 0,
"currentAction": 0,
"totalTasks": 0,
"totalActions": 0
}
# Update workflow object
self.workflow.currentTask = 0
self.workflow.currentAction = 0
self.workflow.totalTasks = 0
self.workflow.totalActions = 0
# Update in database
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Reset workflow {self.workflow.id} for new session: {updateData}")
except Exception as e:
logger.error(f"Error resetting workflow for new session: {str(e)}")
async def prepareTaskHandover(self, taskStep, taskActions, taskResult, workflow):
"""Prepare task handover data for workflow coordination"""
try:
# Check workflow status before preparing task handover
checkWorkflowStopped(self.services)
# Log handover status summary
status = taskResult.status if taskResult else 'unknown'
# Handle both TaskResult and ReviewResult objects
if hasattr(taskResult, 'metCriteria'):
# This is a ReviewResult object
met = taskResult.metCriteria if taskResult.metCriteria else []
reviewResult = taskResult.model_dump()
else:
# This is a TaskResult object
met = []
reviewResult = {
'status': taskResult.status if taskResult else 'unknown',
'reason': taskResult.error if taskResult and hasattr(taskResult, 'error') else None,
'success': taskResult.success if taskResult else False
}
handoverData = {
'task_id': taskStep.id,
'task_description': taskStep.objective,
'actions': [action.model_dump() for action in taskActions] if taskActions else [],
'review_result': reviewResult,
'workflow_id': workflow.id,
'handover_time': self.services.utils.timestampGetUtc()
}
logger.info(f"Prepared handover for task {taskStep.id} in workflow {workflow.id}")
return handoverData
except Exception as e:
logger.error(f"Error in prepareTaskHandover: {str(e)}")
return {'error': str(e)}