# gateway/modules/methods/methodWeb.py
"""
Web operations method module.
Handles web scraping, crawling, and search operations.
"""
import logging
import requests
import json
import re
import copy
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from urllib.parse import urlparse, urljoin
import time
import random
from bs4 import BeautifulSoup
import os
# Selenium imports for JavaScript-heavy pages
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from modules.chat.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import get_utc_timestamp
logger = logging.getLogger(__name__)
class MethodWeb(MethodBase):
    """
    Web method implementation for web operations.
    - web.search: Uses Google SerpAPI to find relevant URLs for a query. Returns only search result metadata (title, URL, snippet). Does NOT fetch or extract page content.
    - web.crawl: Fetches and extracts main content from a list of URLs, either provided directly or via referenced documents. Uses a headless browser for JavaScript-heavy pages.
    """
    def __init__(self, serviceCenter: Any):
        super().__init__(serviceCenter)
        self.name = "web"
        self.description = "Handle web operations like search and crawling"
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30
    def _format_timestamp_for_filename(self) -> str:
        """Format current timestamp as YYYYMMDD-hhmmss for filenames."""
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
    def _readUrl(self, url: str) -> Optional[BeautifulSoup]:
        """Read a URL and return a BeautifulSoup parser for the content, or None on failure."""
        if not url or not url.startswith(('http://', 'https://')):
            logger.error(f"Invalid URL: {url}")
            return None
        # Enhanced headers to mimic a real browser
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0'
        }
        try:
            # Use a session for connection reuse across retries
            session = requests.Session()
            session.headers.update(headers)
            # Initial request, following redirects
            response = session.get(url, timeout=self.timeout, allow_redirects=True)
            # Handle various status codes
            if response.status_code == 200:
                # Success - parse content
                logger.debug(f"Successfully read URL: {url}")
                return BeautifulSoup(response.text, 'html.parser')
            elif response.status_code == 202:
                # Accepted - retry with backoff
                logger.info(f"Status 202 for {url}, retrying with backoff...")
                backoff_times = [1.0, 2.0, 5.0, 10.0]
                for wait_time in backoff_times:
                    time.sleep(wait_time)
                    retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
                    if retry_response.status_code == 200:
                        logger.debug(f"Successfully read URL after retry: {url}")
                        return BeautifulSoup(retry_response.text, 'html.parser')
                    elif retry_response.status_code != 202:
                        break
                logger.warning(f"Failed to read URL after retries: {url}")
                return None
            elif response.status_code in [301, 302, 307, 308]:
                # Redirects should already have been followed via allow_redirects=True
                logger.warning(f"Unexpected redirect status {response.status_code} for {url}")
                return None
            elif response.status_code == 403:
                # Forbidden - retry once with a different user agent
                logger.warning(f"403 Forbidden for {url}, trying with different user agent...")
                headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                session.headers.update(headers)
                retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
                if retry_response.status_code == 200:
                    logger.debug(f"Successfully read URL with different user agent: {url}")
                    return BeautifulSoup(retry_response.text, 'html.parser')
                else:
                    logger.error(f"Still getting {retry_response.status_code} for {url}")
                    return None
            elif response.status_code == 429:
                # Rate limited - wait and retry once
                logger.warning(f"Rate limited for {url}, waiting 30 seconds...")
                time.sleep(30)
                retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
                if retry_response.status_code == 200:
                    logger.debug(f"Successfully read URL after rate limit: {url}")
                    return BeautifulSoup(retry_response.text, 'html.parser')
                else:
                    logger.error(f"Still getting {retry_response.status_code} after rate limit wait for {url}")
                    return None
            else:
                # Other error status codes
                logger.error(f"HTTP {response.status_code} for {url}")
                return None
        except requests.exceptions.Timeout:
            logger.error(f"Timeout reading URL: {url}")
            return None
        except requests.exceptions.ConnectionError:
            logger.error(f"Connection error reading URL: {url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error reading URL {url}: {str(e)}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error reading URL {url}: {str(e)}")
            return None
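    # Callers must treat None as "page unavailable"; a typical pattern
    # (illustrative only) is:
    #
    #   soup = self._readUrl("https://example.com/article")
    #   text = self._extractMainContent(soup) if soup else ""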
    def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
        """Extract the title from a webpage"""
        if not soup:
            return f"Error with {url}"
        # Extract title from the <title> tag
        title_tag = soup.find('title')
        title = title_tag.text.strip() if title_tag else "No title"
        # Fall back to the first <h1> if the title tag is missing
        if title == "No title":
            h1_tag = soup.find('h1')
            if h1_tag:
                title = h1_tag.text.strip()
        return title
    def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 50000) -> str:
        """Extract the main content from an HTML page with enhanced content detection"""
        if not soup:
            return ""
        # Try to find main content elements in priority order
        main_content = None
        content_selectors = [
            'main',
            'article',
            '#content',
            '.content',
            '#main',
            '.main',
            '.post-content',
            '.entry-content',
            '.article-content',
            '.page-content',
            '[role="main"]',
            '.container',
            '.wrapper'
        ]
        for selector in content_selectors:
            content = soup.select_one(selector)
            if content:
                main_content = content
                logger.debug(f"Found main content using selector: {selector}")
                break
        # If no main content found, use the body
        if not main_content:
            main_content = soup.find('body') or soup
            logger.debug("Using body as main content")
        if main_content is None:
            return ""
        # Work on a copy so the original soup is not mutated
        try:
            content_copy = copy.copy(main_content)
        except Exception:
            content_copy = main_content
        # Remove elements that don't contribute to the main content
        elements_to_remove = [
            'script', 'style', 'noscript',
            'nav', 'footer', 'header', 'aside',
            '.sidebar', '#sidebar', '.comments', '#comments',
            '.advertisement', '.ads', '.ad', '.banner',
            'iframe', '.social-share', '.share-buttons',
            '.breadcrumb', '.breadcrumbs', '.pagination',
            '.related-posts', '.related-articles',
            '.newsletter', '.subscribe', '.signup',
            '.cookie-notice', '.privacy-notice',
            '.popup', '.modal', '.overlay'
        ]
        for selector in elements_to_remove:
            for element in content_copy.select(selector):
                element.extract()
        # Extract text content with better formatting
        text_content = content_copy.get_text(separator='\n', strip=True)
        # Clean up the text: keep only lines long enough to be meaningful
        lines = text_content.split('\n')
        cleaned_lines = []
        for line in lines:
            line = line.strip()
            if line and len(line) > 10:
                cleaned_lines.append(line)
        # Join lines with proper spacing
        cleaned_content = '\n\n'.join(cleaned_lines)
        # If content is too short, try alternative extraction
        if len(cleaned_content) < 500:
            logger.debug("Content too short, trying alternative extraction...")
            # Collect text from all paragraph-like elements
            paragraphs = soup.find_all(['p', 'div', 'section'])
            alt_content = []
            for p in paragraphs:
                text = p.get_text(strip=True)
                if text and len(text) > 20:  # Only meaningful paragraphs
                    alt_content.append(text)
            if alt_content:
                cleaned_content = '\n\n'.join(alt_content[:20])  # Limit to first 20 paragraphs
        # Limit to max_chars but preserve complete sentences
        if len(cleaned_content) > max_chars:
            # Try to cut at a sentence boundary
            sentences = cleaned_content.split('. ')
            truncated_content = ""
            for sentence in sentences:
                if len(truncated_content + sentence) < max_chars:
                    truncated_content += sentence + ". "
                else:
                    break
            # If even the first sentence exceeds max_chars, fall back to a hard cut
            cleaned_content = truncated_content.strip() or cleaned_content[:max_chars]
        logger.debug(f"Extracted {len(cleaned_content)} characters of content")
        return cleaned_content
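    # Illustration of the selector priority above: given this (contrived)
    # markup, <main> wins even though .content also matches:
    #
    #   <body>
    #     <div class="content">teaser text ...</div>
    #     <main>the article body that gets extracted</main>
    #   </body>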
    def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic accessibility features"""
        issues = []
        warnings = []
        # Check for alt text on images (covers both missing and empty alt attributes)
        images_without_alt = [img for img in soup.find_all('img') if not img.get('alt')]
        if images_without_alt:
            issues.append(f"Found {len(images_without_alt)} images without alt text")
        # Check for proper heading structure
        headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        if not headings:
            warnings.append("No headings found - poor document structure")
        # Check for form labels
        forms = soup.find_all('form')
        for form in forms:
            inputs = form.find_all('input')
            for input_elem in inputs:
                if input_elem.get('type') not in ['submit', 'button', 'hidden']:
                    if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}):
                        warnings.append("Form input without proper label")
        return {
            "status": "warning" if (issues or warnings) else "pass",
            "issues": issues,
            "warnings": warnings
        }
    def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic SEO features"""
        issues = []
        warnings = []
        # Check for title tag
        title = soup.find('title')
        if not title:
            issues.append("Missing title tag")
        elif len(title.get_text()) < 10:
            warnings.append("Title tag is too short")
        elif len(title.get_text()) > 60:
            warnings.append("Title tag is too long")
        # Check for meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if not meta_desc:
            warnings.append("Missing meta description")
        elif meta_desc.get('content'):
            if len(meta_desc.get('content')) < 50:
                warnings.append("Meta description is too short")
            elif len(meta_desc.get('content')) > 160:
                warnings.append("Meta description is too long")
        # Check for h1 tag
        h1_tags = soup.find_all('h1')
        if not h1_tags:
            warnings.append("No H1 tag found")
        elif len(h1_tags) > 1:
            warnings.append("Multiple H1 tags found")
        return {
            "status": "warning" if (issues or warnings) else "pass",
            "issues": issues,
            "warnings": warnings
        }
    def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Check basic performance indicators"""
        warnings = []
        # Count images
        images = soup.find_all('img')
        if len(images) > 20:
            warnings.append(f"Many images found ({len(images)}) - may impact loading speed")
        # Check for external resources
        external_scripts = soup.find_all('script', src=True)
        external_styles = soup.find_all('link', rel='stylesheet')
        if len(external_scripts) > 10:
            warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed")
        if len(external_styles) > 5:
            warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed")
        return {
            "status": "warning" if warnings else "pass",
            "warnings": warnings,
            "metrics": {
                "images": len(images),
                "external_scripts": len(external_scripts),
                "external_styles": len(external_styles)
            }
        }
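    # The three audit helpers share one soup and return independent
    # {status, issues/warnings} dicts; a combined report might look like
    # this (a sketch, not part of the public action API):
    #
    #   soup = self._readUrl(url)
    #   report = {
    #       "accessibility": self._checkAccessibility(soup),
    #       "seo": self._checkSEO(soup),
    #       "performance": self._checkPerformance(soup, url),
    #   }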
    def _detectJavaScriptRendering(self, soup: BeautifulSoup) -> bool:
        """Detect if a page likely requires JavaScript rendering"""
        if not soup:
            return False
        # Check for common indicators of JavaScript-rendered content
        indicators = [
            # Angular, React, Vue mount points
            soup.find('div', {'ng-app': True}),
            soup.find('div', {'id': 'root'}),
            soup.find('div', {'id': 'app'}),
            soup.find('div', {'id': 'react-root'}),
            # SPA indicators
            soup.find('div', {'id': 'spa-root'}),
            soup.find('div', {'class': 'spa-container'}),
            # Modern framework indicators
            soup.find('div', {'data-reactroot': True}),
            soup.find('div', {'data-ng-controller': True}),
            # Nearly empty body combined with several scripts
            len(soup.get_text(strip=True)) < 100 and len(soup.find_all('script')) > 2
        ]
        return any(indicators)
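    # Example of a page shell that triggers the heuristic above: almost no
    # visible text, a framework mount point, and several scripts:
    #
    #   <body>
    #     <div id="root"></div>
    #     <script src="/runtime.js"></script>
    #     <script src="/vendor.js"></script>
    #     <script src="/app.js"></script>
    #   </body>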
    def _extractMetaInformation(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Extract meta information from the page"""
        meta_info = {
            "url": url,
            "title": self._extractTitle(soup, url),
            "description": "",
            "keywords": "",
            "author": "",
            "language": "",
            "robots": "",
            "viewport": "",
            "charset": "",
            "canonical": ""
        }
        # Extract meta tags
        meta_tags = soup.find_all('meta')
        for meta in meta_tags:
            name = meta.get('name', '').lower()
            prop = meta.get('property', '').lower()  # avoid shadowing the builtin `property`
            content = meta.get('content', '')
            if name == 'description' or prop == 'og:description':
                meta_info['description'] = content
            elif name == 'keywords':
                meta_info['keywords'] = content
            elif name == 'author':
                meta_info['author'] = content
            elif name == 'language':
                meta_info['language'] = content
            elif name == 'robots':
                meta_info['robots'] = content
            elif name == 'viewport':
                meta_info['viewport'] = content
            elif prop == 'og:title':
                meta_info['title'] = content
            elif prop == 'og:url':
                meta_info['canonical'] = content
        # Extract charset
        charset_meta = soup.find('meta', charset=True)
        if charset_meta:
            meta_info['charset'] = charset_meta.get('charset', '')
        # Extract canonical URL (overrides og:url if both are present)
        canonical_link = soup.find('link', rel='canonical')
        if canonical_link:
            meta_info['canonical'] = canonical_link.get('href', '')
        return meta_info
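    # For a head like the following, the returned dict would carry
    # description, viewport, charset, and canonical (values abbreviated):
    #
    #   <meta charset="utf-8">
    #   <meta name="description" content="...">
    #   <meta name="viewport" content="width=device-width">
    #   <link rel="canonical" href="https://example.com/page">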
    def _getAlternativeApproaches(self, url: str, requires_js: bool, content_length: int) -> List[str]:
        """Get alternative approaches for sites that are difficult to crawl"""
        approaches = []
        if requires_js:
            approaches.extend([
                "Site requires JavaScript rendering - consider using a headless browser",
                "Try accessing the site's API endpoints directly",
                "Look for RSS feeds or sitemaps",
                "Check if the site has a mobile version that's easier to parse"
            ])
        if content_length < 100:
            approaches.extend([
                "Site may have anti-bot protection - try with different user agents",
                "Check if the site requires authentication",
                "Look for alternative URLs (www vs non-www, http vs https)",
                "Try accessing the site's robots.txt for crawling guidelines"
            ])
        # Add general suggestions
        approaches.extend([
            "Use the web.search action to find alternative sources",
            "Try the web.scrape action with specific CSS selectors",
            "Check if the site has a public API or data export"
        ])
        return approaches
    @action
    async def search(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Perform a web search and return the result URLs as a document (a plain URL list, .txt by default).
        Parameters:
            query (str): Search query to perform
            maxResults (int, optional): Maximum number of results (default: 10, capped by configuration)
            filter (str, optional): Filter criteria for search results
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            query = parameters.get("query")
            max_results = parameters.get("maxResults", 10)
            filter_param = parameters.get("filter")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
            if not query:
                return ActionResult.isFailure(error="Search query is required")
            if not self.srcApikey:
                return ActionResult.isFailure(error="SerpAPI key not configured")
            userLanguage = "en"
            if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'):
                userLanguage = self.service.user.language
            params = {
                "engine": self.srcEngine,
                "q": query,
                "api_key": self.srcApikey,
                "num": min(max_results, self.maxResults),
                "hl": userLanguage
            }
            if filter_param:
                params["filter"] = filter_param
            response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
            response.raise_for_status()
            search_results = response.json()
            results = []
            if "organic_results" in search_results:
                results = search_results["organic_results"][:max_results]
            # SerpAPI organic results carry the result URL under the "link" key
            urls = [item['link'] for item in results if isinstance(item.get('link'), str)]
            url_list_str = "\n".join(urls)
            # Determine output format based on expected formats
            output_extension = ".txt"  # Default
            output_mime_type = "text/plain"  # Default
            if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", ".txt")
                output_mime_type = expected_format.get("mimeType", "text/plain")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info("No expected format specified, using default .txt format")
            # Create result data
            result_data = {
                "query": query,
                "maxResults": max_results,
                "filter": filter_param,
                "totalResults": len(urls),
                "urls": urls,
                "urlList": url_list_str,
                "timestamp": get_utc_timestamp()
            }
            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": f"web_search_{self._format_timestamp_for_filename()}{output_extension}",
                        "documentData": result_data,
                        "mimeType": output_mime_type
                    }
                ]
            )
        except Exception as e:
            logger.error(f"Error searching web: {str(e)}")
            return ActionResult(
                success=False,
                error=str(e)
            )
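    # Example invocation (parameter values are illustrative):
    #
    #   result = await web.search({
    #       "query": "open source vector databases",
    #       "maxResults": 5,
    #       "expectedDocumentFormats": [
    #           {"extension": ".txt", "mimeType": "text/plain"}
    #       ],
    #   })
    #   # result.documents[0]["documentData"]["urls"] -> list of result URLs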
    def _selenium_extract_content(self, url: str) -> Optional[Dict[str, str]]:
        """Use Selenium to fetch a JS-heavy page; returns its title and main content, or None on failure."""
        options = Options()
        # options.headless was removed in recent Selenium releases; pass the Chrome flag instead
        options.add_argument('--headless=new')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument(f'user-agent={self.user_agent}')
        driver = None
        try:
            driver = webdriver.Chrome(options=options)
            driver.set_page_load_timeout(self.timeout)
            driver.get(url)
            # Wait for body to load
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            html = driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            # Return both title and content so callers don't have to re-parse
            return {
                "title": self._extractTitle(soup, url),
                "content": self._extractMainContent(soup)
            }
        except WebDriverException as e:
            logger.warning(f"Selenium failed for {url}: {str(e)}")
            return None
        except Exception as e:
            logger.warning(f"Selenium error for {url}: {str(e)}")
            return None
        finally:
            # Always release the browser, even when an exception was raised
            if driver is not None:
                driver.quit()
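    # Note: webdriver.Chrome() relies on a Chrome/Chromium binary being
    # available; with Selenium >= 4.6 the matching chromedriver is resolved
    # automatically via Selenium Manager, otherwise it must be on PATH.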
    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Crawl a list of URLs provided in a document (.txt) with URLs separated by newline, comma, or semicolon.
        Parameters:
            document (str): Path to the document containing the URL list
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            document = parameters.get("document")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
            if not document:
                return ActionResult.isFailure(error="No document with URL list provided.")
            # Read the document content
            with open(document, "r", encoding="utf-8") as f:
                content = f.read()
            # Split URLs by newline, comma, or semicolon (re is imported at module level)
            urls = re.split(r'[\n,;]+', content)
            urls = [u.strip() for u in urls if u.strip()]
            if not urls:
                return ActionResult.isFailure(error="No valid URLs provided in the document.")
            crawl_results = []
            for url in urls:
                try:
                    logger.info(f"Crawling URL: {url}")
                    title = "No title"
                    page_content = ""
                    # Try Selenium first, since it also handles JavaScript-heavy pages
                    selenium_result = self._selenium_extract_content(url)
                    if selenium_result and selenium_result.get("content"):
                        title = selenium_result["title"]
                        page_content = selenium_result["content"]
                    else:
                        # Fallback to requests/BeautifulSoup
                        soup = self._readUrl(url)
                        if soup:
                            page_content = self._extractMainContent(soup)
                            title = self._extractTitle(soup, url)
                    meta_info = {"url": url, "title": title}
                    content_length = len(page_content)
                    crawl_results.append({
                        "url": url,
                        "title": title,
                        "content": page_content,
                        "content_length": content_length,
                        "meta_info": meta_info,
                        "timestamp": get_utc_timestamp()
                    })
                    logger.info(f"Successfully crawled {url} - extracted {content_length} characters")
                except Exception as e:
                    logger.error(f"Error crawling web page {url}: {str(e)}")
                    crawl_results.append({
                        "error": str(e),
                        "url": url,
                        "suggestions": [
                            "Check if the URL is accessible",
                            "Try with a different user agent",
                            "Verify the site doesn't block automated access"
                        ]
                    })
            # Determine output format based on expected formats
            output_extension = ".json"  # Default
            output_mime_type = "application/json"  # Default
            if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", ".json")
                output_mime_type = expected_format.get("mimeType", "application/json")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info("No expected format specified, using default .json format")
            result_data = {
                "urls": urls,
                "maxDepth": 1,  # Simplified crawl: the listed URLs only
                "includeImages": False,
                "followLinks": False,  # discovered links are not followed
                "crawlResults": crawl_results,
                "summary": {
                    "total_urls": len(urls),
                    "successful_crawls": len([r for r in crawl_results if "error" not in r]),
                    "failed_crawls": len([r for r in crawl_results if "error" in r]),
                    "total_content_chars": sum([r.get("content_length", 0) for r in crawl_results if "content_length" in r])
                },
                "timestamp": get_utc_timestamp()
            }
            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": f"web_crawl_{self._format_timestamp_for_filename()}{output_extension}",
                        "documentData": result_data,
                        "mimeType": output_mime_type
                    }
                ]
            )
        except Exception as e:
            logger.error(f"Error crawling web pages: {str(e)}")
            return ActionResult(
                success=False,
                error=str(e)
            )
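    # Accepted input document format for crawl - all three separators work
    # interchangeably within one file:
    #
    #   https://example.com/a
    #   https://example.com/b, https://example.com/c; https://example.com/d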
    @action
    async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Scrape specific data from web pages
        Parameters:
            url (str): URL to scrape
            selectors (Dict[str, str], optional): CSS selectors for data extraction; if omitted, common elements are auto-extracted
            format (str, optional): Output format (default: "json")
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            url = parameters.get("url")
            selectors = parameters.get("selectors")
            output_format = parameters.get("format", "json")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
            if not url:
                return ActionResult(
                    success=False,
                    error="URL is required"
                )
            # Read the URL
            soup = self._readUrl(url)
            if not soup:
                return ActionResult(
                    success=False,
                    error="Failed to read URL"
                )
            extracted_content = {}
            if selectors:
                # Extract content using the provided selectors
                for selector_name, selector in selectors.items():
                    elements = soup.select(selector)
                    if output_format == "html":
                        extracted_content[selector_name] = [str(elem) for elem in elements]
                    else:
                        # "text", "json", and any other format fall back to plain text
                        extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
            else:
                # Auto-extract common elements
                extracted_content = {
                    "title": self._extractTitle(soup, url),
                    "main_content": self._extractMainContent(soup),
                    "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])],
                    "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))],
                    "images": [img.get('src') for img in soup.find_all('img', src=True)]
                }
            scrape_result = {
                "url": url,
                "selectors": selectors,
                "format": output_format,
                "content": extracted_content,
                "timestamp": get_utc_timestamp()
            }
            # Create result data
            result_data = {
                "url": url,
                "selectors": selectors,
                "format": output_format,
                "scrapedData": scrape_result,
                "timestamp": get_utc_timestamp()
            }
            # Determine output format based on expected formats
            output_extension = f".{output_format}"  # Default to the format parameter
            output_mime_type = "application/json"  # Default
            if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", f".{output_format}")
                output_mime_type = expected_format.get("mimeType", "application/json")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info(f"No expected format specified, using format parameter: {output_format}")
            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": f"web_scrape_{self._format_timestamp_for_filename()}{output_extension}",
                        "documentData": result_data,
                        "mimeType": output_mime_type
                    }
                ]
            )
        except Exception as e:
            logger.error(f"Error scraping web page: {str(e)}")
            return ActionResult(
                success=False,
                error=str(e)
            )
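    # Example invocation with explicit selectors (illustrative values):
    #
    #   result = await web.scrape({
    #       "url": "https://example.com/blog/post",
    #       "selectors": {
    #           "headline": "h1",
    #           "paragraphs": "article p",
    #       },
    #       "format": "text",
    #   })
    #   # Omitting "selectors" falls back to the auto-extraction branch above.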