gateway/tool_testBackendSingle.py
2025-04-26 02:13:22 +02:00

432 lines
No EOL
16 KiB
Python

#!/usr/bin/env python3
"""
Simplified Test Runner for Workflow State Machine
This script provides a clean and simple test runner for the workflow state machine
tests that properly handles async test methods.
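Usage:
    python tool_testBackendSingle.py [test_file.py] [test_name]
Examples:
    python tool_testBackendSingle.py                            # Run all tests of the first test file found
    python tool_testBackendSingle.py test_state_1               # Run tests whose name contains "test_state_1"
    python tool_testBackendSingle.py test_foo.py test_state_1   # Same, but from an explicitly given test file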
"""
import os
import sys
import asyncio
import time
import traceback
import importlib
import inspect
from unittest.mock import patch, MagicMock, AsyncMock
# Try to import colorama; install it on the fly when it is missing and fall
# back to plain, uncolored output when it cannot be installed (e.g. offline).
try:
    from colorama import init, Fore, Back, Style
except ImportError:
    print("Installing required package: colorama")
    import subprocess
    try:
        # check_call raises CalledProcessError on a non-zero pip exit status,
        # so a failed installation is detected instead of silently ignored.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "colorama"])
        # A package installed while the interpreter is running is only
        # guaranteed to be importable after the finder caches are invalidated.
        importlib.invalidate_caches()
        from colorama import init, Fore, Back, Style
    except (subprocess.CalledProcessError, OSError, ImportError) as install_error:
        print(f"Could not install colorama ({install_error}); continuing without colors")

        class _NoColor:
            """Stand-in for colorama's Fore/Back/Style: every attribute is ''."""

            def __getattr__(self, name):
                return ""

        Fore = Back = Style = _NoColor()

        def init(*args, **kwargs):
            """No-op replacement for colorama.init()."""
            return None
init()  # Initialize colorama (no-op when running without colors)
class AsyncTestRunner:
    """Simple test runner that supports both regular and ``async def`` tests.

    For every test the runner calls ``setUp``, the test method and
    ``tearDown`` on a test-case instance, prints a colored PASS/FAIL line and
    accumulates the outcome so that a summary can be printed afterwards.
    """

    def __init__(self):
        """Initialize the counters and the result log."""
        self.success_count = 0  # tests recorded as passed by print_result
        self.failure_count = 0  # tests recorded as failed by print_result
        self.results = []       # one dict per test: name, success, duration, error
        self.total_time = 0     # seconds spent in the last completed run_test_case

    def print_header(self, test_case_name):
        """Print a header for the test suite named *test_case_name*."""
        print("\n" + "=" * 80)
        print(f"{Fore.CYAN}{Style.BRIGHT}{test_case_name}{Style.RESET_ALL}")
        print("=" * 80)

    def print_result(self, test_name, success, duration, error=None):
        """Print one test result and record it.

        Besides printing, this updates ``success_count`` / ``failure_count``
        and appends an entry to ``results``.

        Args:
            test_name: Name of the test method (``test_`` prefix is stripped
                and underscores become spaces for display).
            success: Whether the test passed.
            duration: Execution time in seconds.
            error: Optional error (exception or message) to display.
        """
        clean_name = test_name.replace('test_', '').replace('_', ' ').title()
        if success:
            status = f"{Fore.GREEN}[PASS]{Style.RESET_ALL}"
            self.success_count += 1
        else:
            status = f"{Fore.RED}[FAIL]{Style.RESET_ALL}"
            self.failure_count += 1
        # Print result line
        print(f"{status} {clean_name} - {duration:.2f}s")
        # Print error if any; compare against None so that an error object
        # that happens to be falsy is still reported.
        if error is not None:
            print(f" {Fore.RED}{error}{Style.RESET_ALL}")
            if isinstance(error, Exception):
                traceback.print_exception(type(error), error, error.__traceback__)
        # Store result
        self.results.append({
            'name': clean_name,
            'success': success,
            'duration': duration,
            'error': error
        })

    def print_summary(self):
        """Print total time, pass/fail counts and the names of failed tests."""
        print("\n" + "=" * 80)
        print(f"{Fore.CYAN}{Style.BRIGHT}TEST SUMMARY{Style.RESET_ALL}")
        print("-" * 80)
        # Print timing
        print(f"Total execution time: {self.total_time:.2f}s")
        # Print counts
        total = self.success_count + self.failure_count
        print(f"Tests: {total}, Passed: {Fore.GREEN}{self.success_count}{Style.RESET_ALL}, Failed: {Fore.RED}{self.failure_count}{Style.RESET_ALL}")
        # Print overall status
        if self.failure_count == 0:
            print(f"\n{Fore.GREEN}{Style.BRIGHT}✓ ALL TESTS PASSED{Style.RESET_ALL}")
        else:
            print(f"\n{Fore.RED}{Style.BRIGHT}✗ TESTS FAILED{Style.RESET_ALL}")
            # Print failures
            print(f"\n{Fore.RED}Failed tests:{Style.RESET_ALL}")
            for result in self.results:
                if not result['success']:
                    print(f" - {result['name']}")
        print("=" * 80)

    async def run_test(self, test_instance, test_method):
        """Run a single test method (sync or async) including setUp/tearDown.

        An exception raised by ``setUp``, the test itself or ``tearDown`` is
        recorded as a failure of this test instead of aborting the whole run.
        ``tearDown`` is only called when ``setUp`` succeeded.

        Args:
            test_instance: Instance of the test-case class to run against.
            test_method: The test method; only its ``__name__`` is used to
                look the bound method up on *test_instance*.

        Returns:
            True if the test passed, False otherwise.
        """
        test_name = test_method.__name__
        clean_name = test_name.replace('test_', '').replace('_', ' ').title()
        print(f"\n{Fore.BLUE}[RUNNING]{Style.RESET_ALL} {clean_name}...")

        success = True
        error = None
        duration = 0.0
        set_up_done = False
        try:
            if hasattr(test_instance, 'setUp'):
                await self.run_method_with_instance(test_instance, test_instance.setUp)
            set_up_done = True
            # Time only the test body, not the fixtures
            start_time = time.time()
            try:
                # run_method_with_instance re-binds by name, so bound and
                # unbound methods are handled the same way.
                await self.run_method_with_instance(test_instance, test_method)
            finally:
                duration = time.time() - start_time
        except Exception as e:
            success = False
            error = e

        if set_up_done and hasattr(test_instance, 'tearDown'):
            try:
                await self.run_method_with_instance(test_instance, test_instance.tearDown)
            except Exception as e:
                success = False
                # Keep the first error: a test failure is more informative
                # than a follow-up failure during cleanup.
                if error is None:
                    error = e

        # Record and print result
        self.print_result(test_name, success, duration, error)
        return success

    async def run_method_with_instance(self, instance, method):
        """Call the attribute of *instance* named like *method* and return its result.

        The method is looked up on *instance* via ``method.__name__`` so it is
        always bound to that instance. If the call returns an awaitable (an
        ``async def`` method, or a plain wrapper around one) it is awaited.
        """
        bound_method = getattr(instance, method.__name__)
        result = bound_method()
        if inspect.isawaitable(result):
            result = await result
        return result

    async def run_method(self, method):
        """Call *method* without arguments, awaiting the result if necessary.

        Raises:
            TypeError: If *method* exposes ``__self__`` set to None, i.e. it
                looks like a method that still needs an instance.
        """
        if hasattr(method, '__self__') and method.__self__ is None:
            raise TypeError(f"Method {method.__name__} appears to be unbound and needs 'self'")
        result = method()
        if inspect.isawaitable(result):
            result = await result
        return result

    def _reset_mocks(self):
        """Reset the optional mock attributes for a fresh test.

        NOTE(review): ``mydom_mock``, ``registry_mock``,
        ``getDocumentContents_mock`` and ``_setup_mocks`` are not defined by
        this class; presumably they are attached externally or by a subclass.
        Missing attributes are therefore skipped instead of raising
        AttributeError.
        """
        mydom = getattr(self, 'mydom_mock', None)
        if hasattr(mydom, 'reset_mock'):
            mydom.reset_mock()
        elif hasattr(self, '_setup_mocks'):
            # Recreate the mock objects
            self._setup_mocks()
        for attr_name in ('registry_mock', 'getDocumentContents_mock'):
            mock = getattr(self, attr_name, None)
            if hasattr(mock, 'reset_mock'):
                mock.reset_mock()

    async def run_test_case(self, test_case_class, filter_pattern=None):
        """Run all ``test_*`` methods of *test_case_class*.

        Args:
            test_case_class: Class whose callable ``test_*`` attributes are
                run, each on a fresh instance created without arguments.
            filter_pattern: Optional substring; only tests whose name contains
                it are run.

        Returns:
            True if no failure has been recorded on this runner, False if
            there were failures or no test matched.
        """
        start_time = time.time()
        self.print_header(test_case_class.__name__)
        # Get all test methods, ordered by name
        test_methods = sorted(
            (
                getattr(test_case_class, name)
                for name in dir(test_case_class)
                if name.startswith('test_') and callable(getattr(test_case_class, name))
            ),
            key=lambda method: method.__name__,
        )
        # Filter tests if pattern provided
        if filter_pattern:
            test_methods = [
                method for method in test_methods
                if filter_pattern in method.__name__
            ]
        if not test_methods:
            print(f"{Fore.YELLOW}No tests found{Style.RESET_ALL}")
            # Explicit bool (still falsy) so every path returns the same type
            return False
        print(f"Running {len(test_methods)} tests...\n")
        for test_method in test_methods:
            # Create a fresh instance for each test
            test_instance = test_case_class()
            await self.run_test(test_instance, test_method)
        self.total_time = time.time() - start_time
        self.print_summary()
        return self.failure_count == 0
def setup_module_paths():
    """Put this script's directory and its sibling locations on sys.path.

    The script directory and its parent are always added; the parent's
    ``modules`` and ``gateway`` sub-directories are added only when they
    exist. Entries already on ``sys.path`` are left where they are. The first
    five ``sys.path`` entries are printed afterwards.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    parent = os.path.dirname(here)
    # (path, must_exist) in insertion order. Every insert goes to the front,
    # so a later row ends up with higher import priority than an earlier one.
    candidates = (
        (here, False),
        (parent, False),
        (os.path.join(parent, 'modules'), True),
        (os.path.join(parent, 'gateway'), True),
    )
    for candidate, must_exist in candidates:
        if must_exist and not os.path.exists(candidate):
            continue
        if candidate in sys.path:
            continue
        sys.path.insert(0, candidate)
    print(f"{Fore.CYAN}Python path set to:{Style.RESET_ALL}")
    for entry in sys.path[:5]:  # Print first 5 paths
        print(f" - {entry}")
def find_test_files(directory='.'):
    """Find test files in *directory* (default: the current working directory).

    Files are returned in priority order:

    1. ``test_workflow_state_machine.py`` if it exists,
    2. ``tool_test*.py`` files, except this runner itself,
    3. remaining ``test_*.py`` files.

    Within groups 2 and 3 the names are sorted alphabetically so that the
    result (and therefore the file the caller picks first) does not depend on
    the arbitrary order in which the operating system lists the directory.

    Args:
        directory: Directory to scan.

    Returns:
        List of file names (not paths) relative to *directory*.
    """
    # List the directory once; os.listdir gives no ordering guarantee
    entries = sorted(os.listdir(directory))
    test_files = []
    # First priority: test_workflow_state_machine.py
    priority_file = 'test_workflow_state_machine.py'
    if os.path.exists(os.path.join(directory, priority_file)):
        test_files.append(priority_file)
    # Second priority: any tool_test*.py files
    tool_test_files = [
        f for f in entries
        if f.startswith('tool_test') and f.endswith('.py') and f != 'tool_testBackendSingle.py'
    ]
    test_files.extend(tool_test_files)
    # Last priority: any test_*.py files not already selected
    other_test_files = [
        f for f in entries
        if f.startswith('test_') and f.endswith('.py') and f not in test_files
    ]
    test_files.extend(other_test_files)
    return test_files
def _load_test_module(test_file):
    """Load the test module named by *test_file*; return it, or None on failure.

    Three strategies are tried in order: a normal import, a relative import
    against ``__package__``, and finally executing the file's source.
    """
    # Remove .py extension for import
    module_path = test_file[:-3] if test_file.endswith('.py') else test_file
    # A path such as "./test_x.py" is not a valid module name: import by the
    # base name and make the containing directory importable instead.
    module_dir, module_name = os.path.split(module_path)
    if module_dir:
        module_dir = os.path.abspath(module_dir)
        if module_dir not in sys.path:
            sys.path.insert(0, module_dir)

    try:
        # First try a normal import
        print(f"{Fore.YELLOW}Attempting to import module: {module_name}{Style.RESET_ALL}")
        test_module = importlib.import_module(module_name)
        print(f"{Fore.GREEN}Successfully imported test module: {module_name}{Style.RESET_ALL}")
        return test_module
    except ImportError as e:
        print(f"{Fore.RED}Error importing module {module_name}: {e}{Style.RESET_ALL}")

    try:
        # Try to load as a relative module
        print(f"{Fore.YELLOW}Trying relative import...{Style.RESET_ALL}")
        test_module = importlib.import_module('.' + module_name, package=__package__)
        print(f"{Fore.GREEN}Imported test module via relative import: {module_name}{Style.RESET_ALL}")
        return test_module
    except (ImportError, TypeError) as e:
        # import_module raises TypeError (not ImportError) for a relative name
        # when no package is given, which is the case when run as a script.
        print(f"{Fore.RED}Relative import failed: {e}{Style.RESET_ALL}")

    # Fall back to exec (not recommended but sometimes necessary)
    print(f"{Fore.YELLOW}Attempting to load using exec: {test_file}{Style.RESET_ALL}")
    try:
        with open(test_file, 'r', encoding='utf-8') as f:
            module_content = f.read()
        # NOTE(review): exec runs the file's code with full privileges; only
        # point this runner at test files you trust.
        module_namespace = {}
        exec(module_content, module_namespace)

        # Expose the executed namespace through attribute access like a module
        class MockModule:
            pass

        test_module = MockModule()
        for key, value in module_namespace.items():
            setattr(test_module, key, value)
        print(f"{Fore.GREEN}Loaded test module using exec: {test_file}{Style.RESET_ALL}")
        return test_module
    except Exception as e:
        print(f"{Fore.RED}Failed to load module using exec: {e}{Style.RESET_ALL}")
        traceback.print_exc()
        return None


def _find_test_case_class(test_module):
    """Return the test-case class of *test_module*, or None if there is none.

    Candidates are classes whose attribute name contains "Test". A candidate
    that actually defines callable ``test_*`` attributes is preferred, so an
    imported base class (e.g. ``TestCase``) does not win just because its name
    sorts first; without such a class the first candidate is used.
    """
    fallback = None
    for item_name in dir(test_module):
        item = getattr(test_module, item_name)
        if not (inspect.isclass(item) and 'Test' in item_name):
            continue
        has_tests = any(
            name.startswith('test_') and callable(getattr(item, name, None))
            for name in dir(item)
        )
        if has_tests:
            print(f"{Fore.GREEN}Found test case class: {item_name}{Style.RESET_ALL}")
            return item
        if fallback is None:
            fallback = (item_name, item)
    if fallback is not None:
        print(f"{Fore.GREEN}Found test case class: {fallback[0]}{Style.RESET_ALL}")
        return fallback[1]
    return None


async def run_tests(test_file=None, test_filter=None):
    """Load a test module and run its test case with AsyncTestRunner.

    Args:
        test_file: Test file (or module name) to load. When omitted, the
            first entry returned by ``find_test_files()`` is used.
        test_filter: Optional substring; only tests whose name contains it
            are run.

    Returns:
        True when every selected test passed; a falsy value when tests
        failed, no tests were found or the module could not be loaded.
    """
    # Set up paths
    setup_module_paths()
    # Find test files if not specified
    if not test_file:
        test_files = find_test_files()
        if not test_files:
            print(f"{Fore.RED}No test files found{Style.RESET_ALL}")
            return False
        test_file = test_files[0]
        print(f"{Fore.YELLOW}Found test files: {', '.join(test_files)}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}Using: {test_file}{Style.RESET_ALL}")

    test_module = _load_test_module(test_file)
    if test_module is None:
        return False

    # Find test case class
    print(f"{Fore.YELLOW}Looking for test case class in module...{Style.RESET_ALL}")
    test_case_class = _find_test_case_class(test_module)
    if not test_case_class:
        print(f"{Fore.RED}No test case class found in {test_file}{Style.RESET_ALL}")
        return False

    # Diagnostic only: warn early when the agent registry cannot be imported
    try:
        print(f"{Fore.YELLOW}Checking for agent registry...{Style.RESET_ALL}")
        try:
            from modules.workflowAgentsRegistry import getAgentRegistry  # noqa: F401
            print(f"{Fore.GREEN}Successfully imported getAgentRegistry{Style.RESET_ALL}")
        except ImportError:
            print(f"{Fore.YELLOW}Agent registry import not found - may cause issues{Style.RESET_ALL}")
    except Exception as e:
        print(f"{Fore.YELLOW}Error checking agent registry: {e}{Style.RESET_ALL}")

    # Run the tests
    print(f"{Fore.CYAN}Starting test execution{Style.RESET_ALL}")
    runner = AsyncTestRunner()
    return await runner.run_test_case(test_case_class, test_filter)
if __name__ == "__main__":
    # Command line: [test_file.py] [test_filter]
    # The first argument is taken as the test file when it exists on disk or
    # ends in ".py"; otherwise it is taken as the test-name filter.
    test_file = None
    test_filter = None
    if len(sys.argv) > 1:
        # Check if first arg is a file
        if os.path.exists(sys.argv[1]) or sys.argv[1].endswith('.py'):
            test_file = sys.argv[1]
            if len(sys.argv) > 2:
                test_filter = sys.argv[2]
        else:
            test_filter = sys.argv[1]
    # Run tests and report the outcome through the exit status so that shells
    # and CI jobs can detect failures (0 = all passed, 1 = otherwise).
    all_passed = asyncio.run(run_tests(test_file, test_filter))
    sys.exit(0 if all_passed else 1)
class MockDomInterface:
    """Test double whose interface methods are all mocks.

    Every method attribute is a ``MagicMock`` (``callAi`` is an ``AsyncMock``
    so it can be awaited); ``getWorkflow`` and ``loadWorkflowState`` return
    None by default. Constructor arguments are accepted and ignored.
    """

    def __init__(self, *args, **kwargs):
        self.getWorkflow = MagicMock(return_value=None)
        self.loadWorkflowState = MagicMock(return_value=None)
        self.createWorkflow = MagicMock()
        self.updateWorkflow = MagicMock()
        self.createWorkflowLog = MagicMock()
        self.createWorkflowMessage = MagicMock()
        self.getFile = MagicMock()
        self.getFileData = MagicMock()
        self.saveUploadedFile = MagicMock()
        self.userLanguage = "en"
        self.callAi = AsyncMock()
        self.setUserLanguage = MagicMock()

    def reset_mock(self):
        """Reset the recorded calls of every mock stored on this instance.

        Only instance attributes are visited. Iterating ``dir(self)`` would
        also yield ``__class__``, whose ``reset_mock`` is this very function
        in unbound form, and calling that raises TypeError.
        """
        for attr in list(vars(self).values()):
            if hasattr(attr, 'reset_mock'):
                attr.reset_mock()
class MockAgentRegistry:
    """Test double for an agent registry whose methods are all mocks.

    ``getAgentInfos`` returns a single canned ``test_agent`` entry by default.
    """

    def __init__(self):
        self.getAgent = MagicMock()
        self.getAgentInfos = MagicMock(return_value=[
            {"name": "test_agent", "description": "Test agent", "capabilities": ["text_processing"]}
        ])
        self.setMydom = MagicMock()

    def reset_mock(self):
        """Reset the recorded calls of every mock stored on this instance.

        Only instance attributes are visited. Iterating ``dir(self)`` would
        also yield ``__class__``, whose ``reset_mock`` is this very function
        in unbound form, and calling that raises TypeError.
        """
        for attr in list(vars(self).values()):
            if hasattr(attr, 'reset_mock'):
                attr.reset_mock()