# gateway/modules/methods/methodWeb.py
"""
Web method module.
Handles web operations using the web service.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
import requests
from bs4 import BeautifulSoup
import time
import uuid
from modules.workflow.methodBase import MethodBase, ActionResult, action
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)


class MethodWeb(MethodBase):
    """Web method implementation for web operations"""

    def __init__(self, serviceContainer: Any):
        """Initialize the web method"""
        super().__init__(serviceContainer)
        self.name = "web"
        self.description = "Handle web operations like crawling and scraping"
        # Web search configuration from agentWebcrawler
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))
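        # Illustrative sketch of the APP_CONFIG entries read above. The key names
        # come from this module; the values shown mirror the fallback defaults in
        # the get() calls, except the API key, which is a placeholder:
        #   Agent_Webcrawler_SERPAPI_APIKEY = "<your SerpAPI key>"
        #   Agent_Webcrawler_SERPAPI_ENGINE = "google"
        #   Agent_Webcrawler_SERPAPI_COUNTRY = "auto"
        #   Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS = "5"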
        if not self.srcApikey:
            logger.warning("SerpAPI key not configured for web search")
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30

    def _readUrl(self, url: str) -> Optional[BeautifulSoup]:
        """Read a URL and return a BeautifulSoup parser for the content, or None on failure"""
        if not url or not url.startswith(('http://', 'https://')):
            return None
        headers = {
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml',
            'Accept-Language': 'en-US,en;q=0.9',
        }
        try:
            # Initial request
            response = requests.get(url, headers=headers, timeout=self.timeout)
            # Status 202 means the content is not ready yet: retry with backoff
            if response.status_code == 202:
                backoff_times = [0.5, 1.0, 2.0, 5.0]
                for wait_time in backoff_times:
                    time.sleep(wait_time)
                    response = requests.get(url, headers=headers, timeout=self.timeout)
                    if response.status_code != 202:
                        break
            # Raise for error status codes
            response.raise_for_status()
            # Parse HTML
            return BeautifulSoup(response.text, 'html.parser')
        except Exception as e:
            logger.error(f"Error reading URL {url}: {str(e)}")
            return None

    def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
        """Extract the title from a webpage"""
        if not soup:
            return f"Error with {url}"
        # Extract title from the title tag
        title_tag = soup.find('title')
        title = title_tag.text.strip() if title_tag else "No title"
        # Fall back to the first h1 tag if no title tag was found
        if title == "No title":
            h1_tag = soup.find('h1')
            if h1_tag:
                title = h1_tag.text.strip()
        return title

    def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 10000) -> str:
        """Extract the main content from an HTML page"""
        if not soup:
            return ""
        # Try to find main content elements in priority order
        main_content = None
        for selector in ['main', 'article', '#content', '.content', '#main', '.main']:
            content = soup.select_one(selector)
            if content:
                main_content = content
                break
        # If no main content element was found, fall back to the body
        if not main_content:
            main_content = soup.find('body') or soup
        # Remove script, style, nav, footer and similar elements that do not contribute to the main content
        for element in main_content.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'):
            element.extract()
        # Extract text content and limit it to max_chars
        text_content = main_content.get_text(separator=' ', strip=True)
        return text_content[:max_chars]

    def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic accessibility features"""
        issues = []
        warnings = []
        # Check for images with a missing or empty alt attribute
        images_without_alt = [img for img in soup.find_all('img') if not img.get('alt')]
        if images_without_alt:
            issues.append(f"Found {len(images_without_alt)} images without alt text")
        # Check for proper heading structure
        headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        if not headings:
            warnings.append("No headings found - poor document structure")
        # Check for form labels
        forms = soup.find_all('form')
        for form in forms:
            inputs = form.find_all('input')
            for input_elem in inputs:
                if input_elem.get('type') not in ['submit', 'button', 'hidden']:
                    if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}):
                        warnings.append("Form input without proper label")
        return {
            "status": "fail" if issues else "warning" if warnings else "pass",
            "issues": issues,
            "warnings": warnings
        }

    def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic SEO features"""
        issues = []
        warnings = []
        # Check for title tag
        title = soup.find('title')
        if not title:
            issues.append("Missing title tag")
        elif len(title.get_text()) < 10:
            warnings.append("Title tag is too short")
        elif len(title.get_text()) > 60:
            warnings.append("Title tag is too long")
        # Check for meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if not meta_desc:
            warnings.append("Missing meta description")
        elif meta_desc.get('content'):
            if len(meta_desc.get('content')) < 50:
                warnings.append("Meta description is too short")
            elif len(meta_desc.get('content')) > 160:
                warnings.append("Meta description is too long")
        # Check for h1 tag
        h1_tags = soup.find_all('h1')
        if not h1_tags:
            warnings.append("No H1 tag found")
        elif len(h1_tags) > 1:
            warnings.append("Multiple H1 tags found")
        return {
            "status": "fail" if issues else "warning" if warnings else "pass",
            "issues": issues,
            "warnings": warnings
        }

    def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Check basic performance indicators"""
        warnings = []
        # Count images
        images = soup.find_all('img')
        if len(images) > 20:
            warnings.append(f"Many images found ({len(images)}) - may impact loading speed")
        # Check for external resources
        external_scripts = soup.find_all('script', src=True)
        external_styles = soup.find_all('link', rel='stylesheet')
        if len(external_scripts) > 10:
            warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed")
        if len(external_styles) > 5:
            warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed")
        return {
            "status": "warning" if warnings else "pass",
            "warnings": warnings,
            "metrics": {
                "images": len(images),
                "external_scripts": len(external_scripts),
                "external_styles": len(external_styles)
            }
        }

    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Crawl web pages and extract content

        Parameters:
            urls (List[str]): List of URLs to crawl
            maxDepth (int, optional): Maximum crawl depth (default: 2)
            includeImages (bool, optional): Whether to include images (default: False)
            followLinks (bool, optional): Whether to follow links (default: True)
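
        Example (illustrative; `web_method` stands for an initialized MethodWeb
        instance and the parameter values are placeholders):
            result = await web_method.crawl({
                "urls": ["https://example.com"],
                "maxDepth": 1,
                "includeImages": False,
                "followLinks": True
            })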
"""
        try:
            urls = parameters.get("urls")
            maxDepth = parameters.get("maxDepth", 2)
            includeImages = parameters.get("includeImages", False)
            followLinks = parameters.get("followLinks", True)
            if not urls:
                return self._createResult(
                    success=False,
                    data={},
                    error="URLs are required"
                )
            # Crawl each URL
            crawl_results = []
            for url in urls:
                try:
                    # Read the URL
                    soup = self._readUrl(url)
                    if not soup:
                        crawl_results.append({
                            "error": "Failed to read URL",
                            "url": url
                        })
                        continue
                    # Extract basic information
                    title = self._extractTitle(soup, url)
                    content = self._extractMainContent(soup)
                    # Extract links if requested
                    links = []
                    if followLinks:
                        for link in soup.find_all('a', href=True):
                            href = link.get('href')
                            if href and href.startswith(('http://', 'https://')):
                                links.append({
                                    'url': href,
                                    'text': link.get_text(strip=True)[:100]
                                })
                    # Extract images if requested
                    images = []
                    if includeImages:
                        for img in soup.find_all('img', src=True):
                            src = img.get('src')
                            if src:
                                images.append({
                                    'src': src,
                                    'alt': img.get('alt', ''),
                                    'title': img.get('title', '')
                                })
                    crawl_results.append({
                        "url": url,
                        "depth": maxDepth,
                        "followLinks": followLinks,
                        "extractContent": True,
                        "title": title,
                        "content": content,
                        "links": links[:10],  # Limit to first 10 links
                        "images": images[:10],  # Limit to first 10 images
                        "timestamp": datetime.now(UTC).isoformat()
                    })
                except Exception as e:
                    logger.error(f"Error crawling web page {url}: {str(e)}")
                    crawl_results.append({
                        "error": str(e),
                        "url": url
                    })
            # Create result data
            result_data = {
                "urls": urls,
                "maxDepth": maxDepth,
                "includeImages": includeImages,
                "followLinks": followLinks,
                "crawlResults": crawl_results,
                "timestamp": datetime.now(UTC).isoformat()
            }
            return self._createResult(
                success=True,
                data={
                    "documentName": f"web_crawl_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                    "documentData": result_data
                }
            )
        except Exception as e:
            logger.error(f"Error crawling web pages: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )

    @action
    async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Scrape specific data from web pages

        Parameters:
            url (str): URL to scrape
            selectors (Dict[str, str], optional): CSS selectors for data extraction; common elements are auto-extracted if omitted
            format (str, optional): Output format (default: "json")
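
        Example (illustrative; `web_method`, the URL and the selector names are
        placeholders):
            result = await web_method.scrape({
                "url": "https://example.com",
                "selectors": {"headline": "h1", "paragraphs": "article p"},
                "format": "text"
            })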
"""
        try:
            url = parameters.get("url")
            selectors = parameters.get("selectors")
            outputFormat = parameters.get("format", "json")
            if not url:
                return self._createResult(
                    success=False,
                    data={},
                    error="URL is required"
                )
            # Read the URL
            soup = self._readUrl(url)
            if not soup:
                return self._createResult(
                    success=False,
                    data={},
                    error="Failed to read URL"
                )
            extracted_content = {}
            if selectors:
                # Extract content using the provided selectors
                for selector_name, selector in selectors.items():
                    elements = soup.select(selector)
                    if elements:
                        if outputFormat == "html":
                            extracted_content[selector_name] = [str(elem) for elem in elements]
                        else:
                            # "text", "json" and any other format fall back to plain text
                            extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
                    else:
                        extracted_content[selector_name] = []
            else:
                # No selectors given: auto-extract common elements
                extracted_content = {
                    "title": self._extractTitle(soup, url),
                    "main_content": self._extractMainContent(soup),
                    "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])],
                    "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))],
                    "images": [img.get('src') for img in soup.find_all('img', src=True)]
                }
            scrape_result = {
                "url": url,
                "selectors": selectors,
                "format": outputFormat,
                "content": extracted_content,
                "timestamp": datetime.now(UTC).isoformat()
            }
            # Create result data
            result_data = {
                "url": url,
                "selectors": selectors,
                "format": outputFormat,
                "scrapedData": scrape_result,
                "timestamp": datetime.now(UTC).isoformat()
            }
            return self._createResult(
                success=True,
                data={
                    "documentName": f"web_scrape_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.{outputFormat}",
                    "documentData": result_data
                }
            )
        except Exception as e:
            logger.error(f"Error scraping web page: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )

    @action
    async def search(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Search web content

        Parameters:
            query (str): Search query
            engine (str, optional): Search engine to use (default: "google"; the request itself uses the configured SerpAPI engine)
            maxResults (int, optional): Maximum number of results (default: 10)
            filter (str, optional): Additional search filters
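
        Example (illustrative; `web_method` and the query are placeholders, and a
        configured SerpAPI key is assumed):
            result = await web_method.search({
                "query": "open source web crawlers",
                "maxResults": 5
            })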
"""
        try:
            query = parameters.get("query")
            engine = parameters.get("engine", "google")
            maxResults = parameters.get("maxResults", 10)
            searchFilter = parameters.get("filter")
            if not query:
                return self._createResult(
                    success=False,
                    data={},
                    error="Search query is required"
                )
            # Search web content using Google search via SerpAPI
            try:
                if not self.srcApikey:
                    search_result = {
                        "error": "SerpAPI key not configured",
                        "query": query
                    }
                else:
                    # Get user language from the service container if available
                    userLanguage = "en"  # Default language
                    if hasattr(self.serviceContainer, 'user') and hasattr(self.serviceContainer.user, 'language'):
                        userLanguage = self.serviceContainer.user.language
                    # Format the search request for SerpAPI
                    params = {
                        "engine": self.srcEngine,
                        "q": query,
                        "api_key": self.srcApikey,
                        "num": min(maxResults, self.maxResults),  # Number of results to return
                        "hl": userLanguage  # User language
                    }
                    # Make the API request
                    response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
                    response.raise_for_status()
                    # Parse JSON response
                    search_results = response.json()
                    # Extract organic results
                    results = []
                    if "organic_results" in search_results:
                        for result in search_results["organic_results"][:maxResults]:
                            # Extract title, URL and snippet
                            title = result.get("title", "No title")
                            url = result.get("link", "No URL")
                            snippet = result.get("snippet", "No description")
                            # Get actual page content
                            try:
                                targetPageSoup = self._readUrl(url)
                                content = self._extractMainContent(targetPageSoup)
                            except Exception as e:
                                logger.warning(f"Error extracting content from {url}: {str(e)}")
                                content = f"Error extracting content: {str(e)}"
                            results.append({
                                'title': title,
                                'url': url,
                                'snippet': snippet,
                                'content': content
                            })
                            # Limit number of results
                            if len(results) >= maxResults:
                                break
                    else:
                        logger.warning(f"No organic results found in SerpAPI response for: {query}")
                    search_result = {
                        "query": query,
                        "maxResults": maxResults,
                        "results": results,
                        "totalFound": len(results),
                        "timestamp": datetime.now(UTC).isoformat()
                    }
            except Exception as e:
                logger.error(f"Error searching web: {str(e)}")
                search_result = {
                    "error": str(e),
                    "query": query
                }
            # Create result data
            result_data = {
                "query": query,
                "engine": engine,
                "maxResults": maxResults,
                "filter": searchFilter,
                "searchResults": search_result,
                "timestamp": datetime.now(UTC).isoformat()
            }
            return self._createResult(
                success=True,
                data={
                    "documentName": f"web_search_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                    "documentData": result_data
                }
            )
        except Exception as e:
            logger.error(f"Error searching web: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )

    @action
    async def validate(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Validate web pages for various criteria

        Parameters:
            url (str): URL to validate
            checks (List[str], optional): Types of checks to perform (default: ["accessibility", "seo", "performance"])
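
        Example (illustrative; `web_method` and the URL are placeholders):
            result = await web_method.validate({
                "url": "https://example.com",
                "checks": ["seo", "performance"]
            })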
"""
        try:
            url = parameters.get("url")
            checks = parameters.get("checks", ["accessibility", "seo", "performance"])
            if not url:
                return self._createResult(
                    success=False,
                    data={},
                    error="URL is required"
                )
            # Read the URL
            soup = self._readUrl(url)
            if not soup:
                return self._createResult(
                    success=False,
                    data={},
                    error="Failed to read URL"
                )
            validation_results = {}
            for check in checks:
                if check == "accessibility":
                    validation_results["accessibility"] = self._checkAccessibility(soup)
                elif check == "seo":
                    validation_results["seo"] = self._checkSEO(soup)
                elif check == "performance":
                    validation_results["performance"] = self._checkPerformance(soup, url)
                else:
                    validation_results[check] = {"status": "unknown", "message": f"Unknown check type: {check}"}
            validation_result = {
                "url": url,
                "checks": checks,
                "results": validation_results,
                "timestamp": datetime.now(UTC).isoformat()
            }
            # Create result data
            result_data = {
                "url": url,
                "checks": checks,
                "validationResult": validation_result,
                "timestamp": datetime.now(UTC).isoformat()
            }
            return self._createResult(
                success=True,
                data={
                    "documentName": f"web_validation_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                    "documentData": result_data
                }
            )
        except Exception as e:
            logger.error(f"Error validating web page: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )