#!/usr/bin/env python3 """ Simplified Test Runner for Workflow State Machine This script provides a clean and simple test runner for the workflow state machine tests that properly handles async test methods. Usage: python tool_testBackendSingle.py [test_name] Examples: python tool_testBackendSingle.py # Run all tests python tool_testBackendSingle.py test_state_1 # Run tests starting with test_state_1 """ import os import sys import asyncio import time import traceback import importlib import inspect from unittest.mock import patch, MagicMock, AsyncMock # Try to import colorama, install if not available try: from colorama import init, Fore, Back, Style init() # Initialize colorama except ImportError: print("Installing required package: colorama") import subprocess subprocess.call([sys.executable, "-m", "pip", "install", "colorama"]) from colorama import init, Fore, Back, Style init() # Initialize colorama class AsyncTestRunner: """Simple test runner that supports async test methods""" def __init__(self): """Initialize the test runner""" self.success_count = 0 self.failure_count = 0 self.results = [] self.total_time = 0 def print_header(self, test_case_name): """Print a header for the test suite""" print("\n" + "=" * 80) print(f"{Fore.CYAN}{Style.BRIGHT}{test_case_name}{Style.RESET_ALL}") print("=" * 80) def print_result(self, test_name, success, duration, error=None): """Print a test result with appropriate formatting""" clean_name = test_name.replace('test_', '').replace('_', ' ').title() if success: status = f"{Fore.GREEN}[PASS]{Style.RESET_ALL}" self.success_count += 1 else: status = f"{Fore.RED}[FAIL]{Style.RESET_ALL}" self.failure_count += 1 # Print result line print(f"{status} {clean_name} - {duration:.2f}s") # Print error if any if error: print(f" {Fore.RED}→ {error}{Style.RESET_ALL}") if isinstance(error, Exception): traceback.print_exception(type(error), error, error.__traceback__) # Store result self.results.append({ 'name': clean_name, 'success': success, 'duration': duration, 'error': error }) def print_summary(self): """Print a summary of test results""" print("\n" + "=" * 80) print(f"{Fore.CYAN}{Style.BRIGHT}TEST SUMMARY{Style.RESET_ALL}") print("-" * 80) # Print timing print(f"Total execution time: {self.total_time:.2f}s") # Print counts total = self.success_count + self.failure_count print(f"Tests: {total}, Passed: {Fore.GREEN}{self.success_count}{Style.RESET_ALL}, Failed: {Fore.RED}{self.failure_count}{Style.RESET_ALL}") # Print overall status if self.failure_count == 0: print(f"\n{Fore.GREEN}{Style.BRIGHT}✓ ALL TESTS PASSED{Style.RESET_ALL}") else: print(f"\n{Fore.RED}{Style.BRIGHT}✗ TESTS FAILED{Style.RESET_ALL}") # Print failures print(f"\n{Fore.RED}Failed tests:{Style.RESET_ALL}") for result in self.results: if not result['success']: print(f" - {result['name']}") print("=" * 80) async def run_test(self, test_instance, test_method): """Run a single test method (sync or async)""" # Prepare test test_name = test_method.__name__ clean_name = test_name.replace('test_', '').replace('_', ' ').title() # Print start print(f"\n{Fore.BLUE}[RUNNING]{Style.RESET_ALL} {clean_name}...") # Run setUp if hasattr(test_instance, 'setUp'): await self.run_method_with_instance(test_instance, test_instance.setUp) # Time the test execution start_time = time.time() success = True error = None try: # Run the test - ensure bound method gets called with instance if hasattr(test_method, '__self__') and test_method.__self__ is None: # This is an unbound method, bind it to the instance bound_method = getattr(test_instance, test_method.__name__) await self.run_method_with_instance(test_instance, bound_method) else: # This is already a bound method await self.run_method_with_instance(test_instance, test_method) except Exception as e: success = False error = e # Calculate duration duration = time.time() - start_time # Run tearDown if hasattr(test_instance, 'tearDown'): await self.run_method_with_instance(test_instance, test_instance.tearDown) # Record and print result self.print_result(test_name, success, duration, error) return success async def run_method_with_instance(self, instance, method): """Run a method ensuring it has the correct instance""" method_name = method.__name__ bound_method = getattr(instance, method_name) if asyncio.iscoroutinefunction(bound_method): return await bound_method() else: return bound_method() async def run_method(self, method): """Run a method that might be async or regular""" # Check if this is an unbound method that needs self if hasattr(method, '__self__') and method.__self__ is None: # This suggests it's an unbound method that needs an instance raise TypeError(f"Method {method.__name__} appears to be unbound and needs 'self'") if asyncio.iscoroutinefunction(method): return await method() else: return method() def _reset_mocks(self): """Reset all mocks for a fresh test""" # Only reset if the objects have reset_mock method if hasattr(self.mydom_mock, 'reset_mock'): self.mydom_mock.reset_mock() else: # Recreate the mock objects self._setup_mocks() if hasattr(self.registry_mock, 'reset_mock'): self.registry_mock.reset_mock() if hasattr(self.getDocumentContents_mock, 'reset_mock'): self.getDocumentContents_mock.reset_mock() async def run_test_case(self, test_case_class, filter_pattern=None): """Run all test methods in a test case class""" # Initialize timing start_time = time.time() # Print header self.print_header(test_case_class.__name__) # Get all test methods test_methods = sorted([ getattr(test_case_class, name) for name in dir(test_case_class) if name.startswith('test_') and callable(getattr(test_case_class, name)) ], key=lambda x: x.__name__) # Filter tests if pattern provided if filter_pattern: test_methods = [ method for method in test_methods if filter_pattern in method.__name__ ] if not test_methods: print(f"{Fore.YELLOW}No tests found{Style.RESET_ALL}") return print(f"Running {len(test_methods)} tests...\n") # Run each test for test_method in test_methods: # Create a fresh instance for each test test_instance = test_case_class() await self.run_test(test_instance, test_method) # Record total time self.total_time = time.time() - start_time # Print summary self.print_summary() return self.failure_count == 0 def setup_module_paths(): """Set up module paths to make imports work""" # Add current directory and parent directory to path current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) if current_dir not in sys.path: sys.path.insert(0, current_dir) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) # Also add any module directories that might exist modules_dir = os.path.join(parent_dir, 'modules') if os.path.exists(modules_dir) and modules_dir not in sys.path: sys.path.insert(0, modules_dir) gateway_dir = os.path.join(parent_dir, 'gateway') if os.path.exists(gateway_dir) and gateway_dir not in sys.path: sys.path.insert(0, gateway_dir) print(f"{Fore.CYAN}Python path set to:{Style.RESET_ALL}") for path in sys.path[:5]: # Print first 5 paths print(f" - {path}") def find_test_files(): """Find test files in the current directory""" # Look for test files in priority order test_files = [] # First priority: test_workflow_state_machine.py if os.path.exists('./test_workflow_state_machine.py'): test_files.append('test_workflow_state_machine.py') # Second priority: any tool_test*.py files tool_test_files = [f for f in os.listdir('.') if f.startswith('tool_test') and f.endswith('.py') and f != 'tool_testBackendSingle.py'] test_files.extend(tool_test_files) # Last priority: any test_*.py files other_test_files = [f for f in os.listdir('.') if f.startswith('test_') and f.endswith('.py') and f not in test_files] test_files.extend(other_test_files) return test_files async def run_tests(test_file=None, test_filter=None): """Run all tests""" # Set up paths setup_module_paths() # Find test files if not specified if not test_file: test_files = find_test_files() if not test_files: print(f"{Fore.RED}No test files found{Style.RESET_ALL}") return False test_file = test_files[0] print(f"{Fore.YELLOW}Found test files: {', '.join(test_files)}{Style.RESET_ALL}") print(f"{Fore.YELLOW}Using: {test_file}{Style.RESET_ALL}") # Remove .py extension for import module_name = test_file[:-3] if test_file.endswith('.py') else test_file try: # First try a normal import print(f"{Fore.YELLOW}Attempting to import module: {module_name}{Style.RESET_ALL}") test_module = importlib.import_module(module_name) print(f"{Fore.GREEN}Successfully imported test module: {module_name}{Style.RESET_ALL}") except ImportError as e: print(f"{Fore.RED}Error importing module {module_name}: {e}{Style.RESET_ALL}") # Try different import approaches try: # Try to load as a relative module print(f"{Fore.YELLOW}Trying relative import...{Style.RESET_ALL}") test_module = importlib.import_module('.' + module_name, package=__package__) print(f"{Fore.GREEN}Imported test module via relative import: {module_name}{Style.RESET_ALL}") except ImportError as e: print(f"{Fore.RED}Relative import failed: {e}{Style.RESET_ALL}") # Fall back to exec (not recommended but sometimes necessary) print(f"{Fore.YELLOW}Attempting to load using exec: {test_file}{Style.RESET_ALL}") try: with open(test_file, 'r') as f: module_content = f.read() # Create a new module namespace module_namespace = {} # Execute the module code in the namespace exec(module_content, module_namespace) # Create a mock module class MockModule: pass test_module = MockModule() # Copy the relevant attributes to the mock module for key, value in module_namespace.items(): setattr(test_module, key, value) print(f"{Fore.GREEN}Loaded test module using exec: {test_file}{Style.RESET_ALL}") except Exception as e: print(f"{Fore.RED}Failed to load module using exec: {e}{Style.RESET_ALL}") traceback.print_exc() return False # Find test case class test_case_class = None print(f"{Fore.YELLOW}Looking for test case class in module...{Style.RESET_ALL}") for item_name in dir(test_module): item = getattr(test_module, item_name) if inspect.isclass(item) and (item_name.startswith('Test') or 'Test' in item_name): print(f"{Fore.GREEN}Found test case class: {item_name}{Style.RESET_ALL}") test_case_class = item break if not test_case_class: print(f"{Fore.RED}No test case class found in {test_file}{Style.RESET_ALL}") return False # Try to check for required imports try: print(f"{Fore.YELLOW}Checking for agent registry...{Style.RESET_ALL}") try: # First try direct import from modules.workflowAgentsRegistry import getAgentRegistry print(f"{Fore.GREEN}Successfully imported getAgentRegistry{Style.RESET_ALL}") except ImportError: try: # Try alternate path from modules.workflowAgentsRegistry import getAgentRegistry print(f"{Fore.GREEN}Successfully imported getAgentRegistry from modules{Style.RESET_ALL}") except ImportError: print(f"{Fore.YELLOW}Agent registry import not found - may cause issues{Style.RESET_ALL}") except Exception as e: print(f"{Fore.YELLOW}Error checking agent registry: {e}{Style.RESET_ALL}") # Run the tests print(f"{Fore.CYAN}Starting test execution{Style.RESET_ALL}") runner = AsyncTestRunner() return await runner.run_test_case(test_case_class, test_filter) if __name__ == "__main__": # Get test filter from command line test_file = None test_filter = None if len(sys.argv) > 1: # Check if first arg is a file if os.path.exists(sys.argv[1]) or sys.argv[1].endswith('.py'): test_file = sys.argv[1] if len(sys.argv) > 2: test_filter = sys.argv[2] else: test_filter = sys.argv[1] # Run tests asyncio.run(run_tests(test_file, test_filter)) class MockDomInterface: def __init__(self, *args, **kwargs): self.getWorkflow = MagicMock(return_value=None) self.loadWorkflowState = MagicMock(return_value=None) self.createWorkflow = MagicMock() self.updateWorkflow = MagicMock() self.createWorkflowLog = MagicMock() self.createWorkflowMessage = MagicMock() self.getFile = MagicMock() self.getFileData = MagicMock() self.saveUploadedFile = MagicMock() self.userLanguage = "en" self.callAi = AsyncMock() self.setUserLanguage = MagicMock() def reset_mock(self): """Reset all mocks in this interface""" for attr_name in dir(self): attr = getattr(self, attr_name) if hasattr(attr, 'reset_mock'): attr.reset_mock() class MockAgentRegistry: def __init__(self): self.getAgent = MagicMock() self.getAgentInfos = MagicMock(return_value=[ {"name": "test_agent", "description": "Test agent", "capabilities": ["text_processing"]} ]) self.setMydom = MagicMock() def reset_mock(self): """Reset all mocks in this registry""" for attr_name in dir(self): attr = getattr(self, attr_name) if hasattr(attr, 'reset_mock'): attr.reset_mock()