
#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
AI Models Test - Tests ALL operation types on ALL models that support them
This script tests all available models with all their supported operation types:
- PLAN: Planning operations
- DATA_ANALYSE: Data analysis
- DATA_GENERATE: Data generation
- DATA_EXTRACT: Data extraction
- IMAGE_ANALYSE: Image analysis
- IMAGE_GENERATE: Image generation
- WEB_SEARCH_DATA: Web search
- WEB_CRAWL: Web crawling
For each model, it tests every operation type the model supports and validates
the results. Results are saved to files for analysis.
"""
import asyncio
import json
import sys
import os
import base64
from datetime import datetime
from typing import Dict, Any, List
# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)
# Import the service initialization
from modules.services import getInterface as getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User
class AIModelsTester:
    def __init__(self):
        # Create a minimal user context for testing
        testUser = User(
            id="test_user",
            username="test_user",
            email="test@example.com",
            fullName="Test User",
            language="en",
            mandateId="test_mandate"
        )
        # Initialize services using the existing system
        self.services = getServices(testUser, None)  # Test user, no workflow
        self.testResults = []
        # Create the logs directory one level above the gateway directory if it doesn't exist
        _gateway_dir = os.path.dirname(_gateway_path)
        self.logsDir = os.path.join(_gateway_dir, "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)
        # Create the modeltest subdirectory
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)
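    # Resulting directory layout, assuming gateway/ sits one level below the
    # repository root (which is what the path setup above implies):
    #
    #   <root>/local/logs/modeltest/   <- JSON, text and image files per test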
    async def initialize(self):
        """Initialize the AI service."""
        # Set logging level to DEBUG for detailed output
        import logging
        logging.getLogger().setLevel(logging.DEBUG)
        # Initialize the model registry with all connectors
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.aicore.aicorePluginTavily import AiTavily
        from modules.aicore.aicorePluginPerplexity import AiPerplexity
        # Note: We don't need to register web connectors for IMAGE_ANALYSE testing
        # modelRegistry.registerConnector(AiTavily())
        # modelRegistry.registerConnector(AiPerplexity())
        # The AI service needs to be recreated with proper initialization
        from modules.services.serviceAi.mainServiceAi import AiService
        self.services.ai = await AiService.create(self.services)
        # Also initialize the extraction service for image processing
        from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
        self.services.extraction = ExtractionService(self.services)
        # Create a minimal workflow context
        from modules.datamodels.datamodelChat import ChatWorkflow, WorkflowModeEnum
        import uuid
        self.services.currentWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=self.services.utils.timestampGetUtc(),
            lastActivity=self.services.utils.timestampGetUtc(),
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId="test_mandate",
            messageIds=[],
            workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
            maxSteps=5
        )
        print("✅ AI Service initialized successfully")
        print(f"📁 Results will be saved to: {self.modelTestDir}")
    def _getTestPromptForOperation(self, operationType) -> str:
        """Get an appropriate test prompt for each operation type."""
        prompts = {
            OperationTypeEnum.PLAN: "Create a project plan for developing a mobile app with 5 main tasks.",
            OperationTypeEnum.DATA_ANALYSE: "Analyze the pros and cons of cloud computing.",
            OperationTypeEnum.DATA_GENERATE: "Generate a list of 10 creative marketing ideas for a tech startup.",
            OperationTypeEnum.DATA_EXTRACT: "Extract key information from this text about artificial intelligence trends.",
            OperationTypeEnum.IMAGE_ANALYSE: "Describe what you see in this image.",
            OperationTypeEnum.IMAGE_GENERATE: "A futuristic cityscape with flying cars and neon lights.",
            OperationTypeEnum.WEB_SEARCH_DATA: "Who works at Valueon AG in Switzerland?",  # Search query for valueon.ch
            OperationTypeEnum.WEB_CRAWL: "https://www.valueon.ch"  # URL to crawl
        }
        return prompts.get(operationType, "Test prompt for this operation type.")
    def _createTestImage(self) -> str:
        """Load the test image file and convert it to a base64 data URL."""
        # Path to the test image (relative to the gateway directory)
        testImagePath = os.path.join(
            os.path.dirname(__file__),  # tests/functional/
            "..",                       # tests/
            "testdata",                 # tests/testdata/
            "Foto20250906_125903.jpg"
        )
        # Resolve the absolute path
        testImagePath = os.path.abspath(testImagePath)
        if not os.path.exists(testImagePath):
            raise FileNotFoundError(f"Test image not found at: {testImagePath}")
        # Read the image file and convert it to base64
        with open(testImagePath, 'rb') as f:
            imageBytes = f.read()
        imageBase64 = base64.b64encode(imageBytes).decode('utf-8')
        return f"data:image/jpeg;base64,{imageBase64}"
    async def testModelOperation(self, modelName: str, operationType, model) -> Dict[str, Any]:
        """Test a specific AI model with a specific operation type."""
        print(f"\n Testing operation: {operationType.name}")
        testPrompt = self._getTestPromptForOperation(operationType)
        startTime = asyncio.get_event_loop().time()
        try:
            # Create messages - the format differs for IMAGE_ANALYSE
            if operationType == OperationTypeEnum.IMAGE_ANALYSE:
                # For image analysis, content must be a list with text and image parts
                testImage = self._createTestImage()
                messages = [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": testPrompt},
                        {"type": "image_url", "image_url": {"url": testImage}}
                    ]
                }]
            else:
                # For other operations, simple text content
                messages = [{"role": "user", "content": testPrompt}]
            # Create the model call options
            from modules.datamodels.datamodelAi import (
                AiModelCall, AiCallPromptImage,
                AiCallPromptWebSearch, AiCallPromptWebCrawl
            )
            options = AiCallOptions(operationType=operationType)
            # Format the message content based on operation type
            if operationType == OperationTypeEnum.IMAGE_GENERATE:
                # Create a structured prompt with image generation parameters
                imagePrompt = AiCallPromptImage(
                    prompt=testPrompt,
                    size="1024x1024",
                    quality="standard",
                    style="vivid"
                )
                # Serialize the structured prompt into the message content
                messages[0]["content"] = json.dumps(imagePrompt.model_dump())
            elif operationType == OperationTypeEnum.WEB_SEARCH_DATA:
                # Create a structured prompt for web search
                webSearchPrompt = AiCallPromptWebSearch(
                    instruction=testPrompt,
                    maxNumberPages=5  # Limit for testing
                )
                messages[0]["content"] = json.dumps(webSearchPrompt.model_dump())
            elif operationType == OperationTypeEnum.WEB_CRAWL:
                # Create a structured prompt for web crawl
                webCrawlPrompt = AiCallPromptWebCrawl(
                    instruction="Extract the main content from this page",
                    url=testPrompt,  # testPrompt contains the URL
                    maxDepth=1,  # Limit for testing
                    maxWidth=3   # Limit for testing
                )
                messages[0]["content"] = json.dumps(webCrawlPrompt.model_dump())
            modelCall = AiModelCall(
                messages=messages,
                model=model,
                options=options
            )
            # Call the model directly
            modelResponse = await model.functionCall(modelCall)
            if not modelResponse.success:
                raise Exception(f"Model call failed: {modelResponse.error}")
            result = modelResponse.content
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime
            # Analyze the result based on operation type
            analysisResult = {
                "modelName": modelName,
                "operationType": operationType.name,
                "status": "SUCCESS",
                "processingTime": round(processingTime, 2),
                "responseLength": len(str(result)) if result else 0,
                "hasContent": bool(result),
                "error": None,
                "testPrompt": testPrompt,
                "fullResponse": str(result) if result else ""
            }
            # Operation-specific analysis
            if operationType == OperationTypeEnum.IMAGE_GENERATE:
                analysisResult["responseType"] = "base64_image"
                try:
                    if isinstance(result, str) and result.startswith("data:image"):
                        base64Data = result.split(",")[1] if "," in result else result
                    else:
                        base64Data = result if isinstance(result, str) else ""
                    if base64Data:
                        imageBytes = base64.b64decode(base64Data)
                        analysisResult["isValidBase64"] = True
                        analysisResult["imageByteSize"] = len(imageBytes)
                    else:
                        analysisResult["isValidBase64"] = False
                        analysisResult["imageByteSize"] = 0
                except (ValueError, TypeError):  # binascii.Error is a ValueError subclass
                    analysisResult["isValidBase64"] = False
                    analysisResult["imageByteSize"] = 0
            elif operationType in [OperationTypeEnum.DATA_ANALYSE, OperationTypeEnum.DATA_GENERATE, OperationTypeEnum.PLAN]:
                analysisResult["responseType"] = "text"
                try:
                    json.loads(str(result))
                    analysisResult["isValidJson"] = True
                except json.JSONDecodeError:
                    analysisResult["isValidJson"] = False
            else:
                analysisResult["responseType"] = "text"
            analysisResult["responsePreview"] = (str(result)[:200] + "...") if len(str(result)) > 200 else str(result)
            print(f" ✅ SUCCESS - Processing time: {processingTime:.2f}s, Response length: {analysisResult['responseLength']} chars")
            return analysisResult
        except Exception as e:
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime
            result = {
                "modelName": modelName,
                "operationType": operationType.name,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e),
                "testPrompt": testPrompt,
                "fullResponse": ""
            }
            print(f" 💥 EXCEPTION - {str(e)}")
            return result
    async def testModel(self, modelInfo: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Test a specific AI model with all its supported operation types."""
        modelName = modelInfo["displayName"]
        operationTypes = modelInfo["operationTypes"]
        print(f"\n{'='*60}")
        print(f"TESTING MODEL: {modelName}")
        print(f"Supported operations: {', '.join([op.name for op in operationTypes])}")
        print(f"{'='*60}")
        # Get the model from the registry
        from modules.aicore.aicoreModelRegistry import modelRegistry
        model = modelRegistry.getModel(modelName)
        if not model:
            errorResult = {
                "modelName": modelName,
                "operationType": "ALL",
                "status": "ERROR",
                "processingTime": 0,
                "responseLength": 0,
                "responseType": "error",
                "hasContent": False,
                "error": f"Model {modelName} not found in registry",
                "fullResponse": ""
            }
            self.testResults.append(errorResult)
            return [errorResult]
        # Test each operation type
        results = []
        for operationType in operationTypes:
            result = await self.testModelOperation(modelName, operationType, model)
            results.append(result)
            self.testResults.append(result)
            # Save the individual result
            self._saveIndividualModelResult(f"{modelName}_{operationType.name}", result)
        return results
    def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Save an image generation response as an image file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Save as an image file
            filename = f"{modelName}_{timestamp}.png"
            filepath = os.path.join(self.modelTestDir, filename)
            # Get the image data
            content = result.get("fullResponse", "")
            if not content:
                print(f"⚠️ No image data to save for {modelName}")
                return
            # Decode the base64 image data
            try:
                # Extract the base64 payload if it's a data URL
                if content.startswith("data:image"):
                    base64Data = content.split(",")[1] if "," in content else content
                else:
                    base64Data = content
                # Decode base64 to bytes
                imageBytes = base64.b64decode(base64Data)
                # Save the image file
                with open(filepath, 'wb') as f:
                    f.write(imageBytes)
                result["savedImageFile"] = filepath
                print(f"🖼️ Image saved: {filepath}")
                # Also save the metadata as JSON
                metadata = {
                    "modelName": modelName,
                    "timestamp": timestamp,
                    "status": result.get('status', 'Unknown'),
                    "processingTime": result.get('processingTime', 0),
                    "responseLength": result.get('responseLength', 0),
                    "isValidBase64": result.get('isValidBase64', False),
                    "imageByteSize": len(imageBytes),
                    "size": result.get('size', 'N/A'),
                    "quality": result.get('quality', 'N/A'),
                    "style": result.get('style', 'N/A'),
                    "testPrompt": result.get('testPrompt', 'N/A'),
                    "imageFile": filename
                }
                metadataFile = os.path.join(self.modelTestDir, f"{modelName}_{timestamp}_metadata.json")
                with open(metadataFile, 'w', encoding='utf-8') as f:
                    json.dump(metadata, f, indent=2, ensure_ascii=False)
                print(f"📄 Metadata saved: {metadataFile}")
            except Exception as decodeError:
                print(f"❌ Error decoding base64 image data: {str(decodeError)}")
                # Fall back to saving as a text file
                textFile = os.path.join(self.modelTestDir, f"{modelName}_{timestamp}.txt")
                with open(textFile, 'w', encoding='utf-8') as f:
                    f.write(f"Error decoding image:\n{str(decodeError)}\n\nBase64 data:\n{content[:500]}...")
                print(f"📄 Saved base64 data as text: {textFile}")
        except Exception as e:
            print(f"❌ Error saving image generation response: {str(e)}")
            result["saveError"] = str(e)
    def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
        """Save a text response to a file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.txt"
            filepath = os.path.join(self.modelTestDir, filename)
            # Prepare the content for saving
            content = result.get("fullResponse", "")
            if not content:
                content = result.get("responsePreview", "No content available")
            # If there's an error, include it in the content
            if result.get("error"):
                content = f"ERROR: {result.get('error')}\n\n{content}"
            # Get the prompt and crawl config for logging; the crawl-specific
            # fields are only populated by the crawl tests and default to N/A
            config = result.get("crawlConfig", {})
            crawlDepth = config.get("depth", "N/A")
            crawlWidth = config.get("width", "N/A")
            # Get both the original JSON prompt and the actual prompt sent
            originalPrompt = result.get("testPrompt", "N/A")
            actualPromptSent = result.get("actualPromptSent", "N/A")
            # Add a metadata header
            metadata = f"""Model: {modelName}
Test Time: {timestamp}
Status: {result.get('status', 'Unknown')}
Processing Time: {result.get('processingTime', 0):.2f}s
Response Length: {result.get('responseLength', 0)} characters
Is Valid JSON: {result.get('isValidJson', False)}
Test Method: {result.get('testMethod', 'standard')}
Pages Crawled: {result.get('pagesCrawled', 'N/A')}
Crawled URL: {result.get('crawledUrl', 'N/A')}
Has URL: {result.get('hasUrl', 'N/A')}
Has Title: {result.get('hasTitle', 'N/A')}
Has Content: {result.get('hasContent', 'N/A')}
Content Length: {result.get('contentLength', 'N/A')} characters
--- CRAWL CONFIGURATION ---
Depth: {crawlDepth}
Width: {crawlWidth}
--- ORIGINAL JSON PROMPT (input) ---
{originalPrompt}
--- ACTUAL PROMPT SENT TO API (EXACT) ---
{actualPromptSent}
--- RESPONSE CONTENT ---
{content}
"""
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(metadata)
            result["savedTextFile"] = filepath
            print(f"📄 Text response saved: {filepath}")
        except Exception as e:
            print(f"❌ Error saving text response: {str(e)}")
            result["textSaveError"] = str(e)
    def _validateCrawlResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the WEB_CRAWL response contains crawled content."""
        try:
            content = result.get("fullResponse", "")
            # Try to parse as JSON
            crawledData = {}
            try:
                parsed = json.loads(content)
                if isinstance(parsed, dict):
                    crawledData = parsed
            except json.JSONDecodeError:
                pass
            # Check for the expected fields: url, title, content
            hasUrl = bool(crawledData.get("url"))
            hasTitle = bool(crawledData.get("title"))
            hasContent = bool(crawledData.get("content"))
            contentLength = len(crawledData.get("content", ""))
            result["hasUrl"] = hasUrl
            result["hasTitle"] = hasTitle
            result["hasContent"] = hasContent
            result["contentLength"] = contentLength
            result["crawledUrl"] = crawledData.get("url", "")
            if hasUrl and hasContent:
                print(f"✅ Successfully crawled content from URL: {crawledData.get('url', 'unknown')}")
                print(f" Content length: {contentLength} characters")
                print(f" Title: {crawledData.get('title', 'N/A')}")
            else:
                print(f"⚠️ Incomplete crawl response - URL: {hasUrl}, Content: {hasContent}")
        except Exception as e:
            print(f"❌ Error validating crawl response: {str(e)}")
            result["crawlValidationError"] = str(e)
    def _validateImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the IMAGE_GENERATE response contains a valid base64 image."""
        try:
            content = result.get("fullResponse", "")
            # Check whether the content is a non-empty base64 image
            hasContent = bool(content and len(content.strip()) > 0)
            result["hasContent"] = hasContent
            if hasContent:
                isBase64 = result.get("isValidBase64", False)
                imageSize = result.get("imageByteSize", 0)
                imageSizeKB = imageSize / 1024 if imageSize > 0 else 0
                print("✅ Successfully generated image")
                print(f" Image size: {imageSizeKB:.2f} KB ({imageSize} bytes)")
                print(f" Valid base64: {'Yes' if isBase64 else 'No'}")
            else:
                print("⚠️ Empty or invalid image generation response")
        except Exception as e:
            print(f"❌ Error validating image response: {str(e)}")
            result["validationError"] = str(e)
    async def _testTavilyDirect(self, modelName: str, crawlDepth: int = 3, crawlWidth: int = 50) -> Dict[str, Any]:
        """Test the Tavily API directly using the crawl() method for better link following."""
        print(f"\n{'='*60}")
        print("TESTING TAVILY DIRECT API (crawl method)")
        print(f"{'='*60}")
        startTime = asyncio.get_event_loop().time()
        try:
            from tavily import AsyncTavilyClient
            from modules.shared.configuration import APP_CONFIG
            apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
            if not apiKey:
                raise Exception("Tavily API key not found")
            client = AsyncTavilyClient(api_key=apiKey)
            # Map our configuration to Tavily parameters:
            #   crawlWidth -> limit (total page budget) and max_breadth (links followed per level)
            #   crawlDepth -> max_depth (link-following depth)
            tavilyLimit = crawlWidth
            tavilyMaxDepth = crawlDepth
            tavilyMaxBreadth = crawlWidth
            print("Calling Tavily API with crawl() method...")
            print("URL: https://www.valueon.ch")
            print("Instructions: Who works in this company?")
            print(f"Limit: {tavilyLimit} pages in total")
            print(f"Max depth: {tavilyMaxDepth} (follows links {tavilyMaxDepth} levels deep)")
            print(f"Max breadth: {tavilyMaxBreadth} (up to {tavilyMaxBreadth} pages at each level)")
            print("Deep and Broad Crawl Configuration Active")
            response = await client.crawl(
                url="https://www.valueon.ch",
                instructions="Who works in this company?",
                limit=tavilyLimit,
                max_depth=tavilyMaxDepth,
                max_breadth=tavilyMaxBreadth
            )
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime
            # Analyze the response
            contentLength = 0
            pagesCrawled = 0
            fullContent = ""
            if isinstance(response, dict):
                # Check if it has results
                if "results" in response:
                    results = response["results"]
                    pagesCrawled = len(results)
                    content_parts = []
                    for page in results:
                        url = page.get("url", "")
                        title = page.get("title", "")
                        content = page.get("raw_content", page.get("content", ""))
                        content_parts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
                        contentLength += len(content)
                    fullContent = "\n".join(content_parts)
                else:
                    fullContent = json.dumps(response, indent=2)
                    contentLength = len(fullContent)
            elif isinstance(response, list):
                pagesCrawled = len(response)
                content_parts = []
                for item in response:
                    if isinstance(item, dict):
                        url = item.get("url", "")
                        title = item.get("title", "")
                        content = item.get("raw_content", item.get("content", ""))
                        content_parts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
                        contentLength += len(content)
                fullContent = "\n".join(content_parts)
            else:
                fullContent = str(response)
                contentLength = len(fullContent)
            result = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": round(processingTime, 2),
                "responseLength": contentLength,
                "responseType": "TavilyDirectAPI",
                "hasContent": True,
                "error": None,
                "modelUsed": modelName,
                "priceUsd": 0.0,
                "bytesSent": 0,
                "bytesReceived": contentLength,
                "isValidJson": True,
                "fullResponse": fullContent,
                "pagesCrawled": pagesCrawled,
                "testMethod": "direct_api_crawl"
            }
            print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
            print(f"📄 Pages crawled: {pagesCrawled}")
            print(f"📄 Total content length: {contentLength} characters")
            # Save and validate the response
            self._saveTextResponse(modelName, result)
            self._validateCrawlResponse(modelName, result)
            self._saveIndividualModelResult(modelName, result)
            self.testResults.append(result)
            return result
        except Exception as e:
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime
            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e)
            }
            print(f"💥 EXCEPTION - {str(e)}")
            self.testResults.append(result)
            return result
    def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
        """Save an individual model test result to a file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.json"
            filepath = os.path.join(self.modelTestDir, filename)
            # Prepare the individual result data
            individualData = {
                "modelName": modelName,
                "testTimestamp": timestamp,
                "testDate": datetime.now().isoformat(),
                "result": result
            }
            # Save to a JSON file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(individualData, f, indent=2, ensure_ascii=False)
            print(f"📄 Individual result saved: {filename}")
        except Exception as e:
            print(f"❌ Error saving individual result: {str(e)}")
    def getAllAvailableModels(self) -> List[Dict[str, Any]]:
        """Get all available models with their supported operation types."""
        from modules.aicore.aicoreModelRegistry import modelRegistry
        # Get all models from the registry
        allModels = modelRegistry.getAvailableModels()
        totalModels = len(allModels)
        print(f"\n📊 Total models in registry: {totalModels}")
        # Collect all models with their supported operation types
        modelsToTest = []
        for model in allModels:
            if model.operationTypes and len(model.operationTypes) > 0:
                supportedOps = [ot.operationType for ot in model.operationTypes]
                modelsToTest.append({
                    "displayName": model.displayName,
                    "name": model.name,
                    "operationTypes": supportedOps
                })
        print(f"✅ Found {len(modelsToTest)} model(s) with operation type support (will test all):")
        for i, modelInfo in enumerate(modelsToTest, 1):
            opsStr = ", ".join([op.name for op in modelInfo["operationTypes"]])
            print(f" {i}. {modelInfo['displayName']} - Operations: {opsStr}")
        if len(modelsToTest) < totalModels:
            skipped = totalModels - len(modelsToTest)
            print(f" {skipped} model(s) have no operation types and will be skipped.")
        return modelsToTest
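    # Illustrative shape of one returned entry (the model names here are
    # hypothetical; real values come from the registry):
    #   {"displayName": "Example GPT", "name": "provider/example-gpt",
    #    "operationTypes": [OperationTypeEnum.PLAN, OperationTypeEnum.DATA_ANALYSE]}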
    def saveTestResults(self):
        """Save detailed test results to a file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")
        # Prepare the results for saving. Note: these counters tally individual
        # tests (model x operation type), not distinct models; the key names
        # are kept for compatibility with existing result files.
        saveData = {
            "testTimestamp": timestamp,
            "testDate": datetime.now().isoformat(),
            "totalModels": len(self.testResults),
            "successfulModels": len([r for r in self.testResults if r["status"] == "SUCCESS"]),
            "errorModels": len([r for r in self.testResults if r["status"] == "ERROR"]),
            "exceptionModels": len([r for r in self.testResults if r["status"] == "EXCEPTION"]),
            "results": self.testResults
        }
        # Calculate the success rate
        if saveData["totalModels"] > 0:
            saveData["successRate"] = (saveData["successfulModels"] / saveData["totalModels"]) * 100
        else:
            saveData["successRate"] = 0
        # Save to a JSON file
        with open(resultsFile, 'w', encoding='utf-8') as f:
            json.dump(saveData, f, indent=2, ensure_ascii=False)
        print(f"📄 Detailed results saved: {resultsFile}")
        return resultsFile
    def printTestSummary(self):
        """Print a summary of all test results."""
        print(f"\n{'='*80}")
        print("AI MODELS TEST SUMMARY")
        print(f"{'='*80}")
        totalTests = len(self.testResults)
        successfulTests = len([r for r in self.testResults if r["status"] == "SUCCESS"])
        errorTests = len([r for r in self.testResults if r["status"] == "ERROR"])
        exceptionTests = len([r for r in self.testResults if r["status"] == "EXCEPTION"])
        # Count unique models
        uniqueModels = len(set(r["modelName"] for r in self.testResults))
        print(f"📊 Total tests executed: {totalTests}")
        print(f"📦 Unique models tested: {uniqueModels}")
        print(f"✅ Successful tests: {successfulTests}")
        print(f"❌ Error tests: {errorTests}")
        print(f"💥 Exception tests: {exceptionTests}")
        if totalTests > 0:
            print(f"📈 Success rate: {(successfulTests / totalTests * 100):.1f}%")
        else:
            print("📈 Success rate: 0%")
        print(f"\n{'='*80}")
        print("DETAILED RESULTS")
        print(f"{'='*80}")
        # Group results by model
        from collections import defaultdict
        resultsByModel = defaultdict(list)
        for result in self.testResults:
            resultsByModel[result['modelName']].append(result)
        for modelName, modelResults in resultsByModel.items():
            print(f"\n📦 {modelName}")
            for result in modelResults:
                status_icon = {
                    "SUCCESS": "✅",
                    "ERROR": "❌",
                    "EXCEPTION": "💥"
                }.get(result["status"], "")
                opType = result.get("operationType", "UNKNOWN")
                print(f" {status_icon} {opType}: {result['status']} - {result['processingTime']}s - {result['responseLength']} chars")
                if result.get("isValidJson") is not None:
                    print(f" Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")
                if result.get("isValidBase64") is not None:
                    print(f" Valid Base64: {'Yes' if result['isValidBase64'] else 'No'}")
                if result.get("imageByteSize"):
                    print(f" Image size: {result['imageByteSize']} bytes")
                if result.get("crawledUrl"):
                    print(f" Crawled URL: {result['crawledUrl']}")
                if result.get("contentLength") is not None:
                    print(f" Content length: {result['contentLength']} characters")
                if result.get("pagesCrawled") is not None:
                    print(f" Pages crawled: {result['pagesCrawled']}")
                if result.get("error"):
                    print(f" Error: {result['error']}")
        # Find the fastest and slowest tests
        if successfulTests > 0:
            successfulResults = [r for r in self.testResults if r["status"] == "SUCCESS"]
            fastest = min(successfulResults, key=lambda x: x["processingTime"])
            slowest = max(successfulResults, key=lambda x: x["processingTime"])
            print(f"\n{'='*80}")
            print("PERFORMANCE HIGHLIGHTS")
            print(f"{'='*80}")
            print(f"🚀 Fastest test: {fastest['modelName']} - {fastest.get('operationType', 'UNKNOWN')} ({fastest['processingTime']}s)")
            print(f"🐌 Slowest test: {slowest['modelName']} - {slowest.get('operationType', 'UNKNOWN')} ({slowest['processingTime']}s)")
            # Find the models with the most content
            modelsWithContent = [r for r in successfulResults if r.get("contentLength", 0) > 0]
            if modelsWithContent:
                mostContent = max(modelsWithContent, key=lambda x: x.get("contentLength", 0))
                totalContent = sum(r.get("contentLength", 0) for r in modelsWithContent)
                avgContent = totalContent / len(modelsWithContent)
                print(f"📄 Model with most content: {mostContent['modelName']} ({mostContent.get('contentLength', 0)} chars)")
                print(f"📊 Average content per model: {avgContent:.0f} characters")
                print(f"📊 Total content crawled across all models: {totalContent} characters")
            # Find the models with the most pages crawled (for the Tavily direct API)
            modelsWithPages = [r for r in successfulResults if r.get("pagesCrawled", 0) > 0]
            if modelsWithPages:
                mostPages = max(modelsWithPages, key=lambda x: x.get("pagesCrawled", 0))
                totalPages = sum(r.get("pagesCrawled", 0) for r in modelsWithPages)
                avgPages = totalPages / len(modelsWithPages)
                print(f"🔍 Model with most pages crawled: {mostPages['modelName']} ({mostPages.get('pagesCrawled', 0)} pages)")
                print(f"📊 Average pages per model: {avgPages:.1f} pages")
                print(f"📊 Total pages crawled across all models: {totalPages} pages")
async def main():
    """Run AI models testing for all operation types."""
    tester = AIModelsTester()
    print("Starting AI Models Testing for ALL Operation Types...")
    print("Initializing AI service...")
    await tester.initialize()
    # Get all available models with their operation types
    models = tester.getAllAvailableModels()
    if not models:
        print("\n⚠️ No models found with operation type support.")
        print(" Please check that models with operation types are registered.")
        return
    # Count the total tests (models * operation types)
    totalTests = sum(len(model["operationTypes"]) for model in models)
    print(f"\n{'='*80}")
    print("STARTING COMPREHENSIVE MODEL TESTS")
    print(f"{'='*80}")
    print(f"Testing {len(models)} model(s) with {totalTests} total operation type test(s)...")
    print("All models and their supported operation types will be tested automatically.")
    print(f"{'='*80}\n")
    # Test each model with all its operation types
    testCount = 0
    for i, modelInfo in enumerate(models, 1):
        print(f"\n{'='*80}")
        print(f"[Model {i}/{len(models)}] Testing: {modelInfo['displayName']}")
        print(f"{'='*80}")
        # Test the model (covers all its operation types)
        results = await tester.testModel(modelInfo)
        testCount += len(results)
        print(f"\n✅ Completed {len(results)} test(s) for {modelInfo['displayName']}")
    # Save detailed results to a file
    resultsFile = tester.saveTestResults()
    # Print the final summary
    tester.printTestSummary()
    print(f"\n{'='*80}")
    print("TESTING COMPLETED")
    print(f"{'='*80}")
    print(f"📊 Total tests executed: {testCount}")
    print(f"📄 Results saved to: {resultsFile}")
    print(f"📁 Test results saved to: {tester.modelTestDir}")
if __name__ == "__main__":
    asyncio.run(main())