#!/usr/bin/env python3
"""
AI Models Test - Tests all available AI models individually
"""
import asyncio
import json
import sys
import os
import base64
from datetime import datetime
from typing import Dict, Any, List

# Add the gateway to path
sys.path.append(os.path.dirname(__file__))

# Import the service initialization
from modules.features.chatPlayground.mainChatPlayground import getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User


class AIModelsTester:
    """Drives individual smoke tests against every registered AI model.

    Results are collected in ``self.testResults`` and persisted (per model
    and as an aggregate JSON) under ``<gateway>/../local/logs/modeltest``.
    """

    def __init__(self):
        # Create a minimal user context for testing
        testUser = User(
            id="test_user",
            username="test_user",
            email="test@example.com",
            fullName="Test User",
            language="en",
            mandateId="test_mandate"
        )

        # Initialize services using the existing system (test user, no workflow)
        self.services = getServices(testUser, None)
        self.testResults: List[Dict[str, Any]] = []

        # Create logs directory if it doesn't exist
        self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)

        # Create modeltest subdirectory
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)

        # Copy test image to modeltest directory if it exists (used by vision tests)
        testImageSource = os.path.join(self.logsDir, "_testdata_photo_2025-06-03_13-05-52.jpg")
        testImageDest = os.path.join(self.modelTestDir, "_testdata_photo_2025-06-03_13-05-52.jpg")
        if os.path.exists(testImageSource) and not os.path.exists(testImageDest):
            import shutil
            shutil.copy2(testImageSource, testImageDest)
            print(f"📷 Test image copied to: {testImageDest}")

    async def initialize(self):
        """Initialize the AI service and a minimal workflow context."""
        # Set logging level to INFO to reduce noise
        import logging
        logging.getLogger().setLevel(logging.INFO)

        # The AI service needs to be recreated with proper initialization
        from modules.services.serviceAi.mainServiceAi import AiService
        self.services.ai = await AiService.create(self.services)

        # Create a minimal workflow context
        from modules.datamodels.datamodelChat import ChatWorkflow
        import uuid
        self.services.currentWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=self.services.utils.timestampGetUtc(),
            lastActivity=self.services.utils.timestampGetUtc(),
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId="test_mandate",
            messageIds=[],
            workflowMode="React",
            maxSteps=5
        )
        print("✅ AI Service initialized successfully")
        print(f"📁 Results will be saved to: {self.modelTestDir}")

    async def testModel(self, modelName: str) -> Dict[str, Any]:
        """Test a specific AI model with a simple prompt.

        Returns a result dict (status SUCCESS / ERROR / EXCEPTION / SKIPPED)
        which is also appended to ``self.testResults`` and saved to disk.
        """
        print(f"\n{'='*60}")
        print(f"TESTING MODEL: {modelName}")
        print(f"{'='*60}")

        lowerName = modelName.lower()  # hoisted: reused for all routing checks below

        # Use same prompt for all web models
        if ("tavily" in lowerName or "perplexity" in lowerName or "llama" in lowerName
                or "sonar" in lowerName or "mistral" in lowerName):
            # All web models use the same JSON formatted prompt.
            # Country format: full name for Tavily (Switzerland); Perplexity converts ISO codes to names.
            testPrompt = json.dumps({
                "prompt": "Research, what ValueOn company in switzerland does and who works there? Return as JSON.",
                "maxResults": 5,
                "timeRange": "y",
                "country": "CH",  # ISO-2 code, Perplexity will convert to "Switzerland"
                "format": "json"
            }, indent=2)
        else:
            # Fallback for other models
            testPrompt = "Generate a comprehensive analysis of the current state of artificial intelligence. Return as JSON."

        print(f"Test prompt: {testPrompt}")
        print(f"Prompt length: {len(testPrompt)} characters")

        startTime = asyncio.get_event_loop().time()
        try:
            # Create options to force this specific model
            if "internal" in lowerName:
                options = AiCallOptions(
                    operationType=OperationTypeEnum.DATA_EXTRACT,
                    preferredModel=modelName
                )
            else:
                options = AiCallOptions(
                    operationType=OperationTypeEnum.DATA_GENERATE,
                    preferredModel=modelName
                )

            # Call the AI service DIRECTLY through the model's functionCall.
            # This tests the actual model, not the document generation pipeline.
            from modules.aicore.aicoreModelRegistry import modelRegistry
            model = modelRegistry.getModel(modelName)
            if not model:
                raise Exception(f"Model {modelName} not found")

            # Create AiModelCall and call the model's functionCall directly
            from modules.datamodels.datamodelAi import AiModelCall

            if "vision" in lowerName:
                # Vision models require special image handling; skip for now.
                print(f"⚠️ Skipping vision model {modelName} - requires special image handling")
                result = {
                    "modelName": modelName,
                    "status": "SKIPPED",
                    "processingTime": 0.0,
                    "responseLength": 0,
                    "responseType": "skipped",
                    "hasContent": False,
                    "error": "Vision model requires special image handling",
                    "fullResponse": "Skipped - vision model requires special image handling"
                }
                # FIX: previously this returned without recording the skip, so
                # skipped models vanished from the summary and saved results.
                self.testResults.append(result)
                self._saveIndividualModelResult(modelName, result)
                return result

            # For other models, use normal functionCall
            messages = [{"role": "user", "content": testPrompt}]
            modelCall = AiModelCall(
                messages=messages,
                model=model,
                options=options
            )
            response = await model.functionCall(modelCall)

            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime

            # Analyze response - now we get AiModelResponse objects
            if hasattr(response, 'success'):
                # AiModelResponse object
                if response.success:
                    result = {
                        "modelName": modelName,
                        "status": "SUCCESS",
                        "processingTime": round(processingTime, 2),
                        "responseLength": len(response.content) if response.content else 0,
                        "responseType": "AiModelResponse",
                        "hasContent": bool(response.content),
                        "error": None,
                        "modelUsed": modelName,
                        "priceUsd": 0.0,  # AiModelResponse doesn't have price info
                        "bytesSent": 0,
                        "bytesReceived": len(response.content.encode('utf-8')) if response.content else 0
                    }
                    # Try to parse content as JSON
                    if response.content:
                        try:
                            json.loads(response.content)
                            result["isValidJson"] = True
                        except (TypeError, ValueError):  # narrowed from bare except
                            result["isValidJson"] = False
                        result["responsePreview"] = response.content[:200] + "..." if len(response.content) > 200 else response.content
                        result["fullResponse"] = response.content
                    else:
                        result["isValidJson"] = False
                        result["responsePreview"] = "Empty response"
                        result["fullResponse"] = ""

                    print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
                    print(f"📄 Response length: {len(response.content) if response.content else 0} characters")
                    print(f"📄 Model used: {modelName}")
                    print(f"📄 Response preview: {result['responsePreview']}")
                else:
                    error = response.error or "Unknown error"
                    result = {
                        "modelName": modelName,
                        "status": "ERROR",
                        "processingTime": round(processingTime, 2),
                        "responseLength": 0,
                        "responseType": "AiModelResponse",
                        "hasContent": False,
                        "error": error,
                        "fullResponse": str(response)
                    }
                    print(f"❌ ERROR - {error}")
            elif isinstance(response, dict):
                # Fallback for dict responses
                if response.get("success", True):
                    result = {
                        "modelName": modelName,
                        "status": "SUCCESS",
                        "processingTime": round(processingTime, 2),
                        "responseLength": len(str(response)),
                        "responseType": "dict",
                        "hasContent": True,
                        "error": None
                    }
                    # Try to serialize as JSON
                    try:
                        jsonResponse = json.dumps(response, indent=2)
                        result["responsePreview"] = jsonResponse[:200] + "..." if len(jsonResponse) > 200 else jsonResponse
                        result["isValidJson"] = True
                        result["fullResponse"] = jsonResponse
                    except (TypeError, ValueError):  # narrowed from bare except
                        result["responsePreview"] = str(response)[:200] + "..." if len(str(response)) > 200 else str(response)
                        result["isValidJson"] = False
                        result["fullResponse"] = str(response)

                    print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
                    print(f"📄 Response length: {len(str(response))} characters")
                    print(f"📄 Response preview: {result['responsePreview']}")
                else:
                    error = response.get("error", "Unknown error")
                    result = {
                        "modelName": modelName,
                        "status": "ERROR",
                        "processingTime": round(processingTime, 2),
                        "responseLength": 0,
                        "responseType": "error",
                        "hasContent": False,
                        "error": error,
                        "fullResponse": str(response)
                    }
                    print(f"❌ ERROR - {error}")
            else:
                # String response
                result = {
                    "modelName": modelName,
                    "status": "SUCCESS",
                    "processingTime": round(processingTime, 2),
                    "responseLength": len(str(response)),
                    "responseType": "string",
                    "hasContent": True,
                    "error": None
                }
                # Try to parse as JSON
                try:
                    json.loads(str(response))
                    result["isValidJson"] = True
                except (TypeError, ValueError):  # narrowed from bare except
                    result["isValidJson"] = False
                result["responsePreview"] = str(response)[:200] + "..." if len(str(response)) > 200 else str(response)
                result["fullResponse"] = str(response)

                print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
                print(f"📄 Response length: {len(str(response))} characters")
                print(f"📄 Response preview: {result['responsePreview']}")

            # Save text response for all models
            if result.get("status") == "SUCCESS":
                self._saveTextResponse(modelName, result)

        except Exception as e:
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime
            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e)
            }
            print(f"💥 EXCEPTION - {str(e)}")

        self.testResults.append(result)
        # Save individual model result immediately
        self._saveIndividualModelResult(modelName, result)
        return result

    def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Save base64 image response to file (PNG, timestamped filename)."""
        try:
            fullResponse = result.get("fullResponse", "")
            base64Data = None

            # Try to extract base64 data from response
            if isinstance(fullResponse, dict):
                # Look for base64 data in the response
                if "content" in fullResponse:
                    base64Data = fullResponse["content"]
                elif "data" in fullResponse:
                    base64Data = fullResponse["data"]
                elif "image" in fullResponse:
                    base64Data = fullResponse["image"]
            else:
                # Try to find base64 data in string response (data-URI first)
                import re
                base64Match = re.search(r'data:image/[^;]+;base64,([A-Za-z0-9+/=]+)', str(fullResponse))
                if base64Match:
                    base64Data = base64Match.group(1)
                else:
                    # Try to find pure base64 string (long alphanumeric run)
                    base64Match = re.search(r'([A-Za-z0-9+/=]{100,})', str(fullResponse))
                    if base64Match:
                        base64Data = base64Match.group(1)

            if base64Data:
                # Clean base64 data: strip a data-URI prefix if present
                if base64Data.startswith('data:image/'):
                    base64Data = base64Data.split(',', 1)[1]

                # Decode and save image
                imageData = base64.b64decode(base64Data)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"{modelName}_{timestamp}.png"
                filepath = os.path.join(self.modelTestDir, filename)
                with open(filepath, 'wb') as f:
                    f.write(imageData)
                result["savedImage"] = filepath
                print(f"🖼️ Image saved: {filepath}")
            else:
                print(f"⚠️ No base64 image data found in response")
        except Exception as e:
            print(f"❌ Error saving image: {str(e)}")
            result["imageSaveError"] = str(e)

    def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
        """Save text response (with a metadata header) to a .txt file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.txt"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare content for saving
            content = result.get("fullResponse", "")
            if not content:
                content = result.get("responsePreview", "No content available")

            # Add metadata header
            metadata = f"""Model: {modelName}
Test Time: {timestamp}
Status: {result.get('status', 'Unknown')}
Processing Time: {result.get('processingTime', 0):.2f}s
Response Length: {result.get('responseLength', 0)} characters
Is Valid JSON: {result.get('isValidJson', False)}

--- RESPONSE CONTENT ---
{content}
"""
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(metadata)
            result["savedTextFile"] = filepath
            print(f"📄 Text response saved: {filepath}")
        except Exception as e:
            print(f"❌ Error saving text response: {str(e)}")
            result["textSaveError"] = str(e)

    def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
        """Save individual model test result to a timestamped JSON file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.json"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare individual result data
            individualData = {
                "modelName": modelName,
                "testTimestamp": timestamp,
                "testDate": datetime.now().isoformat(),
                "result": result
            }

            # Save to JSON file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(individualData, f, indent=2, ensure_ascii=False)
            # FIX: message previously printed the literal "(unknown)" instead of the path
            print(f"📄 Individual result saved: {filepath}")
        except Exception as e:
            print(f"❌ Error saving individual result: {str(e)}")

    def getAllAvailableModels(self) -> List[str]:
        """Get all available model names."""
        # Hardcoded list of known models - same approach as test_ai_behavior.py
        return [
            # "claude-3-5-sonnet-20241022",  # Skipped - text model, test later
            # "claude-3-5-sonnet-20241022-vision",  # Skipped - requires image input
            # "gpt-4o",  # Skipped - text model, test later
            # "gpt-3.5-turbo",  # Skipped - text model, test later
            # "gpt-4o-vision",  # Skipped - requires image input
            # "dall-e-3",  # Skipped - image generation, test later
            "sonar",  # Perplexity web model
            "sonar-pro",  # Perplexity web model
            "tavily-search",  # Tavily web model (unified research)
            # "internal-extractor",  # Skipped - internal model, test later
            # "internal-generator",  # Skipped - internal model, test later
            # "internal-renderer"  # Skipped - internal model, test later
        ]

    def saveTestResults(self):
        """Save detailed aggregate test results to file; returns the file path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")

        # Prepare results for saving
        saveData = {
            "testTimestamp": timestamp,
            "testDate": datetime.now().isoformat(),
            "totalModels": len(self.testResults),
            "successfulModels": len([r for r in self.testResults if r["status"] == "SUCCESS"]),
            "errorModels": len([r for r in self.testResults if r["status"] == "ERROR"]),
            "exceptionModels": len([r for r in self.testResults if r["status"] == "EXCEPTION"]),
            "skippedModels": len([r for r in self.testResults if r["status"] == "SKIPPED"]),  # additive key
            "results": self.testResults
        }

        # Calculate success rate
        if saveData["totalModels"] > 0:
            saveData["successRate"] = (saveData["successfulModels"] / saveData["totalModels"]) * 100
        else:
            saveData["successRate"] = 0

        # Save to JSON file
        with open(resultsFile, 'w', encoding='utf-8') as f:
            json.dump(saveData, f, indent=2, ensure_ascii=False)
        print(f"📄 Detailed results saved: {resultsFile}")
        return resultsFile

    def printTestSummary(self):
        """Print a summary of all test results."""
        print(f"\n{'='*80}")
        print("AI MODELS TEST SUMMARY")
        print(f"{'='*80}")

        totalModels = len(self.testResults)
        successfulModels = len([r for r in self.testResults if r["status"] == "SUCCESS"])
        errorModels = len([r for r in self.testResults if r["status"] == "ERROR"])
        exceptionModels = len([r for r in self.testResults if r["status"] == "EXCEPTION"])
        skippedModels = len([r for r in self.testResults if r["status"] == "SKIPPED"])

        print(f"📊 Total models tested: {totalModels}")
        print(f"✅ Successful: {successfulModels}")
        print(f"❌ Errors: {errorModels}")
        print(f"💥 Exceptions: {exceptionModels}")
        print(f"⚠️ Skipped: {skippedModels}")
        print(f"📈 Success rate: {(successfulModels/totalModels*100):.1f}%" if totalModels > 0 else "0%")

        print(f"\n{'='*80}")
        print("DETAILED RESULTS")
        print(f"{'='*80}")
        for result in self.testResults:
            status_icon = {
                "SUCCESS": "✅",
                "ERROR": "❌",
                "EXCEPTION": "💥",
                "SKIPPED": "⚠️"
            }.get(result["status"], "❓")
            print(f"\n{status_icon} {result['modelName']}")
            print(f"   Status: {result['status']}")
            print(f"   Processing time: {result['processingTime']}s")
            print(f"   Response length: {result['responseLength']} characters")
            print(f"   Response type: {result['responseType']}")
            if result.get("isValidJson") is not None:
                print(f"   Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")
            if result["error"]:
                print(f"   Error: {result['error']}")
            if result.get("responsePreview"):
                print(f"   Preview: {result['responsePreview']}")

        # Find fastest and slowest models
        if successfulModels > 0:
            successfulResults = [r for r in self.testResults if r["status"] == "SUCCESS"]
            fastest = min(successfulResults, key=lambda x: x["processingTime"])
            slowest = max(successfulResults, key=lambda x: x["processingTime"])
            print(f"\n{'='*80}")
            print("PERFORMANCE HIGHLIGHTS")
            print(f"{'='*80}")
            print(f"🚀 Fastest model: {fastest['modelName']} ({fastest['processingTime']}s)")
            print(f"🐌 Slowest model: {slowest['modelName']} ({slowest['processingTime']}s)")


async def main():
    """Run AI models testing."""
    tester = AIModelsTester()
    print("Starting AI Models Testing...")
    print("Initializing AI service...")
    await tester.initialize()

    # Get all available models
    models = tester.getAllAvailableModels()
    print(f"\nFound {len(models)} models to test:")
    for i, model in enumerate(models, 1):
        print(f"  {i}. {model}")

    print(f"\n{'='*80}")
    print("STARTING INDIVIDUAL MODEL TESTS")
    print(f"{'='*80}")
    print("Press Enter after each model test to continue to the next one...")

    # Test each model individually
    for i, modelName in enumerate(models, 1):
        print(f"\n[{i}/{len(models)}] Testing model: {modelName}")
        await tester.testModel(modelName)
        # Pause for user input (except for the last model)
        if i < len(models):
            input(f"\nPress Enter to continue to the next model...")

    # Save detailed results to file
    resultsFile = tester.saveTestResults()

    # Print final summary
    tester.printTestSummary()

    print(f"\n{'='*80}")
    print("TESTING COMPLETED")
    print(f"{'='*80}")
    print(f"📄 Results saved to: {resultsFile}")
    print(f"📁 Images saved to: {tester.modelTestDir}")


if __name__ == "__main__":
    asyncio.run(main())