From 680fab6e0087177fd91a8599e20edfc9e3125146 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 11 Jan 2026 15:09:11 +0100
Subject: [PATCH] fixes in int
---
how --stat HEAD | 49 -
modules/workflows/workflowManager copy.py | 1385 +++++++++++++++++++++
2 files changed, 1385 insertions(+), 49 deletions(-)
delete mode 100644 how --stat HEAD
create mode 100644 modules/workflows/workflowManager copy.py
diff --git a/how --stat HEAD b/how --stat HEAD
deleted file mode 100644
index 52b1d7b3..00000000
--- a/how --stat HEAD
+++ /dev/null
@@ -1,49 +0,0 @@
-M app.py
-A modules/.$DEPENDENCY_DIAGRAM.drawio.bkp
-A modules/AUTOMATION_FEATURE_ANALYSIS.md
-A modules/BIDIRECTIONAL_IMPORTS.md
-A modules/DEPENDENCY_DIAGRAM.drawio
-A modules/FEATURES_TO_INTERFACES_IMPORTS.md
-M modules/connectors/connectorVoiceGoogle.py
-M modules/datamodels/datamodelChat.py
-M modules/datamodels/datamodelPagination.py
-A modules/features/automation/__init__.py
-A modules/features/automation/mainAutomation.py
-A modules/features/automation/subAutomationUtils.py
-D modules/features/chatAlthaus/COMPONENT_DIAGRAM.md
-M modules/features/featuresLifecycle.py
-M modules/interfaces/interfaceAiObjects.py
-M modules/interfaces/interfaceDbAppObjects.py
-M modules/interfaces/interfaceDbChatObjects.py
-M modules/interfaces/interfaceDbComponentObjects.py
-M modules/interfaces/interfaceVoiceObjects.py
-M modules/routes/routeAdminAutomationEvents.py
-M modules/routes/routeVoiceGoogle.py
-M modules/services/__init__.py
-M modules/services/serviceAi/mainServiceAi.py
-M modules/services/serviceAi/subJsonResponseHandling.py
-M modules/services/serviceChat/mainServiceChat.py
-M modules/services/serviceExtraction/mainServiceExtraction.py
-M modules/services/serviceExtraction/subPipeline.py
-M modules/services/serviceExtraction/subPromptBuilderExtraction.py
-M modules/services/serviceGeneration/renderers/rendererXlsx.py
-M modules/services/serviceGeneration/subPromptBuilderGeneration.py
-A modules/services/serviceSecurity/mainServiceSecurity.py
-M modules/services/serviceSharepoint/mainServiceSharepoint.py
-M modules/services/serviceUtils/mainServiceUtils.py
-A modules/shared/callbackRegistry.py
-M modules/shared/debugLogger.py
-M modules/shared/jsonUtils.py
-M modules/workflows/methods/methodAi.py
-M modules/workflows/methods/methodBase.py
-A modules/workflows/methods/methodContext.py
-M modules/workflows/methods/methodOutlook.py
-M modules/workflows/methods/methodSharepoint.py
-M modules/workflows/processing/adaptive/contentValidator.py
-M modules/workflows/processing/core/messageCreator.py
-M modules/workflows/processing/modes/modeAutomation.py
-M modules/workflows/processing/modes/modeDynamic.py
-M modules/workflows/processing/shared/promptGenerationActionsDynamic.py
-M modules/workflows/processing/shared/promptGenerationTaskplan.py
-M modules/workflows/processing/workflowProcessor.py
-M modules/workflows/workflowManager.py
diff --git a/modules/workflows/workflowManager copy.py b/modules/workflows/workflowManager copy.py
new file mode 100644
index 00000000..9806060a
--- /dev/null
+++ b/modules/workflows/workflowManager copy.py
@@ -0,0 +1,1385 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+from typing import Dict, Any, List, Optional
+import logging
+import uuid
+import asyncio
+import json
+
+from modules.datamodels.datamodelChat import (
+ UserInputRequest,
+ ChatMessage,
+ ChatWorkflow,
+ ChatDocument,
+ WorkflowModeEnum
+)
+from modules.datamodels.datamodelChat import TaskContext
+from modules.workflows.processing.workflowProcessor import WorkflowProcessor
+from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped
+
+
+logger = logging.getLogger(__name__)
+
+class WorkflowManager:
+ """Manager for workflow processing and coordination"""
+
+ def __init__(self, services):
+ self.services = services
+ self.workflowProcessor = None
+
+ # Exported functions
+
+ async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow:
+ """Starts a new workflow or continues an existing one, then launches processing."""
+ try:
+ # Debug log to check workflowMode parameter
+ logger.info(f"WorkflowManager received workflowMode: {workflowMode}")
+ currentTime = self.services.utils.timestampGetUtc()
+
+ if workflowId:
+ workflow = self.services.chat.getWorkflow(workflowId)
+ if not workflow:
+ raise ValueError(f"Workflow {workflowId} not found")
+
+ # Store workflow in services for reference (this is the ChatWorkflow object)
+ self.services.workflow = workflow
+
+ # CRITICAL: Update all method instances to use the current Services object with the correct workflow
+ from modules.workflows.processing.shared.methodDiscovery import discoverMethods
+ discoverMethods(self.services)
+ logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}")
+
+ if workflow.status == "running":
+ logger.info(f"Stopping running workflow {workflowId} before processing new prompt")
+ workflow.status = "stopped"
+ workflow.lastActivity = currentTime
+ self.services.chat.updateWorkflow(workflowId, {
+ "status": "stopped",
+ "lastActivity": currentTime
+ })
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow stopped for new prompt",
+ "type": "info",
+ "status": "stopped",
+ "progress": 1.0
+ })
+
+ newRound = workflow.currentRound + 1
+ self.services.chat.updateWorkflow(workflowId, {
+ "status": "running",
+ "lastActivity": currentTime,
+ "currentRound": newRound,
+ "workflowMode": workflowMode # Update workflow mode for existing workflows
+ })
+
+ # Reflect updates on the in-memory object without reloading
+ workflow.status = "running"
+ workflow.lastActivity = currentTime
+ workflow.currentRound = newRound
+ workflow.workflowMode = workflowMode
+
+ self.services.chat.storeLog(workflow, {
+ "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}",
+ "type": "info",
+ "status": "running",
+ "progress": 0
+ })
+
+ else:
+ workflowData = {
+ "name": "New Workflow",
+ "status": "running",
+ "startedAt": currentTime,
+ "lastActivity": currentTime,
+ "currentRound": 1,
+ "currentTask": 0,
+ "currentAction": 0,
+ "totalTasks": 0,
+ "totalActions": 0,
+ "mandateId": self.services.user.mandateId,
+ "messageIds": [],
+ "workflowMode": workflowMode,
+ "maxSteps": 10 , # Set maxSteps
+ }
+
+ workflow = self.services.chat.createWorkflow(workflowData)
+ logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}")
+ logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}")
+
+ # Store workflow in services (this is the ChatWorkflow object)
+ self.services.workflow = workflow
+
+ # CRITICAL: Update all method instances to use the current Services object with the correct workflow
+ # This ensures cached method instances don't use stale workflow IDs from previous workflows
+ from modules.workflows.processing.shared.methodDiscovery import discoverMethods
+ discoverMethods(self.services)
+ logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}")
+
+ # Start workflow processing asynchronously
+ asyncio.create_task(self._workflowProcess(userInput))
+
+ return workflow
+ except Exception as e:
+ logger.error(f"Error starting workflow: {str(e)}")
+ raise
+
+ async def workflowStop(self, workflowId: str) -> ChatWorkflow:
+ """Stops a running workflow."""
+ try:
+ workflow = self.services.chat.getWorkflow(workflowId)
+ if not workflow:
+ raise ValueError(f"Workflow {workflowId} not found")
+
+ # Store workflow in services (this is the ChatWorkflow object)
+ self.services.workflow = workflow
+
+ workflow.status = "stopped"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflowId, {
+ "status": "stopped",
+ "lastActivity": workflow.lastActivity
+ })
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow stopped",
+ "type": "warning",
+ "status": "stopped",
+ "progress": 1.0
+ })
+ return workflow
+ except Exception as e:
+ logger.error(f"Error stopping workflow: {str(e)}")
+ raise
+
+ # Main processor
+
+ async def _workflowProcess(self, userInput: UserInputRequest) -> None:
+ """Process a workflow with user input"""
+ try:
+ # Send ChatLog message immediately when workflow starts
+ workflow = self.services.workflow
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow started...",
+ "type": "info",
+ "status": "running",
+ "progress": 0.0
+ })
+
+ # Store the current user prompt in services for easy access throughout the workflow
+ self.services.rawUserPrompt = userInput.prompt
+ self.services.currentUserPrompt = userInput.prompt
+
+ # Reset progress logger for new workflow
+ self.services.chat._progressLogger = None
+
+ # Reset workflow history flag at start of each workflow
+ setattr(self.services, '_needsWorkflowHistory', False)
+
+ self.workflowProcessor = WorkflowProcessor(self.services)
+
+ # Get workflow mode to determine if combined analysis is needed
+ workflowMode = getattr(self.services.workflow, 'workflowMode', None)
+ skipCombinedAnalysis = (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION)
+
+ if skipCombinedAnalysis:
+ logger.info("Skipping combined analysis for AUTOMATION mode - using predefined plan")
+ complexity = "moderate" # Default for automation workflows
+ needsWorkflowHistory = False # Automation workflows don't need history
+ detectedLanguage = None # No language detection in automation mode
+ normalizedRequest = userInput.prompt
+ intentText = userInput.prompt
+ contextItems = []
+ workflowIntent = None
+ else:
+ # Process user-uploaded documents from userInput for combined analysis
+ documents = []
+ if userInput.listFileId:
+ try:
+ documents = await self._processFileIds(userInput.listFileId, None)
+ except Exception as e:
+ logger.warning(f"Failed to process user fileIds for combined analysis: {e}")
+
+ # Phase 1+2: Kombinierte Analyse: Intent + Komplexität in einem AI-Call
+ analysisResult = await self._analyzeUserInputAndComplexity(userInput.prompt, documents)
+
+ # Extract results
+ detectedLanguage = analysisResult.get('detectedLanguage')
+ normalizedRequest = analysisResult.get('normalizedRequest')
+ intentText = analysisResult.get('intent') or userInput.prompt
+ contextItems = analysisResult.get('contextItems', [])
+ complexity = analysisResult.get('complexity', 'moderate')
+ needsWorkflowHistory = analysisResult.get('needsWorkflowHistory', False)
+ fastTrack = analysisResult.get('fastTrack', False)
+
+ # Extract intent analysis fields and store as workflowIntent
+ workflowIntent = {
+ 'intent': intentText, # Use intent instead of primaryGoal
+ 'dataType': analysisResult.get('dataType', 'unknown'),
+ 'expectedFormats': analysisResult.get('expectedFormats', []),
+ 'qualityRequirements': analysisResult.get('qualityRequirements', {}),
+ 'successCriteria': analysisResult.get('successCriteria', []),
+ 'languageUserDetected': detectedLanguage,
+ 'needsWorkflowHistory': needsWorkflowHistory
+ }
+
+ # Store needsWorkflowHistory in services
+ setattr(self.services, '_needsWorkflowHistory', bool(needsWorkflowHistory))
+
+ # Store workflowIntent in workflow object for reuse
+ if hasattr(self.services, 'workflow') and self.services.workflow:
+ self.services.workflow._workflowIntent = workflowIntent
+
+ # Store normalized request and intent
+ # CRITICAL: normalizedRequest MUST be used if available, do NOT fall back to intent
+ self.services.currentUserPrompt = intentText or userInput.prompt
+ if normalizedRequest and normalizedRequest.strip():
+ # Use normalizedRequest if available and not empty
+ self.services.currentUserPromptNormalized = normalizedRequest
+ logger.info(f"Stored normalized request (length: {len(normalizedRequest)}, preview: {normalizedRequest[:100]}...)")
+ else:
+ # Fallback only if normalizedRequest is None or empty
+ logger.warning(f"normalizedRequest is None or empty, falling back to intentText. normalizedRequest={normalizedRequest}, intentText={intentText[:100] if intentText else None}...")
+ self.services.currentUserPromptNormalized = intentText or userInput.prompt
+ if contextItems is not None:
+ self.services.currentUserContextItems = contextItems
+
+ # Set detected language
+ if detectedLanguage and isinstance(detectedLanguage, str):
+ self._setUserLanguage(detectedLanguage)
+ try:
+ setattr(self.services, 'currentUserLanguage', detectedLanguage)
+ except Exception:
+ pass
+
+ logger.info(f"Combined analysis: complexity={complexity}, needsWorkflowHistory={needsWorkflowHistory}, language={detectedLanguage}, fastTrack={fastTrack}")
+
+ # Route to fast path for simple requests if history is not needed
+ # Skip fast path for automation mode or if history is needed
+ if not skipCombinedAnalysis and complexity == "simple" and not needsWorkflowHistory:
+ logger.info("Routing to fast path for simple request")
+ await self._executeFastPath(userInput, documents)
+ return # Fast path completes the workflow
+
+ # Now send the first message (use already analyzed data if available)
+ await self._sendFirstMessage(userInput, skipIntentionAnalysis=not skipCombinedAnalysis)
+
+ # Route to full workflow for moderate/complex requests or automation mode
+ logger.info(f"Routing to full workflow for {complexity} request" + (" (automation mode)" if skipCombinedAnalysis else ""))
+ taskPlan = await self._planTasks(userInput)
+ await self._executeTasks(taskPlan)
+ await self._processWorkflowResults()
+
+ except WorkflowStoppedException:
+ self._handleWorkflowStop()
+
+ except Exception as e:
+ self._handleWorkflowError(e)
+
+ # Helper functions
+
+ async def _analyzeUserInputAndComplexity(
+ self,
+ userPrompt: str,
+ documents: List[ChatDocument]
+ ) -> Dict[str, Any]:
+ """
+ Phase 1+2: Kombinierte Analyse: Intent + Komplexität in einem AI-Call.
+
+ Args:
+ userPrompt: User-Anfrage
+ documents: Liste der Dokumente
+
+ Returns:
+ Dict mit:
+ - detectedLanguage: ISO 639-1 Sprachcode
+ - normalizedRequest: Vollständige, explizite Umformulierung
+ - intent: Kurze Kern-Anfrage
+ - contextItems: Große Datenblöcke als separate Dokumente
+ - complexity: "simple" | "moderate" | "complex"
+ - needsWorkflowHistory: bool
+ - fastTrack: bool
+ - dataType: Datentyp
+ - expectedFormats: Erwartete Formate
+ - qualityRequirements: Qualitätsanforderungen
+ - successCriteria: Erfolgskriterien
+ """
+ # Baue Dokument-Liste für Prompt
+ docListText = ""
+ if documents:
+ for i, doc in enumerate(documents, 1):
+ docListText += f"\n{i}. {doc.fileName} ({doc.mimeType}, {doc.fileSize} bytes)"
+
+ analysisPrompt = f"""You are an input analyzer. From the user's message, perform ALL of the following in one pass:
+
+1. detectedLanguage: Detect ISO 639-1 language code (e.g., de, en, fr, it)
+2. normalizedRequest: Full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details
+3. intent: Concise single-paragraph core request in the detected language for high-level routing
+4. contextItems: Supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content
+5. complexity: "simple" | "moderate" | "complex"
+ - "simple": Only if NO documents AND NO web search required. Single question, straightforward answer (5-15s)
+ - "moderate": Multiple steps, some documents, structured response requiring some processing, or web search needed (30-60s)
+ - "complex": Multi-task workflow, many documents, research needed, content generation required, multi-step planning (60-120s)
+6. needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work)
+7. fastTrack: Boolean indicating if Fast Track is possible (simple requests without documents and without workflow history)
+8. dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown)
+9. expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., ["xlsx", "pdf"]). If format is unclear or not specified, use empty list []
+10. qualityRequirements: Quality requirements they have (accuracy, completeness) as {{accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}}
+11. successCriteria: Specific success criteria that define completion (array of strings)
+
+Rules:
+- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained
+- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear
+- Preserve critical references (URLs, filenames) in intent
+- Normalize to the primary detected language if mixed-language
+- Consider number of documents provided when determining complexity
+- Consider need for external research or web search when determining complexity
+
+Documents provided: {len(documents)} document(s)
+{docListText}
+
+Return ONLY JSON (no markdown) with this exact structure:
+{{
+ "detectedLanguage": "de|en|fr|it|...",
+ "normalizedRequest": "Full explicit instruction in detected language",
+ "intent": "Concise normalized request...",
+ "contextItems": [
+ {{
+ "title": "User context 1",
+ "mimeType": "text/plain",
+ "content": "Full extracted content block here"
+ }}
+ ],
+ "complexity": "simple" | "moderate" | "complex",
+ "needsWorkflowHistory": true|false,
+ "fastTrack": true|false,
+ "dataType": "numbers|text|documents|analysis|code|unknown",
+ "expectedFormats": ["pdf", "docx", "xlsx", "txt", "json", "csv", "html", "md"],
+ "qualityRequirements": {{
+ "accuracyThreshold": 0.0-1.0,
+ "completenessThreshold": 0.0-1.0
+ }},
+ "successCriteria": ["specific criterion 1", "specific criterion 2"]
+}}
+
+## User Message
+The following is the user's original input message. Analyze intent, normalize the request, determine complexity, and identify any large context blocks that should be moved to separate documents:
+
+################ USER INPUT START #################
+{userPrompt.replace('{', '{{').replace('}', '}}') if userPrompt else ''}
+################ USER INPUT FINISH #################
+"""
+
+ # AI-Call (verwende callAiPlanning für einfache JSON-Responses)
+ # Debug-Logs werden bereits von callAiPlanning geschrieben
+ aiResponse = await self.services.ai.callAiPlanning(
+ prompt=analysisPrompt,
+ placeholders=None,
+ debugType="user_input_analysis"
+ )
+
+ # Parse Result
+ try:
+ jsonStart = aiResponse.find('{') if aiResponse else -1
+ jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
+ if jsonStart != -1 and jsonEnd > jsonStart:
+ result = json.loads(aiResponse[jsonStart:jsonEnd])
+ return result
+ else:
+ logger.warning("Could not parse combined analysis response, using defaults")
+ return self._getDefaultAnalysisResult()
+ except Exception as e:
+ logger.warning(f"Error parsing combined analysis response: {str(e)}, using defaults")
+ return self._getDefaultAnalysisResult()
+
+ def _getDefaultAnalysisResult(self) -> Dict[str, Any]:
+ """Fallback Default-Werte wenn Parsing fehlschlägt."""
+ return {
+ "detectedLanguage": "en",
+ "normalizedRequest": "",
+ "intent": "",
+ "contextItems": [],
+ "complexity": "moderate",
+ "needsWorkflowHistory": False,
+ "fastTrack": False,
+ "dataType": "unknown",
+ "expectedFormats": [],
+ "qualityRequirements": {
+ "accuracyThreshold": 0.8,
+ "completenessThreshold": 0.8
+ },
+ "successCriteria": []
+ }
+
+ async def _executeFastPath(self, userInput: UserInputRequest, documents: List[ChatDocument]) -> None:
+ """Execute fast path for simple requests and deliver result to user"""
+ try:
+ workflow = self.services.workflow
+ checkWorkflowStopped(self.services)
+
+ # Get user language if available
+ userLanguage = getattr(self.services, 'currentUserLanguage', None)
+
+ # Execute fast path - use normalizedRequest if available, otherwise use raw prompt
+ normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
+ result = await self.workflowProcessor.fastPathExecute(
+ prompt=normalizedPrompt,
+ documents=documents,
+ userLanguage=userLanguage
+ )
+
+ if not result.success:
+ # Fast path failed, fall back to full workflow
+ logger.warning(f"Fast path failed: {result.error}, falling back to full workflow")
+ taskPlan = await self._planTasks(userInput)
+ await self._executeTasks(taskPlan)
+ await self._processWorkflowResults()
+ return
+
+ # Extract response text from ActionResult
+ responseText = ""
+ chatDocuments = []
+
+ if result.documents and len(result.documents) > 0:
+ # Get response text from first document
+ firstDoc = result.documents[0]
+ if hasattr(firstDoc, 'documentData'):
+ docData = firstDoc.documentData
+ if isinstance(docData, bytes):
+ responseText = docData.decode('utf-8')
+ else:
+ responseText = str(docData)
+
+ # Convert ActionDocuments to ChatDocuments for persistence
+ for actionDoc in result.documents:
+ if hasattr(actionDoc, 'documentData') and actionDoc.documentData:
+ # Create file in component storage
+ fileItem = self.services.interfaceDbComponent.createFile(
+ name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else "fast_path_response.txt",
+ mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain",
+ content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8')
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8'))
+
+ # Get file info
+ fileInfo = self.services.chat.getFileInfo(fileItem.id)
+
+ # Create ChatDocument as dict (messageId will be assigned by createMessage)
+ # Don't create ChatDocument object directly - it requires messageId which doesn't exist yet
+ chatDoc = {
+ "fileId": fileItem.id,
+ "fileName": fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName,
+ "fileSize": fileInfo.get("size", len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))),
+ "mimeType": fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType,
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0, # Fast path doesn't have tasks
+ "actionNumber": 0
+ }
+ chatDocuments.append(chatDoc)
+
+ # Mark workflow as completed BEFORE storing message (so UI polling stops)
+ workflow.status = "completed"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "completed",
+ "lastActivity": workflow.lastActivity
+ })
+
+ # Create ChatMessage with fast path response (in user's language)
+ messageData = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": responseText or "Fast path response completed",
+ "status": "last", # Fast path completes the workflow - UI polling stops on this
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "fast_path_response",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0, # Fast path doesn't have tasks
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "success",
+ "actionProgress": "success"
+ }
+
+ # Store message with documents
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments)
+
+ logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars")
+
+ except Exception as e:
+ logger.error(f"Error in _executeFastPath: {str(e)}")
+ # Fall back to full workflow on error
+ logger.info("Falling back to full workflow due to fast path error")
+ taskPlan = await self._planTasks(userInput)
+ await self._executeTasks(taskPlan)
+ await self._processWorkflowResults()
+
+ async def _sendFirstMessage(self, userInput: UserInputRequest, skipIntentionAnalysis: bool = False) -> None:
+ """Send first message to start workflow"""
+ try:
+ workflow = self.services.workflow
+ checkWorkflowStopped(self.services)
+
+ # Create initial message using interface
+ # For first user message, include round info in the user context label
+ roundNum = workflow.currentRound
+ contextLabel = f"round{roundNum}_usercontext"
+
+ # Use normalized request if available (from combined analysis), otherwise use original prompt
+ # This ensures the first message uses the normalized request for security
+ normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
+
+ messageData = {
+ "workflowId": workflow.id,
+ "role": "user",
+ "message": normalizedRequest, # Use normalized request instead of original prompt
+ "status": "first",
+ "sequenceNr": 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": contextLabel,
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "pending",
+ "actionProgress": "pending"
+ }
+
+ # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents
+ # SKIP user intention analysis if already done in combined analysis (skipIntentionAnalysis=True)
+ # or for AUTOMATION mode - it uses predefined JSON plans
+ createdDocs = []
+ workflowMode = getattr(workflow, 'workflowMode', None)
+ skipIntentionAnalysis = skipIntentionAnalysis or (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION)
+
+ if skipIntentionAnalysis:
+ logger.info("Skipping user intention analysis (already done in combined analysis or AUTOMATION mode)")
+ # Use already analyzed data if available, otherwise use user input directly
+ detectedLanguage = getattr(self.services, 'currentUserLanguage', None)
+ normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
+ intentText = getattr(self.services, 'currentUserPrompt', None) or userInput.prompt
+ contextItems = getattr(self.services, 'currentUserContextItems', None) or []
+ workflowIntent = getattr(workflow, '_workflowIntent', None)
+
+ # Create documents for context items (if available from combined analysis)
+ if contextItems and isinstance(contextItems, list):
+ for idx, item in enumerate(contextItems):
+ try:
+ title = item.get('title') if isinstance(item, dict) else None
+ mime = item.get('mimeType') if isinstance(item, dict) else None
+ content = item.get('content') if isinstance(item, dict) else None
+ if not content:
+ continue
+ fileName = (title or f"user_context_{idx+1}.txt").strip()
+ mimeType = (mime or "text/plain").strip()
+
+ # Neutralize content before storing if neutralization is enabled
+ contentBytes = content.encode('utf-8')
+ contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType)
+
+ # Create file in component storage
+ fileItem = self.services.interfaceDbComponent.createFile(
+ name=fileName,
+ mimeType=mimeType,
+ content=contentBytes
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes)
+
+ # Collect file info
+ fileInfo = self.services.chat.getFileInfo(fileItem.id)
+ from modules.datamodels.datamodelChat import ChatDocument
+ doc = ChatDocument(
+ fileId=fileItem.id,
+ fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName,
+ fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes),
+ mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType
+ )
+ createdDocs.append(doc)
+ except Exception:
+ continue
+ else:
+ try:
+ analyzerPrompt = (
+ "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
+ "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
+ "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
+ "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
+ "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n"
+ "5) dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown).\n"
+ "6) expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., [\"xlsx\", \"pdf\"]). If format is unclear or not specified, use empty list [].\n"
+ "7) qualityRequirements: Quality requirements they have (accuracy, completeness) as {accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}.\n"
+ "8) successCriteria: Specific success criteria that define completion (array of strings).\n"
+ "9) needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history to be understood or completed (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work). Return true if the request is a continuation, retry, modification, or builds upon previous work.\n\n"
+ "Rules:\n"
+ "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
+ "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
+ "- Preserve critical references (URLs, filenames) in intent.\n"
+ "- Normalize to the primary detected language if mixed-language.\n\n"
+ "Return ONLY JSON (no markdown) with this shape:\n"
+ "{\n"
+ " \"detectedLanguage\": \"de|en|fr|it|...\",\n"
+ " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
+ " \"intent\": \"Concise normalized request...\",\n"
+ " \"contextItems\": [\n"
+ " {\n"
+ " \"title\": \"User context 1\",\n"
+ " \"mimeType\": \"text/plain\",\n"
+ " \"content\": \"Full extracted content block here\"\n"
+ " }\n"
+ " ],\n"
+ " \"dataType\": \"numbers|text|documents|analysis|code|unknown\",\n"
+ " \"expectedFormats\": [\"pdf\", \"docx\", \"xlsx\", \"txt\", \"json\", \"csv\", \"html\", \"md\"],\n"
+ " \"qualityRequirements\": {\n"
+ " \"accuracyThreshold\": 0.0-1.0,\n"
+ " \"completenessThreshold\": 0.0-1.0\n"
+ " },\n"
+ " \"successCriteria\": [\"specific criterion 1\", \"specific criterion 2\"],\n"
+ " \"needsWorkflowHistory\": true|false\n"
+ "}\n\n"
+ "## User Message\n"
+ "The following is the user's original input message. Extract intent, normalize the request, and identify any large context blocks that should be moved to separate documents:\n\n"
+ "################ USER INPUT START #################\n"
+ f"{userInput.prompt.replace('{', '{{').replace('}', '}}') if userInput.prompt else ''}\n"
+ "################ USER INPUT FINISH #################"
+ )
+
+ # Call AI analyzer (planning call - will use static parameters)
+ aiResponse = await self.services.ai.callAiPlanning(
+ prompt=analyzerPrompt,
+ placeholders=None,
+ debugType="userintention"
+ )
+
+ detectedLanguage = None
+ normalizedRequest = None
+ intentText = userInput.prompt
+ contextItems = []
+ workflowIntent = None
+
+ # Parse analyzer response (JSON expected)
+ try:
+ jsonStart = aiResponse.find('{') if aiResponse else -1
+ jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
+ if jsonStart != -1 and jsonEnd > jsonStart:
+ parsed = json.loads(aiResponse[jsonStart:jsonEnd])
+ detectedLanguage = parsed.get('detectedLanguage') or None
+ normalizedRequest = parsed.get('normalizedRequest') or None
+ if parsed.get('intent'):
+ intentText = parsed.get('intent')
+ contextItems = parsed.get('contextItems') or []
+
+ # Extract intent analysis fields and store as workflowIntent
+ intentText = parsed.get('intent') or userInput.prompt
+ workflowIntent = {
+ 'intent': intentText, # Use intent instead of primaryGoal
+ 'dataType': parsed.get('dataType', 'unknown'),
+ 'expectedFormats': parsed.get('expectedFormats', []),
+ 'qualityRequirements': parsed.get('qualityRequirements', {}),
+ 'successCriteria': parsed.get('successCriteria', []),
+ 'languageUserDetected': detectedLanguage,
+ 'needsWorkflowHistory': parsed.get('needsWorkflowHistory', False)
+ }
+
+ # Store needsWorkflowHistory in services for fast path decision
+ needsHistoryFromIntention = parsed.get('needsWorkflowHistory', False)
+ # Always set the value - default to False if not a boolean
+ setattr(self.services, '_needsWorkflowHistory', bool(needsHistoryFromIntention) if isinstance(needsHistoryFromIntention, bool) else False)
+
+ # Store workflowIntent in workflow object for reuse
+ if hasattr(self.services, 'workflow') and self.services.workflow:
+ self.services.workflow._workflowIntent = workflowIntent
+ except Exception:
+ contextItems = []
+ workflowIntent = None
+ # Ensure needsWorkflowHistory is False if parsing fails
+ setattr(self.services, '_needsWorkflowHistory', False)
+
+ # Update services state
+ # CRITICAL: Validate language from AI response
+ # If AI didn't return language or invalid → use user language
+ # If user language not set → use "en"
+ validatedLanguage = None
+
+ # Validate AI-detected language
+ if detectedLanguage and isinstance(detectedLanguage, str):
+ detectedLanguage = detectedLanguage.strip().lower()
+ # Check if it's a valid 2-character ISO code
+ if len(detectedLanguage) == 2 and detectedLanguage.isalpha():
+ validatedLanguage = detectedLanguage
+
+ # If AI didn't return valid language, use user language
+ if not validatedLanguage:
+ userLanguage = getattr(self.services.user, 'language', None) if hasattr(self.services, 'user') and self.services.user else None
+ if userLanguage and isinstance(userLanguage, str):
+ userLanguage = userLanguage.strip().lower()
+ if len(userLanguage) == 2 and userLanguage.isalpha():
+ validatedLanguage = userLanguage
+
+ # Final fallback to "en"
+ if not validatedLanguage:
+ validatedLanguage = "en"
+ logger.warning("Language not detected from AI and user language not set - using default 'en'")
+
+ # Set validated language
+ self._setUserLanguage(validatedLanguage)
+ try:
+ setattr(self.services, 'currentUserLanguage', validatedLanguage)
+ logger.debug(f"Set currentUserLanguage to validated value: {validatedLanguage}")
+ except Exception:
+ pass
+ self.services.currentUserPrompt = intentText or userInput.prompt
+ # Always set currentUserPromptNormalized - use normalizedRequest if available, otherwise fallback to currentUserPrompt
+ # CRITICAL: normalizedRequest MUST be used if available, do NOT fall back to intent
+ if normalizedRequest and normalizedRequest.strip():
+ # Use normalizedRequest if available and not empty
+ self.services.currentUserPromptNormalized = normalizedRequest
+ logger.debug(f"Stored normalized request from analysis (length: {len(normalizedRequest)})")
+ else:
+ # Fallback only if normalizedRequest is None or empty
+ logger.warning(f"normalizedRequest is None or empty in analysis, falling back to intentText. normalizedRequest={normalizedRequest}, intentText={intentText}")
+ self.services.currentUserPromptNormalized = intentText or userInput.prompt
+ if contextItems is not None:
+ self.services.currentUserContextItems = contextItems
+
+ # Update message with normalized request if analysis produced one
+ if normalizedRequest and normalizedRequest != userInput.prompt:
+ messageData["message"] = normalizedRequest
+ logger.debug(f"Updated first message with normalized request (length: {len(normalizedRequest)})")
+
+ # Create documents for context items
+ if contextItems and isinstance(contextItems, list):
+ for idx, item in enumerate(contextItems):
+ try:
+ title = item.get('title') if isinstance(item, dict) else None
+ mime = item.get('mimeType') if isinstance(item, dict) else None
+ content = item.get('content') if isinstance(item, dict) else None
+ if not content:
+ continue
+ fileName = (title or f"user_context_{idx+1}.txt").strip()
+ mimeType = (mime or "text/plain").strip()
+
+ # Neutralize content before storing if neutralization is enabled
+ contentBytes = content.encode('utf-8')
+ contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType)
+
+ # Create file in component storage
+ fileItem = self.services.interfaceDbComponent.createFile(
+ name=fileName,
+ mimeType=mimeType,
+ content=contentBytes
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes)
+
+ # Collect file info
+ fileInfo = self.services.chat.getFileInfo(fileItem.id)
+ from modules.datamodels.datamodelChat import ChatDocument
+ doc = ChatDocument(
+ fileId=fileItem.id,
+ fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName,
+ fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes),
+ mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType
+ )
+ createdDocs.append(doc)
+ except Exception:
+ continue
+ except Exception as e:
+ logger.warning(f"Prompt analysis failed or skipped: {str(e)}")
+
+ # Process user-uploaded documents (fileIds) and combine with context documents
+ if userInput.listFileId:
+ try:
+ userDocs = await self._processFileIds(userInput.listFileId, None)
+ if userDocs:
+ createdDocs.extend(userDocs)
+ except Exception as e:
+ logger.warning(f"Failed to process user fileIds: {e}")
+
+ # Finally, persist and bind the first message with combined documents (context + user)
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs)
+
+ # Create ChatMessage with success criteria (KPI) AFTER the first user message
+ # This ensures the KPI message appears after the user message in the UI
+ workflowIntent = getattr(workflow, '_workflowIntent', None)
+ if workflowIntent and isinstance(workflowIntent, dict):
+ successCriteria = workflowIntent.get('successCriteria', [])
+ if successCriteria and isinstance(successCriteria, list) and len(successCriteria) > 0:
+ try:
+ # Format success criteria as message with "KPI" title
+ criteriaText = "**KPI**\n\n" + "\n".join([f"• {criterion}" for criterion in successCriteria])
+
+ kpiMessageData = {
+ "workflowId": workflow.id,
+ "role": "system",
+ "message": criteriaText,
+ "summary": f"KPI: {len(successCriteria)} success criteria",
+ "status": "step",
+ "sequenceNr": len(workflow.messages) + 1, # After user message
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0
+ }
+
+ self.services.chat.storeMessageWithDocuments(workflow, kpiMessageData, [])
+ logger.info(f"Created KPI message with {len(successCriteria)} success criteria after first user message")
+ except Exception as e:
+ logger.error(f"Error creating KPI message: {str(e)}")
+
+ except Exception as e:
+ logger.error(f"Error sending first message: {str(e)}")
+ raise
+
+ async def _planTasks(self, userInput: UserInputRequest):
+ """Generate task plan for workflow execution"""
+ workflow = self.services.workflow
+ handling = self.workflowProcessor
+ # Generate task plan first (shared for both modes)
+ # Use normalizedRequest instead of raw userInput.prompt for security
+ normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
+ taskPlan = await handling.generateTaskPlan(normalizedPrompt, workflow)
+ if not taskPlan or not taskPlan.tasks:
+ raise Exception("No tasks generated in task plan.")
+ workflowMode = getattr(workflow, 'workflowMode')
+ logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}")
+ logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks")
+ return taskPlan
+
+ async def _executeTasks(self, taskPlan) -> None:
+ """Execute all tasks in the task plan and update workflow status."""
+ workflow = self.services.workflow
+ handling = self.workflowProcessor
+ totalTasks = len(taskPlan.tasks)
+ allTaskResults: List = []
+ previousResults: List[str] = []
+
+ # Create "Service Workflow Execution" root entry - parent of all tasks
+ workflowExecOperationId = f"workflowExec_{workflow.id}"
+ self.services.chat.progressLogStart(
+ workflowExecOperationId,
+ "Service",
+ "Workflow Execution",
+ f"Executing {totalTasks} task(s)"
+ )
+
+ # Store workflow execution operationId in workflowProcessor for task hierarchy
+ handling.workflowExecOperationId = workflowExecOperationId
+
+ try:
+ for idx, taskStep in enumerate(taskPlan.tasks):
+ currentTaskIndex = idx + 1
+ logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}")
+
+ # Update workflow state before executing task (fixes "Task 0" issue)
+ handling.updateWorkflowBeforeExecutingTask(currentTaskIndex)
+
+ # Build TaskContext (mode-specific behavior is inside WorkflowProcessor)
+ taskContext = TaskContext(
+ taskStep=taskStep,
+ workflow=workflow,
+ workflowId=workflow.id,
+ availableDocuments=None,
+ availableConnections=None,
+ previousResults=previousResults,
+ previousHandover=None,
+ improvements=[],
+ retryCount=0,
+ previousActionResults=[],
+ previousReviewResult=None,
+ isRegeneration=False,
+ failurePatterns=[],
+ failedActions=[],
+ successfulActions=[],
+ criteriaProgress={
+ 'met_criteria': set(),
+ 'unmet_criteria': set(),
+ 'attempt_history': []
+ }
+ )
+
+ taskResult = await handling.executeTask(taskStep, workflow, taskContext)
+
+ # Persist task result for cross-task/round document references
+ # Convert ChatTaskResult to WorkflowTaskResult for persistence
+ from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult
+ from modules.datamodels.datamodelChat import ActionResult
+
+ # Get final ActionResult from task execution (last action result)
+ finalActionResult = None
+ if hasattr(taskResult, 'actionResult'):
+ finalActionResult = taskResult.actionResult
+ elif taskContext.previousActionResults and len(taskContext.previousActionResults) > 0:
+ # Use last action result from context
+ finalActionResult = taskContext.previousActionResults[-1]
+
+ # Create WorkflowTaskResult for persistence
+ if finalActionResult:
+ workflowTaskResult = WorkflowTaskResult(
+ taskId=taskStep.id,
+ actionResult=finalActionResult
+ )
+ # Persist task result (creates ChatMessage + ChatDocuments)
+ await handling.persistTaskResult(workflowTaskResult, workflow, taskContext)
+
+ handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow)
+ allTaskResults.append({
+ 'taskStep': taskStep,
+ 'taskResult': taskResult,
+ 'handoverData': handoverData
+ })
+ if taskResult.success and taskResult.feedback:
+ previousResults.append(taskResult.feedback)
+
+ # Mark workflow as completed; error/stop cases update status elsewhere
+ workflow.status = "completed"
+ finally:
+ # Finish "Service Workflow Execution" entry
+ self.services.chat.progressLogFinish(workflowExecOperationId, True)
+
+ return None
+
+ async def _processWorkflowResults(self) -> None:
+ """Process workflow results based on workflow status and create appropriate messages"""
+ try:
+ workflow = self.services.workflow
+ try:
+ checkWorkflowStopped(self.services)
+ except WorkflowStoppedException:
+ logger.info(f"Workflow {workflow.id} was stopped during result processing")
+
+ # Create final stopped message
+ stoppedMessage = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": "🛑 Workflow stopped by user",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_stopped",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "stopped",
+ "actionProgress": "stopped"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, [])
+
+ # Update workflow status to stopped
+ workflow.status = "stopped"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "stopped",
+ "lastActivity": workflow.lastActivity
+ })
+ return
+
+ if workflow.status == 'stopped':
+ # Create stopped message
+ stopped_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": "🛑 Workflow stopped by user",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_stopped",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "stopped",
+ "actionProgress": "stopped"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])
+
+ # Update workflow status to stopped
+ workflow.status = "stopped"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "stopped",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ # Add stopped log entry
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow stopped by user",
+ "type": "warning",
+ "status": "stopped",
+ "progress": 1.0
+ })
+ return
+ elif workflow.status == 'failed':
+ # Create error message
+ errorMessage = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"Workflow failed: {'Unknown error'}",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_failure",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "fail",
+ "actionProgress": "fail"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, errorMessage, [])
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ # Add failed log entry
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow failed: Unknown error",
+ "type": "error",
+ "status": "failed",
+ "progress": 1.0
+ })
+ return
+
+ # For successful workflows, send detailed completion message
+ await self._sendLastMessage()
+
+ except Exception as e:
+ logger.error(f"Error processing workflow results: {str(e)}")
+ # Create error message
+ error_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"Error processing workflow results: {str(e)}",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_error",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "fail",
+ "actionProgress": "fail"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, error_message, [])
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ async def _sendLastMessage(self) -> None:
+ """Send last message to complete workflow (only for successful workflows)"""
+ try:
+ workflow = self.services.workflow
+ # Safety check: ensure this is only called for successful workflows
+ if workflow.status in ['stopped', 'failed']:
+ logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
+ return
+
+ # Generate feedback
+ feedback = await self._generateWorkflowFeedback()
+
+ # Create last message using interface
+ messageData = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": feedback,
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_feedback",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "success",
+ "actionProgress": "success"
+ }
+
+ # Create message using interface
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, [])
+
+ # Update workflow status to completed
+ workflow.status = "completed"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+
+ # Update workflow in database
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "completed",
+ "lastActivity": workflow.lastActivity
+ })
+
+ # Add completion log entry
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow completed",
+ "type": "success",
+ "status": "completed",
+ "progress": 1.0
+ })
+
+ except Exception as e:
+ logger.error(f"Error sending last message: {str(e)}")
+ raise
+
+ async def _generateWorkflowFeedback(self) -> str:
+ """Generate feedback message for workflow completion"""
+ try:
+ workflow = self.services.workflow
+ checkWorkflowStopped(self.services)
+
+ # Count messages by role
+ userMessages = [msg for msg in workflow.messages if msg.role == 'user']
+ assistantMessages = [msg for msg in workflow.messages if msg.role == 'assistant']
+
+ # Generate summary feedback
+ feedback = f"Workflow completed.\n\n"
+ feedback += f"Processed {len(userMessages)} user inputs and generated {len(assistantMessages)} responses.\n"
+
+ # Add final status
+ if workflow.status == "completed":
+ feedback += "All tasks completed successfully."
+ elif workflow.status == "partial":
+ feedback += "Some tasks completed with partial success."
+ else:
+ feedback += f"Workflow status: {workflow.status}"
+
+ return feedback
+
+ except Exception as e:
+ logger.error(f"Error generating workflow feedback: {str(e)}")
+ return "Workflow processing completed."
+
+ def _handleWorkflowStop(self) -> None:
+ """Handle workflow stop exception"""
+ workflow = self.services.workflow
+ logger.info("Workflow stopped by user")
+
+ # Update workflow status to stopped
+ workflow.status = "stopped"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "stopped",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ # Create final stopped message
+ stopped_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": "🛑 Workflow stopped by user",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_stopped",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "pending",
+ "actionProgress": "pending"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])
+
+ # Add log entry
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow stopped by user",
+ "type": "warning",
+ "status": "stopped",
+ "progress": 1.0
+ })
+
+ def _handleWorkflowError(self, error: Exception) -> None:
+ """Handle workflow error exception"""
+ workflow = self.services.workflow
+ logger.error(f"Workflow processing error: {str(error)}")
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity,
+ "totalTasks": workflow.totalTasks,
+ "totalActions": workflow.totalActions
+ })
+
+ # Create error message
+ error_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"Workflow processing failed: {str(error)}",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": "workflow_error",
+ "documents": [],
+ # Add workflow context fields
+ "roundNumber": workflow.currentRound,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ # Add progress status
+ "taskProgress": "fail",
+ "actionProgress": "fail"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, error_message, [])
+
+ # Add error log entry
+ self.services.chat.storeLog(workflow, {
+ "message": f"Workflow failed: {str(error)}",
+ "type": "error",
+ "status": "failed",
+ "progress": 1.0
+ })
+
+ raise
+
+ async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]:
+ """Process file IDs from existing files and return ChatDocument objects.
+
+ NOTE: Neutralization is NOT performed here. For dynamic workflows, neutralization
+ should happen AFTER content extraction (in extractContent action) to neutralize
+ extracted data (ContentPart.data), not ChatDocuments. This ensures neutralization
+ happens after extraction but before AI processing.
+ """
+ documents = []
+
+ workflow = self.services.workflow
+
+ for fileId in fileIds:
+ try:
+ # Get file info from chat service
+ fileInfo = self.services.chat.getFileInfo(fileId)
+ if not fileInfo:
+ logger.warning(f"No file info found for file ID {fileId}")
+ continue
+
+ originalFileName = fileInfo.get("fileName", "unknown")
+ originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
+ fileSizeToUse = fileInfo.get("size", 0)
+
+ # NOTE: Neutralization removed from here - it should happen in extractContent action
+ # after content extraction but before AI processing (for dynamic workflows)
+ # This ensures we neutralize extracted data (ContentPart.data), not ChatDocuments
+
+ # Create document with original file ID (no neutralization)
+ document = ChatDocument(
+ id=str(uuid.uuid4()),
+ messageId=messageId or "",
+ fileId=fileId,
+ fileName=originalFileName,
+ fileSize=fileSizeToUse,
+ mimeType=originalMimeType
+ )
+ documents.append(document)
+ logger.info(f"Processed file ID {fileId} -> {document.fileName}")
+ except Exception as e:
+ errorMsg = f"Error processing file ID {fileId}: {str(e)}"
+ logger.error(errorMsg)
+ self.services.chat.storeLog(workflow, {
+ "message": errorMsg,
+ "type": "error",
+ "status": "error",
+ "progress": -1
+ })
+ return documents
+
+
+ def _setUserLanguage(self, language: str) -> None:
+ """Set user language for the service center"""
+ self.services.user.language = language
+
+ async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes:
+ """Neutralize content if neutralization is enabled in user settings"""
+ try:
+ # Check if neutralization is enabled
+ config = self.services.neutralization.getConfig()
+ if not config or not config.enabled:
+ return contentBytes
+
+ # Decode content to text for neutralization
+ try:
+ textContent = contentBytes.decode('utf-8')
+ except UnicodeDecodeError:
+ # Try alternative encodings
+ for enc in ['latin-1', 'cp1252', 'iso-8859-1']:
+ try:
+ textContent = contentBytes.decode(enc)
+ break
+ except UnicodeDecodeError:
+ continue
+ else:
+ # If unable to decode, return original bytes (binary content)
+ logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}")
+ return contentBytes
+
+ # Neutralize the text content
+ # Note: The neutralization service should use names from config when processing
+ result = self.services.neutralization.processText(textContent)
+ if result and 'neutralized_text' in result:
+ neutralizedText = result['neutralized_text']
+ # Encode back to bytes using the same encoding
+ try:
+ return neutralizedText.encode('utf-8')
+ except Exception as e:
+ logger.warning(f"Error encoding neutralized text: {str(e)}")
+ return contentBytes
+ else:
+ logger.warning("Neutralization did not return neutralized_text")
+ return contentBytes
+ except Exception as e:
+ logger.error(f"Error during content neutralization: {str(e)}")
+ # Return original content on error
+ return contentBytes
+
+ def _checkIfHistoryAvailable(self) -> bool:
+ """Check if workflow history is available (previous rounds exist).
+
+ Returns True if there are previous workflow rounds with messages.
+ """
+ try:
+ from modules.workflows.processing.shared.placeholderFactory import getPreviousRoundContext
+
+ history = getPreviousRoundContext(self.services)
+
+ # Check if history contains actual content (not just "No previous round context available")
+ if history and history != "No previous round context available":
+ return True
+
+ return False
+ except Exception as e:
+ logger.error(f"Error checking if history is available: {str(e)}")
+ return False