#!/usr/bin/env python3
"""
AI Models Test - Tests WEB_CRAWL functionality on all models that support it.

This script tests models that have the WEB_CRAWL capability, validates that
they can crawl a specific URL and return content, and records the results
(one text log and one JSON file per model, plus an aggregated JSON report).
"""

import asyncio
import base64
import json
import logging
import os
import re
import shutil
import sys
import uuid
from collections import Counter
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple

# Add the gateway to path (must happen before the project imports below)
sys.path.append(os.path.dirname(__file__))

# Import the service initialization
from modules.features.chatPlayground.mainChatPlayground import getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User


class AIModelsTester:
    """Runs a WEB_CRAWL request against registered models and logs the outcome.

    Every test produces a result dict that is appended to ``self.testResults``
    and written to ``self.modelTestDir``.
    """

    # Target of every crawl test; shared by the standard and the Tavily path
    # so that both always exercise the same page with the same question.
    CRAWL_URL = "https://www.valueon.ch"
    CRAWL_INSTRUCTION = "Who works in this company?"

    # CRAWL CONFIGURATION
    # - depth: how many link levels are followed from the starting page
    #     Level 1: starting page
    #     Level 2: pages linked from the starting page
    #     Level 3: pages linked from level-2 pages
    # - width: how many pages are crawled per depth level
    #
    # Common configurations:
    # - Fast/Overview:    depth=1, width=5  (shallow, focused)
    # - General/Standard: depth=2, width=10 (balanced)
    # - Deep and Broad:   depth=3, width=50 (comprehensive)
    CRAWL_DEPTH = 3   # Deep crawl: follows links 3 levels deep
    CRAWL_WIDTH = 50  # Broad crawl: up to 50 pages per level

    # Responses longer than this are shortened in console previews.
    PREVIEW_LENGTH = 200

    def __init__(self):
        """Create the service context and the output directories."""
        # Create a minimal user context for testing
        testUser = User(
            id="test_user",
            username="test_user",
            email="test@example.com",
            fullName="Test User",
            language="en",
            mandateId="test_mandate",
        )

        # Initialize services using the existing system
        self.services = getServices(testUser, None)  # Test user, no workflow
        self.testResults = []

        # Create logs directory if it doesn't exist
        self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)

        # Create modeltest subdirectory
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)

        # Copy test image to modeltest directory if it exists
        testImageName = "_testdata_photo_2025-06-03_13-05-52.jpg"
        testImageSource = os.path.join(self.logsDir, testImageName)
        testImageDest = os.path.join(self.modelTestDir, testImageName)
        if os.path.exists(testImageSource) and not os.path.exists(testImageDest):
            shutil.copy2(testImageSource, testImageDest)
            print(f"📷 Test image copied to: {testImageDest}")

    async def initialize(self):
        """Initialize the AI service, the web connectors and a dummy workflow."""
        # Set logging level to DEBUG for detailed output
        logging.getLogger().setLevel(logging.DEBUG)

        # Initialize the model registry with all connectors.
        # Project imports stay local, as in the original script.
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.aicore.aicorePluginTavily import AiTavily
        from modules.aicore.aicorePluginPerplexity import AiPerplexity

        # Register web connectors that support WEB_CRAWL
        modelRegistry.registerConnector(AiTavily())
        modelRegistry.registerConnector(AiPerplexity())

        # The AI service needs to be recreated with proper initialization
        from modules.services.serviceAi.mainServiceAi import AiService
        self.services.ai = await AiService.create(self.services)

        # Create a minimal workflow context
        from modules.datamodels.datamodelChat import ChatWorkflow
        self.services.currentWorkflow = ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=self.services.utils.timestampGetUtc(),
            lastActivity=self.services.utils.timestampGetUtc(),
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId="test_mandate",
            messageIds=[],
            workflowMode="React",
            maxSteps=5,
        )

        print("✅ AI Service initialized successfully")
        print(f"📁 Results will be saved to: {self.modelTestDir}")

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safeFileStem(modelName: str) -> str:
        """Return ``modelName`` with characters unsafe in file names replaced by ``_``."""
        return re.sub(r"[^A-Za-z0-9._-]+", "_", str(modelName))

    @staticmethod
    def _isJson(text: Any) -> bool:
        """Return True when ``text`` can be parsed by ``json.loads``."""
        try:
            json.loads(text)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError covers non-string input.
            return False
        return True

    def _preview(self, text: str) -> str:
        """Return ``text`` shortened to ``PREVIEW_LENGTH`` characters plus ``...``."""
        if len(text) > self.PREVIEW_LENGTH:
            return text[:self.PREVIEW_LENGTH] + "..."
        return text

    def _buildCrawlPrompt(self, crawlDepth: int, crawlWidth: int) -> str:
        """Return the structured JSON prompt describing the crawl request."""
        return json.dumps({
            "instruction": self.CRAWL_INSTRUCTION,
            "url": self.CRAWL_URL,
            "maxDepth": crawlDepth,
            "maxWidth": crawlWidth
        }, indent=2)

    @staticmethod
    def _formatCrawledPages(pages: Iterable[Any]) -> Tuple[str, int]:
        """Render crawled page dicts as text.

        Returns:
            ``(text, contentLength)`` where ``contentLength`` is the summed
            length of the page contents only (not of the rendered text).
            Items that are not dicts are skipped.
        """
        parts = []
        contentLength = 0
        for page in pages:
            if not isinstance(page, dict):
                continue
            url = page.get("url", "")
            title = page.get("title", "")
            # `or` instead of a .get() default: the key may be present with a
            # None value, which would otherwise crash len() below.
            content = page.get("raw_content") or page.get("content") or ""
            if not isinstance(content, str):
                content = str(content)
            parts.append(f"URL: {url}\nTitle: {title}\nContent: {content}\n{'='*60}\n")
            contentLength += len(content)
        return "\n".join(parts), contentLength

    # ------------------------------------------------------------------
    # Test execution
    # ------------------------------------------------------------------

    async def testModel(self, modelName: str) -> Dict[str, Any]:
        """Test a specific AI model with WEB_CRAWL operation.

        Models whose name contains "tavily" are delegated to
        ``_testTavilyDirect``. All other models are called through their
        ``functionCall`` with a structured JSON prompt.

        Returns:
            The result dict (also appended to ``self.testResults`` and saved
            to disk). Failures are reported through the ``status`` field
            ("ERROR" / "EXCEPTION"); this method does not raise for them.
        """
        print(f"\n{'='*60}")
        print(f"TESTING MODEL: {modelName}")
        print(f"OPERATION TYPE: WEB_CRAWL")
        print(f"{'='*60}")

        CRAWL_DEPTH = self.CRAWL_DEPTH
        CRAWL_WIDTH = self.CRAWL_WIDTH

        print(f"Crawl Configuration:")
        print(f" - Depth: {CRAWL_DEPTH} levels (deep)")
        print(f" - Width: {CRAWL_WIDTH} pages per level (broad)")
        # NOTE(review): this prints width ** depth, which does not match the
        # "~1,250 pages" figure the original comment quoted - confirm which
        # formula is intended.
        print(f" - Theoretical max: {CRAWL_WIDTH ** min(CRAWL_DEPTH, 3)} pages")

        # Test with simple prompt like playground example
        simplePrompt = f"{self.CRAWL_URL}: {self.CRAWL_INSTRUCTION}"

        # But keep structured format for now to match our API
        testPrompt = self._buildCrawlPrompt(CRAWL_DEPTH, CRAWL_WIDTH)

        print(f"Simple prompt (playground style): {simplePrompt}")

        # For Tavily models, test direct API call for better link following
        if "tavily" in modelName.lower():
            return await self._testTavilyDirect(modelName, CRAWL_DEPTH, CRAWL_WIDTH)

        print(f"Test prompt: {testPrompt}")
        print(f"Prompt length: {len(testPrompt)} characters")

        loop = asyncio.get_running_loop()
        startTime = loop.time()

        try:
            # Create options for WEB_CRAWL operation
            options = AiCallOptions(
                operationType=OperationTypeEnum.WEB_CRAWL,
                preferredModel=modelName
            )

            # Call the model DIRECTLY through its functionCall. This tests the
            # actual model, not the document generation pipeline.
            from modules.aicore.aicoreModelRegistry import modelRegistry
            model = modelRegistry.getModel(modelName)
            if not model:
                raise RuntimeError(f"Model {modelName} not found")

            from modules.datamodels.datamodelAi import AiModelCall

            # For WEB_CRAWL models, use normal functionCall with structured prompt
            messages = [{"role": "user", "content": testPrompt}]
            modelCall = AiModelCall(
                messages=messages,
                model=model,
                options=options
            )
            response = await model.functionCall(modelCall)

            processingTime = loop.time() - startTime
            result = self._analyzeResponse(modelName, response, processingTime)

            # Add prompt to result for logging
            result["testPrompt"] = testPrompt
            result["crawlConfig"] = {"depth": CRAWL_DEPTH, "width": CRAWL_WIDTH}

            # For WEB_CRAWL, also validate that content was extracted
            if result.get("status") == "SUCCESS" and result.get("fullResponse"):
                self._validateCrawlResponse(modelName, result)

        except Exception as e:
            # Harness boundary: any failure becomes an EXCEPTION result so the
            # remaining models can still be tested.
            processingTime = loop.time() - startTime
            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e),
                "testPrompt": testPrompt,
                "crawlConfig": {"depth": CRAWL_DEPTH, "width": CRAWL_WIDTH}
            }
            print(f"💥 EXCEPTION - {str(e)}")

        self.testResults.append(result)

        # Save text response even for exceptions to log the prompt
        if result.get("status") in ["SUCCESS", "EXCEPTION", "ERROR"]:
            self._saveTextResponse(modelName, result)

        # Save individual model result immediately
        self._saveIndividualModelResult(modelName, result)

        return result

    def _analyzeResponse(self, modelName: str, response: Any, processingTime: float) -> Dict[str, Any]:
        """Turn a raw model response into a result dict and print a short report.

        Three response shapes are handled: an object with a ``success``
        attribute (treated as AiModelResponse), a plain dict, and anything
        else (stringified).
        """
        elapsed = round(processingTime, 2)

        if hasattr(response, 'success'):
            return self._analyzeModelResponse(modelName, response, processingTime)

        if isinstance(response, dict):
            # Fallback for dict responses
            if not response.get("success", True):
                error = response.get("error", "Unknown error")
                print(f"❌ ERROR - {error}")
                return {
                    "modelName": modelName,
                    "status": "ERROR",
                    "processingTime": elapsed,
                    "responseLength": 0,
                    "responseType": "error",
                    "hasContent": False,
                    "error": error,
                    "fullResponse": str(response)
                }

            result = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": elapsed,
                "responseLength": len(str(response)),
                "responseType": "dict",
                "hasContent": True,
                "error": None
            }
            try:
                text = json.dumps(response, indent=2)
                result["isValidJson"] = True
            except (TypeError, ValueError):
                # Dict holds values json cannot encode; fall back to repr text.
                text = str(response)
                result["isValidJson"] = False
            result["responsePreview"] = self._preview(text)
            result["fullResponse"] = text
        else:
            # String response
            text = str(response)
            result = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": elapsed,
                "responseLength": len(text),
                "responseType": "string",
                "hasContent": True,
                "error": None,
                "isValidJson": self._isJson(text),
                "responsePreview": self._preview(text),
                "fullResponse": text
            }

        print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
        print(f"📄 Response length: {len(str(response))} characters")
        print(f"📄 Response preview: {result['responsePreview']}")
        return result

    def _analyzeModelResponse(self, modelName: str, response: Any, processingTime: float) -> Dict[str, Any]:
        """Build the result dict for a response object exposing ``success``/``content``/``error``."""
        elapsed = round(processingTime, 2)

        if not response.success:
            error = response.error or "Unknown error"
            print(f"❌ ERROR - {error}")
            return {
                "modelName": modelName,
                "status": "ERROR",
                "processingTime": elapsed,
                "responseLength": 0,
                "responseType": "AiModelResponse",
                "hasContent": False,
                "error": error,
                "fullResponse": str(response)
            }

        content = response.content
        result = {
            "modelName": modelName,
            "status": "SUCCESS",
            "processingTime": elapsed,
            "responseLength": len(content) if content else 0,
            "responseType": "AiModelResponse",
            "hasContent": bool(content),
            "error": None,
            "modelUsed": modelName,
            "priceUsd": 0.0,  # AiModelResponse doesn't have price info
            "bytesSent": 0,
            "bytesReceived": len(content.encode('utf-8')) if content else 0
        }

        # Extract actual prompt sent if available in metadata
        if hasattr(response, 'metadata') and response.metadata:
            result["actualPromptSent"] = response.metadata.get("actualPromptSent", "N/A")

        if content:
            result["isValidJson"] = self._isJson(content)
            result["responsePreview"] = self._preview(content)
            result["fullResponse"] = content
        else:
            result["isValidJson"] = False
            result["responsePreview"] = "Empty response"
            result["fullResponse"] = ""

        print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
        print(f"📄 Response length: {len(content) if content else 0} characters")
        print(f"📄 Model used: {modelName}")
        print(f"📄 Response preview: {result['responsePreview']}")
        return result

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _saveImageResponse(self, modelName: str, result: Dict[str, Any]):
        """Save base64 image response to file.

        Looks for base64 data in ``result["fullResponse"]`` (dict keys
        ``content``/``data``/``image``, a ``data:image/...`` URI, or a long
        base64-looking run in the text) and writes it as a ``.png`` file.
        Errors are printed and stored in ``result["imageSaveError"]``.
        """
        try:
            fullResponse = result.get("fullResponse", "")
            base64Data = None

            # Try to extract base64 data from response
            if isinstance(fullResponse, dict):
                for key in ("content", "data", "image"):
                    if key in fullResponse:
                        base64Data = fullResponse[key]
                        break
            else:
                text = str(fullResponse)
                # Prefer an explicit data URI; otherwise accept any long base64 run
                base64Match = (
                    re.search(r'data:image/[^;]+;base64,([A-Za-z0-9+/=]+)', text)
                    or re.search(r'([A-Za-z0-9+/=]{100,})', text)
                )
                if base64Match:
                    base64Data = base64Match.group(1)

            if base64Data:
                # Clean base64 data (strip a leading data-URI header)
                if base64Data.startswith('data:image/'):
                    base64Data = base64Data.split(',', 1)[1]

                # Decode and save image
                imageData = base64.b64decode(base64Data)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"{self._safeFileStem(modelName)}_{timestamp}.png"
                filepath = os.path.join(self.modelTestDir, filename)

                with open(filepath, 'wb') as f:
                    f.write(imageData)

                result["savedImage"] = filepath
                print(f"🖼️ Image saved: {filepath}")
            else:
                print(f"⚠️ No base64 image data found in response")

        except Exception as e:
            # Best effort: a failed save must not abort the test run.
            print(f"❌ Error saving image: {str(e)}")
            result["imageSaveError"] = str(e)

    def _saveTextResponse(self, modelName: str, result: Dict[str, Any]):
        """Save text response to file.

        Writes a metadata header (status, timing, crawl configuration,
        prompts) followed by the response content. Errors are printed and
        stored in ``result["textSaveError"]``.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{self._safeFileStem(modelName)}_{timestamp}.txt"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare content for saving
            content = result.get("fullResponse", "")
            if not content:
                content = result.get("responsePreview", "No content available")

            # If there's an error, include it in the content
            if result.get("error"):
                content = f"ERROR: {result.get('error')}\n\n{content}"

            # Get prompt and config for logging
            config = result.get("crawlConfig", {})
            crawlDepth = config.get("depth", "N/A")
            crawlWidth = config.get("width", "N/A")

            # Get both the original JSON prompt and the actual prompt sent
            originalPrompt = result.get("testPrompt", "N/A")
            actualPromptSent = result.get("actualPromptSent", "N/A")

            # Add metadata header
            metadata = "\n".join([
                f"Model: {modelName}",
                f"Test Time: {timestamp}",
                f"Status: {result.get('status', 'Unknown')}",
                f"Processing Time: {result.get('processingTime', 0):.2f}s",
                f"Response Length: {result.get('responseLength', 0)} characters",
                f"Is Valid JSON: {result.get('isValidJson', False)}",
                f"Test Method: {result.get('testMethod', 'standard')}",
                f"Pages Crawled: {result.get('pagesCrawled', 'N/A')}",
                f"Crawled URL: {result.get('crawledUrl', 'N/A')}",
                f"Has URL: {result.get('hasUrl', 'N/A')}",
                f"Has Title: {result.get('hasTitle', 'N/A')}",
                f"Has Content: {result.get('hasContent', 'N/A')}",
                f"Content Length: {result.get('contentLength', 'N/A')} characters",
                "",
                "--- CRAWL CONFIGURATION ---",
                f"Depth: {crawlDepth}",
                f"Width: {crawlWidth}",
                "",
                "--- ORIGINAL JSON PROMPT (input) ---",
                f"{originalPrompt}",
                "",
                "--- ACTUAL PROMPT SENT TO API (EXACT) ---",
                f"{actualPromptSent}",
                "",
                "--- RESPONSE CONTENT ---",
                f"{content}",
                "",
            ])

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(metadata)

            result["savedTextFile"] = filepath
            print(f"📄 Text response saved: {filepath}")

        except Exception as e:
            # Best effort: a failed save must not abort the test run.
            print(f"❌ Error saving text response: {str(e)}")
            result["textSaveError"] = str(e)

    def _validateCrawlResponse(self, modelName: str, result: Dict[str, Any]):
        """Validate that the WEB_CRAWL response contains crawled content.

        Parses ``result["fullResponse"]`` as JSON and, when it is an object,
        checks for the ``url``, ``title`` and ``content`` fields. The findings
        are written back into ``result`` (``hasUrl``, ``hasTitle``,
        ``hasContent``, ``contentLength``, ``crawledUrl``); note that this
        overwrites any earlier ``hasContent`` value.
        """
        try:
            content = result.get("fullResponse", "")

            # Try to parse as JSON; anything that is not a JSON object is
            # treated as "no structured crawl data".
            crawledData = {}
            try:
                parsed = json.loads(content)
            except (ValueError, TypeError):
                parsed = None
            if isinstance(parsed, dict):
                crawledData = parsed

            # Check for expected fields: url, title, content
            pageContent = crawledData.get("content") or ""  # tolerate an explicit null
            hasUrl = bool(crawledData.get("url"))
            hasTitle = bool(crawledData.get("title"))
            hasContent = bool(pageContent)
            contentLength = len(pageContent)

            result["hasUrl"] = hasUrl
            result["hasTitle"] = hasTitle
            result["hasContent"] = hasContent
            result["contentLength"] = contentLength
            result["crawledUrl"] = crawledData.get("url", "")

            if hasUrl and hasContent:
                print(f"✅ Successfully crawled content from URL: {crawledData.get('url', 'unknown')}")
                print(f" Content length: {contentLength} characters")
                print(f" Title: {crawledData.get('title', 'N/A')}")
            else:
                print(f"⚠️ Incomplete crawl response - URL: {hasUrl}, Content: {hasContent}")

        except Exception as e:
            print(f"❌ Error validating crawl response: {str(e)}")
            result["crawlValidationError"] = str(e)

    async def _testTavilyDirect(self, modelName: str, crawlDepth: int = 3, crawlWidth: int = 50) -> Dict[str, Any]:
        """Test Tavily API directly using the crawl() method with better link following.

        Args:
            modelName: Name under which the result is recorded.
            crawlDepth: Passed to Tavily as ``max_depth``.
            crawlWidth: Passed to Tavily as both ``limit`` and ``max_breadth``.

        Returns:
            The result dict (also appended to ``self.testResults`` and saved
            to disk). Failures yield a result with status "EXCEPTION".
        """
        print(f"\n{'='*60}")
        print(f"TESTING TAVILY DIRECT API (crawl method)")
        print(f"{'='*60}")

        testPrompt = self._buildCrawlPrompt(crawlDepth, crawlWidth)
        crawlConfig = {"depth": crawlDepth, "width": crawlWidth}

        loop = asyncio.get_running_loop()
        startTime = loop.time()

        try:
            from tavily import AsyncTavilyClient
            from modules.shared.configuration import APP_CONFIG

            apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
            if not apiKey:
                raise RuntimeError("Tavily API key not found")

            client = AsyncTavilyClient(api_key=apiKey)

            # Map our configuration to Tavily parameters
            #   maxWidth -> limit
            #   maxDepth -> max_depth (link following depth)
            #   maxWidth -> max_breadth (breadth of crawl at each level)
            # NOTE(review): the messages below describe `limit` as "pages per
            # level"; verify against the Tavily crawl documentation whether
            # `limit` is per level or a total cap.
            tavilyLimit = crawlWidth
            tavilyMaxDepth = crawlDepth
            tavilyMaxBreadth = crawlWidth

            print(f"Calling Tavily API with crawl() method...")
            print(f"URL: {self.CRAWL_URL}")
            print(f"Instructions: {self.CRAWL_INSTRUCTION}")
            print(f"Limit: {tavilyLimit} pages per level")
            print(f"Max depth: {tavilyMaxDepth} (follows links {tavilyMaxDepth} levels deep)")
            print(f"Max breadth: {tavilyMaxBreadth} (up to {tavilyMaxBreadth} pages at each level)")
            print(f"Deep and Broad Crawl Configuration Active")

            response = await client.crawl(
                url=self.CRAWL_URL,
                instructions=self.CRAWL_INSTRUCTION,
                limit=tavilyLimit,
                max_depth=tavilyMaxDepth,
                max_breadth=tavilyMaxBreadth
            )

            processingTime = loop.time() - startTime

            # Analyze response: locate the list of crawled pages, if any
            pages = None
            if isinstance(response, dict) and "results" in response:
                pages = response["results"]
            elif isinstance(response, list):
                pages = response

            if pages is not None:
                pagesCrawled = len(pages)
                fullContent, contentLength = self._formatCrawledPages(pages)
            else:
                pagesCrawled = 0
                if isinstance(response, dict):
                    fullContent = json.dumps(response, indent=2)
                else:
                    fullContent = str(response)
                contentLength = len(fullContent)

            result = {
                "modelName": modelName,
                "status": "SUCCESS",
                "processingTime": round(processingTime, 2),
                "responseLength": contentLength,
                "responseType": "TavilyDirectAPI",
                "hasContent": contentLength > 0,
                "error": None,
                "modelUsed": modelName,
                "priceUsd": 0.0,
                "bytesSent": 0,
                "bytesReceived": contentLength,
                # Derived from the stored text rather than assumed.
                "isValidJson": self._isJson(fullContent),
                "fullResponse": fullContent,
                "pagesCrawled": pagesCrawled,
                "testMethod": "direct_api_crawl",
                "testPrompt": testPrompt,
                "crawlConfig": crawlConfig
            }

            print(f"✅ SUCCESS - Processing time: {processingTime:.2f}s")
            print(f"📄 Pages crawled: {pagesCrawled}")
            print(f"📄 Total content length: {contentLength} characters")

            if pages is not None:
                # The stored text is a page listing, not JSON, so the JSON
                # based validator would wrongly report "no content". Fill the
                # validation fields from the pages themselves instead.
                pageDicts = [p for p in pages if isinstance(p, dict)]
                firstUrl = next((p.get("url") for p in pageDicts if p.get("url")), "")
                result["hasUrl"] = bool(firstUrl)
                result["hasTitle"] = any(p.get("title") for p in pageDicts)
                result["contentLength"] = contentLength
                result["crawledUrl"] = firstUrl
            elif isinstance(response, dict):
                self._validateCrawlResponse(modelName, result)

        except Exception as e:
            # Harness boundary: report the failure as a result, do not raise.
            processingTime = loop.time() - startTime
            result = {
                "modelName": modelName,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "responseLength": 0,
                "responseType": "exception",
                "hasContent": False,
                "error": str(e),
                "testMethod": "direct_api_crawl",
                "testPrompt": testPrompt,
                "crawlConfig": crawlConfig
            }
            print(f"💥 EXCEPTION - {str(e)}")

        # Save only after all fields are final so the logs are complete
        self._saveTextResponse(modelName, result)
        self._saveIndividualModelResult(modelName, result)
        self.testResults.append(result)
        return result

    def _saveIndividualModelResult(self, modelName: str, result: Dict[str, Any]):
        """Save individual model test result to a JSON file (errors are printed, not raised)."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{self._safeFileStem(modelName)}_{timestamp}.json"
            filepath = os.path.join(self.modelTestDir, filename)

            # Prepare individual result data
            individualData = {
                "modelName": modelName,
                "testTimestamp": timestamp,
                "testDate": datetime.now().isoformat(),
                "result": result
            }

            # default=str: results may carry values taken verbatim from model
            # responses; stringify them rather than lose the whole log.
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(individualData, f, indent=2, ensure_ascii=False, default=str)

            print(f"📄 Individual result saved: {filename}")

        except Exception as e:
            print(f"❌ Error saving individual result: {str(e)}")

    # ------------------------------------------------------------------
    # Model discovery and reporting
    # ------------------------------------------------------------------

    def getAllAvailableModels(self, nameFilter: Optional[Iterable[str]] = ("sonar",)) -> List[str]:
        """Get all available model names that support WEB_CRAWL.

        Args:
            nameFilter: Model names to keep. Defaults to only "sonar" (the
                behaviour of the original script). Pass ``None`` to return
                every WEB_CRAWL-capable model.

        Returns:
            The matching model names, in registry order.
        """
        from modules.aicore.aicoreModelRegistry import modelRegistry

        # Get all models from registry and keep those that support WEB_CRAWL
        webCrawlModels = [
            model.name
            for model in modelRegistry.getAvailableModels()
            if model.operationTypes and any(
                ot.operationType == OperationTypeEnum.WEB_CRAWL
                for ot in model.operationTypes
            )
        ]

        if nameFilter is None:
            print(f"Found {len(webCrawlModels)} models that support WEB_CRAWL:")
        else:
            wanted = list(nameFilter)
            webCrawlModels = [m for m in webCrawlModels if m in wanted]
            print(f"Found {len(webCrawlModels)} models that support WEB_CRAWL (filtered to {', '.join(wanted)}):")

        for modelName in webCrawlModels:
            print(f" - {modelName}")

        return webCrawlModels

    def saveTestResults(self):
        """Save detailed test results to file and return the file path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"modeltest_results_{timestamp}.json")

        statusCounts = Counter(r["status"] for r in self.testResults)
        totalModels = len(self.testResults)

        # Prepare results for saving
        saveData = {
            "testTimestamp": timestamp,
            "testDate": datetime.now().isoformat(),
            "totalModels": totalModels,
            "successfulModels": statusCounts["SUCCESS"],
            "errorModels": statusCounts["ERROR"],
            "exceptionModels": statusCounts["EXCEPTION"],
            "results": self.testResults
        }

        # Calculate success rate
        if totalModels > 0:
            saveData["successRate"] = (saveData["successfulModels"] / totalModels) * 100
        else:
            saveData["successRate"] = 0

        # default=str keeps one odd value in a result from aborting the report
        with open(resultsFile, 'w', encoding='utf-8') as f:
            json.dump(saveData, f, indent=2, ensure_ascii=False, default=str)

        print(f"📄 Detailed results saved: {resultsFile}")
        return resultsFile

    def printTestSummary(self):
        """Print a summary of all test results."""
        print(f"\n{'='*80}")
        print("AI MODELS TEST SUMMARY")
        print(f"{'='*80}")

        statusCounts = Counter(r["status"] for r in self.testResults)
        totalModels = len(self.testResults)
        successfulModels = statusCounts["SUCCESS"]
        errorModels = statusCounts["ERROR"]
        exceptionModels = statusCounts["EXCEPTION"]

        print(f"📊 Total models tested: {totalModels}")
        print(f"✅ Successful: {successfulModels}")
        print(f"❌ Errors: {errorModels}")
        print(f"💥 Exceptions: {exceptionModels}")
        # Keep the label even when nothing was tested
        successRate = f"{(successfulModels/totalModels*100):.1f}%" if totalModels > 0 else "0%"
        print(f"📈 Success rate: {successRate}")

        print(f"\n{'='*80}")
        print("DETAILED RESULTS")
        print(f"{'='*80}")

        statusIcons = {"SUCCESS": "✅", "ERROR": "❌", "EXCEPTION": "💥"}
        for result in self.testResults:
            status_icon = statusIcons.get(result["status"], "❓")
            print(f"\n{status_icon} {result['modelName']}")
            print(f" Status: {result['status']}")
            print(f" Processing time: {result['processingTime']}s")
            print(f" Response length: {result['responseLength']} characters")
            print(f" Response type: {result['responseType']}")
            if result.get("isValidJson") is not None:
                print(f" Valid JSON: {'Yes' if result['isValidJson'] else 'No'}")
            if result.get("crawledUrl"):
                print(f" Crawled URL: {result['crawledUrl']}")
            if result.get("contentLength") is not None:
                print(f" Content length: {result['contentLength']} characters")
            if result.get("pagesCrawled") is not None:
                print(f" Pages crawled: {result['pagesCrawled']}")
            if result.get("error"):
                print(f" Error: {result['error']}")
            if result.get("responsePreview"):
                print(f" Preview: {result['responsePreview']}")

        if successfulModels == 0:
            return

        # Find fastest and slowest models
        successfulResults = [r for r in self.testResults if r["status"] == "SUCCESS"]
        fastest = min(successfulResults, key=lambda x: x["processingTime"])
        slowest = max(successfulResults, key=lambda x: x["processingTime"])

        print(f"\n{'='*80}")
        print("PERFORMANCE HIGHLIGHTS")
        print(f"{'='*80}")
        print(f"🚀 Fastest model: {fastest['modelName']} ({fastest['processingTime']}s)")
        print(f"🐌 Slowest model: {slowest['modelName']} ({slowest['processingTime']}s)")

        # Find models with most content
        modelsWithContent = [r for r in successfulResults if r.get("contentLength", 0) > 0]
        if modelsWithContent:
            mostContent = max(modelsWithContent, key=lambda x: x.get("contentLength", 0))
            totalContent = sum(r.get("contentLength", 0) for r in modelsWithContent)
            avgContent = totalContent / len(modelsWithContent)
            print(f"📄 Model with most content: {mostContent['modelName']} ({mostContent.get('contentLength', 0)} chars)")
            print(f"📊 Average content per model: {avgContent:.0f} characters")
            print(f"📊 Total content crawled across all models: {totalContent} characters")

        # Find models with most pages crawled (for Tavily direct API)
        modelsWithPages = [r for r in successfulResults if r.get("pagesCrawled", 0) > 0]
        if modelsWithPages:
            mostPages = max(modelsWithPages, key=lambda x: x.get("pagesCrawled", 0))
            totalPages = sum(r.get("pagesCrawled", 0) for r in modelsWithPages)
            avgPages = totalPages / len(modelsWithPages)
            print(f"🔍 Model with most pages crawled: {mostPages['modelName']} ({mostPages.get('pagesCrawled', 0)} pages)")
            print(f"📊 Average pages per model: {avgPages:.1f} pages")
            print(f"📊 Total pages crawled across all models: {totalPages} pages")


async def main():
    """Run AI models testing for WEB_CRAWL operation."""
    tester = AIModelsTester()

    print("Starting AI Models Testing for WEB_CRAWL...")
    print("Initializing AI service...")
    await tester.initialize()

    # Get all available models
    models = tester.getAllAvailableModels()
    print(f"\nFound {len(models)} models to test:")
    for i, model in enumerate(models, 1):
        print(f" {i}. {model}")

    print(f"\n{'='*80}")
    print("STARTING WEB_CRAWL TESTS")
    print(f"{'='*80}")
    print("Testing each model's ability to crawl URLs and return content...")
    print("Press Enter after each model test to continue to the next one...")

    # Test each model individually
    for i, modelName in enumerate(models, 1):
        print(f"\n[{i}/{len(models)}] Testing model: {modelName}")

        # Test the model
        await tester.testModel(modelName)

        # Pause for user input (except for the last model)
        if i < len(models):
            input(f"\nPress Enter to continue to the next model...")

    # Save detailed results to file
    resultsFile = tester.saveTestResults()

    # Print final summary
    tester.printTestSummary()

    print(f"\n{'='*80}")
    print("TESTING COMPLETED")
    print(f"{'='*80}")
    print(f"📄 Results saved to: {resultsFile}")
    print(f"📁 Images saved to: {tester.modelTestDir}")


if __name__ == "__main__":
    asyncio.run(main())