615 lines
26 KiB
Python
615 lines
26 KiB
Python
from typing import Dict, Any, List, Optional
|
|
import logging
|
|
from datetime import datetime, UTC
|
|
import uuid
|
|
import asyncio
|
|
|
|
from modules.interfaces.interfaceAppObjects import User
|
|
|
|
from modules.interfaces.interfaceChatModel import (UserInputRequest, ChatMessage, ChatWorkflow, TaskItem, TaskStatus)
|
|
from modules.interfaces.interfaceChatObjects import ChatObjects
|
|
from modules.workflows._transfer.handlingTasks import HandlingTasks, WorkflowStoppedException
|
|
from modules.interfaces.interfaceChatModel import WorkflowResult
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class WorkflowManager:
    """Manager for workflow processing and coordination.

    Public entry points are ``workflowStart`` and ``workflowStop``. Starting a
    workflow persists its state through ``chatInterface`` and then schedules
    ``_workflowProcess`` as a background asyncio task. Terminal outcomes
    (completed, stopped, failed) are written back as a workflow update and a
    closing assistant message; most paths also add a log entry.
    """

    def __init__(self, chatInterface: ChatObjects, currentUser: User):
        """Bind the manager to a chat interface and the acting user."""
        self.chatInterface = chatInterface
        self.currentUser = currentUser
        # Created per run in _workflowProcess; None until processing starts.
        self.handlingTasks = None
        # Strong references to in-flight processing tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._backgroundTasks: set = set()

    # Exported functions

    async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "Actionplan") -> ChatWorkflow:
        """Start a new workflow or continue an existing one, then launch processing.

        Args:
            userInput: The user's request; passed on to the background processor.
            workflowId: Id of an existing workflow to continue. When falsy, a
                new workflow is created.
            workflowMode: Stored on newly created workflows only; it is not
                applied when continuing an existing workflow.

        Returns:
            The workflow object as it exists right after scheduling. Processing
            continues in the background after this method returns.

        Raises:
            ValueError: If ``workflowId`` is given but the workflow cannot be
                loaded (initially or after the round update).
        """
        try:
            # Debug log to check workflowMode parameter
            logger.info(f"WorkflowManager received workflowMode: {workflowMode}")
            currentTime = get_utc_timestamp()

            if workflowId:
                workflow = await self._resumeWorkflow(workflowId, currentTime)
            else:
                workflow = self._createWorkflow(workflowMode, currentTime)

            # Start workflow processing asynchronously. Keep a reference until
            # the task is done so it cannot be garbage-collected mid-run.
            processingTask = asyncio.create_task(self._workflowProcess(userInput, workflow))
            self._backgroundTasks.add(processingTask)
            processingTask.add_done_callback(self._onProcessingTaskDone)

            return workflow
        except Exception as e:
            logger.error(f"Error starting workflow: {str(e)}")
            raise

    async def _resumeWorkflow(self, workflowId: str, currentTime) -> ChatWorkflow:
        """Stop the workflow if it is running, open a new round and reload it."""
        workflow = self.chatInterface.getWorkflow(workflowId)
        if not workflow:
            raise ValueError(f"Workflow {workflowId} not found")

        if workflow.status == "running":
            logger.info(f"Stopping running workflow {workflowId} before processing new prompt")
            workflow.status = "stopped"
            workflow.lastActivity = currentTime
            self.chatInterface.updateWorkflow(workflowId, {
                "status": "stopped",
                "lastActivity": currentTime
            })
            self._createLog(workflowId, "Workflow stopped for new prompt", "info", "stopped", 100)
            # Yield briefly before flipping the status back to "running".
            # NOTE(review): presumably this lets the previous run observe the
            # stop; confirm against HandlingTasks.
            await asyncio.sleep(0.1)

        # Guard against a missing round counter on older records.
        newRound = (workflow.currentRound or 0) + 1
        self.chatInterface.updateWorkflow(workflowId, {
            "status": "running",
            "lastActivity": currentTime,
            "currentRound": newRound
        })

        # Reload so the returned object reflects the persisted update.
        workflow = self.chatInterface.getWorkflow(workflowId)
        if not workflow:
            raise ValueError(f"Failed to reload workflow {workflowId} after update")

        self._createLog(workflowId, f"Workflow resumed (round {workflow.currentRound})", "info", "running", 0)
        return workflow

    def _createWorkflow(self, workflowMode: str, currentTime) -> ChatWorkflow:
        """Create and persist a new running workflow, positioned at round 1."""
        workflowData = {
            "name": "New Workflow",
            "status": "running",
            "startedAt": currentTime,
            "lastActivity": currentTime,
            "currentRound": 0,
            "currentTask": 0,
            "currentAction": 0,
            "totalTasks": 0,
            "totalActions": 0,
            "mandateId": self.chatInterface.mandateId,
            "messageIds": [],
            "workflowMode": workflowMode,
            "maxSteps": 5 if workflowMode == "React" else 1,  # Set maxSteps for React mode
            "stats": {
                "processingTime": None,
                "tokenCount": None,
                "bytesSent": None,
                "bytesReceived": None,
                "successRate": None,
                "errorCount": None
            }
        }

        workflow = self.chatInterface.createWorkflow(workflowData)
        logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}")
        logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}")
        workflow.currentRound = 1
        self.chatInterface.updateWorkflow(workflow.id, {"currentRound": 1})
        self.chatInterface.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)
        return workflow

    def _onProcessingTaskDone(self, task: asyncio.Task) -> None:
        """Release a finished processing task and retrieve its exception, if any."""
        self._backgroundTasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception prevents asyncio's "Task exception was never
        # retrieved" warning. _handleWorkflowError has normally already logged
        # and persisted the failure before re-raising it.
        error = task.exception()
        if error is not None:
            logger.error(f"Workflow processing task ended with error: {str(error)}")

    async def workflowStop(self, workflowId: str) -> ChatWorkflow:
        """Mark a workflow as stopped and record a log entry.

        The status is set to "stopped" regardless of the workflow's current
        status.

        Raises:
            ValueError: If the workflow cannot be loaded.
        """
        try:
            workflow = self.chatInterface.getWorkflow(workflowId)
            if not workflow:
                raise ValueError(f"Workflow {workflowId} not found")

            self._persistWorkflowStatus(workflow, "stopped", includeTotals=False)
            self._createLog(workflowId, "Workflow stopped", "warning", "stopped", 100)
            return workflow
        except Exception as e:
            logger.error(f"Error stopping workflow: {str(e)}")
            raise

    # Main processor

    async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
        """Run one workflow round: first message, planning, execution, results.

        A ``WorkflowStoppedException`` from any stage is routed to
        ``_handleWorkflowStop``; any other exception is routed to
        ``_handleWorkflowError``, which re-raises it after recording the failure.
        """
        try:
            self.handlingTasks = HandlingTasks(self.chatInterface, self.currentUser, workflow)
            self.handlingTasks.service.setUserLanguage(userInput.userLanguage)
            message = await self._sendFirstMessage(userInput, workflow)
            task_plan = await self._planTasks(userInput, workflow)
            workflow_result = await self._executeTasks(task_plan, workflow)
            await self._processWorkflowResults(workflow, workflow_result, message)

        except WorkflowStoppedException:
            self._handleWorkflowStop(workflow)

        except Exception as e:
            self._handleWorkflowError(workflow, e)

    # Helper functions

    def _createLog(self, workflowId: str, message: str, logType: str, status: str, progress: int) -> None:
        """Write one workflow log entry through the chat interface."""
        self.chatInterface.createLog({
            "workflowId": workflowId,
            "message": message,
            "type": logType,
            "status": status,
            "progress": progress
        })

    def _persistWorkflowStatus(self, workflow: ChatWorkflow, status: str, includeTotals: bool = True) -> None:
        """Set status and lastActivity on the workflow object and persist them.

        Args:
            workflow: Workflow to update, in memory and via ``updateWorkflow``.
            status: New status value.
            includeTotals: Also persist ``totalTasks`` and ``totalActions``.
        """
        workflow.status = status
        workflow.lastActivity = get_utc_timestamp()
        update = {
            "status": status,
            "lastActivity": workflow.lastActivity
        }
        if includeTotals:
            update["totalTasks"] = workflow.totalTasks
            update["totalActions"] = workflow.totalActions
        self.chatInterface.updateWorkflow(workflow.id, update)

    def _createFinalMessage(self, workflow: ChatWorkflow, text: str, documentsLabel: str, progress: str) -> Optional[ChatMessage]:
        """Create the closing assistant message of a round and append it to the workflow.

        Args:
            workflow: Workflow the message belongs to.
            text: Message body shown to the user.
            documentsLabel: Label stored on the message.
            progress: Value used for both ``taskProgress`` and ``actionProgress``.

        Returns:
            The created message, or whatever falsy value ``createMessage``
            returned when nothing was created.
        """
        message = self.chatInterface.createMessage({
            "workflowId": workflow.id,
            "role": "assistant",
            "message": text,
            "status": "last",
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": get_utc_timestamp(),
            "documentsLabel": documentsLabel,
            "documents": [],
            # Add workflow context fields
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0,
            # Add progress status
            "taskProgress": progress,
            "actionProgress": progress
        })
        if message:
            workflow.messages.append(message)
        return message

    async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage:
        """Create the user's opening message for this round and attach its files.

        Raises:
            WorkflowStoppedException: If the stop check fires.
            RuntimeError: If the message could not be created.
        """
        try:
            self.handlingTasks._checkWorkflowStopped()

            # Generate the documentsLabel that matches what
            # getDocumentReferenceString will create (task 0, action 0).
            context_label = f"round{workflow.currentRound}_task0_action0_context"

            messageData = {
                "workflowId": workflow.id,
                "role": "user",
                "message": userInput.prompt,
                "status": "first",
                # NOTE(review): fixed at 1 even when continuing a workflow that
                # already has messages; confirm whether numbering is per round.
                "sequenceNr": 1,
                "publishedAt": get_utc_timestamp(),
                "documentsLabel": context_label,
                "documents": [],
                # Add workflow context fields
                "roundNumber": workflow.currentRound,
                "taskNumber": 0,
                "actionNumber": 0,
                # Add progress status
                "taskProgress": "pending",
                "actionProgress": "pending"
            }

            # Create message first to get messageId
            message = self.chatInterface.createMessage(messageData)
            if not message:
                raise RuntimeError("Failed to create first message")

            workflow.messages.append(message)

            # Clear trace log for new workflow session
            self.handlingTasks.service.clearTraceLog()

            # Add documents if any, now with messageId
            if userInput.listFileId:
                documents = await self.handlingTasks.service.processFileIds(userInput.listFileId, message.id)
                message.documents = documents
                # Update the message with documents in database
                self.chatInterface.updateMessage(message.id, {"documents": [doc.to_dict() for doc in documents]})

            return message

        except WorkflowStoppedException:
            # A user stop is not an error; let _workflowProcess handle it.
            raise
        except Exception as e:
            logger.error(f"Error sending first message: {str(e)}")
            raise

    async def _planTasks(self, userInput: UserInputRequest, workflow: ChatWorkflow):
        """Generate the task plan for this round.

        Raises:
            RuntimeError: If no plan or an empty plan is produced.
        """
        # Generate task plan first (shared for both modes)
        task_plan = await self.handlingTasks.generateTaskPlan(userInput.prompt, workflow)
        if not task_plan or not task_plan.tasks:
            raise RuntimeError("No tasks generated in task plan.")

        workflow_mode = getattr(workflow, 'workflowMode', 'Actionplan')
        # Full attribute dump is diagnostic detail only, so keep it at debug level.
        logger.debug(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}")
        logger.info(f"Executing workflow mode={workflow_mode} with {len(task_plan.tasks)} tasks")
        return task_plan

    async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> WorkflowResult:
        """Execute every task of the plan in order and summarise the run.

        Feedback of each successful task is collected and handed to the
        following tasks as ``previous_results``.
        """
        # Imported once per call instead of once per loop iteration.
        from modules.interfaces.interfaceChatModel import TaskContext

        handling = self.handlingTasks
        total_tasks = len(task_plan.tasks)
        all_task_results: List = []
        previous_results: List[str] = []

        for current_task_index, task_step in enumerate(task_plan.tasks, start=1):
            logger.info(f"Task {current_task_index}/{total_tasks}: {task_step.objective}")

            # Build TaskContext (mode-specific behavior is inside HandlingTasks)
            task_context = TaskContext(
                task_step=task_step,
                workflow=workflow,
                workflow_id=workflow.id,
                available_documents=None,
                available_connections=None,
                previous_results=previous_results,
                previous_handover=None,
                improvements=[],
                retry_count=0,
                previous_action_results=[],
                previous_review_result=None,
                is_regeneration=False,
                failure_patterns=[],
                failed_actions=[],
                successful_actions=[],
                criteria_progress={
                    'met_criteria': set(),
                    'unmet_criteria': set(),
                    'attempt_history': []
                }
            )

            task_result = await handling.executeTask(task_step, workflow, task_context, current_task_index, total_tasks)
            handover_data = await handling.prepareTaskHandover(task_step, [], task_result, workflow)
            all_task_results.append({
                'task_step': task_step,
                'task_result': task_result,
                'handover_data': handover_data
            })
            if task_result.success and task_result.feedback:
                previous_results.append(task_result.feedback)

        # NOTE(review): status is always "completed" and execution_time is a
        # constant 0.0, even when individual tasks report success=False.
        # Confirm whether unsuccessful tasks should surface as "failed".
        return WorkflowResult(
            status="completed",
            completed_tasks=len(all_task_results),
            total_tasks=total_tasks,
            execution_time=0.0,
            final_results_count=len(all_task_results)
        )

    async def _processWorkflowResults(self, workflow: ChatWorkflow, workflow_result: WorkflowResult, initial_message: ChatMessage) -> None:
        """Turn the execution result into the closing message and final status.

        Args:
            workflow: Workflow being finalised.
            workflow_result: Outcome from ``_executeTasks``; its ``status``
                selects the stopped, failed or success path.
            initial_message: The round's first message (currently unused).

        Raises:
            WorkflowStoppedException: If a stop is detected while the success
                message is being produced.
        """
        stoppedText = "🛑 Workflow stopped by user"
        try:
            try:
                self.handlingTasks._checkWorkflowStopped()
            except WorkflowStoppedException:
                logger.info(f"Workflow {workflow.id} was stopped during result processing")
                # Create final stopped message, then update workflow status to stopped
                self._createFinalMessage(workflow, stoppedText, "workflow_stopped", "stopped")
                self._persistWorkflowStatus(workflow, "stopped", includeTotals=False)
                return

            if workflow_result.status == 'stopped':
                self._createFinalMessage(workflow, stoppedText, "workflow_stopped", "stopped")
                self._persistWorkflowStatus(workflow, "stopped")
                self._createLog(workflow.id, "Workflow stopped by user", "warning", "stopped", 100)
                return

            if workflow_result.status == 'failed':
                failureText = f"Workflow failed: {workflow_result.error or 'Unknown error'}"
                self._createFinalMessage(workflow, failureText, "workflow_failure", "fail")
                self._persistWorkflowStatus(workflow, "failed")
                self._createLog(workflow.id, failureText, "error", "failed", 100)
                return

            # For successful workflows, send detailed completion message
            await self._sendLastMessage(workflow)

        except WorkflowStoppedException:
            # A stop is not a processing error: let _workflowProcess record it
            # as "stopped" instead of marking the workflow "failed" here.
            raise
        except Exception as e:
            logger.error(f"Error processing workflow results: {str(e)}")
            self._createFinalMessage(workflow, f"Error processing workflow results: {str(e)}", "workflow_error", "fail")
            self._persistWorkflowStatus(workflow, "failed")

    async def _sendLastMessage(self, workflow: ChatWorkflow) -> None:
        """Send the completion message and mark the workflow completed.

        Does nothing except log a warning when the workflow is already
        stopped or failed.

        Raises:
            WorkflowStoppedException: If a stop is detected while generating
                the feedback text.
        """
        try:
            # Safety check: ensure this is only called for successful workflows
            if workflow.status in ['stopped', 'failed']:
                logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
                return

            # The workflow is still "running" at this point, so pass the
            # outcome explicitly; otherwise the feedback would report
            # "Workflow status: running" for a successful run.
            feedback = await self._generateWorkflowFeedback(workflow, finalStatus="completed")

            self._createFinalMessage(workflow, feedback, "workflow_feedback", "success")
            self._persistWorkflowStatus(workflow, "completed", includeTotals=False)
            self._createLog(workflow.id, "Workflow completed", "success", "completed", 100)

        except WorkflowStoppedException:
            raise
        except Exception as e:
            logger.error(f"Error sending last message: {str(e)}")
            raise

    async def _generateWorkflowFeedback(self, workflow: ChatWorkflow, finalStatus: Optional[str] = None) -> str:
        """Build the summary text shown when a workflow round ends.

        Args:
            workflow: Workflow whose messages are counted by role.
            finalStatus: Status to report in the closing line. When omitted,
                ``workflow.status`` is used.

        Returns:
            The summary text, or a generic fallback sentence if building the
            summary fails.

        Raises:
            WorkflowStoppedException: If the stop check fires; it is not
                converted into the fallback text.
        """
        try:
            self.handlingTasks._checkWorkflowStopped()

            # Count messages by role
            userCount = sum(1 for msg in workflow.messages if msg.role == 'user')
            assistantCount = sum(1 for msg in workflow.messages if msg.role == 'assistant')

            # Generate summary feedback
            parts = [
                "Workflow completed.\n\n",
                f"Processed {userCount} user inputs and generated {assistantCount} responses.\n",
            ]

            # Add final status
            status = finalStatus or workflow.status
            if status == "completed":
                parts.append("All tasks completed successfully.")
            elif status == "partial":
                parts.append("Some tasks completed with partial success.")
            else:
                parts.append(f"Workflow status: {status}")

            return "".join(parts)

        except WorkflowStoppedException:
            # Must reach the caller; swallowing it would complete a stopped workflow.
            raise
        except Exception as e:
            logger.error(f"Error generating workflow feedback: {str(e)}")
            return "Workflow processing completed."

    def _handleWorkflowStop(self, workflow: ChatWorkflow) -> None:
        """Record a user stop: persist status, post the closing message, log it."""
        logger.info("Workflow stopped by user")

        # Update workflow status to stopped
        self._persistWorkflowStatus(workflow, "stopped")

        # Create final stopped message. This path reports progress as
        # "pending", unlike the stop handling in _processWorkflowResults.
        self._createFinalMessage(workflow, "🛑 Workflow stopped by user", "workflow_stopped", "pending")

        # Add log entry
        self._createLog(workflow.id, "Workflow stopped by user", "warning", "stopped", 100)

    def _handleWorkflowError(self, workflow: ChatWorkflow, error: Exception) -> None:
        """Record a processing failure, then re-raise the error.

        Raises:
            Exception: Always re-raises ``error`` after the failure has been
                persisted, posted as a message and logged.
        """
        logger.error(f"Workflow processing error: {str(error)}")

        # Update workflow status to failed
        self._persistWorkflowStatus(workflow, "failed")

        # Create error message
        self._createFinalMessage(workflow, f"Workflow processing failed: {str(error)}", "workflow_error", "fail")

        # Add error log entry
        self._createLog(workflow.id, f"Workflow failed: {str(error)}", "error", "failed", 100)

        # Raise the given error explicitly; a bare `raise` would fail with
        # RuntimeError if this method were called outside an except block.
        raise error
|