#!/usr/bin/env python3
"""
OpenAI Timeout Analysis Test - Tests OpenAI API calls to identify timeout issues

Compares different scenarios to understand why OpenAI calls fail in functional
tests but work in module tests.
"""

import asyncio
import json
import logging
import os
import sys
import time
import traceback
import uuid
from typing import Dict, Any, List

# Add the gateway to path (go up 2 levels from tests/functional/).
# This must run before the project imports below so that `modules` resolves.
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)

from modules.services import getInterface as getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects


class OpenAITimeoutTester:
    """Test OpenAI API calls to identify timeout issues.

    Each test method builds a result dict, appends it to ``self.testResults``
    and returns it; ``printSummary`` reports on the collected results.
    """

    def __init__(self):
        # Use root user for testing (has full access to everything)
        from modules.interfaces.interfaceDbAppObjects import getRootInterface
        rootInterface = getRootInterface()
        self.testUser = rootInterface.currentUser

        # Initialize services
        self.services = getServices(self.testUser, None)
        self.testResults = []

    async def initialize(self):
        """Create a test workflow, persist it and attach it to the services."""
        # Set logging level to DEBUG to see detailed logs
        logging.getLogger().setLevel(logging.DEBUG)

        # Create and save workflow in database
        currentTimestamp = time.time()
        testWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="OpenAI Timeout Test Workflow",
            status="running",
            startedAt=currentTimestamp,
            lastActivity=currentTimestamp,
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId=self.testUser.mandateId,
            messageIds=[],
            workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
            maxSteps=5
        )

        # Save workflow to database
        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
        workflowDict = testWorkflow.model_dump()
        interfaceDbChat.createWorkflow(workflowDict)

        # Set the workflow in services
        self.services.workflow = testWorkflow

        print("✅ Services initialized")
        print(f"📋 Workflow ID: {testWorkflow.id}")

    @staticmethod
    def _printTestHeader(description: str, prompt: str) -> None:
        """Print the banner shown at the start of a single call test."""
        print(f"\n{'='*80}")
        print(f"TEST: {description}")
        print(f"{'='*80}")
        print(f"Prompt: {prompt[:100]}...")

    @staticmethod
    def _newResult(description: str, prompt: str, method: str) -> Dict[str, Any]:
        """Return a fresh result record pre-filled with failure defaults."""
        return {
            "description": description,
            "prompt": prompt,
            "success": False,
            "error": None,
            "responseTime": 0,
            "responseLength": 0,
            "method": method
        }

    @staticmethod
    def _recordFailure(result: Dict[str, Any], error: Exception, startTime: float) -> None:
        """Store the error and elapsed time in ``result`` and print diagnostics.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` sees the active exception.
        """
        responseTime = time.perf_counter() - startTime
        result["error"] = str(error)
        result["responseTime"] = responseTime
        print(f"❌ Failed after {responseTime:.2f}s")
        print(f"💥 Error: {type(error).__name__}: {str(error)}")
        print(f"📋 Traceback:\n{traceback.format_exc()}")

    async def testDirectOpenAICall(self, prompt: str, description: str) -> Dict[str, Any]:
        """Test direct OpenAI API call through the connector.

        Args:
            prompt: Text sent as the single user message.
            description: Label used in the console output and the summary.

        Returns:
            The result record, which is also appended to ``self.testResults``
            (including when the gpt-4o model cannot be found).
        """
        self._printTestHeader(description, prompt)
        result = self._newResult(description, prompt, "direct_connector")
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments during a long-running call.
        startTime = time.perf_counter()

        try:
            # Get OpenAI connector directly
            from modules.aicore.aicorePluginOpenai import AiOpenai
            from modules.datamodels.datamodelAi import AiModelCall

            # Initialize connector
            connector = AiOpenai()

            # Get the gpt-4o model
            gpt4oModel = next(
                (model for model in connector.getModels() if model.name == "gpt-4o"),
                None
            )
            if not gpt4oModel:
                result["error"] = "gpt-4o model not found"
                result["responseTime"] = time.perf_counter() - startTime
                print(f"❌ {result['error']}")
                # The finally clause below still records this result.
                return result

            # Create model call
            modelCall = AiModelCall(
                model=gpt4oModel,
                messages=[{"role": "user", "content": prompt}],
                options=AiCallOptions()
            )

            # Make the call
            print("⏱️ Starting API call...")
            response = await connector.callAiBasic(modelCall)

            responseTime = time.perf_counter() - startTime
            result["success"] = True
            result["responseTime"] = responseTime
            result["responseLength"] = len(response.content) if response.content else 0

            print(f"✅ Success! Response time: {responseTime:.2f}s")
            print(f"📊 Response length: {result['responseLength']} characters")
            print(f"📝 Response preview: {response.content[:200] if response.content else 'None'}...")

        except Exception as e:
            self._recordFailure(result, e, startTime)
        finally:
            # Record on every exit path, including the early return above.
            self.testResults.append(result)

        return result

    async def testThroughAiService(self, prompt: str, description: str) -> Dict[str, Any]:
        """Test OpenAI call through AiService.callAiContent.

        Args:
            prompt: Prompt passed to ``callAiContent``.
            description: Label used in the console output and the summary.

        Returns:
            The result record, which is also appended to ``self.testResults``.
        """
        self._printTestHeader(description, prompt)
        result = self._newResult(description, prompt, "ai_service")
        startTime = time.perf_counter()

        try:
            from modules.datamodels.datamodelWorkflow import AiResponse

            options = AiCallOptions(
                operationType=OperationTypeEnum.DATA_GENERATE
            )

            print("⏱️ Starting AI service call...")
            aiResponse: AiResponse = await self.services.ai.callAiContent(
                prompt=prompt,
                options=options,
                outputFormat="json"
            )

            responseTime = time.perf_counter() - startTime
            result["success"] = True
            result["responseTime"] = responseTime

            if isinstance(aiResponse, AiResponse):
                content = aiResponse.content if aiResponse.content else ""
                result["responseLength"] = len(content)
                print(f"✅ Success! Response time: {responseTime:.2f}s")
                print(f"📊 Response length: {result['responseLength']} characters")
                print(f"📝 Response preview: {content[:200] if content else 'None'}...")
            else:
                # Unexpected return type: fall back to its string form for sizing.
                result["responseLength"] = len(str(aiResponse))
                print(f"✅ Success! Response time: {responseTime:.2f}s")
                print(f"📊 Response length: {result['responseLength']} characters")

        except Exception as e:
            self._recordFailure(result, e, startTime)
        finally:
            self.testResults.append(result)

        return result

    async def testTimeoutConfiguration(self) -> Dict[str, Any]:
        """Test timeout configuration of OpenAI connector.

        Returns:
            A result record holding the connector type, the HTTP client timeout
            (if the connector exposes one) and the configured timeout (if set).
            ``success`` is True when the inspection finished without raising,
            so a clean analysis is not reported as a failure in the summary.
        """
        print(f"\n{'='*80}")
        print("TEST: Timeout Configuration Analysis")
        print(f"{'='*80}")

        result = {
            "description": "Timeout Configuration Analysis",
            "method": "config_inspection",
            "success": False,
            "error": None,
            "timeout": None,
            "httpClientTimeout": None,
            "connectorType": None
        }

        try:
            from modules.aicore.aicorePluginOpenai import AiOpenai

            connector = AiOpenai()
            result["connectorType"] = connector.getConnectorType()

            # Check httpClient timeout
            if hasattr(connector, 'httpClient'):
                httpClient = connector.httpClient
                if hasattr(httpClient, 'timeout'):
                    result["httpClientTimeout"] = str(httpClient.timeout)
                    print(f"📋 HttpClient timeout: {httpClient.timeout}")
                else:
                    print("⚠️ HttpClient has no timeout attribute")

            # Check for timeout in config
            from modules.shared.configuration import APP_CONFIG
            openaiTimeout = APP_CONFIG.get('Connector_AiOpenai_TIMEOUT', None)
            if openaiTimeout:
                result["timeout"] = openaiTimeout
                print(f"📋 Config timeout: {openaiTimeout}")
            else:
                print("📋 No timeout in config (using default)")

            result["success"] = True
            print("✅ Timeout analysis complete")

        except Exception as e:
            result["error"] = str(e)
            print(f"❌ Error analyzing timeout: {str(e)}")
            print(f"📋 Traceback:\n{traceback.format_exc()}")

        self.testResults.append(result)
        return result

    def printSummary(self):
        """Print every collected result, followed by a list of the failures."""
        print(f"\n{'='*80}")
        print("OPENAI TIMEOUT TEST SUMMARY")
        print(f"{'='*80}")

        for i, result in enumerate(self.testResults, 1):
            print(f"\n[{i}] {result.get('description', 'Unknown')}")
            print(f" Method: {result.get('method', 'N/A')}")
            print(f" Success: {'✅' if result.get('success') else '❌'}")
            if result.get('responseTime'):
                print(f" Response Time: {result['responseTime']:.2f}s")
            if result.get('responseLength'):
                print(f" Response Length: {result['responseLength']} characters")
            if result.get('error'):
                print(f" Error: {result['error'][:200]}...")
            if result.get('timeout'):
                print(f" Timeout Config: {result['timeout']}")
            if result.get('httpClientTimeout'):
                print(f" HttpClient Timeout: {result['httpClientTimeout']}")

        # Analyze failures
        failures = [r for r in self.testResults if not r.get('success')]
        if failures:
            print(f"\n{'='*80}")
            print(f"FAILURES: {len(failures)}/{len(self.testResults)}")
            print(f"{'='*80}")
            for failure in failures:
                print(f"\n❌ {failure.get('description')}")
                print(f" Method: {failure.get('method')}")
                # `or` (not a .get default) also covers an explicit error=None,
                # which would otherwise raise TypeError on slicing.
                print(f" Error: {(failure.get('error') or 'Unknown')[:200]}...")


# Test scenarios
TEST_SCENARIOS = [
    {
        "description": "Simple prompt (should work)",
        "prompt": "Say hello in one sentence."
    },
    {
        "description": "Medium complexity prompt",
        "prompt": "Generate a list of the first 100 prime numbers."
    },
    {
        "description": "Complex prompt (5000 primes - known to timeout)",
        "prompt": "Generate the first 5000 prime numbers in a table with 10 columns per row."
    },
    {
        "description": "Very simple JSON generation",
        "prompt": "Generate a JSON object with one field 'message' containing 'Hello World'."
    }
]


async def main():
    """Run OpenAI timeout analysis tests."""
    tester = OpenAITimeoutTester()

    print("="*80)
    print("OPENAI TIMEOUT ANALYSIS TEST")
    print("="*80)
    print("\nThis test analyzes why OpenAI calls timeout in functional tests.")
    print("It compares direct connector calls vs AiService calls.\n")

    await tester.initialize()

    # Test timeout configuration first
    await tester.testTimeoutConfiguration()

    # Test each scenario with both methods
    for scenario in TEST_SCENARIOS:
        prompt = scenario["prompt"]
        description = scenario["description"]

        # Test 1: Direct connector call
        await tester.testDirectOpenAICall(
            prompt=f"{description} - {prompt}",
            description=f"{description} (Direct Connector)"
        )

        # Wait a bit between tests
        await asyncio.sleep(2)

        # Test 2: Through AiService
        await tester.testThroughAiService(
            prompt=f"{description} - {prompt}",
            description=f"{description} (AiService)"
        )

        # Wait between scenarios
        await asyncio.sleep(3)

    # Print summary
    tester.printSummary()

    print(f"\n{'='*80}")
    print("TEST COMPLETE")
    print(f"{'='*80}")


if __name__ == "__main__":
    asyncio.run(main())