"""
Test Module for Workflow State Machine

This script tests each state of the workflow state machine implementation
from initialization to completion, including error scenarios.
Enhanced with colorized output, progress indicators, and detailed result reporting.
"""

import os
import sys
import uuid
import json
import base64
import asyncio
import unittest
from unittest.mock import patch, MagicMock, AsyncMock
from datetime import datetime, timedelta
from typing import Dict, List, Any
import time
import traceback

# Try to import colorama, install if not available
try:
    from colorama import init, Fore, Back, Style
    init()  # Initialize colorama
except ImportError:
    print("Installing required package: colorama")
    import subprocess
    subprocess.call([sys.executable, "-m", "pip", "install", "colorama"])
    from colorama import init, Fore, Back, Style
    init()  # Initialize colorama

# Add parent directory to path for imports if needed
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)


def _reset_instance_mocks(obj):
    """Call ``reset_mock()`` on every instance attribute of *obj* that has one.

    ``reset_mock()`` is called without arguments, so configured return values
    and side effects are kept; only the recorded calls are cleared.
    """
    for attr in vars(obj).values():
        if hasattr(attr, 'reset_mock'):
            attr.reset_mock()


# Mock modules for testing environment
class MockDomInterface:
    """Stand-in for the DOM interface used by the workflow manager.

    Every operation the tests touch is an individual ``MagicMock`` (or
    ``AsyncMock`` for ``callAi``), so calls can be configured and asserted
    per attribute.
    """

    def __init__(self, *args, **kwargs):
        # Positional/keyword arguments are accepted and ignored so the mock
        # can be constructed like the real interface.
        self.getWorkflow = MagicMock(return_value=None)
        self.loadWorkflowState = MagicMock(return_value=None)
        self.createWorkflow = MagicMock()
        self.updateWorkflow = MagicMock()
        # Used by test_state_11_workflow_deletion; without this attribute the
        # test fails with AttributeError because this is not a MagicMock.
        self.deleteWorkflow = MagicMock()
        self.createWorkflowLog = MagicMock()
        self.createWorkflowMessage = MagicMock()
        self.getFile = MagicMock()
        self.getFileData = MagicMock()
        self.saveUploadedFile = MagicMock()
        self.userLanguage = "en"
        self.callAi = AsyncMock()
        self.setUserLanguage = MagicMock()

    def reset_mock(self):
        """Reset all mocks in this interface"""
        _reset_instance_mocks(self)


class MockAgentRegistry:
    """Stand-in for the agent registry; exposes a single ``test_agent`` entry."""

    def __init__(self):
        self.getAgent = MagicMock()
        self.getAgentInfos = MagicMock(return_value=[
            {"name": "test_agent", "description": "Test agent", "capabilities": ["text_processing"]}
        ])
        self.setMydom = MagicMock()

    def reset_mock(self):
        """Reset all mocks in this registry.

        Required by ``TestWorkflowStateMachine._reset_mocks``, which calls
        ``reset_mock()`` on the registry just like on the DOM interface.
        """
        _reset_instance_mocks(self)


# Patching the imports - this allows tests to run even without the actual modules
sys.modules['modules.lucydomInterface'] = MagicMock()
sys.modules['modules.lucydomInterface'].getLucydomInterface = MagicMock(return_value=MockDomInterface())
sys.modules['gateway.modules.workflowAgentsRegistry'] = MagicMock()
sys.modules['gateway.modules.workflowAgentsRegistry'].getAgentRegistry = MagicMock(return_value=MockAgentRegistry())
sys.modules['modules.documentProcessor'] = MagicMock()
sys.modules['modules.documentProcessor'].getDocumentContents = MagicMock()

# Import the module under test
try:
    from modules.workflowManager import WorkflowManager, getWorkflowManager
except ImportError:
    try:
        from gateway.modules.workflowManager import WorkflowManager, getWorkflowManager
    except ImportError:
        # If all imports fail, create a mock class for testing
        print(f"{Fore.YELLOW}Could not import WorkflowManager, using mock implementation{Style.RESET_ALL}")

        class WorkflowManager:
            """Minimal fallback used only when the real manager cannot be imported."""

            def __init__(self, mandateId, userId):
                self.mandateId = mandateId
                self.userId = userId
                self.mydom = MockDomInterface(mandateId, userId)
                self.agentRegistry = MockAgentRegistry()

            async def workflowStart(self, userInput, workflowId=None):
                """Return a freshly initialized workflow; ``userInput`` is ignored."""
                workflow = self.workflowInit(workflowId)
                return workflow

            def workflowInit(self, workflowId=None):
                """Build a running round-1 workflow dict, generating an id if none is given."""
                return {
                    "id": workflowId or str(uuid.uuid4()),
                    "mandateId": self.mandateId,
                    "userId": self.userId,
                    "messages": [],
                    "messageIds": [],
                    "logs": [],
                    "currentRound": 1,
                    "status": "running"
                }

            async def workflowStop(self, workflowId):
                """Return a stub workflow marked as stopped."""
                return {"id": workflowId, "status": "stopped"}

            async def workflowProcess(self, userInput, workflow):
                """Return the workflow unchanged."""
                return workflow

            def logAdd(self, workflow, message, level="info", progress=None):
                """Return a fixed log id without recording anything."""
                return "log_id"

            def parseJsonResponse(self, responseText):
                """Return an empty dict regardless of input."""
                return {}

            def getFilename(self, document):
                """Join the document's ``name`` and ``ext`` with a dot."""
                return document.get("name", "") + "." + document.get("ext", "")

        def getWorkflowManager(mandateId=0, userId=0):
            """Create a fallback ``WorkflowManager`` for the given mandate and user."""
            return WorkflowManager(mandateId, userId)


class TestWorkflowStateMachine:
    """Test case for workflow state machine implementation"""

    def setUp(self):
        """Set up test environment"""
        self.mandateId = 1
        self.userId = 1

        # Create mocks
        self._setup_mocks()

        # Create manager with mocked dependencies
        self.manager = self._create_workflow_manager()

        # Store test start time for duration calculation
        self.test_start_time = time.time()

    def _setup_mocks(self):
        """Set up mock objects for testing"""
        # Mock LucyDOM interface
        self.mydom_mock = MockDomInterface()
        self.mydom_mock.callAi = AsyncMock()
        self.mydom_mock.loadWorkflowState.return_value = None  # Default to no existing workflow
        self.mydom_mock.getWorkflow.return_value = None  # Default to no existing workflow
        self.mydom_mock.userLanguage = "en"  # Default language

        # Mock AgentRegistry
        self.registry_mock = MockAgentRegistry()
        test_agent = MagicMock()
        test_agent.processTask = AsyncMock()
        test_agent.mydom = self.mydom_mock
        self.registry_mock.getAgent.return_value = test_agent
        self.registry_mock.getAgentInfos.return_value = [
            {"name": "test_agent", "description": "Test agent for workflow", "capabilities": ["text_processing"]}
        ]

        # Mock getDocumentContents
        self.getDocumentContents_mock = MagicMock()
        self.getDocumentContents_mock.return_value = [
            {
                "name": "content_1",
                "sequenceNr": 1,
                "contentType": "text/plain",
                "data": "Test content",
                "metadata": {"isText": True, "base64Encoded": False}
            }
        ]

        # Setup default AI responses
        self.ai_responses = {
            "project_manager": json.dumps({
                "objFinalDocuments": ["result.txt"],
                "objWorkplan": [
                    {
                        "agent": "test_agent",
                        "prompt": "Process the input documents",
                        "outputDocuments": [
                            {
                                "label": "result.txt",
                                "prompt": "Generate a text document"
                            }
                        ],
                        "inputDocuments": []
                    }
                ],
                "objUserResponse": "I will process your request",
                "userLanguage": "en"
            }),
            "summary": "This is a summary",
            "content_summary": "Content summary",
            "agent_extract": "Extracted data",
            "final_response": "Here are your results"
        }

        # Configure AI service mock to return different responses based on context
        async def ai_side_effect(messages, produceUserAnswer=False, temperature=None):
            # Only the last message's content decides which canned reply is used.
            content = ""
            if messages and "content" in messages[-1]:
                content = messages[-1]["content"]
            lowered = content.lower()

            if "analyze the request and create" in lowered:
                return self.ai_responses["project_manager"]
            elif "summarize this" in lowered:
                return self.ai_responses["content_summary"]
            elif "review the promised" in lowered or "final" in lowered:
                return self.ai_responses["final_response"]
            elif "extracting information" in lowered:
                return self.ai_responses["agent_extract"]
            else:
                return self.ai_responses["summary"]

        self.mydom_mock.callAi.side_effect = ai_side_effect

        # Mock agent response
        async def process_task_side_effect(task):
            return {
                "feedback": "Task completed successfully",
                "documents": [
                    {
                        "label": "result.txt",
                        "content": "Generated content"
                    }
                ]
            }

        test_agent.processTask.side_effect = process_task_side_effect

    def _create_workflow_manager(self):
        """Create a workflow manager instance with mocked dependencies"""
        # Create the manager
        manager = WorkflowManager(self.mandateId, self.userId)

        # Inject mocks
        manager.mydom = self.mydom_mock
        manager.agentRegistry = self.registry_mock

        # Patch getDocumentContents if possible
        try:
            import modules.documentProcessor
            modules.documentProcessor.getDocumentContents = self.getDocumentContents_mock
        except (ImportError, AttributeError):
            # Best effort: the parent package may be missing (ImportError) or
            # may not expose the stubbed submodule as an attribute
            # (AttributeError). Either way the tests run without this patch.
            pass

        # Set up error-free mock for saveUploadedFile
        self.mydom_mock.saveUploadedFile.return_value = {"id": 1, "name": "test.txt"}

        return manager

    def _create_test_workflow(self):
        """Create a test workflow object"""
        workflow_id = str(uuid.uuid4())
        current_time = datetime.now().isoformat()

        return {
            "id": workflow_id,
            "mandateId": self.mandateId,
            "userId": self.userId,
            "name": f"Test Workflow {workflow_id[:8]}",
            "startedAt": current_time,
            "messages": [],
            "messageIds": [],
            "logs": [],
            "dataStats": {},
            "currentRound": 1,
            "status": "running",
            "lastActivity": current_time
        }

    def _create_test_message(self, workflow, content="Test message", role="user"):
        """Create a test message for a workflow"""
        message_id = f"msg_{uuid.uuid4()}"
        is_user = role == "user"

        return {
            "id": message_id,
            "workflowId": workflow["id"],
            "role": role,
            "agentName": "" if is_user else "test_agent",
            "content": content,
            "documents": [],
            "timestamp": datetime.now().isoformat(),
            "sequenceNo": len(workflow.get("messages", [])) + 1,
            "status": "first" if is_user else "step"
        }

    def _create_test_document(self, name="test.txt", content="Test content"):
        """Create a test document object"""
        doc_id = f"doc_{uuid.uuid4()}"
        stem, suffix = os.path.splitext(name)

        return {
            "id": doc_id,
            "fileId": 1,
            "name": stem,
            # Drop the leading dot; fall back to "txt" when there is no suffix.
            "ext": suffix[1:] if suffix else "txt",
            "data": base64.b64encode(content.encode('utf-8')).decode('utf-8'),
            "contents": [
                {
                    "sequenceNr": 1,
                    "name": "content_1",
                    "ext": "txt",
                    "contentType": "text/plain",
                    "data": content,
                    "dataExtracted": "Extracted content",
                    "metadata": {
                        "isText": True,
                        "base64Encoded": False,
                        "aiProcessed": False
                    },
                    "summary": "Content summary"
                }
            ]
        }

    def _assert_workflow_state(self, workflow, expected_status, expected_round=1,
                               expected_message_count=None, expected_log_count=None):
        """Assert the state of a workflow with detailed diagnostic output"""
        # Print current workflow state for debugging
        print(f"\n{Fore.CYAN}Workflow State Verification:{Style.RESET_ALL}")
        print(f" Status: {Fore.YELLOW}{workflow.get('status', 'N/A')}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_status}{Style.RESET_ALL})")
        print(f" Round: {Fore.YELLOW}{workflow.get('currentRound', 'N/A')}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_round}{Style.RESET_ALL})")

        if expected_message_count is not None:
            current_count = len(workflow.get("messages", []))
            print(f" Messages: {Fore.YELLOW}{current_count}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_message_count}{Style.RESET_ALL})")

        if expected_log_count is not None:
            current_count = len(workflow.get("logs", []))
            print(f" Logs: {Fore.YELLOW}{current_count}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_log_count}{Style.RESET_ALL})")

        # Perform the actual assertions
        assert workflow["status"] == expected_status, f"Workflow status should be {expected_status}"
        assert workflow["currentRound"] == expected_round, f"Workflow round should be {expected_round}"

        if expected_message_count is not None:
            assert len(workflow.get("messages", [])) == expected_message_count, f"Workflow should have {expected_message_count} messages"

        if expected_log_count is not None:
            assert len(workflow.get("logs", [])) == expected_log_count, f"Workflow should have {expected_log_count} logs"

    def _reset_mocks(self):
        """Reset all mocks for a fresh test"""
        self.mydom_mock.reset_mock()
        self.registry_mock.reset_mock()
        self.getDocumentContents_mock.reset_mock()

    # ------------------------------------------------------------------------
    # TESTS FOR EACH STATE
    # ------------------------------------------------------------------------

    def _print_test_info(self, state_num, state_name, description=None):
        """Print formatted information about the current test state"""
        print(f"\n{Fore.CYAN}{Style.BRIGHT}STATE {state_num}: {state_name}{Style.RESET_ALL}")
        if description:
            print(f"{Fore.WHITE}{description}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")

    async def test_state_1_workflow_initialization_new(self):
        """Test State 1: Workflow Initialization (new workflow)"""
        self._print_test_info(1, "Workflow Initialization", "Creating a new workflow from scratch")

        # Ensure no existing workflow
        self.mydom_mock.getWorkflow.return_value = None
        self.mydom_mock.loadWorkflowState.return_value = None

        print(f"{Fore.YELLOW}➤ Initializing new workflow...{Style.RESET_ALL}")

        # Initialize a new workflow
        workflow = self.manager.workflowInit()

        # Print workflow details
        print(f"{Fore.GREEN}✓ Workflow created:{Style.RESET_ALL}")
        print(f" ID: {workflow.get('id', 'N/A')}")
        print(f" Mandate: {workflow.get('mandateId', 'N/A')}")
        print(f" User: {workflow.get('userId', 'N/A')}")
        print(f" Round: {workflow.get('currentRound', 'N/A')}")
        print(f" Status: {workflow.get('status', 'N/A')}")

        # Assert workflow state
        print(f"{Fore.YELLOW}➤ Validating workflow state...{Style.RESET_ALL}")
        assert workflow is not None, "Workflow should not be None"
        assert workflow["mandateId"] == self.mandateId, f"Workflow mandate ID should be {self.mandateId}"
        assert workflow["userId"] == self.userId, f"Workflow user ID should be {self.userId}"
        assert workflow["currentRound"] == 1, "Workflow round should be 1"
        assert workflow["status"] == "running", "Workflow status should be 'running'"

        # Verify interactions
        print(f"{Fore.YELLOW}➤ Verifying database interactions...{Style.RESET_ALL}")
        self.mydom_mock.createWorkflow.assert_called_once()
        self.mydom_mock.getWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Database correctly updated{Style.RESET_ALL}")

    async def test_state_1_workflow_initialization_existing(self):
        """Test State 1: Workflow Initialization (existing workflow)"""
        self._print_test_info(1, "Workflow Initialization (Existing)", "Loading and incremented round for an existing workflow")

        # Create a test workflow
        existing_workflow = self._create_test_workflow()
        existing_workflow["currentRound"] = 3  # Already ran 3 rounds

        # Configure mock to return our test workflow
        self.mydom_mock.getWorkflow.return_value = existing_workflow

        print(f"{Fore.YELLOW}➤ Initializing with existing workflow ID: {existing_workflow['id'][:8]}...{Style.RESET_ALL}")

        # Initialize with existing workflowId
        workflow = self.manager.workflowInit(existing_workflow["id"])

        # Assert workflow state
        print(f"{Fore.GREEN}✓ Workflow loaded:{Style.RESET_ALL}")
        print(f" ID: {workflow.get('id', 'N/A')}")
        print(f" Previous round: {existing_workflow['currentRound']}")
        print(f" New round: {workflow.get('currentRound', 'N/A')}")
        print(f" Status: {workflow.get('status', 'N/A')}")

        assert workflow is not None, "Workflow should not be None"
        assert workflow["id"] == existing_workflow["id"], "Workflow ID should match existing ID"
        assert workflow["currentRound"] == 4, "Workflow round should be incremented to 4"
        assert workflow["status"] == "running", "Workflow status should be 'running'"

        # Verify interactions
        print(f"{Fore.YELLOW}➤ Verifying database interactions...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Database correctly updated with incremented round{Style.RESET_ALL}")

    async def test_state_2_workflow_exception(self):
        """Test State 2: Workflow Exception"""
        self._print_test_info(2, "Workflow Exception", "Testing error handling in the workflow state machine")

        # Create a test workflow
        print(f"{Fore.YELLOW}➤ Creating test workflow...{Style.RESET_ALL}")
        workflow = self._create_test_workflow()
        print(f"{Fore.GREEN}✓ Created workflow with ID: {workflow['id']}{Style.RESET_ALL}")

        # Simulate an exception in workflow processing
        print(f"{Fore.YELLOW}➤ Setting up exception scenario...{Style.RESET_ALL}")

        def raise_exception(*args, **kwargs):
            print(f"{Fore.RED} [INJECTED ERROR] Raising test exception{Style.RESET_ALL}")
            raise ValueError("Test exception")

        # Apply mocks for the exception scenario
        self.mydom_mock.createWorkflowMessage = MagicMock(side_effect=raise_exception)

        # Create user input
        user_input = {"prompt": "Test prompt", "listFileId": []}
        print(f"{Fore.GREEN}✓ Exception scenario prepared{Style.RESET_ALL}")

        # Run workflow and expect exception to be caught
        print(f"{Fore.YELLOW}➤ Running workflow with exception...{Style.RESET_ALL}")
        # Only the call under test is guarded. The assertions below must stay
        # outside the try, otherwise their AssertionError would be caught and
        # misreported as an exception escaping workflowProcess.
        try:
            result = await self.manager.workflowProcess(user_input, workflow)
        except Exception as e:
            raise AssertionError(
                f"Exception should be caught in workflowProcess, but was raised: {e}"
            ) from e

        # Print workflow state after exception
        print(f"{Fore.GREEN}✓ Exception handled correctly{Style.RESET_ALL}")
        print(f" Workflow status: {Fore.YELLOW}{result['status']}{Style.RESET_ALL}")

        # Print log entries
        logs = result.get("logs", [])
        if logs:
            print(f" Error logs:")
            for log in logs:
                if "failed" in log.get("message", "").lower():
                    print(f" {Fore.RED}→ {log.get('message')}{Style.RESET_ALL}")

        # Verify workflow is marked as failed
        print(f"{Fore.YELLOW}➤ Validating workflow state after exception...{Style.RESET_ALL}")
        assert result["status"] == "failed", "Workflow status should be 'failed'"

        # Verify log creation
        self.mydom_mock.createWorkflowLog.assert_called()

        # Verify database update
        print(f"{Fore.YELLOW}➤ Verifying database was updated correctly...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called()
        print(f"{Fore.GREEN}✓ Database correctly updated with failure status{Style.RESET_ALL}")

    async def test_state_3_user_message_processing(self):
        """Test State 3: User Message Processing"""
        self._print_test_info(3, "User Message Processing", "Processing user input into a workflow message with documents")

        # Create a test workflow
        workflow = self._create_test_workflow()

        # Create test user input
        user_input = {"prompt": "Please analyze this document", "listFileId": [1]}

        # Configure file processing mock
        self.mydom_mock.getFile.return_value = {"name": "test.txt", "mandateId": self.mandateId}
        self.mydom_mock.getFileData.return_value = b"Test content"

        print(f"{Fore.YELLOW}➤ Processing user message...{Style.RESET_ALL}")

        # Process user message
        message = await self.manager.chatMessageToWorkflow("user", "", user_input, workflow)

        # Print message details
        print(f"{Fore.GREEN}✓ Message processed:{Style.RESET_ALL}")
        print(f" Role: {message.get('role', 'N/A')}")
        print(f" Content: {message.get('content', 'N/A')}")
        print(f" Status: {message.get('status', 'N/A')}")

        # Assert message processing
        assert message["role"] == "user", "Message role should be 'user'"
        assert message["content"] == "Please analyze this document", "Message content should match input"

        # Verify document processing
        print(f"{Fore.YELLOW}➤ Verifying document processing...{Style.RESET_ALL}")
        self.mydom_mock.getFile.assert_called()
        self.mydom_mock.getFileData.assert_called()

        # Verify the message was added to the workflow
        print(f"{Fore.YELLOW}➤ Verifying message added to workflow...{Style.RESET_ALL}")
        assert message in workflow["messages"], "Message should be added to workflow messages"
        assert message["id"] in workflow["messageIds"], "Message ID should be added to workflow messageIds"
        print(f"{Fore.GREEN}✓ Message successfully added to workflow{Style.RESET_ALL}")

    async def test_state_4_project_manager_analysis(self):
        """Test State 4: Project Manager Analysis"""
        self._print_test_info(4, "Project Manager Analysis", "Analyzing user request and planning the workflow")

        # Create a test workflow
        workflow = self._create_test_workflow()

        # Create user message
        user_message = self._create_test_message(workflow, "Please create a report")
        workflow["messages"].append(user_message)

        print(f"{Fore.YELLOW}➤ Running project manager analysis...{Style.RESET_ALL}")

        # Run project manager analysis
        project_manager_response = await self.manager.projectManagerAnalysis(user_message, workflow)

        # Print response details
        print(f"{Fore.GREEN}✓ Project manager analysis completed:{Style.RESET_ALL}")
        print(f" Final docs: {project_manager_response.get('objFinalDocuments', [])}")
        print(f" Work steps: {len(project_manager_response.get('objWorkplan', []))}")
        print(f" User lang: {project_manager_response.get('userLanguage', 'N/A')}")

        # Assert project manager output
        assert "objFinalDocuments" in project_manager_response, "Response should contain objFinalDocuments"
        assert "objWorkplan" in project_manager_response, "Response should contain objWorkplan"
        assert "objUserResponse" in project_manager_response, "Response should contain objUserResponse"
        assert "userLanguage" in project_manager_response, "Response should contain userLanguage"

        # Verify AI call
        print(f"{Fore.YELLOW}➤ Verifying AI service call...{Style.RESET_ALL}")
        self.mydom_mock.callAi.assert_called_once()
        print(f"{Fore.GREEN}✓ AI service correctly called for analysis{Style.RESET_ALL}")

    async def test_state_5_agent_execution(self):
        """Test State 5: Agent Execution"""
        self._print_test_info(5, "Agent Execution", "Processing a task with an agent and storing results")

        # Create a test workflow with user message
        workflow = self._create_test_workflow()
        user_message = self._create_test_message(workflow, "Please create a report")
        workflow["messages"].append(user_message)

        # Create test task for the agent
        task = {
            "agent": "test_agent",
            "prompt": "Generate a test report",
            "outputDocuments": [
                {
                    "label": "report.txt",
                    "prompt": "Create a detailed report"
                }
            ],
            "inputDocuments": []
        }

        print(f"{Fore.YELLOW}➤ Executing agent task...{Style.RESET_ALL}")

        # Execute the agent
        results = await self.manager.agentProcessing(task, workflow)

        # Print results
        print(f"{Fore.GREEN}✓ Agent task executed:{Style.RESET_ALL}")
        print(f" Results: {len(results)} documents")

        # Assert agent was called correctly
        test_agent = self.registry_mock.getAgent.return_value
        test_agent.processTask.assert_called_once()

        # Verify agent message was added to workflow
        print(f"{Fore.YELLOW}➤ Verifying workflow state after agent execution...{Style.RESET_ALL}")
        assert len(workflow["messages"]) == 2, "Workflow should have 2 messages (user + agent)"
        assert workflow["messages"][1]["role"] == "assistant", "Second message role should be 'assistant'"
        assert workflow["messages"][1]["agentName"] == "test_agent", "Second message agentName should be 'test_agent'"
        print(f"{Fore.GREEN}✓ Agent message correctly added to workflow{Style.RESET_ALL}")

    async def test_state_6_final_response_generation(self):
        """Test State 6: Final Response Generation"""
        self._print_test_info(6, "Final Response Generation", "Creating the final user-facing response message")

        # Set up test data
        obj_user_response = "I will process your request"
        obj_final_documents = ["report.txt"]

        # Create a test document result
        doc_result = self._create_test_document("report.txt", "Report content")
        obj_results = [doc_result]

        print(f"{Fore.YELLOW}➤ Generating final message...{Style.RESET_ALL}")

        # Generate final message
        final_message = await self.manager.generateFinalMessage(obj_user_response, obj_final_documents, obj_results)

        # Print message details
        print(f"{Fore.GREEN}✓ Final message generated:{Style.RESET_ALL}")
        print(f" Role: {final_message.get('role', 'N/A')}")
        print(f" Agent: {final_message.get('agentName', 'N/A')}")
        print(f" Content: {final_message.get('content', 'N/A')[:50]}...")

        # Assert final message
        assert final_message["role"] == "assistant", "Final message role should be 'assistant'"
        assert final_message["agentName"] == "project_manager", "Final message agentName should be 'project_manager'"
        assert final_message["content"] is not None, "Final message content should not be None"

        # Verify AI call for final response
        print(f"{Fore.YELLOW}➤ Verifying AI service call...{Style.RESET_ALL}")
        self.mydom_mock.callAi.assert_called_once()
        print(f"{Fore.GREEN}✓ AI service correctly called for final response{Style.RESET_ALL}")

    async def test_state_7_workflow_completion(self):
        """Test State 7: Workflow Completion"""
        self._print_test_info(7, "Workflow Completion", "Finalizing workflow and setting status to completed")

        # Create a test workflow
        workflow = self._create_test_workflow()

        print(f"{Fore.YELLOW}➤ Finishing workflow...{Style.RESET_ALL}")

        # Finalize the workflow
        result = self.manager.workflowFinish(workflow)

        # Print result details
        print(f"{Fore.GREEN}✓ Workflow finished:{Style.RESET_ALL}")
        print(f" Status: {result.get('status', 'N/A')}")
        print(f" Last activity: {result.get('lastActivity', 'N/A')[:19]}")

        # Assert workflow state
        assert result["status"] == "completed", "Workflow status should be 'completed'"

        # Verify database update
        print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Database correctly updated with completion status{Style.RESET_ALL}")

    async def test_state_8_workflow_stopped(self):
        """Test State 8: Workflow Stopped"""
        self._print_test_info(8, "Workflow Stopped", "Testing the workflow stop function")

        # Create a test workflow
        workflow = self._create_test_workflow()

        # Configure mock
        self.mydom_mock.loadWorkflowState.return_value = workflow

        print(f"{Fore.YELLOW}➤ Stopping workflow...{Style.RESET_ALL}")

        # Stop the workflow
        result = await self.manager.workflowStop(workflow["id"])

        # Print result details
        print(f"{Fore.GREEN}✓ Workflow stopped:{Style.RESET_ALL}")
        print(f" Status: {result.get('status', 'N/A')}")

        # Assert workflow state
        assert result["status"] == "stopped", "Workflow status should be 'stopped'"

        # Verify database update
        print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Database correctly updated with stopped status{Style.RESET_ALL}")

    async def test_state_9_workflow_failed(self):
        """Test State 9: Workflow Failed"""
        self._print_test_info(9, "Workflow Failed", "Testing agent failure handling")

        # Create a test workflow
        workflow = self._create_test_workflow()

        # Introduce a failing agent
        failing_agent = MagicMock()

        async def fail_task(*args, **kwargs):
            raise ValueError("Agent failure")

        failing_agent.processTask = AsyncMock(side_effect=fail_task)
        failing_agent.mydom = self.mydom_mock
        self.registry_mock.getAgent.return_value = failing_agent

        # Create test task for the agent
        task = {
            "agent": "failing_agent",
            "prompt": "This will fail",
            "outputDocuments": [{"label": "fail.txt", "prompt": ""}],
            "inputDocuments": []
        }

        print(f"{Fore.YELLOW}➤ Executing failing agent task...{Style.RESET_ALL}")

        # Execute the agent and expect it to handle the failure
        results = await self.manager.agentProcessing(task, workflow)

        # Print results
        print(f"{Fore.GREEN}✓ Agent failure handled correctly:{Style.RESET_ALL}")
        print(f" Results: {results}")

        # Assert empty results due to failure
        assert results == [], "Results should be an empty list due to failure"

        # Verify error log was created
        print(f"{Fore.YELLOW}➤ Verifying error logging...{Style.RESET_ALL}")
        self.mydom_mock.createWorkflowLog.assert_called()
        print(f"{Fore.GREEN}✓ Error correctly logged{Style.RESET_ALL}")

    async def test_state_10_workflow_resumption(self):
        """Test State 10: Workflow Resumption"""
        self._print_test_info(10, "Workflow Resumption", "Continuing a previously completed workflow")

        # Create a test workflow that was previously completed
        existing_workflow = self._create_test_workflow()
        existing_workflow["status"] = "completed"
        existing_workflow["currentRound"] = 2

        # Add some previous messages
        existing_workflow["messages"].append(self._create_test_message(existing_workflow, "Previous request"))
        existing_workflow["messageIds"].append(existing_workflow["messages"][0]["id"])

        # Configure mock to return our test workflow
        self.mydom_mock.getWorkflow.return_value = existing_workflow

        print(f"{Fore.YELLOW}➤ Resuming workflow...{Style.RESET_ALL}")

        # Resume the workflow with a new message
        workflow = self.manager.workflowInit(existing_workflow["id"])

        # Print workflow details
        print(f"{Fore.GREEN}✓ Workflow resumed:{Style.RESET_ALL}")
        print(f" ID: {workflow.get('id', 'N/A')}")
        print(f" Status: {workflow.get('status', 'N/A')}")
        print(f" Round: {workflow.get('currentRound', 'N/A')}")

        # Assert workflow state
        assert workflow["id"] == existing_workflow["id"], "Workflow ID should match existing ID"
        assert workflow["status"] == "running", "Workflow status should be 'running'"
        assert workflow["currentRound"] == 3, "Workflow round should be incremented to 3"

        # Verify database update
        print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Database correctly updated with new round{Style.RESET_ALL}")

    async def test_state_11_workflow_deletion(self):
        """Test State 11: Workflow Deletion (Not directly implemented in workflow manager but through API)"""
        self._print_test_info(11, "Workflow Deletion", "Testing deletion through mydom interface")

        # Create a test workflow
        workflow_id = str(uuid.uuid4())

        # Configure mock
        self.mydom_mock.getWorkflow.return_value = {"id": workflow_id, "userId": self.userId}
        self.mydom_mock.deleteWorkflow.return_value = True

        print(f"{Fore.YELLOW}➤ Deleting workflow...{Style.RESET_ALL}")

        # Delete the workflow through mydom
        result = self.mydom_mock.deleteWorkflow(workflow_id)

        # Print result
        print(f"{Fore.GREEN}✓ Workflow deletion result: {result}{Style.RESET_ALL}")

        # Assert result
        assert result is True, "deleteWorkflow should return True"

        # Verify deletion call
        print(f"{Fore.YELLOW}➤ Verifying deletion call...{Style.RESET_ALL}")
        self.mydom_mock.deleteWorkflow.assert_called_once_with(workflow_id)
        print(f"{Fore.GREEN}✓ Workflow correctly deleted{Style.RESET_ALL}")

    # ------------------------------------------------------------------------
    # INTEGRATION TESTS
    # ------------------------------------------------------------------------

    async def test_full_workflow_cycle(self):
        """Test a complete workflow cycle from start to finish"""
        print(f"\n{Fore.CYAN}{Style.BRIGHT}INTEGRATION TEST: Full Workflow Cycle{Style.RESET_ALL}")
        print(f"{Fore.WHITE}This test simulates a complete workflow from start to finish{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")

        # Configure mocks for a successful flow
        print(f"{Fore.YELLOW}➤ Setting up test environment...{Style.RESET_ALL}")
        self._reset_mocks()

        # Create user input with a document
        user_input = {
            "prompt": "Please analyze the attached document and create a report",
            "listFileId": [1]
        }
        print(f"{Fore.GREEN}✓ Created user input with document request{Style.RESET_ALL}")
        print(f" Prompt: {user_input['prompt']}")
        print(f" Files: {user_input['listFileId']}")

        # Configure file mock responses
        print(f"{Fore.YELLOW}➤ Configuring mock files...{Style.RESET_ALL}")
        self.mydom_mock.getFile.return_value = {"name": "source.txt", "mandateId": self.mandateId}
        self.mydom_mock.getFileData.return_value = b"Source content for analysis"
        self.mydom_mock.saveUploadedFile.return_value = {"id": 2, "name": "result.txt"}
        print(f"{Fore.GREEN}✓ Mock files configured{Style.RESET_ALL}")

        # Start a new workflow
        print(f"\n{Fore.YELLOW}➤ Starting workflow...{Style.RESET_ALL}")
        workflow = await self.manager.workflowStart(user_input)
        print(f"{Fore.GREEN}✓ Workflow started with ID: {workflow['id']}{Style.RESET_ALL}")

        # Wait for async processing to complete
        print(f"{Fore.YELLOW}➤ Waiting for async processing...{Style.RESET_ALL}")

        # Create a progress spinner
        spinner = "|/-\\"
        for i in range(10):  # Show spinner for 1 second
            print(f"\r Processing {spinner[i % len(spinner)]}", end="")
            await asyncio.sleep(0.1)
        print("\r Processing complete! ")

        # Visualize the workflow state transitions
        print(f"\n{Fore.CYAN}Workflow State Transitions:{Style.RESET_ALL}")
        states = [
            {"state": "Initialization", "status": "running", "round": 1},
            {"state": "User Message Processing", "status": "running", "round": 1},
            {"state": "Project Manager Analysis", "status": "running", "round": 1},
            {"state": "Agent Execution", "status": "running", "round": 1},
            {"state": "Final Response Generation", "status": "running", "round": 1},
            {"state": "Workflow Completion", "status": "completed", "round": 1}
        ]

        for i, state in enumerate(states):
            status_color = Fore.GREEN if state["status"] == "completed" else Fore.YELLOW
            print(f" {i+1}. {state['state']}: {status_color}{state['status']}{Style.RESET_ALL}")

        # Verify start state
        print(f"\n{Fore.YELLOW}➤ Verifying workflow state...{Style.RESET_ALL}")
        print(f" Initial status: {Fore.YELLOW}{workflow['status']}{Style.RESET_ALL}")
        print(f" Current round: {workflow['currentRound']}")

        assert workflow["status"] == "running", "Workflow status should be 'running'"
        assert workflow["id"] is not None, "Workflow ID should not be None"
        assert workflow["currentRound"] == 1, "Workflow round should be 1"

        # Verify workflow was initialized correctly
        print(f"{Fore.YELLOW}➤ Verifying database interactions...{Style.RESET_ALL}")
        self.mydom_mock.createWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Workflow correctly initialized in database{Style.RESET_ALL}")

    async def test_workflow_with_exception(self):
        """Test workflow handling an exception during processing"""
        print(f"\n{Fore.CYAN}{Style.BRIGHT}INTEGRATION TEST: Workflow Exception Handling{Style.RESET_ALL}")
        print(f"{Fore.WHITE}This test simulates a workflow with an exception during processing{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")

        # Configure mocks for an exception scenario
        print(f"{Fore.YELLOW}➤ Setting up exception scenario...{Style.RESET_ALL}")
        self._reset_mocks()

        # Force an exception in getFile
        def raise_exception(*args, **kwargs):
            raise ValueError("Test exception in getFile")

        self.mydom_mock.getFile.side_effect = raise_exception

        # Create user input with a document
        user_input = {
            "prompt": "This will cause an exception",
            "listFileId": [1]
        }

        # Create workflow
        workflow = self._create_test_workflow()

        print(f"{Fore.YELLOW}➤ Processing workflow with exception...{Style.RESET_ALL}")

        # Process with exception
        result = await self.manager.workflowProcess(user_input, workflow)

        # Print workflow state
        print(f"{Fore.GREEN}✓ Exception handled:{Style.RESET_ALL}")
        print(f" Status: {result.get('status', 'N/A')}")

        # Verify failure state
        assert result["status"] == "failed", "Workflow status should be 'failed'"

        # Verify error log
        print(f"{Fore.YELLOW}➤ Verifying error logging...{Style.RESET_ALL}")
        self.mydom_mock.createWorkflowLog.assert_called()
        print(f"{Fore.GREEN}✓ Error correctly logged{Style.RESET_ALL}")

        # Verify database update
        print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called()
        print(f"{Fore.GREEN}✓ Database correctly updated with failure status{Style.RESET_ALL}")

    async def test_workflow_stop_during_processing(self):
        """Test stopping a workflow during processing"""
        print(f"\n{Fore.CYAN}{Style.BRIGHT}INTEGRATION TEST: Workflow Stop During Processing{Style.RESET_ALL}")
        print(f"{Fore.WHITE}This test simulates stopping a workflow during processing{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")

        # Create a test workflow that is running
        workflow = self._create_test_workflow()

        # Configure mock
        self.mydom_mock.loadWorkflowState.return_value = workflow

        print(f"{Fore.YELLOW}➤ Stopping workflow during processing...{Style.RESET_ALL}")

        # Stop the workflow
        stopped_workflow = await self.manager.workflowStop(workflow["id"])

        # Print workflow state
        print(f"{Fore.GREEN}✓ Workflow stopped:{Style.RESET_ALL}")
        print(f" Status: {stopped_workflow.get('status', 'N/A')}")

        # Verify stopped state
        assert stopped_workflow["status"] == "stopped", "Workflow status should be 'stopped'"

        # Verify database update
        print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
        self.mydom_mock.updateWorkflow.assert_called_once()
        print(f"{Fore.GREEN}✓ Database correctly updated with stopped status{Style.RESET_ALL}")

    # ------------------------------------------------------------------------
    # UTILITY FUNCTION TESTS
    # ------------------------------------------------------------------------

    def test_parse_json_response(self):
        """Test JSON response parsing"""
        print(f"\n{Fore.CYAN}{Style.BRIGHT}UTILITY TEST: JSON Response Parsing{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
        # Test
with clean JSON print(f"{Fore.YELLOW}➤ Testing with clean JSON...{Style.RESET_ALL}") clean_json = '{"key": "value", "number": 123}' result = self.manager.parseJsonResponse(clean_json) print(f"{Fore.GREEN}✓ Clean JSON result:{Style.RESET_ALL}") print(f" key: {result.get('key', 'N/A')}") print(f" number: {result.get('number', 'N/A')}") assert result["key"] == "value", "Clean JSON parsing should extract key value" assert result["number"] == 123, "Clean JSON parsing should extract number value" # Test with JSON embedded in text print(f"\n{Fore.YELLOW}➤ Testing with JSON embedded in text...{Style.RESET_ALL}") text_with_json = 'Some text before {"key": "value"} and after' result = self.manager.parseJsonResponse(text_with_json) print(f"{Fore.GREEN}✓ Embedded JSON result:{Style.RESET_ALL}") print(f" key: {result.get('key', 'N/A')}") assert result["key"] == "value", "Embedded JSON parsing should extract key value" # Test with invalid JSON print(f"\n{Fore.YELLOW}➤ Testing with invalid JSON...{Style.RESET_ALL}") invalid_json = 'Not a JSON at all' result = self.manager.parseJsonResponse(invalid_json) print(f"{Fore.GREEN}✓ Invalid JSON fallback result:{Style.RESET_ALL}") print(f" objWorkplan: {result.get('objWorkplan', 'N/A')}") # Should return a fallback structure assert "objFinalDocuments" in result, "Invalid JSON should return fallback with objFinalDocuments" assert "objWorkplan" in result, "Invalid JSON should return fallback with objWorkplan" assert "objUserResponse" in result, "Invalid JSON should return fallback with objUserResponse" print(f"{Fore.GREEN}✓ All JSON parsing scenarios handled correctly{Style.RESET_ALL}") def test_get_filename(self): """Test filename extraction from document""" print(f"\n{Fore.CYAN}{Style.BRIGHT}UTILITY TEST: Filename Extraction{Style.RESET_ALL}") print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}") # Test with extension print(f"{Fore.YELLOW}➤ Testing with extension...{Style.RESET_ALL}") document = {"name": "test", "ext": "txt"} filename = 
self.manager.getFilename(document) print(f"{Fore.GREEN}✓ Filename with extension result:{Style.RESET_ALL}") print(f" Result: {filename}") assert filename == "test.txt", "Filename should be 'test.txt'" # Test with no extension print(f"\n{Fore.YELLOW}➤ Testing with no extension...{Style.RESET_ALL}") document = {"name": "test", "ext": ""} filename = self.manager.getFilename(document) print(f"{Fore.GREEN}✓ Filename without extension result:{Style.RESET_ALL}") print(f" Result: {filename}") assert filename == "test", "Filename should be 'test'" print(f"{Fore.GREEN}✓ All filename extraction scenarios handled correctly{Style.RESET_ALL}") def test_get_available_documents(self): """Test getting available documents from workflow""" print(f"\n{Fore.CYAN}{Style.BRIGHT}UTILITY TEST: Get Available Documents{Style.RESET_ALL}") print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}") # Create a test workflow with messages containing documents workflow = self._create_test_workflow() # Add user message with document print(f"{Fore.YELLOW}➤ Creating user message with document...{Style.RESET_ALL}") user_message = self._create_test_message(workflow, "Message with document") user_message["documents"] = [self._create_test_document("user_doc.txt")] workflow["messages"].append(user_message) # Add assistant message with document print(f"{Fore.YELLOW}➤ Creating assistant message with document...{Style.RESET_ALL}") assistant_message = self._create_test_message(workflow, "Response with document", "assistant") assistant_message["documents"] = [self._create_test_document("assistant_doc.txt")] workflow["messages"].append(assistant_message) print(f"{Fore.YELLOW}➤ Getting available documents...{Style.RESET_ALL}") # Get available documents available_docs = self.manager.getAvailableDocuments(workflow, user_message) # Print results print(f"{Fore.GREEN}✓ Available documents result:{Style.RESET_ALL}") print(f" Count: {len(available_docs)}") for i, doc in enumerate(available_docs): print(f" {i+1}. 
{doc.get('label', 'N/A')} ({doc.get('fileSource', 'N/A')})") # Verify result assert len(available_docs) == 2, "Available documents should have 2 entries" # Should be sorted newest first assert available_docs[0]["label"] == "assistant_doc.txt", "First document should be assistant_doc.txt" assert available_docs[1]["label"] == "user_doc.txt", "Second document should be user_doc.txt" # User's document should be marked as from user assert available_docs[1]["fileSource"] == "user", "User document should have fileSource='user'" print(f"{Fore.GREEN}✓ Available documents correctly identified and sorted{Style.RESET_ALL}") # Simple test runner if __name__ == "__main__": # Import the runner and execute tests try: sys.path.append(os.path.dirname(os.path.abspath(__file__))) from tool_testBackendSingle import run_tests asyncio.run(run_tests()) except ImportError: print(f"{Fore.YELLOW}Please use tool_testBackendSingle.py to run the tests properly{Style.RESET_ALL}")