#!/usr/bin/env python3
"""
AI Models Test - Tests IMAGE_GENERATE functionality on all models that support it

This script tests all models that have IMAGE_GENERATE capability, validates that
they can generate images from text prompts, and analyzes the quality of results.

CODE FLOW ANALYSIS:

1. methodAi.generateImage() is called with prompt and optional size/quality/style
2. mainServiceAi.generateImage() is called
   -> delegates to subCoreAi.generateImage()
   -> which calls aiObjects.generateImage()
   -> which creates AiModelCall and calls model.functionCall()

WHERE FUNCTIONS ARE USED:
- mainServiceAi.generateImage(): Public API entry point for image generation
- subCoreAi.generateImage(): Internal implementation, called by mainServiceAi
- aiObjects.generateImage(): Creates standardized call and invokes model
- model.functionCall(): Direct model plugin call (e.g., DALL-E 3)
"""
import asyncio
import json
import sys
import os
import base64
from datetime import datetime
from typing import Dict, Any, List

# Add the gateway to path
sys.path.append(os.path.dirname(__file__))

# Import the service initialization
from modules.features.chatPlayground.mainChatPlayground import getServices
from modules.datamodels.datamodelAi import AiCallOptions, AiModelCall, OperationTypeEnum
from modules.datamodels.datamodelUam import User

class AIModelsTester:
    def __init__(self):
        # Create a minimal user context for testing
        testUser = User(
            id="test_user",
            username="test_user",
            email="test@example.com",
            fullName="Test User",
            language="en",
            mandateId="test_mandate"
        )

        # Initialize services using the existing system
        self.services = getServices(testUser, None)  # Test user, no workflow
        self.testResults = []

        # Create logs directory if it doesn't exist
        self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)

        # Create modeltest subdirectory
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)

    async def initialize(self):
        """Initialize the AI service."""
        # Set logging level to DEBUG for detailed output
        import logging
        logging.getLogger().setLevel(logging.DEBUG)

        # Initialize the model registry with all connectors
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.aicore.aicorePluginTavily import AiTavily
        from modules.aicore.aicorePluginPerplexity import AiPerplexity

        # Note: We don't need to register web connectors for IMAGE_GENERATE testing
        # modelRegistry.registerConnector(AiTavily())
        # modelRegistry.registerConnector(AiPerplexity())

        # The AI service needs to be recreated with proper initialization
        from modules.services.serviceAi.mainServiceAi import AiService
        self.services.ai = await AiService.create(self.services)

        # Also initialize extraction service for image processing
        from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
        self.services.extraction = ExtractionService(self.services)

        # Create a minimal workflow context
        from modules.datamodels.datamodelChat import ChatWorkflow
        import uuid

        self.services.currentWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=self.services.utils.timestampGetUtc(),
            lastActivity=self.services.utils.timestampGetUtc(),
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId="test_mandate",
            messageIds=[],
            workflowMode="React",
            maxSteps=5
        )

        print("✅ AI Service initialized successfully")
        print(f"📁 Results will be saved to: {self.modelTestDir}")

    async def testModel(self, modelName: str) -> Dict[str, Any]:
        """Test a specific AI model with the IMAGE_GENERATE operation."""
        print(f"\n{'='*60}")
        print(f"TESTING MODEL: {modelName}")
        print("OPERATION TYPE: IMAGE_GENERATE")
        print(f"{'='*60}")

        # Test prompt for image generation
        testPrompt = 'Create a creative birthday cake designed to look like a monster truck tire/wheel. The cake appears to be chocolate-flavored and is decorated to resemble a large black tire with treads around the sides. On top of the cake, there is a mound of chocolate cake or brownie material meant to look like dirt or mud, with a toy monster truck positioned on top. The monster truck has large wheels and appears to be reddish in color. There are several small decorative flags in light blue and mint green colors stuck into the "dirt" mound. The words "HAPPY BIRTHDAY" are written in white letters around the side of the tire-shaped cake. The image appears to be from Yandex Images, as indicated by Russian text at the bottom. The status bar at the top shows 13:02 time and 82% battery level.'
        size = "1024x1024"
        quality = "standard"
        style = "vivid"

        print(f"Test prompt: {testPrompt}")
        print(f"Size: {size}, Quality: {quality}, Style: {style}")

        startTime = asyncio.get_event_loop().time()

        try:
            # Get model directly from registry and test it
            from modules.aicore.aicoreModelRegistry import modelRegistry
            model = modelRegistry.getModel(modelName)

            if not model:
                raise Exception(f"Model {modelName} not found")

            # Create messages for image generation (plain text prompt)
            messages = [
                {
                    "role": "user",
                    "content": testPrompt
                }
            ]

            # Create model call with image generation parameters
            # (AiModelCall and AiCallOptions are imported at module level)
            modelCall = AiModelCall(
                messages=messages,
                model=model,
                options=AiCallOptions(
                    operationType=OperationTypeEnum.IMAGE_GENERATE,
                    size=size,
                    quality=quality,
                    style=style
                )
            )

            # Call model directly
            print(f"Calling model.functionCall() for {modelName}")
            modelResponse = await model.functionCall(modelCall)

            if not modelResponse.success:
                raise Exception(f"Model call failed: {modelResponse.error}")

            result = modelResponse.content

            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime

            # Analyze result (base64 image data)
            if result:
                analysisResult = {
                    "modelName": modelName,
                    "status": "SUCCESS",
                    "processingTime": round(processingTime, 2),
                    "responseLength": len(result),
                    "responseType": "base64_image",
                    "hasContent": True,
                    "error": None,
                    "testPrompt": testPrompt,
                    "size": size,
                    "quality": quality,
                    "style": style,
                    "isBase64": result.startswith("data:image") if isinstance(result, str) else False
                }

                # Verify the payload decodes as valid base64
                try:
                    # If it's a data URL, extract the base64 part
                    if result.startswith("data:image"):
                        base64Data = result.split(",")[1] if "," in result else result
                    else:
                        base64Data = result

                    # Try to decode to verify it's valid base64
                    imageBytes = base64.b64decode(base64Data)
                    analysisResult["isValidBase64"] = True
                    analysisResult["imageByteSize"] = len(imageBytes)
                except Exception:
                    analysisResult["isValidBase64"] = False
                    analysisResult["imageByteSize"] = 0

                analysisResult["responsePreview"] = result[:100] + "..." if len(result) > 100 else result
                analysisResult["fullResponse"] = result

                print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
                print(f"📄 Response length: {len(result)} characters")
                print(f"🖼️ Valid base64: {analysisResult.get('isValidBase64', False)}")
                if analysisResult.get('imageByteSize'):
                    print(f"🖼️ Image size: {analysisResult['imageByteSize']} bytes")

                result = analysisResult

                # Validate that content was extracted
                if result.get("status") == "SUCCESS" and result.get("fullResponse"):
                    self._validateImageResponse(modelName, result)
            else:
                result = {
                    "modelName": modelName,
                    "status": "ERROR",
                    "processingTime": round(processingTime, 2),
                    "responseLength": 0,
                    "responseType": "error",
                    "hasContent": False,
                    "error": "Empty response",
                    "fullResponse": ""
                }

        except Exception as e:
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime

            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e),
                "testPrompt": testPrompt,
                "size": size,
                "quality": quality,
                "style": style
            }

            print(f"💥 EXCEPTION - {str(e)}")

        self.testResults.append(result)

        # Attempt to save the generated image; _saveImageResponse() logs a
        # warning and skips when the result contains no image data
        if result.get("status") in ["SUCCESS", "EXCEPTION", "ERROR"]:
            self._saveImageResponse(modelName, result)

        # Save individual model result immediately
        self._saveIndividualModelResult(modelName, result)

        return result
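
    # Note (inferred from the handling above, not from plugin documentation):
    # model plugins are expected to return either a raw base64 string or an
    # RFC 2397 data URL such as "data:image/png;base64,iVBORw0KGgo...".
    # _saveImageResponse() below accepts both forms.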

    def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Save an image generation response as an image file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Save as image file
            filename = f"{modelName}_{timestamp}.png"
            filepath = os.path.join(self.modelTestDir, filename)

            # Get image data
            content = result.get("fullResponse", "")

            if not content:
                print(f"⚠️ No image data to save for {modelName}")
                return

            # Decode base64 image data
            try:
                # Extract the base64 payload if it's a data URL
                if content.startswith("data:image"):
                    base64Data = content.split(",")[1] if "," in content else content
                else:
                    base64Data = content

                # Decode base64 to bytes
                imageBytes = base64.b64decode(base64Data)

                # Save image file
                with open(filepath, 'wb') as f:
                    f.write(imageBytes)

                result["savedImageFile"] = filepath
                print(f"🖼️ Image saved: {filepath}")

                # Also save metadata as JSON
                metadata = {
                    "modelName": modelName,
                    "timestamp": timestamp,
                    "status": result.get('status', 'Unknown'),
                    "processingTime": result.get('processingTime', 0),
                    "responseLength": result.get('responseLength', 0),
                    "isValidBase64": result.get('isValidBase64', False),
                    "imageByteSize": len(imageBytes),
                    "size": result.get('size', 'N/A'),
                    "quality": result.get('quality', 'N/A'),
                    "style": result.get('style', 'N/A'),
                    "testPrompt": result.get('testPrompt', 'N/A'),
                    "imageFile": filename
                }

                metadataFile = os.path.join(self.modelTestDir, f"{modelName}_{timestamp}_metadata.json")
                with open(metadataFile, 'w', encoding='utf-8') as f:
                    json.dump(metadata, f, indent=2, ensure_ascii=False)

                print(f"📄 Metadata saved: {metadataFile}")

            except Exception as decodeError:
                print(f"❌ Error decoding base64 image data: {str(decodeError)}")
                # Fall back to saving as a text file
                textFile = os.path.join(self.modelTestDir, f"{modelName}_{timestamp}.txt")
                with open(textFile, 'w', encoding='utf-8') as f:
                    f.write(f"Error decoding image:\n{str(decodeError)}\n\nBase64 data:\n{content[:500]}...")
                print(f"📄 Saved base64 data as text: {textFile}")

        except Exception as e:
            print(f"❌ Error saving image generation response: {str(e)}")
            result["saveError"] = str(e)

    def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
        """Save a text response to file (used by the Tavily direct-API test)."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.txt"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare content for saving
            content = result.get("fullResponse", "")
            if not content:
                content = result.get("responsePreview", "No content available")

            # If there's an error, include it in the content
            if result.get("error"):
                content = f"ERROR: {result.get('error')}\n\n{content}"

            # Get the crawl configuration for logging
            config = result.get("crawlConfig", {})
            crawlDepth = config.get("depth", "N/A")
            crawlWidth = config.get("width", "N/A")

            # Get both the original JSON prompt and the actual prompt sent
            originalPrompt = result.get("testPrompt", "N/A")
            actualPromptSent = result.get("actualPromptSent", "N/A")

            # Add metadata header
            metadata = f"""Model: {modelName}
Test Time: {timestamp}
Status: {result.get('status', 'Unknown')}
Processing Time: {result.get('processingTime', 0):.2f}s
Response Length: {result.get('responseLength', 0)} characters
Is Valid JSON: {result.get('isValidJson', False)}
Test Method: {result.get('testMethod', 'standard')}
Pages Crawled: {result.get('pagesCrawled', 'N/A')}
Crawled URL: {result.get('crawledUrl', 'N/A')}
Has URL: {result.get('hasUrl', 'N/A')}
Has Title: {result.get('hasTitle', 'N/A')}
Has Content: {result.get('hasContent', 'N/A')}
Content Length: {result.get('contentLength', 'N/A')} characters

--- CRAWL CONFIGURATION ---
Depth: {crawlDepth}
Width: {crawlWidth}

--- ORIGINAL JSON PROMPT (input) ---
{originalPrompt}

--- ACTUAL PROMPT SENT TO API (EXACT) ---
{actualPromptSent}

--- RESPONSE CONTENT ---
{content}
"""

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(metadata)

            result["savedTextFile"] = filepath
            print(f"📄 Text response saved: {filepath}")

        except Exception as e:
            print(f"❌ Error saving text response: {str(e)}")
            result["textSaveError"] = str(e)

    def _validateCrawlResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the WEB_CRAWL response contains crawled content."""
        try:
            content = result.get("fullResponse", "")

            # Try to parse as JSON
            crawledData = {}
            try:
                parsed = json.loads(content)
                if isinstance(parsed, dict):
                    crawledData = parsed
            except Exception:
                pass

            # Check for expected fields: url, title, content
            hasUrl = bool(crawledData.get("url"))
            hasTitle = bool(crawledData.get("title"))
            hasContent = bool(crawledData.get("content"))
            contentLength = len(crawledData.get("content", ""))

            result["hasUrl"] = hasUrl
            result["hasTitle"] = hasTitle
            result["hasContent"] = hasContent
            result["contentLength"] = contentLength
            result["crawledUrl"] = crawledData.get("url", "")

            if hasUrl and hasContent:
                print(f"✅ Successfully crawled content from URL: {crawledData.get('url', 'unknown')}")
                print(f"   Content length: {contentLength} characters")
                print(f"   Title: {crawledData.get('title', 'N/A')}")
            else:
                print(f"⚠️ Incomplete crawl response - URL: {hasUrl}, Content: {hasContent}")

        except Exception as e:
            print(f"❌ Error validating crawl response: {str(e)}")
            result["crawlValidationError"] = str(e)

    def _validateImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the IMAGE_GENERATE response contains a valid base64 image."""
        try:
            content = result.get("fullResponse", "")

            # Check if content is non-empty
            hasContent = bool(content and len(content.strip()) > 0)
            result["hasContent"] = hasContent

            if hasContent:
                isBase64 = result.get("isValidBase64", False)
                imageSize = result.get("imageByteSize", 0)
                imageSizeKB = imageSize / 1024 if imageSize > 0 else 0

                print("✅ Successfully generated image")
                print(f"   Image size: {imageSizeKB:.2f} KB ({imageSize} bytes)")
                print(f"   Valid base64: {'Yes' if isBase64 else 'No'}")
            else:
                print("⚠️ Empty or invalid image generation response")

        except Exception as e:
            print(f"❌ Error validating image response: {str(e)}")
            result["validationError"] = str(e)

    async def _testTavilyDirect(self, modelName: str, crawlDepth: int = 3, crawlWidth: int = 50) -> Dict[str, Any]:
        """Test the Tavily API directly via its crawl() method for better link following."""
        print(f"\n{'='*60}")
        print("TESTING TAVILY DIRECT API (crawl method)")
        print(f"{'='*60}")

        startTime = asyncio.get_event_loop().time()

        try:
            from tavily import AsyncTavilyClient
            from modules.shared.configuration import APP_CONFIG

            apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
            if not apiKey:
                raise Exception("Tavily API key not found")

            client = AsyncTavilyClient(api_key=apiKey)

            # Map our configuration to Tavily parameters:
            # maxWidth -> limit (pages per level)
            # maxDepth -> max_depth (link-following depth)
            # maxWidth -> max_breadth (breadth of crawl at each level)
            tavilyLimit = crawlWidth
            tavilyMaxDepth = crawlDepth
            tavilyMaxBreadth = crawlWidth

            print("Calling Tavily API with crawl() method...")
            print("URL: https://www.valueon.ch")
            print("Instructions: Who works in this company?")
            print(f"Limit: {tavilyLimit} pages per level")
            print(f"Max depth: {tavilyMaxDepth} (follows links {tavilyMaxDepth} levels deep)")
            print(f"Max breadth: {tavilyMaxBreadth} (up to {tavilyMaxBreadth} pages at each level)")
            print("Deep and broad crawl configuration active")

            response = await client.crawl(
                url="https://www.valueon.ch",
                instructions="Who works in this company?",
                limit=tavilyLimit,
                max_depth=tavilyMaxDepth,
                max_breadth=tavilyMaxBreadth
            )

            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime

            # Analyze response
            contentLength = 0
            pagesCrawled = 0
            fullContent = ""

            if isinstance(response, dict):
                # Check if it has results
                if "results" in response:
                    results = response["results"]
                    pagesCrawled = len(results)
                    content_parts = []
                    for item in results:
                        url = item.get("url", "")
                        title = item.get("title", "")
                        content = item.get("raw_content", item.get("content", ""))
                        content_parts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
                        contentLength += len(content)

                    fullContent = "\n".join(content_parts)
                else:
                    fullContent = json.dumps(response, indent=2)
                    contentLength = len(fullContent)
            elif isinstance(response, list):
                pagesCrawled = len(response)
                content_parts = []
                for item in response:
                    if isinstance(item, dict):
                        url = item.get("url", "")
                        title = item.get("title", "")
                        content = item.get("raw_content", item.get("content", ""))
                        content_parts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
                        contentLength += len(content)

                fullContent = "\n".join(content_parts)
            else:
                fullContent = str(response)
                contentLength = len(fullContent)

            result = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": round(processingTime, 2),
                "responseLength": contentLength,
                "responseType": "TavilyDirectAPI",
                "hasContent": True,
                "error": None,
                "modelUsed": modelName,
                "priceUsd": 0.0,
                "bytesSent": 0,
                "bytesReceived": contentLength,
                "isValidJson": True,
                "fullResponse": fullContent,
                "pagesCrawled": pagesCrawled,
                "testMethod": "direct_api_crawl"
            }

            print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
            print(f"📄 Pages crawled: {pagesCrawled}")
            print(f"📄 Total content length: {contentLength} characters")

            # Save the response
            self._saveTextResponse(modelName, result)
            self._validateCrawlResponse(modelName, result)
            self._saveIndividualModelResult(modelName, result)

            self.testResults.append(result)
            return result

        except Exception as e:
            endTime = asyncio.get_event_loop().time()
            processingTime = endTime - startTime

            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e)
            }

            print(f"💥 EXCEPTION - {str(e)}")
            self.testResults.append(result)
            return result

    def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
        """Save an individual model test result to file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.json"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare individual result data
            individualData = {
                "modelName": modelName,
                "testTimestamp": timestamp,
                "testDate": datetime.now().isoformat(),
                "result": result
            }

            # Save to JSON file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(individualData, f, indent=2, ensure_ascii=False)

            print(f"📄 Individual result saved: {filename}")

        except Exception as e:
            print(f"❌ Error saving individual result: {str(e)}")

    def getAllAvailableModels(self) -> List[str]:
        """Get all available model names that support IMAGE_GENERATE."""
        from modules.aicore.aicoreModelRegistry import modelRegistry

        # Get all models from registry
        allModels = modelRegistry.getAvailableModels()

        # Filter models that support IMAGE_GENERATE
        imageGenerateModels = []
        for model in allModels:
            if model.operationTypes and any(
                ot.operationType == OperationTypeEnum.IMAGE_GENERATE
                for ot in model.operationTypes
            ):
                imageGenerateModels.append(model.name)

        # Uncomment to restrict testing to specific models, e.g. DALL-E variants:
        # imageGenerateModels = [m for m in imageGenerateModels if "dall-e" in m.lower()]

        print(f"Found {len(imageGenerateModels)} models that support IMAGE_GENERATE:")
        for modelName in imageGenerateModels:
            print(f"  - {modelName}")

        return imageGenerateModels

    def saveTestResults(self):
        """Save detailed test results to file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")

        # Prepare results for saving
        saveData = {
            "testTimestamp": timestamp,
            "testDate": datetime.now().isoformat(),
            "totalModels": len(self.testResults),
            "successfulModels": len([r for r in self.testResults if r["status"] == "SUCCESS"]),
            "errorModels": len([r for r in self.testResults if r["status"] == "ERROR"]),
            "exceptionModels": len([r for r in self.testResults if r["status"] == "EXCEPTION"]),
            "results": self.testResults
        }

        # Calculate success rate
        if saveData["totalModels"] > 0:
            saveData["successRate"] = (saveData["successfulModels"] / saveData["totalModels"]) * 100
        else:
            saveData["successRate"] = 0

        # Save to JSON file
        with open(resultsFile, 'w', encoding='utf-8') as f:
            json.dump(saveData, f, indent=2, ensure_ascii=False)

        print(f"📄 Detailed results saved: {resultsFile}")
        return resultsFile

    def printTestSummary(self):
        """Print a summary of all test results."""
        print(f"\n{'='*80}")
        print("AI MODELS TEST SUMMARY")
        print(f"{'='*80}")

        totalModels = len(self.testResults)
        successfulModels = len([r for r in self.testResults if r["status"] == "SUCCESS"])
        errorModels = len([r for r in self.testResults if r["status"] == "ERROR"])
        exceptionModels = len([r for r in self.testResults if r["status"] == "EXCEPTION"])

        print(f"📊 Total models tested: {totalModels}")
        print(f"✅ Successful: {successfulModels}")
        print(f"❌ Errors: {errorModels}")
        print(f"💥 Exceptions: {exceptionModels}")
        if totalModels > 0:
            print(f"📈 Success rate: {(successfulModels / totalModels * 100):.1f}%")
        else:
            print("📈 Success rate: 0%")

        print(f"\n{'='*80}")
        print("DETAILED RESULTS")
        print(f"{'='*80}")

        for result in self.testResults:
            status_icon = {
                "SUCCESS": "✅",
                "ERROR": "❌",
                "EXCEPTION": "💥"
            }.get(result["status"], "❓")

            print(f"\n{status_icon} {result['modelName']}")
            print(f"   Status: {result['status']}")
            print(f"   Processing time: {result['processingTime']}s")
            print(f"   Response length: {result['responseLength']} characters")
            print(f"   Response type: {result['responseType']}")

            if result.get("isValidJson") is not None:
                print(f"   Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")

            if result.get("crawledUrl"):
                print(f"   Crawled URL: {result['crawledUrl']}")

            if result.get("contentLength") is not None:
                print(f"   Content length: {result['contentLength']} characters")

            if result.get("pagesCrawled") is not None:
                print(f"   Pages crawled: {result['pagesCrawled']}")

            if result.get("error"):
                print(f"   Error: {result['error']}")

            if result.get("responsePreview"):
                print(f"   Preview: {result['responsePreview']}")

        # Find fastest and slowest models
        if successfulModels > 0:
            successfulResults = [r for r in self.testResults if r["status"] == "SUCCESS"]
            fastest = min(successfulResults, key=lambda x: x["processingTime"])
            slowest = max(successfulResults, key=lambda x: x["processingTime"])

            print(f"\n{'='*80}")
            print("PERFORMANCE HIGHLIGHTS")
            print(f"{'='*80}")
            print(f"🚀 Fastest model: {fastest['modelName']} ({fastest['processingTime']}s)")
            print(f"🐌 Slowest model: {slowest['modelName']} ({slowest['processingTime']}s)")

            # Find models with most content
            modelsWithContent = [r for r in successfulResults if r.get("contentLength", 0) > 0]
            if modelsWithContent:
                mostContent = max(modelsWithContent, key=lambda x: x.get("contentLength", 0))
                totalContent = sum(r.get("contentLength", 0) for r in modelsWithContent)
                avgContent = totalContent / len(modelsWithContent)
                print(f"📄 Model with most content: {mostContent['modelName']} ({mostContent.get('contentLength', 0)} chars)")
                print(f"📊 Average content per model: {avgContent:.0f} characters")
                print(f"📊 Total content crawled across all models: {totalContent} characters")

            # Find models with most pages crawled (for the Tavily direct-API test)
            modelsWithPages = [r for r in successfulResults if r.get("pagesCrawled", 0) > 0]
            if modelsWithPages:
                mostPages = max(modelsWithPages, key=lambda x: x.get("pagesCrawled", 0))
                totalPages = sum(r.get("pagesCrawled", 0) for r in modelsWithPages)
                avgPages = totalPages / len(modelsWithPages)
                print(f"🔍 Model with most pages crawled: {mostPages['modelName']} ({mostPages.get('pagesCrawled', 0)} pages)")
                print(f"📊 Average pages per model: {avgPages:.1f} pages")
                print(f"📊 Total pages crawled across all models: {totalPages} pages")

async def main():
    """Run AI models testing for the IMAGE_GENERATE operation."""
    tester = AIModelsTester()

    print("Starting AI Models Testing for IMAGE_GENERATE...")
    print("Initializing AI service...")
    await tester.initialize()

    # Get all available models
    models = tester.getAllAvailableModels()

    print(f"\nFound {len(models)} models to test:")
    for i, model in enumerate(models, 1):
        print(f"  {i}. {model}")

    print(f"\n{'='*80}")
    print("STARTING IMAGE_GENERATE TESTS")
    print(f"{'='*80}")
    print("Testing each model's ability to generate images from text prompts...")
    print("Press Enter after each model test to continue to the next one...")

    # Test each model individually
    for i, modelName in enumerate(models, 1):
        print(f"\n[{i}/{len(models)}] Testing model: {modelName}")

        # Test the model
        await tester.testModel(modelName)

        # Pause for user input (except for the last model)
        if i < len(models):
            input("\nPress Enter to continue to the next model...")

    # Save detailed results to file
    resultsFile = tester.saveTestResults()

    # Print final summary
    tester.printTestSummary()

    print(f"\n{'='*80}")
    print("TESTING COMPLETED")
    print(f"{'='*80}")
    print(f"📄 Results saved to: {resultsFile}")
    print(f"📁 Test results saved to: {tester.modelTestDir}")

if __name__ == "__main__":
    asyncio.run(main())