gateway/modules/workflows/processing/workflowProcessor.py

671 lines
32 KiB
Python

# workflowProcessor.py
# Main workflow processor with delegation pattern
import logging
import json
from typing import Dict, Any, Optional, List, TYPE_CHECKING
from modules.datamodels import datamodelChat
from modules.datamodels.datamodelChat import TaskStep, TaskContext, TaskPlan, ActionResult, ActionDocument, ChatDocument, ChatMessage
from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum
from modules.workflows.processing.modes.modeBase import BaseMode
from modules.workflows.processing.modes.modeDynamic import DynamicMode
from modules.workflows.processing.modes.modeAutomation import AutomationMode
from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
from modules.datamodels.datamodelAi import OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.shared.jsonUtils import extractJsonString, repairBrokenJson
if TYPE_CHECKING:
from modules.datamodels.datamodelWorkflow import TaskResult
logger = logging.getLogger(__name__)
class WorkflowProcessor:
"""Main workflow processor that delegates to appropriate mode implementations"""
def __init__(self, services):
    """Bind the service container and select the mode implementation.

    Args:
        services: Service container; its ``workflow`` attribute is the active
            workflow, whose ``workflowMode`` decides which mode object is built.

    Raises:
        ValueError: Propagated from ``_createMode`` for an unsupported mode.
    """
    self.services = services
    self.workflow = services.workflow
    # The mode object is handed the whole service container (see _createMode).
    self.mode = self._createMode(self.workflow.workflowMode)
def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
    """Instantiate the mode implementation that matches ``workflowMode``.

    Args:
        workflowMode: Mode of the current workflow.

    Returns:
        A ``DynamicMode`` or ``AutomationMode`` constructed with ``self.services``.

    Raises:
        ValueError: If ``workflowMode`` equals neither supported mode.
    """
    # Compared in order with ``==`` rather than looked up in a dict, so any value
    # that compares equal to an enum member keeps matching exactly as before.
    supportedModes = (
        (WorkflowModeEnum.WORKFLOW_DYNAMIC, DynamicMode),
        (WorkflowModeEnum.WORKFLOW_AUTOMATION, AutomationMode),
    )
    for enumMember, modeClass in supportedModes:
        if workflowMode == enumMember:
            return modeClass(self.services)
    raise ValueError(f"Invalid workflow mode: {workflowMode}")
async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
    """Generate a high-level task plan for the workflow.

    Runs the workflow-stopped check, makes sure ``services.currentUserLanguage``
    is populated, delegates plan generation and plan-message creation to the
    active mode, and reports progress through ``services.chat.progressLog*``.

    Args:
        userInput: The user's request text.
        workflow: Workflow the plan is generated for.

    Returns:
        The TaskPlan produced by ``self.mode.generateTaskPlan``.

    Raises:
        Exception: Anything raised by the stop check, progress logging or the
            mode is logged and re-raised unchanged.
    """
    import time
    operationId = f"taskPlan_{workflow.id}_{int(time.time())}"
    # checkWorkflowStopped can raise before progressLogStart has run; only mark a
    # progress operation as failed if it was actually started.
    progressStarted = False
    try:
        checkWorkflowStopped(self.services)
        # workflowMode may be an enum member or a plain value; format it once.
        modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode
        self.services.chat.progressLogStart(
            operationId,
            "Workflow Planning",
            "Task Plan Generation",
            f"Mode: {modeValue}"
        )
        progressStarted = True
        # currentUserLanguage should already be set from user intention analysis in
        # _sendFirstMessage and must not be overwritten here; only fill it in from
        # the user's profile language (default 'en') when it is missing or empty.
        if not getattr(self.services, 'currentUserLanguage', None):
            self.services.currentUserLanguage = getattr(self.services.user, 'language', None) or 'en'
        logger.info("=== STARTING TASK PLAN GENERATION ===")
        logger.info(f"Using user language: {self.services.currentUserLanguage}")
        logger.info(f"Workflow ID: {workflow.id}")
        logger.info(f"User Input: {userInput}")
        logger.info(f"Workflow Mode: {modeValue}")
        self.services.chat.progressLogUpdate(operationId, 0.3, "Analyzing input")
        taskPlan = await self.mode.generateTaskPlan(userInput, workflow)
        self.services.chat.progressLogUpdate(operationId, 0.8, "Creating plan")
        await self.mode.createTaskPlanMessage(taskPlan, workflow)
        self.services.chat.progressLogFinish(operationId, True)
        return taskPlan
    except Exception as e:
        logger.error(f"Error in generateTaskPlan: {str(e)}")
        if progressStarted:
            self.services.chat.progressLogFinish(operationId, False)
        raise
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext) -> datamodelChat.TaskResult:
    """Execute one task step through the active mode, with progress tracking.

    Args:
        taskStep: The step to execute; ``objective`` is logged.
        workflow: Current workflow; ``getTaskIndex()`` labels the progress entry.
        context: Task context forwarded unchanged to the mode.

    Returns:
        The result returned by ``self.mode.executeTask``.

    Raises:
        Exception: Anything raised by the stop check, progress logging or the
            mode is logged and re-raised unchanged.
    """
    import time
    taskIndex = workflow.getTaskIndex()
    operationId = f"taskExec_{workflow.id}_{taskIndex}_{int(time.time())}"
    # checkWorkflowStopped can raise before progressLogStart has run; only mark a
    # progress operation as failed if it was actually started.
    progressStarted = False
    try:
        checkWorkflowStopped(self.services)
        self.services.chat.progressLogStart(
            operationId,
            "Workflow Execution",
            "Task Execution",
            f"Task {taskIndex}"
        )
        progressStarted = True
        logger.info("=== STARTING TASK EXECUTION ===")
        logger.info(f"Task: {taskStep.objective}")
        # workflowMode may be an enum member or a plain value.
        modeValue = workflow.workflowMode.value if hasattr(workflow.workflowMode, 'value') else workflow.workflowMode
        logger.info(f"Mode: {modeValue}")
        self.services.chat.progressLogUpdate(operationId, 0.2, "Executing")
        result = await self.mode.executeTask(taskStep, workflow, context)
        self.services.chat.progressLogFinish(operationId, True)
        return result
    except Exception as e:
        logger.error(f"Error in executeTask: {str(e)}")
        if progressStarted:
            self.services.chat.progressLogFinish(operationId, False)
        raise
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
        previousResults: List = None, enhancedContext: TaskContext = None) -> List:
    """Ask the active mode to generate the actions for ``taskStep``.

    Args:
        taskStep: The step to plan actions for; ``objective`` is logged.
        workflow: Current workflow.
        previousResults: Optional prior results, forwarded unchanged to the mode.
        enhancedContext: Optional task context, forwarded unchanged to the mode.

    Returns:
        Whatever ``self.mode.generateActionItems`` returns.

    Raises:
        Exception: Errors (including from the stop check) are logged and re-raised.
    """
    try:
        # Bail out before doing any work if the workflow has been stopped.
        checkWorkflowStopped(self.services)
        logger.info("=== STARTING ACTION GENERATION ===")
        logger.info(f"Task: {taskStep.objective}")
        currentMode = workflow.workflowMode
        logger.info(f"Mode: {getattr(currentMode, 'value', currentMode)}")
        return await self.mode.generateActionItems(taskStep, workflow, previousResults, enhancedContext)
    except Exception as err:
        logger.error(f"Error in generateActionItems: {str(err)}")
        raise
def updateWorkflowAfterTaskPlanCreated(self, totalTasks: int):
    """Record a newly created task plan on the workflow.

    Sets ``totalTasks`` and zeroes ``currentTask``, ``currentAction`` and
    ``totalActions``, first on the in-memory workflow and then through
    ``interfaceDbChat.updateWorkflow``. Any error is logged, not raised.

    Args:
        totalTasks: Number of tasks in the new plan.
    """
    try:
        counters = {
            "totalTasks": totalTasks,
            "currentTask": 0,
            "currentAction": 0,
            "totalActions": 0
        }
        for fieldName, fieldValue in counters.items():
            setattr(self.workflow, fieldName, fieldValue)
        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, counters)
        logger.info(f"Updated workflow {self.workflow.id} after task plan creation: {counters}")
    except Exception as err:
        logger.error(f"Error updating workflow after task plan creation: {str(err)}")
def updateWorkflowBeforeExecutingTask(self, taskNumber: int):
    """Mark ``taskNumber`` as the current task and reset its action counters.

    The change is applied to the in-memory workflow and then persisted via
    ``interfaceDbChat.updateWorkflow``. Any error is logged, not raised.

    Args:
        taskNumber: Number of the task about to run.
    """
    try:
        counters = {
            "currentTask": taskNumber,
            "currentAction": 0,
            "totalActions": 0
        }
        for fieldName, fieldValue in counters.items():
            setattr(self.workflow, fieldName, fieldValue)
        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, counters)
        logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}: {counters}")
    except Exception as err:
        logger.error(f"Error updating workflow before executing task: {str(err)}")
def updateWorkflowAfterActionPlanning(self, totalActions: int):
    """Store the number of planned actions for the current task.

    Updates ``totalActions`` on the in-memory workflow, then persists it via
    ``interfaceDbChat.updateWorkflow``. Any error is logged, not raised.

    Args:
        totalActions: Number of actions planned for the current task.
    """
    try:
        self.workflow.totalActions = totalActions
        payload = {"totalActions": totalActions}
        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, payload)
        logger.info(f"Updated workflow {self.workflow.id} after action planning: {payload}")
    except Exception as err:
        logger.error(f"Error updating workflow after action planning: {str(err)}")
def updateWorkflowBeforeExecutingAction(self, actionNumber: int):
    """Mark ``actionNumber`` as the action currently being executed.

    Updates ``currentAction`` on the in-memory workflow, then persists it via
    ``interfaceDbChat.updateWorkflow``. Any error is logged, not raised.

    Args:
        actionNumber: Number of the action about to run.
    """
    try:
        self.workflow.currentAction = actionNumber
        payload = {"currentAction": actionNumber}
        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, payload)
        logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}: {payload}")
    except Exception as err:
        logger.error(f"Error updating workflow before executing action: {str(err)}")
def setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None):
    """Set whichever of ``totalTasks`` / ``totalActions`` is given.

    Arguments left as ``None`` are not touched. Given values are written to the
    in-memory workflow and, if at least one was given, persisted via
    ``interfaceDbChat.updateWorkflow``. Any error is logged, not raised.

    Args:
        totalTasks: New total task count, or None to keep the current one.
        totalActions: New total action count, or None to keep the current one.
    """
    try:
        changes = {}
        for fieldName, newValue in (("totalTasks", totalTasks), ("totalActions", totalActions)):
            if newValue is None:
                continue
            setattr(self.workflow, fieldName, newValue)
            changes[fieldName] = newValue
        if changes:
            self.services.interfaceDbChat.updateWorkflow(self.workflow.id, changes)
            logger.info(f"Updated workflow {self.workflow.id} totals in database: {changes}")
        tasksShown = getattr(self.workflow, 'totalTasks', 'N/A')
        actionsShown = getattr(self.workflow, 'totalActions', 'N/A')
        logger.debug(f"Updated workflow totals: Tasks {tasksShown}, Actions {actionsShown}")
    except Exception as err:
        logger.error(f"Error setting workflow totals: {str(err)}")
def resetWorkflowForNewSession(self):
    """Zero all task/action progress counters for a new session.

    Resets ``currentTask``, ``currentAction``, ``totalTasks`` and
    ``totalActions`` on the in-memory workflow, then persists them via
    ``interfaceDbChat.updateWorkflow``. Any error is logged, not raised.
    """
    try:
        counters = {
            "currentTask": 0,
            "currentAction": 0,
            "totalTasks": 0,
            "totalActions": 0
        }
        for fieldName, fieldValue in counters.items():
            setattr(self.workflow, fieldName, fieldValue)
        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, counters)
        logger.info(f"Reset workflow {self.workflow.id} for new session: {counters}")
    except Exception as err:
        logger.error(f"Error resetting workflow for new session: {str(err)}")
async def prepareTaskHandover(self, taskStep, taskActions, taskResult, workflow):
    """Build the handover payload passed between tasks for workflow coordination.

    ``taskResult`` may be a ReviewResult (detected by a ``metCriteria``
    attribute), whose full ``model_dump()`` becomes ``review_result``. It may
    also be a TaskResult or None, which is reduced to
    ``status`` / ``reason`` / ``success``.

    Args:
        taskStep: Step being handed over; ``id`` and ``objective`` are read.
        taskActions: Actions run for the step; each must support ``model_dump()``.
            May be empty or None.
        taskResult: ReviewResult, TaskResult, or None.
        workflow: Workflow the step belongs to; ``id`` is read.

    Returns:
        The handover dict, or ``{'error': <message>}`` if anything raised.
    """
    try:
        # NOTE(review): an exception from this stop check is caught by the broad
        # handler below and returned as an error dict instead of propagating —
        # confirm that is intended.
        checkWorkflowStopped(self.services)
        status = taskResult.status if taskResult else 'unknown'
        if hasattr(taskResult, 'metCriteria'):
            # ReviewResult: keep its full serialized form.
            metCriteria = taskResult.metCriteria or []
            reviewResult = taskResult.model_dump()
        else:
            # TaskResult (or None): summarize the fields the handover needs.
            metCriteria = []
            reviewResult = {
                'status': status,
                'reason': taskResult.error if taskResult and hasattr(taskResult, 'error') else None,
                'success': taskResult.success if taskResult else False
            }
        handoverData = {
            'task_id': taskStep.id,
            'task_description': taskStep.objective,
            'actions': [action.model_dump() for action in taskActions] if taskActions else [],
            'review_result': reviewResult,
            'workflow_id': workflow.id,
            'handover_time': self.services.utils.timestampGetUtc()
        }
        # Log the handover status summary that status/metCriteria were computed for.
        logger.info(
            f"Prepared handover for task {taskStep.id} in workflow {workflow.id} "
            f"(status: {status}, met criteria: {len(metCriteria)})"
        )
        return handoverData
    except Exception as e:
        logger.error(f"Error in prepareTaskHandover: {str(e)}")
        return {'error': str(e)}
# Fast Path Implementation
async def detectComplexity(self, prompt: str, documents: Optional[List[ChatDocument]] = None) -> str:
    """
    Detect request complexity using AI-based semantic understanding.

    Sends a planning prompt describing the request (and the count/MIME types of
    any documents) to ``services.ai.callAiPlanning`` and parses the JSON reply.

    Simple: Single question, no documents, straightforward answer (5-15s)
    Moderate: Multiple steps, some documents, structured response (30-60s)
    Complex: Multi-task, many documents, research needed, generation required (60-120s)

    Args:
        prompt: The user's request text.
        documents: Optional attached documents; only their count and MIME types
            are included in the prompt.

    Returns:
        Always one of "simple" | "moderate" | "complex". Any error, unparsable
        reply, or unrecognized level falls back to "moderate".
    """
    allowedLevels = ("simple", "moderate", "complex")
    try:
        await self.services.ai.ensureAiObjectsInitialized()
        # Build complexity detection prompt (language-agnostic, semantic)
        complexityPrompt = (
            "You are a complexity analyzer. Analyze the user's request and determine its complexity level.\n\n"
            "Consider:\n"
            "- Number of distinct tasks or steps required\n"
            "- Amount and type of documents provided\n"
            "- Need for external research or web search\n"
            "- Need for document analysis or extraction\n"
            "- Need for content generation (reports, summaries, etc.)\n"
            "- Need for multi-step reasoning or planning\n\n"
            "Complexity levels:\n"
            "- 'simple': Single question, no documents or minimal documents, straightforward answer that can be provided in one AI response (5-15s)\n"
            "- 'moderate': Multiple steps, some documents, structured response requiring some processing (30-60s)\n"
            "- 'complex': Multi-task workflow, many documents, research needed, content generation required, multi-step planning (60-120s)\n\n"
            f"User request:\n{prompt}\n\n"
        )
        if documents:
            complexityPrompt += f"\nDocuments provided: {len(documents)} document(s)\n"
            # Skip missing/empty MIME types (a None would make str.join raise and
            # abort detection) and sort so the prompt is deterministic.
            docTypes = sorted({doc.mimeType for doc in documents if getattr(doc, 'mimeType', None)})
            if docTypes:
                complexityPrompt += f"Document types: {', '.join(docTypes)}\n"
        complexityPrompt += (
            "\nReturn ONLY a JSON object with this exact structure:\n"
            "{\n"
            ' "complexity": "simple" | "moderate" | "complex",\n'
            ' "reasoning": "Brief explanation of why this complexity level"\n'
            "}\n"
        )
        # Planning call - no documents needed
        aiResponse = await self.services.ai.callAiPlanning(
            prompt=complexityPrompt,
            placeholders=None,
            debugType="complexity_detection"
        )
        complexity = "moderate"  # Default fallback
        try:
            # callAiPlanning returns a string directly, not an object
            responseContent = str(aiResponse) if aiResponse else ""
            jsonStr = extractJsonString(responseContent)
            if not jsonStr:
                jsonStr = repairBrokenJson(responseContent)
            if not jsonStr:
                logger.warning("Could not parse complexity detection response, defaulting to 'moderate'")
            else:
                parsed = json.loads(jsonStr)
                if not isinstance(parsed, dict):
                    logger.warning("Complexity detection response is not a JSON object, defaulting to 'moderate'")
                else:
                    rawLevel = parsed.get("complexity", "moderate")
                    reasoning = parsed.get("reasoning", "")
                    # The model may vary case/whitespace or invent a level; only
                    # accept the three documented values.
                    normalizedLevel = str(rawLevel).strip().lower()
                    if normalizedLevel in allowedLevels:
                        complexity = normalizedLevel
                        logger.info(f"Complexity detected: {complexity} - {reasoning}")
                    else:
                        logger.warning(f"Unexpected complexity value {rawLevel!r}, defaulting to 'moderate'")
        except Exception as e:
            logger.warning(f"Error parsing complexity detection: {str(e)}, defaulting to 'moderate'")
        return complexity
    except Exception as e:
        logger.error(f"Error in detectComplexity: {str(e)}")
        # Default to moderate on error (safe fallback)
        return "moderate"
async def fastPathExecute(self, prompt: str, documents: Optional[List[ChatDocument]] = None, userLanguage: Optional[str] = None) -> ActionResult:
    """Answer a simple request with a single AI call (the "fast path").

    Builds a direct-answer prompt, sends it through ``services.ai.aiObjects.call``
    with low-cost/fast options, and wraps the reply in one ``text/plain``
    ActionDocument named ``fast_path_response.txt``.

    Args:
        prompt: The user's question.
        documents: Accepted for interface compatibility. NOTE(review): it is not
            forwarded to the AI call (``contentParts=None``) — confirm intended.
        userLanguage: Optional language hint appended to the prompt.

    Returns:
        ActionResult with the response document on success, or
        ``ActionResult.isFailure(...)`` describing the error if anything raised.
    """
    try:
        await self.services.ai.ensureAiObjectsInitialized()

        # Understand + execute + deliver in a single prompt.
        sections = [
            "You are a helpful assistant. Answer the user's question directly and comprehensively.\n\n",
            f"User question:\n{prompt}\n\n",
        ]
        if userLanguage:
            sections.append(f"Respond in the user's language: {userLanguage}\n\n")
        sections.append(
            "Provide a clear, complete answer. If the question requires information from documents, "
            "extract and present the relevant information. If it's a general question, provide a helpful response.\n\n"
            "Format your response as plain text (no markdown code blocks unless showing code examples)."
        )
        fastPathPrompt = "".join(sections)

        from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest
        aiRequest = AiCallRequest(
            prompt=fastPathPrompt,
            context="",
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC,
                maxCost=0.10,          # cheap: simple requests only
                maxProcessingTime=15   # fast-path budget in seconds
            ),
            contentParts=None
        )
        # Calling aiObjects.call directly (not callAiContent) avoids the
        # document-generation path and yields plain text.
        aiCallResponse = await self.services.ai.aiObjects.call(aiRequest)
        responseText = aiCallResponse.content or ""

        from modules.datamodels.datamodelChat import ActionDocument
        payload = responseText.encode('utf-8') if isinstance(responseText, str) else responseText
        result = ActionResult(
            success=True,
            documents=[
                ActionDocument(
                    documentName="fast_path_response.txt",
                    documentData=payload,
                    mimeType="text/plain"
                )
            ],
            resultLabel="fast_path_response"
        )
        logger.info(f"Fast path executed successfully, response length: {len(responseText)} chars")
        return result
    except Exception as e:
        import traceback
        errorDetails = f"{type(e).__name__}: {str(e)}"
        logger.error(f"Error in fastPathExecute: {errorDetails}")
        logger.debug(f"Fast path error traceback:\n{traceback.format_exc()}")
        return ActionResult.isFailure(f"Fast path execution failed: {errorDetails}")
# Workflow-Level Functions
async def initialUnderstanding(self, context: Any) -> Any:  # RequestContext -> UnderstandingResult
    """
    Initial understanding phase: one combined AI call for parameters, intention,
    context, document references and tasks.

    The prompt asks the model for:
    - basic parameters (language, format, detail level)
    - user intention (primaryGoal, secondaryGoals, intentionType)
    - context (topics, requirements, constraints)
    - document references with purpose and relevance
    - task definitions with deliverables

    Args:
        context: RequestContext; ``originalPrompt``, ``userLanguage``,
            ``detectedComplexity`` and ``documents`` are read.

    Returns:
        UnderstandingResult parsed from the AI reply. If the call or parsing
        fails, a minimal UnderstandingResult that echoes the prompt and
        language and has no tasks.
    """
    def _minimalResult():
        # Fallback returned on any failure: echoes the request, no tasks.
        from modules.datamodels.datamodelWorkflow import UnderstandingResult
        return UnderstandingResult(
            parameters={"language": context.userLanguage},
            intention={"primaryGoal": context.originalPrompt},
            context={},
            documentReferences=[],
            tasks=[]
        )

    try:
        from modules.datamodels.datamodelWorkflow import UnderstandingResult
        from modules.shared.jsonUtils import parseJsonWithModel
        await self.services.ai.ensureAiObjectsInitialized()
        understandingPrompt = (
            "You are a request understanding system. Analyze the user's request comprehensively and provide:\n\n"
            "1. **Parameters**: Basic parameters (language, format, detail level)\n"
            "2. **Intention**: User intention (primaryGoal, secondaryGoals, intentionType)\n"
            "3. **Context**: Extracted context (topics, requirements, constraints)\n"
            "4. **Document References**: Document references with purpose and relevance\n"
            "5. **Tasks**: Task definitions with deliverables\n\n"
            f"User request:\n{context.originalPrompt}\n\n"
            f"User language: {context.userLanguage}\n"
            f"Complexity: {context.detectedComplexity}\n"
        )
        if context.documents:
            understandingPrompt += f"\nDocuments provided: {len(context.documents)} document(s)\n"
            # Skip missing/empty MIME types (a None would make str.join raise and
            # force the fallback) and sort so the prompt is deterministic.
            docTypes = sorted({doc.mimeType for doc in context.documents if getattr(doc, 'mimeType', None)})
            if docTypes:
                understandingPrompt += f"Document types: {', '.join(docTypes)}\n"
        understandingPrompt += (
            "\nReturn ONLY a JSON object with this exact structure:\n"
            "{\n"
            ' "parameters": {"language": "...", "format": "...", "detailLevel": "..."},\n'
            ' "intention": {"primaryGoal": "...", "secondaryGoals": [...], "intentionType": "..."},\n'
            ' "context": {"topics": [...], "requirements": [...], "constraints": [...]},\n'
            ' "documentReferences": [{"reference": "...", "purpose": "...", "relevance": "..."}],\n'
            ' "tasks": [{"id": "...", "objective": "...", "deliverable": {...}, ...}]\n'
            "}\n"
        )
        aiResponse = await self.services.ai.callAiPlanning(
            prompt=understandingPrompt,
            placeholders=None,
            debugType="initial_understanding"
        )
        try:
            understandingResult = parseJsonWithModel(aiResponse, UnderstandingResult)
            logger.info(f"Initial understanding completed: {len(understandingResult.tasks)} tasks identified")
            return understandingResult
        except Exception as e:
            logger.error(f"Error parsing UnderstandingResult: {str(e)}")
            return _minimalResult()
    except Exception as e:
        logger.error(f"Error in initialUnderstanding: {str(e)}")
        return _minimalResult()
async def persistTaskResult(self, taskResult: Any, workflow: ChatWorkflow, context: Optional[TaskContext] = None) -> ChatMessage:  # TaskResult -> ChatMessage
    """
    Persist a task result as a ChatMessage plus stored files.

    Each ActionDocument with data in ``taskResult.actionResult.documents`` is
    written via ``interfaceDbComponent`` and attached to a new "step" message.
    Later tasks or rounds can then reference the files through the message's
    ``documentsLabel`` (docList: references).

    Args:
        taskResult: TaskResult from task execution; ``taskId`` and
            ``actionResult`` are read.
        workflow: Current workflow; supplies id, round/task/action indices and
            the message count used for ``sequenceNr``.
        context: Optional TaskContext; its ``taskStep`` provides the
            user-facing message text when available.

    Returns:
        The ChatMessage returned by ``services.chat.storeMessageWithDocuments``.

    Raises:
        Exception: Any error (including from the workflow-stopped check) is
            logged and re-raised.
    """
    try:
        checkWorkflowStopped(self.services)
        actionResult = taskResult.actionResult

        chatDocuments = []
        if actionResult and actionResult.documents:
            for actionDoc in actionResult.documents:
                rawData = getattr(actionDoc, 'documentData', None)
                if not rawData:
                    continue
                # Encode once; storage receives bytes and the size is in bytes.
                contentBytes = rawData if isinstance(rawData, bytes) else rawData.encode('utf-8')
                # Resolve name/MIME once so the stored file and the ChatDocument
                # agree, even when the ActionDocument lacks these attributes.
                fileName = getattr(actionDoc, 'documentName', None) or f"task_{taskResult.taskId}_result.txt"
                mimeType = getattr(actionDoc, 'mimeType', None) or "text/plain"
                fileItem = self.services.interfaceDbComponent.createFile(
                    name=fileName,
                    mimeType=mimeType,
                    content=contentBytes
                )
                self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes)
                fileInfo = self.services.chat.getFileInfo(fileItem.id)
                # Plain dict, not a ChatDocument: ChatDocument needs a messageId,
                # which only exists once the message is stored below.
                chatDocuments.append({
                    "fileId": fileItem.id,
                    "fileName": fileInfo.get("fileName", fileName) if fileInfo else fileName,
                    "fileSize": fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes),
                    "mimeType": fileInfo.get("mimeType", mimeType) if fileInfo else mimeType,
                    "roundNumber": workflow.currentRound,
                    "taskNumber": workflow.getTaskIndex(),
                    "actionNumber": workflow.getActionIndex()
                })

        # Label used by later docList: references; an explicit resultLabel wins.
        if actionResult and actionResult.resultLabel:
            documentsLabel = actionResult.resultLabel
        else:
            documentsLabel = f"task_{taskResult.taskId}_results"

        taskStep = getattr(context, 'taskStep', None) if context else None
        userMessage = "Task completed successfully"
        if taskStep and getattr(taskStep, 'userMessage', None):
            userMessage = taskStep.userMessage
        elif taskStep and hasattr(taskStep, 'objective'):
            userMessage = f"Completed: {taskStep.objective}"

        progressStatus = "success" if actionResult and actionResult.success else "fail"
        messageData = {
            "workflowId": workflow.id,
            "role": "assistant",
            "message": userMessage,
            "status": "step",
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": self.services.utils.timestampGetUtc(),
            "documentsLabel": documentsLabel,
            "documents": [],
            # Workflow context fields
            "roundNumber": workflow.currentRound,
            "taskNumber": workflow.getTaskIndex(),
            "actionNumber": workflow.getActionIndex(),
            # Progress status
            "taskProgress": progressStatus,
            "actionProgress": progressStatus
        }
        chatMessage = self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments)
        logger.info(f"Persisted task result for task {taskResult.taskId}: {len(chatDocuments)} documents")
        return chatMessage
    except Exception as e:
        logger.error(f"Error in persistTaskResult: {str(e)}")
        raise