From 1e8bb8c0c67a03da9cccdadd786737b9af05c98b Mon Sep 17 00:00:00 2001 From: Patrick Motsch <141018425+patrick-motsch@users.noreply.github.com> Date: Sun, 11 Jan 2026 15:00:05 +0100 Subject: [PATCH] Delete modules/workflows/workflowManager.py Was old moodule --- modules/workflows/workflowManager.py | 1239 -------------------------- 1 file changed, 1239 deletions(-) delete mode 100644 modules/workflows/workflowManager.py diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py deleted file mode 100644 index 626bd2cf..00000000 --- a/modules/workflows/workflowManager.py +++ /dev/null @@ -1,1239 +0,0 @@ -from typing import Dict, Any, List, Optional -import logging -import uuid -import asyncio -import json - -from modules.datamodels.datamodelChat import ( - UserInputRequest, - ChatMessage, - ChatWorkflow, - ChatDocument, - WorkflowModeEnum -) -from modules.datamodels.datamodelChat import TaskContext -from modules.workflows.processing.workflowProcessor import WorkflowProcessor -from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped - - -logger = logging.getLogger(__name__) - -class WorkflowManager: - """Manager for workflow processing and coordination""" - - def __init__(self, services): - self.services = services - self.workflowProcessor = None - - # Exported functions - - async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow: - """Starts a new workflow or continues an existing one, then launches processing.""" - try: - # Debug log to check workflowMode parameter - logger.info(f"WorkflowManager received workflowMode: {workflowMode}") - currentTime = self.services.utils.timestampGetUtc() - - if workflowId: - workflow = self.services.chat.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Workflow {workflowId} not found") - - # Store workflow in services for reference (this is the ChatWorkflow object) - self.services.workflow = workflow - - # CRITICAL: Update all method instances to use the current Services object with the correct workflow - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - discoverMethods(self.services) - logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") - - if workflow.status == "running": - logger.info(f"Stopping running workflow {workflowId} before processing new prompt") - workflow.status = "stopped" - workflow.lastActivity = currentTime - self.services.chat.updateWorkflow(workflowId, { - "status": "stopped", - "lastActivity": currentTime - }) - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped for new prompt", - "type": "info", - "status": "stopped", - "progress": 1.0 - }) - - newRound = workflow.currentRound + 1 - self.services.chat.updateWorkflow(workflowId, { - "status": "running", - "lastActivity": currentTime, - "currentRound": newRound, - "workflowMode": workflowMode # Update workflow mode for existing workflows - }) - - # Reflect updates on the in-memory object without reloading - workflow.status = "running" - workflow.lastActivity = currentTime - workflow.currentRound = newRound - workflow.workflowMode = workflowMode - - self.services.chat.storeLog(workflow, { - "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}", - "type": "info", - "status": "running", - "progress": 0 - }) - - else: - workflowData = { - "name": "New Workflow", - "status": "running", - "startedAt": currentTime, - "lastActivity": currentTime, - "currentRound": 1, - "currentTask": 0, - "currentAction": 0, - "totalTasks": 0, - "totalActions": 0, - "mandateId": self.services.user.mandateId, - "messageIds": [], - "workflowMode": workflowMode, - "maxSteps": 10 , # Set maxSteps - } - - workflow = self.services.chat.createWorkflow(workflowData) - logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") - logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") - - # Store workflow in services (this is the ChatWorkflow object) - self.services.workflow = workflow - - # CRITICAL: Update all method instances to use the current Services object with the correct workflow - # This ensures cached method instances don't use stale workflow IDs from previous workflows - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - discoverMethods(self.services) - logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") - - # Start workflow processing asynchronously - asyncio.create_task(self._workflowProcess(userInput)) - - return workflow - except Exception as e: - logger.error(f"Error starting workflow: {str(e)}") - raise - - async def workflowStop(self, workflowId: str) -> ChatWorkflow: - """Stops a running workflow.""" - try: - workflow = self.services.chat.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Workflow {workflowId} not found") - - # Store workflow in services (this is the ChatWorkflow object) - self.services.workflow = workflow - - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflowId, { - "status": "stopped", - "lastActivity": workflow.lastActivity - }) - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped", - "type": "warning", - "status": "stopped", - "progress": 1.0 - }) - return workflow - except Exception as e: - logger.error(f"Error stopping workflow: {str(e)}") - raise - - # Main processor - - async def _workflowProcess(self, userInput: UserInputRequest) -> None: - """Process a workflow with user input""" - try: - # Store the current user prompt in services for easy access throughout the workflow - self.services.rawUserPrompt = userInput.prompt - self.services.currentUserPrompt = userInput.prompt - - # Reset progress logger for new workflow - self.services.chat._progressLogger = None - - # Reset workflow history flag at start of each workflow - setattr(self.services, '_needsWorkflowHistory', False) - - self.workflowProcessor = WorkflowProcessor(self.services) - - # Get workflow mode to determine if complexity detection is needed - workflowMode = getattr(self.services.workflow, 'workflowMode', None) - skipComplexityDetection = (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION) - - if skipComplexityDetection: - logger.info("Skipping complexity detection for AUTOMATION mode - using predefined plan") - complexity = "moderate" # Default for automation workflows - needsWorkflowHistory = False # Automation workflows don't need history - detectedLanguage = None # No language detection in automation mode - else: - # Process user-uploaded documents from userInput for complexity detection - # This is the correct way: use the input data directly, not workflow state - documents = [] - if userInput.listFileId: - try: - documents = await self._processFileIds(userInput.listFileId, None) - except Exception as e: - logger.warning(f"Failed to process user fileIds for complexity detection: {e}") - - # Detect complexity (AI-based semantic understanding) using user input documents - # Also detects language for fast path responses - complexity, needsWorkflowHistory, detectedLanguage = await self.workflowProcessor.detectComplexity(userInput.prompt, documents) - logger.info(f"Request complexity detected: {complexity}, needsWorkflowHistory: {needsWorkflowHistory}, language: {detectedLanguage}") - - # Set detected language for fast path (if detected) - if detectedLanguage and isinstance(detectedLanguage, str): - self._setUserLanguage(detectedLanguage) - try: - setattr(self.services, 'currentUserLanguage', detectedLanguage) - except Exception: - pass - - # Route to fast path for simple requests if history is not needed - # Skip fast path for automation mode or if history is needed - if complexity == "simple" and not needsWorkflowHistory: - logger.info("Routing to fast path for simple request") - await self._executeFastPath(userInput, documents) - return # Fast path completes the workflow - - # Now send the first message (which will also process the documents again, but that's fine) - await self._sendFirstMessage(userInput) - - # Route to full workflow for moderate/complex requests or automation mode - logger.info(f"Routing to full workflow for {complexity} request" + (" (automation mode)" if skipComplexityDetection else "")) - taskPlan = await self._planTasks(userInput) - await self._executeTasks(taskPlan) - await self._processWorkflowResults() - - except WorkflowStoppedException: - self._handleWorkflowStop() - - except Exception as e: - self._handleWorkflowError(e) - - # Helper functions - - async def _executeFastPath(self, userInput: UserInputRequest, documents: List[ChatDocument]) -> None: - """Execute fast path for simple requests and deliver result to user""" - try: - workflow = self.services.workflow - checkWorkflowStopped(self.services) - - # Log fast path start - self.services.chat.storeLog(workflow, { - "message": "Fast path execution started", - "type": "info", - "status": "running", - "progress": 0.1 - }) - - # Get user language if available - userLanguage = getattr(self.services, 'currentUserLanguage', None) - - # Execute fast path - use normalizedRequest if available, otherwise use raw prompt - normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt - - # Log fast path execution - self.services.chat.storeLog(workflow, { - "message": f"Processing request via fast path (language: {userLanguage or 'auto'})", - "type": "info", - "status": "running", - "progress": 0.3 - }) - - result = await self.workflowProcessor.fastPathExecute( - prompt=normalizedPrompt, - documents=documents, - userLanguage=userLanguage - ) - - if not result.success: - # Fast path failed, fall back to full workflow - logger.warning(f"Fast path failed: {result.error}, falling back to full workflow") - self.services.chat.storeLog(workflow, { - "message": f"Fast path failed: {result.error}. Falling back to full workflow.", - "type": "warning", - "status": "running", - "progress": 0.5 - }) - taskPlan = await self._planTasks(userInput) - await self._executeTasks(taskPlan) - await self._processWorkflowResults() - return - - # Extract response text from ActionResult - responseText = "" - chatDocuments = [] - - if result.documents and len(result.documents) > 0: - # Get response text from first document - firstDoc = result.documents[0] - if hasattr(firstDoc, 'documentData'): - docData = firstDoc.documentData - if isinstance(docData, bytes): - responseText = docData.decode('utf-8') - else: - responseText = str(docData) - - # Convert ActionDocuments to ChatDocuments for persistence - for actionDoc in result.documents: - if hasattr(actionDoc, 'documentData') and actionDoc.documentData: - # Create file in component storage - fileItem = self.services.interfaceDbComponent.createFile( - name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else "fast_path_response.txt", - mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain", - content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8')) - - # Get file info - fileInfo = self.services.chat.getFileInfo(fileItem.id) - - # Create ChatDocument as dict (messageId will be assigned by createMessage) - # Don't create ChatDocument object directly - it requires messageId which doesn't exist yet - chatDoc = { - "fileId": fileItem.id, - "fileName": fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName, - "fileSize": fileInfo.get("size", len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))), - "mimeType": fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType, - "roundNumber": workflow.currentRound, - "taskNumber": 0, # Fast path doesn't have tasks - "actionNumber": 0 - } - chatDocuments.append(chatDoc) - - # Create initial user message first - roundNum = workflow.currentRound - contextLabel = f"round{roundNum}_usercontext" - - userMessageData = { - "workflowId": workflow.id, - "role": "user", - "message": userInput.prompt, - "status": "first", - "sequenceNr": 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": contextLabel, - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "pending", - "actionProgress": "pending" - } - - # Store user message (with any uploaded documents) - # Convert ChatDocument objects to dicts for storeMessageWithDocuments - userDocuments = [] - for doc in documents: - if isinstance(doc, dict): - userDoc = dict(doc) - else: - # ChatDocument object - convert to dict - userDoc = { - "fileId": doc.fileId, - "fileName": doc.fileName, - "fileSize": doc.fileSize, - "mimeType": doc.mimeType, - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0 - } - userDocuments.append(userDoc) - - self.services.chat.storeMessageWithDocuments(workflow, userMessageData, userDocuments) - - # Log user message stored - self.services.chat.storeLog(workflow, { - "message": "User message stored", - "type": "info", - "status": "running", - "progress": 0.6 - }) - - # Mark workflow as completed BEFORE storing response message (so UI polling stops) - workflow.status = "completed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity - }) - - # Log response generation - self.services.chat.storeLog(workflow, { - "message": "Generating fast path response", - "type": "info", - "status": "running", - "progress": 0.8 - }) - - # Create ChatMessage with fast path response (in user's language) - messageData = { - "workflowId": workflow.id, - "role": "assistant", - "message": responseText or "Fast path response completed", - "status": "last", # Fast path completes the workflow - UI polling stops on this - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "fast_path_response", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, # Fast path doesn't have tasks - "actionNumber": 0, - # Add progress status - "taskProgress": "success", - "actionProgress": "success" - } - - # Store message with documents - self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments) - - # Log fast path completion - self.services.chat.storeLog(workflow, { - "message": f"Fast path completed successfully (response length: {len(responseText)} chars)", - "type": "info", - "status": "completed", - "progress": 1.0 - }) - - logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars") - - except Exception as e: - logger.error(f"Error in _executeFastPath: {str(e)}") - # Log error - self.services.chat.storeLog(workflow, { - "message": f"Fast path error: {str(e)}. Falling back to full workflow.", - "type": "error", - "status": "running", - "progress": 0.5 - }) - # Fall back to full workflow on error - logger.info("Falling back to full workflow due to fast path error") - taskPlan = await self._planTasks(userInput) - await self._executeTasks(taskPlan) - await self._processWorkflowResults() - - async def _sendFirstMessage(self, userInput: UserInputRequest) -> None: - """Send first message to start workflow""" - try: - workflow = self.services.workflow - checkWorkflowStopped(self.services) - - # Create initial message using interface - # For first user message, include round info in the user context label - roundNum = workflow.currentRound - contextLabel = f"round{roundNum}_usercontext" - - messageData = { - "workflowId": workflow.id, - "role": "user", - "message": userInput.prompt, - "status": "first", - "sequenceNr": 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": contextLabel, - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "pending", - "actionProgress": "pending" - } - - # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents - # SKIP user intention analysis for AUTOMATION mode - it uses predefined JSON plans - createdDocs = [] - workflowMode = getattr(workflow, 'workflowMode', None) - skipIntentionAnalysis = (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION) - - if skipIntentionAnalysis: - logger.info("Skipping user intention analysis for AUTOMATION mode - using direct user input") - # For automation mode, use user input directly without AI analysis - self.services.currentUserPrompt = userInput.prompt - # Always set currentUserPromptNormalized - use user input directly for automation mode - self.services.currentUserPromptNormalized = userInput.prompt - detectedLanguage = None - normalizedRequest = None - intentText = userInput.prompt - contextItems = [] - else: - try: - analyzerPrompt = ( - "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n" - "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n" - "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n" - "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n" - "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n" - "5) primaryGoal: The main objective the user wants to achieve.\n" - "6) dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown).\n" - "7) expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., [\"xlsx\", \"pdf\"]). If format is unclear or not specified, use empty list [].\n" - "8) qualityRequirements: Quality requirements they have (accuracy, completeness) as {accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}.\n" - "9) successCriteria: Specific success criteria that define completion (array of strings).\n" - "10) needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history to be understood or completed (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work). Return true if the request is a continuation, retry, modification, or builds upon previous work.\n\n" - "Rules:\n" - "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n" - "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n" - "- Preserve critical references (URLs, filenames) in intent.\n" - "- Normalize to the primary detected language if mixed-language.\n\n" - "Return ONLY JSON (no markdown) with this shape:\n" - "{\n" - " \"detectedLanguage\": \"de|en|fr|it|...\",\n" - " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n" - " \"intent\": \"Concise normalized request...\",\n" - " \"contextItems\": [\n" - " {\n" - " \"title\": \"User context 1\",\n" - " \"mimeType\": \"text/plain\",\n" - " \"content\": \"Full extracted content block here\"\n" - " }\n" - " ],\n" - " \"primaryGoal\": \"The main objective the user wants to achieve\",\n" - " \"dataType\": \"numbers|text|documents|analysis|code|unknown\",\n" - " \"expectedFormats\": [\"pdf\", \"docx\", \"xlsx\", \"txt\", \"json\", \"csv\", \"html\", \"md\"],\n" - " \"qualityRequirements\": {\n" - " \"accuracyThreshold\": 0.0-1.0,\n" - " \"completenessThreshold\": 0.0-1.0\n" - " },\n" - " \"successCriteria\": [\"specific criterion 1\", \"specific criterion 2\"],\n" - " \"needsWorkflowHistory\": true|false\n" - "}\n\n" - "## User Message\n" - "The following is the user's original input message. Extract intent, normalize the request, and identify any large context blocks that should be moved to separate documents:\n\n" - "################ USER INPUT START #################\n" - f"{userInput.prompt.replace('{', '{{').replace('}', '}}') if userInput.prompt else ''}\n" - "################ USER INPUT FINISH #################" - ) - - # Call AI analyzer (planning call - will use static parameters) - aiResponse = await self.services.ai.callAiPlanning( - prompt=analyzerPrompt, - placeholders=None, - debugType="userintention" - ) - - detectedLanguage = None - normalizedRequest = None - intentText = userInput.prompt - contextItems = [] - workflowIntent = None - - # Parse analyzer response (JSON expected) - try: - jsonStart = aiResponse.find('{') if aiResponse else -1 - jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0 - if jsonStart != -1 and jsonEnd > jsonStart: - parsed = json.loads(aiResponse[jsonStart:jsonEnd]) - detectedLanguage = parsed.get('detectedLanguage') or None - normalizedRequest = parsed.get('normalizedRequest') or None - if parsed.get('intent'): - intentText = parsed.get('intent') - contextItems = parsed.get('contextItems') or [] - - # Extract intent analysis fields and store as workflowIntent - workflowIntent = { - 'primaryGoal': parsed.get('primaryGoal'), - 'dataType': parsed.get('dataType', 'unknown'), - 'expectedFormats': parsed.get('expectedFormats', []), - 'qualityRequirements': parsed.get('qualityRequirements', {}), - 'successCriteria': parsed.get('successCriteria', []), - 'languageUserDetected': detectedLanguage, - 'needsWorkflowHistory': parsed.get('needsWorkflowHistory', False) - } - - # Store needsWorkflowHistory in services for fast path decision - needsHistoryFromIntention = parsed.get('needsWorkflowHistory', False) - # Always set the value - default to False if not a boolean - setattr(self.services, '_needsWorkflowHistory', bool(needsHistoryFromIntention) if isinstance(needsHistoryFromIntention, bool) else False) - - # Store workflowIntent in workflow object for reuse - if hasattr(self.services, 'workflow') and self.services.workflow: - self.services.workflow._workflowIntent = workflowIntent - except Exception: - contextItems = [] - workflowIntent = None - # Ensure needsWorkflowHistory is False if parsing fails - setattr(self.services, '_needsWorkflowHistory', False) - - # Update services state - if detectedLanguage and isinstance(detectedLanguage, str): - self._setUserLanguage(detectedLanguage) - try: - setattr(self.services, 'currentUserLanguage', detectedLanguage) - except Exception: - pass - self.services.currentUserPrompt = intentText or userInput.prompt - # Always set currentUserPromptNormalized - use normalizedRequest if available, otherwise fallback to currentUserPrompt - normalizedValue = normalizedRequest or intentText or userInput.prompt - self.services.currentUserPromptNormalized = normalizedValue - if contextItems is not None: - self.services.currentUserContextItems = contextItems - - # Create documents for context items - if contextItems and isinstance(contextItems, list): - for idx, item in enumerate(contextItems): - try: - title = item.get('title') if isinstance(item, dict) else None - mime = item.get('mimeType') if isinstance(item, dict) else None - content = item.get('content') if isinstance(item, dict) else None - if not content: - continue - fileName = (title or f"user_context_{idx+1}.txt").strip() - mimeType = (mime or "text/plain").strip() - - # Neutralize content before storing if neutralization is enabled - contentBytes = content.encode('utf-8') - contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType) - - # Create file in component storage - fileItem = self.services.interfaceDbComponent.createFile( - name=fileName, - mimeType=mimeType, - content=contentBytes - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes) - - # Collect file info - fileInfo = self.services.chat.getFileInfo(fileItem.id) - from modules.datamodels.datamodelChat import ChatDocument - doc = ChatDocument( - fileId=fileItem.id, - fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName, - fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes), - mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType - ) - createdDocs.append(doc) - except Exception: - continue - except Exception as e: - logger.warning(f"Prompt analysis failed or skipped: {str(e)}") - - # Process user-uploaded documents (fileIds) and combine with context documents - if userInput.listFileId: - try: - userDocs = await self._processFileIds(userInput.listFileId, None) - if userDocs: - createdDocs.extend(userDocs) - except Exception as e: - logger.warning(f"Failed to process user fileIds: {e}") - - # Finally, persist and bind the first message with combined documents (context + user) - self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs) - - except Exception as e: - logger.error(f"Error sending first message: {str(e)}") - raise - - async def _planTasks(self, userInput: UserInputRequest): - """Generate task plan for workflow execution""" - workflow = self.services.workflow - handling = self.workflowProcessor - # Generate task plan first (shared for both modes) - # Use normalizedRequest instead of raw userInput.prompt for security - normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt - taskPlan = await handling.generateTaskPlan(normalizedPrompt, workflow) - if not taskPlan or not taskPlan.tasks: - raise Exception("No tasks generated in task plan.") - workflowMode = getattr(workflow, 'workflowMode') - logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}") - logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks") - return taskPlan - - async def _executeTasks(self, taskPlan) -> None: - """Execute all tasks in the task plan and update workflow status.""" - workflow = self.services.workflow - handling = self.workflowProcessor - totalTasks = len(taskPlan.tasks) - allTaskResults: List = [] - previousResults: List[str] = [] - - for idx, taskStep in enumerate(taskPlan.tasks): - currentTaskIndex = idx + 1 - logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}") - - # Update workflow state before executing task (fixes "Task 0" issue) - handling.updateWorkflowBeforeExecutingTask(currentTaskIndex) - - # Build TaskContext (mode-specific behavior is inside WorkflowProcessor) - taskContext = TaskContext( - taskStep=taskStep, - workflow=workflow, - workflowId=workflow.id, - availableDocuments=None, - availableConnections=None, - previousResults=previousResults, - previousHandover=None, - improvements=[], - retryCount=0, - previousActionResults=[], - previousReviewResult=None, - isRegeneration=False, - failurePatterns=[], - failedActions=[], - successfulActions=[], - criteriaProgress={ - 'met_criteria': set(), - 'unmet_criteria': set(), - 'attempt_history': [] - } - ) - - taskResult = await handling.executeTask(taskStep, workflow, taskContext) - - # Persist task result for cross-task/round document references - # Convert ChatTaskResult to WorkflowTaskResult for persistence - from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult - from modules.datamodels.datamodelChat import ActionResult - - # Get final ActionResult from task execution (last action result) - finalActionResult = None - if hasattr(taskResult, 'actionResult'): - finalActionResult = taskResult.actionResult - elif taskContext.previousActionResults and len(taskContext.previousActionResults) > 0: - # Use last action result from context - finalActionResult = taskContext.previousActionResults[-1] - - # Create WorkflowTaskResult for persistence - if finalActionResult: - workflowTaskResult = WorkflowTaskResult( - taskId=taskStep.id, - actionResult=finalActionResult - ) - # Persist task result (creates ChatMessage + ChatDocuments) - await handling.persistTaskResult(workflowTaskResult, workflow, taskContext) - - handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow) - allTaskResults.append({ - 'taskStep': taskStep, - 'taskResult': taskResult, - 'handoverData': handoverData - }) - if taskResult.success and taskResult.feedback: - previousResults.append(taskResult.feedback) - - # Mark workflow as completed; error/stop cases update status elsewhere - workflow.status = "completed" - return None - - async def _processWorkflowResults(self) -> None: - """Process workflow results based on workflow status and create appropriate messages""" - try: - workflow = self.services.workflow - try: - checkWorkflowStopped(self.services) - except WorkflowStoppedException: - logger.info(f"Workflow {workflow.id} was stopped during result processing") - - # Create final stopped message - stoppedMessage = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "stopped", - "actionProgress": "stopped" - } - self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, []) - - # Update workflow status to stopped - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "stopped", - "lastActivity": workflow.lastActivity - }) - return - - if workflow.status == 'stopped': - # Create stopped message - stopped_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "stopped", - "actionProgress": "stopped" - } - self.services.chat.storeMessageWithDocuments(workflow, stopped_message, []) - - # Update workflow status to stopped - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "stopped", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Add stopped log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped by user", - "type": "warning", - "status": "stopped", - "progress": 1.0 - }) - return - elif workflow.status == 'failed': - # Create error message - errorMessage = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Workflow failed: {'Unknown error'}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_failure", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - self.services.chat.storeMessageWithDocuments(workflow, errorMessage, []) - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Add failed log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow failed: Unknown error", - "type": "error", - "status": "failed", - "progress": 1.0 - }) - return - - # For successful workflows, send detailed completion message - await self._sendLastMessage() - - except Exception as e: - logger.error(f"Error processing workflow results: {str(e)}") - # Create error message - error_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Error processing workflow results: {str(e)}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_error", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - self.services.chat.storeMessageWithDocuments(workflow, error_message, []) - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - async def _sendLastMessage(self) -> None: - """Send last message to complete workflow (only for successful workflows)""" - try: - workflow = self.services.workflow - # Safety check: ensure this is only called for successful workflows - if workflow.status in ['stopped', 'failed']: - logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") - return - - # Generate feedback - feedback = await self._generateWorkflowFeedback() - - # Create last message using interface - messageData = { - "workflowId": workflow.id, - "role": "assistant", - "message": feedback, - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_feedback", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "success", - "actionProgress": "success" - } - - # Create message using interface - self.services.chat.storeMessageWithDocuments(workflow, messageData, []) - - # Update workflow status to completed - workflow.status = "completed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - - # Update workflow in database - self.services.chat.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity - }) - - # Add completion log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow completed", - "type": "success", - "status": "completed", - "progress": 1.0 - }) - - except Exception as e: - logger.error(f"Error sending last message: {str(e)}") - raise - - async def _generateWorkflowFeedback(self) -> str: - """Generate feedback message for workflow completion""" - try: - workflow = self.services.workflow - checkWorkflowStopped(self.services) - - # Count messages by role - userMessages = [msg for msg in workflow.messages if msg.role == 'user'] - assistantMessages = [msg for msg in workflow.messages if msg.role == 'assistant'] - - # Generate summary feedback - feedback = f"Workflow completed.\n\n" - feedback += f"Processed {len(userMessages)} user inputs and generated {len(assistantMessages)} responses.\n" - - # Add final status - if workflow.status == "completed": - feedback += "All tasks completed successfully." - elif workflow.status == "partial": - feedback += "Some tasks completed with partial success." - else: - feedback += f"Workflow status: {workflow.status}" - - return feedback - - except Exception as e: - logger.error(f"Error generating workflow feedback: {str(e)}") - return "Workflow processing completed." - - def _handleWorkflowStop(self) -> None: - """Handle workflow stop exception""" - workflow = self.services.workflow - logger.info("Workflow stopped by user") - - # Update workflow status to stopped - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "stopped", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Create final stopped message - stopped_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "pending", - "actionProgress": "pending" - } - self.services.chat.storeMessageWithDocuments(workflow, stopped_message, []) - - # Add log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped by user", - "type": "warning", - "status": "stopped", - "progress": 1.0 - }) - - def _handleWorkflowError(self, error: Exception) -> None: - """Handle workflow error exception""" - workflow = self.services.workflow - logger.error(f"Workflow processing error: {str(error)}") - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Create error message - error_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Workflow processing failed: {str(error)}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_error", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - self.services.chat.storeMessageWithDocuments(workflow, error_message, []) - - # Add error log entry - self.services.chat.storeLog(workflow, { - "message": f"Workflow failed: {str(error)}", - "type": "error", - "status": "failed", - "progress": 1.0 - }) - - raise - - async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]: - """Process file IDs from existing files and return ChatDocument objects. - If neutralization is enabled, files are neutralized and new files are created with neutralized content. - If neutralization fails, the document is not included and an error is logged to ChatLog.""" - documents = [] - - # Check if neutralization is enabled - neutralizationEnabled = False - try: - config = self.services.neutralization.getConfig() - neutralizationEnabled = config and config.enabled - except Exception as e: - logger.debug(f"Could not check neutralization config: {str(e)}") - - workflow = self.services.workflow - - for fileId in fileIds: - try: - # Get file info from chat service - fileInfo = self.services.chat.getFileInfo(fileId) - if not fileInfo: - logger.warning(f"No file info found for file ID {fileId}") - continue - - originalFileName = fileInfo.get("fileName", "unknown") - originalMimeType = fileInfo.get("mimeType", "application/octet-stream") - fileIdToUse = fileId - fileNameToUse = originalFileName - fileSizeToUse = fileInfo.get("size", 0) - neutralizationFailed = False - - # Neutralize file if enabled - if neutralizationEnabled: - try: - # Neutralize the file using the neutralization service - neutralizationResult = self.services.neutralization.processFile(fileId) - - # Check if file is binary (not neutralized) - if neutralizationResult.get('is_binary', False): - # Binary file - log INFO and use original file - infoMsg = f"File '{originalFileName}' (MIME type: {neutralizationResult.get('mime_type', 'unknown')}) is a binary file. Binary file neutralization will be implemented in the future. Using original file without neutralization." - logger.info(infoMsg) - self.services.chat.storeLog(workflow, { - "message": infoMsg, - "type": "info", - "status": "running", - "progress": 50 - }) - # Use original file (fileIdToUse already set to fileId) - elif neutralizationResult and 'neutralized_text' in neutralizationResult: - neutralizedText = neutralizationResult['neutralized_text'] - - # Create new file with neutralized content - neutralizedFileName = neutralizationResult.get('neutralized_file_name', f"neutralized_{originalFileName}") - neutralizedContentBytes = neutralizedText.encode('utf-8') - - # Create file in component storage - neutralizedFileItem = self.services.interfaceDbComponent.createFile( - name=neutralizedFileName, - mimeType=originalMimeType, - content=neutralizedContentBytes - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(neutralizedFileItem.id, neutralizedContentBytes) - - # Use the neutralized file ID and actual size - fileIdToUse = neutralizedFileItem.id - fileNameToUse = neutralizedFileName - fileSizeToUse = len(neutralizedContentBytes) - - logger.info(f"Neutralized file {fileId} -> {fileIdToUse} ({fileNameToUse})") - else: - neutralizationFailed = True - errorMsg = f"Neutralization did not return neutralized_text for file '{originalFileName}' (ID: {fileId})" - logger.warning(errorMsg) - self.services.chat.storeLog(workflow, { - "message": errorMsg, - "type": "error", - "status": "error", - "progress": -1 - }) - except Exception as e: - neutralizationFailed = True - errorMsg = f"Failed to neutralize file '{originalFileName}' (ID: {fileId}): {str(e)}" - logger.error(errorMsg) - self.services.chat.storeLog(workflow, { - "message": errorMsg, - "type": "error", - "status": "error", - "progress": -1 - }) - - # Only skip document if neutralization failed (not for binary files) - if not neutralizationFailed: - # Create document with file ID (neutralized or original) - document = ChatDocument( - id=str(uuid.uuid4()), - messageId=messageId or "", - fileId=fileIdToUse, - fileName=fileNameToUse, - fileSize=fileSizeToUse, - mimeType=originalMimeType - ) - documents.append(document) - logger.info(f"Processed file ID {fileId} -> {document.fileName} (using fileId: {fileIdToUse})") - else: - logger.warning(f"Skipping document for file ID {fileId} due to neutralization failure") - except Exception as e: - errorMsg = f"Error processing file ID {fileId}: {str(e)}" - logger.error(errorMsg) - self.services.chat.storeLog(workflow, { - "message": errorMsg, - "type": "error", - "status": "error", - "progress": -1 - }) - return documents - - - def _setUserLanguage(self, language: str) -> None: - """Set user language for the service center""" - self.services.user.language = language - - async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes: - """Neutralize content if neutralization is enabled in user settings""" - try: - # Check if neutralization is enabled - config = self.services.neutralization.getConfig() - if not config or not config.enabled: - return contentBytes - - # Decode content to text for neutralization - try: - textContent = contentBytes.decode('utf-8') - except UnicodeDecodeError: - # Try alternative encodings - for enc in ['latin-1', 'cp1252', 'iso-8859-1']: - try: - textContent = contentBytes.decode(enc) - break - except UnicodeDecodeError: - continue - else: - # If unable to decode, return original bytes (binary content) - logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}") - return contentBytes - - # Neutralize the text content - # Note: The neutralization service should use names from config when processing - result = self.services.neutralization.processText(textContent) - if result and 'neutralized_text' in result: - neutralizedText = result['neutralized_text'] - # Encode back to bytes using the same encoding - try: - return neutralizedText.encode('utf-8') - except Exception as e: - logger.warning(f"Error encoding neutralized text: {str(e)}") - return contentBytes - else: - logger.warning("Neutralization did not return neutralized_text") - return contentBytes - except Exception as e: - logger.error(f"Error during content neutralization: {str(e)}") - # Return original content on error - return contentBytes - - def _checkIfHistoryAvailable(self) -> bool: - """Check if workflow history is available (previous rounds exist). - - Returns True if there are previous workflow rounds with messages. - """ - try: - from modules.workflows.processing.shared.placeholderFactory import getPreviousRoundContext - - history = getPreviousRoundContext(self.services) - - # Check if history contains actual content (not just "No previous round context available") - if history and history != "No previous round context available": - return True - - return False - except Exception as e: - logger.error(f"Error checking if history is available: {str(e)}") - return False