diff --git a/how --stat HEAD b/how --stat HEAD deleted file mode 100644 index 52b1d7b3..00000000 --- a/how --stat HEAD +++ /dev/null @@ -1,49 +0,0 @@ -M app.py -A modules/.$DEPENDENCY_DIAGRAM.drawio.bkp -A modules/AUTOMATION_FEATURE_ANALYSIS.md -A modules/BIDIRECTIONAL_IMPORTS.md -A modules/DEPENDENCY_DIAGRAM.drawio -A modules/FEATURES_TO_INTERFACES_IMPORTS.md -M modules/connectors/connectorVoiceGoogle.py -M modules/datamodels/datamodelChat.py -M modules/datamodels/datamodelPagination.py -A modules/features/automation/__init__.py -A modules/features/automation/mainAutomation.py -A modules/features/automation/subAutomationUtils.py -D modules/features/chatAlthaus/COMPONENT_DIAGRAM.md -M modules/features/featuresLifecycle.py -M modules/interfaces/interfaceAiObjects.py -M modules/interfaces/interfaceDbAppObjects.py -M modules/interfaces/interfaceDbChatObjects.py -M modules/interfaces/interfaceDbComponentObjects.py -M modules/interfaces/interfaceVoiceObjects.py -M modules/routes/routeAdminAutomationEvents.py -M modules/routes/routeVoiceGoogle.py -M modules/services/__init__.py -M modules/services/serviceAi/mainServiceAi.py -M modules/services/serviceAi/subJsonResponseHandling.py -M modules/services/serviceChat/mainServiceChat.py -M modules/services/serviceExtraction/mainServiceExtraction.py -M modules/services/serviceExtraction/subPipeline.py -M modules/services/serviceExtraction/subPromptBuilderExtraction.py -M modules/services/serviceGeneration/renderers/rendererXlsx.py -M modules/services/serviceGeneration/subPromptBuilderGeneration.py -A modules/services/serviceSecurity/mainServiceSecurity.py -M modules/services/serviceSharepoint/mainServiceSharepoint.py -M modules/services/serviceUtils/mainServiceUtils.py -A modules/shared/callbackRegistry.py -M modules/shared/debugLogger.py -M modules/shared/jsonUtils.py -M modules/workflows/methods/methodAi.py -M modules/workflows/methods/methodBase.py -A modules/workflows/methods/methodContext.py -M 
modules/workflows/methods/methodOutlook.py -M modules/workflows/methods/methodSharepoint.py -M modules/workflows/processing/adaptive/contentValidator.py -M modules/workflows/processing/core/messageCreator.py -M modules/workflows/processing/modes/modeAutomation.py -M modules/workflows/processing/modes/modeDynamic.py -M modules/workflows/processing/shared/promptGenerationActionsDynamic.py -M modules/workflows/processing/shared/promptGenerationTaskplan.py -M modules/workflows/processing/workflowProcessor.py -M modules/workflows/workflowManager.py diff --git a/modules/workflows/workflowManager copy.py b/modules/workflows/workflowManager copy.py new file mode 100644 index 00000000..9806060a --- /dev/null +++ b/modules/workflows/workflowManager copy.py @@ -0,0 +1,1385 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +from typing import Dict, Any, List, Optional +import logging +import uuid +import asyncio +import json + +from modules.datamodels.datamodelChat import ( + UserInputRequest, + ChatMessage, + ChatWorkflow, + ChatDocument, + WorkflowModeEnum +) +from modules.datamodels.datamodelChat import TaskContext +from modules.workflows.processing.workflowProcessor import WorkflowProcessor +from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped + + +logger = logging.getLogger(__name__) + +class WorkflowManager: + """Manager for workflow processing and coordination""" + + def __init__(self, services): + self.services = services + self.workflowProcessor = None + + # Exported functions + + async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow: + """Starts a new workflow or continues an existing one, then launches processing.""" + try: + # Debug log to check workflowMode parameter + logger.info(f"WorkflowManager received workflowMode: {workflowMode}") + currentTime = self.services.utils.timestampGetUtc() + + if workflowId: + 
workflow = self.services.chat.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Workflow {workflowId} not found") + + # Store workflow in services for reference (this is the ChatWorkflow object) + self.services.workflow = workflow + + # CRITICAL: Update all method instances to use the current Services object with the correct workflow + from modules.workflows.processing.shared.methodDiscovery import discoverMethods + discoverMethods(self.services) + logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") + + if workflow.status == "running": + logger.info(f"Stopping running workflow {workflowId} before processing new prompt") + workflow.status = "stopped" + workflow.lastActivity = currentTime + self.services.chat.updateWorkflow(workflowId, { + "status": "stopped", + "lastActivity": currentTime + }) + self.services.chat.storeLog(workflow, { + "message": "Workflow stopped for new prompt", + "type": "info", + "status": "stopped", + "progress": 1.0 + }) + + newRound = workflow.currentRound + 1 + self.services.chat.updateWorkflow(workflowId, { + "status": "running", + "lastActivity": currentTime, + "currentRound": newRound, + "workflowMode": workflowMode # Update workflow mode for existing workflows + }) + + # Reflect updates on the in-memory object without reloading + workflow.status = "running" + workflow.lastActivity = currentTime + workflow.currentRound = newRound + workflow.workflowMode = workflowMode + + self.services.chat.storeLog(workflow, { + "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}", + "type": "info", + "status": "running", + "progress": 0 + }) + + else: + workflowData = { + "name": "New Workflow", + "status": "running", + "startedAt": currentTime, + "lastActivity": currentTime, + "currentRound": 1, + "currentTask": 0, + "currentAction": 0, + "totalTasks": 0, + "totalActions": 0, + "mandateId": self.services.user.mandateId, + "messageIds": [], + "workflowMode": 
workflowMode, + "maxSteps": 10 , # Set maxSteps + } + + workflow = self.services.chat.createWorkflow(workflowData) + logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") + logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") + + # Store workflow in services (this is the ChatWorkflow object) + self.services.workflow = workflow + + # CRITICAL: Update all method instances to use the current Services object with the correct workflow + # This ensures cached method instances don't use stale workflow IDs from previous workflows + from modules.workflows.processing.shared.methodDiscovery import discoverMethods + discoverMethods(self.services) + logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") + + # Start workflow processing asynchronously + asyncio.create_task(self._workflowProcess(userInput)) + + return workflow + except Exception as e: + logger.error(f"Error starting workflow: {str(e)}") + raise + + async def workflowStop(self, workflowId: str) -> ChatWorkflow: + """Stops a running workflow.""" + try: + workflow = self.services.chat.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Workflow {workflowId} not found") + + # Store workflow in services (this is the ChatWorkflow object) + self.services.workflow = workflow + + workflow.status = "stopped" + workflow.lastActivity = self.services.utils.timestampGetUtc() + self.services.chat.updateWorkflow(workflowId, { + "status": "stopped", + "lastActivity": workflow.lastActivity + }) + self.services.chat.storeLog(workflow, { + "message": "Workflow stopped", + "type": "warning", + "status": "stopped", + "progress": 1.0 + }) + return workflow + except Exception as e: + logger.error(f"Error stopping workflow: {str(e)}") + raise + + # Main processor + + async def _workflowProcess(self, userInput: UserInputRequest) -> None: + """Process a workflow with user input""" + try: + # Send ChatLog message immediately 
when workflow starts + workflow = self.services.workflow + self.services.chat.storeLog(workflow, { + "message": "Workflow started...", + "type": "info", + "status": "running", + "progress": 0.0 + }) + + # Store the current user prompt in services for easy access throughout the workflow + self.services.rawUserPrompt = userInput.prompt + self.services.currentUserPrompt = userInput.prompt + + # Reset progress logger for new workflow + self.services.chat._progressLogger = None + + # Reset workflow history flag at start of each workflow + setattr(self.services, '_needsWorkflowHistory', False) + + self.workflowProcessor = WorkflowProcessor(self.services) + + # Get workflow mode to determine if combined analysis is needed + workflowMode = getattr(self.services.workflow, 'workflowMode', None) + skipCombinedAnalysis = (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION) + + if skipCombinedAnalysis: + logger.info("Skipping combined analysis for AUTOMATION mode - using predefined plan") + complexity = "moderate" # Default for automation workflows + needsWorkflowHistory = False # Automation workflows don't need history + detectedLanguage = None # No language detection in automation mode + normalizedRequest = userInput.prompt + intentText = userInput.prompt + contextItems = [] + workflowIntent = None + else: + # Process user-uploaded documents from userInput for combined analysis + documents = [] + if userInput.listFileId: + try: + documents = await self._processFileIds(userInput.listFileId, None) + except Exception as e: + logger.warning(f"Failed to process user fileIds for combined analysis: {e}") + + # Phase 1+2: Kombinierte Analyse: Intent + Komplexität in einem AI-Call + analysisResult = await self._analyzeUserInputAndComplexity(userInput.prompt, documents) + + # Extract results + detectedLanguage = analysisResult.get('detectedLanguage') + normalizedRequest = analysisResult.get('normalizedRequest') + intentText = analysisResult.get('intent') or userInput.prompt + 
contextItems = analysisResult.get('contextItems', []) + complexity = analysisResult.get('complexity', 'moderate') + needsWorkflowHistory = analysisResult.get('needsWorkflowHistory', False) + fastTrack = analysisResult.get('fastTrack', False) + + # Extract intent analysis fields and store as workflowIntent + workflowIntent = { + 'intent': intentText, # Use intent instead of primaryGoal + 'dataType': analysisResult.get('dataType', 'unknown'), + 'expectedFormats': analysisResult.get('expectedFormats', []), + 'qualityRequirements': analysisResult.get('qualityRequirements', {}), + 'successCriteria': analysisResult.get('successCriteria', []), + 'languageUserDetected': detectedLanguage, + 'needsWorkflowHistory': needsWorkflowHistory + } + + # Store needsWorkflowHistory in services + setattr(self.services, '_needsWorkflowHistory', bool(needsWorkflowHistory)) + + # Store workflowIntent in workflow object for reuse + if hasattr(self.services, 'workflow') and self.services.workflow: + self.services.workflow._workflowIntent = workflowIntent + + # Store normalized request and intent + # CRITICAL: normalizedRequest MUST be used if available, do NOT fall back to intent + self.services.currentUserPrompt = intentText or userInput.prompt + if normalizedRequest and normalizedRequest.strip(): + # Use normalizedRequest if available and not empty + self.services.currentUserPromptNormalized = normalizedRequest + logger.info(f"Stored normalized request (length: {len(normalizedRequest)}, preview: {normalizedRequest[:100]}...)") + else: + # Fallback only if normalizedRequest is None or empty + logger.warning(f"normalizedRequest is None or empty, falling back to intentText. 
normalizedRequest={normalizedRequest}, intentText={intentText[:100] if intentText else None}...") + self.services.currentUserPromptNormalized = intentText or userInput.prompt + if contextItems is not None: + self.services.currentUserContextItems = contextItems + + # Set detected language + if detectedLanguage and isinstance(detectedLanguage, str): + self._setUserLanguage(detectedLanguage) + try: + setattr(self.services, 'currentUserLanguage', detectedLanguage) + except Exception: + pass + + logger.info(f"Combined analysis: complexity={complexity}, needsWorkflowHistory={needsWorkflowHistory}, language={detectedLanguage}, fastTrack={fastTrack}") + + # Route to fast path for simple requests if history is not needed + # Skip fast path for automation mode or if history is needed + if not skipCombinedAnalysis and complexity == "simple" and not needsWorkflowHistory: + logger.info("Routing to fast path for simple request") + await self._executeFastPath(userInput, documents) + return # Fast path completes the workflow + + # Now send the first message (use already analyzed data if available) + await self._sendFirstMessage(userInput, skipIntentionAnalysis=not skipCombinedAnalysis) + + # Route to full workflow for moderate/complex requests or automation mode + logger.info(f"Routing to full workflow for {complexity} request" + (" (automation mode)" if skipCombinedAnalysis else "")) + taskPlan = await self._planTasks(userInput) + await self._executeTasks(taskPlan) + await self._processWorkflowResults() + + except WorkflowStoppedException: + self._handleWorkflowStop() + + except Exception as e: + self._handleWorkflowError(e) + + # Helper functions + + async def _analyzeUserInputAndComplexity( + self, + userPrompt: str, + documents: List[ChatDocument] + ) -> Dict[str, Any]: + """ + Phase 1+2: Kombinierte Analyse: Intent + Komplexität in einem AI-Call. 
+ + Args: + userPrompt: User-Anfrage + documents: Liste der Dokumente + + Returns: + Dict mit: + - detectedLanguage: ISO 639-1 Sprachcode + - normalizedRequest: Vollständige, explizite Umformulierung + - intent: Kurze Kern-Anfrage + - contextItems: Große Datenblöcke als separate Dokumente + - complexity: "simple" | "moderate" | "complex" + - needsWorkflowHistory: bool + - fastTrack: bool + - dataType: Datentyp + - expectedFormats: Erwartete Formate + - qualityRequirements: Qualitätsanforderungen + - successCriteria: Erfolgskriterien + """ + # Baue Dokument-Liste für Prompt + docListText = "" + if documents: + for i, doc in enumerate(documents, 1): + docListText += f"\n{i}. {doc.fileName} ({doc.mimeType}, {doc.fileSize} bytes)" + + analysisPrompt = f"""You are an input analyzer. From the user's message, perform ALL of the following in one pass: + +1. detectedLanguage: Detect ISO 639-1 language code (e.g., de, en, fr, it) +2. normalizedRequest: Full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details +3. intent: Concise single-paragraph core request in the detected language for high-level routing +4. contextItems: Supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content +5. complexity: "simple" | "moderate" | "complex" + - "simple": Only if NO documents AND NO web search required. Single question, straightforward answer (5-15s) + - "moderate": Multiple steps, some documents, structured response requiring some processing, or web search needed (30-60s) + - "complex": Multi-task workflow, many documents, research needed, content generation required, multi-step planning (60-120s) +6. 
needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work) +7. fastTrack: Boolean indicating if Fast Track is possible (simple requests without documents and without workflow history) +8. dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown) +9. expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., ["xlsx", "pdf"]). If format is unclear or not specified, use empty list [] +10. qualityRequirements: Quality requirements they have (accuracy, completeness) as {{accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}} +11. successCriteria: Specific success criteria that define completion (array of strings) + +Rules: +- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained +- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear +- Preserve critical references (URLs, filenames) in intent +- Normalize to the primary detected language if mixed-language +- Consider number of documents provided when determining complexity +- Consider need for external research or web search when determining complexity + +Documents provided: {len(documents)} document(s) +{docListText} + +Return ONLY JSON (no markdown) with this exact structure: +{{ + "detectedLanguage": "de|en|fr|it|...", + "normalizedRequest": "Full explicit instruction in detected language", + "intent": "Concise normalized request...", + "contextItems": [ + {{ + "title": "User context 1", + "mimeType": "text/plain", + "content": "Full extracted content block here" + }} + ], + "complexity": "simple" | "moderate" | "complex", + "needsWorkflowHistory": true|false, + "fastTrack": true|false, + "dataType": 
"numbers|text|documents|analysis|code|unknown", + "expectedFormats": ["pdf", "docx", "xlsx", "txt", "json", "csv", "html", "md"], + "qualityRequirements": {{ + "accuracyThreshold": 0.0-1.0, + "completenessThreshold": 0.0-1.0 + }}, + "successCriteria": ["specific criterion 1", "specific criterion 2"] +}} + +## User Message +The following is the user's original input message. Analyze intent, normalize the request, determine complexity, and identify any large context blocks that should be moved to separate documents: + +################ USER INPUT START ################# +{userPrompt.replace('{', '{{').replace('}', '}}') if userPrompt else ''} +################ USER INPUT FINISH ################# +""" + + # AI-Call (verwende callAiPlanning für einfache JSON-Responses) + # Debug-Logs werden bereits von callAiPlanning geschrieben + aiResponse = await self.services.ai.callAiPlanning( + prompt=analysisPrompt, + placeholders=None, + debugType="user_input_analysis" + ) + + # Parse Result + try: + jsonStart = aiResponse.find('{') if aiResponse else -1 + jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0 + if jsonStart != -1 and jsonEnd > jsonStart: + result = json.loads(aiResponse[jsonStart:jsonEnd]) + return result + else: + logger.warning("Could not parse combined analysis response, using defaults") + return self._getDefaultAnalysisResult() + except Exception as e: + logger.warning(f"Error parsing combined analysis response: {str(e)}, using defaults") + return self._getDefaultAnalysisResult() + + def _getDefaultAnalysisResult(self) -> Dict[str, Any]: + """Fallback Default-Werte wenn Parsing fehlschlägt.""" + return { + "detectedLanguage": "en", + "normalizedRequest": "", + "intent": "", + "contextItems": [], + "complexity": "moderate", + "needsWorkflowHistory": False, + "fastTrack": False, + "dataType": "unknown", + "expectedFormats": [], + "qualityRequirements": { + "accuracyThreshold": 0.8, + "completenessThreshold": 0.8 + }, + "successCriteria": [] + } + + async 
def _executeFastPath(self, userInput: UserInputRequest, documents: List[ChatDocument]) -> None: + """Execute fast path for simple requests and deliver result to user""" + try: + workflow = self.services.workflow + checkWorkflowStopped(self.services) + + # Get user language if available + userLanguage = getattr(self.services, 'currentUserLanguage', None) + + # Execute fast path - use normalizedRequest if available, otherwise use raw prompt + normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt + result = await self.workflowProcessor.fastPathExecute( + prompt=normalizedPrompt, + documents=documents, + userLanguage=userLanguage + ) + + if not result.success: + # Fast path failed, fall back to full workflow + logger.warning(f"Fast path failed: {result.error}, falling back to full workflow") + taskPlan = await self._planTasks(userInput) + await self._executeTasks(taskPlan) + await self._processWorkflowResults() + return + + # Extract response text from ActionResult + responseText = "" + chatDocuments = [] + + if result.documents and len(result.documents) > 0: + # Get response text from first document + firstDoc = result.documents[0] + if hasattr(firstDoc, 'documentData'): + docData = firstDoc.documentData + if isinstance(docData, bytes): + responseText = docData.decode('utf-8') + else: + responseText = str(docData) + + # Convert ActionDocuments to ChatDocuments for persistence + for actionDoc in result.documents: + if hasattr(actionDoc, 'documentData') and actionDoc.documentData: + # Create file in component storage + fileItem = self.services.interfaceDbComponent.createFile( + name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else "fast_path_response.txt", + mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain", + content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') + ) + # Persist file data + 
self.services.interfaceDbComponent.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8')) + + # Get file info + fileInfo = self.services.chat.getFileInfo(fileItem.id) + + # Create ChatDocument as dict (messageId will be assigned by createMessage) + # Don't create ChatDocument object directly - it requires messageId which doesn't exist yet + chatDoc = { + "fileId": fileItem.id, + "fileName": fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName, + "fileSize": fileInfo.get("size", len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))), + "mimeType": fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType, + "roundNumber": workflow.currentRound, + "taskNumber": 0, # Fast path doesn't have tasks + "actionNumber": 0 + } + chatDocuments.append(chatDoc) + + # Mark workflow as completed BEFORE storing message (so UI polling stops) + workflow.status = "completed" + workflow.lastActivity = self.services.utils.timestampGetUtc() + self.services.chat.updateWorkflow(workflow.id, { + "status": "completed", + "lastActivity": workflow.lastActivity + }) + + # Create ChatMessage with fast path response (in user's language) + messageData = { + "workflowId": workflow.id, + "role": "assistant", + "message": responseText or "Fast path response completed", + "status": "last", # Fast path completes the workflow - UI polling stops on this + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": "fast_path_response", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, # Fast path doesn't have tasks + "actionNumber": 0, + # 
Add progress status + "taskProgress": "success", + "actionProgress": "success" + } + + # Store message with documents + self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments) + + logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars") + + except Exception as e: + logger.error(f"Error in _executeFastPath: {str(e)}") + # Fall back to full workflow on error + logger.info("Falling back to full workflow due to fast path error") + taskPlan = await self._planTasks(userInput) + await self._executeTasks(taskPlan) + await self._processWorkflowResults() + + async def _sendFirstMessage(self, userInput: UserInputRequest, skipIntentionAnalysis: bool = False) -> None: + """Send first message to start workflow""" + try: + workflow = self.services.workflow + checkWorkflowStopped(self.services) + + # Create initial message using interface + # For first user message, include round info in the user context label + roundNum = workflow.currentRound + contextLabel = f"round{roundNum}_usercontext" + + # Use normalized request if available (from combined analysis), otherwise use original prompt + # This ensures the first message uses the normalized request for security + normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt + + messageData = { + "workflowId": workflow.id, + "role": "user", + "message": normalizedRequest, # Use normalized request instead of original prompt + "status": "first", + "sequenceNr": 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": contextLabel, + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "pending", + "actionProgress": "pending" + } + + # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents + # SKIP user intention analysis if already 
done in combined analysis (skipIntentionAnalysis=True) + # or for AUTOMATION mode - it uses predefined JSON plans + createdDocs = [] + workflowMode = getattr(workflow, 'workflowMode', None) + skipIntentionAnalysis = skipIntentionAnalysis or (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION) + + if skipIntentionAnalysis: + logger.info("Skipping user intention analysis (already done in combined analysis or AUTOMATION mode)") + # Use already analyzed data if available, otherwise use user input directly + detectedLanguage = getattr(self.services, 'currentUserLanguage', None) + normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt + intentText = getattr(self.services, 'currentUserPrompt', None) or userInput.prompt + contextItems = getattr(self.services, 'currentUserContextItems', None) or [] + workflowIntent = getattr(workflow, '_workflowIntent', None) + + # Create documents for context items (if available from combined analysis) + if contextItems and isinstance(contextItems, list): + for idx, item in enumerate(contextItems): + try: + title = item.get('title') if isinstance(item, dict) else None + mime = item.get('mimeType') if isinstance(item, dict) else None + content = item.get('content') if isinstance(item, dict) else None + if not content: + continue + fileName = (title or f"user_context_{idx+1}.txt").strip() + mimeType = (mime or "text/plain").strip() + + # Neutralize content before storing if neutralization is enabled + contentBytes = content.encode('utf-8') + contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType) + + # Create file in component storage + fileItem = self.services.interfaceDbComponent.createFile( + name=fileName, + mimeType=mimeType, + content=contentBytes + ) + # Persist file data + self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes) + + # Collect file info + fileInfo = self.services.chat.getFileInfo(fileItem.id) + from 
modules.datamodels.datamodelChat import ChatDocument + doc = ChatDocument( + fileId=fileItem.id, + fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName, + fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes), + mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType + ) + createdDocs.append(doc) + except Exception: + continue + else: + try: + analyzerPrompt = ( + "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n" + "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n" + "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n" + "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n" + "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n" + "5) dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown).\n" + "6) expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., [\"xlsx\", \"pdf\"]). If format is unclear or not specified, use empty list [].\n" + "7) qualityRequirements: Quality requirements they have (accuracy, completeness) as {accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}.\n" + "8) successCriteria: Specific success criteria that define completion (array of strings).\n" + "9) needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history to be understood or completed (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work). 
Return true if the request is a continuation, retry, modification, or builds upon previous work.\n\n" + "Rules:\n" + "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n" + "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n" + "- Preserve critical references (URLs, filenames) in intent.\n" + "- Normalize to the primary detected language if mixed-language.\n\n" + "Return ONLY JSON (no markdown) with this shape:\n" + "{\n" + " \"detectedLanguage\": \"de|en|fr|it|...\",\n" + " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n" + " \"intent\": \"Concise normalized request...\",\n" + " \"contextItems\": [\n" + " {\n" + " \"title\": \"User context 1\",\n" + " \"mimeType\": \"text/plain\",\n" + " \"content\": \"Full extracted content block here\"\n" + " }\n" + " ],\n" + " \"dataType\": \"numbers|text|documents|analysis|code|unknown\",\n" + " \"expectedFormats\": [\"pdf\", \"docx\", \"xlsx\", \"txt\", \"json\", \"csv\", \"html\", \"md\"],\n" + " \"qualityRequirements\": {\n" + " \"accuracyThreshold\": 0.0-1.0,\n" + " \"completenessThreshold\": 0.0-1.0\n" + " },\n" + " \"successCriteria\": [\"specific criterion 1\", \"specific criterion 2\"],\n" + " \"needsWorkflowHistory\": true|false\n" + "}\n\n" + "## User Message\n" + "The following is the user's original input message. 
Extract intent, normalize the request, and identify any large context blocks that should be moved to separate documents:\n\n" + "################ USER INPUT START #################\n" + f"{userInput.prompt.replace('{', '{{').replace('}', '}}') if userInput.prompt else ''}\n" + "################ USER INPUT FINISH #################" + ) + + # Call AI analyzer (planning call - will use static parameters) + aiResponse = await self.services.ai.callAiPlanning( + prompt=analyzerPrompt, + placeholders=None, + debugType="userintention" + ) + + detectedLanguage = None + normalizedRequest = None + intentText = userInput.prompt + contextItems = [] + workflowIntent = None + + # Parse analyzer response (JSON expected) + try: + jsonStart = aiResponse.find('{') if aiResponse else -1 + jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0 + if jsonStart != -1 and jsonEnd > jsonStart: + parsed = json.loads(aiResponse[jsonStart:jsonEnd]) + detectedLanguage = parsed.get('detectedLanguage') or None + normalizedRequest = parsed.get('normalizedRequest') or None + if parsed.get('intent'): + intentText = parsed.get('intent') + contextItems = parsed.get('contextItems') or [] + + # Extract intent analysis fields and store as workflowIntent + intentText = parsed.get('intent') or userInput.prompt + workflowIntent = { + 'intent': intentText, # Use intent instead of primaryGoal + 'dataType': parsed.get('dataType', 'unknown'), + 'expectedFormats': parsed.get('expectedFormats', []), + 'qualityRequirements': parsed.get('qualityRequirements', {}), + 'successCriteria': parsed.get('successCriteria', []), + 'languageUserDetected': detectedLanguage, + 'needsWorkflowHistory': parsed.get('needsWorkflowHistory', False) + } + + # Store needsWorkflowHistory in services for fast path decision + needsHistoryFromIntention = parsed.get('needsWorkflowHistory', False) + # Always set the value - default to False if not a boolean + setattr(self.services, '_needsWorkflowHistory', bool(needsHistoryFromIntention) if 
isinstance(needsHistoryFromIntention, bool) else False) + + # Store workflowIntent in workflow object for reuse + if hasattr(self.services, 'workflow') and self.services.workflow: + self.services.workflow._workflowIntent = workflowIntent + except Exception: + contextItems = [] + workflowIntent = None + # Ensure needsWorkflowHistory is False if parsing fails + setattr(self.services, '_needsWorkflowHistory', False) + + # Update services state + # CRITICAL: Validate language from AI response + # If AI didn't return language or invalid → use user language + # If user language not set → use "en" + validatedLanguage = None + + # Validate AI-detected language + if detectedLanguage and isinstance(detectedLanguage, str): + detectedLanguage = detectedLanguage.strip().lower() + # Check if it's a valid 2-character ISO code + if len(detectedLanguage) == 2 and detectedLanguage.isalpha(): + validatedLanguage = detectedLanguage + + # If AI didn't return valid language, use user language + if not validatedLanguage: + userLanguage = getattr(self.services.user, 'language', None) if hasattr(self.services, 'user') and self.services.user else None + if userLanguage and isinstance(userLanguage, str): + userLanguage = userLanguage.strip().lower() + if len(userLanguage) == 2 and userLanguage.isalpha(): + validatedLanguage = userLanguage + + # Final fallback to "en" + if not validatedLanguage: + validatedLanguage = "en" + logger.warning("Language not detected from AI and user language not set - using default 'en'") + + # Set validated language + self._setUserLanguage(validatedLanguage) + try: + setattr(self.services, 'currentUserLanguage', validatedLanguage) + logger.debug(f"Set currentUserLanguage to validated value: {validatedLanguage}") + except Exception: + pass + self.services.currentUserPrompt = intentText or userInput.prompt + # Always set currentUserPromptNormalized - use normalizedRequest if available, otherwise fallback to currentUserPrompt + # CRITICAL: normalizedRequest MUST be 
used if available, do NOT fall back to intent + if normalizedRequest and normalizedRequest.strip(): + # Use normalizedRequest if available and not empty + self.services.currentUserPromptNormalized = normalizedRequest + logger.debug(f"Stored normalized request from analysis (length: {len(normalizedRequest)})") + else: + # Fallback only if normalizedRequest is None or empty + logger.warning(f"normalizedRequest is None or empty in analysis, falling back to intentText. normalizedRequest={normalizedRequest}, intentText={intentText}") + self.services.currentUserPromptNormalized = intentText or userInput.prompt + if contextItems is not None: + self.services.currentUserContextItems = contextItems + + # Update message with normalized request if analysis produced one + if normalizedRequest and normalizedRequest != userInput.prompt: + messageData["message"] = normalizedRequest + logger.debug(f"Updated first message with normalized request (length: {len(normalizedRequest)})") + + # Create documents for context items + if contextItems and isinstance(contextItems, list): + for idx, item in enumerate(contextItems): + try: + title = item.get('title') if isinstance(item, dict) else None + mime = item.get('mimeType') if isinstance(item, dict) else None + content = item.get('content') if isinstance(item, dict) else None + if not content: + continue + fileName = (title or f"user_context_{idx+1}.txt").strip() + mimeType = (mime or "text/plain").strip() + + # Neutralize content before storing if neutralization is enabled + contentBytes = content.encode('utf-8') + contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType) + + # Create file in component storage + fileItem = self.services.interfaceDbComponent.createFile( + name=fileName, + mimeType=mimeType, + content=contentBytes + ) + # Persist file data + self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes) + + # Collect file info + fileInfo = self.services.chat.getFileInfo(fileItem.id) + from 
modules.datamodels.datamodelChat import ChatDocument + doc = ChatDocument( + fileId=fileItem.id, + fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName, + fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes), + mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType + ) + createdDocs.append(doc) + except Exception: + continue + except Exception as e: + logger.warning(f"Prompt analysis failed or skipped: {str(e)}") + + # Process user-uploaded documents (fileIds) and combine with context documents + if userInput.listFileId: + try: + userDocs = await self._processFileIds(userInput.listFileId, None) + if userDocs: + createdDocs.extend(userDocs) + except Exception as e: + logger.warning(f"Failed to process user fileIds: {e}") + + # Finally, persist and bind the first message with combined documents (context + user) + self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs) + + # Create ChatMessage with success criteria (KPI) AFTER the first user message + # This ensures the KPI message appears after the user message in the UI + workflowIntent = getattr(workflow, '_workflowIntent', None) + if workflowIntent and isinstance(workflowIntent, dict): + successCriteria = workflowIntent.get('successCriteria', []) + if successCriteria and isinstance(successCriteria, list) and len(successCriteria) > 0: + try: + # Format success criteria as message with "KPI" title + criteriaText = "**KPI**\n\n" + "\n".join([f"• {criterion}" for criterion in successCriteria]) + + kpiMessageData = { + "workflowId": workflow.id, + "role": "system", + "message": criteriaText, + "summary": f"KPI: {len(successCriteria)} success criteria", + "status": "step", + "sequenceNr": len(workflow.messages) + 1, # After user message + "publishedAt": self.services.utils.timestampGetUtc(), + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0 + } + + self.services.chat.storeMessageWithDocuments(workflow, 
async def _planTasks(self, userInput: UserInputRequest):
    """Generate the task plan for workflow execution.

    Uses the normalized prompt produced by the earlier intent analysis
    (falling back to the raw user prompt) and delegates plan generation
    to the WorkflowProcessor.

    Args:
        userInput: The original user request; its prompt is the fallback
            when no normalized prompt is stored on the services object.

    Returns:
        The generated task plan (guaranteed to contain at least one task).

    Raises:
        Exception: If the processor produced no plan or an empty task list.
    """
    workflow = self.services.workflow
    handling = self.workflowProcessor
    # Prefer the sanitized/normalized prompt over the raw user input for security.
    normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
    taskPlan = await handling.generateTaskPlan(normalizedPrompt, workflow)
    if not taskPlan or not taskPlan.tasks:
        raise Exception("No tasks generated in task plan.")
    # Fixed: getattr without a default could raise AttributeError; also removed
    # the INFO-level dump of the entire workflow.__dict__ (noisy, leaks state).
    workflowMode = getattr(workflow, 'workflowMode', None)
    logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks")
    return taskPlan
async def _executeTasks(self, taskPlan) -> None:
    """Execute every task in the task plan and mark the workflow completed.

    Opens a root "Service Workflow Execution" progress entry, runs the tasks
    sequentially (each with a fresh TaskContext), persists the final action
    result of each task, and threads handover data between tasks. The root
    progress entry is always closed, even when execution raises.
    """
    # Fixed: this import previously ran inside the per-task loop (and an
    # unused ActionResult import alongside it); hoisted and pruned.
    from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult

    workflow = self.services.workflow
    handling = self.workflowProcessor
    totalTasks = len(taskPlan.tasks)
    allTaskResults: List = []
    previousResults: List[str] = []

    # Root progress entry - parent of all per-task progress entries.
    workflowExecOperationId = f"workflowExec_{workflow.id}"
    self.services.chat.progressLogStart(
        workflowExecOperationId,
        "Service",
        "Workflow Execution",
        f"Executing {totalTasks} task(s)"
    )
    # Expose the operation id so task execution can attach child entries.
    handling.workflowExecOperationId = workflowExecOperationId

    try:
        for idx, taskStep in enumerate(taskPlan.tasks):
            currentTaskIndex = idx + 1
            logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}")

            # Update workflow state before executing task (fixes "Task 0" issue)
            handling.updateWorkflowBeforeExecutingTask(currentTaskIndex)

            # Build TaskContext (mode-specific behavior lives in WorkflowProcessor)
            taskContext = TaskContext(
                taskStep=taskStep,
                workflow=workflow,
                workflowId=workflow.id,
                availableDocuments=None,
                availableConnections=None,
                previousResults=previousResults,
                previousHandover=None,
                improvements=[],
                retryCount=0,
                previousActionResults=[],
                previousReviewResult=None,
                isRegeneration=False,
                failurePatterns=[],
                failedActions=[],
                successfulActions=[],
                criteriaProgress={
                    'met_criteria': set(),
                    'unmet_criteria': set(),
                    'attempt_history': []
                }
            )

            taskResult = await handling.executeTask(taskStep, workflow, taskContext)

            # Final ActionResult for persistence: prefer the result attached to
            # the task, fall back to the last recorded action result.
            finalActionResult = None
            if hasattr(taskResult, 'actionResult'):
                finalActionResult = taskResult.actionResult
            elif taskContext.previousActionResults and len(taskContext.previousActionResults) > 0:
                finalActionResult = taskContext.previousActionResults[-1]

            # Persist task result (creates ChatMessage + ChatDocuments) so later
            # tasks/rounds can reference the produced documents.
            if finalActionResult:
                workflowTaskResult = WorkflowTaskResult(
                    taskId=taskStep.id,
                    actionResult=finalActionResult
                )
                await handling.persistTaskResult(workflowTaskResult, workflow, taskContext)

            handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow)
            allTaskResults.append({
                'taskStep': taskStep,
                'taskResult': taskResult,
                'handoverData': handoverData
            })
            if taskResult.success and taskResult.feedback:
                previousResults.append(taskResult.feedback)

        # Mark workflow as completed; error/stop cases update status elsewhere
        workflow.status = "completed"
    finally:
        # Always close the root "Service Workflow Execution" entry.
        self.services.chat.progressLogFinish(workflowExecOperationId, True)
async def _processWorkflowResults(self) -> None:
    """Create the final workflow message based on the workflow's outcome.

    Four outcomes are handled: a stop detected while processing results, a
    workflow already marked 'stopped', a workflow marked 'failed', and the
    success path (delegated to _sendLastMessage). Unexpected errors are
    converted into a final error message and the workflow is marked failed.
    """
    # Fixed: resolve the workflow BEFORE the try block so the except handler
    # can always reference it (previously `workflow` could be unbound there,
    # raising NameError and masking the original error).
    workflow = self.services.workflow
    try:
        try:
            checkWorkflowStopped(self.services)
        except WorkflowStoppedException:
            logger.info(f"Workflow {workflow.id} was stopped during result processing")

            # Create final stopped message
            stoppedMessage = {
                "workflowId": workflow.id,
                "role": "assistant",
                "message": "🛑 Workflow stopped by user",
                "status": "last",
                "sequenceNr": len(workflow.messages) + 1,
                "publishedAt": self.services.utils.timestampGetUtc(),
                "documentsLabel": "workflow_stopped",
                "documents": [],
                # Workflow context fields
                "roundNumber": workflow.currentRound,
                "taskNumber": 0,
                "actionNumber": 0,
                # Progress status
                "taskProgress": "stopped",
                "actionProgress": "stopped"
            }
            self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, [])

            # Update workflow status to stopped
            workflow.status = "stopped"
            workflow.lastActivity = self.services.utils.timestampGetUtc()
            self.services.chat.updateWorkflow(workflow.id, {
                "status": "stopped",
                "lastActivity": workflow.lastActivity
            })
            return

        if workflow.status == 'stopped':
            # Workflow was already stopped earlier during execution.
            stopped_message = {
                "workflowId": workflow.id,
                "role": "assistant",
                "message": "🛑 Workflow stopped by user",
                "status": "last",
                "sequenceNr": len(workflow.messages) + 1,
                "publishedAt": self.services.utils.timestampGetUtc(),
                "documentsLabel": "workflow_stopped",
                "documents": [],
                # Workflow context fields
                "roundNumber": workflow.currentRound,
                "taskNumber": 0,
                "actionNumber": 0,
                # Progress status
                "taskProgress": "stopped",
                "actionProgress": "stopped"
            }
            self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])

            workflow.status = "stopped"
            workflow.lastActivity = self.services.utils.timestampGetUtc()
            self.services.chat.updateWorkflow(workflow.id, {
                "status": "stopped",
                "lastActivity": workflow.lastActivity,
                "totalTasks": workflow.totalTasks,
                "totalActions": workflow.totalActions
            })

            # Add stopped log entry
            self.services.chat.storeLog(workflow, {
                "message": "Workflow stopped by user",
                "type": "warning",
                "status": "stopped",
                "progress": 1.0
            })
            return
        elif workflow.status == 'failed':
            # NOTE(review): no error detail reaches this branch, hence the
            # static text (was an f-string around a literal before; the
            # runtime string is unchanged).
            errorMessage = {
                "workflowId": workflow.id,
                "role": "assistant",
                "message": "Workflow failed: Unknown error",
                "status": "last",
                "sequenceNr": len(workflow.messages) + 1,
                "publishedAt": self.services.utils.timestampGetUtc(),
                "documentsLabel": "workflow_failure",
                "documents": [],
                # Workflow context fields
                "roundNumber": workflow.currentRound,
                "taskNumber": 0,
                "actionNumber": 0,
                # Progress status
                "taskProgress": "fail",
                "actionProgress": "fail"
            }
            self.services.chat.storeMessageWithDocuments(workflow, errorMessage, [])

            workflow.status = "failed"
            workflow.lastActivity = self.services.utils.timestampGetUtc()
            self.services.chat.updateWorkflow(workflow.id, {
                "status": "failed",
                "lastActivity": workflow.lastActivity,
                "totalTasks": workflow.totalTasks,
                "totalActions": workflow.totalActions
            })

            # Add failed log entry
            self.services.chat.storeLog(workflow, {
                "message": "Workflow failed: Unknown error",
                "type": "error",
                "status": "failed",
                "progress": 1.0
            })
            return

        # For successful workflows, send the detailed completion message.
        await self._sendLastMessage()

    except Exception as e:
        logger.error(f"Error processing workflow results: {str(e)}")
        # Convert the unexpected error into a final error message.
        error_message = {
            "workflowId": workflow.id,
            "role": "assistant",
            "message": f"Error processing workflow results: {str(e)}",
            "status": "last",
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": self.services.utils.timestampGetUtc(),
            "documentsLabel": "workflow_error",
            "documents": [],
            # Workflow context fields
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0,
            # Progress status
            "taskProgress": "fail",
            "actionProgress": "fail"
        }
        self.services.chat.storeMessageWithDocuments(workflow, error_message, [])

        workflow.status = "failed"
        workflow.lastActivity = self.services.utils.timestampGetUtc()
        self.services.chat.updateWorkflow(workflow.id, {
            "status": "failed",
            "lastActivity": workflow.lastActivity,
            "totalTasks": workflow.totalTasks,
            "totalActions": workflow.totalActions
        })
except Exception as e: + logger.error(f"Error processing workflow results: {str(e)}") + # Create error message + error_message = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"Error processing workflow results: {str(e)}", + "status": "last", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": "workflow_error", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "fail", + "actionProgress": "fail" + } + self.services.chat.storeMessageWithDocuments(workflow, error_message, []) + + # Update workflow status to failed + workflow.status = "failed" + workflow.lastActivity = self.services.utils.timestampGetUtc() + self.services.chat.updateWorkflow(workflow.id, { + "status": "failed", + "lastActivity": workflow.lastActivity, + "totalTasks": workflow.totalTasks, + "totalActions": workflow.totalActions + }) + + async def _sendLastMessage(self) -> None: + """Send last message to complete workflow (only for successful workflows)""" + try: + workflow = self.services.workflow + # Safety check: ensure this is only called for successful workflows + if workflow.status in ['stopped', 'failed']: + logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") + return + + # Generate feedback + feedback = await self._generateWorkflowFeedback() + + # Create last message using interface + messageData = { + "workflowId": workflow.id, + "role": "assistant", + "message": feedback, + "status": "last", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": self.services.utils.timestampGetUtc(), + "documentsLabel": "workflow_feedback", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "success", + "actionProgress": "success" + } 
async def _generateWorkflowFeedback(self) -> str:
    """Build the human-readable completion summary for the workflow.

    Returns a short text with per-role message counts and a closing line
    that depends on the workflow status. Falls back to a generic message
    when anything goes wrong (including a detected stop).
    """
    try:
        workflow = self.services.workflow
        checkWorkflowStopped(self.services)

        # Count messages by role
        userMessages = [msg for msg in workflow.messages if msg.role == 'user']
        assistantMessages = [msg for msg in workflow.messages if msg.role == 'assistant']

        # Fixed: the first literal was an f-string without placeholders (F541).
        feedback = "Workflow completed.\n\n"
        feedback += f"Processed {len(userMessages)} user inputs and generated {len(assistantMessages)} responses.\n"

        # Status-dependent closing line.
        if workflow.status == "completed":
            feedback += "All tasks completed successfully."
        elif workflow.status == "partial":
            feedback += "Some tasks completed with partial success."
        else:
            feedback += f"Workflow status: {workflow.status}"

        return feedback

    except Exception as e:
        logger.error(f"Error generating workflow feedback: {str(e)}")
        return "Workflow processing completed."
def _handleWorkflowStop(self) -> None:
    """React to a user-initiated stop: persist status, final message, and log."""
    workflow = self.services.workflow
    logger.info("Workflow stopped by user")

    # Persist the stopped state first.
    workflow.status = "stopped"
    workflow.lastActivity = self.services.utils.timestampGetUtc()
    self.services.chat.updateWorkflow(workflow.id, {
        "status": "stopped",
        "lastActivity": workflow.lastActivity,
        "totalTasks": workflow.totalTasks,
        "totalActions": workflow.totalActions
    })

    # Then publish the closing chat message.
    finalMessage = {
        "workflowId": workflow.id,
        "role": "assistant",
        "message": "🛑 Workflow stopped by user",
        "status": "last",
        "sequenceNr": len(workflow.messages) + 1,
        "publishedAt": self.services.utils.timestampGetUtc(),
        "documentsLabel": "workflow_stopped",
        "documents": [],
        # Workflow context fields
        "roundNumber": workflow.currentRound,
        "taskNumber": 0,
        "actionNumber": 0,
        # Progress status
        "taskProgress": "pending",
        "actionProgress": "pending"
    }
    self.services.chat.storeMessageWithDocuments(workflow, finalMessage, [])

    # And record the stop in the workflow log.
    self.services.chat.storeLog(workflow, {
        "message": "Workflow stopped by user",
        "type": "warning",
        "status": "stopped",
        "progress": 1.0
    })
def _handleWorkflowError(self, error: Exception) -> None:
    """Handle a workflow processing error: persist failure state, then re-raise.

    Args:
        error: The exception that aborted workflow processing.

    Raises:
        Exception: Always re-raises ``error`` after recording the failure.
    """
    workflow = self.services.workflow
    logger.error(f"Workflow processing error: {str(error)}")

    # Update workflow status to failed
    workflow.status = "failed"
    workflow.lastActivity = self.services.utils.timestampGetUtc()
    self.services.chat.updateWorkflow(workflow.id, {
        "status": "failed",
        "lastActivity": workflow.lastActivity,
        "totalTasks": workflow.totalTasks,
        "totalActions": workflow.totalActions
    })

    # Create error message
    error_message = {
        "workflowId": workflow.id,
        "role": "assistant",
        "message": f"Workflow processing failed: {str(error)}",
        "status": "last",
        "sequenceNr": len(workflow.messages) + 1,
        "publishedAt": self.services.utils.timestampGetUtc(),
        "documentsLabel": "workflow_error",
        "documents": [],
        # Workflow context fields
        "roundNumber": workflow.currentRound,
        "taskNumber": 0,
        "actionNumber": 0,
        # Progress status
        "taskProgress": "fail",
        "actionProgress": "fail"
    }
    self.services.chat.storeMessageWithDocuments(workflow, error_message, [])

    # Add error log entry
    self.services.chat.storeLog(workflow, {
        "message": f"Workflow failed: {str(error)}",
        "type": "error",
        "status": "failed",
        "progress": 1.0
    })

    # Fixed: a bare `raise` is only valid while an exception is being handled;
    # re-raising the passed-in error explicitly also works when this method is
    # called outside an active except block.
    raise error
+ """ + documents = [] + + workflow = self.services.workflow + + for fileId in fileIds: + try: + # Get file info from chat service + fileInfo = self.services.chat.getFileInfo(fileId) + if not fileInfo: + logger.warning(f"No file info found for file ID {fileId}") + continue + + originalFileName = fileInfo.get("fileName", "unknown") + originalMimeType = fileInfo.get("mimeType", "application/octet-stream") + fileSizeToUse = fileInfo.get("size", 0) + + # NOTE: Neutralization removed from here - it should happen in extractContent action + # after content extraction but before AI processing (for dynamic workflows) + # This ensures we neutralize extracted data (ContentPart.data), not ChatDocuments + + # Create document with original file ID (no neutralization) + document = ChatDocument( + id=str(uuid.uuid4()), + messageId=messageId or "", + fileId=fileId, + fileName=originalFileName, + fileSize=fileSizeToUse, + mimeType=originalMimeType + ) + documents.append(document) + logger.info(f"Processed file ID {fileId} -> {document.fileName}") + except Exception as e: + errorMsg = f"Error processing file ID {fileId}: {str(e)}" + logger.error(errorMsg) + self.services.chat.storeLog(workflow, { + "message": errorMsg, + "type": "error", + "status": "error", + "progress": -1 + }) + return documents + + + def _setUserLanguage(self, language: str) -> None: + """Set user language for the service center""" + self.services.user.language = language + + async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes: + """Neutralize content if neutralization is enabled in user settings""" + try: + # Check if neutralization is enabled + config = self.services.neutralization.getConfig() + if not config or not config.enabled: + return contentBytes + + # Decode content to text for neutralization + try: + textContent = contentBytes.decode('utf-8') + except UnicodeDecodeError: + # Try alternative encodings + for enc in ['latin-1', 'cp1252', 'iso-8859-1']: + try: + 
textContent = contentBytes.decode(enc) + break + except UnicodeDecodeError: + continue + else: + # If unable to decode, return original bytes (binary content) + logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}") + return contentBytes + + # Neutralize the text content + # Note: The neutralization service should use names from config when processing + result = self.services.neutralization.processText(textContent) + if result and 'neutralized_text' in result: + neutralizedText = result['neutralized_text'] + # Encode back to bytes using the same encoding + try: + return neutralizedText.encode('utf-8') + except Exception as e: + logger.warning(f"Error encoding neutralized text: {str(e)}") + return contentBytes + else: + logger.warning("Neutralization did not return neutralized_text") + return contentBytes + except Exception as e: + logger.error(f"Error during content neutralization: {str(e)}") + # Return original content on error + return contentBytes + + def _checkIfHistoryAvailable(self) -> bool: + """Check if workflow history is available (previous rounds exist). + + Returns True if there are previous workflow rounds with messages. + """ + try: + from modules.workflows.processing.shared.placeholderFactory import getPreviousRoundContext + + history = getPreviousRoundContext(self.services) + + # Check if history contains actual content (not just "No previous round context available") + if history and history != "No previous round context available": + return True + + return False + except Exception as e: + logger.error(f"Error checking if history is available: {str(e)}") + return False