"""
|
|
Web method module.
|
|
Handles web operations using the web service.
|
|
"""

import logging
import time
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional

import requests
from bs4 import BeautifulSoup

from modules.workflow.methodBase import MethodBase, ActionResult, action
from modules.shared.configuration import APP_CONFIG

logger = logging.getLogger(__name__)

class MethodWeb(MethodBase):
    """Web method implementation for crawl, scrape, search, and validate operations."""

    def __init__(self, serviceContainer: Any):
        """Initialize the web method."""
        super().__init__(serviceContainer)
        self.name = "web"
        self.description = "Handle web operations like crawling and scraping"

        # Web search configuration (shared with agentWebcrawler)
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))

        if not self.srcApikey:
            logger.warning("SerpAPI key not configured for web search")

        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30  # Request timeout in seconds
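
    # A hypothetical APP_CONFIG fragment satisfying the lookups above
    # (key names are from this module; the values are illustrative only):
    #
    #   Agent_Webcrawler_SERPAPI_APIKEY = "<your-serpapi-key>"
    #   Agent_Webcrawler_SERPAPI_ENGINE = "google"
    #   Agent_Webcrawler_SERPAPI_COUNTRY = "auto"
    #   Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS = "5"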

    def _readUrl(self, url: str) -> Optional[BeautifulSoup]:
        """Read a URL and return a BeautifulSoup parser for the content, or None on failure."""
        if not url or not url.startswith(('http://', 'https://')):
            return None

        headers = {
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml',
            'Accept-Language': 'en-US,en;q=0.9',
        }

        try:
            # Initial request
            response = requests.get(url, headers=headers, timeout=self.timeout)

            # HTTP 202 means the request was accepted but the page is not
            # ready yet; retry with increasing backoff
            if response.status_code == 202:
                backoff_times = [0.5, 1.0, 2.0, 5.0]

                for wait_time in backoff_times:
                    time.sleep(wait_time)
                    response = requests.get(url, headers=headers, timeout=self.timeout)

                    if response.status_code != 202:
                        break

            # Raise for error status codes
            response.raise_for_status()

            # Parse HTML
            return BeautifulSoup(response.text, 'html.parser')

        except Exception as e:
            logger.error(f"Error reading URL {url}: {str(e)}")
            return None
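
    # Minimal usage sketch (hypothetical `method` instance):
    #   method._readUrl("https://example.com")          # -> BeautifulSoup on success
    #   method._readUrl("ftp://example.com")            # -> None (unsupported scheme)
    #   method._readUrl("https://unreachable.invalid")  # -> None, error logged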

    def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
        """Extract the title from a webpage."""
        if not soup:
            return f"Error with {url}"

        # Extract title from the <title> tag
        title_tag = soup.find('title')
        title = title_tag.text.strip() if title_tag else "No title"

        # Fall back to the first <h1> tag if the <title> tag is missing
        if title == "No title":
            h1_tag = soup.find('h1')
            if h1_tag:
                title = h1_tag.text.strip()

        return title

    def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 10000) -> str:
        """Extract the main content from an HTML page, truncated to max_chars."""
        if not soup:
            return ""

        # Try to find main content elements in priority order
        main_content = None
        for selector in ['main', 'article', '#content', '.content', '#main', '.main']:
            content = soup.select_one(selector)
            if content:
                main_content = content
                break

        # If no main content element was found, fall back to the body
        if not main_content:
            main_content = soup.find('body') or soup

        # Remove script, style, nav, footer and similar elements that do not
        # contribute to the main content
        for element in main_content.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'):
            element.extract()

        # Extract text content
        text_content = main_content.get_text(separator=' ', strip=True)

        # Limit to max_chars
        return text_content[:max_chars]

    def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic accessibility features."""
        issues = []
        warnings = []

        # Check for alt text on images (missing attribute or empty value)
        images_without_alt = [img for img in soup.find_all('img') if not img.get('alt')]
        if images_without_alt:
            issues.append(f"Found {len(images_without_alt)} images without alt text")

        # Check for proper heading structure
        headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        if not headings:
            warnings.append("No headings found - poor document structure")

        # Check that form inputs have associated labels
        forms = soup.find_all('form')
        for form in forms:
            inputs = form.find_all('input')
            for input_elem in inputs:
                if input_elem.get('type') not in ['submit', 'button', 'hidden']:
                    if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}):
                        warnings.append("Form input without proper label")

        return {
            "status": "fail" if issues else ("warning" if warnings else "pass"),
            "issues": issues,
            "warnings": warnings
        }

    def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic SEO features."""
        issues = []
        warnings = []

        # Check for title tag
        title = soup.find('title')
        if not title:
            issues.append("Missing title tag")
        elif len(title.get_text()) < 10:
            warnings.append("Title tag is too short")
        elif len(title.get_text()) > 60:
            warnings.append("Title tag is too long")

        # Check for meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if not meta_desc:
            warnings.append("Missing meta description")
        elif meta_desc.get('content'):
            if len(meta_desc.get('content')) < 50:
                warnings.append("Meta description is too short")
            elif len(meta_desc.get('content')) > 160:
                warnings.append("Meta description is too long")

        # Check for h1 tag
        h1_tags = soup.find_all('h1')
        if not h1_tags:
            warnings.append("No H1 tag found")
        elif len(h1_tags) > 1:
            warnings.append("Multiple H1 tags found")

        return {
            "status": "fail" if issues else ("warning" if warnings else "pass"),
            "issues": issues,
            "warnings": warnings
        }

    def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Check basic performance indicators."""
        warnings = []

        # Count images
        images = soup.find_all('img')
        if len(images) > 20:
            warnings.append(f"Many images found ({len(images)}) - may impact loading speed")

        # Check for external resources
        external_scripts = soup.find_all('script', src=True)
        external_styles = soup.find_all('link', rel='stylesheet')

        if len(external_scripts) > 10:
            warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed")

        if len(external_styles) > 5:
            warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed")

        return {
            "status": "warning" if warnings else "pass",
            "warnings": warnings,
            "metrics": {
                "images": len(images),
                "external_scripts": len(external_scripts),
                "external_styles": len(external_styles)
            }
        }
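
    # The _check* helpers above share a result shape along the lines of:
    #   {"status": "fail" | "warning" | "pass", "issues": [...], "warnings": [...]}
    # (_checkPerformance reports no "issues" list and adds a "metrics" dict.)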

    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Crawl web pages and extract content

        Parameters:
            urls (List[str]): List of URLs to crawl
            maxDepth (int, optional): Maximum crawl depth (default: 2). Currently
                echoed in the result; links are not followed recursively.
            includeImages (bool, optional): Whether to include images (default: False)
            followLinks (bool, optional): Whether to extract outgoing links (default: True)
        """
        try:
            urls = parameters.get("urls")
            maxDepth = parameters.get("maxDepth", 2)
            includeImages = parameters.get("includeImages", False)
            followLinks = parameters.get("followLinks", True)

            if not urls:
                return self._createResult(
                    success=False,
                    data={},
                    error="URLs are required"
                )

            # Crawl each URL
            crawl_results = []

            for url in urls:
                try:
                    # Read the URL
                    soup = self._readUrl(url)
                    if not soup:
                        crawl_results.append({
                            "error": "Failed to read URL",
                            "url": url
                        })
                        continue

                    # Extract basic information
                    title = self._extractTitle(soup, url)
                    content = self._extractMainContent(soup)

                    # Extract links if requested
                    links = []
                    if followLinks:
                        for link in soup.find_all('a', href=True):
                            href = link.get('href')
                            if href and href.startswith(('http://', 'https://')):
                                links.append({
                                    'url': href,
                                    'text': link.get_text(strip=True)[:100]
                                })

                    # Extract images if requested
                    images = []
                    if includeImages:
                        for img in soup.find_all('img', src=True):
                            src = img.get('src')
                            if src:
                                images.append({
                                    'src': src,
                                    'alt': img.get('alt', ''),
                                    'title': img.get('title', '')
                                })

                    crawl_results.append({
                        "url": url,
                        "depth": maxDepth,
                        "followLinks": followLinks,
                        "extractContent": True,
                        "title": title,
                        "content": content,
                        "links": links[:10],  # Limit to first 10 links
                        "images": images[:10],  # Limit to first 10 images
                        "timestamp": datetime.now(UTC).isoformat()
                    })

                except Exception as e:
                    logger.error(f"Error crawling web page {url}: {str(e)}")
                    crawl_results.append({
                        "error": str(e),
                        "url": url
                    })

            # Create result data
            result_data = {
                "urls": urls,
                "maxDepth": maxDepth,
                "includeImages": includeImages,
                "followLinks": followLinks,
                "crawlResults": crawl_results,
                "timestamp": datetime.now(UTC).isoformat()
            }

            return self._createResult(
                success=True,
                data={
                    "documents": [
                        {
                            "documentName": f"web_crawl_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                            "documentData": result_data
                        }
                    ]
                }
            )

        except Exception as e:
            logger.error(f"Error crawling web pages: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )

    @action
    async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Scrape specific data from web pages

        Parameters:
            url (str): URL to scrape
            selectors (Dict[str, str], optional): CSS selectors for data extraction;
                if omitted, common elements are extracted automatically
            format (str, optional): Output format, "json", "text" or "html" (default: "json")
        """
        try:
            url = parameters.get("url")
            selectors = parameters.get("selectors")
            outputFormat = parameters.get("format", "json")

            if not url:
                return self._createResult(
                    success=False,
                    data={},
                    error="URL is required"
                )

            # Read the URL
            soup = self._readUrl(url)
            if not soup:
                return self._createResult(
                    success=False,
                    data={},
                    error="Failed to read URL"
                )

            extracted_content = {}

            if selectors:
                # Extract content using the provided selectors
                for selector_name, selector in selectors.items():
                    elements = soup.select(selector)
                    if elements:
                        if outputFormat == "html":
                            extracted_content[selector_name] = [str(elem) for elem in elements]
                        else:
                            # "text" and "json" both extract plain text
                            extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
                    else:
                        extracted_content[selector_name] = []
            else:
                # Auto-extract common elements
                extracted_content = {
                    "title": self._extractTitle(soup, url),
                    "main_content": self._extractMainContent(soup),
                    "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])],
                    "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))],
                    "images": [img.get('src') for img in soup.find_all('img', src=True)]
                }

            scrape_result = {
                "url": url,
                "selectors": selectors,
                "format": outputFormat,
                "content": extracted_content,
                "timestamp": datetime.now(UTC).isoformat()
            }

            # Create result data
            result_data = {
                "url": url,
                "selectors": selectors,
                "format": outputFormat,
                "scrapedData": scrape_result,
                "timestamp": datetime.now(UTC).isoformat()
            }

            return self._createResult(
                success=True,
                data={
                    "documents": [
                        {
                            "documentName": f"web_scrape_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.{outputFormat}",
                            "documentData": result_data
                        }
                    ]
                }
            )

        except Exception as e:
            logger.error(f"Error scraping web page: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )

    @action
    async def search(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Search web content

        Parameters:
            query (str): Search query
            engine (str, optional): Search engine label echoed in the result
                (the configured SerpAPI engine is used for the query; default: "google")
            maxResults (int, optional): Maximum number of results (default: 10)
            filter (str, optional): Additional search filters (currently echoed
                in the result only)
        """
        try:
            query = parameters.get("query")
            engine = parameters.get("engine", "google")
            maxResults = parameters.get("maxResults", 10)
            searchFilter = parameters.get("filter")

            if not query:
                return self._createResult(
                    success=False,
                    data={},
                    error="Search query is required"
                )

            # Search web content using Google search via SerpAPI
            try:
                if not self.srcApikey:
                    search_result = {
                        "error": "SerpAPI key not configured",
                        "query": query
                    }
                else:
                    # Get user language from the service container if available
                    userLanguage = "en"  # Default language
                    if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'):
                        userLanguage = self.service.user.language

                    # Format the search request for SerpAPI
                    params = {
                        "engine": self.srcEngine,
                        "q": query,
                        "api_key": self.srcApikey,
                        "num": min(maxResults, self.maxResults),  # Number of results to return
                        "hl": userLanguage  # User language
                    }

                    # Make the API request
                    response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
                    response.raise_for_status()

                    # Parse JSON response
                    search_results = response.json()

                    # Extract organic results
                    results = []

                    if "organic_results" in search_results:
                        for result in search_results["organic_results"][:maxResults]:
                            title = result.get("title", "No title")
                            url = result.get("link", "No URL")
                            snippet = result.get("snippet", "No description")

                            # Fetch the actual page content
                            try:
                                targetPageSoup = self._readUrl(url)
                                content = self._extractMainContent(targetPageSoup)
                            except Exception as e:
                                logger.warning(f"Error extracting content from {url}: {str(e)}")
                                content = f"Error extracting content: {str(e)}"

                            results.append({
                                'title': title,
                                'url': url,
                                'snippet': snippet,
                                'content': content
                            })

                            # Limit number of results
                            if len(results) >= maxResults:
                                break
                    else:
                        logger.warning(f"No organic results found in SerpAPI response for: {query}")

                    search_result = {
                        "query": query,
                        "maxResults": maxResults,
                        "results": results,
                        "totalFound": len(results),
                        "timestamp": datetime.now(UTC).isoformat()
                    }

            except Exception as e:
                logger.error(f"Error searching web: {str(e)}")
                search_result = {
                    "error": str(e),
                    "query": query
                }

            # Create result data
            result_data = {
                "query": query,
                "engine": engine,
                "maxResults": maxResults,
                "filter": searchFilter,
                "searchResults": search_result,
                "timestamp": datetime.now(UTC).isoformat()
            }

            return self._createResult(
                success=True,
                data={
                    "documents": [
                        {
                            "documentName": f"web_search_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                            "documentData": result_data
                        }
                    ]
                }
            )

        except Exception as e:
            logger.error(f"Error searching web: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )

    @action
    async def validate(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Validate web pages for various criteria

        Parameters:
            url (str): URL to validate
            checks (List[str], optional): Types of checks to perform
                (default: ["accessibility", "seo", "performance"])
        """
        try:
            url = parameters.get("url")
            checks = parameters.get("checks", ["accessibility", "seo", "performance"])

            if not url:
                return self._createResult(
                    success=False,
                    data={},
                    error="URL is required"
                )

            # Read the URL
            soup = self._readUrl(url)
            if not soup:
                return self._createResult(
                    success=False,
                    data={},
                    error="Failed to read URL"
                )

            validation_results = {}

            for check in checks:
                if check == "accessibility":
                    validation_results["accessibility"] = self._checkAccessibility(soup)
                elif check == "seo":
                    validation_results["seo"] = self._checkSEO(soup)
                elif check == "performance":
                    validation_results["performance"] = self._checkPerformance(soup, url)
                else:
                    validation_results[check] = {"status": "unknown", "message": f"Unknown check type: {check}"}

            validation_result = {
                "url": url,
                "checks": checks,
                "results": validation_results,
                "timestamp": datetime.now(UTC).isoformat()
            }

            # Create result data
            result_data = {
                "url": url,
                "checks": checks,
                "validationResult": validation_result,
                "timestamp": datetime.now(UTC).isoformat()
            }

            return self._createResult(
                success=True,
                data={
                    "documents": [
                        {
                            "documentName": f"web_validation_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                            "documentData": result_data
                        }
                    ]
                }
            )

        except Exception as e:
            logger.error(f"Error validating web page: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )