gateway/test_ai_models.py
#!/usr/bin/env python3
"""
AI Models Test - Tests IMAGE_ANALYSE functionality on all models that support it.

This script tests all models that have the IMAGE_ANALYSE capability, validates
that they can analyze images and return structured content, and analyzes the
quality of the results.

CODE FLOW ANALYSIS:
1. methodAi.process() is called by the AI planner with a prompt and documents (images)
2. mainServiceAi.callAiDocuments() is called
   -> delegates to subCoreAi.callAiDocuments()
   -> which calls subDocumentProcessing.callAiText()
   -> which processes chunks and detects images
   -> for image chunks, calls subCoreAi.readImage()
   -> which calls aiObjects.callImage() with operationType=IMAGE_ANALYSE

OR direct call:
- mainServiceAi.readImage() can be called directly (used in this test)
  -> delegates to subCoreAi.readImage()
  -> which calls aiObjects.callImage() with operationType=IMAGE_ANALYSE

WHERE FUNCTIONS ARE USED:
- mainServiceAi.readImage(): Public API entry point for direct image analysis
- mainServiceAi.generateImage(): Public API entry point for image generation
- subCoreAi.readImage(): Internal implementation, called by document processing or directly
- subCoreAi.generateImage(): Internal implementation, called by mainServiceAi
- subDocumentProcessing._processChunksWithMapping(): Detects image chunks and calls readImage()
"""
import asyncio
import json
import sys
import os
import base64
from datetime import datetime
from typing import Dict, Any, List
# Add the gateway to path
sys.path.append(os.path.dirname(__file__))
# Import the service initialization
from modules.features.chatPlayground.mainChatPlayground import getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User
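
# The module docstring above describes two routes into IMAGE_ANALYSE. The sketch
# below illustrates the direct route via mainServiceAi.readImage(); the assumed
# signature (raw image bytes in, analysis text out) is inferred from the flow
# notes, not verified against the service code. It is defined for illustration
# only and is never called by this script, which instead drives models through
# modelRegistry and model.functionCall().
async def _sketchDirectReadImage(services, imagePath: str) -> str:
    """Illustrative sketch only: direct image analysis via the public entry point."""
    with open(imagePath, 'rb') as f:
        imageData = f.read()
    # Assumed delegation chain: mainServiceAi.readImage() -> subCoreAi.readImage()
    # -> aiObjects.callImage() with operationType=IMAGE_ANALYSE.
    return await services.ai.readImage(imageData)
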
class AIModelsTester:
def __init__(self):
# Create a minimal user context for testing
testUser = User(
id="test_user",
username="test_user",
email="test@example.com",
fullName="Test User",
language="en",
mandateId="test_mandate"
)
# Initialize services using the existing system
self.services = getServices(testUser, None) # Test user, no workflow
self.testResults = []
# Create logs directory if it doesn't exist
self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
os.makedirs(self.logsDir, exist_ok=True)
# Create modeltest subdirectory
self.modelTestDir = os.path.join(self.logsDir, "modeltest")
os.makedirs(self.modelTestDir, exist_ok=True)
# Copy test image to modeltest directory if it exists
testImageSource = os.path.join(self.logsDir, "_testdata_photo_2025-06-03_13-05-52.jpg")
testImageDest = os.path.join(self.modelTestDir, "_testdata_photo_2025-06-03_13-05-52.jpg")
if os.path.exists(testImageSource) and not os.path.exists(testImageDest):
import shutil
shutil.copy2(testImageSource, testImageDest)
print(f"📷 Test image copied to: {testImageDest}")
# Find test image
self.testImagePath = None
if os.path.exists(testImageDest):
self.testImagePath = testImageDest
else:
# Try to find any image in modeltest directory
for file in os.listdir(self.modelTestDir):
if file.lower().endswith(('.jpg', '.jpeg', '.png')):
self.testImagePath = os.path.join(self.modelTestDir, file)
break
if self.testImagePath:
print(f"📷 Using test image: {self.testImagePath}")
else:
print(f"⚠️ No test image found in {self.modelTestDir}")
async def initialize(self):
"""Initialize the AI service."""
# Set logging level to DEBUG for detailed output
import logging
logging.getLogger().setLevel(logging.DEBUG)
# Initialize the model registry with all connectors
from modules.aicore.aicoreModelRegistry import modelRegistry
        # Note: Web connectors are not needed for IMAGE_ANALYSE testing, so both
        # the imports and the registrations stay commented out.
        # from modules.aicore.aicorePluginTavily import AiTavily
        # from modules.aicore.aicorePluginPerplexity import AiPerplexity
        # modelRegistry.registerConnector(AiTavily())
        # modelRegistry.registerConnector(AiPerplexity())
# The AI service needs to be recreated with proper initialization
from modules.services.serviceAi.mainServiceAi import AiService
self.services.ai = await AiService.create(self.services)
# Also initialize extraction service for image processing
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
self.services.extraction = ExtractionService(self.services)
# Create a minimal workflow context
from modules.datamodels.datamodelChat import ChatWorkflow
import uuid
self.services.currentWorkflow = ChatWorkflow(
id=str(uuid.uuid4()),
name="Test Workflow",
status="running",
startedAt=self.services.utils.timestampGetUtc(),
lastActivity=self.services.utils.timestampGetUtc(),
currentRound=1,
currentTask=0,
currentAction=0,
totalTasks=0,
totalActions=0,
mandateId="test_mandate",
messageIds=[],
workflowMode="React",
maxSteps=5
)
print("✅ AI Service initialized successfully")
print(f"📁 Results will be saved to: {self.modelTestDir}")
async def testModel(self, modelName: str) -> Dict[str, Any]:
"""Test a specific AI model with IMAGE_ANALYSE operation."""
print(f"\n{'='*60}")
print(f"TESTING MODEL: {modelName}")
print(f"OPERATION TYPE: IMAGE_ANALYSE")
print(f"{'='*60}")
# Check if test image exists
if not self.testImagePath or not os.path.exists(self.testImagePath):
result = {
"modelName": modelName,
"status": "ERROR",
"processingTime": 0.0,
"responseLength": 0,
"responseType": "error",
"hasContent": False,
"error": "No test image available",
"fullResponse": ""
}
self.testResults.append(result)
return result
# Test prompt for image analysis
testPrompt = "Analyze this image and describe what you see. Extract any text, numbers, or structured data."
print(f"Test image: {self.testImagePath}")
print(f"Test prompt: {testPrompt}")
# Load image data
with open(self.testImagePath, 'rb') as f:
imageData = f.read()
print(f"Image size: {len(imageData)} bytes")
# Determine image MIME type from extension
if self.testImagePath.lower().endswith('.png'):
mimeType = "image/png"
elif self.testImagePath.lower().endswith(('.jpg', '.jpeg')):
mimeType = "image/jpeg"
else:
mimeType = "image/jpeg" # Default
print(f"Image MIME type: {mimeType}")
startTime = asyncio.get_event_loop().time()
try:
# Get model directly from registry and test it
from modules.aicore.aicoreModelRegistry import modelRegistry
model = modelRegistry.getModel(modelName)
if not model:
raise Exception(f"Model {modelName} not found")
            # Convert image data to a base64 string (base64 is imported at module level)
            if isinstance(imageData, bytes):
                imageDataStr = base64.b64encode(imageData).decode('utf-8')
            else:
                imageDataStr = imageData
# Create messages in vision format
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": testPrompt},
{
"type": "image_url",
"image_url": {
"url": f"data:{mimeType};base64,{imageDataStr}"
}
}
]
}
]
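            # The payload above follows the OpenAI-style vision format: text and image
            # parts in a single user message, with the image inlined as a
            # "data:<mime>;base64,<payload>" URL. It is an assumption of this test that
            # every registered connector accepts this exact shape.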
# Create model call
from modules.datamodels.datamodelAi import AiModelCall, AiCallOptions
modelCall = AiModelCall(
messages=messages,
model=model,
options=AiCallOptions(operationType=OperationTypeEnum.IMAGE_ANALYSE)
)
# Call model directly
print(f"Calling model.functionCall() for {modelName}")
modelResponse = await model.functionCall(modelCall)
if not modelResponse.success:
raise Exception(f"Model call failed: {modelResponse.error}")
result = modelResponse.content
endTime = asyncio.get_event_loop().time()
processingTime = endTime - startTime
            # Analyze result (string content returned by model.functionCall())
if result:
analysisResult = {
"modelName": modelName,
"status": "SUCCESS",
"processingTime": round(processingTime, 2),
"responseLength": len(result) if result else 0,
"responseType": "string",
"hasContent": True,
"error": None,
"testPrompt": testPrompt,
"imagePath": self.testImagePath,
"imageSize": len(imageData),
"mimeType": mimeType
}
                # Check whether the response parses as JSON
                try:
                    json.loads(result)
                    analysisResult["isValidJson"] = True
                except (json.JSONDecodeError, TypeError):
                    analysisResult["isValidJson"] = False
analysisResult["responsePreview"] = result[:200] + "..." if len(result) > 200 else result
analysisResult["fullResponse"] = result
print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
print(f"📄 Response length: {len(result)} characters")
print(f"📄 Response preview: {analysisResult['responsePreview']}")
result = analysisResult
# Validate that content was extracted
if result.get("status") == "SUCCESS" and result.get("fullResponse"):
self._validateImageResponse(modelName, result)
else:
result = {
"modelName": modelName,
"status": "ERROR",
"processingTime": round(processingTime, 2),
"responseLength": 0,
"responseType": "error",
"hasContent": False,
"error": "Empty response",
"fullResponse": ""
}
except Exception as e:
endTime = asyncio.get_event_loop().time()
processingTime = endTime - startTime
result = {
"modelName": modelName,
"status": "EXCEPTION",
"processingTime": round(processingTime, 2),
"responseLength": 0,
"responseType": "exception",
"hasContent": False,
"error": str(e),
"testPrompt": testPrompt,
"imagePath": self.testImagePath,
"imageSize": len(imageData) if imageData else 0,
"mimeType": mimeType
}
print(f"💥 EXCEPTION - {str(e)}")
self.testResults.append(result)
        # Save the analysis response even for errors/exceptions so the prompt is logged
if result.get("status") in ["SUCCESS", "EXCEPTION", "ERROR"]:
self._saveImageResponse(modelName, result)
# Save individual model result immediately
self._saveIndividualModelResult(modelName, result)
return result
def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
"""Save image analysis response to file."""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{modelName}_{timestamp}.txt"
filepath = os.path.join(self.modelTestDir, filename)
# Prepare content for saving
content = result.get("fullResponse", "")
if not content:
content = result.get("responsePreview", "No content available")
# If there's an error, include it in the content
if result.get("error"):
content = f"ERROR: {result.get('error')}\n\n{content}"
# Add metadata header
metadata = f"""Model: {modelName}
Test Time: {timestamp}
Status: {result.get('status', 'Unknown')}
Processing Time: {result.get('processingTime', 0):.2f}s
Response Length: {result.get('responseLength', 0)} characters
Is Valid JSON: {result.get('isValidJson', False)}
Image Path: {result.get('imagePath', 'N/A')}
Image Size: {result.get('imageSize', 'N/A')} bytes
MIME Type: {result.get('mimeType', 'N/A')}
--- TEST PROMPT ---
{result.get('testPrompt', 'N/A')}
--- RESPONSE CONTENT ---
{content}
"""
with open(filepath, 'w', encoding='utf-8') as f:
f.write(metadata)
result["savedTextFile"] = filepath
print(f"📄 Analysis response saved: {filepath}")
except Exception as e:
print(f"❌ Error saving analysis response: {str(e)}")
result["saveError"] = str(e)
def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
"""Save text response to file."""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{modelName}_{timestamp}.txt"
filepath = os.path.join(self.modelTestDir, filename)
# Prepare content for saving
content = result.get("fullResponse", "")
if not content:
content = result.get("responsePreview", "No content available")
# If there's an error, include it in the content
if result.get("error"):
content = f"ERROR: {result.get('error')}\n\n{content}"
# Get prompt and config for logging
config = result.get("crawlConfig", {})
crawlDepth = config.get("depth", "N/A")
crawlWidth = config.get("width", "N/A")
# Get both the original JSON prompt and the actual prompt sent
originalPrompt = result.get("testPrompt", "N/A")
actualPromptSent = result.get("actualPromptSent", "N/A")
# Add metadata header
metadata = f"""Model: {modelName}
Test Time: {timestamp}
Status: {result.get('status', 'Unknown')}
Processing Time: {result.get('processingTime', 0):.2f}s
Response Length: {result.get('responseLength', 0)} characters
Is Valid JSON: {result.get('isValidJson', False)}
Test Method: {result.get('testMethod', 'standard')}
Pages Crawled: {result.get('pagesCrawled', 'N/A')}
Crawled URL: {result.get('crawledUrl', 'N/A')}
Has URL: {result.get('hasUrl', 'N/A')}
Has Title: {result.get('hasTitle', 'N/A')}
Has Content: {result.get('hasContent', 'N/A')}
Content Length: {result.get('contentLength', 'N/A')} characters
--- CRAWL CONFIGURATION ---
Depth: {crawlDepth}
Width: {crawlWidth}
--- ORIGINAL JSON PROMPT (input) ---
{originalPrompt}
--- ACTUAL PROMPT SENT TO API (EXACT) ---
{actualPromptSent}
--- RESPONSE CONTENT ---
{content}
"""
with open(filepath, 'w', encoding='utf-8') as f:
f.write(metadata)
result["savedTextFile"] = filepath
print(f"📄 Text response saved: {filepath}")
except Exception as e:
print(f"❌ Error saving text response: {str(e)}")
result["textSaveError"] = str(e)
    def _validateCrawlResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that a WEB_CRAWL response contains crawled content (Tavily test path only)."""
        try:
            content = result.get("fullResponse", "")
            # Try to parse as JSON; fall back to an empty dict on failure
            crawledData = {}
            try:
                parsed = json.loads(content)
                if isinstance(parsed, dict):
                    crawledData = parsed
            except (json.JSONDecodeError, TypeError):
                pass
# Check for expected fields: url, title, content
hasUrl = bool(crawledData.get("url"))
hasTitle = bool(crawledData.get("title"))
hasContent = bool(crawledData.get("content"))
contentLength = len(crawledData.get("content", ""))
result["hasUrl"] = hasUrl
result["hasTitle"] = hasTitle
result["hasContent"] = hasContent
result["contentLength"] = contentLength
result["crawledUrl"] = crawledData.get("url", "")
if hasUrl and hasContent:
print(f"✅ Successfully crawled content from URL: {crawledData.get('url', 'unknown')}")
print(f" Content length: {contentLength} characters")
print(f" Title: {crawledData.get('title', 'N/A')}")
else:
print(f"⚠️ Incomplete crawl response - URL: {hasUrl}, Content: {hasContent}")
except Exception as e:
print(f"❌ Error validating crawl response: {str(e)}")
result["crawlValidationError"] = str(e)
def _validateImageResponse(self, modelName: str, result: Dict[str, Any]):
"""Validate that the IMAGE_ANALYSE response contains analyzed content."""
try:
content = result.get("fullResponse", "")
# Check if content is meaningful
hasContent = bool(content and len(content.strip()) > 0)
contentLength = len(content)
result["hasContent"] = hasContent
result["contentLength"] = contentLength
# Try to determine what kind of content was extracted
if hasContent:
# Check if it's structured data
isStructured = False
try:
parsed = json.loads(content)
if isinstance(parsed, dict):
isStructured = True
                except (json.JSONDecodeError, TypeError):
                    pass
result["isStructured"] = isStructured
print(f"✅ Successfully analyzed image")
print(f" Content length: {contentLength} characters")
print(f" Is structured: {'Yes' if isStructured else 'No'}")
else:
print(f"⚠️ Empty or invalid image analysis response")
except Exception as e:
print(f"❌ Error validating image response: {str(e)}")
result["validationError"] = str(e)
async def _testTavilyDirect(self, modelName: str, crawlDepth: int = 3, crawlWidth: int = 50) -> Dict[str, Any]:
"""Test Tavily API directly using the crawl() method with better link following."""
print(f"\n{'='*60}")
print(f"TESTING TAVILY DIRECT API (crawl method)")
print(f"{'='*60}")
startTime = asyncio.get_event_loop().time()
try:
from tavily import AsyncTavilyClient
from modules.shared.configuration import APP_CONFIG
apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
if not apiKey:
raise Exception("Tavily API key not found")
client = AsyncTavilyClient(api_key=apiKey)
            # Map our configuration to Tavily crawl() parameters:
            #   crawlWidth -> limit (pages per level)
            #   crawlDepth -> max_depth (link-following depth)
            #   crawlWidth -> max_breadth (breadth of crawl at each level)
            tavilyLimit = crawlWidth
            tavilyMaxDepth = crawlDepth
            tavilyMaxBreadth = crawlWidth
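            # Worked example: with the defaults crawlDepth=3 and crawlWidth=50 this
            # yields limit=50 pages per level, max_depth=3, max_breadth=50.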
print(f"Calling Tavily API with crawl() method...")
print(f"URL: https://www.valueon.ch")
print(f"Instructions: Who works in this company?")
print(f"Limit: {tavilyLimit} pages per level")
print(f"Max depth: {tavilyMaxDepth} (follows links {tavilyMaxDepth} levels deep)")
print(f"Max breadth: {tavilyMaxBreadth} (up to {tavilyMaxBreadth} pages at each level)")
print(f"Deep and Broad Crawl Configuration Active")
response = await client.crawl(
url="https://www.valueon.ch",
instructions="Who works in this company?",
limit=tavilyLimit,
max_depth=tavilyMaxDepth,
max_breadth=tavilyMaxBreadth
)
endTime = asyncio.get_event_loop().time()
processingTime = endTime - startTime
# Analyze response
contentLength = 0
pagesCrawled = 0
fullContent = ""
if isinstance(response, dict):
# Check if it has results
if "results" in response:
results = response["results"]
pagesCrawled = len(results)
                    contentParts = []
                    for page in results:
                        url = page.get("url", "")
                        title = page.get("title", "")
                        content = page.get("raw_content", page.get("content", ""))
                        contentParts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
                        contentLength += len(content)
                    fullContent = "\n".join(contentParts)
else:
fullContent = json.dumps(response, indent=2)
contentLength = len(fullContent)
elif isinstance(response, list):
pagesCrawled = len(response)
                contentParts = []
                for item in response:
                    if isinstance(item, dict):
                        url = item.get("url", "")
                        title = item.get("title", "")
                        content = item.get("raw_content", item.get("content", ""))
                        contentParts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
                        contentLength += len(content)
                fullContent = "\n".join(contentParts)
else:
fullContent = str(response)
contentLength = len(fullContent)
result = {
"modelName": modelName,
"status": "SUCCESS",
"processingTime": round(processingTime, 2),
"responseLength": contentLength,
"responseType": "TavilyDirectAPI",
"hasContent": True,
"error": None,
"modelUsed": modelName,
"priceUsd": 0.0,
"bytesSent": 0,
"bytesReceived": contentLength,
"isValidJson": True,
"fullResponse": fullContent,
"pagesCrawled": pagesCrawled,
"testMethod": "direct_api_crawl"
}
print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
print(f"📄 Pages crawled: {pagesCrawled}")
print(f"📄 Total content length: {contentLength} characters")
# Save the response
self._saveTextResponse(modelName, result)
self._validateCrawlResponse(modelName, result)
self._saveIndividualModelResult(modelName, result)
self.testResults.append(result)
return result
except Exception as e:
endTime = asyncio.get_event_loop().time()
processingTime = endTime - startTime
result = {
"modelName": modelName,
"status": "EXCEPTION",
"processingTime": round(processingTime, 2),
"responseLength": 0,
"responseType": "exception",
"hasContent": False,
"error": str(e)
}
print(f"💥 EXCEPTION - {str(e)}")
self.testResults.append(result)
return result
def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
"""Save individual model test result to file."""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{modelName}_{timestamp}.json"
filepath = os.path.join(self.modelTestDir, filename)
# Prepare individual result data
individualData = {
"modelName": modelName,
"testTimestamp": timestamp,
"testDate": datetime.now().isoformat(),
"result": result
}
# Save to JSON file
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(individualData, f, indent=2, ensure_ascii=False)
print(f"📄 Individual result saved: {filename}")
except Exception as e:
print(f"❌ Error saving individual result: {str(e)}")
def getAllAvailableModels(self) -> List[str]:
"""Get all available model names that support IMAGE_ANALYSE."""
        from modules.aicore.aicoreModelRegistry import modelRegistry
# Get all models from registry
allModels = modelRegistry.getAvailableModels()
# Filter models that support IMAGE_ANALYSE
imageAnalyseModels = []
for model in allModels:
if model.operationTypes and any(
ot.operationType == OperationTypeEnum.IMAGE_ANALYSE
for ot in model.operationTypes
):
imageAnalyseModels.append(model.name)
# Filter to common models for testing (remove filter to test all models)
# imageAnalyseModels = [m for m in imageAnalyseModels if "gpt" in m.lower() or "claude" in m.lower()]
print(f"Found {len(imageAnalyseModels)} models that support IMAGE_ANALYSE:")
for modelName in imageAnalyseModels:
print(f" - {modelName}")
return imageAnalyseModels
def saveTestResults(self):
"""Save detailed test results to file."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")
# Prepare results for saving
saveData = {
"testTimestamp": timestamp,
"testDate": datetime.now().isoformat(),
"totalModels": len(self.testResults),
"successfulModels": len([r for r in self.testResults if r["status"] == "SUCCESS"]),
"errorModels": len([r for r in self.testResults if r["status"] == "ERROR"]),
"exceptionModels": len([r for r in self.testResults if r["status"] == "EXCEPTION"]),
"results": self.testResults
}
# Calculate success rate
if saveData["totalModels"] > 0:
saveData["successRate"] = (saveData["successfulModels"] / saveData["totalModels"]) * 100
else:
saveData["successRate"] = 0
# Save to JSON file
with open(resultsFile, 'w', encoding='utf-8') as f:
json.dump(saveData, f, indent=2, ensure_ascii=False)
print(f"📄 Detailed results saved: {resultsFile}")
return resultsFile
def printTestSummary(self):
"""Print a summary of all test results."""
print(f"\n{'='*80}")
print("AI MODELS TEST SUMMARY")
print(f"{'='*80}")
totalModels = len(self.testResults)
successfulModels = len([r for r in self.testResults if r["status"] == "SUCCESS"])
errorModels = len([r for r in self.testResults if r["status"] == "ERROR"])
exceptionModels = len([r for r in self.testResults if r["status"] == "EXCEPTION"])
print(f"📊 Total models tested: {totalModels}")
print(f"✅ Successful: {successfulModels}")
print(f"❌ Errors: {errorModels}")
print(f"💥 Exceptions: {exceptionModels}")
print(f"📈 Success rate: {(successfulModels/totalModels*100):.1f}%" if totalModels > 0 else "0%")
print(f"\n{'='*80}")
print("DETAILED RESULTS")
print(f"{'='*80}")
for result in self.testResults:
            statusIcon = {
                "SUCCESS": "✅",
                "ERROR": "❌",
                "EXCEPTION": "💥"
            }.get(result["status"], "")
            print(f"\n{statusIcon} {result['modelName']}")
print(f" Status: {result['status']}")
print(f" Processing time: {result['processingTime']}s")
print(f" Response length: {result['responseLength']} characters")
print(f" Response type: {result['responseType']}")
if result.get("isValidJson") is not None:
print(f" Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")
if result.get("crawledUrl"):
print(f" Crawled URL: {result['crawledUrl']}")
if result.get("contentLength") is not None:
print(f" Content length: {result['contentLength']} characters")
if result.get("pagesCrawled") is not None:
print(f" Pages crawled: {result['pagesCrawled']}")
if result["error"]:
print(f" Error: {result['error']}")
if result.get("responsePreview"):
print(f" Preview: {result['responsePreview']}")
# Find fastest and slowest models
if successfulModels > 0:
successfulResults = [r for r in self.testResults if r["status"] == "SUCCESS"]
fastest = min(successfulResults, key=lambda x: x["processingTime"])
slowest = max(successfulResults, key=lambda x: x["processingTime"])
print(f"\n{'='*80}")
print("PERFORMANCE HIGHLIGHTS")
print(f"{'='*80}")
print(f"🚀 Fastest model: {fastest['modelName']} ({fastest['processingTime']}s)")
print(f"🐌 Slowest model: {slowest['modelName']} ({slowest['processingTime']}s)")
# Find models with most content
modelsWithContent = [r for r in successfulResults if r.get("contentLength", 0) > 0]
if modelsWithContent:
mostContent = max(modelsWithContent, key=lambda x: x.get("contentLength", 0))
totalContent = sum(r.get("contentLength", 0) for r in modelsWithContent)
avgContent = totalContent / len(modelsWithContent)
print(f"📄 Model with most content: {mostContent['modelName']} ({mostContent.get('contentLength', 0)} chars)")
print(f"📊 Average content per model: {avgContent:.0f} characters")
print(f"📊 Total content crawled across all models: {totalContent} characters")
# Find models with most pages crawled (for Tavily direct API)
modelsWithPages = [r for r in successfulResults if r.get("pagesCrawled", 0) > 0]
if modelsWithPages:
mostPages = max(modelsWithPages, key=lambda x: x.get("pagesCrawled", 0))
totalPages = sum(r.get("pagesCrawled", 0) for r in modelsWithPages)
avgPages = totalPages / len(modelsWithPages)
print(f"🔍 Model with most pages crawled: {mostPages['modelName']} ({mostPages.get('pagesCrawled', 0)} pages)")
print(f"📊 Average pages per model: {avgPages:.1f} pages")
print(f"📊 Total pages crawled across all models: {totalPages} pages")
async def main():
"""Run AI models testing for WEB_CRAWL operation."""
tester = AIModelsTester()
print("Starting AI Models Testing for IMAGE_ANALYSE...")
print("Initializing AI service...")
await tester.initialize()
# Get all available models
models = tester.getAllAvailableModels()
print(f"\nFound {len(models)} models to test:")
for i, model in enumerate(models, 1):
print(f" {i}. {model}")
print(f"\n{'='*80}")
print("STARTING IMAGE_ANALYSE TESTS")
print(f"{'='*80}")
print("Testing each model's ability to analyze images and return structured content...")
print("Press Enter after each model test to continue to the next one...")
# Test each model individually
for i, modelName in enumerate(models, 1):
print(f"\n[{i}/{len(models)}] Testing model: {modelName}")
# Test the model
await tester.testModel(modelName)
# Pause for user input (except for the last model)
if i < len(models):
input(f"\nPress Enter to continue to the next model...")
# Save detailed results to file
resultsFile = tester.saveTestResults()
# Print final summary
tester.printTestSummary()
print(f"\n{'='*80}")
print("TESTING COMPLETED")
print(f"{'='*80}")
print(f"📄 Results saved to: {resultsFile}")
print(f"📁 Test results saved to: {tester.modelTestDir}")
if __name__ == "__main__":
asyncio.run(main())