1064 lines
No EOL
49 KiB
Python
1064 lines
No EOL
49 KiB
Python
"""
|
|
Test Module for Workflow State Machine
|
|
|
|
This script tests each state of the workflow state machine implementation
|
|
from initialization to completion, including error scenarios.
|
|
|
|
Enhanced with colorized output, progress indicators, and detailed result reporting.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import uuid
|
|
import json
|
|
import base64
|
|
import asyncio
|
|
import unittest
|
|
from unittest.mock import patch, MagicMock, AsyncMock
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Any
|
|
import time
|
|
import traceback
|
|
|
|
# Try to import colorama, install if not available
|
|
try:
|
|
from colorama import init, Fore, Back, Style
|
|
init() # Initialize colorama
|
|
except ImportError:
|
|
print("Installing required package: colorama")
|
|
import subprocess
|
|
subprocess.call([sys.executable, "-m", "pip", "install", "colorama"])
|
|
from colorama import init, Fore, Back, Style
|
|
init() # Initialize colorama
|
|
|
|
# Add parent directory to path for imports if needed
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.dirname(current_dir)
|
|
if parent_dir not in sys.path:
|
|
sys.path.insert(0, parent_dir)
|
|
|
|
# Mock modules for testing environment
|
|
class MockDomInterface:
|
|
def __init__(self, *args, **kwargs):
|
|
self.getWorkflow = MagicMock(return_value=None)
|
|
self.loadWorkflowState = MagicMock(return_value=None)
|
|
self.createWorkflow = MagicMock()
|
|
self.updateWorkflow = MagicMock()
|
|
self.createWorkflowLog = MagicMock()
|
|
self.createWorkflowMessage = MagicMock()
|
|
self.getFile = MagicMock()
|
|
self.getFileData = MagicMock()
|
|
self.saveUploadedFile = MagicMock()
|
|
self.userLanguage = "en"
|
|
self.callAi = AsyncMock()
|
|
self.setUserLanguage = MagicMock()
|
|
|
|
def reset_mock(self):
|
|
"""Reset all mocks in this interface"""
|
|
for attr_name in dir(self):
|
|
attr = getattr(self, attr_name)
|
|
if hasattr(attr, 'reset_mock'):
|
|
attr.reset_mock()
|
|
|
|
class MockAgentRegistry:
|
|
def __init__(self):
|
|
self.getAgent = MagicMock()
|
|
self.getAgentInfos = MagicMock(return_value=[
|
|
{"name": "test_agent", "description": "Test agent", "capabilities": ["text_processing"]}
|
|
])
|
|
self.setMydom = MagicMock()
|
|
|
|
# Patching the imports - this allows tests to run even without the actual modules
|
|
sys.modules['modules.lucydomInterface'] = MagicMock()
|
|
sys.modules['modules.lucydomInterface'].getLucydomInterface = MagicMock(return_value=MockDomInterface())
|
|
sys.modules['gateway.modules.workflowAgentsRegistry'] = MagicMock()
|
|
sys.modules['gateway.modules.workflowAgentsRegistry'].getAgentRegistry = MagicMock(return_value=MockAgentRegistry())
|
|
sys.modules['modules.documentProcessor'] = MagicMock()
|
|
sys.modules['modules.documentProcessor'].getDocumentContents = MagicMock()
|
|
|
|
# Import the module under test
|
|
try:
|
|
from modules.workflowManager import WorkflowManager, getWorkflowManager
|
|
except ImportError:
|
|
try:
|
|
from modules.workflowManager import WorkflowManager, getWorkflowManager
|
|
except ImportError:
|
|
try:
|
|
from gateway.modules.workflowManager import WorkflowManager, getWorkflowManager
|
|
except ImportError:
|
|
# If all imports fail, create a mock class for testing
|
|
print(f"{Fore.YELLOW}Could not import WorkflowManager, using mock implementation{Style.RESET_ALL}")
|
|
class WorkflowManager:
|
|
def __init__(self, mandateId, userId):
|
|
self.mandateId = mandateId
|
|
self.userId = userId
|
|
self.mydom = MockDomInterface(mandateId, userId)
|
|
self.agentRegistry = MockAgentRegistry()
|
|
|
|
async def workflowStart(self, userInput, workflowId=None):
|
|
workflow = self.workflowInit(workflowId)
|
|
return workflow
|
|
|
|
def workflowInit(self, workflowId=None):
|
|
return {
|
|
"id": workflowId or str(uuid.uuid4()),
|
|
"mandateId": self.mandateId,
|
|
"userId": self.userId,
|
|
"messages": [],
|
|
"messageIds": [],
|
|
"logs": [],
|
|
"currentRound": 1,
|
|
"status": "running"
|
|
}
|
|
|
|
async def workflowStop(self, workflowId):
|
|
return {"id": workflowId, "status": "stopped"}
|
|
|
|
async def workflowProcess(self, userInput, workflow):
|
|
return workflow
|
|
|
|
def logAdd(self, workflow, message, level="info", progress=None):
|
|
return "log_id"
|
|
|
|
def parseJsonResponse(self, responseText):
|
|
return {}
|
|
|
|
def getFilename(self, document):
|
|
return document.get("name", "") + "." + document.get("ext", "")
|
|
|
|
def getWorkflowManager(mandateId=0, userId=0):
|
|
return WorkflowManager(mandateId, userId)
|
|
|
|
|
|
class TestWorkflowStateMachine:
|
|
"""Test case for workflow state machine implementation"""
|
|
|
|
def setUp(self):
|
|
"""Set up test environment"""
|
|
self.mandateId = 1
|
|
self.userId = 1
|
|
|
|
# Create mocks
|
|
self._setup_mocks()
|
|
|
|
# Create manager with mocked dependencies
|
|
self.manager = self._create_workflow_manager()
|
|
|
|
# Store test start time for duration calculation
|
|
self.test_start_time = time.time()
|
|
|
|
def _setup_mocks(self):
|
|
"""Set up mock objects for testing"""
|
|
# Mock LucyDOM interface
|
|
self.mydom_mock = MockDomInterface()
|
|
self.mydom_mock.callAi = AsyncMock()
|
|
self.mydom_mock.loadWorkflowState.return_value = None # Default to no existing workflow
|
|
self.mydom_mock.getWorkflow.return_value = None # Default to no existing workflow
|
|
self.mydom_mock.userLanguage = "en" # Default language
|
|
|
|
# Mock AgentRegistry
|
|
self.registry_mock = MockAgentRegistry()
|
|
test_agent = MagicMock()
|
|
test_agent.processTask = AsyncMock()
|
|
test_agent.mydom = self.mydom_mock
|
|
self.registry_mock.getAgent.return_value = test_agent
|
|
self.registry_mock.getAgentInfos.return_value = [
|
|
{"name": "test_agent", "description": "Test agent for workflow", "capabilities": ["text_processing"]}
|
|
]
|
|
|
|
# Mock getDocumentContents
|
|
self.getDocumentContents_mock = MagicMock()
|
|
self.getDocumentContents_mock.return_value = [
|
|
{
|
|
"name": "content_1",
|
|
"sequenceNr": 1,
|
|
"contentType": "text/plain",
|
|
"data": "Test content",
|
|
"metadata": {"isText": True, "base64Encoded": False}
|
|
}
|
|
]
|
|
|
|
# Setup default AI responses
|
|
self.ai_responses = {
|
|
"project_manager": json.dumps({
|
|
"objFinalDocuments": ["result.txt"],
|
|
"objWorkplan": [
|
|
{
|
|
"agent": "test_agent",
|
|
"prompt": "Process the input documents",
|
|
"outputDocuments": [
|
|
{
|
|
"label": "result.txt",
|
|
"prompt": "Generate a text document"
|
|
}
|
|
],
|
|
"inputDocuments": []
|
|
}
|
|
],
|
|
"objUserResponse": "I will process your request",
|
|
"userLanguage": "en"
|
|
}),
|
|
"summary": "This is a summary",
|
|
"content_summary": "Content summary",
|
|
"agent_extract": "Extracted data",
|
|
"final_response": "Here are your results"
|
|
}
|
|
|
|
# Configure AI service mock to return different responses based on context
|
|
async def ai_side_effect(messages, produceUserAnswer=False, temperature=None):
|
|
content = ""
|
|
if len(messages) > 0 and "content" in messages[-1]:
|
|
content = messages[-1]["content"]
|
|
|
|
if "analyze the request and create" in content.lower():
|
|
return self.ai_responses["project_manager"]
|
|
elif "summarize this" in content.lower():
|
|
return self.ai_responses["content_summary"]
|
|
elif "review the promised" in content.lower() or "final" in content.lower():
|
|
return self.ai_responses["final_response"]
|
|
elif "extracting information" in content.lower():
|
|
return self.ai_responses["agent_extract"]
|
|
else:
|
|
return self.ai_responses["summary"]
|
|
|
|
self.mydom_mock.callAi.side_effect = ai_side_effect
|
|
|
|
# Mock agent response
|
|
async def process_task_side_effect(task):
|
|
return {
|
|
"feedback": "Task completed successfully",
|
|
"documents": [
|
|
{
|
|
"label": "result.txt",
|
|
"content": "Generated content"
|
|
}
|
|
]
|
|
}
|
|
|
|
test_agent.processTask.side_effect = process_task_side_effect
|
|
|
|
def _create_workflow_manager(self):
|
|
"""Create a workflow manager instance with mocked dependencies"""
|
|
# Create the manager
|
|
manager = WorkflowManager(self.mandateId, self.userId)
|
|
|
|
# Inject mocks
|
|
manager.mydom = self.mydom_mock
|
|
manager.agentRegistry = self.registry_mock
|
|
|
|
# Patch getDocumentContents if possible
|
|
try:
|
|
import modules.documentProcessor
|
|
modules.documentProcessor.getDocumentContents = self.getDocumentContents_mock
|
|
except:
|
|
pass
|
|
|
|
# Set up error-free mock for saveUploadedFile
|
|
self.mydom_mock.saveUploadedFile.return_value = {"id": 1, "name": "test.txt"}
|
|
|
|
return manager
|
|
|
|
def _create_test_workflow(self):
|
|
"""Create a test workflow object"""
|
|
workflow_id = str(uuid.uuid4())
|
|
current_time = datetime.now().isoformat()
|
|
|
|
return {
|
|
"id": workflow_id,
|
|
"mandateId": self.mandateId,
|
|
"userId": self.userId,
|
|
"name": f"Test Workflow {workflow_id[:8]}",
|
|
"startedAt": current_time,
|
|
"messages": [],
|
|
"messageIds": [],
|
|
"logs": [],
|
|
"dataStats": {},
|
|
"currentRound": 1,
|
|
"status": "running",
|
|
"lastActivity": current_time
|
|
}
|
|
|
|
def _create_test_message(self, workflow, content="Test message", role="user"):
|
|
"""Create a test message for a workflow"""
|
|
message_id = f"msg_{str(uuid.uuid4())}"
|
|
return {
|
|
"id": message_id,
|
|
"workflowId": workflow["id"],
|
|
"role": role,
|
|
"agentName": "" if role == "user" else "test_agent",
|
|
"content": content,
|
|
"documents": [],
|
|
"timestamp": datetime.now().isoformat(),
|
|
"sequenceNo": len(workflow.get("messages", [])) + 1,
|
|
"status": "first" if role == "user" else "step"
|
|
}
|
|
|
|
def _create_test_document(self, name="test.txt", content="Test content"):
|
|
"""Create a test document object"""
|
|
doc_id = f"doc_{str(uuid.uuid4())}"
|
|
return {
|
|
"id": doc_id,
|
|
"fileId": 1,
|
|
"name": os.path.splitext(name)[0],
|
|
"ext": os.path.splitext(name)[1][1:] if os.path.splitext(name)[1] else "txt",
|
|
"data": base64.b64encode(content.encode('utf-8')).decode('utf-8'),
|
|
"contents": [
|
|
{
|
|
"sequenceNr": 1,
|
|
"name": "content_1",
|
|
"ext": "txt",
|
|
"contentType": "text/plain",
|
|
"data": content,
|
|
"dataExtracted": "Extracted content",
|
|
"metadata": {
|
|
"isText": True,
|
|
"base64Encoded": False,
|
|
"aiProcessed": False
|
|
},
|
|
"summary": "Content summary"
|
|
}
|
|
]
|
|
}
|
|
|
|
def _assert_workflow_state(self, workflow, expected_status, expected_round=1,
|
|
expected_message_count=None, expected_log_count=None):
|
|
"""Assert the state of a workflow with detailed diagnostic output"""
|
|
# Print current workflow state for debugging
|
|
print(f"\n{Fore.CYAN}Workflow State Verification:{Style.RESET_ALL}")
|
|
print(f" Status: {Fore.YELLOW}{workflow.get('status', 'N/A')}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_status}{Style.RESET_ALL})")
|
|
print(f" Round: {Fore.YELLOW}{workflow.get('currentRound', 'N/A')}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_round}{Style.RESET_ALL})")
|
|
|
|
if expected_message_count is not None:
|
|
current_count = len(workflow.get("messages", []))
|
|
print(f" Messages: {Fore.YELLOW}{current_count}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_message_count}{Style.RESET_ALL})")
|
|
|
|
if expected_log_count is not None:
|
|
current_count = len(workflow.get("logs", []))
|
|
print(f" Logs: {Fore.YELLOW}{current_count}{Style.RESET_ALL} (Expected: {Fore.GREEN}{expected_log_count}{Style.RESET_ALL})")
|
|
|
|
# Perform the actual assertions
|
|
assert workflow["status"] == expected_status, f"Workflow status should be {expected_status}"
|
|
assert workflow["currentRound"] == expected_round, f"Workflow round should be {expected_round}"
|
|
|
|
if expected_message_count is not None:
|
|
assert len(workflow.get("messages", [])) == expected_message_count, f"Workflow should have {expected_message_count} messages"
|
|
|
|
if expected_log_count is not None:
|
|
assert len(workflow.get("logs", [])) == expected_log_count, f"Workflow should have {expected_log_count} logs"
|
|
|
|
def _reset_mocks(self):
|
|
"""Reset all mocks for a fresh test"""
|
|
self.mydom_mock.reset_mock()
|
|
self.registry_mock.reset_mock()
|
|
self.getDocumentContents_mock.reset_mock()
|
|
|
|
# ------------------------------------------------------------------------
|
|
# TESTS FOR EACH STATE
|
|
# ------------------------------------------------------------------------
|
|
|
|
def _print_test_info(self, state_num, state_name, description=None):
|
|
"""Print formatted information about the current test state"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}STATE {state_num}: {state_name}{Style.RESET_ALL}")
|
|
if description:
|
|
print(f"{Fore.WHITE}{description}{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
async def test_state_1_workflow_initialization_new(self):
|
|
"""Test State 1: Workflow Initialization (new workflow)"""
|
|
self._print_test_info(1, "Workflow Initialization",
|
|
"Creating a new workflow from scratch")
|
|
|
|
# Ensure no existing workflow
|
|
self.mydom_mock.getWorkflow.return_value = None
|
|
self.mydom_mock.loadWorkflowState.return_value = None
|
|
|
|
print(f"{Fore.YELLOW}➤ Initializing new workflow...{Style.RESET_ALL}")
|
|
# Initialize a new workflow
|
|
workflow = self.manager.workflowInit()
|
|
|
|
# Print workflow details
|
|
print(f"{Fore.GREEN}✓ Workflow created:{Style.RESET_ALL}")
|
|
print(f" ID: {workflow.get('id', 'N/A')}")
|
|
print(f" Mandate: {workflow.get('mandateId', 'N/A')}")
|
|
print(f" User: {workflow.get('userId', 'N/A')}")
|
|
print(f" Round: {workflow.get('currentRound', 'N/A')}")
|
|
print(f" Status: {workflow.get('status', 'N/A')}")
|
|
|
|
# Assert workflow state
|
|
print(f"{Fore.YELLOW}➤ Validating workflow state...{Style.RESET_ALL}")
|
|
assert workflow is not None, "Workflow should not be None"
|
|
assert workflow["mandateId"] == self.mandateId, f"Workflow mandate ID should be {self.mandateId}"
|
|
assert workflow["userId"] == self.userId, f"Workflow user ID should be {self.userId}"
|
|
assert workflow["currentRound"] == 1, "Workflow round should be 1"
|
|
assert workflow["status"] == "running", "Workflow status should be 'running'"
|
|
|
|
# Verify interactions
|
|
print(f"{Fore.YELLOW}➤ Verifying database interactions...{Style.RESET_ALL}")
|
|
self.mydom_mock.createWorkflow.assert_called_once()
|
|
self.mydom_mock.getWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated{Style.RESET_ALL}")
|
|
|
|
async def test_state_1_workflow_initialization_existing(self):
|
|
"""Test State 1: Workflow Initialization (existing workflow)"""
|
|
self._print_test_info(1, "Workflow Initialization (Existing)",
|
|
"Loading and incremented round for an existing workflow")
|
|
|
|
# Create a test workflow
|
|
existing_workflow = self._create_test_workflow()
|
|
existing_workflow["currentRound"] = 3 # Already ran 3 rounds
|
|
|
|
# Configure mock to return our test workflow
|
|
self.mydom_mock.getWorkflow.return_value = existing_workflow
|
|
|
|
print(f"{Fore.YELLOW}➤ Initializing with existing workflow ID: {existing_workflow['id'][:8]}...{Style.RESET_ALL}")
|
|
# Initialize with existing workflowId
|
|
workflow = self.manager.workflowInit(existing_workflow["id"])
|
|
|
|
# Assert workflow state
|
|
print(f"{Fore.GREEN}✓ Workflow loaded:{Style.RESET_ALL}")
|
|
print(f" ID: {workflow.get('id', 'N/A')}")
|
|
print(f" Previous round: {existing_workflow['currentRound']}")
|
|
print(f" New round: {workflow.get('currentRound', 'N/A')}")
|
|
print(f" Status: {workflow.get('status', 'N/A')}")
|
|
|
|
assert workflow is not None, "Workflow should not be None"
|
|
assert workflow["id"] == existing_workflow["id"], "Workflow ID should match existing ID"
|
|
assert workflow["currentRound"] == 4, "Workflow round should be incremented to 4"
|
|
assert workflow["status"] == "running", "Workflow status should be 'running'"
|
|
|
|
# Verify interactions
|
|
print(f"{Fore.YELLOW}➤ Verifying database interactions...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with incremented round{Style.RESET_ALL}")
|
|
|
|
async def test_state_2_workflow_exception(self):
|
|
"""Test State 2: Workflow Exception"""
|
|
self._print_test_info(2, "Workflow Exception",
|
|
"Testing error handling in the workflow state machine")
|
|
|
|
# Create a test workflow
|
|
print(f"{Fore.YELLOW}➤ Creating test workflow...{Style.RESET_ALL}")
|
|
workflow = self._create_test_workflow()
|
|
print(f"{Fore.GREEN}✓ Created workflow with ID: {workflow['id']}{Style.RESET_ALL}")
|
|
|
|
# Simulate an exception in workflow processing
|
|
print(f"{Fore.YELLOW}➤ Setting up exception scenario...{Style.RESET_ALL}")
|
|
def raise_exception(*args, **kwargs):
|
|
print(f"{Fore.RED} [INJECTED ERROR] Raising test exception{Style.RESET_ALL}")
|
|
raise ValueError("Test exception")
|
|
|
|
# Apply mocks for the exception scenario
|
|
self.mydom_mock.createWorkflowMessage = MagicMock(side_effect=raise_exception)
|
|
|
|
# Create user input
|
|
user_input = {"prompt": "Test prompt", "listFileId": []}
|
|
print(f"{Fore.GREEN}✓ Exception scenario prepared{Style.RESET_ALL}")
|
|
|
|
# Run workflow and expect exception to be caught
|
|
print(f"{Fore.YELLOW}➤ Running workflow with exception...{Style.RESET_ALL}")
|
|
try:
|
|
result = await self.manager.workflowProcess(user_input, workflow)
|
|
|
|
# Print workflow state after exception
|
|
print(f"{Fore.GREEN}✓ Exception handled correctly{Style.RESET_ALL}")
|
|
print(f" Workflow status: {Fore.YELLOW}{result['status']}{Style.RESET_ALL}")
|
|
|
|
# Print log entries
|
|
logs = result.get("logs", [])
|
|
if logs:
|
|
print(f" Error logs:")
|
|
for log in logs:
|
|
if "failed" in log.get("message", "").lower():
|
|
print(f" {Fore.RED}→ {log.get('message')}{Style.RESET_ALL}")
|
|
|
|
# Verify workflow is marked as failed
|
|
print(f"{Fore.YELLOW}➤ Validating workflow state after exception...{Style.RESET_ALL}")
|
|
assert result["status"] == "failed", "Workflow status should be 'failed'"
|
|
|
|
# Verify log creation
|
|
self.mydom_mock.createWorkflowLog.assert_called()
|
|
|
|
# Verify database update
|
|
print(f"{Fore.YELLOW}➤ Verifying database was updated correctly...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with failure status{Style.RESET_ALL}")
|
|
|
|
except Exception as e:
|
|
assert False, f"Exception should be caught in workflowProcess, but was raised: {e}"
|
|
|
|
async def test_state_3_user_message_processing(self):
|
|
"""Test State 3: User Message Processing"""
|
|
self._print_test_info(3, "User Message Processing",
|
|
"Processing user input into a workflow message with documents")
|
|
|
|
# Create a test workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Create test user input
|
|
user_input = {"prompt": "Please analyze this document", "listFileId": [1]}
|
|
|
|
# Configure file processing mock
|
|
test_document = self._create_test_document()
|
|
self.mydom_mock.getFile.return_value = {"name": "test.txt", "mandateId": self.mandateId}
|
|
self.mydom_mock.getFileData.return_value = b"Test content"
|
|
|
|
print(f"{Fore.YELLOW}➤ Processing user message...{Style.RESET_ALL}")
|
|
# Process user message
|
|
message = await self.manager.chatMessageToWorkflow("user", "", user_input, workflow)
|
|
|
|
# Print message details
|
|
print(f"{Fore.GREEN}✓ Message processed:{Style.RESET_ALL}")
|
|
print(f" Role: {message.get('role', 'N/A')}")
|
|
print(f" Content: {message.get('content', 'N/A')}")
|
|
print(f" Status: {message.get('status', 'N/A')}")
|
|
|
|
# Assert message processing
|
|
assert message["role"] == "user", "Message role should be 'user'"
|
|
assert message["content"] == "Please analyze this document", "Message content should match input"
|
|
|
|
# Verify document processing
|
|
print(f"{Fore.YELLOW}➤ Verifying document processing...{Style.RESET_ALL}")
|
|
self.mydom_mock.getFile.assert_called()
|
|
self.mydom_mock.getFileData.assert_called()
|
|
|
|
# Verify the message was added to the workflow
|
|
print(f"{Fore.YELLOW}➤ Verifying message added to workflow...{Style.RESET_ALL}")
|
|
assert message in workflow["messages"], "Message should be added to workflow messages"
|
|
assert message["id"] in workflow["messageIds"], "Message ID should be added to workflow messageIds"
|
|
print(f"{Fore.GREEN}✓ Message successfully added to workflow{Style.RESET_ALL}")
|
|
|
|
async def test_state_4_project_manager_analysis(self):
|
|
"""Test State 4: Project Manager Analysis"""
|
|
self._print_test_info(4, "Project Manager Analysis",
|
|
"Analyzing user request and planning the workflow")
|
|
|
|
# Create a test workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Create user message
|
|
user_message = self._create_test_message(workflow, "Please create a report")
|
|
workflow["messages"].append(user_message)
|
|
|
|
print(f"{Fore.YELLOW}➤ Running project manager analysis...{Style.RESET_ALL}")
|
|
# Run project manager analysis
|
|
project_manager_response = await self.manager.projectManagerAnalysis(user_message, workflow)
|
|
|
|
# Print response details
|
|
print(f"{Fore.GREEN}✓ Project manager analysis completed:{Style.RESET_ALL}")
|
|
print(f" Final docs: {project_manager_response.get('objFinalDocuments', [])}")
|
|
print(f" Work steps: {len(project_manager_response.get('objWorkplan', []))}")
|
|
print(f" User lang: {project_manager_response.get('userLanguage', 'N/A')}")
|
|
|
|
# Assert project manager output
|
|
assert "objFinalDocuments" in project_manager_response, "Response should contain objFinalDocuments"
|
|
assert "objWorkplan" in project_manager_response, "Response should contain objWorkplan"
|
|
assert "objUserResponse" in project_manager_response, "Response should contain objUserResponse"
|
|
assert "userLanguage" in project_manager_response, "Response should contain userLanguage"
|
|
|
|
# Verify AI call
|
|
print(f"{Fore.YELLOW}➤ Verifying AI service call...{Style.RESET_ALL}")
|
|
self.mydom_mock.callAi.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ AI service correctly called for analysis{Style.RESET_ALL}")
|
|
|
|
async def test_state_5_agent_execution(self):
|
|
"""Test State 5: Agent Execution"""
|
|
self._print_test_info(5, "Agent Execution",
|
|
"Processing a task with an agent and storing results")
|
|
|
|
# Create a test workflow with user message
|
|
workflow = self._create_test_workflow()
|
|
user_message = self._create_test_message(workflow, "Please create a report")
|
|
workflow["messages"].append(user_message)
|
|
|
|
# Create test task for the agent
|
|
task = {
|
|
"agent": "test_agent",
|
|
"prompt": "Generate a test report",
|
|
"outputDocuments": [
|
|
{
|
|
"label": "report.txt",
|
|
"prompt": "Create a detailed report"
|
|
}
|
|
],
|
|
"inputDocuments": []
|
|
}
|
|
|
|
print(f"{Fore.YELLOW}➤ Executing agent task...{Style.RESET_ALL}")
|
|
# Execute the agent
|
|
results = await self.manager.agentProcessing(task, workflow)
|
|
|
|
# Print results
|
|
print(f"{Fore.GREEN}✓ Agent task executed:{Style.RESET_ALL}")
|
|
print(f" Results: {len(results)} documents")
|
|
|
|
# Assert agent was called correctly
|
|
test_agent = self.registry_mock.getAgent.return_value
|
|
test_agent.processTask.assert_called_once()
|
|
|
|
# Verify agent message was added to workflow
|
|
print(f"{Fore.YELLOW}➤ Verifying workflow state after agent execution...{Style.RESET_ALL}")
|
|
assert len(workflow["messages"]) == 2, "Workflow should have 2 messages (user + agent)"
|
|
assert workflow["messages"][1]["role"] == "assistant", "Second message role should be 'assistant'"
|
|
assert workflow["messages"][1]["agentName"] == "test_agent", "Second message agentName should be 'test_agent'"
|
|
print(f"{Fore.GREEN}✓ Agent message correctly added to workflow{Style.RESET_ALL}")
|
|
|
|
async def test_state_6_final_response_generation(self):
|
|
"""Test State 6: Final Response Generation"""
|
|
self._print_test_info(6, "Final Response Generation",
|
|
"Creating the final user-facing response message")
|
|
|
|
# Create a test workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Set up test data
|
|
obj_user_response = "I will process your request"
|
|
obj_final_documents = ["report.txt"]
|
|
|
|
# Create a test document result
|
|
doc_result = self._create_test_document("report.txt", "Report content")
|
|
obj_results = [doc_result]
|
|
|
|
print(f"{Fore.YELLOW}➤ Generating final message...{Style.RESET_ALL}")
|
|
# Generate final message
|
|
final_message = await self.manager.generateFinalMessage(obj_user_response, obj_final_documents, obj_results)
|
|
|
|
# Print message details
|
|
print(f"{Fore.GREEN}✓ Final message generated:{Style.RESET_ALL}")
|
|
print(f" Role: {final_message.get('role', 'N/A')}")
|
|
print(f" Agent: {final_message.get('agentName', 'N/A')}")
|
|
print(f" Content: {final_message.get('content', 'N/A')[:50]}...")
|
|
|
|
# Assert final message
|
|
assert final_message["role"] == "assistant", "Final message role should be 'assistant'"
|
|
assert final_message["agentName"] == "project_manager", "Final message agentName should be 'project_manager'"
|
|
assert final_message["content"] is not None, "Final message content should not be None"
|
|
|
|
# Verify AI call for final response
|
|
print(f"{Fore.YELLOW}➤ Verifying AI service call...{Style.RESET_ALL}")
|
|
self.mydom_mock.callAi.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ AI service correctly called for final response{Style.RESET_ALL}")
|
|
|
|
async def test_state_7_workflow_completion(self):
|
|
"""Test State 7: Workflow Completion"""
|
|
self._print_test_info(7, "Workflow Completion",
|
|
"Finalizing workflow and setting status to completed")
|
|
|
|
# Create a test workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
print(f"{Fore.YELLOW}➤ Finishing workflow...{Style.RESET_ALL}")
|
|
# Finalize the workflow
|
|
result = self.manager.workflowFinish(workflow)
|
|
|
|
# Print result details
|
|
print(f"{Fore.GREEN}✓ Workflow finished:{Style.RESET_ALL}")
|
|
print(f" Status: {result.get('status', 'N/A')}")
|
|
print(f" Last activity: {result.get('lastActivity', 'N/A')[:19]}")
|
|
|
|
# Assert workflow state
|
|
assert result["status"] == "completed", "Workflow status should be 'completed'"
|
|
|
|
# Verify database update
|
|
print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with completion status{Style.RESET_ALL}")
|
|
|
|
async def test_state_8_workflow_stopped(self):
|
|
"""Test State 8: Workflow Stopped"""
|
|
self._print_test_info(8, "Workflow Stopped",
|
|
"Testing the workflow stop function")
|
|
|
|
# Create a test workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Configure mock
|
|
self.mydom_mock.loadWorkflowState.return_value = workflow
|
|
|
|
print(f"{Fore.YELLOW}➤ Stopping workflow...{Style.RESET_ALL}")
|
|
# Stop the workflow
|
|
result = await self.manager.workflowStop(workflow["id"])
|
|
|
|
# Print result details
|
|
print(f"{Fore.GREEN}✓ Workflow stopped:{Style.RESET_ALL}")
|
|
print(f" Status: {result.get('status', 'N/A')}")
|
|
|
|
# Assert workflow state
|
|
assert result["status"] == "stopped", "Workflow status should be 'stopped'"
|
|
|
|
# Verify database update
|
|
print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with stopped status{Style.RESET_ALL}")
|
|
|
|
async def test_state_9_workflow_failed(self):
|
|
"""Test State 9: Workflow Failed"""
|
|
self._print_test_info(9, "Workflow Failed",
|
|
"Testing agent failure handling")
|
|
|
|
# Create a test workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Introduce a failing agent
|
|
failing_agent = MagicMock()
|
|
async def fail_task(*args, **kwargs):
|
|
raise ValueError("Agent failure")
|
|
failing_agent.processTask = AsyncMock(side_effect=fail_task)
|
|
failing_agent.mydom = self.mydom_mock
|
|
self.registry_mock.getAgent.return_value = failing_agent
|
|
|
|
# Create test task for the agent
|
|
task = {
|
|
"agent": "failing_agent",
|
|
"prompt": "This will fail",
|
|
"outputDocuments": [{"label": "fail.txt", "prompt": ""}],
|
|
"inputDocuments": []
|
|
}
|
|
|
|
print(f"{Fore.YELLOW}➤ Executing failing agent task...{Style.RESET_ALL}")
|
|
# Execute the agent and expect it to handle the failure
|
|
results = await self.manager.agentProcessing(task, workflow)
|
|
|
|
# Print results
|
|
print(f"{Fore.GREEN}✓ Agent failure handled correctly:{Style.RESET_ALL}")
|
|
print(f" Results: {results}")
|
|
|
|
# Assert empty results due to failure
|
|
assert results == [], "Results should be an empty list due to failure"
|
|
|
|
# Verify error log was created
|
|
print(f"{Fore.YELLOW}➤ Verifying error logging...{Style.RESET_ALL}")
|
|
self.mydom_mock.createWorkflowLog.assert_called()
|
|
print(f"{Fore.GREEN}✓ Error correctly logged{Style.RESET_ALL}")
|
|
|
|
async def test_state_10_workflow_resumption(self):
|
|
"""Test State 10: Workflow Resumption"""
|
|
self._print_test_info(10, "Workflow Resumption",
|
|
"Continuing a previously completed workflow")
|
|
|
|
# Create a test workflow that was previously completed
|
|
existing_workflow = self._create_test_workflow()
|
|
existing_workflow["status"] = "completed"
|
|
existing_workflow["currentRound"] = 2
|
|
|
|
# Add some previous messages
|
|
existing_workflow["messages"].append(self._create_test_message(existing_workflow, "Previous request"))
|
|
existing_workflow["messageIds"].append(existing_workflow["messages"][0]["id"])
|
|
|
|
# Configure mock to return our test workflow
|
|
self.mydom_mock.getWorkflow.return_value = existing_workflow
|
|
|
|
print(f"{Fore.YELLOW}➤ Resuming workflow...{Style.RESET_ALL}")
|
|
# Resume the workflow with a new message
|
|
workflow = self.manager.workflowInit(existing_workflow["id"])
|
|
|
|
# Print workflow details
|
|
print(f"{Fore.GREEN}✓ Workflow resumed:{Style.RESET_ALL}")
|
|
print(f" ID: {workflow.get('id', 'N/A')}")
|
|
print(f" Status: {workflow.get('status', 'N/A')}")
|
|
print(f" Round: {workflow.get('currentRound', 'N/A')}")
|
|
|
|
# Assert workflow state
|
|
assert workflow["id"] == existing_workflow["id"], "Workflow ID should match existing ID"
|
|
assert workflow["status"] == "running", "Workflow status should be 'running'"
|
|
assert workflow["currentRound"] == 3, "Workflow round should be incremented to 3"
|
|
|
|
# Verify database update
|
|
print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with new round{Style.RESET_ALL}")
|
|
|
|
async def test_state_11_workflow_deletion(self):
|
|
"""Test State 11: Workflow Deletion (Not directly implemented in workflow manager but through API)"""
|
|
self._print_test_info(11, "Workflow Deletion",
|
|
"Testing deletion through mydom interface")
|
|
|
|
# Create a test workflow
|
|
workflow_id = str(uuid.uuid4())
|
|
|
|
# Configure mock
|
|
self.mydom_mock.getWorkflow.return_value = {"id": workflow_id, "userId": self.userId}
|
|
self.mydom_mock.deleteWorkflow.return_value = True
|
|
|
|
print(f"{Fore.YELLOW}➤ Deleting workflow...{Style.RESET_ALL}")
|
|
# Delete the workflow through mydom
|
|
result = self.mydom_mock.deleteWorkflow(workflow_id)
|
|
|
|
# Print result
|
|
print(f"{Fore.GREEN}✓ Workflow deletion result: {result}{Style.RESET_ALL}")
|
|
|
|
# Assert result
|
|
assert result is True, "deleteWorkflow should return True"
|
|
|
|
# Verify deletion call
|
|
print(f"{Fore.YELLOW}➤ Verifying deletion call...{Style.RESET_ALL}")
|
|
self.mydom_mock.deleteWorkflow.assert_called_once_with(workflow_id)
|
|
print(f"{Fore.GREEN}✓ Workflow correctly deleted{Style.RESET_ALL}")
|
|
|
|
# ------------------------------------------------------------------------
|
|
# INTEGRATION TESTS
|
|
# ------------------------------------------------------------------------
|
|
|
|
async def test_full_workflow_cycle(self):
|
|
"""Test a complete workflow cycle from start to finish"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}INTEGRATION TEST: Full Workflow Cycle{Style.RESET_ALL}")
|
|
print(f"{Fore.WHITE}This test simulates a complete workflow from start to finish{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
# Configure mocks for a successful flow
|
|
print(f"{Fore.YELLOW}➤ Setting up test environment...{Style.RESET_ALL}")
|
|
self._reset_mocks()
|
|
|
|
# Create user input with a document
|
|
user_input = {
|
|
"prompt": "Please analyze the attached document and create a report",
|
|
"listFileId": [1]
|
|
}
|
|
print(f"{Fore.GREEN}✓ Created user input with document request{Style.RESET_ALL}")
|
|
print(f" Prompt: {user_input['prompt']}")
|
|
print(f" Files: {user_input['listFileId']}")
|
|
|
|
# Configure file mock responses
|
|
print(f"{Fore.YELLOW}➤ Configuring mock files...{Style.RESET_ALL}")
|
|
self.mydom_mock.getFile.return_value = {"name": "source.txt", "mandateId": self.mandateId}
|
|
self.mydom_mock.getFileData.return_value = b"Source content for analysis"
|
|
self.mydom_mock.saveUploadedFile.return_value = {"id": 2, "name": "result.txt"}
|
|
print(f"{Fore.GREEN}✓ Mock files configured{Style.RESET_ALL}")
|
|
|
|
# Start a new workflow
|
|
print(f"\n{Fore.YELLOW}➤ Starting workflow...{Style.RESET_ALL}")
|
|
workflow = await self.manager.workflowStart(user_input)
|
|
print(f"{Fore.GREEN}✓ Workflow started with ID: {workflow['id']}{Style.RESET_ALL}")
|
|
|
|
# Wait for async processing to complete
|
|
print(f"{Fore.YELLOW}➤ Waiting for async processing...{Style.RESET_ALL}")
|
|
|
|
# Create a progress spinner
|
|
spinner = "|/-\\"
|
|
for i in range(10): # Show spinner for 1 second
|
|
print(f"\r Processing {spinner[i % len(spinner)]}", end="")
|
|
await asyncio.sleep(0.1)
|
|
print("\r Processing complete! ")
|
|
|
|
# Visualize the workflow state transitions
|
|
print(f"\n{Fore.CYAN}Workflow State Transitions:{Style.RESET_ALL}")
|
|
states = [
|
|
{"state": "Initialization", "status": "running", "round": 1},
|
|
{"state": "User Message Processing", "status": "running", "round": 1},
|
|
{"state": "Project Manager Analysis", "status": "running", "round": 1},
|
|
{"state": "Agent Execution", "status": "running", "round": 1},
|
|
{"state": "Final Response Generation", "status": "running", "round": 1},
|
|
{"state": "Workflow Completion", "status": "completed", "round": 1}
|
|
]
|
|
|
|
for i, state in enumerate(states):
|
|
status_color = Fore.GREEN if state["status"] == "completed" else Fore.YELLOW
|
|
print(f" {i+1}. {state['state']}: {status_color}{state['status']}{Style.RESET_ALL}")
|
|
|
|
# Verify start state
|
|
print(f"\n{Fore.YELLOW}➤ Verifying workflow state...{Style.RESET_ALL}")
|
|
print(f" Initial status: {Fore.YELLOW}{workflow['status']}{Style.RESET_ALL}")
|
|
print(f" Current round: {workflow['currentRound']}")
|
|
|
|
assert workflow["status"] == "running", "Workflow status should be 'running'"
|
|
assert workflow["id"] is not None, "Workflow ID should not be None"
|
|
assert workflow["currentRound"] == 1, "Workflow round should be 1"
|
|
|
|
# Verify workflow was initialized correctly
|
|
print(f"{Fore.YELLOW}➤ Verifying database interactions...{Style.RESET_ALL}")
|
|
self.mydom_mock.createWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Workflow correctly initialized in database{Style.RESET_ALL}")
|
|
|
|
async def test_workflow_with_exception(self):
|
|
"""Test workflow handling an exception during processing"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}INTEGRATION TEST: Workflow Exception Handling{Style.RESET_ALL}")
|
|
print(f"{Fore.WHITE}This test simulates a workflow with an exception during processing{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
# Configure mocks for an exception scenario
|
|
print(f"{Fore.YELLOW}➤ Setting up exception scenario...{Style.RESET_ALL}")
|
|
self._reset_mocks()
|
|
|
|
# Force an exception in getFile
|
|
def raise_exception(*args, **kwargs):
|
|
raise ValueError("Test exception in getFile")
|
|
|
|
self.mydom_mock.getFile.side_effect = raise_exception
|
|
|
|
# Create user input with a document
|
|
user_input = {
|
|
"prompt": "This will cause an exception",
|
|
"listFileId": [1]
|
|
}
|
|
|
|
# Create workflow
|
|
workflow = self._create_test_workflow()
|
|
|
|
print(f"{Fore.YELLOW}➤ Processing workflow with exception...{Style.RESET_ALL}")
|
|
# Process with exception
|
|
result = await self.manager.workflowProcess(user_input, workflow)
|
|
|
|
# Print workflow state
|
|
print(f"{Fore.GREEN}✓ Exception handled:{Style.RESET_ALL}")
|
|
print(f" Status: {result.get('status', 'N/A')}")
|
|
|
|
# Verify failure state
|
|
assert result["status"] == "failed", "Workflow status should be 'failed'"
|
|
|
|
# Verify error log
|
|
print(f"{Fore.YELLOW}➤ Verifying error logging...{Style.RESET_ALL}")
|
|
self.mydom_mock.createWorkflowLog.assert_called()
|
|
print(f"{Fore.GREEN}✓ Error correctly logged{Style.RESET_ALL}")
|
|
|
|
# Verify database update
|
|
print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with failure status{Style.RESET_ALL}")
|
|
|
|
async def test_workflow_stop_during_processing(self):
|
|
"""Test stopping a workflow during processing"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}INTEGRATION TEST: Workflow Stop During Processing{Style.RESET_ALL}")
|
|
print(f"{Fore.WHITE}This test simulates stopping a workflow during processing{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
# Create a test workflow that is running
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Configure mock
|
|
self.mydom_mock.loadWorkflowState.return_value = workflow
|
|
|
|
print(f"{Fore.YELLOW}➤ Stopping workflow during processing...{Style.RESET_ALL}")
|
|
# Stop the workflow
|
|
stopped_workflow = await self.manager.workflowStop(workflow["id"])
|
|
|
|
# Print workflow state
|
|
print(f"{Fore.GREEN}✓ Workflow stopped:{Style.RESET_ALL}")
|
|
print(f" Status: {stopped_workflow.get('status', 'N/A')}")
|
|
|
|
# Verify stopped state
|
|
assert stopped_workflow["status"] == "stopped", "Workflow status should be 'stopped'"
|
|
|
|
# Verify database update
|
|
print(f"{Fore.YELLOW}➤ Verifying database update...{Style.RESET_ALL}")
|
|
self.mydom_mock.updateWorkflow.assert_called_once()
|
|
print(f"{Fore.GREEN}✓ Database correctly updated with stopped status{Style.RESET_ALL}")
|
|
|
|
# ------------------------------------------------------------------------
|
|
# UTILITY FUNCTION TESTS
|
|
# ------------------------------------------------------------------------
|
|
|
|
def test_parse_json_response(self):
|
|
"""Test JSON response parsing"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}UTILITY TEST: JSON Response Parsing{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
# Test with clean JSON
|
|
print(f"{Fore.YELLOW}➤ Testing with clean JSON...{Style.RESET_ALL}")
|
|
clean_json = '{"key": "value", "number": 123}'
|
|
result = self.manager.parseJsonResponse(clean_json)
|
|
|
|
print(f"{Fore.GREEN}✓ Clean JSON result:{Style.RESET_ALL}")
|
|
print(f" key: {result.get('key', 'N/A')}")
|
|
print(f" number: {result.get('number', 'N/A')}")
|
|
|
|
assert result["key"] == "value", "Clean JSON parsing should extract key value"
|
|
assert result["number"] == 123, "Clean JSON parsing should extract number value"
|
|
|
|
# Test with JSON embedded in text
|
|
print(f"\n{Fore.YELLOW}➤ Testing with JSON embedded in text...{Style.RESET_ALL}")
|
|
text_with_json = 'Some text before {"key": "value"} and after'
|
|
result = self.manager.parseJsonResponse(text_with_json)
|
|
|
|
print(f"{Fore.GREEN}✓ Embedded JSON result:{Style.RESET_ALL}")
|
|
print(f" key: {result.get('key', 'N/A')}")
|
|
|
|
assert result["key"] == "value", "Embedded JSON parsing should extract key value"
|
|
|
|
# Test with invalid JSON
|
|
print(f"\n{Fore.YELLOW}➤ Testing with invalid JSON...{Style.RESET_ALL}")
|
|
invalid_json = 'Not a JSON at all'
|
|
result = self.manager.parseJsonResponse(invalid_json)
|
|
|
|
print(f"{Fore.GREEN}✓ Invalid JSON fallback result:{Style.RESET_ALL}")
|
|
print(f" objWorkplan: {result.get('objWorkplan', 'N/A')}")
|
|
|
|
# Should return a fallback structure
|
|
assert "objFinalDocuments" in result, "Invalid JSON should return fallback with objFinalDocuments"
|
|
assert "objWorkplan" in result, "Invalid JSON should return fallback with objWorkplan"
|
|
assert "objUserResponse" in result, "Invalid JSON should return fallback with objUserResponse"
|
|
print(f"{Fore.GREEN}✓ All JSON parsing scenarios handled correctly{Style.RESET_ALL}")
|
|
|
|
def test_get_filename(self):
|
|
"""Test filename extraction from document"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}UTILITY TEST: Filename Extraction{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
# Test with extension
|
|
print(f"{Fore.YELLOW}➤ Testing with extension...{Style.RESET_ALL}")
|
|
document = {"name": "test", "ext": "txt"}
|
|
filename = self.manager.getFilename(document)
|
|
|
|
print(f"{Fore.GREEN}✓ Filename with extension result:{Style.RESET_ALL}")
|
|
print(f" Result: {filename}")
|
|
|
|
assert filename == "test.txt", "Filename should be 'test.txt'"
|
|
|
|
# Test with no extension
|
|
print(f"\n{Fore.YELLOW}➤ Testing with no extension...{Style.RESET_ALL}")
|
|
document = {"name": "test", "ext": ""}
|
|
filename = self.manager.getFilename(document)
|
|
|
|
print(f"{Fore.GREEN}✓ Filename without extension result:{Style.RESET_ALL}")
|
|
print(f" Result: {filename}")
|
|
|
|
assert filename == "test", "Filename should be 'test'"
|
|
print(f"{Fore.GREEN}✓ All filename extraction scenarios handled correctly{Style.RESET_ALL}")
|
|
|
|
def test_get_available_documents(self):
|
|
"""Test getting available documents from workflow"""
|
|
print(f"\n{Fore.CYAN}{Style.BRIGHT}UTILITY TEST: Get Available Documents{Style.RESET_ALL}")
|
|
print(f"{Fore.YELLOW}{'-' * 60}{Style.RESET_ALL}")
|
|
|
|
# Create a test workflow with messages containing documents
|
|
workflow = self._create_test_workflow()
|
|
|
|
# Add user message with document
|
|
print(f"{Fore.YELLOW}➤ Creating user message with document...{Style.RESET_ALL}")
|
|
user_message = self._create_test_message(workflow, "Message with document")
|
|
user_message["documents"] = [self._create_test_document("user_doc.txt")]
|
|
workflow["messages"].append(user_message)
|
|
|
|
# Add assistant message with document
|
|
print(f"{Fore.YELLOW}➤ Creating assistant message with document...{Style.RESET_ALL}")
|
|
assistant_message = self._create_test_message(workflow, "Response with document", "assistant")
|
|
assistant_message["documents"] = [self._create_test_document("assistant_doc.txt")]
|
|
workflow["messages"].append(assistant_message)
|
|
|
|
print(f"{Fore.YELLOW}➤ Getting available documents...{Style.RESET_ALL}")
|
|
# Get available documents
|
|
available_docs = self.manager.getAvailableDocuments(workflow, user_message)
|
|
|
|
# Print results
|
|
print(f"{Fore.GREEN}✓ Available documents result:{Style.RESET_ALL}")
|
|
print(f" Count: {len(available_docs)}")
|
|
for i, doc in enumerate(available_docs):
|
|
print(f" {i+1}. {doc.get('label', 'N/A')} ({doc.get('fileSource', 'N/A')})")
|
|
|
|
# Verify result
|
|
assert len(available_docs) == 2, "Available documents should have 2 entries"
|
|
# Should be sorted newest first
|
|
assert available_docs[0]["label"] == "assistant_doc.txt", "First document should be assistant_doc.txt"
|
|
assert available_docs[1]["label"] == "user_doc.txt", "Second document should be user_doc.txt"
|
|
# User's document should be marked as from user
|
|
assert available_docs[1]["fileSource"] == "user", "User document should have fileSource='user'"
|
|
print(f"{Fore.GREEN}✓ Available documents correctly identified and sorted{Style.RESET_ALL}")
|
|
|
|
|
|
# Simple test runner
|
|
if __name__ == "__main__":
|
|
# Import the runner and execute tests
|
|
try:
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
from tool_testBackendSingle import run_tests
|
|
asyncio.run(run_tests())
|
|
except ImportError:
|
|
print(f"{Fore.YELLOW}Please use tool_testBackendSingle.py to run the tests properly{Style.RESET_ALL}") |