#!/usr/bin/env python3
"""
Workflow Test with Documents - Tests chat workflow execution with uploaded documents
Simulates the UI route flow: upload files, start workflow with prompt and documents
"""

import asyncio
import json
import sys
import os
import time
from typing import Dict, Any, List, Optional

# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)

# Import the service initialization
from modules.services import getInterface as getServices
from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelUam import User
from modules.features.chatPlayground.mainChatPlayground import chatStart
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects

# Workflow statuses that end the polling loop in waitForWorkflowCompletion.
_TERMINAL_STATUSES = ("completed", "stopped", "failed")


class WorkflowWithDocumentsTester:
    """Drive one chat workflow run with two uploaded documents and report on it.

    The intended call sequence (see ``runTest``) is: ``initialize`` ->
    ``uploadFiles`` -> ``startWorkflow`` -> ``waitForWorkflowCompletion`` ->
    ``analyzeWorkflowResults``.
    """

    def __init__(self) -> None:
        # Use root user for testing (has full access to everything)
        from modules.interfaces.interfaceDbAppObjects import getRootInterface
        rootInterface = getRootInterface()
        self.testUser = rootInterface.currentUser

        # Initialize services using the existing system
        self.services = getServices(self.testUser, None)  # Test user, no workflow
        self.workflow = None  # Set by startWorkflow / refreshed on completion
        self.testResults = {}  # Filled by runTest

    async def initialize(self) -> None:
        """Initialize the test environment."""
        # Set logging level to INFO to see workflow progress
        import logging
        logging.getLogger().setLevel(logging.INFO)

        print(f"Initialized test with user: {self.testUser.id}")
        print(f"Mandate ID: {self.testUser.mandateId}")

    def createCsvTemplate(self) -> str:
        """Create a CSV template file for prime numbers.

        Returns:
            CSV text with the header ``Primzahl,Index`` followed by the first
            ten primes and their 1-based index, one row per line, terminated
            by a newline.
        """
        primes = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29)
        rows = ["Primzahl,Index"]
        rows.extend(f"{prime},{index}" for index, prime in enumerate(primes, start=1))
        return "\n".join(rows) + "\n"

    def createSecondDocument(self) -> str:
        """Create a second text document with instructions.

        Returns:
            The (German) instruction text, one numbered item per line,
            terminated by a newline.
        """
        lines = [
            "Anweisungen zur Primzahlgenerierung:",
            "1. Generiere die ersten 5000 Primzahlen",
            "2. Formatiere sie in einer Tabelle mit 10 Spalten pro Zeile",
            "3. Verwende das bereitgestellte CSV-Vorlagenformat",
            "4. Stelle sicher, dass alle Zahlen korrekt formatiert sind",
            "5. Füge eine Index-Spalte hinzu, die bei 1 beginnt",
        ]
        return "\n".join(lines) + "\n"

    def _storeFile(self, fileName: str, mimeType: str, payload: bytes):
        """Create a file in component storage, persist its data, return the file item."""
        component = self.services.interfaceDbComponent
        fileItem = component.createFile(
            name=fileName,
            mimeType=mimeType,
            content=payload
        )
        # Persist file data
        component.createFileData(fileItem.id, payload)
        return fileItem

    async def uploadFiles(self) -> List[str]:
        """Upload test files to the filesystem and return their file IDs.

        Returns:
            The IDs of the created CSV template and instruction document,
            in that order.
        """
        print("\n" + "="*60)
        print("UPLOADING TEST FILES")
        print("="*60)

        fileIds = []

        # Create CSV template file. Encode once so the reported length is the
        # real UTF-8 byte count and the same payload is used for both calls.
        csvFileName = "prime_numbers_template.csv"
        csvPayload = self.createCsvTemplate().encode('utf-8')

        print(f"Creating CSV template: {csvFileName}")
        print(f"Content length: {len(csvPayload)} bytes")

        csvFileItem = self._storeFile(csvFileName, "text/csv", csvPayload)
        fileIds.append(csvFileItem.id)
        print(f"✅ Created CSV file with ID: {csvFileItem.id}")
        print(f" File name: {csvFileItem.fileName}")
        print(f" MIME type: {csvFileItem.mimeType}")

        # Create second text document (contains non-ASCII text, so the byte
        # count differs from the character count).
        docFileName = "prime_numbers_instructions.txt"
        docPayload = self.createSecondDocument().encode('utf-8')

        print(f"\nCreating instruction document: {docFileName}")
        print(f"Content length: {len(docPayload)} bytes")

        docFileItem = self._storeFile(docFileName, "text/plain", docPayload)
        fileIds.append(docFileItem.id)
        print(f"✅ Created instruction file with ID: {docFileItem.id}")
        print(f" File name: {docFileItem.fileName}")
        print(f" MIME type: {docFileItem.mimeType}")

        return fileIds

    async def startWorkflow(self, prompt: str, fileIds: List[str]) -> None:
        """Start a chat workflow with prompt and documents.

        Args:
            prompt: User prompt passed to the workflow.
            fileIds: IDs of previously uploaded files to attach.

        The workflow object returned by ``chatStart`` is stored in
        ``self.workflow``.
        """
        print("\n" + "="*60)
        print("STARTING WORKFLOW")
        print("="*60)

        print(f"Prompt: {prompt}")
        print(f"Number of files: {len(fileIds)}")
        print(f"File IDs: {fileIds}")

        # Create UserInputRequest
        userInput = UserInputRequest(
            prompt=prompt,
            listFileId=fileIds,
            userLanguage="en"
        )

        # Start workflow (this is async and returns immediately)
        print("\nCalling chatStart...")
        self.workflow = await chatStart(
            currentUser=self.testUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
            workflowId=None
        )

        print(f"✅ Workflow started with ID: {self.workflow.id}")
        print(f" Status: {self.workflow.status}")
        print(f" Mode: {self.workflow.workflowMode}")
        print(f" Current Round: {self.workflow.currentRound}")

    async def waitForWorkflowCompletion(self, maxWaitTime: int = 300) -> bool:
        """Wait for workflow to complete, checking status periodically.

        Args:
            maxWaitTime: Maximum number of seconds to keep polling.

        Returns:
            True only if the workflow reached status ``"completed"``. False if
            there is no workflow, it disappeared from the database, it ended
            as ``"stopped"``/``"failed"``, or the wait timed out.
        """
        print("\n" + "="*60)
        print("WAITING FOR WORKFLOW COMPLETION")
        print("="*60)

        if not self.workflow:
            print("❌ No workflow to wait for")
            return False

        # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
        startTime = time.monotonic()
        checkInterval = 2  # Check every 2 seconds
        lastStatus = None

        while time.monotonic() - startTime < maxWaitTime:
            # Get current workflow status
            interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
            currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id)

            if not currentWorkflow:
                print("❌ Workflow not found in database")
                return False

            currentStatus = currentWorkflow.status

            # Print status if it changed
            if currentStatus != lastStatus:
                elapsed = int(time.monotonic() - startTime)
                print(f"Workflow status: {currentStatus} (elapsed: {elapsed}s)")
                lastStatus = currentStatus

            # Check if workflow is complete
            if currentStatus in _TERMINAL_STATUSES:
                self.workflow = currentWorkflow
                print(f"\n✅ Workflow finished with status: {currentStatus}")
                return currentStatus == "completed"

            # Wait before next check
            await asyncio.sleep(checkInterval)

        # self.workflow is not refreshed while polling, so report the last
        # polled status; fall back to the stored one if no poll happened.
        finalStatus = lastStatus if lastStatus is not None else self.workflow.status
        print(f"\n⚠️ Workflow did not complete within {maxWaitTime} seconds")
        print(f" Final status: {finalStatus}")
        return False

    def analyzeWorkflowResults(self) -> Dict[str, Any]:
        """Analyze workflow results and extract information.

        Returns:
            A dict with workflow metadata, counts and the raw ``messages``,
            ``documents``, ``logs`` and ``stats`` lists, or a dict with a
            single ``"error"`` key if there is no workflow or it cannot be
            loaded.
        """
        print("\n" + "="*60)
        print("ANALYZING WORKFLOW RESULTS")
        print("="*60)

        if not self.workflow:
            return {"error": "No workflow to analyze"}

        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
        workflow = interfaceDbChat.getWorkflow(self.workflow.id)

        if not workflow:
            return {"error": "Workflow not found"}

        # Get unified chat data
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)

        # `or []` also covers keys that are present but set to None.
        messages = chatData.get("messages") or []
        userMessages = [m for m in messages if m.get("role") == "user"]
        assistantMessages = [m for m in messages if m.get("role") == "assistant"]
        documents = chatData.get("documents") or []
        logs = chatData.get("logs") or []
        stats = chatData.get("stats") or []

        results = {
            "workflowId": workflow.id,
            "status": workflow.status,
            "workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None,
            "currentRound": workflow.currentRound,
            "totalTasks": workflow.totalTasks,
            "totalActions": workflow.totalActions,
            "messageCount": len(messages),
            "userMessageCount": len(userMessages),
            "assistantMessageCount": len(assistantMessages),
            "documentCount": len(documents),
            "logCount": len(logs),
            "statCount": len(stats),
            "messages": messages,
            "documents": documents,
            "logs": logs,
            "stats": stats
        }

        print(f"Workflow ID: {results['workflowId']}")
        print(f"Status: {results['status']}")
        print(f"Mode: {results['workflowMode']}")
        print(f"Round: {results['currentRound']}")
        print(f"Tasks: {results['totalTasks']}")
        print(f"Actions: {results['totalActions']}")
        print(
            f"Messages: {results['messageCount']} "
            f"(User: {results['userMessageCount']}, "
            f"Assistant: {results['assistantMessageCount']})"
        )
        print(f"Documents: {results['documentCount']}")
        print(f"Logs: {results['logCount']}")
        print(f"Stats: {results['statCount']}")

        # Print first user message (a None message is treated as empty text)
        if userMessages:
            print("\nFirst user message:")
            print(f" {(userMessages[0].get('message') or '')[:200]}...")

        # Print last assistant message
        if assistantMessages:
            print("\nLast assistant message:")
            lastMsg = assistantMessages[-1]
            print(f" {(lastMsg.get('message') or '')[:200]}...")
            if lastMsg.get('documents'):
                print(f" Documents attached: {len(lastMsg['documents'])}")

        # Print document names
        if documents:
            print("\nGenerated documents:")
            for doc in documents:
                print(f" - {doc.get('fileName', 'unknown')} ({doc.get('fileSize', 0)} bytes)")

        return results

    async def runTest(self) -> Dict[str, Any]:
        """Run the complete test.

        Returns:
            ``{"completed": bool, "results": dict}`` on a normal run, or
            ``{"completed": False, "error": str, "traceback": str}`` if any
            step raised. The same dict is stored in ``self.testResults``.
        """
        print("\n" + "="*80)
        print("WORKFLOW TEST WITH DOCUMENTS")
        print("="*80)

        try:
            # Initialize
            await self.initialize()

            # Upload files
            fileIds = await self.uploadFiles()

            # Start workflow with prompt and files
            prompt = "Generiere die ersten 5000 Primzahlen in einer Tabelle mit 10 Spalten pro Zeile."
            await self.startWorkflow(prompt, fileIds)

            # Wait for completion
            completed = await self.waitForWorkflowCompletion(maxWaitTime=300)

            # Analyze results
            results = self.analyzeWorkflowResults()

            self.testResults = {
                "completed": completed,
                "results": results
            }

            print("\n" + "="*80)
            print("TEST SUMMARY")
            print("="*80)
            print(f"Workflow completed: {'✅' if completed else '❌'}")
            print(f"Status: {results.get('status', 'unknown')}")
            print(f"Messages: {results.get('messageCount', 0)}")
            print(f"Documents: {results.get('documentCount', 0)}")

            return self.testResults

        except Exception as e:
            # Top-level boundary of the test run: report the failure in the
            # result dict instead of propagating it.
            import traceback
            formattedTraceback = traceback.format_exc()
            print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
            print(f"Traceback:\n{formattedTraceback}")
            self.testResults = {
                "completed": False,
                "error": str(e),
                "traceback": formattedTraceback
            }
            return self.testResults


async def main():
    """Run workflow test with documents."""
    tester = WorkflowWithDocumentsTester()
    results = await tester.runTest()

    # Print final results as JSON for easy parsing
    print("\n" + "="*80)
    print("FINAL RESULTS (JSON)")
    print("="*80)
    print(json.dumps(results, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())