#!/usr/bin/env python3
"""
AI Models Test - Tests IMAGE_GENERATE functionality on all models that support it

This script tests all models that have IMAGE_GENERATE capability, validates that
they can generate images from text prompts, and analyzes the quality of results.

CODE FLOW ANALYSIS:
1. methodAi.generateImage() is called with prompt and optional size/quality/style
2. mainServiceAi.generateImage() is called
   -> delegates to subCoreAi.generateImage()
   -> which calls aiObjects.generateImage()
   -> which creates AiModelCall and calls model.functionCall()

WHERE FUNCTIONS ARE USED:
- mainServiceAi.generateImage(): Public API entry point for image generation
- subCoreAi.generateImage(): Internal implementation, called by mainServiceAi
- aiObjects.generateImage(): Creates standardized call and invokes model
- model.functionCall(): Direct model plugin call (e.g., DALL-E 3)
"""

import asyncio
import json
import sys
import os
import base64
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

# Add the gateway to path (must happen before the project imports below)
sys.path.append(os.path.dirname(__file__))

# Import the service initialization
from modules.features.chatPlayground.mainChatPlayground import getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User


# Prefix that marks a response as a data URL rather than raw base64.
DATA_URL_PREFIX = "data:image"

# Default prompt used by AIModelsTester.testModel() when the caller passes none.
DEFAULT_TEST_PROMPT = (
    'Create a creative birthday cake designed to look like a monster truck tire/wheel. '
    'The cake appears to be chocolate-flavored and is decorated to resemble a large black '
    'tire with treads around the sides. On top of the cake, there is a mound of chocolate '
    'cake or brownie material meant to look like dirt or mud, with a toy monster truck '
    'positioned on top. The monster truck has large wheels and appears to be reddish in '
    'color. There are several small decorative flags in light blue and mint green colors '
    'stuck into the "dirt" mound. The words "HAPPY BIRTHDAY" are written in white letters '
    'around the side of the tire-shaped cake. The image appears to be from Yandex Images, '
    'as indicated by Russian text at the bottom. The status bar at the top shows 13:02 '
    'time and 82% battery level.'
)


def _decodeImagePayload(content: Any) -> bytes:
    """Decode a base64 image payload into raw bytes.

    Accepts either raw base64 text or a ``data:image...`` URL; for a data URL
    only the part after the first comma is decoded.

    Args:
        content: The model response to decode.

    Returns:
        The decoded image bytes (never empty).

    Raises:
        TypeError: If ``content`` is not a ``str``.
        ValueError: If the payload is empty or is not valid base64
            (``binascii.Error`` is a ``ValueError`` subclass).
    """
    if not isinstance(content, str):
        raise TypeError(
            f"Expected image content as str, got {type(content).__name__}"
        )

    payload = content
    if content.startswith(DATA_URL_PREFIX) and "," in content:
        payload = content.split(",", 1)[1]

    # Line-wrapped base64 is legitimate, so drop whitespace before the strict check.
    payload = "".join(payload.split())

    # validate=True: without it b64decode silently discards non-alphabet
    # characters, so almost any string would count as "valid base64".
    imageBytes = base64.b64decode(payload, validate=True)
    if not imageBytes:
        raise ValueError("Image payload is empty")
    return imageBytes


class AIModelsTester:
    """Runs IMAGE_GENERATE tests against registered models and records the results.

    Results are accumulated in ``self.testResults`` and written below
    ``self.modelTestDir``.
    """

    def __init__(self):
        """Create the test user, the service container and the output directories."""
        # Create a minimal user context for testing
        testUser = User(
            id="test_user",
            username="test_user",
            email="test@example.com",
            fullName="Test User",
            language="en",
            mandateId="test_mandate"
        )

        # Initialize services using the existing system
        self.services = getServices(testUser, None)  # Test user, no workflow
        self.testResults = []

        # Create logs directory if it doesn't exist
        self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)

        # Create modeltest subdirectory
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)

    async def initialize(self):
        """Initialize the AI service."""
        # Set logging level to DEBUG for detailed output
        import logging
        logging.getLogger().setLevel(logging.DEBUG)

        # Initialize the model registry with all connectors.
        # NOTE(review): these names are unused below; the imports are kept because
        # importing the modules may have side effects - confirm before removing.
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.aicore.aicorePluginTavily import AiTavily
        from modules.aicore.aicorePluginPerplexity import AiPerplexity

        # Note: We don't need to register web connectors for IMAGE_GENERATE testing
        # modelRegistry.registerConnector(AiTavily())
        # modelRegistry.registerConnector(AiPerplexity())

        # The AI service needs to be recreated with proper initialization
        from modules.services.serviceAi.mainServiceAi import AiService
        self.services.ai = await AiService.create(self.services)

        # Also initialize extraction service for image processing
        from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
        self.services.extraction = ExtractionService(self.services)

        # Create a minimal workflow context
        from modules.datamodels.datamodelChat import ChatWorkflow
        import uuid

        self.services.currentWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=self.services.utils.timestampGetUtc(),
            lastActivity=self.services.utils.timestampGetUtc(),
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId="test_mandate",
            messageIds=[],
            workflowMode="React",
            maxSteps=5
        )

        print("✅ AI Service initialized successfully")
        print(f"📁 Results will be saved to: {self.modelTestDir}")

    async def testModel(
        self,
        modelName: str,
        prompt: Optional[str] = None,
        size: str = "1024x1024",
        quality: str = "standard",
        style: str = "vivid",
    ) -> Dict[str, Any]:
        """Test a specific AI model with IMAGE_GENERATE operation.

        Args:
            modelName: Name of the model to look up in the model registry.
            prompt: Text prompt to send; ``DEFAULT_TEST_PROMPT`` when ``None``.
            size: Value passed as ``size`` in the call options.
            quality: Value passed as ``quality`` in the call options.
            style: Value passed as ``style`` in the call options.

        Returns:
            The result dict, whose ``status`` is ``SUCCESS``, ``ERROR`` (empty
            response) or ``EXCEPTION``. The same dict is appended to
            ``self.testResults`` and written to disk.
        """
        testPrompt = DEFAULT_TEST_PROMPT if prompt is None else prompt

        print(f"\n{'='*60}")
        print(f"TESTING MODEL: {modelName}")
        print("OPERATION TYPE: IMAGE_GENERATE")
        print(f"{'='*60}")
        print(f"Test prompt: {testPrompt}")
        print(f"Size: {size}, Quality: {quality}, Style: {style}")

        # Inside a coroutine the running loop is the right clock source.
        loop = asyncio.get_running_loop()
        startTime = loop.time()

        try:
            # Get model directly from registry and test it
            from modules.aicore.aicoreModelRegistry import modelRegistry
            from modules.datamodels.datamodelAi import AiModelCall

            model = modelRegistry.getModel(modelName)
            if not model:
                raise Exception(f"Model {modelName} not found")

            # Create messages for image generation (plain text prompt)
            messages = [
                {
                    "role": "user",
                    "content": testPrompt
                }
            ]

            # Create model call with image generation parameters
            modelCall = AiModelCall(
                messages=messages,
                model=model,
                options=AiCallOptions(
                    operationType=OperationTypeEnum.IMAGE_GENERATE,
                    size=size,
                    quality=quality,
                    style=style
                )
            )

            # Call model directly
            print(f"Calling model.functionCall() for {modelName}")
            modelResponse = await model.functionCall(modelCall)

            if not modelResponse.success:
                raise Exception(f"Model call failed: {modelResponse.error}")

            imageContent = modelResponse.content
            processingTime = loop.time() - startTime

            if imageContent:
                # Anything but text cannot be analysed below; report it via the
                # EXCEPTION path with a clear message.
                if not isinstance(imageContent, str):
                    raise TypeError(
                        f"Expected image content as str, got {type(imageContent).__name__}"
                    )

                # Try to decode to verify it's valid base64
                try:
                    imageByteSize = len(_decodeImagePayload(imageContent))
                    isValidBase64 = True
                except ValueError:
                    imageByteSize = 0
                    isValidBase64 = False

                if len(imageContent) > 100:
                    responsePreview = imageContent[:100] + "..."
                else:
                    responsePreview = imageContent

                result = {
                    "modelName": modelName,
                    "status": "SUCCESS",
                    "processingTime": round(processingTime, 2),
                    "responseLength": len(imageContent),
                    "responseType": "base64_image",
                    "hasContent": True,
                    "error": None,
                    "testPrompt": testPrompt,
                    "size": size,
                    "quality": quality,
                    "style": style,
                    # True when the response is a data URL (not whether it decodes)
                    "isBase64": imageContent.startswith(DATA_URL_PREFIX),
                    "isValidBase64": isValidBase64,
                    "imageByteSize": imageByteSize,
                    "responsePreview": responsePreview,
                    "fullResponse": imageContent,
                }

                print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
                print(f"📄 Response length: {len(imageContent)} characters")
                print(f"🖼️ Valid base64: {isValidBase64}")
                if imageByteSize:
                    print(f"🖼️ Image size: {imageByteSize} bytes")

                # Validate that content was extracted
                self._validateImageResponse(modelName, result)
            else:
                result = {
                    "modelName": modelName,
                    "status": "ERROR",
                    "processingTime": round(processingTime, 2),
                    "responseLength": 0,
                    "responseType": "error",
                    "hasContent": False,
                    "error": "Empty response",
                    "testPrompt": testPrompt,
                    "size": size,
                    "quality": quality,
                    "style": style,
                    "fullResponse": ""
                }

        except Exception as e:
            processingTime = loop.time() - startTime

            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e),
                "testPrompt": testPrompt,
                "size": size,
                "quality": quality,
                "style": style
            }

            print(f"💥 EXCEPTION - {str(e)}")

        self.testResults.append(result)

        # Writes the image and its metadata; for results without image data
        # (ERROR / EXCEPTION) this only prints a warning.
        self._saveImageResponse(modelName, result)

        # Save individual model result immediately
        self._saveIndividualModelResult(modelName, result)

        return result

    def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Save image generation response as image file.

        Writes ``<modelName>_<timestamp>.png`` plus a ``_metadata.json`` file into
        ``self.modelTestDir``. When the response is not decodable base64, a
        ``.txt`` file with the error and the first 500 characters is written
        instead. Failures are printed and recorded in ``result["saveError"]``;
        nothing is raised.

        Args:
            modelName: Model name, used as the file name prefix.
            result: Result dict; ``fullResponse`` is read and ``savedImageFile``
                is set on success.
        """
        try:
            # Get image data
            content = result.get("fullResponse", "")
            if not content:
                print(f"⚠️ No image data to save for {modelName}")
                return

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.png"
            filepath = os.path.join(self.modelTestDir, filename)

            # Only decoding is guarded here, so that I/O failures further down
            # are not misreported as decoding errors.
            try:
                imageBytes = _decodeImagePayload(content)
            except (ValueError, TypeError) as decodeError:
                print(f"❌ Error decoding base64 image data: {str(decodeError)}")
                # Fall back to saving as text file
                textFile = os.path.join(self.modelTestDir, f"{modelName}_{timestamp}.txt")
                with open(textFile, 'w', encoding='utf-8') as f:
                    f.write(f"Error decoding image:\n{str(decodeError)}\n\nBase64 data:\n{content[:500]}...")
                print(f"📄 Saved base64 data as text: {textFile}")
                return

            # Save image file
            with open(filepath, 'wb') as f:
                f.write(imageBytes)

            result["savedImageFile"] = filepath
            print(f"🖼️ Image saved: {filepath}")

            # Also save metadata as JSON
            metadata = {
                "modelName": modelName,
                "timestamp": timestamp,
                "status": result.get('status', 'Unknown'),
                "processingTime": result.get('processingTime', 0),
                "responseLength": result.get('responseLength', 0),
                "isValidBase64": result.get('isValidBase64', False),
                "imageByteSize": len(imageBytes),
                "size": result.get('size', 'N/A'),
                "quality": result.get('quality', 'N/A'),
                "style": result.get('style', 'N/A'),
                "testPrompt": result.get('testPrompt', 'N/A'),
                "imageFile": filename
            }

            metadataFile = os.path.join(self.modelTestDir, f"{modelName}_{timestamp}_metadata.json")
            with open(metadataFile, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            print(f"📄 Metadata saved: {metadataFile}")

        except Exception as e:
            # Best effort: saving must never abort the test run.
            print(f"❌ Error saving image generation response: {str(e)}")
            result["saveError"] = str(e)

    def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
        """Save text response to file.

        Writes ``<modelName>_<timestamp>.txt`` into ``self.modelTestDir`` with a
        metadata header followed by the response content. Failures are printed
        and recorded in ``result["textSaveError"]``; nothing is raised.

        Args:
            modelName: Model name, used as the file name prefix.
            result: Result dict; ``savedTextFile`` is set on success.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.txt"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare content for saving
            content = result.get("fullResponse", "")
            if not content:
                content = result.get("responsePreview", "No content available")

            # If there's an error, include it in the content
            if result.get("error"):
                content = f"ERROR: {result.get('error')}\n\n{content}"

            # Get prompt and config for logging
            config = result.get("crawlConfig", {})
            crawlDepth = config.get("depth", "N/A")
            crawlWidth = config.get("width", "N/A")

            # Get both the original JSON prompt and the actual prompt sent
            originalPrompt = result.get("testPrompt", "N/A")
            actualPromptSent = result.get("actualPromptSent", "N/A")

            # Metadata header followed by the content, one entry per line
            lines = [
                f"Model: {modelName}",
                f"Test Time: {timestamp}",
                f"Status: {result.get('status', 'Unknown')}",
                f"Processing Time: {result.get('processingTime', 0):.2f}s",
                f"Response Length: {result.get('responseLength', 0)} characters",
                f"Is Valid JSON: {result.get('isValidJson', False)}",
                f"Test Method: {result.get('testMethod', 'standard')}",
                f"Pages Crawled: {result.get('pagesCrawled', 'N/A')}",
                f"Crawled URL: {result.get('crawledUrl', 'N/A')}",
                f"Has URL: {result.get('hasUrl', 'N/A')}",
                f"Has Title: {result.get('hasTitle', 'N/A')}",
                f"Has Content: {result.get('hasContent', 'N/A')}",
                f"Content Length: {result.get('contentLength', 'N/A')} characters",
                "",
                "--- CRAWL CONFIGURATION ---",
                f"Depth: {crawlDepth}",
                f"Width: {crawlWidth}",
                "",
                "--- ORIGINAL JSON PROMPT (input) ---",
                f"{originalPrompt}",
                "",
                "--- ACTUAL PROMPT SENT TO API (EXACT) ---",
                f"{actualPromptSent}",
                "",
                "--- RESPONSE CONTENT ---",
                f"{content}",
            ]
            metadata = "\n".join(lines) + "\n"

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(metadata)

            result["savedTextFile"] = filepath
            print(f"📄 Text response saved: {filepath}")

        except Exception as e:
            # Best effort: saving must never abort the test run.
            print(f"❌ Error saving text response: {str(e)}")
            result["textSaveError"] = str(e)

    def _validateCrawlResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the WEB_CRAWL response contains crawled content.

        Parses ``result["fullResponse"]`` as JSON and, when it is an object,
        checks it for ``url``, ``title`` and ``content``. The findings are
        written back into ``result`` (``hasUrl``, ``hasTitle``, ``hasContent``,
        ``contentLength``, ``crawledUrl``). Errors are recorded in
        ``result["crawlValidationError"]``; nothing is raised.

        Args:
            modelName: Model name (not used by the checks).
            result: Result dict to inspect and annotate.
        """
        try:
            content = result.get("fullResponse", "")

            # Try to parse as JSON; anything unparsable is treated as "no data"
            crawledData = {}
            try:
                parsed = json.loads(content)
            except (ValueError, TypeError):
                parsed = None
            if isinstance(parsed, dict):
                crawledData = parsed

            # Check for expected fields: url, title, content
            hasUrl = bool(crawledData.get("url"))
            hasTitle = bool(crawledData.get("title"))
            hasContent = bool(crawledData.get("content"))
            # "content" may be present but null, so fall back to "" before len()
            contentLength = len(crawledData.get("content") or "")

            result["hasUrl"] = hasUrl
            result["hasTitle"] = hasTitle
            result["hasContent"] = hasContent
            result["contentLength"] = contentLength
            result["crawledUrl"] = crawledData.get("url", "")

            if hasUrl and hasContent:
                print(f"✅ Successfully crawled content from URL: {crawledData.get('url', 'unknown')}")
                print(f"   Content length: {contentLength} characters")
                print(f"   Title: {crawledData.get('title', 'N/A')}")
            else:
                print(f"⚠️ Incomplete crawl response - URL: {hasUrl}, Content: {hasContent}")

        except Exception as e:
            print(f"❌ Error validating crawl response: {str(e)}")
            result["crawlValidationError"] = str(e)

    def _validateImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the IMAGE_GENERATE response contains a valid base64 image.

        Sets ``result["hasContent"]`` and prints the image size and base64
        validity already stored in ``result``. Errors are recorded in
        ``result["validationError"]``; nothing is raised.

        Args:
            modelName: Model name (not used by the checks).
            result: Result dict to inspect and annotate.
        """
        try:
            content = result.get("fullResponse", "")

            # Content counts only when it is more than whitespace
            hasContent = bool(content and len(content.strip()) > 0)
            result["hasContent"] = hasContent

            if not hasContent:
                print("⚠️ Empty or invalid image generation response")
                return

            isBase64 = result.get("isValidBase64", False)
            imageSize = result.get("imageByteSize", 0)
            imageSizeKB = imageSize / 1024 if imageSize > 0 else 0

            print("✅ Successfully generated image")
            print(f"   Image size: {imageSizeKB:.2f} KB ({imageSize} bytes)")
            print(f"   Valid base64: {'Yes' if isBase64 else 'No'}")

        except Exception as e:
            print(f"❌ Error validating image response: {str(e)}")
            result["validationError"] = str(e)

    @staticmethod
    def _formatCrawlPages(pages: List[Any]) -> Tuple[str, int]:
        """Render crawled pages as text and total up their content length.

        Entries that are not dicts are skipped.

        Args:
            pages: Page entries as returned by the crawl call.

        Returns:
            Tuple of (formatted text, summed content length in characters).
        """
        parts = []
        totalLength = 0
        for page in pages:
            if not isinstance(page, dict):
                continue
            url = page.get("url", "")
            title = page.get("title", "")
            # "raw_content" may be present but null; fall back to "content", then ""
            content = page.get("raw_content") or page.get("content") or ""
            parts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
            totalLength += len(content)
        return "\n".join(parts), totalLength

    async def _testTavilyDirect(
        self,
        modelName: str,
        crawlDepth: int = 3,
        crawlWidth: int = 50,
        url: str = "https://www.valueon.ch",
        instructions: str = "Who works in this company?",
    ) -> Dict[str, Any]:
        """Test Tavily API directly using the crawl() method with better link following.

        Args:
            modelName: Name recorded in the result and used for output files.
            crawlDepth: Passed to Tavily as ``max_depth``.
            crawlWidth: Passed to Tavily as both ``limit`` and ``max_breadth``.
            url: Start URL of the crawl.
            instructions: Natural-language instructions for the crawl.

        Returns:
            The result dict (``status`` is ``SUCCESS`` or ``EXCEPTION``); it is
            also appended to ``self.testResults``.
        """
        print(f"\n{'='*60}")
        print("TESTING TAVILY DIRECT API (crawl method)")
        print(f"{'='*60}")

        loop = asyncio.get_running_loop()
        startTime = loop.time()

        try:
            from tavily import AsyncTavilyClient
            from modules.shared.configuration import APP_CONFIG

            apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
            if not apiKey:
                raise Exception("Tavily API key not found")

            client = AsyncTavilyClient(api_key=apiKey)

            # Map our configuration to Tavily parameters
            # maxWidth -> limit (pages per level)
            # maxDepth -> max_depth (link following depth)
            # max_breadth = maxWidth (breadth of crawl at each level)
            tavilyLimit = crawlWidth
            tavilyMaxDepth = crawlDepth
            tavilyMaxBreadth = crawlWidth

            print("Calling Tavily API with crawl() method...")
            print(f"URL: {url}")
            print(f"Instructions: {instructions}")
            print(f"Limit: {tavilyLimit} pages per level")
            print(f"Max depth: {tavilyMaxDepth} (follows links {tavilyMaxDepth} levels deep)")
            print(f"Max breadth: {tavilyMaxBreadth} (up to {tavilyMaxBreadth} pages at each level)")
            print("Deep and Broad Crawl Configuration Active")

            response = await client.crawl(
                url=url,
                instructions=instructions,
                limit=tavilyLimit,
                max_depth=tavilyMaxDepth,
                max_breadth=tavilyMaxBreadth
            )

            processingTime = loop.time() - startTime

            # Analyze response: either {"results": [...]}, a bare list of pages,
            # or something else that is stored verbatim.
            pagesCrawled = 0
            if isinstance(response, dict) and "results" in response:
                pages = response["results"]
                pagesCrawled = len(pages)
                fullContent, contentLength = self._formatCrawlPages(pages)
            elif isinstance(response, dict):
                fullContent = json.dumps(response, indent=2)
                contentLength = len(fullContent)
            elif isinstance(response, list):
                pagesCrawled = len(response)
                fullContent, contentLength = self._formatCrawlPages(response)
            else:
                fullContent = str(response)
                contentLength = len(fullContent)

            result = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": round(processingTime, 2),
                "responseLength": contentLength,
                "responseType": "TavilyDirectAPI",
                "hasContent": True,
                "error": None,
                "modelUsed": modelName,
                "priceUsd": 0.0,
                "bytesSent": 0,
                "bytesReceived": contentLength,
                "isValidJson": True,
                "fullResponse": fullContent,
                "pagesCrawled": pagesCrawled,
                "testMethod": "direct_api_crawl"
            }

            print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
            print(f"📄 Pages crawled: {pagesCrawled}")
            print(f"📄 Total content length: {contentLength} characters")

            # Save the response
            self._saveTextResponse(modelName, result)
            # NOTE(review): fullResponse is formatted text here, not JSON, so this
            # validation looks like it always reports an incomplete crawl - confirm.
            self._validateCrawlResponse(modelName, result)
            self._saveIndividualModelResult(modelName, result)

            self.testResults.append(result)
            return result

        except Exception as e:
            processingTime = loop.time() - startTime

            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e)
            }

            print(f"💥 EXCEPTION - {str(e)}")
            self.testResults.append(result)
            return result

    def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
        """Save individual model test result to file.

        Writes ``<modelName>_<timestamp>.json`` into ``self.modelTestDir``.
        Failures are printed; nothing is raised.

        Args:
            modelName: Model name, used as the file name prefix.
            result: Result dict to serialize.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{modelName}_{timestamp}.json"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare individual result data
            individualData = {
                "modelName": modelName,
                "testTimestamp": timestamp,
                "testDate": datetime.now().isoformat(),
                "result": result
            }

            # Save to JSON file
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(individualData, f, indent=2, ensure_ascii=False)

            print(f"📄 Individual result saved: {filename}")

        except Exception as e:
            print(f"❌ Error saving individual result: {str(e)}")

    def getAllAvailableModels(self) -> List[str]:
        """Get all available model names that support IMAGE_GENERATE.

        Returns:
            Names of the registry models whose ``operationTypes`` include
            ``OperationTypeEnum.IMAGE_GENERATE``.
        """
        from modules.aicore.aicoreModelRegistry import modelRegistry

        # Get all models from registry and keep those that support IMAGE_GENERATE
        imageGenerateModels = [
            model.name
            for model in modelRegistry.getAvailableModels()
            if model.operationTypes and any(
                ot.operationType == OperationTypeEnum.IMAGE_GENERATE
                for ot in model.operationTypes
            )
        ]

        # Filter to common models for testing (remove filter to test all models)
        # imageGenerateModels = [m for m in imageGenerateModels if "dall-e" in m.lower()]

        print(f"Found {len(imageGenerateModels)} models that support IMAGE_GENERATE:")
        for modelName in imageGenerateModels:
            print(f"  - {modelName}")

        return imageGenerateModels

    def _countResultsWithStatus(self, status: str) -> int:
        """Return how many collected results have the given ``status``."""
        return sum(1 for r in self.testResults if r.get("status") == status)

    def saveTestResults(self):
        """Save detailed test results to file.

        Returns:
            Path of the written ``modeltest_results_<timestamp>.json`` file.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")

        totalModels = len(self.testResults)
        successfulModels = self._countResultsWithStatus("SUCCESS")

        # Prepare results for saving
        saveData = {
            "testTimestamp": timestamp,
            "testDate": datetime.now().isoformat(),
            "totalModels": totalModels,
            "successfulModels": successfulModels,
            "errorModels": self._countResultsWithStatus("ERROR"),
            "exceptionModels": self._countResultsWithStatus("EXCEPTION"),
            "results": self.testResults
        }

        # Calculate success rate
        if totalModels > 0:
            saveData["successRate"] = (successfulModels / totalModels) * 100
        else:
            saveData["successRate"] = 0

        # Save to JSON file
        with open(resultsFile, 'w', encoding='utf-8') as f:
            json.dump(saveData, f, indent=2, ensure_ascii=False)

        print(f"📄 Detailed results saved: {resultsFile}")
        return resultsFile

    def printTestSummary(self):
        """Print a summary of all test results."""
        print(f"\n{'='*80}")
        print("AI MODELS TEST SUMMARY")
        print(f"{'='*80}")

        totalModels = len(self.testResults)
        successfulModels = self._countResultsWithStatus("SUCCESS")
        errorModels = self._countResultsWithStatus("ERROR")
        exceptionModels = self._countResultsWithStatus("EXCEPTION")
        successRate = (successfulModels / totalModels * 100) if totalModels > 0 else 0.0

        print(f"📊 Total models tested: {totalModels}")
        print(f"✅ Successful: {successfulModels}")
        print(f"❌ Errors: {errorModels}")
        print(f"💥 Exceptions: {exceptionModels}")
        # Always print the label, also when no model was tested
        print(f"📈 Success rate: {successRate:.1f}%")

        print(f"\n{'='*80}")
        print("DETAILED RESULTS")
        print(f"{'='*80}")

        statusIcons = {
            "SUCCESS": "✅",
            "ERROR": "❌",
            "EXCEPTION": "💥"
        }

        for result in self.testResults:
            statusIcon = statusIcons.get(result["status"], "❓")

            print(f"\n{statusIcon} {result['modelName']}")
            print(f"   Status: {result['status']}")
            print(f"   Processing time: {result['processingTime']}s")
            print(f"   Response length: {result['responseLength']} characters")
            print(f"   Response type: {result['responseType']}")

            if result.get("isValidJson") is not None:
                print(f"   Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")

            if result.get("crawledUrl"):
                print(f"   Crawled URL: {result['crawledUrl']}")

            if result.get("contentLength") is not None:
                print(f"   Content length: {result['contentLength']} characters")

            if result.get("pagesCrawled") is not None:
                print(f"   Pages crawled: {result['pagesCrawled']}")

            if result.get("error"):
                print(f"   Error: {result['error']}")

            if result.get("responsePreview"):
                print(f"   Preview: {result['responsePreview']}")

        # Find fastest and slowest models
        if successfulModels > 0:
            successfulResults = [r for r in self.testResults if r.get("status") == "SUCCESS"]
            fastest = min(successfulResults, key=lambda x: x["processingTime"])
            slowest = max(successfulResults, key=lambda x: x["processingTime"])

            print(f"\n{'='*80}")
            print("PERFORMANCE HIGHLIGHTS")
            print(f"{'='*80}")
            print(f"🚀 Fastest model: {fastest['modelName']} ({fastest['processingTime']}s)")
            print(f"🐌 Slowest model: {slowest['modelName']} ({slowest['processingTime']}s)")

            # Find models with most content
            modelsWithContent = [r for r in successfulResults if r.get("contentLength", 0) > 0]
            if modelsWithContent:
                mostContent = max(modelsWithContent, key=lambda x: x.get("contentLength", 0))
                totalContent = sum(r.get("contentLength", 0) for r in modelsWithContent)
                avgContent = totalContent / len(modelsWithContent)

                print(f"📄 Model with most content: {mostContent['modelName']} ({mostContent.get('contentLength', 0)} chars)")
                print(f"📊 Average content per model: {avgContent:.0f} characters")
                print(f"📊 Total content crawled across all models: {totalContent} characters")

            # Find models with most pages crawled (for Tavily direct API)
            modelsWithPages = [r for r in successfulResults if r.get("pagesCrawled", 0) > 0]
            if modelsWithPages:
                mostPages = max(modelsWithPages, key=lambda x: x.get("pagesCrawled", 0))
                totalPages = sum(r.get("pagesCrawled", 0) for r in modelsWithPages)
                avgPages = totalPages / len(modelsWithPages)

                print(f"🔍 Model with most pages crawled: {mostPages['modelName']} ({mostPages.get('pagesCrawled', 0)} pages)")
                print(f"📊 Average pages per model: {avgPages:.1f} pages")
                print(f"📊 Total pages crawled across all models: {totalPages} pages")


async def main():
    """Run AI models testing for IMAGE_GENERATE operation."""
    tester = AIModelsTester()

    print("Starting AI Models Testing for IMAGE_GENERATE...")
    print("Initializing AI service...")
    await tester.initialize()

    # Get all available models
    models = tester.getAllAvailableModels()
    print(f"\nFound {len(models)} models to test:")
    for i, model in enumerate(models, 1):
        print(f"  {i}. {model}")

    print(f"\n{'='*80}")
    print("STARTING IMAGE_GENERATE TESTS")
    print(f"{'='*80}")
    print("Testing each model's ability to generate images from text prompts...")
    print("Press Enter after each model test to continue to the next one...")

    # Test each model individually
    for i, modelName in enumerate(models, 1):
        print(f"\n[{i}/{len(models)}] Testing model: {modelName}")

        # Test the model
        await tester.testModel(modelName)

        # Pause for user input (except for the last model)
        if i < len(models):
            input("\nPress Enter to continue to the next model...")

    # Save detailed results to file
    resultsFile = tester.saveTestResults()

    # Print final summary
    tester.printTestSummary()

    print(f"\n{'='*80}")
    print("TESTING COMPLETED")
    print(f"{'='*80}")
    print(f"📄 Results saved to: {resultsFile}")
    print(f"📁 Test results saved to: {tester.modelTestDir}")


if __name__ == "__main__":
    asyncio.run(main())