# gateway/modules/methods/methodWeb.py
# Last modified: 2025-07-04 15:10:26 +02:00

"""
Web method module.
Handles web operations using the web service.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
import requests
from bs4 import BeautifulSoup
import time
from modules.workflow.methodBase import MethodBase, ActionResult, action
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
class WebService:
    """Service for web operations like searching and crawling"""

    def __init__(self, serviceContainer: Any):
        # Container handle; later calls use it to look up the current user.
        self.serviceContainer = serviceContainer

        # HTTP client defaults shared by every request this service makes.
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30

        # Web search configuration from agentWebcrawler
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))

        # Searches will fail without a key; warn once at construction time.
        if not self.srcApikey:
            logger.warning("SerpAPI key not configured for web search")
async def searchWeb(self, query: str, maxResults: int = 10) -> Dict[str, Any]:
"""Search web content using Google search via SerpAPI"""
try:
if not self.srcApikey:
return {
"error": "SerpAPI key not configured",
"query": query
}
# Get user language from service container if available
userLanguage = "en" # Default language
if hasattr(self.serviceContainer, 'user') and hasattr(self.serviceContainer.user, 'language'):
userLanguage = self.serviceContainer.user.language
# Format the search request for SerpAPI
params = {
"engine": self.srcEngine,
"q": query,
"api_key": self.srcApikey,
"num": min(maxResults, self.maxResults), # Number of results to return
"hl": userLanguage # User language
}
# Make the API request
response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
response.raise_for_status()
# Parse JSON response
search_results = response.json()
# Extract organic results
results = []
if "organic_results" in search_results:
for result in search_results["organic_results"][:maxResults]:
# Extract title
title = result.get("title", "No title")
# Extract URL
url = result.get("link", "No URL")
# Extract snippet
snippet = result.get("snippet", "No description")
# Get actual page content
try:
targetPageSoup = self._readUrl(url)
content = self._extractMainContent(targetPageSoup)
except Exception as e:
logger.warning(f"Error extracting content from {url}: {str(e)}")
content = f"Error extracting content: {str(e)}"
results.append({
'title': title,
'url': url,
'snippet': snippet,
'content': content
})
# Limit number of results
if len(results) >= maxResults:
break
else:
logger.warning(f"No organic results found in SerpAPI response for: {query}")
return {
"query": query,
"maxResults": maxResults,
"results": results,
"totalFound": len(results),
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error searching web: {str(e)}")
return {
"error": str(e),
"query": query
}
async def crawlPage(self, url: str, depth: int = 1, followLinks: bool = True, extractContent: bool = True) -> Dict[str, Any]:
"""Crawl web page and extract content"""
try:
# Read the URL
soup = self._readUrl(url)
if not soup:
return {
"error": "Failed to read URL",
"url": url
}
# Extract basic information
title = self._extractTitle(soup, url)
content = self._extractMainContent(soup) if extractContent else ""
# Extract links if requested
links = []
if followLinks:
for link in soup.find_all('a', href=True):
href = link.get('href')
if href and href.startswith(('http://', 'https://')):
links.append({
'url': href,
'text': link.get_text(strip=True)[:100]
})
# Extract images
images = []
for img in soup.find_all('img', src=True):
src = img.get('src')
if src:
images.append({
'src': src,
'alt': img.get('alt', ''),
'title': img.get('title', '')
})
return {
"url": url,
"depth": depth,
"followLinks": followLinks,
"extractContent": extractContent,
"title": title,
"content": content,
"links": links[:10], # Limit to first 10 links
"images": images[:10], # Limit to first 10 images
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error crawling web page: {str(e)}")
return {
"error": str(e),
"url": url
}
async def extractContent(self, url: str, selectors: Dict[str, str] = None, format: str = "text") -> Dict[str, Any]:
"""Extract content from web page using selectors"""
try:
# Read the URL
soup = self._readUrl(url)
if not soup:
return {
"error": "Failed to read URL",
"url": url
}
extracted_content = {}
if selectors:
# Extract content using provided selectors
for selector_name, selector in selectors.items():
elements = soup.select(selector)
if elements:
if format == "text":
extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
elif format == "html":
extracted_content[selector_name] = [str(elem) for elem in elements]
else:
extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
else:
extracted_content[selector_name] = []
else:
# Auto-extract common elements
extracted_content = {
"title": self._extractTitle(soup, url),
"main_content": self._extractMainContent(soup),
"headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])],
"links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))],
"images": [img.get('src') for img in soup.find_all('img', src=True)]
}
return {
"url": url,
"selectors": selectors,
"format": format,
"content": extracted_content,
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error extracting content: {str(e)}")
return {
"error": str(e),
"url": url
}
async def validatePage(self, url: str, checks: List[str] = None) -> Dict[str, Any]:
"""Validate web page for various criteria"""
if checks is None:
checks = ["accessibility", "seo", "performance"]
try:
# Read the URL
soup = self._readUrl(url)
if not soup:
return {
"error": "Failed to read URL",
"url": url
}
validation_results = {}
for check in checks:
if check == "accessibility":
validation_results["accessibility"] = self._checkAccessibility(soup)
elif check == "seo":
validation_results["seo"] = self._checkSEO(soup)
elif check == "performance":
validation_results["performance"] = self._checkPerformance(soup, url)
else:
validation_results[check] = {"status": "unknown", "message": f"Unknown check type: {check}"}
return {
"url": url,
"checks": checks,
"results": validation_results,
"timestamp": datetime.now(UTC).isoformat()
}
except Exception as e:
logger.error(f"Error validating web page: {str(e)}")
return {
"error": str(e),
"url": url
}
def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Check basic accessibility features"""
issues = []
warnings = []
# Check for alt text on images
images_without_alt = soup.find_all('img', alt='')
if images_without_alt:
issues.append(f"Found {len(images_without_alt)} images without alt text")
# Check for proper heading structure
headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
if not headings:
warnings.append("No headings found - poor document structure")
# Check for form labels
forms = soup.find_all('form')
for form in forms:
inputs = form.find_all('input')
for input_elem in inputs:
if input_elem.get('type') not in ['submit', 'button', 'hidden']:
if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}):
warnings.append("Form input without proper label")
return {
"status": "warning" if warnings else "pass",
"issues": issues,
"warnings": warnings
}
def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Check basic SEO features"""
issues = []
warnings = []
# Check for title tag
title = soup.find('title')
if not title:
issues.append("Missing title tag")
elif len(title.get_text()) < 10:
warnings.append("Title tag is too short")
elif len(title.get_text()) > 60:
warnings.append("Title tag is too long")
# Check for meta description
meta_desc = soup.find('meta', attrs={'name': 'description'})
if not meta_desc:
warnings.append("Missing meta description")
elif meta_desc.get('content'):
if len(meta_desc.get('content')) < 50:
warnings.append("Meta description is too short")
elif len(meta_desc.get('content')) > 160:
warnings.append("Meta description is too long")
# Check for h1 tag
h1_tags = soup.find_all('h1')
if not h1_tags:
warnings.append("No H1 tag found")
elif len(h1_tags) > 1:
warnings.append("Multiple H1 tags found")
return {
"status": "warning" if warnings else "pass",
"issues": issues,
"warnings": warnings
}
def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
"""Check basic performance indicators"""
warnings = []
# Count images
images = soup.find_all('img')
if len(images) > 20:
warnings.append(f"Many images found ({len(images)}) - may impact loading speed")
# Check for external resources
external_scripts = soup.find_all('script', src=True)
external_styles = soup.find_all('link', rel='stylesheet')
if len(external_scripts) > 10:
warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed")
if len(external_styles) > 5:
warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed")
return {
"status": "warning" if warnings else "pass",
"warnings": warnings,
"metrics": {
"images": len(images),
"external_scripts": len(external_scripts),
"external_styles": len(external_styles)
}
}
def _readUrl(self, url: str) -> BeautifulSoup:
"""Read a URL and return a BeautifulSoup parser for the content"""
if not url or not url.startswith(('http://', 'https://')):
return None
headers = {
'User-Agent': self.user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml',
'Accept-Language': 'en-US,en;q=0.9',
}
try:
# Initial request
response = requests.get(url, headers=headers, timeout=self.timeout)
# Handling for status 202
if response.status_code == 202:
# Retry with backoff
backoff_times = [0.5, 1.0, 2.0, 5.0]
for wait_time in backoff_times:
time.sleep(wait_time)
response = requests.get(url, headers=headers, timeout=self.timeout)
if response.status_code != 202:
break
# Raise for error status codes
response.raise_for_status()
# Parse HTML
return BeautifulSoup(response.text, 'html.parser')
except Exception as e:
logger.error(f"Error reading URL {url}: {str(e)}")
return None
def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
"""Extract the title from a webpage"""
if not soup:
return f"Error with {url}"
# Extract title from title tag
title_tag = soup.find('title')
title = title_tag.text.strip() if title_tag else "No title"
# Alternative: Also look for h1 tags if title tag is missing
if title == "No title":
h1_tag = soup.find('h1')
if h1_tag:
title = h1_tag.text.strip()
return title
def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 10000) -> str:
"""Extract the main content from an HTML page"""
if not soup:
return ""
# Try to find main content elements in priority order
main_content = None
for selector in ['main', 'article', '#content', '.content', '#main', '.main']:
content = soup.select_one(selector)
if content:
main_content = content
break
# If no main content found, use the body
if not main_content:
main_content = soup.find('body') or soup
# Remove script, style, nav, footer elements that don't contribute to main content
for element in main_content.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'):
element.extract()
# Extract text content
text_content = main_content.get_text(separator=' ', strip=True)
# Limit to max_chars
return text_content[:max_chars]
class MethodWeb(MethodBase):
    """Web method implementation for web operations"""

    def __init__(self, serviceContainer: Any):
        """Initialize the web method and its backing WebService."""
        super().__init__(serviceContainer)
        # The service does the actual HTTP work; actions below just adapt
        # parameter dicts to its API.
        self.webService = WebService(serviceContainer)
        self.name = "web"
        self.description = "Handle web operations like searching and crawling"
@action
async def search(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Search web content
Parameters:
query (str): Search query
maxResults (int, optional): Maximum number of results (default: 10)
"""
try:
query = parameters.get("query")
maxResults = parameters.get("maxResults", 10)
if not query:
return self._createResult(
success=False,
data={},
error="Search query is required"
)
# Search web
results = await self.webService.searchWeb(
query=query,
maxResults=maxResults
)
return self._createResult(
success=True,
data=results
)
except Exception as e:
logger.error(f"Error searching web: {str(e)}")
return self._createResult(
success=False,
data={},
error=str(e)
)
@action
async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Crawl web page
Parameters:
url (str): URL to crawl
depth (int, optional): Crawl depth (default: 1)
followLinks (bool, optional): Whether to follow links (default: True)
extractContent (bool, optional): Whether to extract content (default: True)
"""
try:
url = parameters.get("url")
depth = parameters.get("depth", 1)
followLinks = parameters.get("followLinks", True)
extractContent = parameters.get("extractContent", True)
if not url:
return self._createResult(
success=False,
data={},
error="URL is required"
)
# Crawl page
results = await self.webService.crawlPage(
url=url,
depth=depth,
followLinks=followLinks,
extractContent=extractContent
)
return self._createResult(
success=True,
data=results
)
except Exception as e:
logger.error(f"Error crawling web page: {str(e)}")
return self._createResult(
success=False,
data={},
error=str(e)
)
@action
async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Extract content from web page
Parameters:
url (str): URL to extract content from
selectors (Dict[str, str], optional): CSS selectors for specific content
format (str, optional): Output format (default: "text")
"""
try:
url = parameters.get("url")
selectors = parameters.get("selectors", {})
format = parameters.get("format", "text")
if not url:
return self._createResult(
success=False,
data={},
error="URL is required"
)
# Extract content
content = await self.webService.extractContent(
url=url,
selectors=selectors,
format=format
)
return self._createResult(
success=True,
data=content
)
except Exception as e:
logger.error(f"Error extracting content: {str(e)}")
return self._createResult(
success=False,
data={},
error=str(e)
)
@action
async def validate(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Validate web page
Parameters:
url (str): URL to validate
checks (List[str], optional): Types of checks to perform (default: ["accessibility", "seo", "performance"])
"""
try:
url = parameters.get("url")
checks = parameters.get("checks", ["accessibility", "seo", "performance"])
if not url:
return self._createResult(
success=False,
data={},
error="URL is required"
)
# Validate page
results = await self.webService.validatePage(
url=url,
checks=checks
)
return self._createResult(
success=True,
data=results
)
except Exception as e:
logger.error(f"Error validating web page: {str(e)}")
return self._createResult(
success=False,
data={},
error=str(e)
)