gateway/test1_ai_models.py
2025-10-28 00:14:24 +01:00

794 lines
34 KiB
Python

#!/usr/bin/env python3
"""
AI Models Test - Tests IMAGE_GENERATE functionality on all models that support it
This script tests all models that have IMAGE_GENERATE capability, validates that
they can generate images from text prompts, and analyzes the quality of results.

CODE FLOW ANALYSIS:
1. methodAi.generateImage() is called with prompt and optional size/quality/style
2. mainServiceAi.generateImage() is called
   -> delegates to subCoreAi.generateImage()
   -> which calls aiObjects.generateImage()
   -> which creates AiModelCall and calls model.functionCall()

WHERE FUNCTIONS ARE USED:
- mainServiceAi.generateImage(): Public API entry point for image generation
- subCoreAi.generateImage(): Internal implementation, called by mainServiceAi
- aiObjects.generateImage(): Creates standardized call and invokes model
- model.functionCall(): Direct model plugin call (e.g., DALL-E 3)
"""
import asyncio
import json
import sys
import os
import base64
from datetime import datetime
from typing import Dict, Any, List
# Add the gateway to path
sys.path.append(os.path.dirname(__file__))
# Import the service initialization
from modules.features.chatPlayground.mainChatPlayground import getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User
class AIModelsTester:
    """Run IMAGE_GENERATE tests against registered AI models and record the outcomes.

    Each test appends a result dictionary to ``self.testResults`` and writes
    artifacts (decoded image, metadata JSON, per-model JSON) into
    ``self.modelTestDir``. The crawl-oriented helpers (``_saveTextResponse``,
    ``_validateCrawlResponse``, ``_testTavilyDirect``) are not invoked by
    ``testModel()``.
    """

    def __init__(self):
        """Build the service container for a synthetic test user and create the output folders."""
        # Create a minimal user context for testing
        testUser = User(
            id="test_user",
            username="test_user",
            email="test@example.com",
            fullName="Test User",
            language="en",
            mandateId="test_mandate"
        )
        # Initialize services using the existing system (test user, no workflow)
        self.services = getServices(testUser, None)
        self.testResults = []

        # <this file's directory>/../local/logs and its "modeltest" subdirectory
        self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)

    async def initialize(self):
        """Initialize the AI service, the extraction service and a minimal workflow context."""
        import logging
        import uuid

        # Set logging level to DEBUG for detailed output (affects the root logger).
        logging.getLogger().setLevel(logging.DEBUG)

        # These names are not used below; the imports are kept from the original
        # script. NOTE(review): presumably importing the registry module prepares
        # it - confirm before removing any of these imports.
        from modules.aicore.aicoreModelRegistry import modelRegistry  # noqa: F401
        from modules.aicore.aicorePluginTavily import AiTavily  # noqa: F401
        from modules.aicore.aicorePluginPerplexity import AiPerplexity  # noqa: F401
        # Web connectors are deliberately not registered: they are not needed
        # for IMAGE_GENERATE testing.

        # The AI service needs to be recreated with proper initialization
        from modules.services.serviceAi.mainServiceAi import AiService
        self.services.ai = await AiService.create(self.services)

        # Also initialize extraction service for image processing
        from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
        self.services.extraction = ExtractionService(self.services)

        # Create a minimal workflow context
        from modules.datamodels.datamodelChat import ChatWorkflow
        self.services.currentWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=self.services.utils.timestampGetUtc(),
            lastActivity=self.services.utils.timestampGetUtc(),
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId="test_mandate",
            messageIds=[],
            workflowMode="React",
            maxSteps=5
        )
        print("✅ AI Service initialized successfully")
        print(f"📁 Results will be saved to: {self.modelTestDir}")

    @staticmethod
    def _safeFileStem(modelName: str) -> str:
        """Return ``modelName`` with path separators and other unsafe file-name characters replaced by ``_``."""
        unsafeChars = '<>:"/\\|?*'
        stem = "".join(
            "_" if (ch in unsafeChars or ord(ch) < 32) else ch
            for ch in str(modelName)
        )
        return stem or "model"

    @staticmethod
    def _decodeImagePayload(content: Any) -> bytes:
        """Strictly decode a base64 string or ``data:image`` URL into raw bytes.

        Raises:
            ValueError: if ``content`` is not a string, is empty after removing
                the data-URL header and whitespace, or is not valid base64
                (``binascii.Error`` is a ``ValueError`` subclass).
        """
        if not isinstance(content, str):
            raise ValueError(f"expected base64 text, got {type(content).__name__}")
        payload = content
        if payload.startswith("data:image"):
            # Data URL: the base64 part follows the first comma.
            _, separator, tail = payload.partition(",")
            if separator:
                payload = tail
        # Line breaks inside base64 are harmless; drop them before strict validation.
        payload = "".join(payload.split())
        if not payload:
            raise ValueError("empty base64 payload")
        # validate=True rejects non-alphabet characters instead of silently discarding them.
        return base64.b64decode(payload, validate=True)

    def _analyzeImageContent(self, modelName: str, content: Any, processingTime: float,
                             requestInfo: Dict[str, Any]) -> Dict[str, Any]:
        """Build the SUCCESS result dictionary for a non-empty model response."""
        analysis = {
            "modelName": modelName,
            "status": "SUCCESS",
            "processingTime": round(processingTime, 2),
            "responseLength": len(content),
            "responseType": "base64_image",
            "hasContent": True,
            "error": None,
            **requestInfo,
            "isBase64": content.startswith("data:image") if isinstance(content, str) else False
        }
        try:
            imageBytes = self._decodeImagePayload(content)
        except ValueError:
            analysis["isValidBase64"] = False
            analysis["imageByteSize"] = 0
        else:
            analysis["isValidBase64"] = True
            analysis["imageByteSize"] = len(imageBytes)
        analysis["responsePreview"] = content[:100] + "..." if len(content) > 100 else content
        analysis["fullResponse"] = content
        return analysis

    async def testModel(self, modelName: str) -> Dict[str, Any]:
        """Test a specific AI model with IMAGE_GENERATE operation.

        The model is looked up in the model registry and called directly through
        ``model.functionCall()``. The outcome is appended to ``self.testResults``
        and persisted via ``_saveImageResponse`` and ``_saveIndividualModelResult``.

        Args:
            modelName: Name of the model as known to the model registry.

        Returns:
            Result dictionary whose ``status`` is ``SUCCESS`` (non-empty content),
            ``ERROR`` (empty content) or ``EXCEPTION`` (lookup/call failed).
        """
        print(f"\n{'='*60}")
        print(f"TESTING MODEL: {modelName}")
        print(f"OPERATION TYPE: IMAGE_GENERATE")
        print(f"{'='*60}")

        # Test prompt for image generation
        testPrompt = 'Create a creative birthday cake designed to look like a monster truck tire/wheel. The cake appears to be chocolate-flavored and is decorated to resemble a large black tire with treads around the sides. On top of the cake, there is a mound of chocolate cake or brownie material meant to look like dirt or mud, with a toy monster truck positioned on top. The monster truck has large wheels and appears to be reddish in color. There are several small decorative flags in light blue and mint green colors stuck into the "dirt" mound. The words "HAPPY BIRTHDAY" are written in white letters around the side of the tire-shaped cake. The image appears to be from Yandex Images, as indicated by Russian text at the bottom. The status bar at the top shows 13:02 time and 82% battery level.'
        size = "1024x1024"
        quality = "standard"
        style = "vivid"
        print(f"Test prompt: {testPrompt}")
        print(f"Size: {size}, Quality: {quality}, Style: {style}")

        # Request parameters are recorded in every result, whatever the outcome.
        requestInfo = {
            "testPrompt": testPrompt,
            "size": size,
            "quality": quality,
            "style": style
        }

        loop = asyncio.get_running_loop()
        startTime = loop.time()
        try:
            # Get model directly from registry and test it
            from modules.aicore.aicoreModelRegistry import modelRegistry
            from modules.datamodels.datamodelAi import AiModelCall, AiCallOptions

            model = modelRegistry.getModel(modelName)
            if not model:
                raise Exception(f"Model {modelName} not found")

            # Plain text prompt as a single user message, plus image generation parameters
            modelCall = AiModelCall(
                messages=[{"role": "user", "content": testPrompt}],
                model=model,
                options=AiCallOptions(
                    operationType=OperationTypeEnum.IMAGE_GENERATE,
                    size=size,
                    quality=quality,
                    style=style
                )
            )

            # Call model directly
            print(f"Calling model.functionCall() for {modelName}")
            modelResponse = await model.functionCall(modelCall)
            if not modelResponse.success:
                raise Exception(f"Model call failed: {modelResponse.error}")

            content = modelResponse.content
            processingTime = loop.time() - startTime

            if content:
                result = self._analyzeImageContent(modelName, content, processingTime, requestInfo)
                print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
                print(f"📄 Response length: {len(content)} characters")
                print(f"🖼️ Valid base64: {result.get('isValidBase64', False)}")
                if result.get('imageByteSize'):
                    print(f"🖼️ Image size: {result['imageByteSize']} bytes")
                # Validate that content was extracted
                if result.get("fullResponse"):
                    self._validateImageResponse(modelName, result)
            else:
                result = {
                    "modelName": modelName,
                    "status": "ERROR",
                    "processingTime": round(processingTime, 2),
                    "responseLength": 0,
                    "responseType": "error",
                    "hasContent": False,
                    "error": "Empty response",
                    "fullResponse": "",
                    **requestInfo
                }
        except Exception as e:
            # Any failure of the lookup, the call or the analysis is recorded, not propagated.
            processingTime = loop.time() - startTime
            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e),
                **requestInfo
            }
            print(f"💥 EXCEPTION - {str(e)}")

        self.testResults.append(result)
        # Attempt the image save for every outcome; without image data the helper
        # only prints a notice. The prompt is logged through the per-model JSON below.
        self._saveImageResponse(modelName, result)
        # Save individual model result immediately
        self._saveIndividualModelResult(modelName, result)
        return result

    def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Save image generation response as image file plus a metadata JSON file.

        Reads ``result["fullResponse"]``. On success sets ``result["savedImageFile"]``.
        If the payload is not valid base64, an excerpt is written to a ``.txt`` file
        instead. Any other failure is stored in ``result["saveError"]``.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            fileStem = f"{self._safeFileStem(modelName)}_{timestamp}"

            # Get image data
            content = result.get("fullResponse", "")
            if not content:
                print(f"⚠️ No image data to save for {modelName}")
                return

            try:
                imageBytes = self._decodeImagePayload(content)
            except ValueError as decodeError:
                print(f"❌ Error decoding base64 image data: {str(decodeError)}")
                # Fall back to saving as text file
                textFile = os.path.join(self.modelTestDir, f"{fileStem}.txt")
                with open(textFile, 'w', encoding='utf-8') as f:
                    f.write(f"Error decoding image:\n{str(decodeError)}\n\nBase64 data:\n{content[:500]}...")
                print(f"📄 Saved base64 data as text: {textFile}")
                return

            # Save image file
            filename = f"{fileStem}.png"
            filepath = os.path.join(self.modelTestDir, filename)
            with open(filepath, 'wb') as f:
                f.write(imageBytes)
            result["savedImageFile"] = filepath
            print(f"🖼️ Image saved: {filepath}")

            # Also save metadata as JSON
            metadata = {
                "modelName": modelName,
                "timestamp": timestamp,
                "status": result.get('status', 'Unknown'),
                "processingTime": result.get('processingTime', 0),
                "responseLength": result.get('responseLength', 0),
                "isValidBase64": result.get('isValidBase64', False),
                "imageByteSize": len(imageBytes),
                "size": result.get('size', 'N/A'),
                "quality": result.get('quality', 'N/A'),
                "style": result.get('style', 'N/A'),
                "testPrompt": result.get('testPrompt', 'N/A'),
                "imageFile": filename
            }
            metadataFile = os.path.join(self.modelTestDir, f"{fileStem}_metadata.json")
            with open(metadataFile, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            print(f"📄 Metadata saved: {metadataFile}")
        except Exception as e:
            # Best effort: a failed save must not abort the test run.
            print(f"❌ Error saving image generation response: {str(e)}")
            result["saveError"] = str(e)

    def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
        """Save text response to file, preceded by a metadata header.

        On success sets ``result["savedTextFile"]``; on failure stores the message
        in ``result["textSaveError"]``.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{self._safeFileStem(modelName)}_{timestamp}.txt"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare content for saving
            content = result.get("fullResponse", "")
            if not content:
                content = result.get("responsePreview", "No content available")
            # If there's an error, include it in the content
            if result.get("error"):
                content = f"ERROR: {result.get('error')}\n\n{content}"

            # Get prompt and config for logging
            config = result.get("crawlConfig", {})
            crawlDepth = config.get("depth", "N/A")
            crawlWidth = config.get("width", "N/A")
            # Get both the original JSON prompt and the actual prompt sent
            originalPrompt = result.get("testPrompt", "N/A")
            actualPromptSent = result.get("actualPromptSent", "N/A")

            # Metadata header followed by the response content
            headerLines = [
                f"Model: {modelName}",
                f"Test Time: {timestamp}",
                f"Status: {result.get('status', 'Unknown')}",
                f"Processing Time: {result.get('processingTime', 0):.2f}s",
                f"Response Length: {result.get('responseLength', 0)} characters",
                f"Is Valid JSON: {result.get('isValidJson', False)}",
                f"Test Method: {result.get('testMethod', 'standard')}",
                f"Pages Crawled: {result.get('pagesCrawled', 'N/A')}",
                f"Crawled URL: {result.get('crawledUrl', 'N/A')}",
                f"Has URL: {result.get('hasUrl', 'N/A')}",
                f"Has Title: {result.get('hasTitle', 'N/A')}",
                f"Has Content: {result.get('hasContent', 'N/A')}",
                f"Content Length: {result.get('contentLength', 'N/A')} characters",
                "--- CRAWL CONFIGURATION ---",
                f"Depth: {crawlDepth}",
                f"Width: {crawlWidth}",
                "--- ORIGINAL JSON PROMPT (input) ---",
                f"{originalPrompt}",
                "--- ACTUAL PROMPT SENT TO API (EXACT) ---",
                f"{actualPromptSent}",
                "--- RESPONSE CONTENT ---",
                f"{content}",
            ]
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write("\n".join(headerLines) + "\n")
            result["savedTextFile"] = filepath
            print(f"📄 Text response saved: {filepath}")
        except Exception as e:
            print(f"❌ Error saving text response: {str(e)}")
            result["textSaveError"] = str(e)

    def _validateCrawlResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the WEB_CRAWL response contains crawled content.

        Parses ``result["fullResponse"]`` as JSON and records ``hasUrl``,
        ``hasTitle``, ``hasContent``, ``contentLength`` and ``crawledUrl`` in
        ``result``. A response that is not a JSON object is treated as empty.
        """
        try:
            content = result.get("fullResponse", "")
            # Try to parse as JSON; only a JSON object can carry the expected fields
            crawledData = {}
            try:
                parsed = json.loads(content)
            except (TypeError, ValueError):
                parsed = None
            if isinstance(parsed, dict):
                crawledData = parsed

            # Check for expected fields: url, title, content
            hasUrl = bool(crawledData.get("url"))
            hasTitle = bool(crawledData.get("title"))
            hasContent = bool(crawledData.get("content"))
            # "content" may be present but null
            contentLength = len(crawledData.get("content") or "")
            result["hasUrl"] = hasUrl
            result["hasTitle"] = hasTitle
            result["hasContent"] = hasContent
            result["contentLength"] = contentLength
            result["crawledUrl"] = crawledData.get("url", "")

            if hasUrl and hasContent:
                print(f"✅ Successfully crawled content from URL: {crawledData.get('url', 'unknown')}")
                print(f" Content length: {contentLength} characters")
                print(f" Title: {crawledData.get('title', 'N/A')}")
            else:
                print(f"⚠️ Incomplete crawl response - URL: {hasUrl}, Content: {hasContent}")
        except Exception as e:
            print(f"❌ Error validating crawl response: {str(e)}")
            result["crawlValidationError"] = str(e)

    def _validateImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the IMAGE_GENERATE response contains a valid base64 image.

        Updates ``result["hasContent"]`` and reports the outcome. Success is only
        announced when ``result["isValidBase64"]`` is true.
        """
        try:
            content = result.get("fullResponse", "")
            hasContent = bool(content and content.strip())
            result["hasContent"] = hasContent

            if not hasContent:
                print(f"⚠️ Empty or invalid image generation response")
                return

            isBase64 = result.get("isValidBase64", False)
            imageSize = result.get("imageByteSize", 0)
            imageSizeKB = imageSize / 1024 if imageSize > 0 else 0
            if isBase64:
                print(f"✅ Successfully generated image")
            else:
                print(f"⚠️ Response is not a valid base64 image")
            print(f" Image size: {imageSizeKB:.2f} KB ({imageSize} bytes)")
            print(f" Valid base64: {'Yes' if isBase64 else 'No'}")
        except Exception as e:
            print(f"❌ Error validating image response: {str(e)}")
            result["validationError"] = str(e)

    @staticmethod
    def _formatCrawlPages(pages: List[Any]) -> tuple:
        """Render crawl result entries as text; return ``(fullContent, contentLength)``.

        Entries that are not dictionaries are skipped. ``raw_content`` is preferred
        over ``content``; a missing or null value counts as empty text.
        """
        separator = '=' * 60
        parts = []
        contentLength = 0
        for page in pages:
            if not isinstance(page, dict):
                continue
            url = page.get("url", "")
            title = page.get("title", "")
            text = page.get("raw_content") or page.get("content") or ""
            parts.append(f"URL: {url}\nTitle: {title}\nContent: {text}\n{separator}\n")
            contentLength += len(text)
        return "\n".join(parts), contentLength

    async def _testTavilyDirect(self, modelName: str, crawlDepth: int = 3, crawlWidth: int = 50) -> Dict[str, Any]:
        """Test Tavily API directly using the crawl() method with better link following.

        Args:
            modelName: Label stored in the result and used for output file names.
            crawlDepth: Passed to Tavily as ``max_depth``.
            crawlWidth: Passed to Tavily as both ``limit`` and ``max_breadth``.

        Returns:
            Result dictionary with status ``SUCCESS`` or ``EXCEPTION``; it is also
            appended to ``self.testResults``.
        """
        print(f"\n{'='*60}")
        print(f"TESTING TAVILY DIRECT API (crawl method)")
        print(f"{'='*60}")

        targetUrl = "https://www.valueon.ch"
        instructions = "Who works in this company?"
        loop = asyncio.get_running_loop()
        startTime = loop.time()
        try:
            from tavily import AsyncTavilyClient
            from modules.shared.configuration import APP_CONFIG

            apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
            if not apiKey:
                raise Exception("Tavily API key not found")
            client = AsyncTavilyClient(api_key=apiKey)

            # Map our configuration to Tavily parameters
            # maxWidth -> limit (pages per level)
            # maxDepth -> max_depth (link following depth)
            # max_breadth = maxWidth (breadth of crawl at each level)
            tavilyLimit = crawlWidth
            tavilyMaxDepth = crawlDepth
            tavilyMaxBreadth = crawlWidth
            print(f"Calling Tavily API with crawl() method...")
            print(f"URL: {targetUrl}")
            print(f"Instructions: {instructions}")
            print(f"Limit: {tavilyLimit} pages per level")
            print(f"Max depth: {tavilyMaxDepth} (follows links {tavilyMaxDepth} levels deep)")
            print(f"Max breadth: {tavilyMaxBreadth} (up to {tavilyMaxBreadth} pages at each level)")
            print(f"Deep and Broad Crawl Configuration Active")

            response = await client.crawl(
                url=targetUrl,
                instructions=instructions,
                limit=tavilyLimit,
                max_depth=tavilyMaxDepth,
                max_breadth=tavilyMaxBreadth
            )
            processingTime = loop.time() - startTime

            # Analyze response: a dict with "results", any other dict, a list, or anything else
            pagesCrawled = 0
            if isinstance(response, dict) and "results" in response:
                pages = response["results"]
                pagesCrawled = len(pages)
                fullContent, contentLength = self._formatCrawlPages(pages)
            elif isinstance(response, dict):
                fullContent = json.dumps(response, indent=2)
                contentLength = len(fullContent)
            elif isinstance(response, list):
                pagesCrawled = len(response)
                fullContent, contentLength = self._formatCrawlPages(response)
            else:
                fullContent = str(response)
                contentLength = len(fullContent)

            crawlResult = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": round(processingTime, 2),
                "responseLength": contentLength,
                "responseType": "TavilyDirectAPI",
                "hasContent": True,
                "error": None,
                "modelUsed": modelName,
                "priceUsd": 0.0,
                "bytesSent": 0,
                "bytesReceived": contentLength,
                "isValidJson": True,
                "fullResponse": fullContent,
                "pagesCrawled": pagesCrawled,
                "testMethod": "direct_api_crawl"
            }
            print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
            print(f"📄 Pages crawled: {pagesCrawled}")
            print(f"📄 Total content length: {contentLength} characters")

            # Save the response
            self._saveTextResponse(modelName, crawlResult)
            self._validateCrawlResponse(modelName, crawlResult)
            self._saveIndividualModelResult(modelName, crawlResult)
            self.testResults.append(crawlResult)
            return crawlResult
        except Exception as e:
            processingTime = loop.time() - startTime
            crawlResult = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e)
            }
            print(f"💥 EXCEPTION - {str(e)}")
            self.testResults.append(crawlResult)
            # Persist failures too, as testModel() does.
            self._saveIndividualModelResult(modelName, crawlResult)
            return crawlResult

    def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
        """Save individual model test result to a timestamped JSON file (errors are only printed)."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{self._safeFileStem(modelName)}_{timestamp}.json"
            filepath = os.path.join(self.modelTestDir, filename)
            # Prepare individual result data
            individualData = {
                "modelName": modelName,
                "testTimestamp": timestamp,
                "testDate": datetime.now().isoformat(),
                "result": result
            }
            # Save to JSON file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(individualData, f, indent=2, ensure_ascii=False)
            print(f"📄 Individual result saved: {filename}")
        except Exception as e:
            print(f"❌ Error saving individual result: {str(e)}")

    def getAllAvailableModels(self) -> List[str]:
        """Get all available model names that support IMAGE_GENERATE."""
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.datamodels.datamodelAi import OperationTypeEnum

        # Keep the registry models that list IMAGE_GENERATE among their operation types
        imageGenerateModels = [
            model.name
            for model in modelRegistry.getAvailableModels()
            if model.operationTypes and any(
                ot.operationType == OperationTypeEnum.IMAGE_GENERATE
                for ot in model.operationTypes
            )
        ]
        # To test only a subset, filter the list here, e.g.:
        # imageGenerateModels = [m for m in imageGenerateModels if "dall-e" in m.lower()]
        print(f"Found {len(imageGenerateModels)} models that support IMAGE_GENERATE:")
        for modelName in imageGenerateModels:
            print(f" - {modelName}")
        return imageGenerateModels

    def _countByStatus(self, status: str) -> int:
        """Return how many recorded results have the given ``status``."""
        return sum(1 for r in self.testResults if r["status"] == status)

    def saveTestResults(self):
        """Save detailed test results to a timestamped JSON file and return its path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")
        # Prepare results for saving
        saveData = {
            "testTimestamp": timestamp,
            "testDate": datetime.now().isoformat(),
            "totalModels": len(self.testResults),
            "successfulModels": self._countByStatus("SUCCESS"),
            "errorModels": self._countByStatus("ERROR"),
            "exceptionModels": self._countByStatus("EXCEPTION"),
            "results": self.testResults
        }
        # Calculate success rate (0 when nothing was tested)
        if saveData["totalModels"] > 0:
            saveData["successRate"] = (saveData["successfulModels"] / saveData["totalModels"]) * 100
        else:
            saveData["successRate"] = 0
        # Save to JSON file
        with open(resultsFile, 'w', encoding='utf-8') as f:
            json.dump(saveData, f, indent=2, ensure_ascii=False)
        print(f"📄 Detailed results saved: {resultsFile}")
        return resultsFile

    def printTestSummary(self):
        """Print a summary of all test results: totals, per-model details and performance highlights."""
        print(f"\n{'='*80}")
        print("AI MODELS TEST SUMMARY")
        print(f"{'='*80}")
        totalModels = len(self.testResults)
        successfulModels = self._countByStatus("SUCCESS")
        errorModels = self._countByStatus("ERROR")
        exceptionModels = self._countByStatus("EXCEPTION")
        successRate = (successfulModels / totalModels * 100) if totalModels > 0 else 0.0
        print(f"📊 Total models tested: {totalModels}")
        print(f"✅ Successful: {successfulModels}")
        print(f"❌ Errors: {errorModels}")
        print(f"💥 Exceptions: {exceptionModels}")
        print(f"📈 Success rate: {successRate:.1f}%")

        print(f"\n{'='*80}")
        print("DETAILED RESULTS")
        print(f"{'='*80}")
        # Same icons as the totals printed above
        statusIcons = {
            "SUCCESS": "✅",
            "ERROR": "❌",
            "EXCEPTION": "💥"
        }
        for result in self.testResults:
            status_icon = statusIcons.get(result["status"], "")
            print(f"\n{status_icon} {result['modelName']}")
            print(f" Status: {result['status']}")
            print(f" Processing time: {result['processingTime']}s")
            print(f" Response length: {result['responseLength']} characters")
            print(f" Response type: {result['responseType']}")
            if result.get("isValidJson") is not None:
                print(f" Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")
            if result.get("crawledUrl"):
                print(f" Crawled URL: {result['crawledUrl']}")
            if result.get("contentLength") is not None:
                print(f" Content length: {result['contentLength']} characters")
            if result.get("pagesCrawled") is not None:
                print(f" Pages crawled: {result['pagesCrawled']}")
            if result.get("error"):
                print(f" Error: {result['error']}")
            if result.get("responsePreview"):
                print(f" Preview: {result['responsePreview']}")

        # Find fastest and slowest models
        if successfulModels > 0:
            successfulResults = [r for r in self.testResults if r["status"] == "SUCCESS"]
            fastest = min(successfulResults, key=lambda x: x["processingTime"])
            slowest = max(successfulResults, key=lambda x: x["processingTime"])
            print(f"\n{'='*80}")
            print("PERFORMANCE HIGHLIGHTS")
            print(f"{'='*80}")
            print(f"🚀 Fastest model: {fastest['modelName']} ({fastest['processingTime']}s)")
            print(f"🐌 Slowest model: {slowest['modelName']} ({slowest['processingTime']}s)")

            # Find models with most content
            modelsWithContent = [r for r in successfulResults if r.get("contentLength", 0) > 0]
            if modelsWithContent:
                mostContent = max(modelsWithContent, key=lambda x: x.get("contentLength", 0))
                totalContent = sum(r.get("contentLength", 0) for r in modelsWithContent)
                avgContent = totalContent / len(modelsWithContent)
                print(f"📄 Model with most content: {mostContent['modelName']} ({mostContent.get('contentLength', 0)} chars)")
                print(f"📊 Average content per model: {avgContent:.0f} characters")
                print(f"📊 Total content crawled across all models: {totalContent} characters")

            # Find models with most pages crawled (for Tavily direct API)
            modelsWithPages = [r for r in successfulResults if r.get("pagesCrawled", 0) > 0]
            if modelsWithPages:
                mostPages = max(modelsWithPages, key=lambda x: x.get("pagesCrawled", 0))
                totalPages = sum(r.get("pagesCrawled", 0) for r in modelsWithPages)
                avgPages = totalPages / len(modelsWithPages)
                print(f"🔍 Model with most pages crawled: {mostPages['modelName']} ({mostPages.get('pagesCrawled', 0)} pages)")
                print(f"📊 Average pages per model: {avgPages:.1f} pages")
                print(f"📊 Total pages crawled across all models: {totalPages} pages")
async def main():
    """Run AI models testing for IMAGE_GENERATE operation.

    Initializes the tester, runs every IMAGE_GENERATE-capable model in turn
    (pausing for Enter between models), then saves and prints the results.
    When standard input is unavailable the pauses are skipped so that the run
    still completes and the results are saved.
    """
    tester = AIModelsTester()
    print("Starting AI Models Testing for IMAGE_GENERATE...")
    print("Initializing AI service...")
    await tester.initialize()

    # Get all available models
    models = tester.getAllAvailableModels()
    print(f"\nFound {len(models)} models to test:")
    for i, model in enumerate(models, 1):
        print(f" {i}. {model}")

    print(f"\n{'='*80}")
    print("STARTING IMAGE_GENERATE TESTS")
    print(f"{'='*80}")
    print("Testing each model's ability to generate images from text prompts...")
    print("Press Enter after each model test to continue to the next one...")

    # Test each model individually
    pauseBetweenModels = True
    for i, modelName in enumerate(models, 1):
        print(f"\n[{i}/{len(models)}] Testing model: {modelName}")
        await tester.testModel(modelName)
        # Pause for user input (except for the last model)
        if pauseBetweenModels and i < len(models):
            try:
                input(f"\nPress Enter to continue to the next model...")
            except EOFError:
                # stdin is closed or non-interactive: keep going without pauses
                # instead of aborting before the results are saved.
                pauseBetweenModels = False
                print("\nNo interactive input available; continuing without pauses.")

    # Save detailed results to file
    resultsFile = tester.saveTestResults()
    # Print final summary
    tester.printTestSummary()

    print(f"\n{'='*80}")
    print("TESTING COMPLETED")
    print(f"{'='*80}")
    print(f"📄 Results saved to: {resultsFile}")
    print(f"📁 Test results saved to: {tester.modelTestDir}")
if __name__ == "__main__":
    # Script entry point: drive the async test run to completion.
    entryCoroutine = main()
    asyncio.run(entryCoroutine)