658 lines
26 KiB
Python
658 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for MethodWeb class.
|
|
Tests all web actions: search, crawl, and scrape with various parameter sets.
|
|
|
|
Features:
|
|
- Tests web search functionality with different queries
|
|
- Tests web crawling with URL lists
|
|
- Tests web scraping (search + crawl combined)
|
|
- Detailed logging and progress tracking
|
|
- Error handling and validation testing
|
|
- Configuration validation
|
|
|
|
Usage:
|
|
- Interactive mode: python test_methodWeb.py
|
|
- Automated mode: python test_methodWeb.py --auto
|
|
- Verbose mode: python test_methodWeb.py --verbose
- Quick mode (fewer queries): python test_methodWeb.py --quick
|
|
"""
|
|
|
|
import os
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime, UTC
|
|
|
|
# Root logging setup: INFO level, "timestamp - level - message" layout.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
|
|
|
|
# Filter out specific unwanted log messages
|
|
class LogFilter(logging.Filter):
    """Drop noisy HTTP client log lines; let everything else through."""

    def filter(self, record):
        """Return False for records that should be hidden, True otherwise.

        Hidden records are:
        - request lines containing both "HTTP Request:" and
          "POST https://api.tavily.com"
        - response lines containing "HTTP/1.1 200 OK"
        """
        # Render the message once and reuse it for every check.
        text = record.getMessage()

        is_tavily_request = (
            "HTTP Request:" in text and "POST https://api.tavily.com" in text
        )
        if is_tavily_request:
            return False

        is_ok_response = "HTTP/1.1 200 OK" in text
        return not is_ok_response
|
|
|
|
# Apply the filter to the root logger and to each of its current handlers.
# A filter attached to a logger is only consulted for records logged directly
# on that logger; records propagated up from descendant loggers (for example
# a third-party HTTP client's logger) bypass it. Attaching the same filter to
# the handlers makes it apply to every record the root handlers would emit.
# Handlers added to the root logger after this point are not covered.
root_logger = logging.getLogger()
_log_filter = LogFilter()
root_logger.addFilter(_log_filter)
for _handler in root_logger.handlers:
    _handler.addFilter(_log_filter)
|
|
|
|
def check_dependencies():
    """Report whether the third-party packages this script needs can be imported.

    Tries to import ``tavily`` and ``httpx`` in that order, logging the outcome
    of each attempt. When at least one import fails, an installation hint is
    logged and False is returned; otherwise True is returned.
    """
    # (importable module name, package name shown to the user)
    required = (
        ("tavily", "tavily-python"),
        ("httpx", "httpx"),
    )

    missing_deps = []
    for module_name, package_name in required:
        try:
            __import__(module_name)
        except ImportError:
            missing_deps.append(package_name)
            logger.error(f"{package_name} is missing")
        else:
            logger.info(f"{package_name} is available")

    if not missing_deps:
        logger.info("All required dependencies are available!")
        return True

    separator = "=" * 60
    report_lines = [
        "\n" + separator,
        "MISSING DEPENDENCIES DETECTED!",
        separator,
        "The following packages are required but not installed:",
    ]
    report_lines.extend(f" - {dep}" for dep in missing_deps)
    report_lines += [
        "\nTo install all dependencies, run:",
        "pip install -r requirements.txt",
        separator,
    ]
    for line in report_lines:
        logger.error(line)
    return False
|
|
|
|
def check_module_imports():
    """Verify that the project modules used by the tests can be imported.

    Appends the directory three levels above this file to ``sys.path`` and then
    imports the MethodWeb, ServiceCenter, interface-model and configuration
    modules. Returns True when every import succeeds, False (after logging the
    reason) when any import or other error occurs.
    """
    try:
        # Make the gateway package root importable from this script's location.
        script_dir = os.path.dirname(__file__)
        gateway_root = os.path.join(script_dir, '..', '..', '..')
        sys.path.append(gateway_root)

        # The imported names are unused on purpose: only importability matters.
        from modules.methods.methodWeb import MethodWeb
        from modules.chat.serviceCenter import ServiceCenter
        from modules.interfaces.interfaceAppModel import (
            User,
            UserConnection,
            UserPrivilege,
            AuthAuthority,
        )
        from modules.interfaces.interfaceChatModel import (
            ChatWorkflow,
            TaskItem,
            TaskStatus,
        )
        from modules.shared.configuration import APP_CONFIG

        logger.info("All required modules imported successfully")
        return True
    except ImportError as import_error:
        logger.error(f"Failed to import required modules: {import_error}")
        logger.error("Make sure you're running this script from the gateway directory")
        return False
    except Exception as unexpected_error:
        logger.error(f"Unexpected error importing modules: {unexpected_error}")
        return False
|
|
|
|
def check_configuration():
    """Verify the Tavily API key is set and report the web-related settings.

    Returns True when a non-placeholder API key is configured. Returns False
    when the key is missing or still the placeholder value, or when reading the
    configuration raises. Optional web settings that are unset only produce a
    warning.
    """
    try:
        from modules.shared.configuration import APP_CONFIG

        # The API key is mandatory; the template placeholder counts as unset.
        api_key = APP_CONFIG.get("Connector_WebTavily_API_KEY")
        key_is_usable = bool(api_key) and api_key != "your_tavily_api_key_here"
        if not key_is_usable:
            logger.error("Tavily API key not configured!")
            logger.error("Please set Connector_WebTavily_API_KEY in config.ini")
            return False

        logger.info("Tavily API key is configured")

        # Optional settings: report each one, warn when it is not set.
        optional_settings = (
            "Web_Search_MAX_QUERY_LENGTH",
            "Web_Search_MAX_RESULTS",
            "Web_Search_MIN_RESULTS",
            "Web_Crawl_TIMEOUT",
            "Web_Crawl_MAX_RETRIES",
            "Web_Crawl_RETRY_DELAY",
        )
        for setting in optional_settings:
            configured = APP_CONFIG.get(setting)
            if not configured:
                logger.warning(f"Configuration {setting} not set, using default")
                continue
            logger.info(f"Configuration {setting}: {configured}")

        return True

    except Exception as config_error:
        logger.error(f"Failed to check configuration: {config_error}")
        return False
|
|
|
|
def create_mock_service_center():
    """Build a ServiceCenter backed by a test user and a test workflow.

    Returns the ServiceCenter instance, or None (after logging the error) when
    any import or construction step raises.
    """
    try:
        from modules.chat.serviceCenter import ServiceCenter
        from modules.interfaces.interfaceAppModel import User, UserPrivilege, AuthAuthority
        from modules.interfaces.interfaceChatModel import ChatWorkflow, TaskItem, TaskStatus
        from modules.interfaces.interfaceChatModel import ChatLog, ChatMessage, ChatStat

        mandate_id = "test_mandate_web_001"

        # Test user with every field the model is given in this script.
        user_fields = {
            "id": "test_user_web_001",
            "username": "testuser_web",
            "email": "testweb@example.com",
            "fullName": "Test Web User",
            "language": "en",
            "enabled": True,
            "privilege": UserPrivilege.USER,
            "authenticationAuthority": AuthAuthority.LOCAL,
            "mandateId": mandate_id,
        }
        test_user = User(**user_fields)

        # Test workflow; start time and last activity share one UTC timestamp.
        now_ts = datetime.now(UTC).timestamp()
        workflow_fields = {
            "id": "test_workflow_web_001",
            "mandateId": mandate_id,
            "status": "active",
            "name": "Test Web Method Workflow",
            "currentRound": 1,
            "lastActivity": now_ts,
            "startedAt": now_ts,
            "logs": [],
            "messages": [],
            "stats": None,
            "tasks": [],
        }
        test_workflow = ChatWorkflow(**workflow_fields)

        center = ServiceCenter(test_user, test_workflow)
        logger.info("ServiceCenter created successfully for web testing")
        return center

    except Exception as creation_error:
        logger.error(f"Failed to create ServiceCenter: {creation_error}")
        return None
|
|
|
|
class MethodWebTester:
    """Drive MethodWeb's search, crawl and scrape actions and collect outcomes.

    Every ``test_*`` coroutine returns a plain dict with at least the keys
    ``action``, ``success``, ``error``, ``documents_count`` and
    ``result_label``. ``run_all_tests`` accumulates those dicts in
    ``self.test_results`` and ``print_test_summary`` reports on them.
    """

    # Sentinel meaning "the attribute did not exist before it was monkeypatched".
    _MISSING = object()

    def __init__(self):
        """Set up empty state plus the default query and URL fixtures."""
        self.method_web = None
        self.service_center = None

        # Test results tracking
        self.test_results = []

        # Test parameter sets
        self.test_queries = [
            "Python programming tutorial",
            "FastAPI documentation",
            "machine learning basics",
            "web scraping best practices",
        ]

        self.test_urls = [
            "https://docs.python.org/3/tutorial/",
            "https://fastapi.tiangolo.com/",
            "https://scikit-learn.org/stable/",
            "https://requests.readthedocs.io/en/latest/",
        ]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_validation_rejection(result: Dict[str, Any]) -> bool:
        """Return True when *result* failed with a "validation error" message.

        A missing or ``None`` error is treated as an empty message so that a
        failure without error text does not raise.
        """
        if result["success"]:
            return False
        return "validation error" in (result.get("error") or "").lower()

    @staticmethod
    def _success_rate(successful: int, total: int) -> float:
        """Return the success percentage, or 0.0 when no tests were run."""
        return (successful / total) * 100 if total else 0.0

    @staticmethod
    def _outcome_fields(result) -> Dict[str, Any]:
        """Extract the outcome keys shared by all test records from *result*."""
        return {
            "success": result.success,
            "error": result.error if not result.success else None,
            "documents_count": len(result.documents) if result.documents else 0,
            "result_label": result.resultLabel if hasattr(result, 'resultLabel') else None,
        }

    @staticmethod
    def _failure_fields(exc: Exception) -> Dict[str, Any]:
        """Return the outcome keys for an action that raised *exc*."""
        return {
            "success": False,
            "error": f"Exception: {exc}",
            "documents_count": 0,
            "result_label": None,
        }

    @staticmethod
    def _log_documents(documents, show_result_counts: bool = False) -> None:
        """Log the name (and optionally the result count) of each document."""
        for index, doc in enumerate(documents or [], start=1):
            logger.info(f" Document {index}: {doc.documentName}")
            if (
                show_result_counts
                and hasattr(doc, 'documentData')
                and hasattr(doc.documentData, 'results')
            ):
                logger.info(f" Results count: {len(doc.documentData.results)}")

    @staticmethod
    def _log_banner(title: str, width: int = 60, char: str = "=") -> None:
        """Log *title* framed by two separator lines, preceded by a blank line."""
        logger.info("\n" + char * width)
        logger.info(title)
        logger.info(char * width)

    async def _run_query_action(self, action: str, query: str, max_results: int) -> Dict[str, Any]:
        """Run a query-based MethodWeb action ("search" or "scrape") and record it."""
        label = action.capitalize()
        logger.info(f"Testing {action} action with query: '{query}', max_results: {max_results}")

        record: Dict[str, Any] = {"action": action, "query": query, "max_results": max_results}
        try:
            # Looked up inside the try so that an uninitialized method_web is
            # reported as a failed test instead of propagating.
            handler = getattr(self.method_web, action)
            result = await handler({"query": query, "maxResults": max_results})

            record.update(self._outcome_fields(result))
            if result.success:
                logger.info(f"✅ {label} successful: {record['documents_count']} documents returned")
                self._log_documents(result.documents, show_result_counts=True)
            else:
                logger.error(f"❌ {label} failed: {result.error}")
            return record

        except Exception as e:
            logger.error(f"❌ {label} action exception: {e}")
            record.update(self._failure_fields(e))
            return record

    async def _run_validation_case(
        self,
        title: str,
        query: str,
        max_results: int,
        expected_behavior: str,
        pass_message: str,
    ) -> Dict[str, Any]:
        """Run one search that must be rejected with a validation error.

        The record is marked successful only when the search failed with a
        validation error. A search that unexpectedly succeeds is marked as a
        failure; any other failure is returned unchanged.
        """
        logger.info(title)
        result = await self.test_search_action(query, max_results)

        if self._is_validation_rejection(result):
            result["success"] = True  # Mark as successful validation test
            result["validation_test"] = True
            result["expected_behavior"] = expected_behavior
            logger.info(pass_message)
        elif result["success"]:
            # Invalid input was accepted: the validation check itself failed.
            result["success"] = False
            result["error"] = "Expected a validation error but the request succeeded"
            logger.error(f"❌ Validation test FAILED: {result['error']}")
        return result

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def initialize_method_web(self):
        """Create the ServiceCenter and the MethodWeb instance under test.

        Returns True on success, False (after logging) when either step fails.
        """
        try:
            # First create the service center
            self.service_center = create_mock_service_center()
            if not self.service_center:
                logger.error("Failed to create ServiceCenter!")
                return False

            # Now create MethodWeb with the service center
            from modules.methods.methodWeb import MethodWeb
            self.method_web = MethodWeb(self.service_center)
            logger.info("MethodWeb initialized successfully with ServiceCenter")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize MethodWeb: {e}")
            return False

    async def test_search_action(self, query: str, max_results: int = 5) -> Dict[str, Any]:
        """Run the search action and return a record describing the outcome."""
        return await self._run_query_action("search", query, max_results)

    async def test_crawl_action(self, urls: List[str]) -> Dict[str, Any]:
        """Run the crawl action against *urls* using a stubbed document source.

        The service's ``getChatDocumentsFromDocumentList`` and ``getFileData``
        are temporarily replaced so that crawl receives a JSON payload listing
        *urls*. The original attributes are restored afterwards so later tests
        see the real service.
        """
        logger.info(f"Testing crawl action with {len(urls)} URLs")

        class _MockDoc:
            def __init__(self, fileId: str, fileName: str = "mock_search_results.json"):
                self.fileId = fileId
                self.fileName = fileName

        def _mock_get_docs(_doc_ids):
            return [_MockDoc(fileId="mock_file_id", fileName="mock_search_results.json")]

        # Build minimal JSON structure expected by methodWeb.crawl
        mock_payload = {
            "documentData": {
                "results": [{"url": u} for u in urls]
            }
        }

        def _mock_get_file_data(_file_id):
            return json.dumps(mock_payload).encode("utf-8")

        stubs = {
            "getChatDocumentsFromDocumentList": _mock_get_docs,
            "getFileData": _mock_get_file_data,
        }
        saved: Dict[str, Any] = {}
        service = None

        try:
            # Apply monkeypatches to the method's service, remembering originals
            service = self.method_web.service
            for name, stub in stubs.items():
                saved[name] = getattr(service, name, self._MISSING)
                setattr(service, name, stub)

            # Use any string as the document list reference; service is mocked
            result = await self.method_web.crawl({"documentList": "mock_document_list_ref"})

            record = {"action": "crawl", "urls_count": len(urls)}
            record.update(self._outcome_fields(result))

            if result.success:
                logger.info(f"✅ Crawl successful: {record['documents_count']} documents returned")
                self._log_documents(result.documents)
            else:
                logger.error(f"❌ Crawl failed: {result.error}")
            return record

        except Exception as e:
            logger.error(f"❌ Crawl action exception: {e}")
            record = {"action": "crawl", "urls_count": len(urls)}
            record.update(self._failure_fields(e))
            return record

        finally:
            # Undo the monkeypatches so the stubs do not leak into other tests.
            for name, original in saved.items():
                if original is self._MISSING:
                    delattr(service, name)
                else:
                    setattr(service, name, original)

    async def test_scrape_action(self, query: str, max_results: int = 3) -> Dict[str, Any]:
        """Run the scrape action (search + crawl combined) and record the outcome."""
        return await self._run_query_action("scrape", query, max_results)

    async def test_parameter_validation(self) -> List[Dict[str, Any]]:
        """Run searches with invalid parameters and check that each is rejected.

        Returns one record per case. A record has ``success`` True only when
        the search was rejected with a validation error.
        """
        logger.info("Testing parameter validation with invalid inputs")

        # (log title, query, max_results, expected behavior, pass message)
        cases = [
            (
                "Test 1: Empty query",
                "",
                5,
                "Correctly rejected empty query",
                "✅ Validation test PASSED: Empty query correctly rejected",
            ),
            (
                # Query too long (over 400 characters)
                "Test 2: Query too long",
                "a" * 500,
                5,
                "Correctly rejected overly long query",
                "✅ Validation test PASSED: Long query correctly rejected",
            ),
            (
                "Test 3: Max results too high",
                "test",
                25,
                "Correctly rejected excessive max results",
                "✅ Validation test PASSED: High max results correctly rejected",
            ),
            (
                "Test 4: Max results too low",
                "test",
                0,
                "Correctly rejected zero max results",
                "✅ Validation test PASSED: Zero max results correctly rejected",
            ),
        ]

        validation_tests = []
        for title, query, max_results, expected_behavior, pass_message in cases:
            validation_tests.append(
                await self._run_validation_case(
                    title, query, max_results, expected_behavior, pass_message
                )
            )
        return validation_tests

    async def run_all_tests(self) -> None:
        """Run search, scrape, validation and crawl tests, then print a summary."""
        logger.info("Starting MethodWeb comprehensive tests")
        logger.info("=" * 60)

        # Initialize the method
        if not self.initialize_method_web():
            logger.error("Cannot proceed without MethodWeb!")
            return

        # Test 1: Search actions with different queries
        self._log_banner("TEST 1: SEARCH ACTIONS")
        for i, query in enumerate(self.test_queries):
            logger.info(f"\nSearch test {i+1}/{len(self.test_queries)}")
            result = await self.test_search_action(query, 3)
            self.test_results.append(result)
            await asyncio.sleep(1)  # Rate limiting

        # Test 2: Scrape actions (search + crawl combined)
        self._log_banner("TEST 2: SCRAPE ACTIONS")
        scrape_queries = self.test_queries[:2]  # Use first 2 queries for scraping
        for i, query in enumerate(scrape_queries):
            logger.info(f"\nScrape test {i+1}/{len(scrape_queries)}")
            result = await self.test_scrape_action(query, 2)
            self.test_results.append(result)
            await asyncio.sleep(2)  # Rate limiting for scraping

        # Test 3: Parameter validation
        self._log_banner("TEST 3: PARAMETER VALIDATION")
        validation_results = await self.test_parameter_validation()
        self.test_results.extend(validation_results)

        # Test 4: Crawl action with sample URLs
        self._log_banner("TEST 4: CRAWL ACTIONS")
        logger.info("Testing crawl with sample URLs")
        result = await self.test_crawl_action(self.test_urls[:2])
        self.test_results.append(result)

        # Print comprehensive summary
        self.print_test_summary()

    def print_test_summary(self):
        """Log totals, per-action results and the effective web configuration."""
        self._log_banner("COMPREHENSIVE TEST SUMMARY", width=80)

        total_tests = len(self.test_results)
        successful_tests = sum(1 for result in self.test_results if result["success"])
        failed_tests = total_tests - successful_tests

        logger.info(f"Total tests run: {total_tests}")
        logger.info(f"Successful: {successful_tests}")
        logger.info(f"Failed: {failed_tests}")
        # _success_rate guards against division by zero when nothing was run.
        logger.info(f"Success rate: {self._success_rate(successful_tests, total_tests):.1f}%")

        # Group results by action type
        action_groups: Dict[str, List[Dict[str, Any]]] = {}
        for result in self.test_results:
            action_groups.setdefault(result["action"], []).append(result)

        self._log_banner("RESULTS BY ACTION TYPE:", width=80, char="-")

        for action, results in action_groups.items():
            action_successful = sum(1 for r in results if r["success"])
            action_total = len(results)
            logger.info(f"\n{action.upper()} ACTIONS:")
            logger.info(f" Total: {action_total}, Successful: {action_successful}, Failed: {action_total - action_successful}")

            for i, result in enumerate(results):
                status_icon = "✅" if result["success"] else "❌"

                # Handle validation tests specially
                if result.get("validation_test", False):
                    logger.info(f" {status_icon} Validation Test {i+1}: {result.get('expected_behavior', 'Validation working correctly')}")
                    if result.get("error"):
                        logger.info(f" Validation Error: {result['error']}")
                elif action == "search":
                    logger.info(f" {status_icon} Test {i+1}: '{result['query']}' -> {result['documents_count']} docs")
                elif action == "scrape":
                    logger.info(f" {status_icon} Test {i+1}: '{result['query']}' -> {result['documents_count']} docs")
                elif action == "crawl":
                    logger.info(f" {status_icon} Test {i+1}: {result['urls_count']} URLs -> {result['documents_count']} docs")

                if not result["success"] and not result.get("validation_test", False):
                    logger.info(f" Error: {result['error']}")

        self._log_banner("CONFIGURATION STATUS:", width=80, char="-")

        try:
            from modules.shared.configuration import APP_CONFIG
            tavily_key = APP_CONFIG.get("Connector_WebTavily_API_KEY")
            if tavily_key and tavily_key != "your_tavily_api_key_here":
                logger.info("✅ Tavily API key: Configured")
            else:
                logger.info("❌ Tavily API key: Not configured")

            # (configuration key, value displayed when the key is not set)
            web_configs = [
                ("Web_Search_MAX_QUERY_LENGTH", "400"),
                ("Web_Search_MAX_RESULTS", "20"),
                ("Web_Search_MIN_RESULTS", "1"),
                ("Web_Crawl_TIMEOUT", "30"),
                ("Web_Crawl_MAX_RETRIES", "3"),
                ("Web_Crawl_RETRY_DELAY", "2"),
            ]

            for config_key, default_value in web_configs:
                value = APP_CONFIG.get(config_key, default_value)
                logger.info(f"✅ {config_key}: {value}")

        except Exception as e:
            logger.error(f"❌ Configuration check failed: {e}")

        logger.info("=" * 80)
|
|
|
|
def parse_arguments(argv=None):
    """Parse the script's command line options.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default), argparse reads ``sys.argv[1:]``, so existing callers
            that pass nothing behave as before.

    Returns:
        An ``argparse.Namespace`` with the boolean attributes ``auto``,
        ``verbose`` and ``quick``.
    """
    parser = argparse.ArgumentParser(description='MethodWeb Test Script')
    parser.add_argument('--auto', action='store_true',
                        help='Run tests automatically without user interaction')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Enable verbose logging')
    parser.add_argument('--quick', action='store_true',
                        help='Run quick tests with fewer queries')

    return parser.parse_args(argv)
|
|
|
|
async def main():
    """Entry coroutine: run pre-flight checks, confirm with the user, run tests.

    Returns early (without running tests) when a pre-flight check fails or,
    in interactive mode, when the user does not answer "y"/"yes".
    """
    args = parse_arguments()

    # Root log level follows the --verbose flag.
    chosen_level = logging.DEBUG if args.verbose else logging.INFO
    logging.getLogger().setLevel(chosen_level)

    logger.info("MethodWeb Test Script")
    logger.info("=" * 50)

    # Pre-flight checks, run in order; the first failure aborts the run.
    preflight = (
        (check_dependencies, "Please install missing dependencies before running tests."),
        (check_module_imports, "Cannot import required modules. Please check your setup."),
        (check_configuration, "Configuration check failed. Please check your config.ini file."),
    )
    for check, failure_message in preflight:
        if not check():
            logger.error(failure_message)
            return

    if args.auto:
        logger.info("Running in automated mode")
    else:
        # Interactive mode: describe the run and ask for confirmation.
        rule = "=" * 50
        intro_lines = (
            "\n" + rule,
            "METHODWEB TEST SCRIPT",
            rule,
            "This script will test the MethodWeb functionality including:",
            "- Web search actions",
            "- Web scraping actions",
            "- Web crawling actions",
            "- Parameter validation",
            rule,
        )
        for line in intro_lines:
            print(line)

        try:
            answer = input("Do you want to proceed? (y/N): ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            # No usable answer counts as a refusal.
            answer = None

        if answer not in ('y', 'yes'):
            logger.info("Test cancelled by user")
            return

    tester = MethodWebTester()

    # Quick mode trims the query list to its first two entries.
    if args.quick:
        tester.test_queries = tester.test_queries[:2]
        logger.info("Running in quick mode with reduced test set")

    await tester.run_all_tests()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async main() to completion on a new event loop.
    asyncio.run(main())
|